This is an automated email from the ASF dual-hosted git repository. mcvsubbu pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/incubator-pinot.git
The following commit(s) were added to refs/heads/master by this push: new fe203b5 Pinot server side change to optimize LLC segment completion with direct metadata upload. (#3941) fe203b5 is described below commit fe203b5be2bbbf6464f1c757f07cb3374fa3bf19 Author: Ting Chen <chenb...@gmail.com> AuthorDate: Tue Mar 19 16:35:41 2019 -0700 Pinot server side change to optimize LLC segment completion with direct metadata upload. (#3941) * The Pinot server side change to optimize LLC segment completion protocol by uploading metadata directly. * Minor style and comment fixes. * Fix error handling and style issues. * Add missing header. * Add the config for commit end with metadata into server instance config. Combine integration tests. * Change comments. * Minor comment fix. * Use the metadata files created in the server directly. * Deprecate the old segmentCommitEnd method and change the default to commitEndWithMetadata. * Return null instead of crashing server... * Add comments. * Invert config checks. 
--- .../protocols/SegmentCompletionProtocol.java | 6 +++ .../apache/pinot/common/utils/CommonConstants.java | 1 + .../common/utils/FileUploadDownloadClient.java | 24 +++++++++++ .../manager/config/InstanceDataManagerConfig.java | 2 + .../realtime/LLRealtimeSegmentDataManager.java | 48 +++++++++++++++++++--- .../segment/index/loader/IndexLoadingConfig.java | 4 ++ .../ServerSegmentCompletionProtocolHandler.java | 37 +++++++++++++++++ .../realtime/LLRealtimeSegmentDataManagerTest.java | 4 +- ...CRealtimeClusterSplitCommitIntegrationTest.java | 2 +- .../helix/HelixInstanceDataManagerConfig.java | 7 ++++ 10 files changed, 126 insertions(+), 9 deletions(-) diff --git a/pinot-common/src/main/java/org/apache/pinot/common/protocols/SegmentCompletionProtocol.java b/pinot-common/src/main/java/org/apache/pinot/common/protocols/SegmentCompletionProtocol.java index 60ee1e0..822ad21 100644 --- a/pinot-common/src/main/java/org/apache/pinot/common/protocols/SegmentCompletionProtocol.java +++ b/pinot-common/src/main/java/org/apache/pinot/common/protocols/SegmentCompletionProtocol.java @@ -359,6 +359,12 @@ public class SegmentCompletionProtocol { } } + public static class SegmentCommitEndWithMetadataRequest extends Request { + public SegmentCommitEndWithMetadataRequest(Params params) { + super(params, MSG_TYPE_COMMIT_END_METADATA); + } + } + public static class SegmentStoppedConsuming extends Request { public SegmentStoppedConsuming(Params params) { super(params, MSG_TYPE_STOPPED_CONSUMING); diff --git a/pinot-common/src/main/java/org/apache/pinot/common/utils/CommonConstants.java b/pinot-common/src/main/java/org/apache/pinot/common/utils/CommonConstants.java index b76f2b8..0777ad4 100644 --- a/pinot-common/src/main/java/org/apache/pinot/common/utils/CommonConstants.java +++ b/pinot-common/src/main/java/org/apache/pinot/common/utils/CommonConstants.java @@ -156,6 +156,7 @@ public class CommonConstants { public static final String CONFIG_OF_ENABLE_DEFAULT_COLUMNS = 
"pinot.server.instance.enable.default.columns"; public static final String CONFIG_OF_ENABLE_SHUTDOWN_DELAY = "pinot.server.instance.enable.shutdown.delay"; public static final String CONFIG_OF_ENABLE_SPLIT_COMMIT = "pinot.server.instance.enable.split.commit"; + public static final String CONFIG_OF_ENABLE_COMMIT_END_WITH_METADATA = "pinot.server.instance.enable.commitend.metadata"; public static final String CONFIG_OF_REALTIME_OFFHEAP_ALLOCATION = "pinot.server.instance.realtime.alloc.offheap"; public static final String CONFIG_OF_REALTIME_OFFHEAP_DIRECT_ALLOCATION = "pinot.server.instance.realtime.alloc.offheap.direct"; diff --git a/pinot-common/src/main/java/org/apache/pinot/common/utils/FileUploadDownloadClient.java b/pinot-common/src/main/java/org/apache/pinot/common/utils/FileUploadDownloadClient.java index 4a37dcf..de8c76a 100644 --- a/pinot-common/src/main/java/org/apache/pinot/common/utils/FileUploadDownloadClient.java +++ b/pinot-common/src/main/java/org/apache/pinot/common/utils/FileUploadDownloadClient.java @@ -29,6 +29,7 @@ import java.io.OutputStream; import java.net.URI; import java.net.URISyntaxException; import java.util.List; +import java.util.Map; import javax.annotation.Nullable; import javax.net.ssl.SSLContext; import org.apache.commons.io.IOUtils; @@ -229,6 +230,22 @@ public class FileUploadDownloadClient implements Closeable { parameters, socketTimeoutMs); } + private static HttpUriRequest getUploadSegmentMetadataFilesRequest(URI uri, Map<String, File> metadataFiles, + int segmentUploadRequestTimeoutMs) { + MultipartEntityBuilder multipartEntityBuilder = MultipartEntityBuilder.create(). + setMode(HttpMultipartMode.BROWSER_COMPATIBLE); + for (Map.Entry<String, File> entry : metadataFiles.entrySet()) { + multipartEntityBuilder.addPart(entry.getKey(), getContentBody(entry.getKey(), entry.getValue())); + } + HttpEntity entity = multipartEntityBuilder.build(); + + // Build the POST request. 
+ RequestBuilder requestBuilder = + RequestBuilder.create(HttpPost.METHOD_NAME).setVersion(HttpVersion.HTTP_1_1).setUri(uri).setEntity(entity); + setTimeout(requestBuilder, segmentUploadRequestTimeoutMs); + return requestBuilder.build(); + } + private static HttpUriRequest getSendSegmentUriRequest(URI uri, String downloadUri, @Nullable List<Header> headers, @Nullable List<NameValuePair> parameters, int socketTimeoutMs) { RequestBuilder requestBuilder = RequestBuilder.post(uri).setVersion(HttpVersion.HTTP_1_1) @@ -388,6 +405,13 @@ public class FileUploadDownloadClient implements Closeable { getUploadSegmentMetadataRequest(uri, segmentName, segmentMetadataFile, headers, parameters, socketTimeoutMs)); } + // Upload a set of segment metadata files (e.g., meta.properties and creation.meta) to controllers. + public SimpleHttpResponse uploadSegmentMetadataFiles(URI uri, Map<String, File> metadataFiles, + int segmentUploadRequestTimeoutMs) + throws IOException, HttpErrorStatusException { + return sendRequest(getUploadSegmentMetadataFilesRequest(uri, metadataFiles, segmentUploadRequestTimeoutMs)); + } + /** * Upload segment with segment file. 
* diff --git a/pinot-core/src/main/java/org/apache/pinot/core/data/manager/config/InstanceDataManagerConfig.java b/pinot-core/src/main/java/org/apache/pinot/core/data/manager/config/InstanceDataManagerConfig.java index 2685768..ba1896f 100644 --- a/pinot-core/src/main/java/org/apache/pinot/core/data/manager/config/InstanceDataManagerConfig.java +++ b/pinot-core/src/main/java/org/apache/pinot/core/data/manager/config/InstanceDataManagerConfig.java @@ -45,6 +45,8 @@ public interface InstanceDataManagerConfig { boolean isEnableSplitCommit(); + boolean isEnableSplitCommitEndWithMetadata(); + boolean isRealtimeOffHeapAllocation(); boolean isDirectRealtimeOffheapAllocation(); diff --git a/pinot-core/src/main/java/org/apache/pinot/core/data/manager/realtime/LLRealtimeSegmentDataManager.java b/pinot-core/src/main/java/org/apache/pinot/core/data/manager/realtime/LLRealtimeSegmentDataManager.java index 18b1409..50ee54e 100644 --- a/pinot-core/src/main/java/org/apache/pinot/core/data/manager/realtime/LLRealtimeSegmentDataManager.java +++ b/pinot-core/src/main/java/org/apache/pinot/core/data/manager/realtime/LLRealtimeSegmentDataManager.java @@ -25,7 +25,9 @@ import com.yammer.metrics.core.Meter; import java.io.File; import java.io.IOException; import java.util.ArrayList; +import java.util.HashMap; import java.util.List; +import java.util.Map; import java.util.Set; import java.util.concurrent.Semaphore; import java.util.concurrent.TimeUnit; @@ -67,6 +69,7 @@ import org.apache.pinot.core.realtime.stream.StreamDecoderProvider; import org.apache.pinot.core.realtime.stream.StreamMessageDecoder; import org.apache.pinot.core.realtime.stream.StreamMetadataProvider; import org.apache.pinot.core.realtime.stream.TransientConsumerException; +import org.apache.pinot.core.segment.creator.impl.V1Constants; import org.apache.pinot.core.segment.index.loader.IndexLoadingConfig; import org.apache.pinot.server.realtime.ServerSegmentCompletionProtocolHandler; import org.joda.time.DateTime; @@ 
-130,15 +133,17 @@ public class LLRealtimeSegmentDataManager extends RealtimeSegmentDataManager { protected class SegmentBuildDescriptor { final String _segmentTarFilePath; + final Map<String, File> _metadataFileMap; final long _offset; final long _waitTimeMillis; final long _buildTimeMillis; final String _segmentDirPath; final long _segmentSizeBytes; - SegmentBuildDescriptor(String segmentTarFilePath, long offset, String segmentDirPath, long buildTimeMillis, - long waitTimeMillis, long segmentSizeBytes) { + SegmentBuildDescriptor(String segmentTarFilePath, Map<String, File> metadataFileMap, long offset, + String segmentDirPath, long buildTimeMillis, long waitTimeMillis, long segmentSizeBytes) { _segmentTarFilePath = segmentTarFilePath; + _metadataFileMap = metadataFileMap; _offset = offset; _buildTimeMillis = buildTimeMillis; _waitTimeMillis = waitTimeMillis; @@ -173,6 +178,10 @@ public class LLRealtimeSegmentDataManager extends RealtimeSegmentDataManager { FileUtils.deleteQuietly(new File(_segmentTarFilePath)); } } + + public Map<String, File> getMetadataFiles() { + return _metadataFileMap; + } } private static final long TIME_THRESHOLD_FOR_LOG_MINUTES = 1; @@ -681,11 +690,32 @@ public class LLRealtimeSegmentDataManager extends RealtimeSegmentDataManager { TimeUnit.MILLISECONDS.toSeconds(waitTimeMillis)); if (forCommit) { + File[] segmentfiles = destDir.listFiles(); + if (segmentfiles == null || segmentfiles.length == 0) { + segmentLogger.error("The index dir is empty: {}", destDir); + return null; + } + // segmentfiles[0] is the sub directory with version name (e.g., V3). 
+ File metadataFileName = new File(segmentfiles[0], V1Constants.MetadataKeys.METADATA_FILE_NAME); + if (!metadataFileName.exists()) { + segmentLogger + .error("File does not exist in {} for {}.", destDir, V1Constants.MetadataKeys.METADATA_FILE_NAME); + return null; + } + File creationMetaFile = new File(segmentfiles[0], V1Constants.SEGMENT_CREATION_META); + if (!creationMetaFile.exists()) { + segmentLogger.error("File does not exist in {} for {}.", destDir, V1Constants.SEGMENT_CREATION_META); + return null; + } + + Map<String, File> metadataFiles = new HashMap<>(); + metadataFiles.put(V1Constants.MetadataKeys.METADATA_FILE_NAME, metadataFileName); + metadataFiles.put(V1Constants.SEGMENT_CREATION_META, creationMetaFile); return new SegmentBuildDescriptor(destDir.getAbsolutePath() + TarGzCompressionUtils.TAR_GZ_FILE_EXTENSION, - _currentOffset, null, buildTimeMillis, waitTimeMillis, segmentSizeBytes); + metadataFiles, _currentOffset, null, buildTimeMillis, waitTimeMillis, segmentSizeBytes); } - return new SegmentBuildDescriptor(null, _currentOffset, destDir.getAbsolutePath(), buildTimeMillis, - waitTimeMillis, segmentSizeBytes); + return new SegmentBuildDescriptor(null, null, _currentOffset, destDir.getAbsolutePath(), + buildTimeMillis, waitTimeMillis, segmentSizeBytes); } catch (InterruptedException e) { segmentLogger.error("Interrupted while waiting for semaphore"); return null; @@ -735,7 +765,13 @@ public class LLRealtimeSegmentDataManager extends RealtimeSegmentDataManager { if (_isOffHeap) { params.withMemoryUsedBytes(_memoryManager.getTotalAllocatedBytes()); } - SegmentCompletionProtocol.Response commitEndResponse = _protocolHandler.segmentCommitEnd(params); + SegmentCompletionProtocol.Response commitEndResponse; + if (_indexLoadingConfig.isEnableSplitCommitEndWithMetadata()) { + commitEndResponse = _protocolHandler.segmentCommitEndWithMetadata(params, _segmentBuildDescriptor.getMetadataFiles()); + } else { + commitEndResponse = 
_protocolHandler.segmentCommitEnd(params); + } + if (!commitEndResponse.getStatus().equals(SegmentCompletionProtocol.ControllerResponseStatus.COMMIT_SUCCESS)) { segmentLogger.warn("CommitEnd failed with response {}", commitEndResponse.toJsonString()); return SegmentCompletionProtocol.RESP_FAILED; diff --git a/pinot-core/src/main/java/org/apache/pinot/core/segment/index/loader/IndexLoadingConfig.java b/pinot-core/src/main/java/org/apache/pinot/core/segment/index/loader/IndexLoadingConfig.java index 7e46d0c..04f9beb 100644 --- a/pinot-core/src/main/java/org/apache/pinot/core/segment/index/loader/IndexLoadingConfig.java +++ b/pinot-core/src/main/java/org/apache/pinot/core/segment/index/loader/IndexLoadingConfig.java @@ -57,6 +57,7 @@ public class IndexLoadingConfig { private boolean _enableSplitCommit; private boolean _isRealtimeOffheapAllocation; private boolean _isDirectRealtimeOffheapAllocation; + private boolean _enableSplitCommitEndWithMetadata; public IndexLoadingConfig(@Nonnull InstanceDataManagerConfig instanceDataManagerConfig, @Nonnull TableConfig tableConfig) { @@ -135,6 +136,7 @@ public class IndexLoadingConfig { if (avgMultiValueCount != null) { _realtimeAvgMultiValueCount = Integer.valueOf(avgMultiValueCount); } + _enableSplitCommitEndWithMetadata = instanceDataManagerConfig.isEnableSplitCommitEndWithMetadata(); } /** @@ -222,6 +224,8 @@ public class IndexLoadingConfig { return _enableSplitCommit; } + public boolean isEnableSplitCommitEndWithMetadata() { return _enableSplitCommitEndWithMetadata; } + public boolean isRealtimeOffheapAllocation() { return _isRealtimeOffheapAllocation; } diff --git a/pinot-core/src/main/java/org/apache/pinot/server/realtime/ServerSegmentCompletionProtocolHandler.java b/pinot-core/src/main/java/org/apache/pinot/server/realtime/ServerSegmentCompletionProtocolHandler.java index c411d16..2261c55 100644 --- a/pinot-core/src/main/java/org/apache/pinot/server/realtime/ServerSegmentCompletionProtocolHandler.java +++ 
b/pinot-core/src/main/java/org/apache/pinot/server/realtime/ServerSegmentCompletionProtocolHandler.java @@ -20,6 +20,7 @@ package org.apache.pinot.server.realtime; import java.io.File; import java.net.URI; +import java.util.Map; import javax.net.ssl.SSLContext; import org.apache.commons.configuration.Configuration; import org.apache.pinot.common.metrics.ServerMeter; @@ -95,6 +96,8 @@ public class ServerSegmentCompletionProtocolHandler { return uploadSegment(url, params.getSegmentName(), segmentTarFile); } + // Replaced by segmentCommitEndWithMetadata(). + @Deprecated public SegmentCompletionProtocol.Response segmentCommitEnd(SegmentCompletionProtocol.Request.Params params) { SegmentCompletionProtocol.SegmentCommitEndRequest request = new SegmentCompletionProtocol.SegmentCommitEndRequest(params); @@ -105,6 +108,17 @@ public class ServerSegmentCompletionProtocolHandler { return sendRequest(url); } + public SegmentCompletionProtocol.Response segmentCommitEndWithMetadata( + SegmentCompletionProtocol.Request.Params params, final Map<String, File> metadataFiles) { + SegmentCompletionProtocol.SegmentCommitEndWithMetadataRequest request = + new SegmentCompletionProtocol.SegmentCommitEndWithMetadataRequest(params); + String url = createSegmentCompletionUrl(request); + if (url == null) { + return SegmentCompletionProtocol.RESP_NOT_SENT; + } + return sendCommitEndWithMetadataFiles(url, metadataFiles); + } + public SegmentCompletionProtocol.Response segmentCommit(SegmentCompletionProtocol.Request.Params params, final File segmentTarFile) { SegmentCompletionProtocol.SegmentCommitRequest request = new SegmentCompletionProtocol.SegmentCommitRequest(params); @@ -182,6 +196,29 @@ public class ServerSegmentCompletionProtocolHandler { return response; } + private SegmentCompletionProtocol.Response sendCommitEndWithMetadataFiles(String url, + Map<String, File> metadataFiles) { + SegmentCompletionProtocol.Response response; + try { + String responseStr = _fileUploadDownloadClient + 
.uploadSegmentMetadataFiles(new URI(url), metadataFiles, SEGMENT_UPLOAD_REQUEST_TIMEOUT_MS).getResponse(); + response = SegmentCompletionProtocol.Response.fromJsonString(responseStr); + LOGGER.info("Controller response {} for {}", response.toJsonString(), url); + if (response.getStatus().equals(SegmentCompletionProtocol.ControllerResponseStatus.NOT_LEADER)) { + ControllerLeaderLocator.getInstance().invalidateCachedControllerLeader(); + } + } catch (Exception e) { + // Catch all exceptions, we want the protocol to handle the case assuming the request was never sent. + response = SegmentCompletionProtocol.RESP_NOT_SENT; + LOGGER.error("Could not send request {}", url, e); + // Invalidate controller leader cache, as exception could be because of leader being down (deployment/failure) and hence unable to send {@link SegmentCompletionProtocol.ControllerResponseStatus.NOT_LEADER} + // If cache is not invalidated, we will not recover from exceptions until the controller comes back up + ControllerLeaderLocator.getInstance().invalidateCachedControllerLeader(); + } + raiseSegmentCompletionProtocolResponseMetric(response); + return response; + } + private SegmentCompletionProtocol.Response uploadSegment(String url, final String segmentName, final File segmentTarFile) { SegmentCompletionProtocol.Response response; diff --git a/pinot-core/src/test/java/org/apache/pinot/core/data/manager/realtime/LLRealtimeSegmentDataManagerTest.java b/pinot-core/src/test/java/org/apache/pinot/core/data/manager/realtime/LLRealtimeSegmentDataManagerTest.java index bf584d1..e87a85a 100644 --- a/pinot-core/src/test/java/org/apache/pinot/core/data/manager/realtime/LLRealtimeSegmentDataManagerTest.java +++ b/pinot-core/src/test/java/org/apache/pinot/core/data/manager/realtime/LLRealtimeSegmentDataManagerTest.java @@ -766,7 +766,7 @@ public class LLRealtimeSegmentDataManagerTest { return null; } if (!forCommit) { - return new SegmentBuildDescriptor(null, getCurrentOffset(), _segmentDir, 0, 0, -1); + 
return new SegmentBuildDescriptor(null, null, getCurrentOffset(), _segmentDir, 0, 0, -1); } final String segTarFileName = _segmentDir + "/" + "segmentFile"; File segmentTgzFile = new File(segTarFileName); @@ -775,7 +775,7 @@ public class LLRealtimeSegmentDataManagerTest { } catch (IOException e) { Assert.fail("Could not create file " + segmentTgzFile); } - return new SegmentBuildDescriptor(segTarFileName, getCurrentOffset(), null, 0, 0, -1); + return new SegmentBuildDescriptor(segTarFileName, null, getCurrentOffset(), null, 0, 0, -1); } @Override diff --git a/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/LLCRealtimeClusterSplitCommitIntegrationTest.java b/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/LLCRealtimeClusterSplitCommitIntegrationTest.java index fca5d99..454cdb4 100644 --- a/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/LLCRealtimeClusterSplitCommitIntegrationTest.java +++ b/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/LLCRealtimeClusterSplitCommitIntegrationTest.java @@ -27,7 +27,6 @@ import org.apache.pinot.controller.ControllerConf; * Integration test that extends LLCRealtimeClusterIntegrationTest but with split commit enabled. 
*/ public class LLCRealtimeClusterSplitCommitIntegrationTest extends LLCRealtimeClusterIntegrationTest { - @Override public void startController() { ControllerConf controllerConfig = getDefaultControllerConfiguration(); @@ -39,6 +38,7 @@ public class LLCRealtimeClusterSplitCommitIntegrationTest extends LLCRealtimeClu public void startServer() { Configuration serverConfig = getDefaultServerConfiguration(); serverConfig.setProperty(CommonConstants.Server.CONFIG_OF_ENABLE_SPLIT_COMMIT, true); + serverConfig.setProperty(CommonConstants.Server.CONFIG_OF_ENABLE_COMMIT_END_WITH_METADATA, true); startServer(serverConfig); } } diff --git a/pinot-server/src/main/java/org/apache/pinot/server/starter/helix/HelixInstanceDataManagerConfig.java b/pinot-server/src/main/java/org/apache/pinot/server/starter/helix/HelixInstanceDataManagerConfig.java index 2154d88..5439618 100644 --- a/pinot-server/src/main/java/org/apache/pinot/server/starter/helix/HelixInstanceDataManagerConfig.java +++ b/pinot-server/src/main/java/org/apache/pinot/server/starter/helix/HelixInstanceDataManagerConfig.java @@ -68,6 +68,8 @@ public class HelixInstanceDataManagerConfig implements InstanceDataManagerConfig // Key of whether to enable split commit private static final String ENABLE_SPLIT_COMMIT = "enable.split.commit"; + // Key of whether to enable split commit end with segment metadata files. + private static final String ENABLE_SPLIT_COMMIT_END_WITH_METADATA = "enable.commitend.metadata"; // Whether memory for realtime consuming segments should be allocated off-heap. 
private static final String REALTIME_OFFHEAP_ALLOCATION = "realtime.alloc.offheap"; @@ -164,6 +166,11 @@ public class HelixInstanceDataManagerConfig implements InstanceDataManagerConfig } @Override + public boolean isEnableSplitCommitEndWithMetadata() { + return _instanceDataManagerConfiguration.getBoolean(ENABLE_SPLIT_COMMIT_END_WITH_METADATA, true); + } + + @Override public boolean isRealtimeOffHeapAllocation() { return _instanceDataManagerConfiguration.getBoolean(REALTIME_OFFHEAP_ALLOCATION, false); } --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@pinot.apache.org For additional commands, e-mail: commits-h...@pinot.apache.org