Re: Add Nginx in front of Kafka cluster?

2017-09-14 Thread Ali Akhtar
parties = ports *

On Thu, Sep 14, 2017 at 8:04 PM, Ali Akhtar <ali.rac...@gmail.com> wrote:

> I would try to put the SSL on different ports than what you're sending
> kafka to. Make sure the kafka ports don't do anything except communicate in
> plaintext, put all 3rd parties on different parties.
>
>
> On Thu, Sep 14, 2017 at 7:23 PM, Yongtao You <yongtao_...@yahoo.com>
> wrote:
>
>> Does the following message mean broker 6 is having trouble talking to
>> broker 7? Broker 6's advertised listener is "PLAINTEXT://nginx:9906" and
>> Broker 7's advertised listener is "PLAINTEXT://nginx:9907". However, on
>> nginx server, port 9906 and 9907 are both SSL ports because that's what
>> producers (filebeat) send data to and they need to be encrypted.
>>
>>
>> [2017-09-14 21:59:32,543] WARN [Controller-6-to-broker-7-send-thread]:
>> Controller 6 epoch 1 fails to send request (type: UpdateMetadataRequest=,
>> controllerId=6, controllerEpoch=1, partitionStates={}, liveBrokers=(id=6,
>> endPoints=(host=nginx, port=9906, listenerName=ListenerName(PLAINTEXT),
>> securityProtocol=PLAINTEXT), rack=null), (id=7, endPoints=(host=nginx,
>> port=9907, listenerName=ListenerName(PLAINTEXT),
>> securityProtocol=PLAINTEXT), rack=null)) to broker nginx:9907 (id: 7 rack:
>> null). Reconnecting to broker. (kafka.controller.RequestSendThread)
>> java.io.IOException: Connection to 7 was disconnected before the response
>> was read
>> at org.apache.kafka.clients.NetworkClientUtils.sendAndReceive(N
>> etworkClientUtils.java:93)
>> at kafka.controller.RequestSendThread.doWork(ControllerChannelM
>> anager.scala:225)
>> at kafka.utils.ShutdownableThread.run(ShutdownableThread.scala:64)
>>
>>
>>
>>
>> On Thursday, September 14, 2017, 9:42:58 PM GMT+8, Yongtao You
>> <yongtao_...@yahoo.com.INVALID> wrote:
>>
>>
>> You are correct, that error message was a result of my misconfiguration.
>> I've corrected that. Although filebeat still can't send messages to Kafka.
>> In the Nginx log, I see the following:
>>
>> 2017/09/14 21:35:09 [info] 4030#4030: *60056 SSL_do_handshake() failed
>> (SSL: error:140760FC:SSL routines:SSL23_GET_CLIENT_HELLO:unknown
>> protocol) while SSL handshaking, client: 172.16.16.101, server:
>> 0.0.0.0:9907
>>
>>
>> where 172.16.16.101 is the host where one of the two Kafka brokers is
>> running. Looks like it tries to connect to port 9907 which is where the
>> other Kafka broker listens on. It's an [info] message so I'm not sure how
>> serious it is, but I don't see messages sent from filebeat in Kafka. :(
>>
>> Thanks!
>> -Yongtao
>>
>> On Thursday, September 14, 2017, 8:31:31 PM GMT+8, Ali Akhtar <
>> ali.rac...@gmail.com> wrote:
>>
>> If you ssh to the server where you got this error, are you able to ping
>> the
>> ip of node 7 on the port its trying to reach?
>>
>> On Thu, Sep 14, 2017 at 5:20 PM, Yongtao You <yongtao_...@yahoo.com>
>> wrote:
>>
>> > I'm getting a lot of these in the server.log:
>> >
>> >
>> > [2017-09-14 20:18:32,753] WARN Connection to node 7 could not be
>> > established. Broker may not be available. (org.apache.kafka.clients.
>> > NetworkClient)
>> >
>> >
>> > where node 7 is another broker in the cluster.
>> >
>> >
>> > Thanks.
>> >
>> > -Yongtao
>> >
>> >
>> > On Thursday, September 14, 2017, 8:13:09 PM GMT+8, Yongtao You <
>> > yongtao_...@yahoo.com> wrote:
>> >
>> >
>> > I got errors saying the other brokers are not reachable, or something
>> like
>> > that. Let me dig up the exact error messages. I am guessing the problem
>> was
>> > that the advertised listeners are of PLAINTEXT format, but the Nginx
>> > requires SSL. But I could be wrong.
>> >
>> >
>> > Thanks!
>> >
>> > -Yongtao
>> >
>> >
>> > On Thursday, September 14, 2017, 8:07:38 PM GMT+8, Ali Akhtar <
>> > ali.rac...@gmail.com> wrote:
>> >
>> >
>> > How do you know that the brokers don't talk to each other?
>> >
>> > On Thu, Sep 14, 2017 at 4:32 PM, Yongtao You <
>> > yongtao_...@yahoo.com.invalid>
>> > wrote:
>> >
>> > > Hi,
>> > > I would like to know the right way to setup a Kafka cluster with
>> Nginx in
>> > > front of it as a reverse proxy. Let's say I have 2 Ka

Re: Add Nginx in front of Kafka cluster?

2017-09-14 Thread Ali Akhtar
I would try to put the SSL on different ports than what you're sending
kafka to. Make sure the kafka ports don't do anything except communicate in
plaintext, put all 3rd parties on different parties.
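
The mismatch behind this advice (brokers advertising PLAINTEXT listeners on Nginx ports that expect SSL) can be checked mechanically. Below is a minimal Python sketch, assuming you know the advertised listener strings and which proxy ports terminate SSL. The helper name `find_protocol_mismatches` is hypothetical and not part of Kafka or Nginx.

```python
from urllib.parse import urlsplit


def find_protocol_mismatches(advertised_listeners, ssl_ports):
    """Return advertised listeners whose protocol is PLAINTEXT but whose
    port is one the proxy expects SSL on (hypothetical helper)."""
    mismatches = []
    for listener in advertised_listeners:
        parts = urlsplit(listener)          # e.g. PLAINTEXT://nginx:9906
        protocol = parts.scheme.upper()     # urlsplit lower-cases the scheme
        if protocol == "PLAINTEXT" and parts.port in ssl_ports:
            mismatches.append(listener)
    return mismatches


# Values taken from the thread: both Nginx ports are SSL ports.
advertised = ["PLAINTEXT://nginx:9906", "PLAINTEXT://nginx:9907"]
print(find_protocol_mismatches(advertised, ssl_ports={9906, 9907}))
```

Any listener this reports is one where a broker would send plaintext to an SSL port, which would be consistent with the handshake failures in the Nginx log.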


On Thu, Sep 14, 2017 at 7:23 PM, Yongtao You <yongtao_...@yahoo.com> wrote:

> Does the following message mean broker 6 is having trouble talking to
> broker 7? Broker 6's advertised listener is "PLAINTEXT://nginx:9906" and
> Broker 7's advertised listener is "PLAINTEXT://nginx:9907". However, on
> nginx server, port 9906 and 9907 are both SSL ports because that's what
> producers (filebeat) send data to and they need to be encrypted.
>
>
> [2017-09-14 21:59:32,543] WARN [Controller-6-to-broker-7-send-thread]:
> Controller 6 epoch 1 fails to send request (type: UpdateMetadataRequest=,
> controllerId=6, controllerEpoch=1, partitionStates={}, liveBrokers=(id=6,
> endPoints=(host=nginx, port=9906, listenerName=ListenerName(PLAINTEXT),
> securityProtocol=PLAINTEXT), rack=null), (id=7, endPoints=(host=nginx,
> port=9907, listenerName=ListenerName(PLAINTEXT),
> securityProtocol=PLAINTEXT), rack=null)) to broker nginx:9907 (id: 7 rack:
> null). Reconnecting to broker. (kafka.controller.RequestSendThread)
> java.io.IOException: Connection to 7 was disconnected before the response
> was read
> at org.apache.kafka.clients.NetworkClientUtils.sendAndReceive(
> NetworkClientUtils.java:93)
> at kafka.controller.RequestSendThread.doWork(ControllerChannelManager.
> scala:225)
> at kafka.utils.ShutdownableThread.run(ShutdownableThread.scala:64)
>
>
>
>
> On Thursday, September 14, 2017, 9:42:58 PM GMT+8, Yongtao You
> <yongtao_...@yahoo.com.INVALID> wrote:
>
>
> You are correct, that error message was a result of my misconfiguration.
> I've corrected that. Although filebeat still can't send messages to Kafka.
> In the Nginx log, I see the following:
>
> 2017/09/14 21:35:09 [info] 4030#4030: *60056 SSL_do_handshake() failed
> (SSL: error:140760FC:SSL routines:SSL23_GET_CLIENT_HELLO:unknown
> protocol) while SSL handshaking, client: 172.16.16.101, server:
> 0.0.0.0:9907
>
>
> where 172.16.16.101 is the host where one of the two Kafka brokers is
> running. Looks like it tries to connect to port 9907 which is where the
> other Kafka broker listens on. It's an [info] message so I'm not sure how
> serious it is, but I don't see messages sent from filebeat in Kafka. :(
>
> Thanks!
> -Yongtao
>
> On Thursday, September 14, 2017, 8:31:31 PM GMT+8, Ali Akhtar <
> ali.rac...@gmail.com> wrote:
>
> If you ssh to the server where you got this error, are you able to ping the
> ip of node 7 on the port its trying to reach?
>
> On Thu, Sep 14, 2017 at 5:20 PM, Yongtao You <yongtao_...@yahoo.com>
> wrote:
>
> > I'm getting a lot of these in the server.log:
> >
> >
> > [2017-09-14 20:18:32,753] WARN Connection to node 7 could not be
> > established. Broker may not be available. (org.apache.kafka.clients.
> > NetworkClient)
> >
> >
> > where node 7 is another broker in the cluster.
> >
> >
> > Thanks.
> >
> > -Yongtao
> >
> >
> > On Thursday, September 14, 2017, 8:13:09 PM GMT+8, Yongtao You <
> > yongtao_...@yahoo.com> wrote:
> >
> >
> > I got errors saying the other brokers are not reachable, or something
> like
> > that. Let me dig up the exact error messages. I am guessing the problem
> was
> > that the advertised listeners are of PLAINTEXT format, but the Nginx
> > requires SSL. But I could be wrong.
> >
> >
> > Thanks!
> >
> > -Yongtao
> >
> >
> > On Thursday, September 14, 2017, 8:07:38 PM GMT+8, Ali Akhtar <
> > ali.rac...@gmail.com> wrote:
> >
> >
> > How do you know that the brokers don't talk to each other?
> >
> > On Thu, Sep 14, 2017 at 4:32 PM, Yongtao You <
> > yongtao_...@yahoo.com.invalid>
> > wrote:
> >
> > > Hi,
> > > I would like to know the right way to setup a Kafka cluster with Nginx
> in
> > > front of it as a reverse proxy. Let's say I have 2 Kafka brokers
> running
> > on
> > > 2 different hosts; and an Nginx server running on another host. Nginx
> > will
> > > listen on 2 different ports, and each will forward to one Kafka broker.
> > > Producers will connect to one of the 2 ports on the Nginx host.
> > > Nginx-Host: listens on 9000 ssl (forward to :9092 in
> plain
> > > text); 9001 ssl (forward to :9092 in plain text);
> > >
> > > Kafka-Host-0: listeners=PLAINTEXT://:9092;
> > > advertised.listeners=PL

Re: Add Nginx in front of Kafka cluster?

2017-09-14 Thread Ali Akhtar
If you ssh to the server where you got this error, are you able to ping the
ip of node 7 on the port it's trying to reach?
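
Since ICMP ping does not test a specific port, "pinging a port" in practice means attempting a TCP connection. Here is a minimal sketch using only the Python standard library. The function name `can_connect` is made up for illustration.

```python
import socket


def can_connect(host, port, timeout=3.0):
    """Return True if a TCP connection to host:port can be opened."""
    try:
        with socket.create_connection((host, port), timeout=timeout):
            return True
    except OSError:
        # Covers refused connections, timeouts and DNS failures.
        return False
```

Run from the broker host, a call such as `can_connect("nginx", 9907)` (host and port taken from later in this thread) only tells you that something accepts TCP connections there. It says nothing about whether that endpoint speaks plaintext or SSL.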

On Thu, Sep 14, 2017 at 5:20 PM, Yongtao You <yongtao_...@yahoo.com> wrote:

> I'm getting a lot of these in the server.log:
>
>
> [2017-09-14 20:18:32,753] WARN Connection to node 7 could not be
> established. Broker may not be available. (org.apache.kafka.clients.
> NetworkClient)
>
>
> where node 7 is another broker in the cluster.
>
>
> Thanks.
>
> -Yongtao
>
>
> On Thursday, September 14, 2017, 8:13:09 PM GMT+8, Yongtao You <
> yongtao_...@yahoo.com> wrote:
>
>
> I got errors saying the other brokers are not reachable, or something like
> that. Let me dig up the exact error messages. I am guessing the problem was
> that the advertised listeners are of PLAINTEXT format, but the Nginx
> requires SSL. But I could be wrong.
>
>
> Thanks!
>
> -Yongtao
>
>
> On Thursday, September 14, 2017, 8:07:38 PM GMT+8, Ali Akhtar <
> ali.rac...@gmail.com> wrote:
>
>
> How do you know that the brokers don't talk to each other?
>
> On Thu, Sep 14, 2017 at 4:32 PM, Yongtao You <
> yongtao_...@yahoo.com.invalid>
> wrote:
>
> > Hi,
> > I would like to know the right way to setup a Kafka cluster with Nginx in
> > front of it as a reverse proxy. Let's say I have 2 Kafka brokers running
> on
> > 2 different hosts; and an Nginx server running on another host. Nginx
> will
> > listen on 2 different ports, and each will forward to one Kafka broker.
> > Producers will connect to one of the 2 ports on the Nginx host.
> > Nginx-Host: listens on 9000 ssl (forward to :9092 in plain
> > text); 9001 ssl (forward to :9092 in plain text);
> >
> > Kafka-Host-0: listeners=PLAINTEXT://:9092;
> > advertised.listeners=PLAINTEXT://:9000
> > Kafka-Host-1: listeners=PLAINTEXT://:9092;
> > advertised.listeners=PLAINTEXT://:9001
> > Ports on Nginx will have SSL enabled so that messages sent from producers
> > to Nginx will be encrypted; Traffic between Nginx and Kafka are in plain
> > text since it's on the internal network.
> > Why have producers go through Nginx? The main reason is that producers
> > will only need to open their firewall to a single IP so that even later
> on
> > when I add another Kafka broker, I don't need to modify the firewall of
> all
> > the producers.
> > My problem is that I can't make the above setup work. Brokers are unable
> > to talk to one another. :(
> > So, what's the right way to do this? Anyone has experience setting up
> > something similar? Or any recommendations for a different setup that will
> > not require changes on the producer's side when new Kafka brokers are
> added?
> >
> > Thanks!
> > Yongtao
> > PS. The producers in question are Filebeats
> > (https://www.elastic.co/products/beats/filebeat).
> >
>


Re: Kafka with kubernetes

2017-08-22 Thread Ali Akhtar
Not too familiar with that error, but I do have Kafka working on
Kubernetes. I'll share my files here in case that helps:

Zookeeper:
https://gist.github.com/aliakhtar/812974c35cf2658022fca55cc83f4b1d

Kafka: https://gist.github.com/aliakhtar/724fbee6910dec7263ab70332386af33

Essentially I have 3 kafka nodes and 3 zookeeper nodes, and my hacky way of
getting this to work was to have 3 kafka deployments + services, and likewise
for zookeeper.

Ideally you would use a StatefulSet for this, but zookeeper and kafka
require a unique id for each node to be provided in the config, and there's
no way currently to do that in Kubernetes (or wasn't, last I checked). If
there was, e.g using the pod IP, then you'd use a StatefulSet with a
valueFrom of the pod's ip, and pass that on as the unique ID to each node.
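
One possible workaround, which is not from the original message: StatefulSet pods are named `<statefulset-name>-<ordinal>`, so a startup script could derive the unique id from the pod's hostname. The Python sketch below assumes such a naming scheme. The function name `broker_id_from_hostname` is hypothetical, and whether this fits the images in the gists above is untested.

```python
import re


def broker_id_from_hostname(hostname):
    """Extract the trailing ordinal from a StatefulSet-style pod name,
    e.g. "kafka-2" -> 2. Raises ValueError if there is no ordinal."""
    match = re.search(r"-(\d+)$", hostname)
    if match is None:
        raise ValueError(f"no ordinal suffix in hostname: {hostname!r}")
    return int(match.group(1))


print(broker_id_from_hostname("kafka-0"))  # 0
print(broker_id_from_hostname("kafka-2"))  # 2
```

The resulting number could then be written into the broker's config (or Zookeeper's id file) before the process starts.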

On Tue, Aug 22, 2017 at 7:28 PM, Sean McElroy 
wrote:

> I'm not sure this is the correct place to post this question, but anyway...
>
> When running kafka in kubernetes, the kafka config contains this:
>
> listeners = PLAINTEXT://:tcp://10.0.0.186:9092
>
> Which is leading to this error: No security protocol defined for listener
> PLAINTEXT://:TCP
>
> Here is the section of the kubernetes yaml file that defines kafka:
>
> - image: wurstmeister/kafka
>   name: kafka
>   volumeMounts:
>   - name: kafka-vol
>     mountPath: /var/run/docker.sock
>   env:
>   - name: KAFKA_ADVERTISED_HOST_NAME
>     valueFrom:
>       fieldRef:
>         fieldPath: status.podIP
>   - name: KAFKA_ADVERTISED_PORT
>     value: "9092"
>   - name: KAFKA_ZOOKEEPER_CONNECT
>     value: localhost:2181
>   - name: KAFKA_LISTENER_SECURITY_PROTOCOL_MAP
>     value: INSIDE:PLAINTEXT,OUTSIDE:PLAINTEXT
>   - name: KAFKA_ADVERTISED_PROTOCOL_NAME
>     value: OUTSIDE
>   - name: KAFKA_PROTOCOL_NAME
>     value: INSIDE
>   ports:
>   - containerPort: 9092
>
> Can anyone see what I'm doing wrong?
>
> Thanks
>


Re: Kafka behavior when consuming a topic which doesn't exist?

2017-07-08 Thread Ali Akhtar
Oh gotcha, thanks. So a topic will be created if topic creation is enabled.

On Sat, Jul 8, 2017 at 8:14 PM, M. Manna <manme...@gmail.com> wrote:

> Please check my previous email.
>
> On Sat, 8 Jul 2017 at 2:32 am, Ali Akhtar <ali.rac...@gmail.com> wrote:
>
> > What happens if auto creation is enabled but the topic doesn't exist.
> > Consumers subscribe to that topic which doesn't exist. Then messages are
> > posted to that topic.
> >
> > Will the consumer receive those messages in this scenario?
> >
> >
> >
> > On 8 Jul 2017 4:38 a.m., "M. Manna" <manme...@gmail.com> wrote:
> >
> > That depends.
> >
> > If auto creation of non-existent topics is enabled (check docs), then it
> > will simply use the default partition and replication settings defined in
> > broker config to create a topic. If auto creation is disabled, your
> > consumer group won't do anything.
> >
> > With auto creation enabled, it's the same if you publish to a topic that
> > doesn't yet exist. But that means the topic will get created first, and
> > then the messages are sent on the bus. If you subscribe to that topic
> > later you will get messages. Check below:
> >
> > http://grokbase.com/t/kafka/users/1648kbr04c/subscribe-on-a-topic-that-does-not-exist
> >
> >
> >
> > On 7 Jul 2017 9:46 pm, "Ali Akhtar" <ali.rac...@gmail.com> wrote:
> >
> > > Sometimes I see warnings in my logs if i create a consumer for a topic
> > > which doesn't exist. Such as:
> > >
> > > org.apache.kafka.clients.NetworkClient  - Error while fetching
> metadata
> > > with correlation id 1 : {example_topic=LEADER_NOT_AVAILABLE}
> > >
> > > If later messages are posted to that topic (which will create it), will
> > my
> > > consumer receive those messages?
> > >
> >
>


Re: Kafka behavior when consuming a topic which doesn't exist?

2017-07-07 Thread Ali Akhtar
What happens if auto creation is enabled but the topic doesn't exist?
Consumers subscribe to that topic which doesn't exist. Then messages are
posted to that topic.

Will the consumer receive those messages in this scenario?



On 8 Jul 2017 4:38 a.m., "M. Manna" <manme...@gmail.com> wrote:

That depends.

If auto creation of non-existent topics is enabled (check docs), then it will
simply use the default partition and replication settings defined in
broker config to create a topic. If auto creation is disabled, your
consumer group won't do anything.

With auto creation enabled, it's the same if you publish to a topic that
doesn't yet exist. But that means the topic will get created first, and then
the messages are sent on the bus. If you subscribe to that topic later you
will get messages. Check below:

http://grokbase.com/t/kafka/users/1648kbr04c/subscribe-on-a-topic-that-does-not-exist



On 7 Jul 2017 9:46 pm, "Ali Akhtar" <ali.rac...@gmail.com> wrote:

> Sometimes I see warnings in my logs if i create a consumer for a topic
> which doesn't exist. Such as:
>
> org.apache.kafka.clients.NetworkClient  - Error while fetching metadata
> with correlation id 1 : {example_topic=LEADER_NOT_AVAILABLE}
>
> If later messages are posted to that topic (which will create it), will my
> consumer receive those messages?
>


Kafka behavior when consuming a topic which doesn't exist?

2017-07-07 Thread Ali Akhtar
Sometimes I see warnings in my logs if i create a consumer for a topic
which doesn't exist. Such as:

org.apache.kafka.clients.NetworkClient  - Error while fetching metadata
with correlation id 1 : {example_topic=LEADER_NOT_AVAILABLE}

If later messages are posted to that topic (which will create it), will my
consumer receive those messages?


Re: Caching in Kafka Streams to ignore garbage message

2017-04-27 Thread Ali Akhtar
I'd like to avoid repeated trips to the db, and caching a large amount of
data in memory.

Is it possible to send a message w/ the id as the partition key to a topic,
and then use the same id as the key, so the same node which will receive
the data for an id is the one which will process it?
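
The idea rests on keyed partitioning being deterministic: the same key always maps to the same partition. Below is a rough Python sketch of that property. The function `partition_for` is illustrative only, and CRC32 is a stand-in hash (Kafka's default partitioner uses murmur2).

```python
import zlib


def partition_for(key, num_partitions):
    """Map a key to a partition deterministically.

    Stand-in hash: Kafka's default partitioner uses murmur2, not CRC32;
    CRC32 is used here only because it ships with the standard library.
    """
    return zlib.crc32(key.encode("utf-8")) % num_partitions


# The same id always lands on the same partition ...
assert partition_for("id-42", 6) == partition_for("id-42", 6)
# ... so a "start tracking id-42" message and the data for id-42 are
# routed identically, provided both topics have the same partition count.
print(partition_for("id-42", 6))
```

For this to put both kinds of message on the same node, the two topics would need the same number of partitions and the same partitioner. Whether the stream tasks are then co-located also depends on how the topology consumes the two topics.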


On Fri, Apr 28, 2017 at 2:32 AM, Matthias J. Sax <matth...@confluent.io>
wrote:

> The recommended solution would be to use Kafka Connect to load your DB
> data into a Kafka topic.
>
> With Kafka Streams you read your db-topic as a KTable and do an (inner)
> KStream-KTable join to look up the IDs.
>
>
> -Matthias
>
> On 4/27/17 2:22 PM, Ali Akhtar wrote:
> > I have a Kafka topic which will receive a large amount of data.
> >
> > This data has an 'id' field. I need to look up the id in an external db,
> > see if we are tracking that id, and if yes, we process that message, if
> > not, we ignore it.
> >
> > 99% of the data will be for ids which are not being tracked - 1% or so
> will
> > be for ids which are tracked.
> >
> > My concern is, that there'd be a lot of round trips to the db made just
> to
> > check the id, and if it'd be better to cache the ids being tracked
> > somewhere, so other ids are ignored.
> >
> > I was considering sending a message to another (or the same topic)
> whenever
> > a new id is added to the track list, and that id should then get
> processed
> > on the node which will process the messages.
> >
> > Should I just cache all ids on all nodes (which may be a large amount),
> or
> > is there a way to only cache the id on the same kafka streams node which
> > will receive data for that id?
> >
>
>


Caching in Kafka Streams to ignore garbage message

2017-04-27 Thread Ali Akhtar
I have a Kafka topic which will receive a large amount of data.

This data has an 'id' field. I need to look up the id in an external db,
see if we are tracking that id, and if yes, we process that message, if
not, we ignore it.

99% of the data will be for ids which are not being tracked - 1% or so will
be for ids which are tracked.

My concern is, that there'd be a lot of round trips to the db made just to
check the id, and if it'd be better to cache the ids being tracked
somewhere, so other ids are ignored.

I was considering sending a message to another (or the same topic) whenever
a new id is added to the track list, and that id should then get processed
on the node which will process the messages.

Should I just cache all ids on all nodes (which may be a large amount), or
is there a way to only cache the id on the same kafka streams node which
will receive data for that id?
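
To make the caching idea concrete, here is a minimal in-memory sketch in Python. It assumes a control stream that announces when an id starts or stops being tracked. The class `TrackedIdFilter` and the control-message shape are hypothetical, and this is plain Python rather than Kafka Streams code.

```python
class TrackedIdFilter:
    """In-memory set of tracked ids, updated from control messages."""

    def __init__(self):
        self._tracked = set()

    def on_control_message(self, tracked_id, is_tracked):
        # Called for each message on the (hypothetical) control topic.
        if is_tracked:
            self._tracked.add(tracked_id)
        else:
            self._tracked.discard(tracked_id)

    def should_process(self, message):
        # Data messages are dicts with an 'id' field, as in the post.
        return message["id"] in self._tracked


f = TrackedIdFilter()
f.on_control_message("a1", True)
stream = [{"id": "a1", "v": 1}, {"id": "zz", "v": 2}, {"id": "a1", "v": 3}]
kept = [m for m in stream if f.should_process(m)]
print(kept)
```

Only the tracked ids are held in memory, so the 99% of messages for untracked ids are dropped with a set lookup and no db round trip.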


Calculating time elapsed using event start / stop notification messages

2017-04-21 Thread Ali Akhtar
I have a tricky use case where a user initiates an event (by clicking a
button) and then stops it (by clicking it again, losing connection, closing
the browser, etc).

Each time the event starts or stops, a notification is sent to a kafka
topic, with the user's id as the message key and the current timestamp, and
the state of the event (started, or stopped).

I'm using Kafka streaming to process these events.

Based on the notifications, I need to determine the total time spent
'working', i.e. the time between the user clicking start and the event
stopping, aggregated per hour, per day, etc.

E.g total time spent 'working' per hour, per day.

Any ideas how this could be achieved, while accounting for messages
arriving out of order due to latency, etc (e.g the stop notification may
arrive before start)?

Would the kafka streams local store be of any use here (all events by the
same user will have the same message key), or should i use Redis? Or do I
need an hourly job which runs and processes last hour's events?
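
As a starting point, the core calculation can be sketched independently of where the state lives. The Python below is a batch-style illustration that assumes events are `(user_id, timestamp_seconds, state)` tuples. The function name and tuple layout are made up for this example. It sorts by event timestamp so that a stop arriving before its start is still paired correctly, and it splits each interval at hour boundaries.

```python
from collections import defaultdict

HOUR = 3600


def seconds_worked_per_hour(events):
    """events: iterable of (user_id, timestamp_seconds, state) tuples, where
    state is "started" or "stopped". Returns {(user_id, hour_start): seconds}.
    """
    by_user = defaultdict(list)
    for user_id, ts, state in events:
        by_user[user_id].append((ts, state))

    totals = defaultdict(int)
    for user_id, user_events in by_user.items():
        started_at = None
        # Sorting by event timestamp undoes out-of-order arrival.
        for ts, state in sorted(user_events):
            if state == "started" and started_at is None:
                started_at = ts
            elif state == "stopped" and started_at is not None:
                # Split the interval at hour boundaries.
                cursor = started_at
                while cursor < ts:
                    hour_start = cursor - cursor % HOUR
                    chunk_end = min(ts, hour_start + HOUR)
                    totals[(user_id, hour_start)] += chunk_end - cursor
                    cursor = chunk_end
                started_at = None
    return dict(totals)


# The stop (t=4200) arrives before the start (t=3000).
events = [("u1", 4200, "stopped"), ("u1", 3000, "started")]
print(seconds_worked_per_hour(events))
```

In a streaming setting the same logic would need to wait some grace period before finalising an hour, since a late start or stop can still change the totals.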


Leader not available error after kafka node goes down

2017-04-08 Thread Ali Akhtar
I have a 3 node kafka cluster which is being managed via kubernetes, in
docker containers.

Recently, one of the 3 nodes went down, and was automatically re-created by
kubernetes.

However, now whenever I try to consume from one of my Kafka topics, through
Kafka Streams, I get the error:

>6687 [StreamThread-1] WARN  org.apache.kafka.clients.NetworkClient  -
Error while fetching metadata with correlation id 1 :
{my_topic=LEADER_NOT_AVAILABLE}

> org.apache.kafka.streams.errors.StreamsException: Topic not found during
partition assignment: my_topic

When I tried to re-create the topic via 'kafka-topics.sh --create', I
received:

> Error while executing topic command : Topic "my_topic" already exists.

Any ideas what's going on here, and how to have Kafka recover from a node
going down and automatically elect a new leader?


Re: Out of order message processing with Kafka Streams

2017-03-21 Thread Ali Akhtar
Hans,

Which class's javadocs should i look at? From my initial look at the
javadocs and discussion with Michael, it doesn't seem possible.

On Tue, Mar 21, 2017 at 10:44 PM, Hans Jespersen <h...@confluent.io> wrote:

> Yes, and yes!
>
> -hans
>
>
>
> > On Mar 21, 2017, at 7:45 AM, Ali Akhtar <ali.rac...@gmail.com> wrote:
> >
> > That would require
> >
> > - Knowing the current window's id (or some other identifier) to
> > differentiate it from other windows
> >
> > - Being able to process individual messages in a window
> >
> > Are those 2 things possible w/ kafka streams? (java)
> >
> > On Tue, Mar 21, 2017 at 7:43 PM, Hans Jespersen <h...@confluent.io>
> wrote:
> >
> >> While it's not exactly the same as the window start/stop time you can
> >> store (in the state store) the earliest and latest timestamps of any
> >> messages in each window and use that as a good approximation for the
> window
> >> boundary times.
> >>
> >> -hans
> >>
> >>> On Mar 20, 2017, at 1:00 PM, Ali Akhtar <ali.rac...@gmail.com> wrote:
> >>>
> >>> Yeah, windowing seems perfect, if only I could find out the current
> >>> window's start time (so I can log the current bucket's start & end
> times)
> >>> and process window messages individually rather than as aggregates.
> >>>
> >>> It doesn't seem like i can get this metadata from ProcessorContext
> >> though,
> >>> from looking over the javadocs
> >>>
> >>>> On Tue, Mar 21, 2017 at 12:38 AM, Michael Noll <mich...@confluent.io>
> >> wrote:
> >>>>
> >>>> Ali,
> >>>>
> >>>> what you describe is (roughly!) how Kafka Streams implements the
> >> internal
> >>>> state stores to support windowing.
> >>>>
> >>>> Some users have been following a similar approach as you outlined,
> using
> >>>> the Processor API.
> >>>>
> >>>>
> >>>>
> >>>>> On Mon, Mar 20, 2017 at 7:54 PM, Ali Akhtar <ali.rac...@gmail.com>
> >> wrote:
> >>>>>
> >>>>> It would be helpful to know the 'start' and 'end' of the current
> >>>> metadata,
> >>>>> so if an out of order message arrives late, and is being processed in
> >>>>> foreach(), you'd know which window / bucket it belongs to, and can
> >> handle
> >>>>> it accordingly.
> >>>>>
> >>>>> I'm guessing that's not possible at the moment.
> >>>>>
> >>>>> (My use case is, i receive a stream of messages. Messages need to be
> >>>> stored
> >>>>> and sorted into 'buckets', to indicate 'sessions'. Each time there's
> a
> >>>> gap
> >>>>> of 30 mins or more since the last message (under a key), a new
> >> 'session'
> >>>>> (bucket) should be started, and future messages should belong to that
> >>>>> 'session', until the next 30+ min gap).
> >>>>>
> >>>>> On Mon, Mar 20, 2017 at 11:44 PM, Michael Noll <mich...@confluent.io
> >
> >>>>> wrote:
> >>>>>
> >>>>>>> Can windows only be used for aggregations, or can they also be used
> >>>> for
> >>>>>> foreach(),
> >>>>>> and such?
> >>>>>>
> >>>>>> As of today, you can use windows only in aggregations.
> >>>>>>
> >>>>>>> And is it possible to get metadata on the message, such as whether
> or
> >>>>>> not its
> >>>>>> late, its index/position within the other messages, etc?
> >>>>>>
> >>>>>> If you use the Processor API of Kafka Streams, you can have access
> to
> >>>> an
> >>>>>> incoming record's topic, partition, offset, etc. via the so-called
> >>>>>> ProcessorContext (which is updated for every new incoming record):
> >>>>>>
> >>>>>> http://docs.confluent.io/current/streams/javadocs/org/
> >>>>>> apache/kafka/streams/processor/Processor.html
> >>>>>> - You can get/store a reference to the ProcessorContext from
> >>>>>> `Processor#init()`.
> >>>>>>
> >>>>>>

Re: Out of order message processing with Kafka Streams

2017-03-21 Thread Ali Akhtar
That would require

- Knowing the current window's id (or some other identifier) to
differentiate it from other windows

- Being able to process individual messages in a window

Are those 2 things possible w/ kafka streams? (java)
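
For reference, the suggestion quoted below (tracking the earliest and latest timestamps seen per window) could look roughly like this. It is a plain-Python sketch with a dict standing in for a state store. The 1-hour tumbling window, the `observe` function and the `(key, window_start)` window id are all assumptions for illustration, not Kafka Streams API.

```python
WINDOW_MS = 60 * 60 * 1000  # hypothetical 1-hour tumbling windows

# Stand-in for a state store: (key, window_start) -> [earliest, latest]
bounds = {}


def observe(key, timestamp_ms):
    """Record a message and return its window id plus observed bounds."""
    window_start = timestamp_ms - timestamp_ms % WINDOW_MS
    window_id = (key, window_start)
    entry = bounds.setdefault(window_id, [timestamp_ms, timestamp_ms])
    entry[0] = min(entry[0], timestamp_ms)
    entry[1] = max(entry[1], timestamp_ms)
    return window_id, tuple(entry)


observe("user-1", 5_000)
observe("user-1", 1_000)            # arrives late, same window
print(observe("user-1", 9_000))
```

Each message is handled individually, and the `(key, window_start)` pair serves as the identifier that distinguishes one window from another.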

On Tue, Mar 21, 2017 at 7:43 PM, Hans Jespersen <h...@confluent.io> wrote:

> While it's not exactly the same as the window start/stop time you can
> store (in the state store) the earliest and latest timestamps of any
> messages in each window and use that as a good approximation for the window
> boundary times.
>
> -hans
>
> > On Mar 20, 2017, at 1:00 PM, Ali Akhtar <ali.rac...@gmail.com> wrote:
> >
> > Yeah, windowing seems perfect, if only I could find out the current
> > window's start time (so I can log the current bucket's start & end times)
> > and process window messages individually rather than as aggregates.
> >
> > It doesn't seem like i can get this metadata from ProcessorContext
> though,
> > from looking over the javadocs
> >
> >> On Tue, Mar 21, 2017 at 12:38 AM, Michael Noll <mich...@confluent.io>
> wrote:
> >>
> >> Ali,
> >>
> >> what you describe is (roughly!) how Kafka Streams implements the
> internal
> >> state stores to support windowing.
> >>
> >> Some users have been following a similar approach as you outlined, using
> >> the Processor API.
> >>
> >>
> >>
> >>> On Mon, Mar 20, 2017 at 7:54 PM, Ali Akhtar <ali.rac...@gmail.com>
> wrote:
> >>>
> >>> It would be helpful to know the 'start' and 'end' of the current
> >> metadata,
> >>> so if an out of order message arrives late, and is being processed in
> >>> foreach(), you'd know which window / bucket it belongs to, and can
> handle
> >>> it accordingly.
> >>>
> >>> I'm guessing that's not possible at the moment.
> >>>
> >>> (My use case is, i receive a stream of messages. Messages need to be
> >> stored
> >>> and sorted into 'buckets', to indicate 'sessions'. Each time there's a
> >> gap
> >>> of 30 mins or more since the last message (under a key), a new
> 'session'
> >>> (bucket) should be started, and future messages should belong to that
> >>> 'session', until the next 30+ min gap).
> >>>
> >>> On Mon, Mar 20, 2017 at 11:44 PM, Michael Noll <mich...@confluent.io>
> >>> wrote:
> >>>
> >>>>> Can windows only be used for aggregations, or can they also be used
> >> for
> >>>> foreach(),
> >>>> and such?
> >>>>
> >>>> As of today, you can use windows only in aggregations.
> >>>>
> >>>>> And is it possible to get metadata on the message, such as whether or
> >>>> not its
> >>>> late, its index/position within the other messages, etc?
> >>>>
> >>>> If you use the Processor API of Kafka Streams, you can have access to
> >> an
> >>>> incoming record's topic, partition, offset, etc. via the so-called
> >>>> ProcessorContext (which is updated for every new incoming record):
> >>>>
> >>>> http://docs.confluent.io/current/streams/javadocs/org/
> >>>> apache/kafka/streams/processor/Processor.html
> >>>> - You can get/store a reference to the ProcessorContext from
> >>>> `Processor#init()`.
> >>>>
> >>>> http://docs.confluent.io/current/streams/javadocs/org/
> >>>> apache/kafka/streams/processor/ProcessorContext.html
> >>>> - The context can then be used within `Processor#process()` when you
> >>>> process a new record.  As I said, the context is updated behind the
> >>> scenes
> >>>> to match the record that is currently being processed.
> >>>>
> >>>>
> >>>> Best,
> >>>> Michael
> >>>>
> >>>>
> >>>>
> >>>>
> >>>> On Mon, Mar 20, 2017 at 5:59 PM, Ali Akhtar <ali.rac...@gmail.com>
> >>> wrote:
> >>>>
> >>>>> Can windows only be used for aggregations, or can they also be used
> >> for
> >>>>> foreach(), and such?
> >>>>>
> >>>>> And is it possible to get metadata on the message, such as whether or
> >>> not
> >>>>> its late, its index/position within the other mes

Re: Out of order message processing with Kafka Streams

2017-03-20 Thread Ali Akhtar
Yeah, windowing seems perfect, if only I could find out the current
window's start time (so I can log the current bucket's start & end times)
and process window messages individually rather than as aggregates.

It doesn't seem like i can get this metadata from ProcessorContext though,
from looking over the javadocs
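
To illustrate what I mean by processing messages individually while still knowing the bucket's start and end, here is a small Python sketch of the 30-minute-gap session logic from my use case quoted below. It is not Kafka Streams code. The function name `sessionize` is made up, and timestamps are assumed to be plain seconds for a single key.

```python
GAP_SECONDS = 30 * 60


def sessionize(timestamps, gap=GAP_SECONDS):
    """Group one key's message timestamps into sessions: a new session
    starts whenever the gap since the previous message is >= `gap`."""
    sessions = []
    for ts in sorted(timestamps):       # sort first to absorb late arrivals
        if sessions and ts - sessions[-1][-1] < gap:
            sessions[-1].append(ts)
        else:
            sessions.append([ts])
    return sessions


# 100 arrives out of order; 4000 is more than 30 minutes after 900.
for s in sessionize([0, 900, 4000, 100]):
    print(f"start={s[0]} end={s[-1]} messages={s}")
```

The first and last timestamps of each session give the bucket's start and end times, which is the metadata I'd like to get for the current window.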

On Tue, Mar 21, 2017 at 12:38 AM, Michael Noll <mich...@confluent.io> wrote:

> Ali,
>
> what you describe is (roughly!) how Kafka Streams implements the internal
> state stores to support windowing.
>
> Some users have been following a similar approach as you outlined, using
> the Processor API.
>
>
>
> On Mon, Mar 20, 2017 at 7:54 PM, Ali Akhtar <ali.rac...@gmail.com> wrote:
>
> > It would be helpful to know the 'start' and 'end' of the current
> metadata,
> > so if an out of order message arrives late, and is being processed in
> > foreach(), you'd know which window / bucket it belongs to, and can handle
> > it accordingly.
> >
> > I'm guessing that's not possible at the moment.
> >
> > (My use case is, i receive a stream of messages. Messages need to be
> stored
> > and sorted into 'buckets', to indicate 'sessions'. Each time there's a
> gap
> > of 30 mins or more since the last message (under a key), a new 'session'
> > (bucket) should be started, and future messages should belong to that
> > 'session', until the next 30+ min gap).
> >
> > On Mon, Mar 20, 2017 at 11:44 PM, Michael Noll <mich...@confluent.io>
> > wrote:
> >
> > > > Can windows only be used for aggregations, or can they also be used
> for
> > > foreach(),
> > > and such?
> > >
> > > As of today, you can use windows only in aggregations.
> > >
> > > > And is it possible to get metadata on the message, such as whether or
> > > not its
> > > late, its index/position within the other messages, etc?
> > >
> > > If you use the Processor API of Kafka Streams, you can have access to
> an
> > > incoming record's topic, partition, offset, etc. via the so-called
> > > ProcessorContext (which is updated for every new incoming record):
> > >
> > > http://docs.confluent.io/current/streams/javadocs/org/
> > > apache/kafka/streams/processor/Processor.html
> > > - You can get/store a reference to the ProcessorContext from
> > > `Processor#init()`.
> > >
> > > http://docs.confluent.io/current/streams/javadocs/org/
> > > apache/kafka/streams/processor/ProcessorContext.html
> > > - The context can then be used within `Processor#process()` when you
> > > process a new record.  As I said, the context is updated behind the
> > scenes
> > > to match the record that is currently being processed.
> > >
> > >
> > > Best,
> > > Michael
> > >
> > >
> > >
> > >
> > > On Mon, Mar 20, 2017 at 5:59 PM, Ali Akhtar <ali.rac...@gmail.com>
> > wrote:
> > >
> > > > Can windows only be used for aggregations, or can they also be used
> for
> > > > foreach(), and such?
> > > >
> > > > And is it possible to get metadata on the message, such as whether or
> > not
> > > > its late, its index/position within the other messages, etc?
> > > >
> > > > On Mon, Mar 20, 2017 at 9:44 PM, Michael Noll <mich...@confluent.io>
> > > > wrote:
> > > >
> > > > > And since you asked for a pointer, Ali:
> > > > > http://docs.confluent.io/current/streams/concepts.html#windowing
> > > > >
> > > > >
> > > > > On Mon, Mar 20, 2017 at 5:43 PM, Michael Noll <
> mich...@confluent.io>
> > > > > wrote:
> > > > >
> > > > > > Late-arriving and out-of-order data is only treated specially for
> > > > > windowed
> > > > > > aggregations.
> > > > > >
> > > > > > For stateless operations such as `KStream#foreach()` or
> > > > `KStream#map()`,
> > > > > > records are processed in the order they arrive (per partition).
> > > > > >
> > > > > > -Michael
> > > > > >
> > > > > >
> > > > > >
> > > > > >
> > > > > > On Sat, Mar 18, 2017 at 10:47 PM, Ali Akhtar <
> ali.rac...@gmail.com
> > >
> > > > > wrote:
> > > > > >
> > > > > >> > later when 

Re: Out of order message processing with Kafka Streams

2017-03-20 Thread Ali Akhtar
It would be helpful to know the 'start' and 'end' of the current metadata,
so if an out of order message arrives late, and is being processed in
foreach(), you'd know which window / bucket it belongs to, and can handle
it accordingly.

I'm guessing that's not possible at the moment.

(My use case is: I receive a stream of messages. Messages need to be stored
and sorted into 'buckets', to indicate 'sessions'. Each time there's a gap
of 30 mins or more since the last message (under a key), a new 'session'
(bucket) should be started, and future messages should belong to that
'session', until the next 30+ min gap.)
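
To make the bucketing rule above concrete, here is a minimal plain-Java sketch.
It is not Kafka Streams API code: the class and method names are hypothetical,
and it assumes the timestamps for one key are already sorted, so it does not
cover a late message that would merge or split existing sessions.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical helper: splits one key's timestamps into 'sessions'.
// A new session starts whenever the gap since the previous message
// is 30 minutes or more.
class SessionBuckets {
    static final long GAP_MS = 30L * 60 * 1000;

    // Input: timestamps (epoch millis) for a single key, sorted ascending.
    // Output: one list of timestamps per session, in order.
    static List<List<Long>> split(List<Long> sortedTimestamps) {
        List<List<Long>> sessions = new ArrayList<>();
        List<Long> current = null;
        long previous = 0L;
        for (long ts : sortedTimestamps) {
            if (current == null || ts - previous >= GAP_MS) {
                current = new ArrayList<>();
                sessions.add(current);
            }
            current.add(ts);
            previous = ts;
        }
        return sessions;
    }
}
```

Handling an out-of-order message would additionally mean re-checking the gaps
on both sides of the inserted timestamp.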

On Mon, Mar 20, 2017 at 11:44 PM, Michael Noll <mich...@confluent.io> wrote:

> > Can windows only be used for aggregations, or can they also be used for
> foreach(),
> and such?
>
> As of today, you can use windows only in aggregations.
>
> > And is it possible to get metadata on the message, such as whether or
> not its
> late, its index/position within the other messages, etc?
>
> If you use the Processor API of Kafka Streams, you can have access to an
> incoming record's topic, partition, offset, etc. via the so-called
> ProcessorContext (which is updated for every new incoming record):
>
> http://docs.confluent.io/current/streams/javadocs/org/
> apache/kafka/streams/processor/Processor.html
> - You can get/store a reference to the ProcessorContext from
> `Processor#init()`.
>
> http://docs.confluent.io/current/streams/javadocs/org/
> apache/kafka/streams/processor/ProcessorContext.html
> - The context can then be used within `Processor#process()` when you
> process a new record.  As I said, the context is updated behind the scenes
> to match the record that is currently being processed.
>
>
> Best,
> Michael
>
>
>
>
> On Mon, Mar 20, 2017 at 5:59 PM, Ali Akhtar <ali.rac...@gmail.com> wrote:
>
> > Can windows only be used for aggregations, or can they also be used for
> > foreach(), and such?
> >
> > And is it possible to get metadata on the message, such as whether or not
> > its late, its index/position within the other messages, etc?
> >
> > On Mon, Mar 20, 2017 at 9:44 PM, Michael Noll <mich...@confluent.io>
> > wrote:
> >
> > > And since you asked for a pointer, Ali:
> > > http://docs.confluent.io/current/streams/concepts.html#windowing
> > >
> > >
> > > On Mon, Mar 20, 2017 at 5:43 PM, Michael Noll <mich...@confluent.io>
> > > wrote:
> > >
> > > > Late-arriving and out-of-order data is only treated specially for
> > > windowed
> > > > aggregations.
> > > >
> > > > For stateless operations such as `KStream#foreach()` or
> > `KStream#map()`,
> > > > records are processed in the order they arrive (per partition).
> > > >
> > > > -Michael
> > > >
> > > >
> > > >
> > > >
> > > > On Sat, Mar 18, 2017 at 10:47 PM, Ali Akhtar <ali.rac...@gmail.com>
> > > wrote:
> > > >
> > > >> > later when message A arrives it will put that message back into
> > > >> > the right temporal context and publish an amended result for the
> > > proper
> > > >> > time/session window as if message B were consumed in the timestamp
> > > order
> > > >> > before message A.
> > > >>
> > > >> Does this apply to the aggregation Kafka stream methods then, and
> not
> > to
> > > >> e.g foreach?
> > > >>
> > > >> On Sun, Mar 19, 2017 at 2:40 AM, Hans Jespersen <h...@confluent.io>
> > > >> wrote:
> > > >>
> > > > >> > Yes stream processing and CEP are subtly different things.
> > > >> >
> > > >> > Kafka Streams helps you write stateful apps and allows that state
> to
> > > be
> > > >> > preserved on disk (a local State store) as well as distributed for
> > HA
> > > or
> > > >> > for parallel partitioned processing (via Kafka topic partitions
> and
> > > >> > consumer groups) as well as in memory (as a performance
> > enhancement).
> > > >> >
> > > >> > However a classical CEP engine with a pre-modeled state machine
> and
> > > >> > pattern matching rules is something different from stream
> > processing.
> > > >> >
> > > > >> > It is of course possible to build a CEP system on top of Kafka
> > Streams
> > > >> and
> > >

Re: Processing multiple topics

2017-03-20 Thread Ali Akhtar
Are you saying that it should process all messages from topic 1, then
topic 2, then topic 3, then 4?

Or that they need to be processed exactly at the same time?

On Mon, Mar 20, 2017 at 10:05 PM, Manasa Danda 
wrote:

> Hi,
>
> I am Manasa, currently working on a project that requires processing data
> from multiple topics at the same time. I am looking for advice on how to
> approach this problem. Below is the use case.
>
>
> We have 4 topics, with data coming in at a different rate in each topic,
> but the messages in each topic share a common unique identifier
> (attributionId). I need to process all the events in the 4 topics with the
> same attributionId at the same time. We are currently using Spark Streaming
> for processing.
>
> Here are the steps of the current logic.
>
> 1. Read and filter data in topic 1
> 2. Read and filter data in topic 2
> 3. Read and filter data in topic 3
> 4. Read and filter data in topic 4
> 5. Union of DStreams from steps 1-4, which were executed in parallel
> 6. process unified DStream
>
> However, since the data is coming in at different rates (topic 1 is
> generating 1000 times more than topic 2), the associated data does not
> arrive in the same batch window.
>
> Any ideas on how it can be implemented would help.
>
> Thank you!!
>
> -Manasa
>
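
One way to picture "process events with the same attributionId together" is to
buffer events per id until every topic has contributed. The following is a
minimal plain-Java sketch, not Spark or Kafka API code. The class name is
hypothetical, it assumes one event per topic per attributionId, and it never
expires incomplete groups, which a real job would need to do.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.Optional;

// Hypothetical buffer that groups events from several topics by attributionId.
class AttributionJoiner {
    private final int topicCount;
    // attributionId -> (topic -> latest event payload seen for that topic)
    private final Map<String, Map<String, String>> pending = new HashMap<>();

    AttributionJoiner(int topicCount) {
        this.topicCount = topicCount;
    }

    // Returns the complete group once every topic has delivered an event
    // for this id; otherwise returns empty and keeps buffering.
    Optional<Map<String, String>> add(String topic, String attributionId, String event) {
        Map<String, String> group =
            pending.computeIfAbsent(attributionId, id -> new HashMap<>());
        group.put(topic, event);
        if (group.size() < topicCount) {
            return Optional.empty();
        }
        pending.remove(attributionId);
        return Optional.of(group);
    }
}
```

Because the buffer is keyed by id rather than by batch window, the different
arrival rates of the topics no longer matter, at the cost of holding state for
ids that are still incomplete.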


Re: Out of order message processing with Kafka Streams

2017-03-20 Thread Ali Akhtar
Can windows only be used for aggregations, or can they also be used for
foreach(), and such?

And is it possible to get metadata on the message, such as whether or not
it's late, its index/position within the other messages, etc?

On Mon, Mar 20, 2017 at 9:44 PM, Michael Noll <mich...@confluent.io> wrote:

> And since you asked for a pointer, Ali:
> http://docs.confluent.io/current/streams/concepts.html#windowing
>
>
> On Mon, Mar 20, 2017 at 5:43 PM, Michael Noll <mich...@confluent.io>
> wrote:
>
> > Late-arriving and out-of-order data is only treated specially for
> windowed
> > aggregations.
> >
> > For stateless operations such as `KStream#foreach()` or `KStream#map()`,
> > records are processed in the order they arrive (per partition).
> >
> > -Michael
> >
> >
> >
> >
> > On Sat, Mar 18, 2017 at 10:47 PM, Ali Akhtar <ali.rac...@gmail.com>
> wrote:
> >
> >> > later when message A arrives it will put that message back into
> >> > the right temporal context and publish an amended result for the
> proper
> >> > time/session window as if message B were consumed in the timestamp
> order
> >> > before message A.
> >>
> >> Does this apply to the aggregation Kafka stream methods then, and not to
> >> e.g foreach?
> >>
> >> On Sun, Mar 19, 2017 at 2:40 AM, Hans Jespersen <h...@confluent.io>
> >> wrote:
> >>
> >> > Yes stream processing and CEP are subtly different things.
> >> >
> >> > Kafka Streams helps you write stateful apps and allows that state to
> be
> >> > preserved on disk (a local State store) as well as distributed for HA
> or
> >> > for parallel partitioned processing (via Kafka topic partitions and
> >> > consumer groups) as well as in memory (as a performance enhancement).
> >> >
> >> > However a classical CEP engine with a pre-modeled state machine and
> >> > pattern matching rules is something different from stream processing.
> >> >
> >> > It is of course possible to build a CEP system on top of Kafka Streams
> >> and
> >> > get the best of both worlds.
> >> >
> >> > -hans
> >> >
> >> > > On Mar 18, 2017, at 11:36 AM, Sabarish Sasidharan <
> >> > sabarish@gmail.com> wrote:
> >> > >
> >> > > Hans
> >> > >
> >> > > What you state would work for aggregations, but not for state
> machines
> >> > and
> >> > > CEP.
> >> > >
> >> > > Regards
> >> > > Sab
> >> > >
> >> > >> On 19 Mar 2017 12:01 a.m., "Hans Jespersen" <h...@confluent.io>
> >> wrote:
> >> > >>
> >> > >> The only way to make sure A is consumed first would be to delay the
> >> > >> consumption of message B for at least 15 minutes which would fly in
> >> the
> >> > >> face of the principles of a true streaming platform so the short
> >> answer
> >> > to
> >> > >> your question is "no" because that would be batch processing not
> >> stream
> >> > >> processing.
> >> > >>
> >> > >> However, Kafka Streams does handle late arriving data. So if you
> had
> >> > some
> >> > >> analytics that computes results on a time window or a session
> window
> >> > then
> >> > >> Kafka streams will compute on the stream in real time (processing
> >> > message
> >> > >> B) and then later when message A arrives it will put that message
> >> back
> >> > into
> >> > >> the right temporal context and publish an amended result for the
> >> proper
> >> > >> time/session window as if message B were consumed in the timestamp
> >> order
> >> > >> before message A. The end result of this flow is that you
> eventually
> >> get
> >> > >> the same results you would get in a batch processing system but
> with
> >> the
> >> > >> added benefit of getting intermediary result at much lower latency.
> >> > >>
> >> > >> -hans
> >> > >>
> >> > >> /**
> >> > >> * Hans Jespersen, Principal Systems Engineer, Confluent Inc.
> >> > >> * h...@confluent.io (650)924-2670
> >> > >> */
> >> > >>
> >> > >>> On Sat, Mar 18, 2017 at 10:29 AM, Ali Akhtar <
> ali.rac...@gmail.com>
> >> > wrote:
> >> > >>>
> >> > >>> Is it possible to have Kafka Streams order messages correctly by
> >> their
> >> > >>> timestamps, even if they arrived out of order?
> >> > >>>
> >> > >>> E.g, say Message A with a timestamp of 5:00 PM and Message B with
> a
> >> > >>> timestamp of 5:15 PM, are sent.
> >> > >>>
> >> > >>> Message B arrives sooner than Message A, due to network issues.
> >> > >>>
> >> > >>> Is it possible to make sure that, across all consumers of Kafka
> >> Streams
> >> > >>> (even if they are across different servers, but have the same
> >> consumer
> >> > >>> group), Message A is consumed first, before Message B?
> >> > >>>
> >> > >>> Thanks.
> >> > >>>
> >> > >>
> >> >
> >>
> >
> >
> >
>


Re: Out of order message processing with Kafka Streams

2017-03-18 Thread Ali Akhtar
> later when message A arrives it will put that message back into
> the right temporal context and publish an amended result for the proper
> time/session window as if message B were consumed in the timestamp order
> before message A.

Does this apply to the aggregation Kafka stream methods then, and not to
e.g foreach?

On Sun, Mar 19, 2017 at 2:40 AM, Hans Jespersen <h...@confluent.io> wrote:

> Yes stream processing and CEP are subtly different things.
>
> Kafka Streams helps you write stateful apps and allows that state to be
> preserved on disk (a local State store) as well as distributed for HA or
> for parallel partitioned processing (via Kafka topic partitions and
> consumer groups) as well as in memory (as a performance enhancement).
>
> However a classical CEP engine with a pre-modeled state machine and
> pattern matching rules is something different from stream processing.
>
> It is of course possible to build a CEP system on top of Kafka Streams and
> get the best of both worlds.
>
> -hans
>
> > On Mar 18, 2017, at 11:36 AM, Sabarish Sasidharan <
> sabarish@gmail.com> wrote:
> >
> > Hans
> >
> > What you state would work for aggregations, but not for state machines
> and
> > CEP.
> >
> > Regards
> > Sab
> >
> >> On 19 Mar 2017 12:01 a.m., "Hans Jespersen" <h...@confluent.io> wrote:
> >>
> >> The only way to make sure A is consumed first would be to delay the
> >> consumption of message B for at least 15 minutes which would fly in the
> >> face of the principles of a true streaming platform so the short answer
> to
> >> your question is "no" because that would be batch processing not stream
> >> processing.
> >>
> >> However, Kafka Streams does handle late arriving data. So if you had
> some
> >> analytics that computes results on a time window or a session window
> then
> >> Kafka streams will compute on the stream in real time (processing
> message
> >> B) and then later when message A arrives it will put that message back
> into
> >> the right temporal context and publish an amended result for the proper
> >> time/session window as if message B were consumed in the timestamp order
> >> before message A. The end result of this flow is that you eventually get
> >> the same results you would get in a batch processing system but with the
> >> added benefit of getting intermediary result at much lower latency.
> >>
> >> -hans
> >>
> >> /**
> >> * Hans Jespersen, Principal Systems Engineer, Confluent Inc.
> >> * h...@confluent.io (650)924-2670
> >> */
> >>
> >>> On Sat, Mar 18, 2017 at 10:29 AM, Ali Akhtar <ali.rac...@gmail.com>
> wrote:
> >>>
> >>> Is it possible to have Kafka Streams order messages correctly by their
> >>> timestamps, even if they arrived out of order?
> >>>
> >>> E.g, say Message A with a timestamp of 5:00 PM and Message B with a
> >>> timestamp of 5:15 PM, are sent.
> >>>
> >>> Message B arrives sooner than Message A, due to network issues.
> >>>
> >>> Is it possible to make sure that, across all consumers of Kafka Streams
> >>> (even if they are across different servers, but have the same consumer
> >>> group), Message A is consumed first, before Message B?
> >>>
> >>> Thanks.
> >>>
> >>
>


Re: Out of order message processing with Kafka Streams

2017-03-18 Thread Ali Akhtar
> However, Kafka Streams does handle late arriving data. So if you had some
> analytics that computes results on a time window or a session window then
> Kafka streams will compute on the stream in real time (processing message
> B) and then later when message A arrives it will put that message back into
> the right temporal context and publish an amended result for the proper
> time/session window as if message B were consumed in the timestamp order
> before message A.

Can you link me to the javadocs or documentation for where I can read more
into how this is done / how to use this? Thanks.
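
As a rough picture of the "amended result" behaviour quoted above, here is a
minimal plain-Java sketch of a windowed count that is keyed by event time
rather than arrival time. It is not the Kafka Streams implementation. The class
name and the fixed, non-overlapping window layout are assumptions made for
illustration.

```java
import java.util.TreeMap;

// Hypothetical event-time counter with fixed, non-overlapping windows.
class WindowedCounter {
    private final long windowSizeMs;
    // window start (epoch millis) -> number of records seen in that window
    private final TreeMap<Long, Long> counts = new TreeMap<>();

    WindowedCounter(long windowSizeMs) {
        this.windowSizeMs = windowSizeMs;
    }

    // Adds a record by its event timestamp, regardless of arrival order,
    // and returns the updated (possibly amended) count for its window.
    long add(long timestampMs) {
        long windowStart = timestampMs - (timestampMs % windowSizeMs);
        return counts.merge(windowStart, 1L, Long::sum);
    }
}
```

If message B (5:15 PM) is added first and message A (5:00 PM) later, A still
lands in the 5:00 window and that window's count is re-emitted, which is the
sense in which a late record amends an earlier result.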



On Sat, Mar 18, 2017 at 11:11 PM, Hans Jespersen <h...@confluent.io> wrote:

> Sorry, I mixed up Message A and B wrt the question, but the answer is
> still valid.
>
> -hans
>
> /**
>  * Hans Jespersen, Principal Systems Engineer, Confluent Inc.
>  * h...@confluent.io (650)924-2670
>  */
>
> On Sat, Mar 18, 2017 at 11:07 AM, Hans Jespersen <h...@confluent.io>
> wrote:
>
> > The only way to make sure A is consumed first would be to delay the
> > consumption of message B for at least 15 minutes which would fly in the
> > face of the principles of a true streaming platform so the short answer
> to
> > your question is "no" because that would be batch processing not stream
> > processing.
> >
> > However, Kafka Streams does handle late arriving data. So if you had some
> > analytics that computes results on a time window or a session window then
> > Kafka streams will compute on the stream in real time (processing message
> > B) and then later when message A arrives it will put that message back
> into
> > the right temporal context and publish an amended result for the proper
> > time/session window as if message B were consumed in the timestamp order
> > before message A. The end result of this flow is that you eventually get
> > the same results you would get in a batch processing system but with the
> > added benefit of getting intermediary result at much lower latency.
> >
> > -hans
> >
> > /**
> >  * Hans Jespersen, Principal Systems Engineer, Confluent Inc.
> >  * h...@confluent.io (650)924-2670 <(650)%20924-2670>
> >  */
> >
> > On Sat, Mar 18, 2017 at 10:29 AM, Ali Akhtar <ali.rac...@gmail.com>
> wrote:
> >
> >> Is it possible to have Kafka Streams order messages correctly by their
> >> timestamps, even if they arrived out of order?
> >>
> >> E.g, say Message A with a timestamp of 5:00 PM and Message B with a
> >> timestamp of 5:15 PM, are sent.
> >>
> >> Message B arrives sooner than Message A, due to network issues.
> >>
> >> Is it possible to make sure that, across all consumers of Kafka Streams
> >> (even if they are across different servers, but have the same consumer
> >> group), Message A is consumed first, before Message B?
> >>
> >> Thanks.
> >>
> >
> >
>


Out of order message processing with Kafka Streams

2017-03-18 Thread Ali Akhtar
Is it possible to have Kafka Streams order messages correctly by their
timestamps, even if they arrived out of order?

E.g, say Message A with a timestamp of 5:00 PM and Message B with a
timestamp of 5:15 PM, are sent.

Message B arrives sooner than Message A, due to network issues.

Is it possible to make sure that, across all consumers of Kafka Streams
(even if they are across different servers, but have the same consumer
group), Message A is consumed first, before Message B?

Thanks.


Re: Processing time series data in order

2016-12-28 Thread Ali Akhtar
This will only ensure the order of delivery though, not the actual order of
the events, right?

I.e., if the producer sends A, then B, but B arrives before A due to network
lag or any other reason, then B will be returned before A even if they both
went to the same partition. Am I correct about that?

Or can I use KTables to ensure A is processed before B? (Both messages will
have a timestamp which is being extracted by a TimestampExtractor ).
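
For reference, the "same key, same partition" behaviour mentioned in the quoted
reply below boils down to hashing the key. Here is a minimal plain-Java sketch
with a hypothetical class name. Kafka's default partitioner hashes the
serialized key bytes with murmur2; `String.hashCode()` is swapped in here only
to keep the example free of dependencies.

```java
// Hypothetical stand-in for a key-based partitioner.
class KeyPartitioner {
    // Maps a key to a partition index in [0, numPartitions).
    // Equal keys always map to the same partition; floorMod keeps the
    // result non-negative even when hashCode() is negative.
    static int partitionFor(String key, int numPartitions) {
        return Math.floorMod(key.hashCode(), numPartitions);
    }
}
```

This only fixes which partition a record goes to. Within the partition, records
are stored in the order the broker appended them, not in timestamp order.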

On Tue, Dec 27, 2016 at 8:15 PM, Tauzell, Dave <dave.tauz...@surescripts.com> wrote:

> If you specify a key with each message then all messages with the same key
> get sent to the same partition.
>
> > On Dec 26, 2016, at 23:32, Ali Akhtar <ali.rac...@gmail.com> wrote:
> >
> > How would I route the messages to a specific partition?
> >
> >> On 27 Dec 2016 10:25 a.m., "Asaf Mesika" <asaf.mes...@gmail.com> wrote:
> >>
> >> There is a much easier approach: you can route all messages of a given
> Id
> >> to a specific partition. Since each partition has a single writer you
> get
> >> the ordering you wish for. Of course this won't work if your updates
> occur
> >> in different hosts.
> >> Also maybe Kafka Streams can help shard them based on item Id to a second
> >> topic
> >>> On Thu, 22 Dec 2016 at 4:31 Ali Akhtar <ali.rac...@gmail.com> wrote:
> >>>
> >>> The batch size can be large, so in memory ordering isn't an option,
> >>> unfortunately.
> >>>
> >>> On Thu, Dec 22, 2016 at 7:09 AM, Jesse Hodges <hodges.je...@gmail.com>
> >>> wrote:
> >>>
> >>>> Depending on the expected max out of order window, why not order them
> >> in
> >>>> memory? Then you don't need to reread from Cassandra, in case of a
> >>> problem
> >>>> you can reread data from Kafka.
> >>>>
> >>>> -Jesse
> >>>>
> >>>>> On Dec 21, 2016, at 7:24 PM, Ali Akhtar <ali.rac...@gmail.com>
> >> wrote:
> >>>>>
> >>>>> - I'm receiving a batch of messages to a Kafka topic.
> >>>>>
> >>>>> Each message has a timestamp, however the messages can arrive / get
> >>>> processed out of order. I.e event 1's timestamp could've been a few
> >>> seconds
> >>>> before event 2, and event 2 could still get processed before event 1.
> >>>>>
> >>>>> - I know the number of messages that are sent per batch.
> >>>>>
> >>>>> - I need to process the messages in order. The messages are basically
> >>>> providing the history of an item. I need to be able to track the
> >> history
> >>>> accurately (i.e, if an event occurred 3 times, i need to accurately
> log
> >>> the
> >>>> dates of the first, 2nd, and 3rd time it occurred).
> >>>>>
> >>>>> The approach I'm considering is:
> >>>>>
> >>>>> - Creating a cassandra table which is ordered by the timestamp of the
> >>>> messages.
> >>>>>
> >>>>> - Once a batch of messages has arrived, writing them all to
> >> cassandra,
> >>>> counting on them being ordered by the timestamp even if they are
> >>> processed
> >>>> out of order.
> >>>>>
> >>>>> - Then iterating over the messages in the cassandra table, to process
> >>>> them in order.
> >>>>>
> >>>>> However, I'm concerned about Cassandra's eventual consistency. Could
> >> it
> >>>> be that even though I wrote the messages, they are not there when I
> try
> >>> to
> >>>> read them (which would be almost immediately after they are written)?
> >>>>>
> >>>>> Should I enforce consistency = ALL to make sure the messages will be
> >>>> available immediately after being written?
> >>>>>
> >>>>> Is there a better way to handle this thru either Kafka streams or
> >>>> Cassandra?
> >>>>
> >>>
> >>
>


Re: Processing time series data in order

2016-12-21 Thread Ali Akhtar
The batch size can be large, so in-memory ordering isn't an option,
unfortunately.

On Thu, Dec 22, 2016 at 7:09 AM, Jesse Hodges <hodges.je...@gmail.com>
wrote:

> Depending on the expected max out of order window, why not order them in
> memory? Then you don't need to reread from Cassandra, in case of a problem
> you can reread data from Kafka.
>
> -Jesse
>
> > On Dec 21, 2016, at 7:24 PM, Ali Akhtar <ali.rac...@gmail.com> wrote:
> >
> > - I'm receiving a batch of messages to a Kafka topic.
> >
> > Each message has a timestamp, however the messages can arrive / get
> processed out of order. I.e event 1's timestamp could've been a few seconds
> before event 2, and event 2 could still get processed before event 1.
> >
> > - I know the number of messages that are sent per batch.
> >
> > - I need to process the messages in order. The messages are basically
> providing the history of an item. I need to be able to track the history
> accurately (i.e, if an event occurred 3 times, i need to accurately log the
> dates of the first, 2nd, and 3rd time it occurred).
> >
> > The approach I'm considering is:
> >
> > - Creating a cassandra table which is ordered by the timestamp of the
> messages.
> >
> > - Once a batch of messages has arrived, writing them all to cassandra,
> counting on them being ordered by the timestamp even if they are processed
> out of order.
> >
> > - Then iterating over the messages in the cassandra table, to process
> them in order.
> >
> > However, I'm concerned about Cassandra's eventual consistency. Could it
> be that even though I wrote the messages, they are not there when I try to
> read them (which would be almost immediately after they are written)?
> >
> > Should I enforce consistency = ALL to make sure the messages will be
> available immediately after being written?
> >
> > Is there a better way to handle this thru either Kafka streams or
> Cassandra?
>
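
For the in-memory ordering idea quoted above, the buffer only has to hold
records that fall inside the maximum out-of-order window, not the whole batch.
Here is a minimal plain-Java sketch. The class name is hypothetical, and it
assumes no record arrives more than `maxDelayMs` behind the newest timestamp
seen so far; a record that violates this would be released out of order.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.PriorityQueue;

// Hypothetical bounded reorder buffer keyed by event timestamp.
class ReorderBuffer {
    private final long maxDelayMs;
    private final PriorityQueue<Long> heap = new PriorityQueue<>();
    private long maxSeen = Long.MIN_VALUE;

    ReorderBuffer(long maxDelayMs) {
        this.maxDelayMs = maxDelayMs;
    }

    // Accepts one event timestamp and returns, in ascending order, every
    // buffered timestamp that is at least maxDelayMs older than the newest
    // timestamp seen so far (and is therefore assumed safe to release).
    List<Long> offer(long timestampMs) {
        heap.add(timestampMs);
        maxSeen = Math.max(maxSeen, timestampMs);
        List<Long> ready = new ArrayList<>();
        while (!heap.isEmpty() && heap.peek() <= maxSeen - maxDelayMs) {
            ready.add(heap.poll());
        }
        return ready;
    }
}
```

Memory use is then bounded by the arrival rate multiplied by the window, and
anything still buffered after a crash can be re-read from Kafka.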


Processing time series data in order

2016-12-21 Thread Ali Akhtar
- I'm receiving a batch of messages to a Kafka topic.

Each message has a timestamp, but the messages can arrive / get
processed out of order. I.e., event 1's timestamp could've been a few seconds
before event 2, and event 2 could still get processed before event 1.

- I know the number of messages that are sent per batch.

- I need to process the messages in order. The messages are basically
providing the history of an item. I need to be able to track the history
accurately (i.e., if an event occurred 3 times, I need to accurately log the
dates of the 1st, 2nd, and 3rd time it occurred).

The approach I'm considering is:

- Creating a Cassandra table which is ordered by the timestamp of the
messages.

- Once a batch of messages has arrived, writing them all to Cassandra,
counting on them being ordered by the timestamp even if they are processed
out of order.

- Then iterating over the messages in the Cassandra table, to process them
in order.

However, I'm concerned about Cassandra's eventual consistency. Could it be
that even though I wrote the messages, they are not there when I try to
read them (which would be almost immediately after they are written)?

Should I enforce consistency = ALL to make sure the messages will be
available immediately after being written?

Is there a better way to handle this through either Kafka Streams or Cassandra?
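
On the consistency question, the usual rule of thumb is that a read is
guaranteed to see an acknowledged write when the write and read replica counts
together exceed the replication factor, so QUORUM writes plus QUORUM reads are
enough and ALL is not required. Here is a minimal plain-Java sketch of that
arithmetic. The class and method names are hypothetical, and it does not call
any Cassandra driver.

```java
// Hypothetical helper for reasoning about replica overlap.
class QuorumCheck {
    // Number of replicas in a quorum for a given replication factor.
    static int quorum(int replicationFactor) {
        return replicationFactor / 2 + 1;
    }

    // True when every set of read replicas must overlap every set of write
    // replicas, i.e. a read always reaches at least one replica that
    // acknowledged the write.
    static boolean readsSeeWrites(int writeReplicas, int readReplicas, int replicationFactor) {
        return writeReplicas + readReplicas > replicationFactor;
    }
}
```

With a replication factor of 3, a quorum is 2, and 2 + 2 > 3 holds, whereas
writing and reading at ONE (1 + 1) does not.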


Re: [Streams] Threading Frustration

2016-12-12 Thread Ali Akhtar
@Damian,

In the Java equivalent of this, does each KStream / KStreamBuilder.stream()
invocation create its own topic group, i.e its own thread?

On Mon, Dec 12, 2016 at 10:29 PM, Damian Guy wrote:

> Yep - that looks correct
>
> On Mon, 12 Dec 2016 at 17:18 Avi Flax  wrote:
>
> >
> > > On Dec 12, 2016, at 11:42, Damian Guy  wrote:
> > >
> > > If you want to split these out so that they can run in parallel, then
> you
> > > will need to create a new stream for each topic.
> >
> >
> > Just to make sure I’m understanding this, does this code change look
> right?
> >
> > From this:
> >
> > ```ruby
> > topic_names = SDP::Config.topic_names(config).to_java(:string)
> > bay_events = builder.stream key_serde, val_serde, topic_names
> > # etc
> > ```
> >
> >
> > To this:
> >
> > ```ruby
> > SDP::Config.topic_names(config).each do |topic_name|
> >   bay_events = builder.stream key_serde, val_serde, topic_name
> >   # etc
> > end
> > ```
> >
> >
> > I hope so!
> >
> > Thanks!
> > Avi
>


Re: Deleting a topic without delete.topic.enable=true?

2016-12-09 Thread Ali Akhtar
You also need to delete / restart ZooKeeper; it's probably storing the
topics there. (Or yeah, just enable it and then delete the topic.)

On Fri, Dec 9, 2016 at 9:09 PM, Rodrigo Sandoval wrote:

> Why did you do all those things instead of just setting
> delete.topic.enable=true?
>
> On Dec 9, 2016 13:40, "Tim Visher"  wrote:
>
> > Hi Everyone,
> >
> > I'm really confused at the moment. We created a topic with brokers set to
> > delete.topic.enable=false.
> >
> > We now need to delete that topic. To do that we shut down all the
> brokers,
> > deleted everything under log.dirs and logs.dir on all the kafka brokers,
> > `rmr`ed the entire chroot that kafka was storing things under in
> zookeeper,
> > and then brought kafka back up.
> >
> > After doing all that, the topic comes back, every time.
> >
> > What can we do to delete that topic?
> >
> > --
> >
> > In Christ,
> >
> > Timmy V.
> >
> > http://blog.twonegatives.com/
> > http://five.sentenc.es/ -- Spend less time on mail
> >
>


Re: Tracking when a batch of messages has arrived?

2016-12-04 Thread Ali Akhtar
I don't - it would require fetching all messages and iterating over them
just to count them, which is expensive. I know the counts after they have
been sent.
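
Since the count is only known once sending has finished, the consumer side can
count per batch_id and compare against the count carried by the control
message. Here is a minimal plain-Java sketch, not Kafka API code. The class and
method names are hypothetical, and it assumes each data message is delivered
exactly once, since duplicates would inflate the count.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical tracker that decides when a batch has fully arrived.
class BatchTracker {
    private final Map<String, Integer> received = new HashMap<>();
    private final Map<String, Integer> expected = new HashMap<>();

    // Called for each data message; returns true if the batch is now complete.
    boolean onMessage(String batchId) {
        received.merge(batchId, 1, Integer::sum);
        return isComplete(batchId);
    }

    // Called for the control message that carries the number of sent messages.
    boolean onBatchEnd(String batchId, int sentCount) {
        expected.put(batchId, sentCount);
        return isComplete(batchId);
    }

    private boolean isComplete(String batchId) {
        Integer want = expected.get(batchId);
        return want != null && received.getOrDefault(batchId, 0) >= want;
    }
}
```

Checking completeness in both methods covers the case where the control
message is consumed before the last data message, which can happen because
there is no ordering across topics or partitions.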

On Sun, Dec 4, 2016 at 9:34 PM, Marko Bonaći <marko.bon...@sematext.com>
wrote:

> Do you know in advance (when sending the first message) how many messages
> that batch is going to have?
>
>
> Marko Bonaći
> Monitoring | Alerting | Anomaly Detection | Centralized Log Management
> Solr & Elasticsearch Support
> Sematext <http://sematext.com/> | Contact
> <http://sematext.com/about/contact.html>
>
> On Sat, Dec 3, 2016 at 1:01 AM, Ali Akhtar <ali.rac...@gmail.com> wrote:
>
> > Hey Apurva,
> >
> > I am including the batch_id inside the messages.
> >
> > Could you give me an example of what you mean by custom control messages
> > with a control topic please?
> >
> >
> >
> > On Sat, Dec 3, 2016 at 12:35 AM, Apurva Mehta <apu...@confluent.io>
> wrote:
> >
> > > That should work, though it sounds like you may be interested in :
> > > https://cwiki.apache.org/confluence/display/KAFKA/KIP-
> > > 98+-+Exactly+Once+Delivery+and+Transactional+Messaging
> > >
> > > If you can include the 'batch_id' inside your messages, and define
> custom
> > > control messages with a control topic, then you would not need one
> topic
> > > per batch, and you would be very close to the essence of the above
> > > proposal.
> > >
> > > Thanks,
> > > Apurva
> > >
> > > On Fri, Dec 2, 2016 at 5:02 AM, Ali Akhtar <ali.rac...@gmail.com>
> wrote:
> > >
> > > > Heya,
> > > >
> > > > I need to send a group of messages, which are all related, and then
> > > process
> > > > those messages, only when all of them have arrived.
> > > >
> > > > Here is how I'm planning to do this. Is this the right way, and can
> any
> > > > improvements be made to this?
> > > >
> > > > 1) Send a message to a topic called batch_start, with a batch id
> (which
> > > > will be a UUID)
> > > >
> > > > 2) Post the messages to a topic called batch_msgs_<batch_id>. Here
> > > > <batch_id> will be the batch id sent in batch_start.
> > > >
> > > > The number of messages sent will be recorded by the producer.
> > > >
> > > > 3) Send a message to batch_end with the batch id and the number of
> sent
> > > > messages.
> > > >
> > > > 4) On the consumer side, using Kafka Streams, I would listen to
> > > > batch_end.
> > > >
> > > > 5) When the message there arrives, I will start another instance of
> > > > Kafka Streams, which will process the messages in batch_msgs_<batch_id>
> > > >
> > > > 6) Perhaps to be extra safe, whenever batch_end arrives, I will
> start a
> > > > throwaway consumer which will just count the number of messages in
> > > > batch_msgs_<batch_id>. If these don't match the # of messages
> specified
> > > in
> > > > the batch_end message, then it will assume that the batch hasn't yet
> > > > finished arriving, and it will wait for some time before retrying.
> Once
> > > the
> > > > correct # of messages have arrived, THEN it will trigger step 5
> above.
> > > >
> > > > Will the above method work, or should I make any changes to it?
> > > >
> > > > Is step 6 necessary?
> > > >
> > >
> >
>


Re: Adding topics to KafkaStreams after ingestion has been started?

2016-12-03 Thread Ali Akhtar
I suppose the topic won't be deleted, but this would be a rare enough
occurrence that there won't be too many dormant topics hanging around.

Alternatively perhaps I can store the undeleted topics somewhere, and
whenever a new node starts, it could check this list and delete them.

On Sat, Dec 3, 2016 at 3:23 PM, Matthias J. Sax <matth...@confluent.io>
wrote:

> Not sure. Would need to think about it more. However, the default commit
> interval in Streams is 30 sec. You can configure it via StreamsConfig
> COMMIT_INTERVAL_MS_CONFIG. So using the additional thread and waiting for 5
> minutes sounds ok. The question is what would happen if the JVM goes down
> before you delete the topic.
>
>
> -Matthias
>
> On 12/3/16 2:07 AM, Ali Akhtar wrote:
> > Is there a way to make sure the offsets got committed? Perhaps, after the
> > last msg has been consumed, I can setup a task to run after a safe time
> > (say 5 mins? ) in another thread which would delete the topic? What would
> > be a safe time to use?
> >
> > On Sat, Dec 3, 2016 at 3:04 PM, Matthias J. Sax <matth...@confluent.io>
> > wrote:
> >
> >> I guess yes. You might only want to make sure the topic offsets got
> >> committed -- not sure if committing offsets of a deleted topic could
> >> cause issue (ie, crashing you Streams app)
> >>
> >> -Matthias
> >>
> >> On 12/2/16 11:04 PM, Ali Akhtar wrote:
> >>> Thank you very much. Last q - Is it safe to do this from within a call
> >> back
> >>> processing that topic ,  once it reaches the last message? (It keeps a
> >>> count of how many messages processed vs how many remaining)
> >>>
> >>> On 3 Dec 2016 11:36 a.m., "Matthias J. Sax" <matth...@confluent.io>
> >> wrote:
> >>>
> >>>> You can use TopicCommand to delete a topic within Java:
> >>>>
> >>>>> final TopicCommand.TopicCommandOptions commandOptions = new
> >>>> TopicCommand.TopicCommandOptions(new String[]{
> >>>>> "--zookeeper", "zookeperHost:2181",
> >>>>> "--delete",
> >>>>> "--topic", "TOPIC-TO-BE-DELETED"});
> >>>>> TopicCommand.deleteTopic(zkUtils, commandOptions);
> >>>>
> >>>> So you can delete a topic within your Streams app.
> >>>>
> >>>> -Matthias
> >>>>
> >>>>
> >>>>
> >>>> On 12/2/16 9:25 PM, Ali Akhtar wrote:
> >>>>> Is there a way to delete the processed topics via streams or the java
> >>>>> driver? Or only thru the bash script?
> >>>>>
> >>>>> On 3 Dec 2016 5:27 a.m., "Matthias J. Sax" <matth...@confluent.io>
> >>>> wrote:
> >>>>>
> >>>>>> If you keep old topics that are completely processed, there would be
> >>>>>> increasing overhead, because Streams would try to read from those
> >> topics
> >>>>>> as long as they exist. Thus, more fetch request will be sent to
> those
> >>>>>> more topics over time, while most fetch request will return without
> >> any
> >>>>>> new data (as those old topic do not have new data)
> >>>>>>
> >>>>>> If you delete completely processed topics, there will be no
> overhead.
> >>>>>>
> >>>>>> -Matthias
> >>>>>>
> >>>>>> On 12/2/16 3:58 PM, Ali Akhtar wrote:
> >>>>>>> Hey Matthias,
> >>>>>>>
> >>>>>>> So I have a scenario where I need to batch a group of messages
> >>>> together.
> >>>>>>>
> >>>>>>> I'm considering creating a new topic for each batch that arrives,
> i.e
> >>>>>>> batch_.
> >>>>>>>
> >>>>>>> Each batch_ topic will have a finite number of messages, and
> then
> >>>> it
> >>>>>>> will remain empty. Essentially these will be throwaway topics.
> >>>>>>>
> >>>>>>> Is there any overhead to there being a lot of these topics, and
> >> having
> >>>> a
> >>>>>>> listener for batch_.* , or is this effectively like having one
> >> listener
> >>>>>> for
> >>>>>>> one topic?

Re: Adding topics to KafkaStreams after ingestion has been started?

2016-12-03 Thread Ali Akhtar
Is there a way to make sure the offsets got committed? Perhaps, after the
last msg has been consumed, I can set up a task to run after a safe time
(say 5 mins?) in another thread which would delete the topic? What would
be a safe time to use?
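
For reference, the "task in another thread after a safe time" part could look roughly like the sketch below. It uses only java.util.concurrent; the class name is made up, and the Runnable passed in is a placeholder for whatever actually deletes the topic. The delay value itself is still an open question.

```java
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;

// Hypothetical helper that runs a cleanup action once, after a delay,
// on its own thread (i.e. not on the thread processing the topic).
public class DelayedTopicCleanup {
    private final ScheduledExecutorService scheduler =
            Executors.newSingleThreadScheduledExecutor();

    // Schedule deleteAction to run once after the given delay.
    ScheduledFuture<?> scheduleDelete(Runnable deleteAction, long delay, TimeUnit unit) {
        return scheduler.schedule(deleteAction, delay, unit);
    }

    // Stop accepting new tasks; already scheduled tasks still run by default.
    void shutdown() {
        scheduler.shutdown();
    }

    public static void main(String[] args) throws Exception {
        DelayedTopicCleanup cleanup = new DelayedTopicCleanup();
        // Placeholder action: a real one would call the topic deletion code.
        ScheduledFuture<?> pending = cleanup.scheduleDelete(
                () -> System.out.println("deleting topic (placeholder)"),
                200, TimeUnit.MILLISECONDS);
        pending.get(); // block until the delayed task has run
        cleanup.shutdown();
    }
}
```

In the real case the delay would be something like 5, TimeUnit.MINUTES rather than 200 ms.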

On Sat, Dec 3, 2016 at 3:04 PM, Matthias J. Sax <matth...@confluent.io>
wrote:

> I guess yes. You might only want to make sure the topic offsets got
> committed -- not sure if committing offsets of a deleted topic could
> cause issues (i.e., crashing your Streams app)
>
> -Matthias
>
> On 12/2/16 11:04 PM, Ali Akhtar wrote:
> > Thank you very much. Last q - Is it safe to do this from within a call
> back
> > processing that topic ,  once it reaches the last message? (It keeps a
> > count of how many messages processed vs how many remaining)
> >
> > On 3 Dec 2016 11:36 a.m., "Matthias J. Sax" <matth...@confluent.io>
> wrote:
> >
> >> You can use TopicCommand to delete a topic within Java:
> >>
> >>> final TopicCommand.TopicCommandOptions commandOptions = new
> >> TopicCommand.TopicCommandOptions(new String[]{
> >>> "--zookeeper", "zookeperHost:2181",
> >>> "--delete",
> >>> "--topic", "TOPIC-TO-BE-DELETED"});
> >>> TopicCommand.deleteTopic(zkUtils, commandOptions);
> >>
> >> So you can delete a topic within your Streams app.
> >>
> >> -Matthias
> >>
> >>
> >>
> >> On 12/2/16 9:25 PM, Ali Akhtar wrote:
> >>> Is there a way to delete the processed topics via streams or the java
> >>> driver? Or only thru the bash script?
> >>>
> >>> On 3 Dec 2016 5:27 a.m., "Matthias J. Sax" <matth...@confluent.io>
> >> wrote:
> >>>
> >>>> If you keep old topics that are completely processed, there would be
> >>>> increasing overhead, because Streams would try to read from those
> topics
> >>>> as long as they exist. Thus, more fetch request will be sent to those
> >>>> more topics over time, while most fetch request will return without
> any
> >>>> new data (as those old topic do not have new data)
> >>>>
> >>>> If you delete completely processed topics, there will be no overhead.
> >>>>
> >>>> -Matthias
> >>>>
> >>>> On 12/2/16 3:58 PM, Ali Akhtar wrote:
> >>>>> Hey Matthias,
> >>>>>
> >>>>> So I have a scenario where I need to batch a group of messages
> >> together.
> >>>>>
> >>>>> I'm considering creating a new topic for each batch that arrives, i.e
> >>>>> batch_.
> >>>>>
> >>>>> Each batch_ topic will have a finite number of messages, and then
> >> it
> >>>>> will remain empty. Essentially these will be throwaway topics.
> >>>>>
> >>>>> Is there any overhead to there being a lot of these topics, and
> having
> >> a
> >>>>> listener for batch_.* , or is this effectively like having one
> listener
> >>>> for
> >>>>> one topic?
> >>>>>
> >>>>> On Fri, Dec 2, 2016 at 11:09 PM, Matthias J. Sax <
> >> matth...@confluent.io>
> >>>>> wrote:
> >>>>>
> >>>>>> 1) There will be once consumer per thread. The number of thread is
> >>>>>> defined by the number of instances you start and how many threads
> you
> >>>>>> configure for each instance via StreamConfig parameter
> >>>>>> NUM_STREAM_THREADS_CONFIG. Thus, you control this completely by
> >>>> yourself.
> >>>>>>
> >>>>>> Depending on the number to partitions in your topics, each thread
> will
> >>>>>> process one or multiple partitions. As a partition will be processed
> >> by
> >>>>>> exactly one thread, the overall number of partitions over all you
> >> input
> >>>>>> topics limits your max number of thread (if you have more threads,
> >> those
> >>>>>> will just be idle)
> >>>>>>
> >>>>>> 2) Thus, there should be no performance issues. Furthermore, if you
> >>>>>> create new topic while you application is running -- and if this
> might
> >>>>>> overload you current application -- yo

Re: Adding topics to KafkaStreams after ingestion has been started?

2016-12-02 Thread Ali Akhtar
Thank you very much. Last question: is it safe to do this from within a
callback processing that topic, once it reaches the last message? (It keeps a
count of how many messages have been processed vs. how many remain.)

On 3 Dec 2016 11:36 a.m., "Matthias J. Sax" <matth...@confluent.io> wrote:

> You can use TopicCommand to delete a topic within Java:
>
> > final TopicCommand.TopicCommandOptions commandOptions =
> >     new TopicCommand.TopicCommandOptions(new String[]{
> >         "--zookeeper", "zookeperHost:2181",
> >         "--delete",
> >         "--topic", "TOPIC-TO-BE-DELETED"});
> > TopicCommand.deleteTopic(zkUtils, commandOptions);
>
> So you can delete a topic within your Streams app.
>
> -Matthias
>
>
>
> On 12/2/16 9:25 PM, Ali Akhtar wrote:
> > Is there a way to delete the processed topics via streams or the java
> > driver? Or only thru the bash script?
> >
> > On 3 Dec 2016 5:27 a.m., "Matthias J. Sax" <matth...@confluent.io>
> wrote:
> >
> >> If you keep old topics that are completely processed, there would be
> >> increasing overhead, because Streams would try to read from those topics
> >> as long as they exist. Thus, more fetch request will be sent to those
> >> more topics over time, while most fetch request will return without any
> >> new data (as those old topic do not have new data)
> >>
> >> If you delete completely processed topics, there will be no overhead.
> >>
> >> -Matthias
> >>
> >> On 12/2/16 3:58 PM, Ali Akhtar wrote:
> >>> Hey Matthias,
> >>>
> >>> So I have a scenario where I need to batch a group of messages
> together.
> >>>
> >>> I'm considering creating a new topic for each batch that arrives, i.e
> >>> batch_.
> >>>
> >>> Each batch_ topic will have a finite number of messages, and then
> it
> >>> will remain empty. Essentially these will be throwaway topics.
> >>>
> >>> Is there any overhead to there being a lot of these topics, and having
> a
> >>> listener for batch_.* , or is this effectively like having one listener
> >> for
> >>> one topic?
> >>>
> >>> On Fri, Dec 2, 2016 at 11:09 PM, Matthias J. Sax <
> matth...@confluent.io>
> >>> wrote:
> >>>
> >>>> 1) There will be once consumer per thread. The number of thread is
> >>>> defined by the number of instances you start and how many threads you
> >>>> configure for each instance via StreamConfig parameter
> >>>> NUM_STREAM_THREADS_CONFIG. Thus, you control this completely by
> >> yourself.
> >>>>
> >>>> Depending on the number to partitions in your topics, each thread will
> >>>> process one or multiple partitions. As a partition will be processed
> by
> >>>> exactly one thread, the overall number of partitions over all you
> input
> >>>> topics limits your max number of thread (if you have more threads,
> those
> >>>> will just be idle)
> >>>>
> >>>> 2) Thus, there should be no performance issues. Furthermore, if you
> >>>> create new topic while you application is running -- and if this might
> >>>> overload you current application -- you can always start new instances
> >>>> an scale-out you application dynamically -- Kafka Streams is fully
> >> elastic.
> >>>>
> >>>> Have a look here for more details:
> >>>> http://docs.confluent.io/current/streams/architecture.html
> >>>>
> >>>>
> >>>> -Matthias
> >>>>
> >>>> On 12/2/16 6:23 AM, Ali Akhtar wrote:
> >>>>> That's pretty useful to know - thanks.
> >>>>>
> >>>>> 1) If I listened too foo-.*, and there were 5 foo topics created
> after
> >>>>> kafka streaming was running: foo1, foo2, foo3, foo4, foo5, will this
> >>>> create
> >>>>> 5 consumers / threads / instances, or will it be just 1 instance that
> >>>>> receives the messages for all of those topics?
> >>>>>
> >>>>> 2) Will this cause issues performance issues if i had a lot of
> >> throwaway
> >>>>> foo topics being created, or will this scale?
> >>>>>
> >>>>> On Fri, Dec 2, 2016 at 7:17 PM, Damian Guy <damian@gmail.com>
> >> wrote:
> >>>>>
> >>>>>> Hi Ali,
> >>>>>>
> >>>>>> The only way KafkaStreams will process new topics after start is if
> >> the
> >>>>>> original stream was defined with a regular expression, i.e,
> >>>>>> kafka.stream(Pattern.compile("foo-.*"));
> >>>>>>
> >>>>>> If any new topics are added after start that match the pattern, then
> >>>> they
> >>>>>> will also be consumed.
> >>>>>>
> >>>>>> Thanks,
> >>>>>> Damian
> >>>>>>
> >>>>>> On Fri, 2 Dec 2016 at 13:13 Ali Akhtar <ali.rac...@gmail.com>
> wrote:
> >>>>>>
> >>>>>>> Heya,
> >>>>>>>
> >>>>>>> Normally, you add your topics and their callbacks to a
> StreamBuilder,
> >>>> and
> >>>>>>> then call KafkaStreams.start() to start ingesting those topics.
> >>>>>>>
> >>>>>>> Is it possible to add a new topic to the StreamBuilder, and start
> >>>>>> ingesting
> >>>>>>> that as well, after KafkaStreams.start() has been called?
> >>>>>>>
> >>>>>>> Thanks.
> >>>>>>>
> >>>>>>
> >>>>>
> >>>>
> >>>>
> >>>
> >>
> >>
> >
>
>


Re: Adding topics to KafkaStreams after ingestion has been started?

2016-12-02 Thread Ali Akhtar
Is there a way to delete the processed topics via Streams or the Java
driver? Or only through the bash script?

On 3 Dec 2016 5:27 a.m., "Matthias J. Sax" <matth...@confluent.io> wrote:

> If you keep old topics that are completely processed, there would be
> increasing overhead, because Streams would try to read from those topics
> as long as they exist. Thus, more fetch requests will be sent to more
> topics over time, while most fetch requests will return without any
> new data (as those old topics do not have new data).
>
> If you delete completely processed topics, there will be no overhead.
>
> -Matthias
>
> On 12/2/16 3:58 PM, Ali Akhtar wrote:
> > Hey Matthias,
> >
> > So I have a scenario where I need to batch a group of messages together.
> >
> > I'm considering creating a new topic for each batch that arrives, i.e
> > batch_.
> >
> > Each batch_ topic will have a finite number of messages, and then it
> > will remain empty. Essentially these will be throwaway topics.
> >
> > Is there any overhead to there being a lot of these topics, and having a
> > listener for batch_.* , or is this effectively like having one listener
> for
> > one topic?
> >
> > On Fri, Dec 2, 2016 at 11:09 PM, Matthias J. Sax <matth...@confluent.io>
> > wrote:
> >
> >> 1) There will be once consumer per thread. The number of thread is
> >> defined by the number of instances you start and how many threads you
> >> configure for each instance via StreamConfig parameter
> >> NUM_STREAM_THREADS_CONFIG. Thus, you control this completely by
> yourself.
> >>
> >> Depending on the number to partitions in your topics, each thread will
> >> process one or multiple partitions. As a partition will be processed by
> >> exactly one thread, the overall number of partitions over all you input
> >> topics limits your max number of thread (if you have more threads, those
> >> will just be idle)
> >>
> >> 2) Thus, there should be no performance issues. Furthermore, if you
> >> create new topic while you application is running -- and if this might
> >> overload you current application -- you can always start new instances
> >> an scale-out you application dynamically -- Kafka Streams is fully
> elastic.
> >>
> >> Have a look here for more details:
> >> http://docs.confluent.io/current/streams/architecture.html
> >>
> >>
> >> -Matthias
> >>
> >> On 12/2/16 6:23 AM, Ali Akhtar wrote:
> >>> That's pretty useful to know - thanks.
> >>>
> >>> 1) If I listened too foo-.*, and there were 5 foo topics created after
> >>> kafka streaming was running: foo1, foo2, foo3, foo4, foo5, will this
> >> create
> >>> 5 consumers / threads / instances, or will it be just 1 instance that
> >>> receives the messages for all of those topics?
> >>>
> >>> 2) Will this cause issues performance issues if i had a lot of
> throwaway
> >>> foo topics being created, or will this scale?
> >>>
> >>> On Fri, Dec 2, 2016 at 7:17 PM, Damian Guy <damian@gmail.com>
> wrote:
> >>>
> >>>> Hi Ali,
> >>>>
> >>>> The only way KafkaStreams will process new topics after start is if
> the
> >>>> original stream was defined with a regular expression, i.e,
> >>>> kafka.stream(Pattern.compile("foo-.*"));
> >>>>
> >>>> If any new topics are added after start that match the pattern, then
> >> they
> >>>> will also be consumed.
> >>>>
> >>>> Thanks,
> >>>> Damian
> >>>>
> >>>> On Fri, 2 Dec 2016 at 13:13 Ali Akhtar <ali.rac...@gmail.com> wrote:
> >>>>
> >>>>> Heya,
> >>>>>
> >>>>> Normally, you add your topics and their callbacks to a StreamBuilder,
> >> and
> >>>>> then call KafkaStreams.start() to start ingesting those topics.
> >>>>>
> >>>>> Is it possible to add a new topic to the StreamBuilder, and start
> >>>> ingesting
> >>>>> that as well, after KafkaStreams.start() has been called?
> >>>>>
> >>>>> Thanks.
> >>>>>
> >>>>
> >>>
> >>
> >>
> >
>
>


Re: Tracking when a batch of messages has arrived?

2016-12-02 Thread Ali Akhtar
Hey Apurva,

I am including the batch_id inside the messages.

Could you give me an example of what you mean by custom control messages
with a control topic please?



On Sat, Dec 3, 2016 at 12:35 AM, Apurva Mehta <apu...@confluent.io> wrote:

> That should work, though it sounds like you may be interested in:
> https://cwiki.apache.org/confluence/display/KAFKA/KIP-98+-+Exactly+Once+Delivery+and+Transactional+Messaging
>
> If you can include the 'batch_id' inside your messages, and define custom
> control messages with a control topic, then you would not need one topic
> per batch, and you would be very close to the essence of the above
> proposal.
>
> Thanks,
> Apurva
>
> On Fri, Dec 2, 2016 at 5:02 AM, Ali Akhtar <ali.rac...@gmail.com> wrote:
>
> > Heya,
> >
> > I need to send a group of messages, which are all related, and then
> process
> > those messages, only when all of them have arrived.
> >
> > Here is how I'm planning to do this. Is this the right way, and can any
> > improvements be made to this?
> >
> > 1) Send a message to a topic called batch_start, with a batch id (which
> > will be a UUID)
> >
> > 2) Post the messages to a topic called batch_msgs_. Here
> batch_id
> > will be the batch id sent in batch_start.
> >
> > The number of messages sent will be recorded by the producer.
> >
> > 3) Send a message to batch_end with the batch id and the number of sent
> > messages.
> >
> > 4) On the consumer side, using Kafka Streaming, I would listen to
> > batch_end.
> >
> > 5) When the message there arrives, I will start another instance of Kafka
> > Streaming, which will process the messages in batch_msgs_
> >
> > 6) Perhaps to be extra safe, whenever batch_end arrives, I will start a
> > throwaway consumer which will just count the number of messages in
> > batch_msgs_. If these don't match the # of messages specified
> in
> > the batch_end message, then it will assume that the batch hasn't yet
> > finished arriving, and it will wait for some time before retrying. Once
> the
> > correct # of messages have arrived, THEN it will trigger step 5 above.
> >
> > Will the above method work, or should I make any changes to it?
> >
> > Is step 6 necessary?
> >
>
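
One possible reading of the control-message suggestion, sketched in plain Java with no Kafka calls: data messages carry a batch_id, a batch_end control message carries the sent count, and the consumer tracks both. The class and method names are made up for illustration, and this is an interpretation of the suggestion rather than anything taken from KIP-98. It also does not deduplicate redelivered messages.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical tracker: counts data messages per batch id and compares the
// count against the number announced by the batch_end control message.
public class BatchTracker {
    private final Map<String, Integer> received = new HashMap<>();
    private final Map<String, Integer> expected = new HashMap<>();

    // Call for every data message; returns true once the batch is complete.
    boolean onData(String batchId) {
        received.merge(batchId, 1, Integer::sum);
        return isComplete(batchId);
    }

    // Call for the batch_end control message, which carries the sent count.
    boolean onBatchEnd(String batchId, int sentCount) {
        expected.put(batchId, sentCount);
        return isComplete(batchId);
    }

    // Complete only when batch_end has been seen AND enough data has arrived,
    // so it works whichever of the two arrives last.
    boolean isComplete(String batchId) {
        Integer want = expected.get(batchId);
        return want != null && received.getOrDefault(batchId, 0) >= want;
    }

    public static void main(String[] args) {
        BatchTracker tracker = new BatchTracker();
        System.out.println(tracker.onData("b1"));        // false: no batch_end yet
        System.out.println(tracker.onBatchEnd("b1", 2)); // false: 1 of 2 received
        System.out.println(tracker.onData("b1"));        // true: 2 of 2 received
    }
}
```

With this shape, the retry loop from step 6 becomes unnecessary in the sketch, since completion is re-checked on every arriving message.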


Re: Adding topics to KafkaStreams after ingestion has been started?

2016-12-02 Thread Ali Akhtar
Hey Matthias,

So I have a scenario where I need to batch a group of messages together.

I'm considering creating a new topic for each batch that arrives, i.e
batch_.

Each batch_ topic will have a finite number of messages, and then it
will remain empty. Essentially these will be throwaway topics.

Is there any overhead to there being a lot of these topics, and having a
listener for batch_.* , or is this effectively like having one listener for
one topic?

On Fri, Dec 2, 2016 at 11:09 PM, Matthias J. Sax <matth...@confluent.io>
wrote:

> 1) There will be one consumer per thread. The number of threads is
> defined by the number of instances you start and how many threads you
> configure for each instance via the StreamsConfig parameter
> NUM_STREAM_THREADS_CONFIG. Thus, you control this completely by yourself.
>
> Depending on the number of partitions in your topics, each thread will
> process one or multiple partitions. As a partition will be processed by
> exactly one thread, the overall number of partitions over all your input
> topics limits your max number of threads (if you have more threads, those
> will just be idle).
>
> 2) Thus, there should be no performance issues. Furthermore, if you
> create new topics while your application is running -- and if this might
> overload your current application -- you can always start new instances
> and scale out your application dynamically -- Kafka Streams is fully elastic.
>
> Have a look here for more details:
> http://docs.confluent.io/current/streams/architecture.html
>
>
> -Matthias
>
> On 12/2/16 6:23 AM, Ali Akhtar wrote:
> > That's pretty useful to know - thanks.
> >
> > 1) If I listened too foo-.*, and there were 5 foo topics created after
> > kafka streaming was running: foo1, foo2, foo3, foo4, foo5, will this
> create
> > 5 consumers / threads / instances, or will it be just 1 instance that
> > receives the messages for all of those topics?
> >
> > 2) Will this cause issues performance issues if i had a lot of throwaway
> > foo topics being created, or will this scale?
> >
> > On Fri, Dec 2, 2016 at 7:17 PM, Damian Guy <damian@gmail.com> wrote:
> >
> >> Hi Ali,
> >>
> >> The only way KafkaStreams will process new topics after start is if the
> >> original stream was defined with a regular expression, i.e,
> >> kafka.stream(Pattern.compile("foo-.*"));
> >>
> >> If any new topics are added after start that match the pattern, then
> they
> >> will also be consumed.
> >>
> >> Thanks,
> >> Damian
> >>
> >> On Fri, 2 Dec 2016 at 13:13 Ali Akhtar <ali.rac...@gmail.com> wrote:
> >>
> >>> Heya,
> >>>
> >>> Normally, you add your topics and their callbacks to a StreamBuilder,
> and
> >>> then call KafkaStreams.start() to start ingesting those topics.
> >>>
> >>> Is it possible to add a new topic to the StreamBuilder, and start
> >> ingesting
> >>> that as well, after KafkaStreams.start() has been called?
> >>>
> >>> Thanks.
> >>>
> >>
> >
>
>


Re: Adding topics to KafkaStreams after ingestion has been started?

2016-12-02 Thread Ali Akhtar
That's pretty useful to know - thanks.

1) If I listened to foo-.*, and 5 foo topics were created after
Kafka Streams was running: foo1, foo2, foo3, foo4, foo5, will this create
5 consumers / threads / instances, or will it be just 1 instance that
receives the messages for all of those topics?

2) Will this cause performance issues if I had a lot of throwaway
foo topics being created, or will this scale?

On Fri, Dec 2, 2016 at 7:17 PM, Damian Guy <damian@gmail.com> wrote:

> Hi Ali,
>
> The only way KafkaStreams will process new topics after start is if the
> original stream was defined with a regular expression, i.e,
> kafka.stream(Pattern.compile("foo-.*"));
>
> If any new topics are added after start that match the pattern, then they
> will also be consumed.
>
> Thanks,
> Damian
>
> On Fri, 2 Dec 2016 at 13:13 Ali Akhtar <ali.rac...@gmail.com> wrote:
>
> > Heya,
> >
> > Normally, you add your topics and their callbacks to a StreamBuilder, and
> > then call KafkaStreams.start() to start ingesting those topics.
> >
> > Is it possible to add a new topic to the StreamBuilder, and start
> ingesting
> > that as well, after KafkaStreams.start() has been called?
> >
> > Thanks.
> >
>
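
A small check on the pattern itself, using only java.util.regex (no Kafka involved). It assumes the subscription matches the pattern against the whole topic name, which is what Matcher.matches() does. Under that assumption, foo-.* requires the hyphen, so names like foo1 would not be picked up while foo-1 would. The helper name is made up for illustration.

```java
import java.util.Arrays;
import java.util.List;
import java.util.regex.Pattern;
import java.util.stream.Collectors;

public class TopicPatternCheck {
    // Returns the topic names that the pattern matches in full.
    static List<String> matching(Pattern pattern, List<String> topics) {
        return topics.stream()
                .filter(topic -> pattern.matcher(topic).matches())
                .collect(Collectors.toList());
    }

    public static void main(String[] args) {
        List<String> topics = Arrays.asList("foo1", "foo-1", "foo-orders", "bar-1");
        // Pattern from the thread: the hyphen is required.
        System.out.println(matching(Pattern.compile("foo-.*"), topics)); // [foo-1, foo-orders]
        // Looser pattern that also covers foo1.
        System.out.println(matching(Pattern.compile("foo.*"), topics));  // [foo1, foo-1, foo-orders]
    }
}
```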


Adding topics to KafkaStreams after ingestion has been started?

2016-12-02 Thread Ali Akhtar
Heya,

Normally, you add your topics and their callbacks to a StreamBuilder, and
then call KafkaStreams.start() to start ingesting those topics.

Is it possible to add a new topic to the StreamBuilder, and start ingesting
that as well, after KafkaStreams.start() has been called?

Thanks.


Tracking when a batch of messages has arrived?

2016-12-02 Thread Ali Akhtar
Heya,

I need to send a group of messages, which are all related, and then process
those messages, only when all of them have arrived.

Here is how I'm planning to do this. Is this the right way, and can any
improvements be made to this?

1) Send a message to a topic called batch_start, with a batch id (which
will be a UUID)

2) Post the messages to a topic called batch_msgs_<batch_id>. Here batch_id
will be the batch id sent in batch_start.

The number of messages sent will be recorded by the producer.

3) Send a message to batch_end with the batch id and the number of sent
messages.

4) On the consumer side, using Kafka Streaming, I would listen to batch_end.

5) When the message there arrives, I will start another instance of Kafka
Streaming, which will process the messages in batch_msgs_<batch_id>.

6) Perhaps to be extra safe, whenever batch_end arrives, I will start a
throwaway consumer which will just count the number of messages in
batch_msgs_<batch_id>. If the count doesn't match the # of messages specified in
the batch_end message, then it will assume that the batch hasn't yet
finished arriving, and it will wait for some time before retrying. Once the
correct # of messages has arrived, THEN it will trigger step 5 above.

Will the above method work, or should I make any changes to it?

Is step 6 necessary?


Re: Message order different each time stream is replayed?

2016-11-30 Thread Ali Akhtar
Ah, That explains it. I have 3 partitions.

On 1 Dec 2016 5:42 a.m., "Apurva Mehta" <apu...@confluent.io> wrote:

How many partitions do you have in that topic? Kafka only guarantees a
total ordering of messages within a partition, not across partitions of a
topic. If you want total ordering over the entire topic, you need to create
a topic with a single partition.

On Wed, Nov 30, 2016 at 4:10 AM, Ali Akhtar <ali.rac...@gmail.com> wrote:

> While I was connected to console-consumer.sh, I posted a few messages to a
> Kafka topic, one message at a time, across a few hours.
>
> I'd post a message, see it arrive in console-consumer, a few mins later
I'd
> post the next message, and so on.
>
> They all arrived in order.
>
> However, when I now try to view console-consumer.sh --from-beginning, each
> time I do so, the messages seem to be shown in a shuffled order.
>
> Also the last 2 messages appear to be missing, although its possible I
> didn't see them in the shuffled output (the topic has over 10k messages
> total which are all dumped to console when I view).
>
> Is this just due to network lag when viewing console-consumer, or should I
> expect my actual consumers to also receive messages out of order each time
> they replay a topic?
>
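
To make the per-partition ordering point concrete, here is a toy simulation in plain Java (no Kafka involved). It assumes messages are spread round-robin over the partitions and that a replay drains one whole partition after another; a real consumer interleaves fetches differently, so this only illustrates why the total order can change between replays while the order inside each partition does not.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

public class PartitionOrderDemo {
    // Spread messages round-robin over numPartitions append-only lists.
    static List<List<String>> partition(List<String> messages, int numPartitions) {
        List<List<String>> partitions = new ArrayList<>();
        for (int i = 0; i < numPartitions; i++) {
            partitions.add(new ArrayList<>());
        }
        for (int i = 0; i < messages.size(); i++) {
            partitions.get(i % numPartitions).add(messages.get(i));
        }
        return partitions;
    }

    // Replay by draining whole partitions in the given order.
    static List<String> replay(List<List<String>> partitions, int... readOrder) {
        List<String> out = new ArrayList<>();
        for (int p : readOrder) {
            out.addAll(partitions.get(p));
        }
        return out;
    }

    public static void main(String[] args) {
        List<String> sent = Arrays.asList("m1", "m2", "m3", "m4", "m5", "m6");
        List<List<String>> three = partition(sent, 3);
        System.out.println(replay(three, 0, 1, 2)); // [m1, m4, m2, m5, m3, m6]
        System.out.println(replay(three, 2, 0, 1)); // [m3, m6, m1, m4, m2, m5]
        // With a single partition the replay order equals the send order.
        System.out.println(replay(partition(sent, 1), 0)); // [m1, m2, m3, m4, m5, m6]
    }
}
```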


Message order different each time stream is replayed?

2016-11-30 Thread Ali Akhtar
While I was connected to console-consumer.sh, I posted a few messages to a
Kafka topic, one message at a time, across a few hours.

I'd post a message, see it arrive in console-consumer, a few mins later I'd
post the next message, and so on.

They all arrived in order.

However, when I now try to view console-consumer.sh --from-beginning, each
time I do so, the messages seem to be shown in a shuffled order.

Also the last 2 messages appear to be missing, although it's possible I
didn't see them in the shuffled output (the topic has over 10k messages
total which are all dumped to console when I view).

Is this just due to network lag when viewing console-consumer, or should I
expect my actual consumers to also receive messages out of order each time
they replay a topic?


Re: Unit tests (in Java) take a *very* long time to 'clean up'?

2016-11-11 Thread Ali Akhtar
Eno,

I tried the following, but I haven't seen any improvement. In fact it seems
to run longer. Are you sure Kafka / ZK are run as threads and not as
processes?


Set<Thread> threadSet = Thread.getAllStackTraces().keySet();
threadSet.forEach(t ->
{
    // Skip the thread running this cleanup.
    if (t == Thread.currentThread())
        return;

    // Thread.stop() is deprecated and unsafe; used here only as a blunt experiment.
    t.stop();
});


On Fri, Nov 11, 2016 at 9:52 PM, Ali Akhtar <ali.rac...@gmail.com> wrote:

> Oh, so it seems like there's no easy way to just Thread.stop() without
> changing the internal kafka / zk code? :(
>
> Perhaps its possible to start kafka / zk within another thread, and then
> kill the wrapper thread. Will that stop the children threads, if the
> wrapper thread is killed?
>
> Hmm, or may be a an Executor which is shutdown, and which force shuts down
> the children threads?
>
>
>
> On Fri, Nov 11, 2016 at 9:46 PM, Eno Thereska <eno.there...@gmail.com>
> wrote:
>
>> It's the org.apache.kafka.streams.integration.utils.EmbeddedKafkaCluster
>> (start ZK and calls org.apache.kafka.streams.integration.utils.KafkaEmbedded
>> to start Kafka).
>> So these are embedded in the sense that it's not another process, just
>> threads within the main streams test process.
>>
>> Thanks
>> Eno
>>
>> > On 11 Nov 2016, at 16:26, Ali Akhtar <ali.rac...@gmail.com> wrote:
>> >
>> > Hey Eno,
>> >
>> > Thanks for the quick reply.
>> >
>> > In the meantime, is it possible to just send a sigterm / kill -9 which
>> just
>> > kills the zookeeper + kafka? I can figure out how to do it if you can
>> point
>> > out which class / method creates the processes / threads.
>> >
>> > Thanks.
>> >
>> > On Fri, Nov 11, 2016 at 9:24 PM, Eno Thereska <eno.there...@gmail.com>
>> > wrote:
>> >
>> >> Hi Ali,
>> >>
>> >> You're right, shutting down the broker and ZK is expensive. We kept the
>> >> number of integration tests relatively small (and pushed some more
>> tests as
>> >> system tests, while doing as many as possible as unit tests). It's not
>> just
>> >> the shutdown that's expensive, it's also the starting up unfortunately.
>> >> It's on our todo list to do something about this, but we haven't gotten
>> >> there yet. If someone from the community wants to have a look and help
>> out,
>> >> that'd be great (with a JIRA and PR).
>> >>
>> >> About the second problem with ZK logs, this is being worked on as part
>> of
>> >> removing the ZK dependency from streams and should be merged shortly:
>> >> https://github.com/apache/kafka/pull/1884 <https://github.com/apache/
>> >> kafka/pull/1884>. The msg you see does not affect correctness, it's
>> just
>> >> annoying and it will go away.
>> >>
>> >> Thanks,
>> >> Eno
>> >>
>> >>
>> >>> On 11 Nov 2016, at 14:28, Ali Akhtar <ali.rac...@gmail.com> wrote:
>> >>>
>> >>> I have some unit tests in which I create an embedded single broker
>> kafka
>> >>> cluster, using :
>> >>>
>> >>> EmbeddedSingleNodeKafkaCluster.java from
>> >>> https://github.com/confluentinc/examples/blob/
>> >> master/kafka-streams/src/test/java/io/confluent/examples/str
>> eams/kafka/
>> >> EmbeddedSingleNodeKafkaCluster.java
>> >>>
>> >>> That class also creates an embedded zookeeper cluster / instance.
>> >>>
>> >>> The problem is, while the tests run pretty fast and pass, they then
>> stay
>> >>> stuck in the 'teardown / clean up' stage for a really long time, often
>> >> upto
>> >>> 10-20
>> >>> seconds per test.
>> >>>
>> >>> As I have a lot of test classes, each class creating its own embedded
>> >> kafka
>> >>> cluster, this time can really add up during compiles.
>> >>>
>> >>> Is it possible to get these test classes to not do any clean up /
>> safety
>> >>> stuff, because the instances are just throwaway. Just have them kill
>> -9
>> >> the
>> >>> kafka / zookeeper and exit?
>> >>>
>> >>> It doesn't make any sense that tests pass within seconds, but can't
>> move
>> >> on
>> >>> to the next test class because its cleaning up.
>> >>>
>> >>> I also have an embedded cassandra instance in these tests, but I don't
>> >>> think that one is the problem, as i see a lot of zookeeper logs such
>> as
>> >>> these after the test runs:
>> >>>
>> >>> 133764 [main-SendThread(127.0.0.1:38846)] WARN
>> >>> org.apache.zookeeper.ClientCnxn  - Session 0x15853c3497f0001 for
>> server
>> >>> null, unexpected error, closing socket connection and attempting
>> >> reconnect
>> >>> java.net.ConnectException: Connection refused
>> >>> at sun.nio.ch.SocketChannelImpl.checkConnect(Native Method)
>> >>> at sun.nio.ch.SocketChannelImpl.finishConnect(
>> >> SocketChannelImpl.java:717)
>> >>> at
>> >>> org.apache.zookeeper.ClientCnxnSocketNIO.doTransport(
>> >> ClientCnxnSocketNIO.java:361)
>> >>> at org.apache.zookeeper.ClientCnxn$SendThread.run(ClientCnxn.
>> java:1081)
>> >>>
>> >>>
>> >>> Could it be that zookeeper doesn't exit and keeps retrying to connect?
>> >>
>> >>
>>
>>
>


Re: Unit tests (in Java) take a *very* long time to 'clean up'?

2016-11-11 Thread Ali Akhtar
Oh, so it seems like there's no easy way to just Thread.stop() without
changing the internal Kafka / ZK code? :(

Perhaps it's possible to start Kafka / ZK within another thread, and then
kill the wrapper thread. Will that stop the child threads, if the
wrapper thread is killed?

Hmm, or maybe an Executor which is shut down, and which force-stops
the child threads?
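
For what it's worth, a small sketch of what the Executor route does in plain Java. Threads in Java are not terminated when the thread that started them ends, and ExecutorService.shutdownNow() only interrupts the pool's own worker threads, so the task has to cooperate by reacting to the interrupt. Whether the embedded Kafka / ZK threads would react that way is not something this sketch shows; the sleeping task is just a stand-in, and the class name is made up.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;

public class ShutdownNowDemo {
    // Runs a blocking task on a single-thread executor, then force-stops it.
    // Returns true if the task saw the interrupt and the pool terminated.
    static boolean runAndForceStop() throws InterruptedException {
        ExecutorService pool = Executors.newSingleThreadExecutor();
        CountDownLatch started = new CountDownLatch(1);
        AtomicBoolean sawInterrupt = new AtomicBoolean(false);

        pool.execute(() -> {
            started.countDown();
            try {
                Thread.sleep(60_000); // stand-in for a long-running server loop
            } catch (InterruptedException e) {
                sawInterrupt.set(true); // the task stops only because it cooperates
            }
        });

        started.await();    // make sure the task is actually running
        pool.shutdownNow(); // interrupts the worker thread; it does not kill it
        boolean terminated = pool.awaitTermination(5, TimeUnit.SECONDS);
        return terminated && sawInterrupt.get();
    }

    public static void main(String[] args) throws InterruptedException {
        System.out.println(runAndForceStop());
    }
}
```

Any thread the task itself had started would keep running after shutdownNow(), which is the same limitation as with the wrapper-thread idea.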



On Fri, Nov 11, 2016 at 9:46 PM, Eno Thereska <eno.there...@gmail.com>
wrote:

> It's the org.apache.kafka.streams.integration.utils.EmbeddedKafkaCluster
> (start ZK and calls org.apache.kafka.streams.integration.utils.KafkaEmbedded
> to start Kafka).
> So these are embedded in the sense that it's not another process, just
> threads within the main streams test process.
>
> Thanks
> Eno
>
> > On 11 Nov 2016, at 16:26, Ali Akhtar <ali.rac...@gmail.com> wrote:
> >
> > Hey Eno,
> >
> > Thanks for the quick reply.
> >
> > In the meantime, is it possible to just send a sigterm / kill -9 which
> just
> > kills the zookeeper + kafka? I can figure out how to do it if you can
> point
> > out which class / method creates the processes / threads.
> >
> > Thanks.
> >
> > On Fri, Nov 11, 2016 at 9:24 PM, Eno Thereska <eno.there...@gmail.com>
> > wrote:
> >
> >> Hi Ali,
> >>
> >> You're right, shutting down the broker and ZK is expensive. We kept the
> >> number of integration tests relatively small (and pushed some more
> tests as
> >> system tests, while doing as many as possible as unit tests). It's not
> just
> >> the shutdown that's expensive, it's also the starting up unfortunately.
> >> It's on our todo list to do something about this, but we haven't gotten
> >> there yet. If someone from the community wants to have a look and help
> out,
> >> that'd be great (with a JIRA and PR).
> >>
> >> About the second problem with ZK logs, this is being worked on as part
> of
> >> removing the ZK dependency from streams and should be merged shortly:
> >> https://github.com/apache/kafka/pull/1884 <https://github.com/apache/
> >> kafka/pull/1884>. The msg you see does not affect correctness, it's just
> >> annoying and it will go away.
> >>
> >> Thanks,
> >> Eno
> >>
> >>
> >>> On 11 Nov 2016, at 14:28, Ali Akhtar <ali.rac...@gmail.com> wrote:
> >>>
> >>> I have some unit tests in which I create an embedded single broker
> kafka
> >>> cluster, using :
> >>>
> >>> EmbeddedSingleNodeKafkaCluster.java from
> >>> https://github.com/confluentinc/examples/blob/
> >> master/kafka-streams/src/test/java/io/confluent/examples/streams/kafka/
> >> EmbeddedSingleNodeKafkaCluster.java
> >>>
> >>> That class also creates an embedded zookeeper cluster / instance.
> >>>
> >>> The problem is, while the tests run pretty fast and pass, they then
> stay
> >>> stuck in the 'teardown / clean up' stage for a really long time, often
> >> upto
> >>> 10-20
> >>> seconds per test.
> >>>
> >>> As I have a lot of test classes, each class creating its own embedded
> >> kafka
> >>> cluster, this time can really add up during compiles.
> >>>
> >>> Is it possible to get these test classes to not do any clean up /
> safety
> >>> stuff, because the instances are just throwaway. Just have them kill -9
> >> the
> >>> kafka / zookeeper and exit?
> >>>
> >>> It doesn't make any sense that tests pass within seconds, but can't
> move
> >> on
> >>> to the next test class because its cleaning up.
> >>>
> >>> I also have an embedded cassandra instance in these tests, but I don't
> >>> think that one is the problem, as i see a lot of zookeeper logs such as
> >>> these after the test runs:
> >>>
> >>> 133764 [main-SendThread(127.0.0.1:38846)] WARN
> >>> org.apache.zookeeper.ClientCnxn  - Session 0x15853c3497f0001 for
> server
> >>> null, unexpected error, closing socket connection and attempting
> >> reconnect
> >>> java.net.ConnectException: Connection refused
> >>> at sun.nio.ch.SocketChannelImpl.checkConnect(Native Method)
> >>> at sun.nio.ch.SocketChannelImpl.finishConnect(
> >> SocketChannelImpl.java:717)
> >>> at
> >>> org.apache.zookeeper.ClientCnxnSocketNIO.doTransport(
> >> ClientCnxnSocketNIO.java:361)
> >>> at org.apache.zookeeper.ClientCnxn$SendThread.run(
> ClientCnxn.java:1081)
> >>>
> >>>
> >>> Could it be that zookeeper doesn't exit and keeps retrying to connect?
> >>
> >>
>
>


Re: Unit tests (in Java) take a *very* long time to 'clean up'?

2016-11-11 Thread Ali Akhtar
For me, the startup doesn't take anywhere near as long as shutdown does.

On Fri, Nov 11, 2016 at 9:37 PM, Ali Akhtar <ali.rac...@gmail.com> wrote:

> Unless I'm missing anything, there's no reason why these throwaway
> processes should be shutdown gracefully. Just kill them as soon as the test
> finishes.
>
> On Fri, Nov 11, 2016 at 9:26 PM, Ali Akhtar <ali.rac...@gmail.com> wrote:
>
>> Hey Eno,
>>
>> Thanks for the quick reply.
>>
>> In the meantime, is it possible to just send a sigterm / kill -9 which
>> just kills the zookeeper + kafka? I can figure out how to do it if you can
>> point out which class / method creates the processes / threads.
>>
>> Thanks.
>>
>> On Fri, Nov 11, 2016 at 9:24 PM, Eno Thereska <eno.there...@gmail.com>
>> wrote:
>>
>>> Hi Ali,
>>>
>>> You're right, shutting down the broker and ZK is expensive. We kept the
>>> number of integration tests relatively small (and pushed some more tests as
>>> system tests, while doing as many as possible as unit tests). It's not just
>>> the shutdown that's expensive, it's also the starting up unfortunately.
>>> It's on our todo list to do something about this, but we haven't gotten
>>> there yet. If someone from the community wants to have a look and help out,
>>> that'd be great (with a JIRA and PR).
>>>
>>> About the second problem with ZK logs, this is being worked on as part
>>> of removing the ZK dependency from streams and should be merged shortly:
>>> https://github.com/apache/kafka/pull/1884 <https://github.com/apache/kaf
>>> ka/pull/1884>. The msg you see does not affect correctness, it's just
>>> annoying and it will go away.
>>>
>>> Thanks,
>>> Eno
>>>
>>>
>>> > On 11 Nov 2016, at 14:28, Ali Akhtar <ali.rac...@gmail.com> wrote:
>>> >
>>> > I have some unit tests in which I create an embedded single broker
>>> kafka
>>> > cluster, using :
>>> >
>>> > EmbeddedSingleNodeKafkaCluster.java from
>>> > https://github.com/confluentinc/examples/blob/master/kafka-s
>>> treams/src/test/java/io/confluent/examples/streams/kafka/
>>> EmbeddedSingleNodeKafkaCluster.java
>>> >
>>> > That class also creates an embedded zookeeper cluster / instance.
>>> >
>>> > The problem is, while the tests run pretty fast and pass, they then
>>> stay
>>> > stuck in the 'teardown / clean up' stage for a really long time, often
>>> upto
>>> > 10-20
>>> > seconds per test.
>>> >
>>> > As I have a lot of test classes, each class creating its own embedded
>>> kafka
>>> > cluster, this time can really add up during compiles.
>>> >
>>> > Is it possible to get these test classes to not do any clean up /
>>> safety
>>> > stuff, because the instances are just throwaway. Just have them kill
>>> -9 the
>>> > kafka / zookeeper and exit?
>>> >
>>> > It doesn't make any sense that tests pass within seconds, but can't
>>> move on
>>> > to the next test class because its cleaning up.
>>> >
>>> > I also have an embedded cassandra instance in these tests, but I don't
>>> > think that one is the problem, as i see a lot of zookeeper logs such as
>>> > these after the test runs:
>>> >
>>> > 133764 [main-SendThread(127.0.0.1:38846)] WARN
>>> > org.apache.zookeeper.ClientCnxn  - Session 0x15853c3497f0001 for
>>> server
>>> > null, unexpected error, closing socket connection and attempting
>>> reconnect
>>> > java.net.ConnectException: Connection refused
>>> > at sun.nio.ch.SocketChannelImpl.checkConnect(Native Method)
>>> > at sun.nio.ch.SocketChannelImpl.finishConnect(SocketChannelImpl
>>> .java:717)
>>> > at
>>> > org.apache.zookeeper.ClientCnxnSocketNIO.doTransport(ClientC
>>> nxnSocketNIO.java:361)
>>> > at org.apache.zookeeper.ClientCnxn$SendThread.run(ClientCnxn.ja
>>> va:1081)
>>> >
>>> >
>>> > Could it be that zookeeper doesn't exit and keeps retrying to connect?
>>>
>>>
>>
>


Re: Unit tests (in Java) take a *very* long time to 'clean up'?

2016-11-11 Thread Ali Akhtar
Unless I'm missing anything, there's no reason why these throwaway
processes should be shut down gracefully. Just kill them as soon as the test
finishes.

On Fri, Nov 11, 2016 at 9:26 PM, Ali Akhtar <ali.rac...@gmail.com> wrote:

> Hey Eno,
>
> Thanks for the quick reply.
>
> In the meantime, is it possible to just send a sigterm / kill -9 which
> just kills the zookeeper + kafka? I can figure out how to do it if you can
> point out which class / method creates the processes / threads.
>
> Thanks.
>
> On Fri, Nov 11, 2016 at 9:24 PM, Eno Thereska <eno.there...@gmail.com>
> wrote:
>
>> Hi Ali,
>>
>> You're right, shutting down the broker and ZK is expensive. We kept the
>> number of integration tests relatively small (and pushed some more tests as
>> system tests, while doing as many as possible as unit tests). It's not just
>> the shutdown that's expensive, it's also the starting up unfortunately.
>> It's on our todo list to do something about this, but we haven't gotten
>> there yet. If someone from the community wants to have a look and help out,
>> that'd be great (with a JIRA and PR).
>>
>> About the second problem with ZK logs, this is being worked on as part of
>> removing the ZK dependency from streams and should be merged shortly:
>> https://github.com/apache/kafka/pull/1884 <https://github.com/apache/kaf
>> ka/pull/1884>. The msg you see does not affect correctness, it's just
>> annoying and it will go away.
>>
>> Thanks,
>> Eno
>>
>>
>> > On 11 Nov 2016, at 14:28, Ali Akhtar <ali.rac...@gmail.com> wrote:
>> >
>> > I have some unit tests in which I create an embedded single broker kafka
>> > cluster, using :
>> >
>> > EmbeddedSingleNodeKafkaCluster.java from
>> > https://github.com/confluentinc/examples/blob/master/kafka-
>> streams/src/test/java/io/confluent/examples/streams/
>> kafka/EmbeddedSingleNodeKafkaCluster.java
>> >
>> > That class also creates an embedded zookeeper cluster / instance.
>> >
>> > The problem is, while the tests run pretty fast and pass, they then stay
>> > stuck in the 'teardown / clean up' stage for a really long time, often
>> upto
>> > 10-20
>> > seconds per test.
>> >
>> > As I have a lot of test classes, each class creating its own embedded
>> kafka
>> > cluster, this time can really add up during compiles.
>> >
>> > Is it possible to get these test classes to not do any clean up / safety
>> > stuff, because the instances are just throwaway. Just have them kill -9
>> the
>> > kafka / zookeeper and exit?
>> >
>> > It doesn't make any sense that tests pass within seconds, but can't
>> move on
>> > to the next test class because its cleaning up.
>> >
>> > I also have an embedded cassandra instance in these tests, but I don't
>> > think that one is the problem, as i see a lot of zookeeper logs such as
>> > these after the test runs:
>> >
>> > 133764 [main-SendThread(127.0.0.1:38846)] WARN
>> > org.apache.zookeeper.ClientCnxn  - Session 0x15853c3497f0001 for server
>> > null, unexpected error, closing socket connection and attempting
>> reconnect
>> > java.net.ConnectException: Connection refused
>> > at sun.nio.ch.SocketChannelImpl.checkConnect(Native Method)
>> > at sun.nio.ch.SocketChannelImpl.finishConnect(SocketChannelImpl
>> .java:717)
>> > at
>> > org.apache.zookeeper.ClientCnxnSocketNIO.doTransport(ClientC
>> nxnSocketNIO.java:361)
>> > at org.apache.zookeeper.ClientCnxn$SendThread.run(ClientCnxn.java:1081)
>> >
>> >
>> > Could it be that zookeeper doesn't exit and keeps retrying to connect?
>>
>>
>


Re: Unit tests (in Java) take a *very* long time to 'clean up'?

2016-11-11 Thread Ali Akhtar
Hey Eno,

Thanks for the quick reply.

In the meantime, is it possible to just send a sigterm / kill -9 which just
kills the zookeeper + kafka? I can figure out how to do it if you can point
out which class / method creates the processes / threads.

Thanks.

On Fri, Nov 11, 2016 at 9:24 PM, Eno Thereska <eno.there...@gmail.com>
wrote:

> Hi Ali,
>
> You're right, shutting down the broker and ZK is expensive. We kept the
> number of integration tests relatively small (and pushed some more tests as
> system tests, while doing as many as possible as unit tests). It's not just
> the shutdown that's expensive, it's also the starting up unfortunately.
> It's on our todo list to do something about this, but we haven't gotten
> there yet. If someone from the community wants to have a look and help out,
> that'd be great (with a JIRA and PR).
>
> About the second problem with ZK logs, this is being worked on as part of
> removing the ZK dependency from streams and should be merged shortly:
> https://github.com/apache/kafka/pull/1884 <https://github.com/apache/
> kafka/pull/1884>. The msg you see does not affect correctness, it's just
> annoying and it will go away.
>
> Thanks,
> Eno
>
>
> > On 11 Nov 2016, at 14:28, Ali Akhtar <ali.rac...@gmail.com> wrote:
> >
> > I have some unit tests in which I create an embedded single broker kafka
> > cluster, using :
> >
> > EmbeddedSingleNodeKafkaCluster.java from
> > https://github.com/confluentinc/examples/blob/
> master/kafka-streams/src/test/java/io/confluent/examples/streams/kafka/
> EmbeddedSingleNodeKafkaCluster.java
> >
> > That class also creates an embedded zookeeper cluster / instance.
> >
> > The problem is, while the tests run pretty fast and pass, they then stay
> > stuck in the 'teardown / clean up' stage for a really long time, often
> upto
> > 10-20
> > seconds per test.
> >
> > As I have a lot of test classes, each class creating its own embedded
> kafka
> > cluster, this time can really add up during compiles.
> >
> > Is it possible to get these test classes to not do any clean up / safety
> > stuff, because the instances are just throwaway. Just have them kill -9
> the
> > kafka / zookeeper and exit?
> >
> > It doesn't make any sense that tests pass within seconds, but can't move
> on
> > to the next test class because its cleaning up.
> >
> > I also have an embedded cassandra instance in these tests, but I don't
> > think that one is the problem, as i see a lot of zookeeper logs such as
> > these after the test runs:
> >
> > 133764 [main-SendThread(127.0.0.1:38846)] WARN
> > org.apache.zookeeper.ClientCnxn  - Session 0x15853c3497f0001 for server
> > null, unexpected error, closing socket connection and attempting
> reconnect
> > java.net.ConnectException: Connection refused
> > at sun.nio.ch.SocketChannelImpl.checkConnect(Native Method)
> > at sun.nio.ch.SocketChannelImpl.finishConnect(
> SocketChannelImpl.java:717)
> > at
> > org.apache.zookeeper.ClientCnxnSocketNIO.doTransport(
> ClientCnxnSocketNIO.java:361)
> > at org.apache.zookeeper.ClientCnxn$SendThread.run(ClientCnxn.java:1081)
> >
> >
> > Could it be that zookeeper doesn't exit and keeps retrying to connect?
>
>


Unit tests (in Java) take a *very* long time to 'clean up'?

2016-11-11 Thread Ali Akhtar
I have some unit tests in which I create an embedded single broker kafka
cluster, using :

EmbeddedSingleNodeKafkaCluster.java from
https://github.com/confluentinc/examples/blob/master/kafka-streams/src/test/java/io/confluent/examples/streams/kafka/EmbeddedSingleNodeKafkaCluster.java

That class also creates an embedded zookeeper cluster / instance.

The problem is, while the tests run pretty fast and pass, they then stay
stuck in the 'teardown / clean up' stage for a really long time, often up to
10-20 seconds per test.

As I have a lot of test classes, each class creating its own embedded kafka
cluster, this time can really add up during compiles.

Is it possible to get these test classes to not do any clean up / safety
stuff, because the instances are just throwaway? Just have them kill -9 the
kafka / zookeeper and exit?

It doesn't make any sense that tests pass within seconds, but can't move on
to the next test class because it's cleaning up.

I also have an embedded cassandra instance in these tests, but I don't
think that one is the problem, as I see a lot of zookeeper logs such as
these after the test runs:

133764 [main-SendThread(127.0.0.1:38846)] WARN org.apache.zookeeper.ClientCnxn - Session 0x15853c3497f0001 for server null, unexpected error, closing socket connection and attempting reconnect
java.net.ConnectException: Connection refused
    at sun.nio.ch.SocketChannelImpl.checkConnect(Native Method)
    at sun.nio.ch.SocketChannelImpl.finishConnect(SocketChannelImpl.java:717)
    at org.apache.zookeeper.ClientCnxnSocketNIO.doTransport(ClientCnxnSocketNIO.java:361)
    at org.apache.zookeeper.ClientCnxn$SendThread.run(ClientCnxn.java:1081)


Could it be that zookeeper doesn't exit and keeps retrying to connect?


Re: Kafka UTF 8 encoding problem

2016-11-09 Thread Ali Akhtar
It's probably not UTF-8 if it contains Turkish characters. That's why base64
encoding / decoding it might help.
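
One way to narrow down where the characters get lost is to round-trip the string through different charsets. This is a minimal sketch using only the JDK; the class name is made up, and it does not touch Kafka at all. UTF-8 itself can represent "Barış", so if the UTF-8 round trip is lossless while a Latin-1 round trip gives "Bar??", then some step in the pipeline (producer, consumer, or the console) is probably converting with a charset that lacks those characters:

```java
import java.nio.charset.Charset;
import java.nio.charset.StandardCharsets;

// Hypothetical helper for checking which charset loses the Turkish characters.
public class CharsetLossDemo {

    /** Encodes the text to bytes with the given charset and decodes it back. */
    public static String roundTrip(String text, Charset charset) {
        // String.getBytes(Charset) replaces unmappable characters with the
        // charset's replacement byte, which is '?' for ISO-8859-1.
        return new String(text.getBytes(charset), charset);
    }

    public static void main(String[] args) {
        // "Barış" written with escapes so the source file encoding does not matter.
        String original = "Bar\u0131\u015f";
        System.out.println("UTF-8 lossless: "
                + roundTrip(original, StandardCharsets.UTF_8).equals(original));
        System.out.println("ISO-8859-1 result: "
                + roundTrip(original, StandardCharsets.ISO_8859_1));
    }
}
```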

On Wed, Nov 9, 2016 at 4:22 PM, Radoslaw Gruchalski 
wrote:

> Are you sure your string is in utf-8 in the first place?
> What if you pass your string via something like:
>
> System.out.println(new String(args[0].getBytes(StandardCharsets.UTF_8),
>         StandardCharsets.UTF_8));
>
> –
> Best regards,
> Radek Gruchalski
> ra...@gruchalski.com
>
>
> On November 9, 2016 at 12:14:03 PM, Baris Akgun (Garanti Teknoloji) (
> barisa...@garanti.com.tr) wrote:
>
> Hi,
>
>
>
> Producer Side//
>
>
>
> Properties props = new Properties();
> props.put("metadata.broker.list", brokerList);
> props.put("serializer.class", "kafka.serializer.StringEncoder");
> props.put("request.required.acks", "1");
>
>
>
> Consumer side//
>
>
>
> I am using the Spark Streaming Kafka API. I also tried the Kafka CLI and
> the Java Kafka API, but I always face the same issue.
>
>
>
> Thanks
>
>
>
> *From:* Radoslaw Gruchalski [mailto:ra...@gruchalski.com]
> *Sent:* Wednesday, November 9, 2016 1:49 PM
> *To:* Baris Akgun (Garanti Teknoloji); users@kafka.apache.org
> *Subject:* Re: Kafka UTF 8 encoding problem
>
>
>
> Baris,
>
>
>
> Kafka does not care about encoding, everything is transported as bytes.
>
> What’s the configuration of your producer / consumer?
>
> Are you using Java / JVM?
>
> –
> Best regards,
> Radek Gruchalski
> ra...@gruchalski.com
>
>
>
> On November 9, 2016 at 11:42:02 AM, Baris Akgun (Garanti Teknoloji) (
> barisa...@garanti.com.tr) wrote:
>
> Hi All,
>
> We are using Kafka 0.9.0.0 and we want to send our messages to a topic in
> UTF-8 format, but when we consume the messages from the topic we see that
> Kafka does not keep the original UTF-8 encoding and the messages do not
> display correctly.
>
>
> For example, our message that includes Turkish characters is "Barış", but
> when we consume it we see "Bar??". How can we solve that problem? Is there
> any way to set the Kafka topic encoding?
>
> Thanks
>
> Barış
> Bu mesaj ve ekleri, mesajda gonderildigi belirtilen kisi/kisilere ozeldir
> ve gizlidir. Bu mesajin muhatabi olmamaniza ragmen tarafiniza ulasmis
> olmasi halinde mesaj iceriginin gizliligi ve bu gizlilik yukumlulugune
> uyulmasi zorunlulugu tarafiniz icin de soz konusudur. Mesaj ve eklerinde
> yer alan bilgilerin dogrulugu ve guncelligi konusunda gonderenin ya da
> sirketimizin herhangi bir sorumlulugu bulunmamaktadir. Sirketimiz mesajin
> ve bilgilerinin size degisiklige ugrayarak veya gec ulasmasindan,
> butunlugunun ve gizliliginin korunamamasindan, virus icermesinden ve
> bilgisayar sisteminize verebilecegi herhangi bir zarardan sorumlu
> tutulamaz.
>
> This message and attachments are confidential and intended solely for the
> individual(s) stated in this message. If you received this message although
> you are not the addressee, you are responsible to keep the message
> confidential. The sender has no responsibility for the accuracy or
> correctness of the information in the message and its attachments. Our
> company shall have no liability for any changes or late receiving, loss of
> integrity and confidentiality, viruses and any damages caused in anyway to
> your computer system.
>


Re: Kafka UTF 8 encoding problem

2016-11-09 Thread Ali Akhtar
I would recommend base64 encoding the message on the producer side, and
decoding it on the consumer side.
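
A minimal sketch of what that could look like, assuming Java 8+ and using only java.util.Base64. The class and method names are made up for illustration. The producer would send the encoded string and the consumer would decode what it receives; the payload is plain ASCII, so a charset mismatch along the way can no longer turn characters into '?':

```java
import java.nio.charset.StandardCharsets;
import java.util.Base64;

// Hypothetical helper; not part of any Kafka API.
public class Base64Payload {

    /** Producer side: turn the message into an ASCII-only Base64 string. */
    public static String encode(String message) {
        return Base64.getEncoder()
                .encodeToString(message.getBytes(StandardCharsets.UTF_8));
    }

    /** Consumer side: restore the original message from the Base64 string. */
    public static String decode(String payload) {
        return new String(Base64.getDecoder().decode(payload), StandardCharsets.UTF_8);
    }

    public static void main(String[] args) {
        String original = "Bar\u0131\u015f"; // "Barış"
        String payload = encode(original);
        System.out.println("round trip ok: " + decode(payload).equals(original));
    }
}
```

This works around the symptom rather than fixing the charset mismatch, and it makes each message roughly a third larger.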

On Wed, Nov 9, 2016 at 3:40 PM, Baris Akgun (Garanti Teknoloji)
<barisa...@garanti.com.tr> wrote:

> Hi All,
>
> We are using Kafka 0.9.0.0 and we want to send our messages to a topic in
> UTF-8 format, but when we consume the messages from the topic we see that
> Kafka does not keep the original UTF-8 encoding and the messages do not
> display correctly.
>
>
> For example, our message that includes Turkish characters is "Barış", but
> when we consume it we see "Bar??". How can we solve that problem? Is there
> any way to set the Kafka topic encoding?
>
> Thanks
>
> Barış
>


Re: [VOTE] Add REST Server to Apache Kafka

2016-10-25 Thread Ali Akhtar
And this will make adding health checks via Kubernetes easy.

On Wed, Oct 26, 2016 at 4:12 AM, Ali Akhtar <ali.rac...@gmail.com> wrote:

> +1. I hope there will be a corresponding Java library for doing admin
> functionality.
>
> On Wed, Oct 26, 2016 at 4:10 AM, Jungtaek Lim <kabh...@gmail.com> wrote:
>
>> +1
>>
>>
>> On Wed, 26 Oct 2016 at 8:00 AM craig w <codecr...@gmail.com> wrote:
>>
>> > -1
>> >
>> > On Tuesday, October 25, 2016, Sriram Subramanian <r...@confluent.io>
>> wrote:
>> >
>> > > -1 for all the reasons that have been described before. This does not
>> > need
>> > > to be part of the core project.
>> > >
>> > > On Tue, Oct 25, 2016 at 3:25 PM, Suresh Srinivas <sur...@hortonworks.com>
>> > > wrote:
>> > >
>> > > > +1.
>> > > >
>> > > > This is an http access to core Kafka. This is very much needed as part of
>> > > > Apache Kafka under ASF governance model.  This would be great for the
>> > > > community instead of duplicated and splintered efforts that may spring up.
>> > > >
>> > > > _
>> > > > From: Harsha Chintalapani <ka...@harsha.io>
>> > > > Sent: Tuesday, October 25, 2016 2:20 PM
>> > > > Subject: [VOTE] Add REST Server to Apache Kafka
>> > > > To: <d...@kafka.apache.org>, <users@kafka.apache.org>
>> > > >
>> > > >
>> > > > Hi All,
>> > > > We are proposing to have a REST Server as part of Apache Kafka
>> > > > to provide producer/consumer/admin APIs. We strongly believe having
>> > > > REST server functionality with Apache Kafka will help a lot of users.
>> > > > Here is the KIP that Mani Kumar wrote:
>> > > > https://cwiki.apache.org/confluence/display/KAFKA/KIP-80:+Kafka+Rest+Server.
>> > > > There is a discussion thread in the dev list that had differing opinions on
>> > > > whether to include a REST server in Apache Kafka or not. You can read more
>> > > > about that in this thread:
>> > > > http://mail-archives.apache.org/mod_mbox/kafka-dev/201610.mbox/%3CCAMVt_aymqeudm39znsxgktpdde46sowmqhsxop-+jmbcuv7...@mail.gmail.com%3E
>> > > >
>> > > > This is a VOTE thread to check interest in the community for
>> > > > adding a REST Server implementation in Apache Kafka.
>> > > >
>> > > > Thanks,
>> > > > Harsha
>> > > >
>> > > >
>> > > >
>> > >
>> >
>> >
>> > --
>> >
>> > https://github.com/mindscratch
>> > https://www.google.com/+CraigWickesser
>> > https://twitter.com/mind_scratch
>> > https://twitter.com/craig_links
>> >
>>
>
>


Re: [VOTE] Add REST Server to Apache Kafka

2016-10-25 Thread Ali Akhtar
+1. I hope there will be a corresponding Java library for doing admin
functionality.

On Wed, Oct 26, 2016 at 4:10 AM, Jungtaek Lim  wrote:

> +1
>
>
> On Wed, 26 Oct 2016 at 8:00 AM craig w  wrote:
>
> > -1
> >
> > On Tuesday, October 25, 2016, Sriram Subramanian 
> wrote:
> >
> > > -1 for all the reasons that have been described before. This does not
> > need
> > > to be part of the core project.
> > >
> > > > On Tue, Oct 25, 2016 at 3:25 PM, Suresh Srinivas <sur...@hortonworks.com>
> > > > wrote:
> > >
> > > > +1.
> > > >
> > > > > This is an http access to core Kafka. This is very much needed as part of
> > > > > Apache Kafka under ASF governance model.  This would be great for the
> > > > > community instead of duplicated and splintered efforts that may spring up.
> > > >
> > > > _
> > > > > From: Harsha Chintalapani <ka...@harsha.io>
> > > > > Sent: Tuesday, October 25, 2016 2:20 PM
> > > > > Subject: [VOTE] Add REST Server to Apache Kafka
> > > > > To: <d...@kafka.apache.org>, <users@kafka.apache.org>
> > > >
> > > >
> > > > Hi All,
> > > > > We are proposing to have a REST Server as part of Apache Kafka
> > > > > to provide producer/consumer/admin APIs. We strongly believe having
> > > > > REST server functionality with Apache Kafka will help a lot of users.
> > > > > Here is the KIP that Mani Kumar wrote:
> > > > > https://cwiki.apache.org/confluence/display/KAFKA/KIP-80:+Kafka+Rest+Server.
> > > > > There is a discussion thread in the dev list that had differing opinions on
> > > > > whether to include a REST server in Apache Kafka or not. You can read more
> > > > > about that in this thread:
> > > > > http://mail-archives.apache.org/mod_mbox/kafka-dev/201610.mbox/%3CCAMVt_aymqeudm39znsxgktpdde46sowmqhsxop-+jmbcuv7...@mail.gmail.com%3E
> > > > >
> > > > > This is a VOTE thread to check interest in the community for
> > > > > adding a REST Server implementation in Apache Kafka.
> > > >
> > > > Thanks,
> > > > Harsha
> > > >
> > > >
> > > >
> > >
> >
> >
> > --
> >
> > https://github.com/mindscratch
> > https://www.google.com/+CraigWickesser
> > https://twitter.com/mind_scratch
> > https://twitter.com/craig_links
> >
>


Re: Removing kafka topic contents from Java

2016-10-24 Thread Ali Akhtar
There isn't a Java API for this; you'd have to mess around with bash
scripts, which I haven't found to be worth it.

Just let the data expire and get deleted. Set a short expiry time for the
topic if necessary.


On Mon, Oct 24, 2016 at 6:30 PM, Demian Calcaprina 
wrote:

> Hi Guys,
>
> Is there a way to remove a kafka topic from the java api?
>
> I have the following scenario:
> We have a subscription service, which returns responses to subscribed
> clients through kafka topic. Per subscription, we create a new kafka topic
> and send the data by publishing in this new topic. And, when the
> subscription ends, this topic is not needed anymore, so I would like to
> purge it so that it does not store data that will not be used anymore.
> Does it make sense?
>
> Thanks!
>
> Demian
>


Re: How to block tests of Kafka Streams until messages processed?

2016-10-20 Thread Ali Akhtar
Michael,

Would there be any advantage to using the kafka connect method? Seems like
it'd just add an extra step of overhead?

On Thu, Oct 20, 2016 at 12:35 PM, Michael Noll <mich...@confluent.io> wrote:

> Ali,
>
> my main feedback is similar to what Eno and Dave have already said.  In
> your situation, options like these are what you'd currently need to do
> since you are writing directly from your Kafka Stream app to Cassandra,
> rather than writing from your app to Kafka and then using Kafka Connect to
> ingest into Cassandra.
>
>
>
> On Wed, Oct 19, 2016 at 11:03 PM, Ali Akhtar <ali.rac...@gmail.com> wrote:
>
> > Yeah, I did think to use that method, but as you said, it writes to a
> dummy
> > output topic, which means I'd have to put in magic code just for the
> tests
> > to pass (the actual code writes to cassandra and not to a dummy topic).
> >
> >
> > On Thu, Oct 20, 2016 at 2:00 AM, Tauzell, Dave <
> > dave.tauz...@surescripts.com
> > > wrote:
> >
> > > For similar queue related tests we put the check in a loop.  Check
> every
> > > second until either the result is found or a timeout  happens.
> > >
> > > -Dave
> > >
> > > -Original Message-
> > > From: Ali Akhtar [mailto:ali.rac...@gmail.com]
> > > Sent: Wednesday, October 19, 2016 3:38 PM
> > > To: users@kafka.apache.org
> > > Subject: How to block tests of Kafka Streams until messages processed?
> > >
> > > I'm using Kafka Streams, and I'm attempting to write integration tests
> > for
> > > a stream processor.
> > >
> > > The processor listens to a topic, processes incoming messages, and
> writes
> > > some data to Cassandra tables.
> > >
> > > I'm attempting to write a test which produces some test data, and then
> > > checks whether or not the expected data was written to Cassandra.
> > >
> > > It looks like this:
> > >
> > > - Step 1: Produce data in the test
> > > - Step 2: Kafka stream gets triggered
> > > - Step 3: Test checks whether cassandra got populated
> > >
> > > The problem is, Step 3 is occurring before Step 2, and as a result, the
> > > test fails as it doesn't find the data in the table.
> > >
> > > I've resolved this by adding a Thread.sleep(2000) call after Step 1,
> > which
> > > ensures that Step 2 gets triggered before Step 3.
> > >
> > > However, I'm wondering if there's a more reliable way of blocking the
> > test
> > > until Kafka stream processor gets triggered?
> > >
> > > At the moment, I'm using 1 thread for the processor. If I increase that
> > to
> > > 2 threads, will that achieve what I want?
> > > This e-mail and any files transmitted with it are confidential, may
> > > contain sensitive information, and are intended solely for the use of
> the
> > > individual or entity to whom they are addressed. If you have received
> > this
> > > e-mail in error, please notify the sender by reply e-mail immediately
> and
> > > destroy all copies of the e-mail and any attachments.
> > >
> >
>


Re: How to block tests of Kafka Streams until messages processed?

2016-10-19 Thread Ali Akhtar
Yeah, I did think to use that method, but as you said, it writes to a dummy
output topic, which means I'd have to put in magic code just for the tests
to pass (the actual code writes to cassandra and not to a dummy topic).
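
The polling approach Dave describes in the quoted message can replace the fixed Thread.sleep(2000) without any changes to the stream code. A minimal sketch, assuming the Cassandra check can be expressed as a boolean condition; the class name, method name, and parameters are made up for illustration:

```java
import java.util.function.BooleanSupplier;

// Hypothetical test helper; not part of Kafka Streams or any test library.
public class PollUntil {

    /**
     * Re-checks the condition every intervalMs until it holds or timeoutMs has passed.
     * Returns true if the condition became true in time, false on timeout.
     */
    public static boolean await(BooleanSupplier condition, long timeoutMs, long intervalMs)
            throws InterruptedException {
        long deadline = System.nanoTime() + timeoutMs * 1_000_000L;
        while (true) {
            if (condition.getAsBoolean()) {
                return true;               // result found, stop waiting immediately
            }
            if (System.nanoTime() - deadline >= 0) {
                return false;              // timed out without seeing the result
            }
            Thread.sleep(intervalMs);      // back off before the next check
        }
    }
}
```

In a test, Step 3 would become something like PollUntil.await(() -> rowExistsInCassandra(), 10000, 1000), where rowExistsInCassandra() is a placeholder for the real query. The test then passes as soon as the data shows up and only fails after the full timeout.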


On Thu, Oct 20, 2016 at 2:00 AM, Tauzell, Dave <dave.tauz...@surescripts.com>
wrote:

> For similar queue related tests we put the check in a loop.  Check every
> second until either the result is found or a timeout  happens.
>
> -Dave
>
> -Original Message-
> From: Ali Akhtar [mailto:ali.rac...@gmail.com]
> Sent: Wednesday, October 19, 2016 3:38 PM
> To: users@kafka.apache.org
> Subject: How to block tests of Kafka Streams until messages processed?
>
> I'm using Kafka Streams, and I'm attempting to write integration tests for
> a stream processor.
>
> The processor listens to a topic, processes incoming messages, and writes
> some data to Cassandra tables.
>
> I'm attempting to write a test which produces some test data, and then
> checks whether or not the expected data was written to Cassandra.
>
> It looks like this:
>
> - Step 1: Produce data in the test
> - Step 2: Kafka stream gets triggered
> - Step 3: Test checks whether cassandra got populated
>
> The problem is, Step 3 is occurring before Step 2, and as a result, the
> test fails as it doesn't find the data in the table.
>
> I've resolved this by adding a Thread.sleep(2000) call after Step 1, which
> ensures that Step 2 gets triggered before Step 3.
>
> However, I'm wondering if there's a more reliable way of blocking the test
> until Kafka stream processor gets triggered?
>
> At the moment, I'm using 1 thread for the processor. If I increase that to
> 2 threads, will that achieve what I want?
>
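
The polling approach Dave describes can be sketched roughly as below. This is
only a minimal illustration using the JDK, not code from the thread:
PollingWait and waitUntil are made-up names, and the condition you pass in
(for example a query that checks whether the expected row exists in
Cassandra) is something the test itself has to supply.

```java
import java.time.Duration;
import java.util.function.BooleanSupplier;

/** Hypothetical test helper: polls a condition instead of sleeping for a fixed time. */
final class PollingWait {
    private PollingWait() {}

    /**
     * Checks the condition immediately and then once per interval until it
     * returns true or the timeout elapses.
     *
     * @return true if the condition was met, false on timeout or interruption
     */
    static boolean waitUntil(BooleanSupplier condition, Duration timeout, Duration interval) {
        final long deadline = System.nanoTime() + timeout.toNanos();
        while (true) {
            if (condition.getAsBoolean()) {
                return true;
            }
            if (System.nanoTime() - deadline >= 0) {
                return false; // timed out without seeing the expected result
            }
            try {
                Thread.sleep(interval.toMillis());
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt(); // preserve the interrupt for the caller
                return false;
            }
        }
    }
}
```

In a test this would replace the Thread.sleep(2000) call: Step 3 becomes an
assertion that waitUntil(...) returned true, with the Cassandra lookup as the
condition. The test then finishes as soon as the data shows up, and fails
only if it never appears within the timeout.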


Re: Embedded Kafka Cluster - Maven artifact?

2016-10-19 Thread Ali Akhtar
Please change that.

On Thu, Oct 20, 2016 at 1:53 AM, Eno Thereska <eno.there...@gmail.com>
wrote:

> I'm afraid we haven't released this as a maven artefact yet :(
>
> Eno
>
> > On 18 Oct 2016, at 13:22, Ali Akhtar <ali.rac...@gmail.com> wrote:
> >
> > Is there a maven artifact that can be used to create instances
> > of EmbeddedSingleNodeKafkaCluster for unit / integration tests?
>
>


How to block tests of Kafka Streams until messages processed?

2016-10-19 Thread Ali Akhtar
I'm using Kafka Streams, and I'm attempting to write integration tests for
a stream processor.

The processor listens to a topic, processes incoming messages, and writes
some data to Cassandra tables.

I'm attempting to write a test which produces some test data, and then
checks whether or not the expected data was written to Cassandra.

It looks like this:

- Step 1: Produce data in the test
- Step 2: Kafka stream gets triggered
- Step 3: Test checks whether cassandra got populated

The problem is, Step 3 is occurring before Step 2, and as a result, the
test fails as it doesn't find the data in the table.

I've resolved this by adding a Thread.sleep(2000) call after Step 1, which
ensures that Step 2 gets triggered before Step 3.

However, I'm wondering if there's a more reliable way of blocking the test
until the Kafka Streams processor has been triggered?

At the moment, I'm using 1 thread for the processor. If I increase that to
2 threads, will that achieve what I want?


Embedded Kafka Cluster - Maven artifact?

2016-10-18 Thread Ali Akhtar
Is there a maven artifact that can be used to create instances
of EmbeddedSingleNodeKafkaCluster for unit / integration tests?


Re: Understanding out of order message processing w/ Streaming

2016-10-13 Thread Ali Akhtar
Makes sense. Thanks

On 13 Oct 2016 12:42 pm, "Michael Noll" <mich...@confluent.io> wrote:

> > But if they arrive out of order, I have to detect / process that myself
> in
> > the processor logic.
>
> Yes -- if your processing logic depends on the specific ordering of
> messages (which is the case for you), then you must manually implement this
> ordering-specific logic at the moment.
>
> Other use cases may not need to do that and "just work" even with
> out-of-order data.  If, for example, you are counting objects or are
> computing the sum of numbers, then you do not need to do anything special.
>
>
>
>
>
> On Wed, Oct 12, 2016 at 10:22 PM, Ali Akhtar <ali.rac...@gmail.com> wrote:
>
> > Thanks Matthias.
> >
> > So, if I'm understanding this right, Kafka will not discard messages
> > which arrive out of order.
> >
> > What it will do is show messages in the order in which they arrive.
> >
> > But if they arrive out of order, I have to detect / process that myself
> in
> > the processor logic.
> >
> > Is that correct?
> >
> > Thanks.
> >
> > On Wed, Oct 12, 2016 at 11:37 PM, Matthias J. Sax <matth...@confluent.io
> >
> > wrote:
> >
> > > Last question first: A KTable is basically an infinite window over the
> > > whole stream, providing a single result (that gets updated when new
> > > data arrives). If you use windows, you cut the overall stream into
> > > finite subsets and get a result per window. Thus, I guess you do not
> > > need windows (if I understood your use case correctly).
> > >
> > > However, in the current state of the Kafka Streams DSL, you will not be
> > > able to use KTable (directly -- see the suggestion to fix this below)
> > > because it does not (currently) allow you to access the timestamp of
> > > the current record (thus, you cannot know if a record is late or not).
> > > You will need to use the Processor API, which allows you to access the
> > > current record's timestamp via the Context object given in init().
> > >
> > > Your reasoning about partitions and Streams instances is correct.
> > > However, the following two are not
> > >
> > > > - Because I'm using a KTable, the timestamp of the messages is
> > > > extracted, and I'm not shown the older bid because I've already
> > > > processed the later bid. The older bid is ignored.
> > >
> > > and
> > >
> > > > - Because of this, the replica already knows which timestamps it
> > > > has processed, and is able to ignore the older messages.
> > >
> > > Late arriving records are not dropped but processed regularly. Thus,
> > > your KTable aggregate function will be called for the late arriving
> > > record, too (but as described above, you currently have no way to know
> > > it is a late record).
> > >
> > >
> > > Last but not least, your last statement is a valid concern:
> > >
> > > > Also, what will happen if bid 2 arrived and got processed, and then
> > > > the particular replica crashed, and was restarted. The restarted
> > > > replica won't have any memory of which timestamps it has previously
> > > > processed.
> > > >
> > > > So if bid 2 got processed, replica crashed and restarted, and then
> > > > bid 1 arrived, what would happen in that case?
> > >
> > > In order to make this work, you would need to store the timestamp in
> > > your store next to the actual data. Thus, you can compare the timestamp
> > > of the latest result (safely stored in operator state) with the
> > > timestamp of the current record.
> > >
> > > Does this make sense?
> > >
> > > To fix your issue, you could add a .transformValues() before your KTable,
> > > which allows you to access the timestamp of a record. If you add this
> > > timestamp to your value and pass it to KTable afterwards, you can
> > > access it and it also gets stored reliably.
> > >
> > > <key, value> => transformValues => <key, {value, timestamp}> => aggregate
> > >
> > > Hope this helps.
> > >
> > > -Matthias
> > >
> > >
> > > On 10/11/16 9:12 PM, Ali Akhtar wrote:
> > > > P.S, does my scenario require using windows, or can it be achieved
> > > > using just KTable?
> > > >
> > > > On Wed,

Re: Understanding out of order message processing w/ Streaming

2016-10-12 Thread Ali Akhtar
Thanks Matthias.

So, if I'm understanding this right, Kafka will not discard messages
which arrive out of order.

What it will do is show messages in the order in which they arrive.

But if they arrive out of order, I have to detect / process that myself in
the processor logic.

Is that correct?

Thanks.

On Wed, Oct 12, 2016 at 11:37 PM, Matthias J. Sax <matth...@confluent.io>
wrote:

> Last question first: A KTable is basically an infinite window over the
> whole stream, providing a single result (that gets updated when new
> data arrives). If you use windows, you cut the overall stream into
> finite subsets and get a result per window. Thus, I guess you do not
> need windows (if I understood your use case correctly).
>
> However, in the current state of the Kafka Streams DSL, you will not be
> able to use KTable (directly -- see the suggestion to fix this below)
> because it does not (currently) allow you to access the timestamp of
> the current record (thus, you cannot know if a record is late or not).
> You will need to use the Processor API, which allows you to access the
> current record's timestamp via the Context object given in init().
>
> Your reasoning about partitions and Streams instances is correct.
> However, the following two are not
>
> > - Because I'm using a KTable, the timestamp of the messages is
> > extracted, and I'm not shown the older bid because I've already
> > processed the later bid. The older bid is ignored.
>
> and
>
> > - Because of this, the replica already knows which timestamps it
> > has processed, and is able to ignore the older messages.
>
> Late arriving records are not dropped but processed regularly. Thus,
> your KTable aggregate function will be called for the late arriving
> record, too (but as described above, you currently have no way to know
> it is a late record).
>
>
> Last but not least, your last statement is a valid concern:
>
> > Also, what will happen if bid 2 arrived and got processed, and then
> > the particular replica crashed, and was restarted. The restarted
> > replica won't have any memory of which timestamps it has previously
> > processed.
> >
> > So if bid 2 got processed, replica crashed and restarted, and then
> > bid 1 arrived, what would happen in that case?
>
> In order to make this work, you would need to store the timestamp in
> your store next to the actual data. Thus, you can compare the timestamp
> of the latest result (safely stored in operator state) with the
> timestamp of the current record.
>
> Does this make sense?
>
> To fix your issue, you could add a .transformValues() before your KTable,
> which allows you to access the timestamp of a record. If you add this
> timestamp to your value and pass it to KTable afterwards, you can
> access it and it also gets stored reliably.
>
> <key, value> => transformValues => <key, {value, timestamp}> => aggregate
>
> Hope this helps.
>
> -Matthias
>
>
> On 10/11/16 9:12 PM, Ali Akhtar wrote:
> > P.S, does my scenario require using windows, or can it be achieved
> > using just KTable?
> >
> > On Wed, Oct 12, 2016 at 8:56 AM, Ali Akhtar <ali.rac...@gmail.com>
> > wrote:
> >
> >> Heya,
> >>
> >> Say I'm building a live auction site, with different products.
> >> Different users will bid on different products. And each time
> >> they do, I want to update the product's price, so it should
> >> always have the latest price in place.
> >>
> >> Example: Person 1 bids $3 on Product A, and Person 2 bids $5 on
> >> the same product 100 ms later.
> >>
> >> The second bid arrives first and the price is updated to $5. Then
> >> the first bid arrives. I want the price to not be updated in this
> >> case, as this bid is older than the one I've already processed.
> >>
> >> Here's my understanding of how I can achieve this with Kafka
> >> Streaming - is my understanding correct?
> >>
> >> - I have a topic for receiving bids. The topic has N partitions,
> >> and I have N replicas of my application which hooks up w/ Kafka
> >> Streaming, up and running.
> >>
> >> - I assume each replica of my app will listen to a different
> >> partition of the topic.
> >>
> >> - A user makes a bid on product A.
> >>
> >> - This is pushed to the topic with the key bid_a
> >>
> >> - Another user makes a bid. This is also pushed with the same key
> >> (bid_a)
> >>
> >> - The 2nd bid arrives first, and gets processed. Then the first
> >> (older) bid arri

Re: How can I delete a topic programatically?

2016-10-11 Thread Ali Akhtar
The last time I tried, I couldn't find a way to do it, other than to
trigger the bash script for topic deletion programmatically.

On Wed, Oct 12, 2016 at 9:18 AM, Ratha v  wrote:

> Hi all;
>
> I have two topics (source and target). I do some processing on the messages
> available in the source topic and I merge both topics.
> That is;
>
> builder.stream(sourceTopic).to(targetTopic)
>
> Once merged I no longer require the sourceTopic. I want to delete it.
>
> How can I do that programmatically in Java? I use the high-level client
> APIs, Kafka v0.10.0.1
>
>
> Thanks
> --
> -Ratha
> http://vvratha.blogspot.com/
>


Re: Understanding out of order message processing w/ Streaming

2016-10-11 Thread Ali Akhtar
P.S, does my scenario require using windows, or can it be achieved using
just KTable?

On Wed, Oct 12, 2016 at 8:56 AM, Ali Akhtar <ali.rac...@gmail.com> wrote:

> Heya,
>
> Say I'm building a live auction site, with different products. Different
> users will bid on different products. And each time they do, I want to
> update the product's price, so it should always have the latest price in
> place.
>
> Example: Person 1 bids $3 on Product A, and Person 2 bids $5 on the same
> product 100 ms later.
>
> The second bid arrives first and the price is updated to $5. Then the
> first bid arrives. I want the price to not be updated in this case, as this
> bid is older than the one I've already processed.
>
> Here's my understanding of how I can achieve this with Kafka Streaming -
> is my understanding correct?
>
> - I have a topic for receiving bids. The topic has N partitions, and I
> have N replicas of my application which hooks up w/ Kafka Streaming, up and
> running.
>
> - I assume each replica of my app will listen to a different partition of
> the topic.
>
> - A user makes a bid on product A.
>
> - This is pushed to the topic with the key bid_a
>
> - Another user makes a bid. This is also pushed with the same key (bid_a)
>
> - The 2nd bid arrives first, and gets processed. Then the first (older)
> bid arrives.
>
> - Because I'm using a KTable, the timestamp of the messages is extracted,
> and I'm not shown the older bid because I've already processed the later
> bid. The older bid is ignored.
>
> - All bids on product A go to the same topic partition, and hence the same
> replica of my app, because they all have the key bid_a.
>
> - Because of this, the replica already knows which timestamps it has
> processed, and is able to ignore the older messages.
>
> Is the above understanding correct?
>
> Also, what will happen if bid 2 arrived and got processed, and then the
> particular replica crashed, and was restarted. The restarted replica won't
> have any memory of which timestamps it has previously processed.
>
> So if bid 2 got processed, replica crashed and restarted, and then bid 1
> arrived, what would happen in that case?
>
> Thanks.
>


Re: In Kafka Streaming, Serdes should use Optionals

2016-10-11 Thread Ali Akhtar
Thanks. That filter() method is a good solution. But whenever I look at it,
I feel an empty spot in my heart which can only be filled by:
filter(Optional::isPresent)

On Wed, Oct 12, 2016 at 12:15 AM, Guozhang Wang <wangg...@gmail.com> wrote:

> Ali,
>
> We are working on moving from Java 7 to Java 8 in Apache Kafka, and the
> Streams client is one of the motivations for doing so. Stay tuned to the
> mailing list for when that happens.
>
> Currently Streams won't automatically filter out null values for you, since
> in some other cases they may have semantic meaning and cannot simply be
> ignored; you can, though, apply a simple filter such as "filter((key,
> value) -> value != null)" before your processor lambda operator, if it
> looks clearer in your code.
>
> Guozhang
>
>
> On Sun, Oct 9, 2016 at 3:14 PM, Ali Akhtar <ali.rac...@gmail.com> wrote:
>
> > It isn't a fatal error. It should be logged as a warning, and then the
> > stream should be continued w/ the next message.
> >
> > Checking for null is 'ok', in the sense that it gets the job done, but
> > after java 8's release, we really should be using optionals.
> >
> > Hopefully we can break compatibility w/ the bad old days soon and move
> into
> > the future.
> >
> > (If there's a way to do the null check automatically, i.e before calling
> > the lambda, please let me know).
> >
> > On Sun, Oct 9, 2016 at 11:14 PM, Guozhang Wang <wangg...@gmail.com>
> wrote:
> >
> > > Ali,
> > >
> > > In your scenario, if the serde fails to parse the bytes, should that
> > > be treated as a fatal failure, or is it expected?
> > >
> > > In the former case, instead of returning null I think it is better to
> > > throw a runtime exception so that the whole client stops and reports
> > > the error; in the latter case, returning and checking null looks
> > > fine to me.
> > >
> > >
> > > Guozhang
> > >
> > > On Fri, Oct 7, 2016 at 3:12 PM, Ali Akhtar <ali.rac...@gmail.com>
> wrote:
> > >
> > > > Hey G,
> > > >
> > > > Looks like the only difference is a valueSerde parameter.
> > > >
> > > > How does that prevent having to look for nulls in the consumer?
> > > >
> > > > E.g, I wrote a custom Serde which converts the messages (which are
> json
> > > > strings) into a Java class using Jackson.
> > > >
> > > > If the json parse fails, it sends back a null.
> > > >
> > > > When I'm reading this stream, in my callback, how would I prevent
> > having
> > > to
> > > > check if the serialized value isn't null?
> > > >
> > > > On Sat, Oct 8, 2016 at 1:07 AM, Guozhang Wang <wangg...@gmail.com>
> > > wrote:
> > > >
> > > > > Hello Ali,
> > > > >
> > > > > We do have corresponding overloaded functions for most of KStream /
> > > > KTable
> > > > > operators to avoid enforcing users to specify "null"; in these
> cases
> > > the
> > > > > default serdes specified in the configs are then used. For example:
> > > > >
> > > > > >  <T> KTable<K, T> aggregate(Initializer<T> initializer,
> > > > > >                             Aggregator<K, V, T> adder,
> > > > > >                             Aggregator<K, V, T> subtractor,
> > > > > >                             Serde<T> aggValueSerde,
> > > > > >                             String storeName);
> > > > > >
> > > > > > /**
> > > > > >  * .. using default serializers and deserializers.
> > > > > >  */
> > > > > >  <T> KTable<K, T> aggregate(Initializer<T> initializer,
> > > > > >                             Aggregator<K, V, T> adder,
> > > > > >                             Aggregator<K, V, T> subtractor,
> > > > > >                             String storeName);
> > > > >
> > > > >
> > > > >
> > > > > Guozhang
> > > > >
> > > > >
> > > > > On Fri, Oct 7, 2016 at 9:20 AM, Michael Noll <mich...@confluent.io
> >
> > > > wrote:
> > > > >
> > > > > > Ali, the Apache Kafka project still targets Java 7, which means
> we
> > > > can't
> > > > > > use Java 8 features just yet.
> > > > > >
> > > > > > > FYI: There's an ongoing conversation about when Kafka would move
> > > > > > > from Java 7 to Java 8.
> > > > > >
> > > > > > On Fri, Oct 7, 2016 at 6:14 PM, Ali Akhtar <ali.rac...@gmail.com
> >
> > > > wrote:
> > > > > >
> > > > > > > > Since we're using Java 8 in most cases anyway, Serdes /
> > > > > > > > Serializers should use Optionals, to avoid having to deal
> > > > > > > > with the lovely nulls.
> > > > > > >
> > > > > >
> > > > >
> > > > >
> > > > >
> > > > > --
> > > > > -- Guozhang
> > > > >
> > > >
> > >
> > >
> > >
> > > --
> > > -- Guozhang
> > >
> >
>
>
>
> --
> -- Guozhang
>
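
To make the two styles discussed in this thread concrete, here is a small
sketch. It deliberately uses plain java.util.stream rather than the Kafka
Streams API, and parseOrNull is only a stand-in for a custom deserializer
that returns null on a parse failure (it parses integers, not JSON). All
names in it are made up for illustration.

```java
import java.util.List;
import java.util.Objects;
import java.util.Optional;
import java.util.stream.Collectors;

/** Illustration only: null-returning parser vs. Optional-returning parser. */
final class NullFilterSketch {
    private NullFilterSketch() {}

    /** Stand-in for a deserializer that returns null when parsing fails. */
    static Integer parseOrNull(String raw) {
        try {
            return Integer.valueOf(raw.trim());
        } catch (NumberFormatException e) {
            return null;
        }
    }

    /** The same parser, but signalling failure with an empty Optional. */
    static Optional<Integer> parse(String raw) {
        return Optional.ofNullable(parseOrNull(raw));
    }

    /** Style 1: drop nulls with an explicit filter before further processing. */
    static List<Integer> viaNullFilter(List<String> raw) {
        return raw.stream()
                .map(NullFilterSketch::parseOrNull)
                .filter(Objects::nonNull)
                .collect(Collectors.toList());
    }

    /** Style 2: the filter(Optional::isPresent) form. */
    static List<Integer> viaOptional(List<String> raw) {
        return raw.stream()
                .map(NullFilterSketch::parse)
                .filter(Optional::isPresent)
                .map(Optional::get)
                .collect(Collectors.toList());
    }
}
```

Both methods produce the same result; the difference is only whether a
failed parse is visible in the type. In Kafka Streams the first style
corresponds to the filter on "value != null" suggested above.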


Understanding out of order message processing w/ Streaming

2016-10-11 Thread Ali Akhtar
Heya,

Say I'm building a live auction site, with different products. Different
users will bid on different products. And each time they do, I want to
update the product's price, so it should always have the latest price in
place.

Example: Person 1 bids $3 on Product A, and Person 2 bids $5 on the same
product 100 ms later.

The second bid arrives first and the price is updated to $5. Then the first
bid arrives. I want the price to not be updated in this case, as this bid
is older than the one I've already processed.

Here's my understanding of how I can achieve this with Kafka Streaming - is
my understanding correct?

- I have a topic for receiving bids. The topic has N partitions, and I have
N replicas of my application which hooks up w/ Kafka Streaming, up and
running.

- I assume each replica of my app will listen to a different partition of
the topic.

- A user makes a bid on product A.

- This is pushed to the topic with the key bid_a

- Another user makes a bid. This is also pushed with the same key (bid_a)

- The 2nd bid arrives first, and gets processed. Then the first (older) bid
arrives.

- Because I'm using a KTable, the timestamp of the messages is extracted,
and I'm not shown the older bid because I've already processed the later
bid. The older bid is ignored.

- All bids on product A go to the same topic partition, and hence the same
replica of my app, because they all have the key bid_a.

- Because of this, the replica already knows which timestamps it has
processed, and is able to ignore the older messages.

Is the above understanding correct?

Also, what will happen if bid 2 arrived and got processed, and then the
particular replica crashed, and was restarted. The restarted replica won't
have any memory of which timestamps it has previously processed.

So if bid 2 got processed, replica crashed and restarted, and then bid 1
arrived, what would happen in that case?

Thanks.
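
For reference, the "ignore a bid that is older than the one already applied"
rule from the example can be sketched as below. This is a minimal sketch in
plain Java, not Kafka Streams code: the HashMap only stands in for whatever
store holds the latest price, and LatestBidStore and its methods are made-up
names. The point is that the timestamp is kept next to the value, so the
comparison does not rely on the process remembering what it has seen.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.OptionalDouble;

/** Illustration only: keeps the latest bid per key, judged by event timestamp. */
final class LatestBidStore {

    /** A price stored together with the timestamp of the bid that set it. */
    private static final class TimestampedPrice {
        final long timestampMs;
        final double price;

        TimestampedPrice(long timestampMs, double price) {
            this.timestampMs = timestampMs;
            this.price = price;
        }
    }

    // Stand-in for a durable store; an in-memory map would not survive a restart.
    private final Map<String, TimestampedPrice> store = new HashMap<>();

    /**
     * Applies a bid unless a bid with a strictly newer timestamp is already stored.
     *
     * @return true if the stored price was updated, false if the bid was ignored
     */
    boolean apply(String key, long timestampMs, double price) {
        TimestampedPrice current = store.get(key);
        if (current != null && timestampMs < current.timestampMs) {
            return false; // late arrival: an already-applied bid is newer
        }
        store.put(key, new TimestampedPrice(timestampMs, price));
        return true;
    }

    /** Returns the current price for the key, or empty if no bid has been applied. */
    OptionalDouble currentPrice(String key) {
        TimestampedPrice current = store.get(key);
        return current == null ? OptionalDouble.empty() : OptionalDouble.of(current.price);
    }
}
```

With the example above, applying the $5 bid (sent at t+100 ms) first and the
$3 bid (sent at t) second leaves the price at $5, because the second call is
rejected as older.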


Safely deleting all existing topics?

2016-10-11 Thread Ali Akhtar
In development, I often need to delete all existing data in all topics, and
start over.

My process for this currently is: stop zookeeper, stop kafka broker, rm -rf
~/kafka/data/*

But when I bring the broker back on, it often prints a bunch of errors and
needs to be restarted before it actually works.

I'm assuming some info is being persisted on Zookeeper.

Any ideas which other data I need to delete to start over w/ a clean slate?

Thanks.


KTable and KStream should share an interface

2016-10-11 Thread Ali Akhtar
They both have a lot of the same methods, and yet they can't be used
polymorphically because they don't share the same parent interface.

I think KIterable or something like that should be used as their base
interface w/ shared methods.


Re: Kafka null keys - OK or a problem?

2016-10-10 Thread Ali Akhtar
It seems to be using a Hash Partitioner here:
https://github.com/Shopify/sarama/blob/master/config.go#L262

and HashPartitioner is documented as:

>  If the message's key is nil then a random partition is chosen

https://godoc.org/github.com/Shopify/sarama#example-Partitioner--Random

So.. it should be okay to have null keys, I'm guessing.

On Mon, Oct 10, 2016 at 11:51 AM, Ali Akhtar <ali.rac...@gmail.com> wrote:

> Hey Michael,
>
> We're using this one: https://github.com/Shopify/sarama
>
> Any ideas how that one works?
>
> On Mon, Oct 10, 2016 at 11:48 AM, Michael Noll <mich...@confluent.io>
> wrote:
>
>> FYI: For Kafka's new Java producer (which ships with Kafka), the behavior
>> is as follows:  If no partition is explicitly specified (to send the
>> message to) AND the key is null, then the DefaultPartitioner [1] will
>> assign messages to topic partitions in a round-robin fashion.  See the
>> javadoc and also the little bit of code in [1] for details.
>>
>> Not sure which Go client you're using exactly so I can't tell whether your
>> Go client follows the behavior of Kafka's Java producer.
>>
>> -Michael
>>
>>
>>
>>
>> [1]
>> https://github.com/apache/kafka/blob/trunk/clients/src/main/java/org/apache/kafka/clients/producer/internals/DefaultPartitioner.java
>>
>>
>> On Mon, Oct 10, 2016 at 7:53 AM, Ali Akhtar <ali.rac...@gmail.com> wrote:
>>
>> > If keys are null, what happens in terms of partitioning, is the load
>> spread
>> > evenly..?
>> >
>> > On Mon, Oct 10, 2016 at 7:59 AM, Gwen Shapira <g...@confluent.io>
>> wrote:
>> >
>> > > Kafka itself supports null keys. I'm not sure about the Go client you
>> > > use, but Confluent's Go client also supports null keys
>> > > (https://github.com/confluentinc/confluent-kafka-go/).
>> > >
>> > > If you decide to generate keys and you want even spread, a random
>> > > number generator is probably your best bet.
>> > >
>> > > Gwen
>> > >
>> > > On Sun, Oct 9, 2016 at 6:05 PM, Ali Akhtar <ali.rac...@gmail.com>
>> wrote:
>> > > > A kafka producer written elsewhere that I'm using, which uses the Go
>> > > kafka
>> > > > driver, is sending messages where the key is null.
>> > > >
>> > > > Is this OK - or will this cause issues due to partitioning not
>> > happening
>> > > > correctly?
>> > > >
>> > > > What would be a good way to generate keys in this case, to ensure
>> even
>> > > > partition spread?
>> > > >
>> > > > Thanks.
>> > >
>> > >
>> > >
>> > > --
>> > > Gwen Shapira
>> > > Product Manager | Confluent
>> > > 650.450.2760 | @gwenshap
>> > > Follow us: Twitter | blog
>> > >
>> >
>>
>
>
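
The behavior described in this thread (hash the key when there is one, spread
messages across partitions when the key is null) can be sketched as below.
This is not the code of Kafka's DefaultPartitioner or of sarama's
HashPartitioner: the class name is made up, Arrays.hashCode is only a
stand-in for whatever hash function the real clients use, and the null-key
branch uses round-robin as described for the Java producer (sarama is
documented above as picking a random partition instead).

```java
import java.util.Arrays;
import java.util.concurrent.atomic.AtomicInteger;

/** Illustration only: hash-by-key partitioning with round-robin for null keys. */
final class NullKeyPartitionerSketch {
    // Shared counter so that successive null-key messages rotate over partitions.
    private final AtomicInteger counter = new AtomicInteger();

    /** Returns a partition index in the range [0, numPartitions). */
    int partition(byte[] key, int numPartitions) {
        if (key == null) {
            // No key: rotate through the partitions so the load is spread evenly.
            return Math.floorMod(counter.getAndIncrement(), numPartitions);
        }
        // Key present: the same key always maps to the same partition.
        return Math.floorMod(Arrays.hashCode(key), numPartitions);
    }
}
```

So null keys are not a problem for load spreading; what they give up is the
guarantee that related messages land on the same partition.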


Re: Kafka null keys - OK or a problem?

2016-10-09 Thread Ali Akhtar
If keys are null, what happens in terms of partitioning, is the load spread
evenly..?

On Mon, Oct 10, 2016 at 7:59 AM, Gwen Shapira <g...@confluent.io> wrote:

> Kafka itself supports null keys. I'm not sure about the Go client you
> use, but Confluent's Go client also supports null keys
> (https://github.com/confluentinc/confluent-kafka-go/).
>
> If you decide to generate keys and you want even spread, a random
> number generator is probably your best bet.
>
> Gwen
>
> On Sun, Oct 9, 2016 at 6:05 PM, Ali Akhtar <ali.rac...@gmail.com> wrote:
> > A kafka producer written elsewhere that I'm using, which uses the Go
> kafka
> > driver, is sending messages where the key is null.
> >
> > Is this OK - or will this cause issues due to partitioning not happening
> > correctly?
> >
> > What would be a good way to generate keys in this case, to ensure even
> > partition spread?
> >
> > Thanks.
>
>
>
> --
> Gwen Shapira
> Product Manager | Confluent
> 650.450.2760 | @gwenshap
> Follow us: Twitter | blog
>


Kafka null keys - OK or a problem?

2016-10-09 Thread Ali Akhtar
A kafka producer written elsewhere that I'm using, which uses the Go kafka
driver, is sending messages where the key is null.

Is this OK - or will this cause issues due to partitioning not happening
correctly?

What would be a good way to generate keys in this case, to ensure even
partition spread?

Thanks.


Re: In Kafka Streaming, Serdes should use Optionals

2016-10-09 Thread Ali Akhtar
It isn't a fatal error. It should be logged as a warning, and then the
stream should be continued w/ the next message.

Checking for null is 'ok', in the sense that it gets the job done, but
after java 8's release, we really should be using optionals.

Hopefully we can break compatibility w/ the bad old days soon and move into
the future.

(If there's a way to do the null check automatically, i.e before calling
the lambda, please let me know).

On Sun, Oct 9, 2016 at 11:14 PM, Guozhang Wang <wangg...@gmail.com> wrote:

> Ali,
>
> In your scenario, if the serde fails to parse the bytes, should that be
> treated as a fatal failure, or is it expected?
>
> In the former case, instead of returning null I think it is better to
> throw a runtime exception so that the whole client stops and reports the
> error; in the latter case, returning and checking null looks fine to me.
>
>
> Guozhang
>
> On Fri, Oct 7, 2016 at 3:12 PM, Ali Akhtar <ali.rac...@gmail.com> wrote:
>
> > Hey G,
> >
> > Looks like the only difference is a valueSerde parameter.
> >
> > How does that prevent having to look for nulls in the consumer?
> >
> > E.g, I wrote a custom Serde which converts the messages (which are json
> > strings) into a Java class using Jackson.
> >
> > If the json parse fails, it sends back a null.
> >
> > When I'm reading this stream, in my callback, how would I prevent having
> to
> > check if the serialized value isn't null?
> >
> > On Sat, Oct 8, 2016 at 1:07 AM, Guozhang Wang <wangg...@gmail.com>
> wrote:
> >
> > > Hello Ali,
> > >
> > > We do have corresponding overloaded functions for most of KStream /
> > KTable
> > > operators to avoid enforcing users to specify "null"; in these cases
> the
> > > default serdes specified in the configs are then used. For example:
> > >
> > >  <T> KTable<K, T> aggregate(Initializer<T> initializer,
> > >                             Aggregator<K, V, T> adder,
> > >                             Aggregator<K, V, T> subtractor,
> > >                             Serde<T> aggValueSerde,
> > >                             String storeName);
> > >
> > > /**
> > >  * .. using default serializers and deserializers.
> > >  */
> > >  <T> KTable<K, T> aggregate(Initializer<T> initializer,
> > >                             Aggregator<K, V, T> adder,
> > >                             Aggregator<K, V, T> subtractor,
> > >                             String storeName);
> > >
> > >
> > >
> > > Guozhang
> > >
> > >
> > > On Fri, Oct 7, 2016 at 9:20 AM, Michael Noll <mich...@confluent.io>
> > wrote:
> > >
> > > > Ali, the Apache Kafka project still targets Java 7, which means we
> > can't
> > > > use Java 8 features just yet.
> > > >
> > > > FYI: There's an ongoing conversation about when Kafka would move from
> > > > Java 7 to Java 8.
> > > >
> > > > On Fri, Oct 7, 2016 at 6:14 PM, Ali Akhtar <ali.rac...@gmail.com>
> > wrote:
> > > >
> > > > > Since we're using Java 8 in most cases anyway, Serdes / Serializers
> > > > > should use Optionals, to avoid having to deal with the lovely nulls.
> > > > >
> > > >
> > >
> > >
> > >
> > > --
> > > -- Guozhang
> > >
> >
>
>
>
> --
> -- Guozhang
>


Re: Deleting a message after all consumer have consumed it

2016-10-07 Thread Ali Akhtar
Also, you can set a retention period and have messages deleted automatically
after a certain time (the default is 1 week).

On Sat, Oct 8, 2016 at 3:21 AM, Hans Jespersen  wrote:

> Kafka doesn’t work that way. Kafka is “Publish-subscribe messaging
> rethought as a distributed commit log”. The messages in the log do not get
> deleted just because “all” clients have consumed the messages. Besides, you
> could always have a late-joining consumer come along, and if you had
> mistakenly deleted the messages then they wouldn’t be there for any new
> consumers. Ditto for replaying messages back to consumers after they
> consume the messages the first time. Kafka just keeps the messages, and
> once a message is consumed each client moves its offset forward to point
> to the next message. You can tell if any consumer has finished processing
> a message by checking the offsets for that client. For the older clients
> these are stored in ZooKeeper, but for the new consumers (0.9+) they are
> in a special Kafka topic dedicated to storing client offsets.
>
>
> -hans
>
>
>
> > On Oct 7, 2016, at 1:34 PM, Hysi, Lorenc  wrote:
> >
> > Hello,
> >
> > Thank you for your time.
> >
> > I wanted to ask whether it's possible to remove a message from a topic
> > after making sure all consumers have gotten it. If so, what is the best
> > way to achieve this?
> >
> > Also, how do I make sure that all consumers have received a message? Is
> > there any way to do this in Kafka or ZooKeeper? I keep looking, but I'm
> > only seeing that consumers themselves keep track of where they are; if
> > so, is there any way to have consumers talk to each other to make sure
> > they have all consumed the same message?
> >
> > If this is not the right place to ask this question I apologize in
> > advance; please let me know where to ask if you can.
> >
> > Thank you!
> >
> >
>
>


Re: In Kafka Streaming, Serdes should use Optionals

2016-10-07 Thread Ali Akhtar
Hey G,

Looks like the only difference is a valueSerde parameter.

How does that prevent having to look for nulls in the consumer?

E.g, I wrote a custom Serde which converts the messages (which are json
strings) into a Java class using Jackson.

If the json parse fails, it sends back a null.

When I'm reading this stream, in my callback, how would I prevent having to
check if the serialized value isn't null?

On Sat, Oct 8, 2016 at 1:07 AM, Guozhang Wang <wangg...@gmail.com> wrote:

> Hello Ali,
>
> We do have corresponding overloaded functions for most of the KStream /
> KTable operators to avoid forcing users to specify "null"; in these cases
> the default serdes specified in the configs are used. For example:
>
>  <T> KTable<K, T> aggregate(Initializer<T> initializer,
>                             Aggregator<K, V, T> adder,
>                             Aggregator<K, V, T> subtractor,
>                             Serde<T> aggValueSerde,
>                             String storeName);
>
> /**
>  * .. using default serializers and deserializers.
>  */
>  <T> KTable<K, T> aggregate(Initializer<T> initializer,
>                             Aggregator<K, V, T> adder,
>                             Aggregator<K, V, T> subtractor,
>                             String storeName);
>
>
>
> Guozhang
>
>
> On Fri, Oct 7, 2016 at 9:20 AM, Michael Noll <mich...@confluent.io> wrote:
>
> > Ali, the Apache Kafka project still targets Java 7, which means we can't
> > use Java 8 features just yet.
> >
> > FYI: There's an ongoing conversation about when Kafka would move from
> > Java 7 to Java 8.
> >
> > On Fri, Oct 7, 2016 at 6:14 PM, Ali Akhtar <ali.rac...@gmail.com> wrote:
> >
> > > Since we're using Java 8 in most cases anyway, Serdes / Serializers
> > > should use Optionals, to avoid having to deal with the lovely nulls.
> > >
> >
>
>
>
> --
> -- Guozhang
>


In Kafka Streaming, Serdes should use Optionals

2016-10-07 Thread Ali Akhtar
Since we're using Java 8 in most cases anyway, Serdes / Serializers should
use Optionals, to avoid having to deal with the lovely nulls.


Re: Printing to stdin from KStreams?

2016-10-07 Thread Ali Akhtar
Thank you.

I've resolved this by adding a run config in IntelliJ for running
streams-reset, and using the same application id in all applications in
development (transparently reading the application id from environment
variables, so in my Kubernetes config I can specify different app ids for
production).
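A minimal sketch of reading the application id from the environment, assuming a variable name and a development default that are both made up for this example (`STREAMS_APP_ID` and `my-app-dev`). The only Kafka-specific detail used is the Streams config key `application.id`; everything else is plain Java.

```java
import java.util.Map;
import java.util.Properties;

public class AppIdConfig {
    // Both names below are hypothetical, chosen only for this sketch.
    static final String ENV_VAR = "STREAMS_APP_ID";
    static final String DEV_DEFAULT = "my-app-dev";

    // Uses the environment value when present, otherwise the shared dev id.
    static String resolveAppId(Map<String, String> env) {
        String fromEnv = env.get(ENV_VAR);
        if (fromEnv == null || fromEnv.trim().isEmpty()) {
            return DEV_DEFAULT;
        }
        return fromEnv.trim();
    }

    // Builds the properties that would be handed to the Streams configuration.
    static Properties streamsProps(Map<String, String> env) {
        Properties props = new Properties();
        props.put("application.id", resolveAppId(env));
        return props;
    }

    public static void main(String[] args) {
        System.out.println(streamsProps(System.getenv()).getProperty("application.id"));
    }
}
```

With this shape, development runs share one id (so a single reset run config covers them), while a deployment can set the variable to a production-specific id.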

On Fri, Oct 7, 2016 at 8:05 PM, Michael Noll <mich...@confluent.io> wrote:

> > Is it possible to have kafka-streams-reset be automatically called during
> > development? Something like streams.cleanUp() but which also does reset?
>
> Unfortunately this isn't possible (yet), Ali.  I am also not aware of any
> plan to add such a feature in the short-term.
>
>
>
> On Fri, Oct 7, 2016 at 1:36 PM, Ali Akhtar <ali.rac...@gmail.com> wrote:
>
> > Is it possible to have kafka-streams-reset be automatically called during
> > development? Something like streams.cleanUp() but which also does reset?
> >
> > On Fri, Oct 7, 2016 at 2:45 PM, Michael Noll <mich...@confluent.io>
> wrote:
> >
> > > Ali,
> > >
> > > adding to what Matthias said:
> > >
> > > Kafka 0.10 changed the message format to add so-called "embedded
> > > timestamps" into each Kafka message.  The Java producer included in Kafka
> > > 0.10 includes such embedded timestamps into any generated message as
> > > expected.
> > >
> > > However, other clients (like the go kafka plugin you are using) may not
> > > have been updated yet to be compatible with the new 0.10 message format.
> > > That's the root cause why you see these "-1" negative timestamps.   (The
> > > same negative timestamp problem also happens if you attempt to read
> > > messages that were generated with pre-0.10 versions of Kafka's Java
> > > producer.)
> > >
> > > FYI: Kafka Streams' default timestamp extractor attempts to read those new
> > > embedded timestamps.  If there are no such embedded timestamps, you run
> > > into these "negative timestamps" errors.
> > >
> > > Now, how to fix your problem?
> > >
> > > - Fix the root cause: Check if there's a newer version of your Go kafka
> > > plugin that generates messages in the new Kafka 0.10 format.  If there is
> > > no such version, ask the maintainers for an update. :-)
> > >
> > > - Work around the problem:  As Matthias said, you can also tell Kafka
> > > Streams to not use its default timestamp extractor.  You can fall back to
> > > the WallclockTimestampExtractor, though this means your application will
> > > not use event-time but processing-time when processing your data, which is
> > > probably not what you want (but it does prevent the -1 timestamp errors).
> > > If your data (generated by the go kafka plugin) *does* contain timestamp
> > > information in the message payload, then the better option is to write a
> > > custom timestamp extractor that inspects each message, extracts the
> > > timestamp from the payload, and returns it to Kafka Streams.  The Timestamp
> > > Extractor section in [1] explains how to write a custom one and how to
> > > configure your app to use it.
> > >
> > > Hope this helps,
> > > Michael
> > >
> > >
> > >
> > > [1]
> > > http://docs.confluent.io/3.0.1/streams/developer-guide.html#optional-configuration-parameters
> > >
> > >
> > >
> > >
> > >
> > >
> > > On Fri, Oct 7, 2016 at 5:17 AM, Matthias J. Sax <matth...@confluent.io>
> > > wrote:
> > >
> > > > If you restart your application, it will resume where it left off
> > > > (same as any other Kafka consumer that uses group management and
> > > > commits offsets).
> > > >
> > > > If you want to reprocess data from scratch, you need to reset your
> > > > application using bin/kafka-streams-application-reset.sh
> > > >
> > > > See also
> > > > http://docs.confluent.io/3.0.1/streams/developer-guide.html#application-reset-tool
> > > >
> > > > and
> > > > http://www.confluent.io/blog/data-reprocessing-with-kafka-streams-resetting-a-streams-application/
> > > >
> > > >
> > > > About the timestamp issue: it 

Re: Printing to stdin from KStreams?

2016-10-07 Thread Ali Akhtar
Is it possible to have kafka-streams-reset be automatically called during
development? Something like streams.cleanUp() but which also does reset?

On Fri, Oct 7, 2016 at 2:45 PM, Michael Noll <mich...@confluent.io> wrote:

> Ali,
>
> adding to what Matthias said:
>
> Kafka 0.10 changed the message format to add so-called "embedded
> timestamps" into each Kafka message.  The Java producer included in Kafka
> 0.10 includes such embedded timestamps into any generated message as
> expected.
>
> However, other clients (like the go kafka plugin you are using) may not
> have been updated yet to be compatible with the new 0.10 message format.
> That's the root cause why you see these "-1" negative timestamps.   (The same
> negative timestamp problem also happens if you attempt to read messages
> that were generated with pre-0.10 versions of Kafka's Java producer.)
>
> FYI: Kafka Streams' default timestamp extractor attempts to read those new
> embedded timestamps.  If there are no such embedded timestamps, you run
> into these "negative timestamps" errors.
>
> Now, how to fix your problem?
>
> - Fix the root cause: Check if there's a newer version of your Go kafka
> plugin that generates messages in the new Kafka 0.10 format.  If there is
> no such version, ask the maintainers for an update. :-)
>
> - Work around the problem:  As Matthias said, you can also tell Kafka
> Streams to not use its default timestamp extractor.  You can fall back to
> the WallclockTimestampExtractor, though this means your application will
> not use event-time but processing-time when processing your data, which is
> probably not what you want (but it does prevent the -1 timestamp errors).
> If your data (generated by the go kafka plugin) *does* contain timestamp
> information in the message payload, then the better option is to write a
> custom timestamp extractor that inspects each message, extracts the timestamp
> from the payload, and returns it to Kafka Streams.  The Timestamp Extractor
> section in [1] explains how to write a custom one and how to configure your
> app to use it.
>
> Hope this helps,
> Michael
>
>
>
> [1]
> http://docs.confluent.io/3.0.1/streams/developer-guide.html#optional-configuration-parameters
>
>
>
>
>
>
> On Fri, Oct 7, 2016 at 5:17 AM, Matthias J. Sax <matth...@confluent.io>
> wrote:
>
> > If you restart your application, it will resume where it left off
> > (same as any other Kafka consumer that uses group management and
> > commits offsets).
> >
> > If you want to reprocess data from scratch, you need to reset your
> > application using bin/kafka-streams-application-reset.sh
> >
> > See also
> > http://docs.confluent.io/3.0.1/streams/developer-guide.html#application-reset-tool
> >
> > and
> > http://www.confluent.io/blog/data-reprocessing-with-kafka-streams-resetting-a-streams-application/
> >
> >
> > About the timestamp issue: it seems that your Go client does not
> > assign valid timestamps when writing the data. As you already said,
> > you need to provide a custom TimestampExtractor (or use
> > WallclockTimestampExtractor if your semantics permit) instead of the
> > default ConsumerRecordTimestampExtractor.
> >
> >
> > -Matthias
> >
> > On 10/6/16 7:53 PM, Ali Akhtar wrote:
> > > Thanks.
> > >
> > > I'm encountering a strange issue.
> > >
> > > If I create messages thru console-producer.sh on a new topic,
> > > things work fine.
> > >
> > > But on the topic that I need to consume, the messages are being
> > > produced via the go kafka plugin.
> > >
> > > On this topic, at first, nothing happens when the stream starts
> > > (i.e it doesn't process the messages which are already in there)
> > >
> > > Then, if I produce new messages, then my exception handler is
> > > called with the exception that timestamp is negative.
> > >
> > > I'm pretty sure I'm on kafka 0.10. I downloaded it 2-3 weeks ago.
> > >
> > > I'm going to write a new timestamp extractor, but any ideas why
> > > nothing happens with the old messages which are in the topic, it
> > > only responds if i push new messages to this topic?
> > >
> > > On Fri, Oct 7, 2016 at 7:25 AM, Matthias J. Sax
> > > <matth...@confluent.io> wrote:
> > >
> > > Sure.
> > >
> > > Just use #print() or #writeAsText()
> > >
> > >
> > > -

Re: Printing to stdin from KStreams?

2016-10-06 Thread Ali Akhtar
Thanks.

I'm encountering a strange issue.

If I create messages thru console-producer.sh on a new topic, things work
fine.

But on the topic that I need to consume, the messages are being produced
via the go kafka plugin.

On this topic, at first, nothing happens when the stream starts (i.e., it
doesn't process the messages which are already in there).

Then, if I produce new messages, then my exception handler is called with
the exception that timestamp is negative.

I'm pretty sure I'm on kafka 0.10. I downloaded it 2-3 weeks ago.

I'm going to write a new timestamp extractor, but any ideas why nothing
happens with the old messages already in the topic? It only responds if I
push new messages to this topic.
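The parsing half of such a timestamp extractor might look like the sketch below. It deliberately leaves out the Kafka Streams `TimestampExtractor` interface so that it runs without Kafka on the classpath; in a real application this logic would sit inside an implementation of that interface. The assumptions are loud ones: the payload is taken to have a top-level `"timestamp"` field holding an ISO-8601 string, the regex is a shortcut for a proper JSON parser, and `PayloadTimestamp` and `extractMillis` are hypothetical names.

```java
import java.time.Instant;
import java.time.format.DateTimeParseException;
import java.util.regex.Matcher;
import java.util.regex.Pattern;

public class PayloadTimestamp {
    // Assumes a field such as "timestamp":"2016-10-06T18:52:00Z" in the payload.
    private static final Pattern TS_FIELD =
            Pattern.compile("\"timestamp\"\\s*:\\s*\"([^\"]+)\"");

    // Returns epoch milliseconds parsed from the payload, or fallbackMillis
    // when the field is missing or cannot be parsed.
    static long extractMillis(String json, long fallbackMillis) {
        if (json == null) {
            return fallbackMillis;
        }
        Matcher m = TS_FIELD.matcher(json);
        if (!m.find()) {
            return fallbackMillis;
        }
        try {
            return Instant.parse(m.group(1)).toEpochMilli();
        } catch (DateTimeParseException e) {
            return fallbackMillis;
        }
    }

    public static void main(String[] args) {
        String payload = "{\"id\":1,\"timestamp\":\"2016-10-06T18:52:00Z\"}";
        System.out.println(extractMillis(payload, System.currentTimeMillis()));
    }
}
```

Passing the current wall-clock time as the fallback keeps records without a usable timestamp from producing the negative-timestamp error, at the cost of treating them as processing-time events.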

On Fri, Oct 7, 2016 at 7:25 AM, Matthias J. Sax <matth...@confluent.io>
wrote:

> Sure.
>
> Just use #print() or #writeAsText()
>
>
> -Matthias
>
> On 10/6/16 6:25 PM, Ali Akhtar wrote:
> > What the subject says. For dev, it would be a lot easier if
> > debugging info can be printed to stdin instead of another topic,
> > where it will persist.
> >
> > Any ideas if this is possible?
> >
>


Printing to stdin from KStreams?

2016-10-06 Thread Ali Akhtar
What the subject says (though I mean stdout, not stdin). For dev, it would be
a lot easier if debugging info could be printed to stdout instead of another
topic, where it will persist.

Any ideas if this is possible?


Re: Handling out of order messages without KTables

2016-10-06 Thread Ali Akhtar
Thanks! That looks perfect.

Last question: is there any shortcut for having the JSON string messages
automatically deserialized into their equivalent Java class via Jackson or
similar?

Perhaps I can write a Serde impl which takes the java.lang.Class of the
class to be mapped, and maps it via Jackson?

On Fri, Oct 7, 2016 at 3:39 AM, Matthias J. Sax <matth...@confluent.io>
wrote:

> Exactly. You need to set the key using KStream#selectKey() and
> re-distribute data via #through().
>
> About timestamps: you can provide a custom TimestampExtractor that
> returns the JSON-embedded TS instead of the record TS (as the default
> ConsumerRecordTimestampExtractor does).
>
> See
> http://docs.confluent.io/3.0.1/streams/developer-guide.html#timestamp-extractor
>
>
> -Matthias
>
> On 10/6/16 2:59 PM, Ali Akhtar wrote:
> > Sorry, to be clear:
> >
> > - Producers post to topic A
> > - Consumers of topic A receive the data, parse it to find the keys, and
> > post the correct key + message to Topic B
> > - Topic B is treated as a KTable by the 2nd consumer layer, and it's this
> > layer which does the writes to ensure 'last one wins' (assuming 'last one'
> > can be determined using the timestamp in the JSON of the message)
> >
> > On Fri, Oct 7, 2016 at 2:54 AM, Ali Akhtar <ali.rac...@gmail.com>
> > wrote:
> >
> >> Thanks for the reply.
> >>
> >> Its not possible to provide keys, unfortunately. (Producer is
> >> written by a colleague, and said colleague just wants to provide
> >> whatever data the API gives, and leave all processing of the data
> >> to me).
> >>
> >> Perhaps I can use an intermediate kafka topic, and have producers
> >> post to that topic w/ whatever data they receive. Then, another
> >> consumer can listen to that topic, and use it as a KTable to
> >> process data in the order of 'last one winning'.
> >>
> >> However, the source of truth on the time of the messages, is
> >> embedded in the message itself, its not the Kafka internal
> >> timestamp.
> >>
> >> The message is a json string, which contains a 'timestamp'
> >> field, containing a string timestamp, and that string timestamp
> >> is the source of truth on when this message was generated.
> >>
> >> So, is it possible to use a KTable which lets me parse the
> >> message and return the time  which is contained inside the
> >> message, and use THAT time for sorting the messages?
> >>
> >> On Fri, Oct 7, 2016 at 2:33 AM, Matthias J. Sax
> >> <matth...@confluent.io> wrote:
> >>
> > It is not global in this sense.
> >
> > Thus, you need to ensure that records updating the same product, go
> > to the same instance. You can ensure this, by given all records of
> > the same product the same key and "groupByKey" before processing
> > the data.
> >
> > -Matthias
> >
> > On 10/6/16 10:55 AM, Ali Akhtar wrote:
> >>>>> Thank you, State Store seems promising. But, is it
> >>>>> distributed, or limited to the particular instance of my
> >>>>> application?
> >>>>>
> >>>>> I.e if there are 3 messages, setting product 1's price to
> >>>>> $1, $3, and $5, and all 3 of them go to a different
> >>>>> instance of my application, will they be able to correctly
> >>>>> identify the latest received message using State Store?
> >>>>>
> >>>>> On Thu, Oct 6, 2016 at 10:48 PM, Matthias J. Sax
> >>>>> <matth...@confluent.io> wrote:
> >>>>>
> >>>>> What do you mean by "message keys are random" -- do you
> >>>>> effectively have no keys and want all messages to be
> >>>>> processed as if they all have the same key?
> >>>>>
> >>>>> To access record TS in general, you need to use Processor
> >>>>> API. The given ProcessorContext object given by
> >>>>> Processor#init() always return the timestamp of the
> >>>>> currently processed on #timestamp().
> >>>>>
> >>>>> Thus, you can attach a state store to your processor and
> >>>>> compare the timestamps of the current record with the
> >>>>> timestamp of the one in your store.
> >>>>>
> >>>>> -Matthias
> >>>>>
> >>>>> On 10/6/16 8:52 AM, Ali Akhtar wr

Re: Handling out of order messages without KTables

2016-10-06 Thread Ali Akhtar
Sorry, to be clear:

- Producers post to topic A
- Consumers of topic A receive the data, parse it to find the keys, and
post the correct key + message to Topic B
- Topic B is treated as a KTable by the 2nd consumer layer, and it's this
layer which does the writes to ensure 'last one wins' (assuming 'last one'
can be determined using the timestamp in the JSON of the message)

On Fri, Oct 7, 2016 at 2:54 AM, Ali Akhtar <ali.rac...@gmail.com> wrote:

> Thanks for the reply.
>
> Its not possible to provide keys, unfortunately. (Producer is written by a
> colleague, and said colleague just wants to provide whatever data the API
> gives, and leave all processing of the data to me).
>
> Perhaps I can use an intermediate kafka topic, and have producers post to
> that topic w/ whatever data they receive. Then, another consumer can listen
> to that topic, and use it as a KTable to process data in the order of 'last
> one winning'.
>
> However, the source of truth on the time of the messages, is embedded in
> the message itself, its not the Kafka internal timestamp.
>
> The message is a json string, which contains a 'timestamp' field,
> containing a string timestamp, and that string timestamp is the source of
> truth on when this message was generated.
>
> So, is it possible to use a KTable which lets me parse the message and
> return the time  which is contained inside the message, and use THAT time
> for sorting the messages?
>
> On Fri, Oct 7, 2016 at 2:33 AM, Matthias J. Sax <matth...@confluent.io>
> wrote:
>
>> It is not global in this sense.
>>
>> Thus, you need to ensure that records updating the same product go to
>> the same instance. You can ensure this by giving all records of the
>> same product the same key and using "groupByKey" before processing the data.
>>
>> -Matthias
>>
>> On 10/6/16 10:55 AM, Ali Akhtar wrote:
>> > Thank you, State Store seems promising. But, is it distributed, or
>> > limited to the particular instance of my application?
>> >
>> > I.e if there are 3 messages, setting product 1's price to $1, $3,
>> > and $5, and all 3 of them go to a different instance of my
>> > application, will they be able to correctly identify the latest
>> > received message using State Store?
>> >
>> > On Thu, Oct 6, 2016 at 10:48 PM, Matthias J. Sax
>> > <matth...@confluent.io> wrote:
>> >
>> > What do you mean by "message keys are random" -- do you
>> > effectively have no keys and want all messages to be processed as
>> > if they all have the same key?
>> >
>> > To access record TS in general, you need to use Processor API. The
>> > given ProcessorContext object given by Processor#init() always
>> > return the timestamp of the currently processed on #timestamp().
>> >
>> > Thus, you can attach a state store to your processor and compare
>> > the timestamps of the current record with the timestamp of the one
>> > in your store.
>> >
>> > -Matthias
>> >
>> > On 10/6/16 8:52 AM, Ali Akhtar wrote:
>> >>>> Heya,
>> >>>>
>> >>>> I have some Kafka producers, which are listening to webhook
>> >>>> events, and for each webhook event, they post its payload to
>> >>>> a Kafka topic.
>> >>>>
>> >>>> Each payload contains a timestamp from the webhook source.
>> >>>>
>> >>>> This timestamp is the source of truth about which events
>> >>>> happened first, which happened last, etc.
>> >>>>
>> >>>> I need to ensure that the last arrival of a particular type
>> >>>> of message wins.
>> >>>>
>> >>>> E.g, if there are 5 messages, saying the price of a product
>> >>>> with id 1, was set to $1, then $3, then something else, etc,
>> >>>> before finally being set to $10, then I need to make sure
>> >>>> that the final price for that product is $10.
>> >>>>
>> >>>> These messages can be out of order, and I need to determine
>> >>>> the latest arrival based on the timestamp from the webhook
>> >>>> source. (Its atm in a string format which can be parsed)
>> >>>>
>> >>>> Since KTable looks like it uses message keys to determine
>> >>>> what happens - and in this case, the message keys are random,
>> >>>&

Re: Handling out of order messages without KTables

2016-10-06 Thread Ali Akhtar
Thanks for the reply.

It's not possible to provide keys, unfortunately. (Producer is written by a
colleague, and said colleague just wants to provide whatever data the API
gives, and leave all processing of the data to me).

Perhaps I can use an intermediate kafka topic, and have producers post to
that topic w/ whatever data they receive. Then, another consumer can listen
to that topic, and use it as a KTable to process data in the order of 'last
one winning'.

However, the source of truth on the time of the messages is embedded in
the message itself; it's not the Kafka internal timestamp.

The message is a JSON string, which contains a 'timestamp' field,
containing a string timestamp, and that string timestamp is the source of
truth on when this message was generated.

So, is it possible to use a KTable which lets me parse the message and
return the time which is contained inside the message, and use THAT time
for sorting the messages?

On Fri, Oct 7, 2016 at 2:33 AM, Matthias J. Sax <matth...@confluent.io>
wrote:

> It is not global in this sense.
>
> Thus, you need to ensure that records updating the same product go to
> the same instance. You can ensure this by giving all records of the
> same product the same key and using "groupByKey" before processing the data.
>
> -Matthias
>
> On 10/6/16 10:55 AM, Ali Akhtar wrote:
> > Thank you, State Store seems promising. But, is it distributed, or
> > limited to the particular instance of my application?
> >
> > I.e if there are 3 messages, setting product 1's price to $1, $3,
> > and $5, and all 3 of them go to a different instance of my
> > application, will they be able to correctly identify the latest
> > received message using State Store?
> >
> > On Thu, Oct 6, 2016 at 10:48 PM, Matthias J. Sax
> > <matth...@confluent.io> wrote:
> >
> > What do you mean by "message keys are random" -- do you
> > effectively have no keys and want all messages to be processed as
> > if they all have the same key?
> >
> > To access record TS in general, you need to use Processor API. The
> > ProcessorContext object passed to Processor#init() always
> > returns the timestamp of the currently processed record on #timestamp().
> >
> > Thus, you can attach a state store to your processor and compare
> > the timestamps of the current record with the timestamp of the one
> > in your store.
> >
> > -Matthias
> >
> > On 10/6/16 8:52 AM, Ali Akhtar wrote:
> >>>> Heya,
> >>>>
> >>>> I have some Kafka producers, which are listening to webhook
> >>>> events, and for each webhook event, they post its payload to
> >>>> a Kafka topic.
> >>>>
> >>>> Each payload contains a timestamp from the webhook source.
> >>>>
> >>>> This timestamp is the source of truth about which events
> >>>> happened first, which happened last, etc.
> >>>>
> >>>> I need to ensure that the last arrival of a particular type
> >>>> of message wins.
> >>>>
> >>>> E.g, if there are 5 messages, saying the price of a product
> >>>> with id 1, was set to $1, then $3, then something else, etc,
> >>>> before finally being set to $10, then I need to make sure
> >>>> that the final price for that product is $10.
> >>>>
> >>>> These messages can be out of order, and I need to determine
> >>>> the latest arrival based on the timestamp from the webhook
> >>>> source. (Its atm in a string format which can be parsed)
> >>>>
> >>>> Since KTable looks like it uses message keys to determine
> >>>> what happens - and in this case, the message keys are random,
> >>>> and the timestamp contained in the value of the message is
> >>>> what determines the order of the events - any pointers on
> >>>> what the best way to do this is?
> >>>>
> >>>> I'm using kafka streaming, latest version, Java.
> >>>>
> >>
> >
>


Re: Handling out of order messages without KTables

2016-10-06 Thread Ali Akhtar
Thank you, State Store seems promising. But is it distributed, or limited
to the particular instance of my application?

I.e., if there are 3 messages, setting product 1's price to $1, $3, and $5,
and all 3 of them go to a different instance of my application, will they
be able to correctly identify the latest received message using State Store?

On Thu, Oct 6, 2016 at 10:48 PM, Matthias J. Sax <matth...@confluent.io>
wrote:

> What do you mean by "message keys are random" -- do you effectively
> have no keys and want all messages to be processed as if they all have
> the same key?
>
> To access record TS in general, you need to use Processor API. The
> ProcessorContext object passed to Processor#init() always returns
> the timestamp of the currently processed record on #timestamp().
>
> Thus, you can attach a state store to your processor and compare the
> timestamps of the current record with the timestamp of the one in your
> store.
>
> -Matthias
>
> On 10/6/16 8:52 AM, Ali Akhtar wrote:
> > Heya,
> >
> > I have some Kafka producers, which are listening to webhook events,
> > and for each webhook event, they post its payload to a Kafka
> > topic.
> >
> > Each payload contains a timestamp from the webhook source.
> >
> > This timestamp is the source of truth about which events happened
> > first, which happened last, etc.
> >
> > I need to ensure that the last arrival of a particular type of
> > message wins.
> >
> > E.g, if there are 5 messages, saying the price of a product with id
> > 1, was set to $1, then $3, then something else, etc, before finally
> > being set to $10, then I need to make sure that the final price for
> > that product is $10.
> >
> > These messages can be out of order, and I need to determine the
> > latest arrival based on the timestamp from the webhook source. (Its
> > atm in a string format which can be parsed)
> >
> > Since KTable looks like it uses message keys to determine what
> > happens - and in this case, the message keys are random, and the
> > timestamp contained in the value of the message is what determines
> > the order of the events - any pointers on what the best way to do
> > this is?
> >
> > I'm using kafka streaming, latest version, Java.
> >
>


Handling out of order messages without KTables

2016-10-06 Thread Ali Akhtar
Heya,

I have some Kafka producers, which are listening to webhook events, and for
each webhook event, they post its payload to a Kafka topic.

Each payload contains a timestamp from the webhook source.

This timestamp is the source of truth about which events happened first,
which happened last, etc.

I need to ensure that the last arrival of a particular type of message wins.

E.g, if there are 5 messages, saying the price of a product with id 1, was
set to $1, then $3, then something else, etc, before finally being set to
$10, then I need to make sure that the final price for that product is $10.

These messages can be out of order, and I need to determine the latest
arrival based on the timestamp from the webhook source. (It's currently in a
string format which can be parsed.)

Since KTable looks like it uses message keys to determine what happens -
and in this case, the message keys are random, and the timestamp contained
in the value of the message is what determines the order of the events -
any pointers on what the best way to do this is?

I'm using kafka streaming, latest version, Java.
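The "last one wins" rule itself is small once the source timestamp has been parsed out of the payload. The sketch below shows only that comparison, using a HashMap as a stand-in for whatever store ends up holding the latest value per product; `LastWriteWins`, `Entry`, and the method names are hypothetical, and the sketch assumes all updates for one product are handled by the same instance.

```java
import java.util.HashMap;
import java.util.Map;

public class LastWriteWins {
    // One stored entry per product: newest source timestamp seen, and its price.
    static final class Entry {
        final long timestampMillis;
        final String price;

        Entry(long timestampMillis, String price) {
            this.timestampMillis = timestampMillis;
            this.price = price;
        }
    }

    // Stand-in for a persistent store keyed by product id.
    private final Map<String, Entry> store = new HashMap<>();

    // Applies the update unless a strictly newer one is already stored.
    // Returns true when the store changed.
    boolean apply(String productId, long timestampMillis, String price) {
        Entry current = store.get(productId);
        if (current != null && current.timestampMillis > timestampMillis) {
            return false; // stale update that arrived out of order
        }
        store.put(productId, new Entry(timestampMillis, price));
        return true;
    }

    // Returns the latest known price, or null if the product is unknown.
    String priceOf(String productId) {
        Entry e = store.get(productId);
        return e == null ? null : e.price;
    }

    public static void main(String[] args) {
        LastWriteWins prices = new LastWriteWins();
        prices.apply("1", 100L, "$1");
        prices.apply("1", 300L, "$10");
        prices.apply("1", 200L, "$3"); // older than the stored entry, ignored
        System.out.println(prices.priceOf("1"));
    }
}
```

Ties on the timestamp are resolved in favour of the later arrival here; that is a design choice of the sketch, not something the source data dictates.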


Re: Kafka Streams dynamic partitioning

2016-10-05 Thread Ali Akhtar
> It's often a good
> idea to over-partition your topics.  For example, even if today 10 machines
> (and thus 10 partitions) would be sufficient, pick a higher number of
> partitions (say, 50) so you have some wiggle room to add more machines
> (11...50) later if need be.

If you create e.g. 30 partitions, but only have e.g. 5 instances of your
program, all in the same consumer group, all using Kafka Streams to consume
the topic, do you still receive all the data posted to the topic, or do
you need as many instances of the program as there are partitions?

(If you have 1 instance and 30 partitions, do the same rules apply, i.e. will
it receive all data?)
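Within one consumer group, each partition is assigned to exactly one member, so fewer instances than partitions still cover all the data; each instance simply owns several partitions. The sketch below illustrates that counting argument only. The round-robin spread is an assumption made for the example (Kafka's actual assignment strategies differ in detail), and `PartitionSpread` and `assign` are hypothetical names.

```java
import java.util.ArrayList;
import java.util.List;

public class PartitionSpread {
    // Spreads partitions 0..numPartitions-1 over numInstances group members.
    // Round-robin is used purely for illustration.
    static List<List<Integer>> assign(int numPartitions, int numInstances) {
        if (numInstances <= 0) {
            throw new IllegalArgumentException("need at least one instance");
        }
        List<List<Integer>> owned = new ArrayList<>();
        for (int i = 0; i < numInstances; i++) {
            owned.add(new ArrayList<Integer>());
        }
        for (int p = 0; p < numPartitions; p++) {
            owned.get(p % numInstances).add(p); // every partition gets one owner
        }
        return owned;
    }

    public static void main(String[] args) {
        List<List<Integer>> five = assign(30, 5);
        for (int i = 0; i < five.size(); i++) {
            System.out.println("instance " + i + " owns " + five.get(i));
        }
        System.out.println("single instance owns " + assign(30, 1).get(0).size() + " partitions");
    }
}
```

The reverse case is the one to watch: with more instances than partitions, the surplus instances own nothing and sit idle, which is why over-partitioning leaves room to scale out later.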

On Wed, Oct 5, 2016 at 8:52 PM, Michael Noll  wrote:

> > So, in this case I should know the max number of possible keys so that
> > I can create that number of partitions.
>
> Assuming I understand your original question correctly, then you would not
> need to do/know this.  Rather, pick the number of partitions in a way that
> matches your needs to process the data in parallel (e.g. if you expect that
> you require 10 machines in order to process the incoming data, then you'd
> need 10 partitions).  Also, as a general recommendation:  It's often a good
> idea to over-partition your topics.  For example, even if today 10 machines
> (and thus 10 partitions) would be sufficient, pick a higher number of
> partitions (say, 50) so you have some wiggle room to add more machines
> (11...50) later if need be.
>
>
>
> On Wed, Oct 5, 2016 at 9:34 AM, Adrienne Kole 
> wrote:
>
> > Hi Guozhang,
> >
> > So, in this case I should know the max number of possible keys so that I
> > can create that number of partitions.
> >
> > Thanks
> >
> > Adrienne
> >
> > On Wed, Oct 5, 2016 at 1:00 AM, Guozhang Wang 
> wrote:
> >
> > > By default the partitioner will use murmur hash on the key and mod on
> > > the current num.partitions to determine which partition to go to, so
> > > records with the same key will be assigned to the same partition. Would
> > > that be OK for your case?
> > >
> > >
> > > Guozhang
> > >
> > >
> > > On Tue, Oct 4, 2016 at 3:00 PM, Adrienne Kole
> > > wrote:
> > >
> > > > Hi,
> > > >
> > > > From Streams documentation, I can see that each Streams instance is
> > > > processing data independently (from other instances), reads from topic
> > > > partition(s) and writes to specified topic.
> > > >
> > > >
> > > > So here, the partitions of topic should be determined beforehand and
> > > > should remain static.
> > > > In my usecase I want to create partitioned/keyed (time) windows and
> > > > aggregate them.
> > > > I can partition the incoming data to specified topic's partitions and
> > > > each Stream instance can do windowed aggregations.
> > > >
> > > > However, if I don't know the number of possible keys (to partition),
> > > > then what should I do?
> > > >
> > > > Thanks
> > > > Adrienne
> > > >
> > >
> > >
> > >
> > > --
> > > -- Guozhang
> > >
> >
>
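
The point made above (same key, same partition, regardless of how many
distinct keys exist) can be illustrated with a minimal sketch. It assumes a
stand-in hash: Kafka's default partitioner uses murmur2, which is not
reproduced here, so the partition numbers printed will not match what a real
producer would pick. The class and method names are hypothetical.

```java
import java.nio.charset.StandardCharsets;
import java.util.Arrays;

final class KeyPartitionSketch {
    // Stand-in hash (assumption): Kafka's default partitioner uses murmur2,
    // so real partition numbers will differ from the ones computed here.
    static int hash(byte[] keyBytes) {
        return Arrays.hashCode(keyBytes);
    }

    // Map a key to a partition: clear the sign bit so the hash is
    // non-negative, then take it modulo the topic's partition count.
    static int partitionFor(String key, int numPartitions) {
        byte[] keyBytes = key.getBytes(StandardCharsets.UTF_8);
        return (hash(keyBytes) & 0x7fffffff) % numPartitions;
    }

    public static void main(String[] args) {
        int numPartitions = 10; // chosen for parallelism, not for key count
        String[] keys = {"user-1", "user-2", "user-1", "user-31337"};
        for (String key : keys) {
            System.out.println(key + " -> partition " + partitionFor(key, numPartitions));
        }
    }
}
```

Any number of keys folds onto the fixed set of partitions, which is why the
partition count can be chosen for processing parallelism rather than for the
number of keys.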


Re: difficulty to delete a topic because of its syntax

2016-10-05 Thread Ali Akhtar
I don't see a space in that topic name

On Wed, Oct 5, 2016 at 6:42 PM, Hamza HACHANI 
wrote:

> Hi,
>
> I created a topic called
> "device-connection-invert-key-value-the metric-changelog".
>
> I insist that there is a space in it.
>
>
>
> Now I want to delete it because my cluster can no longer work correctly,
> but I can't, as it only reads the first part of the name
> (device-connection-invert-key-value-the), which it obviously doesn't find.
>
> Does somebody have a solution to delete it?
>
> Thanks in advance.
>
>
> Hamza
>
>
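
One way to catch a name like this before it is ever created is to validate
it up front. The sketch below assumes the legal character set that Kafka
documents for topic names (ASCII letters, digits, '.', '_' and '-'); the
class and method names are hypothetical, and it does not delete anything.

```java
import java.util.regex.Pattern;

final class TopicNameCheck {
    // Assumption: legal topic names contain only ASCII letters, digits,
    // '.', '_' and '-'. A space is therefore not allowed.
    private static final Pattern LEGAL = Pattern.compile("[a-zA-Z0-9._-]+");

    // True if the name is non-empty and every character is legal.
    static boolean isLegal(String topic) {
        return LEGAL.matcher(topic).matches();
    }

    // Index of the first illegal character, or -1 if there is none.
    static int firstIllegalIndex(String topic) {
        for (int i = 0; i < topic.length(); i++) {
            if (!LEGAL.matcher(String.valueOf(topic.charAt(i))).matches()) {
                return i;
            }
        }
        return -1;
    }

    public static void main(String[] args) {
        String topic = "device-connection-invert-key-value-the metric-changelog";
        System.out.println("legal: " + isLegal(topic));
        System.out.println("first illegal character at index " + firstIllegalIndex(topic));
    }
}
```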


Re: Snazzy new look to our website

2016-10-04 Thread Ali Akhtar
Just noticed this on pulling up the documentation. Oh yeah! This new look
is fantastic.

On Wed, Oct 5, 2016 at 4:31 AM, Vahid S Hashemian  wrote:

> +1
>
> Thank you for the much needed new design.
> At first glance, it looks great, and more professional.
>
> --Vahid
>
>
>
> From:   Gwen Shapira 
> To: d...@kafka.apache.org, Users 
> Cc: Derrick Or 
> Date:   10/04/2016 04:13 PM
> Subject:Snazzy new look to our website
>
>
>
> Hi Team Kafka,
>
> I just merged PR 20 to our website - which gives it a new (and IMO
> pretty snazzy) look and feel. Thanks to Derrick Or for contributing
> the update.
>
> I had to do a hard-refresh (shift-f5 on my mac) to get the new look to
> load properly - so if stuff looks off, try it.
>
> Comments and contributions to the site are welcome.
>
> Gwen
>
>
>
>
>
>


Re: Kafka Streams - Parallel by default or 1 thread per topic?

2016-10-04 Thread Ali Akhtar
<3

On Wed, Oct 5, 2016 at 2:31 AM, Ali Akhtar <ali.rac...@gmail.com> wrote:

> That's awesome. Thanks.
>
> On Wed, Oct 5, 2016 at 2:19 AM, Matthias J. Sax <matth...@confluent.io>
> wrote:
>
>> Yes.
>>
>> On 10/4/16 1:47 PM, Ali Akhtar wrote:
>> > Hey Matthias,
>> >
>> > All my topics have 3 partitions each, and I will have about 20-30
>> > topics in total that need to be subscribed to and managed.
>> >
>> > So, if I create an app which registers handlers for each of the 30
>> > topics, the parallelization / multithreading will be handled behind
>> > the scenes by Kafka Streams, correct?
>> >
>> > If I deployed 2 more instances of the same app, to have 3 instances
>> > of my app, will the load get redistributed automatically, so that
>> > instead of one instance listening to all 3 partitions of each
>> > topic, each instance of the app listens to 1 partition of each
>> > topic?
>> >
>> > (Each instance of the app will be using the same consumer group
>> > name)
>> >
>> > On Wed, Oct 5, 2016 at 1:43 AM, Matthias J. Sax
>> > <matth...@confluent.io> wrote:
>> >
>> > Kafka Streams parallelizes via Kafka partitions -- for each
>> > partition a task is created. If you subscribe to multiple topics,
>> > the topic with the most partitions determines the number of tasks,
>> > and each task gets partitions from all topics assigned.
>> >
>> > Furthermore, you can configure the number of threads
>> > (num.stream.threads, see
>> > http://docs.confluent.io/current/streams/developer-guide.html#optional-configuration-parameters)
>> > -- the max useful configuration is the number of created tasks.
>> > Keep in mind, if you start multiple instances of your Streams app,
>> > partitions are managed in a consumer group fashion, i.e., are
>> > distributed over the running instances.
>> >
>> > Please see here for more details:
>> > http://docs.confluent.io/current/streams/architecture.html#parallelism-model
>> >
>> >
>> > -Matthias
>> >
>> > On 10/4/16 1:27 PM, Ali Akhtar wrote:
>> >>>> I need to consume a large number of topics, and handle each
>> >>>> topic in a different way.
>> >>>>
>> >>>> I was thinking about creating a different KStream for each
>> >>>> topic, and doing KStream.foreach for each stream, to process
>> >>>> incoming messages.
>> >>>>
>> >>>> However, it's unclear if this will be handled in a parallel
>> >>>> way by default, or if I need to create a managed ThreadPool
>> >>>> and create the KStream for each topic within its own thread
>> >>>> pool.
>> >>>>
>> >>>> Can anyone shed some light - does KStreamBuilder / KStream
>> >>>> handle concurrency for each KStream, or does this need to be
>> >>>> managed?
>> >>>>
>> >>>> Thanks.
>> >>>>
>> >>
>> >
>>
>
>
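
For the num.stream.threads setting mentioned in the quoted reply, a minimal
configuration sketch might look like the following. It uses only
java.util.Properties; the application id, the bootstrap address and the
helper name are hypothetical placeholders, and capping the thread count at
the number of tasks simply follows the "max useful configuration" remark
above.

```java
import java.util.Properties;

final class StreamThreadsConfigSketch {
    // Build a properties object that could be handed to a Streams app.
    // The thread count is capped at the number of tasks, since threads
    // beyond that would have nothing to work on.
    static Properties streamsProps(int numTasks, int availableCores) {
        Properties props = new Properties();
        props.put("application.id", "my-streams-app");     // hypothetical
        props.put("bootstrap.servers", "localhost:9092");  // hypothetical
        int threads = Math.max(1, Math.min(numTasks, availableCores));
        props.put("num.stream.threads", Integer.toString(threads));
        return props;
    }

    public static void main(String[] args) {
        // 3 tasks (topics with 3 partitions each) on an 8-core machine.
        Properties props = streamsProps(3, 8);
        System.out.println("num.stream.threads = " + props.getProperty("num.stream.threads"));
    }
}
```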


Re: Kafka Streams - Parallel by default or 1 thread per topic?

2016-10-04 Thread Ali Akhtar
Hey Matthias,

All my topics have 3 partitions each, and I will have about 20-30 topics in
total that need to be subscribed to and managed.

So, if I create an app which registers handlers for each of the 30 topics,
the parallelization / multithreading will be handled behind the scenes by
Kafka Streams, correct?

If I deployed 2 more instances of the same app, to have 3 instances of my
app, will the load get redistributed automatically, so that instead of one
instance listening to all 3 partitions of each topic, each instance of the
app listens to 1 partition of each topic?

(Each instance of the app will be using the same consumer group name)

On Wed, Oct 5, 2016 at 1:43 AM, Matthias J. Sax <matth...@confluent.io>
wrote:

> Kafka Streams parallelizes via Kafka partitions -- for each partition
> a task is created. If you subscribe to multiple topics, the topic
> with the most partitions determines the number of tasks, and each task
> gets partitions from all topics assigned.
>
> Furthermore, you can configure the number of threads
> (num.stream.threads, see
> http://docs.confluent.io/current/streams/developer-guide.html#optional-configuration-parameters)
> -- the max useful configuration is the number of created tasks. Keep
> in mind, if you start multiple instances of your Streams app,
> partitions are managed in a consumer group fashion, i.e., are
> distributed over the running instances.
>
> Please see here for more details:
> http://docs.confluent.io/current/streams/architecture.html#parallelism-model
>
>
> -Matthias
>
> On 10/4/16 1:27 PM, Ali Akhtar wrote:
> > I need to consume a large number of topics, and handle each topic
> > in a different way.
> >
> > I was thinking about creating a different KStream for each topic,
> > and doing KStream.foreach for each stream, to process incoming
> > messages.
> >
> > However, it's unclear if this will be handled in a parallel way by
> > default, or if I need to create a managed ThreadPool and create the
> > KStream for each topic within its own thread pool.
> >
> > Can anyone shed some light - does KStreamBuilder / KStream handle
> > concurrency for each KStream, or does this need to be managed?
> >
> > Thanks.
> >
>
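
The rule quoted above (the topic with the most partitions sets the number of
tasks, and tasks are spread over the running instances) can be worked through
for the 30-topics-by-3-partitions case with a small arithmetic model. This is
only a sketch of that rule: the round-robin spreading is an assumption made
for illustration, the names are hypothetical, and the real assignment also
depends on how the topology is built.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

final class StreamTaskSketch {
    // Rule from the thread: the topic with the most partitions
    // determines the number of tasks.
    static int taskCount(int[] partitionsPerTopic) {
        int max = 0;
        for (int p : partitionsPerTopic) {
            max = Math.max(max, p);
        }
        return max;
    }

    // Spread task ids over instances round-robin (an assumption for
    // illustration; the real assignor may place tasks differently).
    static List<List<Integer>> assign(int tasks, int instances) {
        List<List<Integer>> result = new ArrayList<>();
        for (int i = 0; i < instances; i++) {
            result.add(new ArrayList<>());
        }
        for (int t = 0; t < tasks; t++) {
            result.get(t % instances).add(t);
        }
        return result;
    }

    public static void main(String[] args) {
        int[] topics = new int[30];
        Arrays.fill(topics, 3); // 30 topics, 3 partitions each
        int tasks = taskCount(topics);
        System.out.println("tasks = " + tasks);
        System.out.println("1 instance:  " + assign(tasks, 1));
        System.out.println("3 instances: " + assign(tasks, 3));
    }
}
```

Under this model a single instance holds all three tasks, and three instances
hold one task each, where each task covers the same-numbered partition of
every topic.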


Kafka Streams - Parallel by default or 1 thread per topic?

2016-10-04 Thread Ali Akhtar
I need to consume a large number of topics, and handle each topic in a
different way.

I was thinking about creating a different KStream for each topic, and doing
KStream.foreach for each stream, to process incoming messages.

However, it's unclear if this will be handled in a parallel way by default,
or if I need to create a managed ThreadPool and create the KStream for each
topic within its own thread pool.

Can anyone shed some light - does KStreamBuilder / KStream handle
concurrency for each KStream, or does this need to be managed?

Thanks.


Re: New Idea for Kafka multiple consumers running parallel.

2016-10-04 Thread Ali Akhtar
You may be able to control the starting offset, but if you try to control
which instance gets offset 4, and so on, you'll lose all benefits of
parallelism.

On 4 Oct 2016 3:02 pm, "Kaushil Rambhia/ MUM/CORP/ ENGINEERING" <
kaushi...@pepperfry.com> wrote:

> Hi guys,
> I am using Apache Kafka with phprd kafka. I want to know how I can use
> multiple Kafka consumers from different groups on the same partition to
> consume messages in parallel. Say consumers c1, c2, c3 are consuming the
> single partition 0: if c1 is consuming from offset 0, then c2 should start
> from 1 and c3 from 2, and if any new consumer comes up it should start from
> the latest offset which is yet to be consumed by any consumer.
>
> --
> Regards,
> Kaushil Rambhia
>
>


Re: offset topics growing huge

2016-10-03 Thread Ali Akhtar
Newbie question, but what exactly does log.cleaner.enable=true do, and how
do I know if I need to set it to be true?

Also, if config changes like that need to be made once a cluster is up and
running, what's the recommended way to do that? Do you killall -12 kafka
and then make the change, and then start kafka again, one broker at a time?

On Mon, Oct 3, 2016 at 9:27 PM, Tom Crayford  wrote:

> Yes, offset topic compaction is just the normal compaction.
>
> Thanks
>
> Tom Crayford
> Heroku Kafka
>
> On Monday, 3 October 2016, Tobias Adamson  wrote:
>
> > Hi
> > We are using Kafka 0.10.1 with offset commits being stored inside of Kafka.
> > After a while these topics become extremely large and we are wondering if
> > we need to enable log.cleaner.enable=true (currently false) to make sure
> > the internal offset topics get compacted and keep their size down?
> >
> > Regards
> > T
>
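
On what log.cleaner.enable does: it switches on the broker's log cleaner,
which is what performs compaction on topics whose cleanup policy is
"compact", the internal offsets topic among them. Compaction keeps the most
recent value per key and discards older ones. A toy model of that effect,
assuming nothing about the broker's actual implementation (the class, method
and key names are hypothetical):

```java
import java.util.Arrays;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

final class CompactionSketch {
    // Toy model of log compaction: keep only the latest value per key,
    // ordered by the position of that latest write in the log.
    static Map<String, String> compact(List<String[]> log) {
        Map<String, String> latest = new LinkedHashMap<>();
        for (String[] record : log) {
            latest.remove(record[0]);          // drop the older entry, if any
            latest.put(record[0], record[1]);  // re-insert at the tail
        }
        return latest;
    }

    public static void main(String[] args) {
        // Each record is {key, value}; a consumer group commits repeatedly.
        List<String[]> commits = Arrays.asList(
            new String[] {"group-a/topic-0", "offset=10"},
            new String[] {"group-b/topic-0", "offset=7"},
            new String[] {"group-a/topic-0", "offset=20"},
            new String[] {"group-a/topic-0", "offset=30"});
        System.out.println(compact(commits));
    }
}
```

Without the cleaner running, every one of those commits stays on disk, which
matches the unbounded growth described in the quoted message.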


Re: Architecture recommendations for a tricky use case

2016-09-29 Thread Ali Akhtar
Avi,

Why did you choose Druid over Postgres / Cassandra / Elasticsearch?

On Fri, Sep 30, 2016 at 1:09 AM, Avi Flax <avi.f...@parkassist.com> wrote:

>
> > On Sep 29, 2016, at 09:54, Ali Akhtar <ali.rac...@gmail.com> wrote:
> >
> > I'd appreciate some thoughts / suggestions on which of these alternatives
> > I should go with (e.g, using raw Kafka consumers vs Spark for ETL, which
> > persistent data store to use, and how to query that data store in the
> > backend of the web UI, for displaying the reports).
>
> Hi Ali, I’m no expert in any of this, but I’m working on a project that is
> broadly similar to yours, and FWIW I’m evaluating Druid as the datastore
> which would host the queryable data and, well, actually handle and fulfill
> queries.
>
> Since Druid has built-in support for streaming ingestion from Kafka
> topics, I’m tentatively thinking of doing my ETL in a stream processing
> topology (I’m using Kafka Streams, FWIW), which would write the events
> destined for Druid into certain topics, from which Druid would ingest those
> events.
>
> HTH,
> Avi
>
> 
> Software Architect @ Park Assist
> We’re hiring! http://tech.parkassist.com/jobs/
>
>


Re: Architecture recommendations for a tricky use case

2016-09-29 Thread Ali Akhtar
The business use case is to read a user's data from a variety of different
services through their APIs, and then allow the user to query that data,
on a per-service basis, as well as in aggregate across all services.

The way I'm considering doing it, is to do some basic ETL (drop all the
unnecessary fields, rename some fields into something more manageable, etc)
and then store the data in Cassandra / Postgres.
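
The light ETL step described here (drop unneeded fields, rename the rest)
could be sketched roughly as follows. The field names are invented for
illustration and are not taken from any of the actual services; the write to
Cassandra / Postgres is left out.

```java
import java.util.HashMap;
import java.util.Map;

final class LightEtlSketch {
    // Hypothetical mapping from raw API field names to standardized names.
    // Any raw field not listed here is dropped.
    private static final Map<String, String> RENAMES = new HashMap<>();
    static {
        RENAMES.put("usr_id", "user");
        RENAMES.put("created_ts", "date");
        RENAMES.put("svc", "service");
    }

    // Return a new record containing only the known fields, renamed.
    static Map<String, Object> standardize(Map<String, Object> raw) {
        Map<String, Object> out = new HashMap<>();
        for (Map.Entry<String, String> rename : RENAMES.entrySet()) {
            Object value = raw.get(rename.getKey());
            if (value != null) {
                out.put(rename.getValue(), value);
            }
        }
        return out;
    }

    public static void main(String[] args) {
        Map<String, Object> raw = new HashMap<>();
        raw.put("usr_id", 42);
        raw.put("created_ts", "2016-09-29");
        raw.put("debug_blob", "not needed downstream");
        System.out.println(standardize(raw));
    }
}
```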

Then, when the user wants to view a particular report, query the respective
table in Cassandra / Postgres. (select .. from data where user = ? and date
between ? and ? and some_field = ?)

How will Spark Streaming help w/ aggregation? Couldn't the data be queried
from Cassandra / Postgres via the Kafka consumer and aggregated that way?

On Thu, Sep 29, 2016 at 8:43 PM, Cody Koeninger <c...@koeninger.org> wrote:

> No, direct stream in and of itself won't ensure an end-to-end
> guarantee, because it doesn't know anything about your output actions.
>
> You still need to do some work.  The point is having easy access to
> offsets for batches on a per-partition basis makes it easier to do
> that work, especially in conjunction with aggregation.
>
> On Thu, Sep 29, 2016 at 10:40 AM, Deepak Sharma <deepakmc...@gmail.com>
> wrote:
> > If you use Spark direct streams, it ensures an end-to-end guarantee for
> > messages.
> >
> >
> > On Thu, Sep 29, 2016 at 9:05 PM, Ali Akhtar <ali.rac...@gmail.com>
> wrote:
> >>
> >> My concern with Postgres / Cassandra is only scalability. I will look
> >> further into Postgres horizontal scaling, thanks.
> >>
> >> Writes could be idempotent if done as upserts, otherwise updates will be
> >> idempotent but not inserts.
> >>
> >> Data should not be lost. The system should be as fault tolerant as
> >> possible.
> >>
> >> What's the advantage of using Spark for reading Kafka instead of direct
> >> Kafka consumers?
> >>
> >> On Thu, Sep 29, 2016 at 8:28 PM, Cody Koeninger <c...@koeninger.org>
> >> wrote:
> >>>
> >>> I wouldn't give up the flexibility and maturity of a relational
> >>> database, unless you have a very specific use case.  I'm not trashing
> >>> cassandra, I've used cassandra, but if all I know is that you're doing
> >>> analytics, I wouldn't want to give up the ability to easily do ad-hoc
> >>> aggregations without a lot of forethought.  If you're worried about
> >>> scaling, there are several options for horizontally scaling Postgres
> >>> in particular.  One of the current best from what I've worked with is
> >>> Citus.
> >>>
> >>> On Thu, Sep 29, 2016 at 10:15 AM, Deepak Sharma <deepakmc...@gmail.com>
> >>> wrote:
> >>> > Hi Cody
> >>> > Spark direct stream is just fine for this use case.
> >>> > But why postgres and not cassandra?
> >>> > Is there anything specific here that I may not be aware of?
> >>> >
> >>> > Thanks
> >>> > Deepak
> >>> >
> >>> > On Thu, Sep 29, 2016 at 8:41 PM, Cody Koeninger <c...@koeninger.org>
> >>> > wrote:
> >>> >>
> >>> >> How are you going to handle etl failures?  Do you care about lost /
> >>> >> duplicated data?  Are your writes idempotent?
> >>> >>
> >>> >> Absent any other information about the problem, I'd stay away from
> >>> >> cassandra/flume/hdfs/hbase/whatever, and use a spark direct stream
> >>> >> feeding postgres.
> >>> >>
> >>> >> On Thu, Sep 29, 2016 at 10:04 AM, Ali Akhtar <ali.rac...@gmail.com>
> >>> >> wrote:
> >>> >> > Is there an advantage to that vs directly consuming from Kafka?
> >>> >> > Nothing is being done to the data except some light ETL and then
> >>> >> > storing it in Cassandra
> >>> >> >
> >>> >> > On Thu, Sep 29, 2016 at 7:58 PM, Deepak Sharma
> >>> >> > <deepakmc...@gmail.com>
> >>> >> > wrote:
> >>> >> >>
> >>> >> >> It's better to use Spark's direct stream to ingest from Kafka.
> >>> >> >>
> >>> >> >> On Thu, Sep 29, 2016 at 8:24 PM, Ali Akhtar <ali.rac...@gmail.com>
> >>> >> >> wrote:

Re: Architecture recommendations for a tricky use case

2016-09-29 Thread Ali Akhtar
My concern with Postgres / Cassandra is only scalability. I will look
further into Postgres horizontal scaling, thanks.

Writes could be idempotent if done as upserts, otherwise updates will be
idempotent but not inserts.
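
To make the upsert-versus-insert distinction concrete, here is a minimal
in-memory sketch. It assumes each record carries a stable id; the map and
list stand in for database tables, and all names are hypothetical. Replaying
the same record (as can happen after an ETL failure and retry) leaves the
upserted table unchanged but duplicates the row in the append-only one.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

final class IdempotentWriteSketch {
    // Upsert: keyed by record id, so replaying a record overwrites itself.
    static void upsert(Map<String, String> table, String id, String value) {
        table.put(id, value);
    }

    // Plain insert into an append-only table: a replay adds a duplicate row.
    static void insert(List<String> table, String id, String value) {
        table.add(id + "=" + value);
    }

    public static void main(String[] args) {
        Map<String, String> upserted = new HashMap<>();
        List<String> inserted = new ArrayList<>();

        // The same record is delivered twice, e.g. after a retry.
        for (int attempt = 0; attempt < 2; attempt++) {
            upsert(upserted, "event-1", "payload");
            insert(inserted, "event-1", "payload");
        }

        System.out.println("rows after upserts: " + upserted.size());
        System.out.println("rows after inserts: " + inserted.size());
    }
}
```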

Data should not be lost. The system should be as fault tolerant as possible.

What's the advantage of using Spark for reading Kafka instead of direct
Kafka consumers?

On Thu, Sep 29, 2016 at 8:28 PM, Cody Koeninger <c...@koeninger.org> wrote:

> I wouldn't give up the flexibility and maturity of a relational
> database, unless you have a very specific use case.  I'm not trashing
> cassandra, I've used cassandra, but if all I know is that you're doing
> analytics, I wouldn't want to give up the ability to easily do ad-hoc
> aggregations without a lot of forethought.  If you're worried about
> scaling, there are several options for horizontally scaling Postgres
> in particular.  One of the current best from what I've worked with is
> Citus.
>
> On Thu, Sep 29, 2016 at 10:15 AM, Deepak Sharma <deepakmc...@gmail.com>
> wrote:
> > Hi Cody
> > Spark direct stream is just fine for this use case.
> > But why postgres and not cassandra?
> > Is there anything specific here that I may not be aware of?
> >
> > Thanks
> > Deepak
> >
> > On Thu, Sep 29, 2016 at 8:41 PM, Cody Koeninger <c...@koeninger.org>
> wrote:
> >>
> >> How are you going to handle etl failures?  Do you care about lost /
> >> duplicated data?  Are your writes idempotent?
> >>
> >> Absent any other information about the problem, I'd stay away from
> >> cassandra/flume/hdfs/hbase/whatever, and use a spark direct stream
> >> feeding postgres.
> >>
> >> On Thu, Sep 29, 2016 at 10:04 AM, Ali Akhtar <ali.rac...@gmail.com>
> >> wrote:
> >> > Is there an advantage to that vs directly consuming from Kafka?
> >> > Nothing is being done to the data except some light ETL and then
> >> > storing it in Cassandra
> >> >
> >> > On Thu, Sep 29, 2016 at 7:58 PM, Deepak Sharma <deepakmc...@gmail.com>
> >> > wrote:
> >> >>
> >> >> It's better to use Spark's direct stream to ingest from Kafka.
> >> >>
> >> >> On Thu, Sep 29, 2016 at 8:24 PM, Ali Akhtar <ali.rac...@gmail.com>
> >> >> wrote:
> >> >>>
> >> >>> I don't think I need a different speed storage and batch storage.
> >> >>> Just taking in raw data from Kafka, standardizing, and storing it
> >> >>> somewhere where the web UI can query it, seems like it will be enough.
> >> >>>
> >> >>> I'm thinking about:
> >> >>>
> >> >>> - Reading data from Kafka via Spark Streaming
> >> >>> - Standardizing, then storing it in Cassandra
> >> >>> - Querying Cassandra from the web ui
> >> >>>
> >> >>> That seems like it will work. My question now is whether to use Spark
> >> >>> Streaming to read Kafka, or use Kafka consumers directly.
> >> >>>
> >> >>>
> >> >>> On Thu, Sep 29, 2016 at 7:41 PM, Mich Talebzadeh
> >> >>> <mich.talebza...@gmail.com> wrote:
> >> >>>>
> >> >>>> - Spark Streaming to read data from Kafka
> >> >>>> - Storing the data on HDFS using Flume
> >> >>>>
> >> >>>> You don't need Spark streaming to read data from Kafka and store on
> >> >>>> HDFS. It is a waste of resources.
> >> >>>>
> >> >>>> Couple Flume to use Kafka as source and HDFS as sink directly
> >> >>>>
> >> >>>> KafkaAgent.sources = kafka-sources
> >> >>>> KafkaAgent.sinks.hdfs-sinks.type = hdfs
> >> >>>>
> >> >>>> That will be for your batch layer. To analyse you can directly read
> >> >>>> from hdfs files with Spark or simply store data in a database of your
> >> >>>> choice via cron or something. Do not mix your batch layer with speed
> >> >>>> layer.
> >> >>>>
> >> >>>> Your speed layer will ingest the same data directly from Kafka into
> >> >>>> spark streaming and that will be  online or near real time (defined
> >> >>>> by your

Re: Architecture recommendations for a tricky use case

2016-09-29 Thread Ali Akhtar
Is there an advantage to that vs directly consuming from Kafka? Nothing is
being done to the data except some light ETL and then storing it in
Cassandra

On Thu, Sep 29, 2016 at 7:58 PM, Deepak Sharma <deepakmc...@gmail.com>
wrote:

> It's better to use Spark's direct stream to ingest from Kafka.
>
> On Thu, Sep 29, 2016 at 8:24 PM, Ali Akhtar <ali.rac...@gmail.com> wrote:
>
>> I don't think I need a different speed storage and batch storage. Just
>> taking in raw data from Kafka, standardizing, and storing it somewhere
>> where the web UI can query it, seems like it will be enough.
>>
>> I'm thinking about:
>>
>> - Reading data from Kafka via Spark Streaming
>> - Standardizing, then storing it in Cassandra
>> - Querying Cassandra from the web ui
>>
>> That seems like it will work. My question now is whether to use Spark
>> Streaming to read Kafka, or use Kafka consumers directly.
>>
>>
>> On Thu, Sep 29, 2016 at 7:41 PM, Mich Talebzadeh <
>> mich.talebza...@gmail.com> wrote:
>>
>>> - Spark Streaming to read data from Kafka
>>> - Storing the data on HDFS using Flume
>>>
>>> You don't need Spark streaming to read data from Kafka and store on
>>> HDFS. It is a waste of resources.
>>>
>>> Couple Flume to use Kafka as source and HDFS as sink directly
>>>
>>> KafkaAgent.sources = kafka-sources
>>> KafkaAgent.sinks.hdfs-sinks.type = hdfs
>>>
>>> That will be for your batch layer. To analyse you can directly read from
>>> hdfs files with Spark or simply store data in a database of your choice via
>>> cron or something. Do not mix your batch layer with speed layer.
>>>
>>> Your speed layer will ingest the same data directly from Kafka into
>>> spark streaming and that will be  online or near real time (defined by your
>>> window).
>>>
>>> Then you have a serving layer to present data from both speed  (the
>>> one from SS) and batch layer.
>>>
>>> HTH
>>>
>>>
>>>
>>>
>>> Dr Mich Talebzadeh
>>>
>>>
>>>
>>> LinkedIn * 
>>> https://www.linkedin.com/profile/view?id=AAEWh2gBxianrbJd6zP6AcPCCdOABUrV8Pw
>>> <https://www.linkedin.com/profile/view?id=AAEWh2gBxianrbJd6zP6AcPCCdOABUrV8Pw>*
>>>
>>>
>>>
>>> http://talebzadehmich.wordpress.com
>>>
>>>
>>> *Disclaimer:* Use it at your own risk. Any and all responsibility for
>>> any loss, damage or destruction of data or any other property which may
>>> arise from relying on this email's technical content is explicitly
>>> disclaimed. The author will in no case be liable for any monetary damages
>>> arising from such loss, damage or destruction.
>>>
>>>
>>>
>>> On 29 September 2016 at 15:15, Ali Akhtar <ali.rac...@gmail.com> wrote:
>>>
>>>> The web UI is actually the speed layer, it needs to be able to query
>>>> the data online, and show the results in real-time.
>>>>
>>>> It also needs a custom front-end, so a system like Tableau can't be
>>>> used, it must have a custom backend + front-end.
>>>>
>>>> Thanks for the recommendation of Flume. Do you think this will work:
>>>>
>>>> - Spark Streaming to read data from Kafka
>>>> - Storing the data on HDFS using Flume
>>>> - Using Spark to query the data in the backend of the web UI?
>>>>
>>>>
>>>>
>>>> On Thu, Sep 29, 2016 at 7:08 PM, Mich Talebzadeh <
>>>> mich.talebza...@gmail.com> wrote:
>>>>
>>>>> You need a batch layer and a speed layer. Data from Kafka can be
>>>>> stored on HDFS using flume.
>>>>>
>>>>> -  Query this data to generate reports / analytics (There will be a
>>>>> web UI which will be the front-end to the data, and will show the reports)
>>>>>
>>>>> This is basically batch layer and you need something like Tableau or
>>>>> Zeppelin to query data
>>>>>
>>>>> You will also need spark streaming to query data online for speed
>>>>> layer. That data could be stored in some transient fabric like ignite or
>>>>> even druid.
>>>>>
>>>>> HTH
>>>>>
>>>>>
>>>>>
>>>>>
>>>>>
>>>>>
>>>>>
>>>>>

Re: Architecture recommendations for a tricky use case

2016-09-29 Thread Ali Akhtar
I don't think I need a different speed storage and batch storage. Just
taking in raw data from Kafka, standardizing, and storing it somewhere
where the web UI can query it, seems like it will be enough.

I'm thinking about:

- Reading data from Kafka via Spark Streaming
- Standardizing, then storing it in Cassandra
- Querying Cassandra from the web ui

That seems like it will work. My question now is whether to use Spark
Streaming to read Kafka, or use Kafka consumers directly.


On Thu, Sep 29, 2016 at 7:41 PM, Mich Talebzadeh <mich.talebza...@gmail.com>
wrote:

> - Spark Streaming to read data from Kafka
> - Storing the data on HDFS using Flume
>
> You don't need Spark streaming to read data from Kafka and store on HDFS.
> It is a waste of resources.
>
> Couple Flume to use Kafka as source and HDFS as sink directly
>
> KafkaAgent.sources = kafka-sources
> KafkaAgent.sinks.hdfs-sinks.type = hdfs
>
> That will be for your batch layer. To analyse you can directly read from
> hdfs files with Spark or simply store data in a database of your choice via
> cron or something. Do not mix your batch layer with speed layer.
>
> Your speed layer will ingest the same data directly from Kafka into spark
> streaming and that will be  online or near real time (defined by your
> window).
>
> Then you have a serving layer to present data from both speed  (the one
> from SS) and batch layer.
>
> HTH
>
>
>
>
> Dr Mich Talebzadeh
>
>
> On 29 September 2016 at 15:15, Ali Akhtar <ali.rac...@gmail.com> wrote:
>
>> The web UI is actually the speed layer, it needs to be able to query the
>> data online, and show the results in real-time.
>>
>> It also needs a custom front-end, so a system like Tableau can't be used,
>> it must have a custom backend + front-end.
>>
>> Thanks for the recommendation of Flume. Do you think this will work:
>>
>> - Spark Streaming to read data from Kafka
>> - Storing the data on HDFS using Flume
>> - Using Spark to query the data in the backend of the web UI?
>>
>>
>>
>> On Thu, Sep 29, 2016 at 7:08 PM, Mich Talebzadeh <
>> mich.talebza...@gmail.com> wrote:
>>
>>> You need a batch layer and a speed layer. Data from Kafka can be stored
>>> on HDFS using flume.
>>>
>>> -  Query this data to generate reports / analytics (There will be a web
>>> UI which will be the front-end to the data, and will show the reports)
>>>
>>> This is basically batch layer and you need something like Tableau or
>>> Zeppelin to query data
>>>
>>> You will also need spark streaming to query data online for speed layer.
>>> That data could be stored in some transient fabric like ignite or even
>>> druid.
>>>
>>> HTH
>>>
>>>
>>>
>>>
>>>
>>>
>>>
>>>
>>> Dr Mich Talebzadeh
>>>
>>>
>>> On 29 September 2016 at 15:01, Ali Akhtar <ali.rac...@gmail.com> wrote:
>>>
>>>> It needs to be able to scale to a very large amount of data, yes.
>>>>
>>>> On Thu, Sep 29, 2016 at 7:00 PM, Deepak Sharma <deepakmc...@gmail.com>
>>>> wrote:
>>>>
>>>>> What is the message inflow ?
>>>>> If it's really high

Re: Architecture recommendations for a tricky use case

2016-09-29 Thread Ali Akhtar
The web UI is actually the speed layer, it needs to be able to query the
data online, and show the results in real-time.

It also needs a custom front-end, so a system like Tableau can't be used,
it must have a custom backend + front-end.

Thanks for the recommendation of Flume. Do you think this will work:

- Spark Streaming to read data from Kafka
- Storing the data on HDFS using Flume
- Using Spark to query the data in the backend of the web UI?



On Thu, Sep 29, 2016 at 7:08 PM, Mich Talebzadeh <mich.talebza...@gmail.com>
wrote:

> You need a batch layer and a speed layer. Data from Kafka can be stored on
> HDFS using flume.
>
> -  Query this data to generate reports / analytics (There will be a web UI
> which will be the front-end to the data, and will show the reports)
>
> This is basically batch layer and you need something like Tableau or
> Zeppelin to query data
>
> You will also need spark streaming to query data online for speed layer.
> That data could be stored in some transient fabric like ignite or even
> druid.
>
> HTH
>
>
>
>
>
>
>
>
> Dr Mich Talebzadeh
>
>
> On 29 September 2016 at 15:01, Ali Akhtar <ali.rac...@gmail.com> wrote:
>
>> It needs to be able to scale to a very large amount of data, yes.
>>
>> On Thu, Sep 29, 2016 at 7:00 PM, Deepak Sharma <deepakmc...@gmail.com>
>> wrote:
>>
>>> What is the message inflow?
>>> If it's really high, Spark will definitely be of great use.
>>>
>>> Thanks
>>> Deepak
>>>
>>> On Sep 29, 2016 19:24, "Ali Akhtar" <ali.rac...@gmail.com> wrote:
>>>
>>>> I have a somewhat tricky use case, and I'm looking for ideas.
>>>>
>>>> I have 5-6 Kafka producers, reading various APIs, and writing their raw
>>>> data into Kafka.
>>>>
>>>> I need to:
>>>>
>>>> - Do ETL on the data, and standardize it.
>>>>
>>>> - Store the standardized data somewhere (HBase / Cassandra / Raw HDFS /
>>>> ElasticSearch / Postgres)
>>>>
>>>> - Query this data to generate reports / analytics (There will be a web
>>>> UI which will be the front-end to the data, and will show the reports)
>>>>
>>>> Java is being used as the backend language for everything (backend of
>>>> the web UI, as well as the ETL layer)
>>>>
>>>> I'm considering:
>>>>
>>>> - Using raw Kafka consumers, or Spark Streaming, as the ETL layer
>>>> (receive raw data from Kafka, standardize & store it)
>>>>
>>>> - Using Cassandra, HBase, or raw HDFS, for storing the standardized
>>>> data, and to allow queries
>>>>
>>>> - In the backend of the web UI, I could either use Spark to run queries
>>>> across the data (mostly filters), or directly run queries against Cassandra
>>>> / HBase
>>>>
>>>> I'd appreciate some thoughts / suggestions on which of these
>>>> alternatives I should go with (e.g, using raw Kafka consumers vs Spark for
>>>> ETL, which persistent data store to use, and how to query that data store
>>>> in the backend of the web UI, for displaying the reports).
>>>>
>>>>
>>>> Thanks.
>>>>
>>>
>>
>


Re: Architecture recommendations for a tricky use case

2016-09-29 Thread Ali Akhtar
It needs to be able to scale to a very large amount of data, yes.

On Thu, Sep 29, 2016 at 7:00 PM, Deepak Sharma <deepakmc...@gmail.com>
wrote:

> What is the message inflow?
> If it's really high, Spark will definitely be of great use.
>
> Thanks
> Deepak
>
> On Sep 29, 2016 19:24, "Ali Akhtar" <ali.rac...@gmail.com> wrote:
>
>> I have a somewhat tricky use case, and I'm looking for ideas.
>>
>> I have 5-6 Kafka producers, reading various APIs, and writing their raw
>> data into Kafka.
>>
>> I need to:
>>
>> - Do ETL on the data, and standardize it.
>>
>> - Store the standardized data somewhere (HBase / Cassandra / Raw HDFS /
>> ElasticSearch / Postgres)
>>
>> - Query this data to generate reports / analytics (There will be a web UI
>> which will be the front-end to the data, and will show the reports)
>>
>> Java is being used as the backend language for everything (backend of the
>> web UI, as well as the ETL layer)
>>
>> I'm considering:
>>
>> - Using raw Kafka consumers, or Spark Streaming, as the ETL layer
>> (receive raw data from Kafka, standardize & store it)
>>
>> - Using Cassandra, HBase, or raw HDFS, for storing the standardized data,
>> and to allow queries
>>
>> - In the backend of the web UI, I could either use Spark to run queries
>> across the data (mostly filters), or directly run queries against Cassandra
>> / HBase
>>
>> I'd appreciate some thoughts / suggestions on which of these alternatives
>> I should go with (e.g, using raw Kafka consumers vs Spark for ETL, which
>> persistent data store to use, and how to query that data store in the
>> backend of the web UI, for displaying the reports).
>>
>>
>> Thanks.
>>
>


Architecture recommendations for a tricky use case

2016-09-29 Thread Ali Akhtar
I have a somewhat tricky use case, and I'm looking for ideas.

I have 5-6 Kafka producers, reading various APIs, and writing their raw
data into Kafka.

I need to:

- Do ETL on the data, and standardize it.

- Store the standardized data somewhere (HBase / Cassandra / Raw HDFS /
ElasticSearch / Postgres)

- Query this data to generate reports / analytics (There will be a web UI
which will be the front-end to the data, and will show the reports)

Java is being used as the backend language for everything (backend of the
web UI, as well as the ETL layer)

I'm considering:

- Using raw Kafka consumers, or Spark Streaming, as the ETL layer (receive
raw data from Kafka, standardize & store it)

- Using Cassandra, HBase, or raw HDFS, for storing the standardized data,
and to allow queries

- In the backend of the web UI, I could either use Spark to run queries
across the data (mostly filters), or directly run queries against Cassandra
/ HBase

I'd appreciate some thoughts / suggestions on which of these alternatives I
should go with (e.g, using raw Kafka consumers vs Spark for ETL, which
persistent data store to use, and how to query that data store in the
backend of the web UI, for displaying the reports).


Thanks.


Re: Zookeeper hostname/ip change

2016-09-25 Thread Ali Akhtar
Perhaps if you add 1 node, take down existing node, etc?

On Sun, Sep 25, 2016 at 10:37 PM, brenfield111 
wrote:

> I need to change the hostnames and ips for the Zookeeper ensemble
> serving my Kafka cluster.
>
> Will Kafka carry on as usual, along with its existing ZK nodes, after
> making the config changes?
>
> Thanks
>


Re: java.nio.channels.ClosedChannelException in console-consumer.sh

2016-09-16 Thread Ali Akhtar
Some googling indicates that there are issues on AWS / EC2 when using the
private IP, and it's recommended to use the public IP as the advertised
hostname instead.

I have zookeeper and Kafka both running on EC2, and both are in the same
availability zone, so both should be able to talk to each other using the
private IPs.

Shouldn't that be enough? I don't want to expose kafka publicly.
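
One thing that can be checked from the machine running the console consumer is whether the host names in the warnings (e.g. kafka1-1876849043-91zso) actually resolve there. The following is only a rough sketch: the class and method names are made up for illustration, and it assumes the log lines keep the BrokerEndPoint(id,host,port) shape shown in the warnings.

```java
import java.net.InetAddress;
import java.net.UnknownHostException;
import java.util.regex.Matcher;
import java.util.regex.Pattern;

// Hypothetical helper, not part of Kafka: pulls the advertised endpoint out of a
// consumer warning and checks whether the host name resolves on this machine.
class AdvertisedEndpointCheck {
    // Matches BrokerEndPoint(id,host,port) as printed in the warnings.
    private static final Pattern ENDPOINT =
        Pattern.compile("BrokerEndPoint\\((\\d+),([^,]+),(\\d+)\\)");

    // Returns "host:port" from a log line, or null if the line has no endpoint.
    static String hostAndPort(String logLine) {
        Matcher m = ENDPOINT.matcher(logLine);
        return m.find() ? m.group(2) + ":" + m.group(3) : null;
    }

    // True if this machine can resolve the host name the broker advertises.
    static boolean resolves(String host) {
        try {
            InetAddress.getByName(host);
            return true;
        } catch (UnknownHostException e) {
            return false;
        }
    }

    public static void main(String[] args) {
        String line = "from broker [BrokerEndPoint(1,kafka1-1876849043-91zso,9092)] failed";
        String endpoint = hostAndPort(line);
        String host = endpoint.substring(0, endpoint.lastIndexOf(':'));
        System.out.println(endpoint + " resolvable from here: " + resolves(host));
    }
}
```

If the name does not resolve on the client machine, that would point at what the brokers advertise rather than at the security group or the private IPs themselves.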

On Fri, Sep 16, 2016 at 10:48 PM, Ali Akhtar <ali.rac...@gmail.com> wrote:

> I've created a 3 broker kafka cluster, changing only the config values for
> broker id, log.dirs, and zookeeper connect. I left the remaining fields as
> default.
>
> The broker ids are 1, 2, 3. I opened the port 9092 on AWS.
>
> I then created a topic 'test' with replication factor of 2, and 3
> partitions.
>
> When I describe the topic using kafka-topics.sh --describe, it shows:
>
> Topic:test PartitionCount:3 ReplicationFactor:2 Configs:
> Topic: test Partition: 0 Leader: 1 Replicas: 1,3 Isr: 1,3
> Topic: test Partition: 1 Leader: 2 Replicas: 2,1 Isr: 2,1
> Topic: test Partition: 2 Leader: 3 Replicas: 3,2 Isr: 3,2
>
> So it looks like the 3 brokers have successfully connected to each other.
>
> I then tried running
>
> bin/kafka-console-consumer.sh --zookeeper $KAFKA_ZOOKEEPER_CONNECT --topic test --from-beginning
>
> But it began to give me a lot of the following exceptions:
>
> WARN Fetching topic metadata with correlation id 1 for topics [Set(test)] from broker [BrokerEndPoint(1,kafka1-1876849043-91zso,9092)] failed (kafka.client.ClientUtils$)
> java.nio.channels.ClosedChannelException
> at kafka.network.BlockingChannel.send(BlockingChannel.scala:110)
> at kafka.producer.SyncProducer.liftedTree1$1(SyncProducer.scala:80)
> at kafka.producer.SyncProducer.kafka$producer$SyncProducer$$doSend(SyncProducer.scala:79)
> at kafka.producer.SyncProducer.send(SyncProducer.scala:124)
> at kafka.client.ClientUtils$.fetchTopicMetadata(ClientUtils.scala:59)
> at kafka.client.ClientUtils$.fetchTopicMetadata(ClientUtils.scala:94)
> at kafka.consumer.ConsumerFetcherManager$LeaderFinderThread.doWork(ConsumerFetcherManager.scala:66)
> at kafka.utils.ShutdownableThread.run(ShutdownableThread.scala:63)
> [2016-09-16 17:22:11,323] WARN Fetching topic metadata with correlation id 2 for topics [Set(test)] from broker [BrokerEndPoint(3,kafka3-2571399577-96he4,9092)] failed (kafka.client.ClientUtils$)
> java.nio.channels.ClosedChannelException
> at kafka.network.BlockingChannel.send(BlockingChannel.scala:110)
> at kafka.producer.SyncProducer.liftedTree1$1(SyncProducer.scala:80)
> at kafka.producer.SyncProducer.kafka$producer$SyncProducer$$doSend(SyncProducer.scala:79)
> at kafka.producer.SyncProducer.send(SyncProducer.scala:124)
> at kafka.client.ClientUtils$.fetchTopicMetadata(ClientUtils.scala:59)
> at kafka.client.ClientUtils$.fetchTopicMetadata(ClientUtils.scala:94)
> at kafka.consumer.ConsumerFetcherManager$LeaderFinderThread.doWork(ConsumerFetcherManager.scala:66)
> at kafka.utils.ShutdownableThread.run(ShutdownableThread.scala:63)
>
> Likewise, when I run kafka-console-producer.sh, I see errors like:
>
> [2016-09-16 17:24:42,901] WARN Error while fetching metadata with correlation id 1343 : {test=LEADER_NOT_AVAILABLE} (org.apache.kafka.clients.NetworkClient)
> [2016-09-16 17:24:43,012] WARN Error while fetching metadata with correlation id 1344 : {test=LEADER_NOT_AVAILABLE} (org.apache.kafka.clients.NetworkClient)
> [2016-09-16 17:24:43,127] WARN Error while fetching metadata with correlation id 1345 : {test=LEADER_NOT_AVAILABLE} (org.apache.kafka.clients.NetworkClient)
>
> Any ideas what the problem here is?
>
> I'm using kafka_2.11-0.10.0.1
> <http://www-us.apache.org/dist/kafka/0.10.0.1/kafka_2.11-0.10.0.1.tgz>


java.nio.channels.ClosedChannelException in console-consumer.sh

2016-09-16 Thread Ali Akhtar
I've created a 3 broker kafka cluster, changing only the config values for
broker id, log.dirs, and zookeeper connect. I left the remaining fields as
default.

The broker ids are 1, 2, 3. I opened the port 9092 on AWS.

I then created a topic 'test' with replication factor of 2, and 3
partitions.

When I describe the topic using kafka-topics.sh --describe, it shows:

Topic:test PartitionCount:3 ReplicationFactor:2 Configs:
Topic: test Partition: 0 Leader: 1 Replicas: 1,3 Isr: 1,3
Topic: test Partition: 1 Leader: 2 Replicas: 2,1 Isr: 2,1
Topic: test Partition: 2 Leader: 3 Replicas: 3,2 Isr: 3,2

So it looks like the 3 brokers have successfully connected to each other.

I then tried running

bin/kafka-console-consumer.sh --zookeeper $KAFKA_ZOOKEEPER_CONNECT --topic test --from-beginning

But it began to give me a lot of the following exceptions:

WARN Fetching topic metadata with correlation id 1 for topics [Set(test)] from broker [BrokerEndPoint(1,kafka1-1876849043-91zso,9092)] failed (kafka.client.ClientUtils$)
java.nio.channels.ClosedChannelException
at kafka.network.BlockingChannel.send(BlockingChannel.scala:110)
at kafka.producer.SyncProducer.liftedTree1$1(SyncProducer.scala:80)
at kafka.producer.SyncProducer.kafka$producer$SyncProducer$$doSend(SyncProducer.scala:79)
at kafka.producer.SyncProducer.send(SyncProducer.scala:124)
at kafka.client.ClientUtils$.fetchTopicMetadata(ClientUtils.scala:59)
at kafka.client.ClientUtils$.fetchTopicMetadata(ClientUtils.scala:94)
at kafka.consumer.ConsumerFetcherManager$LeaderFinderThread.doWork(ConsumerFetcherManager.scala:66)
at kafka.utils.ShutdownableThread.run(ShutdownableThread.scala:63)
[2016-09-16 17:22:11,323] WARN Fetching topic metadata with correlation id 2 for topics [Set(test)] from broker [BrokerEndPoint(3,kafka3-2571399577-96he4,9092)] failed (kafka.client.ClientUtils$)
java.nio.channels.ClosedChannelException
at kafka.network.BlockingChannel.send(BlockingChannel.scala:110)
at kafka.producer.SyncProducer.liftedTree1$1(SyncProducer.scala:80)
at kafka.producer.SyncProducer.kafka$producer$SyncProducer$$doSend(SyncProducer.scala:79)
at kafka.producer.SyncProducer.send(SyncProducer.scala:124)
at kafka.client.ClientUtils$.fetchTopicMetadata(ClientUtils.scala:59)
at kafka.client.ClientUtils$.fetchTopicMetadata(ClientUtils.scala:94)
at kafka.consumer.ConsumerFetcherManager$LeaderFinderThread.doWork(ConsumerFetcherManager.scala:66)
at kafka.utils.ShutdownableThread.run(ShutdownableThread.scala:63)

Likewise, when I run kafka-console-producer.sh, I see errors like:

[2016-09-16 17:24:42,901] WARN Error while fetching metadata with correlation id 1343 : {test=LEADER_NOT_AVAILABLE} (org.apache.kafka.clients.NetworkClient)
[2016-09-16 17:24:43,012] WARN Error while fetching metadata with correlation id 1344 : {test=LEADER_NOT_AVAILABLE} (org.apache.kafka.clients.NetworkClient)
[2016-09-16 17:24:43,127] WARN Error while fetching metadata with correlation id 1345 : {test=LEADER_NOT_AVAILABLE} (org.apache.kafka.clients.NetworkClient)

Any ideas what the problem here is?

I'm using kafka_2.11-0.10.0.1



Re: which port should I use 9091 or 9092 or 2181 to send messages through kafka when using a client Library?

2016-09-15 Thread Ali Akhtar
Examine server.properties and see which port you're using in there
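
For reference, the broker port is normally set through `listeners` (for example `listeners=PLAINTEXT://:9092`), older configs use `port`, and the broker falls back to 9092 if neither is set. Port 2181 is ZooKeeper's `clientPort` from zookeeper.properties; the Java producer connects to the brokers via `bootstrap.servers`, not to ZooKeeper. Below is a small sketch for pulling the port out of the file contents; the class and method names are made up for illustration.

```java
import java.io.IOException;
import java.io.StringReader;
import java.io.UncheckedIOException;
import java.util.Properties;

// Hypothetical helper: finds the client-facing broker port in server.properties text.
class BrokerPortLookup {
    // Prefers 'listeners' (e.g. PLAINTEXT://:9092), then the older 'port' key,
    // then falls back to 9092, the broker default.
    static int brokerPort(String serverProperties) {
        Properties props = new Properties();
        try {
            props.load(new StringReader(serverProperties));
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
        String listeners = props.getProperty("listeners");
        if (listeners != null && !listeners.trim().isEmpty()) {
            // Use the first listener; the port is whatever follows the last ':'.
            String first = listeners.split(",")[0].trim();
            return Integer.parseInt(first.substring(first.lastIndexOf(':') + 1));
        }
        return Integer.parseInt(props.getProperty("port", "9092").trim());
    }

    public static void main(String[] args) {
        // Sample content; a real check would read config/server.properties instead.
        String sample = "broker.id=0\nlisteners=PLAINTEXT://:9092\n";
        System.out.println("Broker port: " + brokerPort(sample));
    }
}
```

Whatever port this reports is the one to put in the client's broker list.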

On Thu, Sep 15, 2016 at 3:52 PM, kant kodali  wrote:

> Which port should I use, 9091 or 9092 or 2181, to send messages through Kafka
> when using a client library?
> I start Kafka as follows:
> sudo bin/zookeeper-server-start.sh config/zookeeper.properties
> sudo ./bin/kafka-server-start.sh config/server.properties
>
> and I don't see any process running on 9091 or 9092; however, a lot of client
> library examples have a consumer client pointing to 9092, for example here:
> https://github.com/apache/kafka/blob/trunk/examples/src/main/java/kafka/examples/Producer.java#L34
> Shouldn't both producer and consumer point to the ZooKeeper port 2181, which I
> am assuming will do the lookup?
> Thanks, Kant


Re: Publish to 1 topic, consume from N

2016-09-15 Thread Ali Akhtar
It sounds like you can implement the 'mapping service' component yourself
using Kafka.

Have all of your messages go to one Kafka topic. Have one consumer group
listening to this 'everything goes here' topic. This consumer group acts as
your mapping service. It looks at each message and, based on your rules,
sends that message to the topic for those specific rules.

Then you have your consumers listening to the specific topics that they
need. Your mapping service does the job of redirecting messages from the
'everything' topic to the specific topics based on your rules.
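
The rule-matching part of such a mapping service could look roughly like the sketch below. It is plain Java with no Kafka dependency; the class names, topic names, and rules are all made up for illustration. A real service would call `route` inside the consumer's poll loop on the 'everything' topic and produce the message to each returned topic.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.function.Predicate;

// Hypothetical binding-rule table for the 'mapping service'.
class MappingRules {
    // Destination topic -> rule deciding whether a message belongs there.
    private final Map<String, Predicate<String>> rules = new LinkedHashMap<>();

    // Register (or replace) a binding rule at runtime.
    void bind(String topic, Predicate<String> rule) {
        rules.put(topic, rule);
    }

    // Return every destination topic whose rule matches the message.
    List<String> route(String message) {
        List<String> targets = new ArrayList<>();
        for (Map.Entry<String, Predicate<String>> e : rules.entrySet()) {
            if (e.getValue().test(message)) {
                targets.add(e.getKey());
            }
        }
        return targets;
    }
}

class MappingServiceDemo {
    public static void main(String[] args) {
        MappingRules rules = new MappingRules();
        rules.bind("orders", m -> m.startsWith("order:"));
        rules.bind("audit", m -> true);
        // Stand-in for messages read from the 'everything' topic.
        for (String msg : new String[] {"order:42", "ping"}) {
            System.out.println(msg + " -> " + rules.route(msg));
        }
    }
}
```

Because one message can match several rules, it gets copied to several topics, which is close to what the dynamic bindings do on the AMQ side.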

On Thu, Sep 15, 2016 at 1:43 PM, Luiz Cordeiro <
luiz.corde...@mobilityhouse.com> wrote:

> Hello,
>
> We’re considering migrating an AMQ-based platform to Kafka. However our
> application logic needs an AMQ feature called Dynamic Binding, that is, on
> AMQ one publishes messages to an Exchange, which can be dynamically
> configured to deliver a copy of the message to several queues, based on
> binding rules. So when a new client comes alive, it may create its binding
> rules to specify a set of topics to listen to, and receive all the messages
> from these topics on a private queue.
>
> I understand that Kafka neither provides this nor will, as it is not its
> objective, but I was wondering if there’s another component, an overlay to
> Kafka, that could provide this feature while using Kafka behind the scenes
> for the persistence, something like this:
>
> Publisher --> Mapping Service --> Kafka <-- Consumers
>                      ^                          |
>                      |      Binding rules       |
>                      \--------------------------/
>
> Are you aware of such a component? Otherwise, how would you solve this
> issue of publish to 1 place and have it replicated on N topics.
>
> Best Regards,
> Luiz
>


Re: hi

2016-09-15 Thread Ali Akhtar
I'd post to the mailing list again with a new subject and ask that.

On Thu, Sep 15, 2016 at 1:52 PM, kant kodali <kanth...@gmail.com> wrote:

> I used node.js client libraries for all three and yes, I want to make sure
> I am comparing apples to apples, so I make it as equivalent as possible.
> Again, the big question is: what is the right setup for Kafka to be
> comparable with the others I mentioned in my previous email?
>
> On Thu, Sep 15, 2016 1:47 AM, Ali Akhtar ali.rac...@gmail.com wrote:
> The issue is clearly that you're running out of resources, so I would add
> more brokers and/or larger instances.
>
> You're also using Node which is not the best for performance. A compiled
> language such as Java would give you the best performance.
>
> Here's a case study that should help:
> https://engineering.linkedin.com/kafka/benchmarking-apache-kafka-2-million-writes-second-three-cheap-machines
>
> Good luck, let us know how it goes
>
> On Thu, Sep 15, 2016 at 1:42 PM, kant kodali <kanth...@gmail.com> wrote:
>
> yeah..
> I tried it with 10 messages with a single broker and only one partition;
> that looked instantaneous, ~5K messages/sec for a data size of 1KB.
> I tried it with 1000 messages; that looked instantaneous as well, ~5K
> messages/sec for a data size of 1KB.
> I tried it with 10K messages with a single broker and only one partition;
> things started to go down, ~1K messages/sec for a data size of 1KB.
> Is having only one partition on a single broker bad? My goal is to run
> some basic benchmarks on NATS & NSQ & KAFKA.
> I have the same environment for all three (NATS & NSQ & KAFKA):
> a broker on Machine 1, a producer on Machine 2, a consumer on Machine 3,
> with a data size of 1KB (so each message is 1KB) and m4.xlarge AWS
> instance.
> I have pushed 300K messages with NATS and it was able to handle them easily;
> receive throughput was 5K messages/sec. I have pushed 300K messages with NSQ
> and receive throughput was 2K messages/sec. I am unable to push 300K messages
> with Kafka with the above configuration and environment, so at this point my
> biggest question is: what is a fair setup for Kafka so it's comparable with
> NATS and NSQ?
> kant
>
> On Thu, Sep 15, 2016 12:43 AM, Ali Akhtar ali.rac...@gmail.com wrote:
> Lower the workload gradually, start from 10 messages, increase to 100, then
> 1000, and so on. See if it slows down as the workload increases. If so, you
> need more brokers + partitions to handle the workload.
>
> On Thu, Sep 15, 2016 at 12:42 PM, kant kodali <kanth...@gmail.com> wrote:
>
> > m4.xlarge
> >
> > On Thu, Sep 15, 2016 12:33 AM, Ali Akhtar ali.rac...@gmail.com wrote:
> > What's the instance size that you're using? With 300k messages your
> > single broker might not be able to handle it.
> >
> > On Thu, Sep 15, 2016 at 12:30 PM, kant kodali <kanth...@gmail.com> wrote:
> >
> > My goal is to test the throughput (#messages per second) given my setup
> > and with a data size of 1KB. if you guys a
Re: hi

2016-09-15 Thread Ali Akhtar
The issue is clearly that you're running out of resources, so I would add
more brokers and/or larger instances.
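
To see at which message count the rate starts to drop, the ramp-up (10, 100, 1000, and so on) can be scripted. The following is only a rough sketch: `ThroughputRamp` and its stand-in sender are made up for illustration, and a real run would plug the actual producer's send call into `sender`.

```java
import java.util.function.Consumer;

// Hypothetical harness: times how fast 'sender' accepts messages at growing counts.
class ThroughputRamp {
    // Sends 'count' payloads of 'size' bytes through 'sender' and
    // returns the observed rate in messages per second.
    static double measure(int count, int size, Consumer<byte[]> sender) {
        byte[] payload = new byte[size];
        long start = System.nanoTime();
        for (int i = 0; i < count; i++) {
            sender.accept(payload);
        }
        long elapsedNanos = Math.max(1, System.nanoTime() - start);
        return count / (elapsedNanos / 1_000_000_000.0);
    }

    public static void main(String[] args) {
        // Stand-in sender that only counts bytes; replace with a real producer send.
        long[] bytesSent = {0};
        Consumer<byte[]> sender = p -> bytesSent[0] += p.length;
        for (int count : new int[] {10, 100, 1_000, 10_000, 100_000, 300_000}) {
            System.out.printf("%d messages: %.0f msg/sec%n", count, measure(count, 1024, sender));
        }
    }
}
```

Note that this only times the send calls. With an asynchronous producer it measures how fast messages are handed over, not how fast they are delivered, so the consumer side needs its own timing.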

You're also using Node which is not the best for performance. A compiled
language such as Java would give you the best performance.

Here's a case study that should help:
https://engineering.linkedin.com/kafka/benchmarking-apache-kafka-2-million-writes-second-three-cheap-machines

Good luck, let us know how it goes

On Thu, Sep 15, 2016 at 1:42 PM, kant kodali <kanth...@gmail.com> wrote:

> yeah..
> I tried it with 10 messages with a single broker and only one partition;
> that looked instantaneous, ~5K messages/sec for a data size of 1KB.
> I tried it with 1000 messages; that looked instantaneous as well, ~5K
> messages/sec for a data size of 1KB.
> I tried it with 10K messages with a single broker and only one partition;
> things started to go down, ~1K messages/sec for a data size of 1KB.
> Is having only one partition on a single broker bad? My goal is to run
> some basic benchmarks on NATS & NSQ & KAFKA.
> I have the same environment for all three (NATS & NSQ & KAFKA):
> a broker on Machine 1, a producer on Machine 2, a consumer on Machine 3,
> with a data size of 1KB (so each message is 1KB) and m4.xlarge AWS
> instance.
> I have pushed 300K messages with NATS and it was able to handle them easily;
> receive throughput was 5K messages/sec. I have pushed 300K messages with NSQ
> and receive throughput was 2K messages/sec. I am unable to push 300K messages
> with Kafka with the above configuration and environment, so at this point my
> biggest question is: what is a fair setup for Kafka so it's comparable with
> NATS and NSQ?
> kant
>
> On Thu, Sep 15, 2016 12:43 AM, Ali Akhtar ali.rac...@gmail.com wrote:
> Lower the workload gradually, start from 10 messages, increase to 100, then
> 1000, and so on. See if it slows down as the workload increases. If so, you
> need more brokers + partitions to handle the workload.
>
> On Thu, Sep 15, 2016 at 12:42 PM, kant kodali <kanth...@gmail.com> wrote:
>
> > m4.xlarge
> >
> > On Thu, Sep 15, 2016 12:33 AM, Ali Akhtar ali.rac...@gmail.com wrote:
> > What's the instance size that you're using? With 300k messages your
> > single broker might not be able to handle it.
> >
> > On Thu, Sep 15, 2016 at 12:30 PM, kant kodali <kanth...@gmail.com> wrote:
> >
> > My goal is to test the throughput (#messages per second) given my setup
> > and with a data size of 1KB. If you guys already have some idea on these
> > numbers, that would be helpful as well.
> >
> > On Thu, Sep 15, 2016 12:24 AM, kant kodali kanth...@gmail.com wrote:
> > 172.* is all private IPs for my machine, I double checked it. I have not
> > changed any default settings. I don't know how to use kafka-consumer.sh
> > or kafka-producer.sh because it looks like they want me to specify a
> > group, and I didn't create any consumer group because I am using a single
> > producer and consumer. Is there a default group? Also, I am receiving
> > messages, but very late. I sent about 300K messages using the node.js
> > client and I am receiving at a very low rate. Really not sure what is
> > going on?
> >
> > On Thu, Sep 15, 2016 12:06 AM, Ali Akhtar ali.rac...@gmail.com
