Hi Vineet,

You probably meant to send this message to the Kafka list, no?

-Flavio

> On 23 Mar 2016, at 06:33, vineet.salian <[email protected]> wrote:
> 
> Hi Avinash,
> I am facing the same problem. 
> I am unable to detect the source of the problem, as the producer program is
> working but the consumer is not.
> Can you please help me out?
> 
> import java.io.UnsupportedEncodingException;
> import java.nio.ByteBuffer;
> import java.util.HashMap;
> import java.util.List;
> import java.util.Map;
> import java.util.Properties;
> 
> import kafka.consumer.Consumer;
> import kafka.consumer.ConsumerConfig;
> import kafka.consumer.ConsumerIterator;
> import kafka.consumer.KafkaStream;
> import kafka.javaapi.consumer.ConsumerConnector;
> import kafka.javaapi.message.ByteBufferMessageSet;
> import kafka.message.MessageAndOffset;
> public class HelloKafkaConsumer extends  Thread {
>    final static String clientId = "SimpleConsumerDemoClient";
>    final static String TOPIC = "test";
>    ConsumerConnector consumerConnector;
> 
> 
>    public static void main(String[] argv) throws
> UnsupportedEncodingException {
>        HelloKafkaConsumer helloKafkaConsumer = new HelloKafkaConsumer();
>        helloKafkaConsumer.start();
>    }
> 
>    public HelloKafkaConsumer(){
>       try{
>        Properties properties = new Properties();
>        properties.put("zookeeper.connect","localhost:2181");
>        System.out.println("Hi"+properties);
>        properties.put("group.id","test-group");
>        System.out.println("Hi1"+properties);
>        ConsumerConfig consumerConfig = new ConsumerConfig(properties);
>        System.out.println("hi2"+consumerConfig);
>        consumerConnector =
> Consumer.createJavaConsumerConnector(consumerConfig);
>       System.out.println("hi3"+consumerConnector);
>        }catch(Exception e){}
>    }
> 
>    @Override
>    public void run() {
>       try{
>        Map<String, Integer> topicCountMap = new HashMap<String, Integer>();
> System.out.println("topic count map "+topicCountMap);
>        topicCountMap.put(TOPIC, new Integer(1));
>        Map<String, List<KafkaStream<byte[], byte[]>>> consumerMap =
> consumerConnector.createMessageStreams(topicCountMap);
>        System.out.println("------------------------->");
>        KafkaStream<byte[], byte[]> stream =  consumerMap.get(TOPIC).get(0);
>        ConsumerIterator<byte[], byte[]> it = stream.iterator();
>        while(it.hasNext())
>            System.out.println(new String(it.next().message()));
>       }catch(Exception e){ System.out.print("Inside run"+e);
> e.printStackTrace(); }
>    }
> 
>    private static void printMessages(ByteBufferMessageSet messageSet)
> throws UnsupportedEncodingException {
>               for(MessageAndOffset messageAndOffset: messageSet) 
>               {
>                       try{
>                       ByteBuffer payload = 
> messageAndOffset.message().payload();
>                       byte[] bytes = new byte[payload.limit()];
>                       payload.get(bytes);
>                       System.out.println(new String(bytes, "UTF-8"));
>                       }catch(Exception e){}
>               }
>    }
> }
> 
> This is the consumer program.
> 
> Regards,
> Vineet
> 
> 
> 
> --
> View this message in context: 
> http://zookeeper-user.578899.n2.nabble.com/Unusual-exception-tp5632833p7582198.html
> Sent from the zookeeper-user mailing list archive at Nabble.com.

Reply via email to