How to keep consumers alive without polling new messages

2016-09-27 Thread Yifan Ying
Hi all,

0.10 consumers use the poll() method to heartbeat the Kafka brokers. Is there any
way that I can make the consumer heartbeat but not poll any messages? The
javadoc says the recommended way is to move message processing to another
thread. But when message processing keeps failing (because a third-party
service goes down for a while), the thread that actually processes messages
could have too many messages accumulated. Maybe re-sending failed messages
to another queue (IMQ) and re-processing them later is a good option?
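
Something like pausing the assigned partitions while the third-party service is
down is what I have in mind -- a rough, untested sketch against the 0.10 Java
consumer (the health check and the processing hand-off are placeholders):

import java.util.Collections;
import java.util.Properties;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;

public class PausingConsumer {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put("bootstrap.servers", "localhost:9092");   // placeholder
        props.put("group.id", "my-group");                   // placeholder
        props.put("key.deserializer",
                "org.apache.kafka.common.serialization.StringDeserializer");
        props.put("value.deserializer",
                "org.apache.kafka.common.serialization.StringDeserializer");

        try (KafkaConsumer<String, String> consumer = new KafkaConsumer<String, String>(props)) {
            consumer.subscribe(Collections.singletonList("my-topic"));   // placeholder topic
            while (true) {
                ConsumerRecords<String, String> records = consumer.poll(1000);
                // hand `records` off for processing (ideally on another thread)
                if (!thirdPartyIsHealthy()) {                 // placeholder health check
                    consumer.pause(consumer.assignment());    // stop fetching new records
                    while (!thirdPartyIsHealthy()) {
                        consumer.poll(1000);                  // returns nothing, but keeps heartbeating
                    }
                    consumer.resume(consumer.assignment());   // start fetching again
                }
            }
        }
    }

    private static boolean thirdPartyIsHealthy() { return true; }   // placeholder
}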

Thanks!
-- 
Yifan


Fwd: Kafka Defunct Sockets

2016-09-27 Thread Magesh Kumar
Hi,

This is Magesh, working as an Engineer at Visa Inc. I'm relatively new to the
Kafka ecosystem. We are using Kafka 0.9, and during testing in our test
environments we have noticed that the producer does retries with
NETWORK_EXCEPTION.

To debug the issue, I enabled TRACE logging and noticed that the nodes were
added to the disconnected list and hence were being retried.

From the producer code, I noticed that the following is the only
scenario where a node is marked disconnected:

    /* cancel any defunct sockets */
    if (!key.isValid()) {
        close(channel);
        this.disconnected.add(channel.id());
    }
} catch (Exception e) {
    String desc = channel.socketDescription();
    if (e instanceof IOException)
        log.debug("Connection with {} disconnected", desc, e);
    else
        log.warn("Unexpected error from {}; closing connection", desc, e);
    close(channel);
    this.disconnected.add(channel.id());
}

Upon careful analysis, I didn't find any logs related to the exception
block. So the only possibility is that the sockets were becoming defunct.
With netstat, I found that the sockets were getting dropped periodically. I
wasn't sure if it was the producer, the broker, or the network layer that's
causing this. Just wanted to check if there is any recommendation for this.
We are using SASL.

Thanks
Magesh


Spark per topic num of partitions doubt

2016-09-27 Thread Adis Ababa
Hello,
I have asked the question on stackoverflow as well here
http://stackoverflow.com/questions/39737201/spark-kafka-per-topic-number-of-partitions-map-not-honored

I am confused about the "per topic number of partitions" parameter when
creating an inputDStream using the KafkaUtils.createStream(...) method.

I am pasting the question here; please help.

From the [Spark Documentation][1]:

> the topicMap parameter of the KafkaUtils.createStream(...) method determines the
> "per-topic number of Kafka partitions to consume" [Javadoc here][2]

So, when I created a kafka topic with 3 partitions and started a spark
receiver as

Map<String, Integer> topicMap = new HashMap<>();
topicMap.put(topic, 1);
JavaPairReceiverInputDStream<String, String> inputDStream =
    KafkaUtils.createStream(javaStreamingContext, zookeeperQuorum, groupId, topicMap);

I expected this receiver to receive messages from ONLY one partition of the
3 partitions that I created. However, when I check the offset checker, I
see the following:

Pid Offset  logSize Lag Owner
0   9   9   0   none
1   11  11  0   none
2   7   7   0   none

I expected this code to receive messages from one partition and then I
thought I needed to start more receivers (one per partition) as given in
the [documentation here][3] to cover all Kafka topic partitions.

int numStreams = 3;
List<JavaPairDStream<String, String>> kafkaStreams = new ArrayList<>(numStreams);
for (int i = 0; i < numStreams; i++) {
  kafkaStreams.add(KafkaUtils.createStream(...));
}
JavaPairDStream<String, String> unifiedStream =
    streamingContext.union(kafkaStreams.get(0), kafkaStreams.subList(1, kafkaStreams.size()));

So, my question is can one receiver receive messages from all partitions?
If so, what in the world does the topicMap(topic -> numPartitions) mean?

  [1]: http://spark.apache.org/docs/latest/streaming-kafka-integration.html
  [2]:
http://spark.apache.org/docs/latest/api/java/index.html?org/apache/spark/streaming/kafka/KafkaUtils.html
  [3]:
http://spark.apache.org/docs/latest/streaming-programming-guide.html#input-dstreams-and-receivers


Re: Schema Registry in Staging Environment

2016-09-27 Thread Ewen Cheslack-Postava
Lawrence,

There are two common ways to approach registration of schemas. The first is
to just rely on auto-registration that the serializers do (I'm assuming
you're using the Java clients & serializers here, or an equivalent
implementation in another language). In this case you can generally just
allow the registration to happen as the updated application hits each
stage. If it gets rejected in staging, it won't ever make it to prod. If
you discover an issue in staging, the most common case is that it isn't a
problem with the schema but rather with the surrounding code, in which case
a subsequent deploy will generally be able to succeed. Note that unrelated
changes can continue even if you end up not deploying the change to
production since they will not be affected by the newly registered schema.

The second way is to build the registration into your deployment pipeline,
so you may perform the registration before ever deploying an instance of
the app. This generally requires more coordination between your deployment
and apps (since deployment needs to know what schemas exist and how to
register them in the appropriate environment), but allows you to catch
errors a bit earlier (and may allow you to restrict writes to the schema
registry to the machines performing deployment, which some shops may want
to do).
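
For example, a pre-registration step in a deploy script can be as small as the
sketch below, which uses the CachedSchemaRegistryClient from the schema-registry
client library. The registry URL, subject name, and schema are placeholders, and
this is just an illustration of the approach, not an official tool:

import io.confluent.kafka.schemaregistry.client.CachedSchemaRegistryClient;
import org.apache.avro.Schema;

public class PreRegisterSchema {
    public static void main(String[] args) throws Exception {
        // Placeholder URL for the staging registry
        CachedSchemaRegistryClient client =
                new CachedSchemaRegistryClient("http://schema-registry.staging:8081", 100);

        // Placeholder schema; in practice you'd load the .avsc file shipped with the app
        Schema schema = new Schema.Parser().parse(
                "{\"type\":\"record\",\"name\":\"PageView\",\"fields\":["
                + "{\"name\":\"user\",\"type\":\"string\"}]}");

        // Uses the "<topic>-value" subject convention; the call fails if the
        // schema is incompatible with what is already registered.
        int id = client.register("pageviews-value", schema);
        System.out.println("Registered schema id " + id);
    }
}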

One of the goals of the schema registry is to help decouple developers
within your organization, so it is absolutely common for developers to
simply create new schemas. In fact, they may just build the entire process
into their app. For example, while not released yet, we have a maven plugin
that you can use to integrate interactions with the schema registry into
your Maven/Java application development process:
https://github.com/confluentinc/schema-registry/tree/master/maven-plugin

-Ewen

On Mon, Sep 26, 2016 at 1:33 PM, Lawrence Weikum 
wrote:

> Hello,
>
> Has anyone used Confluent’s Schema Registry?  If so, I’m curious to hear
> about best practices for using it in a staging environment.
>
> Do users typically copy schemas over to the staging environment from
> production?  Are developers allowed to create new schemas in the staging
> environment?
>
> Thanks!
>
> Lawrence Weikum
>
>


-- 
Thanks,
Ewen


Kafka consumer receiving same message multiple times

2016-09-27 Thread Shamik Banerjee
Hi,
I've recently started using Kafka to read documents coming through a web
crawler. What I'm noticing is that when I'm dealing with a few million documents,
the consumer processes the same message over and over again. It looks like the
data is not getting committed for some reason. This is not the case when I'm
testing the consumer with a few hundred messages.

I'm using the Kafka high-level consumer client code in Java, with a consumer
group running a number of threads equal to the number of partitions, so each
thread is dedicated to a partition. Here's a code snippet for polling data:
while (true) {
    try {
        if (consumerDao.canPollTopic()) {
            ConsumerRecords<String, TextAnalysisRequest> records =
                consumer.poll(this.config.getPropertyAsIneger(IPreProcessorConstant.KAFKA_POLL_COUNT));
            for (ConsumerRecord<String, TextAnalysisRequest> record : records) {
                if (record.value() != null) {
                    TextAnalysisRequest textAnalysisObj = record.value();
                    if (textAnalysisObj != null) {
                        PostProcessRequest req = new PostProcessRequest();
                        req.setRequest(this.getRequest(textAnalysisObj));
                        PreProcessorUtil.submitPostProcessRequest(req, config);
                    }
                }
            }
        } else {
            Thread.sleep(this.config.getPropertyAsIneger(IPreProcessorConstant.KAFKA_POLL_SLEEP));
        }
    } catch (Exception ex) {
        LOGGER.error("Error in Full Consumer group worker", ex);
    }
}
Here are the Kafka consumer configuration parameters I'm setting; the rest are
default values:
consumer.auto.commit=true
consumer.auto.commit.interval=1000
consumer.session.timeout=18
consumer.poll.records=2147483647
consumer.request.timeout=181000

Here's the complete consumer config:

metric.reporters = []
metadata.max.age.ms = 30
partition.assignment.strategy = [org.apache.kafka.clients.consumer.RangeAssignor]
reconnect.backoff.ms = 50
sasl.kerberos.ticket.renew.window.factor = 0.8
max.partition.fetch.bytes = 1048576
bootstrap.servers = [kafkahost1:9092, kafkahost2:9092]
ssl.keystore.type = JKS
enable.auto.commit = true
sasl.mechanism = GSSAPI
interceptor.classes = null
exclude.internal.topics = true
ssl.truststore.password = null
client.id =
ssl.endpoint.identification.algorithm = null
max.poll.records = 2147483647
check.crcs = true
request.timeout.ms = 181000
heartbeat.interval.ms = 3000
auto.commit.interval.ms = 1000
receive.buffer.bytes = 65536
ssl.truststore.type = JKS
ssl.truststore.location = null
ssl.keystore.password = null
fetch.min.bytes = 1
send.buffer.bytes = 131072
value.deserializer = class com.test.preprocessor.consumer.serializer.KryoObjectSerializer
group.id = full_group
retry.backoff.ms = 100
sasl.kerberos.kinit.cmd = /usr/bin/kinit
sasl.kerberos.service.name = null
sasl.kerberos.ticket.renew.jitter = 0.05
ssl.trustmanager.algorithm = PKIX
ssl.key.password = null
fetch.max.wait.ms = 500
sasl.kerberos.min.time.before.relogin = 6
connections.max.idle.ms = 54
session.timeout.ms = 18
metrics.num.samples = 2
key.deserializer = class org.apache.kafka.common.serialization.StringDeserializer
ssl.protocol = TLS
ssl.provider = null
ssl.enabled.protocols = [TLSv1.2, TLSv1.1, TLSv1]
ssl.keystore.location = null
ssl.cipher.suites = null
security.protocol = PLAINTEXT
ssl.keymanager.algorithm = SunX509
metrics.sample.window.ms = 3
auto.offset.reset = latest
My sample Kafka topic has 8 partitions with a replication factor of 2.
The log retention period in server.properties is set to 168 hours:

log.retention.hours=168
log.roll.hours=168
Not sure what I'm missing here. Any pointers will be appreciated.


Re: micro-batching in kafka streams

2016-09-27 Thread Ara Ebrahimi
One more thing:

Guozhang pointed me towards this sample for micro-batching: 
https://github.com/apache/kafka/blob/177b2d0bea76f270ec087ebe73431307c1aef5a1/streams/examples/src/main/java/org/apache/kafka/streams/examples/wordcount/WordCountProcessorDemo.java

This is a good example and I successfully adapted it for my use case. BUT
the main problem is that even though my use case deals with writing hourly
windows of data, and hence the data is already in a RocksDB file, I need to
create a duplicate of the same file just to be able to periodically do range
scans on it and write to the external database. I did try to see if I could get
the StateStore to read the same RocksDB file used by the aggregateByKey which
happens before this step, but it complained about not being able to lock the
file. It would be great to be able to share the same underlying file between
aggregateByKey (or any other such KTable-producing operation) and such periodic
triggers.
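
For anyone else following the thread, the general shape of that sample — buffer
into a state store in process(), then range-scan and flush in punctuate() — is
roughly the sketch below. The store name, types and the external write are
placeholders, not my actual code:

import org.apache.kafka.streams.KeyValue;
import org.apache.kafka.streams.processor.Processor;
import org.apache.kafka.streams.processor.ProcessorContext;
import org.apache.kafka.streams.state.KeyValueIterator;
import org.apache.kafka.streams.state.KeyValueStore;

public class HourlyFlushProcessor implements Processor<String, Long> {

    private ProcessorContext context;
    private KeyValueStore<String, Long> store;

    @Override
    @SuppressWarnings("unchecked")
    public void init(ProcessorContext context) {
        this.context = context;
        // "hourly-stats" must be a store attached to this processor in the topology
        this.store = (KeyValueStore<String, Long>) context.getStateStore("hourly-stats");
        // ask Streams to call punctuate() roughly once a minute
        context.schedule(60 * 1000L);
    }

    @Override
    public void process(String key, Long value) {
        // only buffer/update state here; nothing is written externally yet
        store.put(key, value);
    }

    @Override
    public void punctuate(long timestamp) {
        // range-scan the buffered state and push it to the external DB in one batch
        KeyValueIterator<String, Long> iter = store.all();
        while (iter.hasNext()) {
            KeyValue<String, Long> entry = iter.next();
            // writeToExternalDb(entry.key, entry.value);   // placeholder
        }
        iter.close();
        context.commit();
    }

    @Override
    public void close() {}
}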

Ara.

On Sep 26, 2016, at 10:40 AM, Ara Ebrahimi wrote:

Hi,

So, here’s the situation:

- for classic batching of writes to external systems, right now I simply hack
it. This specific case is writing of records to an Accumulo database, and I simply
use the batch writer to batch writes, and it flushes every second or so. I’ve
added a shutdown hook to the JVM to flush upon graceful exit too. This is good
enough for me, but obviously it’s not perfect. I wish Kafka Streams had some
sort of a trigger (based on x number of records processed, or y window of time
passed). Which brings me to the next use case.

- I have some logic for calculating hourly statistics. So I’m dealing with 
Windowed data already. These stats then need to be written to an external 
database for use by user facing systems. Obviously I need to write the final 
result for each hourly window after we’re past that window of time (or I can 
write as often as it gets updated but the problem is that the external database 
is not as fast as Kafka). I do understand that I need to take into account the 
fact that events may arrive out of order and there may be some records arriving 
a little bit after I’ve considered the previous window over and have moved to 
the next one. I’d like to have some sort of an hourly trigger (not just pure x 
milliseconds trigger, but also support for cron style timing) and then also 
have the option to update the stats I’ve already written for a window a set 
amount of time after the trigger got triggered so that I can deal with events 
which arrive after the write for that window. And then there’s a cut-off point 
after which updating the stats for a very old window is just not worth it. 
Something like this DSL:

kstream.trigger(
    /* when to trigger */ Cron.of(“0 * * * *”),
    /* update every hour afterwards */ Hours.toMillis(1),
    /* discard changes older than this */ Hours.toMillis(24),
    /* lambda */ (windowStartTime, windowedKey, record) -> { /* write */ }
);

The tricky part is reconciling event source time and event processing time. 
Clearly this trigger is in the event processing time whereas the data is in the 
event source time most probably.

Something like that :)

Ara.

On Sep 26, 2016, at 1:59 AM, Michael Noll wrote:

Ara,

may I ask why you need to use micro-batching in the first place?

Reason why I am asking: Typically, when people talk about micro-batching,
they are referring to the way some originally batch-based stream processing
tools "bolt on" real-time processing by making their batch sizes really
small.  Here, micro-batching belongs to the realm of the inner workings of
the stream processing tool.

Orthogonally to that, you have features/operations such as windowing,
triggers, etc. that -- unlike micro-batching -- allow you as the user of
the stream processing tool to define which exact computation logic you
need.  Whether or not, say, windowing is or is not computed via
micro-batching behind the scenes should (at least in an ideal world) be of
no concern to the user.

-Michael





On Mon, Sep 5, 2016 at 9:10 PM, Ara Ebrahimi wrote:

Hi,

What’s the best way to do micro-batching in Kafka Streams? Any plans for a
built-in mechanism? Perhaps StateStore could act as the buffer? What
exactly are ProcessorContext.schedule()/punctuate() for? They don’t seem
to be used anywhere?

http://hortonworks.com/blog/apache-storm-design-pattern-micro-batching/

Ara.












Kafka consumer picking up the same message multiple times

2016-09-27 Thread Shamik Bandopadhyay
Hi,

I've recently started using Kafka to read documents coming through a web
crawler. What I'm noticing is that when I'm dealing with a few million documents,
the consumer processes the same message over and over again. It looks like
the data is not getting committed for some reason. This is not the case
when I'm testing the consumer with a few hundred messages.

I'm using the Kafka high-level consumer client code in Java, with a consumer
group running a number of threads equal to the number of partitions.
Here's a code snippet for polling data:

while (true) {
    try {
        if (consumerDao.canPollTopic()) {
            ConsumerRecords<String, TextAnalysisRequest> records =
                consumer.poll(this.config.getPropertyAsIneger(IPreProcessorConstant.KAFKA_POLL_COUNT));
            for (ConsumerRecord<String, TextAnalysisRequest> record : records) {
                if (record.value() != null) {
                    TextAnalysisRequest textAnalysisObj = record.value();
                    if (textAnalysisObj != null) {
                        PostProcessRequest req = new PostProcessRequest();
                        req.setRequest(this.getRequest(textAnalysisObj));
                        PreProcessorUtil.submitPostProcessRequest(req, config);
                    }
                }
            }
        } else {
            Thread.sleep(this.config.getPropertyAsIneger(IPreProcessorConstant.KAFKA_POLL_SLEEP));
        }
    } catch (Exception ex) {
        LOGGER.error("Error in Full Consumer group worker", ex);
    }
}

Here are the Kafka consumer configuration parameters I'm setting; the rest are
default values:

consumer.auto.commit=true
consumer.auto.commit.interval=1000
consumer.session.timeout=18
consumer.poll.records=2147483647
consumer.request.timeout=181000


Here's the complete consumer config:

metric.reporters = []
metadata.max.age.ms = 30
partition.assignment.strategy =
[org.apache.kafka.clients.consumer.RangeAssignor]
reconnect.backoff.ms = 50
sasl.kerberos.ticket.renew.window.factor = 0.8
max.partition.fetch.bytes = 1048576
bootstrap.servers = [kafkahost1:9092, kafkahost2:9092]
ssl.keystore.type = JKS
enable.auto.commit = true
sasl.mechanism = GSSAPI
interceptor.classes = null
exclude.internal.topics = true
ssl.truststore.password = null
client.id =
ssl.endpoint.identification.algorithm = null
max.poll.records = 2147483647
check.crcs = true
request.timeout.ms = 181000
heartbeat.interval.ms = 3000
auto.commit.interval.ms = 1000
receive.buffer.bytes = 65536
ssl.truststore.type = JKS
ssl.truststore.location = null
ssl.keystore.password = null
fetch.min.bytes = 1
send.buffer.bytes = 131072
value.deserializer = class
com.test.preprocessor.consumer.serializer.KryoObjectSerializer
group.id = full_group
retry.backoff.ms = 100
sasl.kerberos.kinit.cmd = /usr/bin/kinit
sasl.kerberos.service.name = null
sasl.kerberos.ticket.renew.jitter = 0.05
ssl.trustmanager.algorithm = PKIX
ssl.key.password = null
fetch.max.wait.ms = 500
sasl.kerberos.min.time.before.relogin = 6
connections.max.idle.ms = 54
session.timeout.ms = 18
metrics.num.samples = 2
key.deserializer = class
org.apache.kafka.common.serialization.StringDeserializer
ssl.protocol = TLS
ssl.provider = null
ssl.enabled.protocols = [TLSv1.2, TLSv1.1, TLSv1]
ssl.keystore.location = null
ssl.cipher.suites = null
security.protocol = PLAINTEXT
ssl.keymanager.algorithm = SunX509
metrics.sample.window.ms = 3
auto.offset.reset = latest

My sample Kafka topic has 8 partitions with a replication factor of 2.

The log retention period in server.properties is set to 168 hours:

log.retention.hours=168
log.roll.hours=168

Not sure what I'm missing here. Any pointers will be appreciated.

-Thanks,
Shamik


Re: Exception while deserializing in kafka streams

2016-09-27 Thread Walter rakoff
Ah, that was it. I was passing the same Serde while creating the topology.
It works after I removed it.
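
(For the record, if I did need to pass the serde object directly into the
topology, manually configuring it first — as Guozhang describes below — would
look roughly like this. This is an untested sketch using my own serde class and
registry URL; the topic name is a placeholder:)

Map<String, Object> serdeConfig = new HashMap<>();
serdeConfig.put("schema.registry.url", "http://10.200.184.41:8081");

GenericAvroSerdeWithSchemaRegistry valueSerde = new GenericAvroSerdeWithSchemaRegistry();
valueSerde.configure(serdeConfig, false);   // false => value serde, not key serde

KStreamBuilder builder = new KStreamBuilder();
KStream<Long, GenericRecord> stream =
        builder.stream(Serdes.Long(), valueSerde, "input-topic");   // placeholder topic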

Thanks!

Walter

On Mon, Sep 26, 2016 at 1:16 PM, Guozhang Wang  wrote:

> Hi Walter,
>
> One thing I can think of is that, if you pass the serde object as part of
> your topology definition, instead of passing the serde class in the config,
> then these serde objects will not be auto configured and hence for your
> case the schema registry client will not be constructed and initialized.
>
> https://issues.apache.org/jira/browse/KAFKA-3729
>
> So in case your application's topology does overwrite serdes with direct
> serde object passing, you need to configure them manually for now.
>
>
> Guozhang
>
> On Thu, Sep 22, 2016 at 5:36 PM, Guozhang Wang  wrote:
>
> > Hi Walter,
> >
> > I downloaded the 0.10.0 jar and verified that the configure() function is
> > auto-triggered when you get the serde classes from `context.keySerde /
> > valueSerde`, which is auto-triggered if you use the DSL. And your Scala
> > code is the same as to our examples code:
> >
> > https://github.com/confluentinc/examples/blob/
> > 030343f9acbb9f73a13ab0f0fd31690dca97c606/kafka-streams/src/
> > main/java/io/confluent/examples/streams/utils/GenericAvroSerializer.java
> >
> >
> > Which demo example were you running? And are there any other jars
> > co-located with the 0.10.0.0 jar that could cause another class be
> loaded?
> >
> >
> > Guozhang
> >
> >
> > On Thu, Sep 22, 2016 at 8:41 AM, Walter rakoff 
> > wrote:
> >
> >> Guozhang,
> >>
> >> I tried your suggestion. Below is the log from Serde, Serializer
> >> & Deserializer.
> >> Confirmed that KafkaAvroDeserializer.configure does get invoked.
> >>
> >> Line 379: 16/09/22 15:28:46 WARN GenericAvroSerdeWithSchemaRegistry: In
> >> > configure {num.standby.replicas=1, replication.factor=3,
> >> > commit.interval.ms=125000, bootstrap.servers=10.200.184.29:9092,
> >> > schema.registry.url=http://10.200.184.41:8081,
> >> > key.serde=org.apache.kafka.common.serialization.Serdes$LongSerde,
> >> > zookeeper.connect=10.200.184.26:2181, value.serde=class
> >> > kstreams.serdes.GenericAvroSerdeWithSchemaRegistry,
> >> > auto.offset.reset=earliest, num.stream.threads=1, client.id
> >> =KStreams-Test,
> >> > application.id=testing-app}
> >> > Line 380: 16/09/22 15:28:46 WARN GenericAvroSerializerWithSchem
> >> aRegistry:
> >> > In configure{num.standby.replicas=1, replication.factor=3,
> >> > commit.interval.ms=125000, bootstrap.servers=10.200.184.29:9092,
> >> > schema.registry.url=http://10.200.184.41:8081,
> >> > key.serde=org.apache.kafka.common.serialization.Serdes$LongSerde,
> >> > zookeeper.connect=10.200.184.26:2181, value.serde=class
> >> > kstreams.serdes.GenericAvroSerdeWithSchemaRegistry,
> >> > auto.offset.reset=earliest, num.stream.threads=1, client.id
> >> =KStreams-Test,
> >> > application.id=testing-app}
> >> > Line 385: 16/09/22 15:28:46 WARN
> >> > GenericAvroDeserializerWithSchemaRegistry: In
> >> > configure{num.standby.replicas=1, replication.factor=3,
> >> commit.interval.ms=125000,
> >> > bootstrap.servers=10.200.184.29:9092, schema.registry.url=
> >> > http://10.200.184.41:8081,
> >> > key.serde=org.apache.kafka.common.serialization.Serdes$LongSerde,
> >> > zookeeper.connect=10.200.184.26:2181, value.serde=class
> >> > kstreams.serdes.GenericAvroSerdeWithSchemaRegistry,
> >> > auto.offset.reset=earliest, num.stream.threads=1, client.id
> >> =KStreams-Test,
> >> > application.id=testing-app}
> >>
> >>
> >> Still the same exception
> >>
> >> 16/09/22 15:28:47 INFO StreamThread: Stream thread shutdown complete
> >> > [StreamThread-1]
> >> > Exception in thread "StreamThread-1"
> >> > org.apache.kafka.common.errors.SerializationException: Error
> >> deserializing
> >> > Avro message for id 7
> >> > Caused by: java.lang.NullPointerException
> >> > at
> >> > io.confluent.kafka.serializers.AbstractKafkaAvroDeserializer
> >> .deserialize(AbstractKafkaAvroDeserializer.java:120)
> >> > at
> >> > io.confluent.kafka.serializers.AbstractKafkaAvroDeserializer
> >> .deserialize(AbstractKafkaAvroDeserializer.java:92)
> >>
> >>
> >>
> >> Walter
> >>
> >> On Mon, Sep 19, 2016 at 8:25 PM, Guozhang Wang 
> >> wrote:
> >>
> >> > Hello Walter,
> >> >
> >> > The WARN log entry should not be the cause of this issue.
> >> >
> >> > I double checked the 0.10.0.0 release and this issue should not really
> >> > happen, so your observation is a bit weird to me. Could your add a log
> >> > entry in the `configure` function which constructs the registry client
> >> to
> >> > make sure it is indeed triggered when the streams app start up?
> >> >
> >> >
> >> > Guozhang
> >> >
> >> >
> >> >
> >> > On Fri, Sep 16, 2016 at 2:27 PM, Walter rakoff <
> walter.rak...@gmail.com
> >> >
> >> > wrote:
> >> >
> >> > > Guozhang,
> >> > >
> >> > > Any clues on this one?
> >> > >
> >> > > Walter
> >> > >
> >> > > On 

RE: SendFailedException

2016-09-27 Thread Martin Gainty
Sometimes engineers run scripts out of order, so we will need the exact steps you
are following.

Are you running through VirtualBox/Vagrant? In that case we will need to see your
Vagrantfile.local file:
https://www.codatlas.com/github.com/apache/kafka/trunk/vagrant/system-test-Vagrantfile.local

We will also need the exact version you are implementing for the Kafka producer,
and the exact version you are implementing for the Kafka consumer.

We will also need to view the exact configurations you are implementing for:

.\config\connect-distributed.properties
https://www.codatlas.com/github.com/apache/kafka/HEAD/config/connect-distributed.properties
-- OR, if running standalone --
.\config\connect-standalone.properties
https://www.codatlas.com/github.com/apache/kafka/trunk/config/connect-standalone.properties

.\config\consumer.properties
http://kafka.apache.org/082/documentation.html#consumerconfigs

.\config\producer.properties
http://kafka.apache.org/082/documentation.html#producerconfigs

.\config\server.properties
https://www.codatlas.com/github.com/apache/kafka/trunk/config/server.properties

Are you implementing a distributed coordination system such as Zookeeper?
https://www.codatlas.com/github.com/apache/kafka/trunk/config/zookeeper.properties

Which JVM version are you implementing? Are you building before running Kafka,
and if so, which Scala compiler version?

Thanks
Martin Gainty, Enterprise *Contractor*
__



> From: achintya_gh...@comcast.com
> To: users@kafka.apache.org
> CC: d...@kafka.apache.org
> Subject: SendFailedException
> Date: Mon, 26 Sep 2016 20:08:22 +
> 
> Hi there,
> 
> Can anyone please help us as we are getting the SendFailedException when 
> Kafka consumer is starting and not able to consume any message?
> 
> Thanks
> Achintya
  

Consumer offsets reset for _all_ topics after increasing partitions for one topic

2016-09-27 Thread Juho Autio
I increased partitions for one existing topic (2->10), but was surprised to
see that it entirely reset the committed offsets of my consumer group.

All topics & partitions were reset to the earliest offset available, and
the consumer read everything again.

Documentation doesn't mention anything like this. Is this how it's supposed
to work, or a bug?

I would've expected the consumer offsets to not decrease at all, especially
for the topics that I didn't even touch.

For the altered topic I would've expected that consuming the previously
existing partitions 0 and 1 would've continued from the position where they
were, and naturally starting to read the new added partitions from 0.

I added partitions according to the "Modifying topics" section of Kafka
0.10.0 Documentation:

"To add partitions you can do

 > bin/kafka-topics.sh --zookeeper $ZOOKEEPER_HOST --alter --topic
altered_topic --partitions 10
"

Previously this topic had 2 partitions.

For the consumer I'm using:
kafka.javaapi.consumer.ConsumerConnector.createMessageStreamsByFilter()

And version is:

<dependency>
    <groupId>org.apache.kafka</groupId>
    <artifactId>kafka_2.11</artifactId>
    <version>0.10.0.1</version>
</dependency>

Kafka cluster itself is kafka_2.11-0.10.0.1.


Re: producer can't push msg sometimes with 1 broker recoved

2016-09-27 Thread Kamal C
Aggie,

I'm not able to reproduce your behavior in 0.10.0.1.

> I did more testing and find the rule (Topic is created with
"--replication-factor 2 --partitions 1" in following case):
> node 1   node 2
> down(lead)   down (replica)
> down(replica) up   (lead)  producer send fail !!!

When node 2 is up, after the metadata update producer able to connect and
send messages to it.

Logs:

[2016-09-27T15:18:17,907] NetworkClient: handleDisconnections(): Node 1
disconnected.
[2016-09-27T15:18:18,007] NetworkClient: initiateConnect(): Initiating
connection to node 1 at localhost:9093.
[2016-09-27T15:18:18,008] Selector: pollSelectionKeys(): Connection with
localhost/127.0.0.1 disconnected
java.net.ConnectException: Connection refused
at sun.nio.ch.SocketChannelImpl.checkConnect(Native Method)
~[?:1.8.0_45]
at
sun.nio.ch.SocketChannelImpl.finishConnect(SocketChannelImpl.java:717)
~[?:1.8.0_45]
at
org.apache.kafka.common.network.PlaintextTransportLayer.finishConnect(PlaintextTransportLayer.java:51)
~[kafka-clients-0.10.0.1.jar:?]
at
org.apache.kafka.common.network.KafkaChannel.finishConnect(KafkaChannel.java:73)
~[kafka-clients-0.10.0.1.jar:?]
at
org.apache.kafka.common.network.Selector.pollSelectionKeys(Selector.java:309)
[kafka-clients-0.10.0.1.jar:?]
at org.apache.kafka.common.network.Selector.poll(Selector.java:283)
[kafka-clients-0.10.0.1.jar:?]
at org.apache.kafka.clients.NetworkClient.poll(NetworkClient.java:260)
[kafka-clients-0.10.0.1.jar:?]
at
org.apache.kafka.clients.producer.internals.Sender.run(Sender.java:229)
[kafka-clients-0.10.0.1.jar:?]
at
org.apache.kafka.clients.producer.internals.Sender.run(Sender.java:134)
[kafka-clients-0.10.0.1.jar:?]
at java.lang.Thread.run(Thread.java:745) [?:1.8.0_45]
[2016-09-27T15:18:18,008] NetworkClient: handleDisconnections(): Node 1
disconnected.
[2016-09-27T15:18:18,043] NetworkClient: maybeUpdate(): Sending metadata
request {topics=[hello]} to node 0
[2016-09-27T15:18:18,052] Metadata: update(): Updated cluster metadata
version 4 to Cluster(nodes = [tcltest1.nmsworks.co.in:9092 (id: 0 rack:
null)], partitions = [Partition(topic = hello, partition = 0, leader =
none, replicas = [0,1,], isr = []])
[2016-09-27T15:18:19,053] NetworkClient: maybeUpdate(): Sending metadata
request {topics=[hello]} to node 0
[2016-09-27T15:18:19,056] Metadata: update(): Updated cluster metadata
version 5 to Cluster(nodes = [tcltest1.nmsworks.co.in:9092 (id: 0 rack:
null)], partitions = [Partition(topic = hello, partition = 0, leader = 0,
replicas = [0,1,], isr = [0,]])
[2016-09-27T15:18:19,081] KafkaProducer: main(): Batch : 4 sent
[2016-09-27T15:18:19,182] KafkaProducer: main(): Batch : 5, Sending the
record with key : 0

- Kamal

On Mon, Sep 26, 2016 at 8:53 AM, FEI Aggie 
wrote:

> Kamal,
> Thanks for your response. I tried testing with metadata.max.age.ms
> reduced to 10s, but the behavior not changed, and producer still can't find
> the live broker.
>
> I did more testing and find the rule (Topic is created with
> "--replication-factor 2 --partitions 1" in following case):
> node 1   node 2
> down(lead)   down (replica)
> down(replica) up   (lead)  producer send fail !!!
>
>

> down(lead)   down (replica)
> up  (lead)   down (replica) producer send ok !!!
>
> If the only node with original lead partition up, everything is fine.
> If the only node with original replica partition up, producer can't
> connect to broker alive (always try to connect to the original lead broker,
> node 1 in my case).
>
> Kafka can't recover for this situation? Anyone has clue for this?
>
> Thanks!
> Aggie
> -Original Message-
> From: Kamal C [mailto:kamaltar...@gmail.com]
> Sent: Saturday, September 24, 2016 1:37 PM
> To: users@kafka.apache.org
> Subject: Re: producer can't push msg sometimes with 1 broker recoved
>
> Reduce the metadata refresh interval 'metadata.max.age.ms' from 5 min to
> your desired time interval.
> This may reduce the time window of non-availability broker.
>
> -- Kamal
>


Re: Handling out-of-order messaging w/ Kafka Streams

2016-09-27 Thread Eno Thereska
Hi Mathieu,

If the messages are sent asynchronously, then what you're observing is indeed 
right. There is no guarantee that the first will arrive at the destination 
first.
Perhaps you can try sending them synchronously (i.e., wait until the first one 
is received, before sending the second). That might slow down your pipeline, 
but perhaps that's acceptable?
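
With the Java producer that just means blocking on the Future returned by send()
before sending the next message; a minimal sketch (topic, key and values are
placeholders, and librdkafka has an equivalent way of waiting on the delivery report):

import java.util.Properties;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;

public class SyncSendExample {
    public static void main(String[] args) throws Exception {
        Properties props = new Properties();
        props.put("bootstrap.servers", "localhost:9092");   // placeholder
        props.put("key.serializer",
                "org.apache.kafka.common.serialization.StringSerializer");
        props.put("value.serializer",
                "org.apache.kafka.common.serialization.StringSerializer");

        try (KafkaProducer<String, String> producer = new KafkaProducer<String, String>(props)) {
            // Block on each ack so K/S1 is in the log before K/S2 is even sent.
            producer.send(new ProducerRecord<String, String>("object-state", "K", "S1")).get();
            producer.send(new ProducerRecord<String, String>("object-state", "K", "S2")).get();
        }
    }
}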

Eno


> On 27 Sep 2016, at 04:26, Mathieu Fenniak  
> wrote:
> 
> Hey Apache Users,
> 
> I'm working on a web application that has a web service component, and a
> background processor component.  Both applications will send messages to
> the same Kafka topic as an object is manipulated.
> 
> In some cases, a web service call in the service component will send a
> message to Kafka saying key K has state S1, then trigger a background
> operation, and then the background component will send a message to Kafka
> saying key K has state S2.  However, I'm finding that the topic ends up
> occasionally having a message K/S2 followed by K/S1, rather than the other
> way around.  As both producers in the web service call and the background
> processor send messages asynchronously with librdkafka, I believe this is a
> relatively simple race condition where messages just aren't coming in like
> I'd like them to.
> 
> In a consuming Kafka Streams application, I'd be creating a KTable of this
> topic.  What approaches can I take to ensure the the KTable will end up
> with K/S2 as the state for K, rather than the stale-er K/S1?
> 
> Would KS reorder messages if they had ordered & coordinated timestamps?  If
> so, how much leeway would it have for S2 being delivered before S1?  (I
> believe librdkafka 0.9.1 doesn't support sending create-time in messages,
> which makes this is a bit more painful.)
> 
> Any other approaches that are worth exploring?
> 
> Thanks for any thoughts,
> 
> Mathieu



Initialisation of the context

2016-09-27 Thread Hamza HACHANI
Hi,


I would like to know how the context is initialised in Kafka Streams.

I have a problem with one Kafka Streams application: every time I call it, I
notice that the context is initialised (or created) more than once, which is
abnormal and causes a bug in the system.


Hamza