This is an automated email from the ASF dual-hosted git repository. snlee pushed a commit to branch sampling-query-log in repository https://gitbox.apache.org/repos/asf/incubator-pinot.git
commit 4a8b5503ab463006ad247f76b0d34007a12d214d Author: Seunghyun Lee <sn...@linkedin.com> AuthorDate: Tue Mar 5 20:58:48 2019 -0800 Adding the support for sampling logs For high-throughput use cases, logging becomes the bottleneck on both the server and the broker. "Category.callAppenders()" uses a synchronized block, so all the worker threads can be stalled while logging. The long-term solution should be a migration to log4j2. As a short-term fix, this PR adds a configuration for sampling logs. --- .../requesthandler/BaseBrokerRequestHandler.java | 36 ++++++++++++++-------- .../apache/pinot/common/utils/CommonConstants.java | 2 ++ .../core/query/scheduler/PriorityScheduler.java | 8 +++-- .../pinot/core/query/scheduler/QueryScheduler.java | 31 ++++++++++++++----- .../query/scheduler/fcfs/BoundedFCFSScheduler.java | 9 +++--- .../query/scheduler/fcfs/FCFSQueryScheduler.java | 2 +- .../tokenbucket/TokenPriorityScheduler.java | 8 ++--- .../query/scheduler/PrioritySchedulerTest.java | 14 ++++----- .../request/ScheduledRequestHandlerTest.java | 10 +++--- 9 files changed, 76 insertions(+), 44 deletions(-) diff --git a/pinot-broker/src/main/java/org/apache/pinot/broker/requesthandler/BaseBrokerRequestHandler.java b/pinot-broker/src/main/java/org/apache/pinot/broker/requesthandler/BaseBrokerRequestHandler.java index 47d478a..a1902fb 100644 --- a/pinot-broker/src/main/java/org/apache/pinot/broker/requesthandler/BaseBrokerRequestHandler.java +++ b/pinot-broker/src/main/java/org/apache/pinot/broker/requesthandler/BaseBrokerRequestHandler.java @@ -25,6 +25,7 @@ import java.util.ArrayList; import java.util.Collections; import java.util.List; import java.util.Map; +import java.util.Random; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicLong; import javax.annotation.Nullable; @@ -65,6 +66,7 @@ import static org.apache.pinot.common.utils.CommonConstants.Broker.Request.TRACE public abstract class BaseBrokerRequestHandler implements 
BrokerRequestHandler { private static final Logger LOGGER = LoggerFactory.getLogger(BaseBrokerRequestHandler.class); private static final Pql2Compiler REQUEST_COMPILER = new Pql2Compiler(); + private static final Random RANDOM = new Random(); protected final Configuration _config; protected final RoutingTable _routingTable; @@ -81,6 +83,7 @@ public abstract class BaseBrokerRequestHandler implements BrokerRequestHandler { protected final long _brokerTimeoutMs; protected final int _queryResponseLimit; protected final int _queryLogLength; + protected final float _queryLogSamplingRate; public BaseBrokerRequestHandler(Configuration config, RoutingTable routingTable, TimeBoundaryService timeBoundaryService, AccessControlFactory accessControlFactory, @@ -96,9 +99,12 @@ public abstract class BaseBrokerRequestHandler implements BrokerRequestHandler { _brokerTimeoutMs = config.getLong(CONFIG_OF_BROKER_TIMEOUT_MS, DEFAULT_BROKER_TIMEOUT_MS); _queryResponseLimit = config.getInt(CONFIG_OF_BROKER_QUERY_RESPONSE_LIMIT, DEFAULT_BROKER_QUERY_RESPONSE_LIMIT); _queryLogLength = config.getInt(CONFIG_OF_BROKER_QUERY_LOG_LENGTH, DEFAULT_BROKER_QUERY_LOG_LENGTH); + _queryLogSamplingRate = + config.getFloat(CONFIG_OF_BROKER_QUERY_LOG_SAMPLING_RATE, DEFAULT_BROKER_QUERY_LOG_SAMPLING_RATE); - LOGGER.info("Broker Id: {}, timeout: {}ms, query response limit: {}, query log length: {}", _brokerId, - _brokerTimeoutMs, _queryResponseLimit, _queryLogLength); + LOGGER.info( + "Broker Id: {}, timeout: {}ms, query response limit: {}, query log length: {}, query log sampling rate: {}", + _brokerId, _brokerTimeoutMs, _queryResponseLimit, _queryLogLength, _queryLogSamplingRate); } private String getDefaultBrokerId() { @@ -292,17 +298,21 @@ public abstract class BaseBrokerRequestHandler implements BrokerRequestHandler { LOGGER.debug("Broker Response: {}", brokerResponse); - // Table name might have been changed (with suffix _OFFLINE/_REALTIME appended) - LOGGER.info( - "RequestId:{}, table:{}, timeMs:{}, 
docs:{}/{}, entries:{}/{}, segments(queried/processed/matched):{}/{}/{} " - + "servers:{}/{}, groupLimitReached:{}, exceptions:{}, serverStats:{}, query:{}", requestId, - brokerRequest.getQuerySource().getTableName(), totalTimeMs, brokerResponse.getNumDocsScanned(), - brokerResponse.getTotalDocs(), brokerResponse.getNumEntriesScannedInFilter(), - brokerResponse.getNumEntriesScannedPostFilter(), brokerResponse.getNumSegmentsQueried(), - brokerResponse.getNumSegmentsProcessed(), brokerResponse.getNumSegmentsMatched(), - brokerResponse.getNumServersResponded(), brokerResponse.getNumServersQueried(), - brokerResponse.isNumGroupsLimitReached(), brokerResponse.getExceptionsSize(), serverStats.getServerStats(), - StringUtils.substring(query, 0, _queryLogLength)); + // Sampling the query log based on the sampling rate configuration + if(_queryLogSamplingRate > RANDOM.nextFloat()) { + // Table name might have been changed (with suffix _OFFLINE/_REALTIME appended) + LOGGER.info( + "RequestId:{}, table:{}, timeMs:{}, docs:{}/{}, entries:{}/{}, segments(queried/processed/matched):{}/{}/{} " + + "servers:{}/{}, groupLimitReached:{}, exceptions:{}, serverStats:{}, query:{}", requestId, + brokerRequest.getQuerySource().getTableName(), totalTimeMs, brokerResponse.getNumDocsScanned(), + brokerResponse.getTotalDocs(), brokerResponse.getNumEntriesScannedInFilter(), + brokerResponse.getNumEntriesScannedPostFilter(), brokerResponse.getNumSegmentsQueried(), + brokerResponse.getNumSegmentsProcessed(), brokerResponse.getNumSegmentsMatched(), + brokerResponse.getNumServersResponded(), brokerResponse.getNumServersQueried(), + brokerResponse.isNumGroupsLimitReached(), brokerResponse.getExceptionsSize(), serverStats.getServerStats(), + StringUtils.substring(query, 0, _queryLogLength)); + } + return brokerResponse; } diff --git a/pinot-common/src/main/java/org/apache/pinot/common/utils/CommonConstants.java b/pinot-common/src/main/java/org/apache/pinot/common/utils/CommonConstants.java index 
0e6f5a5..cf5ec95 100644 --- a/pinot-common/src/main/java/org/apache/pinot/common/utils/CommonConstants.java +++ b/pinot-common/src/main/java/org/apache/pinot/common/utils/CommonConstants.java @@ -111,6 +111,8 @@ public class CommonConstants { public static final int DEFAULT_BROKER_QUERY_RESPONSE_LIMIT = Integer.MAX_VALUE; public static final String CONFIG_OF_BROKER_QUERY_LOG_LENGTH = "pinot.broker.query.log.length"; public static final int DEFAULT_BROKER_QUERY_LOG_LENGTH = Integer.MAX_VALUE; + public static final String CONFIG_OF_BROKER_QUERY_LOG_SAMPLING_RATE = "pinot.broker.query.log.samplingRate"; + public static final float DEFAULT_BROKER_QUERY_LOG_SAMPLING_RATE = 1.0f; public static final String CONFIG_OF_BROKER_TIMEOUT_MS = "pinot.broker.timeoutMs"; public static final long DEFAULT_BROKER_TIMEOUT_MS = 10_000L; public static final String CONFIG_OF_BROKER_ID = "pinot.broker.id"; diff --git a/pinot-core/src/main/java/org/apache/pinot/core/query/scheduler/PriorityScheduler.java b/pinot-core/src/main/java/org/apache/pinot/core/query/scheduler/PriorityScheduler.java index afb2831..5f968b8 100644 --- a/pinot-core/src/main/java/org/apache/pinot/core/query/scheduler/PriorityScheduler.java +++ b/pinot-core/src/main/java/org/apache/pinot/core/query/scheduler/PriorityScheduler.java @@ -27,6 +27,7 @@ import java.util.List; import java.util.concurrent.Semaphore; import java.util.concurrent.atomic.LongAccumulator; import javax.annotation.Nonnull; +import org.apache.commons.configuration.Configuration; import org.apache.pinot.common.exception.QueryException; import org.apache.pinot.common.metrics.ServerMeter; import org.apache.pinot.common.metrics.ServerMetrics; @@ -53,9 +54,10 @@ public abstract class PriorityScheduler extends QueryScheduler { @VisibleForTesting Thread scheduler; - public PriorityScheduler(@Nonnull ResourceManager resourceManager, @Nonnull QueryExecutor queryExecutor, - @Nonnull SchedulerPriorityQueue queue, @Nonnull ServerMetrics metrics, @Nonnull 
LongAccumulator latestQueryTime) { - super(queryExecutor, resourceManager, metrics, latestQueryTime); + public PriorityScheduler(@Nonnull Configuration config, @Nonnull ResourceManager resourceManager, + @Nonnull QueryExecutor queryExecutor, @Nonnull SchedulerPriorityQueue queue, @Nonnull ServerMetrics metrics, + @Nonnull LongAccumulator latestQueryTime) { + super(config, queryExecutor, resourceManager, metrics, latestQueryTime); Preconditions.checkNotNull(queue); this.queryQueue = queue; this.numRunners = resourceManager.getNumQueryRunnerThreads(); diff --git a/pinot-core/src/main/java/org/apache/pinot/core/query/scheduler/QueryScheduler.java b/pinot-core/src/main/java/org/apache/pinot/core/query/scheduler/QueryScheduler.java index 82d2b5d..99cf31e 100644 --- a/pinot-core/src/main/java/org/apache/pinot/core/query/scheduler/QueryScheduler.java +++ b/pinot-core/src/main/java/org/apache/pinot/core/query/scheduler/QueryScheduler.java @@ -23,10 +23,12 @@ import com.google.common.util.concurrent.Futures; import com.google.common.util.concurrent.ListenableFuture; import com.google.common.util.concurrent.ListenableFutureTask; import java.util.Map; +import java.util.Random; import java.util.concurrent.ExecutorService; import java.util.concurrent.atomic.LongAccumulator; import javax.annotation.Nonnull; import javax.annotation.Nullable; +import org.apache.commons.configuration.Configuration; import org.apache.pinot.common.exception.QueryException; import org.apache.pinot.common.metrics.ServerMeter; import org.apache.pinot.common.metrics.ServerMetrics; @@ -48,9 +50,14 @@ import org.slf4j.LoggerFactory; */ public abstract class QueryScheduler { private static final Logger LOGGER = LoggerFactory.getLogger(QueryScheduler.class); + private static final Random RANDOM = new Random(); + private static final String INVALID_NUM_SCANNED = "-1"; private static final String INVALID_SEGMENTS_COUNT = "-1"; + private static final String QUERY_LOG_SAMPLING_RATE_KEY = 
"query_log_sampling_rate"; + private static final float DEFAULT_QUERY_LOG_SAMPLING_RATE = 1.0f; + private final float queryLogSamplingRate; protected final ServerMetrics serverMetrics; protected final QueryExecutor queryExecutor; protected final ResourceManager resourceManager; @@ -63,8 +70,9 @@ public abstract class QueryScheduler { * @param resourceManager for managing server thread resources * @param serverMetrics server metrics collector */ - public QueryScheduler(@Nonnull QueryExecutor queryExecutor, @Nonnull ResourceManager resourceManager, + public QueryScheduler(@Nonnull Configuration config, @Nonnull QueryExecutor queryExecutor, @Nonnull ResourceManager resourceManager, @Nonnull ServerMetrics serverMetrics, @Nonnull LongAccumulator latestQueryTime) { + Preconditions.checkNotNull(config); Preconditions.checkNotNull(queryExecutor); Preconditions.checkNotNull(resourceManager); Preconditions.checkNotNull(serverMetrics); @@ -73,6 +81,9 @@ public abstract class QueryScheduler { this.resourceManager = resourceManager; this.queryExecutor = queryExecutor; this.latestQueryTime = latestQueryTime; + this.queryLogSamplingRate = config.getFloat(QUERY_LOG_SAMPLING_RATE_KEY, DEFAULT_QUERY_LOG_SAMPLING_RATE); + + LOGGER.info("Query log sampling rate: {}", queryLogSamplingRate); } /** @@ -170,13 +181,17 @@ public abstract class QueryScheduler { TimerContext timerContext = queryRequest.getTimerContext(); int numSegmentsQueried = queryRequest.getSegmentsToQuery().size(); - LOGGER.info( - "Processed requestId={},table={},segments(queried/processed/matched)={}/{}/{},schedulerWaitMs={},totalExecMs={},totalTimeMs={},broker={},numDocsScanned={},scanInFilter={},scanPostFilter={},sched={}", - requestId, tableNameWithType, numSegmentsQueried, numSegmentsProcessed, numSegmentsMatched, - timerContext.getPhaseDurationMs(ServerQueryPhase.SCHEDULER_WAIT), - timerContext.getPhaseDurationMs(ServerQueryPhase.QUERY_PROCESSING), - 
timerContext.getPhaseDurationMs(ServerQueryPhase.TOTAL_QUERY_TIME), queryRequest.getBrokerId(), numDocsScanned, - numEntriesScannedInFilter, numEntriesScannedPostFilter, name()); + + // Sampling the query log based on the sampling rate configuration + if (queryLogSamplingRate > RANDOM.nextFloat()) { + LOGGER.info( + "Processed requestId={},table={},segments(queried/processed/matched)={}/{}/{},schedulerWaitMs={},totalExecMs={},totalTimeMs={},broker={},numDocsScanned={},scanInFilter={},scanPostFilter={},sched={}", + requestId, tableNameWithType, numSegmentsQueried, numSegmentsProcessed, numSegmentsMatched, + timerContext.getPhaseDurationMs(ServerQueryPhase.SCHEDULER_WAIT), + timerContext.getPhaseDurationMs(ServerQueryPhase.QUERY_PROCESSING), + timerContext.getPhaseDurationMs(ServerQueryPhase.TOTAL_QUERY_TIME), queryRequest.getBrokerId(), numDocsScanned, + numEntriesScannedInFilter, numEntriesScannedPostFilter, name()); + } serverMetrics.addMeteredTableValue(tableNameWithType, ServerMeter.NUM_SEGMENTS_QUERIED, numSegmentsQueried); serverMetrics.addMeteredTableValue(tableNameWithType, ServerMeter.NUM_SEGMENTS_PROCESSED, numSegmentsProcessed); diff --git a/pinot-core/src/main/java/org/apache/pinot/core/query/scheduler/fcfs/BoundedFCFSScheduler.java b/pinot-core/src/main/java/org/apache/pinot/core/query/scheduler/fcfs/BoundedFCFSScheduler.java index 8a425b1..301fb06 100644 --- a/pinot-core/src/main/java/org/apache/pinot/core/query/scheduler/fcfs/BoundedFCFSScheduler.java +++ b/pinot-core/src/main/java/org/apache/pinot/core/query/scheduler/fcfs/BoundedFCFSScheduler.java @@ -53,12 +53,13 @@ public class BoundedFCFSScheduler extends PriorityScheduler { } }; MultiLevelPriorityQueue queue = new MultiLevelPriorityQueue(config, rm, groupFactory, new TableBasedGroupMapper()); - return new BoundedFCFSScheduler(rm, queryExecutor, queue, serverMetrics, latestQueryTime); + return new BoundedFCFSScheduler(config, rm, queryExecutor, queue, serverMetrics, latestQueryTime); } - private 
BoundedFCFSScheduler(@Nonnull ResourceManager resourceManager, @Nonnull QueryExecutor queryExecutor, - @Nonnull SchedulerPriorityQueue queue, @Nonnull ServerMetrics metrics, @Nonnull LongAccumulator latestQueryTime) { - super(resourceManager, queryExecutor, queue, metrics, latestQueryTime); + private BoundedFCFSScheduler(@Nonnull Configuration config, @Nonnull ResourceManager resourceManager, + @Nonnull QueryExecutor queryExecutor, @Nonnull SchedulerPriorityQueue queue, @Nonnull ServerMetrics metrics, + @Nonnull LongAccumulator latestQueryTime) { + super(config, resourceManager, queryExecutor, queue, metrics, latestQueryTime); } @Override diff --git a/pinot-core/src/main/java/org/apache/pinot/core/query/scheduler/fcfs/FCFSQueryScheduler.java b/pinot-core/src/main/java/org/apache/pinot/core/query/scheduler/fcfs/FCFSQueryScheduler.java index 95004c2..92016d6 100644 --- a/pinot-core/src/main/java/org/apache/pinot/core/query/scheduler/fcfs/FCFSQueryScheduler.java +++ b/pinot-core/src/main/java/org/apache/pinot/core/query/scheduler/fcfs/FCFSQueryScheduler.java @@ -42,7 +42,7 @@ public class FCFSQueryScheduler extends QueryScheduler { public FCFSQueryScheduler(@Nonnull Configuration config, @Nonnull QueryExecutor queryExecutor, @Nonnull ServerMetrics serverMetrics, @Nonnull LongAccumulator latestQueryTime) { - super(queryExecutor, new UnboundedResourceManager(config), serverMetrics, latestQueryTime); + super(config, queryExecutor, new UnboundedResourceManager(config), serverMetrics, latestQueryTime); } @Nonnull diff --git a/pinot-core/src/main/java/org/apache/pinot/core/query/scheduler/tokenbucket/TokenPriorityScheduler.java b/pinot-core/src/main/java/org/apache/pinot/core/query/scheduler/tokenbucket/TokenPriorityScheduler.java index fdc4670..7a28bf2 100644 --- a/pinot-core/src/main/java/org/apache/pinot/core/query/scheduler/tokenbucket/TokenPriorityScheduler.java +++ b/pinot-core/src/main/java/org/apache/pinot/core/query/scheduler/tokenbucket/TokenPriorityScheduler.java 
@@ -59,13 +59,13 @@ public class TokenPriorityScheduler extends PriorityScheduler { }; MultiLevelPriorityQueue queue = new MultiLevelPriorityQueue(config, rm, groupFactory, new TableBasedGroupMapper()); - return new TokenPriorityScheduler(rm, queryExecutor, queue, metrics, latestQueryTime); + return new TokenPriorityScheduler(config, rm, queryExecutor, queue, metrics, latestQueryTime); } - private TokenPriorityScheduler(@Nonnull ResourceManager resourceManager, @Nonnull QueryExecutor queryExecutor, - @Nonnull MultiLevelPriorityQueue queue, @Nonnull ServerMetrics metrics, + private TokenPriorityScheduler(@Nonnull Configuration config, @Nonnull ResourceManager resourceManager, + @Nonnull QueryExecutor queryExecutor, @Nonnull MultiLevelPriorityQueue queue, @Nonnull ServerMetrics metrics, @Nonnull LongAccumulator latestQueryTime) { - super(resourceManager, queryExecutor, queue, metrics, latestQueryTime); + super(config, resourceManager, queryExecutor, queue, metrics, latestQueryTime); } @Override diff --git a/pinot-core/src/test/java/org/apache/pinot/core/query/scheduler/PrioritySchedulerTest.java b/pinot-core/src/test/java/org/apache/pinot/core/query/scheduler/PrioritySchedulerTest.java index 1bdb81b..06611c2 100644 --- a/pinot-core/src/test/java/org/apache/pinot/core/query/scheduler/PrioritySchedulerTest.java +++ b/pinot-core/src/test/java/org/apache/pinot/core/query/scheduler/PrioritySchedulerTest.java @@ -231,13 +231,13 @@ public class PrioritySchedulerTest { static TestSchedulerGroupFactory groupFactory; static LongAccumulator latestQueryTime; - public static TestPriorityScheduler create(Configuration conf) { - ResourceManager rm = new PolicyBasedResourceManager(conf); + public static TestPriorityScheduler create(Configuration config) { + ResourceManager rm = new PolicyBasedResourceManager(config); QueryExecutor qe = new TestQueryExecutor(); groupFactory = new TestSchedulerGroupFactory(); - MultiLevelPriorityQueue queue = new MultiLevelPriorityQueue(conf, rm, 
groupFactory, new TableBasedGroupMapper()); + MultiLevelPriorityQueue queue = new MultiLevelPriorityQueue(config, rm, groupFactory, new TableBasedGroupMapper()); latestQueryTime = new LongAccumulator(Long::max, 0); - return new TestPriorityScheduler(rm, qe, queue, metrics, latestQueryTime); + return new TestPriorityScheduler(config, rm, qe, queue, metrics, latestQueryTime); } public static TestPriorityScheduler create() { @@ -246,10 +246,10 @@ public class PrioritySchedulerTest { } // store locally for easy access - public TestPriorityScheduler(@Nonnull ResourceManager resourceManager, @Nonnull QueryExecutor queryExecutor, - @Nonnull SchedulerPriorityQueue queue, @Nonnull ServerMetrics metrics, + public TestPriorityScheduler(@Nonnull Configuration config, @Nonnull ResourceManager resourceManager, + @Nonnull QueryExecutor queryExecutor, @Nonnull SchedulerPriorityQueue queue, @Nonnull ServerMetrics metrics, @Nonnull LongAccumulator latestQueryTime) { - super(resourceManager, queryExecutor, queue, metrics, latestQueryTime); + super(config, resourceManager, queryExecutor, queue, metrics, latestQueryTime); } ResourceManager getResourceManager() { diff --git a/pinot-server/src/test/java/org/apache/pinot/server/request/ScheduledRequestHandlerTest.java b/pinot-server/src/test/java/org/apache/pinot/server/request/ScheduledRequestHandlerTest.java index 48578f5..00db1bd 100644 --- a/pinot-server/src/test/java/org/apache/pinot/server/request/ScheduledRequestHandlerTest.java +++ b/pinot-server/src/test/java/org/apache/pinot/server/request/ScheduledRequestHandlerTest.java @@ -34,6 +34,7 @@ import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeoutException; import java.util.concurrent.atomic.LongAccumulator; import javax.annotation.Nonnull; +import org.apache.commons.configuration.Configuration; import org.apache.commons.configuration.PropertiesConfiguration; import org.apache.pinot.common.exception.QueryException; import 
org.apache.pinot.common.metrics.ServerMetrics; @@ -65,6 +66,7 @@ import static org.mockito.Mockito.when; public class ScheduledRequestHandlerTest { private static final BrokerRequest DUMMY_BROKER_REQUEST = new Pql2Compiler().compileToBrokerRequest("SELECT * FROM myTable_OFFLINE"); + private static final Configuration DEFAULT_SCHEDULER_CONFIG = new PropertiesConfiguration(); private ServerMetrics serverMetrics; private ChannelHandlerContext channelHandlerContext; @@ -119,8 +121,8 @@ public class ScheduledRequestHandlerTest { @Test public void testQueryProcessingException() throws Exception { - ScheduledRequestHandler handler = - new ScheduledRequestHandler(new QueryScheduler(queryExecutor, resourceManager, serverMetrics, latestQueryTime) { + ScheduledRequestHandler handler = new ScheduledRequestHandler( + new QueryScheduler(DEFAULT_SCHEDULER_CONFIG, queryExecutor, resourceManager, serverMetrics, latestQueryTime) { @Nonnull @Override public ListenableFuture<byte[]> submit(@Nonnull ServerQueryRequest queryRequest) { @@ -159,8 +161,8 @@ public class ScheduledRequestHandlerTest { @Test public void testValidQueryResponse() throws InterruptedException, ExecutionException, TimeoutException, IOException { - ScheduledRequestHandler handler = - new ScheduledRequestHandler(new QueryScheduler(queryExecutor, resourceManager, serverMetrics, latestQueryTime) { + ScheduledRequestHandler handler = new ScheduledRequestHandler( + new QueryScheduler(DEFAULT_SCHEDULER_CONFIG, queryExecutor, resourceManager, serverMetrics, latestQueryTime) { @Nonnull @Override public ListenableFuture<byte[]> submit(@Nonnull ServerQueryRequest queryRequest) { --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@pinot.apache.org For additional commands, e-mail: commits-h...@pinot.apache.org