[jira] [Created] (KAFKA-7235) Use brokerZkNodeVersion to prevent broker from processing outdated controller request

2018-08-01 Thread Dong Lin (JIRA)
Dong Lin created KAFKA-7235:
---

 Summary: Use brokerZkNodeVersion to prevent broker from processing 
outdated controller request
 Key: KAFKA-7235
 URL: https://issues.apache.org/jira/browse/KAFKA-7235
 Project: Kafka
  Issue Type: Improvement
Reporter: Dong Lin
Assignee: Dong Lin


Currently a broker can process controller requests that were sent before the 
broker was restarted. This could cause a few problems. Here is one example:

Let's assume partitions p1 and p2 exist on broker1.

1) The controller generates a LeaderAndIsrRequest with p1 to be sent to broker1.

2) Before the controller sends the request, broker1 is quickly restarted.

3) The LeaderAndIsrRequest with p1 is delivered to broker1.

4) After processing the first LeaderAndIsrRequest, broker1 starts to checkpoint 
the high watermark for all partitions that it owns. Thus it may overwrite the high 
watermark checkpoint file with only the HW for partition p1. The HW for 
partition p2 is now lost, which could be a problem.

In general, the correctness of the broker logic currently relies on a few 
assumptions, e.g. that the first LeaderAndIsrRequest received by a broker contains 
all partitions hosted by that broker. These assumptions can break if the broker can 
receive controller requests that were generated before it restarted.

 

One reasonable solution to the problem is to include an expectedBrokerZkNodeVersion 
in the controller requests. The broker should remember its broker znode zkVersion 
after it registers itself in ZooKeeper, and can then reject any controller request 
whose expectedBrokerZkNodeVersion differs from its current broker znode zkVersion.
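For illustration, here is a minimal sketch of the guard this proposal implies. The class and field names are hypothetical and do not correspond to actual Kafka code:

{code:java}
// Hypothetical sketch of the proposed check; not actual Kafka broker code.
final class ControllerRequestGuard {
    // zkVersion of this broker's registration znode, captured when the broker
    // (re-)registers itself in ZooKeeper.
    private volatile int registeredBrokerZkNodeVersion = -1;

    void onBrokerRegistered(int zkVersionFromRegistration) {
        this.registeredBrokerZkNodeVersion = zkVersionFromRegistration;
    }

    // A controller request is only processed if it was built against the current
    // broker "incarnation"; requests generated before the restart carry a stale version.
    boolean shouldProcess(int expectedBrokerZkNodeVersion) {
        return expectedBrokerZkNodeVersion == registeredBrokerZkNodeVersion;
    }
}
{code}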

 



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Updated] (KAFKA-7235) Use brokerZkNodeVersion to prevent broker from processing outdated controller request

2018-08-01 Thread Dong Lin (JIRA)


 [ 
https://issues.apache.org/jira/browse/KAFKA-7235?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Dong Lin updated KAFKA-7235:

Description: 
Currently a broker can process controller requests that were sent before the 
broker was restarted. This could cause a few problems. Here is one example:

Let's assume partitions p1 and p2 exist on broker1.

1) The controller generates a LeaderAndIsrRequest with p1 to be sent to broker1.

2) Before the controller sends the request, broker1 is quickly restarted.

3) The LeaderAndIsrRequest with p1 is delivered to broker1.

4) After processing the first LeaderAndIsrRequest, broker1 starts to checkpoint 
the high watermark for all partitions that it owns. Thus it may overwrite the high 
watermark checkpoint file with only the HW for partition p1. The HW for 
partition p2 is now lost, which could be a problem.

In general, the correctness of the broker logic currently relies on a few 
assumptions, e.g. that the first LeaderAndIsrRequest received by a broker contains 
all partitions hosted by that broker. These assumptions can break if the broker can 
receive controller requests that were generated before it restarted.

One reasonable solution to the problem is to include an expectedBrokerZkNodeVersion 
in the controller requests. The broker should remember its broker znode zkVersion 
after it registers itself in ZooKeeper, and can then reject any controller request 
whose expectedBrokerZkNodeVersion differs from its current broker znode zkVersion.

 

  was:
Currently a broker can process controller requests that were sent before the 
broker was restarted. This could cause a few problems. Here is one example:

Let's assume partitions p1 and p2 exist on broker1.

1) The controller generates a LeaderAndIsrRequest with p1 to be sent to broker1.

2) Before the controller sends the request, broker1 is quickly restarted.

3) The LeaderAndIsrRequest with p1 is delivered to broker1.

4) After processing the first LeaderAndIsrRequest, broker1 starts to checkpoint 
the high watermark for all partitions that it owns. Thus it may overwrite the high 
watermark checkpoint file with only the HW for partition p1. The HW for 
partition p2 is now lost, which could be a problem.

In general, the correctness of the broker logic currently relies on a few 
assumptions, e.g. that the first LeaderAndIsrRequest received by a broker contains 
all partitions hosted by that broker. These assumptions can break if the broker can 
receive controller requests that were generated before it restarted.

One reasonable solution to the problem is to include an expectedBrokerZkNodeVersion 
in the controller requests. The broker should remember its broker znode zkVersion 
after it registers itself in ZooKeeper, and can then reject any controller request 
whose expectedBrokerZkNodeVersion differs from its current broker znode zkVersion.

 


> Use brokerZkNodeVersion to prevent broker from processing outdated controller 
> request
> -
>
> Key: KAFKA-7235
> URL: https://issues.apache.org/jira/browse/KAFKA-7235
> Project: Kafka
>  Issue Type: Improvement
>Reporter: Dong Lin
>Assignee: Dong Lin
>Priority: Major
>
> Currently a broker can process controller requests that were sent before the 
> broker was restarted. This could cause a few problems. Here is one example:
> Let's assume partitions p1 and p2 exist on broker1.
> 1) The controller generates a LeaderAndIsrRequest with p1 to be sent to broker1.
> 2) Before the controller sends the request, broker1 is quickly restarted.
> 3) The LeaderAndIsrRequest with p1 is delivered to broker1.
> 4) After processing the first LeaderAndIsrRequest, broker1 starts to 
> checkpoint the high watermark for all partitions that it owns. Thus it may 
> overwrite the high watermark checkpoint file with only the HW for partition p1. 
> The HW for partition p2 is now lost, which could be a problem.
> In general, the correctness of the broker logic currently relies on a few 
> assumptions, e.g. that the first LeaderAndIsrRequest received by a broker 
> contains all partitions hosted by that broker. These assumptions can break if 
> the broker can receive controller requests that were generated before it restarted.
> One reasonable solution to the problem is to include an expectedBrokerZkNodeVersion 
> in the controller requests. The broker should remember its broker znode zkVersion 
> after it registers itself in ZooKeeper, and can then reject any controller request 
> whose expectedBrokerZkNodeVersion differs from its current broker znode zkVersion.
>  



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Created] (KAFKA-7234) Allow auto leader rebalance during partition reassignment

2018-08-01 Thread Jason Gustafson (JIRA)
Jason Gustafson created KAFKA-7234:
--

 Summary: Allow auto leader rebalance during partition reassignment
 Key: KAFKA-7234
 URL: https://issues.apache.org/jira/browse/KAFKA-7234
 Project: Kafka
  Issue Type: Improvement
Reporter: Jason Gustafson


We currently skip auto leader balancing while a reassignment is in progress. 
However, when bringing new nodes into the cluster, you actually want the 
leaders to rebalance as soon as possible so that the new nodes can begin doing 
work. In general, having better balance in the cluster seems like it would be 
better for the reassignment, but perhaps there is a good reason to skip it?



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Created] (KAFKA-7233) InMemoryKeyValueStore is not thread-safe for Interactive Queries

2018-08-01 Thread Hashan Gayasri Udugahapattuwa (JIRA)
Hashan Gayasri Udugahapattuwa created KAFKA-7233:


 Summary: InMemoryKeyValueStore is not thread-safe for Interactive 
Queries
 Key: KAFKA-7233
 URL: https://issues.apache.org/jira/browse/KAFKA-7233
 Project: Kafka
  Issue Type: Bug
  Components: streams
Affects Versions: 2.0.0, 1.1.0
 Environment: Fedora 27
Reporter: Hashan Gayasri Udugahapattuwa


When querying externally via the 
[ReadOnlyKeyValueStore|https://github.com/apache/kafka/blob/1.0/streams/src/main/java/org/apache/kafka/streams/state/ReadOnlyKeyValueStore.java]
 API (Interactive Queries), the InMemoryKeyValueStore does not protect its 
internal map from concurrent reads and writes, which may return incorrect 
results through the ReadOnlyKeyValueStore API. Note that reads (query thread) and 
writes (stream thread) can happen concurrently.
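One possible mitigation, sketched below, is to guard the internal map with a read-write lock so that query-thread reads and stream-thread writes cannot interleave on unsynchronized state. This is an illustrative wrapper under that assumption, not the actual store implementation:

{code:java}
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.locks.ReadWriteLock;
import java.util.concurrent.locks.ReentrantReadWriteLock;

// Illustrative sketch: a map guarded by a read-write lock so that interactive-query
// reads and stream-thread writes never observe the map mid-update.
final class LockedKeyValueMap<K, V> {
    private final Map<K, V> map = new HashMap<>();
    private final ReadWriteLock lock = new ReentrantReadWriteLock();

    V get(K key) {                  // called from the query thread
        lock.readLock().lock();
        try {
            return map.get(key);
        } finally {
            lock.readLock().unlock();
        }
    }

    void put(K key, V value) {      // called from the stream thread
        lock.writeLock().lock();
        try {
            map.put(key, value);
        } finally {
            lock.writeLock().unlock();
        }
    }
}
{code}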



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Created] (KAFKA-7232) Allow kafka-topics.sh to take brokerid as parameter to show partitions associated with it

2018-08-01 Thread Ratish Ravindran (JIRA)
Ratish Ravindran created KAFKA-7232:
---

 Summary: Allow kafka-topics.sh to take brokerid as parameter to 
show partitions associated with it
 Key: KAFKA-7232
 URL: https://issues.apache.org/jira/browse/KAFKA-7232
 Project: Kafka
  Issue Type: Improvement
Reporter: Ratish Ravindran


Currently, if we want to use kafka-topics.sh to get the list of partitions 
associated with a specific broker, irrespective of whether the broker is the 
leader or a replica, we have to pipe the output and grep it.

I am proposing a change that adds an option to TopicCommand.scala to pass the 
broker id.
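Until such an option exists, the same information can be pulled programmatically through the AdminClient. A rough sketch follows; the broker id and bootstrap address are placeholders:

{code:java}
import java.util.Map;
import java.util.Properties;
import org.apache.kafka.clients.admin.AdminClient;
import org.apache.kafka.clients.admin.AdminClientConfig;
import org.apache.kafka.clients.admin.TopicDescription;

public class PartitionsOnBroker {
    public static void main(String[] args) throws Exception {
        final int brokerId = 1;  // placeholder broker id
        Properties props = new Properties();
        props.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");  // placeholder
        try (AdminClient admin = AdminClient.create(props)) {
            // Describe every topic and print the partitions that have a replica on brokerId.
            Map<String, TopicDescription> topics =
                admin.describeTopics(admin.listTopics().names().get()).all().get();
            topics.forEach((topic, description) ->
                description.partitions().stream()
                    .filter(p -> p.replicas().stream().anyMatch(node -> node.id() == brokerId))
                    .forEach(p -> System.out.println(topic + "-" + p.partition())));
        }
    }
}
{code}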



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (KAFKA-7225) Kafka Connect ConfigProvider not invoked before validation

2018-08-01 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/KAFKA-7225?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16566297#comment-16566297
 ] 

ASF GitHub Bot commented on KAFKA-7225:
---

rayokota opened a new pull request #5445: KAFKA-7225 - Pretransform validated 
props
URL: https://github.com/apache/kafka/pull/5445
 
 
   If a property requires validation, it should be pretransformed if it is a 
variable reference, in order to have a value that will properly pass the 
validation.
   
   ### Committer Checklist (excluded from commit message)
   - [ ] Verify design and implementation 
   - [ ] Verify test coverage and CI build status
   - [ ] Verify documentation (including upgrade notes)
   


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


> Kafka Connect ConfigProvider not invoked before validation
> --
>
> Key: KAFKA-7225
> URL: https://issues.apache.org/jira/browse/KAFKA-7225
> Project: Kafka
>  Issue Type: Bug
>  Components: KafkaConnect
>Affects Versions: 2.0.0
>Reporter: Nacho Munoz
>Assignee: Robert Yokota
>Priority: Minor
>
> When trying to register a JDBC connector with externalised secrets (e.g. 
> connection.password), the validation fails and the endpoint returns a 500. I 
> think that the problem is that the config transformer is not being invoked 
> before the validation, so trying to exercise the credentials against the 
> database fails. I have checked that publishing the connector configuration 
> directly to the connect-config topic to skip the validation and restarting 
> the server is enough to get the connector working, which confirms that we 
> are simply missing a call to the config transformer before validating the 
> connector. Please let me know if you need further information.
> I'm happy to open a PR to address this issue given that I think it is 
> easy enough to fix for a new contributor to the project. So please feel free 
> to assign the resolution of the bug to me.
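For context, the pull request above describes resolving variable references before validation. A simplified sketch of that kind of pre-transformation step is shown below; it resolves ${provider:path:key} references through the registered ConfigProviders before the values reach validation. This is illustrative only and much simpler than Connect's actual ConfigTransformer:

{code:java}
import java.util.HashMap;
import java.util.Map;
import java.util.regex.Matcher;
import java.util.regex.Pattern;
import org.apache.kafka.common.config.provider.ConfigProvider;

final class PreTransformSketch {
    // Matches ${provider:path:key} references (simplified; the real syntax has more forms).
    private static final Pattern REF = Pattern.compile("\\$\\{([^:}]+):([^:}]*):([^}]+)\\}");

    // Returns a copy of props with resolvable variable references replaced by their values.
    static Map<String, String> preTransform(Map<String, String> props,
                                            Map<String, ConfigProvider> providers) {
        Map<String, String> resolved = new HashMap<>(props);
        for (Map.Entry<String, String> entry : props.entrySet()) {
            if (entry.getValue() == null) {
                continue;
            }
            Matcher m = REF.matcher(entry.getValue());
            if (m.matches()) {
                ConfigProvider provider = providers.get(m.group(1));
                if (provider != null) {
                    String value = provider.get(m.group(2)).data().get(m.group(3));
                    if (value != null) {
                        resolved.put(entry.getKey(), value);
                    }
                }
            }
        }
        return resolved;
    }
}
{code}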



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Updated] (KAFKA-5863) Potential null dereference in DistributedHerder#reconfigureConnector()

2018-08-01 Thread Ted Yu (JIRA)


 [ 
https://issues.apache.org/jira/browse/KAFKA-5863?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Ted Yu updated KAFKA-5863:
--
Description: 
Here is the call chain:

{code}
RestServer.httpRequest(reconfigUrl, "POST", 
taskProps, null);
{code}
In httpRequest():
{code}
} else if (responseCode >= 200 && responseCode < 300) {
InputStream is = connection.getInputStream();
T result = JSON_SERDE.readValue(is, responseFormat);
{code}
For readValue():
{code}
public <T> T readValue(InputStream src, TypeReference valueTypeRef)
throws IOException, JsonParseException, JsonMappingException
{
return (T) _readMapAndClose(_jsonFactory.createParser(src), 
_typeFactory.constructType(valueTypeRef));
{code}
Then there would be an NPE in constructType():
{code}
public JavaType constructType(TypeReference typeRef)
{
// 19-Oct-2015, tatu: Simpler variant like so should work
return _fromAny(null, typeRef.getType(), EMPTY_BINDINGS);
{code}

  was:
Here is the call chain:
{code}
RestServer.httpRequest(reconfigUrl, "POST", 
taskProps, null);
{code}
In httpRequest():
{code}
} else if (responseCode >= 200 && responseCode < 300) {
InputStream is = connection.getInputStream();
T result = JSON_SERDE.readValue(is, responseFormat);
{code}
For readValue():
{code}
public <T> T readValue(InputStream src, TypeReference valueTypeRef)
throws IOException, JsonParseException, JsonMappingException
{
return (T) _readMapAndClose(_jsonFactory.createParser(src), 
_typeFactory.constructType(valueTypeRef));
{code}
Then there would be an NPE in constructType():
{code}
public JavaType constructType(TypeReference typeRef)
{
// 19-Oct-2015, tatu: Simpler variant like so should work
return _fromAny(null, typeRef.getType(), EMPTY_BINDINGS);
{code}


> Potential null dereference in DistributedHerder#reconfigureConnector()
> --
>
> Key: KAFKA-5863
> URL: https://issues.apache.org/jira/browse/KAFKA-5863
> Project: Kafka
>  Issue Type: Bug
>  Components: KafkaConnect
>Reporter: Ted Yu
>Priority: Minor
>
> Here is the call chain:
> {code}
> RestServer.httpRequest(reconfigUrl, "POST", 
> taskProps, null);
> {code}
> In httpRequest():
> {code}
> } else if (responseCode >= 200 && responseCode < 300) {
> InputStream is = connection.getInputStream();
> T result = JSON_SERDE.readValue(is, responseFormat);
> {code}
> For readValue():
> {code}
> public <T> T readValue(InputStream src, TypeReference valueTypeRef)
> throws IOException, JsonParseException, JsonMappingException
> {
> return (T) _readMapAndClose(_jsonFactory.createParser(src), 
> _typeFactory.constructType(valueTypeRef));
> {code}
> Then there would be an NPE in constructType():
> {code}
> public JavaType constructType(TypeReference typeRef)
> {
> // 19-Oct-2015, tatu: Simpler variant like so should work
> return _fromAny(null, typeRef.getType(), EMPTY_BINDINGS);
> {code}
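A defensive guard of the kind this report suggests is sketched below. It is illustrative only and does not mirror the actual RestServer code; the point is simply that a null TypeReference must never reach readValue(), because constructType() dereferences it:

{code:java}
import java.io.IOException;
import java.io.InputStream;
import com.fasterxml.jackson.core.type.TypeReference;
import com.fasterxml.jackson.databind.ObjectMapper;

final class SafeJsonReader {
    private static final ObjectMapper JSON_SERDE = new ObjectMapper();

    // Skip deserialization when the caller did not ask for a parsed response,
    // instead of letting Jackson hit an NPE inside TypeFactory.constructType().
    static <T> T readOrNull(InputStream is, TypeReference<T> responseFormat) throws IOException {
        if (responseFormat == null) {
            return null;
        }
        return JSON_SERDE.readValue(is, responseFormat);
    }
}
{code}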



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Comment Edited] (KAFKA-6303) Potential lack of synchronization in NioEchoServer#AcceptorThread

2018-08-01 Thread Ted Yu (JIRA)


[ 
https://issues.apache.org/jira/browse/KAFKA-6303?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16333251#comment-16333251
 ] 

Ted Yu edited comment on KAFKA-6303 at 8/2/18 1:48 AM:
---

+1 from me .


was (Author: yuzhih...@gmail.com):
+1 from me

> Potential lack of synchronization in NioEchoServer#AcceptorThread
> -
>
> Key: KAFKA-6303
> URL: https://issues.apache.org/jira/browse/KAFKA-6303
> Project: Kafka
>  Issue Type: Bug
>  Components: network
>Reporter: Ted Yu
>Assignee: siva santhalingam
>Priority: Minor
>
> In the run() method:
> {code}
> SocketChannel socketChannel = 
> ((ServerSocketChannel) key.channel()).accept();
> socketChannel.configureBlocking(false);
> newChannels.add(socketChannel);
> {code}
> Modifications to newChannels should be protected by a synchronized block.
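The guard the report asks for amounts to something like the following sketch, assuming newChannels is the list that the acceptor thread shares with the server thread:

{code:java}
import java.io.IOException;
import java.nio.channels.SelectionKey;
import java.nio.channels.ServerSocketChannel;
import java.nio.channels.SocketChannel;
import java.util.ArrayList;
import java.util.List;

// Illustrative sketch, not the actual NioEchoServer code.
final class AcceptorSketch {
    // Shared with the thread that later drains accepted channels, hence the locking.
    private final List<SocketChannel> newChannels = new ArrayList<>();

    void accept(SelectionKey key) throws IOException {
        SocketChannel socketChannel = ((ServerSocketChannel) key.channel()).accept();
        socketChannel.configureBlocking(false);
        synchronized (newChannels) {   // protect the shared list, as suggested above
            newChannels.add(socketChannel);
        }
    }
}
{code}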



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (KAFKA-7049) InternalTopicIntegrationTest sometimes fails

2018-08-01 Thread Ted Yu (JIRA)


[ 
https://issues.apache.org/jira/browse/KAFKA-7049?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16566242#comment-16566242
 ] 

Ted Yu commented on KAFKA-7049:
---

Haven't seen this error for a little while.

> InternalTopicIntegrationTest sometimes fails
> 
>
> Key: KAFKA-7049
> URL: https://issues.apache.org/jira/browse/KAFKA-7049
> Project: Kafka
>  Issue Type: Test
>  Components: streams, unit tests
>Reporter: Ted Yu
>Priority: Minor
>
> Saw the following based on commit fa1d0383902260576132e09bdf9efcc2784b55b4 :
> {code}
> org.apache.kafka.streams.integration.InternalTopicIntegrationTest > 
> shouldCompactTopicsForKeyValueStoreChangelogs FAILED
> java.lang.RuntimeException: Timed out waiting for completion. 
> lagMetrics=[0/2] totalLag=[0.0]
> at 
> org.apache.kafka.streams.integration.utils.IntegrationTestUtils.waitForCompletion(IntegrationTestUtils.java:227)
> at 
> org.apache.kafka.streams.integration.InternalTopicIntegrationTest.shouldCompactTopicsForKeyValueStoreChangelogs(InternalTopicIntegrationTest.java:164)
> {code}



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (KAFKA-7175) Make version checking logic more flexible in streams_upgrade_test.py

2018-08-01 Thread Ted Yu (JIRA)


[ 
https://issues.apache.org/jira/browse/KAFKA-7175?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16566241#comment-16566241
 ] 

Ted Yu commented on KAFKA-7175:
---

What Ray said sounds feasible.

> Make version checking logic more flexible in streams_upgrade_test.py
> 
>
> Key: KAFKA-7175
> URL: https://issues.apache.org/jira/browse/KAFKA-7175
> Project: Kafka
>  Issue Type: Improvement
>Reporter: Ted Yu
>Priority: Major
>
> During debugging of the system test failure for KAFKA-5037, it was re-discovered 
> that the version numbers inside version-probing-related messages are hard 
> coded in streams_upgrade_test.py. This is inflexible.
> We should correlate the latest version from the Java class with the expected 
> version numbers.
> Matthias made the following suggestion:
> We should also make this more generic and test upgrades from 3 -> 4, 3 -> 5 
> and 4 -> 5. The current code only goes from the latest version to the future 
> version.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Assigned] (KAFKA-7225) Kafka Connect ConfigProvider not invoked before validation

2018-08-01 Thread Robert Yokota (JIRA)


 [ 
https://issues.apache.org/jira/browse/KAFKA-7225?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Robert Yokota reassigned KAFKA-7225:


Assignee: Robert Yokota

> Kafka Connect ConfigProvider not invoked before validation
> --
>
> Key: KAFKA-7225
> URL: https://issues.apache.org/jira/browse/KAFKA-7225
> Project: Kafka
>  Issue Type: Bug
>  Components: KafkaConnect
>Affects Versions: 2.0.0
>Reporter: Nacho Munoz
>Assignee: Robert Yokota
>Priority: Minor
>
> When trying to register a JDBC connector with externalised secrets (e.g. 
> connection.password), the validation fails and the endpoint returns a 500. I 
> think that the problem is that the config transformer is not being invoked 
> before the validation, so trying to exercise the credentials against the 
> database fails. I have checked that publishing the connector configuration 
> directly to the connect-config topic to skip the validation and restarting 
> the server is enough to get the connector working, which confirms that we 
> are simply missing a call to the config transformer before validating the 
> connector. Please let me know if you need further information.
> I'm happy to open a PR to address this issue given that I think it is 
> easy enough to fix for a new contributor to the project. So please feel free 
> to assign the resolution of the bug to me.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (KAFKA-7231) NetworkClient.newClientRequest() ignores custom request timeout in favor of the default

2018-08-01 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/KAFKA-7231?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16566168#comment-16566168
 ] 

ASF GitHub Bot commented on KAFKA-7231:
---

hachikuji opened a new pull request #5444: KAFKA-7231; Ensure NetworkClient 
uses overridden request timeout
URL: https://github.com/apache/kafka/pull/5444
 
 
   Fixed incorrect use of default timeout instead of the argument explicitly 
passed to `newClientRequest`.
   
   ### Committer Checklist (excluded from commit message)
   - [ ] Verify design and implementation 
   - [ ] Verify test coverage and CI build status
   - [ ] Verify documentation (including upgrade notes)
   


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


> NetworkClient.newClientRequest() ignores custom request timeout in favor of 
> the default
> ---
>
> Key: KAFKA-7231
> URL: https://issues.apache.org/jira/browse/KAFKA-7231
> Project: Kafka
>  Issue Type: Bug
>  Components: clients
>Affects Versions: 2.0.0
>Reporter: Ron Dagostino
>Assignee: Jason Gustafson
>Priority: Minor
>
> The below code in {{org.apache.kafka.clients.KafkaClient}} is not passing in 
> the provided {{requestTimeoutMs}} -- it is ignoring it in favor of the 
> {{defaultRequestTimeoutMs}} value.
> {code:java}
> @Override
> public ClientRequest newClientRequest(String nodeId,
>   AbstractRequest.Builder<?> requestBuilder,
>   long createdTimeMs,
>   boolean expectResponse,
>   int requestTimeoutMs,
>   RequestCompletionHandler callback) {
>  return new ClientRequest(nodeId, requestBuilder, correlation++, 
> clientId, createdTimeMs, expectResponse,
> defaultRequestTimeoutMs, callback);
> }
> {code}
> This is an easy fix, but the impact of fixing it is difficult to quantify.  
> Clients that set a custom timeout are getting the default timeout -- fixing 
> this will suddenly cause the custom timeout to take effect.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (KAFKA-4701) Allow kafka brokers to dynamically reload truststore without restarting.

2018-08-01 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/KAFKA-4701?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16566017#comment-16566017
 ] 

ASF GitHub Bot commented on KAFKA-4701:
---

allenxiang closed pull request #2446: KAFKA-4701: Add dynamic truststore 
manager.
URL: https://github.com/apache/kafka/pull/2446
 
 
   

This is a PR merged from a forked repository.
As GitHub hides the original diff on merge, it is displayed below for
the sake of provenance:

As this is a foreign pull request (from a fork), the diff is supplied
below (as it won't show otherwise due to GitHub magic):

diff --git 
a/clients/src/main/java/org/apache/kafka/common/security/ssl/ReloadableX509TrustManager.java
 
b/clients/src/main/java/org/apache/kafka/common/security/ssl/ReloadableX509TrustManager.java
new file mode 100644
index 000..bb33b0fbe63
--- /dev/null
+++ 
b/clients/src/main/java/org/apache/kafka/common/security/ssl/ReloadableX509TrustManager.java
@@ -0,0 +1,166 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.common.security.ssl;
+
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import javax.net.ssl.SSLEngine;
+import javax.net.ssl.TrustManager;
+import javax.net.ssl.TrustManagerFactory;
+import javax.net.ssl.X509ExtendedTrustManager;
+import javax.net.ssl.X509TrustManager;
+import java.io.IOException;
+import java.net.Socket;
+import java.security.GeneralSecurityException;
+import java.security.KeyStore;
+import java.security.NoSuchAlgorithmException;
+import java.security.cert.CertificateException;
+import java.security.cert.X509Certificate;
+import java.util.Enumeration;
+
+class ReloadableX509TrustManager extends X509ExtendedTrustManager implements 
X509TrustManager {
+private static final Logger log = 
LoggerFactory.getLogger(ReloadableX509TrustManager.class);
+
+private final SecurityStore trustStore;
+private final TrustManagerFactory tmf;
+private X509TrustManager trustManager;
+private long lastReload = 0L;
+
+private KeyStore trustKeyStore;
+
+public ReloadableX509TrustManager(SecurityStore trustStore, 
TrustManagerFactory tmf) {
+this.trustStore = trustStore;
+this.tmf = tmf;
+}
+
+public KeyStore getTrustKeyStore() {
+return trustKeyStore;
+}
+
+@Override
+public void checkClientTrusted(X509Certificate[] chain, String authType) 
throws CertificateException {
+reloadTrustManager();
+if (trustManager == null) {
+throw new CertificateException("Trust manager not initialized.");
+}
+trustManager.checkClientTrusted(chain, authType);
+}
+
+@Override
+public void checkServerTrusted(X509Certificate[] chain, String authType) 
throws CertificateException {
+if (trustManager == null) {
+reloadTrustManager();
+}
+if (trustManager == null) {
+throw new CertificateException("Trust manager not initialized.");
+}
+trustManager.checkServerTrusted(chain, authType);
+}
+
+@Override
+public X509Certificate[] getAcceptedIssuers() {
+reloadTrustManager();
+
+if (trustManager == null) {
+return new X509Certificate[0];
+}
+return trustManager.getAcceptedIssuers();
+}
+
+@Override
+public void checkClientTrusted(X509Certificate[] x509Certificates, String 
s, Socket socket) throws CertificateException {
+reloadTrustManager();
+if (trustManager == null) {
+throw new CertificateException("Trust manager not initialized.");
+}
+((X509ExtendedTrustManager) 
trustManager).checkClientTrusted(x509Certificates, s, socket);
+}
+
+@Override
+public void checkServerTrusted(X509Certificate[] x509Certificates, String 
s, Socket socket) throws CertificateException {
+if (trustManager == null) {
+reloadTrustManager();
+}
+if (trustManager == null) {
+throw new CertificateException("Trust manager not initialized.");
+}
+((X509ExtendedTrustManager) 
trustManager).checkServerTrusted(x509Certificates, s, s

[jira] [Commented] (KAFKA-7231) NetworkClient.newClientRequest() ignores custom request timeout in favor of the default

2018-08-01 Thread Jason Gustafson (JIRA)


[ 
https://issues.apache.org/jira/browse/KAFKA-7231?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16565964#comment-16565964
 ] 

Jason Gustafson commented on KAFKA-7231:


Ouch. I think the only place we use this is in the consumer when we override 
the join group timeout. Using the default in this case could cause problems 
when a consumer is first joining the group. In general retrying the JoinGroup 
is safe, but it causes problems when a member first joins the group since 
retries cause new members to be added.

> NetworkClient.newClientRequest() ignores custom request timeout in favor of 
> the default
> ---
>
> Key: KAFKA-7231
> URL: https://issues.apache.org/jira/browse/KAFKA-7231
> Project: Kafka
>  Issue Type: Bug
>  Components: clients
>Affects Versions: 2.0.0
>Reporter: Ron Dagostino
>Assignee: Jason Gustafson
>Priority: Minor
>
> The below code in {{org.apache.kafka.clients.KafkaClient}} is not passing in 
> the provided {{requestTimeoutMs}} -- it is ignoring it in favor of the 
> {{defaultRequestTimeoutMs}} value.
> {code:java}
> @Override
> public ClientRequest newClientRequest(String nodeId,
>   AbstractRequest.Builder<?> requestBuilder,
>   long createdTimeMs,
>   boolean expectResponse,
>   int requestTimeoutMs,
>   RequestCompletionHandler callback) {
>  return new ClientRequest(nodeId, requestBuilder, correlation++, 
> clientId, createdTimeMs, expectResponse,
> defaultRequestTimeoutMs, callback);
> }
> {code}
> This is an easy fix, but the impact of fixing it is difficult to quantify.  
> Clients that set a custom timeout are getting the default timeout -- fixing 
> this will suddenly cause the custom timeout to take effect.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Assigned] (KAFKA-7231) NetworkClient.newClientRequest() ignores custom request timeout in favor of the default

2018-08-01 Thread Jason Gustafson (JIRA)


 [ 
https://issues.apache.org/jira/browse/KAFKA-7231?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Jason Gustafson reassigned KAFKA-7231:
--

Assignee: Jason Gustafson

> NetworkClient.newClientRequest() ignores custom request timeout in favor of 
> the default
> ---
>
> Key: KAFKA-7231
> URL: https://issues.apache.org/jira/browse/KAFKA-7231
> Project: Kafka
>  Issue Type: Bug
>  Components: clients
>Affects Versions: 2.0.0
>Reporter: Ron Dagostino
>Assignee: Jason Gustafson
>Priority: Minor
>
> The below code in {{org.apache.kafka.clients.KafkaClient}} is not passing in 
> the provided {{requestTimeoutMs}} -- it is ignoring it in favor of the 
> {{defaultRequestTimeoutMs}} value.
> {code:java}
> @Override
> public ClientRequest newClientRequest(String nodeId,
>   AbstractRequest.Builder<?> requestBuilder,
>   long createdTimeMs,
>   boolean expectResponse,
>   int requestTimeoutMs,
>   RequestCompletionHandler callback) {
>  return new ClientRequest(nodeId, requestBuilder, correlation++, 
> clientId, createdTimeMs, expectResponse,
> defaultRequestTimeoutMs, callback);
> }
> {code}
> This is an easy fix, but the impact of fixing it is difficult to quantify.  
> Clients that set a custom timeout are getting the default timeout -- fixing 
> this will suddenly cause the custom timeout to take effect.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Updated] (KAFKA-7228) DeadLetterQueue throws a NullPointerException

2018-08-01 Thread Arjun Satish (JIRA)


 [ 
https://issues.apache.org/jira/browse/KAFKA-7228?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Arjun Satish updated KAFKA-7228:

Fix Version/s: 2.1.0
   2.0.1

> DeadLetterQueue throws a NullPointerException
> -
>
> Key: KAFKA-7228
> URL: https://issues.apache.org/jira/browse/KAFKA-7228
> Project: Kafka
>  Issue Type: Task
>  Components: KafkaConnect
>Reporter: Arjun Satish
>Assignee: Arjun Satish
>Priority: Major
> Fix For: 2.0.1, 2.1.0
>
>
> Using the dead letter queue results in an NPE: 
> {code:java}
> [2018-08-01 07:41:08,907] ERROR WorkerSinkTask{id=s3-sink-yanivr-2-4} Task 
> threw an uncaught and unrecoverable exception 
> (org.apache.kafka.connect.runtime.WorkerTask)
> java.lang.NullPointerException
> at 
> org.apache.kafka.connect.runtime.errors.DeadLetterQueueReporter.report(DeadLetterQueueReporter.java:124)
> at 
> org.apache.kafka.connect.runtime.errors.ProcessingContext.report(ProcessingContext.java:137)
> at 
> org.apache.kafka.connect.runtime.errors.RetryWithToleranceOperator.execute(RetryWithToleranceOperator.java:108)
> at 
> org.apache.kafka.connect.runtime.WorkerSinkTask.convertAndTransformRecord(WorkerSinkTask.java:513)
> at 
> org.apache.kafka.connect.runtime.WorkerSinkTask.convertMessages(WorkerSinkTask.java:490)
> at 
> org.apache.kafka.connect.runtime.WorkerSinkTask.poll(WorkerSinkTask.java:321)
> at 
> org.apache.kafka.connect.runtime.WorkerSinkTask.iteration(WorkerSinkTask.java:225)
> at 
> org.apache.kafka.connect.runtime.WorkerSinkTask.execute(WorkerSinkTask.java:193)
> at org.apache.kafka.connect.runtime.WorkerTask.doRun(WorkerTask.java:175)
> at org.apache.kafka.connect.runtime.WorkerTask.run(WorkerTask.java:219)
> at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
> at java.util.concurrent.FutureTask.run(FutureTask.java:266)
> at 
> java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
> at 
> java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
> at java.lang.Thread.run(Thread.java:748)
> [2018-08-01 07:41:08,908] ERROR WorkerSinkTask{id=s3-sink-yanivr-2-4} Task is 
> being killed and will not recover until manually restarted 
> (org.apache.kafka.connect.runtime.WorkerTask)
> {code}
> DLQ reporter does not get a {{errorHandlingMetrics}} object when initialized 
> through the WorkerSinkTask.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Updated] (KAFKA-7231) NetworkClient.newClientRequest() ignores custom request timeout in favor of the default

2018-08-01 Thread Ron Dagostino (JIRA)


 [ 
https://issues.apache.org/jira/browse/KAFKA-7231?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Ron Dagostino updated KAFKA-7231:
-
Description: 
The below code in {{org.apache.kafka.clients.KafkaClient}} is not passing in 
the provided {{requestTimeoutMs}} -- it is ignoring it in favor of the 
{{defaultRequestTimeoutMs}} value.
{code:java}
@Override
public ClientRequest newClientRequest(String nodeId,
  AbstractRequest.Builder<?> requestBuilder,
  long createdTimeMs,
  boolean expectResponse,
  int requestTimeoutMs,
  RequestCompletionHandler callback) {
 return new ClientRequest(nodeId, requestBuilder, correlation++, clientId, 
createdTimeMs, expectResponse,
defaultRequestTimeoutMs, callback);
}
{code}

This is an easy fix, but the impact of fixing it is difficult to quantify.  
Clients that set a custom timeout are getting the default timeout -- fixing 
this will suddenly cause the custom timeout to take effect.


  was:
The below code in {{org.apache.kafka.clients.KafkaClient}} is not passing in 
the provided {{requestTimeoutMs}} -- it is ignoring it in favor of the 
{{defaultRequestTimeoutMs}} value.
{code:java}
@Override
public ClientRequest newClientRequest(String nodeId,
  AbstractRequest.Builder<?> requestBuilder,
  long createdTimeMs,
  boolean expectResponse,
  int requestTimeoutMs,
  RequestCompletionHandler callback) {
 return new ClientRequest(nodeId, requestBuilder, correlation++, clientId, 
createdTimeMs, expectResponse,
defaultRequestTimeoutMs, callback);
}
{code}

This is an easy fix, but the impact of fixing it is difficult to quantify.  
Clients that set a custom timeout are getting the default timeout of 1000 ms -- 
fixing this will suddenly cause the custom timeout to take effect.



> NetworkClient.newClientRequest() ignores custom request timeout in favor of 
> the default
> ---
>
> Key: KAFKA-7231
> URL: https://issues.apache.org/jira/browse/KAFKA-7231
> Project: Kafka
>  Issue Type: Bug
>  Components: clients
>Affects Versions: 2.0.0
>Reporter: Ron Dagostino
>Priority: Minor
>
> The below code in {{org.apache.kafka.clients.KafkaClient}} is not passing in 
> the provided {{requestTimeoutMs}} -- it is ignoring it in favor of the 
> {{defaultRequestTimeoutMs}} value.
> {code:java}
> @Override
> public ClientRequest newClientRequest(String nodeId,
>   AbstractRequest.Builder<?> requestBuilder,
>   long createdTimeMs,
>   boolean expectResponse,
>   int requestTimeoutMs,
>   RequestCompletionHandler callback) {
>  return new ClientRequest(nodeId, requestBuilder, correlation++, 
> clientId, createdTimeMs, expectResponse,
> defaultRequestTimeoutMs, callback);
> }
> {code}
> This is an easy fix, but the impact of fixing it is difficult to quantify.  
> Clients that set a custom timeout are getting the default timeout -- fixing 
> this will suddenly cause the custom timeout to take effect.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Created] (KAFKA-7231) NetworkClient.newClientRequest() ignores custom request timeout in favor of the default

2018-08-01 Thread Ron Dagostino (JIRA)
Ron Dagostino created KAFKA-7231:


 Summary: NetworkClient.newClientRequest() ignores custom request 
timeout in favor of the default
 Key: KAFKA-7231
 URL: https://issues.apache.org/jira/browse/KAFKA-7231
 Project: Kafka
  Issue Type: Bug
  Components: clients
Affects Versions: 2.0.0
Reporter: Ron Dagostino


The below code in {{org.apache.kafka.clients.KafkaClient}} is not passing in 
the provided {{requestTimeoutMs}} -- it is ignoring it in favor of the 
{{defaultRequestTimeoutMs}} value.
{code:java}
@Override
public ClientRequest newClientRequest(String nodeId,
  AbstractRequest.Builder<?> requestBuilder,
  long createdTimeMs,
  boolean expectResponse,
  int requestTimeoutMs,
  RequestCompletionHandler callback) {
 return new ClientRequest(nodeId, requestBuilder, correlation++, clientId, 
createdTimeMs, expectResponse,
defaultRequestTimeoutMs, callback);
}
{code}

This is an easy fix, but the impact of fixing it is difficult to quantify.  
Clients that set a custom timeout are getting the default timeout of 1000 ms -- 
fixing this will suddenly cause the custom timeout to take effect.
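Per the GitHub comment earlier in this thread, the fix in pull request #5444 is essentially to pass the supplied timeout through instead of the default; roughly:

{code:java}
@Override
public ClientRequest newClientRequest(String nodeId,
                                      AbstractRequest.Builder<?> requestBuilder,
                                      long createdTimeMs,
                                      boolean expectResponse,
                                      int requestTimeoutMs,
                                      RequestCompletionHandler callback) {
    // Use the caller-supplied requestTimeoutMs rather than defaultRequestTimeoutMs.
    return new ClientRequest(nodeId, requestBuilder, correlation++, clientId, createdTimeMs,
            expectResponse, requestTimeoutMs, callback);
}
{code}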




--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (KAFKA-7209) Kafka stream does not rebalance when one node gets down

2018-08-01 Thread Matthias J. Sax (JIRA)


[ 
https://issues.apache.org/jira/browse/KAFKA-7209?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16565772#comment-16565772
 ] 

Matthias J. Sax commented on KAFKA-7209:


About `offsets.topic.replication.factor`: if you set it to one and the 
corresponding broker goes down, you cannot commit offsets any longer; thus, you 
might want to set it to 3. Also note that the `min.insync.replicas` config is 
important: the broker default is used, and you can only write to the topic if 
enough in-sync replicas are online. Thus, you should not set it to 3, but to at 
most 2, to survive a single broker failure.

For the `transaction.state.log.XXX` configs: as long as you don't use exactly-once, 
you can ignore those settings.

For the failure scenarios: can you provide DEBUG logs for the brokers and the 
Streams application so we can dig into it? For the first scenario, after the 
rebalance, the state directories should be created, but we will need Streams 
DEBUG logs to see. For scenario (2) there should not be any data loss; we 
might need Streams and broker logs to dig into it.

For a clean restart with the same application.id, you should check out the 
application reset tool: 
[https://kafka.apache.org/20/documentation/streams/developer-guide/app-reset-tool.html]

Btw: you reported this error for 0.11.0.1, and 0.11.0.3 was released recently. I 
would highly recommend upgrading to 0.11.0.3 and checking if the issue is still 
there; many bugs were fixed and the issue might be resolved already.

> Kafka stream does not rebalance when one node gets down
> ---
>
> Key: KAFKA-7209
> URL: https://issues.apache.org/jira/browse/KAFKA-7209
> Project: Kafka
>  Issue Type: Bug
>  Components: streams
>Affects Versions: 0.11.0.1
>Reporter: Yogesh BG
>Priority: Critical
>
> I use Kafka Streams 0.11.0.1, session timeout 6, retries set to int.max, and 
> backoff time at the default.
>  
> I have 3 nodes running a Kafka cluster of 3 brokers,
> and I am running 3 Kafka Streams instances with the same 
> [application.id|http://application.id/].
> Each node has one broker and one Kafka Streams application.
> Everything works fine during setup.
> I bring down one node, so one Kafka broker and one streaming app are down.
> Now I see exceptions in the other two streaming apps and it never gets 
> rebalanced; I waited for hours and it never comes back to normal.
> Is there anything I am missing?
> I also tried, when one broker is down, calling stream.close, cleanup, 
> and restart; this also doesn't help.
> Can anyone help me?
>  
> One thing I observed lately is that Kafka topics with one partition get 
> reassigned, but I have topics with 16 partitions and replication factor 3. It 
> never settles.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (KAFKA-6868) BufferUnderflowException in client when querying consumer group information

2018-08-01 Thread Sriharsha Chintalapani (JIRA)


[ 
https://issues.apache.org/jira/browse/KAFKA-6868?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16565753#comment-16565753
 ] 

Sriharsha Chintalapani commented on KAFKA-6868:
---

[~hachikuji] can we include this in the 1.1 branch? This is a critical fix 
affecting clients from 0.10 onwards that query group offset info. If we are 
planning on doing a future 1.1.x release, we should include this Jira as well.

> BufferUnderflowException in client when querying consumer group information
> ---
>
> Key: KAFKA-6868
> URL: https://issues.apache.org/jira/browse/KAFKA-6868
> Project: Kafka
>  Issue Type: Bug
>Reporter: Xavier Léauté
>Assignee: Colin P. McCabe
>Priority: Blocker
>
> Exceptions get thrown when describing consumer group or querying group 
> offsets from a 1.0 cluster
> Stacktrace is a result of calling 
> {{AdminClient.describeConsumerGroups(Collection<String> groupIds).describedGroups().entrySet()}} followed by 
> {{KafkaFuture.whenComplete()}}
> {code}
> java.util.concurrent.ExecutionException: 
> org.apache.kafka.common.protocol.types.SchemaException: Error reading field 
> 'version': java.nio.BufferUnderflowException
>   at 
> org.apache.kafka.common.internals.KafkaFutureImpl.wrapAndThrow(KafkaFutureImpl.java:45)
>   at 
> org.apache.kafka.common.internals.KafkaFutureImpl.access$000(KafkaFutureImpl.java:32)
>   at 
> org.apache.kafka.common.internals.KafkaFutureImpl$SingleWaiter.await(KafkaFutureImpl.java:104)
>   at 
> org.apache.kafka.common.internals.KafkaFutureImpl.get(KafkaFutureImpl.java:274)
>   at
> [snip]
> Caused by: org.apache.kafka.common.protocol.types.SchemaException: Error 
> reading field 'version': java.nio.BufferUnderflowException
>   at org.apache.kafka.common.protocol.types.Schema.read(Schema.java:76)
>   at 
> org.apache.kafka.clients.consumer.internals.ConsumerProtocol.deserializeAssignment(ConsumerProtocol.java:105)
>   at 
> org.apache.kafka.clients.admin.KafkaAdminClient$21$1.handleResponse(KafkaAdminClient.java:2307)
>   at 
> org.apache.kafka.clients.admin.KafkaAdminClient$AdminClientRunnable.handleResponses(KafkaAdminClient.java:960)
>   at 
> org.apache.kafka.clients.admin.KafkaAdminClient$AdminClientRunnable.run(KafkaAdminClient.java:1045)
>   ... 1 more
> {code}
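For reference, the call pattern that produces the stack trace above looks roughly like the sketch below; the group id and bootstrap address are placeholders:

{code:java}
import java.util.Collections;
import java.util.Properties;
import org.apache.kafka.clients.admin.AdminClient;
import org.apache.kafka.clients.admin.AdminClientConfig;
import org.apache.kafka.clients.admin.ConsumerGroupDescription;

public class DescribeGroupSketch {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");  // placeholder
        try (AdminClient admin = AdminClient.create(props)) {
            admin.describeConsumerGroups(Collections.singleton("my-group"))  // placeholder group id
                 .describedGroups()
                 .forEach((groupId, future) ->
                     future.whenComplete((ConsumerGroupDescription description, Throwable error) -> {
                         // Against a 1.0 broker, error may wrap the SchemaException shown above.
                         System.out.println(groupId + " -> " + (error == null ? description : error));
                     }));
        }
    }
}
{code}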



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Resolved] (KAFKA-7161) KTable Reduce should check for invalid conditions

2018-08-01 Thread Guozhang Wang (JIRA)


 [ 
https://issues.apache.org/jira/browse/KAFKA-7161?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Guozhang Wang resolved KAFKA-7161.
--
   Resolution: Fixed
Fix Version/s: 2.1.0

> KTable Reduce should check for invalid conditions
> -
>
> Key: KAFKA-7161
> URL: https://issues.apache.org/jira/browse/KAFKA-7161
> Project: Kafka
>  Issue Type: Improvement
>Reporter: John Roesler
>Assignee: John Roesler
>Priority: Major
> Fix For: 2.1.0
>
>
> KTableReduce has the opportunity to explicitly check if the state is 
> inconsistent with the oldValues arriving from the stream. If it did so, it 
> could help detect topology changes that needed an app reset and fail fast 
> before any data corruption occurs.
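A hypothetical sketch of the invariant check described above (not the merged code): the oldValue carried by a Change record should agree with what the state store currently holds, and a mismatch is a sign that the topology changed without an application reset.

{code:java}
// Illustrative only; names and structure do not match the actual KTableReduce fix.
final class OldValueInvariant {
    static <K, V> void check(K key, V oldValueFromStream, V valueInStore) {
        if (oldValueFromStream != null && !oldValueFromStream.equals(valueInStore)) {
            throw new IllegalStateException(
                "oldValue for key " + key + " does not match the state store; "
                    + "this may indicate a topology change that requires an application reset");
        }
    }
}
{code}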



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (KAFKA-7161) KTable Reduce should check for invalid conditions

2018-08-01 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/KAFKA-7161?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16565731#comment-16565731
 ] 

ASF GitHub Bot commented on KAFKA-7161:
---

guozhangwang closed pull request #5366: KAFKA-7161: check invariant: oldValue 
is in the state
URL: https://github.com/apache/kafka/pull/5366
 
 
   

This is a PR merged from a forked repository.
As GitHub hides the original diff on merge, it is displayed below for
the sake of provenance:

As this is a foreign pull request (from a fork), the diff is supplied
below (as it won't show otherwise due to GitHub magic):

diff --git 
a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KTableReduceTest.java
 
b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KTableReduceTest.java
new file mode 100644
index 000..05b74dca9f4
--- /dev/null
+++ 
b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KTableReduceTest.java
@@ -0,0 +1,81 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.streams.kstream.internals;
+
+import org.apache.kafka.streams.processor.Processor;
+import org.apache.kafka.streams.processor.internals.AbstractProcessorContext;
+import org.apache.kafka.streams.processor.internals.ProcessorNode;
+import org.apache.kafka.streams.state.internals.InMemoryKeyValueStore;
+import org.apache.kafka.test.InternalMockProcessorContext;
+import org.junit.Test;
+
+import java.util.HashSet;
+import java.util.Set;
+
+import static java.util.Collections.emptySet;
+import static java.util.Collections.singleton;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNotNull;
+
+public class KTableReduceTest {
+
+    @Test
+    public void shouldAddAndSubtract() {
+        final AbstractProcessorContext context = new InternalMockProcessorContext();
+
+        final Processor<String, Change<Set<String>>> reduceProcessor =
+            new KTableReduce<String, Set<String>>(
+                "myStore",
+                this::unionNotNullArgs,
+                this::differenceNotNullArgs
+            ).get();
+
+        final InMemoryKeyValueStore<String, Set<String>> myStore =
+            new InMemoryKeyValueStore<>("myStore", null, null);
+
+        context.register(myStore, null);
+        reduceProcessor.init(context);
+        context.setCurrentNode(new ProcessorNode<>("reduce", reduceProcessor, singleton("myStore")));
+
+        reduceProcessor.process("A", new Change<>(singleton("a"), null));
+        assertEquals(singleton("a"), myStore.get("A"));
+        reduceProcessor.process("A", new Change<>(singleton("b"), singleton("a")));
+        assertEquals(singleton("b"), myStore.get("A"));
+        reduceProcessor.process("A", new Change<>(null, singleton("b")));
+        assertEquals(emptySet(), myStore.get("A"));
+    }
+
+    private Set<String> differenceNotNullArgs(final Set<String> left, final Set<String> right) {
+        assertNotNull(left);
+        assertNotNull(right);
+
+        final HashSet<String> strings = new HashSet<>(left);
+        strings.removeAll(right);
+        return strings;
+    }
+
+    private Set<String> unionNotNullArgs(final Set<String> left, final Set<String> right) {
+        assertNotNull(left);
+        assertNotNull(right);
+
+        final HashSet<String> strings = new HashSet<>();
+        strings.addAll(left);
+        strings.addAll(right);
+        return strings;
+    }
+}


 


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


> KTable Reduce should check for invalid conditions
> -
>
> Key: KAFKA-7161
> URL: https://issues.apache.org/jira/browse/KAFKA-7161
> Project: Kafka
>  Issue Type: Improvement
>Reporter: John Roesler
>Assignee: John Roesler
>Priority: Major
> Fix For: 2.1.0
>
>
> KTableReduce has the opportunity to explicitly check if the state is 
> inconsistent with the oldV

[jira] [Comment Edited] (KAFKA-7226) kafka-console-consumer.sh doesn't use security.protocol provided in config file

2018-08-01 Thread Manikumar (JIRA)


[ 
https://issues.apache.org/jira/browse/KAFKA-7226?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16565681#comment-16565681
 ] 

Manikumar edited comment on KAFKA-7226 at 8/1/18 5:37 PM:
--

Looks like the issue is specific to the HDP Kafka distribution. Can you raise the 
issue with the HDP vendor?
The kafka-console-producer.sh script, which is part of the Apache Kafka release, does 
not accept a "security-protocol" option. It only accepts the "producer.config" and 
"property" options.


was (Author: omkreddy):
Looks like the issue is specific to the HDP Kafka distribution. Can you raise the 
issue with the HDP vendor?
The kafka-console-producer.sh script, which is part of the Apache Kafka release, does 
not accept a "--security-protocol" option. It only accepts the "--producer.config" and 
"--property" options.

> kafka-console-consumer.sh doesn't use security.protocol provided in config 
> file
> ---
>
> Key: KAFKA-7226
> URL: https://issues.apache.org/jira/browse/KAFKA-7226
> Project: Kafka
>  Issue Type: Bug
>  Components: producer 
>Affects Versions: 1.0.0
>Reporter: Alexandre GRIFFAUT
>Priority: Minor
>
> It is confusing that kafka-console-producer.sh doesn't read security.protocol 
> when provided in a config file with --producer.config, whereas 
> kafka-console-consumer.sh does...
>  
> With debug activated:
> $ /usr/hdp/2.6.5.0-292/kafka/bin/kafka-console-producer.sh --broker-list 
> $(hostname):6668 --topic test --producer.config 
> /etc/kafka/ssl/kafka.client.properties
> [2018-08-01 14:17:18,505] INFO ProducerConfig values:
> ...
>     security.protocol = PLAINTEXT
> ...
> > abc
> ..
> java.io.EOFException
>     at 
> org.apache.kafka.common.network.NetworkReceive.readFromReadableChannel(NetworkReceive.java:147)
>     at 
> org.apache.kafka.common.network.NetworkReceive.readFrom(NetworkReceive.java:93)
>     at 
> org.apache.kafka.common.network.KafkaChannel.receive(KafkaChannel.java:235)
>     at 
> org.apache.kafka.common.network.KafkaChannel.read(KafkaChannel.java:196)
>     at 
> org.apache.kafka.common.network.Selector.attemptRead(Selector.java:538)
>     at 
> org.apache.kafka.common.network.Selector.pollSelectionKeys(Selector.java:482)
>     at org.apache.kafka.common.network.Selector.poll(Selector.java:412)
>     at org.apache.kafka.clients.NetworkClient.poll(NetworkClient.java:460)
>     at 
> org.apache.kafka.clients.producer.internals.Sender.run(Sender.java:239)
>     at 
> org.apache.kafka.clients.producer.internals.Sender.run(Sender.java:163)
>     at java.lang.Thread.run(Thread.java:745)
> ...
>  
> The only way to produce with SSL is to use --security-protocol SSL.
> kafka-console-consumer.sh correctly reads security.protocol from the config file.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (KAFKA-7219) Add topic/partition level metrics.

2018-08-01 Thread George Teo (JIRA)


[ 
https://issues.apache.org/jira/browse/KAFKA-7219?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16565692#comment-16565692
 ] 

George Teo commented on KAFKA-7219:
---

Another slightly bigger ask would be to also emit a low-watermark metric for each 
topic-partition. This makes it easier to compute retention-safety (consumer 
offset minus low watermark) metrics. This could potentially be put into the log 
cleaner.

> Add topic/partition level metrics.
> --
>
> Key: KAFKA-7219
> URL: https://issues.apache.org/jira/browse/KAFKA-7219
> Project: Kafka
>  Issue Type: Improvement
>  Components: metrics
>Reporter: Satish Duggana
>Assignee: Satish Duggana
>Priority: Major
>  Labels: needs-kip
>
> Currently, Kafka generates different metrics for topics on a broker.
>   - MessagesInPerSec
>   - BytesInPerSec
>   - BytesOutPerSec
>   - BytesRejectedPerSec
>   - ReplicationBytesInPerSec
>   - ReplicationBytesOutPerSec
>   - FailedProduceRequestsPerSec
>   - FailedFetchRequestsPerSec
>   - TotalProduceRequestsPerSec
>   - TotalFetchRequestsPerSec
>   - FetchMessageConversionsPerSec
>   - ProduceMessageConversionsPerSec
> Add metrics for individual partitions instead of having only at topic level. 
> Some of these partition level metrics are useful for monitoring applications 
> to monitor individual topic/partitions.
>  



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Updated] (KAFKA-7230) Empty Record created when producer fails due to RecordTooLargeException

2018-08-01 Thread Julien Fabre (JIRA)


 [ 
https://issues.apache.org/jira/browse/KAFKA-7230?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Julien Fabre updated KAFKA-7230:

Summary: Empty Record created when producer fails due to 
RecordTooLargeException  (was: Empty Record created when producer failed due to 
RecordTooLargeException)

> Empty Record created when producer fails due to RecordTooLargeException
> ---
>
> Key: KAFKA-7230
> URL: https://issues.apache.org/jira/browse/KAFKA-7230
> Project: Kafka
>  Issue Type: Bug
>Affects Versions: 1.1.0
>Reporter: Julien Fabre
>Priority: Major
>
> When a producer tries to produce a RecordBatch (or Message) which is bigger 
> than the message.max.bytes value, it fails with the error 
> {code:java}org.apache.kafka.common.errors.RecordTooLargeException{code}
> BUT an empty Record gets created.
> While hitting the RecordTooLargeException is expected, I was not expecting 
> to see a new offset with an empty Record in the Topic.
> Is that a problem with Kafka, or should the consumer handle this case?
> It seems that some libraries handle this issue properly but some don't. For 
> example, the [latest version of 
> Spark|https://spark.apache.org/docs/2.2.0/streaming-kafka-0-10-integration.html]
>  fails with errors like:
> {code}
> 18/07/24 06:01:46 WARN TaskSetManager: Lost task 48.0 in stage 0.0 (TID 48, 
> ip-10-50-15-192.us-west-2.compute.internal, executor 14): 
> java.lang.IllegalArgumentException: Unknown compression type id: 7
> at 
> org.apache.kafka.common.record.CompressionType.forId(CompressionType.java:46)
> at org.apache.kafka.common.record.Record.compressionType(Record.java:361)
> at 
> org.apache.kafka.common.record.MemoryRecords$RecordsIterator.makeNext(MemoryRecords.java:297)
> at 
> org.apache.kafka.common.record.MemoryRecords$RecordsIterator.makeNext(MemoryRecords.java:221)
> at 
> org.apache.kafka.common.utils.AbstractIterator.maybeComputeNext(AbstractIterator.java:79)
> at 
> org.apache.kafka.common.utils.AbstractIterator.hasNext(AbstractIterator.java:45)
> at 
> org.apache.kafka.clients.consumer.internals.Fetcher.parseFetchedData(Fetcher.java:545)
> at 
> org.apache.kafka.clients.consumer.internals.Fetcher.fetchedRecords(Fetcher.java:354)
> at 
> org.apache.kafka.clients.consumer.KafkaConsumer.pollOnce(KafkaConsumer.java:1000)
> at 
> org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:938)
> at 
> org.apache.spark.sql.kafka010.CachedKafkaConsumer.poll(CachedKafkaConsumer.scala:298)
> at 
> org.apache.spark.sql.kafka010.CachedKafkaConsumer.org$apache$spark$sql$kafka010$CachedKafkaConsumer$$fetchData(CachedKafkaConsumer.scala:206)
> at 
> org.apache.spark.sql.kafka010.CachedKafkaConsumer$$anonfun$get$1.apply(CachedKafkaConsumer.scala:117)
> at 
> org.apache.spark.sql.kafka010.CachedKafkaConsumer$$anonfun$get$1.apply(CachedKafkaConsumer.scala:106)
> at 
> org.apache.spark.util.UninterruptibleThread.runUninterruptibly(UninterruptibleThread.scala:85)
> at 
> org.apache.spark.sql.kafka010.CachedKafkaConsumer.runUninterruptiblyIfPossible(CachedKafkaConsumer.scala:68)
> at 
> org.apache.spark.sql.kafka010.CachedKafkaConsumer.get(CachedKafkaConsumer.scala:106)
> at 
> org.apache.spark.sql.kafka010.KafkaSourceRDD$$anon$1.getNext(KafkaSourceRDD.scala:157)
> at 
> org.apache.spark.sql.kafka010.KafkaSourceRDD$$anon$1.getNext(KafkaSourceRDD.scala:148)
> at org.apache.spark.util.NextIterator.hasNext(NextIterator.scala:73)
> at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:408)
> at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:408)
> at scala.collection.Iterator$$anon$13.hasNext(Iterator.scala:461)
> at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:408)
> at 
> org.apache.spark.sql.execution.UnsafeExternalRowSorter.sort(UnsafeExternalRowSorter.java:190)
> at 
> org.apache.spark.sql.execution.SortExec$$anonfun$1.apply(SortExec.scala:108)
> at 
> org.apache.spark.sql.execution.SortExec$$anonfun$1.apply(SortExec.scala:101)
> at 
> org.apache.spark.rdd.RDD$$anonfun$mapPartitionsInternal$1$$anonfun$apply$25.apply(RDD.scala:827)
> at 
> org.apache.spark.rdd.RDD$$anonfun$mapPartitionsInternal$1$$anonfun$apply$25.apply(RDD.scala:827)
> at 
> org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
> at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:323)
> at org.apache.spark.rdd.RDD.iterator(RDD.scala:287)
> at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:87)
> at org.apache.spark.scheduler.Task.run(Task.scala:108)
> at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:335)
> at 
> java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadP

[jira] [Updated] (KAFKA-7230) Empty Record created when producer failed due to RecordTooLargeException

2018-08-01 Thread Julien Fabre (JIRA)


 [ 
https://issues.apache.org/jira/browse/KAFKA-7230?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Julien Fabre updated KAFKA-7230:

Description: 
When a producer tries to produce a RecordBatch which is bigger than the 
message.max.bytes value, it fails with the error 
{code:java}org.apache.kafka.common.errors.RecordTooLargeException{code}
but an empty Record gets created.

While hitting the RecordTooLargeException is expected, I was not expecting 
to see a new offset with an empty Record in the Topic.

Is that a problem with Kafka, or should the consumer handle this case?

Test setup :
- Kafka 2.11-1.1.0
- The producer is written in Go, using a 
[SyncProducer|https://godoc.org/github.com/Shopify/sarama#SyncProducer] from 
the Sarama library.
- The consumer is kafkacat version 1.3.1-13-ga6b599

Debug logs from Kafka:
{code}
[2018-08-01 17:21:11,201] DEBUG Accepted connection from /172.17.0.1:33718 on 
/172.17.0.3:9092 and assigned it to processor 1, sendBufferSize 
[actual|requested]: [102400|102400] recvBufferSize [actual|requested]: 
[102400|102400] (kafka.network.Acceptor)
[2018-08-01 17:21:11,201] DEBUG Processor 1 listening to new connection from 
/172.17.0.1:33718 (kafka.network.Processor)
[2018-08-01 17:21:11,203] DEBUG [ReplicaManager broker=1001] Request key 
events-0 unblocked 0 fetch requests. (kafka.server.ReplicaManager)
[2018-08-01 17:21:11,203] DEBUG [Partition events-0 broker=1001] High watermark 
updated to 2 [0 : 136] (kafka.cluster.Partition)
[2018-08-01 17:21:11,203] DEBUG Sessionless fetch context returning 1 
partition(s) (kafka.server.SessionlessFetchContext)
[2018-08-01 17:21:11,204] DEBUG [ReplicaManager broker=1001] Request key 
events-0 unblocked 1 fetch requests. (kafka.server.ReplicaManager)
[2018-08-01 17:21:11,204] DEBUG [ReplicaManager broker=1001] Request key 
events-0 unblocked 0 producer requests. (kafka.server.ReplicaManager)
[2018-08-01 17:21:11,204] DEBUG [ReplicaManager broker=1001] Request key 
events-0 unblocked 0 DeleteRecordsRequest. (kafka.server.ReplicaManager)
[2018-08-01 17:21:11,204] DEBUG [ReplicaManager broker=1001] Produce to local 
log in 2 ms (kafka.server.ReplicaManager)
[2018-08-01 17:21:11,205] DEBUG Created a new full FetchContext with 1 
partition(s). Will not try to create a new session. (kafka.server.FetchManager)
[2018-08-01 17:21:11,210] DEBUG [ReplicaManager broker=1001] Produce to local 
log in 0 ms (kafka.server.ReplicaManager)
[2018-08-01 17:21:11,210] DEBUG [KafkaApi-1001] Produce request with 
correlation id 1 from client sarama on partition events-0 failed due to 
org.apache.kafka.common.errors.RecordTooLargeException (kafka.server.KafkaApis)
{code}

Debug logs from kafkacat :
{code}
%7|1533144071.204|SEND|rdkafka#consumer-1| [thrd:localhost:9092/1001]: 
localhost:9092/1001: Sent FetchRequest (v4, 70 bytes @ 0, CorrId 89)
%7|1533144071.309|RECV|rdkafka#consumer-1| [thrd:localhost:9092/1001]: 
localhost:9092/1001: Received FetchResponse (v4, 50 bytes, CorrId 89, rtt 
104.62ms)
%7|1533144071.309|FETCH|rdkafka#consumer-1| [thrd:localhost:9092/1001]: 
localhost:9092/1001: Topic events [0] MessageSet size 0, error "Success", 
MaxOffset 2, Ver 2/2
%7|1533144071.309|BACKOFF|rdkafka#consumer-1| [thrd:localhost:9092/1001]: 
localhost:9092/1001: events [0]: Fetch backoff for 500ms: Broker: No more 
messages
%7|1533144071.309|FETCH|rdkafka#consumer-1| [thrd:localhost:9092/1001]: 
localhost:9092/1001: Topic events [0] in state active at offset 0 (1/10 
msgs, 0/100 kb queued, opv 2) is not fetchable: fetch backed off
%7|1533144071.309|FETCHADD|rdkafka#consumer-1| [thrd:localhost:9092/1001]: 
localhost:9092/1001: Removed events [0] from fetch list (0 entries, opv 2)
%7|1533144071.309|FETCH|rdkafka#consumer-1| [thrd:localhost:9092/1001]: 
localhost:9092/1001: Fetch backoff for 499ms
% Reached end of topic events [0] at offset 2
{code}
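
For reference, below is a minimal Java producer sketch that triggers the same 
broker-side RecordTooLargeException. It is an illustration only (the original 
report used a Go/Sarama SyncProducer); the broker address, topic name and 
payload size are assumptions. The interesting check afterwards is whether the 
partition's end offset moved even though the produce failed.
{code:java}
import java.util.Properties;
import java.util.concurrent.ExecutionException;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.Producer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.errors.RecordTooLargeException;

public class OversizedProduceRepro {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092"); // assumed broker address
        props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG,
                "org.apache.kafka.common.serialization.ByteArraySerializer");
        props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG,
                "org.apache.kafka.common.serialization.ByteArraySerializer");
        // Raise the client-side limit so the broker-side message.max.bytes check is the one that rejects.
        props.put(ProducerConfig.MAX_REQUEST_SIZE_CONFIG, Integer.toString(10 * 1024 * 1024));

        byte[] payload = new byte[2 * 1024 * 1024]; // assumed to exceed the broker's message.max.bytes

        try (Producer<byte[], byte[]> producer = new KafkaProducer<>(props)) {
            producer.send(new ProducerRecord<>("events", payload)).get();
        } catch (ExecutionException e) {
            if (e.getCause() instanceof RecordTooLargeException) {
                // The broker rejected the batch; the question in this ticket is whether
                // an empty batch/offset still becomes visible on the partition afterwards.
                System.out.println("Produce failed: " + e.getCause().getMessage());
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
    }
}
{code}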


  was:
When a producer try to produce a RecordBatch which is bigger than the 
message.max.bytes value, it fails with the error 
{code:java}org.apache.kafka.common.errors.RecordTooLargeException{code}
but an empty Record gets created.

While hitting the RecordTooLargeException is expected, I was not expecting 
seeing a new offset with an empty Record in the Topic.

Is that a problem with Kafka or should the consumer handle this case ?

Test setup :
- Kafka 2.11-1.1.0
- The producer is written in Go, using a 
[SyncProducer|https://godoc.org/github.com/Shopify/sarama#SyncProducer] from 
the Sarama library.
- The consumer is kafkacat version 1.3.1-13-ga6b599

Debugs logs from Kafka :
{code}
[2018-08-01 17:21:11,201] DEBUG Accepted connection from /172.17.0.1:33718 on 
/172.17.0.3:9092 and assigned it to processor 1, sendBufferSize 
[actual|requested]: [102400|102400] recvBufferSize [actual|requested]: 
[102400|102400] (kafka.network.Acceptor)
[2018-08-01 17:21:11,201] DEBUG Processor 1 listening to new connection from 
/172.17.0.1:33718 (kafka.networ

[jira] [Updated] (KAFKA-7230) Empty Record created when producer failed due to RecordTooLargeException

2018-08-01 Thread Julien Fabre (JIRA)


 [ 
https://issues.apache.org/jira/browse/KAFKA-7230?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Julien Fabre updated KAFKA-7230:

Description: 
When a producer tries to produce a RecordBatch (or Message) which is bigger 
than the message.max.bytes value, it fails with the error 
{code:java}org.apache.kafka.common.errors.RecordTooLargeException{code}
BUT an empty Record gets created.

While hitting the RecordTooLargeException is expected, I was not expecting 
to see a new offset with an empty Record in the Topic.

Is that a problem with Kafka, or should the consumer handle this case?

It seems that some libraries handle this issue properly but some don't. For 
example, the [latest version of 
Spark|https://spark.apache.org/docs/2.2.0/streaming-kafka-0-10-integration.html]
 is failing with errors like:
{code}
18/07/24 06:01:46 WARN TaskSetManager: Lost task 48.0 in stage 0.0 (TID 48, 
ip-10-50-15-192.us-west-2.compute.internal, executor 14): 
java.lang.IllegalArgumentException: Unknown compression type id: 7
at 
org.apache.kafka.common.record.CompressionType.forId(CompressionType.java:46)
at org.apache.kafka.common.record.Record.compressionType(Record.java:361)
at 
org.apache.kafka.common.record.MemoryRecords$RecordsIterator.makeNext(MemoryRecords.java:297)
at 
org.apache.kafka.common.record.MemoryRecords$RecordsIterator.makeNext(MemoryRecords.java:221)
at 
org.apache.kafka.common.utils.AbstractIterator.maybeComputeNext(AbstractIterator.java:79)
at 
org.apache.kafka.common.utils.AbstractIterator.hasNext(AbstractIterator.java:45)
at 
org.apache.kafka.clients.consumer.internals.Fetcher.parseFetchedData(Fetcher.java:545)
at 
org.apache.kafka.clients.consumer.internals.Fetcher.fetchedRecords(Fetcher.java:354)
at 
org.apache.kafka.clients.consumer.KafkaConsumer.pollOnce(KafkaConsumer.java:1000)
at 
org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:938)
at 
org.apache.spark.sql.kafka010.CachedKafkaConsumer.poll(CachedKafkaConsumer.scala:298)
at 
org.apache.spark.sql.kafka010.CachedKafkaConsumer.org$apache$spark$sql$kafka010$CachedKafkaConsumer$$fetchData(CachedKafkaConsumer.scala:206)
at 
org.apache.spark.sql.kafka010.CachedKafkaConsumer$$anonfun$get$1.apply(CachedKafkaConsumer.scala:117)
at 
org.apache.spark.sql.kafka010.CachedKafkaConsumer$$anonfun$get$1.apply(CachedKafkaConsumer.scala:106)
at 
org.apache.spark.util.UninterruptibleThread.runUninterruptibly(UninterruptibleThread.scala:85)
at 
org.apache.spark.sql.kafka010.CachedKafkaConsumer.runUninterruptiblyIfPossible(CachedKafkaConsumer.scala:68)
at 
org.apache.spark.sql.kafka010.CachedKafkaConsumer.get(CachedKafkaConsumer.scala:106)
at 
org.apache.spark.sql.kafka010.KafkaSourceRDD$$anon$1.getNext(KafkaSourceRDD.scala:157)
at 
org.apache.spark.sql.kafka010.KafkaSourceRDD$$anon$1.getNext(KafkaSourceRDD.scala:148)
at org.apache.spark.util.NextIterator.hasNext(NextIterator.scala:73)
at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:408)
at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:408)
at scala.collection.Iterator$$anon$13.hasNext(Iterator.scala:461)
at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:408)
at 
org.apache.spark.sql.execution.UnsafeExternalRowSorter.sort(UnsafeExternalRowSorter.java:190)
at 
org.apache.spark.sql.execution.SortExec$$anonfun$1.apply(SortExec.scala:108)
at 
org.apache.spark.sql.execution.SortExec$$anonfun$1.apply(SortExec.scala:101)
at 
org.apache.spark.rdd.RDD$$anonfun$mapPartitionsInternal$1$$anonfun$apply$25.apply(RDD.scala:827)
at 
org.apache.spark.rdd.RDD$$anonfun$mapPartitionsInternal$1$$anonfun$apply$25.apply(RDD.scala:827)
at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:323)
at org.apache.spark.rdd.RDD.iterator(RDD.scala:287)
at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:87)
at org.apache.spark.scheduler.Task.run(Task.scala:108)
at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:335)
at 
java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
at 
java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
at java.lang.Thread.run(Thread.java:748)
{code}

Test setup :
- Kafka 2.11-1.1.0
- The producer is written in Go, using a 
[SyncProducer|https://godoc.org/github.com/Shopify/sarama#SyncProducer] from 
the Sarama library.
- The consumer is kafkacat version 1.3.1-13-ga6b599

Debug logs from Kafka:
{code}
[2018-08-01 17:21:11,201] DEBUG Accepted connection from /172.17.0.1:33718 on 
/172.17.0.3:9092 and assigned it to processor 1, sendBufferSize 
[actual|requested]: [102400|102400] recvBufferSize [actual|requested]: 
[102400|102400] (kafka.network.Acceptor)
[2018-08-01 17:21:11,201] DEBUG

[jira] [Updated] (KAFKA-7230) Empty Record created when producer failed due to RecordTooLargeException

2018-08-01 Thread Julien Fabre (JIRA)


 [ 
https://issues.apache.org/jira/browse/KAFKA-7230?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Julien Fabre updated KAFKA-7230:

Description: 
When a producer tries to produce a RecordBatch which is bigger than the 
message.max.bytes value, it fails with the error 
{code:java}org.apache.kafka.common.errors.RecordTooLargeException{code}
BUT an empty Record gets created.

While hitting the RecordTooLargeException is expected, I was not expecting 
to see a new offset with an empty Record in the Topic.

Is that a problem with Kafka, or should the consumer handle this case?

It seems that some libraries handle this issue properly but some don't. For 
example, the [latest version of 
Spark|https://spark.apache.org/docs/2.2.0/streaming-kafka-0-10-integration.html]
 is failing with errors like:
{code}
18/07/24 06:01:46 WARN TaskSetManager: Lost task 48.0 in stage 0.0 (TID 48, 
ip-10-50-15-192.us-west-2.compute.internal, executor 14): 
java.lang.IllegalArgumentException: Unknown compression type id: 7
at 
org.apache.kafka.common.record.CompressionType.forId(CompressionType.java:46)
at org.apache.kafka.common.record.Record.compressionType(Record.java:361)
at 
org.apache.kafka.common.record.MemoryRecords$RecordsIterator.makeNext(MemoryRecords.java:297)
at 
org.apache.kafka.common.record.MemoryRecords$RecordsIterator.makeNext(MemoryRecords.java:221)
at 
org.apache.kafka.common.utils.AbstractIterator.maybeComputeNext(AbstractIterator.java:79)
at 
org.apache.kafka.common.utils.AbstractIterator.hasNext(AbstractIterator.java:45)
at 
org.apache.kafka.clients.consumer.internals.Fetcher.parseFetchedData(Fetcher.java:545)
at 
org.apache.kafka.clients.consumer.internals.Fetcher.fetchedRecords(Fetcher.java:354)
at 
org.apache.kafka.clients.consumer.KafkaConsumer.pollOnce(KafkaConsumer.java:1000)
at 
org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:938)
at 
org.apache.spark.sql.kafka010.CachedKafkaConsumer.poll(CachedKafkaConsumer.scala:298)
at 
org.apache.spark.sql.kafka010.CachedKafkaConsumer.org$apache$spark$sql$kafka010$CachedKafkaConsumer$$fetchData(CachedKafkaConsumer.scala:206)
at 
org.apache.spark.sql.kafka010.CachedKafkaConsumer$$anonfun$get$1.apply(CachedKafkaConsumer.scala:117)
at 
org.apache.spark.sql.kafka010.CachedKafkaConsumer$$anonfun$get$1.apply(CachedKafkaConsumer.scala:106)
at 
org.apache.spark.util.UninterruptibleThread.runUninterruptibly(UninterruptibleThread.scala:85)
at 
org.apache.spark.sql.kafka010.CachedKafkaConsumer.runUninterruptiblyIfPossible(CachedKafkaConsumer.scala:68)
at 
org.apache.spark.sql.kafka010.CachedKafkaConsumer.get(CachedKafkaConsumer.scala:106)
at 
org.apache.spark.sql.kafka010.KafkaSourceRDD$$anon$1.getNext(KafkaSourceRDD.scala:157)
at 
org.apache.spark.sql.kafka010.KafkaSourceRDD$$anon$1.getNext(KafkaSourceRDD.scala:148)
at org.apache.spark.util.NextIterator.hasNext(NextIterator.scala:73)
at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:408)
at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:408)
at scala.collection.Iterator$$anon$13.hasNext(Iterator.scala:461)
at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:408)
at 
org.apache.spark.sql.execution.UnsafeExternalRowSorter.sort(UnsafeExternalRowSorter.java:190)
at 
org.apache.spark.sql.execution.SortExec$$anonfun$1.apply(SortExec.scala:108)
at 
org.apache.spark.sql.execution.SortExec$$anonfun$1.apply(SortExec.scala:101)
at 
org.apache.spark.rdd.RDD$$anonfun$mapPartitionsInternal$1$$anonfun$apply$25.apply(RDD.scala:827)
at 
org.apache.spark.rdd.RDD$$anonfun$mapPartitionsInternal$1$$anonfun$apply$25.apply(RDD.scala:827)
at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:323)
at org.apache.spark.rdd.RDD.iterator(RDD.scala:287)
at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:87)
at org.apache.spark.scheduler.Task.run(Task.scala:108)
at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:335)
at 
java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
at 
java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
at java.lang.Thread.run(Thread.java:748)
{code}

Test setup :
- Kafka 2.11-1.1.0
- The producer is written in Go, using a 
[SyncProducer|https://godoc.org/github.com/Shopify/sarama#SyncProducer] from 
the Sarama library.
- The consumer is kafkacat version 1.3.1-13-ga6b599

Debug logs from Kafka:
{code}
[2018-08-01 17:21:11,201] DEBUG Accepted connection from /172.17.0.1:33718 on 
/172.17.0.3:9092 and assigned it to processor 1, sendBufferSize 
[actual|requested]: [102400|102400] recvBufferSize [actual|requested]: 
[102400|102400] (kafka.network.Acceptor)
[2018-08-01 17:21:11,201] DEBUG Processor 1 

[jira] [Updated] (KAFKA-7230) Empty Record created when producer failed due to RecordTooLargeException

2018-08-01 Thread Julien Fabre (JIRA)


 [ 
https://issues.apache.org/jira/browse/KAFKA-7230?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Julien Fabre updated KAFKA-7230:

Description: 
When a producer tries to produce a RecordBatch which is bigger than the 
message.max.bytes value, it fails with the error 
{code:java}org.apache.kafka.common.errors.RecordTooLargeException{code}
BUT an empty Record gets created.

While hitting the RecordTooLargeException is expected, I was not expecting 
to see a new offset with an empty Record in the Topic.

Is that a problem with Kafka, or should the consumer handle this case?

It seems that some libraries handle this issue properly but some don't. For 
example, the [latest version of 
Spark|https://spark.apache.org/docs/2.2.0/streaming-kafka-0-10-integration.html]
 is failing with the following error:
{code}
18/07/24 06:01:46 WARN TaskSetManager: Lost task 48.0 in stage 0.0 (TID 48, 
ip-10-50-15-192.us-west-2.compute.internal, executor 14): 
java.lang.IllegalArgumentException: Unknown compression type id: 7
at 
org.apache.kafka.common.record.CompressionType.forId(CompressionType.java:46)
at org.apache.kafka.common.record.Record.compressionType(Record.java:361)
at 
org.apache.kafka.common.record.MemoryRecords$RecordsIterator.makeNext(MemoryRecords.java:297)
at 
org.apache.kafka.common.record.MemoryRecords$RecordsIterator.makeNext(MemoryRecords.java:221)
at 
org.apache.kafka.common.utils.AbstractIterator.maybeComputeNext(AbstractIterator.java:79)
at 
org.apache.kafka.common.utils.AbstractIterator.hasNext(AbstractIterator.java:45)
at 
org.apache.kafka.clients.consumer.internals.Fetcher.parseFetchedData(Fetcher.java:545)
at 
org.apache.kafka.clients.consumer.internals.Fetcher.fetchedRecords(Fetcher.java:354)
at 
org.apache.kafka.clients.consumer.KafkaConsumer.pollOnce(KafkaConsumer.java:1000)
at 
org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:938)
at 
org.apache.spark.sql.kafka010.CachedKafkaConsumer.poll(CachedKafkaConsumer.scala:298)
at 
org.apache.spark.sql.kafka010.CachedKafkaConsumer.org$apache$spark$sql$kafka010$CachedKafkaConsumer$$fetchData(CachedKafkaConsumer.scala:206)
at 
org.apache.spark.sql.kafka010.CachedKafkaConsumer$$anonfun$get$1.apply(CachedKafkaConsumer.scala:117)
at 
org.apache.spark.sql.kafka010.CachedKafkaConsumer$$anonfun$get$1.apply(CachedKafkaConsumer.scala:106)
at 
org.apache.spark.util.UninterruptibleThread.runUninterruptibly(UninterruptibleThread.scala:85)
at 
org.apache.spark.sql.kafka010.CachedKafkaConsumer.runUninterruptiblyIfPossible(CachedKafkaConsumer.scala:68)
at 
org.apache.spark.sql.kafka010.CachedKafkaConsumer.get(CachedKafkaConsumer.scala:106)
at 
org.apache.spark.sql.kafka010.KafkaSourceRDD$$anon$1.getNext(KafkaSourceRDD.scala:157)
at 
org.apache.spark.sql.kafka010.KafkaSourceRDD$$anon$1.getNext(KafkaSourceRDD.scala:148)
at org.apache.spark.util.NextIterator.hasNext(NextIterator.scala:73)
at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:408)
at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:408)
at scala.collection.Iterator$$anon$13.hasNext(Iterator.scala:461)
at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:408)
at 
org.apache.spark.sql.execution.UnsafeExternalRowSorter.sort(UnsafeExternalRowSorter.java:190)
at 
org.apache.spark.sql.execution.SortExec$$anonfun$1.apply(SortExec.scala:108)
at 
org.apache.spark.sql.execution.SortExec$$anonfun$1.apply(SortExec.scala:101)
at 
org.apache.spark.rdd.RDD$$anonfun$mapPartitionsInternal$1$$anonfun$apply$25.apply(RDD.scala:827)
at 
org.apache.spark.rdd.RDD$$anonfun$mapPartitionsInternal$1$$anonfun$apply$25.apply(RDD.scala:827)
at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:323)
at org.apache.spark.rdd.RDD.iterator(RDD.scala:287)
at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:87)
at org.apache.spark.scheduler.Task.run(Task.scala:108)
at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:335)
at 
java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
at 
java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
at java.lang.Thread.run(Thread.java:748)
{code}

Test setup :
- Kafka 2.11-1.1.0
- The producer is written in Go, using a 
[SyncProducer|https://godoc.org/github.com/Shopify/sarama#SyncProducer] from 
the Sarama library.
- The consumer is kafkacat version 1.3.1-13-ga6b599

Debug logs from Kafka:
{code}
[2018-08-01 17:21:11,201] DEBUG Accepted connection from /172.17.0.1:33718 on 
/172.17.0.3:9092 and assigned it to processor 1, sendBufferSize 
[actual|requested]: [102400|102400] recvBufferSize [actual|requested]: 
[102400|102400] (kafka.network.Acceptor)
[2018-08-01 17:21:11,201] DEBUG Proc

[jira] [Commented] (KAFKA-7226) kafka-console-consumer.sh doesn't use security.protocol provided in config file

2018-08-01 Thread Manikumar (JIRA)


[ 
https://issues.apache.org/jira/browse/KAFKA-7226?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16565681#comment-16565681
 ] 

Manikumar commented on KAFKA-7226:
--

Looks like the issue is specific to the HDP Kafka distribution. Can you raise the 
issue with the HDP vendor?
The kafka-console-producer.sh script, which is part of the Apache Kafka release, 
does not accept the "--security-protocol" option. It only accepts the 
"--producer.config" and "--property" options.

> kafka-console-consumer.sh doesn't use security.protocol provided in config 
> file
> ---
>
> Key: KAFKA-7226
> URL: https://issues.apache.org/jira/browse/KAFKA-7226
> Project: Kafka
>  Issue Type: Bug
>  Components: producer 
>Affects Versions: 1.0.0
>Reporter: Alexandre GRIFFAUT
>Priority: Minor
>
> It is confusing that kafka-console-producer.sh doesn't read security.protocol 
> when provided in a config file with --producer.config, whereas 
> kafka-console-consumer.sh does...
>  
> With debug activated:
> $ /usr/hdp/2.6.5.0-292/kafka/bin/kafka-console-producer.sh --broker-list 
> $(hostname):6668 --topic test --producer.config 
> /etc/kafka/ssl/kafka.client.properties
> [2018-08-01 14:17:18,505] INFO ProducerConfig values:
> ...
>     security.protocol = PLAINTEXT
> ...
> > abc
> ..
> java.io.EOFException
>     at 
> org.apache.kafka.common.network.NetworkReceive.readFromReadableChannel(NetworkReceive.java:147)
>     at 
> org.apache.kafka.common.network.NetworkReceive.readFrom(NetworkReceive.java:93)
>     at 
> org.apache.kafka.common.network.KafkaChannel.receive(KafkaChannel.java:235)
>     at 
> org.apache.kafka.common.network.KafkaChannel.read(KafkaChannel.java:196)
>     at 
> org.apache.kafka.common.network.Selector.attemptRead(Selector.java:538)
>     at 
> org.apache.kafka.common.network.Selector.pollSelectionKeys(Selector.java:482)
>     at org.apache.kafka.common.network.Selector.poll(Selector.java:412)
>     at org.apache.kafka.clients.NetworkClient.poll(NetworkClient.java:460)
>     at 
> org.apache.kafka.clients.producer.internals.Sender.run(Sender.java:239)
>     at 
> org.apache.kafka.clients.producer.internals.Sender.run(Sender.java:163)
>     at java.lang.Thread.run(Thread.java:745)
> ...
>  
> The only way to produce with SSL is to use --security-protocol SSL
> kafka-console-consumer.sh correctly reads security.protocol from the config file



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Updated] (KAFKA-7230) Empty Record created when producer failed due to RecordTooLargeException

2018-08-01 Thread Julien Fabre (JIRA)


 [ 
https://issues.apache.org/jira/browse/KAFKA-7230?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Julien Fabre updated KAFKA-7230:

Description: 
When a producer tries to produce a RecordBatch which is bigger than the 
message.max.bytes value, it fails with the error 
{code:java}org.apache.kafka.common.errors.RecordTooLargeException{code}
BUT an empty Record gets created.

While hitting the RecordTooLargeException is expected, I was not expecting 
to see a new offset with an empty Record in the Topic.

Is that a problem with Kafka, or should the consumer handle this case?

Test setup :
- Kafka 2.11-1.1.0
- The producer is written in Go, using a 
[SyncProducer|https://godoc.org/github.com/Shopify/sarama#SyncProducer] from 
the Sarama library.
- The consumer is kafkacat version 1.3.1-13-ga6b599

Debug logs from Kafka:
{code}
[2018-08-01 17:21:11,201] DEBUG Accepted connection from /172.17.0.1:33718 on 
/172.17.0.3:9092 and assigned it to processor 1, sendBufferSize 
[actual|requested]: [102400|102400] recvBufferSize [actual|requested]: 
[102400|102400] (kafka.network.Acceptor)
[2018-08-01 17:21:11,201] DEBUG Processor 1 listening to new connection from 
/172.17.0.1:33718 (kafka.network.Processor)
[2018-08-01 17:21:11,203] DEBUG [ReplicaManager broker=1001] Request key 
events-0 unblocked 0 fetch requests. (kafka.server.ReplicaManager)
[2018-08-01 17:21:11,203] DEBUG [Partition events-0 broker=1001] High watermark 
updated to 2 [0 : 136] (kafka.cluster.Partition)
[2018-08-01 17:21:11,203] DEBUG Sessionless fetch context returning 1 
partition(s) (kafka.server.SessionlessFetchContext)
[2018-08-01 17:21:11,204] DEBUG [ReplicaManager broker=1001] Request key 
events-0 unblocked 1 fetch requests. (kafka.server.ReplicaManager)
[2018-08-01 17:21:11,204] DEBUG [ReplicaManager broker=1001] Request key 
events-0 unblocked 0 producer requests. (kafka.server.ReplicaManager)
[2018-08-01 17:21:11,204] DEBUG [ReplicaManager broker=1001] Request key 
events-0 unblocked 0 DeleteRecordsRequest. (kafka.server.ReplicaManager)
[2018-08-01 17:21:11,204] DEBUG [ReplicaManager broker=1001] Produce to local 
log in 2 ms (kafka.server.ReplicaManager)
[2018-08-01 17:21:11,205] DEBUG Created a new full FetchContext with 1 
partition(s). Will not try to create a new session. (kafka.server.FetchManager)
[2018-08-01 17:21:11,210] DEBUG [ReplicaManager broker=1001] Produce to local 
log in 0 ms (kafka.server.ReplicaManager)
[2018-08-01 17:21:11,210] DEBUG [KafkaApi-1001] Produce request with 
correlation id 1 from client sarama on partition events-0 failed due to 
org.apache.kafka.common.errors.RecordTooLargeException (kafka.server.KafkaApis)
{code}

Debug logs from kafkacat :
{code}
%7|1533144071.204|SEND|rdkafka#consumer-1| [thrd:localhost:9092/1001]: 
localhost:9092/1001: Sent FetchRequest (v4, 70 bytes @ 0, CorrId 89)
%7|1533144071.309|RECV|rdkafka#consumer-1| [thrd:localhost:9092/1001]: 
localhost:9092/1001: Received FetchResponse (v4, 50 bytes, CorrId 89, rtt 
104.62ms)
%7|1533144071.309|FETCH|rdkafka#consumer-1| [thrd:localhost:9092/1001]: 
localhost:9092/1001: Topic events [0] MessageSet size 0, error "Success", 
MaxOffset 2, Ver 2/2
%7|1533144071.309|BACKOFF|rdkafka#consumer-1| [thrd:localhost:9092/1001]: 
localhost:9092/1001: events [0]: Fetch backoff for 500ms: Broker: No more 
messages
%7|1533144071.309|FETCH|rdkafka#consumer-1| [thrd:localhost:9092/1001]: 
localhost:9092/1001: Topic events [0] in state active at offset 0 (1/10 
msgs, 0/100 kb queued, opv 2) is not fetchable: fetch backed off
%7|1533144071.309|FETCHADD|rdkafka#consumer-1| [thrd:localhost:9092/1001]: 
localhost:9092/1001: Removed events [0] from fetch list (0 entries, opv 2)
%7|1533144071.309|FETCH|rdkafka#consumer-1| [thrd:localhost:9092/1001]: 
localhost:9092/1001: Fetch backoff for 499ms
% Reached end of topic events [0] at offset 2
{code}


  was:
When a producer tries to produce a RecordBatch which is bigger than the 
message.max.bytes value, it fails with the error 
{code:java}org.apache.kafka.common.errors.RecordTooLargeException{code}
but an empty Record gets created.

While hitting the RecordTooLargeException is expected, I was not expecting 
seeing a new offset with an empty Record in the Topic.

Is that a problem with Kafka or should the consumer handle this case ?

Test setup :
- Kafka 2.11-1.1.0
- The producer is written in Go, using a 
[SyncProducer|https://godoc.org/github.com/Shopify/sarama#SyncProducer] from 
the Sarama library.
- The consumer is kafkacat version 1.3.1-13-ga6b599

Debugs logs from Kafka :
{code}
[2018-08-01 17:21:11,201] DEBUG Accepted connection from /172.17.0.1:33718 on 
/172.17.0.3:9092 and assigned it to processor 1, sendBufferSize 
[actual|requested]: [102400|102400] recvBufferSize [actual|requested]: 
[102400|102400] (kafka.network.Acceptor)
[2018-08-01 17:21:11,201] DEBUG Processor 1 listening to new connection from 
/172.17.0.1:33718 (kafka.netw

[jira] [Created] (KAFKA-7230) Empty Record created when producer failed due to RecordTooLargeException

2018-08-01 Thread Julien Fabre (JIRA)
Julien Fabre created KAFKA-7230:
---

 Summary: Empty Record created when producer failed due to 
RecordTooLargeException
 Key: KAFKA-7230
 URL: https://issues.apache.org/jira/browse/KAFKA-7230
 Project: Kafka
  Issue Type: Bug
Affects Versions: 1.1.0
Reporter: Julien Fabre


When a producer tries to produce a RecordBatch which is bigger than the 
message.max.bytes value, it fails with the error 
{code:java}org.apache.kafka.common.errors.RecordTooLargeException{code}
but an empty Record gets created.

While hitting the RecordTooLargeException is expected, I was not expecting 
to see a new offset with an empty Record in the Topic.

Is that a problem with Kafka, or should the consumer handle this case?

Test setup :
- Kafka 2.11-1.1.0
- The producer is written in Go, using a 
[SyncProducer|https://godoc.org/github.com/Shopify/sarama#SyncProducer] from 
the Sarama library.
- The consumer is kafkacat version 1.3.1-13-ga6b599

Debug logs from Kafka:
{code}
[2018-08-01 17:21:11,201] DEBUG Accepted connection from /172.17.0.1:33718 on 
/172.17.0.3:9092 and assigned it to processor 1, sendBufferSize 
[actual|requested]: [102400|102400] recvBufferSize [actual|requested]: 
[102400|102400] (kafka.network.Acceptor)
[2018-08-01 17:21:11,201] DEBUG Processor 1 listening to new connection from 
/172.17.0.1:33718 (kafka.network.Processor)
[2018-08-01 17:21:11,203] DEBUG [ReplicaManager broker=1001] Request key 
events-0 unblocked 0 fetch requests. (kafka.server.ReplicaManager)
[2018-08-01 17:21:11,203] DEBUG [Partition events-0 broker=1001] High watermark 
updated to 2 [0 : 136] (kafka.cluster.Partition)
[2018-08-01 17:21:11,203] DEBUG Sessionless fetch context returning 1 
partition(s) (kafka.server.SessionlessFetchContext)
[2018-08-01 17:21:11,204] DEBUG [ReplicaManager broker=1001] Request key 
events-0 unblocked 1 fetch requests. (kafka.server.ReplicaManager)
[2018-08-01 17:21:11,204] DEBUG [ReplicaManager broker=1001] Request key 
events-0 unblocked 0 producer requests. (kafka.server.ReplicaManager)
[2018-08-01 17:21:11,204] DEBUG [ReplicaManager broker=1001] Request key 
events-0 unblocked 0 DeleteRecordsRequest. (kafka.server.ReplicaManager)
[2018-08-01 17:21:11,204] DEBUG [ReplicaManager broker=1001] Produce to local 
log in 2 ms (kafka.server.ReplicaManager)
[2018-08-01 17:21:11,205] DEBUG Created a new full FetchContext with 1 
partition(s). Will not try to create a new session. (kafka.server.FetchManager)
[2018-08-01 17:21:11,210] DEBUG [ReplicaManager broker=1001] Produce to local 
log in 0 ms (kafka.server.ReplicaManager)
[2018-08-01 17:21:11,210] DEBUG [KafkaApi-1001] Produce request with 
correlation id 1 from client sarama on partition events-0 failed due to 
org.apache.kafka.common.errors.RecordTooLargeException (kafka.server.KafkaApis)
{code}

Debug logs from kafkacat :
{code}
%7|1533144071.204|SEND|rdkafka#consumer-1| [thrd:localhost:9092/1001]: 
localhost:9092/1001: Sent FetchRequest (v4, 70 bytes @ 0, CorrId 89)
%7|1533144071.309|RECV|rdkafka#consumer-1| [thrd:localhost:9092/1001]: 
localhost:9092/1001: Received FetchResponse (v4, 50 bytes, CorrId 89, rtt 
104.62ms)
%7|1533144071.309|FETCH|rdkafka#consumer-1| [thrd:localhost:9092/1001]: 
localhost:9092/1001: Topic events [0] MessageSet size 0, error "Success", 
MaxOffset 2, Ver 2/2
%7|1533144071.309|BACKOFF|rdkafka#consumer-1| [thrd:localhost:9092/1001]: 
localhost:9092/1001: events [0]: Fetch backoff for 500ms: Broker: No more 
messages
%7|1533144071.309|FETCH|rdkafka#consumer-1| [thrd:localhost:9092/1001]: 
localhost:9092/1001: Topic events [0] in state active at offset 0 (1/10 
msgs, 0/100 kb queued, opv 2) is not fetchable: fetch backed off
%7|1533144071.309|FETCHADD|rdkafka#consumer-1| [thrd:localhost:9092/1001]: 
localhost:9092/1001: Removed events [0] from fetch list (0 entries, opv 2)
%7|1533144071.309|FETCH|rdkafka#consumer-1| [thrd:localhost:9092/1001]: 
localhost:9092/1001: Fetch backoff for 499ms
% Reached end of topic events [0] at offset 2
{code}




--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (KAFKA-7219) Add topic/partition level metrics.

2018-08-01 Thread Sriharsha Chintalapani (JIRA)


[ 
https://issues.apache.org/jira/browse/KAFKA-7219?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16565641#comment-16565641
 ] 

Sriharsha Chintalapani commented on KAFKA-7219:
---

[~satish.duggana] one more suggestion: add a high watermark metric for each 
topic/partition. This will make it easier to calculate consumer lag from 
metrics instead of making a broker request to get the high watermark for a 
topic partition.
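
To illustrate the motivation, here is a rough Java sketch of how consumer lag 
has to be derived today: endOffsets() issues an extra ListOffsets broker request 
to fetch the high watermark, which a per-partition high watermark metric would 
make unnecessary. Illustrative only; the broker address, group id and 
topic/partition are assumptions.
{code:java}
import java.util.Collections;
import java.util.Map;
import java.util.Properties;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.clients.consumer.OffsetAndMetadata;
import org.apache.kafka.common.TopicPartition;

public class LagViaBrokerRequest {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put("bootstrap.servers", "localhost:9092");   // assumed broker
        props.put("group.id", "my-monitoring-group");       // assumed consumer group
        props.put("key.deserializer", "org.apache.kafka.common.serialization.ByteArrayDeserializer");
        props.put("value.deserializer", "org.apache.kafka.common.serialization.ByteArrayDeserializer");

        TopicPartition tp = new TopicPartition("events", 0); // assumed topic/partition
        try (KafkaConsumer<byte[], byte[]> consumer = new KafkaConsumer<>(props)) {
            // endOffsets() makes a broker round trip to read the partition's high watermark.
            Map<TopicPartition, Long> end = consumer.endOffsets(Collections.singletonList(tp));
            // committed() fetches the group's committed offset from the coordinator.
            OffsetAndMetadata committed = consumer.committed(tp);
            long lag = end.get(tp) - (committed == null ? 0L : committed.offset());
            System.out.println("lag for " + tp + " = " + lag);
        }
    }
}
{code}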

> Add topic/partition level metrics.
> --
>
> Key: KAFKA-7219
> URL: https://issues.apache.org/jira/browse/KAFKA-7219
> Project: Kafka
>  Issue Type: Improvement
>  Components: metrics
>Reporter: Satish Duggana
>Assignee: Satish Duggana
>Priority: Major
>  Labels: needs-kip
>
> Currently, Kafka generates different metrics for topics on a broker.
>   - MessagesInPerSec
>   - BytesInPerSec
>   - BytesOutPerSec
>   - BytesRejectedPerSec
>   - ReplicationBytesInPerSec
>   - ReplicationBytesOutPerSec
>   - FailedProduceRequestsPerSec
>   - FailedFetchRequestsPerSec
>   - TotalProduceRequestsPerSec
>   - TotalFetchRequestsPerSec
>   - FetchMessageConversionsPerSec
>   - ProduceMessageConversionsPerSec
> Add metrics for individual partitions instead of having them only at the topic 
> level. Some of these partition-level metrics are useful for monitoring 
> applications to monitor individual topic/partitions.
>  



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Updated] (KAFKA-7229) Failed to dynamically update kafka certificate in kafka 2.0.0

2018-08-01 Thread Yu Yang (JIRA)


 [ 
https://issues.apache.org/jira/browse/KAFKA-7229?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Yu Yang updated KAFKA-7229:
---
Priority: Major  (was: Critical)

> Failed to dynamically update kafka certificate in kafka 2.0.0
> -
>
> Key: KAFKA-7229
> URL: https://issues.apache.org/jira/browse/KAFKA-7229
> Project: Kafka
>  Issue Type: Bug
>  Components: security
>Affects Versions: 2.0.0
> Environment: Ubuntu 14.04.5 LTS
>Reporter: Yu Yang
>Priority: Major
>
> In kafka 1.1, we use the following command in a cron job to dynamically 
> update the certificate that kafka uses :
> kafka-configs.sh --bootstrap-server localhost:9093 --command-config 
> /var/pinterest/kafka/client.properties --alter --add-config 
> listener.name.ssl.ssl.keystore.location=/var/certs/kafka/kafka.keystore.jks.1533141082.38
>  --entity-type brokers --entity-name 9 
> In kafka 2.0.0, the command fails with the following exception: 
> [2018-08-01 16:38:01,480] ERROR [AdminClient clientId=adminclient-1] 
> Connection to node -1 failed authentication due to: SSL handshake failed 
> (org.apache.kafka.clients.NetworkClient)
> Error while executing config command with args '--bootstrap-server 
> localhost:9093 --command-config /var/pinterest/kafka/client.properties 
> --alter --add-config 
> listener.name.ssl.ssl.keystore.location=/var/pinterest/kafka/kafka.keystore.jks.1533141082.38
>  --entity-type brokers --entity-name 9'
> java.util.concurrent.ExecutionException: 
> org.apache.kafka.common.errors.SslAuthenticationException: SSL handshake 
> failed
>   at 
> org.apache.kafka.common.internals.KafkaFutureImpl.wrapAndThrow(KafkaFutureImpl.java:45)
>   at 
> org.apache.kafka.common.internals.KafkaFutureImpl.access$000(KafkaFutureImpl.java:32)
>   at 
> org.apache.kafka.common.internals.KafkaFutureImpl$SingleWaiter.await(KafkaFutureImpl.java:104)
>   at 
> org.apache.kafka.common.internals.KafkaFutureImpl.get(KafkaFutureImpl.java:274)
>   at kafka.admin.ConfigCommand$.brokerConfig(ConfigCommand.scala:346)
>   at kafka.admin.ConfigCommand$.alterBrokerConfig(ConfigCommand.scala:304)
>   at 
> kafka.admin.ConfigCommand$.processBrokerConfig(ConfigCommand.scala:290)
>   at kafka.admin.ConfigCommand$.main(ConfigCommand.scala:83)
>   at kafka.admin.ConfigCommand.main(ConfigCommand.scala)
> Caused by: org.apache.kafka.common.errors.SslAuthenticationException: SSL 
> handshake failed
> Caused by: javax.net.ssl.SSLHandshakeException: General SSLEngine problem
>   at sun.security.ssl.Handshaker.checkThrown(Handshaker.java:1478)
>   at 
> sun.security.ssl.SSLEngineImpl.checkTaskThrown(SSLEngineImpl.java:535)
>   at 
> sun.security.ssl.SSLEngineImpl.writeAppRecord(SSLEngineImpl.java:1214)
>   at sun.security.ssl.SSLEngineImpl.wrap(SSLEngineImpl.java:1186)
>   at javax.net.ssl.SSLEngine.wrap(SSLEngine.java:469)
>   at 
> org.apache.kafka.common.network.SslTransportLayer.handshakeWrap(SslTransportLayer.java:439)
>   at 
> org.apache.kafka.common.network.SslTransportLayer.doHandshake(SslTransportLayer.java:304)
>   at 
> org.apache.kafka.common.network.SslTransportLayer.handshake(SslTransportLayer.java:258)
>   at 
> org.apache.kafka.common.network.KafkaChannel.prepare(KafkaChannel.java:125)
>   at 
> org.apache.kafka.common.network.Selector.pollSelectionKeys(Selector.java:487)
>   at org.apache.kafka.common.network.Selector.poll(Selector.java:425)
>   at org.apache.kafka.clients.NetworkClient.poll(NetworkClient.java:510)
>   at 
> org.apache.kafka.clients.admin.KafkaAdminClient$AdminClientRunnable.run(KafkaAdminClient.java:1116)
>   at java.lang.Thread.run(Thread.java:748)
> Caused by: javax.net.ssl.SSLHandshakeException: General SSLEngine problem
>   at sun.security.ssl.Alerts.getSSLException(Alerts.java:192)
>   at sun.security.ssl.SSLEngineImpl.fatal(SSLEngineImpl.java:1728)
>   at sun.security.ssl.Handshaker.fatalSE(Handshaker.java:304)
>   at sun.security.ssl.Handshaker.fatalSE(Handshaker.java:296)
>   at 
> sun.security.ssl.ClientHandshaker.serverCertificate(ClientHandshaker.java:1514)
>   at 
> sun.security.ssl.ClientHandshaker.processMessage(ClientHandshaker.java:216)
>   at sun.security.ssl.Handshaker.processLoop(Handshaker.java:1026)
>   at sun.security.ssl.Handshaker$1.run(Handshaker.java:966)
>   at sun.security.ssl.Handshaker$1.run(Handshaker.java:963)
>   at java.security.AccessController.doPrivileged(Native Method)
>   at sun.security.ssl.Handshaker$DelegatedTask.run(Handshaker.java:1416)
>   at 
> org.apache.kafka.common.network.SslTransportLayer.runDelegatedTasks(SslTransportLayer.java:393)
>   at 
> org.apache.kafka.common.network.SslTransportLayer.handshakeUnwrap(SslTransportLayer.java

[jira] [Issue Comment Deleted] (KAFKA-7229) Failed to dynamically update kafka certificate in kafka 2.0.0

2018-08-01 Thread Manikumar (JIRA)


 [ 
https://issues.apache.org/jira/browse/KAFKA-7229?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Manikumar updated KAFKA-7229:
-
Comment: was deleted

(was: This may be due to the default value change for 
"ssl.endpoint.identification.algorithm" config. 
In 2.0.0 release, The default value for ssl.endpoint.identification.algorithm 
was changed to https.
We can set ssl.endpoint.identification.algorithm to an empty string to restore 
the previous behaviour.

http://kafka.apache.org/documentation/#upgrade_200_notable)

> Failed to dynamically update kafka certificate in kafka 2.0.0
> -
>
> Key: KAFKA-7229
> URL: https://issues.apache.org/jira/browse/KAFKA-7229
> Project: Kafka
>  Issue Type: Bug
>  Components: security
>Affects Versions: 2.0.0
> Environment: Ubuntu 14.04.5 LTS
>Reporter: Yu Yang
>Priority: Critical
>
> In kafka 1.1, we use the following command in a cron job to dynamically 
> update the certificate that kafka uses :
> kafka-configs.sh --bootstrap-server localhost:9093 --command-config 
> /var/pinterest/kafka/client.properties --alter --add-config 
> listener.name.ssl.ssl.keystore.location=/var/certs/kafka/kafka.keystore.jks.1533141082.38
>  --entity-type brokers --entity-name 9 
> In kafka 2.0.0, the command fails with the following exception: 
> [2018-08-01 16:38:01,480] ERROR [AdminClient clientId=adminclient-1] 
> Connection to node -1 failed authentication due to: SSL handshake failed 
> (org.apache.kafka.clients.NetworkClient)
> Error while executing config command with args '--bootstrap-server 
> localhost:9093 --command-config /var/pinterest/kafka/client.properties 
> --alter --add-config 
> listener.name.ssl.ssl.keystore.location=/var/pinterest/kafka/kafka.keystore.jks.1533141082.38
>  --entity-type brokers --entity-name 9'
> java.util.concurrent.ExecutionException: 
> org.apache.kafka.common.errors.SslAuthenticationException: SSL handshake 
> failed
>   at 
> org.apache.kafka.common.internals.KafkaFutureImpl.wrapAndThrow(KafkaFutureImpl.java:45)
>   at 
> org.apache.kafka.common.internals.KafkaFutureImpl.access$000(KafkaFutureImpl.java:32)
>   at 
> org.apache.kafka.common.internals.KafkaFutureImpl$SingleWaiter.await(KafkaFutureImpl.java:104)
>   at 
> org.apache.kafka.common.internals.KafkaFutureImpl.get(KafkaFutureImpl.java:274)
>   at kafka.admin.ConfigCommand$.brokerConfig(ConfigCommand.scala:346)
>   at kafka.admin.ConfigCommand$.alterBrokerConfig(ConfigCommand.scala:304)
>   at 
> kafka.admin.ConfigCommand$.processBrokerConfig(ConfigCommand.scala:290)
>   at kafka.admin.ConfigCommand$.main(ConfigCommand.scala:83)
>   at kafka.admin.ConfigCommand.main(ConfigCommand.scala)
> Caused by: org.apache.kafka.common.errors.SslAuthenticationException: SSL 
> handshake failed
> Caused by: javax.net.ssl.SSLHandshakeException: General SSLEngine problem
>   at sun.security.ssl.Handshaker.checkThrown(Handshaker.java:1478)
>   at 
> sun.security.ssl.SSLEngineImpl.checkTaskThrown(SSLEngineImpl.java:535)
>   at 
> sun.security.ssl.SSLEngineImpl.writeAppRecord(SSLEngineImpl.java:1214)
>   at sun.security.ssl.SSLEngineImpl.wrap(SSLEngineImpl.java:1186)
>   at javax.net.ssl.SSLEngine.wrap(SSLEngine.java:469)
>   at 
> org.apache.kafka.common.network.SslTransportLayer.handshakeWrap(SslTransportLayer.java:439)
>   at 
> org.apache.kafka.common.network.SslTransportLayer.doHandshake(SslTransportLayer.java:304)
>   at 
> org.apache.kafka.common.network.SslTransportLayer.handshake(SslTransportLayer.java:258)
>   at 
> org.apache.kafka.common.network.KafkaChannel.prepare(KafkaChannel.java:125)
>   at 
> org.apache.kafka.common.network.Selector.pollSelectionKeys(Selector.java:487)
>   at org.apache.kafka.common.network.Selector.poll(Selector.java:425)
>   at org.apache.kafka.clients.NetworkClient.poll(NetworkClient.java:510)
>   at 
> org.apache.kafka.clients.admin.KafkaAdminClient$AdminClientRunnable.run(KafkaAdminClient.java:1116)
>   at java.lang.Thread.run(Thread.java:748)
> Caused by: javax.net.ssl.SSLHandshakeException: General SSLEngine problem
>   at sun.security.ssl.Alerts.getSSLException(Alerts.java:192)
>   at sun.security.ssl.SSLEngineImpl.fatal(SSLEngineImpl.java:1728)
>   at sun.security.ssl.Handshaker.fatalSE(Handshaker.java:304)
>   at sun.security.ssl.Handshaker.fatalSE(Handshaker.java:296)
>   at 
> sun.security.ssl.ClientHandshaker.serverCertificate(ClientHandshaker.java:1514)
>   at 
> sun.security.ssl.ClientHandshaker.processMessage(ClientHandshaker.java:216)
>   at sun.security.ssl.Handshaker.processLoop(Handshaker.java:1026)
>   at sun.security.ssl.Handshaker$1.run(Handshaker.java:966)
>   at sun.security.ssl.Handshaker$1.run(Handshaker.java:

[jira] [Commented] (KAFKA-7229) Failed to dynamically update kafka certificate in kafka 2.0.0

2018-08-01 Thread Manikumar (JIRA)


[ 
https://issues.apache.org/jira/browse/KAFKA-7229?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16565625#comment-16565625
 ] 

Manikumar commented on KAFKA-7229:
--

This may be due to the default value change for the 
"ssl.endpoint.identification.algorithm" config. 
In the 2.0.0 release, the default value for ssl.endpoint.identification.algorithm 
was changed to https.
We can set ssl.endpoint.identification.algorithm to an empty string to restore 
the previous behaviour.

http://kafka.apache.org/documentation/#upgrade_200_notable

> Failed to dynamically update kafka certificate in kafka 2.0.0
> -
>
> Key: KAFKA-7229
> URL: https://issues.apache.org/jira/browse/KAFKA-7229
> Project: Kafka
>  Issue Type: Bug
>  Components: security
>Affects Versions: 2.0.0
> Environment: Ubuntu 14.04.5 LTS
>Reporter: Yu Yang
>Priority: Critical
>
> In kafka 1.1, we use the following command in a cron job to dynamically 
> update the certificate that kafka uses :
> kafka-configs.sh --bootstrap-server localhost:9093 --command-config 
> /var/pinterest/kafka/client.properties --alter --add-config 
> listener.name.ssl.ssl.keystore.location=/var/certs/kafka/kafka.keystore.jks.1533141082.38
>  --entity-type brokers --entity-name 9 
> In kafka 2.0.0, the command fails with the following exception: 
> [2018-08-01 16:38:01,480] ERROR [AdminClient clientId=adminclient-1] 
> Connection to node -1 failed authentication due to: SSL handshake failed 
> (org.apache.kafka.clients.NetworkClient)
> Error while executing config command with args '--bootstrap-server 
> localhost:9093 --command-config /var/pinterest/kafka/client.properties 
> --alter --add-config 
> listener.name.ssl.ssl.keystore.location=/var/pinterest/kafka/kafka.keystore.jks.1533141082.38
>  --entity-type brokers --entity-name 9'
> java.util.concurrent.ExecutionException: 
> org.apache.kafka.common.errors.SslAuthenticationException: SSL handshake 
> failed
>   at 
> org.apache.kafka.common.internals.KafkaFutureImpl.wrapAndThrow(KafkaFutureImpl.java:45)
>   at 
> org.apache.kafka.common.internals.KafkaFutureImpl.access$000(KafkaFutureImpl.java:32)
>   at 
> org.apache.kafka.common.internals.KafkaFutureImpl$SingleWaiter.await(KafkaFutureImpl.java:104)
>   at 
> org.apache.kafka.common.internals.KafkaFutureImpl.get(KafkaFutureImpl.java:274)
>   at kafka.admin.ConfigCommand$.brokerConfig(ConfigCommand.scala:346)
>   at kafka.admin.ConfigCommand$.alterBrokerConfig(ConfigCommand.scala:304)
>   at 
> kafka.admin.ConfigCommand$.processBrokerConfig(ConfigCommand.scala:290)
>   at kafka.admin.ConfigCommand$.main(ConfigCommand.scala:83)
>   at kafka.admin.ConfigCommand.main(ConfigCommand.scala)
> Caused by: org.apache.kafka.common.errors.SslAuthenticationException: SSL 
> handshake failed
> Caused by: javax.net.ssl.SSLHandshakeException: General SSLEngine problem
>   at sun.security.ssl.Handshaker.checkThrown(Handshaker.java:1478)
>   at 
> sun.security.ssl.SSLEngineImpl.checkTaskThrown(SSLEngineImpl.java:535)
>   at 
> sun.security.ssl.SSLEngineImpl.writeAppRecord(SSLEngineImpl.java:1214)
>   at sun.security.ssl.SSLEngineImpl.wrap(SSLEngineImpl.java:1186)
>   at javax.net.ssl.SSLEngine.wrap(SSLEngine.java:469)
>   at 
> org.apache.kafka.common.network.SslTransportLayer.handshakeWrap(SslTransportLayer.java:439)
>   at 
> org.apache.kafka.common.network.SslTransportLayer.doHandshake(SslTransportLayer.java:304)
>   at 
> org.apache.kafka.common.network.SslTransportLayer.handshake(SslTransportLayer.java:258)
>   at 
> org.apache.kafka.common.network.KafkaChannel.prepare(KafkaChannel.java:125)
>   at 
> org.apache.kafka.common.network.Selector.pollSelectionKeys(Selector.java:487)
>   at org.apache.kafka.common.network.Selector.poll(Selector.java:425)
>   at org.apache.kafka.clients.NetworkClient.poll(NetworkClient.java:510)
>   at 
> org.apache.kafka.clients.admin.KafkaAdminClient$AdminClientRunnable.run(KafkaAdminClient.java:1116)
>   at java.lang.Thread.run(Thread.java:748)
> Caused by: javax.net.ssl.SSLHandshakeException: General SSLEngine problem
>   at sun.security.ssl.Alerts.getSSLException(Alerts.java:192)
>   at sun.security.ssl.SSLEngineImpl.fatal(SSLEngineImpl.java:1728)
>   at sun.security.ssl.Handshaker.fatalSE(Handshaker.java:304)
>   at sun.security.ssl.Handshaker.fatalSE(Handshaker.java:296)
>   at 
> sun.security.ssl.ClientHandshaker.serverCertificate(ClientHandshaker.java:1514)
>   at 
> sun.security.ssl.ClientHandshaker.processMessage(ClientHandshaker.java:216)
>   at sun.security.ssl.Handshaker.processLoop(Handshaker.java:1026)
>   at sun.security.ssl.Handshaker$1.run(Handshaker.java:966)
>   at sun.security.ssl.Handshak

[jira] [Resolved] (KAFKA-7229) Failed to dynamically update kafka certificate in kafka 2.0.0

2018-08-01 Thread Yu Yang (JIRA)


 [ 
https://issues.apache.org/jira/browse/KAFKA-7229?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Yu Yang resolved KAFKA-7229.

Resolution: Not A Problem

> Failed to dynamically update kafka certificate in kafka 2.0.0
> -
>
> Key: KAFKA-7229
> URL: https://issues.apache.org/jira/browse/KAFKA-7229
> Project: Kafka
>  Issue Type: Bug
>  Components: security
>Affects Versions: 2.0.0
> Environment: Ubuntu 14.04.5 LTS
>Reporter: Yu Yang
>Priority: Critical
>
> In kafka 1.1, we use the following command in a cron job to dynamically 
> update the certificate that kafka uses :
> kafka-configs.sh --bootstrap-server localhost:9093 --command-config 
> /var/pinterest/kafka/client.properties --alter --add-config 
> listener.name.ssl.ssl.keystore.location=/var/certs/kafka/kafka.keystore.jks.1533141082.38
>  --entity-type brokers --entity-name 9 
> In kafka 2.0.0, the command fails with the following exception: 
> [2018-08-01 16:38:01,480] ERROR [AdminClient clientId=adminclient-1] 
> Connection to node -1 failed authentication due to: SSL handshake failed 
> (org.apache.kafka.clients.NetworkClient)
> Error while executing config command with args '--bootstrap-server 
> localhost:9093 --command-config /var/pinterest/kafka/client.properties 
> --alter --add-config 
> listener.name.ssl.ssl.keystore.location=/var/pinterest/kafka/kafka.keystore.jks.1533141082.38
>  --entity-type brokers --entity-name 9'
> java.util.concurrent.ExecutionException: 
> org.apache.kafka.common.errors.SslAuthenticationException: SSL handshake 
> failed
>   at 
> org.apache.kafka.common.internals.KafkaFutureImpl.wrapAndThrow(KafkaFutureImpl.java:45)
>   at 
> org.apache.kafka.common.internals.KafkaFutureImpl.access$000(KafkaFutureImpl.java:32)
>   at 
> org.apache.kafka.common.internals.KafkaFutureImpl$SingleWaiter.await(KafkaFutureImpl.java:104)
>   at 
> org.apache.kafka.common.internals.KafkaFutureImpl.get(KafkaFutureImpl.java:274)
>   at kafka.admin.ConfigCommand$.brokerConfig(ConfigCommand.scala:346)
>   at kafka.admin.ConfigCommand$.alterBrokerConfig(ConfigCommand.scala:304)
>   at 
> kafka.admin.ConfigCommand$.processBrokerConfig(ConfigCommand.scala:290)
>   at kafka.admin.ConfigCommand$.main(ConfigCommand.scala:83)
>   at kafka.admin.ConfigCommand.main(ConfigCommand.scala)
> Caused by: org.apache.kafka.common.errors.SslAuthenticationException: SSL 
> handshake failed
> Caused by: javax.net.ssl.SSLHandshakeException: General SSLEngine problem
>   at sun.security.ssl.Handshaker.checkThrown(Handshaker.java:1478)
>   at 
> sun.security.ssl.SSLEngineImpl.checkTaskThrown(SSLEngineImpl.java:535)
>   at 
> sun.security.ssl.SSLEngineImpl.writeAppRecord(SSLEngineImpl.java:1214)
>   at sun.security.ssl.SSLEngineImpl.wrap(SSLEngineImpl.java:1186)
>   at javax.net.ssl.SSLEngine.wrap(SSLEngine.java:469)
>   at 
> org.apache.kafka.common.network.SslTransportLayer.handshakeWrap(SslTransportLayer.java:439)
>   at 
> org.apache.kafka.common.network.SslTransportLayer.doHandshake(SslTransportLayer.java:304)
>   at 
> org.apache.kafka.common.network.SslTransportLayer.handshake(SslTransportLayer.java:258)
>   at 
> org.apache.kafka.common.network.KafkaChannel.prepare(KafkaChannel.java:125)
>   at 
> org.apache.kafka.common.network.Selector.pollSelectionKeys(Selector.java:487)
>   at org.apache.kafka.common.network.Selector.poll(Selector.java:425)
>   at org.apache.kafka.clients.NetworkClient.poll(NetworkClient.java:510)
>   at 
> org.apache.kafka.clients.admin.KafkaAdminClient$AdminClientRunnable.run(KafkaAdminClient.java:1116)
>   at java.lang.Thread.run(Thread.java:748)
> Caused by: javax.net.ssl.SSLHandshakeException: General SSLEngine problem
>   at sun.security.ssl.Alerts.getSSLException(Alerts.java:192)
>   at sun.security.ssl.SSLEngineImpl.fatal(SSLEngineImpl.java:1728)
>   at sun.security.ssl.Handshaker.fatalSE(Handshaker.java:304)
>   at sun.security.ssl.Handshaker.fatalSE(Handshaker.java:296)
>   at 
> sun.security.ssl.ClientHandshaker.serverCertificate(ClientHandshaker.java:1514)
>   at 
> sun.security.ssl.ClientHandshaker.processMessage(ClientHandshaker.java:216)
>   at sun.security.ssl.Handshaker.processLoop(Handshaker.java:1026)
>   at sun.security.ssl.Handshaker$1.run(Handshaker.java:966)
>   at sun.security.ssl.Handshaker$1.run(Handshaker.java:963)
>   at java.security.AccessController.doPrivileged(Native Method)
>   at sun.security.ssl.Handshaker$DelegatedTask.run(Handshaker.java:1416)
>   at 
> org.apache.kafka.common.network.SslTransportLayer.runDelegatedTasks(SslTransportLayer.java:393)
>   at 
> org.apache.kafka.common.network.SslTransportLayer.handshakeUnwrap(SslTransportLayer.java:4

[jira] [Commented] (KAFKA-7229) Failed to dynamically update kafka certificate in kafka 2.0.0

2018-08-01 Thread Rajini Sivaram (JIRA)


[ 
https://issues.apache.org/jira/browse/KAFKA-7229?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16565615#comment-16565615
 ] 

Rajini Sivaram commented on KAFKA-7229:
---

In 2.0.0, we enable SSL host name verification by default. If your certificates 
don't contain the host name, you can disable this verification. For the command 
above, in the command configuration file 
/var/pinterest/kafka/client.properties, you should set:
{quote}ssl.endpoint.identification.algorithm=
{quote}
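
For illustration, a rough Java AdminClient sketch of the same workflow with 
hostname verification disabled on the client side; it mirrors what 
kafka-configs.sh --alter does, but it is not taken from the ticket, and the 
truststore path and password are assumptions.
{code:java}
import java.util.Collections;
import java.util.Properties;
import org.apache.kafka.clients.admin.AdminClient;
import org.apache.kafka.clients.admin.Config;
import org.apache.kafka.clients.admin.ConfigEntry;
import org.apache.kafka.common.config.ConfigResource;

public class UpdateBrokerKeystore {
    public static void main(String[] args) throws Exception {
        Properties props = new Properties();
        props.put("bootstrap.servers", "localhost:9093");
        props.put("security.protocol", "SSL");
        props.put("ssl.truststore.location", "/var/certs/kafka/truststore.jks"); // assumed path
        props.put("ssl.truststore.password", "changeit");                        // assumed password
        // Equivalent of ssl.endpoint.identification.algorithm= in client.properties:
        props.put("ssl.endpoint.identification.algorithm", "");

        try (AdminClient admin = AdminClient.create(props)) {
            ConfigResource broker = new ConfigResource(ConfigResource.Type.BROKER, "9");
            Config update = new Config(Collections.singletonList(new ConfigEntry(
                    "listener.name.ssl.ssl.keystore.location",
                    "/var/certs/kafka/kafka.keystore.jks.1533141082.38")));
            // Note: alterConfigs replaces the full set of dynamic configs for the resource;
            // kafka-configs.sh first reads the existing entries and merges before altering.
            admin.alterConfigs(Collections.singletonMap(broker, update)).all().get();
        }
    }
}
{code}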

> Failed to dynamically update kafka certificate in kafka 2.0.0
> -
>
> Key: KAFKA-7229
> URL: https://issues.apache.org/jira/browse/KAFKA-7229
> Project: Kafka
>  Issue Type: Bug
>  Components: security
>Affects Versions: 2.0.0
> Environment: Ubuntu 14.04.5 LTS
>Reporter: Yu Yang
>Priority: Critical
>
> In kafka 1.1, we use the following command in a cron job to dynamically 
> update the certificate that kafka uses :
> kafka-configs.sh --bootstrap-server localhost:9093 --command-config 
> /var/pinterest/kafka/client.properties --alter --add-config 
> listener.name.ssl.ssl.keystore.location=/var/certs/kafka/kafka.keystore.jks.1533141082.38
>  --entity-type brokers --entity-name 9 
> In kafka 2.0.0, the command fails with the following exception: 
> [2018-08-01 16:38:01,480] ERROR [AdminClient clientId=adminclient-1] 
> Connection to node -1 failed authentication due to: SSL handshake failed 
> (org.apache.kafka.clients.NetworkClient)
> Error while executing config command with args '--bootstrap-server 
> localhost:9093 --command-config /var/pinterest/kafka/client.properties 
> --alter --add-config 
> listener.name.ssl.ssl.keystore.location=/var/pinterest/kafka/kafka.keystore.jks.1533141082.38
>  --entity-type brokers --entity-name 9'
> java.util.concurrent.ExecutionException: 
> org.apache.kafka.common.errors.SslAuthenticationException: SSL handshake 
> failed
>   at 
> org.apache.kafka.common.internals.KafkaFutureImpl.wrapAndThrow(KafkaFutureImpl.java:45)
>   at 
> org.apache.kafka.common.internals.KafkaFutureImpl.access$000(KafkaFutureImpl.java:32)
>   at 
> org.apache.kafka.common.internals.KafkaFutureImpl$SingleWaiter.await(KafkaFutureImpl.java:104)
>   at 
> org.apache.kafka.common.internals.KafkaFutureImpl.get(KafkaFutureImpl.java:274)
>   at kafka.admin.ConfigCommand$.brokerConfig(ConfigCommand.scala:346)
>   at kafka.admin.ConfigCommand$.alterBrokerConfig(ConfigCommand.scala:304)
>   at 
> kafka.admin.ConfigCommand$.processBrokerConfig(ConfigCommand.scala:290)
>   at kafka.admin.ConfigCommand$.main(ConfigCommand.scala:83)
>   at kafka.admin.ConfigCommand.main(ConfigCommand.scala)
> Caused by: org.apache.kafka.common.errors.SslAuthenticationException: SSL 
> handshake failed
> Caused by: javax.net.ssl.SSLHandshakeException: General SSLEngine problem
>   at sun.security.ssl.Handshaker.checkThrown(Handshaker.java:1478)
>   at 
> sun.security.ssl.SSLEngineImpl.checkTaskThrown(SSLEngineImpl.java:535)
>   at 
> sun.security.ssl.SSLEngineImpl.writeAppRecord(SSLEngineImpl.java:1214)
>   at sun.security.ssl.SSLEngineImpl.wrap(SSLEngineImpl.java:1186)
>   at javax.net.ssl.SSLEngine.wrap(SSLEngine.java:469)
>   at 
> org.apache.kafka.common.network.SslTransportLayer.handshakeWrap(SslTransportLayer.java:439)
>   at 
> org.apache.kafka.common.network.SslTransportLayer.doHandshake(SslTransportLayer.java:304)
>   at 
> org.apache.kafka.common.network.SslTransportLayer.handshake(SslTransportLayer.java:258)
>   at 
> org.apache.kafka.common.network.KafkaChannel.prepare(KafkaChannel.java:125)
>   at 
> org.apache.kafka.common.network.Selector.pollSelectionKeys(Selector.java:487)
>   at org.apache.kafka.common.network.Selector.poll(Selector.java:425)
>   at org.apache.kafka.clients.NetworkClient.poll(NetworkClient.java:510)
>   at 
> org.apache.kafka.clients.admin.KafkaAdminClient$AdminClientRunnable.run(KafkaAdminClient.java:1116)
>   at java.lang.Thread.run(Thread.java:748)
> Caused by: javax.net.ssl.SSLHandshakeException: General SSLEngine problem
>   at sun.security.ssl.Alerts.getSSLException(Alerts.java:192)
>   at sun.security.ssl.SSLEngineImpl.fatal(SSLEngineImpl.java:1728)
>   at sun.security.ssl.Handshaker.fatalSE(Handshaker.java:304)
>   at sun.security.ssl.Handshaker.fatalSE(Handshaker.java:296)
>   at 
> sun.security.ssl.ClientHandshaker.serverCertificate(ClientHandshaker.java:1514)
>   at 
> sun.security.ssl.ClientHandshaker.processMessage(ClientHandshaker.java:216)
>   at sun.security.ssl.Handshaker.processLoop(Handshaker.java:1026)
>   at sun.security.ssl.Handshaker$1.run(Handshaker.java:966)
>   at sun.security.ssl.Handshaker$1.run(Handshaker.java:963)
>   at ja

[jira] [Updated] (KAFKA-7144) Kafka Streams doesn't properly balance partition assignment

2018-08-01 Thread Guozhang Wang (JIRA)


 [ 
https://issues.apache.org/jira/browse/KAFKA-7144?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Guozhang Wang updated KAFKA-7144:
-
Fix Version/s: 2.0.1

> Kafka Streams doesn't properly balance partition assignment
> ---
>
> Key: KAFKA-7144
> URL: https://issues.apache.org/jira/browse/KAFKA-7144
> Project: Kafka
>  Issue Type: Bug
>  Components: streams
>Affects Versions: 1.1.0
>Reporter: James Cheng
>Assignee: Bill Bejeck
>Priority: Major
> Fix For: 2.0.1, 2.1.0
>
> Attachments: OneThenTwelve.java
>
>
> Kafka Streams doesn't always spread the tasks across all available 
> instances/threads.
> I have a topology which consumes a single-partition topic and goes .through() 
> a 12-partition topic. That makes 13 partitions.
>  
> I then started 2 instances of the application. I would have expected the 13 
> partitions to be split across the 2 instances roughly evenly (7 partitions on 
> one, 6 partitions on the other).
> Instead, one instance gets 12 partitions, and the other instance gets 1 
> partition.
>  
> Repro case attached. I ran it a couple times, and it was fairly repeatable.
> Setup for the repro:
> {code:java}
> $ ./bin/kafka-topics.sh --zookeeper localhost --create --topic one 
> --partitions 1 --replication-factor 1 
> $ ./bin/kafka-topics.sh --zookeeper localhost --create --topic twelve 
> --partitions 12 --replication-factor 1
> $ echo foo | kafkacat -P -b 127.0.0.1 -t one
> {code}
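
For context, a minimal sketch of a topology of this shape (this is not the 
attached OneThenTwelve.java; topic names, serdes and the bootstrap address are 
assumptions):

{code:java}
import java.util.Properties;
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.streams.KafkaStreams;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.StreamsConfig;

public class OneThenTwelveSketch {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put(StreamsConfig.APPLICATION_ID_CONFIG, "one-then-twelve-sketch");
        props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
        props.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass());
        props.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass());

        StreamsBuilder builder = new StreamsBuilder();
        builder.<String, String>stream("one")   // 1-partition input topic
               .through("twelve")               // writes to and re-reads from the 12-partition topic
               .foreach((k, v) -> System.out.println(k + " -> " + v));

        new KafkaStreams(builder.build(), props).start();
    }
}
{code}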



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (KAFKA-7229) Failed to dynamically update kafka certificate in kafka 2.0.0

2018-08-01 Thread Ismael Juma (JIRA)


[ 
https://issues.apache.org/jira/browse/KAFKA-7229?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16565613#comment-16565613
 ] 

Ismael Juma commented on KAFKA-7229:


Looks like you have to disable hostname verification, which is enabled by 
default in 2.0.0.
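
Alternatively, instead of disabling verification, the broker certificate can be 
reissued with a SubjectAlternativeName that matches the host name clients use. 
A hedged sketch only (alias, DN, validity and DNS names are placeholders, not 
from this report):

{code}
keytool -genkeypair -alias broker9 -keyalg RSA -validity 365 \
  -keystore kafka.keystore.jks \
  -dname "CN=broker9.example.com" \
  -ext SAN=DNS:broker9.example.com,DNS:localhost
{code}

Including the name actually passed to --bootstrap-server (here localhost) 
avoids the "No subject alternative DNS name matching localhost found" failure 
shown in the stack trace.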

> Failed to dynamically update kafka certificate in kafka 2.0.0
> -
>
> Key: KAFKA-7229
> URL: https://issues.apache.org/jira/browse/KAFKA-7229
> Project: Kafka
>  Issue Type: Bug
>  Components: security
>Affects Versions: 2.0.0
> Environment: Ubuntu 14.04.5 LTS
>Reporter: Yu Yang
>Priority: Critical
>
> In kafka 1.1, we use the following command in a cron job to dynamically 
> update the certificate that kafka uses :
> kafka-configs.sh --bootstrap-server localhost:9093 --command-config 
> /var/pinterest/kafka/client.properties --alter --add-config 
> listener.name.ssl.ssl.keystore.location=/var/certs/kafka/kafka.keystore.jks.1533141082.38
>  --entity-type brokers --entity-name 9 
> In kafka 2.0.0, the command fails with the following exception: 
> [2018-08-01 16:38:01,480] ERROR [AdminClient clientId=adminclient-1] 
> Connection to node -1 failed authentication due to: SSL handshake failed 
> (org.apache.kafka.clients.NetworkClient)
> Error while executing config command with args '--bootstrap-server 
> localhost:9093 --command-config /var/pinterest/kafka/client.properties 
> --alter --add-config 
> listener.name.ssl.ssl.keystore.location=/var/pinterest/kafka/kafka.keystore.jks.1533141082.38
>  --entity-type brokers --entity-name 9'
> java.util.concurrent.ExecutionException: 
> org.apache.kafka.common.errors.SslAuthenticationException: SSL handshake 
> failed
>   at 
> org.apache.kafka.common.internals.KafkaFutureImpl.wrapAndThrow(KafkaFutureImpl.java:45)
>   at 
> org.apache.kafka.common.internals.KafkaFutureImpl.access$000(KafkaFutureImpl.java:32)
>   at 
> org.apache.kafka.common.internals.KafkaFutureImpl$SingleWaiter.await(KafkaFutureImpl.java:104)
>   at 
> org.apache.kafka.common.internals.KafkaFutureImpl.get(KafkaFutureImpl.java:274)
>   at kafka.admin.ConfigCommand$.brokerConfig(ConfigCommand.scala:346)
>   at kafka.admin.ConfigCommand$.alterBrokerConfig(ConfigCommand.scala:304)
>   at 
> kafka.admin.ConfigCommand$.processBrokerConfig(ConfigCommand.scala:290)
>   at kafka.admin.ConfigCommand$.main(ConfigCommand.scala:83)
>   at kafka.admin.ConfigCommand.main(ConfigCommand.scala)
> Caused by: org.apache.kafka.common.errors.SslAuthenticationException: SSL 
> handshake failed
> Caused by: javax.net.ssl.SSLHandshakeException: General SSLEngine problem
>   at sun.security.ssl.Handshaker.checkThrown(Handshaker.java:1478)
>   at 
> sun.security.ssl.SSLEngineImpl.checkTaskThrown(SSLEngineImpl.java:535)
>   at 
> sun.security.ssl.SSLEngineImpl.writeAppRecord(SSLEngineImpl.java:1214)
>   at sun.security.ssl.SSLEngineImpl.wrap(SSLEngineImpl.java:1186)
>   at javax.net.ssl.SSLEngine.wrap(SSLEngine.java:469)
>   at 
> org.apache.kafka.common.network.SslTransportLayer.handshakeWrap(SslTransportLayer.java:439)
>   at 
> org.apache.kafka.common.network.SslTransportLayer.doHandshake(SslTransportLayer.java:304)
>   at 
> org.apache.kafka.common.network.SslTransportLayer.handshake(SslTransportLayer.java:258)
>   at 
> org.apache.kafka.common.network.KafkaChannel.prepare(KafkaChannel.java:125)
>   at 
> org.apache.kafka.common.network.Selector.pollSelectionKeys(Selector.java:487)
>   at org.apache.kafka.common.network.Selector.poll(Selector.java:425)
>   at org.apache.kafka.clients.NetworkClient.poll(NetworkClient.java:510)
>   at 
> org.apache.kafka.clients.admin.KafkaAdminClient$AdminClientRunnable.run(KafkaAdminClient.java:1116)
>   at java.lang.Thread.run(Thread.java:748)
> Caused by: javax.net.ssl.SSLHandshakeException: General SSLEngine problem
>   at sun.security.ssl.Alerts.getSSLException(Alerts.java:192)
>   at sun.security.ssl.SSLEngineImpl.fatal(SSLEngineImpl.java:1728)
>   at sun.security.ssl.Handshaker.fatalSE(Handshaker.java:304)
>   at sun.security.ssl.Handshaker.fatalSE(Handshaker.java:296)
>   at 
> sun.security.ssl.ClientHandshaker.serverCertificate(ClientHandshaker.java:1514)
>   at 
> sun.security.ssl.ClientHandshaker.processMessage(ClientHandshaker.java:216)
>   at sun.security.ssl.Handshaker.processLoop(Handshaker.java:1026)
>   at sun.security.ssl.Handshaker$1.run(Handshaker.java:966)
>   at sun.security.ssl.Handshaker$1.run(Handshaker.java:963)
>   at java.security.AccessController.doPrivileged(Native Method)
>   at sun.security.ssl.Handshaker$DelegatedTask.run(Handshaker.java:1416)
>   at 
> org.apache.kafka.common.network.SslTransportLayer.runDelegatedTasks(SslTr

[jira] [Updated] (KAFKA-7229) Failed to dynamically update kafka certificate in kafka 2.0.0

2018-08-01 Thread Yu Yang (JIRA)


 [ 
https://issues.apache.org/jira/browse/KAFKA-7229?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Yu Yang updated KAFKA-7229:
---
Priority: Critical  (was: Major)

> Failed to dynamically update kafka certificate in kafka 2.0.0
> -
>
> Key: KAFKA-7229
> URL: https://issues.apache.org/jira/browse/KAFKA-7229
> Project: Kafka
>  Issue Type: Bug
>  Components: security
>Affects Versions: 2.0.0
> Environment: Ubuntu 14.04.5 LTS
>Reporter: Yu Yang
>Priority: Critical
>
> In kafka 1.1, we use the following command in a cron job to dynamically 
> update the certificate that kafka uses :
> kafka-configs.sh --bootstrap-server localhost:9093 --command-config 
> /var/pinterest/kafka/client.properties --alter --add-config 
> listener.name.ssl.ssl.keystore.location=/var/certs/kafka/kafka.keystore.jks.1533141082.38
>  --entity-type brokers --entity-name 9 
> In kafka 2.0.0, the command fails with the following exception: 
> [2018-08-01 16:38:01,480] ERROR [AdminClient clientId=adminclient-1] 
> Connection to node -1 failed authentication due to: SSL handshake failed 
> (org.apache.kafka.clients.NetworkClient)
> Error while executing config command with args '--bootstrap-server 
> localhost:9093 --command-config /var/pinterest/kafka/client.properties 
> --alter --add-config 
> listener.name.ssl.ssl.keystore.location=/var/pinterest/kafka/kafka.keystore.jks.1533141082.38
>  --entity-type brokers --entity-name 9'
> java.util.concurrent.ExecutionException: 
> org.apache.kafka.common.errors.SslAuthenticationException: SSL handshake 
> failed
>   at 
> org.apache.kafka.common.internals.KafkaFutureImpl.wrapAndThrow(KafkaFutureImpl.java:45)
>   at 
> org.apache.kafka.common.internals.KafkaFutureImpl.access$000(KafkaFutureImpl.java:32)
>   at 
> org.apache.kafka.common.internals.KafkaFutureImpl$SingleWaiter.await(KafkaFutureImpl.java:104)
>   at 
> org.apache.kafka.common.internals.KafkaFutureImpl.get(KafkaFutureImpl.java:274)
>   at kafka.admin.ConfigCommand$.brokerConfig(ConfigCommand.scala:346)
>   at kafka.admin.ConfigCommand$.alterBrokerConfig(ConfigCommand.scala:304)
>   at 
> kafka.admin.ConfigCommand$.processBrokerConfig(ConfigCommand.scala:290)
>   at kafka.admin.ConfigCommand$.main(ConfigCommand.scala:83)
>   at kafka.admin.ConfigCommand.main(ConfigCommand.scala)
> Caused by: org.apache.kafka.common.errors.SslAuthenticationException: SSL 
> handshake failed
> Caused by: javax.net.ssl.SSLHandshakeException: General SSLEngine problem
>   at sun.security.ssl.Handshaker.checkThrown(Handshaker.java:1478)
>   at 
> sun.security.ssl.SSLEngineImpl.checkTaskThrown(SSLEngineImpl.java:535)
>   at 
> sun.security.ssl.SSLEngineImpl.writeAppRecord(SSLEngineImpl.java:1214)
>   at sun.security.ssl.SSLEngineImpl.wrap(SSLEngineImpl.java:1186)
>   at javax.net.ssl.SSLEngine.wrap(SSLEngine.java:469)
>   at 
> org.apache.kafka.common.network.SslTransportLayer.handshakeWrap(SslTransportLayer.java:439)
>   at 
> org.apache.kafka.common.network.SslTransportLayer.doHandshake(SslTransportLayer.java:304)
>   at 
> org.apache.kafka.common.network.SslTransportLayer.handshake(SslTransportLayer.java:258)
>   at 
> org.apache.kafka.common.network.KafkaChannel.prepare(KafkaChannel.java:125)
>   at 
> org.apache.kafka.common.network.Selector.pollSelectionKeys(Selector.java:487)
>   at org.apache.kafka.common.network.Selector.poll(Selector.java:425)
>   at org.apache.kafka.clients.NetworkClient.poll(NetworkClient.java:510)
>   at 
> org.apache.kafka.clients.admin.KafkaAdminClient$AdminClientRunnable.run(KafkaAdminClient.java:1116)
>   at java.lang.Thread.run(Thread.java:748)
> Caused by: javax.net.ssl.SSLHandshakeException: General SSLEngine problem
>   at sun.security.ssl.Alerts.getSSLException(Alerts.java:192)
>   at sun.security.ssl.SSLEngineImpl.fatal(SSLEngineImpl.java:1728)
>   at sun.security.ssl.Handshaker.fatalSE(Handshaker.java:304)
>   at sun.security.ssl.Handshaker.fatalSE(Handshaker.java:296)
>   at 
> sun.security.ssl.ClientHandshaker.serverCertificate(ClientHandshaker.java:1514)
>   at 
> sun.security.ssl.ClientHandshaker.processMessage(ClientHandshaker.java:216)
>   at sun.security.ssl.Handshaker.processLoop(Handshaker.java:1026)
>   at sun.security.ssl.Handshaker$1.run(Handshaker.java:966)
>   at sun.security.ssl.Handshaker$1.run(Handshaker.java:963)
>   at java.security.AccessController.doPrivileged(Native Method)
>   at sun.security.ssl.Handshaker$DelegatedTask.run(Handshaker.java:1416)
>   at 
> org.apache.kafka.common.network.SslTransportLayer.runDelegatedTasks(SslTransportLayer.java:393)
>   at 
> org.apache.kafka.common.network.SslTransportLayer.handshakeUnwrap(SslTransportLayer.j

[jira] [Updated] (KAFKA-7229) Failed to dynamically update kafka certificate in kafka 2.0.0

2018-08-01 Thread Yu Yang (JIRA)


 [ 
https://issues.apache.org/jira/browse/KAFKA-7229?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Yu Yang updated KAFKA-7229:
---
Description: 
In kafka 1.1, we use the following command in a cron job to dynamically update 
the certificate that kafka uses :

kafka-configs.sh --bootstrap-server localhost:9093 --command-config 
/var/pinterest/kafka/client.properties --alter --add-config 
listener.name.ssl.ssl.keystore.location=/var/certs/kafka/kafka.keystore.jks.1533141082.38
 --entity-type brokers --entity-name 9 

In kafka 2.0.0, the command fails with the following exception: 



[2018-08-01 16:38:01,480] ERROR [AdminClient clientId=adminclient-1] Connection 
to node -1 failed authentication due to: SSL handshake failed 
(org.apache.kafka.clients.NetworkClient)
Error while executing config command with args '--bootstrap-server 
localhost:9093 --command-config /var/pinterest/kafka/client.properties --alter 
--add-config 
listener.name.ssl.ssl.keystore.location=/var/pinterest/kafka/kafka.keystore.jks.1533141082.38
 --entity-type brokers --entity-name 9'
java.util.concurrent.ExecutionException: 
org.apache.kafka.common.errors.SslAuthenticationException: SSL handshake failed
at 
org.apache.kafka.common.internals.KafkaFutureImpl.wrapAndThrow(KafkaFutureImpl.java:45)
at 
org.apache.kafka.common.internals.KafkaFutureImpl.access$000(KafkaFutureImpl.java:32)
at 
org.apache.kafka.common.internals.KafkaFutureImpl$SingleWaiter.await(KafkaFutureImpl.java:104)
at 
org.apache.kafka.common.internals.KafkaFutureImpl.get(KafkaFutureImpl.java:274)
at kafka.admin.ConfigCommand$.brokerConfig(ConfigCommand.scala:346)
at kafka.admin.ConfigCommand$.alterBrokerConfig(ConfigCommand.scala:304)
at 
kafka.admin.ConfigCommand$.processBrokerConfig(ConfigCommand.scala:290)
at kafka.admin.ConfigCommand$.main(ConfigCommand.scala:83)
at kafka.admin.ConfigCommand.main(ConfigCommand.scala)
Caused by: org.apache.kafka.common.errors.SslAuthenticationException: SSL 
handshake failed
Caused by: javax.net.ssl.SSLHandshakeException: General SSLEngine problem
at sun.security.ssl.Handshaker.checkThrown(Handshaker.java:1478)
at 
sun.security.ssl.SSLEngineImpl.checkTaskThrown(SSLEngineImpl.java:535)
at 
sun.security.ssl.SSLEngineImpl.writeAppRecord(SSLEngineImpl.java:1214)
at sun.security.ssl.SSLEngineImpl.wrap(SSLEngineImpl.java:1186)
at javax.net.ssl.SSLEngine.wrap(SSLEngine.java:469)
at 
org.apache.kafka.common.network.SslTransportLayer.handshakeWrap(SslTransportLayer.java:439)
at 
org.apache.kafka.common.network.SslTransportLayer.doHandshake(SslTransportLayer.java:304)
at 
org.apache.kafka.common.network.SslTransportLayer.handshake(SslTransportLayer.java:258)
at 
org.apache.kafka.common.network.KafkaChannel.prepare(KafkaChannel.java:125)
at 
org.apache.kafka.common.network.Selector.pollSelectionKeys(Selector.java:487)
at org.apache.kafka.common.network.Selector.poll(Selector.java:425)
at org.apache.kafka.clients.NetworkClient.poll(NetworkClient.java:510)
at 
org.apache.kafka.clients.admin.KafkaAdminClient$AdminClientRunnable.run(KafkaAdminClient.java:1116)
at java.lang.Thread.run(Thread.java:748)
Caused by: javax.net.ssl.SSLHandshakeException: General SSLEngine problem
at sun.security.ssl.Alerts.getSSLException(Alerts.java:192)
at sun.security.ssl.SSLEngineImpl.fatal(SSLEngineImpl.java:1728)
at sun.security.ssl.Handshaker.fatalSE(Handshaker.java:304)
at sun.security.ssl.Handshaker.fatalSE(Handshaker.java:296)
at 
sun.security.ssl.ClientHandshaker.serverCertificate(ClientHandshaker.java:1514)
at 
sun.security.ssl.ClientHandshaker.processMessage(ClientHandshaker.java:216)
at sun.security.ssl.Handshaker.processLoop(Handshaker.java:1026)
at sun.security.ssl.Handshaker$1.run(Handshaker.java:966)
at sun.security.ssl.Handshaker$1.run(Handshaker.java:963)
at java.security.AccessController.doPrivileged(Native Method)
at sun.security.ssl.Handshaker$DelegatedTask.run(Handshaker.java:1416)
at 
org.apache.kafka.common.network.SslTransportLayer.runDelegatedTasks(SslTransportLayer.java:393)
at 
org.apache.kafka.common.network.SslTransportLayer.handshakeUnwrap(SslTransportLayer.java:473)
at 
org.apache.kafka.common.network.SslTransportLayer.doHandshake(SslTransportLayer.java:331)
... 7 more
Caused by: java.security.cert.CertificateException: No subject alternative DNS 
name matching localhost found.
at sun.security.util.HostnameChecker.matchDNS(HostnameChecker.java:204)
at sun.security.util.HostnameChecker.match(HostnameChecker.java:95)
at 
sun.security.ssl.X509TrustManagerImpl.checkIdentity(X509TrustManagerImpl.java:455)
at 
sun.security.ssl.X509TrustManagerImpl.checkIdentit

[jira] [Created] (KAFKA-7229) Failed to dynamically update kafka certificate in kafka 2.0.0

2018-08-01 Thread Yu Yang (JIRA)
Yu Yang created KAFKA-7229:
--

 Summary: Failed to dynamically update kafka certificate in kafka 
2.0.0
 Key: KAFKA-7229
 URL: https://issues.apache.org/jira/browse/KAFKA-7229
 Project: Kafka
  Issue Type: Bug
  Components: security
Affects Versions: 2.0.0
 Environment: Ubuntu 14.04.5 LTS
Reporter: Yu Yang


In kafka 1.1, we use the following command to dynamically update the 
certificate that kafka uses :

kafka-configs.sh --bootstrap-server localhost:9093 --command-config 
/var/pinterest/kafka/client.properties --alter --add-config 
listener.name.ssl.ssl.keystore.location=/var/certs/kafka/kafka.keystore.jks.1533141082.38
 --entity-type brokers --entity-name 9 

In kafka 2.0.0, the command fails with the following exception: 



[2018-08-01 16:38:01,480] ERROR [AdminClient clientId=adminclient-1] Connection 
to node -1 failed authentication due to: SSL handshake failed 
(org.apache.kafka.clients.NetworkClient)
Error while executing config command with args '--bootstrap-server 
localhost:9093 --command-config /var/pinterest/kafka/client.properties --alter 
--add-config 
listener.name.ssl.ssl.keystore.location=/var/pinterest/kafka/kafka.keystore.jks.1533141082.38
 --entity-type brokers --entity-name 9'
java.util.concurrent.ExecutionException: 
org.apache.kafka.common.errors.SslAuthenticationException: SSL handshake failed
at 
org.apache.kafka.common.internals.KafkaFutureImpl.wrapAndThrow(KafkaFutureImpl.java:45)
at 
org.apache.kafka.common.internals.KafkaFutureImpl.access$000(KafkaFutureImpl.java:32)
at 
org.apache.kafka.common.internals.KafkaFutureImpl$SingleWaiter.await(KafkaFutureImpl.java:104)
at 
org.apache.kafka.common.internals.KafkaFutureImpl.get(KafkaFutureImpl.java:274)
at kafka.admin.ConfigCommand$.brokerConfig(ConfigCommand.scala:346)
at kafka.admin.ConfigCommand$.alterBrokerConfig(ConfigCommand.scala:304)
at 
kafka.admin.ConfigCommand$.processBrokerConfig(ConfigCommand.scala:290)
at kafka.admin.ConfigCommand$.main(ConfigCommand.scala:83)
at kafka.admin.ConfigCommand.main(ConfigCommand.scala)
Caused by: org.apache.kafka.common.errors.SslAuthenticationException: SSL 
handshake failed
Caused by: javax.net.ssl.SSLHandshakeException: General SSLEngine problem
at sun.security.ssl.Handshaker.checkThrown(Handshaker.java:1478)
at 
sun.security.ssl.SSLEngineImpl.checkTaskThrown(SSLEngineImpl.java:535)
at 
sun.security.ssl.SSLEngineImpl.writeAppRecord(SSLEngineImpl.java:1214)
at sun.security.ssl.SSLEngineImpl.wrap(SSLEngineImpl.java:1186)
at javax.net.ssl.SSLEngine.wrap(SSLEngine.java:469)
at 
org.apache.kafka.common.network.SslTransportLayer.handshakeWrap(SslTransportLayer.java:439)
at 
org.apache.kafka.common.network.SslTransportLayer.doHandshake(SslTransportLayer.java:304)
at 
org.apache.kafka.common.network.SslTransportLayer.handshake(SslTransportLayer.java:258)
at 
org.apache.kafka.common.network.KafkaChannel.prepare(KafkaChannel.java:125)
at 
org.apache.kafka.common.network.Selector.pollSelectionKeys(Selector.java:487)
at org.apache.kafka.common.network.Selector.poll(Selector.java:425)
at org.apache.kafka.clients.NetworkClient.poll(NetworkClient.java:510)
at 
org.apache.kafka.clients.admin.KafkaAdminClient$AdminClientRunnable.run(KafkaAdminClient.java:1116)
at java.lang.Thread.run(Thread.java:748)
Caused by: javax.net.ssl.SSLHandshakeException: General SSLEngine problem
at sun.security.ssl.Alerts.getSSLException(Alerts.java:192)
at sun.security.ssl.SSLEngineImpl.fatal(SSLEngineImpl.java:1728)
at sun.security.ssl.Handshaker.fatalSE(Handshaker.java:304)
at sun.security.ssl.Handshaker.fatalSE(Handshaker.java:296)
at 
sun.security.ssl.ClientHandshaker.serverCertificate(ClientHandshaker.java:1514)
at 
sun.security.ssl.ClientHandshaker.processMessage(ClientHandshaker.java:216)
at sun.security.ssl.Handshaker.processLoop(Handshaker.java:1026)
at sun.security.ssl.Handshaker$1.run(Handshaker.java:966)
at sun.security.ssl.Handshaker$1.run(Handshaker.java:963)
at java.security.AccessController.doPrivileged(Native Method)
at sun.security.ssl.Handshaker$DelegatedTask.run(Handshaker.java:1416)
at 
org.apache.kafka.common.network.SslTransportLayer.runDelegatedTasks(SslTransportLayer.java:393)
at 
org.apache.kafka.common.network.SslTransportLayer.handshakeUnwrap(SslTransportLayer.java:473)
at 
org.apache.kafka.common.network.SslTransportLayer.doHandshake(SslTransportLayer.java:331)
... 7 more
Caused by: java.security.cert.CertificateException: No subject alternative DNS 
name matching localhost found.
at sun.security.util.HostnameChecker.matchDNS(HostnameChecker.java:204)
at sun.

[jira] [Commented] (KAFKA-7228) DeadLetterQueue throws a NullPointerException

2018-08-01 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/KAFKA-7228?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16565540#comment-16565540
 ] 

ASF GitHub Bot commented on KAFKA-7228:
---

wicknicks opened a new pull request #5440: KAFKA-7228: Set errorHandlingMetrics 
for dead letter queue
URL: https://github.com/apache/kafka/pull/5440
 
 
   DLQ reporter does not get an errorHandlingMetrics object when created by the 
worker.
   
   Signed-off-by: Arjun Satish 
   
   *More detailed description of your change,
   if necessary. The PR title and PR message become
   the squashed commit message, so use a separate
   comment to ping reviewers.*
   
   *Summary of testing strategy (including rationale)
   for the feature or bug fix. Unit and/or integration
   tests are expected for any behaviour change and
   system tests should be considered for larger changes.*
   
   ### Committer Checklist (excluded from commit message)
   - [ ] Verify design and implementation 
   - [ ] Verify test coverage and CI build status
   - [ ] Verify documentation (including upgrade notes)
   


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


> DeadLetterQueue throws a NullPointerException
> -
>
> Key: KAFKA-7228
> URL: https://issues.apache.org/jira/browse/KAFKA-7228
> Project: Kafka
>  Issue Type: Task
>  Components: KafkaConnect
>Reporter: Arjun Satish
>Assignee: Arjun Satish
>Priority: Major
>
> Using the dead letter queue results in a NPE: 
> {code:java}
> [2018-08-01 07:41:08,907] ERROR WorkerSinkTask{id=s3-sink-yanivr-2-4} Task 
> threw an uncaught and unrecoverable exception 
> (org.apache.kafka.connect.runtime.WorkerTask)
> java.lang.NullPointerException
> at 
> org.apache.kafka.connect.runtime.errors.DeadLetterQueueReporter.report(DeadLetterQueueReporter.java:124)
> at 
> org.apache.kafka.connect.runtime.errors.ProcessingContext.report(ProcessingContext.java:137)
> at 
> org.apache.kafka.connect.runtime.errors.RetryWithToleranceOperator.execute(RetryWithToleranceOperator.java:108)
> at 
> org.apache.kafka.connect.runtime.WorkerSinkTask.convertAndTransformRecord(WorkerSinkTask.java:513)
> at 
> org.apache.kafka.connect.runtime.WorkerSinkTask.convertMessages(WorkerSinkTask.java:490)
> at 
> org.apache.kafka.connect.runtime.WorkerSinkTask.poll(WorkerSinkTask.java:321)
> at 
> org.apache.kafka.connect.runtime.WorkerSinkTask.iteration(WorkerSinkTask.java:225)
> at 
> org.apache.kafka.connect.runtime.WorkerSinkTask.execute(WorkerSinkTask.java:193)
> at org.apache.kafka.connect.runtime.WorkerTask.doRun(WorkerTask.java:175)
> at org.apache.kafka.connect.runtime.WorkerTask.run(WorkerTask.java:219)
> at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
> at java.util.concurrent.FutureTask.run(FutureTask.java:266)
> at 
> java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
> at 
> java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
> at java.lang.Thread.run(Thread.java:748)
> [2018-08-01 07:41:08,908] ERROR WorkerSinkTask{id=s3-sink-yanivr-2-4} Task is 
> being killed and will not recover until manually restarted 
> (org.apache.kafka.connect.runtime.WorkerTask)
> {code}
> DLQ reporter does not get an {{errorHandlingMetrics}} object when initialized 
> through the WorkerSinkTask.
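
For context, a hedged sketch of a sink connector configuration that exercises 
this code path by enabling the dead letter queue; connector name, class, topics 
and file are placeholders (not the reporter's S3 sink):

{code}
name=dlq-example-sink
connector.class=org.apache.kafka.connect.file.FileStreamSinkConnector
tasks.max=1
topics=example-topic
file=/tmp/dlq-example-sink.out
# Error handling (KIP-298): tolerate conversion errors, route bad records to a DLQ topic
errors.tolerance=all
errors.deadletterqueue.topic.name=dlq-example-sink-errors
errors.deadletterqueue.topic.replication.factor=1
errors.deadletterqueue.context.headers.enable=true
{code}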



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Updated] (KAFKA-7228) DeadLetterQueue throws a NullPointerException

2018-08-01 Thread Arjun Satish (JIRA)


 [ 
https://issues.apache.org/jira/browse/KAFKA-7228?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Arjun Satish updated KAFKA-7228:

Description: 
Using the dead letter queue results in a NPE: 

{code:java}
[2018-08-01 07:41:08,907] ERROR WorkerSinkTask{id=s3-sink-yanivr-2-4} Task 
threw an uncaught and unrecoverable exception 
(org.apache.kafka.connect.runtime.WorkerTask)
java.lang.NullPointerException
at 
org.apache.kafka.connect.runtime.errors.DeadLetterQueueReporter.report(DeadLetterQueueReporter.java:124)
at 
org.apache.kafka.connect.runtime.errors.ProcessingContext.report(ProcessingContext.java:137)
at 
org.apache.kafka.connect.runtime.errors.RetryWithToleranceOperator.execute(RetryWithToleranceOperator.java:108)
at 
org.apache.kafka.connect.runtime.WorkerSinkTask.convertAndTransformRecord(WorkerSinkTask.java:513)
at 
org.apache.kafka.connect.runtime.WorkerSinkTask.convertMessages(WorkerSinkTask.java:490)
at 
org.apache.kafka.connect.runtime.WorkerSinkTask.poll(WorkerSinkTask.java:321)
at 
org.apache.kafka.connect.runtime.WorkerSinkTask.iteration(WorkerSinkTask.java:225)
at 
org.apache.kafka.connect.runtime.WorkerSinkTask.execute(WorkerSinkTask.java:193)
at org.apache.kafka.connect.runtime.WorkerTask.doRun(WorkerTask.java:175)
at org.apache.kafka.connect.runtime.WorkerTask.run(WorkerTask.java:219)
at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
at java.util.concurrent.FutureTask.run(FutureTask.java:266)
at 
java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
at 
java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
at java.lang.Thread.run(Thread.java:748)
[2018-08-01 07:41:08,908] ERROR WorkerSinkTask{id=s3-sink-yanivr-2-4} Task is 
being killed and will not recover until manually restarted 
(org.apache.kafka.connect.runtime.WorkerTask)
{code}

DLQ reporter does not get an {{errorHandlingMetrics}} object when initialized 
through the WorkerSinkTask.

  was:
Using the dead letter queue results in a NPE: 

{code:java}
[2018-08-01 07:41:08,907] ERROR WorkerSinkTask{id=s3-sink-yanivr-2-4} Task 
threw an uncaught and unrecoverable exception 
(org.apache.kafka.connect.runtime.WorkerTask)
java.lang.NullPointerException
at 
org.apache.kafka.connect.runtime.errors.DeadLetterQueueReporter.report(DeadLetterQueueReporter.java:124)
at 
org.apache.kafka.connect.runtime.errors.ProcessingContext.report(ProcessingContext.java:137)
at 
org.apache.kafka.connect.runtime.errors.RetryWithToleranceOperator.execute(RetryWithToleranceOperator.java:108)
at 
org.apache.kafka.connect.runtime.WorkerSinkTask.convertAndTransformRecord(WorkerSinkTask.java:513)
at 
org.apache.kafka.connect.runtime.WorkerSinkTask.convertMessages(WorkerSinkTask.java:490)
at 
org.apache.kafka.connect.runtime.WorkerSinkTask.poll(WorkerSinkTask.java:321)
at 
org.apache.kafka.connect.runtime.WorkerSinkTask.iteration(WorkerSinkTask.java:225)
at 
org.apache.kafka.connect.runtime.WorkerSinkTask.execute(WorkerSinkTask.java:193)
at org.apache.kafka.connect.runtime.WorkerTask.doRun(WorkerTask.java:175)
at org.apache.kafka.connect.runtime.WorkerTask.run(WorkerTask.java:219)
at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
at java.util.concurrent.FutureTask.run(FutureTask.java:266)
at 
java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
at 
java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
at java.lang.Thread.run(Thread.java:748)
[2018-08-01 07:41:08,908] ERROR WorkerSinkTask{id=s3-sink-yanivr-2-4} Task is 
being killed and will not recover until manually restarted 
(org.apache.kafka.connect.runtime.WorkerTask)
{code}


> DeadLetterQueue throws a NullPointerException
> -
>
> Key: KAFKA-7228
> URL: https://issues.apache.org/jira/browse/KAFKA-7228
> Project: Kafka
>  Issue Type: Task
>  Components: KafkaConnect
>Reporter: Arjun Satish
>Assignee: Arjun Satish
>Priority: Major
>
> Using the dead letter queue results in a NPE: 
> {code:java}
> [2018-08-01 07:41:08,907] ERROR WorkerSinkTask{id=s3-sink-yanivr-2-4} Task 
> threw an uncaught and unrecoverable exception 
> (org.apache.kafka.connect.runtime.WorkerTask)
> java.lang.NullPointerException
> at 
> org.apache.kafka.connect.runtime.errors.DeadLetterQueueReporter.report(DeadLetterQueueReporter.java:124)
> at 
> org.apache.kafka.connect.runtime.errors.ProcessingContext.report(ProcessingContext.java:137)
> at 
> org.apache.kafka.connect.runtime.errors.RetryWithToleranceOperator.execute(RetryWithToleranceOperator.java:108)
> at 
> org.apache.kafka.connect.runtime.WorkerSinkTask.convertAndTransformRecord(WorkerSinkTask.jav

[jira] [Created] (KAFKA-7228) DeadLetterQueue throws a NullPointerException

2018-08-01 Thread Arjun Satish (JIRA)
Arjun Satish created KAFKA-7228:
---

 Summary: DeadLetterQueue throws a NullPointerException
 Key: KAFKA-7228
 URL: https://issues.apache.org/jira/browse/KAFKA-7228
 Project: Kafka
  Issue Type: Task
  Components: KafkaConnect
Reporter: Arjun Satish


Using the dead letter queue results in a NPE: 

{code:java}
[2018-08-01 07:41:08,907] ERROR WorkerSinkTask{id=s3-sink-yanivr-2-4} Task 
threw an uncaught and unrecoverable exception 
(org.apache.kafka.connect.runtime.WorkerTask)
java.lang.NullPointerException
at 
org.apache.kafka.connect.runtime.errors.DeadLetterQueueReporter.report(DeadLetterQueueReporter.java:124)
at 
org.apache.kafka.connect.runtime.errors.ProcessingContext.report(ProcessingContext.java:137)
at 
org.apache.kafka.connect.runtime.errors.RetryWithToleranceOperator.execute(RetryWithToleranceOperator.java:108)
at 
org.apache.kafka.connect.runtime.WorkerSinkTask.convertAndTransformRecord(WorkerSinkTask.java:513)
at 
org.apache.kafka.connect.runtime.WorkerSinkTask.convertMessages(WorkerSinkTask.java:490)
at 
org.apache.kafka.connect.runtime.WorkerSinkTask.poll(WorkerSinkTask.java:321)
at 
org.apache.kafka.connect.runtime.WorkerSinkTask.iteration(WorkerSinkTask.java:225)
at 
org.apache.kafka.connect.runtime.WorkerSinkTask.execute(WorkerSinkTask.java:193)
at org.apache.kafka.connect.runtime.WorkerTask.doRun(WorkerTask.java:175)
at org.apache.kafka.connect.runtime.WorkerTask.run(WorkerTask.java:219)
at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
at java.util.concurrent.FutureTask.run(FutureTask.java:266)
at 
java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
at 
java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
at java.lang.Thread.run(Thread.java:748)
[2018-08-01 07:41:08,908] ERROR WorkerSinkTask{id=s3-sink-yanivr-2-4} Task is 
being killed and will not recover until manually restarted 
(org.apache.kafka.connect.runtime.WorkerTask)
{code}



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Assigned] (KAFKA-7228) DeadLetterQueue throws a NullPointerException

2018-08-01 Thread Arjun Satish (JIRA)


 [ 
https://issues.apache.org/jira/browse/KAFKA-7228?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Arjun Satish reassigned KAFKA-7228:
---

Assignee: Arjun Satish

> DeadLetterQueue throws a NullPointerException
> -
>
> Key: KAFKA-7228
> URL: https://issues.apache.org/jira/browse/KAFKA-7228
> Project: Kafka
>  Issue Type: Task
>  Components: KafkaConnect
>Reporter: Arjun Satish
>Assignee: Arjun Satish
>Priority: Major
>
> Using the dead letter queue results in a NPE: 
> {code:java}
> [2018-08-01 07:41:08,907] ERROR WorkerSinkTask{id=s3-sink-yanivr-2-4} Task 
> threw an uncaught and unrecoverable exception 
> (org.apache.kafka.connect.runtime.WorkerTask)
> java.lang.NullPointerException
> at 
> org.apache.kafka.connect.runtime.errors.DeadLetterQueueReporter.report(DeadLetterQueueReporter.java:124)
> at 
> org.apache.kafka.connect.runtime.errors.ProcessingContext.report(ProcessingContext.java:137)
> at 
> org.apache.kafka.connect.runtime.errors.RetryWithToleranceOperator.execute(RetryWithToleranceOperator.java:108)
> at 
> org.apache.kafka.connect.runtime.WorkerSinkTask.convertAndTransformRecord(WorkerSinkTask.java:513)
> at 
> org.apache.kafka.connect.runtime.WorkerSinkTask.convertMessages(WorkerSinkTask.java:490)
> at 
> org.apache.kafka.connect.runtime.WorkerSinkTask.poll(WorkerSinkTask.java:321)
> at 
> org.apache.kafka.connect.runtime.WorkerSinkTask.iteration(WorkerSinkTask.java:225)
> at 
> org.apache.kafka.connect.runtime.WorkerSinkTask.execute(WorkerSinkTask.java:193)
> at org.apache.kafka.connect.runtime.WorkerTask.doRun(WorkerTask.java:175)
> at org.apache.kafka.connect.runtime.WorkerTask.run(WorkerTask.java:219)
> at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
> at java.util.concurrent.FutureTask.run(FutureTask.java:266)
> at 
> java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
> at 
> java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
> at java.lang.Thread.run(Thread.java:748)
> [2018-08-01 07:41:08,908] ERROR WorkerSinkTask{id=s3-sink-yanivr-2-4} Task is 
> being killed and will not recover until manually restarted 
> (org.apache.kafka.connect.runtime.WorkerTask)
> {code}



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (KAFKA-7215) Improve LogCleaner behavior on error

2018-08-01 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/KAFKA-7215?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16565498#comment-16565498
 ] 

ASF GitHub Bot commented on KAFKA-7215:
---

stanislavkozlovski opened a new pull request #5439: KAFKA-7215: Improve 
LogCleaner Error Handling
URL: https://github.com/apache/kafka/pull/5439
 
 
   *More detailed description of your change,
   if necessary. The PR title and PR message become
   the squashed commit message, so use a separate
   comment to ping reviewers.*
   
   - Catch `KafkaStorageException` whose cause is `IOException` in 
`CleanerThread#clean()`'s delete part. Properly mark the directory which caused 
the IOException as "offline"
   - Catch any exceptions raised during log cleaning and mark the partition 
which caused it as "uncleanable". The log cleaner does not attempt to clean 
said partitions afterwards
   - Introduce new config - `max.uncleanable.partitions`. After X unclean 
partitions in the same log directory, mark the log directory as offline 
(assuming it is a disk problem)
   
   
   *Summary of testing strategy (including rationale)
   for the feature or bug fix. Unit and/or integration
   tests are expected for any behaviour change and
   system tests should be considered for larger changes.*
   
   KIP: 
https://cwiki.apache.org/confluence/display/KAFKA/KIP-346+-+Improve+LogCleaner+behavior+on+error
   JIRA: https://issues.apache.org/jira/browse/KAFKA-7215
   
   Some notes and troubles:
   I had to create a mock `LogCleanerManagerMock` because I had problems 
creating/setting/reading checkpoint files to make my tests work. I'm not sure 
whether this is the best approach to the problem.
   
   ### Committer Checklist (excluded from commit message)
   - [ ] Verify design and implementation 
   - [ ] Verify test coverage and CI build status
   - [ ] Verify documentation (including upgrade notes)
   


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


> Improve LogCleaner behavior on error
> 
>
> Key: KAFKA-7215
> URL: https://issues.apache.org/jira/browse/KAFKA-7215
> Project: Kafka
>  Issue Type: Improvement
>Reporter: Stanislav Kozlovski
>Assignee: Stanislav Kozlovski
>Priority: Minor
>
> For more detailed information see 
> [KIP-346|https://cwiki.apache.org/confluence/display/KAFKA/KIP-346+-+Improve+LogCleaner+behavior+on+error]
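
To make the policy described in the pull request above concrete, a purely 
illustrative sketch (hypothetical class and method names, not the Kafka 
implementation) of tracking uncleanable partitions per log directory and 
deciding when to take the directory offline:

{code:java}
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;
import org.apache.kafka.common.TopicPartition;

// Illustrative only: a hypothetical helper, not kafka.log.LogCleanerManager.
public class UncleanablePartitionTrackerSketch {
    private final int maxUncleanablePartitions; // e.g. the proposed max.uncleanable.partitions
    private final Map<String, Set<TopicPartition>> uncleanableByDir = new HashMap<>();

    public UncleanablePartitionTrackerSketch(int maxUncleanablePartitions) {
        this.maxUncleanablePartitions = maxUncleanablePartitions;
    }

    /** Record a cleaning failure; returns true once the log dir should be marked offline. */
    public synchronized boolean markUncleanable(String logDir, TopicPartition tp) {
        Set<TopicPartition> failed =
            uncleanableByDir.computeIfAbsent(logDir, d -> new HashSet<>());
        failed.add(tp);
        return failed.size() >= maxUncleanablePartitions;
    }

    /** Partitions recorded here would be skipped by subsequent cleaning passes. */
    public synchronized boolean isUncleanable(String logDir, TopicPartition tp) {
        return uncleanableByDir.getOrDefault(logDir, Collections.emptySet()).contains(tp);
    }
}
{code}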



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Updated] (KAFKA-5587) Processor got uncaught exception: NullPointerException

2018-08-01 Thread lambdaliu (JIRA)


 [ 
https://issues.apache.org/jira/browse/KAFKA-5587?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

lambdaliu updated KAFKA-5587:
-
Affects Version/s: 0.10.2.1

> Processor got uncaught exception: NullPointerException
> --
>
> Key: KAFKA-5587
> URL: https://issues.apache.org/jira/browse/KAFKA-5587
> Project: Kafka
>  Issue Type: Bug
>  Components: core
>Affects Versions: 0.10.1.1, 0.10.2.1
>Reporter: Dan
>Assignee: Rajini Sivaram
>Priority: Major
> Fix For: 0.11.0.1, 1.0.0
>
>
> [2017-07-12 21:56:39,964] ERROR Processor got uncaught exception. 
> (kafka.network.Processor)
> java.lang.NullPointerException
> at 
> kafka.network.Processor$$anonfun$processCompletedReceives$1.apply(SocketServer.scala:490)
> at 
> kafka.network.Processor$$anonfun$processCompletedReceives$1.apply(SocketServer.scala:487)
> at scala.collection.Iterator$class.foreach(Iterator.scala:727)
> at scala.collection.AbstractIterator.foreach(Iterator.scala:1157)
> at scala.collection.IterableLike$class.foreach(IterableLike.scala:72)
> at scala.collection.AbstractIterable.foreach(Iterable.scala:54)
> at 
> kafka.network.Processor.processCompletedReceives(SocketServer.scala:487)
> at kafka.network.Processor.run(SocketServer.scala:417)
> at java.lang.Thread.run(Thread.java:745)
> Does anyone know the cause of this exception? What is its effect? 
> When this exception occurred, the log also showed that the broker was 
> frequently shrinking ISR to itself. Are these two things interrelated?



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Updated] (KAFKA-7227) Kafka cluster should has a default rack id if not set

2018-08-01 Thread Genmao Yu (JIRA)


 [ 
https://issues.apache.org/jira/browse/KAFKA-7227?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Genmao Yu updated KAFKA-7227:
-
Priority: Minor  (was: Major)

> Kafka cluster should has a default rack id if not set
> -
>
> Key: KAFKA-7227
> URL: https://issues.apache.org/jira/browse/KAFKA-7227
> Project: Kafka
>  Issue Type: Improvement
>  Components: core
>Affects Versions: 1.0.0, 2.0.0
>Reporter: Genmao Yu
>Priority: Minor
>
> In the current behavior, if only some of the broker machines have 
> "server.rack" set, topic creation fails with an exception like "Not all 
> brokers have rack information for replica rack aware assignment.". This is 
> not a big problem in a local cluster, but the behavior may not be suitable 
> for a cloud environment, where there may be different types of machines: 
> some have rack info while others do not. This can happen in our kafka 
> cluster for many reasons, such as a shortage of a specific machine type.
> So we can consider giving each broker a default rack id:
>  * if no rack info is set at all, the behavior is the same as before
>  * if only some brokers have rack info set, the remaining brokers can be 
> treated as belonging to the same rack.
> As far as I know, if no rack id is set, a DataNode/NodeManager gets a 
> default rack id in Hadoop. 
>  
>  



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Updated] (KAFKA-7227) Kafka cluster should has a default rack id if not set

2018-08-01 Thread Genmao Yu (JIRA)


 [ 
https://issues.apache.org/jira/browse/KAFKA-7227?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Genmao Yu updated KAFKA-7227:
-
Description: 
In the current behavior, if only some of the broker machines have "server.rack" 
set, topic creation fails with an exception like "Not all brokers have rack 
information for replica rack aware assignment.". This is not a big problem in a 
local cluster, but the behavior may not be suitable for a cloud environment, 
where there may be different types of machines: some have rack info while 
others do not. This can happen in our kafka cluster for many reasons, such as a 
shortage of a specific machine type.

So we can consider giving each broker a default rack id:
 * if no rack info is set at all, the behavior is the same as before
 * if only some brokers have rack info set, the remaining brokers can be 
treated as belonging to the same rack.

As far as I know, if no rack id is set, a DataNode/NodeManager gets a default 
rack id in Hadoop. 

 

 

  was:
In current behavior, if partial of broker machines have "server.rack",  we will 
failed to create topic with exception like "Not all brokers have rack 
information". There is no big problem in local cluster. But, this behavior may 
not be very suitable for cloud environment. In cloud environment, there may be 
different types of machine, some has rack info, while others do not has rack 
info. It could happen in our kafka cluster for many reasons, like understock of 
specific machine.

So, We can consider to give each broker a default rack id. 
 * if not set any rack info, it is the same with previous behavior
 * if partial of brokers are set with rack info, we can think of the remaining 
brokers as same rack.

As far as know, if not set rack id, DataNode/NodeManager has a default rack id 
in Hadoop. 

 

 


> Kafka cluster should has a default rack id if not set
> -
>
> Key: KAFKA-7227
> URL: https://issues.apache.org/jira/browse/KAFKA-7227
> Project: Kafka
>  Issue Type: Improvement
>  Components: core
>Affects Versions: 1.0.0, 2.0.0
>Reporter: Genmao Yu
>Priority: Major
>
> In the current behavior, if only some of the broker machines have 
> "server.rack" set, topic creation fails with an exception like "Not all 
> brokers have rack information for replica rack aware assignment.". This is 
> not a big problem in a local cluster, but the behavior may not be suitable 
> for a cloud environment, where there may be different types of machines: 
> some have rack info while others do not. This can happen in our kafka 
> cluster for many reasons, such as a shortage of a specific machine type.
> So we can consider giving each broker a default rack id:
>  * if no rack info is set at all, the behavior is the same as before
>  * if only some brokers have rack info set, the remaining brokers can be 
> treated as belonging to the same rack.
> As far as I know, if no rack id is set, a DataNode/NodeManager gets a 
> default rack id in Hadoop. 
>  
>  



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (KAFKA-7216) Exception while running kafka-acls.sh from 1.0 env on target Kafka env with 1.1.1

2018-08-01 Thread Satish Duggana (JIRA)


[ 
https://issues.apache.org/jira/browse/KAFKA-7216?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16565242#comment-16565242
 ] 

Satish Duggana commented on KAFKA-7216:
---

[~ijuma] I set the component to `admin` since `AclCommand` is one of the commands 
in the `kafka.admin` package. You may want to change it to the appropriate 
component if admin is not the right component for this issue.

> Exception while running kafka-acls.sh from 1.0 env on target Kafka env with 
> 1.1.1
> -
>
> Key: KAFKA-7216
> URL: https://issues.apache.org/jira/browse/KAFKA-7216
> Project: Kafka
>  Issue Type: Bug
>  Components: admin
>Affects Versions: 1.1.0, 1.1.1, 2.0.0
>Reporter: Satish Duggana
>Priority: Major
>
> When `kafka-acls.sh` from a 1.0 environment is run with SimpleAclAuthorizer 
> against a target Kafka cluster on version 1.1.1, it throws the below error.
> {code:java}
> kafka.common.KafkaException: DelegationToken not a valid resourceType name. 
> The valid names are Topic,Group,Cluster,TransactionalId
>   at 
> kafka.security.auth.ResourceType$$anonfun$fromString$1.apply(ResourceType.scala:56)
>   at 
> kafka.security.auth.ResourceType$$anonfun$fromString$1.apply(ResourceType.scala:56)
>   at scala.Option.getOrElse(Option.scala:121)
>   at kafka.security.auth.ResourceType$.fromString(ResourceType.scala:56)
>   at 
> kafka.security.auth.SimpleAclAuthorizer$$anonfun$loadCache$1$$anonfun$apply$mcV$sp$1.apply(SimpleAclAuthorizer.scala:233)
>   at 
> kafka.security.auth.SimpleAclAuthorizer$$anonfun$loadCache$1$$anonfun$apply$mcV$sp$1.apply(SimpleAclAuthorizer.scala:232)
>   at scala.collection.Iterator$class.foreach(Iterator.scala:891)
>   at scala.collection.AbstractIterator.foreach(Iterator.scala:1334)
>   at scala.collection.IterableLike$class.foreach(IterableLike.scala:72)
>   at scala.collection.AbstractIterable.foreach(Iterable.scala:54)
>   at 
> kafka.security.auth.SimpleAclAuthorizer$$anonfun$loadCache$1.apply$mcV$sp(SimpleAclAuthorizer.scala:232)
>   at 
> kafka.security.auth.SimpleAclAuthorizer$$anonfun$loadCache$1.apply(SimpleAclAuthorizer.scala:230)
>   at 
> kafka.security.auth.SimpleAclAuthorizer$$anonfun$loadCache$1.apply(SimpleAclAuthorizer.scala:230)
>   at kafka.utils.CoreUtils$.inLock(CoreUtils.scala:216)
>   at kafka.utils.CoreUtils$.inWriteLock(CoreUtils.scala:224)
>   at 
> kafka.security.auth.SimpleAclAuthorizer.loadCache(SimpleAclAuthorizer.scala:230)
>   at 
> kafka.security.auth.SimpleAclAuthorizer.configure(SimpleAclAuthorizer.scala:114)
>   at kafka.admin.AclCommand$.withAuthorizer(AclCommand.scala:83)
>   at kafka.admin.AclCommand$.addAcl(AclCommand.scala:93)
>   at kafka.admin.AclCommand$.main(AclCommand.scala:53)
>   at kafka.admin.AclCommand.main(AclCommand.scala)
> {code}
>  
>  This is because the tool reads all the resource types registered under the 
> ZK path and throws an error when the `DelegationToken` resource type is not 
> defined in the client's Kafka version's `ResourceType` (i.e. versions earlier 
> than 1.1.x).
>   
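
To illustrate the failure mode, a simplified sketch (not the actual 
kafka.security.auth.ResourceType source): an older client enumerates only the 
resource types it knows about, so a name written to ZooKeeper by a newer broker 
cannot be parsed:

{code:java}
// Simplified illustration of the pattern behind ResourceType.fromString.
enum OldClientResourceType {
    TOPIC, GROUP, CLUSTER, TRANSACTIONAL_ID;   // no DELEGATION_TOKEN before 1.1

    static OldClientResourceType fromString(String name) {
        for (OldClientResourceType t : values()) {
            if (t.name().replace("_", "").equalsIgnoreCase(name)) {
                return t;
            }
        }
        // A "DelegationToken" ACL node created on a 1.1+ cluster lands here.
        throw new IllegalArgumentException(
            name + " is not a valid resourceType name for this client version");
    }
}
{code}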



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Updated] (KAFKA-7227) Kafka cluster should has a default rack id if not set

2018-08-01 Thread Genmao Yu (JIRA)


 [ 
https://issues.apache.org/jira/browse/KAFKA-7227?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Genmao Yu updated KAFKA-7227:
-
Description: 
In the current behavior, if only some of the broker machines have "server.rack" 
set, topic creation fails with an exception like "Not all brokers have rack 
information". This is not a big problem in a local cluster, but the behavior 
may not be suitable for a cloud environment, where there may be different types 
of machines: some have rack info while others do not. This can happen in our 
kafka cluster for many reasons, such as a shortage of a specific machine type.

So we can consider giving each broker a default rack id:
 * if no rack info is set at all, the behavior is the same as before
 * if only some brokers have rack info set, the remaining brokers can be 
treated as belonging to the same rack.

As far as I know, if no rack id is set, a DataNode/NodeManager gets a default 
rack id in Hadoop. 

 

 

  was:
In current behavior, if partial of broker machines have "server.rack",  we will 
failed to create topic with exception like "Not all brokers have rack 
information". There is no big problem in local cluster. But, this behavior may 
not be very suitable for cloud environment. In cloud environment, there may be 
different types of machine, some has rack info, while others do not has rack 
info. It could happen for many reasons, like understock of specific machine. We 
can give each broker a default rack id. 
 * if not set any rack info, it is the same with previous behavior
 * if partial of brokers are set with rack info, we can think of the remaining 
brokers as same rack.

As far as know, if not set rack id, DataNode/NodeManager has a default rack id 
in Hadoop. 

 

 


> Kafka cluster should has a default rack id if not set
> -
>
> Key: KAFKA-7227
> URL: https://issues.apache.org/jira/browse/KAFKA-7227
> Project: Kafka
>  Issue Type: Improvement
>  Components: core
>Affects Versions: 1.0.0, 2.0.0
>Reporter: Genmao Yu
>Priority: Major
>
> In the current behavior, if only some of the broker machines have 
> "server.rack" set, topic creation fails with an exception like "Not all 
> brokers have rack information". This is not a big problem in a local 
> cluster, but the behavior may not be suitable for a cloud environment, where 
> there may be different types of machines: some have rack info while others 
> do not. This can happen in our kafka cluster for many reasons, such as a 
> shortage of a specific machine type.
> So we can consider giving each broker a default rack id:
>  * if no rack info is set at all, the behavior is the same as before
>  * if only some brokers have rack info set, the remaining brokers can be 
> treated as belonging to the same rack.
> As far as I know, if no rack id is set, a DataNode/NodeManager gets a 
> default rack id in Hadoop. 
>  
>  



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Updated] (KAFKA-7216) Exception while running kafka-acls.sh from 1.0 env on target Kafka env with 1.1.1

2018-08-01 Thread Satish Duggana (JIRA)


 [ 
https://issues.apache.org/jira/browse/KAFKA-7216?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Satish Duggana updated KAFKA-7216:
--
Component/s: admin

> Exception while running kafka-acls.sh from 1.0 env on target Kafka env with 
> 1.1.1
> -
>
> Key: KAFKA-7216
> URL: https://issues.apache.org/jira/browse/KAFKA-7216
> Project: Kafka
>  Issue Type: Bug
>  Components: admin
>Affects Versions: 1.1.0, 1.1.1, 2.0.0
>Reporter: Satish Duggana
>Priority: Major
>
> When `kafka-acls.sh` from a 1.0 environment is run with SimpleAclAuthorizer 
> against a target Kafka cluster on version 1.1.1, it throws the below error.
> {code:java}
> kafka.common.KafkaException: DelegationToken not a valid resourceType name. 
> The valid names are Topic,Group,Cluster,TransactionalId
>   at 
> kafka.security.auth.ResourceType$$anonfun$fromString$1.apply(ResourceType.scala:56)
>   at 
> kafka.security.auth.ResourceType$$anonfun$fromString$1.apply(ResourceType.scala:56)
>   at scala.Option.getOrElse(Option.scala:121)
>   at kafka.security.auth.ResourceType$.fromString(ResourceType.scala:56)
>   at 
> kafka.security.auth.SimpleAclAuthorizer$$anonfun$loadCache$1$$anonfun$apply$mcV$sp$1.apply(SimpleAclAuthorizer.scala:233)
>   at 
> kafka.security.auth.SimpleAclAuthorizer$$anonfun$loadCache$1$$anonfun$apply$mcV$sp$1.apply(SimpleAclAuthorizer.scala:232)
>   at scala.collection.Iterator$class.foreach(Iterator.scala:891)
>   at scala.collection.AbstractIterator.foreach(Iterator.scala:1334)
>   at scala.collection.IterableLike$class.foreach(IterableLike.scala:72)
>   at scala.collection.AbstractIterable.foreach(Iterable.scala:54)
>   at 
> kafka.security.auth.SimpleAclAuthorizer$$anonfun$loadCache$1.apply$mcV$sp(SimpleAclAuthorizer.scala:232)
>   at 
> kafka.security.auth.SimpleAclAuthorizer$$anonfun$loadCache$1.apply(SimpleAclAuthorizer.scala:230)
>   at 
> kafka.security.auth.SimpleAclAuthorizer$$anonfun$loadCache$1.apply(SimpleAclAuthorizer.scala:230)
>   at kafka.utils.CoreUtils$.inLock(CoreUtils.scala:216)
>   at kafka.utils.CoreUtils$.inWriteLock(CoreUtils.scala:224)
>   at 
> kafka.security.auth.SimpleAclAuthorizer.loadCache(SimpleAclAuthorizer.scala:230)
>   at 
> kafka.security.auth.SimpleAclAuthorizer.configure(SimpleAclAuthorizer.scala:114)
>   at kafka.admin.AclCommand$.withAuthorizer(AclCommand.scala:83)
>   at kafka.admin.AclCommand$.addAcl(AclCommand.scala:93)
>   at kafka.admin.AclCommand$.main(AclCommand.scala:53)
>   at kafka.admin.AclCommand.main(AclCommand.scala)
> {code}
>  
>  This is because the tool reads all the resource types registered under the 
> ZK path and throws an error when the `DelegationToken` resource type is not 
> defined in the client's Kafka version's `ResourceType` (i.e. versions earlier 
> than 1.1.x).
>   



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Created] (KAFKA-7227) Kafka cluster should has a default rack id if not set

2018-08-01 Thread Genmao Yu (JIRA)
Genmao Yu created KAFKA-7227:


 Summary: Kafka cluster should has a default rack id if not set
 Key: KAFKA-7227
 URL: https://issues.apache.org/jira/browse/KAFKA-7227
 Project: Kafka
  Issue Type: Improvement
  Components: core
Affects Versions: 2.0.0, 1.0.0
Reporter: Genmao Yu


In the current behavior, if only some of the broker machines have "server.rack" 
set, topic creation fails with an exception like "Not all brokers have rack 
information". This is not a big problem in a local cluster, but the behavior 
may not be suitable for a cloud environment, where there may be different types 
of machines: some have rack info while others do not. This can happen for many 
reasons, such as a shortage of a specific machine type. We can give each broker 
a default rack id:
 * if no rack info is set at all, the behavior is the same as before
 * if only some brokers have rack info set, the remaining brokers can be 
treated as belonging to the same rack.

As far as I know, if no rack id is set, a DataNode/NodeManager gets a default 
rack id in Hadoop. 

 

 



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Created] (KAFKA-7226) kafka-console-consumer.sh doesn't use security.protocol provided in config file

2018-08-01 Thread Alexandre GRIFFAUT (JIRA)
Alexandre GRIFFAUT created KAFKA-7226:
-

 Summary: kafka-console-consumer.sh doesn't use security.protocol 
provided in config file
 Key: KAFKA-7226
 URL: https://issues.apache.org/jira/browse/KAFKA-7226
 Project: Kafka
  Issue Type: Bug
  Components: producer 
Affects Versions: 1.0.0
Reporter: Alexandre GRIFFAUT


It is confusing that kafka-console-producer.sh doesn't read security.protocol 
when it is provided in a config file with --producer.config, whereas 
kafka-console-consumer.sh does...

 

With debug activated:

$ /usr/hdp/2.6.5.0-292/kafka/bin/kafka-console-producer.sh --broker-list 
$(hostname):6668 --topic test --producer.config 
/etc/kafka/ssl/kafka.client.properties

[2018-08-01 14:17:18,505] INFO ProducerConfig values:
...

    security.protocol = PLAINTEXT
...

> abc

..

java.io.EOFException
    at 
org.apache.kafka.common.network.NetworkReceive.readFromReadableChannel(NetworkReceive.java:147)
    at 
org.apache.kafka.common.network.NetworkReceive.readFrom(NetworkReceive.java:93)
    at 
org.apache.kafka.common.network.KafkaChannel.receive(KafkaChannel.java:235)
    at 
org.apache.kafka.common.network.KafkaChannel.read(KafkaChannel.java:196)
    at 
org.apache.kafka.common.network.Selector.attemptRead(Selector.java:538)
    at 
org.apache.kafka.common.network.Selector.pollSelectionKeys(Selector.java:482)
    at org.apache.kafka.common.network.Selector.poll(Selector.java:412)
    at org.apache.kafka.clients.NetworkClient.poll(NetworkClient.java:460)
    at 
org.apache.kafka.clients.producer.internals.Sender.run(Sender.java:239)
    at 
org.apache.kafka.clients.producer.internals.Sender.run(Sender.java:163)
    at java.lang.Thread.run(Thread.java:745)

...

 

The only way to produce with SSL is to use --security-protocol SSL.

kafka-console-consumer.sh correctly reads security.protocol from the config file.
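
For reference, a hedged sketch of what such a client config file typically 
contains (paths and passwords are placeholders); per this report the console 
consumer honours these settings from its config file while the console producer 
ignores security.protocol passed through --producer.config:

{code}
# /etc/kafka/ssl/kafka.client.properties (placeholder values)
security.protocol=SSL
ssl.truststore.location=/etc/kafka/ssl/client.truststore.jks
ssl.truststore.password=<truststore-password>
{code}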



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Updated] (KAFKA-5928) Avoid redundant requests to zookeeper when reassign topic partition

2018-08-01 Thread Genmao Yu (JIRA)


 [ 
https://issues.apache.org/jira/browse/KAFKA-5928?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Genmao Yu updated KAFKA-5928:
-
Affects Version/s: 2.0.0

> Avoid redundant requests to zookeeper when reassign topic partition
> ---
>
> Key: KAFKA-5928
> URL: https://issues.apache.org/jira/browse/KAFKA-5928
> Project: Kafka
>  Issue Type: Improvement
>  Components: admin
>Affects Versions: 0.10.2.1, 0.11.0.0, 1.0.0, 2.0.0
>Reporter: Genmao Yu
>Priority: Major
>
> We mistakenly request topic-level information once for every partition listed 
> in the assignment json file. For example 
> https://github.com/apache/kafka/blob/trunk/core/src/main/scala/kafka/admin/ReassignPartitionsCommand.scala#L550:
> {code}
> val validPartitions = proposedPartitionAssignment.filter { case (p, _) => 
> validatePartition(zkUtils, p.topic, p.partition) } 
> {code} 
> When reassigning 1000 partitions spread across 10 topics, this issues 1000 
> zookeeper requests, although only 10 (one per topic) are needed. In a test of 
> a large-scale assignment of about 10K partitions, the command takes tens of 
> minutes; after this optimization it drops to less than 1 minute.
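
A minimal sketch of the topic-level batching described above (hypothetical, not 
the actual patch; it assumes {{zkUtils.getPartitionsForTopics}} is usable at this 
point in ReassignPartitionsCommand):

{code}
// Hypothetical sketch: one zookeeper read per topic instead of one per partition.
val topics = proposedPartitionAssignment.keys.map(_.topic).toSeq.distinct
val partitionsByTopic = zkUtils.getPartitionsForTopics(topics)
val validPartitions = proposedPartitionAssignment.filter { case (tp, _) =>
  partitionsByTopic.getOrElse(tp.topic, Seq.empty).contains(tp.partition)
}
{code}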



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Updated] (KAFKA-5928) Avoid redundant requests to zookeeper when reassign topic partition

2018-08-01 Thread Genmao Yu (JIRA)


 [ 
https://issues.apache.org/jira/browse/KAFKA-5928?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Genmao Yu updated KAFKA-5928:
-
Affects Version/s: 1.0.0

> Avoid redundant requests to zookeeper when reassign topic partition
> ---
>
> Key: KAFKA-5928
> URL: https://issues.apache.org/jira/browse/KAFKA-5928
> Project: Kafka
>  Issue Type: Improvement
>  Components: admin
>Affects Versions: 0.10.2.1, 0.11.0.0, 1.0.0, 2.0.0
>Reporter: Genmao Yu
>Priority: Major
>
> We mistakenly request topic-level information once for every partition listed 
> in the assignment json file. For example 
> https://github.com/apache/kafka/blob/trunk/core/src/main/scala/kafka/admin/ReassignPartitionsCommand.scala#L550:
> {code}
> val validPartitions = proposedPartitionAssignment.filter { case (p, _) => 
> validatePartition(zkUtils, p.topic, p.partition) } 
> {code} 
> When reassigning 1000 partitions spread across 10 topics, this issues 1000 
> zookeeper requests, although only 10 (one per topic) are needed. In a test of 
> a large-scale assignment of about 10K partitions, the command takes tens of 
> minutes; after this optimization it drops to less than 1 minute.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Updated] (KAFKA-7225) Kafka Connect ConfigProvider not invoked before validation

2018-08-01 Thread Nacho Munoz (JIRA)


 [ 
https://issues.apache.org/jira/browse/KAFKA-7225?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Nacho Munoz updated KAFKA-7225:
---
Description: 
When trying to register a JDBC connector with externalised secrets (e.g. 
connection.password), validation fails and the endpoint returns a 500. The 
problem appears to be that the config transformer is not invoked before 
validation, so the connection check runs against the raw placeholder rather 
than the resolved credentials and fails. Publishing the connector configuration 
directly to the connect-config topic (skipping validation) and restarting the 
worker is enough to get the connector working, which confirms that the config 
transformer is simply not being called before the connector is validated. 
Please let me know if you need further information.

I'm happy to open a PR to address this issue, since I think it is easy enough 
for a new contributor to fix. Please feel free to assign the resolution of the 
bug to me.

  was:
When trying to register a JDBC connector with externalised secrets (e.g. 
connection.password) the validation fails and the endpoint returns a 500. I 
think that the problem is that the config transformer is not being invoked 
before the validation so trying to exercise the credentials against the 
database fails. I have checked that publishing the connector configuration 
directly to the connect-config topic to skip the validation and restarting the 
server is enough to get the connector working so that confirms that we are just 
missing to call config transformer before validating the connector. Please let 
me know if you need further information.

I'm happy to open a PR to address this issue given that I think that this is 
easy enough to fix for a new contributor to the project. So please feel free to 
assign me the resolution of the bug.


> Kafka Connect ConfigProvider not invoked before validation
> --
>
> Key: KAFKA-7225
> URL: https://issues.apache.org/jira/browse/KAFKA-7225
> Project: Kafka
>  Issue Type: Bug
>  Components: KafkaConnect
>Affects Versions: 2.0.0
>Reporter: Nacho Munoz
>Priority: Minor
>
> When trying to register a JDBC connector with externalised secrets (e.g. 
> connection.password), validation fails and the endpoint returns a 500. The 
> problem appears to be that the config transformer is not invoked before 
> validation, so the connection check runs against the raw placeholder rather 
> than the resolved credentials and fails. Publishing the connector 
> configuration directly to the connect-config topic (skipping validation) and 
> restarting the worker is enough to get the connector working, which confirms 
> that the config transformer is simply not being called before the connector 
> is validated. Please let me know if you need further information.
> I'm happy to open a PR to address this issue, since I think it is easy enough 
> for a new contributor to fix. Please feel free to assign the resolution of 
> the bug to me.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Created] (KAFKA-7225) Kafka Connect ConfigProvider not invoked before validation

2018-08-01 Thread Nacho Munoz (JIRA)
Nacho Munoz created KAFKA-7225:
--

 Summary: Kafka Connect ConfigProvider not invoked before validation
 Key: KAFKA-7225
 URL: https://issues.apache.org/jira/browse/KAFKA-7225
 Project: Kafka
  Issue Type: Bug
  Components: KafkaConnect
Affects Versions: 2.0.0
Reporter: Nacho Munoz


When trying to register a JDBC connector with externalised secrets (e.g. 
connection.password), validation fails and the endpoint returns a 500. The 
problem appears to be that the config transformer is not invoked before 
validation, so the connection check runs against the raw placeholder rather 
than the resolved credentials and fails. Publishing the connector configuration 
directly to the connect-config topic (skipping validation) and restarting the 
worker is enough to get the connector working, which confirms that the config 
transformer is simply not being called before the connector is validated. 
Please let me know if you need further information.

I'm happy to open a PR to address this issue, since I think it is easy enough 
for a new contributor to fix. Please feel free to assign me the resolution of 
the bug.
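
For context, a sketch of the kind of externalised-secret configuration involved 
(KIP-297-style placeholders; the provider name, file paths and connector class 
are illustrative):

{code}
# Connect worker config: register a config provider
config.providers=file
config.providers.file.class=org.apache.kafka.common.config.provider.FileConfigProvider

# Connector config submitted to the REST API (e.g. PUT /connectors/my-jdbc-source/config)
{
  "connector.class": "io.confluent.connect.jdbc.JdbcSourceConnector",
  "connection.url": "jdbc:postgresql://db:5432/inventory",
  "connection.user": "etl",
  "connection.password": "${file:/etc/kafka/secrets/jdbc.properties:password}"
}

# Validation currently sees the raw "${file:...}" placeholder instead of the
# resolved password, so the connection test against the database fails.
{code}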



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (KAFKA-6188) Broker fails with FATAL Shutdown - log dirs have failed

2018-08-01 Thread Peng Peng (JIRA)


[ 
https://issues.apache.org/jira/browse/KAFKA-6188?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16565055#comment-16565055
 ] 

Peng Peng commented on KAFKA-6188:
--

[~ijuma] I also hit this issue when restarting the Kafka server. Is there any 
way to work around it?

> Broker fails with FATAL Shutdown - log dirs have failed
> ---
>
> Key: KAFKA-6188
> URL: https://issues.apache.org/jira/browse/KAFKA-6188
> Project: Kafka
>  Issue Type: Bug
>  Components: clients, log
>Affects Versions: 1.0.0, 1.0.1
> Environment: Windows 10
>Reporter: Valentina Baljak
>Priority: Blocker
>  Labels: windows
> Attachments: Segments are opened before deletion, 
> kafka_2.10-0.10.2.1.zip, output.txt
>
>
> Just started with version 1.0.0 after 4-5 months of using 0.10.2.1. The 
> test environment is very simple, with only one producer and one consumer. 
> Initially everything started fine and standalone tests worked as expected. 
> However, when running my code, the Kafka clients fail after approximately 10 
> minutes, and Kafka won't start again afterwards; it fails with the same 
> error. Deleting the logs lets it start again, but the same problem recurs.
> Here is the error traceback:
> [2017-11-08 08:21:57,532] INFO Starting log cleanup with a period of 30 
> ms. (kafka.log.LogManager)
> [2017-11-08 08:21:57,548] INFO Starting log flusher with a default period of 
> 9223372036854775807 ms. (kafka.log.LogManager)
> [2017-11-08 08:21:57,798] INFO Awaiting socket connections on 0.0.0.0:9092. 
> (kafka.network.Acceptor)
> [2017-11-08 08:21:57,813] INFO [SocketServer brokerId=0] Started 1 acceptor 
> threads (kafka.network.SocketServer)
> [2017-11-08 08:21:57,829] INFO [ExpirationReaper-0-Produce]: Starting 
> (kafka.server.DelayedOperationPurgatory$ExpiredOperationReaper)
> [2017-11-08 08:21:57,845] INFO [ExpirationReaper-0-DeleteRecords]: Starting 
> (kafka.server.DelayedOperationPurgatory$ExpiredOperationReaper)
> [2017-11-08 08:21:57,845] INFO [ExpirationReaper-0-Fetch]: Starting 
> (kafka.server.DelayedOperationPurgatory$ExpiredOperationReaper)
> [2017-11-08 08:21:57,845] INFO [LogDirFailureHandler]: Starting 
> (kafka.server.ReplicaManager$LogDirFailureHandler)
> [2017-11-08 08:21:57,860] INFO [ReplicaManager broker=0] Stopping serving 
> replicas in dir C:\Kafka\kafka_2.12-1.0.0\kafka-logs 
> (kafka.server.ReplicaManager)
> [2017-11-08 08:21:57,860] INFO [ReplicaManager broker=0] Partitions  are 
> offline due to failure on log directory C:\Kafka\kafka_2.12-1.0.0\kafka-logs 
> (kafka.server.ReplicaManager)
> [2017-11-08 08:21:57,860] INFO [ReplicaFetcherManager on broker 0] Removed 
> fetcher for partitions  (kafka.server.ReplicaFetcherManager)
> [2017-11-08 08:21:57,892] INFO [ReplicaManager broker=0] Broker 0 stopped 
> fetcher for partitions  because they are in the failed log dir 
> C:\Kafka\kafka_2.12-1.0.0\kafka-logs (kafka.server.ReplicaManager)
> [2017-11-08 08:21:57,892] INFO Stopping serving logs in dir 
> C:\Kafka\kafka_2.12-1.0.0\kafka-logs (kafka.log.LogManager)
> [2017-11-08 08:21:57,892] FATAL Shutdown broker because all log dirs in 
> C:\Kafka\kafka_2.12-1.0.0\kafka-logs have failed (kafka.log.LogManager)



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (KAFKA-7165) Error while creating ephemeral at /brokers/ids/BROKER_ID

2018-08-01 Thread Jonathan Santilli (JIRA)


[ 
https://issues.apache.org/jira/browse/KAFKA-7165?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16564932#comment-16564932
 ] 

Jonathan Santilli commented on KAFKA-7165:
--

Sure [~omkreddy], it makes sense: some misconfiguration could lead to two or 
more brokers having the same id (not an ideal situation), as sketched below.

I will wait for [~cthunes]'s opinion and considerations.
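
For illustration only, the kind of misconfiguration meant here is two machines 
sharing one broker id, so both try to create the same ephemeral znode (values 
are made up):

{code}
# host A, server.properties
broker.id=1

# host B, server.properties (copy-pasted config, id never changed)
broker.id=1

# Both brokers register the ephemeral znode /brokers/ids/1; the second one
# hits "node already exists and owner ... does not match current session".
{code}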

 

Cheers!

> Error while creating ephemeral at /brokers/ids/BROKER_ID
> 
>
> Key: KAFKA-7165
> URL: https://issues.apache.org/jira/browse/KAFKA-7165
> Project: Kafka
>  Issue Type: Bug
>  Components: core, zkclient
>Affects Versions: 1.1.0
>Reporter: Jonathan Santilli
>Priority: Major
>
> Kafka version: 1.1.0
> Zookeeper version: 3.4.12
> 4 Kafka Brokers
> 4 Zookeeper servers
>  
> On one of the 4 brokers in the cluster, we detected the following error:
> [2018-07-14 04:38:23,784] INFO Unable to read additional data from server 
> sessionid 0x3000c2420cb458d, likely server has closed socket, closing socket 
> connection and attempting reconnect (org.apache.zookeeper.ClientCnxn)
>  [2018-07-14 04:38:24,509] INFO Opening socket connection to server 
> *ZOOKEEPER_SERVER_1:PORT*. Will not attempt to authenticate using SASL 
> (unknown error) (org.apache.zookeeper.ClientCnxn)
>  [2018-07-14 04:38:24,510] INFO Socket connection established to 
> *ZOOKEEPER_SERVER_1:PORT*, initiating session 
> (org.apache.zookeeper.ClientCnxn)
>  [2018-07-14 04:38:24,513] INFO Unable to read additional data from server 
> sessionid 0x3000c2420cb458d, likely server has closed socket, closing socket 
> connection and attempting reconnect (org.apache.zookeeper.ClientCnxn)
>  [2018-07-14 04:38:25,287] INFO Opening socket connection to server 
> *ZOOKEEPER_SERVER_2:PORT*. Will not attempt to authenticate using SASL 
> (unknown error) (org.apache.zookeeper.ClientCnxn)
>  [2018-07-14 04:38:25,287] INFO Socket connection established to 
> *ZOOKEEPER_SERVER_2:PORT*, initiating session 
> (org.apache.zookeeper.ClientCnxn)
> [2018-07-14 04:38:25,954] INFO [Partition TOPIC_NAME-PARTITION-# broker=1] 
> Shrinking ISR from 1,3,4,2 to 1,4,2 (kafka.cluster.Partition)
>  [2018-07-14 04:38:26,444] WARN Unable to reconnect to ZooKeeper service, 
> session 0x3000c2420cb458d has expired (org.apache.zookeeper.ClientCnxn)
>  [2018-07-14 04:38:26,444] INFO Unable to reconnect to ZooKeeper service, 
> session 0x3000c2420cb458d has expired, closing socket connection 
> (org.apache.zookeeper.ClientCnxn)
>  [2018-07-14 04:38:26,445] INFO EventThread shut down for session: 
> 0x3000c2420cb458d (org.apache.zookeeper.ClientCnxn)
>  [2018-07-14 04:38:26,446] INFO [ZooKeeperClient] Session expired. 
> (kafka.zookeeper.ZooKeeperClient)
>  [2018-07-14 04:38:26,459] INFO [ZooKeeperClient] Initializing a new session 
> to 
> *ZOOKEEPER_SERVER_1:PORT*,*ZOOKEEPER_SERVER_2:PORT*,*ZOOKEEPER_SERVER_3:PORT*,*ZOOKEEPER_SERVER_4:PORT*.
>  (kafka.zookeeper.ZooKeeperClient)
>  [2018-07-14 04:38:26,459] INFO Initiating client connection, 
> connectString=*ZOOKEEPER_SERVER_1:PORT*,*ZOOKEEPER_SERVER_2:PORT*,*ZOOKEEPER_SERVER_3:PORT*,*ZOOKEEPER_SERVER_4:PORT*
>  sessionTimeout=6000 
> watcher=kafka.zookeeper.ZooKeeperClient$ZooKeeperClientWatcher$@44821a96 
> (org.apache.zookeeper.ZooKeeper)
>  [2018-07-14 04:38:26,465] INFO Opening socket connection to server 
> *ZOOKEEPER_SERVER_1:PORT*. Will not attempt to authenticate using SASL 
> (unknown error) (org.apache.zookeeper.ClientCnxn)
>  [2018-07-14 04:38:26,477] INFO Socket connection established to 
> *ZOOKEEPER_SERVER_1:PORT*, initiating session 
> (org.apache.zookeeper.ClientCnxn)
>  [2018-07-14 04:38:26,484] INFO Session establishment complete on server 
> *ZOOKEEPER_SERVER_1:PORT*, sessionid = 0x4005b59eb6a, negotiated timeout 
> = 6000 (org.apache.zookeeper.ClientCnxn)
>  [2018-07-14 04:38:26,496] *INFO Creating /brokers/ids/1* (is it secure? 
> false) (kafka.zk.KafkaZkClient)
>  [2018-07-14 04:38:26,500] INFO Processing notification(s) to /config/changes 
> (kafka.common.ZkNodeChangeNotificationListener)
>  *[2018-07-14 04:38:26,547] ERROR Error while creating ephemeral at 
> /brokers/ids/1, node already exists and owner '216186131422332301' does not 
> match current session '288330817911521280' 
> (kafka.zk.KafkaZkClient$CheckedEphemeral)*
>  [2018-07-14 04:38:26,547] *INFO Result of znode creation at /brokers/ids/1 
> is: NODEEXISTS* (kafka.zk.KafkaZkClient)
>  [2018-07-14 04:38:26,559] ERROR Uncaught exception in scheduled task 
> 'isr-expiration' (kafka.utils.KafkaScheduler)
> org.apache.zookeeper.KeeperException$SessionExpiredException: KeeperErrorCode 
> = Session expired for 
> /brokers/topics/*TOPIC_NAME*/partitions/*PARTITION-#*/state
>  at org.apache.zookeeper.KeeperException.create(KeeperException.java:127

[jira] [Commented] (KAFKA-3821) Allow Kafka Connect source tasks to produce offset without writing to topics

2018-08-01 Thread Gunnar Morling (JIRA)


[ 
https://issues.apache.org/jira/browse/KAFKA-3821?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16564900#comment-16564900
 ] 

Gunnar Morling commented on KAFKA-3821:
---

One other alternative came to my mind which avoids modelling 
{{OffsetSourceRecord}} as a sub-class of {{SourceRecord}}. There could be this 
new method:

{code}
interface SourceTask {
default OffsetPosition getOffset() {
   return null;
}
}
{code}

This method would be called by Kafka Connect after every call to {{poll()}}. 
{{OffsetPosition}} would be a container holding a {{Map<String, ?> 
sourcePartition}} and a {{Map<String, ?> sourceOffset}} (the same types 
{{SourceRecord}} uses). The default implementation would be a default method 
returning null, i.e. the change would be backwards-compatible.

If a connector implements the new method, it can return its current source 
offset, without emitting another actual source record (by returning an empty 
list from {{poll()}}). This would address the two use cases we have in Debezium 
for this:

* Emit an offset indicating that an initial DB snapshot has been completed 
after the last snapshot record has been emitted
* Regularly emit the processed offsets from the source DB (e.g. the MySQL 
binlog position) even when we don't emit any actual source records for a 
longer period of time. Due to filter configuration (the user is only 
interested in capturing some of the tables from their source DB), we can 
process the DB logs for a long time with no way to communicate the processed 
offsets to Kafka Connect. This causes large parts of the log to be reprocessed 
after a connector restart and also forces the source DB to retain larger parts 
of the log than necessary.

Would that be an acceptable way forward? I've come to think that modelling 
{{OffsetSourceRecord}} just isn't right; it feels a bit like Java's {{Stack}} 
class, which extends {{Vector}} and thereby exposes lots of methods that 
shouldn't exist in its API.
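
To make the proposal concrete, here is a rough Java sketch of a task using the 
suggested hook. Everything named {{OffsetPosition}} / {{getOffset()}} is the 
proposed API from this comment, not something that exists in Kafka Connect 
today, and the class shown here is only illustrative:

{code}
import java.util.Collections;
import java.util.List;
import java.util.Map;
import org.apache.kafka.connect.source.SourceRecord;
import org.apache.kafka.connect.source.SourceTask;

public class BinlogSourceTask extends SourceTask {

    /** Possible shape of the proposed container (not an existing Connect class). */
    public static final class OffsetPosition {
        public final Map<String, ?> sourcePartition;
        public final Map<String, ?> sourceOffset;
        public OffsetPosition(Map<String, ?> sourcePartition, Map<String, ?> sourceOffset) {
            this.sourcePartition = sourcePartition;
            this.sourceOffset = sourceOffset;
        }
    }

    private Map<String, ?> sourcePartition;
    private Map<String, ?> sourceOffset;

    @Override
    public void start(Map<String, String> props) { /* open the binlog reader, set sourcePartition */ }

    @Override
    public List<SourceRecord> poll() throws InterruptedException {
        // May legitimately return an empty list (e.g. all events filtered out)
        // while sourceOffset still advances internally.
        return Collections.emptyList();
    }

    // Proposed hook, called by the framework after every poll(); returning null
    // (the default) would mean "no offset update".
    public OffsetPosition getOffset() {
        return new OffsetPosition(sourcePartition, sourceOffset);
    }

    @Override
    public void stop() { /* close the binlog reader */ }

    @Override
    public String version() { return "0.1"; }
}
{code}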

> Allow Kafka Connect source tasks to produce offset without writing to topics
> 
>
> Key: KAFKA-3821
> URL: https://issues.apache.org/jira/browse/KAFKA-3821
> Project: Kafka
>  Issue Type: Improvement
>  Components: KafkaConnect
>Affects Versions: 0.9.0.1
>Reporter: Randall Hauch
>Priority: Major
>  Labels: needs-kip
> Fix For: 2.1.0
>
>
> Provide a way for a {{SourceTask}} implementation to record a new offset for 
> a given partition without necessarily writing a source record to a topic.
> Consider a connector task that uses the same offset when producing an unknown 
> number of {{SourceRecord}} objects (e.g., it is taking a snapshot of a 
> database). Once the task completes those records, the connector wants to 
> update the offsets (e.g., the snapshot is complete) but has no more records 
> to be written to a topic. With this change, the task could simply supply an 
> updated offset.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (KAFKA-6963) KIP-310: Add a Kafka Source Connector to Kafka Connect

2018-08-01 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/KAFKA-6963?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16564863#comment-16564863
 ] 

ASF GitHub Bot commented on KAFKA-6963:
---

rhysmccaig opened a new pull request #5438: KAFKA-6963: KIP-310: Add a Kafka 
Source Connector to Kafka Connect
URL: https://github.com/apache/kafka/pull/5438
 
 
   Based on earlier work over at 
https://github.com/Comcast/MirrorTool-for-Kafka-Connect
   
   Testing Done:
   - Added unit tests for source connector/task
   - Tested locally on docker stack using various configuration options, task 
workers and topic characteristics


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


> KIP-310: Add a Kafka Source Connector to Kafka Connect
> --
>
> Key: KAFKA-6963
> URL: https://issues.apache.org/jira/browse/KAFKA-6963
> Project: Kafka
>  Issue Type: Improvement
>Reporter: Rhys Anthony McCaig
>Priority: Major
>
> This proposal introduces a new Kafka Connect Source Connector.
> See the KIP at 
> [KIP-310|https://cwiki.apache.org/confluence/display/KAFKA/KIP-310%3A+Add+a+Kafka+Source+Connector+to+Kafka+Connect]
>  



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)