This is an automated email from the ASF dual-hosted git repository.

tingchen pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pinot.git


The following commit(s) were added to refs/heads/master by this push:
     new 1763e70f79 Minor Realtime Segment Commit Upload Improvements (#10725)
1763e70f79 is described below

commit 1763e70f79ed1328e3e33ae3817557f927d5fa38
Author: Ankit Sultana <ankitsult...@uber.com>
AuthorDate: Thu May 11 00:17:04 2023 +0530

    Minor Realtime Segment Commit Upload Improvements (#10725)
    
    * Improve Monitoring for Realtime Segment Commit + Configurable Segment Upload Timeout
    
    * Add metric for upload timeout
    
    * Minor refactor
    
    * Add metrics for success/failure
    
    * Set timeout in HelixInstanceDataManager as well
---
 .../pinot/common/metrics/ControllerMeter.java      |  1 +
 .../apache/pinot/common/metrics/ServerMeter.java   |  4 ++++
 .../apache/pinot/common/metrics/ServerTimer.java   |  2 ++
 .../realtime/PinotLLCRealtimeSegmentManager.java   |  5 +++++
 .../manager/realtime/PinotFSSegmentUploader.java   | 22 +++++++++++++++++++++-
 .../manager/realtime/SegmentCommitterFactory.java  |  4 ++--
 .../realtime/Server2ControllerSegmentUploader.java | 18 ++++++++++++++++--
 .../ServerSegmentCompletionProtocolHandler.java    |  2 +-
 .../realtime/PinotFSSegmentUploaderTest.java       | 11 +++++++----
 .../Server2ControllerSegmentUploaderTest.java      |  4 ++--
 .../starter/helix/HelixInstanceDataManager.java    |  3 ++-
 11 files changed, 63 insertions(+), 13 deletions(-)

diff --git 
a/pinot-common/src/main/java/org/apache/pinot/common/metrics/ControllerMeter.java
 
b/pinot-common/src/main/java/org/apache/pinot/common/metrics/ControllerMeter.java
index edd70ee642..758be1550a 100644
--- 
a/pinot-common/src/main/java/org/apache/pinot/common/metrics/ControllerMeter.java
+++ 
b/pinot-common/src/main/java/org/apache/pinot/common/metrics/ControllerMeter.java
@@ -57,6 +57,7 @@ public enum ControllerMeter implements AbstractMetrics.Meter {
   CRON_SCHEDULER_JOB_TRIGGERED("cronSchedulerJobTriggered", false),
   CRON_SCHEDULER_JOB_SKIPPED("cronSchedulerJobSkipped", false),
   
LLC_SEGMENTS_DEEP_STORE_UPLOAD_RETRY_ERROR("LLCSegmentDeepStoreUploadRetryError",
 false),
+  SEGMENT_MISSING_DEEP_STORE_LINK("RealtimeSegmentMissingDeepStoreLink", 
false),
   NUMBER_ADHOC_TASKS_SUBMITTED("adhocTasks", false);
 
 
diff --git 
a/pinot-common/src/main/java/org/apache/pinot/common/metrics/ServerMeter.java 
b/pinot-common/src/main/java/org/apache/pinot/common/metrics/ServerMeter.java
index ff292a5092..435ec53566 100644
--- 
a/pinot-common/src/main/java/org/apache/pinot/common/metrics/ServerMeter.java
+++ 
b/pinot-common/src/main/java/org/apache/pinot/common/metrics/ServerMeter.java
@@ -74,6 +74,10 @@ public enum ServerMeter implements AbstractMetrics.Meter {
   SEGMENT_DOWNLOAD_FAILURES("segments", false),
   SEGMENT_DOWNLOAD_FROM_REMOTE_FAILURES("segments", false),
   SEGMENT_DOWNLOAD_FROM_PEERS_FAILURES("segments", false),
+  SEGMENT_UPLOAD_FAILURE("segments", false),
+  SEGMENT_UPLOAD_SUCCESS("segments", false),
+  // Emitted only by Server to Deep-store segment uploader.
+  SEGMENT_UPLOAD_TIMEOUT("segments", false),
   NUM_RESIZES("numResizes", false),
   NO_TABLE_ACCESS("tables", true),
   INDEXING_FAILURES("attributeValues", true),
diff --git 
a/pinot-common/src/main/java/org/apache/pinot/common/metrics/ServerTimer.java 
b/pinot-common/src/main/java/org/apache/pinot/common/metrics/ServerTimer.java
index 5631aeda8e..aa0952730b 100644
--- 
a/pinot-common/src/main/java/org/apache/pinot/common/metrics/ServerTimer.java
+++ 
b/pinot-common/src/main/java/org/apache/pinot/common/metrics/ServerTimer.java
@@ -44,6 +44,8 @@ public enum ServerTimer implements AbstractMetrics.Timer {
   RESPONSE_SER_CPU_TIME_NS("nanoseconds", false, "Query cost (response 
serialization cpu time) "
       + "for query processing on server. Computed as the time spent in 
serializing query response on servers"),
 
+  SEGMENT_UPLOAD_TIME_MS("milliseconds", false),
+
   TOTAL_CPU_TIME_NS("nanoseconds", false, "Total query cost (thread cpu time + 
system "
       + "activities cpu time + response serialization cpu time) for query 
processing on server.");
 
diff --git 
a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/realtime/PinotLLCRealtimeSegmentManager.java
 
b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/realtime/PinotLLCRealtimeSegmentManager.java
index e17dc8d46d..455ff15894 100644
--- 
a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/realtime/PinotLLCRealtimeSegmentManager.java
+++ 
b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/realtime/PinotLLCRealtimeSegmentManager.java
@@ -36,6 +36,7 @@ import java.util.concurrent.locks.Lock;
 import java.util.concurrent.locks.ReentrantLock;
 import java.util.stream.Collectors;
 import javax.annotation.Nullable;
+import org.apache.commons.lang.StringUtils;
 import org.apache.helix.AccessOption;
 import org.apache.helix.Criteria;
 import org.apache.helix.HelixAdmin;
@@ -606,6 +607,10 @@ public class PinotLLCRealtimeSegmentManager {
 
     // Trigger the metadata event notifier
     _metadataEventNotifierFactory.create().notifyOnSegmentFlush(tableConfig);
+
+    if (StringUtils.isBlank(committingSegmentDescriptor.getSegmentLocation())) 
{
+      _controllerMetrics.addMeteredTableValue(realtimeTableName, 
ControllerMeter.SEGMENT_MISSING_DEEP_STORE_LINK, 1);
+    }
   }
 
   /**
diff --git 
a/pinot-core/src/main/java/org/apache/pinot/core/data/manager/realtime/PinotFSSegmentUploader.java
 
b/pinot-core/src/main/java/org/apache/pinot/core/data/manager/realtime/PinotFSSegmentUploader.java
index 2325a44e19..9a34872a50 100644
--- 
a/pinot-core/src/main/java/org/apache/pinot/core/data/manager/realtime/PinotFSSegmentUploader.java
+++ 
b/pinot-core/src/main/java/org/apache/pinot/core/data/manager/realtime/PinotFSSegmentUploader.java
@@ -26,10 +26,15 @@ import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Executors;
 import java.util.concurrent.Future;
 import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;
+import org.apache.pinot.common.metrics.ServerMeter;
+import org.apache.pinot.common.metrics.ServerMetrics;
+import org.apache.pinot.common.metrics.ServerTimer;
 import org.apache.pinot.common.utils.LLCSegmentName;
 import org.apache.pinot.spi.filesystem.PinotFS;
 import org.apache.pinot.spi.filesystem.PinotFSFactory;
 import org.apache.pinot.spi.utils.StringUtil;
+import org.apache.pinot.spi.utils.builder.TableNameBuilder;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -46,10 +51,12 @@ public class PinotFSSegmentUploader implements 
SegmentUploader {
   private final String _segmentStoreUriStr;
   private final ExecutorService _executorService = 
Executors.newCachedThreadPool();
   private final int _timeoutInMs;
+  private final ServerMetrics _serverMetrics;
 
-  public PinotFSSegmentUploader(String segmentStoreDirUri, int timeoutMillis) {
+  public PinotFSSegmentUploader(String segmentStoreDirUri, int timeoutMillis, 
ServerMetrics serverMetrics) {
     _segmentStoreUriStr = segmentStoreDirUri;
     _timeoutInMs = timeoutMillis;
+    _serverMetrics = serverMetrics;
   }
 
   public URI uploadSegment(File segmentFile, LLCSegmentName segmentName) {
@@ -58,9 +65,11 @@ public class PinotFSSegmentUploader implements 
SegmentUploader {
           segmentName.getSegmentName());
       return null;
     }
+    final String rawTableName = 
TableNameBuilder.extractRawTableName(segmentName.getTableName());
     Callable<URI> uploadTask = () -> {
       URI destUri = new URI(StringUtil.join(File.separator, 
_segmentStoreUriStr, segmentName.getTableName(),
           segmentName.getSegmentName() + UUID.randomUUID().toString()));
+      long startTime = System.currentTimeMillis();
       try {
         PinotFS pinotFS = PinotFSFactory.create(new 
URI(_segmentStoreUriStr).getScheme());
         // Check and delete any existing segment file.
@@ -71,6 +80,10 @@ public class PinotFSSegmentUploader implements 
SegmentUploader {
         return destUri;
       } catch (Exception e) {
         LOGGER.warn("Failed copy segment tar file {} to segment store {}: {}", 
segmentFile.getName(), destUri, e);
+      } finally {
+        long duration = System.currentTimeMillis() - startTime;
+        _serverMetrics.addTimedTableValue(rawTableName, 
ServerTimer.SEGMENT_UPLOAD_TIME_MS, duration,
+            TimeUnit.MILLISECONDS);
       }
       return null;
     };
@@ -78,14 +91,21 @@ public class PinotFSSegmentUploader implements 
SegmentUploader {
     try {
       URI segmentLocation = future.get(_timeoutInMs, TimeUnit.MILLISECONDS);
       LOGGER.info("Successfully upload segment {} to {}.", segmentName, 
segmentLocation);
+      _serverMetrics.addMeteredTableValue(rawTableName,
+          segmentLocation == null ? ServerMeter.SEGMENT_UPLOAD_FAILURE : 
ServerMeter.SEGMENT_UPLOAD_SUCCESS, 1);
       return segmentLocation;
     } catch (InterruptedException e) {
       LOGGER.info("Interrupted while waiting for segment upload of {} to {}.", 
segmentName, _segmentStoreUriStr);
       Thread.currentThread().interrupt();
+    } catch (TimeoutException e) {
+      // Emit a separate metric for timeout since this is relatively more 
common than other errors.
+      _serverMetrics.addMeteredTableValue(rawTableName, 
ServerMeter.SEGMENT_UPLOAD_TIMEOUT, 1);
+      LOGGER.warn("Timed out waiting to upload segment: {} for table: {}", 
segmentName.getSegmentName(), rawTableName);
     } catch (Exception e) {
       LOGGER
           .warn("Failed to upload file {} of segment {} for table {} ", 
segmentFile.getAbsolutePath(), segmentName, e);
     }
+    _serverMetrics.addMeteredTableValue(rawTableName, 
ServerMeter.SEGMENT_UPLOAD_FAILURE, 1);
 
     return null;
   }
diff --git 
a/pinot-core/src/main/java/org/apache/pinot/core/data/manager/realtime/SegmentCommitterFactory.java
 
b/pinot-core/src/main/java/org/apache/pinot/core/data/manager/realtime/SegmentCommitterFactory.java
index 37ff2e5a82..297f30482b 100644
--- 
a/pinot-core/src/main/java/org/apache/pinot/core/data/manager/realtime/SegmentCommitterFactory.java
+++ 
b/pinot-core/src/main/java/org/apache/pinot/core/data/manager/realtime/SegmentCommitterFactory.java
@@ -74,13 +74,13 @@ public class SegmentCommitterFactory {
     if (uploadToFs || peerSegmentDownloadScheme != null) {
       // TODO: peer scheme non-null check exists for backwards compatibility. 
remove check once users have migrated
       segmentUploader = new PinotFSSegmentUploader(segmentStoreUri,
-          PinotFSSegmentUploader.DEFAULT_SEGMENT_UPLOAD_TIMEOUT_MILLIS);
+          
ServerSegmentCompletionProtocolHandler.getSegmentUploadRequestTimeoutMs(), 
_serverMetrics);
     } else {
       segmentUploader = new Server2ControllerSegmentUploader(_logger,
           _protocolHandler.getFileUploadDownloadClient(),
           _protocolHandler.getSegmentCommitUploadURL(params, 
controllerVipUrl), params.getSegmentName(),
           
ServerSegmentCompletionProtocolHandler.getSegmentUploadRequestTimeoutMs(), 
_serverMetrics,
-          _protocolHandler.getAuthProvider());
+          _protocolHandler.getAuthProvider(), _tableConfig.getTableName());
     }
 
     return new SplitSegmentCommitter(_logger, _protocolHandler, params, 
segmentUploader, peerSegmentDownloadScheme);
diff --git 
a/pinot-core/src/main/java/org/apache/pinot/core/data/manager/realtime/Server2ControllerSegmentUploader.java
 
b/pinot-core/src/main/java/org/apache/pinot/core/data/manager/realtime/Server2ControllerSegmentUploader.java
index 2559616b49..5aa5b8b266 100644
--- 
a/pinot-core/src/main/java/org/apache/pinot/core/data/manager/realtime/Server2ControllerSegmentUploader.java
+++ 
b/pinot-core/src/main/java/org/apache/pinot/core/data/manager/realtime/Server2ControllerSegmentUploader.java
@@ -21,14 +21,18 @@ package org.apache.pinot.core.data.manager.realtime;
 import java.io.File;
 import java.net.URI;
 import java.net.URISyntaxException;
+import java.util.concurrent.TimeUnit;
 import org.apache.pinot.common.auth.AuthProviderUtils;
+import org.apache.pinot.common.metrics.ServerMeter;
 import org.apache.pinot.common.metrics.ServerMetrics;
+import org.apache.pinot.common.metrics.ServerTimer;
 import org.apache.pinot.common.protocols.SegmentCompletionProtocol;
 import org.apache.pinot.common.utils.FileUploadDownloadClient;
 import org.apache.pinot.common.utils.LLCSegmentName;
 import org.apache.pinot.core.util.SegmentCompletionProtocolUtils;
 import org.apache.pinot.server.realtime.ControllerLeaderLocator;
 import org.apache.pinot.spi.auth.AuthProvider;
+import org.apache.pinot.spi.utils.builder.TableNameBuilder;
 import org.slf4j.Logger;
 
 
@@ -42,10 +46,11 @@ public class Server2ControllerSegmentUploader implements 
SegmentUploader {
   private final int _segmentUploadRequestTimeoutMs;
   private final ServerMetrics _serverMetrics;
   private final AuthProvider _authProvider;
+  private final String _rawTableName;
 
   public Server2ControllerSegmentUploader(Logger segmentLogger, 
FileUploadDownloadClient fileUploadDownloadClient,
       String controllerSegmentUploadCommitUrl, String segmentName, int 
segmentUploadRequestTimeoutMs,
-      ServerMetrics serverMetrics, AuthProvider authProvider)
+      ServerMetrics serverMetrics, AuthProvider authProvider, String tableName)
       throws URISyntaxException {
     _segmentLogger = segmentLogger;
     _fileUploadDownloadClient = fileUploadDownloadClient;
@@ -54,6 +59,7 @@ public class Server2ControllerSegmentUploader implements 
SegmentUploader {
     _segmentUploadRequestTimeoutMs = segmentUploadRequestTimeoutMs;
     _serverMetrics = serverMetrics;
     _authProvider = authProvider;
+    _rawTableName = TableNameBuilder.extractRawTableName(tableName);
   }
 
   @Override
@@ -61,16 +67,20 @@ public class Server2ControllerSegmentUploader implements 
SegmentUploader {
     SegmentCompletionProtocol.Response response = 
uploadSegmentToController(segmentFile);
     if (response.getStatus() == 
SegmentCompletionProtocol.ControllerResponseStatus.UPLOAD_SUCCESS) {
       try {
-        return new URI(response.getSegmentLocation());
+        URI uri = new URI(response.getSegmentLocation());
+        _serverMetrics.addMeteredTableValue(_rawTableName, 
ServerMeter.SEGMENT_UPLOAD_SUCCESS, 1);
+        return uri;
       } catch (URISyntaxException e) {
         _segmentLogger.error("Error in segment location format: ", e);
       }
     }
+    _serverMetrics.addMeteredTableValue(_rawTableName, 
ServerMeter.SEGMENT_UPLOAD_FAILURE, 1);
     return null;
   }
 
   public SegmentCompletionProtocol.Response uploadSegmentToController(File 
segmentFile) {
     SegmentCompletionProtocol.Response response;
+    long startTime = System.currentTimeMillis();
     try {
       String responseStr = _fileUploadDownloadClient
           .uploadSegment(_controllerSegmentUploadCommitUrl, _segmentName, 
segmentFile,
@@ -88,6 +98,10 @@ public class Server2ControllerSegmentUploader implements 
SegmentUploader {
       // hence unable to send {@link 
SegmentCompletionProtocol.ControllerResponseStatus.NOT_LEADER}
       // If cache is not invalidated, we will not recover from exceptions 
until the controller comes back up
       ControllerLeaderLocator.getInstance().invalidateCachedControllerLeader();
+    } finally {
+      long duration = System.currentTimeMillis() - startTime;
+      _serverMetrics.addTimedTableValue(_rawTableName, 
ServerTimer.SEGMENT_UPLOAD_TIME_MS, duration,
+          TimeUnit.MILLISECONDS);
     }
     
SegmentCompletionProtocolUtils.raiseSegmentCompletionProtocolResponseMetric(_serverMetrics,
 response);
     return response;
diff --git 
a/pinot-core/src/main/java/org/apache/pinot/server/realtime/ServerSegmentCompletionProtocolHandler.java
 
b/pinot-core/src/main/java/org/apache/pinot/server/realtime/ServerSegmentCompletionProtocolHandler.java
index a7547e9ab1..95e8917f00 100644
--- 
a/pinot-core/src/main/java/org/apache/pinot/server/realtime/ServerSegmentCompletionProtocolHandler.java
+++ 
b/pinot-core/src/main/java/org/apache/pinot/server/realtime/ServerSegmentCompletionProtocolHandler.java
@@ -159,7 +159,7 @@ public class ServerSegmentCompletionProtocolHandler {
     try {
       segmentUploader =
           new Server2ControllerSegmentUploader(LOGGER, 
_fileUploadDownloadClient, url, params.getSegmentName(),
-              _segmentUploadRequestTimeoutMs, _serverMetrics, _authProvider);
+              _segmentUploadRequestTimeoutMs, _serverMetrics, _authProvider, 
_rawTableName);
     } catch (URISyntaxException e) {
       LOGGER.error("Segment commit upload url error: ", e);
       return SegmentCompletionProtocol.RESP_NOT_SENT;
diff --git 
a/pinot-core/src/test/java/org/apache/pinot/core/data/manager/realtime/PinotFSSegmentUploaderTest.java
 
b/pinot-core/src/test/java/org/apache/pinot/core/data/manager/realtime/PinotFSSegmentUploaderTest.java
index 62fa1a44ef..119b7cb7d1 100644
--- 
a/pinot-core/src/test/java/org/apache/pinot/core/data/manager/realtime/PinotFSSegmentUploaderTest.java
+++ 
b/pinot-core/src/test/java/org/apache/pinot/core/data/manager/realtime/PinotFSSegmentUploaderTest.java
@@ -28,11 +28,13 @@ import java.util.Map;
 import java.util.UUID;
 import org.apache.commons.io.FileUtils;
 import org.apache.pinot.common.exception.HttpErrorStatusException;
+import org.apache.pinot.common.metrics.ServerMetrics;
 import org.apache.pinot.common.utils.LLCSegmentName;
 import org.apache.pinot.spi.env.PinotConfiguration;
 import org.apache.pinot.spi.filesystem.BasePinotFS;
 import org.apache.pinot.spi.filesystem.PinotFSFactory;
 import org.apache.pinot.spi.utils.StringUtil;
+import org.mockito.Mockito;
 import org.testng.Assert;
 import org.testng.annotations.BeforeClass;
 import org.testng.annotations.Test;
@@ -42,6 +44,7 @@ public class PinotFSSegmentUploaderTest {
   private static final int TIMEOUT_IN_MS = 100;
   private File _file;
   private LLCSegmentName _llcSegmentName;
+  private ServerMetrics _serverMetrics = Mockito.mock(ServerMetrics.class);
 
   @BeforeClass
   public void setUp()
@@ -61,7 +64,7 @@ public class PinotFSSegmentUploaderTest {
 
   @Test
   public void testSuccessfulUpload() {
-    SegmentUploader segmentUploader = new 
PinotFSSegmentUploader("hdfs://root", TIMEOUT_IN_MS);
+    SegmentUploader segmentUploader = new 
PinotFSSegmentUploader("hdfs://root", TIMEOUT_IN_MS, _serverMetrics);
     URI segmentURI = segmentUploader.uploadSegment(_file, _llcSegmentName);
     Assert.assertTrue(segmentURI.toString().startsWith(StringUtil
         .join(File.separator, "hdfs://root", _llcSegmentName.getTableName(), 
_llcSegmentName.getSegmentName())));
@@ -69,7 +72,7 @@ public class PinotFSSegmentUploaderTest {
 
   @Test
   public void testSegmentAlreadyExist() {
-    SegmentUploader segmentUploader = new 
PinotFSSegmentUploader("existing://root", TIMEOUT_IN_MS);
+    SegmentUploader segmentUploader = new 
PinotFSSegmentUploader("existing://root", TIMEOUT_IN_MS, _serverMetrics);
     URI segmentURI = segmentUploader.uploadSegment(_file, _llcSegmentName);
     Assert.assertTrue(segmentURI.toString().startsWith(StringUtil
         .join(File.separator, "existing://root", 
_llcSegmentName.getTableName(), _llcSegmentName.getSegmentName())));
@@ -77,14 +80,14 @@ public class PinotFSSegmentUploaderTest {
 
   @Test
   public void testUploadTimeOut() {
-    SegmentUploader segmentUploader = new 
PinotFSSegmentUploader("timeout://root", TIMEOUT_IN_MS);
+    SegmentUploader segmentUploader = new 
PinotFSSegmentUploader("timeout://root", TIMEOUT_IN_MS, _serverMetrics);
     URI segmentURI = segmentUploader.uploadSegment(_file, _llcSegmentName);
     Assert.assertNull(segmentURI);
   }
 
   @Test
   public void testNoSegmentStoreConfigured() {
-    SegmentUploader segmentUploader = new PinotFSSegmentUploader("", 
TIMEOUT_IN_MS);
+    SegmentUploader segmentUploader = new PinotFSSegmentUploader("", 
TIMEOUT_IN_MS, _serverMetrics);
     URI segmentURI = segmentUploader.uploadSegment(_file, _llcSegmentName);
     Assert.assertNull(segmentURI);
   }
diff --git 
a/pinot-core/src/test/java/org/apache/pinot/core/data/manager/realtime/Server2ControllerSegmentUploaderTest.java
 
b/pinot-core/src/test/java/org/apache/pinot/core/data/manager/realtime/Server2ControllerSegmentUploaderTest.java
index 86a7088d94..3dbcfa506c 100644
--- 
a/pinot-core/src/test/java/org/apache/pinot/core/data/manager/realtime/Server2ControllerSegmentUploaderTest.java
+++ 
b/pinot-core/src/test/java/org/apache/pinot/core/data/manager/realtime/Server2ControllerSegmentUploaderTest.java
@@ -91,7 +91,7 @@ public class Server2ControllerSegmentUploaderTest {
       throws URISyntaxException {
     Server2ControllerSegmentUploader uploader =
         new Server2ControllerSegmentUploader(_logger, 
_fileUploadDownloadClient, GOOD_CONTROLLER_VIP, "segmentName",
-            10000, mock(ServerMetrics.class), null);
+            10000, mock(ServerMetrics.class), null, 
_llcSegmentName.getTableName());
     URI segmentURI = uploader.uploadSegment(_file, _llcSegmentName);
     Assert.assertEquals(segmentURI.toString(), SEGMENT_LOCATION);
   }
@@ -101,7 +101,7 @@ public class Server2ControllerSegmentUploaderTest {
       throws URISyntaxException {
     Server2ControllerSegmentUploader uploader =
         new Server2ControllerSegmentUploader(_logger, 
_fileUploadDownloadClient, BAD_CONTROLLER_VIP, "segmentName",
-            10000, mock(ServerMetrics.class), null);
+            10000, mock(ServerMetrics.class), null, 
_llcSegmentName.getTableName());
     URI segmentURI = uploader.uploadSegment(_file, _llcSegmentName);
     Assert.assertNull(segmentURI);
   }
diff --git 
a/pinot-server/src/main/java/org/apache/pinot/server/starter/helix/HelixInstanceDataManager.java
 
b/pinot-server/src/main/java/org/apache/pinot/server/starter/helix/HelixInstanceDataManager.java
index 6806d16c10..6bfcf09952 100644
--- 
a/pinot-server/src/main/java/org/apache/pinot/server/starter/helix/HelixInstanceDataManager.java
+++ 
b/pinot-server/src/main/java/org/apache/pinot/server/starter/helix/HelixInstanceDataManager.java
@@ -66,6 +66,7 @@ import org.apache.pinot.segment.spi.SegmentMetadata;
 import org.apache.pinot.segment.spi.loader.SegmentDirectoryLoader;
 import org.apache.pinot.segment.spi.loader.SegmentDirectoryLoaderContext;
 import org.apache.pinot.segment.spi.loader.SegmentDirectoryLoaderRegistry;
+import org.apache.pinot.server.realtime.ServerSegmentCompletionProtocolHandler;
 import org.apache.pinot.spi.config.table.TableConfig;
 import org.apache.pinot.spi.data.Schema;
 import org.apache.pinot.spi.env.PinotConfiguration;
@@ -113,7 +114,7 @@ public class HelixInstanceDataManager implements 
InstanceDataManager {
     _helixManager = helixManager;
     _serverMetrics = serverMetrics;
     _segmentUploader = new 
PinotFSSegmentUploader(_instanceDataManagerConfig.getSegmentStoreUri(),
-        PinotFSSegmentUploader.DEFAULT_SEGMENT_UPLOAD_TIMEOUT_MILLIS);
+        
ServerSegmentCompletionProtocolHandler.getSegmentUploadRequestTimeoutMs(), 
_serverMetrics);
 
     _externalViewDroppedMaxWaitMs = 
_instanceDataManagerConfig.getExternalViewDroppedMaxWaitMs();
     _externalViewDroppedCheckInternalMs = 
_instanceDataManagerConfig.getExternalViewDroppedCheckIntervalMs();


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@pinot.apache.org
For additional commands, e-mail: commits-h...@pinot.apache.org

Reply via email to