Hi Max,

Operators must be serializable with Kryo. The issue is that
KafkaTestUserPortStringInputOperator is not Kryo-serializable because of
its mapper field.

Please change the following line

    private final ObjectMapper mapper = new ObjectMapper();

to

    private final transient ObjectMapper mapper = new ObjectMapper();

Regards,
Chaitanya

On Thu, Dec 1, 2016 at 9:54 PM, Max Bridgewater <[email protected]> wrote:

> Thanks Ram. I changed the class to following but it's still not working.
> Not seeing anything useful in the logs. The app is not even deploying
> successfully.
>
>
> package com.example.myapexapp;
>
> import org.apache.apex.malhar.kafka.AbstractKafkaInputOperator;
> import org.apache.kafka.clients.consumer.ConsumerRecord;
> import org.codehaus.jackson.map.ObjectMapper;
>
> import com.datatorrent.api.DefaultOutputPort;
> import com.datatorrent.cassandra.TestUser;
>
> public class KafkaTestUserPortStringInputOperator extends AbstractKafkaInputOperator {
>
>     public final transient DefaultOutputPort<TestUser> outputPort = new DefaultOutputPort<>();
>
>     private final ObjectMapper mapper = new ObjectMapper();
>
>     private TestUser toUser(ConsumerRecord<byte[], byte[]> message) {
>         TestUser user = null;
>         try {
>             user = mapper.readValue(message.value(), TestUser.class);
>         } catch (Exception ex) {
>             return user;
>         }
>         return user;
>     }
>
>     @Override
>     protected void emitTuple(String cluster, ConsumerRecord<byte[], byte[]> message) {
>         outputPort.emit(toUser(message));
>     }
>
> }
>
>
> On Thu, Dec 1, 2016 at 10:53 AM, Munagala Ramanath <[email protected]> wrote:
>
>> Max,
>>
>> The classes under *contrib/src/main/java/com/datatorrent/contrib/kafka*
>> use the old 0.8 Kafka API
>> whereas those under *kafka/src/main/java/org/apache/apex/malhar/kafka*
>> use the new 0.9 API.
>> *KafkaSinglePortInputOperator* (used in your working version) is in the
>> latter whereas
>> *AbstractKafkaSinglePortInputOperator* (extended by
>> *KafkaTestUserPortStringInputOperator* in your
>> failing version) is in the former.
>>
>> Can you try extending one of the classes from the 0.9 API package ?
>>
>> Ram
>>
>> On Thu, Dec 1, 2016 at 7:27 AM, Max Bridgewater <[email protected]> wrote:
>>
>>> I am attaching my whole project if that can help figure out what I am
>>> missing.
>>>
>>> On Thu, Dec 1, 2016 at 7:41 AM, Max Bridgewater <[email protected]> wrote:
>>>
>>>> I'm really struggling to get this working. I am extending
>>>> AbstractKafkaSinglePortInputOperator to return a POJO. I read stuff
>>>> from Kafka and write to HDFS, pretty basic stuff.
>>>>
>>>> The following example works fine:
>>>>
>>>> KafkaSinglePortInputOperator in = dag.addOperator("kafkaIn", new KafkaSinglePortInputOperator());
>>>> in.setInitialOffset(AbstractKafkaInputOperator.InitialOffset.EARLIEST.name());
>>>> LineOutputOperator out = dag.addOperator("fileOut", new LineOutputOperator());
>>>> dag.addStream("data", in.outputPort, out.input);
>>>>
>>>>
>>>> But, when I use my own KafkaTestUserPortStringInputOperator, nothing
>>>> works. The first thing that sounds weird is that my
>>>> KafkaTestUserPortStringInputOperator requires a consumer.zookeeper
>>>> property while the default KafkaSinglePortInputOperator doesn't. Any reason
>>>> why?
>>>>
>>>>
>>>> Here is my KafkaTestUserPortStringInputOperator:
>>>>
>>>> public class KafkaTestUserPortStringInputOperator extends AbstractKafkaSinglePortInputOperator<TestUser>
>>>> {
>>>>
>>>>   private final ObjectMapper mapper = new ObjectMapper();
>>>>   private static final Logger logger = LoggerFactory.getLogger(KafkaTestUserPortStringInputOperator.class);
>>>>
>>>>   @Override
>>>>   public TestUser getTuple(Message message)
>>>>   {
>>>>     TestUser user =null;
>>>>     try {
>>>>       ByteBuffer buffer = message.payload();
>>>>       byte[] bytes = new byte[buffer.remaining()];
>>>>       buffer.get(bytes);
>>>>       user=mapper.readValue(bytes, TestUser.class);
>>>>       logger.debug("Consuming {}", new String(bytes));
>>>>     }
>>>>     catch (Exception ex) {
>>>>       return user;
>>>>     }
>>>>     return user;
>>>>   }
>>>>
>>>> }
>>>>
>>>> And LineOutputOperator is as follows:
>>>>
>>>> public class LineOutputOperator extends AbstractFileOutputOperator<Object>
>>>> {
>>>>
>>>>   @NotNull
>>>>   private String baseName;
>>>>
>>>>   @Override
>>>>   public byte[] getBytesForTuple(Object t) {
>>>>     String result = t.toString() + System.lineSeparator();
>>>>     return result.getBytes(StandardCharsets.UTF_8);
>>>>   }
>>>>
>>>>   @Override
>>>>   protected String getFileName(Object t) {
>>>>     return baseName;
>>>>   }
>>>>
>>>>   public String getBaseName() { return baseName; }
>>>>   public void setBaseName(String v) { baseName = v; }
>>>> }
>>>>
>>>>
>>>> And my TestUser:
>>>>
>>>> public class TestUser
>>>> {
>>>>   @Bind(JavaSerializer.class)
>>>>   UUID id;
>>>>   String firstName;
>>>>   String lastName;
>>>>   String city;
>>>>
>>>>   public TestUser()
>>>>   {
>>>>
>>>>   }
>>>>
>>>>   public TestUser(UUID id, String firstName, String lastName, String city)
>>>>   {
>>>>     this.id = id;
>>>>     this.firstName = firstName;
>>>>     this.lastName = lastName;
>>>>     this.city = city;
>>>>   }
>>>>
>>>>   public UUID getId()
>>>>   {
>>>>     return id;
>>>>   }
>>>>
>>>>   public void setId(UUID id)
>>>>   {
>>>>     this.id = id;
>>>>   }
>>>>
>>>>   public String getFirstName()
>>>>   {
>>>>     return firstName;
>>>>   }
>>>>
>>>>   public void setFirstName(String firstName)
>>>>   {
>>>>     this.firstName = firstName;
>>>>   }
>>>>
>>>>   public String getLastName()
>>>>   {
>>>>     return lastName;
>>>>   }
>>>>
>>>>   public void setLastName(String lastName)
>>>>   {
>>>>     this.lastName = lastName;
>>>>   }
>>>>
>>>>   public String getCity()
>>>>   {
>>>>     return city;
>>>>   }
>>>>
>>>>   public void setCity(String city)
>>>>   {
>>>>     this.city = city;
>>>>   }
>>>>
>>>>   @Override
>>>>   public int hashCode()
>>>>   {
>>>>     final int prime = 31;
>>>>     int result = 1;
>>>>     result = prime * result + ((city == null) ? 0 : city.hashCode());
>>>>     result = prime * result + ((firstName == null) ? 0 : firstName.hashCode());
>>>>     result = prime * result + ((id == null) ? 0 : id.hashCode());
>>>>     result = prime * result + ((lastName == null) ? 0 : lastName.hashCode());
>>>>     return result;
>>>>   }
>>>> }
>>>>
>>>>
>>>> On Thu, Dec 1, 2016 at 2:02 AM, Chaitanya Chebolu <[email protected]> wrote:
>>>>
>>>>> Hi Max,
>>>>>
>>>>> This is the warning. I think no need to worry about this.
>>>>> Did you see any other exception in logs or Please share the
>>>>> application master log.
>>>>>
>>>>> Regards,
>>>>> Chaitanya
>>>>>
>>>>>
>>>>> On Thu, Dec 1, 2016 at 8:03 AM, Max Bridgewater <[email protected]> wrote:
>>>>>
>>>>>> I am getting the same error described here:
>>>>>> https://www.mail-archive.com/[email protected]/msg00533.html
>>>>>>
>>>>>> I am running Mahlar 3.5.0 with Apex 3.5.0-DT. First I got the
>>>>>> zookeeper error. Then I added consumer.zookeeper. Now I am getting:
>>>>>>
>>>>>> java.io.FileNotFoundException: File does not exist:
>>>>>> /user/dtadmin/datatorrent/apps/application_1480549373717_0006/events/index.txt
>>>>>>
>>>>>> Anybody knows what the solution is?
>>>>>>
>>>>>> Thanks,
>>>>>> Max.
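
P.S. For anyone reading this thread later, here is a small sketch of why marking the mapper field transient helps. It is only an analogy: it uses plain java.io serialization as a stand-in for Kryo so that it runs without Apex, Kafka or Jackson on the classpath, and the names Parser, BrokenOperator, FixedOperator and TransientDemo are made up for illustration. Parser plays the role of ObjectMapper, a helper that is not meant to be part of the operator's serialized state.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.NotSerializableException;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.io.Serializable;
import java.nio.charset.StandardCharsets;

// Hypothetical stand-in for ObjectMapper: a helper that cannot be serialized.
class Parser {
  String parse(byte[] bytes) {
    return new String(bytes, StandardCharsets.UTF_8).trim();
  }
}

// Mirrors the failing operator: the helper is part of the serialized state.
class BrokenOperator implements Serializable {
  private static final long serialVersionUID = 1L;
  private final Parser parser = new Parser();

  String handle(byte[] bytes) {
    return parser.parse(bytes);
  }
}

// Mirrors the suggested fix: the helper is transient, so the serializer skips it.
class FixedOperator implements Serializable {
  private static final long serialVersionUID = 1L;
  private transient Parser parser;

  String handle(byte[] bytes) {
    // java.io serialization leaves transient fields null after reading,
    // so the helper is rebuilt lazily on first use.
    if (parser == null) {
      parser = new Parser();
    }
    return parser.parse(bytes);
  }
}

class TransientDemo {
  // Serializes and deserializes the object; returns null if it is not serializable.
  static Object roundTrip(Object o) {
    try {
      ByteArrayOutputStream buf = new ByteArrayOutputStream();
      try (ObjectOutputStream out = new ObjectOutputStream(buf)) {
        out.writeObject(o);
      }
      try (ObjectInputStream in =
          new ObjectInputStream(new ByteArrayInputStream(buf.toByteArray()))) {
        return in.readObject();
      }
    } catch (NotSerializableException e) {
      return null;
    } catch (IOException | ClassNotFoundException e) {
      throw new IllegalStateException(e);
    }
  }

  public static void main(String[] args) {
    System.out.println("broken survives round trip: "
        + (roundTrip(new BrokenOperator()) != null));
    FixedOperator copy = (FixedOperator) roundTrip(new FixedOperator());
    System.out.println("fixed parses after round trip: "
        + copy.handle(" max ".getBytes(StandardCharsets.UTF_8)));
  }
}
```

One difference worth keeping in mind: the lazy rebuild in FixedOperator is needed here because java.io serialization does not re-run field initializers. As far as I understand, Kryo's default field serializer also skips transient fields but normally recreates the object through its no-arg constructor, so an inline initializer such as new ObjectMapper() would run again. Please treat that as an assumption to verify against your Apex and Kryo versions.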
