[GitHub] [incubator-pinot] mcvsubbu commented on a change in pull request #4993: Support Text column type in Pinot (both offline and realtime)
mcvsubbu commented on a change in pull request #4993: Support Text column type in Pinot (both offline and realtime) URL: https://github.com/apache/incubator-pinot/pull/4993#discussion_r368334405 ## File path: pinot-core/src/main/java/org/apache/pinot/core/common/predicate/TextMatchPredicate.java ## @@ -0,0 +1,45 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.pinot.core.common.predicate; + +import com.google.common.base.Preconditions; +import java.util.Arrays; +import java.util.List; +import org.apache.pinot.core.common.Predicate; + + +public class TextMatchPredicate extends Predicate { + String _searchQuery; + + public TextMatchPredicate(String lhs, List rhs) { +super(lhs, Predicate.Type.TEXT_MATCH, rhs); +Preconditions.checkArgument(rhs.size() == 1); Review comment: Is there a compilation check that this is validated? If not, then throwing a runtime error on a query that is invalid in the first place seems to be wrong. We will end up returning 5xx instead of 4xx on the query, right? This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. 
For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services - To unsubscribe, e-mail: commits-unsubscr...@pinot.apache.org For additional commands, e-mail: commits-h...@pinot.apache.org
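A minimal sketch of the alternative the comment points at. It assumes the argument count is checked while the query is compiled, so a malformed TEXT_MATCH is rejected as a client error (4xx) before any predicate is built. Only genuinely unexpected failures then surface as 5xx. `QueryValidationException`, `validateTextMatchArgs` and `statusFor` are hypothetical names, not Pinot APIs.

```java
import java.util.List;

/**
 * Sketch: reject a malformed TEXT_MATCH at query compilation time so the user
 * gets a 4xx instead of a 5xx. All names here are hypothetical, not Pinot APIs.
 */
public class TextMatchValidation {

  /** Hypothetical checked exception for errors in the user's query. */
  static class QueryValidationException extends Exception {
    QueryValidationException(String message) {
      super(message);
    }
  }

  /** Runs while compiling the query, before any predicate object is built. */
  static void validateTextMatchArgs(String column, List<String> rhs) throws QueryValidationException {
    int numArgs = (rhs == null) ? 0 : rhs.size();
    if (numArgs != 1) {
      throw new QueryValidationException(
          "TEXT_MATCH on column '" + column + "' expects exactly one search query, got " + numArgs);
    }
  }

  /** Maps the compilation outcome to an HTTP status code. */
  static int statusFor(String column, List<String> rhs) {
    try {
      validateTextMatchArgs(column, rhs);
      return 200;
    } catch (QueryValidationException e) {
      return 400; // the query itself is invalid: client error
    } catch (RuntimeException e) {
      return 500; // anything unexpected is still a server error
    }
  }
}
```

With this split, the `Preconditions.checkArgument` in the constructor becomes a last-resort invariant check rather than the user-facing validation.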
[GitHub] [incubator-pinot] mcvsubbu commented on a change in pull request #4993: Support Text column type in Pinot (both offline and realtime)
mcvsubbu commented on a change in pull request #4993: Support Text column type in Pinot (both offline and realtime) URL: https://github.com/apache/incubator-pinot/pull/4993#discussion_r368336179 ## File path: pinot-core/src/main/java/org/apache/pinot/core/realtime/impl/invertedindex/RealtimeLuceneTextIndexReader.java ## @@ -0,0 +1,117 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ +package org.apache.pinot.core.realtime.impl.invertedindex; + +import java.io.File; +import org.apache.lucene.analysis.standard.StandardAnalyzer; +import org.apache.lucene.index.IndexWriter; +import org.apache.lucene.queryparser.classic.QueryParser; +import org.apache.lucene.search.Collector; +import org.apache.lucene.search.IndexSearcher; +import org.apache.lucene.search.Query; +import org.apache.lucene.search.SearcherManager; +import org.apache.pinot.core.common.Predicate; +import org.apache.pinot.core.common.predicate.TextMatchPredicate; +import org.apache.pinot.core.segment.creator.impl.inv.text.LuceneTextIndexCreator; +import org.apache.pinot.core.segment.index.readers.InvertedIndexReader; +import org.apache.pinot.core.segment.index.readers.text.LuceneDocIdCollector; +import org.apache.pinot.core.segment.index.readers.text.LuceneIndexSearcherReferenceManager; +import org.apache.pinot.core.segment.index.readers.text.LuceneTextIndexReader; +import org.roaringbitmap.buffer.MutableRoaringBitmap; + + +/** + * Lucene text index reader supporting near realtime search. An instance of this + * is created per consuming segment by {@link org.apache.pinot.core.indexsegment.mutable.MutableSegmentImpl}. + * Internally it uses {@link LuceneTextIndexCreator} for adding documents to the lucene index + * as and when they are indexed by the consuming segment. + */ +public class RealtimeLuceneTextIndexReader implements InvertedIndexReader { + private final QueryParser _queryParser; + private final LuceneTextIndexCreator _indexCreator; + private volatile SearcherManager _searcherManager; Review comment: Why is this volatile?
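Some context for the question. `volatile` is only needed if the reference is reassigned after construction and then read by other threads without some other happens-before edge. A field assigned exactly once in the constructor can simply be `final`. The sketch below contrasts the two cases. `Searcher` is a stand-in type, not Lucene's `SearcherManager`, and the sketch does not say which case applies to the reader in the diff.

```java
import java.util.concurrent.atomic.AtomicInteger;

/** Sketch: when a field needs volatile and when final is enough. Types are stand-ins. */
public class VolatileVsFinal {

  /** Stand-in for an object such as a searcher manager. */
  static final class Searcher {
    final int generation;

    Searcher(int generation) {
      this.generation = generation;
    }
  }

  /** Assigned exactly once in the constructor: final already guarantees safe publication. */
  static final class WriteOnceHolder {
    private final Searcher _searcher;

    WriteOnceHolder(Searcher searcher) {
      _searcher = searcher;
    }

    Searcher get() {
      return _searcher;
    }
  }

  /** Swapped by a background thread while query threads read it: needs volatile for visibility. */
  static final class SwappableHolder {
    private final AtomicInteger _generations = new AtomicInteger();
    private volatile Searcher _searcher = new Searcher(0);

    /** Called by the refresh thread; publishes a new instance. */
    void refresh() {
      _searcher = new Searcher(_generations.incrementAndGet());
    }

    /** Called by query threads; a volatile read sees the latest published instance. */
    Searcher get() {
      return _searcher;
    }
  }
}
```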
[GitHub] [incubator-pinot] mcvsubbu commented on a change in pull request #4993: Support Text column type in Pinot (both offline and realtime)
mcvsubbu commented on a change in pull request #4993: Support Text column type in Pinot (both offline and realtime) URL: https://github.com/apache/incubator-pinot/pull/4993#discussion_r368335582 ## File path: pinot-core/src/main/java/org/apache/pinot/core/realtime/impl/invertedindex/RealtimeLuceneIndexReaderRefreshTask.java ## @@ -0,0 +1,106 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.pinot.core.realtime.impl.invertedindex; + +import java.util.List; +import java.util.concurrent.ConcurrentLinkedQueue; +import org.apache.lucene.search.SearcherManager; +import org.apache.pinot.core.indexsegment.mutable.MutableSegmentImpl; +import org.apache.pinot.core.segment.creator.impl.SegmentColumnarIndexCreator; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + + +/** + * Background thread to refresh the realtime lucene index readers for supporting + * near-realtime text search. The task maintains a queue of realtime segments. + * This queue is global (across all realtime segments of all realtime/hybrid tables). + * + * Each element in the queue is of type + * {@link org.apache.pinot.core.indexsegment.mutable.MutableSegmentImpl.RealtimeLuceneReadersForRealtimeSegment}. 
+ * It encapsulate a lock and all the realtime lucene readers for the particular realtime segment. + * Since text index is also create on a per column basis, there will be as many realtime lucene + * readers as the number of columns with text search enabled. + * + * Between each successive execution of the task, there is a fixed delay (regardless of how long + * each execution took). When the task wakes up, it pick the RealtimeLuceneReadersForRealtimeSegment + * from the head of queue, refresh it's readers and adds this at the tail of queue. + */ +public class RealtimeLuceneIndexReaderRefreshTask implements Runnable { Review comment: Suggest renaming to `RealtimeLuceneIndexReaderRefreshThread`, since "task" has a different meaning in Pinot (see Minion).
[GitHub] [incubator-pinot] mcvsubbu commented on a change in pull request #4993: Support Text column type in Pinot (both offline and realtime)
mcvsubbu commented on a change in pull request #4993: Support Text column type in Pinot (both offline and realtime) URL: https://github.com/apache/incubator-pinot/pull/4993#discussion_r368334759 ## File path: pinot-core/src/main/java/org/apache/pinot/core/indexsegment/mutable/MutableSegmentImpl.java ## @@ -118,6 +131,54 @@ private volatile long _lastIndexedTimeMs = Long.MIN_VALUE; private volatile long _latestIngestionTimeMs = Long.MIN_VALUE; + private static final ScheduledExecutorService _scheduledExecutorService; + private static final ConcurrentLinkedQueue _luceneRealtimeReaders; + private static final ConcurrentHashMap _segmentToRealtimeLuceneReadersMap; + + static { Review comment: This code should go into the segment data manager instead of a static block here.
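A minimal sketch of what moving this out of the static initializer could look like. The executor and the queue become instance state of an owner with an explicit start and shutdown, so they are created and torn down with the server rather than at class-load time. The rotation follows the javadoc in the diff: take the head of the queue, refresh it, and put it back at the tail. `RealtimeTextIndexRefreshManager` and its methods are hypothetical names.

```java
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;

/** Sketch: refresh executor owned by an object with a lifecycle. Names are hypothetical. */
public class RealtimeTextIndexRefreshManager {
  private final long _delayMs;
  private final ScheduledExecutorService _executor;
  private final ConcurrentLinkedQueue<Runnable> _toRefresh = new ConcurrentLinkedQueue<>();

  public RealtimeTextIndexRefreshManager(long delayMs) {
    _delayMs = delayMs;
    _executor = Executors.newSingleThreadScheduledExecutor(runnable -> {
      Thread thread = new Thread(runnable, "realtime-text-index-refresh");
      thread.setDaemon(true); // never keeps the JVM alive on its own
      return thread;
    });
  }

  /** Starts periodic refreshing; called when the owning data manager starts. */
  public void start() {
    _executor.scheduleWithFixedDelay(this::refreshOne, _delayMs, _delayMs, TimeUnit.MILLISECONDS);
  }

  /** Registers one unit of refresh work, for example one segment's readers. */
  public void register(Runnable refreshAction) {
    _toRefresh.add(refreshAction);
  }

  /** Takes the head of the queue, refreshes it and puts it back at the tail. */
  private void refreshOne() {
    Runnable next = _toRefresh.poll();
    if (next == null) {
      return;
    }
    try {
      next.run();
    } catch (RuntimeException e) {
      // An uncaught exception would suppress all later runs of a scheduled task;
      // real code would log it here.
    } finally {
      _toRefresh.add(next);
    }
  }

  /** Stops the background thread; called when the owning data manager shuts down. */
  public void shutDown() {
    _executor.shutdownNow();
    try {
      _executor.awaitTermination(5, TimeUnit.SECONDS);
    } catch (InterruptedException e) {
      Thread.currentThread().interrupt(); // preserve the interrupt for the caller
    }
  }

  public boolean isShutDown() {
    return _executor.isShutdown();
  }
}
```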
[GitHub] [incubator-pinot] mcvsubbu commented on a change in pull request #4993: Support Text column type in Pinot (both offline and realtime)
mcvsubbu commented on a change in pull request #4993: Support Text column type in Pinot (both offline and realtime) URL: https://github.com/apache/incubator-pinot/pull/4993#discussion_r368335707 ## File path: pinot-core/src/main/java/org/apache/pinot/core/realtime/impl/invertedindex/RealtimeLuceneIndexReaderRefreshTask.java ## @@ -0,0 +1,106 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.pinot.core.realtime.impl.invertedindex; + +import java.util.List; +import java.util.concurrent.ConcurrentLinkedQueue; +import org.apache.lucene.search.SearcherManager; +import org.apache.pinot.core.indexsegment.mutable.MutableSegmentImpl; +import org.apache.pinot.core.segment.creator.impl.SegmentColumnarIndexCreator; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + + +/** + * Background thread to refresh the realtime lucene index readers for supporting + * near-realtime text search. The task maintains a queue of realtime segments. + * This queue is global (across all realtime segments of all realtime/hybrid tables). + * + * Each element in the queue is of type + * {@link org.apache.pinot.core.indexsegment.mutable.MutableSegmentImpl.RealtimeLuceneReadersForRealtimeSegment}. 
+ * It encapsulate a lock and all the realtime lucene readers for the particular realtime segment. + * Since text index is also create on a per column basis, there will be as many realtime lucene + * readers as the number of columns with text search enabled. + * + * Between each successive execution of the task, there is a fixed delay (regardless of how long + * each execution took). When the task wakes up, it pick the RealtimeLuceneReadersForRealtimeSegment + * from the head of queue, refresh it's readers and adds this at the tail of queue. + */ +public class RealtimeLuceneIndexReaderRefreshTask implements Runnable { + private static final Logger LOGGER = LoggerFactory.getLogger(SegmentColumnarIndexCreator.class); + public static final int INITIAL_DELAY_MS_DEFAULT = 1000; + // TODO: make this configurable and choose a higher default value + public static final int DELAY_BETWEEN_SUCCESSIVE_EXECUTION_MS_DEFAULT = 10; Review comment: The default should not be this value. In fact, the default should be to disable the task. Ideally, the thread should be scheduled only when at least one text column has been added.
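A minimal sketch of the lazy-start behaviour suggested here. It assumes each consuming segment reports how many TEXT columns it has when it is created. No thread exists until the first such segment appears, and the delay is passed in (for example, from configuration) instead of a hard-coded 10 ms default. `LazyTextIndexRefresher` is a hypothetical name.

```java
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;

/** Sketch: start the refresh thread only once a TEXT column exists. Names are hypothetical. */
public class LazyTextIndexRefresher {
  private final long _delayMs;
  private final Runnable _refreshTask;
  private ScheduledExecutorService _executor; // guarded by "this"; null until first needed

  public LazyTextIndexRefresher(long delayMs, Runnable refreshTask) {
    _delayMs = delayMs;
    _refreshTask = refreshTask;
  }

  /** Called when a consuming segment is created. */
  public synchronized void onSegmentCreated(int numTextColumns) {
    if (numTextColumns == 0 || _executor != null) {
      return; // no text columns, or the single refresh thread is already running
    }
    _executor = Executors.newSingleThreadScheduledExecutor(runnable -> {
      Thread thread = new Thread(runnable, "realtime-text-index-refresh");
      thread.setDaemon(true);
      return thread;
    });
    _executor.scheduleWithFixedDelay(_refreshTask, _delayMs, _delayMs, TimeUnit.MILLISECONDS);
  }

  public synchronized boolean isStarted() {
    return _executor != null;
  }

  public synchronized void shutDown() {
    if (_executor != null) {
      _executor.shutdownNow();
    }
  }
}
```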
[GitHub] [incubator-pinot] mcvsubbu commented on a change in pull request #4993: Support Text column type in Pinot (both offline and realtime)
mcvsubbu commented on a change in pull request #4993: Support Text column type in Pinot (both offline and realtime) URL: https://github.com/apache/incubator-pinot/pull/4993#discussion_r368335026 ## File path: pinot-core/src/main/java/org/apache/pinot/core/indexsegment/mutable/MutableSegmentImpl.java ## @@ -118,6 +131,54 @@ private volatile long _lastIndexedTimeMs = Long.MIN_VALUE; private volatile long _latestIngestionTimeMs = Long.MIN_VALUE; + private static final ScheduledExecutorService _scheduledExecutorService; + private static final ConcurrentLinkedQueue _luceneRealtimeReaders; + private static final ConcurrentHashMap _segmentToRealtimeLuceneReadersMap; + + static { +_scheduledExecutorService = Executors.newSingleThreadScheduledExecutor(); +_luceneRealtimeReaders = new ConcurrentLinkedQueue<>(); +_segmentToRealtimeLuceneReadersMap = new ConcurrentHashMap<>(); +_scheduledExecutorService.scheduleWithFixedDelay(new RealtimeLuceneIndexReaderRefreshTask(_luceneRealtimeReaders), +RealtimeLuceneIndexReaderRefreshTask.INITIAL_DELAY_MS_DEFAULT, RealtimeLuceneIndexReaderRefreshTask.DELAY_BETWEEN_SUCCESSIVE_EXECUTION_MS_DEFAULT, TimeUnit.MILLISECONDS); + } + + /** + * Since the text index is maintained per TEXT column (similar to other Pinot indexes), + * there could be multiple lucene indexes for a given segment and therefore there can be + * multiple realtime lucene readers (one for each index/column) for the particular + * realtime segment. + */ + public static class RealtimeLuceneReadersForRealtimeSegment { +private final String segmentName; +private final Lock lock; +private volatile boolean segmentAboutToBeDestroyed; +private final List realtimeLuceneReaders; Review comment: I guess we need to handle the case when a text column is added to a consuming segment and a reload is issued. See PR 4954, which is in the works. Lists are not thread-safe, but whether that matters depends on how we handle this.
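One possible way to let the per-segment reader list tolerate a reload that adds a TEXT column while the refresh thread is iterating it. This sketch assumes additions are rare and iteration is frequent, which makes `CopyOnWriteArrayList` a good fit: each iteration works on a stable snapshot. `SegmentTextReaders` and `Reader` are hypothetical stand-ins. Whether this approach fits depends on how the reload path ends up working.

```java
import java.util.List;
import java.util.concurrent.CopyOnWriteArrayList;

/** Sketch: reader list that can grow on reload while being iterated. Names are hypothetical. */
public class SegmentTextReaders {

  /** Stand-in for a per-column realtime text index reader. */
  public interface Reader {
    void refresh();
  }

  // Copy-on-write: each write copies the array, each iteration sees a snapshot.
  private final List<Reader> _readers = new CopyOnWriteArrayList<>();

  /** Called on segment creation, or on reload when a new text column is added. */
  public void addReader(Reader reader) {
    _readers.add(reader);
  }

  /** Called by the refresh thread; never throws ConcurrentModificationException. */
  public int refreshAll() {
    int refreshed = 0;
    for (Reader reader : _readers) {
      reader.refresh();
      refreshed++;
    }
    return refreshed;
  }
}
```

In the usage below, a reader added during iteration is picked up on the next pass rather than the current one. With a plain `ArrayList`, the same sequence would throw `ConcurrentModificationException`.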
[GitHub] [incubator-pinot] mcvsubbu commented on a change in pull request #4990: Enhance TableRebalancer to be able to rebalance table under any condition
mcvsubbu commented on a change in pull request #4990: Enhance TableRebalancer to be able to rebalance table under any condition URL: https://github.com/apache/incubator-pinot/pull/4990#discussion_r368332007 ## File path: pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/rebalance/TableRebalancer.java ## @@ -175,67 +166,127 @@ public RebalanceResult rebalance(TableConfig tableConfig, Configuration rebalanc if (currentAssignment.equals(targetAssignment)) { LOGGER.info("Table: {} is already balanced", tableNameWithType); if (reassignInstances) { -return new RebalanceResult(RebalanceResult.Status.DONE, "Instance reassigned, table is already balanced", -instancePartitionsMap, targetAssignment); +if (dryRun) { + return new RebalanceResult(RebalanceResult.Status.DONE, + "Instance reassigned in dry-run mode, table is already balanced", instancePartitionsMap, + targetAssignment); +} else { + return new RebalanceResult(RebalanceResult.Status.DONE, "Instance reassigned, table is already balanced", + instancePartitionsMap, targetAssignment); +} } else { return new RebalanceResult(RebalanceResult.Status.NO_OP, "Table is already balanced", instancePartitionsMap, targetAssignment); } } if (dryRun) { - LOGGER.info("Rebalance table: {} in dry-run mode, returning the target assignment", tableNameWithType); + LOGGER.info("Rebalancing table: {} in dry-run mode, returning the target assignment", tableNameWithType); return new RebalanceResult(RebalanceResult.Status.DONE, "Dry-run mode", instancePartitionsMap, targetAssignment); } -int minAvailableReplicas; -if (rebalanceConfig.getBoolean(RebalanceConfigConstants.DOWNTIME, RebalanceConfigConstants.DEFAULT_DOWNTIME)) { - minAvailableReplicas = 0; +if (!downtime && !currentIdealState.isEnabled()) { + LOGGER.warn("Table: {} is disabled, rebalancing it with downtime", tableNameWithType); Review comment: (1) If the table is disabled, it is already down, so maybe just say that we are moving segment assignment? 
(2) Why is this a warning? It can be an info. (3) We should return this information in the status as well.
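A minimal sketch of point (3). It assumes the decision is surfaced in the returned result rather than only logged, so the "disabled table implies downtime" case gets its own description. `DowntimeDecision` and its `Result` class are hypothetical and far simpler than Pinot's `RebalanceResult`.

```java
/** Sketch: surface the forced-downtime decision to the caller. Names are hypothetical. */
public class DowntimeDecision {

  /** Hypothetical, minimal result type. */
  public static final class Result {
    public final boolean downtime;
    public final String description;

    Result(boolean downtime, String description) {
      this.downtime = downtime;
      this.description = description;
    }
  }

  public static Result decide(boolean downtimeRequested, boolean tableEnabled) {
    if (!tableEnabled && !downtimeRequested) {
      // The table is already down, so this only moves the segment assignment;
      // report that to the caller instead of only logging a warning.
      return new Result(true, "Table is disabled, moving segment assignment with downtime");
    }
    return new Result(downtimeRequested,
        downtimeRequested ? "Rebalancing with downtime" : "Rebalancing without downtime");
  }
}
```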
[GitHub] [incubator-pinot] mcvsubbu commented on a change in pull request #4990: Enhance TableRebalancer to be able to rebalance table under any condition
mcvsubbu commented on a change in pull request #4990: Enhance TableRebalancer to be able to rebalance table under any condition URL: https://github.com/apache/incubator-pinot/pull/4990#discussion_r368332729 ## File path: pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/rebalance/TableRebalancer.java ## @@ -175,67 +166,127 @@ public RebalanceResult rebalance(TableConfig tableConfig, Configuration rebalanc if (currentAssignment.equals(targetAssignment)) { LOGGER.info("Table: {} is already balanced", tableNameWithType); if (reassignInstances) { -return new RebalanceResult(RebalanceResult.Status.DONE, "Instance reassigned, table is already balanced", -instancePartitionsMap, targetAssignment); +if (dryRun) { + return new RebalanceResult(RebalanceResult.Status.DONE, + "Instance reassigned in dry-run mode, table is already balanced", instancePartitionsMap, + targetAssignment); +} else { + return new RebalanceResult(RebalanceResult.Status.DONE, "Instance reassigned, table is already balanced", + instancePartitionsMap, targetAssignment); +} } else { return new RebalanceResult(RebalanceResult.Status.NO_OP, "Table is already balanced", instancePartitionsMap, targetAssignment); } } if (dryRun) { - LOGGER.info("Rebalance table: {} in dry-run mode, returning the target assignment", tableNameWithType); + LOGGER.info("Rebalancing table: {} in dry-run mode, returning the target assignment", tableNameWithType); return new RebalanceResult(RebalanceResult.Status.DONE, "Dry-run mode", instancePartitionsMap, targetAssignment); } -int minAvailableReplicas; -if (rebalanceConfig.getBoolean(RebalanceConfigConstants.DOWNTIME, RebalanceConfigConstants.DEFAULT_DOWNTIME)) { - minAvailableReplicas = 0; +if (!downtime && !currentIdealState.isEnabled()) { + LOGGER.warn("Table: {} is disabled, rebalancing it with downtime", tableNameWithType); + downtime = true; +} + +if (downtime) { LOGGER.info("Rebalancing table: {} with downtime", tableNameWithType); -} else { - 
minAvailableReplicas = rebalanceConfig.getInt(RebalanceConfigConstants.MIN_REPLICAS_TO_KEEP_UP_FOR_NO_DOWNTIME, - RebalanceConfigConstants.DEFAULT_MIN_REPLICAS_TO_KEEP_UP_FOR_NO_DOWNTIME); - int numCurrentReplicas = currentAssignment.values().iterator().next().size(); - int numTargetReplicas = targetAssignment.values().iterator().next().size(); - // Use the smaller one to determine the min available replicas - int numReplicas = Math.min(numCurrentReplicas, numTargetReplicas); - if (minAvailableReplicas > 0) { -if (minAvailableReplicas >= numReplicas) { - LOGGER.warn( - "Illegal config for min available replicas: {} for table: {}, must be less than number of replicas (current: {}, target: {})", - minAvailableReplicas, tableNameWithType, numCurrentReplicas, numTargetReplicas); - return new RebalanceResult(RebalanceResult.Status.FAILED, "Illegal min available replicas config", + + while (true) { +// Reuse current IdealState to update the IdealState in cluster +ZNRecord idealStateRecord = currentIdealState.getRecord(); +idealStateRecord.setMapFields(targetAssignment); +currentIdealState.setNumPartitions(targetAssignment.size()); + currentIdealState.setReplicas(Integer.toString(targetAssignment.values().iterator().next().size())); + +// Check version and update IdealState +try { + Preconditions.checkState(_helixDataAccessor.getBaseDataAccessor() + .set(idealStatePropertyKey.getPath(), idealStateRecord, idealStateRecord.getVersion(), + AccessOption.PERSISTENT), "Failed to update IdealState"); + LOGGER.info("Finished rebalancing table: {} in {}ms.", tableNameWithType, + System.currentTimeMillis() - startTimeMs); + return new RebalanceResult(RebalanceResult.Status.DONE, "Success", instancePartitionsMap, targetAssignment); +} catch (ZkBadVersionException e) { + LOGGER.info("IdealState version changed for table: {}, re-calculating the target assignment", + tableNameWithType); + try { +IdealState idealState = _helixDataAccessor.getProperty(idealStatePropertyKey); +// 
IdealState might be null if table got deleted, throwing exception to abort the rebalance +Preconditions.checkState(idealState != null, "Failed to find the IdealState"); +currentIdealState = idealState; +currentAssignment = currentIdealState.getRecord().getMapFields(); +targetAssignment = +segmentAssignment.rebalanceTable(currentAssignment, instancePartitionsMap, rebalanceConfig);
[GitHub] [incubator-pinot] mcvsubbu commented on a change in pull request #4990: Enhance TableRebalancer to be able to rebalance table under any condition
mcvsubbu commented on a change in pull request #4990: Enhance TableRebalancer to be able to rebalance table under any condition URL: https://github.com/apache/incubator-pinot/pull/4990#discussion_r368331774 ## File path: pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/rebalance/TableRebalancer.java ## @@ -175,67 +166,127 @@ public RebalanceResult rebalance(TableConfig tableConfig, Configuration rebalanc if (currentAssignment.equals(targetAssignment)) { LOGGER.info("Table: {} is already balanced", tableNameWithType); if (reassignInstances) { -return new RebalanceResult(RebalanceResult.Status.DONE, "Instance reassigned, table is already balanced", -instancePartitionsMap, targetAssignment); +if (dryRun) { + return new RebalanceResult(RebalanceResult.Status.DONE, + "Instance reassigned in dry-run mode, table is already balanced", instancePartitionsMap, + targetAssignment); +} else { + return new RebalanceResult(RebalanceResult.Status.DONE, "Instance reassigned, table is already balanced", + instancePartitionsMap, targetAssignment); +} } else { return new RebalanceResult(RebalanceResult.Status.NO_OP, "Table is already balanced", instancePartitionsMap, targetAssignment); } } if (dryRun) { - LOGGER.info("Rebalance table: {} in dry-run mode, returning the target assignment", tableNameWithType); + LOGGER.info("Rebalancing table: {} in dry-run mode, returning the target assignment", tableNameWithType); Review comment: ```suggestion LOGGER.info("Rebalancing table {} in dry-run mode, returning the target assignment", tableNameWithType); ```
[GitHub] [incubator-pinot] mcvsubbu commented on a change in pull request #4990: Enhance TableRebalancer to be able to rebalance table under any condition
mcvsubbu commented on a change in pull request #4990: Enhance TableRebalancer to be able to rebalance table under any condition URL: https://github.com/apache/incubator-pinot/pull/4990#discussion_r368333237 ## File path: pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/rebalance/TableRebalancer.java ## @@ -307,32 +358,32 @@ private InstancePartitions getInstancePartitions(TableConfig tableConfig, } private IdealState waitForExternalViewToConverge(String tableNameWithType) - throws InterruptedException, TimeoutException { + throws InterruptedException { long endTimeMs = System.currentTimeMillis() + EXTERNAL_VIEW_STABILIZATION_MAX_WAIT_MS; -while (System.currentTimeMillis() < endTimeMs) { - IdealState idealState = - _helixDataAccessor.getProperty(_helixDataAccessor.keyBuilder().idealStates(tableNameWithType)); +IdealState idealState; +do { + idealState = _helixDataAccessor.getProperty(_helixDataAccessor.keyBuilder().idealStates(tableNameWithType)); // IdealState might be null if table got deleted, throwing exception to abort the rebalance Preconditions.checkState(idealState != null, "Failed to find the IdealState"); ExternalView externalView = _helixDataAccessor.getProperty(_helixDataAccessor.keyBuilder().externalView(tableNameWithType)); // ExternalView might be null when table is just created, skipping check for this iteration if (externalView != null) { -Map<String, Map<String, String>> externalViewSegmentStates = externalView.getRecord().getMapFields(); -if (isExternalViewConverged(externalViewSegmentStates, idealState.getRecord().getMapFields())) { +if (isExternalViewConverged(tableNameWithType, externalView.getRecord().getMapFields(), +idealState.getRecord().getMapFields())) { + LOGGER.info("ExternalView converged for table: {}", tableNameWithType); return idealState; } -if (hasSegmentInErrorState(externalViewSegmentStates)) { - throw new IllegalStateException("Found segments in ERROR state"); -} } Thread.sleep(EXTERNAL_VIEW_CHECK_INTERVAL_MS); -} +} while
(System.currentTimeMillis() < endTimeMs); -throw new TimeoutException("Timeout while waiting for ExternalView to converge"); +LOGGER.warn("ExternalView haven't converged within: {}ms for table: {}, continuing the rebalance", Review comment: ```suggestion LOGGER.warn("ExternalView has not converged within: {}ms for table: {}, continuing the rebalance", ```
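The polling loop in this hunk can be sketched in isolation, assuming a generic convergence predicate. The `do { ... } while` form checks at least once even with a zero time budget. A timeout is reported to the caller instead of being thrown, which matches the new "continuing the rebalance" behaviour. `ConvergenceWaiter.waitUntil` is a hypothetical helper, not part of the PR.

```java
import java.util.function.BooleanSupplier;

/** Sketch of a deadline-bounded polling loop. Names are hypothetical. */
public class ConvergenceWaiter {

  /**
   * Polls until converged() returns true or maxWaitMs elapses.
   * Returns true on convergence, false on timeout (the caller decides whether to continue).
   */
  public static boolean waitUntil(BooleanSupplier converged, long maxWaitMs, long checkIntervalMs)
      throws InterruptedException {
    long endTimeMs = System.currentTimeMillis() + maxWaitMs;
    do {
      if (converged.getAsBoolean()) {
        return true;
      }
      Thread.sleep(checkIntervalMs);
    } while (System.currentTimeMillis() < endTimeMs);
    return false;
  }
}
```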
[GitHub] [incubator-pinot] mcvsubbu commented on a change in pull request #4990: Enhance TableRebalancer to be able to rebalance table under any condition
mcvsubbu commented on a change in pull request #4990: Enhance TableRebalancer to be able to rebalance table under any condition URL: https://github.com/apache/incubator-pinot/pull/4990#discussion_r368331670 ## File path: pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/rebalance/TableRebalancer.java ## @@ -175,67 +166,127 @@ public RebalanceResult rebalance(TableConfig tableConfig, Configuration rebalanc if (currentAssignment.equals(targetAssignment)) { LOGGER.info("Table: {} is already balanced", tableNameWithType); if (reassignInstances) { -return new RebalanceResult(RebalanceResult.Status.DONE, "Instance reassigned, table is already balanced", -instancePartitionsMap, targetAssignment); +if (dryRun) { + return new RebalanceResult(RebalanceResult.Status.DONE, + "Instance reassigned in dry-run mode, table is already balanced", instancePartitionsMap, Review comment: Can you elaborate on what instance reassignment means? It may be best to add this to the comments on RebalanceConfigConstants. We also need to document it along with the rebalance command. Let me know if there is a doc already, and I can read it.
[GitHub] [incubator-pinot] mcvsubbu commented on a change in pull request #4990: Enhance TableRebalancer to be able to rebalance table under any condition
mcvsubbu commented on a change in pull request #4990: Enhance TableRebalancer to be able to rebalance table under any condition URL: https://github.com/apache/incubator-pinot/pull/4990#discussion_r368333414 ## File path: pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/rebalance/TableRebalancer.java ## @@ -420,10 +463,19 @@ static boolean isExternalViewConverged(Map<String, Map<String, String>> external Map<String, String> idealStateInstanceStateMap = entry.getValue(); for (Map.Entry<String, String> instanceStateEntry : idealStateInstanceStateMap.entrySet()) { // Ignore OFFLINE state in IdealState -String state = instanceStateEntry.getValue(); -if (!state.equals(SegmentOnlineOfflineStateModel.OFFLINE) && !state - .equals(externalViewInstanceStateMap.get(instanceStateEntry.getKey()))) { - return false; +String idealStateInstanceState = instanceStateEntry.getValue(); +if (!idealStateInstanceState.equals(RealtimeSegmentOnlineOfflineStateModel.OFFLINE)) { Review comment: Is the condition inverted?
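To help check the question, here is a minimal sketch of the rule stated in the diff's own comment ("Ignore OFFLINE state in IdealState"), assuming plain string states. Instances that are OFFLINE in the IdealState are skipped, and every other instance must report the same state in the ExternalView. Under that rule, the comparison belongs in the non-OFFLINE branch. `ConvergenceCheck` is hypothetical, and the ERROR-state handling of the real method is omitted.

```java
import java.util.Map;

/** Sketch of the "ignore OFFLINE in IdealState" convergence rule. Names are hypothetical. */
public class ConvergenceCheck {
  static final String OFFLINE = "OFFLINE";

  /** Both maps are segment name to (instance name to state). */
  public static boolean isConverged(Map<String, Map<String, String>> externalView,
      Map<String, Map<String, String>> idealState) {
    for (Map.Entry<String, Map<String, String>> segment : idealState.entrySet()) {
      Map<String, String> evInstanceStates = externalView.get(segment.getKey());
      for (Map.Entry<String, String> instance : segment.getValue().entrySet()) {
        String expected = instance.getValue();
        if (expected.equals(OFFLINE)) {
          continue; // OFFLINE in IdealState: nothing to wait for
        }
        String actual = (evInstanceStates == null) ? null : evInstanceStates.get(instance.getKey());
        if (!expected.equals(actual)) {
          return false; // this instance has not reached its target state yet
        }
      }
    }
    return true;
  }
}
```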
[GitHub] [incubator-pinot] mcvsubbu commented on a change in pull request #4990: Enhance TableRebalancer to be able to rebalance table under any condition
mcvsubbu commented on a change in pull request #4990: Enhance TableRebalancer to be able to rebalance table under any condition URL: https://github.com/apache/incubator-pinot/pull/4990#discussion_r368332133 ## File path: pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/rebalance/TableRebalancer.java ## @@ -175,67 +166,127 @@ public RebalanceResult rebalance(TableConfig tableConfig, Configuration rebalanc if (currentAssignment.equals(targetAssignment)) { LOGGER.info("Table: {} is already balanced", tableNameWithType); if (reassignInstances) { -return new RebalanceResult(RebalanceResult.Status.DONE, "Instance reassigned, table is already balanced", -instancePartitionsMap, targetAssignment); +if (dryRun) { + return new RebalanceResult(RebalanceResult.Status.DONE, + "Instance reassigned in dry-run mode, table is already balanced", instancePartitionsMap, + targetAssignment); +} else { + return new RebalanceResult(RebalanceResult.Status.DONE, "Instance reassigned, table is already balanced", + instancePartitionsMap, targetAssignment); +} } else { return new RebalanceResult(RebalanceResult.Status.NO_OP, "Table is already balanced", instancePartitionsMap, targetAssignment); } } if (dryRun) { - LOGGER.info("Rebalance table: {} in dry-run mode, returning the target assignment", tableNameWithType); + LOGGER.info("Rebalancing table: {} in dry-run mode, returning the target assignment", tableNameWithType); return new RebalanceResult(RebalanceResult.Status.DONE, "Dry-run mode", instancePartitionsMap, targetAssignment); } -int minAvailableReplicas; -if (rebalanceConfig.getBoolean(RebalanceConfigConstants.DOWNTIME, RebalanceConfigConstants.DEFAULT_DOWNTIME)) { - minAvailableReplicas = 0; +if (!downtime && !currentIdealState.isEnabled()) { + LOGGER.warn("Table: {} is disabled, rebalancing it with downtime", tableNameWithType); + downtime = true; +} + +if (downtime) { LOGGER.info("Rebalancing table: {} with downtime", tableNameWithType); -} else { - 
minAvailableReplicas = rebalanceConfig.getInt(RebalanceConfigConstants.MIN_REPLICAS_TO_KEEP_UP_FOR_NO_DOWNTIME, - RebalanceConfigConstants.DEFAULT_MIN_REPLICAS_TO_KEEP_UP_FOR_NO_DOWNTIME); - int numCurrentReplicas = currentAssignment.values().iterator().next().size(); - int numTargetReplicas = targetAssignment.values().iterator().next().size(); - // Use the smaller one to determine the min available replicas - int numReplicas = Math.min(numCurrentReplicas, numTargetReplicas); - if (minAvailableReplicas > 0) { -if (minAvailableReplicas >= numReplicas) { - LOGGER.warn( - "Illegal config for min available replicas: {} for table: {}, must be less than number of replicas (current: {}, target: {})", - minAvailableReplicas, tableNameWithType, numCurrentReplicas, numTargetReplicas); - return new RebalanceResult(RebalanceResult.Status.FAILED, "Illegal min available replicas config", + + while (true) { Review comment: Is it useful to extract this part into a method (provided it has a reasonable number of arguments)? Similarly for the else part.
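The extraction suggested above could look roughly like the following. This is a hypothetical sketch, not the PR's code: `MinAvailableReplicasSketch` and `computeMinAvailableReplicas` are invented names, assignments are modeled as plain nested maps, and only the checks visible in the hunk are reproduced. The hunk is cut off before the handling of non-positive values, so the sketch passes them through unchanged as an assumption. An empty result stands in for the `FAILED` return.

```java
import java.util.Map;
import java.util.OptionalInt;

// Hypothetical helper illustrating the suggested extraction; names and signature are illustrative only.
final class MinAvailableReplicasSketch {
  private MinAvailableReplicasSketch() {
  }

  /**
   * Returns the number of replicas to keep available during the rebalance, or an empty
   * result when the configured value is illegal. Assignments map segment -> (instance -> state).
   */
  static OptionalInt computeMinAvailableReplicas(boolean downtime, int configuredMinAvailableReplicas,
      Map<String, Map<String, String>> currentAssignment, Map<String, Map<String, String>> targetAssignment) {
    if (downtime) {
      // Rebalancing with downtime: no replica has to stay up.
      return OptionalInt.of(0);
    }
    int numCurrentReplicas = currentAssignment.values().iterator().next().size();
    int numTargetReplicas = targetAssignment.values().iterator().next().size();
    // Use the smaller one to determine the min available replicas, as in the original code.
    int numReplicas = Math.min(numCurrentReplicas, numTargetReplicas);
    if (configuredMinAvailableReplicas > 0 && configuredMinAvailableReplicas >= numReplicas) {
      // Illegal: must be less than the number of replicas. The caller would turn this into FAILED.
      return OptionalInt.empty();
    }
    // Assumption: non-positive values are passed through; the hunk does not show their handling.
    return OptionalInt.of(configuredMinAvailableReplicas);
  }
}
```

Keeping logging and `RebalanceResult` construction in the caller keeps the helper's argument list short, which is the condition the comment attaches to the extraction.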
[GitHub] [incubator-pinot] mcvsubbu commented on a change in pull request #4990: Enhance TableRebalancer to be able to rebalance table under any condition
mcvsubbu commented on a change in pull request #4990: Enhance TableRebalancer to be able to rebalance table under any condition URL: https://github.com/apache/incubator-pinot/pull/4990#discussion_r368333800 ## File path: pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/rebalance/TableRebalancer.java ## @@ -420,10 +463,19 @@ static boolean isExternalViewConverged(Map> external Map idealStateInstanceStateMap = entry.getValue(); for (Map.Entry instanceStateEntry : idealStateInstanceStateMap.entrySet()) { // Ignore OFFLINE state in IdealState -String state = instanceStateEntry.getValue(); -if (!state.equals(SegmentOnlineOfflineStateModel.OFFLINE) && !state - .equals(externalViewInstanceStateMap.get(instanceStateEntry.getKey( { - return false; +String idealStateInstanceState = instanceStateEntry.getValue(); +if (!idealStateInstanceState.equals(RealtimeSegmentOnlineOfflineStateModel.OFFLINE)) { + continue; +} +String instanceName = instanceStateEntry.getKey(); +String externalViewInstanceState = externalViewInstanceStateMap.get(instanceName); +if (!idealStateInstanceState.equals(externalViewInstanceState)) { + if (RealtimeSegmentOnlineOfflineStateModel.ERROR.equals(externalViewInstanceState)) { +LOGGER.warn("Found ERROR instance: {} for segment: {}, table: {}", instanceName, segmentName, Review comment: I think a warning is better. The server instance, when it transitioned into the ERROR state, would have logged an error anyway. It is not an error in the controller if it discovers that a server was malfunctioning. On another (but related) note, I would avoid logging strings that contain "ERROR". It is quite irritating to see these come up when we see some problem and want to get all the error logs (that is, logs that have been logged with a LOGGER.error() statement). I am not sure we can do much here, but it is something to note.
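For reference, a minimal sketch of this check with the OFFLINE skip written the non-inverted way (the question raised in the earlier comment on this hunk) and the ERROR case logged as a warning. It is self-contained and hypothetical: states are plain strings, `java.util.logging` stands in for the SLF4J logger, and returning `false` after the warning is an assumption because the hunk ends at the log call. The warning text also avoids the literal word "ERROR", per the note about grepping error logs.

```java
import java.util.Map;
import java.util.logging.Logger;

// Hypothetical sketch of isExternalViewConverged(); maps are segment -> (instance -> state).
final class ExternalViewConvergenceSketch {
  private static final Logger LOGGER = Logger.getLogger(ExternalViewConvergenceSketch.class.getName());
  static final String OFFLINE = "OFFLINE";
  static final String ERROR = "ERROR";

  private ExternalViewConvergenceSketch() {
  }

  static boolean isExternalViewConverged(Map<String, Map<String, String>> externalView,
      Map<String, Map<String, String>> idealState) {
    for (Map.Entry<String, Map<String, String>> entry : idealState.entrySet()) {
      String segmentName = entry.getKey();
      Map<String, String> externalViewInstanceStateMap = externalView.get(segmentName);
      for (Map.Entry<String, String> instanceStateEntry : entry.getValue().entrySet()) {
        String idealStateInstanceState = instanceStateEntry.getValue();
        // Ignore OFFLINE state in IdealState: skip exactly the OFFLINE entries (no negation).
        if (idealStateInstanceState.equals(OFFLINE)) {
          continue;
        }
        String instanceName = instanceStateEntry.getKey();
        String externalViewInstanceState =
            externalViewInstanceStateMap == null ? null : externalViewInstanceStateMap.get(instanceName);
        if (!idealStateInstanceState.equals(externalViewInstanceState)) {
          if (ERROR.equals(externalViewInstanceState)) {
            // A warning, not an error: the server has already logged its own failure.
            LOGGER.warning("Instance: " + instanceName + " failed the state transition for segment: " + segmentName);
          }
          // Assumption: a mismatch (including a failed replica) means not converged yet.
          return false;
        }
      }
    }
    return true;
  }
}
```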
[GitHub] [incubator-pinot] mcvsubbu commented on a change in pull request #4979: Update quota manager to reduce zk access
mcvsubbu commented on a change in pull request #4979: Update quota manager to reduce zk access URL: https://github.com/apache/incubator-pinot/pull/4979#discussion_r368329377 ## File path: pinot-broker/src/main/java/org/apache/pinot/broker/queryquota/HelixExternalViewBasedQueryQuotaManager.java ## @@ -323,11 +321,43 @@ public void processQueryQuotaChange() { } int onlineBrokerCount = otherOnlineBrokerCount + 1; - double overallRate = Double.parseDouble(quotaConfig.getMaxQueriesPerSecond()); + // Get stat from property store + String tableConfigPath = constructTableConfigPath(tableNameWithType); + Stat stat = _propertyStore.getStat(tableConfigPath, AccessOption.PERSISTENT); + if (stat == null) { +LOGGER.info("Table {} gets deleted from property store. Removing its rate limit.", tableNameWithType); Review comment:
```suggestion
LOGGER.info("Table {} has been deleted from property store. Removing its rate limit.", tableNameWithType);
```
[GitHub] [incubator-pinot] mcvsubbu commented on a change in pull request #4979: Update quota manager to reduce zk access
mcvsubbu commented on a change in pull request #4979: Update quota manager to reduce zk access URL: https://github.com/apache/incubator-pinot/pull/4979#discussion_r368328494 ## File path: pinot-broker/src/main/java/org/apache/pinot/broker/queryquota/HelixExternalViewBasedQueryQuotaManager.java ## @@ -122,15 +133,11 @@ private void removeRateLimiter(String tableNameWithType) { /** * Get QuotaConfig from property store. - * @param rawTableName table name without table type. - * @param tableType table type: offline or real-time. + * @param tableNameWithType table name with table type. * @return QuotaConfig, which could be null. */ - private QuotaConfig getQuotaConfigFromPropertyStore(String rawTableName, CommonConstants.Helix.TableType tableType) { -ZkHelixPropertyStore propertyStore = _helixManager.getHelixPropertyStore(); - -String tableNameWithType = TableNameBuilder.forType(tableType).tableNameWithType(rawTableName); -TableConfig tableConfig = ZKMetadataProvider.getTableConfig(propertyStore, tableNameWithType); + private QuotaConfig getQuotaConfigFromPropertyStore(String tableNameWithType) { +TableConfig tableConfig = ZKMetadataProvider.getTableConfig(_propertyStore, tableNameWithType); Review comment: I remember some earlier discussions that the property store handle becomes stale if Helix loses its ZK connection. Has that assumption changed (or is my recollection wrong)? The Helix code seems to reset the handle on disconnect() as well.
[GitHub] [incubator-pinot] mcvsubbu commented on a change in pull request #4979: Update quota manager to reduce zk access
mcvsubbu commented on a change in pull request #4979: Update quota manager to reduce zk access URL: https://github.com/apache/incubator-pinot/pull/4979#discussion_r368329637 ## File path: pinot-broker/src/main/java/org/apache/pinot/broker/queryquota/HelixExternalViewBasedQueryQuotaManager.java ## @@ -323,11 +321,43 @@ public void processQueryQuotaChange() { } int onlineBrokerCount = otherOnlineBrokerCount + 1; - double overallRate = Double.parseDouble(quotaConfig.getMaxQueriesPerSecond()); + // Get stat from property store + String tableConfigPath = constructTableConfigPath(tableNameWithType); + Stat stat = _propertyStore.getStat(tableConfigPath, AccessOption.PERSISTENT); + if (stat == null) { +LOGGER.info("Table {} gets deleted from property store. Removing its rate limit.", tableNameWithType); +it.remove(); +continue; + } + + // If number of online brokers and table config don't change, there is no need to re-calculate the dynamic rate. + if (onlineBrokerCount == queryQuotaEntity.getNumOnlineBrokers() && stat.getVersion() == queryQuotaEntity + .getTableConfigStatVersion()) { +continue; Review comment: It may be useful to add a log line here with the version.
[GitHub] [incubator-pinot] mcvsubbu commented on a change in pull request #4979: Update quota manager to reduce zk access
mcvsubbu commented on a change in pull request #4979: Update quota manager to reduce zk access URL: https://github.com/apache/incubator-pinot/pull/4979#discussion_r368328996 ## File path: pinot-broker/src/main/java/org/apache/pinot/broker/queryquota/HelixExternalViewBasedQueryQuotaManager.java ## @@ -39,34 +41,43 @@ import org.apache.pinot.common.metrics.BrokerMetrics; import org.apache.pinot.common.utils.CommonConstants; import org.apache.pinot.common.utils.helix.HelixHelper; +import org.apache.zookeeper.data.Stat; import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import static org.apache.pinot.common.utils.CommonConstants.Helix.BROKER_RESOURCE_INSTANCE; -import static org.apache.pinot.common.utils.CommonConstants.Helix.TableType; - +/** + * This class is to support the qps quota feature. + * It depends on the broker source change to update the dynamic rate limit, + * which means it only gets updated when a new table added or a broker restarted. + * TODO: support adding new rate limiter for existing tables without restarting the broker. 
+ */ public class HelixExternalViewBasedQueryQuotaManager implements ClusterChangeHandler, QueryQuotaManager { private static final Logger LOGGER = LoggerFactory.getLogger(HelixExternalViewBasedQueryQuotaManager.class); private static final int TIME_RANGE_IN_SECOND = 1; private final AtomicInteger _lastKnownBrokerResourceVersion = new AtomicInteger(-1); - private final Map _rateLimiterMap = new ConcurrentHashMap<>(); + private final Map _rateLimiterMap = new ConcurrentHashMap<>(); private HelixManager _helixManager; + private ZkHelixPropertyStore _propertyStore; private BrokerMetrics _brokerMetrics; @Override public void init(HelixManager helixManager) { Preconditions.checkState(_helixManager == null, "HelixExternalViewBasedQueryQuotaManager is already initialized"); _helixManager = helixManager; +_propertyStore = _helixManager.getHelixPropertyStore(); } @Override public void processClusterChange(HelixConstants.ChangeType changeType) { Preconditions .checkState(changeType == HelixConstants.ChangeType.EXTERNAL_VIEW, "Illegal change type: " + changeType); -processQueryQuotaChange(); +ExternalView currentBrokerResource = HelixHelper Review comment: Suggest renaming `currentBrokerResource` to `brokerResourceEV`.
[GitHub] [incubator-pinot] mcvsubbu commented on a change in pull request #4979: Update quota manager to reduce zk access
mcvsubbu commented on a change in pull request #4979: Update quota manager to reduce zk access URL: https://github.com/apache/incubator-pinot/pull/4979#discussion_r368328688 ## File path: pinot-broker/src/main/java/org/apache/pinot/broker/queryquota/HelixExternalViewBasedQueryQuotaManager.java ## @@ -180,10 +187,15 @@ private void createRateLimiter(String tableNameWithType, ExternalView brokerReso return; } +// Get stat from property store +String tableConfigPath = constructTableConfigPath(tableNameWithType); +Stat stat = _propertyStore.getStat(tableConfigPath, AccessOption.PERSISTENT); + double perBrokerRate = overallRate / onlineCount; -QueryQuotaConfig queryQuotaConfig = -new QueryQuotaConfig(RateLimiter.create(perBrokerRate), new HitCounter(TIME_RANGE_IN_SECOND)); -_rateLimiterMap.put(tableNameWithType, queryQuotaConfig); +QueryQuotaEntity queryQuotaEntity = +new QueryQuotaEntity(RateLimiter.create(perBrokerRate), new HitCounter(TIME_RANGE_IN_SECOND), onlineCount, +overallRate, stat.getVersion()); +_rateLimiterMap.put(tableNameWithType, queryQuotaEntity); Review comment: Add the version to the log line below.
[GitHub] [incubator-pinot] mcvsubbu commented on a change in pull request #4979: Update quota manager to reduce zk access
mcvsubbu commented on a change in pull request #4979: Update quota manager to reduce zk access URL: https://github.com/apache/incubator-pinot/pull/4979#discussion_r368330092 ## File path: pinot-broker/src/main/java/org/apache/pinot/broker/queryquota/HelixExternalViewBasedQueryQuotaManager.java ## @@ -323,11 +321,43 @@ public void processQueryQuotaChange() { } int onlineBrokerCount = otherOnlineBrokerCount + 1; - double overallRate = Double.parseDouble(quotaConfig.getMaxQueriesPerSecond()); + // Get stat from property store + String tableConfigPath = constructTableConfigPath(tableNameWithType); + Stat stat = _propertyStore.getStat(tableConfigPath, AccessOption.PERSISTENT); + if (stat == null) { +LOGGER.info("Table {} gets deleted from property store. Removing its rate limit.", tableNameWithType); +it.remove(); +continue; + } + + // If number of online brokers and table config don't change, there is no need to re-calculate the dynamic rate. + if (onlineBrokerCount == queryQuotaEntity.getNumOnlineBrokers() && stat.getVersion() == queryQuotaEntity + .getTableConfigStatVersion()) { +continue; + } + + double overallRate; + // Get latest quota config only if stat don't match. + if (stat.getVersion() != queryQuotaEntity.getTableConfigStatVersion()) { +QuotaConfig quotaConfig = getQuotaConfigFromPropertyStore(tableNameWithType); +if (quotaConfig == null || quotaConfig.getMaxQueriesPerSecond() == null || !quotaConfig +.isMaxQueriesPerSecondValid()) { + LOGGER.info("No query quota config or the config is invalid for Table {}. Removing its rate limit.", + tableNameWithType); + it.remove(); + continue; +} +overallRate = Double.parseDouble(quotaConfig.getMaxQueriesPerSecond()); + } else { +overallRate = queryQuotaEntity.getOverallRate(); + } double latestRate = overallRate / onlineBrokerCount; - double previousRate = queryQuotaConfig.getRateLimiter().getRate(); + double previousRate = queryQuotaEntity.getRateLimiter().getRate(); if (Math.abs(latestRate - previousRate) > 0.001) { -queryQuotaConfig.getRateLimiter().setRate(latestRate); +queryQuotaEntity.getRateLimiter().setRate(latestRate); +queryQuotaEntity.setNumOnlineBrokers(onlineBrokerCount); +queryQuotaEntity.setOverallRate(overallRate); +queryQuotaEntity.setTableConfigStatVersion(stat.getVersion()); Review comment: Even if we don't update the rate due to a small difference, should we not update the tableConfigStatVersion?
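One way to act on this (and on the earlier request to log the version when nothing changed) is to record the observed broker count, overall rate and stat version unconditionally, and gate only the rate update on the 0.001 threshold. A hypothetical sketch: `QuotaState` stands in for `QueryQuotaEntity`, a plain `double` replaces the Guava `RateLimiter`, and the overall rate is passed in rather than read from the property store.

```java
// Hypothetical sketch of the refresh step in processQueryQuotaChange(); not the PR's code.
final class QueryQuotaUpdateSketch {
  private QueryQuotaUpdateSketch() {
  }

  /** Stand-in for QueryQuotaEntity; perBrokerRate replaces the RateLimiter's rate. */
  static final class QuotaState {
    double perBrokerRate;
    int numOnlineBrokers;
    double overallRate;
    int tableConfigStatVersion;

    QuotaState(double perBrokerRate, int numOnlineBrokers, double overallRate, int tableConfigStatVersion) {
      this.perBrokerRate = perBrokerRate;
      this.numOnlineBrokers = numOnlineBrokers;
      this.overallRate = overallRate;
      this.tableConfigStatVersion = tableConfigStatVersion;
    }
  }

  /** Returns true only if the enforced per-broker rate was changed. */
  static boolean refresh(QuotaState state, int onlineBrokerCount, double overallRate, int statVersion) {
    if (onlineBrokerCount == state.numOnlineBrokers && statVersion == state.tableConfigStatVersion) {
      // Nothing changed; a log line with the version could go here.
      return false;
    }
    double latestRate = overallRate / onlineBrokerCount;
    // Record the observed inputs even when the rate change is negligible, so the same
    // stat version does not cause another table config read on the next callback.
    state.numOnlineBrokers = onlineBrokerCount;
    state.overallRate = overallRate;
    state.tableConfigStatVersion = statVersion;
    if (Math.abs(latestRate - state.perBrokerRate) > 0.001) {
      state.perBrokerRate = latestRate;
      return true;
    }
    return false;
  }
}
```

With this ordering, a config edit that bumps the version without changing the effective rate is recorded once, and later callbacks with the same version take the early-return path.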
[GitHub] [incubator-pinot] mcvsubbu commented on a change in pull request #4954: Support schema evolution for consuming segments
mcvsubbu commented on a change in pull request #4954: Support schema evolution for consuming segments URL: https://github.com/apache/incubator-pinot/pull/4954#discussion_r368324340 ## File path: pinot-core/src/main/java/org/apache/pinot/core/indexsegment/mutable/MutableSegmentImpl.java ## @@ -252,6 +258,16 @@ public long getMaxTime() { return _maxTime; } + public void addExtraColumns(Schema newSchema, + Map defaultColumnActionMap) { +defaultColumnActionMap.forEach(((columnName, defaultColumnAction) -> { + if (defaultColumnAction.isAddAction() && !newSchema.getFieldSpecFor(columnName).isVirtualColumn()) { +_newlyAddedColumnsFieldMap.put(columnName, newSchema.getFieldSpecFor(columnName)); + } +})); +_logger.debug("Newly added columns: " + _newlyAddedColumnsFieldMap.toString()); Review comment: This should be an info log.
[GitHub] [incubator-pinot] mcvsubbu commented on a change in pull request #4954: Support schema evolution for consuming segments
mcvsubbu commented on a change in pull request #4954: Support schema evolution for consuming segments URL: https://github.com/apache/incubator-pinot/pull/4954#discussion_r368326552 ## File path: pinot-core/src/main/java/org/apache/pinot/core/io/reader/impl/SameMultiValueInvertedIndex.java ## @@ -0,0 +1,65 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.pinot.core.io.reader.impl; + +import java.io.IOException; +import org.apache.pinot.common.utils.Pairs; +import org.apache.pinot.core.io.reader.BaseSingleColumnMultiValueReader; +import org.apache.pinot.core.io.reader.impl.v1.FixedBitMultiValueReader; + + +/** + * Reader for the multi-value column with the same value + */ +public class SameMultiValueInvertedIndex extends BaseSingleColumnMultiValueReader implements SortedIndexMultiValueReader { Review comment: `Constant` is another alternative.
[GitHub] [incubator-pinot] mcvsubbu commented on a change in pull request #4954: Support schema evolution for consuming segments
mcvsubbu commented on a change in pull request #4954: Support schema evolution for consuming segments URL: https://github.com/apache/incubator-pinot/pull/4954#discussion_r368327053 ## File path: pinot-server/src/main/java/org/apache/pinot/server/starter/helix/HelixInstanceDataManager.java ## @@ -194,7 +198,17 @@ private void reloadSegment(String tableNameWithType, SegmentMetadata segmentMeta File indexDir = segmentMetadata.getIndexDir(); if (indexDir == null) { - LOGGER.info("Skip reloading REALTIME consuming segment: {} in table: {}", segmentName, tableNameWithType); + if (!_instanceDataManagerConfig.shouldReloadConsumingSegment()) { +LOGGER.info("Skip reloading REALTIME consuming segment: {} in table: {}", segmentName, tableNameWithType); +return; + } + LOGGER.info("Try reloading REALTIME consuming segment: {} in table: {}", segmentName, tableNameWithType); + SegmentMetadataImpl segmentMetadataImpl = (SegmentMetadataImpl) segmentMetadata; Review comment: I would bypass the references to the DefaultColumnAction class altogether. Since we have a handle to the mutable index, just call the addExtraColumns API with the new schema. Just make sure that the API behaves well if we call it multiple times, either with the same set of columns or with newer ones than the previous time. I agree with your observation that this _is_ the reload API. It makes me think that the reload API should be implemented as part of the segment API, but that is a HUGE change, and I am not sure what else is involved there (or why it was done this way, as a static method like ImmutableSegment.reload()). So, let us not go into that.
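A minimal sketch of an `addExtraColumns` that takes only the new schema and tolerates repeated calls with the same or a newer set of columns. It is hypothetical: `Column` is a stand-in for `FieldSpec`, the schema is a plain map, and `putIfAbsent` on a `ConcurrentHashMap` is one possible way to get the idempotence asked for here, not necessarily what the PR does.

```java
import java.util.Collections;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;

// Hypothetical sketch; Column is a minimal stand-in for Pinot's FieldSpec, not the real class.
final class ExtraColumnsSketch {
  static final class Column {
    final String name;
    final boolean virtual;

    Column(String name, boolean virtual) {
      this.name = name;
      this.virtual = virtual;
    }
  }

  private final Set<String> _physicalColumns;
  private final Map<String, Column> _newlyAddedColumns = new ConcurrentHashMap<>();

  ExtraColumnsSketch(Set<String> physicalColumns) {
    _physicalColumns = physicalColumns;
  }

  /** Idempotent: known columns keep their first entry; a newer schema only adds what is missing. */
  void addExtraColumns(Map<String, Column> newSchema) {
    for (Column column : newSchema.values()) {
      if (!column.virtual && !_physicalColumns.contains(column.name)) {
        _newlyAddedColumns.putIfAbsent(column.name, column);
      }
    }
  }

  Set<String> getNewlyAddedColumnNames() {
    return Collections.unmodifiableSet(_newlyAddedColumns.keySet());
  }
}
```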
[GitHub] [incubator-pinot] mcvsubbu commented on a change in pull request #4954: Support schema evolution for consuming segments
mcvsubbu commented on a change in pull request #4954: Support schema evolution for consuming segments URL: https://github.com/apache/incubator-pinot/pull/4954#discussion_r368326469 ## File path: pinot-core/src/main/java/org/apache/pinot/core/indexsegment/mutable/MutableSegmentImpl.java ## @@ -467,13 +483,22 @@ public SegmentMetadata getSegmentMetadata() { return physicalColumnNames; } + @Override + public Set getColumnNamesForSelectStar() { +return Sets.union(getPhysicalColumnNames(), _newlyAddedColumnsFieldMap.keySet()); + } + @Override public ColumnDataSource getDataSource(String columnName) { FieldSpec fieldSpec = _schema.getFieldSpecFor(columnName); -if (fieldSpec.isVirtualColumn()) { +if ((fieldSpec == null && _newlyAddedColumnsFieldMap.containsKey(columnName)) || fieldSpec.isVirtualColumn()) { Review comment: In that case, this logic is best handled like:
```
if (fieldSpec != null) {
  if (fieldSpec.isVirtualColumn()) {
    // process virtual column from schema
  } else {
    // process real column from schema
  }
} else {
  // process newly added column using the new provider factory
}
```
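As written in the hunk, `(fieldSpec == null && _newlyAddedColumnsFieldMap.containsKey(columnName)) || fieldSpec.isVirtualColumn()` dereferences `fieldSpec` when it is null and the column is not in the map; testing for null first avoids that. A runnable, hypothetical sketch of the suggested dispatch, with a map of "is virtual" flags standing in for the schema (a missing key plays the role of `getFieldSpecFor()` returning null):

```java
import java.util.Map;
import java.util.Set;

// Hypothetical sketch of the branch structure only; real data sources are not modeled.
final class DataSourceDispatchSketch {
  enum Source { VIRTUAL_FROM_SCHEMA, PHYSICAL_FROM_SCHEMA, NEWLY_ADDED }

  private DataSourceDispatchSketch() {
  }

  static Source resolve(String columnName, Map<String, Boolean> schemaVirtualFlags, Set<String> newlyAddedColumns) {
    Boolean isVirtual = schemaVirtualFlags.get(columnName);
    if (isVirtual != null) {
      // Null check first, so the virtual-column test never runs on a missing spec.
      return isVirtual ? Source.VIRTUAL_FROM_SCHEMA : Source.PHYSICAL_FROM_SCHEMA;
    }
    if (newlyAddedColumns.contains(columnName)) {
      return Source.NEWLY_ADDED;
    }
    // Assumption: an unknown column is rejected explicitly rather than failing with a NullPointerException.
    throw new IllegalArgumentException("Unknown column: " + columnName);
  }
}
```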
[GitHub] [incubator-pinot] mcvsubbu commented on a change in pull request #4954: Support schema evolution for consuming segments
mcvsubbu commented on a change in pull request #4954: Support schema evolution for consuming segments URL: https://github.com/apache/incubator-pinot/pull/4954#discussion_r368324197 ## File path: pinot-core/src/main/java/org/apache/pinot/core/indexsegment/IndexSegment.java ## @@ -57,6 +58,13 @@ */ Set getPhysicalColumnNames(); + /** + * Returns all columns for the "select *" query + * + * @return Set of column names + */ + Set getColumnNamesForSelectStar(); Review comment: Newly added columns should not be included in the virtual column provider. You need a new provider for these columns.
[GitHub] [incubator-pinot] mcvsubbu commented on a change in pull request #4954: Support schema evolution for consuming segments
mcvsubbu commented on a change in pull request #4954: Support schema evolution for consuming segments URL: https://github.com/apache/incubator-pinot/pull/4954#discussion_r368325394 ## File path: pinot-core/src/main/java/org/apache/pinot/core/indexsegment/mutable/MutableSegmentImpl.java ## @@ -252,6 +258,16 @@ public long getMaxTime() { return _maxTime; } + public void addExtraColumns(Schema newSchema, + Map defaultColumnActionMap) { Review comment: I think we should remove the DefaultColumnAction here. We know that there is only one thing we can do -- add columns with a default value. The only argument needed here is newSchema.
[GitHub] [incubator-pinot] mcvsubbu commented on a change in pull request #4954: Support schema evolution for consuming segments
mcvsubbu commented on a change in pull request #4954: Support schema evolution for consuming segments URL: https://github.com/apache/incubator-pinot/pull/4954#discussion_r368326346 ## File path: pinot-core/src/main/java/org/apache/pinot/core/indexsegment/mutable/MutableSegmentImpl.java ## @@ -467,13 +483,22 @@ public SegmentMetadata getSegmentMetadata() { return physicalColumnNames; } + @Override + public Set getColumnNamesForSelectStar() { +return Sets.union(getPhysicalColumnNames(), _newlyAddedColumnsFieldMap.keySet()); + } + @Override public ColumnDataSource getDataSource(String columnName) { FieldSpec fieldSpec = _schema.getFieldSpecFor(columnName); -if (fieldSpec.isVirtualColumn()) { +if ((fieldSpec == null && _newlyAddedColumnsFieldMap.containsKey(columnName)) || fieldSpec.isVirtualColumn()) { Review comment: We should have a different column provider here that is better optimized than the virtual column provider. The new column provider should not be built every time, but should stay in memory once built. What may be updated is the number of rows. That will minimize additional allocation during query processing. Add a new column provider factory (say, ConstantValueColumnProviderFactory) that has a map from the column name to the provider. The map is built lazily as columns are added to the query.
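A sketch of the lazily built, cached provider map described above. Everything here is hypothetical: the factory name follows the reviewer's placeholder, the provider is a bare class rather than Pinot's column-provider interface, and the default value comes from a caller-supplied lookup. `computeIfAbsent` builds each provider once; later calls only refresh the row count, which keeps growing while the segment is consuming.

```java
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.function.Function;

// Hypothetical sketch of the suggested factory; Pinot's real provider interfaces are not modeled.
final class ConstantValueColumnProviderFactory {
  /** Serves the same default value for every row of a newly added column. */
  static final class ConstantValueColumnProvider {
    private final Object _defaultValue;
    private volatile int _numRows;

    ConstantValueColumnProvider(Object defaultValue) {
      _defaultValue = defaultValue;
    }

    void setNumRows(int numRows) {
      _numRows = numRows;
    }

    int getNumRows() {
      return _numRows;
    }

    Object getValue(int docId) {
      if (docId < 0 || docId >= _numRows) {
        throw new IndexOutOfBoundsException("docId: " + docId);
      }
      return _defaultValue;
    }
  }

  private final ConcurrentMap<String, ConstantValueColumnProvider> _providers = new ConcurrentHashMap<>();
  private final Function<String, Object> _defaultValueLookup;

  ConstantValueColumnProviderFactory(Function<String, Object> defaultValueLookup) {
    _defaultValueLookup = defaultValueLookup;
  }

  /** Builds a provider on first use; later calls only update its row count. */
  ConstantValueColumnProvider getProvider(String columnName, int numRows) {
    ConstantValueColumnProvider provider = _providers.computeIfAbsent(columnName,
        name -> new ConstantValueColumnProvider(_defaultValueLookup.apply(name)));
    provider.setNumRows(numRows);
    return provider;
  }
}
```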
[GitHub] [incubator-pinot] mcvsubbu commented on a change in pull request #4954: Support schema evolution for consuming segments
mcvsubbu commented on a change in pull request #4954: Support schema evolution for consuming segments URL: https://github.com/apache/incubator-pinot/pull/4954#discussion_r364456350 ## File path: pinot-core/src/main/java/org/apache/pinot/core/indexsegment/mutable/MutableSegmentImpl.java ## @@ -80,9 +84,9 @@ private final long _startTimeMillis = System.currentTimeMillis(); private final String _segmentName; - private final Schema _schema; + private final Schema _originalSchema; private final int _capacity; - private final SegmentMetadata _segmentMetadata; + private final SegmentMetadata _originalSegmentMetadata; Review comment: I would prefer to leave the names as they are and name the new fields `_columnsAddedDuringConsumption` or something like that. We can wait for others to chime in before we rename anything.
[GitHub] [incubator-pinot] codecov-io edited a comment on issue #4997: Update dockerfile for pinot image and pinot-presto image
codecov-io edited a comment on issue #4997: Update dockerfile for pinot image and pinot-presto image
URL: https://github.com/apache/incubator-pinot/pull/4997#issuecomment-575875680

# [Codecov](https://codecov.io/gh/apache/incubator-pinot/pull/4997?src=pr&el=h1) Report
> Merging [#4997](https://codecov.io/gh/apache/incubator-pinot/pull/4997?src=pr&el=desc) into [master](https://codecov.io/gh/apache/incubator-pinot/commit/f990656ee442973d311157bfca993db6d8218874?src=pr&el=desc) will **decrease** coverage by `0.04%`.
> The diff coverage is `n/a`.

[![Impacted file tree graph](https://codecov.io/gh/apache/incubator-pinot/pull/4997/graphs/tree.svg?width=650&token=4ibza2ugkz&height=150&src=pr)](https://codecov.io/gh/apache/incubator-pinot/pull/4997?src=pr&el=tree)

```diff
@@            Coverage Diff            @@
##           master    #4997      +/-   ##
- Coverage   57.26%   57.21%   -0.05%
  Complexity     12       12
  Files        1177     1177
  Lines       62455    62455
  Branches     9174     9174
- Hits        35765    35736      -29
- Misses      24020    24053      +33
+ Partials     2670     2666       -4
```

| [Impacted Files](https://codecov.io/gh/apache/incubator-pinot/pull/4997?src=pr&el=tree) | Coverage Δ | Complexity Δ | |
|---|---|---|---|
| [...a/manager/realtime/RealtimeSegmentDataManager.java](https://codecov.io/gh/apache/incubator-pinot/pull/4997/diff?src=pr&el=tree#diff-cGlub3QtY29yZS9zcmMvbWFpbi9qYXZhL29yZy9hcGFjaGUvcGlub3QvY29yZS9kYXRhL21hbmFnZXIvcmVhbHRpbWUvUmVhbHRpbWVTZWdtZW50RGF0YU1hbmFnZXIuamF2YQ==) | `50% <0%> (-25%)` | `0% <0%> (ø)` | |
| [.../impl/dictionary/FloatOnHeapMutableDictionary.java](https://codecov.io/gh/apache/incubator-pinot/pull/4997/diff?src=pr&el=tree#diff-cGlub3QtY29yZS9zcmMvbWFpbi9qYXZhL29yZy9hcGFjaGUvcGlub3QvY29yZS9yZWFsdGltZS9pbXBsL2RpY3Rpb25hcnkvRmxvYXRPbkhlYXBNdXRhYmxlRGljdGlvbmFyeS5qYXZh) | `46.34% <0%> (-14.64%)` | `0% <0%> (ø)` | |
| [...elix/core/relocation/RealtimeSegmentRelocator.java](https://codecov.io/gh/apache/incubator-pinot/pull/4997/diff?src=pr&el=tree#diff-cGlub3QtY29udHJvbGxlci9zcmMvbWFpbi9qYXZhL29yZy9hcGFjaGUvcGlub3QvY29udHJvbGxlci9oZWxpeC9jb3JlL3JlbG9jYXRpb24vUmVhbHRpbWVTZWdtZW50UmVsb2NhdG9yLmphdmE=) | `22.5% <0%> (-7.5%)` | `0% <0%> (ø)` | |
| [...e/impl/dictionary/LongOnHeapMutableDictionary.java](https://codecov.io/gh/apache/incubator-pinot/pull/4997/diff?src=pr&el=tree#diff-cGlub3QtY29yZS9zcmMvbWFpbi9qYXZhL29yZy9hcGFjaGUvcGlub3QvY29yZS9yZWFsdGltZS9pbXBsL2RpY3Rpb25hcnkvTG9uZ09uSGVhcE11dGFibGVEaWN0aW9uYXJ5LmphdmE=) | `48.78% <0%> (-7.32%)` | `0% <0%> (ø)` | |
| [...mpl/dictionary/DoubleOffHeapMutableDictionary.java](https://codecov.io/gh/apache/incubator-pinot/pull/4997/diff?src=pr&el=tree#diff-cGlub3QtY29yZS9zcmMvbWFpbi9qYXZhL29yZy9hcGFjaGUvcGlub3QvY29yZS9yZWFsdGltZS9pbXBsL2RpY3Rpb25hcnkvRG91YmxlT2ZmSGVhcE11dGFibGVEaWN0aW9uYXJ5LmphdmE=) | `43.01% <0%> (-6.46%)` | `0% <0%> (ø)` | |
| [...impl/dictionary/FloatOffHeapMutableDictionary.java](https://codecov.io/gh/apache/incubator-pinot/pull/4997/diff?src=pr&el=tree#diff-cGlub3QtY29yZS9zcmMvbWFpbi9qYXZhL29yZy9hcGFjaGUvcGlub3QvY29yZS9yZWFsdGltZS9pbXBsL2RpY3Rpb25hcnkvRmxvYXRPZmZIZWFwTXV0YWJsZURpY3Rpb25hcnkuamF2YQ==) | `53.76% <0%> (-6.46%)` | `0% <0%> (ø)` | |
| [...troller/helix/core/retention/RetentionManager.java](https://codecov.io/gh/apache/incubator-pinot/pull/4997/diff?src=pr&el=tree#diff-cGlub3QtY29udHJvbGxlci9zcmMvbWFpbi9qYXZhL29yZy9hcGFjaGUvcGlub3QvY29udHJvbGxlci9oZWxpeC9jb3JlL3JldGVudGlvbi9SZXRlbnRpb25NYW5hZ2VyLmphdmE=) | `75% <0%> (-4.17%)` | `0% <0%> (ø)` | |
| [...g/apache/pinot/common/utils/helix/HelixHelper.java](https://codecov.io/gh/apache/incubator-pinot/pull/4997/diff?src=pr&el=tree#diff-cGlub3QtY29tbW9uL3NyYy9tYWluL2phdmEvb3JnL2FwYWNoZS9waW5vdC9jb21tb24vdXRpbHMvaGVsaXgvSGVsaXhIZWxwZXIuamF2YQ==) | `50.89% <0%> (-1.8%)` | `0% <0%> (ø)` | |
| [...ache/pinot/common/metadata/ZKMetadataProvider.java](https://codecov.io/gh/apache/incubator-pinot/pull/4997/diff?src=pr&el=tree#diff-cGlub3QtY29tbW9uL3NyYy9tYWluL2phdmEvb3JnL2FwYWNoZS9waW5vdC9jb21tb24vbWV0YWRhdGEvWktNZXRhZGF0YVByb3ZpZGVyLmphdmE=) | `67.41% <0%> (-1.13%)` | `0% <0%> (ø)` | |
| [...manager/realtime/LLRealtimeSegmentDataManager.java](https://codecov.io/gh/apache/incubator-pinot/pull/4997/diff?src=pr&el=tree#diff-cGlub3QtY29yZS9zcmMvbWFpbi9qYXZhL29yZy9hcGFjaGUvcGlub3QvY29yZS9kYXRhL21hbmFnZXIvcmVhbHRpbWUvTExSZWFsdGltZVNlZ21lbnREYXRhTWFuYWdlci5qYXZh) | `68.67% <0%> (-0.98%)` | `0% <0%> (ø)` | |
| ... and [9 more](https://codecov.io/gh/apache/incubator-pinot/pull/4997/diff?src=pr&el=tree-more) | | | |

--
[Continue to review full report at Codeco
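The report above gives two slightly different figures for the same change: the headline says coverage will decrease by `0.04%`, while the Coverage Diff table shows `-0.05%`. The Java sketch below reconciles them under two assumptions of ours: that Codecov computes coverage as hits / (hits + misses + partials), and that it truncates displayed percentages to two decimals instead of rounding. The truncation rule is inferred only because it reproduces every number in this report; `CoverageReport`, `exactPercent` and `displayed` are hypothetical names, not part of any Codecov API.

```java
import java.math.BigDecimal;
import java.math.RoundingMode;

public class CoverageReport {
    // Exact coverage in percent: hits / (hits + misses + partials) * 100,
    // kept at 10 decimal places so the later truncation is not skewed by rounding.
    static BigDecimal exactPercent(long hits, long misses, long partials) {
        long lines = hits + misses + partials;
        return BigDecimal.valueOf(hits * 100L)
                .divide(BigDecimal.valueOf(lines), 10, RoundingMode.HALF_EVEN);
    }

    // Assumption: displayed values are truncated toward zero to two decimals.
    static BigDecimal displayed(BigDecimal percent) {
        return percent.setScale(2, RoundingMode.DOWN);
    }

    public static void main(String[] args) {
        BigDecimal master = exactPercent(35765, 24020, 2670); // "master" column
        BigDecimal pr = exactPercent(35736, 24053, 2666);     // "#4997" column

        // Table row: difference of the two already-truncated values.
        BigDecimal tableDelta = displayed(pr).subtract(displayed(master));
        // Headline: difference of the exact values, truncated afterwards.
        BigDecimal headlineDelta = displayed(pr.subtract(master));

        System.out.println("master " + displayed(master) + "%, #4997 " + displayed(pr) + "%");
        System.out.println("table delta " + tableDelta + "%, headline delta " + headlineDelta + "%");
    }
}
```

Run as-is, it prints `master 57.26%, #4997 57.21%` and `table delta -0.05%, headline delta -0.04%`. Both report figures are therefore consistent with a single underlying change of 29 fewer hits out of 62455 lines (about -0.046%), shown once after truncation and once as the difference of two truncated values.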
[incubator-pinot] branch upgrade_docker_build_maven_to_3.6 updated (d5fcaad -> d9d02a3)
This is an automated email from the ASF dual-hosted git repository.

xiangfu pushed a change to branch upgrade_docker_build_maven_to_3.6
in repository https://gitbox.apache.org/repos/asf/incubator-pinot.git.

from d5fcaad Update dockerfile for pinot image and pinot-presto image
add d9d02a3 update ingestion job specs for docker examples

No new revisions were added by this update.

Summary of changes:
docker/images/pinot/Dockerfile | 2 ++
.../images/pinot/ingestion-job-specs/airlineStats.yaml | 6 +++---
.../images/pinot/ingestion-job-specs/baseballStats.yaml | 6 +++---
3 files changed, 8 insertions(+), 6 deletions(-)
copy pinot-tools/src/main/resources/examples/batch/airlineStats/ingestionJobSpec.yaml => docker/images/pinot/ingestion-job-specs/airlineStats.yaml (96%)
copy pinot-tools/src/main/resources/examples/batch/baseballStats/ingestionJobSpec.yaml => docker/images/pinot/ingestion-job-specs/baseballStats.yaml (96%)