Hello Erwin,

Thanks for reporting this issue. I agree with you that the root cause is not
clearly explained in the WARN log entries, which makes such scenarios very
confusing to debug. More specifically:

When you let two sub-topologies access the same store, Streams actually
merges them into a single topology, since we do not allow a state store to
be accessed concurrently by multiple threads; by merging them into a single
topology we make sure that only one thread processes it at any given time.
You can look at
`builder.internalTopologyBuilder.connectProcessorAndStateStores(name,
stateStoreNames);` internally for the details.
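
For example (just a sketch on top of your reproduction code, not something
from the original report): if you print the topology description right
before creating the KafkaStreams instance, both source topics should show up
under one and the same Sub-topology, because the shared store pulls their
processors into the same task group:

    Topology topology = builder.build();
    // Both "number-in" and "unknown-in" appear in the same Sub-topology here,
    // since their processors are connected through the shared state store.
    System.out.println(topology.describe());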

So when you add the second sub-topology, what effectively happens is that
Streams "enlarges" the first topology with another source topic,
"unknown-in", and the corresponding processor nodes. Within
StreamsPartitionAssignor we then need to decide how many tasks to create for
this merged topology; but since the number of partitions of one of its source
topics is not known yet, we skip this step and do not assign any tasks to the
topology at all. You should see the following log entry indicating that this
is the case:

Skipping assigning topic {} to tasks since its metadata is not available yet



We can indeed improve the logging here to make it clearer to users when this
happens.
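
In the meantime, a possible workaround on the application side (just a rough
sketch, using the AdminClient from the kafka-clients jar; the helper name
ensureTopicsExist is made up for illustration) is to fail fast if one of the
source topics is missing, before calling streams.start():

    import java.util.Properties;
    import java.util.Set;
    import org.apache.kafka.clients.admin.AdminClient;
    import org.apache.kafka.clients.admin.AdminClientConfig;

    // Throws if any of the given source topics does not exist on the cluster,
    // so you get an explicit error instead of a topology with no assigned tasks.
    static void ensureTopicsExist(String bootstrapServers, String... topics) throws Exception {
        Properties props = new Properties();
        props.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
        try (AdminClient admin = AdminClient.create(props)) {
            Set<String> existing = admin.listTopics().names().get();
            for (String topic : topics) {
                if (!existing.contains(topic)) {
                    throw new IllegalStateException("Missing source topic: " + topic);
                }
            }
        }
    }

In your example that would be something like
ensureTopicsExist("10.91.132.98:9092", "number-in", "unknown-in") at the top
of main().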


Guozhang


On Mon, Apr 16, 2018 at 7:10 PM, Erwin Joost BOLWIDT <erwinbolw...@dbs.com>
wrote:

> Hi,
>
> It took me a long time to figure the problem out, and I think it may be a
> bug in kafka-streams, or at least something that could have a better log
> warning message.
>
> I’m using the high-level API. I get errors of the form “Partition XXX is
> not assigned to any tasks: {}”, even though I was using that topic. In my
> original code, I was using KTables, but I managed to reproduce the problem
> in the code below with simple KStream processing.
>
> I have two seemingly independent subtopologies. The only thing they have in
> common is that they share a store. (In my original code, one topology is reading a
> topic with update messages that get applied to the store, while the other
> topology uses the data in the store to classify input data)
>
> Before I added the second subtopology, the first subtopology worked fine.
> After adding the second subtopology, I started getting the dreaded
> “Partition XXX is not assigned to any tasks: {}” for the topic of the first
> subtopology.
> As the second subtopology was a new feature (I had already populated the
> store in another way), I had not yet created the input topic for the second
> subtopology on my integration environment. It turns out that this is the
> reason that the first subtopology fails. Because they share the same store,
> somehow the subtopologies interact.
>
> Below is a snippet from my log file, running the bug reporting code that I
> posted below it.
> To reproduce the problem using the bug report code, ensure that the
> topics “number-in” and “number-out” exist, but “unknown-in” doesn’t exist.
>
> Once the problem is clear, it is very easy to fix it: create the
> “unknown-in” topic, and the code runs fine. But the reported warnings make
> no reference to the real underlying problem.
>
> I’ve tested the below code with kafka-streams version 1.0.1 on JDK8
> 1.8.0_132.
>
> Best Regards.
> Erwin Bolwidt
>
> 09:56:46.245 WARN  o.a.k.s.p.i.StreamPartitionAssignor - stream-thread
> [FooBar-1eab35bd-5198-4d24-8ce4-9f1ac9687764-StreamThread-1-consumer] No
> partitions found for topic unknown-in
> 09:56:46.246 WARN  o.a.k.s.p.i.StreamPartitionAssignor - stream-thread
> [FooBar-1eab35bd-5198-4d24-8ce4-9f1ac9687764-StreamThread-1-consumer]
> Partition number-in-0 is not assigned to any tasks: {}
> 09:56:46.247 INFO  o.a.k.s.p.i.StreamPartitionAssignor - stream-thread
> [FooBar-1eab35bd-5198-4d24-8ce4-9f1ac9687764-StreamThread-1-consumer]
> Assigned tasks to clients as 
> {1eab35bd-5198-4d24-8ce4-9f1ac9687764=[activeTasks:
> ([]) standbyTasks: ([]) assignedTasks: ([]) prevActiveTasks: ([])
> prevAssignedTasks: ([0_0, 1_0]) capacity: 1]}.
> 09:56:46.248 WARN  o.a.k.c.c.i.ConsumerCoordinator - [Consumer
> clientId=FooBar-1eab35bd-5198-4d24-8ce4-9f1ac9687764-StreamThread-1-consumer,
> groupId=FooBar] The following subscribed topics are not assigned to any
> members: [number-in]
>
>
> Code that reproduces the problem:
>
> package bugreport;
>
> import java.util.HashMap;
> import java.util.Map;
>
> import org.apache.kafka.clients.consumer.ConsumerConfig;
> import org.apache.kafka.common.serialization.Serdes;
> import org.apache.kafka.streams.Consumed;
> import org.apache.kafka.streams.KafkaStreams;
> import org.apache.kafka.streams.KeyValue;
> import org.apache.kafka.streams.StreamsBuilder;
> import org.apache.kafka.streams.StreamsConfig;
> import org.apache.kafka.streams.Topology;
> import org.apache.kafka.streams.kstream.KStream;
> import org.apache.kafka.streams.kstream.Produced;
> import org.apache.kafka.streams.kstream.Transformer;
> import org.apache.kafka.streams.kstream.TransformerSupplier;
> import org.apache.kafka.streams.processor.ProcessorContext;
> import org.apache.kafka.streams.state.KeyValueBytesStoreSupplier;
> import org.apache.kafka.streams.state.KeyValueStore;
> import org.apache.kafka.streams.state.StoreBuilder;
> import org.apache.kafka.streams.state.Stores;
>
> public class BugReportTwoTopologiesInteractionThroughStore {
>     private static final String STORE_NAME = "store";
>
>     static class TransformerAdapter<K, V, R> implements Transformer<K, V, R> {
>         @Override
>         public void init(ProcessorContext context) {
>         }
>
>         @Override
>         public void close() {
>         }
>
>         @Override
>         public R punctuate(long timestamp) {
>             return null;
>         }
>
>         @Override
>         public R transform(K key, V value) {
>             return null;
>         }
>     }
>
>     public static Map<String, Object> kafkaStreamsConfiguration() {
>         Map<String, Object> properties = new HashMap<>();
>         properties.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "10.91.132.98:9092");
>         properties.put(StreamsConfig.APPLICATION_ID_CONFIG, "FooBar");
>         properties.put(StreamsConfig.COMMIT_INTERVAL_MS_CONFIG, 100);
>         properties.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
>         return properties;
>     }
>
>     public static void constructTopology(StreamsBuilder builder, String topicIn, String topicOut, int base) {
>         KStream<Long, Long> stream = builder.stream(topicIn, Consumed.with(Serdes.Long(), Serdes.Long()));
>         stream = stream.transform(new TransformerSupplier<Long, Long, KeyValue<Long, Long>>() {
>             @Override
>             public Transformer<Long, Long, KeyValue<Long, Long>> get() {
>                 return new TransformerAdapter<Long, Long, KeyValue<Long, Long>>() {
>                     @Override
>                     public KeyValue<Long, Long> transform(Long key, Long value) {
>                         return new KeyValue<Long, Long>(key, base - value);
>                     }
>                 };
>             }
>         }, STORE_NAME);
>         stream.to(topicOut, Produced.with(Serdes.Long(), Serdes.Long()));
>     }
>
>     public static void main(String[] args) {
>         StreamsBuilder builder = new StreamsBuilder();
>         KeyValueBytesStoreSupplier storeSupplier = Stores.persistentKeyValueStore(STORE_NAME);
>         StoreBuilder<KeyValueStore<String, String>> storeBuilder =
>                 Stores.keyValueStoreBuilder(storeSupplier, Serdes.String(), Serdes.String());
>         builder.addStateStore(storeBuilder);
>         constructTopology(builder, "number-in", "number-out", 10);
>         constructTopology(builder, "unknown-in", "unknown-out", 20);
>
>         Topology topology = builder.build();
>         KafkaStreams streams = new KafkaStreams(topology, new StreamsConfig(kafkaStreamsConfiguration()));
>         streams.start();
>     }
> }
>



-- 
-- Guozhang
