Hi community,
I have the following Pig script:
define FormatMessage com.cision.hadoop.pig.MessageFormatter();
--If you want the message to have no empty fields, use this instead:
--define FormatMessage com.cision.hadoop.pig.MessageFormatter('false');
dedupe = LOAD '/inflow/out/dedupe_out'
    USING org.apache.pig.piggybank.storage.avro.AvroStorage();
rmf /inflow/out/storesearch_tmp
rmf /inflow/out/search_out
search = MAPREDUCE '/opt/mapr/pig/pig-0.10.0/contrib/hadoop-0.0.1.jar'
    STORE dedupe INTO '/inflow/out/storesearch_tmp'
    USING org.apache.pig.piggybank.storage.avro.AvroStorage('schema',
'{"type":"record","name":"monitor_enriched_article","fields":[
{"name":"ssotmonitorid","type":"long"}
, {"name":"article","type":"string"}
, {"name":"path","type":"string"}
, {"name":"htmlcleanedarticle","type":"string"}
, {"name":"drmfingerprint","type":"int"}
, {"name":"media_guid","type":["null","string"]}
, {"name":"outletName","type":["null","string"]}
, {"name":"outletid","type":["null","string"]}
, {"name":"mediaId","type":["null","string"]}
, {"name":"pubdate","type":"string"}
, {"name":"pubname","type":"string"}
, {"name":"headline","type":"string"}
, {"name":"sourceid","type":"string"}
, {"name":"mark","type":"string"}
, {"name":"ruleId","type":"string"}
, {"name":"publicityvalue","type":["null", "string"]}
, {"name":"arbitronCumeEstimate","type":["null","string"]}
, {"name":"audience","type":["null","string"]}
, {"name":"circulation","type":["null","string"]}
, {"name":"visitorsPerMonth","type":["null","string"]}
, {"name":"authors","type":["null", "string"]}
, {"name":"legacyContactId","type":["null", "string"]}
, {"name":"subscriptionid","type":["null", "string"]}
, {"name":"customerid","type":["null", "string"]}
, {"name":"media_type","type":["null", "string"]}
, {"name":"industries","type":["null", "string"]}
, {"name":"locations","type":["null", "string"]}
, {"name":"organizations","type":["null", "string"]}
, {"name":"people","type":["null", "string"]}
, {"name":"subject","type":["null", "string"]}
]}')
    LOAD '/inflow/out/search_out'
    USING org.apache.pig.piggybank.storage.SequenceFileLoader()
        AS (prefix: chararray, searchResult: chararray)
`com.cision.hadoop.mapreduce.LuceneMapReduceMain
/inflow/out/storesearch_tmp /inflow/out/search_out | SearchAgents a:query
a:cust_id a:subscription_id a:tags 100
dc1-r1-n6.qwestcolo.local,dc1-r1-n5.qwestcolo.local,dc1-r2-n5.qwestcolo.local
5181`;
subscriptionIds = FILTER search BY
    com.cision.hadoop.pig.filter.StartsWith(prefix, 's_');
highlights = FILTER search BY
    com.cision.hadoop.pig.filter.StartsWith(prefix, 'h_');
subscriptionIds_to_store = FOREACH subscriptionIds GENERATE
    (long)SUBSTRING(prefix, 2, (int)StringSize(prefix) - 2) AS ssotmonitorid
    , searchResult AS ids;
highlightsSplit_to_store = FOREACH highlights GENERATE
    SUBSTRING(prefix, 2, (int)StringSize(prefix) - 2) AS rowkey
    , FLATTEN(STRSPLIT(searchResult, '\\|')) AS (fieldid: chararray, text: chararray);
--STORE highlightsSplit_to_store INTO 'HighlightedSearches' USING
--    org.apache.pig.backend.hadoop.hbase.HBaseStorage('hs:field hs:text',
--    '-loadKey true -caster HBaseBinaryConverter');
joined_subscriptions = JOIN dedupe BY ssotmonitorid LEFT OUTER,
subscriptionIds_to_store BY ssotmonitorid USING 'skewed';
merged_articles = FOREACH joined_subscriptions GENERATE
dedupe::ssotmonitorid AS ssotmonitorid
, article AS article
, path AS path
, htmlcleanedarticle AS htmlcleanedarticle
, drmfingerprint AS drmfingerprint
, media_guid AS media_guid
, outletid AS outletid
, mediaId AS mediaId
, outletName AS outletName
, pubdate AS pubdate
, pubname AS pubname
, headline AS headline
, sourceid AS sourceid
, mark AS mark
, ruleId AS ruleId
, publicityvalue AS publicityvalue
, arbitronCumeEstimate AS arbitronCumeEstimate
, audience AS audience
, circulation AS circulation
, visitorsPerMonth AS visitorsPerMonth
, authors AS authors
, legacyContactId AS legacyContactId
    , com.cision.hadoop.pig.common.TupleJoin('|',
        com.cision.hadoop.pig.common.EliminateDuplicatesInTuple(
            com.cision.hadoop.pig.common.PigCombiner(
                STRSPLIT(subscriptionid, '\\|'), STRSPLIT(ids, '\\|')))) AS subscriptionid
, customerid AS customerid
, media_type AS media_type
, industries AS industries
, locations AS locations
, organizations AS organizations
, people AS people
, subject AS subject;
--generate structured data set to be written to hbase
to_store_hbase = FOREACH merged_articles GENERATE
(chararray)ssotmonitorid
, industries
, locations
, organizations
, people
, subject
, htmlcleanedarticle
, outletid
, outletName
, ruleId
, publicityvalue
, arbitronCumeEstimate
, audience
, circulation
, visitorsPerMonth
, authors
, legacyContactId
, media_type
, pubdate
, subscriptionid
, customerid
, mark
, headline
, path;
--dump to_store_hbase;
STORE to_store_hbase INTO 'ItemMain' USING org.apache.pig.backend.hadoop.hbase.HBaseStorage(
    'a:topic a:loc a:org a:person a:subject a:text a:oid a:oname a:ruleid a:pubval a:arb a:niel a:circ a:vpm a:byline a:cid a:media a:pubdate a:subid a:owner s:feed a:title s:hdfile');
If I use the DUMP command, everything works fine, but as soon as I try to store the data in HBase I get the following error:
ERROR 2017: Internal error creating job configuration.
org.apache.pig.impl.logicalLayer.FrontendException: ERROR 1002: Unable to store alias to_store_hbase
    at org.apache.pig.PigServer$Graph.registerQuery(PigServer.java:1552)
    at org.apache.pig.PigServer.registerQuery(PigServer.java:540)
    at org.apache.pig.tools.grunt.GruntParser.processPig(GruntParser.java:970)
    at org.apache.pig.tools.pigscript.parser.PigScriptParser.parse(PigScriptParser.java:386)
    at org.apache.pig.tools.grunt.GruntParser.parseStopOnError(GruntParser.java:189)
    at org.apache.pig.tools.grunt.GruntParser.parseStopOnError(GruntParser.java:165)
    at org.apache.pig.tools.grunt.Grunt.run(Grunt.java:69)
    at org.apache.pig.Main.run(Main.java:490)
    at org.apache.pig.Main.main(Main.java:111)
    at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
    at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57)
    at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
    at java.lang.reflect.Method.invoke(Method.java:601)
    at org.apache.hadoop.util.RunJar.main(RunJar.java:197)
Caused by: org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.JobCreationException: ERROR 2017: Internal error creating job configuration.
    at org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.JobControlCompiler.getJob(JobControlCompiler.java:739)
    at org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.JobControlCompiler.compile(JobControlCompiler.java:259)
    at org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.MapReduceLauncher.launchPig(MapReduceLauncher.java:180)
    at org.apache.pig.PigServer.launchPlan(PigServer.java:1270)
    at org.apache.pig.PigServer.executeCompiledLogicalPlan(PigServer.java:1255)
    at org.apache.pig.PigServer.execute(PigServer.java:1245)
    at org.apache.pig.PigServer.access$400(PigServer.java:127)
    at org.apache.pig.PigServer$Graph.registerQuery(PigServer.java:1547)
    ... 13 more
Caused by: java.lang.ClassCastException: org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POForEach cannot be cast to org.apa$
    at org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.JobControlCompiler.getJob(JobControlCompiler.java:588)
    ... 20 more
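For what it's worth, one way I could think of to narrow this down is a stripped-down STORE that bypasses the wide column mapping. This is only a sketch; the alias "probe" and the single-column projection are made up for illustration, and it assumes the 'ItemMain' table already exists:

--hypothetical probe: store only the row key plus one column
probe = FOREACH merged_articles GENERATE (chararray)ssotmonitorid, headline;
STORE probe INTO 'ItemMain' USING
    org.apache.pig.backend.hadoop.hbase.HBaseStorage('a:title');

If even this minimal STORE fails with the same ERROR 2017, the column mapping can be ruled out and the problem would be in how the job configuration is created for the HBase STORE, perhaps in combination with the native MAPREDUCE step above.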
Any suggestions?
Jonas