Hi Max,

Operators must be serializable with Kryo.
The issue is that KafkaTestUserPortStringInputOperator is not Kryo-serializable
because of its ObjectMapper field (mapper).

Please change the following line:
private final ObjectMapper mapper = new ObjectMapper();
to
private final transient ObjectMapper mapper = new ObjectMapper();
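
To illustrate why the transient keyword helps, here is a minimal sketch. It
uses plain Java serialization as a stand-in for Kryo so that it runs without
any extra libraries; that substitution is an assumption made for the example,
and the names TransientFieldDemo, Helper and DemoOperator are hypothetical
(they are not from your project or from Apex). The point it shows is the
general one: a transient field is skipped by the serializer, so a
non-serializable helper no longer blocks serialization of its owner.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.io.Serializable;

public class TransientFieldDemo {

    // Hypothetical stand-in for ObjectMapper: a helper that is NOT serializable.
    static class Helper {
        String describe(String input) {
            return "parsed:" + input;
        }
    }

    // Hypothetical stand-in for the operator.
    static class DemoOperator implements Serializable {
        private static final long serialVersionUID = 1L;

        private final String topic;       // configuration: gets serialized
        private transient Helper helper;  // runtime helper: skipped by the serializer

        DemoOperator(String topic) {
            this.topic = topic;
            this.helper = new Helper();
        }

        // With Java serialization the transient field is null after a round trip,
        // so rebuild it on first use.
        private Helper helper() {
            if (helper == null) {
                helper = new Helper();
            }
            return helper;
        }

        String process(String input) {
            return topic + "/" + helper().describe(input);
        }
    }

    // Serialize to a byte array and read the object back.
    static <T extends Serializable> T roundTrip(T value)
            throws IOException, ClassNotFoundException {
        ByteArrayOutputStream bytes = new ByteArrayOutputStream();
        try (ObjectOutputStream out = new ObjectOutputStream(bytes)) {
            out.writeObject(value);
        }
        try (ObjectInputStream in =
                 new ObjectInputStream(new ByteArrayInputStream(bytes.toByteArray()))) {
            @SuppressWarnings("unchecked")
            T copy = (T) in.readObject();
            return copy;
        }
    }

    public static void main(String[] args) throws Exception {
        DemoOperator copy = roundTrip(new DemoOperator("users"));
        System.out.println(copy.process("abc"));
    }
}
```

If you remove the transient keyword from the helper field in this sketch,
writeObject fails with a NotSerializableException. One difference to keep in
mind: the lazy helper() accessor is only needed here because Java
serialization does not re-run field initializers. As far as I know, Kryo by
default creates the object through its no-arg constructor, so an inline
initializer such as "= new ObjectMapper()" runs again and the field is
rebuilt.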

Regards,
Chaitanya



On Thu, Dec 1, 2016 at 9:54 PM, Max Bridgewater <max.bridgewa...@gmail.com>
wrote:

> Thanks Ram. I changed the class to the following but it's still not working.
> Not seeing anything useful in the logs. The app is not even deploying
> successfully.
>
>
> package com.example.myapexapp;
>
> import org.apache.apex.malhar.kafka.AbstractKafkaInputOperator;
> import org.apache.kafka.clients.consumer.ConsumerRecord;
> import org.codehaus.jackson.map.ObjectMapper;
>
> import com.datatorrent.api.DefaultOutputPort;
> import com.datatorrent.cassandra.TestUser;
>
> public class KafkaTestUserPortStringInputOperator extends
> AbstractKafkaInputOperator {
>
>     public final transient DefaultOutputPort<TestUser> outputPort = new
> DefaultOutputPort<>();
>
>     private final ObjectMapper mapper = new ObjectMapper();
>
>     private TestUser toUser(ConsumerRecord<byte[], byte[]> message) {
>         TestUser user = null;
>         try {
>             user = mapper.readValue(message.value(), TestUser.class);
>         } catch (Exception ex) {
>             return user;
>         }
>         return user;
>     }
>
>     @Override
>     protected void emitTuple(String cluster, ConsumerRecord<byte[],
> byte[]> message) {
>         outputPort.emit(toUser(message));
>     }
>
> }
>
>
> On Thu, Dec 1, 2016 at 10:53 AM, Munagala Ramanath <r...@datatorrent.com>
> wrote:
>
>> Max,
>>
>> The classes under *contrib/src/main/java/com/datatorrent/contrib/kafka*
>> use the old 0.8 Kafka API
>> whereas those under *kafka/src/main/java/org/apache/apex/malhar/kafka*
>> use the new 0.9 API.
>> *KafkaSinglePortInputOperator* (used in your working version) is in the
>> latter whereas
>> *AbstractKafkaSinglePortInputOperator* (extended by
>> *KafkaTestUserPortStringInputOperator* in your
>> failing version) is in the former.
>>
>> Can you try extending one of the classes from the 0.9 API package ?
>>
>> Ram
>>
>> On Thu, Dec 1, 2016 at 7:27 AM, Max Bridgewater <
>> max.bridgewa...@gmail.com> wrote:
>>
>>> I am attaching my whole project if that can help figure out what I am
>>> missing.
>>>
>>> On Thu, Dec 1, 2016 at 7:41 AM, Max Bridgewater <
>>> max.bridgewa...@gmail.com> wrote:
>>>
>>>> I'm really struggling to get this working. I am extending
>>>> AbstractKafkaSinglePortInputOperator to return a POJO. I read stuff
>>>> from Kafka and write to HDFS, pretty basic stuff.
>>>>
>>>> The following example works fine:
>>>>
>>>>     KafkaSinglePortInputOperator in = dag.addOperator("kafkaIn", new
>>>> KafkaSinglePortInputOperator());
>>>>     in.setInitialOffset(AbstractKafkaInputOperator.InitialOffset
>>>> .EARLIEST.name());
>>>>     LineOutputOperator out = dag.addOperator("fileOut", new
>>>> LineOutputOperator());
>>>>     dag.addStream("data", in.outputPort, out.input);
>>>>
>>>>
>>>> But, when I use my own KafkaTestUserPortStringInputOperator, nothing
>>>> works. The first thing that sounds weird is that my
>>>> KafkaTestUserPortStringInputOperator requires a consumer.zookeeper
>>>> property while the default KafkaSinglePortInputOperator doesn't. Any reason
>>>> why?
>>>>
>>>>
>>>> Here is my KafkaTestUserPortStringInputOperator:
>>>>
>>>> public class KafkaTestUserPortStringInputOperator extends
>>>> AbstractKafkaSinglePortInputOperator<TestUser>
>>>> {
>>>>
>>>>     private final ObjectMapper mapper = new ObjectMapper();
>>>>     private static final Logger logger =
>>>>         LoggerFactory.getLogger(KafkaTestUserPortStringInputOperator.class);
>>>>
>>>>   @Override
>>>>   public TestUser getTuple(Message message)
>>>>   {
>>>>     TestUser user =null;
>>>>     try {
>>>>       ByteBuffer buffer = message.payload();
>>>>       byte[] bytes = new byte[buffer.remaining()];
>>>>       buffer.get(bytes);
>>>>       user=mapper.readValue(bytes, TestUser.class);
>>>>       logger.debug("Consuming {}", new String(bytes));
>>>>     }
>>>>     catch (Exception ex) {
>>>>       return user;
>>>>     }
>>>>     return user;
>>>>   }
>>>>
>>>> }
>>>>
>>>> And LineOutputOperator is as follows:
>>>>
>>>> public class LineOutputOperator extends AbstractFileOutputOperator<Object>
>>>> {
>>>>
>>>>   @NotNull
>>>>   private String baseName;
>>>>
>>>>   @Override
>>>>   public byte[] getBytesForTuple(Object t) {
>>>>     String result = t.toString() + System.lineSeparator();
>>>>     return result.getBytes(StandardCharsets.UTF_8);
>>>>   }
>>>>
>>>>   @Override
>>>>   protected String getFileName(Object t) {
>>>>     return baseName;
>>>>   }
>>>>
>>>>   public String getBaseName() { return baseName; }
>>>>   public void setBaseName(String v) { baseName = v; }
>>>> }
>>>>
>>>>
>>>> And my TestUser:
>>>>
>>>> public class TestUser
>>>> {
>>>>   @Bind(JavaSerializer.class)
>>>>   UUID id;
>>>>   String firstName;
>>>>   String lastName;
>>>>   String city;
>>>>
>>>>   public TestUser()
>>>>   {
>>>>
>>>>   }
>>>>
>>>>   public TestUser(UUID id, String firstName, String lastName, String
>>>> city)
>>>>   {
>>>>     this.id = id;
>>>>     this.firstName = firstName;
>>>>     this.lastName = lastName;
>>>>     this.city = city;
>>>>   }
>>>>
>>>>   public UUID getId()
>>>>   {
>>>>     return id;
>>>>   }
>>>>
>>>>   public void setId(UUID id)
>>>>   {
>>>>     this.id = id;
>>>>   }
>>>>
>>>>   public String getFirstName()
>>>>   {
>>>>     return firstName;
>>>>   }
>>>>
>>>>   public void setFirstName(String firstName)
>>>>   {
>>>>     this.firstName = firstName;
>>>>   }
>>>>
>>>>   public String getLastName()
>>>>   {
>>>>     return lastName;
>>>>   }
>>>>
>>>>   public void setLastName(String lastName)
>>>>   {
>>>>     this.lastName = lastName;
>>>>   }
>>>>
>>>>   public String getCity()
>>>>   {
>>>>     return city;
>>>>   }
>>>>
>>>>   public void setCity(String city)
>>>>   {
>>>>     this.city = city;
>>>>   }
>>>>
>>>>   @Override
>>>>   public int hashCode()
>>>>   {
>>>>     final int prime = 31;
>>>>     int result = 1;
>>>>     result = prime * result + ((city == null) ? 0 : city.hashCode());
>>>>     result = prime * result + ((firstName == null) ? 0 :
>>>> firstName.hashCode());
>>>>     result = prime * result + ((id == null) ? 0 : id.hashCode());
>>>>     result = prime * result + ((lastName == null) ? 0 :
>>>> lastName.hashCode());
>>>>     return result;
>>>>   }
>>>> }
>>>>
>>>>
>>>>
>>>>
>>>>
>>>> On Thu, Dec 1, 2016 at 2:02 AM, Chaitanya Chebolu <
>>>> chaita...@datatorrent.com> wrote:
>>>>
>>>>> Hi Max,
>>>>>
>>>>> This is only a warning. I don't think you need to worry about it.
>>>>> Did you see any other exception in the logs? Please share the
>>>>> application master log.
>>>>>
>>>>> Regards,
>>>>> Chaitanya
>>>>>
>>>>>
>>>>> On Thu, Dec 1, 2016 at 8:03 AM, Max Bridgewater <
>>>>> max.bridgewa...@gmail.com> wrote:
>>>>>
>>>>>> I am getting the same error described here:
>>>>>> https://www.mail-archive.com/users@apex.apache.org/msg00533.html
>>>>>>
>>>>>> I am running Malhar 3.5.0 with Apex 3.5.0-DT. First I got the
>>>>>> zookeeper error. Then I added consumer.zookeeper. Now I am getting:
>>>>>>
>>>>>> java.io.FileNotFoundException: File does not exist:
>>>>>> /user/dtadmin/datatorrent/apps/application_1480549373717_0006/events/index.txt
>>>>>>
>>>>>> Does anybody know what the solution is?
>>>>>>
>>>>>> Thanks,
>>>>>> Max.
>>>>>>
>>>>>
>>>>>
>>>>
>>>
>>
>
