Re: confirm subscribe to users@kafka.apache.org

2014-11-24 Thread Ivan Dyachkov


On Mon, 24 Nov 2014, at 10:20, users-h...@kafka.apache.org wrote:
 Hi! This is the ezmlm program. I'm managing the
 users@kafka.apache.org mailing list.
 
 I'm working for my owner, who can be reached
 at users-ow...@kafka.apache.org.
 
 To confirm that you would like
 
d...@dyachkov.org
 
 added to the users mailing list, please send
 a short reply to this address:
 
users-sc.1416820838.ncndionckfhlifhflcaj-dev=dyachkov@kafka.apache.org
 
 Usually, this happens when you just hit the reply button.
 If this does not work, simply copy the address and paste it into
 the To: field of a new message.
 
 or click here:
   
 mailto:users-sc.1416820838.ncndionckfhlifhflcaj-dev=dyachkov@kafka.apache.org
 
 This confirmation serves two purposes. First, it verifies that I am able
 to get mail through to you. Second, it protects you in case someone
 forges a subscription request in your name.
 
 Please note that ALL Apache dev- and user- mailing lists are publicly
 archived.  Do familiarize yourself with Apache's public archive policy at
 
 http://www.apache.org/foundation/public-archives.html
 
 prior to subscribing and posting messages to users@kafka.apache.org.
 If you're not sure whether or not the policy applies to this mailing list,
 assume it does unless the list name contains the word "private" in it.
 
 Some mail programs are broken and cannot handle long addresses. If you
 cannot reply to this request, instead send a message to
 users-requ...@kafka.apache.org and put the
 entire address listed above into the Subject: line.
 
 
 --- Administrative commands for the users list ---
 
 I can handle administrative requests automatically. Please
 do not send them to the list address! Instead, send
 your message to the correct command address:
 
 To subscribe to the list, send a message to:
users-subscr...@kafka.apache.org
 
 To remove your address from the list, send a message to:
users-unsubscr...@kafka.apache.org
 
 Send mail to the following for info and FAQ for this list:
users-i...@kafka.apache.org
users-...@kafka.apache.org
 
 Similar addresses exist for the digest list:
users-digest-subscr...@kafka.apache.org
users-digest-unsubscr...@kafka.apache.org
 
 To get messages 123 through 145 (a maximum of 100 per request), mail:
users-get.123_...@kafka.apache.org
 
 To get an index with subject and author for messages 123-456, mail:
users-index.123_...@kafka.apache.org
 
 They are always returned as sets of 100, max 2000 per request,
 so you'll actually get 100-499.
 
 To receive all messages with the same subject as message 12345,
 send a short message to:
users-thread.12...@kafka.apache.org
 
 The messages should contain one line or word of text to avoid being
 treated as sp@m, but I will ignore their content.
 Only the ADDRESS you send to is important.
 
 You can start a subscription for an alternate address,
 for example john@host.domain, just add a hyphen and your
 address (with '=' instead of '@') after the command word:
 users-subscribe-john=host.dom...@kafka.apache.org
 
 To stop subscription for this address, mail:
 users-unsubscribe-john=host.dom...@kafka.apache.org
 
 In both cases, I'll send a confirmation message to that address. When
 you receive it, simply reply to it to complete your subscription.
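
A tiny illustration of the address encoding described above (Scala; the
helper name is hypothetical):

    // '@' in the subscriber address becomes '=', appended after the command word.
    def commandAddress(command: String, subscriber: String): String =
      s"users-$command-${subscriber.replace('@', '=')}@kafka.apache.org"

    commandAddress("subscribe", "john@host.domain")
    // => users-subscribe-john=host.domain@kafka.apache.org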
 
 If despite following these instructions, you do not get the
 desired results, please contact my owner at
 users-ow...@kafka.apache.org. Please be patient, my owner is a
 lot slower than I am ;-)
 
 --- Enclosed is a copy of the request I received.
 
 Return-Path: d...@dyachkov.org
 Received: (qmail 87465 invoked by uid 99); 24 Nov 2014 09:20:38 -0000
 Received: from athena.apache.org (HELO athena.apache.org) (140.211.11.136)
 by apache.org (qpsmtpd/0.29) with ESMTP; Mon, 24 Nov 2014 09:20:38 +0000
 X-ASF-Spam-Status: No, hits=-2.7 required=10.0
   tests=ASF_LIST_OPS,RCVD_IN_DNSWL_LOW,SPF_PASS
 X-Spam-Check-By: apache.org
 Received-SPF: pass (athena.apache.org: local policy)
 Received: from [66.111.4.28] (HELO out4-smtp.messagingengine.com) 
 (66.111.4.28)
 by apache.org (qpsmtpd/0.29) with ESMTP; Mon, 24 Nov 2014 09:20:34 +0000
 Received: from compute4.internal (compute4.nyi.internal [10.202.2.44])
   by mailout.nyi.internal (Postfix) with ESMTP id 18446205FA
   for users-subscr...@kafka.apache.org; Mon, 24 Nov 2014 04:18:05 -0500 
 (EST)
 Received: from web5 ([10.202.2.215])
   by compute4.internal (MEProxy); Mon, 24 Nov 2014 04:18:05 -0500
 DKIM-Signature: v=1; a=rsa-sha1; c=relaxed/relaxed; d=dyachkov.org; h=
   message-id:x-sasl-enc:from:to:mime-version
   :content-transfer-encoding:content-type:subject:date; s=mesmtp;
bh=2jmj7l5rSw0yVb/vlWAYkK/YBwk=; b=0KQOnvBf/qxSBySwAXA2pfbGmaY0
   OFQIYBaD1elhYOoKcMyfZFzhZCoy0N/MxQoTUsR3ANH/LX5Kx+y0tJi+bUpouSV8
   R6ztuIvFrlCxUN/vMnR6Utn35cJypDP3dIwVEKYmtpN5J1LI7kcl21MRHc+jTmgW
   zfNxw8pWMbjDXGA=
 DKIM-Signature: v=1; a=rsa-sha1; c=relaxed/relaxed; d=
   

Is there a plan to build a ubiquitous web service API to manage the kafka cluster

2014-11-24 Thread hsy...@gmail.com
Hi guys,

Nowadays, all Kafka administration work (adding and tearing down nodes, topic
management, throughput monitoring) is done by various tools that talk to
brokers, ZooKeeper, etc. Is there a plan for the core team to build a central,
universal server providing a web service API to do all the admin work?

Best,
Siyuan


Re: Is there a plan to build a ubiquitous web service API to manage the kafka cluster

2014-11-24 Thread Joe Stein
That is written up here:
https://cwiki.apache.org/confluence/display/KAFKA/Kafka+Command+Line+and+Related+Improvements

Initially there will be a client API for the command line, but since it works
on the wire protocol, anyone can either wrap the API or go directly to the
wire protocol and implement it however they want.

Work on that is already underway; the parent ticket is
https://issues.apache.org/jira/browse/KAFKA-1694

/***
 Joe Stein
 Founder, Principal Consultant
 Big Data Open Source Security LLC
 http://www.stealth.ly
 Twitter: @allthingshadoop http://www.twitter.com/allthingshadoop
/

On Mon, Nov 24, 2014 at 1:37 PM, hsy...@gmail.com hsy...@gmail.com wrote:

 Hi guys,

 Nowadays, all Kafka administration work (adding and tearing down nodes,
 topic management, throughput monitoring) is done by various tools that talk
 to brokers, ZooKeeper, etc. Is there a plan for the core team to build a
 central, universal server providing a web service API to do all the admin
 work?

 Best,
 Siyuan



Re: BytesInPerSec greater than BytesOutPerSec

2014-11-24 Thread Guozhang Wang
Which Kafka version are you using?

On Fri, Nov 21, 2014 at 5:29 PM, Allen Wang aw...@netflix.com.invalid
wrote:

 Yes, we have consumers fetching from the topic at the same time, and we can
 see its BytesPerSec metric. So we find it hard to explain why BytesInPerSec
 would be greater than BytesOutPerSec on the broker, given that we have
 active consumers and replication.


 On Fri, Nov 21, 2014 at 4:32 PM, Guozhang Wang wangg...@gmail.com wrote:

  Are you having consumers fetching from these topics at the same time?
 
  BytesInPerSec only counts the bytes appended to the log by produce
  requests, and BytesOutPerSec counts the bytes read by fetch requests;
  hence both replica fetchers' and normal consumer fetchers' requests count
  in BytesOutPerSec.
 
  Guozhang
 
 
  On Fri, Nov 21, 2014 at 11:13 AM, Allen Wang aw...@netflix.com.invalid
  wrote:
 
   We observed that for a topic, BytesIn is greater than BytesOut. We are
   under the impression that BytesOut should include replication. The topic
   has two replicas for each partition and all replicas are in sync. Then
   BytesOut should be at least the same as BytesIn since it always needs to
   replicate to a follower. Is that right?
  
   Thanks.
  
 
 
 
  --
  -- Guozhang
 




-- 
-- Guozhang
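
A quick sanity check of the relationship Guozhang describes, as illustrative
Scala with made-up numbers:

    // Under the definitions above, every byte appended by producers is fetched
    // once per follower replica and once per active consumer group.
    val bytesInPerSec    = 50.0 * 1024 * 1024  // bytes/sec appended by producers (example figure)
    val followerReplicas = 1                   // replication factor 2 => one follower fetching
    val consumerGroups   = 1                   // one active consumer group reading the topic
    val expectedBytesOutPerSec = bytesInPerSec * (followerReplicas + consumerGroups)
    // => 2x BytesIn here; BytesOut below BytesIn suggests followers or
    //    consumers are not actually fetching.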


Two Kafka Questions

2014-11-24 Thread Sybrandy, Casey
Hello,

First, is there a limit to how many Kafka brokers you can have?

Second, if a Kafka broker node fails and I start a new broker on a new node, is 
it correct to assume that the cluster will copy data to that node to satisfy 
the replication factor specified for a given topic?  In other words, let's 
assume that I have a 3 node cluster and a topic with a replication factor of 3. 
 If one node fails and I start up a new node, will the new node have existing 
messages replicated to it?

Thanks.

Casey


Re: Two Kafka Questions

2014-11-24 Thread Gwen Shapira
Hi Casey,

1. There's some limit based on the size of ZooKeeper nodes; I'm not sure
exactly where it is, though. We've seen 30-node clusters running in
production.

2. For your scenario to work, the new broker will need to have the same
broker id as the old one - or you'll need to manually re-assign partitions.

Gwen

On Mon, Nov 24, 2014 at 11:15 AM, Sybrandy, Casey 
casey.sybra...@six3systems.com wrote:

 Hello,

 First, is there a limit to how many Kafka brokers you can have?

 Second, if a Kafka broker node fails and I start a new broker on a new
 node, is it correct to assume that the cluster will copy data to that node
 to satisfy the replication factor specified for a given topic?  In other
 words, let's assume that I have a 3 node cluster and a topic with a
 replication factor of 3.  If one node fails and I start up a new node, will
 the new node have existing messages replicated to it?

 Thanks.

 Casey
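
A sketch of the configuration point in Gwen's answer; the broker id value is
hypothetical:

    # server.properties on the replacement node: reuse the failed broker's id
    # (here assumed to be 3) so the cluster re-replicates its partitions to it.
    broker.id=3
    # With a different id, partitions must instead be moved explicitly, e.g.
    # with the 0.8.x kafka-reassign-partitions.sh tool.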



Does Kafka Producer service ?

2014-11-24 Thread Krishna Raj
Hello Amazing Kafka Creators & Users,

I have learned and used Kafka in our production system, so you can count my
understanding as intermediate.

Given that Kafka has solved the scalability and availability needs of a
large-scale message publish/subscribe system, I understand that having a
producer service sitting between the application and the producer defeats
one major purpose of Kafka.

So, my question is: how do I loosely couple Kafka with my production
application?

The reason being, I wish to do all producer code and Kafka library
maintenance without affecting my large-scale production system. It's not an
easy thing to get a window for these types of changes on a large-scale
production application :)

Any advice on how this can be achieved (even moderately) would greatly help.

Thanks,
Krishna Raj
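
One possible sketch of the loose coupling asked about here, assuming the 0.8
Scala producer API; the trait and class names are hypothetical:

    import java.util.Properties
    import kafka.producer.{ KeyedMessage, Producer, ProducerConfig }

    // Application code depends only on this trait, never on a Kafka class,
    // so the Kafka client library can be upgraded behind it.
    trait MessageSink {
      def send(topic: String, key: String, value: String): Unit
    }

    // All Kafka-specific code lives in one small class that can be rebuilt
    // and redeployed independently of the application.
    class KafkaSink(brokerList: String) extends MessageSink {
      private val props = new Properties()
      props.put("metadata.broker.list", brokerList)
      props.put("serializer.class", "kafka.serializer.StringEncoder")
      private val producer = new Producer[String, String](new ProducerConfig(props))

      def send(topic: String, key: String, value: String): Unit =
        producer.send(new KeyedMessage[String, String](topic, key, value))
    }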


Re: kafka web console running error

2014-11-24 Thread Jun Rao
Which web console are you using?

Thanks,

Jun

On Fri, Nov 21, 2014 at 8:34 AM, Sa Li sal...@gmail.com wrote:

 Hi, all

 I am trying to get the Kafka web console to work, but it seems to run for
 only a few hours and then fail; below are the error messages on the screen.
 I am assuming something is wrong with the DB. I tried swapping H2 for MySQL,
 but it didn't help. Has anyone had a similar problem?


 -
 .
 .


 at sun.misc.Resource.getByteBuffer(Resource.java:160) ~[na:1.7.0_65]
 at java.net.URLClassLoader.defineClass(URLClassLoader.java:436) ~[na:1.7.0_65]
 at akka.dispatch.BatchingExecutor$Batch$$anonfun$run$1.processBatch$1(BatchingExecutor.scala:67)
 at akka.dispatch.BatchingExecutor$Batch$$anonfun$run$1.apply$mcV$sp(BatchingExecutor.scala:82)
 at akka.dispatch.BatchingExecutor$Batch$$anonfun$run$1.apply(BatchingExecutor.scala:59)
 at akka.dispatch.BatchingExecutor$Batch$$anonfun$run$1.apply(BatchingExecutor.scala:59)
 at scala.concurrent.BlockContext$.withBlockContext(BlockContext.scala:72)
 at akka.dispatch.BatchingExecutor$Batch.run(BatchingExecutor.scala:58)
 at akka.dispatch.TaskInvocation.run(AbstractDispatcher.scala:42)
 at akka.dispatch.ForkJoinExecutorConfigurator$AkkaForkJoinTask.exec(AbstractDispatcher.scala:386)
 at scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
 at scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
 at scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
 at scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)

 [ERROR] Failed to construct terminal; falling back to unsupported
 java.io.IOException: Cannot run program "sh": error=24, Too many open files
 at java.lang.ProcessBuilder.start(ProcessBuilder.java:1047)
 at java.lang.Runtime.exec(Runtime.java:617)
 at java.lang.Runtime.exec(Runtime.java:485)
 at jline.internal.TerminalLineSettings.exec(TerminalLineSettings.java:183)
 at jline.internal.TerminalLineSettings.exec(TerminalLineSettings.java:173)
 at jline.internal.TerminalLineSettings.stty(TerminalLineSettings.java:168)
 at jline.internal.TerminalLineSettings.get(TerminalLineSettings.java:72)
 at jline.internal.TerminalLineSettings.<init>(TerminalLineSettings.java:52)
 at jline.UnixTerminal.<init>(UnixTerminal.java:31)
 at sun.reflect.NativeConstructorAccessorImpl.newInstance0(Native Method)
 at sun.reflect.NativeConstructorAccessorImpl.newInstance(NativeConstructorAccessorImpl.java:57)
 at sun.reflect.DelegatingConstructorAccessorImpl.newInstance(DelegatingConstructorAccessorImpl.java:45)
 at java.lang.reflect.Constructor.newInstance(Constructor.java:526)
 at java.lang.Class.newInstance(Class.java:379)

 [error] a.a.ActorSystemImpl - Uncaught error from thread
 [play-akka.actor.default-dispatcher-944] shutting down JVM since
 'akka.jvm-exit-on-fatal-error'
 java.lang.NoClassDefFoundError: common/Util$$anonfun$getPartitionsLogSize$3$$anonfun$apply$19$$anonfun$apply$1$$anonfun$applyOrElse$1
 at common.Util$$anonfun$getPartitionsLogSize$3$$anonfun$apply$19$$anonfun$apply$1.applyOrElse(Util.scala:75) ~[na:na]
 at common.Util$$anonfun$getPartitionsLogSize$3$$anonfun$apply$19$$anonfun$apply$1.applyOrElse(Util.scala:74) ~[na:na]
 at scala.runtime.AbstractPartialFunction$mcJL$sp.apply$mcJL$sp(AbstractPartialFunction.scala:33) ~[scala-library.jar:na]
 at scala.runtime.AbstractPartialFunction$mcJL$sp.apply(AbstractPartialFunction.scala:33) ~[scala-library.jar:na]
 at scala.runtime.AbstractPartialFunction$mcJL$sp.apply(AbstractPartialFunction.scala:25) ~[scala-library.jar:na]
 at scala.util.Failure$$anonfun$recover$1.apply(Try.scala:185) ~[scala-library.jar:na]
 at jline.TerminalFactory.getFlavor(TerminalFactory.java:168)
 at jline.TerminalFactory.create(TerminalFactory.java:81)
 at jline.TerminalFactory.get(TerminalFactory.java:159)
 at sbt.MainLoop$$anon$1.run(MainLoop.scala:19)
 at java.lang.Thread.run(Thread.java:745)
 Caused by: java.io.IOException: error=24, Too many open files
 at java.lang.UNIXProcess.forkAndExec(Native Method)
 at java.lang.UNIXProcess.<init>(UNIXProcess.java:186)
 at java.lang.ProcessImpl.start(ProcessImpl.java:130)
 at java.lang.ProcessBuilder.start(ProcessBuilder.java:1028)
 ... 18 more

 [error] a.a.ActorSystemImpl - Uncaught error from thread
 [play-akka.actor.default-dispatcher-943] shutting down JVM since
 'akka.jvm-exit-on-fatal-error'
 java.lang.NoClassDefFoundError: common/Util$$anonfun$getPartitionsLogSize$3$$anonfun$apply$19$$anonfun$apply$1$$anonfun$applyOrElse$1
 at

Re: isr never update

2014-11-24 Thread Jun Rao
Which version of Kafka are you using? Are there any errors in the controller
and state-change logs?

Thanks,

Jun

On Fri, Nov 21, 2014 at 5:59 PM, Shangan Chen chenshangan...@gmail.com
wrote:

 In the initial state all replicas are in the ISR list, but sometimes when I
 check the topic state, a replica never rejoins the ISR even when it is
 actually synchronized. I saw in the log that the leader printed an "expand
 ISR" request, but it did not work. I found an interesting thing: the shrink
 and expand requests happened just after the controller switch. I don't know
 whether it is related, and the controller log has been overwritten, so I
 cannot verify. Is there anything I can do to trigger the ISR update?
 Currently I alter the ZooKeeper partition state and it works, but it really
 needs a lot of manual work as I have quite a lot of topics in my cluster.
 Some useful information is as follows.

 *my replica lag config for default:*

 replica.lag.time.max.ms=1
 replica.lag.max.messages=4000

 *controller info:*

 [zk: localhost:2181(CONNECTED) 4] get /kafka08/controller
 {"version":1,"brokerid":29,"timestamp":"1416608404008"}
 cZxid = 0x5a4c85923
 ctime = Sat Nov 22 06:20:04 CST 2014
 mZxid = 0x5a4c85923
 mtime = Sat Nov 22 06:20:04 CST 2014
 pZxid = 0x5a4c85923
 cversion = 0
 dataVersion = 0
 aclVersion = 0
 ephemeralOwner = 0x5477ba622cb6c7d
 dataLength = 55
 numChildren = 0


 *topic info:*

 Topic: org.nginx  PartitionCount: 48  ReplicationFactor: 2  Configs:
 Topic: org.nginx  Partition: 0   Leader: 17  Replicas: 17,32  Isr: 17,32
 Topic: org.nginx  Partition: 1   Leader: 18  Replicas: 18,33  Isr: 18,33
 Topic: org.nginx  Partition: 2   Leader: 19  Replicas: 19,34  Isr: 34,19
 Topic: org.nginx  Partition: 3   Leader: 20  Replicas: 20,35  Isr: 35,20
 Topic: org.nginx  Partition: 4   Leader: 21  Replicas: 21,36  Isr: 21,36
 Topic: org.nginx  Partition: 5   Leader: 22  Replicas: 22,17  Isr: 17,22
 Topic: org.nginx  Partition: 6   Leader: 23  Replicas: 23,18  Isr: 18,23
 Topic: org.nginx  Partition: 7   Leader: 24  Replicas: 24,19  Isr: 24,19
 Topic: org.nginx  Partition: 8   Leader: 25  Replicas: 25,20  Isr: 25,20
 Topic: org.nginx  Partition: 9   Leader: 26  Replicas: 26,21  Isr: 26,21
 Topic: org.nginx  Partition: 10  Leader: 27  Replicas: 27,22  Isr: 27,22
 Topic: org.nginx  Partition: 11  Leader: 28  Replicas: 28,23  Isr: 28,23
 Topic: org.nginx  Partition: 12  Leader: 29  Replicas: 29,24  Isr: 29
 Topic: org.nginx  Partition: 13  Leader: 30  Replicas: 30,25  Isr: 30,25
 Topic: org.nginx  Partition: 14  Leader: 31  Replicas: 31,26  Isr: 26,31
 Topic: org.nginx  Partition: 15  Leader: 32  Replicas: 32,27  Isr: 27,32
 Topic: org.nginx  Partition: 16  Leader: 33  Replicas: 33,28  Isr: 33,28
 Topic: org.nginx  Partition: 17  Leader: 34  Replicas: 34,29  Isr: 29,34
 Topic: org.nginx  Partition: 18  Leader: 35  Replicas: 35,30  Isr: 30,35
 Topic: org.nginx  Partition: 19  Leader: 36  Replicas: 36,31  Isr: 31,36
 Topic: org.nginx  Partition: 20  Leader: 17  Replicas: 17,32  Isr: 17,32
 Topic: org.nginx  Partition: 21  Leader: 18  Replicas: 18,33  Isr: 18,33
 Topic: org.nginx  Partition: 22  Leader: 19  Replicas: 19,34  Isr: 34,19
 Topic: org.nginx  Partition: 23  Leader: 20  Replicas: 20,35  Isr: 35,20
 Topic: org.nginx  Partition: 24  Leader: 21  Replicas: 21,36  Isr: 21,36
 Topic: org.nginx  Partition: 25  Leader: 22  Replicas: 22,17  Isr: 17,22
 Topic: org.nginx  Partition: 26  Leader: 23  Replicas: 23,18  Isr: 18,23
 Topic: org.nginx  Partition: 27  Leader: 24  Replicas: 24,19  Isr: 24,19
 Topic: org.nginx  Partition: 28  Leader: 25  Replicas: 25,20  Isr: 25,20
 Topic: org.nginx  Partition: 29  Leader: 26  Replicas: 26,21  Isr: 26,21
 Topic: org.nginx  Partition: 30  Leader: 27  Replicas: 27,22  Isr: 27,22
 Topic: org.nginx  Partition: 31  Leader: 28  Replicas: 28,23  Isr: 28,23
 Topic: org.nginx  Partition: 32  Leader: 29  Replicas: 29,24  Isr: 29
 Topic: org.nginx  Partition: 33  Leader: 30  Replicas: 30,25  Isr: 30,25
 Topic: org.nginx  Partition: 34  Leader: 31  Replicas: 31,26  Isr: 26,31
 Topic: org.nginx  Partition: 35  Leader: 32  Replicas: 32,27  Isr: 27,32
 Topic: org.nginx  Partition: 36  Leader: 33  Replicas: 33,28  Isr: 33,28
 Topic: org.nginx  Partition: 37  Leader: 34  Replicas: 34,29  Isr: 29,34
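
For reference, a sketch of inspecting the partition state the poster mentions
altering; the path assumes the /kafka08 chroot shown in the controller info,
and the payload shown is illustrative:

    # In a zkCli.sh session against the same ZooKeeper ensemble:
    get /kafka08/brokers/topics/org.nginx/partitions/12/state
    # Illustrative payload for under-replicated partition 12 (Kafka 0.8 format):
    # {"controller_epoch":...,"leader":29,"version":1,"leader_epoch":...,"isr":[29]}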
 

Re: issues using the new 0.8.2 producer

2014-11-24 Thread Jun Rao
1. The new producer takes only the new producer configs.

2. There is no longer a pluggable partitioner. By default, if a key is
provided, the producer hashes the bytes to get the partition. There is an
interface for the client to explicitly specify a partition, if it wants to.

3. Currently, the new producer only takes bytes. We are discussing now if
we want to make it take generic types like the old producer.

Thanks,

Jun

On Sun, Nov 23, 2014 at 2:12 AM, Shlomi Hazan shl...@viber.com wrote:

 Hi,
 Started to dig into that new producer and have a few questions:
 1. What part (if any) of the old producer config still applies to the new
 producer, or is it just what is specified in New Producer Configs?
 2. How do you specify a partitioner to the new producer? If there is no such
 option, what usage is made of the given key? Is it simply hashed with Java's
 String API?
 3. The javadoc example (

 ProducerRecord record = new ProducerRecord("the-topic", "key", "value");

 ) is incorrect and shows creating a producer record from 3 strings, whereas
 it takes byte arrays for the last two arguments. Will the final API be the
 one documented or rather the one implemented?

 I am really missing a working example for the new producer, so if anyone has
 one I will be happy to get inspired...
 Shlomi
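
A minimal sketch of the byte[]-only producer API discussed in this thread,
against the pre-release 0.8.2 client; topic, key, and config values are made
up:

    import java.util.Properties
    import org.apache.kafka.clients.producer.{ KafkaProducer, ProducerConfig, ProducerRecord }

    object NewProducerSketch extends App {
      val props = new Properties()
      props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092")
      props.put(ProducerConfig.ACKS_CONFIG, "1")
      val producer = new KafkaProducer(props)

      // With a key: the producer hashes the key bytes to choose a partition.
      producer.send(new ProducerRecord("my-topic",
        "my-key".getBytes("UTF-8"), "hello".getBytes("UTF-8")))

      // With an explicit partition: the partition number overrides key hashing.
      producer.send(new ProducerRecord("my-topic", 0,
        null, "hello again".getBytes("UTF-8")))

      producer.close()
    }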



Re: Exception in thread main java.lang.ExceptionInInitializerError

2014-11-24 Thread Jun Rao
It seems you hit an exception when instantiating the slf4j logger inside
the producer.

Thanks,

Jun

On Sun, Nov 23, 2014 at 9:29 PM, Haoming Zhang haoming.zh...@outlook.com
wrote:




 Hi all,

 Basically I used a lot of code from this project:
 https://github.com/stealthly/scala-kafka . My idea is to send a key/value
 pair to Kafka so that I can design a partition function in the future.

 I checked the documentation, and it seems I should create a ProducerRecord
 so that I can specify a partition or key. Following the code from stealthly's
 project, I created a test main function as follows:
 import org.apache.kafka.clients.producer.{ KafkaProducer => NewKafkaProducer }
 import org.apache.kafka.clients.producer.{ ProducerConfig, ProducerRecord }
 import java.util.{ Properties, UUID } // UUID/ProducerRecord imports added; the original post omitted them

 object Test extends App {
   val testMessage = UUID.randomUUID().toString
   val testTopic = "0e7fa3c2-1b75-407b-a03c-f40679ea3ce9"
   val groupId_1 = UUID.randomUUID().toString

   val brokerList: String = "localhost:9092"
   val acks: Int = -1
   val metadataFetchTimeout: Long = 3000L
   val blockOnBufferFull: Boolean = true
   val bufferSize: Long = 1024L * 1024L
   val retries: Int = 0

   val producerProps = new Properties()
   producerProps.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, brokerList)
   producerProps.put(ProducerConfig.ACKS_CONFIG, acks.toString)
   producerProps.put(ProducerConfig.METADATA_FETCH_TIMEOUT_CONFIG, metadataFetchTimeout.toString)
   producerProps.put(ProducerConfig.BLOCK_ON_BUFFER_FULL_CONFIG, blockOnBufferFull.toString)
   producerProps.put(ProducerConfig.BUFFER_MEMORY_CONFIG, bufferSize.toString)
   producerProps.put(ProducerConfig.RETRIES_CONFIG, retries.toString)
   producerProps.put(ProducerConfig.RETRY_BACKOFF_MS_CONFIG, "100")
   producerProps.put(ProducerConfig.RECONNECT_BACKOFF_MS_CONFIG, "200")

   val producer = new NewKafkaProducer(producerProps)

   val record = new ProducerRecord(testTopic, 0, "key".getBytes, testMessage.getBytes)
   producer.send(record)

   val consumer = new KafkaConsumer(testTopic, groupId_1, "localhost:2181")

   def exec(binaryObject: Array[Byte]) = {
     val message = new String(binaryObject)
     print("testMessage = " + testMessage + " and consumed message = " + message)
     consumer.close()
   }

   print("KafkaSpec is waiting some seconds")
   consumer.read(exec)
   print("KafkaSpec consumed")
 }

 The KafkaConsumer class is exactly the same as in stealthly's project:
 https://github.com/stealthly/scala-kafka/blob/master/src/main/scala/KafkaConsumer.scala

 But when I tried to run the test program, I got the following exception:
 [2014-11-23 21:15:36,461] INFO ProducerConfig values:
 block.on.buffer.full = true
 retry.backoff.ms = 100
 buffer.memory = 33554432
 batch.size = 16384
 metrics.sample.window.ms = 3
 metadata.max.age.ms = 30
 receive.buffer.bytes = 32768
 timeout.ms = 3
 max.in.flight.requests.per.connection = 5
 metric.reporters = []
 bootstrap.servers = [localhost:9092]
 client.id =
 compression.type = none
 retries = 0
 max.request.size = 1048576
 send.buffer.bytes = 131072
 acks = 1
 reconnect.backoff.ms = 10
 linger.ms = 0
 metrics.num.samples = 2
 metadata.fetch.timeout.ms = 6
  (org.apache.kafka.clients.producer.ProducerConfig)
 Exception in thread "main" java.lang.ExceptionInInitializerError
 at org.slf4j.impl.Log4jLoggerFactory.getLogger(Log4jLoggerFactory.java:73)
 at org.slf4j.LoggerFactory.getLogger(LoggerFactory.java:243)
 at org.slf4j.LoggerFactory.getLogger(LoggerFactory.java:255)
 at org.apache.kafka.clients.producer.KafkaProducer.<clinit>(KafkaProducer.java:64)
 at Test$delayedInit$body.apply(Test.scala:47)
 at scala.Function0$class.apply$mcV$sp(Function0.scala:40)
 at scala.runtime.AbstractFunction0.apply$mcV$sp(AbstractFunction0.scala:12)
 at scala.App$$anonfun$main$1.apply(App.scala:71)
 at scala.App$$anonfun$main$1.apply(App.scala:71)
 at scala.collection.immutable.List.foreach(List.scala:318)
 at scala.collection.generic.TraversableForwarder$class.foreach(TraversableForwarder.scala:32)
 at scala.App$class.main(App.scala:71)
 at Test$.main(Test.scala:21)
 at Test.main(Test.scala)
 Caused by: java.lang.NullPointerException
 at org.apache.kafka.clients.producer.KafkaProducer.<init>(KafkaProducer.java:98)
 at org.apache.kafka.clients.producer.KafkaProducer.<init>(KafkaProducer.java:94)
 at kafka.producer.KafkaLog4jAppender.activateOptions(KafkaLog4jAppender.scala:63)
 at org.apache.log4j.config.PropertySetter.activate(PropertySetter.java:257)
 at org.apache.log4j.config.PropertySetter.setProperties(PropertySetter.java:133)
 at org.apache.log4j.config.PropertySetter.setProperties(PropertySetter.java:97)
 at org.apache.log4j.PropertyConfigurator.parseAppender(PropertyConfigurator.java:689)
 at
 

Re: kafka web console running error

2014-11-24 Thread Yang Fang
Do you see the error message "Too many open files"? That indicates you should
raise the nofile limit (the maximum number of open file descriptors).
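
For reference, a sketch of checking and raising that limit on Linux; the
numbers and limits.conf entries are illustrative and vary by distribution:

    ulimit -n       # show the current per-process open-file limit
    ulimit -n 65536 # raise it for the current shell, if the hard limit allows
    # For a persistent change, entries like these in /etc/security/limits.conf
    # (assuming the console runs as user "kafka") are one common approach:
    #   kafka  soft  nofile  65536
    #   kafka  hard  nofile  65536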

On Tue, Nov 25, 2014 at 1:26 PM, Jun Rao jun...@gmail.com wrote:

 Which web console are you using?

 Thanks,

 Jun

 On Fri, Nov 21, 2014 at 8:34 AM, Sa Li sal...@gmail.com wrote:

  Hi, all
 
  I am trying to get the Kafka web console to work, but it seems to run for
  only a few hours and then fail; below are the error messages on the screen.
  I am assuming something is wrong with the DB. I tried swapping H2 for
  MySQL, but it didn't help. Has anyone had a similar problem?
 
 
  -
  .
  .
 
 
 at sun.misc.Resource.getByteBuffer(Resource.java:160) ~[na:1.7.0_65]
  at java.net.URLClassLoader.defineClass(URLClassLoader.java:436)
  ~[na:1.7.0_65]
 
  at
 
 
 akka.dispatch.BatchingExecutor$Batch$$anonfun$run$1.processBatch$1(BatchingExecutor.scala:67)
  at
 
 
 akka.dispatch.BatchingExecutor$Batch$$anonfun$run$1.apply$mcV$sp(BatchingExecutor.scala:82)
  at
 
 
 akka.dispatch.BatchingExecutor$Batch$$anonfun$run$1.apply(BatchingExecutor.scala:59)
  at
 
 
 akka.dispatch.BatchingExecutor$Batch$$anonfun$run$1.apply(BatchingExecutor.scala:59)
  at
  scala.concurrent.BlockContext$.withBlockContext(BlockContext.scala:72)
  at
  akka.dispatch.BatchingExecutor$Batch.run(BatchingExecutor.scala:58)
  at akka.dispatch.TaskInvocation.run(AbstractDispatcher.scala:42)
  at
 
 
 akka.dispatch.ForkJoinExecutorConfigurator$AkkaForkJoinTask.exec(AbstractDispatcher.scala:386)
  at
  scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
  at
 
 
 scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
  at
  scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
  at
 
 
 scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)
  [ERROR] Failed to construct terminal; falling back to unsupported
  java.io.IOException: Cannot run program sh: error=24, Too many open
 files
  at java.lang.ProcessBuilder.start(ProcessBuilder.java:1047)
  at java.lang.Runtime.exec(Runtime.java:617)
  at java.lang.Runtime.exec(Runtime.java:485)
  at
  jline.internal.TerminalLineSettings.exec(TerminalLineSettings.java:183)
  at
  jline.internal.TerminalLineSettings.exec(TerminalLineSettings.java:173)
  at
  jline.internal.TerminalLineSettings.stty(TerminalLineSettings.java:168)
  at
  jline.internal.TerminalLineSettings.get(TerminalLineSettings.java:72)
  at
  jline.internal.TerminalLineSettings.init(TerminalLineSettings.java:52)
  at jline.UnixTerminal.init(UnixTerminal.java:31)
  at sun.reflect.NativeConstructorAccessorImpl.newInstance0(Native
  Method)
  at
 
 
 sun.reflect.NativeConstructorAccessorImpl.newInstance(NativeConstructorAccessorImpl.java:57)
  at
 
 
 sun.reflect.DelegatingConstructorAccessorImpl.newInstance(DelegatingConstructorAccessorImpl.java:45)
  at
 java.lang.reflect.Constructor.newInstance(Constructor.java:526)
  at java.lang.Class.newInstance(Class.java:379)
  [error] a.a.ActorSystemImpl - Uncaught error from thread
  [play-akka.actor.default-dispatcher-944] shutting down JVM since
  'akka.jvm-exit-on-fatal-error'
  java.lang.NoClassDefFoundError:
 
 
 common/Util$$anonfun$getPartitionsLogSize$3$$anonfun$apply$19$$anonfun$apply$1$$anonfun$applyOrElse$1
  at
 
 
 common.Util$$anonfun$getPartitionsLogSize$3$$anonfun$apply$19$$anonfun$apply$1.applyOrElse(Util.scala:75)
  ~[na:na]
  at
 
 
 common.Util$$anonfun$getPartitionsLogSize$3$$anonfun$apply$19$$anonfun$apply$1.applyOrElse(Util.scala:74)
  ~[na:na]
  at
 
 
 scala.runtime.AbstractPartialFunction$mcJL$sp.apply$mcJL$sp(AbstractPartialFunction.scala:33)
  ~[scala-library.jar:na]
  at
 
 
 scala.runtime.AbstractPartialFunction$mcJL$sp.apply(AbstractPartialFunction.scala:33)
  ~[scala-library.jar:na]
  at
 
 
 scala.runtime.AbstractPartialFunction$mcJL$sp.apply(AbstractPartialFunction.scala:25)
  ~[scala-library.jar:na]
  at scala.util.Failure$$anonfun$recover$1.apply(Try.scala:185)
  ~[scala-library.jar:na]
  at jline.TerminalFactory.getFlavor(TerminalFactory.java:168)
  at jline.TerminalFactory.create(TerminalFactory.java:81)
  at jline.TerminalFactory.get(TerminalFactory.java:159)
  at sbt.MainLoop$$anon$1.run(MainLoop.scala:19)
  at java.lang.Thread.run(Thread.java:745)
  Caused by: java.io.IOException: error=24, Too many open files
  at java.lang.UNIXProcess.forkAndExec(Native Method)
  at java.lang.UNIXProcess.init(UNIXProcess.java:186)
  at java.lang.ProcessImpl.start(ProcessImpl.java:130)
  at java.lang.ProcessBuilder.start(ProcessBuilder.java:1028)
  ... 18 more
 
  [error] a.a.ActorSystemImpl - Uncaught error from thread
  [play-akka.actor.default-dispatcher-943] shutting down JVM 

Re: issues using the new 0.8.2 producer

2014-11-24 Thread Shlomi Hazan
All clear, thank you.
I guess an example will be available when the version is released.
Shlomi

On Tue, Nov 25, 2014 at 7:33 AM, Jun Rao jun...@gmail.com wrote:

 1. The new producer takes only the new producer configs.

 2. There is no longer a pluggable partitioner. By default, if a key is
 provided, the producer hashes the bytes to get the partition. There is an
 interface for the client to explicitly specify a partition, if it wants to.

 3. Currently, the new producer only takes bytes. We are discussing now if
 we want to make it take generic types like the old producer.

 Thanks,

 Jun

 On Sun, Nov 23, 2014 at 2:12 AM, Shlomi Hazan shl...@viber.com wrote:

  Hi,
  Started to dig into that new producer and have a few questions:
  1. What part (if any) of the old producer config still applies to the new
  producer, or is it just what is specified in New Producer Configs?
  2. How do you specify a partitioner to the new producer? If there is no such
  option, what usage is made of the given key? Is it simply hashed with Java's
  String API?
  3. The javadoc example (

  ProducerRecord record = new ProducerRecord("the-topic", "key", "value");

  ) is incorrect and shows creating a producer record from 3 strings,
  whereas it takes byte arrays for the last two arguments. Will the final API
  be the one documented or rather the one implemented?

  I am really missing a working example for the new producer, so if anyone
  has one I will be happy to get inspired...
  Shlomi