Hi Henry,

Kafka Streams scales differently: it does not support instances that share an 
application ID subscribing to different topics for scale-out. With a shared 
application ID, scale-out happens through partitions, i.e., Kafka Streams 
automatically assigns the partitions of the input topics across your 
instances. If you want to scale out by topic, you'll need to use a different 
application ID for each topic.
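
As a rough sketch of the second option (not taken from your code), each 
process could derive its application ID from the topic it reads. The class 
name, method name, ID naming scheme and broker address below are all 
hypothetical; the two consumer settings are the ones you describe further 
down this thread.

```java
import java.util.Properties;

/**
 * Sketch: give each single-topic verifier its own application ID so that
 * instances reading different topics never share a consumer group.
 * All names here (class, method, ID scheme, broker address) are hypothetical.
 */
class PerTopicStreamsConfig {

    /** Builds Streams properties whose application.id is unique per input topic. */
    static Properties propsFor(String baseAppId, String inputTopic, String bootstrapServers) {
        Properties props = new Properties();
        // One application ID per input topic (assumed scheme: base + "-" + topic).
        props.put("application.id", baseAppId + "-" + inputTopic);
        props.put("bootstrap.servers", bootstrapServers);
        // Consumer settings mentioned in the quoted message below.
        props.put("auto.offset.reset", "earliest");
        props.put("max.poll.records", "200");
        return props;
    }

    public static void main(String[] args) {
        // Placeholder default topic name, used only when no argument is given.
        String topic = args.length > 0 ? args[0] : "input-topic-a";
        Properties props = propsFor("my-streamer", topic, "localhost:9092");
        System.out.println("application.id = " + props.getProperty("application.id"));
    }
}
```

Properties built this way would take the place of getProps() in your 
topology code. Each application ID then forms its own consumer group, with 
its own committed offsets and its own state store changelog topic.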

So in a nutshell this pattern is not supported. Was there a reason you needed 
to do it like that? 

Thanks
Eno

> On 28 Apr 2017, at 11:41, Henry Thacker <he...@henrythacker.com> wrote:
> 
> Should also add - there are definitely live incoming messages on both input
> topics when my streams are running. The auto offset reset config is set to
> "earliest" and because the input data streams are quite large (several
> million records each), I set a relatively small max poll records (200) so
> we don't run into heartbeating issues if we restart intraday.
> 
> Thanks,
> Henry
> 
> -- 
> Henry Thacker
> 
> On 28 April 2017 at 11:37:53, Henry Thacker (he...@henrythacker.com) wrote:
> 
>> Hi Eno,
>> 
>> Thanks for your reply - the code that builds the topology is something
>> like this (I don't have email and code access on the same machine
>> unfortunately - so might not be 100% accurate / terribly formatted!).
>> 
>> The stream application is a simple verifier which stores a tiny bit of
>> state in a state store. The processor is custom and only has logic in
>> init() to store the context and retrieve the store and process(...) to
>> validate the incoming messages and forward these on when appropriate.
>> 
>> There is no joining, aggregation or windowing.
>> 
>> In public static void main:
>> 
>> String topic = args[0];
>> String outputTopic = args[1];
>> 
>> KStreamBuilder builder = new KStreamBuilder();
>> 
>> StateStoreSupplier stateStore =
>> Stores.create("mystore").withStringKeys().withByteArrayValues().persistent().build();
>> 
>> KStream<Bytes, Bytes> stream = builder.stream(topic);
>> 
>> builder.addStateStore(stateStore);
>> 
>> stream.process(this::buildStreamProcessor, "mystore");
>> 
>> stream.to(outputTopic);
>> 
>> KafkaStreams streams = new KafkaStreams(builder, getProps());
>> streams.setUncaughtExceptionHandler(...);
>> streams.start();
>> 
>> Thanks,
>> Henry
>> 
>> 
>> On 28 April 2017 at 11:26:07, Eno Thereska (eno.there...@gmail.com) wrote:
>> 
>>> Hi Henry,
>>> 
>>> Could you share the code that builds your topology so we see how the
>>> topics are passed in? Also, this would depend on what the streaming logic
>>> is doing with the topics, e.g., if you're joining them then both partitions
>>> need to be consumed by the same instance.
>>> 
>>> Eno
>>> 
>>> On 28 Apr 2017, at 11:01, Henry Thacker <he...@henrythacker.com> wrote:
>>> 
>>> Hi,
>>> 
>>> I'm using Kafka 0.10.0.1 and Kafka Streams. I have two different
>>> processes, Consumer 1 and Consumer 2. They share the same application ID
>>> but subscribe to different single-partition topics, and only one of the
>>> stream consumers receives messages.
>>> 
>>> The non-working stream consumer just sits there logging:
>>> 
>>> Starting stream thread [StreamThread-1]
>>> Discovered coordinator <Host> (Id: ...) for group my-streamer
>>> Revoking previously assigned partitions [] for group my-streamer
>>> (Re-)joining group my-streamer
>>> Successfully joined group my-streamer with generation 3
>>> Setting newly assigned partitions [] for group my-streamer
>>> (Re-)joining group my-streamer
>>> Successfully joined group my-streamer with generation 4
>>> 
>>> If I was trying to subscribe to the same topic & partition I could
>>> understand this behaviour, but given that the subscriptions are for
>>> different input topics, I would have thought this should work?
>>> 
>>> Thanks,
>>> Henry
>>> 
>>> --
>>> Henry Thacker
>>> 
>>> 
>>> 
