Hi Ashutosh,

Please find attached the Pig query code, sample input, and sample output. The
input is LZO-compressed and is loaded using LzoProtobufBlockPigLoader
(elephant-bird). I am using pig-0.8.0.jar.

thanks and regards,
Vijaya Bhaskar Peddinti

On Fri, Nov 18, 2011 at 10:28 PM, Ashutosh Chauhan <[email protected]> wrote:

> Can you provide the script, sample data, and logs? That may help with
> debugging.
>
> Ashutosh
> On Fri, Nov 18, 2011 at 07:26, vijaya bhaskar peddinti <
> [email protected]> wrote:
>
> > Dear All,
> >
> > In one of the PoCs, I have to export results generated by a Pig script to
> > MySQL. For this I am using DBStorage. There are no errors during
> > execution, and the output log shows the exact number of records written,
> > but when I check the DB nothing has actually been written. On debugging,
> > I found that the PreparedStatement becomes null in the commitTask method.
> >
> > Any suggestion on how this can be resolved, or any help in proceeding
> > further, would be greatly appreciated.
> >
> > thanks and regards,
> > Vijaya Bhaskar Peddinti
> >
>
Sample output:

InfluenceUserID | score
-----------------------
U1 | 172
U2 | 116
U3 | 20
package poc.sna.pig.service;

import java.io.IOException;

import org.apache.pig.ExecType;
import org.apache.pig.PigServer;
import org.apache.pig.backend.executionengine.ExecJob;

import poc.config.AppProperties;

public class PigService {

	public PigService() {
	}

	public boolean loadCalculateAndStore(String filePath) {
		try {
			PigServer pigServer = new PigServer(ExecType.MAPREDUCE);
			// Register the LZO codecs so the LZO-compressed input can be read.
			pigServer.getPigContext().getProperties().setProperty(
					"io.compression.codecs",
					"org.apache.hadoop.io.compress.DefaultCodec,"
							+ "org.apache.hadoop.io.compress.GzipCodec,"
							+ "org.apache.hadoop.io.compress.BZip2Codec,"
							+ "com.hadoop.compression.lzo.LzoCodec,"
							+ "com.hadoop.compression.lzo.LzopCodec");
			pigServer.setJobName("Score Calculation");
			pigServer.registerJar("./score-1.0.jar");
			runMyQuery(pigServer, filePath);
			return true;
		} catch (IOException e) { // ExecException is a subclass of IOException
			e.printStackTrace();
		}
		return false;
	}

	private void runMyQuery(PigServer pigServer, String inputFile)
			throws IOException {
		pigServer.setBatchOn();

		String insertQuery = "INSERT INTO influencescore (influenceuserid, finalscore) VALUES (?,?)";
		String dbStore = "org.apache.pig.piggybank.storage.DBStorage('"
				+ AppProperties.get("db.driver") + "', '"
				+ AppProperties.get("db.url") + "', '"
				+ AppProperties.get("db.user") + "', '', '"
				+ insertQuery + "');";

		pigServer.registerQuery("source = LOAD '" + inputFile
				+ "' USING com.twitter.elephantbird.pig.load.LzoProtobufBlockPigLoader('com.infosys.poc.sna.beans.FbPostProtos.FbPost')"
				+ " AS (postid: chararray, userid: chararray, username: chararray, likes: int, comments: int, score: int, message: chararray, messagetype: chararray);");
		pigServer.registerQuery("subsource = FOREACH source GENERATE userid AS user, (4*likes + 6*comments) AS postscore:int;");
		pigServer.registerQuery("grped = GROUP subsource BY user;");
		pigServer.registerQuery("scored = FOREACH grped { distnt = DISTINCT subsource.user; GENERATE FLATTEN(group), SUM(subsource.postscore); };");
		pigServer.registerQuery("STORE scored INTO 'dummy' USING " + dbStore);

		ExecJob job = pigServer.executeBatch().get(0);
		try {
			// Poll until the store job finishes.
			while (!job.hasCompleted()) {
				Thread.sleep(1000);
			}
		} catch (InterruptedException ie) {
			Thread.currentThread().interrupt();
		}
	}
}
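For what it's worth, a minimal sketch of a defensive commitTask. The stack trace below places the NullPointerException inside MyDBStorage's commitTask, and the job log also reports "Total input paths (combined) to process : 0", so it seems plausible that no record ever reached the writer and the PreparedStatement was never initialized. This is a hypothetical standalone class (SafeCommit is my name, not the piggybank code); it only illustrates treating a null statement as "nothing to flush" instead of dereferencing it:

```java
import java.io.IOException;
import java.sql.PreparedStatement;
import java.sql.SQLException;

public class SafeCommit {
    // Stays null if no record was ever written in this task,
    // e.g. when the input produced zero splits.
    private PreparedStatement ps;

    public void commitTask() throws IOException {
        if (ps == null) {
            return; // nothing was written; nothing to commit
        }
        try {
            ps.executeBatch();            // flush any remaining batched rows
            ps.getConnection().commit();  // make them durable
            ps.close();
        } catch (SQLException e) {
            throw new IOException("JDBC Error", e);
        }
    }

    public static void main(String[] args) throws IOException {
        // With ps never initialized, commitTask returns quietly
        // instead of throwing a NullPointerException.
        new SafeCommit().commitTask();
        System.out.println("commitTask handled null statement");
    }
}
```

If the statement really is initialized per task in prepareToWrite, the other thing worth checking is why the combined input path count is 0 for the LZO file, since that would explain both the null statement and the empty table.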
11/11/19 08:59:17 INFO pigstats.ScriptState: Pig features used in the script: GROUP_BY
11/11/19 08:59:17 INFO executionengine.HExecutionEngine: pig.usenewlogicalplan is set to true. New logical plan will be used.
11/11/19 08:59:17 INFO rules.ColumnPruneVisitor: Columns pruned for source: $0, $2, $5, $6, $7
11/11/19 08:59:17 ERROR storage.MyDBStorage: In Check Specs: 
11/11/19 08:59:18 INFO executionengine.HExecutionEngine: (Name: scored: Store(hdfs://localhost/user/training/output11321673356291:org.apache.pig.piggybank.storage.MyDBStorage('com.mysql.jdbc.Driver','jdbc:mysql://localhost:3306/sna','root','','INSERT INTO influencescore (influenceuserid, finalscore) values (?,?)')) - scope-24 Operator Key: scope-24)
11/11/19 08:59:18 INFO mapReduceLayer.MRCompiler: File concatenation threshold: 100 optimistic? false
11/11/19 08:59:18 INFO mapReduceLayer.CombinerOptimizer: Choosing to move algebraic foreach to combiner
11/11/19 08:59:18 INFO mapReduceLayer.MultiQueryOptimizer: MR plan size before optimization: 1
11/11/19 08:59:18 INFO mapReduceLayer.MultiQueryOptimizer: MR plan size after optimization: 1
11/11/19 08:59:20 INFO pigstats.ScriptState: Pig script settings are added to the job
11/11/19 08:59:20 INFO mapReduceLayer.JobControlCompiler: mapred.job.reduce.markreset.buffer.percent is not set, set to default 0.3
11/11/19 08:59:30 INFO mapReduceLayer.JobControlCompiler: Setting up single store job
11/11/19 08:59:30 INFO mapReduceLayer.JobControlCompiler: BytesPerReducer=1000000000 maxReducers=999 totalInputFileSize=218
11/11/19 08:59:30 INFO mapReduceLayer.JobControlCompiler: Neither PARALLEL nor default parallelism is set for this job. Setting number of reducers to 1
11/11/19 08:59:31 INFO mapReduceLayer.MapReduceLauncher: 1 map-reduce job(s) waiting for submission.
11/11/19 08:59:31 WARN mapred.JobClient: Use GenericOptionsParser for parsing the arguments. Applications should implement Tool for the same.
11/11/19 08:59:31 INFO mapReduceLayer.MapReduceLauncher: 0% complete
11/11/19 08:59:32 ERROR storage.MyDBStorage: In Check Specs: PigLatin:DefaultJobName
11/11/19 08:59:32 INFO input.FileInputFormat: Total input paths to process : 1
11/11/19 08:59:32 INFO util.MapRedUtil: Total input paths (combined) to process : 0
11/11/19 08:59:33 INFO mapReduceLayer.MapReduceLauncher: HadoopJobId: job_201111190838_0001
11/11/19 08:59:33 INFO mapReduceLayer.MapReduceLauncher: More information at: http://localhost:50030/jobdetails.jsp?jobid=job_201111190838_0001
11/11/19 09:00:12 INFO mapReduceLayer.MapReduceLauncher: job job_201111190838_0001 has failed! Stop running all dependent jobs
11/11/19 09:00:12 INFO mapReduceLayer.MapReduceLauncher: 100% complete
11/11/19 09:00:13 WARN mapReduceLayer.Launcher: There is no log file to write to.
11/11/19 09:00:13 ERROR mapReduceLayer.Launcher: Backend error message
java.io.IOException: JDBC Error
	at org.apache.pig.piggybank.storage.MyDBStorage$MyDBOutputFormat$1.commitTask(MyDBStorage.java:236)
	at org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.PigOutputCommitter.commitTask(PigOutputCommitter.java:210)
	at org.apache.hadoop.mapred.Task.commit(Task.java:897)
	at org.apache.hadoop.mapred.Task.done(Task.java:767)
	at org.apache.hadoop.mapred.ReduceTask.run(ReduceTask.java:419)
	at org.apache.hadoop.mapred.Child$4.run(Child.java:268)
	at java.security.AccessController.doPrivileged(Native Method)
	at javax.security.auth.Subject.doAs(Subject.java:396)
	at org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1115)
	at org.apache.hadoop.mapred.Child.main(Child.java:262)
Caused by: java.lang.NullPointerException
	at org.apache.pig.piggybank.storage.MyDBStorage$MyDBOutputFormat$1.commitTask(MyDBStorage.java:219)
	... 9 more

11/11/19 09:00:13 ERROR pigstats.PigStats: ERROR 2997: Unable to recreate exception from backed error: java.io.IOException: JDBC Error
11/11/19 09:00:13 ERROR pigstats.PigStatsUtil: 1 map reduce job(s) failed!
11/11/19 09:00:13 INFO pigstats.PigStats: Script Statistics: 

HadoopVersion	PigVersion	UserId	StartedAt	FinishedAt	Features
0.20.2-cdh3u0	0.8.0-SNAPSHOT	training	2011-11-19 08:59:20	2011-11-19 09:00:13	GROUP_BY

Failed!

Failed Jobs:
JobId	Alias	Feature	Message	Outputs
job_201111190838_0001	grped,scored,source,subsource	GROUP_BY,COMBINER	Message: Job failed! Error - NA	hdfs://localhost/user/training/output11321673356291,

Input(s):
Failed to read data from "/user/training/1321673353251"

Output(s):
Failed to produce result in "hdfs://localhost/user/training/output11321673356291"

Counters:
Total records written : 0
Total bytes written : 0
Spillable Memory Manager spill count : 0
Total bags proactively spilled: 0
Total records proactively spilled: 0

Job DAG:
job_201111190838_0001
Sample input:

postid | userid | message | likes | comments | messagetype | username
---------------------------------------------------------------------
P1 | U1 | sample | 5  | 2  | video | bhaskar
P2 | U2 | sample | 15 | 5  | photo | bhaskar
P3 | U1 | sample | 1  | 3  | video | bhaskar
P4 | U2 | sample | 20 | 12 | video | bhaskar
P4|U2|sample|20|12|video|bhaskar
