I'm really struggling to get this working. I am extending
AbstractKafkaSinglePortInputOperator to return a POJO. I read stuff from
Kafka and write to HDFS, pretty basic stuff.

The following example works fine:

    KafkaSinglePortInputOperator in = dag.addOperator("kafkaIn", new
KafkaSinglePortInputOperator());
    in.setInitialOffset(
AbstractKafkaInputOperator.InitialOffset.EARLIEST.name());
    LineOutputOperator out = dag.addOperator("fileOut", new
LineOutputOperator());
    dag.addStream("data", in.outputPort, out.input);


But, when I use my own KafkaTestUserPortStringInputOperator, nothing works.
The first thing that sounds weird is that my
KafkaTestUserPortStringInputOperator requires a consumer.zookeeper property
while the default KafkaSinglePortInputOperator doesn't. Any reason why?


Here is my KafkaTestUserPortStringInputOperator:

public class KafkaTestUserPortStringInputOperator extends
AbstractKafkaSinglePortInputOperator<TestUser>
{

    private final ObjectMapper mapper = new ObjectMapper();
     private static final Logger logger =
LoggerFactory.getLogger(KafkaTestUserPortStringInputOperator.class);

  @Override
  public TestUser getTuple(Message message)
  {
    TestUser user =null;
    try {
      ByteBuffer buffer = message.payload();
      byte[] bytes = new byte[buffer.remaining()];
      buffer.get(bytes);
      user=mapper.readValue(bytes, TestUser.class);
      logger.debug("Consuming {}", new String(bytes));
    }
    catch (Exception ex) {
      return user;
    }
    return user;
  }

}

And LineOutputOperator is as follows:

public class LineOutputOperator extends AbstractFileOutputOperator<Object>
{

  @NotNull
  private String baseName;

  @Override
  public byte[] getBytesForTuple(Object t) {
    String result = t.toString() + System.lineSeparator();
    return result.getBytes(StandardCharsets.UTF_8);
 }

  @Override
  protected String getFileName(Object t) {
    return baseName;
  }

  public String getBaseName() { return baseName; }
  public void setBaseName(String v) { baseName = v; }
}


And my TestUser:

public class TestUser
{
  @Bind(JavaSerializer.class)
  UUID id;
  String firstName;
  String lastName;
  String city;

  public TestUser()
  {

  }

  public TestUser(UUID id, String firstName, String lastName, String city)
  {
    this.id = id;
    this.firstName = firstName;
    this.lastName = lastName;
    this.city = city;
  }

  public UUID getId()
  {
    return id;
  }

  public void setId(UUID id)
  {
    this.id = id;
  }

  public String getFirstName()
  {
    return firstName;
  }

  public void setFirstName(String firstName)
  {
    this.firstName = firstName;
  }

  public String getLastName()
  {
    return lastName;
  }

  public void setLastName(String lastName)
  {
    this.lastName = lastName;
  }

  public String getCity()
  {
    return city;
  }

  public void setCity(String city)
  {
    this.city = city;
  }

  @Override
  public int hashCode()
  {
    final int prime = 31;
    int result = 1;
    result = prime * result + ((city == null) ? 0 : city.hashCode());
    result = prime * result + ((firstName == null) ? 0 :
firstName.hashCode());
    result = prime * result + ((id == null) ? 0 : id.hashCode());
    result = prime * result + ((lastName == null) ? 0 :
lastName.hashCode());
    return result;
  }
}





On Thu, Dec 1, 2016 at 2:02 AM, Chaitanya Chebolu <chaita...@datatorrent.com
> wrote:

> Hi Max,
>
>  This is the warning. I think no need to worry about this.
>  Did you see any other exception in logs or Please share the application
> master log.
>
> Regards,
> Chaitanya
>
>
> On Thu, Dec 1, 2016 at 8:03 AM, Max Bridgewater <max.bridgewa...@gmail.com
> > wrote:
>
>> I am getting the same error described here:
>> https://www.mail-archive.com/users@apex.apache.org/msg00533.html
>>
>> I am running Mahlar 3.5.0 with Apex 3.5.0-DT. First I got the zookeeper
>> error. Then I added consumer.zookeeper. Now I am getting:
>>
>> java.io.FileNotFoundException: File does not exist:
>> /user/dtadmin/datatorrent/apps/application_1480549373717_
>> 0006/events/index.txt
>>
>> Anybody knows what the solution is?
>>
>> Thanks,
>> Max.
>>
>
>

Reply via email to