Siyuan,
So for the output operator, we have specified it as part of our logic
itself:

public class KafkaSinglePortExactlyOnceOutputOperator extends
    AbstractKafkaOutputOperator {

  private static final Logger LOG =
      LoggerFactory.getLogger(KafkaSinglePortExactlyOnceOutputOperator.class);

  public final transient DefaultInputPort<Tenant> in = new DefaultInputPort<Tenant>() {

    private final Gson gson = new Gson();

    @Override
    public void process(Tenant tenant) {
      try {
        Producer<String, String> producer = getKafkaProducer();
        if (tenant != null) {
          // Tenants with a GL code go to the certUpdate topic, the rest to the error topic.
          if (StringUtils.isNotEmpty(tenant.getGl())) {
            producer.send(new ProducerRecord<String, String>(
                "/datalake/corporate/ses_dlpoc/stream_fabbddev/jsingh73/sample-stream:certUpdate",
                tenant.getVolumeName(), gson.toJson(tenant)));
          } else {
            producer.send(new ProducerRecord<String, String>(
                "/datalake/corporate/ses_dlpoc/stream_fabbddev/jsingh73/sample-stream:error",
                tenant.getVolumeName(), gson.toJson(tenant)));
          }
          producer.flush();
        }
      } catch (Exception e) {
        LOG.error("Failed to publish tenant record", e);
      }
    }
  };
}
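The routing above boils down to a simple topic choice. A minimal, self-contained sketch of that decision (TopicRouter and chooseTopic are made-up names for illustration, not part of Malhar):

```java
// Hypothetical helper mirroring the routing in the operator above:
// tenants with a non-empty "gl" field go to the certUpdate topic,
// everything else goes to the error topic.
class TopicRouter {
    static String chooseTopic(String streamPath, String gl) {
        String suffix = (gl != null && !gl.isEmpty()) ? "certUpdate" : "error";
        return streamPath + ":" + suffix;
    }
}
```

Keeping the decision in a small pure method like this also makes the routing easy to unit-test without a Kafka/MapR Streams cluster.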
Thanks!!
On Fri, Oct 7, 2016 at 1:34 PM, [email protected] <[email protected]> wrote:
> Jaspal,
>
> I think you miss the kafkaOut :)
>
> Regards,
> Siyuan
>
> On Fri, Oct 7, 2016 at 11:32 AM, Jaspal Singh <[email protected]>
> wrote:
>
>> Siyuan,
>>
>> That's how we have specified it in the properties file:
>>
>> [image: Inline image 1]
>>
>>
>> Thanks!!
>>
>> On Fri, Oct 7, 2016 at 1:27 PM, [email protected] <[email protected]>
>> wrote:
>>
>>> Jaspal,
>>>
>>> Topic is a mandatory property you have to set. In MapR, the value should
>>> be set to the full stream path, for example: /your/stream/path:streamname
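In an Apex properties file, that setting would look roughly like the following sketch (the operator name kafkaOut is taken from the error message quoted below; the stream path is illustrative):

```xml
<property>
  <name>dt.operator.kafkaOut.prop.topic</name>
  <value>/your/stream/path:streamname</value>
</property>
```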
>>>
>>> Regards,
>>> Siyuan
>>>
>>> On Fri, Oct 7, 2016 at 11:22 AM, Jaspal Singh <
>>> [email protected]> wrote:
>>>
>>>> After making the change, we are getting the below error during
>>>> application launch:
>>>>
>>>> *An error occurred trying to launch the application. Server message:
>>>> javax.validation.ConstraintViolationException: Operator kafkaOut violates
>>>> constraints
>>>> [ConstraintViolationImpl{rootBean=com.example.datatorrent.KafkaSinglePortExactlyOnceOutputOperator@4726f93f,
>>>> propertyPath='topic', message='may not be null', *
>>>>
>>>>
>>>>
>>>> Thanks!!
>>>>
>>>> On Fri, Oct 7, 2016 at 1:13 PM, Jaspal Singh <
>>>> [email protected]> wrote:
>>>>
>>>>> So I just changed the malhar-kafka version to 3.5.0 and was able to
>>>>> import the AbstractKafkaOutputOperator. Let me try to launch it now.
>>>>>
>>>>> Thanks for your inputs !!
>>>>>
>>>>>
>>>>>
>>>>> On Fri, Oct 7, 2016 at 1:09 PM, Jaspal Singh <
>>>>> [email protected]> wrote:
>>>>>
>>>>>> Should we use malhar-library version 3.5 then?
>>>>>>
>>>>>>
>>>>>> Thanks!!
>>>>>>
>>>>>> On Fri, Oct 7, 2016 at 1:04 PM, Thomas Weise <[email protected]> wrote:
>>>>>>
>>>>>>> Please make sure to depend on version 3.5 of malhar-kafka in
>>>>>>> pom.xml. This operator is not in malhar-library, it's a separate module.
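For reference, the dependency entry in pom.xml would look something like this (version 3.5.0 as discussed further down the thread; double-check the exact latest 3.5.x release):

```xml
<dependency>
  <groupId>org.apache.apex</groupId>
  <artifactId>malhar-kafka</artifactId>
  <version>3.5.0</version>
</dependency>
```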
>>>>>>>
>>>>>>>
>>>>>>> On Fri, Oct 7, 2016 at 11:01 AM, Jaspal Singh <
>>>>>>> [email protected]> wrote:
>>>>>>>
>>>>>>>> Hi Siyuan,
>>>>>>>>
>>>>>>>> I am using the same Kafka producer as you mentioned. But I am not
>>>>>>>> seeing the AbstractKafkaOutputOperator in the malhar library on import.
>>>>>>>>
>>>>>>>>
>>>>>>>> Thanks!!
>>>>>>>>
>>>>>>>> On Fri, Oct 7, 2016 at 12:52 PM, [email protected] <[email protected]
>>>>>>>> > wrote:
>>>>>>>>
>>>>>>>>> Also, which Kafka output operator are you using?
>>>>>>>>> Please use org.apache.apex.malhar.kafka.AbstractKafkaOutputOperator
>>>>>>>>> instead of com.datatorrent.contrib.kafka.AbstractKafkaOutputOperator.
>>>>>>>>> Only the org.apache.apex.malhar.kafka operator
>>>>>>>>> works with MapR Streams; the latter only works with Kafka 0.8.* or 0.9.
>>>>>>>>>
>>>>>>>>> Regards,
>>>>>>>>> Siyuan
>>>>>>>>>
>>>>>>>>> On Fri, Oct 7, 2016 at 10:38 AM, [email protected] <
>>>>>>>>> [email protected]> wrote:
>>>>>>>>>
>>>>>>>>>> Hey Jaspal,
>>>>>>>>>>
>>>>>>>>>> Did you add any code to the existing
>>>>>>>>>> KafkaSinglePortExactlyOnceOutputOperator
>>>>>>>>>> from malhar? If so, please make sure the producer you use here is
>>>>>>>>>> org.apache.kafka.clients.producer.KafkaProducer instead of
>>>>>>>>>> kafka.javaapi.producer.Producer. The latter is the old API and is
>>>>>>>>>> not supported by MapR Streams.
>>>>>>>>>>
>>>>>>>>>>
>>>>>>>>>> Regards,
>>>>>>>>>> Siyuan
>>>>>>>>>>
>>>>>>>>>> On Fri, Oct 7, 2016 at 9:01 AM, Jaspal Singh <
>>>>>>>>>> [email protected]> wrote:
>>>>>>>>>>
>>>>>>>>>>> Thomas,
>>>>>>>>>>>
>>>>>>>>>>> Below is the operator implementation we are trying to run. This
>>>>>>>>>>> operator is getting an object of the Tenant class from the upstream
>>>>>>>>>>> operator.
>>>>>>>>>>>
>>>>>>>>>>> public class KafkaSinglePortExactlyOnceOutputOperator extends
>>>>>>>>>>> AbstractKafkaOutputOperator {
>>>>>>>>>>>
>>>>>>>>>>> private static final Logger LOG =
>>>>>>>>>>> LoggerFactory.getLogger(KafkaSinglePortExactlyOnceOutputOperator.class);
>>>>>>>>>>>
>>>>>>>>>>> public transient final DefaultInputPort<Tenant> in = new
>>>>>>>>>>> DefaultInputPort<Tenant>() {
>>>>>>>>>>>
>>>>>>>>>>> Gson gson = new Gson();
>>>>>>>>>>>
>>>>>>>>>>> @Override
>>>>>>>>>>> public void process(Tenant tenant) {
>>>>>>>>>>>
>>>>>>>>>>> try {
>>>>>>>>>>> Producer<String, String> producer =
>>>>>>>>>>> getKafkaProducer();
>>>>>>>>>>> //ObjectMapper mapper = new ObjectMapper();
>>>>>>>>>>> long now = System.currentTimeMillis();
>>>>>>>>>>> //Configuration conf = HBaseConfiguration.create();
>>>>>>>>>>> //TenantDao dao = new TenantDao(conf);
>>>>>>>>>>> //ArrayList<Put> puts = new ArrayList<>();
>>>>>>>>>>> if (tenant != null) {
>>>>>>>>>>> //Tenant tenant = tenant.next();
>>>>>>>>>>> if (StringUtils.isNotEmpty(tenant.getGl())) {
>>>>>>>>>>> producer.send(new ProducerRecord<String,
>>>>>>>>>>> String>("/datalake/corporate/ses_dlpoc/stream_fabbddev/jsingh73/sample-stream:certUpdate",
>>>>>>>>>>> tenant.getVolumeName(), gson.toJson(tenant)));
>>>>>>>>>>> //puts.add(dao.mkPut(tenant));
>>>>>>>>>>> } else {
>>>>>>>>>>> producer.send(new ProducerRecord<String,
>>>>>>>>>>> String>("/datalake/corporate/ses_dlpoc/stream_fabbddev/jsingh73/sample-stream:error",
>>>>>>>>>>> tenant.getVolumeName(), gson.toJson(tenant)));
>>>>>>>>>>>
>>>>>>>>>>> }
>>>>>>>>>>> producer.flush();
>>>>>>>>>>> }
>>>>>>>>>>> }
>>>>>>>>>>>
>>>>>>>>>>>
>>>>>>>>>>> After building the application, it throws an error during launch:
>>>>>>>>>>>
>>>>>>>>>>> An error occurred trying to launch the application. Server
>>>>>>>>>>> message: java.lang.NoClassDefFoundError:
>>>>>>>>>>> Lkafka/javaapi/producer/Producer; at
>>>>>>>>>>> java.lang.Class.getDeclaredFields0(Native Method) at
>>>>>>>>>>> java.lang.Class.privateGetDeclaredFields(Class.java:2583) at
>>>>>>>>>>> java.lang.Class.getDeclaredFields(Class.java:1916) at
>>>>>>>>>>>
>>>>>>>>>>>
>>>>>>>>>>> Thanks
>>>>>>>>>>> Jaspal
>>>>>>>>>>>
>>>>>>>>>>> On Fri, Oct 7, 2016 at 10:42 AM, Jaspal Singh <
>>>>>>>>>>> [email protected]> wrote:
>>>>>>>>>>>
>>>>>>>>>>>> Thomas,
>>>>>>>>>>>>
>>>>>>>>>>>> I was trying to refer to the input from previous operator.
>>>>>>>>>>>>
>>>>>>>>>>>> Another thing: when we extend the AbstractKafkaOutputOperator,
>>>>>>>>>>>> do we need to specify <String, T>? Since we are getting an object
>>>>>>>>>>>> of a class
>>>>>>>>>>>> type from the previous operator.
>>>>>>>>>>>>
>>>>>>>>>>>>
>>>>>>>>>>>> Thanks
>>>>>>>>>>>> Jaspal
>>>>>>>>>>>>
>>>>>>>>>>>> On Fri, Oct 7, 2016 at 10:12 AM, Thomas Weise <[email protected]>
>>>>>>>>>>>> wrote:
>>>>>>>>>>>>
>>>>>>>>>>>>> Are you referring to the upstream operator in the DAG or the
>>>>>>>>>>>>> state of the previous application after relaunch? Since the data
>>>>>>>>>>>>> is stored
>>>>>>>>>>>>> in MapR streams, an operator that is a producer can also act as a
>>>>>>>>>>>>> consumer.
>>>>>>>>>>>>> Please clarify your question.
>>>>>>>>>>>>>
>>>>>>>>>>>>>
>>>>>>>>>>>>> On Fri, Oct 7, 2016 at 7:59 AM, Jaspal Singh <
>>>>>>>>>>>>> [email protected]> wrote:
>>>>>>>>>>>>>
>>>>>>>>>>>>>> Hi Thomas,
>>>>>>>>>>>>>>
>>>>>>>>>>>>>> I have a question: when we are using
>>>>>>>>>>>>>> *KafkaSinglePortExactlyOnceOutputOperator* to write results
>>>>>>>>>>>>>> into a MapR Streams topic, will it be able to read messages from
>>>>>>>>>>>>>> the previous operator?
>>>>>>>>>>>>>>
>>>>>>>>>>>>>>
>>>>>>>>>>>>>> Thanks
>>>>>>>>>>>>>> Jaspal
>>>>>>>>>>>>>>
>>>>>>>>>>>>>> On Thu, Oct 6, 2016 at 6:28 PM, Thomas Weise <[email protected]>
>>>>>>>>>>>>>> wrote:
>>>>>>>>>>>>>>
>>>>>>>>>>>>>>> For recovery you need to set the window data manager like so:
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>> https://github.com/DataTorrent/examples/blob/master/tutorials/exactly-once/src/main/java/com/example/myapexapp/Application.java#L33
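From memory, the wiring in the linked Application.java looks roughly like the following sketch (operator name assumed; requires the malhar-kafka and malhar-library dependencies on the classpath):

```java
// sketch, assuming the tutorial's names
KafkaSinglePortInputOperator in =
    dag.addOperator("kafkaInput", new KafkaSinglePortInputOperator());
in.setWindowDataManager(new FSWindowDataManager());
```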
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>> That will also apply to stateful restart of the entire
>>>>>>>>>>>>>>> application (relaunch from previous instance's checkpointed
>>>>>>>>>>>>>>> state).
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>> For cold restart, you would need to consider the property
>>>>>>>>>>>>>>> you mention and decide what is applicable to your use case.
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>> Thomas
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>> On Thu, Oct 6, 2016 at 4:16 PM, Jaspal Singh <
>>>>>>>>>>>>>>> [email protected]> wrote:
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>> Ok, now I get it. Thanks for the nice explanation!!
>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>> One more thing: you mentioned checkpointing the
>>>>>>>>>>>>>>>> offset ranges to replay in the same order from Kafka.
>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>> Is there any property we need to configure to do that, like
>>>>>>>>>>>>>>>> initialOffset set to APPLICATION_OR_LATEST?
>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>> Thanks
>>>>>>>>>>>>>>>> Jaspal
>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>> On Thursday, October 6, 2016, Thomas Weise <
>>>>>>>>>>>>>>>> [email protected]> wrote:
>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>> What you want is the effect of exactly-once output (that's
>>>>>>>>>>>>>>>>> why we call it also end-to-end exactly-once). There is no
>>>>>>>>>>>>>>>>> such thing as
>>>>>>>>>>>>>>>>> exactly-once processing in a distributed system. In this case
>>>>>>>>>>>>>>>>> it would be
>>>>>>>>>>>>>>>>> rather "produce exactly-once". Upstream operators, on failure,
>>>>>>>>>>>>>>>>> will recover
>>>>>>>>>>>>>>>>> to checkpointed state and re-process the stream from there.
>>>>>>>>>>>>>>>>> This is
>>>>>>>>>>>>>>>>> at-least-once, the default behavior. Because in the input
>>>>>>>>>>>>>>>>> operator you have
>>>>>>>>>>>>>>>>> configured to replay in the same order from Kafka (this is
>>>>>>>>>>>>>>>>> done by
>>>>>>>>>>>>>>>>> checkpointing the offset ranges), the computation in the DAG
>>>>>>>>>>>>>>>>> is idempotent
>>>>>>>>>>>>>>>>> and the output operator can discard the results that were
>>>>>>>>>>>>>>>>> already published
>>>>>>>>>>>>>>>>> instead of producing duplicates.
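The discard-on-replay idea described here can be illustrated with a tiny self-contained simulation (DedupSink is a made-up class, not the Malhar operator; the real operator recovers its committed position from the stream itself rather than from in-memory state):

```java
import java.util.ArrayList;
import java.util.List;

// Toy model of an exactly-once style sink: input is replayed in the same
// order after a failure (idempotent replay), so the sink can discard the
// tuples it already published before the crash instead of duplicating them.
class DedupSink {
    private final List<String> published = new ArrayList<>();
    private long committed = 0; // number of tuples durably published
    private long seen = 0;      // tuples seen since the last (re)start

    void recover() { seen = 0; } // upstream replays from the checkpoint

    void process(String tuple) {
        seen++;
        if (seen <= committed) {
            return; // replayed tuple was already published: discard it
        }
        published.add(tuple);
        committed = seen;
    }

    List<String> published() { return published; }
}
```

Processing "a", "b", then failing and replaying "a", "b", "c" leaves the published list as [a, b, c] with no duplicates.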
>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>> On Thu, Oct 6, 2016 at 3:57 PM, Jaspal Singh <
>>>>>>>>>>>>>>>>> [email protected]> wrote:
>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>> I think this is a customized operator
>>>>>>>>>>>>>>>>>> implementation that takes care of exactly-once
>>>>>>>>>>>>>>>>>> processing at the output.
>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>> What if any previous operators fail? How can we make
>>>>>>>>>>>>>>>>>> sure they also recover using the EXACTLY_ONCE processing mode?
>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>> Thanks
>>>>>>>>>>>>>>>>>> Jaspal
>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>> On Thursday, October 6, 2016, Thomas Weise <
>>>>>>>>>>>>>>>>>> [email protected]> wrote:
>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>> In that case please have a look at:
>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>> https://github.com/apache/apex-malhar/blob/master/kafka/src/main/java/org/apache/apex/malhar/kafka/KafkaSinglePortExactlyOnceOutputOperator.java
>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>> The operator will ensure that messages are not
>>>>>>>>>>>>>>>>>>> duplicated, under the stated assumptions.
>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>> On Thu, Oct 6, 2016 at 3:37 PM, Jaspal Singh <
>>>>>>>>>>>>>>>>>>> [email protected]> wrote:
>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>> Hi Thomas,
>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>> In our case we are writing the results back to a
>>>>>>>>>>>>>>>>>>>> MapR Streams topic based on some validations.
>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>> Thanks
>>>>>>>>>>>>>>>>>>>> Jaspal
>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>> On Thursday, October 6, 2016, Thomas Weise <
>>>>>>>>>>>>>>>>>>>> [email protected]> wrote:
>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>> Hi,
>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>> which operators in your application are writing to
>>>>>>>>>>>>>>>>>>>>> external systems?
>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>> When you look at the example from the blog (
>>>>>>>>>>>>>>>>>>>>> https://github.com/DataTorrent/examples/tree/master/tutorials/exactly-once), there
>>>>>>>>>>>>>>>>>>>>> is Kafka input, which is configured to be idempotent. The
>>>>>>>>>>>>>>>>>>>>> results are
>>>>>>>>>>>>>>>>>>>>> written to JDBC. That operator by itself supports
>>>>>>>>>>>>>>>>>>>>> exactly-once through
>>>>>>>>>>>>>>>>>>>>> transactions (in conjunction with idempotent input),
>>>>>>>>>>>>>>>>>>>>> hence there is no need
>>>>>>>>>>>>>>>>>>>>> to configure the processing mode at all.
>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>> Thomas
>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>
>>>>>>>>>>>>>
>>>>>>>>>>>>
>>>>>>>>>>>
>>>>>>>>>>
>>>>>>>>>
>>>>>>>>
>>>>>>>
>>>>>>
>>>>>
>>>>
>>>
>>
>