Re: implementing kafka transactions : performance issue

2017-09-19 Thread Hugues . Deslandes


implementing kafka transactions : performance issue

2017-09-18 Thread Hugues . Deslandes
Hi,
I am testing an app with transactions on the producer side of Kafka 
(0.11.0.1). I defined the producer config (see below) and added the 
necessary calls in the app (#initTransactions, #beginTransaction and 
#commitTransaction) around the existing #send.
The problem I am facing is that each transaction takes up to 150 ms to be 
processed, which doesn't make sense, even on a laptop!
I have tested some batch-size configs without any success (messages are 
around 100 bytes).
I certainly made a mistake in the setup but can't figure out which one, or 
how to investigate. I checked by removing the transaction calls and the 
app works fine (in my case less than 200 ms for 100 "send"s to Kafka).

My setup is: 3 VMs on my laptop for the Kafka cluster. My main topic 
has 3 partitions with 3 replicas, and min.insync.replicas is set to 2.


The producer is defined as follows (remaining configs left at their defaults):
final Properties props = new Properties();
props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrap_Servers);
props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG,
        org.apache.kafka.common.serialization.StringSerializer.class);
props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG,
        io.confluent.kafka.serializers.KafkaAvroSerializer.class);
props.put(AbstractKafkaAvroSerDeConfig.SCHEMA_REGISTRY_URL_CONFIG, schema_Registry_URL);

props.put(ProducerConfig.ACKS_CONFIG, "all");
props.put(ProducerConfig.RETRIES_CONFIG, 5);

props.put(ProducerConfig.ENABLE_IDEMPOTENCE_CONFIG, true);
props.put(ProducerConfig.TRANSACTIONAL_ID_CONFIG, transactionnalId);
props.put(ProducerConfig.MAX_IN_FLIGHT_REQUESTS_PER_CONNECTION, 1);

confluentProducer = new KafkaProducer<>(props);
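
For reference, the transaction calls are wrapped around the existing #send 
roughly like this (a minimal sketch, not the actual app code; the record 
list is a placeholder and error handling is reduced to the documented 
fenced/abort pattern):

confluentProducer.initTransactions();   // once, right after creating the producer

try {
    confluentProducer.beginTransaction();
    for (ProducerRecord<String, Object> record : records) {  // messages are ~100 bytes each
        confluentProducer.send(record);
    }
    confluentProducer.commitTransaction();      // ends the transaction
} catch (ProducerFencedException e) {
    // fenced by another producer with the same transactional.id: close, don't abort
    confluentProducer.close();
} catch (KafkaException e) {
    confluentProducer.abortTransaction();       // abort, then the batch can be retried
}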

Any idea what could be wrong? Have I forgotten something?
Thanks
Hugues DESLANDES







Re: State store and input topics with different number of partition

2017-09-05 Thread Hugues . Deslandes
Thanks Matthias for the explanation, that clarifies the behavior.
In my case, I will go with another consumer; the risk of losing some data,
in this particular case, is not a problem.
Regards
Hugues





De :"Matthias J. Sax" 
A : users@kafka.apache.org
Date :  01/09/2017 18:56
Objet : Re: State store and input topics with different number of 
partition



Your observation is correct. Kafka Streams creates a task per partition.
As you have a shared state store over two operators, the tasks of both
input streams need to be merged to ensure co-partitioning.

Thus, task0 reads topic1 partition0 and topic2 partition0, and all other
tasks [1-3] read only topic1 partitions [1-3] (as there are no more
partitions for the second topic).

Using the same number of partitions should be the better solution.

If you introduce a second consumer, KS does not know anything about this
consumer, and thus, cannot control when this consumer commits offsets.
And as you don't know when KS's internal consumer commits offsets, you
cannot align your own offset commits to the KS commits. Long story
short, this would be a problem for fail-over scenarios and might result
in data loss.


-Matthias

On 9/1/17 5:42 AM, hugues.deslan...@hardis.fr wrote:
> Hi,
> 
> I'd like to have your comments on the problem I met while testing my app
> with kafka streams (0.10.2.1).
> Roughly, my stream app has 2 input topics :
> . the first one has 4 partitions (main data)
> . the second one has only one partition and receives messages from time to
> time
> 
> At first, I supposed I had 2 sub-topologies
> A : With the first topic, I build a state store using process() and I also
> have punctuate activated.
> B : The second topic is used to trigger an analysis using the state store
> data with process()
> (both processes use the same kafka topic as sink)
> 
> During tests I realised the content of the state store viewed by this
> process B is only based on data received on partition 0 of the first topic.
> I finally understood the link between those 2 sub-topologies forced the
> system to see it as one unique topology and have only one task by
> partition reading the first and second topic; am I right ?
> 
> I imagined 2 options to solve this issue
> option 1 : replace topology B by a consumer on second topic that will
> trigger a query on statestore
> option 2 : have 4 partitions for topic 2 and write the same message in the
> 4 partitions
> 
> I tested both but not sure which one is better ...
> Do you have any other suggestions or comments
> Thanks in advance.
> 
> Hugues
> 



State store and input topics with different number of partition

2017-09-01 Thread Hugues . Deslandes
Hi,

I'd like to have your comments on a problem I met while testing my app 
with Kafka Streams (0.10.2.1).
Roughly, my stream app has 2 input topics:
. the first one has 4 partitions (main data)
. the second one has only one partition and receives messages from time to 
time

At first, I supposed I had 2 sub-topologies (sketched below):
A : With the first topic, I build a state store using process() and I also 
have punctuate activated.
B : The second topic is used to trigger an analysis using the state store 
data with process()
(both processes use the same kafka topic as sink)
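
The wiring is roughly the following (a minimal sketch against the 0.10.2 
Processor API; the processor classes, store name and topic names are 
placeholders, not the actual code):

TopologyBuilder builder = new TopologyBuilder();

// sub-topology A: build the state store from the 4-partition main topic
builder.addSource("main-source", "main-topic");
builder.addProcessor("store-builder", StoreBuilderProcessor::new, "main-source");

// sub-topology B: trigger the analysis from the 1-partition topic
builder.addSource("trigger-source", "trigger-topic");
builder.addProcessor("analysis", AnalysisProcessor::new, "trigger-source");

// the shared store is connected to both processors
builder.addStateStore(
        Stores.create("my-store").withStringKeys().withStringValues().persistent().build(),
        "store-builder", "analysis");

// both processors write to the same sink topic
builder.addSink("out", "output-topic", "store-builder", "analysis");

KafkaStreams streams = new KafkaStreams(builder, streamsConfig);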

During tests I realised the content of the state store viewed by process B 
is only based on data received on partition 0 of the first topic.
I finally understood that the link between those 2 sub-topologies forces 
the system to see them as one unique topology, with only one task per 
partition reading the first and second topic; am I right?

I imagined 2 options to solve this issue:
option 1 : replace topology B by a plain consumer on the second topic that 
triggers a query on the state store (see the sketch below, after option 2)
option 2 : have 4 partitions for topic 2 and write the same message to all 
4 partitions
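
Option 1 would look roughly like this (a sketch; consumerProps, streams and 
the store/topic names are placeholders carried over from the sketch above, 
and error handling is omitted):

// plain consumer on the second topic; each message triggers a read of the store
// through the interactive-queries API (KafkaStreams#store, available since 0.10.1)
KafkaConsumer<String, String> trigger = new KafkaConsumer<>(consumerProps);
trigger.subscribe(Collections.singletonList("trigger-topic"));

while (true) {
    ConsumerRecords<String, String> triggers = trigger.poll(1000);
    if (triggers.isEmpty()) continue;

    // read-only view over the store partitions hosted by this instance
    ReadOnlyKeyValueStore<String, String> view =
            streams.store("my-store", QueryableStoreTypes.keyValueStore());

    try (KeyValueIterator<String, String> it = view.all()) {
        while (it.hasNext()) {
            KeyValue<String, String> entry = it.next();
            // run the analysis on entry.key / entry.value and produce to the sink topic
        }
    }
}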

I tested both but I am not sure which one is better...
Do you have any other suggestions or comments?
Thanks in advance.

Hugues