From what I understand of your code, it reads the topic's streams one after another: it drains all data from the first stream, then from the second, and so on. However, you have configured only one stream (topicCount has count = 1), and that single stream receives data from all of the topic's partitions. So I am not sure what your intention is: to read all partitions serially, or in parallel.
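If the intent is to read the partitions in parallel, the usual pattern with the 0.8 high-level consumer is to ask createMessageStreams for several streams and drain each one on its own thread. A minimal sketch, assuming the Kafka 0.8.x client jar (the API your code already uses) on the classpath, ZooKeeper at localhost:2181 as in your code, and a hypothetical numStreams of 3. Ideally numStreams should not exceed the partition count, since extra streams stay idle:

```java
import kafka.consumer.Consumer;
import kafka.consumer.ConsumerConfig;
import kafka.consumer.ConsumerIterator;
import kafka.consumer.KafkaStream;
import kafka.javaapi.consumer.ConsumerConnector;

import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Properties;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

public class ParallelHLConsumer {
    // topic -> number of KafkaStreams (consumer threads), NOT a partition id.
    static Map<String, Integer> topicCount(String topic, int numStreams) {
        Map<String, Integer> counts = new HashMap<>();
        counts.put(topic, numStreams);
        return counts;
    }

    public static void main(String[] args) {
        String topic = "test";
        int numStreams = 3; // hypothetical: e.g. one per partition

        Properties props = new Properties();
        props.put("zookeeper.connect", "localhost:2181");
        props.put("group.id", "testgroup");
        ConsumerConnector consumer =
            Consumer.createJavaConsumerConnector(new ConsumerConfig(props));

        List<KafkaStream<byte[], byte[]>> streams =
            consumer.createMessageStreams(topicCount(topic, numStreams)).get(topic);

        // One thread per stream, so no stream has to wait for another to finish.
        ExecutorService pool = Executors.newFixedThreadPool(numStreams);
        for (KafkaStream<byte[], byte[]> stream : streams) {
            pool.execute(() -> {
                ConsumerIterator<byte[], byte[]> it = stream.iterator();
                while (it.hasNext()) { // blocks until the next message arrives
                    System.out.println(Thread.currentThread().getName() + ": "
                        + new String(it.next().message()));
                }
            });
        }

        // On JVM exit (e.g. Ctrl-C), shut down the connector so hasNext() returns false.
        Runtime.getRuntime().addShutdownHook(new Thread(() -> {
            consumer.shutdown();
            pool.shutdown();
        }));
    }
}
```

Kafka spreads the topic's partitions across the streams in the consumer group, so each thread ends up reading its own subset of partitions.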
If you want to start off with Kafka + Spark Streaming, I strongly suggest reading
the Kafka integration guide,
https://spark.apache.org/docs/latest/streaming-kafka-integration.html
and running the examples for the two approaches:
- receiver-based: https://github.com/apache/spark/blob/master/examples/src/main/scala/org/apache/spark/examples/streaming/KafkaWordCount.scala
- direct: https://github.com/apache/spark/blob/master/examples/src/main/scala/org/apache/spark/examples/streaming/DirectKafkaWordCount.scala
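For comparison, the direct approach might look roughly like this in Java. This is a sketch, assuming Spark 1.3 or later with the spark-streaming-kafka artifact for Kafka 0.8, Java 8 lambdas, and a broker at localhost:9092 (a hypothetical address; the original code only names ZooKeeper). The class name SparkDirectConsumer and the helper kafkaParams are illustrative. There is no receiver and no topicCount map here; Spark reads from the brokers directly:

```java
import kafka.serializer.StringDecoder;
import org.apache.spark.SparkConf;
import org.apache.spark.streaming.Duration;
import org.apache.spark.streaming.api.java.JavaPairInputDStream;
import org.apache.spark.streaming.api.java.JavaStreamingContext;
import org.apache.spark.streaming.kafka.KafkaUtils;

import java.util.Collections;
import java.util.HashMap;
import java.util.Map;
import java.util.Set;

public class SparkDirectConsumer {
    // The direct approach is configured with broker addresses, not ZooKeeper.
    static Map<String, String> kafkaParams(String brokers) {
        Map<String, String> params = new HashMap<>();
        params.put("metadata.broker.list", brokers);
        return params;
    }

    public static void main(String[] args) throws Exception {
        SparkConf conf = new SparkConf().setAppName("SparkDirectConsumer").setMaster("local[2]");
        JavaStreamingContext jssc = new JavaStreamingContext(conf, new Duration(2000));

        Set<String> topics = Collections.singleton("test");
        // Each batch gets one RDD partition per Kafka partition of the topic.
        JavaPairInputDStream<String, String> messages = KafkaUtils.createDirectStream(
            jssc, String.class, String.class, StringDecoder.class, StringDecoder.class,
            kafkaParams("localhost:9092"), topics);

        // Records are (key, message) pairs; print the message part of each batch.
        messages.map(record -> "Message from Single Topic: " + record._2()).print();

        jssc.start();
        jssc.awaitTermination();
    }
}
```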
Since you already understand the high-level consumer, you may want to start
with the first, receiver-based approach, which also uses the high-level consumer
internally and takes the same kind of topic-to-thread-count map as your topicCount.
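A rough Java equivalent of the SimpleHLConsumer quoted below, using the receiver-based approach, could look like this. It is a sketch that keeps the original settings (ZooKeeper at localhost:2181, group testgroup, topic test) and additionally assumes Spark 1.x with the spark-streaming-kafka artifact and Java 8; the class name SparkHLConsumer and the helper topicThreads are hypothetical:

```java
import org.apache.spark.SparkConf;
import org.apache.spark.streaming.Duration;
import org.apache.spark.streaming.api.java.JavaDStream;
import org.apache.spark.streaming.api.java.JavaPairReceiverInputDStream;
import org.apache.spark.streaming.api.java.JavaStreamingContext;
import org.apache.spark.streaming.kafka.KafkaUtils;

import java.util.HashMap;
import java.util.Map;

public class SparkHLConsumer {
    // Same shape as topicCount in SimpleHLConsumer: topic -> number of consumer threads.
    static Map<String, Integer> topicThreads(String topic, int numThreads) {
        Map<String, Integer> topics = new HashMap<>();
        topics.put(topic, numThreads);
        return topics;
    }

    public static void main(String[] args) throws Exception {
        // local[2]: the Kafka receiver occupies one core, so at least one more is needed to process data.
        SparkConf conf = new SparkConf().setAppName("SparkHLConsumer").setMaster("local[2]");
        JavaStreamingContext jssc = new JavaStreamingContext(conf, new Duration(2000));

        // Receiver-based stream: uses the Kafka high-level consumer and ZooKeeper under the hood.
        JavaPairReceiverInputDStream<String, String> messages = KafkaUtils.createStream(
            jssc, "localhost:2181", "testgroup", topicThreads("test", 1));

        // Each record is a (key, message) pair; keep only the message text.
        JavaDStream<String> lines = messages.map(record -> "Message from Single Topic: " + record._2());
        lines.print();

        jssc.start();
        jssc.awaitTermination();
    }
}
```

Note that print() only shows the first ten elements of each 2-second batch; to act on every message you would typically use foreachRDD instead.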
On Tue, Jul 21, 2015 at 8:23 AM, Hafsa Asif <hafsa.a...@matchinguu.com> wrote:
Hi, I have a simple high-level Kafka consumer like this:
    package matchinguu.kafka.consumer;

    import kafka.consumer.Consumer;
    import kafka.consumer.ConsumerConfig;
    import kafka.consumer.ConsumerIterator;
    import kafka.consumer.KafkaStream;
    import kafka.javaapi.consumer.ConsumerConnector;

    import java.util.*;

    public class SimpleHLConsumer {
        private final ConsumerConnector consumer;
        private final String topic;

        public SimpleHLConsumer(String zookeeper, String groupId, String topic) {
            // ConsumerConfig expects every property value as a String.
            Properties props = new Properties();
            props.put("zookeeper.connect", zookeeper);
            props.put("group.id", groupId);
            props.put("zookeeper.session.timeout.ms", "500");
            props.put("zookeeper.sync.time.ms", "250");
            props.put("auto.commit.interval.ms", "1000");
            consumer = Consumer.createJavaConsumerConnector(new ConsumerConfig(props));
            this.topic = topic;
        }

        public void testConsumer() {
            // Request one stream (one consuming thread) for the topic.
            Map<String, Integer> topicCount = new HashMap<String, Integer>();
            topicCount.put(topic, 1);
            Map<String, List<KafkaStream<byte[], byte[]>>> consumerStreams =
                consumer.createMessageStreams(topicCount);
            List<KafkaStream<byte[], byte[]>> streams = consumerStreams.get(topic);
            for (final KafkaStream<byte[], byte[]> stream : streams) {
                ConsumerIterator<byte[], byte[]> it = stream.iterator();
                // hasNext() blocks until a message arrives.
                while (it.hasNext()) {
                    System.out.println();
                    System.out.println("Message from Single Topic: "
                        + new String(it.next().message()));
                }
            }
            if (consumer != null) {
                System.out.println("Shutdown Happens");
                consumer.shutdown();
            }
        }

        public static void main(String[] args) {
            System.out.println("Consumer is now reading messages from producer");
            //String topic = args[0];
            String topic = "test";
            SimpleHLConsumer simpleHLConsumer =
                new SimpleHLConsumer("localhost:2181", "testgroup", topic);
            simpleHLConsumer.testConsumer();
        }
    }
I want to get my messages through Spark Streaming in Java with the Kafka
integration. Can anyone help me rewrite this code so that I get the same
output with the Spark Kafka integration?
--
View this message in context:
http://apache-spark-user-list.1001560.n3.nabble.com/Convert-Simple-Kafka-Consumer-to-standalone-Spark-JavaStream-Consumer-tp23930.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.