Convert Simple Kafka Consumer to standalone Spark JavaStream Consumer

2015-07-21 Thread Hafsa Asif
Hi, I have a simple high-level Kafka consumer like this:
package matchinguu.kafka.consumer;


import kafka.consumer.Consumer;
import kafka.consumer.ConsumerConfig;
import kafka.consumer.ConsumerIterator;
import kafka.consumer.KafkaStream;
import kafka.javaapi.consumer.ConsumerConnector;

import java.util.*;

public class SimpleHLConsumer {

    private final ConsumerConnector consumer;
    private final String topic;

    public SimpleHLConsumer(String zookeeper, String groupId, String topic) {
        Properties props = new Properties();
        props.put("zookeeper.connect", zookeeper);
        props.put("group.id", groupId);
        props.put("zookeeper.session.timeout.ms", "500");
        props.put("zookeeper.sync.time.ms", "250");
        props.put("auto.commit.interval.ms", "1000");

        consumer = Consumer.createJavaConsumerConnector(new ConsumerConfig(props));
        this.topic = topic;
    }

    public void testConsumer() {
        Map<String, Integer> topicCount = new HashMap<String, Integer>();
        topicCount.put(topic, 1);

        Map<String, List<KafkaStream<byte[], byte[]>>> consumerStreams =
                consumer.createMessageStreams(topicCount);
        List<KafkaStream<byte[], byte[]>> streams = consumerStreams.get(topic);
        for (final KafkaStream stream : streams) {
            ConsumerIterator<byte[], byte[]> it = stream.iterator();
            while (it.hasNext()) {
                System.out.println();
                System.out.println("Message from Single Topic: " + new String(it.next().message()));
            }
        }
        if (consumer != null) {
            System.out.println("Shutdown Happens");
            consumer.shutdown();
        }
    }

    public static void main(String[] args) {
        System.out.println("Consumer is now reading messages from producer");
        //String topic = args[0];
        String topic = "test";
        SimpleHLConsumer simpleHLConsumer = new SimpleHLConsumer("localhost:2181", "testgroup", topic);
        simpleHLConsumer.testConsumer();
    }

}

I want to get my messages through Spark Java Streaming with Kafka
integration. Can anyone help me rework this code so that I get the same
output with the Spark Kafka integration?



--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/Convert-Simple-Kafka-Consumer-to-standalone-Spark-JavaStream-Consumer-tp23930.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.

-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org



Re: Convert Simple Kafka Consumer to standalone Spark JavaStream Consumer

2015-07-21 Thread Tathagata Das
From what I understand of your code, it is getting data from different
partitions of a topic - all data from partition 1, then from partition
2, etc. However, you have configured it to read from just one partition
(topicCount has count = 1), so I am not sure whether your intention is to
read all partitions serially or in parallel.

If you want to get started with Kafka + Spark Streaming, I strongly suggest
reading the Kafka integration guide -
https://spark.apache.org/docs/latest/streaming-kafka-integration.html
and running the examples for the two approaches:
-
https://github.com/apache/spark/blob/master/examples/src/main/scala/org/apache/spark/examples/streaming/KafkaWordCount.scala
-
https://github.com/apache/spark/blob/master/examples/src/main/scala/org/apache/spark/examples/streaming/DirectKafkaWordCount.scala

Since you understand the high-level consumer idea, you may want to start
with the first, receiver-based approach, which uses the HL consumer as well
and takes the same kind of topicCount map.
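
To give a concrete idea of the mapping: the receiver-based approach exposes
KafkaUtils.createStream, which accepts the same ZooKeeper quorum, group id,
and topic-to-thread-count map your HL consumer already builds. Below is an
untested sketch only - the class name, app name, and 2-second batch interval
are my own choices, and it assumes the spark-streaming-kafka dependency
matching your Spark 1.x version is on the classpath:

```java
package matchinguu.kafka.consumer;

import java.util.HashMap;
import java.util.Map;

import scala.Tuple2;

import org.apache.spark.SparkConf;
import org.apache.spark.api.java.function.Function;
import org.apache.spark.streaming.Durations;
import org.apache.spark.streaming.api.java.JavaDStream;
import org.apache.spark.streaming.api.java.JavaPairReceiverInputDStream;
import org.apache.spark.streaming.api.java.JavaStreamingContext;
import org.apache.spark.streaming.kafka.KafkaUtils;

public class SparkHLConsumer {

    public static void main(String[] args) throws Exception {
        // local[2]: one core for the Kafka receiver, one for processing
        SparkConf conf = new SparkConf()
                .setAppName("SparkHLConsumer")
                .setMaster("local[2]");
        JavaStreamingContext jssc =
                new JavaStreamingContext(conf, Durations.seconds(2));

        // Same topic -> thread-count map the HL consumer used
        Map<String, Integer> topicCount = new HashMap<String, Integer>();
        topicCount.put("test", 1);

        // Receiver-based stream; same ZK quorum and group id as before
        JavaPairReceiverInputDStream<String, String> messages =
                KafkaUtils.createStream(jssc, "localhost:2181", "testgroup", topicCount);

        // Keep only the message value, mirroring new String(it.next().message())
        JavaDStream<String> lines = messages.map(
                new Function<Tuple2<String, String>, String>() {
                    @Override
                    public String call(Tuple2<String, String> tuple2) {
                        return tuple2._2();
                    }
                });
        lines.print();

        jssc.start();
        jssc.awaitTermination();
    }
}
```

Unlike your testConsumer loop, this keeps running until the context is
stopped; lines.print() takes the place of the System.out.println calls.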

