I am not familiar with Spring Boot, but in general you could query the state store to check whether the counts are as expected: https://kafka.apache.org/22/documentation/streams/developer-guide/interactive-queries.html
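Here is a minimal sketch of the interactive-query route. It assumes Kafka Streams 2.5 or later (for `StoreQueryParameters`) and a single application instance, so every key lives in the local store. The store name `word-counts` and the class and method names are hypothetical. The topology mirrors the word-count code in the quoted question, but it passes `Materialized.as(...)` to `count()` so the store can be looked up by a stable name instead of the generated `KSTREAM-AGGREGATE-STATE-STORE-0000000003`. On 2.2 itself, the lookup would be `streams.store(STORE_NAME, QueryableStoreTypes.keyValueStore())` instead.

```java
import java.time.Duration;
import java.util.Arrays;
import java.util.regex.Pattern;

import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.common.utils.Bytes;
import org.apache.kafka.streams.KafkaStreams;
import org.apache.kafka.streams.StoreQueryParameters;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.Topology;
import org.apache.kafka.streams.kstream.Consumed;
import org.apache.kafka.streams.kstream.Grouped;
import org.apache.kafka.streams.kstream.Materialized;
import org.apache.kafka.streams.state.KeyValueStore;
import org.apache.kafka.streams.state.QueryableStoreTypes;
import org.apache.kafka.streams.state.ReadOnlyKeyValueStore;

final class WordCountQueries {

    // Hypothetical store name; naming the store gives it a stable, queryable name.
    static final String STORE_NAME = "word-counts";

    private static final Pattern WORDS = Pattern.compile("\\W+", Pattern.UNICODE_CHARACTER_CLASS);

    // Same word count as in the question, but with an explicitly named count store.
    static Topology buildTopology(String inputTopic) {
        StreamsBuilder builder = new StreamsBuilder();
        builder.stream(inputTopic, Consumed.with(Serdes.String(), Serdes.String()))
               .flatMapValues(value -> Arrays.asList(WORDS.split(value.toLowerCase())))
               .groupBy((key, word) -> word, Grouped.with(Serdes.String(), Serdes.String()))
               .count(Materialized.<String, Long, KeyValueStore<Bytes, byte[]>>as(STORE_NAME)
                                  .withKeySerde(Serdes.String())
                                  .withValueSerde(Serdes.Long()));
        return builder.build();
    }

    // Waits until the instance reports RUNNING, then reads one word's count from the
    // local store. Returns null if the word has not been counted (yet) on this instance.
    // Note: store() can still throw InvalidStateStoreException if a rebalance starts.
    static Long countOf(KafkaStreams streams, String word, Duration timeout)
            throws InterruptedException {
        long deadline = System.currentTimeMillis() + timeout.toMillis();
        while (streams.state() != KafkaStreams.State.RUNNING) {
            if (System.currentTimeMillis() > deadline) {
                throw new IllegalStateException("Streams not RUNNING: " + streams.state());
            }
            Thread.sleep(100);
        }
        ReadOnlyKeyValueStore<String, Long> store = streams.store(
                StoreQueryParameters.fromNameAndType(STORE_NAME, QueryableStoreTypes.keyValueStore()));
        return store.get(word);
    }
}
```

In the question's `init()`, the topology could be built with `new KafkaStreams(WordCountQueries.buildTopology(eventsTopicName), streamsConfiguration)`. After `streams.start()`, a call such as `WordCountQueries.countOf(streams, "hello", Duration.ofSeconds(30))` could then take the place of the fixed `Thread.sleep(20000)`.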
Alternatively, you could either inspect the store's changelog topic or turn the table into a stream:

    .count().toStream().print(...); // for development

Instead of `print()`, you could also use `peek()` or `foreach()`. You could also write the result stream into an output topic via `to()`.

-Matthias

On 5/9/19 1:23 AM, Pavel Molchanov wrote:
> Hi,
>
> I created a simple Spring Boot application with Kafka and added a
> dependency on Kafka Streams. The application can send and receive
> messages and works fine.
>
> In the same app, I want to use Kafka Streams to calculate statistics about
> the topic.
>
> I wrote the following code, based on the word count example:
>
> @Service
> public class StartupService {
>
>     @Value(value = "${kafka.events.topic.name}")
>     private String eventsTopicName;
>
>     @Value(value = "${kafka.bootstrap.servers}")
>     private String bootstrapAddress;
>
>     @PostConstruct
>     public void init() {
>         Properties streamsConfiguration = new Properties();
>         streamsConfiguration.put(StreamsConfig.APPLICATION_ID_CONFIG, "test");
>         streamsConfiguration.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
>         streamsConfiguration.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG,
>                 Serdes.String().getClass().getName());
>         streamsConfiguration.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG,
>                 Serdes.String().getClass().getName());
>         streamsConfiguration.put(StreamsConfig.STATE_DIR_CONFIG,
>                 System.getProperty("java.io.tmpdir") + "\\count");
>         StreamsBuilder builder = new StreamsBuilder();
>         KStream<String, String> textLines = builder.stream(eventsTopicName);
>         Pattern pattern = Pattern.compile("\\W+", Pattern.UNICODE_CHARACTER_CLASS);
>
>         KTable<String, Long> wordCounts = textLines
>                 .flatMapValues(value -> Arrays.asList(pattern.split(value.toLowerCase())))
>                 .groupBy((key, word) -> word)
>                 .count();
>         Topology topology = builder.build();
>         KafkaStreams streams = new KafkaStreams(topology, streamsConfiguration);
>         Runtime.getRuntime().addShutdownHook(new Thread(streams::close));
>         streams.setUncaughtExceptionHandler((Thread thread, Throwable throwable) -> {
>             System.out.println("Exception in thread:" + thread.getId()
>                     + ", Message:" + throwable.getMessage());
>         });
>         streams.cleanUp();
>         streams.start();
>         try {
>             Thread.sleep(20000);
>         } catch (InterruptedException e) {
>             // TODO Auto-generated catch block
>             e.printStackTrace();
>         }
>         // Check the result here:
>
>     }
>
> }
>
> How can I check the result of the calculation? The last lines in the log
> file are:
>
> 2019-05-08 19:19:36.091 INFO 18380 --- [-StreamThread-1]
> o.a.k.c.c.internals.AbstractCoordinator : [Consumer
> clientId=test-a2d0314e-c872-469e-b34c-c08e2fdf422a-StreamThread-1-consumer,
> groupId=test] (Re-)joining group
> 2019-05-08 19:19:36.163 INFO 18380 --- [-StreamThread-1]
> o.a.k.s.p.i.StreamsPartitionAssignor : stream-thread
> [test-a2d0314e-c872-469e-b34c-c08e2fdf422a-StreamThread-1-consumer]
> Assigned tasks to clients as
> {a2d0314e-c872-469e-b34c-c08e2fdf422a=[activeTasks: ([0_0, 1_0])
> standbyTasks: ([]) assignedTasks: ([0_0, 1_0]) prevActiveTasks: ([])
> prevStandbyTasks: ([]) prevAssignedTasks: ([]) capacity: 1]}.
> 2019-05-08 19:19:36.205 INFO 18380 --- [-StreamThread-1]
> o.a.k.c.c.internals.AbstractCoordinator : [Consumer
> clientId=test-a2d0314e-c872-469e-b34c-c08e2fdf422a-StreamThread-1-consumer,
> groupId=test] Successfully joined group with generation 21
> 2019-05-08 19:19:36.209 INFO 18380 --- [-StreamThread-1]
> o.a.k.c.c.internals.ConsumerCoordinator : [Consumer
> clientId=test-a2d0314e-c872-469e-b34c-c08e2fdf422a-StreamThread-1-consumer,
> groupId=test] Setting newly assigned partitions [events-0,
> test-KSTREAM-AGGREGATE-STATE-STORE-0000000003-repartition-0]
> 2019-05-08 19:19:36.209 INFO 18380 --- [-StreamThread-1]
> o.a.k.s.p.internals.StreamThread : stream-thread
> [test-a2d0314e-c872-469e-b34c-c08e2fdf422a-StreamThread-1] State transition
> from PARTITIONS_REVOKED to PARTITIONS_ASSIGNED
> 2019-05-08 19:19:36.234 INFO 18380 --- [-StreamThread-1]
> o.a.k.s.p.internals.StreamThread : stream-thread
> [test-a2d0314e-c872-469e-b34c-c08e2fdf422a-StreamThread-1] partition
> assignment took 25 ms.
>     current active tasks: [0_0, 1_0]
>     current standby tasks: []
>     previous active tasks: []
>
> 2019-05-08 19:19:36.635 INFO 18380 --- [-StreamThread-1]
> org.apache.kafka.clients.Metadata : Cluster ID:
> I6GfqZSORWGKqPHE7zA_cQ
> 2019-05-08 19:19:36.659 INFO 18380 --- [-StreamThread-1]
> o.a.k.s.p.i.StoreChangelogReader : stream-thread
> [test-a2d0314e-c872-469e-b34c-c08e2fdf422a-StreamThread-1] Restoring task
> 1_0's state store KSTREAM-AGGREGATE-STATE-STORE-0000000003 from beginning
> of the changelog test-KSTREAM-AGGREGATE-STATE-STORE-0000000003-changelog-0
> 2019-05-08 19:19:36.668 INFO 18380 --- [-StreamThread-1]
> o.a.k.c.consumer.internals.Fetcher : [Consumer
> clientId=test-a2d0314e-c872-469e-b34c-c08e2fdf422a-StreamThread-1-restore-consumer,
> groupId=] Resetting offset for partition
> test-KSTREAM-AGGREGATE-STATE-STORE-0000000003-changelog-0 to offset 0.
> 2019-05-08 19:19:36.852 INFO 18380 --- [-StreamThread-1]
> o.a.k.s.p.internals.StreamThread : stream-thread
> [test-a2d0314e-c872-469e-b34c-c08e2fdf422a-StreamThread-1] State transition
> from PARTITIONS_ASSIGNED to RUNNING
> 2019-05-08 19:19:36.853 INFO 18380 --- [-StreamThread-1]
> org.apache.kafka.streams.KafkaStreams : stream-client
> [test-a2d0314e-c872-469e-b34c-c08e2fdf422a] State transition from
> REBALANCING to RUNNING
>
> It stays in the RUNNING state and doesn't move forward. What am I doing
> wrong?
>
> *Pavel Molchanov*
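Here is a sketch of the `toStream()` alternatives from the reply, applied to the word-count topology in the question. It assumes Kafka Streams 2.1 or later (for `Grouped`), and the class name and the topic names passed in are hypothetical. The explicit `Produced.with(..., Serdes.Long())` matters because the question's configuration sets a String default value serde, which cannot serialize the `Long` counts written by `to()`.

```java
import java.util.Arrays;
import java.util.regex.Pattern;

import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.Topology;
import org.apache.kafka.streams.kstream.Consumed;
import org.apache.kafka.streams.kstream.Grouped;
import org.apache.kafka.streams.kstream.KStream;
import org.apache.kafka.streams.kstream.KTable;
import org.apache.kafka.streams.kstream.Printed;
import org.apache.kafka.streams.kstream.Produced;

final class WordCountOutput {

    private static final Pattern WORDS = Pattern.compile("\\W+", Pattern.UNICODE_CHARACTER_CLASS);

    static Topology buildTopology(String inputTopic, String outputTopic) {
        StreamsBuilder builder = new StreamsBuilder();
        KTable<String, Long> wordCounts = builder
                .stream(inputTopic, Consumed.with(Serdes.String(), Serdes.String()))
                .flatMapValues(value -> Arrays.asList(WORDS.split(value.toLowerCase())))
                .groupBy((key, word) -> word, Grouped.with(Serdes.String(), Serdes.String()))
                .count();

        // Every update to the table becomes a record in this stream.
        KStream<String, Long> updates = wordCounts.toStream();

        // Development only: print each update to stdout, prefixed with a label.
        updates.print(Printed.<String, Long>toSysOut().withLabel("word-counts"));

        // peek() passes records through unchanged (foreach() would be the terminal
        // equivalent); to() then writes the updates to an output topic.
        updates.peek((word, count) -> System.out.println(word + " -> " + count))
               .to(outputTopic, Produced.with(Serdes.String(), Serdes.Long()));

        return builder.build();
    }
}
```

Output only appears once new records arrive on the input topic. With the default record cache, table updates are forwarded downstream when the cache is flushed on commit (`commit.interval.ms`, 30 seconds by default), which is longer than the 20-second sleep in the question. Setting the cache size (`cache.max.bytes.buffering`) to 0 during development forwards every update immediately.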