Here's the foll list. It might not be minimal, but copying/overwriting
these should work.

debug/service/impalad
debug/service/libfesupport.so
debug/service/libService.a
release/service/impalad
release/service/libfesupport.so
release/service/libService.a
yarn-extras-0.1-SNAPSHOT.jar
impala-data-source-api-1.0-SNAPSHOT-sources.jar
impala-data-source-api-1.0-SNAPSHOT.jar
impala-frontend-0.1-SNAPSHOT-tests.jar
impala-frontend-0.1-SNAPSHOT.jar
libkudu_client.so.0.1.0
libstdc++.so.6.0.20
impala-no-sse.bc
impala-sse.bc
libimpalalzo.so

If you are only modifying the Java portion (like DistributedPlanner), then
only copying/replacing the *.jar files should be sufficient.

On Fri, Apr 13, 2018 at 11:00 AM, Philipp Krause <
philippkrause.m...@googlemail.com> wrote:

> Yes, I have a running (virtual) cluster. I would try to follow your way
> with the custom impala build (DistributedPlanner.java is the only modified
> file at the moment). Thank you in advance for the file list!
>
> Best regards
> Philipp
>
> Alexander Behm <alex.b...@cloudera.com> schrieb am Fr., 13. Apr. 2018,
> 18:45:
>
>> I'm not really following your installation/setup and am not an expert on
>> Cloudera Manager installation/config. If you are going to build Impala
>> anyway, it's probably easiest to test on Impala's minicluster first.
>>
>> In general, if you have a running Cloudera Managed cluster, you can
>> deploy a custom Impala build by simply overwriting the Impala existing
>> binaries and jars with the new build. If you want to go this route, I can
>> give you a full list of files you need to replace.
>>
>> On Tue, Apr 10, 2018 at 11:44 AM, Philipp Krause <philippkrause.mail@
>> googlemail.com> wrote:
>>
>>> Thank you for the explanation! Yes, I'm using HDFS. The single replica
>>> setup is only for test purposes at the moment. I think this makes it easier
>>> to gain some first results since less modifications (scheduler etc.) are
>>> neccessary.
>>> I would like to test the DistributedPlanner modification in my virtual
>>> cluster. I used a customized Vagrant script to install Impala on multiple
>>> hosts (s.attachment). It simply installs cloudera-manager-server-db,
>>> cloudera-manager-server and cloudera-manager-daemons via apt-get. What
>>> would be the simplest solution to setup my modified version? Could I simply
>>> call ./buildall.sh and change the script to sth. like this?
>>>
>>> echo "Install java..."
>>> apt-get -q -y --force-yes install oracle-j2sdk1.7
>>> echo "Download impala..."
>>> wget https://... where I uploaded my modified version
>>> echo "Extract impala..."
>>> tar -xvzf Impala-cdh5-trunk.tar.gz
>>> cd Impala-cdh5-trunk
>>> echo "Build impala..."
>>> ./buildall.sh
>>> echo "Start impala instances..."
>>> service cloudera-scm-server-db initdb
>>> service cloudera-scm-server-db start
>>> service cloudera-scm-server start
>>>
>>> Or is there another, maybe even easier method, to test the code? Maybe
>>> via bootstrap_development.sh / minicluster?
>>>
>>> Best regards
>>> Philipp
>>>
>>> 2018-04-05 18:39 GMT+02:00 Alexander Behm <alex.b...@cloudera.com>:
>>>
>>>> Apologies for the late response. Btw, your previous post was clear
>>>> enough to me, so no worries :)
>>>>
>>>>
>>>> On Wed, Apr 4, 2018 at 7:46 AM, Philipp Krause <philippkrause.mail@
>>>> googlemail.com> wrote:
>>>>
>>>>> Hello Alex,
>>>>>
>>>>> I think my previous post has been too long and confusing. I apologize
>>>>> for that!
>>>>>
>>>>> If replicas are completely deactivated, all scan ranges of a block are
>>>>> mapped to the one host, where the block is located on. This host is the
>>>>> "executor"/reader for all the scan ranges of this block. Is that correct?
>>>>>
>>>>
>>>> Yes, assuming you are using HDFS.
>>>>
>>>>
>>>>>
>>>>> I tried to visualize my understanding of the scan_range to host
>>>>> mapping for my use case (s. attachment). Could you please have a quick 
>>>>> look
>>>>> at it and tell me if this is correct?
>>>>>
>>>>> "The existing scan range assignment is scan-node centric. For each
>>>>> scan node, we independently decide which of its scan ranges should be
>>>>> processed by which host."
>>>>> Without replicas, all scan ranges of a block would be assigend to the
>>>>> same host where this block is located on. Isn't everything local here, so
>>>>> that Table_A - Block_0 and Table_B - Block_0 can be joined local or are
>>>>> further steps neccessary? The condition in the DistributedPlanner you
>>>>> pointed to me is set to false (no exchange nodes).
>>>>>
>>>>> "You want it to be host-centric. For each host, collect the local scan
>>>>> ranges of *all* scan nodes, and assign them to that host."
>>>>> Wouldn't the standard setup from above work? Wouldn't I assign all
>>>>> (the same) scan ranges to each host in this case here?
>>>>>
>>>>
>>>> The standard setup works only in if every block only has exactly one
>>>> replica. For our purposes, that is basically never the case (who would
>>>> store production data without replication?), so the single-replica
>>>> assumption was not clear to me.
>>>>
>>>> Does your current setup (only changing the planner and not the
>>>> scheduler) produce the expected results?
>>>>
>>>>
>>>>>
>>>>> Thank you very much!
>>>>>
>>>>> Best regards
>>>>> Philipp
>>>>>
>>>>> 2018-03-28 21:04 GMT+02:00 Philipp Krause <philippkrause.mail@
>>>>> googlemail.com>:
>>>>>
>>>>>> Thank you for your answer and sorry for my delay!
>>>>>>
>>>>>> If my understanding is correct, the list of scan nodes consists of
>>>>>> all nodes which contain a *local* block from a table that is needed for 
>>>>>> the
>>>>>> query (Assumption: I have no replicas in my first tests). If 
>>>>>> TableA-Block0
>>>>>> is on Node_0, isn't Node_0 automatically a scan node? And wouldn't this
>>>>>> scan node always be the host for the complete scan range(s) then?
>>>>>>
>>>>>> "For each scan node, we independently decide which of its scan ranges
>>>>>> should be processed by which host."
>>>>>>
>>>>>> https://github.com/cloudera/Impala/blob/cdh5-trunk/be/src/
>>>>>> scheduling/scheduler.cc#L532
>>>>>> // Loop over all scan ranges, select an executor for those with local
>>>>>> impalads and
>>>>>> // collect all others for later processing.
>>>>>>
>>>>>> So in this whole block, scan ranges are assigned to the closest
>>>>>> executor (=host?). But isn't the closest executor always the node the 
>>>>>> block
>>>>>> is located on (assumed impalad is installed and I have no replicas)? And
>>>>>> isn't this node always a scan node at the same time? Otherwise a thread 
>>>>>> on
>>>>>> a remote host had to read the corresponding scan range, which would be 
>>>>>> more
>>>>>> expensive. The only exception I can think of is when all threads on the
>>>>>> local node are busy. Or, if I use replicas and all other threads of my 
>>>>>> node
>>>>>> with the "original" block are busy, a thread on another node which 
>>>>>> contains
>>>>>> a replica could read a special scan range of its local block. Is my
>>>>>> understanding correct here?
>>>>>>
>>>>>> Aren't all scan ranges read locally by its scan nodes if I have
>>>>>> impalad installed on all nodes? And am I right, that the scan range is 
>>>>>> only
>>>>>> based on its length which refers to maxScanRangeLength in
>>>>>> computeScanRangeLocations?
>>>>>> https://github.com/cloudera/Impala/blob/cdh5-trunk/fe/src/
>>>>>> main/java/org/apache/impala/planner/HdfsScanNode.java#L721
>>>>>>
>>>>>> I hope you can help me with the scan node <-> scan range->host
>>>>>> relationship. If I have Table_A-Block_0 and Table_B_Block_0 on the same
>>>>>> node (which I want to join locally), I don't get the point of why scan
>>>>>> ranges could be assigned to another host in my scenario.
>>>>>>
>>>>>> Best regads and thank you very much!
>>>>>> Philipp Krause
>>>>>>
>>>>>> Am 21.03.2018 um 05:21 schrieb Alexander Behm:
>>>>>>
>>>>>> Thanks for following up. I think I understand your setup.
>>>>>>
>>>>>> If you want to not think about scan ranges, then you can modify
>>>>>> HdfsScanNode.computeScanRangeLocations(). For example, you could
>>>>>> change it to produce one scan range per file or per HDFS block. That way
>>>>>> you'd know exactly what a scan range corresponds to.
>>>>>>
>>>>>> I think the easiest/fastest way for you to make progress is to
>>>>>> re-implement the existing scan range assignment logic in that place in 
>>>>>> the
>>>>>> code I had pointed you to. There is no quick fix to change the existing
>>>>>> behavior.
>>>>>> The existing scan range assignment is scan-node centric. For each
>>>>>> scan node, we independently decide which of its scan ranges should be
>>>>>> processed by which host.
>>>>>>
>>>>>> I believe an algorithm to achieve your goal would look completely
>>>>>> different. You want it to be host-centric. For each host, collect the 
>>>>>> local
>>>>>> scan ranges of *all* scan nodes, and assign them to that host.
>>>>>>
>>>>>> Does that make sense?
>>>>>>
>>>>>> Alex
>>>>>>
>>>>>>
>>>>>> On Mon, Mar 19, 2018 at 1:02 PM, Philipp Krause <philippkrause.mail@
>>>>>> googlemail.com> wrote:
>>>>>>
>>>>>>> I'd like to provide a small example for our purpose. The last post
>>>>>>> may be a bit confusing, so here's a very simple example in the attached 
>>>>>>> pdf
>>>>>>> file. I hope, it's understandable. Otherwise, please give me a short
>>>>>>> feedback.
>>>>>>>
>>>>>>> Basically, I only want each data node to join all it's local blocks.
>>>>>>> Is there a range mapping needed or is it possible to easily join all 
>>>>>>> local
>>>>>>> blocks (regardless of its content) since everything is already 
>>>>>>> "prepared"?
>>>>>>> Maybe you can clarify this for me.
>>>>>>>
>>>>>>> As you can see in the example, the tables are not partitioned by ID.
>>>>>>> The files are manually prepared by the help of the modulo function. So I
>>>>>>> don't have a range like [0,10], but something like 0,5,10,15 etc.
>>>>>>>
>>>>>>> I hope, I didn't make it too complicated and confusing. I think, the
>>>>>>> actual idea behind this is really simple and I hope you can help me to 
>>>>>>> get
>>>>>>> this working.
>>>>>>>
>>>>>>> Best regards and thank you very much for your time!
>>>>>>> Philipp
>>>>>>>
>>>>>>> Am 18.03.2018 um 17:32 schrieb Philipp Krause:
>>>>>>>
>>>>>>> Hi! At the moment the data to parquet (block) mapping is based on a
>>>>>>> simple modulo function: Id % #data_nodes. So with 5 data nodes all rows
>>>>>>> with Id's 0,5,10,... are written to Parquet_0, Id's 1,4,9 are written to
>>>>>>> Parquet_1 etc. That's what I did manually. Since the parquet file size 
>>>>>>> and
>>>>>>> the block size are both set to 64MB, each parquet file will result in 
>>>>>>> one
>>>>>>> block when I transfer the parquet files to HDFS. By default, HDFS
>>>>>>> distributes the blocks randomly. For test purposes I transferred
>>>>>>> corresponding blocks from Table_A and Table_B to the same data node
>>>>>>> (Table_A - Block_X with Id's 0,5,10 and Table_B - Block_Y with Id's
>>>>>>> 0,5,10). In this case, they are transferred to data_node_0 because the
>>>>>>> modulo function (which I want to implement in the scheduler) returns 0 
>>>>>>> for
>>>>>>> these Id's. This is also done manually at the moment.
>>>>>>>
>>>>>>> 1.) DistributedPlanner: For first, upcoming tests I simply changed
>>>>>>> the first condition in the DistributedPlanner to true to avoid exchange
>>>>>>> nodes.
>>>>>>>
>>>>>>> 2.) The scheduler: That's the part I'm currently struggling with.
>>>>>>> For first tests, block replication is deactivated. I'm not sure how / 
>>>>>>> where
>>>>>>> to implement the modulo function for scan range to host mapping. Without
>>>>>>> the modulo function, I had to implement a hard coded mapping (something
>>>>>>> like "range" 0-0, 5-5, 10-10 -> Data_node_0 etc.). Is that correct? 
>>>>>>> Instead
>>>>>>> I would like to use a slightly more flexible solution by the help of 
>>>>>>> this
>>>>>>> modulo function for the host mapping.
>>>>>>>
>>>>>>> I would be really grateful if you could give me a hint for the
>>>>>>> scheduling implementation. I try to go deeper through the code 
>>>>>>> meanwhile.
>>>>>>>
>>>>>>> Best regards and thank you in advance
>>>>>>> Philipp
>>>>>>>
>>>>>>>
>>>>>>> Am 14.03.2018 um 08:06 schrieb Philipp Krause:
>>>>>>>
>>>>>>> Thank you very much for these information! I'll try to implement
>>>>>>> these two steps and post some updates within the next days!
>>>>>>>
>>>>>>> Best regards
>>>>>>> Philipp
>>>>>>>
>>>>>>> 2018-03-13 5:38 GMT+01:00 Alexander Behm <alex.b...@cloudera.com>:
>>>>>>>
>>>>>>>> Cool that you working on a research project with Impala!
>>>>>>>>
>>>>>>>> Properly adding such a feature to Impala is a substantial effort,
>>>>>>>> but hacking the code for an experiment or two seems doable.
>>>>>>>>
>>>>>>>> I think you will need to modify two things: (1) the planner to not
>>>>>>>> add exchange nodes, and (2) the scheduler to assign the co-located scan
>>>>>>>> ranges to the same host.
>>>>>>>>
>>>>>>>> Here are a few starting points in the code:
>>>>>>>>
>>>>>>>> 1) DistributedPlanner
>>>>>>>> https://github.com/apache/impala/blob/master/fe/src/
>>>>>>>> main/java/org/apache/impala/planner/DistributedPlanner.java#L318
>>>>>>>>
>>>>>>>> The first condition handles the case where no exchange nodes need
>>>>>>>> to be added because the join inputs are already suitably partitioned.
>>>>>>>> You could hack the code to always go into that codepath, so no
>>>>>>>> exchanges are added.
>>>>>>>>
>>>>>>>> 2) The scheduler
>>>>>>>> https://github.com/apache/impala/blob/master/be/src/
>>>>>>>> scheduling/scheduler.cc#L226
>>>>>>>>
>>>>>>>> You'll need to dig through and understand that code so that you can
>>>>>>>> make the necessary changes. Change the scan range to host mapping to 
>>>>>>>> your
>>>>>>>> liking. The rest of the code should just work.
>>>>>>>>
>>>>>>>> Cheers,
>>>>>>>>
>>>>>>>> Alex
>>>>>>>>
>>>>>>>>
>>>>>>>> On Mon, Mar 12, 2018 at 6:55 PM, Philipp Krause <
>>>>>>>> philippkrause.m...@googlemail.com> wrote:
>>>>>>>>
>>>>>>>>> Thank you very much for your quick answers!
>>>>>>>>> The intention behind this is to improve the execution time and
>>>>>>>>> (primarily) to examine the impact of block-co-location (research 
>>>>>>>>> project)
>>>>>>>>> for this particular query (simplified):
>>>>>>>>>
>>>>>>>>> select A.x, B.y, A.z from tableA as A inner join tableB as B on
>>>>>>>>> A.id=B.id
>>>>>>>>>
>>>>>>>>> The "real" query includes three joins and the data size is in
>>>>>>>>> pb-range. Therefore several nodes (5 in the test environment with less
>>>>>>>>> data) are used (without any load balancer).
>>>>>>>>>
>>>>>>>>> Could you give me some hints what code changes are required and
>>>>>>>>> which files are affected? I don't know how to give Impala the 
>>>>>>>>> information
>>>>>>>>> that it should only join the local data blocks on each node and then 
>>>>>>>>> pass
>>>>>>>>> it to the "final" node which receives all intermediate results. I 
>>>>>>>>> hope you
>>>>>>>>> can help me to get this working. That would be awesome!
>>>>>>>>> Best regards
>>>>>>>>> Philipp
>>>>>>>>>
>>>>>>>>> Am 12.03.2018 um 18:38 schrieb Alexander Behm:
>>>>>>>>>
>>>>>>>>> I suppose one exception is if your data lives only on a single
>>>>>>>>> node. Then you can set num_nodes=1 and make sure to send the query 
>>>>>>>>> request
>>>>>>>>> to the impalad running on the same data node as the target data. Then 
>>>>>>>>> you
>>>>>>>>> should get a local join.
>>>>>>>>>
>>>>>>>>> On Mon, Mar 12, 2018 at 9:30 AM, Alexander Behm <
>>>>>>>>> alex.b...@cloudera.com> wrote:
>>>>>>>>>
>>>>>>>>>> Such a specific block arrangement is very uncommon for typical
>>>>>>>>>> Impala setups, so we don't attempt to recognize and optimize this 
>>>>>>>>>> narrow
>>>>>>>>>> case. In particular, such an arrangement tends to be short lived if 
>>>>>>>>>> you
>>>>>>>>>> have the HDFS balancer turned on.
>>>>>>>>>>
>>>>>>>>>> Without making code changes, there is no way today to remove the
>>>>>>>>>> data exchanges and make sure that the scheduler assigns scan splits 
>>>>>>>>>> to
>>>>>>>>>> nodes in the desired way (co-located, but with possible load 
>>>>>>>>>> imbalance).
>>>>>>>>>>
>>>>>>>>>> In what way is the current setup unacceptable to you? Is this
>>>>>>>>>> pre-mature optimization? If you have certain performance
>>>>>>>>>> expectations/requirements for specific queries we might be able to 
>>>>>>>>>> help you
>>>>>>>>>> improve those. If you want to pursue this route, please help us by 
>>>>>>>>>> posting
>>>>>>>>>> complete query profiles.
>>>>>>>>>>
>>>>>>>>>> Alex
>>>>>>>>>>
>>>>>>>>>> On Mon, Mar 12, 2018 at 6:29 AM, Philipp Krause <
>>>>>>>>>> philippkrause.m...@googlemail.com> wrote:
>>>>>>>>>>
>>>>>>>>>>> Hello everyone!
>>>>>>>>>>>
>>>>>>>>>>> In order to prevent network traffic, I'd like to perform local
>>>>>>>>>>> joins on each node instead of exchanging the data and perform a 
>>>>>>>>>>> join over
>>>>>>>>>>> the complete data afterwards. My query is basically a join over 
>>>>>>>>>>> three three
>>>>>>>>>>> tables on an ID attribute. The blocks are perfectly distributed, so 
>>>>>>>>>>> that
>>>>>>>>>>> e.g. Table A - Block 0  and Table B - Block 0  are on the same 
>>>>>>>>>>> node. These
>>>>>>>>>>> blocks contain all data rows with an ID range [0,1]. Table A - 
>>>>>>>>>>> Block 1  and
>>>>>>>>>>> Table B - Block 1 with an ID range [2,3] are on another node etc. 
>>>>>>>>>>> So I want
>>>>>>>>>>> to perform a local join per node because any data exchange would be
>>>>>>>>>>> unneccessary (except for the last step when the final node 
>>>>>>>>>>> recevieves all
>>>>>>>>>>> results of the other nodes). Is this possible?
>>>>>>>>>>> At the moment the query plan includes multiple data exchanges,
>>>>>>>>>>> although the blocks are already perfectly distributed (manually).
>>>>>>>>>>> I would be grateful for any help!
>>>>>>>>>>>
>>>>>>>>>>> Best regards
>>>>>>>>>>> Philipp Krause
>>>>>>>>>>>
>>>>>>>>>>>
>>>>>>>>>>
>>>>>>>>>
>>>>>>>>>
>>>>>>>>
>>>>>>>
>>>>>>>
>>>>>>>
>>>>>>
>>>>>>
>>>>>
>>>>
>>>
>>

Reply via email to