Hi,
following the article at http://www.datastax.com/dev/blog/bulk-loading ,
I developed a custom builder app to serialize a text file with rows in
json format to a sstable.
I managed to get the tool running and building the tables, however when
I try to load them I get this error:
sstableloader -d localhost demodb/
Exception in thread "main" java.lang.NullPointerException
at
org.apache.cassandra.io.sstable.SSTableLoader.<init>(SSTableLoader.java:64)
at org.apache.cassandra.tools.BulkLoader.main(BulkLoader.java:64)
and when I try to decode the sstables to json I get this one:
sstable2json demodb/demodb-positions8-jb-1-Data.db
[
{"key":
"00080000000000bae94e0000080000013f188b9bd00000040000000000","columns":
[Exception in thread "main" java.lang.IllegalArgumentException
at java.nio.Buffer.limit(Buffer.java:267)
at
org.apache.cassandra.db.marshal.AbstractCompositeType.getBytes(AbstractCompositeType.java:55)
at
org.apache.cassandra.db.marshal.AbstractCompositeType.getWithShortLength(AbstractCompositeType.java:64)
at
org.apache.cassandra.db.marshal.AbstractCompositeType.getString(AbstractCompositeType.java:230)
at
org.apache.cassandra.tools.SSTableExport.serializeColumn(SSTableExport.java:183)
at
org.apache.cassandra.tools.SSTableExport.serializeAtom(SSTableExport.java:152)
at
org.apache.cassandra.tools.SSTableExport.serializeAtoms(SSTableExport.java:140)
at
org.apache.cassandra.tools.SSTableExport.serializeRow(SSTableExport.java:238)
at
org.apache.cassandra.tools.SSTableExport.serializeRow(SSTableExport.java:223)
at
org.apache.cassandra.tools.SSTableExport.export(SSTableExport.java:360)
at
org.apache.cassandra.tools.SSTableExport.export(SSTableExport.java:382)
at
org.apache.cassandra.tools.SSTableExport.export(SSTableExport.java:394)
at
org.apache.cassandra.tools.SSTableExport.main(SSTableExport.java:477)
So it seems something is wrong with me streaming the data.
These are the relevant parts of the code:
This is the pojo to deserialize the json:
public class PositionJsonModel {
@JsonProperty("iD")
private Long idDevice;
@JsonProperty("iU")
private Long idUnit;
@JsonProperty("iE")
private Integer idEvent;
@JsonProperty("iTE")
private Integer idTypeEvent;
@JsonProperty("tEv")
private String timestampEvent;
@JsonProperty("tRx")
private String timestampRx;
@JsonProperty("mi")
private Long mileage;
private Long lat;
private Long lng;
@JsonProperty("A1")
private String country;
@JsonProperty("A2")
private String state;
@JsonProperty("A3")
private String county;
@JsonProperty("A4")
private String city;
@JsonProperty("A5")
private String locality;
@JsonProperty("st")
private String street;
@JsonProperty("cn")
private String civnum;
@JsonProperty("in")
private String info;
@JsonProperty("sp")
private Integer speed;
//getters, setters, tostring
...
And this is the main class:
....
BufferedReader reader = new BufferedReader(new
FileReader(filename));
String keyspace = "demodb";
String columnFamily="positions8";
File directory = new File(keyspace);
if (!directory.exists()) {
directory.mkdir();
}
Murmur3Partitioner partitioner = new Murmur3Partitioner();
SSTableSimpleUnsortedWriter positionsWriter =
new
SSTableSimpleUnsortedWriter(directory,partitioner,keyspace,columnFamily,
UTF8Type.instance,null,64);
String line="";
ObjectMapper mapper = new ObjectMapper();
while ((line = reader.readLine()) != null){
long timestamp = System.currentTimeMillis() * 1000;
System.out.println("timestamp: "+timestamp);
PositionJsonModel model= mapper.readValue(line,
PositionJsonModel.class);
// CREATE TABLE positions8 (
// iddevice bigint,
// timestampevent timestamp,
// idevent int,
// idunit bigint,
// status text,
// value text,
// PRIMARY KEY (iddevice, timestampevent, idevent)
// ) WITH CLUSTERING ORDER BY (timestampevent DESC,
idevent ASC)
List<AbstractType<?>> typeList = new
ArrayList<AbstractType<?>>();
typeList.add(LongType.instance);
typeList.add(DateType.instance);
typeList.add(IntegerType.instance);
CompositeType compositeKeyTypes =
CompositeType.getInstance(typeList);
Builder cpBuilder= new Builder(compositeKeyTypes);
System.out.println("getIdDevice: "+model.getIdDevice());
System.out.println("getTimestampEvent:
"+model.getTimestampEvent());
System.out.println("getIdEvent: "+model.getIdEvent());
cpBuilder.add(bytes(model.getIdDevice()));
cpBuilder.add(bytes(DateType.dateStringToTimestamp(model.getTimestampEvent())));
cpBuilder.add(bytes(model.getIdEvent()));
positionsWriter.newRow(cpBuilder.build());
positionsWriter.addColumn(bytes("idunit"),
bytes(model.getIdUnit()), timestamp);
String status="G";
if (model.getCity()==null||model.getCity().equals("")) {
status="U";
}
positionsWriter.addColumn(bytes("status"), bytes(status),
timestamp);
positionsWriter.addColumn(bytes("value"), bytes(line),
timestamp);
System.out.println("line: "+line);
}
reader.close();
positionsWriter.close();
System.exit(0);
.....
The table declaration is included above. I'm using a composite primary
key, and I'm not really sure if it's the right way to build it,
I didn't manage to get any updated or complete example on doing this
with a composite type key.
Can anyone help me?
Regards,
Paolo
--
Paolo Crosato
Software engineer/Custom Solutions