Folks, thanks for your help. I've narrowed the problem down to compression. When I set hive.exec.compress.output=false, merges proceed as expected. When compression is on, the merge job doesn't seem to actually merge; it just spits out the input.
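
For anyone skimming this thread later, the knobs involved boil down to roughly the following sketch (the values are the ones quoted further down in the thread and may need tuning for your data sizes; the comments are only a summary of what each setting does):

-- workaround from this thread: with output compression off, the merge stage
-- actually combines the small files instead of passing them through
set hive.exec.compress.output=false;
-- merge small files produced by map-only and by map-reduce jobs
set hive.merge.mapfiles=true;
set hive.merge.mapredfiles=true;
-- target size of merged files, and the average-size threshold that triggers a merge
set hive.merge.size.per.task=256000000;
set hive.merge.smallfiles.avgsize=16000000;
-- needed for the dynamic-partition insert used here
set hive.exec.dynamic.partition.mode=nonstrict;

The insert itself is unchanged; it's the same dynamic-partition tablesample query shown in the explain output quoted below.
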
On Fri, Nov 19, 2010 at 10:51 AM, yongqiang he <heyongqiang...@gmail.com> wrote:
> These are the parameters that control the behavior. (Try setting them
> to different values if it does not work in your environment.)
>
> set hive.input.format=org.apache.hadoop.hive.ql.io.CombineHiveInputFormat;
> set mapred.min.split.size.per.node=1000000000;
> set mapred.min.split.size.per.rack=1000000000;
> set mapred.max.split.size=1000000000;
>
> set hive.merge.size.per.task=1000000000;
> set hive.merge.smallfiles.avgsize=1000000000;
> set hive.merge.size.smallfiles.avgsize=1000000000;
> set hive.exec.dynamic.partition.mode=nonstrict;
>
> The output size of the second job is also controlled by the split
> size, as shown in the first 4 lines.
>
> On Fri, Nov 19, 2010 at 10:22 AM, Leo Alekseyev <dnqu...@gmail.com> wrote:
>> I'm using Hadoop 0.20.2. Merge jobs (with static partitions) have
>> worked for me in the past. Again, what's strange here is that with the
>> latest Hive build the merge stage appears to run, but it doesn't
>> actually merge -- it's a quick map-only job that, as near as I can tell,
>> doesn't do anything.
>>
>> On Fri, Nov 19, 2010 at 6:14 AM, Dave Brondsema <dbronds...@geek.net> wrote:
>>> What version of Hadoop are you on?
>>>
>>> On Thu, Nov 18, 2010 at 10:48 PM, Leo Alekseyev <dnqu...@gmail.com> wrote:
>>>>
>>>> I thought I was running Hive with those changes merged in, but to make
>>>> sure, I built the latest trunk version. The behavior changed somewhat
>>>> (it now runs 2 stages instead of 1), but it still generates the same
>>>> number of files (the number of files generated equals the number of
>>>> original mappers, so I have no idea what the second stage is actually
>>>> doing).
>>>>
>>>> See below for the query and its explain output. Stage 1 always runs;
>>>> Stage 3 runs if hive.merge.mapfiles=true is set, but it still generates
>>>> lots of small files.
>>>>
>>>> The query is kind of large, but in essence it's simply:
>>>> insert overwrite table foo partition(bar) select [columns] from [table]
>>>> tablesample(bucket 1 out of 10000 on rand()) where [conditions].
>>>>
>>>>
>>>> explain insert overwrite table hbase_prefilter3_us_sample partition (ds)
>>>> select server_host, client_ip, time_stamp,
>>>>        concat(server_host, ':', regexp_extract(request_url, '/[^/]+/[^/]+/([^/]+)$', 1)),
>>>>        referrer, parse_url(referrer, 'HOST'), user_agent, cookie,
>>>>        geoip_int(client_ip, 'COUNTRY_CODE', './GeoIP.dat'), '', ds
>>>> from alogs_master TABLESAMPLE(BUCKET 1 OUT OF 10000 ON rand()) am_s
>>>> where am_s.ds='2010-11-05'
>>>>   and am_s.request_url rlike '^/img[0-9]+/[0-9]+/[^.]+\.(png|jpg|gif|mp4|swf)$'
>>>>   and geoip_int(am_s.client_ip, 'COUNTRY_CODE', './GeoIP.dat')='US';
>>>> OK
>>>> ABSTRACT SYNTAX TREE:
>>>>   (TOK_QUERY (TOK_FROM (TOK_TABREF alogs_master (TOK_TABLESAMPLE 1 10000
>>>>   (TOK_FUNCTION rand)) am_s)) (TOK_INSERT (TOK_DESTINATION (TOK_TAB
>>>>   hbase_prefilter3_us_sample (TOK_PARTSPEC (TOK_PARTVAL ds)))) (TOK_SELECT
>>>>   (TOK_SELEXPR (TOK_TABLE_OR_COL server_host)) (TOK_SELEXPR
>>>>   (TOK_TABLE_OR_COL client_ip)) (TOK_SELEXPR (TOK_TABLE_OR_COL time_stamp))
>>>>   (TOK_SELEXPR (TOK_FUNCTION concat (TOK_TABLE_OR_COL server_host) ':'
>>>>   (TOK_FUNCTION regexp_extract (TOK_TABLE_OR_COL request_url)
>>>>   '/[^/]+/[^/]+/([^/]+)$' 1))) (TOK_SELEXPR (TOK_TABLE_OR_COL referrer))
>>>>   (TOK_SELEXPR (TOK_FUNCTION parse_url (TOK_TABLE_OR_COL referrer) 'HOST'))
>>>>   (TOK_SELEXPR (TOK_TABLE_OR_COL user_agent)) (TOK_SELEXPR
>>>>   (TOK_TABLE_OR_COL cookie)) (TOK_SELEXPR (TOK_FUNCTION geoip_int
>>>>   (TOK_TABLE_OR_COL client_ip) 'COUNTRY_CODE' './GeoIP.dat')) (TOK_SELEXPR '')
>>>>   (TOK_SELEXPR (TOK_TABLE_OR_COL ds))) (TOK_WHERE (and (and (=
>>>>   (. (TOK_TABLE_OR_COL am_s) ds) '2010-11-05') (rlike
>>>>   (. (TOK_TABLE_OR_COL am_s) request_url)
>>>>   '^/img[0-9]+/[0-9]+/[^.]+\.(png|jpg|gif|mp4|swf)$')) (= (TOK_FUNCTION
>>>>   geoip_int (. (TOK_TABLE_OR_COL am_s) client_ip) 'COUNTRY_CODE'
>>>>   './GeoIP.dat') 'US')))))
>>>>
>>>> STAGE DEPENDENCIES:
>>>>   Stage-1 is a root stage
>>>>   Stage-5 depends on stages: Stage-1 , consists of Stage-4, Stage-3
>>>>   Stage-4
>>>>   Stage-0 depends on stages: Stage-4, Stage-3
>>>>   Stage-2 depends on stages: Stage-0
>>>>   Stage-3
>>>>
>>>> STAGE PLANS:
>>>>   Stage: Stage-1
>>>>     Map Reduce
>>>>       Alias -> Map Operator Tree:
>>>>         am_s
>>>>           TableScan
>>>>             alias: am_s
>>>>             Filter Operator
>>>>               predicate:
>>>>                   expr: (((hash(rand()) & 2147483647) % 10000) = 0)
>>>>                   type: boolean
>>>>               Filter Operator
>>>>                 predicate:
>>>>                     expr: ((request_url rlike '^/img[0-9]+/[0-9]+/[^.]+.(png|jpg|gif|mp4|swf)$')
>>>>                           and (GenericUDFGeoIP ( client_ip, 'COUNTRY_CODE', './GeoIP.dat' ) = 'US'))
>>>>                     type: boolean
>>>>                 Filter Operator
>>>>                   predicate:
>>>>                       expr: (((ds = '2010-11-05') and (request_url rlike
>>>>                             '^/img[0-9]+/[0-9]+/[^.]+.(png|jpg|gif|mp4|swf)$')) and
>>>>                             (GenericUDFGeoIP ( client_ip, 'COUNTRY_CODE', './GeoIP.dat' ) = 'US'))
>>>>                       type: boolean
>>>>                   Select Operator
>>>>                     expressions:
>>>>                           expr: server_host
>>>>                           type: string
>>>>                           expr: client_ip
>>>>                           type: int
>>>>                           expr: time_stamp
>>>>                           type: int
>>>>                           expr: concat(server_host, ':', regexp_extract(request_url, '/[^/]+/[^/]+/([^/]+)$', 1))
>>>>                           type: string
>>>>                           expr: referrer
>>>>                           type: string
>>>>                           expr: parse_url(referrer, 'HOST')
>>>>                           type: string
>>>>                           expr: user_agent
>>>>                           type: string
>>>>                           expr: cookie
>>>>                           type: string
>>>>                           expr: GenericUDFGeoIP ( client_ip, 'COUNTRY_CODE', './GeoIP.dat' )
>>>>                           type: string
>>>>                           expr: ''
>>>>                           type: string
>>>>                           expr: ds
>>>>                           type: string
>>>>                     outputColumnNames: _col0, _col1, _col2, _col3, _col4, _col5, _col6, _col7, _col8, _col9, _col10
>>>>                     File Output Operator
>>>>                       compressed: true
>>>>                       GlobalTableId: 1
>>>>                       table:
>>>>                           input format: org.apache.hadoop.mapred.TextInputFormat
>>>>                           output format: org.apache.hadoop.hive.ql.io.HiveIgnoreKeyTextOutputFormat
>>>>                           serde: org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe
>>>>                           name: hbase_prefilter3_us_sample
>>>>
>>>>   Stage: Stage-5
>>>>     Conditional Operator
>>>>
>>>>   Stage: Stage-4
>>>>     Move Operator
>>>>       files:
>>>>           hdfs directory: true
>>>>           destination: hdfs://namenode.imageshack.us:9000/tmp/hive-hadoop/hive_2010-11-18_17-58-36_843_6726655151866456030/-ext-10000
>>>>
>>>>   Stage: Stage-0
>>>>     Move Operator
>>>>       tables:
>>>>           partition:
>>>>             ds
>>>>           replace: true
>>>>           table:
>>>>               input format: org.apache.hadoop.mapred.TextInputFormat
>>>>               output format: org.apache.hadoop.hive.ql.io.HiveIgnoreKeyTextOutputFormat
>>>>               serde: org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe
>>>>               name: hbase_prefilter3_us_sample
>>>>
>>>>   Stage: Stage-2
>>>>     Stats-Aggr Operator
>>>>
>>>>   Stage: Stage-3
>>>>     Map Reduce
>>>>       Alias -> Map Operator Tree:
>>>>         hdfs://namenode.imageshack.us:9000/tmp/hive-hadoop/hive_2010-11-18_17-58-36_843_6726655151866456030/-ext-10002
>>>>             File Output Operator
>>>>               compressed: true
>>>>               GlobalTableId: 0
>>>>               table:
>>>>                   input format: org.apache.hadoop.mapred.TextInputFormat
>>>>                   output format: org.apache.hadoop.hive.ql.io.HiveIgnoreKeyTextOutputFormat
>>>>                   serde: org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe
>>>>                   name: hbase_prefilter3_us_sample
>>>>
>>>>
>>>>
>>>> On Thu, Nov 18, 2010 at 3:44 PM, Ning Zhang <nzh...@fb.com> wrote:
>>>> > I see. If you are using dynamic partitions, HIVE-1307 and HIVE-1622 need
>>>> > to be there for merging to take place. HIVE-1307 was committed to trunk
>>>> > on 08/25 and HIVE-1622 was committed on 09/13. The simplest way is to
>>>> > update your Hive trunk and rerun the query. If it still doesn't work,
>>>> > maybe you can post your query and the result of 'explain <query>' and
>>>> > we can take a look.
>>>> >
>>>> > Ning
>>>> >
>>>> > On Nov 18, 2010, at 2:57 PM, Leo Alekseyev wrote:
>>>> >
>>>> >> Hi Ning,
>>>> >> For the dataset I'm experimenting with, the total size of the output
>>>> >> is 2 MB, and the files are at most a few KB in size. My
>>>> >> hive.input.format was set to the default HiveInputFormat; however, when
>>>> >> I set it to CombineHiveInputFormat, it only made the first stage of the
>>>> >> job use fewer mappers. The merge job was *still* filtered out at
>>>> >> runtime. I also tried set hive.mergejob.maponly=false; that didn't
>>>> >> have any effect.
>>>> >>
>>>> >> I am a bit at a loss what to do here. Is there a way to see exactly
>>>> >> what's going on, e.g. with debug log levels?.. By the way, I'm also
>>>> >> using dynamic partitions; could that somehow be interfering with the
>>>> >> merge job?..
>>>> >>
>>>> >> I'm running a relatively fresh Hive from trunk (built maybe a month
>>>> >> ago).
>>>> >>
>>>> >> --Leo
>>>> >>
>>>> >> On Thu, Nov 18, 2010 at 1:12 PM, Ning Zhang <nzh...@fb.com> wrote:
>>>> >>> The settings look good. The parameter
>>>> >>> hive.merge.size.smallfiles.avgsize is used to determine at run time
>>>> >>> whether a merge should be triggered: if the average size of the files
>>>> >>> in the partition is SMALLER than the parameter and there is more than
>>>> >>> one file, the merge should be scheduled. Can you try to see if you
>>>> >>> have any big files as well in your resulting partition?
>>>> >>> If it is because of a very large file, you can set the parameter to a
>>>> >>> large enough value.
>>>> >>>
>>>> >>> Another possibility is that your Hadoop installation does not support
>>>> >>> CombineHiveInputFormat, which is used for the new merge job. Someone
>>>> >>> previously reported that the merge was not successful because of this.
>>>> >>> If that's the case, you can turn off CombineHiveInputFormat and use
>>>> >>> the old HiveInputFormat (though slower) by setting
>>>> >>> hive.mergejob.maponly=false.
>>>> >>>
>>>> >>> Ning
>>>> >>>
>>>> >>> On Nov 17, 2010, at 6:00 PM, Leo Alekseyev wrote:
>>>> >>>
>>>> >>>> I have jobs that sample (or generate) a small amount of data from a
>>>> >>>> large table. At the end, I get e.g. about 3000 or more files of 1 KB
>>>> >>>> or so. This becomes a nuisance. How can I make Hive do another pass
>>>> >>>> to merge the output? I have the following settings:
>>>> >>>>
>>>> >>>> hive.merge.mapfiles=true
>>>> >>>> hive.merge.mapredfiles=true
>>>> >>>> hive.merge.size.per.task=256000000
>>>> >>>> hive.merge.size.smallfiles.avgsize=16000000
>>>> >>>>
>>>> >>>> After setting hive.merge* to true, Hive started indicating "Total
>>>> >>>> MapReduce jobs = 2". However, after generating the
>>>> >>>> lots-of-small-files table, Hive says:
>>>> >>>> Ended Job = job_201011021934_1344
>>>> >>>> Ended Job = 781771542, job is filtered out (removed at runtime).
>>>> >>>>
>>>> >>>> Is there a way to force the merge, or am I missing something?
>>>> >>>> --Leo
>>>> >>>
>>>> >
>>>
>>> --
>>> Dave Brondsema
>>> Software Engineer
>>> Geeknet
>>>
>>> www.geek.net
>>
>
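
As a quick sanity check when trying these settings, one can count the files in the target partition from the Hive CLI before and after the job; the warehouse path below is only an example and should be replaced with your own location:

-- example path; adjust to your warehouse directory and partition
dfs -ls /user/hive/warehouse/hbase_prefilter3_us_sample/ds=2010-11-05;

If the merge ran, the file count should drop (ideally to files around hive.merge.size.per.task in size, or a single file for a tiny partition) rather than staying at one small file per original mapper.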