Max,

The classes under *contrib/src/main/java/com/datatorrent/contrib/kafka*
use the old 0.8 Kafka API, whereas those under
*kafka/src/main/java/org/apache/apex/malhar/kafka* use the new 0.9 API.
*KafkaSinglePortInputOperator* (used in your working version) is in the
latter, whereas *AbstractKafkaSinglePortInputOperator* (extended by
*KafkaTestUserPortStringInputOperator* in your failing version) is in the
former.

Can you try extending one of the classes from the 0.9 API package?
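
One thing to watch for when porting: your current getTuple() reads the
payload out of a ByteBuffer (the 0.8 Message.payload()), whereas, as far as
I recall, the 0.9 classes hand you the record value as a plain byte[]. Below
is a minimal, JDK-only sketch of both conversions. PayloadDecoding and its
method names are hypothetical (they are not part of Malhar or Kafka), and
the JSON parsing step is left out on purpose.

```java
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;

/** Hypothetical helper for illustration only; not part of Malhar or Kafka. */
final class PayloadDecoding {

  private PayloadDecoding() {}

  /** 0.8 style: copy the readable bytes out of a ByteBuffer payload. */
  static byte[] toBytes(ByteBuffer payload) {
    // duplicate() shares the content but has its own position,
    // so the caller's buffer is left untouched.
    ByteBuffer view = payload.duplicate();
    byte[] bytes = new byte[view.remaining()];
    view.get(bytes);
    return bytes;
  }

  /** Decode bytes with an explicit charset instead of the platform default. */
  static String toText(byte[] bytes) {
    return new String(bytes, StandardCharsets.UTF_8);
  }

  public static void main(String[] args) {
    byte[] raw = "{\"firstName\":\"Ada\"}".getBytes(StandardCharsets.UTF_8);

    // 0.8 style: the payload arrives wrapped in a ByteBuffer.
    System.out.println(toText(toBytes(ByteBuffer.wrap(raw))));

    // 0.9 style (assumption): the value is already a byte[].
    System.out.println(toText(raw));
  }
}
```

Either way you end up with a byte[] that you can pass to
mapper.readValue(bytes, TestUser.class) as you do today.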

Ram

On Thu, Dec 1, 2016 at 7:27 AM, Max Bridgewater <max.bridgewa...@gmail.com>
wrote:

> I am attaching my whole project if that can help figure out what I am
> missing.
>
> On Thu, Dec 1, 2016 at 7:41 AM, Max Bridgewater <max.bridgewa...@gmail.com>
> wrote:
>
>> I'm really struggling to get this working. I am extending
>> AbstractKafkaSinglePortInputOperator to return a POJO. I read stuff from
>> Kafka and write to HDFS, pretty basic stuff.
>>
>> The following example works fine:
>>
>>     KafkaSinglePortInputOperator in =
>>         dag.addOperator("kafkaIn", new KafkaSinglePortInputOperator());
>>     in.setInitialOffset(AbstractKafkaInputOperator.InitialOffset.EARLIEST.name());
>>     LineOutputOperator out =
>>         dag.addOperator("fileOut", new LineOutputOperator());
>>     dag.addStream("data", in.outputPort, out.input);
>>
>>
>> But when I use my own KafkaTestUserPortStringInputOperator, nothing
>> works. The first thing that seems odd is that my
>> KafkaTestUserPortStringInputOperator requires a consumer.zookeeper
>> property while the default KafkaSinglePortInputOperator doesn't. Any reason
>> why?
>>
>>
>> Here is my KafkaTestUserPortStringInputOperator:
>>
>> public class KafkaTestUserPortStringInputOperator
>>     extends AbstractKafkaSinglePortInputOperator<TestUser>
>> {
>>   private static final Logger logger =
>>       LoggerFactory.getLogger(KafkaTestUserPortStringInputOperator.class);
>>
>>   private final ObjectMapper mapper = new ObjectMapper();
>>
>>   @Override
>>   public TestUser getTuple(Message message)
>>   {
>>     try {
>>       // Copy the payload out of the message's ByteBuffer.
>>       ByteBuffer buffer = message.payload();
>>       byte[] bytes = new byte[buffer.remaining()];
>>       buffer.get(bytes);
>>       logger.debug("Consuming {}", new String(bytes, StandardCharsets.UTF_8));
>>       return mapper.readValue(bytes, TestUser.class);
>>     } catch (Exception ex) {
>>       // Log the failure instead of swallowing it; null is still returned.
>>       logger.error("Failed to parse Kafka message into TestUser", ex);
>>       return null;
>>     }
>>   }
>> }
>>
>> And LineOutputOperator is as follows:
>>
>> public class LineOutputOperator extends AbstractFileOutputOperator<Object>
>> {
>>
>>   @NotNull
>>   private String baseName;
>>
>>   @Override
>>   public byte[] getBytesForTuple(Object t) {
>>     String result = t.toString() + System.lineSeparator();
>>     return result.getBytes(StandardCharsets.UTF_8);
>>   }
>>
>>   @Override
>>   protected String getFileName(Object t) {
>>     return baseName;
>>   }
>>
>>   public String getBaseName() { return baseName; }
>>   public void setBaseName(String v) { baseName = v; }
>> }
>>
>>
>> And my TestUser:
>>
>> public class TestUser
>> {
>>   @Bind(JavaSerializer.class)
>>   UUID id;
>>   String firstName;
>>   String lastName;
>>   String city;
>>
>>   public TestUser()
>>   {
>>
>>   }
>>
>>   public TestUser(UUID id, String firstName, String lastName, String city)
>>   {
>>     this.id = id;
>>     this.firstName = firstName;
>>     this.lastName = lastName;
>>     this.city = city;
>>   }
>>
>>   public UUID getId()
>>   {
>>     return id;
>>   }
>>
>>   public void setId(UUID id)
>>   {
>>     this.id = id;
>>   }
>>
>>   public String getFirstName()
>>   {
>>     return firstName;
>>   }
>>
>>   public void setFirstName(String firstName)
>>   {
>>     this.firstName = firstName;
>>   }
>>
>>   public String getLastName()
>>   {
>>     return lastName;
>>   }
>>
>>   public void setLastName(String lastName)
>>   {
>>     this.lastName = lastName;
>>   }
>>
>>   public String getCity()
>>   {
>>     return city;
>>   }
>>
>>   public void setCity(String city)
>>   {
>>     this.city = city;
>>   }
>>
>>   @Override
>>   public int hashCode()
>>   {
>>     final int prime = 31;
>>     int result = 1;
>>     result = prime * result + ((city == null) ? 0 : city.hashCode());
>>     result = prime * result + ((firstName == null) ? 0 : firstName.hashCode());
>>     result = prime * result + ((id == null) ? 0 : id.hashCode());
>>     result = prime * result + ((lastName == null) ? 0 : lastName.hashCode());
>>     return result;
>>   }
>> }
>>
>>
>>
>>
>>
>> On Thu, Dec 1, 2016 at 2:02 AM, Chaitanya Chebolu <
>> chaita...@datatorrent.com> wrote:
>>
>>> Hi Max,
>>>
>>>  This is only a warning; I don't think you need to worry about it.
>>>  Did you see any other exceptions in the logs? Please share the
>>> application master log.
>>>
>>> Regards,
>>> Chaitanya
>>>
>>>
>>> On Thu, Dec 1, 2016 at 8:03 AM, Max Bridgewater <
>>> max.bridgewa...@gmail.com> wrote:
>>>
>>>> I am getting the same error described here:
>>>> https://www.mail-archive.com/users@apex.apache.org/msg00533.html
>>>>
>>>> I am running Malhar 3.5.0 with Apex 3.5.0-DT. First I got the zookeeper
>>>> error. Then I added consumer.zookeeper. Now I am getting:
>>>>
>>>> java.io.FileNotFoundException: File does not exist:
>>>> /user/dtadmin/datatorrent/apps/application_1480549373717_0006/events/index.txt
>>>>
>>>> Does anybody know what the solution is?
>>>>
>>>> Thanks,
>>>> Max.
>>>>
>>>
>>>
>>
>
