jeqo commented on code in PR #15241:
URL: https://github.com/apache/kafka/pull/15241#discussion_r1650469442
##########
storage/src/main/java/org/apache/kafka/storage/internals/log/TransactionIndex.java:
##########
@@ -190,75 +175,145 @@ public void sanityCheck() {
AbortedTxn abortedTxn = txnWithPosition.txn;
if (abortedTxn.lastOffset() < startOffset)
throw new CorruptIndexException("Last offset of aborted
transaction " + abortedTxn + " in index "
- + file.getAbsolutePath() + " is less than start offset " +
startOffset);
+ + txnFile.path().toAbsolutePath() + " is less than start
offset " + startOffset);
}
}
- private FileChannel openChannel() throws IOException {
- FileChannel channel = FileChannel.open(file.toPath(),
StandardOpenOption.CREATE,
- StandardOpenOption.READ, StandardOpenOption.WRITE);
- maybeChannel = Optional.of(channel);
- channel.position(channel.size());
- return channel;
- }
-
- private FileChannel channel() throws IOException {
- FileChannel channel = channelOrNull();
- if (channel == null)
- return openChannel();
- else
- return channel;
- }
-
- private FileChannel channelOrNull() {
- return maybeChannel.orElse(null);
- }
-
private Iterable<AbortedTxnWithPosition> iterable() {
return iterable(() -> ByteBuffer.allocate(AbortedTxn.TOTAL_SIZE));
}
private Iterable<AbortedTxnWithPosition> iterable(Supplier<ByteBuffer>
allocate) {
- FileChannel channel = channelOrNull();
- if (channel == null)
+ if (!txnFile.exists())
return Collections.emptyList();
- PrimitiveRef.IntRef position = PrimitiveRef.ofInt(0);
+ try {
+ FileChannel channel = txnFile.channel();
+ PrimitiveRef.IntRef position = PrimitiveRef.ofInt(0);
+
+ return () -> new Iterator<AbortedTxnWithPosition>() {
+
+ @Override
+ public boolean hasNext() {
+ try {
+ return channel.position() - position.value >=
AbortedTxn.TOTAL_SIZE;
+ } catch (IOException e) {
+ throw new KafkaException("Failed read position from
the transaction index " + txnFile.path().toAbsolutePath(), e);
+ }
+ }
+
+ @Override
+ public AbortedTxnWithPosition next() {
+ try {
+ ByteBuffer buffer = allocate.get();
+ Utils.readFully(channel, buffer, position.value);
+ buffer.flip();
+
+ AbortedTxn abortedTxn = new AbortedTxn(buffer);
+ if (abortedTxn.version() > AbortedTxn.CURRENT_VERSION)
+ throw new KafkaException("Unexpected aborted
transaction version " + abortedTxn.version()
+ + " in transaction index " +
txnFile.path().toAbsolutePath() + ", current version is "
+ + AbortedTxn.CURRENT_VERSION);
+ AbortedTxnWithPosition nextEntry = new
AbortedTxnWithPosition(abortedTxn, position.value);
+ position.value += AbortedTxn.TOTAL_SIZE;
+ return nextEntry;
+ } catch (IOException e) {
+ // We received an unexpected error reading from the
index file. We propagate this as an
+ // UNKNOWN error to the consumer, which will cause it
to retry the fetch.
+ throw new KafkaException("Failed to read from the
transaction index " + txnFile.path().toAbsolutePath(), e);
+ }
+ }
- return () -> new Iterator<AbortedTxnWithPosition>() {
+ };
+
+ } catch (IOException e) {
+ throw new KafkaException("Failed to read from the transaction
index " + txnFile.path().toAbsolutePath(), e);
+ }
+ }
- @Override
- public boolean hasNext() {
- try {
- return channel.position() - position.value >=
AbortedTxn.TOTAL_SIZE;
- } catch (IOException e) {
- throw new KafkaException("Failed read position from the
transaction index " + file.getAbsolutePath(), e);
+ public static class TransactionIndexFile {
Review Comment:
Seems that by making it private, it can only be accessed from
TransactionIndex class but not tests or others on the same module.
Switching access to package-private and adding comment on visibility on last
commit.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]