[jira] [Commented] (KAFKA-3544) Missing topics on startup

2016-10-06 Thread Mykola Polonskyi (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-3544?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15552065#comment-15552065
 ] 

Mykola Polonskyi commented on KAFKA-3544:
-

[~guozhang] Greetings!
My comment is a bit late, but I hit an error with a stack trace similar to Greg's
when using KGroupedTable#aggregate with an intermediate topic,
on Kafka Streams 0.10.0.1:
{code:kotlin}
private val streamId = "STREAM_USER_WITH_SKICARD"
connectionProperties.put(StreamsConfig.APPLICATION_ID_CONFIG, streamId)
val skicardByOwnerIdTopicName = "skicards_by_owner_id"
userTable.join(
    skicardsTable.groupBy { key, value ->
        KeyValue(value.skicardInfo.ownerId, value.skicardInfo)
    }.aggregate(
        { mutableSetOf() }, // initializer: empty set per owner
        { ownerIdString, skicardInfoObject, accumulator ->
            accumulator.apply { add(skicardInfoObject) } }, // adder
        { ownerIdString, skicardInfoObject, accumulator ->
            accumulator.apply { remove(skicardInfoObject) } }, // subtractor
        skicardByOwnerIdSerde,
        skicardByOwnerIdTopicName
    ),
    { userCreatedOrUpdated, skicardInfoSet ->
        UserWithSkicardsCreatedOrUpdated(userCreatedOrUpdated.user, skicardInfoSet)
    }
).to(
    Serdes.StringSerde(),
    userWithSkicardsSerde,
    USER_WITH_SKICARDS_TOPIC.name
)
{code}
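In a KGroupedTable aggregation the subtractor is applied to a replaced (old) value and the adder to the new value, so the subtractor has to undo what the adder did; a subtractor that returns the accumulator unchanged leaves stale entries behind when a skicard moves to another owner. The following is a plain-Java simulation of that contract, not Kafka Streams code; the class name, owner names and skicard ids are hypothetical.

```java
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;

public class SetAggregateSketch {
    // Aggregate: ownerId -> set of skicard ids. A plain-Java stand-in for the
    // state store behind KGroupedTable#aggregate (hypothetical, illustration only).
    final Map<String, Set<String>> byOwner = new HashMap<>();

    // Adder: include the new value in its group's set.
    void add(String ownerId, String skicardId) {
        byOwner.computeIfAbsent(ownerId, k -> new HashSet<>()).add(skicardId);
    }

    // Subtractor: remove the old value from its group's set, undoing add().
    void subtract(String ownerId, String skicardId) {
        Set<String> set = byOwner.get(ownerId);
        if (set != null) {
            set.remove(skicardId);
        }
    }

    // One table update: subtract the old value (if any), then add the new one.
    void update(String skicardId, String oldOwner, String newOwner) {
        if (oldOwner != null) {
            subtract(oldOwner, skicardId);
        }
        if (newOwner != null) {
            add(newOwner, skicardId);
        }
    }

    public static void main(String[] args) {
        SetAggregateSketch agg = new SetAggregateSketch();
        agg.update("card-1", null, "alice");   // card-1 created for alice
        agg.update("card-1", "alice", "bob");  // card-1 moves to bob
        System.out.println(agg.byOwner.get("alice")); // []
        System.out.println(agg.byOwner.get("bob"));   // [card-1]
    }
}
```

With a no-op subtractor, alice's set would still contain card-1 after the move.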

The topic
"STREAM_USER_WITH_SKICARD-skicards_by_owner_id-repartition" then appeared in Kafka.

However, while debugging I can see that the code tries to retrieve metadata
(StreamPartitionAssignor.java:446) for the topic "skicards_by_owner_id-repartition"
instead of "STREAM_USER_WITH_SKICARD-skicards_by_owner_id-repartition".
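The mismatch can be illustrated with a small sketch. It assumes the naming pattern visible in the topic that actually appeared, `<application.id>-<name>-repartition`, and checks which of the two names would be found among the existing topics; both helpers are hypothetical and not part of the Kafka Streams API.

```java
import java.util.Set;

public class RepartitionTopicNames {
    // Hypothetical helper: the prefixed name observed on the broker,
    // <application.id>-<name>-repartition.
    static String prefixedName(String applicationId, String name) {
        return applicationId + "-" + name + "-repartition";
    }

    // Hypothetical helper: the unprefixed name the assignor was seen looking up.
    static String unprefixedName(String name) {
        return name + "-repartition";
    }

    public static void main(String[] args) {
        String applicationId = "STREAM_USER_WITH_SKICARD";
        String name = "skicards_by_owner_id";
        // The only repartition topic that exists after startup, per the comment.
        Set<String> existing = Set.of(prefixedName(applicationId, name));

        System.out.println(existing.contains(prefixedName(applicationId, name))); // true
        System.out.println(existing.contains(unprefixedName(name)));              // false
    }
}
```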

Is this the bug you mentioned?
P.S. I wasn't sure whether to report this as a new ticket, so I'm adding
a comment here instead.

> Missing topics on startup
> -
>
> Key: KAFKA-3544
> URL: https://issues.apache.org/jira/browse/KAFKA-3544
> Project: Kafka
>  Issue Type: Bug
>  Components: streams
>Affects Versions: 0.10.0.0
>Reporter: Greg Fodor
>Assignee: Guozhang Wang
>  Labels: semantics
>
> When running a relatively complex job with multiple tasks and state stores, 
> on the first run I get errors due to some of the intermediate topics not 
> existing. Subsequent runs work OK. My assumption is streams may be creating 
> topics lazily, so if downstream tasks are initializing before their parents 
> have had a chance to create their necessary topics then the children will 
> attempt to start consuming from topics that do not exist yet.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Commented] (KAFKA-3544) Missing topics on startup

2016-04-14 Thread Greg Fodor (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-3544?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15240723#comment-15240723
 ] 

Greg Fodor commented on KAFKA-3544:
---

Ah, this makes sense. Fortunately, thanks to the other responses you've given,
I've been able to refactor my job to use through() in this case, so it should
no longer hit this problem once the relevant updates are made to through().
I'll close the ticket.



[jira] [Commented] (KAFKA-3544) Missing topics on startup

2016-04-12 Thread Guozhang Wang (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-3544?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15238307#comment-15238307
 ] 

Guozhang Wang commented on KAFKA-3544:
--

Consider the topics that Kafka Streams reads from as sources or writes to as sinks;
more explicitly:

{code}
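KStream<String, String> stream = builder.stream("topic1");
KTable<String, String> table = builder.table("topic2");

stream.through("topic3");  // likewise table.through("topic3")
stream.to("topic4");       // likewise table.to("topic4")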
{code}

In these cases, topic1/2/3/4 are currently considered "external" to Kafka
Streams: unlike the changelog and repartition topics used for aggregations, they
are not auto-created by the library, and users are supposed to create them
themselves. By default, however, Kafka brokers auto-create a topic when they
receive a metadata request for it from a client (in this case, from Kafka
Streams), so depending on the timing this may succeed, but sometimes it may
not.

As I mentioned before, we plan to automatically insert "through" for
repartitioning before joins, so those topics will also be considered internal
topics; for now, however, they are outside the library's "black box".
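Because these external topics are the application's responsibility, one option is to check for them before starting the topology instead of relying on broker-side auto-creation timing. Below is a minimal sketch of such a pre-flight check; the helper is hypothetical, and the set of existing topics is assumed to come from cluster metadata (for example, the key set returned by `KafkaConsumer#listTopics()`).

```java
import java.util.LinkedHashSet;
import java.util.List;
import java.util.Set;

public class ExternalTopicCheck {
    // Hypothetical helper: returns the external topics the topology declares
    // but that are not present in the cluster, in declaration order.
    static Set<String> missingTopics(List<String> required, Set<String> existing) {
        Set<String> missing = new LinkedHashSet<>(required);
        missing.removeAll(existing);
        return missing;
    }

    public static void main(String[] args) {
        List<String> external = List.of("topic1", "topic2", "topic3", "topic4");
        Set<String> onBroker = Set.of("topic1", "topic2"); // assumed metadata snapshot
        Set<String> missing = missingTopics(external, onBroker);
        if (!missing.isEmpty()) {
            // Fail fast rather than racing broker auto-creation.
            System.out.println("Create these topics before starting: " + missing);
        }
    }
}
```

Running `main` prints `Create these topics before starting: [topic3, topic4]`.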



[jira] [Commented] (KAFKA-3544) Missing topics on startup

2016-04-11 Thread Greg Fodor (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-3544?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15236512#comment-15236512
 ] 

Greg Fodor commented on KAFKA-3544:
---

Not sure of the best way to share the topology. Here's the relevant part of the 
code:
{code}
builder
    .stream(Serdes.Long(), userSpaceBroadcastSerde, "positron-db-user_space_broadcasts")
    .map((id, broadcast) -> KeyValue.pair(broadcast.getUserId().toString(), broadcast))
    .to(Serdes.String(), userSpaceBroadcastSerde, "user_space_broadcasts-user_id");

KTable userSpaceBroadcastsByUserId = builder
    .stream(Serdes.String(), userSpaceBroadcastSerde, "user_space_broadcasts-user_id")
    .aggregateByKey(...);
{code}

In this example, userSpaceBroadcastSerde is a Serde for a custom Avro type. I'm
basically pivoting the first stream onto a foreign key and then creating a 
KTable off of that output by tapping it and then aggregating. (Given our 
discussions on other tickets there may be a way to simplify this, but I wanted 
to capture it as-is for this report.)
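The pivot-then-aggregate pattern described above can be illustrated with an in-memory analogy in plain Java: re-key each record by its foreign key, then fold together the records that share the new key. This is not Kafka Streams code, and the `Broadcast` class with `id` and `userId` fields is a hypothetical stand-in for the Avro type.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

public class PivotAndAggregate {
    // Hypothetical stand-in for the Avro broadcast type: keyed by its own id,
    // but carrying a foreign key (userId).
    static final class Broadcast {
        final long id;
        final long userId;

        Broadcast(long id, long userId) {
            this.id = id;
            this.userId = userId;
        }
    }

    // Re-key every record by userId (the role of map(...) in the topology),
    // then collect the ids of all records sharing that key (the role of the
    // aggregation). A TreeMap keeps the output order deterministic.
    static Map<String, List<Long>> broadcastIdsByUserId(Map<Long, Broadcast> byId) {
        Map<String, List<Long>> result = new TreeMap<>();
        for (Broadcast b : byId.values()) {
            String newKey = Long.toString(b.userId);
            result.computeIfAbsent(newKey, k -> new ArrayList<>()).add(b.id);
        }
        return result;
    }

    public static void main(String[] args) {
        Map<Long, Broadcast> byId = new TreeMap<>();
        byId.put(1L, new Broadcast(1L, 42L));
        byId.put(2L, new Broadcast(2L, 7L));
        byId.put(3L, new Broadcast(3L, 42L));
        System.out.println(broadcastIdsByUserId(byId)); // {42=[1, 3], 7=[2]}
    }
}
```

In Kafka Streams the re-keyed records must first be written to a topic (here user_space_broadcasts-user_id) so that all records with the same new key land in the same partition before the aggregation, which is why that intermediate topic has to exist when the topology starts.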

The topology is failing to build on the user_space_broadcasts-user_id topic:

{code}
Exception in thread "StreamThread-1" org.apache.kafka.streams.errors.TopologyBuilderException: Invalid topology building: External source topic not found: room_operation_message_incoming-user_id
    at org.apache.kafka.streams.processor.internals.StreamPartitionAssignor.ensureCopartitioning(StreamPartitionAssignor.java:435)
    at org.apache.kafka.streams.processor.internals.StreamPartitionAssignor.ensureCopartitioning(StreamPartitionAssignor.java:423)
    at org.apache.kafka.streams.processor.internals.StreamPartitionAssignor.assign(StreamPartitionAssignor.java:200)
    at org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.performAssignment(ConsumerCoordinator.java:233)
    at org.apache.kafka.clients.consumer.internals.AbstractCoordinator.onJoinLeader(AbstractCoordinator.java:385)
    at org.apache.kafka.clients.consumer.internals.AbstractCoordinator.access$700(AbstractCoordinator.java:80)
    at org.apache.kafka.clients.consumer.internals.AbstractCoordinator$JoinGroupResponseHandler.handle(AbstractCoordinator.java:339)
    at org.apache.kafka.clients.consumer.internals.AbstractCoordinator$JoinGroupResponseHandler.handle(AbstractCoordinator.java:321)
    at org.apache.kafka.clients.consumer.internals.AbstractCoordinator$CoordinatorResponseHandler.onSuccess(AbstractCoordinator.java:659)
    at org.apache.kafka.clients.consumer.internals.AbstractCoordinator$CoordinatorResponseHandler.onSuccess(AbstractCoordinator.java:638)
    at org.apache.kafka.clients.consumer.internals.AbstractCoordinator$CoordinatorResponseHandler.onSuccess(AbstractCoordinator.java:659)
    at org.apache.kafka.clients.consumer.internals.AbstractCoordinator$CoordinatorResponseHandler.onSuccess(AbstractCoordinator.java:638)
    at org.apache.kafka.clients.consumer.internals.RequestFuture$1.onSuccess(RequestFuture.java:167)
    at org.apache.kafka.clients.consumer.internals.RequestFuture.fireSuccess(RequestFuture.java:133)
    at org.apache.kafka.clients.consumer.internals.RequestFuture.complete(RequestFuture.java:107)
    at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient$RequestFutureCompletionHandler.onComplete(ConsumerNetworkClient.java:381)
    at org.apache.kafka.clients.NetworkClient.poll(NetworkClient.java:279)
    at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.clientPoll(ConsumerNetworkClient.java:321)
    at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(ConsumerNetworkClient.java:214)
    at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(ConsumerNetworkClient.java:193)
    at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(ConsumerNetworkClient.java:163)
    at org.apache.kafka.clients.consumer.internals.AbstractCoordinator.ensureActiveGroup(AbstractCoordinator.java:220)
    at org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.ensurePartitionAssignment(ConsumerCoordinator.java:317)
    at org.apache.kafka.clients.consumer.KafkaConsumer.pollOnce(KafkaConsumer.java:898)
    at org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:860)
    at org.apache.kafka.streams.processor.internals.StreamThread.runLoop(StreamThread.java:325)
    at org.apache.kafka.streams.processor.internals.StreamThread.run(StreamThread.java:248)
2016-04-12 02:57:36 StreamThread [INFO] Stream thread shutdown complete [StreamThread-2]
Exception in thread
{code}

[jira] [Commented] (KAFKA-3544) Missing topics on startup

2016-04-11 Thread Guozhang Wang (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-3544?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15236324#comment-15236324
 ] 

Guozhang Wang commented on KAFKA-3544:
--

Thanks for reporting this. It sounds like a bug, since we actually create all the
intermediate topics (for repartitioning and changelogs) during the initialization
phase. Could you paste your stack trace and a skeleton of your topology so I can
investigate further?
