Re: Log retention just for offset topic

2016-06-29 Thread Sathyakumar Seshachalam
Or maybe I am wrong, and the log cleaner only picks up topics with a
cleanup.policy set.
From the documentation it is not very obvious what the behaviour is.

On Thu, Jun 30, 2016 at 10:33 AM, Sathyakumar Seshachalam <
sathyakumar_seshacha...@trimble.com> wrote:

> Hi,
>
> Thanks for the response.
>
> I still like to know what happens for topics which have not defined a
> cleanup.policy.
> I assume the default value is compact. And hence all topic's logs will be
> compacted which I want to avoid.
>
> Am running 0.9.0, So will have to manually set log.cleaner.enable=true
>
> Regards,
> Sathya
>
> On Thu, Jun 30, 2016 at 10:20 AM, Manikumar Reddy <
> manikumar.re...@gmail.com> wrote:
>
>> Hi,
>>
>> Kafka internally creates the offsets topic (__consumer_offsets) with
>> compact mode on.
>> From 0.9.0.1 onwards log.cleaner.enable=true by default.  This means
>> topics
>> with a
>> cleanup.policy=compact will now be compacted by default,
>>
>> You can tweak the offset topic configuration by using below  props
>> offsets.topic.compression.codec
>> offsets.topic.num.partitions
>> offsets.topic.replication.factor
>> offsets.topic.segment.bytes
>> offsets.retention.minutes
>> offsets.retention.check.interval.ms
>>
>>
>> Thanks
>> Manikumar
>>
>> On Thu, Jun 30, 2016 at 9:49 AM, Sathyakumar Seshachalam <
>> sathyakumar_seshacha...@trimble.com> wrote:
>>
>> > Am little confused about how log cleaner works. My use case is that I
>> want
>> > to compact just selected topics (or in my case just the internal topic
>> > __consumers_offsets and want to leave other topics as is).
>> >
>> > Whats the right settings/configuration for this to happen.
>> >
>> > As I understand log cleaner enable/disable is a global setting. And my
>> > understanding is that they will clean all logs (compact logs based on
>> > cleanup policy), and so all topics' clean up policy will be considered
>> and
>> > hence compacted - compact being the default policy. Is this correct ?
>> >
>> > I have set all topic's retention duration to be a really exorbitantly
>> high
>> > value. Does it mean __consumer_offsets wont be compacted at all ? If so,
>> > how to set retention time just for offset topic it being an internal
>> topic.
>> >
>> > Regards,
>> > Sathya
>> >
>>
>
>


Re: Log retention just for offset topic

2016-06-29 Thread Sathyakumar Seshachalam
Hi,

Thanks for the response.

I would still like to know what happens for topics that have not defined a
cleanup.policy.
I assume the default value is compact, and hence all topics' logs will be
compacted, which I want to avoid.

I am running 0.9.0, so I will have to manually set log.cleaner.enable=true.

Regards,
Sathya

On Thu, Jun 30, 2016 at 10:20 AM, Manikumar Reddy  wrote:

> Hi,
>
> Kafka internally creates the offsets topic (__consumer_offsets) with
> compact mode on.
> From 0.9.0.1 onwards log.cleaner.enable=true by default.  This means topics
> with a
> cleanup.policy=compact will now be compacted by default,
>
> You can tweak the offset topic configuration by using below  props
> offsets.topic.compression.codec
> offsets.topic.num.partitions
> offsets.topic.replication.factor
> offsets.topic.segment.bytes
> offsets.retention.minutes
> offsets.retention.check.interval.ms
>
>
> Thanks
> Manikumar
>
> On Thu, Jun 30, 2016 at 9:49 AM, Sathyakumar Seshachalam <
> sathyakumar_seshacha...@trimble.com> wrote:
>
> > Am little confused about how log cleaner works. My use case is that I
> want
> > to compact just selected topics (or in my case just the internal topic
> > __consumers_offsets and want to leave other topics as is).
> >
> > Whats the right settings/configuration for this to happen.
> >
> > As I understand log cleaner enable/disable is a global setting. And my
> > understanding is that they will clean all logs (compact logs based on
> > cleanup policy), and so all topics' clean up policy will be considered
> and
> > hence compacted - compact being the default policy. Is this correct ?
> >
> > I have set all topic's retention duration to be a really exorbitantly
> high
> > value. Does it mean __consumer_offsets wont be compacted at all ? If so,
> > how to set retention time just for offset topic it being an internal
> topic.
> >
> > Regards,
> > Sathya
> >
>


Re: Log retention just for offset topic

2016-06-29 Thread Manikumar Reddy
Hi,

Kafka internally creates the offsets topic (__consumer_offsets) with
compact mode on.
From 0.9.0.1 onwards, log.cleaner.enable=true by default. This means topics
with a cleanup.policy=compact will now be compacted by default.

You can tweak the offsets topic configuration using the props below:
offsets.topic.compression.codec
offsets.topic.num.partitions
offsets.topic.replication.factor
offsets.topic.segment.bytes
offsets.retention.minutes
offsets.retention.check.interval.ms
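
For example, to keep compaction limited to the offsets topic, something along
these lines is usually all that is needed (a rough sketch only; "mytopic" and
the ZooKeeper address are placeholders, and unless a topic explicitly carries
cleanup.policy=compact it stays on the normal delete/retention behaviour,
assuming the broker-level log.cleanup.policy has not been changed from its
default of delete):

# server.properties (needs a broker restart)
log.cleaner.enable=true

# verify that the internal offsets topic already carries compact
bin/kafka-topics.sh --zookeeper localhost:2181 --describe --topic __consumer_offsets

# if a regular topic was ever switched to compact, put it back to delete
bin/kafka-topics.sh --zookeeper localhost:2181 --alter --topic mytopic --config cleanup.policy=delete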


Thanks
Manikumar

On Thu, Jun 30, 2016 at 9:49 AM, Sathyakumar Seshachalam <
sathyakumar_seshacha...@trimble.com> wrote:

> Am little confused about how log cleaner works. My use case is that I want
> to compact just selected topics (or in my case just the internal topic
> __consumers_offsets and want to leave other topics as is).
>
> Whats the right settings/configuration for this to happen.
>
> As I understand log cleaner enable/disable is a global setting. And my
> understanding is that they will clean all logs (compact logs based on
> cleanup policy), and so all topics' clean up policy will be considered and
> hence compacted - compact being the default policy. Is this correct ?
>
> I have set all topic's retention duration to be a really exorbitantly high
> value. Does it mean __consumer_offsets wont be compacted at all ? If so,
> how to set retention time just for offset topic it being an internal topic.
>
> Regards,
> Sathya
>


Re: Kafka Producer connection issue on AWS private EC2 instance

2016-06-29 Thread vivek thakre
There are no errors in the broker logs.
The Kafka cluster in itself is functional. I have other producers and
consumers working which are in the public subnet (same as the Kafka cluster).



On Wed, Jun 29, 2016 at 7:15 PM, Kamesh Kompella  wrote:

> For what it's worth, I used to get similar messages with docker instances
> on centos.
>
> The way I debugged the problem was by looking at Kafka logs. In that case,
> it turned out that brokers could not reach zk and this info was in the
> logs. The logs will list the parameters the broker used at start up and any
> errors.
>
> In my case, the problem was the firewall that blocked access to zk from
> Kafka.
>
> > On Jun 29, 2016, at 6:56 PM, vivek thakre 
> wrote:
> >
> > I have Kafka Cluster setup on AWS Public Subnet with all brokers having
> > elastic IPs
> > My producers are on private subnet and not able to produce to the kafka
> on
> > public subnet.
> > Both subnets are in same VPC
> >
> > I added the private ip/cidr of producer ec2 instance to Public Kafka's
> > security group.
> > (I can telnet from private ec2 instance to brokers private ip on 9092
> port)
> >
> > From the ec2 instance on private subnet, I can list the topics using ZK's
> > private ip
> >
> > [ec2-user@ip-x-x-x-x kafka_2.10-0.9.0.1]$ bin/kafka-topics.sh
> --zookeeper
> > :2181 --list
> > test
> >
> > When I try to produce from private ec2 instance using broker's private
> IP,
> > I get following error
> >
> > [ec2-user@ip-x-x-x-x kafka_2.10-0.9.0.1]$ bin/kafka-console-producer.sh
> > --broker-list :9092 --topic test
> >
> > [2016-06-29 18:47:38,328] ERROR Error when sending message to topic test
> > with key: null, value: 3 bytes with error: Batch Expired
> > (org.apache.kafka.clients.producer.internals.ErrorLoggingCallback)
> >
> > When I try to produce from private ec2 instance using broker's public
> IP, I
> > get following error.
> >
> > [2016-06-29 18:53:15,918] ERROR Error when sending message to topic test
> > with key: null, value: 3 bytes with error: Failed to update metadata
> after
> > 6 ms.
> (org.apache.kafka.clients.producer.internals.ErrorLoggingCallback)
> >
> > Few settings from server.properties
> > # The id of the broker. This must be set to a unique integer for each
> > broker.
> > broker.id=0
> >
> > # Socket Server Settings
> > #
> >
> > listeners=PLAINTEXT://:9092
> >
> > # The port the socket server listens on
> > #port=9092
> >
> > # Hostname the broker will bind to. If not set, the server will bind to
> all
> > interfaces
> > host.name=
> >
> > # Hostname the broker will advertise to producers and consumers. If not
> > set, it uses the
> > # value for "host.name" if configured.  Otherwise, it will use the value
> > returned from
> > # java.net.InetAddress.getCanonicalHostName().
> > advertised.host.name=
> >
> > Please let me know if I am doing something wrong.
> >
> > Thank you
> >
> > Vivek
>


Log retention just for offset topic

2016-06-29 Thread Sathyakumar Seshachalam
I am a little confused about how the log cleaner works. My use case is that I
want to compact just selected topics (in my case just the internal topic
__consumer_offsets) and leave other topics as they are.

What are the right settings/configuration for this to happen?

As I understand it, enabling/disabling the log cleaner is a global setting, and
it will clean all logs (compacting them based on their cleanup policy), so every
topic's cleanup policy will be considered and hence compacted - compact being
the default policy. Is this correct?

I have set all topics' retention duration to a really exorbitantly high value.
Does it mean __consumer_offsets won't be compacted at all? If so, how do I set
the retention time just for the offsets topic, it being an internal topic?

Regards,
Sathya


Re: Streams RocksDB State Store Disk Usage

2016-06-29 Thread Guozhang Wang
Hello Avi,

One way to mentally quantify your state store usage is to consider the
total key space in your reduceByKey() operator, and multiply by the average
key-value pair size. Then you need to consider the RocksDB write / space
amplification factor as well.
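
As a rough back-of-the-envelope example (all numbers made up for illustration):
1,500,000 distinct keys x ~200 bytes per key-value pair is roughly 300 MB of raw
state, and with a write/space amplification factor of, say, 5-10x, the on-disk
footprint could plausibly reach 1.5-3 GB.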

Currently Kafka Streams hard-codes some RocksDB config values, such as block
size, to achieve good write performance at the cost of write amplification,
but we are now working on exposing those configs to users so that they can
override them:

https://issues.apache.org/jira/browse/KAFKA-3740


Guozhang


On Wed, Jun 29, 2016 at 11:59 AM, Avi Flax  wrote:

> On Jun 29, 2016, at 14:15, Matthias J. Sax  wrote:
> >
> > If you use window-operations, windows are kept until there retention
> > time expires. Thus, reducing the retention time, should decrease the
> > memory RocksDB needs to preserve windows.
>
> Thanks Matthias, that makes sense and I appreciate all the helpful
> pointers! This is really good to know. However, the app that’s generating
> the large RocksDB log files is not using windowing, just basic aggregation
> with reduceByKey.
>
> Thanks!
> Avi




-- 
-- Guozhang


Re: Kafka Roadmap

2016-06-29 Thread Guozhang Wang
Hello Szymon,

Currently the community does not have regular time-based release plans for
Kafka.

From your question, I think you are more interested in learning how to keep
older-versioned clients talking to different / maybe newer-versioned brokers.
We are working on a proposal to give you an easier compatibility story and a
smooth upgrade process:

https://cwiki.apache.org/confluence/display/KAFKA/KIP-35+-+Retrieving+protocol+version


Guozhang


On Wed, Jun 29, 2016 at 1:05 AM, Panas, Szymon  wrote:

> All,
>
> I am building solution around Apache Kafka with set of dependencies in
> other products mainly Kafka client.
>
> Is there defined road map for the Kafka?
> I am particularly interested in version 1.X so it is marked stable.
>
> Does community plan release every 6 months (each version of Kafka changes
> way client communicates and prefer to be on latest stack if possible). Such
> information or at least reasonable assumption would help plan the projects.
>
> Thanks,
> Szymon Panas
> CONFIDENTIALITY NOTICE: This message is the property of International Game
> Technology PLC and/or its subsidiaries and may contain proprietary,
> confidential or trade secret information. This message is intended solely
> for the use of the addressee. If you are not the intended recipient and
> have received this message in error, please delete this message from your
> system. Any unauthorized reading, distribution, copying, or other use of
> this message or its attachments is strictly prohibited.
>



-- 
-- Guozhang


Re: Kafka Producer connection issue on AWS private EC2 instance

2016-06-29 Thread Kamesh Kompella
For what it's worth, I used to get similar messages with docker instances on 
centos.

The way I debugged the problem was by looking at Kafka logs. In that case, it 
turned out that brokers could not reach zk and this info was in the logs. The 
logs will list the parameters the broker used at start up and any errors.

In my case, the problem was the firewall that blocked access to zk from Kafka.

> On Jun 29, 2016, at 6:56 PM, vivek thakre  wrote:
> 
> I have Kafka Cluster setup on AWS Public Subnet with all brokers having
> elastic IPs
> My producers are on private subnet and not able to produce to the kafka on
> public subnet.
> Both subnets are in same VPC
> 
> I added the private ip/cidr of producer ec2 instance to Public Kafka's
> security group.
> (I can telnet from private ec2 instance to brokers private ip on 9092 port)
> 
> From the ec2 instance on private subnet, I can list the topics using ZK's
> private ip
> 
> [ec2-user@ip-x-x-x-x kafka_2.10-0.9.0.1]$ bin/kafka-topics.sh --zookeeper
> :2181 --list
> test
> 
> When I try to produce from private ec2 instance using broker's private IP,
> I get following error
> 
> [ec2-user@ip-x-x-x-x kafka_2.10-0.9.0.1]$ bin/kafka-console-producer.sh
> --broker-list :9092 --topic test
> 
> [2016-06-29 18:47:38,328] ERROR Error when sending message to topic test
> with key: null, value: 3 bytes with error: Batch Expired
> (org.apache.kafka.clients.producer.internals.ErrorLoggingCallback)
> 
> When I try to produce from private ec2 instance using broker's public IP, I
> get following error.
> 
> [2016-06-29 18:53:15,918] ERROR Error when sending message to topic test
> with key: null, value: 3 bytes with error: Failed to update metadata after
> 6 ms. (org.apache.kafka.clients.producer.internals.ErrorLoggingCallback)
> 
> Few settings from server.properties
> # The id of the broker. This must be set to a unique integer for each
> broker.
> broker.id=0
> 
> # Socket Server Settings
> #
> 
> listeners=PLAINTEXT://:9092
> 
> # The port the socket server listens on
> #port=9092
> 
> # Hostname the broker will bind to. If not set, the server will bind to all
> interfaces
> host.name=
> 
> # Hostname the broker will advertise to producers and consumers. If not
> set, it uses the
> # value for "host.name" if configured.  Otherwise, it will use the value
> returned from
> # java.net.InetAddress.getCanonicalHostName().
> advertised.host.name=
> 
> Please let me know if I am doing something wrong.
> 
> Thank you
> 
> Vivek


Kafka Producer connection issue on AWS private EC2 instance

2016-06-29 Thread vivek thakre
I have a Kafka cluster set up on an AWS public subnet, with all brokers having
elastic IPs.
My producers are on a private subnet and are not able to produce to the Kafka
cluster on the public subnet.
Both subnets are in the same VPC.

I added the private IP/CIDR of the producer ec2 instance to the public Kafka
cluster's security group.
(I can telnet from the private ec2 instance to the brokers' private IP on port
9092.)

From the ec2 instance on the private subnet, I can list the topics using ZK's
private IP:

[ec2-user@ip-x-x-x-x kafka_2.10-0.9.0.1]$ bin/kafka-topics.sh --zookeeper
:2181 --list
test

When I try to produce from the private ec2 instance using the broker's private
IP, I get the following error:

[ec2-user@ip-x-x-x-x kafka_2.10-0.9.0.1]$ bin/kafka-console-producer.sh
--broker-list :9092 --topic test

[2016-06-29 18:47:38,328] ERROR Error when sending message to topic test
with key: null, value: 3 bytes with error: Batch Expired
(org.apache.kafka.clients.producer.internals.ErrorLoggingCallback)

When I try to produce from the private ec2 instance using the broker's public
IP, I get the following error:

[2016-06-29 18:53:15,918] ERROR Error when sending message to topic test
with key: null, value: 3 bytes with error: Failed to update metadata after
6 ms. (org.apache.kafka.clients.producer.internals.ErrorLoggingCallback)

A few settings from server.properties:
# The id of the broker. This must be set to a unique integer for each
broker.
broker.id=0

# Socket Server Settings
#

listeners=PLAINTEXT://:9092

# The port the socket server listens on
#port=9092

# Hostname the broker will bind to. If not set, the server will bind to all
interfaces
host.name=

# Hostname the broker will advertise to producers and consumers. If not
set, it uses the
# value for "host.name" if configured.  Otherwise, it will use the value
returned from
# java.net.InetAddress.getCanonicalHostName().
advertised.host.name=
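
For comparison, a reachable setup usually ends up looking something like the
sketch below (the hostname is a made-up placeholder; whatever value is
advertised must resolve and be reachable from the producer's subnet, because
clients connect to the advertised address returned in the metadata for all
subsequent produce requests, not to the address in --broker-list):

listeners=PLAINTEXT://:9092
advertised.host.name=ip-10-0-1-15.ec2.internal   # placeholder: an address the private-subnet clients can reach
advertised.port=9092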

Please let me know if I am doing something wrong.

Thank you

Vivek


Re: Consumer Group, relabancing and partition uniqueness

2016-06-29 Thread Milind Vaidya
Florin,

Thanks, I got your point.

The documentation as well as diagram showing the mechanism of consumer
group indicates that,the partitions are shared disjointly by consumers in a
group.
You also stated above "Each of your consumer will receive message for its
allocated partition for that they subscribed."

e.g. P1..P5 are partitions and we have consumers C1..C5 belonging to the same
group. So is it correct to assume that C2 will consume from P4 (say) and not
from any other partition? Similarly, Ck will consume from Pm where 1 <= k,
m <= 5. If no rebalancing happens, as in none of the consumers dies, how long
will this assignment last? Or may a random rebalance happen after a while,
leading to C2 consuming from P3 instead of P4, from which it was originally
consuming?

I have logs for the consumer which indicate that the partitions associated
with a consumer change periodically.
Is there any mechanism by which I can make sure a consumer consumes from a
particular partition for a sufficient (configurable) amount of time, provided
none of the consumers goes down and triggers a rebalance?
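
One thing that at least makes the current assignment visible (a sketch; group
name and broker address are placeholders, and the exact flags differ between
the old and new consumer):

bin/kafka-consumer-groups.sh --new-consumer --bootstrap-server localhost:9092 --describe --group my-group

This lists, per partition, which consumer currently owns it, so repeated runs
show whether the assignment really changes even when no consumer has died.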




On Wed, Jun 29, 2016 at 3:02 PM, Spico Florin  wrote:

> Hi!
>   By default kafka uses internally a round robin partitioner that will send
> the messages to the right partition based on the message key. Each of your
> consumer will receive message for its allocated partition for that they
> subscribed.
>   In case of rebalance, if you add more consumers than the partitions then
> some of the consumers will not get any data. If one of the consumers dies,
> then the remained consumers will get messages from the partitions depending
> on their client id. Kafka internally uses the client id (lexicogarphic
> order) to allocate the partitions.
>
> I hope that this give you an overview of what happens and somehow answer to
> your questions.
>
> Regards,
> florin
>
> On Thu, Jun 30, 2016 at 12:36 AM, Milind Vaidya  wrote:
>
> > Hi
> >
> > Background :
> >
> > I am using a java based multithreaded kafka consumer.
> >
> > Two instances of  this consumer are running on 2 different machines i.e.
> > one consumer process per box, and  belong to same consumer group.
> >
> > Internally each process has 2 threads each.
> >
> > Both the consumer processes consume from same topic "rawlogs" which has 4
> > partitions.
> >
> > Problem :
> >
> > As per the documentation of consumer group "each message published to a
> > topic is delivered to one consumer instance within each subscribing
> > consumer
> > group" . But is there any mechanism by which a each consumer consumes
> from
> > disjoint set of partitions too ? or each message from whichever partition
> > it is, will be given randomly to one of the consumers ?
> >
> > In case of rebalance, the partitions may get shuffled among consumers but
> > then again they should get divided into 2 disjoint sets one for each
> > consumer.
> >
>


Re: Consumer Group, relabancing and partition uniqueness

2016-06-29 Thread Spico Florin
Hi!
  By default, Kafka internally uses a round-robin partitioner that sends
messages to the right partition based on the message key. Each of your
consumers will receive messages for the partitions allocated to it from the
topics it subscribed to.
  In case of a rebalance, if you add more consumers than there are partitions,
then some of the consumers will not get any data. If one of the consumers dies,
the remaining consumers will get messages from the partitions, allocated
depending on their client id. Kafka internally uses the client id
(lexicographic order) to allocate the partitions.

I hope this gives you an overview of what happens and somehow answers your
questions.

Regards,
florin

On Thu, Jun 30, 2016 at 12:36 AM, Milind Vaidya  wrote:

> Hi
>
> Background :
>
> I am using a java based multithreaded kafka consumer.
>
> Two instances of  this consumer are running on 2 different machines i.e.
> one consumer process per box, and  belong to same consumer group.
>
> Internally each process has 2 threads each.
>
> Both the consumer processes consume from same topic "rawlogs" which has 4
> partitions.
>
> Problem :
>
> As per the documentation of consumer group "each message published to a
> topic is delivered to one consumer instance within each subscribing
> consumer
> group" . But is there any mechanism by which a each consumer consumes from
> disjoint set of partitions too ? or each message from whichever partition
> it is, will be given randomly to one of the consumers ?
>
> In case of rebalance, the partitions may get shuffled among consumers but
> then again they should get divided into 2 disjoint sets one for each
> consumer.
>


Consumer Group, relabancing and partition uniqueness

2016-06-29 Thread Milind Vaidya
Hi

Background :

I am using a java based multithreaded kafka consumer.

Two instances of this consumer are running on 2 different machines, i.e.
one consumer process per box, and they belong to the same consumer group.

Each process internally has 2 threads.

Both the consumer processes consume from same topic "rawlogs" which has 4
partitions.

Problem :

As per the consumer group documentation, "each message published to a topic is
delivered to one consumer instance within each subscribing consumer group". But
is there any mechanism by which each consumer also consumes from a disjoint set
of partitions? Or will each message, from whichever partition it is, be given
randomly to one of the consumers?

In case of a rebalance, the partitions may get shuffled among consumers, but
then again they should get divided into 2 disjoint sets, one for each consumer.


Re: Streams RocksDB State Store Disk Usage

2016-06-29 Thread Avi Flax
On Jun 29, 2016, at 14:15, Matthias J. Sax  wrote:
> 
> If you use window-operations, windows are kept until there retention
> time expires. Thus, reducing the retention time, should decrease the
> memory RocksDB needs to preserve windows.

Thanks Matthias, that makes sense and I appreciate all the helpful pointers! 
This is really good to know. However, the app that’s generating the large 
RocksDB log files is not using windowing, just basic aggregation with 
reduceByKey.

Thanks!
Avi

Re: Streams RocksDB State Store Disk Usage

2016-06-29 Thread Avi Flax
On Jun 29, 2016, at 11:49, Eno Thereska  wrote:

> These are internal files to RockDb.

Yeah, that makes sense.

However, since Streams is encapsulating/employing RocksDB, in my view it’s 
Streams’ responsibility to configure RocksDB well with good defaults and/or at 
least provide a way for me to configure it. I’d hope that people operating 
Streams apps wouldn’t have to learn much about operating RocksDB; ideally it 
would be fully or mostly encapsulated.

> Depending on your load in the system I suppose they could contain quite a bit 
> of data. How large was the load in the system these past two weeks so we can 
> calibrate? 

I’m really not sure how to quantify it. I’m fairly new to Kafka and Streams so 
I’m not sure what metrics we’d use to describe load nor how to measure them. 
(I’d guess maybe throughput in bytes + number of records? I don’t know how to 
measure that… JMX? My topic’s current offset is 1.5mm so that’s approximately 
how many records the app has processed.)

> Otherwise I'm not sure if 1-2GB is a lot or not (sounds like not that big to 
> make the disk full, was there something else as well that ate up space?)

I agree that these specific numbers aren’t that much. My concern is that I 
don’t have a mental model for what’s going on here, nor for what will happen 
over longer periods of time — is RocksDB going to continue to generate these 
big files? Is there some cleanup process built in to RocksDB and/or Streams 
that just hasn’t kicked in yet? Is there a config setting I can use to tune 
this? Do I just need to set up a cron job?

My longstanding impression of Kafka Streams, since it was first proposed 
through today, is that one of its goals is to produce apps that are easy to 
deploy and operate — easy in terms of time and effort, but also in cognitive 
load. This behavior with RocksDB log files might be a negative factor WRT that 
goal… of course it might also be a fluke or an error on my side ;)

Re: Streams RocksDB State Store Disk Usage

2016-06-29 Thread Matthias J. Sax
One thing I want to add:

If you use window operations, windows are kept until their retention
time expires. Thus, reducing the retention time should decrease the
memory RocksDB needs to preserve windows.

See
http://docs.confluent.io/3.0.0/streams/developer-guide.html?highlight=retention#windowing-a-stream

You can set the retention time via
> Windows#until(long duration)


Of course, this is a trade-off between memory usage and the ability to
process late-arriving data. As long as windows are not expired, they can
get updated with late data. Once the retention time has passed, late-arriving
records get dropped.

See
http://docs.confluent.io/3.0.0/streams/concepts.html?highlight=late%20arriving#windowing


-Matthias

On 06/29/2016 05:49 PM, Eno Thereska wrote:
> Hi Avi,
> 
> These are internal files to RockDb. Depending on your load in the system I 
> suppose they could contain quite a bit of data. How large was the load in the 
> system these past two weeks so we can calibrate? Otherwise I'm not sure if 
> 1-2GB is a lot or not (sounds like not that big to make the disk full, was 
> there something else as well that ate up space?)
> 
> Thanks
> Eno
> 
> 
>> On 29 Jun 2016, at 16:33, Avi Flax  wrote:
>>
>> Hello all,
>>
>> I’ve been working on a Streams app and so far it’s going quite well. I’ve 
>> had the app up and running in a staging environment for a few weeks, 
>> processing real data.
>>
>> Today I logged into the server to check in on some things, and I found the 
>> disk was full.
>>
>> I managed (with ncdu) to find that most of the unexpected usage was in:
>>
>> /tmp/kafka-streams/my_app/0_0/rocksdb/my_intermediate_aggregate_topic
>>
>> and specifically in files named LOG.old.{timestamp}. The files were 2.6 GB 
>> and 118 MB. Also, LOG was 1.1 GB.
>>
>> I checked the docs and searched the list (and Google) for info on Kafka 
>> Streams and/or RocksDB disk usage, cleanup, etc, but didn’t find much.
>>
>> So:
>>
>> * Is this normal, expected behavior?
>>
>> * Any recommendations or suggestions for dealing with this?
>>
>> Thanks!
>> Avi
> 





Monitoring Kafka Connect

2016-06-29 Thread Sumit Arora
Hello,

We are currently building our data pipeline using Confluent, and as part of this
implementation we have written a couple of Kafka Connect sink connectors for
Azure and MS SQL Server. To provide some more context, we are planning to use
SQL Server's Change Data Capture feature to track changes happening on
individual tables in the source database and then use the JDBC connector's
"QUERY" mode to fetch the changed data and push it to Kafka topics. We had to go
down this route because we cannot guarantee the presence of identity or
timestamp columns in our source tables, so we rely on the timestamp column
present in the system tables created by Change Data Capture (trans_begin_time)
and use it along with "QUERY" mode in the JDBC connector. However, this design
requires us to run one JDBC connector instance per table.
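
For reference, each of those per-table instances ends up as a small connector
config along these lines (a sketch only; the connector name, connection URL,
query and topic prefix below are made-up placeholders, not our actual config):

name=cdc-orders-source
connector.class=io.confluent.connect.jdbc.JdbcSourceConnector
connection.url=jdbc:sqlserver://dbhost:1433;databaseName=mydb
mode=timestamp
timestamp.column.name=trans_begin_time
query=SELECT * FROM cdc.dbo_orders_CT
topic.prefix=cdc-orders
tasks.max=1
poll.interval.ms=5000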

On the consumer side, once the data is on Kafka, we have written an Azure 
connector to push the data from Kafka topics onto Azure Blob Storage and 
another one to push data to MS SQL server.

Now coming to the actual question :) , the above design leads to a number of 
Kafka Connect workers, connectors and tasks running on our distributed cluster. 
We are planning to run multiple workers per host and have them share the tasks 
from our 3 connectors (JDBC, Azure and SQL Server). How can we monitor these 
connectors and tasks in a production environment? How can we make sure that 
workers are restarted once they are killed? How can we identify failing
tasks on different workers? Are there any best practices for monitoring Kafka
Connect and managing workers/connectors/tasks?
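
For the short term, the distributed workers' REST interface gives at least
basic visibility (a sketch; 8083 is the default rest.port, the connector name
is a placeholder, and the status endpoint depends on the Connect version in
use):

curl http://localhost:8083/connectors
curl http://localhost:8083/connectors/cdc-orders-source/status

combined with an external process supervisor (systemd, runit, etc.) to restart
worker processes that die.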

We are close to finishing our development and deploying this to test and 
production environments and it is very important that we figure out a way to 
monitor this set-up.

Thanks,
Sumit
P.S. We are looking at Command Central as our long-term monitoring solution but 
need a solution for the short-term.


CONFIDENTIALITY NOTICE: This e-mail message including attachments, if any, is 
intended for the person or entity to which it is addressed and may contain 
confidential, privileged, and/or proprietary material. Any unauthorized review, 
use, disclosure or distribution is prohibited.  If you are not the intended 
recipient, please contact the sender by reply e-mail and destroy all copies of 
the original message.


Re: Building API to make Kafka reactive

2016-06-29 Thread Shekar Tippur
Thanks for the suggestion, Lohith. Will try that and provide feedback.

- Shekar

On Tue, Jun 28, 2016 at 11:45 PM, Lohith Samaga M  wrote:

> Hi Shekar,
> Alternatively, you could make each stage of your pipeline to write
> to a Cassandra (or other DB) and your API will read from it. With Cassandra
> TTL, the row will be deleted after TTL is passed. No manual cleanup is
> required.
>
> Best regards / Mit freundlichen Grüßen / Sincères salutations
> M. Lohith Samaga
>
>
>
> -Original Message-
> From: Shekar Tippur [mailto:ctip...@gmail.com]
> Sent: Wednesday, June 29, 2016 12.10
> To: users
> Subject: Building API to make Kafka reactive
>
> I am looking at building a reactive api on top of Kafka.
> This API produces event to Kafka topic. I want to add a unique session id
> into the payload.
> The data gets transformed as it goes through different stages of a
> pipeline. I want to specify a final topic where I want the api to know that
> the processing was successful.
> The API should give different status at each part of the pipeline.
> At the ingestion, the API responds with "submitted"
> During the progression, the API returns "in progress"
> After successful completion, the API returns "Success"
>
> Couple of questions:
> 1. Is this feasible?
> 2. I was looking at project reactor (https://projectreactor.io) where the
> docs talk about event bus. I wanted to see if I can implement a consumer
> that points to the "end" topic and throws an event into the event bus.
> Since I would know the session ID, I can process the request accordingly.
>
> Appreciate your inputs.
>
> - Shekar
> Information transmitted by this e-mail is proprietary to Mphasis, its
> associated companies and/ or its customers and is intended
> for use only by the individual or entity to which it is addressed, and may
> contain information that is privileged, confidential or
> exempt from disclosure under applicable law. If you are not the intended
> recipient or it appears that this mail has been forwarded
> to you without proper authority, you are notified that any use or
> dissemination of this information in any manner is strictly
> prohibited. In such cases, please notify us immediately at
> mailmas...@mphasis.com and delete this mail from your records.
>


Re: Building API to make Kafka reactive

2016-06-29 Thread Shekar Tippur
Thanks Rajini,

I have seen this. Looks like quite a bit of work has been done.
I was trying to go through this code and understand how to get started.

- Shekar

On Wed, Jun 29, 2016 at 12:49 AM, Rajini Sivaram <
rajinisiva...@googlemail.com> wrote:

> Hi Shekar,
>
> We are working on a reactive streams API for Kafka. It is in its very early
> experimental stage, but if you want to take a look, the code is in github (
> https://github.com/reactor/reactor-kafka). I think you can add a session
> id
> without making it part of the Kafka API. In the coming weeks, we will be
> trying out some examples to improve the API. We welcome any feedback.
>
> Regards,
>
> Rajini
>
> On Wed, Jun 29, 2016 at 7:45 AM, Lohith Samaga M <
> lohith.sam...@mphasis.com>
> wrote:
>
> > Hi Shekar,
> > Alternatively, you could make each stage of your pipeline to
> write
> > to a Cassandra (or other DB) and your API will read from it. With
> Cassandra
> > TTL, the row will be deleted after TTL is passed. No manual cleanup is
> > required.
> >
> > Best regards / Mit freundlichen Grüßen / Sincères salutations
> > M. Lohith Samaga
> >
> >
> >
> > -Original Message-
> > From: Shekar Tippur [mailto:ctip...@gmail.com]
> > Sent: Wednesday, June 29, 2016 12.10
> > To: users
> > Subject: Building API to make Kafka reactive
> >
> > I am looking at building a reactive api on top of Kafka.
> > This API produces event to Kafka topic. I want to add a unique session id
> > into the payload.
> > The data gets transformed as it goes through different stages of a
> > pipeline. I want to specify a final topic where I want the api to know
> that
> > the processing was successful.
> > The API should give different status at each part of the pipeline.
> > At the ingestion, the API responds with "submitted"
> > During the progression, the API returns "in progress"
> > After successful completion, the API returns "Success"
> >
> > Couple of questions:
> > 1. Is this feasible?
> > 2. I was looking at project reactor (https://projectreactor.io) where
> the
> > docs talk about event bus. I wanted to see if I can implement a consumer
> > that points to the "end" topic and throws an event into the event bus.
> > Since I would know the session ID, I can process the request accordingly.
> >
> > Appreciate your inputs.
> >
> > - Shekar
> > Information transmitted by this e-mail is proprietary to Mphasis, its
> > associated companies and/ or its customers and is intended
> > for use only by the individual or entity to which it is addressed, and
> may
> > contain information that is privileged, confidential or
> > exempt from disclosure under applicable law. If you are not the intended
> > recipient or it appears that this mail has been forwarded
> > to you without proper authority, you are notified that any use or
> > dissemination of this information in any manner is strictly
> > prohibited. In such cases, please notify us immediately at
> > mailmas...@mphasis.com and delete this mail from your records.
> >
>


kafka-console-producer.sh TimeOutException

2016-06-29 Thread Ludek Cigler
Hi,
I am trying to send messages using the Kafka console producer to a Kafka
broker that is running on the same machine. When I run

$ echo "Hello world" | ./kafka-console-producer.sh --broker-list
localhost:9092 --topic test

I receive the following error message:

[2016-06-29 15:00:44,069] ERROR Error when sending message to topic test
with key: null, value: 11 bytes with error:
(org.apache.kafka.clients.producer.internals.ErrorLoggingCallback)
org.apache.kafka.common.errors.TimeoutException: Batch containing 1
record(s) expired due to timeout while requesting metadata from brokers for
test-0

When I check whether the Kafka broker is listening on port 9092, it is there
and running. How do I debug why the command-line producer cannot connect to
the broker?
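
A couple of checks that usually narrow this down (a sketch; paths and hostnames
are placeholders): the console producer first fetches metadata and then
connects to the advertised host/port from that metadata rather than to the
--broker-list address, so it is worth confirming what the broker advertises and
that the topic has a live leader:

bin/kafka-topics.sh --zookeeper localhost:2181 --describe --topic test
grep -E 'advertised|listeners|port' config/server.properties

If the advertised host name does not resolve from the client, the metadata
request can time out with exactly this kind of error.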

Thank you!
Ludek


Re: Streams RocksDB State Store Disk Usage

2016-06-29 Thread Eno Thereska
Hi Avi,

These are internal files to RocksDB. Depending on your load on the system, I
suppose they could contain quite a bit of data. How large was the load on the
system these past two weeks, so we can calibrate? Otherwise I'm not sure if
1-2 GB is a lot or not (it sounds like not that big to fill the disk - was
there something else as well that ate up space?)

Thanks
Eno


> On 29 Jun 2016, at 16:33, Avi Flax  wrote:
> 
> Hello all,
> 
> I’ve been working on a Streams app and so far it’s going quite well. I’ve had 
> the app up and running in a staging environment for a few weeks, processing 
> real data.
> 
> Today I logged into the server to check in on some things, and I found the 
> disk was full.
> 
> I managed (with ncdu) to find that most of the unexpected usage was in:
> 
> /tmp/kafka-streams/my_app/0_0/rocksdb/my_intermediate_aggregate_topic
> 
> and specifically in files named LOG.old.{timestamp}. The files were 2.6 GB 
> and 118 MB. Also, LOG was 1.1 GB.
> 
> I checked the docs and searched the list (and Google) for info on Kafka 
> Streams and/or RocksDB disk usage, cleanup, etc, but didn’t find much.
> 
> So:
> 
> * Is this normal, expected behavior?
> 
> * Any recommendations or suggestions for dealing with this?
> 
> Thanks!
> Avi



Streams RocksDB State Store Disk Usage

2016-06-29 Thread Avi Flax
Hello all,

I’ve been working on a Streams app and so far it’s going quite well. I’ve had 
the app up and running in a staging environment for a few weeks, processing 
real data.

Today I logged into the server to check in on some things, and I found the disk 
was full.

I managed (with ncdu) to find that most of the unexpected usage was in:

/tmp/kafka-streams/my_app/0_0/rocksdb/my_intermediate_aggregate_topic

and specifically in files named LOG.old.{timestamp}. The files were 2.6 GB and 
118 MB. Also, LOG was 1.1 GB.

I checked the docs and searched the list (and Google) for info on Kafka Streams 
and/or RocksDB disk usage, cleanup, etc, but didn’t find much.

So:

* Is this normal, expected behavior?

* Any recommendations or suggestions for dealing with this?

Thanks!
Avi

Kafka Roadmap

2016-06-29 Thread Panas, Szymon
All,

I am building a solution around Apache Kafka with a set of dependencies on
other products, mainly the Kafka client.

Is there a defined road map for Kafka?
I am particularly interested in version 1.X, as it would be marked stable.

Does the community plan a release every 6 months? (Each version of Kafka
changes the way the client communicates, and we prefer to be on the latest
stack if possible.) Such information, or at least a reasonable assumption,
would help us plan the projects.

Thanks,
Szymon Panas
CONFIDENTIALITY NOTICE: This message is the property of International Game 
Technology PLC and/or its subsidiaries and may contain proprietary, 
confidential or trade secret information. This message is intended solely for 
the use of the addressee. If you are not the intended recipient and have 
received this message in error, please delete this message from your system. 
Any unauthorized reading, distribution, copying, or other use of this message 
or its attachments is strictly prohibited.


Re: Can I access Kafka Streams Key/Value store outside of Processor?

2016-06-29 Thread Eno Thereska
Yes! We just made KIP-67 available yesterday to start the discussions: 
https://cwiki.apache.org/confluence/display/KAFKA/KIP-67%3A+Queryable+state+for+Kafka+Streams
 


Any feedback is welcome, there is a mail thread in the dev mailing list.

Thanks
Eno

> On 29 Jun 2016, at 15:52, Yi Chen  wrote:
> 
> This is awesome Eno! Would you mind sharing the JIRA ticket if you have one?
> 
> On Sun, Jun 19, 2016 at 12:07 PM, Eno Thereska 
> wrote:
> 
>> Hi Yi,
>> 
>> Your observation about accessing the state stores that are already there
>> vs. keeping state outside of Kafka Streams is a good one. We are currently
>> working on having the state stores accessible like you mention and should
>> be able to share some design docs shortly.
>> 
>> Thanks
>> Eno
>> 
>>> On 19 Jun 2016, at 19:49, Yi Chen  wrote:
>>> 
>>> Hello,
>>> 
>>> I am thinking of using the Kafka Steams feature to "unify" our real-time
>>> and scheduled workflow. An example is that in our workflow with stages
>> A-->
>>> B --> C, the A --> B segment can be achieved in real-time, but B-->C
>>> segment is usually a done with a scheduled job, running maybe once per
>> hour
>>> or once per 5 minutes, etc.
>>> 
>>> I am hoping to model this using Kafka Streams. Each stage would be a
>> topic:
>>> the Kafka Streams will process real-time events in topic-A and send
>> result
>>> to topic-B. The challenge is when I process the events in topic-B, I want
>>> to be able to process each event with a crontab-like schedule, so that if
>>> the process is successful (by checking an external API) the event is send
>>> to topic-C, otherwise, we will re-process the event again according to
>> the
>>> schedule.
>>> 
>>> Can I use the RocksDB key/value state store to store the topic-B events
>>> that failed to process, and have a scheduler (like quartz scheduler) to
>>> iterate all events in the store and re-process again? I know I can always
>>> keep the state outside of Kafka but I like that the state store is
>>> fault-tolerant and can be rebuilt automatically if the instance fails.
>> The
>>> examples I found so far seems to imply that the state store is only
>>> accessible from within a processor.
>>> 
>>> Thanks,
>>> Yi
>> 
>> 



Re: Can I access Kafka Streams Key/Value store outside of Processor?

2016-06-29 Thread Yi Chen
This is awesome Eno! Would you mind sharing the JIRA ticket if you have one?

On Sun, Jun 19, 2016 at 12:07 PM, Eno Thereska 
wrote:

> Hi Yi,
>
> Your observation about accessing the state stores that are already there
> vs. keeping state outside of Kafka Streams is a good one. We are currently
> working on having the state stores accessible like you mention and should
> be able to share some design docs shortly.
>
> Thanks
> Eno
>
> > On 19 Jun 2016, at 19:49, Yi Chen  wrote:
> >
> > Hello,
> >
> > I am thinking of using the Kafka Steams feature to "unify" our real-time
> > and scheduled workflow. An example is that in our workflow with stages
> A-->
> > B --> C, the A --> B segment can be achieved in real-time, but B-->C
> > segment is usually a done with a scheduled job, running maybe once per
> hour
> > or once per 5 minutes, etc.
> >
> > I am hoping to model this using Kafka Streams. Each stage would be a
> topic:
> > the Kafka Streams will process real-time events in topic-A and send
> result
> > to topic-B. The challenge is when I process the events in topic-B, I want
> > to be able to process each event with a crontab-like schedule, so that if
> > the process is successful (by checking an external API) the event is send
> > to topic-C, otherwise, we will re-process the event again according to
> the
> > schedule.
> >
> > Can I use the RocksDB key/value state store to store the topic-B events
> > that failed to process, and have a scheduler (like quartz scheduler) to
> > iterate all events in the store and re-process again? I know I can always
> > keep the state outside of Kafka but I like that the state store is
> > fault-tolerant and can be rebuilt automatically if the instance fails.
> The
> > examples I found so far seems to imply that the state store is only
> > accessible from within a processor.
> >
> > Thanks,
> > Yi
>
>


Re: Question about bootstrap processing in KafkaStreams.

2016-06-29 Thread Matthias J. Sax
Hi,

there was a similar discussion on the list already "Kafka stream join
scenario":

http://search-hadoop.com/m/uyzND1WsAGW1vB5O91=Kafka+stream+join+scenarios

Long story short: there is no explicit support or guarantee. As Jay
mentioned, some alignment is best effort. However, the main issue is
the question of what it means to load a KTable *completely*. As a
KTable consumes a changelog stream, there is no defined end, since a KTable
is an always/infinitely updating "dynamic" table...

You might be able to build a custom solution for it, though (see the
email thread I linked above). Hope this helps.


-Matthias

On 06/29/2016 04:26 AM, Gwen Shapira wrote:
> Upgrade :)
> 
> On Tue, Jun 28, 2016 at 6:49 PM, Rohit Valsakumar  
> wrote:
>> Hi Jay,
>>
>> Thanks for the reply.
>>
>> Unfortunately in our case due to legacy reasons we are using
>> WallclockTimestampExtractor in the application for all the streams and the
>> existing messages in the stream probably won¹t have timestamps as they are
>> being produced by legacy clients. So the events are being ingested with
>> processing times and it may not be able to synchronize based on the
>> message timestamps. What do you recommend for this scenario?
>>
>> Rohit
>>
>> On 6/28/16, 5:18 PM, "Jay Kreps"  wrote:
>>
>>> I think you may get this for free as Kafka Streams attempts to align
>>> consumption across different topics/partitions by the timestamp in the
>>> messages. So in a case where you are starting a job fresh and it has a
>>> database changelog to consume and a event stream to consume, it will
>>> attempt to keep the Ktable at the "time" the event stream is at. This is
>>> only a heuristic, of course, since messages are not necessarily strongly
>>> ordered by time. I think this is likely mostly the same but slightly
>>> better
>>> than the bootstrap usage in Samza but also covers other cases of
>>> alignment.
>>>
>>> If you want more control you can override the timestamp extractor that
>>> associates time and hence priority for the streams:
>>> https://kafka.apache.org/0100/javadoc/org/apache/kafka/streams/processor/TimestampExtractor.html
>>>
>>> -Jay
>>>
>>> On Tue, Jun 28, 2016 at 2:49 PM, Rohit Valsakumar 
>>> wrote:
>>>
 Hi all,

 Is there a way to consume all the contents of a kafka topic into a
 KTable
 before doing a left join with another Kstream?

 I am looking at something that simulates a bootstrap topic in a Samza
 job.

 Thanks,
 Rohit Valsakumar

 

 This email and any attachments may contain confidential and privileged
 material for the sole use of the intended recipient. Any review,
 copying,
 or distribution of this email (or any attachments) by others is
 prohibited.
 If you are not the intended recipient, please contact the sender
 immediately and permanently delete this email and any attachments. No
 employee or agent of TiVo Inc. is authorized to conclude any binding
 agreement on behalf of TiVo Inc. by email. Binding agreements with TiVo
 Inc. may only be made by a signed written agreement.

>>
>>
>> 
>>
>> This email and any attachments may contain confidential and privileged 
>> material for the sole use of the intended recipient. Any review, copying, or 
>> distribution of this email (or any attachments) by others is prohibited. If 
>> you are not the intended recipient, please contact the sender immediately 
>> and permanently delete this email and any attachments. No employee or agent 
>> of TiVo Inc. is authorized to conclude any binding agreement on behalf of 
>> TiVo Inc. by email. Binding agreements with TiVo Inc. may only be made by a 
>> signed written agreement.





Re: AWS EFS

2016-06-29 Thread Tom Crayford
I think you'll be far better off using EBS and Kafka's inbuilt distribution
than NFS mounts. Kafka's designed for distributing data natively, and not
for NFS style mounts.

On Wed, Jun 29, 2016 at 11:46 AM, Ben Davison 
wrote:

> Does anyone have any opinions on this?
>
> https://aws.amazon.com/blogs/aws/amazon-elastic-file-system-production-ready-in-three-regions/
>
> Looks interesting, just wondering if anyone else uses NFS mounts with
> Kafka?
>
> Thanks,
>
> Ben
>
> --
>
>
> This email, including attachments, is private and confidential. If you have
> received this email in error please notify the sender and delete it from
> your system. Emails are not secure and may contain viruses. No liability
> can be accepted for viruses that might be transferred by this email or any
> attachment. Any unauthorised copying of this message or unauthorised
> distribution and publication of the information contained herein are
> prohibited.
>
> 7digital Limited. Registered office: 69 Wilson Street, London EC2A 2BB.
> Registered in England and Wales. Registered No. 04843573.
>


AWS EFS

2016-06-29 Thread Ben Davison
Does anyone have any opinions on this?
https://aws.amazon.com/blogs/aws/amazon-elastic-file-system-production-ready-in-three-regions/

Looks interesting, just wondering if anyone else uses NFS mounts with Kafka?

Thanks,

Ben

-- 


This email, including attachments, is private and confidential. If you have 
received this email in error please notify the sender and delete it from 
your system. Emails are not secure and may contain viruses. No liability 
can be accepted for viruses that might be transferred by this email or any 
attachment. Any unauthorised copying of this message or unauthorised 
distribution and publication of the information contained herein are 
prohibited.

7digital Limited. Registered office: 69 Wilson Street, London EC2A 2BB.
Registered in England and Wales. Registered No. 04843573.


Colocating Kafka Connect on Kafka Broker

2016-06-29 Thread Kristoffer Sjögren
Hi

We want to use Kafka Connect to copy data to HDFS (using
kafka-connect-hdfs) in Parquet format, and we were wondering if it's a good
idea to colocate distributed Kafka Connect 1-1 with the Kafka brokers.

We are concerned that the Parquet indexing process would steal (a lot of / too
much?) computing resources from the brokers. The alternative is to
deploy on separate hardware (1-1?).

Experiences in this area?

Cheers,
-Kristoffer


Re: Building API to make Kafka reactive

2016-06-29 Thread Rajini Sivaram
Hi Shekar,

We are working on a reactive streams API for Kafka. It is in its very early
experimental stage, but if you want to take a look, the code is in github (
https://github.com/reactor/reactor-kafka). I think you can add a session id
without making it part of the Kafka API. In the coming weeks, we will be
trying out some examples to improve the API. We welcome any feedback.

Regards,

Rajini

On Wed, Jun 29, 2016 at 7:45 AM, Lohith Samaga M 
wrote:

> Hi Shekar,
> Alternatively, you could make each stage of your pipeline to write
> to a Cassandra (or other DB) and your API will read from it. With Cassandra
> TTL, the row will be deleted after TTL is passed. No manual cleanup is
> required.
>
> Best regards / Mit freundlichen Grüßen / Sincères salutations
> M. Lohith Samaga
>
>
>
> -Original Message-
> From: Shekar Tippur [mailto:ctip...@gmail.com]
> Sent: Wednesday, June 29, 2016 12.10
> To: users
> Subject: Building API to make Kafka reactive
>
> I am looking at building a reactive api on top of Kafka.
> This API produces event to Kafka topic. I want to add a unique session id
> into the payload.
> The data gets transformed as it goes through different stages of a
> pipeline. I want to specify a final topic where I want the api to know that
> the processing was successful.
> The API should give different status at each part of the pipeline.
> At the ingestion, the API responds with "submitted"
> During the progression, the API returns "in progress"
> After successful completion, the API returns "Success"
>
> Couple of questions:
> 1. Is this feasible?
> 2. I was looking at project reactor (https://projectreactor.io) where the
> docs talk about event bus. I wanted to see if I can implement a consumer
> that points to the "end" topic and throws an event into the event bus.
> Since I would know the session ID, I can process the request accordingly.
>
> Appreciate your inputs.
>
> - Shekar
> Information transmitted by this e-mail is proprietary to Mphasis, its
> associated companies and/ or its customers and is intended
> for use only by the individual or entity to which it is addressed, and may
> contain information that is privileged, confidential or
> exempt from disclosure under applicable law. If you are not the intended
> recipient or it appears that this mail has been forwarded
> to you without proper authority, you are notified that any use or
> dissemination of this information in any manner is strictly
> prohibited. In such cases, please notify us immediately at
> mailmas...@mphasis.com and delete this mail from your records.
>


RE: Building API to make Kafka reactive

2016-06-29 Thread Lohith Samaga M
Hi Shekar,
Alternatively, you could make each stage of your pipeline write to
Cassandra (or another DB) and have your API read from it. With Cassandra TTL,
the row will be deleted after the TTL has passed. No manual cleanup is
required.

Best regards / Mit freundlichen Grüßen / Sincères salutations
M. Lohith Samaga



-Original Message-
From: Shekar Tippur [mailto:ctip...@gmail.com] 
Sent: Wednesday, June 29, 2016 12.10
To: users
Subject: Building API to make Kafka reactive

I am looking at building a reactive api on top of Kafka.
This API produces event to Kafka topic. I want to add a unique session id into 
the payload.
The data gets transformed as it goes through different stages of a pipeline. I 
want to specify a final topic where I want the api to know that the processing 
was successful.
The API should give different status at each part of the pipeline.
At the ingestion, the API responds with "submitted"
During the progression, the API returns "in progress"
After successful completion, the API returns "Success"

Couple of questions:
1. Is this feasible?
2. I was looking at project reactor (https://projectreactor.io) where the docs 
talk about event bus. I wanted to see if I can implement a consumer that points 
to the "end" topic and throws an event into the event bus.
Since I would know the session ID, I can process the request accordingly.

Appreciate your inputs.

- Shekar
Information transmitted by this e-mail is proprietary to Mphasis, its 
associated companies and/ or its customers and is intended 
for use only by the individual or entity to which it is addressed, and may 
contain information that is privileged, confidential or 
exempt from disclosure under applicable law. If you are not the intended 
recipient or it appears that this mail has been forwarded 
to you without proper authority, you are notified that any use or dissemination 
of this information in any manner is strictly 
prohibited. In such cases, please notify us immediately at 
mailmas...@mphasis.com and delete this mail from your records.


Building API to make Kafka reactive

2016-06-29 Thread Shekar Tippur
I am looking at building a reactive API on top of Kafka.
This API produces events to a Kafka topic. I want to add a unique session id
into the payload.
The data gets transformed as it goes through different stages of a
pipeline. I want to specify a final topic from which the API knows that
the processing was successful.
The API should give a different status at each stage of the pipeline:
At ingestion, the API responds with "submitted".
During progression, the API returns "in progress".
After successful completion, the API returns "Success".

Couple of questions:
1. Is this feasible?
2. I was looking at project reactor (https://projectreactor.io) where the
docs talk about event bus. I wanted to see if I can implement a consumer
that points to the "end" topic and throws an event into the event bus.
Since I would know the session ID, I can process the request accordingly.

Appreciate your inputs.

- Shekar