Folks, I am trying to write sample data to Cassandra. The operator keeps dying and being restarted; the failure trace is below. The failure happens even when no data is going through the pipeline.
Here is how I create the Cassandra operator:

    List<FieldInfo> fieldInfos = Lists.newArrayList();
    fieldInfos.add(new FieldInfo("id", "id", null));
    fieldInfos.add(new FieldInfo("city", "city", null));
    fieldInfos.add(new FieldInfo("fname", "firstName", null));
    fieldInfos.add(new FieldInfo("lname", "lastName", null));

    KafkaSinglePortInputOperator in =
        dag.addOperator("kafkaIn", new KafkaSinglePortInputOperator());
    in.setInitialOffset(AbstractKafkaInputOperator.InitialOffset.EARLIEST.name());

    JsonParser parser = dag.addOperator("jsonParser", new JsonParser());

    CassandraTransactionalStore transactionalStore = new CassandraTransactionalStore();
    CassandraPOJOOutputOperator out = new CassandraPOJOOutputOperator();
    out.setStore(transactionalStore);
    out.setFieldInfos(fieldInfos);
    dag.addOperator("CassandraDataWriter", out);

    dag.addStream("parse", in.outputPort, parser.in);
    dag.addStream("data", parser.out, out.input);

The JSON parser seems to work well and deserializes Kafka events into POJOs that I then want to write to Cassandra (a sketch of the POJO is at the end of this post). The Cassandra schema is as follows:

    CREATE TABLE testapp.testuser (
        id uuid PRIMARY KEY,
        city text,
        fname text,
        lname text
    ) WITH bloom_filter_fp_chance = 0.01
        AND caching = '{"keys":"ALL", "rows_per_partition":"NONE"}'
        AND comment = ''
        AND compaction = {'class': 'org.apache.cassandra.db.compaction.SizeTieredCompactionStrategy'}
        AND compression = {'sstable_compression': 'org.apache.cassandra.io.compress.LZ4Compressor'}
        AND dclocal_read_repair_chance = 0.1
        AND default_time_to_live = 0
        AND gc_grace_seconds = 864000
        AND max_index_interval = 2048
        AND memtable_flush_period_in_ms = 0
        AND min_index_interval = 128
        AND read_repair_chance = 0.0
        AND speculative_retry = '99.0PERCENTILE';

Again, the exception happens even without sending data. What am I missing? Any hint would be appreciated.

    2016-12-05 16:24:31,643 INFO com.datatorrent.common.util.AsyncFSStorageAgent: using /app/hadoop/tmp/nm-local-dir/usercache/dtadmin/appcache/application_1480950234717_0002/container_1480950234717_0002_01_000137/tmp/chkp6701939091095420196 as the basepath for checkpointing.
    2016-12-05 16:24:31,704 ERROR com.datatorrent.stram.engine.StreamingContainer: Operator set [OperatorDeployInfo[id=3,name=CassandraDataWriter,type=GENERIC,checkpoint={58458e1b00000a4f, 0, 0},inputs=[OperatorDeployInfo.InputDeployInfo[portName=input,streamId=data,sourceNodeId=2,sourcePortName=out,locality=<null>,partitionMask=0,partitionKeys=<null>]],outputs=[]]] stopped running due to an exception.
    com.datastax.driver.core.exceptions.InvalidQueryException: Invalid null value for partition key part id
        at com.datastax.driver.core.exceptions.InvalidQueryException.copy(InvalidQueryException.java:50)
        at com.datastax.driver.core.DriverThrowables.propagateCause(DriverThrowables.java:37)
        at com.datastax.driver.core.DefaultResultSetFuture.getUninterruptibly(DefaultResultSetFuture.java:245)
        at com.datastax.driver.core.AbstractSession.execute(AbstractSession.java:64)
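For reference, the POJO the parser emits looks roughly like this (a minimal sketch: the class name TestUser is illustrative, and the java.util.UUID type for id is assumed from the uuid column; only the property names id, city, firstName, and lastName are fixed by the FieldInfo mapping above):

    import java.util.UUID;

    // Sketch of the tuple class that JsonParser emits and that
    // CassandraPOJOOutputOperator reads via the FieldInfo expressions.
    // Class name and field types are assumptions, not the exact class.
    public class TestUser {
        private UUID id;          // column "id" (partition key)
        private String city;      // column "city"
        private String firstName; // column "fname"
        private String lastName;  // column "lname"

        public UUID getId() { return id; }
        public void setId(UUID id) { this.id = id; }
        public String getCity() { return city; }
        public void setCity(String city) { this.city = city; }
        public String getFirstName() { return firstName; }
        public void setFirstName(String firstName) { this.firstName = firstName; }
        public String getLastName() { return lastName; }
        public void setLastName(String lastName) { this.lastName = lastName; }
    }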