Thanks Ram. I changed the class to the following, but it's still not working.
I'm not seeing anything useful in the logs. The app is not even deploying
successfully.


package com.example.myapexapp;

import org.apache.apex.malhar.kafka.AbstractKafkaInputOperator;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.codehaus.jackson.map.ObjectMapper;

import com.datatorrent.api.DefaultOutputPort;
import com.datatorrent.cassandra.TestUser;

public class KafkaTestUserPortStringInputOperator extends AbstractKafkaInputOperator {

    public final transient DefaultOutputPort<TestUser> outputPort = new DefaultOutputPort<>();

    private final ObjectMapper mapper = new ObjectMapper();

    private TestUser toUser(ConsumerRecord<byte[], byte[]> message) {
        TestUser user = null;
        try {
            user = mapper.readValue(message.value(), TestUser.class);
        } catch (Exception ex) {
            return user;
        }
        return user;
    }

    @Override
    protected void emitTuple(String cluster, ConsumerRecord<byte[], byte[]> message) {
        outputPort.emit(toUser(message));
    }

}
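
A side note on the class above, separate from the deployment failure: toUser
returns null when deserialization fails, and emitTuple passes that null
straight to the port, where LineOutputOperator would then call toString() on
it. Below is a minimal, library-free sketch of guarding the emit. It is only
an illustration: NullSafeEmitSketch, parseOrNull, and emitIfParsed are
hypothetical names, the startsWith check merely stands in for
mapper.readValue, and a Consumer stands in for the output port.

```java
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.List;
import java.util.function.Consumer;

public class NullSafeEmitSketch {

    // Hypothetical stand-in for mapper.readValue(...): returns null on bad
    // input, mirroring how toUser() swallows the exception.
    static String parseOrNull(byte[] payload) {
        try {
            String text = new String(payload, StandardCharsets.UTF_8).trim();
            if (!text.startsWith("{")) {
                throw new IllegalArgumentException("not a JSON object: " + text);
            }
            return text;
        } catch (Exception ex) {
            // Report the failure instead of dropping it silently.
            System.err.println("Skipping unparseable record: " + ex.getMessage());
            return null;
        }
    }

    // Forward a tuple to the port only when parsing succeeded.
    static void emitIfParsed(byte[] payload, Consumer<String> port) {
        String tuple = parseOrNull(payload);
        if (tuple != null) {
            port.accept(tuple);
        }
    }

    public static void main(String[] args) {
        List<String> emitted = new ArrayList<>();
        emitIfParsed("{\"city\":\"Paris\"}".getBytes(StandardCharsets.UTF_8), emitted::add);
        emitIfParsed("garbage".getBytes(StandardCharsets.UTF_8), emitted::add);
        System.out.println(emitted.size()); // prints 1
    }
}
```

In the real operator the same check would presumably wrap
outputPort.emit(...), so a malformed record is reported and skipped rather
than forwarded as null.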


On Thu, Dec 1, 2016 at 10:53 AM, Munagala Ramanath <r...@datatorrent.com>
wrote:

> Max,
>
> The classes under *contrib/src/main/java/com/datatorrent/contrib/kafka*
> use the old 0.8 Kafka API, whereas those under
> *kafka/src/main/java/org/apache/apex/malhar/kafka* use the new 0.9 API.
> *KafkaSinglePortInputOperator* (used in your working version) is in the
> latter, whereas *AbstractKafkaSinglePortInputOperator* (extended by
> *KafkaTestUserPortStringInputOperator* in your failing version) is in
> the former.
>
> Can you try extending one of the classes from the 0.9 API package?
>
> Ram
>
> On Thu, Dec 1, 2016 at 7:27 AM, Max Bridgewater
> <max.bridgewa...@gmail.com> wrote:
>
>> I am attaching my whole project if that can help figure out what I am
>> missing.
>>
>> On Thu, Dec 1, 2016 at 7:41 AM, Max Bridgewater <
>> max.bridgewa...@gmail.com> wrote:
>>
>>> I'm really struggling to get this working. I am extending
>>> AbstractKafkaSinglePortInputOperator to return a POJO. I read stuff
>>> from Kafka and write to HDFS, pretty basic stuff.
>>>
>>> The following example works fine:
>>>
>>>     KafkaSinglePortInputOperator in =
>>>         dag.addOperator("kafkaIn", new KafkaSinglePortInputOperator());
>>>     in.setInitialOffset(AbstractKafkaInputOperator.InitialOffset.EARLIEST.name());
>>>     LineOutputOperator out =
>>>         dag.addOperator("fileOut", new LineOutputOperator());
>>>     dag.addStream("data", in.outputPort, out.input);
>>>
>>>
>>> But, when I use my own KafkaTestUserPortStringInputOperator, nothing
>>> works. The first thing that sounds weird is that my
>>> KafkaTestUserPortStringInputOperator requires a consumer.zookeeper
>>> property while the default KafkaSinglePortInputOperator doesn't. Any reason
>>> why?
>>>
>>>
>>> Here is my KafkaTestUserPortStringInputOperator:
>>>
>>> public class KafkaTestUserPortStringInputOperator
>>>     extends AbstractKafkaSinglePortInputOperator<TestUser>
>>> {
>>>
>>>     private final ObjectMapper mapper = new ObjectMapper();
>>>     private static final Logger logger =
>>>         LoggerFactory.getLogger(KafkaTestUserPortStringInputOperator.class);
>>>
>>>   @Override
>>>   public TestUser getTuple(Message message)
>>>   {
>>>     TestUser user =null;
>>>     try {
>>>       ByteBuffer buffer = message.payload();
>>>       byte[] bytes = new byte[buffer.remaining()];
>>>       buffer.get(bytes);
>>>       user=mapper.readValue(bytes, TestUser.class);
>>>       logger.debug("Consuming {}", new String(bytes));
>>>     }
>>>     catch (Exception ex) {
>>>       return user;
>>>     }
>>>     return user;
>>>   }
>>>
>>> }
>>>
>>> And LineOutputOperator is as follows:
>>>
>>> public class LineOutputOperator extends AbstractFileOutputOperator<Object>
>>> {
>>>
>>>   @NotNull
>>>   private String baseName;
>>>
>>>   @Override
>>>   public byte[] getBytesForTuple(Object t) {
>>>     String result = t.toString() + System.lineSeparator();
>>>     return result.getBytes(StandardCharsets.UTF_8);
>>>   }
>>>
>>>   @Override
>>>   protected String getFileName(Object t) {
>>>     return baseName;
>>>   }
>>>
>>>   public String getBaseName() { return baseName; }
>>>   public void setBaseName(String v) { baseName = v; }
>>> }
>>>
>>>
>>> And my TestUser:
>>>
>>> public class TestUser
>>> {
>>>   @Bind(JavaSerializer.class)
>>>   UUID id;
>>>   String firstName;
>>>   String lastName;
>>>   String city;
>>>
>>>   public TestUser()
>>>   {
>>>
>>>   }
>>>
>>>   public TestUser(UUID id, String firstName, String lastName, String city)
>>>   {
>>>     this.id = id;
>>>     this.firstName = firstName;
>>>     this.lastName = lastName;
>>>     this.city = city;
>>>   }
>>>
>>>   public UUID getId()
>>>   {
>>>     return id;
>>>   }
>>>
>>>   public void setId(UUID id)
>>>   {
>>>     this.id = id;
>>>   }
>>>
>>>   public String getFirstName()
>>>   {
>>>     return firstName;
>>>   }
>>>
>>>   public void setFirstName(String firstName)
>>>   {
>>>     this.firstName = firstName;
>>>   }
>>>
>>>   public String getLastName()
>>>   {
>>>     return lastName;
>>>   }
>>>
>>>   public void setLastName(String lastName)
>>>   {
>>>     this.lastName = lastName;
>>>   }
>>>
>>>   public String getCity()
>>>   {
>>>     return city;
>>>   }
>>>
>>>   public void setCity(String city)
>>>   {
>>>     this.city = city;
>>>   }
>>>
>>>   @Override
>>>   public int hashCode()
>>>   {
>>>     final int prime = 31;
>>>     int result = 1;
>>>     result = prime * result + ((city == null) ? 0 : city.hashCode());
>>>     result = prime * result + ((firstName == null) ? 0 : firstName.hashCode());
>>>     result = prime * result + ((id == null) ? 0 : id.hashCode());
>>>     result = prime * result + ((lastName == null) ? 0 : lastName.hashCode());
>>>     return result;
>>>   }
>>> }
>>>
>>>
>>>
>>>
>>>
>>> On Thu, Dec 1, 2016 at 2:02 AM, Chaitanya Chebolu <
>>> chaita...@datatorrent.com> wrote:
>>>
>>>> Hi Max,
>>>>
>>>>  This is just a warning; I don't think you need to worry about it.
>>>>  Did you see any other exceptions in the logs? Please share the
>>>> application master log.
>>>>
>>>> Regards,
>>>> Chaitanya
>>>>
>>>>
>>>> On Thu, Dec 1, 2016 at 8:03 AM, Max Bridgewater <
>>>> max.bridgewa...@gmail.com> wrote:
>>>>
>>>>> I am getting the same error described here:
>>>>> https://www.mail-archive.com/users@apex.apache.org/msg00533.html
>>>>>
>>>>> I am running Malhar 3.5.0 with Apex 3.5.0-DT. First I got the
>>>>> zookeeper error. Then I added consumer.zookeeper. Now I am getting:
>>>>>
>>>>> java.io.FileNotFoundException: File does not exist:
>>>>> /user/dtadmin/datatorrent/apps/application_1480549373717_0006/events/index.txt
>>>>>
>>>>> Does anybody know what the solution is?
>>>>>
>>>>> Thanks,
>>>>> Max.
>>>>>
>>>>
>>>>
>>>
>>
>