Anurag Mantripragada has posted comments on this change.

Change subject: IMPALA-7971: Add support for insert events in event processor.

Patch Set 5:


Thanks for the comments, Vihang. I cleaned up the code in CatalogOpExecutor. I
am still figuring out how to write tests that verify tables are refreshed
because of insert events.
Commit Message:
PS4, Line 24: Existing self-events logic cannot be used for insert events since
            :    firing insert event does not allow us to modify
> I think it is more appropriate to say existing self-events logic cannot be
File be/src/service/
PS4, Line 1106:       // is_overwrite is used to know the type of insert in FE.
> add a comment here explaining why this is needed.
File fe/src/main/java/org/apache/impala/catalog/
PS4, Line 839: HashSet<>(f
> would be appropriate to initialize the set with the capacity fdList.size() s
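A minimal sketch of the pre-sizing suggestion above, assuming fdList is a java.util.List. The FileNameSets class, the collectNames method, and the use of plain strings in place of file descriptors are hypothetical stand-ins, not code from the patch:

```java
import java.util.Arrays;
import java.util.HashSet;
import java.util.List;
import java.util.Set;

public class FileNameSets {
  // Hypothetical helper: copies names from fdList into a set sized up front.
  public static Set<String> collectNames(List<String> fdList) {
    // The constructor argument is an initial capacity, not a maximum size.
    Set<String> names = new HashSet<>(fdList.size());
    names.addAll(fdList);
    return names;
  }

  public static void main(String[] args) {
    List<String> fdList = Arrays.asList("a.parq", "b.parq", "a.parq");
    System.out.println(collectNames(fdList).size());
  }
}
```

With the default load factor of 0.75 the set can still resize once as it fills, so pre-sizing reduces rehashing rather than ruling it out.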
PS4, Line 842:
> suggest you to use Path.SEPARATOR instead of "/"
File fe/src/main/java/org/apache/impala/catalog/events/
PS4, Line 159: refresh
> refresh on a table/partition
PS4, Line 587: public static class InsertEvent extends MetastoreTableEvent {
> IIUC, the reason you are extending TableInvalidatingEvent is because you wa
You are right. I am changing it back to the old TableInvalidatingEvent.
PS4, Line 588:
> nit, add new line above the constructor. Add a javadoc
PS4, Line 606:       }
> add a // TODO : to handle self-events for insert case
PS4, Line 613:
> nit, just saying refresh is good enough. No need to say reload here.
PS4, Line 616:
> same as above. Just say refresh since I don't think reload means anything e
PS4, Line 623:       }
> Add a // TODO : One way to do this would be to change hive source code to r
PS4, Line 965: t object parameters used for self-
> Why do we need to rename? Currently, all the implementations of this sub-cl
PS4, Line 118:
> Ignore to keep it consistent with other entries of this table
PS4, Line 118:       |            |            |
             :  * | INSERT EVENT| Refres
> Just use Refresh unless Reload means something else.
File fe/src/main/java/org/apache/impala/service/
PS4, Line 3554:     FeCatalogUtils.loadAllPartitions((HdfsTable) table);
              :       // Map of partition ids to file names of all existing partitions touched by the
              :       // insert
> these can be moved to down below near the place where they are used to impr
PS4, Line 3575:           // Attempt to remove this partition name from partsToCreate. If remove
> I think it will be much simpler if you keep track of PartitionIds instead o
This is a good suggestion for partitioned tables. However, for an unpartitioned
table we create a new partition instead of updating it in place, which results
in a new partition id. I have changed the code to track the file names of an
unpartitioned table under a 'null' key in the
existingFilesForInsertedPartitions map. Please let me know if this is bad
practice.
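
A minimal sketch of the 'null' key approach described above, assuming the map is a plain java.util.HashMap. The ExistingFilesTracker class and its method names are hypothetical and only illustrate the idea; they are not taken from the patch:

```java
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;

public class ExistingFilesTracker {
  // Partition id -> file names present before the insert. The null key is the
  // hypothetical slot for the single partition of an unpartitioned table.
  private final Map<Long, Set<String>> existingFiles = new HashMap<>();

  public void record(Long partitionId, String fileName) {
    existingFiles.computeIfAbsent(partitionId, id -> new HashSet<>()).add(fileName);
  }

  public Set<String> filesFor(Long partitionId) {
    return existingFiles.getOrDefault(partitionId, Collections.emptySet());
  }

  public static void main(String[] args) {
    ExistingFilesTracker tracker = new ExistingFilesTracker();
    tracker.record(null, "000000_0.parq"); // unpartitioned table
    tracker.record(42L, "000001_0.parq");  // partitioned table, partition id 42
    System.out.println(tracker.filesFor(null));
    System.out.println(tracker.filesFor(42L));
  }
}
```

HashMap permits a single null key, but ConcurrentHashMap and Map.of reject null keys, so a named sentinel id may be the safer choice if the map type could change later.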
PS4, Line 3671: }
> Won't this be always true? partsToLoadMetadata is created from update.getCr
This part no longer exists.
PS4, Line 3678:  Submit the watch request for the given cache directives.
              :       if (!cacheDirIds.isEmpty()) {
              :         catalog_.watchCacheDirs(cacheDirIds, 
> may be it will be simpler to handle partitioned and non-partitioned cases s
PS4, Line 3714:
              :    * Populate Insert Event Data from existing partitions touched by the insert and call
              :    * fireInsertEvent() if external event processing is enabled.
              :    */
              :   private void createInsertEvent(Table table,
              :       Map<Long, Set<String>> existingFilesForInsertedPartitions, boolean is_overwrite) {
              :     if (!catalog_.isExternalEventProcessingEnabled()) return;
              :     // If t
> This can be found simply by doing List<FeFsPartition> partitionsPostInsert
PS4, Line 3701:  finally {
              :       context.stop();
              :       table.getLock().unlock();
              :     }
              :     if (update.isSync_ddl()) {
              :       response.getResult().setVersion(
              :           catalog_.waitForSyncDdlVersion(response.getResult()));
              :     }
              :     return response;
              :   }
              :   /**
              :    * Populate Insert Event Data from existing partitions touched by the insert and call
              :    * fireInsertEvent() if external event processing is enabled.
              :    */
              :   private void createInsertEvent(Table table,
              :       Map<Long, Set<String>> existingFilesForInsertedPartitions, boolean is_overwrite) {
              :     if (!catalog_.isExternalEventProcessingEnabled()) return;
              :     // If table is partitioned, we add all existing partitions touched by this insert
              :     // to the insert event. We also find the delta of new files created in the
              :     // existing partitions to the insert event.
              :     if (table.getNumClusteringCols() > 0) {
              :       // Load existing parts touched by insert.
              :       Collection<? extends FeFsPartition> partsPostInsert =
              :       // Existing Partitions touched by the insert.
              :       for (FeFsPartition part : partsPostInsert) {
              :         // Find the delta of the files added by the insert if it is not an overwrite
              :         List<String> newFiles = new ArrayList<>();
              :         if (!is_overwrite) {
              :           Set<String> deltaFiles = 
              :           deltaFiles.removeAll(
              :           for (String fileName : deltaFiles) {
              :             newFiles.add(fileName);
              :           }
              :         }
              :         List<String> partVals = 
              :         fireInsertEvent(table, partVals, newFiles, 
              :       }
              :     } else { // Non-partitioned table
              :       List<String> newFiles = new ArrayList<>();
              :       if (!is_overwrite) {
              :         FeFsPartition singlePart = Iterables.getOnlyElement(FeCatalogUtils.loadAllPartitions((HdfsTable) table));
> move this to a separate method. Inside the method you can do a check for ca
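
The delta computation in the quoted code can be sketched as a separate method, along the lines the comment suggests. This is a minimal illustration under stated assumptions: the InsertFileDelta class, the newFiles method, and its parameter names are hypothetical, and file names are modelled as plain strings rather than the catalog's file descriptors:

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashSet;
import java.util.List;
import java.util.Set;

public class InsertFileDelta {
  /**
   * Returns the files present after the insert that were not present before it.
   * For an overwrite the delta is left empty, mirroring the quoted code.
   */
  public static List<String> newFiles(
      Set<String> filesBefore, Set<String> filesAfter, boolean isOverwrite) {
    if (isOverwrite) return new ArrayList<>();
    // Copy first so that neither input set is modified by removeAll().
    Set<String> delta = new HashSet<>(filesAfter);
    delta.removeAll(filesBefore);
    List<String> result = new ArrayList<>(delta);
    Collections.sort(result); // deterministic order, convenient for tests
    return result;
  }

  public static void main(String[] args) {
    Set<String> before = new HashSet<>(List.of("a.parq", "b.parq"));
    Set<String> after = new HashSet<>(List.of("a.parq", "b.parq", "c.parq"));
    System.out.println(newFiles(before, after, false));
    System.out.println(newFiles(before, after, true));
  }
}
```

Copying the post-insert set before calling removeAll() keeps the caller's collections unchanged, which matters if they are backed by catalog state.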
PS4, Line 3765: List<String> newFiles, boolea
> add more details w.r.t what goes into insert event and how it is used later
PS4, Line 3783:
> shouldn't this be error. Add the implication of this exception in the messa
File tests/custom_cluster/
PS4, Line 54:      data = self.execute_scalar("select * from %s.%s" % (db_name, 
> Thanks for adding this test, I think it is useful and we can extend this fu
I agree, there is no way to tell whether the new data is due to an invalidate
or to the code path in this patch. I think a better place for this test is in
the frontend tests. I will investigate.
PS4, Line 59:      time.sleep(10)
> assuming this time is in seconds, I am worried that this may be too less an


Gerrit-Project: Impala-ASF
Gerrit-Branch: master
Gerrit-MessageType: comment
Gerrit-Change-Id: I7c48c5ca4bde18d532c582980aebbc25f1bf1c52
Gerrit-Change-Number: 12889
Gerrit-PatchSet: 5
Gerrit-Owner: Anurag Mantripragada <>
Gerrit-Reviewer: Anurag Mantripragada <>
Gerrit-Reviewer: Bharath Krishna <>
Gerrit-Reviewer: Bharath Vissapragada <>
Gerrit-Reviewer: Impala Public Jenkins <>
Gerrit-Reviewer: Paul Rogers <>
Gerrit-Reviewer: Vihang Karajgaonkar <>
Gerrit-Comment-Date: Tue, 02 Apr 2019 21:05:24 +0000
Gerrit-HasComments: Yes
