I am attaching my whole project if that can help figure out what I am
missing.

On Thu, Dec 1, 2016 at 7:41 AM, Max Bridgewater <max.bridgewa...@gmail.com>
wrote:

> I'm really struggling to get this working. I am extending
> AbstractKafkaSinglePortInputOperator to return a POJO. I read stuff from
> Kafka and write to HDFS, pretty basic stuff.
>
> The following example works fine:
>
>     KafkaSinglePortInputOperator in = dag.addOperator("kafkaIn", new
> KafkaSinglePortInputOperator());
>     in.setInitialOffset(AbstractKafkaInputOperator.
> InitialOffset.EARLIEST.name());
>     LineOutputOperator out = dag.addOperator("fileOut", new
> LineOutputOperator());
>     dag.addStream("data", in.outputPort, out.input);
>
>
> But, when I use my own KafkaTestUserPortStringInputOperator, nothing
> works. The first thing that sounds weird is that my
> KafkaTestUserPortStringInputOperator requires a consumer.zookeeper
> property while the default KafkaSinglePortInputOperator doesn't. Any reason
> why?
>
>
> Here is my KafkaTestUserPortStringInputOperator:
>
> public class KafkaTestUserPortStringInputOperator extends
> AbstractKafkaSinglePortInputOperator<TestUser>
> {
>
>     private final ObjectMapper mapper = new ObjectMapper();
>      private static final Logger logger = LoggerFactory.getLogger(
> KafkaTestUserPortStringInputOperator.class);
>
>   @Override
>   public TestUser getTuple(Message message)
>   {
>     TestUser user =null;
>     try {
>       ByteBuffer buffer = message.payload();
>       byte[] bytes = new byte[buffer.remaining()];
>       buffer.get(bytes);
>       user=mapper.readValue(bytes, TestUser.class);
>       logger.debug("Consuming {}", new String(bytes));
>     }
>     catch (Exception ex) {
>       return user;
>     }
>     return user;
>   }
>
> }
>
> And LineOutputOperator is as follows:
>
> public class LineOutputOperator extends AbstractFileOutputOperator<Object>
> {
>
>   @NotNull
>   private String baseName;
>
>   @Override
>   public byte[] getBytesForTuple(Object t) {
>     String result = t.toString() + System.lineSeparator();
>     return result.getBytes(StandardCharsets.UTF_8);
>  }
>
>   @Override
>   protected String getFileName(Object t) {
>     return baseName;
>   }
>
>   public String getBaseName() { return baseName; }
>   public void setBaseName(String v) { baseName = v; }
> }
>
>
> And my TestUser:
>
> public class TestUser
> {
>   @Bind(JavaSerializer.class)
>   UUID id;
>   String firstName;
>   String lastName;
>   String city;
>
>   public TestUser()
>   {
>
>   }
>
>   public TestUser(UUID id, String firstName, String lastName, String city)
>   {
>     this.id = id;
>     this.firstName = firstName;
>     this.lastName = lastName;
>     this.city = city;
>   }
>
>   public UUID getId()
>   {
>     return id;
>   }
>
>   public void setId(UUID id)
>   {
>     this.id = id;
>   }
>
>   public String getFirstName()
>   {
>     return firstName;
>   }
>
>   public void setFirstName(String firstName)
>   {
>     this.firstName = firstName;
>   }
>
>   public String getLastName()
>   {
>     return lastName;
>   }
>
>   public void setLastName(String lastName)
>   {
>     this.lastName = lastName;
>   }
>
>   public String getCity()
>   {
>     return city;
>   }
>
>   public void setCity(String city)
>   {
>     this.city = city;
>   }
>
>   @Override
>   public int hashCode()
>   {
>     final int prime = 31;
>     int result = 1;
>     result = prime * result + ((city == null) ? 0 : city.hashCode());
>     result = prime * result + ((firstName == null) ? 0 :
> firstName.hashCode());
>     result = prime * result + ((id == null) ? 0 : id.hashCode());
>     result = prime * result + ((lastName == null) ? 0 :
> lastName.hashCode());
>     return result;
>   }
> }
>
>
>
>
>
> On Thu, Dec 1, 2016 at 2:02 AM, Chaitanya Chebolu <
> chaita...@datatorrent.com> wrote:
>
>> Hi Max,
>>
>>  This is the warning. I think no need to worry about this.
>>  Did you see any other exception in logs or Please share the application
>> master log.
>>
>> Regards,
>> Chaitanya
>>
>>
>> On Thu, Dec 1, 2016 at 8:03 AM, Max Bridgewater <
>> max.bridgewa...@gmail.com> wrote:
>>
>>> I am getting the same error described here:
>>> https://www.mail-archive.com/users@apex.apache.org/msg00533.html
>>>
>>> I am running Mahlar 3.5.0 with Apex 3.5.0-DT. First I got the zookeeper
>>> error. Then I added consumer.zookeeper. Now I am getting:
>>>
>>> java.io.FileNotFoundException: File does not exist:
>>> /user/dtadmin/datatorrent/apps/application_1480549373717_000
>>> 6/events/index.txt
>>>
>>> Anybody knows what the solution is?
>>>
>>> Thanks,
>>> Max.
>>>
>>
>>
>

Attachment: kafka-pojo-hdfs.tar.gz
Description: GNU Zip compressed data

Reply via email to