[GitHub] [kafka] mjsax commented on a change in pull request #8496: KAFKA-9748: Add Streams eos-beta integration test

2020-04-30 Thread GitBox


mjsax commented on a change in pull request #8496:
URL: https://github.com/apache/kafka/pull/8496#discussion_r418209285



##
File path: 
streams/src/test/java/org/apache/kafka/streams/integration/EosBetaUpgradeIntegrationTest.java
##
@@ -1064,14 +1071,16 @@ private void addAllKeys(final Set allKeys, final List
 
     // must be public to allow KafkaProducer to instantiate it
     public static class KeyPartitioner implements Partitioner {
+        private final static LongDeserializer LONG_DESERIALIZER = new LongDeserializer();
+
         @Override
         public int partition(final String topic,
                              final Object key,
                              final byte[] keyBytes,
                              final Object value,
                              final byte[] valueBytes,
                              final Cluster cluster) {
-            return ((Long) key).intValue() % NUM_TOPIC_PARTITIONS;
+            return LONG_DESERIALIZER.deserialize(topic, keyBytes).intValue() % NUM_TOPIC_PARTITIONS;

Review comment:
   I don't think so. The original impl (just for the upstream producer to 
write into the input topics) was:
   ```
   return ((Long) key).intValue() % NUM_TOPIC_PARTITIONS;
   ```
   
   However, this assumes that `key` is of type `Long`, which is not true when
the partitioner is used within Streams: Streams serializes all data upfront,
so the `key` and `value` types are `byte[]`. Thus, we need to deserialize
`keyBytes` to get the original key object.
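
To illustrate the point above, here is a minimal, Kafka-free sketch. It assumes the key is serialized as 8 big-endian bytes (which is what a long serializer produces); the class name, method names, and the partition count of 4 are hypothetical and only serve the illustration. It shows that the cast-based approach breaks as soon as the caller passes `byte[]` as `key`, while decoding from the key bytes works for both callers.

```java
import java.nio.ByteBuffer;

// Stand-in sketch: no Kafka classes are used; all names here are hypothetical.
final class KeyPartitionSketch {
    static final int NUM_TOPIC_PARTITIONS = 4; // assumed value, for illustration only

    // Mimics a long serializer: 8 bytes, big-endian.
    static byte[] serialize(final long key) {
        return ByteBuffer.allocate(Long.BYTES).putLong(key).array();
    }

    // Cast-based approach: only works if `key` is the original Long object.
    static int partitionFromObject(final Object key) {
        return ((Long) key).intValue() % NUM_TOPIC_PARTITIONS;
    }

    // Bytes-based approach: decodes the key from its serialized form,
    // so it does not depend on the runtime type of `key`.
    static int partitionFromBytes(final byte[] keyBytes) {
        return Long.valueOf(ByteBuffer.wrap(keyBytes).getLong()).intValue() % NUM_TOPIC_PARTITIONS;
    }

    public static void main(final String[] args) {
        final byte[] keyBytes = serialize(7L);

        // Upstream producer case: `key` is a Long, both approaches agree.
        System.out.println(partitionFromObject(7L));
        System.out.println(partitionFromBytes(keyBytes));

        // Streams-like case: `key` is already byte[], so the cast fails.
        try {
            partitionFromObject(keyBytes);
        } catch (final ClassCastException e) {
            System.out.println("cast failed: key was byte[]");
        }
    }
}
```

Decoding from `keyBytes` is what lets one partitioner serve both the upstream producer and the Streams-internal producer.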





This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] mjsax commented on a change in pull request #8496: KAFKA-9748: Add Streams eos-beta integration test

2020-04-29 Thread GitBox


mjsax commented on a change in pull request #8496:
URL: https://github.com/apache/kafka/pull/8496#discussion_r417740290



##
File path: 
streams/src/test/java/org/apache/kafka/streams/integration/EosBetaUpgradeIntegrationTest.java
##
@@ -0,0 +1,1123 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.streams.integration;
+
+import org.apache.kafka.clients.consumer.ConsumerConfig;
+import org.apache.kafka.clients.producer.KafkaProducer;
+import org.apache.kafka.clients.producer.Partitioner;
+import org.apache.kafka.clients.producer.Producer;
+import org.apache.kafka.clients.producer.ProducerConfig;
+import org.apache.kafka.common.Cluster;
+import org.apache.kafka.common.IsolationLevel;
+import org.apache.kafka.common.errors.ProducerFencedException;
+import org.apache.kafka.common.serialization.ByteArraySerializer;
+import org.apache.kafka.common.serialization.LongDeserializer;
+import org.apache.kafka.common.serialization.LongSerializer;
+import org.apache.kafka.common.serialization.Serdes;
+import org.apache.kafka.common.utils.Utils;
+import org.apache.kafka.streams.KafkaStreams;
+import org.apache.kafka.streams.KeyValue;
+import org.apache.kafka.streams.StoreQueryParameters;
+import org.apache.kafka.streams.StreamsBuilder;
+import org.apache.kafka.streams.StreamsConfig;
+import org.apache.kafka.streams.integration.utils.EmbeddedKafkaCluster;
+import org.apache.kafka.streams.integration.utils.IntegrationTestUtils;
+import org.apache.kafka.streams.kstream.KStream;
+import org.apache.kafka.streams.kstream.Transformer;
+import org.apache.kafka.streams.kstream.TransformerSupplier;
+import org.apache.kafka.streams.processor.ProcessorContext;
+import org.apache.kafka.streams.processor.internals.DefaultKafkaClientSupplier;
+import org.apache.kafka.streams.state.KeyValueIterator;
+import org.apache.kafka.streams.state.KeyValueStore;
+import org.apache.kafka.streams.state.QueryableStoreTypes;
+import org.apache.kafka.streams.state.ReadOnlyKeyValueStore;
+import org.apache.kafka.streams.state.StoreBuilder;
+import org.apache.kafka.streams.state.Stores;
+import org.apache.kafka.test.IntegrationTest;
+import org.apache.kafka.test.StreamsTestUtils;
+import org.apache.kafka.test.TestUtils;
+import org.junit.Before;
+import org.junit.ClassRule;
+import org.junit.Test;
+import org.junit.experimental.categories.Category;
+import org.junit.runner.RunWith;
+import org.junit.runners.Parameterized;
+
+import java.io.File;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.Iterator;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.Locale;
+import java.util.Map;
+import java.util.Properties;
+import java.util.Set;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicInteger;
+import java.util.stream.Collectors;
+
+import static org.apache.kafka.common.utils.Utils.mkSet;
+import static org.apache.kafka.test.TestUtils.waitForCondition;
+import static org.hamcrest.CoreMatchers.equalTo;
+import static org.hamcrest.MatcherAssert.assertThat;
+import static org.junit.Assert.fail;
+
+@RunWith(Parameterized.class)
+@Category({IntegrationTest.class})
+public class EosBetaUpgradeIntegrationTest {
+
+    @Parameterized.Parameters(name = "{0}")
+    public static Collection data() {
+        return Arrays.asList(new Boolean[][] {
+            {false},
+            {true}
+        });
+    }
+
+    @Parameterized.Parameter
+    public boolean injectError;
+
+    private static final int NUM_BROKERS = 3;
+    private static final int MAX_POLL_INTERVAL_MS = 100 * 1000;
+    private static final int MAX_WAIT_TIME_MS = 60 * 1000;
+
+    private static final List> TWO_REBALANCES_STARTUP =
+        Collections.unmodifiableList(
+            Arrays.asList(
+                KeyValue.pair(KafkaStreams.State.CREATED, KafkaStreams.State.REBALANCING),
+                KeyValue.pair(KafkaStreams.State.REBALANCING, KafkaStreams.State.RUNNING),
+                KeyValue.pair(KafkaStreams.State.RUNNING, KafkaStreams.State.REBALANCING),
+

[GitHub] [kafka] mjsax commented on a change in pull request #8496: KAFKA-9748: Add Streams eos-beta integration test

2020-04-29 Thread GitBox


mjsax commented on a change in pull request #8496:
URL: https://github.com/apache/kafka/pull/8496#discussion_r417739843



##
File path: 
streams/src/test/java/org/apache/kafka/streams/integration/EosBetaUpgradeIntegrationTest.java
##

[GitHub] [kafka] mjsax commented on a change in pull request #8496: KAFKA-9748: Add Streams eos-beta integration test

2020-04-29 Thread GitBox


mjsax commented on a change in pull request #8496:
URL: https://github.com/apache/kafka/pull/8496#discussion_r417739443



##
File path: 
streams/src/test/java/org/apache/kafka/streams/integration/EosBetaUpgradeIntegrationTest.java
##

[GitHub] [kafka] mjsax commented on a change in pull request #8496: KAFKA-9748: Add Streams eos-beta integration test

2020-04-29 Thread GitBox


mjsax commented on a change in pull request #8496:
URL: https://github.com/apache/kafka/pull/8496#discussion_r417739068



##
File path: 
streams/src/test/java/org/apache/kafka/streams/integration/EosBetaUpgradeIntegrationTest.java
##

[GitHub] [kafka] mjsax commented on a change in pull request #8496: KAFKA-9748: Add Streams eos-beta integration test

2020-04-29 Thread GitBox


mjsax commented on a change in pull request #8496:
URL: https://github.com/apache/kafka/pull/8496#discussion_r417738801



##
File path: 
streams/src/test/java/org/apache/kafka/streams/integration/EosBetaUpgradeIntegrationTest.java
##

[GitHub] [kafka] mjsax commented on a change in pull request #8496: KAFKA-9748: Add Streams eos-beta integration test

2020-04-29 Thread GitBox


mjsax commented on a change in pull request #8496:
URL: https://github.com/apache/kafka/pull/8496#discussion_r417738371



##
File path: 
streams/src/test/java/org/apache/kafka/streams/integration/EosBetaUpgradeIntegrationTest.java
##
@@ -1064,14 +1071,16 @@ private void addAllKeys(final Set allKeys, final List
 
     // must be public to allow KafkaProducer to instantiate it
     public static class KeyPartitioner implements Partitioner {
+        private final static LongDeserializer LONG_DESERIALIZER = new LongDeserializer();
+
         @Override
         public int partition(final String topic,
                              final Object key,
                              final byte[] keyBytes,
                              final Object value,
                              final byte[] valueBytes,
                              final Cluster cluster) {
-            return ((Long) key).intValue() % NUM_TOPIC_PARTITIONS;
+            return LONG_DESERIALIZER.deserialize(topic, keyBytes).intValue() % NUM_TOPIC_PARTITIONS;

Review comment:
   This change is only needed so that the same partitioner works both for the
upstream producer writing into the input topics and within Kafka Streams (KS)
when writing into the output topic.









[GitHub] [kafka] mjsax commented on a change in pull request #8496: KAFKA-9748: Add Streams eos-beta integration test

2020-04-29 Thread GitBox


mjsax commented on a change in pull request #8496:
URL: https://github.com/apache/kafka/pull/8496#discussion_r417738193



##
File path: 
streams/src/test/java/org/apache/kafka/streams/integration/EosBetaUpgradeIntegrationTest.java
##
@@ -881,6 +882,10 @@ public void close() { }
         properties.put(StreamsConfig.consumerPrefix(ConsumerConfig.REQUEST_TIMEOUT_MS_CONFIG), 5 * 1000);
         properties.put(StreamsConfig.consumerPrefix(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG), 5 * 1000 - 1);
         properties.put(StreamsConfig.consumerPrefix(ConsumerConfig.MAX_POLL_INTERVAL_MS_CONFIG), MAX_POLL_INTERVAL_MS);
+        // TODO
+        //   if we don't use this custom partitioner the test fails for the non-error case
+        //   unclear why -- see other TODO
+        properties.put(StreamsConfig.producerPrefix(ProducerConfig.PARTITIONER_CLASS_CONFIG), KeyPartitioner.class);

Review comment:
   This is the workaround that makes the test (clean run) pass. For the
error injection run, the test passed both with and without this partitioner.
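
For readers unfamiliar with the prefixed configs in the hunk above, here is a small Kafka-free sketch of what the resulting properties look like. It assumes the prefix helpers simply prepend `consumer.` and `producer.` to the client config name; the class name, the local helper methods, and `com.example.KeyPartitioner` are hypothetical stand-ins, not the test's actual code.

```java
import java.util.Properties;

// Stand-in sketch: the two helpers below are assumed to mirror the
// consumer/producer prefix helpers used in the diff; all names are hypothetical.
final class PrefixedConfigSketch {
    static String consumerPrefix(final String name) {
        return "consumer." + name;
    }

    static String producerPrefix(final String name) {
        return "producer." + name;
    }

    static Properties build(final String partitionerClassName) {
        final Properties properties = new Properties();
        // Consumer-only settings, scoped via the consumer prefix.
        properties.put(consumerPrefix("request.timeout.ms"), 5 * 1000);
        properties.put(consumerPrefix("session.timeout.ms"), 5 * 1000 - 1);
        // Producer-only setting: the custom partitioner used as the workaround.
        properties.put(producerPrefix("partitioner.class"), partitionerClassName);
        return properties;
    }

    public static void main(final String[] args) {
        build("com.example.KeyPartitioner")
            .forEach((key, value) -> System.out.println(key + " = " + value));
    }
}
```

The prefix scopes the partitioner to the producers that Streams creates, leaving the consumer settings untouched.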









[GitHub] [kafka] mjsax commented on a change in pull request #8496: KAFKA-9748: Add Streams eos-beta integration test

2020-04-29 Thread GitBox


mjsax commented on a change in pull request #8496:
URL: https://github.com/apache/kafka/pull/8496#discussion_r417738073



##
File path: 
streams/src/test/java/org/apache/kafka/streams/integration/EosBetaUpgradeIntegrationTest.java
##
@@ -388,6 +388,7 @@ public void shouldUpgradeFromEosAlphaToEosBeta() throws Exception {
 
             final List> expectedCommittedResult =
                 computeExpectedResult(committedInputDataDuringFirstUpgrade, committedState);
+            // TODO: if we don't use the custom partitioner, the test hangs here until TX times out and is aborted
             verifyCommitted(expectedCommittedResult);

Review comment:
   The test fails here... (cf. the other TODO)









[GitHub] [kafka] mjsax commented on a change in pull request #8496: KAFKA-9748: Add Streams eos-beta integration test

2020-04-29 Thread GitBox


mjsax commented on a change in pull request #8496:
URL: https://github.com/apache/kafka/pull/8496#discussion_r417737952



##
File path: streams/src/main/java/org/apache/kafka/streams/StreamsConfig.java
##
@@ -1283,9 +1284,6 @@ private void verifyMaxInFlightRequestPerConnection(final Object maxInFlightReque
         // add client id with stream client id prefix
         props.put(CommonClientConfigs.CLIENT_ID_CONFIG, clientId);
 
-        // Reduce the transaction timeout for quicker pending offset expiration on broker side.
-        props.put(ProducerConfig.TRANSACTION_TIMEOUT_CONFIG, 1);

Review comment:
   Minor bug: this value was hard-coded, so it was not possible to
override it.
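
As a generic illustration of why an unconditional `put` defeats user overrides, consider the Kafka-free sketch below. The class name, method names, and the default of 10000 ms are hypothetical; this is not the actual `StreamsConfig` code, and the PR itself simply removes the hard-coded line rather than reordering anything.

```java
import java.util.HashMap;
import java.util.Map;

// Stand-in sketch: all names and the default value are hypothetical.
final class OverridableDefaultSketch {
    static final String TX_TIMEOUT = "transaction.timeout.ms";
    static final int INTERNAL_DEFAULT = 10_000; // assumed value, for illustration only

    // Hard-coded variant: the put happens after the user config was copied,
    // so any user-supplied value is silently overwritten.
    static Map<String, Object> hardCoded(final Map<String, Object> userConfig) {
        final Map<String, Object> props = new HashMap<>(userConfig);
        props.put(TX_TIMEOUT, INTERNAL_DEFAULT);
        return props;
    }

    // Overridable variant: the default is applied first and the user config
    // is applied afterwards, so a user-supplied value wins.
    static Map<String, Object> overridable(final Map<String, Object> userConfig) {
        final Map<String, Object> props = new HashMap<>();
        props.put(TX_TIMEOUT, INTERNAL_DEFAULT);
        props.putAll(userConfig);
        return props;
    }

    public static void main(final String[] args) {
        final Map<String, Object> userConfig = new HashMap<>();
        userConfig.put(TX_TIMEOUT, 60_000);

        System.out.println("hard-coded:  " + hardCoded(userConfig).get(TX_TIMEOUT));
        System.out.println("overridable: " + overridable(userConfig).get(TX_TIMEOUT));
    }
}
```

In the hard-coded variant a test has no way to lengthen or shorten the timeout, which is the limitation this comment points out.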




