Failed to clean Kafka Streams state directory due to NFS silly rename

2018-06-07 Thread TSANG, Brilly
Hi all,

Is anyone having issues with NFS silly renames affecting the Kafka Streams state directory?

It seems most of the related issues in JIRA were resolved as of 0.10, but I'm 
currently using 1.0.0 and am still hitting this. (KAFKA-4392, KAFKA-5070)

06:17:42,741 ERROR 31 [StateDirectory] stream-thread,  Failed to lock the state directory due to an unexpected exception
java.nio.file.FileSystemException: /opt/kafkadata/kafkaStreams/benchmark/0_4/.nfs003a871f607e: Device or resource busy
    at sun.nio.fs.UnixException.translateToIOException(UnixException.java:91) ~[?:1.8.0_40]
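[Editor's note: a workaround commonly suggested for this class of failure, offered as a sketch rather than a confirmed fix, is to keep the Streams state directory on a local, non-NFS filesystem. The path and bootstrap address below are assumptions.]

// Sketch: put state.dir on local disk so the state lock files never hit NFS silly renames.
Properties config = new Properties();
config.put(StreamsConfig.APPLICATION_ID_CONFIG, "benchmark");
config.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");  // assumption
config.put(StreamsConfig.STATE_DIR_CONFIG, "/var/local/kafka-streams");  // local filesystem, not an NFS mount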

Regards,
Brilly







RE: Kafka Stream tuning.

2018-02-13 Thread TSANG, Brilly
I have also checked the commit-latency-avg; it's around 23 ms per commit. 
That translates to about the same throughput I'm getting now 
(0.04 messages/ms). Does anyone have a benchmark for Kafka Streams' 
commit-latency-avg? Is it possible to tune it to be faster? I just want to 
verify whether this is the expected latency limit, in which case we will 
have to scale horizontally with more partitions and stream processes when 
the input throughput is higher.

A side question: would a custom consumer/producer implementation be faster 
than the default Kafka Streams implementation?
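[Editor's note: a sketch for reading commit-latency-avg from a running application's own metrics; "streams" is assumed to be an already-started KafkaStreams instance.]

import java.util.Map;
import org.apache.kafka.common.Metric;
import org.apache.kafka.common.MetricName;
import org.apache.kafka.streams.KafkaStreams;

public class CommitLatencyProbe {
    // Prints the average commit latency (in ms) reported by each stream thread.
    static void printCommitLatency(KafkaStreams streams) {
        Map<MetricName, ? extends Metric> metrics = streams.metrics();
        for (Map.Entry<MetricName, ? extends Metric> e : metrics.entrySet()) {
            if ("commit-latency-avg".equals(e.getKey().name())) {
                System.out.println(e.getKey().tags() + " -> " + e.getValue().value());
            }
        }
    }
}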

Regards,
Brilly

-Original Message-
From: TSANG, Brilly [mailto:brilly.ts...@hk.daiwacm.com]
Sent: Wednesday, February 14, 2018 11:01 AM
To: users@kafka.apache.org
Subject: RE: Kafka Stream tuning.

Hey Damian and folks,

I've also tried 1000 and 500, and the performance is exactly the same.  
Any other ideas?

Regards,
Brilly

-Original Message-
From: Damian Guy [mailto:damian@gmail.com]
Sent: Tuesday, February 13, 2018 4:48 PM
To: users@kafka.apache.org
Subject: Re: Kafka Stream tuning.

Hi Brilly,

My initial guess is that it is the overhead of committing. Commit is 
synchronous and you have the commit interval set to 50ms. Perhaps try 
increasing it.
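[Editor's note: for example, as a sketch; the exact value is an assumption, not from this thread.]

// commit once per second instead of every 50 ms
config.put(StreamsConfig.COMMIT_INTERVAL_MS_CONFIG, 1000);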

Thanks,
Damian

On Tue, 13 Feb 2018 at 07:49 TSANG, Brilly <brilly.ts...@hk.daiwacm.com>
wrote:

> Hi kafka users,
>
> I created a filtering stream with the Processor API; the input topic
> has an input rate of ~5 records per millisecond.  The filtering function
> takes 0.05 ms on average to complete, which in the ideal case would
> translate to 20 records per millisecond (1/0.05).  However, when I
> benchmark the whole process, the stream is only processing 0.05
> records per millisecond.
>
> Does anyone have any idea how to tune the streaming system to be
> faster, as 0.05 records/ms is very far from the theoretical max of 20?
> The results above are per partition; I have 16 partitions for the
> input topic and all partitions show similar throughput.
>
> I've only set the streams to have the following config:
> Properties config = new Properties();
> config.put(StreamsConfig.APPLICATION_ID_CONFIG, appId);
> config.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrap);
> config.put(StreamsConfig.STATE_DIR_CONFIG, stateDir);
> config.put(StreamsConfig.COMMIT_INTERVAL_MS_CONFIG, 50);
>
> I'm not defining a TimestampExtractor, so the default one is used.
>
> Thanks for any help in advance.
>
> Regards,
> Brilly






Kafka Stream tuning.

2018-02-12 Thread TSANG, Brilly
Hi kafka users,

I created a filtering stream with the Processor API; the input topic has an 
input rate of ~5 records per millisecond.  The filtering function takes 
0.05 ms on average to complete, which in the ideal case would translate to 
20 records per millisecond (1/0.05).  However, when I benchmark the whole 
process, the stream is only processing 0.05 records per millisecond.

Does anyone have any idea how to tune the streaming system to be faster, as 
0.05 records/ms is very far from the theoretical max of 20?  The results 
above are per partition; I have 16 partitions for the input topic and all 
partitions show similar throughput.

I've only set the streams to have the following config:
Properties config = new Properties();
config.put(StreamsConfig.APPLICATION_ID_CONFIG, appId);
config.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrap);
config.put(StreamsConfig.STATE_DIR_CONFIG, stateDir);
config.put(StreamsConfig.COMMIT_INTERVAL_MS_CONFIG, 50);

I'm not defining a TimestampExtractor, so the default one is used.
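[Editor's note: a sketch of what such a Processor API filter can look like on 1.0.x; the topic names and the filter predicate are placeholders, not taken from this thread.]

import org.apache.kafka.common.serialization.StringDeserializer;
import org.apache.kafka.common.serialization.StringSerializer;
import org.apache.kafka.streams.Topology;
import org.apache.kafka.streams.processor.AbstractProcessor;

public class FilterTopology {
    public static Topology build() {
        Topology topology = new Topology();
        topology.addSource("source", new StringDeserializer(), new StringDeserializer(), "input-topic");
        topology.addProcessor("filter", () -> new AbstractProcessor<String, String>() {
            @Override
            public void process(String key, String value) {
                // stand-in predicate: forward only records that pass the filter
                if (value != null && !value.isEmpty()) {
                    context().forward(key, value);
                }
            }
        }, "source");
        topology.addSink("sink", "output-topic", new StringSerializer(), new StringSerializer(), "filter");
        return topology;  // then: new KafkaStreams(build(), config).start();
    }
}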

Thanks for any help in advance.

Regards,
Brilly







RE: NoClassDefFoundError: Could not initialize class com.yammer.metrics.Metrics

2018-01-31 Thread TSANG, Brilly
Just make sure metrics-core-x.x.x.jar is on your classpath.  That jar should 
be in the libs/ directory of your Kafka distribution.

I am using kafka_2.11-1.0.0, so I don't have the exact metrics-core version 
number for you.
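[Editor's note: a quick way to verify, as a sketch, is to load the class directly and print which jar it came from.]

import java.net.URL;

public class MetricsClasspathCheck {
    public static void main(String[] args) throws Exception {
        Class<?> c = Class.forName("com.yammer.metrics.Metrics");
        // getCodeSource() can be null for bootstrap classes, but not for a jar on the app classpath
        URL location = c.getProtectionDomain().getCodeSource().getLocation();
        System.out.println("Loaded " + c.getName() + " from " + location);
    }
}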

Regards,
Brilly

-Original Message-
From: wangchunc...@outlook.com
Sent: Thursday, February 01, 2018 2:11 PM
To: users; user
Subject: NoClassDefFoundError: Could not initialize class 
com.yammer.metrics.Metrics

Hi,

I am using KafkaSpout to ingest data into Storm. The versions are:

Storm-1.1.0
Storm-kafka 1.1.0
kafka_2.10-0.8.2.2

The program worked well at the beginning, but at some point the KafkaSpout 
threw an exception and the program seemed stuck there afterwards. The program 
can proceed after being restarted, but the exception recurs occasionally.

Below is the log:

java.lang.NoClassDefFoundError: Could not initialize class com.yammer.metrics.Metrics
    at kafka.metrics.KafkaMetricsGroup$class.newTimer(KafkaMetricsGroup.scala:85)
    at kafka.consumer.FetchRequestAndResponseMetrics.newTimer(FetchRequestAndResponseStats.scala:26)
    at kafka.consumer.FetchRequestAndResponseMetrics.<init>(FetchRequestAndResponseStats.scala:35)
    at kafka.consumer.FetchRequestAndResponseStats.<init>(FetchRequestAndResponseStats.scala:46)
    at kafka.consumer.FetchRequestAndResponseStatsRegistry$$anonfun$2.apply(FetchRequestAndResponseStats.scala:59)
    at kafka.consumer.FetchRequestAndResponseStatsRegistry$$anonfun$2.apply(FetchRequestAndResponseStats.scala:59)
    at kafka.utils.Pool.getAndMaybePut(Pool.scala:61)
    at kafka.consumer.FetchRequestAndResponseStatsRegistry$.getFetchRequestAndResponseStats(FetchRequestAndResponseStats.scala:63)
    at kafka.consumer.SimpleConsumer.<init>(SimpleConsumer.scala:39)
    at kafka.javaapi.consumer.SimpleConsumer.<init>(SimpleConsumer.scala:34)
    at org.apache.storm.kafka.DynamicPartitionConnections.register(DynamicPartitionConnections.java:60)
    at org.apache.storm.kafka.PartitionManager.<init>(PartitionManager.java:96)
    at org.apache.storm.kafka.ZkCoordinator.refresh(ZkCoordinator.java:108)
    at org.apache.storm.kafka.ZkCoordinator.getMyManagedPartitions(ZkCoordinator.java:69)
    at org.apache.storm.kafka.KafkaSpout.nextTuple(KafkaSpout.java:130)
    at org.apache.storm.daemon.executor$fn__4976$fn__4991$fn__5022.invoke(executor.clj:644)
    at org.apache.storm.util$async_loop$fn__557.invoke(util.clj:484)
    at clojure.lang.AFn.run(AFn.java:22)
    at java.lang.Thread.run(Thread.java:748)

Could anybody tell me why?


wangchunc...@outlook.com







RE: How to always consume from latest offset in kafka-streams

2018-01-22 Thread TSANG, Brilly

If you are doing dynamic assignment (consumer.subscribe), you can try this 
in your code:

import java.util.Collections;
import java.util.Set;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.TopicPartition;

KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);  // key/value types assumed String
consumer.subscribe(Collections.singletonList("your_topic"), this);  // 'this' must implement ConsumerRebalanceListener

// Just so you are connected and TopicPartitions get dynamically assigned to your consumer.
consumer.poll(0);
Set<TopicPartition> assignment = consumer.assignment();

// seekToEnd resets all assigned partitions to the latest offset, and you can poll from there to read.
consumer.seekToEnd(assignment);

// do your regular consumer loop here
...

Regards,
Brilly

-Original Message-
From: Xin Li [mailto:xin...@trivago.com]
Sent: Tuesday, January 23, 2018 3:01 AM
To: users@kafka.apache.org
Subject: Re: How to always consume from latest offset in kafka-streams

This?
consumer.auto.offset.reset = latest

Best,
Xin

On 19.01.18, 19:34, "Saloni Vithalani"  wrote:

Our requirement is such that if a kafka-stream app is consuming a
partition, it should start its consumption from the latest offset of that
partition.

This seems doable using

streamsConfiguration.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "latest")

Now, let's say that with the above configuration the kafka-stream app
started consuming data from the latest offset for a partition. And after
some time, the app crashes. When the app comes back up, we want it to
consume data from the latest offset of that partition, instead of from
where it last left off.

But I can't find anything in the kafka-streams api that can help achieve this.

P.S. We are using kafka-1.0.0.
Saloni Vithalani
Developer
Email salo...@thoughtworks.com
Telephone +91 8552889571
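[Editor's note: one possible approach, sketched rather than confirmed: committed offsets are keyed by the application.id, so an id that has never been used before has nothing to resume from and falls back to auto.offset.reset. A fresh id also abandons any local state and internal topics tied to the old one; the naming scheme below is hypothetical.]

Properties props = new Properties();
// a never-before-seen application.id has no committed offsets to resume from
props.put(StreamsConfig.APPLICATION_ID_CONFIG, "my-app-" + UUID.randomUUID());
// new consumer groups then start from the latest offset
props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "latest");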











RE: can't feed remote broker with producer demo

2018-01-22 Thread TSANG, Brilly
Hi Rotem,

I'm not 100% sure, but you can try setting the listeners property in your 
Kafka directory's config\server.properties and see if that helps.
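[Editor's note: a sketch of what that can look like; PLAINTEXT and the bind address are assumptions, and 172.16.40.125 is taken from the error message below.]

# config\server.properties
# accept connections on all network interfaces, not just localhost
listeners=PLAINTEXT://0.0.0.0:9092
# the address remote clients should use to reach this broker
advertised.listeners=PLAINTEXT://172.16.40.125:9092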

Regards,
Brilly

-Original Message-
From: Rotem Jacobi [mailto:rot...@radcom.com]
Sent: Monday, January 22, 2018 11:16 PM
To: users@kafka.apache.org
Subject: can't feed remote broker with producer demo

Hi,
When running the quickstart guide (producer, broker (with zookeeper) and 
consumer on the same machine) it works perfectly.
When trying to run the producer from another machine to feed the broker I'm 
getting an error message:

C:\Development\kafka_2.11-0.11.0.0\bin\windows>kafka-console-producer.bat --broker-list 172.16.40.125:9092 --topic test
>test_message
>[2018-01-22 17:14:44,240] ERROR Error when sending message to topic test with key: null, value: 12 bytes with error: (org.apache.kafka.clients.producer.internals.ErrorLoggingCallback)
org.apache.kafka.common.errors.TimeoutException: Expiring 1 record(s) for test-0: 2298 ms has passed since batch creation plus linger time

Any idea? Is there any broker configuration that I'm missing here?

Thanks,
Rotem.







Does the Kafka consumer API provide a way to find the partition number of a topic for a given key?

2018-01-21 Thread TSANG, Brilly
Hi folks,


I'm working with a topic that has many messages, and Kafka scales 
horizontally. As a result, when the messages spread out to multiple 
processes, only one process will work with a specific key; the rest are not 
related and should stop processing.

Is there a way, from the client API, to hash a key to its partition number? 
With the partition number, we can stop the other processors from wasting 
cycles and have them start processing other events.
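[Editor's note: for records produced with the default partitioner, yes: the partition is derived by hashing the serialized key with murmur2. A sketch using the public helpers in kafka-clients; the class and method names here are mine, but the formula is the one DefaultPartitioner uses for keyed records.]

import org.apache.kafka.common.utils.Utils;

public class KeyPartition {
    // murmur2 over the serialized key, made non-negative, modulo the partition count
    static int partitionForKey(byte[] serializedKey, int numPartitions) {
        return Utils.toPositive(Utils.murmur2(serializedKey)) % numPartitions;
    }

    public static void main(String[] args) {
        byte[] key = "some-key".getBytes();  // serialize the key exactly as the producer does
        System.out.println(partitionForKey(key, 16));  // e.g. 16 partitions, as in the tuning thread
    }
}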
Regards,
Brilly





