Anurag Mantripragada has posted comments on this change. ( http://gerrit.cloudera.org:8080/12889 )

Change subject: IMPALA-7971: Add support for insert events in event processor.
......................................................................


Patch Set 5:

(24 comments)

Thanks for the comments, Vihang. I cleaned up the code in CatalogOpExecutor. I am
still figuring out how to write tests that verify tables are refreshed because
of insert events.

http://gerrit.cloudera.org:8080/#/c/12889/4//COMMIT_MSG
Commit Message:

http://gerrit.cloudera.org:8080/#/c/12889/4//COMMIT_MSG@24
PS4, Line 24: Existing self-events logic cannot be used for insert events since
            :    firing insert event does not allow us to modify
> I think it is more appropriate to say existing self-events logic cannot be
Done


http://gerrit.cloudera.org:8080/#/c/12889/4/be/src/service/client-request-state.cc
File be/src/service/client-request-state.cc:

http://gerrit.cloudera.org:8080/#/c/12889/4/be/src/service/client-request-state.cc@1106
PS4, Line 1106:       // is_overwrite is used to know the type of insert in FE.
> add a comment here explaining why this is needed.
Done


http://gerrit.cloudera.org:8080/#/c/12889/4/fe/src/main/java/org/apache/impala/catalog/HdfsPartition.java
File fe/src/main/java/org/apache/impala/catalog/HdfsPartition.java:

http://gerrit.cloudera.org:8080/#/c/12889/4/fe/src/main/java/org/apache/impala/catalog/HdfsPartition.java@839
PS4, Line 839: HashSet<>(f
> would be appropriate to initialize the set with the capacity fdList.size() s
Done
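
For reference, a minimal sketch of presizing a HashSet as suggested above. The
constructor argument is the initial capacity of the backing table, so with the
default load factor of 0.75 a set created with capacity n can still resize once
before n elements are inserted. The sketch therefore divides by the load
factor. The class name, helper name, and file names are hypothetical and are
not taken from the patch.

```java
import java.util.Arrays;
import java.util.HashSet;
import java.util.List;
import java.util.Set;

final class PresizedSetSketch {
  // Hypothetical helper: collects file names into a set that is sized up front.
  static Set<String> toFileNameSet(List<String> fileNames) {
    // Capacity chosen so fileNames.size() entries fit under the default 0.75
    // load factor without a rehash.
    int capacity = (int) (fileNames.size() / 0.75f) + 1;
    Set<String> result = new HashSet<>(capacity);
    result.addAll(fileNames);
    return result;
  }

  public static void main(String[] args) {
    Set<String> names = toFileNameSet(Arrays.asList("a.parq", "b.parq", "a.parq"));
    System.out.println(names.size());  // prints 2
  }
}
```

Passing fdList.size() directly, as in the review comment, is also valid; it
only trades one possible resize for a slightly smaller initial table.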


http://gerrit.cloudera.org:8080/#/c/12889/4/fe/src/main/java/org/apache/impala/catalog/HdfsPartition.java@842
PS4, Line 842:
> suggest you to use Path.SEPARATOR instead of "/"
Done


http://gerrit.cloudera.org:8080/#/c/12889/4/fe/src/main/java/org/apache/impala/catalog/events/MetastoreEvents.java
File fe/src/main/java/org/apache/impala/catalog/events/MetastoreEvents.java:

http://gerrit.cloudera.org:8080/#/c/12889/4/fe/src/main/java/org/apache/impala/catalog/events/MetastoreEvents.java@159
PS4, Line 159: refresh
> refresh on a table/partition
Done


http://gerrit.cloudera.org:8080/#/c/12889/4/fe/src/main/java/org/apache/impala/catalog/events/MetastoreEvents.java@587
PS4, Line 587: public static class InsertEvent extends MetastoreTableEvent {
> IIUC, the reason you are extending TableInvalidatingEvent is because you wa
You are right. Changing it back to the old TableInvalidatingEvent.


http://gerrit.cloudera.org:8080/#/c/12889/4/fe/src/main/java/org/apache/impala/catalog/events/MetastoreEvents.java@588
PS4, Line 588:
> nit, add new line above the constructor. Add a javadoc
Done


http://gerrit.cloudera.org:8080/#/c/12889/4/fe/src/main/java/org/apache/impala/catalog/events/MetastoreEvents.java@606
PS4, Line 606:       }
> add a // TODO : to handle self-events for insert case
Done


http://gerrit.cloudera.org:8080/#/c/12889/4/fe/src/main/java/org/apache/impala/catalog/events/MetastoreEvents.java@613
PS4, Line 613:
> nit, just saying refresh is good enough. No need to say reload here.
Done


http://gerrit.cloudera.org:8080/#/c/12889/4/fe/src/main/java/org/apache/impala/catalog/events/MetastoreEvents.java@616
PS4, Line 616:
> same as above. Just say refresh since I don't think reload means anything e
Done


http://gerrit.cloudera.org:8080/#/c/12889/4/fe/src/main/java/org/apache/impala/catalog/events/MetastoreEvents.java@623
PS4, Line 623:       }
> Add a // TODO : One way to do this would be to change hive source code to r
Done


http://gerrit.cloudera.org:8080/#/c/12889/4/fe/src/main/java/org/apache/impala/catalog/events/MetastoreEvents.java@965
PS4, Line 965: t object parameters used for self-
> Why do we need to rename? Currently, all the implementations of this sub-cl
Done


http://gerrit.cloudera.org:8080/#/c/12889/4/fe/src/main/java/org/apache/impala/catalog/events/MetastoreEventsProcessor.java
File fe/src/main/java/org/apache/impala/catalog/events/MetastoreEventsProcessor.java:

http://gerrit.cloudera.org:8080/#/c/12889/4/fe/src/main/java/org/apache/impala/catalog/events/MetastoreEventsProcessor.java@118
PS4, Line 118:
> Ignore to keep it consistent with other entries of this table
Done


http://gerrit.cloudera.org:8080/#/c/12889/4/fe/src/main/java/org/apache/impala/catalog/events/MetastoreEventsProcessor.java@118
PS4, Line 118:       |            |            |
             :  * | INSERT EVENT| Refres
> Just use Refresh unless Reload means something else.
Done


http://gerrit.cloudera.org:8080/#/c/12889/4/fe/src/main/java/org/apache/impala/service/CatalogOpExecutor.java
File fe/src/main/java/org/apache/impala/service/CatalogOpExecutor.java:

http://gerrit.cloudera.org:8080/#/c/12889/4/fe/src/main/java/org/apache/impala/service/CatalogOpExecutor.java@3554
PS4, Line 3554:     FeCatalogUtils.loadAllPartitions((HdfsTable) table);
              :       // Map of partition ids to file names of all existing partitions touched by the
              :       // insert
> these can be moved to down below near the place where they are used to impr
Done


http://gerrit.cloudera.org:8080/#/c/12889/4/fe/src/main/java/org/apache/impala/service/CatalogOpExecutor.java@3575
PS4, Line 3575:           // Attempt to remove this partition name from partsToCreate. If remove
> I think it will be much simpler if you keep track of PartitionIds instead o
This is a good suggestion for partitioned tables. However, for an unpartitioned
table we create a new partition instead of updating it in place, which results
in a new partition id. I have changed the code to track the file names under a
'null' key in the existingFilesForInsertedPartitions map. Please let me know if
this is bad practice.
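
To make the question concrete, here is a minimal sketch of the null-key
approach, assuming the map is a java.util.HashMap (which permits a single null
key; ConcurrentHashMap and a TreeMap with natural ordering do not). The class
name, ids, and file names are hypothetical and only stand in for the map
described above.

```java
import java.util.Arrays;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;

final class NullKeySketch {
  // Hypothetical stand-in: partition id -> file names that existed before the
  // insert. The null key holds the files of an unpartitioned table, whose
  // single partition gets a new id after the insert.
  static Map<Long, Set<String>> buildExistingFiles() {
    Map<Long, Set<String>> existing = new HashMap<>();
    existing.put(7L, new HashSet<>(Arrays.asList("p7_a.parq")));
    existing.put(null, new HashSet<>(Arrays.asList("t_a.parq", "t_b.parq")));
    return existing;
  }

  public static void main(String[] args) {
    Map<Long, Set<String>> existing = buildExistingFiles();
    System.out.println(existing.get(null).size());   // prints 2
    System.out.println(existing.containsKey(null));  // prints true
    // Caution: iterating with a primitive loop variable, for example
    // "for (long id : existing.keySet())", would throw a NullPointerException
    // when the null key is unboxed.
  }
}
```

The main cost of this choice is that every consumer of the map has to know
about the null entry; a named sentinel id would make that contract explicit.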


http://gerrit.cloudera.org:8080/#/c/12889/4/fe/src/main/java/org/apache/impala/service/CatalogOpExecutor.java@3671
PS4, Line 3671: }
> Won't this be always true? partsToLoadMetadata is created from update.getCr
This part no longer exists.


http://gerrit.cloudera.org:8080/#/c/12889/4/fe/src/main/java/org/apache/impala/service/CatalogOpExecutor.java@3678
PS4, Line 3678:  Submit the watch request for the given cache directives.
              :       if (!cacheDirIds.isEmpty()) {
              :         catalog_.watchCacheDirs(cacheDirIds, tblName.toThrift());
> may be it will be simpler to handle partitioned and non-partitioned cases s
Done


http://gerrit.cloudera.org:8080/#/c/12889/4/fe/src/main/java/org/apache/impala/service/CatalogOpExecutor.java@3714
PS4, Line 3714:
              :    * Populate Insert Event Data from existing partitions touched by the insert and call
              :    * fireInsertEvent() if external event processing is enabled.
              :    */
              :   private void createInsertEvent(Table table,
              :       Map<Long, Set<String>> existingFilesForInsertedPartitions, boolean is_overwrite) {
              :     if (!catalog_.isExternalEventProcessingEnabled()) return;
              :
              :     // If t
> This can be found simply by doing List<FeFsPartition> partitionsPostInsert
Done


http://gerrit.cloudera.org:8080/#/c/12889/4/fe/src/main/java/org/apache/impala/service/CatalogOpExecutor.java@3701
PS4, Line 3701:  finally {
              :       context.stop();
              :       Preconditions.checkState(!catalog_.getLock().isWriteLockedByCurrentThread());
              :       table.getLock().unlock();
              :     }
              :
              :     if (update.isSync_ddl()) {
              :       response.getResult().setVersion(
              :           catalog_.waitForSyncDdlVersion(response.getResult()));
              :     }
              :     return response;
              :   }
              :
              :   /**
              :    * Populate Insert Event Data from existing partitions touched by the insert and call
              :    * fireInsertEvent() if external event processing is enabled.
              :    */
              :   private void createInsertEvent(Table table,
              :       Map<Long, Set<String>> existingFilesForInsertedPartitions, boolean is_overwrite) {
              :     if (!catalog_.isExternalEventProcessingEnabled()) return;
              :
              :     // If table is partitioned, we add all existing partitions touched by this insert
              :     // to the insert event. We also find the delta of new files created in the
              :     // existing partitions to the insert event.
              :     if (table.getNumClusteringCols() > 0) {
              :       // Load existing parts touched by insert.
              :       Collection<? extends FeFsPartition> partsPostInsert =
              :           ((FeFsTable)table).loadPartitions(existingFilesForInsertedPartitions.keySet());
              :       // Existing Partitions touched by the insert.
              :       for (FeFsPartition part : partsPostInsert) {
              :         // Find the delta of the files added by the insert if it is not an overwrite
              :         List<String> newFiles = new ArrayList<>();
              :         if (!is_overwrite) {
              :           Set<String> deltaFiles = ((HdfsPartition)part).getFileNames();
              :           deltaFiles.removeAll(
              :               existingFilesForInsertedPartitions.get(part.getId()));
              :           for (String fileName : deltaFiles) {
              :             newFiles.add(fileName);
              :           }
              :         }
              :         List<String> partVals = part.getPartitionValuesAsStrings(true);
              :         fireInsertEvent(table, partVals, newFiles, is_overwrite);
              :       }
              :     } else { // Non-partitioned table
              :       List<String> newFiles = new ArrayList<>();
              :       if (!is_overwrite) {
              :         FeFsPartition singlePart =
              :             Iterables.getOnlyElement(FeCatalogUtils.loadAllPartitions((HdfsTable) table));
              :
> move this to a separate method. Inside the method you can do a check for ca
Done
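
The file-delta step in the quoted code is a plain set difference. The sketch
below isolates it, assuming nothing about the real classes: the class and
method names are hypothetical. It copies the "after" set before calling
removeAll, since it is not clear from the quoted snippet whether
getFileNames() returns a fresh set or a view that must not be mutated.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.HashSet;
import java.util.List;
import java.util.Set;

final class FileDeltaSketch {
  // Hypothetical helper: names present after the insert but not before it.
  static List<String> newFiles(Set<String> filesAfter, Set<String> filesBefore,
      boolean isOverwrite) {
    // Mirrors the quoted code: an overwrite reports no per-file delta.
    if (isOverwrite) return new ArrayList<>();
    Set<String> delta = new HashSet<>(filesAfter);  // copy, inputs stay untouched
    if (filesBefore != null) delta.removeAll(filesBefore);
    List<String> result = new ArrayList<>(delta);
    Collections.sort(result);  // deterministic order for the example
    return result;
  }

  public static void main(String[] args) {
    Set<String> before = new HashSet<>(Arrays.asList("f1", "f2"));
    Set<String> after = new HashSet<>(Arrays.asList("f1", "f2", "f3", "f4"));
    System.out.println(newFiles(after, before, false));  // prints [f3, f4]
    System.out.println(newFiles(after, before, true));   // prints []
  }
}
```

Having one helper of this shape serve both the partitioned and the
non-partitioned branch would also fit the request to move the logic into a
separate method.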


http://gerrit.cloudera.org:8080/#/c/12889/4/fe/src/main/java/org/apache/impala/service/CatalogOpExecutor.java@3765
PS4, Line 3765: List<String> newFiles, boolea
> add more details w.r.t what goes into insert event and how it is used later
Done


http://gerrit.cloudera.org:8080/#/c/12889/4/fe/src/main/java/org/apache/impala/service/CatalogOpExecutor.java@3783
PS4, Line 3783:
> shouldn't this be error. Add the implication of this exception in the messa
Done


http://gerrit.cloudera.org:8080/#/c/12889/4/tests/custom_cluster/test_event_processing.py
File tests/custom_cluster/test_event_processing.py:

http://gerrit.cloudera.org:8080/#/c/12889/4/tests/custom_cluster/test_event_processing.py@54
PS4, Line 54:      data = self.execute_scalar("select * from %s.%s" % (db_name, TBL_INSERT_NOPART))
> Thanks for adding this test, I think it is useful and we can extend this fu
I agree, there is no way to tell whether the new data appears because of an
invalidate or because of the code path in this patch. I think a better place
for this test is in the frontend tests. I will investigate.


http://gerrit.cloudera.org:8080/#/c/12889/4/tests/custom_cluster/test_event_processing.py@59
PS4, Line 59:      time.sleep(10)
> assuming this time is in seconds, I am worried that this may be too less an
Done
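
One common alternative to a fixed sleep is to poll for the expected state with
an upper bound on the wait. The sketch below shows the idea in Java to match
the frontend code. It is only an illustration: the helper and its parameters
are hypothetical, and the thread does not say what the next patch set actually
does here.

```java
import java.util.function.BooleanSupplier;

final class PollUntilSketch {
  // Hypothetical helper: polls 'condition' every 'intervalMs' until it holds
  // or 'timeoutMs' elapses. Returns whether the condition was seen to hold.
  static boolean waitUntil(BooleanSupplier condition, long timeoutMs, long intervalMs) {
    long deadline = System.nanoTime() + timeoutMs * 1_000_000L;
    while (true) {
      if (condition.getAsBoolean()) return true;
      if (System.nanoTime() - deadline >= 0) return false;
      try {
        Thread.sleep(intervalMs);
      } catch (InterruptedException e) {
        Thread.currentThread().interrupt();  // preserve the interrupt status
        return false;
      }
    }
  }

  public static void main(String[] args) {
    long start = System.currentTimeMillis();
    // Condition becomes true after roughly 50 ms; the 2 s bound is generous.
    boolean ok = waitUntil(() -> System.currentTimeMillis() - start >= 50, 2000, 10);
    System.out.println(ok);
  }
}
```

With this shape the test returns as soon as the event has been processed and
fails with a clear timeout otherwise, instead of depending on a fixed delay.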



--
To view, visit http://gerrit.cloudera.org:8080/12889
To unsubscribe, visit http://gerrit.cloudera.org:8080/settings

Gerrit-Project: Impala-ASF
Gerrit-Branch: master
Gerrit-MessageType: comment
Gerrit-Change-Id: I7c48c5ca4bde18d532c582980aebbc25f1bf1c52
Gerrit-Change-Number: 12889
Gerrit-PatchSet: 5
Gerrit-Owner: Anurag Mantripragada <anu...@cloudera.com>
Gerrit-Reviewer: Anurag Mantripragada <anu...@cloudera.com>
Gerrit-Reviewer: Bharath Krishna <bhar...@cloudera.com>
Gerrit-Reviewer: Bharath Vissapragada <bhara...@cloudera.com>
Gerrit-Reviewer: Impala Public Jenkins <impala-public-jenk...@cloudera.com>
Gerrit-Reviewer: Paul Rogers <prog...@cloudera.com>
Gerrit-Reviewer: Vihang Karajgaonkar <vih...@cloudera.com>
Gerrit-Comment-Date: Tue, 02 Apr 2019 21:05:24 +0000
Gerrit-HasComments: Yes
