Hello Bart,

Thanks for your detailed explanation. I see your motivation now, and it
does make the case for a single application that dynamically changes
its subscriptions.

As I mentioned, Streams today does not have native support for
dynamically changing subscriptions. That being said, if you do not care
about the offsets, there is one sub-optimal workaround: a two-stage
pipeline that separates topic consumption from processing. The first
stage is a very simple "Pipe" app (e.g.
https://github.com/apache/kafka/blob/trunk/streams/quickstart/java/src/main/resources/archetype-resources/src/main/java/Pipe.java)
which is very lightweight in resource consumption. Each pipe app copies
one input topic into a single aggregate topic that collects all the
topics you want to track in the second stage, i.e. the processing stage.
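
A minimal sketch of such a pipe app (the topic name "aggregate-input"
and the bootstrap server below are just placeholders, not anything
Streams prescribes):

    import java.util.Properties;
    import org.apache.kafka.common.serialization.Serdes;
    import org.apache.kafka.streams.KafkaStreams;
    import org.apache.kafka.streams.StreamsBuilder;
    import org.apache.kafka.streams.StreamsConfig;

    public class Pipe {
        public static void main(String[] args) {
            String sourceTopic = args[0]; // the one topic this instance pipes

            Properties props = new Properties();
            // one application.id per piped topic keeps the pipe apps independent
            props.put(StreamsConfig.APPLICATION_ID_CONFIG, "pipe-" + sourceTopic);
            props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
            // byte-array serdes: pass the records through untouched
            props.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG,
                      Serdes.ByteArray().getClass());
            props.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG,
                      Serdes.ByteArray().getClass());

            StreamsBuilder builder = new StreamsBuilder();
            // copy every record from the source topic into the shared
            // aggregate topic that the processing stage reads from
            builder.stream(sourceTopic).to("aggregate-input");

            new KafkaStreams(builder.build(), props).start();
        }
    }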

Your processing app then reads only from this aggregated source topic,
while the pipe apps each copy one input topic into it. A small
controller can consume the global "command" topic and either 1)
auto-generate the pipe code with the source topic name swapped in for
the specified string, then compile and execute it, or 2) shut down an
existing pipe app for a topic that should no longer be tracked. This
admittedly introduces a duplicate topic containing the aggregated data,
but operationally it may still be simpler.
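
As an alternative to compiling generated code on the fly, you could
also keep all the pipe topologies inside a single controller JVM and
start / close one KafkaStreams instance per tracked topic. A rough
sketch, assuming a hypothetical command format of "ADD:<topic>" /
"REMOVE:<topic>" on the command topic:

    import java.time.Duration;
    import java.util.Collections;
    import java.util.HashMap;
    import java.util.Map;
    import java.util.Properties;
    import org.apache.kafka.clients.consumer.ConsumerRecord;
    import org.apache.kafka.clients.consumer.KafkaConsumer;
    import org.apache.kafka.common.serialization.Serdes;
    import org.apache.kafka.streams.KafkaStreams;
    import org.apache.kafka.streams.StreamsBuilder;
    import org.apache.kafka.streams.StreamsConfig;

    public class PipeController {
        private final Map<String, KafkaStreams> pipes = new HashMap<>();

        // 'commands' is a plain consumer already configured for the cluster
        public void run(KafkaConsumer<String, String> commands) {
            commands.subscribe(Collections.singletonList("commands"));
            while (true) {
                for (ConsumerRecord<String, String> rec :
                         commands.poll(Duration.ofSeconds(1))) {
                    String[] cmd = rec.value().split(":", 2);
                    if (cmd.length < 2) continue; // ignore malformed commands
                    if ("ADD".equals(cmd[0])) {
                        pipes.computeIfAbsent(cmd[1], this::startPipe);
                    } else if ("REMOVE".equals(cmd[0])) {
                        KafkaStreams pipe = pipes.remove(cmd[1]);
                        if (pipe != null) pipe.close(); // stop piping this topic
                    }
                }
            }
        }

        private KafkaStreams startPipe(String topic) {
            Properties props = new Properties();
            props.put(StreamsConfig.APPLICATION_ID_CONFIG, "pipe-" + topic);
            props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
            props.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG,
                      Serdes.ByteArray().getClass());
            props.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG,
                      Serdes.ByteArray().getClass());
            StreamsBuilder builder = new StreamsBuilder();
            builder.stream(topic).to("aggregate-input");
            KafkaStreams streams = new KafkaStreams(builder.build(), props);
            streams.start();
            return streams;
        }
    }

Note that close() keeps the pipe's committed offsets around, so
re-adding the same topic later would resume from the committed offset
rather than "latest" (until offset retention expires them); since you
said offsets are not important for your use-case, that should be
acceptable.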


Guozhang


On Mon, Aug 14, 2017 at 2:47 AM, Bart Vercammen <b...@cloutrix.com> wrote:

> Hi Guozhang,
>
> For the use-cases I have in mind, the offset of the source topics is
> irrelevant to the state stored by the streams application.
> So, when topic 'A' gets dropped and topic 'B' is added,  I would prefer the
> application to start reading from 'latest' but that is actually not *that*
> important to me.
> The main thing is that I'm somehow able to add a new kafka topic to the
> source of the streams application at runtime, triggered by messages
> flowing on another "metadata" kafka topic.
>
> So considering your example:
> When 'B' is added, I would expect the streams application to start reading
> from 'latest' (but not that important)
> When 'A' is removed, the state from 'A' is still valid in the state stores,
> but 'A' should not be tracked anymore.
> When 'A' is added again, I would expect the streams application to start
> reading from 'latest' (not the committed offset, but again not that
> important for my use-case)
>
> But this being said, my main focus is on the ability to 'add' new kafka
> topics to the application rather than removing them.
> What I could do is define a wildcard subscription on all topics.  This
> would update dynamically, but the problem is that I would then run
> *all* topics through the application, which is major overkill and would
> hurt performance (especially as there are topics in there that produce
> a lot of data that should not be tracked, but would still pass through
> the streams application).  Let's say that of the 300 kafka topics on
> the system, about 50 need to be tracked by the application.
> We have the ability to mark these topics through the "metadata" topic,
> so it would be nice if this could also trigger updating the
> source-pattern for the subscriptions in the kafka streams application.
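>
> (For reference, such a wildcard subscription would look roughly like
> this, with the pattern matching every topic:
>
>     // java.util.regex.Pattern, org.apache.kafka.streams.StreamsBuilder,
>     // org.apache.kafka.streams.kstream.KStream; a pattern subscription
>     // picks up existing AND newly created topics automatically
>     StreamsBuilder builder = new StreamsBuilder();
>     KStream<byte[], byte[]> all = builder.stream(Pattern.compile(".*"));
>
> but as said, that casts the net far too wide for us.)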
>
> The problem with multiple applications is the following:
> - the "state" should be centralized, as it can be queried (having
> multiple applications would make it more difficult to achieve this)
> - multiple applications will require more resources to be reserved on
> the cluster
> - we need an external tool to start/stop the streams applications,
> depending on the info on the metadata topic.
>
> Greets,
> Bart
>
>
> On Mon, Aug 14, 2017 at 3:03 AM, Guozhang Wang <wangg...@gmail.com> wrote:
>
> > Hello Bart,
> >
> > Before we talk about dynamic subscription in Streams I'd like to ask some
> > more questions about your usage.
> >
> > Let's say the currently running application is reading from topic A,
> > and has just reached offset 100; now from the global state there is a
> > notification saying "stop on topic A, start on topic B". How would you
> > start fetching from topic B? Would you start at the earliest or the
> > latest offset (so it is totally unrelated to the point where you
> > stopped on topic A), or would you start at offset 100? In addition, if
> > your application maintains some local state, how would that state be
> > affected by the switch of topics? Since at that point it actually
> > reflects "the state of the application up to topic A's offset 100",
> > could you still reuse that state with topic B at the specified offset?
> >
> > To make things a bit more complicated: if later another notification
> > from the global state arrives saying "now switch back to topic A
> > again", would you restart where you stopped, i.e. at offset 100, or
> > would you start at the earliest or latest offset of topic A? Are the
> > application states still reusable?
> >
> > I think if your answers to these questions are "no", then you'd better
> > treat them as different applications, i.e. one app reading from A, and
> > one app reading from B but with a similar topology. It's just that
> > based on the meta topic the apps may be stopped / restarted
> > dynamically. Currently Kafka Streams does not support dynamic
> > subscriptions yet, since lots of such requested scenarios turned out
> > not to be a good fit for collapsing into a single application; if
> > there is indeed a common request, I think one way to do that is to
> > make the PartitionAssignor customizable by users as you suggested, so
> > that only the selected partitions are used to re-form the tasks; but
> > one still needs some way to trigger a rebalance so that the
> > PartitionAssignor can be called.
> >
> >
> > Guozhang
> >
> > On Fri, Aug 11, 2017 at 1:42 AM, Bart Vercammen <b...@cloutrix.com>
> > wrote:
> >
> > > Hi,
> > >
> > > I have a question on what would be the best way to implement
> > > something within Kafka Streams.  The thing I would like to do:
> > > "dynamically update the subscription pattern of the source topics."
> > >
> > > The reasoning behind this (in my project):
> > > metadata about the source topics is published as events on another
> > > kafka topic that should be tracked by the kafka streams topology,
> > > and depending on that metadata, specific source topics should be
> > > added to, or removed from, the kafka streams topology.
> > >
> > > Currently I track the "metadata topic" as "global state", so that
> > > every processor can actually access it to fetch the metadata (this
> > > metadata for instance also describes whether or not a specific topic
> > > pattern should be tracked by the stream processor) - so consider it
> > > as some kind of "configuration" stream about the source topics.
> > >
> > > So here it comes:
> > > is there any way I could (from a running topology) update the kafka
> > > consumer subscriptions, so that I'm able to replace the source topic
> > > pattern while the topology is running?
> > >
> > > I don't think there currently is a way to do this, but as under the
> > > hood it is just a kafka consumer, my belief is that it should be
> > > possible somehow ...
> > >
> > > I was thinking about the PartitionAssignor ... if I could get my
> > > hands on that one, maybe I could dynamically configure it to only
> > > allow specific topic-patterns?
> > > Or directly alter the subscription on the underlying consumer?
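> > >
> > > (On a plain consumer the latter is just a re-subscribe, e.g.:
> > >
> > >     // org.apache.kafka.clients.consumer.KafkaConsumer,
> > >     // java.util.regex.Pattern; re-subscribing replaces the old
> > >     // subscription and triggers a rebalance with the new topic set
> > >     static void retarget(KafkaConsumer<byte[], byte[]> consumer,
> > >                          String regex) {
> > >         consumer.subscribe(Pattern.compile(regex));
> > >     }
> > >
> > > but Kafka Streams does not expose its consumer, hence my question.)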
> > >
> > > I don't know all the nifty details of the Kafka Streams internals,
> > > so it would be nice if someone could point me in the right direction
> > > to achieve this ...
> > >
> > > Thanks,
> > > Bart
> > >
> >
> >
> >
> > --
> > -- Guozhang
> >
>



-- 
-- Guozhang
