Re: Local join instead of data exchange - co-located blocks

2018-06-25 Thread Lars Volker
Hi Philip,

In
https://github.com/apache/impala/blob/master/be/src/scheduling/scheduler.cc#L749
you see that we call ComputeFragmentExecParams() and
ComputeBackendExecParams(). You should check both to see that they're doing
the right thing in your case. The former also modifies the
per_node_scan_ranges here:
https://github.com/apache/impala/blob/master/be/src/scheduling/scheduler.cc#L400
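If it helps, you can dump the map at both points and diff the output. Below
is a self-contained sketch of such a dump; the types are hypothetical
stand-ins for the generated Thrift structs, so adapt the names to whatever
your branch uses:

#include <iostream>
#include <map>
#include <string>
#include <vector>

// Hypothetical stand-ins for the generated Thrift types.
using TPlanNodeId = int;
struct TScanRangeParams { std::string file_name; };
using PerNodeScanRanges = std::map<TPlanNodeId, std::vector<TScanRangeParams>>;

// Print each scan node's ranges; a missing build-side scan shows up as an
// absent node_id key.
void DumpPerNodeScanRanges(const PerNodeScanRanges& ranges) {
  for (const auto& entry : ranges) {
    std::cout << "node_id=" << entry.first
              << " #ranges=" << entry.second.size() << std::endl;
    for (const auto& r : entry.second) {
      std::cout << "  " << r.file_name << std::endl;
    }
  }
}

int main() {
  PerNodeScanRanges ranges;
  ranges[0] = {{"...105.parq"}, {"...316.parq"}};
  // node_id=1 (the build side) intentionally absent, as in your logs.
  DumpPerNodeScanRanges(ranges);
  return 0;
}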

I hope that helps with your debugging.

Cheers, Lars


On Mon, Jun 25, 2018 at 3:27 AM Philipp Krause <
philippkrause.m...@googlemail.com> wrote:

> Hi Lars,
>
> thank you, the implementation of the modified impalad version works fine
> now. I've implemented some more logs in the scheduler.cc and
> fragment-instance-state.cc to gain some more information about where the
> problem with the empty build side could result from. Among others I printed
> the plan_exec_info, node_id and fragment in ComputeScanRangeAssignment (see
> attachment). All scan range prints in ComputeScanRangeAssignment (see
> plan_exec_info.txt, scan_range_node_0/1.txt, fragment_node_0/1.txt) look
> correct to me. The first incorrect print results from the
> per_node_scan_ranges in "Status FragmentInstanceState::Prepare()". The
> per_node_scan_ranges are included in instance_ctx, so they must be modified
> somewhere between scheduler.cc and fragment-instance-state.cc.
> Unfortunately I don't know where.
>
> I also compared all values to the original Impala as well as to the
> num_node=1 setting.
>
> In the original version there are more exchanges in the per_node_scan_ranges
> (fragment-instance-state.cc), like:
>
> 01: fragment_idx (i32) = 0,
> ...
> 05: per_exch_num_senders (map) = map[1] {
> 5 -> 1,
>   },
>
>
> and
>
> 01: fragment_idx (i32) = 1,
> ...
> 05: per_exch_num_senders (map) = map[2] {
> 3 -> 1,
> 4 -> 1,
>   },
>
> There are two more fragments which contain all correct scan ranges for
> each scan node. In my modified version there's only one exchange and one
> correct scan range fragment printed. The other scan range for the build
> side is still missing (see fragment_instance_ctx.txt). In the num_nodes=1
> version there's no exchange printed and one fragment containing all correct
> scan ranges.
>
> I think I won't get it working without your help. Maybe you have an idea
> once you've seen the logs? To me it seems to be an exchange problem, since
> there are exchanges printed in the per_node_scan_ranges in
> fragment-instance-state.cc which actually shouldn't exist.
>
> Otherwise, since time is running out for our university project: would it
> be possible to "hard code" the per_node_scan_ranges somewhere? I
> would be very happy to get at least one working configuration to do some
> measurements.
>
>   Current simplified setup: two tables t1, t2 -> partitioned by id -> all
> blocks are on node 2.
>   t1
>   1 (...769.parq)
>   0 (...105.parq)
>   0 (...316.parq)
>
>   t2
>   1 (...079.parq)
>   0 (...538.parq)
>
> Thank you so much!
> Philipp
>
> On 18.06.2018 at 18:51, Lars Volker wrote:
>
> Hi Philip,
>
> Apologies for the delay. Since you're currently looking for a correct
> implementation more than a fast one, I would highly recommend to use debug
> builds instead of release builds. The latter won't have DCHECKs enabled and
> you might find it much harder to debug any mistakes you make during the
> implementation (or spot bugs you unveil). That would change the "release"
> suffix to "debug" in your efforts.
>
> Otherwise the path you have is correct. Try building without the -so flag.
> Omitting it will give you a statically linked binary and you won't have to
> add all the shared libs.
>
> Cheers, Lars
>
> On Mon, Jun 18, 2018 at 3:08 AM Philipp Krause <
> philippkrause.m...@googlemail.com> wrote:
>
>> Hi Lars,
>>
>> I'm really sorry to bother you with my concern. I never had to modify
>> anything in the backend so far, that's why I was asking for the correct
>> files (or locations) to replace. I guess the locations are different
>> because I used the Cloudera Manager to install everything on my cluster?
>> I added more logs to the scheduler to get more information and try to
>> find an answer concerning the empty build side.
>>
>> If the file replacement is too complicated or cumbersome, I would like to
>> set up a mini cluster as you or Alex have already suggested earlier. I
>> already tried to set it up but unfortunately I'm getting errors for the
>> hdfs-datanodes. I created a post in the forum for that:
>>
>>
>> http://community.cloudera.com/t5/Interactive-Short-cycle-SQL/Mini-cluster-setup-CDH5-13-1/m-p/69131#M4605
>>
>> I had the same problem with the newest Impala version. Maybe you know
> what I'm doing wrong or can provide me a working configuration like Ubuntu
>> X and Impala version X? If we get the mini cluster working, I would try to
>> go with this tutorial which might make things a lot easier for debugging:
>>
>>
>> 

Re: Local join instead of data exchange - co-located blocks

2018-06-18 Thread Lars Volker
Hi Philip,

Apologies for the delay. Since you're currently looking for a correct
implementation more than a fast one, I would highly recommend to use debug
builds instead of release builds. The latter won't have DCHECKs enabled and
you might find it much harder to debug any mistakes you make during the
implementation (or spot bugs you unveil). That would change the "release"
suffix to "debug" in your efforts.

Otherwise the path you have is correct. Try building without the -so flag.
Omitting it will give you a statically linked binary and you won't have to
add all the shared libs.
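
Concretely, dropping -so (and, per the above, -release) from the command you
posted would leave something like the line below; I'm going from memory on
the flags, so double-check against the usage text of buildall.sh:

./buildall.sh -notests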

Cheers, Lars

On Mon, Jun 18, 2018 at 3:08 AM Philipp Krause <
philippkrause.m...@googlemail.com> wrote:

> Hi Lars,
>
> I'm really sorry to bother you with my concern. I never had to modify
> anything in the backend so far, that's why I was asking for the correct
> files (or locations) to replace. I guess the locations are different
> because I used the Cloudera Manager to install everything on my cluster?
> I added more logs to the scheduler to get more information and try to find
> an answer concerning the empty build side.
>
> If the file replacement is too complicated or cumbersome, I would like to
> set up a mini cluster as you or Alex have already suggested earlier. I
> already tried to set it up but unfortunately I'm getting errors for the
> hdfs-datanodes. I created a post in the forum for that:
>
>
> http://community.cloudera.com/t5/Interactive-Short-cycle-SQL/Mini-cluster-setup-CDH5-13-1/m-p/69131#M4605
>
> I had the same problem with the newest Impala version. Maybe you know what
> I'm doing wrong or can provide me a working configuration like Ubuntu X and
> Impala version X? If we get the mini cluster working, I would try to go
> with this tutorial which might make things a lot easier for debugging:
>
>
> https://cwiki.apache.org/confluence/display/IMPALA/Eclipse+Setup+for+Impala+Development
>
> Thanks for your time and help!
> Philipp
>
> On 10.06.2018 at 17:58, Philipp Krause wrote:
>
> Hi Lars,
>
> Alex sent me a list of files which I should replace when modifications are
> made. For changes in the DistributedPlanner.java I only had to replace
> "impala-frontend-0.1-SNAPSHOT.jar".
> Now I've added some more logs to the scheduler, so that I have to replace
> some more files.
>
> Impalad was found in the following locations on my server:
>
> /usr/lib/cmf/agent/build/env/lib/python2.7/site-packages/cmf-5.13.1-py2.7.egg/lib.linux-x86_64-2.7/cmf/monitor/impalad
>
> /usr/lib/cmf/agent/build/env/lib/python2.7/site-packages/cmf-5.13.1-py2.7.egg/cmf/monitor/impalad
> /usr/bin/impalad
> /var/log/impalad
> /var/log/impala-minidumps/impalad
> /var/lib/dpkg/alternatives/impalad
>
> /opt/cloudera/parcels/CDH-5.13.1-1.cdh5.13.1.p0.2/lib/debug/usr/lib/impala/sbin-debug/impalad
>
> /opt/cloudera/parcels/CDH-5.13.1-1.cdh5.13.1.p0.2/lib/debug/usr/lib/impala/sbin-retail/impalad
>
> /opt/cloudera/parcels/CDH-5.13.1-1.cdh5.13.1.p0.2/lib/impala/sbin-debug/impalad
>
> /opt/cloudera/parcels/CDH-5.13.1-1.cdh5.13.1.p0.2/lib/impala/sbin-retail/impalad
> /opt/cloudera/parcels/CDH-5.13.1-1.cdh5.13.1.p0.2/bin/impalad
> /etc/alternatives/impalad
> /impala/impalad
>
>  I tried to replace "impalad" in
> "/opt/cloudera/parcels/CDH-5.13.1-1.cdh5.13.1.p0.2/lib/impala/sbin-retail/".
> Is this the correct path? I got some errors about missing shared libraries
> after the replacement:
>  libExec.so: cannot open shared object file: No such file or directory
>  I already added about 5 other shared libraries to the same folder. But
> before I continue, I wanted to make sure this is the correct path.
>
> Build command: ./buildall.sh -notests -so -release
>
>  I hope you can help me!
>
>  Best regards
>  Philipp
>
> On 30.05.2018 at 19:43, Lars Volker wrote:
>
> Hi Philipp,
>
> The ScanRangeAssignment logfile entry gets printed by the scheduler in
> L918 in PrintAssignment(). For each host and each plan node it shows the
> scan ranges assigned. per_node_scan_ranges is a per-host structure in that
> assignment. When inspecting the full logs you should be able to correlate
> those and there should not be a difference.
>
> Next you should be able to see that both scan node children of the join
> have scan ranges assigned to them. If that is not the case then the
> scheduler might have made a wrong decision.
>
> Once you get the assignments right, the join node should have data on both
> sides, regardless of exchanges.
>
> I hope this helps you track down the issues. It might help to make the
> logs more readable by renaming the files. Currently it's hard to see what
> belongs where.
>
> Cheers, Lars
>
> On Wed, May 23, 2018 at 6:25 AM, Philipp Krause <
> philippkrause.m...@googlemail.com> wrote:
>
>> Hi Lars,
>> thanks for the clarification. I was looking for the single
>> ScanRangeAssignments (node_id=0 & node_id=1) on each datanode. E.g. for
>> datanode 6 the assignment looks as follows:
>>
>> ScanRangeAssignment: server=TNetworkAddress {
>>   01: 

Re: Local join instead of data exchange - co-located blocks

2018-05-30 Thread Lars Volker
Hi Philipp,

The ScanRangeAssignment logfile entry gets printed by the scheduler in L918
in PrintAssignment(). For each host and each plan node it shows the scan
ranges assigned. per_node_scan_ranges is a per-host structure in that
assignment. When inspecting the full logs you should be able to correlate
those and there should not be a difference.
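
For example, something along these lines (the log path is just a guess based
on your setup, adjust as needed) pulls all assignments out of the scheduler
log:

grep -A 20 "ScanRangeAssignment" /var/log/impalad/impalad.INFO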

Next you should be able to see that both scan node children of the join
have scan ranges assigned to them. If that is not the case then the
scheduler might have made a wrong decision.

Once you get the assignments right, the join node should have data on both
sides, regardless of exchanges.

I hope this helps you track down the issues. It might help to make the logs
more readable by renaming the files. Currently it's hard to see what
belongs where.

Cheers, Lars

On Wed, May 23, 2018 at 6:25 AM, Philipp Krause <
philippkrause.m...@googlemail.com> wrote:

> Hi Lars,
> thanks for the clarification. I was looking for the single
> ScanRangeAssignments (node_id=0 & node_id=1) on each datanode. E.g. for
> datanode 6 the assignment looks as follows:
>
> ScanRangeAssignment: server=TNetworkAddress {
>   01: hostname (string) = "vm-cluster-node6",
>   02: port (i32) = 22000,
> }
> 17:53:33.602  INFO  cc:916
> node_id=0 ranges=TScanRangeParams {
>   01: scan_range (struct) = TScanRange {
> 01: hdfs_file_split (struct) = THdfsFileSplit {
>   01: file_name (string) = "164b134d9e26eb2c-
> 9c0a6dc80003_1787762321_data.0.parq",
> ...
>
> 17:53:33.602  INFO  cc:916
> node_id=1 ranges=TScanRangeParams {
>   01: scan_range (struct) = TScanRange {
> 01: hdfs_file_split (struct) = THdfsFileSplit {
>   01: file_name (string) = "ad42accc923aa106-
> da6778840003_857029511_data.0.parq",
> ...
>
>
> This seems to be correct to me since both corresponding partitions /
> parquet files are on the same node. Is this correct or am I mistaken here?
> I guess these lines only provide information about what partitions each
> node needs and do not refer to the final scan range to node assignment
> (what I thought first)? The latter is expressed in per_node_scan_ranges?
>
>
> TPlanFragmentInstanceCtx {
>   01: fragment_idx (i32) = 0,
>   02: fragment_instance_id (struct) = TUniqueId {
> 01: hi (i64) = -413574937583451838,
> 02: lo (i64) = 7531803561076719616,
>   },
>   03: per_fragment_instance_idx (i32) = 0,
>   04: per_node_scan_ranges (map) = map[0] {
>   },
>   05: per_exch_num_senders (map) = map[1] {
> 3 -> 5,
>   },
>   06: sender_id (i32) = -1,
> }
>
> Here, I wonder about the exchange.
>
> fragment_instance_ctx:
> TPlanFragmentInstanceCtx {
>   01: fragment_idx (i32) = 1,
>   02: fragment_instance_id (struct) = TUniqueId {
> 01: hi (i64) = -413574937583451838,
> 02: lo (i64) = 7531803561076719621,
>   },
>   03: per_fragment_instance_idx (i32) = 4,
>   04: per_node_scan_ranges (map) = map[1] {
> 0 -> list[2] {
>   [0] = TScanRangeParams {
> 01: scan_range (struct) = TScanRange {
>   01: hdfs_file_split (struct) = THdfsFileSplit {
> 01: file_name (string) = "164b134d9e26eb2c-
> 9c0a6dc80004_1915463945_data.0.parq",
> ...
>   },
> },
> ...
>   },
>   [1] = TScanRangeParams {
> 01: scan_range (struct) = TScanRange {
>   01: hdfs_file_split (struct) = THdfsFileSplit {
> 01: file_name (string) = "164b134d9e26eb2c-
> 9c0a6dc80004_1023833177_data.0.parq",
> ...
>   },
> },
> ...
>   },
> },
>   },
>   05: per_exch_num_senders (map) = map[0] {
>   },
>   06: sender_id (i32) = 4,
> }
>
> Why are only two partitions listed here (partition 0 and 5 which are on
> datanode 2)? As you already said, the build side is always empty but the
> probe side is always filled. So shouldn't at least one partition per
> node be listed? Could you also clarify the difference between
> ScanRangeAssignments (where in my opinion everything looks correct) and
> per_node_scan_ranges to me? What I don't really get is why the build is
> empty although the correct partitions are logged in ScanRangeAssignments
> (but missing in per_node_scan_ranges). Thank you very much in advance!
>
> Best regards
> Philipp
>
> On 21.05.2018 at 22:57, Lars Volker wrote:
>
> Hi Philipp,
>
> The distributed profile shows that the HDFS scan on the build side of the
> join does not have any scan ranges assigned to it. You mentioned that you 
> "rechecked
> my scan assignments and they seem to be fine". You should be able to see
> them in the plan using a debugger or some print statements. Check my
> previous email for tips where to start debugging.
>
> If you search for "per_node_scan_ranges" in the log files, you'll see
> that in the num_nodes=0 case, only one node has scan ranges assigned to it.
> You might want to double check that the scheduler does what you expect in
> that case, possibly by stepping through 

Re: Local join instead of data exchange - co-located blocks

2018-05-14 Thread Lars Volker
Hi Philipp,

Looking at the profile, one of your scan nodes doesn't seem to receive any
scan ranges ("Hdfs split stats" is empty). The other one receives one
split, but it gets filtered out by the runtime filter coming from that
first node ("Files rejected: 1"). You might want to disable runtime filters
for now until you get it sorted out.
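
In impala-shell that would be:

SET RUNTIME_FILTER_MODE=OFF;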

Then you might want to start debugging
in be/src/service/client-request-state.cc:466, which is where the scheduler
gets called. You mentioned that your assignments look OK, so until then
things should be correct. If you're uncomfortable poking it all apart with
GDB you can always print objects using the methods in debug-util.h. From
there go down coord_->Exec() in L480. Set query option num_nodes=1 to
execute everything at the coordinator for easier debugging. Otherwise, the
coordinator will start remote fragments, which you can intercept with a
debugger in ImpalaInternalService::ExecQueryFInstances
(be/src/service/impala-internal-service.cc:42).
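
A minimal gdb session for that last part could look like this (assuming a
debug build; the pgrep is just one way to find the pid):

# on the host executing the remote fragment
gdb -p $(pgrep impalad)
(gdb) break impala::ImpalaInternalService::ExecQueryFInstances
(gdb) continue
# now run the query; the breakpoint fires when the fragment instances start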

Cheers, Lars

On Mon, May 14, 2018 at 1:18 AM, Philipp Krause <
philippkrause.m...@googlemail.com> wrote:

> Hello Alex,
>
> I suppose you're very busy, so I apologize for the interruption. If you
> have any idea of what I could try to solve this problem, please let me
> know. Currently I don't know how to progress and I'd appreciate any help
> you can give me.
>
> Best regards
> Philipp
>
>
> Philipp Krause wrote on Mon., 7 May 2018, 12:59:
>
>> I just wanted to add that I tried the join with two other, minimal and
>> "fresh" tables. All blocks from both tables were on the same node but I got
>> the same result that no data were processed. To me, the scan range mapping
>> of my modified version looks the same compared to the original one. I only
>> noticed a difference in the query profile:
>> Filter 0 (1.00 MB):
>>  - Files processed: 1 (1)
>>  - Files rejected: 1 (1)
>> ...
>>
>> This filter only appears in my modified version. Hopefully we can find
>> the mistake.
>>
>> On 04.05.2018 at 15:40, Philipp Krause wrote:
>>
>> Hi!
>>
>> The query profile and the scan range mappings are attached
>> (query_profile.txt + scan_ranges.txt). The complete log file is also
>> attached. The mapping looks fine to me, I couldn't find any mistakes there.
>> For example, line 168 (scan_ranges.txt) shows that partition ID=4 is
>> assigned to node_0 and partition ID=10 is assigned to node_1. Both
>> partitions contain all id=4 rows which should be correct for the join. But
>> probably I have overlooked something in the log.
>>
>> The partition/block setup is as follows:
>> 6 Nodes (1 Namenode, 5 Datanodes)
>> Node 1:
>> Node 2: 0|0 5|5
>> Node 3: 1|1
>> Node 4: 2|2
>> Node 5: 3|3
>> Node 6: 4|4
>>
>> 0|0 means partition_0 from table A and B.
>>
>> Also thanks to Lars for the logging option, which I have used!
>>
>> Best regards
>> Philipp
>>
>> On 04.05.2018 at 07:10, Lars Volker wrote:
>>
>> I haven't followed this thread closely, but you can also print all scan
>> range assignments made by the scheduler by passing -vmodule=scheduler=2 as
>> a startup option. The logging happens in scheduler.cc:612.
>>
>>
>> This wiki page has a way to achieve that using environment variables:
>> https://cwiki.apache.org/confluence/display/IMPALA/Useful+Tips+for+New+Impala+Developers
>>
>> Cheers, Lars
>>
>> On Thu, May 3, 2018 at 8:54 PM, Alexander Behm wrote:
>>
>>> No idea what's going on, but my guess is something is awry with the
>>> scan-range assignment. Can you attach the full profile? It's probably also
>>> good to print the scan ranges created in HdfsScanNode.
>>> computeScanRangeLocations().
>>>
>>> On Thu, May 3, 2018 at 5:51 PM, Philipp Krause <philippkrause.m...@googlemail.com> wrote:
>>>
 Hello Alex,

 I have tried out several configurations but I still couldn't find a
 solution for my problem :( In the query summary (s. attachment) it looks
 like as if no rows are read. Do you have an idea what I have to change? I
 am sorry for the circumstances and thank you once more for the great
 support to get this working!

 On 29.04.2018 at 21:21, Philipp Krause wrote:

 Hi Alex,
 I got the modified version working on my cluster. The query plan looks
 exactly as wanted (s. attachment). This is awesome! Unfortunately the
 result set is empty. As you can see in query_state.png, the scan progress
 always shows 50% although the query has finished.

 The only modification in the code is the if statement you pointed to me
 (I set it to true). Maybe I have to give Impala the information about the
 lhs / rhs join partition since there are no exchange nodes now (like in the
 following lines)? The corresponding  partitions / blocks of each table are
 on the same node.

 I think 

Re: Local join instead of data exchange - co-located blocks

2018-05-14 Thread Philipp Krause
Hello Alex,

I suppose you're very busy, so I apologize for the interruption. If you
have any idea of what I could try to solve this problem, please let me
know. Currently I don't know how to progress and I'd appreciate any help
you can give me.

Best regards
Philipp

Philipp Krause wrote on Mon., 7 May 2018, 12:59:

> I just wanted to add that I tried the join with two other, minimal and
> "fresh" tables. All blocks from both tables were on the same node but I got
> the same result that no data were processed. To me, the scan range mapping
> of my modified version looks the same compared to the original one. I only
> noticed a difference in the query profile:
> Filter 0 (1.00 MB):
>  - Files processed: 1 (1)
>  - Files rejected: 1 (1)
> ...
>
> This filter only appears in my modified version. Hopefully we can find the
> mistake.
>
> On 04.05.2018 at 15:40, Philipp Krause wrote:
>
> Hi!
>
> The query profile and the scan range mappings are attached
> (query_profile.txt + scan_ranges.txt). The complete log file is also
> attached. The mapping looks fine to me, I couldn't find any mistakes there.
> For example, line 168 (scan_ranges.txt) shows that partition ID=4 is
> assigned to node_0 and partition ID=10 is assigned to node_1. Both
> partitions contain all id=4 rows which should be correct for the join. But
> probably I have overlooked something in the log.
>
> The partition/block setup is as follows:
> 6 Nodes (1 Namenode, 5 Datanodes)
> Node 1:
> Node 2: 0|0 5|5
> Node 3: 1|1
> Node 4: 2|2
> Node 5: 3|3
> Node 6: 4|4
>
> 0|0 means partition_0 from table A and B.
>
> Also thanks to Lars for the logging option, which I have used!
>
> Best regards
> Philipp
>
> On 04.05.2018 at 07:10, Lars Volker wrote:
>
> I haven't followed this thread closely, but you can also print all scan
> range assignments made by the scheduler by passing -vmodule=scheduler=2 as
> a startup option. The logging happens in scheduler.cc:612.
>
>
> This wiki page has a way to achieve that using environment variables:
> https://cwiki.apache.org/confluence/display/IMPALA/Useful+Tips+for+New+Impala+Developers
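> On the minicluster that would be, for example (flag name from memory, so
> double-check with --help):
>
> bin/start-impala-cluster.py --impalad_args=-vmodule=scheduler=2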
>
> Cheers, Lars
>
> On Thu, May 3, 2018 at 8:54 PM, Alexander Behm wrote:
>
>> No idea what's going on, but my guess is something is awry with the
>> scan-range assignment. Can you attach the full profile? It's probably also
>> good to print the scan ranges created in
>> HdfsScanNode.computeScanRangeLocations().
>>
>> On Thu, May 3, 2018 at 5:51 PM, Philipp Krause <
>> philippkrause.m...@googlemail.com> wrote:
>>
>>> Hello Alex,
>>>
>>> I have tried out several configurations but I still couldn't find a
>>> solution for my problem :( In the query summary (s. attachment) it looks
>>> like as if no rows are read. Do you have an idea what I have to change? I
>>> am sorry for the circumstances and thank you once more for the great
>>> support to get this working!
>>>
>>> On 29.04.2018 at 21:21, Philipp Krause wrote:
>>>
>>> Hi Alex,
>>> I got the modified version working on my cluster. The query plan looks
>>> exactly as wanted (s. attachment). This is awesome! Unfortunately the
>>> result set is empty. As you can see in query_state.png, the scan progress
>>> always shows 50% although the query has finished.
>>>
>>> The only modification in the code is the if statement you pointed to me
>>> (I set it to true). Maybe I have to give Impala the information about the
>>> lhs / rhs join partition since there are no exchange nodes now (like in the
>>> following lines)? The corresponding  partitions / blocks of each table are
>>> on the same node.
>>>
>>> I think we are very close to the final result and I hope you can help me
>>> once more. Thank you so much!
>>>
>>> Best regards
>>> Philipp
>>>
>>> On 24.04.2018 at 18:00, Alexander Behm wrote:
>>>
>>> On Tue, Apr 24, 2018 at 5:31 AM, Philipp Krause <
>>> philippkrause.m...@googlemail.com> wrote:
>>>
 To prevent the broadcast join I could simply use the shuffle operator
 in the query:

 SELECT * FROM business_partition_1 INNER JOIN [SHUFFLE]
 business_partition_2 WHERE
 business_partition_1.businessid=business_partition_2.businessid

>>>
>>> Not sure what version of Impala you are using, and whether hints
>>> override any changes you might make. I suggest you make the code work as
>>> you wish without requiring hints.
>>>

 I think the broadcast is currently only used because of my very small
 test tables.

 This gives me the plan attached as partitioned_shuffle.png. Since my
 modified version isn't working yet, I partitioned both tables on businessid
 in Impala. The "hack" should only help to get into the if-condition if I
 partition the data manually, right?. But in this case (if the partitioning
 is done by Impala itself) Impala should get into 

Re: Local join instead of data exchange - co-located blocks

2018-04-15 Thread Philipp Krause
Hi Alex! Thank you for the list! The build of the modified cdh5-trunk
branch (debug mode) was successful. After replacing
"impala-frontend-0.1-SNAPSHOT.jar" in 
/opt/cloudera/parcels/CDH-5.13.1-1.cdh5.13.1.p0.2/jars/ I got the 
following error in my existing cluster:
F0416 01:16:45.402997 17897 catalog.cc:69] NoSuchMethodError: 
getCatalogObjects
When I switch back to the original jar file, the error is gone. So something
must be wrong with this file, I guess. But I wonder about the error in
catalog.cc because I didn't touch any .cc files.


I also replaced "impala-data-source-api-1.0-SNAPSHOT.jar". The other jar 
files do not exist in my impala installation (CDH-5.13.1).


What am I doing wrong?

Best regards
Philipp


On 13.04.2018 at 20:12, Alexander Behm wrote:
Here's the full list. It might not be minimal, but copying/overwriting
these should work.


debug/service/impalad
debug/service/libfesupport.so
debug/service/libService.a
release/service/impalad
release/service/libfesupport.so
release/service/libService.a
yarn-extras-0.1-SNAPSHOT.jar
impala-data-source-api-1.0-SNAPSHOT-sources.jar
impala-data-source-api-1.0-SNAPSHOT.jar
impala-frontend-0.1-SNAPSHOT-tests.jar
impala-frontend-0.1-SNAPSHOT.jar
libkudu_client.so.0.1.0
libstdc++.so.6.0.20
impala-no-sse.bc
impala-sse.bc
libimpalalzo.so

If you are only modifying the Java portion (like DistributedPlanner), 
then only copying/replacing the *.jar files should be sufficient.


On Fri, Apr 13, 2018 at 11:00 AM, Philipp Krause <philippkrause.m...@googlemail.com> wrote:


Yes, I have a running (virtual) cluster. I would try to follow
your way with the custom impala build (DistributedPlanner.java is
the only modified file at the moment). Thank you in advance for
the file list!

Best regards
Philipp

Alexander Behm wrote on Fri., 13 Apr. 2018, 18:45:

I'm not really following your installation/setup and am not an
expert on Cloudera Manager installation/config. If you are
going to build Impala anyway, it's probably easiest to test on
Impala's minicluster first.

In general, if you have a running Cloudera Managed cluster,
you can deploy a custom Impala build by simply overwriting the
Impala existing binaries and jars with the new build. If you
want to go this route, I can give you a full list of files you
need to replace.

On Tue, Apr 10, 2018 at 11:44 AM, Philipp Krause <philippkrause.m...@googlemail.com> wrote:

Thank you for the explanation! Yes, I'm using HDFS. The
single replica setup is only for test purposes at the
moment. I think this makes it easier to gain some first
results since less modifications (scheduler etc.) are
neccessary.
I would like to test the DistributedPlanner modification
in my virtual cluster. I used a customized Vagrant script
to install Impala on multiple hosts (s.attachment). It
simply installs cloudera-manager-server-db,
cloudera-manager-server and cloudera-manager-daemons via
apt-get. What would be the simplest solution to set up my
modified version? Could I simply call ./buildall.sh and
change the script to something like this?

echo "Install java..."
apt-get -q -y --force-yes install oracle-j2sdk1.7
echo "Download impala..."
wget https://... where I uploaded my modified version
echo "Extract impala..."
tar -xvzf Impala-cdh5-trunk.tar.gz
cd Impala-cdh5-trunk
echo "Build impala..."
./buildall.sh
echo "Start impala instances..."
service cloudera-scm-server-db initdb
service cloudera-scm-server-db start
service cloudera-scm-server start

Or is there another, maybe even easier method, to test the
code? Maybe via bootstrap_development.sh / minicluster?

Best regards
Philipp

2018-04-05 18:39 GMT+02:00 Alexander Behm:

Apologies for the late response. Btw, your previous
post was clear enough to me, so no worries :)


On Wed, Apr 4, 2018 at 7:46 AM, Philipp Krause <philippkrause.m...@googlemail.com> wrote:

Hello Alex,

I think my previous post has been too long and
confusing. I apologize for that!

If replicas are completely deactivated, all scan
ranges of a block are mapped to the one host,

Re: Local join instead of data exchange - co-located blocks

2018-04-13 Thread Alexander Behm
Here's the full list. It might not be minimal, but copying/overwriting
these should work.

debug/service/impalad
debug/service/libfesupport.so
debug/service/libService.a
release/service/impalad
release/service/libfesupport.so
release/service/libService.a
yarn-extras-0.1-SNAPSHOT.jar
impala-data-source-api-1.0-SNAPSHOT-sources.jar
impala-data-source-api-1.0-SNAPSHOT.jar
impala-frontend-0.1-SNAPSHOT-tests.jar
impala-frontend-0.1-SNAPSHOT.jar
libkudu_client.so.0.1.0
libstdc++.so.6.0.20
impala-no-sse.bc
impala-sse.bc
libimpalalzo.so

If you are only modifying the Java portion (like DistributedPlanner), then
only copying/replacing the *.jar files should be sufficient.

On Fri, Apr 13, 2018 at 11:00 AM, Philipp Krause <
philippkrause.m...@googlemail.com> wrote:

> Yes, I have a running (virtual) cluster. I would try to follow your way
> with the custom impala build (DistributedPlanner.java is the only modified
> file at the moment). Thank you in advance for the file list!
>
> Best regards
> Philipp
>
> Alexander Behm wrote on Fri., 13 Apr. 2018, 18:45:
>
>> I'm not really following your installation/setup and am not an expert on
>> Cloudera Manager installation/config. If you are going to build Impala
>> anyway, it's probably easiest to test on Impala's minicluster first.
>>
>> In general, if you have a running Cloudera Managed cluster, you can
>> deploy a custom Impala build by simply overwriting the Impala existing
>> binaries and jars with the new build. If you want to go this route, I can
>> give you a full list of files you need to replace.
>>
>> On Tue, Apr 10, 2018 at 11:44 AM, Philipp Krause <philippkrause.m...@googlemail.com> wrote:
>>
>>> Thank you for the explanation! Yes, I'm using HDFS. The single replica
>>> setup is only for test purposes at the moment. I think this makes it easier
>>> to gain some first results since fewer modifications (scheduler etc.) are
>>> necessary.
>>> I would like to test the DistributedPlanner modification in my virtual
>>> cluster. I used a customized Vagrant script to install Impala on multiple
>>> hosts (s.attachment). It simply installs cloudera-manager-server-db,
>>> cloudera-manager-server and cloudera-manager-daemons via apt-get. What
>>> would be the simplest solution to set up my modified version? Could I simply
>>> call ./buildall.sh and change the script to something like this?
>>>
>>> echo "Install java..."
>>> apt-get -q -y --force-yes install oracle-j2sdk1.7
>>> echo "Download impala..."
>>> wget https://... where I uploaded my modified version
>>> echo "Extract impala..."
>>> tar -xvzf Impala-cdh5-trunk.tar.gz
>>> cd Impala-cdh5-trunk
>>> echo "Build impala..."
>>> ./buildall.sh
>>> echo "Start impala instances..."
>>> service cloudera-scm-server-db initdb
>>> service cloudera-scm-server-db start
>>> service cloudera-scm-server start
>>>
>>> Or is there another, maybe even easier method, to test the code? Maybe
>>> via bootstrap_development.sh / minicluster?
>>>
>>> Best regards
>>> Philipp
>>>
>>> 2018-04-05 18:39 GMT+02:00 Alexander Behm:
>>>
 Apologies for the late response. Btw, your previous post was clear
 enough to me, so no worries :)


 On Wed, Apr 4, 2018 at 7:46 AM, Philipp Krause  wrote:

> Hello Alex,
>
> I think my previous post has been too long and confusing. I apologize
> for that!
>
> If replicas are completely deactivated, all scan ranges of a block are
> mapped to the one host where the block is located. This host is the
> "executor"/reader for all the scan ranges of this block. Is that correct?
>

 Yes, assuming you are using HDFS.


>
> I tried to visualize my understanding of the scan_range to host
> mapping for my use case (s. attachment). Could you please have a quick 
> look
> at it and tell me if this is correct?
>
> "The existing scan range assignment is scan-node centric. For each
> scan node, we independently decide which of its scan ranges should be
> processed by which host."
> Without replicas, all scan ranges of a block would be assigned to the
> same host where this block is located. Isn't everything local here, so
> that Table_A - Block_0 and Table_B - Block_0 can be joined locally, or are
> further steps necessary? The condition in the DistributedPlanner you
> pointed to me is set to false (no exchange nodes).
>
> "You want it to be host-centric. For each host, collect the local scan
> ranges of *all* scan nodes, and assign them to that host."
> Wouldn't the standard setup from above work? Wouldn't I assign all
> (the same) scan ranges to each host in this case here?
>

 The standard setup works only if every block has exactly one
 replica. For our purposes, that is basically never the case (who would
 store production data without replication?), 

Re: Local join instead of data exchange - co-located blocks

2018-04-13 Thread Philipp Krause
Yes, I have a running (virtual) cluster. I would try to follow your way
with the custom impala build (DistributedPlanner.java is the only modified
file at the moment). Thank you in advance for the file list!

Best regards
Philipp

Alexander Behm wrote on Fri., 13 Apr. 2018, 18:45:

> I'm not really following your installation/setup and am not an expert on
> Cloudera Manager installation/config. If you are going to build Impala
> anyway, it's probably easiest to test on Impala's minicluster first.
>
> In general, if you have a running Cloudera Managed cluster, you can deploy
> a custom Impala build by simply overwriting the Impala existing binaries
> and jars with the new build. If you want to go this route, I can give you a
> full list of files you need to replace.
>
> On Tue, Apr 10, 2018 at 11:44 AM, Philipp Krause <
> philippkrause.m...@googlemail.com> wrote:
>
>> Thank you for the explanation! Yes, I'm using HDFS. The single replica
>> setup is only for test purposes at the moment. I think this makes it easier
>> to gain some first results since fewer modifications (scheduler etc.) are
>> necessary.
>> I would like to test the DistributedPlanner modification in my virtual
>> cluster. I used a customized Vagrant script to install Impala on multiple
>> hosts (s.attachment). It simply installs cloudera-manager-server-db,
>> cloudera-manager-server and cloudera-manager-daemons via apt-get. What
>> would be the simplest solution to set up my modified version? Could I simply
>> call ./buildall.sh and change the script to something like this?
>>
>> echo "Install java..."
>> apt-get -q -y --force-yes install oracle-j2sdk1.7
>> echo "Download impala..."
>> wget https://... where I uploaded my modified version
>> echo "Extract impala..."
>> tar -xvzf Impala-cdh5-trunk.tar.gz
>> cd Impala-cdh5-trunk
>> echo "Build impala..."
>> ./buildall.sh
>> echo "Start impala instances..."
>> service cloudera-scm-server-db initdb
>> service cloudera-scm-server-db start
>> service cloudera-scm-server start
>>
>> Or is there another, maybe even easier method, to test the code? Maybe
>> via bootstrap_development.sh / minicluster?
>>
>> Best regards
>> Philipp
>>
>> 2018-04-05 18:39 GMT+02:00 Alexander Behm:
>>
>>> Apologies for the late response. Btw, your previous post was clear
>>> enough to me, so no worries :)
>>>
>>>
>>> On Wed, Apr 4, 2018 at 7:46 AM, Philipp Krause <
>>> philippkrause.m...@googlemail.com> wrote:
>>>
 Hello Alex,

 I think my previous post has been too long and confusing. I apologize
 for that!

 If replicas are completely deactivated, all scan ranges of a block are
 mapped to the one host where the block is located. This host is the
 "executor"/reader for all the scan ranges of this block. Is that correct?

>>>
>>> Yes, assuming you are using HDFS.
>>>
>>>

 I tried to visualize my understanding of the scan_range to host mapping
 for my use case (s. attachment). Could you please have a quick look at it
 and tell me if this is correct?

 "The existing scan range assignment is scan-node centric. For each scan
 node, we independently decide which of its scan ranges should be processed
 by which host."
 Without replicas, all scan ranges of a block would be assigned to the
 same host where this block is located. Isn't everything local here, so
 that Table_A - Block_0 and Table_B - Block_0 can be joined locally, or are
 further steps necessary? The condition in the DistributedPlanner you
 pointed to me is set to false (no exchange nodes).

 "You want it to be host-centric. For each host, collect the local scan
 ranges of *all* scan nodes, and assign them to that host."
 Wouldn't the standard setup from above work? Wouldn't I assign all (the
 same) scan ranges to each host in this case here?

>>>
>>> The standard setup works only if every block has exactly one
>>> replica. For our purposes, that is basically never the case (who would
>>> store production data without replication?), so the single-replica
>>> assumption was not clear to me.
>>>
>>> Does your current setup (only changing the planner and not the
>>> scheduler) produce the expected results?
>>>
>>>

 Thank you very much!

 Best regards
 Philipp

 2018-03-28 21:04 GMT+02:00 Philipp Krause <
 philippkrause.m...@googlemail.com>:

> Thank you for your answer and sorry for my delay!
>
> If my understanding is correct, the list of scan nodes consists of all
> nodes which contain a *local* block from a table that is needed for the
> query (Assumption: I have no replicas in my first tests). If TableA-Block0
> is on Node_0, isn't Node_0 automatically a scan node? And wouldn't this
> scan node always be the host for the complete scan range(s) then?
>
> "For each scan node, we independently decide which of its scan ranges

Re: Local join instead of data exchange - co-located blocks

2018-04-05 Thread Alexander Behm
Apologies for the late response. Btw, your previous post was clear enough
to me, so no worries :)


On Wed, Apr 4, 2018 at 7:46 AM, Philipp Krause <
philippkrause.m...@googlemail.com> wrote:

> Hello Alex,
>
> I think my previous post has been too long and confusing. I apologize for
> that!
>
> If replicas are completely deactivated, all scan ranges of a block are
> mapped to the one host where the block is located. This host is the
> "executor"/reader for all the scan ranges of this block. Is that correct?
>

Yes, assuming you are using HDFS.


>
> I tried to visualize my understanding of the scan_range to host mapping
> for my use case (s. attachment). Could you please have a quick look at it
> and tell me if this is correct?
>
> "The existing scan range assignment is scan-node centric. For each scan
> node, we independently decide which of its scan ranges should be processed
> by which host."
> Without replicas, all scan ranges of a block would be assigned to the same
> host where this block is located. Isn't everything local here, so that
> Table_A - Block_0 and Table_B - Block_0 can be joined locally, or are further
> steps necessary? The condition in the DistributedPlanner you pointed to me
> is set to false (no exchange nodes).
>
> "You want it to be host-centric. For each host, collect the local scan
> ranges of *all* scan nodes, and assign them to that host."
> Wouldn't the standard setup from above work? Wouldn't I assign all (the
> same) scan ranges to each host in this case here?
>

The standard setup works only if every block has exactly one
replica. For our purposes, that is basically never the case (who would
store production data without replication?), so the single-replica
assumption was not clear to me.

Does your current setup (only changing the planner and not the scheduler)
produce the expected results?


>
> Thank you very much!
>
> Best regards
> Philipp
>
> 2018-03-28 21:04 GMT+02:00 Philipp Krause <philippkrause.m...@googlemail.com>:
>
>> Thank you for your answer and sorry for my delay!
>>
>> If my understanding is correct, the list of scan nodes consists of all
>> nodes which contain a *local* block from a table that is needed for the
>> query (Assumption: I have no replicas in my first tests). If TableA-Block0
>> is on Node_0, isn't Node_0 automatically a scan node? And wouldn't this
>> scan node always be the host for the complete scan range(s) then?
>>
>> "For each scan node, we independently decide which of its scan ranges
>> should be processed by which host."
>>
>> https://github.com/cloudera/Impala/blob/cdh5-trunk/be/src/scheduling/scheduler.cc#L532
>> // Loop over all scan ranges, select an executor for those with local
>> impalads and
>> // collect all others for later processing.
>>
>> So in this whole block, scan ranges are assigned to the closest executor
>> (=host?). But isn't the closest executor always the node the block is
>> located on (assumed impalad is installed and I have no replicas)? And isn't
>> this node always a scan node at the same time? Otherwise a thread on a
>> remote host would have to read the corresponding scan range, which would be more
>> expensive. The only exception I can think of is when all threads on the
>> local node are busy. Or, if I use replicas and all other threads of my node
>> with the "original" block are busy, a thread on another node which contains
>> a replica could read a special scan range of its local block. Is my
>> understanding correct here?
>>
>> Aren't all scan ranges read locally by their scan nodes if I have impalad
>> installed on all nodes? And am I right that the scan range is only based
>> on its length which refers to maxScanRangeLength in
>> computeScanRangeLocations?
>> https://github.com/cloudera/Impala/blob/cdh5-trunk/fe/src/main/java/org/apache/impala/planner/HdfsScanNode.java#L721
>>
>> I hope you can help me with the scan node <-> scan range->host
>> relationship. If I have Table_A-Block_0 and Table_B_Block_0 on the same
>> node (which I want to join locally), I don't get the point of why scan
>> ranges could be assigned to another host in my scenario.
>>
>> Best regards and thank you very much!
>> Philipp Krause
>>
>> On 21.03.2018 at 05:21, Alexander Behm wrote:
>>
>> Thanks for following up. I think I understand your setup.
>>
>> If you want to not think about scan ranges, then you can modify
>> HdfsScanNode.computeScanRangeLocations(). For example, you could change
>> it to produce one scan range per file or per HDFS block. That way you'd
>> know exactly what a scan range corresponds to.
>>
>> I think the easiest/fastest way for you to make progress is to
>> re-implement the existing scan range assignment logic in that place in the
>> code I had pointed you to. There is no quick fix to change the existing
>> behavior.
>> The existing scan range assignment is scan-node centric. For each scan
>> node, we independently decide which of its scan ranges should be processed
>> by which host.

Re: Local join instead of data exchange - co-located blocks

2018-04-05 Thread Alexander Behm
On Wed, Mar 28, 2018 at 12:04 PM, Philipp Krause  wrote:

> Thank you for your answer and sorry for my delay!
>
> If my understanding is correct, the list of scan nodes consists of all
> nodes which contain a *local* block from a table that is needed for the
> query (Assumption: I have no replicas in my first tests). If TableA-Block0
> is on Node_0, isn't Node_0 automatically a scan node? And wouldn't this
> scan node always be the host for the complete scan range(s) then?
>

Yes, that is correct assuming you are using HDFS. The "single replica"
assumption was not clear to me. If that's the case then your current setup
that only changes the planner (and not the scheduler) should work. Is that
not the case?


>
> "For each scan node, we independently decide which of its scan ranges
> should be processed by which host."
>
> https://github.com/cloudera/Impala/blob/cdh5-trunk/be/src/scheduling/scheduler.cc#L532
> // Loop over all scan ranges, select an executor for those with local
> impalads and
> // collect all others for later processing.
>
> So in this whole block, scan ranges are assigned to the closest executor
> (=host?). But isn't the closest executor always the node the block is
> located on (assuming impalad is installed and I have no replicas)? And isn't
> this node always a scan node at the same time? Otherwise a thread on a
> remote host would have to read the corresponding scan range, which would be more
> expensive. The only exception I can think of is when all threads on the
> local node are busy. Or, if I use replicas and all other threads of my node
> with the "original" block are busy, a thread on another node which contains
> a replica could read a special scan range of its local block. Is my
> understanding correct here?
>

Yes, the closest host is always the one that has a local replica (you have
a single replica). By default, Impala is stupid and will try to assign
local only. Scheduling does not consider host load.

>
> Aren't all scan ranges read locally by their scan nodes if I have impalad
> installed on all nodes? And am I right that the scan range is only based
> on its length which refers to maxScanRangeLength in
> computeScanRangeLocations?
> https://github.com/cloudera/Impala/blob/cdh5-trunk/fe/src/main/java/org/apache/impala/planner/HdfsScanNode.java#L721
>

Yes to all questions.

>
>
> I hope you can help me with the scan node <-> scan range->host
> relationship. If I have Table_A-Block_0 and Table_B_Block_0 on the same
> node (which I want to join locally), I don't get the point of why scan
> ranges could be assigned to another host in my scenario.
>

See first response.

>
> Best regards and thank you very much!
> Philipp Krause
>
> On 21.03.2018 at 05:21, Alexander Behm wrote:
>
> Thanks for following up. I think I understand your setup.
>
> If you want to not think about scan ranges, then you can modify
> HdfsScanNode.computeScanRangeLocations(). For example, you could change
> it to produce one scan range per file or per HDFS block. That way you'd
> know exactly what a scan range corresponds to.
>
> I think the easiest/fastest way for you to make progress is to
> re-implement the existing scan range assignment logic in that place in the
> code I had pointed you to. There is no quick fix to change the existing
> behavior.
> The existing scan range assignment is scan-node centric. For each scan
> node, we independently decide which of its scan ranges should be processed
> by which host.
>
> I believe an algorithm to achieve your goal would look completely
> different. You want it to be host-centric. For each host, collect the local
> scan ranges of *all* scan nodes, and assign them to that host.
>
> Does that make sense?
>
> Alex
>
>
> On Mon, Mar 19, 2018 at 1:02 PM, Philipp Krause <
> philippkrause.m...@googlemail.com> wrote:
>
>> I'd like to provide a small example for our purpose. The last post may be
>> a bit confusing, so here's a very simple example in the attached pdf file.
> I hope it's understandable. Otherwise, please give me a short feedback.
>>
> Basically, I only want each data node to join all its local blocks. Is
> there a range mapping needed, or is it possible to easily join all local
> blocks (regardless of their content) since everything is already "prepared"?
>> Maybe you can clarify this for me.
>>
>> As you can see in the example, the tables are not partitioned by ID. The
> files are manually prepared with the help of the modulo function. So I don't
>> have a range like [0,10], but something like 0,5,10,15 etc.
>>
> I hope I didn't make it too complicated and confusing. I think the
> actual idea behind this is really simple and I hope you can help me to get
>> this working.
>>
>> Best regards and thank you very much for your time!
>> Philipp
>>
>> On 18.03.2018 at 17:32, Philipp Krause wrote:
>>
>> Hi! At the moment the data to parquet (block) mapping is based on a
>> simple modulo function: Id % #data_nodes. 

Re: Local join instead of data exchange - co-located blocks

2018-03-28 Thread Philipp Krause

Thank you for your answer and sorry for my delay!

If my understanding is correct, the list of scan nodes consists of all 
nodes which contain a *local* block from a table that is needed for the 
query (Assumption: I have no replicas in my first tests). If 
TableA-Block0 is on Node_0, isn't Node_0 automatically a scan node? And 
wouldn't this scan node always be the host for the complete scan 
range(s) then?


"For each scan node, we independently decide which of its scan ranges 
should be processed by which host."


https://github.com/cloudera/Impala/blob/cdh5-trunk/be/src/scheduling/scheduler.cc#L532 

// Loop over all scan ranges, select an executor for those with local 
impalads and

// collect all others for later processing.

So in this whole block, scan ranges are assigned to the closest executor 
(=host?). But isn't the closest executor always the node the block is 
located on (assuming impalad is installed and I have no replicas)? And
isn't this node always a scan node at the same time? Otherwise a thread 
on a remote host would have to read the corresponding scan range, which would
be more expensive. The only exception I can think of is when all threads 
on the local node are busy. Or, if I use replicas and all other threads 
of my node with the "original" block are busy, a thread on another node 
which contains a replica could read a special scan range of its local 
block. Is my understanding correct here?


Aren't all scan ranges read locally by their scan nodes if I have impalad
installed on all nodes? And am I right that the scan range is only
based on its length which refers to maxScanRangeLength in 
computeScanRangeLocations?

https://github.com/cloudera/Impala/blob/cdh5-trunk/fe/src/main/java/org/apache/impala/planner/HdfsScanNode.java#L721

I hope you can help me with the scan node <-> scan range->host 
relationship. If I have Table_A-Block_0 and Table_B_Block_0 on the same 
node (which I want to join locally), I don't get the point of why scan 
ranges could be assigned to another host in my scenario.


Best regards and thank you very much!
Philipp Krause


On 21.03.2018 at 05:21, Alexander Behm wrote:

Thanks for following up. I think I understand your setup.

If you want to not think about scan ranges, then you can modify 
HdfsScanNode.computeScanRangeLocations(). For example, you could 
change it to produce one scan range per file or per HDFS block. That 
way you'd know exactly what a scan range corresponds to.


I think the easiest/fastest way for you to make progress is to 
re-implement the existing scan range assignment logic in that place in 
the code I had pointed you to. There is no quick fix to change the 
existing behavior.
The existing scan range assignment is scan-node centric. For each scan 
node, we independently decide which of its scan ranges should be 
processed by which host.


I believe an algorithm to achieve your goal would look completely 
different. You want it to be host-centric. For each host, collect the 
local scan ranges of *all* scan nodes, and assign them to that host.


Does that make sense?

Alex


On Mon, Mar 19, 2018 at 1:02 PM, Philipp Krause <philippkrause.m...@googlemail.com> wrote:


I'd like to provide a small example for our purpose. The last post
may be a bit confusing, so here's a very simple example in the
attached pdf file. I hope it's understandable. Otherwise, please
give me a short feedback.

Basically, I only want each data node to join all its local
blocks. Is there a range mapping needed, or is it possible to
easily join all local blocks (regardless of their content) since
everything is already "prepared"? Maybe you can clarify this for me.

As you can see in the example, the tables are not partitioned by
ID. The files are manually prepared with the help of the modulo
function. So I don't have a range like [0,10], but something like
0,5,10,15 etc.

I hope I didn't make it too complicated and confusing. I think
the actual idea behind this is really simple and I hope you can
help me to get this working.

Best regards and thank you very much for your time!
Philipp


On 18.03.2018 at 17:32, Philipp Krause wrote:


Hi! At the moment the data to parquet (block) mapping is based on
a simple modulo function: Id % #data_nodes. So with 5 data nodes
all rows with Id's 0,5,10,... are written to Parquet_0, Id's
1,6,11,... are written to Parquet_1 etc. That's what I did manually.
Since the parquet file size and the block size are both set to
64MB, each parquet file will result in one block when I transfer
the parquet files to HDFS. By default, HDFS distributes the
blocks randomly. For test purposes I transferred corresponding
blocks from Table_A and Table_B to the same data node (Table_A -
Block_X with Id's 0,5,10 and Table_B - Block_Y with Id's 0,5,10).
In this case, they are transferred to 

Re: Local join instead of data exchange - co-located blocks

2018-03-20 Thread Alexander Behm
Thanks for following up. I think I understand your setup.

If you want to not think about scan ranges, then you can modify
HdfsScanNode.computeScanRangeLocations(). For example, you could change it
to produce one scan range per file or per HDFS block. That way you'd know
exactly what a scan range corresponds to.

I think the easiest/fastest way for you to make progress is to re-implement
the existing scan range assignment logic in that place in the code I had
pointed you to. There is no quick fix to change the existing behavior.
The existing scan range assignment is scan-node centric. For each scan
node, we independently decide which of its scan ranges should be processed
by which host.

I believe an algorithm to achieve your goal would look completely
different. You want it to be host-centric. For each host, collect the local
scan ranges of *all* scan nodes, and assign them to that host.
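
In rough C++ (hypothetical types, not the scheduler's real ones), that
inversion would look something like this:

#include <map>
#include <string>
#include <vector>

using TPlanNodeId = int;
struct ScanRange {
  std::string file_name;
  std::string replica_host;  // single-replica case: the one local host
};
// scan node id -> that node's scan ranges
using PerNodeRanges = std::map<TPlanNodeId, std::vector<ScanRange>>;
// host -> (scan node id -> ranges assigned to that host)
using HostAssignment = std::map<std::string, PerNodeRanges>;

// Host-centric assignment: for each host, collect the local scan ranges of
// *all* scan nodes and assign them to that host.
HostAssignment AssignHostCentric(const PerNodeRanges& input) {
  HostAssignment assignment;
  for (const auto& node_entry : input) {
    for (const ScanRange& range : node_entry.second) {
      assignment[range.replica_host][node_entry.first].push_back(range);
    }
  }
  return assignment;
}

int main() {
  PerNodeRanges input;
  input[0] = {{"A_block0.parq", "node2"}};  // probe-side scan
  input[1] = {{"B_block0.parq", "node2"}};  // build-side scan, same host
  // node2 ends up with ranges for both scan nodes, so the join can be local.
  return AssignHostCentric(input).size() == 1 ? 0 : 1;
}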

Does that make sense?

Alex


On Mon, Mar 19, 2018 at 1:02 PM, Philipp Krause <
philippkrause.m...@googlemail.com> wrote:

> I'd like to provide a small example for our purpose. The last post may be
> a bit confusing, so here's a very simple example in the attached pdf file.
> I hope it's understandable. Otherwise, please give me a short feedback.
>
> Basically, I only want each data node to join all its local blocks. Is
> there a range mapping needed, or is it possible to easily join all local
> blocks (regardless of their content) since everything is already "prepared"?
> Maybe you can clarify this for me.
>
> As you can see in the example, the tables are not partitioned by ID. The
> files are manually prepared with the help of the modulo function. So I don't
> have a range like [0,10], but something like 0,5,10,15 etc.
>
> I hope I didn't make it too complicated and confusing. I think the
> actual idea behind this is really simple and I hope you can help me to get
> this working.
>
> Best regards and thank you very much for your time!
> Philipp
>
> On 18.03.2018 at 17:32, Philipp Krause wrote:
>
> Hi! At the moment the data to parquet (block) mapping is based on a simple
> modulo function: Id % #data_nodes. So with 5 data nodes all rows with Id's
> 0,5,10,... are written to Parquet_0, Id's 1,6,11,... are written to Parquet_1
> etc. That's what I did manually. Since the parquet file size and the block
> size are both set to 64MB, each parquet file will result in one block when
> I transfer the parquet files to HDFS. By default, HDFS distributes the
> blocks randomly. For test purposes I transferred corresponding blocks from
> Table_A and Table_B to the same data node (Table_A - Block_X with Id's
> 0,5,10 and Table_B - Block_Y with Id's 0,5,10). In this case, they are
> transferred to data_node_0 because the modulo function (which I want to
> implement in the scheduler) returns 0 for these Id's. This is also done
> manually at the moment.
>
> 1.) DistributedPlanner: For first, upcoming tests I simply changed the
> first condition in the DistributedPlanner to true to avoid exchange nodes.
>
> 2.) The scheduler: That's the part I'm currently struggling with. For
> first tests, block replication is deactivated. I'm not sure how / where to
> implement the modulo function for scan range to host mapping. Without the
> modulo function, I had to implement a hard coded mapping (something like
> "range" 0-0, 5-5, 10-10 -> Data_node_0 etc.). Is that correct? Instead I
> would like to use a slightly more flexible solution with the help of this
> modulo function for the host mapping.
>
> I would be really grateful if you could give me a hint for the scheduling
> implementation. I'll try to dig deeper into the code in the meantime.
>
> Best regards and thank you in advance
> Philipp
>
>
> Am 14.03.2018 um 08:06 schrieb Philipp Krause:
>
> Thank you very much for this information! I'll try to implement these two
> steps and post some updates within the next few days!
>
> Best regards
> Philipp
>
> 2018-03-13 5:38 GMT+01:00 Alexander Behm :
>
>> Cool that you're working on a research project with Impala!
>>
>> Properly adding such a feature to Impala is a substantial effort, but
>> hacking the code for an experiment or two seems doable.
>>
>> I think you will need to modify two things: (1) the planner to not add
>> exchange nodes, and (2) the scheduler to assign the co-located scan ranges
>> to the same host.
>>
>> Here are a few starting points in the code:
>>
>> 1) DistributedPlanner
>> https://github.com/apache/impala/blob/master/fe/src/main/
>> java/org/apache/impala/planner/DistributedPlanner.java#L318
>>
>> The first condition handles the case where no exchange nodes need to be
>> added because the join inputs are already suitably partitioned.
>> You could hack the code to always go into that codepath, so no exchanges
>> are added.
>>
>> 2) The scheduler
>> https://github.com/apache/impala/blob/master/be/src/scheduli
>> ng/scheduler.cc#L226
>>
>> You'll need to dig through and understand that code so that you can make
>> the necessary changes. Change the scan range to host mapping to your
>> liking. The rest of the code should just work.

Re: Local join instead of data exchange - co-located blocks

2018-03-19 Thread Philipp Krause
I'd like to provide a small example for our purpose. The last post may 
be a bit confusing, so here's a very simple example in the attached pdf 
file. I hope it's understandable; otherwise, please give me short
feedback.


Basically, I only want each data node to join all its local blocks. Is
there a range mapping needed, or is it possible to easily join all local
blocks (regardless of their content) since everything is already
"prepared"? Maybe you can clarify this for me.


As you can see in the example, the tables are not partitioned by ID. The 
files are manually prepared with the help of the modulo function, so I
don't have a range like [0,10], but something like 0,5,10,15 etc.


I hope I didn't make it too complicated and confusing. I think the
actual idea behind this is really simple, and I hope you can help me to
get this working.


Best regards and thank you very much for your time!
Philipp


Am 18.03.2018 um 17:32 schrieb Philipp Krause:


Hi! At the moment the data to parquet (block) mapping is based on a 
simple modulo function: Id % #data_nodes. So with 5 data nodes all 
rows with Id's 0,5,10,... are written to Parquet_0, Id's 1,6,11,... are
written to Parquet_1, etc. That's what I did manually. Since the
parquet file size and the block size are both set to 64MB, each 
parquet file will result in one block when I transfer the parquet 
files to HDFS. By default, HDFS distributes the blocks randomly. For 
test purposes I transferred corresponding blocks from Table_A and 
Table_B to the same data node (Table_A - Block_X with Id's 0,5,10 and 
Table_B - Block_Y with Id's 0,5,10). In this case, they are 
transferred to data_node_0 because the modulo function (which I want 
to implement in the scheduler) returns 0 for these Id's. This is also 
done manually at the moment.


1.) DistributedPlanner: For the first upcoming tests, I simply changed the
first condition in the DistributedPlanner to true to avoid exchange nodes.


2.) The scheduler: That's the part I'm currently struggling with. For the
first tests, block replication is deactivated. I'm not sure how/where
to implement the modulo function for the scan-range-to-host mapping.
Without the modulo function, I'd have to implement a hard-coded mapping
(something like "range" 0-0, 5-5, 10-10 -> Data_node_0 etc.). Is that
correct? Instead, I would like to use a slightly more flexible solution
with the help of this modulo function for the host mapping.


I would be really grateful if you could give me a hint for the 
scheduling implementation. I'll try to dig deeper into the code in the meantime.


Best regards and thank you in advance
Philipp


Am 14.03.2018 um 08:06 schrieb Philipp Krause:
Thank you very much for this information! I'll try to implement
these two steps and post some updates within the next few days!


Best regards
Philipp

2018-03-13 5:38 GMT+01:00 Alexander Behm:


Cool that you're working on a research project with Impala!

Properly adding such a feature to Impala is a substantial effort,
but hacking the code for an experiment or two seems doable.

I think you will need to modify two things: (1) the planner to
not add exchange nodes, and (2) the scheduler to assign the
co-located scan ranges to the same host.

Here are a few starting points in the code:

1) DistributedPlanner

https://github.com/apache/impala/blob/master/fe/src/main/java/org/apache/impala/planner/DistributedPlanner.java#L318



The first condition handles the case where no exchange nodes need
to be added because the join inputs are already suitably partitioned.
You could hack the code to always go into that codepath, so no
exchanges are added.

2) The scheduler

https://github.com/apache/impala/blob/master/be/src/scheduling/scheduler.cc#L226



You'll need to dig through and understand that code so that you
can make the necessary changes. Change the scan range to host
mapping to your liking. The rest of the code should just work.

Cheers,

Alex


On Mon, Mar 12, 2018 at 6:55 PM, Philipp Krause wrote:

Thank you very much for your quick answers!
The intention behind this is to improve the execution time
and (primarily) to examine the impact of block-co-location
(research project) for this particular query (simplified):

select A.x, B.y, A.z from tableA as A inner join tableB as B
on A.id=B.id

The "real" query includes three joins and the data size is in
pb-range. Therefore several nodes (5 in the test environment
with less data) are used (without any load balancer).

Could you give me some hints on what code changes are required
and which files are affected?

Re: Local join instead of data exchange - co-located blocks

2018-03-18 Thread Philipp Krause
Hi! At the moment the data to parquet (block) mapping is based on a 
simple modulo function: Id % #data_nodes. So with 5 data nodes all rows 
with Id's 0,5,10,... are written to Parquet_0, Id's 1,6,11,... are written to
Parquet_1, etc. That's what I did manually. Since the parquet file size
and the block size are both set to 64MB, each parquet file will result 
in one block when I transfer the parquet files to HDFS. By default, HDFS 
distributes the blocks randomly. For test purposes I transferred 
corresponding blocks from Table_A and Table_B to the same data node 
(Table_A - Block_X with Id's 0,5,10 and Table_B - Block_Y with Id's 
0,5,10). In this case, they are transferred to data_node_0 because the 
modulo function (which I want to implement in the scheduler) returns 0 
for these Id's. This is also done manually at the moment.
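
To make the layout concrete, a standalone sketch of the bucketing
(illustration only, not Impala code; the actual parquet writing is omitted):

import java.util.ArrayList;
import java.util.List;

class ModuloLayout {
  // Rows are bucketed into parquet files (and therefore HDFS blocks, since
  // file size == block size == 64MB) by id % numDataNodes.
  static int targetFileIndex(long id, int numDataNodes) {
    return (int) (id % numDataNodes);  // Parquet_0, Parquet_1, ...
  }

  public static void main(String[] args) {
    int numDataNodes = 5;
    List<List<Long>> buckets = new ArrayList<>();
    for (int i = 0; i < numDataNodes; i++) buckets.add(new ArrayList<>());
    for (long id = 0; id < 20; id++) {
      buckets.get(targetFileIndex(id, numDataNodes)).add(id);
    }
    // Parquet_0 gets 0,5,10,15; Parquet_1 gets 1,6,11,16; etc.
    for (int i = 0; i < numDataNodes; i++) {
      System.out.println("Parquet_" + i + ": " + buckets.get(i));
    }
  }
}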


1.) DistributedPlanner: For the first upcoming tests, I simply changed the
first condition in the DistributedPlanner to true to avoid exchange nodes.


2.) The scheduler: That's the part I'm currently struggling with. For the
first tests, block replication is deactivated. I'm not sure how/where
to implement the modulo function for the scan-range-to-host mapping. Without
the modulo function, I'd have to implement a hard-coded mapping (something
like "range" 0-0, 5-5, 10-10 -> Data_node_0 etc.). Is that correct?
Instead, I would like to use a slightly more flexible solution with the
help of this modulo function for the host mapping.
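
A sketch of how such a modulo host mapping could look (a hypothetical
illustration: how the bucket of a block is obtained, e.g. from the file
name, is left open, and the real change would go into the C++ scan range
assignment in scheduler.cc; Java is used here only for illustration):

import java.util.List;

class ModuloHostMapping {
  // Map a scan range to a host via its block's modulo bucket. The scheduler
  // never reads rows, so the bucket must be derivable from the file/block
  // itself (e.g. encoded in the file name). Hypothetical sketch.
  static String hostForRange(long bucketOfBlock, List<String> hosts) {
    return hosts.get((int) (bucketOfBlock % hosts.size()));
  }

  public static void main(String[] args) {
    List<String> hosts = List.of("node0", "node1", "node2", "node3", "node4");
    // A block holding Id's 0,5,10 has bucket 0 and lands on node0,
    // matching the manual data layout.
    System.out.println(hostForRange(0, hosts));  // node0
    System.out.println(hostForRange(1, hosts));  // node1
  }
}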


I would be really grateful if you could give me a hint for the 
scheduling implementation. I'll try to dig deeper into the code in the meantime.


Best regards and thank you in advance
Philipp


Am 14.03.2018 um 08:06 schrieb Philipp Krause:
Thank you very much for this information! I'll try to implement these
two steps and post some updates within the next few days!


Best regards
Philipp

2018-03-13 5:38 GMT+01:00 Alexander Behm:


Cool that you're working on a research project with Impala!

Properly adding such a feature to Impala is a substantial effort,
but hacking the code for an experiment or two seems doable.

I think you will need to modify two things: (1) the planner to not
add exchange nodes, and (2) the scheduler to assign the co-located
scan ranges to the same host.

Here are a few starting points in the code:

1) DistributedPlanner

https://github.com/apache/impala/blob/master/fe/src/main/java/org/apache/impala/planner/DistributedPlanner.java#L318



The first condition handles the case where no exchange nodes need
to be added because the join inputs are already suitably partitioned.
You could hack the code to always go into that codepath, so no
exchanges are added.

2) The scheduler

https://github.com/apache/impala/blob/master/be/src/scheduling/scheduler.cc#L226



You'll need to dig through and understand that code so that you
can make the necessary changes. Change the scan range to host
mapping to your liking. The rest of the code should just work.

Cheers,

Alex


On Mon, Mar 12, 2018 at 6:55 PM, Philipp Krause wrote:

Thank you very much for your quick answers!
The intention behind this is to improve the execution time and
(primarily) to examine the impact of block-co-location
(research project) for this particular query (simplified):

select A.x, B.y, A.z from tableA as A inner join tableB as B
on A.id=B.id

The "real" query includes three joins and the data size is in
pb-range. Therefore several nodes (5 in the test environment
with less data) are used (without any load balancer).

Could you give me some hints on what code changes are required
and which files are affected? I don't know how to give Impala
the information that it should only join the local data blocks
on each node and then pass the results to the "final" node, which
receives all intermediate results. I hope you can help me to
get this working. That would be awesome!

Best regards
Philipp

Am 12.03.2018 um 18:38 schrieb Alexander Behm:

I suppose one exception is if your data lives only on a
single node. Then you can set num_nodes=1 and make sure to
send the query request to the impalad running on the same
data node as the target data. Then you should get a local join.

On Mon, Mar 12, 2018 at 9:30 AM, Alexander Behm wrote:

Such a specific block arrangement is very uncommon for
typical Impala setups, so we don't attempt to recognize and
optimize this narrow case.

Re: Local join instead of data exchange - co-located blocks

2018-03-14 Thread Philipp Krause
Thank you very much for this information! I'll try to implement these two
steps and post some updates within the next few days!

Best regards
Philipp

2018-03-13 5:38 GMT+01:00 Alexander Behm:

> Cool that you're working on a research project with Impala!
>
> Properly adding such a feature to Impala is a substantial effort, but
> hacking the code for an experiment or two seems doable.
>
> I think you will need to modify two things: (1) the planner to not add
> exchange nodes, and (2) the scheduler to assign the co-located scan ranges
> to the same host.
>
> Here are a few starting points in the code:
>
> 1) DistributedPlanner
> https://github.com/apache/impala/blob/master/fe/src/
> main/java/org/apache/impala/planner/DistributedPlanner.java#L318
>
> The first condition handles the case where no exchange nodes need to be
> added because the join inputs are already suitably partitioned.
> You could hack the code to always go into that codepath, so no exchanges
> are added.
>
> 2) The scheduler
> https://github.com/apache/impala/blob/master/be/src/
> scheduling/scheduler.cc#L226
>
> You'll need to dig through and understand that code so that you can make
> the necessary changes. Change the scan range to host mapping to your
> liking. The rest of the code should just work.
>
> Cheers,
>
> Alex
>
>
> On Mon, Mar 12, 2018 at 6:55 PM, Philipp Krause <philippkrause.m...@googlemail.com> wrote:
>
>> Thank you very much for your quick answers!
>> The intention behind this is to improve the execution time and
>> (primarily) to examine the impact of block-co-location (research project)
>> for this particular query (simplified):
>>
>> select A.x, B.y, A.z from tableA as A inner join tableB as B on A.id=B.id
>>
>> The "real" query includes three joins and the data size is in pb-range.
>> Therefore several nodes (5 in the test environment with less data) are used
>> (without any load balancer).
>>
>> Could you give me some hints on what code changes are required and which
>> files are affected? I don't know how to give Impala the information that it
>> should only join the local data blocks on each node and then pass the results
>> to the "final" node, which receives all intermediate results. I hope you can help
>> me to get this working. That would be awesome!
>> Best regards
>> Philipp
>>
>> Am 12.03.2018 um 18:38 schrieb Alexander Behm:
>>
>> I suppose one exception is if your data lives only on a single node. Then
>> you can set num_nodes=1 and make sure to send the query request to the
>> impalad running on the same data node as the target data. Then you should
>> get a local join.
>>
>> On Mon, Mar 12, 2018 at 9:30 AM, Alexander Behm 
>> wrote:
>>
>>> Such a specific block arrangement is very uncommon for typical Impala
>>> setups, so we don't attempt to recognize and optimize this narrow case. In
>>> particular, such an arrangement tends to be short lived if you have the
>>> HDFS balancer turned on.
>>>
>>> Without making code changes, there is no way today to remove the data
>>> exchanges and make sure that the scheduler assigns scan splits to nodes in
>>> the desired way (co-located, but with possible load imbalance).
>>>
>>> In what way is the current setup unacceptable to you? Is this premature
>>> optimization? If you have certain performance expectations/requirements for
>>> specific queries, we might be able to help you improve those. If you want to
>>> pursue this route, please help us by posting complete query profiles.
>>>
>>> Alex
>>>
>>> On Mon, Mar 12, 2018 at 6:29 AM, Philipp Krause <
>>> philippkrause.m...@googlemail.com> wrote:
>>>
 Hello everyone!

 In order to prevent network traffic, I'd like to perform local joins on
 each node instead of exchanging the data and performing a join over the
 complete data afterwards. My query is basically a join over three
 tables on an ID attribute. The blocks are perfectly distributed, so that
 e.g. Table A - Block 0  and Table B - Block 0  are on the same node. These
 blocks contain all data rows with an ID range [0,1]. Table A - Block 1  and
 Table B - Block 1 with an ID range [2,3] are on another node etc. So I want
 to perform a local join per node because any data exchange would be
 unnecessary (except for the last step when the final node receives all
 results of the other nodes). Is this possible?
 At the moment the query plan includes multiple data exchanges, although
 the blocks are already perfectly distributed (manually).
 I would be grateful for any help!

 Best regards
 Philipp Krause


>>>
>>
>>
>


Re: Local join instead of data exchange - co-located blocks

2018-03-12 Thread Alexander Behm
Cool that you're working on a research project with Impala!

Properly adding such a feature to Impala is a substantial effort, but
hacking the code for an experiment or two seems doable.

I think you will need to modify two things: (1) the planner to not add
exchange nodes, and (2) the scheduler to assign the co-located scan ranges
to the same host.

Here are a few starting points in the code:

1) DistributedPlanner
https://github.com/apache/impala/blob/master/fe/src/main/java/org/apache/impala/planner/DistributedPlanner.java#L318

The first condition handles the case where no exchange nodes need to be
added because the join inputs are already suitably partitioned.
You could hack the code to always go into that codepath, so no exchanges
are added.
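
As a toy paraphrase of that hack (hypothetical Java, not the actual
DistributedPlanner code):

class JoinExchangeDecision {
  // Paraphrase of the branch around DistributedPlanner.java#L318: if the
  // planner believes the join inputs are already co-partitioned, it emits a
  // partitioned join without exchange nodes. Hardcoding the condition to
  // true forces that codepath.
  static String planJoin(boolean inputsCoPartitioned) {
    if (inputsCoPartitioned) {       // the "hack": always take this branch
      return "HASH JOIN over local scans (no exchanges added)";
    }
    return "EXCHANGE(hash(id)) below both inputs -> HASH JOIN";
  }

  public static void main(String[] args) {
    System.out.println(planJoin(true));
  }
}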

2) The scheduler
https://github.com/apache/impala/blob/master/be/src/scheduling/scheduler.cc#L226

You'll need to dig through and understand that code so that you can make
the necessary changes. Change the scan range to host mapping to your
liking. The rest of the code should just work.

Cheers,

Alex


On Mon, Mar 12, 2018 at 6:55 PM, Philipp Krause <
philippkrause.m...@googlemail.com> wrote:

> Thank you very much for your quick answers!
> The intention behind this is to improve the execution time and (primarily)
> to examine the impact of block-co-location (research project) for this
> particular query (simplified):
>
> select A.x, B.y, A.z from tableA as A inner join tableB as B on A.id=B.id
>
> The "real" query includes three joins and the data size is in pb-range.
> Therefore several nodes (5 in the test environment with less data) are used
> (without any load balancer).
>
> Could you give me some hints on what code changes are required and which
> files are affected? I don't know how to give Impala the information that it
> should only join the local data blocks on each node and then pass the results
> to the "final" node, which receives all intermediate results. I hope you can help
> me to get this working. That would be awesome!
> Best regards
> Philipp
>
> Am 12.03.2018 um 18:38 schrieb Alexander Behm:
>
> I suppose one exception is if your data lives only on a single node. Then
> you can set num_nodes=1 and make sure to send the query request to the
> impalad running on the same data node as the target data. Then you should
> get a local join.
>
> On Mon, Mar 12, 2018 at 9:30 AM, Alexander Behm wrote:
>
>> Such a specific block arrangement is very uncommon for typical Impala
>> setups, so we don't attempt to recognize and optimize this narrow case. In
>> particular, such an arrangement tends to be short lived if you have the
>> HDFS balancer turned on.
>>
>> Without making code changes, there is no way today to remove the data
>> exchanges and make sure that the scheduler assigns scan splits to nodes in
>> the desired way (co-located, but with possible load imbalance).
>>
>> In what way is the current setup unacceptable to you? Is this premature
>> optimization? If you have certain performance expectations/requirements for
>> specific queries, we might be able to help you improve those. If you want to
>> pursue this route, please help us by posting complete query profiles.
>>
>> Alex
>>
>> On Mon, Mar 12, 2018 at 6:29 AM, Philipp Krause <
>> philippkrause.m...@googlemail.com> wrote:
>>
>>> Hello everyone!
>>>
>>> In order to prevent network traffic, I'd like to perform local joins on
>>> each node instead of exchanging the data and performing a join over the
>>> complete data afterwards. My query is basically a join over three
>>> tables on an ID attribute. The blocks are perfectly distributed, so that
>>> e.g. Table A - Block 0  and Table B - Block 0  are on the same node. These
>>> blocks contain all data rows with an ID range [0,1]. Table A - Block 1  and
>>> Table B - Block 1 with an ID range [2,3] are on another node etc. So I want
>>> to perform a local join per node because any data exchange would be
>>> unnecessary (except for the last step when the final node receives all
>>> results of the other nodes). Is this possible?
>>> At the moment the query plan includes multiple data exchanges, although
>>> the blocks are already perfectly distributed (manually).
>>> I would be grateful for any help!
>>>
>>> Best regards
>>> Philipp Krause
>>>
>>>
>>
>
>


Re: Local join instead of data exchange - co-located blocks

2018-03-12 Thread Philipp Krause

Thank you very much for your quick answers!
The intention behind this is to improve the execution time and 
(primarily) to examine the impact of block-co-location (research 
project) for this particular query (simplified):


select A.x, B.y, A.z from tableA as A inner join tableB as B on A.id=B.id

The "real" query includes three joins and the data size is in pb-range. 
Therefore several nodes (5 in the test environment with less data) are 
used (without any load balancer).


Could you give me some hints on what code changes are required and which
files are affected? I don't know how to give Impala the information that 
it should only join the local data blocks on each node and then pass the
results to the "final" node, which receives all intermediate results. I hope you
can help me to get this working. That would be awesome!


Best regards
Philipp

Am 12.03.2018 um 18:38 schrieb Alexander Behm:
I suppose one exception is if your data lives only on a single node. 
Then you can set num_nodes=1 and make sure to send the query request 
to the impalad running on the same data node as the target data. Then 
you should get a local join.


On Mon, Mar 12, 2018 at 9:30 AM, Alexander Behm wrote:


Such a specific block arrangement is very uncommon for typical
Impala setups, so we don't attempt to recognize and optimize this
narrow case. In particular, such an arrangement tends to be short
lived if you have the HDFS balancer turned on.

Without making code changes, there is no way today to remove the
data exchanges and make sure that the scheduler assigns scan
splits to nodes in the desired way (co-located, but with possible
load imbalance).

In what way is the current setup unacceptable to you? Is this
premature optimization? If you have certain performance
expectations/requirements for specific queries, we might be able to
help you improve those. If you want to pursue this route, please
help us by posting complete query profiles.

Alex

On Mon, Mar 12, 2018 at 6:29 AM, Philipp Krause wrote:

Hello everyone!

In order to prevent network traffic, I'd like to perform local
joins on each node instead of exchanging the data and performing
a join over the complete data afterwards. My query is
basically a join over three tables on an ID attribute.
The blocks are perfectly distributed, so that e.g. Table A -
Block 0  and Table B - Block 0  are on the same node. These
blocks contain all data rows with an ID range [0,1]. Table A -
Block 1  and Table B - Block 1 with an ID range [2,3] are on
another node etc. So I want to perform a local join per node
because any data exchange would be unnecessary (except for
the last step when the final node receives all results of
the other nodes). Is this possible?
At the moment the query plan includes multiple data exchanges,
although the blocks are already perfectly distributed (manually).
I would be grateful for any help!

Best regards
Philipp Krause







Re: Local join instead of data exchange - co-located blocks

2018-03-12 Thread Alexander Behm
Such a specific block arrangement is very uncommon for typical Impala
setups, so we don't attempt to recognize and optimize this narrow case. In
particular, such an arrangement tends to be short lived if you have the
HDFS balancer turned on.

Without making code changes, there is no way today to remove the data
exchanges and make sure that the scheduler assigns scan splits to nodes in
the desired way (co-located, but with possible load imbalance).

In what way is the current setup unacceptable to you? Is this premature
optimization? If you have certain performance expectations/requirements for
specific queries, we might be able to help you improve those. If you want to
pursue this route, please help us by posting complete query profiles.

Alex

On Mon, Mar 12, 2018 at 6:29 AM, Philipp Krause <
philippkrause.m...@googlemail.com> wrote:

> Hello everyone!
>
> In order to prevent network traffic, I'd like to perform local joins on
> each node instead of exchanging the data and performing a join over the
> complete data afterwards. My query is basically a join over three
> tables on an ID attribute. The blocks are perfectly distributed, so that
> e.g. Table A - Block 0  and Table B - Block 0  are on the same node. These
> blocks contain all data rows with an ID range [0,1]. Table A - Block 1  and
> Table B - Block 1 with an ID range [2,3] are on another node etc. So I want
> to perform a local join per node because any data exchange would be
> unnecessary (except for the last step when the final node receives all
> results of the other nodes). Is this possible?
> At the moment the query plan includes multiple data exchanges, although
> the blocks are already perfectly distributed (manually).
> I would be grateful for any help!
>
> Best regards
> Philipp Krause
>
>


Local join instead of data exchange - co-located blocks

2018-03-12 Thread Philipp Krause

Hello everyone!

In order to prevent network traffic, I'd like to perform local joins on each 
node instead of exchanging the data and performing a join over the complete data
afterwards. My query is basically a join over three tables on an ID
attribute. The blocks are perfectly distributed, so that e.g. Table A - Block 0 
 and Table B - Block 0  are on the same node. These blocks contain all data 
rows with an ID range [0,1]. Table A - Block 1  and Table B - Block 1 with an 
ID range [2,3] are on another node etc. So I want to perform a local join per 
node because any data exchange would be unnecessary (except for the last step
when the final node receives all results of the other nodes). Is this
possible?
At the moment the query plan includes multiple data exchanges, although the 
blocks are already perfectly distributed (manually).
I would be grateful for any help!

Best regards
Philipp Krause