[jira] [Created] (KAFKA-13701) Pin background worker threads for certain background work (ex: UnifiedLog.flush())

2022-02-28 Thread Kowshik Prakasam (Jira)
Kowshik Prakasam created KAFKA-13701:


 Summary: Pin background worker threads for certain background work 
(ex: UnifiedLog.flush())
 Key: KAFKA-13701
 URL: https://issues.apache.org/jira/browse/KAFKA-13701
 Project: Kafka
  Issue Type: Improvement
Reporter: Kowshik Prakasam


Certain background work such as UnifiedLog.flush() need not support 
concurrency. Today in the existing KafkaScheduler, we are not able to pin 
background work to specific threads. As a result we are unable to prevent 
concurrent UnifiedLog.flush() calls, so we have to ensure UnifiedLog.flush() 
implementation is thread safe by modifying the code at subtle areas (ex: [PR 
#11814|https://github.com/apache/kafka/pull/11814]). The code will be simpler 
if instead KafkaScheduler (or alike) could support pinning of certain 
background work to specific threads, for example the UnifiedLog.flush() 
operation for the same topic-partition will go to the same thread. This will 
ensure strict ordering of flush() calls, thereby enabling us to write simpler 
code eventually.



--
This message was sent by Atlassian Jira
(v8.20.1#820001)


[jira] [Created] (KAFKA-13700) Kafka reporting corrupted block message.

2022-02-28 Thread Uday Bhaskar (Jira)
Uday Bhaskar created KAFKA-13700:


 Summary: Kafka reporting corrupted block message.  
 Key: KAFKA-13700
 URL: https://issues.apache.org/jira/browse/KAFKA-13700
 Project: Kafka
  Issue Type: Bug
  Components: core
Affects Versions: 2.4.1
 Environment: ubuntu 16.04
kafka 2.4
Reporter: Uday Bhaskar


In our kafka cluster a couple of partitions in __consumer_offsets and 1 regular 
topic getting data corruption issue while replicas trying to read from leader.  
Similar messages for other partitions as well . 
 
[2022-02-28 21:57:29,941] ERROR [ReplicaFetcher replicaId=6, leaderId=1, 
fetcherId=2] Found invalid messages during fetch for partition 
__consumer_offsets-10 offset 108845487 (kafka.server.ReplicaFetcherThread)

org.apache.kafka.common.errors.CorruptRecordException: Record is corrupt 
(stored crc = 1524235439) in topic partition __consumer_offsets-10
 
another topic partitions with same errors
[2022-02-28 22:17:00,235] ERROR [ReplicaFetcher replicaId=6, leaderId=1, 
fetcherId=0] Found invalid messages during fetch for partition 
px-11351-xx-a56c642-0 offset 11746872 (kafka.server.ReplicaFetcherThread)
org.apache.kafka.common.errors.CorruptRecordException: Record is corrupt 
(stored crc = 475179617) in topic partition px-11351-xx-a56c642-0.
 
I have verified all infrastructure, dish network and system for any errors 
found and nothing found. I am not sure why it is happening or how to 
troubleshoot.  
 
Bellow is output of the message from DumpLogSegments , 
 
$ /opt/ns/kafka/bin/kafka-run-class.sh kafka.tools.DumpLogSegments 
--verify-index-only --deep-iteration --files ./11324034.log | grep 
11746872
baseOffset: 11746872 lastOffset: 11746872 count: 1 baseSequence: 50278 
lastSequence: 50278 producerId: 17035 producerEpoch: 0 partitionLeaderEpoch: 8 
isTransactional: false isControl: false position: 252530345 CreateTime: 
1645886348240 size: 647 magic: 2 compresscodec: SNAPPY crc: 475179617 isvalid: 
true
| offset: 11746872 CreateTime: 1645886348240 keysize: 54 valuesize: 637 
sequence: 50278 headerKeys: []



--
This message was sent by Atlassian Jira
(v8.20.1#820001)


[jira] [Created] (KAFKA-13699) ProcessorContext does not expose Stream Time

2022-02-28 Thread Shay Lin (Jira)
Shay Lin created KAFKA-13699:


 Summary: ProcessorContext does not expose Stream Time
 Key: KAFKA-13699
 URL: https://issues.apache.org/jira/browse/KAFKA-13699
 Project: Kafka
  Issue Type: Bug
  Components: streams
Affects Versions: 2.7.0
Reporter: Shay Lin


As a KS developer, I would like to leverage 
[KIP-622|https://cwiki.apache.org/confluence/display/KAFKA/KIP-622%3A+Add+currentSystemTimeMs+and+currentStreamTimeMs+to+ProcessorContext]
 and access stream time in Processor Context.

 

However, the methods currentStreamTimeMs or currentSystemTimeMs is missing from 
for KStreams 2.7.0 (Java).



--
This message was sent by Atlassian Jira
(v8.20.1#820001)


Re: [DISCUSS] KIP-794: Strictly Uniform Sticky Partitioner

2022-02-28 Thread Jun Rao
Hi, Artem,

Thanks for the reply. A few more comments.

1. Since we control the implementation and the usage of DefaultPartitioner,
another way is to instantiate the DefaultPartitioner with a special
constructor, which allows it to have more access to internal information.
Then we could just change the behavior of  DefaultPartitioner such that it
can use the internal infoamtion when choosing the partition. This seems
more intuitive than having DefaultPartitioner return -1 partition.

2. I guess partitioner.sticky.batch.size is introduced because the
effective batch size could be less than batch.size and we want to align
partition switching with the effective batch size. How would a user know
the effective batch size to set partitioner.sticky.batch.size properly? If
the user somehow knows the effective batch size, does setting batch.size to
the effective batch size achieve the same result?

4. Thanks for the explanation. Makes sense to me.

Thanks,

Jun

Thanks,

Jun

On Fri, Feb 25, 2022 at 8:26 PM Artem Livshits
 wrote:

> Hi Jun,
>
> 1. Updated the KIP to add a couple paragraphs about implementation
> necessities in the Proposed Changes section.
>
> 2. Sorry if my reply was confusing, what I meant to say (and I elaborated
> on that in point #3) is that there could be patterns for which 16KB
> wouldn't be the most effective setting, thus it would be good to make it
> configurable.
>
> 4. We could use broker readiness timeout.  But I'm not sure it would
> correctly model the broker load.  The problem is that latency is not an
> accurate measure of throughput, we could have 2 brokers that have equal
> throughput but one has higher latency (so it takes larger batches less
> frequently, but still takes the same load).  Latency-based logic is likely
> to send less data to the broker with higher latency.  Using the queue size
> would adapt to throughput, regardless of latency (which could be just a
> result of deployment topology), so that's the model chosen in the
> proposal.  The partition.availability.timeout.ms logic approaches the
> model
> from a slightly different angle, say we have a requirement to deliver
> messages via brokers that have a certain latency, then
> partition.availability.timeout.ms could be used to tune that.  Latency is
> a
> much more volatile metric than throughput (latency depends on external
> load, on capacity, on deployment topology, on jitter in network, on jitter
> in disk, etc.) and I think it would be best to leave latency-based
> thresholds configurable to tune for the environment.
>
> -Artem
>
> On Wed, Feb 23, 2022 at 11:14 AM Jun Rao  wrote:
>
> > Hi, Artem,
> >
> > Thanks for the reply. A few more comments.
> >
> > 1. Perhaps you could elaborate a bit more on how the producer determines
> > the partition if the partitioner returns -1. This will help understand
> why
> > encapsulating that logic as a partitioner is not clean.
> >
> > 2. I am not sure that I understand this part. If 15.5KB is more
> efficient,
> > could we just set batch.size to 15.5KB?
> >
> > 4. Yes, we could add a switch (or a variant of the partitioner) for
> > enabling this behavior. Also, choosing partitions based on broker
> readiness
> > can be made in a smoother way. For example, we could track the last time
> a
> > broker has drained any batches from the accumulator. We can then select
> > partitions from brokers proportionally to the inverse of that time. This
> > seems smoother than a cutoff based on a
> partition.availability.timeout.ms
> >  threshold.
> >
> > Thanks,
> >
> > Jun
> >
> > On Fri, Feb 18, 2022 at 5:14 PM Artem Livshits
> >  wrote:
> >
> > > Hello Luke, Jun,
> > >
> > > Thank you for your feedback.  I've added the Rejected Alternative
> section
> > > that may clarify some of the questions w.r.t. returning -1.
> > >
> > > 1. I've elaborated on the -1 in the KIP.  The problem is that a
> > significant
> > > part of the logic needs to be in the producer (because it now uses
> > > information about brokers that only the producer knows), so
> encapsulation
> > > of the logic within the default partitioner isn't as clean.   I've
> added
> > > the Rejected Alternative section that documents an attempt to keep the
> > > encapsulation by providing new callbacks to the partitioner.
> > >
> > > 2. The meaning of the partitioner.sticky.batch.size is explained in the
> > > Uniform Sticky Batch Size section.  Basically, we track the amount of
> > bytes
> > > produced to the partition and if it exceeds
> partitioner.sticky.batch.size
> > > then we switch to the next partition.  As far as the reason to make it
> > > different from batch.size, I think Luke answered this with the question
> > #3
> > > -- what if the load pattern is such that 15.5KB would be more efficient
> > > than 16KB?
> > >
> > > 3. I think it's hard to have one size that would fit all patterns.
> E.g.
> > if
> > > the load pattern is such that there is linger and the app fills the
> batch
> > > before linger expires, then 

Build failed in Jenkins: Kafka » Kafka Branch Builder » trunk #724

2022-02-28 Thread Apache Jenkins Server
See 


Changes:


--
[...truncated 368092 lines...]
[2022-02-28T19:34:26.638Z] streams-5: SMOKE-TEST-CLIENT-CLOSED
[2022-02-28T19:34:28.418Z] 
[2022-02-28T19:34:28.418Z] 
org.apache.kafka.streams.integration.SuppressionDurabilityIntegrationTest > 
shouldRecoverBufferAfterShutdown[exactly_once_v2] PASSED
[2022-02-28T19:34:29.436Z] 
[2022-02-28T19:34:29.436Z] 
org.apache.kafka.streams.integration.TableTableJoinIntegrationTest > 
testLeftInner[caching enabled = true] STARTED
[2022-02-28T19:34:31.243Z] 
[2022-02-28T19:34:31.243Z] Deprecated Gradle features were used in this build, 
making it incompatible with Gradle 8.0.
[2022-02-28T19:34:31.243Z] 
[2022-02-28T19:34:31.243Z] You can use '--warning-mode all' to show the 
individual deprecation warnings and determine if they come from your own 
scripts or plugins.
[2022-02-28T19:34:31.243Z] 
[2022-02-28T19:34:31.243Z] See 
https://docs.gradle.org/7.3.3/userguide/command_line_interface.html#sec:command_line_warnings
[2022-02-28T19:34:31.243Z] 
[2022-02-28T19:34:31.243Z] BUILD SUCCESSFUL in 1h 47m 21s
[2022-02-28T19:34:31.243Z] 208 actionable tasks: 113 executed, 95 up-to-date
[2022-02-28T19:34:31.980Z] 
[2022-02-28T19:34:31.981Z] 
org.apache.kafka.streams.integration.TableTableJoinIntegrationTest > 
testOuterInner[caching enabled = false] PASSED
[2022-02-28T19:34:31.981Z] 
[2022-02-28T19:34:31.981Z] 
org.apache.kafka.streams.integration.TableTableJoinIntegrationTest > 
testOuterOuter[caching enabled = false] STARTED
[2022-02-28T19:34:32.170Z] 
[2022-02-28T19:34:32.170Z] See the profiling report at: 
file:///home/jenkins/jenkins-agent/workspace/Kafka_kafka_trunk/build/reports/profile/profile-2022-02-28-17-47-14.html
[2022-02-28T19:34:32.170Z] A fine-grained performance profile is available: use 
the --scan option.
[Pipeline] junit
[2022-02-28T19:34:33.015Z] Recording test results
[2022-02-28T19:34:34.925Z] 
[2022-02-28T19:34:34.925Z] 
org.apache.kafka.streams.integration.TableTableJoinIntegrationTest > 
testLeftInner[caching enabled = true] PASSED
[2022-02-28T19:34:34.925Z] 
[2022-02-28T19:34:34.925Z] 
org.apache.kafka.streams.integration.TableTableJoinIntegrationTest > 
testLeftOuter[caching enabled = true] STARTED
[2022-02-28T19:34:39.368Z] 
[2022-02-28T19:34:39.368Z] 
org.apache.kafka.streams.integration.TableTableJoinIntegrationTest > 
testOuterOuter[caching enabled = false] PASSED
[2022-02-28T19:34:39.368Z] 
[2022-02-28T19:34:39.368Z] 
org.apache.kafka.streams.integration.TaskMetadataIntegrationTest > 
shouldReportCorrectEndOffsetInformation STARTED
[2022-02-28T19:34:40.341Z] 
[2022-02-28T19:34:40.341Z] 
org.apache.kafka.streams.integration.TaskMetadataIntegrationTest > 
shouldReportCorrectEndOffsetInformation PASSED
[2022-02-28T19:34:40.341Z] 
[2022-02-28T19:34:40.341Z] 
org.apache.kafka.streams.integration.TaskMetadataIntegrationTest > 
shouldReportCorrectCommittedOffsetInformation STARTED
[2022-02-28T19:34:41.964Z] 
[2022-02-28T19:34:41.964Z] 
org.apache.kafka.streams.integration.TableTableJoinIntegrationTest > 
testLeftOuter[caching enabled = true] PASSED
[2022-02-28T19:34:41.964Z] 
[2022-02-28T19:34:41.964Z] 
org.apache.kafka.streams.integration.TableTableJoinIntegrationTest > 
testLeftLeft[caching enabled = true] STARTED
[2022-02-28T19:34:42.159Z] 
[2022-02-28T19:34:42.159Z] 
org.apache.kafka.streams.integration.TaskMetadataIntegrationTest > 
shouldReportCorrectCommittedOffsetInformation PASSED
[2022-02-28T19:34:43.132Z] 
[2022-02-28T19:34:43.132Z] 
org.apache.kafka.streams.processor.internals.HandlingSourceTopicDeletionIntegrationTest
 > shouldThrowErrorAfterSourceTopicDeleted STARTED
[2022-02-28T19:34:43.243Z] [Checks API] No suitable checks publisher found.
[Pipeline] echo
[2022-02-28T19:34:43.244Z] Skipping Kafka Streams archetype test for Java 17
[Pipeline] }
[Pipeline] // withEnv
[Pipeline] }
[Pipeline] // withEnv
[Pipeline] }
[Pipeline] // withEnv
[Pipeline] }
[Pipeline] // node
[Pipeline] }
[Pipeline] // timestamps
[Pipeline] }
[Pipeline] // timeout
[Pipeline] }
[Pipeline] // stage
[Pipeline] }
[2022-02-28T19:34:48.043Z] 
[2022-02-28T19:34:48.043Z] 
org.apache.kafka.streams.integration.TableTableJoinIntegrationTest > 
testLeftLeft[caching enabled = true] PASSED
[2022-02-28T19:34:48.043Z] 
[2022-02-28T19:34:48.043Z] 
org.apache.kafka.streams.integration.TableTableJoinIntegrationTest > 
testOuterLeft[caching enabled = true] STARTED
[2022-02-28T19:34:50.345Z] 
[2022-02-28T19:34:50.345Z] 
org.apache.kafka.streams.processor.internals.HandlingSourceTopicDeletionIntegrationTest
 > shouldThrowErrorAfterSourceTopicDeleted PASSED
[2022-02-28T19:34:53.064Z] streams-4: SMOKE-TEST-CLIENT-CLOSED
[2022-02-28T19:34:53.064Z] streams-2: SMOKE-TEST-CLIENT-CLOSED
[2022-02-28T19:34:53.064Z] streams-5: SMOKE-TEST-CLIENT-CLOSED
[2022-02-28T19:34:53.064Z] streams-3: SMOKE-TEST-CLIENT-CLOSED
[2022-02-28T19:34:53.064Z] streams-0: 

Re: [DISCUSS] KIP-821: Connect Transforms support for nested structures

2022-02-28 Thread Jorge Esteban Quilcate Otoya
Thank you, Chris! and sorry for the delayed response.

Please, find my comments below:

On Mon, 14 Feb 2022 at 17:34, Chris Egerton 
wrote:

> Hi Jorge,
>
> Thanks for the KIP! I'd love to see support for nested fields added to the
> out-of-the-box SMTs provided with Connect. Here are my initial thoughts:
>
> 1. I agree that there's a case to be made for expanding HoistField with a
> new config property for identifying a nested, to-be-hoisted field, but the
> example in the KIP doesn't really demonstrate why this would be valuable. I
> think it'd be helpful to expand the example to add other fields in order to
> show how adding nested field support enables users to hoist a nested field
> without dropping other fields from the value. Maybe something like this:
>
> source = nested.val
> field = line
>
> value (before):
> {
> "nested": {
> "val": 42,
> "other val": 96
> }
> }
>
> value (after):
> {
> "nested": {
> "line": {
> "val": 42,
> }
> "other val": 96
> }
> }
>
> 2. Nit: I think "source" is a little strange for the new HoistField
> property name. Maybe "hoisted" or "hoisted.field" would be more
> descriptive?
>
>
About 1. and 2.:
Agree. The example for this SMT is updated and have added the `hoisted`
configuration.


> 3. Is there a reasonable use case for expanding Flatten to be able to
> flatten specific fields? My understanding is that it's mostly useful for
> writing to systems like databases that don't support nested values and
> require everything to be a flat list of key-value pairs. Being able to
> flatten a nested field wouldn't provide any advantage for that use case.
> Are there other cases where it would?
>
> 4. I don't think we should unconditionally change the default delimiter for
> Flatten. It's a backwards-incompatible, breaking change that could cause
> headaches for users. It might be reasonable to change the default value
> dynamically based on whether the user has specified a value for the "field"
> property, but considering the motivation for changing the default is that
> it creates conflicts with the to-be-introduced nested field syntax (which
> could arise with downstream SMTs regardless of whether the user has
> explicitly configured Flatten with the "field" property), I don't know that
> this would be too useful either. I have some thoughts below on how to
> handle possible conflicts between names with dots in their names and dotted
> syntax for nested field references that should hopefully make either change
> unnecessary.
>
>
Fair enough. With the support for nested fields in other SMTs, Flatten
could stay as it is.
This removes the need for (4) changing Flatten config as well.


> 5. I think it's fine to expand ExtractField to support nested notation, but
> it might be worth noting in the rejected alternatives section that this
> isn't strictly necessary since you can replace any single invocation of
> that SMT that uses nested field notation with multiple invocations of it
> that use non-nested notation.
>

Agree. Adding it.


>
> 6. Nit: "RegerRouter" should be "RegexRouter" in the list of SMTs that do
> not require nested structure support.
>
>
Ack. Fixing it.


> 7. It may be rare for dots in field names to occur in the wild (although I
> wouldn't be so certain of this), but unless we want to inflict headaches on
> users of Flatten, I think we're going to have to think about conflicts
> between dotted notation and non-nested fields whose names contain dots. I
> don't think this is actually such a bad thing, though. I agree that dotted
> notation is intuitive and pretty commonplace (in tools like jq, for
> example), so I'd like it if we could stick to it. What about introducing
> escape syntax, using a backslash? That way, users could disambiguate
> between "this.field" (which would refer to the nested field "field" under
> the top-level "this" field), and "this\.field" (which would refer to the
> field named "this.field"). Like with most languages that use the backslash
> for escape sequences, it could also be used to escape itself, in the event
> that a field name contains a backslash. I think this is more intuitive and
> simpler than, e.g., adding a new config property to toggle the delimiter to
> be used when parsing nested field references.
>

I like this approach. Adding to the KIP.


>
> 8. I don't think we can unconditionally turn this feature on. The risk of
> breaking existing pipelines (especially ones that involve, for example, a
> combination of the Flatten and Cast SMTs) is pretty high. I think this
> should be an opt-in feature, at least until the next major release. One way
> we could accomplish this is by introducing a new "field.style" (name
> obviously subject to change) property with values of "plain" (default) and
> "nested". If set to "plain" then the 

[jira] [Resolved] (KAFKA-13542) Utilize the new Consumer#enforceRebalance(reason) API in Streams

2022-02-28 Thread Bruno Cadonna (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-13542?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Bruno Cadonna resolved KAFKA-13542.
---
Resolution: Fixed

> Utilize the new Consumer#enforceRebalance(reason) API in Streams
> 
>
> Key: KAFKA-13542
> URL: https://issues.apache.org/jira/browse/KAFKA-13542
> Project: Kafka
>  Issue Type: Task
>  Components: streams
>Reporter: A. Sophie Blee-Goldman
>Assignee: Hao Li
>Priority: Blocker
> Fix For: 3.2.0
>
>
> KIP-800 is adding a new "reason" parameter to the Consumer#enforceRebalance 
> API, which will be passed in to a new field of the JoinGroup protocol. We 
> invoke this API throughout Streams for various reasons, which are very useful 
> for debugging the cause of rebalancing. Passing in the reason to this new API 
> would make it possible to figure out why a Streams client triggered a 
> rebalance from the broker logs, which are often the only logs available when 
> the client logs cannot be retrieved for whatever reason



--
This message was sent by Atlassian Jira
(v8.20.1#820001)


Re: [DISCUSS] KIP-813 Shared State Stores

2022-02-28 Thread Daan Gertis
Updated the KIP to be more aligned with global state store function names.

If I remember correctly during restore the processor will not be used right? I 
think this might provide issues if your processor is doing a projection of the 
data. Either way, I would not add that into this KIP since it is a specific 
use-case pattern.

Unless there is anything more to add or change, I would propose moving to a 
vote?

Cheers!
D.

From: Matthias J. Sax 
Date: Friday, 18 February 2022 at 03:29
To: dev@kafka.apache.org 
Subject: Re: [DISCUSS] KIP-813 Shared State Stores
Thanks for updating the KIP!

I am wondering if we would need two overloads of `addReadOnlyStateStore`
one w/ and one w/o `TimestampExtractor` argument to effectively make it
an "optional" parameter?

Also wondering if we need to pass in a `String sourceName` and `String
processorName` parameters (similar to `addGlobalStore()`?) instead if
re-using the store name as currently proposed? -- In general I don't
have a strong opinion either way, but it seems to introduce some API
inconsistency if we don't follow the `addGlobalStore()` pattern?


> Another thing we were confronted with was the restoring of state when the 
> actual local storage is gone. For example, we host on K8s with ephemeral 
> pods, so there is no persisted storage between pod restarts. However, the 
> consumer group will be already been at the latest offset, preventing from 
> previous data to be restored within the new pod’s statestore.

We have already code in-place in the runtime to do the right thing for
this case (ie, via DSL source-table changelog optimization). We can
re-use this part. It's nothing we need to discuss on the KIP, but we can
discuss on the PR later.


-Matthias


On 2/17/22 10:09 AM, Guozhang Wang wrote:
> Hi Daan,
>
> I think for the read-only state stores you'd need ot slightly augment the
> checkpointing logic so that it would still write the checkpointed offsets
> while restoring from the changelogs.
>
>
> Guozhang
>
> On Thu, Feb 17, 2022 at 7:02 AM Daan Gertis 
> wrote:
>
>>> Could you add more details about the signature of
>>> `addReadOnlyStateStore()` -- What parameters does it take? Are there any
>>> overloads taking different parameters? The KIP only contains some verbal
>>> description on the "Implementation Plan" section, that is hard to find
>>> and hard to read.
>>>
>>> The KIP mentions a `ProcessorProvider` -- do you mean
>> `ProcessorSupplier`?
>>>
>>> About timestamp synchronization: why do you propose to disable timestamp
>>> synchronization (similar to global state stores)? It seems to be an
>>> unnecessary limitation? -- Given that we could re-use the new method for
>>> source `KTables` (ie, `StreamsBuilder#table()` implemenation), having
>>> timestamp synchronization enabled seems to be important?
>>
>> Yup, will do these updates. I’ll overload the addReadOnlyStateStore to
>> have allow for timestamp synchronization.
>>
>> Another thing we were confronted with was the restoring of state when the
>> actual local storage is gone. For example, we host on K8s with ephemeral
>> pods, so there is no persisted storage between pod restarts. However, the
>> consumer group will be already been at the latest offset, preventing from
>> previous data to be restored within the new pod’s statestore.
>>
>> If I remember correctly, there was some checkpoint logic available when
>> restoring, but we are bypassing that since logging is disabled on the
>> statestore, no?
>>
>> As always, thanks for your insights.
>>
>> Cheers,
>> D.
>>
>>
>> From: Matthias J. Sax 
>> Date: Wednesday, 16 February 2022 at 02:09
>> To: dev@kafka.apache.org 
>> Subject: Re: [DISCUSS] KIP-813 Shared State Stores
>> Thanks for updating the KIP.
>>
>> Could you add more details about the signature of
>> `addReadOnlyStateStore()` -- What parameters does it take? Are there any
>> overloads taking different parameters? The KIP only contains some verbal
>> description on the "Implementation Plan" section, that is hard to find
>> and hard to read.
>>
>> The KIP mentions a `ProcessorProvider` -- do you mean `ProcessorSupplier`?
>>
>> About timestamp synchronization: why do you propose to disable timestamp
>> synchronization (similar to global state stores)? It seems to be an
>> unnecessary limitation? -- Given that we could re-use the new method for
>> source `KTables` (ie, `StreamsBuilder#table()` implemenation), having
>> timestamp synchronization enabled seems to be important?
>>
>>
>> -Matthias
>>
>>
>> On 2/8/22 11:01 PM, Guozhang Wang wrote:
>>> Daan,
>>>
>>> Thanks for the replies, those make sense to me.
>>>
>>> On Tue, Feb 8, 2022 at 7:24 AM Daan Gertis 
>> wrote:
>>>
 I just updated the KIP to reflect the things discussed in this thread.

 As for your questions Guozhang:

> 1) How do we handle if the num.partitions of app A's store changelog is
> different from the num.tasks of app B's sub-topology with that
>> read-only
> store? Or are we going to 

Re: [VOTE] KIP-815: Support max-timestamp in GetOffsetShell

2022-02-28 Thread 邓子明
Thank you David, I have already updated the KIP. 
The PR will be finished soon. 

--
Ziming Deng

> On Feb 28, 2022, at 6:38 PM, David Jacot  wrote:
> 
> Thanks. Could you update the KIP to incorporate
> Mickael's feedback? Ping me once the PR is
> updated, I will finish reviewing it.
> 
> Best,
> David
> 
> On Mon, Feb 28, 2022 at 11:33 AM deng ziming  wrote:
>> 
>> Hi all,
>> 
>> Since it’s a pretty minor KIP, I think we can pass the vote with:
>> - 4 +1(binding) votes (Luke, David, Mickael and John)
>> 
>> Thanks to all that participated in the discussion and voting,
>> 
>> --
>> Ziming Deng
>> 
>> 
>>> On Feb 22, 2022, at 2:56 PM, David Jacot  wrote:
>>> 
>>> For reference, here is the KIP:
>>> https://cwiki.apache.org/confluence/display/KAFKA/KIP-815%3A++Support+max-timestamp+in+GetOffsetShell
>>> 
>>> Thanks for the KIP! +1 (binding)
>>> 
>>> Best,
>>> David
>>> 
>>> Le mar. 22 févr. 2022 à 04:03, deng ziming  a
>>> écrit :
>>> 
 Hey all, I'm starting the voting on KIP-815.
 
 This supports a new OffsetSpec in GetOffsetShell so that we can easily
 determine the offset and timestamp of the message with the largest
 timestamp on a partition. This seems a simple change but replaced
 KafkaConsumer with AdminClient in GetOffsetShell.
 
 I recreated this vote thread since I changed the KIP title when
 discussing.
 
 Thanks,
 Ziming Deng
>> 



Re: [VOTE] KIP-815: Support max-timestamp in GetOffsetShell

2022-02-28 Thread David Jacot
Thanks. Could you update the KIP to incorporate
Mickael's feedback? Ping me once the PR is
updated, I will finish reviewing it.

Best,
David

On Mon, Feb 28, 2022 at 11:33 AM deng ziming  wrote:
>
> Hi all,
>
> Since it’s a pretty minor KIP, I think we can pass the vote with:
> - 4 +1(binding) votes (Luke, David, Mickael and John)
>
> Thanks to all that participated in the discussion and voting,
>
> --
> Ziming Deng
>
>
> > On Feb 22, 2022, at 2:56 PM, David Jacot  wrote:
> >
> > For reference, here is the KIP:
> > https://cwiki.apache.org/confluence/display/KAFKA/KIP-815%3A++Support+max-timestamp+in+GetOffsetShell
> >
> > Thanks for the KIP! +1 (binding)
> >
> > Best,
> > David
> >
> > Le mar. 22 févr. 2022 à 04:03, deng ziming  a
> > écrit :
> >
> >> Hey all, I'm starting the voting on KIP-815.
> >>
> >> This supports a new OffsetSpec in GetOffsetShell so that we can easily
> >> determine the offset and timestamp of the message with the largest
> >> timestamp on a partition. This seems a simple change but replaced
> >> KafkaConsumer with AdminClient in GetOffsetShell.
> >>
> >> I recreated this vote thread since I changed the KIP title when
> >> discussing.
> >>
> >> Thanks,
> >> Ziming Deng
>


Re: [VOTE] KIP-815: Support max-timestamp in GetOffsetShell

2022-02-28 Thread deng ziming
Hi all,

Since it’s a pretty minor KIP, I think we can pass the vote with:
- 4 +1(binding) votes (Luke, David, Mickael and John)

Thanks to all that participated in the discussion and voting,

-- 
Ziming Deng


> On Feb 22, 2022, at 2:56 PM, David Jacot  wrote:
> 
> For reference, here is the KIP:
> https://cwiki.apache.org/confluence/display/KAFKA/KIP-815%3A++Support+max-timestamp+in+GetOffsetShell
> 
> Thanks for the KIP! +1 (binding)
> 
> Best,
> David
> 
> Le mar. 22 févr. 2022 à 04:03, deng ziming  a
> écrit :
> 
>> Hey all, I'm starting the voting on KIP-815.
>> 
>> This supports a new OffsetSpec in GetOffsetShell so that we can easily
>> determine the offset and timestamp of the message with the largest
>> timestamp on a partition. This seems a simple change but replaced
>> KafkaConsumer with AdminClient in GetOffsetShell.
>> 
>> I recreated this vote thread since I changed the KIP title when
>> discussing.
>> 
>> Thanks,
>> Ziming Deng