I am attaching my whole project if that can help figure out what I am missing.
On Thu, Dec 1, 2016 at 7:41 AM, Max Bridgewater <[email protected]> wrote: > I'm really struggling to get this working. I am extending > AbstractKafkaSinglePortInputOperator to return a POJO. I read stuff from > Kafka and write to HDFS, pretty basic stuff. > > The following example works fine: > > KafkaSinglePortInputOperator in = dag.addOperator("kafkaIn", new > KafkaSinglePortInputOperator()); > in.setInitialOffset(AbstractKafkaInputOperator. > InitialOffset.EARLIEST.name()); > LineOutputOperator out = dag.addOperator("fileOut", new > LineOutputOperator()); > dag.addStream("data", in.outputPort, out.input); > > > But, when I use my own KafkaTestUserPortStringInputOperator, nothing > works. The first thing that sounds weird is that my > KafkaTestUserPortStringInputOperator requires a consumer.zookeeper > property while the default KafkaSinglePortInputOperator doesn't. Any reason > why? > > > Here is my KafkaTestUserPortStringInputOperator: > > public class KafkaTestUserPortStringInputOperator extends > AbstractKafkaSinglePortInputOperator<TestUser> > { > > private final ObjectMapper mapper = new ObjectMapper(); > private static final Logger logger = LoggerFactory.getLogger( > KafkaTestUserPortStringInputOperator.class); > > @Override > public TestUser getTuple(Message message) > { > TestUser user =null; > try { > ByteBuffer buffer = message.payload(); > byte[] bytes = new byte[buffer.remaining()]; > buffer.get(bytes); > user=mapper.readValue(bytes, TestUser.class); > logger.debug("Consuming {}", new String(bytes)); > } > catch (Exception ex) { > return user; > } > return user; > } > > } > > And LineOutputOperator is as follows: > > public class LineOutputOperator extends AbstractFileOutputOperator<Object> > { > > @NotNull > private String baseName; > > @Override > public byte[] getBytesForTuple(Object t) { > String result = t.toString() + System.lineSeparator(); > return result.getBytes(StandardCharsets.UTF_8); > } > > @Override > protected String getFileName(Object t) { > return baseName; > } > > public String getBaseName() { return baseName; } > public void setBaseName(String v) { baseName = v; } > } > > > And my TestUser: > > public class TestUser > { > @Bind(JavaSerializer.class) > UUID id; > String firstName; > String lastName; > String city; > > public TestUser() > { > > } > > public TestUser(UUID id, String firstName, String lastName, String city) > { > this.id = id; > this.firstName = firstName; > this.lastName = lastName; > this.city = city; > } > > public UUID getId() > { > return id; > } > > public void setId(UUID id) > { > this.id = id; > } > > public String getFirstName() > { > return firstName; > } > > public void setFirstName(String firstName) > { > this.firstName = firstName; > } > > public String getLastName() > { > return lastName; > } > > public void setLastName(String lastName) > { > this.lastName = lastName; > } > > public String getCity() > { > return city; > } > > public void setCity(String city) > { > this.city = city; > } > > @Override > public int hashCode() > { > final int prime = 31; > int result = 1; > result = prime * result + ((city == null) ? 0 : city.hashCode()); > result = prime * result + ((firstName == null) ? 0 : > firstName.hashCode()); > result = prime * result + ((id == null) ? 0 : id.hashCode()); > result = prime * result + ((lastName == null) ? 0 : > lastName.hashCode()); > return result; > } > } > > > > > > On Thu, Dec 1, 2016 at 2:02 AM, Chaitanya Chebolu < > [email protected]> wrote: > >> Hi Max, >> >> This is the warning. I think no need to worry about this. >> Did you see any other exception in logs or Please share the application >> master log. >> >> Regards, >> Chaitanya >> >> >> On Thu, Dec 1, 2016 at 8:03 AM, Max Bridgewater < >> [email protected]> wrote: >> >>> I am getting the same error described here: >>> https://www.mail-archive.com/[email protected]/msg00533.html >>> >>> I am running Mahlar 3.5.0 with Apex 3.5.0-DT. First I got the zookeeper >>> error. Then I added consumer.zookeeper. Now I am getting: >>> >>> java.io.FileNotFoundException: File does not exist: >>> /user/dtadmin/datatorrent/apps/application_1480549373717_000 >>> 6/events/index.txt >>> >>> Anybody knows what the solution is? >>> >>> Thanks, >>> Max. >>> >> >> >
kafka-pojo-hdfs.tar.gz
Description: GNU Zip compressed data
