No, internal topics do not need to be manually created.

Eno
> On 13 Apr 2017, at 10:00, Shimi Kiviti <shim...@gmail.com> wrote:
> 
> Is that (manual topic creation) also true for internal topics?
> 
> On Thu, 13 Apr 2017 at 19:14 Matthias J. Sax <matth...@confluent.io> wrote:
> 
>> Hi,
>> 
>> thanks for reporting this issue. We are aware of a bug in 0.10.2 that
>> seems to be related: https://issues.apache.org/jira/browse/KAFKA-5037
>> 
>> However, I also want to point out that it is highly recommended not to
>> use topic auto-creation for Streams, but to manually create all
>> input/output topics before you start your Streams application.
>> 
>> For more details, see
>> 
>> http://docs.confluent.io/current/streams/developer-guide.html#managing-topics-of-a-kafka-streams-application
>> 
>> 
>> May I ask why you are using topic auto-creation?
>> 
>> 
>> -Matthias
>> 
>> 
>> On 4/11/17 1:09 PM, Dmitry Minkovsky wrote:
>>> I updated from 10.1 to 10.2. I updated both the broker and the Maven
>>> dependency.
>>> 
>>> I am using topic auto-create. With 10.1, starting the application with a
>>> broker would sometimes result in an error like:
>>> 
>>>> Exception in thread "StreamThread-1" org.apache.kafka.streams.errors.TopologyBuilderException: Invalid topology building: stream-thread [StreamThread-1] Topic not found: $topic
>>> 
>>> But this would only happen once. Upon the second attempt, the topics are
>>> already created and everything works fine.
>>> 
>>> But with 10.2 this error does not go away. I have confirmed and tested that
>>> auto topic creation is enabled.
>>> 
>>> Here is the error/trace:
>>> 
>>> 
>>> Exception in thread "StreamThread-1" org.apache.kafka.streams.errors.TopologyBuilderException: Invalid topology building: stream-thread [StreamThread-1] Topic not found: session-updates
>>>     at org.apache.kafka.streams.processor.internals.StreamPartitionAssignor$CopartitionedTopicsValidator.validate(StreamPartitionAssignor.java:734)
>>>     at org.apache.kafka.streams.processor.internals.StreamPartitionAssignor.ensureCopartitioning(StreamPartitionAssignor.java:648)
>>>     at org.apache.kafka.streams.processor.internals.StreamPartitionAssignor.assign(StreamPartitionAssignor.java:368)
>>>     at org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.performAssignment(ConsumerCoordinator.java:339)
>>>     at org.apache.kafka.clients.consumer.internals.AbstractCoordinator.onJoinLeader(AbstractCoordinator.java:488)
>>>     at org.apache.kafka.clients.consumer.internals.AbstractCoordinator.access$1100(AbstractCoordinator.java:89)
>>>     at org.apache.kafka.clients.consumer.internals.AbstractCoordinator$JoinGroupResponseHandler.handle(AbstractCoordinator.java:438)
>>>     at org.apache.kafka.clients.consumer.internals.AbstractCoordinator$JoinGroupResponseHandler.handle(AbstractCoordinator.java:420)
>>>     at org.apache.kafka.clients.consumer.internals.AbstractCoordinator$CoordinatorResponseHandler.onSuccess(AbstractCoordinator.java:764)
>>>     at org.apache.kafka.clients.consumer.internals.AbstractCoordinator$CoordinatorResponseHandler.onSuccess(AbstractCoordinator.java:745)
>>>     at org.apache.kafka.clients.consumer.internals.RequestFuture$1.onSuccess(RequestFuture.java:186)
>>>     at org.apache.kafka.clients.consumer.internals.RequestFuture.fireSuccess(RequestFuture.java:149)
>>>     at org.apache.kafka.clients.consumer.internals.RequestFuture.complete(RequestFuture.java:116)
>>>     at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient$RequestFutureCompletionHandler.fireCompletion(ConsumerNetworkClient.java:493)
>>>     at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.firePendingCompletedRequests(ConsumerNetworkClient.java:322)
>>>     at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(ConsumerNetworkClient.java:253)
>>>     at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(ConsumerNetworkClient.java:172)
>>>     at org.apache.kafka.clients.consumer.internals.AbstractCoordinator.joinGroupIfNeeded(AbstractCoordinator.java:334)
>>>     at org.apache.kafka.clients.consumer.internals.AbstractCoordinator.ensureActiveGroup(AbstractCoordinator.java:303)
>>>     at org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.poll(ConsumerCoordinator.java:286)
>>>     at org.apache.kafka.clients.consumer.KafkaConsumer.pollOnce(KafkaConsumer.java:1030)
>>>     at org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:995)
>>>     at org.apache.kafka.streams.processor.internals.StreamThread.runLoop(StreamThread.java:582)
>>>     at org.apache.kafka.streams.processor.internals.StreamThread.run(StreamThread.java:368)
>>> 
>>> 
>>> It does not occur if my topology only defines streams and tables. However,
>>> when I attempt to join a stream and a table, this error is thrown:
>>> 
>>>        // No error if this is in topology
>>>        KTable<K, V> sessions = topology.table(byteStringSerde, sessionSerde, "sessions", "sessions");
>>> 
>>>        // No error if this is in topology
>>>        KStream<ByteString, Messages.EntityUpdate> sessionUpdates =
>>>            topology.stream(byteStringSerde, sessionUpdateSerde, "session-updates");
>>> 
>>>        // Error if this is in topology
>>>        sessionUpdates
>>>          .leftJoin(sessions, (update, value) -> {
>>>              // do update, omitted
>>>          })
>>>          .filter((k, v) -> v != null)
>>>          .to(byteStringSerde, sessionSerde, "sessions");
>>> 
>> 
>> 
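
The trace in the original report fails in CopartitionedTopicsValidator.validate, which is reached only when the topology contains a join. The sketch below is a simplified model of that kind of check, not the actual Kafka Streams code: the class name CopartitionCheckSketch, the validate signature, and the use of a plain map as stand-in topic metadata are all assumptions made for illustration. It shows why a join input that is missing from the metadata fails outright, while the same topics pass once they exist with matching partition counts.

```java
import java.util.Arrays;
import java.util.HashMap;
import java.util.LinkedHashSet;
import java.util.Map;
import java.util.Set;

// Hypothetical, simplified model of a co-partitioning check.
// This is NOT the Kafka Streams implementation.
public class CopartitionCheckSketch {

    // Returns the common partition count of all topics in the group.
    // Throws if a topic is missing from the metadata or if the counts differ.
    public static int validate(Set<String> copartitionGroup,
                               Map<String, Integer> partitionsByTopic) {
        int expected = -1;
        for (String topic : copartitionGroup) {
            Integer partitions = partitionsByTopic.get(topic);
            if (partitions == null) {
                // The topic has not been created yet (or metadata is stale).
                throw new IllegalStateException("Topic not found: " + topic);
            }
            if (expected == -1) {
                expected = partitions;
            } else if (expected != partitions) {
                throw new IllegalStateException(
                    "Topics not co-partitioned: " + copartitionGroup);
            }
        }
        return expected;
    }

    public static void main(String[] args) {
        // The two inputs of the stream-table join from the report.
        Set<String> joinInputs =
            new LinkedHashSet<>(Arrays.asList("session-updates", "sessions"));

        // Stand-in for broker metadata: only "sessions" exists so far.
        Map<String, Integer> metadata = new HashMap<>();
        metadata.put("sessions", 1);

        try {
            validate(joinInputs, metadata);
        } catch (IllegalStateException e) {
            System.out.println(e.getMessage()); // Topic not found: session-updates
        }

        // After the missing topic is created manually, the check passes.
        metadata.put("session-updates", 1);
        System.out.println(validate(joinInputs, metadata)); // 1
    }
}
```

In this model, creating both join inputs up front with the same number of partitions is what makes the check pass, which is consistent with the recommendation above to create input/output topics manually before starting the application.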
