chirag-wadhwa5 commented on code in PR #16456: URL: https://github.com/apache/kafka/pull/16456#discussion_r1668342690
########## core/src/test/scala/unit/kafka/server/KafkaApisTest.scala: ########## @@ -4390,6 +4401,2215 @@ class KafkaApisTest extends Logging { assertEquals("broker2", node.host) } + private def expectedAcquiredRecords(firstOffset : Long, lastOffset : Long, deliveryCount : Int) : util.List[AcquiredRecords] = { + val acquiredRecordsList : util.List[AcquiredRecords] = new util.ArrayList() + acquiredRecordsList.add(new AcquiredRecords() + .setFirstOffset(firstOffset) + .setLastOffset(lastOffset) + .setDeliveryCount(deliveryCount.toShort)) + acquiredRecordsList + } + + private def memoryRecordsBuilder(numOfRecords : Int, startOffset : Long) : MemoryRecordsBuilder = { + + val buffer: ByteBuffer = ByteBuffer.allocate(1024) + val compression: Compression = Compression.of(CompressionType.NONE).build() + val timestampType: TimestampType = TimestampType.CREATE_TIME + + val builder: MemoryRecordsBuilder = MemoryRecords.builder(buffer, compression, timestampType, startOffset) + for (i <- 0 until numOfRecords) { + builder.appendWithOffset(startOffset + i, 0L, TestUtils.randomBytes(10), TestUtils.randomBytes(10)) + } + builder + } + + private def memoryRecords(numOfRecords : Int, startOffset : Long) : MemoryRecords = { + memoryRecordsBuilder(numOfRecords, startOffset).build() + } + + @Test + def testHandleShareFetchRequestSuccessWithoutAcknowledgements(): Unit = { + val topicName = "foo" + val topicId = Uuid.randomUuid() + val partitionIndex = 0 + metadataCache = MetadataCache.kRaftMetadataCache(brokerId) + addTopicToMetadataCache(topicName, 1, topicId = topicId) + val memberId : Uuid = Uuid.ZERO_UUID + + val shareSessionEpoch = 0 + + val records = memoryRecords(10, 0) + + val sharePartitionManager : SharePartitionManager = mock(classOf[SharePartitionManager]) + + when(sharePartitionManager.fetchMessages(any(), any(), any(), any(), any())).thenReturn( + CompletableFuture.completedFuture(Map[TopicIdPartition, ShareFetchResponseData.PartitionData]( + new TopicIdPartition(topicId, new TopicPartition(topicName, partitionIndex)) -> + new ShareFetchResponseData.PartitionData() + .setErrorCode(Errors.NONE.code) + .setAcknowledgeErrorCode(Errors.NONE.code) + .setRecords(records) + .setAcquiredRecords(new util.ArrayList(List( + new ShareFetchResponseData.AcquiredRecords() + .setFirstOffset(0) + .setLastOffset(9) + .setDeliveryCount(1) + ).asJava)) Review Comment: Thanks for the review. Actually yes it is required, since the List() is a scala list. There are other ways that do not use asJava, but they span over multiple lines, and would require new variable initialisations, reducing the code readability. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org