I'm not really following your installation/setup and am not an expert on
Cloudera Manager installation/config. If you are going to build Impala
anyway, it's probably easiest to test on Impala's minicluster first.

In general, if you have a running cluster managed by Cloudera Manager, you can
deploy a custom Impala build by simply overwriting the existing Impala binaries
and jars with the new build. If you want to go this route, I can give you a
full list of files you need to replace.

On Tue, Apr 10, 2018 at 11:44 AM, Philipp Krause <
philippkrause.m...@googlemail.com> wrote:

> Thank you for the explanation! Yes, I'm using HDFS. The single replica
> setup is only for test purposes at the moment. I think this makes it easier
> to gain some first results since fewer modifications (scheduler etc.) are
> necessary.
> I would like to test the DistributedPlanner modification in my virtual
> cluster. I used a customized Vagrant script to install Impala on multiple
> hosts (see attachment). It simply installs cloudera-manager-server-db,
> cloudera-manager-server and cloudera-manager-daemons via apt-get. What
> would be the simplest solution to set up my modified version? Could I simply
> call ./buildall.sh and change the script to something like this?
> echo "Install java..."
> apt-get -q -y --force-yes install oracle-j2sdk1.7
> echo "Download impala..."
> wget https://... where I uploaded my modified version
> echo "Extract impala..."
> tar -xvzf Impala-cdh5-trunk.tar.gz
> cd Impala-cdh5-trunk
> echo "Build impala..."
> ./buildall.sh
> echo "Start impala instances..."
> service cloudera-scm-server-db initdb
> service cloudera-scm-server-db start
> service cloudera-scm-server start
> Or is there another, maybe even easier method, to test the code? Maybe via
> bootstrap_development.sh / minicluster?
> Best regards
> Philipp
> 2018-04-05 18:39 GMT+02:00 Alexander Behm <alex.b...@cloudera.com>:
>> Apologies for the late response. Btw, your previous post was clear enough
>> to me, so no worries :)
>> On Wed, Apr 4, 2018 at 7:46 AM, Philipp Krause <
>> philippkrause.m...@googlemail.com> wrote:
>>> Hello Alex,
>>> I think my previous post has been too long and confusing. I apologize
>>> for that!
>>> If replicas are completely deactivated, all scan ranges of a block are
>>> mapped to the one host on which the block is located. This host is the
>>> "executor"/reader for all the scan ranges of this block. Is that correct?
>> Yes, assuming you are using HDFS.
>>> I tried to visualize my understanding of the scan_range to host mapping
>>> for my use case (see attachment). Could you please have a quick look at it
>>> and tell me if this is correct?
>>> "The existing scan range assignment is scan-node centric. For each scan
>>> node, we independently decide which of its scan ranges should be processed
>>> by which host."
>>> Without replicas, all scan ranges of a block would be assigned to the
>>> same host where this block is located. Isn't everything local here, so
>>> that Table_A - Block_0 and Table_B - Block_0 can be joined locally, or are
>>> further steps necessary? The condition in the DistributedPlanner you
>>> pointed me to is set to false (no exchange nodes).
>>> "You want it to be host-centric. For each host, collect the local scan
>>> ranges of *all* scan nodes, and assign them to that host."
>>> Wouldn't the standard setup from above work? Wouldn't I assign all (the
>>> same) scan ranges to each host in this case here?
>> The standard setup works only if every block has exactly one
>> replica. For our purposes, that is basically never the case (who would
>> store production data without replication?), so the single-replica
>> assumption was not clear to me.
>> Does your current setup (only changing the planner and not the scheduler)
>> produce the expected results?
>>> Thank you very much!
>>> Best regards
>>> Philipp
>>> 2018-03-28 21:04 GMT+02:00 Philipp Krause <philippkrause.mail@googlemail.com>:
>>>> Thank you for your answer and sorry for my delay!
>>>> If my understanding is correct, the list of scan nodes consists of all
>>>> nodes which contain a *local* block from a table that is needed for the
>>>> query (Assumption: I have no replicas in my first tests). If TableA-Block0
>>>> is on Node_0, isn't Node_0 automatically a scan node? And wouldn't this
>>>> scan node always be the host for the complete scan range(s) then?
>>>> "For each scan node, we independently decide which of its scan ranges
>>>> should be processed by which host."
>>>> https://github.com/cloudera/Impala/blob/cdh5-trunk/be/src/scheduling/scheduler.cc#L532
>>>> // Loop over all scan ranges, select an executor for those with local impalads and
>>>> // collect all others for later processing.
>>>> So in this whole block, scan ranges are assigned to the closest
>>>> executor (=host?). But isn't the closest executor always the node the block
>>>> is located on (assumed impalad is installed and I have no replicas)? And
>>>> isn't this node always a scan node at the same time? Otherwise a thread on
>>>> a remote host would have to read the corresponding scan range, which would be more
>>>> expensive. The only exception I can think of is when all threads on the
>>>> local node are busy. Or, if I use replicas and all other threads of my node
>>>> with the "original" block are busy, a thread on another node which contains
>>>> a replica could read a special scan range of its local block. Is my
>>>> understanding correct here?
>>>> Aren't all scan ranges read locally by their scan nodes if I have impalad
>>>> installed on all nodes? And am I right, that the scan range is only based
>>>> on its length which refers to maxScanRangeLength in
>>>> computeScanRangeLocations?
>>>> https://github.com/cloudera/Impala/blob/cdh5-trunk/fe/src/main/java/org/apache/impala/planner/HdfsScanNode.java#L721
>>>> I hope you can help me with the scan node <-> scan range->host
>>>> relationship. If I have Table_A-Block_0 and Table_B_Block_0 on the same
>>>> node (which I want to join locally), I don't get the point of why scan
>>>> ranges could be assigned to another host in my scenario.
>>>> Best regards and thank you very much!
>>>> Philipp Krause
>>>> Am 21.03.2018 um 05:21 schrieb Alexander Behm:
>>>> Thanks for following up. I think I understand your setup.
>>>> If you want to not think about scan ranges, then you can modify
>>>> HdfsScanNode.computeScanRangeLocations(). For example, you could
>>>> change it to produce one scan range per file or per HDFS block. That way
>>>> you'd know exactly what a scan range corresponds to.
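As a rough illustration of what "one scan range per HDFS block" versus length-capped scan ranges means, here is a minimal Python sketch. It is a simplified model and not the logic in HdfsScanNode.computeScanRangeLocations(); the function name and its parameters are made up for this example.

```python
from typing import List, Tuple


def split_into_scan_ranges(file_length: int, block_size: int,
                           max_scan_range_length: int) -> List[Tuple[int, int]]:
    """Return (offset, length) pairs that cover a file block by block.

    Hypothetical helper: each block is cut into pieces of at most
    max_scan_range_length bytes. A value <= 0 means "one scan range
    per block", so a range never spans two blocks.
    """
    ranges = []
    block_start = 0
    while block_start < file_length:
        block_len = min(block_size, file_length - block_start)
        step = max_scan_range_length if max_scan_range_length > 0 else block_len
        offset = block_start
        block_end = block_start + block_len
        while offset < block_end:
            length = min(step, block_end - offset)
            ranges.append((offset, length))
            offset += length
        block_start = block_end
    return ranges


if __name__ == "__main__":
    MB = 1024 * 1024
    # One 64MB Parquet file in one 64MB block, no length cap: a single range.
    print(split_into_scan_ranges(64 * MB, 64 * MB, 0))
    # Same file with a 16MB cap: four ranges, all inside the same block.
    print(split_into_scan_ranges(64 * MB, 64 * MB, 16 * MB))
```

With one range per block, a scan range maps to exactly one block and therefore, without replicas, to exactly one local host.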
>>>> I think the easiest/fastest way for you to make progress is to
>>>> re-implement the existing scan range assignment logic in that place in the
>>>> code I had pointed you to. There is no quick fix to change the existing
>>>> behavior.
>>>> The existing scan range assignment is scan-node centric. For each scan
>>>> node, we independently decide which of its scan ranges should be processed
>>>> by which host.
>>>> I believe an algorithm to achieve your goal would look completely
>>>> different. You want it to be host-centric. For each host, collect the local
>>>> scan ranges of *all* scan nodes, and assign them to that host.
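A minimal Python sketch of that host-centric idea, assuming exactly one replica per block so that every scan range has a single local host. The ScanRange type and the assign_host_centric function are hypothetical names for illustration; they are not part of Impala's scheduler, which is written in C++.

```python
from collections import defaultdict
from typing import Dict, List, NamedTuple


class ScanRange(NamedTuple):
    scan_node: str   # which scan node (table scan) the range belongs to
    block: str       # label of the HDFS block the range lies in
    local_host: str  # the only host holding that block (single replica assumed)


def assign_host_centric(ranges: List[ScanRange]) -> Dict[str, List[ScanRange]]:
    """Group the scan ranges of *all* scan nodes by the host that stores them."""
    assignment: Dict[str, List[ScanRange]] = defaultdict(list)
    for scan_range in ranges:
        # No load balancing and no remote reads: the local host always wins.
        assignment[scan_range.local_host].append(scan_range)
    return dict(assignment)


if __name__ == "__main__":
    ranges = [
        ScanRange("scan Table_A", "Block_0", "node_0"),
        ScanRange("scan Table_A", "Block_1", "node_1"),
        ScanRange("scan Table_B", "Block_0", "node_0"),
        ScanRange("scan Table_B", "Block_1", "node_1"),
    ]
    for host, assigned in sorted(assign_host_centric(ranges).items()):
        print(host, [(r.scan_node, r.block) for r in assigned])
```

The point of the sketch is only the loop order: it iterates over the ranges of every scan node together, so the matching blocks of both tables end up on the same host.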
>>>> Does that make sense?
>>>> Alex
>>>> On Mon, Mar 19, 2018 at 1:02 PM, Philipp Krause <
>>>> philippkrause.m...@googlemail.com> wrote:
>>>>> I'd like to provide a small example for our purpose. The last post may
>>>>> be a bit confusing, so here's a very simple example in the attached pdf
>>>>> file. I hope it's understandable. Otherwise, please give me some brief
>>>>> feedback.
>>>>> Basically, I only want each data node to join all its local blocks.
>>>>> Is there a range mapping needed or is it possible to easily join all local
>>>>> blocks (regardless of their content) since everything is already "prepared"?
>>>>> Maybe you can clarify this for me.
>>>>> As you can see in the example, the tables are not partitioned by ID.
>>>>> The files are manually prepared with the help of the modulo function. So I
>>>>> don't have a range like [0,10], but something like 0,5,10,15 etc.
>>>>> I hope I didn't make it too complicated and confusing. I think the
>>>>> actual idea behind this is really simple and I hope you can help me to get
>>>>> this working.
>>>>> Best regards and thank you very much for your time!
>>>>> Philipp
>>>>> Am 18.03.2018 um 17:32 schrieb Philipp Krause:
>>>>> Hi! At the moment the data to parquet (block) mapping is based on a
>>>>> simple modulo function: Id % #data_nodes. So with 5 data nodes all rows
>>>>> with Id's 0,5,10,... are written to Parquet_0, Id's 1,6,11,... are written to
>>>>> Parquet_1 etc. That's what I did manually. Since the parquet file size and
>>>>> the block size are both set to 64MB, each parquet file will result in one
>>>>> block when I transfer the parquet files to HDFS. By default, HDFS
>>>>> distributes the blocks randomly. For test purposes I transferred
>>>>> corresponding blocks from Table_A and Table_B to the same data node
>>>>> (Table_A - Block_X with Id's 0,5,10 and Table_B - Block_Y with Id's
>>>>> 0,5,10). In this case, they are transferred to data_node_0 because the
>>>>> modulo function (which I want to implement in the scheduler) returns 0 for
>>>>> these Id's. This is also done manually at the moment.
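The modulo layout described above can be sketched in Python as follows. The function name is made up for illustration; it only models which row Ids end up in which Parquet file, not how the files are written or placed in HDFS.

```python
from collections import defaultdict
from typing import Dict, Iterable, List


def partition_ids(ids: Iterable[int], num_data_nodes: int) -> Dict[int, List[int]]:
    """Map each row Id to the file index Id % num_data_nodes."""
    files: Dict[int, List[int]] = defaultdict(list)
    for row_id in ids:
        files[row_id % num_data_nodes].append(row_id)
    return dict(files)


if __name__ == "__main__":
    table_a = partition_ids(range(12), 5)
    table_b = partition_ids(range(12), 5)
    # File 0 of both tables holds Ids 0, 5, 10; file 1 holds Ids 1, 6, 11.
    print(table_a[0], table_a[1])
    # Rows with equal Ids always share a file index, so placing file k of
    # both tables on data node k keeps every join partner on the same node.
    print(all(table_a[k] == table_b[k] for k in table_a))
```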
>>>>> 1.) DistributedPlanner: For the first upcoming tests I simply changed the
>>>>> first condition in the DistributedPlanner to true to avoid exchange nodes.
>>>>> 2.) The scheduler: That's the part I'm currently struggling with. For
>>>>> first tests, block replication is deactivated. I'm not sure how / where to
>>>>> implement the modulo function for scan range to host mapping. Without the
>>>>> modulo function, I would have to implement a hard-coded mapping (something like
>>>>> "range" 0-0, 5-5, 10-10 -> Data_node_0 etc.). Is that correct? Instead I
>>>>> would like to use a slightly more flexible solution with the help of this
>>>>> modulo function for the host mapping.
>>>>> I would be really grateful if you could give me a hint for the
>>>>> scheduling implementation. Meanwhile, I'll try to dig deeper into the code.
>>>>> Best regards and thank you in advance
>>>>> Philipp
>>>>> Am 14.03.2018 um 08:06 schrieb Philipp Krause:
>>>>> Thank you very much for this information! I'll try to implement these
>>>>> two steps and post some updates within the next few days!
>>>>> Best regards
>>>>> Philipp
>>>>> 2018-03-13 5:38 GMT+01:00 Alexander Behm <alex.b...@cloudera.com>:
>>>>>> Cool that you are working on a research project with Impala!
>>>>>> Properly adding such a feature to Impala is a substantial effort, but
>>>>>> hacking the code for an experiment or two seems doable.
>>>>>> I think you will need to modify two things: (1) the planner to not
>>>>>> add exchange nodes, and (2) the scheduler to assign the co-located scan
>>>>>> ranges to the same host.
>>>>>> Here are a few starting points in the code:
>>>>>> 1) DistributedPlanner
>>>>>> https://github.com/apache/impala/blob/master/fe/src/main/java/org/apache/impala/planner/DistributedPlanner.java#L318
>>>>>> The first condition handles the case where no exchange nodes need to
>>>>>> be added because the join inputs are already suitably partitioned.
>>>>>> You could hack the code to always go into that codepath, so no
>>>>>> exchanges are added.
>>>>>> 2) The scheduler
>>>>>> https://github.com/apache/impala/blob/master/be/src/scheduling/scheduler.cc#L226
>>>>>> You'll need to dig through and understand that code so that you can
>>>>>> make the necessary changes. Change the scan range to host mapping to your
>>>>>> liking. The rest of the code should just work.
>>>>>> Cheers,
>>>>>> Alex
>>>>>> On Mon, Mar 12, 2018 at 6:55 PM, Philipp Krause <
>>>>>> philippkrause.m...@googlemail.com> wrote:
>>>>>>> Thank you very much for your quick answers!
>>>>>>> The intention behind this is to improve the execution time and
>>>>>>> (primarily) to examine the impact of block co-location (research
>>>>>>> project) for this particular query (simplified):
>>>>>>> select A.x, B.y, A.z from tableA as A inner join tableB as B on
>>>>>>> A.id=B.id
>>>>>>> The "real" query includes three joins and the data size is in the
>>>>>>> PB range. Therefore several nodes (5 in the test environment with less
>>>>>>> data) are used (without any load balancer).
>>>>>>> Could you give me some hints on what code changes are required and
>>>>>>> which files are affected? I don't know how to tell Impala that it
>>>>>>> should only join the local data blocks on each node and then pass
>>>>>>> the result to the "final" node, which receives all intermediate results.
>>>>>>> I hope you can help me to get this working. That would be awesome!
>>>>>>> Best regards
>>>>>>> Philipp
>>>>>>> Am 12.03.2018 um 18:38 schrieb Alexander Behm:
>>>>>>> I suppose one exception is if your data lives only on a single node.
>>>>>>> Then you can set num_nodes=1 and make sure to send the query request to
>>>>>>> the impalad running on the same data node as the target data. Then you
>>>>>>> should get a local join.
>>>>>>> On Mon, Mar 12, 2018 at 9:30 AM, Alexander Behm <
>>>>>>> alex.b...@cloudera.com> wrote:
>>>>>>>> Such a specific block arrangement is very uncommon for typical
>>>>>>>> Impala setups, so we don't attempt to recognize and optimize this
>>>>>>>> narrow case. In particular, such an arrangement tends to be short-lived
>>>>>>>> if you have the HDFS balancer turned on.
>>>>>>>> Without making code changes, there is no way today to remove the
>>>>>>>> data exchanges and make sure that the scheduler assigns scan splits to
>>>>>>>> nodes in the desired way (co-located, but with possible load
>>>>>>>> imbalance).
>>>>>>>> In what way is the current setup unacceptable to you? Is this
>>>>>>>> premature optimization? If you have certain performance
>>>>>>>> expectations/requirements for specific queries, we might be able to
>>>>>>>> help you improve those. If you want to pursue this route, please help
>>>>>>>> us by posting complete query profiles.
>>>>>>>> Alex
>>>>>>>> On Mon, Mar 12, 2018 at 6:29 AM, Philipp Krause <
>>>>>>>> philippkrause.m...@googlemail.com> wrote:
>>>>>>>>> Hello everyone!
>>>>>>>>> In order to prevent network traffic, I'd like to perform local
>>>>>>>>> joins on each node instead of exchanging the data and performing a
>>>>>>>>> join over the complete data afterwards. My query is basically a join
>>>>>>>>> over three tables on an ID attribute. The blocks are perfectly
>>>>>>>>> distributed, so that e.g. Table A - Block 0 and Table B - Block 0 are
>>>>>>>>> on the same node. These blocks contain all data rows with an ID range
>>>>>>>>> [0,1]. Table A - Block 1 and Table B - Block 1 with an ID range [2,3]
>>>>>>>>> are on another node etc. So I want to perform a local join per node
>>>>>>>>> because any data exchange would be unnecessary (except for the last
>>>>>>>>> step when the final node receives all results of the other nodes). Is
>>>>>>>>> this possible?
>>>>>>>>> At the moment the query plan includes multiple data exchanges,
>>>>>>>>> although the blocks are already perfectly distributed (manually).
>>>>>>>>> I would be grateful for any help!
>>>>>>>>> Best regards
>>>>>>>>> Philipp Krause
