I'm really struggling to get this working. I am extending
AbstractKafkaSinglePortInputOperator to return a POJO. I read stuff from
Kafka and write to HDFS, pretty basic stuff.
The following example works fine:
KafkaSinglePortInputOperator in = dag.addOperator("kafkaIn", new
KafkaSinglePortInputOperator());
in.setInitialOffset(
AbstractKafkaInputOperator.InitialOffset.EARLIEST.name());
LineOutputOperator out = dag.addOperator("fileOut", new
LineOutputOperator());
dag.addStream("data", in.outputPort, out.input);
But, when I use my own KafkaTestUserPortStringInputOperator, nothing works.
The first thing that sounds weird is that my
KafkaTestUserPortStringInputOperator requires a consumer.zookeeper property
while the default KafkaSinglePortInputOperator doesn't. Any reason why?
Here is my KafkaTestUserPortStringInputOperator:
public class KafkaTestUserPortStringInputOperator extends
AbstractKafkaSinglePortInputOperator<TestUser>
{
private final ObjectMapper mapper = new ObjectMapper();
private static final Logger logger =
LoggerFactory.getLogger(KafkaTestUserPortStringInputOperator.class);
@Override
public TestUser getTuple(Message message)
{
TestUser user =null;
try {
ByteBuffer buffer = message.payload();
byte[] bytes = new byte[buffer.remaining()];
buffer.get(bytes);
user=mapper.readValue(bytes, TestUser.class);
logger.debug("Consuming {}", new String(bytes));
}
catch (Exception ex) {
return user;
}
return user;
}
}
And LineOutputOperator is as follows:
public class LineOutputOperator extends AbstractFileOutputOperator<Object>
{
@NotNull
private String baseName;
@Override
public byte[] getBytesForTuple(Object t) {
String result = t.toString() + System.lineSeparator();
return result.getBytes(StandardCharsets.UTF_8);
}
@Override
protected String getFileName(Object t) {
return baseName;
}
public String getBaseName() { return baseName; }
public void setBaseName(String v) { baseName = v; }
}
And my TestUser:
public class TestUser
{
@Bind(JavaSerializer.class)
UUID id;
String firstName;
String lastName;
String city;
public TestUser()
{
}
public TestUser(UUID id, String firstName, String lastName, String city)
{
this.id = id;
this.firstName = firstName;
this.lastName = lastName;
this.city = city;
}
public UUID getId()
{
return id;
}
public void setId(UUID id)
{
this.id = id;
}
public String getFirstName()
{
return firstName;
}
public void setFirstName(String firstName)
{
this.firstName = firstName;
}
public String getLastName()
{
return lastName;
}
public void setLastName(String lastName)
{
this.lastName = lastName;
}
public String getCity()
{
return city;
}
public void setCity(String city)
{
this.city = city;
}
@Override
public int hashCode()
{
final int prime = 31;
int result = 1;
result = prime * result + ((city == null) ? 0 : city.hashCode());
result = prime * result + ((firstName == null) ? 0 :
firstName.hashCode());
result = prime * result + ((id == null) ? 0 : id.hashCode());
result = prime * result + ((lastName == null) ? 0 :
lastName.hashCode());
return result;
}
}
On Thu, Dec 1, 2016 at 2:02 AM, Chaitanya Chebolu <[email protected]
> wrote:
> Hi Max,
>
> This is the warning. I think no need to worry about this.
> Did you see any other exception in logs or Please share the application
> master log.
>
> Regards,
> Chaitanya
>
>
> On Thu, Dec 1, 2016 at 8:03 AM, Max Bridgewater <[email protected]
> > wrote:
>
>> I am getting the same error described here:
>> https://www.mail-archive.com/[email protected]/msg00533.html
>>
>> I am running Mahlar 3.5.0 with Apex 3.5.0-DT. First I got the zookeeper
>> error. Then I added consumer.zookeeper. Now I am getting:
>>
>> java.io.FileNotFoundException: File does not exist:
>> /user/dtadmin/datatorrent/apps/application_1480549373717_
>> 0006/events/index.txt
>>
>> Anybody knows what the solution is?
>>
>> Thanks,
>> Max.
>>
>
>