Thank you for the explanation! Yes, I'm using HDFS. The single-replica setup is only for test purposes at the moment. I think this makes it easier to get some first results, since fewer modifications (scheduler etc.) are necessary. I would like to test the DistributedPlanner modification in my virtual cluster. I used a customized Vagrant script to install Impala on multiple hosts (see attachment). It simply installs cloudera-manager-server-db, cloudera-manager-server and cloudera-manager-daemons via apt-get. What would be the simplest way to set up my modified version? Could I simply call ./buildall.sh and change the script to something like this?

echo "Install java..."
apt-get -q -y --force-yes install oracle-j2sdk1.7
echo "Download impala..."
wget https://...  # where I uploaded my modified version
echo "Extract impala..."
tar -xvzf Impala-cdh5-trunk.tar.gz
cd Impala-cdh5-trunk
echo "Build impala..."
./buildall.sh
echo "Start impala instances..."
service cloudera-scm-server-db initdb
service cloudera-scm-server-db start
service cloudera-scm-server start

Or is there another, maybe even easier, way to test the code? Maybe via bootstrap_development.sh / minicluster?

Best regards
Philipp

2018-04-05 18:39 GMT+02:00 Alexander Behm <alex.b...@cloudera.com>:

> Apologies for the late response. Btw, your previous post was clear enough to me, so no worries :)
>
> On Wed, Apr 4, 2018 at 7:46 AM, Philipp Krause <philippkrause.mail@googlemail.com> wrote:
>
>> Hello Alex,
>>
>> I think my previous post was too long and confusing. I apologize for that!
>>
>> If replicas are completely deactivated, all scan ranges of a block are mapped to the one host on which the block is located. This host is the "executor"/reader for all the scan ranges of this block. Is that correct?
>
> Yes, assuming you are using HDFS.
>
>> I tried to visualize my understanding of the scan range to host mapping for my use case (see attachment). Could you please have a quick look at it and tell me if this is correct?
>>
>> "The existing scan range assignment is scan-node centric. For each scan node, we independently decide which of its scan ranges should be processed by which host."
>> Without replicas, all scan ranges of a block would be assigned to the same host on which this block is located. Isn't everything local here, so that Table_A - Block_0 and Table_B - Block_0 can be joined locally, or are further steps necessary? The condition in the DistributedPlanner you pointed me to is set to false (no exchange nodes).
>>
>> "You want it to be host-centric.
>> For each host, collect the local scan ranges of *all* scan nodes, and assign them to that host."
>> Wouldn't the standard setup from above work? Wouldn't I assign all (the same) scan ranges to each host in this case?
>
> The standard setup works only if every block has exactly one replica. For our purposes, that is basically never the case (who would store production data without replication?), so the single-replica assumption was not clear to me.
>
> Does your current setup (only changing the planner and not the scheduler) produce the expected results?
>
>> Thank you very much!
>>
>> Best regards
>> Philipp
>>
>> 2018-03-28 21:04 GMT+02:00 Philipp Krause <philippkrause.mail@googlemail.com>:
>>
>>> Thank you for your answer and sorry for my delay!
>>>
>>> If my understanding is correct, the list of scan nodes consists of all nodes which contain a *local* block from a table that is needed for the query (assumption: I have no replicas in my first tests). If TableA-Block0 is on Node_0, isn't Node_0 automatically a scan node? And wouldn't this scan node always be the host for the complete scan range(s) then?
>>>
>>> "For each scan node, we independently decide which of its scan ranges should be processed by which host."
>>>
>>> https://github.com/cloudera/Impala/blob/cdh5-trunk/be/src/scheduling/scheduler.cc#L532
>>> // Loop over all scan ranges, select an executor for those with local impalads and
>>> // collect all others for later processing.
>>>
>>> So in this whole block, scan ranges are assigned to the closest executor (= host?). But isn't the closest executor always the node the block is located on (assuming impalad is installed and I have no replicas)? And isn't this node always a scan node at the same time? Otherwise a thread on a remote host would have to read the corresponding scan range, which would be more expensive.
>>> The only exception I can think of is when all threads on the local node are busy. Or, if I use replicas and all other threads of my node with the "original" block are busy, a thread on another node which contains a replica could read a special scan range of its local block. Is my understanding correct here?
>>>
>>> Aren't all scan ranges read locally by their scan nodes if I have impalad installed on all nodes? And am I right that the scan range is only based on its length, which refers to maxScanRangeLength in computeScanRangeLocations?
>>> https://github.com/cloudera/Impala/blob/cdh5-trunk/fe/src/main/java/org/apache/impala/planner/HdfsScanNode.java#L721
>>>
>>> I hope you can help me with the scan node <-> scan range -> host relationship. If I have Table_A-Block_0 and Table_B_Block_0 on the same node (which I want to join locally), I don't see why scan ranges could be assigned to another host in my scenario.
>>>
>>> Best regards and thank you very much!
>>> Philipp Krause
>>>
>>> On 21.03.2018 at 05:21, Alexander Behm wrote:
>>>
>>> Thanks for following up. I think I understand your setup.
>>>
>>> If you want to not think about scan ranges, then you can modify HdfsScanNode.computeScanRangeLocations(). For example, you could change it to produce one scan range per file or per HDFS block. That way you'd know exactly what a scan range corresponds to.
>>>
>>> I think the easiest/fastest way for you to make progress is to re-implement the existing scan range assignment logic in that place in the code I had pointed you to. There is no quick fix to change the existing behavior.
>>> The existing scan range assignment is scan-node centric. For each scan node, we independently decide which of its scan ranges should be processed by which host.
>>>
>>> I believe an algorithm to achieve your goal would look completely different. You want it to be host-centric.
>>> For each host, collect the local scan ranges of *all* scan nodes, and assign them to that host.
>>>
>>> Does that make sense?
>>>
>>> Alex
>>>
>>> On Mon, Mar 19, 2018 at 1:02 PM, Philipp Krause <philippkrause.m...@googlemail.com> wrote:
>>>
>>>> I'd like to provide a small example for our purpose. The last post may be a bit confusing, so here's a very simple example in the attached pdf file. I hope it's understandable. Otherwise, please give me brief feedback.
>>>>
>>>> Basically, I only want each data node to join all its local blocks. Is a range mapping needed, or is it possible to easily join all local blocks (regardless of their content) since everything is already "prepared"? Maybe you can clarify this for me.
>>>>
>>>> As you can see in the example, the tables are not partitioned by ID. The files are manually prepared with the help of the modulo function. So I don't have a range like [0,10], but something like 0,5,10,15 etc.
>>>>
>>>> I hope I didn't make it too complicated and confusing. I think the actual idea behind this is really simple, and I hope you can help me to get this working.
>>>>
>>>> Best regards and thank you very much for your time!
>>>> Philipp
>>>>
>>>> On 18.03.2018 at 17:32, Philipp Krause wrote:
>>>>
>>>> Hi! At the moment the data to parquet (block) mapping is based on a simple modulo function: Id % #data_nodes. So with 5 data nodes all rows with Ids 0,5,10,... are written to Parquet_0, Ids 1,6,11,... are written to Parquet_1 etc. That's what I did manually. Since the parquet file size and the block size are both set to 64MB, each parquet file will result in one block when I transfer the parquet files to HDFS. By default, HDFS distributes the blocks randomly.
>>>> For test purposes I transferred corresponding blocks from Table_A and Table_B to the same data node (Table_A - Block_X with Ids 0,5,10 and Table_B - Block_Y with Ids 0,5,10). In this case, they are transferred to data_node_0 because the modulo function (which I want to implement in the scheduler) returns 0 for these Ids. This is also done manually at the moment.
>>>>
>>>> 1.) DistributedPlanner: For the first upcoming tests I simply changed the first condition in the DistributedPlanner to true to avoid exchange nodes.
>>>>
>>>> 2.) The scheduler: That's the part I'm currently struggling with. For first tests, block replication is deactivated. I'm not sure how / where to implement the modulo function for the scan range to host mapping. Without the modulo function, I would have to implement a hard-coded mapping (something like "range" 0-0, 5-5, 10-10 -> Data_node_0 etc.). Is that correct? Instead I would like to use a slightly more flexible solution with the help of this modulo function for the host mapping.
>>>>
>>>> I would be really grateful if you could give me a hint for the scheduling implementation. Meanwhile, I'll try to dig deeper into the code.
>>>>
>>>> Best regards and thank you in advance
>>>> Philipp
>>>>
>>>> On 14.03.2018 at 08:06, Philipp Krause wrote:
>>>>
>>>> Thank you very much for this information! I'll try to implement these two steps and post some updates within the next few days!
>>>>
>>>> Best regards
>>>> Philipp
>>>>
>>>> 2018-03-13 5:38 GMT+01:00 Alexander Behm <alex.b...@cloudera.com>:
>>>>
>>>>> Cool that you are working on a research project with Impala!
>>>>>
>>>>> Properly adding such a feature to Impala is a substantial effort, but hacking the code for an experiment or two seems doable.
>>>>>
>>>>> I think you will need to modify two things: (1) the planner to not add exchange nodes, and (2) the scheduler to assign the co-located scan ranges to the same host.
>>>>>
>>>>> Here are a few starting points in the code:
>>>>>
>>>>> 1) DistributedPlanner
>>>>> https://github.com/apache/impala/blob/master/fe/src/main/java/org/apache/impala/planner/DistributedPlanner.java#L318
>>>>>
>>>>> The first condition handles the case where no exchange nodes need to be added because the join inputs are already suitably partitioned.
>>>>> You could hack the code to always go into that codepath, so no exchanges are added.
>>>>>
>>>>> 2) The scheduler
>>>>> https://github.com/apache/impala/blob/master/be/src/scheduling/scheduler.cc#L226
>>>>>
>>>>> You'll need to dig through and understand that code so that you can make the necessary changes. Change the scan range to host mapping to your liking. The rest of the code should just work.
>>>>>
>>>>> Cheers,
>>>>>
>>>>> Alex
>>>>>
>>>>> On Mon, Mar 12, 2018 at 6:55 PM, Philipp Krause <philippkrause.m...@googlemail.com> wrote:
>>>>>
>>>>>> Thank you very much for your quick answers!
>>>>>> The intention behind this is to improve the execution time and (primarily) to examine the impact of block co-location (research project) for this particular query (simplified):
>>>>>>
>>>>>> select A.x, B.y, A.z from tableA as A inner join tableB as B on A.id=B.id
>>>>>>
>>>>>> The "real" query includes three joins and the data size is in the PB range. Therefore several nodes (5 in the test environment with less data) are used (without any load balancer).
>>>>>>
>>>>>> Could you give me some hints as to what code changes are required and which files are affected? I don't know how to tell Impala that it should only join the local data blocks on each node and then pass the result to the "final" node, which receives all intermediate results. I hope you can help me to get this working. That would be awesome!
>>>>>> Best regards
>>>>>> Philipp
>>>>>>
>>>>>> On 12.03.2018 at 18:38, Alexander Behm wrote:
>>>>>>
>>>>>> I suppose one exception is if your data lives only on a single node. Then you can set num_nodes=1 and make sure to send the query request to the impalad running on the same data node as the target data. Then you should get a local join.
>>>>>>
>>>>>> On Mon, Mar 12, 2018 at 9:30 AM, Alexander Behm <alex.b...@cloudera.com> wrote:
>>>>>>
>>>>>>> Such a specific block arrangement is very uncommon for typical Impala setups, so we don't attempt to recognize and optimize this narrow case. In particular, such an arrangement tends to be short-lived if you have the HDFS balancer turned on.
>>>>>>>
>>>>>>> Without making code changes, there is no way today to remove the data exchanges and make sure that the scheduler assigns scan splits to nodes in the desired way (co-located, but with possible load imbalance).
>>>>>>>
>>>>>>> In what way is the current setup unacceptable to you? Is this premature optimization? If you have certain performance expectations/requirements for specific queries we might be able to help you improve those. If you want to pursue this route, please help us by posting complete query profiles.
>>>>>>>
>>>>>>> Alex
>>>>>>>
>>>>>>> On Mon, Mar 12, 2018 at 6:29 AM, Philipp Krause <philippkrause.m...@googlemail.com> wrote:
>>>>>>>
>>>>>>>> Hello everyone!
>>>>>>>>
>>>>>>>> In order to prevent network traffic, I'd like to perform local joins on each node instead of exchanging the data and performing a join over the complete data afterwards. My query is basically a join over three tables on an ID attribute. The blocks are perfectly distributed, so that e.g. Table A - Block 0 and Table B - Block 0 are on the same node.
>>>>>>>> These blocks contain all data rows with an ID range [0,1]. Table A - Block 1 and Table B - Block 1 with an ID range [2,3] are on another node etc. So I want to perform a local join per node because any data exchange would be unnecessary (except for the last step, when the final node receives all results of the other nodes). Is this possible?
>>>>>>>> At the moment the query plan includes multiple data exchanges, although the blocks are already perfectly distributed (manually).
>>>>>>>> I would be grateful for any help!
>>>>>>>>
>>>>>>>> Best regards
>>>>>>>> Philipp Krause
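
For reference, the two ideas discussed in this thread (placing rows by Id % #data_nodes, and collecting the local scan ranges of *all* scan nodes per host) can be sketched roughly as below. This is only an illustration under the single-replica assumption used in the tests; ScanRange, place_by_modulo and assign_host_centric are hypothetical names and do not correspond to anything in Impala's scheduler.cc or HdfsScanNode.java.

```python
from collections import defaultdict
from typing import NamedTuple


class ScanRange(NamedTuple):
    """Hypothetical stand-in for a scan range (not Impala's real structure)."""
    scan_node: str  # which scan node (table scan) the range belongs to
    block: str      # the HDFS block the range covers
    host: str       # the only host holding the block (assumes no replicas)


def place_by_modulo(row_id: int, num_data_nodes: int) -> int:
    """Index of the data node / Parquet file that receives a row."""
    return row_id % num_data_nodes


def assign_host_centric(scan_ranges):
    """For each host, collect the local scan ranges of all scan nodes."""
    per_host = defaultdict(list)
    for scan_range in scan_ranges:
        per_host[scan_range.host].append(scan_range)
    return dict(per_host)


# Made-up example: two tables whose matching blocks are co-located.
scan_ranges = [
    ScanRange("scan_A", "Table_A-Block_0", "data_node_0"),
    ScanRange("scan_B", "Table_B-Block_0", "data_node_0"),
    ScanRange("scan_A", "Table_A-Block_1", "data_node_1"),
    ScanRange("scan_B", "Table_B-Block_1", "data_node_1"),
]
plan = assign_host_centric(scan_ranges)
for host, ranges in sorted(plan.items()):
    print(host, [r.block for r in ranges])
```

With one replica per block, every host ends up with exactly the blocks of both tables that it stores, which is the precondition for joining them locally without exchange nodes. With replication, a real implementation would additionally have to pick one replica per block consistently across the joined tables.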
Vagrantfile
Description: Binary data