Thank you for the explanation! Yes, I'm using HDFS. The single-replica
setup is only for test purposes at the moment. I think this makes it easier
to get some first results, since fewer modifications (scheduler etc.) are
needed.
I would like to test the DistributedPlanner modification in my virtual
cluster. I used a customized Vagrant script to install Impala on multiple
hosts (see attachment). It simply installs cloudera-manager-server-db,
cloudera-manager-server and cloudera-manager-daemons via apt-get. What
would be the simplest way to set up my modified version? Could I simply
call ./ and change the script to something like this?

echo "Install java..."
apt-get -q -y --force-yes install oracle-j2sdk1.7
echo "Download impala..."
wget https://...   # URL where I uploaded my modified version
echo "Extract impala..."
tar -xvzf Impala-cdh5-trunk.tar.gz
cd Impala-cdh5-trunk
echo "Build impala..."
echo "Start impala instances..."
service cloudera-scm-server-db initdb
service cloudera-scm-server-db start
service cloudera-scm-server start

Or is there another, maybe even easier, way to test the code? Maybe via / minicluster?

Best regards

2018-04-05 18:39 GMT+02:00 Alexander Behm <>:

> Apologies for the late response. Btw, your previous post was clear enough
> to me, so no worries :)
> On Wed, Apr 4, 2018 at 7:46 AM, Philipp Krause <philippkrause.mail@
>> wrote:
>> Hello Alex,
>> I think my previous post was too long and confusing. I apologize for
>> that!
>> If replicas are completely deactivated, all scan ranges of a block are
>> mapped to the one host where the block is located. This host is the
>> "executor"/reader for all the scan ranges of this block. Is that correct?
> Yes, assuming you are using HDFS.
>> I tried to visualize my understanding of the scan_range to host mapping
>> for my use case (see attachment). Could you please have a quick look at it
>> and tell me if this is correct?
>> "The existing scan range assignment is scan-node centric. For each scan
>> node, we independently decide which of its scan ranges should be processed
>> by which host."
>> Without replicas, all scan ranges of a block would be assigned to the
>> same host where this block is located. Isn't everything local here, so
>> that Table_A - Block_0 and Table_B - Block_0 can be joined locally, or are
>> further steps necessary? The condition in the DistributedPlanner you
>> pointed me to is set to false (no exchange nodes).
>> "You want it to be host-centric. For each host, collect the local scan
>> ranges of *all* scan nodes, and assign them to that host."
>> Wouldn't the standard setup from above work? Wouldn't I assign all (the
>> same) scan ranges to each host in this case?
> The standard setup works only if every block has exactly one
> replica. For our purposes, that is basically never the case (who would
> store production data without replication?), so the single-replica
> assumption was not clear to me.
> Does your current setup (only changing the planner and not the scheduler)
> produce the expected results?
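To see why the single-replica assumption matters, here is a toy model of scan-node-centric assignment in Java. It is not Impala's scheduler: the class, the method and the "fewest bytes assigned so far, ties go to the first replica" rule are assumptions made for illustration. Each scan node (table) is assigned independently, so with one replica per block the choice is forced and corresponding blocks stay co-located, while with two replicas the two tables can place Block_0 on different hosts.

```java
import java.util.Arrays;
import java.util.HashMap;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Toy model (assumption, not Impala's scheduler): each scan node is assigned on its own,
// choosing for every block the replica host with the fewest bytes assigned so far
// *by this scan node*; ties go to the replica listed first.
class ScanNodeCentricToy {
    // blockReplicas: block name -> replica hosts, visited in insertion order.
    static Map<String, String> assignScanNode(Map<String, List<String>> blockReplicas, long blockBytes) {
        Map<String, Long> assignedBytes = new HashMap<>();  // state is local to one scan node
        Map<String, String> blockToHost = new LinkedHashMap<>();
        for (Map.Entry<String, List<String>> e : blockReplicas.entrySet()) {
            String best = null;
            for (String host : e.getValue()) {
                if (best == null
                        || assignedBytes.getOrDefault(host, 0L) < assignedBytes.getOrDefault(best, 0L)) {
                    best = host;
                }
            }
            assignedBytes.merge(best, blockBytes, Long::sum);
            blockToHost.put(e.getKey(), best);
        }
        return blockToHost;
    }

    public static void main(String[] args) {
        long mb64 = 64L << 20;
        // One replica per block: the only candidate host is always chosen.
        Map<String, List<String>> a1 = new LinkedHashMap<>();
        a1.put("Block_0", Arrays.asList("h0"));
        a1.put("Block_1", Arrays.asList("h1"));
        Map<String, List<String>> b1 = new LinkedHashMap<>();
        b1.put("Block_1", Arrays.asList("h1"));
        b1.put("Block_0", Arrays.asList("h0"));
        // prints {Block_0=h0, Block_1=h1} {Block_1=h1, Block_0=h0}
        System.out.println(assignScanNode(a1, mb64) + " " + assignScanNode(b1, mb64));
        // Two replicas per block, visited in a different order by each scan node.
        Map<String, List<String>> a2 = new LinkedHashMap<>();
        a2.put("Block_0", Arrays.asList("h0", "h1"));
        a2.put("Block_1", Arrays.asList("h0", "h1"));
        Map<String, List<String>> b2 = new LinkedHashMap<>();
        b2.put("Block_1", Arrays.asList("h0", "h1"));
        b2.put("Block_0", Arrays.asList("h0", "h1"));
        // prints {Block_0=h0, Block_1=h1} {Block_1=h0, Block_0=h1}: Block_0 is split across hosts
        System.out.println(assignScanNode(a2, mb64) + " " + assignScanNode(b2, mb64));
    }
}
```

In this toy, co-location with replicas survives only by accident of visiting order, which is the reason a single-replica setup behaves differently from a production one.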
>> Thank you very much!
>> Best regards
>> Philipp
>> 2018-03-28 21:04 GMT+02:00 Philipp Krause <philippkrause.mail@googlemail.com>:
>>> Thank you for your answer and sorry for my delay!
>>> If my understanding is correct, the list of scan nodes consists of all
>>> nodes which contain a *local* block from a table that is needed for the
>>> query (Assumption: I have no replicas in my first tests). If TableA-Block0
>>> is on Node_0, isn't Node_0 automatically a scan node? And wouldn't this
>>> scan node always be the host for the complete scan range(s) then?
>>> "For each scan node, we independently decide which of its scan ranges
>>> should be processed by which host."
>>> heduling/
>>> // Loop over all scan ranges, select an executor for those with local impalads and
>>> // collect all others for later processing.
>>> So in this whole block, scan ranges are assigned to the closest executor
>>> (=host?). But isn't the closest executor always the node the block is
>>> located on (assuming impalad is installed and I have no replicas)? And isn't
>>> this node always a scan node at the same time? Otherwise a thread on a
>>> remote host would have to read the corresponding scan range, which would be
>>> more expensive. The only exception I can think of is when all threads on the
>>> local node are busy. Or, if I use replicas and all other threads of my node
>>> with the "original" block are busy, a thread on another node which contains
>>> a replica could read a particular scan range of its local block. Is my
>>> understanding correct here?
>>> Aren't all scan ranges read locally by their scan nodes if I have impalad
>>> installed on all nodes? And am I right that the scan range is only based
>>> on its length, which refers to maxScanRangeLength in
>>> computeScanRangeLocations?
>>> in/java/org/apache/impala/planner/
>>> I hope you can help me with the scan node <-> scan range -> host
>>> relationship. If I have Table_A-Block_0 and Table_B-Block_0 on the same
>>> node (which I want to join locally), I don't see why scan
>>> ranges could be assigned to another host in my scenario.
>>> Best regards and thank you very much!
>>> Philipp Krause
>>> On 21.03.2018 at 05:21, Alexander Behm wrote:
>>> Thanks for following up. I think I understand your setup.
>>> If you don't want to think about scan ranges, then you can modify
>>> HdfsScanNode.computeScanRangeLocations(). For example, you could change
>>> it to produce one scan range per file or per HDFS block. That way you'd
>>> know exactly what a scan range corresponds to.
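For intuition about what "one scan range per HDFS block" changes, here is a Java sketch that splits a block into scan ranges of at most maxScanRangeLength bytes. The class and method are hypothetical helpers, not the code in HdfsScanNode.computeScanRangeLocations(); treating a non-positive limit as "do not split" is an assumption of this sketch.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical helper (not Impala code): split one HDFS block [offset, offset + length)
// into scan ranges of at most maxScanRangeLength bytes. A non-positive limit is treated
// as "do not split", i.e. exactly one scan range per block.
class ScanRangeSplitter {
    static final class Range {
        final long offset;
        final long length;

        Range(long offset, long length) {
            this.offset = offset;
            this.length = length;
        }

        @Override
        public String toString() {
            return "offset=" + offset + " length=" + length;
        }
    }

    static List<Range> split(long blockOffset, long blockLength, long maxScanRangeLength) {
        List<Range> ranges = new ArrayList<>();
        if (maxScanRangeLength <= 0 || maxScanRangeLength >= blockLength) {
            ranges.add(new Range(blockOffset, blockLength));  // one range == one block
            return ranges;
        }
        for (long pos = 0; pos < blockLength; pos += maxScanRangeLength) {
            // The last range may be shorter than the limit.
            ranges.add(new Range(blockOffset + pos, Math.min(maxScanRangeLength, blockLength - pos)));
        }
        return ranges;
    }

    public static void main(String[] args) {
        long mb = 1L << 20;
        System.out.println(split(0, 64 * mb, 16 * mb).size());  // 4 ranges for a 64MB block
        System.out.println(split(0, 64 * mb, 0).size());        // 1 range: one per block
    }
}
```

With one range per block, a scan range and an HDFS block become the same unit, which is what makes a per-block host rule easy to reason about.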
>>> I think the easiest/fastest way for you to make progress is to
>>> re-implement the existing scan range assignment logic in that place in the
>>> code I had pointed you to. There is no quick fix to change the existing
>>> behavior.
>>> The existing scan range assignment is scan-node centric. For each scan
>>> node, we independently decide which of its scan ranges should be processed
>>> by which host.
>>> I believe an algorithm to achieve your goal would look completely
>>> different. You want it to be host-centric. For each host, collect the local
>>> scan ranges of *all* scan nodes, and assign them to that host.
>>> Does that make sense?
>>> Alex
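A minimal Java sketch of the host-centric idea: iterate over hosts and, for each one, collect the not-yet-assigned scan ranges of all scan nodes that have a replica on it. ScanRange and assign() are hypothetical stand-ins, not Impala's types. When a block has several replicas, the first host in the list wins; this keeps the logic simple but can cause the load imbalance Alex mentions, and ranges with no replica on any listed host are simply left unassigned.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.TreeMap;

// Host-centric assignment sketch (hypothetical types, not Impala's scheduler API).
class HostCentricAssigner {
    static final class ScanRange {
        final int scanNodeId;            // which scan node (table scan) the range belongs to
        final String name;               // e.g. "Table_A-Block_0"
        final List<String> replicaHosts; // hosts that store a replica of the block

        ScanRange(int scanNodeId, String name, List<String> replicaHosts) {
            this.scanNodeId = scanNodeId;
            this.name = name;
            this.replicaHosts = replicaHosts;
        }

        @Override
        public String toString() {
            return name;
        }
    }

    // Returns host -> (scan node id -> local scan ranges). Each range is assigned at most once.
    static Map<String, Map<Integer, List<ScanRange>>> assign(List<String> hosts, List<ScanRange> ranges) {
        Map<String, Map<Integer, List<ScanRange>>> byHost = new TreeMap<>();
        Set<ScanRange> assigned = new HashSet<>();  // identity-based: ScanRange has no equals()
        for (String host : hosts) {
            for (ScanRange r : ranges) {
                if (!assigned.contains(r) && r.replicaHosts.contains(host)) {
                    byHost.computeIfAbsent(host, h -> new TreeMap<>())
                          .computeIfAbsent(r.scanNodeId, id -> new ArrayList<>())
                          .add(r);
                    assigned.add(r);
                }
            }
        }
        return byHost;
    }

    public static void main(String[] args) {
        List<ScanRange> ranges = Arrays.asList(
            new ScanRange(0, "Table_A-Block_0", Arrays.asList("host0")),
            new ScanRange(0, "Table_A-Block_1", Arrays.asList("host1")),
            new ScanRange(1, "Table_B-Block_0", Arrays.asList("host0")),
            new ScanRange(1, "Table_B-Block_1", Arrays.asList("host1")));
        // prints {host0={0=[Table_A-Block_0], 1=[Table_B-Block_0]}, host1={0=[Table_A-Block_1], 1=[Table_B-Block_1]}}
        System.out.println(assign(Arrays.asList("host0", "host1"), ranges));
    }
}
```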
>>> On Mon, Mar 19, 2018 at 1:02 PM, Philipp Krause <
>>>> wrote:
>>>> I'd like to provide a small example for our purpose. The last post may
>>>> be a bit confusing, so here's a very simple example in the attached PDF
>>>> file. I hope it's understandable. Otherwise, please give me some brief
>>>> feedback.
>>>> Basically, I only want each data node to join all its local blocks. Is
>>>> a range mapping needed, or is it possible to easily join all local
>>>> blocks (regardless of their content) since everything is already "prepared"?
>>>> Maybe you can clarify this for me.
>>>> As you can see in the example, the tables are not partitioned by ID.
>>>> The files are manually prepared with the help of the modulo function. So I
>>>> don't have a range like [0,10], but something like 0,5,10,15 etc.
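Why joining only local blocks can give the right answer: if both tables are split with the same function on the join key (here id % n, as in the modulo preparation described above), every matching pair of rows lands in the same bucket, so joining bucket by bucket and concatenating the results returns the same rows as one global join. This Java sketch checks that on small in-memory data; the class and method names are made up for illustration, and rows are modeled as hypothetical (id, value) pairs.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Illustration only: rows are (id, value) pairs stored as long[2].
class CoPartitionedJoin {
    // Inner hash join on id: build a hash table on the right input, probe with the left.
    static List<String> hashJoin(List<long[]> left, List<long[]> right) {
        Map<Long, List<long[]>> build = new HashMap<>();
        for (long[] r : right) {
            build.computeIfAbsent(r[0], k -> new ArrayList<>()).add(r);
        }
        List<String> out = new ArrayList<>();
        for (long[] l : left) {
            for (long[] r : build.getOrDefault(l[0], Collections.emptyList())) {
                out.add(l[0] + ":" + l[1] + "," + r[1]);
            }
        }
        return out;
    }

    // Split rows into n buckets with the same rule used to prepare the files: id % n.
    static List<List<long[]>> bucketize(List<long[]> rows, int n) {
        List<List<long[]>> buckets = new ArrayList<>();
        for (int i = 0; i < n; i++) {
            buckets.add(new ArrayList<>());
        }
        for (long[] row : rows) {
            buckets.get((int) Math.floorMod(row[0], (long) n)).add(row);
        }
        return buckets;
    }

    // One "local" join per bucket (i.e. per data node), then a simple union of the results.
    static List<String> localJoins(List<long[]> a, List<long[]> b, int n) {
        List<List<long[]>> aBuckets = bucketize(a, n);
        List<List<long[]>> bBuckets = bucketize(b, n);
        List<String> out = new ArrayList<>();
        for (int i = 0; i < n; i++) {
            out.addAll(hashJoin(aBuckets.get(i), bBuckets.get(i)));
        }
        return out;
    }

    public static void main(String[] args) {
        List<long[]> tableA = new ArrayList<>();
        List<long[]> tableB = new ArrayList<>();
        for (long id = 0; id < 20; id++) {
            tableA.add(new long[] {id, id * 10});
            if (id % 3 != 0) {
                tableB.add(new long[] {id, id * 100});  // some ids have no join partner
            }
        }
        List<String> global = hashJoin(tableA, tableB);
        List<String> local = localJoins(tableA, tableB, 5);
        Collections.sort(global);
        Collections.sort(local);
        System.out.println(global.equals(local) + " " + global.size());  // prints "true 13"
    }
}
```

This only holds because both tables use the same function on the same key. Impala's planner has no knowledge of such a manual layout, which is why planner and scheduler changes are still needed.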
>>>> I hope I didn't make it too complicated and confusing. I think the
>>>> actual idea behind this is really simple and I hope you can help me to get
>>>> this working.
>>>> Best regards and thank you very much for your time!
>>>> Philipp
>>>> On 18.03.2018 at 17:32, Philipp Krause wrote:
>>>> Hi! At the moment the data-to-Parquet (block) mapping is based on a
>>>> simple modulo function: Id % #data_nodes. So with 5 data nodes, all rows
>>>> with IDs 0,5,10,... are written to Parquet_0, IDs 1,6,11,... are written to
>>>> Parquet_1, etc. That's what I did manually. Since the Parquet file size and
>>>> the block size are both set to 64MB, each Parquet file will result in one
>>>> block when I transfer the Parquet files to HDFS. By default, HDFS
>>>> distributes the blocks randomly. For test purposes I transferred
>>>> corresponding blocks from Table_A and Table_B to the same data node
>>>> (Table_A - Block_X with IDs 0,5,10 and Table_B - Block_Y with IDs
>>>> 0,5,10). In this case, they are transferred to data_node_0 because the
>>>> modulo function (which I want to implement in the scheduler) returns 0 for
>>>> these IDs. This is also done manually at the moment.
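The row-to-file rule described above fits in a few lines of Java. This is a sketch of the manual preparation step only; the class and method names are hypothetical, and Math.floorMod is used so that negative IDs would still map to a valid file index.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

// Sketch: assign each row ID to Parquet_<id % numDataNodes>.
class ModuloRowPartitioner {
    // Index of the Parquet file (and, by construction, the data node) for an ID.
    static int fileIndexFor(long id, int numDataNodes) {
        // floorMod keeps the result non-negative even for negative IDs.
        return (int) Math.floorMod(id, (long) numDataNodes);
    }

    // Groups IDs by target file index: 0 -> [0, 5, 10, ...], 1 -> [1, 6, 11, ...].
    static Map<Integer, List<Long>> partition(List<Long> ids, int numDataNodes) {
        Map<Integer, List<Long>> files = new TreeMap<>();
        for (long id : ids) {
            files.computeIfAbsent(fileIndexFor(id, numDataNodes), k -> new ArrayList<>()).add(id);
        }
        return files;
    }

    public static void main(String[] args) {
        List<Long> ids = new ArrayList<>();
        for (long id = 0; id < 15; id++) {
            ids.add(id);
        }
        // prints {0=[0, 5, 10], 1=[1, 6, 11], 2=[2, 7, 12], 3=[3, 8, 13], 4=[4, 9, 14]}
        System.out.println(partition(ids, 5));
    }
}
```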
>>>> 1.) DistributedPlanner: For the first upcoming tests, I simply changed the
>>>> first condition in the DistributedPlanner to true to avoid exchange nodes.
>>>> 2.) The scheduler: That's the part I'm currently struggling with. For
>>>> the first tests, block replication is deactivated. I'm not sure how/where to
>>>> implement the modulo function for the scan range to host mapping. Without the
>>>> modulo function, I would have to implement a hard-coded mapping (something like
>>>> "range" 0-0, 5-5, 10-10 -> Data_node_0 etc.). Is that correct? Instead, I
>>>> would like to use a slightly more flexible solution with the help of this
>>>> modulo function for the host mapping.
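A modulo rule can replace the hard-coded "range -> host" table if the bucket number is recoverable from each scan range. This Java sketch assumes, hypothetically, that files are named Parquet_<k> and that hosts are listed in a fixed order (data_node_0, data_node_1, ...); neither is something Impala provides, and the class is not part of the scheduler.

```java
import java.util.Arrays;
import java.util.List;
import java.util.regex.Matcher;
import java.util.regex.Pattern;

// Sketch of a modulo-based scan range -> host mapping (hypothetical naming convention).
class ModuloHostMapper {
    private static final Pattern BUCKET = Pattern.compile("Parquet_(\\d+)");

    private final List<String> hosts;  // fixed, agreed host order

    ModuloHostMapper(List<String> hosts) {
        this.hosts = hosts;
    }

    // Extracts k from ".../Parquet_k...", failing loudly if the convention is not met.
    static int bucketOf(String path) {
        Matcher m = BUCKET.matcher(path);
        if (!m.find()) {
            throw new IllegalArgumentException("no bucket number in " + path);
        }
        return Integer.parseInt(m.group(1));
    }

    // The same bucket maps to the same host, whichever table the file belongs to.
    String hostFor(String path) {
        return hosts.get(bucketOf(path) % hosts.size());
    }

    public static void main(String[] args) {
        ModuloHostMapper mapper = new ModuloHostMapper(Arrays.asList(
            "data_node_0", "data_node_1", "data_node_2", "data_node_3", "data_node_4"));
        System.out.println(mapper.hostFor("/warehouse/table_a/Parquet_0.parq"));  // data_node_0
        System.out.println(mapper.hostFor("/warehouse/table_b/Parquet_0.parq"));  // data_node_0
        System.out.println(mapper.hostFor("/warehouse/table_b/Parquet_1.parq"));  // data_node_1
    }
}
```

This rule ignores block locations: if HDFS stored Parquet_k somewhere other than the chosen host, that host would read it remotely, so the blocks still have to be placed to match the rule.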
>>>> I would be really grateful if you could give me a hint for the
>>>> scheduling implementation. Meanwhile, I'll try to dig deeper into the code.
>>>> Best regards and thank you in advance
>>>> Philipp
>>>> On 14.03.2018 at 08:06, Philipp Krause wrote:
>>>> Thank you very much for this information! I'll try to implement these
>>>> two steps and post some updates within the next days!
>>>> Best regards
>>>> Philipp
>>>> 2018-03-13 5:38 GMT+01:00 Alexander Behm <>:
>>>>> Cool that you're working on a research project with Impala!
>>>>> Properly adding such a feature to Impala is a substantial effort, but
>>>>> hacking the code for an experiment or two seems doable.
>>>>> I think you will need to modify two things: (1) the planner to not add
>>>>> exchange nodes, and (2) the scheduler to assign the co-located scan ranges
>>>>> to the same host.
>>>>> Here are a few starting points in the code:
>>>>> 1) DistributedPlanner
>>>>> a/org/apache/impala/planner/
>>>>> The first condition handles the case where no exchange nodes need to
>>>>> be added because the join inputs are already suitably partitioned.
>>>>> You could hack the code to always go into that codepath, so no
>>>>> exchanges are added.
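One way to make the "always go into that codepath" hack easy to switch on and off is to guard it with a flag instead of replacing the condition with a literal true. This Java sketch is a toy model with hypothetical names (JoinPlacementToy, chooseJoinPlan, the experiment.forceColocatedJoin system property); it does not reproduce DistributedPlanner's actual method or condition.

```java
// Toy model of the planner decision (hypothetical names, not DistributedPlanner's code).
class JoinPlacementToy {
    enum Plan { COLOCATED_NO_EXCHANGE, PARTITIONED_WITH_EXCHANGES }

    // Read from -Dexperiment.forceColocatedJoin=true; Boolean.getBoolean defaults to false.
    static boolean forceColocatedJoin = Boolean.getBoolean("experiment.forceColocatedJoin");

    // Same shape as the check described above: if the join inputs are already suitably
    // partitioned (or the experiment forces it), no exchange nodes are added.
    static Plan chooseJoinPlan(boolean inputsSuitablyPartitioned) {
        if (forceColocatedJoin || inputsSuitablyPartitioned) {
            return Plan.COLOCATED_NO_EXCHANGE;
        }
        return Plan.PARTITIONED_WITH_EXCHANGES;
    }

    public static void main(String[] args) {
        System.out.println(chooseJoinPlan(false));  // depends on the system property
        forceColocatedJoin = true;
        System.out.println(chooseJoinPlan(false));  // COLOCATED_NO_EXCHANGE
    }
}
```

Keeping the original condition reachable means one build can produce both plans, which makes before/after comparisons of query profiles straightforward.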
>>>>> 2) The scheduler
>>>>> ng/
>>>>> You'll need to dig through and understand that code so that you can
>>>>> make the necessary changes. Change the scan range to host mapping to your
>>>>> liking. The rest of the code should just work.
>>>>> Cheers,
>>>>> Alex
>>>>> On Mon, Mar 12, 2018 at 6:55 PM, Philipp Krause <
>>>>>> wrote:
>>>>>> Thank you very much for your quick answers!
>>>>>> The intention behind this is to improve the execution time and
>>>>>> (primarily) to examine the impact of block co-location (research project)
>>>>>> for this particular query (simplified):
>>>>>> select A.x, B.y, A.z from tableA as A inner join tableB as B on
>>>>>> The "real" query includes three joins and the data size is in the
>>>>>> PB range. Therefore several nodes (5 in the test environment with less
>>>>>> data) are used (without any load balancer).
>>>>>> Could you give me some hints on what code changes are required and which
>>>>>> files are affected? I don't know how to give Impala the information that it
>>>>>> should only join the local data blocks on each node and then pass the result
>>>>>> to the "final" node which receives all intermediate results. I hope you can
>>>>>> help me to get this working. That would be awesome!
>>>>>> Best regards
>>>>>> Philipp
>>>>>> On 12.03.2018 at 18:38, Alexander Behm wrote:
>>>>>> I suppose one exception is if your data lives only on a single node.
>>>>>> Then you can set num_nodes=1 and make sure to send the query request to the
>>>>>> impalad running on the same data node as the target data. Then you should
>>>>>> get a local join.
>>>>>> On Mon, Mar 12, 2018 at 9:30 AM, Alexander Behm <
>>>>>>> wrote:
>>>>>>> Such a specific block arrangement is very uncommon for typical
>>>>>>> Impala setups, so we don't attempt to recognize and optimize this narrow
>>>>>>> case. In particular, such an arrangement tends to be short lived if you
>>>>>>> have the HDFS balancer turned on.
>>>>>>> Without making code changes, there is no way today to remove the
>>>>>>> data exchanges and make sure that the scheduler assigns scan splits to
>>>>>>> nodes in the desired way (co-located, but with possible load imbalance).
>>>>>>> In what way is the current setup unacceptable to you? Is this
>>>>>>> premature optimization? If you have certain performance
>>>>>>> expectations/requirements for specific queries, we might be able to help you
>>>>>>> improve those. If you want to pursue this route, please help us by posting
>>>>>>> complete query profiles.
>>>>>>> Alex
>>>>>>> On Mon, Mar 12, 2018 at 6:29 AM, Philipp Krause <
>>>>>>>> wrote:
>>>>>>>> Hello everyone!
>>>>>>>> In order to prevent network traffic, I'd like to perform local
>>>>>>>> joins on each node instead of exchanging the data and performing a join over
>>>>>>>> the complete data afterwards. My query is basically a join over three
>>>>>>>> tables on an ID attribute. The blocks are perfectly distributed, so that
>>>>>>>> e.g. Table A - Block 0 and Table B - Block 0 are on the same node. These
>>>>>>>> blocks contain all data rows with an ID range [0,1]. Table A - Block 1 and
>>>>>>>> Table B - Block 1 with an ID range [2,3] are on another node, etc. So I want
>>>>>>>> to perform a local join per node because any data exchange would be
>>>>>>>> unnecessary (except for the last step, when the final node receives all
>>>>>>>> results of the other nodes). Is this possible?
>>>>>>>> At the moment the query plan includes multiple data exchanges,
>>>>>>>> although the blocks are already perfectly distributed (manually).
>>>>>>>> I would be grateful for any help!
>>>>>>>> Best regards
>>>>>>>> Philipp Krause

Attachment: Vagrantfile
Description: Binary data
