Kafka Stream tuning.

2018-02-12 Thread TSANG, Brilly
Hi kafka users,

I created a filtering stream with the Processor API; the input topic has an 
input rate of ~5 records per millisecond.  The filtering function on average 
takes 0.05 milliseconds to complete, which in the ideal case would translate to 
(1/0.05 =) 20 records per millisecond.  However, when I benchmark the whole 
process, the stream is only processing 0.05 records per millisecond.

Does anyone have any idea how to tune the streaming system to be faster, as 0.05 
records per millisecond is very far from the theoretical max of 20?  The results 
above are per partition: I have 16 partitions for the input topic and all 
partitions have similar throughput.

I've only set the streams to have the following config:
Properties config = new Properties();
config.put(StreamsConfig.APPLICATION_ID_CONFIG, appId);
config.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrap);
config.put(StreamsConfig.STATE_DIR_CONFIG, stateDir);
config.put(StreamsConfig.COMMIT_INTERVAL_MS_CONFIG, 50);

I'm not defining a TimestampExtractor, so the default one is used.
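
For reference, the extra knobs I'm considering trying next look roughly like this 
(just a sketch; the values are illustrative guesses on my side, not measured 
recommendations):

import java.util.Properties;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.streams.StreamsConfig;

Properties config = new Properties();
config.put(StreamsConfig.APPLICATION_ID_CONFIG, appId);
config.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrap);
config.put(StreamsConfig.STATE_DIR_CONFIG, stateDir);
// One stream thread per input partition (16 here), if the host has the cores for it.
config.put(StreamsConfig.NUM_STREAM_THREADS_CONFIG, 16);
// Commit less aggressively than every 50 ms; committing is comparatively expensive.
config.put(StreamsConfig.COMMIT_INTERVAL_MS_CONFIG, 1000);
// Let the embedded consumer fetch larger batches per poll.
config.put(StreamsConfig.consumerPrefix(ConsumerConfig.MAX_POLL_RECORDS_CONFIG), 2000);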

Thanks for any help in advance.

Regards,
Brilly

How to manually delete all files under a topic's partitions

2018-02-12 Thread le....@qtparking.com
  Good afternoon, Apache Kafka users group.
  Excuse me, how do I manually delete all of a topic's message files under its 
partitions? I know the first step must be to stop the Kafka service (the 
ZooKeeper service does not need to be stopped); is the second step then simply 
to execute the rm command to delete the partition files directly -- is that right?

Thank you!


unable to find custom JMX metrics

2018-02-12 Thread Salah Alkawari
hi,
I have a processor that generates custom JMX metrics:
import org.apache.avro.generic.GenericRecord;
import org.apache.kafka.common.metrics.Sensor;
import org.apache.kafka.streams.StreamsMetrics;
import org.apache.kafka.streams.processor.Processor;
import org.apache.kafka.streams.processor.ProcessorContext;

public class ProcessorJMX implements Processor<String, GenericRecord> {

    private StreamsMetrics streamsMetrics;
    private Sensor sensorStartTs;

    @Override
    public void init(ProcessorContext processorContext) {
        // Register a sensor with the Streams metrics registry at INFO recording level.
        streamsMetrics = processorContext.metrics();
        sensorStartTs = streamsMetrics.addSensor("start_ts", Sensor.RecordingLevel.INFO);
    }

    @Override
    public void process(String key, GenericRecord val) {
        // Record the record's "start_ts" field value on the sensor.
        streamsMetrics.recordThroughput(sensorStartTs, Long.valueOf(val.get("start_ts").toString()));
    }

    @Override
    public void punctuate(long l) { }

    @Override
    public void close() { }
}

I have this in my ProcessorSupplier, ProcessorSupplierJMX, and use it like this:

builder.stream(stringSerde, avroSerdeValue, topicOutput).process(new ProcessorSupplierJMX());
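
For completeness, ProcessorSupplierJMX is just a trivial supplier, roughly along 
these lines (a sketch, since I didn't paste that class):

import org.apache.avro.generic.GenericRecord;
import org.apache.kafka.streams.processor.Processor;
import org.apache.kafka.streams.processor.ProcessorSupplier;

public class ProcessorSupplierJMX implements ProcessorSupplier<String, GenericRecord> {
    @Override
    public Processor<String, GenericRecord> get() {
        // A fresh processor instance per stream task.
        return new ProcessorJMX();
    }
}
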
When I start my long-running integration test, I go to JConsole > MBeans > 
kafka.streams, but I don't see this metric in any of the subfolders anywhere. 
What am I missing? Am I looking in the wrong location, or do I need to activate 
something before I can see this metric?
Thanks,
Sal

Re: question on serialization ..

2018-02-12 Thread Debasish Ghosh
Regarding “has an according overload”, I agree. But some operators like
reduce and leftJoin use the serdes implicitly, from the config. So if
the developer is not careful enough to have the default serdes correct, then
it results in runtime errors.

Also, one more confusion on my part: in the config we can give only one serde
each for key and value. What happens if I have 2 leftJoins in my transformation
that need different serdes from the config? There is no overload for leftJoin
that allows me to provide a serde. Or am I missing something?
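
For example, what I have in mind is something along these lines (only a sketch 
-- I am not sure whether such an overload exists in the version we are on):

import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.streams.kstream.Joined;
import org.apache.kafka.streams.kstream.KStream;
import org.apache.kafka.streams.kstream.KTable;

// Hypothetical per-join serde override; Joined.with appears in newer
// Kafka Streams releases (1.0+), which is exactly what I am unsure about.
// Assumes userClicks: KStream<String, Long> and userRegions: KTable<String, String>.
KStream<String, String> joined = userClicks.leftJoin(
    userRegions,
    (clicks, region) -> (region == null ? "UNKNOWN" : region) + "=" + clicks,
    Joined.with(Serdes.String(),      // key serde, for this join only
                Serdes.Long(),        // stream-side (left) value serde
                Serdes.String()));    // table-side (right) value serde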

regards.

On Tue, 13 Feb 2018 at 12:14 AM, Matthias J. Sax 
wrote:

> Each operator that needs to use a Serde has an according overload
> method that allows you to overwrite the Serde. If you don't overwrite
> it, the operator uses the Serde from the config.
>
> > If one gets the default
> >> serializer wrong then she gets run time errors in serialization /
> >> de-serialization (ClassCastException etc.)
>
> Default Serdes are helpful if you use a generic format like Avro
> throughout the whole topology. If you have many different types, it might
> be better to set default Serdes to `null` and set the Serde for each
> operator individually.
>
>
> -Matthias
>
> On 2/12/18 2:16 AM, Debasish Ghosh wrote:
> > Thanks a lot for the clear answer.
> >
> > One of the concerns that I have is that it's not always obvious when the
> > default serializers are used. e.g. it looks like KGroupedStream#reduce
> also
> > uses the default serializer under the hood. If one gets the default
> > serializer wrong then she gets run time errors in serialization /
> > de-serialization (ClassCastException etc.), which are quite hard to track
> > down.
> >
> > On Mon, Feb 12, 2018 at 4:52 AM, Matthias J. Sax 
> > wrote:
> >
> >> For stream-table-join, only the table is (de)serialized, the stream-side
> >> in only piped through and does lookups into the table.
> >>
> >> And when reading the stream
> >> (https://github.com/confluentinc/kafka-streams-
> >> examples/blob/3.3.x/src/test/scala/io/confluent/examples/streams/
> >> StreamToTableJoinScalaIntegrationTest.scala#L129)
> >> the Serdes from the config are overwritten by parameters passed into
> >> `#stream()`
> >>
> >> The default Serdes are used when reading/writing from/to a topic/store
> >> (including repartition or changelog) and if the operator does not
> >> overwrite the default Serdes via passed-in parameters.
> >>
> >>
> >> -Matthias
> >>
> >> On 2/10/18 10:34 PM, Debasish Ghosh wrote:
> >>> The inputs to the leftJoin are the stream with [String, Long] and the
> >> table
> >>> with [String, String]. Is the default serializer (I mean from the
> config)
> >>> used for [String, String] ? Then how does the [String, Long]
> >> serialization
> >>> work ?
> >>>
> >>> I guess the basic issue that I am trying to understand is how the
> default
> >>> serialisers (stringSerde, stringSerde) registered in config used for
> >>> serialising the inputs of leftJoin ..
> >>>
> >>> regards.
> >>>
> >>> On Sun, 11 Feb 2018 at 8:53 AM, Matthias J. Sax  >
> >>> wrote:
> >>>
>  userClicksJoinRegion is never serialized...
> 
>  It the result of the join and the join only (de)serializes its input
> in
>  the internal stores.
> 
>  The output it forwarded in-memory to a consecutive map and return
>  `clicksByRegion` that is [String,Long].
> 
> 
>  -Matthias
> 
>  On 2/10/18 1:17 PM, Ted Yu wrote:
> > Please read the javadoc:
> >
>  https://github.com/apache/kafka/blob/trunk/streams/src/
> >> main/java/org/apache/kafka/streams/Consumed.java
> >
> > and correlate with the sample code.
> >
> > Thanks
> >
> > On Sat, Feb 10, 2018 at 1:10 PM, Debasish Ghosh <
>  ghosh.debas...@gmail.com>
> > wrote:
> >
> >> Looking at
> >> https://github.com/confluentinc/kafka-streams-
> >> examples/blob/3.3.x/src/test/scala/io/confluent/examples/streams/
> >> StreamToTableJoinScalaIntegrationTest.scala#L148,
> >> it seems that the leftJoin generates a KStream[String, (String,
> >> Long)],
> >> which means the value is a tuple of (String, Long) .. I am not able
> to
>  get
> >> how this will serialize/de-serialize with the default serializers
> >> which
>  are
> >> both stringSerde for keys and values.
> >>
> >> or am I missing something ?
> >>
> >> regards.
> >>
> >> On Sun, Feb 11, 2018 at 2:30 AM, Ted Yu 
> wrote:
> >>
> >>> If I read the code correctly, the operation on this line prepares
> the
> >> input
> >>> for the (stringSerde, stringSerde) specified on line 142:
> >>>
> >>>   .leftJoin(userRegionsTable, (clicks: Long, region: String) =>
> >> (if
> >>> (region == null) "UNKNOWN" else region, clicks))
> >>>
> >>> FYI
> >>>
> >>> On Sat, Feb 10, 2018 at 11:00 AM, Debasish Ghosh <
> >> 

Kafka Transactions with a stretch cluster

2018-02-12 Thread Joe Hammerman
Good evening Apache Kafka users group,

I am architecting a 5-node Apache Kafka stretch cluster spanning 2 datacenters.
If we set min.isr to 4 and acks to 4, it would seem that we would have a full
record of consumer and producer events on at least one node should
datacenter 1 get hit by a meteor.

Reviewing output of:

./bin/kafka-consumer-groups.sh --bootstrap-server localhost:9092 --describe

On each node, one node should have a LAG value of 1 and the other should
have a value of 0 for each partition -- can I rely on that to identify an
authoritative broker to elect?  Is the data I'm looking for easier to
retrieve if I use a ZooKeeper ensemble to record the offsets and perform
elections? Or should I simply set acks=all?
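
For concreteness, what I am weighing looks roughly like this (a sketch; broker 
addresses are placeholders, and the broker/topic side would carry 
min.insync.replicas=4):

import java.util.Properties;
import org.apache.kafka.clients.producer.ProducerConfig;

Properties producerProps = new Properties();
producerProps.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "dc1-broker1:9092,dc2-broker1:9092");
// acks only accepts "0", "1" or "all" ("-1"); "all" waits for every in-sync
// replica, which together with min.insync.replicas=4 gives the cross-DC
// durability described above.
producerProps.put(ProducerConfig.ACKS_CONFIG, "all");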

Thanks in advance for any assistance anyone can provide,
Joseph Hammerman


Re: [VOTE] 1.0.1 RC1

2018-02-12 Thread Ewen Cheslack-Postava
Thanks for the heads up, I forgot to drop the old ones, I've done that and
rc1 artifacts should be showing up now.

-Ewen


On Mon, Feb 12, 2018 at 12:57 PM, Ted Yu  wrote:

> +1
>
> Ran test suite which passed.
>
> BTW it seems the staging repo hasn't been updated yet:
>
> https://repository.apache.org/content/groups/staging/org/
> apache/kafka/kafka-clients/
>
> On Mon, Feb 12, 2018 at 10:16 AM, Ewen Cheslack-Postava  >
> wrote:
>
> > And of course I'm +1 since I've already done normal release validation
> > before posting this.
> >
> > -Ewen
> >
> > On Mon, Feb 12, 2018 at 10:15 AM, Ewen Cheslack-Postava <
> e...@confluent.io
> > >
> > wrote:
> >
> > > Hello Kafka users, developers and client-developers,
> > >
> > > This is the second candidate for release of Apache Kafka 1.0.1.
> > >
> > > This is a bugfix release for the 1.0 branch that was first released
> with
> > > 1.0.0 about 3 months ago. We've fixed 49 significant issues since that
> > > release. Most of these are non-critical, but in aggregate these fixes
> > will
> > > have significant impact. A few of the more significant fixes include:
> > >
> > > * KAFKA-6277: Make loadClass thread-safe for class loaders of Connect
> > > plugins
> > > * KAFKA-6185: Selector memory leak with high likelihood of OOM in case
> of
> > > down conversion
> > > * KAFKA-6269: KTable state restore fails after rebalance
> > > * KAFKA-6190: GlobalKTable never finishes restoring when consuming
> > > transactional messages
> > > * KAFKA-6529: Stop file descriptor leak when client disconnects with
> > > staged receives
> > >
> > > Release notes for the 1.0.1 release:
> > > http://home.apache.org/~ewencp/kafka-1.0.1-rc1/RELEASE_NOTES.html
> > >
> > > *** Please download, test and vote by Thursday, Feb 15, 5pm PT ***
> > >
> > > Kafka's KEYS file containing PGP keys we use to sign the release:
> > > http://kafka.apache.org/KEYS
> > >
> > > * Release artifacts to be voted upon (source and binary):
> > > http://home.apache.org/~ewencp/kafka-1.0.1-rc1/
> > >
> > > * Maven artifacts to be voted upon:
> > > https://repository.apache.org/content/groups/staging/
> > >
> > > * Javadoc:
> > > http://home.apache.org/~ewencp/kafka-1.0.1-rc1/javadoc/
> > >
> > > * Tag to be voted upon (off 1.0 branch) is the 1.0.1 tag:
> > > https://github.com/apache/kafka/tree/1.0.1-rc1
> > >
> > > * Documentation:
> > > http://kafka.apache.org/10/documentation.html
> > >
> > > * Protocol:
> > > http://kafka.apache.org/10/protocol.html
> > >
> > >
> > > Thanks,
> > > Ewen Cheslack-Postava
> > >
> >
>


Re: Typo in "Processing with local state"

2018-02-12 Thread Matthias J. Sax
If you think it's a mistake, please report it via the O'Reilly web page and the
authors will review it there. Thx.

http://www.oreilly.com/catalog/errata.csp?isbn=0636920044123


-Matthias


On 2/12/18 1:49 PM, Ted Yu wrote:
> Hi,
> In "Kafka the definitive guide", page 257:
> 
> to calculate the minimum and average price ...
> 
> It seems average should be maximum.
> 
> Cheers
> 





Re: org.apache.kafka.clients.consumer.OffsetOutOfRangeException

2018-02-12 Thread Cody Koeninger
https://issues.apache.org/jira/browse/SPARK-19680

and

https://issues.apache.org/jira/browse/KAFKA-3370

has a good explanation.

Verify that it works correctly with auto offset set to latest, to rule
out other issues.

Then try providing explicit starting offsets reasonably near the
beginning of the topics (ConsumerStrategies.Subscribe takes a
Map[TopicPartition, Long] for starting offsets)
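
Something along these lines (an untested sketch; the topic name, group id and
offset values are placeholders):

import java.util.Collections;
import java.util.HashMap;
import java.util.Map;
import org.apache.kafka.common.TopicPartition;
import org.apache.spark.streaming.kafka010.ConsumerStrategies;
import org.apache.spark.streaming.kafka010.ConsumerStrategy;

Map<String, Object> kafkaParams = new HashMap<>();
kafkaParams.put("bootstrap.servers", "broker:9092");
kafkaParams.put("group.id", "my-group");
// plus key/value deserializers etc. as before

// Explicit starting offsets, reasonably near the beginning of the topic.
Map<TopicPartition, Long> fromOffsets = new HashMap<>();
fromOffsets.put(new TopicPartition("topic1", 0), 300000L);

ConsumerStrategy<String, String> strategy =
    ConsumerStrategies.<String, String>Subscribe(
        Collections.singletonList("topic1"), kafkaParams, fromOffsets);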

On Mon, Feb 12, 2018 at 3:39 PM, Matthias J. Sax  wrote:
> I don't know if any workaround. Maybe ask at Spark mailing list?
>
> -Matthias
>
> On 2/12/18 1:20 PM, Ted Yu wrote:
>> Have you looked at SPARK-19888 ?
>>
>> Please give the full stack trace of the exception you saw.
>>
>> Cheers
>>
>> On Mon, Feb 12, 2018 at 12:38 PM, Mina Aslani  wrote:
>>
>>> Hi Matthias,
>>> Are you referring to https://issues.apache.org/jira/browse/SPARK-19976?
>>> Doesn't look like that the jira was not fixed. (e.g. Resolution: "Not a
>>> Problem").
>>> So, is there any suggested workaround?
>>>
>>> Regards,
>>> Mina
>>>
>>> On Mon, Feb 12, 2018 at 3:03 PM, Matthias J. Sax 
>>> wrote:
>>>
 AFAIK, Spark does not pass this config to the consumer on purpose...
 It's not a Kafka issues -- IIRC, there is Spark JIRA ticket for this.

 -Matthias

 On 2/12/18 11:04 AM, Mina Aslani wrote:
> Hi,
>
> I am getting below error
> Caused by: org.apache.kafka.clients.consumer.
>>> OffsetOutOfRangeException:
> Offsets out of range with no configured reset policy for partitions:
> {topic1-0=304337}
> as soon as I submit a spark app to my cluster.
>
> I am using below dependency
> name: 'spark-streaming-kafka-0-10_2.11', version: '2.2.0' And setting
 the
> consumer's reset config(e.g. AUTO_OFFSET_RESET_CONFIG) to "earliest".
> As per https://kafka.apache.org/0110/documentation.html the exception
> should be thrown only when the consumer's reset config has not been set
> (e.g. default=none).
> Wondering what is the cause and how to fix.
>
> Best regards,
> Mina
>


>>>
>>
>


Confluent Replicator

2018-02-12 Thread Tauzell, Dave
Does anybody have any experience with Confluent Replicator?   Has it worked 
well for you?

-Dave



Typo in "Processing with local state"

2018-02-12 Thread Ted Yu
Hi,
In "Kafka the definitive guide", page 257:

to calculate the minimum and average price ...

It seems average should be maximum.

Cheers


Re: org.apache.kafka.clients.consumer.OffsetOutOfRangeException

2018-02-12 Thread Matthias J. Sax
I don't know of any workaround. Maybe ask on the Spark mailing list?

-Matthias

On 2/12/18 1:20 PM, Ted Yu wrote:
> Have you looked at SPARK-19888 ?
> 
> Please give the full stack trace of the exception you saw.
> 
> Cheers
> 
> On Mon, Feb 12, 2018 at 12:38 PM, Mina Aslani  wrote:
> 
>> Hi Matthias,
>> Are you referring to https://issues.apache.org/jira/browse/SPARK-19976?
>> Doesn't look like that the jira was not fixed. (e.g. Resolution: "Not a
>> Problem").
>> So, is there any suggested workaround?
>>
>> Regards,
>> Mina
>>
>> On Mon, Feb 12, 2018 at 3:03 PM, Matthias J. Sax 
>> wrote:
>>
>>> AFAIK, Spark does not pass this config to the consumer on purpose...
>>> It's not a Kafka issues -- IIRC, there is Spark JIRA ticket for this.
>>>
>>> -Matthias
>>>
>>> On 2/12/18 11:04 AM, Mina Aslani wrote:
 Hi,

 I am getting below error
 Caused by: org.apache.kafka.clients.consumer.
>> OffsetOutOfRangeException:
 Offsets out of range with no configured reset policy for partitions:
 {topic1-0=304337}
 as soon as I submit a spark app to my cluster.

 I am using below dependency
 name: 'spark-streaming-kafka-0-10_2.11', version: '2.2.0' And setting
>>> the
 consumer's reset config(e.g. AUTO_OFFSET_RESET_CONFIG) to "earliest".
 As per https://kafka.apache.org/0110/documentation.html the exception
 should be thrown only when the consumer's reset config has not been set
 (e.g. default=none).
 Wondering what is the cause and how to fix.

 Best regards,
 Mina

>>>
>>>
>>
> 





Re: why kafka index file use memory mapped files ,however log file doesn't

2018-02-12 Thread jan
A human-readable log file is likely to have much less activity in it
(it was a year ago I was using kafka and we could eat up gigs for the
data files but the log files were a few meg). So there's perhaps
little to gain.

Also if the power isn't pulled and the OS doesn't crash, log messages
will be, I guess, buffered by the OS then written out as a full
buffer, or perhaps every nth tick if the buffer fills up very slowly.
So it's still reasonably efficient.

Adding a few hundred context switches a second for the human log
probably isn't a big deal. I remember seeing several tens of
thousands/sec  when using kafka (although it was other processes
running on those multicore machines to be fair). I guess logging
overhead is down in the noise, though that's just a guess.

Also I remember reading a rather surprising post about mmapping. Just
found it .
Sniplets:
"There are major hardware related overheads to the use of mmap(), on
*ANY* operating system, that cannot be circumvented"
-and-
"you are assuming that copying is always bad (it isn't), that copying
is always horrendously expensive (it isn't), that memory mapping is
always cheap (it isn't cheap),"

A bit vague on my part, but HTH anyway

jan


On 12/02/2018, YuFeng Shen  wrote:
> Hi jan ,
>
> I think the reason is the same as why index file using  memory mapped file.
>
> As the memory mapped file can avoid the data copy between user and kernel
> buffer space, so it can improve the performance for the index file IO
> operation ,right? If it is ,why Log file cannot achieve the same performance
> improvement as memory mapped index file?
>
>
> Jacky
>
>
> 
> From: jan 
> Sent: Saturday, February 10, 2018 8:33 PM
> To: users@kafka.apache.org
> Subject: Re: why kafka index file use memory mapped files ,however log file
> doesn't
>
> I'm not sure I can answer your question, but may I pose another in
> return: why do you feel having a memory mapped log file would be a
> good thing?
>
>
> On 09/02/2018, YuFeng Shen  wrote:
>> Hi Experts,
>>
>> We know that kafka use memory mapped files for it's index files ,however
>> it's log files don't use the memory mapped files technology.
>>
>> May I know why index files use memory mapped files, however log files
>> don't
>> use the same technology?
>>
>>
>> Jacky
>>
>


Re: org.apache.kafka.clients.consumer.OffsetOutOfRangeException

2018-02-12 Thread Ted Yu
Have you looked at SPARK-19888 ?

Please give the full stack trace of the exception you saw.

Cheers

On Mon, Feb 12, 2018 at 12:38 PM, Mina Aslani  wrote:

> Hi Matthias,
> Are you referring to https://issues.apache.org/jira/browse/SPARK-19976?
> Doesn't look like that the jira was not fixed. (e.g. Resolution: "Not a
> Problem").
> So, is there any suggested workaround?
>
> Regards,
> Mina
>
> On Mon, Feb 12, 2018 at 3:03 PM, Matthias J. Sax 
> wrote:
>
> > AFAIK, Spark does not pass this config to the consumer on purpose...
> > It's not a Kafka issues -- IIRC, there is Spark JIRA ticket for this.
> >
> > -Matthias
> >
> > On 2/12/18 11:04 AM, Mina Aslani wrote:
> > > Hi,
> > >
> > > I am getting below error
> > > Caused by: org.apache.kafka.clients.consumer.
> OffsetOutOfRangeException:
> > > Offsets out of range with no configured reset policy for partitions:
> > > {topic1-0=304337}
> > > as soon as I submit a spark app to my cluster.
> > >
> > > I am using below dependency
> > > name: 'spark-streaming-kafka-0-10_2.11', version: '2.2.0' And setting
> > the
> > > consumer's reset config(e.g. AUTO_OFFSET_RESET_CONFIG) to "earliest".
> > > As per https://kafka.apache.org/0110/documentation.html the exception
> > > should be thrown only when the consumer's reset config has not been set
> > > (e.g. default=none).
> > > Wondering what is the cause and how to fix.
> > >
> > > Best regards,
> > > Mina
> > >
> >
> >
>


Causes of partition leader re-election

2018-02-12 Thread Sunil Parmar
Environment : Cloudera Kafka 0.9
Centos 6.6

We noticed that every Friday our Kafka partitions' leaders get re-elected.
This causes temporary issues in producers and consumers. Has anybody
experienced similar behavior, and can you share what can cause partition leader
re-election? This will help us debug this partition leader re-election.

Sunil Parmar


Re: [VOTE] 1.0.1 RC1

2018-02-12 Thread Ted Yu
+1

Ran test suite which passed.

BTW it seems the staging repo hasn't been updated yet:

https://repository.apache.org/content/groups/staging/org/apache/kafka/kafka-clients/

On Mon, Feb 12, 2018 at 10:16 AM, Ewen Cheslack-Postava 
wrote:

> And of course I'm +1 since I've already done normal release validation
> before posting this.
>
> -Ewen
>
> On Mon, Feb 12, 2018 at 10:15 AM, Ewen Cheslack-Postava  >
> wrote:
>
> > Hello Kafka users, developers and client-developers,
> >
> > This is the second candidate for release of Apache Kafka 1.0.1.
> >
> > This is a bugfix release for the 1.0 branch that was first released with
> > 1.0.0 about 3 months ago. We've fixed 49 significant issues since that
> > release. Most of these are non-critical, but in aggregate these fixes
> will
> > have significant impact. A few of the more significant fixes include:
> >
> > * KAFKA-6277: Make loadClass thread-safe for class loaders of Connect
> > plugins
> > * KAFKA-6185: Selector memory leak with high likelihood of OOM in case of
> > down conversion
> > * KAFKA-6269: KTable state restore fails after rebalance
> > * KAFKA-6190: GlobalKTable never finishes restoring when consuming
> > transactional messages
> > * KAFKA-6529: Stop file descriptor leak when client disconnects with
> > staged receives
> >
> > Release notes for the 1.0.1 release:
> > http://home.apache.org/~ewencp/kafka-1.0.1-rc1/RELEASE_NOTES.html
> >
> > *** Please download, test and vote by Thursday, Feb 15, 5pm PT ***
> >
> > Kafka's KEYS file containing PGP keys we use to sign the release:
> > http://kafka.apache.org/KEYS
> >
> > * Release artifacts to be voted upon (source and binary):
> > http://home.apache.org/~ewencp/kafka-1.0.1-rc1/
> >
> > * Maven artifacts to be voted upon:
> > https://repository.apache.org/content/groups/staging/
> >
> > * Javadoc:
> > http://home.apache.org/~ewencp/kafka-1.0.1-rc1/javadoc/
> >
> > * Tag to be voted upon (off 1.0 branch) is the 1.0.1 tag:
> > https://github.com/apache/kafka/tree/1.0.1-rc1
> >
> > * Documentation:
> > http://kafka.apache.org/10/documentation.html
> >
> > * Protocol:
> > http://kafka.apache.org/10/protocol.html
> >
> >
> > Thanks,
> > Ewen Cheslack-Postava
> >
>


Re: org.apache.kafka.clients.consumer.OffsetOutOfRangeException

2018-02-12 Thread Mina Aslani
Hi Matthias,
Are you referring to https://issues.apache.org/jira/browse/SPARK-19976?
It looks like the jira was not fixed (Resolution: "Not a Problem").
So, is there any suggested workaround?

Regards,
Mina


On Mon, Feb 12, 2018 at 3:03 PM, Matthias J. Sax 
wrote:

> AFAIK, Spark does not pass this config to the consumer on purpose...
> It's not a Kafka issues -- IIRC, there is Spark JIRA ticket for this.
>
> -Matthias
>
> On 2/12/18 11:04 AM, Mina Aslani wrote:
> > Hi,
> >
> > I am getting below error
> > Caused by: org.apache.kafka.clients.consumer.OffsetOutOfRangeException:
> > Offsets out of range with no configured reset policy for partitions:
> > {topic1-0=304337}
> > as soon as I submit a spark app to my cluster.
> >
> > I am using below dependency
> > name: 'spark-streaming-kafka-0-10_2.11', version: '2.2.0' And setting
> the
> > consumer's reset config(e.g. AUTO_OFFSET_RESET_CONFIG) to "earliest".
> > As per https://kafka.apache.org/0110/documentation.html the exception
> > should be thrown only when the consumer's reset config has not been set
> > (e.g. default=none).
> > Wondering what is the cause and how to fix.
> >
> > Best regards,
> > Mina
> >
>
>


Re: org.apache.kafka.clients.consumer.OffsetOutOfRangeException

2018-02-12 Thread Mina Aslani
Hi Matthias,
Are you referring to https://issues.apache.org/jira/browse/SPARK-19976?
It doesn't look like the jira was fixed (Resolution: "Not a Problem").
So, is there any suggested workaround?

Regards,
Mina

On Mon, Feb 12, 2018 at 3:03 PM, Matthias J. Sax 
wrote:

> AFAIK, Spark does not pass this config to the consumer on purpose...
> It's not a Kafka issues -- IIRC, there is Spark JIRA ticket for this.
>
> -Matthias
>
> On 2/12/18 11:04 AM, Mina Aslani wrote:
> > Hi,
> >
> > I am getting below error
> > Caused by: org.apache.kafka.clients.consumer.OffsetOutOfRangeException:
> > Offsets out of range with no configured reset policy for partitions:
> > {topic1-0=304337}
> > as soon as I submit a spark app to my cluster.
> >
> > I am using below dependency
> > name: 'spark-streaming-kafka-0-10_2.11', version: '2.2.0' And setting
> the
> > consumer's reset config(e.g. AUTO_OFFSET_RESET_CONFIG) to "earliest".
> > As per https://kafka.apache.org/0110/documentation.html the exception
> > should be thrown only when the consumer's reset config has not been set
> > (e.g. default=none).
> > Wondering what is the cause and how to fix.
> >
> > Best regards,
> > Mina
> >
>
>


Re: org.apache.kafka.clients.consumer.OffsetOutOfRangeException

2018-02-12 Thread Matthias J. Sax
AFAIK, Spark does not pass this config to the consumer on purpose...
It's not a Kafka issue -- IIRC, there is a Spark JIRA ticket for this.

-Matthias

On 2/12/18 11:04 AM, Mina Aslani wrote:
> Hi,
> 
> I am getting below error
> Caused by: org.apache.kafka.clients.consumer.OffsetOutOfRangeException:
> Offsets out of range with no configured reset policy for partitions:
> {topic1-0=304337}
> as soon as I submit a spark app to my cluster.
> 
> I am using below dependency
> name: 'spark-streaming-kafka-0-10_2.11', version: '2.2.0' And setting the
> consumer's reset config(e.g. AUTO_OFFSET_RESET_CONFIG) to "earliest".
> As per https://kafka.apache.org/0110/documentation.html the exception
> should be thrown only when the consumer's reset config has not been set
> (e.g. default=none).
> Wondering what is the cause and how to fix.
> 
> Best regards,
> Mina
> 





org.apache.kafka.clients.consumer.OffsetOutOfRangeException

2018-02-12 Thread Mina Aslani
Hi,

I am getting below error
Caused by: org.apache.kafka.clients.consumer.OffsetOutOfRangeException:
Offsets out of range with no configured reset policy for partitions:
{topic1-0=304337}
as soon as I submit a spark app to my cluster.

I am using the below dependency:
name: 'spark-streaming-kafka-0-10_2.11', version: '2.2.0', and I am setting the
consumer's reset config (e.g. AUTO_OFFSET_RESET_CONFIG) to "earliest".
As per https://kafka.apache.org/0110/documentation.html, the exception
should be thrown only when the consumer's reset config has not been set
(e.g. default=none).
Wondering what the cause is and how to fix it.
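
For reference, the consumer params are set roughly like this (simplified sketch;
broker address and group id are placeholders):

import java.util.HashMap;
import java.util.Map;
import org.apache.kafka.clients.consumer.ConsumerConfig;

Map<String, Object> kafkaParams = new HashMap<>();
kafkaParams.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "broker:9092");
kafkaParams.put(ConsumerConfig.GROUP_ID_CONFIG, "my-app");
// The reset config mentioned above, set explicitly to "earliest".
kafkaParams.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");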

Best regards,
Mina


Re: question on serialization ..

2018-02-12 Thread Matthias J. Sax
Each operator that needs to use a Serde has an according overload
method that allows you to overwrite the Serde. If you don't overwrite
it, the operator uses the Serde from the config.

> If one gets the default
>> serializer wrong then she gets run time errors in serialization /
>> de-serialization (ClassCastException etc.)

Default Serdes are helpful if you use a generic format like Avro
throughout the whole topology. If you have many different types, it might
be better to set default Serdes to `null` and set the Serde for each
operator individually.
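
For example, a rough sketch of per-operator Serdes using the 1.0 API (topic
names and the aggregation are made up):

import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.streams.Consumed;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.kstream.KStream;
import org.apache.kafka.streams.kstream.Materialized;
import org.apache.kafka.streams.kstream.Produced;
import org.apache.kafka.streams.kstream.Serialized;

StreamsBuilder builder = new StreamsBuilder();

// Serdes set explicitly when reading the topic, instead of relying on the defaults.
KStream<String, Long> input = builder.stream(
    "clicks-topic", Consumed.with(Serdes.String(), Serdes.Long()));

input
    // Serdes for the repartition topic that groupByKey may create.
    .groupByKey(Serialized.with(Serdes.String(), Serdes.Long()))
    // Serdes for the state store / changelog used by reduce.
    .reduce((a, b) -> a + b, Materialized.with(Serdes.String(), Serdes.Long()))
    .toStream()
    // Serdes for the output topic.
    .to("clicks-per-key", Produced.with(Serdes.String(), Serdes.Long()));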


-Matthias

On 2/12/18 2:16 AM, Debasish Ghosh wrote:
> Thanks a lot for the clear answer.
> 
> One of the concerns that I have is that it's not always obvious when the
> default serializers are used. e.g. it looks like KGroupedStream#reduce also
> uses the default serializer under the hood. If one gets the default
> serializer wrong then she gets run time errors in serialization /
> de-serialization (ClassCastException etc.), which are quite hard to track
> down.
> 
> On Mon, Feb 12, 2018 at 4:52 AM, Matthias J. Sax 
> wrote:
> 
>> For stream-table-join, only the table is (de)serialized, the stream-side
>> in only piped through and does lookups into the table.
>>
>> And when reading the stream
>> (https://github.com/confluentinc/kafka-streams-
>> examples/blob/3.3.x/src/test/scala/io/confluent/examples/streams/
>> StreamToTableJoinScalaIntegrationTest.scala#L129)
>> the Serdes from the config are overwritten by parameters passed into
>> `#stream()`
>>
>> The default Serdes are used when reading/writing from/to a topic/store
>> (including repartition or changelog) and if the operator does not
>> overwrite the default Serdes via passed-in parameters.
>>
>>
>> -Matthias
>>
>> On 2/10/18 10:34 PM, Debasish Ghosh wrote:
>>> The inputs to the leftJoin are the stream with [String, Long] and the
>> table
>>> with [String, String]. Is the default serializer (I mean from the config)
>>> used for [String, String] ? Then how does the [String, Long]
>> serialization
>>> work ?
>>>
>>> I guess the basic issue that I am trying to understand is how the default
>>> serialisers (stringSerde, stringSerde) registered in config used for
>>> serialising the inputs of leftJoin ..
>>>
>>> regards.
>>>
>>> On Sun, 11 Feb 2018 at 8:53 AM, Matthias J. Sax 
>>> wrote:
>>>
 userClicksJoinRegion is never serialized...

 It the result of the join and the join only (de)serializes its input in
 the internal stores.

 The output it forwarded in-memory to a consecutive map and return
 `clicksByRegion` that is [String,Long].


 -Matthias

 On 2/10/18 1:17 PM, Ted Yu wrote:
> Please read the javadoc:
>
 https://github.com/apache/kafka/blob/trunk/streams/src/
>> main/java/org/apache/kafka/streams/Consumed.java
>
> and correlate with the sample code.
>
> Thanks
>
> On Sat, Feb 10, 2018 at 1:10 PM, Debasish Ghosh <
 ghosh.debas...@gmail.com>
> wrote:
>
>> Looking at
>> https://github.com/confluentinc/kafka-streams-
>> examples/blob/3.3.x/src/test/scala/io/confluent/examples/streams/
>> StreamToTableJoinScalaIntegrationTest.scala#L148,
>> it seems that the leftJoin generates a KStream[String, (String,
>> Long)],
>> which means the value is a tuple of (String, Long) .. I am not able to
 get
>> how this will serialize/de-serialize with the default serializers
>> which
 are
>> both stringSerde for keys and values.
>>
>> or am I missing something ?
>>
>> regards.
>>
>> On Sun, Feb 11, 2018 at 2:30 AM, Ted Yu  wrote:
>>
>>> If I read the code correctly, the operation on this line prepares the
>> input
>>> for the (stringSerde, stringSerde) specified on line 142:
>>>
>>>   .leftJoin(userRegionsTable, (clicks: Long, region: String) =>
>> (if
>>> (region == null) "UNKNOWN" else region, clicks))
>>>
>>> FYI
>>>
>>> On Sat, Feb 10, 2018 at 11:00 AM, Debasish Ghosh <
>> ghosh.debas...@gmail.com

>>> wrote:
>>>
 Hi -

 I was going through this example at
 https://github.com/confluentinc/kafka-streams-
 examples/blob/3.3.x/src/test/scala/io/confluent/examples/streams/
 StreamToTableJoinScalaIntegrationTest.scala,
 especially the leftJoin part
 https://github.com/confluentinc/kafka-streams-
 examples/blob/3.3.x/src/test/scala/io/confluent/examples/streams/
 StreamToTableJoinScalaIntegrationTest.scala#L156.
 This leftJoin returns KStream[String, (String, Long)], while default
 serializers are String for both key and value as in
 https://github.com/confluentinc/kafka-streams-
 examples/blob/3.3.x/src/test/scala/io/confluent/examples/streams/
 StreamToTableJoinScalaIntegrationTest.scala#L112-L113.

Re: [VOTE] 1.0.1 RC1

2018-02-12 Thread Ewen Cheslack-Postava
And of course I'm +1 since I've already done normal release validation
before posting this.

-Ewen

On Mon, Feb 12, 2018 at 10:15 AM, Ewen Cheslack-Postava 
wrote:

> Hello Kafka users, developers and client-developers,
>
> This is the second candidate for release of Apache Kafka 1.0.1.
>
> This is a bugfix release for the 1.0 branch that was first released with
> 1.0.0 about 3 months ago. We've fixed 49 significant issues since that
> release. Most of these are non-critical, but in aggregate these fixes will
> have significant impact. A few of the more significant fixes include:
>
> * KAFKA-6277: Make loadClass thread-safe for class loaders of Connect
> plugins
> * KAFKA-6185: Selector memory leak with high likelihood of OOM in case of
> down conversion
> * KAFKA-6269: KTable state restore fails after rebalance
> * KAFKA-6190: GlobalKTable never finishes restoring when consuming
> transactional messages
> * KAFKA-6529: Stop file descriptor leak when client disconnects with
> staged receives
>
> Release notes for the 1.0.1 release:
> http://home.apache.org/~ewencp/kafka-1.0.1-rc1/RELEASE_NOTES.html
>
> *** Please download, test and vote by Thursday, Feb 15, 5pm PT ***
>
> Kafka's KEYS file containing PGP keys we use to sign the release:
> http://kafka.apache.org/KEYS
>
> * Release artifacts to be voted upon (source and binary):
> http://home.apache.org/~ewencp/kafka-1.0.1-rc1/
>
> * Maven artifacts to be voted upon:
> https://repository.apache.org/content/groups/staging/
>
> * Javadoc:
> http://home.apache.org/~ewencp/kafka-1.0.1-rc1/javadoc/
>
> * Tag to be voted upon (off 1.0 branch) is the 1.0.1 tag:
> https://github.com/apache/kafka/tree/1.0.1-rc1
>
> * Documentation:
> http://kafka.apache.org/10/documentation.html
>
> * Protocol:
> http://kafka.apache.org/10/protocol.html
>
>
> Thanks,
> Ewen Cheslack-Postava
>


[VOTE] 1.0.1 RC1

2018-02-12 Thread Ewen Cheslack-Postava
Hello Kafka users, developers and client-developers,

This is the second candidate for release of Apache Kafka 1.0.1.

This is a bugfix release for the 1.0 branch that was first released with
1.0.0 about 3 months ago. We've fixed 49 significant issues since that
release. Most of these are non-critical, but in aggregate these fixes will
have significant impact. A few of the more significant fixes include:

* KAFKA-6277: Make loadClass thread-safe for class loaders of Connect
plugins
* KAFKA-6185: Selector memory leak with high likelihood of OOM in case of
down conversion
* KAFKA-6269: KTable state restore fails after rebalance
* KAFKA-6190: GlobalKTable never finishes restoring when consuming
transactional messages
* KAFKA-6529: Stop file descriptor leak when client disconnects with staged
receives

Release notes for the 1.0.1 release:
http://home.apache.org/~ewencp/kafka-1.0.1-rc1/RELEASE_NOTES.html

*** Please download, test and vote by Thursday, Feb 15, 5pm PT ***

Kafka's KEYS file containing PGP keys we use to sign the release:
http://kafka.apache.org/KEYS

* Release artifacts to be voted upon (source and binary):
http://home.apache.org/~ewencp/kafka-1.0.1-rc1/

* Maven artifacts to be voted upon:
https://repository.apache.org/content/groups/staging/

* Javadoc:
http://home.apache.org/~ewencp/kafka-1.0.1-rc1/javadoc/

* Tag to be voted upon (off 1.0 branch) is the 1.0.1 tag:
https://github.com/apache/kafka/tree/1.0.1-rc1

* Documentation:
http://kafka.apache.org/10/documentation.html

* Protocol:
http://kafka.apache.org/10/protocol.html


Thanks,
Ewen Cheslack-Postava


Re: why kafka index file use memory mapped files ,however log file doesn't

2018-02-12 Thread Vincent Dautremont
Just a guess : wouldn't it be because the log files on disk can be made of
compressed data when produced but needs to be uncompressed on consumption
(of a single message) ?

2018-02-12 15:50 GMT+01:00 YuFeng Shen :

> Hi jan ,
>
> I think the reason is the same as why index file using  memory mapped file.
>
> As the memory mapped file can avoid the data copy between user and kernel
> buffer space, so it can improve the performance for the index file IO
> operation ,right? If it is ,why Log file cannot achieve the same
> performance improvement as memory mapped index file?
>
>
> Jacky
>
>
> 
> From: jan 
> Sent: Saturday, February 10, 2018 8:33 PM
> To: users@kafka.apache.org
> Subject: Re: why kafka index file use memory mapped files ,however log
> file doesn't
>
> I'm not sure I can answer your question, but may I pose another in
> return: why do you feel having a memory mapped log file would be a
> good thing?
>
>
> On 09/02/2018, YuFeng Shen  wrote:
> > Hi Experts,
> >
> > We know that kafka use memory mapped files for it's index files ,however
> > it's log files don't use the memory mapped files technology.
> >
> > May I know why index files use memory mapped files, however log files
> don't
> > use the same technology?
> >
> >
> > Jacky
> >
>



Re: why kafka index file use memory mapped files ,however log file doesn't

2018-02-12 Thread YuFeng Shen
Hi jan ,

I think the reason is the same as why the index file uses a memory mapped file.

As the memory mapped file can avoid the data copy between user and kernel 
buffer space, it can improve the performance of the index file IO operations, 
right? If so, why can't the log file achieve the same performance improvement 
as the memory mapped index file?


Jacky



From: jan 
Sent: Saturday, February 10, 2018 8:33 PM
To: users@kafka.apache.org
Subject: Re: why kafka index file use memory mapped files ,however log file 
doesn't

I'm not sure I can answer your question, but may I pose another in
return: why do you feel having a memory mapped log file would be a
good thing?


On 09/02/2018, YuFeng Shen  wrote:
> Hi Experts,
>
> We know that kafka use memory mapped files for it's index files ,however
> it's log files don't use the memory mapped files technology.
>
> May I know why index files use memory mapped files, however log files don't
> use the same technology?
>
>
> Jacky
>


Re: Cross-cluster mirror making

2018-02-12 Thread Andrew Otto
Hi Husna,

The trouble with cross cluster Kafka use is unpredictable network latency.
If a consumer encounters high latency, it will just lag, but (hopefully)
eventually it will catch up.  If a producer encounters high latency, it
will have to buffer messages locally until those messages are ACKed by the
Kafka cluster.  If the buffer gets too full, it will have to start dropping
messages.

- Andrew Otto

On Thu, Feb 8, 2018 at 5:21 PM, Husna Hadi  wrote:

> Hi, I read on The Definitive Guide to Kafka that when using cross-cluster
> kafka mirroring, when possible, consume from a remote datacenter rather
> than produce to a
> remote datacenter. Why is that the case?
>
> -Husna
>
>


Re: question on serialization ..

2018-02-12 Thread Debasish Ghosh
Thanks a lot for the clear answer.

One of the concerns that I have is that it's not always obvious when the
default serializers are used. e.g. it looks like KGroupedStream#reduce also
uses the default serializer under the hood. If one gets the default
serializer wrong then she gets run time errors in serialization /
de-serialization (ClassCastException etc.), which are quite hard to track
down.

On Mon, Feb 12, 2018 at 4:52 AM, Matthias J. Sax 
wrote:

> For stream-table-join, only the table is (de)serialized, the stream-side
> in only piped through and does lookups into the table.
>
> And when reading the stream
> (https://github.com/confluentinc/kafka-streams-
> examples/blob/3.3.x/src/test/scala/io/confluent/examples/streams/
> StreamToTableJoinScalaIntegrationTest.scala#L129)
> the Serdes from the config are overwritten by parameters passed into
> `#stream()`
>
> The default Serdes are used when reading/writing from/to a topic/store
> (including repartition or changelog) and if the operator does not
> overwrite the default Serdes via passed-in parameters.
>
>
> -Matthias
>
> On 2/10/18 10:34 PM, Debasish Ghosh wrote:
> > The inputs to the leftJoin are the stream with [String, Long] and the
> table
> > with [String, String]. Is the default serializer (I mean from the config)
> > used for [String, String] ? Then how does the [String, Long]
> serialization
> > work ?
> >
> > I guess the basic issue that I am trying to understand is how the default
> > serialisers (stringSerde, stringSerde) registered in config used for
> > serialising the inputs of leftJoin ..
> >
> > regards.
> >
> > On Sun, 11 Feb 2018 at 8:53 AM, Matthias J. Sax 
> > wrote:
> >
> >> userClicksJoinRegion is never serialized...
> >>
> >> It the result of the join and the join only (de)serializes its input in
> >> the internal stores.
> >>
> >> The output it forwarded in-memory to a consecutive map and return
> >> `clicksByRegion` that is [String,Long].
> >>
> >>
> >> -Matthias
> >>
> >> On 2/10/18 1:17 PM, Ted Yu wrote:
> >>> Please read the javadoc:
> >>>
> >> https://github.com/apache/kafka/blob/trunk/streams/src/
> main/java/org/apache/kafka/streams/Consumed.java
> >>>
> >>> and correlate with the sample code.
> >>>
> >>> Thanks
> >>>
> >>> On Sat, Feb 10, 2018 at 1:10 PM, Debasish Ghosh <
> >> ghosh.debas...@gmail.com>
> >>> wrote:
> >>>
>  Looking at
>  https://github.com/confluentinc/kafka-streams-
>  examples/blob/3.3.x/src/test/scala/io/confluent/examples/streams/
>  StreamToTableJoinScalaIntegrationTest.scala#L148,
>  it seems that the leftJoin generates a KStream[String, (String,
> Long)],
>  which means the value is a tuple of (String, Long) .. I am not able to
> >> get
>  how this will serialize/de-serialize with the default serializers
> which
> >> are
>  both stringSerde for keys and values.
> 
>  or am I missing something ?
> 
>  regards.
> 
>  On Sun, Feb 11, 2018 at 2:30 AM, Ted Yu  wrote:
> 
> > If I read the code correctly, the operation on this line prepares the
>  input
> > for the (stringSerde, stringSerde) specified on line 142:
> >
> >   .leftJoin(userRegionsTable, (clicks: Long, region: String) =>
> (if
> > (region == null) "UNKNOWN" else region, clicks))
> >
> > FYI
> >
> > On Sat, Feb 10, 2018 at 11:00 AM, Debasish Ghosh <
>  ghosh.debas...@gmail.com
> >>
> > wrote:
> >
> >> Hi -
> >>
> >> I was going through this example at
> >> https://github.com/confluentinc/kafka-streams-
> >> examples/blob/3.3.x/src/test/scala/io/confluent/examples/streams/
> >> StreamToTableJoinScalaIntegrationTest.scala,
> >> especially the leftJoin part
> >> https://github.com/confluentinc/kafka-streams-
> >> examples/blob/3.3.x/src/test/scala/io/confluent/examples/streams/
> >> StreamToTableJoinScalaIntegrationTest.scala#L156.
> >> This leftJoin returns KStream[String, (String, Long)], while default
> >> serializers are String for both key and value as in
> >> https://github.com/confluentinc/kafka-streams-
> >> examples/blob/3.3.x/src/test/scala/io/confluent/examples/streams/
> >> StreamToTableJoinScalaIntegrationTest.scala#L112-L113.
> >> My question is how does this serialization work here ? I mean how
> does
> > the
> >> tuple get serialized with the default serializers ? And leftJoin
> only
> > works
> >> with default serializers ..
> >>
> >> regards.
> >>
> >> --
> >> Debasish Ghosh
> >> http://manning.com/ghosh2
> >> http://manning.com/ghosh
> >>
> >> Twttr: @debasishg
> >> Blog: http://debasishg.blogspot.com
> >> Code: http://github.com/debasishg
> >>
> >
> 
> 
> 
>  --
>  Debasish Ghosh
>  http://manning.com/ghosh2
>  http://manning.com/ghosh
> 
>  Twttr: @debasishg
>