Re: Kafka consumer to unzip stream of .gz files and read

2018-05-21 Thread Liam Clarke
Also, if I recall correctly, the console producer uses a BufferedReader to
read from stdin and assumes that a newline terminates a message, so every
byte of value 0A in your gzipped file will be treated as a message boundary,
splitting the file across many (corrupt) messages.

I suggest using a Python producer to send your gzipped file.
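
A minimal sketch of that suggestion, assuming kafka-python and the broker,
topic and file from your own commands (localhost:9092, Airport,
~/Downloads/stocks.json.gz); it reads the whole .gz file as raw bytes and
sends it as a single message, so no newline splitting can happen:

# Sketch only: send the gzipped file as one opaque byte payload.
# Assumes kafka-python is installed and the file fits within the broker's
# and producer's message size limits (message.max.bytes / max_request_size).
import os
from kafka import KafkaProducer

producer = KafkaProducer(bootstrap_servers='localhost:9092')

path = os.path.expanduser('~/Downloads/stocks.json.gz')  # path from the original post
with open(path, 'rb') as f:
    payload = f.read()

future = producer.send('Airport', value=payload)
future.get(timeout=30)   # block until the broker acknowledges the write
producer.close()

Your existing consumer code should then see a complete gzip stream in
message.value.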

Regards,

Liam Clarke

On Tue, May 22, 2018 at 10:59 AM, Koushik Chitta <
kchi...@microsoft.com.invalid> wrote:

> You should read the message value as a byte array rather than a string.
> Another approach: while producing, you can use Kafka compression = GZIP to
> get a similar result.
>
>
> -Original Message-
> From: mayur shah 
> Sent: Monday, May 21, 2018 1:50 AM
> To: users@kafka.apache.org; d...@kafka.apache.org
> Subject: Kafka consumer to unzip stream of .gz files and read
>
> Hi Team,
>
> Greetings!
>
> I am facing an issue with a Kafka consumer using Python; I hope you can
> help us resolve it.
>
> Kafka consumer to unzip stream of .gz files and read <
> https://stackoverflow.com/questions/50232186/kafka-consumer-to-unzip-stream-of-gz-files-and-read>
>
> The Kafka producer is sending .gz files, but we are not able to decompress
> and read the files at the consumer end. We get the error "IOError: Not a
> gzipped file".
>
> Producer -
>
> bin/kafka-console-producer.sh --broker-list localhost:9092 --topic Airport
> < ~/Downloads/stocks.json.gz
>
> Consumer -
>
> import sys
> import gzip
> import StringIO
> from kafka import KafkaConsumer
>
> consumer = KafkaConsumer(KAFKA_TOPIC, bootstrap_servers=KAFKA_BROKERS)
> try:
>     for message in consumer:
>         f = StringIO.StringIO(message.value)
>         gzip_f = gzip.GzipFile(fileobj=f)
>         unzipped_content = gzip_f.read()
>         content = unzipped_content.decode('utf8')
>         print(content)
> except KeyboardInterrupt:
>     sys.exit()
>
> Error at consumer -
>
> Traceback (most recent call last):
>   File "consumer.py", line 18, in 
> unzipped_content = gzip_f.read()
>   File "/usr/lib64/python2.6/gzip.py", line 212, in read
> self._read(readsize)
>   File "/usr/lib64/python2.6/gzip.py", line 255, in _read
> self._read_gzip_header()
>   File "/usr/lib64/python2.6/gzip.py", line 156, in _read_gzip_header
> raise IOError, 'Not a gzipped file'
> IOError: Not a gzipped file
>
> Regards,
> Mayur
>


Kafka behind NAT

2018-05-21 Thread 周正虎
We have Kafka behind NAT with *only one broker*.
Let's say we have an internal (A) and an external (B) network.

When we try to reach the broker from the external network (with the
bootstrap.servers parameter set to the B address), the broker, as expected,
responds with the internal network's address (A), which is not resolvable
from the external network. We cannot set advertised.listeners to the
external network's address because the broker is also used from the
internal network.

I hope that somebody has dealt with a similar problem.
Thanks for any help.

RE: Kafka consumer to unzip stream of .gz files and read

2018-05-21 Thread Koushik Chitta
You should read the message value as a byte array rather than a string.
Another approach: while producing, you can use Kafka compression = GZIP to
get a similar result.
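
For the second approach, a minimal sketch assuming kafka-python; the topic,
broker address and file name are placeholders taken from the thread. With
compression_type='gzip' the client compresses record batches on the wire and
decompresses them transparently on the consumer side, so the application only
ever deals with plain payload bytes:

# Sketch only: let the Kafka client handle gzip instead of shipping .gz files.
from kafka import KafkaProducer, KafkaConsumer

producer = KafkaProducer(bootstrap_servers='localhost:9092',
                         compression_type='gzip')   # compress batches on the wire
with open('stocks.json', 'rb') as f:                # uncompressed source file (assumed)
    for line in f:
        producer.send('Airport', value=line.rstrip(b'\n'))
producer.flush()

# message.value arrives as plain bytes; decompression already happened
# inside the client, so no gzip handling is needed here.
consumer = KafkaConsumer('Airport', bootstrap_servers='localhost:9092')
for message in consumer:
    print(message.value.decode('utf8'))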


-Original Message-
From: mayur shah  
Sent: Monday, May 21, 2018 1:50 AM
To: users@kafka.apache.org; d...@kafka.apache.org
Subject: Kafka consumer to unzip stream of .gz files and read

Hi Team,

Greetings!

I am facing an issue with a Kafka consumer using Python; I hope you can help
us resolve it.

Kafka consumer to unzip stream of .gz files and read 


The Kafka producer is sending .gz files, but we are not able to decompress
and read the files at the consumer end. We get the error "IOError: Not a
gzipped file".

Producer -

bin/kafka-console-producer.sh --broker-list localhost:9092 --topic Airport < 
~/Downloads/stocks.json.gz

Consumer -

import sys
import gzip
import StringIO
from kafka import KafkaConsumer

consumer = KafkaConsumer(KAFKA_TOPIC, bootstrap_servers=KAFKA_BROKERS)
try:
    for message in consumer:
        f = StringIO.StringIO(message.value)
        gzip_f = gzip.GzipFile(fileobj=f)
        unzipped_content = gzip_f.read()
        content = unzipped_content.decode('utf8')
        print(content)
except KeyboardInterrupt:
    sys.exit()

Error at consumer -

Traceback (most recent call last):
  File "consumer.py", line 18, in 
unzipped_content = gzip_f.read()
  File "/usr/lib64/python2.6/gzip.py", line 212, in read
self._read(readsize)
  File "/usr/lib64/python2.6/gzip.py", line 255, in _read
self._read_gzip_header()
  File "/usr/lib64/python2.6/gzip.py", line 156, in _read_gzip_header
raise IOError, 'Not a gzipped file'
IOError: Not a gzipped file

Regards,
Mayur


Re: Tools to view logs and/or clean up problems?

2018-05-21 Thread Skip Montanaro
> Unfortunately, Kafka's careful approach to shutdown means my script is
> probably not given enough time for a proper shutdown, and upon reboot, I
> often find my brokers won't accept connections.

Never mind. I believe I shot myself in the foot elsewhere. Not directly a
Kafka problem.

Skip


Re: Best practices

2018-05-21 Thread Matthias J. Sax
If you specify some bootstrap.servers, the producer will learn about all
available brokers automatically after connecting, by fetching cluster
metadata from the first broker it connects to. Thus, in practice it is
usually sufficient to specify 3 to 5 brokers (in case one is down, the
producer can "bootstrap" itself by connecting to any other broker in the
list initially).

Also note that producers do not write to an arbitrary broker: for each
partition there is a dedicated leader, and the producer sends all writes to
the leader (as explained in the blog post I shared in my last reply
--- please read it:
https://www.confluent.io/blog/hands-free-kafka-replication-a-lesson-in-operational-simplicity/)

As a best practice, it's recommended to configure producer retries via the
config parameter `retries` (default is zero). In that case, if a write
fails, the producer will retry the write internally (potentially against a
different broker in case the leader changed). Only after all retries are
exhausted would you get a callback that indicates the failed write.
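
To make that concrete, a small sketch of retries plus a per-record callback,
using the Python client (kafka-python) purely for illustration --- the same
ideas apply to the Java producer; broker addresses and the topic name are
placeholders:

# Sketch only: producer with internal retries and a callback/errback per record.
from kafka import KafkaProducer

producer = KafkaProducer(
    bootstrap_servers=['broker1:9092', 'broker2:9092', 'broker3:9092'],
    retries=5,      # retry failed writes internally before surfacing an error
    acks='all',     # wait until the in-sync replicas have the write
)

def on_success(record_metadata):
    print('written to %s [%d] @ offset %d' % (
        record_metadata.topic, record_metadata.partition, record_metadata.offset))

def on_error(exc):
    # only called after all internal retries are exhausted
    print('write failed: %r' % exc)

future = producer.send('my-topic', b'some value')
future.add_callback(on_success)
future.add_errback(on_error)
producer.flush()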


-Matthias

On 5/21/18 5:40 AM, Pavel Sapozhnikov wrote:
> If a process failed to write a message into one broker what is the best
> practice to write to same topic on different broker? Is there one? I should
> be able to get a list of brokers programmatically from zk path /brokers/ids
> ?
> 
> On Sun, May 20, 2018, 3:21 PM Matthias J. Sax  wrote:
> 
>> You can register a callback for each sent record to learn about
>> successful write or fail:
>>
>>> producer.send(record, callback);
>>
>> For replication, you don't need to send twice. If the replication factor
>> is configured broker side, the brokers take care of replication
>> automatically.
>>
>> You can also configure when you want to be informed about successful
>> write: before or after replication.
>>
>> This blog post should help:
>>
>> https://www.confluent.io/blog/hands-free-kafka-replication-a-lesson-in-operational-simplicity/
>>
>>
>>
>> -Matthias
>>
>> On 5/20/18 11:00 AM, Pavel Sapozhnikov wrote:
>>> Hello
>>>
>>> What are the best practices when it comes to publishing a message into
>>> kafka. When sending a message into Kafka is it possible to know if that
>>> message has successfully been published? If not, what is the best
>> practice
>>> to know when messages are not getting published?
>>>
>>> Second question.
>>>
>>> If I have two kafka brokers and very simplistic one kafka topic
>> replicated
>>> on both. Do I need to send to both brokers. What are the best practices
>> for
>>> that.
>>>
>>
>>
> 





Re: Removing the Kafka DEBUG logs in catalina.out using log4j.properties

2018-05-21 Thread Karthick Kumar
Okay Kaufman, will check on that

On Sun, May 20, 2018 at 12:08 AM, Kaufman Ng  wrote:

> Karthick,
>
> I suspect that the log4j.properties in your webapp is not being picked up
> by Tomcat, or that Tomcat is somehow overriding your webapp's log4j setup.
>
> On Tue, May 15, 2018 at 2:19 AM, Karthick Kumar 
> wrote:
>
> > Hi Andras/Kaufman,
> >
> > I have tried both logger properties, but they are not working for me.
> >
> > Here I share some of the DEBUG log messages which are printed on my
> > Tomcat node:
> >
> >
> > DEBUG AbstractCoordinator:704 - Sending Heartbeat request for group
> > apptivodb5-newsfeed-messages-4-consumer to coordinator xx.xx.xx.xx:9092
> > (id: 2147483645 rack: null)
> > DEBUG AbstractCoordinator:717 - Received successful Heartbeat response for
> > group apptivodb4-newsfeed-messages-4-consumer
> > DEBUG AbstractCoordinator:717 - Received successful Heartbeat response for
> > group apptivodb5-newsfeed-messages-4-consumer
> > DEBUG Fetcher:180 - Sending fetch for partitions
> > [apptivodb5-newsfeed-messages-0-1] to broker xx.xx.xx.xx:9092 (id: 2 rack:
> > null)
> > DEBUG Fetcher:180 - Sending fetch for partitions
> > [apptivodb1-campaign-launcher-1] to broker xx.xx.xx.xx:9092 (id: 2 rack:
> > null)
> > DEBUG Fetcher:180 - Sending fetch for partitions
> > [apptivodb8-campaign-tracker-email-0] to broker xx.xx.xx.xx:9092 (id: 2
> > rack: null)
> > DEBUG Fetcher:180 - Sending fetch for partitions
> > [apptivodb5-campaign-tracker-email-1] to broker xx.xx.xx.xx:9092 (id: 2
> > rack: null)
> >
> >
> >
> > On Mon, May 14, 2018 at 12:46 AM, Kaufman Ng 
> wrote:
> >
> > > Is the log coming from your application on Tomcat or Kafka? Make sure
> > > you set the right log4j properties file. In general you can set this in
> > > log4j.properties like this:
> > >
> > > log4j.rootLogger=INFO, stdout
> > >
> > > The line in your log4j.properties may look a little bit different. The
> > > key thing is to set the root logger to "INFO".
> > >
> > > Hope this helps.
> > >
> > >
> > > On Sun, May 13, 2018 at 9:10 AM, Andras Beni 
> > > wrote:
> > >
> > > > Hi Karthick,
> > > >
> > > > You probably want to add this line to your log4j.properties:
> > > > log4j.logger.org.apache.kafka=INFO
> > > > This will remove all DEBUG lines where the logger name starts with
> > > > org.apache.kafka.
> > > >
> > > > HTH,
> > > > Andras
> > > >
> > > > On Fri, May 11, 2018 at 9:28 AM, Karthick Kumar <
> kku...@apptivo.co.in>
> > > > wrote:
> > > >
> > > > > Hi,
> > > > >
> > > > > I'm using a Tomcat node as a Kafka consumer. It prints INFO, DEBUG
> > > > > and ERROR logs.
> > > > >
> > > > > When I analyzed the log file, the DEBUG logs were taking the most
> > > > > space, so I'm having a disk space issue.
> > > > >
> > > > > I'm using *log4j.properties* for managing the logs. Now I want to
> > > > > remove the DEBUG logs from my log file.
> > > > >
> > > > > Can anyone please guide me on removing the DEBUG logs?
> > > > >
> > > > > --
> > > > > With Regards,
> > > > > Karthick.K
> > > > >
> > > >
> > >
> > >
> > >
> > > --
> > > Kaufman Ng
> > > +1 646 961 8063
> > > Solutions Architect | Confluent | www.confluent.io
> > >
> >
> >
> >
> > --
> > With Regards,
> > Karthick.K
> >
>
>
>
> --
> Kaufman Ng
> +1 646 961 8063
> Solutions Architect | Confluent | www.confluent.io
>



-- 
With Regards,
Karthick.K


Tools to view logs and/or clean up problems?

2018-05-21 Thread Skip Montanaro
I operate in a work environment where all the servers are rebooted once a
week whether they need it or not. An internal subsystem gives me the
opportunity to run a command on shutdown, so I wrote a simple shell script
that runs kafka-server-stop.sh, waits a bit, then stops ZooKeeper.

Unfortunately, Kafka's careful approach to shutdown means my script is
probably not given enough time for a proper shutdown, and upon reboot, I
often find my brokers won't accept connections. Sometimes they just need a
bit of time to recover. A few times, I've simply given up and deleted my
kafka-logs-N and zookeeper directories.

The latest reboot was early Saturday. Two days later, my dev clients still
can't connect to my dev brokers. Are there tools available which allow me
to view logs and perhaps recover state? None of the shell scripts in
.../bin looked promising.

Thanks,

Skip Montanaro


Re: Best practices

2018-05-21 Thread Pavel Sapozhnikov
If a process fails to write a message to one broker, what is the best
practice for writing to the same topic on a different broker? Is there one?
Should I be able to get a list of brokers programmatically from the ZK path
/brokers/ids?

On Sun, May 20, 2018, 3:21 PM Matthias J. Sax  wrote:

> You can register a callback for each sent record to learn about
> successful write or fail:
>
> > producer.send(record, callback);
>
> For replication, you don't need to send twice. If the replication factor
> is configured broker side, the brokers take care of replication
> automatically.
>
> You can also configure when you want to be informed about successful
> write: before or after replication.
>
> This blog post should help:
>
> https://www.confluent.io/blog/hands-free-kafka-replication-a-lesson-in-operational-simplicity/
>
>
>
> -Matthias
>
> On 5/20/18 11:00 AM, Pavel Sapozhnikov wrote:
> > Hello
> >
> > What are the best practices when it comes to publishing a message into
> > kafka. When sending a message into Kafka is it possible to know if that
> > message has successfully been published? If not, what is the best
> practice
> > to know when messages are not getting published?
> >
> > Second question.
> >
> > If I have two kafka brokers and very simplistic one kafka topic
> replicated
> > on both. Do I need to send to both brokers. What are the best practices
> for
> > that.
> >
>
>


Re: streams windowing question

2018-05-21 Thread Damian Guy
Hi Peter,

It depends on how you specify the JoinWindow, but using `JoinWindows.of(10
seconds)` would mean that a record will join with any other record with the
matching key that arrived between 10 seconds before it arrived and 10
seconds after it arrived.

So your example is correct. You would need a JoinWindow large enough to
allow for the expected difference in arrival time.

Thanks,
Damian

On Sat, 19 May 2018 at 19:50 Peter Kleinmann  wrote:

> Hi Damian,
>
> thank you for the informative reply. I think this answers 95% of my
> questions (or maybe 100% and I missed the explanation).
>
> what is still unresolved is how to handle trades and risks that arrive far
> apart.
>
> Suppose we have
>
> timeToAllowAJoin = 10  seconds
>
> and we have
>
> Time | Trade          | Risk
> 0s   --
>
> 1s     Trade(t1, v1)
> 4s     Trade(t2, v1)
> 5s     Trade(t3, v1)
> 8s                      Risk(t2, v1)
> 10s  --
> 14s                     Risk(t1, v1)
> 20s  --
> 27s                     Risk(t4, v1)
> 30s  --
> 37s                     Risk(t3, v1)
> 40s  --
> 47s    Trade(t4, v1)
> 50s  --
>
>
> I think
> trades.join(risk, valueJoiner, JoinWindows.of(timeToAllowAJoin));
>
> will join
> Risk(t2,v1) -> Trade(t2,v1)
> for window 0-10s efficiently
>
> but I don't think I get the other joins, even running
> trades.join(risk, valueJoiner, JoinWindows.of(timeToAllowAJoin));
> for windows
> 10s - 20s
> 20s - 30s
> 40s - 40s
> If this is correct, then is there another common way to handle a scenario
> like the one above?
>
> thanks in advance,
>
> Peter
>
>
>
>
>
>
>
> On Fri, May 18, 2018 at 6:27 PM, Damian Guy  wrote:
>
>> Hi,
>>
>> In order to join the two streams they need to have the same key and the
>> same number of partitions in each topic. If they don't have the same key
>> you can force a repartition by using:
>>
>> `stream.selectKey(KeyValueMapper)`
>>
>> if the number of partitions is also different you could do:
>> `stream.selectKey(KeyValueMapper).through("your-new-topic")`
>>
>> You would need to create "your-new-topic" in advance with the correct
>> number of partitions.
>>
>> Now assuming that we have the same key and the same number of partitions,
>> the join is something like:
>>
>> `trades.join(risk, valueJoiner, JoinWindows.of(timeToAllowAJoin));`
>>
>> Because the trade and risk have the same key when a trade or risk event
>> arrives you will only join against the corresponding event (within the
>> time
>> window specified in the join). For example:
>>
>> Trade 
>> Trade 
>> Risk   -> join(Trade )
>> Risk -> join(Trade )
>>
>> Note: if multiple events for the same key arrive within the same
>> JoinWindow
>> you will get multiple outputs. However, you could prevent this from going
>> downstream by using `transformValues(..)` after the join. You would attach
>> a StateStore to the `transformValues`, i.e., by first creating the store
>> and then passing in the store name as a param to the method. Then when a
>> join result for a given key arrives, your transformer would first check in
>> the store if there was already a result, if there isn't a result update
>> the
>> store and send the result downstream. If there is a result you drop it.
>>
>> Regards,
>> Damian
>>
>>
>>
>>  -
>>
>> On Fri, 18 May 2018 at 22:57 Peter Kleinmann  wrote:
>>
>> > Dear community, sorry in advance for what will be a newbie question:
>> >
>> >
>> > suppose I have two topics
>> > trades
>> > risks
>> >
>> > and I want to join a trade in the trades topic to a risk message in the
>> > risks topic by fields tradeId, and version, which exist in both trade
>> and
>> > risk messages.
>> >
>> > Seems I can naturally create streams on top of each topic, but here is
>> the
>> > question:
>> >
>> > Suppose in one period between time boundary b0 and b1 trades t1 and t2
>> > arrive, and risk r1 matching t1 arrives.
>> >
>> > In the next period, risk r2 arrives matching t2.
>> >
>> > a) How do I join r2 to t2?
>> >
>> > b) How do I not reprocess t1 and r1?
>> >
>> > I'm going to have between 2 million and 25 million trades and risks a
>> day,
>> > so once a trade and risk has been matched, I dont want to handle them
>> > again.
>> >
>> > Do I need to sink the kafka topics to something like postgres, and have
>> a
>> > umatched trades table
>> > unmatched risks table
>> > matched table
>> >
>> > Many Many Thanks in Advance!!!
>> >
>>
>
>


Unclear client-to-broker communication

2018-05-21 Thread chw
Hi everybody,

the communication between the client and the broker is unclear to me.
The documentation states:

> The client initiates a socket connection and then writes a sequence of
> request messages and reads back the corresponding response message. No
> handshake is required on connection or disconnection.

Does the client hold the TCP connection for its whole lifecycle? That
is, does the client connect once to the broker and keep the connection for
all subsequent requests/messages (as opposed to an HTTP request)?

As far as I know, TCP requires a 3-way handshake to establish a connection.
However, the documentation states that no handshake is required. Could
anybody explain that point in more detail?

> TCP is happier if you maintain persistent connections used for many
> requests to amortize the cost of the TCP handshake, but beyond this
> penalty connecting is pretty cheap.
I do not understand what the purpose of this sentence is. On the one
hand, TCP is explained a little. On the other hand, a justification
concerning performance is made. But: none of this information helps the
user. Should I, as user, ensure that a connection is maintained
persistently or does Kafka do that for me?

It would be great if someone could update the documentation accordingly.

Regards,
Christian




Kafka consumer to unzip stream of .gz files and read

2018-05-21 Thread mayur shah
Hi Team,

Greetings!

I am facing an issue with a Kafka consumer using Python; I hope you can help
us resolve it.

Kafka consumer to unzip stream of .gz files and read


The Kafka producer is sending .gz files, but we are not able to decompress
and read the files at the consumer end. We get the error "IOError: Not a
gzipped file".

Producer -

bin/kafka-console-producer.sh --broker-list localhost:9092 --topic Airport
< ~/Downloads/stocks.json.gz

Consumer -

import sys
import gzip
import StringIO
from kafka import KafkaConsumer

consumer = KafkaConsumer(KAFKA_TOPIC, bootstrap_servers=KAFKA_BROKERS)
try:
    for message in consumer:
        f = StringIO.StringIO(message.value)
        gzip_f = gzip.GzipFile(fileobj=f)
        unzipped_content = gzip_f.read()
        content = unzipped_content.decode('utf8')
        print(content)
except KeyboardInterrupt:
    sys.exit()

Error at consumer -

Traceback (most recent call last):
  File "consumer.py", line 18, in 
unzipped_content = gzip_f.read()
  File "/usr/lib64/python2.6/gzip.py", line 212, in read
self._read(readsize)
  File "/usr/lib64/python2.6/gzip.py", line 255, in _read
self._read_gzip_header()
  File "/usr/lib64/python2.6/gzip.py", line 156, in _read_gzip_header
raise IOError, 'Not a gzipped file'
IOError: Not a gzipped file

Regards,
Mayur