Hi, this is hard to diagnose. The additional consumer should not affect the Streams app though -- even if I am wondering why you need it.
> KafkaConsumer (with a UUID as group.id) that reads some historical data from
> input topic
Maybe using GlobalKTable instead might be a better solution?
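To sketch what I mean (just an illustration against the 0.10.2 API, with an assumed store name "history-store" and an assumed key-based lookup -- adapt to whatever your transformer actually needs from the history): Streams maintains the global table itself, so there is no second consumer and no extra group.id involved.

```java
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.streams.KeyValue;
import org.apache.kafka.streams.kstream.GlobalKTable;
import org.apache.kafka.streams.kstream.KStreamBuilder;
import org.apache.kafka.streams.kstream.Transformer;
import org.apache.kafka.streams.processor.ProcessorContext;
import org.apache.kafka.streams.state.ReadOnlyKeyValueStore;

// Sketch: the topology part of your main(), with the inner KafkaConsumer
// replaced by a lookup into a GlobalKTable over the same input topic.
KStreamBuilder builder = new KStreamBuilder();

// "history-store" is a made-up store name for this example
GlobalKTable<String, String> history =
    builder.globalTable(Serdes.String(), Serdes.String(),
                        "transactions", "history-store");

builder.stream(Serdes.String(), Serdes.String(), "transactions")
       .transform(() -> new Transformer<String, String, KeyValue<String, String>>() {
           private ReadOnlyKeyValueStore<String, String> store;

           @SuppressWarnings("unchecked")
           @Override
           public void init(ProcessorContext context) {
               // global stores are readable from any processor --
               // no manual assign()/seek()/poll() needed
               store = (ReadOnlyKeyValueStore<String, String>)
                       context.getStateStore("history-store");
           }

           @Override
           public KeyValue<String, String> transform(String key, String value) {
               String historical = store.get(key);  // point lookup instead of poll()
               return KeyValue.pair(key, value);
           }

           @Override
           public KeyValue<String, String> punctuate(long timestamp) { return null; }

           @Override
           public void close() { }
       })
       .to(Serdes.String(), Serdes.String(), "alerts");
```

This needs a running broker to actually execute, so take it as a wiring sketch only.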
> (i.e. I feed 1000 records into source topic and receive around 200 on the
> target topic)
Are these the first 200 records? Or are those 200 records "random ones"
from your input topic? How many partitions do you have for the
input/output topics?
> looks like a lot of rebalancing happens.
We recommend changing the following StreamsConfig values to improve
rebalance behavior:
> props.put(ProducerConfig.RETRIES_CONFIG, 10); <---- increase to 10 from
> default of 0
> props.put(ConsumerConfig.MAX_POLL_INTERVAL_MS_CONFIG,
> Integer.toString(Integer.MAX_VALUE)); <--------- increase to infinity from
> default of 300 s
We will change the default values accordingly in a future release, but
for now you should set these manually.
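Applied to your Streams Properties it looks like this (the class and method names below are just for illustration; the literal keys are the values of ProducerConfig.RETRIES_CONFIG and ConsumerConfig.MAX_POLL_INTERVAL_MS_CONFIG, written out so the snippet compiles without the Kafka jars):

```java
import java.util.Properties;

// Illustrative helper (name is made up): applies the two recommended
// rebalance-related overrides to a Kafka Streams Properties object.
// Streams forwards these to its internal producer/consumer clients.
public class RebalanceTuning {

    public static Properties apply(Properties props) {
        // "retries" == ProducerConfig.RETRIES_CONFIG:
        // 0 (default) -> 10, so transient errors during a rebalance
        // are retried instead of failing the task
        props.put("retries", 10);

        // "max.poll.interval.ms" == ConsumerConfig.MAX_POLL_INTERVAL_MS_CONFIG:
        // 300000 ms (default, i.e. 300 s) -> Integer.MAX_VALUE, so long
        // processing pauses do not trigger yet another rebalance
        props.put("max.poll.interval.ms", Integer.toString(Integer.MAX_VALUE));

        return props;
    }

    public static void main(String[] args) {
        Properties p = apply(new Properties());
        System.out.println(p.get("retries"));              // 10
        System.out.println(p.get("max.poll.interval.ms")); // 2147483647
    }
}
```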
Hope this helps.
-Matthias
On 4/24/17 10:01 AM, Andreas Voss wrote:
> Hi, I have a simple streaming app that copies data from one topic to another,
> so when I feed 1000 records into source topic I receive 1000 records in the
> target topic. Also the app contains a transform() step which does nothing,
> except instantiating a KafkaConsumer (with a UUID as group.id) that reads
> some historical data from input topic. As soon as this consumer is in place,
> the streaming app does not work anymore, records get lost (i.e. I feed 1000
> records into source topic and receive around 200 on the target topic) and
> it's terribly slow - looks like a lot of rebalancing happens.
>
> To me this looks like a bug, because the KStreamBuilder uses the application
> id as group.id ("kafka-smurfing" in this case), and the transformer uses a
> different one (uuid).
>
> Full source code:
>
> public class Main {
>
>     public static final String BOOTSTRAP_SERVERS = "192.168.99.100:9092";
>     public static final String SOURCE_TOPIC = "transactions";
>     public static final String TARGET_TOPIC = "alerts";
>
>     public static void main(String[] args) throws Exception {
>
>         KStreamBuilder builder = new KStreamBuilder();
>         builder.stream(Serdes.String(), Serdes.String(), SOURCE_TOPIC)
>                .transform(() -> new DebugTransformer())
>                .to(Serdes.String(), Serdes.String(), TARGET_TOPIC);
>
>         Properties props = new Properties();
>         props.put(StreamsConfig.APPLICATION_ID_CONFIG, "kafka-smurfing");
>         props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, Main.BOOTSTRAP_SERVERS);
>         props.put(StreamsConfig.KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass().getName());
>         props.put(StreamsConfig.VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass().getName());
>         props.put(StreamsConfig.NUM_STREAM_THREADS_CONFIG, 2);
>         props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
>
>         KafkaStreams streams = new KafkaStreams(builder, props);
>         streams.start();
>         Runtime.getRuntime().addShutdownHook(new Thread(streams::close));
>     }
> }
>
> public class DebugTransformer implements Transformer<String, String, KeyValue<String, String>> {
>
>     private KafkaConsumer<String, String> consumer;
>     private ProcessorContext context;
>
>     @Override
>     public void init(ProcessorContext context) {
>         this.context = context;
>         Properties props = new Properties();
>         props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, Main.BOOTSTRAP_SERVERS);
>         props.put(ConsumerConfig.GROUP_ID_CONFIG, UUID.randomUUID().toString());
>         props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "none");
>         props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "false");
>         props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
>         props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
>         consumer = new KafkaConsumer<>(props);
>     }
>
>     @Override
>     public KeyValue<String, String> transform(String key, String value) {
>         TopicPartition partition = new TopicPartition(Main.SOURCE_TOPIC, context.partition());
>         consumer.assign(Arrays.asList(partition));
>         consumer.seek(partition, 0);
>         consumer.poll(100);
>         return KeyValue.pair(key, value);
>     }
>
>     @Override
>     public void close() {
>         consumer.close();
>     }
>
>     @Override
>     public KeyValue<String, String> punctuate(long timestamp) {
>         return null;
>     }
> }
>
> Thanks for any hints,
> Andreas
>
>
