Re: REFRESH partitions

2018-03-19 Thread Alexander Behm
Did you have a different option in mind that might suit your needs better?

These are your options for discovering metadata changes external to Impala:
refresh <table_name>
refresh <table_name> PARTITION (partition_spec)
invalidate metadata <table_name>
alter table <table_name> recover partitions
invalidate metadata (don't do this)

Those commands all do different things, so it really depends on your goals.

If you want new files/partitions to be incrementally discovered by Impala,
then use refresh.
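
If you want the Spark job itself to trigger that refresh right after it writes a
partition, a minimal JDBC sketch could look like the following (host, database, and
table names are placeholders; it assumes the Hive JDBC driver on the classpath and
Impala's default HiveServer2 port 21050; adjust for your environment):

    // Hedged sketch: issue an incremental REFRESH from the job that wrote the data.
    // Host, database, and table names below are placeholders.
    import java.sql.Connection;
    import java.sql.DriverManager;
    import java.sql.Statement;

    public class RefreshAfterWrite {
        public static void main(String[] args) throws Exception {
            String url = "jdbc:hive2://impalad-host:21050/default;auth=noSasl";
            try (Connection conn = DriverManager.getConnection(url);
                 Statement stmt = conn.createStatement()) {
                // Refresh only the partition that was just written, not the whole table.
                stmt.execute("REFRESH my_db.my_table PARTITION (year=2018, month=3, day=19)");
            }
        }
    }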



On Mon, Mar 19, 2018 at 12:49 PM, Fawze Abujaber  wrote:

> Thanks Tim and Juan,
>
> So there are no options other than running the refresh statement each hour
> or letting the Spark job run it after writing the parquet files.
>
> On Mon, Mar 19, 2018 at 9:34 PM, Tim Armstrong 
> wrote:
>
>> Don't use the -r option to impala-shell! That option was a mistake and
>> it's removed in Impala 3.0. The problem is that it does a global invalidate,
>> which is expensive because it requires reloading all metadata.
>>
>> On 19 Mar. 2018 10:35, "Juan"  wrote:
>>
>>> If the table is partitioned by year, month, and day, but not hour, running
>>> recover partitions is not a good idea.
>>> Recover partitions only loads metadata when it discovers a new partition;
>>> for existing partitions, even if there is new data, recover partitions will
>>> ignore them, so the table metadata could be out of date and queries will
>>> return wrong results.
>>>
>>> If the Spark job is not running very frequently, you can run refresh
>>> table to refresh a specific partition after job completion, or run it
>>> once per hour.
>>>
>>> REFRESH [db_name.]table_name [PARTITION (key_col1=val1 [, key_col2=val2...])]
>>>
>>>
>>> On Sat, Mar 17, 2018 at 1:10 AM, Fawze Abujaber 
>>> wrote:
>>>
 Hello Guys,

 I have parquet files that a Spark job generates. I'm defining an
 external table on these parquet files, which is partitioned by year, month
 and day. The Spark job feeds these tables each hour.

 I have a cron job that runs every hour and executes the command:

  alter table $(table_name) recover partitions

 I'm looking for other solutions offered by Impala, like a
 configuration option; for example, I'm wondering whether I need to educate
 the end users to use the -r option to refresh the table.


 Are there any other solutions for recovering partitions?









Re: Local join instead of data exchange - co-located blocks

2018-03-19 Thread Philipp Krause
I'd like to provide a small example for our purpose. The last post may
be a bit confusing, so here's a very simple example in the attached pdf
file. I hope it's understandable; otherwise, please give me short
feedback.


Basically, I only want each data node to join all of its local blocks. Is
a range mapping needed, or is it possible to simply join all local
blocks (regardless of their content) since everything is already
"prepared"? Maybe you can clarify this for me.


As you can see in the example, the tables are not partitioned by ID. The
files are manually prepared with the help of the modulo function, so I
don't have a range like [0,10], but something like 0,5,10,15 etc.


I hope I didn't make it too complicated or confusing. I think the
actual idea behind this is really simple, and I hope you can help me
get this working.


Best regards and thank you very much for your time!
Philipp


On 18.03.2018 at 17:32, Philipp Krause wrote:


Hi! At the moment the data-to-parquet (block) mapping is based on a
simple modulo function: Id % #data_nodes. So with 5 data nodes, all
rows with Ids 0,5,10,... are written to Parquet_0, Ids 1,6,11,... are
written to Parquet_1, etc. That's what I did manually. Since the
parquet file size and the block size are both set to 64MB, each
parquet file will result in one block when I transfer the parquet
files to HDFS. By default, HDFS distributes the blocks randomly. For
test purposes I transferred corresponding blocks from Table_A and
Table_B to the same data node (Table_A - Block_X with Ids 0,5,10 and
Table_B - Block_Y with Ids 0,5,10). In this case, they are
transferred to data_node_0 because the modulo function (which I want
to implement in the scheduler) returns 0 for these Ids. This is also
done manually at the moment.
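
To make that mapping rule concrete, here is a tiny standalone sketch of what I
do when preparing the files (just an illustration with made-up class and
variable names, not Impala code):

    // Illustration of the manual data-to-node mapping described above:
    // a row is written to the parquet file (and later placed on the data node)
    // given by id % numDataNodes.
    public class ModuloMapping {
        static int targetNode(long id, int numDataNodes) {
            return (int) (id % numDataNodes);
        }

        public static void main(String[] args) {
            int numDataNodes = 5;
            for (long id : new long[] {0, 1, 5, 6, 10, 11}) {
                // e.g. ids 0, 5, 10 -> Parquet_0 / data_node_0;
                //      ids 1, 6, 11 -> Parquet_1 / data_node_1
                System.out.println("id " + id + " -> Parquet_" + targetNode(id, numDataNodes));
            }
        }
    }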


1.) DistributedPlanner: For the first upcoming tests, I simply changed the
first condition in the DistributedPlanner to true to avoid exchange nodes.


2.) The scheduler: That's the part I'm currently struggling with. For the
first tests, block replication is deactivated. I'm not sure how / where
to implement the modulo function for the scan-range-to-host mapping.
Without the modulo function, I would have to implement a hard-coded mapping
(something like "range" 0-0, 5-5, 10-10 -> Data_node_0 etc.). Is that
correct? Instead, I would like to use a slightly more flexible solution
with the help of this modulo function for the host mapping.
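
Conceptually, the two alternatives look like this (a purely hypothetical
sketch; none of these names exist in Impala's scheduler, it only illustrates
the mapping rule I mean):

    // Hypothetical sketch of the scan-range-to-host mapping discussed above;
    // not Impala code, just the rule I would like the scheduler to apply.
    import java.util.List;
    import java.util.Map;

    class ScanRangeToHostSketch {
        // Hard-coded variant: explicit join-key -> host index table (0 -> 0, 5 -> 0, ...).
        static int hostFromTable(long joinKey, Map<Long, Integer> hardCodedMap) {
            return hardCodedMap.get(joinKey);
        }

        // Modulo variant: derive the host index from the join key directly, so that
        // blocks of Table_A and Table_B with the same keys land on the same executor.
        static int hostFromModulo(long joinKey, List<String> hosts) {
            return (int) (joinKey % hosts.size());
        }
    }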


I would be really grateful if you could give me a hint on the
scheduling implementation. I'll try to dig deeper into the code in the meantime.


Best regards and thank you in advance
Philipp


On 14.03.2018 at 08:06, Philipp Krause wrote:
Thank you very much for this information! I'll try to implement
these two steps and post some updates within the next days!


Best regards
Philipp

2018-03-13 5:38 GMT+01:00 Alexander Behm:


Cool that you're working on a research project with Impala!

Properly adding such a feature to Impala is a substantial effort,
but hacking the code for an experiment or two seems doable.

I think you will need to modify two things: (1) the planner to
not add exchange nodes, and (2) the scheduler to assign the
co-located scan ranges to the same host.

Here are a few starting points in the code:

1) DistributedPlanner

https://github.com/apache/impala/blob/master/fe/src/main/java/org/apache/impala/planner/DistributedPlanner.java#L318



The first condition handles the case where no exchange nodes need
to be added because the join inputs are already suitably partitioned.
You could hack the code to always go into that codepath, so no
exchanges are added.
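
In spirit, the hack amounts to something like this (purely illustrative; the
real check at the linked line uses different names and conditions):

    // Purely illustrative: short-circuit the planner decision so that the
    // "inputs are already suitably partitioned" path is always taken and no
    // ExchangeNode is added. Not the actual DistributedPlanner code.
    class PlannerHackSketch {
        static boolean needsExchange(boolean inputsAlreadyPartitionedOnJoinKeys) {
            boolean forceLocalJoin = true;  // the experimental hack
            return !(inputsAlreadyPartitionedOnJoinKeys || forceLocalJoin);
        }
    }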

2) The scheduler

https://github.com/apache/impala/blob/master/be/src/scheduling/scheduler.cc#L226



You'll need to dig through and understand that code so that you
can make the necessary changes. Change the scan range to host
mapping to your liking. The rest of the code should just work.

Cheers,

Alex


On Mon, Mar 12, 2018 at 6:55 PM, Philipp Krause wrote:

Thank you very much for your quick answers!
The intention behind this is to improve the execution time
and (primarily) to examine the impact of block co-location
(research project) for this particular query (simplified):

select A.x, B.y, A.z from tableA as A inner join tableB as B
on A.id=B.id

The "real" query includes three joins and the data size is in
pb-range. Therefore several nodes (5 in the test environment
with less data) are used (without any load balancer).

Could 

Re: REFRESH partitions

2018-03-19 Thread Fawze Abujaber
Thanks Tim and Juan,

So there are no options other than running the refresh statement each hour
or letting the Spark job run it after writing the parquet files.

On Mon, Mar 19, 2018 at 9:34 PM, Tim Armstrong 
wrote:

> Don't use the -r option to impala-shell! That option was a mistake and
> it's removed in Impala 3.0. The problem is that it does a global invalidate,
> which is expensive because it requires reloading all metadata.
>
> On 19 Mar. 2018 10:35, "Juan"  wrote:
>
>> If the table is partitioned by year, month, and day, but not hour, running
>> recover partitions is not a good idea.
>> Recover partitions only loads metadata when it discovers a new partition;
>> for existing partitions, even if there is new data, recover partitions will
>> ignore them, so the table metadata could be out of date and queries will
>> return wrong results.
>>
>> If the Spark job is not running very frequently, you can run refresh
>> table to refresh a specific partition after job completion, or run it
>> once per hour.
>>
>> REFRESH [db_name.]table_name [PARTITION (key_col1=val1 [, key_col2=val2...])]
>>
>>
>> On Sat, Mar 17, 2018 at 1:10 AM, Fawze Abujaber 
>> wrote:
>>
>>> Hello Guys,
>>>
>>> I have parquet files that a Spark job generates. I'm defining an
>>> external table on these parquet files, which is partitioned by year, month
>>> and day. The Spark job feeds these tables each hour.
>>>
>>> I have a cron job that runs every hour and executes the command:
>>>
>>>  alter table $(table_name) recover partitions
>>>
>>> I'm looking for other solutions offered by Impala, like a
>>> configuration option; for example, I'm wondering whether I need to educate
>>> the end users to use the -r option to refresh the table.
>>>
>>>
>>> Are there any other solutions for recovering partitions?


Re: REFRESH partitions

2018-03-19 Thread Tim Armstrong
Don't use the -r option to impala-shell! That option was a mistake and it's
removed in Impala 3.0. The problem is that it does a global invalidate,
which is expensive because it requires reloading all metadata.

On 19 Mar. 2018 10:35, "Juan"  wrote:

> If the table is partitioned by year, month, and day, but not hour, running
> recover partitions is not a good idea.
> Recover partitions only loads metadata when it discovers a new partition;
> for existing partitions, even if there is new data, recover partitions will
> ignore them, so the table metadata could be out of date and queries will
> return wrong results.
>
> If the Spark job is not running very frequently, you can run refresh table
> to refresh a specific partition after job completion, or run it once
> per hour.
>
> REFRESH [db_name.]table_name [PARTITION (key_col1=val1 [, key_col2=val2...])]
>
>
> On Sat, Mar 17, 2018 at 1:10 AM, Fawze Abujaber  wrote:
>
>> Hello Guys,
>>
>> I have parquet files that a Spark job generates. I'm defining an
>> external table on these parquet files, which is partitioned by year, month
>> and day. The Spark job feeds these tables each hour.
>>
>> I have a cron job that running  each one hour and run the command:
>>
>>  alter table $(table_name) recover partitions
>>
>> I'm looking for other solutions offered by Impala, like a
>> configuration option; for example, I'm wondering whether I need to educate
>> the end users to use the -r option to refresh the table.
>>
>>
>> Are there any other solutions for recovering partitions?


Re: REFRESH partitions

2018-03-19 Thread Juan
If the table is partitioned by year, month, and day, but not hour, running
recover partitions is not a good idea.
Recover partitions only loads metadata when it discovers a new partition;
for existing partitions, even if there is new data, recover partitions will
ignore them, so the table metadata could be out of date and queries will
return wrong results.

If the Spark job is not running very frequently, you can run refresh table
to refresh a specific partition after job completion, or run it once
per hour.

REFRESH [db_name.]table_name [PARTITION (key_col1=val1 [, key_col2=val2...])]


On Sat, Mar 17, 2018 at 1:10 AM, Fawze Abujaber  wrote:

> Hello Guys,
>
> I have parquet files that a Spark job generates. I'm defining an
> external table on these parquet files, which is partitioned by year, month
> and day. The Spark job feeds these tables each hour.
>
> I have a cron job that runs every hour and executes the command:
>
>  alter table $(table_name) recover partitions
>
> I'm looking for other solutions offered by Impala, like a configuration
> option; for example, I'm wondering whether I need to educate the end users
> to use the -r option to refresh the table.
>
>
> Are there any other solutions for recovering partitions?