Hi all... I have a question about the Cassandra word-count MapReduce example
with CQL3. I want to use a string column and a float (or double) column as the
map input key and value — that is, the date column (type string) as the key
and the temperature column (type float) as the value. But when I println the
value of temperature, it prints some of the values and then throws an error.
Here is the code:
package org.apache.cassandra.com;
import java.io.IOException;
import java.nio.ByteBuffer;
import java.util.*;
import java.util.Map.Entry;
import org.apache.cassandra.hadoop.cql3.CqlConfigHelper;
import org.apache.cassandra.hadoop.cql3.CqlOutputFormat;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.apache.cassandra.hadoop.cql3.CqlPagingInputFormat;
import org.apache.cassandra.hadoop.ConfigHelper;
import org.apache.cassandra.utils.ByteBufferUtil;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;
import java.nio.charset.CharacterCodingException;
public class dewpoint extends Configured implements Tool
{
private static final Logger logger =
LoggerFactory.getLogger(dewpoint.class);
static final String KEYSPACE = "weather";
static final String COLUMN_FAMILY = "momentinfo";
static final String OUTPUT_REDUCER_VAR = "output_reducer";
static final String OUTPUT_COLUMN_FAMILY = "output_words";
private static final String OUTPUT_PATH_PREFIX = "/tmp/dewpointt";
private static final String PRIMARY_KEY = "row_key";
/**
 * Entry point: runs the dewpoint job via ToolRunner so generic Hadoop
 * command-line options (-conf, -D, etc.) are parsed automatically.
 *
 * @param args command-line arguments forwarded to {@link ToolRunner#run}
 * @throws Exception if job setup or execution fails
 */
public static void main(String[] args) throws Exception
{
// Propagate the tool's exit status to the shell. The original called
// System.exit(0) unconditionally, so a failed job still looked like
// success to any calling script or scheduler.
int status = ToolRunner.run(new Configuration(), new dewpoint(), args);
System.exit(status);
}
public static class TokenizerMapper extends Mapper<Map<String,
ByteBuffer>, Map<String, ByteBuffer>, Text, IntWritable>
{
private final static IntWritable one = new IntWritable(1);
private Text date = new Text();
public void map(Map<String, ByteBuffer> keys, Map<String,
ByteBuffer> columns, Context context) throws IOException,
InterruptedException
{
for (Entry<String, ByteBuffer> column : columns.entrySet())
{
if (!"date".equalsIgnoreCase(column.getKey()) &&
!"temprature".equalsIgnoreCase(column.getKey()))
continue;
String value1 = ByteBufferUtil.string(column.getValue());
double value2 = ByteBufferUtil.toDouble(column.getValue());
System.out.println(value2);
.....
And here is the error output:
13/10/05 12:36:22 INFO com.dewpoint: output reducer type: filesystem
13/10/05 12:36:24 INFO util.NativeCodeLoader: Loaded the native-hadoop
library
13/10/05 12:36:24 WARN mapred.JobClient: No job jar file set. User classes
may not be found. See JobConf(Class) or JobConf#setJar(String).
13/10/05 12:36:26 INFO mapred.JobClient: Running job:
job_local1875596001_0001
13/10/05 12:36:27 INFO mapred.LocalJobRunner: Waiting for map tasks
13/10/05 12:36:27 INFO mapred.LocalJobRunner: Starting task:
attempt_local1875596001_0001_m_000000_0
13/10/05 12:36:27 INFO util.ProcessTree: setsid exited with exit code 0
13/10/05 12:36:27 INFO mapred.Task: Using ResourceCalculatorPlugin :
org.apache.hadoop.util.LinuxResourceCalculatorPlugin@1e2670b
13/10/05 12:36:27 INFO mapred.MapTask: Processing split:
ColumnFamilySplit((5366152502320075885, '9070993788622720120] @[localhost])
13/10/05 12:36:27 INFO mapred.MapTask: io.sort.mb = 100
13/10/05 12:36:27 INFO mapred.JobClient: map 0% reduce 0%
13/10/05 12:36:28 INFO mapred.MapTask: data buffer = 79691776/99614720
13/10/05 12:36:28 INFO mapred.MapTask: record buffer = 262144/327680
6.00457842484433E-67
13/10/05 12:36:30 INFO mapred.MapTask: Starting flush of map output
13/10/05 12:36:30 INFO mapred.MapTask: Finished spill 0
13/10/05 12:36:30 INFO mapred.LocalJobRunner: Starting task:
attempt_local1875596001_0001_m_000001_0
13/10/05 12:36:30 INFO mapred.Task: Using ResourceCalculatorPlugin :
org.apache.hadoop.util.LinuxResourceCalculatorPlugin@1579a30
13/10/05 12:36:30 INFO mapred.MapTask: Processing split:
ColumnFamilySplit((-5699318449577318512, '-2034684803435882987]
@[localhost])
13/10/05 12:36:30 INFO mapred.MapTask: io.sort.mb = 100
13/10/05 12:36:32 INFO mapred.MapTask: data buffer = 79691776/99614720
13/10/05 12:36:32 INFO mapred.MapTask: record buffer = 262144/327680
6.004578424845004E-67
13/10/05 12:36:32 INFO mapred.MapTask: Starting flush of map output
13/10/05 12:36:32 INFO mapred.MapTask: Finished spill 0
13/10/05 12:36:32 INFO mapred.LocalJobRunner: Starting task:
attempt_local1875596001_0001_m_000002_0
13/10/05 12:36:32 INFO mapred.Task: Using ResourceCalculatorPlugin :
org.apache.hadoop.util.LinuxResourceCalculatorPlugin@112da40
13/10/05 12:36:32 INFO mapred.MapTask: Processing split:
ColumnFamilySplit((1684704676388456087, '5366152502320075885] @[localhost])
13/10/05 12:36:32 INFO mapred.MapTask: io.sort.mb = 100
13/10/05 12:36:32 INFO mapred.MapTask: data buffer = 79691776/99614720
13/10/05 12:36:32 INFO mapred.MapTask: record buffer = 262144/327680
1.4273722733722645E-71
13/10/05 12:36:32 INFO mapred.MapTask: Starting flush of map output
13/10/05 12:36:32 INFO mapred.MapTask: Finished spill 0
13/10/05 12:36:32 INFO mapred.LocalJobRunner: Starting task:
attempt_local1875596001_0001_m_000003_0
13/10/05 12:36:32 INFO mapred.Task: Using ResourceCalculatorPlugin :
org.apache.hadoop.util.LinuxResourceCalculatorPlugin@126a29c
13/10/05 12:36:32 INFO mapred.MapTask: Processing split:
ColumnFamilySplit((-9223372036854775808, '-5699318449577318512]
@[localhost])
13/10/05 12:36:32 INFO mapred.MapTask: io.sort.mb = 100
13/10/05 12:36:33 INFO mapred.MapTask: data buffer = 79691776/99614720
13/10/05 12:36:33 INFO mapred.LocalJobRunner:
13/10/05 12:36:33 INFO mapred.MapTask: record buffer = 262144/327680
6.00457842484433E-67
13/10/05 12:36:33 INFO mapred.MapTask: Starting flush of map output
13/10/05 12:36:33 INFO mapred.MapTask: Finished spill 0
13/10/05 12:36:33 INFO mapred.LocalJobRunner: Starting task:
attempt_local1875596001_0001_m_000004_0
13/10/05 12:36:33 INFO mapred.Task: Using ResourceCalculatorPlugin :
org.apache.hadoop.util.LinuxResourceCalculatorPlugin@2f2295
13/10/05 12:36:33 INFO mapred.MapTask: Processing split:
ColumnFamilySplit((-2034684803435882987, '1684704676388456087] @[localhost])
13/10/05 12:36:33 INFO mapred.MapTask: io.sort.mb = 100
13/10/05 12:36:34 INFO mapred.MapTask: data buffer = 79691776/99614720
13/10/05 12:36:34 INFO mapred.MapTask: record buffer = 262144/327680
13/10/05 12:36:34 INFO mapred.JobClient: map 16% reduce 0%
6.004595404242602E-67
13/10/05 12:36:34 INFO mapred.MapTask: Starting flush of map output
13/10/05 12:36:34 INFO mapred.MapTask: Finished spill 0
13/10/05 12:36:34 INFO mapred.LocalJobRunner: Starting task:
attempt_local1875596001_0001_m_000005_0
13/10/05 12:36:34 INFO mapred.Task: Using ResourceCalculatorPlugin :
org.apache.hadoop.util.LinuxResourceCalculatorPlugin@1706da8
13/10/05 12:36:34 INFO mapred.MapTask: Processing split:
ColumnFamilySplit((9070993788622720120, '-9223372036854775808] @[localhost])
13/10/05 12:36:34 INFO mapred.MapTask: io.sort.mb = 100
13/10/05 12:36:34 INFO mapred.MapTask: data buffer = 79691776/99614720
13/10/05 12:36:34 INFO mapred.MapTask: record buffer = 262144/327680
6.004601064041352E-67
13/10/05 12:36:34 INFO mapred.MapTask: Starting flush of map output
13/10/05 12:36:34 INFO mapred.MapTask: Finished spill 0
13/10/05 12:36:34 INFO mapred.LocalJobRunner: Map task executor complete.
13/10/05 12:36:34 WARN mapred.LocalJobRunner: job_local1875596001_0001
java.lang.Exception: java.nio.charset.MalformedInputException: Input length
= 1
at
org.apache.hadoop.mapred.LocalJobRunner$Job.run(LocalJobRunner.java:354)
Caused by: java.nio.charset.MalformedInputException: Input length = 1
at java.nio.charset.CoderResult.throwException(CoderResult.java:260)
at java.nio.charset.CharsetDecoder.decode(CharsetDecoder.java:781)
at
org.apache.cassandra.utils.ByteBufferUtil.string(ByteBufferUtil.java:167)
at
org.apache.cassandra.utils.ByteBufferUtil.string(ByteBufferUtil.java:124)
at
org.apache.cassandra.com.dewpoint$TokenizerMapper.map(dewpoint.java:65)
at
org.apache.cassandra.com.dewpoint$TokenizerMapper.map(dewpoint.java:1)
at org.apache.hadoop.mapreduce.Mapper.run(Mapper.java:145)
at org.apache.hadoop.mapred.MapTask.runNewMapper(MapTask.java:764)
at org.apache.hadoop.mapred.MapTask.run(MapTask.java:364)
at
org.apache.hadoop.mapred.LocalJobRunner$Job$MapTaskRunnable.run(LocalJobRunner.java:223)
at
java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:441)
at java.util.concurrent.FutureTask$Sync.innerRun(FutureTask.java:303)
at java.util.concurrent.FutureTask.run(FutureTask.java:138)
at
java.util.concurrent.ThreadPoolExecutor$Worker.runTask(ThreadPoolExecutor.java:886)
at
java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:908)
at java.lang.Thread.run(Thread.java:662)
13/10/05 12:36:35 INFO mapred.JobClient: Job complete:
job_local1875596001_0001
13/10/05 12:36:35 INFO mapred.JobClient: Counters: 15
13/10/05 12:36:35 INFO mapred.JobClient: FileSystemCounters
13/10/05 12:36:35 INFO mapred.JobClient: FILE_BYTES_READ=2713
13/10/05 12:36:35 INFO mapred.JobClient: FILE_BYTES_WRITTEN=53478
13/10/05 12:36:35 INFO mapred.JobClient: File Input Format Counters
13/10/05 12:36:35 INFO mapred.JobClient: Bytes Read=0
13/10/05 12:36:35 INFO mapred.JobClient: Map-Reduce Framework
13/10/05 12:36:35 INFO mapred.JobClient: Map output materialized
bytes=23
13/10/05 12:36:35 INFO mapred.JobClient: Combine output records=1
13/10/05 12:36:35 INFO mapred.JobClient: Map input records=1
13/10/05 12:36:35 INFO mapred.JobClient: Physical memory (bytes)
snapshot=0
13/10/05 12:36:35 INFO mapred.JobClient: Spilled Records=1
13/10/05 12:36:35 INFO mapred.JobClient: Map output bytes=15
13/10/05 12:36:35 INFO mapred.JobClient: CPU time spent (ms)=0
13/10/05 12:36:35 INFO mapred.JobClient: Total committed heap usage
(bytes)=363921408
13/10/05 12:36:35 INFO mapred.JobClient: Virtual memory (bytes)
snapshot=0
13/10/05 12:36:35 INFO mapred.JobClient: Combine input records=1
13/10/05 12:36:35 INFO mapred.JobClient: Map output records=1
13/10/05 12:36:35 INFO mapred.JobClient: SPLIT_RAW_BYTES=103
What does this MalformedInputException mean, and how can I read the float column correctly?