[GitHub] [incubator-pinot] mcvsubbu commented on a change in pull request #3877: Pinot controller side change to enhance LLC segment metadata upload.

2019-03-13 Thread GitBox
mcvsubbu commented on a change in pull request #3877: Pinot controller side 
change to enhance LLC segment metadata upload.
URL: https://github.com/apache/incubator-pinot/pull/3877#discussion_r265169532
 
 

 ##
 File path: 
pinot-controller/src/main/java/org/apache/pinot/controller/api/resources/LLCSegmentCompletionHandlers.java
 ##
 @@ -360,4 +539,50 @@ private String uploadSegment(FormDataMultiPart multiPart, 
String instanceId, Str
   multiPart.cleanup();
 }
   }
+
+  private void localSegmentFileToPinotFSFinalLocation(FileUploadPathProvider 
provider, File localTmpFile,
+  String segmentName, String instanceId)
+  throws Exception {
+final String rawTableName = new LLCSegmentName(segmentName).getTableName();
+URI segmentFileURI = ControllerConf
+.getUriFromPath(StringUtil.join("/", 
provider.getBaseDataDirURI().toString(), rawTableName, segmentName));
+PinotFS pinotFS = 
PinotFSFactory.create(provider.getBaseDataDirURI().getScheme());
+// Multiple threads can reach this point at the same time, if the 
following scenario happens
+// The server that was asked to commit did so very slowly (due to network 
speeds). Meanwhile the FSM in
+// SegmentCompletionManager timed out, and allowed another server to 
commit, which did so very quickly (somehow
+// the network speeds changed). The second server made it through the FSM 
and reached this point.
+// The synchronization below takes care that exactly one file gets moved 
in place.
+// There are still corner conditions that are not handled correctly. For 
example,
+// 1. What if the offset of the faster server was different?
+// 2. We know that only the faster server will get to complete the COMMIT 
call successfully. But it is possible
+//that the race to this statement is won by the slower server, and so 
the real segment that is in there is that
+//of the slower server.
+// In order to overcome controller restarts after the segment is renamed, 
but before it is committed, we DO need to
+// check for existing segment file and remove it. So, the block cannot be 
removed altogether.
+// For now, we live with these corner cases. Once we have split-commit 
enabled and working, this code will no longer
+// be used.
+synchronized (SegmentCompletionManager.getInstance()) {
 
 Review comment:
   Better yet, let us just get rid of this method and move the whole code to 
the caller. That way, the comment and synchronized block reside where they can 
be most associated with


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services

-
To unsubscribe, e-mail: commits-unsubscr...@pinot.apache.org
For additional commands, e-mail: commits-h...@pinot.apache.org



[GitHub] [incubator-pinot] mcvsubbu commented on a change in pull request #3877: Pinot controller side change to enhance LLC segment metadata upload.

2019-03-12 Thread GitBox
mcvsubbu commented on a change in pull request #3877: Pinot controller side 
change to enhance LLC segment metadata upload.
URL: https://github.com/apache/incubator-pinot/pull/3877#discussion_r264936440
 
 

 ##
 File path: 
pinot-controller/src/main/java/org/apache/pinot/controller/api/resources/LLCSegmentCompletionHandlers.java
 ##
 @@ -360,4 +539,50 @@ private String uploadSegment(FormDataMultiPart multiPart, 
String instanceId, Str
   multiPart.cleanup();
 }
   }
+
+  private void localSegmentFileToPinotFSFinalLocation(FileUploadPathProvider 
provider, File localTmpFile,
+  String segmentName, String instanceId)
+  throws Exception {
+final String rawTableName = new LLCSegmentName(segmentName).getTableName();
+URI segmentFileURI = ControllerConf
+.getUriFromPath(StringUtil.join("/", 
provider.getBaseDataDirURI().toString(), rawTableName, segmentName));
+PinotFS pinotFS = 
PinotFSFactory.create(provider.getBaseDataDirURI().getScheme());
+// Multiple threads can reach this point at the same time, if the 
following scenario happens
+// The server that was asked to commit did so very slowly (due to network 
speeds). Meanwhile the FSM in
+// SegmentCompletionManager timed out, and allowed another server to 
commit, which did so very quickly (somehow
+// the network speeds changed). The second server made it through the FSM 
and reached this point.
+// The synchronization below takes care that exactly one file gets moved 
in place.
+// There are still corner conditions that are not handled correctly. For 
example,
+// 1. What if the offset of the faster server was different?
+// 2. We know that only the faster server will get to complete the COMMIT 
call successfully. But it is possible
+//that the race to this statement is won by the slower server, and so 
the real segment that is in there is that
+//of the slower server.
+// In order to overcome controller restarts after the segment is renamed, 
but before it is committed, we DO need to
 
 Review comment:
   s/renamed/moved to pinotFS/


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services

-
To unsubscribe, e-mail: commits-unsubscr...@pinot.apache.org
For additional commands, e-mail: commits-h...@pinot.apache.org



[GitHub] [incubator-pinot] mcvsubbu commented on a change in pull request #3877: Pinot controller side change to enhance LLC segment metadata upload.

2019-03-12 Thread GitBox
mcvsubbu commented on a change in pull request #3877: Pinot controller side 
change to enhance LLC segment metadata upload.
URL: https://github.com/apache/incubator-pinot/pull/3877#discussion_r264935456
 
 

 ##
 File path: 
pinot-controller/src/main/java/org/apache/pinot/controller/api/resources/LLCSegmentCompletionHandlers.java
 ##
 @@ -260,25 +313,203 @@ public String 
segmentUpload(@QueryParam(SegmentCompletionProtocol.PARAM_INSTANCE
 
requestParams.withInstanceId(instanceId).withSegmentName(segmentName).withOffset(offset);
 LOGGER.info("Processing segmentUpload:{}", requestParams.toString());
 
-final String segmentLocation = uploadSegment(multiPart, instanceId, 
segmentName, true);
-if (segmentLocation == null) {
+// Get the segment from the form input and put it in the right place.
+File localTmpFile = uploadFileToLocalTmpFile(multiPart, instanceId, 
segmentName);
+if (localTmpFile == null) {
+  LOGGER.error("Unable to get the segment file from multipart input to 
local file {}", segmentName);
   return SegmentCompletionProtocol.RESP_FAILED.toJsonString();
 }
-SegmentCompletionProtocol.Response.Params responseParams =
-new 
SegmentCompletionProtocol.Response.Params().withOffset(requestParams.getOffset())
-.withSegmentLocation(segmentLocation)
-
.withStatus(SegmentCompletionProtocol.ControllerResponseStatus.UPLOAD_SUCCESS);
+try {
+  FileUploadPathProvider provider = new 
FileUploadPathProvider(_controllerConf);
+  URI uri = localSegementFileToPinotFsTmpLocation(provider, localTmpFile, 
segmentName);
+  if (uri == null) {
+LOGGER.error("Unable to upload local segment file {} to Pinot storage 
for segment ", localTmpFile.toPath(),
+segmentName);
+return SegmentCompletionProtocol.RESP_FAILED.toJsonString();
+  }
+  SegmentCompletionProtocol.Response.Params responseParams =
+  new 
SegmentCompletionProtocol.Response.Params().withOffset(requestParams.getOffset())
+  .withSegmentLocation(uri.toString())
+  
.withStatus(SegmentCompletionProtocol.ControllerResponseStatus.UPLOAD_SUCCESS);
+
+  String response = new 
SegmentCompletionProtocol.Response(responseParams).toJsonString();
 
-String response = new 
SegmentCompletionProtocol.Response(responseParams).toJsonString();
+  LOGGER.info("Response to segmentUpload:{}", response);
 
-LOGGER.info("Response to segmentUpload:{}", response);
+  return response;
+} catch (Exception e) {
 
-return response;
+  return SegmentCompletionProtocol.RESP_FAILED.toJsonString();
+} finally {
+  FileUtils.deleteQuietly(localTmpFile);
+}
   }
 
-  @Nullable
-  private String uploadSegment(FormDataMultiPart multiPart, String instanceId, 
String segmentName,
-  boolean isSplitCommit) {
+  @POST
+  @Path(SegmentCompletionProtocol.MSG_TYPE_COMMIT_END_METADATA)
+  @Produces(MediaType.APPLICATION_JSON)
+  @Consumes(MediaType.MULTIPART_FORM_DATA)
+  public String 
segmentCommitEndWithMetadata(@QueryParam(SegmentCompletionProtocol.PARAM_INSTANCE_ID)
 String instanceId,
+  @QueryParam(SegmentCompletionProtocol.PARAM_SEGMENT_NAME) String 
segmentName,
+  @QueryParam(SegmentCompletionProtocol.PARAM_SEGMENT_LOCATION) String 
segmentLocation,
+  @QueryParam(SegmentCompletionProtocol.PARAM_OFFSET) long offset,
+  @QueryParam(SegmentCompletionProtocol.PARAM_MEMORY_USED_BYTES) long 
memoryUsedBytes,
+  @QueryParam(SegmentCompletionProtocol.PARAM_BUILD_TIME_MILLIS) long 
buildTimeMillis,
+  @QueryParam(SegmentCompletionProtocol.PARAM_WAIT_TIME_MILLIS) long 
waitTimeMillis,
+  @QueryParam(SegmentCompletionProtocol.PARAM_ROW_COUNT) int numRows,
+  @QueryParam(SegmentCompletionProtocol.PARAM_SEGMENT_SIZE_BYTES) long 
segmentSizeBytes,
+  FormDataMultiPart metadataFiles) {
+if (instanceId == null || segmentName == null || offset == -1 || 
segmentLocation == null || metadataFiles == null) {
+  LOGGER.error("Invalid call: offset={}, segmentName={}, instanceId={}, 
segmentLocation={}", offset, segmentName,
+  instanceId, segmentLocation);
+  // TODO: memoryUsedInBytes = 0 if not present in params. Add validation 
when we start using it
+  return SegmentCompletionProtocol.RESP_FAILED.toJsonString();
+}
+
+SegmentCompletionProtocol.Request.Params requestParams = new 
SegmentCompletionProtocol.Request.Params();
+
requestParams.withInstanceId(instanceId).withSegmentName(segmentName).withOffset(offset)
+
.withSegmentLocation(segmentLocation).withSegmentSizeBytes(segmentSizeBytes)
+
.withBuildTimeMillis(buildTimeMillis).withWaitTimeMillis(waitTimeMillis).withNumRows(numRows)
+.withMemoryUsedBytes(memoryUsedBytes);
+LOGGER.info("Processing segmentCommitEnd:{}", requestParams.toString());
+
+final boolean isSuccess = true;
+final boolean isSplitCommit = true;
+

[GitHub] [incubator-pinot] mcvsubbu commented on a change in pull request #3877: Pinot controller side change to enhance LLC segment metadata upload.

2019-03-12 Thread GitBox
mcvsubbu commented on a change in pull request #3877: Pinot controller side 
change to enhance LLC segment metadata upload.
URL: https://github.com/apache/incubator-pinot/pull/3877#discussion_r264934111
 
 

 ##
 File path: 
pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/LLCRealtimeClusterSplitCommitIntegrationTest.java
 ##
 @@ -20,7 +20,14 @@
 
 import org.apache.commons.configuration.Configuration;
 import org.apache.pinot.common.utils.CommonConstants;
+import org.apache.pinot.common.utils.StringUtil;
 
 Review comment:
   Looks like there are no change to this file. Can we revert this file to the 
original please?


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services

-
To unsubscribe, e-mail: commits-unsubscr...@pinot.apache.org
For additional commands, e-mail: commits-h...@pinot.apache.org



[GitHub] [incubator-pinot] mcvsubbu commented on a change in pull request #3877: Pinot controller side change to enhance LLC segment metadata upload.

2019-03-12 Thread GitBox
mcvsubbu commented on a change in pull request #3877: Pinot controller side 
change to enhance LLC segment metadata upload.
URL: https://github.com/apache/incubator-pinot/pull/3877#discussion_r264934672
 
 

 ##
 File path: 
pinot-controller/src/main/java/org/apache/pinot/controller/api/resources/LLCSegmentCompletionHandlers.java
 ##
 @@ -234,13 +257,43 @@ public String 
segmentCommit(@QueryParam(SegmentCompletionProtocol.PARAM_INSTANCE
 
 final SegmentCompletionManager segmentCompletionManager = 
SegmentCompletionManager.getInstance();
 SegmentCompletionProtocol.Response response = 
segmentCompletionManager.segmentCommitStart(requestParams);
-if (response.equals(SegmentCompletionProtocol.RESP_COMMIT_CONTINUE)) {
-  // Get the segment and put it in the right place.
-  boolean success = uploadSegment(multiPart, instanceId, segmentName, 
false) != null;
 
-  response = segmentCompletionManager.segmentCommitEnd(requestParams, 
success, false);
+CommittingSegmentDescriptor committingSegmentDescriptor =
+
CommittingSegmentDescriptor.fromSegmentCompletionReqParams(requestParams);
+boolean success = false;
+
+if (response.equals(SegmentCompletionProtocol.RESP_COMMIT_CONTINUE)) {
+  File localTmpFile = null;
+  try {
+// Get the segment from the form input and put it in the right place.
 
 Review comment:
   Wrong comment. Get the segment and put it in a tmp area in the local file 
system


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services

-
To unsubscribe, e-mail: commits-unsubscr...@pinot.apache.org
For additional commands, e-mail: commits-h...@pinot.apache.org



[GitHub] [incubator-pinot] mcvsubbu commented on a change in pull request #3877: Pinot controller side change to enhance LLC segment metadata upload.

2019-03-12 Thread GitBox
mcvsubbu commented on a change in pull request #3877: Pinot controller side 
change to enhance LLC segment metadata upload.
URL: https://github.com/apache/incubator-pinot/pull/3877#discussion_r264936543
 
 

 ##
 File path: 
pinot-controller/src/main/java/org/apache/pinot/controller/api/resources/LLCSegmentCompletionHandlers.java
 ##
 @@ -360,4 +539,50 @@ private String uploadSegment(FormDataMultiPart multiPart, 
String instanceId, Str
   multiPart.cleanup();
 }
   }
+
+  private void localSegmentFileToPinotFSFinalLocation(FileUploadPathProvider 
provider, File localTmpFile,
+  String segmentName, String instanceId)
+  throws Exception {
+final String rawTableName = new LLCSegmentName(segmentName).getTableName();
+URI segmentFileURI = ControllerConf
+.getUriFromPath(StringUtil.join("/", 
provider.getBaseDataDirURI().toString(), rawTableName, segmentName));
+PinotFS pinotFS = 
PinotFSFactory.create(provider.getBaseDataDirURI().getScheme());
+// Multiple threads can reach this point at the same time, if the 
following scenario happens
+// The server that was asked to commit did so very slowly (due to network 
speeds). Meanwhile the FSM in
+// SegmentCompletionManager timed out, and allowed another server to 
commit, which did so very quickly (somehow
+// the network speeds changed). The second server made it through the FSM 
and reached this point.
+// The synchronization below takes care that exactly one file gets moved 
in place.
+// There are still corner conditions that are not handled correctly. For 
example,
+// 1. What if the offset of the faster server was different?
+// 2. We know that only the faster server will get to complete the COMMIT 
call successfully. But it is possible
+//that the race to this statement is won by the slower server, and so 
the real segment that is in there is that
+//of the slower server.
+// In order to overcome controller restarts after the segment is renamed, 
but before it is committed, we DO need to
+// check for existing segment file and remove it. So, the block cannot be 
removed altogether.
+// For now, we live with these corner cases. Once we have split-commit 
enabled and working, this code will no longer
+// be used.
+synchronized (SegmentCompletionManager.getInstance()) {
 
 Review comment:
   Can you move the synchronized block around the caller of this method, along 
with all of the comments above? It wil make better sense there than here. 
thanks.


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services

-
To unsubscribe, e-mail: commits-unsubscr...@pinot.apache.org
For additional commands, e-mail: commits-h...@pinot.apache.org



[GitHub] [incubator-pinot] mcvsubbu commented on a change in pull request #3877: Pinot controller side change to enhance LLC segment metadata upload.

2019-03-12 Thread GitBox
mcvsubbu commented on a change in pull request #3877: Pinot controller side 
change to enhance LLC segment metadata upload.
URL: https://github.com/apache/incubator-pinot/pull/3877#discussion_r264895287
 
 

 ##
 File path: 
pinot-controller/src/main/java/org/apache/pinot/controller/api/resources/LLCSegmentCompletionHandlers.java
 ##
 @@ -254,31 +311,212 @@ public String 
segmentCommit(@QueryParam(SegmentCompletionProtocol.PARAM_INSTANCE
   @Produces(MediaType.APPLICATION_JSON)
   @Consumes(MediaType.MULTIPART_FORM_DATA)
   public String 
segmentUpload(@QueryParam(SegmentCompletionProtocol.PARAM_INSTANCE_ID) String 
instanceId,
-  @QueryParam(SegmentCompletionProtocol.PARAM_SEGMENT_NAME) String 
segmentName,
-  @QueryParam(SegmentCompletionProtocol.PARAM_OFFSET) long offset, 
FormDataMultiPart multiPart) {
+  
@QueryParam(SegmentCompletionProtocol.PARAM_SEGMENT_NAME) String segmentName,
+  
@QueryParam(SegmentCompletionProtocol.PARAM_OFFSET) long offset, 
FormDataMultiPart multiPart) {
 SegmentCompletionProtocol.Request.Params requestParams = new 
SegmentCompletionProtocol.Request.Params();
 
requestParams.withInstanceId(instanceId).withSegmentName(segmentName).withOffset(offset);
 LOGGER.info("Processing segmentUpload:{}", requestParams.toString());
 
-final String segmentLocation = uploadSegment(multiPart, instanceId, 
segmentName, true);
-if (segmentLocation == null) {
+// Get the segment from the form input and put it in the right place.
+File localTmpFile = uploadFileToLocalTmpFile(multiPart, instanceId, 
segmentName);
+if (localTmpFile == null) {
+  LOGGER.error("Unable to get the segment file from multipart input to 
local file {}", segmentName);
+  return SegmentCompletionProtocol.RESP_FAILED.toJsonString();
+}
+try {
+  FileUploadPathProvider provider = new 
FileUploadPathProvider(_controllerConf);
+  URI uri = localSegementFileToPinotFsTmpLocation(provider, localTmpFile, 
segmentName);
+  if (uri == null) {
+LOGGER.error("Unable to upload local segment file {} to Pinot 
storage", localTmpFile.toPath());
+return SegmentCompletionProtocol.RESP_FAILED.toJsonString();
+  }
+  SegmentCompletionProtocol.Response.Params responseParams =
+  new 
SegmentCompletionProtocol.Response.Params().withOffset(requestParams.getOffset())
+  .withSegmentLocation(uri.toString())
+  
.withStatus(SegmentCompletionProtocol.ControllerResponseStatus.UPLOAD_SUCCESS);
+
+  String response = new 
SegmentCompletionProtocol.Response(responseParams).toJsonString();
+
+  LOGGER.info("Response to segmentUpload:{}", response);
+
+  return response;
+} catch (Exception e) {
+
+  return SegmentCompletionProtocol.RESP_FAILED.toJsonString();
+} finally {
+  FileUtils.deleteQuietly(localTmpFile);
+}
+  }
+
+  @POST
+  @Path(SegmentCompletionProtocol.MSG_TYPE_COMMIT_END_METADATA)
+  @Produces(MediaType.APPLICATION_JSON)
+  @Consumes(MediaType.MULTIPART_FORM_DATA)
+  public String 
segmentCommitEndWithMetadata(@QueryParam(SegmentCompletionProtocol.PARAM_INSTANCE_ID)
 String instanceId,
+ 
@QueryParam(SegmentCompletionProtocol.PARAM_SEGMENT_NAME) String segmentName,
+ 
@QueryParam(SegmentCompletionProtocol.PARAM_SEGMENT_LOCATION) String 
segmentLocation,
+ 
@QueryParam(SegmentCompletionProtocol.PARAM_OFFSET) long offset,
+ 
@QueryParam(SegmentCompletionProtocol.PARAM_MEMORY_USED_BYTES) long 
memoryUsedBytes,
+ 
@QueryParam(SegmentCompletionProtocol.PARAM_BUILD_TIME_MILLIS) long 
buildTimeMillis,
+ 
@QueryParam(SegmentCompletionProtocol.PARAM_WAIT_TIME_MILLIS) long 
waitTimeMillis,
+ 
@QueryParam(SegmentCompletionProtocol.PARAM_ROW_COUNT) int numRows,
+ 
@QueryParam(SegmentCompletionProtocol.PARAM_SEGMENT_SIZE_BYTES) long 
segmentSizeBytes,
+ FormDataMultiPart metadataFiles) {
+if (instanceId == null || segmentName == null || offset == -1 || 
segmentLocation == null || metadataFiles == null) {
+  LOGGER.error("Invalid call: offset={}, segmentName={}, instanceId={}, 
segmentLocation={}", offset, segmentName,
+  instanceId, segmentLocation);
+  // TODO: memoryUsedInBytes = 0 if not present in params. Add validation 
when we start using it
+  return SegmentCompletionProtocol.RESP_FAILED.toJsonString();
+}
+
+SegmentCompletionProtocol.Request.Params requestParams = new 
SegmentCompletionProtocol.Request.Params();
+

[GitHub] [incubator-pinot] mcvsubbu commented on a change in pull request #3877: Pinot controller side change to enhance LLC segment metadata upload.

2019-03-12 Thread GitBox
mcvsubbu commented on a change in pull request #3877: Pinot controller side 
change to enhance LLC segment metadata upload.
URL: https://github.com/apache/incubator-pinot/pull/3877#discussion_r264885442
 
 

 ##
 File path: 
pinot-controller/src/main/java/org/apache/pinot/controller/api/resources/LLCSegmentCompletionHandlers.java
 ##
 @@ -254,31 +311,212 @@ public String 
segmentCommit(@QueryParam(SegmentCompletionProtocol.PARAM_INSTANCE
   @Produces(MediaType.APPLICATION_JSON)
   @Consumes(MediaType.MULTIPART_FORM_DATA)
   public String 
segmentUpload(@QueryParam(SegmentCompletionProtocol.PARAM_INSTANCE_ID) String 
instanceId,
-  @QueryParam(SegmentCompletionProtocol.PARAM_SEGMENT_NAME) String 
segmentName,
-  @QueryParam(SegmentCompletionProtocol.PARAM_OFFSET) long offset, 
FormDataMultiPart multiPart) {
+  
@QueryParam(SegmentCompletionProtocol.PARAM_SEGMENT_NAME) String segmentName,
+  
@QueryParam(SegmentCompletionProtocol.PARAM_OFFSET) long offset, 
FormDataMultiPart multiPart) {
 SegmentCompletionProtocol.Request.Params requestParams = new 
SegmentCompletionProtocol.Request.Params();
 
requestParams.withInstanceId(instanceId).withSegmentName(segmentName).withOffset(offset);
 LOGGER.info("Processing segmentUpload:{}", requestParams.toString());
 
-final String segmentLocation = uploadSegment(multiPart, instanceId, 
segmentName, true);
-if (segmentLocation == null) {
+// Get the segment from the form input and put it in the right place.
+File localTmpFile = uploadFileToLocalTmpFile(multiPart, instanceId, 
segmentName);
+if (localTmpFile == null) {
+  LOGGER.error("Unable to get the segment file from multipart input to 
local file {}", segmentName);
+  return SegmentCompletionProtocol.RESP_FAILED.toJsonString();
+}
+try {
+  FileUploadPathProvider provider = new 
FileUploadPathProvider(_controllerConf);
+  URI uri = localSegementFileToPinotFsTmpLocation(provider, localTmpFile, 
segmentName);
+  if (uri == null) {
+LOGGER.error("Unable to upload local segment file {} to Pinot 
storage", localTmpFile.toPath());
+return SegmentCompletionProtocol.RESP_FAILED.toJsonString();
+  }
+  SegmentCompletionProtocol.Response.Params responseParams =
+  new 
SegmentCompletionProtocol.Response.Params().withOffset(requestParams.getOffset())
+  .withSegmentLocation(uri.toString())
+  
.withStatus(SegmentCompletionProtocol.ControllerResponseStatus.UPLOAD_SUCCESS);
+
+  String response = new 
SegmentCompletionProtocol.Response(responseParams).toJsonString();
+
+  LOGGER.info("Response to segmentUpload:{}", response);
+
+  return response;
+} catch (Exception e) {
+
+  return SegmentCompletionProtocol.RESP_FAILED.toJsonString();
+} finally {
+  FileUtils.deleteQuietly(localTmpFile);
+}
+  }
+
+  @POST
+  @Path(SegmentCompletionProtocol.MSG_TYPE_COMMIT_END_METADATA)
+  @Produces(MediaType.APPLICATION_JSON)
+  @Consumes(MediaType.MULTIPART_FORM_DATA)
+  public String 
segmentCommitEndWithMetadata(@QueryParam(SegmentCompletionProtocol.PARAM_INSTANCE_ID)
 String instanceId,
+ 
@QueryParam(SegmentCompletionProtocol.PARAM_SEGMENT_NAME) String segmentName,
+ 
@QueryParam(SegmentCompletionProtocol.PARAM_SEGMENT_LOCATION) String 
segmentLocation,
+ 
@QueryParam(SegmentCompletionProtocol.PARAM_OFFSET) long offset,
+ 
@QueryParam(SegmentCompletionProtocol.PARAM_MEMORY_USED_BYTES) long 
memoryUsedBytes,
+ 
@QueryParam(SegmentCompletionProtocol.PARAM_BUILD_TIME_MILLIS) long 
buildTimeMillis,
+ 
@QueryParam(SegmentCompletionProtocol.PARAM_WAIT_TIME_MILLIS) long 
waitTimeMillis,
+ 
@QueryParam(SegmentCompletionProtocol.PARAM_ROW_COUNT) int numRows,
+ 
@QueryParam(SegmentCompletionProtocol.PARAM_SEGMENT_SIZE_BYTES) long 
segmentSizeBytes,
+ FormDataMultiPart metadataFiles) {
+if (instanceId == null || segmentName == null || offset == -1 || 
segmentLocation == null || metadataFiles == null) {
+  LOGGER.error("Invalid call: offset={}, segmentName={}, instanceId={}, 
segmentLocation={}", offset, segmentName,
+  instanceId, segmentLocation);
+  // TODO: memoryUsedInBytes = 0 if not present in params. Add validation 
when we start using it
+  return SegmentCompletionProtocol.RESP_FAILED.toJsonString();
+}
+
+SegmentCompletionProtocol.Request.Params requestParams = new 
SegmentCompletionProtocol.Request.Params();
+

[GitHub] [incubator-pinot] mcvsubbu commented on a change in pull request #3877: Pinot controller side change to enhance LLC segment metadata upload.

2019-03-12 Thread GitBox
mcvsubbu commented on a change in pull request #3877: Pinot controller side 
change to enhance LLC segment metadata upload.
URL: https://github.com/apache/incubator-pinot/pull/3877#discussion_r264895637
 
 

 ##
 File path: 
pinot-controller/src/main/java/org/apache/pinot/controller/api/resources/LLCSegmentCompletionHandlers.java
 ##
 @@ -254,31 +311,212 @@ public String 
segmentCommit(@QueryParam(SegmentCompletionProtocol.PARAM_INSTANCE
   @Produces(MediaType.APPLICATION_JSON)
   @Consumes(MediaType.MULTIPART_FORM_DATA)
   public String 
segmentUpload(@QueryParam(SegmentCompletionProtocol.PARAM_INSTANCE_ID) String 
instanceId,
-  @QueryParam(SegmentCompletionProtocol.PARAM_SEGMENT_NAME) String 
segmentName,
-  @QueryParam(SegmentCompletionProtocol.PARAM_OFFSET) long offset, 
FormDataMultiPart multiPart) {
+  
@QueryParam(SegmentCompletionProtocol.PARAM_SEGMENT_NAME) String segmentName,
+  
@QueryParam(SegmentCompletionProtocol.PARAM_OFFSET) long offset, 
FormDataMultiPart multiPart) {
 SegmentCompletionProtocol.Request.Params requestParams = new 
SegmentCompletionProtocol.Request.Params();
 
requestParams.withInstanceId(instanceId).withSegmentName(segmentName).withOffset(offset);
 LOGGER.info("Processing segmentUpload:{}", requestParams.toString());
 
-final String segmentLocation = uploadSegment(multiPart, instanceId, 
segmentName, true);
-if (segmentLocation == null) {
+// Get the segment from the form input and put it in the right place.
+File localTmpFile = uploadFileToLocalTmpFile(multiPart, instanceId, 
segmentName);
+if (localTmpFile == null) {
+  LOGGER.error("Unable to get the segment file from multipart input to 
local file {}", segmentName);
+  return SegmentCompletionProtocol.RESP_FAILED.toJsonString();
+}
+try {
+  FileUploadPathProvider provider = new 
FileUploadPathProvider(_controllerConf);
+  URI uri = localSegementFileToPinotFsTmpLocation(provider, localTmpFile, 
segmentName);
+  if (uri == null) {
+LOGGER.error("Unable to upload local segment file {} to Pinot 
storage", localTmpFile.toPath());
+return SegmentCompletionProtocol.RESP_FAILED.toJsonString();
+  }
+  SegmentCompletionProtocol.Response.Params responseParams =
+  new 
SegmentCompletionProtocol.Response.Params().withOffset(requestParams.getOffset())
+  .withSegmentLocation(uri.toString())
+  
.withStatus(SegmentCompletionProtocol.ControllerResponseStatus.UPLOAD_SUCCESS);
+
+  String response = new 
SegmentCompletionProtocol.Response(responseParams).toJsonString();
+
+  LOGGER.info("Response to segmentUpload:{}", response);
+
+  return response;
+} catch (Exception e) {
+
+  return SegmentCompletionProtocol.RESP_FAILED.toJsonString();
+} finally {
+  FileUtils.deleteQuietly(localTmpFile);
+}
+  }
+
+  @POST
+  @Path(SegmentCompletionProtocol.MSG_TYPE_COMMIT_END_METADATA)
+  @Produces(MediaType.APPLICATION_JSON)
+  @Consumes(MediaType.MULTIPART_FORM_DATA)
+  public String 
segmentCommitEndWithMetadata(@QueryParam(SegmentCompletionProtocol.PARAM_INSTANCE_ID)
 String instanceId,
+ 
@QueryParam(SegmentCompletionProtocol.PARAM_SEGMENT_NAME) String segmentName,
+ 
@QueryParam(SegmentCompletionProtocol.PARAM_SEGMENT_LOCATION) String 
segmentLocation,
+ 
@QueryParam(SegmentCompletionProtocol.PARAM_OFFSET) long offset,
+ 
@QueryParam(SegmentCompletionProtocol.PARAM_MEMORY_USED_BYTES) long 
memoryUsedBytes,
+ 
@QueryParam(SegmentCompletionProtocol.PARAM_BUILD_TIME_MILLIS) long 
buildTimeMillis,
+ 
@QueryParam(SegmentCompletionProtocol.PARAM_WAIT_TIME_MILLIS) long 
waitTimeMillis,
+ 
@QueryParam(SegmentCompletionProtocol.PARAM_ROW_COUNT) int numRows,
+ 
@QueryParam(SegmentCompletionProtocol.PARAM_SEGMENT_SIZE_BYTES) long 
segmentSizeBytes,
+ FormDataMultiPart metadataFiles) {
+if (instanceId == null || segmentName == null || offset == -1 || 
segmentLocation == null || metadataFiles == null) {
+  LOGGER.error("Invalid call: offset={}, segmentName={}, instanceId={}, 
segmentLocation={}", offset, segmentName,
+  instanceId, segmentLocation);
+  // TODO: memoryUsedInBytes = 0 if not present in params. Add validation 
when we start using it
+  return SegmentCompletionProtocol.RESP_FAILED.toJsonString();
+}
+
+SegmentCompletionProtocol.Request.Params requestParams = new 
SegmentCompletionProtocol.Request.Params();
+

[GitHub] [incubator-pinot] mcvsubbu commented on a change in pull request #3877: Pinot controller side change to enhance LLC segment metadata upload.

2019-03-12 Thread GitBox
mcvsubbu commented on a change in pull request #3877: Pinot controller side 
change to enhance LLC segment metadata upload.
URL: https://github.com/apache/incubator-pinot/pull/3877#discussion_r264890953
 
 

 ##
 File path: 
pinot-controller/src/main/java/org/apache/pinot/controller/api/resources/LLCSegmentCompletionHandlers.java
 ##
 @@ -182,33 +195,46 @@ public String 
segmentCommitStart(@QueryParam(SegmentCompletionProtocol.PARAM_INS
   @Path(SegmentCompletionProtocol.MSG_TYPE_COMMIT_END)
   @Produces(MediaType.APPLICATION_JSON)
   public String 
segmentCommitEnd(@QueryParam(SegmentCompletionProtocol.PARAM_INSTANCE_ID) 
String instanceId,
-  @QueryParam(SegmentCompletionProtocol.PARAM_SEGMENT_NAME) String 
segmentName,
-  @QueryParam(SegmentCompletionProtocol.PARAM_SEGMENT_LOCATION) String 
segmentLocation,
-  @QueryParam(SegmentCompletionProtocol.PARAM_OFFSET) long offset,
-  @QueryParam(SegmentCompletionProtocol.PARAM_MEMORY_USED_BYTES) long 
memoryUsedBytes,
-  @QueryParam(SegmentCompletionProtocol.PARAM_BUILD_TIME_MILLIS) long 
buildTimeMillis,
-  @QueryParam(SegmentCompletionProtocol.PARAM_WAIT_TIME_MILLIS) long 
waitTimeMillis,
-  @QueryParam(SegmentCompletionProtocol.PARAM_ROW_COUNT) int numRows,
-  @QueryParam(SegmentCompletionProtocol.PARAM_SEGMENT_SIZE_BYTES) long 
segmentSizeBytes) {
+ 
@QueryParam(SegmentCompletionProtocol.PARAM_SEGMENT_NAME) String segmentName,
+ 
@QueryParam(SegmentCompletionProtocol.PARAM_SEGMENT_LOCATION) String 
segmentLocation,
+ 
@QueryParam(SegmentCompletionProtocol.PARAM_OFFSET) long offset,
+ 
@QueryParam(SegmentCompletionProtocol.PARAM_MEMORY_USED_BYTES) long 
memoryUsedBytes,
+ 
@QueryParam(SegmentCompletionProtocol.PARAM_BUILD_TIME_MILLIS) long 
buildTimeMillis,
+ 
@QueryParam(SegmentCompletionProtocol.PARAM_WAIT_TIME_MILLIS) long 
waitTimeMillis,
+ 
@QueryParam(SegmentCompletionProtocol.PARAM_ROW_COUNT) int numRows,
+ 
@QueryParam(SegmentCompletionProtocol.PARAM_SEGMENT_SIZE_BYTES) long 
segmentSizeBytes) {
 if (instanceId == null || segmentName == null || offset == -1 || 
segmentLocation == null) {
   LOGGER.error("Invalid call: offset={}, segmentName={}, instanceId={}, 
segmentLocation={}", offset, segmentName,
-  instanceId, segmentLocation);
+  instanceId, segmentLocation);
   // TODO: memoryUsedInBytes = 0 if not present in params. Add validation 
when we start using it
   return SegmentCompletionProtocol.RESP_FAILED.toJsonString();
 }
 
+SegmentMetadataImpl segmentMetadata;
+try {
+  segmentMetadata = extractMetadataFromSegmentFile(segmentName, new 
URI(segmentLocation));
+} catch (URISyntaxException e) {
+  LOGGER.error("Invalid segment location: ", segmentLocation);
 
 Review comment:
   Please include segment name in this log. The idea is that we should be able 
to grep a segment name in the logs and find out what happened to it.


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services

-
To unsubscribe, e-mail: commits-unsubscr...@pinot.apache.org
For additional commands, e-mail: commits-h...@pinot.apache.org



[GitHub] [incubator-pinot] mcvsubbu commented on a change in pull request #3877: Pinot controller side change to enhance LLC segment metadata upload.

2019-03-12 Thread GitBox
mcvsubbu commented on a change in pull request #3877: Pinot controller side 
change to enhance LLC segment metadata upload.
URL: https://github.com/apache/incubator-pinot/pull/3877#discussion_r264883712
 
 

 ##
 File path: 
pinot-controller/src/main/java/org/apache/pinot/controller/api/resources/LLCSegmentCompletionHandlers.java
 ##
 @@ -19,15 +19,21 @@
 package org.apache.pinot.controller.api.resources;
 
 import com.google.common.annotations.VisibleForTesting;
+
+
 
 Review comment:
   remove extra newine


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services

-
To unsubscribe, e-mail: commits-unsubscr...@pinot.apache.org
For additional commands, e-mail: commits-h...@pinot.apache.org



[GitHub] [incubator-pinot] mcvsubbu commented on a change in pull request #3877: Pinot controller side change to enhance LLC segment metadata upload.

2019-03-12 Thread GitBox
mcvsubbu commented on a change in pull request #3877: Pinot controller side 
change to enhance LLC segment metadata upload.
URL: https://github.com/apache/incubator-pinot/pull/3877#discussion_r264890031
 
 

 ##
 File path: 
pinot-controller/src/main/java/org/apache/pinot/controller/api/resources/LLCSegmentCompletionHandlers.java
 ##
 @@ -219,30 +245,61 @@ public String 
segmentCommitEnd(@QueryParam(SegmentCompletionProtocol.PARAM_INSTA
   @Consumes(MediaType.MULTIPART_FORM_DATA)
   @Produces(MediaType.APPLICATION_JSON)
   public String 
segmentCommit(@QueryParam(SegmentCompletionProtocol.PARAM_INSTANCE_ID) String 
instanceId,
-  @QueryParam(SegmentCompletionProtocol.PARAM_SEGMENT_NAME) String 
segmentName,
-  @QueryParam(SegmentCompletionProtocol.PARAM_OFFSET) long offset,
-  @QueryParam(SegmentCompletionProtocol.PARAM_MEMORY_USED_BYTES) long 
memoryUsedBytes,
-  @QueryParam(SegmentCompletionProtocol.PARAM_BUILD_TIME_MILLIS) long 
buildTimeMillis,
-  @QueryParam(SegmentCompletionProtocol.PARAM_WAIT_TIME_MILLIS) long 
waitTimeMillis,
-  @QueryParam(SegmentCompletionProtocol.PARAM_SEGMENT_SIZE_BYTES) long 
segmentSizeBytes,
-  @QueryParam(SegmentCompletionProtocol.PARAM_ROW_COUNT) int numRows, 
FormDataMultiPart multiPart) {
+  
@QueryParam(SegmentCompletionProtocol.PARAM_SEGMENT_NAME) String segmentName,
 
 Review comment:
   please use the coding guidelines as in 
https://pinot.readthedocs.io/en/latest/dev_env.html


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services

-
To unsubscribe, e-mail: commits-unsubscr...@pinot.apache.org
For additional commands, e-mail: commits-h...@pinot.apache.org



[GitHub] [incubator-pinot] mcvsubbu commented on a change in pull request #3877: Pinot controller side change to enhance LLC segment metadata upload.

2019-03-12 Thread GitBox
mcvsubbu commented on a change in pull request #3877: Pinot controller side 
change to enhance LLC segment metadata upload.
URL: https://github.com/apache/incubator-pinot/pull/3877#discussion_r264893761
 
 

 ##
 File path: 
pinot-controller/src/main/java/org/apache/pinot/controller/api/resources/LLCSegmentCompletionHandlers.java
 ##
 @@ -360,4 +546,47 @@ private String uploadSegment(FormDataMultiPart multiPart, 
String instanceId, Str
   multiPart.cleanup();
 }
   }
+
+  private void localSegmentFileToPinotFSFinalLocation(FileUploadPathProvider 
provider, File localTmpFile,
+  String segmentName, 
String instanceId)
+  throws Exception {
+final String rawTableName = new LLCSegmentName(segmentName).getTableName();
+URI segmentFileURI = ControllerConf.getUriFromPath(StringUtil.join("/", 
provider.getBaseDataDirURI().toString(),
+
rawTableName, segmentName));
+PinotFS pinotFS = 
PinotFSFactory.create(provider.getBaseDataDirURI().getScheme());
+// Multiple threads can reach this point at the same time, if the 
following scenario happens
+// The server that was asked to commit did so very slowly (due to network 
speeds). Meanwhile the FSM in
+// SegmentCompletionManager timed out, and allowed another server to 
commit, which did so very quickly (somehow
+// the network speeds changed). The second server made it through the FSM 
and reached this point.
+// The synchronization below takes care that exactly one file gets moved 
in place.
+// There are still corner conditions that are not handled correctly. For 
example,
+// 1. What if the offset of the faster server was different?
+// 2. We know that only the faster server will get to complete the COMMIT 
call successfully. But it is possible
+//that the race to this statement is won by the slower server, and so 
the real segment that is in there is that
+//of the slower server.
+// In order to overcome controller restarts after the segment is renamed, 
but before it is committed, we DO need to
+// check for existing segment file and remove it. So, the block cannot be 
removed altogether.
+// For now, we live with these corner cases. Once we have split-commit 
enabled and working, this code will no longer
+// be used.
+synchronized (SegmentCompletionManager.getInstance()) {
+  if (pinotFS.exists(segmentFileURI)) {
+LOGGER.warn("Segment file {} exists. Replacing with upload from {}", 
segmentFileURI.toString(), instanceId);
+pinotFS.delete(segmentFileURI, true);
+  }
+  pinotFS.copyFromLocalFile(localTmpFile, segmentFileURI);
+}
+  }
+
+  private URI localSegementFileToPinotFsTmpLocation(FileUploadPathProvider 
provider, File localTmpFile,
+String segmentName) throws 
Exception {
+final String rawTableName = new LLCSegmentName(segmentName).getTableName();
+// We only clean up tmp segment file under table dir, so don't create any 
sub-dir under table dir.
+// See PinotLLCRealtimeSegmentManager.commitSegmentFile().
 
 Review comment:
   Please include the TODO here regarding moving the tmpfile logic to 
SegmentCompletionUtils


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services

-
To unsubscribe, e-mail: commits-unsubscr...@pinot.apache.org
For additional commands, e-mail: commits-h...@pinot.apache.org



[GitHub] [incubator-pinot] mcvsubbu commented on a change in pull request #3877: Pinot controller side change to enhance LLC segment metadata upload.

2019-03-08 Thread GitBox
mcvsubbu commented on a change in pull request #3877: Pinot controller side 
change to enhance LLC segment metadata upload.
URL: https://github.com/apache/incubator-pinot/pull/3877#discussion_r263956908
 
 

 ##
 File path: 
pinot-controller/src/main/java/org/apache/pinot/controller/api/resources/LLCSegmentCompletionHandlers.java
 ##
 @@ -276,10 +317,182 @@ public String 
segmentUpload(@QueryParam(SegmentCompletionProtocol.PARAM_INSTANCE
 return response;
   }
 
+  @POST
+  @Path(SegmentCompletionProtocol.MSG_TYPE_COMMIT_END_METADATA)
+  @Produces(MediaType.APPLICATION_JSON)
+  @Consumes(MediaType.MULTIPART_FORM_DATA)
+  public String 
segmentCommitEndWithMetadata(@QueryParam(SegmentCompletionProtocol.PARAM_INSTANCE_ID)
 String instanceId,
+ 
@QueryParam(SegmentCompletionProtocol.PARAM_SEGMENT_NAME) String segmentName,
+ 
@QueryParam(SegmentCompletionProtocol.PARAM_SEGMENT_LOCATION) String 
segmentLocation,
+ 
@QueryParam(SegmentCompletionProtocol.PARAM_OFFSET) long offset,
+ 
@QueryParam(SegmentCompletionProtocol.PARAM_MEMORY_USED_BYTES) long 
memoryUsedBytes,
+ 
@QueryParam(SegmentCompletionProtocol.PARAM_BUILD_TIME_MILLIS) long 
buildTimeMillis,
+ 
@QueryParam(SegmentCompletionProtocol.PARAM_WAIT_TIME_MILLIS) long 
waitTimeMillis,
+ 
@QueryParam(SegmentCompletionProtocol.PARAM_ROW_COUNT) int numRows,
+ 
@QueryParam(SegmentCompletionProtocol.PARAM_SEGMENT_SIZE_BYTES) long 
segmentSizeBytes,
+ FormDataMultiPart metadataFiles) {
+if (instanceId == null || segmentName == null || offset == -1 || 
segmentLocation == null || metadataFiles == null) {
+  LOGGER.error("Invalid call: offset={}, segmentName={}, instanceId={}, 
segmentLocation={}", offset, segmentName,
+  instanceId, segmentLocation);
+  // TODO: memoryUsedInBytes = 0 if not present in params. Add validation 
when we start using it
+  return SegmentCompletionProtocol.RESP_FAILED.toJsonString();
+}
+
+SegmentCompletionProtocol.Request.Params requestParams = new 
SegmentCompletionProtocol.Request.Params();
+
requestParams.withInstanceId(instanceId).withSegmentName(segmentName).withOffset(offset)
+
.withSegmentLocation(segmentLocation).withSegmentSizeBytes(segmentSizeBytes)
+
.withBuildTimeMillis(buildTimeMillis).withWaitTimeMillis(waitTimeMillis).withNumRows(numRows)
+.withMemoryUsedBytes(memoryUsedBytes);
+LOGGER.info("Processing segmentCommitEnd:{}", requestParams.toString());
+
+
+final boolean isSuccess = true;
+final boolean isSplitCommit = true;
+SegmentMetadataImpl segmentMetadata = 
extractMetadataFromInput(metadataFiles, segmentName);
+// If it fails to extract metadata from the input form, return failure.
+if (segmentMetadata == null) {
+  LOGGER.warn("Segment metadata extraction failure for segment {}", 
segmentName);
+  return SegmentCompletionProtocol.RESP_FAILED.toJsonString();
+}
+SegmentCompletionProtocol.Response response =
+
SegmentCompletionManager.getInstance().segmentCommitEnd(requestParams, 
isSuccess, isSplitCommit,
+
CommittingSegmentDescriptor.fromSegmentCompletionReqParamsAndMetadata(requestParams,
 segmentMetadata));
+final String responseStr = response.toJsonString();
+LOGGER.info("Response to segmentCommitEnd:{}", responseStr);
+return responseStr;
+  }
+
+  /**
+   * Extract and return the segment metadata from the two input form data 
files (metadata file and creation meta).
+   * Return null if any of the two files is missing or there is exception 
during parsing and extraction.
+   *
+   */
+  private SegmentMetadataImpl extractMetadataFromInput(FormDataMultiPart 
metadataFiles, String segmentNameStr) {
+String tempMetadataDirStr = StringUtil.join("/", 
_controllerConf.getLocalTempDir(), segmentNameStr + METADATA_TEMP_DIR_SUFFIX);
+File tempMetadataDir = new File(tempMetadataDirStr);
+
+try {
+  Preconditions.checkState(tempMetadataDir.mkdirs(), "Failed to create 
directory: %s", tempMetadataDirStr);
+  // Extract metadata.properties from the metadataFiles.
+  if (!extractMetadataFromInputField(metadataFiles, tempMetadataDirStr, 
V1Constants.MetadataKeys.METADATA_FILE_NAME)) {
+return null;
+  }
+  // Extract creation.meta from the metadataFiles.
+  if (!extractMetadataFromInputField(metadataFiles, tempMetadataDirStr, 
V1Constants.SEGMENT_CREATION_META)) {
+return null;
+  }
+  // Load segment metadata
+  return new SegmentMetadataImpl(tempMetadataDir);
+} catch (Exception e) {
+  

[GitHub] [incubator-pinot] mcvsubbu commented on a change in pull request #3877: Pinot controller side change to enhance LLC segment metadata upload.

2019-03-08 Thread GitBox
mcvsubbu commented on a change in pull request #3877: Pinot controller side 
change to enhance LLC segment metadata upload.
URL: https://github.com/apache/incubator-pinot/pull/3877#discussion_r263949161
 
 

 ##
 File path: 
pinot-controller/src/main/java/org/apache/pinot/controller/api/resources/LLCSegmentCompletionHandlers.java
 ##
 @@ -276,10 +317,182 @@ public String 
segmentUpload(@QueryParam(SegmentCompletionProtocol.PARAM_INSTANCE
 return response;
   }
 
+  @POST
+  @Path(SegmentCompletionProtocol.MSG_TYPE_COMMIT_END_METADATA)
+  @Produces(MediaType.APPLICATION_JSON)
+  @Consumes(MediaType.MULTIPART_FORM_DATA)
+  public String 
segmentCommitEndWithMetadata(@QueryParam(SegmentCompletionProtocol.PARAM_INSTANCE_ID)
 String instanceId,
+ 
@QueryParam(SegmentCompletionProtocol.PARAM_SEGMENT_NAME) String segmentName,
+ 
@QueryParam(SegmentCompletionProtocol.PARAM_SEGMENT_LOCATION) String 
segmentLocation,
+ 
@QueryParam(SegmentCompletionProtocol.PARAM_OFFSET) long offset,
+ 
@QueryParam(SegmentCompletionProtocol.PARAM_MEMORY_USED_BYTES) long 
memoryUsedBytes,
+ 
@QueryParam(SegmentCompletionProtocol.PARAM_BUILD_TIME_MILLIS) long 
buildTimeMillis,
+ 
@QueryParam(SegmentCompletionProtocol.PARAM_WAIT_TIME_MILLIS) long 
waitTimeMillis,
+ 
@QueryParam(SegmentCompletionProtocol.PARAM_ROW_COUNT) int numRows,
+ 
@QueryParam(SegmentCompletionProtocol.PARAM_SEGMENT_SIZE_BYTES) long 
segmentSizeBytes,
+ FormDataMultiPart metadataFiles) {
+if (instanceId == null || segmentName == null || offset == -1 || 
segmentLocation == null || metadataFiles == null) {
+  LOGGER.error("Invalid call: offset={}, segmentName={}, instanceId={}, 
segmentLocation={}", offset, segmentName,
+  instanceId, segmentLocation);
+  // TODO: memoryUsedInBytes = 0 if not present in params. Add validation 
when we start using it
+  return SegmentCompletionProtocol.RESP_FAILED.toJsonString();
+}
+
+SegmentCompletionProtocol.Request.Params requestParams = new 
SegmentCompletionProtocol.Request.Params();
+
requestParams.withInstanceId(instanceId).withSegmentName(segmentName).withOffset(offset)
+
.withSegmentLocation(segmentLocation).withSegmentSizeBytes(segmentSizeBytes)
+
.withBuildTimeMillis(buildTimeMillis).withWaitTimeMillis(waitTimeMillis).withNumRows(numRows)
+.withMemoryUsedBytes(memoryUsedBytes);
+LOGGER.info("Processing segmentCommitEnd:{}", requestParams.toString());
+
+
+final boolean isSuccess = true;
+final boolean isSplitCommit = true;
+SegmentMetadataImpl segmentMetadata = 
extractMetadataFromInput(metadataFiles, segmentName);
+// If it fails to extract metadata from the input form, return failure.
+if (segmentMetadata == null) {
+  LOGGER.warn("Segment metadata extraction failure for segment {}", 
segmentName);
+  return SegmentCompletionProtocol.RESP_FAILED.toJsonString();
+}
+SegmentCompletionProtocol.Response response =
+
SegmentCompletionManager.getInstance().segmentCommitEnd(requestParams, 
isSuccess, isSplitCommit,
+
CommittingSegmentDescriptor.fromSegmentCompletionReqParamsAndMetadata(requestParams,
 segmentMetadata));
+final String responseStr = response.toJsonString();
+LOGGER.info("Response to segmentCommitEnd:{}", responseStr);
+return responseStr;
+  }
+
+  /**
+   * Extract and return the segment metadata from the two input form data 
files (metadata file and creation meta).
+   * Return null if any of the two files is missing or there is exception 
during parsing and extraction.
+   *
+   */
+  private SegmentMetadataImpl extractMetadataFromInput(FormDataMultiPart 
metadataFiles, String segmentNameStr) {
+String tempMetadataDirStr = StringUtil.join("/", 
_controllerConf.getLocalTempDir(), segmentNameStr + METADATA_TEMP_DIR_SUFFIX);
 
 Review comment:
   Best to add current time as well as tmp_dir_suffix. Although with rare race 
and timing conditions, it is possible that multiple servers are executing this 
code path. We don't have lock here, but we do lock inside the state machine to 
protect ourselves that only one of them can commit.


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] [incubator-pinot] mcvsubbu commented on a change in pull request #3877: Pinot controller side change to enhance LLC segment metadata upload.

2019-03-08 Thread GitBox
mcvsubbu commented on a change in pull request #3877: Pinot controller side 
change to enhance LLC segment metadata upload.
URL: https://github.com/apache/incubator-pinot/pull/3877#discussion_r263960805
 
 

 ##
 File path: 
pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/LLCRealtimeClusterSplitCommitIntegrationTest.java
 ##
 @@ -41,4 +48,57 @@ public void startServer() {
 
serverConfig.setProperty(CommonConstants.Server.CONFIG_OF_ENABLE_SPLIT_COMMIT, 
true);
 startServer(serverConfig);
   }
+  // The goals of the following tests to show the diff between 
org.apache.commons.httpclient.URI and java.net.URI.
 
 Review comment:
   Thanks so much for demonstrating this in an automated fashion. Yes, you can 
remove these tests in your next iteration.


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services

-
To unsubscribe, e-mail: commits-unsubscr...@pinot.apache.org
For additional commands, e-mail: commits-h...@pinot.apache.org



[GitHub] [incubator-pinot] mcvsubbu commented on a change in pull request #3877: Pinot controller side change to enhance LLC segment metadata upload.

2019-03-08 Thread GitBox
mcvsubbu commented on a change in pull request #3877: Pinot controller side 
change to enhance LLC segment metadata upload.
URL: https://github.com/apache/incubator-pinot/pull/3877#discussion_r263947987
 
 

 ##
 File path: 
pinot-controller/src/main/java/org/apache/pinot/controller/api/resources/LLCSegmentCompletionHandlers.java
 ##
 @@ -276,10 +317,182 @@ public String 
segmentUpload(@QueryParam(SegmentCompletionProtocol.PARAM_INSTANCE
 return response;
   }
 
+  @POST
+  @Path(SegmentCompletionProtocol.MSG_TYPE_COMMIT_END_METADATA)
+  @Produces(MediaType.APPLICATION_JSON)
+  @Consumes(MediaType.MULTIPART_FORM_DATA)
+  public String 
segmentCommitEndWithMetadata(@QueryParam(SegmentCompletionProtocol.PARAM_INSTANCE_ID)
 String instanceId,
+ 
@QueryParam(SegmentCompletionProtocol.PARAM_SEGMENT_NAME) String segmentName,
+ 
@QueryParam(SegmentCompletionProtocol.PARAM_SEGMENT_LOCATION) String 
segmentLocation,
+ 
@QueryParam(SegmentCompletionProtocol.PARAM_OFFSET) long offset,
+ 
@QueryParam(SegmentCompletionProtocol.PARAM_MEMORY_USED_BYTES) long 
memoryUsedBytes,
+ 
@QueryParam(SegmentCompletionProtocol.PARAM_BUILD_TIME_MILLIS) long 
buildTimeMillis,
+ 
@QueryParam(SegmentCompletionProtocol.PARAM_WAIT_TIME_MILLIS) long 
waitTimeMillis,
+ 
@QueryParam(SegmentCompletionProtocol.PARAM_ROW_COUNT) int numRows,
+ 
@QueryParam(SegmentCompletionProtocol.PARAM_SEGMENT_SIZE_BYTES) long 
segmentSizeBytes,
+ FormDataMultiPart metadataFiles) {
+if (instanceId == null || segmentName == null || offset == -1 || 
segmentLocation == null || metadataFiles == null) {
+  LOGGER.error("Invalid call: offset={}, segmentName={}, instanceId={}, 
segmentLocation={}", offset, segmentName,
+  instanceId, segmentLocation);
+  // TODO: memoryUsedInBytes = 0 if not present in params. Add validation 
when we start using it
+  return SegmentCompletionProtocol.RESP_FAILED.toJsonString();
+}
+
+SegmentCompletionProtocol.Request.Params requestParams = new 
SegmentCompletionProtocol.Request.Params();
+
requestParams.withInstanceId(instanceId).withSegmentName(segmentName).withOffset(offset)
+
.withSegmentLocation(segmentLocation).withSegmentSizeBytes(segmentSizeBytes)
+
.withBuildTimeMillis(buildTimeMillis).withWaitTimeMillis(waitTimeMillis).withNumRows(numRows)
+.withMemoryUsedBytes(memoryUsedBytes);
+LOGGER.info("Processing segmentCommitEnd:{}", requestParams.toString());
+
+
+final boolean isSuccess = true;
+final boolean isSplitCommit = true;
+SegmentMetadataImpl segmentMetadata = 
extractMetadataFromInput(metadataFiles, segmentName);
+// If it fails to extract metadata from the input form, return failure.
+if (segmentMetadata == null) {
+  LOGGER.warn("Segment metadata extraction failure for segment {}", 
segmentName);
 
 Review comment:
   should be an error log, since we have detected a mis-behaving server.


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services

-
To unsubscribe, e-mail: commits-unsubscr...@pinot.apache.org
For additional commands, e-mail: commits-h...@pinot.apache.org



[GitHub] [incubator-pinot] mcvsubbu commented on a change in pull request #3877: Pinot controller side change to enhance LLC segment metadata upload.

2019-03-08 Thread GitBox
mcvsubbu commented on a change in pull request #3877: Pinot controller side 
change to enhance LLC segment metadata upload.
URL: https://github.com/apache/incubator-pinot/pull/3877#discussion_r263957977
 
 

 ##
 File path: 
pinot-controller/src/test/java/org/apache/pinot/controller/helix/core/realtime/PinotLLCRealtimeSegmentManagerTest.java
 ##
 @@ -1392,11 +1415,6 @@ protected boolean 
createNewSegmentMetadataZNRecord(TableConfig realtimeTableConf
   partitionAssignment, committingSegmentDescriptor, isNewTableSetup);
 }
 
-@Override
 
 Review comment:
   thanks for removing this :-)


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services

-
To unsubscribe, e-mail: commits-unsubscr...@pinot.apache.org
For additional commands, e-mail: commits-h...@pinot.apache.org



[GitHub] [incubator-pinot] mcvsubbu commented on a change in pull request #3877: Pinot controller side change to enhance LLC segment metadata upload.

2019-03-08 Thread GitBox
mcvsubbu commented on a change in pull request #3877: Pinot controller side 
change to enhance LLC segment metadata upload.
URL: https://github.com/apache/incubator-pinot/pull/3877#discussion_r263531788
 
 

 ##
 File path: 
pinot-controller/src/main/java/org/apache/pinot/controller/api/resources/LLCSegmentCompletionHandlers.java
 ##
 @@ -260,13 +291,13 @@ public String 
segmentUpload(@QueryParam(SegmentCompletionProtocol.PARAM_INSTANCE
 
requestParams.withInstanceId(instanceId).withSegmentName(segmentName).withOffset(offset);
 LOGGER.info("Processing segmentUpload:{}", requestParams.toString());
 
-final String segmentLocation = uploadSegment(multiPart, instanceId, 
segmentName, true);
-if (segmentLocation == null) {
+final ImmutablePair uploadResults = 
uploadSegment(multiPart, instanceId, segmentName, true);
 
 Review comment:
   Agreed


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services

-
To unsubscribe, e-mail: commits-unsubscr...@pinot.apache.org
For additional commands, e-mail: commits-h...@pinot.apache.org



[GitHub] [incubator-pinot] mcvsubbu commented on a change in pull request #3877: Pinot controller side change to enhance LLC segment metadata upload.

2019-03-08 Thread GitBox
mcvsubbu commented on a change in pull request #3877: Pinot controller side 
change to enhance LLC segment metadata upload.
URL: https://github.com/apache/incubator-pinot/pull/3877#discussion_r263532019
 
 

 ##
 File path: 
pinot-controller/src/main/java/org/apache/pinot/controller/api/resources/LLCSegmentCompletionHandlers.java
 ##
 @@ -276,10 +307,193 @@ public String 
segmentUpload(@QueryParam(SegmentCompletionProtocol.PARAM_INSTANCE
 return response;
   }
 
+  @POST
+  @Path(SegmentCompletionProtocol.MSG_TYPE_COMMIT_END_METADATA)
+  @Produces(MediaType.APPLICATION_JSON)
+  @Consumes(MediaType.MULTIPART_FORM_DATA)
+  public String 
segmentCommitEndWithMetadata(@QueryParam(SegmentCompletionProtocol.PARAM_INSTANCE_ID)
 String instanceId,
+ 
@QueryParam(SegmentCompletionProtocol.PARAM_SEGMENT_NAME) String segmentName,
+ 
@QueryParam(SegmentCompletionProtocol.PARAM_SEGMENT_LOCATION) String 
segmentLocation,
+ 
@QueryParam(SegmentCompletionProtocol.PARAM_OFFSET) long offset,
+ 
@QueryParam(SegmentCompletionProtocol.PARAM_MEMORY_USED_BYTES) long 
memoryUsedBytes,
+ 
@QueryParam(SegmentCompletionProtocol.PARAM_BUILD_TIME_MILLIS) long 
buildTimeMillis,
+ 
@QueryParam(SegmentCompletionProtocol.PARAM_WAIT_TIME_MILLIS) long 
waitTimeMillis,
+ 
@QueryParam(SegmentCompletionProtocol.PARAM_ROW_COUNT) int numRows,
+ 
@QueryParam(SegmentCompletionProtocol.PARAM_SEGMENT_SIZE_BYTES) long 
segmentSizeBytes,
+ FormDataMultiPart metadataFiles) {
+if (instanceId == null || segmentName == null || offset == -1 || 
segmentLocation == null || metadataFiles == null) {
+  LOGGER.error("Invalid call: offset={}, segmentName={}, instanceId={}, 
segmentLocation={}", offset, segmentName,
+  instanceId, segmentLocation);
+  // TODO: memoryUsedInBytes = 0 if not present in params. Add validation 
when we start using it
+  return SegmentCompletionProtocol.RESP_FAILED.toJsonString();
+}
+
+SegmentCompletionProtocol.Request.Params requestParams = new 
SegmentCompletionProtocol.Request.Params();
+
requestParams.withInstanceId(instanceId).withSegmentName(segmentName).withOffset(offset)
+
.withSegmentLocation(segmentLocation).withSegmentSizeBytes(segmentSizeBytes)
+
.withBuildTimeMillis(buildTimeMillis).withWaitTimeMillis(waitTimeMillis).withNumRows(numRows)
+.withMemoryUsedBytes(memoryUsedBytes);
+LOGGER.info("Processing segmentCommitEnd:{}", requestParams.toString());
+
+
+final boolean isSuccess = true;
+final boolean isSplitCommit = true;
+SegmentMetadataImpl segmentMetadata = 
extractMetadataFromInput(metadataFiles, segmentName);
+// If it fails to extract metadata from the input form, try to download 
the segment and extract it from the segment.
+if (segmentMetadata == null) {
+  LOGGER.info("Failed to extract segment metadata for {} from input form, 
fallback to use the segment file.",
+  segmentName);
+  try {
+segmentMetadata = extractMetadataFromSegmentFile(segmentName, new 
URI(segmentLocation));
 
 Review comment:
   I am saying we should not be calling extractMetadataFromSegmentFile() at 
all. Metadata better be in the POST body


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services

-
To unsubscribe, e-mail: commits-unsubscr...@pinot.apache.org
For additional commands, e-mail: commits-h...@pinot.apache.org



[GitHub] [incubator-pinot] mcvsubbu commented on a change in pull request #3877: Pinot controller side change to enhance LLC segment metadata upload.

2019-03-07 Thread GitBox
mcvsubbu commented on a change in pull request #3877: Pinot controller side 
change to enhance LLC segment metadata upload.
URL: https://github.com/apache/incubator-pinot/pull/3877#discussion_r263464348
 
 

 ##
 File path: 
pinot-controller/src/main/java/org/apache/pinot/controller/api/resources/LLCSegmentCompletionHandlers.java
 ##
 @@ -260,13 +291,13 @@ public String 
segmentUpload(@QueryParam(SegmentCompletionProtocol.PARAM_INSTANCE
 
requestParams.withInstanceId(instanceId).withSegmentName(segmentName).withOffset(offset);
 LOGGER.info("Processing segmentUpload:{}", requestParams.toString());
 
-final String segmentLocation = uploadSegment(multiPart, instanceId, 
segmentName, true);
-if (segmentLocation == null) {
+final ImmutablePair uploadResults = 
uploadSegment(multiPart, instanceId, segmentName, true);
 
 Review comment:
   we should not be extracting metadata in this handler, since we are doing 
that when the commit end happens. 


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services

-
To unsubscribe, e-mail: commits-unsubscr...@pinot.apache.org
For additional commands, e-mail: commits-h...@pinot.apache.org



[GitHub] [incubator-pinot] mcvsubbu commented on a change in pull request #3877: Pinot controller side change to enhance LLC segment metadata upload.

2019-03-07 Thread GitBox
mcvsubbu commented on a change in pull request #3877: Pinot controller side 
change to enhance LLC segment metadata upload.
URL: https://github.com/apache/incubator-pinot/pull/3877#discussion_r263456770
 
 

 ##
 File path: 
pinot-controller/src/main/java/org/apache/pinot/controller/api/resources/LLCSegmentCompletionHandlers.java
 ##
 @@ -19,11 +19,12 @@
 package org.apache.pinot.controller.api.resources;
 
 import com.google.common.annotations.VisibleForTesting;
-import java.io.File;
-import java.io.FileOutputStream;
-import java.io.IOException;
-import java.io.InputStream;
-import java.io.OutputStream;
+
+import java.io.*;
 
 Review comment:
   Can we have individual imports as needed instead of wild-card? thanks


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services

-
To unsubscribe, e-mail: commits-unsubscr...@pinot.apache.org
For additional commands, e-mail: commits-h...@pinot.apache.org