[jira] [Commented] (KAFKA-6572) kafka-consumer-groups does not reset offsets to specified datetime correctly

2018-02-22 Thread huxihx (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-6572?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16374023#comment-16374023
 ] 

huxihx commented on KAFKA-6572:
---

[~slucas] Could it be that some records in those segments have no 
timestamps, or that you ran kafka-delete-records.sh at some point before?
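For what it's worth, one way to cross-check the lookup outside the tool is Consumer#offsetsForTimes, which, as far as I understand the tool, is the same timestamp-to-offset API that --to-datetime relies on. A minimal sketch (placeholder broker address, example partition and timestamp):

{code:java}
import java.util.Collections;
import java.util.Map;
import java.util.Properties;

import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.clients.consumer.OffsetAndTimestamp;
import org.apache.kafka.common.TopicPartition;

public class CheckOffsetForTime {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put("bootstrap.servers", "localhost:9092"); // placeholder
        props.put("key.deserializer", "org.apache.kafka.common.serialization.ByteArrayDeserializer");
        props.put("value.deserializer", "org.apache.kafka.common.serialization.ByteArrayDeserializer");
        try (KafkaConsumer<byte[], byte[]> consumer = new KafkaConsumer<>(props)) {
            TopicPartition tp = new TopicPartition("prod_in-ams03-geo-ca_access", 52);
            long ts = 1518220800000L; // 2018-02-10T00:00:00.000 UTC, for example
            Map<TopicPartition, OffsetAndTimestamp> result =
                    consumer.offsetsForTimes(Collections.singletonMap(tp, ts));
            // A null entry means no record with timestamp >= ts was found for
            // that partition (e.g. records without timestamps).
            System.out.println(result.get(tp));
        }
    }
}
{code}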

> kafka-consumer-groups does not reset offsets to specified datetime correctly
> 
>
> Key: KAFKA-6572
> URL: https://issues.apache.org/jira/browse/KAFKA-6572
> Project: Kafka
>  Issue Type: Bug
>Affects Versions: 1.0.0
>Reporter: Sharon Lucas
>Priority: Major
>
> With Kafka 1.0.0 we are seeing a problem using the kafka-consumer-groups.sh 
> --reset-offsets option to reset offsets to a specific date/time in our 
> production environment.
> We first tried to use the kafka-consumer-groups.sh command with the 
> --reset-offsets option and with option --to-datetime 2018-02-10T00:00:00.000 
> in our staging environment and it worked correctly.  Running the following 
> command changed it to start processing logs from February 12, 2018 (4 days 
> ago) for a topic that had a large lag.  We did a dry run to verify before 
> running with the --execute option.
> {code:java}
> root@mlpstagemon0101a:/# /opt/kafka/bin/kafka-consumer-groups.sh 
> --bootstrap-server NN.NNN.NN.NN:9092 --group logstash-elasticsearch-latest 
> --to-datetime 2018-02-12T00:00:00.000-06:00 --reset-offsets --topic 
> staging-mon01-rg-elasticsearch --execute{code}
> We stopped the kafka mirrors that process this topic before resetting the 
> offsets and started the kafka mirrors after resetting the offsets.  We 
> verified that it correctly started processing logs from February 12, 2018.
> Then we tried resetting offsets in a production environment for a topic that 
> had a very large lag using option --to-datetime 2018-02-10T00:00:00.000 and 
> it did not work as expected. We stopped the kafka mirrors that process this 
> topic before resetting the offsets and did a dry run to see what the new 
> offsets would be:
> {code:java}
> root@mlplon0401e:# /opt/kafka/bin/kafka-consumer-groups.sh --bootstrap-server 
> NN.N.NNN.NNN:9092 --group mirror-consumer-ams03-geo-earliest --to-datetime 
> 2018-02-10T00:00:00.000 --reset-offsets --topic prod_in-ams03-geo-ca_access
> Note: This will not show information about old Zookeeper-based consumers.
> ^@^@^@^@
> TOPIC  PARTITION  NEW-OFFSET
> prod_in-ams03-geo-ca_access    52 52084147
> prod_in-ams03-geo-ca_access    106    52154199
> prod_in-ams03-geo-ca_access    75 52148673
> prod_in-ams03-geo-ca_access    61 52130753
> prod_in-ams03-geo-ca_access    49 52151667
> prod_in-ams03-geo-ca_access    48 52145233
> prod_in-ams03-geo-ca_access    27 52092805
> prod_in-ams03-geo-ca_access    26 52139644
> prod_in-ams03-geo-ca_access    65 52157504
> prod_in-ams03-geo-ca_access    105    52166289
> prod_in-ams03-geo-ca_access    38 52160464
> prod_in-ams03-geo-ca_access    22 52093451
> prod_in-ams03-geo-ca_access    4  52151660
> prod_in-ams03-geo-ca_access    90 52160296
> prod_in-ams03-geo-ca_access    25 52161691
> prod_in-ams03-geo-ca_access    13 52145828
> prod_in-ams03-geo-ca_access    56 52162867
> prod_in-ams03-geo-ca_access    42 52072094
> prod_in-ams03-geo-ca_access    7  52069496
> prod_in-ams03-geo-ca_access    117    52087078
> prod_in-ams03-geo-ca_access    32 52073732
> prod_in-ams03-geo-ca_access    102    52082022
> prod_in-ams03-geo-ca_access    76 52141018
> prod_in-ams03-geo-ca_access    83 52154542
> prod_in-ams03-geo-ca_access    72 52095051
> prod_in-ams03-geo-ca_access    85 52149907
> prod_in-ams03-geo-ca_access    119    52134435
> prod_in-ams03-geo-ca_access    113    52159340
> prod_in-ams03-geo-ca_access    55 52146597
> prod_in-ams03-geo-ca_access    18 52149079
> prod_in-ams03-geo-ca_access    35 52149058
> prod_in-ams03-geo-ca_access    99 52143277
> prod_in-ams03-geo-ca_access    41 52158872
> prod_in-ams03-geo-ca_access    112    52083901
> prod_in-ams03-geo-ca_access    34 52137932
> prod_in-ams03-geo-ca_access    89 52158135
> prod_in-ams03-geo-ca_access    40 5212
> prod_in-ams03-geo-ca_access    53 52138400
> prod_in-ams03-geo-ca_access    19 52144966
> prod_in-ams03-geo-ca_access    44 52166404
> prod_in-ams03-geo-ca_access    31 52155685
> prod_in-ams03-geo-ca_access    10 52152151
> prod_in-ams03-geo-ca_access    98 52145378
> prod_in-ams03-geo-ca_access    69 52153436
> prod_in-ams03-geo-ca_access    92

[jira] [Resolved] (KAFKA-6578) Connect distributed and standalone worker 'main()' methods should catch and log all exceptions

2018-02-22 Thread Jason Gustafson (JIRA)

 [ 
https://issues.apache.org/jira/browse/KAFKA-6578?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Jason Gustafson resolved KAFKA-6578.

   Resolution: Fixed
Fix Version/s: 1.1.0

> Connect distributed and standalone worker 'main()' methods should catch and 
> log all exceptions
> --
>
> Key: KAFKA-6578
> URL: https://issues.apache.org/jira/browse/KAFKA-6578
> Project: Kafka
>  Issue Type: Bug
>  Components: KafkaConnect
>Affects Versions: 0.10.0.0
>Reporter: Randall Hauch
>Priority: Critical
> Fix For: 1.1.0
>
>
> Currently, the {{main}} methods in {{ConnectDistributed}} and 
> {{ConnectStandalone}} do not catch and log most of the potential exceptions. 
> That means that when such an exception does occur, Java terminates the 
> process and reports it to stderr, but the exception never appears in the log.
> We should add a try block around most of the existing code in the main method 
> to catch any Throwable exception, log it, and either rethrow it or explicitly 
> exit with a non-zero return code.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (KAFKA-6578) Connect distributed and standalone worker 'main()' methods should catch and log all exceptions

2018-02-22 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-6578?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16373981#comment-16373981
 ] 

ASF GitHub Bot commented on KAFKA-6578:
---

hachikuji closed pull request #4609: KAFKA-6578: Changed the Connect 
distributed and standalone main method to log all exceptions
URL: https://github.com/apache/kafka/pull/4609
 
 
   

This is a PR merged from a forked repository.
As GitHub hides the original diff on merge, it is displayed below for
the sake of provenance:


diff --git a/connect/runtime/src/main/java/org/apache/kafka/connect/cli/ConnectDistributed.java b/connect/runtime/src/main/java/org/apache/kafka/connect/cli/ConnectDistributed.java
index 3b7ec87f644..54854fe4b80 100644
--- a/connect/runtime/src/main/java/org/apache/kafka/connect/cli/ConnectDistributed.java
+++ b/connect/runtime/src/main/java/org/apache/kafka/connect/cli/ConnectDistributed.java
@@ -58,52 +58,59 @@ public static void main(String[] args) throws Exception {
             Exit.exit(1);
         }
 
-        Time time = Time.SYSTEM;
-        log.info("Kafka Connect distributed worker initializing ...");
-        long initStart = time.hiResClockMs();
-        WorkerInfo initInfo = new WorkerInfo();
-        initInfo.logAll();
+        try {
+            Time time = Time.SYSTEM;
+            log.info("Kafka Connect distributed worker initializing ...");
+            long initStart = time.hiResClockMs();
+            WorkerInfo initInfo = new WorkerInfo();
+            initInfo.logAll();
 
-        String workerPropsFile = args[0];
-        Map<String, String> workerProps = !workerPropsFile.isEmpty() ?
-                Utils.propsToStringMap(Utils.loadProps(workerPropsFile)) : Collections.emptyMap();
+            String workerPropsFile = args[0];
+            Map<String, String> workerProps = !workerPropsFile.isEmpty() ?
+                    Utils.propsToStringMap(Utils.loadProps(workerPropsFile)) : Collections.emptyMap();
 
-        log.info("Scanning for plugin classes. This might take a moment ...");
-        Plugins plugins = new Plugins(workerProps);
-        plugins.compareAndSwapWithDelegatingLoader();
-        DistributedConfig config = new DistributedConfig(workerProps);
+            log.info("Scanning for plugin classes. This might take a moment ...");
+            Plugins plugins = new Plugins(workerProps);
+            plugins.compareAndSwapWithDelegatingLoader();
+            DistributedConfig config = new DistributedConfig(workerProps);
 
-        String kafkaClusterId = ConnectUtils.lookupKafkaClusterId(config);
-        log.debug("Kafka cluster ID: {}", kafkaClusterId);
+            String kafkaClusterId = ConnectUtils.lookupKafkaClusterId(config);
+            log.debug("Kafka cluster ID: {}", kafkaClusterId);
 
-        RestServer rest = new RestServer(config);
-        URI advertisedUrl = rest.advertisedUrl();
-        String workerId = advertisedUrl.getHost() + ":" + advertisedUrl.getPort();
+            RestServer rest = new RestServer(config);
+            URI advertisedUrl = rest.advertisedUrl();
+            String workerId = advertisedUrl.getHost() + ":" + advertisedUrl.getPort();
 
-        KafkaOffsetBackingStore offsetBackingStore = new KafkaOffsetBackingStore();
-        offsetBackingStore.configure(config);
+            KafkaOffsetBackingStore offsetBackingStore = new KafkaOffsetBackingStore();
+            offsetBackingStore.configure(config);
 
-        Worker worker = new Worker(workerId, time, plugins, config, offsetBackingStore);
+            Worker worker = new Worker(workerId, time, plugins, config, offsetBackingStore);
 
-        Converter internalValueConverter = worker.getInternalValueConverter();
-        StatusBackingStore statusBackingStore = new KafkaStatusBackingStore(time, internalValueConverter);
-        statusBackingStore.configure(config);
+            Converter internalValueConverter = worker.getInternalValueConverter();
+            StatusBackingStore statusBackingStore = new KafkaStatusBackingStore(time, internalValueConverter);
+            statusBackingStore.configure(config);
 
-        ConfigBackingStore configBackingStore = new KafkaConfigBackingStore(internalValueConverter, config);
+            ConfigBackingStore configBackingStore = new KafkaConfigBackingStore(internalValueConverter, config);
 
-        DistributedHerder herder = new DistributedHerder(config, time, worker,
-                kafkaClusterId, statusBackingStore, configBackingStore,
-                advertisedUrl.toString());
-        final Connect connect = new Connect(herder, rest);
-        log.info("Kafka Connect distributed worker initialization took {}ms", time.hiResClockMs() - initStart);
-        try {
-            connect.start();
-        } catch (Exception e) {
-            log.err

[jira] [Commented] (KAFKA-6433) Connect distributed workers should fail if their config is "incompatible" with leader's

2018-02-22 Thread Ewen Cheslack-Postava (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-6433?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16373961#comment-16373961
 ] 

Ewen Cheslack-Postava commented on KAFKA-6433:
--

This needs a lot of thought around upgrades, compatibility, and debuggability. 
There are all sorts of weird issues you can get into with something like this.

I agree that the general goal of checking that important configs are aligned is 
absolutely the right thing to do. Today, unless I'm forgetting something, we 
basically only check that the group and config offset match. Lots of other 
things could potentially mismatch and cause problems.

But things "matching" can be tricky. Topic names are pretty straightforward and 
we can validate easily. Validating anything like "the same set of connectors" 
is tricky given both versioning and upgrading a cluster with a *new* connector. 
Same for converters and transformations. We'd need to define clear rules for 
what "compatibility" means here and when a node is allowed to run a 
connector/task. And who is the source of truth? Who defines what's new?

Personally, I'd argue it's actually clearer to have a log message saying 
"couldn't start connector X because class not found" from node Y than to have to 
determine why all connectors/tasks are running on node Z because node W wasn't 
allowed to join worker group N over some mismatch of connectors. The latter 
might fail faster, but the former tells you exactly what the problem is and 
leads to a clear resolution.

 

 

> Connect distributed workers should fail if their config is "incompatible" 
> with leader's
> ---
>
> Key: KAFKA-6433
> URL: https://issues.apache.org/jira/browse/KAFKA-6433
> Project: Kafka
>  Issue Type: Improvement
>  Components: KafkaConnect
>Affects Versions: 1.0.0
>Reporter: Randall Hauch
>Priority: Major
>  Labels: needs-kip
>
> Currently, each distributed worker config must have the same `worker.id` and 
> must use the same internal topics for configs, offsets, and status. 
> Additionally, each worker must be configured to have the same connectors, 
> SMTs, and converters; confusing error messages will result when some workers 
> are able to deploy connector tasks with SMTs while others fail when they are 
> missing plugins the other workers do have.
> Ideally, a Connect worker would only be allowed to join the cluster if it 
> were "compatible" with the existing cluster, where "compatible" perhaps 
> includes using the same internal topics and having the same set of plugins.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (KAFKA-6585) Consolidate duplicated logic on reset tools

2018-02-22 Thread Aditya Vivek (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-6585?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16373951#comment-16373951
 ] 

Aditya Vivek commented on KAFKA-6585:
-

Thanks Jason :)

> Consolidate duplicated logic on reset tools
> ---
>
> Key: KAFKA-6585
> URL: https://issues.apache.org/jira/browse/KAFKA-6585
> Project: Kafka
>  Issue Type: Improvement
>Reporter: Guozhang Wang
>Assignee: Aditya Vivek
>Priority: Minor
>  Labels: newbie
>
> The consumer reset tool and the streams reset tool today share a lot of common 
> logic, such as resetting to a datetime. We can consolidate it into a common 
> class that depends directly on the admin client and simply let these tools use 
> that class.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (KAFKA-5740) Use separate file for HTTP logs

2018-02-22 Thread Ewen Cheslack-Postava (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-5740?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16373949#comment-16373949
 ] 

Ewen Cheslack-Postava commented on KAFKA-5740:
--

Isn't this something that can be handled via log4j configs to separate the two 
types of log traffic? Is this about defaults or some other issue that can't be 
handled via log4j?
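(For instance, a log4j 1.x fragment along these lines should work, assuming the HTTP/request lines are emitted under a dedicated logger name; org.apache.kafka.connect.runtime.rest below is my guess and would need to match the actual logger:)

{code}
# Hypothetical fragment for connect-log4j.properties; the logger name is an
# assumption and must match whatever logger emits the HTTP request lines.
log4j.appender.httpAppender=org.apache.log4j.DailyRollingFileAppender
log4j.appender.httpAppender.File=${kafka.logs.dir}/connect-http.log
log4j.appender.httpAppender.layout=org.apache.log4j.PatternLayout
log4j.appender.httpAppender.layout.ConversionPattern=[%d] %p %m (%c)%n

log4j.logger.org.apache.kafka.connect.runtime.rest=INFO, httpAppender
# additivity=false keeps the HTTP lines out of the main Connect log
log4j.additivity.org.apache.kafka.connect.runtime.rest=false
{code}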

I'd make a strong *no* vote on removing the HTTP logs from the default Connect 
logs; they are far too useful when debugging issues. Just the HTTP status info 
often reveals something important (whether it's bad input triggering something 
in the 4xx range or something broken in Connect triggering the 5xx range).

> Use separate file for HTTP logs
> ---
>
> Key: KAFKA-5740
> URL: https://issues.apache.org/jira/browse/KAFKA-5740
> Project: Kafka
>  Issue Type: Improvement
>  Components: KafkaConnect
>Affects Versions: 0.11.0.0
>Reporter: Randall Hauch
>Priority: Critical
>  Labels: monitoring, usability
>
> Currently the HTTP logs are interspersed in the normal output. However, 
> client usage/requests should be logged to a separate file.
> Question: should the HTTP logs be *removed* from the normal Connect logs?



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (KAFKA-5575) SchemaBuilder should have a method to clone an existing Schema.

2018-02-22 Thread Ewen Cheslack-Postava (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-5575?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16373925#comment-16373925
 ] 

Ewen Cheslack-Postava commented on KAFKA-5575:
--

[~rhauch] I think I didn't trigger a vote on the KIP yet though. Really I just 
grab public API changes that someone's hesitating on KIPs for and write the KIP 
because I think the 15min it takes for the simple ones isn't a big deal and 
gives the community, and especially frequent, direct contributors, a chance to 
catch issues like incompatibilities. I think a few minutes investment + a 
couple of days turn-around time for a public API change in an API that's used 
very broadly is actually quite a low cost. In this case, there's really nothing 
I can see that would be a problem beyond what's already mentioned in the KIP, 
so I think we should just move forward w/ voting and execution.

> SchemaBuilder should have a method to clone an existing Schema.
> ---
>
> Key: KAFKA-5575
> URL: https://issues.apache.org/jira/browse/KAFKA-5575
> Project: Kafka
>  Issue Type: Improvement
>  Components: KafkaConnect
>Reporter: Jeremy Custenborder
>Assignee: Jeremy Custenborder
>Priority: Minor
>  Labels: needs-kip
>
> Now that Transformations have landed in Kafka Connect, we should have an easy 
> way to make quick modifications to schemas. For example, changing the name of a 
> schema shouldn't be much more than a one-liner. I should be able to do things 
> like this:
> {code:java}
> return SchemaBuilder.from(Schema.STRING_SCHEMA).name("MyNewName").build()
> {code}



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Issue Comment Deleted] (KAFKA-6583) Metadata should include number of state stores for task

2018-02-22 Thread Richard Yu (JIRA)

 [ 
https://issues.apache.org/jira/browse/KAFKA-6583?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Richard Yu updated KAFKA-6583:
--
Comment: was deleted

(was: This issue will block KAFKA-4696 until resolved.)

> Metadata should include number of state stores for task
> ---
>
> Key: KAFKA-6583
> URL: https://issues.apache.org/jira/browse/KAFKA-6583
> Project: Kafka
>  Issue Type: New Feature
>  Components: streams
>Affects Versions: 0.10.2.0, 0.11.0.0
>Reporter: Richard Yu
>Priority: Critical
>  Labels: needs-kip
>
> Currently, for clients to be more evenly balanced, stateful tasks should be 
> distributed in such a manner that they are spread equally. However, for such 
> awareness to be implemented during task assignment, the present rebalance 
> protocol metadata would also need to contain the number of state stores in a 
> particular task. This would allow us to "weight" tasks during assignment.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (KAFKA-6551) Unbounded queues in WorkerSourceTask cause OutOfMemoryError

2018-02-22 Thread Ewen Cheslack-Postava (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-6551?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16373906#comment-16373906
 ] 

Ewen Cheslack-Postava commented on KAFKA-6551:
--

Seems reasonable – this should only be an issue if producing to the topic is 
failing and we generate a large backlog, but very good point that this should 
be bounded, at least roughly, and pause poll()ing until it is resolved. A bit 
hard to say what the right metric for measurement is since this holds onto the 
entire record. Maybe # of records will work in practice just because you can 
set it to a reasonable default and never think about it again while still not 
hitting any OOMs. But any large messages could make that assumption fail.

> Unbounded queues in WorkerSourceTask cause OutOfMemoryError
> ---
>
> Key: KAFKA-6551
> URL: https://issues.apache.org/jira/browse/KAFKA-6551
> Project: Kafka
>  Issue Type: Bug
>  Components: KafkaConnect
>Reporter: Gunnar Morling
>Priority: Major
>
> A Debezium user reported an {{OutOfMemoryError}} to us, with over 50,000 
> messages in the {{WorkerSourceTask#outstandingMessages}} map.
> This map is unbounded and I can't see any way of "rate limiting" which would 
> control how many records are added to it. Growth can only indirectly be 
> limited by reducing the offset flush interval, but as connectors can return 
> large amounts of messages in single {{poll()}} calls that's not sufficient in 
> all cases. Note the user reported this issue during snapshotting a database, 
> i.e. a high number of records arrived in a very short period of time.
> To solve the problem I'd suggest making this map backpressure-aware and thus 
> preventing its indefinite growth, so that no further records will be polled from 
> the connector until messages have been taken out of the map again.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (KAFKA-5828) Allow explicitly setting polling interval for Kafka connectors

2018-02-22 Thread Ewen Cheslack-Postava (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-5828?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16373773#comment-16373773
 ] 

Ewen Cheslack-Postava commented on KAFKA-5828:
--

I don't think this is best solved at the Connect level. Could you just use 
[quotas|http://kafka.apache.org/documentation.html#design_quotas] to rate limit 
the connector?
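For example (values and client name illustrative only):

{code}
bin/kafka-configs.sh --zookeeper localhost:2181 --alter \
  --add-config 'consumer_byte_rate=1048576' \
  --entity-type clients --entity-name mirror-client
{code}

That caps fetch throughput for that client.id at roughly 1 MB/sec on the broker side, independent of how often the connector polls.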

> Allow explicitly setting polling interval for Kafka connectors
> --
>
> Key: KAFKA-5828
> URL: https://issues.apache.org/jira/browse/KAFKA-5828
> Project: Kafka
>  Issue Type: Improvement
>  Components: KafkaConnect
>Reporter: Behrang Saeedzadeh
>Priority: Major
>
> I have a Kafka cluster deployed in our internal data center and a Kafka 
> Connect server deployed in AWS that gets data records from a topic on the 
> Kafka cluster and writes them into a Kinesis stream.
> We want to ensure that our Kafka connect server does not saturate the 
> available bandwidth between our internal data center and AWS.
> Using {{max.partition.fetch.bytes}} we can limit the upper bound of data that 
> can be fetched in each poll call. If we can also configure the poll interval, 
> then we can limit how much data is transferred per partition per second.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (KAFKA-6583) Metadata should include number of state stores for task

2018-02-22 Thread Matthias J. Sax (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-6583?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16373759#comment-16373759
 ] 

Matthias J. Sax commented on KAFKA-6583:


Thanks for adding the comment. It's better to "Link" JIRAs to each other 
directly. You can find "Link" under the "More" button.

> Metadata should include number of state stores for task
> ---
>
> Key: KAFKA-6583
> URL: https://issues.apache.org/jira/browse/KAFKA-6583
> Project: Kafka
>  Issue Type: New Feature
>  Components: streams
>Affects Versions: 0.10.2.0, 0.11.0.0
>Reporter: Richard Yu
>Priority: Critical
>  Labels: needs-kip
>
> Currently, for clients to be more evenly balanced, stateful tasks should be 
> distributed in such a manner that they are spread equally. However, for such 
> awareness to be implemented during task assignment, the present rebalance 
> protocol metadata would also need to contain the number of state stores in a 
> particular task. This would allow us to "weight" tasks during assignment.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (KAFKA-6002) Kafka Connect Transform transforming JSON string into actual object

2018-02-22 Thread Ewen Cheslack-Postava (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-6002?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16373755#comment-16373755
 ] 

Ewen Cheslack-Postava commented on KAFKA-6002:
--

[~edvard.poliakov] Not sure if you made progress on this. The schema support in 
JsonConverter doesn't use json-schema.org style, partly because it is quite 
complicated. To be honest, the inline schema support with JSON is really more 
for demonstrative purposes – baking in and supporting a ton of formats adds a 
lot of overhead to the project, so we wanted to stick to shipping just one 
format with the framework and leave the rest to be community supported. 
However, that meant we needed to include something that could have both schema 
and schemaless modes in order to demonstrate both modes and ensure everything 
works with both modes. We ended up doing this with JSON and an ad hoc schema 
format. But generally when using a schema, you want something that doesn't need 
to ship the full schema inline with the message because that's quite 
heavyweight – often times the schema ends up larger than the message data 
itself!

For a complete JSON w/ schemas solution, I would probably suggest implementing 
Converters that look a lot like what Confluent has for Avro and using 
json-schema.org to express the schemas. The one difference is that now that we 
have headers, I'd put the schema ID information into a header instead and make 
the value just the JSON payload (whereas Avro has some additional framing in 
the value itself).

For a transformation that does this you *could* just omit the schema entirely. 
That is an option in Connect. Basically this would just mean that the transform 
only works when the user/connectors expect schemaless data.
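A minimal sketch of that schemaless variant, using Jackson and leaving out the Transformation plumbing (configure/config/close) for brevity; the class name and hard-coded field name are hypothetical:

{code:java}
import java.io.IOException;
import java.util.HashMap;
import java.util.Map;

import com.fasterxml.jackson.databind.ObjectMapper;

import org.apache.kafka.connect.connector.ConnectRecord;
import org.apache.kafka.connect.errors.DataException;

public class ParseJsonField<R extends ConnectRecord<R>> {
    private static final ObjectMapper MAPPER = new ObjectMapper();
    private final String field = "a"; // would come from the transform's config

    @SuppressWarnings("unchecked")
    public R apply(R record) {
        // Schemaless only: the value must already be a Map, not a Struct.
        Map<String, Object> value = new HashMap<>((Map<String, Object>) record.value());
        Object raw = value.get(field);
        if (raw instanceof String) {
            try {
                value.put(field, MAPPER.readValue((String) raw, Map.class));
            } catch (IOException e) {
                throw new DataException("Field " + field + " is not valid JSON", e);
            }
        }
        // null value schema keeps the record schemaless
        return record.newRecord(record.topic(), record.kafkaPartition(),
                record.keySchema(), record.key(), null, value, record.timestamp());
    }
}
{code}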

Regarding inference, you can also just do this on a per-message basis instead 
of continuously updating a schema. There is a risk that you end up with lots of 
schemas because of this (since each could be unique), but for a lot of cases 
that may not be expected. I also have an SMT that infers schemas, so it does 
something similar to what you'd need here: 
[https://github.com/ewencp/kafka/commit/3abb54a8062fe727ddaabc4dd5a552dd0b465a03].
 I didn't complete both modes, but the idea was to allow either inferring on a 
per-message basis *or* specifying a schema (whether the JsonConverter variant 
or json-schema.org style) and validating & adding it to the record. I think 
offering those two options in your SMT would give good flexibility as well.

> Kafka Connect Transform transforming JSON string into actual object
> ---
>
> Key: KAFKA-6002
> URL: https://issues.apache.org/jira/browse/KAFKA-6002
> Project: Kafka
>  Issue Type: Improvement
>  Components: KafkaConnect
>Reporter: Edvard Poliakov
>Priority: Minor
>
> My colleague and I have been working on a new Transform, that takes a JSON 
> string and transforms it into an actual object, like this:
> {code} 
> {
>   "a" : "{\"b\": 23}"
> }
> {code}
> into
> {code}
> {
>   "a" : {
>"b" : 23
>   }
> }
> {code}
> There is no robust way of building a Schema from a JSON object itself, as it 
> can be something like an empty array or a null, which doesn't provide any info 
> on the schema of the object. So I see two options here.
> 1. For a transform to take in the schema as a transform parameter. The problem I 
> found with this is that it is not clear which JSON schema specification should 
> be used. I assume it would be reasonable to use 
> http://json-schema.org/, but it doesn't seem that Kafka Connect supports it 
> currently; moreover, reading through the JsonConverter class in Kafka Connect, I 
> am not able to understand what spec the JSON schema used in that class 
> follows; for example, the {{asConnectSchema}} method on {{JsonConverter}} ([see 
> here|https://github.com/apache/kafka/blob/trunk/connect/json/src/main/java/org/apache/kafka/connect/json/JsonConverter.java#L415]).
> 2. On each object received, keep updating the schema, but I can't see a 
> standard and robust way of handling edge cases.
> I am happy to create a pull request for this transform, if we can agree on 
> something here. :)



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Updated] (KAFKA-6583) Metadata should include number of state stores for task

2018-02-22 Thread Richard Yu (JIRA)

 [ 
https://issues.apache.org/jira/browse/KAFKA-6583?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Richard Yu updated KAFKA-6583:
--
Issue Type: New Feature  (was: Improvement)

> Metadata should include number of state stores for task
> ---
>
> Key: KAFKA-6583
> URL: https://issues.apache.org/jira/browse/KAFKA-6583
> Project: Kafka
>  Issue Type: New Feature
>  Components: streams
>Affects Versions: 0.10.2.0, 0.11.0.0
>Reporter: Richard Yu
>Priority: Critical
>  Labels: needs-kip
>
> Currently, for clients to be more evenly balanced, stateful tasks should be 
> distributed in such a manner that they are spread equally. However, for such 
> awareness to be implemented during task assignment, the present rebalance 
> protocol metadata would also need to contain the number of state stores in a 
> particular task. This would allow us to "weight" tasks during assignment.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Comment Edited] (KAFKA-6583) Metadata should include number of state stores for task

2018-02-22 Thread Richard Yu (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-6583?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16373124#comment-16373124
 ] 

Richard Yu edited comment on KAFKA-6583 at 2/23/18 12:52 AM:
-

This issue will block KAFKA-4696 until resolved.


was (Author: yohan123):
This issue will block https://issues.apache.org/jira/browse/KAFKA-4696 until 
resolved.

> Metadata should include number of state stores for task
> ---
>
> Key: KAFKA-6583
> URL: https://issues.apache.org/jira/browse/KAFKA-6583
> Project: Kafka
>  Issue Type: Improvement
>  Components: streams
>Affects Versions: 0.10.2.0, 0.11.0.0
>Reporter: Richard Yu
>Priority: Critical
>  Labels: needs-kip
>
> Currently, for clients to be more evenly balanced, stateful tasks should be 
> distributed in such a manner that they are spread equally. However, for such 
> awareness to be implemented during task assignment, the present rebalance 
> protocol metadata would also need to contain the number of state stores in a 
> particular task. This would allow us to "weight" tasks during assignment.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Updated] (KAFKA-6583) Metadata should include number of state stores for task

2018-02-22 Thread Richard Yu (JIRA)

 [ 
https://issues.apache.org/jira/browse/KAFKA-6583?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Richard Yu updated KAFKA-6583:
--
Labels: needs-kip  (was: )

> Metadata should include number of state stores for task
> ---
>
> Key: KAFKA-6583
> URL: https://issues.apache.org/jira/browse/KAFKA-6583
> Project: Kafka
>  Issue Type: Improvement
>  Components: streams
>Affects Versions: 0.10.2.0, 0.11.0.0
>Reporter: Richard Yu
>Priority: Critical
>  Labels: needs-kip
>
> Currently, for clients to be more evenly balanced, stateful tasks should be 
> distributed in such a manner that they are spread equally. However, for such 
> awareness to be implemented during task assignment, the present rebalance 
> protocol metadata would also need to contain the number of state stores in a 
> particular task. This would allow us to "weight" tasks during assignment.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Assigned] (KAFKA-4696) Streams standby task assignment should be state-store aware

2018-02-22 Thread Richard Yu (JIRA)

 [ 
https://issues.apache.org/jira/browse/KAFKA-4696?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Richard Yu reassigned KAFKA-4696:
-

Assignee: Richard Yu

> Streams standby task assignment should be state-store aware
> ---
>
> Key: KAFKA-4696
> URL: https://issues.apache.org/jira/browse/KAFKA-4696
> Project: Kafka
>  Issue Type: Sub-task
>  Components: streams
>Affects Versions: 0.10.2.0, 0.11.0.0
>Reporter: Damian Guy
>Assignee: Richard Yu
>Priority: Major
>
> Task Assignment is currently not aware of which tasks have State Stores. This 
> can result in uneven balance of standby task assignment as all tasks are 
> assigned, but only those tasks with state-stores are ever created by 
> {{StreamThread}}. So what seems like an optimal strategy during assignment 
> time could be sub-optimal post-assignment.
> For example, lets say we have 4 tasks (2 with state-stores), 2 clients, 
> numStandbyReplicas = 1. Each client would get 2 active and 2 standby tasks.  
> One of the clients may end up with both state-store tasks, while the other 
> has none.
> Further to this, standby task configuration is currently "all or nothing". It 
> might make sense to allow more fine grained configuration, i.e., the ability 
> to specify the number of standby replicas individually for each stateful 
> operator.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (KAFKA-4696) Streams standby task assignment should be state-store aware

2018-02-22 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-4696?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16373589#comment-16373589
 ] 

ASF GitHub Bot commented on KAFKA-4696:
---

ConcurrencyPractitioner opened a new pull request #4615: [KAFKA-4696] Streams 
standby task assignment should be state-store aware
URL: https://github.com/apache/kafka/pull/4615
 
 
   


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


> Streams standby task assignment should be state-store aware
> ---
>
> Key: KAFKA-4696
> URL: https://issues.apache.org/jira/browse/KAFKA-4696
> Project: Kafka
>  Issue Type: Sub-task
>  Components: streams
>Affects Versions: 0.10.2.0, 0.11.0.0
>Reporter: Damian Guy
>Priority: Major
>
> Task Assignment is currently not aware of which tasks have State Stores. This 
> can result in uneven balance of standby task assignment as all tasks are 
> assigned, but only those tasks with state-stores are ever created by 
> {{StreamThread}}. So what seems like an optimal strategy during assignment 
> time could be sub-optimal post-assignment.
> For example, lets say we have 4 tasks (2 with state-stores), 2 clients, 
> numStandbyReplicas = 1. Each client would get 2 active and 2 standby tasks.  
> One of the clients may end up with both state-store tasks, while the other 
> has none.
> Further to this, standby task configuration is currently "all or nothing". It 
> might make sense to allow more fine grained configuration, i.e., the ability 
> to specify the number of standby replicas individually for each stateful 
> operator.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (KAFKA-6585) Consolidate duplicated logic on reset tools

2018-02-22 Thread Jason Gustafson (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-6585?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16373546#comment-16373546
 ] 

Jason Gustafson commented on KAFKA-6585:


[~adityavivek94] I went ahead and assigned to you. Of course Guozhang won't 
mind :).

> Consolidate duplicated logic on reset tools
> ---
>
> Key: KAFKA-6585
> URL: https://issues.apache.org/jira/browse/KAFKA-6585
> Project: Kafka
>  Issue Type: Improvement
>Reporter: Guozhang Wang
>Assignee: Aditya Vivek
>Priority: Minor
>  Labels: newbie
>
> The consumer reset tool and the streams reset tool today share a lot of common 
> logic, such as resetting to a datetime. We can consolidate it into a common 
> class that depends directly on the admin client and simply let these tools use 
> that class.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Assigned] (KAFKA-6585) Consolidate duplicated logic on reset tools

2018-02-22 Thread Jason Gustafson (JIRA)

 [ 
https://issues.apache.org/jira/browse/KAFKA-6585?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Jason Gustafson reassigned KAFKA-6585:
--

Assignee: Aditya Vivek

> Consolidate duplicated logic on reset tools
> ---
>
> Key: KAFKA-6585
> URL: https://issues.apache.org/jira/browse/KAFKA-6585
> Project: Kafka
>  Issue Type: Improvement
>Reporter: Guozhang Wang
>Assignee: Aditya Vivek
>Priority: Minor
>  Labels: newbie
>
> The consumer reset tool and the streams reset tool today share a lot of common 
> logic, such as resetting to a datetime. We can consolidate it into a common 
> class that depends directly on the admin client and simply let these tools use 
> that class.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (KAFKA-5157) Options for handling corrupt data during deserialization

2018-02-22 Thread Matthias J. Sax (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-5157?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16373432#comment-16373432
 ] 

Matthias J. Sax commented on KAFKA-5157:


It is by design. It's of course something different people have different 
opinions on, and both approaches have advantages/disadvantages. The design 
aligns with other components, like serializers/deserializers, that use the same 
pattern.

> Options for handling corrupt data during deserialization
> 
>
> Key: KAFKA-5157
> URL: https://issues.apache.org/jira/browse/KAFKA-5157
> Project: Kafka
>  Issue Type: Sub-task
>  Components: streams
>Reporter: Eno Thereska
>Assignee: Eno Thereska
>Priority: Major
>  Labels: kip, user-experience
> Fix For: 1.0.0
>
>
> When there is badly formatted data in the source topics, deserialization will 
> throw a runtime exception all the way to the user. And since deserialization 
> happens before the record is ever processed at the beginning of the topology, 
> today there is no way to handle such errors at the user-app level.
>  We should consider allowing users to handle such "poison pills" in a 
> customizable way.
> https://cwiki.apache.org/confluence/display/KAFKA/KIP-161%3A+streams+deserialization+exception+handlers
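For reference, a minimal custom handler against the KIP-161 interface that shipped in 1.0.0, logging and skipping poison pills (the class name is mine):

{code:java}
import java.util.Map;

import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.streams.errors.DeserializationExceptionHandler;
import org.apache.kafka.streams.processor.ProcessorContext;

public class LogAndSkipHandler implements DeserializationExceptionHandler {
    @Override
    public DeserializationHandlerResponse handle(ProcessorContext context,
                                                 ConsumerRecord<byte[], byte[]> record,
                                                 Exception exception) {
        System.err.printf("Skipping corrupt record at %s-%d offset %d: %s%n",
                record.topic(), record.partition(), record.offset(), exception);
        return DeserializationHandlerResponse.CONTINUE; // or FAIL
    }

    @Override
    public void configure(Map<String, ?> configs) {
        // no-op; read handler-specific settings here if needed
    }
}
{code}

It is enabled via the default.deserialization.exception.handler config (StreamsConfig.DEFAULT_DESERIALIZATION_EXCEPTION_HANDLER_CLASS_CONFIG).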



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (KAFKA-6585) Consolidate duplicated logic on reset tools

2018-02-22 Thread Aditya Vivek (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-6585?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16373401#comment-16373401
 ] 

Aditya Vivek commented on KAFKA-6585:
-

[~guozhang] Can I pick this up?

> Consolidate duplicated logic on reset tools
> ---
>
> Key: KAFKA-6585
> URL: https://issues.apache.org/jira/browse/KAFKA-6585
> Project: Kafka
>  Issue Type: Improvement
>Reporter: Guozhang Wang
>Priority: Minor
>  Labels: newbie
>
> The consumer reset tool and the streams reset tool today share a lot of common 
> logic, such as resetting to a datetime. We can consolidate it into a common 
> class that depends directly on the admin client and simply let these tools use 
> that class.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Updated] (KAFKA-6586) Refactor Connect executables

2018-02-22 Thread Randall Hauch (JIRA)

 [ 
https://issues.apache.org/jira/browse/KAFKA-6586?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Randall Hauch updated KAFKA-6586:
-
Description: The main methods in {{ConnectDistributed}} and 
{{ConnectStandalone}} have a lot of duplication, and it'd be good to refactor 
to centralize the logic. We can pull most of this logic into an abstract class 
that {{ConnectStandalone}} and {{ConnectDistributed}} both extend. At a glance, 
the differences between the two are different config and Herder implementations 
and some different initialization logic.  (was: The main methods in 
{{ConnectDistributed}} and {{ConnectStandalone}} have a lot of duplication, and 
it'd be good to refactor to centralize the logic.)

> Refactor Connect executables
> 
>
> Key: KAFKA-6586
> URL: https://issues.apache.org/jira/browse/KAFKA-6586
> Project: Kafka
>  Issue Type: Improvement
>  Components: KafkaConnect
>Reporter: Randall Hauch
>Priority: Minor
>
> The main methods in {{ConnectDistributed}} and {{ConnectStandalone}} have a 
> lot of duplication, and it'd be good to refactor to centralize the logic. We 
> can pull most of this logic into an abstract class that {{ConnectStandalone}} 
> and {{ConnectDistributed}} both extend. At a glance, the differences between 
> the two are different config and Herder implementations and some different 
> initialization logic.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Created] (KAFKA-6586) Refactor Connect executables

2018-02-22 Thread Randall Hauch (JIRA)
Randall Hauch created KAFKA-6586:


 Summary: Refactor Connect executables
 Key: KAFKA-6586
 URL: https://issues.apache.org/jira/browse/KAFKA-6586
 Project: Kafka
  Issue Type: Improvement
  Components: KafkaConnect
Reporter: Randall Hauch


The main methods in {{ConnectDistributed}} and {{ConnectStandalone}} have a lot 
of duplication, and it'd be good to refactor to centralize the logic.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Updated] (KAFKA-6585) Consolidate duplicated logic on reset tools

2018-02-22 Thread Jason Gustafson (JIRA)

 [ 
https://issues.apache.org/jira/browse/KAFKA-6585?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Jason Gustafson updated KAFKA-6585:
---
Summary: Consolidate duplicated logic on reset tools  (was: Sonsolidate 
duplicated logic on reset tools)

> Consolidate duplicated logic on reset tools
> ---
>
> Key: KAFKA-6585
> URL: https://issues.apache.org/jira/browse/KAFKA-6585
> Project: Kafka
>  Issue Type: Improvement
>Reporter: Guozhang Wang
>Priority: Minor
>  Labels: newbie
>
> The consumer reset tool and the streams reset tool today share a lot of common 
> logic, such as resetting to a datetime. We can consolidate it into a common 
> class that depends directly on the admin client and simply let these tools use 
> that class.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Created] (KAFKA-6585) Sonsolidate duplicated logic on reset tools

2018-02-22 Thread Guozhang Wang (JIRA)
Guozhang Wang created KAFKA-6585:


 Summary: Sonsolidate duplicated logic on reset tools
 Key: KAFKA-6585
 URL: https://issues.apache.org/jira/browse/KAFKA-6585
 Project: Kafka
  Issue Type: Improvement
Reporter: Guozhang Wang


The consumer reset tool and the streams reset tool today share a lot of common 
logic, such as resetting to a datetime. We can consolidate it into a common 
class that depends directly on the admin client and simply let these tools use 
that class.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Updated] (KAFKA-5157) Options for handling corrupt data during deserialization

2018-02-22 Thread Matthias J. Sax (JIRA)

 [ 
https://issues.apache.org/jira/browse/KAFKA-5157?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Matthias J. Sax updated KAFKA-5157:
---
Description: 
When there is badly formatted data in the source topics, deserialization will 
throw a runtime exception all the way to the user. And since deserialization 
happens before the record is ever processed at the beginning of the topology, 
today there is no way to handle such errors at the user-app level.
 We should consider allowing users to handle such "poison pills" in a 
customizable way.

https://cwiki.apache.org/confluence/display/KAFKA/KIP-161%3A+streams+deserialization+exception+handlers

  was:
When there is badly formatted data in the source topics, deserialization will 
throw a runtime exception all the way to the user. And since deserialization 
happens before the record is ever processed at the beginning of the topology, 
today there is no way to handle such errors at the user-app level.
We should consider allowing users to handle such "poison pills" in a 
customizable way.


> Options for handling corrupt data during deserialization
> 
>
> Key: KAFKA-5157
> URL: https://issues.apache.org/jira/browse/KAFKA-5157
> Project: Kafka
>  Issue Type: Sub-task
>  Components: streams
>Reporter: Eno Thereska
>Assignee: Eno Thereska
>Priority: Major
>  Labels: kip, user-experience
> Fix For: 1.0.0
>
>
> When there is badly formatted data in the source topics, deserialization will 
> throw a runtime exception all the way to the user. And since deserialization 
> happens before the record is ever processed at the beginning of the topology, 
> today there is no way to handle such errors at the user-app level.
>  We should consider allowing users to handle such "poison pills" in a 
> customizable way.
> https://cwiki.apache.org/confluence/display/KAFKA/KIP-161%3A+streams+deserialization+exception+handlers



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Updated] (KAFKA-5157) Options for handling corrupt data during deserialization

2018-02-22 Thread Matthias J. Sax (JIRA)

 [ 
https://issues.apache.org/jira/browse/KAFKA-5157?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Matthias J. Sax updated KAFKA-5157:
---
Labels: kip user-experience  (was: user-experience)

> Options for handling corrupt data during deserialization
> 
>
> Key: KAFKA-5157
> URL: https://issues.apache.org/jira/browse/KAFKA-5157
> Project: Kafka
>  Issue Type: Sub-task
>  Components: streams
>Reporter: Eno Thereska
>Assignee: Eno Thereska
>Priority: Major
>  Labels: kip, user-experience
> Fix For: 1.0.0
>
>
> When there is badly formatted data in the source topics, deserialization will 
> throw a runtime exception all the way to the user. And since deserialization 
> happens before the record is ever processed at the beginning of the topology, 
> today there is no way to handle such errors at the user-app level.
>  We should consider allowing users to handle such "poison pills" in a 
> customizable way.
> https://cwiki.apache.org/confluence/display/KAFKA/KIP-161%3A+streams+deserialization+exception+handlers



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (KAFKA-6583) Metadata should include number of state stores for task

2018-02-22 Thread Richard Yu (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-6583?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16373124#comment-16373124
 ] 

Richard Yu commented on KAFKA-6583:
---

This issue will block https://issues.apache.org/jira/browse/KAFKA-4696 until 
resolved.

> Metadata should include number of state stores for task
> ---
>
> Key: KAFKA-6583
> URL: https://issues.apache.org/jira/browse/KAFKA-6583
> Project: Kafka
>  Issue Type: Improvement
>  Components: streams
>Affects Versions: 0.10.2.0, 0.11.0.0
>Reporter: Richard Yu
>Priority: Critical
>
> Currently, for clients to be more evenly balanced, stateful tasks should be 
> distributed in such a manner that they are spread equally. However, for such 
> awareness to be implemented during task assignment, the present rebalance 
> protocol metadata would also need to contain the number of state stores in a 
> particular task. This would allow us to "weight" tasks during assignment.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Updated] (KAFKA-4696) Streams standby task assignment should be state-store aware

2018-02-22 Thread Richard Yu (JIRA)

 [ 
https://issues.apache.org/jira/browse/KAFKA-4696?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Richard Yu updated KAFKA-4696:
--
External issue URL:   (was: 
https://issues.apache.org/jira/browse/KAFKA-6583)

> Streams standby task assignment should be state-store aware
> ---
>
> Key: KAFKA-4696
> URL: https://issues.apache.org/jira/browse/KAFKA-4696
> Project: Kafka
>  Issue Type: Sub-task
>  Components: streams
>Affects Versions: 0.10.2.0, 0.11.0.0
>Reporter: Damian Guy
>Priority: Major
>
> Task Assignment is currently not aware of which tasks have State Stores. This 
> can result in uneven balance of standby task assignment as all tasks are 
> assigned, but only those tasks with state-stores are ever created by 
> {{StreamThread}}. So what seems like an optimal strategy during assignment 
> time could be sub-optimal post-assignment.
> For example, lets say we have 4 tasks (2 with state-stores), 2 clients, 
> numStandbyReplicas = 1. Each client would get 2 active and 2 standby tasks.  
> One of the clients may end up with both state-store tasks, while the other 
> has none.
> Further to this, standby task configuration is currently "all or nothing". It 
> might make sense to allow more fine grained configuration, i.e., the ability 
> to specify the number of standby replicas individually for each stateful 
> operator.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Updated] (KAFKA-4696) Streams standby task assignment should be state-store aware

2018-02-22 Thread Richard Yu (JIRA)

 [ 
https://issues.apache.org/jira/browse/KAFKA-4696?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Richard Yu updated KAFKA-4696:
--
External issue URL: https://issues.apache.org/jira/browse/KAFKA-6583

> Streams standby task assignment should be state-store aware
> ---
>
> Key: KAFKA-4696
> URL: https://issues.apache.org/jira/browse/KAFKA-4696
> Project: Kafka
>  Issue Type: Sub-task
>  Components: streams
>Affects Versions: 0.10.2.0, 0.11.0.0
>Reporter: Damian Guy
>Priority: Major
>
> Task Assignment is currently not aware of which tasks have State Stores. This 
> can result in uneven balance of standby task assignment as all tasks are 
> assigned, but only those tasks with state-stores are ever created by 
> {{StreamThread}}. So what seems like an optimal strategy during assignment 
> time could be sub-optimal post-assignment.
> For example, lets say we have 4 tasks (2 with state-stores), 2 clients, 
> numStandbyReplicas = 1. Each client would get 2 active and 2 standby tasks.  
> One of the clients may end up with both state-store tasks, while the other 
> has none.
> Further to this, standby task configuration is currently "all or nothing". It 
> might make sense to allow more fine grained configuration, i.e., the ability 
> to specify the number of standby replicas individually for each stateful 
> operator.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Created] (KAFKA-6584) Session expiration concurrent with ZooKeeper leadership failover may lead to broker registration failure

2018-02-22 Thread Chris Thunes (JIRA)
Chris Thunes created KAFKA-6584:
---

 Summary: Session expiration concurrent with ZooKeeper leadership 
failover may lead to broker registration failure
 Key: KAFKA-6584
 URL: https://issues.apache.org/jira/browse/KAFKA-6584
 Project: Kafka
  Issue Type: Bug
Affects Versions: 1.0.0
Reporter: Chris Thunes


It seems that an edge case exists which can lead to sessions "un-expiring" 
during a ZooKeeper leadership failover. Additional details can be found in 
ZOOKEEPER-2985.

This leads to a NODEEXISTS error when attempting to re-create the ephemeral 
brokers/ids/\{id} node in ZkUtils.registerBrokerInZk. We experienced this issue 
on each node within a 3-node Kafka cluster running 1.0.0. All three nodes 
continued running (producers and consumers appeared unaffected), but none of 
the nodes were considered online and partition leadership could not be 
re-assigned.

I took a quick look at trunk and I believe the issue is still present, but it has 
moved into KafkaZkClient.checkedEphemeralCreate, which will [raise an 
error|https://github.com/apache/kafka/blob/90e0bbe/core/src/main/scala/kafka/zk/KafkaZkClient.scala#L1512]
 when it finds that the brokers/ids/\{id} node exists but belongs to the old 
(believed expired) session.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Updated] (KAFKA-6583) Metadata should include number of state stores for task

2018-02-22 Thread Richard Yu (JIRA)

 [ 
https://issues.apache.org/jira/browse/KAFKA-6583?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Richard Yu updated KAFKA-6583:
--
Affects Version/s: 0.10.2.0
   0.11.0.0

> Metadata should include number of state stores for task
> ---
>
> Key: KAFKA-6583
> URL: https://issues.apache.org/jira/browse/KAFKA-6583
> Project: Kafka
>  Issue Type: Improvement
>  Components: streams
>Affects Versions: 0.10.2.0, 0.11.0.0
>Reporter: Richard Yu
>Priority: Critical
>
> Currently, for clients to be more evenly balanced, stateful tasks should be 
> distributed in such a manner that they are spread equally. However, for such 
> awareness to be implemented during task assignment, the present rebalance 
> protocol metadata would also need to contain the number of state stores in a 
> particular task. This would allow us to "weight" tasks during assignment.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Created] (KAFKA-6583) Metadata should include number of state stores for task

2018-02-22 Thread Richard Yu (JIRA)
Richard Yu created KAFKA-6583:
-

 Summary: Metadata should include number of state stores for task
 Key: KAFKA-6583
 URL: https://issues.apache.org/jira/browse/KAFKA-6583
 Project: Kafka
  Issue Type: Improvement
Reporter: Richard Yu


Currently, for clients to be more evenly balanced, stateful tasks should be 
distributed in such a manner that they are spread equally. However, for such 
awareness to be implemented during task assignment, the present rebalance 
protocol metadata would also need to contain the number of state stores in a 
particular task. This would allow us to "weight" tasks during assignment.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Updated] (KAFKA-6583) Metadata should include number of state stores for task

2018-02-22 Thread Richard Yu (JIRA)

 [ 
https://issues.apache.org/jira/browse/KAFKA-6583?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Richard Yu updated KAFKA-6583:
--
Component/s: streams

> Metadata should include number of state stores for task
> ---
>
> Key: KAFKA-6583
> URL: https://issues.apache.org/jira/browse/KAFKA-6583
> Project: Kafka
>  Issue Type: Improvement
>  Components: streams
>Affects Versions: 0.10.2.0, 0.11.0.0
>Reporter: Richard Yu
>Priority: Critical
>
> Currently, for clients to be more evenly balanced, stateful tasks should be 
> distributed in such a manner that they are spread equally. However, for such 
> awareness to be implemented during task assignment, the present rebalance 
> protocol metadata would also need to contain the number of state stores in a 
> particular task. This would allow us to "weight" tasks during assignment.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Comment Edited] (KAFKA-6572) kafka-consumer-groups does not reset offsets to specified datetime correctly

2018-02-22 Thread Sharon Lucas (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-6572?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16372986#comment-16372986
 ] 

Sharon Lucas edited comment on KAFKA-6572 at 2/22/18 4:03 PM:
--

[~huxi_2b]  No.  There were tons of messages whose timestamp was greater than 
the timestamp we specified.


was (Author: slucas):
No.  There were tons of messages whose timestamp was greater than the timestamp 
we specified.

> kafka-consumer-groups does not reset offsets to specified datetime correctly
> 
>
> Key: KAFKA-6572
> URL: https://issues.apache.org/jira/browse/KAFKA-6572
> Project: Kafka
>  Issue Type: Bug
>Affects Versions: 1.0.0
>Reporter: Sharon Lucas
>Priority: Major
>
> With Kafka 1.0.0 we are seeing a problem using the kafka-consumer-groups.sh 
> --reset-offsets option to reset offsets to a specific date/time in our 
> production environment.
> We first tried to use the kafka-consumer-groups.sh command with the 
> --reset-offsets option and with option --to-datetime 2018-02-10T00:00:00.000 
> in our staging environment and it worked correctly.  Running the following 
> command changed it to start processing logs from February 12, 2018 (4 days 
> ago) for a topic that had a large lag.  We did a dry run to verify before 
> running with the --execute option.
> {code:java}
> root@mlpstagemon0101a:/# /opt/kafka/bin/kafka-consumer-groups.sh 
> --bootstrap-server NN.NNN.NN.NN:9092 --group logstash-elasticsearch-latest 
> --to-datetime 2018-02-12T00:00:00.000-06:00 --reset-offsets --topic 
> staging-mon01-rg-elasticsearch --execute{code}
> We stopped the kafka mirrors that process this topic before resetting the 
> offsets and started the kafka mirrors after resetting the offsets.  We 
> verified that it correctly started processing logs from February 12, 2018.
> Then we tried resetting offsets in a production environment for a topic that 
> had a very large lag using option --to-datetime 2018-02-10T00:00:00.000 and 
> it did not work as expected. We stopped the kafka mirrors that process this 
> topic before resetting the offsets and did a dry run to see what the new 
> offsets would be:
> {code:java}
> root@mlplon0401e:# /opt/kafka/bin/kafka-consumer-groups.sh --bootstrap-server 
> NN.N.NNN.NNN:9092 --group mirror-consumer-ams03-geo-earliest --to-datetime 
> 2018-02-10T00:00:00.000 --reset-offsets --topic prod_in-ams03-geo-ca_access
> Note: This will not show information about old Zookeeper-based consumers.
> ^@^@^@^@
> TOPIC  PARTITION  NEW-OFFSET
> prod_in-ams03-geo-ca_access    52 52084147
> prod_in-ams03-geo-ca_access    106    52154199
> prod_in-ams03-geo-ca_access    75 52148673
> prod_in-ams03-geo-ca_access    61 52130753
> prod_in-ams03-geo-ca_access    49 52151667
> prod_in-ams03-geo-ca_access    48 52145233
> prod_in-ams03-geo-ca_access    27 52092805
> prod_in-ams03-geo-ca_access    26 52139644
> prod_in-ams03-geo-ca_access    65 52157504
> prod_in-ams03-geo-ca_access    105    52166289
> prod_in-ams03-geo-ca_access    38 52160464
> prod_in-ams03-geo-ca_access    22 52093451
> prod_in-ams03-geo-ca_access    4  52151660
> prod_in-ams03-geo-ca_access    90 52160296
> prod_in-ams03-geo-ca_access    25 52161691
> prod_in-ams03-geo-ca_access    13 52145828
> prod_in-ams03-geo-ca_access    56 52162867
> prod_in-ams03-geo-ca_access    42 52072094
> prod_in-ams03-geo-ca_access    7  52069496
> prod_in-ams03-geo-ca_access    117    52087078
> prod_in-ams03-geo-ca_access    32 52073732
> prod_in-ams03-geo-ca_access    102    52082022
> prod_in-ams03-geo-ca_access    76 52141018
> prod_in-ams03-geo-ca_access    83 52154542
> prod_in-ams03-geo-ca_access    72 52095051
> prod_in-ams03-geo-ca_access    85 52149907
> prod_in-ams03-geo-ca_access    119    52134435
> prod_in-ams03-geo-ca_access    113    52159340
> prod_in-ams03-geo-ca_access    55 52146597
> prod_in-ams03-geo-ca_access    18 52149079
> prod_in-ams03-geo-ca_access    35 52149058
> prod_in-ams03-geo-ca_access    99 52143277
> prod_in-ams03-geo-ca_access    41 52158872
> prod_in-ams03-geo-ca_access    112    52083901
> prod_in-ams03-geo-ca_access    34 52137932
> prod_in-ams03-geo-ca_access    89 52158135
> prod_in-ams03-geo-ca_access    40 5212
> prod_in-ams03-geo-ca_access    53 52138400
> prod_in-ams03-geo-ca_access    19 52144966
> prod_in-ams03-geo-ca_access    44 52166404
> prod_in-ams03-geo-ca_access    31 52155685
> prod_in-ams03-geo-ca_access    10 52

[jira] [Commented] (KAFKA-6572) kafka-consumer-groups does not reset offsets to specified datetime correctly

2018-02-22 Thread Sharon Lucas (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-6572?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16372986#comment-16372986
 ] 

Sharon Lucas commented on KAFKA-6572:
-

No.  There were tons of messages whose timestamp was greater than the timestamp 
we specified.

> kafka-consumer-groups does not reset offsets to specified datetime correctly
> 
>
> Key: KAFKA-6572
> URL: https://issues.apache.org/jira/browse/KAFKA-6572
> Project: Kafka
>  Issue Type: Bug
>Affects Versions: 1.0.0
>Reporter: Sharon Lucas
>Priority: Major
>
> With Kafka 1.0.0 we are seeing a problem using the kafka-consumer-groups.sh 
> --reset-offsets option to reset offsets to a specific date/time in our 
> production environment.
> We first tried to use the kafka-consumer-groups.sh command with the 
> --reset-offsets option and with option --to-datetime 2018-02-10T00:00:00.000 
> in our staging environment and it worked correctly.  Running the following 
> command changed it to start processing logs from February 12, 2018 (4 days 
> ago) for a topic that had a large lag.  We did a dry run to verify before 
> running with the --execute option.
> {code:java}
> root@mlpstagemon0101a:/# /opt/kafka/bin/kafka-consumer-groups.sh 
> --bootstrap-server NN.NNN.NN.NN:9092 --group logstash-elasticsearch-latest 
> --to-datetime 2018-02-12T00:00:00.000-06:00 --reset-offsets --topic 
> staging-mon01-rg-elasticsearch --execute{code}
> We stopped the kafka mirrors that process this topic before resetting the 
> offsets and started the kafka mirrors after resetting the offsets.  We 
> verified that it correctly started processing logs from February 12, 2018.
> Then we tried resetting offsets in a production environment for a topic that 
> had a very large lag using option --to-datetime 2018-02-10T00:00:00.000 and 
> it did not work as expected. We stopped the kafka mirrors that process this 
> topic before resetting the offsets and did a dry run to see what the new 
> offsets would be:
> {code:java}
> root@mlplon0401e:# /opt/kafka/bin/kafka-consumer-groups.sh --bootstrap-server 
> NN.N.NNN.NNN:9092 --group mirror-consumer-ams03-geo-earliest --to-datetime 
> 2018-02-10T00:00:00.000 --reset-offsets --topic prod_in-ams03-geo-ca_access
> Note: This will not show information about old Zookeeper-based consumers.
> ^@^@^@^@
> TOPIC  PARTITION  NEW-OFFSET
> prod_in-ams03-geo-ca_access    52 52084147
> prod_in-ams03-geo-ca_access    106    52154199
> prod_in-ams03-geo-ca_access    75 52148673
> prod_in-ams03-geo-ca_access    61 52130753
> prod_in-ams03-geo-ca_access    49 52151667
> prod_in-ams03-geo-ca_access    48 52145233
> prod_in-ams03-geo-ca_access    27 52092805
> prod_in-ams03-geo-ca_access    26 52139644
> prod_in-ams03-geo-ca_access    65 52157504
> prod_in-ams03-geo-ca_access    105    52166289
> prod_in-ams03-geo-ca_access    38 52160464
> prod_in-ams03-geo-ca_access    22 52093451
> prod_in-ams03-geo-ca_access    4  52151660
> prod_in-ams03-geo-ca_access    90 52160296
> prod_in-ams03-geo-ca_access    25 52161691
> prod_in-ams03-geo-ca_access    13 52145828
> prod_in-ams03-geo-ca_access    56 52162867
> prod_in-ams03-geo-ca_access    42 52072094
> prod_in-ams03-geo-ca_access    7  52069496
> prod_in-ams03-geo-ca_access    117    52087078
> prod_in-ams03-geo-ca_access    32 52073732
> prod_in-ams03-geo-ca_access    102    52082022
> prod_in-ams03-geo-ca_access    76 52141018
> prod_in-ams03-geo-ca_access    83 52154542
> prod_in-ams03-geo-ca_access    72 52095051
> prod_in-ams03-geo-ca_access    85 52149907
> prod_in-ams03-geo-ca_access    119    52134435
> prod_in-ams03-geo-ca_access    113    52159340
> prod_in-ams03-geo-ca_access    55 52146597
> prod_in-ams03-geo-ca_access    18 52149079
> prod_in-ams03-geo-ca_access    35 52149058
> prod_in-ams03-geo-ca_access    99 52143277
> prod_in-ams03-geo-ca_access    41 52158872
> prod_in-ams03-geo-ca_access    112    52083901
> prod_in-ams03-geo-ca_access    34 52137932
> prod_in-ams03-geo-ca_access    89 52158135
> prod_in-ams03-geo-ca_access    40 5212
> prod_in-ams03-geo-ca_access    53 52138400
> prod_in-ams03-geo-ca_access    19 52144966
> prod_in-ams03-geo-ca_access    44 52166404
> prod_in-ams03-geo-ca_access    31 52155685
> prod_in-ams03-geo-ca_access    10 52152151
> prod_in-ams03-geo-ca_access    98 52145378
> prod_in-ams03-geo-ca_access    69 52153436
> prod_in-ams03-geo-ca_access    92 52093455
> prod_in-ams0

[jira] [Commented] (KAFKA-3450) Producer blocks on send to topic that doesn't exist if auto create is disabled

2018-02-22 Thread JIRA

[ 
https://issues.apache.org/jira/browse/KAFKA-3450?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16372875#comment-16372875
 ] 

Josef Ludvíček commented on KAFKA-3450:
---

Same problem with 0.11.0.1

Is there any plan to resolve this issue, or is there any recommended workaround? 
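
A common mitigation until this is addressed (a sketch under assumptions, not an 
official fix): lower {{max.block.ms}} so sends fail fast, as the test below 
already does, and/or verify up front that the topic exists using the 
{{AdminClient}} (available since 0.11). The bootstrap address and topic name 
here are placeholders:

{code:java}
import java.util.Properties;
import java.util.Set;
import org.apache.kafka.clients.admin.AdminClient;
import org.apache.kafka.clients.admin.AdminClientConfig;

public class TopicExistsCheck {
    public static void main(String[] args) throws Exception {
        Properties props = new Properties();
        props.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092"); // placeholder
        try (AdminClient admin = AdminClient.create(props)) {
            // Fail fast instead of letting producer.send() block for max.block.ms.
            Set<String> topics = admin.listTopics().names().get();
            if (!topics.contains("ThisTopicDoesNotExist")) { // placeholder topic
                throw new IllegalStateException(
                        "Topic is missing and auto topic creation is disabled");
            }
        }
    }
}
{code}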

> Producer blocks on send to topic that doesn't exist if auto create is disabled
> --
>
> Key: KAFKA-3450
> URL: https://issues.apache.org/jira/browse/KAFKA-3450
> Project: Kafka
>  Issue Type: Bug
>  Components: producer 
>Affects Versions: 0.9.0.1
>Reporter: Michal Turek
>Priority: Critical
>
> {{producer.send()}} is blocked for {{max.block.ms}} (default 60 seconds) if 
> the destination topic doesn't exist and if their automatic creation is 
> disabled. Warning from NetworkClient containing UNKNOWN_TOPIC_OR_PARTITION is 
> logged every 100 ms in a loop until the 60 seconds timeout expires, but the 
> operation is not recoverable.
> Preconditions
> - Kafka 0.9.0.1 with default configuration and auto.create.topics.enable=false
> - Kafka 0.9.0.1 clients.
> Example minimalist code
> https://github.com/avast/kafka-tests/blob/master/src/main/java/com/avast/kafkatests/othertests/nosuchtopic/NoSuchTopicTest.java
> {noformat}
> /**
>  * Test of sending to a topic that does not exist while automatic creation of
>  * topics is disabled in Kafka (auto.create.topics.enable=false).
>  */
> public class NoSuchTopicTest {
>     private static final Logger LOGGER = LoggerFactory.getLogger(NoSuchTopicTest.class);
>
>     public static void main(String[] args) {
>         Properties properties = new Properties();
>         properties.setProperty(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
>         properties.setProperty(ProducerConfig.CLIENT_ID_CONFIG, NoSuchTopicTest.class.getSimpleName());
>         properties.setProperty(ProducerConfig.MAX_BLOCK_MS_CONFIG, "1000"); // Default is 60 seconds
>
>         try (Producer<String, String> producer = new KafkaProducer<>(properties, new StringSerializer(), new StringSerializer())) {
>             LOGGER.info("Sending message");
>             producer.send(new ProducerRecord<>("ThisTopicDoesNotExist", "key", "value"), (metadata, exception) -> {
>                 if (exception != null) {
>                     LOGGER.error("Send failed: {}", exception.toString());
>                 } else {
>                     LOGGER.info("Send successful: {}-{}/{}", metadata.topic(), metadata.partition(), metadata.offset());
>                 }
>             });
>
>             LOGGER.info("Sending message");
>             producer.send(new ProducerRecord<>("ThisTopicDoesNotExistToo", "key", "value"), (metadata, exception) -> {
>                 if (exception != null) {
>                     LOGGER.error("Send failed: {}", exception.toString());
>                 } else {
>                     LOGGER.info("Send successful: {}-{}/{}", metadata.topic(), metadata.partition(), metadata.offset());
>                 }
>             });
>         }
>     }
> }
> {noformat}
> Related output
> {noformat}
> 2016-03-23 12:44:37.725 INFO  c.a.k.o.nosuchtopic.NoSuchTopicTest [main]: 
> Sending message (NoSuchTopicTest.java:26)
> 2016-03-23 12:44:37.830 WARN  o.a.kafka.clients.NetworkClient 
> [kafka-producer-network-thread | NoSuchTopicTest]: Error while fetching 
> metadata with correlation id 0 : 
> {ThisTopicDoesNotExist=UNKNOWN_TOPIC_OR_PARTITION} (NetworkClient.java:582)
> 2016-03-23 12:44:37.928 WARN  o.a.kafka.clients.NetworkClient 
> [kafka-producer-network-thread | NoSuchTopicTest]: Error while fetching 
> metadata with correlation id 1 : 
> {ThisTopicDoesNotExist=UNKNOWN_TOPIC_OR_PARTITION} (NetworkClient.java:582)
> 2016-03-23 12:44:38.028 WARN  o.a.kafka.clients.NetworkClient 
> [kafka-producer-network-thread | NoSuchTopicTest]: Error while fetching 
> metadata with correlation id 2 : 
> {ThisTopicDoesNotExist=UNKNOWN_TOPIC_OR_PARTITION} (NetworkClient.java:582)
> 2016-03-23 12:44:38.130 WARN  o.a.kafka.clients.NetworkClient 
> [kafka-producer-network-thread | NoSuchTopicTest]: Error while fetching 
> metadata with correlation id 3 : 
> {ThisTopicDoesNotExist=UNKNOWN_TOPIC_OR_PARTITION} (NetworkClient.java:582)
> 2016-03-23 12:44:38.231 WARN  o.a.kafka.clients.NetworkClient 
> [kafka-producer-network-thread | NoSuchTopicTest]: Error while fetching 
> metadata with correlation id 4 : 
> {ThisTopicDoesNotExist=UNKNOWN_TOPIC_OR_PARTITION} (NetworkClient.java:582)
> 2016-03-23 12:44:38.332 WARN  o.a.kafka.clients.NetworkClient 
> [kafka-producer-network-thread | NoSuchTopicTest]: Error while fetching 
> metadata with correlation id 5 : 
> {ThisTopicDoesNotExist=UNKNOWN_TOPIC_OR_PARTITION} (NetworkClient.java:582)
> 2016-03-23 12:44:38.433

[jira] [Created] (KAFKA-6582) Partitions get underreplicated, with a single ISR, and doesn't recover. Other brokers do not take over and we need to manually restart the broker.

2018-02-22 Thread Jurriaan Pruis (JIRA)
Jurriaan Pruis created KAFKA-6582:
-

 Summary: Partitions get underreplicated, with a single ISR, and 
doesn't recover. Other brokers do not take over and we need to manually restart 
the broker.
 Key: KAFKA-6582
 URL: https://issues.apache.org/jira/browse/KAFKA-6582
 Project: Kafka
  Issue Type: Bug
  Components: network
Affects Versions: 1.0.0
 Environment: Ubuntu 16.04
Linux kafka04 4.4.0-109-generic #132-Ubuntu SMP Tue Jan 9 19:52:39 UTC 2018 
x86_64 x86_64 x86_64 GNU/Linux

java version "9.0.1"
Java(TM) SE Runtime Environment (build 9.0.1+11)
Java HotSpot(TM) 64-Bit Server VM (build 9.0.1+11, mixed mode) 

but also tried with the latest JVM 8 before with the same result.
Reporter: Jurriaan Pruis


Partitions become under-replicated, with a single ISR, and do not recover. Other 
brokers do not take over, and we need to manually restart the 'single ISR' 
broker (if you describe the partitions of a replicated topic, it is clear that 
some partitions are in sync only on this broker).

This bug strongly resembles KAFKA-4477, but since that issue is marked as 
resolved, this is probably a different but similar problem.

We have the same issue (or at least it looks very similar) on Kafka 1.0. 

We have had these issues since upgrading to Kafka 1.0 (from Kafka 0.10.2.1) in 
November 2017.

This happens almost every 24-48 hours on a random broker, which is why we 
currently have a cron job that restarts every broker every 24 hours. 

While the issue is occurring, the 'single ISR' broker shows the following in its server log: 
{code:java}
[2018-02-20 12:02:08,342] WARN Attempting to send response via channel for 
which there is no open connection, connection id 
10.132.0.32:9092-10.14.148.20:56352-96708 (kafka.network.Processor)
[2018-02-20 12:02:08,364] WARN Attempting to send response via channel for 
which there is no open connection, connection id 
10.132.0.32:9092-10.14.150.25:54412-96715 (kafka.network.Processor)
[2018-02-20 12:02:08,349] WARN Attempting to send response via channel for 
which there is no open connection, connection id 
10.132.0.32:9092-10.14.149.18:35182-96705 (kafka.network.Processor)
[2018-02-20 12:02:08,379] WARN Attempting to send response via channel for 
which there is no open connection, connection id 
10.132.0.32:9092-10.14.150.25:54456-96717 (kafka.network.Processor)
[2018-02-20 12:02:08,448] WARN Attempting to send response via channel for 
which there is no open connection, connection id 
10.132.0.32:9092-10.14.159.20:36388-96720 (kafka.network.Processor)
[2018-02-20 12:02:08,683] WARN Attempting to send response via channel for 
which there is no open connection, connection id 
10.132.0.32:9092-10.14.157.110:41922-96740 (kafka.network.Processor)
{code}
Also on the ISR broker, the controller log shows this:
{code:java}
[2018-02-20 12:02:14,927] INFO [Controller-3-to-broker-3-send-thread]: 
Controller 3 connected to 10.132.0.32:9092 (id: 3 rack: null) for sending state 
change requests (kafka.controller.RequestSendThread)
[2018-02-20 12:02:14,927] INFO [Controller-3-to-broker-0-send-thread]: 
Controller 3 connected to 10.132.0.10:9092 (id: 0 rack: null) for sending state 
change requests (kafka.controller.RequestSendThread)
[2018-02-20 12:02:14,928] INFO [Controller-3-to-broker-1-send-thread]: 
Controller 3 connected to 10.132.0.12:9092 (id: 1 rack: null) for sending state 
change requests (kafka.controller.RequestSendThread){code}
And the non-ISR brokers show these kinds of errors:
{code:java}
[2018-02-20 12:02:29,204] WARN [ReplicaFetcher replicaId=1, leaderId=3, 
fetcherId=0] Error in fetch to broker 3, request (type=FetchRequest, 
replicaId=1, maxWait=500, minBytes=1, maxBytes=10485760, 
fetchData={..}, isolationLevel=READ_UNCOMMITTED) 
(kafka.server.ReplicaFetcherThread)
java.io.IOException: Connection to 3 was disconnected before the response was 
read
 at 
org.apache.kafka.clients.NetworkClientUtils.sendAndReceive(NetworkClientUtils.java:95)
 at 
kafka.server.ReplicaFetcherBlockingSend.sendRequest(ReplicaFetcherBlockingSend.scala:96)
 at kafka.server.ReplicaFetcherThread.fetch(ReplicaFetcherThread.scala:205)
 at kafka.server.ReplicaFetcherThread.fetch(ReplicaFetcherThread.scala:41)
 at 
kafka.server.AbstractFetcherThread.processFetchRequest(AbstractFetcherThread.scala:149)
 at kafka.server.AbstractFetcherThread.doWork(AbstractFetcherThread.scala:113)
 at kafka.utils.ShutdownableThread.run(ShutdownableThread.scala:64)
{code}
 



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Updated] (KAFKA-6581) ConsumerGroupCommand hangs if even one of the partition is unavailable

2018-02-22 Thread Sahil Aggarwal (JIRA)

 [ 
https://issues.apache.org/jira/browse/KAFKA-6581?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Sahil Aggarwal updated KAFKA-6581:
--
Priority: Minor  (was: Major)

> ConsumerGroupCommand hangs if even one of the partition is unavailable
> --
>
> Key: KAFKA-6581
> URL: https://issues.apache.org/jira/browse/KAFKA-6581
> Project: Kafka
>  Issue Type: Bug
>  Components: admin, core, tools
>Affects Versions: 0.10.0.0
>Reporter: Sahil Aggarwal
>Priority: Minor
> Fix For: 0.10.0.2
>
>
> ConsumerGroupCommand.scala internally uses a consumer to get the position for 
> each partition, but if a partition is unavailable, the call 
> consumer.position(topicPartition) blocks indefinitely.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (KAFKA-6581) ConsumerGroupCommand hangs if even one of the partition is unavailable

2018-02-22 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-6581?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16372688#comment-16372688
 ] 

ASF GitHub Bot commented on KAFKA-6581:
---

SahilAggarwal opened a new pull request #4612: KAFKA-6581: Fix the 
ConsumerGroupCommand indefinite execution if one of the partition is 
unavailable.
URL: https://github.com/apache/kafka/pull/4612
 
 
   
   * Checks if the partition is available before calling consumer.position
   * Adds a timeout on the consumer.position() call.
   
   ### Committer Checklist (excluded from commit message)
   - [ ] Verify design and implementation 
   - [ ] Verify test coverage and CI build status
   - [ ] Verify documentation (including upgrade notes)
   


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


> ConsumerGroupCommand hangs if even one of the partition is unavailable
> --
>
> Key: KAFKA-6581
> URL: https://issues.apache.org/jira/browse/KAFKA-6581
> Project: Kafka
>  Issue Type: Bug
>  Components: admin, core, tools
>Affects Versions: 0.10.0.0
>Reporter: Sahil Aggarwal
>Priority: Major
> Fix For: 0.10.0.2
>
>
> ConsumerGroupCommand.scala internally uses a consumer to get the position for 
> each partition, but if a partition is unavailable, the call 
> consumer.position(topicPartition) blocks indefinitely.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (KAFKA-4477) Node reduces its ISR to itself, and doesn't recover. Other nodes do not take leadership, cluster remains sick until node is restarted.

2018-02-22 Thread AMOUSSOU DJANGBAN Baruch (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-4477?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16372689#comment-16372689
 ] 

AMOUSSOU DJANGBAN Baruch commented on KAFKA-4477:
-

Ok Thanks 

 

> Node reduces its ISR to itself, and doesn't recover. Other nodes do not take 
> leadership, cluster remains sick until node is restarted.
> --
>
> Key: KAFKA-4477
> URL: https://issues.apache.org/jira/browse/KAFKA-4477
> Project: Kafka
>  Issue Type: Bug
>  Components: core
>Affects Versions: 0.10.1.0
> Environment: RHEL7
> java version "1.8.0_66"
> Java(TM) SE Runtime Environment (build 1.8.0_66-b17)
> Java HotSpot(TM) 64-Bit Server VM (build 25.66-b17, mixed mode)
>Reporter: Michael Andre Pearce
>Assignee: Apurva Mehta
>Priority: Critical
>  Labels: reliability
> Fix For: 0.10.1.1
>
> Attachments: 2016_12_15.zip, 72_Server_Thread_Dump.txt, 
> 73_Server_Thread_Dump.txt, 74_Server_Thread_Dump, issue_node_1001.log, 
> issue_node_1001_ext.log, issue_node_1002.log, issue_node_1002_ext.log, 
> issue_node_1003.log, issue_node_1003_ext.log, kafka.jstack, 
> server_1_72server.log, server_2_73_server.log, server_3_74Server.log, 
> state_change_controller.tar.gz
>
>
> We have encountered a critical issue that has re-occurred in different 
> physical environments. We haven't worked out what is going on, but we do 
> have a nasty workaround to keep the service alive. 
> We have not had this issue on clusters still running 0.9.0.1.
> We have noticed a node randomly shrinking the ISRs for the partitions it 
> owns down to itself; moments later we see other nodes having disconnects, 
> followed finally by application issues, where producing to these partitions 
> is blocked.
> It seems only restarting the Kafka Java process resolves the issue.
> We have had this occur multiple times, and from all network and machine 
> monitoring the machine never left the network or had any other glitches.
> Below are logs seen during the issue.
> Node 7:
> [2016-12-01 07:01:28,112] INFO Partition 
> [com_ig_trade_v1_position_event--demo--compacted,10] on broker 7: Shrinking 
> ISR for partition [com_ig_trade_v1_position_event--demo--compacted,10] from 
> 1,2,7 to 7 (kafka.cluster.Partition)
> All other nodes:
> [2016-12-01 07:01:38,172] WARN [ReplicaFetcherThread-0-7], Error in fetch 
> kafka.server.ReplicaFetcherThread$FetchRequest@5aae6d42 
> (kafka.server.ReplicaFetcherThread)
> java.io.IOException: Connection to 7 was disconnected before the response was 
> read
> All clients:
> java.util.concurrent.ExecutionException: 
> org.apache.kafka.common.errors.NetworkException: The server disconnected 
> before a response was received.
> After this occurs, we then suddenly see an increasing number of CLOSE_WAIT 
> connections and open file descriptors on the sick machine.
> As a workaround to keep the service alive, we are putting in place an 
> automated process that tails the logs for the regex below; when the 
> new_partitions group is just the node itself, we restart the node. 
> "\[(?P.+)\] INFO Partition \[.*\] on broker .* Shrinking ISR for 
> partition \[.*\] from (?P.+) to (?P.+) 
> \(kafka.cluster.Partition\)"



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (KAFKA-4477) Node reduces its ISR to itself, and doesn't recover. Other nodes do not take leadership, cluster remains sick until node is restarted.

2018-02-22 Thread JIRA

[ 
https://issues.apache.org/jira/browse/KAFKA-4477?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16372683#comment-16372683
 ] 

Håkon Åmdal commented on KAFKA-4477:


Not sure, but if they use Kafka 1.0.0, all those commits seem to be included.

> Node reduces its ISR to itself, and doesn't recover. Other nodes do not take 
> leadership, cluster remains sick until node is restarted.
> --
>
> Key: KAFKA-4477
> URL: https://issues.apache.org/jira/browse/KAFKA-4477
> Project: Kafka
>  Issue Type: Bug
>  Components: core
>Affects Versions: 0.10.1.0
> Environment: RHEL7
> java version "1.8.0_66"
> Java(TM) SE Runtime Environment (build 1.8.0_66-b17)
> Java HotSpot(TM) 64-Bit Server VM (build 25.66-b17, mixed mode)
>Reporter: Michael Andre Pearce
>Assignee: Apurva Mehta
>Priority: Critical
>  Labels: reliability
> Fix For: 0.10.1.1
>
> Attachments: 2016_12_15.zip, 72_Server_Thread_Dump.txt, 
> 73_Server_Thread_Dump.txt, 74_Server_Thread_Dump, issue_node_1001.log, 
> issue_node_1001_ext.log, issue_node_1002.log, issue_node_1002_ext.log, 
> issue_node_1003.log, issue_node_1003_ext.log, kafka.jstack, 
> server_1_72server.log, server_2_73_server.log, server_3_74Server.log, 
> state_change_controller.tar.gz
>
>
> We have encountered a critical issue that has re-occurred in different 
> physical environments. We haven't worked out what is going on, but we do 
> have a nasty workaround to keep the service alive. 
> We have not had this issue on clusters still running 0.9.0.1.
> We have noticed a node randomly shrinking the ISRs for the partitions it 
> owns down to itself; moments later we see other nodes having disconnects, 
> followed finally by application issues, where producing to these partitions 
> is blocked.
> It seems only restarting the Kafka Java process resolves the issue.
> We have had this occur multiple times, and from all network and machine 
> monitoring the machine never left the network or had any other glitches.
> Below are logs seen during the issue.
> Node 7:
> [2016-12-01 07:01:28,112] INFO Partition 
> [com_ig_trade_v1_position_event--demo--compacted,10] on broker 7: Shrinking 
> ISR for partition [com_ig_trade_v1_position_event--demo--compacted,10] from 
> 1,2,7 to 7 (kafka.cluster.Partition)
> All other nodes:
> [2016-12-01 07:01:38,172] WARN [ReplicaFetcherThread-0-7], Error in fetch 
> kafka.server.ReplicaFetcherThread$FetchRequest@5aae6d42 
> (kafka.server.ReplicaFetcherThread)
> java.io.IOException: Connection to 7 was disconnected before the response was 
> read
> All clients:
> java.util.concurrent.ExecutionException: 
> org.apache.kafka.common.errors.NetworkException: The server disconnected 
> before a response was received.
> After this occurs, we then suddenly see an increasing number of CLOSE_WAIT 
> connections and open file descriptors on the sick machine.
> As a workaround to keep the service alive, we are putting in place an 
> automated process that tails the logs for the regex below; when the 
> new_partitions group is just the node itself, we restart the node. 
> "\[(?P.+)\] INFO Partition \[.*\] on broker .* Shrinking ISR for 
> partition \[.*\] from (?P.+) to (?P.+) 
> \(kafka.cluster.Partition\)"



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (KAFKA-4477) Node reduces its ISR to itself, and doesn't recover. Other nodes do not take leadership, cluster remains sick until node is restarted.

2018-02-22 Thread AMOUSSOU DJANGBAN Baruch (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-4477?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16372665#comment-16372665
 ] 

AMOUSSOU DJANGBAN Baruch commented on KAFKA-4477:
-

Thanks [~hawk_aa]

Do you know if this problem is fixed in the Confluent stack? 

AM. 

> Node reduces its ISR to itself, and doesn't recover. Other nodes do not take 
> leadership, cluster remains sick until node is restarted.
> --
>
> Key: KAFKA-4477
> URL: https://issues.apache.org/jira/browse/KAFKA-4477
> Project: Kafka
>  Issue Type: Bug
>  Components: core
>Affects Versions: 0.10.1.0
> Environment: RHEL7
> java version "1.8.0_66"
> Java(TM) SE Runtime Environment (build 1.8.0_66-b17)
> Java HotSpot(TM) 64-Bit Server VM (build 25.66-b17, mixed mode)
>Reporter: Michael Andre Pearce
>Assignee: Apurva Mehta
>Priority: Critical
>  Labels: reliability
> Fix For: 0.10.1.1
>
> Attachments: 2016_12_15.zip, 72_Server_Thread_Dump.txt, 
> 73_Server_Thread_Dump.txt, 74_Server_Thread_Dump, issue_node_1001.log, 
> issue_node_1001_ext.log, issue_node_1002.log, issue_node_1002_ext.log, 
> issue_node_1003.log, issue_node_1003_ext.log, kafka.jstack, 
> server_1_72server.log, server_2_73_server.log, server_3_74Server.log, 
> state_change_controller.tar.gz
>
>
> We have encountered a critical issue that has re-occurred in different 
> physical environments. We haven't worked out what is going on, but we do 
> have a nasty workaround to keep the service alive. 
> We have not had this issue on clusters still running 0.9.0.1.
> We have noticed a node randomly shrinking the ISRs for the partitions it 
> owns down to itself; moments later we see other nodes having disconnects, 
> followed finally by application issues, where producing to these partitions 
> is blocked.
> It seems only restarting the Kafka Java process resolves the issue.
> We have had this occur multiple times, and from all network and machine 
> monitoring the machine never left the network or had any other glitches.
> Below are logs seen during the issue.
> Node 7:
> [2016-12-01 07:01:28,112] INFO Partition 
> [com_ig_trade_v1_position_event--demo--compacted,10] on broker 7: Shrinking 
> ISR for partition [com_ig_trade_v1_position_event--demo--compacted,10] from 
> 1,2,7 to 7 (kafka.cluster.Partition)
> All other nodes:
> [2016-12-01 07:01:38,172] WARN [ReplicaFetcherThread-0-7], Error in fetch 
> kafka.server.ReplicaFetcherThread$FetchRequest@5aae6d42 
> (kafka.server.ReplicaFetcherThread)
> java.io.IOException: Connection to 7 was disconnected before the response was 
> read
> All clients:
> java.util.concurrent.ExecutionException: 
> org.apache.kafka.common.errors.NetworkException: The server disconnected 
> before a response was received.
> After this occurs, we then suddenly see an increasing number of CLOSE_WAIT 
> connections and open file descriptors on the sick machine.
> As a workaround to keep the service alive, we are putting in place an 
> automated process that tails the logs for the regex below; when the 
> new_partitions group is just the node itself, we restart the node. 
> "\[(?P.+)\] INFO Partition \[.*\] on broker .* Shrinking ISR for 
> partition \[.*\] from (?P.+) to (?P.+) 
> \(kafka.cluster.Partition\)"



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (KAFKA-4477) Node reduces its ISR to itself, and doesn't recover. Other nodes do not take leadership, cluster remains sick until node is restarted.

2018-02-22 Thread JIRA

[ 
https://issues.apache.org/jira/browse/KAFKA-4477?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16372644#comment-16372644
 ] 

Håkon Åmdal commented on KAFKA-4477:


We ended up running our own build of Kafka 0.11.0 where we cherry-picked these 
commits:
 * KAFKA-6042: Avoid deadlock between two groups with delayed operations

 * KAFKA-6003; Accept appends on replicas and when rebuilding the log 
unconditionally

 * KAFKA-5970; Use ReentrantLock for delayed operation lock to avoid blocking

Off the top of my head, I cannot remember exactly which commit solved this 
problem, but we've run without issues since November 2017.

> Node reduces its ISR to itself, and doesn't recover. Other nodes do not take 
> leadership, cluster remains sick until node is restarted.
> --
>
> Key: KAFKA-4477
> URL: https://issues.apache.org/jira/browse/KAFKA-4477
> Project: Kafka
>  Issue Type: Bug
>  Components: core
>Affects Versions: 0.10.1.0
> Environment: RHEL7
> java version "1.8.0_66"
> Java(TM) SE Runtime Environment (build 1.8.0_66-b17)
> Java HotSpot(TM) 64-Bit Server VM (build 25.66-b17, mixed mode)
>Reporter: Michael Andre Pearce
>Assignee: Apurva Mehta
>Priority: Critical
>  Labels: reliability
> Fix For: 0.10.1.1
>
> Attachments: 2016_12_15.zip, 72_Server_Thread_Dump.txt, 
> 73_Server_Thread_Dump.txt, 74_Server_Thread_Dump, issue_node_1001.log, 
> issue_node_1001_ext.log, issue_node_1002.log, issue_node_1002_ext.log, 
> issue_node_1003.log, issue_node_1003_ext.log, kafka.jstack, 
> server_1_72server.log, server_2_73_server.log, server_3_74Server.log, 
> state_change_controller.tar.gz
>
>
> We have encountered a critical issue that has re-occurred in different 
> physical environments. We haven't worked out what is going on, but we do 
> have a nasty workaround to keep the service alive. 
> We have not had this issue on clusters still running 0.9.0.1.
> We have noticed a node randomly shrinking the ISRs for the partitions it 
> owns down to itself; moments later we see other nodes having disconnects, 
> followed finally by application issues, where producing to these partitions 
> is blocked.
> It seems only restarting the Kafka Java process resolves the issue.
> We have had this occur multiple times, and from all network and machine 
> monitoring the machine never left the network or had any other glitches.
> Below are logs seen during the issue.
> Node 7:
> [2016-12-01 07:01:28,112] INFO Partition 
> [com_ig_trade_v1_position_event--demo--compacted,10] on broker 7: Shrinking 
> ISR for partition [com_ig_trade_v1_position_event--demo--compacted,10] from 
> 1,2,7 to 7 (kafka.cluster.Partition)
> All other nodes:
> [2016-12-01 07:01:38,172] WARN [ReplicaFetcherThread-0-7], Error in fetch 
> kafka.server.ReplicaFetcherThread$FetchRequest@5aae6d42 
> (kafka.server.ReplicaFetcherThread)
> java.io.IOException: Connection to 7 was disconnected before the response was 
> read
> All clients:
> java.util.concurrent.ExecutionException: 
> org.apache.kafka.common.errors.NetworkException: The server disconnected 
> before a response was received.
> After this occurs, we then suddenly see an increasing number of CLOSE_WAIT 
> connections and open file descriptors on the sick machine.
> As a workaround to keep the service alive, we are putting in place an 
> automated process that tails the logs for the regex below; when the 
> new_partitions group is just the node itself, we restart the node. 
> "\[(?P.+)\] INFO Partition \[.*\] on broker .* Shrinking ISR for 
> partition \[.*\] from (?P.+) to (?P.+) 
> \(kafka.cluster.Partition\)"



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (KAFKA-4477) Node reduces its ISR to itself, and doesn't recover. Other nodes do not take leadership, cluster remains sick until node is restarted.

2018-02-22 Thread AMOUSSOU DJANGBAN Baruch (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-4477?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16372629#comment-16372629
 ] 

AMOUSSOU DJANGBAN Baruch commented on KAFKA-4477:
-

Hi all,

I currently have the same issue with Kafka 0.11.0.

Do you have a solution for this problem? It has a big impact on all my 
applications.

Many thanks 

> Node reduces its ISR to itself, and doesn't recover. Other nodes do not take 
> leadership, cluster remains sick until node is restarted.
> --
>
> Key: KAFKA-4477
> URL: https://issues.apache.org/jira/browse/KAFKA-4477
> Project: Kafka
>  Issue Type: Bug
>  Components: core
>Affects Versions: 0.10.1.0
> Environment: RHEL7
> java version "1.8.0_66"
> Java(TM) SE Runtime Environment (build 1.8.0_66-b17)
> Java HotSpot(TM) 64-Bit Server VM (build 25.66-b17, mixed mode)
>Reporter: Michael Andre Pearce
>Assignee: Apurva Mehta
>Priority: Critical
>  Labels: reliability
> Fix For: 0.10.1.1
>
> Attachments: 2016_12_15.zip, 72_Server_Thread_Dump.txt, 
> 73_Server_Thread_Dump.txt, 74_Server_Thread_Dump, issue_node_1001.log, 
> issue_node_1001_ext.log, issue_node_1002.log, issue_node_1002_ext.log, 
> issue_node_1003.log, issue_node_1003_ext.log, kafka.jstack, 
> server_1_72server.log, server_2_73_server.log, server_3_74Server.log, 
> state_change_controller.tar.gz
>
>
> We have encountered a critical issue that has re-occurred in different 
> physical environments. We haven't worked out what is going on, but we do 
> have a nasty workaround to keep the service alive. 
> We have not had this issue on clusters still running 0.9.0.1.
> We have noticed a node randomly shrinking the ISRs for the partitions it 
> owns down to itself; moments later we see other nodes having disconnects, 
> followed finally by application issues, where producing to these partitions 
> is blocked.
> It seems only restarting the Kafka Java process resolves the issue.
> We have had this occur multiple times, and from all network and machine 
> monitoring the machine never left the network or had any other glitches.
> Below are logs seen during the issue.
> Node 7:
> [2016-12-01 07:01:28,112] INFO Partition 
> [com_ig_trade_v1_position_event--demo--compacted,10] on broker 7: Shrinking 
> ISR for partition [com_ig_trade_v1_position_event--demo--compacted,10] from 
> 1,2,7 to 7 (kafka.cluster.Partition)
> All other nodes:
> [2016-12-01 07:01:38,172] WARN [ReplicaFetcherThread-0-7], Error in fetch 
> kafka.server.ReplicaFetcherThread$FetchRequest@5aae6d42 
> (kafka.server.ReplicaFetcherThread)
> java.io.IOException: Connection to 7 was disconnected before the response was 
> read
> All clients:
> java.util.concurrent.ExecutionException: 
> org.apache.kafka.common.errors.NetworkException: The server disconnected 
> before a response was received.
> After this occurs, we then suddenly see an increasing number of CLOSE_WAIT 
> connections and open file descriptors on the sick machine.
> As a workaround to keep the service alive, we are putting in place an 
> automated process that tails the logs for the regex below; when the 
> new_partitions group is just the node itself, we restart the node. 
> "\[(?P.+)\] INFO Partition \[.*\] on broker .* Shrinking ISR for 
> partition \[.*\] from (?P.+) to (?P.+) 
> \(kafka.cluster.Partition\)"



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (KAFKA-6572) kafka-consumer-groups does not reset offsets to specified datetime correctly

2018-02-22 Thread huxihx (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-6572?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16372618#comment-16372618
 ] 

huxihx commented on KAFKA-6572:
---

[~slucas] Currently, the partition's offset will be set to the LEO (log end 
offset) if there is no message whose timestamp is greater than the given 
timestamp. Could that be what is happening in your case?
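
One way to check this from the client side (a minimal sketch; the bootstrap 
address, group id, and the epoch value for the target date are placeholders) is 
to query {{offsetsForTimes}} directly: a {{null}} entry means the broker found 
no message at or after the timestamp for that partition, and a reset would fall 
back to the end offset there:

{code:java}
import java.util.HashMap;
import java.util.Map;
import java.util.Properties;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.clients.consumer.OffsetAndTimestamp;
import org.apache.kafka.common.PartitionInfo;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.serialization.ByteArrayDeserializer;

public class CheckTimestampOffsets {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092"); // placeholder
        props.put(ConsumerConfig.GROUP_ID_CONFIG, "offset-check");            // placeholder
        try (KafkaConsumer<byte[], byte[]> consumer = new KafkaConsumer<>(
                props, new ByteArrayDeserializer(), new ByteArrayDeserializer())) {
            long target = 1518220800000L; // 2018-02-10T00:00:00Z in epoch millis
            Map<TopicPartition, Long> query = new HashMap<>();
            for (PartitionInfo p : consumer.partitionsFor("prod_in-ams03-geo-ca_access")) {
                query.put(new TopicPartition(p.topic(), p.partition()), target);
            }
            // A null value for a partition means no message at/after the timestamp
            // was found, so a reset would use the log end offset instead.
            Map<TopicPartition, OffsetAndTimestamp> result = consumer.offsetsForTimes(query);
            result.forEach((tp, ot) -> System.out.println(tp + " -> " + ot));
        }
    }
}
{code}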

> kafka-consumer-groups does not reset offsets to specified datetime correctly
> 
>
> Key: KAFKA-6572
> URL: https://issues.apache.org/jira/browse/KAFKA-6572
> Project: Kafka
>  Issue Type: Bug
>Affects Versions: 1.0.0
>Reporter: Sharon Lucas
>Priority: Major
>
> With Kafka 1.0.0 we are seeing a problem using the kafka-consumer-groups.sh 
> --reset-offsets option to reset offsets to a specific date/time in our 
> production environment.
> We first tried to use the kafka-consumer-groups.sh command with the 
> --reset-offsets option and with option --to-datetime 2018-02-10T00:00:00.000 
> in our staging environment and it worked correctly.  Running the following 
> command changed it to start processing logs from February 12, 2018 (4 days 
> ago) for a topic that had a large lag.  We did a dry run to verify before 
> running with the --execute option.
> {code:java}
> root@mlpstagemon0101a:/# /opt/kafka/bin/kafka-consumer-groups.sh 
> --bootstrap-server NN.NNN.NN.NN:9092 --group logstash-elasticsearch-latest 
> --to-datetime 2018-02-12T00:00:00.000-06:00 --reset-offsets --topic 
> staging-mon01-rg-elasticsearch --execute{code}
> We stopped the kafka mirrors that process this topic before resetting the 
> offsets and started the kafka mirrors after resetting the offsets.  We 
> verified that it correctly started processing logs from February 12, 2018.
> Then we tried resetting offsets in a production environment for a topic that 
> had a very large lag using option --to-datetime 2018-02-10T00:00:00.000 and 
> it did not work as expected. We stopped the kafka mirrors that process this 
> topic before resetting the offsets and did a dry run to see what the new 
> offsets would be:
> {code:java}
> root@mlplon0401e:# /opt/kafka/bin/kafka-consumer-groups.sh --bootstrap-server 
> NN.N.NNN.NNN:9092 --group mirror-consumer-ams03-geo-earliest --to-datetime 
> 2018-02-10T00:00:00.000 --reset-offsets --topic prod_in-ams03-geo-ca_access
> Note: This will not show information about old Zookeeper-based consumers.
> ^@^@^@^@
> TOPIC  PARTITION  NEW-OFFSET
> prod_in-ams03-geo-ca_access    52 52084147
> prod_in-ams03-geo-ca_access    106    52154199
> prod_in-ams03-geo-ca_access    75 52148673
> prod_in-ams03-geo-ca_access    61 52130753
> prod_in-ams03-geo-ca_access    49 52151667
> prod_in-ams03-geo-ca_access    48 52145233
> prod_in-ams03-geo-ca_access    27 52092805
> prod_in-ams03-geo-ca_access    26 52139644
> prod_in-ams03-geo-ca_access    65 52157504
> prod_in-ams03-geo-ca_access    105    52166289
> prod_in-ams03-geo-ca_access    38 52160464
> prod_in-ams03-geo-ca_access    22 52093451
> prod_in-ams03-geo-ca_access    4  52151660
> prod_in-ams03-geo-ca_access    90 52160296
> prod_in-ams03-geo-ca_access    25 52161691
> prod_in-ams03-geo-ca_access    13 52145828
> prod_in-ams03-geo-ca_access    56 52162867
> prod_in-ams03-geo-ca_access    42 52072094
> prod_in-ams03-geo-ca_access    7  52069496
> prod_in-ams03-geo-ca_access    117    52087078
> prod_in-ams03-geo-ca_access    32 52073732
> prod_in-ams03-geo-ca_access    102    52082022
> prod_in-ams03-geo-ca_access    76 52141018
> prod_in-ams03-geo-ca_access    83 52154542
> prod_in-ams03-geo-ca_access    72 52095051
> prod_in-ams03-geo-ca_access    85 52149907
> prod_in-ams03-geo-ca_access    119    52134435
> prod_in-ams03-geo-ca_access    113    52159340
> prod_in-ams03-geo-ca_access    55 52146597
> prod_in-ams03-geo-ca_access    18 52149079
> prod_in-ams03-geo-ca_access    35 52149058
> prod_in-ams03-geo-ca_access    99 52143277
> prod_in-ams03-geo-ca_access    41 52158872
> prod_in-ams03-geo-ca_access    112    52083901
> prod_in-ams03-geo-ca_access    34 52137932
> prod_in-ams03-geo-ca_access    89 52158135
> prod_in-ams03-geo-ca_access    40 5212
> prod_in-ams03-geo-ca_access    53 52138400
> prod_in-ams03-geo-ca_access    19 52144966
> prod_in-ams03-geo-ca_access    44 52166404
> prod_in-ams03-geo-ca_access    31 52155685
> prod_in-ams03-geo-ca_access    10 52152151
> prod_in-ams03-geo-ca_access    98 52145378
> prod_in-ams03-geo-ca_ac

[jira] [Commented] (KAFKA-6577) Connect standalone SASL file source and sink test fails without explanation

2018-02-22 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-6577?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16372606#comment-16372606
 ] 

ASF GitHub Bot commented on KAFKA-6577:
---

dguy closed pull request #4610: KAFKA-6577: Fix Connect system tests and add 
debug messages
URL: https://github.com/apache/kafka/pull/4610
 
 
   

This is a PR merged from a forked repository.
As GitHub hides the original diff on merge, it is displayed below for
the sake of provenance:

As this is a foreign pull request (from a fork), the diff is supplied
below (as it won't show otherwise due to GitHub magic):

diff --git 
a/connect/runtime/src/main/java/org/apache/kafka/connect/cli/ConnectDistributed.java
 
b/connect/runtime/src/main/java/org/apache/kafka/connect/cli/ConnectDistributed.java
index 4afa47dda1a..3b7ec87f644 100644
--- 
a/connect/runtime/src/main/java/org/apache/kafka/connect/cli/ConnectDistributed.java
+++ 
b/connect/runtime/src/main/java/org/apache/kafka/connect/cli/ConnectDistributed.java
@@ -74,6 +74,7 @@ public static void main(String[] args) throws Exception {
 DistributedConfig config = new DistributedConfig(workerProps);
 
 String kafkaClusterId = ConnectUtils.lookupKafkaClusterId(config);
+log.debug("Kafka cluster ID: {}", kafkaClusterId);
 
 RestServer rest = new RestServer(config);
 URI advertisedUrl = rest.advertisedUrl();
diff --git 
a/connect/runtime/src/main/java/org/apache/kafka/connect/cli/ConnectStandalone.java
 
b/connect/runtime/src/main/java/org/apache/kafka/connect/cli/ConnectStandalone.java
index 17699053541..413cb46cf28 100644
--- 
a/connect/runtime/src/main/java/org/apache/kafka/connect/cli/ConnectStandalone.java
+++ 
b/connect/runtime/src/main/java/org/apache/kafka/connect/cli/ConnectStandalone.java
@@ -78,6 +78,7 @@ public static void main(String[] args) throws Exception {
 StandaloneConfig config = new StandaloneConfig(workerProps);
 
 String kafkaClusterId = ConnectUtils.lookupKafkaClusterId(config);
+log.debug("Kafka cluster ID: {}", kafkaClusterId);
 
 RestServer rest = new RestServer(config);
 URI advertisedUrl = rest.advertisedUrl();
diff --git 
a/connect/runtime/src/main/java/org/apache/kafka/connect/storage/KafkaConfigBackingStore.java
 
b/connect/runtime/src/main/java/org/apache/kafka/connect/storage/KafkaConfigBackingStore.java
index b34e48390e1..e51b365cec6 100644
--- 
a/connect/runtime/src/main/java/org/apache/kafka/connect/storage/KafkaConfigBackingStore.java
+++ 
b/connect/runtime/src/main/java/org/apache/kafka/connect/storage/KafkaConfigBackingStore.java
@@ -432,6 +432,7 @@ public void putTargetState(String connector, TargetState 
state) {
 Runnable createTopics = new Runnable() {
 @Override
 public void run() {
+log.debug("Creating admin client to manage Connect internal 
config topic");
 try (TopicAdmin admin = new TopicAdmin(adminProps)) {
 admin.createTopics(topicDescription);
 }
diff --git 
a/connect/runtime/src/main/java/org/apache/kafka/connect/storage/KafkaOffsetBackingStore.java
 
b/connect/runtime/src/main/java/org/apache/kafka/connect/storage/KafkaOffsetBackingStore.java
index f29f3c23d03..fb8ad97b48d 100644
--- 
a/connect/runtime/src/main/java/org/apache/kafka/connect/storage/KafkaOffsetBackingStore.java
+++ 
b/connect/runtime/src/main/java/org/apache/kafka/connect/storage/KafkaOffsetBackingStore.java
@@ -94,6 +94,7 @@ public void configure(final WorkerConfig config) {
 Runnable createTopics = new Runnable() {
 @Override
 public void run() {
+log.debug("Creating admin client to manage Connect internal 
offset topic");
 try (TopicAdmin admin = new TopicAdmin(adminProps)) {
 admin.createTopics(topicDescription);
 }
diff --git 
a/connect/runtime/src/main/java/org/apache/kafka/connect/storage/KafkaStatusBackingStore.java
 
b/connect/runtime/src/main/java/org/apache/kafka/connect/storage/KafkaStatusBackingStore.java
index 8ca21ebb350..6710808f9a9 100644
--- 
a/connect/runtime/src/main/java/org/apache/kafka/connect/storage/KafkaStatusBackingStore.java
+++ 
b/connect/runtime/src/main/java/org/apache/kafka/connect/storage/KafkaStatusBackingStore.java
@@ -157,6 +157,7 @@ public void onCompletion(Throwable error, 
ConsumerRecord record)
 Runnable createTopics = new Runnable() {
 @Override
 public void run() {
+log.debug("Creating admin client to manage Connect internal 
status topic");
 try (TopicAdmin admin = new TopicAdmin(adminProps)) {
 admin.createTopics(topicDescription);
 }
diff --git 
a/connect/runtime/src/main/java/org/apache/kafka/connect/util/ConnectUtils.java 
b/connect/runtime/src/main/java/org/apache/kafka/con

[jira] [Resolved] (KAFKA-6577) Connect standalone SASL file source and sink test fails without explanation

2018-02-22 Thread Damian Guy (JIRA)

 [ 
https://issues.apache.org/jira/browse/KAFKA-6577?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Damian Guy resolved KAFKA-6577.
---
   Resolution: Fixed
Fix Version/s: 1.2.0

Issue resolved by pull request 4610
[https://github.com/apache/kafka/pull/4610]

> Connect standalone SASL file source and sink test fails without explanation
> ---
>
> Key: KAFKA-6577
> URL: https://issues.apache.org/jira/browse/KAFKA-6577
> Project: Kafka
>  Issue Type: Bug
>  Components: KafkaConnect, system tests
>Affects Versions: 1.1.0
>Reporter: Randall Hauch
>Assignee: Randall Hauch
>Priority: Blocker
> Fix For: 1.2.0, 1.1.0
>
>
> The 
> {{tests/kafkatest/tests/connect/connect_test.py::ConnectStandaloneFileTest.test_file_source_and_sink}}
>  test is failing with the SASL configuration without a sufficient 
> explanation. During the test, the Connect worker fails to start, but the 
> Connect log contains no useful information.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Comment Edited] (KAFKA-5157) Options for handling corrupt data during deserialization

2018-02-22 Thread childNo͡.de

[ 
https://issues.apache.org/jira/browse/KAFKA-5157?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16372564#comment-16372564
 ] 

Marcel "childNo͡.de" Trautwein edited comment on KAFKA-5157 at 2/22/18 9:08 AM:


Just a question ... What I "dislike" here, from a naive point of view, is that 
if you want to configure your {{DeserializationExceptionHandler}} implementation, 
you have that well-defined {{Configurable}} interface, but what is injected is 
the {{StreamsConfig}}, so you have to put configurations into the StreamsConfig 
map to access them later in the implementation ... is this really the desired / 
designed procedure and / or best practice?

P.S: yes, I know, discussion to KIP was / is in 
[http://mail-archives.apache.org/mod_mbox/kafka-dev/201705.mbox/%3c3da51c0a-f7d8-4134-b641-06eb3f2a1...@gmail.com%3e]
 :)


was (Author: childnode):
Just a question ... What I "dislike" here, from a naive point of view, is that 
if you want to configure your {{DeserializationExceptionHandler}} implementation, 
you have that well-defined {{Configurable}} interface, but what is injected is 
the {{StreamsConfig}}, so you have to put configurations into the StreamsConfig 
map to access them later in the implementation ... is this really the desired / 
designed procedure and / or best practice?

> Options for handling corrupt data during deserialization
> 
>
> Key: KAFKA-5157
> URL: https://issues.apache.org/jira/browse/KAFKA-5157
> Project: Kafka
>  Issue Type: Sub-task
>  Components: streams
>Reporter: Eno Thereska
>Assignee: Eno Thereska
>Priority: Major
>  Labels: user-experience
> Fix For: 1.0.0
>
>
> When there is badly formatted data in the source topics, deserialization will 
> throw a runtime exception all the way up to the user. And since deserialization 
> happens at the beginning of the topology, before the record is ever processed, 
> today there is no way to handle such errors at the user-app level.
> We should consider allowing users to handle such "poison pills" in a 
> customizable way.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (KAFKA-5157) Options for handling corrupt data during deserialization

2018-02-22 Thread childNo͡.de

[ 
https://issues.apache.org/jira/browse/KAFKA-5157?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16372564#comment-16372564
 ] 

Marcel "childNo͡.de" Trautwein commented on KAFKA-5157:
---

Just a question ... What I "dislike" here, from a naive point of view, is that 
if you want to configure your {{DeserializationExceptionHandler}} implementation, 
you have that well-defined {{Configurable}} interface, but what is injected is 
the {{StreamsConfig}}, so you have to put configurations into the StreamsConfig 
map to access them later in the implementation ... is this really the desired / 
designed procedure and / or best practice?
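
To illustrate the point (a sketch only; the {{corrupt.record.threshold}} key is 
made up): the handler's {{configure}} method receives the full map built from 
the {{StreamsConfig}}, so any custom settings have to ride along in the streams 
configuration and be fished back out by key:

{code:java}
import java.util.Map;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.streams.errors.DeserializationExceptionHandler;
import org.apache.kafka.streams.processor.ProcessorContext;

public class ThresholdDeserializationHandler implements DeserializationExceptionHandler {
    private long threshold = Long.MAX_VALUE;
    private long failures = 0;

    @Override
    public void configure(Map<String, ?> configs) {
        // Custom keys must be pulled out of the StreamsConfig-provided map.
        Object value = configs.get("corrupt.record.threshold"); // hypothetical key
        if (value != null) {
            threshold = Long.parseLong(value.toString());
        }
    }

    @Override
    public DeserializationHandlerResponse handle(ProcessorContext context,
                                                 ConsumerRecord<byte[], byte[]> record,
                                                 Exception exception) {
        // Skip corrupt records until the configured threshold is exceeded.
        failures++;
        return failures <= threshold
                ? DeserializationHandlerResponse.CONTINUE
                : DeserializationHandlerResponse.FAIL;
    }
}
{code}

The handler (and the hypothetical key) would then be registered through the 
regular streams properties, e.g. {{default.deserialization.exception.handler}}.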

> Options for handling corrupt data during deserialization
> 
>
> Key: KAFKA-5157
> URL: https://issues.apache.org/jira/browse/KAFKA-5157
> Project: Kafka
>  Issue Type: Sub-task
>  Components: streams
>Reporter: Eno Thereska
>Assignee: Eno Thereska
>Priority: Major
>  Labels: user-experience
> Fix For: 1.0.0
>
>
> When there is badly formatted data in the source topics, deserialization will 
> throw a runtime exception all the way up to the user. And since deserialization 
> happens at the beginning of the topology, before the record is ever processed, 
> today there is no way to handle such errors at the user-app level.
> We should consider allowing users to handle such "poison pills" in a 
> customizable way.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Created] (KAFKA-6581) ConsumerGroupCommand hangs if even one of the partition is unavailable

2018-02-22 Thread Sahil Aggarwal (JIRA)
Sahil Aggarwal created KAFKA-6581:
-

 Summary: ConsumerGroupCommand hangs if even one of the partition 
is unavailable
 Key: KAFKA-6581
 URL: https://issues.apache.org/jira/browse/KAFKA-6581
 Project: Kafka
  Issue Type: Bug
  Components: admin, core, tools
Affects Versions: 0.10.0.0
Reporter: Sahil Aggarwal
 Fix For: 0.10.0.2


ConsumerGroupCommand.scala internally uses a consumer to get the position for 
each partition, but if a partition is unavailable, the call 
consumer.position(topicPartition) blocks indefinitely.
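
A hedged sketch of a mitigation along the lines of the pull request above (not 
the actual patch): check that the partition currently has a leader before asking 
for its position, and bound the lookup with a timeout. The executor-based 
timeout is only illustrative; later client versions added a timed 
{{position(TopicPartition, Duration)}} overload (KIP-266):

{code:java}
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.PartitionInfo;
import org.apache.kafka.common.TopicPartition;

public class SafePosition {
    // Returns the consumer position, or null if the partition has no leader
    // or the lookup does not complete within timeoutMs.
    static Long positionOrNull(KafkaConsumer<?, ?> consumer, TopicPartition tp, long timeoutMs)
            throws InterruptedException {
        for (PartitionInfo info : consumer.partitionsFor(tp.topic())) {
            if (info.partition() == tp.partition()
                    && (info.leader() == null || info.leader().isEmpty())) {
                return null; // partition unavailable; skip instead of hanging
            }
        }
        // Note: KafkaConsumer is not thread-safe; in this sketch it is only ever
        // used by one thread at a time (handed off to the worker, then back).
        ExecutorService pool = Executors.newSingleThreadExecutor();
        try {
            Future<Long> future = pool.submit(() -> consumer.position(tp));
            return future.get(timeoutMs, TimeUnit.MILLISECONDS);
        } catch (ExecutionException | TimeoutException e) {
            return null;
        } finally {
            pool.shutdownNow();
        }
    }
}
{code}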



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)