Re: How to create a topic using the Java API / Client?

2016-09-14 Thread Ali Akhtar
I'm guessing it's not possible to delete topics?

On Thu, Sep 15, 2016 at 5:43 AM, Ali Akhtar  wrote:

> Thank you  Martin
>
> On 15 Sep 2016 3:05 am, "Mathieu Fenniak" 
> wrote:
>
>> Hey Ali,
>>
>> If you have auto create turned on, which it sounds like you do, and you're
>> happy with using the broker's configured partition count and replication
>> factor, then you can call "partitionsFor(String topic)" on your producer.
>> This will create the topic without sending a message to it.  I'm not sure
>> it's 100% smart, but I found it to be better than the alternatives I could
>> find. :-)
>>
>> Mathieu
>>
>>
>> On Wed, Sep 14, 2016 at 3:41 PM, Ali Akhtar  wrote:
>>
>> > It looks like if I just send a message to a non-existent topic, it is
>> > created. But this has the downside that the first message of the topic
>> is
>> > null or otherwise invalid.
>> >
>> > Also the partition count can't be specified.
>> >
>> > Is there a way to create a topic without needing to post a null
>> message? Do
>> > I need to talk to the bash script?
>> >
>> > On Wed, Sep 14, 2016 at 8:45 AM, Ali Akhtar 
>> wrote:
>> >
>> > > Using version 0.10.0.1, how can I create kafka topics using the java
>> > > client / API?
>> > >
>> > > Stackoverflow answers describe using kafka.admin.AdminUtils, but this
>> > > class is not included in the kafka-clients maven dependency. I also
>> don't
>> > > see the package kafka.admin in the javadocs:
>> > > http://kafka.apache.org/0100/javadoc/index.html?org/apache/kafka/clients/producer/KafkaProducer.html
>> > >
>> > >
>> > > What am I missing?
>> > >
>> > >
>> >
>>
>


Kafka Source Connector - JSON with Escape double quotes

2016-09-14 Thread dhanuka ranasinghe
Hi,

I am using Kafka Connect to feed data into Kafka; please find a sample code
snippet from my SourceTask below. My question is: why does Kafka Connect
serialize the JSON message with escaped double quotes? Is there a way to get
the same message without the escaping?

Charset charset = Charset.forName("UTF-8");
// message bytes come from the external source in the task's poll logic; shown
// here with the example payload so the snippet is self-contained
byte[] message = "{ \"index\" : { \"_index\" : \"test\", \"_type\" : \"rex\", \"_id\" : \"1\" } }\n"
        .getBytes(charset);
String strMsg = new String(message, charset);
log.info(strMsg);
SourceRecord record = new SourceRecord(partition, null, topic,
        ConnectSchema.STRING_SCHEMA, strMsg);

From the kafka console consumer:

{"schema":{"type":"string","optional":false},"payload":"
{ \"index\" : { \"_index\" : \"test\", \"_type\" : \"rex\", \"_id\" : \"1\"
} }\n"}

From log.info(), what I really expect:

INFO { "index" : { "_index" : "test", "_type" : "rex", "_id" : "1" } }
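
An editorial note on the likely cause: the {"schema":...,"payload":"..."}
envelope and the escaped quotes come from the Connect worker's JsonConverter
with schemas enabled, which treats the STRING_SCHEMA value as a JSON string and
therefore JSON-encodes it, escaping the embedded quotes. Assuming that is the
converter in use, a sketch of the worker setting that would emit the raw string
instead:

# emit the value verbatim, no JSON envelope and no escaping
value.converter=org.apache.kafka.connect.storage.StringConverter

# (value.converter.schemas.enable=false would drop only the
# {"schema":...,"payload":...} envelope; JsonConverter would still
# JSON-encode the string and escape its quotes)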


Cheers,
Dhanuka
-- 
Nothing Impossible,Creativity is more important than knowledge.


Re: what's the relationship between Zookeeper and Kafka ?

2016-09-14 Thread Jaikiran Pai
In addition to what Michael noted, this question has been asked a few 
times before too and here's one such previous discussion 
https://www.quora.com/What-is-the-actual-role-of-ZooKeeper-in-Kafka


-Jaikiran

On Wednesday 14 September 2016 03:50 AM, Michael Noll wrote:

Eric,

the latest versions of Kafka use ZooKeeper only on the side of the Kafka
brokers, i.e. the servers in a Kafka cluster.

Background:
In older versions of Kafka, the Kafka consumer API required client
applications (that would read data from Kafka) to also talk to ZK.  Why
would they need to do that:  because ZK was used, in the old Kafka consumer
API, to track which data records they had already consumed, to rewind
reading from Kafka in case of failures like client machine crashes, and so
on.  In other words, consumption-related metadata was managed in ZK.
However, no "actual" data was ever routed through ZK.

The latest versions of Kafka have an improved consumer API that no longer
needs to talk to ZK -- any information that was previously maintained in ZK
(by these client apps) is now stored directly in Kafka.

Going back to your Spark programs:  They are using these older consumer API
versions of Kafka that still require talking to ZooKeeper, hence the need
to set things like "zoo1:2181".
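
For contrast, a minimal sketch of the new consumer configuration (the broker
address and topic are placeholders; the class and setting names are from the
0.9+ Java client):

import java.util.Collections;
import java.util.Properties;
import org.apache.kafka.clients.consumer.KafkaConsumer;

Properties props = new Properties();
props.put("bootstrap.servers", "broker1:9092"); // Kafka brokers, not zoo1:2181
props.put("group.id", "my-group");              // offsets are stored in Kafka itself
props.put("key.deserializer",
    "org.apache.kafka.common.serialization.StringDeserializer");
props.put("value.deserializer",
    "org.apache.kafka.common.serialization.StringDeserializer");
KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);
consumer.subscribe(Collections.singletonList("my-topic"));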


Does the kafka data actually get routed out of zookeeper before delivering
the payload onto Spark ?

This was never the case (old API vs. new API).  Otherwise this would have
been a significant bottleneck. :-)  Data has always been served through the
Kafka brokers only.

Hope this helps,
Michael





On Sat, Sep 10, 2016 at 4:22 PM, Valerio Bruno  wrote:


AFAIK Kafka uses Zookeeper to coordinate the Kafka clusters (set of
brokers).

Consumers usually connect to Zookeeper to retrieve the list of brokers, then
connect to the broker.

*Valerio*

On 10 September 2016 at 22:11, Eric Ho  wrote:


I notice that some Spark programs would contact something like

'zoo1:2181'

when trying to suck data out of Kafka.

Does the kafka data actually get routed out of zookeeper before

delivering

the payload onto Spark ?



--

-eric ho




--
*Valerio Bruno*





+39 3383163406 / +45 2991720...
...@valeriobruno.it
fax: +39 1782275656
skype: valerio_bruno
http://www.valeriobruno.it





Re: kafkaproducer send blocks until broker is available

2016-09-14 Thread Jaikiran Pai
This is a known issue and is being tracked in this JIRA 
https://issues.apache.org/jira/browse/KAFKA-3539
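
In the meantime, one way to bound the blocking, assuming the 0.9+ Java
producer: max.block.ms caps how long send() may wait for metadata, so the call
fails fast instead of hanging until the broker appears. A minimal sketch with a
placeholder address:

import java.util.Properties;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;

Properties props = new Properties();
props.put("bootstrap.servers", "broker1:9092");
props.put("max.block.ms", "1000"); // fail after 1s instead of the 60s default
props.put("key.serializer",
    "org.apache.kafka.common.serialization.StringSerializer");
props.put("value.serializer",
    "org.apache.kafka.common.serialization.StringSerializer");
KafkaProducer<String, String> producer = new KafkaProducer<>(props);
// throws a TimeoutException quickly if the broker is still down
producer.send(new ProducerRecord<>("my-topic", "value"));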


-Jaikiran
On Saturday 10 September 2016 12:20 AM, Peter Sinoros Szabo wrote:

Hi,

I'd like to use the Java Kafka producer in a non-blocking async mode.
My assumption was that as long as new messages fit into the producer's
buffer memory, it would queue them up and send them out once the broker is
available.
I tested a simple case where I am sending messages using
KafkaProducer.send() while the kafka broker is not yet available (i.e. the
broker starts later than the application).
I see that in this case send() blocks, although the documentation says
that this method is async.
Is it possible to configure kafka in a way so that the producer
buffers the messages until the broker becomes available?

Regards,
Peter










Re: Too many open files

2016-09-14 Thread Jaikiran Pai

What does the output of:

lsof -p <kafka broker pid>

show on that specific node?
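
One way to see what is eating the descriptors (standard lsof/awk, nothing
Kafka-specific) is to group the output by descriptor type:

lsof -p <kafka broker pid> | awk '{print $5}' | sort | uniq -c | sort -rn

A pile of IPv4/IPv6 entries points at leaked client connections, while REG
entries point at log segments and index files.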

-Jaikiran

On Monday 12 September 2016 10:03 PM, Michael Sparr wrote:

5-node Kafka cluster, bare metal, Ubuntu 14.04.x LTS with 64GB RAM, 8-core,
960GB SSD boxes, and a single node in the cluster is filling its logs with the following:

[2016-09-12 09:34:49,522] ERROR Error while accepting connection 
(kafka.network.Acceptor)
java.io.IOException: Too many open files
at sun.nio.ch.ServerSocketChannelImpl.accept0(Native Method)
at 
sun.nio.ch.ServerSocketChannelImpl.accept(ServerSocketChannelImpl.java:422)
at 
sun.nio.ch.ServerSocketChannelImpl.accept(ServerSocketChannelImpl.java:250)
at kafka.network.Acceptor.accept(SocketServer.scala:323)
at kafka.network.Acceptor.run(SocketServer.scala:268)
at java.lang.Thread.run(Thread.java:745)

No other nodes in cluster have this issue. Separate application server has 
consumers/producers using librdkafka + confluent kafka python library with a 
few million messages published to under 100 topics.

For days now the /var/log/kafka/kafka.server.log.N files have been filling up the
server with this message, using up all the space on just this single node in the
cluster. I have soft/hard limits at 65,535 for all users, so > ulimit -n reveals 65535.

Is there a setting I should add from the librdkafka config in the Python producer
clients to shorten socket connections even further to avoid this, or is something
else going on?

Should I file this as an issue in a GitHub repo, and if so, which project?


Thanks!






Re: Exception while deserializing in kafka streams

2016-09-14 Thread Walter rakoff
Guozhang,

I am using 0.10.0.0. Could the below log be the cause?

16/09/14 17:24:35 WARN ConsumerConfig: The configuration
schema.registry.url = http://192.168.50.6:8081
was supplied but isn't a known config.
16/09/14 17:24:35 INFO AppInfoParser: Kafka version : 0.10.0.0
16/09/14 17:24:35 INFO AppInfoParser: Kafka commitId : b8642491e78c5a13

Walter


On Wed, Sep 14, 2016 at 8:11 PM, Guozhang Wang  wrote:

> Hello Walter,
>
> Which version of Kafka were you using?
>
> I ask this because there was a bug causing the serde passed through config
> to NOT be configured when constructed:
> https://issues.apache.org/jira/browse/KAFKA-3639
>
>
> It is fixed in the 0.10.0.0 release, which means you will only hit it if
> you are using the tech-preview release version.
>
>
> Guozhang
>
>
> On Wed, Sep 14, 2016 at 11:10 AM, Walter rakoff 
> wrote:
>
> > Hello,
> >
> > I get the below exception when deserializing avro records
> > using KafkaAvroDeserializer.
> >
> > 16/09/14 17:24:39 INFO StreamThread: Stream thread shutdown complete
> > [StreamThread-1]
> > Exception in thread "StreamThread-1"
> > org.apache.kafka.common.errors.SerializationException:
> > Error deserializing Avro message for id 4
> > Caused by: java.lang.NullPointerException
> > at io.confluent.kafka.serializers.AbstractKafkaAvroDeserializer.
> > deserialize(AbstractKafkaAvroDeserializer.java:120)
> > at io.confluent.kafka.serializers.AbstractKafkaAvroDeserializer.
> > deserialize(AbstractKafkaAvroDeserializer.java:92)
> >
> > I can confirm that schema registry URL is accessible and
> url/schemas/ids/4
> > does return valid schema.
> > Maybe some initialization didn't happen correctly?
> >
> > props.put(AbstractKafkaAvroSerDeConfig.SCHEMA_REGISTRY_URL_CONFIG, "
> > 192.168.50.6:8081")
> > props.put(StreamsConfig.KEY_SERDE_CLASS_CONFIG,
> > Serdes.Long.getClass.getName)
> > props.put(StreamsConfig.VALUE_SERDE_CLASS_CONFIG, classOf[
> > GenericAvroSerdeWithSchemaRegistry])
> >
> > GenericAvroSerdeWithSchemaRegistry code --> https://www.dropbox.com/s/
> > y471k9nj94tlxro/avro_serde.txt?dl=0
> >
> > Walter
> >
>
>
>
> --
> -- Guozhang
>


Anyone running Kafka on Kubernetes in production?

2016-09-14 Thread Ali Akhtar
If so, can you please share whether you're using a publicly available
deployment, or if you created your own, how you did it? (i.e. which services
/ replication controllers you have)

Also, how has the performance been for you? I've read a report which said
that performance suffered when running kafka as a docker container.


Re: How to create a topic using the Java API / Client?

2016-09-14 Thread Ali Akhtar
Thank you  Martin

On 15 Sep 2016 3:05 am, "Mathieu Fenniak" 
wrote:

> Hey Ali,
>
> If you have auto create turned on, which it sounds like you do, and you're
> happy with using the broker's configured partition count and replication
> factor, then you can call "partitionsFor(String topic)" on your producer.
> This will create the topic without sending a message to it.  I'm not sure
> it's 100% smart, but I found it to be better than the alternatives I could
> find. :-)
>
> Mathieu
>
>
> On Wed, Sep 14, 2016 at 3:41 PM, Ali Akhtar  wrote:
>
> > It looks like if I just send a message to a non-existent topic, it is
> > created. But this has the downside that the first message of the topic is
> > null or otherwise invalid.
> >
> > Also the partition count can't be specified.
> >
> > Is there a way to create a topic without needing to post a null message?
> Do
> > I need to talk to the bash script?
> >
> > On Wed, Sep 14, 2016 at 8:45 AM, Ali Akhtar 
> wrote:
> >
> > > Using version 0.10.0.1, how can I create kafka topics using the java
> > > client / API?
> > >
> > > Stackoverflow answers describe using kafka.admin.AdminUtils, but this
> > > class is not included in the kafka-clients maven dependency. I also
> don't
> > > see the package kafka.admin in the javadocs:
> > > http://kafka.apache.org/0100/javadoc/index.html?org/apache/kafka/clients/producer/KafkaProducer.html
> > >
> > >
> > > What am I missing?
> > >
> > >
> >
>


Consumer stops after reaching an offset of 1644

2016-09-14 Thread kant kodali
Hi All,
I am trying to do a simple benchmark test for Kafka using a single broker,
producer, and consumer; however, my consumer doesn't seem to receive all the
messages produced by the producer, so I'm not sure what is going on. Any help?
Here is the full description of the problem:
http://stackoverflow.com/questions/39500780/i-am-trying-to-benchmark-kafka-using-node-js-but-it-stops-working-after-certain

Thanks!
kant

Re: Exception while deserializing in kafka streams

2016-09-14 Thread Guozhang Wang
Hello Walter,

Which version of Kafka were you using?

I ask this because there was a bug causing the serde passed through config
to NOT be configured when constructed:
https://issues.apache.org/jira/browse/KAFKA-3639


It is fixed in the 0.10.0.0 release, which means you will only hit it if
you are using the tech-preview release version.


Guozhang


On Wed, Sep 14, 2016 at 11:10 AM, Walter rakoff 
wrote:

> Hello,
>
> I get the below exception when deserializing avro records
> using KafkaAvroDeserializer.
>
> 16/09/14 17:24:39 INFO StreamThread: Stream thread shutdown complete
> [StreamThread-1]
> Exception in thread "StreamThread-1"
> org.apache.kafka.common.errors.SerializationException:
> Error deserializing Avro message for id 4
> Caused by: java.lang.NullPointerException
> at io.confluent.kafka.serializers.AbstractKafkaAvroDeserializer.
> deserialize(AbstractKafkaAvroDeserializer.java:120)
> at io.confluent.kafka.serializers.AbstractKafkaAvroDeserializer.
> deserialize(AbstractKafkaAvroDeserializer.java:92)
>
> I can confirm that schema registry URL is accessible and url/schemas/ids/4
> does return valid schema.
> Maybe some initialization didn't happen correctly?
>
> props.put(AbstractKafkaAvroSerDeConfig.SCHEMA_REGISTRY_URL_CONFIG, "
> 192.168.50.6:8081")
> props.put(StreamsConfig.KEY_SERDE_CLASS_CONFIG,
> Serdes.Long.getClass.getName)
> props.put(StreamsConfig.VALUE_SERDE_CLASS_CONFIG, classOf[
> GenericAvroSerdeWithSchemaRegistry])
>
> GenericAvroSerdeWithSchemaRegistry code --> https://www.dropbox.com/s/
> y471k9nj94tlxro/avro_serde.txt?dl=0
>
> Walter
>



-- 
-- Guozhang


Kafka consumer group problem

2016-09-14 Thread Joyce Chen
Hi,

I created a few consumers that belong to the same group_id, but I noticed that
each consumer gets all messages instead of only some of them.

As for the topic, I did create the topic with a few partitions.

Anyone else had the same problem? Is there any configuration parameter I need 
to set in order for consumer groups to work?

Below is a sample of the python code:

# Note (assuming kafka-python): group_id must be a keyword argument; passed
# positionally it becomes another topic name, so the consumer joins no group.
consumer = KafkaConsumer(topic, group_id=group_id,
                         bootstrap_servers=["%s:%d" % (kafka_host, kafka_port)])


Thanks!
Joyce





Re: How to create a topic using the Java API / Client?

2016-09-14 Thread Mathieu Fenniak
Hey Ali,

If you have auto create turned on, which it sounds like you do, and you're
happy with using the broker's configured partition count and replication
factor, then you can call "partitionsFor(String topic)" on your producer.
This will create the topic without sending a message to it.  I'm not sure
it's 100% smart, but I found it to be better than the alternatives I could
find. :-)
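
A minimal sketch of that approach (the broker address and topic name are
placeholders; partitionsFor() is on the 0.9+ Java producer):

import java.util.List;
import java.util.Properties;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.common.PartitionInfo;

Properties props = new Properties();
props.put("bootstrap.servers", "localhost:9092");
props.put("key.serializer",
    "org.apache.kafka.common.serialization.StringSerializer");
props.put("value.serializer",
    "org.apache.kafka.common.serialization.StringSerializer");
try (KafkaProducer<String, String> producer = new KafkaProducer<>(props)) {
    // fetching metadata triggers auto-creation; no record is sent
    List<PartitionInfo> partitions = producer.partitionsFor("my-new-topic");
    System.out.println("topic has " + partitions.size() + " partitions");
}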

Mathieu


On Wed, Sep 14, 2016 at 3:41 PM, Ali Akhtar  wrote:

> It looks like if I just send a message to a non-existent topic, it is
> created. But this has the downside that the first message of the topic is
> null or otherwise invalid.
>
> Also the partition count can't be specified.
>
> Is there a way to create a topic without needing to post a null message? Do
> I need to talk to the bash script?
>
> On Wed, Sep 14, 2016 at 8:45 AM, Ali Akhtar  wrote:
>
> > Using version 0.10.0.1, how can I create kafka topics using the java
> > client / API?
> >
> > Stackoverflow answers describe using kafka.admin.AdminUtils, but this
> > class is not included in the kafka-clients maven dependency. I also don't
> > see the package kafka.admin in the javadocs:
> > http://kafka.apache.org/0100/javadoc/index.html?org/apache/kafka/clients/producer/KafkaProducer.html
> >
> >
> > What am I missing?
> >
> >
>


Re: How to create a topic using the Java API / Client?

2016-09-14 Thread Ajay Sharma
You can set the following broker settings to control auto-creation and partition count:

auto.create.topics.enable = true   (enable auto creation of topics on the server)
num.partitions = 1                 (default number of log partitions per topic)
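
For explicit creation with a chosen partition count, a sketch against the
internal admin API of this era; note it lives in the kafka_2.11 (server)
artifact rather than kafka-clients, it is not a public interface, and the
exact signatures here are best-effort assumptions for 0.10.0.x:

import kafka.admin.AdminUtils;
import kafka.admin.RackAwareMode;
import kafka.utils.ZkUtils;
import java.util.Properties;

ZkUtils zkUtils = ZkUtils.apply("localhost:2181", 30000, 30000, false);
AdminUtils.createTopic(zkUtils, "my-topic", 3 /* partitions */,
        1 /* replication factor */, new Properties(),
        RackAwareMode.Enforced$.MODULE$);
zkUtils.close();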


On 9/14/16, 2:41 PM, "Ali Akhtar"  wrote:

It looks like if I just send a message to a non-existent topic, it is
created. But this has the downside that the first message of the topic is
null or otherwise invalid.

Also the partition count can't be specified.

Is there a way to create a topic without needing to post a null message? Do
I need to talk to the bash script?

On Wed, Sep 14, 2016 at 8:45 AM, Ali Akhtar  wrote:

> Using version 0.10.0.1, how can I create kafka topics using the java
> client / API?
>
> Stackoverflow answers describe using kafka.admin.AdminUtils, but this
> class is not included in the kafka-clients maven dependency. I also don't
> see the package kafka.admin in the javadocs:
> http://kafka.apache.org/0100/javadoc/index.html?org/apache/kafka/clients/producer/KafkaProducer.html
>
>
> What am I missing?
>
>




Re: How to create a topic using the Java API / Client?

2016-09-14 Thread Ali Akhtar
It looks like if I just send a message to a non-existent topic, it is
created. But this has the downside that the first message of the topic is
null or otherwise invalid.

Also the partition count can't be specified.

Is there a way to create a topic without needing to post a null message? Do
I need to talk to the bash script?

On Wed, Sep 14, 2016 at 8:45 AM, Ali Akhtar  wrote:

> Using version 0.10.0.1, how can I create kafka topics using the java
> client / API?
>
> Stackoverflow answers describe using kafka.admin.AdminUtils, but this
> class is not included in the kafka-clients maven dependency. I also don't
> > see the package kafka.admin in the javadocs:
> > http://kafka.apache.org/0100/javadoc/index.html?org/apache/kafka/clients/producer/KafkaProducer.html
>
>
> What am I missing?
>
>


Re: MockClientSupplier

2016-09-14 Thread Guozhang Wang
Hello Andy,

You can just call KafkaStreams.toString(). I'm copying its javadoc string
here:

"Produces a string representation containing useful information about Kafka
Streams, such as thread IDs, task IDs and a representation of the topology.
This is useful in debugging scenarios."
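
A minimal sketch of the call (builder and props stand in for the app's own
topology and settings):

KafkaStreams streams = new KafkaStreams(builder, new StreamsConfig(props));
streams.start();
System.out.println(streams.toString()); // thread IDs, task IDs, topology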


Guozhang


On Mon, Sep 12, 2016 at 2:56 AM, Andy Chambers 
wrote:

> Thanks Guozhang,
>
> Actually on inspection, the MockClientSupplier itself seems easy enough to
> replicate. But now I have a question about the MockConsumer. I expected to
> be able to use the MockConsumer to simulate input events but when I try to
> add a record, I get:
>
>Cannot add records for a partition that is not assigned to the
>consumer
>
> I'd have thought the app under test would have caused the mock consumer to
> subscribe to the input topic. I tried to verify this without even producing
> a record by checking the "subscription" method of the mock consumer but
> that returns an empty set even after the app streams have been started.
>
> Is there any way to inspect the topology of a streams app? I think that's
> what I'd really like to test. The transformations/aggregations themselves
> are obviously easier to test independently of kafka. It might be nice to
> just check they are all glued together correctly.
>
> Cheers,
> Andy
>
>
>
> On Sun, Sep 11, 2016 at 8:56 PM, Guozhang Wang  wrote:
>
> > Hello Andy,
> >
> > Unfortunately the `o.a.k.test` package is not included in the
> > released maven artifacts.
> >
> > There are some discussions about moving these test fixtures into public
> > packages:
> >
> > https://issues.apache.org/jira/browse/KAFKA-3625
> >
> > Guozhang
> >
> > On Sat, Sep 10, 2016 at 2:01 PM, Andy Chambers  >
> > wrote:
> >
> > > Hi,
> > >
> > > The MockClientSupplier looks like it would be useful for developers
> > wishing
> > > to write unit tests for kafka streams apps. Is it public? If so, can
> > > someone help me out with the maven coordinates. Currently depending on
> > > these maven coordinates
> > >
> > >   [org.apache.kafka/kafka-streams "0.10.0.1"]
> > >   [org.apache.kafka/kafka-clients "0.10.0.1"]
> > >   [org.apache.kafka/kafka_2.11 "0.10.0.1"]
> > >
> > > but none seem to include the class
> > > org.apache.kafka.test.MockClientSupplier
> > >
> > > Cheers,
> > > Andy
> > >
> >
> >
> >
> > --
> > -- Guozhang
> >
>



-- 
-- Guozhang


Re: Unexpected KStream-KStream join behavior with asymmetric time window

2016-09-14 Thread Guozhang Wang
Hello Elias,

Thanks for reporting! I will follow-up with you on the created JIRA ticket.

Guozhang

On Mon, Sep 12, 2016 at 4:01 PM, Elias Levy 
wrote:

> https://issues.apache.org/jira/browse/KAFKA-4153
> https://github.com/apache/kafka/pull/1846
>
> On Mon, Sep 12, 2016 at 7:00 AM, Elias Levy 
> wrote:
>
> > Any ideas?
> >
> >
> > On Sunday, September 11, 2016, Elias Levy 
> > wrote:
> >
> >> Using Kafka 0.10.0.1, I am joining records in two streams separated by
> >> some time, but only when records from one stream are newer than records
> >> from the other.
> >>
> >> I.e. I am doing:
> >>
> >>   stream1.join(stream2, valueJoiner, JoinWindows.of("X").after(1))
> >>
> >> I would expect that the following would be equivalent:
> >>
> >>   stream2.join(stream1, valueJoiner, JoinWindows.of("X").before(1))
> >>
> >> Alas, I find that this is not the case.  To generate the same output as
> >> the first example I must do:
> >>
> >>   stream2.join(stream1, valueJoiner, JoinWindows.of("X").after(1))
> >>
> >> What am I missing?
> >>
> >>
> >>
>



-- 
-- Guozhang


Exception while deserializing in kafka streams

2016-09-14 Thread Walter rakoff
Hello,

I get the below exception when deserializing avro records
using KafkaAvroDeserializer.

16/09/14 17:24:39 INFO StreamThread: Stream thread shutdown complete
[StreamThread-1]
Exception in thread "StreamThread-1"
org.apache.kafka.common.errors.SerializationException:
Error deserializing Avro message for id 4
Caused by: java.lang.NullPointerException
at io.confluent.kafka.serializers.AbstractKafkaAvroDeserializer.
deserialize(AbstractKafkaAvroDeserializer.java:120)
at io.confluent.kafka.serializers.AbstractKafkaAvroDeserializer.
deserialize(AbstractKafkaAvroDeserializer.java:92)

I can confirm that schema registry URL is accessible and url/schemas/ids/4
does return valid schema.
Maybe some initialization didn't happen correctly?

props.put(AbstractKafkaAvroSerDeConfig.SCHEMA_REGISTRY_URL_CONFIG, "192.168.50.6:8081")
props.put(StreamsConfig.KEY_SERDE_CLASS_CONFIG, Serdes.Long.getClass.getName)
props.put(StreamsConfig.VALUE_SERDE_CLASS_CONFIG, classOf[GenericAvroSerdeWithSchemaRegistry])

GenericAvroSerdeWithSchemaRegistry code --> https://www.dropbox.com/s/y471k9nj94tlxro/avro_serde.txt?dl=0
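
One thing worth checking (a guess, not confirmed in the thread): an NPE inside
AbstractKafkaAvroDeserializer.deserialize usually means the deserializer's
schema-registry client was never initialized, i.e. configure() was not called
on it, or the URL (note the missing http:// scheme above) never reached it. A
sketch of configuring the inner deserializer explicitly, assuming the
Confluent serializer API:

import java.util.HashMap;
import java.util.Map;
import io.confluent.kafka.serializers.AbstractKafkaAvroSerDeConfig;
import io.confluent.kafka.serializers.KafkaAvroDeserializer;

Map<String, Object> serdeConfig = new HashMap<>();
serdeConfig.put(AbstractKafkaAvroSerDeConfig.SCHEMA_REGISTRY_URL_CONFIG,
    "http://192.168.50.6:8081");      // note the explicit http:// scheme
KafkaAvroDeserializer inner = new KafkaAvroDeserializer();
inner.configure(serdeConfig, false);  // false = configure as a value deserializer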

Walter


Re: KAFKA-3933: Kafka OOM During Log Recovery Due to Leaked Native Memory

2016-09-14 Thread Ismael Juma
Here are the links:

https://github.com/apache/kafka/commit/8a417c89d2f0b7861b2dec26f02e4e302b64b604
(trunk)
https://github.com/apache/kafka/commit/c47c3b0b583a849fdf3ed0a06835427a2801950a
(0.10.0)

Ismael

On Wed, Sep 14, 2016 at 4:38 PM, feifei hsu  wrote:

> Hi, Ismael
> So many thanks for the quick reply.
> I checked the trunk tree at github, but I did not see the merge.
> Did I make some mistake? Sorry for that.
>
> For example, take PR 1598, which touches LogSegment.scala: one part of the
> PR adds a try/catch to close the leaking resource, but I did not see
> that code in trunk. :-(
> https://github.com/heroku/kafka/blob/trunk/core/src/main/scala/kafka/log/LogSegment.scala#L189
> https://github.com/apache/kafka/pull/1598/files#r70071062
>
>
> On Wed, Sep 14, 2016 at 5:06 AM, Ismael Juma  wrote:
>
>> Hi,
>>
>> We did merge the PR to trunk and 0.10.0.
>>
>> Ismael
>>
>> On Wed, Sep 14, 2016 at 9:21 AM, feifei hsu  wrote:
>>
>>> Hi Tom and Ismael.
>>> I am following KAFKA-3933 (the memory leak), but I did not see that the
>>> PRs #1598, #1614, and #1660 were merged into trunk.
>>> Do you know the current status?
>>> So many thanks.
>>> We are also thinking of backporting it to 0.9.0.1.
>>>
>>> --easy
>>>
>>
>>
>


Re: KAFKA-3933: Kafka OOM During Log Recovery Due to Leaked Native Memory

2016-09-14 Thread feifei hsu
Hi, Ismael
   So many thanks for the quick reply.
   I checked the trunk tree at github, but I did not see the merge.
   Did I make some mistake? Sorry for that.

For example, take PR 1598, which touches LogSegment.scala: one part of the
PR adds a try/catch to close the leaking resource, but I did not see
that code in trunk. :-(

https://github.com/heroku/kafka/blob/trunk/core/src/main/scala/kafka/log/LogSegment.scala#L189
https://github.com/apache/kafka/pull/1598/files#r70071062


On Wed, Sep 14, 2016 at 5:06 AM, Ismael Juma  wrote:

> Hi,
>
> We did merge the PR to trunk and 0.10.0.
>
> Ismael
>
> On Wed, Sep 14, 2016 at 9:21 AM, feifei hsu  wrote:
>
>> Hi Tom and Ismael.
>> I am following KAFKA-3933 (the memory leak), but I did not see that the
>> PRs #1598, #1614, and #1660 were merged into trunk.
>> Do you know the current status?
>> So many thanks.
>> We are also thinking of backporting it to 0.9.0.1.
>>
>> --easy
>>
>
>


Re: ZooKeeper client > 3.4.6

2016-09-14 Thread Tommy Becker

We upgraded to 3.4.8 for this exact reason and have not had any issues.

On 09/14/2016 09:45 AM, Bill de hÓra wrote:
Hi,

I was wondering if anyone has experience (good or bad!) running Kafka with a 
ZooKeeper client > 3.4.6.

The reason to do this is to avoid having to bounce brokers when ZooKeeper node
IPs change; it seems the 3.4.6 client caches IP addresses indefinitely but this
may have been fixed in later versions.

(I noticed that in 
https://github.com/apache/kafka/commit/4c6d7ed95a05e1f08e820305afbab983822e8b82,
 ZooKeeper was bumped to 3.4.8, but the latest 0.10.0.1 still embeds 3.4.6.)

Bill

--
Tommy Becker
Senior Software Engineer

Digitalsmiths
A TiVo Company

www.digitalsmiths.com
tobec...@tivo.com





ZooKeeper client > 3.4.6

2016-09-14 Thread Bill de hÓra

Hi,

I was wondering if anyone has experience (good or bad!) running Kafka 
with a ZooKeeper client > 3.4.6.


The reason to do this is to avoid having to bounce brokers when
ZooKeeper node IPs change; it seems the 3.4.6 client caches IP addresses
indefinitely but this may have been fixed in later versions.


(I noticed that in 
https://github.com/apache/kafka/commit/4c6d7ed95a05e1f08e820305afbab983822e8b82, 
ZooKeeper was bumped to 3.4.8, but the latest 0.10.0.1 still embeds 3.4.6.)


Bill


RE: handling generics(solved) but new problem unearthed

2016-09-14 Thread Martin Gainty
The end goal was to power through the kafka parsing/compilation and then see if
all kafka java classes would actually compile.

I found this bug, where kafka's attempt to implement joptsimple in
kafka.tools.StreamsResetter.java fails the build:

https://www.codatlas.com/github.com/apache/kafka/trunk/core/src/main/scala/kafka/tools/StreamsResetter.java

For some reason these errors were obfuscated by gradle, so I implemented a
pom.xml in the core module, which I am executing via

kafka/core>mvn -e -X -o package

to view this error:

https://issues.apache.org/jira/browse/KAFKA-4170

Thanks Dean (and Matthieu)
Martin
__ 



> From: deanwamp...@gmail.com
> Date: Thu, 8 Sep 2016 21:25:19 -0500
> Subject: Re: handling generics in Kafka Scala
> To: mgai...@hotmail.com
> CC: mathieu.fenn...@replicon.com; users@kafka.apache.org; 
> dean.wamp...@gmail.com
> 
> Ah, yes. Scala interprets Java collection types, like ArrayList[T], as
> *invariant*, which means that you can't use a declared ArrayList[String]
> where you expect an ArrayList[Any], which would be an example of *covariance*.
> (This is due to Java's flawed way of declaring generics, where the person
> *declaring* the collection doesn't have the ability to specify variance
> behavior. The *user* decides if a declaration is invariant, covariant, or
> contravariant.)
> 
> Example using the Scala interpreter:
> 
> scala> import java.util.ArrayList
> import java.util.ArrayList
> 
> scala> var ala: ArrayList[Any] = null   // var so we can
> attempt assignments.
> ala: java.util.ArrayList[Any] = null
> 
> scala> ala = new ArrayList[String]()   // rejected. ala must be
> assigned an ArrayList[Any]
> :12: error: type mismatch;
>  found   : java.util.ArrayList[String]
>  required: java.util.ArrayList[Any]
> Note: String <: Any, but Java-defined class ArrayList is invariant in type
> E.
> You may wish to investigate a wildcard type such as `_ <: Any`. (SLS 3.2.10)
>ala = new ArrayList[String]()
>  ^
> 
> scala> val als = new ArrayList[String]()  // it doesn't work to
> declare an ArrayList[String] then try to assign to ala
> als: java.util.ArrayList[String] = []
> 
> scala> ala = als
> :13: error: type mismatch;
>  found   : java.util.ArrayList[String]
>  required: java.util.ArrayList[Any]
> Note: String <: Any, but Java-defined class ArrayList is invariant in type
> E.
> You may wish to investigate a wildcard type such as `_ <: Any`. (SLS 3.2.10)
>ala = als
>  ^
> 
> scala> ala = new ArrayList[Any]()   // can only assign an
> ArrayList[Any] instance.
> ala: java.util.ArrayList[Any] = []
> 
> // *** BUT YOU CAN ASSIGN STRINGS, INTS, ETC. TO THE ARRAY ITSELF ***
> 
> scala> ala.add(1)
> res3: Boolean = true
> 
> scala> ala.add("two")
> res4: Boolean = true
> 
> scala> ala.add(3.3)
> res5: Boolean = true
> 
> scala> ala
> res6: java.util.ArrayList[Any] = [1, two, 3.3]
> 
> Note that the type of ala hasn't changed.
> 
> Contrast with Scala collections like Seq (List), which is covariant:
> 
> scala> var sa: Seq[Any] = null
> sa: Seq[Any] = null
> 
> scala> sa = List.empty[String]
> sa: Seq[Any] = List()
> 
> scala> sa = List(1, "two", 3.3)
> sa: Seq[Any] = List(1, two, 3.3)
> 
> scala> val list = List(1, "two", 3.3)
> list: List[Any] = List(1, two, 3.3)
> 
> scala> sa = list
> sa: Seq[Any] = List(1, two, 3.3)
> 
> Note that the type of "sa" doesn't change, even when it's actually pointing
> to a subclass that's covariant.
> 
> 
> For completeness, there's also *contravariance*; if Foo[T] is
> contravariant, then a Foo[String] would be a superclass of Foo[Any]. This
> is less common and harder to understand, but an important example is the
> argument types for the function types, like Function2[-Arg1, -Arg2,
> +Return], e.g.,
> 
> scala> var fisa: Function2[Int,String,Any] = null  // a function that
> takes Int and String args and returns an Any
> fisa: (Int, String) => Any = null
> 
> // A function that takes two Any arguments (note that Any is a supertype of
> both Int and String), and returns String
> 
> scala> val faas: Function2[Any,Any,String] = (a1:Any, a2:Any) =>
> a1.toString + a2.toString
> faas: (Any, Any) => String = <function2>
> 
> scala> fisa = faas  // MIND BLOWN!!
> fisa: (Int, String) => Any = <function2>
> 
> scala> fisa(1, "two")
> res7: Any = 1two
> 
> Why does this work? The Liskov Substitution Principle is really the
> correct, technical definition of what we've called the "is a" relationship
> in OOP. Fisa defines a contract; what ever function you actually use here,
> it must be able to accept an Int and String, and it must guarantee to
> return an Any (that's easy). Faas satisfies this contract. It can not only
> accept an Int and String, it can accept two Anys. That is, it is more
> tolerant to what you pass it. It returns a String only, but that's okay
> because the contract only promises an Any, and every String is an Any.

Re: KAFKA-3933: Kafka OOM During Log Recovery Due to Leaked Native Memory

2016-09-14 Thread Ismael Juma
Hi,

We did merge the PR to trunk and 0.10.0.

Ismael

On Wed, Sep 14, 2016 at 9:21 AM, feifei hsu  wrote:

> Hi Tom and Ismael.
> I am following KAFKA-3933 (the memory leak), but I did not see that the
> PRs #1598, #1614, and #1660 were merged into trunk.
> Do you know the current status?
> So many thanks.
> We are also thinking of backporting it to 0.9.0.1.
>
> --easy
>


KAFKA-3933: Kafka OOM During Log Recovery Due to Leaked Native Memory

2016-09-14 Thread feifei hsu
Hi Tom and Ismael.
   I am following KAFKA-3933 (the memory leak), but I did not see that the
PRs #1598, #1614, and #1660 were merged into trunk.
Do you know the current status?
   So many thanks.
   We are also thinking of backporting it to 0.9.0.1.

--easy


Re: Question regarding functionality of MirrorMaker

2016-09-14 Thread UMESH CHAUDHARY
Hello cs,
Apologies for delayed response.
I found one topic in my Kafka env which had no leader and no replicas.
That was pretty weird and I am not sure what caused this.

Because of this topic, MirrorMaker was hanging and printing messages like
"No leader found for topic ..". Due to this hang, MM was not able to
replicate the other topics.

When I deleted that zombie topic, MM worked as expected.
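
For anyone hitting the same thing, a quick way to spot such topics with the
standard tooling (ZooKeeper address is a placeholder; treat the flag as an
assumption for your exact version):

bin/kafka-topics.sh --zookeeper zoo1:2181 --describe --unavailable-partitions

which lists only the partitions whose leader is unavailable.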

Regards,
Umesh Chaudhary

On Fri, 26 Aug 2016 at 12:40 cs user  wrote:

> Hi Umesh,
>
> I haven't had that problem; it seems to work fine for me. The only issue I
> found, which kind of makes sense, is that it doesn't mirror existing topics
> immediately, only when messages are first sent to the topic after mirror
> maker connects. It doesn't start from the first offset available, only the
> current one.
>
> However once you start sending messages it seems to subscribe to them fine
> and they get created on the mirror maker cluster, same for new topics which
> are created on the source cluster, they seem to come over fine.
>
> Only thing I can think of is that you have disabled auto topic creation on
> the mirror maker cluster so that mirror maker is unable to create them
> automatically? But then it wouldn't be able to create the existing topics
> either so that doesn't make sense.
>
> Are there any error messages in your mirror maker logs or on the mirror
> maker cluster which point to what the issue might be?
>
> Other than the boostrap servers, my producer settings look as follows:
>
> producer.type=async
> compression.codec=0
> serializer.class=kafka.serializer.DefaultEncoder
> max.message.size=1000
> queue.time=1000
> queue.enqueueTimeout.ms=-1
>
>
>
> Cheers!
>
>
>
>
> On Fri, Aug 26, 2016 at 6:08 AM, UMESH CHAUDHARY 
> wrote:
>
> > Hello Mate,
> > Thanks for your detailed response and it surely helps.
> >
> > The whitelist is a required config for MM from 0.9.0 onwards. And you are
> > correct that --new-consumer requires --bootstrap-servers rather than
> > --zookeeper.
> >
> > However, did you notice that MM picks up the topics which are present at the
> > time of its startup and mirrors their data, but when you add some new topics
> > after its startup it doesn't pick them up automatically?
> >
> > Regards,
> > Umesh Chaudhary
> >
> > On Thu, 25 Aug 2016 at 19:23 cs user  wrote:
> >
> > > Hi Umesh,
> > >
> > > I am new to kafka as well, and configuring the MirrorMaker. I got mine
> > > working in the following way.
> > >
> > > I run the mirror maker instance on the mirror cluster, as in where you
> > want
> > > to mirror the topics to, although I'm not sure it matters.
> > >
> > > I use the following options when starting my service (systemd file):
> > >
> > > KAFKA_RUN="/opt/kafka/bin/kafka-run-class.sh"
> > > KAFKA_ARGS="kafka.tools.MirrorMaker"
> > > KAFKA_CONFIG="--new.consumer --offset.commit.interval.ms=5000
> > > --consumer.config /opt/kafka/config/consumer-mirror1.properties
> > > --producer.config /opt/kafka/config/producer.properties
> > > --whitelist=\".*\""
> > >
> > > Without the --new.consumer parameter, the --consumer.config and
> > > producer.config files need to contain the zookeeper config for relevant
> > > clusters. When using the --new.consumer switch this is no longer needed
> > (as
> > > I understand it).
> > >
> > > The consumer config points at my source cluster, the producer config
> > points
> > > locally, to my mirror cluster. I think it's also important to configure
> > the
> > > whitelist to tell it which topics you want to mirror, in my case I
> mirror
> > > all of them with a wildcard.
> > >
> > > Not much config in the consumer.config and producer.config files apart
> > from
> > > the bootstrap.servers list, pointing at the relevant cluster. I have 3
> > > brokers in my mirror cluster and each one of them runs the same mirror
> > > maker service so one will take over if another one fails.
> > >
> > > I hope someone will correct me if I am wrong about anything, and
> > hopefully
> > > this will help!
> > >
> > > Cheers!
> > >
> > >
> > >
> > >
> > >
> > >
> > >
> > > On Thu, Aug 25, 2016 at 9:36 AM, UMESH CHAUDHARY 
> > > wrote:
> > >
> > > > Hey Folks,
> > > > I was trying to understand the behavior of MirrorMaker but looks
> like I
> > > am
> > > > missing something here. Please see the steps which I performed :
> > > >
> > > > 1) I configured MM on source Kafka cluster
> > > > 2) Created a topic and pushed some data in it using console producer.
> > > > 3) My understanding is that MM would start mirroring the data (which
> is
> > > > there in the topic) based on "offsetCommitIntervalMs" and it would be
> > > there
> > > > in destination cluster.
> > > >
> > > > https://github.com/apache/kafka/blob/0.9.0/core/src/main/scala/kafka/tools/MirrorMaker.scala#L503
> > > >
> > > > 4) But when I list the topics on destination, I can't see the topic
> > > > which I recently created on source.