[GitHub] [kafka] dima5rr commented on a change in pull request #8706: KAFKA-10030 allow fetching a key from a single partition

2020-05-29 Thread GitBox


dima5rr commented on a change in pull request #8706:
URL: https://github.com/apache/kafka/pull/8706#discussion_r432393459



##
File path: 
streams/src/test/java/org/apache/kafka/streams/state/internals/QueryableStoreProviderTest.java
##
@@ -88,5 +91,23 @@ public void shouldFindGlobalStores() {
 
assertNotNull(storeProvider.getStore(StoreQueryParameters.fromNameAndType("global",
 QueryableStoreTypes.keyValueStore(;
 }
 
+@Test
+public void shouldReturnKVStoreWithPartitionWhenItExists() {
+
assertNotNull(storeProvider.getStore(StoreQueryParameters.fromNameAndType(keyValueStore,
 QueryableStoreTypes.keyValueStore()).withPartition(numStateStorePartitions - 
1)));

Review comment:
   I think how to validate returned store reference, QueryableStoreProvider 
always wraps it in CompositeReadOnlyKeyValueStore?
   





This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] dima5rr commented on a change in pull request #8706: KAFKA-10030 allow fetching a key from a single partition

2020-05-28 Thread GitBox


dima5rr commented on a change in pull request #8706:
URL: https://github.com/apache/kafka/pull/8706#discussion_r431687179



##
File path: 
streams/src/main/java/org/apache/kafka/streams/state/internals/QueryableStoreProvider.java
##
@@ -58,9 +58,21 @@ public QueryableStoreProvider(final 
List storePr
 }
 final List allStores = new ArrayList<>();
 for (final StreamThreadStateStoreProvider storeProvider : 
storeProviders) {
-allStores.addAll(storeProvider.stores(storeQueryParameters));
+final List stores = storeProvider.stores(storeQueryParameters);
+if (stores != null && !stores.isEmpty()) {
+allStores.addAll(stores);
+if (storeQueryParameters.partition() != null) {
+break;
+}
+}
 }
 if (allStores.isEmpty()) {
+if (storeQueryParameters.partition() != null) {

Review comment:
   I extended StateStoreProviderStub functionality to add store by 
partition with default partition 0, + relevant tests in 
QueryableStoreProviderTest





This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] dima5rr commented on a change in pull request #8706: KAFKA-10030 allow fetching a key from a single partition

2020-05-23 Thread GitBox


dima5rr commented on a change in pull request #8706:
URL: https://github.com/apache/kafka/pull/8706#discussion_r429567784



##
File path: 
streams/src/test/java/org/apache/kafka/streams/integration/StoreQueryIntegrationTest.java
##
@@ -296,6 +297,87 @@ public void shouldQuerySpecificStalePartitionStores() 
throws Exception {
 assertThat(store4.get(key), is(nullValue()));
 }
 
+@Test
+public void shouldQuerySpecificActivePartitionStoresMultiStreamThreads() 
throws Exception {
+final int batch1NumMessages = 100;
+final int key = 1;
+final Semaphore semaphore = new Semaphore(0);
+final int numStreamThreads = 2;
+
+final StreamsBuilder builder = new StreamsBuilder();
+builder.table(INPUT_TOPIC_NAME, Consumed.with(Serdes.Integer(), 
Serdes.Integer()),
+Materialized.>as(TABLE_NAME)
+.withCachingDisabled())
+.toStream()
+.peek((k, v) -> semaphore.release());
+
+final Properties streamsConfiguration1 = streamsConfiguration();
+streamsConfiguration1.put(StreamsConfig.NUM_STREAM_THREADS_CONFIG, 
numStreamThreads);
+
+final Properties streamsConfiguration2 = streamsConfiguration();
+streamsConfiguration2.put(StreamsConfig.NUM_STREAM_THREADS_CONFIG, 
numStreamThreads);
+
+final KafkaStreams kafkaStreams1 = createKafkaStreams(builder, 
streamsConfiguration1);
+final KafkaStreams kafkaStreams2 = createKafkaStreams(builder, 
streamsConfiguration2);
+final List kafkaStreamsList = 
Arrays.asList(kafkaStreams1, kafkaStreams2);
+
+startApplicationAndWaitUntilRunning(kafkaStreamsList, 
Duration.ofSeconds(60));
+
+assertTrue(numStreamThreads > 1);
+assertTrue(kafkaStreams1.localThreadsMetadata().size() > 1);
+assertTrue(kafkaStreams2.localThreadsMetadata().size() > 1);
+
+produceValueRange(key, 0, batch1NumMessages);
+
+// Assert that all messages in the first batch were processed in a 
timely manner
+assertThat(semaphore.tryAcquire(batch1NumMessages, 60, 
TimeUnit.SECONDS), is(equalTo(true)));
+final KeyQueryMetadata keyQueryMetadata = 
kafkaStreams1.queryMetadataForKey(TABLE_NAME, key, new IntegerSerializer());
+
+//key belongs to this partition
+final int keyPartition = keyQueryMetadata.getPartition();
+
+//key doesn't belongs to this partition
+final int keyDontBelongPartition = (keyPartition == 0) ? 1 : 0;
+final boolean kafkaStreams1IsActive = 
(keyQueryMetadata.getActiveHost().port() % 2) == 1;
+
+StoreQueryParameters> 
storeQueryParam =
+StoreQueryParameters.>fromNameAndType(TABLE_NAME, QueryableStoreTypes.keyValueStore())
+.withPartition(keyPartition);
+ReadOnlyKeyValueStore store1 = null;
+ReadOnlyKeyValueStore store2 = null;
+if (kafkaStreams1IsActive) {
+store1 = IntegrationTestUtils.getStore(kafkaStreams1, 
storeQueryParam);
+} else {
+store2 = IntegrationTestUtils.getStore(kafkaStreams2, 
storeQueryParam);
+}
+
+if (kafkaStreams1IsActive) {
+assertThat(store1, is(notNullValue()));
+assertThat(store2, is(nullValue()));
+} else {
+assertThat(store2, is(notNullValue()));
+assertThat(store1, is(nullValue()));
+}
+
+// Assert that only active for a specific requested partition serves 
key if stale stores and not enabled

Review comment:
   correct, changed test to check both store types





This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] dima5rr commented on a change in pull request #8706: KAFKA-10030 allow fetching a key from a single partition

2020-05-21 Thread GitBox


dima5rr commented on a change in pull request #8706:
URL: https://github.com/apache/kafka/pull/8706#discussion_r428899383



##
File path: 
streams/src/test/java/org/apache/kafka/streams/integration/StoreQueryIntegrationTest.java
##
@@ -332,6 +332,7 @@ private Properties streamsConfiguration() {
 config.put(ConsumerConfig.HEARTBEAT_INTERVAL_MS_CONFIG, 200);
 config.put(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, 1000);
 config.put(StreamsConfig.COMMIT_INTERVAL_MS_CONFIG, 100);
+config.put(StreamsConfig.NUM_STREAM_THREADS_CONFIG, 2);

Review comment:
   Added dedicated test





This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org