Hi, thanks for the reply.
I tried to consume from two different topics in the same app, and I am getting
this error:
java.lang.NoSuchMethodError: kafka.utils.ZkUtils.getAllBrokersInCluster(Lorg/I0Itec/zkclient/ZkClient;)Lscala/collection/Seq;
Consuming from Kafka 0.9 with the KafkaSinglePortInputOperator works on its
own, but when I add a second operator (KafkaSinglePortByteArrayInputOperator)
to consume from Kafka 0.8 in the same DAG, I get the error.
For testing, I am not merging the two Kafka outputs into a common operator;
each branch writes to a different location in HDFS.
This looks like a version conflict that I am not able to identify. Any help
is highly appreciated.
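To narrow down a conflict like this, one option is a small probe run with the same classpath as the application. The following is only a sketch: ClasspathProbe and its method names are made up for illustration, and it uses nothing beyond standard java.lang.reflect and java.security calls. It prints which jar a class was loaded from and which overloads of a method that class actually exposes.

```java
import java.lang.reflect.Method;
import java.security.CodeSource;
import java.util.ArrayList;
import java.util.List;

// Hypothetical diagnostic helper; not part of Apex, Malhar, or Kafka.
public class ClasspathProbe {

    /** Returns the jar or directory a class was loaded from, or a marker if unknown. */
    public static String locationOf(Class<?> cls) {
        CodeSource src = cls.getProtectionDomain().getCodeSource();
        if (src == null || src.getLocation() == null) {
            return "(bootstrap or unknown)";
        }
        return src.getLocation().toString();
    }

    /** Lists every public method named methodName that is visible on cls. */
    public static List<String> overloadsOf(Class<?> cls, String methodName) {
        List<String> found = new ArrayList<>();
        for (Method m : cls.getMethods()) {
            if (m.getName().equals(methodName)) {
                found.add(m.toString());
            }
        }
        return found;
    }

    public static void main(String[] args) {
        // Defaults come from the error message; other names can be passed as arguments.
        String className = args.length > 0 ? args[0] : "kafka.utils.ZkUtils";
        String methodName = args.length > 1 ? args[1] : "getAllBrokersInCluster";
        try {
            Class<?> cls = Class.forName(className);
            System.out.println(className + " loaded from " + locationOf(cls));
            for (String signature : overloadsOf(cls, methodName)) {
                System.out.println("  " + signature);
            }
        } catch (ClassNotFoundException e) {
            System.out.println(className + " is not on the classpath");
        }
    }
}
```

If the jar it reports is not the Kafka version the failing operator was compiled against, or the listed overloads do not include one taking a ZkClient argument, that would be consistent with two Kafka versions competing on one classpath.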
My pom.xml looks like this:
<properties>
<apex.version>3.4.0</apex.version>
<malhar.version>3.6.0</malhar.version>
<apex.apppackage.classpath>lib/*.jar</apex.apppackage.classpath>
<hadoop.version>2.7.1.2.3.4.0-3485</hadoop.version>
<hbase.version>1.1.2.2.3.4.0-3485</hbase.version>
<kafka.version>0.9.0.1</kafka.version>
<confluent.kafka.version>0.9.0.1-cp1</confluent.kafka.version>
<kafka.avro.srlzr.version>2.0.1</kafka.avro.srlzr.version>
<avro.version>1.7.7</avro.version>
<json.version>1.1</json.version>
<jodatime.version>2.9.1</jodatime.version>
<kyroserializer.version>0.38</kyroserializer.version>
<junit.version>4.10</junit.version>
</properties>
<repositories>
<repository>
<id>HDPReleases</id>
<name>HDP Releases</name>
<url>http://repo.hortonworks.com/content/repositories/releases/</url>
<layout>default</layout>
</repository>
<repository>
<id>HDP Jetty Hadoop</id>
<name>HDP Jetty Hadoop</name>
<url>http://repo.hortonworks.com/content/repositories/jetty-hadoop/</url>
<layout>default</layout>
</repository>
<repository>
<id>confluent</id>
<url>http://packages.confluent.io/maven</url>
</repository>
</repositories>
<dependencies>
<dependency>
<groupId>org.apache.apex</groupId>
<artifactId>malhar-library</artifactId>
<version>${malhar.version}</version>
</dependency>
<dependency>
<groupId>org.apache.apex</groupId>
<artifactId>apex-common</artifactId>
<version>${apex.version}</version>
<scope>provided</scope>
</dependency>
<dependency>
<groupId>junit</groupId>
<artifactId>junit</artifactId>
<version>${junit.version}</version>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.apache.apex</groupId>
<artifactId>apex-engine</artifactId>
<version>${apex.version}</version>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.apache.apex</groupId>
<artifactId>malhar-contrib</artifactId>
<version>${malhar.version}</version>
</dependency>
<dependency>
<groupId>org.apache.apex</groupId>
<artifactId>malhar-kafka</artifactId>
<version>${malhar.version}</version>
</dependency>
<dependency>
<groupId>org.apache.avro</groupId>
<artifactId>avro</artifactId>
<version>${avro.version}</version>
</dependency>
<dependency>
<groupId>org.apache.kafka</groupId>
<artifactId>kafka_2.11</artifactId>
<version>${confluent.kafka.version}</version>
</dependency>
<dependency>
<groupId>org.apache.kafka</groupId>
<artifactId>kafka_2.10</artifactId>
<version>0.8.1.1</version>
</dependency>
<dependency>
<groupId>io.confluent</groupId>
<artifactId>kafka-avro-serializer</artifactId>
<version>${kafka.avro.srlzr.version}</version>
<exclusions>
<exclusion>
<groupId>log4j</groupId>
<artifactId>log4j</artifactId>
</exclusion>
<exclusion>
<groupId>org.slf4j</groupId>
<artifactId>slf4j-log4j12</artifactId>
</exclusion>
</exclusions>
</dependency>
<dependency>
<groupId>com.googlecode.json-simple</groupId>
<artifactId>json-simple</artifactId>
<version>${json.version}</version>
</dependency>
<dependency>
<groupId>org.apache.hbase</groupId>
<artifactId>hbase-client</artifactId>
<version>${hbase.version}</version>
<scope>provided</scope>
</dependency>
<dependency>
<groupId>joda-time</groupId>
<artifactId>joda-time</artifactId>
<version>${jodatime.version}</version>
</dependency>
<dependency>
<groupId>de.javakaffee</groupId>
<artifactId>kryo-serializers</artifactId>
<version>${kyroserializer.version}</version>
</dependency>
</dependencies>
My DAG looks like this:
public void populateDAG(DAG dag, Configuration conf)
{
  // Branch 1: Kafka 0.9 input (SSL) -> Avro conversion -> field extract -> HDFS
  KafkaSinglePortInputOperator kafkaInTtce =
      dag.addOperator("Kafka_Input_SSL", new KafkaSinglePortInputOperator());
  kafkaInTtce.setInitialPartitionCount(Integer.parseInt(conf.get("kafka.partitioncount")));
  kafkaInTtce.setTopics(conf.get("kafka.ssl.topic"));
  kafkaInTtce.setInitialOffset(conf.get("kafka.offset"));
  kafkaInTtce.setClusters(conf.get("kafka.cluster"));
  kafkaInTtce.setConsumerProps(getKafkaProperties(conf.get("kafka.cluster"), conf));
  kafkaInTtce.setStrategy(conf.get("kafka.strategy"));

  AvroBytesConversionOperator avroConversion = dag.addOperator("Avro_Convert",
      new AvroBytesConversionOperator(conf.get("kafka.schema.registry")));
  ColumnsExtractOperator fieldExtract =
      dag.addOperator("Field_Extract", new ColumnsExtractOperator());
  WriteToHdfs hdfs = dag.addOperator("To_HDFS", new WriteToHdfs(conf.get("hdfs.filename")));
  hdfs.setMaxLength(268435456); // rotate to a new file after every 256 MB

  dag.addStream("Kafka_Avro_Msg_Byte_Stream", kafkaInTtce.outputPort, avroConversion.input);
  dag.addStream("jsonstring_stream", avroConversion.output, fieldExtract.input);
  dag.addStream("valid_recs_into_hdfs_stream", fieldExtract.output, hdfs.input);

  // Branch 2: Kafka 0.8 input (non-SSL) -> Avro conversion -> field extract -> HDFS
  KafkaSinglePortByteArrayInputOperator kafkaInput =
      dag.addOperator("Kafka_Input_NonSSL", new KafkaSinglePortByteArrayInputOperator());
  CopyofAvroBytesConversionOperator avroConversionEstore = dag.addOperator("Avro_Convert_estore",
      new CopyofAvroBytesConversionOperator("http://--------"));
  CopyOfColumnsExtractOperator fieldExtractEstore =
      dag.addOperator("Field_Extract_Estore", new CopyOfColumnsExtractOperator());
  WriteToHdfs2 hdfs2 = dag.addOperator("To_HDFS2", new WriteToHdfs2("DeviceTypeEstore"));
  hdfs2.setMaxLength(268435456);

  dag.addStream("Kafka_Avro_estore_Stream", kafkaInput.outputPort, avroConversionEstore.input);
  dag.addStream("jsonstring_stream_estore", avroConversionEstore.output, fieldExtractEstore.input);
  dag.addStream("valid_recs_into_hdfs_estorestream", fieldExtractEstore.output, hdfs2.input);
}
The error I am getting (from the .dt log):
2017-08-01 04:57:38,281 INFO stram.StreamingAppMaster
(StreamingAppMaster.java:main(99)) - Initializing Application Master.
2017-08-01 04:57:38,388 INFO stram.StreamingAppMasterService
(StreamingAppMasterService.java:serviceInit(537)) - Application master,
appId=507386, clustertimestamp=1500406884031, attemptId=2
2017-08-01 04:57:38,622 WARN util.NativeCodeLoader
(NativeCodeLoader.java:<clinit>(62)) - Unable to load native-hadoop library
for your platform... using builtin-java classes where applicable
2017-08-01 04:57:39,441 WARN shortcircuit.DomainSocketFactory
(DomainSocketFactory.java:<init>(117)) - The short-circuit local reads
feature cannot be used because libhadoop cannot be loaded.
2017-08-01 04:57:40,088 INFO kafka.AbstractKafkaInputOperator
(AbstractKafkaInputOperator.java:initPartitioner(327)) - Initialize
Partitioner
2017-08-01 04:57:40,089 INFO kafka.AbstractKafkaInputOperator
(AbstractKafkaInputOperator.java:initPartitioner(340)) - Actual Partitioner
is class org.apache.apex.malhar.kafka.OneToManyPartitioner
2017-08-01 04:57:40,121 INFO kafka.AbstractKafkaPartitioner
(AbstractKafkaPartitioner.java:initMetadataClients(234)) - Consumer
Properties : #
#Tue Aug 01 04:57:40 CDT 2017
security.protocol=SSL
enable.auto.commit=false
value.deserializer=org.apache.kafka.common.serialization.ByteArrayDeserializer
group.id=org.apache.apex.malhar.kafka.AbstractKafkaInputOperatorMETA_GROUP
ssl.keystore.password=
ssl.truststore.location=/home_dir/client.truststore.jks
bootstrap.servers=
ssl.truststore.password=
ssl.keystore.location=/home_dir/server.keystore.jks
key.deserializer=org.apache.kafka.common.serialization.ByteArrayDeserializer
2017-08-01 04:57:40,290 INFO utils.AppInfoParser
(AppInfoParser.java:<init>(82)) - Kafka version : 0.9.0.0
2017-08-01 04:57:40,291 INFO utils.AppInfoParser
(AppInfoParser.java:<init>(83)) - Kafka commitId : fc7243c2af4b2b4a
2017-08-01 04:57:41,306 INFO kafka.AbstractKafkaPartitioner
(AbstractKafkaPartitioner.java:definePartitions(151)) - Partition change
detected:
2017-08-01 04:57:41,307 INFO kafka.AbstractKafkaPartitioner
(AbstractKafkaPartitioner.java:definePartitions(170)) - [New] Partition 0
with assignment PartitionMeta{cluster='10.66.137.93:9093',
topicPartition=firefly-apps-superapp-21};PartitionMeta{cluster='10.66.137.93:9093',
topicPartition=firefly-apps-superapp-22};PartitionMeta{cluster='10.66.137.93:9093',
topicPartition=firefly-apps-superapp-23};PartitionMeta{cluster='10.66.137.93:9093',
topicPartition=firefly-apps-superapp-24};PartitionMeta{cluster='10.66.137.93:9093',
topicPartition=firefly-apps-superapp-18};PartitionMeta{cluster='10.66.137.93:9093',
topicPartition=firefly-apps-superapp-19};PartitionMeta{cluster='10.66.137.93:9093',
topicPartition=firefly-apps-superapp-20};PartitionMeta{cluster='10.66.137.93:9093',
topicPartition=firefly-apps-superapp-14};PartitionMeta{cluster='10.66.137.93:9093',
topicPartition=firefly-apps-superapp-25};PartitionMeta{cluster='10.66.137.93:9093',
topicPartition=firefly-apps-superapp-26}
2017-08-01 04:57:41,318 INFO kafka.AbstractKafkaPartitioner
(AbstractKafkaPartitioner.java:definePartitions(170)) - [New] Partition 1
with assignment PartitionMeta{cluster='10.66.137.93:9093',
topicPartition=firefly-apps-superapp-6};PartitionMeta{cluster='10.66.137.93:9093',
topicPartition=firefly-apps-superapp-8};PartitionMeta{cluster='10.66.137.93:9093',
topicPartition=firefly-apps-superapp-2};PartitionMeta{cluster='10.66.137.93:9093',
topicPartition=firefly-apps-superapp-3};PartitionMeta{cluster='10.66.137.93:9093',
topicPartition=firefly-apps-superapp-13};PartitionMeta{cluster='10.66.137.93:9093',
topicPartition=firefly-apps-superapp-16};PartitionMeta{cluster='10.66.137.93:9093',
topicPartition=firefly-apps-superapp-9};PartitionMeta{cluster='10.66.137.93:9093',
topicPartition=firefly-apps-superapp-10};PartitionMeta{cluster='10.66.137.93:9093',
topicPartition=firefly-apps-superapp-11};PartitionMeta{cluster='10.66.137.93:9093',
topicPartition=firefly-apps-superapp-28}
2017-08-01 04:57:41,322 INFO kafka.AbstractKafkaPartitioner
(AbstractKafkaPartitioner.java:definePartitions(170)) - [New] Partition 2
with assignment PartitionMeta{cluster='10.66.137.93:9093',
topicPartition=firefly-apps-superapp-5};PartitionMeta{cluster='10.66.137.93:9093',
topicPartition=firefly-apps-superapp-7};PartitionMeta{cluster='10.66.137.93:9093',
topicPartition=firefly-apps-superapp-17};PartitionMeta{cluster='10.66.137.93:9093',
topicPartition=firefly-apps-superapp-1};PartitionMeta{cluster='10.66.137.93:9093',
topicPartition=firefly-apps-superapp-4};PartitionMeta{cluster='10.66.137.93:9093',
topicPartition=firefly-apps-superapp-29};PartitionMeta{cluster='10.66.137.93:9093',
topicPartition=firefly-apps-superapp-15};PartitionMeta{cluster='10.66.137.93:9093',
topicPartition=firefly-apps-superapp-0};PartitionMeta{cluster='10.66.137.93:9093',
topicPartition=firefly-apps-superapp-27};PartitionMeta{cluster='10.66.137.93:9093',
topicPartition=firefly-apps-superapp-12}
2017-08-01 04:57:41,365 INFO zkclient.ZkEventThread
(ZkEventThread.java:run(64)) - Starting ZkClient event thread.
2017-08-01 04:57:41,372 INFO zookeeper.ZooKeeper
(Environment.java:logEnv(100)) - Client
environment:zookeeper.version=3.4.6-1569965, built on 02/20/2014 09:09 GMT
2017-08-01 04:57:41,372 INFO zookeeper.ZooKeeper
(Environment.java:logEnv(100)) - Client
environment:host.name=brdn1351.target.com
2017-08-01 04:57:41,373 INFO zookeeper.ZooKeeper
(Environment.java:logEnv(100)) - Client environment:java.version=1.8.0_73
2017-08-01 04:57:41,373 INFO zookeeper.ZooKeeper
(Environment.java:logEnv(100)) - Client environment:java.vendor=Oracle
Corporation
2017-08-01 04:57:41,373 INFO zookeeper.ZooKeeper
(Environment.java:logEnv(100)) - Client
environment:java.home=/usr/java/jdk1.8.0_73/jre
connectString=10.66.137.94:2181,10.66.137.95:2181,10.66.137.96:2181,10.66.137.97:2181,10.66.137.98:2181
sessionTimeout=30000 watcher=org.I0Itec.zkclient.ZkClient@6d64b553
2017-08-01 04:57:41,388 INFO zkclient.ZkClient
(ZkClient.java:waitForKeeperState(934)) - Waiting for keeper state
SyncConnected
2017-08-01 04:57:41,392 INFO zookeeper.ClientCnxn
(ClientCnxn.java:logStartConnect(975)) - Opening socket connection to server
10.66.137.97/10.66.137.97:2181. Will not attempt to authenticate using SASL
(unknown error)
2017-08-01 04:57:41,393 INFO zookeeper.ClientCnxn
(ClientCnxn.java:primeConnection(852)) - Socket connection established to
10.66.137.97/10.66.137.97:2181, initiating session
2017-08-01 04:57:41,445 INFO zookeeper.ClientCnxn
(ClientCnxn.java:onConnected(1235)) - Session establishment complete on
server 10.66.137.97/10.66.137.97:2181, sessionid = 0x350dafbffc66af1,
negotiated timeout = 30000
2017-08-01 04:57:41,447 INFO zkclient.ZkClient
(ZkClient.java:processStateChanged(711)) - zookeeper state changed
(SyncConnected)
2017-08-01 04:57:41,450 ERROR stram.StreamingAppMaster
(StreamingAppMaster.java:main(106)) - Exiting Application Master
java.lang.NoSuchMethodError: kafka.utils.ZkUtils.getAllBrokersInCluster(Lorg/I0Itec/zkclient/ZkClient;)Lscala/collection/Seq;
    at com.datatorrent.contrib.kafka.KafkaMetadataUtil.getBrokers(KafkaMetadataUtil.java:117)
    at com.datatorrent.contrib.kafka.KafkaConsumer.initBrokers(KafkaConsumer.java:139)
    at com.datatorrent.contrib.kafka.AbstractKafkaInputOperator.definePartitions(AbstractKafkaInputOperator.java:506)
    at com.datatorrent.stram.plan.physical.PhysicalPlan.initPartitioning(PhysicalPlan.java:752)
    at com.datatorrent.stram.plan.physical.PhysicalPlan.addLogicalOperator(PhysicalPlan.java:1676)
    at com.datatorrent.stram.plan.physical.PhysicalPlan.<init>(PhysicalPlan.java:378)
    at com.datatorrent.stram.StreamingContainerManager.<init>(StreamingContainerManager.java:418)
    at com.datatorrent.stram.StreamingContainerManager.getInstance(StreamingContainerManager.java:3023)
    at com.datatorrent.stram.StreamingAppMasterService.serviceInit(StreamingAppMasterService.java:551)
    at org.apache.hadoop.service.AbstractService.init(AbstractService.java:163)
    at com.datatorrent.stram.StreamingAppMaster.main(StreamingAppMaster.java:102)
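
The method signature in the NoSuchMethodError above is written as a JVM method descriptor, which is hard to compare against source code by eye. As a reading aid, here is a minimal sketch that turns such a descriptor into readable type names. DescriptorDecoder is a hypothetical name and not part of any library; the decoding follows the standard JVM descriptor grammar (L...; for classes, [ for arrays, single letters for primitives).

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical helper for reading JVM method descriptors; illustration only.
public class DescriptorDecoder {

    /** Converts a method descriptor into the form "returnType (paramType, ...)". */
    public static String decode(String descriptor) {
        int close = descriptor.indexOf(')');
        if (!descriptor.startsWith("(") || close < 0) {
            throw new IllegalArgumentException("Not a method descriptor: " + descriptor);
        }
        List<String> params = new ArrayList<>();
        int[] pos = {1}; // cursor into the descriptor, just after '('
        while (pos[0] < close) {
            params.add(readType(descriptor, pos));
        }
        pos[0] = close + 1; // the return type follows ')'
        String returnType = readType(descriptor, pos);
        return returnType + " (" + String.join(", ", params) + ")";
    }

    /** Reads one type starting at pos[0] and advances pos[0] past it. */
    private static String readType(String d, int[] pos) {
        int dims = 0;
        while (d.charAt(pos[0]) == '[') { // each '[' is one array dimension
            dims++;
            pos[0]++;
        }
        char tag = d.charAt(pos[0]++);
        String base;
        switch (tag) {
            case 'L': {
                int semi = d.indexOf(';', pos[0]);
                base = d.substring(pos[0], semi).replace('/', '.');
                pos[0] = semi + 1;
                break;
            }
            case 'B': base = "byte"; break;
            case 'C': base = "char"; break;
            case 'D': base = "double"; break;
            case 'F': base = "float"; break;
            case 'I': base = "int"; break;
            case 'J': base = "long"; break;
            case 'S': base = "short"; break;
            case 'Z': base = "boolean"; break;
            case 'V': base = "void"; break;
            default: throw new IllegalArgumentException("Unknown type tag: " + tag);
        }
        StringBuilder sb = new StringBuilder(base);
        for (int i = 0; i < dims; i++) {
            sb.append("[]");
        }
        return sb.toString();
    }

    public static void main(String[] args) {
        // Descriptor copied from the error message.
        System.out.println(decode("(Lorg/I0Itec/zkclient/ZkClient;)Lscala/collection/Seq;"));
    }
}
```

For the descriptor in the trace this yields scala.collection.Seq (org.I0Itec.zkclient.ZkClient): the caller in com.datatorrent.contrib.kafka expects a getAllBrokersInCluster method that takes a ZkClient and returns a Seq, and the ZkUtils class found at runtime does not provide one.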