After making the change, we are getting the error below during application
launch:

*An error occurred trying to launch the application. Server message:
javax.validation.ConstraintViolationException: Operator kafkaOut violates
constraints
[ConstraintViolationImpl{rootBean=com.example.datatorrent.KafkaSinglePortExactlyOnceOutputOperator@4726f93f,
propertyPath='topic', message='may not be null', *
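
For context, this validation failure suggests that the operator's required
topic property was never set before launch. A minimal sketch of a
properties.xml fragment, assuming the operator is registered in the DAG as
kafkaOut (the name shown in the error) and that the application uses the
usual dt.operator.<name>.prop.<property> naming. The value is a placeholder,
not the actual stream path:

```xml
<property>
  <!-- assumption: operator name "kafkaOut" is taken from the error message -->
  <name>dt.operator.kafkaOut.prop.topic</name>
  <!-- placeholder: replace with the real topic, e.g. /path/to/stream:topic -->
  <value>/path/to/stream:topic</value>
</property>
```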



Thanks!!

On Fri, Oct 7, 2016 at 1:13 PM, Jaspal Singh <jaspal.singh1...@gmail.com>
wrote:

> I just changed the malhar-kafka version to 3.5.0 and was able to import
> AbstractKafkaOutputOperator. Let me try to launch it now.
>
> Thanks for your input!
>
>
>
> On Fri, Oct 7, 2016 at 1:09 PM, Jaspal Singh <jaspal.singh1...@gmail.com>
> wrote:
>
>> Should we use malhar-library version 3.5 then?
>>
>>
>> Thanks!!
>>
>> On Fri, Oct 7, 2016 at 1:04 PM, Thomas Weise <t...@apache.org> wrote:
>>
>>> Please make sure to depend on version 3.5 of malhar-kafka in pom.xml.
>>> This operator is not in malhar-library, it's a separate module.
>>>
>>>
>>> On Fri, Oct 7, 2016 at 11:01 AM, Jaspal Singh <
>>> jaspal.singh1...@gmail.com> wrote:
>>>
>>>> Hi Siyuan,
>>>>
>>>> I am using the same Kafka producer as you mentioned, but I do not see
>>>> AbstractKafkaOutputOperator in malhar-library when I try to import it.
>>>>
>>>>
>>>> Thanks!!
>>>>
>>>> On Fri, Oct 7, 2016 at 12:52 PM, hsy...@gmail.com <hsy...@gmail.com>
>>>> wrote:
>>>>
>>>>> Also, which Kafka output operator are you using?
>>>>> Please use org.apache.apex.malhar.kafka.AbstractKafkaOutputOperator
>>>>> instead of com.datatorrent.contrib.kafka.AbstractKafkaOutputOperator.
>>>>> Only the org.apache.apex.malhar.kafka.AbstractKafkaOutputOperator works
>>>>> with MapR Streams; the latter only works with Kafka 0.8.* or 0.9.
>>>>>
>>>>> Regards,
>>>>> Siyuan
>>>>>
>>>>> On Fri, Oct 7, 2016 at 10:38 AM, hsy...@gmail.com <hsy...@gmail.com>
>>>>> wrote:
>>>>>
>>>>>> Hey Jaspal,
>>>>>>
>>>>>> Did you add any code to the existing KafkaSinglePortExactlyOnceOutputOperator
>>>>>> from Malhar? If so, please make sure the producer you use here is
>>>>>> org.apache.kafka.clients.producer.KafkaProducer instead of
>>>>>> kafka.javaapi.producer.Producer. The latter is the old API and is not
>>>>>> supported by MapR Streams.
>>>>>>
>>>>>>
>>>>>> Regards,
>>>>>> Siyuan
>>>>>>
>>>>>> On Fri, Oct 7, 2016 at 9:01 AM, Jaspal Singh <
>>>>>> jaspal.singh1...@gmail.com> wrote:
>>>>>>
>>>>>>> Thomas,
>>>>>>>
>>>>>>> Below is the operator implementation we are trying to run. This
>>>>>>> operator receives an object of the Tenant class from the upstream operator.
>>>>>>>
>>>>>>> public class KafkaSinglePortExactlyOnceOutputOperator extends AbstractKafkaOutputOperator {
>>>>>>>
>>>>>>>     private static final Logger LOG = LoggerFactory.getLogger(KafkaSinglePortExactlyOnceOutputOperator.class);
>>>>>>>
>>>>>>>     public transient final DefaultInputPort<Tenant> in = new DefaultInputPort<Tenant>() {
>>>>>>>
>>>>>>>         Gson gson = new Gson();
>>>>>>>
>>>>>>>         @Override
>>>>>>>         public void process(Tenant tenant) {
>>>>>>>
>>>>>>>             try {
>>>>>>>                 Producer<String, String> producer = getKafkaProducer();
>>>>>>>                 //ObjectMapper mapper = new ObjectMapper();
>>>>>>>                 long now = System.currentTimeMillis();
>>>>>>>                 //Configuration conf = HBaseConfiguration.create();
>>>>>>>                 //TenantDao dao = new TenantDao(conf);
>>>>>>>                 //ArrayList<Put> puts = new ArrayList<>();
>>>>>>>                 if (tenant != null) {
>>>>>>>                     //Tenant tenant = tenant.next();
>>>>>>>                     if (StringUtils.isNotEmpty(tenant.getGl())) {
>>>>>>>                         producer.send(new ProducerRecord<String, String>("/datalake/corporate/ses_dlpoc/stream_fabbddev/jsingh73/sample-stream:certUpdate", tenant.getVolumeName(), gson.toJson(tenant)));
>>>>>>>                         //puts.add(dao.mkPut(tenant));
>>>>>>>                     } else {
>>>>>>>                         producer.send(new ProducerRecord<String, String>("/datalake/corporate/ses_dlpoc/stream_fabbddev/jsingh73/sample-stream:error", tenant.getVolumeName(), gson.toJson(tenant)));
>>>>>>>
>>>>>>>                     }
>>>>>>>                     producer.flush();
>>>>>>>                 }
>>>>>>>             }
>>>>>>>
>>>>>>>
>>>>>>> After building the application, it throws an error during launch:
>>>>>>>
>>>>>>> An error occurred trying to launch the application. Server message:
>>>>>>> java.lang.NoClassDefFoundError: Lkafka/javaapi/producer/Producer; at
>>>>>>> java.lang.Class.getDeclaredFields0(Native Method) at
>>>>>>> java.lang.Class.privateGetDeclaredFields(Class.java:2583) at
>>>>>>> java.lang.Class.getDeclaredFields(Class.java:1916) at
>>>>>>>
>>>>>>>
>>>>>>> Thanks
>>>>>>> Jaspal
>>>>>>>
>>>>>>> On Fri, Oct 7, 2016 at 10:42 AM, Jaspal Singh <
>>>>>>> jaspal.singh1...@gmail.com> wrote:
>>>>>>>
>>>>>>>> Thomas,
>>>>>>>>
>>>>>>>> I was referring to the input from the previous operator.
>>>>>>>>
>>>>>>>> Another thing: when we extend AbstractKafkaOutputOperator, do we
>>>>>>>> need to specify <String, T>? We are getting an object of a class
>>>>>>>> type from the previous operator.
>>>>>>>>
>>>>>>>>
>>>>>>>> Thanks
>>>>>>>> Jaspal
>>>>>>>>
>>>>>>>> On Fri, Oct 7, 2016 at 10:12 AM, Thomas Weise <t...@apache.org>
>>>>>>>> wrote:
>>>>>>>>
>>>>>>>>> Are you referring to the upstream operator in the DAG or the state
>>>>>>>>> of the previous application after relaunch? Since the data is stored
>>>>>>>>> in MapR Streams, an operator that is a producer can also act as a
>>>>>>>>> consumer. Please clarify your question.
>>>>>>>>>
>>>>>>>>>
>>>>>>>>> On Fri, Oct 7, 2016 at 7:59 AM, Jaspal Singh <
>>>>>>>>> jaspal.singh1...@gmail.com> wrote:
>>>>>>>>>
>>>>>>>>>> Hi Thomas,
>>>>>>>>>>
>>>>>>>>>> I have a question: when we use
>>>>>>>>>> *KafkaSinglePortExactlyOnceOutputOperator* to write results into a
>>>>>>>>>> MapR Streams topic, will it be able to read messages from the previous
>>>>>>>>>> operator?
>>>>>>>>>>
>>>>>>>>>>
>>>>>>>>>> Thanks
>>>>>>>>>> Jaspal
>>>>>>>>>>
>>>>>>>>>> On Thu, Oct 6, 2016 at 6:28 PM, Thomas Weise <t...@apache.org>
>>>>>>>>>> wrote:
>>>>>>>>>>
>>>>>>>>>>> For recovery you need to set the window data manager like so:
>>>>>>>>>>>
>>>>>>>>>>> https://github.com/DataTorrent/examples/blob/master/tutorials/exactly-once/src/main/java/com/example/myapexapp/Application.java#L33
>>>>>>>>>>>
>>>>>>>>>>> That will also apply to stateful restart of the entire
>>>>>>>>>>> application (relaunch from previous instance's checkpointed state).
>>>>>>>>>>>
>>>>>>>>>>> For cold restart, you would need to consider the property you
>>>>>>>>>>> mention and decide what is applicable to your use case.
>>>>>>>>>>>
>>>>>>>>>>> Thomas
>>>>>>>>>>>
>>>>>>>>>>>
>>>>>>>>>>> On Thu, Oct 6, 2016 at 4:16 PM, Jaspal Singh <
>>>>>>>>>>> jaspal.singh1...@gmail.com> wrote:
>>>>>>>>>>>
>>>>>>>>>>>> OK, now I get it. Thanks for the nice explanation!
>>>>>>>>>>>>
>>>>>>>>>>>> One more thing: you mentioned checkpointing the offset ranges
>>>>>>>>>>>> to replay in the same order from Kafka.
>>>>>>>>>>>>
>>>>>>>>>>>> Is there any property we need to configure to do that, such as
>>>>>>>>>>>> initialOffset set to APPLICATION_OR_LATEST?
>>>>>>>>>>>>
>>>>>>>>>>>>
>>>>>>>>>>>> Thanks
>>>>>>>>>>>> Jaspal
>>>>>>>>>>>>
>>>>>>>>>>>>
>>>>>>>>>>>> On Thursday, October 6, 2016, Thomas Weise <
>>>>>>>>>>>> thomas.we...@gmail.com> wrote:
>>>>>>>>>>>>
>>>>>>>>>>>>> What you want is the effect of exactly-once output (that's why we
>>>>>>>>>>>>> also call it end-to-end exactly-once). There is no such thing as
>>>>>>>>>>>>> exactly-once processing in a distributed system. In this case it
>>>>>>>>>>>>> would rather be "produce exactly-once". Upstream operators, on
>>>>>>>>>>>>> failure, will recover to checkpointed state and re-process the
>>>>>>>>>>>>> stream from there. This is at-least-once, the default behavior.
>>>>>>>>>>>>> Because you have configured the input operator to replay in the
>>>>>>>>>>>>> same order from Kafka (this is done by checkpointing the offset
>>>>>>>>>>>>> ranges), the computation in the DAG is idempotent and the output
>>>>>>>>>>>>> operator can discard the results that were already published
>>>>>>>>>>>>> instead of producing duplicates.
>>>>>>>>>>>>>
>>>>>>>>>>>>> On Thu, Oct 6, 2016 at 3:57 PM, Jaspal Singh <
>>>>>>>>>>>>> jaspal.singh1...@gmail.com> wrote:
>>>>>>>>>>>>>
>>>>>>>>>>>>>> I think this is a custom operator implementation that takes
>>>>>>>>>>>>>> care of exactly-once processing at the output.
>>>>>>>>>>>>>>
>>>>>>>>>>>>>> What if any of the previous operators fail? How can we make sure
>>>>>>>>>>>>>> they also recover using EXACTLY_ONCE processing mode?
>>>>>>>>>>>>>>
>>>>>>>>>>>>>>
>>>>>>>>>>>>>> Thanks
>>>>>>>>>>>>>> Jaspal
>>>>>>>>>>>>>>
>>>>>>>>>>>>>>
>>>>>>>>>>>>>> On Thursday, October 6, 2016, Thomas Weise <
>>>>>>>>>>>>>> thomas.we...@gmail.com> wrote:
>>>>>>>>>>>>>>
>>>>>>>>>>>>>>> In that case please have a look at:
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>> https://github.com/apache/apex-malhar/blob/master/kafka/src/main/java/org/apache/apex/malhar/kafka/KafkaSinglePortExactlyOnceOutputOperator.java
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>> The operator will ensure that messages are not duplicated,
>>>>>>>>>>>>>>> under the stated assumptions.
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>> On Thu, Oct 6, 2016 at 3:37 PM, Jaspal Singh <
>>>>>>>>>>>>>>> jaspal.singh1...@gmail.com> wrote:
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>> Hi Thomas,
>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>> In our case we are writing the results back to a
>>>>>>>>>>>>>>>> MapR Streams topic based on some validations.
>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>> Thanks
>>>>>>>>>>>>>>>> Jaspal
>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>> On Thursday, October 6, 2016, Thomas Weise <t...@apache.org>
>>>>>>>>>>>>>>>> wrote:
>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>> Hi,
>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>> Which operators in your application are writing to
>>>>>>>>>>>>>>>>> external systems?
>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>> When you look at the example from the blog
>>>>>>>>>>>>>>>>> (https://github.com/DataTorrent/examples/tree/master/tutorials/exactly-once),
>>>>>>>>>>>>>>>>> there is Kafka input, which is configured to be idempotent.
>>>>>>>>>>>>>>>>> The results are written to JDBC. That operator by itself
>>>>>>>>>>>>>>>>> supports exactly-once through transactions (in conjunction
>>>>>>>>>>>>>>>>> with idempotent input), hence there is no need to configure
>>>>>>>>>>>>>>>>> the processing mode at all.
>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>> Thomas
>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>
>>>>>>>>>>>
>>>>>>>>>>
>>>>>>>>>
>>>>>>>>
>>>>>>>
>>>>>>
>>>>>
>>>>
>>>
>>
>
