Hi Vineet, You probably meant to send this message to the Kafka list, no?
-Flavio > On 23 Mar 2016, at 06:33, vineet.salian <[email protected]> wrote: > > Hi Avinash, > I am facing the same problem. > I am unable to detect the source of the problem as the producer program is > working but consumer is not. > Can you please help me out. > > import java.io.UnsupportedEncodingException; > import java.nio.ByteBuffer; > import java.util.HashMap; > import java.util.List; > import java.util.Map; > import java.util.Properties; > > import kafka.consumer.Consumer; > import kafka.consumer.ConsumerConfig; > import kafka.consumer.ConsumerIterator; > import kafka.consumer.KafkaStream; > import kafka.javaapi.consumer.ConsumerConnector; > import kafka.javaapi.message.ByteBufferMessageSet; > import kafka.message.MessageAndOffset; > public class HelloKafkaConsumer extends Thread { > final static String clientId = "SimpleConsumerDemoClient"; > final static String TOPIC = "test"; > ConsumerConnector consumerConnector; > > > public static void main(String[] argv) throws > UnsupportedEncodingException { > HelloKafkaConsumer helloKafkaConsumer = new HelloKafkaConsumer(); > helloKafkaConsumer.start(); > } > > public HelloKafkaConsumer(){ > try{ > Properties properties = new Properties(); > properties.put("zookeeper.connect","localhost:2181"); > System.out.println("Hi"+properties); > properties.put("group.id","test-group"); > System.out.println("Hi1"+properties); > ConsumerConfig consumerConfig = new ConsumerConfig(properties); > System.out.println("hi2"+consumerConfig); > consumerConnector = > Consumer.createJavaConsumerConnector(consumerConfig); > System.out.println("hi3"+consumerConnector); > }catch(Exception e){} > } > > @Override > public void run() { > try{ > Map<String, Integer> topicCountMap = new HashMap<String, Integer>(); > System.out.println("topic count map "+topicCountMap); > topicCountMap.put(TOPIC, new Integer(1)); > Map<String, List<KafkaStream<byte[], byte[]>>> consumerMap = > consumerConnector.createMessageStreams(topicCountMap); > System.out.println("------------------------->"); > KafkaStream<byte[], byte[]> stream = consumerMap.get(TOPIC).get(0); > ConsumerIterator<byte[], byte[]> it = stream.iterator(); > while(it.hasNext()) > System.out.println(new String(it.next().message())); > }catch(Exception e){ System.out.print("Inside run"+e); > e.printStackTrace(); } > } > > private static void printMessages(ByteBufferMessageSet messageSet) > throws UnsupportedEncodingException { > for(MessageAndOffset messageAndOffset: messageSet) > { > try{ > ByteBuffer payload = > messageAndOffset.message().payload(); > byte[] bytes = new byte[payload.limit()]; > payload.get(bytes); > System.out.println(new String(bytes, "UTF-8")); > }catch(Exception e){} > } > } > } > > This is the consumer program. > > Regards, > Vineet > > > > -- > View this message in context: > http://zookeeper-user.578899.n2.nabble.com/Unusual-exception-tp5632833p7582198.html > Sent from the zookeeper-user mailing list archive at Nabble.com.
