Here's the full list. It might not be minimal, but copying/overwriting
these files should work.
If you are only modifying the Java portion (like DistributedPlanner),
then only copying/replacing the *.jar files should be sufficient.
On Fri, Apr 13, 2018 at 11:00 AM, Philipp Krause
Yes, I have a running (virtual) cluster. I would like to try
your approach with a custom Impala build (DistributedPlanner.java is
the only modified file at the moment). Thank you in advance for
the file list!
Alexander Behm <alex.b...@cloudera.com> wrote on Fri, 13 Apr 2018, 18:45:
I'm not really following your installation/setup and am not an
expert on Cloudera Manager installation/config. If you are
going to build Impala anyway, it's probably easiest to test on
Impala's minicluster first.
In general, if you have a running cluster managed by Cloudera Manager,
you can deploy a custom Impala build by simply overwriting the
existing Impala binaries and jars with the new build. If you
want to go this route, I can give you a full list of files you
need to replace.
On Tue, Apr 10, 2018 at 11:44 AM, Philipp Krause
Thank you for the explanation! Yes, I'm using HDFS. The
single-replica setup is only for test purposes at the
moment. I think this makes it easier to get some first
results, since fewer modifications (scheduler etc.) are needed.
I would like to test the DistributedPlanner modification
in my virtual cluster. I used a customized Vagrant script
to install Impala on multiple hosts (see attachment). It
simply installs cloudera-manager-server-db,
cloudera-manager-server and cloudera-manager-daemons via
apt-get. What would be the simplest way to set up my
modified version? Could I simply call ./buildall.sh and
change the script to something like this?
echo "Install java..."
apt-get -q -y --force-yes install oracle-j2sdk1.7
echo "Download impala..."
wget https://...   # URL where I uploaded my modified version
echo "Extract impala..."
tar -xvzf Impala-cdh5-trunk.tar.gz
echo "Build impala..."
./buildall.sh      # run inside the extracted source tree
echo "Start impala instances..."
service cloudera-scm-server-db initdb
service cloudera-scm-server-db start
service cloudera-scm-server start
Or is there another, maybe even easier, method to test the
code? Maybe via bootstrap_development.sh / the minicluster?
2018-04-05 18:39 GMT+02:00 Alexander Behm
Apologies for the late response. Btw, your previous
post was clear enough to me, so no worries :)
On Wed, Apr 4, 2018 at 7:46 AM, Philipp Krause
I think my previous post has been too long and
confusing. I apologize for that!
If replicas are completely deactivated, all scan
ranges of a block are mapped to the one host
on which the block is located. This host is the
"executor"/reader for all the scan ranges of this
block. Is that correct?
Yes, assuming you are using HDFS.
I tried to visualize my understanding of the
scan_range to host mapping for my use case (see
attachment). Could you please have a quick look at
it and tell me if this is correct?
"The existing scan range assignment is scan-node
centric. For each scan node, we independently
decide which of its scan ranges should be
processed by which host."
Without replicas, all scan ranges of a block would
be assigned to the same host on which this block is
located. Isn't everything local here, so that
Table_A - Block_0 and Table_B - Block_0 can be
joined locally, or are further steps necessary? The
condition in the DistributedPlanner you pointed
me to is set to false (no exchange nodes).
"You want it to be host-centric. For each host,
collect the local scan ranges of *all* scan nodes,
and assign them to that host."
Wouldn't the standard setup from above work?
Wouldn't I assign all (the same) scan ranges to
each host in this case?
The standard setup only works if every block
has exactly one replica. For our purposes, that is
basically never the case (who would store production
data without replication?), so the single-replica
assumption was not clear to me.
Does your current setup (only changing the planner and
not the scheduler) produce the expected results?
Thank you very much!
2018-03-28 21:04 GMT+02:00 Philipp Krause
Thank you for your answer and sorry for my delay!
If my understanding is correct, the list of
scan nodes consists of all nodes which contain
a *local* block from a table that is needed
for the query (Assumption: I have no replicas
in my first tests). If TableA-Block0 is on
Node_0, isn't Node_0 automatically a scan
node? And wouldn't this scan node always be
the host for the complete scan range(s) then?
"For each scan node, we independently decide
which of its scan ranges should be processed
by which host."
// Loop over all scan ranges, select an executor for those with local impalads and
// collect all others for later processing.
So in this whole block, scan ranges are
assigned to the closest executor (=host?). But
isn't the closest executor always the node the
block is located on (assuming impalad is
installed and I have no replicas)? And isn't
this node always a scan node at the same time?
Otherwise a thread on a remote host would have to
read the corresponding scan range, which would
be more expensive. The only exception I can
think of is when all threads on the local node
are busy. Or, if I use replicas and all other
threads of my node with the "original" block
are busy, a thread on another node which
contains a replica could read a specific scan
range of its local block. Is my understanding
correct?
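The local-first behavior described in that scheduler comment can be approximated as follows. This is a minimal, hypothetical sketch in plain Java, not Impala's scheduler (which has its own load metric and tie-breaking); the class and method names are made up. It assigns the ranges of a single scan node, picking, among the replica hosts that run an executor, the one with the fewest ranges so far, and collects ranges without a local executor for later (remote) processing. With exactly one replica per block, every range lands on the host that stores it.

```java
import java.util.HashMap;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.Set;

// Hypothetical model of scan-node-centric, local-first assignment (not Impala code).
class LocalFirstAssignment {
    // A scan range plus the hosts that hold a replica of its block.
    static final class ScanRange {
        final String id;
        final List<String> replicaHosts;
        ScanRange(String id, List<String> replicaHosts) {
            this.id = id;
            this.replicaHosts = replicaHosts;
        }
    }

    // Assigns the ranges of ONE scan node. Ranges with no replica on an executor
    // host are appended to 'remote' so they can be handled in a second pass.
    static Map<String, String> assign(List<ScanRange> ranges, Set<String> executors,
                                      List<ScanRange> remote) {
        Map<String, String> rangeToHost = new LinkedHashMap<>();
        Map<String, Integer> assignedCount = new HashMap<>();
        for (ScanRange r : ranges) {
            String best = null;
            for (String host : r.replicaHosts) {
                if (!executors.contains(host)) continue;  // no local impalad on this host
                // Simplified load metric: number of ranges assigned so far.
                if (best == null
                        || assignedCount.getOrDefault(host, 0) < assignedCount.getOrDefault(best, 0)) {
                    best = host;
                }
            }
            if (best == null) {
                remote.add(r);
            } else {
                rangeToHost.put(r.id, best);
                assignedCount.merge(best, 1, Integer::sum);
            }
        }
        return rangeToHost;
    }
}
```

The counter starts fresh for every call, mirroring the "independent decision per scan node" described in this thread: nothing ties the choice for Table_A's range to the choice for Table_B's range, so once blocks have several replicas, co-located ranges are not guaranteed to end up on the same host.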
Aren't all scan ranges read locally by their
scan nodes if I have impalad installed on all
nodes? And am I right that a scan range is
determined only by its length, which refers to
maxScanRangeLength in computeScanRangeLocations?
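To make the "scan range is only based on its length" question concrete, here is a minimal sketch, assuming a block is simply cut into consecutive pieces of at most maxScanRangeLength bytes. The parameter name is borrowed from the question; the real logic in computeScanRangeLocations may handle more cases. Treating a non-positive limit as "one scan range per block" is an assumption made here for illustration.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical sketch: split one HDFS block into scan ranges by length only.
class ScanRangeSplitter {
    static final class Range {
        final long offset;  // absolute file offset where the range starts
        final long length;  // number of bytes in the range
        Range(long offset, long length) {
            this.offset = offset;
            this.length = length;
        }
    }

    // Cuts [blockOffset, blockOffset + blockLength) into pieces of at most
    // maxScanRangeLength bytes. Assumption: maxScanRangeLength <= 0 means
    // "no limit", i.e. exactly one scan range for the whole block.
    static List<Range> split(long blockOffset, long blockLength, long maxScanRangeLength) {
        List<Range> ranges = new ArrayList<>();
        long step = maxScanRangeLength > 0 ? maxScanRangeLength : blockLength;
        for (long off = 0; off < blockLength; off += step) {
            ranges.add(new Range(blockOffset + off, Math.min(step, blockLength - off)));
        }
        return ranges;
    }
}
```

With the 64MB Parquet files and 64MB blocks described in this thread, a limit of 0 (or of at least 64MB) yields one scan range per block. In this sketch every piece would simply inherit the replica hosts of its block, so splitting alone does not move data to another host.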
I hope you can help me with the scan node <->
scan range -> host relationship. If I have
Table_A-Block_0 and Table_B-Block_0 on the
same node (which I want to join locally), I
don't understand why scan ranges could
be assigned to another host in my scenario.
Best regards and thank you very much!
On 21.03.2018 at 05:21, Alexander Behm wrote:
Thanks for following up. I think I understand now.
If you don't want to think about scan ranges,
then you can modify ... For
example, you could change it to produce one
scan range per file or per HDFS block. That
way you'd know exactly what a scan range corresponds to.
I think the easiest/fastest way for you to
make progress is to re-implement the existing
scan range assignment logic in that place in
the code I had pointed you to. There is no
quick fix to change the existing behavior.
The existing scan range assignment is
scan-node centric. For each scan node, we
independently decide which of its scan ranges
should be processed by which host.
I believe an algorithm to achieve your goal
would look completely different. You want it
to be host-centric. For each host, collect
the local scan ranges of *all* scan nodes,
and assign them to that host.
Does that make sense?
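A minimal sketch of the host-centric idea, assuming the single-replica test setup (each scan range has exactly one host). The names are hypothetical and this is plain Java, not Impala's scheduler API. For each host it collects the local scan ranges of all scan nodes, so Table_A - Block_0 and Table_B - Block_0 end up in the same host's work list.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

// Hypothetical sketch of host-centric scan range assignment (not Impala code).
class HostCentricAssignment {
    // One scan range of one scan node, stored on exactly one host (single replica).
    static final class ScanRange {
        final String scanNode;
        final String rangeId;
        final String host;
        ScanRange(String scanNode, String rangeId, String host) {
            this.scanNode = scanNode;
            this.rangeId = rangeId;
            this.host = host;
        }
    }

    // Result: host -> (scan node -> ranges that host reads for that scan node).
    static Map<String, Map<String, List<String>>> assign(List<ScanRange> rangesOfAllScanNodes) {
        Map<String, Map<String, List<String>>> plan = new TreeMap<>();
        for (ScanRange r : rangesOfAllScanNodes) {
            plan.computeIfAbsent(r.host, h -> new TreeMap<>())
                .computeIfAbsent(r.scanNode, n -> new ArrayList<>())
                .add(r.rangeId);
        }
        return plan;
    }
}
```

With several replicas per block, a rule would be needed to choose one host per group of co-located blocks (for example, the modulo rule discussed in this thread); the sketch leaves that out.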
On Mon, Mar 19, 2018 at 1:02 PM, Philipp
I'd like to provide a small example for
our purpose. The last post may be a bit
confusing, so here's a very simple
example in the attached PDF file. I hope
it's understandable. Otherwise, please
give me some brief feedback.
Basically, I only want each data node to
join all its local blocks. Is a range
mapping needed, or is it possible to
easily join all local blocks (regardless
of their content) since everything is
already "prepared"? Maybe you can clarify
this for me.
As you can see in the example, the tables
are not partitioned by ID. The files are
manually prepared with the help of the
modulo function. So I don't have a range
like [0,10], but something like 0,5,10,15 etc.
I hope I didn't make it too complicated
and confusing. I think the actual idea
behind this is really simple, and I hope
you can help me get this working.
Best regards and thank you very much for your help!
On 18.03.2018 at 17:32, Philipp wrote:
Hi! At the moment, the data-to-Parquet
(block) mapping is based on a simple
modulo function: Id % #data_nodes. So
with 5 data nodes, all rows with IDs
0,5,10,... are written to Parquet_0,
IDs 1,6,11,... are written to Parquet_1, etc.
That's what I did manually. Since the
Parquet file size and the block size are
both set to 64MB, each Parquet file
results in one block when I transfer the
Parquet files to HDFS. By default, HDFS
distributes the blocks randomly. For
test purposes, I transferred
corresponding blocks from Table_A and
Table_B to the same data node (Table_A -
Block_X with IDs 0,5,10 and Table_B -
Block_Y with IDs 0,5,10). In this case,
they are transferred to data_node_0
because the modulo function (which I
want to implement in the scheduler)
returns 0 for these IDs. This is also
done manually at the moment.
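The manual preparation step above can be reproduced in a few lines, which makes it easy to check which IDs land in which file. A sketch assuming file index = Id % #data_nodes as described; the class name is hypothetical. Math.floorMod keeps the index non-negative even for negative IDs.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

// Sketch: group row IDs into Parquet_<k> files with k = id % numDataNodes.
class ModuloPartitioner {
    static int fileIndexFor(long id, int numDataNodes) {
        // floorMod avoids negative indexes for negative IDs.
        return (int) Math.floorMod(id, (long) numDataNodes);
    }

    static Map<Integer, List<Long>> partition(List<Long> ids, int numDataNodes) {
        Map<Integer, List<Long>> files = new TreeMap<>();
        for (long id : ids) {
            files.computeIfAbsent(fileIndexFor(id, numDataNodes), k -> new ArrayList<>()).add(id);
        }
        return files;
    }

    public static void main(String[] args) {
        List<Long> ids = new ArrayList<>();
        for (long id = 0; id < 15; id++) {
            ids.add(id);
        }
        // With 5 data nodes: Parquet_0 gets 0, 5, 10 and Parquet_1 gets 1, 6, 11.
        System.out.println(partition(ids, 5));
    }
}
```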
1.) DistributedPlanner: For the first
upcoming tests, I simply changed the
first condition in the
DistributedPlanner to true to avoid exchange nodes.
2.) The scheduler: That's the part I'm
currently struggling with. For the first
tests, block replication is deactivated.
I'm not sure how/where to implement
the modulo function for the scan range to
host mapping. Without the modulo
function, I would have to implement a hard-coded
mapping (something like "range"
0-0, 5-5, 10-10 -> Data_node_0 etc.). Is
that correct? Instead, I would like to
use a slightly more flexible solution with
the help of this modulo function for the
scan range to host mapping.
I would be really grateful if you could
give me a hint for the scheduling
implementation. In the meantime, I'll dig deeper
into the code.
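One way to avoid a hard-coded range table is to derive the host from the bucket number already encoded in the file name. A hypothetical sketch, assuming (1) each scan range knows the name of the Parquet file it belongs to, (2) files are named Parquet_<k> as in the manual setup, and (3) the executor hosts are listed in a fixed order data_node_0 ... data_node_(N-1). None of these names come from Impala, and wiring this into the real scheduler is left open.

```java
import java.util.List;
import java.util.regex.Matcher;
import java.util.regex.Pattern;

// Hypothetical helper: map a scan range's file to a host via bucket % numHosts.
class ModuloHostMapper {
    // Matches a trailing "_<digits>", optionally followed by one file extension.
    private static final Pattern BUCKET = Pattern.compile("_(\\d+)(?:\\.[^./]*)?$");

    static String hostFor(String fileName, List<String> hosts) {
        Matcher m = BUCKET.matcher(fileName);
        if (!m.find()) {
            throw new IllegalArgumentException("no bucket index in " + fileName);
        }
        int bucket = Integer.parseInt(m.group(1));
        return hosts.get(bucket % hosts.size());
    }
}
```

Because Parquet_0 of Table_A and Parquet_0 of Table_B map to the same host, their scan ranges are co-located by construction, regardless of which scan node they belong to. This only avoids network reads if the blocks physically live on that host; otherwise the reads become remote.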
Best regards and thank you in advance!
On 14.03.2018 at 08:06, Philipp wrote:
Thank you very much for this
information! I'll try to implement
these two steps and post some updates
within the next days!
2018-03-13 5:38 GMT+01:00 Alexander
Cool that you're working on a research
project with Impala!
Properly adding such a feature to
Impala is a substantial effort, but
hacking the code for an experiment
or two seems doable.
I think you will need to modify two
things: (1) the planner to not add
exchange nodes, and (2) the
scheduler to assign the co-located
scan ranges to the same host.
Here are a few starting points in the code:
1) The planner
The first condition handles the
case where no exchange nodes need
to be added because the join inputs
are already suitably partitioned.
You could hack the code to always
go into that codepath, so no
exchanges are added.
2) The scheduler
You'll need to dig through and
understand that code so that you
can make the necessary changes.
Change the scan range to host
mapping to your liking. The rest of
the code should just work.
On Mon, Mar 12, 2018 at 6:55 PM,
Thank you very much for your answer!
The intention behind this is to
improve the execution time and
(primarily) to examine the
impact of block-co-location
(research project) for this
particular query (simplified):
select A.x, B.y, A.z from
tableA as A inner join tableB
as B on A.id=B.id
The "real" query includes three
joins and the data size is in the
petabyte range. Therefore several
nodes (5 in the test
environment with less data) are
used (without any load balancer).
Could you give me some hints
about what code changes are required
and which files are affected? I
don't know how to tell Impala
that it should only join the
local data blocks on each node
and then pass the results to the
"final" node, which receives all
intermediate results. I hope you
can help me get this working.
That would be great!
On 12.03.2018 at 18:38, ... wrote:
I suppose one exception is if
your data lives only on a
single node. Then you can set
num_nodes=1 and make sure to
send the query request to the
impalad running on the same
data node as the target data.
Then you should get a local join.
On Mon, Mar 12, 2018 at 9:30
AM, Alexander Behm
Such a specific block
arrangement is very
uncommon for typical
Impala setups, so we don't
attempt to recognize and
optimize this narrow case.
In particular, such an
arrangement tends to be
short-lived if you have
the HDFS balancer turned on.
Without making code
changes, there is no way
today to remove the data
exchanges and make sure
that the scheduler assigns
scan splits to nodes in
the desired way
(co-located, but with
possible load imbalance).
In what way is the current
setup unacceptable to you?
Is this premature
optimization? If you have
performance issues with
specific queries, we
might be able to help you
improve those. If you want
to pursue this route,
please help us by posting
complete query profiles.
On Mon, Mar 12, 2018 at
6:29 AM, Philipp Krause
In order to prevent
network traffic, I'd
like to perform local
joins on each node
instead of exchanging
the data and performing a
join over the complete
data afterwards. My
query is basically a
join over three
tables on an ID
attribute. The blocks
are distributed so that,
e.g., Table A - Block
0 and Table B - Block
0 are on the same
node. These blocks
contain all data rows
with an ID range
[0,1]. Table A - Block
1 and Table B - Block
1 with an ID range
[2,3] are on another
node, etc. So I want to
perform a local join
per node, because any
data exchange would be
unnecessary (except
for the last step, when
the final node
receives all results
of the other nodes).
Is this possible?
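Why per-node local joins can give a correct result here: if both tables are split by the same function of the join key (the ID-range or modulo layout described above), matching rows always sit on the same node, so joining each node's local blocks and concatenating the results yields the same rows as one global join. The following is a toy, in-memory model of that argument (hypothetical names, not Impala code), using id % numNodes as the placement rule.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Toy model: co-partitioned hash join per node, then a final gather.
class ColocatedJoinDemo {
    static final class Row {
        final long id;
        final String value;
        Row(long id, String value) {
            this.id = id;
            this.value = value;
        }
    }

    // Plain hash join on id: build on b, probe with a. Output "id:aValue,bValue".
    static List<String> hashJoin(List<Row> a, List<Row> b) {
        Map<Long, List<Row>> build = new HashMap<>();
        for (Row r : b) {
            build.computeIfAbsent(r.id, k -> new ArrayList<>()).add(r);
        }
        List<String> out = new ArrayList<>();
        for (Row r : a) {
            for (Row s : build.getOrDefault(r.id, List.of())) {
                out.add(r.id + ":" + r.value + "," + s.value);
            }
        }
        return out;
    }

    // Rows that the placement rule id % numNodes puts on the given node.
    static List<Row> localRows(List<Row> rows, int node, int numNodes) {
        List<Row> local = new ArrayList<>();
        for (Row r : rows) {
            if (Math.floorMod(r.id, (long) numNodes) == node) {
                local.add(r);
            }
        }
        return local;
    }

    // Each node joins only its local rows; the "final" node concatenates the results.
    static List<String> colocatedJoin(List<Row> a, List<Row> b, int numNodes) {
        List<String> result = new ArrayList<>();
        for (int node = 0; node < numNodes; node++) {
            result.addAll(hashJoin(localRows(a, node, numNodes), localRows(b, node, numNodes)));
        }
        return result;
    }
}
```

If the two tables used different placement rules, matching pairs could live on different nodes and the per-node joins would silently miss them; that is why exchanges are normally needed unless the join inputs are known to be suitably partitioned already.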
At the moment, the
query plan includes data exchanges, although
the blocks are already distributed this way.
I would be grateful
for any help!