[jira] [Commented] (KAFKA-3544) Missing topics on startup
[ https://issues.apache.org/jira/browse/KAFKA-3544?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15552065#comment-15552065 ]

Mykola Polonskyi commented on KAFKA-3544:

[~guozhang] Greetings! My comment is a bit late, but I am hitting an error with a similar stack trace (like Greg's) when using KGroupedTable#aggregate with an intermediate topic, on Kafka Streams 0.10.0.1:

{code:kotlin}
private val streamId = "STREAM_USER_WITH_SKICARD"
connectionProperties.put(StreamsConfig.APPLICATION_ID_CONFIG, streamId)

val skicardByOwnerIdTopicName = "skicards_by_owner_id"

userTable.join(
    skicardsTable
        // re-key each skicard by its owner id
        .groupBy { key, value -> KeyValue(value.skicardInfo.ownerId, value.skicardInfo) }
        .aggregate(
            { mutableSetOf() },  // initializer: empty set per owner id
            { ownerIdString, skicardInfoObject, accumulator ->  // adder
                accumulator.add(skicardInfoObject)
                accumulator
            },
            { ownerIdString, skicardInfoObject, accumulator ->  // subtractor
                accumulator.remove(skicardInfoObject)
                accumulator
            },
            skicardByOwnerIdSerde,
            skicardByOwnerIdTopicName  // store name, also used for the repartition topic name
        ),
    { userCreatedOrUpdated, skicardInfoSet ->
        UserWithSkicardsCreatedOrUpdated(userCreatedOrUpdated.user, skicardInfoSet)
    }
).to(
    Serdes.StringSerde(),
    userWithSkicardsSerde,
    USER_WITH_SKICARDS_TOPIC.name
)
{code}

The topic "STREAM_USER_WITH_SKICARD-skicards_by_owner_id-repartition" then appeared in Kafka. In the debugger, I see that the code tries to retrieve metadata (StreamPartitionAssignor.java:446) for the topic "skicards_by_owner_id-repartition" instead of "STREAM_USER_WITH_SKICARD-skicards_by_owner_id-repartition". Is that the bug you spoke about?

p.s. I'm not sure whether I should report this as a new ticket, which is why I'm adding a comment here instead.
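The name mismatch described in this comment can be modeled in a few lines. This is a hypothetical sketch, not Kafka Streams code: the `<application.id>-<name>-repartition` pattern is inferred from the topic that appeared on the broker, the partition count is an arbitrary placeholder, and `RepartitionTopicNames`, `repartitionTopic` and `partitionsFor` are illustrative names.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical model of the lookup mismatch; not the StreamPartitionAssignor code.
final class RepartitionTopicNames {

    // Assumed naming convention, taken from the topic observed on the broker:
    // "<application.id>-<storeName>-repartition".
    static String repartitionTopic(String applicationId, String storeName) {
        return applicationId + "-" + storeName + "-repartition";
    }

    // Looks up a partition count in a metadata snapshot; returns null when the
    // topic is unknown, which is what the debugger session suggests happened.
    static Integer partitionsFor(Map<String, Integer> metadata, String topic) {
        return metadata.get(topic);
    }

    public static void main(String[] args) {
        String appId = "STREAM_USER_WITH_SKICARD";
        String store = "skicards_by_owner_id";

        // Metadata as the broker would report it: only the prefixed topic exists.
        Map<String, Integer> metadata = new HashMap<>();
        metadata.put(repartitionTopic(appId, store), 4); // 4 is a placeholder count

        // Unprefixed lookup (what the assignor appeared to do) misses the topic.
        System.out.println(partitionsFor(metadata, store + "-repartition"));
        // Prefixed lookup finds it.
        System.out.println(partitionsFor(metadata, repartitionTopic(appId, store)));
    }
}
```

In other words, if the assignor resolves the unprefixed name, the lookup fails even though the topic was created successfully.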
> Missing topics on startup
>
> Key: KAFKA-3544
> URL: https://issues.apache.org/jira/browse/KAFKA-3544
> Project: Kafka
> Issue Type: Bug
> Components: streams
> Affects Versions: 0.10.0.0
> Reporter: Greg Fodor
> Assignee: Guozhang Wang
> Labels: semantics
>
> When running a relatively complex job with multiple tasks and state stores,
> on the first run I get errors due to some of the intermediate topics not
> existing. Subsequent runs work OK. My assumption is streams may be creating
> topics lazily, so if downstream tasks are initializing before their parents
> have had a chance to create their necessary topics then the children will
> attempt to start consuming from topics that do not exist yet.

--
This message was sent by Atlassian JIRA
(v6.3.4#6332)
[jira] [Commented] (KAFKA-3544) Missing topics on startup
[ https://issues.apache.org/jira/browse/KAFKA-3544?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15240723#comment-15240723 ]

Greg Fodor commented on KAFKA-3544:

Ah this makes sense. Fortunately thanks to the other responses you've given I've been able to refactor my job to use through() in this case, so it will end up not having this problem once the relevant updates are made to through(). I'll close the ticket.
[jira] [Commented] (KAFKA-3544) Missing topics on startup
[ https://issues.apache.org/jira/browse/KAFKA-3544?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15238307#comment-15238307 ]

Guozhang Wang commented on KAFKA-3544:

Consider the topics that Kafka Streams reads as sources or writes as sinks; more explicitly:

{code}
builder.stream("topic1");    // source
builder.table("topic2");     // source
stream.through("topic3");    // or table.through("topic3"): sink, then read back as a source
stream.to("topic4");         // or table.to("topic4"): sink
{code}

In these cases, topic1/2/3/4 are currently considered "external" to Kafka Streams: unlike the changelog and repartition topics used for aggregations, they are not auto-created by the library, and users are supposed to create them themselves. It's just that, by default, Kafka brokers auto-create a topic when they receive a metadata request for it from a client (in this case, from Kafka Streams), so depending on the timing this may succeed, but sometimes it may not.

As I mentioned before, we plan to automatically embed "through" for repartitioning before joins, and hence those topics will be considered internal topics as well; but for now they are outside the library's "black box".
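Given the external/internal distinction in this comment, one option is a pre-flight check that lists the external topics still missing before the application starts, instead of relying on broker-side auto-creation (`auto.create.topics.enable`). The sketch below is a hypothetical helper, not part of Kafka Streams; the set of existing topics is hard-coded here, and in a real application it might come from `KafkaConsumer#listTopics()`.

```java
import java.util.Set;
import java.util.SortedSet;
import java.util.TreeSet;

// Hypothetical pre-flight check: which external topics must the user create?
final class ExternalTopicCheck {

    // Topics named in stream()/table()/through()/to() are treated as external and
    // are not created by the library. Internal changelog/repartition topics are
    // deliberately not passed in, since the library creates those itself.
    static SortedSet<String> missingExternalTopics(Set<String> externalTopics,
                                                   Set<String> existingTopics) {
        SortedSet<String> missing = new TreeSet<>(externalTopics);
        missing.removeAll(existingTopics);
        return missing;
    }

    public static void main(String[] args) {
        Set<String> external = Set.of("topic1", "topic2", "topic3", "topic4");
        // Assumption: a snapshot of the topics that already exist on the cluster.
        Set<String> existing = Set.of("topic1", "topic2");
        System.out.println("Create before starting: "
                + missingExternalTopics(external, existing));
    }
}
```

Failing fast on a non-empty result makes the first-run behavior deterministic, rather than dependent on whether auto-creation wins the race.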
[jira] [Commented] (KAFKA-3544) Missing topics on startup
[ https://issues.apache.org/jira/browse/KAFKA-3544?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15236512#comment-15236512 ]

Greg Fodor commented on KAFKA-3544:

Not sure of the best way to share the topology. Here's the relevant part of the code:

{code}
builder
    .stream(Serdes.Long(), userSpaceBroadcastSerde, "positron-db-user_space_broadcasts")
    .map((id, broadcast) -> KeyValue.pair(broadcast.getUserId().toString(), broadcast))
    .to(Serdes.String(), userSpaceBroadcastSerde, "user_space_broadcasts-user_id");

KTable<String, ...> userSpaceBroadcastsByUserId = builder
    .stream(Serdes.String(), userSpaceBroadcastSerde, "user_space_broadcasts-user_id")
    .aggregateByKey(...);
{code}

In this example, userSpaceBroadcastSerde is a Serde for a custom Avro type. I'm basically pivoting the first stream onto a foreign key, then creating a KTable from that output by reading it back and aggregating. (Given our discussions on other tickets there may be a way to simplify this, but I wanted to capture it as-is for this report.)
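The pivot-then-aggregate pattern in this comment can be illustrated with plain collections. In the Streams job, the re-keyed records pass through the `user_space_broadcasts-user_id` topic so that all records for one user land in the same partition before aggregation; the in-memory sketch below skips that step. The `Broadcast` record and the "collect broadcast ids per user" aggregation are hypothetical stand-ins for the Avro type and the elided `aggregateByKey(...)` arguments.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

// In-memory analogue of re-keying by a foreign key and aggregating per new key.
final class PivotAndAggregate {

    // Hypothetical stand-in for the custom Avro broadcast type.
    record Broadcast(long id, long userId) {}

    static Map<String, List<Long>> broadcastIdsByUserId(List<Broadcast> broadcasts) {
        Map<String, List<Long>> byUser = new TreeMap<>();
        for (Broadcast b : broadcasts) {
            // map(): the new key is the user id as a String, as in the topology
            String newKey = Long.toString(b.userId());
            // aggregate: initializer creates an empty list, adder appends the id
            byUser.computeIfAbsent(newKey, k -> new ArrayList<>()).add(b.id());
        }
        return byUser;
    }

    public static void main(String[] args) {
        List<Broadcast> input = List.of(
                new Broadcast(1, 10), new Broadcast(2, 20), new Broadcast(3, 10));
        System.out.println(broadcastIdsByUserId(input));
    }
}
```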
The topology is failing to build on the user_space_broadcasts-user_id topic:

{code}
Exception in thread "StreamThread-1" org.apache.kafka.streams.errors.TopologyBuilderException: Invalid topology building: External source topic not found: room_operation_message_incoming-user_id
    at org.apache.kafka.streams.processor.internals.StreamPartitionAssignor.ensureCopartitioning(StreamPartitionAssignor.java:435)
    at org.apache.kafka.streams.processor.internals.StreamPartitionAssignor.ensureCopartitioning(StreamPartitionAssignor.java:423)
    at org.apache.kafka.streams.processor.internals.StreamPartitionAssignor.assign(StreamPartitionAssignor.java:200)
    at org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.performAssignment(ConsumerCoordinator.java:233)
    at org.apache.kafka.clients.consumer.internals.AbstractCoordinator.onJoinLeader(AbstractCoordinator.java:385)
    at org.apache.kafka.clients.consumer.internals.AbstractCoordinator.access$700(AbstractCoordinator.java:80)
    at org.apache.kafka.clients.consumer.internals.AbstractCoordinator$JoinGroupResponseHandler.handle(AbstractCoordinator.java:339)
    at org.apache.kafka.clients.consumer.internals.AbstractCoordinator$JoinGroupResponseHandler.handle(AbstractCoordinator.java:321)
    at org.apache.kafka.clients.consumer.internals.AbstractCoordinator$CoordinatorResponseHandler.onSuccess(AbstractCoordinator.java:659)
    at org.apache.kafka.clients.consumer.internals.AbstractCoordinator$CoordinatorResponseHandler.onSuccess(AbstractCoordinator.java:638)
    at org.apache.kafka.clients.consumer.internals.RequestFuture$1.onSuccess(RequestFuture.java:167)
    at org.apache.kafka.clients.consumer.internals.RequestFuture.fireSuccess(RequestFuture.java:133)
    at org.apache.kafka.clients.consumer.internals.RequestFuture.complete(RequestFuture.java:107)
    at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient$RequestFutureCompletionHandler.onComplete(ConsumerNetworkClient.java:381)
    at org.apache.kafka.clients.NetworkClient.poll(NetworkClient.java:279)
    at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.clientPoll(ConsumerNetworkClient.java:321)
    at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(ConsumerNetworkClient.java:214)
    at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(ConsumerNetworkClient.java:193)
    at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(ConsumerNetworkClient.java:163)
    at org.apache.kafka.clients.consumer.internals.AbstractCoordinator.ensureActiveGroup(AbstractCoordinator.java:220)
    at org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.ensurePartitionAssignment(ConsumerCoordinator.java:317)
    at org.apache.kafka.clients.consumer.KafkaConsumer.pollOnce(KafkaConsumer.java:898)
    at org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:860)
    at org.apache.kafka.streams.processor.internals.StreamThread.runLoop(StreamThread.java:325)
    at org.apache.kafka.streams.processor.internals.StreamThread.run(StreamThread.java:248)
2016-04-12 02:57:36 StreamThread [INFO] Stream thread shutdown complete [StreamThread-2]
Exception in thread
{code}
[jira] [Commented] (KAFKA-3544) Missing topics on startup
[ https://issues.apache.org/jira/browse/KAFKA-3544?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15236324#comment-15236324 ]

Guozhang Wang commented on KAFKA-3544:

Thanks for reporting this. Sounds like a bug as we actually create all the intermediate topics (for repartitioning and changelog) at the initialization phase. Could you paste your stack trace and your topology skeleton for me to do further investigation?
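Creating every intermediate topic at initialization, rather than lazily as the issue description guessed, can be modeled as follows. This is a simplified, hypothetical sketch: adding names to a set stands in for real topic creation, the application id `my-app` and all topic names are illustrative, and the `<application.id>-<name>-changelog` / `-repartition` naming is an assumption. It also shows the caveat raised later in the thread: eager creation covers internal topics only, so an external topic written with `to()` and read back with `stream()` can still be missing.

```java
import java.util.ArrayList;
import java.util.HashSet;
import java.util.LinkedHashSet;
import java.util.List;
import java.util.Set;

// Hypothetical model of eager internal-topic creation during initialization.
final class EagerInternalTopics {

    // Derives all internal topic names from the topology description.
    static Set<String> internalTopics(String applicationId,
                                      List<String> storeNames,
                                      List<String> repartitionNames) {
        Set<String> topics = new LinkedHashSet<>();
        for (String store : storeNames) {
            topics.add(applicationId + "-" + store + "-changelog");
        }
        for (String name : repartitionNames) {
            topics.add(applicationId + "-" + name + "-repartition");
        }
        return topics;
    }

    // Initialization phase: create all internal topics before any task starts.
    static void createInternalTopics(Set<String> clusterTopics, Set<String> internal) {
        clusterTopics.addAll(internal);
    }

    // Input topics a task would subscribe to that do not exist yet.
    static List<String> missingInputs(Set<String> clusterTopics, List<String> taskInputs) {
        List<String> missing = new ArrayList<>();
        for (String topic : taskInputs) {
            if (!clusterTopics.contains(topic)) {
                missing.add(topic);
            }
        }
        return missing;
    }

    public static void main(String[] args) {
        String appId = "my-app"; // hypothetical application.id
        Set<String> cluster = new HashSet<>(Set.of("source-topic"));
        createInternalTopics(cluster, internalTopics(appId, List.of("counts"), List.of("counts")));

        // One user-created source, one internal repartition topic, and one
        // external intermediate topic that nobody created.
        List<String> inputs = List.of("source-topic", appId + "-counts-repartition", "user-created-topic");
        System.out.println("Missing: " + missingInputs(cluster, inputs));
    }
}
```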