After making the change, we are getting the error below when launching the application:
*An error occurred trying to launch the application. Server message: javax.validation.ConstraintViolationException: Operator kafkaOut violates constraints [ConstraintViolationImpl{rootBean=com.example.datatorrent.KafkaSinglePortExactlyOnceOutputOperator@4726f93f, propertyPath='topic', message='may not be null',*

Thanks!!

On Fri, Oct 7, 2016 at 1:13 PM, Jaspal Singh <jaspal.singh1...@gmail.com> wrote:

So I just changed the malhar-kafka version to 3.5.0 and was able to import the AbstractOutputOperator. Let me try to launch it now.

Thanks for your inputs!!

On Fri, Oct 7, 2016 at 1:09 PM, Jaspal Singh <jaspal.singh1...@gmail.com> wrote:

Should we use malhar-library version 3.5 then?

Thanks!!

On Fri, Oct 7, 2016 at 1:04 PM, Thomas Weise <t...@apache.org> wrote:

Please make sure to depend on version 3.5 of malhar-kafka in pom.xml. This operator is not in malhar-library; it's a separate module.

On Fri, Oct 7, 2016 at 11:01 AM, Jaspal Singh <jaspal.singh1...@gmail.com> wrote:

Hi Siyuan,

I am using the same Kafka producer as you mentioned, but I am not seeing the AbstractKafkaOutputOperator in the malhar library on import.

Thanks!!

On Fri, Oct 7, 2016 at 12:52 PM, hsy...@gmail.com <hsy...@gmail.com> wrote:

Also, which Kafka output operator are you using? Please use org.apache.apex.malhar.kafka.AbstractOutputOperator instead of com.datatorrent.contrib.kafka.AbstractOutputOperator. Only the org.apache.apex.malhar.kafka.AbstractOutputOperator works with MapR Streams; the latter only works with Kafka 0.8.* or 0.9.

Regards,
Siyuan

On Fri, Oct 7, 2016 at 10:38 AM, hsy...@gmail.com <hsy...@gmail.com> wrote:

Hey Jaspal,

Did you add any code to the existing KafkaSinglePortExactlyOnceOutputOperator from malhar?
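[Editor's note: for reference, the malhar-kafka dependency Thomas refers to would be declared in pom.xml roughly as follows; the group/artifact coordinates are the standard Apache Apex Malhar ones, but verify them against the Malhar release you actually use.]

```xml
<dependency>
  <groupId>org.apache.apex</groupId>
  <artifactId>malhar-kafka</artifactId>
  <version>3.5.0</version>
</dependency>
```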
If so, please make sure the producer you use here is org.apache.kafka.clients.producer.KafkaProducer instead of kafka.javaapi.producer.Producer. The latter is the old API and is not supported by MapR Streams.

Regards,
Siyuan

On Fri, Oct 7, 2016 at 9:01 AM, Jaspal Singh <jaspal.singh1...@gmail.com> wrote:

Thomas,

Below is the operator implementation we are trying to run. This operator is getting an object of the Tenant class from the upstream operator.

    public class KafkaSinglePortExactlyOnceOutputOperator extends AbstractKafkaOutputOperator {

        private static final Logger LOG =
            LoggerFactory.getLogger(KafkaSinglePortExactlyOnceOutputOperator.class);

        public transient final DefaultInputPort<Tenant> in = new DefaultInputPort<Tenant>() {

            Gson gson = new Gson();

            @Override
            public void process(Tenant tenant) {
                try {
                    Producer<String, String> producer = getKafkaProducer();
                    if (tenant != null) {
                        if (StringUtils.isNotEmpty(tenant.getGl())) {
                            producer.send(new ProducerRecord<String, String>(
                                "/datalake/corporate/ses_dlpoc/stream_fabbddev/jsingh73/sample-stream:certUpdate",
                                tenant.getVolumeName(), gson.toJson(tenant)));
                        } else {
                            producer.send(new ProducerRecord<String, String>(
                                "/datalake/corporate/ses_dlpoc/stream_fabbddev/jsingh73/sample-stream:error",
                                tenant.getVolumeName(), gson.toJson(tenant)));
                        }
                        producer.flush();
                    }
                } catch (Exception e) {
                    LOG.error("Error writing to stream", e);
                }
            }
        };
    }

After building the application, it throws an error during launch:

An error occurred trying to launch the application. Server message: java.lang.NoClassDefFoundError: Lkafka/javaapi/producer/Producer; at java.lang.Class.getDeclaredFields0(Native Method) at java.lang.Class.privateGetDeclaredFields(Class.java:2583) at java.lang.Class.getDeclaredFields(Class.java:1916) at

Thanks
Jaspal

On Fri, Oct 7, 2016 at 10:42 AM, Jaspal Singh <jaspal.singh1...@gmail.com> wrote:

Thomas,

I was trying to refer to the input from the previous operator.

Another thing: when we extend the AbstractKafkaOutputOperator, do we need to specify <String, T>? Since we are getting an object of a class type from the previous operator.

Thanks
Jaspal

On Fri, Oct 7, 2016 at 10:12 AM, Thomas Weise <t...@apache.org> wrote:

Are you referring to the upstream operator in the DAG or the state of the previous application after relaunch? Since the data is stored in MapR Streams, an operator that is a producer can also act as a consumer. Please clarify your question.

On Fri, Oct 7, 2016 at 7:59 AM, Jaspal Singh <jaspal.singh1...@gmail.com> wrote:

Hi Thomas,

I have a question: when we use *KafkaSinglePortExactlyOnceOutputOperator* to write results into a MapR Streams topic, will it be able to read messages from the previous operator?
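[Editor's note: on the generics question above, the shape of the pattern can be sketched in plain Java. The classes below are placeholders standing in for the Apex/Kafka types, not the Malhar API: the abstract operator is parameterized on the Kafka key and value types, the subclass pins them down, and the input port can still accept a POJO and serialize it to the value type itself.]

```java
// Placeholder for the abstract operator, parameterized on key/value types.
abstract class AbstractOutputOperatorSketch<K, V> {
    // stand-in for sending a (key, value) record to the stream
    String send(K key, V value) {
        return key + "=" + value;
    }
}

// The subclass fixes K and V as <String, String>; the incoming tuple is
// still a POJO (here a minimal Tenant), serialized to String by the operator.
class TenantOutputOperatorSketch extends AbstractOutputOperatorSketch<String, String> {
    static class Tenant {
        String volumeName;
        Tenant(String v) { volumeName = v; }
    }

    // analogous to process(Tenant) on the input port; the thread uses Gson
    // for the serialization step, hand-rolled JSON here to stay dependency-free
    String process(Tenant tenant) {
        return send(tenant.volumeName, "{\"volumeName\":\"" + tenant.volumeName + "\"}");
    }
}
```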
Thanks
Jaspal

On Thu, Oct 6, 2016 at 6:28 PM, Thomas Weise <t...@apache.org> wrote:

For recovery you need to set the window data manager like so:

https://github.com/DataTorrent/examples/blob/master/tutorials/exactly-once/src/main/java/com/example/myapexapp/Application.java#L33

That will also apply to stateful restart of the entire application (relaunch from the previous instance's checkpointed state).

For cold restart, you would need to consider the property you mention and decide what is applicable to your use case.

Thomas

On Thu, Oct 6, 2016 at 4:16 PM, Jaspal Singh <jaspal.singh1...@gmail.com> wrote:

Ok, now I get it. Thanks for the nice explanation!!

One more thing: you mentioned checkpointing the offset ranges to replay in the same order from Kafka. Is there any property we need to configure to do that, like initialOffset set to APPLICATION_OR_LATEST?

Thanks
Jaspal

On Thursday, October 6, 2016, Thomas Weise <thomas.we...@gmail.com> wrote:

What you want is the effect of exactly-once output (that's why we also call it end-to-end exactly-once). There is no such thing as exactly-once processing in a distributed system; in this case it would rather be "produce exactly-once". Upstream operators, on failure, will recover to checkpointed state and re-process the stream from there.
This is at-least-once, the default behavior. Because the input operator is configured to replay in the same order from Kafka (this is done by checkpointing the offset ranges), the computation in the DAG is idempotent and the output operator can discard the results that were already published instead of producing duplicates.

On Thu, Oct 6, 2016 at 3:57 PM, Jaspal Singh <jaspal.singh1...@gmail.com> wrote:

I think this is something called a customized operator implementation that is taking care of exactly-once processing at the output.

What if any previous operators fail? How can we make sure they also recover using the EXACTLY_ONCE processing mode?

Thanks
Jaspal

On Thursday, October 6, 2016, Thomas Weise <thomas.we...@gmail.com> wrote:

In that case please have a look at:

https://github.com/apache/apex-malhar/blob/master/kafka/src/main/java/org/apache/apex/malhar/kafka/KafkaSinglePortExactlyOnceOutputOperator.java

The operator will ensure that messages are not duplicated, under the stated assumptions.

On Thu, Oct 6, 2016 at 3:37 PM, Jaspal Singh <jaspal.singh1...@gmail.com> wrote:

Hi Thomas,

In our case we are writing the results back to a MapR Streams topic based on some validations.
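[Editor's note: the replay-and-discard behavior Thomas describes can be modeled in plain Java. This is an illustrative sketch with hypothetical names, not the Malhar implementation: the sink remembers, per window, how many tuples it already published, and when an idempotent input replays a window after failure, it discards that many tuples instead of publishing duplicates.]

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Illustrative model of exactly-once output on top of idempotent replay.
class ExactlyOnceSinkSketch {
    // durable state: tuples already published per window (in a real
    // operator this would live in the WindowDataManager / external store)
    private final Map<Long, Integer> committedCounts = new HashMap<>();
    final List<String> published = new ArrayList<>();
    private long currentWindow;
    private int seenInWindow;

    void beginWindow(long windowId) {
        currentWindow = windowId;
        seenInWindow = 0;
    }

    void process(String tuple) {
        seenInWindow++;
        // The idempotent input replays the same tuples in the same windows,
        // so any tuple index at or below the committed count for this window
        // was already published before the failure and is discarded.
        int alreadyPublished = committedCounts.getOrDefault(currentWindow, 0);
        if (seenInWindow > alreadyPublished) {
            published.add(tuple);
        }
    }

    void endWindow() {
        committedCounts.put(currentWindow, seenInWindow);
    }
}
```

Processing window 1 with tuples "a", "b" publishes both and commits a count of 2; replaying the same window after a simulated failure publishes nothing new.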
Thanks
Jaspal

On Thursday, October 6, 2016, Thomas Weise <t...@apache.org> wrote:

Hi,

Which operators in your application are writing to external systems?

When you look at the example from the blog (https://github.com/DataTorrent/examples/tree/master/tutorials/exactly-once), there is Kafka input, which is configured to be idempotent. The results are written to JDBC. That operator by itself supports exactly-once through transactions (in conjunction with idempotent input), hence there is no need to configure the processing mode at all.

Thomas
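[Editor's note: the transactional JDBC pattern Thomas mentions can likewise be modeled in plain Java. This is an illustrative sketch, not the Malhar JDBC operator: the id of the last fully written window is stored atomically together with the data, so a replayed window at or below the committed id is skipped entirely.]

```java
import java.util.ArrayList;
import java.util.List;

// Illustrative model of transaction-based exactly-once output.
// A real operator commits rows and the window id in one JDBC transaction;
// here a synchronized block stands in for that atomicity.
class TransactionalSinkSketch {
    // the "database": rows plus the id of the last fully committed window
    final List<String> rows = new ArrayList<>();
    long committedWindowId = -1;

    // write a whole window of tuples atomically together with the window id
    void commitWindow(long windowId, List<String> tuples) {
        if (windowId <= committedWindowId) {
            // replayed window: its rows are already in the database, skip
            return;
        }
        synchronized (this) {
            rows.addAll(tuples);
            committedWindowId = windowId;
        }
    }
}
```

Because idempotent input replays the identical windows, comparing window ids is enough to decide whether a window's output already reached the database.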