Bharath Vissapragada has posted comments on this change. ( 
http://gerrit.cloudera.org:8080/8235 )

Change subject: IMPALA-5429: Multi threaded block metadata loading
......................................................................


Patch Set 5:

(36 comments)

http://gerrit.cloudera.org:8080/#/c/8235/4//COMMIT_MSG
Commit Message:

http://gerrit.cloudera.org:8080/#/c/8235/4//COMMIT_MSG@59
PS4, Line 59:
> I see. So are these literally as simple as, to pick the first one, a single
That is correct.


http://gerrit.cloudera.org:8080/#/c/8235/5/be/src/catalog/catalog.cc
File be/src/catalog/catalog.cc:

http://gerrit.cloudera.org:8080/#/c/8235/5/be/src/catalog/catalog.cc@35
PS5, Line 35: DEFINE_int32(num_metadata_loading_threads, 16,
            :     "(Advanced) The number of metadata loading threads (degree of parallelism) to use "
            :     "when loading catalog metadata.");
> I'm confused by the commit message which talks about not loading from hms u
Right, this flag controls the number of parallel *table* loads. Each table
load involves:

- Loading HMS metadata (including schema details, partition information etc.)
- Loading/synthesizing block metadata

So this flag is about inter-table parallelism, whereas the commit message
talks about intra-table parallel loading. Does that make sense? (I clarified
the commit message a little to reflect this, in case that helps.)
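
To make the two levels concrete, here is a minimal sketch, not the code in
the patch. TwoLevelLoadSketch, loadTables() and loadTable() are hypothetical
names, and the pool sizes are plain parameters: the outer pool plays the role
of num_metadata_loading_threads, the inner per-table pool the role of the
partition-level loading described in the commit message.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;

// Hypothetical illustration only; none of these names exist in Impala.
final class TwoLevelLoadSketch {
  // Inter-table parallelism: each table is loaded by a thread of the outer pool.
  // Returns the total number of partitions loaded across all tables.
  static int loadTables(Map<String, List<String>> partitionsByTable,
      int tableThreads, int partitionThreads) {
    ExecutorService tablePool = Executors.newFixedThreadPool(tableThreads);
    try {
      List<Future<Integer>> tableLoads = new ArrayList<>();
      for (List<String> partitions : partitionsByTable.values()) {
        Callable<Integer> tableLoad = () -> loadTable(partitions, partitionThreads);
        tableLoads.add(tablePool.submit(tableLoad));
      }
      int total = 0;
      for (Future<Integer> load : tableLoads) total += load.get();
      return total;
    } catch (InterruptedException | ExecutionException e) {
      throw new IllegalStateException(e);
    } finally {
      tablePool.shutdown();
    }
  }

  // Intra-table parallelism: one table fans its partitions out to its own pool.
  static int loadTable(List<String> partitions, int partitionThreads)
      throws Exception {
    ExecutorService partitionPool = Executors.newFixedThreadPool(partitionThreads);
    try {
      List<Callable<Integer>> tasks = new ArrayList<>();
      for (String partition : partitions) {
        // Stand-in for loading the file/block metadata of one partition.
        tasks.add(() -> partition.isEmpty() ? 0 : 1);
      }
      int loaded = 0;
      for (Future<Integer> task : partitionPool.invokeAll(tasks)) loaded += task.get();
      return loaded;
    } finally {
      partitionPool.shutdown();
    }
  }
}
```

With two tables holding two and one partitions, loadTables(tables, 2, 2)
returns 3 no matter how the work is interleaved across the two pools.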


http://gerrit.cloudera.org:8080/#/c/8235/5/be/src/catalog/catalog.cc@42
PS5, Line 42: DEFINE_int32(max_s3_parts_parallel_load, 10,
> I would be more aggressive with this parameter and put it at 20.
Done


http://gerrit.cloudera.org:8080/#/c/8235/5/fe/src/main/java/org/apache/impala/catalog/HdfsTable.java
File fe/src/main/java/org/apache/impala/catalog/HdfsTable.java:

http://gerrit.cloudera.org:8080/#/c/8235/5/fe/src/main/java/org/apache/impala/catalog/HdfsTable.java@215
PS5, Line 215:   // Block metadata loading stats for a single HDFS path.
> nit: File/block (since we're also loading/refreshing files). Also, you may
Done


http://gerrit.cloudera.org:8080/#/c/8235/5/fe/src/main/java/org/apache/impala/catalog/HdfsTable.java@217
PS5, Line 217: loadedFiles_
> You may want to add a comment here. What is loaded vs refreshed? Is the one
Done


http://gerrit.cloudera.org:8080/#/c/8235/5/fe/src/main/java/org/apache/impala/catalog/HdfsTable.java@218
PS5, Line 218: _
> I believe the convention is that we don't use '_' for public members.
Done. Was not aware of this.


http://gerrit.cloudera.org:8080/#/c/8235/5/fe/src/main/java/org/apache/impala/catalog/HdfsTable.java@217
PS5, Line 217: public int loadedFiles_ = 0;
             :     public int refreshedFiles_ = 0;
             :     public int ignoredFiles_ = 0;
> add comments for these-- see the question regarding refreshedFiles below, f
Yep, the current variables are confusing for sure, redid them and added 
comments.


http://gerrit.cloudera.org:8080/#/c/8235/5/fe/src/main/java/org/apache/impala/catalog/HdfsTable.java@231
PS5, Line 231: Runnable
> I don't know how this is used later on, but alternatively you can make Path
I originally settled on Runnable because it looked cleaner to me: the class
exposes the debugString() method, and only the callers that need the result
call it. I'm fine either way, so I switched to Callable.
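
For anyone following along, the difference looks roughly like this. It is a
sketch under assumptions, not the patch's code: PathLoadSketch, LoadStats and
PathLoader are hypothetical names, and call() only returns canned numbers
instead of touching a filesystem.

```java
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;

// Hypothetical names; a sketch of the Callable variant, not the patch's code.
final class PathLoadSketch {
  // Immutable result of loading one path.
  static final class LoadStats {
    final int loadedFiles;
    final int ignoredFiles;

    LoadStats(int loadedFiles, int ignoredFiles) {
      this.loadedFiles = loadedFiles;
      this.ignoredFiles = ignoredFiles;
    }

    String debugString() {
      return "loaded=" + loadedFiles + " ignored=" + ignoredFiles;
    }
  }

  // With Callable the task hands its stats back through the Future. With
  // Runnable the caller would have to keep the task object around and query
  // it after completion.
  static final class PathLoader implements Callable<LoadStats> {
    private final int changedFiles;
    private final int unchangedFiles;

    PathLoader(int changedFiles, int unchangedFiles) {
      this.changedFiles = changedFiles;
      this.unchangedFiles = unchangedFiles;
    }

    @Override
    public LoadStats call() {
      // Stand-in for listing a directory and loading block metadata.
      return new LoadStats(changedFiles, unchangedFiles);
    }
  }

  static String loadAndDescribe(int changedFiles, int unchangedFiles) {
    ExecutorService pool = Executors.newSingleThreadExecutor();
    try {
      Future<LoadStats> result = pool.submit(new PathLoader(changedFiles, unchangedFiles));
      return result.get().debugString();  // get() blocks until call() returns
    } catch (InterruptedException | ExecutionException e) {
      throw new IllegalStateException(e);
    } finally {
      pool.shutdown();
    }
  }
}
```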


http://gerrit.cloudera.org:8080/#/c/8235/5/fe/src/main/java/org/apache/impala/catalog/HdfsTable.java@247
PS5, Line 247: Blocks on the loadBlockMetadata() call.
> Not following this comment. run() either calls refreshBlockMetadata() or lo
The comment was stale. Updated it.


http://gerrit.cloudera.org:8080/#/c/8235/5/fe/src/main/java/org/apache/impala/catalog/HdfsTable.java@333
PS5, Line 333: loadBlockMetadata
> I know I am guilty for some of these names but maybe rename this to "resetA
Done


http://gerrit.cloudera.org:8080/#/c/8235/5/fe/src/main/java/org/apache/impala/catalog/HdfsTable.java@363
PS5, Line 363: numUnknownDiskIds
> Are you overriding or incrementing the value of numUnknownDiskIds in the cr
Incrementing; the final value is used in L369. I traced the create call, which
eventually calls HdfsPartition.createDiskIds(): it only increments the value
and never overwrites it.


http://gerrit.cloudera.org:8080/#/c/8235/5/fe/src/main/java/org/apache/impala/catalog/HdfsTable.java@368
PS5, Line 368: for (HdfsPartition partition: partitions) partition.setFileDescriptors(
> Am I misreading this or does each partition get set to the same list of new
That is right. All the HdfsPartitions in 'partitions' map to the same partDir. 
So we update the fileDescs for each of them.
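
A minimal sketch of that mapping, with hypothetical stand-in types (this
Partition class is not HdfsPartition, and file names stand in for file
descriptors). Note one deliberate difference from the quoted line: the sketch
gives each partition its own copy of the listing, which is only one possible
defensive choice and not what the patch does.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Hypothetical stand-ins; not Impala's HdfsPartition or FileDescriptor.
final class SharedDirSketch {
  static final class Partition {
    final String name;
    final String location;
    List<String> fileNames = new ArrayList<>();

    Partition(String name, String location) {
      this.name = name;
      this.location = location;
    }
  }

  // Groups partitions by directory so that each directory is listed only once.
  static Map<String, List<Partition>> groupByLocation(List<Partition> partitions) {
    Map<String, List<Partition>> byLocation = new LinkedHashMap<>();
    for (Partition p : partitions) {
      byLocation.computeIfAbsent(p.location, k -> new ArrayList<>()).add(p);
    }
    return byLocation;
  }

  // Applies one directory listing to every partition that maps to it. Each
  // partition gets its own copy (a defensive choice made for this sketch).
  static void setFileNames(List<Partition> partitionsInDir, List<String> listing) {
    for (Partition p : partitionsInDir) p.fileNames = new ArrayList<>(listing);
  }
}
```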


http://gerrit.cloudera.org:8080/#/c/8235/5/fe/src/main/java/org/apache/impala/catalog/HdfsTable.java@368
PS5, Line 368: newFileDescs
> Hm, that doesn't seem particularly safe (i.e. using the same list for every
Like we discussed, there is a subtle bug here (even without the patch). We can
see the inconsistency with the following steps:

- create table test(a int) partitioned by (b int);
- insert into test partition(b=1) values (1);
- alter table test add partition (b=2)
  location 'hdfs://localhost:20500/test-warehouse/test/b=1/';
  -- points b=2 to the same location as b=1

[localhost:21000] > show partitions test;
Query: show partitions test
+-------+-------+--------+------+--------------+-------------------+--------+-------------------+------------------------------------------------+
| b     | #Rows | #Files | Size | Bytes Cached | Cache Replication | Format | Incremental stats | Location                                       |
+-------+-------+--------+------+--------------+-------------------+--------+-------------------+------------------------------------------------+
| 1     | -1    | 1      | 2B   | NOT CACHED   | NOT CACHED        | TEXT   | false             | hdfs://localhost:20500/test-warehouse/test/b=1 |
| 2     | -1    | 1      | 2B   | NOT CACHED   | NOT CACHED        | TEXT   | false             | hdfs://localhost:20500/test-warehouse/test/b=1 |
| Total | -1    | 2      | 4B   | 0B           |                   |        |                   |                                                |
+-------+-------+--------+------+--------------+-------------------+--------+-------------------+------------------------------------------------+


insert into test partition(b=1) values (2);
show files in test;
Query: show files in test
+----------------------------------------------------------------------------------------------------+------+-----------+
| Path                                                                                               | Size | Partition |
+----------------------------------------------------------------------------------------------------+------+-----------+
| hdfs://localhost:20500/test-warehouse/test/b=1/2e44cd49e8c3d30d-572fc97800000000_627280230_data.0. | 2B   | b=1       |
| hdfs://localhost:20500/test-warehouse/test/b=1/e44245ad5c0ef020-a08716d00000000_1244237483_data.0. | 2B   | b=1       |
| hdfs://localhost:20500/test-warehouse/test/b=1/e44245ad5c0ef020-a08716d00000000_1244237483_data.0. | 2B   | b=2       |
+----------------------------------------------------------------------------------------------------+------+-----------+

invalidate metadata test;
show files in test;
Query: show files in test
+----------------------------------------------------------------------------------------------------+------+-----------+
| Path                                                                                               | Size | Partition |
+----------------------------------------------------------------------------------------------------+------+-----------+
| hdfs://localhost:20500/test-warehouse/test/b=1/2e44cd49e8c3d30d-572fc97800000000_627280230_data.0. | 2B   | b=1       |
| hdfs://localhost:20500/test-warehouse/test/b=1/e44245ad5c0ef020-a08716d00000000_1244237483_data.0. | 2B   | b=1       |
| hdfs://localhost:20500/test-warehouse/test/b=1/2e44cd49e8c3d30d-572fc97800000000_627280230_data.0. | 2B   | b=2       |
| hdfs://localhost:20500/test-warehouse/test/b=1/e44245ad5c0ef020-a08716d00000000_1244237483_data.0. | 2B   | b=2       |
+----------------------------------------------------------------------------------------------------+------+-----------+

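Bottom line: 'show files' lists a different set of files for b=2 before and
after 'invalidate metadata', even though both partitions point to the same
directory the whole time.

Ideally we would fix it in this patch, but as we discussed, I think it is
better done as a separate patch after discussing with the community.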


http://gerrit.cloudera.org:8080/#/c/8235/5/fe/src/main/java/org/apache/impala/catalog/HdfsTable.java@368
PS5, Line 368: for (HdfsPartition partition: partitions) partition.setFileDescriptors(
> Just clarified this so I'll post my misunderstanding here. The comment stat
Yea, s/correspond/map.


http://gerrit.cloudera.org:8080/#/c/8235/5/fe/src/main/java/org/apache/impala/catalog/HdfsTable.java@380
PS5, Line 380: changed
             :    * mtime
> It's not just the changed mtime that we're looking for in order to determin
Reworded this a little with a reference to hasFileChanged(). LMK if it looks ok.


http://gerrit.cloudera.org:8080/#/c/8235/5/fe/src/main/java/org/apache/impala/catalog/HdfsTable.java@383
PS5, Line 383: The initial table load still uses the listFiles()
             :    * on the data directory that fetches both the FileStatus as well as BlockLocations in
             :    * a single call.
> remove
Done


http://gerrit.cloudera.org:8080/#/c/8235/5/fe/src/main/java/org/apache/impala/catalog/HdfsTable.java@398
PS5, Line 398: get(0)
> Comment why we take the file descriptors of one partition and that it doesn
Done


http://gerrit.cloudera.org:8080/#/c/8235/5/fe/src/main/java/org/apache/impala/catalog/HdfsTable.java@399
PS5, Line 399: public String apply(FileDescriptor desc) {
             :               return desc.getFileName();
             :             }
> Add @Override and make it a single line (if it fits)
Done


http://gerrit.cloudera.org:8080/#/c/8235/5/fe/src/main/java/org/apache/impala/catalog/HdfsTable.java@426
PS5, Line 426: new Reference<Long>(Long.valueOf(0)
> why not use numUnknownDiskIds here?
Done


http://gerrit.cloudera.org:8080/#/c/8235/5/fe/src/main/java/org/apache/impala/catalog/HdfsTable.java@431
PS5, Line 431: ++loadStats.refreshedFiles_;
> does refreshedFiles mean "file blocks reloaded" or "file checked for reload
Yeah, the whole notion of refreshedFiles is confusing, and I don't think it is
needed. We can just track loaded and ignored files. In a typical load from
scratch or invalidate, loadedFiles >> ignoredFiles, and during a refresh
loadedFiles << ignoredFiles. ignoredFiles now counts the files that were
checked for reload but were not reloaded because they haven't changed. IMO
that is clearer.


http://gerrit.cloudera.org:8080/#/c/8235/5/fe/src/main/java/org/apache/impala/catalog/HdfsTable.java@433
PS5, Line 433: for (HdfsPartition partition: partitions) partition.setFileDescriptors(n
> same question as in the load method.
Answered above.


http://gerrit.cloudera.org:8080/#/c/8235/5/fe/src/main/java/org/apache/impala/catalog/HdfsTable.java@448
PS5, Line 448: (fd == null) || (fd.getFileLength() != status.getLen()) ||
             :       (fd.getModificationTime() != status.getModificationTime());
> I think we were also checking if the partition was cached. Isn't this check
I couldn't think of a reason to force a reload of block metadata for cached
partitions, so I removed that check. Let me know if it was included for a
specific reason.
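
To show how the quoted condition ties in with the loaded/ignored counters
discussed earlier in this reply, here is a small sketch. It assumes a
hypothetical FileInfo type standing in for both the cached file descriptor
and the fresh file status; only the shape of the check comes from the quoted
lines.

```java
import java.util.List;
import java.util.Map;

// Hypothetical sketch; FileInfo is not an Impala or Hadoop type.
final class RefreshSketch {
  static final class FileInfo {
    final String name;
    final long length;
    final long mtime;

    FileInfo(String name, long length, long mtime) {
      this.name = name;
      this.length = length;
      this.mtime = mtime;
    }
  }

  // Same shape as the quoted check: a file needs reloading if it is new, or
  // if its length or modification time differs from the cached entry.
  static boolean hasFileChanged(FileInfo cached, FileInfo current) {
    return cached == null || cached.length != current.length
        || cached.mtime != current.mtime;
  }

  // Refreshes the cache from a directory listing.
  // Returns {loadedFiles, ignoredFiles}.
  static int[] refresh(Map<String, FileInfo> cachedByName, List<FileInfo> listing) {
    int loaded = 0;
    int ignored = 0;
    for (FileInfo current : listing) {
      if (hasFileChanged(cachedByName.get(current.name), current)) {
        cachedByName.put(current.name, current);  // stand-in for reloading blocks
        ++loaded;
      } else {
        ++ignored;
      }
    }
    return new int[] {loaded, ignored};
  }
}
```

Starting from an empty cache, the first pass over a listing loads every file;
repeating the same listing ignores every file, which matches the
loadedFiles/ignoredFiles pattern for an initial load versus a refresh.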


http://gerrit.cloudera.org:8080/#/c/8235/5/fe/src/main/java/org/apache/impala/catalog/HdfsTable.java@455
PS5, Line 455: refreshFileMetadata
> Maybe call it refreshPartitionStorageMetadata? Overall, it may make sense t
PartitionStorageMetadata sounds better; I changed it in multiple places. Let me
know if it looks better.


http://gerrit.cloudera.org:8080/#/c/8235/5/fe/src/main/java/org/apache/impala/catalog/HdfsTable.java@694
PS5, Line 694: Exception
> Why this change?
Stale change, forgot to undo. Done


http://gerrit.cloudera.org:8080/#/c/8235/5/fe/src/main/java/org/apache/impala/catalog/HdfsTable.java@773
PS5, Line 773: HDFS and S3
> just to clarify, HdfsTable covers both hdfs table metadata as well as metad
Correct. We use the same hdfs client library to access both of them.


http://gerrit.cloudera.org:8080/#/c/8235/5/fe/src/main/java/org/apache/impala/catalog/HdfsTable.java@783
PS5, Line 783: getFileSystem(CONF)
> I noticed that this is called in many places in this class-- is it bc a giv
Correct. This is one of the overheads noticed in the perf runs, and
unfortunately we can't work around it.


http://gerrit.cloudera.org:8080/#/c/8235/5/fe/src/main/java/org/apache/impala/catalog/HdfsTable.java@787
PS5, Line 787:     int threadPoolSize = FileSystemUtil.supportsStorageIds(tableFs) ?
> What is the expected behavior for tables with mixed FSs?
I just implemented this as a first cut. I don't think it makes sense to loop
through all partitions in this call; instead we need to maintain counters of
HDFS and non-HDFS partitions and keep them updated on adds/drops. Once we have
those numbers we can do some fancier things. Maybe we can do it as a follow-up
patch? I can file a new JIRA for it and link it to this one.
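
Roughly what I have in mind for the counters, as a sketch only. The class
and method names are hypothetical and nothing like this exists in the patch;
how the counts would feed into the thread pool size is left open.

```java
// Hypothetical helper; the counters and method names are assumptions.
final class FsPartitionCountsSketch {
  private int hdfsPartitions;
  private int nonHdfsPartitions;

  // Called whenever a partition is added to the table.
  synchronized void onPartitionAdded(boolean onHdfs) {
    if (onHdfs) {
      ++hdfsPartitions;
    } else {
      ++nonHdfsPartitions;
    }
  }

  // Called whenever a partition is dropped from the table.
  synchronized void onPartitionDropped(boolean onHdfs) {
    if (onHdfs) {
      --hdfsPartitions;
    } else {
      --nonHdfsPartitions;
    }
  }

  // True when the table has partitions on both kinds of filesystem.
  synchronized boolean isMixed() {
    return hdfsPartitions > 0 && nonHdfsPartitions > 0;
  }

  synchronized int hdfsCount() {
    return hdfsPartitions;
  }

  synchronized int nonHdfsCount() {
    return nonHdfsPartitions;
  }
}
```

Keeping the counts current on adds/drops makes the mixed-filesystem question
an O(1) lookup at load time instead of a scan over all partitions.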


http://gerrit.cloudera.org:8080/#/c/8235/5/fe/src/main/java/org/apache/impala/catalog/HdfsTable.java@801
PS5, Line 801: getLoadingThreadPoolSize
> can different partitions have different number of files? if so, work across
Yes, each partition can have its own number of files, so the work definitely
varies across threads. It is roughly proportional to the number of blocks
across all the files in that partition directory. Not sure if that answers
your question.


http://gerrit.cloudera.org:8080/#/c/8235/5/fe/src/main/java/org/apache/impala/catalog/HdfsTable.java@801
PS5, Line 801: getLoadingThreadPoolSize
> Parallelization of metadata loading is done on per partition granularity.
Done


http://gerrit.cloudera.org:8080/#/c/8235/5/fe/src/main/java/org/apache/impala/catalog/HdfsTable.java@805
PS5, Line 805: ExecutorService partitionLoadingPool = Executors.newFixedThreadPool(threadPoolSize);
> What is the benefit of having a one thread pool per loaded table instead of
I'm summarizing what we discussed so that others following this can comment.

I don't think the overhead is significant, especially given that we only start
5/10 threads. While I agree that a global thread pool (across all the tables)
would give us more control over resource consumption and queue bounds, I think
it is more complicated and involves writing a proper scheduler.


http://gerrit.cloudera.org:8080/#/c/8235/5/fe/src/main/java/org/apache/impala/catalog/HdfsTable.java@821
PS5, Line 821: debugString
> This will get information from loadingStats but how do you know that loadin
Done


http://gerrit.cloudera.org:8080/#/c/8235/5/fe/src/main/java/org/apache/impala/catalog/HdfsTable.java@1237
PS5, Line 1237: HashMap
> nit: Map
Done


http://gerrit.cloudera.org:8080/#/c/8235/5/fe/src/main/java/org/apache/impala/catalog/HdfsTable.java@1303
PS5, Line 1303: Returns the HdfsPartition objects associated with the specified list of partition
              :    * names.
> Update comment.
Done


http://gerrit.cloudera.org:8080/#/c/8235/5/fe/src/main/java/org/apache/impala/service/CatalogOpExecutor.java
File fe/src/main/java/org/apache/impala/service/CatalogOpExecutor.java:

http://gerrit.cloudera.org:8080/#/c/8235/5/fe/src/main/java/org/apache/impala/service/CatalogOpExecutor.java@626
PS5, Line 626: private Table addHdfsPartitions(Table tbl, List<Partition> partitions)
             :       throws CatalogException {
             :     Preconditions.checkNotNull(tbl);
             :     Preconditions.checkNotNull(partitions);
             :     if (!(tbl instanceof HdfsTable)) {
             :       throw new CatalogException("Table " + tbl.getFullName() + " is not an HDFS table");
             :     }
             :     HdfsTable hdfsTable = (HdfsTable) tbl;
             :     List<HdfsPartition> hdfsPartitions = hdfsTable.createAndLoadPartitions(partitions);
             :     for (HdfsPartition hdfsPartition: hdfsPartitions) {
             :       catalog_.addPartition(hdfsPartition);
             :     }
             :     return hdfsTable;
             :   }
> Lot's of duplication with the previous one. How expensive would it be to si
Done


http://gerrit.cloudera.org:8080/#/c/8235/5/fe/src/main/java/org/apache/impala/util/ListMap.java
File fe/src/main/java/org/apache/impala/util/ListMap.java:

http://gerrit.cloudera.org:8080/#/c/8235/5/fe/src/main/java/org/apache/impala/util/ListMap.java@59
PS5, Line 59: synchronized (list_) {
> Why synchronized on list_? This confuses the locking scheme. If you want yo
Done


http://gerrit.cloudera.org:8080/#/c/8235/5/fe/src/main/java/org/apache/impala/util/ListMap.java@78
PS5, Line 78: list_ = Collections.synchronizedList(list);
> Hm, I think this new implementation is way too complicated. I'd start with
Not totally sure I follow this. Do you mean this list_ need not be synchronized?



--
To view, visit http://gerrit.cloudera.org:8080/8235
To unsubscribe, visit http://gerrit.cloudera.org:8080/settings

Gerrit-Project: Impala-ASF
Gerrit-Branch: master
Gerrit-MessageType: comment
Gerrit-Change-Id: I07eaa7151dfc4d56da8db8c2654bd65d8f808481
Gerrit-Change-Number: 8235
Gerrit-PatchSet: 5
Gerrit-Owner: Bharath Vissapragada <[email protected]>
Gerrit-Reviewer: Bharath Vissapragada <[email protected]>
Gerrit-Reviewer: Dimitris Tsirogiannis <[email protected]>
Gerrit-Reviewer: Jim Apple <[email protected]>
Gerrit-Reviewer: Mostafa Mokhtar <[email protected]>
Gerrit-Reviewer: Vuk Ercegovac <[email protected]>
Gerrit-Comment-Date: Mon, 16 Oct 2017 18:32:55 +0000
Gerrit-HasComments: Yes
