[email protected] has posted comments on this change. ( http://gerrit.cloudera.org:8080/21031 )
Change subject: IMPALA-12709: Add support for hierarchical metastore event processing
......................................................................


Patch Set 39:

(39 comments)

Reworked based on the comments.

http://gerrit.cloudera.org:8080/#/c/21031/38/be/src/common/global-flags.cc
File be/src/common/global-flags.cc:

http://gerrit.cloudera.org:8080/#/c/21031/38/be/src/common/global-flags.cc@288
PS38, Line 288:
              : DEFINE_double(hms_event_polling_interval_s, 1,
> Have we discuss why we're changing this to allow subsecond?
In the initial version, I had defined a new flag for millisecond precision of the event polling interval. Based on a later review comment, I changed the existing hms_event_polling_interval_s flag to accept double values instead. I have now updated its description in docs/topics/impala_metadata.xml.


http://gerrit.cloudera.org:8080/#/c/21031/38/common/thrift/metrics.json
File common/thrift/metrics.json:

http://gerrit.cloudera.org:8080/#/c/21031/38/common/thrift/metrics.json@3656
PS38, Line 3656: "key" : "events-processor.outstanding-event-count"
Updated docs/topics/impala_metadata.xml now.

Currently we maintain the pending event count at the executor/thread level. It is used to apply backpressure (with the max_outstanding_events_on_executors limit flag) on the event fetch mechanism when processing is slow. It is also used to know whether all the pending events generated so far have finished processing (i.e., when the pending count is 0). The same thread might be processing events of multiple databases (with DB Processors) or multiple tables (with Table Processors). I will improve its maintainability in subsequent PRs.


http://gerrit.cloudera.org:8080/#/c/21031/38/fe/src/compat-apache-hive-3/java/org/apache/impala/compat/MetastoreShim.java
File fe/src/compat-apache-hive-3/java/org/apache/impala/compat/MetastoreShim.java:

http://gerrit.cloudera.org:8080/#/c/21031/38/fe/src/compat-apache-hive-3/java/org/apache/impala/compat/MetastoreShim.java@660
PS38, Line 660: /**
              : * CDP Hive-3 only function. This is a dummy implementation till the CommitTxnEvent
              : * dummy implementation class defined in this file becomes actual implementation. Need
              : * to change when CommitTxnEvent implementation is supported.
              : */
> Can you add comment, what is the first Apache Hive version where this Pseud
This is a dummy implementation until the CommitTxnEvent dummy implementation class defined in this file becomes an actual implementation. It needs to change when the CommitTxnEvent implementation in this file is supported. It will be changed with IMPALA-13285.


http://gerrit.cloudera.org:8080/#/c/21031/36/fe/src/compat-hive-3/java/org/apache/impala/compat/MetastoreShim.java
File fe/src/compat-hive-3/java/org/apache/impala/compat/MetastoreShim.java:

http://gerrit.cloudera.org:8080/#/c/21031/36/fe/src/compat-hive-3/java/org/apache/impala/compat/MetastoreShim.java@1060
PS36, Line 1060: List<WriteEventInfo> writeEventInfoList;
              : try (MetaStoreClientPool.MetaStoreClient client = event.getCatalogOpExecutor()
              : .getCatalog().getMetaStoreClient()) {
> Ack.
Both can be configured as false, which disables automatic invalidation. In that case, the admin must run INVALIDATE METADATA manually.


http://gerrit.cloudera.org:8080/#/c/21031/38/fe/src/main/java/org/apache/impala/catalog/events/DbEventExecutor.java
File fe/src/main/java/org/apache/impala/catalog/events/DbEventExecutor.java:

http://gerrit.cloudera.org:8080/#/c/21031/38/fe/src/main/java/org/apache/impala/catalog/events/DbEventExecutor.java@71
PS38, Line 71: // Database name to DbEventExecutor assignment
              : private final Map<String, DbEventExecutor> dbNameToEventExecutor_;
              :
> Can this be a protected field inside EventExecutorService?
Done. I have moved dbNameToEventExecutor to EventExecutorService, and EventExecutorService now passes a reference to it to each DbEventExecutor.
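The reply above moves the database-to-executor map into EventExecutorService. The outstanding-count discussion also says databases are assigned to executors based on their outstanding event count. As an illustration only, here is a minimal Java sketch of a service that owns such a map. On first use it assigns each database to the executor with the fewest outstanding events. ExecutorStub, AssignmentSketch, assign and unassign are hypothetical names, not the patch's DbEventExecutor or EventExecutorService. In the actual patch, assignment still happens in DbEventExecutor#enqueue().

```java
import java.util.ArrayList;
import java.util.Comparator;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicLong;

/** Hypothetical stand-in for a DbEventExecutor: it only tracks its pending events. */
final class ExecutorStub {
  final String name;
  final AtomicLong outstanding = new AtomicLong();

  ExecutorStub(String name) { this.name = name; }
}

/** Hypothetical stand-in for a service that owns the db-name-to-executor map. */
final class AssignmentSketch {
  private final Map<String, ExecutorStub> dbNameToExecutor = new ConcurrentHashMap<>();
  private final List<ExecutorStub> executors = new ArrayList<>();

  AssignmentSketch(int numExecutors) {
    if (numExecutors <= 0) throw new IllegalArgumentException("need at least one executor");
    for (int i = 0; i < numExecutors; i++) executors.add(new ExecutorStub("executor-" + i));
  }

  /** Returns the existing assignment, or assigns the least-loaded executor. */
  synchronized ExecutorStub assign(String dbName) {
    return dbNameToExecutor.computeIfAbsent(dbName, db -> executors.stream()
        .min(Comparator.comparingLong(e -> e.outstanding.get()))
        .get()); // non-empty: the constructor guarantees at least one executor
  }

  /** Drops the assignment once the executor has no more work for 'dbName'. */
  synchronized void unassign(String dbName) { dbNameToExecutor.remove(dbName); }
}
```

Assignments stay sticky until unassign() is called. That mirrors the concern in the thread: if assignment and unassignment live in different classes, both must be serialized against clear() and cleanup().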
http://gerrit.cloudera.org:8080/#/c/21031/38/fe/src/main/java/org/apache/impala/catalog/events/DbEventExecutor.java@263
PS38, Line 263: */
> nit: stale comment
Done


http://gerrit.cloudera.org:8080/#/c/21031/38/fe/src/main/java/org/apache/impala/catalog/events/DbEventExecutor.java@457
PS38, Line 457: * @return Outstanding event count
              : */
              : long getOutstandingEventCount() {
              : long outstandingEventCount;
              : synchronized (outstandingEventCountLock_) {
              : outstandingEventCount = outstandingEventCount_;
              : }
              :
> Checked the usages of the final count returned in MetastoreEventProcessor#g
Yes, correct.


http://gerrit.cloudera.org:8080/#/c/21031/38/fe/src/main/java/org/apache/impala/catalog/events/DbEventExecutor.java@457
PS38, Line 457: * @return Outstanding event count
              : */
              : long getOutstandingEventCount() {
              : long outstandingEventCount;
              : synchronized (outstandingEventCountLock_) {
              : outstandingEventCount = outstandingEventCount_;
              : }
              :
> Looking again, we might have a potential split-brain problem here by having
events_.size() is an O(N) operation for ConcurrentLinkedQueue. It may also give an inaccurate result if elements are added or removed while size() is executing. We would need to do this for (num_db_event_executors+2)*num_table_event_executors_per_db_event_executor queues.

We use the outstanding event count for these purposes:
1. DBs and tables are assigned (if not already assigned) to appropriate executors/threads based on it.
2. Backpressure, to restrict processEvents() from fetching more events when outstanding events exceed max_outstanding_events_on_executors.
3. It is shown as a gauge metric.

We don't need an accurate count in any of these cases. And offer() and poll() are thread safe on ConcurrentLinkedQueue.
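The reasoning above keeps a separately maintained count instead of calling the O(N) ConcurrentLinkedQueue.size(), and uses that count for backpressure. Here is a minimal, single-consumer Java sketch of both ideas. CountedQueue, processNext, Backpressure and canFetchMore are hypothetical names. Treating max_outstanding_events_on_executors as "stop fetching once the count exceeds the limit" is an assumption based on the wording of the reply.

```java
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.function.Consumer;

/**
 * Hypothetical sketch: a ConcurrentLinkedQueue plus a separately maintained count, so
 * callers never need the O(N) ConcurrentLinkedQueue.size().
 */
final class CountedQueue<E> {
  private final ConcurrentLinkedQueue<E> events = new ConcurrentLinkedQueue<>();
  private final Object countLock = new Object();
  private long outstandingCount = 0;

  /** Producer side. The count is bumped before offer() so it never under-reports work. */
  void enqueue(E event) {
    synchronized (countLock) { outstandingCount++; }
    events.offer(event);
  }

  /**
   * Consumer side (a single consumer thread is assumed). The event stays counted as
   * outstanding until it has been processed. Returns false if the queue was empty.
   */
  boolean processNext(Consumer<? super E> processor) {
    E event = events.peek();
    if (event == null) return false;
    processor.accept(event);
    events.poll(); // removes 'event', since only this thread removes elements
    synchronized (countLock) { outstandingCount--; }
    return true;
  }

  /** Approximate by design: may briefly include an event that is not yet offered. */
  long getOutstandingEventCount() {
    synchronized (countLock) { return outstandingCount; }
  }
}

/** Hypothetical backpressure rule applied by the dispatcher before fetching more events. */
final class Backpressure {
  private Backpressure() {}

  static boolean canFetchMore(long outstandingEvents, long maxOutstandingEvents) {
    // Stop fetching once the outstanding count exceeds the configured limit.
    return outstandingEvents <= maxOutstandingEvents;
  }
}
```

Because the count is only approximate, it suits the three uses listed above: load-based assignment, a fetch throttle and a gauge metric. An exact count is never required.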
http://gerrit.cloudera.org:8080/#/c/21031/38/fe/src/main/java/org/apache/impala/catalog/events/EventExecutorService.java
File fe/src/main/java/org/apache/impala/catalog/events/EventExecutorService.java:

http://gerrit.cloudera.org:8080/#/c/21031/38/fe/src/main/java/org/apache/impala/catalog/events/EventExecutorService.java@44
PS38, Line 44:
> I don't see stop method. Do you mean shutdown()?
Yes. It is not stop; it is the shutdown() method.

EventExecutorService.clear() is not called after every ExternalEventsProcessor.pause() invocation. ExternalEventsProcessor.clear() invokes EventExecutorService.clear(). ExternalEventsProcessor.clear() is called after ExternalEventsProcessor.pause() only in CatalogServiceCatalog.reset(). I have updated the class header and method headers to describe which ExternalEventsProcessor method invokes which EventExecutorService method.


http://gerrit.cloudera.org:8080/#/c/21031/38/fe/src/main/java/org/apache/impala/catalog/events/EventExecutorService.java@52
PS38, Line 52: public class EventExecutorService {
> Can isActive_ flip-flop between false -> true -> false -> true?
EventExecutorService cannot be started again once it is shut down. I have replaced the boolean isActive_ flag with EventExecutorStatus status_ and updated the description. As long as EventExecutorService is active, it can consume and process metastore events. There is no direct relation between EventExecutorStatus and EventProcessorStatus. If the event processor becomes inactive at any time, that is handled in the DbEventExecutor and TableEventExecutor threads.


http://gerrit.cloudera.org:8080/#/c/21031/38/fe/src/main/java/org/apache/impala/catalog/events/EventExecutorService.java@85
PS38, Line 85: }
              : }
              :
              : /**
> Can clear() called anytime, even when MetastoreEventsProcessor still dispat
Done. I have made the start, shutdown and process methods synchronized, and updated all method headers.
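The start-once lifecycle described above can be sketched in Java. It uses an explicit status instead of a boolean that could flip-flop, plus synchronized start, shutdown and process methods. LifecycleSketch and ExecutorStatus are hypothetical names, not the patch's EventExecutorService or EventExecutorStatus. The following are assumptions for illustration:

- the enum values
- throwing IllegalStateException on a restart
- process() refusing events while inactive

```java
/** Hypothetical lifecycle states; the patch's EventExecutorStatus values are not shown. */
enum ExecutorStatus { INACTIVE, ACTIVE, SHUTDOWN }

/** Hypothetical sketch of a start-once service guarded by synchronized methods. */
final class LifecycleSketch {
  private ExecutorStatus status = ExecutorStatus.INACTIVE;

  synchronized void start() {
    if (status == ExecutorStatus.SHUTDOWN) {
      throw new IllegalStateException("cannot be started again after shutdown");
    }
    status = ExecutorStatus.ACTIVE;
  }

  synchronized void shutdown() { status = ExecutorStatus.SHUTDOWN; }

  /** Accepts the event only while active; returns whether it was accepted. */
  synchronized boolean process(Object event) {
    if (status != ExecutorStatus.ACTIVE) return false;
    // A real implementation would enqueue 'event' on a DbEventExecutor here.
    return true;
  }

  synchronized ExecutorStatus getStatus() { return status; }
}
```

Since all transitions go through one monitor, process() can never observe a half-finished shutdown(). The SHUTDOWN state is terminal, so the false -> true -> false -> true sequence from the review cannot happen.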
http://gerrit.cloudera.org:8080/#/c/21031/38/fe/src/main/java/org/apache/impala/catalog/events/EventExecutorService.java@92
PS38, Line 92: @VisibleForTesting
              : List<DbEventExecutor> getDbEventExecutors() {
              : return dbEventExecutors_;
              : }
> can I call start() again after shutdown().
Done


http://gerrit.cloudera.org:8080/#/c/21031/38/fe/src/main/java/org/apache/impala/catalog/events/EventExecutorService.java@132
PS38, Line 132: dbEventExecutors_.parallelStream().forEach(DbEventExecutor::clear);
              : }
              :
> Based on your reply in https://gerrit.cloudera.org/c/21031/36/fe/src/main/j
Yes. I have updated the method header.


http://gerrit.cloudera.org:8080/#/c/21031/38/fe/src/main/java/org/apache/impala/catalog/events/EventExecutorService.java@266
PS38, Line 266: pseudoDropTableEvent.setEventType(MetastoreEventType.DROP_TABLE.toString());
              : pseudoDropTableEvent.setTableName(alterEvent.getBeforeTable().getTableName());
              : pseudoDropTableEvent.setDbNa
> If dbNameToEventExecutor moved into this class, you can do dbNameToEventExe
I have moved dbNameToEventExecutor to this class. However, assigning and unassigning a DbEventExecutor still happen in DbEventExecutor#enqueue() and DbEventExecutor#cleanIfNecessary(), when a DbProcessor is created and destroyed respectively.

It is not safe to add the DbEventExecutor to the dbNameToEventExecutor map here. EventExecutorService#clear() in turn calls DbEventExecutor#clear(), which may remove the db from the dbNameToEventExecutor map. EventExecutorService#clear() may run at any time, and it is not a synchronized method, unlike EventExecutorService#process(), where DbEventExecutor#enqueue() happens.

So if we have to assign a database to a DbEventExecutor (put it in the dbNameToEventExecutor map) here in EventExecutorService, we need to do two things. EventExecutorService#clear() and EventExecutorService#cleanup() must become synchronized methods, and the DbEventExecutor#usableLock_ lock must be removed. But making EventExecutorService#clear() and EventExecutorService#cleanup() synchronized may add overhead, as they deal with multiple DbEventExecutors. Also, assignment would then happen in EventExecutorService while unassignment happens in DbEventExecutor. Let me know your opinion.


http://gerrit.cloudera.org:8080/#/c/21031/38/fe/src/main/java/org/apache/impala/catalog/events/MetastoreEvents.java
File fe/src/main/java/org/apache/impala/catalog/events/MetastoreEvents.java:

http://gerrit.cloudera.org:8080/#/c/21031/38/fe/src/main/java/org/apache/impala/catalog/events/MetastoreEvents.java@3282
PS38, Line 3282: catalog_.invalidateTable(tbl.getTableName().toThr
> nit: Just to confirm, is this meant to be pass by reference (sharing object
tableWriteIds_ is removed from txnToWriteIds_ in the catalog and preserved in tableWriteIds_. It is used in the process() method and also when generating the list of PseudoAbortTxnEvents in EventExecutorService#processAbortTxnEvent().


http://gerrit.cloudera.org:8080/#/c/21031/38/fe/src/main/java/org/apache/impala/catalog/events/MetastoreEventsProcessor.java
File fe/src/main/java/org/apache/impala/catalog/events/MetastoreEventsProcessor.java:

http://gerrit.cloudera.org:8080/#/c/21031/38/fe/src/main/java/org/apache/impala/catalog/events/MetastoreEventsProcessor.java@804
PS38, Line 804: if (!isHierarchicalEventProcessingEnabled()) return 0;
              : return eventExecutorService_.getOutstandingEventCount();
              : }
              :
              : /**
> nit: can be simplified to
Done


http://gerrit.cloudera.org:8080/#/c/21031/38/fe/src/main/java/org/apache/impala/catalog/events/MetastoreEventsProcessor.java@840
PS38, Line 840: */
> Annotate that this may return null.
Done


http://gerrit.cloudera.org:8080/#/c/21031/38/fe/src/main/java/org/apache/impala/catalog/events/MetastoreEventsProcessor.java@844
PS38, Line 844:
              : /**
              : * This method is only for testing.
              :
> Please add explicit comment that this is only for testing and must not be u
Done


http://gerrit.cloudera.org:8080/#/c/21031/38/fe/src/main/java/org/apache/impala/catalog/events/MetastoreEventsProcessor.java@973
PS38, Line 973:
              : /**
              : * Fetch the event from HMS specified by 'eventId'
              : * @return null if the event has been cleaned up or any error occurs.
              : */
> What happen with this method if eventExecutorService_ != null?
EventExecutorService.start() is invoked from ExternalEventsProcessor.start(). There is no pause for EventExecutorService once it is started. So upon ExternalEventsProcessor.start(long fromEventId), the event processor fetches the events and dispatches them to EventExecutorService.process(MetastoreEvent event) as usual. Each event is then processed in the appropriate DbEventExecutor and TableEventExecutor. No special processing is required.


http://gerrit.cloudera.org:8080/#/c/21031/38/fe/src/main/java/org/apache/impala/catalog/events/MetastoreEventsProcessor.java@1165
PS38, Line 1165: * @return List of NotificationEvents from metastore since lastSyncedEventId
              : * @throws MetastoreNotificationFetchException
              : */
              : @VisibleForTesting
              : public List<NotificationEvent> getNextMetastoreEvents(long currentEventId)
              : throws MetastoreNotificationFetchException {
              : r
> Here, Exception is followed by eventExecutorService_.cleanup().
This cleanup removes unneeded DbProcessors that meet all of these conditions: no pending events, no TableProcessors, and min_event_processor_idle_ms has elapsed. DbProcessor#cleanIfNecessary() removes unneeded TableProcessors that have no pending events and for which min_event_processor_idle_ms has elapsed. DbProcessor cleanup happens from the dispatcher thread here, and TableProcessor cleanup happens from the DbEventExecutor thread.
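The DbProcessor cleanup rule above combines three conditions: no pending events, no TableProcessors, and min_event_processor_idle_ms elapsed. It can be sketched as a predicate plus an in-place sweep. DbProcessorView and IdleCleanupSketch are hypothetical names, and the current time is passed in as nowMs so the rule is easy to test. Using ">=" for "has elapsed" is an assumption.

```java
import java.util.Iterator;
import java.util.Map;

/** Hypothetical view of a DbProcessor with only the fields the cleanup rule reads. */
final class DbProcessorView {
  final long pendingEvents;
  final int tableProcessorCount;
  final long lastActiveMs;

  DbProcessorView(long pendingEvents, int tableProcessorCount, long lastActiveMs) {
    this.pendingEvents = pendingEvents;
    this.tableProcessorCount = tableProcessorCount;
    this.lastActiveMs = lastActiveMs;
  }
}

final class IdleCleanupSketch {
  private IdleCleanupSketch() {}

  /** No pending events, no table processors, and idle for at least minIdleMs. */
  static boolean isRemovable(DbProcessorView p, long nowMs, long minIdleMs) {
    return p.pendingEvents == 0
        && p.tableProcessorCount == 0
        && nowMs - p.lastActiveMs >= minIdleMs;
  }

  /** Removes removable entries in place and returns how many were removed. */
  static int cleanup(Map<String, DbProcessorView> processors, long nowMs, long minIdleMs) {
    int removed = 0;
    Iterator<Map.Entry<String, DbProcessorView>> it = processors.entrySet().iterator();
    while (it.hasNext()) {
      if (isRemovable(it.next().getValue(), nowMs, minIdleMs)) {
        it.remove();
        removed++;
      }
    }
    return removed;
  }
}
```

The idle threshold keeps a processor that just drained from being torn down and immediately recreated when the next event for the same database arrives.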
http://gerrit.cloudera.org:8080/#/c/21031/38/fe/src/main/java/org/apache/impala/catalog/events/MetastoreEventsProcessor.java@1173
PS38, Line 1173:
              : /**
> Especially, please state what is the expected follow up to do if this metho
Done


http://gerrit.cloudera.org:8080/#/c/21031/38/fe/src/main/java/org/apache/impala/catalog/events/MetastoreEventsProcessor.java@1251
PS38, Line 1251: s method does global invalidation when
> In the meantime, can you print different WARN log that has more information
Done. Modified to show "Lag: {}. {} events pending to be dispatched" when hierarchical mode is enabled.


http://gerrit.cloudera.org:8080/#/c/21031/38/fe/src/main/java/org/apache/impala/catalog/events/MetastoreEventsProcessor.java@1424
PS38, Line 1424:
> This check is repeated in multiple places. Please create a dedicated method
Done


http://gerrit.cloudera.org:8080/#/c/21031/38/fe/src/main/java/org/apache/impala/catalog/events/MetastoreEventsProcessor.java@1437
PS38, Line 1437:
> if isHierarchical is True, where are we printing the "Processing" log?
We have an INFO log for each event when it is processed in the DbEventExecutor and TableEventExecutor threads.


http://gerrit.cloudera.org:8080/#/c/21031/38/fe/src/main/java/org/apache/impala/catalog/events/MetastoreEventsProcessor.java@1463
PS38, Line 1463: Map<MetastoreEvent, Long> eventProcessingTime = n
> I understand that eventExecutorService_ enable us from processing events in
Spinning in a loop until getOutstandingEventCount() == 0, to make sure events are processed in the DbEventExecutor/TableEventExecutor threads, may cause a hang. Suppose an exception occurs while processing an event, that event's MetastoreEvent#onFailure() returns false, and automatic global invalidation also cannot happen. Then the spin loop never terminates and processEvents() never returns. Handling that case just for testing makes things considerably more complicated.
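The concern above is that an unbounded spin on getOutstandingEventCount() == 0 never returns if an event is stuck. If a caller, such as a test helper, does need to wait for the executors to drain, a bounded wait avoids the hang. This is a minimal Java sketch, assuming the count is exposed as a LongSupplier. DrainWaitSketch and waitForDrain are hypothetical names, not part of the patch.

```java
import java.util.function.LongSupplier;

final class DrainWaitSketch {
  private DrainWaitSketch() {}

  /**
   * Polls 'outstandingCount' until it reaches 0 or 'timeoutMs' elapses. Returns true if
   * drained, false on timeout or interrupt, so a stuck event cannot hang the caller.
   */
  static boolean waitForDrain(LongSupplier outstandingCount, long timeoutMs, long pollMs) {
    final long deadlineNs = System.nanoTime() + timeoutMs * 1_000_000L;
    while (outstandingCount.getAsLong() > 0) {
      // Compare via subtraction so the check is safe against nanoTime() overflow.
      if (System.nanoTime() - deadlineNs >= 0) return false;
      try {
        Thread.sleep(pollMs);
      } catch (InterruptedException e) {
        Thread.currentThread().interrupt(); // preserve the interrupt for the caller
        return false;
      }
    }
    return true;
  }
}
```

Returning false instead of looping forever lets the caller fail fast, for example by asserting on the result in a test.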
http://gerrit.cloudera.org:8080/#/c/21031/38/fe/src/main/java/org/apache/impala/catalog/events/MetastoreEventsProcessor.java@1484
PS38, Line 1484: currentFilteredEvent_ = event;
              : String targetName = event.getTargetName();
              : String desc = String.format("%s %s on %s, eventId=%d",
              : isHierarchical ? "Dispatching" : "Processing", event.getEventType(),
              : targetName, event.getEventId());
              : try (ThreadNameAnnotator tna = ne
> So will this LOG.info line lies if hierarchical mode enabled?
Done. Modified to show "Time elapsed in dispatching event batch: {}" when hierarchical mode is enabled.


http://gerrit.cloudera.org:8080/#/c/21031/36/fe/src/main/java/org/apache/impala/catalog/events/RenameTableBarrierEvent.java
File fe/src/main/java/org/apache/impala/catalog/events/RenameTableBarrierEvent.java:

http://gerrit.cloudera.org:8080/#/c/21031/36/fe/src/main/java/org/apache/impala/catalog/events/RenameTableBarrierEvent.java@72
PS36, Line 72: nt pr
> Agree with Riza that adding "synchronized" to the methods is more maintaina
Done


http://gerrit.cloudera.org:8080/#/c/21031/38/fe/src/main/java/org/apache/impala/catalog/events/RenameTableBarrierEvent.java
File fe/src/main/java/org/apache/impala/catalog/events/RenameTableBarrierEvent.java:

http://gerrit.cloudera.org:8080/#/c/21031/38/fe/src/main/java/org/apache/impala/catalog/events/RenameTableBarrierEvent.java@71
PS38, Line 71: /**
              : * Rename event processing state of alter table rename event. It is combined state of
              : * both pseudo-events.
              : */
> Please add a comment in this class on why volatile variables are enough to
The methods are synchronized now, as suggested in the other comment.


http://gerrit.cloudera.org:8080/#/c/21031/38/fe/src/main/java/org/apache/impala/catalog/events/RenameTableBarrierEvent.java@82
PS38, Line 82: // Whether pseudo create table event is skipped upon the event process.
              : private boolean createSkipped_ = false;
              :
              : // Whether pseudo create table event is processed.
              : private boolean createProcessed_ = false;
> Should this stay volatile like the rest?
The methods are synchronized now, as suggested in the other comment.


http://gerrit.cloudera.org:8080/#/c/21031/36/fe/src/main/java/org/apache/impala/catalog/events/TableEventExecutor.java
File fe/src/main/java/org/apache/impala/catalog/events/TableEventExecutor.java:

http://gerrit.cloudera.org:8080/#/c/21031/36/fe/src/main/java/org/apache/impala/catalog/events/TableEventExecutor.java@158
PS36, Line 158: if (event instanceof DbBarrierEvent) {
> Do we have test coverage for such cases?
Yes. It is covered by testEventSkippingWhenDropDatabaseQueuedBehind.


http://gerrit.cloudera.org:8080/#/c/21031/38/fe/src/main/java/org/apache/impala/catalog/events/TableEventExecutor.java
File fe/src/main/java/org/apache/impala/catalog/events/TableEventExecutor.java:

http://gerrit.cloudera.org:8080/#/c/21031/38/fe/src/main/java/org/apache/impala/catalog/events/TableEventExecutor.java@295
PS38, Line 295: */
              : long getOutstandingEventCount() {
              : synchronized (outstandingEventCountLock_) {
              : return outstandingEventCount_;
              :
> Is it possible to count directly by summing events_.size() of all tableProc
events_.size() is an O(N) operation for ConcurrentLinkedQueue. It may also give an inaccurate result if elements are added or removed while size() is executing. We would need to do this for (num_db_event_executors+2)*num_table_event_executors_per_db_event_executor queues.

We use the outstanding event count for these purposes:
1. DBs and tables are assigned (if not already assigned) to appropriate executors/threads based on it.
2. Backpressure, to restrict processEvents() from fetching more events when outstanding events exceed max_outstanding_events_on_executors.
3. It is shown as a gauge metric.

We don't need an accurate count in any of these cases. And offer() and poll() are thread safe.
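The RenameTableBarrierEvent replies above switch from volatile fields to synchronized methods for the combined state of the two pseudo-events (DROP_TABLE and CREATE_TABLE) of one ALTER TABLE RENAME. This is a minimal Java sketch of that pattern. RenameStateSketch and its method names are hypothetical. Only createSkipped_ and createProcessed_ appear in the quoted code; the drop-side fields are assumed by symmetry.

```java
/**
 * Hypothetical sketch of a combined state shared by the pseudo DROP_TABLE and
 * CREATE_TABLE events of one ALTER TABLE RENAME. All access goes through one monitor.
 */
final class RenameStateSketch {
  private boolean dropProcessed = false;
  private boolean dropSkipped = false;
  private boolean createProcessed = false;
  private boolean createSkipped = false;

  synchronized void onDropFinished(boolean skipped) {
    if (skipped) dropSkipped = true; else dropProcessed = true;
  }

  synchronized void onCreateFinished(boolean skipped) {
    if (skipped) createSkipped = true; else createProcessed = true;
  }

  /** True once both halves have finished, whether processed or skipped. */
  synchronized boolean isComplete() {
    return (dropProcessed || dropSkipped) && (createProcessed || createSkipped);
  }

  /** True only if both pseudo-events were skipped. */
  synchronized boolean isFullySkipped() {
    return dropSkipped && createSkipped;
  }
}
```

Independent volatile fields make each single write visible, but they do not make a check that reads several fields, or a check-then-act sequence, atomic. A single monitor does. That is one reason synchronized methods are easier to reason about here.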
http://gerrit.cloudera.org:8080/#/c/21031/38/fe/src/test/java/org/apache/impala/catalog/MetastoreApiTestUtils.java
File fe/src/test/java/org/apache/impala/catalog/MetastoreApiTestUtils.java:

http://gerrit.cloudera.org:8080/#/c/21031/38/fe/src/test/java/org/apache/impala/catalog/MetastoreApiTestUtils.java@196
PS38, Line 196:
              : /**
              : * Method to generate insert event for the transactional table to hms notification log
              : */
> Please add comment what is this method doing.
Done


http://gerrit.cloudera.org:8080/#/c/21031/38/fe/src/test/java/org/apache/impala/catalog/events/EventExecutorServiceTest.java
File fe/src/test/java/org/apache/impala/catalog/events/EventExecutorServiceTest.java:

http://gerrit.cloudera.org:8080/#/c/21031/38/fe/src/test/java/org/apache/impala/catalog/events/EventExecutorServiceTest.java@93
PS38, Line 93: eventExecutorService_
> Assert that this is not null.
Done


http://gerrit.cloudera.org:8080/#/c/21031/38/fe/src/test/java/org/apache/impala/catalog/events/EventExecutorServiceTest.java@114
PS38, Line 114: // before running new test
> Please add comment why this is necessary.
Yes. Some tests set a different EventExecutorService instance. This step ensures that the default EventExecutorService instance created within eventsProcessor_ is restored. I have added a comment now.


http://gerrit.cloudera.org:8080/#/c/21031/38/fe/src/test/java/org/apache/impala/catalog/events/EventExecutorServiceTest.java@125
PS38, Line 125: public void cleanUp() {
              : assertEquals(MetastoreEventsProces
> Also assert that test ends with empty eventsProcessor_?
We use a new local EventExecutorService instance in each test method. Each test ensures that this EventExecutorService processes the events generated within that test and verifies the expectations. So it doesn't make sense to check the outstanding event count of EventExecutorService here.
http://gerrit.cloudera.org:8080/#/c/21031/38/tests/custom_cluster/test_event_processing_perf.py
File tests/custom_cluster/test_event_processing_perf.py:

http://gerrit.cloudera.org:8080/#/c/21031/38/tests/custom_cluster/test_event_processing_perf.py@34
PS38, Line 34: class TestEventProcessingPerf(CustomClusterTestSuite):
> How long does it takes to run the whole TestEventProcessingPerf?
It takes a very long time (~5 hrs) to run, even with these smaller values. I have marked the tests to be skipped in normal runs.


http://gerrit.cloudera.org:8080/#/c/21031/38/tests/custom_cluster/test_event_processing_perf.py@53
PS38, Line 53:
> Wherever you set very low hms_event_polling_interval_s to speedup tests, I
We are already starting custom cluster tests with DEFAULT_STATESTORE_ARGS:
--state_store_args=--statestore_update_frequency_ms=50 --statestore_priority_update_frequency_ms=50 --statestore_heartbeat_frequency_ms=50


http://gerrit.cloudera.org:8080/#/c/21031/38/tests/custom_cluster/test_event_processing_perf.py@228
PS38, Line 228: True)
> Please write comment on what this method does.
Done


http://gerrit.cloudera.org:8080/#/c/21031/38/tests/util/event_processor_utils.py
File tests/util/event_processor_utils.py:

http://gerrit.cloudera.org:8080/#/c/21031/38/tests/util/event_processor_utils.py@60
PS38, Line 60: and outstanding_events == 0
> Is this only relevant if hierarchical mode is enabled?
Yes, it is relevant only when hierarchical event processing is enabled, and it has no effect when it is disabled. get_outstanding_event_count() always returns 0 if hierarchical processing is disabled, and the actual count otherwise. We just want to ensure there are no pending events on the event executors (i.e., the outstanding event count is 0). We don't need an expected_outstanding_event argument.
--
To view, visit http://gerrit.cloudera.org:8080/21031

Gerrit-Project: Impala-ASF
Gerrit-Branch: master
Gerrit-MessageType: comment
Gerrit-Change-Id: I76d8a739f9db6d40f01028bfd786a85d83f9e5d6
Gerrit-Change-Number: 21031
Gerrit-PatchSet: 39
Gerrit-Owner: Anonymous Coward <[email protected]>
Gerrit-Reviewer: Anonymous Coward <[email protected]>
Gerrit-Reviewer: Anonymous Coward <[email protected]>
Gerrit-Reviewer: Csaba Ringhofer <[email protected]>
Gerrit-Reviewer: Impala Public Jenkins <[email protected]>
Gerrit-Reviewer: Quanlong Huang <[email protected]>
Gerrit-Reviewer: Riza Suminto <[email protected]>
Gerrit-Reviewer: Sai Hemanth Gantasala <[email protected]>
Gerrit-Comment-Date: Mon, 31 Mar 2025 04:08:19 +0000
Gerrit-HasComments: Yes
