Hi all, I have a question: how can we chain MapReduce jobs when the input is a
Cassandra column family? I am chaining jobs with JobControl, but job1 cannot
find its input column family. What am I doing wrong?

Configuration conf1 = new Configuration();
        Configuration conf2 = new Configuration();

        Job job1 = new Job(conf1, "DewPoint");
        Job job2 = new Job(conf2, "DewPoint");

        job1.setJarByClass(dewpoint.class);
        job1.setMapperClass(KeySelectmap.class);
        job1.setCombinerClass(KeySelectRed.class);
        job1.setReducerClass(KeySelectRed.class);
        job1.setOutputKeyClass(Text.class);
        job1.setOutputValueClass(IntWritable.class);
        job1.setInputFormatClass(CqlPagingInputFormat.class);
        FileOutputFormat.setOutputPath(job1, new Path(INTERMEDIATE));

        job1.setNumReduceTasks(500);

        ControlledJob cjob1 = new ControlledJob(conf1);
        cjob1.setJob(job1);

        job2.setJarByClass(dewpoint.class);
        job2.setMapperClass(dewpMapper.class);
        job2.setCombinerClass(dewpReducer.class);
        job2.setReducerClass(dewpReducer.class);
        job2.setMapOutputKeyClass(IntWritable.class);
        job2.setMapOutputValueClass(Text.class);
        job2.setOutputKeyClass(Text.class);
        job2.setOutputValueClass(IntWritable.class);
        job2.setInputFormatClass(KeyValueTextInputFormat.class);
        FileInputFormat.addInputPath(job2, new Path(INTERMEDIATE));
        FileOutputFormat.setOutputPath(job2, new Path(OUTPUT_PATH));

        job2.setNumReduceTasks(200);
        ControlledJob cjob2 = new ControlledJob(conf2);
        cjob1.setJob(job2);

        JobControl jc = new JobControl("jc");
        jc.addJob(cjob1);
        jc.addJob(cjob2);
        cjob2.addDependingJob(cjob1);
        jc.run();

       ConfigHelper.setInputRpcPort(cjob1.getJob().getConfiguration(), "9160");
       ConfigHelper.setInputInitialAddress(cjob1.getJob().getConfiguration(), "localhost");
       ConfigHelper.setInputColumnFamily(cjob1.getJob().getConfiguration(), KEYSPACE, COLUMN_FAMILY);
       ConfigHelper.setInputPartitioner(cjob1.getJob().getConfiguration(), "Murmur3Partitioner");


error:

13/10/09 11:27:07 INFO util.NativeCodeLoader: Loaded the native-hadoop library
13/10/09 11:27:07 WARN mapred.JobClient: Use GenericOptionsParser for parsing the arguments. Applications should implement Tool for the same.
13/10/09 11:27:07 WARN mapred.JobClient: No job jar file set.  User classes may not be found. See JobConf(Class) or JobConf#setJar(String).
13/10/09 11:27:08 INFO mapred.JobClient: Cleaning up the staging area file:/tmp/hadoop-ubuntu/mapred/staging/ubuntu994267136/.staging/job_local994267136_0001
13/10/09 11:27:08 INFO jobcontrol.ControlledJob: DewPoint got an error while submitting
java.lang.UnsupportedOperationException: you must set the keyspace and columnfamily with setInputColumnFamily()
    at org.apache.cassandra.hadoop.AbstractColumnFamilyInputFormat.validateConfiguration(AbstractColumnFamilyInputFormat.java:82)
    at org.apache.cassandra.hadoop.AbstractColumnFamilyInputFormat.getSplits(AbstractColumnFamilyInputFormat.java:115)
    at org.apache.hadoop.mapred.JobClient.writeNewSplits(JobClient.java:1054)
    at org.apache.hadoop.mapred.JobClient.writeSplits(JobClient.java:1071)
    at org.apache.hadoop.mapred.JobClient.access$700(JobClient.java:179)
    at org.apache.hadoop.mapred.JobClient$2.run(JobClient.java:983)
    at org.apache.hadoop.mapred.JobClient$2.run(JobClient.java:936)
    at java.security.AccessController.doPrivileged(Native Method)
    at javax.security.auth.Subject.doAs(Subject.java:396)
    at org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1190)
    at org.apache.hadoop.mapred.JobClient.submitJobInternal(JobClient.java:936)
    at org.apache.hadoop.mapreduce.Job.submit(Job.java:550)
    at org.apache.hadoop.mapreduce.lib.jobcontrol.ControlledJob.submit(ControlledJob.java:336)
    at org.apache.hadoop.mapreduce.lib.jobcontrol.JobControl.run(JobControl.java:233)
    at org.apache.cassandra.com.dewpoint.run(dewpoint.java:211)
    at org.apache.hadoop.util.ToolRunner.run(ToolRunner.java:65)
    at org.apache.cassandra.com.dewpoint.main(dewpoint.java:56)
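
For context: the stack trace shows validateConfiguration() failing inside
JobClient.submitJobInternal, i.e. at submit time, so ConfigHelper calls that
appear after jc.run() in the code above can never take effect. Two things in
the code look suspect: the Cassandra input settings come after submission, and
cjob1.setJob(job2) reassigns job2 to the first controlled job instead of
cjob2.setJob(job2). A reordered sketch (not tested here; it reuses the names
from the code above and assumes the same Hadoop/Cassandra versions):

```java
// Sketch: configure the Cassandra input *before* wrapping the jobs in
// ControlledJob and calling jc.run(), so the keyspace/column family are
// present when CqlPagingInputFormat.getSplits() validates the configuration.
Job job1 = new Job(conf1, "DewPoint");
job1.setJarByClass(dewpoint.class);
job1.setInputFormatClass(CqlPagingInputFormat.class);
// ... other job1 mapper/reducer/output settings as above ...

// These must run before job submission:
ConfigHelper.setInputRpcPort(job1.getConfiguration(), "9160");
ConfigHelper.setInputInitialAddress(job1.getConfiguration(), "localhost");
ConfigHelper.setInputColumnFamily(job1.getConfiguration(), KEYSPACE, COLUMN_FAMILY);
ConfigHelper.setInputPartitioner(job1.getConfiguration(), "Murmur3Partitioner");

ControlledJob cjob1 = new ControlledJob(job1.getConfiguration());
cjob1.setJob(job1);
ControlledJob cjob2 = new ControlledJob(job2.getConfiguration());
cjob2.setJob(job2);            // note: cjob2 here, not cjob1
cjob2.addDependingJob(cjob1);  // job2 waits for job1's intermediate output

JobControl jc = new JobControl("jc");
jc.addJob(cjob1);
jc.addJob(cjob2);
jc.run();                      // submit only after everything is configured
```

This would also address the "No job jar file set" warning for cjob1, since
setJarByClass is applied to the configuration before it is submitted.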
