Jenkins build is unstable: Kafka » Kafka Branch Builder » trunk #1366

2022-11-18 Thread Apache Jenkins Server
See 




[jira] [Created] (KAFKA-14407) Able to access offset index memory map even after memory map flag is turned off

2022-11-18 Thread Jeff Kim (Jira)
Jeff Kim created KAFKA-14407:


 Summary: Able to access offset index memory map even after memory 
map flag is turned off
 Key: KAFKA-14407
 URL: https://issues.apache.org/jira/browse/KAFKA-14407
 Project: Kafka
  Issue Type: Bug
Reporter: Jeff Kim


Original investigation: 
[https://github.com/apache/kafka/pull/12783#issuecomment-1315804689]

 

We have mechanisms in place to check before accessing a memory map. An example:

  private def fetchHighWatermarkMetadata: LogOffsetMetadata = {
    localLog.checkIfMemoryMappedBufferClosed()
    ...
  }

This makes it seem okay to read from the offset index. However, we should not 
allow any disk I/O if an offset index is unmapped. Moreover, if a local log is 
closed, we should set `isMemoryMappedBufferClosed` to true in 
`LocalLog.close()`:

  private[log] def close(): Unit = {
    maybeHandleIOException(s"Error while renaming dir for $topicPartition in dir ${dir.getParent}") {
      checkIfMemoryMappedBufferClosed()
      segments.close()
      // isMemoryMappedBufferClosed = true?
    }
  }

Note that besides altering a replica's log dir, UnifiedLog.close() is only called 
during shutdown.

I'm not sure whether this is a bug, but accessing the high watermark (by reading 
from the actual segment via the offset index) will result in an NPE because 
isMemoryMappedBufferClosed is false.

java.lang.NullPointerException
at kafka.log.OffsetIndex.$anonfun$lookup$1(OffsetIndex.scala:90)



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Resolved] (KAFKA-14406) Double iteration of records in batches to be restored

2022-11-18 Thread A. Sophie Blee-Goldman (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-14406?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

A. Sophie Blee-Goldman resolved KAFKA-14406.

Resolution: Fixed

> Double iteration of records in batches to be restored
> -
>
> Key: KAFKA-14406
> URL: https://issues.apache.org/jira/browse/KAFKA-14406
> Project: Kafka
>  Issue Type: Bug
>  Components: streams
>Reporter: A. Sophie Blee-Goldman
>Priority: Major
> Fix For: 3.4.0
>
>
> While restoring a batch of records, RocksDBStore was iterating the 
> ConsumerRecords, building a list of KeyValues, and then iterating _that_ 
> list of KeyValues to add them to the RocksDB batch.
> Simply adding the key and value directly to the RocksDB batch prevents this 
> unnecessary second iteration and the creation of intermediate KeyValue 
> objects, improving the performance of state restoration and reducing 
> unnecessary object allocation.
> (thanks to Nick Telford for finding this)



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Created] (KAFKA-14406) Double iteration of records in batches to be restored

2022-11-18 Thread A. Sophie Blee-Goldman (Jira)
A. Sophie Blee-Goldman created KAFKA-14406:
--

 Summary: Double iteration of records in batches to be restored
 Key: KAFKA-14406
 URL: https://issues.apache.org/jira/browse/KAFKA-14406
 Project: Kafka
  Issue Type: Bug
  Components: streams
Reporter: A. Sophie Blee-Goldman
 Fix For: 3.4.0


While restoring a batch of records, RocksDBStore was iterating the 
ConsumerRecords, building a list of KeyValues, and then iterating _that_ list 
of KeyValues to add them to the RocksDB batch.

Simply adding the key and value directly to the RocksDB batch prevents this 
unnecessary second iteration and the creation of intermediate KeyValue 
objects, improving the performance of state restoration and reducing 
unnecessary object allocation.

(thanks to Nick Telford for finding this)
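
For illustration, a rough sketch of the change described above; the method, field, and variable names here are assumptions, not the actual RocksDBStore code:

    // Illustrative sketch only: write each record straight into the RocksDB WriteBatch,
    // skipping the intermediate List<KeyValue> and the second iteration over it.
    // Uses org.rocksdb.{RocksDB, RocksDBException, WriteBatch, WriteOptions},
    // org.apache.kafka.clients.consumer.ConsumerRecord and java.util.Collection.
    void restoreBatch(final RocksDB db,
                      final WriteOptions writeOptions,
                      final Collection<ConsumerRecord<byte[], byte[]>> records) throws RocksDBException {
        try (final WriteBatch batch = new WriteBatch()) {
            for (final ConsumerRecord<byte[], byte[]> record : records) {
                if (record.value() == null) {
                    batch.delete(record.key());               // tombstone
                } else {
                    batch.put(record.key(), record.value());  // add directly to the batch
                }
            }
            db.write(writeOptions, batch);
        }
    }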



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Created] (KAFKA-14405) Log a warning when users attempt to set a config controlled by Streams

2022-11-18 Thread A. Sophie Blee-Goldman (Jira)
A. Sophie Blee-Goldman created KAFKA-14405:
--

 Summary: Log a warning when users attempt to set a config 
controlled by Streams
 Key: KAFKA-14405
 URL: https://issues.apache.org/jira/browse/KAFKA-14405
 Project: Kafka
  Issue Type: Bug
  Components: streams
Reporter: A. Sophie Blee-Goldman


Related to https://issues.apache.org/jira/browse/KAFKA-14404

It's too easy for users to try overriding one of the client configs that 
Streams hardcodes, and since we just silently ignore it, there's no good way 
for them to tell that their config is not being used. Sometimes this may be 
harmless, but in cases like the Producer's partitioner, there could be 
important application logic that is never being invoked.

When processing user configs in StreamsConfig, we should check for all of these 
configs and log a warning when any of them have been set.
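
A minimal sketch of the kind of check being proposed (the helper name and the exact set of configs are illustrative, not the final implementation):

    // Hypothetical sketch: warn when a user sets a config that Streams hardcodes.
    // Uses org.apache.kafka.clients.consumer.ConsumerConfig,
    // org.apache.kafka.clients.producer.ProducerConfig, java.util.Map, java.util.Set;
    // log is assumed to be an slf4j Logger.
    private static final Set<String> STREAMS_CONTROLLED_CONFIGS = Set.of(
            ConsumerConfig.GROUP_ID_CONFIG,
            ConsumerConfig.PARTITION_ASSIGNMENT_STRATEGY_CONFIG,
            ProducerConfig.PARTITIONER_CLASS_CONFIG);

    private static void warnOnControlledConfigs(final Map<String, Object> userConfigs) {
        for (final String config : STREAMS_CONTROLLED_CONFIGS) {
            if (userConfigs.containsKey(config)) {
                log.warn("Config '{}' is controlled by Kafka Streams; the user-supplied value will be ignored.", config);
            }
        }
    }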



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Created] (KAFKA-14404) Improve docs to warn that the ProducerConfig partitioner.class cannot be used in Streams

2022-11-18 Thread A. Sophie Blee-Goldman (Jira)
A. Sophie Blee-Goldman created KAFKA-14404:
--

 Summary: Improve docs to warn that the ProducerConfig partitioner.class 
cannot be used in Streams
 Key: KAFKA-14404
 URL: https://issues.apache.org/jira/browse/KAFKA-14404
 Project: Kafka
  Issue Type: Bug
  Components: streams
Reporter: A. Sophie Blee-Goldman


There are a handful of client configs that can't be set by Streams users for 
various reasons, such as the group id, but we seem to have missed a few of them 
in the documentation 
[here|https://kafka.apache.org/documentation/streams/developer-guide/config-streams.html#id26]:
 the partition assignor (Consumer) and the partitioner (Producer).

This section of the docs also just needs to be cleaned up in general as there 
is overlap between the [Default 
Values|https://kafka.apache.org/documentation/streams/developer-guide/config-streams.html#default-values]
 and [Parameters controlled by Kafka 
Streams|https://kafka.apache.org/documentation/streams/developer-guide/config-streams.html#id26]
 sections, and the table of contents is messed up presumably due to an issue 
with the section headers.

We should separate these with one section covering (only) configs where Streams 
sets a different default but this can still be overridden by the user, and the 
other section covering the configs that Streams hardcodes and users can never 
override.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


Jenkins build is still unstable: Kafka » Kafka Branch Builder » 3.3 #120

2022-11-18 Thread Apache Jenkins Server
See 




[jira] [Created] (KAFKA-14403) Snapshot failure metrics

2022-11-18 Thread José Armando García Sancio (Jira)
José Armando García Sancio created KAFKA-14403:
--

 Summary: Snapshot failure metrics
 Key: KAFKA-14403
 URL: https://issues.apache.org/jira/browse/KAFKA-14403
 Project: Kafka
  Issue Type: Sub-task
Reporter: José Armando García Sancio


Implement the following metrics:

Controller:
kafka.controller:type=KafkaController,name=MetadataSnapshotGenerationErrors
Incremented anytime the controller fails to generate a snapshot. Reset to zero 
anytime the controller restarts or a snapshot is successfully generated.

Broker:
kafka.server:type=broker-metadata-metrics,name=snapshot-generation-errors
Incremented anytime the broker fails to generate a snapshot. Reset to zero 
anytime the broker restarts or a snapshot is successfully generated.
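
For illustration, a rough sketch of how the broker-side counter could be exposed with the Kafka metrics library; the metric group and name follow the proposal above, but the wiring is an assumption, not the final implementation:

    // Hypothetical sketch. Uses org.apache.kafka.common.MetricName,
    // org.apache.kafka.common.metrics.{Metrics, Measurable} and
    // java.util.concurrent.atomic.AtomicLong.
    private final AtomicLong snapshotGenerationErrors = new AtomicLong(0);

    void registerSnapshotErrorMetric(final Metrics metrics) {
        final MetricName name = metrics.metricName(
                "snapshot-generation-errors", "broker-metadata-metrics");
        metrics.addMetric(name, (Measurable) (config, now) -> snapshotGenerationErrors.get());
    }

    void onSnapshotGenerationFailure() { snapshotGenerationErrors.incrementAndGet(); }

    // Reset on a successful snapshot; a broker restart resets it implicitly.
    void onSnapshotGenerationSuccess() { snapshotGenerationErrors.set(0); }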



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


Build failed in Jenkins: Kafka » Kafka Branch Builder » trunk #1365

2022-11-18 Thread Apache Jenkins Server
See 


Changes:


--
[...truncated 512703 lines...]
[2022-11-18T21:28:58.062Z] 
[2022-11-18T21:28:58.062Z] Gradle Test Run :streams:integrationTest > Gradle 
Test Executor 169 > TableTableJoinIntegrationTest > [caching enabled = false] > 
org.apache.kafka.streams.integration.TableTableJoinIntegrationTest.testOuter[caching
 enabled = false] STARTED
[2022-11-18T21:29:05.792Z] 
[2022-11-18T21:29:05.792Z] Gradle Test Run :streams:integrationTest > Gradle 
Test Executor 169 > TableTableJoinIntegrationTest > [caching enabled = false] > 
org.apache.kafka.streams.integration.TableTableJoinIntegrationTest.testOuter[caching
 enabled = false] PASSED
[2022-11-18T21:29:05.792Z] 
[2022-11-18T21:29:05.792Z] Gradle Test Run :streams:integrationTest > Gradle 
Test Executor 169 > TableTableJoinIntegrationTest > [caching enabled = false] > 
org.apache.kafka.streams.integration.TableTableJoinIntegrationTest.testLeft[caching
 enabled = false] STARTED
[2022-11-18T21:29:13.567Z] 
[2022-11-18T21:29:13.567Z] Gradle Test Run :streams:integrationTest > Gradle 
Test Executor 169 > TableTableJoinIntegrationTest > [caching enabled = false] > 
org.apache.kafka.streams.integration.TableTableJoinIntegrationTest.testLeft[caching
 enabled = false] PASSED
[2022-11-18T21:29:13.567Z] 
[2022-11-18T21:29:13.567Z] Gradle Test Run :streams:integrationTest > Gradle 
Test Executor 169 > TableTableJoinIntegrationTest > [caching enabled = false] > 
org.apache.kafka.streams.integration.TableTableJoinIntegrationTest.testInnerInner[caching
 enabled = false] STARTED
[2022-11-18T21:29:20.420Z] 
[2022-11-18T21:29:20.420Z] Gradle Test Run :streams:integrationTest > Gradle 
Test Executor 169 > TableTableJoinIntegrationTest > [caching enabled = false] > 
org.apache.kafka.streams.integration.TableTableJoinIntegrationTest.testInnerInner[caching
 enabled = false] PASSED
[2022-11-18T21:29:20.420Z] 
[2022-11-18T21:29:20.420Z] Gradle Test Run :streams:integrationTest > Gradle 
Test Executor 169 > TableTableJoinIntegrationTest > [caching enabled = false] > 
org.apache.kafka.streams.integration.TableTableJoinIntegrationTest.testInnerOuter[caching
 enabled = false] STARTED
[2022-11-18T21:29:27.200Z] 
[2022-11-18T21:29:27.200Z] Gradle Test Run :streams:integrationTest > Gradle 
Test Executor 169 > TableTableJoinIntegrationTest > [caching enabled = false] > 
org.apache.kafka.streams.integration.TableTableJoinIntegrationTest.testInnerOuter[caching
 enabled = false] PASSED
[2022-11-18T21:29:27.200Z] 
[2022-11-18T21:29:27.200Z] Gradle Test Run :streams:integrationTest > Gradle 
Test Executor 169 > TableTableJoinIntegrationTest > [caching enabled = false] > 
org.apache.kafka.streams.integration.TableTableJoinIntegrationTest.testInnerLeft[caching
 enabled = false] STARTED
[2022-11-18T21:29:37.252Z] 
[2022-11-18T21:29:37.252Z] Gradle Test Run :streams:integrationTest > Gradle 
Test Executor 169 > TableTableJoinIntegrationTest > [caching enabled = false] > 
org.apache.kafka.streams.integration.TableTableJoinIntegrationTest.testInnerLeft[caching
 enabled = false] PASSED
[2022-11-18T21:29:37.252Z] 
[2022-11-18T21:29:37.252Z] Gradle Test Run :streams:integrationTest > Gradle 
Test Executor 169 > TableTableJoinIntegrationTest > [caching enabled = false] > 
org.apache.kafka.streams.integration.TableTableJoinIntegrationTest.testOuterInner[caching
 enabled = false] STARTED
[2022-11-18T21:29:42.345Z] 
[2022-11-18T21:29:42.345Z] Gradle Test Run :streams:integrationTest > Gradle 
Test Executor 169 > TableTableJoinIntegrationTest > [caching enabled = false] > 
org.apache.kafka.streams.integration.TableTableJoinIntegrationTest.testOuterInner[caching
 enabled = false] PASSED
[2022-11-18T21:29:42.345Z] 
[2022-11-18T21:29:42.345Z] Gradle Test Run :streams:integrationTest > Gradle 
Test Executor 169 > TableTableJoinIntegrationTest > [caching enabled = false] > 
org.apache.kafka.streams.integration.TableTableJoinIntegrationTest.testOuterOuter[caching
 enabled = false] STARTED
[2022-11-18T21:29:49.187Z] 
[2022-11-18T21:29:49.187Z] Gradle Test Run :streams:integrationTest > Gradle 
Test Executor 169 > TableTableJoinIntegrationTest > [caching enabled = false] > 
org.apache.kafka.streams.integration.TableTableJoinIntegrationTest.testOuterOuter[caching
 enabled = false] PASSED
[2022-11-18T21:29:49.187Z] 
[2022-11-18T21:29:49.187Z] Gradle Test Run :streams:integrationTest > Gradle 
Test Executor 169 > TaskAssignorIntegrationTest > 
shouldProperlyConfigureTheAssignor STARTED
[2022-11-18T21:29:50.255Z] 
[2022-11-18T21:29:50.255Z] Gradle Test Run :streams:integrationTest > Gradle 
Test Executor 169 > TaskAssignorIntegrationTest > 
shouldProperlyConfigureTheAssignor PASSED
[2022-11-18T21:29:51.198Z] 
[2022-11-18T21:29:51.198Z] Gradle Test Run :streams:integrationTest > Gradle 
Test Executor 169 > TaskMetadataIntegrationTest > 
shouldReportCorrectEndOffsetInformation STARTED

[DISCUSS] KIP-890 Server Side Defense

2022-11-18 Thread Justine Olshan
Hey all!

I'd like to start a discussion on my proposal to add some server-side
checks on transactions to avoid hanging transactions. I know this has been
an issue for some time, so I really hope this KIP will be helpful for many
users of EOS.

The KIP includes changes that will be compatible with old clients and
changes to improve performance and correctness on new clients.

Please take a look and leave any comments you may have!

KIP:
https://cwiki.apache.org/confluence/display/KAFKA/KIP-890%3A+Transactions+Server-Side+Defense
JIRA: https://issues.apache.org/jira/browse/KAFKA-14402

Thanks!
Justine


[jira] [Created] (KAFKA-14402) Transactions Server Side Defense

2022-11-18 Thread Justine Olshan (Jira)
Justine Olshan created KAFKA-14402:
--

 Summary: Transactions Server Side Defense
 Key: KAFKA-14402
 URL: https://issues.apache.org/jira/browse/KAFKA-14402
 Project: Kafka
  Issue Type: Task
Reporter: Justine Olshan
Assignee: Justine Olshan


We have seen hanging transactions in Kafka where the last stable offset (LSO) 
does not update, we can’t clean the log (if the topic is compacted), and 
read_committed consumers get stuck.

This can happen when a message gets stuck or delayed due to networking issues 
or a network partition, the transaction aborts, and then the delayed message 
finally comes in. The delayed message case can also violate EOS if the delayed 
message comes in after the next addPartitionsToTxn request comes in. 
Effectively we may see a message from a previous (aborted) transaction become 
part of the next transaction.

Another way hanging transactions can occur is that a client is buggy and may 
somehow try to write to a partition before it adds the partition to the 
transaction. In both of these cases, we want the server to have some control to 
prevent these incorrect records from being written and either causing hanging 
transactions or violating Exactly once semantics (EOS) by including records in 
the wrong transaction.

The best way to avoid this issue is to:

1. Uniquely identify transactions by bumping the producer epoch after every 
commit/abort marker. That way, each transaction can be identified by (producer 
id, epoch).

2. Remove the addPartitionsToTxn call and implicitly just add partitions to 
the transaction on the first produce request during a transaction.

We avoid the late arrival case because the transaction is uniquely identified 
and fenced AND we avoid the buggy client case because we remove the need for 
the client to explicitly add partitions to begin the transaction.

Of course, 1 and 2 require client-side changes, so for older clients, those 
approaches won’t apply.

3. To cover older clients, we will ensure a transaction is ongoing before we 
write to it. We can do this by querying the transaction coordinator 
and caching the result.

 

See KIP-890 for more information:
https://cwiki.apache.org/confluence/display/KAFKA/KIP-890%3A+Transactions+Server-Side+Defense



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


Jenkins build is unstable: Kafka » Kafka Branch Builder » 3.3 #119

2022-11-18 Thread Apache Jenkins Server
See 




Re: [VOTE] KIP-881: Rack-aware Partition Assignment for Kafka Consumers

2022-11-18 Thread Rajini Sivaram
Hi all,

The vote has passed with 3 binding votes (David Jacot, Jun and myself) and
one non-binding vote (Maulin).

I will update the KIP and submit a PR.

Thanks everyone!

Regards,

Rajini


On Wed, Nov 16, 2022 at 5:13 PM Jun Rao  wrote:

> Hi, Rajini,
>
> Thanks for the KIP. +1
>
> Jun
>
> On Wed, Nov 16, 2022 at 12:10 AM David Jacot 
> wrote:
>
> > +1 (binding). Thanks for the KIP, Rajini!
> >
> > On Tue, Nov 15, 2022 at 9:26 PM Maulin Vasavada
> >  wrote:
> > >
> > > +1 (non-binding).
> > >
> > > Makes sense, Rajini. This would be a great addition.
> > >
> > > On Tue, Nov 15, 2022 at 10:55 AM Rajini Sivaram <
> rajinisiva...@gmail.com
> > >
> > > wrote:
> > >
> > > > Hi all,
> > > >
> > > > I would like to start vote on KIP-881 to support rack-aware partition
> > > > assignment for Kafka consumers:
> > > >
> > > >
> >
> https://cwiki.apache.org/confluence/display/KAFKA/KIP-881%3A+Rack-aware+Partition+Assignment+for+Kafka+Consumers
> > > > .
> > > >
> > > > Thank you!
> > > >
> > > > Regards,
> > > >
> > > > Rajini
> > > >
> >
>


[jira] [Resolved] (KAFKA-14320) Upgrade Jackson for CVE fix

2022-11-18 Thread Manikumar (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-14320?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Manikumar resolved KAFKA-14320.
---
Resolution: Fixed

> Upgrade Jackson for CVE fix
> ---
>
> Key: KAFKA-14320
> URL: https://issues.apache.org/jira/browse/KAFKA-14320
> Project: Kafka
>  Issue Type: Bug
>  Components: core
>Affects Versions: 3.2.0
>Reporter: Javier Li Sam
>Assignee: Thomas Cooper
>Priority: Minor
>  Labels: security
> Fix For: 3.4.0, 3.3.2
>
>
> There is a CVE for Jackson:
> Jackson: [CVE-2020-36518|https://nvd.nist.gov/vuln/detail/CVE-2020-36518] - 
> Fixed by upgrading to 2.14.0+



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[DISCUSS] KIP-888: Batch describe ACLs and describe client quotas

2022-11-18 Thread Mickael Maison
Hi,

I have opened KIP-888 to allow describing ACLs and Quotas in batches:
https://cwiki.apache.org/confluence/display/KAFKA/KIP-888%3A+Batch+describe+ACLs+and+describe+client+quotas

Let me know if you have any feedback or suggestions.

Thanks,
Mickael


Re: [DISCUSS] KIP-884: Add config to configure KafkaClientSupplier in Kafka Streams

2022-11-18 Thread Hao Li
Yes. I've updated the KIP with suggested changes.

Thanks,
Hao

On Thu, Nov 17, 2022 at 7:56 PM Matthias J. Sax  wrote:

> SG.
>
> Can we clarify/document the behavior on the KIP?
>
>
> -Matthias
>
> On 11/15/22 4:19 PM, Hao Li wrote:
> > Thanks for the questions Matthias!
> >
> > 1. I think we can check the config in the constructor which doesn't take
> > the client supplier as a parameter. This one:
> >
> >  public KafkaStreams(final Topology topology,
> >  final Properties props) {
> >  ...
> >  }
> >
> > If users provide a client supplier in another constructor, the config won't
> > be checked and the provided one will be used, i.e. the code would override
> > the config.
> >
> > 2. I'm fine with `default.client.supplier` and make it the
> > `DefaultKafkaClientSupplier`
> >
> > Thanks,
> > Hao
> >
> >
> > On Tue, Nov 15, 2022 at 4:11 PM Matthias J. Sax 
> wrote:
> >
> >> Thanks for the KIP Hao.
> >>
> >> What is the behavior if users set the config and also pass in a client
> >> supplier into the constructor?
> >>
> >> Following other config/API patterns we use, it seems the best thing
> >> would be if the code would overwrite the config?
> >>
> >> If we do this, should we change the config name to
> >> `default.client.supplier` and not make it `null`, but set the default
> >> supplier we use currently?
> >>
> >> This way, the config and code would behave the same as other configs
> >> like `default.timestamp.extractor` and similar.
> >>
> >>
> >> -Matthias
> >>
> >>
> >>
> >> On 11/15/22 3:35 PM, Hao Li wrote:
> >>> Hi all,
> >>>
> >>> I have submitted KIP-884 to add config to configure KafkaClientSupplier
> >> and
> >>> would like to start a discussion:
> >>>
> >>>
> >>
> https://cwiki.apache.org/confluence/display/KAFKA/KIP-884%3A+Add+config+to+configure+KafkaClientSupplier+in+Kafka+Streams
> >>>
> >>>
> >>
> >
> >
>


-- 
Thanks,
Hao


Jenkins build is still unstable: Kafka » Kafka Branch Builder » trunk #1364

2022-11-18 Thread Apache Jenkins Server
See 




Re: [DISCUSS] KIP-864: Add End-To-End Latency Metrics to Connectors

2022-11-18 Thread Jorge Esteban Quilcate Otoya
Thanks Mickael!


On Wed, 9 Nov 2022 at 15:54, Mickael Maison 
wrote:

> Hi Jorge,
>
> Thanks for the KIP, it is a nice improvement.
>
> 1) The per transformation metrics still have a question mark next to
> them in the KIP. Do you want to include them? If so we'll want to tag
> them, we should be able to include the aliases in TransformationChain
> and use them.
>

Yes, I have added the changes on TransformationChain that will be needed to add
these metrics.


>
> 2) I see no references to predicates. If we don't want to measure
> their latency, can we say it explicitly?
>

Good question, I haven't considered these. Though as these are materialized
as PredicatedTransformation, they should be covered by these changes.
Adding a note about this.


>
> 3) Should we have sink-record-batch-latency-avg-ms? All other metrics
> have both the maximum and average values.
>
>
Good question. I will remove it and change the record latency from 
DEBUG->INFO as it already covers the maximum metric.

Hope it's clearer now, let me know if there is any additional feedback.
Thanks!



> Thanks,
> Mickael
>
> On Thu, Oct 20, 2022 at 9:58 PM Jorge Esteban Quilcate Otoya
>  wrote:
> >
> > Thanks, Chris! Great feedback! Please, find my comments below:
> >
> > On Thu, 13 Oct 2022 at 18:52, Chris Egerton 
> wrote:
> >
> > > Hi Jorge,
> > >
> > > Thanks for the KIP. I agree with the overall direction and think this
> would
> > > be a nice improvement to Kafka Connect. Here are my initial thoughts
> on the
> > > details:
> > >
> > > 1. The motivation section outlines the gaps in Kafka Connect's task
> metrics
> > > nicely. I think it'd be useful to include more concrete details on why
> > > these gaps need to be filled in, and in which cases additional metrics
> > > would be helpful. One goal could be to provide enhanced monitoring of
> > > production deployments that allows for cluster administrators to set up
> > > automatic alerts for latency spikes and, if triggered, quickly
> identify the
> > > root cause of those alerts, reducing the time to remediation. Another
> goal
> > > could be to provide more insight to developers or cluster
> administrators
> > > who want to do performance testing on connectors in non-production
> > > environments. It may help guide our decision making process to have a
> > > clearer picture of the goals we're trying to achieve.
> > >
> >
> > Agree. The Motivation section has been updated.
> > Thanks for the examples, I see both of them being covered by the KIP.
> > I see how these could give us a good distinction on whether to position
> > some metrics at INFO or DEBUG level.
> >
> >
> > > 2. If we're trying to address the alert-and-diagnose use case, it'd be
> > > useful to have as much information as possible at INFO level, rather
> than
> > > forcing cluster administrators to possibly reconfigure a connector to
> emit
> > > DEBUG or TRACE level metrics in order to diagnose a potential
> > > production-impacting performance bottleneck. I can see the rationale
> for
> > > emitting per-record metrics that track an average value at DEBUG
> level, but
> > > for per-record metrics that track a maximum value, is there any reason
> not
> > > to provide this information at INFO level?
> > >
> >
> > Agree. Though with Max and Avg metrics being part of the same sensor
> > (where the recording level is defined), both metrics get the same level.
> >
> >
> > > 3. I'm also curious about the performance testing suggested by Yash to
> > > gauge the potential impact of this change. Have you been able to do any
> > > testing with your draft implementation yet?
> > >
> >
> > No, not so far.
> > I think it would be valuable to discuss the scope of this testing and
> maybe
> > tackle it
> > in a separate issue as Sensors and Metrics are used all over the place.
> > My initial understanding is that these tests should be placed in the
> > jmh-benchmarks[1].
> > Then, we could target testing Sensors and Metrics, and validate how much
> > overhead
> > is added by having only Max vs Max,Avg(,Min), etc.
> > On the other hand, we could extend this to Transformers or other Connect
> > layers.
> >
> > Here are some pointers to the Sensors and Metrics implementations that
> > could be considered:
> > Path to metric recording:
> > -
> >
> https://github.com/apache/kafka/blob/5cab11cf525f6c06fcf9eb43f7f95ef33fe1cdbb/clients/src/main/java/org/apache/kafka/common/metrics/Sensor.java#L195-L199
> > -
> >
> https://github.com/apache/kafka/blob/5cab11cf525f6c06fcf9eb43f7f95ef33fe1cdbb/clients/src/main/java/org/apache/kafka/common/metrics/Sensor.java#L230-L244
> >
> > ```
> > // increment all the stats
> > for (StatAndConfig statAndConfig : this.stats) {
> >statAndConfig.stat.record(statAndConfig.config(), value, timeMs);
> > }
> > ```
> >
> > SampledStats:
> > - Avg:
> >
> https://github.com/apache/kafka/blob/068ab9cefae301f3187ea885d645c425955e77d2/clients/src/main/java/org/apache/kafka/common/metrics/stats/Avg.java
> > - Max:
> >
> 

[jira] [Created] (KAFKA-14401) Connector/Tasks reading offsets can get stuck if underneath WorkThread dies

2022-11-18 Thread Sagar Rao (Jira)
Sagar Rao created KAFKA-14401:
-

 Summary: Connector/Tasks reading offsets can get stuck if 
underneath WorkThread dies
 Key: KAFKA-14401
 URL: https://issues.apache.org/jira/browse/KAFKA-14401
 Project: Kafka
  Issue Type: Bug
Reporter: Sagar Rao


When a connector or task tries to read offsets from the offsets topic, it 
invokes the `OffsetStorageReaderImpl#offsets` method. This method gets a Future 
from the underlying KafkaBackingStore, which in turn invokes the 
`KafkaBasedLog#readToEnd` method and passes it a Callback. That method 
essentially adds the Callback to a queue of callbacks being managed.

Within KafkaBasedLog there is a WorkThread which polls the callback queue and 
executes the callbacks in an infinite loop. However, there is an enclosing 
try/catch block around the while loop: if an exception is thrown that is not 
caught by any of the inner catch blocks, control reaches the outermost catch 
block and the WorkThread is terminated. The connectors/tasks are not aware of 
this and keep submitting callbacks to KafkaBasedLog with nobody processing 
them. This can be seen in the thread dumps as well:

 

```

"task-thread-connector-0" #6334 prio=5 os_prio=0 cpu=19.36ms elapsed=2092.93s 
tid=0x7f8d9c037000 nid=0x5d00 waiting on condition  [0x7f8dc08cd000]
   java.lang.Thread.State: WAITING (parking)
    at jdk.internal.misc.Unsafe.park(java.base@11.0.15/Native Method)
    - parking to wait for  <0x00070345c9a8> (a 
java.util.concurrent.CountDownLatch$Sync)
    at 
java.util.concurrent.locks.LockSupport.park(java.base@11.0.15/LockSupport.java:194)
    at 
java.util.concurrent.locks.AbstractQueuedSynchronizer.parkAndCheckInterrupt(java.base@11.0.15/AbstractQueuedSynchronizer.java:885)
    at 
java.util.concurrent.locks.AbstractQueuedSynchronizer.doAcquireSharedInterruptibly(java.base@11.0.15/AbstractQueuedSynchronizer.java:1039)
    at 
java.util.concurrent.locks.AbstractQueuedSynchronizer.acquireSharedInterruptibly(java.base@11.0.15/AbstractQueuedSynchronizer.java:1345)
    at 
java.util.concurrent.CountDownLatch.await(java.base@11.0.15/CountDownLatch.java:232)
    at 
org.apache.kafka.connect.util.ConvertingFutureCallback.get(ConvertingFutureCallback.java:98)
    at 
org.apache.kafka.connect.storage.OffsetStorageReaderImpl.offsets(OffsetStorageReaderImpl.java:101)
    at 
org.apache.kafka.connect.storage.OffsetStorageReaderImpl.offset(OffsetStorageReaderImpl.java:63)

```

 

We need a mechanism to restart the WorkThread if it dies, for example in the 
outermost catch block.
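
A rough sketch of that idea (method and field names here are illustrative; this is not the actual KafkaBasedLog code):

    // Hypothetical sketch: keep the work thread alive by re-entering the poll loop
    // instead of letting an unexpected exception terminate the thread.
    @Override
    public void run() {
        while (!stopRequested) {
            try {
                pollCallbackQueueOnce();  // existing loop body that drains and runs queued callbacks
            } catch (final Throwable t) {
                log.error("Unexpected error in KafkaBasedLog work thread; continuing", t);
                // swallow and loop again so newly submitted callbacks keep being processed
            }
        }
    }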

 



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


Re: [DISCUSS] Apache Kafka 3.3.2

2022-11-18 Thread Chris Egerton
Hi all,

Here is the release plan for 3.3.2:
https://cwiki.apache.org/confluence/display/KAFKA/Release+plan+3.3.2

Currently there is one open blocker issue and one open non-blocker,
non-critical issue. I plan to generate the first release candidate once the
blocker issue is resolved, unless other blockers are raised in the meantime.

Cheers,

Chris

On Thu, Nov 17, 2022 at 10:14 AM Bruno Cadonna  wrote:

> Thanks for volunteering!
>
> +1
>
> Best,
> Bruno
>
> On 17.11.22 09:57, Luke Chen wrote:
> > Thanks for volunteering!
> >
> > On Thu, Nov 17, 2022 at 1:07 AM José Armando García Sancio
> >  wrote:
> >
> >> +1. Thanks for volunteering.
> >>
> >> --
> >> -José
> >>
> >
>


[jira] [Resolved] (KAFKA-13586) ConfigExceptions thrown by FileConfigProvider during connector/task startup crash worker

2022-11-18 Thread Chris Egerton (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-13586?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Chris Egerton resolved KAFKA-13586.
---
Resolution: Fixed

> ConfigExceptions thrown by FileConfigProvider during connector/task startup 
> crash worker
> 
>
> Key: KAFKA-13586
> URL: https://issues.apache.org/jira/browse/KAFKA-13586
> Project: Kafka
>  Issue Type: Bug
>  Components: KafkaConnect
>Affects Versions: 2.0.0
>Reporter: Greg Harris
>Assignee: Dan Stelljes
>Priority: Minor
> Fix For: 3.4.0
>
>
> If the filesystems of a multi-worker connect cluster are inconsistent, the 
> FileConfigProvider may be able to find a configuration on worker A but not 
> worker B.
> This may lead to worker B experiencing a crash when given a connector/task 
> assignment that was previously validated by worker A.
> Steps to reproduce:
> 1. Configure a two-worker Connect cluster to use the FileConfigProvider
> 2. Place a secret file on worker A (leader) but not worker B (member).
> 3. Create a connector via REST which references the secret file on-disk.
> 4. Observe that the connector creation succeeds
> 5. Wait for a rebalance which assigns either the connector or task to worker 
> B.
> Expected behavior:
> The connector/task is marked FAILED, and the exception is attributed to the 
> FileConfigProvider not able to find the file.
> Actual behavior:
> Worker B prints this log message and shuts down:
> {noformat}
> [Worker clientId=connect-1, groupId=my-connect-cluster] Uncaught exception in 
> herder work thread, exiting: 
> 2org.apache.kafka.common.config.ConfigException: Invalid value 
> java.nio.file.NoSuchFileException: /path/to/secrets/file.properties for 
> configuration Could not read properties from file 
> /path/to/secrets/file.properties
>   at 
> org.apache.kafka.common.config.provider.FileConfigProvider.get(FileConfigProvider.java:92)
>   at 
> org.apache.kafka.common.config.ConfigTransformer.transform(ConfigTransformer.java:103)
>   at 
> org.apache.kafka.connect.runtime.WorkerConfigTransformer.transform(WorkerConfigTransformer.java:58)
>   at 
> org.apache.kafka.connect.runtime.distributed.ClusterConfigState.connectorConfig(ClusterConfigState.java:135)
>   at 
> org.apache.kafka.connect.runtime.distributed.DistributedHerder.startConnector(DistributedHerder.java:1464)
>   at 
> org.apache.kafka.connect.runtime.distributed.DistributedHerder.processConnectorConfigUpdates(DistributedHerder.java:638)
>   at 
> org.apache.kafka.connect.runtime.distributed.DistributedHerder.tick(DistributedHerder.java:457)
>   at 
> org.apache.kafka.connect.runtime.distributed.DistributedHerder.run(DistributedHerder.java:326)
>   at 
> java.base/java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:515)
>   at java.base/java.util.concurrent.FutureTask.run(FutureTask.java:264)
>   at 
> java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1130)
>   at 
> java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:630)
>   at java.base/java.lang.Thread.run(Thread.java:831){noformat}
> Having an inconsistent filesystem is not a recommended configuration, but it 
> is preferable in such situations to prevent such a connector configuration 
> error from crashing the worker irrecoverably.
>  



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


Streams PR review request

2022-11-18 Thread Nick Telford
Hi everyone,

I found a small performance improvement in Kafka Streams state stores,
would someone be able to review/merge it please?
https://github.com/apache/kafka/pull/12842

(I'm not sure if this is the correct forum for requesting a review/merge.
If it isn't, please let me know).

Regards,

Nick


[jira] [Created] (KAFKA-14400) KStream - KStream - LeftJoin() does not call ValueJoiner with null value

2022-11-18 Thread Victor van den Hoven (Jira)
Victor van den Hoven created KAFKA-14400:


 Summary: KStream - KStream - LeftJoin() does not call ValueJoiner 
with null value 
 Key: KAFKA-14400
 URL: https://issues.apache.org/jira/browse/KAFKA-14400
 Project: Kafka
  Issue Type: Bug
  Components: streams
Affects Versions: 3.3.1, 3.1.1
 Environment: Windows PC 
Reporter: Victor van den Hoven
 Attachments: SimpleStreamTopology.java, SimpleStreamTopologyTest.java

In Kafka Streams 3.1.1:

When using JoinWindows.ofTimeDifferenceWithNoGrace(Duration.ofMillis(1)), 
the KStream leftJoin(KStream otherStream, ValueJoiner joiner, 
JoinWindows windows) does not seem to call the joiner with a null value when 
the join predicate is not satisfied.

When using the deprecated JoinWindows.of(Duration.ofMillis(1)), 
the KStream leftJoin(KStream otherStream, ValueJoiner joiner, 
JoinWindows windows) does call the joiner with a null value when the join 
predicate is not satisfied.

 

Attached you can find two files with TopologyTestDriver Unit test to reproduce.
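
For reference, a minimal sketch of the kind of join being described; the topic names, serdes, and joiner below are assumptions for illustration, not the attached test code:

    // Illustrative sketch only. Uses org.apache.kafka.streams.StreamsBuilder,
    // org.apache.kafka.streams.kstream.{JoinWindows, KStream, StreamJoined},
    // org.apache.kafka.common.serialization.Serdes and java.time.Duration.
    final StreamsBuilder builder = new StreamsBuilder();
    final KStream<String, String> left = builder.stream("left-topic");
    final KStream<String, String> right = builder.stream("right-topic");

    left.leftJoin(
            right,
            // for a left join this is expected to be called with rightValue == null
            // when the join predicate is not satisfied
            (leftValue, rightValue) -> leftValue + "/" + rightValue,
            JoinWindows.ofTimeDifferenceWithNoGrace(Duration.ofMillis(1)),
            StreamJoined.with(Serdes.String(), Serdes.String(), Serdes.String()))
        .to("output-topic");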



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


Re: [DISCUSS] KIP-852 Optimize calculation of size for log in remote tier

2022-11-18 Thread Alexandre Dupriez
Hi Divij,

Thanks for the KIP. Please find some comments based on what I read on
this thread so far - apologies for the repeats and the late reply.

If I understand correctly, one of the main elements of discussion is
about caching in Kafka versus delegation of providing the remote size
of a topic-partition to the plugin.

A few comments:

100. The size of the “derived metadata” which is managed by the plugin
to represent an rlmMetadata can indeed be close to 1 kB on average
depending on its own internal structure, e.g. the redundancy it
enforces (unfortunately resulting to duplication), additional
information such as checksums and primary and secondary indexable
keys. But indeed, the rlmMetadata is itself a lighter data structure
by a factor of 10. And indeed, instead of caching the “derived
metadata”, only the rlmMetadata could be, which should address the
concern regarding the memory occupancy of the cache.

101. I am not sure I fully understand why we would need to cache the
list of rlmMetadata to retain the remote size of a topic-partition.
Since the leader of a topic-partition is, in non-degenerated cases,
the only actor which can mutate the remote part of the
topic-partition, hence its size, it could in theory only cache the
size of the remote log once it has calculated it? In which case there
would not be any problem regarding the size of the caching strategy.
Did I miss something there?
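
(For illustration, a rough sketch of the per-leader caching idea in point 101; the class and method names are mine, not from the KIP:)

    // Hypothetical sketch: the leader computes the remote size once, then maintains it
    // incrementally as segments are offloaded or deleted. Uses
    // org.apache.kafka.common.TopicIdPartition and java.util.concurrent.{ConcurrentMap, ConcurrentHashMap}.
    class RemoteLogSizeCache {
        private final ConcurrentMap<TopicIdPartition, Long> sizeBytes = new ConcurrentHashMap<>();

        void onBecomeLeader(TopicIdPartition tp, long computedSize) {
            sizeBytes.put(tp, computedSize);
        }

        void onSegmentCopied(TopicIdPartition tp, long segmentBytes) {
            sizeBytes.merge(tp, segmentBytes, Long::sum);
        }

        void onSegmentDeleted(TopicIdPartition tp, long segmentBytes) {
            sizeBytes.merge(tp, -segmentBytes, Long::sum);
        }

        long remoteLogSize(TopicIdPartition tp) {
            return sizeBytes.getOrDefault(tp, 0L);
        }
    }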

102. There may be a few challenges to consider with caching:

102.1) As mentioned above, the caching strategy assumes no mutation
outside the lifetime of a leader. While this is true in the normal
course of operation, there could be accidental mutation outside of the
leader and a loss of consistency between the cached state and the
actual remote representation of the log. E.g. split-brain scenarios,
bugs in the plugins, bugs in external systems with mutating access on
the derived metadata. In the worst case, a drift between the cached
size and the actual size could lead to over-deleting remote data which
is a durability risk.

The alternative you propose, by making the plugin the source of truth
w.r.t. to the size of the remote log, can make it easier to avoid
inconsistencies between plugin-managed metadata and the remote log
from the perspective of Kafka. On the other hand, plugin vendors would
have to implement it with the expected efficiency to have it yield
benefits.

102.2) As you mentioned, the caching strategy in Kafka would still
require one iteration over the list of rlmMetadata when the leadership
of a topic-partition is assigned to a broker, while the plugin can
offer alternative constant-time approaches. This calculation cannot be
put on the LeaderAndIsr path and would be performed in the background.
In case of bulk leadership migration, listing the rlmMetadata could a)
result in request bursts to any backend system the plugin may use (which
shouldn't be a problem for high-throughput data stores but could have cost
implications), b) increase the utilisation timespan of the RLM threads for
these calculations, potentially leading to transient starvation of tasks
queued for, typically, offloading operations, and c) have a non-marginal CPU
footprint on hardware with strict resource constraints. All these elements
could have an impact to some degree depending on the operational environment.

From a design perspective, one question is where we want the source of
truth w.r.t. remote log size to be during the lifetime of a leader.
The responsibility of maintaining a consistent representation of the
remote log is shared by Kafka and the plugin. Which system is best
placed to maintain such a state while providing the highest
consistency guarantees is something both Kafka and plugin designers
could help understand better.

Many thanks,
Alexandre


On Thu, Nov 17, 2022 at 19:27, Jun Rao  wrote:
>
> Hi, Divij,
>
> Thanks for the reply.
>
> Point #1. Is the average remote segment metadata really 1KB? What's listed
> in the public interface is probably well below 100 bytes.
>
> Point #2. I guess you are assuming that each broker only caches the remote
> segment metadata in memory. An alternative approach is to cache them in
> both memory and local disk. That way, on broker restart, you just need to
> fetch the new remote segments' metadata using the
> listRemoteLogSegments(TopicIdPartition topicIdPartition, int leaderEpoch)
> api. Will that work?
>
> Point #3. Thanks for the explanation and it sounds good.
>
> Thanks,
>
> Jun
>
> On Thu, Nov 17, 2022 at 7:31 AM Divij Vaidya 
> wrote:
>
> > Hi Jun
> >
> > There are three points that I would like to present here:
> >
> > 1. We would require a large cache size to efficiently cache all segment
> > metadata.
> > 2. Linear scan of all metadata at broker startup to populate the cache will
> > be slow and will impact the archival process.
> > 3. There is no other use case where a full scan of segment metadata is
> > required.
> >
> > Let's start by quantifying 1. Here's my 

[jira] [Created] (KAFKA-14399) ERROR Processor got uncaught exception: Java heap space OutOfMemory

2022-11-18 Thread evan.zhao (Jira)
evan.zhao created KAFKA-14399:
-

 Summary: ERROR Processor got uncaught exception: Java heap space 
OutOfMemory
 Key: KAFKA-14399
 URL: https://issues.apache.org/jira/browse/KAFKA-14399
 Project: Kafka
  Issue Type: Bug
 Environment: java version "1.8.0_211"
Java(TM) SE Runtime Environment (build 1.8.0_211-b12)
Java HotSpot(TM) 64-Bit Server VM (build 25.211-b12, mixed mode)
kafka-2.12-2.0.1
Linux bsa4031 3.10.0-1160.el7.x86_64 #1 SMP Mon Oct 19 16:18:59 UTC 2020 x86_64 
x86_64 x86_64 GNU/Linux
Reporter: evan.zhao
 Attachments: heapdump-1668757192589.7z, 
image-2022-11-18-16-28-59-922.png

[^heapdump-1668757192589.7z]

The Kafka cluster has three nodes (heap 2G). Logs:
{code:java}
[2022-11-18 16:20:22,581] INFO [ProducerStateManager partition=isoc_uts-4] 
Writing producer snapshot at offset 2169125716 (kafka.log.ProducerStateManager)
[2022-11-18 16:20:22,582] INFO [Log partition=isoc_uts-4, 
dir=/home/sdb/kafka/kafka_data_dir] Rolled new log segment at offset 2169125716 
in 1 ms. (kafka.log.Log)
[2022-11-18 16:21:00,545] INFO [ProducerStateManager partition=isoc_uts-2] 
Writing producer snapshot at offset 2169172463 (kafka.log.ProducerStateManager)
[2022-11-18 16:21:00,546] INFO [Log partition=isoc_uts-2, 
dir=/home/sdh/kafka/kafka_data_dir] Rolled new log segment at offset 2169172463 
in 1 ms. (kafka.log.Log)
[2022-11-18 16:21:02,427] INFO [Log partition=format_traffic-0, 
dir=/home/sdc/kafka/kafka_data_dir] Deleting segment 1983124706 (kafka.log.Log)
[2022-11-18 16:21:02,427] INFO [Log partition=isoc_uts-14, 
dir=/home/sdc/kafka/kafka_data_dir] Deleting segment 2064627163 (kafka.log.Log)
[2022-11-18 16:21:02,427] INFO [Log partition=format_normal-6, 
dir=/home/sdj/kafka/kafka_data_dir] Deleting segment 112923194 (kafka.log.Log)
[2022-11-18 16:21:02,428] INFO [Log partition=format_traffic-13, 
dir=/home/sdh/kafka/kafka_data_dir] Deleting segment 1983026997 (kafka.log.Log)
[2022-11-18 16:21:02,428] INFO [Log partition=format_traffic-13, 
dir=/home/sdh/kafka/kafka_data_dir] Deleting segment 1983290739 (kafka.log.Log)
[2022-11-18 16:21:02,516] INFO [Log partition=isoc_uts-12, 
dir=/home/sdi/kafka/kafka_data_dir] Deleting segment 2064465335 (kafka.log.Log)
[2022-11-18 16:21:02,537] INFO Deleted log 
/home/sdj/kafka/kafka_data_dir/format_normal-6/000112923194.log.deleted.
 (kafka.log.LogSegment)
[2022-11-18 16:21:02,537] INFO Deleted offset index 
/home/sdj/kafka/kafka_data_dir/format_normal-6/000112923194.index.deleted.
 (kafka.log.LogSegment)
[2022-11-18 16:21:02,537] INFO Deleted time index 
/home/sdj/kafka/kafka_data_dir/format_normal-6/000112923194.timeindex.deleted.
 (kafka.log.LogSegment)
[2022-11-18 16:21:02,551] INFO [Log partition=format_traffic-4, 
dir=/home/sdd/kafka/kafka_data_dir] Deleting segment 1982782808 (kafka.log.Log)
[2022-11-18 16:21:02,570] INFO Deleted log 
/home/sdh/kafka/kafka_data_dir/format_traffic-13/001983026997.log.deleted.
 (kafka.log.LogSegment)
[2022-11-18 16:21:02,570] INFO [Log partition=isoc_uts-10, 
dir=/home/sdg/kafka/kafka_data_dir] Deleting segment 2064709003 (kafka.log.Log)
[2022-11-18 16:21:02,574] INFO Deleted log 
/home/sdi/kafka/kafka_data_dir/isoc_uts-12/002064465335.log.deleted. 
(kafka.log.LogSegment)
[2022-11-18 16:21:02,574] INFO Deleted offset index 
/home/sdi/kafka/kafka_data_dir/isoc_uts-12/002064465335.index.deleted. 
(kafka.log.LogSegment)
[2022-11-18 16:21:02,574] INFO Deleted time index 
/home/sdi/kafka/kafka_data_dir/isoc_uts-12/002064465335.timeindex.deleted.
 (kafka.log.LogSegment)
[2022-11-18 16:21:02,588] INFO [Log partition=format_traffic-1, 
dir=/home/sdf/kafka/kafka_data_dir] Deleting segment 1983054040 (kafka.log.Log)
[2022-11-18 16:21:02,590] INFO Deleted log 
/home/sdh/kafka/kafka_data_dir/format_traffic-13/001983290739.log.deleted.
 (kafka.log.LogSegment)
[2022-11-18 16:21:02,591] INFO Deleted offset index 
/home/sdh/kafka/kafka_data_dir/format_traffic-13/001983026997.index.deleted.
 (kafka.log.LogSegment)
[2022-11-18 16:21:02,591] INFO Deleted offset index 
/home/sdh/kafka/kafka_data_dir/format_traffic-13/001983290739.index.deleted.
 (kafka.log.LogSegment)
[2022-11-18 16:21:02,591] INFO Deleted time index 
/home/sdh/kafka/kafka_data_dir/format_traffic-13/001983026997.timeindex.deleted.
 (kafka.log.LogSegment)
[2022-11-18 16:21:02,591] INFO Deleted time index 
/home/sdh/kafka/kafka_data_dir/format_traffic-13/001983290739.timeindex.deleted.
 (kafka.log.LogSegment)
[2022-11-18 16:21:02,706] INFO Deleted log 
/home/sdc/kafka/kafka_data_dir/format_traffic-0/001983124706.log.deleted.
 (kafka.log.LogSegment)
[2022-11-18 16:21:02,707] INFO [Log partition=isoc_uts-8, 
dir=/home/sde/kafka/kafka_data_dir] Deleting segment 2064673709 (kafka.log.Log)
[2022-11-18 16:21:02,707] INFO [Log partition=format_traffic-7,