[GitHub] [kafka] dima5rr commented on a change in pull request #8706: KAFKA-10030 allow fetching a key from a single partition
dima5rr commented on a change in pull request #8706: URL: https://github.com/apache/kafka/pull/8706#discussion_r432393459 ## File path: streams/src/test/java/org/apache/kafka/streams/state/internals/QueryableStoreProviderTest.java ## @@ -88,5 +91,23 @@ public void shouldFindGlobalStores() { assertNotNull(storeProvider.getStore(StoreQueryParameters.fromNameAndType("global", QueryableStoreTypes.keyValueStore()))); } +@Test +public void shouldReturnKVStoreWithPartitionWhenItExists() { + assertNotNull(storeProvider.getStore(StoreQueryParameters.fromNameAndType(keyValueStore, QueryableStoreTypes.keyValueStore()).withPartition(numStateStorePartitions - 1))); Review comment: I'm not sure how to validate the returned store reference, since QueryableStoreProvider always wraps it in a CompositeReadOnlyKeyValueStore. This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] dima5rr commented on a change in pull request #8706: KAFKA-10030 allow fetching a key from a single partition
dima5rr commented on a change in pull request #8706: URL: https://github.com/apache/kafka/pull/8706#discussion_r431687179 ## File path: streams/src/main/java/org/apache/kafka/streams/state/internals/QueryableStoreProvider.java ## @@ -58,9 +58,21 @@ public QueryableStoreProvider(final List storePr } final List allStores = new ArrayList<>(); for (final StreamThreadStateStoreProvider storeProvider : storeProviders) { -allStores.addAll(storeProvider.stores(storeQueryParameters)); +final List stores = storeProvider.stores(storeQueryParameters); +if (stores != null && !stores.isEmpty()) { +allStores.addAll(stores); +if (storeQueryParameters.partition() != null) { +break; +} +} } if (allStores.isEmpty()) { +if (storeQueryParameters.partition() != null) { Review comment: I extended the StateStoreProviderStub functionality to add a store by partition (with default partition 0), and added the relevant tests in QueryableStoreProviderTest. This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] dima5rr commented on a change in pull request #8706: KAFKA-10030 allow fetching a key from a single partition
dima5rr commented on a change in pull request #8706: URL: https://github.com/apache/kafka/pull/8706#discussion_r429567784 ## File path: streams/src/test/java/org/apache/kafka/streams/integration/StoreQueryIntegrationTest.java ## @@ -296,6 +297,87 @@ public void shouldQuerySpecificStalePartitionStores() throws Exception { assertThat(store4.get(key), is(nullValue())); } +@Test +public void shouldQuerySpecificActivePartitionStoresMultiStreamThreads() throws Exception { +final int batch1NumMessages = 100; +final int key = 1; +final Semaphore semaphore = new Semaphore(0); +final int numStreamThreads = 2; + +final StreamsBuilder builder = new StreamsBuilder(); +builder.table(INPUT_TOPIC_NAME, Consumed.with(Serdes.Integer(), Serdes.Integer()), +Materialized.>as(TABLE_NAME) +.withCachingDisabled()) +.toStream() +.peek((k, v) -> semaphore.release()); + +final Properties streamsConfiguration1 = streamsConfiguration(); +streamsConfiguration1.put(StreamsConfig.NUM_STREAM_THREADS_CONFIG, numStreamThreads); + +final Properties streamsConfiguration2 = streamsConfiguration(); +streamsConfiguration2.put(StreamsConfig.NUM_STREAM_THREADS_CONFIG, numStreamThreads); + +final KafkaStreams kafkaStreams1 = createKafkaStreams(builder, streamsConfiguration1); +final KafkaStreams kafkaStreams2 = createKafkaStreams(builder, streamsConfiguration2); +final List kafkaStreamsList = Arrays.asList(kafkaStreams1, kafkaStreams2); + +startApplicationAndWaitUntilRunning(kafkaStreamsList, Duration.ofSeconds(60)); + +assertTrue(numStreamThreads > 1); +assertTrue(kafkaStreams1.localThreadsMetadata().size() > 1); +assertTrue(kafkaStreams2.localThreadsMetadata().size() > 1); + +produceValueRange(key, 0, batch1NumMessages); + +// Assert that all messages in the first batch were processed in a timely manner +assertThat(semaphore.tryAcquire(batch1NumMessages, 60, TimeUnit.SECONDS), is(equalTo(true))); +final KeyQueryMetadata keyQueryMetadata = kafkaStreams1.queryMetadataForKey(TABLE_NAME, key, new 
IntegerSerializer()); + +//key belongs to this partition +final int keyPartition = keyQueryMetadata.getPartition(); + +//key doesn't belongs to this partition +final int keyDontBelongPartition = (keyPartition == 0) ? 1 : 0; +final boolean kafkaStreams1IsActive = (keyQueryMetadata.getActiveHost().port() % 2) == 1; + +StoreQueryParameters> storeQueryParam = +StoreQueryParameters.>fromNameAndType(TABLE_NAME, QueryableStoreTypes.keyValueStore()) +.withPartition(keyPartition); +ReadOnlyKeyValueStore store1 = null; +ReadOnlyKeyValueStore store2 = null; +if (kafkaStreams1IsActive) { +store1 = IntegrationTestUtils.getStore(kafkaStreams1, storeQueryParam); +} else { +store2 = IntegrationTestUtils.getStore(kafkaStreams2, storeQueryParam); +} + +if (kafkaStreams1IsActive) { +assertThat(store1, is(notNullValue())); +assertThat(store2, is(nullValue())); +} else { +assertThat(store2, is(notNullValue())); +assertThat(store1, is(nullValue())); +} + +// Assert that only active for a specific requested partition serves key if stale stores and not enabled Review comment: correct, changed test to check both store types This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] dima5rr commented on a change in pull request #8706: KAFKA-10030 allow fetching a key from a single partition
dima5rr commented on a change in pull request #8706: URL: https://github.com/apache/kafka/pull/8706#discussion_r428899383 ## File path: streams/src/test/java/org/apache/kafka/streams/integration/StoreQueryIntegrationTest.java ## @@ -332,6 +332,7 @@ private Properties streamsConfiguration() { config.put(ConsumerConfig.HEARTBEAT_INTERVAL_MS_CONFIG, 200); config.put(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, 1000); config.put(StreamsConfig.COMMIT_INTERVAL_MS_CONFIG, 100); +config.put(StreamsConfig.NUM_STREAM_THREADS_CONFIG, 2); Review comment: Added a dedicated test. This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org