Re: confirm subscribe to users@kafka.apache.org
On Mon, 24 Nov 2014, at 10:20, users-h...@kafka.apache.org wrote: Hi! This is the ezmlm program. I'm managing the users@kafka.apache.org mailing list. I'm working for my owner, who can be reached at users-ow...@kafka.apache.org. To confirm that you would like d...@dyachkov.org added to the users mailing list, please send a short reply to this address: users-sc.1416820838.ncndionckfhlifhflcaj-dev=dyachkov@kafka.apache.org Usually, this happens when you just hit the reply button. If this does not work, simply copy the address and paste it into the To: field of a new message. or click here: mailto:users-sc.1416820838.ncndionckfhlifhflcaj-dev=dyachkov@kafka.apache.org This confirmation serves two purposes. First, it verifies that I am able to get mail through to you. Second, it protects you in case someone forges a subscription request in your name. Please note that ALL Apache dev- and user- mailing lists are publicly archived. Do familiarize yourself with Apache's public archive policy at http://www.apache.org/foundation/public-archives.html prior to subscribing and posting messages to users@kafka.apache.org. If you're not sure whether or not the policy applies to this mailing list, assume it does unless the list name contains the word private in it. Some mail programs are broken and cannot handle long addresses. If you cannot reply to this request, instead send a message to users-requ...@kafka.apache.org and put the entire address listed above into the Subject: line. --- Administrative commands for the users list --- I can handle administrative requests automatically. Please do not send them to the list address! Instead, send your message to the correct command address: To subscribe to the list, send a message to: users-subscr...@kafka.apache.org To remove your address from the list, send a message to: users-unsubscr...@kafka.apache.org Send mail to the following for info and FAQ for this list: users-i...@kafka.apache.org users-...@kafka.apache.org Similar addresses exist for the digest list: users-digest-subscr...@kafka.apache.org users-digest-unsubscr...@kafka.apache.org To get messages 123 through 145 (a maximum of 100 per request), mail: users-get.123_...@kafka.apache.org To get an index with subject and author for messages 123-456 , mail: users-index.123_...@kafka.apache.org They are always returned as sets of 100, max 2000 per request, so you'll actually get 100-499. To receive all messages with the same subject as message 12345, send a short message to: users-thread.12...@kafka.apache.org The messages should contain one line or word of text to avoid being treated as sp@m, but I will ignore their content. Only the ADDRESS you send to is important. You can start a subscription for an alternate address, for example john@host.domain, just add a hyphen and your address (with '=' instead of '@') after the command word: users-subscribe-john=host.dom...@kafka.apache.org To stop subscription for this address, mail: users-unsubscribe-john=host.dom...@kafka.apache.org In both cases, I'll send a confirmation message to that address. When you receive it, simply reply to it to complete your subscription. If despite following these instructions, you do not get the desired results, please contact my owner at users-ow...@kafka.apache.org. Please be patient, my owner is a lot slower than I am ;-) --- Enclosed is a copy of the request I received. Return-Path: d...@dyachkov.org Received: (qmail 87465 invoked by uid 99); 24 Nov 2014 09:20:38 - Received: from athena.apache.org (HELO athena.apache.org) (140.211.11.136) by apache.org (qpsmtpd/0.29) with ESMTP; Mon, 24 Nov 2014 09:20:38 + X-ASF-Spam-Status: No, hits=-2.7 required=10.0 tests=ASF_LIST_OPS,RCVD_IN_DNSWL_LOW,SPF_PASS X-Spam-Check-By: apache.org Received-SPF: pass (athena.apache.org: local policy) Received: from [66.111.4.28] (HELO out4-smtp.messagingengine.com) (66.111.4.28) by apache.org (qpsmtpd/0.29) with ESMTP; Mon, 24 Nov 2014 09:20:34 + Received: from compute4.internal (compute4.nyi.internal [10.202.2.44]) by mailout.nyi.internal (Postfix) with ESMTP id 18446205FA for users-subscr...@kafka.apache.org; Mon, 24 Nov 2014 04:18:05 -0500 (EST) Received: from web5 ([10.202.2.215]) by compute4.internal (MEProxy); Mon, 24 Nov 2014 04:18:05 -0500 DKIM-Signature: v=1; a=rsa-sha1; c=relaxed/relaxed; d=dyachkov.org; h= message-id:x-sasl-enc:from:to:mime-version :content-transfer-encoding:content-type:subject:date; s=mesmtp; bh=2jmj7l5rSw0yVb/vlWAYkK/YBwk=; b=0KQOnvBf/qxSBySwAXA2pfbGmaY0 OFQIYBaD1elhYOoKcMyfZFzhZCoy0N/MxQoTUsR3ANH/LX5Kx+y0tJi+bUpouSV8 R6ztuIvFrlCxUN/vMnR6Utn35cJypDP3dIwVEKYmtpN5J1LI7kcl21MRHc+jTmgW zfNxw8pWMbjDXGA= DKIM-Signature: v=1; a=rsa-sha1; c=relaxed/relaxed; d=
Is there a plan to build a ubiquitous web service API to manage the kafka cluster
Hi guys, Nowadays, all kafka administration work (add, tear down node, topic management, throughput monitor) are done by various different tool talk to brokers, zookeeper etc. Is there a plan for core team to build a central universal server providing webservice API to do all the admin work? Best, Siyuan
Re: Is there a plan to build a ubiquitous web service API to manage the kafka cluster
That is written up here https://cwiki.apache.org/confluence/display/KAFKA/Kafka+Command+Line+and+Related+Improvements Initially there will be a client API for command line but since it works on the wire protocol anyone can either wrap the API or go directly to the wire protocol to implement it however they want. Work for that has already gotten underway parent ticket https://issues.apache.org/jira/browse/KAFKA-1694 /*** Joe Stein Founder, Principal Consultant Big Data Open Source Security LLC http://www.stealth.ly Twitter: @allthingshadoop http://www.twitter.com/allthingshadoop / On Mon, Nov 24, 2014 at 1:37 PM, hsy...@gmail.com hsy...@gmail.com wrote: Hi guys, Nowadays, all kafka administration work (add, tear down node, topic management, throughput monitor) are done by various different tool talk to brokers, zookeeper etc. Is there a plan for core team to build a central universal server providing webservice API to do all the admin work? Best, Siyuan
Re: BytesInPerSec greater than BytesOutPerSec
Which Kafka version are you using? On Fri, Nov 21, 2014 at 5:29 PM, Allen Wang aw...@netflix.com.invalid wrote: Yes, we have consumers for the topic fetching at the same time and we can see the its BytesPerSec metric. So we find it hard to explain why BytesInPerSec would be greater than BytesOutPerSec on the broker given that we have active consumer and replication. On Fri, Nov 21, 2014 at 4:32 PM, Guozhang Wang wangg...@gmail.com wrote: Are you having consumers fetching from these topics at the same time? BytesInPerSec only counts the bytes appended to the log from the produce request, and BytesOutPerSec counts the bytes fetch from fetch requests; hence both replica fetcher and normal consumer fetcher's requests count in BytesOutPerSec. Guozhang On Fri, Nov 21, 2014 at 11:13 AM, Allen Wang aw...@netflix.com.invalid wrote: We observed that for a topic, BytesIn is greater than BytesOut. We are under the impression that BytesOut should include replication. The topic has two replicas for each partitions and all replicas are in sync. Then BytesOut should be at least same as BytesIn since it always needs to replicate to a follower. Is that right? Thanks. -- -- Guozhang -- -- Guozhang
Two Kafka Question
Hello, First, is there a limit to how many Kafka brokers you can have? Second, if a Kafka broker node fails and I start a new broker on a new node, is it correct to assume that the cluster will copy data to that node to satisfy the replication factor specified for a given topic? In other words, let's assume that I have a 3 node cluster and a topic with a replication factor of 3. If one node fails and I start up a new node, will the new node have existing messages replicated to it? Thanks. Casey
Re: Two Kafka Question
Hi Casey, 1. There's some limit based on size of zookeeper nodes, not sure exactly where it is though. We've seen 30 node clusters running in production. 2. For your scenario to work, the new broker will need to have the same broker id as the old one - or you'll need to manually re-assign partitions. Gwen On Mon, Nov 24, 2014 at 11:15 AM, Sybrandy, Casey casey.sybra...@six3systems.com wrote: Hello, First, is there a limit to how many Kafka brokers you can have? Second, if a Kafka broker node fails and I start a new broker on a new node, is it correct to assume that the cluster will copy data to that node to satisfy the replication factor specified for a given topic? In other words, let's assume that I have a 3 node cluster and a topic with a replication factor of 3. If one node fails and I start up a new node, will the new node have existing messages replicated to it? Thanks. Casey
Does Kafka Producer service ?
Hello Amazing Kafka Creators User, I have learnt and use kafka in our Production system, so you can count my understanding as intermediate. With the statement that Kafka has solved the Scalability and Availability needs for a large scale message publish/subscribe system, I understand that having a Producer Service which sits in between the Application and the Producer defects the one major purpose of Kafka. So, my question is, How to loosely couple Kafka with my Production Application ? The reason being, I wish to do all producer code and Kafka library maintenance without affecting my large scale Production system. Its not an easy thing to buy a window to these type of changes done on a large scale production application :) Any advice on how this can be achieved(even moderately) will greatly help ? Thanks, Krishna Raj
Re: kafka web console running error
Which web console are you using? Thanks, Jun On Fri, Nov 21, 2014 at 8:34 AM, Sa Li sal...@gmail.com wrote: Hi, all I am trying to get kafka web console work, but seems it only works few hours and fails afterwards, below is the error messages on the screen. I am assuming something wrong with the DB, I used to swap H2 to mysql, but didn't help. Anyone has similar problem? - . . at sun.misc.Resource.getByteBuffer(Resource.java:160) ~[na:1.7.0_65] at java.net.URLClassLoader.defineClass(URLClassLoader.java:436) ~[na:1.7.0_65] at akka.dispatch.BatchingExecutor$Batch$$anonfun$run$1.processBatch$1(BatchingExecutor.scala:67) at akka.dispatch.BatchingExecutor$Batch$$anonfun$run$1.apply$mcV$sp(BatchingExecutor.scala:82) at akka.dispatch.BatchingExecutor$Batch$$anonfun$run$1.apply(BatchingExecutor.scala:59) at akka.dispatch.BatchingExecutor$Batch$$anonfun$run$1.apply(BatchingExecutor.scala:59) at scala.concurrent.BlockContext$.withBlockContext(BlockContext.scala:72) at akka.dispatch.BatchingExecutor$Batch.run(BatchingExecutor.scala:58) at akka.dispatch.TaskInvocation.run(AbstractDispatcher.scala:42) at akka.dispatch.ForkJoinExecutorConfigurator$AkkaForkJoinTask.exec(AbstractDispatcher.scala:386) at scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260) at scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339) at scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979) at scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107) [ERROR] Failed to construct terminal; falling back to unsupported java.io.IOException: Cannot run program sh: error=24, Too many open files at java.lang.ProcessBuilder.start(ProcessBuilder.java:1047) at java.lang.Runtime.exec(Runtime.java:617) at java.lang.Runtime.exec(Runtime.java:485) at jline.internal.TerminalLineSettings.exec(TerminalLineSettings.java:183) at jline.internal.TerminalLineSettings.exec(TerminalLineSettings.java:173) at jline.internal.TerminalLineSettings.stty(TerminalLineSettings.java:168) at jline.internal.TerminalLineSettings.get(TerminalLineSettings.java:72) at jline.internal.TerminalLineSettings.init(TerminalLineSettings.java:52) at jline.UnixTerminal.init(UnixTerminal.java:31) at sun.reflect.NativeConstructorAccessorImpl.newInstance0(Native Method) at sun.reflect.NativeConstructorAccessorImpl.newInstance(NativeConstructorAccessorImpl.java:57) at sun.reflect.DelegatingConstructorAccessorImpl.newInstance(DelegatingConstructorAccessorImpl.java:45) at java.lang.reflect.Constructor.newInstance(Constructor.java:526) at java.lang.Class.newInstance(Class.java:379) [error] a.a.ActorSystemImpl - Uncaught error from thread [play-akka.actor.default-dispatcher-944] shutting down JVM since 'akka.jvm-exit-on-fatal-error' java.lang.NoClassDefFoundError: common/Util$$anonfun$getPartitionsLogSize$3$$anonfun$apply$19$$anonfun$apply$1$$anonfun$applyOrElse$1 at common.Util$$anonfun$getPartitionsLogSize$3$$anonfun$apply$19$$anonfun$apply$1.applyOrElse(Util.scala:75) ~[na:na] at common.Util$$anonfun$getPartitionsLogSize$3$$anonfun$apply$19$$anonfun$apply$1.applyOrElse(Util.scala:74) ~[na:na] at scala.runtime.AbstractPartialFunction$mcJL$sp.apply$mcJL$sp(AbstractPartialFunction.scala:33) ~[scala-library.jar:na] at scala.runtime.AbstractPartialFunction$mcJL$sp.apply(AbstractPartialFunction.scala:33) ~[scala-library.jar:na] at scala.runtime.AbstractPartialFunction$mcJL$sp.apply(AbstractPartialFunction.scala:25) ~[scala-library.jar:na] at scala.util.Failure$$anonfun$recover$1.apply(Try.scala:185) ~[scala-library.jar:na] at jline.TerminalFactory.getFlavor(TerminalFactory.java:168) at jline.TerminalFactory.create(TerminalFactory.java:81) at jline.TerminalFactory.get(TerminalFactory.java:159) at sbt.MainLoop$$anon$1.run(MainLoop.scala:19) at java.lang.Thread.run(Thread.java:745) Caused by: java.io.IOException: error=24, Too many open files at java.lang.UNIXProcess.forkAndExec(Native Method) at java.lang.UNIXProcess.init(UNIXProcess.java:186) at java.lang.ProcessImpl.start(ProcessImpl.java:130) at java.lang.ProcessBuilder.start(ProcessBuilder.java:1028) ... 18 more [error] a.a.ActorSystemImpl - Uncaught error from thread [play-akka.actor.default-dispatcher-943] shutting down JVM since 'akka.jvm-exit-on-fatal-error' java.lang.NoClassDefFoundError: common/Util$$anonfun$getPartitionsLogSize$3$$anonfun$apply$19$$anonfun$apply$1$$anonfun$applyOrElse$1 at
Re: isr never update
Which version of Kafka are you using? Any error in the controller and the state-change log? Thanks, Jun On Fri, Nov 21, 2014 at 5:59 PM, Shangan Chen chenshangan...@gmail.com wrote: In the initial state all replicas are in isr list, but sometimes when I check the topic state, the replica can never become isr even if actually it is synchronized. I saw in the log, the leader print expand isr request,but did not work. I found a interesting thing, the shrink and expand request happened just after the controller switch. I don't know whether it is related, and the controller log is overwrite, so I can not verify. Is there anything I can do to trigger the isr update? Currently, I alter the zookeeper partition state, and it works, but it really need a lot of manual work to do as I have quite a lot of topics in my cluster. Some useful information is as follows. *my replica lag config for default:* replica.lag.time.max.ms=1 replica.lag.max.messages=4000 *controller info:* [zk: localhost:2181(CONNECTED) 4] get /kafka08/controller {version:1,brokerid:29,timestamp:1416608404008} cZxid = 0x5a4c85923 ctime = Sat Nov 22 06:20:04 CST 2014 mZxid = 0x5a4c85923 mtime = Sat Nov 22 06:20:04 CST 2014 pZxid = 0x5a4c85923 cversion = 0 dataVersion = 0 aclVersion = 0 ephemeralOwner = 0x5477ba622cb6c7d dataLength = 55 numChildren = 0 *topic info:* Topic:org.nginx PartitionCount:48 ReplicationFactor:2 Configs: Topic: org.nginxPartition: 0Leader: 17 Replicas: 17,32 Isr: 17,32 Topic: org.nginxPartition: 1Leader: 18 Replicas: 18,33 Isr: 18,33 Topic: org.nginxPartition: 2Leader: 19 Replicas: 19,34 Isr: 34,19 Topic: org.nginxPartition: 3Leader: 20 Replicas: 20,35 Isr: 35,20 Topic: org.nginxPartition: 4Leader: 21 Replicas: 21,36 Isr: 21,36 Topic: org.nginxPartition: 5Leader: 22 Replicas: 22,17 Isr: 17,22 Topic: org.nginxPartition: 6Leader: 23 Replicas: 23,18 Isr: 18,23 Topic: org.nginxPartition: 7Leader: 24 Replicas: 24,19 Isr: 24,19 Topic: org.nginxPartition: 8Leader: 25 Replicas: 25,20 Isr: 25,20 Topic: org.nginxPartition: 9Leader: 26 Replicas: 26,21 Isr: 26,21 Topic: org.nginxPartition: 10 Leader: 27 Replicas: 27,22 Isr: 27,22 Topic: org.nginxPartition: 11 Leader: 28 Replicas: 28,23 Isr: 28,23 Topic: org.nginxPartition: 12 Leader: 29 Replicas: 29,24 Isr: 29 Topic: org.nginxPartition: 13 Leader: 30 Replicas: 30,25 Isr: 30,25 Topic: org.nginxPartition: 14 Leader: 31 Replicas: 31,26 Isr: 26,31 Topic: org.nginxPartition: 15 Leader: 32 Replicas: 32,27 Isr: 27,32 Topic: org.nginxPartition: 16 Leader: 33 Replicas: 33,28 Isr: 33,28 Topic: org.nginxPartition: 17 Leader: 34 Replicas: 34,29 Isr: 29,34 Topic: org.nginxPartition: 18 Leader: 35 Replicas: 35,30 Isr: 30,35 Topic: org.nginxPartition: 19 Leader: 36 Replicas: 36,31 Isr: 31,36 Topic: org.nginxPartition: 20 Leader: 17 Replicas: 17,32 Isr: 17,32 Topic: org.nginxPartition: 21 Leader: 18 Replicas: 18,33 Isr: 18,33 Topic: org.nginxPartition: 22 Leader: 19 Replicas: 19,34 Isr: 34,19 Topic: org.nginxPartition: 23 Leader: 20 Replicas: 20,35 Isr: 35,20 Topic: org.nginxPartition: 24 Leader: 21 Replicas: 21,36 Isr: 21,36 Topic: org.nginxPartition: 25 Leader: 22 Replicas: 22,17 Isr: 17,22 Topic: org.nginxPartition: 26 Leader: 23 Replicas: 23,18 Isr: 18,23 Topic: org.nginxPartition: 27 Leader: 24 Replicas: 24,19 Isr: 24,19 Topic: org.nginxPartition: 28 Leader: 25 Replicas: 25,20 Isr: 25,20 Topic: org.nginxPartition: 29 Leader: 26 Replicas: 26,21 Isr: 26,21 Topic: org.nginxPartition: 30 Leader: 27 Replicas: 27,22 Isr: 27,22 Topic: org.nginxPartition: 31 Leader: 28 Replicas: 28,23 Isr: 28,23 Topic: org.nginxPartition: 32 Leader: 29 Replicas: 29,24 Isr: 29 Topic: org.nginxPartition: 33 Leader: 30 Replicas: 30,25 Isr: 30,25 Topic: org.nginxPartition: 34 Leader: 31 Replicas: 31,26 Isr: 26,31 Topic: org.nginxPartition: 35 Leader: 32 Replicas: 32,27 Isr: 27,32 Topic: org.nginxPartition: 36 Leader: 33 Replicas: 33,28 Isr: 33,28 Topic: org.nginxPartition: 37 Leader: 34 Replicas: 34,29 Isr: 29,34
Re: issues using the new 0.8.2 producer
1. The new producer takes only the new producer configs. 2. There is no longer a pluggable partitioner. By default, if a key is provided, the producer hashes the bytes to get the partition. There is an interface for the client to explicitly specify a partition, if it wants to. 3. Currently, the new producer only takes bytes. We are discussing now if we want to make it take generic types like the old producer. Thanks, Jun On Sun, Nov 23, 2014 at 2:12 AM, Shlomi Hazan shl...@viber.com wrote: Hi, Started to dig into that new producer and have a few questions: 1. what part (if any) of the old producer config still apply to the new producer or is it just what is specified on New Producer Configs? 2. how do you specify a partitioner to the new producer? if no such option, what usage is made with the given key? is it simply hashed with Java's String API? 3. the javadoc example ( ProducerRecord record = new ProducerRecord(the-topic, key, value); ) is incorrect and shows as if creating a producer record takes 3 strings whereas it takes byte arrays for the last two arguments. will the final API be the one documented or rather the one implemented? I am really missing a working example for the new producer so if anyone has one I will be happy to get inspired... Shlomi
Re: Exception in thread main java.lang.ExceptionInInitializerError
It seems you hit an exception when instantiating the sfl4j logger inside the producer. Thanks, Jun On Sun, Nov 23, 2014 at 9:29 PM, Haoming Zhang haoming.zh...@outlook.com wrote: Hi all, Basically I used a lot of codes from this project https://github.com/stealthly/scala-kafka , my idea is to sent a key/value pair to Kafka, so that I can design a partition function in the further. I checked the document and seems I should create a ProducerRecord, then I can specify a partition or key. Follows the codes from stealehly's project, I created a test main function as following: import org.apache.kafka.clients.producer.{ KafkaProducer = NewKafkaProducer } import org.apache.kafka.clients.producer.ProducerConfig import java.util.Properties object Test extends App { val testMessage = UUID.randomUUID().toString val testTopic = 0e7fa3c2-1b75-407b-a03c-f40679ea3ce9 val groupId_1 = UUID.randomUUID().toString val brokerList: String = localhost:9092 val acks: Int = -1 val metadataFetchTimeout: Long = 3000L val blockOnBufferFull: Boolean = true val bufferSize: Long = 1024L * 1024L val retries: Int = 0 val producerProps = new Properties() producerProps.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, brokerList) producerProps.put(ProducerConfig.ACKS_CONFIG, acks.toString) producerProps.put(ProducerConfig.METADATA_FETCH_TIMEOUT_CONFIG, metadataFetchTimeout.toString) producerProps.put(ProducerConfig.BLOCK_ON_BUFFER_FULL_CONFIG, blockOnBufferFull.toString) producerProps.put(ProducerConfig.BUFFER_MEMORY_CONFIG, bufferSize.toString) producerProps.put(ProducerConfig.RETRIES_CONFIG, retries.toString) producerProps.put(ProducerConfig.RETRY_BACKOFF_MS_CONFIG, 100) producerProps.put(ProducerConfig.RECONNECT_BACKOFF_MS_CONFIG, 200) val producer = new NewKafkaProducer(producerProps) val record = new ProducerRecord(testTopic, 0, key.getBytes, testMessage.getBytes) producer.send(record) val consumer = new KafkaConsumer(testTopic, groupId_1, localhost:2181) def exec(binaryObject: Array[Byte]) = { val message = new String(binaryObject) print(testMessage = + testMessage + and consumed message = + message) consumer.close() } print(KafkaSpec is waiting some seconds) consumer.read(exec) print(KafkaSpec consumed) } The KafkaConsumer class is exactly as the same as stealehly's project: https://github.com/stealthly/scala-kafka/blob/master/src/main/scala/KafkaConsumer.scala But when I tried to run the test program, I got the following exception: [2014-11-23 21:15:36,461] INFO ProducerConfig values: block.on.buffer.full = true retry.backoff.ms = 100 buffer.memory = 33554432 batch.size = 16384 metrics.sample.window.ms = 3 metadata.max.age.ms = 30 receive.buffer.bytes = 32768 timeout.ms = 3 max.in.flight.requests.per.connection = 5 metric.reporters = [] bootstrap.servers = [localhost:9092] client.id = compression.type = none retries = 0 max.request.size = 1048576 send.buffer.bytes = 131072 acks = 1 reconnect.backoff.ms = 10 linger.ms = 0 metrics.num.samples = 2 metadata.fetch.timeout.ms = 6 (org.apache.kafka.clients.producer.ProducerConfig) Exception in thread main java.lang.ExceptionInInitializerError at org.slf4j.impl.Log4jLoggerFactory.getLogger(Log4jLoggerFactory.java:73) at org.slf4j.LoggerFactory.getLogger(LoggerFactory.java:243) at org.slf4j.LoggerFactory.getLogger(LoggerFactory.java:255) at org.apache.kafka.clients.producer.KafkaProducer.clinit(KafkaProducer.java:64) at Test$delayedInit$body.apply(Test.scala:47) at scala.Function0$class.apply$mcV$sp(Function0.scala:40) at scala.runtime.AbstractFunction0.apply$mcV$sp(AbstractFunction0.scala:12) at scala.App$$anonfun$main$1.apply(App.scala:71) at scala.App$$anonfun$main$1.apply(App.scala:71) at scala.collection.immutable.List.foreach(List.scala:318) at scala.collection.generic.TraversableForwarder$class.foreach(TraversableForwarder.scala:32) at scala.App$class.main(App.scala:71) at Test$.main(Test.scala:21) at Test.main(Test.scala) Caused by: java.lang.NullPointerException at org.apache.kafka.clients.producer.KafkaProducer.init(KafkaProducer.java:98) at org.apache.kafka.clients.producer.KafkaProducer.init(KafkaProducer.java:94) at kafka.producer.KafkaLog4jAppender.activateOptions(KafkaLog4jAppender.scala:63) at org.apache.log4j.config.PropertySetter.activate(PropertySetter.java:257) at org.apache.log4j.config.PropertySetter.setProperties(PropertySetter.java:133) at org.apache.log4j.config.PropertySetter.setProperties(PropertySetter.java:97) at org.apache.log4j.PropertyConfigurator.parseAppender(PropertyConfigurator.java:689) at
Re: kafka web console running error
do you see error msg Too many open files? it tips you should modify nofile On Tue, Nov 25, 2014 at 1:26 PM, Jun Rao jun...@gmail.com wrote: Which web console are you using? Thanks, Jun On Fri, Nov 21, 2014 at 8:34 AM, Sa Li sal...@gmail.com wrote: Hi, all I am trying to get kafka web console work, but seems it only works few hours and fails afterwards, below is the error messages on the screen. I am assuming something wrong with the DB, I used to swap H2 to mysql, but didn't help. Anyone has similar problem? - . . at sun.misc.Resource.getByteBuffer(Resource.java:160) ~[na:1.7.0_65] at java.net.URLClassLoader.defineClass(URLClassLoader.java:436) ~[na:1.7.0_65] at akka.dispatch.BatchingExecutor$Batch$$anonfun$run$1.processBatch$1(BatchingExecutor.scala:67) at akka.dispatch.BatchingExecutor$Batch$$anonfun$run$1.apply$mcV$sp(BatchingExecutor.scala:82) at akka.dispatch.BatchingExecutor$Batch$$anonfun$run$1.apply(BatchingExecutor.scala:59) at akka.dispatch.BatchingExecutor$Batch$$anonfun$run$1.apply(BatchingExecutor.scala:59) at scala.concurrent.BlockContext$.withBlockContext(BlockContext.scala:72) at akka.dispatch.BatchingExecutor$Batch.run(BatchingExecutor.scala:58) at akka.dispatch.TaskInvocation.run(AbstractDispatcher.scala:42) at akka.dispatch.ForkJoinExecutorConfigurator$AkkaForkJoinTask.exec(AbstractDispatcher.scala:386) at scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260) at scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339) at scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979) at scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107) [ERROR] Failed to construct terminal; falling back to unsupported java.io.IOException: Cannot run program sh: error=24, Too many open files at java.lang.ProcessBuilder.start(ProcessBuilder.java:1047) at java.lang.Runtime.exec(Runtime.java:617) at java.lang.Runtime.exec(Runtime.java:485) at jline.internal.TerminalLineSettings.exec(TerminalLineSettings.java:183) at jline.internal.TerminalLineSettings.exec(TerminalLineSettings.java:173) at jline.internal.TerminalLineSettings.stty(TerminalLineSettings.java:168) at jline.internal.TerminalLineSettings.get(TerminalLineSettings.java:72) at jline.internal.TerminalLineSettings.init(TerminalLineSettings.java:52) at jline.UnixTerminal.init(UnixTerminal.java:31) at sun.reflect.NativeConstructorAccessorImpl.newInstance0(Native Method) at sun.reflect.NativeConstructorAccessorImpl.newInstance(NativeConstructorAccessorImpl.java:57) at sun.reflect.DelegatingConstructorAccessorImpl.newInstance(DelegatingConstructorAccessorImpl.java:45) at java.lang.reflect.Constructor.newInstance(Constructor.java:526) at java.lang.Class.newInstance(Class.java:379) [error] a.a.ActorSystemImpl - Uncaught error from thread [play-akka.actor.default-dispatcher-944] shutting down JVM since 'akka.jvm-exit-on-fatal-error' java.lang.NoClassDefFoundError: common/Util$$anonfun$getPartitionsLogSize$3$$anonfun$apply$19$$anonfun$apply$1$$anonfun$applyOrElse$1 at common.Util$$anonfun$getPartitionsLogSize$3$$anonfun$apply$19$$anonfun$apply$1.applyOrElse(Util.scala:75) ~[na:na] at common.Util$$anonfun$getPartitionsLogSize$3$$anonfun$apply$19$$anonfun$apply$1.applyOrElse(Util.scala:74) ~[na:na] at scala.runtime.AbstractPartialFunction$mcJL$sp.apply$mcJL$sp(AbstractPartialFunction.scala:33) ~[scala-library.jar:na] at scala.runtime.AbstractPartialFunction$mcJL$sp.apply(AbstractPartialFunction.scala:33) ~[scala-library.jar:na] at scala.runtime.AbstractPartialFunction$mcJL$sp.apply(AbstractPartialFunction.scala:25) ~[scala-library.jar:na] at scala.util.Failure$$anonfun$recover$1.apply(Try.scala:185) ~[scala-library.jar:na] at jline.TerminalFactory.getFlavor(TerminalFactory.java:168) at jline.TerminalFactory.create(TerminalFactory.java:81) at jline.TerminalFactory.get(TerminalFactory.java:159) at sbt.MainLoop$$anon$1.run(MainLoop.scala:19) at java.lang.Thread.run(Thread.java:745) Caused by: java.io.IOException: error=24, Too many open files at java.lang.UNIXProcess.forkAndExec(Native Method) at java.lang.UNIXProcess.init(UNIXProcess.java:186) at java.lang.ProcessImpl.start(ProcessImpl.java:130) at java.lang.ProcessBuilder.start(ProcessBuilder.java:1028) ... 18 more [error] a.a.ActorSystemImpl - Uncaught error from thread [play-akka.actor.default-dispatcher-943] shutting down JVM
Re: issues using the new 0.8.2 producer
All clear, Thank you. I guess an example will be available when the version is released Shlomi On Tue, Nov 25, 2014 at 7:33 AM, Jun Rao jun...@gmail.com wrote: 1. The new producer takes only the new producer configs. 2. There is no longer a pluggable partitioner. By default, if a key is provided, the producer hashes the bytes to get the partition. There is an interface for the client to explicitly specify a partition, if it wants to. 3. Currently, the new producer only takes bytes. We are discussing now if we want to make it take generic types like the old producer. Thanks, Jun On Sun, Nov 23, 2014 at 2:12 AM, Shlomi Hazan shl...@viber.com wrote: Hi, Started to dig into that new producer and have a few questions: 1. what part (if any) of the old producer config still apply to the new producer or is it just what is specified on New Producer Configs? 2. how do you specify a partitioner to the new producer? if no such option, what usage is made with the given key? is it simply hashed with Java's String API? 3. the javadoc example ( ProducerRecord record = new ProducerRecord(the-topic, key, value); ) is incorrect and shows as if creating a producer record takes 3 strings whereas it takes byte arrays for the last two arguments. will the final API be the one documented or rather the one implemented? I am really missing a working example for the new producer so if anyone has one I will be happy to get inspired... Shlomi