Folks, thanks for your help.  I've narrowed the problem down to
compression.  When I set hive.exec.compress.output=false, merges
proceed as expected.  When compression is on, the merge job doesn't
seem to actually merge; it just spits out the input.
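
For anyone reproducing this, here is a rough sketch of a session in
which the merge stage should take effect.  The merge thresholds are the
values from the start of this thread, not tuned recommendations; the
dynamic-partition lines are an assumption based on the query using
partition (ds); and the avgsize property is written in the
hive.merge.smallfiles.avgsize spelling, one of the two spellings used in
this thread.

-- workaround: turn off output compression so the merge job really merges
set hive.exec.compress.output=false;
-- ask Hive to schedule a merge pass after map-only and map-reduce jobs
set hive.merge.mapfiles=true;
set hive.merge.mapredfiles=true;
-- thresholds quoted at the start of the thread (assumed unchanged)
set hive.merge.size.per.task=256000000;
set hive.merge.smallfiles.avgsize=16000000;
-- assumption: needed because the insert uses a dynamic partition
set hive.exec.dynamic.partition=true;
set hive.exec.dynamic.partition.mode=nonstrict;

The trade-off is that the merged files are written uncompressed, so
this is a workaround rather than a fix for merging compressed output.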

On Fri, Nov 19, 2010 at 10:51 AM, yongqiang he <heyongqiang...@gmail.com> wrote:
> These are the parameters that control the behavior. (Try to set them
> to different values if it does not work in your environment.)
>
> set hive.input.format=org.apache.hadoop.hive.ql.io.CombineHiveInputFormat;
> set mapred.min.split.size.per.node=1000000000;
> set mapred.min.split.size.per.rack=1000000000;
> set mapred.max.split.size=1000000000;
>
> set hive.merge.size.per.task=1000000000;
> set hive.merge.smallfiles.avgsize=1000000000;
> set hive.merge.size.smallfiles.avgsize=1000000000;
> set hive.exec.dynamic.partition.mode=nonstrict;
>
>
> The output size of the second job is also controlled by the split
> size, as shown in the first 4 lines.
>
>
> On Fri, Nov 19, 2010 at 10:22 AM, Leo Alekseyev <dnqu...@gmail.com> wrote:
>> I'm using Hadoop 0.20.2.  Merge jobs (with static partitions) have
>> worked for me in the past.  Again, what's strange here is with the
>> latest Hive build the merge stage appears to run, but it doesn't
>> actually merge -- it's a quick map-only job that, near as I can tell,
>> doesn't do anything.
>>
>> On Fri, Nov 19, 2010 at 6:14 AM, Dave Brondsema <dbronds...@geek.net> wrote:
>>> What version of Hadoop are you on?
>>>
>>> On Thu, Nov 18, 2010 at 10:48 PM, Leo Alekseyev <dnqu...@gmail.com> wrote:
>>>>
>>>> I thought I was running Hive with those changes merged in, but to make
>>>> sure, I built the latest trunk version.  The behavior changed somewhat
>>>> (as in, it runs 2 stages instead of 1), but it still generates the
>>>> same number of files (# of files generated is equal to the number of
>>>> the original mappers, so I have no idea what the second stage is
>>>> actually doing).
>>>>
>>>> See below for query / explain query.  Stage 1 runs always; Stage 3
>>>> runs if hive.merge.mapfiles=true is set, but it still generates lots
>>>> of small files.
>>>>
>>>> The query is kind of large, but in essence it's simply
>>>> insert overwrite table foo partition(bar) select [columns] from
>>>> [table] tablesample(bucket 1 out of 10000 on rand()) where
>>>> [conditions].
>>>>
>>>>
>>>> explain insert overwrite table hbase_prefilter3_us_sample partition
>>>> (ds) select
>>>> server_host,client_ip,time_stamp,concat(server_host,':',regexp_extract(request_url,'/[^/]+/[^/]+/([^/]+)$',1)),referrer,parse_url(referrer,'HOST'),user_agent,cookie,geoip_int(client_ip,
>>>> 'COUNTRY_CODE',  './GeoIP.dat'),'',ds from alogs_master
>>>> TABLESAMPLE(BUCKET 1 OUT OF 10000 ON rand()) am_s where
>>>> am_s.ds='2010-11-05' and am_s.request_url rlike
>>>> '^/img[0-9]+/[0-9]+/[^.]+\.(png|jpg|gif|mp4|swf)$' and
>>>> geoip_int(am_s.client_ip, 'COUNTRY_CODE',  './GeoIP.dat')='US';
>>>> OK
>>>> ABSTRACT SYNTAX TREE:
>>>>  (TOK_QUERY (TOK_FROM (TOK_TABREF alogs_master (TOK_TABLESAMPLE 1
>>>> 10000 (TOK_FUNCTION rand)) am_s)) (TOK_INSERT (TOK_DESTINATION
>>>> (TOK_TAB hbase_prefilter3_us_sample (TOK_PARTSPEC (TOK_PARTVAL ds))))
>>>> (TOK_SELECT (TOK_SELEXPR (TOK_TABLE_OR_COL server_host)) (TOK_SELEXPR
>>>> (TOK_TABLE_OR_COL client_ip)) (TOK_SELEXPR (TOK_TABLE_OR_COL
>>>> time_stamp)) (TOK_SELEXPR (TOK_FUNCTION concat (TOK_TABLE_OR_COL
>>>> server_host) ':' (TOK_FUNCTION regexp_extract (TOK_TABLE_OR_COL
>>>> request_url) '/[^/]+/[^/]+/([^/]+)$' 1))) (TOK_SELEXPR
>>>> (TOK_TABLE_OR_COL referrer)) (TOK_SELEXPR (TOK_FUNCTION parse_url
>>>> (TOK_TABLE_OR_COL referrer) 'HOST')) (TOK_SELEXPR (TOK_TABLE_OR_COL
>>>> user_agent)) (TOK_SELEXPR (TOK_TABLE_OR_COL cookie)) (TOK_SELEXPR
>>>> (TOK_FUNCTION geoip_int (TOK_TABLE_OR_COL client_ip) 'COUNTRY_CODE'
>>>> './GeoIP.dat')) (TOK_SELEXPR '') (TOK_SELEXPR (TOK_TABLE_OR_COL ds)))
>>>> (TOK_WHERE (and (and (= (. (TOK_TABLE_OR_COL am_s) ds) '2010-11-05')
>>>> (rlike (. (TOK_TABLE_OR_COL am_s) request_url)
>>>> '^/img[0-9]+/[0-9]+/[^.]+\.(png|jpg|gif|mp4|swf)$')) (= (TOK_FUNCTION
>>>> geoip_int (. (TOK_TABLE_OR_COL am_s) client_ip) 'COUNTRY_CODE'
>>>> './GeoIP.dat') 'US')))))
>>>>
>>>> STAGE DEPENDENCIES:
>>>>  Stage-1 is a root stage
>>>>  Stage-5 depends on stages: Stage-1 , consists of Stage-4, Stage-3
>>>>  Stage-4
>>>>  Stage-0 depends on stages: Stage-4, Stage-3
>>>>  Stage-2 depends on stages: Stage-0
>>>>  Stage-3
>>>>
>>>> STAGE PLANS:
>>>>  Stage: Stage-1
>>>>    Map Reduce
>>>>      Alias -> Map Operator Tree:
>>>>        am_s
>>>>          TableScan
>>>>            alias: am_s
>>>>            Filter Operator
>>>>              predicate:
>>>>                  expr: (((hash(rand()) & 2147483647) % 10000) = 0)
>>>>                  type: boolean
>>>>              Filter Operator
>>>>                predicate:
>>>>                    expr: ((request_url rlike
>>>> '^/img[0-9]+/[0-9]+/[^.]+.(png|jpg|gif|mp4|swf)$') and
>>>> (GenericUDFGeoIP ( client_ip, 'COUNTRY_CODE', './GeoIP.dat' ) = 'US'))
>>>>                    type: boolean
>>>>                Filter Operator
>>>>                  predicate:
>>>>                      expr: (((ds = '2010-11-05') and (request_url
>>>> rlike '^/img[0-9]+/[0-9]+/[^.]+.(png|jpg|gif|mp4|swf)$')) and
>>>> (GenericUDFGeoIP ( client_ip, 'COUNTRY_CODE', './GeoIP.dat' ) = 'US'))
>>>>                      type: boolean
>>>>                  Select Operator
>>>>                    expressions:
>>>>                          expr: server_host
>>>>                          type: string
>>>>                          expr: client_ip
>>>>                          type: int
>>>>                          expr: time_stamp
>>>>                          type: int
>>>>                          expr: concat(server_host, ':',
>>>> regexp_extract(request_url, '/[^/]+/[^/]+/([^/]+)$', 1))
>>>>                          type: string
>>>>                          expr: referrer
>>>>                          type: string
>>>>                          expr: parse_url(referrer, 'HOST')
>>>>                          type: string
>>>>                          expr: user_agent
>>>>                          type: string
>>>>                          expr: cookie
>>>>                          type: string
>>>>                          expr: GenericUDFGeoIP ( client_ip,
>>>> 'COUNTRY_CODE', './GeoIP.dat' )
>>>>                          type: string
>>>>                          expr: ''
>>>>                          type: string
>>>>                          expr: ds
>>>>                          type: string
>>>>                    outputColumnNames: _col0, _col1, _col2, _col3,
>>>> _col4, _col5, _col6, _col7, _col8, _col9, _col10
>>>>                    File Output Operator
>>>>                      compressed: true
>>>>                      GlobalTableId: 1
>>>>                      table:
>>>>                          input format:
>>>> org.apache.hadoop.mapred.TextInputFormat
>>>>                          output format:
>>>> org.apache.hadoop.hive.ql.io.HiveIgnoreKeyTextOutputFormat
>>>>                          serde:
>>>> org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe
>>>>                          name: hbase_prefilter3_us_sample
>>>>
>>>>  Stage: Stage-5
>>>>    Conditional Operator
>>>>
>>>>  Stage: Stage-4
>>>>    Move Operator
>>>>      files:
>>>>          hdfs directory: true
>>>>          destination:
>>>>
>>>> hdfs://namenode.imageshack.us:9000/tmp/hive-hadoop/hive_2010-11-18_17-58-36_843_6726655151866456030/-ext-10000
>>>>
>>>>  Stage: Stage-0
>>>>    Move Operator
>>>>      tables:
>>>>          partition:
>>>>            ds
>>>>          replace: true
>>>>          table:
>>>>              input format: org.apache.hadoop.mapred.TextInputFormat
>>>>              output format:
>>>> org.apache.hadoop.hive.ql.io.HiveIgnoreKeyTextOutputFormat
>>>>              serde: org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe
>>>>              name: hbase_prefilter3_us_sample
>>>>
>>>>  Stage: Stage-2
>>>>    Stats-Aggr Operator
>>>>
>>>>  Stage: Stage-3
>>>>    Map Reduce
>>>>      Alias -> Map Operator Tree:
>>>>
>>>>  hdfs://namenode.imageshack.us:9000/tmp/hive-hadoop/hive_2010-11-18_17-58-36_843_6726655151866456030/-ext-10002
>>>>            File Output Operator
>>>>              compressed: true
>>>>              GlobalTableId: 0
>>>>              table:
>>>>                  input format: org.apache.hadoop.mapred.TextInputFormat
>>>>                  output format:
>>>> org.apache.hadoop.hive.ql.io.HiveIgnoreKeyTextOutputFormat
>>>>                  serde: org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe
>>>>                  name: hbase_prefilter3_us_sample
>>>>
>>>>
>>>>
>>>>
>>>> On Thu, Nov 18, 2010 at 3:44 PM, Ning Zhang <nzh...@fb.com> wrote:
>>>> > I see. If you are using dynamic partitions, HIVE-1307 and HIVE-1622 need
>>>> > to be there for merging to take place. HIVE-1307 was committed to trunk
>>>> > on 08/25 and HIVE-1622 was committed on 09/13. The simplest way is to
>>>> > update your Hive trunk and rerun the query. If it still doesn't work
>>>> > maybe you can post your query and the result of 'explain <query>' and we
>>>> > can take a look.
>>>> >
>>>> > Ning
>>>> >
>>>> > On Nov 18, 2010, at 2:57 PM, Leo Alekseyev wrote:
>>>> >
>>>> >> Hi Ning,
>>>> >> For the dataset I'm experimenting with, the total size of the output
>>>> >> is 2mb, and the files are at most a few kb in size.  My
>>>> >> hive.input.format was set to default HiveInputFormat; however, when I
>>>> >> set it to CombineHiveInputFormat, it only made the first stage of the
>>>> >> job use fewer mappers.  The merge job was *still* filtered out at
>>>> >> runtime.  I also tried set hive.mergejob.maponly=false; that didn't
>>>> >> have any effect.
>>>> >>
>>>> >> I am a bit at a loss what to do here.  Is there a way to see what's
>>>> >> going on exactly using e.g. debug log levels?..  Btw, I'm also using
>>>> >> dynamic partitions; could that somehow be interfering with the merge
>>>> >> job?..
>>>> >>
>>>> >> I'm running a relatively fresh Hive from trunk (built maybe a month
>>>> >> ago).
>>>> >>
>>>> >> --Leo
>>>> >>
>>>> >> On Thu, Nov 18, 2010 at 1:12 PM, Ning Zhang <nzh...@fb.com> wrote:
>>>> >>> The settings look good. The parameter
>>>> >>> hive.merge.size.smallfiles.avgsize is used to determine at run time if
>>>> >>> a merge should be triggered: if the average size of the files in the
>>>> >>> partition is SMALLER than the parameter and there is more than 1 file,
>>>> >>> the merge should be scheduled. Can you check whether you also have any
>>>> >>> big files in your resulting partition? If a very large file is raising
>>>> >>> the average, you can set the parameter large enough to compensate.
>>>> >>>
>>>> >>> Another possibility is that your Hadoop installation does not support
>>>> >>> CombineHiveInputFormat, which is used for the new merge job. Someone
>>>> >>> reported previously that a merge was not successful because of this. If
>>>> >>> that's the case, you can turn off CombineHiveInputFormat and use the old
>>>> >>> HiveInputFormat (though slower) by setting hive.mergejob.maponly=false.
>>>> >>>
>>>> >>> Ning
>>>> >>> On Nov 17, 2010, at 6:00 PM, Leo Alekseyev wrote:
>>>> >>>
>>>> >>>> I have jobs that sample (or generate) a small amount of data from a
>>>> >>>> large table.  At the end, I get e.g. about 3000 or more files of 1kb
>>>> >>>> or so.  This becomes a nuisance.  How can I make Hive do another pass
>>>> >>>> to merge the output?  I have the following settings:
>>>> >>>>
>>>> >>>> hive.merge.mapfiles=true
>>>> >>>> hive.merge.mapredfiles=true
>>>> >>>> hive.merge.size.per.task=256000000
>>>> >>>> hive.merge.size.smallfiles.avgsize=16000000
>>>> >>>>
>>>> >>>> After setting hive.merge* to true, Hive started indicating "Total
>>>> >>>> MapReduce jobs = 2".  However, after generating the
>>>> >>>> lots-of-small-files table, Hive says:
>>>> >>>> Ended Job = job_201011021934_1344
>>>> >>>> Ended Job = 781771542, job is filtered out (removed at runtime).
>>>> >>>>
>>>> >>>> Is there a way to force the merge, or am I missing something?
>>>> >>>> --Leo
>>>> >>>
>>>> >>>
>>>> >
>>>> >
>>>
>>>
>>>
>>> --
>>> Dave Brondsema
>>> Software Engineer
>>> Geeknet
>>>
>>> www.geek.net
>>>
>>
>
