Hello Mathieu,

I think this issue is fixed in trunk but may have been missed in the 0.10.0
branch. Could you try running your program against trunk to verify whether
that is the case? If so, we can consider backporting the hotfix from trunk to
0.10.0 and cutting another bug-fix release.
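
To illustrate the mismatch you describe, here is a minimal sketch that uses only
the topic names from your kafka-topics.sh listing. It assumes the created topic
is named "<application id>-<internal topic name>", which is what your listing
shows. The class and the prefixed() helper are hypothetical and not part of the
Kafka Streams API.

```java
import java.util.Arrays;
import java.util.HashSet;
import java.util.Set;

class RepartitionTopicNames {
    // Hypothetical helper: builds the prefixed name seen in the topic listing.
    // This mirrors the observed pattern only; it is not a Kafka Streams method.
    static String prefixed(String applicationId, String internalTopic) {
        return applicationId + "-" + internalTopic;
    }

    public static void main(String[] args) {
        // Topics reported by kafka-topics.sh in the quoted message.
        Set<String> topicsInCluster = new HashSet<>(Arrays.asList(
                "TableNumber1",
                "TableNumber2",
                "__consumer_offsets",
                "timesheet-status-TableNumber2Aggregated-repartition"));

        String internalTopic = "TableNumber2Aggregated-repartition";

        // Looking up the unprefixed name fails, as in the exception.
        System.out.println(topicsInCluster.contains(internalTopic)); // false

        // Looking up the prefixed name finds the topic that was created.
        System.out.println(topicsInCluster.contains(
                prefixed("timesheet-status", internalTopic))); // true
    }
}
```

If the co-partitioning check looks up the unprefixed name while topic creation
uses the prefixed one, that would be consistent with the exception you are
seeing.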


Guozhang

On Wed, Aug 10, 2016 at 7:32 AM, Mathieu Fenniak <
mathieu.fenn...@replicon.com> wrote:

> Hey there, Kafka Users,
>
> I'm trying to join two topics with Kafka Streams.  The first topic is a
> changelog of one object, and the second is a changelog of a related
> object.  In order to join these tables, I'm grouping the second table by a
> piece of data in it that indicates what record it is related to in the
> first table.  But I'm getting an unexpected error related to the
> repartitioning topic for the aggregated table:
>
> org.apache.kafka.streams.errors.TopologyBuilderException: Invalid topology building: External source topic not found: *TableNumber2Aggregated-repartition*
>     at org.apache.kafka.streams.processor.internals.StreamPartitionAssignor.ensureCopartitioning(StreamPartitionAssignor.java:452)
>     at org.apache.kafka.streams.processor.internals.StreamPartitionAssignor.ensureCopartitioning(StreamPartitionAssignor.java:440)
>
>
> (Full exception:
> https://gist.github.com/mfenniak/11ca081191932fbb33a0c3cc32ad1686)
>
> It appears that the "TableNumber2Aggregated-repartition" topic *is* created
> in Kafka by the streams application, but the Kafka topic has a prefix that
> matches my application id (timesheet-status).  Perhaps something is
> prefixing the topic name, but it isn't being applied everywhere?
>
> $ ./kafka-topics.sh --zookeeper localhost --list
> TableNumber1
> TableNumber2
> __consumer_offsets
> timesheet-status-TableNumber2Aggregated-repartition
>
>
> Here's a sample that reproduces the issue (note, I've cut out all the
> actual mapping, grouping, and aggregating logic, but, this still reproduces
> the error):
>
>     public static TopologyBuilder createTopology() {
>         KStreamBuilder builder = new KStreamBuilder();
>
>         KTable table1Mapped = builder
>                 .table(Serdes.String(), new JsonSerde(Map.class), "TableNumber1")
>                 .mapValues((value) -> null);
>
>         KTable table2Aggregated = builder
>                 .table(Serdes.String(), new JsonSerde(Map.class), "TableNumber2")
>                 .groupBy((key, value) -> null)
>                 .aggregate(() -> null, (k, v, t) -> null, (k, v, t) -> null,
>                         new JsonSerde(Map.class), "TableNumber2Aggregated");
>
>         table1Mapped.join(table2Aggregated, (left, right) -> {
>             LOG.debug("join");
>             return null;
>         });
>
>         return builder;
>     }
>
> I'm using the latest Kafka Streams release, 0.10.0.1.  Any thoughts on how
> I could proceed to debug or work around this?
>
> Thanks all,
>
> Mathieu
>



-- 
-- Guozhang
