Re: Impala Metadata cache limits

2018-05-03 Thread Alexander Behm
I'd recommend staying below 1GB to avoid OOMing the catalogd or impalads.
Going up to 2GB is probably ok but is definitely approaching the danger
zone. The main problem here is the JVM 2GB array limit. When serializing
the metadata we write to a stream that's backed by a byte array. If that
byte array goes beyond 2GB then the JVM will OOM and take down the process.
You can hit this limit in various ways, and it can crash the catalogd and
impalads.

This 2GB limit applies to the uncompressed thrift-serialized size of the
metadata.

On Thu, May 3, 2018 at 2:26 AM, Prahalad kothwal <kothwal...@gmail.com>
wrote:

> Thanks for your response. We are running 2.8.0 and are in the process of
> upgrading to 2.11.0. We have hundreds of partitioned Impala tables.
>
> Thanks,
> Prahalad
>
> On Mon, Apr 30, 2018 at 9:35 PM, Alexander Behm <alex.b...@cloudera.com>
> wrote:
>
>> What version of Impala are you running?
>>
>> On Sun, Apr 29, 2018 at 11:48 PM, Prahalad kothwal <kothwal...@gmail.com>
>> wrote:
>>
>>> Hi,
>>>
>>> Is there a limit to the amount of metadata Impala can cache, or is there a
>>> recommendation from the Impala community? We were told not to have more than
>>> 1 GB of metadata; we have 350 GB of RAM on each host.
>>>
>>> Thanks,
>>> Prahalad
>>>
>>>
>>
>


Re: Impala Metadata cache limits

2018-04-30 Thread Alexander Behm
What version of Impala are you running?

On Sun, Apr 29, 2018 at 11:48 PM, Prahalad kothwal 
wrote:

> Hi,
>
> Is there a limit to the amount of metadata Impala can cache, or is there a
> recommendation from the Impala community? We were told not to have more than
> 1 GB of metadata; we have 350 GB of RAM on each host.
>
> Thanks,
> Prahalad
>
>


Re: Local join instead of data exchange - co-located blocks

2018-04-13 Thread Alexander Behm
Here's the full list. It might not be minimal, but copying/overwriting
these should work.

debug/service/impalad
debug/service/libfesupport.so
debug/service/libService.a
release/service/impalad
release/service/libfesupport.so
release/service/libService.a
yarn-extras-0.1-SNAPSHOT.jar
impala-data-source-api-1.0-SNAPSHOT-sources.jar
impala-data-source-api-1.0-SNAPSHOT.jar
impala-frontend-0.1-SNAPSHOT-tests.jar
impala-frontend-0.1-SNAPSHOT.jar
libkudu_client.so.0.1.0
libstdc++.so.6.0.20
impala-no-sse.bc
impala-sse.bc
libimpalalzo.so

If you are only modifying the Java portion (like DistributedPlanner), then
only copying/replacing the *.jar files should be sufficient.

On Fri, Apr 13, 2018 at 11:00 AM, Philipp Krause <
philippkrause.m...@googlemail.com> wrote:

> Yes, I have a running (virtual) cluster. I would try to follow your way
> with the custom impala build (DistributedPlanner.java is the only modified
> file at the moment). Thank you in advance for the file list!
>
> Best regards
> Philipp
>
> Alexander Behm <alex.b...@cloudera.com> schrieb am Fr., 13. Apr. 2018,
> 18:45:
>
>> I'm not really following your installation/setup and am not an expert on
>> Cloudera Manager installation/config. If you are going to build Impala
>> anyway, it's probably easiest to test on Impala's minicluster first.
>>
>> In general, if you have a running Cloudera Managed cluster, you can
>> deploy a custom Impala build by simply overwriting the existing Impala
>> binaries and jars with the new build. If you want to go this route, I can
>> give you a full list of files you need to replace.
>>
>> On Tue, Apr 10, 2018 at 11:44 AM, Philipp Krause <philippkrause.mail@
>> googlemail.com> wrote:
>>
>>> Thank you for the explanation! Yes, I'm using HDFS. The single-replica
>>> setup is only for test purposes at the moment. I think this makes it easier
>>> to gain some first results since fewer modifications (scheduler etc.) are
>>> necessary.
>>> I would like to test the DistributedPlanner modification in my virtual
>>> cluster. I used a customized Vagrant script to install Impala on multiple
>>> hosts (see attachment). It simply installs cloudera-manager-server-db,
>>> cloudera-manager-server and cloudera-manager-daemons via apt-get. What
>>> would be the simplest solution to set up my modified version? Could I simply
>>> call ./buildall.sh and change the script to something like this?
>>>
>>> echo "Install java..."
>>> apt-get -q -y --force-yes install oracle-j2sdk1.7
>>> echo "Download impala..."
>>> wget https://... where I uploaded my modified version
>>> echo "Extract impala..."
>>> tar -xvzf Impala-cdh5-trunk.tar.gz
>>> cd Impala-cdh5-trunk
>>> echo "Build impala..."
>>> ./buildall.sh
>>> echo "Start impala instances..."
>>> service cloudera-scm-server-db initdb
>>> service cloudera-scm-server-db start
>>> service cloudera-scm-server start
>>>
>>> Or is there another, maybe even easier, method to test the code? Maybe
>>> via bootstrap_development.sh / minicluster?
>>>
>>> Best regards
>>> Philipp
>>>
>>> 2018-04-05 18:39 GMT+02:00 Alexander Behm <alex.b...@cloudera.com>:
>>>
>>>> Apologies for the late response. Btw, your previous post was clear
>>>> enough to me, so no worries :)
>>>>
>>>>
>>>> On Wed, Apr 4, 2018 at 7:46 AM, Philipp Krause <philippkrause.mail@
>>>> googlemail.com> wrote:
>>>>
>>>>> Hello Alex,
>>>>>
>>>>> I think my previous post has been too long and confusing. I apologize
>>>>> for that!
>>>>>
>>>>> If replicas are completely deactivated, all scan ranges of a block are
>>>>> mapped to the one host the block is located on. This host is the
>>>>> "executor"/reader for all the scan ranges of this block. Is that correct?
>>>>>
>>>>
>>>> Yes, assuming you are using HDFS.
>>>>
>>>>
>>>>>
>>>>> I tried to visualize my understanding of the scan_range to host
>>>>> mapping for my use case (see attachment). Could you please have a quick
>>>>> look at it and tell me if this is correct?
>>>>>
>>>>> "The existing scan range assignment is scan-node centric. For each
>>>>> scan node, we independently decide which of its scan ranges should be
>>>>> processed by which host."
>>

Re: Local join instead of data exchange - co-located blocks

2018-04-05 Thread Alexander Behm
Apologies for the late response. Btw, your previous post was clear enough
to me, so no worries :)


On Wed, Apr 4, 2018 at 7:46 AM, Philipp Krause <
philippkrause.m...@googlemail.com> wrote:

> Hello Alex,
>
> I think my previous post has been too long and confusing. I apologize for
> that!
>
> If replicas are completely deactivated, all scan ranges of a block are
> mapped to the one host the block is located on. This host is the
> "executor"/reader for all the scan ranges of this block. Is that correct?
>

Yes, assuming you are using HDFS.


>
> I tried to visualize my understanding of the scan_range to host mapping
> for my use case (see attachment). Could you please have a quick look at it
> and tell me if this is correct?
>
> "The existing scan range assignment is scan-node centric. For each scan
> node, we independently decide which of its scan ranges should be processed
> by which host."
> Without replicas, all scan ranges of a block would be assigned to the same
> host this block is located on. Isn't everything local here, so that
> Table_A - Block_0 and Table_B - Block_0 can be joined locally, or are further
> steps necessary? The condition in the DistributedPlanner you pointed me to
> is set to false (no exchange nodes).
>
> "You want it to be host-centric. For each host, collect the local scan
> ranges of *all* scan nodes, and assign them to that host."
> Wouldn't the standard setup from above work? Wouldn't I assign all (the
> same) scan ranges to each host in this case here?
>

The standard setup works only if every block has exactly one
replica. For our purposes, that is basically never the case (who would
store production data without replication?), so the single-replica
assumption was not clear to me.

Does your current setup (only changing the planner and not the scheduler)
produce the expected results?


>
> Thank you very much!
>
> Best regards
> Philipp
>
> 2018-03-28 21:04 GMT+02:00 Philipp Krause <philippkrause.mail@
> googlemail.com>:
>
>> Thank you for your answer and sorry for my delay!
>>
>> If my understanding is correct, the list of scan nodes consists of all
>> nodes which contain a *local* block from a table that is needed for the
>> query (Assumption: I have no replicas in my first tests). If TableA-Block0
>> is on Node_0, isn't Node_0 automatically a scan node? And wouldn't this
>> scan node always be the host for the complete scan range(s) then?
>>
>> "For each scan node, we independently decide which of its scan ranges
>> should be processed by which host."
>>
>> https://github.com/cloudera/Impala/blob/cdh5-trunk/be/src/sc
>> heduling/scheduler.cc#L532
>> // Loop over all scan ranges, select an executor for those with local
>> impalads and
>> // collect all others for later processing.
>>
>> So in this whole block, scan ranges are assigned to the closest executor
>> (=host?). But isn't the closest executor always the node the block is
>> located on (assuming impalad is installed and I have no replicas)? And isn't
>> this node always a scan node at the same time? Otherwise a thread on a
>> remote host would have to read the corresponding scan range, which would be
>> more expensive. The only exception I can think of is when all threads on the
>> local node are busy. Or, if I use replicas and all other threads of my node
>> with the "original" block are busy, a thread on another node which contains
>> a replica could read a specific scan range of its local block. Is my
>> understanding correct here?
>>
>> Aren't all scan ranges read locally by their scan nodes if I have impalad
>> installed on all nodes? And am I right that the scan range is only based
>> on its length, which refers to maxScanRangeLength in
>> computeScanRangeLocations?
>> https://github.com/cloudera/Impala/blob/cdh5-trunk/fe/src/ma
>> in/java/org/apache/impala/planner/HdfsScanNode.java#L721
>>
>> I hope you can help me with the scan node <-> scan range->host
>> relationship. If I have Table_A-Block_0 and Table_B_Block_0 on the same
>> node (which I want to join locally), I don't get the point of why scan
>> ranges could be assigned to another host in my scenario.
>>
>> Best regards and thank you very much!
>> Philipp Krause
>>
>> Am 21.03.2018 um 05:21 schrieb Alexander Behm:
>>
>> Thanks for following up. I think I understand your setup.
>>
>> If you want to not think about scan ranges, then you can modify
>> HdfsScanNode.computeScanRangeLocations(). For example, you could change
>> it to produce one scan range per file or per HDFS block. That way you'd
>> know exactly what a scan range corresponds to.

Re: Local join instead of data exchange - co-located blocks

2018-04-05 Thread Alexander Behm
On Wed, Mar 28, 2018 at 12:04 PM, Philipp Krause <philippkrause.mail@
googlemail.com> wrote:

> Thank you for your answer and sorry for my delay!
>
> If my understanding is correct, the list of scan nodes consists of all
> nodes which contain a *local* block from a table that is needed for the
> query (Assumption: I have no replicas in my first tests). If TableA-Block0
> is on Node_0, isn't Node_0 automatically a scan node? And wouldn't this
> scan node always be the host for the complete scan range(s) then?
>

Yes, that is correct assuming you are using HDFS. The "single replica"
assumption was not clear to me. If that's the case then your current setup
that only changes the planner (and not the scheduler) should work. Is that
not the case?


>
> "For each scan node, we independently decide which of its scan ranges
> should be processed by which host."
>
> https://github.com/cloudera/Impala/blob/cdh5-trunk/be/src/sc
> heduling/scheduler.cc#L532
> // Loop over all scan ranges, select an executor for those with local
> impalads and
> // collect all others for later processing.
>
> So in this whole block, scan ranges are assigned to the closest executor
> (=host?). But isn't the closest executor always the node the block is
> located on (assuming impalad is installed and I have no replicas)? And isn't
> this node always a scan node at the same time? Otherwise a thread on a
> remote host would have to read the corresponding scan range, which would be
> more expensive. The only exception I can think of is when all threads on the
> local node are busy. Or, if I use replicas and all other threads of my node
> with the "original" block are busy, a thread on another node which contains
> a replica could read a specific scan range of its local block. Is my
> understanding correct here?
>

Yes, the closest host is always the one that has a local replica (you have
a single replica). By default, Impala is stupid and will try to assign
locally only. Scheduling does not consider host load.

>
> Aren't all scan ranges read locally by their scan nodes if I have impalad
> installed on all nodes? And am I right that the scan range is only based
> on its length, which refers to maxScanRangeLength in
> computeScanRangeLocations?
> https://github.com/cloudera/Impala/blob/cdh5-trunk/fe/src/ma
> in/java/org/apache/impala/planner/HdfsScanNode.java#L721
>

Yes to all questions.

>
>
> I hope you can help me with the scan node <-> scan range->host
> relationship. If I have Table_A-Block_0 and Table_B_Block_0 on the same
> node (which I want to join locally), I don't get the point of why scan
> ranges could be assigned to another host in my scenario.
>

See first response.

>
> Best regards and thank you very much!
> Philipp Krause
>
> Am 21.03.2018 um 05:21 schrieb Alexander Behm:
>
> Thanks for following up. I think I understand your setup.
>
> If you want to not think about scan ranges, then you can modify
> HdfsScanNode.computeScanRangeLocations(). For example, you could change
> it to produce one scan range per file or per HDFS block. That way you'd
> know exactly what a scan range corresponds to.
>
> I think the easiest/fastest way for you to make progress is to
> re-implement the existing scan range assignment logic in that place in the
> code I had pointed you to. There is no quick fix to change the existing
> behavior.
> The existing scan range assignment is scan-node centric. For each scan
> node, we independently decide which of its scan ranges should be processed
> by which host.
>
> I believe an algorithm to achieve your goal would look completely
> different. You want it to be host-centric. For each host, collect the local
> scan ranges of *all* scan nodes, and assign them to that host.
>
> Does that make sense?
>
> Alex
>
>
> On Mon, Mar 19, 2018 at 1:02 PM, Philipp Krause <
> philippkrause.m...@googlemail.com> wrote:
>
>> I'd like to provide a small example for our purpose. The last post may be
>> a bit confusing, so here's a very simple example in the attached pdf file.
>> I hope it's understandable. Otherwise, please give me a short feedback.
>>
>> Basically, I only want each data node to join all its local blocks. Is
>> there a range mapping needed, or is it possible to easily join all local
>> blocks (regardless of their content) since everything is already "prepared"?
>> Maybe you can clarify this for me.
>>
>> As you can see in the example, the tables are not partitioned by ID. The
>> files are manually prepared with the help of the modulo function. So I don't
>> have a range like [0,10], but something like 0,5,10,15 etc.
>>
>> I hope I didn't make it too complicated and confusing.

Re: Local join instead of data exchange - co-located blocks

2018-03-20 Thread Alexander Behm
Thanks for following up. I think I understand your setup.

If you want to not think about scan ranges, then you can modify
HdfsScanNode.computeScanRangeLocations(). For example, you could change it
to produce one scan range per file or per HDFS block. That way you'd know
exactly what a scan range corresponds to.

I think the easiest/fastest way for you to make progress is to re-implement
the existing scan range assignment logic in that place in the code I had
pointed you to. There is no quick fix to change the existing behavior.
The existing scan range assignment is scan-node centric. For each scan
node, we independently decide which of its scan ranges should be processed
by which host.

I believe an algorithm to achieve your goal would look completely
different. You want it to be host-centric. For each host, collect the local
scan ranges of *all* scan nodes, and assign them to that host.

Does that make sense?

Alex


On Mon, Mar 19, 2018 at 1:02 PM, Philipp Krause <
philippkrause.m...@googlemail.com> wrote:

> I'd like to provide a small example for our purpose. The last post may be
> a bit confusing, so here's a very simple example in the attached pdf file.
> I hope it's understandable. Otherwise, please give me a short feedback.
>
> Basically, I only want each data node to join all its local blocks. Is
> there a range mapping needed, or is it possible to easily join all local
> blocks (regardless of their content) since everything is already "prepared"?
> Maybe you can clarify this for me.
>
> As you can see in the example, the tables are not partitioned by ID. The
> files are manually prepared with the help of the modulo function. So I don't
> have a range like [0,10], but something like 0,5,10,15 etc.
>
> I hope I didn't make it too complicated and confusing. I think the
> actual idea behind this is really simple, and I hope you can help me get
> this working.
>
> Best regards and thank you very much for your time!
> Philipp
>
> Am 18.03.2018 um 17:32 schrieb Philipp Krause:
>
> Hi! At the moment the data to parquet (block) mapping is based on a simple
> modulo function: Id % #data_nodes. So with 5 data nodes all rows with Id's
> 0,5,10,... are written to Parquet_0, Id's 1,6,11,... are written to Parquet_1
> etc. That's what I did manually. Since the parquet file size and the block
> size are both set to 64MB, each parquet file will result in one block when
> I transfer the parquet files to HDFS. By default, HDFS distributes the
> blocks randomly. For test purposes I transferred corresponding blocks from
> Table_A and Table_B to the same data node (Table_A - Block_X with Id's
> 0,5,10 and Table_B - Block_Y with Id's 0,5,10). In this case, they are
> transferred to data_node_0 because the modulo function (which I want to
> implement in the scheduler) returns 0 for these Id's. This is also done
> manually at the moment.
>
> 1.) DistributedPlanner: For the first upcoming tests I simply changed the
> first condition in the DistributedPlanner to true to avoid exchange nodes.
>
> 2.) The scheduler: That's the part I'm currently struggling with. For the
> first tests, block replication is deactivated. I'm not sure how / where to
> implement the modulo function for the scan range to host mapping. Without
> the modulo function, I would have to implement a hard-coded mapping
> (something like "range" 0-0, 5-5, 10-10 -> Data_node_0 etc.). Is that
> correct? Instead I would like to use a slightly more flexible solution with
> the help of this modulo function for the host mapping.
>
> I would be really grateful if you could give me a hint for the scheduling
> implementation. I will try to dig deeper into the code in the meantime.
>
> Best regards and thank you in advance
> Philipp
>
>
> Am 14.03.2018 um 08:06 schrieb Philipp Krause:
>
> Thank you very much for this information! I'll try to implement these two
> steps and post some updates within the next days!
>
> Best regards
> Philipp
>
> 2018-03-13 5:38 GMT+01:00 Alexander Behm <alex.b...@cloudera.com>:
>
>> Cool that you're working on a research project with Impala!
>>
>> Properly adding such a feature to Impala is a substantial effort, but
>> hacking the code for an experiment or two seems doable.
>>
>> I think you will need to modify two things: (1) the planner to not add
>> exchange nodes, and (2) the scheduler to assign the co-located scan ranges
>> to the same host.
>>
>> Here are a few starting points in the code:
>>
>> 1) DistributedPlanner
>> https://github.com/apache/impala/blob/master/fe/src/main/
>> java/org/apache/impala/planner/DistributedPlanner.java#L318
>>
>> The first condition handles the case where no exchange nodes need to be
>> added because the join inputs are already suitably partitioned.

Re: REFRESH partitions

2018-03-19 Thread Alexander Behm
Did you have a different option in mind that might suit your needs better?

These are your options for discovering metadata changes external to Impala:
refresh <table_name>
refresh <table_name> PARTITION (partition_spec)
invalidate metadata <table_name>
recover partitions <table_name>
invalidate metadata (don't do this)

Those commands all do different things, so it really depends on your goals.

If you want new files/partitions to be incrementally discovered by Impala,
then use refresh.
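
For the hourly Spark job scenario described below, a minimal sketch of that
incremental option (the table name and partition values are made up; the
partition spec should match whatever the job just wrote):

refresh my_db.events_table partition (year=2018, month=3, day=19);
-- or, without a partition spec, to reload the whole table:
refresh my_db.events_table;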



On Mon, Mar 19, 2018 at 12:49 PM, Fawze Abujaber  wrote:

> Thanks Tim and Juan,
>
> So there are no options other than running the refresh statement each hour
> or letting the Spark job run it after writing the parquet files.
>
> On Mon, Mar 19, 2018 at 9:34 PM, Tim Armstrong 
> wrote:
>
>> Don't use the -r option to impala-shell! That option was a mistake and
>> it's removed in Impala 3.0. The problem is that it does a global invalidate
>> which is expensive because it requires reloading all metadata.
>>
>> On 19 Mar. 2018 10:35, "Juan"  wrote:
>>
>>> If the table is partitioned by year, month, day, but not hour, running
>>> recover partitions is not a good idea.
>>> Recover partitions only loads metadata when it discovers a new partition;
>>> for existing partitions, even if there is new data, recover partitions will
>>> ignore them, so the table metadata could be out of date and queries will
>>> return wrong results.
>>>
>>> If the Spark job is not running very frequently, you can run refresh
>>> table to refresh a specific partition after job completion, or run it
>>> once per hour.
>>>
>>> REFRESH [db_name.]table_name [PARTITION (key_col1=val1 [, 
>>> key_col2=val2...])]
>>>
>>>
>>> On Sat, Mar 17, 2018 at 1:10 AM, Fawze Abujaber 
>>> wrote:
>>>
 Hello Guys,

 I have parquet files that a Spark job generates. I'm defining an
 external table on these parquet files, which is partitioned by year, month,
 and day. The Spark job feeds these tables each hour.

 I have a cron job that runs every hour and runs the command:

  alter table $(table_name) recover partitions

 I'm looking for other solutions, if Impala offers any, like a
 configuration option; for example, I'm wondering if I need to educate the
 end users to use the -r option to refresh the table.


 Are there any other solutions for recovering partitions?







>>>
>


Re: Local join instead of data exchange - co-located blocks

2018-03-12 Thread Alexander Behm
Cool that you're working on a research project with Impala!

Properly adding such a feature to Impala is a substantial effort, but
hacking the code for an experiment or two seems doable.

I think you will need to modify two things: (1) the planner to not add
exchange nodes, and (2) the scheduler to assign the co-located scan ranges
to the same host.

Here are a few starting points in the code:

1) DistributedPlanner
https://github.com/apache/impala/blob/master/fe/src/main/java/org/apache/impala/planner/DistributedPlanner.java#L318

The first condition handles the case where no exchange nodes need to be
added because the join inputs are already suitably partitioned.
You could hack the code to always go into that codepath, so no exchanges
are added.

2) The scheduler
https://github.com/apache/impala/blob/master/be/src/scheduling/scheduler.cc#L226

You'll need to dig through and understand that code so that you can make
the necessary changes. Change the scan range to host mapping to your
liking. The rest of the code should just work.

Cheers,

Alex


On Mon, Mar 12, 2018 at 6:55 PM, Philipp Krause <
philippkrause.m...@googlemail.com> wrote:

> Thank you very much for your quick answers!
> The intention behind this is to improve the execution time and (primarily)
> to examine the impact of block-co-location (research project) for this
> particular query (simplified):
>
> select A.x, B.y, A.z from tableA as A inner join tableB as B on A.id=B.id
>
> The "real" query includes three joins and the data size is in pb-range.
> Therefore several nodes (5 in the test environment with less data) are used
> (without any load balancer).
>
> Could you give me some hints on what code changes are required and which
> files are affected? I don't know how to give Impala the information that it
> should only join the local data blocks on each node and then pass it to the
> "final" node which receives all intermediate results. I hope you can help
> me to get this working. That would be awesome!
> Best regards
> Philipp
>
> Am 12.03.2018 um 18:38 schrieb Alexander Behm:
>
> I suppose one exception is if your data lives only on a single node. Then
> you can set num_nodes=1 and make sure to send the query request to the
> impalad running on the same data node as the target data. Then you should
> get a local join.
>
> On Mon, Mar 12, 2018 at 9:30 AM, Alexander Behm <alex.b...@cloudera.com>
> wrote:
>
>> Such a specific block arrangement is very uncommon for typical Impala
>> setups, so we don't attempt to recognize and optimize this narrow case. In
>> particular, such an arrangement tends to be short lived if you have the
>> HDFS balancer turned on.
>>
>> Without making code changes, there is no way today to remove the data
>> exchanges and make sure that the scheduler assigns scan splits to nodes in
>> the desired way (co-located, but with possible load imbalance).
>>
>> In what way is the current setup unacceptable to you? Is this premature
>> optimization? If you have certain performance expectations/requirements for
>> specific queries we might be able to help you improve those. If you want to
>> pursue this route, please help us by posting complete query profiles.
>>
>> Alex
>>
>> On Mon, Mar 12, 2018 at 6:29 AM, Philipp Krause <
>> philippkrause.m...@googlemail.com> wrote:
>>
>>> Hello everyone!
>>>
>>> In order to prevent network traffic, I'd like to perform local joins on
>>> each node instead of exchanging the data and performing a join over the
>>> complete data afterwards. My query is basically a join over three
>>> tables on an ID attribute. The blocks are perfectly distributed, so that
>>> e.g. Table A - Block 0  and Table B - Block 0  are on the same node. These
>>> blocks contain all data rows with an ID range [0,1]. Table A - Block 1  and
>>> Table B - Block 1 with an ID range [2,3] are on another node etc. So I want
>>> to perform a local join per node because any data exchange would be
>>> unnecessary (except for the last step when the final node receives all
>>> results of the other nodes). Is this possible?
>>> At the moment the query plan includes multiple data exchanges, although
>>> the blocks are already perfectly distributed (manually).
>>> I would be grateful for any help!
>>>
>>> Best regards
>>> Philipp Krause
>>>
>>>
>>
>
>


Re: Local join instead of data exchange - co-located blocks

2018-03-12 Thread Alexander Behm
Such a specific block arrangement is very uncommon for typical Impala
setups, so we don't attempt to recognize and optimize this narrow case. In
particular, such an arrangement tends to be short lived if you have the
HDFS balancer turned on.

Without making code changes, there is no way today to remove the data
exchanges and make sure that the scheduler assigns scan splits to nodes in
the desired way (co-located, but with possible load imbalance).

In what way is the current setup unacceptable to you? Is this premature
optimization? If you have certain performance expectations/requirements for
specific queries we might be able to help you improve those. If you want to
pursue this route, please help us by posting complete query profiles.

Alex

On Mon, Mar 12, 2018 at 6:29 AM, Philipp Krause <
philippkrause.m...@googlemail.com> wrote:

> Hello everyone!
>
> In order to prevent network traffic, I'd like to perform local joins on
> each node instead of exchanging the data and performing a join over the
> complete data afterwards. My query is basically a join over three
> tables on an ID attribute. The blocks are perfectly distributed, so that
> e.g. Table A - Block 0  and Table B - Block 0  are on the same node. These
> blocks contain all data rows with an ID range [0,1]. Table A - Block 1  and
> Table B - Block 1 with an ID range [2,3] are on another node etc. So I want
> to perform a local join per node because any data exchange would be
> unnecessary (except for the last step when the final node receives all
> results of the other nodes). Is this possible?
> At the moment the query plan includes multiple data exchanges, although
> the blocks are already perfectly distributed (manually).
> I would be grateful for any help!
>
> Best regards
> Philipp Krause
>
>


Re: Incremental stats and catalogd data serialization

2018-03-08 Thread Alexander Behm
Hi Miguel,

the memory requirement stems from the incremental stats needed to compute
the number of distinct values for each column incrementally.

If you set the stats manually via ALTER TABLE, then no such incremental
stats exist, so there's no memory issue.

For manually setting stats, I'd recommend setting the per-partition row
counts as well as the table-level row counts.

If you know of primary keys in your data you should be careful to set the
NDV column statistics for the primary keys as accurately as possible.

Impala uses the table-level row count statistic and the column NDVs to
heuristically detect primary keys for better join ordering, so it's
generally good to keep the table-level row count and column NDVs "in sync"
for primary key detection to work.
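
A minimal sketch of what that could look like via ALTER TABLE, assuming a
version that supports SET COLUMN STATS (the table name, column name,
partition spec, and numbers below are made up):

-- table-level row count:
alter table sales_fact set tblproperties ('numRows'='1000000');
-- per-partition row count, same property with a partition clause:
alter table sales_fact partition (day='2018-03-01')
  set tblproperties ('numRows'='50000');
-- NDV for a likely primary-key column, kept in sync with the row count:
alter table sales_fact set column stats id ('numDVs'='1000000');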

Hth

Alex

On Thu, Mar 8, 2018 at 7:54 AM, Miguel Figueiredo <ollliega...@gmail.com>
wrote:

> Hi Alex,
>
> Thanks for the feedback.
>
> I will try the new version and the new way of computing stats when possible.
> In the meantime we are thinking of computing the stats manually. If we
> compute stats per partition and for the whole table, will we encounter the
> same memory limit?
> Should we compute stats for the whole table and disregard partition stats?
>
> Best regards,
> Miguel
>
> On Wed, Mar 7, 2018 at 6:14 PM, Alexander Behm <alex.b...@cloudera.com>
> wrote:
>
>> Using incremental stats in your scenario is extremely dangerous and I
>> highly recommend against it. That limitation was put in place to guard
>> clusters against service downtime due to serializing huge tables and
>> hitting JVM limits like the 2GB max array size.
>>
>> Even if the catalogd and impalads stay up, having such huge metadata will
>> negatively impact the health and performance of your cluster.
>>
>> There's a blurb about this in the Impala docs, CTRL+F for "For a table
>> with a huge number of partitions"
>> https://impala.apache.org/docs/build/html/topics/impala_comp
>> ute_stats.html
>>
>> Impala caches and disseminates metadata at the table granularity. If
>> anything in a table changes, the whole updated table is sent out to all
>> coordinating impalads. Each such impalad caches the entire metadata of a
>> table including the incremental stats for all columns and all partitions.
>>
>>
>> The below is copied from a different discussion thread discussing
>> alternatives to incremental stats:
>>
>> Btw, you should also know that the following improvements in the upcoming
>> 2.12 release might make "compute stats" more palatable on your huge tables.
>> We'd love your feedback on COMPUTE STATS with TABLESAMPLE, in particular.
>>
>> COMPUTE STATS with TABLESAMPLE
>> https://issues.apache.org/jira/browse/IMPALA-5310
>>
>> COMPUTE STATS on a subset of columns
>> https://issues.apache.org/jira/browse/IMPALA-3562
>>
>> The following improvement should allow you to COMPUTE STATS less
>> frequently by extrapolating the row count of partitions that were added or
>> modified since the last COMPUTE STATS.
>> https://issues.apache.org/jira/browse/IMPALA-2373
>> https://issues.apache.org/jira/browse/IMPALA-6228
>>
>> In general, would be great to get your feedback/ideas on how to make
>> computing stats more practical for you.
>>
>>
>>
>> On Wed, Mar 7, 2018 at 8:48 AM, Miguel Figueiredo <ollliega...@gmail.com>
>> wrote:
>>
>>> Hi,
>>>
>>> I have bumped into the 200MB limit when calculating incremental stats (
>>> https://issues.apache.org/jira/browse/IMPALA-3552).
>>>
>>> I don't understand which data catalogd sends to the impalad each time
>>> the incremental stats are calculated. Does it send only the new information
>>> calculated for new partitions or all the statistics data?
>>>
>>> In my case I have 387 tables with 2550 columns. I am creating a new
>>> partition for each table every hour and calculating incremental stats for
>>> these new partitions. If catalogd is sending serialized data for the new
>>> partitions and columns, it shouldn't amount to 200MB.
>>>
>>> I would appreciate if someone can help me understand this concept or
>>> point me to some documentation.
>>>
>>> Best regards,
>>> Miguel
>>>
>>> --
>>> Miguel Figueiredo
>>> Software Developer
>>>
>>> "I'm a pretty lazy person and am prepared to work quite hard in order to
>>> avoid work."
>>> -- Martin Fowler
>>>
>>
>>
>
>
> --
> Miguel Figueiredo
> Software Developer
>
> "I'm a pretty lazy person and am prepared to work quite hard in order to
> avoid work."
> -- Martin Fowler
>


Re: Incremental stats and catalogd data serialization

2018-03-07 Thread Alexander Behm
Using incremental stats in your scenario is extremely dangerous and I
highly recommend against it. That limitation was put in place to guard
clusters against service downtime due to serializing huge tables and
hitting JVM limits like the 2GB max array size.

Even if the catalogd and impalads stay up, having such huge metadata will
negatively impact the health and performance of your cluster.

There's a blurb about this in the Impala docs, CTRL+F for "For a table with
a huge number of partitions"
https://impala.apache.org/docs/build/html/topics/impala_compute_stats.html

Impala caches and disseminates metadata at the table granularity. If
anything in a table changes, the whole updated table is sent out to all
coordinating impalads. Each such impalad caches the entire metadata of a
table including the incremental stats for all columns and all partitions.


The below is copied from a different discussion thread discussing
alternatives to incremental stats:

Btw, you should also know that the following improvements in the upcoming
2.12 release might make "compute stats" more palatable on your huge tables.
We'd love your feedback on COMPUTE STATS with TABLESAMPLE, in particular.

COMPUTE STATS with TABLESAMPLE
https://issues.apache.org/jira/browse/IMPALA-5310

COMPUTE STATS on a subset of columns
https://issues.apache.org/jira/browse/IMPALA-3562

The following improvement should allow you to COMPUTE STATS less frequently
by extrapolating the row count of partitions that were added or modified
since the last COMPUTE STATS.
https://issues.apache.org/jira/browse/IMPALA-2373
https://issues.apache.org/jira/browse/IMPALA-6228
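
As a concrete sketch of the TABLESAMPLE variant mentioned above (the table
name and sampling percentage are made up; the exact syntax may still change
before the release):

-- derive table and column stats from a sample of roughly 10% of the data:
compute stats huge_table tablesample system(10);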

In general, would be great to get your feedback/ideas on how to make
computing stats more practical for you.



On Wed, Mar 7, 2018 at 8:48 AM, Miguel Figueiredo 
wrote:

> Hi,
>
> I have bumped into the 200MB limit when calculating incremental stats (
> https://issues.apache.org/jira/browse/IMPALA-3552).
>
> I don't understand which data catalogd sends to the impalad each time the
> incremental stats are calculated. Does it send only the new information
> calculated for new partitions or all the statistics data?
>
> In my case I have 387 tables with 2550 columns. I am creating a new
> partition for each table every hour and calculating incremental stats for
> these new partitions. If catalogd is sending serialized data for the new
> partitions and columns, it shouldn't amount to 200MB.
>
> I would appreciate if someone can help me understand this concept or point
> me to some documentation.
>
> Best regards,
> Miguel
>
> --
> Miguel Figueiredo
> Software Developer
>
> "I'm a pretty lazy person and am prepared to work quite hard in order to
> avoid work."
> -- Martin Fowler
>


Re: Estimate peak memory VS used peak memory

2018-03-05 Thread Alexander Behm
Sounds like you either onboarded a new workload with nested types or an
existing workload with nested types somehow got broken in the upgrade.
That error message is quite accurate: Impala does not support IS [NOT] NULL
predicates on complex types, but it sounds like that same query used to
work before.

I'm happy to help figure out what happened, but I'll need the SQL of the
query and the CREATE TABLE/VIEW statements of the tables/views involved in
the query. Sounds like there might be a bug here.

On Wed, Feb 28, 2018 at 10:29 AM, Fawze Abujaber <fawz...@gmail.com> wrote:

> Hi Mostafa,
>
> I already rolled back the version, so I don't know how to get the settings
> or whether I can get the query profiles for finished queries in the
> rolled-back version.
>
> But for example, after the upgrade we started to see the following error,
> which we stopped seeing after the rollback: IS NOT NULL predicate does not
> support complex types
>
>
>- IllegalStateException: org.apache.impala.common.AnalysisException:
>IS NOT NULL predicate does not support complex types: participants IS NOT
>NULL CAUSED BY: AnalysisException: IS NOT NULL predicate does not support
>complex types: participants IS NOT NULL
>
>
>
> On Wed, Feb 28, 2018 at 7:56 PM, Mostafa Mokhtar <mmokh...@cloudera.com>
> wrote:
>
>> Can you please share the query profiles for the failures you got along
>> with the admission control setting?
>>
>> Thanks
>> Mostafa
>>
>> On Feb 28, 2018, at 9:28 AM, Fawze Abujaber <fawz...@gmail.com> wrote:
>>
>> Thanks you all for your help and advises.
>>
>> Unfortunately I rolled back the upgrade until I understand how to control
>> Impala resources and tackle all the failures that I started to see after
>> the upgrade.
>>
>>
>>
>> On Fri, Feb 23, 2018 at 8:22 PM, Fawze Abujaber <fawz...@gmail.com>
>> wrote:
>>
>>> Hi Tim,
>>>
>>> My goal is: queries whose actual memory per node exceeds what I set up as
>>> the default max memory per node should fail, even though I have different
>>> queries in the pool; in the same pool some business queries can be as
>>> simple as select count(*) and some others can have a few joins.
>>>
>>> And I think this is the right decision, and such a query should be
>>> optimized.
>>>
>>> And also, if I'm looking at my historical queries, I can know from the
>>> max used memory per node which queries will fail, and I think this helps me
>>> a lot, but I need any other query to be queued if its actual memory is lower
>>> than what I set up as the default max memory per node for a query.
>>>
>>> Based on the above, I'm looking for the parameters that I need to
>>> configure.
>>>
>>> I don't mind how long and how many queries will be queued; in my case
>>> I don't have any Impala query that runs beyond 4-5 minutes, and 80% of
>>> queries are below 1 minute.
>>>
>>> So I don't mind setting the queue timeout to 20 minutes and max queued
>>> to 20-30 queries per pool.
>>>
>>> I want to make sure no query will fail if it does not exceed the default
>>> memory per node that I set up.
>>>
>>> Should I use only the default max memory per node alone? Should I
>>> combine it with the max running queries or with the memory limit of the
>>> whole pool?
>>>
>>>
>>> On Fri, Feb 23, 2018 at 8:08 PM, Tim Armstrong <tarmstr...@cloudera.com>
>>> wrote:
>>>
>>>> I think the previous answers have been good. I wanted to add a couple
>>>> of side notes for context since I've been doing a lot of work in this area
>>>> of Impala. I could talk about this stuff for hours.
>>>>
>>>> We do have mechanisms, like spilling data to disk or reducing # of
>>>> threads, that kick in to keep queries under the mem_limit. This has existed
>>>> in some form since Impala 2.0, but Impala 2.10 included some architectural
>>>> changes to make this more robust, and we have further improvements in the
>>>> pipeline. The end goal, which we're getting much closer to, is that queries
>>>> should reliably run to completion instead of getting killed after they are
>>>> admitted.
>>>>
>>>> That support is going to enable future enhancements to memory-based
>>>> admission control to make it easier for cluster admins like yourself to
>>>> configure admission control. It is definitely tricky to pick a good value
>>>> for mem_limit 

Re: Impala Admission Control concurrent query runtime

2018-02-28 Thread Alexander Behm
Hi Paulo,

for us to help you with performance issues it's very helpful to have the
query profiles of the queries you ran. Based on those profiles, we can
hopefully identify where the time is being spent and pinpoint the issue.

Sending us the profile of a single query run and the 3 profiles of the 3
concurrent queries would be a good start. You can also poke around in the
profiles yourself and maybe you'll find something.
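
If it helps, the profile of the most recent query can be pulled straight
from impala-shell right after the query finishes (the coordinator's /queries
web page is another place to grab it):

-- in impala-shell, immediately after running the query:
profile;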

Alex

On Wed, Feb 28, 2018 at 11:37 AM, Roberto Cerioni, Paulo <
probertoceri...@rbbn.com> wrote:

> Hello,
>
>
> I've done some tests with concurrent queries running in the same queue and
> it seems the runtime always receives a penalty compared to a single query
> running, even if we have plenty of resources available to run multiple
> queries concurrently in that queue. In addition, I tried that by running
> the same query concurrently in different queues and the runtime received a
> penalty as well. For instance, if the runtime of a query running alone in
> the cluster is, let's say, 10 seconds, it seems that I can only achieve that
> performance when no other query is running in the cluster, otherwise there
> is a penalty of around 50% in runtime.
>
>
> Other behavior I noticed was that the result of queries is being released
> in "batches" according to the concurrency I have set (that might even be
> the reason for the aforementioned issue). For example, if I run a single
> query, I get the answer in 1 second, but if I run 2 of that same query
> concurrently the results of *both* queries will come after 1.5s. If I run
> 5 of the same query concurrently, then the result of all 5 queries is going
> to come back at the same time after 3 seconds, and so on. The penalty in
> runtime always increases with the number of queries I run concurrently,
> because the first issued query apparently has to wait for the whole "batch" to
> execute.
>
>
> Could you please explain what is happening under the hood that might be
> driving this behavior? Is this supposed to work this way in the latest
> version of Impala we have or is there something in the configuration I can
> use to prevent this? Is that planned for Impala's roadmap?
>
>
> Thanks,
>
> Paulo.
>
>
>
>


Re: How to use [SHUFFLE] by default for all JOINS

2018-02-23 Thread Alexander Behm
Btw, you should also know that the following improvements in the upcoming
2.12 release might make "compute stats" more palatable on your huge tables.
We'd love your feedback on COMPUTE STATS with TABLESAMPLE, in particular.

COMPUTE STATS with TABLESAMPLE
https://issues.apache.org/jira/browse/IMPALA-5310

COMPUTE STATS on a subset of columns
https://issues.apache.org/jira/browse/IMPALA-3562

The following improvement should allow you to COMPUTE STATS less frequently
by extrapolating the row count of partitions that were added or modified
since the last COMPUTE STATS.
https://issues.apache.org/jira/browse/IMPALA-2373
https://issues.apache.org/jira/browse/IMPALA-6228

In general, would be great to get your feedback/ideas on how to make
computing stats more practical for you.


On Fri, Feb 23, 2018 at 8:23 PM, Alexander Behm <alex.b...@cloudera.com>
wrote:

> Maybe this improvement could help. It's available since Impala 2.9.
> https://issues.apache.org/jira/browse/IMPALA-5381
>
>
>
> On Fri, Feb 23, 2018 at 6:40 PM, Arya Goudarzi <gouda...@gmail.com> wrote:
>
>> Thank you Mostafa. My bad on mentioning the wrong version. We are using
>> 2.7 and not 1.7. We have an upgrade in our plans and are actually waiting
>> for Impala 2.12 as it has the IMPALA-5058 fixes.
>>
>> On Fri, Feb 23, 2018 at 6:18 PM, Mostafa Mokhtar <mmokh...@cloudera.com>
>> wrote:
>>
>>> AFAIK there is no such flag.
>>> You are more likely to get much higher gains if you upgrade to a more
>>> recent version of Impala.
>>>
>>> https://www.slideshare.net/cloudera/performance-of-apache-impala
>>>
>>> Thanks
>>> Mostafa
>>>
>>> On Feb 23, 2018, at 6:12 PM, Arya Goudarzi <gouda...@gmail.com> wrote:
>>>
>>> Hi Team,
>>>
>>> TL;DR; I am wondering if there is a way to instruct Impala to use
>>> shuffle by default for all join queries as my research didn't end anywhere
>>> so far.
>>>
>>> We have a multi-PiB cluster with hundreds of thousands of partitions. We
>>> are using Impala 1.7 with HDFS. Due to our cluster size, compute_stats, and
>>> compute_incremental_stats are not feasible for us as compute_stats seems a
>>> heavy operation on a lot of our large tables and destabilizes the cluster,
>>> and with compute_incremental_stats we hit IMPALA-2648
>>> <https://issues.apache.org/jira/browse/IMPALA-2648>.
>>>
>>> Therefore, to optimize our queries we need to add [shuffle] hint to the
>>> queries with joins, and we have seen that this improves performance 3x on
>>> simple tests because the system doesn't have to stream too much data and
>>> dump it for broadcast join.
>>>
>>> We have a large team of analysts who are pushing tons of queries to the
>>> system. It is hard to enforce policy at the moment for them to remember to
>>> use shuffle hint so it doesn't take our system down.
>>>
>>> --
>>> Cheers,
>>> -Arya
>>>
>>>
>>
>>
>> --
>> Cheers,
>> -Arya
>>
>
>


Re: How to use [SHUFFLE] by default for all JOINS

2018-02-23 Thread Alexander Behm
Maybe this improvement could help. It's available since Impala 2.9.
https://issues.apache.org/jira/browse/IMPALA-5381
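
For reference, the per-query workaround mentioned in the thread is the
explicit join hint, and, assuming the linked improvement is the query option
that sets the default join distribution mode, it could instead be set once
per session (table names are made up):

-- per-query hint:
select a.x, b.y
from big_table a join [shuffle] other_table b on a.id = b.id;

-- per-session default, assuming that query option:
set default_join_distribution_mode=shuffle;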



On Fri, Feb 23, 2018 at 6:40 PM, Arya Goudarzi  wrote:

> Thank you Mostafa. My bad on mentioning the wrong version. We are using
> 2.7 and not 1.7. We have an upgrade in our plans and are actually waiting
> for Impala 2.12 as it has the IMPALA-5058 fixes.
>
> On Fri, Feb 23, 2018 at 6:18 PM, Mostafa Mokhtar 
> wrote:
>
>> AFAIK there is no such flag.
>> You are more likely to get much higher gains if you upgrade to a more
>> recent version of Impala.
>>
>> https://www.slideshare.net/cloudera/performance-of-apache-impala
>>
>> Thanks
>> Mostafa
>>
>> On Feb 23, 2018, at 6:12 PM, Arya Goudarzi  wrote:
>>
>> Hi Team,
>>
>> TL;DR; I am wondering if there is a way to instruct Impala to use
>> shuffle by default for all join queries as my research didn't end anywhere
>> so far.
>>
>> We have a multi-PiB cluster with hundreds of thousands of partitions. We
>> are using Impala 1.7 with HDFS. Due to our cluster size, compute_stats, and
>> compute_incremental_stats are not feasible for us as compute_stats seems a
>> heavy operation on a lot of our large tables and destabilizes the cluster,
>> and with compute_incremental_stats we hit IMPALA-2648
>> .
>>
>> Therefore, to optimize our queries we need to add [shuffle] hint to the
>> queries with joins, and we have seen that this improves performance 3x on
>> simple tests because the system doesn't have to stream too much data and
>> dump it for broadcast join.
>>
>> We have a large team of analysts who are pushing tons of queries to the
>> system. It is hard to enforce policy at the moment for them to remember to
>> use shuffle hint so it doesn't take our system down.
>>
>> --
>> Cheers,
>> -Arya
>>
>>
>
>
> --
> Cheers,
> -Arya
>


Re: Debugging Impala query that consistently hangs

2018-02-13 Thread Alexander Behm
If you are willing to share, it would be great to get some details on why
compute stats failed.

On Tue, Feb 13, 2018 at 7:57 AM, Piyush Narang  wrote:

> Thanks Mostafa, that did help. I was able to compute stats on
> bi_dim_campaign and advertiser_event_rich. I updated the row counts
> manually for the arbitrage table. I do see the query completing
> successfully now. Will check in on the stats issue over the next couple of
> days for the bigger arbitrage table. The last time we ran into this we
> didn’t see any useful logs on the Hive server side. Our Hive team has
> updated Hive since then, so I'll try this out again / investigate it
> a bit more.
>
>
>
> -- Piyush
>
>
>
>
>
> *From: *Mostafa Mokhtar 
> *Reply-To: *"user@impala.apache.org" 
> *Date: *Monday, February 12, 2018 at 5:11 PM
> *To: *"user@impala.apache.org" 
> *Subject: *Re: Debugging Impala query that consistently hangs
>
>
>
> Hi Piyush,
>
>
>
> Please run the "alter table" commands below to set the number of rows per
> table; doing that should yield better plans than what you currently have
> and avoid building a hash table that consumes lots of memory.
>
>
>
> You can also set the number of distinct values for the columns involved in
> joins, aggregations, and predicates.
>
>
>
> None of this manual work is needed if "compute stats foo" is run
> successfully.
>
>
>
> alter table bi_dim_campaign set tblproperties ('numRows'='213');
>
> alter table advertiser_event_rich set tblproperties ('numRows'='177000');
>
> alter table bi_arbitrage_full set tblproperties ('numRows'='217000');
>
>
>
> https://www.cloudera.com/documentation/enterprise/5-10-
> x/topics/impala_perf_stats.html#perf_table_stats_manual
> 
>
> https://www.cloudera.com/documentation/enterprise/5-10-
> x/topics/impala_perf_stats.html#perf_column_stats_manual
> 
>
>
>
> P.S. In tables with a large number of partitions I have seen HMS hit
> various scalability limitations in the JVM and the backing store.
>
>
>
>
>
>
>
> On Mon, Feb 12, 2018 at 6:55 PM, Tim Armstrong 
> wrote:
>
> Let us know if we can help figure out what went wrong with compute stats.
>
>
>
> - Tim
>
>
>
> On Mon, Feb 12, 2018 at 6:07 AM, Piyush Narang 
> wrote:
>
> Got it, thanks for the explanation Tim. I'll chase down the issue with
> compute stats for that table.
>
>
>
> -- Piyush
>
>
>
>
>
> *From: *Tim Armstrong 
> *Reply-To: *"user@impala.apache.org" 
> *Date: *Sunday, February 11, 2018 at 2:31 PM
>
>
> *To: *"user@impala.apache.org" 
> *Subject: *Re: Debugging Impala query that consistently hangs
>
>
>
> Piyush,
>
>
>
>   I can't recommend in strong enough terms that you figure out how to get
> compute stats working. You will not have a good experience with Impala
> without statistics - there's no way you will get good plans for all your
> queries.
>
>
>
> - Tim
>
>
>
> On Fri, Feb 9, 2018 at 11:25 AM, Piyush Narang 
> wrote:
>
> Thanks Tim. I had issues running compute stats on some of our tables
> (calling alter table on Hive was failing and I wasn’t able to resolve it)
> and I think this was one of them. I’ll try switching over to a shuffle join
> and see if that helps.
>
>
>
> -- Piyush
>
>
>
>
>
> *From: *Tim Armstrong 
> *Reply-To: *"user@impala.apache.org" 
> *Date: *Friday, February 9, 2018 at 12:24 PM
>
>
> *To: *"user@impala.apache.org" 
> *Subject: *Re: Debugging Impala query that consistently hangs
>
>
>
> I suspect it's busy building the hash tables in the join with id=7. If you
> drill down into the profile I suspect you'll see a bunch of time spent
> there. The top-level time counter isn't necessarily updated live for the
> time spent building the hash tables, but the fact it's using 179GB of
> memory is a big hint that it's building some big hash tables.
>
>
>
> The plan you're getting is really terrible btw. That join has > 2B rows on
> the right side and 0 rows on the left side, which is the exact opposite of
> what you want.
>
>
>
> I'd suggest running compute stats on the input tables to get a better
> 

Re: Computing stats on big partitioned parquet tables

2018-01-18 Thread Alexander Behm
The documentation has good overview of the limitations and caveats:
https://impala.apache.org/docs/build/html/topics/impala_perf_stats.html#perf_stats_incremental

On Thu, Jan 18, 2018 at 7:29 PM, Fawze Abujaber  wrote:

> Hi,
>
> I didn't see any limitations in the documentation of incremental compute
> stats.
>
> Is it a size limit or a memory limit (200 MB)?
>
> Why should compute stats succeed and incremental compute stats not?
>
> I'm upgrading my cluster on Sunday, as incremental compute stats was
> one of the incentives :(
>
> On Fri, 19 Jan 2018 at 4:13 Mostafa Mokhtar  wrote:
>
>> Hi,
>>
>> Do you mind sharing the query profile for the query that failed with OOM?
>> there should be some clues on to why the OOM is happening.
>>
>> Thanks
>> Mostafa
>>
>>
>> On Thu, Jan 18, 2018 at 5:54 PM, Thoralf Gutierrez <
>> thoralfgutier...@gmail.com> wrote:
>>
>>> Hello everybody!
>>>
>>> (I am using Impala 2.8.0, out of Cloudera Express 5.11.1)
>>>
>>> I now understand that we are _highly_ recommended to compute stats for
>>> our tables so I have decided to make sure we do.
>>>
>>> On my quest to do so, I started with a first `COMPUTE INCREMENTAL STATS
>>> my_big_partitioned_parquet_table` and ran into:
>>>
>>> > HiveServer2Error: AnalysisException: Incremental stats size estimate
>>> exceeds 200.00MB. Please try COMPUTE STATS instead.
>>>
>>> I found out that we could increase this limit, so I set
>>> inc_stats_size_limit_bytes to 1073741824 (1GB)
>>>
>>> > HiveServer2Error: AnalysisException: Incremental stats size estimate
>>> exceeds 1.00GB. Please try COMPUTE STATS instead.
>>>
>>> So I ended up trying to COMPUTE STATS for the whole table instead of
>>> incrementally, but I still hit memory limits when computing counts with my
>>> mem_limit at 34359738368 (32GB)
>>>
>>> > Process: memory limit exceeded. Limit=32.00 GB Total=48.87 GB
>>> Peak=51.97 GB
>>>
>>> 1. Am I correct to assume that even if I did not have enough memory, the
>>> query should spill to disk and just be slower instead of OOMing?
>>> 2. Any other recommendation on how else I could go about computing some
>>> stats on my big partitioned parquet table?
>>>
>>> Thanks a lot!
>>> Thoralf
>>>
>>>
>>


Re: Issues running compute incremental stats in Impala - alter not possible

2018-01-04 Thread Alexander Behm
Thanks for the update. Please let us know if you find out what happened on
the Hive side. We might be able to help.

On Thu, Jan 4, 2018 at 10:26 AM, Piyush Narang <p.nar...@criteo.com> wrote:

> Seems like my attempt to compute full stats for this table failed as well.
> Like Mostafa pointed out the bulk of the overhead was indeed in the select
> ndv(c1), … query. The query ends up spending over 5 hours there.
> Unfortunately, it seems to fail on the Hive metastore update. Digging into
> that with some folks on our end.
>
>
>
> -- Piyush
>
>
>
>
>
> *From: *Piyush Narang <p.nar...@criteo.com>
> *Reply-To: *"user@impala.apache.org" <user@impala.apache.org>
> *Date: *Wednesday, January 3, 2018 at 2:49 PM
>
> *To: *"user@impala.apache.org" <user@impala.apache.org>
> *Subject: *Re: Issues running compute incremental stats in Impala - alter
> not possible
>
>
>
> Thanks for pointing this out. Kicked off a run of this. Shall get back
> with breakdowns and how it goes.
>
>
>
> -- Piyush
>
>
>
>
>
> *From: *Mostafa Mokhtar <mmokh...@cloudera.com>
> *Reply-To: *"user@impala.apache.org" <user@impala.apache.org>
> *Date: *Wednesday, January 3, 2018 at 2:13 PM
> *To: *"user@impala.apache.org" <user@impala.apache.org>
> *Subject: *Re: Issues running compute incremental stats in Impala - alter
> not possible
>
>
>
> When you looked at the Web-UI while compute stats was running what did you
> find?
>
>
>
> In general compute stats has 3 phases
>
>1. Get row count per partition using something like select count(*),
>partition_id from foo group by partition_id
>2. SELECT NDV(C1), Max size (C1), Avg size (C1), ... NDV(CN), Max size
>(CN), Avg size (CN) from foo
>3. Persist the captured stats in Hive Meta store
>
>
>
> For steps #1 and #2 you should be able to get a good read on progress
> using the "Scan Progress" column.
>
> For step 3, if you tail /var/log/catalogd/catalogd.INFO you should see the
> progress there.
>
>
>
> And if the table is large in terms of on-disk size, I expect step #2 to
> dominate the time.
>
>
>
> If the operation is CPU bound rather than IO bound, increasing mt_dop
> should give you a good speedup, but I don't recommend a value greater than 16.
>
>
>
>
>
> On Wed, Jan 3, 2018 at 11:03 AM, Piyush Narang <p.nar...@criteo.com>
> wrote:
>
> Thanks Alex and Mostafa. I tried running a full compute stats a couple of
> weeks back on this table and it was still going 4 hours later (and I didn’t
> see any progress indication on the Impala web UI). I’ll try and hunt
> through the Hive metastore log files and see if I can find anything.
>
>
>
> Is there something specific you’re looking for in the show create table
> output? I can dump that here (rather than the full table definition and
> details which is pretty verbose and I might need to check if it’s ok to
> share externally).
>
>
>
> -- Piyush
>
>
>
>
>
> *From: *Mostafa Mokhtar <mmokh...@cloudera.com>
> *Reply-To: *"user@impala.apache.org" <user@impala.apache.org>
> *Date: *Wednesday, January 3, 2018 at 1:48 PM
> *To: *"user@impala.apache.org" <user@impala.apache.org>
> *Subject: *Re: Issues running compute incremental stats in Impala - alter
> not possible
>
>
>
> Also check the Hive Metastore log files.
>
>
>
> In general, if the table has a large number of partitions, incremental stats
> will have very large overhead in terms of metadata.
>
>
>
> I would recommend running "compute stats bi_full" and then manually setting
> the row count for newly added partitions whenever possible.
>
>
>
> On Wed, Jan 3, 2018 at 10:36 AM, Alexander Behm <alex.b...@cloudera.com>
> wrote:
>
> Thanks for the report. I have not seen this issue. Looks like the alter
> RPC is rejected by the Hive Metastore. Maybe looking into the
> Hive/Metastore logs would help.
>
>
>
> The SHOW CREATE TABLE output might also help us debug.
>
>
>
> On Wed, Jan 3, 2018 at 10:28 AM, Piyush Narang <p.nar...@criteo.com>
> wrote:
>
> Hi folks,
>
>
>
> I’m running into some issues when I try to compute incremental stats in
> Impala that I was hoping someone would be able to help with. I’m able to
> ‘compute stats’ in Impala on my smaller tables just fine. When I try
> computing stats incrementally for one of my larger tables, I seem to be
> running into this error:
>
> > compute incremental stats bi_full partition (param1=0,day='2017-10-04',
> hour=00,host_platform='EU');
>
> Query: compute incremental stats bi_full partition
> (param1=0,day='2017-10-04',hour=00,host_platform='EU')
>
> WARNINGS: ImpalaRuntimeException: Error making 'alter_partitions' RPC to
> Hive Metastore:
>
> CAUSED BY: InvalidOperationException: alter is not possible
>
>
>
> Looking at impalad.INFO and catalogd.INFO I don’t see any additional
> details. I verified that I’m the owner of the tables in HDFS.
>
>
>
> Has anyone run into this issue in the past? Any workarounds?
>
>
>
> Thanks,
>
>
>
> -- Piyush
>
>
>
>
>
>
>
>
>


Re: Reserving standard SQL keywords next Impala release (IMPALA-3916)

2017-12-12 Thread Alexander Behm
Of course there's always the option of committing to master and then
reverting it in point-release branches, but that means we'll need to remember
to do that.

On Tue, Dec 12, 2017 at 11:03 AM, Alexander Behm <alex.b...@cloudera.com>
wrote:

> I meant doing it in a point release.
>
> On Tue, Dec 12, 2017 at 11:02 AM, Dimitris Tsirogiannis <
> dtsirogian...@cloudera.com> wrote:
>
>> I think this is a good idea. Maybe we should do it in the next major
>> release (v3) instead of a point release, unless that's what you meant.
>>
>> Dimitris
>>
>> On Tue, Dec 12, 2017 at 10:57 AM, Alexander Behm <alex.b...@cloudera.com>
>> wrote:
>>
>>> Reserving standard SQL keywords seems like a reasonable thing to do, but
>>> it
>>> is an incompatible change. I think it should be ok to include the change
>>> in
>>> the next Impala release (whatever comes after 2.11), but wanted to hear
>>> other opinions.
>>>
>>> See:
>>> https://issues.apache.org/jira/browse/IMPALA-3916
>>>
>>
>>
>


Reserving standard SQL keywords next Impala release (IMPALA-3916)

2017-12-12 Thread Alexander Behm
Reserving standard SQL keywords seems like a reasonable thing to do, but it
is an incompatible change. I think it should be ok to include the change in
the next Impala release (whatever comes after 2.11), but wanted to hear
other opinions.

See:
https://issues.apache.org/jira/browse/IMPALA-3916


Re: Any plans for approximate topN query?

2017-11-28 Thread Alexander Behm
Agree that the techniques (approximation and sampling) are different and
complementary.

Our current user base tends to require exact query responses, so this is a
direction we have not seriously explored.

You are certainly welcome to flesh out your ideas in more detail and
propose/make a contribution! Perhaps other members of the community agree
with you and are willing to help.
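
For anyone skimming the archive, the exact-answer pattern being discussed
(which an approximate topN would speed up) is the usual top-N aggregation;
the column and table names here are made up:

select url, count(*) as pageview
from web_events
group by url
order by pageview desc
limit 10;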


On Tue, Nov 28, 2017 at 6:24 PM, Jason Heo  wrote:

> Hi, Jeszy
>
> Thank you for your reply.
>
> My understanding is that you're mentioning sampling.
>
> Although both topN and sampling are approximate techniques for making
> queries run faster, I think they are different concepts.
>
> Using topN, by returning only N aggregated items from each node, we can
> eliminate the expensive shuffle operation, whereas sampling can reduce the
> amount of input data.
>
> topN can be used without sampling, and sampling can be used without topN,
> and they can be used at the same time.
>
> My experiment on Druid 0.10.0 over my dataset shows that "topN without
> sampling" is 100 times faster than GroupBy & OrderBy, and "topN with
> sampling" is 200 times faster than GroupBy & OrderBy.
>
> Currently not many distributed SQL engines support topN; by implementing
> topN, Impala could be adopted by many types of analytic systems.
>
> Thanks.
>
> Regards,
>
> Jason
>
>
> 2017-11-28 23:19 GMT+09:00 Jeszy :
>
>> Hello Jason,
>>
>> IMPALA-5300 (https://issues.apache.org/jira/browse/IMPALA-5300) is in
>> the works, and I think it fits your use case. Can you take a look?
>>
>> Thanks!
>>
>> On 28 November 2017 at 15:11, Jason Heo  wrote:
>> > Hi,
>> >
>> > I'm wondering whether the Impala team has any plans for an approximate
>> > topN for a single dimension.
>> >
>> > My Web analytics system mostly serves top-n URLs. Such a "GROUP BY url
>> > ORDER BY pageview LIMIT n" is slow, especially for a high-cardinality field.
>> > Approximate topN can be used instead of GroupBy for a single dimension
>> > with much lower latency.
>> >
>> > Elasticsearch, Druid, and ClickHouse already provide this feature.
>> >
>> > It would be great if I could use it in Impala.
>> >
>> > Thanks.
>> >
>> > Regards,
>> >
>> > Jason
>>
>
>