Re: Reg. Kafka transactional producer and consumer

2017-11-08 Thread Apurva Mehta
Hi,

Your log segment dump and the producer log don't correlate. The producer
log shows producerId == 4001, but your log segment dump doesn't contain
this producerId. Please share data from the same run in which you
reproduce the issue.

For the producerIds 0-5 shown in the dump, there seem to be no
transaction markers (markers would show a sequence number of -1). So if
your messages from producerId 4001 sit behind these open transactions,
they will never be delivered in read_committed mode.
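
To double-check from the client side that the records are physically in
the log and only held back by the last stable offset, you can fetch the
same partition with read_uncommitted, which ignores the LSO. A rough
sketch in plain Java (bootstrap server and topic are taken from your code
below; the group id and deserializers are just placeholders):

import java.util.Collections;
import java.util.Properties;

import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;

public class ReadUncommittedCheck {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put("bootstrap.servers", "192.168.41.132:9090"); // from your producer config
        props.put("group.id", "lso-debug");                    // placeholder group id
        props.put("auto.offset.reset", "earliest");
        props.put("key.deserializer",
                "org.apache.kafka.common.serialization.IntegerDeserializer");
        props.put("value.deserializer",
                "org.apache.kafka.common.serialization.StringDeserializer"); // shows the raw JSON
        // read_uncommitted (the default) ignores the last stable offset, so
        // records parked behind an open transaction are still returned.
        props.put("isolation.level", "read_uncommitted");

        try (KafkaConsumer<Integer, String> consumer = new KafkaConsumer<>(props)) {
            consumer.subscribe(Collections.singletonList("topic-4"));
            ConsumerRecords<Integer, String> records = consumer.poll(5000);
            System.out.println("Fetched " + records.count() + " records");
            for (ConsumerRecord<Integer, String> record : records) {
                System.out.println(record.offset() + ": " + record.value());
            }
        }
    }
}

If this returns your messages while your read_committed consumer returns
nothing, the records are stuck behind an open transaction as described
above.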

Thanks,
Apurva



Re: Reg. Kafka transactional producer and consumer

2017-11-06 Thread Abhishek Verma
Hi Matthias J. Sax,

Thank you for your suggestions.

I tried the same with Kafka 1.0.0 as well; the same issue occurs.

I am attaching the log segment dump below; please let me know what the problem might be.

Regards,
Abhishek Verma





Dumping .index

offset: 0 position: 0

Dumping .log

Starting offset: 0

baseOffset: 0 lastOffset: 0 baseSequence: 0 lastSequence: 0 producerId: 0 producerEpoch: 0 partitionLeaderEpoch: 0 isTransactional: true position: 0 CreateTime: 1509605714710 isvalid: true size: 103 magic: 2 compresscodec: NONE crc:344974185

baseOffset: 1 lastOffset: 1 baseSequence: 1 lastSequence: 1 producerId: 0 producerEpoch: 0 partitionLeaderEpoch: 0 isTransactional: true position: 103 CreateTime: 1509605714863 isvalid: true size: 103 magic: 2 compresscodec: NONE crc:102431214

baseOffset: 2 lastOffset: 2 baseSequence: 0 lastSequence: 0 producerId: 1 producerEpoch: 0 partitionLeaderEpoch: 0 isTransactional: true position: 206 CreateTime: 1509607351944 isvalid: true size: 103 magic: 2 compresscodec: NONE crc:1129944557

baseOffset: 3 lastOffset: 3 baseSequence: 0 lastSequence: 0 producerId: 2 producerEpoch: 0 partitionLeaderEpoch: 0 isTransactional: true position: 309 CreateTime: 1509616649669 isvalid: true size: 110 magic: 2 compresscodec: NONE crc:630443129

baseOffset: 4 lastOffset: 4 baseSequence: 0 lastSequence: 0 producerId: 3 producerEpoch: 0 partitionLeaderEpoch: 0 isTransactional: true position: 419 CreateTime: 1509616850564 isvalid: true size: 110 magic: 2 compresscodec: NONE crc:3357473778

baseOffset: 5 lastOffset: 5 baseSequence: 0 lastSequence: 0 producerId: 4 producerEpoch: 0 partitionLeaderEpoch: 0 isTransactional: true position: 529 CreateTime: 1509624206511 isvalid: true size: 110 magic: 2 compresscodec: NONE crc:1193735168

baseOffset: 6 lastOffset: 6 baseSequence: 0 lastSequence: 0 producerId: 5 producerEpoch: 0 partitionLeaderEpoch: 0 isTransactional: true position: 639 CreateTime: 1509624453377 isvalid: true size: 110 magic: 2 compresscodec: NONE crc:3859361029

Dumping .timeindex

timestamp: 0 offset: 0

Found timestamp mismatch in :D:\tmp\kafka-logs-0\topic-0\.timeindex

Index timestamp: 0, log timestamp: 1509605714710

Index timestamp: 0, log timestamp: 1509605714710

Found out of order timestamp in :D:\tmp\kafka-logs-0\topic-0\.timeindex

Index timestamp: 0, Previously indexed timestamp: 0






Re: Reg. Kafka transactional producer and consumer

2017-11-04 Thread Matthias J. Sax
Hi,

this consumer log line indicates that there is an open/pending
transaction (i.e., neither committed nor aborted), and thus the broker
does not deliver the data to the consumer.

-> highWaterMark = 5, but lastStableOffset = 0


On 11/2/17 5:25 AM, Abhishek Verma wrote:
> 1871 [main] DEBUG org.apache.kafka.clients.consumer.internals.Fetcher - Fetch READ_COMMITTED at offset 0 for partition topic-4-0 returned fetch data (error=NONE, highWaterMark=5, lastStableOffset = 0, logStartOffset = 0, abortedTransactions = [], recordsSizeInBytes=0)
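
For reference, this delivery behavior hangs off a single consumer
property; a minimal sketch (the rest of the consumer config is assumed):

Properties props = new Properties();
// read_committed caps fetches at the last stable offset (LSO), so records
// that are part of a still-open transaction are never handed out.
props.put("isolation.level", "read_committed");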


Thus, there must be an issue on the producer side: the transaction does
not get committed. Not sure why, though, as the producer logs indicate
that the TX was committed successfully.

Maybe you can dump the log segments to see what is in them?
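
For example, with the DumpLogSegments tool that ships with Kafka (the
path below is just a placeholder; point --files at the segment under your
topic-partition directory):

bin/kafka-run-class.sh kafka.tools.DumpLogSegments --deep-iteration --print-data-log --files /tmp/kafka-logs/topic-4-0/00000000000000000000.log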

Btw: Kafka 1.0.0 was released recently, containing several bug fixes for
transactions. Maybe you can check whether it is fixed in 1.0.0.


-Matthias





Reg. Kafka transactional producer and consumer

2017-11-01 Thread Abhishek Verma

Hi All,

I am trying to build a hello-world example with a transactional producer
and then consume the messages. I am doing all of this in plain Java.

I can produce, but the consumer is not consuming the messages.

I searched around and found that other people have had the same problem.

Right now I am using a single broker. I also tried the same with 3 brokers,
and it did not work then either.

I don't know what I am missing, and where... :p whether it is something in
the consumer or in the producer.

I have attached the producer and consumer code and console logs, along with
my broker logs.

Thanks,
Abhishek


My Broker logs after producing messages

[2017-11-01 18:45:55,000] INFO Updated PartitionLeaderEpoch. New: {epoch:4, offset:3}, Current: {epoch:3, offset:0} for Partition: __transaction_state-2. Cache now contains 1 entries. (kafka.server.epoch.LeaderEpochFileCache)
[2017-11-01 18:46:03,482] INFO [Transaction Coordinator 1001]: Initialized transactionalId TXN_ID:0.5031925219291776-156417066 with producerId 4001 and producer epoch 0 on partition __transaction_state-2 (kafka.coordinator.transaction.TransactionCoordinator)


My producer code is

import java.util.Properties;
import java.util.Random;

import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.clients.producer.RecordMetadata;
import org.apache.kafka.common.KafkaException;
import org.apache.kafka.common.errors.AuthorizationException;
import org.apache.kafka.common.errors.OutOfOrderSequenceException;
import org.apache.kafka.common.errors.ProducerFencedException;

import com.example.transaction.producer.utils.DataObject;
import com.example.transaction.producer.utils.serde.JsonSerializer;

public class SampleProducer {

    public static String topic = "topic-4";

    public static void main(String[] args) {

        Properties configProperties = new Properties();

        //configProperties.put(ProducerConfig.CLIENT_ID_CONFIG, "some-client-id");
        // NOTE: a random transactional.id means every run registers a brand-new
        // transactional.id (and gets a fresh producerId); a stable id is what
        // lets the broker fence zombies and recover transactions across restarts.
        configProperties.put(ProducerConfig.TRANSACTIONAL_ID_CONFIG,
                "TXN_ID:" + new Random().nextDouble() + new Random().nextInt());
        configProperties.put(ProducerConfig.ACKS_CONFIG, "all");
        configProperties.put(ProducerConfig.ENABLE_IDEMPOTENCE_CONFIG, true);
        configProperties.put(ProducerConfig.RETRIES_CONFIG, Integer.MAX_VALUE);
        configProperties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG,
                "org.apache.kafka.common.serialization.IntegerSerializer");
        configProperties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, JsonSerializer.class);
        configProperties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "192.168.41.132:9090");

        KafkaProducer<Integer, DataObject> producer = new KafkaProducer<>(configProperties);

        System.out.println("Init Transaction");
        producer.initTransactions();
        try {
            System.out.println("Transaction initialised, going to begin transaction");
            producer.beginTransaction();
            System.out.println("Transaction started");

            ProducerRecord<Integer, DataObject> rec =
                    new ProducerRecord<>(topic, 5, new DataObject(5, "Hello, World!"));

            RecordMetadata metadata = producer.send(rec).get();
            System.out.println("The offset of the record we just sent is: " + metadata.offset());

            metadata = producer.send(rec).get();
            System.out.println("The offset of the record we just sent is: " + metadata.offset());

            producer.commitTransaction();
            System.out.println("Transaction Committed");

        } catch (ProducerFencedException | OutOfOrderSequenceException | AuthorizationException e) {
            // We can't recover from these exceptions, so our only option is to
            // close the producer and exit.
            System.out.println("Connection closed but commit failed. We can't recover");
            producer.close();
        } catch (KafkaException e) {
            // For all other exceptions, just abort the transaction and try again.
            System.out.println("Abort Transaction");
            producer.abortTransaction();
        } catch (Exception x) {
            // send(...).get() can also throw InterruptedException/ExecutionException;
            // don't swallow them silently.
            x.printStackTrace();
        }
        producer.close();
        System.out.println("Closed");
    }
}




These are my producer console logs


0 [main] INFO  org.apache.kafka.clients.producer.ProducerConfig  - ProducerConfig values:
acks = all
batch.size = 16384
bootstrap.servers = [192.168.41.132:9090]
buffer.memory = 33554432
client.id =
compression.type = none
connections.max.idle.ms = 540000
enable.idempotence = true
interceptor.classes = null
key.serializer = class org.apache.kafka.common.serialization.IntegerSerializer
linger.ms = 0
max.block.ms = 60000
max.in.flight.requests.per.connection = 5
max.request.size = 1048576
metadata.max.age.ms = 300000
metric.reporters = []
metrics.num.samples = 2
metrics.recording.level = INFO
metrics.sample.window.ms = 30000
partitioner.class = class org.apache.kafka.clients.producer.internals.DefaultPartitioner
receive.buffer.bytes = 32768