It looks like the Kafka API dependency is missing. Can you please check it
is part of the .apa file?

To your previous question: The records/tuples/objects are moved by the Apex
engine through the stream from operator to operator. There is nothing you
need to do beyond connecting the operator ports with addStream when you
specify the DAG.


On Fri, Oct 7, 2016 at 9:01 AM, Jaspal Singh <jaspal.singh1...@gmail.com>
wrote:

> Thomas,
>
> Below is the operator implementation we are trying to run. This operator
> is getting an object of tenant class from updtream operator.
>
> public class KafkaSinglePortExactlyOnceOutputOperator extends 
> AbstractKafkaOutputOperator {
>
>     private static final Logger LOG = 
> LoggerFactory.getLogger(KafkaSinglePortExactlyOnceOutputOperator.class);
>
>     public transient final DefaultInputPort<Tenant> in = new 
> DefaultInputPort<Tenant>() {
>
>         Gson gson = new Gson();
>
>         @Override
>         public void process(Tenant tenant) {
>
>             try {
>                 Producer<String, String> producer = getKafkaProducer();
>                 //ObjectMapper mapper = new ObjectMapper();
>                 long now = System.currentTimeMillis();
>                 //Configuration conf = HBaseConfiguration.create();
>                 //TenantDao dao = new TenantDao(conf);
>                 //ArrayList<Put> puts = new ArrayList<>();
>                 if (tenant != null) {
>                     //Tenant tenant = tenant.next();
>                     if (StringUtils.isNotEmpty(tenant.getGl())) {
>                         producer.send(new ProducerRecord<String, 
> String>("/datalake/corporate/ses_dlpoc/stream_fabbddev/jsingh73/sample-stream:certUpdate",
>  tenant.getVolumeName(), gson.toJson(tenant)));
>                         //puts.add(dao.mkPut(tenant));
>                     } else {
>                         producer.send(new ProducerRecord<String, 
> String>("/datalake/corporate/ses_dlpoc/stream_fabbddev/jsingh73/sample-stream:error",
>  tenant.getVolumeName(), gson.toJson(tenant)));
>
>                     }
>                     producer.flush();
>                 }
>             }
>
>
> After building the application, it throws error during launch:
>
> An error occurred trying to launch the application. Server message:
> java.lang.NoClassDefFoundError: Lkafka/javaapi/producer/Producer; at
> java.lang.Class.getDeclaredFields0(Native Method) at java.lang.Class.
> privateGetDeclaredFields(Class.java:2583) at java.lang.Class.
> getDeclaredFields(Class.java:1916) at
>
>
> Thanks
> Jaspal
>
> On Fri, Oct 7, 2016 at 10:42 AM, Jaspal Singh <jaspal.singh1...@gmail.com>
> wrote:
>
>> Thomas,
>>
>> I was trying to refer to the input from previous operator.
>>
>> Another thing when we extend the AbstractKafkaOutputOperator, do we need
>> to specify <String, T> ? Since we are getting an object of class type from
>> previous operator.
>>
>>
>> Thanks
>> Jaspal
>>
>> On Fri, Oct 7, 2016 at 10:12 AM, Thomas Weise <t...@apache.org> wrote:
>>
>>> Are you referring to the upstream operator in the DAG or the state of
>>> the previous application after relaunch? Since the data is stored in MapR
>>> streams, an operator that is a producer can also act as a consumer. Please
>>> clarify your question.
>>>
>>>
>>> On Fri, Oct 7, 2016 at 7:59 AM, Jaspal Singh <jaspal.singh1...@gmail.com
>>> > wrote:
>>>
>>>> Hi Thomas,
>>>>
>>>> I have a question, so when we are using
>>>> *KafkaSinglePortExactlyOnceOutputOperator* to write results into
>>>> maprstream topic will it be able to read messgaes from the previous
>>>> operator ?
>>>>
>>>>
>>>> Thanks
>>>> Jaspal
>>>>
>>>> On Thu, Oct 6, 2016 at 6:28 PM, Thomas Weise <t...@apache.org> wrote:
>>>>
>>>>> For recovery you need to set the window data manager like so:
>>>>>
>>>>> https://github.com/DataTorrent/examples/blob/master/tutorial
>>>>> s/exactly-once/src/main/java/com/example/myapexapp/Applicati
>>>>> on.java#L33
>>>>>
>>>>> That will also apply to stateful restart of the entire application
>>>>> (relaunch from previous instance's checkpointed state).
>>>>>
>>>>> For cold restart, you would need to consider the property you mention
>>>>> and decide what is applicable to your use case.
>>>>>
>>>>> Thomas
>>>>>
>>>>>
>>>>> On Thu, Oct 6, 2016 at 4:16 PM, Jaspal Singh <
>>>>> jaspal.singh1...@gmail.com> wrote:
>>>>>
>>>>>> Ok now I get it. Thanks for the nice explaination !!
>>>>>>
>>>>>> One more thing, so you mentioned about checkpointing the offset
>>>>>> ranges to replay in same order from kafka.
>>>>>>
>>>>>> Is there any property we need to configure to do that? like
>>>>>> initialOffset set to APPLICATION_OR_LATEST.
>>>>>>
>>>>>>
>>>>>> Thanks
>>>>>> Jaspal
>>>>>>
>>>>>>
>>>>>> On Thursday, October 6, 2016, Thomas Weise <thomas.we...@gmail.com>
>>>>>> wrote:
>>>>>>
>>>>>>> What you want is the effect of exactly-once output (that's why we
>>>>>>> call it also end-to-end exactly-once). There is no such thing as
>>>>>>> exactly-once processing in a distributed system. In this case it would 
>>>>>>> be
>>>>>>> rather "produce exactly-once. Upstream operators, on failure, will 
>>>>>>> recover
>>>>>>> to checkpointed state and re-process the stream from there. This is
>>>>>>> at-least-once, the default behavior. Because in the input operator you 
>>>>>>> have
>>>>>>> configured to replay in the same order from Kafka (this is done by
>>>>>>> checkpointing the offset ranges), the computation in the DAG is 
>>>>>>> idempotent
>>>>>>> and the output operator can discard the results that were already 
>>>>>>> published
>>>>>>> instead of producing duplicates.
>>>>>>>
>>>>>>> On Thu, Oct 6, 2016 at 3:57 PM, Jaspal Singh <
>>>>>>> jaspal.singh1...@gmail.com> wrote:
>>>>>>>
>>>>>>>> I think this is something called a customized operator
>>>>>>>> implementation that is taking care of exactly once processing at 
>>>>>>>> output.
>>>>>>>>
>>>>>>>> What if any previous operators fail ? How we can make sure they
>>>>>>>> also recover using EXACTLY_ONCE processing mode ?
>>>>>>>>
>>>>>>>>
>>>>>>>> Thanks
>>>>>>>> Jaspal
>>>>>>>>
>>>>>>>>
>>>>>>>> On Thursday, October 6, 2016, Thomas Weise <thomas.we...@gmail.com>
>>>>>>>> wrote:
>>>>>>>>
>>>>>>>>> In that case please have a look at:
>>>>>>>>>
>>>>>>>>> https://github.com/apache/apex-malhar/blob/master/kafka/src/
>>>>>>>>> main/java/org/apache/apex/malhar/kafka/KafkaSinglePortExactl
>>>>>>>>> yOnceOutputOperator.java
>>>>>>>>>
>>>>>>>>> The operator will ensure that messages are not duplicated, under
>>>>>>>>> the stated assumptions.
>>>>>>>>>
>>>>>>>>>
>>>>>>>>> On Thu, Oct 6, 2016 at 3:37 PM, Jaspal Singh <
>>>>>>>>> jaspal.singh1...@gmail.com> wrote:
>>>>>>>>>
>>>>>>>>>> Hi Thomas,
>>>>>>>>>>
>>>>>>>>>> In our case we are writing the results back to maprstreams topic
>>>>>>>>>> based on some validations.
>>>>>>>>>>
>>>>>>>>>>
>>>>>>>>>> Thanks
>>>>>>>>>> Jaspal
>>>>>>>>>>
>>>>>>>>>>
>>>>>>>>>> On Thursday, October 6, 2016, Thomas Weise <t...@apache.org>
>>>>>>>>>> wrote:
>>>>>>>>>>
>>>>>>>>>>> Hi,
>>>>>>>>>>>
>>>>>>>>>>> which operators in your application are writing to external
>>>>>>>>>>> systems?
>>>>>>>>>>>
>>>>>>>>>>> When you look at the example from the blog (
>>>>>>>>>>> https://github.com/DataTorrent/examples/tree/master/tutoria
>>>>>>>>>>> ls/exactly-once), there is Kafka input, which is configured to
>>>>>>>>>>> be idempotent. The results are written to JDBC. That operator by 
>>>>>>>>>>> itself
>>>>>>>>>>> supports exactly-once through transactions (in conjunction with 
>>>>>>>>>>> idempotent
>>>>>>>>>>> input), hence there is no need to configure the processing mode at 
>>>>>>>>>>> all.
>>>>>>>>>>>
>>>>>>>>>>> Thomas
>>>>>>>>>>>
>>>>>>>>>>>
>>>>>>>
>>>>>
>>>>
>>>
>>
>

Reply via email to