[ https://issues.apache.org/jira/browse/KAFKA-8689?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16888983#comment-16888983 ]

Bill Bejeck commented on KAFKA-8689:
------------------------------------

One thing I forgot to mention: "Joined.as" is used to name the join operator in
the topology, and it is also used as the base name for any repartition topics
created (if repartitioning is necessary to perform the join). I hope this clears things up for you.
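A minimal, stdlib-only sketch of the naming rule described above. It assumes the documented `<application.id>-<name>-repartition` pattern for named repartition topics. The `-left`/`-right` suffixes are an assumption about how a KStream-KStream join derives one repartition name per input from the `Joined.as` base name, and the class and method names are hypothetical; `Topology#describe()` on a real topology shows the actual names for a given version.

```java
import java.util.List;

public class RepartitionTopicNames {

    // Named repartition topics follow "<application.id>-<name>-repartition".
    static String repartitionTopic(String applicationId, String name) {
        return applicationId + "-" + name + "-repartition";
    }

    // ASSUMPTION: the join appends "-left" and "-right" to the Joined.as base name,
    // giving one repartition topic per input stream (only created when needed).
    static List<String> joinRepartitionTopics(String applicationId, String baseName) {
        return List.of(
                repartitionTopic(applicationId, baseName + "-left"),
                repartitionTopic(applicationId, baseName + "-right"));
    }

    public static void main(String[] args) {
        // "sum" is the name passed via Joined.as("sum") in the reported example.
        joinRepartitionTopics("join-topic-names-example", "sum")
                .forEach(System.out::println);
    }
}
```

In the reported example neither input changes its key before the join, so no repartition is needed and no repartition topics appear in the reported topic list.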

 

Thanks,

Bill

> Cannot Name Join State Store Topics
> -----------------------------------
>
>                 Key: KAFKA-8689
>                 URL: https://issues.apache.org/jira/browse/KAFKA-8689
>             Project: Kafka
>          Issue Type: Bug
>          Components: streams
>    Affects Versions: 2.3.0
>            Reporter: Simon Dean
>            Priority: Major
>
> Performing a join on two KStreams produces two state store topics.
> Currently the names of the state store topics are auto-generated and cannot be
> overridden.
> Example code:
>  
> {code:java}
> import org.apache.kafka.clients.producer.KafkaProducer;
> import org.apache.kafka.clients.producer.ProducerConfig;
> import org.apache.kafka.clients.producer.ProducerRecord;
> import org.apache.kafka.common.serialization.LongSerializer;
> import org.apache.kafka.common.serialization.Serde;
> import org.apache.kafka.common.serialization.Serdes;
> import org.apache.kafka.common.serialization.StringSerializer;
> import org.apache.kafka.streams.KafkaStreams;
> import org.apache.kafka.streams.StreamsBuilder;
> import org.apache.kafka.streams.StreamsConfig;
> import org.apache.kafka.streams.kstream.Consumed;
> import org.apache.kafka.streams.kstream.JoinWindows;
> import org.apache.kafka.streams.kstream.Joined;
> import org.apache.kafka.streams.kstream.KStream;
> import java.time.Duration;
> import java.util.HashMap;
> import java.util.Map;
> import java.util.Properties;
> import java.util.concurrent.TimeUnit;
>
> public class JoinTopicNamesExample {
>
>     public static void main(final String[] args) throws InterruptedException {
>         // Produce the input records first, then run the join, so both source topics exist.
>         produce(args);
>         streams(args);
>     }
>
>     private static void produce(final String[] args) {
>         final Map<String, Object> props = new HashMap<>();
>         props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
>         props.put(ProducerConfig.RETRIES_CONFIG, 0);
>         props.put(ProducerConfig.BATCH_SIZE_CONFIG, 16384);
>         props.put(ProducerConfig.LINGER_MS_CONFIG, 1);
>         props.put(ProducerConfig.BUFFER_MEMORY_CONFIG, 33554432);
>         props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
>         props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, LongSerializer.class);
>         // try-with-resources closes the producer, which waits for buffered records to be sent.
>         try (KafkaProducer<String, Long> producer = new KafkaProducer<>(props)) {
>             for (long i = 0; i < 10; i++) {
>                 producer.send(new ProducerRecord<>("left", Long.toString(i), i));
>             }
>             for (long i = 0; i < 10; i++) {
>                 producer.send(new ProducerRecord<>("right", Long.toString(i), i));
>             }
>         }
>     }
>
>     private static void streams(final String[] args) throws InterruptedException {
>         final String bootstrapServers = args.length > 0 ? args[0] : "localhost:9092";
>         final Properties streamsConfiguration = new Properties();
>         // Give the Streams application a unique name. The name must be unique in the
>         // Kafka cluster against which the application is run.
>         streamsConfiguration.put(StreamsConfig.APPLICATION_ID_CONFIG, "join-topic-names-example");
>         streamsConfiguration.put(StreamsConfig.CLIENT_ID_CONFIG, "user-region-lambda-example-client");
>         // Where to find Kafka broker(s).
>         streamsConfiguration.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
>         // Specify default (de)serializers for record keys and for record values.
>         streamsConfiguration.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG,
>                 Serdes.String().getClass().getName());
>         streamsConfiguration.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG,
>                 Serdes.Long().getClass().getName());
>         // Records should be flushed every 10 seconds. This is less than the default
>         // in order to keep this example interactive.
>         streamsConfiguration.put(StreamsConfig.COMMIT_INTERVAL_MS_CONFIG, 10 * 1000);
>
>         final Serde<String> stringSerde = Serdes.String();
>         final Serde<Long> longSerde = Serdes.Long();
>         final StreamsBuilder builder = new StreamsBuilder();
>         final KStream<String, Long> left = builder.stream("left", Consumed.with(stringSerde, longSerde));
>         final KStream<String, Long> right = builder.stream("right", Consumed.with(stringSerde, longSerde));
>         // Windowed inner join: sum the values of records with the same key within one hour.
>         left.join(
>                 right,
>                 (value1, value2) -> value1 + value2,
>                 JoinWindows.of(Duration.ofHours(1)),
>                 Joined.as("sum"));
>
>         final KafkaStreams streams = new KafkaStreams(builder.build(), streamsConfiguration);
>         // Register the hook before starting, so SIGTERM at any point closes Kafka Streams gracefully.
>         Runtime.getRuntime().addShutdownHook(new Thread(streams::close));
>         streams.start();
>         Thread.sleep(TimeUnit.MINUTES.toMillis(1));
>     }
> }
> {code}
>  
>  
> Here are the topics produced by the example code:
>  * join-topic-names-example-KSTREAM-JOINOTHER-0000000005-store-changelog
>  * join-topic-names-example-KSTREAM-JOINTHIS-0000000004-store-changelog
>  * left
>  * right
> In the example code above, a name is passed into the join with
> Joined.as("sum"). The "sum" name is ignored when Kafka Streams decides on
> the state store topic names.
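The two changelog names in the list above can be reproduced with a short stdlib-only sketch. It only models the pattern visible in the reported output (application id, an auto-generated processor name ending in a 10-digit zero-padded counter, a `-store` suffix, then `-changelog`); it is not the Kafka Streams implementation, the helper names are hypothetical, and the counter values 4 and 5 are simply read off the reported names. The `Joined.as` name is not an input anywhere, which is exactly what the reported topic names show.

```java
public class JoinStoreTopicNames {

    // Auto-generated processor name: prefix plus a 10-digit, zero-padded counter,
    // e.g. "KSTREAM-JOINTHIS-" with 4 gives "KSTREAM-JOINTHIS-0000000004".
    static String processorName(String prefix, int index) {
        return prefix + String.format("%010d", index);
    }

    // Join window store name: generated processor name plus "-store".
    static String joinStoreName(String prefix, int index) {
        return processorName(prefix, index) + "-store";
    }

    // Changelog topic: "<application.id>-<storeName>-changelog".
    static String changelogTopic(String applicationId, String storeName) {
        return applicationId + "-" + storeName + "-changelog";
    }

    public static void main(String[] args) {
        String appId = "join-topic-names-example";
        // Prints the two changelog topics from the reported topic list.
        System.out.println(changelogTopic(appId, joinStoreName("KSTREAM-JOINOTHER-", 5)));
        System.out.println(changelogTopic(appId, joinStoreName("KSTREAM-JOINTHIS-", 4)));
    }
}
```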



--
This message was sent by Atlassian JIRA
(v7.6.14#76016)
