Hi all. I have a question: how can I chain MapReduce jobs when the input is a Cassandra column family? I am chaining jobs with JobControl, but job1 fails because it cannot find its input column family. My code:
Configuration conf1 = new Configuration();
Configuration conf2 = new Configuration();

Job job1 = new Job(conf1, "DewPoint");
Job job2 = new Job(conf2, "DewPoint");

job1.setJarByClass(dewpoint.class);
job1.setMapperClass(KeySelectmap.class);
job1.setCombinerClass(KeySelectRed.class);
job1.setReducerClass(KeySelectRed.class);
job1.setOutputKeyClass(Text.class);
job1.setOutputValueClass(IntWritable.class);
job1.setInputFormatClass(CqlPagingInputFormat.class);
FileOutputFormat.setOutputPath(job1, new Path(INTERMEDIATE));
job1.setNumReduceTasks(500);

ControlledJob cjob1 = new ControlledJob(conf1);
cjob1.setJob(job1);

job2.setJarByClass(dewpoint.class);
job2.setMapperClass(dewpMapper.class);
job2.setCombinerClass(dewpReducer.class);
job2.setReducerClass(dewpReducer.class);
job2.setMapOutputKeyClass(IntWritable.class);
job2.setMapOutputValueClass(Text.class);
job2.setOutputKeyClass(Text.class);
job2.setOutputValueClass(IntWritable.class);
job2.setInputFormatClass(KeyValueTextInputFormat.class);
FileInputFormat.addInputPath(job2, new Path(INTERMEDIATE));
FileOutputFormat.setOutputPath(job2, new Path(OUTPUT_PATH));
job2.setNumReduceTasks(200);

ControlledJob cjob2 = new ControlledJob(conf2);
cjob1.setJob(job2);

JobControl jc = new JobControl("jc");
jc.addJob(cjob1);
jc.addJob(cjob2);
cjob2.addDependingJob(cjob1);
jc.run();

ConfigHelper.setInputRpcPort(cjob1.getJob().getConfiguration(), "9160");
ConfigHelper.setInputInitialAddress(cjob1.getJob().getConfiguration(), "localhost");
ConfigHelper.setInputColumnFamily(cjob1.getJob().getConfiguration(), KEYSPACE, COLUMN_FAMILY);
ConfigHelper.setInputPartitioner(cjob1.getJob().getConfiguration(), "Murmur3Partitioner");

The error:

13/10/09 11:27:07 INFO util.NativeCodeLoader: Loaded the native-hadoop library
13/10/09 11:27:07 WARN mapred.JobClient: Use GenericOptionsParser for parsing the arguments. Applications should implement Tool for the same.
13/10/09 11:27:07 WARN mapred.JobClient: No job jar file set. User classes may not be found. See JobConf(Class) or JobConf#setJar(String).
13/10/09 11:27:08 INFO mapred.JobClient: Cleaning up the staging area file:/tmp/hadoop-ubuntu/mapred/staging/ubuntu994267136/.staging/job_local994267136_0001
13/10/09 11:27:08 INFO jobcontrol.ControlledJob: DewPoint got an error while submitting
java.lang.UnsupportedOperationException: you must set the keyspace and columnfamily with setInputColumnFamily()
    at org.apache.cassandra.hadoop.AbstractColumnFamilyInputFormat.validateConfiguration(AbstractColumnFamilyInputFormat.java:82)
    at org.apache.cassandra.hadoop.AbstractColumnFamilyInputFormat.getSplits(AbstractColumnFamilyInputFormat.java:115)
    at org.apache.hadoop.mapred.JobClient.writeNewSplits(JobClient.java:1054)
    at org.apache.hadoop.mapred.JobClient.writeSplits(JobClient.java:1071)
    at org.apache.hadoop.mapred.JobClient.access$700(JobClient.java:179)
    at org.apache.hadoop.mapred.JobClient$2.run(JobClient.java:983)
    at org.apache.hadoop.mapred.JobClient$2.run(JobClient.java:936)
    at java.security.AccessController.doPrivileged(Native Method)
    at javax.security.auth.Subject.doAs(Subject.java:396)
    at org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1190)
    at org.apache.hadoop.mapred.JobClient.submitJobInternal(JobClient.java:936)
    at org.apache.hadoop.mapreduce.Job.submit(Job.java:550)
    at org.apache.hadoop.mapreduce.lib.jobcontrol.ControlledJob.submit(ControlledJob.java:336)
    at org.apache.hadoop.mapreduce.lib.jobcontrol.JobControl.run(JobControl.java:233)
    at org.apache.cassandra.com.dewpoint.run(dewpoint.java:211)
    at org.apache.hadoop.util.ToolRunner.run(ToolRunner.java:65)
    at org.apache.cassandra.com.dewpoint.main(dewpoint.java:56)
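Update: rereading the stack trace, I suspect this is an ordering problem on my side: the ConfigHelper calls only execute after jc.run(), so job1 is submitted before the keyspace and column family are ever set on its Configuration (and I also notice I wrote cjob1.setJob(job2) where I presumably meant cjob2.setJob(job2)). Below is a sketch of the ordering I believe is required, using the same classes as above; it is untested, so please correct me if I have misread the trace:

```java
// Sketch (untested): configure the Cassandra input on job1's Configuration
// BEFORE handing the jobs to JobControl and calling run().
ConfigHelper.setInputRpcPort(job1.getConfiguration(), "9160");
ConfigHelper.setInputInitialAddress(job1.getConfiguration(), "localhost");
ConfigHelper.setInputColumnFamily(job1.getConfiguration(), KEYSPACE, COLUMN_FAMILY);
ConfigHelper.setInputPartitioner(job1.getConfiguration(), "Murmur3Partitioner");

ControlledJob cjob1 = new ControlledJob(job1.getConfiguration());
cjob1.setJob(job1);
ControlledJob cjob2 = new ControlledJob(job2.getConfiguration());
cjob2.setJob(job2);   // was cjob1.setJob(job2) in my code above -- a typo?

JobControl jc = new JobControl("jc");
jc.addJob(cjob1);
jc.addJob(cjob2);
cjob2.addDependingJob(cjob1);   // job2 waits for job1's intermediate output
jc.run();                       // submit only after all input config is in place
```

Is this the right way to do it, or is there something else needed for CqlPagingInputFormat in a chained job?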