[jira] [Commented] (KAFKA-7630) Clarify that broker doesn't need scram username/password for delegation tokens

2018-11-14 Thread Andras Beni (JIRA)


[ 
https://issues.apache.org/jira/browse/KAFKA-7630?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16687615#comment-16687615
 ] 

Andras Beni commented on KAFKA-7630:


[~asasvari], [~viktorsomogyi] you might want to take a look at this.

> Clarify that broker doesn't need scram username/password for delegation tokens
> --
>
> Key: KAFKA-7630
> URL: https://issues.apache.org/jira/browse/KAFKA-7630
> Project: Kafka
>  Issue Type: Improvement
>  Components: documentation, security
>Affects Versions: 2.0.0
>Reporter: Andras Beni
>Priority: Minor
>
> [Documentation|https://kafka.apache.org/documentation/#security_token_authentication]
>  on delegation tokens refers to the SCRAM 
> [configuration|https://kafka.apache.org/documentation/#security_sasl_scram_brokerconfig]
>  section. However, in a setup where only delegation tokens use SCRAM and all 
> other authentication goes via Kerberos, {{ScramLoginModule}} does not need 
> {{username}} and {{password}}.
> This is not obvious from the documentation.
> I believe the same is true for setups where SCRAM is used by clients but 
> inter-broker communication is GSSAPI or PLAIN, but I have not tested it.
>  



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Created] (KAFKA-7630) Clarify that broker doesn't need scram username/password for delegation tokens

2018-11-14 Thread Andras Beni (JIRA)
Andras Beni created KAFKA-7630:
--

 Summary: Clarify that broker doesn't need scram username/password 
for delegation tokens
 Key: KAFKA-7630
 URL: https://issues.apache.org/jira/browse/KAFKA-7630
 Project: Kafka
  Issue Type: Improvement
  Components: documentation, security
Affects Versions: 2.0.0
Reporter: Andras Beni


[Documentation|https://kafka.apache.org/documentation/#security_token_authentication]
 on delegation tokens refers to the SCRAM 
[configuration|https://kafka.apache.org/documentation/#security_sasl_scram_brokerconfig]
 section. However, in a setup where only delegation tokens use SCRAM and all 
other authentication goes via Kerberos, {{ScramLoginModule}} does not need 
{{username}} and {{password}}.

This is not obvious from the documentation.

I believe the same is true for setups where SCRAM is used by clients but 
inter-broker communication is GSSAPI or PLAIN, but I have not tested it.
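For illustration, a minimal sketch of such a broker setup. All names, paths, and the keytab are assumptions for this example, not taken from the issue: GSSAPI handles inter-broker and regular client authentication, and {{ScramLoginModule}} is listed only so that clients can authenticate with delegation tokens, with no {{username}}/{{password}} on the broker side.

{code:java}
// Hypothetical broker JAAS sketch (hostnames and keytab path are placeholders).
// Inter-broker traffic authenticates via Kerberos (GSSAPI); SCRAM is enabled
// only so clients can present delegation tokens, so ScramLoginModule carries
// no username/password here.
KafkaServer {
    com.sun.security.auth.module.Krb5LoginModule required
        useKeyTab=true
        storeKey=true
        keyTab="/etc/security/keytabs/kafka.keytab"
        principal="kafka/broker1.example.com@EXAMPLE.COM";

    org.apache.kafka.common.security.scram.ScramLoginModule required;
};
{code}

And the matching broker properties (again, only a sketch):

{code:java}
sasl.enabled.mechanisms=GSSAPI,SCRAM-SHA-256,SCRAM-SHA-512
sasl.mechanism.inter.broker.protocol=GSSAPI
{code}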

 



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (KAFKA-7628) KafkaStream is not closing

2018-11-14 Thread Ozgur (JIRA)


[ 
https://issues.apache.org/jira/browse/KAFKA-7628?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16687486#comment-16687486
 ] 

Ozgur commented on KAFKA-7628:
--

Hi John,

It returns *false* from the close() method, but the state is NOT_RUNNING. And it is 
still bound to the TCP ports after a few minutes. After trying to close the stream, 
some sockets were in the {{CLOSE_WAIT}} state, as in this output:

 
$ sudo lsof -i -n -P | grep 9092
java      29457          ozgur  133u  IPv6 0x531e550533f38283      0t0    TCP 
x.27.227.182:54419->x.x.164.33:9092 (ESTABLISHED)

java      29457          ozgur  134u  IPv6 0x531e55051a789ec3      0t0    TCP 
x.27.227.182:54420->x.x.164.45:9092 (ESTABLISHED)

java      29457          ozgur  135u  IPv6 0x531e55051a789903      0t0    TCP 
x.27.227.182:54421->x.x.164.25:9092 (ESTABLISHED)

java      29457          ozgur  136u  IPv6 0x531e55051a78aa43      0t0    TCP 
x.27.227.182:54422->x.x.164.25:9092 (ESTABLISHED)

java      29457          ozgur  140u  IPv6 0x531e55051a78c703      0t0    TCP 
x.27.227.182:54423->x.x.164.25:9092 (ESTABLISHED)

java      29457          ozgur  141u  IPv6 0x531e55051a78a483      0t0    TCP 
x.27.227.182:54424->x.x.164.45:9092 (CLOSE_WAIT)
java      29457          ozgur  141u  IPv6 0x531e55051a78a483      0t0    TCP 
x.27.227.182:54425->x.x.164.45:9092 (CLOSE_WAIT)

java      29457          ozgur  141u  IPv6 0x531e55051a78a483      0t0    TCP 
x.27.227.182:54426->x.x.164.45:9092 (ESTABLISHED)
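A minimal sketch of retrying the shutdown with a longer timeout, assuming the same {{kafkaStream}} and {{logger}} fields as in the snippets above. In the 0.11.0.1 API, {{close(timeout, unit)}} returns {{false}} when the timeout elapses before all stream threads have stopped, in which case the internal consumers/producers can still be holding their TCP connections for a while:

{code:java}
// Sketch only: retry close() with a longer timeout and log the current state.
boolean fullyClosed = kafkaStream.close(60L, TimeUnit.SECONDS);
if (!fullyClosed) {
    logger.warn("KafkaStreams did not shut down within the timeout, state={}", kafkaStream.state());
}
{code}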


Thanks

> KafkaStream is not closing
> --
>
> Key: KAFKA-7628
> URL: https://issues.apache.org/jira/browse/KAFKA-7628
> Project: Kafka
>  Issue Type: Bug
>  Components: streams
>Affects Versions: 0.11.0.1
> Environment: Macbook Pro
>Reporter: Ozgur
>Priority: Major
>
> I'm closing a KafkaStream when needed, based on a certain condition:
> Closing:
>  
> {code:java}
> if(kafkaStream == null) {
> logger.info("KafkaStream already closed?");
> } else {
> boolean closed = kafkaStream.close(10L, TimeUnit.SECONDS);
> if(closed) {
> kafkaStream = null;
> logger.info("KafkaStream closed");
> } else {
> logger.info("KafkaStream could not closed");
> }
> }
> {code}
> Starting:
>  
> {code:java}
> if(kafkaStream == null) {
> logger.info("KafkaStream is starting");
> kafkaStream = 
> KafkaManager.getInstance().getStream(this.getConfigFilePath(),
> this,
> this.getTopic()
> );
> kafkaStream.start();
> logger.info("KafkaStream is started");
> }
> {code}
>  
>  
> In my implementation of Processor, {{process(String key, byte[] value)}} is 
> still called even after the stream has been closed successfully:
>  
> {code:java}
> // code placeholder
> public abstract class BaseKafkaProcessor implements Processor<String, byte[]> {
> private static Logger logger = 
> LogManager.getLogger(BaseKafkaProcessor.class);
> private ProcessorContext context;
> private ProcessorContext getContext() {
> return context;
> }
> @Override
> public void init(ProcessorContext context) {
> this.context = context;
> this.context.schedule(1000);
> }
> @Override
> public void process(String key, byte[] value) {
> try {
> String topic = key.split("-")[0];
> byte[] uncompressed = GzipCompressionUtil.uncompress(value);
> String json = new String(uncompressed, "UTF-8");
> processRecord(topic, json);
> this.getContext().commit();
> } catch (Exception e) {
> logger.error("Error processing json", e);
> }
> }
> protected abstract void processRecord(String topic, String json);
> @Override
> public void punctuate(long timestamp) {
> this.getContext().commit();
> }
> @Override
> public void close() {
> this.getContext().commit();
> }
> }
> {code}
>  
> My configuration for KafkaStreams:
>  
> {code:java}
> application.id=dv_ws_in_app_activity_dev4
> bootstrap.servers=VLXH1
> auto.offset.reset=latest
> num.stream.threads=1
> key.serde=org.apache.kafka.common.serialization.Serdes$StringSerde
> value.serde=org.apache.kafka.common.serialization.Serdes$ByteArraySerde
> poll.ms = 100
> commit.interval.ms=1000
> state.dir=../../temp/kafka-state-dir
> {code}
> Version: *0.11.0.1* 
>  
> I'm seeing that after closing the streams, these connections are still 
> open:
>  
> {code:java}
> $ sudo lsof -i -n -P | grep 9092
> java      29457          ozgur  133u  IPv6 0x531e550533f38283      0t0    TCP 
> x.27.227.182:54419->x.x.164.33:9092 (ESTABLISHED)
> java      29457          ozgur  134u  IPv6 0x531e55051a789ec3      0t0    TCP 
> 

[jira] [Commented] (KAFKA-7610) Detect consumer failures in initial JoinGroup

2018-11-14 Thread Boyang Chen (JIRA)


[ 
https://issues.apache.org/jira/browse/KAFKA-7610?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16687475#comment-16687475
 ] 

Boyang Chen commented on KAFKA-7610:


Thanks [~guozhang] for jumping into the discussion! Just to clarify with 
[~hachikuji]: so far, is the only goal of "detecting consumer failures in initial 
JoinGroup" to prevent the broker from a memory burst due to many invalid member 
metadata entries? Do we have other considerations in this design?

> Detect consumer failures in initial JoinGroup
> -
>
> Key: KAFKA-7610
> URL: https://issues.apache.org/jira/browse/KAFKA-7610
> Project: Kafka
>  Issue Type: Improvement
>Reporter: Jason Gustafson
>Priority: Major
>
> The session timeout and heartbeating logic in the consumer allow us to detect 
> failures after a consumer joins the group. However, we have no mechanism to 
> detect failures during a consumer's initial JoinGroup when its memberId is 
> empty. When a client fails (e.g. due to a disconnect), the newly created 
> MemberMetadata will be left in the group metadata cache. Typically when this 
> happens, the client simply retries the JoinGroup. Every retry results in a 
> new dangling member created and left in the group. These members are doomed 
> to a session timeout when the group finally finishes the rebalance, but 
> before that time, they are occupying memory. In extreme cases, when a 
> rebalance is delayed (possibly due to a buggy application), this cycle can 
> repeat and the cache can grow quite large.
> There are a couple options that come to mind to fix the problem:
> 1. During the initial JoinGroup, we can detect failed members when the TCP 
> connection fails. This is difficult at the moment because we do not have a 
> mechanism to propagate disconnects from the network layer. A potential option 
> is to treat the disconnect as just another type of request and pass it to the 
> handlers through the request queue.
> 2. Rather than holding the JoinGroup in purgatory for an indefinite amount of 
> time, we can return earlier with the generated memberId and an error code 
> (say REBALANCE_IN_PROGRESS) to indicate that retry is needed to complete the 
> rebalance. The consumer can then poll for the rebalance using its assigned 
> memberId. And we can detect failures through the session timeout. Obviously 
> this option requires a KIP (and some more thought).



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (KAFKA-7192) State-store can desynchronise with changelog

2018-11-14 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/KAFKA-7192?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16687443#comment-16687443
 ] 

ASF GitHub Bot commented on KAFKA-7192:
---

mjsax opened a new pull request #5915: KAFKA-7192: Wipe out state store if EOS 
is turned on and checkpoint file does not exist
URL: https://github.com/apache/kafka/pull/5915
 
 
   Follow-up PR to #5657. Fixes a buggy back port.
   


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


> State-store can desynchronise with changelog
> 
>
> Key: KAFKA-7192
> URL: https://issues.apache.org/jira/browse/KAFKA-7192
> Project: Kafka
>  Issue Type: Bug
>  Components: streams
>Affects Versions: 0.11.0.3, 1.0.2, 1.1.1, 2.0.0
>Reporter: Jon Bates
>Assignee: Guozhang Wang
>Priority: Critical
>  Labels: bugs
> Fix For: 0.11.0.4, 1.0.3, 2.0.1, 2.1.0
>
>
> n.b. this bug has been verified with exactly-once processing enabled
> Consider the following scenario:
>  * A record, N is read into a Kafka topology
>  * the state store is updated
>  * the topology crashes
> h3. *Expected behaviour:*
>  # Node is restarted
>  # Offset was never updated, so record N is reprocessed
>  # State-store is reset to position N-1
>  # Record is reprocessed
> h3. *Actual Behaviour*
>  # Node is restarted
>  # Record N is reprocessed (good)
>  # The state store has the state from the previous processing
> I'd consider this a corruption of the state-store, hence the critical 
> Priority, although High may be more appropriate.
> I wrote a proof-of-concept here, which demonstrates the problem on Linux:
> [https://github.com/spadger/kafka-streams-sad-state-store]
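A small illustrative sketch of the recovery rule named in the PR title, with made-up helper names (this is not the actual Streams internals): with EOS enabled, a task that crashed before writing its checkpoint cannot trust the local store, so the store is wiped and replayed from the changelog instead of being reused.

{code:java}
import java.io.File;

// Illustration only: the decision the fix is about, expressed as a predicate.
final class StateRecoverySketch {
    static boolean shouldWipeLocalStore(final boolean eosEnabled, final File checkpointFile) {
        // Under EOS, a missing checkpoint after a crash means the local store
        // may contain writes from an aborted transaction, i.e. it can be ahead
        // of (or inconsistent with) the changelog, so it must be rebuilt.
        return eosEnabled && !checkpointFile.exists();
    }
}
{code}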



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (KAFKA-7615) Support different topic name in source and destination server in Mirrormaker

2018-11-14 Thread James Cheng (JIRA)


[ 
https://issues.apache.org/jira/browse/KAFKA-7615?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16687423#comment-16687423
 ] 

James Cheng commented on KAFKA-7615:


[~adeetikaushal]: This can be implemented by supplying a MessageHandler to 
mirrormaker. See 
[https://github.com/gwenshap/kafka-examples/tree/master/MirrorMakerHandler] for 
an example.
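A rough sketch of such a handler that renames {{topicA}} to {{topicB}} while mirroring, modeled on the linked example. The exact handler interface, in particular the record type passed to {{handle}}, differs between Kafka versions, so treat this as an outline rather than drop-in code:

{code:java}
import java.util.Collections;
import java.util.List;

import kafka.consumer.BaseConsumerRecord;
import kafka.tools.MirrorMaker;
import org.apache.kafka.clients.producer.ProducerRecord;

public class RenamingHandler implements MirrorMaker.MirrorMakerMessageHandler {
    @Override
    public List<ProducerRecord<byte[], byte[]>> handle(final BaseConsumerRecord record) {
        // Rewrite the destination topic; key and value are passed through unchanged.
        final String targetTopic = "topicA".equals(record.topic()) ? "topicB" : record.topic();
        return Collections.singletonList(
                new ProducerRecord<>(targetTopic, null, record.key(), record.value()));
    }
}
{code}

The handler class would then go on the classpath and be passed to {{kafka-mirror-maker.sh}} via {{--message.handler}}.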

> Support different topic name in source and destination server in Mirrormaker
> 
>
> Key: KAFKA-7615
> URL: https://issues.apache.org/jira/browse/KAFKA-7615
> Project: Kafka
>  Issue Type: New Feature
>  Components: mirrormaker
>Reporter: Adeeti Kaushal
>Priority: Minor
>
> Currently, MirrorMaker only supports the same topic name in the source and 
> destination brokers. Support for different topic names in the source and 
> destination brokers is needed.
>  
> source broker : topic name -> topicA
> destination broker: topic name -> topicB
>  
> Mirror data from topicA to topicB



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Resolved] (KAFKA-7626) Possible duplicate message delivery with exactly-once semantics

2018-11-14 Thread Noam Berman (JIRA)


 [ 
https://issues.apache.org/jira/browse/KAFKA-7626?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Noam Berman resolved KAFKA-7626.

Resolution: Not A Problem

> Possible duplicate message delivery with exactly-once semantics
> ---
>
> Key: KAFKA-7626
> URL: https://issues.apache.org/jira/browse/KAFKA-7626
> Project: Kafka
>  Issue Type: Bug
>  Components: core
>Affects Versions: 1.1.0
> Environment: google cloud build docker, all brokers, consumers and 
> producers running on the same container (this is a test, not production).
>Reporter: Noam Berman
>Priority: Major
>  Labels: exactly-once
> Attachments: full.tar.gz, partial.log
>
>
> Hello,
> I've come across an issue with exactly-once processing (running kafka 1.1.0):
> In my test I bring up 3 brokers, and I start sending messages to `topicX`.
> While I'm sending messages, I bring up a few consumers on `topicX` one at a 
> time (all with the same group id) - and they produce the same message to 
> `topicY`. At some point I bring one broker down and up again, to check 
> resiliency to failures.
> Eventually I assert that `topicY` contains exactly the messages sent to 
> `topicX`.
> This usually works as expected, but when running the same test 1000s of times 
> to check for flakiness, some of them act as follows (in this order):
> 1. Consumer `C1` owns partition `p`.
> 1a. A consumer rebalance occurs (because one of the new consumers is starting).
> 1b. Consumer `C1` is revoked and then re-assigned partition `p`.
> 2. One of the 3 brokers starts controlled shutdown.
> 3. Consumer `C1` uses a transactional producer to send a message on offset 
> `o`.
> 4. Consumer `C1` sends offset `o+1` for partition `p` to the transaction.
> 5. Consumer `C1` successfully commits the message.
> 6. Broker controlled shutdown finishes successfully.
> ... a few seconds after...
> 7. Another consumer rebalance occurs, Consumer `C2` is assigned to partition 
> `p`.
> 8. Consumer `C2` polls message on offset `o` for partition `p`.
> This means we do double processing for the message on offset `o`, violating 
> exactly-once semantics.
> So it looks like during broker restart, a commit to the transactional 
> producer gets lost - and because we rebalance after that before another 
> commit happened, we actually poll the same message again, although previously 
> committed.
> The brokers are configured with:
> `transaction.state.log.min.isr=2`
> `transaction.state.log.replication.factor=3`
> `offsets.topic.replication.factor=3`
> The consumer is configured with
> `isolation.level=read_committed`
> The original producer to `topicX` has transactional semantics, and the test 
> shows that it didn't send duplicate messages (using the idempotent producer config).
>  
> Thanks!



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (KAFKA-7626) Possible duplicate message delivery with exactly-once semantics

2018-11-14 Thread Noam Berman (JIRA)


[ 
https://issues.apache.org/jira/browse/KAFKA-7626?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16687373#comment-16687373
 ] 

Noam Berman commented on KAFKA-7626:


After talking with Matthias J. Sax in the Confluent Slack, it turned out there was 
a misuse of the API: the transactional.ids were unique, which caused the duplicate 
processing.
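For reference, a sketch of the consume-transform-produce pattern with the transactional API as it exists in 1.1.0; the topic names, group id, and the partition-derived transactional.id are assumptions for illustration, and imports are omitted (standard org.apache.kafka.clients / common classes). The key point is that the transactional.id has to stay stable for the same input partitions across restarts and rebalances, so that an old ("zombie") producer instance gets fenced instead of committing a second copy.

{code:java}
// Assumed context: `partition` is the input partition this worker owns and
// `record` is the ConsumerRecord<byte[], byte[]> obtained from poll().
Properties props = new Properties();
props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "broker:9092");
// Stable per input partition, NOT unique per producer instance:
props.put(ProducerConfig.TRANSACTIONAL_ID_CONFIG, "mirror-topicX-" + partition);
props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, ByteArraySerializer.class);
props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, ByteArraySerializer.class);
KafkaProducer<byte[], byte[]> producer = new KafkaProducer<>(props);
producer.initTransactions();

producer.beginTransaction();
producer.send(new ProducerRecord<>("topicY", record.key(), record.value()));
// Commit the consumed offset as part of the same transaction:
producer.sendOffsetsToTransaction(
        Collections.singletonMap(new TopicPartition("topicX", partition),
                                 new OffsetAndMetadata(record.offset() + 1)),
        "test-consumer-group");
producer.commitTransaction();
{code}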

> Possible duplicate message delivery with exactly-once semantics
> ---
>
> Key: KAFKA-7626
> URL: https://issues.apache.org/jira/browse/KAFKA-7626
> Project: Kafka
>  Issue Type: Bug
>  Components: core
>Affects Versions: 1.1.0
> Environment: google cloud build docker, all brokers, consumers and 
> producers running on the same container (this is a test, not production).
>Reporter: Noam Berman
>Priority: Major
>  Labels: exactly-once
> Attachments: full.tar.gz, partial.log
>
>
> Hello,
> I've come across an issue with exactly-once processing (running kafka 1.1.0):
> In my test I bring up 3 brokers, and I start sending messages to `topicX`.
> While I'm sending messages, I bring up a few consumers on `topicX` one at a 
> time (all with the same group id) - and they produce the same message to 
> `topicY`. At some point I bring one broker down and up again, to check 
> resiliency to failures.
> Eventually I assert that `topicY` contains exactly the messages sent to 
> `topicX`.
> This usually works as expected, but when running the same test 1000s of times 
> to check for flakiness, some of them act as follows (in this order):
> 1. Consumer `C1` owns partition `p`.
> 1a. A consumer rebalance occurs (because one of the new consumers is starting).
> 1b. Consumer `C1` is revoked and then re-assigned partition `p`.
> 2. One of the 3 brokers starts controlled shutdown.
> 3. Consumer `C1` uses a transactional producer to send a message on offset 
> `o`.
> 4. Consumer `C1` sends offset `o+1` for partition `p` to the transaction.
> 5. Consumer `C1` successfully commits the message.
> 6. Broker controlled shutdown finishes successfully.
> ... a few seconds after...
> 7. Another consumer rebalance occurs, Consumer `C2` is assigned to partition 
> `p`.
> 8. Consumer `C2` polls message on offset `o` for partition `p`.
> This means we do double processing for the message on offset `o`, violating 
> exactly-once semantics.
> So it looks like during broker restart, a commit to the transactional 
> producer gets lost - and because we rebalance after that before another 
> commit happened, we actually poll the same message again, although previously 
> committed.
> The brokers are configured with:
> `transaction.state.log.min.isr=2`
> `transaction.state.log.replication.factor=3`
> `offsets.topic.replication.factor=3`
> The consumer is configured with
> `isolation.level=read_committed`
> The original producer to `topicX` has transactional semantics, and the test 
> shows that it didn't send duplicate messages (using the idempotent producer config).
>  
> Thanks!



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (KAFKA-7628) KafkaStream is not closing

2018-11-14 Thread John Roesler (JIRA)


[ 
https://issues.apache.org/jira/browse/KAFKA-7628?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16687359#comment-16687359
 ] 

John Roesler commented on KAFKA-7628:
-

Hi [~lugrugzo],

To confirm, you see that Streams successfully closes, but afterwards, it's 
still bound to the TCP ports?

Have you noticed whether it stays bound indefinitely, or does it stop listening 
at some point after closing?

Thanks,

-John

> KafkaStream is not closing
> --
>
> Key: KAFKA-7628
> URL: https://issues.apache.org/jira/browse/KAFKA-7628
> Project: Kafka
>  Issue Type: Bug
>  Components: streams
>Affects Versions: 0.11.0.1
> Environment: Macbook Pro
>Reporter: Ozgur
>Priority: Major
>
> I'm closing a KafkaStream when needed, based on a certain condition:
> Closing:
>  
> {code:java}
> if(kafkaStream == null) {
> logger.info("KafkaStream already closed?");
> } else {
> boolean closed = kafkaStream.close(10L, TimeUnit.SECONDS);
> if(closed) {
> kafkaStream = null;
> logger.info("KafkaStream closed");
> } else {
> logger.info("KafkaStream could not closed");
> }
> }
> {code}
> Starting:
>  
> {code:java}
> if(kafkaStream == null) {
> logger.info("KafkaStream is starting");
> kafkaStream = 
> KafkaManager.getInstance().getStream(this.getConfigFilePath(),
> this,
> this.getTopic()
> );
> kafkaStream.start();
> logger.info("KafkaStream is started");
> }
> {code}
>  
>  
> In my implementation of Processor, {{process(String key, byte[] value)}} is 
> still called even after the stream has been closed successfully:
>  
> {code:java}
> // code placeholder
> public abstract class BaseKafkaProcessor implements Processor<String, byte[]> {
> private static Logger logger = 
> LogManager.getLogger(BaseKafkaProcessor.class);
> private ProcessorContext context;
> private ProcessorContext getContext() {
> return context;
> }
> @Override
> public void init(ProcessorContext context) {
> this.context = context;
> this.context.schedule(1000);
> }
> @Override
> public void process(String key, byte[] value) {
> try {
> String topic = key.split("-")[0];
> byte[] uncompressed = GzipCompressionUtil.uncompress(value);
> String json = new String(uncompressed, "UTF-8");
> processRecord(topic, json);
> this.getContext().commit();
> } catch (Exception e) {
> logger.error("Error processing json", e);
> }
> }
> protected abstract void processRecord(String topic, String json);
> @Override
> public void punctuate(long timestamp) {
> this.getContext().commit();
> }
> @Override
> public void close() {
> this.getContext().commit();
> }
> }
> {code}
>  
> My configuration for KafkaStreams:
>  
> {code:java}
> application.id=dv_ws_in_app_activity_dev4
> bootstrap.servers=VLXH1
> auto.offset.reset=latest
> num.stream.threads=1
> key.serde=org.apache.kafka.common.serialization.Serdes$StringSerde
> value.serde=org.apache.kafka.common.serialization.Serdes$ByteArraySerde
> poll.ms = 100
> commit.interval.ms=1000
> state.dir=../../temp/kafka-state-dir
> {code}
> Version: *0.11.0.1* 
>  
> I'm seeing that after closing the streams, these connections are still 
> open:
>  
> {code:java}
> $ sudo lsof -i -n -P | grep 9092
> java      29457          ozgur  133u  IPv6 0x531e550533f38283      0t0    TCP 
> x.27.227.182:54419->x.x.164.33:9092 (ESTABLISHED)
> java      29457          ozgur  134u  IPv6 0x531e55051a789ec3      0t0    TCP 
> x.27.227.182:54420->x.x.164.45:9092 (ESTABLISHED)
> java      29457          ozgur  135u  IPv6 0x531e55051a789903      0t0    TCP 
> x.27.227.182:54421->x.x.164.25:9092 (ESTABLISHED)
> java      29457          ozgur  136u  IPv6 0x531e55051a78aa43      0t0    TCP 
> x.27.227.182:54422->x.x.164.25:9092 (ESTABLISHED)
> java      29457          ozgur  140u  IPv6 0x531e55051a78c703      0t0    TCP 
> x.27.227.182:54423->x.x.164.25:9092 (ESTABLISHED)
> java      29457          ozgur  141u  IPv6 0x531e55051a78a483      0t0    TCP 
> x.27.227.182:54424->x.x.164.45:9092 (ESTABLISHED)
> {code}
>  
>  



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (KAFKA-7626) Possible duplicate message delivery with exactly-once semantics

2018-11-14 Thread Noam Berman (JIRA)


[ 
https://issues.apache.org/jira/browse/KAFKA-7626?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16687330#comment-16687330
 ] 

Noam Berman commented on KAFKA-7626:


I set the Kafka consumer/producer logs to TRACE so we can get a better 
understanding of the issue.

Another run of the test hit a similar issue (same configs for all 
consumers/producers/brokers as stated in the bug description).

The attached file partial.log is an excerpt from the attached full log; it 
shows the relevant log entries for this scenario:
 * consumer-1 is assigned to all partitions at the beginning. We start by 
bringing down one broker.
The message "3" is sent to topic0 at offset 4 (key="3", value="3", offset=4).
 * It starts a transaction with test-producer-1, produces a message to topic1 
(just echoing the message, same key/value), and gets stuck for 15 seconds on the 
AddOffsetsToTxnRequest. At this point the broker has finished going down, and it 
comes back up.
 * The consumers are configured with *max.poll.latency=5000*, so they rebalance 
after 5 seconds, while consumer-1 is stuck without polling.
 * consumer-4 now gets partition 3, and it receives the message at offset 4 
since the previous transaction was never finished. It sends the message to 
topic1 and commits the offset and the transaction using test-producer-2.
 * At this point we've handled offset 4 once.
 * A few seconds later, test-producer-1 receives an error 
AddOffsetsToTxnResponse, followed by a successful AddOffsetsToTxnResponse.
 * Since there's no exception, it proceeds to complete the initial transaction 
and succeeds. At this point we've handled [topic0, partition 3, offset 4] two 
times, breaking exactly-once semantics.

> Possible duplicate message delivery with exactly-once semantics
> ---
>
> Key: KAFKA-7626
> URL: https://issues.apache.org/jira/browse/KAFKA-7626
> Project: Kafka
>  Issue Type: Bug
>  Components: core
>Affects Versions: 1.1.0
> Environment: google cloud build docker, all brokers, consumers and 
> producers running on the same container (this is a test, not production).
>Reporter: Noam Berman
>Priority: Major
>  Labels: exactly-once
> Attachments: full.tar.gz, partial.log
>
>
> Hello,
> I've come across an issue with exactly-once processing (running kafka 1.1.0):
> In my test I bring up 3 brokers, and I start sending messages to `topicX`.
> While I'm sending messages, I bring up a few consumers on `topicX` one at a 
> time (all with the same group id) - and they produce the same message to 
> `topicY`. At some point I bring one broker down and up again, to check 
> resiliency to failures.
> Eventually I assert that `topicY` contains exactly the messages sent to 
> `topicX`.
> This usually works as expected, but when running the same test 1000s of times 
> to check for flakiness, some of them act as follows (in this order):
> 1. Consumer `C1` owns partition `p`.
> 1a. A consumer rebalance occurs (because one of the new consumers is starting).
> 1b. Consumer `C1` is revoked and then re-assigned partition `p`.
> 2. One of the 3 brokers starts controlled shutdown.
> 3. Consumer `C1` uses a transactional producer to send a message on offset 
> `o`.
> 4. Consumer `C1` sends offset `o+1` for partition `p` to the transaction.
> 5. Consumer `C1` successfully commits the message.
> 6. Broker controlled shutdown finishes successfully.
> ... a few seconds after...
> 7. Another consumer rebalance occurs, Consumer `C2` is assigned to partition 
> `p`.
> 8. Consumer `C2` polls message on offset `o` for partition `p`.
> This means we do double processing for the message on offset `o`, violating 
> exactly-once semantics.
> So it looks like during broker restart, a commit to the transactional 
> producer gets lost - and because we rebalance after that before another 
> commit happened, we actually poll the same message again, although previously 
> committed.
> The brokers are configured with:
> `transaction.state.log.min.isr=2`
> `transaction.state.log.replication.factor=3`
> `offsets.topic.replication.factor=3`
> The consumer is configured with
> `isolation.level=read_committed`
> The original producer to `topicX` has transactional semantics, and the test 
> shows that it didn't send duplicate messages (using the idempotent producer config).
>  
> Thanks!



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (KAFKA-7610) Detect consumer failures in initial JoinGroup

2018-11-14 Thread Guozhang Wang (JIRA)


[ 
https://issues.apache.org/jira/browse/KAFKA-7610?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16687336#comment-16687336
 ] 

Guozhang Wang commented on KAFKA-7610:
--

This is a great discussion!

1) I think today we do not fence JoinGroup requests with an empty member id: we 
check that a non-empty member id is indeed contained in the group, but if it is 
empty we always assign a new member id and register it. Note that for other 
non-JoinGroup requests, like commit requests, we do use the member id for fencing.

2) If we already do 1) as Jason suggested above, i.e. not storing the newly 
generated member id yet but waiting for another JoinGroup request with that member 
id (i.e. fire and forget), then the group size should be reasonably bounded -- so 
far I don't think we have really seen an issue caused simply by the group size 
being too large -- and hence we can probably wait and see whether we really need 
this config `group.max.size`.
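To make the "fire and forget" idea concrete, here is a toy model (hypothetical names, not the GroupCoordinator code) contrasting the current behavior, where every empty-memberId JoinGroup immediately registers a new dangling member, with the proposed one, where the broker only hands out an id and registers the member when it rejoins with that id:

{code:java}
import java.util.HashMap;
import java.util.Map;
import java.util.UUID;

// Toy model of the two JoinGroup behaviors discussed above.
public class JoinGroupSketch {
    private final Map<String, Object> members = new HashMap<>();

    // Current behavior: an unknown (empty) memberId registers metadata right away,
    // so a client that disconnects and retries leaves one dangling entry per attempt.
    public String joinEager(final String memberId) {
        final String id = memberId.isEmpty() ? UUID.randomUUID().toString() : memberId;
        members.put(id, new Object());
        return id;
    }

    // "Fire and forget": hand out an id but store nothing; the member only becomes
    // real when it rejoins with that id, so failed first joins cost no memory.
    public String joinLazy(final String memberId) {
        if (memberId.isEmpty()) {
            return UUID.randomUUID().toString();
        }
        members.put(memberId, new Object());
        return memberId;
    }
}
{code}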

> Detect consumer failures in initial JoinGroup
> -
>
> Key: KAFKA-7610
> URL: https://issues.apache.org/jira/browse/KAFKA-7610
> Project: Kafka
>  Issue Type: Improvement
>Reporter: Jason Gustafson
>Priority: Major
>
> The session timeout and heartbeating logic in the consumer allow us to detect 
> failures after a consumer joins the group. However, we have no mechanism to 
> detect failures during a consumer's initial JoinGroup when its memberId is 
> empty. When a client fails (e.g. due to a disconnect), the newly created 
> MemberMetadata will be left in the group metadata cache. Typically when this 
> happens, the client simply retries the JoinGroup. Every retry results in a 
> new dangling member created and left in the group. These members are doomed 
> to a session timeout when the group finally finishes the rebalance, but 
> before that time, they are occupying memory. In extreme cases, when a 
> rebalance is delayed (possibly due to a buggy application), this cycle can 
> repeat and the cache can grow quite large.
> There are a couple options that come to mind to fix the problem:
> 1. During the initial JoinGroup, we can detect failed members when the TCP 
> connection fails. This is difficult at the moment because we do not have a 
> mechanism to propagate disconnects from the network layer. A potential option 
> is to treat the disconnect as just another type of request and pass it to the 
> handlers through the request queue.
> 2. Rather than holding the JoinGroup in purgatory for an indefinite amount of 
> time, we can return earlier with the generated memberId and an error code 
> (say REBALANCE_IN_PROGRESS) to indicate that retry is needed to complete the 
> rebalance. The consumer can then poll for the rebalance using its assigned 
> memberId. And we can detect failures through the session timeout. Obviously 
> this option requires a KIP (and some more thought).



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Updated] (KAFKA-7626) Possible duplicate message delivery with exactly-once semantics

2018-11-14 Thread Noam Berman (JIRA)


 [ 
https://issues.apache.org/jira/browse/KAFKA-7626?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Noam Berman updated KAFKA-7626:
---
Attachment: full.tar.gz

> Possible duplicate message delivery with exactly-once semantics
> ---
>
> Key: KAFKA-7626
> URL: https://issues.apache.org/jira/browse/KAFKA-7626
> Project: Kafka
>  Issue Type: Bug
>  Components: core
>Affects Versions: 1.1.0
> Environment: google cloud build docker, all brokers, consumers and 
> producers running on the same container (this is a test, not production).
>Reporter: Noam Berman
>Priority: Major
>  Labels: exactly-once
> Attachments: full.tar.gz, partial.log
>
>
> Hello,
> I've come across an issue with exactly-once processing (running kafka 1.1.0):
> In my test I bring up 3 brokers, and I start sending messages to `topicX`.
> While I'm sending messages, I bring up a few consumers on `topicX` one at a 
> time (all with the same group id) - and they produce the same message to 
> `topicY`. At some point I bring one broker down and up again, to check 
> resiliency to failures.
> Eventually I assert that `topicY` contains exactly the messages sent to 
> `topicX`.
> This usually works as expected, but when running the same test 1000s of times 
> to check for flakiness, some of them act as follows (in this order):
> 1. Consumer `C1` owns partition `p`.
> 1a. A consumer rebalance occurs (because one of the new consumers is starting).
> 1b. Consumer `C1` is revoked and then re-assigned partition `p`.
> 2. One of the 3 brokers starts controlled shutdown.
> 3. Consumer `C1` uses a transactional producer to send a message on offset 
> `o`.
> 4. Consumer `C1` sends offset `o+1` for partition `p` to the transaction.
> 5. Consumer `C1` successfully commits the message.
> 6. Broker controlled shutdown finishes successfully.
> ... a few seconds after...
> 7. Another consumer rebalance occurs, Consumer `C2` is assigned to partition 
> `p`.
> 8. Consumer `C2` polls message on offset `o` for partition `p`.
> This means we do double processing for the message on offset `o`, violating 
> exactly-once semantics.
> So it looks like during broker restart, a commit to the transactional 
> producer gets lost - and because we rebalance after that before another 
> commit happened, we actually poll the same message again, although previously 
> committed.
> The brokers are configured with:
> `transaction.state.log.min.isr=2`
> `transaction.state.log.replication.factor=3`
> `offsets.topic.replication.factor=3`
> The consumer is configured with
> `isolation.level=read_committed`
> The original producer to `topicX` has transactional semantics, and the test 
> shows that it didn't send duplicate messages (using the idempotent producer config).
>  
> Thanks!



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Assigned] (KAFKA-7443) OffsetOutOfRangeException in restoring state store from changelog topic when start offset of local checkpoint is smaller than that of changelog topic

2018-11-14 Thread John Roesler (JIRA)


 [ 
https://issues.apache.org/jira/browse/KAFKA-7443?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

John Roesler reassigned KAFKA-7443:
---

Assignee: John Roesler

> OffsetOutOfRangeException in restoring state store from changelog topic when 
> start offset of local checkpoint is smaller than that of changelog topic
> -
>
> Key: KAFKA-7443
> URL: https://issues.apache.org/jira/browse/KAFKA-7443
> Project: Kafka
>  Issue Type: Bug
>  Components: streams
>Affects Versions: 2.0.0, 2.1.0
>Reporter: linyue li
>Assignee: John Roesler
>Priority: Major
>  Labels: feather
>
> When restoring a local state store from a changelog topic under EOS, Kafka 
> Streams will sometimes throw an OffsetOutOfRangeException such as:
> {code:java}
> Restoring StreamTasks failed. Deleting StreamTasks stores to recreate from 
> scratch.
> org.apache.kafka.clients.consumer.OffsetOutOfRangeException: Offsets out of 
> range with no configured reset policy for partitions: 
> {AuditTrailBatch_PROD3-Dedup-key-store-changelog-32=75465112}
>  at 
> org.apache.kafka.clients.consumer.internals.Fetcher.parseCompletedFetch(Fetcher.java:950)
>  at 
> org.apache.kafka.clients.consumer.internals.Fetcher.fetchedRecords(Fetcher.java:470)
>  at 
> org.apache.kafka.clients.consumer.KafkaConsumer.pollForFetches(KafkaConsumer.java:1249)
>  at 
> org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:1181)
>  at 
> org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:1157)
>  at 
> org.apache.kafka.streams.processor.internals.StoreChangelogReader.restore(StoreChangelogReader.java:89)
>  at 
> org.apache.kafka.streams.processor.internals.TaskManager.updateNewAndRestoringTasks(TaskManager.java:321)
>  at 
> org.apache.kafka.streams.processor.internals.StreamThread.runOnce(StreamThread.java:822)
>  at 
> org.apache.kafka.streams.processor.internals.StreamThread.runLoop(StreamThread.java:765)
>  at 
> org.apache.kafka.streams.processor.internals.StreamThread.run(StreamThread.java:734){code}
>  
> This scenario occurs when the changelog topic has deleted expired log segments 
> according to retention.ms, but the start offset in the local .checkpoint file is 
> the position from when the task last exited on this instance, which may be 
> smaller than the updated beginning offset of the changelog topic. Restoring the 
> store from the start offset in the checkpoint file then throws this exception.
> It can be reproduced as below (Kafka Streams running with EOS):
>  # The task for topic partition test-1 is running on instance A. When the task 
> exits, Kafka Streams writes the last committed offset 100 for test-1 to the 
> checkpoint file.
>  # Task test-1 moves to instance B.
>  # During this time, the remote changelog topic for test-1 updates its start 
> offset to 120 as the old log segment reaches its retention time and is deleted.
>  # After a while, task test-1 exits from instance B and resumes on instance A, 
> and the task restores the local state store of A from checkpoint offset 100, 
> which is smaller than the valid offset 120 of the changelog topic. The exception 
> above is thrown.
> When this exception occurs, Kafka Streams tries to reinitialize the task and 
> restore from the beginning in the catch block below. Unfortunately, this 
> handling does not work, and the task keeps throwing OffsetOutOfRangeException in 
> subsequent restore attempts.
> {code:java}
> //org/apache/kafka/streams/processor/internals/StoreChangelogReader.java
> //handle for OffsetOutOfRangeException in kafka stream
> catch (final InvalidOffsetException recoverableException) {
>  log.warn("Restoring StreamTasks failed. Deleting StreamTasks stores to 
> recreate from scratch.", recoverableException);
>  final Set<TopicPartition> partitions = recoverableException.partitions();
>  for (final TopicPartition partition : partitions) {
>final StreamTask task = active.restoringTaskFor(partition);
>log.info("Reinitializing StreamTask {} for changelog {}", task, partition);
>needsInitializing.remove(partition);
>needsRestoring.remove(partition);
>
> task.reinitializeStateStoresForPartitions(recoverableException.partitions());
>  }
>  restoreConsumer.seekToBeginning(partitions);
> }{code}
>  
>  Investigating why the handling of this exception does not work, I found the 
> root cause:
>  Kafka Streams registers state restorers in the variable stateRestorers, which 
> is used to read/update the start and end offsets for restoring the local state 
> store.
> {code:java}
> //org/apache/kafka/streams/processor/internals/StoreChangelogReader.java
> private final Map<TopicPartition, StateRestorer> stateRestorers = new 
> HashMap<>();{code}
> When the OffsetOutOfRangeException occurs, Kafka Streams should update the 
> checkpoint 
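As a standalone illustration of the underlying problem, here is a sketch of how the restore start position relates to the checkpoint and the changelog's beginning offset (made-up names, not the actual StoreChangelogReader logic):

{code:java}
// Illustration only: a stale checkpoint (e.g. 100) below the changelog's
// earliest retained offset (e.g. 120, after segment deletion) cannot be used
// as the restore start position, or every restore attempt fails with
// OffsetOutOfRangeException.
final class RestoreOffsetSketch {
    static long restoreStartOffset(final Long checkpointOffset, final long changelogBeginningOffset) {
        if (checkpointOffset == null) {
            return changelogBeginningOffset;   // no local checkpoint: restore from the beginning
        }
        return Math.max(checkpointOffset, changelogBeginningOffset);
    }
}
{code}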

[jira] [Commented] (KAFKA-7367) Streams should not create state store directories unless they are needed

2018-11-14 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/KAFKA-7367?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16687113#comment-16687113
 ] 

ASF GitHub Bot commented on KAFKA-7367:
---

vvcephei closed pull request #5647: KAFKA-7367: Ensure stateless topologies 
don't require disk access
URL: https://github.com/apache/kafka/pull/5647
 
 
   

This is a PR merged from a forked repository.
As GitHub hides the original diff on merge, it is displayed below for
the sake of provenance:

As this is a foreign pull request (from a fork), the diff is supplied
below (as it won't show otherwise due to GitHub magic):

diff --git 
a/streams/src/test/java/org/apache/kafka/streams/integration/StatelessIntegrationTest.java
 
b/streams/src/test/java/org/apache/kafka/streams/integration/StatelessIntegrationTest.java
new file mode 100644
index 000..f5981a984ae
--- /dev/null
+++ 
b/streams/src/test/java/org/apache/kafka/streams/integration/StatelessIntegrationTest.java
@@ -0,0 +1,107 @@
+package org.apache.kafka.streams.integration;
+
+import org.apache.kafka.clients.consumer.ConsumerConfig;
+import org.apache.kafka.clients.consumer.ConsumerRecord;
+import org.apache.kafka.clients.producer.ProducerConfig;
+import org.apache.kafka.common.serialization.Serdes;
+import org.apache.kafka.common.utils.SystemTime;
+import org.apache.kafka.streams.KafkaStreams;
+import org.apache.kafka.streams.KeyValue;
+import org.apache.kafka.streams.StreamsBuilder;
+import org.apache.kafka.streams.StreamsConfig;
+import org.apache.kafka.streams.Topology;
+import org.apache.kafka.streams.integration.utils.EmbeddedKafkaCluster;
+import org.apache.kafka.streams.integration.utils.IntegrationTestUtils;
+import org.apache.kafka.streams.kstream.Consumed;
+import org.apache.kafka.streams.kstream.KStream;
+import org.apache.kafka.streams.kstream.Produced;
+import org.apache.kafka.test.TestUtils;
+import org.junit.Assert;
+import org.junit.Test;
+
+import java.io.File;
+import java.io.IOException;
+import java.nio.file.Files;
+import java.nio.file.Path;
+import java.nio.file.Paths;
+import java.util.List;
+import java.util.Properties;
+import java.util.UUID;
+import java.util.concurrent.ExecutionException;
+import java.util.stream.Collectors;
+
+import static java.util.Arrays.asList;
+import static 
org.apache.kafka.streams.integration.utils.IntegrationTestUtils.produceKeyValuesSynchronously;
+import static 
org.apache.kafka.streams.integration.utils.IntegrationTestUtils.waitUntilMinRecordsReceived;
+
+public class StatelessIntegrationTest {
+@Test
+public void statelessTopologiesShouldNotCreateDirectories() throws 
IOException, InterruptedException, ExecutionException {
+final EmbeddedKafkaCluster broker = new EmbeddedKafkaCluster(1);
+broker.start();
+broker.deleteAllTopicsAndWait(30_000L);
+
+final String applicationId = UUID.randomUUID().toString();
+
+final String inputTopic = "input" + applicationId;
+final String outputTopic = "output" + applicationId;
+
+broker.createTopic(inputTopic, 2, 1);
+broker.createTopic(outputTopic, 2, 1);
+
+final String path = TestUtils.tempDirectory(applicationId).getPath();
+
+final Properties streamsConfiguration = new Properties();
+streamsConfiguration.put(StreamsConfig.APPLICATION_ID_CONFIG, 
applicationId);
+streamsConfiguration.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, 
broker.bootstrapServers());
+
streamsConfiguration.put(StreamsConfig.CACHE_MAX_BYTES_BUFFERING_CONFIG, 0);
+streamsConfiguration.put(StreamsConfig.STATE_DIR_CONFIG, path);
+streamsConfiguration.put(StreamsConfig.COMMIT_INTERVAL_MS_CONFIG, 
1000);
+streamsConfiguration.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, 
"earliest");
+
streamsConfiguration.put(IntegrationTestUtils.INTERNAL_LEAVE_GROUP_ON_CLOSE, 
true);
+
+final StreamsBuilder builder = new StreamsBuilder();
+final KStream<String, String> input = builder.stream(inputTopic, 
Consumed.with(Serdes.String(), Serdes.String()));
+final KStream<String, String> filter = input.filter((k, s) -> 
s.length() % 2 == 0);
+final KStream<String, String> map = filter.map((k, v) -> new 
KeyValue<>(k, k + v));
+map.to(outputTopic, Produced.with(Serdes.String(), Serdes.String()));
+
+final Topology topology = builder.build();
+final KafkaStreams kafkaStreams = new KafkaStreams(topology, 
streamsConfiguration);
+try {
+kafkaStreams.start();
+
+final Properties producerConfig = new Properties();
+producerConfig.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, 
Serdes.String().serializer().getClass());
+producerConfig.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, 
Serdes.String().serializer().getClass());
+producerConfig.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, 
broker.bootstrapServers());
+

[jira] [Commented] (KAFKA-5117) Kafka Connect REST endpoints reveal Password typed values

2018-11-14 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/KAFKA-5117?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16687038#comment-16687038
 ] 

ASF GitHub Bot commented on KAFKA-5117:
---

qiao-meng-zefr closed pull request #4269: KAFKA-5117: Add password masking for 
kafka connect REST endpoint
URL: https://github.com/apache/kafka/pull/4269
 
 
   

This is a PR merged from a forked repository.
As GitHub hides the original diff on merge, it is displayed below for
the sake of provenance:

As this is a foreign pull request (from a fork), the diff is supplied
below (as it won't show otherwise due to GitHub magic):

diff --git 
a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/AbstractHerder.java
 
b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/AbstractHerder.java
index fbe0ae2afb2..9eb2def0c64 100644
--- 
a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/AbstractHerder.java
+++ 
b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/AbstractHerder.java
@@ -184,6 +184,22 @@ public void resumeConnector(String connector) {
 configBackingStore.putTargetState(connector, TargetState.STARTED);
 }
 
+@Override
+public Map<String, String> maskCredentials(String connName, Map<String, String> config) {
+Map<String, String> newConfig = new LinkedHashMap<>();
+for (Map.Entry<String, String> entry : config.entrySet()) {
+// Password.toString() will return the hidden value
+String value = null;
+if (entry.getValue() != null) {
+value = entry.getValue().toString();
+}
+
+newConfig.put(entry.getKey(), value);
+}
+
+return newConfig;
+}
+
 @Override
 public Plugins plugins() {
 return worker.getPlugins();
diff --git 
a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/Herder.java 
b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/Herder.java
index 5dfb808f764..774dbaf8ca1 100644
--- a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/Herder.java
+++ b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/Herder.java
@@ -169,6 +169,14 @@
  */
 void resumeConnector(String connector);
 
+/**
+ * Goes through config parameters and replaces password field values with 
"[hidden]"
+ * @param connName name of the connector
+ * @param config configuration of the connector
+ * @return new map of the configurations, with password omitted from 
clear-text
+ */
+Map<String, String> maskCredentials(String connName, Map<String, String> 
config);
+
 /**
  * Returns a handle to the plugin factory used by this herder and its 
worker.
  *
diff --git 
a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/rest/resources/ConnectorsResource.java
 
b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/rest/resources/ConnectorsResource.java
index 2c031245c06..6e19cc729a8 100644
--- 
a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/rest/resources/ConnectorsResource.java
+++ 
b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/rest/resources/ConnectorsResource.java
@@ -50,6 +50,7 @@
 import java.util.Collection;
 import java.util.List;
 import java.util.Map;
+import java.util.ArrayList;
 import java.util.concurrent.ExecutionException;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.TimeoutException;
@@ -109,7 +110,8 @@ public ConnectorInfo getConnector(final 
@PathParam("connector") String connector
   final @QueryParam("forward") Boolean 
forward) throws Throwable {
 FutureCallback<ConnectorInfo> cb = new FutureCallback<>();
 herder.connectorInfo(connector, cb);
-return completeOrForwardRequest(cb, "/connectors/" + connector, "GET", 
null, forward);
+ConnectorInfo connectorInfo = completeOrForwardRequest(cb, 
"/connectors/" + connector, "GET", null, forward);
+return new ConnectorInfo(connectorInfo.name(), 
herder.maskCredentials(connector, connectorInfo.config()), 
connectorInfo.tasks(), connectorInfo.type());
 }
 
 @GET
@@ -118,7 +120,8 @@ public ConnectorInfo getConnector(final 
@PathParam("connector") String connector
   final @QueryParam("forward") 
Boolean forward) throws Throwable {
 FutureCallback<Map<String, String>> cb = new FutureCallback<>();
 herder.connectorConfig(connector, cb);
-return completeOrForwardRequest(cb, "/connectors/" + connector + 
"/config", "GET", null, forward);
+Map<String, String> config = completeOrForwardRequest(cb, 
"/connectors/" + connector + "/config", "GET", null, forward);
+return herder.maskCredentials(connector, config);
 }
 
 @GET
@@ -177,8 +180,14 @@ public Response resumeConnector(@PathParam("connector") 
String connector) {
  final @QueryParam("forward") Boolean 
forward) throws Throwable {
 FutureCallback> 

[jira] [Assigned] (KAFKA-7620) ConfigProvider is broken for KafkaConnect when TTL is not null

2018-11-14 Thread Robert Yokota (JIRA)


 [ 
https://issues.apache.org/jira/browse/KAFKA-7620?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Robert Yokota reassigned KAFKA-7620:


Assignee: Robert Yokota

> ConfigProvider is broken for KafkaConnect when TTL is not null
> --
>
> Key: KAFKA-7620
> URL: https://issues.apache.org/jira/browse/KAFKA-7620
> Project: Kafka
>  Issue Type: Bug
>  Components: KafkaConnect
>Affects Versions: 2.0.0, 2.0.1
>Reporter: Ye Ji
>Assignee: Robert Yokota
>Priority: Major
>
> If the ConfigData returned by ConfigProvider.get implementations has a non-null 
> and non-negative ttl, it triggers infinite recursion; here is an excerpt 
> of the stack trace:
> {code:java}
> at 
> org.apache.kafka.connect.runtime.WorkerConfigTransformer.scheduleReload(WorkerConfigTransformer.java:62)
>   at 
> org.apache.kafka.connect.runtime.WorkerConfigTransformer.scheduleReload(WorkerConfigTransformer.java:56)
>   at 
> org.apache.kafka.connect.runtime.WorkerConfigTransformer.transform(WorkerConfigTransformer.java:49)
>   at 
> org.apache.kafka.connect.runtime.distributed.ClusterConfigState.connectorConfig(ClusterConfigState.java:121)
>   at 
> org.apache.kafka.connect.runtime.distributed.DistributedHerder.connectorConfigReloadAction(DistributedHerder.java:648)
>   at 
> org.apache.kafka.connect.runtime.WorkerConfigTransformer.scheduleReload(WorkerConfigTransformer.java:62)
>   at 
> org.apache.kafka.connect.runtime.WorkerConfigTransformer.scheduleReload(WorkerConfigTransformer.java:56)
>   at 
> org.apache.kafka.connect.runtime.WorkerConfigTransformer.transform(WorkerConfigTransformer.java:49)
> {code}
> Basically, 
> 1) if a non-null ttl is returned from the config provider, the Connect runtime 
> will try to schedule a reload in the future, 
> 2) the scheduleReload function reads the config again to see whether it is a 
> restart or not, by calling 
> org.apache.kafka.connect.runtime.WorkerConfigTransformer.transform to 
> transform the config, 
> 3) the transform function calls the config provider and gets a non-null ttl, 
> causing scheduleReload to be called again, and we are back to step 1.
> To reproduce, simply fork the provided 
> [FileConfigProvider|https://github.com/apache/kafka/blob/3cdc78e6bb1f83973a14ce1550fe3874f7348b05/clients/src/main/java/org/apache/kafka/common/config/provider/FileConfigProvider.java],
>  and add a non-negative ttl to the ConfigData returned by the get functions.
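A sketch of that reproducer (the class name and TTL value are made up): a {{FileConfigProvider}} subclass whose {{ConfigData}} carries a TTL, which makes the Connect runtime schedule a reload, which re-transforms the config, which calls the provider again, and so on.

{code:java}
import java.util.Set;

import org.apache.kafka.common.config.ConfigData;
import org.apache.kafka.common.config.provider.FileConfigProvider;

public class TtlFileConfigProvider extends FileConfigProvider {
    private static final long TTL_MS = 60_000L;

    @Override
    public ConfigData get(final String path) {
        // Returning a non-null TTL is what triggers scheduleReload in the runtime.
        return new ConfigData(super.get(path).data(), TTL_MS);
    }

    @Override
    public ConfigData get(final String path, final Set<String> keys) {
        return new ConfigData(super.get(path, keys).data(), TTL_MS);
    }
}
{code}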



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Updated] (KAFKA-7629) Mirror maker goes into infinite loop

2018-11-14 Thread Darshan Mehta (JIRA)


 [ 
https://issues.apache.org/jira/browse/KAFKA-7629?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Darshan Mehta updated KAFKA-7629:
-
Description: 
*Setup:*

I have 2 Kafka images running the Spotify Kafka image: 
[https://hub.docker.com/r/spotify/kafka/]

Config:

Image 1:
 * host: kafka1
 * zk port : 2181
 * broker port : 9092

Image 2:
 * host: kafka2
 * zk port : 1181
 * broker port : 8092

Producer Properties for Mirror maker: 
{code:java}
bootstrap.servers=kafka2:8092
{code}
Consumer Properties for Mirror maker: 
{code:java}
bootstrap.servers=kafka1:9092
group.id=test-consumer-group
exclude.internal.topics=true
{code}
 

*Steps to replicate :*
 # Start mirror maker with following command : 
{code:java}
$KAFKA_INSTALLATION_DIR/bin/kafka-mirror-maker.sh --producer.config 
 --consumer.config  
--num.streams 1 --whitelist topic-1
{code}

 # Start local kafka console consumer to listen to topic-1 for kafka2:8092 
{code:java}
$KAFKA_INSTALLATION_DIR/bin/kafka-console-consumer.sh --bootstrap-server 
kafka2:8092 --topic topic-1
{code}

 # Produce an event to kafka1:9092 - topic-1  -> It will be printed on the 
console in Step 2
 # Stop mirror maker with ctrl+C (started in step 1)
 # Restart mirror maker with same command
 # Produce an event onto the same topic (i.e. repeat step 3)
 # Both source and destination will be flooded with the same messages until 
mirror maker is stopped

Surprisingly, the source Kafka also gets flooded with the same message. I believe 
that, when restarted, the mirror maker is unable to read its state?

  was:
*Setup:*

I have 2 Kafka images running (Spotify Kafka image: 
[https://hub.docker.com/r/spotify/kafka/])

Config:

Image 1:
 * host: kafka1
 * zk port : 2181
 * broker port : 9092

Image 2:
 * host: kafka2
 * zk port : 1181
 * broker port : 8092

Producer Properties for Mirror maker: 
{code:java}
bootstrap.servers=kafka2:8092
{code}
Consumer Properties for Mirror maker: 
{code:java}
bootstrap.servers=kafka1:9092
group.id=test-consumer-group
exclude.internal.topics=true
{code}
 

*Steps to replicate :*
 # Start mirror maker with following command : 
{code:java}
$KAFKA_INSTALLATION_DIR/bin/kafka-mirror-maker.sh --producer.config 
 --consumer.config  
--num.streams 1 --whitelist topic-1
{code}

 # Start local kafka console consumer to listen to topic-1 for kafka2:8092 
{code:java}
$KAFKA_INSTALLATION_DIR/bin/kafka-console-consumer.sh --bootstrap-server 
kafka2:8092 --topic topic-1
{code}

 # Produce an event to kafka1:9092 - topic-1  -> It will be printed on the 
console in Step 2
 # Stop mirror maker with ctrl+C (started in step 1)
 # Restart mirror maker with same command
 # Produce an event onto the same topic (i.e. repeat step 3)
 # Both source and destination will be flooded with the same messages until 
mirror maker is stopped

Surprisingly, the source Kafka also gets flooded with the same message. I believe 
that, when restarted, the mirror maker is unable to read its state?


> Mirror maker goes into infinite loop
> 
>
> Key: KAFKA-7629
> URL: https://issues.apache.org/jira/browse/KAFKA-7629
> Project: Kafka
>  Issue Type: Bug
>  Components: mirrormaker
>Affects Versions: 2.0.0
> Environment: local
>Reporter: Darshan Mehta
>Priority: Major
>
> *Setup:*
> I have 2 Kafka images running the Spotify Kafka image: 
> [https://hub.docker.com/r/spotify/kafka/]
> Config:
> Image 1:
>  * host: kafka1
>  * zk port : 2181
>  * broker port : 9092
> Image 2:
>  * host: kafka2
>  * zk port : 1181
>  * broker port : 8092
> Producer Properties for Mirror maker: 
> {code:java}
> bootstrap.servers=kafka2:8092
> {code}
> Consumer Properties for Mirror maker: 
> {code:java}
> bootstrap.servers=kafka1:9092
> group.id=test-consumer-group
> exclude.internal.topics=true
> {code}
>  
> *Steps to replicate :*
>  # Start mirror maker with following command : 
> {code:java}
> $KAFKA_INSTALLATION_DIR/bin/kafka-mirror-maker.sh --producer.config 
>  --consumer.config  
> --num.streams 1 --whitelist topic-1
> {code}
>  # Start local kafka console consumer to listen to topic-1 for kafka2:8092 
> {code:java}
> $KAFKA_INSTALLATION_DIR/bin/kafka-console-consumer.sh --bootstrap-server 
> kafka2:8092 --topic topic-1
> {code}
>  # Produce an event to kafka1:9092 - topic-1  -> It will be printed on the 
> console in Step 2
>  # Stop mirror maker with ctrl+C (started in step 1)
>  # Restart mirror maker with same command
>  # Produce an event onto the same topic (i.e. repeat step 3)
>  # Both source and destination will be flooded with the same messages until 
> mirror maker is stopped
> Surprisingly, the source Kafka also gets flooded with the same message. I believe 
> that, when restarted, the mirror maker is unable to read the 

[jira] [Updated] (KAFKA-7629) Mirror maker goes into infinite loop

2018-11-14 Thread Darshan Mehta (JIRA)


 [ 
https://issues.apache.org/jira/browse/KAFKA-7629?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Darshan Mehta updated KAFKA-7629:
-
Description: 
*Setup:*

I have 2 Kafka images running (Spotify Kafka image: 
[https://hub.docker.com/r/spotify/kafka/])

Config:

Image 1:
 * host: kafka1
 * zk port : 2181
 * broker port : 9092

Image 2:
 * host: kafka2
 * zk port : 1181
 * broker port : 8092

Producer Properties for Mirror maker: 
{code:java}
bootstrap.servers=kafka2:8092
{code}
Consumer Properties for Mirror maker: 
{code:java}
bootstrap.servers=kafka1:9092
group.id=test-consumer-group
exclude.internal.topics=true
{code}
 

*Steps to replicate :*
 # Start mirror maker with following command : 
{code:java}
$KAFKA_INSTALLATION_DIR/bin/kafka-mirror-maker.sh --producer.config 
 --consumer.config  
--num.streams 1 --whitelist topic-1
{code}

 # Start local kafka console consumer to listen to topic-1 for kafka2:8092 
{code:java}
$KAFKA_INSTALLATION_DIR/bin/kafka-console-consumer.sh --bootstrap-server 
kafka2:8092 --topic topic-1
{code}

 # Produce an event to kafka1:9092 - topic-1  -> It will be printed on the 
console in Step 2
 # Stop mirror maker with ctrl+C (started in step 1)
 # Restart mirror maker with same command
 # Produce an event onto the same topic (i.e. repeat step 3)
 # Both source and destination will be flooded with the same messages until 
mirror maker is stopped

Surprisingly, source kafka also gets flooded with the same message. I believe 
when restarted, the mirror maker is unable to read the state?

  was:
*Setup:*

I have 2 kafka images running (Spotify Kafka image: 
[https://hub.docker.com/r/spotify/kafka/])

Config:

Image 1:
 * host: kafka1
 * zk port : 2181
 * broker port : 9092

Image 2:
 * host: kafka2
 * zk port : 1181
 * broker port : 8092

Producer Properties for Mirror maker: 
{code:java}
bootstrap.servers=kafka2:8092
{code}
Consumer Properties for Mirror maker: 
{code:java}
bootstrap.servers=kafka1:9092
group.id=test-consumer-group
exclude.internal.topics=true
{code}
 

*Steps to replicate :*
 # Start mirror maker with following command : 
{code:java}
$KAFKA_INSTALLATION_DIR/bin/kafka-mirror-maker.sh --producer.config 
 --consumer.config  
--num.streams 1 --whitelist topic-1
{code}

 # Start local kafka console consumer to listen to topic-1 for kafka2:8092 
{code:java}
$KAFKA_INSTALLATION_DIR/bin/kafka-console-consumer.sh --bootstrap-server 
kafka2:8092 --topic topic-1
{code}

 # Produce an event to kafka1:9092 - topic-1  -> It will be printed on the 
console in Step 2
 # Stop mirror maker with ctrl+C (started in step 1)
 # Restart mirror maker with same command
 # Produce an event onto the same topic (i.e. repeat step 3)
 # Both source and destination will be flooded with the same messages until 
mirror maker is stopped

Surprisingly, source kafka also gets flooded with the same message. I believe 
when restarted, the mirror maker is unable to read the state?


> Mirror maker goes into infinite loop
> 
>
> Key: KAFKA-7629
> URL: https://issues.apache.org/jira/browse/KAFKA-7629
> Project: Kafka
>  Issue Type: Bug
>  Components: mirrormaker
>Affects Versions: 2.0.0
> Environment: local
>Reporter: Darshan Mehta
>Priority: Major
>
> *Setup:*
> I have 2 kafka images running (Spotify Kafka image: 
> [https://hub.docker.com/r/spotify/kafka/])
> Config:
> Image 1:
>  * host: kafka1
>  * zk port : 2181
>  * broker port : 9092
> Image 2:
>  * host: kafka2
>  * zk port : 1181
>  * broker port : 8092
> Producer Properties for Mirror maker: 
> {code:java}
> bootstrap.servers=kafka2:8092
> {code}
> Consumer Properties for Mirror maker: 
> {code:java}
> bootstrap.servers=kafka1:9092
> group.id=test-consumer-group
> exclude.internal.topics=true
> {code}
>  
> *Steps to replicate :*
>  # Start mirror maker with following command : 
> {code:java}
> $KAFKA_INSTALLATION_DIR/bin/kafka-mirror-maker.sh --producer.config 
>  --consumer.config  
> --num.streams 1 --whitelist topic-1
> {code}
>  # Start local kafka console consumer to listen to topic-1 for kafka2:8092 
> {code:java}
> $KAFKA_INSTALLATION_DIR/bin/kafka-console-consumer.sh --bootstrap-server 
> kafka2:8092 --topic topic-1
> {code}
>  # Produce an event to kafka1:9092 - topic-1  -> It will be printed on the 
> console in Step 2
>  # Stop mirror maker with ctrl+C (started in step 1)
>  # Restart mirror maker with same command
>  # Produce an event onto the same topic (i.e. repeat step 3)
>  # Both source and destination will be flooded with the same messages until 
> mirror maker is stopped
> Surprisingly, source kafka also gets flooded with the same message. I believe 
> when restarted, the mirror maker is unable to read the state?



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)

[jira] [Updated] (KAFKA-7629) Mirror maker goes into infinite loop

2018-11-14 Thread Darshan Mehta (JIRA)


 [ 
https://issues.apache.org/jira/browse/KAFKA-7629?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Darshan Mehta updated KAFKA-7629:
-
Description: 
*Setup:*

I have 2 kafka images running the Spotify Kafka image 
[https://hub.docker.com/r/spotify/kafka]

Config:

Image 1:
 * host: kafka1
 * zk port : 2181
 * broker port : 9092

Image 2:
 * host: kafka2
 * zk port : 1181
 * broker port : 8092

Producer Properties for Mirror maker: 
{code:java}
bootstrap.servers=kafka2:8092
{code}
Consumer Properties for Mirror maker: 
{code:java}
bootstrap.servers=kafka1:9092
group.id=test-consumer-group
exclude.internal.topics=true
{code}
 

*Steps to replicate :*
 # Start mirror maker with following command : 
{code:java}
$KAFKA_INSTALLATION_DIR/bin/kafka-mirror-maker.sh --producer.config 
 --consumer.config  
--num.streams 1 --whitelist topic-1
{code}

 # Start local kafka console consumer to listen to topic-1 for kafka2:8092 
{code:java}
$KAFKA_INSTALLATION_DIR/bin/kafka-console-consumer.sh --bootstrap-server 
kafka2:8092 --topic topic-1
{code}

 # Produce an event to kafka1:9092 - topic-1  -> It will be printed on the 
console in Step 2
 # Stop mirror maker with ctrl+C (started in step 1)
 # Restart mirror maker with same command
 # Produce an event onto the same topic (i.e. repeat step 3)
 # Both source and destination will be flooded with the same messages until 
mirror maker is stopped

Surprisingly, source kafka also gets flooded with the same message. I believe 
when restarted, the mirror maker is unable to read the state?

  was:
*Setup:*

I have 2 kafka images running the Spotify Kafka image: 
[https://hub.docker.com/r/spotify/kafka/]

Config:

Image 1:
 * host: kafka1
 * zk port : 2181
 * broker port : 9092

Image 2:
 * host: kafka2
 * zk port : 1181
 * broker port : 8092

Producer Properties for Mirror maker: 
{code:java}
bootstrap.servers=kafka2:8092
{code}
Consumer Properties for Mirror maker: 
{code:java}
bootstrap.servers=kafka1:9092
group.id=test-consumer-group
exclude.internal.topics=true
{code}
 

*Steps to replicate :*
 # Start mirror maker with following command : 
{code:java}
$KAFKA_INSTALLATION_DIR/bin/kafka-mirror-maker.sh --producer.config 
 --consumer.config  
--num.streams 1 --whitelist topic-1
{code}

 # Start local kafka console consumer to listen to topic-1 for kafka2:8092 
{code:java}
$KAFKA_INSTALLATION_DIR/bin/kafka-console-consumer.sh --bootstrap-server 
kafka2:8092 --topic topic-1
{code}

 # Produce an event to kafka1:9092 - topic-1  -> It will be printed on the 
console in Step 2
 # Stop mirror maker with ctrl+C (started in step 1)
 # Restart mirror maker with same command
 # Produce an event onto the same topic (i.e. repeat step 3)
 # Both source and destination will be flooded with the same messages until 
mirror maker is stopped

Surprisingly, source kafka also gets flooded with the same message. I believe 
when restarted, the mirror maker is unable to read the state?


> Mirror maker goes into infinite loop
> 
>
> Key: KAFKA-7629
> URL: https://issues.apache.org/jira/browse/KAFKA-7629
> Project: Kafka
>  Issue Type: Bug
>  Components: mirrormaker
>Affects Versions: 2.0.0
> Environment: local
>Reporter: Darshan Mehta
>Priority: Major
>
> *Setup:*
> I have 2 kafka images running the Spotify Kafka image 
> [https://hub.docker.com/r/spotify/kafka]
> Config:
> Image 1:
>  * host: kafka1
>  * zk port : 2181
>  * broker port : 9092
> Image 2:
>  * host: kafka2
>  * zk port : 1181
>  * broker port : 8092
> Producer Properties for Mirror maker: 
> {code:java}
> bootstrap.servers=kafka2:8092
> {code}
> Consumer Properties for Mirror maker: 
> {code:java}
> bootstrap.servers=kafka1:9092
> group.id=test-consumer-group
> exclude.internal.topics=true
> {code}
>  
> *Steps to replicate :*
>  # Start mirror maker with following command : 
> {code:java}
> $KAFKA_INSTALLATION_DIR/bin/kafka-mirror-maker.sh --producer.config 
>  --consumer.config  
> --num.streams 1 --whitelist topic-1
> {code}
>  # Start local kafka console consumer to listen to topic-1 for kafka2:8092 
> {code:java}
> $KAFKA_INSTALLATION_DIR/bin/kafka-console-consumer.sh --bootstrap-server 
> kafka2:8092 --topic topic-1
> {code}
>  # Produce an event to kafka1:9092 - topic-1  -> It will be printed on the 
> console in Step 2
>  # Stop mirror maker with ctrl+C (started in step 1)
>  # Restart mirror maker with same command
>  # Produce an event onto the same topic (i.e. repeat step 3)
>  # Both source and destination will be flooded with the same messages until 
> mirror maker is stopped
> Surprisingly, source kafka also gets flooded with the same message. I believe 
> when restarted, the mirror maker is unable to read the state?



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Updated] (KAFKA-7629) Mirror maker goes into infinite loop

2018-11-14 Thread Darshan Mehta (JIRA)


 [ 
https://issues.apache.org/jira/browse/KAFKA-7629?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Darshan Mehta updated KAFKA-7629:
-
Description: 
*Setup:*

I have 2 kafka images running (Spotify Kafka image: 
[https://hub.docker.com/r/spotify/kafka/])

Config:

Image 1:
 * host: kafka1
 * zk port : 2181
 * broker port : 9092

Image 2:
 * host: kafka2
 * zk port : 1181
 * broker port : 8092

Producer Properties for Mirror maker: 
{code:java}
bootstrap.servers=kafka2:8092
{code}
Consumer Properties for Mirror maker: 
{code:java}
bootstrap.servers=kafka1:9092
group.id=test-consumer-group
exclude.internal.topics=true
{code}
 

*Steps to replicate :*
 # Start mirror maker with following command : 
{code:java}
$KAFKA_INSTALLATION_DIR/bin/kafka-mirror-maker.sh --producer.config 
 --consumer.config  
--num.streams 1 --whitelist topic-1
{code}

 # Start local kafka console consumer to listen to topic-1 for kafka2:8092 
{code:java}
$KAFKA_INSTALLATION_DIR/bin/kafka-console-consumer.sh --bootstrap-server 
kafka2:8092 --topic topic-1
{code}

 # Produce an event to kafka1:9092 - topic-1  -> It will be printed on the 
console in Step 2
 # Stop mirror maker with ctrl+C (started in step 1)
 # Restart mirror maker with same command
 # Produce an event onto the same topic (i.e. repeat step 3)
 # Both source and destination will be flooded with the same messages until 
mirror maker is stopped

Surprisingly, source kafka also gets flooded with the same message. I believe 
when restarted, the mirror maker is unable to read the state?

  was:
*Setup:*

I have 2 kafka images running (Spotify Kafka image: 
[https://hub.docker.com/r/spotify/kafka/])

Config:

Image 1:
 * host: kafka1
 * zk port : 2181
 * broker port : 9092

Image 2:
 * host: kafka2
 * zk port : 1181
 * broker port : 8092

Producer Properties for Mirror maker:

 
{code:java}
bootstrap.servers=kafka2:8092
{code}
Consumer Properties for Mirror maker:

 

 
{code:java}
bootstrap.servers=kafka1:9092
group.id=test-consumer-group
exclude.internal.topics=true
{code}
 

*Steps to replicate :*
 # Start mirror maker with following command : 
{code:java}
$KAFKA_INSTALLATION_DIR/bin/kafka-mirror-maker.sh --producer.config 
 --consumer.config  
--num.streams 1 --whitelist topic-1
{code}

 # Start local kafka console consumer to listen to topic-1 for kafka2:8092 
{code:java}
$KAFKA_INSTALLATION_DIR/bin/kafka-console-consumer.sh --bootstrap-server 
kafka2:8092 --topic topic-1
{code}

 # Produce an event to kafka1:9092 - topic-1  -> It will be printed on the 
console in Step 2
 # Stop mirror maker with ctrl+C (started in step 1)
 # Restart mirror maker with same command
 # Produce an event onto the same topic (i.e. repeat step 3)
 # Both source and destination will be flooded with the same messages until 
mirror maker is stopped

Surprisingly, source kafka also gets flooded with the same message. I believe 
when restarted, the mirror maker is unable to read the state?


> Mirror maker goes into infinite loop
> 
>
> Key: KAFKA-7629
> URL: https://issues.apache.org/jira/browse/KAFKA-7629
> Project: Kafka
>  Issue Type: Bug
>  Components: mirrormaker
>Affects Versions: 2.0.0
> Environment: local
>Reporter: Darshan Mehta
>Priority: Major
>
> *Setup:*
> I have 2 kafka images running (Spotify Kafka image: 
> [https://hub.docker.com/r/spotify/kafka/])
> Config:
> Image 1:
>  * host: kafka1
>  * zk port : 2181
>  * broker port : 9092
> Image 2:
>  * host: kafka2
>  * zk port : 1181
>  * broker port : 8092
> Producer Properties for Mirror maker: 
> {code:java}
> bootstrap.servers=kafka2:8092
> {code}
> Consumer Properties for Mirror maker: 
> {code:java}
> bootstrap.servers=kafka1:9092
> group.id=test-consumer-group
> exclude.internal.topics=true
> {code}
>  
> *Steps to replicate :*
>  # Start mirror maker with following command : 
> {code:java}
> $KAFKA_INSTALLATION_DIR/bin/kafka-mirror-maker.sh --producer.config 
>  --consumer.config  
> --num.streams 1 --whitelist topic-1
> {code}
>  # Start local kafka console consumer to listen to topic-1 for kafka2:8092 
> {code:java}
> $KAFKA_INSTALLATION_DIR/bin/kafka-console-consumer.sh --bootstrap-server 
> kafka2:8092 --topic topic-1
> {code}
>  # Produce an event to kafka1:9092 - topic-1  -> It will be printed on the 
> console in Step 2
>  # Stop mirror maker with ctrl+C (started in step 1)
>  # Restart mirror maker with same command
>  # Produce an event onto the same topic (i.e. repeat step 3)
>  # Both source and destination will be flooded with the same messages until 
> mirror maker is stopped
> Surprisingly, source kafka also gets flooded with the same message. I believe 
> when restarted, the mirror maker is unable to read the state?



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Created] (KAFKA-7629) Mirror maker goes into infinite loop

2018-11-14 Thread Darshan Mehta (JIRA)
Darshan Mehta created KAFKA-7629:


 Summary: Mirror maker goes into infinite loop
 Key: KAFKA-7629
 URL: https://issues.apache.org/jira/browse/KAFKA-7629
 Project: Kafka
  Issue Type: Bug
  Components: mirrormaker
Affects Versions: 2.0.0
 Environment: local
Reporter: Darshan Mehta


*Setup:*

I have 2 kafka images running (Spotify Kafka image: 
[https://hub.docker.com/r/spotify/kafka/])

Config:

Image 1:
 * host: kafka1
 * zk port : 2181
 * broker port : 9092

Image 2:
 * host: kafka2
 * zk port : 1181
 * broker port : 8092

Producer Properties for Mirror maker:

 
{code:java}
bootstrap.servers=kafka2:8092
{code}
Consumer Properties for Mirror maker:

 

 
{code:java}
bootstrap.servers=kafka1:9092
group.id=test-consumer-group
exclude.internal.topics=true
{code}
 

*Steps to replicate :*
 # Start mirror maker with following command : 
{code:java}
$KAFKA_INSTALLATION_DIR/bin/kafka-mirror-maker.sh --producer.config 
 --consumer.config  
--num.streams 1 --whitelist topic-1
{code}

 # Start local kafka console consumer to listen to topic-1 for kafka2:8092 
{code:java}
$KAFKA_INSTALLATION_DIR/bin/kafka-console-consumer.sh --bootstrap-server 
kafka2:8092 --topic topic-1
{code}

 # Produce an event to kafka1:9092 - topic-1  -> It will be printed on the 
console in Step 2
 # Stop mirror maker with ctrl+C (started in step 1)
 # Restart mirror maker with same command
 # Produce an event onto the same topic (i.e. repeat step 3)
 # Both source and destination will be flooded with the same messages until 
mirror maker is stopped

Surprisingly, source kafka also gets flooded with the same message. I believe 
when restarted, the mirror maker is unable to read the state?
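
One diagnostic worth running here (a possible check, not from the original report) is to inspect which offsets the mirror maker's consumer group has committed on the source cluster after the restart:

{code:java}
$KAFKA_INSTALLATION_DIR/bin/kafka-consumer-groups.sh --bootstrap-server kafka1:9092 \
  --describe --group test-consumer-group
{code}

If the group shows no committed offsets after the restart, or the current offset has moved backwards, the re-delivery is governed by where the consumer resumes (auto.offset.reset) rather than by anything mirror maker stores locally.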



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Assigned] (KAFKA-5505) Connect: Do not restart connector and existing tasks on task-set change

2018-11-14 Thread Randall Hauch (JIRA)


 [ 
https://issues.apache.org/jira/browse/KAFKA-5505?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Randall Hauch reassigned KAFKA-5505:


Assignee: Konstantine Karantasis

> Connect: Do not restart connector and existing tasks on task-set change
> ---
>
> Key: KAFKA-5505
> URL: https://issues.apache.org/jira/browse/KAFKA-5505
> Project: Kafka
>  Issue Type: Improvement
>  Components: KafkaConnect
>Affects Versions: 0.10.2.1
>Reporter: Per Steffensen
>Assignee: Konstantine Karantasis
>Priority: Major
>
> I am writing a connector with a frequently changing task-set. It is really 
> not working very well, because the connector and all existing tasks are 
> restarted when the set of tasks changes. E.g. if the connector is running 
> with 10 tasks, and an additional task is needed, the connector itself and all 
> 10 existing tasks are restarted, just to make the 11th task run also. My 
> tasks have a fairly heavy initialization, making it extra annoying. I would 
> like to see a change, introducing a "mode", where only new/deleted tasks are 
> started/stopped when notifying the system that the set of tasks changed 
> (calling context.requestTaskReconfiguration() - or something similar).
> Discussed this issue a little on d...@kafka.apache.org in the thread "Kafka 
> Connect: To much restarting with a SourceConnector with dynamic set of tasks"
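
To make the current behaviour concrete, a rough sketch (class and helper names are assumptions, not the reporter's code) of the kind of monitor that triggers this today:

{code:java}
import java.util.Collections;
import java.util.List;
import org.apache.kafka.connect.connector.ConnectorContext;

// Periodically compares the discovered set of work against the last known set
// and asks the framework to recompute task configs when it changes. With the
// current behaviour this restarts the connector and every existing task.
public class TaskSetMonitor implements Runnable {
    private final ConnectorContext context;
    private volatile List<String> lastSeen = Collections.emptyList();

    public TaskSetMonitor(ConnectorContext context) {
        this.context = context;
    }

    @Override
    public void run() {
        List<String> current = discoverWorkUnits();
        if (!current.equals(lastSeen)) {
            lastSeen = current;
            context.requestTaskReconfiguration(); // today: connector + all tasks restart
        }
    }

    // Placeholder: a real connector would query its external system here.
    private List<String> discoverWorkUnits() {
        return Collections.emptyList();
    }
}
{code}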



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Created] (KAFKA-7628) KafkaStream is not closing

2018-11-14 Thread Ozgur (JIRA)
Ozgur created KAFKA-7628:


 Summary: KafkaStream is not closing
 Key: KAFKA-7628
 URL: https://issues.apache.org/jira/browse/KAFKA-7628
 Project: Kafka
  Issue Type: Bug
  Components: streams
Affects Versions: 0.11.0.1
 Environment: Macbook Pro
Reporter: Ozgur


I'm closing a KafkaStreams instance when needed, based on a certain condition:

Closing:

 
{code:java}
if(kafkaStream == null) {
logger.info("KafkaStream already closed?");
} else {
boolean closed = kafkaStream.close(10L, TimeUnit.SECONDS);
if(closed) {
kafkaStream = null;
logger.info("KafkaStream closed");
} else {
logger.info("KafkaStream could not closed");
}
}
{code}
Starting:

 
{code:java}
if(kafkaStream == null) {
logger.info("KafkaStream is starting");
kafkaStream = 
KafkaManager.getInstance().getStream(this.getConfigFilePath(),
this,
this.getTopic()
);
kafkaStream.start();
logger.info("KafkaStream is started");
}
{code}
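
A minimal sketch of a defensive shutdown, assuming the 0.11.0.1 API and reusing the kafkaStream and logger fields from the snippets above (not the reporter's actual code), would verify the state before dropping the reference:

{code:java}
// Sketch: close, confirm the stream actually reached NOT_RUNNING, then clean up.
private void shutdownStream() {
    if (kafkaStream == null) {
        return;
    }
    boolean closed = kafkaStream.close(10L, TimeUnit.SECONDS);
    if (closed && kafkaStream.state() == KafkaStreams.State.NOT_RUNNING) {
        kafkaStream.cleanUp();   // optional: wipe the local state directory
        kafkaStream = null;
    } else {
        logger.warn("KafkaStream did not stop cleanly, state={}", kafkaStream.state());
    }
}
{code}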
 

 

In my implementation of Processor, {{process(String key, byte[] value)}} is 
still called even though the stream was closed successfully:

 
{code:java}
public abstract class BaseKafkaProcessor implements Processor {
private static Logger logger = 
LogManager.getLogger(BaseKafkaProcessor.class);
private ProcessorContext context;


private ProcessorContext getContext() {
return context;
}

@Override
public void init(ProcessorContext context) {
this.context = context;
this.context.schedule(1000);
}


@Override
public void process(String key, byte[] value) {
try {
String topic = key.split("-")[0];
byte[] uncompressed = GzipCompressionUtil.uncompress(value);
String json = new String(uncompressed, "UTF-8");
processRecord(topic, json);
this.getContext().commit();
} catch (Exception e) {
logger.error("Error processing json", e);
}
}

protected abstract void processRecord(String topic, String json);

@Override
public void punctuate(long timestamp) {
this.getContext().commit();
}

@Override
public void close() {
this.getContext().commit();
}
}
{code}
 

My configuration for KafkaStreams:

 
{code:java}
application.id=dv_ws_in_app_activity_dev4
bootstrap.servers=VLXH1
auto.offset.reset=latest
num.stream.threads=1
key.serde=org.apache.kafka.common.serialization.Serdes$StringSerde
value.serde=org.apache.kafka.common.serialization.Serdes$ByteArraySerde
poll.ms = 100
commit.interval.ms=1000
state.dir=../../temp/kafka-state-dir
{code}
Version: *0.11.0.1* 

 

I'm seeing that after closing the stream, these connections are still 
established:

 
{code:java}
$ sudo lsof -i -n -P | grep 9092
java      29457          ozgur  133u  IPv6 0x531e550533f38283      0t0    TCP 
x.27.227.182:54419->x.x.164.33:9092 (ESTABLISHED)

java      29457          ozgur  134u  IPv6 0x531e55051a789ec3      0t0    TCP 
x.27.227.182:54420->x.x.164.45:9092 (ESTABLISHED)

java      29457          ozgur  135u  IPv6 0x531e55051a789903      0t0    TCP 
x.27.227.182:54421->x.x.164.25:9092 (ESTABLISHED)

java      29457          ozgur  136u  IPv6 0x531e55051a78aa43      0t0    TCP 
x.27.227.182:54422->x.x.164.25:9092 (ESTABLISHED)

java      29457          ozgur  140u  IPv6 0x531e55051a78c703      0t0    TCP 
x.27.227.182:54423->x.x.164.25:9092 (ESTABLISHED)

java      29457          ozgur  141u  IPv6 0x531e55051a78a483      0t0    TCP 
x.27.227.182:54424->x.x.164.45:9092 (ESTABLISHED)
{code}
 

 



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Updated] (KAFKA-7625) Kafka Broker node JVM crash - kafka.coordinator.transaction.TransactionCoordinator.$anonfun$handleEndTransaction

2018-11-14 Thread JIRA


 [ 
https://issues.apache.org/jira/browse/KAFKA-7625?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Sebastian Puzoń updated KAFKA-7625:
---
 Attachment: hs_err_pid4194.log
 hs_err_pid4299.log
 hs_err_pid10238.log
 hs_err_pid15119.log
 hs_err_pid19131.log
 hs_err_pid22373.log
 hs_err_pid22386.log
 hs_err_pid22633.log
 hs_err_pid24681.log
 hs_err_pid25513.log
 hs_err_pid25701.log
 hs_err_pid26844.log
 hs_err_pid27290.log
Description: 
I observe broker node JVM crashes with the same problematic frame:
{code:java}
#
# A fatal error has been detected by the Java Runtime Environment:
#
#  SIGSEGV (0xb) at pc=0x7ff4a2588261, pid=24681, tid=0x7ff3b9bb1700
#
# JRE version: Java(TM) SE Runtime Environment (8.0_92-b14) (build 1.8.0_92-b14)
# Java VM: Java HotSpot(TM) 64-Bit Server VM (25.92-b14 mixed mode linux-amd64 
compressed oops)
# Problematic frame:
# J 9736 C1 
kafka.coordinator.transaction.TransactionCoordinator.$anonfun$handleEndTransaction$7(Lkafka/coordinator/transaction/TransactionCoordinator;Ljava/lang/String;JSLorg/apache/kafka/common/requests/TransactionResult;Lkafka/coordinator/transaction/TransactionMetadata;)Lscala/util/Either;
 (518 bytes) @ 0x7ff4a2588261 [0x7ff4a25871a0+0x10c1]
#
# Failed to write core dump. Core dumps have been disabled. To enable core 
dumping, try "ulimit -c unlimited" before starting Java again
#
# If you would like to submit a bug report, please visit:
#   http://bugreport.java.com/bugreport/crash.jsp
#

---  T H R E A D  ---

Current thread (0x7ff4b356f800):  JavaThread "kafka-request-handler-3" 
daemon [_thread_in_Java, id=24781, stack(0x7ff3b9ab1000,0x7ff3b9bb2000)]
{code}
{code:java}
Stack: [0x7ff3b9ab1000,0x7ff3b9bb2000],  sp=0x7ff3b9bafca0,  free 
space=1019k
Native frames: (J=compiled Java code, j=interpreted, Vv=VM code, C=native code)
J 9736 C1 
kafka.coordinator.transaction.TransactionCoordinator.$anonfun$handleEndTransaction$7(Lkafka/coordinator/transaction/TransactionCoordinator;Ljava/lang/String;JSLorg/apache/kafka/common/requests/TransactionResult;Lkafka/coordinator/transaction/TransactionMetadata;)Lscala/util/Either;
 (518 bytes) @ 0x7ff4a2588261 [0x7ff4a25871a0+0x10c1]
J 10456 C2 
kafka.coordinator.transaction.TransactionCoordinator.$anonfun$handleEndTransaction$5(Lkafka/coordinator/transaction/TransactionCoordinator;Ljava/lang/String;JSLorg/apache/kafka/common/requests/TransactionResult;ILscala/Option;)Lscala/util/Either;
 (192 bytes) @ 0x7ff4a1d413f0 [0x7ff4a1d41240+0x1b0]
J 9303 C1 
kafka.coordinator.transaction.TransactionCoordinator$$Lambda$1107.apply(Ljava/lang/Object;)Ljava/lang/Object;
 (32 bytes) @ 0x7ff4a245f55c [0x7ff4a245f3c0+0x19c]
J 10018 C2 
scala.util.Either$RightProjection.flatMap(Lscala/Function1;)Lscala/util/Either; 
(43 bytes) @ 0x7ff4a1f242c4 [0x7ff4a1f24260+0x64]
J 9644 C1 
kafka.coordinator.transaction.TransactionCoordinator.sendTxnMarkersCallback$1(Lorg/apache/kafka/common/protocol/Errors;Ljava/lang/String;JSLorg/apache/kafka/common/requests/TransactionResult;Lscala/Function1;ILkafka/coordinator/transaction/TxnTransitMetadata;)V
 (251 bytes) @ 0x7ff4a1ef6254 [0x7ff4a1ef5120+0x1134]
J 9302 C1 
kafka.coordinator.transaction.TransactionCoordinator$$Lambda$1106.apply(Ljava/lang/Object;)Ljava/lang/Object;
 (40 bytes) @ 0x7ff4a24747ec [0x7ff4a24745a0+0x24c]
J 10125 C2 
kafka.coordinator.transaction.TransactionStateManager.updateCacheCallback$1(Lscala/collection/Map;Ljava/lang/String;ILkafka/coordinator/transaction/TxnTransitMetadata;Lscala/Function1;Lscala/Function1;Lorg/apache/kafka/common/TopicPartition;)V
 (892 bytes) @ 0x7ff4a27045ec [0x7ff4a2703c60+0x98c]
J 10051 C2 
kafka.coordinator.transaction.TransactionStateManager$$Lambda$814.apply(Ljava/lang/Object;)Ljava/lang/Object;
 (36 bytes) @ 0x7ff4a1a9cd08 [0x7ff4a1a9cc80+0x88]
J 9349 C2 kafka.server.DelayedProduce.tryComplete()Z (52 bytes) @ 
0x7ff4a1e46e5c [0x7ff4a1e46980+0x4dc]
J 10111 C2 
kafka.server.DelayedOperationPurgatory.tryCompleteElseWatch(Lkafka/server/DelayedOperation;Lscala/collection/Seq;)Z
 (147 bytes) @ 0x7ff4a1c6e000 [0x7ff4a1c6df20+0xe0]
J 10448 C2 
kafka.server.ReplicaManager.appendRecords(JSZZLscala/collection/Map;Lscala/Function1;Lscala/Option;Lscala/Function1;)V
 (237 bytes) @ 0x7ff4a2340b6c [0x7ff4a233f3e0+0x178c]
J 10050 C2 
kafka.coordinator.transaction.TransactionStateManager.$anonfun$appendTransactionToLog$13(Lkafka/coordinator/transaction/TransactionStateManager;Ljava/lang/String;ILkafka/coordinator/transaction/TxnTransitMetadata;Lscala/Function1;Lscala/Function1;Lorg/apache/kafka/common/TopicPartition;Lscala/collection/immutable/Map;)V
 (294 bytes) @ 

[jira] [Created] (KAFKA-7626) Possible duplicate message delivery with exactly-once semantics

2018-11-14 Thread Noam Berman (JIRA)
Noam Berman created KAFKA-7626:
--

 Summary: Possible duplicate message delivery with exactly-once 
semantics
 Key: KAFKA-7626
 URL: https://issues.apache.org/jira/browse/KAFKA-7626
 Project: Kafka
  Issue Type: Bug
  Components: core
Affects Versions: 1.1.0
 Environment: google cloud build docker, all brokers, consumers and 
producers running on the same container (this is a test, not production).
Reporter: Noam Berman


Hello,
I've come across an issue with exactly-once processing (running kafka 1.1.0):
In my test I bring up 3 brokers, and I start sending messages to `topicX`.
While I'm sending messages, I bring up a few consumers on `topicX` one at a 
time (all with the same group id) - and they produce the same message to 
`topicY`. At some point I bring one broker down and up again, to check 
resiliency to failures.


Eventually I assert that `topicY` contains exactly the messages sent to 
`topicX`.
This usually works as expected, but when running the same test 1000s of times 
to check for flakiness, some of them act as follows (in this order):
1. Consumer `C1` owns partition `p`.
1a. Consumers rebalance occurs (because one of the new consumers is starting).
1b. Consumer `C1` is revoked and then re-assigned partition `p`.
2. One of the 3 brokers starts controlled shutdown.
3. Consumer `C1` uses a transactional producer to send a message on offset `o`.
4. Consumer `C1` sends offset `o+1` for partition `p` to the transaction.
5. Consumer `C1` successfully commits the message.
6. Broker controlled shutdown finishes successfully.
... a few seconds after...
7. Another consumer rebalance occurs, Consumer `C2` is assigned to partition 
`p`.
8. Consumer `C2` polls message on offset `o` for partition `p`.

This means we do double processing for the message on offset `o`, violating 
exactly-once semantics.

So it looks like during the broker restart, a commit made through the transactional 
producer gets lost, and because a rebalance happens before another commit is made, 
we poll the same message again even though it was previously committed.
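
For reference, a rough sketch of the consume-transform-produce loop described in steps 3-5 (class name and property values are illustrative assumptions, API as in Kafka 1.1.0):

{code:java}
import java.util.*;
import org.apache.kafka.clients.consumer.*;
import org.apache.kafka.clients.producer.*;
import org.apache.kafka.common.TopicPartition;

public class ExactlyOnceCopy {
    public static void main(String[] args) {
        Properties cp = new Properties();
        cp.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
        cp.put(ConsumerConfig.GROUP_ID_CONFIG, "copy-group");
        cp.put(ConsumerConfig.ISOLATION_LEVEL_CONFIG, "read_committed");
        cp.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "false");
        cp.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG,
               "org.apache.kafka.common.serialization.StringDeserializer");
        cp.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG,
               "org.apache.kafka.common.serialization.StringDeserializer");

        Properties pp = new Properties();
        pp.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
        pp.put(ProducerConfig.TRANSACTIONAL_ID_CONFIG, "copy-txn-1");
        pp.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG,
               "org.apache.kafka.common.serialization.StringSerializer");
        pp.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG,
               "org.apache.kafka.common.serialization.StringSerializer");

        try (KafkaConsumer<String, String> consumer = new KafkaConsumer<>(cp);
             KafkaProducer<String, String> producer = new KafkaProducer<>(pp)) {
            consumer.subscribe(Collections.singletonList("topicX"));
            producer.initTransactions();
            while (true) {
                ConsumerRecords<String, String> records = consumer.poll(100);
                if (records.isEmpty()) {
                    continue;
                }
                producer.beginTransaction();
                Map<TopicPartition, OffsetAndMetadata> offsets = new HashMap<>();
                for (ConsumerRecord<String, String> r : records) {
                    producer.send(new ProducerRecord<>("topicY", r.key(), r.value())); // step 3
                    offsets.put(new TopicPartition(r.topic(), r.partition()),
                                new OffsetAndMetadata(r.offset() + 1));                // step 4
                }
                producer.sendOffsetsToTransaction(offsets, "copy-group");              // step 4
                producer.commitTransaction();                                          // step 5
            }
        }
    }
}
{code}

The sketch deliberately omits rebalance handling (aborting the transaction and re-reading on partition revocation), which is exactly the window where this report observes the duplicate.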

The brokers are configured with:
`transaction.state.log.min.isr=2`
`transaction.state.log.replication.factor=3`
`offsets.topic.replication.factor=3`

The consumer is configured with
`isolation.level=read_committed`

The original producer to `topicX` has transactional semantics, and the test 
shows that it didn't send duplicate messages (using an idempotent producer config).

 

Thanks!



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Created] (KAFKA-7625) Kafka Broker node JVM crash - kafka.coordinator.transaction.TransactionCoordinator.$anonfun$handleEndTransaction

2018-11-14 Thread JIRA
Sebastian Puzoń created KAFKA-7625:
--

 Summary: Kafka Broker node JVM crash - 
kafka.coordinator.transaction.TransactionCoordinator.$anonfun$handleEndTransaction
 Key: KAFKA-7625
 URL: https://issues.apache.org/jira/browse/KAFKA-7625
 Project: Kafka
  Issue Type: Bug
  Components: core
Affects Versions: 2.0.0
 Environment:  environment:os.version=2.6.32-754.2.1.el6.x86_64 
java.version=1.8.0_92 
environment:zookeeper.version=3.4.13-2d71af4dbe22557fda74f9a9b4309b15a7487f03, 
built on 06/29/2018 00:39 GMT (org.apache.zookeeper.ZooKeeper)
Kafka commitId : 3402a8361b734732 
Reporter: Sebastian Puzoń


I observe broker node JVM crashes with the same problematic frame:
{code:java}
#
# A fatal error has been detected by the Java Runtime Environment:
#
#  SIGSEGV (0xb) at pc=0x7ff4a2588261, pid=24681, tid=0x7ff3b9bb1700
#
# JRE version: Java(TM) SE Runtime Environment (8.0_92-b14) (build 1.8.0_92-b14)
# Java VM: Java HotSpot(TM) 64-Bit Server VM (25.92-b14 mixed mode linux-amd64 
compressed oops)
# Problematic frame:
# J 9736 C1 
kafka.coordinator.transaction.TransactionCoordinator.$anonfun$handleEndTransaction$7(Lkafka/coordinator/transaction/TransactionCoordinator;Ljava/lang/String;JSLorg/apache/kafka/common/requests/TransactionResult;Lkafka/coordinator/transaction/TransactionMetadata;)Lscala/util/Either;
 (518 bytes) @ 0x7ff4a2588261 [0x7ff4a25871a0+0x10c1]
#
# Failed to write core dump. Core dumps have been disabled. To enable core 
dumping, try "ulimit -c unlimited" before starting Java again
#
# If you would like to submit a bug report, please visit:
#   http://bugreport.java.com/bugreport/crash.jsp
#

---  T H R E A D  ---

Current thread (0x7ff4b356f800):  JavaThread "kafka-request-handler-3" 
daemon [_thread_in_Java, id=24781, stack(0x7ff3b9ab1000,0x7ff3b9bb2000)]
{code}
{code:java}
Stack: [0x7ff3b9ab1000,0x7ff3b9bb2000],  sp=0x7ff3b9bafca0,  free 
space=1019k
Native frames: (J=compiled Java code, j=interpreted, Vv=VM code, C=native code)
J 9736 C1 
kafka.coordinator.transaction.TransactionCoordinator.$anonfun$handleEndTransaction$7(Lkafka/coordinator/transaction/TransactionCoordinator;Ljava/lang/String;JSLorg/apache/kafka/common/requests/TransactionResult;Lkafka/coordinator/transaction/TransactionMetadata;)Lscala/util/Either;
 (518 bytes) @ 0x7ff4a2588261 [0x7ff4a25871a0+0x10c1]
J 10456 C2 
kafka.coordinator.transaction.TransactionCoordinator.$anonfun$handleEndTransaction$5(Lkafka/coordinator/transaction/TransactionCoordinator;Ljava/lang/String;JSLorg/apache/kafka/common/requests/TransactionResult;ILscala/Option;)Lscala/util/Either;
 (192 bytes) @ 0x7ff4a1d413f0 [0x7ff4a1d41240+0x1b0]
J 9303 C1 
kafka.coordinator.transaction.TransactionCoordinator$$Lambda$1107.apply(Ljava/lang/Object;)Ljava/lang/Object;
 (32 bytes) @ 0x7ff4a245f55c [0x7ff4a245f3c0+0x19c]
J 10018 C2 
scala.util.Either$RightProjection.flatMap(Lscala/Function1;)Lscala/util/Either; 
(43 bytes) @ 0x7ff4a1f242c4 [0x7ff4a1f24260+0x64]
J 9644 C1 
kafka.coordinator.transaction.TransactionCoordinator.sendTxnMarkersCallback$1(Lorg/apache/kafka/common/protocol/Errors;Ljava/lang/String;JSLorg/apache/kafka/common/requests/TransactionResult;Lscala/Function1;ILkafka/coordinator/transaction/TxnTransitMetadata;)V
 (251 bytes) @ 0x7ff4a1ef6254 [0x7ff4a1ef5120+0x1134]
J 9302 C1 
kafka.coordinator.transaction.TransactionCoordinator$$Lambda$1106.apply(Ljava/lang/Object;)Ljava/lang/Object;
 (40 bytes) @ 0x7ff4a24747ec [0x7ff4a24745a0+0x24c]
J 10125 C2 
kafka.coordinator.transaction.TransactionStateManager.updateCacheCallback$1(Lscala/collection/Map;Ljava/lang/String;ILkafka/coordinator/transaction/TxnTransitMetadata;Lscala/Function1;Lscala/Function1;Lorg/apache/kafka/common/TopicPartition;)V
 (892 bytes) @ 0x7ff4a27045ec [0x7ff4a2703c60+0x98c]
J 10051 C2 
kafka.coordinator.transaction.TransactionStateManager$$Lambda$814.apply(Ljava/lang/Object;)Ljava/lang/Object;
 (36 bytes) @ 0x7ff4a1a9cd08 [0x7ff4a1a9cc80+0x88]
J 9349 C2 kafka.server.DelayedProduce.tryComplete()Z (52 bytes) @ 
0x7ff4a1e46e5c [0x7ff4a1e46980+0x4dc]
J 10111 C2 
kafka.server.DelayedOperationPurgatory.tryCompleteElseWatch(Lkafka/server/DelayedOperation;Lscala/collection/Seq;)Z
 (147 bytes) @ 0x7ff4a1c6e000 [0x7ff4a1c6df20+0xe0]
J 10448 C2 
kafka.server.ReplicaManager.appendRecords(JSZZLscala/collection/Map;Lscala/Function1;Lscala/Option;Lscala/Function1;)V
 (237 bytes) @ 0x7ff4a2340b6c [0x7ff4a233f3e0+0x178c]
J 10050 C2 

[jira] [Created] (KAFKA-7624) HOW TO FILTER NESTED THROUGH KAFKA CONNECT

2018-11-14 Thread Chenchu Lakshman kumar (JIRA)
Chenchu Lakshman kumar created KAFKA-7624:
-

 Summary: HOW TO FILTER NESTED THROUGH KAFKA CONNECT
 Key: KAFKA-7624
 URL: https://issues.apache.org/jira/browse/KAFKA-7624
 Project: Kafka
  Issue Type: Test
  Components: KafkaConnect
Reporter: Chenchu Lakshman kumar


Hi Team

please help

 

{"messageID":"ID:COPTW_B_SIT.1D815BC7447C2A20:3880","messageType":"text","timestamp":1542191173487,"deliveryMode":2,"correlationID":null,"replyTo":null,"destination":\{"destinationType":"queue","name":"test.queue"},"redelivered":false,"type":null,"expiration":0,"priority":4,"properties":{},"bytes":null,"map":null,{color:#f6c342}"text":"helo{color}"}

 

 

We need to get only the "text":"helo" part of the message while consuming, so that I can send 
it directly to MongoDB.
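
If the goal is just to forward the text field, one possible approach (a suggestion only; everything besides the transform class itself is an assumption) is the built-in ExtractField single message transform on the sink connector:

{code:java}
# Illustrative sink connector snippet: replace the whole value with its "text" field.
transforms=extractText
transforms.extractText.type=org.apache.kafka.connect.transforms.ExtractField$Value
transforms.extractText.field=text
{code}

ExtractField$Value swaps the record value for the named field, so only the text payload would reach the MongoDB sink.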



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Resolved] (KAFKA-7623) SMT STRUCT to MASK or FILTER

2018-11-14 Thread Chenchu Lakshman kumar (JIRA)


 [ 
https://issues.apache.org/jira/browse/KAFKA-7623?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Chenchu Lakshman kumar resolved KAFKA-7623.
---
Resolution: Fixed

> SMT STRUCT to MASK or FILTER
> 
>
> Key: KAFKA-7623
> URL: https://issues.apache.org/jira/browse/KAFKA-7623
> Project: Kafka
>  Issue Type: Test
>  Components: KafkaConnect
>Reporter: Chenchu Lakshman kumar
>Priority: Major
>
> {
>  "schema": {
>  "type": "struct",
>  "fields": [{
>  "type": "string",
>  "optional": false,
>  "doc": "This field stores the value of `Message.getJMSMessageID() 
> `_.",
>  "field": "messageID"
>  }, {
>  "type": "string",
>  "optional": false,
>  "doc": "This field stores the type of message that was received. This 
> corresponds to the subinterfaces of `Message 
> `_. `BytesMessage 
> `_ = 
> `bytes`, `MapMessage 
> `_ = `map`, 
> `ObjectMessage 
> `_ = 
> `object`, `StreamMessage 
> `_ = 
> `stream` and `TextMessage 
> `_ = `text`. 
> The corresponding field will be populated with the values from the respective 
> Message subinterface.",
>  "field": "messageType"
>  }, {
>  "type": "int64",
>  "optional": false,
>  "doc": "Data from the `getJMSTimestamp() 
> `_
>  method.",
>  "field": "timestamp"
>  }, {
>  "type": "int32",
>  "optional": false,
>  "doc": "This field stores the value of `Message.getJMSDeliveryMode() 
> `_.",
>  "field": "deliveryMode"
>  }, {
>  "type": "string",
>  "optional": true,
>  "doc": "This field stores the value of `Message.getJMSCorrelationID() 
> `_.",
>  "field": "correlationID"
>  }, {
>  "type": "struct",
>  "fields": [{
>  "type": "string",
>  "optional": false,
>  "doc": "The type of JMS Destination, and either ``queue`` or ``topic``.",
>  "field": "destinationType"
>  }, {
>  "type": "string",
>  "optional": false,
>  "doc": "The name of the destination. This will be the value of 
> `Queue.getQueueName() 
> `_ 
> or `Topic.getTopicName() 
> `_.",
>  "field": "name"
>  }],
>  "optional": true,
>  "name": "io.confluent.connect.jms.Destination",
>  "doc": "This schema is used to represent a JMS Destination, and is either 
> `queue `_ or `topic 
> `_.",
>  "field": "replyTo"
>  }, {
>  "type": "struct",
>  "fields": [{
>  "type": "string",
>  "optional": false,
>  "doc": "The type of JMS Destination, and either ``queue`` or ``topic``.",
>  "field": "destinationType"
>  }, {
>  "type": "string",
>  "optional": false,
>  "doc": "The name of the destination. This will be the value of 
> `Queue.getQueueName() 
> `_ 
> or `Topic.getTopicName() 
> `_.",
>  "field": "name"
>  }],
>  "optional": true,
>  "name": "io.confluent.connect.jms.Destination",
>  "doc": "This schema is used to represent a JMS Destination, and is either 
> `queue `_ or `topic 
> `_.",
>  "field": "destination"
>  }, {
>  "type": "boolean",
>  "optional": false,
>  "doc": "This field stores the value of `Message.getJMSRedelivered() 
> `_.",
>  "field": "redelivered"
>  }, {
>  "type": "string",
>  "optional": true,
>  "doc": "This field stores the value of `Message.getJMSType() 
> `_.",
>  "field": "type"
>  }, {
>  "type": "int64",
>  "optional": false,
>  "doc": "This field stores the value of `Message.getJMSExpiration() 
> `_.",
>  "field": "expiration"
>  }, {
>  "type": "int32",
>  "optional": false,
>  "doc": "This field stores the value of `Message.getJMSPriority() 
> 

[jira] [Commented] (KAFKA-6812) Async ConsoleProducer exits with 0 status even after data loss

2018-11-14 Thread Stanislav Kozlovski (JIRA)


[ 
https://issues.apache.org/jira/browse/KAFKA-6812?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16686259#comment-16686259
 ] 

Stanislav Kozlovski commented on KAFKA-6812:


[~kamalkang] I don't have access to that document. I just requested access.

For starting a discussion, you simply create an e-mail titled "[DISCUSSION] 
KIP- " and link the KIP. You send this e-mail 
to "d...@kafka.apache.org"
Here is the guide: 
[https://cwiki.apache.org/confluence/display/KAFKA/Kafka+Improvement+Proposals] 
(under Process)

> Async ConsoleProducer exits with 0 status even after data loss
> --
>
> Key: KAFKA-6812
> URL: https://issues.apache.org/jira/browse/KAFKA-6812
> Project: Kafka
>  Issue Type: Bug
>  Components: tools
>Affects Versions: 1.1.0
>Reporter: Andras Beni
>Assignee: Kamal Kang
>Priority: Minor
>
> When {{ConsoleProducer}} is run without {{--sync}} flag and one of the 
> batches times out, {{ErrorLoggingCallback}} logs the error:
> {code:java}
>  18/04/21 04:23:01 WARN clients.NetworkClient: [Producer 
> clientId=console-producer] Connection to node 10 could not be established. 
> Broker may not be available.
>  18/04/21 04:23:02 ERROR internals.ErrorLoggingCallback: Error when sending 
> message to topic my-topic with key: null, value: 8 bytes with error:
>  org.apache.kafka.common.errors.TimeoutException: Expiring 1 record(s) for 
> my-topic-0: 1530 ms has passed since batch creation plus linger time{code}
>  However, the tool exits with status code 0. 
>  In my opinion the tool should indicate in the exit status that there was 
> data loss. Maybe it's reasonable to exit after the first error.
>   
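
For illustration, a sketch (not the proposed fix; the class name is an assumption) of how an async send path can remember callback failures so the tool could exit non-zero:

{code:java}
import java.util.concurrent.atomic.AtomicBoolean;
import org.apache.kafka.clients.producer.Callback;
import org.apache.kafka.clients.producer.RecordMetadata;

// Remembers whether any asynchronous send failed, e.g. with a TimeoutException
// for an expired batch, so the caller can choose a non-zero exit status.
public class FailureTrackingCallback implements Callback {
    private final AtomicBoolean failed = new AtomicBoolean(false);

    @Override
    public void onCompletion(RecordMetadata metadata, Exception exception) {
        if (exception != null) {
            failed.set(true);
        }
    }

    public boolean anySendFailed() {
        return failed.get();
    }
}

// Usage (illustrative): pass the callback to producer.send(record, callback),
// then after producer.close() call System.exit(1) if anySendFailed() is true.
{code}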



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)