[email protected] has posted comments on this change. (http://gerrit.cloudera.org:8080/21031 )

Change subject: IMPALA-12709: Add support for hierarchical metastore event processing
......................................................................


Patch Set 39:

(39 comments)

Reworked per the review comments.

http://gerrit.cloudera.org:8080/#/c/21031/38/be/src/common/global-flags.cc
File be/src/common/global-flags.cc:

http://gerrit.cloudera.org:8080/#/c/21031/38/be/src/common/global-flags.cc@288
PS38, Line 288:
              : DEFINE_double(hms_event_polling_interval_s, 1,
> Have we discuss why we're changing this to allow subsecond?
In the initial version, I had defined a new flag for millisecond precision of the event polling interval. Based on a review comment, I later changed the existing hms_event_polling_interval_s flag to accept double values instead.
I have now updated its description in docs/topics/impala_metadata.xml.
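Since hms_event_polling_interval_s now accepts fractional values, a caller has to turn it into a sub-second delay somewhere. The sketch below shows one way to do that with java.util.concurrent; PollingIntervalSketch and its methods are hypothetical names, and it does not claim to mirror how MetastoreEventsProcessor actually schedules polling.

```java
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;

// Hypothetical helper: turns a fractional polling interval in seconds
// (e.g. hms_event_polling_interval_s=0.2) into a millisecond delay.
class PollingIntervalSketch {
  /** Converts seconds (possibly fractional) to milliseconds, at least 1 ms. */
  static long toMillis(double intervalSeconds) {
    // '!(x > 0)' also rejects NaN.
    if (!(intervalSeconds > 0)) {
      throw new IllegalArgumentException("Polling interval must be positive: " + intervalSeconds);
    }
    // Round to the nearest millisecond so that 0.2 s becomes 200 ms.
    return Math.max(1L, Math.round(intervalSeconds * 1000.0));
  }

  /** Runs 'poll' repeatedly with a fixed delay derived from the interval. */
  static ScheduledExecutorService schedule(double intervalSeconds, Runnable poll) {
    long delayMs = toMillis(intervalSeconds);
    ScheduledExecutorService scheduler = Executors.newSingleThreadScheduledExecutor();
    scheduler.scheduleWithFixedDelay(poll, delayMs, delayMs, TimeUnit.MILLISECONDS);
    return scheduler;
  }
}
```

For example, toMillis(0.2) yields 200, and very small values are clamped to 1 ms rather than 0.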


http://gerrit.cloudera.org:8080/#/c/21031/38/common/thrift/metrics.json
File common/thrift/metrics.json:

http://gerrit.cloudera.org:8080/#/c/21031/38/common/thrift/metrics.json@3656
PS38, Line 3656:     "key" : "events-processor.outstanding-event-count"
I have updated docs/topics/impala_metadata.xml.

Currently we maintain the pending event count at the executor/thread level. It is used to apply backpressure (via the max_outstanding_events_on_executors limit flag) on the event fetch mechanism when processing is slow. It is also used to determine whether all pending events generated so far have finished processing (i.e., when the pending count is 0). The same thread might be processing events of multiple databases (with DB Processors) or multiple tables (with Table Processors). I will improve the maintainability of this in subsequent PRs.
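A minimal sketch of how a per-executor outstanding count can drive both uses described above: backpressure on fetching, and the "all processed" check when the count is 0. OutstandingEventGate is a hypothetical name; it uses an AtomicLong instead of the outstandingEventCountLock_ used in the patch, and maxOutstanding only stands in for max_outstanding_events_on_executors.

```java
import java.util.concurrent.atomic.AtomicLong;

// Hypothetical per-executor accounting of events handed over but not yet finished.
class OutstandingEventGate {
  private final AtomicLong outstanding = new AtomicLong();
  // Stand-in for the max_outstanding_events_on_executors flag.
  private final long maxOutstanding;

  OutstandingEventGate(long maxOutstanding) { this.maxOutstanding = maxOutstanding; }

  /** Called when an event is handed to an executor thread. */
  void onEnqueued() { outstanding.incrementAndGet(); }

  /** Called when an executor thread finishes (or skips) an event. */
  void onProcessed() { outstanding.decrementAndGet(); }

  /** The fetcher holds off fetching a new batch while this returns false. */
  boolean canFetchMore() { return outstanding.get() < maxOutstanding; }

  /** True once every event dispatched so far has been processed. */
  boolean isDrained() { return outstanding.get() == 0; }
}
```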


http://gerrit.cloudera.org:8080/#/c/21031/38/fe/src/compat-apache-hive-3/java/org/apache/impala/compat/MetastoreShim.java
File 
fe/src/compat-apache-hive-3/java/org/apache/impala/compat/MetastoreShim.java:

http://gerrit.cloudera.org:8080/#/c/21031/38/fe/src/compat-apache-hive-3/java/org/apache/impala/compat/MetastoreShim.java@660
PS38, Line 660:   /**
              :    * CDP Hive-3 only function. This is a dummy implementation till the CommitTxnEvent
              :    * dummy implementation class defined in this file becomes actual implementation. Need
              :    * to change when CommitTxnEvent implementation is supported.
              :    */
> Can you add comment, what is the first Apache Hive version where this Pseud
This is a dummy implementation until the CommitTxnEvent dummy class defined in this file gets an actual implementation. It needs to change once CommitTxnEvent is supported in this file, which will be done in IMPALA-13285.


http://gerrit.cloudera.org:8080/#/c/21031/36/fe/src/compat-hive-3/java/org/apache/impala/compat/MetastoreShim.java
File fe/src/compat-hive-3/java/org/apache/impala/compat/MetastoreShim.java:

http://gerrit.cloudera.org:8080/#/c/21031/36/fe/src/compat-hive-3/java/org/apache/impala/compat/MetastoreShim.java@1060
PS36, Line 1060:     List<WriteEventInfo> writeEventInfoList;
               :     try (MetaStoreClientPool.MetaStoreClient client = event.getCatalogOpExecutor()
               :         .getCatalog().getMetaStoreClient()) {
> Ack.
Both can be configured as false, which disables automatic invalidation. In that case, an admin must run INVALIDATE METADATA manually.


http://gerrit.cloudera.org:8080/#/c/21031/38/fe/src/main/java/org/apache/impala/catalog/events/DbEventExecutor.java
File fe/src/main/java/org/apache/impala/catalog/events/DbEventExecutor.java:

http://gerrit.cloudera.org:8080/#/c/21031/38/fe/src/main/java/org/apache/impala/catalog/events/DbEventExecutor.java@71
PS38, Line 71:   // Database name to DbEventExecutor assignment
             :   private final Map<String, DbEventExecutor> dbNameToEventExecutor_;
             :
> Can this be a protected field inside EventExecutorService?
Done. I have moved dbNameToEventExecutor to EventExecutorService and now pass its reference from EventExecutorService to DbEventExecutor.


http://gerrit.cloudera.org:8080/#/c/21031/38/fe/src/main/java/org/apache/impala/catalog/events/DbEventExecutor.java@263
PS38, Line 263:      */
> nit: stale comment
Done


http://gerrit.cloudera.org:8080/#/c/21031/38/fe/src/main/java/org/apache/impala/catalog/events/DbEventExecutor.java@457
PS38, Line 457:    * @return Outstanding event count
              :    */
              :   long getOutstandingEventCount() {
              :     long outstandingEventCount;
              :     synchronized (outstandingEventCountLock_) {
              :       outstandingEventCount = outstandingEventCount_;
              :     }
              :
> Checked the usages of the final count returned in MetastoreEventProcessor#g
Yes, correct.


http://gerrit.cloudera.org:8080/#/c/21031/38/fe/src/main/java/org/apache/impala/catalog/events/DbEventExecutor.java@457
PS38, Line 457:    * @return Outstanding event count
              :    */
              :   long getOutstandingEventCount() {
              :     long outstandingEventCount;
              :     synchronized (outstandingEventCountLock_) {
              :       outstandingEventCount = outstandingEventCount_;
              :     }
              :
> Looking again, we might have a potential split-brain problem here by having
events_.size() is an O(N) operation for ConcurrentLinkedQueue, and it may return an inaccurate result if elements are added or removed while size() is executing. We would also need to do this for (num_db_event_executors+2)*num_table_event_executors_per_db_event_executor queues.
We use the outstanding event count for these purposes:
1. Dbs and tables are assigned (if not already assigned) to the appropriate executors/threads based on it.
2. Backpressure: it restricts processEvents() from fetching more events when outstanding events exceed max_outstanding_events_on_executors.
3. It is shown as a gauge metric.
None of these needs an exact count. And offer() and poll() operations are thread safe with ConcurrentLinkedQueue.
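To illustrate the trade-off: per the JDK documentation, ConcurrentLinkedQueue.size() traverses the queue and may be inaccurate under concurrent modification. A hypothetical CountedEventQueue that keeps a separate counter alongside the queue reads the backlog in O(1), at the cost of being momentarily stale. This is a sketch, not the patch's code; the patch guards its counter with a lock rather than an AtomicLong.

```java
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.atomic.AtomicLong;

// Hypothetical queue wrapper: a lock-free queue plus a separately maintained counter.
class CountedEventQueue<E> {
  private final ConcurrentLinkedQueue<E> events = new ConcurrentLinkedQueue<>();
  private final AtomicLong outstanding = new AtomicLong();

  void offer(E event) {
    // Increment before publishing, so the count can never go negative when a
    // consumer polls and finishes the event immediately.
    outstanding.incrementAndGet();
    events.offer(event);
  }

  /** Removes the next event, or returns null if the queue is empty. */
  E poll() { return events.poll(); }

  /** Called after a polled event has been fully processed. */
  void markProcessed() { outstanding.decrementAndGet(); }

  /** O(1) and possibly momentarily stale; includes events still being processed. */
  long outstandingCount() { return outstanding.get(); }
}
```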


http://gerrit.cloudera.org:8080/#/c/21031/38/fe/src/main/java/org/apache/impala/catalog/events/EventExecutorService.java
File 
fe/src/main/java/org/apache/impala/catalog/events/EventExecutorService.java:

http://gerrit.cloudera.org:8080/#/c/21031/38/fe/src/main/java/org/apache/impala/catalog/events/EventExecutorService.java@44
PS38, Line 44:
> I don't see stop method. Do you mean shutdown()?
Yes, I meant shutdown(); there is no stop method. EventExecutorService.clear() is not called after every ExternalEventsProcessor.pause() invocation. ExternalEventsProcessor.clear() invokes EventExecutorService.clear(), and ExternalEventsProcessor.clear() is called after ExternalEventsProcessor.pause() only in CatalogServiceCatalog.reset(). I have updated the class header and method headers to describe which ExternalEventsProcessor method invokes which EventExecutorService method.


http://gerrit.cloudera.org:8080/#/c/21031/38/fe/src/main/java/org/apache/impala/catalog/events/EventExecutorService.java@52
PS38, Line 52: public class EventExecutorService {
> Can isActive_ flip-flop between false -> true -> false -> true?
EventExecutorService cannot be started again once it is shut down. I have changed the boolean isActive_ flag to an EventExecutorStatus status_ field and updated the description. As long as EventExecutorService is active, it can process/consume metastore events. There is no direct relation between EventExecutorStatus and EventProcessorStatus. If the event processor becomes inactive at any time, that is handled in the DbEventExecutor and TableEventExecutor threads.
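The one-way lifecycle described above can be sketched as a small state machine guarded by synchronized methods. ExecutorStatus and LifecycleSketch are hypothetical stand-ins for EventExecutorStatus and EventExecutorService; rejecting events before start() is an assumption of this sketch, not something stated for the patch.

```java
// Hypothetical one-way lifecycle: NOT_STARTED -> ACTIVE -> SHUTDOWN.
enum ExecutorStatus { NOT_STARTED, ACTIVE, SHUTDOWN }

class LifecycleSketch {
  private ExecutorStatus status = ExecutorStatus.NOT_STARTED;

  synchronized void start() {
    // Once shut down, the service can never be restarted.
    if (status == ExecutorStatus.SHUTDOWN) {
      throw new IllegalStateException("Cannot start after shutdown");
    }
    status = ExecutorStatus.ACTIVE; // Idempotent if already active.
  }

  synchronized void shutdown() { status = ExecutorStatus.SHUTDOWN; }

  /** Accepts the event only while active (assumption of this sketch). */
  synchronized boolean process(String event) {
    return status == ExecutorStatus.ACTIVE;
  }

  synchronized ExecutorStatus status() { return status; }
}
```

Because start(), shutdown() and process() share the same monitor, a process() call can never observe a half-finished transition.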


http://gerrit.cloudera.org:8080/#/c/21031/38/fe/src/main/java/org/apache/impala/catalog/events/EventExecutorService.java@85
PS38, Line 85:     }
             :   }
             :
             :   /**
> Can clear() called anytime, even when MetastoreEventsProcessor still dispat
Done. I have made the start, shutdown and process methods synchronized and updated all method headers.


http://gerrit.cloudera.org:8080/#/c/21031/38/fe/src/main/java/org/apache/impala/catalog/events/EventExecutorService.java@92
PS38, Line 92:   @VisibleForTesting
             :   List<DbEventExecutor> getDbEventExecutors() {
             :     return dbEventExecutors_;
             :   }
> can I call start() again after shutdown().
Done


http://gerrit.cloudera.org:8080/#/c/21031/38/fe/src/main/java/org/apache/impala/catalog/events/EventExecutorService.java@132
PS38, Line 132:     dbEventExecutors_.parallelStream().forEach(DbEventExecutor::clear);
              :   }
              :
> Based on your reply in https://gerrit.cloudera.org/c/21031/36/fe/src/main/j
Yes. I have updated the method header.


http://gerrit.cloudera.org:8080/#/c/21031/38/fe/src/main/java/org/apache/impala/catalog/events/EventExecutorService.java@266
PS38, Line 266:     pseudoDropTableEvent.setEventType(MetastoreEventType.DROP_TABLE.toString());
              :     pseudoDropTableEvent.setTableName(alterEvent.getBeforeTable().getTableName());
              :     pseudoDropTableEvent.setDbNa
> If dbNameToEventExecutor moved into this class, you can do dbNameToEventExe
I have moved dbNameToEventExecutor to this class. However, assigning and unassigning a DbEventExecutor still happen in DbEventExecutor#enqueue() and DbEventExecutor#cleanIfNecessary() when a DbProcessor is created and destroyed, respectively.
It is not safe to add a DbEventExecutor to the dbNameToEventExecutor map here, because EventExecutorService#clear() in turn calls DbEventExecutor#clear(), which may remove the db from the dbNameToEventExecutor map. EventExecutorService#clear() may happen at any time and, unlike EventExecutorService#process() (where DbEventExecutor#enqueue() happens), it is not a synchronized method.

So if we have to assign a database to a DbEventExecutor (i.e., put it in the dbNameToEventExecutor map) here in EventExecutorService, we would need to make EventExecutorService#clear() and EventExecutorService#cleanup() synchronized methods and remove the DbEventExecutor#usableLock_ lock.
But making EventExecutorService#clear() and EventExecutorService#cleanup() synchronized may add overhead, as they deal with multiple DbEventExecutors. Also, assignment would then happen in EventExecutorService while unassignment happens in DbEventExecutor.
Let me know your opinion.
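For reference, a hypothetical sketch of map-level assignment and unassignment using ConcurrentHashMap.computeIfAbsent() and the two-argument remove(key, value), which only unassigns a db that is still mapped to the given executor. It does not by itself close the window between assigning a db and enqueuing the event described above; that still needs something like DbEventExecutor#usableLock_ or synchronized methods. E stands in for DbEventExecutor; none of these names are Impala APIs.

```java
import java.util.concurrent.ConcurrentHashMap;
import java.util.function.Supplier;

// Hypothetical db-to-executor assignment map; E stands in for DbEventExecutor.
class DbAssignmentSketch<E> {
  private final ConcurrentHashMap<String, E> dbNameToExecutor = new ConcurrentHashMap<>();

  /** Returns the existing executor for dbName, or atomically assigns a chosen one. */
  E assign(String dbName, Supplier<E> chooser) {
    return dbNameToExecutor.computeIfAbsent(dbName, k -> chooser.get());
  }

  /** Unassigns dbName only if it is still mapped to 'executor'. */
  boolean unassign(String dbName, E executor) {
    return dbNameToExecutor.remove(dbName, executor);
  }

  E get(String dbName) { return dbNameToExecutor.get(dbName); }
}
```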


http://gerrit.cloudera.org:8080/#/c/21031/38/fe/src/main/java/org/apache/impala/catalog/events/MetastoreEvents.java
File fe/src/main/java/org/apache/impala/catalog/events/MetastoreEvents.java:

http://gerrit.cloudera.org:8080/#/c/21031/38/fe/src/main/java/org/apache/impala/catalog/events/MetastoreEvents.java@3282
PS38, Line 3282: catalog_.invalidateTable(tbl.getTableName().toThr
> nit: Just to confirm, is this meant to be pass by reference (sharing object
The table write IDs are removed from txnToWriteIds_ in the catalog and preserved in tableWriteIds_. tableWriteIds_ is used in the process() method and also to generate the list of PseudoAbortTxnEvents in EventExecutorService#processAbortTxnEvent().


http://gerrit.cloudera.org:8080/#/c/21031/38/fe/src/main/java/org/apache/impala/catalog/events/MetastoreEventsProcessor.java
File 
fe/src/main/java/org/apache/impala/catalog/events/MetastoreEventsProcessor.java:

http://gerrit.cloudera.org:8080/#/c/21031/38/fe/src/main/java/org/apache/impala/catalog/events/MetastoreEventsProcessor.java@804
PS38, Line 804:     if (!isHierarchicalEventProcessingEnabled()) return 0;
              :     return eventExecutorService_.getOutstandingEventCount();
              :   }
              :
              :   /**
> nit: can be simplified to
Done


http://gerrit.cloudera.org:8080/#/c/21031/38/fe/src/main/java/org/apache/impala/catalog/events/MetastoreEventsProcessor.java@840
PS38, Line 840:    */
> Annotate that this may return null.
Done


http://gerrit.cloudera.org:8080/#/c/21031/38/fe/src/main/java/org/apache/impala/catalog/events/MetastoreEventsProcessor.java@844
PS38, Line 844:
              :   /**
              :    * This method is only for testing.
              :
> Please add explicit comment that this is only for testing and must not be u
Done


http://gerrit.cloudera.org:8080/#/c/21031/38/fe/src/main/java/org/apache/impala/catalog/events/MetastoreEventsProcessor.java@973
PS38, Line 973:
              :   /**
              :    * Fetch the event from HMS specified by 'eventId'
              :    * @return null if the event has been cleaned up or any error occurs.
              :    */
> What happen with this method if eventExecutorService_ != null?
EventExecutorService.start() is invoked from ExternalEventsProcessor.start(). There is no pause for EventExecutorService once it is started. So upon ExternalEventsProcessor.start(long fromEventId), the event processor fetches the events and dispatches them to EventExecutorService.process(MetastoreEvent event) as usual, and they are processed in DbEventExecutor and TableEventExecutor appropriately. No special processing is required.


http://gerrit.cloudera.org:8080/#/c/21031/38/fe/src/main/java/org/apache/impala/catalog/events/MetastoreEventsProcessor.java@1165
PS38, Line 1165:    * @return List of NotificationEvents from metastore since lastSyncedEventId
               :    * @throws MetastoreNotificationFetchException
               :    */
               :   @VisibleForTesting
               :   public List<NotificationEvent> getNextMetastoreEvents(long currentEventId)
               :       throws MetastoreNotificationFetchException {
               :     r
> Here, Exception is followed by eventExecutorService_.cleanup().
This cleanup removes unneeded DbProcessors that have no pending events, contain no TableProcessors, and have been idle for at least min_event_processor_idle_ms.

DbProcessor#cleanIfNecessary() removes unneeded TableProcessors that have no pending events and have been idle for at least min_event_processor_idle_ms.

DbProcessor cleanup happens from the dispatcher thread here, and TableProcessor cleanup happens from the DbEventExecutor thread.
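The eligibility rule for removing an idle processor can be sketched as follows. Processor is a hypothetical stand-in for DbProcessor/TableProcessor, minIdleMs stands in for min_event_processor_idle_ms, and the current time is passed in explicitly to keep the sketch deterministic.

```java
import java.util.Iterator;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

// Hypothetical idle-processor cleanup.
class IdleCleanupSketch {
  static final class Processor {
    final int pendingEvents;
    final int childProcessors; // e.g. TableProcessors inside a DbProcessor
    final long lastActiveMs;

    Processor(int pendingEvents, int childProcessors, long lastActiveMs) {
      this.pendingEvents = pendingEvents;
      this.childProcessors = childProcessors;
      this.lastActiveMs = lastActiveMs;
    }
  }

  final Map<String, Processor> processors = new ConcurrentHashMap<>();

  /** Removes processors with no pending events, no children and enough idle time. */
  int cleanup(long nowMs, long minIdleMs) {
    int removed = 0;
    Iterator<Processor> it = processors.values().iterator();
    while (it.hasNext()) {
      Processor p = it.next();
      boolean idleLongEnough = nowMs - p.lastActiveMs >= minIdleMs;
      if (p.pendingEvents == 0 && p.childProcessors == 0 && idleLongEnough) {
        it.remove(); // ConcurrentHashMap iterators support removal.
        removed++;
      }
    }
    return removed;
  }
}
```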


http://gerrit.cloudera.org:8080/#/c/21031/38/fe/src/main/java/org/apache/impala/catalog/events/MetastoreEventsProcessor.java@1173
PS38, Line 1173:
               :   /**
> Especially, please state what is the expected follow up to do if this metho
Done


http://gerrit.cloudera.org:8080/#/c/21031/38/fe/src/main/java/org/apache/impala/catalog/events/MetastoreEventsProcessor.java@1251
PS38, Line 1251: s method does global invalidation when
> In the meantime, can you print different WARN log that has more information
Done. Modified to show "Lag: {}. {} events pending to be dispatched" when 
hierarchical mode is enabled.


http://gerrit.cloudera.org:8080/#/c/21031/38/fe/src/main/java/org/apache/impala/catalog/events/MetastoreEventsProcessor.java@1424
PS38, Line 1424:
> This check is repeated in multiple places. Please create a dedicated method
Done


http://gerrit.cloudera.org:8080/#/c/21031/38/fe/src/main/java/org/apache/impala/catalog/events/MetastoreEventsProcessor.java@1437
PS38, Line 1437:
> if isHierarchical is True, where are we printing the "Processing" log?
Events are logged at INFO level when they are processed in the DbEventExecutor and TableEventExecutor threads.


http://gerrit.cloudera.org:8080/#/c/21031/38/fe/src/main/java/org/apache/impala/catalog/events/MetastoreEventsProcessor.java@1463
PS38, Line 1463:     Map<MetastoreEvent, Long> eventProcessingTime = n
> I understand that eventExecutorService_ enable us from processing events in
Spinning in a loop until getOutstandingEventCount() == 0, to make sure events are processed in the DbEventExecutor/TableEventExecutor threads, is problematic when an exception occurs while processing an event, that event's MetastoreEvent#onFailure() returns false, and automatic global invalidation also cannot happen. In that case the spin loop never terminates and processEvents() never returns. Handling that just for testing would add considerable complexity.
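For comparison, a hypothetical bounded wait that gives up after a deadline instead of spinning forever. It is not part of this change; it only illustrates that any wait on the outstanding count would need an exit path for the failure case described above.

```java
import java.util.function.LongSupplier;

// Hypothetical bounded wait on an outstanding-event count.
class BoundedDrainWait {
  /** Returns true if the count reached zero before timeoutMs elapsed. */
  static boolean awaitDrained(LongSupplier outstandingCount, long timeoutMs, long pollMs) {
    long deadlineNanos = System.nanoTime() + timeoutMs * 1_000_000L;
    while (outstandingCount.getAsLong() != 0) {
      // Overflow-safe deadline comparison, as recommended for System.nanoTime().
      if (System.nanoTime() - deadlineNanos >= 0) return false;
      try {
        Thread.sleep(pollMs);
      } catch (InterruptedException e) {
        Thread.currentThread().interrupt(); // Preserve the interrupt status.
        return false;
      }
    }
    return true;
  }
}
```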


http://gerrit.cloudera.org:8080/#/c/21031/38/fe/src/main/java/org/apache/impala/catalog/events/MetastoreEventsProcessor.java@1484
PS38, Line 1484:           currentFilteredEvent_ = event;
               :           String targetName = event.getTargetName();
               :           String desc = String.format("%s %s on %s, eventId=%d",
               :               isHierarchical ? "Dispatching" : "Processing", event.getEventType(),
               :               targetName, event.getEventId());
               :           try (ThreadNameAnnotator tna = ne
> So will this LOG.info line lies if hierarchical mode enabled?
Done. Modified to show "Time elapsed in dispatching event batch: {}" when 
hierarchical mode is enabled.


http://gerrit.cloudera.org:8080/#/c/21031/36/fe/src/main/java/org/apache/impala/catalog/events/RenameTableBarrierEvent.java
File 
fe/src/main/java/org/apache/impala/catalog/events/RenameTableBarrierEvent.java:

http://gerrit.cloudera.org:8080/#/c/21031/36/fe/src/main/java/org/apache/impala/catalog/events/RenameTableBarrierEvent.java@72
PS36, Line 72: nt pr
> Agree with Riza that adding "synchronized" to the methods is more maintaina
Done


http://gerrit.cloudera.org:8080/#/c/21031/38/fe/src/main/java/org/apache/impala/catalog/events/RenameTableBarrierEvent.java
File 
fe/src/main/java/org/apache/impala/catalog/events/RenameTableBarrierEvent.java:

http://gerrit.cloudera.org:8080/#/c/21031/38/fe/src/main/java/org/apache/impala/catalog/events/RenameTableBarrierEvent.java@71
PS38, Line 71:   /**
             :    * Rename event processing state of alter table rename event. It is combined state of
             :    * both pseudo-events.
             :    */
> Please add a comment in this class on why volatile variables are enough to
The methods are synchronized now, per the other comment.


http://gerrit.cloudera.org:8080/#/c/21031/38/fe/src/main/java/org/apache/impala/catalog/events/RenameTableBarrierEvent.java@82
PS38, Line 82:     // Whether pseudo create table event is skipped upon the event process.
             :     private boolean createSkipped_ = false;
             :
             :     // Whether pseudo create table event is processed.
             :     private boolean createProcessed_ = false;
> Should this stay volatile like the rest?
The methods are synchronized now, per the other comment.
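A sketch of why synchronized accessors are simpler to reason about here than per-field volatile flags: the rename is complete only when both pseudo-events (the pseudo drop of the old table and the pseudo create of the new one) are done, and that compound check must read a consistent set of flags. RenameStateSketch and its skip rule are hypothetical; they are not the actual fields or logic of RenameTableBarrierEvent.

```java
// Hypothetical combined state of the two pseudo-events of a rename.
class RenameStateSketch {
  private boolean dropProcessed = false;
  private boolean dropSkipped = false;
  private boolean createProcessed = false;
  private boolean createSkipped = false;

  synchronized void markDrop(boolean skipped) {
    dropProcessed = true;
    dropSkipped = skipped;
  }

  synchronized void markCreate(boolean skipped) {
    createProcessed = true;
    createSkipped = skipped;
  }

  /** Complete only when both pseudo-events are done; read under one lock. */
  synchronized boolean isProcessed() { return dropProcessed && createProcessed; }

  /** Assumption for this sketch: the rename counts as skipped only if both were skipped. */
  synchronized boolean isSkipped() { return dropSkipped && createSkipped; }
}
```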


http://gerrit.cloudera.org:8080/#/c/21031/36/fe/src/main/java/org/apache/impala/catalog/events/TableEventExecutor.java
File fe/src/main/java/org/apache/impala/catalog/events/TableEventExecutor.java:

http://gerrit.cloudera.org:8080/#/c/21031/36/fe/src/main/java/org/apache/impala/catalog/events/TableEventExecutor.java@158
PS36, Line 158:       if (event instanceof DbBarrierEvent) {
> Do we have test coverage for such cases?
Yes. It is covered by testEventSkippingWhenDropDatabaseQueuedBehind.


http://gerrit.cloudera.org:8080/#/c/21031/38/fe/src/main/java/org/apache/impala/catalog/events/TableEventExecutor.java
File fe/src/main/java/org/apache/impala/catalog/events/TableEventExecutor.java:

http://gerrit.cloudera.org:8080/#/c/21031/38/fe/src/main/java/org/apache/impala/catalog/events/TableEventExecutor.java@295
PS38, Line 295:    */
              :   long getOutstandingEventCount() {
              :     synchronized (outstandingEventCountLock_) {
              :       return outstandingEventCount_;
              :
> Is it possible to count directly by summing events_.size() of all tableProc
events_.size() is an O(N) operation for ConcurrentLinkedQueue, and it may return an inaccurate result if elements are added or removed while size() is executing. We would also need to do this for (num_db_event_executors+2)*num_table_event_executors_per_db_event_executor queues.
We use the outstanding event count for these purposes:
1. Dbs and tables are assigned (if not already assigned) to the appropriate executors/threads based on it.
2. Backpressure: it restricts processEvents() from fetching more events when outstanding events exceed max_outstanding_events_on_executors.
3. It is shown as a gauge metric.
None of these needs an exact count. And offer() and poll() operations are thread safe.
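Purpose 1 above can be sketched as picking the least-loaded executor. LeastLoadedChooser and LoadedExecutor are hypothetical names; the point is that a slightly stale count only affects load balance, not correctness.

```java
import java.util.Comparator;
import java.util.List;

// Hypothetical least-loaded executor selection.
class LeastLoadedChooser {
  /** Anything that can report its (approximate) outstanding event count. */
  interface LoadedExecutor { long outstandingEventCount(); }

  private static final Comparator<LoadedExecutor> BY_LOAD =
      Comparator.comparingLong(LoadedExecutor::outstandingEventCount);

  /** Returns the executor with the fewest outstanding events. */
  static <T extends LoadedExecutor> T choose(List<T> executors) {
    return executors.stream()
        .min(BY_LOAD)
        .orElseThrow(() -> new IllegalStateException("No executors available"));
  }
}
```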


http://gerrit.cloudera.org:8080/#/c/21031/38/fe/src/test/java/org/apache/impala/catalog/MetastoreApiTestUtils.java
File fe/src/test/java/org/apache/impala/catalog/MetastoreApiTestUtils.java:

http://gerrit.cloudera.org:8080/#/c/21031/38/fe/src/test/java/org/apache/impala/catalog/MetastoreApiTestUtils.java@196
PS38, Line 196:
              :   /**
              :    * Method to generate insert event for the transactional table to hms notification log
              :    */
> Please add comment what is this method doing.
Done


http://gerrit.cloudera.org:8080/#/c/21031/38/fe/src/test/java/org/apache/impala/catalog/events/EventExecutorServiceTest.java
File 
fe/src/test/java/org/apache/impala/catalog/events/EventExecutorServiceTest.java:

http://gerrit.cloudera.org:8080/#/c/21031/38/fe/src/test/java/org/apache/impala/catalog/events/EventExecutorServiceTest.java@93
PS38, Line 93: eventExecutorService_
> Assert that this is not null.
Done


http://gerrit.cloudera.org:8080/#/c/21031/38/fe/src/test/java/org/apache/impala/catalog/events/EventExecutorServiceTest.java@114
PS38, Line 114:     // before running new test
> Please add comment why this is necessary.
Yes. Some tests set a different EventExecutorService instance. This step ensures that the default EventExecutorService instance created within eventsProcessor_ is restored. I have added a comment now.


http://gerrit.cloudera.org:8080/#/c/21031/38/fe/src/test/java/org/apache/impala/catalog/events/EventExecutorServiceTest.java@125
PS38, Line 125:   public void cleanUp() {
              :     assertEquals(MetastoreEventsProces
> Also assert that test ends with empty eventsProcessor_?
We use a new local EventExecutorService instance in each test method, and each test ensures that EventExecutorService processes the events generated within that test and verifies the expectations. So it doesn't make sense to check the outstanding event count of EventExecutorService here.


http://gerrit.cloudera.org:8080/#/c/21031/38/tests/custom_cluster/test_event_processing_perf.py
File tests/custom_cluster/test_event_processing_perf.py:

http://gerrit.cloudera.org:8080/#/c/21031/38/tests/custom_cluster/test_event_processing_perf.py@34
PS38, Line 34: class TestEventProcessingPerf(CustomClusterTestSuite):
> How long does it takes to run the whole TestEventProcessingPerf?
It takes a very long time (~5 hrs) to run, even with these smaller values. I have marked the tests to be skipped in normal runs.


http://gerrit.cloudera.org:8080/#/c/21031/38/tests/custom_cluster/test_event_processing_perf.py@53
PS38, Line 53:
> Wherever you set very low hms_event_polling_interval_s to speedup tests, I
We already start custom cluster tests with DEFAULT_STATESTORE_ARGS:
--state_store_args=--statestore_update_frequency_ms=50 
--statestore_priority_update_frequency_ms=50 
--statestore_heartbeat_frequency_ms=50


http://gerrit.cloudera.org:8080/#/c/21031/38/tests/custom_cluster/test_event_processing_perf.py@228
PS38, Line 228:         True)
> Please write comment on what this method does.
Done


http://gerrit.cloudera.org:8080/#/c/21031/38/tests/util/event_processor_utils.py
File tests/util/event_processor_utils.py:

http://gerrit.cloudera.org:8080/#/c/21031/38/tests/util/event_processor_utils.py@60
PS38, Line 60: and outstanding_events == 0
> Is this only relevant if hierarchical mode is enabled?
Yes, it is relevant when hierarchical event processing is enabled and has no effect when it is disabled: get_outstanding_event_count() always returns 0 if hierarchical processing is disabled, and the actual count otherwise.
We just want to ensure there are no pending events (i.e., the outstanding event count is 0) on the event executors, so we don't need an expected_outstanding_event argument.



--
To view, visit http://gerrit.cloudera.org:8080/21031
To unsubscribe, visit http://gerrit.cloudera.org:8080/settings

Gerrit-Project: Impala-ASF
Gerrit-Branch: master
Gerrit-MessageType: comment
Gerrit-Change-Id: I76d8a739f9db6d40f01028bfd786a85d83f9e5d6
Gerrit-Change-Number: 21031
Gerrit-PatchSet: 39
Gerrit-Owner: Anonymous Coward <[email protected]>
Gerrit-Reviewer: Anonymous Coward <[email protected]>
Gerrit-Reviewer: Anonymous Coward <[email protected]>
Gerrit-Reviewer: Csaba Ringhofer <[email protected]>
Gerrit-Reviewer: Impala Public Jenkins <[email protected]>
Gerrit-Reviewer: Quanlong Huang <[email protected]>
Gerrit-Reviewer: Riza Suminto <[email protected]>
Gerrit-Reviewer: Sai Hemanth Gantasala <[email protected]>
Gerrit-Comment-Date: Mon, 31 Mar 2025 04:08:19 +0000
Gerrit-HasComments: Yes
