Max,

The classes under *contrib/src/main/java/com/datatorrent/contrib/kafka* use the old Kafka 0.8 API, whereas those under *kafka/src/main/java/org/apache/apex/malhar/kafka* use the new 0.9 API. *KafkaSinglePortInputOperator* (used in your working version) is in the latter, whereas *AbstractKafkaSinglePortInputOperator* (extended by *KafkaTestUserPortStringInputOperator* in your failing version) is in the former. That is most likely also why only your own operator asks for the consumer.zookeeper property: the 0.8 consumer depends on ZooKeeper, while the 0.9 consumer connects to the brokers directly.
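One practical difference between the two APIs is how the message bytes arrive. In the 0.8 API the payload is a ByteBuffer (message.payload() in your getTuple), whereas the 0.9 consumer's ConsumerRecord exposes the value directly, as a byte[] when a byte-array deserializer is used. I believe the Malhar 0.9 operators work with byte[] values, but please check the signature of the class you extend. Below is a minimal, stdlib-only sketch of a decoding helper that works from byte[] so it can be called from either API. The class and method names are hypothetical, and decode() merely stands in for your mapper.readValue(bytes, TestUser.class) call. It also throws on bad input instead of silently returning null, which may make it easier to see why nothing is emitted.

```java
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;

// Hypothetical helper for illustration; not part of Malhar or Kafka.
final class PayloadDecoding {

    private PayloadDecoding() {}

    // 0.8-style input: copy the readable bytes out of the buffer.
    // duplicate() leaves the caller's buffer position untouched.
    static byte[] toBytes(ByteBuffer payload) {
        ByteBuffer view = payload.duplicate();
        byte[] bytes = new byte[view.remaining()];
        view.get(bytes);
        return bytes;
    }

    // Shared decoding step operating on byte[].
    // Assumption: stands in for the JSON-to-TestUser mapping; here it only
    // decodes UTF-8 text and fails loudly on an empty payload.
    static String decode(byte[] bytes) {
        if (bytes == null || bytes.length == 0) {
            throw new IllegalArgumentException("empty Kafka payload");
        }
        return new String(bytes, StandardCharsets.UTF_8);
    }

    public static void main(String[] args) {
        byte[] raw = "{\"city\":\"Paris\"}".getBytes(StandardCharsets.UTF_8);
        // 0.8-style path: ByteBuffer -> byte[] -> decode
        System.out.println(decode(toBytes(ByteBuffer.wrap(raw))));
        // 0.9-style path: byte[] -> decode
        System.out.println(decode(raw));
    }
}
```

Keeping the byte[] step separate means only the thin adapter method (getTuple in the 0.8 package, or its counterpart in the 0.9 package) has to change when you switch base classes.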
Can you try extending one of the classes from the 0.9 API package?

Ram

On Thu, Dec 1, 2016 at 7:27 AM, Max Bridgewater <max.bridgewa...@gmail.com> wrote:

> I am attaching my whole project if that can help figure out what I am
> missing.
>
> On Thu, Dec 1, 2016 at 7:41 AM, Max Bridgewater <max.bridgewa...@gmail.com>
> wrote:
>
>> I'm really struggling to get this working. I am extending
>> AbstractKafkaSinglePortInputOperator to return a POJO. I read stuff from
>> Kafka and write to HDFS, pretty basic stuff.
>>
>> The following example works fine:
>>
>> KafkaSinglePortInputOperator in = dag.addOperator("kafkaIn", new KafkaSinglePortInputOperator());
>> in.setInitialOffset(AbstractKafkaInputOperator.InitialOffset.EARLIEST.name());
>> LineOutputOperator out = dag.addOperator("fileOut", new LineOutputOperator());
>> dag.addStream("data", in.outputPort, out.input);
>>
>>
>> But, when I use my own KafkaTestUserPortStringInputOperator, nothing
>> works. The first thing that sounds weird is that my
>> KafkaTestUserPortStringInputOperator requires a consumer.zookeeper
>> property while the default KafkaSinglePortInputOperator doesn't. Any reason
>> why?
>>
>>
>> Here is my KafkaTestUserPortStringInputOperator:
>>
>> public class KafkaTestUserPortStringInputOperator extends AbstractKafkaSinglePortInputOperator<TestUser>
>> {
>>
>>   private final ObjectMapper mapper = new ObjectMapper();
>>   private static final Logger logger = LoggerFactory.getLogger(KafkaTestUserPortStringInputOperator.class);
>>
>>   @Override
>>   public TestUser getTuple(Message message)
>>   {
>>     TestUser user = null;
>>     try {
>>       ByteBuffer buffer = message.payload();
>>       byte[] bytes = new byte[buffer.remaining()];
>>       buffer.get(bytes);
>>       user = mapper.readValue(bytes, TestUser.class);
>>       logger.debug("Consuming {}", new String(bytes));
>>     }
>>     catch (Exception ex) {
>>       return user;
>>     }
>>     return user;
>>   }
>>
>> }
>>
>> And LineOutputOperator is as follows:
>>
>> public class LineOutputOperator extends AbstractFileOutputOperator<Object>
>> {
>>
>>   @NotNull
>>   private String baseName;
>>
>>   @Override
>>   public byte[] getBytesForTuple(Object t) {
>>     String result = t.toString() + System.lineSeparator();
>>     return result.getBytes(StandardCharsets.UTF_8);
>>   }
>>
>>   @Override
>>   protected String getFileName(Object t) {
>>     return baseName;
>>   }
>>
>>   public String getBaseName() { return baseName; }
>>   public void setBaseName(String v) { baseName = v; }
>> }
>>
>>
>> And my TestUser:
>>
>> public class TestUser
>> {
>>   @Bind(JavaSerializer.class)
>>   UUID id;
>>   String firstName;
>>   String lastName;
>>   String city;
>>
>>   public TestUser()
>>   {
>>
>>   }
>>
>>   public TestUser(UUID id, String firstName, String lastName, String city)
>>   {
>>     this.id = id;
>>     this.firstName = firstName;
>>     this.lastName = lastName;
>>     this.city = city;
>>   }
>>
>>   public UUID getId()
>>   {
>>     return id;
>>   }
>>
>>   public void setId(UUID id)
>>   {
>>     this.id = id;
>>   }
>>
>>   public String getFirstName()
>>   {
>>     return firstName;
>>   }
>>
>>   public void setFirstName(String firstName)
>>   {
>>     this.firstName = firstName;
>>   }
>>
>>   public String getLastName()
>>   {
>>     return lastName;
>>   }
>>
>>   public void setLastName(String lastName)
>>   {
>>     this.lastName = lastName;
>>   }
>>
>>   public String getCity()
>>   {
>>     return city;
>>   }
>>
>>   public void setCity(String city)
>>   {
>>     this.city = city;
>>   }
>>
>>   @Override
>>   public int hashCode()
>>   {
>>     final int prime = 31;
>>     int result = 1;
>>     result = prime * result + ((city == null) ? 0 : city.hashCode());
>>     result = prime * result + ((firstName == null) ? 0 : firstName.hashCode());
>>     result = prime * result + ((id == null) ? 0 : id.hashCode());
>>     result = prime * result + ((lastName == null) ? 0 : lastName.hashCode());
>>     return result;
>>   }
>> }
>>
>>
>> On Thu, Dec 1, 2016 at 2:02 AM, Chaitanya Chebolu <chaita...@datatorrent.com> wrote:
>>
>>> Hi Max,
>>>
>>> This is only a warning. I don't think you need to worry about it.
>>> Did you see any other exception in the logs? Please share the application
>>> master log.
>>>
>>> Regards,
>>> Chaitanya
>>>
>>>
>>> On Thu, Dec 1, 2016 at 8:03 AM, Max Bridgewater <max.bridgewa...@gmail.com> wrote:
>>>
>>>> I am getting the same error described here:
>>>> https://www.mail-archive.com/users@apex.apache.org/msg00533.html
>>>>
>>>> I am running Malhar 3.5.0 with Apex 3.5.0-DT. First I got the zookeeper
>>>> error. Then I added consumer.zookeeper. Now I am getting:
>>>>
>>>> java.io.FileNotFoundException: File does not exist:
>>>> /user/dtadmin/datatorrent/apps/application_1480549373717_0006/events/index.txt
>>>>
>>>> Does anybody know what the solution is?
>>>>
>>>> Thanks,
>>>> Max.