Ananth,

Yes, we can very well try executing each query in the batch and eliminate only the erroneous query/statement. We can detail that out in the JIRA ticket (APEXMALHAR-2367).
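For illustration, a rough sketch of that idea against the DataStax driver: execute the batch's statements one by one and skip only those Cassandra rejects. This is a hypothetical helper (the class and method names are made up, not existing Malhar or driver API):

import com.datastax.driver.core.BatchStatement;
import com.datastax.driver.core.Session;
import com.datastax.driver.core.Statement;
import com.datastax.driver.core.exceptions.InvalidQueryException;

// Hypothetical helper, not part of CassandraTransactionalStore today:
// runs each statement of a batch individually so that one bad row does
// not fail (and endlessly retry) the whole window.
public class StatementIsolator
{
  public static void executeIsolatingFailures(Session session, BatchStatement batch)
  {
    for (Statement stmt : batch.getStatements()) {
      try {
        session.execute(stmt);
      } catch (InvalidQueryException e) {
        // e.g. "Invalid null value for partition key part id":
        // drop just this statement and keep going
        System.err.println("Skipping invalid statement: " + e.getMessage());
      }
    }
  }
}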
@Max, as the operator doesn't handle this case, it's a good idea to filter in an upstream operator. We do have a filter operator in Malhar.

-Priyanka
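For example, a minimal hand-rolled filter along these lines could be wired between the parser and the Cassandra writer (Malhar's built-in FilterOperator with its expression-based setCondition() could serve the same purpose). TestUser is a hypothetical POJO for Max's schema, sketched at the end of this thread:

import com.datatorrent.api.DefaultInputPort;
import com.datatorrent.api.DefaultOutputPort;
import com.datatorrent.common.util.BaseOperator;

// Drops tuples whose id is null before they reach the Cassandra output
// operator, so the partition key can never be null.
public class NullIdFilter extends BaseOperator
{
  public final transient DefaultOutputPort<TestUser> out = new DefaultOutputPort<>();

  public final transient DefaultInputPort<TestUser> in = new DefaultInputPort<TestUser>()
  {
    @Override
    public void process(TestUser tuple)
    {
      if (tuple.getId() != null) {
        out.emit(tuple);
      }
    }
  };
}

In the DAG below, that would mean replacing the "data" stream with, e.g., dag.addStream("filtered", parser.out, filter.in) and dag.addStream("data", filter.out, out.input).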
On Tue, Dec 6, 2016 at 5:16 PM, Max Bridgewater <max.bridgewa...@gmail.com> wrote:

> I was wondering if there could be different strategies to choose from
> depending on the scenario. One could be to drop the events in that window.
> (Trying to avoid the word batch as I understand Apex doesn't do batch ;))
>
> In my case, I would have preferred to update the upstream operator to
> filter out events with null ids. Or I would have added a filter operator.
> Would this have been a viable way out?
>
> On Tue, Dec 6, 2016 at 4:35 AM, Ananth G <ananthg.a...@gmail.com> wrote:
>
>> Dropping the entire batch even if one entry is invalid seems to be too
>> stringent. Thoughts?
>>
>> Regards,
>> Ananth
>>
>> On 6 Dec. 2016, at 5:08 pm, Priyanka Gugale <pri...@apache.org> wrote:
>>
>> Hi Max,
>>
>> Right now the operator doesn't provide a way to set a retry limit and
>> drop the statement/batch in case of exceptions. We should add this
>> support to CassandraTransactionalStore. Created a Malhar JIRA ticket to
>> track this: https://issues.apache.org/jira/browse/APEXMALHAR-2367
>>
>> -Priyanka
>>
>> On Tue, Dec 6, 2016 at 12:51 AM, Max Bridgewater <max.bridgewa...@gmail.com> wrote:
>>
>>> Thanks, that's exactly what happened. One of my input events had a null
>>> id. What happened from there is what confused me, but it now makes
>>> complete sense. Because this event was not delivered successfully, Apex
>>> kept retrying. This means that messages that came after the malformed
>>> one were stuck. I wonder if there is a way to limit the number of
>>> retries, or if this is left to the application layer.
>>>
>>> On Mon, Dec 5, 2016 at 11:53 AM, Priyanka Gugale <pri...@apache.org> wrote:
>>>
>>>> Is it possible that your input has a "null" value for the id field?
>>>> id seems to be your primary key, so it cannot accept null values.
>>>>
>>>> -Priyanka
>>>>
>>>> On Mon, Dec 5, 2016 at 10:04 PM, Max Bridgewater <max.bridgewa...@gmail.com> wrote:
>>>>
>>>>> Folks,
>>>>>
>>>>> I am trying to write sample data to Cassandra. The operator keeps
>>>>> dying and being restarted. The failure trace is below. This failure
>>>>> happens even if no data is going through the pipeline.
>>>>>
>>>>> Here is how I create the Cassandra operator:
>>>>>
>>>>> List<FieldInfo> fieldInfos = Lists.newArrayList();
>>>>> fieldInfos.add(new FieldInfo("id", "id", null));
>>>>> fieldInfos.add(new FieldInfo("city", "city", null));
>>>>> fieldInfos.add(new FieldInfo("fname", "firstName", null));
>>>>> fieldInfos.add(new FieldInfo("lname", "lastName", null));
>>>>>
>>>>> KafkaSinglePortInputOperator in = dag.addOperator("kafkaIn",
>>>>>     new KafkaSinglePortInputOperator());
>>>>> in.setInitialOffset(AbstractKafkaInputOperator.InitialOffset.EARLIEST.name());
>>>>> JsonParser parser = dag.addOperator("jsonParser", new JsonParser());
>>>>> CassandraTransactionalStore transactionalStore = new CassandraTransactionalStore();
>>>>> CassandraPOJOOutputOperator out = new CassandraPOJOOutputOperator();
>>>>> out.setStore(transactionalStore);
>>>>> out.setFieldInfos(fieldInfos);
>>>>> dag.addOperator("CassandraDataWriter", out);
>>>>> dag.addStream("parse", in.outputPort, parser.in);
>>>>> dag.addStream("data", parser.out, out.input);
>>>>>
>>>>> The JSON parser seems to work well and deserializes Kafka events into
>>>>> POJOs that I then want to write to Cassandra.
>>>>>
>>>>> The Cassandra schema is as follows:
>>>>>
>>>>> CREATE TABLE testapp.testuser (
>>>>>     id uuid PRIMARY KEY,
>>>>>     city text,
>>>>>     fname text,
>>>>>     lname text
>>>>> ) WITH bloom_filter_fp_chance = 0.01
>>>>>     AND caching = '{"keys":"ALL", "rows_per_partition":"NONE"}'
>>>>>     AND comment = ''
>>>>>     AND compaction = {'class': 'org.apache.cassandra.db.compaction.SizeTieredCompactionStrategy'}
>>>>>     AND compression = {'sstable_compression': 'org.apache.cassandra.io.compress.LZ4Compressor'}
>>>>>     AND dclocal_read_repair_chance = 0.1
>>>>>     AND default_time_to_live = 0
>>>>>     AND gc_grace_seconds = 864000
>>>>>     AND max_index_interval = 2048
>>>>>     AND memtable_flush_period_in_ms = 0
>>>>>     AND min_index_interval = 128
>>>>>     AND read_repair_chance = 0.0
>>>>>     AND speculative_retry = '99.0PERCENTILE';
>>>>>
>>>>> Again, even without sending data, the exception happens. What am I
>>>>> missing? Any hint would be appreciated.
>>>>>
>>>>> 2016-12-05 16:24:31,643 INFO com.datatorrent.common.util.AsyncFSStorageAgent:
>>>>> using /app/hadoop/tmp/nm-local-dir/usercache/dtadmin/appcache/application_1480950234717_0002/container_1480950234717_0002_01_000137/tmp/chkp6701939091095420196
>>>>> as the basepath for checkpointing.
>>>>> 2016-12-05 16:24:31,704 ERROR com.datatorrent.stram.engine.StreamingContainer:
>>>>> Operator set [OperatorDeployInfo[id=3,name=CassandraDataWriter,type=GENERIC,
>>>>> checkpoint={58458e1b00000a4f, 0, 0},inputs=[OperatorDeployInfo.InputDeployInfo[
>>>>> portName=input,streamId=data,sourceNodeId=2,sourcePortName=out,locality=<null>,
>>>>> partitionMask=0,partitionKeys=<null>]],outputs=[]]] stopped running due to an exception.
>>>>> com.datastax.driver.core.exceptions.InvalidQueryException: Invalid
>>>>> null value for partition key part id
>>>>>     at com.datastax.driver.core.exceptions.InvalidQueryException.copy(InvalidQueryException.java:50)
>>>>>     at com.datastax.driver.core.DriverThrowables.propagateCause(DriverThrowables.java:37)
>>>>>     at com.datastax.driver.core.DefaultResultSetFuture.getUninterruptibly(DefaultResultSetFuture.java:245)
>>>>>     at com.datastax.driver.core.AbstractSession.execute(AbstractSession.java:64)
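For reference, the POJO implied by the FieldInfo mappings above might look as follows. This is a hypothetical reconstruction: the pojoFieldExpression values ("id", "city", "firstName", "lastName") dictate the getter/setter names, and id maps to the uuid partition key, so it must be non-null whenever a row is written:

import java.util.UUID;

// Hypothetical POJO matching testapp.testuser and the FieldInfo mappings.
public class TestUser
{
  private UUID id;
  private String city;
  private String firstName;
  private String lastName;

  public UUID getId() { return id; }
  public void setId(UUID id) { this.id = id; }

  public String getCity() { return city; }
  public void setCity(String city) { this.city = city; }

  public String getFirstName() { return firstName; }
  public void setFirstName(String firstName) { this.firstName = firstName; }

  public String getLastName() { return lastName; }
  public void setLastName(String lastName) { this.lastName = lastName; }
}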