Re: Kafka Streams : CommitFailedException

2017-11-07 Thread Tony John
Hi Guozhang,

Thanks for looking into this. I was using version 0.11.0.0 of the library
earlier, when I was getting the CommitFailedException and the tasks were
terminating. The application config then was Replication Factor = 2, Num
Stream Threads = 1, Consumer Max Poll Records = 1000 & Consumer Max Poll
Interval = 2147483647. The streams config code (*Streams Config While Using
0.11.0.0*) is given below and the logs of the application while using
0.11.0.0 can be downloaded from
https://www.dropbox.com/s/hx1e5mknf9gx5z0/commit_failed_error.log?dl=0

I have upgraded the libraries to 0.11.0.1 and ran into some other issues.
Though the CommitFailed error logs are still showing up with 0.11.0.1, the
tasks are no longer getting terminated, but the app quickly runs out of memory
(GC overhead limit exceeded) and the CPU is choked, which was not the case
earlier. The logs are available at
https://www.dropbox.com/s/x6oehtuoqrwjj0i/oom_gc_overhead.log?dl=0 and the
application config is also given below (*Streams Config While Using
0.11.0.1*). Since I am not sure which configuration helps reduce the
CommitFailed error, which I think could be one of the reasons for the CPU
choke and eventually the OOM, I have gone ahead and used all possible
configuration parameters, but still no luck.

It would be great if you could shed some light on what could be causing this
problem.

*Streams Config While Using 0.11.0.0 *

val props = Properties()
props.put(StreamsConfig.APPLICATION_ID_CONFIG, EngineConfig.APPLICATION_ID)
props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, EngineConfig.KAFKA_SERVERS)
props.put(StreamsConfig.STATE_DIR_CONFIG, EngineConfig.STATE_STORE_DIR)
props.put(StreamsConfig.REPLICATION_FACTOR_CONFIG, 2)
props.put(StreamsConfig.NUM_STREAM_THREADS_CONFIG, 1)
props.put(StreamsConfig.METRICS_RECORDING_LEVEL_CONFIG, "INFO")
props.put(StreamsConfig.consumerPrefix(ConsumerConfig.MAX_POLL_RECORDS_CONFIG), 1000)
props.put(StreamsConfig.consumerPrefix(ConsumerConfig.MAX_POLL_INTERVAL_MS_CONFIG), Int.MAX_VALUE)

streams = KafkaStreams(builder, StreamsConfig(props))
streams.start()


*Streams Config While Using 0.11.0.1*

val props = Properties()
props.put(StreamsConfig.APPLICATION_ID_CONFIG, EngineConfig.APPLICATION_ID)
props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, EngineConfig.KAFKA_SERVERS)
props.put(StreamsConfig.STATE_DIR_CONFIG, EngineConfig.STATE_STORE_DIR)
props.put(StreamsConfig.REPLICATION_FACTOR_CONFIG, 2)
props.put(StreamsConfig.NUM_STREAM_THREADS_CONFIG, 2)

props.put(StreamsConfig.consumerPrefix(ConsumerConfig.HEARTBEAT_INTERVAL_MS_CONFIG), 1)
props.put(StreamsConfig.consumerPrefix(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG), 1)
props.put(StreamsConfig.consumerPrefix(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG), true)
props.put(StreamsConfig.consumerPrefix(ConsumerConfig.MAX_POLL_RECORDS_CONFIG), 1)
props.put(StreamsConfig.consumerPrefix(ConsumerConfig.MAX_POLL_INTERVAL_MS_CONFIG), Int.MAX_VALUE)
props.put(StreamsConfig.consumerPrefix(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG), 3)

streams = KafkaStreams(builder, StreamsConfig(props))
streams.start()


Thanks,
Tony

On Thu, Nov 2, 2017 at 4:39 PM, Tony John  wrote:

> Hi All,
>
> I am facing CommitFailedException in my streams application. As per the
> log I tried changing the max.poll.interval.ms and max.poll.records. But
> both didn't help. PFA the full stack trace of the exception and below is
> the streams configuration used. What else could be wrong?
>
> val props = Properties()
> props.put(StreamsConfig.APPLICATION_ID_CONFIG, EngineConfig.APPLICATION_ID)
> props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, EngineConfig.KAFKA_SERVERS)
> props.put(StreamsConfig.STATE_DIR_CONFIG, EngineConfig.STATE_STORE_DIR)
> props.put(StreamsConfig.REPLICATION_FACTOR_CONFIG, 2)
> props.put(StreamsConfig.NUM_STREAM_THREADS_CONFIG, 1)
> props.put(StreamsConfig.METRICS_RECORDING_LEVEL_CONFIG, "INFO")
> props.put(StreamsConfig.consumerPrefix(ConsumerConfig.MAX_POLL_RECORDS_CONFIG),
>  1000)
> props.put(StreamsConfig.consumerPrefix(ConsumerConfig.MAX_POLL_INTERVAL_MS_CONFIG),
>  Int.MAX_VALUE)
>
> streams = KafkaStreams(builder, StreamsConfig(props))
> streams.start()
>
>
> Thanks,
> Tony
>


JOB | Permanent Sysadmin (Amsterdam, Netherlands OR remote)

2017-11-07 Thread James Tobin
Hello, I'm working with an employer that is looking to hire a
permanent sysadmin to help maintain their Hadoop and Kafka clusters at
their Amsterdam, Netherlands office OR remote.  Consequently I had
hoped that some members of this mailing list may like to discuss
further off-list using "JamesBTobin (at) Gmail (dot) Com".   Kind
regards, James


java.lang.OutOfMemoryError memory leak on 1.0.0 with 0.11.0.1 on disk and converting to 0.9 clients

2017-11-07 Thread Brett Rann
https://issues.apache.org/jira/browse/KAFKA-6185

This was also reported by another person, Ben, at the end of the ticket
https://issues.apache.org/jira/browse/KAFKA-6042, which is a broker
lockup/FD issue, but a new ticket was requested. Ismael Juma also suggested
there that it be discussed on the mailing list, so I am starting a thread here.

We are testing 1.0.0 in a couple of environments.
Both have about 5 brokers, with two 1.0.0 brokers and the rest 0.11.0.1 brokers.
One is using on-disk message format 0.9.0.1, the other 0.11.0.1, and we have
0.9, 0.10, and 0.11 clients connecting.

The cluster on the 0.9.0.1 format has been running fine for a week.

But the cluster on the 0.11.0.1 format is consistently having memory
issues. (Prior to this it was running the 0.11.0.1 binary and disk message
format for many weeks, down-converting for older clients with no issues.)
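
As far as I understand it, the on-disk format is the relevant knob here: when
it matches the oldest consumers, the broker does not have to down-convert on
fetch, which is where the allocation in the stack trace below happens. Purely
as an illustrative sketch, not something we have tested as a workaround:

# keeping the on-disk format at the oldest client version avoids fetch-time
# down-conversion for those clients (illustrative value only)
log.message.format.version=0.9.0.1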

The first occurrence of the error comes along with this stack trace

{"timestamp":"2017-11-06
14:22:32,402","level":"ERROR","logger":"kafka.server.KafkaApis","thread":"kafka-request-handler-7","message":"[KafkaApi-1]
Error when handling request
{replica_id=-1,max_wait_time=500,min_bytes=1,topics=[{topic=maxwell.users,partitions=[{partition=0,fetch_offset=227537,max_bytes=1100},{partition=4,fetch_offset=354468,max_bytes=1100},{partition=5,fetch_offset=266524,max_bytes=1100},{partition=8,fetch_offset=324562,max_bytes=1100},{partition=10,fetch_offset=292931,max_bytes=1100},{partition=12,fetch_offset=325718,max_bytes=1100},{partition=15,fetch_offset=229036,max_bytes=1100}]}]}"}
java.lang.OutOfMemoryError: Java heap space
at java.nio.HeapByteBuffer.<init>(HeapByteBuffer.java:57)
at java.nio.ByteBuffer.allocate(ByteBuffer.java:335)
at org.apache.kafka.common.record.AbstractRecords.downConvert(AbstractRecords.java:101)
at org.apache.kafka.common.record.FileRecords.downConvert(FileRecords.java:253)
at kafka.server.KafkaApis$$anonfun$kafka$server$KafkaApis$$convertedPartitionData$1$1$$anonfun$apply$4.apply(KafkaApis.scala:520)
at kafka.server.KafkaApis$$anonfun$kafka$server$KafkaApis$$convertedPartitionData$1$1$$anonfun$apply$4.apply(KafkaApis.scala:518)
at scala.Option.map(Option.scala:146)
at kafka.server.KafkaApis$$anonfun$kafka$server$KafkaApis$$convertedPartitionData$1$1.apply(KafkaApis.scala:518)
at kafka.server.KafkaApis$$anonfun$kafka$server$KafkaApis$$convertedPartitionData$1$1.apply(KafkaApis.scala:508)
at scala.Option.flatMap(Option.scala:171)
at kafka.server.KafkaApis.kafka$server$KafkaApis$$convertedPartitionData$1(KafkaApis.scala:508)
at kafka.server.KafkaApis$$anonfun$kafka$server$KafkaApis$$createResponse$2$1.apply(KafkaApis.scala:556)
at kafka.server.KafkaApis$$anonfun$kafka$server$KafkaApis$$createResponse$2$1.apply(KafkaApis.scala:555)
at scala.collection.Iterator$class.foreach(Iterator.scala:891)
at scala.collection.AbstractIterator.foreach(Iterator.scala:1334)
at scala.collection.IterableLike$class.foreach(IterableLike.scala:72)
at scala.collection.AbstractIterable.foreach(Iterable.scala:54)
at kafka.server.KafkaApis.kafka$server$KafkaApis$$createResponse$2(KafkaApis.scala:555)
at kafka.server.KafkaApis$$anonfun$kafka$server$KafkaApis$$fetchResponseCallback$1$1.apply(KafkaApis.scala:569)
at kafka.server.KafkaApis$$anonfun$kafka$server$KafkaApis$$fetchResponseCallback$1$1.apply(KafkaApis.scala:569)
at kafka.server.KafkaApis$$anonfun$sendResponseMaybeThrottle$1.apply$mcVI$sp(KafkaApis.scala:2034)
at kafka.server.ClientRequestQuotaManager.maybeRecordAndThrottle(ClientRequestQuotaManager.scala:52)
at kafka.server.KafkaApis.sendResponseMaybeThrottle(KafkaApis.scala:2033)
at kafka.server.KafkaApis.kafka$server$KafkaApis$$fetchResponseCallback$1(KafkaApis.scala:569)
at kafka.server.KafkaApis$$anonfun$kafka$server$KafkaApis$$processResponseCallback$1$1.apply$mcVI$sp(KafkaApis.scala:588)
at kafka.server.ClientQuotaManager.maybeRecordAndThrottle(ClientQuotaManager.scala:175)
at kafka.server.KafkaApis.kafka$server$KafkaApis$$processResponseCallback$1(KafkaApis.scala:587)
at kafka.server.KafkaApis$$anonfun$handleFetchRequest$3.apply(KafkaApis.scala:604)
at kafka.server.KafkaApis$$anonfun$handleFetchRequest$3.apply(KafkaApis.scala:604)
at kafka.server.ReplicaManager.fetchMessages(ReplicaManager.scala:820)
at kafka.server.KafkaApis.handleFetchRequest(KafkaApis.scala:596)
at kafka.server.KafkaApis.handle(KafkaApis.scala:100)

And then after a few of those it settles into this kind of pattern

{"timestamp":"2017-11-06
15:06:48,114","level":"ERROR","logger":"kafka.server.KafkaApis","thread":"kafka-request-handler-1","message":"[KafkaApi-1]
Error when handling request

Re: [ANNOUNCE] New committer: Onur Karaman

2017-11-07 Thread UMESH CHAUDHARY
Congratulations Onur!

On Tue, 7 Nov 2017 at 21:44 Jun Rao  wrote:

> Affan,
>
> All known problems in the controller are described in the doc linked from
> https://issues.apache.org/jira/browse/KAFKA-5027.
>
> Thanks,
>
> Jun
>
> On Mon, Nov 6, 2017 at 11:00 PM, Affan Syed  wrote:
>
> > Congrats Onur,
> >
> > Can you also share the document where all known problems are listed; I am
> > assuming these bugs are still valid for the current stable release.
> >
> > Affan
> >
> > - Affan
> >
> > On Mon, Nov 6, 2017 at 10:24 PM, Jun Rao  wrote:
> >
> > > Hi, everyone,
> > >
> > > The PMC of Apache Kafka is pleased to announce a new Kafka committer
> Onur
> > > Karaman.
> > >
> > > Onur's most significant work is the improvement of Kafka controller,
> > which
> > > is the brain of a Kafka cluster. Over time, we have accumulated quite a
> > few
> > > correctness and performance issues in the controller. There have been
> > > attempts to fix controller issues in isolation, which would make the
> code
> > > base more complicated without a clear path of solving all problems.
> Onur
> > is
> > > the one who took a holistic approach, by first documenting all known
> > > issues, writing down a new design, coming up with a plan to deliver the
> > > changes in phases and executing on it. At this point, Onur has
> completed
> > > the two most important phases: making the controller single threaded
> and
> > > changing the controller to use the async ZK api. The former fixed
> > multiple
> > > deadlocks and race conditions. The latter significantly improved the
> > > performance when there are many partitions. Experimental results show
> > that
> > > Onur's work reduced the controlled shutdown time by a factor of 100
> times
> > > and the controller failover time by a factor of 3 times.
> > >
> > > Congratulations, Onur!
> > >
> > > Thanks,
> > >
> > > Jun (on behalf of the Apache Kafka PMC)
> > >
> >
>


Re: GlobalKTable never finishes restoring

2017-11-07 Thread Matthias J. Sax
Did you delete and recreate the topic of the GlobalStore?

I did have a look into the code, too, and think there is a bug in
`GlobalStateManagerImpl#restoreState()`. I did some initial
investigation using an existing test, and the test passed without
transactional data but fails if the global store data is written using
transactions.

Note: if transactions are available, commit markers will take "one spot"
in the partitions. Currently, we break the loop using consumer record offset

> offset = record.offset() + 1;

but I think that, if there is a commit marker, the highWatermark is one
offset larger and thus this loop never terminates. We would need to
update the offset using the consumer position instead, which should step over
the commit marker correctly.

Will look into this in more detail tomorrow. It would still be valuable if
you could verify my suspicion.

Thanks!

-Matthias

On 11/7/17 7:01 PM, Alex Good wrote:
> Disabling transactions doesn't seem to have changed anything. I've had a
> read through the kafka streams source code, specifically the parts relating
> to the restoration of the global stores and I can't see anything obvious I
> should look at.
> 
> @Ted will do, here's a pastebin of the most recent run
> https://pastebin.com/rw2WbFyt
> 
> Thanks
> Alex
> 
> On Tue, Nov 7, 2017 at 5:12 PM Ted Yu  wrote:
> 
>> Alex:
>> In the future, please use pastebin if the log is not too large.
>>
>> When people find this thread in mailing list archive, the attachment
>> wouldn't be there.
>>
>> Thanks
>>
>> On Tue, Nov 7, 2017 at 8:32 AM, Matthias J. Sax 
>> wrote:
>>
>>> Alex,
>>>
>>> I am not sure, but maybe it's a bug. I noticed that you read transaction
>>> data. Can you try to write to the topic without using transactions
>>> and/or set the consumer into READ_UNCOMMITTED mode to verify? It only a
>>> guess that it might be related to transactions and it would be great to
>>> verify or rule it out.
>>>
>>> Thanks a lot!
>>>
>>> -Matthias
>>>
>>>
>>> On 11/7/17 3:15 PM, Alex Good wrote:
 Hi All,

 I have a simple kafka streams app that seems to hang when restoring
 state for a GlobalKTable. We're running in a test environment at the
 moment and the topic it is loading from only has two messages in it, I
 don't know if the very low volume of messages would affect the restore?

 I've attached a log, the topic in question is invoices-state. As you
>> can
 see the GlobalStreamThread appears to load the two messages in the
>> topic
 and then continues to send read requests to the topic despite having
 caught up. Any tips on debugging this would be very welcome.

 Thanks
 Alex
>>>
>>>
>>
> 





Re: Kafka Streams : CommitFailedException

2017-11-07 Thread Guozhang Wang
Hello Tony,

You mentioned that in 0.11.0.0 you had
props.put(StreamsConfig.NUM_STREAM_THREADS_CONFIG, 1) while in 0.11.0.1 you
had props.put(StreamsConfig.NUM_STREAM_THREADS_CONFIG, 2). But from your logs
it seems you set this config to 2 in both versions. Right?

Anyways, I took a look into your logs and I think you were hitting a known
issue in 0.11.0.0 (https://issues.apache.org/jira/browse/KAFKA-5152) that has
since been fixed; that is why in 0.11.0.1 you only see the WARN log entry and
the app is not dying out. The out-of-memory issue seems not related to the
CommitFailed error. Do you have any stateful operations in your app that use
an iterator? Did you close the iterator after you finished using it?
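
For example, if you iterate over a key-value store from a processor or a
punctuate call, the pattern should roughly be the sketch below (illustrative
Kotlin, not your actual code; drainStore and the process callback are
hypothetical names). KeyValueIterator is Closeable, so Kotlin's use { }
guarantees the close:

import org.apache.kafka.streams.state.ReadOnlyKeyValueStore

// Sketch: every iterator obtained from a state store must be closed, otherwise the
// underlying (e.g. RocksDB) resources accumulate. use { } closes it even on exceptions.
fun <K, V> drainStore(store: ReadOnlyKeyValueStore<K, V>, process: (K, V) -> Unit) {
    store.all().use { iter ->
        while (iter.hasNext()) {
            val entry = iter.next()
            process(entry.key, entry.value)
        }
    }
}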


Guozhang


On Tue, Nov 7, 2017 at 12:42 AM, Tony John  wrote:

> Hi Guozang,
>
> Thanks for looking into this. I was using 0.11.0.0 version of the library
> earlier when I was getting the CommitFailed exception and the tasks were
> terminating. The application config then was Replication Factor = 2, Num
> Stream Threads = 1, Consumer Max Poll Records = 1000 & Consumer Max Poll
> Interval = 2147483647. The streams config code (*Streams Config While
> Using
> 0.11.0.0*) is given below and the logs of the application while using
> 0.11.0.0 can be downloaded from
> https://www.dropbox.com/s/hx1e5mknf9gx5z0/commit_failed_error.log?dl=0
>
> I have upgraded the libraries to 0.11.0.1 and ran into some other issues.
> Though the CommitFailed error logs are still showing up with 0.11.0.1 the
> tasks are not getting terminated, but the app quickly runs out of memory
> (GC overhead limit exceeded) and the CPU is choked, which was not the case
> earlier. The logs are available @
> https://www.dropbox.com/s/x6oehtuoqrwjj0i/oom_gc_overhead.log?dl=0 and
> application config is also given below (*Streams Config While Using
> 0.11.0.1*).  Since I am not sure what configuration helps reduce the
> CommitFailed error, which I think could be one of the reasons for the CPU
> choke and eventually cause an OOM, I have gone ahead and used all possible
> configuration parameters, but still no luck.
>
> It would be great if you could shed some light on this as to what could be
> causing this problem.
>
> *Streams Config While Using 0.11.0.0 *
>
> val props = Properties()
> props.put(StreamsConfig.APPLICATION_ID_CONFIG, EngineConfig.APPLICATION_ID)
> props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, EngineConfig.KAFKA_SERVERS)
> props.put(StreamsConfig.STATE_DIR_CONFIG, EngineConfig.STATE_STORE_DIR)
> props.put(StreamsConfig.REPLICATION_FACTOR_CONFIG, 2)
> props.put(StreamsConfig.NUM_STREAM_THREADS_CONFIG, 1)
> props.put(StreamsConfig.METRICS_RECORDING_LEVEL_CONFIG, "INFO")
> props.put(StreamsConfig.consumerPrefix(ConsumerConfig.MAX_POLL_RECORDS_CONFIG), 1000)
> props.put(StreamsConfig.consumerPrefix(ConsumerConfig.MAX_POLL_INTERVAL_MS_CONFIG), Int.MAX_VALUE)
>
> streams = KafkaStreams(builder, StreamsConfig(props))
> streams.start()
>
>
> *Streams Config While Using 0.11.0.1*
>
> val props = Properties()
> props.put(StreamsConfig.APPLICATION_ID_CONFIG, EngineConfig.APPLICATION_ID)
> props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, EngineConfig.KAFKA_SERVERS)
> props.put(StreamsConfig.STATE_DIR_CONFIG, EngineConfig.STATE_STORE_DIR)
> props.put(StreamsConfig.REPLICATION_FACTOR_CONFIG, 2)
> props.put(StreamsConfig.NUM_STREAM_THREADS_CONFIG, 2)
>
> props.put(StreamsConfig.consumerPrefix(ConsumerConfig.HEARTBEAT_INTERVAL_MS_CONFIG), 1)
> props.put(StreamsConfig.consumerPrefix(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG), 1)
> props.put(StreamsConfig.consumerPrefix(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG), true)
> props.put(StreamsConfig.consumerPrefix(ConsumerConfig.MAX_POLL_RECORDS_CONFIG), 1)
> props.put(StreamsConfig.consumerPrefix(ConsumerConfig.MAX_POLL_INTERVAL_MS_CONFIG), Int.MAX_VALUE)
> props.put(StreamsConfig.consumerPrefix(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG), 3)
>
> streams = KafkaStreams(builder, StreamsConfig(props))
> streams.start()
>
>
> Thanks,
> Tony
>
> On Thu, Nov 2, 2017 at 4:39 PM, Tony John 
> wrote:
>
> > Hi All,
> >
> > I am facing CommitFailedException in my streams application. As per the
> > log I tried changing the max.poll.interval.ms and max.poll.records. But
> > both didn't help. PFA the full stack trace of the exception and below is
> > the streams configuration used. What else could be wrong?
> >
> > val props = Properties()
> > props.put(StreamsConfig.APPLICATION_ID_CONFIG,
> EngineConfig.APPLICATION_ID)
> > props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG,
> EngineConfig.KAFKA_SERVERS)
> > props.put(StreamsConfig.STATE_DIR_CONFIG, EngineConfig.STATE_STORE_DIR)
> > props.put(StreamsConfig.REPLICATION_FACTOR_CONFIG, 2)
> > props.put(StreamsConfig.NUM_STREAM_THREADS_CONFIG, 1)
> > props.put(StreamsConfig.METRICS_RECORDING_LEVEL_CONFIG, "INFO")
> > 

Using Kafka to access APIs.

2017-11-07 Thread Taha Arif
Hello,


I want to build a project that accesses the GDAX websocket as a real-time
stream and pushes that data into Kafka to reformat it and prep it for an ML
model.

*Can I use Kafka to access GDAX's API and pull data from it in realtime?*

or

*Should I pull data from their API to a text file in real time and use
kafka to read that text file as it changes?*


 Also!

*Please clarify for my sanity, Kafka is not a language, it is a set of APIs
that can be used from any language?*
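
To make what I'm picturing concrete: a small client process reads the
websocket and produces each message to a topic, roughly like the sketch below
(Kotlin; nextGdaxMessage() is a hypothetical stand-in for whatever GDAX
websocket client library gets used, and the broker address and topic name are
made up):

import java.util.Properties
import org.apache.kafka.clients.producer.KafkaProducer
import org.apache.kafka.clients.producer.ProducerRecord

// Hypothetical: blocks until the GDAX websocket delivers the next JSON message.
fun nextGdaxMessage(): String = TODO("wire up a websocket client here")

fun main() {
    val props = Properties()
    props.put("bootstrap.servers", "localhost:9092")   // assumption: a reachable broker
    props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer")
    props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer")

    KafkaProducer<String, String>(props).use { producer ->
        while (true) {
            val json = nextGdaxMessage()
            producer.send(ProducerRecord("gdax-ticker", json))   // topic name is illustrative
        }
    }
}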

Thanks
*\\Taha*


GlobalKTable never finishes restoring

2017-11-07 Thread Alex Good
Hi All,

I have a simple Kafka Streams app that seems to hang when restoring state
for a GlobalKTable. We're running in a test environment at the moment and
the topic it is loading from only has two messages in it; I don't know if
the very low volume of messages would affect the restore.

I've attached a log, the topic in question is invoices-state. As you can
see the GlobalStreamThread appears to load the two messages in the topic
and then continues to send read requests to the topic despite having caught
up. Any tips on debugging this would be very welcome.

Thanks
Alex
INFO2017-11-07 14:12:22,531 0   
org.apache.kafka.clients.producer.ProducerConfig[main]  ProducerConfig 
values:
acks = all
batch.size = 16384
bootstrap.servers = [kafka-kafka.kafka.svc.cluster.local:9092]
buffer.memory = 33554432
client.id =
compression.type = none
connections.max.idle.ms = 54
enable.idempotence = false
interceptor.classes = null
key.serializer = class 
org.apache.kafka.common.serialization.ByteArraySerializer
linger.ms = 0
max.block.ms = 6
max.in.flight.requests.per.connection = 5
max.request.size = 1048576
metadata.max.age.ms = 30
metric.reporters = []
metrics.num.samples = 2
metrics.recording.level = INFO
metrics.sample.window.ms = 3
partitioner.class = class 
org.apache.kafka.clients.producer.internals.DefaultPartitioner
receive.buffer.bytes = 32768
reconnect.backoff.max.ms = 1000
reconnect.backoff.ms = 50
request.timeout.ms = 3
retries = 1
retry.backoff.ms = 100
sasl.jaas.config = null
sasl.kerberos.kinit.cmd = /usr/bin/kinit
sasl.kerberos.min.time.before.relogin = 6
sasl.kerberos.service.name = null
sasl.kerberos.ticket.renew.jitter = 0.05
sasl.kerberos.ticket.renew.window.factor = 0.8
sasl.mechanism = GSSAPI
security.protocol = PLAINTEXT
send.buffer.bytes = 131072
ssl.cipher.suites = null
ssl.enabled.protocols = [TLSv1.2, TLSv1.1, TLSv1]
ssl.endpoint.identification.algorithm = null
ssl.key.password = null
ssl.keymanager.algorithm = SunX509
ssl.keystore.location = null
ssl.keystore.password = null
ssl.keystore.type = JKS
ssl.protocol = TLS
ssl.provider = null
ssl.secure.random.implementation = null
ssl.trustmanager.algorithm = PKIX
ssl.truststore.location = null
ssl.truststore.password = null
ssl.truststore.type = JKS
transaction.timeout.ms = 5000
transactional.id = invoices-transactions
value.serializer = class 
io.confluent.kafka.serializers.KafkaAvroSerializer

INFO2017-11-07 14:12:23,124 593 
io.confluent.kafka.serializers.KafkaAvroSerializerConfig[main]  
KafkaAvroSerializerConfig values:
schema.registry.url = 
[http://sad-squid-schema-registry.schema-registry.svc.cluster.local:8000]
max.schemas.per.subject = 1000

INFO2017-11-07 14:12:24,126 1595
org.apache.kafka.clients.producer.KafkaProducer [main]  Instantiated a 
transactional producer.
INFO2017-11-07 14:12:24,127 1596
org.apache.kafka.clients.producer.KafkaProducer [main]  Overriding the default 
max.in.flight.requests.per.connection to 1 since idempontence is enabled.
DEBUG   2017-11-07 14:12:24,134 1603org.apache.kafka.common.metrics.Metrics 
[main]  Added sensor with name bufferpool-wait-time
DEBUG   2017-11-07 14:12:24,137 1606org.apache.kafka.common.metrics.Metrics 
[main]  Added sensor with name buffer-exhausted-records
DEBUG   2017-11-07 14:12:24,235 1704org.apache.kafka.clients.Metadata   
[main]  Updated cluster metadata version 1 to Cluster(id = null, nodes = 
[kafka-kafka.kafka.svc.cluster.local:9092 (id: -1 rack: null)], partitions = [])
DEBUG   2017-11-07 14:12:24,243 1712org.apache.kafka.common.metrics.Metrics 
[main]  Added sensor with name produce-throttle-time
DEBUG   2017-11-07 14:12:24,260 1729org.apache.kafka.common.metrics.Metrics 
[main]  Added sensor with name connections-closed:
DEBUG   2017-11-07 14:12:24,260 1729org.apache.kafka.common.metrics.Metrics 
[main]  Added sensor with name connections-created:
DEBUG   2017-11-07 14:12:24,260 1729org.apache.kafka.common.metrics.Metrics 
[main]  Added sensor with name bytes-sent-received:
DEBUG   2017-11-07 14:12:24,260 1729org.apache.kafka.common.metrics.Metrics 
[main]  Added sensor with name bytes-sent:
DEBUG   2017-11-07 14:12:24,261 1730org.apache.kafka.common.metrics.Metrics 
[main]  Added sensor with name bytes-received:
DEBUG   2017-11-07 14:12:24,261 1730org.apache.kafka.common.metrics.Metrics 
[main]  Added sensor with name select-time:
DEBUG   2017-11-07 14:12:24,318 1787org.apache.kafka.common.metrics.Metrics 
[main]  Added 

Re: GlobalKTable never finishes restoring

2017-11-07 Thread Ted Yu
Alex:
In the future, please use pastebin if the log is not too large.

When people find this thread in mailing list archive, the attachment
wouldn't be there.

Thanks

On Tue, Nov 7, 2017 at 8:32 AM, Matthias J. Sax 
wrote:

> Alex,
>
> I am not sure, but maybe it's a bug. I noticed that you read transaction
> data. Can you try to write to the topic without using transactions
> and/or set the consumer into READ_UNCOMMITTED mode to verify? It only a
> guess that it might be related to transactions and it would be great to
> verify or rule it out.
>
> Thanks a lot!
>
> -Matthias
>
>
> On 11/7/17 3:15 PM, Alex Good wrote:
> > Hi All,
> >
> > I have a simple kafka streams app that seems to hang when restoring
> > state for a GlobalKTable. We're running in a test environment at the
> > moment and the topic it is loading from only has two messages in it, I
> > don't know if the very low volume of messages would affect the restore?
> >
> > I've attached a log, the topic in question is invoices-state. As you can
> > see the GlobalStreamThread appears to load the two messages in the topic
> > and then continues to send read requests to the topic despite having
> > caught up. Any tips on debugging this would be very welcome.
> >
> > Thanks
> > Alex
>
>


Re: clean leader election on kafka 0.10.2.1

2017-11-07 Thread Guozhang Wang
Henry,

I am not sure what you mean by "waits for the leader of the partition to
start up"? Leader election should not affect the leader-follower startup
process.


Guozhang


On Thu, Nov 2, 2017 at 4:56 PM, Henry Cai 
wrote:

> We were on kafka 0.10.2.1.  We tried to switch from unclean leader election
> to clean leader election and found it became very difficult to start up the
> whole cluster.
>
> It seems the hosts went into a deadlock situation during startup
> - broker A was a follower on partition 1 and waits for the leader of
> partition 1 (which is broker B) to start up
> - broker B was a follower on partition 2 and waits for the leader of
> partition 2 (which is broker A) to start up
>
> We found there are quite a few deadlock related bugs fixed in 0.11 or
> later, do we have to upgrade our kafka version to use clean leader
> election?
>



-- 
-- Guozhang


Re: GlobalKTable never finishes restoring

2017-11-07 Thread Alex Good
Previously deleting and recreating the topic has solved the problem.

Based on what you've said about the offset correction I did a quick test by
building kafka streams myself with the following code in
`GlobalStateManagerImpl#restoreState()`

while (offset < highWatermark) {
    final ConsumerRecords<byte[], byte[]> records = consumer.poll(100);
    final List<KeyValue<byte[], byte[]>> restoreRecords = new ArrayList<>();
    for (ConsumerRecord<byte[], byte[]> record : records) {
        offset = record.offset() + 1;
        if (record.key() != null) {
            restoreRecords.add(KeyValue.pair(record.key(), record.value()));
        }
    }
    stateRestoreAdapter.restoreAll(restoreRecords);
    stateRestoreListener.onBatchRestored(topicPartition, storeName, offset, restoreRecords.size());
    restoreCount += restoreRecords.size();
    offset = consumer.position(topicPartition);
}

Note the recalculation of the offset using the consumer position at the end of
the loop. That fixed the issue, so it may serve as further verification of
your hypothesis.

In the meantime I suppose the workaround is to not produce transactional
messages to topics backing a GlobalKTable?

Thanks
Alex


On Tue, Nov 7, 2017 at 8:35 PM Matthias J. Sax 
wrote:

> Did you delete and recreate the topic of the GlobalStore?
>
> I did have a look into the code, too, and think there is a bug in
> `GlobalStateManagerImpl#restoreState()`. I did some initial
> investigation using an existing test, and the test passed without
> transactional data but fails if the global store data is written using
> transactions.
>
> Note: if transactions are available, commit markers will take "one spot"
> in the partitions. Currently, we break the loop using consumer record
> offset
>
> > offset = record.offset() + 1;
>
> but I think, if there is a commit marker, the highWatermark is one
> offset larger and thus this loop never terminates. We would need to
> update the offset using consumer position instead that should step over
> the commit marker correctly.
>
> Will look into this in more detail tomorrow. Would still be valuable, if
> you could verify my suspicion.
>
> Thanks!
>
> -Matthias
>
> On 11/7/17 7:01 PM, Alex Good wrote:
> > Disabling transactions doesn't seem to have changed anything. I've had a
> > read through the kafka streams source code, specifically the parts
> relating
> > to the restoration of the global stores and I can't see anything obvious
> I
> > should look at.
> >
> > @Ted will do, here's a pastebin of the most recent run
> > https://pastebin.com/rw2WbFyt
> >
> > Thanks
> > Alex
> >
> > On Tue, Nov 7, 2017 at 5:12 PM Ted Yu  wrote:
> >
> >> Alex:
> >> In the future, please use pastebin if the log is not too large.
> >>
> >> When people find this thread in mailing list archive, the attachment
> >> wouldn't be there.
> >>
> >> Thanks
> >>
> >> On Tue, Nov 7, 2017 at 8:32 AM, Matthias J. Sax 
> >> wrote:
> >>
> >>> Alex,
> >>>
> >>> I am not sure, but maybe it's a bug. I noticed that you read
> transaction
> >>> data. Can you try to write to the topic without using transactions
> >>> and/or set the consumer into READ_UNCOMMITTED mode to verify? It only a
> >>> guess that it might be related to transactions and it would be great to
> >>> verify or rule it out.
> >>>
> >>> Thanks a lot!
> >>>
> >>> -Matthias
> >>>
> >>>
> >>> On 11/7/17 3:15 PM, Alex Good wrote:
>  Hi All,
> 
>  I have a simple kafka streams app that seems to hang when restoring
>  state for a GlobalKTable. We're running in a test environment at the
>  moment and the topic it is loading from only has two messages in it, I
>  don't know if the very low volume of messages would affect the
> restore?
> 
>  I've attached a log, the topic in question is invoices-state. As you
> >> can
>  see the GlobalStreamThread appears to load the two messages in the
> >> topic
>  and then continues to send read requests to the topic despite having
>  caught up. Any tips on debugging this would be very welcome.
> 
>  Thanks
>  Alex
> >>>
> >>>
> >>
> >
>
>


Kafka connect python api

2017-11-07 Thread swaapnika guntaka
Hi

Is there a python API for the source and sink connect implementation?

My source is - Filesystem
Sink - HDFS

Thanks
-- 
Swaapnika


Kafka Start Up and Shutdown

2017-11-07 Thread chidigam .
Hi All,
I am trying to understand the different configurations to apply to my cluster.
I find num.recovery.threads.per.data.dir=1 a bit interesting.
In most configuration examples I find on Google, the value is set to 1.
Everyone wants quick startup and recovery; to achieve that, can I increase
this setting to 8 or above?
Any internal details would also help.
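
For example, something like the snippet below is what I have in mind (the
value 8 and the paths are just illustrative):

# num.recovery.threads.per.data.dir threads are used per data directory for log
# recovery at startup and log flushing at shutdown; raising it lets that work run
# in parallel on brokers with many partitions per directory, at the cost of more
# concurrent I/O.
num.recovery.threads.per.data.dir=8
log.dirs=/data/kafka-logs-1,/data/kafka-logs-2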

Thanks in advance.
Bhanu


Client addressable listener

2017-11-07 Thread Thomas Stringer
I can't seem to figure out a listeners and advertised.listeners configuration
in server.properties that lets me connect remotely with my producers and
consumers.

If I set it like this...

listeners=CLIENT://:9090,PLAINTEXT://:9092
advertised.listeners=CLIENT://:9090,PLAINTEXT://:9092

From my external client I get a NoBrokersAvailable error. If I try this...

listeners=CLIENT://0.0.0.0:9090,PLAINTEXT://:9092
advertised.listeners=CLIENT://0.0.0.0:9090,PLAINTEXT://:9092

I get an error that it can't listen on the meta-address 0.0.0.0.

This is currently being hosted with a public interface, but if I try to set
this:

listeners=CLIENT://my.public.dns.name:9090,PLAINTEXT://:9092
advertised.listeners=CLIENT://my.public.dns.name:9090,PLAINTEXT://:9092

Then I get an error that it can't bind to the requested address. This is
sitting behind some networking infrastructure, as is obvious since ip addr
only shows my private IP address.

How would I get around this to set up a listener so that an external/public
producer/consumer could connect to this broker?
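
In case it helps clarify what I am after: my understanding is that listeners
is what the broker binds to (so 0.0.0.0 is fine there), while
advertised.listeners is what gets handed back to clients and must be a name or
address they can actually reach (so 0.0.0.0 is rejected there). Something like
the sketch below is roughly what I am aiming for, assuming my.public.dns.name
resolves to the public side of the NAT and port 9090 is forwarded to the
broker (the named CLIENT listener also needs a listener.security.protocol.map
entry):

listener.security.protocol.map=CLIENT:PLAINTEXT,PLAINTEXT:PLAINTEXT
listeners=CLIENT://0.0.0.0:9090,PLAINTEXT://:9092
advertised.listeners=CLIENT://my.public.dns.name:9090,PLAINTEXT://:9092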

Thank you in advance!


Re: [ANNOUNCE] New committer: Onur Karaman

2017-11-07 Thread Jun Rao
Affan,

All known problems in the controller are described in the doc linked from
https://issues.apache.org/jira/browse/KAFKA-5027.

Thanks,

Jun

On Mon, Nov 6, 2017 at 11:00 PM, Affan Syed  wrote:

> Congrats Onur,
>
> Can you also share the document where all known problems are listed; I am
> assuming these bugs are still valid for the current stable release.
>
> Affan
>
> - Affan
>
> On Mon, Nov 6, 2017 at 10:24 PM, Jun Rao  wrote:
>
> > Hi, everyone,
> >
> > The PMC of Apache Kafka is pleased to announce a new Kafka committer Onur
> > Karaman.
> >
> > Onur's most significant work is the improvement of Kafka controller,
> which
> > is the brain of a Kafka cluster. Over time, we have accumulated quite a
> few
> > correctness and performance issues in the controller. There have been
> > attempts to fix controller issues in isolation, which would make the code
> > base more complicated without a clear path of solving all problems. Onur
> is
> > the one who took a holistic approach, by first documenting all known
> > issues, writing down a new design, coming up with a plan to deliver the
> > changes in phases and executing on it. At this point, Onur has completed
> > the two most important phases: making the controller single threaded and
> > changing the controller to use the async ZK api. The former fixed
> multiple
> > deadlocks and race conditions. The latter significantly improved the
> > performance when there are many partitions. Experimental results show
> that
> > Onur's work reduced the controlled shutdown time by a factor of 100 times
> > and the controller failover time by a factor of 3 times.
> >
> > Congratulations, Onur!
> >
> > Thanks,
> >
> > Jun (on behalf of the Apache Kafka PMC)
> >
>


Re: GlobalKTable never finishes restoring

2017-11-07 Thread Alex Good
Disabling transactions doesn't seem to have changed anything. I've had a
read through the kafka streams source code, specifically the parts relating
to the restoration of the global stores and I can't see anything obvious I
should look at.

@Ted will do, here's a pastebin of the most recent run
https://pastebin.com/rw2WbFyt

Thanks
Alex

On Tue, Nov 7, 2017 at 5:12 PM Ted Yu  wrote:

> Alex:
> In the future, please use pastebin if the log is not too large.
>
> When people find this thread in mailing list archive, the attachment
> wouldn't be there.
>
> Thanks
>
> On Tue, Nov 7, 2017 at 8:32 AM, Matthias J. Sax 
> wrote:
>
> > Alex,
> >
> > I am not sure, but maybe it's a bug. I noticed that you read transaction
> > data. Can you try to write to the topic without using transactions
> > and/or set the consumer into READ_UNCOMMITTED mode to verify? It only a
> > guess that it might be related to transactions and it would be great to
> > verify or rule it out.
> >
> > Thanks a lot!
> >
> > -Matthias
> >
> >
> > On 11/7/17 3:15 PM, Alex Good wrote:
> > > Hi All,
> > >
> > > I have a simple kafka streams app that seems to hang when restoring
> > > state for a GlobalKTable. We're running in a test environment at the
> > > moment and the topic it is loading from only has two messages in it, I
> > > don't know if the very low volume of messages would affect the restore?
> > >
> > > I've attached a log, the topic in question is invoices-state. As you
> can
> > > see the GlobalStreamThread appears to load the two messages in the
> topic
> > > and then continues to send read requests to the topic despite having
> > > caught up. Any tips on debugging this would be very welcome.
> > >
> > > Thanks
> > > Alex
> >
> >
>


Re: GlobalKTable never finishes restoring

2017-11-07 Thread Matthias J. Sax
Alex,

I am not sure, but maybe it's a bug. I noticed that you read transactional
data. Can you try to write to the topic without using transactions
and/or set the consumer into READ_UNCOMMITTED mode to verify? It is only a
guess that it might be related to transactions, and it would be great to
verify or rule it out.

Thanks a lot!

-Matthias


On 11/7/17 3:15 PM, Alex Good wrote:
> Hi All,
> 
> I have a simple kafka streams app that seems to hang when restoring
> state for a GlobalKTable. We're running in a test environment at the
> moment and the topic it is loading from only has two messages in it, I
> don't know if the very low volume of messages would affect the restore?
> 
> I've attached a log, the topic in question is invoices-state. As you can
> see the GlobalStreamThread appears to load the two messages in the topic
> and then continues to send read requests to the topic despite having
> caught up. Any tips on debugging this would be very welcome.
> 
> Thanks
> Alex





Re: Offset commit for partitions not owned by consumer

2017-11-07 Thread Ted Yu
Can you reveal code snippet for BufferedConsumerClientAdapterImpl ?

I took a look at the logs. There was no log around 19:46:5x in either
server or controller log.

Thanks
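
As a side note: since the re-processing described below comes from not
committing before the rebalance, a common pattern is to commit whatever has
been processed inside onPartitionsRevoked(), so a later commit never includes
partitions the consumer no longer owns. A rough Kotlin sketch with
hypothetical names (not your BufferedConsumerClientAdapterImpl):

import org.apache.kafka.clients.consumer.ConsumerRebalanceListener
import org.apache.kafka.clients.consumer.KafkaConsumer
import org.apache.kafka.clients.consumer.OffsetAndMetadata
import org.apache.kafka.common.TopicPartition

// Sketch: flush tracked offsets for partitions that are about to be revoked,
// while this consumer still owns them.
class CommitOnRevoke(
    private val consumer: KafkaConsumer<String, String>,
    private val pending: MutableMap<TopicPartition, OffsetAndMetadata>
) : ConsumerRebalanceListener {

    override fun onPartitionsRevoked(partitions: Collection<TopicPartition>) {
        val toCommit = pending.filterKeys { it in partitions }
        if (toCommit.isNotEmpty()) {
            consumer.commitSync(toCommit)
        }
        partitions.forEach { pending.remove(it) }
    }

    override fun onPartitionsAssigned(partitions: Collection<TopicPartition>) {
        // nothing to do; newly assigned partitions resume from their committed offsets
    }
}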

On Tue, Nov 7, 2017 at 8:35 AM, Mana M  wrote:

> Ted, did you get chance to look at the issue? I am also planning to update
> to latest version to find out if we still see the same issue.
>
> On Fri, Nov 3, 2017 at 8:40 PM, Mana M  wrote:
>
> > Below are the logs:
> >
> > Consumer 1 logs, where issue can be seen: https://pastebin.com/PuQhud91
> > Consumer 2 logs: https://pastebin.com/yfJDSGPA
> >
> > server.log: https://pastebin.com/QKpk0zLn
> > controller.log: https://pastebin.com/9T0niwEw
> > state-change.log: https://pastebin.com/nrftHPC9
> >
> >
> > On Fri, Nov 3, 2017 at 1:53 PM, Ted Yu  wrote:
> >
> >> Can you pastebin relevant logs from client and broker ?
> >>
> >> Thanks
> >>
> >> On Fri, Nov 3, 2017 at 1:37 PM, Manan G  wrote:
> >>
> >> > Hello,
> >> >
> >> > I am using 0.11.0.0 version of Kakfa broker and Java client library.
> My
> >> > consumer code tracks offsets for each assigned partition and at some
> >> time
> >> > interval manually commits offsets by specifying partition->offset map.
> >> >
> >> > What I noticed is, after the rebalance, even if consumer loses some
> >> > partitions that were assigned to it previously, offset commit for
> those
> >> > lost partitions still succeeds by that same consumer! Shouldn't offset
> >> > commit fail in this scenario since consumer is trying to commit
> offsets
> >> for
> >> > partitions that are not assigned to it?
> >> >
> >> > For clarity below are the logs I see with comments:
> >> >
> >> > // This is when consumer starts for "test" topic and it picks up 3
> >> > partitions
> >> > log>> onPartitionsAssigned: partitions=[test-1, test-2, test-0]
> >> >
> >> > // Now consumer processes 3 records from partition 0 and 7 records
> from
> >> > partition 2 - confirmed with log statements
> >> > log>> ...
> >> >
> >> > // Rebalance happens - right now, my code does not commit any pending
> >> > offsets here and just prints the log statement
> >> > log>> onPartitionRevoked: partitions=[test-1, test-2, test-0]
> >> >
> >> > // After re-balance, consumer loses partition 0 and 1. Again, my code
> >> does
> >> > not do anything on this callback and just prints the log statement
> >> > onPartitionsAssigned: partitions=[test-2]
> >> >
> >> > // Since the code did not commit offsets during revoke call, after
> >> > rebalance, poll() returns all records for assigned partitions since
> last
> >> > offset commit.
> >> > // ... So we re-process 7 records from partition 2. This was confirmed
> >> with
> >> > log statements.
> >> > log>> ...
> >> >
> >> > // Offset commit gets triggered after some time and due to the bug in
> >> the
> >> > code, it tries to commit offsets for both partition 0 and 2.
> >> > // There is no failure however! I can see on Kafka broker side that
> >> offset
> >> > for partition 0 is updated to 3.
> >> > // I made sure that another consumer that is actually assigned
> >> partition 0
> >> > after re-balance has not committed offset yet.
> >> > commitOffsets: {0=3, 2=7}
> >> >
> >> >
> >> > Thanks,
> >> > M
> >> >
> >>
> >
> >
>


Re: Offset commit for partitions not owned by consumer

2017-11-07 Thread Mana M
Ted, did you get chance to look at the issue? I am also planning to update
to latest version to find out if we still see the same issue.

On Fri, Nov 3, 2017 at 8:40 PM, Mana M  wrote:

> Below are the logs:
>
> Consumer 1 logs, where issue can be seen: https://pastebin.com/PuQhud91
> Consumer 2 logs: https://pastebin.com/yfJDSGPA
>
> server.log: https://pastebin.com/QKpk0zLn
> controller.log: https://pastebin.com/9T0niwEw
> state-change.log: https://pastebin.com/nrftHPC9
>
>
> On Fri, Nov 3, 2017 at 1:53 PM, Ted Yu  wrote:
>
>> Can you pastebin relevant logs from client and broker ?
>>
>> Thanks
>>
>> On Fri, Nov 3, 2017 at 1:37 PM, Manan G  wrote:
>>
>> > Hello,
>> >
>> > I am using 0.11.0.0 version of Kakfa broker and Java client library. My
>> > consumer code tracks offsets for each assigned partition and at some
>> time
>> > interval manually commits offsets by specifying partition->offset map.
>> >
>> > What I noticed is, after the rebalance, even if consumer loses some
>> > partitions that were assigned to it previously, offset commit for those
>> > lost partitions still succeeds by that same consumer! Shouldn't offset
>> > commit fail in this scenario since consumer is trying to commit offsets
>> for
>> > partitions that are not assigned to it?
>> >
>> > For clarity below are the logs I see with comments:
>> >
>> > // This is when consumer starts for "test" topic and it picks up 3
>> > partitions
>> > log>> onPartitionsAssigned: partitions=[test-1, test-2, test-0]
>> >
>> > // Now consumer processes 3 records from partition 0 and 7 records from
>> > partition 2 - confirmed with log statements
>> > log>> ...
>> >
>> > // Rebalance happens - right now, my code does not commit any pending
>> > offsets here and just prints the log statement
>> > log>> onPartitionRevoked: partitions=[test-1, test-2, test-0]
>> >
>> > // After re-balance, consumer loses partition 0 and 1. Again, my code
>> does
>> > not do anything on this callback and just prints the log statement
>> > onPartitionsAssigned: partitions=[test-2]
>> >
>> > // Since the code did not commit offsets during revoke call, after
>> > rebalance, poll() returns all records for assigned partitions since last
>> > offset commit.
>> > // ... So we re-process 7 records from partition 2. This was confirmed
>> with
>> > log statements.
>> > log>> ...
>> >
>> > // Offset commit gets triggered after some time and due to the bug in
>> the
>> > code, it tries to commit offsets for both partition 0 and 2.
>> > // There is no failure however! I can see on Kafka broker side that
>> offset
>> > for partition 0 is updated to 3.
>> > // I made sure that another consumer that is actually assigned
>> partition 0
>> > after re-balance has not committed offset yet.
>> > commitOffsets: {0=3, 2=7}
>> >
>> >
>> > Thanks,
>> > M
>> >
>>
>
>


RE: [EXTERNAL]Cannot list topics

2017-11-07 Thread Preston, Dale
It's almost certainly a typo in your command lines.  Not sure how to help 
without you posting them as requested.

Also, post the console output from when you created the topic.
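
For reference, the quickstart steps you are following boil down to something
like this (adjust the host/port to your setup; this assumes the
ZooKeeper-based tooling that ships with current Kafka releases):

bin/kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic test
bin/kafka-topics.sh --list --zookeeper localhost:2181

If the topic was created against a different ZooKeeper instance, or with a
typo in the connect string, the --list command will come back empty.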

Dale

-Original Message-
From: Donghun Kim [mailto:kimdho...@gmail.com] 
Sent: Monday, November 6, 2017 7:03 PM
To: users@kafka.apache.org
Subject: [EXTERNAL]Cannot list topics

I’m just following the Quickstart document from http://kafka.apache.org/.
As instructed, I tried to list the topic that I made, but nothing shows up.
I might be doing something wrong, but otherwise it seems to need a fix.

Thanx :)


RE: Public IP

2017-11-07 Thread Otto Grunewald
Hello,

Got a bit further.

I registered a DNS name for my router's IP address, and now I can connect to
the Kafka server remotely from my house through the ADSL.

So the "advertised.host.name" does not refer to an IP address but to an actual
domain host name (am I correct in saying this)?

Is there no way of connecting to a server that sits behind a router that only
has an IP address and does NOT have a domain name that the IP can resolve to?
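
That said, at the office the public IP did work in "advertised.host.name", so
the setting itself seems to accept a plain IP; the part I am unsure about is
the home router. Roughly what I have in mind (203.0.113.10 is only a
placeholder for the public IP, and this assumes the router actually forwards
port 9092 to the Kafka machine and allows connections from outside):

advertised.host.name=203.0.113.10
advertised.port=9092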

Thanks,

Otto Grunewald


From: Otto Grunewald
Sent: 2017-11-07 17:51
To: 'users@kafka.apache.org' 
Subject: Public IP

Hello,

Hope you can assist.

I have a kafka server configured on our office network.

All works fine.  The NodeJS (no-kafka) producers and consumers connect to the 
kafka topics and do their thing.

When I tried connecting to the kafka from an external IP address, i.e. home 
ADSL network, it didn't see the kafka server.

This I then solved by changing the "server.properties" file and setting the 
"advertised.host.name" of the kafka server to indicate the public IP of our 
office network.

All messages were produced and consumed.

I then took the kafka server home to my ADSL network.

My thought was just to change the "server.properties" file and set the 
"advertised.host.name" to the public IP of my home network (which I get by 
doing a "myip" in google).

The result was that the no-kafka node module indicates the following msg "WARN 
no-kafka-client No broker metadata received, retrying metadata request in 
1000ms".

How does one configure kafka server for the above scenario?

The server OS is Centos 7, utilizing Zookeeper 3.4.10 and Kafka 2.12-0.10.1.1.

Hope you can assist.

Thanks in advance.

Best regards,

Otto Grunewald