Ananth,
Yes, we can very well try executing each query in the batch individually and
eliminate only the erroneous query/statement. We can detail that out further.
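A rough sketch of the idea (hypothetical helper code, not an actual change to the
operator; it only assumes the DataStax driver Session and the statements the
output operator already builds):

import java.util.List;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import com.datastax.driver.core.Session;
import com.datastax.driver.core.Statement;
import com.datastax.driver.core.exceptions.InvalidQueryException;

// Hypothetical fallback: when a batch fails, replay its statements one by one
// and skip only the invalid ones instead of retrying the whole window forever.
public class PerStatementFallback
{
  private static final Logger LOG = LoggerFactory.getLogger(PerStatementFallback.class);

  public void executeSkippingInvalid(Session session, List<Statement> statements)
  {
    for (Statement stmt : statements) {
      try {
        session.execute(stmt);
      } catch (InvalidQueryException e) {
        // e.g. "Invalid null value for partition key part id":
        // log the bad statement, drop it and continue with the rest
        LOG.warn("Skipping invalid statement: {}", e.getMessage());
      }
    }
  }
}

How exactly this hooks into CassandraTransactionalStore is what the JIRA would
need to work out.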

@Max,
As the operator doesn't handle this case, it's a good idea to use a filter in an
upstream operator. We do have a filter operator in Malhar.
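In case it helps, a minimal sketch of such a filter written as a custom operator
(TestUser and its getId() are placeholders for whatever POJO the JsonParser emits):

import com.datatorrent.api.DefaultInputPort;
import com.datatorrent.api.DefaultOutputPort;
import com.datatorrent.common.util.BaseOperator;

// Drops tuples whose partition key would be null before they reach the
// Cassandra output operator. TestUser stands in for the parser's POJO type.
public class NullIdFilter extends BaseOperator
{
  public final transient DefaultOutputPort<TestUser> out = new DefaultOutputPort<>();

  public final transient DefaultInputPort<TestUser> in = new DefaultInputPort<TestUser>()
  {
    @Override
    public void process(TestUser tuple)
    {
      if (tuple.getId() != null) {
        out.emit(tuple);
      }
    }
  };
}

It would sit between jsonParser and CassandraDataWriter in the DAG, e.g.
dag.addStream("filtered", parser.out, filter.in) and
dag.addStream("data", filter.out, out.input).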

-Priyanka


On Tue, Dec 6, 2016 at 5:16 PM, Max Bridgewater <max.bridgewa...@gmail.com>
wrote:

> I was wondering if there could be different strategies to choose from
> depending on the scenario. One could be to drop the events in that window.
> (Trying to avoid the word batch as I understand Apex doesn't do batch ;))
>
> In my case, I would have preferred to update the upstream operator to
> filter out events with null Ids. Or I would have added a filter operator.
> Would this have been a viable way out?
>
> On Tue, Dec 6, 2016 at 4:35 AM, Ananth G <ananthg.a...@gmail.com> wrote:
>
>> Dropping the entire batch even if one entry is invalid seems to be too
>> stringent. Thoughts?
>>
>> Regards
>> Ananth
>>
>> On 6 Dec. 2016, at 5:08 pm, Priyanka Gugale <pri...@apache.org> wrote:
>>
>> Hi Max,
>>
>> Right now the operator doesn't provide a way to set retry limit and drop
>> the statement / batch in case of exceptions. We should add this support
>> to CassandraTransactionalStore. Created a Malhar jira ticket to track
>> this: https://issues.apache.org/jira/browse/APEXMALHAR-2367
>>
>> -Priyanka
>>
>> On Tue, Dec 6, 2016 at 12:51 AM, Max Bridgewater <
>> max.bridgewa...@gmail.com> wrote:
>>
>>> Thanks, that's exactly what happened. One of my input events had id as
>>> null. What happened from there is what confused me. But it now makes
>>> complete sense. Because this event was not delivered successfully, Apex
>>> kept retrying. This means that messages that came after the malformed one
>>> were stuck. I wonder if there is a way to limit the number of retries or if
>>> this is left to the application layer.
>>>
>>> On Mon, Dec 5, 2016 at 11:53 AM, Priyanka Gugale <pri...@apache.org>
>>> wrote:
>>>
>>>> Is it possible that your input has a "null" value for the id field? id seems
>>>> to be your primary key, so it cannot accept null values.
>>>>
>>>> -Priyanka
>>>>
>>>> On Mon, Dec 5, 2016 at 10:04 PM, Max Bridgewater <
>>>> max.bridgewa...@gmail.com> wrote:
>>>>
>>>>> Folks,
>>>>>
>>>>> I am trying to write sample data to Cassandra. The operator keeps
>>>>> dying and being restarted. The failure trace is below. This failure happens
>>>>> even if no data is going through the pipeline.
>>>>>
>>>>> Here is how I create the Cassandra operator:
>>>>>
>>>>>         List<FieldInfo> fieldInfos = Lists.newArrayList();
>>>>>         fieldInfos.add(new FieldInfo("id", "id", null));
>>>>>         fieldInfos.add(new FieldInfo("city", "city", null));
>>>>>         fieldInfos.add(new FieldInfo("fname", "firstName", null));
>>>>>         fieldInfos.add(new FieldInfo("lname", "lastName", null));
>>>>>
>>>>>         KafkaSinglePortInputOperator in = dag.addOperator("kafkaIn", new KafkaSinglePortInputOperator());
>>>>>         in.setInitialOffset(AbstractKafkaInputOperator.InitialOffset.EARLIEST.name());
>>>>>         JsonParser parser = dag.addOperator("jsonParser", new JsonParser());
>>>>>         CassandraTransactionalStore transactionalStore = new CassandraTransactionalStore();
>>>>>         CassandraPOJOOutputOperator out = new CassandraPOJOOutputOperator();
>>>>>         out.setStore(transactionalStore);
>>>>>         out.setFieldInfos(fieldInfos);
>>>>>         dag.addOperator("CassandraDataWriter", out);
>>>>>         dag.addStream("parse", in.outputPort, parser.in);
>>>>>         dag.addStream("data", parser.out, out.input);
>>>>>
>>>>> The json parser seems to work well and deserializes Kafka events into
>>>>> POJOs that I then want to write to Cassandra.
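(A POJO consistent with the FieldInfo mapping above would look roughly like the
sketch here; the class name TestUser is assumed, only the field expressions come
from the snippet. The id column is uuid in the schema below, so the id field may
need to be java.util.UUID rather than String.)

public class TestUser
{
  private String id;         // "id" column (partition key); may need to be java.util.UUID
  private String city;       // "city" column
  private String firstName;  // "fname" column
  private String lastName;   // "lname" column

  public String getId() { return id; }
  public void setId(String id) { this.id = id; }
  public String getCity() { return city; }
  public void setCity(String city) { this.city = city; }
  public String getFirstName() { return firstName; }
  public void setFirstName(String firstName) { this.firstName = firstName; }
  public String getLastName() { return lastName; }
  public void setLastName(String lastName) { this.lastName = lastName; }
}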
>>>>>
>>>>> The Cassandra schema is as follows:
>>>>>
>>>>> CREATE TABLE testapp.testuser (
>>>>>     id uuid PRIMARY KEY,
>>>>>     city text,
>>>>>     fname text,
>>>>>     lname text
>>>>> ) WITH bloom_filter_fp_chance = 0.01
>>>>>     AND caching = '{"keys":"ALL", "rows_per_partition":"NONE"}'
>>>>>     AND comment = ''
>>>>>     AND compaction = {'class': 'org.apache.cassandra.db.compaction.SizeTieredCompactionStrategy'}
>>>>>     AND compression = {'sstable_compression': 'org.apache.cassandra.io.compress.LZ4Compressor'}
>>>>>     AND dclocal_read_repair_chance = 0.1
>>>>>     AND default_time_to_live = 0
>>>>>     AND gc_grace_seconds = 864000
>>>>>     AND max_index_interval = 2048
>>>>>     AND memtable_flush_period_in_ms = 0
>>>>>     AND min_index_interval = 128
>>>>>     AND read_repair_chance = 0.0
>>>>>     AND speculative_retry = '99.0PERCENTILE';
>>>>>
>>>>> Again, even without sending data, the exception happens. What am I
>>>>> missing? Any hint would be appreciated.
>>>>>
>>>>> 2016-12-05 16:24:31,643 INFO com.datatorrent.common.util.AsyncFSStorageAgent: using /app/hadoop/tmp/nm-local-dir/usercache/dtadmin/appcache/application_1480950234717_0002/container_1480950234717_0002_01_000137/tmp/chkp6701939091095420196 as the basepath for checkpointing.
>>>>> 2016-12-05 16:24:31,704 ERROR com.datatorrent.stram.engine.StreamingContainer: Operator set [OperatorDeployInfo[id=3,name=CassandraDataWriter,type=GENERIC,checkpoint={58458e1b00000a4f, 0, 0},inputs=[OperatorDeployInfo.InputDeployInfo[portName=input,streamId=data,sourceNodeId=2,sourcePortName=out,locality=<null>,partitionMask=0,partitionKeys=<null>]],outputs=[]]] stopped running due to an exception.
>>>>> com.datastax.driver.core.exceptions.InvalidQueryException: Invalid null value for partition key part id
>>>>> at com.datastax.driver.core.exceptions.InvalidQueryException.copy(InvalidQueryException.java:50)
>>>>> at com.datastax.driver.core.DriverThrowables.propagateCause(DriverThrowables.java:37)
>>>>> at com.datastax.driver.core.DefaultResultSetFuture.getUninterruptibly(DefaultResultSetFuture.java:245)
>>>>> at com.datastax.driver.core.AbstractSession.execute(AbstractSession.java:64)
>>>>>
>>>>>
>>>>>
>>>>
>>>
>>
>
