Hi! At the moment the mapping from data to parquet files (blocks) is based on a
simple modulo function: Id % #data_nodes. So with 5 data nodes, all rows
with Ids 0,5,10,... are written to Parquet_0, rows with Ids 1,6,11,... are
written to Parquet_1, etc. That's what I did manually. Since the parquet file
size and the block size are both set to 64MB, each parquet file will result
in one block when I transfer the parquet files to HDFS. By default, HDFS
distributes the blocks randomly. For test purposes I transferred
corresponding blocks from Table_A and Table_B to the same data node
(Table_A - Block_X with Ids 0,5,10 and Table_B - Block_Y with Ids
0,5,10). In this case, they are transferred to data_node_0 because the
modulo function (which I want to implement in the scheduler) returns 0
for these Ids. This is also done manually at the moment.
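The modulo mapping described above can be sketched roughly as follows. This is only an illustration in Python, not the code actually used to write the files; the names NUM_DATA_NODES, target_file and partition_rows are hypothetical.

```python
from collections import defaultdict

NUM_DATA_NODES = 5  # size of the test environment mentioned in this thread


def target_file(row_id: int, num_nodes: int = NUM_DATA_NODES) -> str:
    """Return the parquet file a row is written to, using Id % #data_nodes."""
    return f"Parquet_{row_id % num_nodes}"


def partition_rows(row_ids, num_nodes: int = NUM_DATA_NODES):
    """Group row Ids by their target parquet file."""
    files = defaultdict(list)
    for row_id in row_ids:
        files[target_file(row_id, num_nodes)].append(row_id)
    return dict(files)


layout = partition_rows(range(15))
print(layout["Parquet_0"])  # [0, 5, 10]
print(layout["Parquet_1"])  # [1, 6, 11]
```

Applying the same function to both tables is what makes Table_A and Table_B blocks with the same index hold the same Ids.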
1.) DistributedPlanner: For the first upcoming tests, I simply changed the
first condition in the DistributedPlanner to true to avoid exchange nodes.
2.) The scheduler: That's the part I'm currently struggling with. For the
first tests, block replication is deactivated. I'm not sure how / where
to implement the modulo function for the scan range to host mapping. Without
the modulo function, I would have to implement a hard-coded mapping (something
like "range" 0-0, 5-5, 10-10 -> Data_node_0 etc.). Is that correct?
Instead, I would like to use a slightly more flexible solution with the
help of this modulo function for the host mapping.
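To make the idea concrete, here is a minimal sketch of a modulo-based scan range to host mapping. It is written in Python purely for illustration (the real scheduler is C++ in scheduler.cc) and rests on assumptions not confirmed in this thread: that each file is named like ".../Parquet_<k>" with k = Id % #data_nodes, that one file equals one block and one scan range, and that the hosts are called data_node_0 ... data_node_4. The function host_for_scan_range is hypothetical and not part of Impala.

```python
import re

# Hypothetical host names; the thread only mentions "data_node_0" etc.
DATA_NODES = [f"data_node_{i}" for i in range(5)]


def host_for_scan_range(file_path: str, hosts=DATA_NODES) -> str:
    """Choose a host from the partition index at the end of the file name.

    Assumes a name like '.../Parquet_<k>' (optionally with an extension),
    where k was computed as Id % #data_nodes when the file was written.
    """
    match = re.search(r"_(\d+)(?:\.\w+)?$", file_path)
    if match is None:
        raise ValueError(f"no partition index in {file_path!r}")
    return hosts[int(match.group(1)) % len(hosts)]


# Corresponding blocks of both tables end up on the same host.
print(host_for_scan_range("/warehouse/table_a/Parquet_0"))  # data_node_0
print(host_for_scan_range("/warehouse/table_b/Parquet_0"))  # data_node_0
```

Compared with a hard-coded range table, this only needs the partition index and the host list, so it keeps working if the number of data nodes changes (as long as the files are rewritten with the same modulo).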
I would be really grateful if you could give me a hint for the
scheduling implementation. In the meantime, I'll try to dig deeper into the code.
Best regards and thank you in advance
Philipp
On 14.03.2018 at 08:06, Philipp Krause wrote:
Thank you very much for this information! I'll try to implement these
two steps and post some updates within the next few days!
Best regards
Philipp
2018-03-13 5:38 GMT+01:00 Alexander Behm <alex.b...@cloudera.com>:
Cool that you're working on a research project with Impala!
Properly adding such a feature to Impala is a substantial effort,
but hacking the code for an experiment or two seems doable.
I think you will need to modify two things: (1) the planner to not
add exchange nodes, and (2) the scheduler to assign the co-located
scan ranges to the same host.
Here are a few starting points in the code:
1) DistributedPlanner
https://github.com/apache/impala/blob/master/fe/src/main/java/org/apache/impala/planner/DistributedPlanner.java#L318
The first condition handles the case where no exchange nodes need
to be added because the join inputs are already suitably partitioned.
You could hack the code to always go into that codepath, so no
exchanges are added.
2) The scheduler
https://github.com/apache/impala/blob/master/be/src/scheduling/scheduler.cc#L226
You'll need to dig through and understand that code so that you
can make the necessary changes. Change the scan range to host
mapping to your liking. The rest of the code should just work.
Cheers,
Alex
On Mon, Mar 12, 2018 at 6:55 PM, Philipp Krause
<philippkrause.m...@googlemail.com> wrote:
Thank you very much for your quick answers!
The intention behind this is to improve the execution time and
(primarily) to examine the impact of block-co-location
(research project) for this particular query (simplified):
select A.x, B.y, A.z from tableA as A inner join tableB as B
on A.id=B.id
The "real" query includes three joins and the data size is in
the petabyte range. Therefore several nodes (5 in the test environment
with less data) are used (without any load balancer).
Could you give me some hints on which code changes are required
and which files are affected? I don't know how to give Impala
the information that it should only join the local data blocks
on each node and then pass it to the "final" node which
receives all intermediate results. I hope you can help me to
get this working. That would be awesome!
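As a rough illustration of the intended execution (not of how Impala implements it), the sketch below joins co-partitioned data per node and only concatenates the intermediate results at a "final" step. It is plain Python with made-up sample rows; local_join and colocated_join are hypothetical names, and the Id % #nodes partitioning is an assumption of the sketch.

```python
def local_join(rows_a, rows_b):
    """Inner hash join on 'id'; returns (A.x, B.y, A.z) tuples."""
    by_id = {}
    for b in rows_b:
        by_id.setdefault(b["id"], []).append(b)
    return [(a["x"], b["y"], a["z"]) for a in rows_a for b in by_id.get(a["id"], [])]


def colocated_join(table_a, table_b, num_nodes):
    """Join each node's partitions locally, then collect results at a final node."""
    result = []
    for node in range(num_nodes):
        # Rows with the same id % num_nodes are assumed to live on the same node.
        part_a = [r for r in table_a if r["id"] % num_nodes == node]
        part_b = [r for r in table_b if r["id"] % num_nodes == node]
        result.extend(local_join(part_a, part_b))  # no cross-node exchange
    return result


# Made-up sample data for the simplified query.
table_a = [{"id": i, "x": f"x{i}", "z": f"z{i}"} for i in range(10)]
table_b = [{"id": i, "y": f"y{i}"} for i in range(0, 10, 2)]

per_node = colocated_join(table_a, table_b, num_nodes=5)
global_result = local_join(table_a, table_b)
print(sorted(per_node) == sorted(global_result))  # True
```

Because matching Ids always fall into the same partition, the union of the per-node joins equals the join over the complete data, which is why the exchanges are redundant for this layout.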
Best regards
Philipp
On 12.03.2018 at 18:38, Alexander Behm wrote:
I suppose one exception is if your data lives only on a
single node. Then you can set num_nodes=1 and make sure to
send the query request to the impalad running on the same
data node as the target data. Then you should get a local join.
On Mon, Mar 12, 2018 at 9:30 AM, Alexander Behm
<alex.b...@cloudera.com> wrote:
Such a specific block arrangement is very uncommon for
typical Impala setups, so we don't attempt to recognize
and optimize this narrow case. In particular, such an
arrangement tends to be short lived if you have the HDFS
balancer turned on.
Without making code changes, there is no way today to
remove the data exchanges and make sure that the
scheduler assigns scan splits to nodes in the desired way
(co-located, but with possible load imbalance).
In what way is the current setup unacceptable to you? Is
this premature optimization? If you have certain
performance expectations/requirements for specific
queries we might be able to help you improve those. If
you want to pursue this route, please help us by posting
complete query profiles.
Alex
On Mon, Mar 12, 2018 at 6:29 AM, Philipp Krause
<philippkrause.m...@googlemail.com> wrote:
Hello everyone!
In order to avoid network traffic, I'd like to
perform local joins on each node instead of
exchanging the data and performing a join over the
complete data afterwards. My query is basically a
join over three tables on an ID attribute. The
blocks are perfectly distributed, so that e.g. Table
A - Block 0 and Table B - Block 0 are on the same
node. These blocks contain all data rows with an ID
range [0,1]. Table A - Block 1 and Table B - Block 1
with an ID range [2,3] are on another node etc. So I
want to perform a local join per node because any
data exchange would be unnecessary (except for the
last step when the final node receives all results
of the other nodes). Is this possible?
At the moment the query plan includes multiple data
exchanges, although the blocks are already perfectly
distributed (manually).
I would be grateful for any help!
Best regards
Philipp Krause