[jira] [Resolved] (KAFKA-15041) Source Connector auto topic creation fails when topic is deleted and brokers don't support auto topic creation

2024-05-22 Thread Sagar Rao (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-15041?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Sagar Rao resolved KAFKA-15041.
---
Resolution: Won't Fix

For now, setting the config `producer.override.max.block.ms` at the connector 
config level or `producer.max.block.ms` at the worker config level to a lower 
value should fix this issue. The problem is that the default value for the 
above config is [set to 
Long.MAX_VALUE|https://github.com/apache/kafka/blob/trunk/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/Worker.java#L820] 
in the configs, and when topics are deleted manually there is really no signal 
received that indicates the deletion. We could add heuristics like periodically 
checking whether a topic is present and refreshing the cache, or checking the 
source topic metrics to see if records are just being buffered and not being 
sent, but that's outside the scope of the runtime.
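
For illustration, a rough sketch of the two places where this can be set (the 
values here are arbitrary; connector-level overrides also require the worker's 
`connector.client.config.override.policy` to permit them):

{noformat}
# Worker config (e.g. connect-distributed.properties): applies to all source tasks
# started by this worker.
producer.max.block.ms=60000

# Connector config (JSON submitted to the REST API): applies only to this connector,
# provided the worker's override policy allows producer overrides.
"producer.override.max.block.ms": "60000"
{noformat}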

> Source Connector auto topic creation fails when topic is deleted and brokers 
> don't support auto topic creation
> --
>
> Key: KAFKA-15041
> URL: https://issues.apache.org/jira/browse/KAFKA-15041
> Project: Kafka
>  Issue Type: Bug
>  Components: connect
>Reporter: Sagar Rao
>Assignee: Sagar Rao
>Priority: Major
>
> [KIP-158|https://cwiki.apache.org/confluence/display/KAFKA/KIP-158%3A+Kafka+Connect+should+allow+source+connectors+to+set+topic-specific+settings+for+new+topics]
>  allows source connectors to create topics even when the broker doesn't 
> allow automatic topic creation. It does so by checking, for every record, 
> whether a topic needs to be created 
> [https://github.com/apache/kafka/blob/trunk/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/AbstractWorkerSourceTask.java#L500].
>  To avoid checking for topic presence via the admin client on every record, 
> it also maintains a cache of the topics it has created and doesn't try to 
> create those again. This helps create topics when brokers don't support 
> automatic topic creation.
> However, let's say the topic gets created initially and is later deleted 
> while the connector is still running, and the brokers don't support automatic 
> topic creation. In such cases, the connector has cached the topic it already 
> created and won't recreate it because the cache is never refreshed. Since the 
> broker doesn't support topic creation either, the logs would just be full of 
> messages like 
>  
> {code:java}
> Error while fetching metadata with correlation id 3260 : 
> {connect-test=UNKNOWN_TOPIC_OR_PARTITION}{code}
>  
> This can become a problem in environments where brokers don't allow topic 
> creation. We need a way to refresh the topic cache for such cases.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


Re: [VOTE] KIP-910: Update Source offsets for Source Connectors without producing records

2024-04-25 Thread Sagar
Hey All,

Bumping the vote thread after a long time!

Thanks!
Sagar.

On Fri, Feb 2, 2024 at 4:24 PM Sagar  wrote:

> Thanks Yash!
>
> I am hoping to have this released in 3.8 so it would be good to get the
> remaining 2 votes.
>
> Thanks!
> Sagar.
>
>
> On Tue, Jan 30, 2024 at 3:18 PM Yash Mayya  wrote:
>
>> Hi Sagar,
>>
>> Thanks for the KIP and apologies for the extremely long delay here! I
>> think
>> we could do with some wordsmithing on the Javadoc for the new
>> `SourceTask::updateOffsets` method but that can be taken care of in the
>> PR.
>>
>> +1 (binding)
>>
>> Thanks,
>> Yash
>>
>> On Wed, Nov 15, 2023 at 11:43 PM Sagar  wrote:
>>
>> > Hey all,
>> >
>> > Bumping this vote thread again after quite a while.
>> >
>> > Thanks!
>> > Sagar.
>> >
>> > On Wed, Sep 6, 2023 at 3:58 PM Sagar  wrote:
>> >
>> > > Hi All,
>> > >
>> > > Based on the latest discussion thread, it appears as if all open
>> > questions
>> > > have been answered.
>> > >
>> > > Hopefully now we are in a state where we can close out on the Voting
>> > > process.
>> > >
>> > > Thanks everyone for the great feedback.
>> > >
>> > > Thanks!
>> > > Sagar.
>> > >
>> > > On Fri, Aug 18, 2023 at 9:00 AM Sagar 
>> wrote:
>> > >
>> > >> Hi All,
>> > >>
>> > >> Bumping the voting thread again.
>> > >>
>> > >> Thanks!
>> > >> Sagar.
>> > >>
>> > >> On Wed, Aug 2, 2023 at 4:43 PM Sagar 
>> wrote:
>> > >>
>> > >>> Attaching the KIP link for reference:
>> > >>>
>> >
>> https://cwiki.apache.org/confluence/display/KAFKA/KIP-910%3A+Update+Source+offsets+for+Source+Connectors+without+producing+records
>> > >>>
>> > >>> Thanks!
>> > >>> Sagar.
>> > >>>
>> > >>> On Wed, Aug 2, 2023 at 4:37 PM Sagar 
>> > wrote:
>> > >>>
>> > >>>> Hi All,
>> > >>>>
>> > >>>> Calling a Vote on KIP-910 [1]. I feel we have converged to a
>> > reasonable
>> > >>>> design. Of course I am open to any feedback/suggestions and would
>> > address
>> > >>>> them.
>> > >>>>
>> > >>>> Thanks!
>> > >>>> Sagar.
>> > >>>>
>> > >>>
>> >
>>
>


[jira] [Created] (KAFKA-16604) Deprecate ConfigDef.ConfigKey constructor from public APIs

2024-04-23 Thread Sagar Rao (Jira)
Sagar Rao created KAFKA-16604:
-

 Summary: Deprecate ConfigDef.ConfigKey constructor from public APIs
 Key: KAFKA-16604
 URL: https://issues.apache.org/jira/browse/KAFKA-16604
 Project: Kafka
  Issue Type: Improvement
Reporter: Sagar Rao


Currently, one can create a ConfigKey either by invoking the public constructor 
directly and passing the result to a ConfigDef object, or by invoking one of the 
define methods. The two ways can get confusing at times. Moreover, the former 
can lead to errors, as was noticed in KAFKA-16592.

We should ideally expose only one way to users, which IMO should be to create 
the objects only through the exposed define methods. This ticket is about 
marking the public constructor of ConfigKey as deprecated first and then 
making it private eventually.
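
A rough sketch of the two paths (the config name, documentation, and group 
metadata below are made up for illustration; the direct constructor call uses 
the pre-3.8 argument list visible in the KAFKA-16592 stack trace):
{code:java}
import java.util.Collections;
import org.apache.kafka.common.config.ConfigDef;

public class ConfigKeyExample {
    public static void main(String[] args) {
        // Preferred: let ConfigDef build the ConfigKey via one of its define(...) methods.
        ConfigDef viaDefine = new ConfigDef()
                .define("my.setting", ConfigDef.Type.STRING, "default",
                        ConfigDef.Importance.HIGH, "An illustrative setting.");

        // Discouraged: construct ConfigDef.ConfigKey directly and pass it to define(ConfigKey).
        // This is the public constructor this ticket proposes to deprecate.
        ConfigDef.ConfigKey key = new ConfigDef.ConfigKey(
                "my.setting", ConfigDef.Type.STRING, "default",
                null /* validator */, ConfigDef.Importance.HIGH,
                "An illustrative setting.", "group", 0, ConfigDef.Width.SHORT,
                "My Setting", Collections.emptyList(), null /* recommender */,
                false /* internalConfig */);
        ConfigDef viaConstructor = new ConfigDef().define(key);
    }
}
{code}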



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Created] (KAFKA-16592) ConfigKey constructor update can break clients using it

2024-04-20 Thread Sagar Rao (Jira)
Sagar Rao created KAFKA-16592:
-

 Summary: ConfigKey constructor update can break clients using it
 Key: KAFKA-16592
 URL: https://issues.apache.org/jira/browse/KAFKA-16592
 Project: Kafka
  Issue Type: Bug
Reporter: Sagar Rao
Assignee: Sagar Rao


In [KAFKA-14957|https://issues.apache.org/jira/browse/KAFKA-14957], the 
constructor of ConfigDef.ConfigKey was updated to add a new argument called 
{*}alternativeString{*}. As part of the PR, new *define* methods were also 
added, which makes sense. However, since the constructor of 
*ConfigDef.ConfigKey* itself can be used directly by other clients which import 
the dependency, this can break all clients that were using the older 
constructor without the *alternativeString* argument.

I bumped into this when I was testing the 
[kafka-connect-redis|https://github.com/jcustenborder/kafka-connect-redis/tree/master] 
connector. It starts up correctly against the official 3.7 release, but fails 
with the following error when run against a 3.8 snapshot:

{code:java}
Caused by: java.lang.NoSuchMethodError: 
org.apache.kafka.common.config.ConfigDef$ConfigKey.(Ljava/lang/String;Lorg/apache/kafka/common/config/ConfigDef$Type;Ljava/lang/Object;Lorg/apache/kafka/common/config/ConfigDef$Validator;Lorg/apache/kafka/common/config/ConfigDef$Importance;Ljava/lang/String;Ljava/lang/String;ILorg/apache/kafka/common/config/ConfigDef$Width;Ljava/lang/String;Ljava/util/List;Lorg/apache/kafka/common/config/ConfigDef$Recommender;Z)V
 at 
com.github.jcustenborder.kafka.connect.utils.config.ConfigKeyBuilder.build(ConfigKeyBuilder.java:62)
 at 
com.github.jcustenborder.kafka.connect.redis.RedisConnectorConfig.config(RedisConnectorConfig.java:133)
 at 
com.github.jcustenborder.kafka.connect.redis.RedisSinkConnectorConfig.config(RedisSinkConnectorConfig.java:46)
 at 
com.github.jcustenborder.kafka.connect.redis.RedisSinkConnector.config(RedisSinkConnector.java:73)
 at 
org.apache.kafka.connect.runtime.AbstractHerder.validateConnectorConfig(AbstractHerder.java:538)
 at 
org.apache.kafka.connect.runtime.AbstractHerder.lambda$validateConnectorConfig$3(AbstractHerder.java:412)
 at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
 at java.util.concurrent.FutureTask.run(FutureTask.java:266)
 at 
java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
 at 
java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
 ... 1 more
 
{code}
 

The reason is that the connector uses another library called connect-utils, 
which invokes the old constructor 
[directly|https://github.com/jcustenborder/connect-utils/blob/master/connect-utils/src/main/java/com/github/jcustenborder/kafka/connect/utils/config/ConfigKeyBuilder.java#L62].

Connector invocations are not expected to fail across versions like this, so it 
would cause confusion.

One could argue that the constructor shouldn't be invoked directly instead of 
using the *define* method, but there might be other clients doing the same. We 
should add the old constructor back and have it call the new one with 
*alternativeString* set to null.
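
A rough sketch of what that compatibility constructor could look like (a 
fragment inside ConfigDef.ConfigKey, not a complete class; the position of the 
new alternativeString parameter in the updated constructor is assumed here):
{code:java}
// Sketch only: re-add the pre-3.8 thirteen-argument constructor and delegate to
// the new constructor, passing null for alternativeString.
public ConfigKey(String name, Type type, Object defaultValue, Validator validator,
                 Importance importance, String documentation, String group,
                 int orderInGroup, Width width, String displayName,
                 List<String> dependents, Recommender recommender, boolean internalConfig) {
    this(name, type, defaultValue, validator, importance, documentation, group,
         orderInGroup, width, displayName, dependents, recommender, internalConfig,
         null /* alternativeString */);
}
{code}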



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


Re: [ANNOUNCE] New committer: Christo Lolov

2024-03-26 Thread Sagar
Congrats Christo!

Sagar.

On Tue, 26 Mar 2024 at 6:04 PM, Jorge Esteban Quilcate Otoya <
quilcate.jo...@gmail.com> wrote:

> Congrats Christo!!
>
> On Tue 26. Mar 2024 at 14.33, Apoorv Mittal 
> wrote:
>
> > Congrats Christo!
> >
> > Regards,
> > Apoorv Mittal
> > +44 7721681581
> >
> >
> > On Tue, Mar 26, 2024 at 12:05 PM Luke Chen  wrote:
> >
> > > Hi, Everyone,
> > >
> > > The PMC of Apache Kafka is pleased to announce a new Kafka committer:
> > > Christo Lolov.
> > >
> > > Christo has been a Kafka contributor since 2021. He has made over 50
> > > commits. He authored KIP-902, KIP-963, and KIP-1005, as well as many
> > tiered
> > > storage related tasks. He also co-drives the migration from EasyMock to
> > > Mockito and from Junit 4 to JUnit 5.
> > >
> > > Congratulations, Christo!
> > >
> > > Thanks,
> > > Luke (on behalf of the Apache Kafka PMC)
> > >
> >
>


Re: [VOTE] KIP-910: Update Source offsets for Source Connectors without producing records

2024-02-02 Thread Sagar
Thanks Yash!

I am hoping to have this released in 3.8 so it would be good to get the
remaining 2 votes.

Thanks!
Sagar.


On Tue, Jan 30, 2024 at 3:18 PM Yash Mayya  wrote:

> Hi Sagar,
>
> Thanks for the KIP and apologies for the extremely long delay here! I think
> we could do with some wordsmithing on the Javadoc for the new
> `SourceTask::updateOffsets` method but that can be taken care of in the PR.
>
> +1 (binding)
>
> Thanks,
> Yash
>
> On Wed, Nov 15, 2023 at 11:43 PM Sagar  wrote:
>
> > Hey all,
> >
> > Bumping this vote thread again after quite a while.
> >
> > Thanks!
> > Sagar.
> >
> > On Wed, Sep 6, 2023 at 3:58 PM Sagar  wrote:
> >
> > > Hi All,
> > >
> > > Based on the latest discussion thread, it appears as if all open
> > questions
> > > have been answered.
> > >
> > > Hopefully now we are in a state where we can close out on the Voting
> > > process.
> > >
> > > Thanks everyone for the great feedback.
> > >
> > > Thanks!
> > > Sagar.
> > >
> > > On Fri, Aug 18, 2023 at 9:00 AM Sagar 
> wrote:
> > >
> > >> Hi All,
> > >>
> > >> Bumping the voting thread again.
> > >>
> > >> Thanks!
> > >> Sagar.
> > >>
> > >> On Wed, Aug 2, 2023 at 4:43 PM Sagar 
> wrote:
> > >>
> > >>> Attaching the KIP link for reference:
> > >>>
> >
> https://cwiki.apache.org/confluence/display/KAFKA/KIP-910%3A+Update+Source+offsets+for+Source+Connectors+without+producing+records
> > >>>
> > >>> Thanks!
> > >>> Sagar.
> > >>>
> > >>> On Wed, Aug 2, 2023 at 4:37 PM Sagar 
> > wrote:
> > >>>
> > >>>> Hi All,
> > >>>>
> > >>>> Calling a Vote on KIP-910 [1]. I feel we have converged to a
> > reasonable
> > >>>> design. Of course I am open to any feedback/suggestions and would
> > address
> > >>>> them.
> > >>>>
> > >>>> Thanks!
> > >>>> Sagar.
> > >>>>
> > >>>
> >
>


[jira] [Created] (KAFKA-16197) Connect Worker poll timeout prints Consumer poll timeout specific warnings.

2024-01-26 Thread Sagar Rao (Jira)
Sagar Rao created KAFKA-16197:
-

 Summary: Connect Worker poll timeout prints Consumer poll timeout 
specific warnings.
 Key: KAFKA-16197
 URL: https://issues.apache.org/jira/browse/KAFKA-16197
 Project: Kafka
  Issue Type: Bug
Reporter: Sagar Rao
Assignee: Sagar Rao


When a worker's poll timeout expires in Connect, the log lines that we see are:

{noformat}
consumer poll timeout has expired. This means the time between subsequent calls 
to poll() was longer than the configured max.poll.interval.ms, which typically 
implies that the poll loop is spending too much time processing messages. You 
can address this either by increasing max.poll.interval.ms or by reducing the 
maximum size of batches returned in poll() with max.poll.records.

{noformat}

and the reason for leaving the group is 


{noformat}
Member XX sending LeaveGroup request to coordinator XX due to consumer poll 
timeout has expired.
{noformat}

which is specific to Consumers and not to Connect workers. The log line above 
is especially misleading because the config `max.poll.interval.ms` is not 
configurable for a Connect worker, and it could make someone believe that the 
logs are being written for Sink Connectors and not for the Connect worker. 
Ideally, we should print something specific to Connect.




--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Created] (KAFKA-16056) Worker poll timeout expiry can lead to Duplicate task assignments.

2023-12-27 Thread Sagar Rao (Jira)
Sagar Rao created KAFKA-16056:
-

 Summary: Worker poll timeout expiry can lead to Duplicate task 
assignments.
 Key: KAFKA-16056
 URL: https://issues.apache.org/jira/browse/KAFKA-16056
 Project: Kafka
  Issue Type: Bug
  Components: KafkaConnect
Reporter: Sagar Rao
Assignee: Sagar Rao


When a poll timeout expiry happens for a worker, it triggers a rebalance 
because the worker proactively leaves the group. Under normal scenarios, 
leaving the group would trigger a scheduled rebalance delay. However, one thing 
to note is that the worker which left the group temporarily doesn't give up its 
assignments, and whatever tasks were running on it remain as is. When the 
scheduled rebalance delay elapses, it would just get back its assignments, and 
given that there won't be any revocations, it should all work out fine.

But there is an edge case here. Let's assume that a scheduled rebalance delay 
was already active on the group and, just before the follow-up rebalance due to 
that delay elapsing, one of the workers' poll timeout expires. At this point, a 
rebalance is imminent and the leader would track the assignments of the 
transiently departed worker as lost 
[here|https://github.com/apache/kafka/blob/d582d5aff517879b150bc2739bad99df07e15e2b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/distributed/IncrementalCooperativeAssignor.java#L255]. 
When 
[handleLostAssignments|https://github.com/apache/kafka/blob/d582d5aff517879b150bc2739bad99df07e15e2b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/distributed/IncrementalCooperativeAssignor.java#L441] 
gets triggered, because the scheduledRebalance delay isn't reset yet, and if 
[this|https://github.com/apache/kafka/blob/d582d5aff517879b150bc2739bad99df07e15e2b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/distributed/IncrementalCooperativeAssignor.java#L473] 
condition passes, the leader would assume that it needs to reassign all the 
lost assignments, which it will.

But because the worker whose poll timeout expired doesn't rescind its 
assignments, we would end up with duplicate assignments: one set on the 
original worker which was already running the tasks and connectors, and another 
set on the remaining group of workers which got the redistributed work. This 
could lead to task failures if the connector has been written in a way which 
expects no duplicate tasks running across a cluster.

Also, this edge case can be encountered more frequently if the 
`rebalance.timeout.ms` config is set to a lower value.

One of the approaches could be to do something similar to 
https://issues.apache.org/jira/browse/KAFKA-9184, where upon coordinator 
discovery failure the worker gives up all its assignments and joins with an 
empty assignment. We could do something similar in this case as well.




--
This message was sent by Atlassian Jira
(v8.20.10#820010)


Re: [ANNOUNCE] New Kafka PMC Member: Divij Vaidya

2023-12-27 Thread Sagar
Congrats Divij! Absolutely well deserved !

Thanks!
Sagar.

On Wed, Dec 27, 2023 at 5:15 PM Luke Chen  wrote:

> Hi, Everyone,
>
> Divij has been a Kafka committer since June, 2023. He has remained very
> active and instructive in the community since becoming a committer. It's my
> pleasure to announce that Divij is now a member of Kafka PMC.
>
> Congratulations Divij!
>
> Luke
> on behalf of Apache Kafka PMC
>


Re: [VOTE] KIP-910: Update Source offsets for Source Connectors without producing records

2023-11-15 Thread Sagar
Hey all,

Bumping this vote thread again after quite a while.

Thanks!
Sagar.

On Wed, Sep 6, 2023 at 3:58 PM Sagar  wrote:

> Hi All,
>
> Based on the latest discussion thread, it appears as if all open questions
> have been answered.
>
> Hopefully now we are in a state where we can close out on the Voting
> process.
>
> Thanks everyone for the great feedback.
>
> Thanks!
> Sagar.
>
> On Fri, Aug 18, 2023 at 9:00 AM Sagar  wrote:
>
>> Hi All,
>>
>> Bumping the voting thread again.
>>
>> Thanks!
>> Sagar.
>>
>> On Wed, Aug 2, 2023 at 4:43 PM Sagar  wrote:
>>
>>> Attaching the KIP link for reference:
>>> https://cwiki.apache.org/confluence/display/KAFKA/KIP-910%3A+Update+Source+offsets+for+Source+Connectors+without+producing+records
>>>
>>> Thanks!
>>> Sagar.
>>>
>>> On Wed, Aug 2, 2023 at 4:37 PM Sagar  wrote:
>>>
>>>> Hi All,
>>>>
>>>> Calling a Vote on KIP-910 [1]. I feel we have converged to a reasonable
>>>> design. Of course I am open to any feedback/suggestions and would address
>>>> them.
>>>>
>>>> Thanks!
>>>> Sagar.
>>>>
>>>


Re: [DISCUSS] Should we continue to merge without a green build? No!

2023-11-13 Thread Sagar
Hi Divij,

I think this proposal overall makes sense. My only nit of a suggestion is that
we also consider a label called newbie++ [1] for flaky tests if we are
considering adding newbie as a label. I think some of the flaky tests need
familiarity with the codebase or the test setup, so it might be difficult for a
first-time contributor. newbie++ IMO covers that aspect.

[1]
https://issues.apache.org/jira/browse/KAFKA-15406?jql=project%20%3D%20KAFKA%20AND%20labels%20%3D%20%22newbie%2B%2B%22

Let me know what you think.

Thanks!
Sagar.

On Mon, Nov 13, 2023 at 9:11 PM Divij Vaidya 
wrote:

> >  Please, do it.
> We can use specific labels to effectively filter those tickets.
>
> We already have a label and a way to discover flaky tests. They are tagged
> with the label "flaky-test" [1]. There is also a label "newbie" [2] meant
> for folks who are new to Apache Kafka code base.
> My suggestion is to send a broader email to the community (since many will
> miss details in this thread) and call for action for committers to
> volunteer as "shepherds" for these tickets. I can send one out once we have
> some consensus wrt next steps in this thread.
>
>
> [1]
>
> https://issues.apache.org/jira/browse/KAFKA-13421?jql=project%20%3D%20KAFKA%20AND%20status%20in%20(Open%2C%20%22In%20Progress%22%2C%20Reopened%2C%20%22Patch%20Available%22)%20AND%20resolution%20%3D%20Unresolved%20AND%20labels%20%3D%20flaky-test%20ORDER%20BY%20priority%20DESC%2C%20updated%20DESC
>
>
> [2] https://kafka.apache.org/contributing -> Finding a project to work on
>
>
> Divij Vaidya
>
>
>
> On Mon, Nov 13, 2023 at 4:24 PM Николай Ижиков 
> wrote:
>
> >
> > > To kickstart this effort, we can publish a list of such tickets in the
> > community and assign one or more committers the role of a «shepherd" for
> > each ticket.
> >
> > Please, do it.
> > We can use specific label to effectively filter those tickets.
> >
> > > 13 нояб. 2023 г., в 15:16, Divij Vaidya 
> > написал(а):
> > >
> > > Thanks for bringing this up David.
> > >
> > > My primary concern revolves around the possibility that the currently
> > > disabled tests may remain inactive indefinitely. We currently have
> > > unresolved JIRA tickets for flaky tests that have been pending for an
> > > extended period. I am inclined to support the idea of disabling these
> > tests
> > > temporarily and merging changes only when the build is successful,
> > provided
> > > there is a clear plan for re-enabling them in the future.
> > >
> > > To address this issue, I propose the following measures:
> > >
> > > 1\ Foster a supportive environment for new contributors within the
> > > community, encouraging them to take on tickets associated with flaky
> > tests.
> > > This initiative would require individuals familiar with the relevant
> code
> > > to offer guidance to those undertaking these tasks. Committers should
> > > prioritize reviewing and addressing these tickets within their
> available
> > > bandwidth. To kickstart this effort, we can publish a list of such
> > tickets
> > > in the community and assign one or more committers the role of a
> > "shepherd"
> > > for each ticket.
> > >
> > > 2\ Implement a policy to block minor version releases until the Release
> > > Manager (RM) is satisfied that the disabled tests do not result in gaps
> > in
> > > our testing coverage. The RM may rely on Subject Matter Experts (SMEs)
> in
> > > the specific code areas to provide assurance before giving the green
> > light
> > > for a release.
> > >
> > > 3\ Set a community-wide goal for 2024 to achieve a stable Continuous
> > > Integration (CI) system. This goal should encompass projects such as
> > > refining our test suite to eliminate flakiness and addressing
> > > infrastructure issues if necessary. By publishing this goal, we create
> a
> > > shared vision for the community in 2024, fostering alignment on our
> > > objectives. This alignment will aid in prioritizing tasks for community
> > > members and guide reviewers in allocating their bandwidth effectively.
> > >
> > > --
> > > Divij Vaidya
> > >
> > >
> > >
> > > On Sun, Nov 12, 2023 at 2:58 AM Justine Olshan
> > 
> > > wrote:
> > >
> > >> I will say that I have also seen tests that seem to be more flaky
> > >> intermittently. It may be ok for some time and suddenly the CI is
> &

Re: [DISCUSS] Should we continue to merge without a green build? No!

2023-11-11 Thread Sagar
Hi David,

Thanks for bringing up this point and also for creating the revert PRs. I
think there has been an effort in the community to fix a lot of flaky
tests (like around MirrorMaker). I also agree that we shouldn't merge PRs
without green builds and that we should look to disable flaky tests. For
example, I did a quick search for some of the common (and sometimes
longstanding) flaky tests and this is a brief list. Some of them have JIRAs
associated with them as well =>

1)
org.apache.kafka.trogdor.coordinator.CoordinatorTest.testTaskRequestWithOldStartMsGetsUpdated
=> https://issues.apache.org/jira/browse/KAFKA-8115

2) kafka.server.DynamicBrokerReconfigurationTest.testThreadPoolResize =>
https://issues.apache.org/jira/browse/KAFKA-15421

3)
org.apache.kafka.controller.QuorumControllerTest.testFenceMultipleBrokers
(Couldn't find JIRA).

4)
integration.kafka.server.FetchFromFollowerIntegrationTest.testRackAwareRangeAssignor
 => https://issues.apache.org/jira/browse/KAFKA-15020

5) EosIntegrationTest => https://issues.apache.org/jira/browse/KAFKA-15690

The only thing is where we draw the line between disabling a genuinely flaky
test and looking to fix it. I feel that could get confusing at times,
especially if the flaky test involved is in an unrelated part of the code
(like a flaky Connect test hit while working on the Group Coordinator or
Streams).

Thanks!
Sagar.


On Sat, Nov 11, 2023 at 3:31 PM David Jacot 
wrote:

> Hi all,
>
> The state of our CI worries me a lot. Just this week, we merged two PRs
> with compilation errors and one PR introducing persistent failures. This
> really hurts the quality and the velocity of the project and it basically
> defeats the purpose of having a CI because we tend to ignore it nowadays.
>
> Should we continue to merge without a green build? No! We should not so I
> propose to prevent merging a pull request without a green build. This is a
> really simple and bold move that will prevent us from introducing
> regressions and will improve the overall health of the project. At the same
> time, I think that we should disable all the known flaky tests, raise jiras
> for them, find an owner for each of them, and fix them.
>
> What do you think?
>
> Best,
> David
>


Re: [DISCUSS] KIP-987: Connect Static Assignments

2023-10-27 Thread Sagar
Hi Greg,

Thanks for the response.

This is detailed in the KIP two sentences earlier: "If the
> connect.protocol is set to static, each worker will send it's
> static.connectors and static.tasks to the coordinator during
> rebalances."


Yes I saw that. My point was if this is going to need a change in the
Connect Protocol format, then those changes should be outlined in the KIP.


Yes. Arbitrary here just means that the assignment is not
> influenced by the static assignment.


Okay, the usage of the word arbitrary is a bit confusing, but I will leave
it at that.

Also, when I read Mickael's suggestion above about using a taint/selector
mechanism, it seems cleaner. The main reason IMO is that we are letting
connectors and tasks define where and how they want to be placed, and letting
the runtime decide the placement (aided by a ResourceManager if needed). With
the approach proposed in the KIP, any time changes are needed to the
placements, we need to modify worker configs and add/edit/remove
connectors/tasks on them. This hasn't generally been the case, and in my
experience worker configs are not really modified that often. Moreover,
administrators will also need to choose where to place the connectors/tasks
based on the load on the cluster, which again is added overhead.

However, you brought up a good point in the KIP

However, because tasks within a single connector can be heterogeneous, it
> is necessary to attach a different selector/affinity declaration to each
> task. But because tasks are dynamically created by the connector, either
> every task's selector must be added manually after defaulting to a wildcard
> affinity, or the Connect REST API would need a way to template multiple
> task affinities (parameterized by task-id).


as well as here above. Consider the idea proposed by Mickael above with just
the isolated config value for the selector, which means that any task being
spun up for this connector would run on a dedicated worker. If we use this,
we might see a proliferation of workers on the cluster. But if we add another
worker-level config called max.tasks or something similar and couple the two,
in effect we can achieve the same thing as the KIP proposes (obviously I
might be over-simplifying it a bit here, but I hope the idea is clear). The
config values suggested above should cover all cases, but I still do agree
that setting the selector config value at a task level is a challenge.
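
A purely hypothetical sketch of that coupling, just to make the idea concrete
(none of these configs exist in Connect today; the names and values are made
up):

  # connector config: tasks of this connector only run on workers matching
  # this selector
  selector=dedicated-workers

  # worker config: cap the number of tasks this worker will accept
  max.tasks=1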

Thanks!
Sagar.


On Fri, Oct 20, 2023 at 10:43 PM Greg Harris 
wrote:

> Hey Hector,
>
> That's a cool idea for the ConnectAssignor plugin.
>
> While this proposal could be viewed as an "assignor" problem that a
> custom assignor could solve, it's really about providing additional
> context to the assignor which isn't present currently. This lack of
> context would prevent a custom assignor from solving the resource
> utilization problem adequately.
>
> Thanks!
> Greg
>
> On Fri, Oct 20, 2023 at 9:58 AM Greg Harris  wrote:
> >
> > Mickael,
> >
> > Thank you for discussing that rejected alternative, I was almost going
> > to propose it.
> >
> > > I still find the proposed mechanism limited and I'm not sure it really
> addressed the pain points I've experienced with Connect.
> >
> > I think that this KIP on its own is insufficient to solve the
> > operational difficulties of Connect, and an external management layer
> > is necessary. In this KIP i'm trying to find the minimum viable
> > abstraction to allow a management layer to make decisions about
> > placement, knowing that the abstraction may be non-ergonomic for
> > "direct users" without a management layer mediating.
> >
> > > Connectors may also change the assignment of tasks at runtime so for
> example it task-2 is really busy (because it's assigned a partition with
> high throughput), this may not be true in 10 minutes as this partition is
> now assigned to task-1
> >
> > I think this is similar to a concern (#5) that Tom raised, and a
> > limitation of the "task index" abstraction. I don't know if there is a
> > way for us to manage this sort of fine-grained dynamic utilization of
> > tasks. Once we start a task, it has some static resources assigned to
> > it (via the JVM). If you notice the resource requirements expand, it
> > will need to stop in order to move JVMs and change its resource
> > allocation, and stopping the task may cause assignments to change and
> > the workload to be distributed elsewhere.
> >
> > > I think the "hints" where to place a connector/tasks should come from
> the connector configuration as it's the engineers building a pipeline that
> knows best the requirements (in

Re: Security for Kafka

2023-10-19 Thread Sagar
Hey Walchester,

There's a Confluent community Slack workspace with a #security channel
where you can post your question. Also, have you filed a bug in the Apache
Kafka JIRA? That can also help you get traction with some of the community
members who have expertise in this area.

Thanks!
Sagar.

On Thu, Oct 19, 2023 at 11:52 AM Walchester Gaw 
wrote:

> Hello.
>
> Is there something like a community page for Kafka where I can reach out to
> the community where hopefully someone with a similar setup can help?
>
> Thanks,
> Chester
>
>
> On Thu, Oct 12, 2023 at 10:48 AM Walchester Gaw 
> wrote:
>
> > Hello.
> >
> > I am trying to implement Quorum TLS by following the instructions in
> > https://zookeeper.apache.org/doc/r3.5.7/zookeeperAdmin.html#Quorum+TLS,
> > but I keep on encountering the following errors after doing the second
> > rolling restart where sslQuorum set to true.
> >
> >- [2023-10-11 05:46:03,250] WARN Cannot open channel to 3 at election
> >address /xxx.xx.xx.xxx: (
> >org.apache.zookeeper.server.quorum.QuorumCnxManager)
> >javax.net.ssl.SSLHandshakeException: Received fatal alert:
> >handshake_failure
> >- [2023-10-11 05:47:12,513] WARN Closing connection to /xxx.xx.xx.
> >xxx: (org.apache.zookeeper.server.NettyServerCnxn)
> >java.io.IOException: ZK down
> >
> > Our current Cluster setup consists of 3 Linux servers (Amazon EC2
> > instances) which contains one Zookeeper and Broker for each server. I
> have
> > tried using Private IP DNS name and Public IPv4 DNS as the alias and
> > distinguished name when generating the self signed certificate for each
> of
> > the servers. For the generation of CA key and CA certificate, I used the
> > Private IP DNS name and Public IPv4 DNS of one the servers as the common
> > name respectively. Do note I am generating all keystores/truststore in
> just
> > one server (this server's IP is indicated in CA key and CA cert) and
> > distributing them accordingly.
> >
> > I made sure that all ZK is up and running when I am getting the ZK down
> > issue and I am getting that error for all three ZKs. I can also confirm
> > that the file path indicated in the zookeeper.properties where the
> keystore
> > and truststore is located is correct.
> >
> > Can someone assist regarding this? What am I missing here?  Let me know
> if
> > you need more information.
> >
> > I am also unsure if there is something like a community page for Kafka
> > where I can reach out to the community where hopefully someone with a
> > similar setup can help.
> >
> > Thanks,
> > Chester
> >
>


Re: [DISCUSS] KIP-987: Connect Static Assignments

2023-10-18 Thread Sagar
Hi Greg,

Thanks for the KIP. I have a few questions/comments:

1) You mentioned that during a rebalance, if all the members of the cluster
support the static protocol, then the steps outlined in the Proposed section
are used to do the assignments. In those steps, the leader identifies the
static/wildcard jobs. It is not clear to me how the leader makes that
distinction. Are we going to enhance the embedded protocol to also write the
static jobs that the worker owns as part of its assignment? As of today, the
workers write only the owned/revoked connectors/tasks in the case of
incremental and above, and only owned connectors/tasks in the case of eager.

2) Could you also elaborate this statement a bit:

> A cluster with both static and wildcard workers can use wildcard workers
> as backups for disaster recovery.


I see in the Strimzi proposal you have explained this scenario (the case
where shared workers are empty as pods aren't created yet etc IIUC), but
reading the KIP doesn't make it too obvious.

3) A nit comment but we should probably call out that there is no need to
assign the connector and task to the same worker when configuring them. I
guess for new users of Connect this could be a source of confusion. WDYT?

4) Staying on the above, I also wanted to know why we need to set
static.connectors. As such, connectors generally aren't resource intensive.
Can we not let connectors be assigned using the same algorithm as today and
let only tasks be pinned to workers?

5) In the proposed section you also mentioned that

If static assignments are not specified, or at least one worker in the
> cluster is not using the static  protocol, they are ignored and the
> worker may receive an arbitrary assignment.


The arbitrary assignment is driven by the minimum supported protocol right
(sessioned/eager)?

6) Just for my understanding: because the onus is now on the Connect admin
to specify which connector/task should run where, can there be a situation
where there is an imbalance in the number of connectors/tasks running on
workers (due to some oversight on the part of the admin, or some bug in the
tooling used to generate worker configs, if any)? There could be an
imbalance even today, but the protocol tries to balance as much as it can
in an automated fashion.

7) Lastly, in a cluster where all workers support the static protocol,
there is still no guarantee that all connectors/tasks would be static. In
such a case, what happens when one of the workers is restarted or is being
rolled? Would the assignment eventually be sticky when the worker comes
back? Does this new protocol also abide by the scheduled rebalance delays?
Maybe this has been explained in the KIP but it wasn't clear to me when I
read it.

Thanks!
Sagar.

On Tue, Oct 10, 2023 at 11:22 PM Greg Harris 
wrote:

> Hi Mickael,
>
> I'm not Chris but I hope I can still respond to your questions :)
>
> 1a. This system behaves best when connectors and tasks are known in
> advance, but I don't think that is a requirement. I clarified that the
> worker configurations are permissive of non-existent jobs, which
> allows for bootstrapping an empty cluster and concurrent worker & job
> provisioning.
> 1b. I think the wildcard suggestion is similar to the rejected
> alternative "implement a more complex worker selector...". While we
> could add that sort of functionality, I think it is more difficult to
> express and reason about. For example, with the current configuration,
> users can ensure that at most one job is assigned to a JVM. If a user
> wanted to use wildcards with the same constraint, those wildcards
> would need to express "at most one of ".
> 1c. The javadoc doesn't say what happens to the additional tasks, but
> I think we could start enforcing it if it makes the
> fixed-static-assignment use-case more reliable. I have only ever seen
> additional tasks emitted due to rather serious bugs in connector
> implementations.
>
> 2. Yes, I've added that information to the proposal. I used the `GET
> /connectors/{connector}` API as it doesn't list the task configs, but
> it performs the same function.
>
> 3. Do you mean the resource limits and requests? This is the rejected
> alternative "Model per-job resource constraints and measure resource
> utilization within a single JVM", let me know if the reasoning there
> is not convincing.
>
> 4. I have not explored rack awareness for Connect in detail. I expect
> that rack awareness would require some compromise on the "least
> disruption" property of incremental cooperative rebalancing for shared
> JVMs. We could discuss that in a future KIP.
> As for this proposal, because the management layer is responsible for
> placing workers and placing tasks on those workers, it would also be
> responsible for implementing rack-awa

Re: [ANNOUNCE] New committer: Lucas Brutschy

2023-09-21 Thread Sagar
Congrats Lucas !

On Thu, 21 Sep 2023 at 9:15 PM, Bruno Cadonna  wrote:

> Hi all,
>
> The PMC of Apache Kafka is pleased to announce a new Kafka committer
> Lucas Brutschy.
>
> Lucas' major contributions are around Kafka Streams.
>
> Lucas' significantly contributed to the state updater
> (https://issues.apache.org/jira/browse/KAFKA-10199) and he drives the
> implementation of the new threading model for Kafka Streams
> (https://issues.apache.org/jira/browse/KAFKA-15326).
>
> Lucas' contributions to KIP discussions and PR reviews are very thoughtful.
>
> Congratulations, Lucas!
>
> Thanks,
>
> Bruno (on behalf of the Apache Kafka PMC)
>


Re: [ANNOUNCE] New committer: Yash Mayya

2023-09-21 Thread Sagar
Congrats Yash !
On Thu, 21 Sep 2023 at 9:38 PM, Ashwin  wrote:

> Awesome ! Congratulations Yash !!
>
> On Thu, Sep 21, 2023 at 9:25 PM Edoardo Comar 
> wrote:
>
> > Congratulations Yash
> >
> > On Thu, 21 Sept 2023 at 16:28, Bruno Cadonna  wrote:
> > >
> > > Hi all,
> > >
> > > The PMC of Apache Kafka is pleased to announce a new Kafka committer
> > > Yash Mayya.
> > >
> > > Yash's major contributions are around Connect.
> > >
> > > Yash authored the following KIPs:
> > >
> > > KIP-793: Allow sink connectors to be used with topic-mutating SMTs
> > > KIP-882: Kafka Connect REST API configuration validation timeout
> > > improvements
> > > KIP-970: Deprecate and remove Connect's redundant task configurations
> > > endpoint
> > > KIP-980: Allow creating connectors in a stopped state
> > >
> > > Overall, Yash is known for insightful and friendly input to discussions
> > > and his high quality contributions.
> > >
> > > Congratulations, Yash!
> > >
> > > Thanks,
> > >
> > > Bruno (on behalf of the Apache Kafka PMC)
> >
>


Re: [VOTE] 3.6.0 RC0

2023-09-19 Thread Sagar
Hey Satish,

I have commented on KAFKA-15473. I think the changes in the PR look fine. I
also feel this need not be a release blocker, given there are other ways in
which duplicates can manifest in the response of the endpoint in question
(albeit we can potentially see more of them due to this).

Would like to hear others' thoughts as well.

Thanks!
Sagar.


On Tue, Sep 19, 2023 at 3:14 PM Satish Duggana 
wrote:

> Hi Greg,
> Thanks for reporting the KafkaConnect issue. I replied to this issue
> on "Apache Kafka 3.6.0 release" email thread and on
> https://issues.apache.org/jira/browse/KAFKA-15473.
>
> I would like to hear other KafkaConnect experts' opinions on whether
> this issue is really a release blocker.
>
> Thanks,
> Satish.
>
>
>
>
> On Tue, 19 Sept 2023 at 00:27, Greg Harris 
> wrote:
> >
> > Hey all,
> >
> > I noticed this regression in RC0:
> > https://issues.apache.org/jira/browse/KAFKA-15473
> > I've mentioned it in the release thread, and I'm working on a fix.
> >
> > I'm -1 (non-binding) until we determine if this regression is a blocker.
> >
> > Thanks!
> >
> > On Mon, Sep 18, 2023 at 10:56 AM Josep Prat 
> wrote:
> > >
> > > Hi Satish,
> > > Thanks for running the release.
> > >
> > > I ran the following validation steps:
> > > - Built from source with Java 11 and Scala 2.13
> > > - Verified Signatures and hashes of the artifacts generated
> > > - Navigated through Javadoc including links to JDK classes
> > > - Run the unit tests
> > > - Run integration tests
> > > - Run the quickstart in KRaft and Zookeeper mode
> > > - Checked License-binary against libs and matched them
> > >
> > > I +1 this release (non-binding)
> > >
> > > Best,
> > >
> > > On Mon, Sep 18, 2023 at 6:02 PM David Arthur  wrote:
> > >
> > > > Hey Satish, thanks for getting the RC underway!
> > > >
> > > > I noticed that the PR for the 3.6 blog post is merged. This makes
> the blog
> > > > post live on the Kafka website https://kafka.apache.org/blog.html.
> The
> > > > blog
> > > > post (along with other public announcements) is usually the last
> thing we
> > > > do as part of the release. I think we should probably take this down
> until
> > > > we're done with the release, otherwise users stumbling on this post
> could
> > > > get confused. It also contains some broken links.
> > > >
> > > > Thanks!
> > > > David
> > > >
> > > > On Sun, Sep 17, 2023 at 1:31 PM Satish Duggana <
> satish.dugg...@gmail.com>
> > > > wrote:
> > > >
> > > > > Hello Kafka users, developers and client-developers,
> > > > >
> > > > > This is the first candidate for the release of Apache Kafka 3.6.0.
> Some
> > > > of
> > > > > the major features include:
> > > > >
> > > > > * KIP-405 : Kafka Tiered Storage
> > > > > * KIP-868 : KRaft Metadata Transactions
> > > > > * KIP-875: First-class offsets support in Kafka Connect
> > > > > * KIP-898: Modernize Connect plugin discovery
> > > > > * KIP-938: Add more metrics for measuring KRaft performance
> > > > > * KIP-902: Upgrade Zookeeper to 3.8.1
> > > > > * KIP-917: Additional custom metadata for remote log segment
> > > > >
> > > > > Release notes for the 3.6.0 release:
> > > > >
> https://home.apache.org/~satishd/kafka-3.6.0-rc0/RELEASE_NOTES.html
> > > > >
> > > > > *** Please download, test and vote by Wednesday, September 21,
> 12pm PT
> > > > >
> > > > > Kafka's KEYS file containing PGP keys we use to sign the release:
> > > > > https://kafka.apache.org/KEYS
> > > > >
> > > > > * Release artifacts to be voted upon (source and binary):
> > > > > https://home.apache.org/~satishd/kafka-3.6.0-rc0/
> > > > >
> > > > > * Maven artifacts to be voted upon:
> > > > >
> https://repository.apache.org/content/groups/staging/org/apache/kafka/
> > > > >
> > > > > * Javadoc:
> > > > > https://home.apache.org/~satishd/kafka-3.6.0-rc0/javadoc/
> > > > >
> > > > > * Tag to be voted upon (off 3.6 branch) is the 3.6.0 tag:
> > > > > https://github.com/apache/kafka/releases/tag/3.6.0-rc0
> &g

Re: KIP-976: Cluster-wide dynamic log adjustment for Kafka Connect

2023-09-10 Thread Sagar
Thanks Chris,

The changes look good to me.

1) Regarding the suggestion to reduce the key sizes, the only intent was to
make it easier to read. But then I missed the fact that
"org.apache.kafka.connect" isn't always going to be the prefix for these
keys. We can live with whatever we have.

2) Hmm, I think it just felt like a useful extension to the current
mechanism of changing log levels per worker. One place where it might come
in handy, and which can't be handled by any of the options listed in the
Future Work section, is if somebody wants to observe the rebalance-related
activities per worker on a subset of them using finer-grained logs. I am
not sure if it's a strong enough motivation, but as I said it just felt like
a useful extension. I will leave it to you whether you want to add it or not
(I am ok either way).

Thanks!
Sagar.

On Thu, Sep 7, 2023 at 9:26 PM Chris Egerton 
wrote:

> Hi all,
>
> Thanks again for the reviews!
>
>
> Sagar:
>
> > The updated definition of last_modified looks good to me. As a
> continuation
> to point number 2, could we also mention that this could be used to get
> insights into the propagation of the cluster wide log level updates. It is
> implicit but probably better to add it I feel.
>
> Sure, done. Added to the end of the "Config topic records" section: "There
> may be some delay between when a REST request with scope=cluster is
> received and when all workers have read the corresponding record from the
> config topic. The last modified timestamp (details above) can serve as a
> rudimentary tool to provide insight into the propagation of a cluster-wide
> log level adjustment."
>
> > Yeah I would lean on the side of calling it out explicitly. Since the
> behaviour is similar to the current dynamically set log levels (i.e
> resetting to the log4j config files levels) so we can call it out stating
> that similarity just for completeness sake. It would be useful info for
> new/medium level users reading the KIP considering worker restarts is not
> uncommon.
>
> Alright, did this too. Added near the end of the "Config topic records"
> section: "Restarting a worker will cause it to discard all cluster-wide
> dynamic log level adjustments, and revert to the levels specified in its
> Log4j configuration. This mirrors the current behavior with per-worker
> dynamic log level adjustments."
>
> > I had a nit level suggestion but not sure if it makes sense but would
> still
> call it out. The entire namespace name in the config records key (along
> with logger-cluster prefix) seems to be a bit too verbose. My first thought
> was to not have the prefix org.apache.kafka.connect in the keys considering
> it is the root namespace. But since logging can be enabled at a root level,
> can we just use initials like (o.a.k.c) which is also a standard practice.
> Let me know what you think?
>
> Considering these records aren't meant to be user-visible, there doesn't
> seem to be a pressing need to reduce their key sizes (though I'll happily
> admit that to human eyes, the format is a bit ugly). IMO the increase in
> implementation complexity isn't quite worth it, especially considering
> there are plenty of logging namespaces that won't begin with
> "org.apache.kafka.connect" (likely including all third-party connector
> code), like Yash mentions. Is there a motivation for this suggestion that
> I'm missing?
>
> > Lastly, I was also thinking if we could introduce a new parameter which
> takes a subset of worker ids and enables logging for them in one go. But
> this is already achievable by invoking scope=worker endpoint n times to
> reflect on n workers so maybe not a necessary change. But this could be
> useful on a large cluster. Do you think this is worth listing in the Future
> Work section? It's not important so can be ignored as well.
>
> Hmmm... I think I'd rather leave this out for now because I'm just not
> certain enough it'd be useful. The one advantage I can think of is
> targeting specific workers that are behind a load balancer, but being able
> to identify those workers would be a challenge in that scenario anyways.
> Besides that, are there any cases that couldn't be addressed more
> holistically by targeting based on connector/connector type, like Yash
> asks?
>
>
> Ashwin:
>
> Glad we're on the same page RE request forwarding and integration vs.
> system tests! Let me know if anything else comes up that you'd like to
> discuss.
>
>
> Yash:
>
> Glad that it makes sense to keep these changes ephemeral. I'm not quite
> confident enough to put persistent updates in the "Future work" section but
> have a sneaking suspicion that this isn't the last we'll see of that

Re: [VOTE] KIP-910: Update Source offsets for Source Connectors without producing records

2023-09-06 Thread Sagar
Hi All,

Based on the latest discussion thread, it appears as if all open questions
have been answered.

Hopefully now we are in a state where we can close out on the Voting
process.

Thanks everyone for the great feedback.

Thanks!
Sagar.

On Fri, Aug 18, 2023 at 9:00 AM Sagar  wrote:

> Hi All,
>
> Bumping the voting thread again.
>
> Thanks!
> Sagar.
>
> On Wed, Aug 2, 2023 at 4:43 PM Sagar  wrote:
>
>> Attaching the KIP link for reference:
>> https://cwiki.apache.org/confluence/display/KAFKA/KIP-910%3A+Update+Source+offsets+for+Source+Connectors+without+producing+records
>>
>> Thanks!
>> Sagar.
>>
>> On Wed, Aug 2, 2023 at 4:37 PM Sagar  wrote:
>>
>>> Hi All,
>>>
>>> Calling a Vote on KIP-910 [1]. I feel we have converged to a reasonable
>>> design. Of course I am open to any feedback/suggestions and would address
>>> them.
>>>
>>> Thanks!
>>> Sagar.
>>>
>>


Re: [DISCUSS] KIP-910: Update Source offsets for Source Connectors without producing records

2023-09-06 Thread Sagar
Hey All,

I had an offline discussion with Yash on this, and while so far there didn't
seem to be a pressing need to introduce the offset deletion mechanism via
the updateOffsets method, Yash brought up an interesting point: if we don't
introduce the offset deletion mechanism in this KIP but do it in a future
version, then connector developers and users would get different behaviour
for tombstone offsets based on the runtime version being run. This could
lead to confusion.

Considering this, I have updated the KIP to also allow deleting offsets via
tombstone records. Thanks Yash for closing out on this one!
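
For illustration, a rough sketch of what a tombstone offset could look like
from a connector's point of view (the exact updateOffsets signature is defined
in the KIP; this only shows a null offset value signalling that the offset for
a source partition should be deleted, with made-up partition/offset keys):

  Map<Map<String, Object>, Map<String, Object>> offsets = new HashMap<>();
  // keep/update the offset for a partition the connector still cares about
  offsets.put(Collections.singletonMap("table", "orders"),
              Collections.singletonMap("position", 42L));
  // tombstone: delete the stored offset for a partition it no longer cares about
  offsets.put(Collections.singletonMap("table", "legacy_table"), null);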

Hopefully all open questions have now been addressed.

Thanks!
Sagar.

On Tue, Aug 29, 2023 at 3:33 PM Yash Mayya  wrote:

> Hi Sagar,
>
> > The size of offsets topic can be controlled by
> > setting appropriate topic retention values and
> > that is a standard practice in Kafka
>
> Kafka Connect enforces the `cleanup.policy` configuration for the offsets
> topic to be "compact" only (references - [1], [2]), so the topic retention
> related configurations won't be relevant right?
>
> > Deleting offsets is not something which should
> > be done very frequently and should be handled
> > with care
>
> > Agreed this involves some toil but it's not something
> > that should be done on a very regular basis.
>
> I'm not sure I follow how we came to this conclusion, could you please
> expand on the pitfalls of allowing connector plugins to wipe the offsets
> for source partitions that they no longer care about?
>
> > The usecases you highlighted are edge cases at
> > best. As I have been saying, if it is needed we can
> > always add it in the future but that doesn't look like
> > a problem we need to solve upfront.
>
> I agree that these cases might not be too common, but I'm just trying to
> understand the reasoning behind preventing this use case since null offsets
> don't require any separate handling from the Connect runtime's point of
> view (and wouldn't need any additional implementation work in this KIP).
>
> Thanks,
> Yash
>
> [1] - https://kafka.apache.org/documentation/#connect_running
> [2] -
>
> https://github.com/apache/kafka/blob/0912ca27e2a229d2ebe02f4d1dabc40ed5fab0bb/connect/runtime/src/main/java/org/apache/kafka/connect/storage/KafkaTopicBasedBackingStore.java#L47
>
> On Mon, Aug 28, 2023 at 1:38 PM Sagar  wrote:
>
> > Hey Yash,
> >
> > Thanks for your further comments. Here are my responses:
> >
> > 1) Deleting offsets via updateOffsets.
> >
> > Hmm, I am not sure this is really necessary to be part of the KIP at this
> > point, and we can always add it later on if needed. I say this for the
> > following reasons:
> >
> >
> >- The size of offsets topic can be controlled by setting appropriate
> >topic retention values and that is a standard practice in Kafka. Sure
> > it's
> >not always possible to get the right values but as I said it is a
> > standard
> >practice. For Connect specifically, there is also a KIP (KIP-943
> ><
> >
> https://cwiki.apache.org/confluence/pages/viewpage.action?pageId=255073470
> > >)
> >which is trying to solve the problem of a large connect-offsets topic.
> > So,
> >if that is really the motivation, then these are being addressed
> > separately
> >anyways.
> >- Deleting offsets is not something which should be done very
> frequently
> >and should be handled with care. That is why KIP-875's mechanism to
> have
> >users/ cluster admin do this externally is the right thing to do.
> Agreed
> >this involves some toil but it's not something that should be done on
> a
> >very regular basis.
> >- There is no stopping connector implementations to send tombstone
> >records as offsets but in practice how many connectors actually do it?
> >Maybe 1 or 2 from what we discussed.
> >- The usecases you highlighted are edge cases at best. As I have been
> >saying, if it is needed we can always add it in the future but that
> > doesn't
> >look like a problem we need to solve upfront.
> >
> > Due to these reasons, I don't think this is a point that we need to
> stress
> > so much upon. I say this because offsets topic's purging/clean up can be
> > handled either via standard Kafka techniques (point #1 above) or via
> > Connect runtime techniques (Pt #2  above). IMO the problem we are trying
> to
> > solve via this KIP has been solved by connectors using techniques which
> > have been termed as having higher maintenance cost

Re: KIP-976: Cluster-wide dynamic log adjustment for Kafka Connect

2023-09-05 Thread Sagar
Hey Chris,

Thanks for making the updates.

The updated definition of last_modified looks good to me. As a continuation
of point number 2, could we also mention that this could be used to get
insights into the propagation of the cluster-wide log level updates? It is
implicit, but probably better to add it, I feel.

Regarding

It's a little indirect on the front of worker restart behavior, so let me
> know if that especially should be fleshed out a bit more (possibly by
> calling it out in the "Config topic records" section).


Yeah I would lean on the side of calling it out explicitly. Since the
behaviour is similar to the current dynamically set log levels (i.e
resetting to the log4j config files levels) so we can call it out stating
that similarity just for completeness sake. It would be useful info for
new/medium level users reading the KIP considering worker restarts is not
uncommon.


Thanks, I'm glad that this seems reasonable. I've tentatively added this to
> the rejected alternatives section, but am happy to continue the
> conversation if anyone wants to explore that possibility further.


+1

I had a nit-level suggestion; I'm not sure if it makes sense, but I would
still call it out. The entire namespace name in the config record keys (along
with the logger-cluster prefix) seems a bit too verbose. My first thought was
to not have the prefix org.apache.kafka.connect in the keys, considering it
is the root namespace. But since logging can be enabled at the root level,
can we just use initials like o.a.k.c, which is also a standard practice?
Let me know what you think.

Lastly, I was also wondering if we could introduce a new parameter which
takes a subset of worker ids and enables logging for them in one go. This is
already achievable by invoking the scope=worker endpoint n times against n
workers, so maybe it isn't a necessary change, but it could be useful on a
large cluster. Do you think this is worth listing in the Future Work
section? It's not important, so it can be ignored as well.

Thanks!
Sagar.


On Wed, Sep 6, 2023 at 12:08 AM Chris Egerton 
wrote:

> Hi Sagar,
>
> Thanks for your thoughts! Responses inline:
>
> > 1) Considering that you have now clarified here that last_modified field
> would be stored in-memory, it is not mentioned in the KIP. Although that's
> something that's understandable, it wasn't apparent when reading the KIP.
> Probably, we could mention it? Also, what happens if a worker restarts? In
> that case, since the log level update message would be pre-dating the
> startup of the worker, it would be ignored? We should probably mention that
> behaviour as well IMO.
>
> I've tweaked the second paragraph of the "Last modified timestamp" section
> to try to clarify this without getting too verbose: "Modification times
> will be tracked in-memory and determined by when they are applied by the
> worker, as opposed to when they are requested by the user or persisted to
> the config topic (details below). If no modifications to the namespace have
> been made since the worker finished startup, they will be null." Does that
> feel sufficient? It's a little indirect on the front of worker restart
> behavior, so let me know if that especially should be fleshed out a bit
> more (possibly by calling it out in the "Config topic records" section).
>
> > 2) Staying on the last_modified field, it's utility is also not too clear
> to me. Can you add some examples of how it can be useful for debugging etc?
>
> The cluster-wide API relaxes the consistency guarantees of the existing
> worker-local API. With the latter, users can be certain that once they
> receive a 2xx response, the logging level on that worker has been updated.
> With the former, users know that the logging level will eventually be
> updated, but insight into the propagation of that update across the cluster
> is limited. Although it's a little primitive, I'm hoping that the last
> modified timestamp will be enough to help shed some light on this process.
> We could also explore exposing the provenance of logging levels (which maps
> fairly cleanly to the scope of the request from which they originated), but
> that feels like overkill at the moment.
>
> > 3) In the test plan, should we also add a test when the scope parameter
> passed is non-null and neither worker nor cluster? In this case the
> behaviour should be similar to the default case.
>
> Good call! Done.
>
> > 4) I had the same question as Yash regarding persistent cluster-wide
> logging level. I think you have explained it well and we can skip it for
> now.
>
> Thanks, I'm glad that this seems reasonable. I've tentatively added this to
> the rejected alternatives section, but am happy to continue the
> conversation if anyone wants to explore that possib

Re: [VOTE] KIP-953: partition method to be overloaded to accept headers as well.

2023-09-05 Thread Sagar
Hey Jack,

The way I interpreted this thread, it seems like there's more alignment on
the DTO-based approach. I spent some time on the suggestion Ismael had
regarding the usage of ProducerRecord. Did you get a chance to look at the
reply I posted, and does it make sense to you? Also, checking out the
AdminClient API examples provided by Ismael will give you more context.
Let me know what you think.

Thanks!
Sagar.

On Thu, Aug 31, 2023 at 12:49 PM Jack Tomy  wrote:

> Hey everyone,
>
> As I see devs favouring the current style of implementation, and that is
> inline with existing code. I would like to go ahead with the same approach
> as mentioned in the KIP.
> Can I get a few more votes so that I can take the KIP forward.
>
> Thanks
>
>
>
> On Sun, Aug 27, 2023 at 1:38 PM Sagar  wrote:
>
> > Hi Ismael,
> >
> > Thanks for pointing us towards the direction of a DTO based approach. The
> > AdminClient examples seem very neat and extensible in that sense.
> > Personally, I was trying to think only along the lines of how the current
> > Partitioner interface has been designed, i.e having all requisite
> > parameters as separate arguments (Topic, Key, Value etc).
> >
> > Regarding this question of yours:
> >
> > A more concrete question: did we consider having the method `partition`
> > > take `ProduceRecord` as one of its parameters and `Cluster` as the
> other?
> >
> >
> > No, I don't think in the discussion thread it was brought up and as I
> said
> > it appears that could be due to an attempt to keep the new method's
> > signature similar to the existing one within Partitioner. If I understood
> > the intent of the question correctly, are you trying to hint here that
> > `ProducerRecord` already contains all the arguments that the `partition`
> > method accepts and also has a `headers` field within it. So, instead of
> > adding another method for the `headers` field, why not create a new
> method
> > taking ProducerRecord directly?
> >
> > If my understanding is correct, then it seems like a very clean way of
> > adding support for `headers`. Anyways, the partition method within
> > KafkaProducer already takes a ProducerRecord as an argument so that makes
> > it easier. Keeping that in mind, should this new method's (which takes a
> > ProducerRecord as an input) default implementation invoke the existing
> > method ? One challenge I see there is that the existing partition method
> > expects serialized keys and values while ProducerRecord doesn't have
> access
> > to those (It directly operates on K, V).
> >
> > Thanks!
> > Sagar.
> >
> >
> > On Sun, Aug 27, 2023 at 8:51 AM Ismael Juma  wrote:
> >
> > > A more concrete question: did we consider having the method `partition`
> > > take `ProduceRecord` as one of its parameters and `Cluster` as the
> other?
> > >
> > > Ismael
> > >
> > > On Sat, Aug 26, 2023 at 12:50 PM Greg Harris
> >  > > >
> > > wrote:
> > >
> > > > Hey Ismael,
> > > >
> > > > > The mention of "runtime" is specific to Connect. When it comes to
> > > > clients,
> > > > one typically compiles and runs with the same version or runs with a
> > > newer
> > > > version than the one used for compilation. This is standard practice
> in
> > > > Java and not something specific to Kafka.
> > > >
> > > > When I said "older runtimes" I was being lazy, and should have said
> > > > "older versions of clients at runtime," thank you for figuring out
> > > > what I meant.
> > > >
> > > > I don't know how common it is to compile a partitioner against one
> > > > version of clients, and then distribute and run the partitioner with
> > > > older versions of clients and expect graceful degradation of
> features.
> > > > If you say that it is very uncommon and not something that we should
> > > > optimize for, then I won't suggest otherwise.
> > > >
> > > > > With regards to the Admin APIs, they have been extended several
> times
> > > > since introduction (naturally). One of them is:
> > > > >
> > > >
> > >
> >
> https://github.com/apache/kafka/commit/1d22b0d70686aef5689b775ea2ea7610a37f3e8c
> > > >
> > > > Thanks for the example. I also see that includes a migration from
> > > > regular arguments to the DTO style, consistent with your
> > > >

Re: KIP-976: Cluster-wide dynamic log adjustment for Kafka Connect

2023-09-05 Thread Sagar
Hey Chris,

Thanks for the KIP. Seems like a useful feature. I have some more
questions/comments:

1) Considering that you have now clarified here that the last_modified field
would be stored in-memory, it is not mentioned in the KIP. Although that's
understandable, it wasn't apparent when reading the KIP. Could we probably
mention it? Also, what happens if a worker restarts? In that case, since the
log level update message would pre-date the startup of the worker, would it
be ignored? We should probably mention that behaviour as well, IMO.

2) Staying on the last_modified field, its utility is also not too clear
to me. Can you add some examples of how it can be useful for debugging, etc.?

3) In the test plan, should we also add a test for when the scope parameter
passed is non-null and neither worker nor cluster? In this case, the
behaviour should be similar to the default case.

4) I had the same question as Yash regarding persistent cluster-wide
logging level. I think you have explained it well and we can skip it for
now.

Thanks!
Sagar.

On Tue, Sep 5, 2023 at 8:49 PM Chris Egerton 
wrote:

> Hi all,
>
> Thank you so much for the generous review comments! Happy to see interest
> in this feature. Inline responses follow.
>
>
> Ashwin:
>
> > Don't you foresee a future scope type which may require cluster metadata
> ?
> In that case, isn't it better to forward the requests to the leader in the
> initial implementation ?
>
> I agree with Yash here: we can cross that bridge when we come to it, unless
> there are problems that we'd encounter then that would be better addressed
> by adding request forwarding now. One potential argument I can think of is
> that the UX would be a little strange if the semantics for this endpoint
> differ depending on the value of the scope parameter (some values would be
> affected by in-progress rebalances, and some would not), but I don't know
> if making scope=cluster more brittle in the name of consistency with, e.g.,
> scope=connectorType:foo is really a worthwhile tradeoff. Thoughts?
>
> > I would also recommend an additional system test for Standalone herder to
> ensure that the new scope parameter is honored and the response contains
> the last modified time.
>
> Ah, great call! I love the new testing plan section. I also share Yash's
> concerns about adding a new system test though (at this point, they're so
> painful to write, test, and debug that in most circumstances I consider
> them a last resort). Do you think it'd be reasonable to add end-to-end
> verification for this with an integration test instead?
>
>
> Yash:
>
> > From the proposed changes section, it isn't very clear to me how we'll be
> tracking this last modified timestamp to be returned in responses for the
> *GET
> /admin/loggers* and *GET /admin/loggers/{logger}* endpoints. Could you
> please elaborate on this? Also, will we track the last modified timestamp
> even for worker scoped modifications where we won't write any records to
> the config topic and the requests will essentially be processed
> synchronously?
>
> RE timestamp tracking: I was thinking we'd store the timestamp for each
> level in-memory and, whenever we change the level for a namespace, update
> its timestamp to the current wallclock time. Concretely, this means we'd
> want the timestamp for some logger `logger` to be as soon as possible after
> the call to `logger.setLevel(level)` for some level `level`. I'm honestly
> unsure how to clarify this further in the KIP; is there anything in there
> that strikes you as particularly ambiguous that we can tweak to be more
> clear?
>
> RE scope distinction for timestamps: I've updated the KIP to clarify this
> point, adding this sentence: "Timestamps will be updated regardless of
> whether the namespace update was applied using scope=worker or
> scope=cluster.". Let me know what you think
>
> > In the current synchronous implementation for the *PUT
> /admin/loggers/{logger} *endpoint, we return a 404 error if the level is
> invalid (i.e. not one of TRACE, DEBUG, WARN etc.). Since the new cluster
> scoped variant of the endpoint will be asynchronous, can we also add a
> validation to synchronously surface erroneous log levels to users?
>
> Good call! I think we don't have to be explicit about this in the proposed
> changes section, but it's a great fit for the testing plan, where I've
> added it: "Ensure that cluster-scoped requests with invalid logging levels
> are rejected with a 404 response"
>
> > I'm curious to know what the rationale here is? In KIP-745, the stated
> reasoning behind ignoring restart requests during worker startup was that
> the worker will anyway be starting all connectors and tasks assigned to it
> so

Re: [VOTE] KIP-970: Deprecate and remove Connect's redundant task configurations endpoint

2023-08-30 Thread Sagar
+1 (non-binding).

Thanks !
Sagar.

On Wed, 30 Aug 2023 at 11:09 PM, Chris Egerton 
wrote:

> +1 (binding), thanks Yash!
>
> On Wed, Aug 30, 2023 at 1:34 PM Andrew Schofield <
> andrew_schofield_j...@outlook.com> wrote:
>
> > Thanks for the KIP. Looks good to me.
> >
> > +1 (non-binding).
> >
> > Andrew
> >
> > > On 30 Aug 2023, at 18:07, Hector Geraldino (BLOOMBERG/ 919 3RD A) <
> > hgerald...@bloomberg.net> wrote:
> > >
> > > This makes sense to me, +1 (non-binding)
> > >
> > > From: dev@kafka.apache.org At: 08/30/23 02:58:59 UTC-4:00To:
> > dev@kafka.apache.org
> > > Subject: [VOTE] KIP-970: Deprecate and remove Connect's redundant task
> > configurations endpoint
> > >
> > > Hi all,
> > >
> > > This is the vote thread for KIP-970 which proposes deprecating (in the
> > > Apache Kafka 3.7 release) and eventually removing (in the next major
> > Apache
> > > Kafka release - 4.0) Connect's redundant task configurations endpoint.
> > >
> > > KIP -
> > >
> >
> https://cwiki.apache.org/confluence/display/KAFKA/KIP-970%3A+Deprecate+and+remov
> > > e+Connect%27s+redundant+task+configurations+endpoint
> > >
> > > Discussion thread -
> > > https://lists.apache.org/thread/997qg9oz58kho3c19mdrjodv0n98plvj
> > >
> > > Thanks,
> > > Yash
> > >
> > >
> >
> >
>


Re: Disabling Test: org.apache.kafka.trogdor.coordinator.CoordinatorTest.testTaskRequestWithOldStartMsGetsUpdated()

2023-08-28 Thread Sagar
Hey Greg,

Aah ok, I wasn't aware that a JIRA for this already existed. I did see
your attempt to fix it, but it seems to be failing still.

Sagar.

On Mon, Aug 28, 2023 at 10:30 PM Greg Harris 
wrote:

> Hey Sagar,
>
> The JIRA for this flaky test is here:
> https://issues.apache.org/jira/browse/KAFKA-8115
>
> Rather than disabling the test, I think we should look into the cause
> of the flakiness.
>
> Thanks!
> Greg
>
> On Mon, Aug 28, 2023 at 2:49 AM Sagar  wrote:
> >
> > Hi All,
> >
> > Should we disable this test:
> >
> org.apache.kafka.trogdor.coordinator.CoordinatorTest.testTaskRequestWithOldStartMsGetsUpdated()?
> >
> > I just did a quick search on my mailbox for this test and it has been
> > failing for a while. I will go ahead and create a ticket for this for
> > fixing this.
> >
> > Let me know if disabling it doesn't sound like a good idea.
> >
> > Thanks!
> > Sagar.
>


Disabling Test: org.apache.kafka.trogdor.coordinator.CoordinatorTest.testTaskRequestWithOldStartMsGetsUpdated()

2023-08-28 Thread Sagar
Hi All,

Should we disable this test:
org.apache.kafka.trogdor.coordinator.CoordinatorTest.testTaskRequestWithOldStartMsGetsUpdated()?

I just did a quick search of my mailbox for this test and it has been
failing for a while. I will go ahead and create a ticket for fixing it.

Let me know if disabling it doesn't sound like a good idea.

Thanks!
Sagar.


Re: [DISCUSS] KIP-910: Update Source offsets for Source Connectors without producing records

2023-08-28 Thread Sagar
Hey Yash,

Thanks for your further comments. Here are my responses:

1) Deleting offsets via updateOffsets.

Hmm, I am not sure this is really necessary to be part of the KIP at this
point, and we can always add it later on if needed. I say this for the
following reasons:


   - The size of the offsets topic can be controlled by setting appropriate
   topic retention values, which is a standard practice in Kafka. Sure, it's
   not always possible to get the right values, but as I said, it is a
   standard practice. For Connect specifically, there is also a KIP (KIP-943
   <https://cwiki.apache.org/confluence/pages/viewpage.action?pageId=255073470>)
   which is trying to solve the problem of a large connect-offsets topic. So,
   if that is really the motivation, then that is being addressed separately
   anyway.
   - Deleting offsets is not something which should be done very frequently,
   and it should be handled with care. That is why KIP-875's mechanism of
   having users or cluster admins do this externally is the right thing to
   do. Agreed, this involves some toil, but it's not something that should
   be done on a very regular basis.
   - There is nothing stopping connector implementations from sending
   tombstone records as offsets, but in practice how many connectors
   actually do it? Maybe 1 or 2, from what we discussed.
   - The use cases you highlighted are edge cases at best. As I have been
   saying, we can always add this in the future if needed, but it doesn't
   look like a problem we need to solve upfront.

For these reasons, I don't think this is a point that we need to stress so
much. I say this because purging/cleaning up the offsets topic can be
handled either via standard Kafka techniques (point #1 above) or via
Connect runtime techniques (point #2 above). IMO, the problem we are trying
to solve via this KIP has so far been solved by connectors using techniques
which have been termed as having a higher maintenance cost or a high
cognitive load (i.e. a separate topic), and that is what needs to be
addressed upfront. And since you yourself termed this a nice-to-have
feature, we can leave it at that and take it up as Future Work. Hope that's
ok with you and the other community members.

2) Purpose of offsets parameter in updateOffsets

The main purpose is to provide the task with visibility into which
partitions are getting their offsets committed. A task won't necessarily
choose to update offsets every time it sees that a given source partition is
missing from the about-to-be-committed offsets. Maybe it chooses to wait for
some X iterations or X amount of time and sends out an updated offset for a
partition only when such a threshold is breached. Even here one could argue
that, since the task is the one sending the partitions/offsets, it can do
the tracking on its own, but IMO that is too much work given that the
information is already available via the offsets about to be committed.
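Just to make the "wait for X iterations" idea concrete, here's a rough
sketch of the bookkeeping a task could do with the offsets passed into the
hook. The actual updateOffsets signature is whatever the KIP defines, so the
method below is only an illustrative helper operating on the
about-to-be-committed offsets map, with a made-up threshold:

```
import java.util.HashMap;
import java.util.Map;

// Illustrative helper only: counts how many consecutive offset commits a
// known source partition has been absent from, and produces an offset update
// for it once a threshold is breached.
public class MissingPartitionTracker {
    private static final int THRESHOLD = 10; // hypothetical: 10 commit cycles
    private final Map<Map<String, Object>, Integer> missedCommits = new HashMap<>();

    // offsets: the about-to-be-committed offsets, keyed by source partition.
    // knownPartitions: partitions the task is aware of, with their latest offsets.
    public Map<Map<String, Object>, Map<String, Object>> offsetsToAdd(
            Map<Map<String, Object>, Map<String, Object>> offsets,
            Map<Map<String, Object>, Map<String, Object>> knownPartitions) {
        Map<Map<String, Object>, Map<String, Object>> updates = new HashMap<>();
        for (Map.Entry<Map<String, Object>, Map<String, Object>> entry : knownPartitions.entrySet()) {
            if (offsets.containsKey(entry.getKey())) {
                missedCommits.remove(entry.getKey());      // partition is being committed
            } else {
                int missed = missedCommits.merge(entry.getKey(), 1, Integer::sum);
                if (missed >= THRESHOLD) {                  // threshold breached: emit an update
                    updates.put(entry.getKey(), entry.getValue());
                    missedCommits.remove(entry.getKey());
                }
            }
        }
        return updates;
    }
}
```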

Thanks!
Sagar.


Re: [VOTE] KIP-953: partition method to be overloaded to accept headers as well.

2023-08-27 Thread Sagar
Hi Ismael,

Thanks for pointing us towards the direction of a DTO based approach. The
AdminClient examples seem very neat and extensible in that sense.
Personally, I was trying to think only along the lines of how the current
Partitioner interface has been designed, i.e having all requisite
parameters as separate arguments (Topic, Key, Value etc).

Regarding this question of yours:

A more concrete question: did we consider having the method `partition`
> take `ProduceRecord` as one of its parameters and `Cluster` as the other?


No, I don't think it was brought up in the discussion thread, and as I said,
that could be due to an attempt to keep the new method's signature similar
to the existing one within Partitioner. If I understood the intent of the
question correctly, are you hinting that `ProducerRecord` already contains
all the arguments that the `partition` method accepts and also has a
`headers` field within it? So, instead of adding another method for the
`headers` field, why not create a new method taking ProducerRecord directly?

If my understanding is correct, then it seems like a very clean way of
adding support for `headers`. Anyway, the partition method within
KafkaProducer already takes a ProducerRecord as an argument, so that makes
it easier. Keeping that in mind, should this new method (which takes a
ProducerRecord as an input) have a default implementation that invokes the
existing method? One challenge I see there is that the existing partition
method expects serialized keys and values, while ProducerRecord doesn't have
access to those (it operates directly on K and V).
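To make that trade-off concrete, here's a hypothetical sketch (not something
the KIP proposes) of what a ProducerRecord-based overload with a default
implementation could look like. Note how the serialized key and value still
have to be supplied separately, since ProducerRecord only carries the
unserialized K and V:

```
import org.apache.kafka.clients.producer.Partitioner;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.Cluster;

// Hypothetical interface, purely to illustrate the point discussed above.
public interface RecordAwarePartitioner extends Partitioner {

    // The serialized forms are passed alongside the record because the
    // existing partition() method needs them and ProducerRecord doesn't
    // expose them.
    default int partition(ProducerRecord<?, ?> record,
                          byte[] serializedKey,
                          byte[] serializedValue,
                          Cluster cluster) {
        return partition(record.topic(), record.key(), serializedKey,
                         record.value(), serializedValue, cluster);
    }
}
```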

Thanks!
Sagar.


On Sun, Aug 27, 2023 at 8:51 AM Ismael Juma  wrote:

> A more concrete question: did we consider having the method `partition`
> take `ProduceRecord` as one of its parameters and `Cluster` as the other?
>
> Ismael
>
> On Sat, Aug 26, 2023 at 12:50 PM Greg Harris  >
> wrote:
>
> > Hey Ismael,
> >
> > > The mention of "runtime" is specific to Connect. When it comes to
> > clients,
> > one typically compiles and runs with the same version or runs with a
> newer
> > version than the one used for compilation. This is standard practice in
> > Java and not something specific to Kafka.
> >
> > When I said "older runtimes" I was being lazy, and should have said
> > "older versions of clients at runtime," thank you for figuring out
> > what I meant.
> >
> > I don't know how common it is to compile a partitioner against one
> > version of clients, and then distribute and run the partitioner with
> > older versions of clients and expect graceful degradation of features.
> > If you say that it is very uncommon and not something that we should
> > optimize for, then I won't suggest otherwise.
> >
> > > With regards to the Admin APIs, they have been extended several times
> > since introduction (naturally). One of them is:
> > >
> >
> https://github.com/apache/kafka/commit/1d22b0d70686aef5689b775ea2ea7610a37f3e8c
> >
> > Thanks for the example. I also see that includes a migration from
> > regular arguments to the DTO style, consistent with your
> > recommendation here.
> >
> > I think the DTO style and the proposed additional argument style are
> > both reasonable.
> >
> > Thanks,
> > Greg
> >
> > On Sat, Aug 26, 2023 at 9:46 AM Ismael Juma  wrote:
> > >
> > > Hi Greg,
> > >
> > > The mention of "runtime" is specific to Connect. When it comes to
> > clients,
> > > one typically compiles and runs with the same version or runs with a
> > newer
> > > version than the one used for compilation. This is standard practice in
> > > Java and not something specific to Kafka.
> > >
> > > With regards to the Admin APIs, they have been extended several times
> > since
> > > introduction (naturally). One of them is:
> > >
> > >
> >
> https://github.com/apache/kafka/commit/1d22b0d70686aef5689b775ea2ea7610a37f3e8c
> > >
> > > Ismael
> > >
> > > On Sat, Aug 26, 2023 at 8:29 AM Greg Harris
>  > >
> > > wrote:
> > >
> > > > Hey Ismael,
> > > >
> > > > Thank you for clarifying where the DTO pattern is used already, I did
> > > > not have the admin methods in mind.
> > > >
> > > > > With the DTO approach, you don't create a new DTO, you simply add a
> > new
> > > > overloaded constructor and accessor to the DTO.
> > > >
> > > > With this variant, partitioner implementations would receive a
> > > > `NoSuchMethodExce

Re: [DISCUSS] KIP-970: Deprecate and remove Connect's redundant task configurations endpoint

2023-08-23 Thread Sagar
Thanks Yash. LGTM

Thanks!
Sagar.

On Tue, Aug 22, 2023 at 6:04 PM Chris Egerton 
wrote:

> Hi Yash,
>
> Thanks for driving this, and for putting out a well-written KIP. LGTM!
>
> Cheers,
>
> Chris
>
> On Tue, Aug 22, 2023 at 6:13 AM Yash Mayya  wrote:
>
> > Hi all,
> >
> > I'd like to start a discussion thread for this KIP -
> >
> >
> https://cwiki.apache.org/confluence/display/KAFKA/KIP-970%3A+Deprecate+and+remove+Connect%27s+redundant+task+configurations+endpoint
> > .
> >
> > It proposes the deprecation and eventual removal of Kafka Connect's
> > redundant task configurations endpoint.
> >
> > Thanks,
> > Yash
> >
>


Re: Re: [DISCUSS] KIP-943: Add independent "offset.storage.segment.bytes" for connect-distributed.properties

2023-08-18 Thread Sagar
Hey Hudeqi,

I took some time to read through the PR link as well where you and Chris
had an informative discussion.

I think both over there and in this discussion thread, the consensus seems
to be to reduce the scope of the KIP to lowering the default value of the
segment.bytes config for the offsets topic. This will spare future workers
from long boot-up times. IMO, while this might not seem like a high-impact
thing, the configs we are talking about here are advanced ones which new
Connect users might not immediately look into. So, if they end up in a
situation where there's a 23-minute worker startup time, it might not be a
good overall experience for them.

Regarding the point Greg mentioned, we will have to think about getting
around it. The approach you suggested seems unclean to me. Since you have
been testing with this config in your cluster and you already have a large
offsets topic, have you noticed, in your experience, any discrepancies in
the in-memory state across the workers in your cluster? Would it be possible
for you to test that? That might be a good starting point for understanding
how we want to fix this. Ideally we should have some kind of point of view
(or even a potential fix) on this before we go about implementing this
change. WDYT?

Thanks!
Sagar.

On Mon, Aug 14, 2023 at 6:09 PM hudeqi <16120...@bjtu.edu.cn> wrote:

> bump this discuss thread.
>
> best,
> hudeqi
>
> hudeqi 16120...@bjtu.edu.cn写道:
> > Sorry for not getting email reminders and ignoring your reply for
> getting back so late, Yash Mayya, Greg Harris, Sagar.
> >
> > Thank you for your thoughts and suggestions, I learned a lot, I will
> give my thoughts and answers in a comprehensive way:
> > 1. The default configuration of 50MB is the online configuration I
> actually used to solve this problem, and the effect is better (see the
> description of jira:
> https://issues.apache.org/jira/projects/KAFKA/issues/KAFKA-15086?filter=allopenissues.
> In fact, I think it may be better to set this value smaller, so I abandoned
> the default value like __consumer_offsets, but I don't know how much the
> default value is the best.). Secondly, I also set the default value of 50MB
> online through ConfigDef#defineInternal, and if the value configured by the
> user is greater than the default value, the warning log will be displayed,
> but the only difference from your said is that I will overwrite the value
> configured by the user with the default value (emmm, this point was denied
> by Chris Egerton: https://github.com/apache/kafka/pull/13852, in fact,
> you all agree that should not directly override the user-configured value,
> and now I agree with this).
> > 2. I think the potential bug that Greg mentioned may lead to
> inconsistent state between workers is a great point. It is true that we
> cannot directly change the configuration for an existing internal topics.
> Perhaps a more tricky and disgusting approach is that we manually find that
> the active segment sizes of all current partitions are relatively small,
> first stop all connect instances, then change the topic configuration, and
> finally start the instances.
> >
> > To sum up, I think whether the scope of the KIP could be reduced to:
> only set the default value of the 'segment.bytes' of the internal topics
> and make a warning for the bigger value configured by the user. What do you
> think? If there's a better way I'm all ears.
> >
> > best,
> > hudeqi
>


Re: [VOTE] KIP-910: Update Source offsets for Source Connectors without producing records

2023-08-17 Thread Sagar
Hi All,

Bumping the voting thread again.

Thanks!
Sagar.

On Wed, Aug 2, 2023 at 4:43 PM Sagar  wrote:

> Attaching the KIP link for reference:
> https://cwiki.apache.org/confluence/display/KAFKA/KIP-910%3A+Update+Source+offsets+for+Source+Connectors+without+producing+records
>
> Thanks!
> Sagar.
>
> On Wed, Aug 2, 2023 at 4:37 PM Sagar  wrote:
>
>> Hi All,
>>
>> Calling a Vote on KIP-910 [1]. I feel we have converged to a reasonable
>> design. Ofcourse I am open to any feedback/suggestions and would address
>> them.
>>
>> Thanks!
>> Sagar.
>>
>


Re: [VOTE] KIP-953: partition method to be overloaded to accept headers as well.

2023-08-11 Thread Sagar
Hey Jack,

+1 (non binding)

Sagar.

On Sat, 12 Aug 2023 at 8:04 AM, Jack Tomy  wrote:

> Hey everyone,
>
> Please consider this as a gentle reminder.
>
> On Mon, Aug 7, 2023 at 5:55 PM Jack Tomy  wrote:
>
> > Hey everyone.
> >
> > I would like to call for a vote on KIP-953: partition method to be
> > overloaded to accept headers as well.
> >
> > KIP :
> >
> https://cwiki.apache.org/confluence/pages/viewpage.action?pageId=263424937
> > Discussion thread :
> > https://lists.apache.org/thread/0f20kvfqkmhdqrwcb8vqgqn80szcrcdd
> >
> > Thanks
> > --
> > Best Regards
> > *Jack*
> >
>
>
> --
> Best Regards
> *Jack*
>


[jira] [Created] (KAFKA-15296) Allow committing offsets for Dropped records via SMTs

2023-08-02 Thread Sagar Rao (Jira)
Sagar Rao created KAFKA-15296:
-

 Summary: Allow committing offsets for Dropped records via SMTs
 Key: KAFKA-15296
 URL: https://issues.apache.org/jira/browse/KAFKA-15296
 Project: Kafka
  Issue Type: Bug
  Components: KafkaConnect
Reporter: Sagar Rao
Assignee: Sagar Rao


Currently the Connect runtime doesn't commit the offsets of records which have 
been dropped by an SMT. This can lead to issues if the dropped record's 
partition reflects a source partition and the connector depends upon the 
committed offsets to make progress; in such cases, the connector might just 
stall. We should enable committing offsets for dropped records as well. Note 
that today, if a record is dropped because exactly-once support is enabled and 
the connector chose to abort the batch containing the record, its offset is 
still committed. So there already exists a discrepancy in the way the runtime 
treats these dropped records.
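For context, here is a minimal sketch of an SMT that drops records (any SMT 
returning null from apply() drops the record); with today's runtime, the source 
offsets of records dropped this way are never committed:

{code:java}
import java.util.Map;
import org.apache.kafka.common.config.ConfigDef;
import org.apache.kafka.connect.connector.ConnectRecord;
import org.apache.kafka.connect.transforms.Transformation;

// Drops every record it sees. Offsets for these dropped records are currently
// never committed by the runtime, which is the gap described above.
public class DropAll<R extends ConnectRecord<R>> implements Transformation<R> {
    @Override public R apply(R record) { return null; } // returning null drops the record
    @Override public ConfigDef config() { return new ConfigDef(); }
    @Override public void configure(Map<String, ?> configs) { }
    @Override public void close() { }
}
{code}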



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


Re: [VOTE] KIP-910: Update Source offsets for Source Connectors without producing records

2023-08-02 Thread Sagar
Attaching the KIP link for reference:
https://cwiki.apache.org/confluence/display/KAFKA/KIP-910%3A+Update+Source+offsets+for+Source+Connectors+without+producing+records

Thanks!
Sagar.

On Wed, Aug 2, 2023 at 4:37 PM Sagar  wrote:

> Hi All,
>
> Calling a Vote on KIP-910 [1]. I feel we have converged to a reasonable
> design. Ofcourse I am open to any feedback/suggestions and would address
> them.
>
> Thanks!
> Sagar.
>


[VOTE] KIP-910: Update Source offsets for Source Connectors without producing records

2023-08-02 Thread Sagar
Hi All,

Calling a Vote on KIP-910 [1]. I feel we have converged to a reasonable
design. Of course I am open to any feedback/suggestions and would address
them.

Thanks!
Sagar.


Re: [DISCUSS] KIP-953: partition method to be overloaded to accept headers as well.

2023-07-29 Thread Sagar
Hi Andrew,

Thanks for your comments.

1) Yes, that makes sense and that's what I would expect to see as well. I
just wanted to highlight that we might still need a way to keep client-side
partitioning logic available. Anyway, I am good on this point.
2) The example provided does seem achievable by simply attaching the
partition number to the ProducerRecord. I guess if we can't find further
examples which strengthen the case for this partitioner, it might be harder
to justify adding it.


Thanks!
Sagar.

On Fri, Jul 28, 2023 at 2:05 PM Andrew Schofield <
andrew_schofield_j...@outlook.com> wrote:

> Hi Sagar,
> Thanks for your comments.
>
> 1) Server-side partitioning doesn’t necessarily mean that there’s only one
> way to do it. It just means that the partitioning logic runs on the broker
> and
> any configuration of partitioning applies to the broker’s partitioner. If
> we ever
> see a KIP for this, that’s the kind of thing I would expect to see.
>
> 2) In the priority example in the KIP, there is a kind of contract between
> the
> producers and consumers so that some records can be processed before
> others regardless of the order in which they were sent. The producer
> wants to apply special significance to a particular header to control which
> partition is used. I would simply achieve this by setting the partition
> number
> in the ProducerRecord at the time of sending.
>
> I don’t think the KIP proposes adjusting the built-in partitioner or
> adding to AK
> a new one that uses headers in the partitioning decision. So, any
> configuration
> for a partitioner that does support headers would be up to the
> implementation
> of that specific partitioner. Partitioner implements Configurable.
>
> I’m just providing an alternative view and I’m not particularly opposed to
> the KIP.
> I just don’t think it quite merits the work involved to get it voted and
> merged.
> As an aside, a long time ago, I created a small KIP that was never adopted
> and I didn’t push it because I eventually didn’t need it.
>
> Thanks,
> Andrew
>
> > On 28 Jul 2023, at 05:15, Sagar  wrote:
> >
> > Hey Andrew,
> >
> > Thanks for the review. Since I had reviewed the KIP I thought I would
> also
> > respond. Of course Jack has the final say on this since he wrote the KIP.
> >
> > 1) This is an interesting point and I hadn't considered it. The
> > comparison with KIP-848 is a valid one but even within that KIP, it
> allows
> > client side partitioning for power users like Streams. So while we would
> > want to move away from client side partitioner as much as possible, we
> > still shouldn't do away completely with Client side partitioning and end
> up
> > being in a state of inflexibility for different kinds of usecases. This
> is
> > my opinion though and you have more context on Clients, so would like to
> > know your thoughts on this.
> >
> > 2) Regarding this, I assumed that since the headers are already part of
> the
> > consumer records they should have access to the headers and if there is a
> > contract b/w the applications producing and the application consuming,
> that
> > decisioning should be transparent. Was my assumption incorrect? But as
> you
> > rightly pointed out header based partitioning with keys is going to lead
> to
> > surprising results. Assuming there is merit in this proposal, do you
> think
> > we should ignore the keys in this case (similar to the effect of
> > setting *partitioner.ignore.keys
> > *config to false) and document it appropriately?
> >
> > Let me know what you think.
> >
> > Thanks!
> > Sagar.
> >
> >
> > On Thu, Jul 27, 2023 at 9:41 PM Andrew Schofield <
> > andrew_schofield_j...@outlook.com> wrote:
> >
> >> Hi Jack,
> >> Thanks for the KIP. I have a few concerns about the idea.
> >>
> >> 1) I think that while a client-side partitioner seems like a neat idea
> and
> >> it’s an established part of Kafka,
> >> it’s one of the things which makes Kafka clients quite complicated. Just
> >> as KIP-848 is moving from
> >> client-side assignors to server-side assignors, I wonder whether really
> we
> >> should be looking to make
> >> partitioning a server-side capability too over time. So, I’m not
> convinced
> >> that making the Partitioner
> >> interface richer is moving in the right direction.
> >>
> >> 2) For records with a key, the partitioner usually calculates the
> >> partition from the key. This means
> >> that records with the same key end up on the same partition. Man

Re: [DISCUSS] KIP-910: Update Source offsets for Source Connectors without producing records

2023-07-28 Thread Sagar
Hey Yash,

Thanks for your comments.

1) Hmm, the question is how do you qualify a partition as stale or old?
Let's say a connector has implemented updateOffsets; then, for a partition
for which no records are received, it will still update the offsets, so
technically that offset can't be termed stale anymore. Even though I can't
think of a side effect at this point that would justify disallowing offset
deletion via this method, my opinion is to use a proper mechanism, like the
ones introduced in KIP-875, to delete offsets. Moreover, if I also consider
the option presented in point #2, for simplicity's sake it seems better not
to add this feature at this point. If we feel it's really needed and users
are requesting it, we can add support for it later on.

2) I get the point now. I can't think of cases where updating offsets would
be needed. As with point #1, we can always add it back if needed later on.
For now, I have removed that part from the KIP.

3) Yes; because the offset commit happens on a different thread, ordering
guarantees might be harder to ensure if we do it from that thread. The
proposed mechanism, even though it gets invoked multiple times, keeps things
simpler to reason about.

Let me know how things look now. If it's all looking ok, I would go ahead
and create a Vote thread for the same.

Thanks!
Sagar.

On Tue, Jul 25, 2023 at 5:15 PM Yash Mayya  wrote:

> Hi Sagar,
>
> Thanks for the updates. I had a few more follow up questions:
>
> > I have added that a better way of doing that would be
> > via KIP-875. Also, I didn't want to include any mechamisms
> > for users to meddle with the offsets topic. Allowing tombstone
> > records via this method would be akin to publishing tombstone
> > records directly to the offsets topic which is not recommended
> > generally.
>
> KIP-875 would allow a way for cluster administrators and / or users to do
> so manually externally whereas allowing tombstones in
> SourceTask::updateOffsets would enable connectors to clean up offsets for
> old / stale partitions without user intervention right? I'm not sure I
> follow what you mean by "I didn't want to include any mechamisms for users
> to meddle with the offsets topic" here? Furthermore, I'm not sure why
> publishing tombstone records directly to the offsets topic would not be
> recommended? Isn't that currently the only way to manually clean up offsets
> for a source connector?
>
> > It could be useful in a scenario where the offset of a partition
> > doesn't update for some period of time. In such cases, the
> > connector can do some kind of state tracking and update the
> > offsets after the time period elapses.
>
> I'm not sure I follow? In this case, won't the offsets argument passed
> to SourceTask::updateOffsets *not *contain the source partition which
> hasn't had an update for a long period of time? Wouldn't it make more sense
> to reduce the surface of the API as Chris suggested and only allow adding
> new partition offset pairs to the about to be committed offsets (since
> there don't seem to be any use cases outlined for allowing connectors to
> update offsets for source partitions that are already about to have an
> offset be committed for)?
>
> > All the records returned by the previous poll invocation
> >  got processed successfully
>
> Thanks for this clarification in the KIP, it looks like it does address the
> offsets ordering issue. As to Chris' point about invoking
> SourceTask::updateOffsets less frequently by calling it before offsets are
> committed rather than in every poll loop iteration - I guess that would
> make it a lot more tricky to address the ordering issue?
>
>
> Thanks,
> Yash
>
> On Thu, Jul 20, 2023 at 9:50 PM Sagar  wrote:
>
> > Hey All,
> >
> > Please let me know how the KIP looks now. Is it at a stage where I can
> > start with the Voting phase? Of course I am still open to
> > feedback/suggestions but planning to start the Vote for it.
> >
> > Thanks!
> > Sagar.
> >
> > On Tue, Jul 11, 2023 at 10:00 PM Sagar 
> wrote:
> >
> > > Hi Yash/Chris,
> > >
> > > Thanks for the feedback! I have updated the KIP with the suggestions
> > > provided. I would also update the PR with the suggestions.
> > >
> > > Also, I was hoping that this could make it to the 3.6 release given
> that
> > > it would benefit source connectors which have some of the problems
> listed
> > > in the Motivation Section.
> > >
> > > Responses Inline:
> > >
> > > Yash:
> > >
> > > 1) In the proposed changes section where you talk about modifying the
> > >> offsets, could you please clarify that tasks

Re: [DISCUSS] KIP-953: partition method to be overloaded to accept headers as well.

2023-07-27 Thread Sagar
Hey Andrew,

Thanks for the review. Since I had reviewed the KIP I thought I would also
respond. Of course Jack has the final say on this since he wrote the KIP.

1) This is an interesting point and I hadn't considered it. The comparison
with KIP-848 is a valid one, but even that KIP allows client-side
partitioning for power users like Streams. So while we would want to move
away from the client-side partitioner as much as possible, we still
shouldn't do away with client-side partitioning completely and end up in a
state of inflexibility for different kinds of use cases. This is my opinion
though, and you have more context on clients, so I would like to know your
thoughts on this.

2) Regarding this, I assumed that since the headers are already part of the
consumer records, consuming applications should have access to them, and if
there is a contract between the producing applications and the consuming
application, that decision-making should be transparent. Was my assumption
incorrect? But as you rightly pointed out, header-based partitioning with
keys is going to lead to surprising results. Assuming there is merit in this
proposal, do you think we should ignore the keys in this case (similar to
the effect of setting the *partitioner.ignore.keys* config to false) and
document it appropriately?

Let me know what you think.

Thanks!
Sagar.


On Thu, Jul 27, 2023 at 9:41 PM Andrew Schofield <
andrew_schofield_j...@outlook.com> wrote:

> Hi Jack,
> Thanks for the KIP. I have a few concerns about the idea.
>
> 1) I think that while a client-side partitioner seems like a neat idea and
> it’s an established part of Kafka,
> it’s one of the things which makes Kafka clients quite complicated. Just
> as KIP-848 is moving from
> client-side assignors to server-side assignors, I wonder whether really we
> should be looking to make
> partitioning a server-side capability too over time. So, I’m not convinced
> that making the Partitioner
> interface richer is moving in the right direction.
>
> 2) For records with a key, the partitioner usually calculates the
> partition from the key. This means
> that records with the same key end up on the same partition. Many
> applications expect this to give ordering.
> Log compaction expects this. There are situations in which records have to
> be repartitioned, such as
> sometimes happens with Kafka Streams. I think that a header-based
> partitioner for records which have
> keys is going to be surprising and only going to have limited
> applicability as a result.
>
> The tricky part about clever partitioning is that downstream systems have
> no idea how the partition
> number was arrived at, so they do not truly understand how the ordering
> was derived. I do think that
> perhaps there’s value to being able to influence the partitioning using
> the headers, but I wonder if actually
> transforming the headers into an “ordering context” that then flows with
> the record as it moves through
> the system would be a stronger solution. Today, the key is the ordering
> context. Maybe it should be a
> concept in its own right and the Producer could configure a converter from
> headers to ordering context.
> That is of course a much bigger change.
>
> In one of the examples you mention in the KIP, you mentioned using a
> header to control priority. I guess the
> idea is to preferentially process records off specific partitions so they
> can overtake lower priority records.
> I suggest just sending the records explicitly to specific partitions to
> achieve this.
>
> Sorry for the essay, but you did ask for people to share their thoughts :)
>
> Just my opinion. Let’s see what others think.
>
> Thanks,
> Andrew
>
> > On 25 Jul 2023, at 14:58, Jack Tomy  wrote:
> >
> > Hey @Sagar
> >
> > Thanks again for the review.
> > 1. "a null headers value is equivalent to invoking the older partition
> > method", this is not true. If someone makes an implementation and the
> > headers come as null, still the new implementation will take effect.
> > Instead I have added : "Not overriding this method in the Partitioner
> > interface has the same behaviour as using the existing method."
> > 2. Corrected.
> >
> > Hey @Sagar and everyone,
> > Please have a look at the new version and share your thoughts.
> >
> https://cwiki.apache.org/confluence/pages/viewpage.action?pageId=263424937
> > Like Sagar mentioned, I would also request more people who have more
> > context on clients to chime in.
> >
> >
> > On Tue, Jul 25, 2023 at 2:58 PM Sagar  wrote:
> >
> >> Hi Jack,
> >>
> >> Thanks I have a couple of final comments and then I am good.
> >>
> >> 1) Can you elaborate on the Jav

Re: [DISCUSS] KIP-953: partition method to be overloaded to accept headers as well.

2023-07-25 Thread Sagar
Hi Jack,

Thanks I have a couple of final comments and then I am good.

1) Can you elaborate, in the Javadoc of the new partition method's headers
argument, that a null headers value is equivalent to invoking the older
partition method? It is apparent, but it is generally good to call out (a
rough sketch of what that could look like follows after this list).
2) In the Compatibility section, you have mentioned backward comparable. I
believe it should be *backward compatible change*.
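For reference, here is a rough sketch of what that Javadoc note could look
like on an overloaded method. The exact signature is whatever KIP-953
defines, so the interface and method below are illustrative only:

```
import org.apache.kafka.common.Cluster;
import org.apache.kafka.common.header.Headers;

// Illustrative stand-in for the Partitioner interface, showing only the
// shape of the Javadoc being asked for; not the signature proposed by the KIP.
public interface HeaderAwarePartitioner {

    // Existing-style method, shown here only so the default below has
    // something to delegate to.
    int partition(String topic, Object key, byte[] keyBytes,
                  Object value, byte[] valueBytes, Cluster cluster);

    /**
     * Compute the partition for the given record, optionally using its headers.
     *
     * @param headers the record headers; a {@code null} value behaves exactly
     *                like invoking the existing headerless method
     */
    default int partition(String topic, Object key, byte[] keyBytes,
                          Object value, byte[] valueBytes, Cluster cluster,
                          Headers headers) {
        return partition(topic, key, keyBytes, value, valueBytes, cluster);
    }
}
```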

I don't have other comments. Post this, probably someone else who has more
context on Clients can also chime in on this before we can move this to
Voting.

Thanks!
Sagar.


On Sat, Jul 22, 2023 at 10:09 AM Jack Tomy  wrote:

> Hey @Sagar,
>
> Thank you again for the response and feedback.
>
>1. Though the ask wasn't very clear to me I have attached the Javadoc as
>per your suggestion. Please have a look and let me know if this meets
> the
>expectations.
>2. Done.
>3. Done
>4. Done
>
> Hey @Sagar and everyone,
> Please have a look at the new version and share your thoughts.
> https://cwiki.apache.org/confluence/pages/viewpage.action?pageId=263424937
>
> On Thu, Jul 20, 2023 at 9:46 PM Sagar  wrote:
>
> > Thanks Jack for the updates.
> >
> > Some more feedback:
> >
> > 1) It would be better if you can add the Javadoc in the Public interfaces
> > section. That is a general practice used which gives the readers of the
> KIP
> > a high level idea of the Public Interfaces.
> >
> > 2) In the proposed section, the bit about marking headers as read only
> > seems like an implementation detail This can generally be avoided in
> KIPs.
> >
> > 3) Also, in the Deprecation section, can you mention again that this is a
> > backward compatible change and the reason for it (already done in the
> > Proposed Changes section).
> >
> > 4) In the Testing Plan section, there is still the KIP template bit
> copied
> > over. That can be removed.
> >
> > Thanks!
> > Sagar.
> >
> >
> > On Thu, Jul 20, 2023 at 2:48 PM Jack Tomy  wrote:
> >
> > > Hey Everyone,
> > >
> > > Please consider this as a reminder and share your feedback. Thank you.
> > >
> >
> https://cwiki.apache.org/confluence/pages/viewpage.action?pageId=263424937
> > >
> > > On Tue, Jul 18, 2023 at 5:43 PM Jack Tomy 
> wrote:
> > >
> > > > Hey @Sagar,
> > > >
> > > > Thank you for the response and feedback.
> > > >
> > > >1. Done
> > > >2. Yeah, that was a mistake from my end. Corrected.
> > > >3. Can you please elaborate this, I have added the java doc along
> > with
> > > >the code changes. Should I paste the same in KIP too?
> > > >4. Moved.
> > > >5. I have added one more use case, it is actually helpful in any
> > > >situation where you want to pass some information to partition
> > method
> > > but
> > > >don't have to have it in the key or value.
> > > >6. Added.
> > > >
> > > >
> > > > Hey @Sagar and everyone,
> > > > Please have a look at the new version and share your thoughts.
> > > >
> > >
> >
> https://cwiki.apache.org/confluence/pages/viewpage.action?pageId=263424937
> > > >
> > > >
> > > > On Tue, Jul 18, 2023 at 9:53 AM Sagar 
> > wrote:
> > > >
> > > >> Hi Jack,
> > > >>
> > > >> Thanks for the KIP! Seems like an interesting idea. I have some
> > > feedback:
> > > >>
> > > >> 1) It would be great if you could clean up the text that seems to
> > mimic
> > > >> the
> > > >> KIP template. It is generally not required in the KIP.
> > > >>
> > > >> 2) In the Public Interfaces where you mentioned *Partitioner method
> in
> > > >> **org/apache/kafka/clients/producer
> > > >> will have the following update*, I believe you meant the Partitioner
> > > >> *interface*?
> > > >>
> > > >> 3) Staying on Public Interface, it is generally preferable to add a
> > > >> Javadocs section along with the newly added method. You could also
> > > >> describe
> > > >> the behaviour of it invoking the default existing method.
> > > >>
> > > >> 4) The option that is mentioned in the Rejected Alternatives, seems
> > more
> > > >> like a workaround to the current problem that you are describing.
> That
> > > &g

[jira] [Resolved] (KAFKA-15005) Status of KafkaConnect task not correct

2023-07-24 Thread Sagar Rao (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-15005?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Sagar Rao resolved KAFKA-15005.
---
Resolution: Duplicate

> Status of KafkaConnect task not correct
> ---
>
> Key: KAFKA-15005
> URL: https://issues.apache.org/jira/browse/KAFKA-15005
> Project: Kafka
>  Issue Type: Bug
>  Components: KafkaConnect
>Affects Versions: 2.5.1, 3.0.0, 3.3.2
>Reporter: Yu Wang
>Priority: Major
>
> Our MM2 is running version 2.5.1.
> After a rebalance of our MM2 source tasks, we found there were several tasks 
> always in *UNASSIGNED* status, even though the real tasks had already started. 
> So we dump the payload of the status topic of Kafka Connect, and found the 
> last two status change is status *RUNNING* followed by status 
> {*}UNASSIGNED{*}.
> {code:java}
> LogAppendTime: 1684167885961 keysize: 64 valuesize: 95 sequence: -1 
> headerKeys: [] key: task-7 payload: 
> {"state":"RUNNING","trace":null,"worker_id":"worker-1","generation":437643}
> LogAppendTime: 1684167903456 keysize: 64 valuesize: 98 sequence: -1 
> headerKeys: [] key: task-7 payload: 
> {"state":"UNASSIGNED","trace":null,"worker_id":"worker-2","generation":437643}
>  {code}
> But usually, the RUNNING status should be appended after the UNASSIGNED, 
> because the worker coordinator will revoke the tasks before starting new tasks.
> Then we checked the log of our MM2 worker. And found that, during that time, 
> there was a task that revoked on worker-2 and started on worker-1.
>  
> Worker-1
> {code:java}
> [2023-05-15 09:24:45,950] INFO [Worker clientId=connect-1, 
> groupId=__group] Starting task task-7 
> (org.apache.kafka.connect.runtime.distributed.DistributedHerder)
> [2023-05-15 09:24:45,951] INFO Creating task task-7 
> (org.apache.kafka.connect.runtime.Worker) {code}
> Worker-2
> {code:java}
> [2023-05-15 09:24:40,922] INFO Stopping task task-7 
> (org.apache.kafka.connect.runtime.Worker) {code}
>  
> So I think the incorrect status was caused by the revoked task finishing later 
> than the newly started task, which made the UNASSIGNED status get appended to 
> the status topic after the RUNNING status. 
>  
> After reading the code of DistributedHerder, I found that the task revocation 
> runs in a thread pool; the revoke operation just returns after submitting all 
> the callables. So I think that even on the same worker, there is no guarantee 
> that the revoke operation will always finish before the new tasks start.
> {code:java}
> for (final ConnectorTaskId taskId : tasks) {
> callables.add(getTaskStoppingCallable(taskId));
> }
> // The actual timeout for graceful task/connector stop is applied in worker's
> // stopAndAwaitTask/stopAndAwaitConnector methods.
> startAndStop(callables);
> log.info("Finished stopping tasks in preparation for rebalance"); {code}
>  



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Resolved] (KAFKA-12283) Flaky Test RebalanceSourceConnectorsIntegrationTest#testMultipleWorkersRejoining

2023-07-23 Thread Sagar Rao (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-12283?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Sagar Rao resolved KAFKA-12283.
---
Resolution: Fixed

> Flaky Test 
> RebalanceSourceConnectorsIntegrationTest#testMultipleWorkersRejoining
> 
>
> Key: KAFKA-12283
> URL: https://issues.apache.org/jira/browse/KAFKA-12283
> Project: Kafka
>  Issue Type: Test
>  Components: KafkaConnect, unit tests
>Reporter: Matthias J. Sax
>Assignee: Luke Chen
>Priority: Critical
>  Labels: flaky-test
>
> https://github.com/apache/kafka/pull/1/checks?check_run_id=1820092809
> {quote} {{java.lang.AssertionError: Tasks are imbalanced: 
> localhost:36037=[seq-source13-0, seq-source13-1, seq-source13-2, 
> seq-source13-3, seq-source12-0, seq-source12-1, seq-source12-2, 
> seq-source12-3]
> localhost:43563=[seq-source11-0, seq-source11-2, seq-source10-0, 
> seq-source10-2]
> localhost:46539=[seq-source11-1, seq-source11-3, seq-source10-1, 
> seq-source10-3]
>   at org.junit.Assert.fail(Assert.java:89)
>   at org.junit.Assert.assertTrue(Assert.java:42)
>   at 
> org.apache.kafka.connect.integration.RebalanceSourceConnectorsIntegrationTest.assertConnectorAndTasksAreUniqueAndBalanced(RebalanceSourceConnectorsIntegrationTest.java:362)
>   at 
> org.apache.kafka.test.TestUtils.lambda$waitForCondition$3(TestUtils.java:303)
>   at 
> org.apache.kafka.test.TestUtils.retryOnExceptionWithTimeout(TestUtils.java:351)
>   at 
> org.apache.kafka.test.TestUtils.retryOnExceptionWithTimeout(TestUtils.java:319)
>   at org.apache.kafka.test.TestUtils.waitForCondition(TestUtils.java:300)
>   at org.apache.kafka.test.TestUtils.waitForCondition(TestUtils.java:290)
>   at 
> org.apache.kafka.connect.integration.RebalanceSourceConnectorsIntegrationTest.testMultipleWorkersRejoining(RebalanceSourceConnectorsIntegrationTest.java:313)}}
> {quote}



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Resolved] (KAFKA-8391) Flaky Test RebalanceSourceConnectorsIntegrationTest#testDeleteConnector

2023-07-23 Thread Sagar Rao (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-8391?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Sagar Rao resolved KAFKA-8391.
--
Resolution: Fixed

Fixed with https://github.com/apache/kafka/pull/12561

> Flaky Test RebalanceSourceConnectorsIntegrationTest#testDeleteConnector
> ---
>
> Key: KAFKA-8391
> URL: https://issues.apache.org/jira/browse/KAFKA-8391
> Project: Kafka
>  Issue Type: Bug
>  Components: KafkaConnect
>Affects Versions: 2.3.0
>Reporter: Matthias J. Sax
>Assignee: Sagar Rao
>Priority: Critical
>  Labels: flaky-test
> Attachments: 100-gradle-builds.tar
>
>
> [https://builds.apache.org/job/kafka-pr-jdk11-scala2.12/4747/testReport/junit/org.apache.kafka.connect.integration/RebalanceSourceConnectorsIntegrationTest/testDeleteConnector/]
> {quote}java.lang.AssertionError: Condition not met within timeout 3. 
> Connector tasks did not stop in time. at 
> org.apache.kafka.test.TestUtils.waitForCondition(TestUtils.java:375) at 
> org.apache.kafka.test.TestUtils.waitForCondition(TestUtils.java:352) at 
> org.apache.kafka.connect.integration.RebalanceSourceConnectorsIntegrationTest.testDeleteConnector(RebalanceSourceConnectorsIntegrationTest.java:166){quote}



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Created] (KAFKA-15229) Increase default value of task.shutdown.graceful.timeout.ms

2023-07-21 Thread Sagar Rao (Jira)
Sagar Rao created KAFKA-15229:
-

 Summary: Increase default value of 
task.shutdown.graceful.timeout.ms
 Key: KAFKA-15229
 URL: https://issues.apache.org/jira/browse/KAFKA-15229
 Project: Kafka
  Issue Type: Improvement
  Components: KafkaConnect
Reporter: Sagar Rao
Assignee: Sagar Rao


The Kafka Connect config [task.shutdown.graceful.timeout.ms. 
|https://kafka.apache.org/documentation/#connectconfigs_task.shutdown.graceful.timeout.ms]has
 a default value of 5s. As per it's definition:

 
{noformat}
Amount of time to wait for tasks to shutdown gracefully. This is the total 
amount of time, not per task. All task have shutdown triggered, then they are 
waited on sequentially.{noformat}

it is the total timeout for all tasks to shut down. Also, if multiple tasks are 
to be shut down, they are waited upon sequentially. The default value of this 
config is fine for smaller clusters with a small number of tasks; on a larger 
cluster, because the timeout can elapse, we will see a lot of messages of the 
form 

```
Graceful stop of task  failed.
```

If the graceful stop of a task fails, the task is cancelled, which means it 
won't send out a status update. Once that happens, there won't be any 
`UNASSIGNED` status message posted for that task. Let's say the task stop was 
triggered by a worker going down. If the cluster is configured to use the 
Incremental Cooperative Assignor, the task wouldn't be reassigned until the 
scheduled.rebalance.delay.max.ms interval elapses. So, for that duration, the 
task would show up with status RUNNING whenever its status is queried. This 
can be confusing for users.

This problem can be exacerbated in cloud environments (like Kubernetes pods) 
because there is a high chance that the RUNNING status would be associated with 
an older worker_id which doesn't even exist in the cluster anymore. 

While the net effect of all of this is not catastrophic, i.e. it won't lead to 
any processing delays or loss of data, the status of the task would be off. And 
if there are frequent rebalances happening under the Incremental Cooperative 
Assignor, that duration could be high as well. 

So, the proposal is to increase the default value. I am thinking we can set it 
to 60s because, as far as I can see, it doesn't interfere with any other 
timeout that we have. 

I am tagging this as need-kip because I believe we will need one.






--
This message was sent by Atlassian Jira
(v8.20.10#820010)


Re: [DISCUSS] KIP-943: Add independent "offset.storage.segment.bytes" for connect-distributed.properties

2023-07-21 Thread Sagar
Hey Hudeqi,

Thanks for the KIP! After reading the KIP and the comments by Yash and Greg
I agree with these aspects:

1) While I agree that a high value for the segment.bytes config can lead
to longer startup times, we don't necessarily need to expose a separate
config for it (as Yash suggested). We can use the same mechanism exposed
by KIP-605 to set it to a lower value specifically for Connect. IMO this
could also be an internal config, defined via ConfigDef#defineInternal, so
it doesn't need to show up in the docs. I haven't tested it, but if users
do happen to override the config via the KIP-605 mechanism, it should take
effect. So, the scope of the KIP could be reduced to having an explicit
internal config for the offsets topic's segment.bytes with a lower default
value (see the sketch right after this point). WDYT?

2) I don't think we should let the configs of existing topics be updated.
If you notice, both KIP-605 and KIP-154 (the one which 605 cites) don't
allow updating the configs of existing topics. It would be a good idea to
stick with this practice IMO.

3) Regarding the default value of 50 MB, tbh I am not totally aware of how
the default values for these configs were chosen in the past. But as
pointed out by Greg, __consumer_offsets topic could be a good example to
follow and a default value of 100MB could be a good starting point. Or if
needed we can be defensive and start with a slightly higher value like
250MB. Also the point about tombstone records leading to inconsistent
in-memory states across multiple workers is a good one. This happens with
status topic as well IIRC and if possible we should look to fix it. That is
outside the scope of the KIP though.
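
To make the internal-config idea in (1) concrete, here is the rough shape I had
in mind. The config name and the 50 MB default are just placeholders for
discussion, not actual worker code:

```
import org.apache.kafka.common.config.ConfigDef;
import org.apache.kafka.common.config.ConfigDef.Importance;
import org.apache.kafka.common.config.ConfigDef.Type;

// Rough sketch only: an internal (undocumented) worker config for the offsets
// topic's segment.bytes, registered via defineInternal. Name and default value
// are placeholders, not a final proposal.
public class OffsetsTopicSegmentBytesSketch {

    public static final String OFFSET_STORAGE_SEGMENT_BYTES_CONFIG = "offset.storage.segment.bytes";

    static ConfigDef withOffsetsSegmentBytes(ConfigDef base) {
        return base.defineInternal(
                OFFSET_STORAGE_SEGMENT_BYTES_CONFIG,
                Type.INT,
                50 * 1024 * 1024,     // e.g. 50 MB; the exact default is what we are discussing
                Importance.LOW);
    }
}
```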

Thanks!
Sagar.


On Fri, Jul 14, 2023 at 1:49 AM Greg Harris 
wrote:

> Hey hudeqi,
>
> Thanks for the KIP! I did not know about the existing segment.bytes
> default value, and it does seem rather high in the context of the
> Connect internal topics.
> If I think about the segment.size as a "minimum per-partition data
> transfer on startup", 1GB is certainly not appropriate for even the
> single-partition config topic.
>
> 1. I have a concern about changing the topic configuration on startup.
>
> In the existing codebase, the *.storage.* worker configurations appear
> to only have an effect for newly created topics. If the topics were
> manually created before a Connect cluster starts, or were created by a
> previous Connect instance, then the Connect worker configuration could
> have arbitrary contents that have no effect. Updating the topic
> configurations after creation would be a new capability.
> Consider the situation where someone were to notice this log.segment
> problem, where a natural response would be to reconfigure the topic,
> diverging from the two configurations. When the worker can change the
> topic configuration after creation, that has the potential to roll
> back topic configurations that are managed externally.
> Do you think that changing the default for new Connect clusters, and
> emitting a startup warning for excessive segment.bytes is reasonable?
> We have other startup assertions that fail the startup of a worker
> based on partition and compaction requirements, and this would be
> similar in that it alerts the user to reconfigure the internal topics,
> but with a lesser severity.
>
> 2. I'm also interested to know what a reasonable value for this
> configuration would be. I did find the __consumer_offsets topic uses
> 104857600 (100 MiB) as defined in OffsetConfig.scala, so there is
> precedent for having a smaller segment.size for internal topics.
>
> 3. I believe there's a potential bug where compaction can happen
> before a worker reads a tombstone, leading the KafkaBasedLog to
> produce inconsistent in-memory states across multiple workers. Since
> the segment.size is so large, it makes me think that compaction has
> been wholly ineffective so far, and has prevented this bug from
> manifesting. By lowering the segment.size, we're increasing the
> likelihood of this failure, so it may need to finally be addressed.
>
> Thanks,
> Greg
>
>
>
>
> On Thu, Jul 6, 2023 at 5:39 AM Yash Mayya  wrote:
> >
> > Also, just adding to the above point - we don't necessarily need to
> > explicitly add a new worker configuration right? Instead, we could
> > potentially just use the new proposed default value internally which can
> be
> > overridden by users through setting a value for
> > "offset.storage.segment.bytes" (via the existing KIP-605 based
> mechanism).
> >
> > On Thu, Jul 6, 2023 at 6:04 PM Yash Mayya  wrote:
> >
> > > Hi hudeqi,
> > >
> > > Thanks for the KIP! Just to clarify - since KIP-605 (
> > >
> https://cwiki.apache.org/confluence/display/KAFKA/KIP-605%3A+Exp

Re: [DISCUSS] KIP-910: Update Source offsets for Source Connectors without producing records

2023-07-20 Thread Sagar
Hey All,

Please let me know how the KIP looks now. Is it at a stage where I can
start with the Voting phase? Of course I am still open to
feedback/suggestions but planning to start the Vote for it.

Thanks!
Sagar.

On Tue, Jul 11, 2023 at 10:00 PM Sagar  wrote:

> Hi Yash/Chris,
>
> Thanks for the feedback! I have updated the KIP with the suggestions
> provided. I would also update the PR with the suggestions.
>
> Also, I was hoping that this could make it to the 3.6 release given that
> it would benefit source connectors which have some of the problems listed
> in the Motivation Section.
>
> Responses Inline:
>
> Yash:
>
> 1) In the proposed changes section where you talk about modifying the
>> offsets, could you please clarify that tasks shouldn't modify the offsets
>> map that is passed as an argument? Currently, the distinction between the
>> offsets map passed as an argument and the offsets map that is returned is
>> not very clear in numerous places.
>
>
>
> Added
>
> 2) The default return value of Optional.empty() seems to be fairly
>> non-intuitive considering that the return value is supposed to be the
>> offsets that are to be committed. Can we consider simply returning the
>> offsets argument itself by default instead?
>
>
>
> Chris is suggesting returning null for the default case. I am thinking of
> making null the default return value. If the returned map is null, there
> won't be any further processing; otherwise we will continue with the
> existing logic.
>
> 3) The KIP states that "It is also possible that a task might choose to
>> send a tombstone record as an offset. This is not recommended and to
>> prevent connectors shooting themselves in the foot due to this" - could
>> you
>> please clarify why this is not recommended / supported?
>
>
>
> I have added that a better way of doing that would be via KIP-875. Also, I
> didn't want to include any mechanisms for users to meddle with the offsets
> topic. Allowing tombstone records via this method would be akin to
> publishing tombstone records directly to the offsets topic, which is
> generally not recommended.
>
> 4) The KIP states that "If a task returns an Optional of a null object or
>> an Optional of an empty map, even for such cases the behaviour would
>> be disabled." - since this is an optional API that source task
>> implementations don't necessarily need to implement, I don't think I fully
>> follow why the return type of the proposed "updateOffsets" method is an
>> Optional? Can we not simply use the Map as the return type instead?
>
>
>
> Yeah, I updated the return type to be a Map.
>
>
> 5) The KIP states that "The offsets passed to the updateOffsets  method
>> would be the offset from the latest source record amongst all source
>> records per partition. This way, if the source offset for a given source
>> partition is updated, that offset is the one that gets committed for the
>> source partition." - we should clarify that the "latest" offset refers to
>> the offsets that are about to be committed, and not the latest offsets
>> returned from SourceTask::poll so far (see related discussion in
>> https://issues.apache.org/jira/browse/KAFKA-15091 and
>> https://issues.apache.org/jira/browse/KAFKA-5716).
>
>
>
> Done
>
>
> 6) We haven't used the terminology of "Atleast Once Semantics" elsewhere in
>> Connect since the framework itself does not (and cannot) make any
>> guarantees on the delivery semantics. Depending on the source connector
>> and
>> the source system, both at-least once and at-most once semantics (for
>> example - a source system where reads are destructive) are possible. We
>> should avoid introducing this terminology in the KIP and instead refer to
>> this scenario as exactly-once support being disabled.
>
>
>
> Done
>
>
> 7) Similar to the above point, we should remove the use of the term
>> "Exactly Once Semantics" and instead refer to exactly-once support being
>> enabled since the framework can't guarantee exactly-once semantics for all
>> possible source connectors (for example - a message queue source connector
>> where offsets are essentially managed in the source system via an ack
>> mechanism).
>
>
> Done
>
> 8) In a previous attempt to fix this gap in functionality, a significant
>> concern was raised on offsets ordering guarantees when we retry sending a
>> batch of records (ref -
>> https://github.com/apache/kafka/pull/5553/files#r213329307). It doesn't
>> look like this KIP addresses 

Re: [DISCUSS] KIP-953: partition method to be overloaded to accept headers as well.

2023-07-20 Thread Sagar
Thanks Jack for the updates.

Some more feedback:

1) It would be better if you could add the Javadoc in the Public Interfaces
section. That is a general practice which gives readers of the KIP a
high-level idea of the public interfaces.

2) In the Proposed Changes section, the bit about marking headers as read-only
seems like an implementation detail. Such details can generally be left out of
KIPs.

3) Also, in the Deprecation section, can you mention again that this is a
backward-compatible change and the reason for it (already done in the
Proposed Changes section)?

4) In the Testing Plan section, there is still the KIP template bit copied
over. That can be removed.

Thanks!
Sagar.


On Thu, Jul 20, 2023 at 2:48 PM Jack Tomy  wrote:

> Hey Everyone,
>
> Please consider this as a reminder and share your feedback. Thank you.
> https://cwiki.apache.org/confluence/pages/viewpage.action?pageId=263424937
>
> On Tue, Jul 18, 2023 at 5:43 PM Jack Tomy  wrote:
>
> > Hey @Sagar,
> >
> > Thank you for the response and feedback.
> >
> >1. Done
> >2. Yeah, that was a mistake from my end. Corrected.
> >3. Can you please elaborate this, I have added the java doc along with
> >the code changes. Should I paste the same in KIP too?
> >4. Moved.
> >5. I have added one more use case, it is actually helpful in any
> >situation where you want to pass some information to partition method
> but
> >don't have to have it in the key or value.
> >6. Added.
> >
> >
> > Hey @Sagar and everyone,
> > Please have a look at the new version and share your thoughts.
> >
> https://cwiki.apache.org/confluence/pages/viewpage.action?pageId=263424937
> >
> >
> > On Tue, Jul 18, 2023 at 9:53 AM Sagar  wrote:
> >
> >> Hi Jack,
> >>
> >> Thanks for the KIP! Seems like an interesting idea. I have some
> feedback:
> >>
> >> 1) It would be great if you could clean up the text that seems to mimic
> >> the
> >> KIP template. It is generally not required in the KIP.
> >>
> >> 2) In the Public Interfaces where you mentioned *Partitioner method in
> >> **org/apache/kafka/clients/producer
> >> will have the following update*, I believe you meant the Partitioner
> >> *interface*?
> >>
> >> 3) Staying on Public Interface, it is generally preferable to add a
> >> Javadocs section along with the newly added method. You could also
> >> describe
> >> the behaviour of it invoking the default existing method.
> >>
> >> 4) The option that is mentioned in the Rejected Alternatives, seems more
> >> like a workaround to the current problem that you are describing. That
> >> could be added to the Motivation section IMO.
> >>
> >> 5) Can you also add some more examples of scenarios where this would be
> >> helpful? The only scenario mentioned seems to have a workaround. Just
> >> trying to ensure that we have a strong enough motivation before adding a
> >> public API.
> >>
> >> 6) One thing which should also be worth noting down would be what
> happens
> >> if users override both methods, only one method (new or old) and no
> >> methods
> >> (the default behaviour). It would help in understanding the proposal
> >> better.
> >>
> >> Thanks!
> >> Sagar.
> >>
> >>
> >> On Mon, Jul 17, 2023 at 9:19 PM Jack Tomy 
> wrote:
> >>
> >> > Hey everyone,
> >> >
> >> > Not seeing much discussion on the KPI. Might be because it is too
> >> > obvious .
> >> >
> >> > If there are no more comments, I will start the VOTE in the coming
> days.
> >> >
> >> > On Sat, Jul 15, 2023 at 8:48 PM Jack Tomy 
> >> wrote:
> >> >
> >> > > Hey everyone,
> >> > >
> >> > > Please take a look at the KPI below and provide your suggestions and
> >> > > feedback. TIA.
> >> > >
> >> > >
> >> >
> >>
> https://cwiki.apache.org/confluence/pages/viewpage.action?pageId=263424937
> >> > >
> >> > >
> >> > > --
> >> > > Best Regards
> >> > > *Jack*
> >> > >
> >> >
> >> >
> >> > --
> >> > Best Regards
> >> > *Jack*
> >> >
> >>
> >
> >
> > --
> > Best Regards
> > *Jack*
> >
>
>
> --
> Best Regards
> *Jack*
>


[jira] [Resolved] (KAFKA-12525) Inaccurate task status due to status record interleaving in fast rebalances in Connect

2023-07-20 Thread Sagar Rao (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-12525?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Sagar Rao resolved KAFKA-12525.
---
Resolution: Fixed

> Inaccurate task status due to status record interleaving in fast rebalances 
> in Connect
> --
>
> Key: KAFKA-12525
> URL: https://issues.apache.org/jira/browse/KAFKA-12525
> Project: Kafka
>  Issue Type: Bug
>  Components: KafkaConnect
>Affects Versions: 2.3.1, 2.4.1, 2.5.1, 2.7.0, 2.6.1
>Reporter: Konstantine Karantasis
>Assignee: Sagar Rao
>Priority: Major
>
> When a task is stopped in Connect it produces an {{UNASSIGNED}} status 
> record. 
> Equivalently, when a task is started or restarted in Connect it produces an 
> {{RUNNING}} status record in the Connect status topic.
> At the same time rebalances are decoupled from task start and stop. These 
> operations happen in separate executor outside of the main worker thread that 
> performs the rebalance.
> Normally, any delayed and stale {{UNASSIGNED}} status records are fenced by 
> the worker that is sending them. This worker is using the 
> {{StatusBackingStore#putSafe}} method that will reject any stale status 
> messages (called only for {{UNASSIGNED}} or {{FAILED}}) as long as the worker 
> is aware of the newer status record that declares a task as {{RUNNING}}.
> In cases of fast consecutive rebalances where a task is revoked from one 
> worker and assigned to another one, it has been observed that there is a 
> small time window and thus a race condition during which a {{RUNNING}} status 
> record in the new generation is produced and is immediately followed by a 
> delayed {{UNASSIGNED}} status record belonging to the same or a previous 
> generation before the worker that sends this message reads the {{RUNNING}} 
> status record that corresponds to the latest generation.
> A couple of options are available to remediate this race condition. 
> For example, a worker that has started a task can re-write the {{RUNNING}} 
> status message in the topic if it reads a stale {{UNASSIGNED}} message from a 
> previous generation (that should have been fenced). 
> Another option is to ignore stale {{UNASSIGNED}} messages (messages from an 
> earlier generation than the one in which the task had {{RUNNING}} status).
> Worth noting that when this race condition takes place, besides the 
> inaccurate status representation, the actual execution of the tasks remains 
> unaffected (e.g. the tasks are running correctly even though they appear as 
> {{UNASSIGNED}}). 



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


Re: [DISCUSS] KIP-953: partition method to be overloaded to accept headers as well.

2023-07-17 Thread Sagar
Hi Jack,

Thanks for the KIP! Seems like an interesting idea. I have some feedback:

1) It would be great if you could clean up the text that seems to mimic the
KIP template. It is generally not required in the KIP.

2) In the Public Interfaces where you mentioned *Partitioner method in
**org/apache/kafka/clients/producer
will have the following update*, I believe you meant the Partitioner
*interface*?

3) Staying on the Public Interfaces section, it is generally preferable to add
a Javadocs section along with the newly added method. You could also describe
how the new method's default implementation invokes the existing method (see
the sketch after this list).

4) The option that is mentioned in the Rejected Alternatives, seems more
like a workaround to the current problem that you are describing. That
could be added to the Motivation section IMO.

5) Can you also add some more examples of scenarios where this would be
helpful? The only scenario mentioned seems to have a workaround. Just
trying to ensure that we have a strong enough motivation before adding a
public API.

6) One thing also worth noting down is what happens if users override both
methods, only one method (new or old), or no methods (the default behaviour).
It would help in understanding the proposal better.
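
For (3) and (6), the rough shape I had in mind is something like the following.
Method placement and naming here are illustrative only, not the final interface
from the KIP:

```
import java.io.Closeable;
import org.apache.kafka.common.Cluster;
import org.apache.kafka.common.Configurable;
import org.apache.kafka.common.header.Headers;

// Illustrative sketch only; the KIP may define the overload differently.
public interface Partitioner extends Configurable, Closeable {

    // existing method
    int partition(String topic, Object key, byte[] keyBytes,
                  Object value, byte[] valueBytes, Cluster cluster);

    // hypothetical new overload: the default implementation ignores the headers and
    // delegates to the existing method, so current implementations keep working unchanged
    default int partition(String topic, Object key, byte[] keyBytes,
                          Object value, byte[] valueBytes, Cluster cluster, Headers headers) {
        return partition(topic, key, keyBytes, value, valueBytes, cluster);
    }
}
```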

Thanks!
Sagar.


On Mon, Jul 17, 2023 at 9:19 PM Jack Tomy  wrote:

> Hey everyone,
>
> Not seeing much discussion on the KIP. Might be because it is too
> obvious.
>
> If there are no more comments, I will start the VOTE in the coming days.
>
> On Sat, Jul 15, 2023 at 8:48 PM Jack Tomy  wrote:
>
> > Hey everyone,
> >
> > Please take a look at the KIP below and provide your suggestions and
> > feedback. TIA.
> >
> >
> https://cwiki.apache.org/confluence/pages/viewpage.action?pageId=263424937
> >
> >
> > --
> > Best Regards
> > *Jack*
> >
>
>
> --
> Best Regards
> *Jack*
>


Re: [DISCUSS] KIP-910: Update Source offsets for Source Connectors without producing records

2023-07-11 Thread Sagar
> since we automatically re-insert source partitions that have been
> removed by the connector.



Makes sense. I updated the KIP accordingly.

6. I don't think we don't need to return an Optional from
> SourceTask::updateOffsets. Developers can return null instead of
> Optional.empty(), and since the framework will have to handle null return
> values either way, this would reduce the number of cases for us to handle
> from three (Optional.of(...), Optional.empty(), null) to two (null,
> non-null).



I see. I didn't want to have explicit null checks but then I realised
connect does have explicit null
checks. Edited.


7. Why disallow tombstone records? If an upstream resource disappears, then
> wouldn't a task want to emit a tombstone record without having to also emit
> an accompanying source record? This could help prevent an
> infinitely-growing offsets topic, although with KIP-875 coming out in the
> next release, perhaps we can leave this out for now and let Connect users
> and cluster administrators do this work manually instead of letting
> connector developers automate it.



Even before I considered KIP-875's effects, my thought was to not meddle
too much with the inner
workings of the offsets topic. I think even today users can produce an
offset record to the offsets
topic to drop an unwanted partition but that should be used as a last
resort. I didn't want to introduce
any such mechanisms via this proposal. And with KIP-875 coming in, it makes
all the more sense to not do
it and have the offsets deleted in a more standardised way. The last part
about KIP-875 is what I have mentioned
in the KIP.


8. Is the information on multiple offsets topics for exactly-once
> connectors relevant to this KIP? If not, we should remove it.


Removed.


9. It seems like most of the use cases that motivate this KIP only require
> being able to add a new source partition/source offset pair to the
> to-be-committed offsets. Do we need to allow connector developers to modify
> source offsets for already-present source partitions at all? If we reduce
> the surface of the API, then the worst case is still just that the offsets
> we commit are at most one commit out-of-date.


It could be useful in a scenario where the offset of a partition doesn't
update for some period of time. In
such cases, the connector can do some kind of state tracking and update the
offsets after the time period elapses.

I had mentioned an example of this scenario in an earlier e-mail (a rough
sketch of this case follows the quote below):


There's also a case at times with CDC source connectors which are REST Api
> / Web Service based(Zendesk Source Connector for example) . These
> connectors typically use timestamps from the responses as offsets. If
> there's a long period of inactivity wherein the API invocations don't
> return any data, then the offsets won't move and the connector would keep
> using the same timestamp that it received from the last non-empty response.
> If this period of inactivity keeps growing, and the API imposes any limits
> on how far back we can go in terms of window start, then this could
> potentially be a problem. In this case even though the connector was caught
> up with all the responses, it may need to snapshot again. In this case
> updating offsets can easily help since all the connector needs to do is to
> move the timestamp which would move the offset inherently.
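
To make that case concrete, here is a rough sketch of how a task could advance
such a time-based offset via the proposed hook. The method name mirrors the
updateOffsets proposal discussed above, but the exact signature is still under
discussion, and the partition/offset field names ("source", "timestamp") are
made up for illustration:

```
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;

// Rough sketch only: advancing a purely time-based offset when polls keep coming
// back empty. Signature and field names ("source", "timestamp") are illustrative.
public class TimestampOffsetSketch {

    static Map<Map<String, Object>, Map<String, Object>> updateOffsets(
            Map<Map<String, Object>, Map<String, Object>> aboutToBeCommitted,
            long lastPolledTimestampMs) {

        Map<String, Object> partition = Collections.singletonMap("source", "zendesk-tickets");
        Map<String, Object> current = aboutToBeCommitted.get(partition);

        if (current == null || ((Number) current.get("timestamp")).longValue() < lastPolledTimestampMs) {
            Map<Map<String, Object>, Map<String, Object>> updated = new HashMap<>(aboutToBeCommitted);
            updated.put(partition, Collections.singletonMap("timestamp", lastPolledTimestampMs));
            return updated;    // the offset advances even though no record was produced
        }
        return aboutToBeCommitted;   // nothing to change
    }
}
```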




10. (Nit) The "Motivation" section states that "offsets are written
> periodically by the connect framework to an offsets topic". This is only
> true in distributed mode; in standalone mode, we write offsets to a local
> file.



Ack.

On Wed, Jul 5, 2023 at 8:47 PM Chris Egerton 
wrote:

> Hi Sagar,
>
> Thanks for updating the KIP! The latest draft seems simpler and more
> focused, which I think is a win for users and developers alike. Here are my
> thoughts on the current draft:
>
> 1. (Nit) Can we move the "Public Interfaces" section before the "Proposed
> Changes" section? It's nice to have a summary of the user/developer-facing
> changes first since that answers many of the questions that I had while
> reading the "Proposed Changes" section. I'd bet that this is also why we
> use that ordering in the KIP template.
>
> 2. Why are we invoking SourceTask::updateOffsets so frequently when
> exactly-once support is disabled? Wouldn't it be simpler both for our
> implementation and for connector developers if we only invoked it directly
> before committing offsets, instead of potentially several times between
> offset commits, especially since that would also mirror the behavior with
> exactly-once support enabled?
>
> 3. Building off of point 2, we wouldn't need to specify any more detail
> than that "SourceTask::updateOffsets will be invoked directly before
> committing offsets, wit

Re: [DISCUSS] KIP-933 Publish metrics when source connector fails to poll data

2023-07-05 Thread Sagar
Hi Ravindra,

One minor thing: the discussion thread URL that you had provided points to
an incorrect page. Can you please update it to this (
https://www.mail-archive.com/dev@kafka.apache.org/msg131894.html)?

Thanks!
Sagar.

On Sun, Jul 2, 2023 at 12:06 AM Ravindra Nath Kakarla <
ravindhran...@gmail.com> wrote:

> Thanks for reviewing and providing the feedback.
>
> > 1) Does it make sense to drop the *record *part from the metric name as
> it
> doesn't seem to serve much purpose? I would rather call the metric as
> *source-poll-errors-total
>
> Yes, "records" is not needed and misleading.
>
> > Staying on names, I am thinking, does it make more sense to have
> *failures* in the name instead of *errors *i.e.*
> source-poll-failures-total* and
> *source-poll-failures-rate*? What do you think?
>
> Agree, "failures" is a more appropriate term here.
>
> > Regarding the inclusion of retriable exceptions, as of today, source
> tasks don't retry even in cases of RetriableException. A PR was created to
> modify this behaviour (https://github.com/apache/kafka/pull/13726) but the
> reason I bring it up is that in that PR, the failures etc for retry context
> would be computed from the RetryWithToleranceOperator. I am not sure when
> would that get merged, but does it change the failure counting logic in any
> ways?
>
> In my opinion, we should ignore retryable exceptions when SourceTasks
> switches to using RetryWithToleranceOperator. I can update the KIP to call
> this out. If the PR for this KIP is implemented first, we can include both
> retriable and non-retriable exceptions. I can also add a comment on
> https://github.com/apache/kafka/pull/13726 to remove them. What do you
> think?
>
> Thank you
>
>
> On Wed, Jun 28, 2023 at 1:09 PM Sagar  wrote:
>
> > Hey Ravindra,
> >
> > Thanks for the KIP! It appears to be a useful addition to the metrics to
> > understand poll related failures which can go untracked as of now. I just
> > have a couple of minor comments:
> >
> > 1) Does it make sense to drop the *record *part from the metric name as
> it
> > doesn't seem to serve much purpose? I would rather call the metric as
> > *source-poll-errors-total
> > *and *source-poll-errors-rate*.
> > 2) Staying on names, I am thinking, does it make more sense to have
> > *failures* in the name instead of *errors *i.e.*
> > source-poll-failures-total* and
> > *source-poll-failures-rate*? What do you think?
> > 3) Regarding the inclusion of retriable exceptions, as of today, source
> > tasks don't retry even in cases of RetriableException. A PR was created
> to
> > modify this behaviour (https://github.com/apache/kafka/pull/13726) but
> the
> > reason I bring it up is that in that PR, the failures etc for retry
> context
> > would be computed from the RetryWithToleranceOperator. I am not sure when
> > would that get merged, but does it change the failure counting logic in
> any
> > ways?
> >
> > Thanks!
> > Sagar.
> >
> >
> > On Sun, Jun 25, 2023 at 12:40 AM Ravindra Nath Kakarla <
> > ravindhran...@gmail.com> wrote:
> >
> > > Hello,
> > >
> > > I would like to start a discussion on KIP-933 to add new metrics to
> Kafka
> > > Connect that helps  monitoring polling failures with source connectors.
> > >
> > >
> > >
> >
> https://cwiki.apache.org/confluence/display/KAFKA/KIP-933%3A+Publish+metrics+when+source+connector+fails+to+poll+data
> > >
> > > Looking forward to feedback on this.
> > >
> > > Thank you,
> > > Ravindranath
> > >
> >
>


Re: [DISCUSS] KIP-933 Publish metrics when source connector fails to poll data

2023-07-05 Thread Sagar
Hi Ravindra,

When you say

> we should ignore retryable exceptions when SourceTasks switches to using
> RetryWithToleranceOperator.


do you mean the metrics computation should be avoided?

Now that I think about it, it might be better to keep the PR and the KIP
separated from each other. For now, because there are no retries via
RetryWithToleranceOperator, if poll() fails, we can just count it as a poll
failure for both retriable and non-retriable exceptions (as you pointed out).

Let me know what you think.

Thanks!
Sagar.


On Sun, Jul 2, 2023 at 12:06 AM Ravindra Nath Kakarla <
ravindhran...@gmail.com> wrote:

> Thanks for reviewing and providing the feedback.
>
> > 1) Does it make sense to drop the *record *part from the metric name as
> it
> doesn't seem to serve much purpose? I would rather call the metric as
> *source-poll-errors-total
>
> Yes, "records" is not needed and misleading.
>
> > Staying on names, I am thinking, does it make more sense to have
> *failures* in the name instead of *errors *i.e.*
> source-poll-failures-total* and
> *source-poll-failures-rate*? What do you think?
>
> Agree, "failures" is a more appropriate term here.
>
> > Regarding the inclusion of retriable exceptions, as of today, source
> tasks don't retry even in cases of RetriableException. A PR was created to
> modify this behaviour (https://github.com/apache/kafka/pull/13726) but the
> reason I bring it up is that in that PR, the failures etc for retry context
> would be computed from the RetryWithToleranceOperator. I am not sure when
> would that get merged, but does it change the failure counting logic in any
> ways?
>
> In my opinion, we should ignore retryable exceptions when SourceTasks
> switches to using RetryWithToleranceOperator. I can update the KIP to call
> this out. If the PR for this KIP is implemented first, we can include both
> retriable and non-retriable exceptions. I can also add a comment on
> https://github.com/apache/kafka/pull/13726 to remove them. What do you
> think?
>
> Thank you
>
>
> On Wed, Jun 28, 2023 at 1:09 PM Sagar  wrote:
>
> > Hey Ravindra,
> >
> > Thanks for the KIP! It appears to be a useful addition to the metrics to
> > understand poll related failures which can go untracked as of now. I just
> > have a couple of minor comments:
> >
> > 1) Does it make sense to drop the *record *part from the metric name as
> it
> > doesn't seem to serve much purpose? I would rather call the metric as
> > *source-poll-errors-total
> > *and *source-poll-errors-rate*.
> > 2) Staying on names, I am thinking, does it make more sense to have
> > *failures* in the name instead of *errors *i.e.*
> > source-poll-failures-total* and
> > *source-poll-failures-rate*? What do you think?
> > 3) Regarding the inclusion of retriable exceptions, as of today, source
> > tasks don't retry even in cases of RetriableException. A PR was created
> to
> > modify this behaviour (https://github.com/apache/kafka/pull/13726) but
> the
> > reason I bring it up is that in that PR, the failures etc for retry
> context
> > would be computed from the RetryWithToleranceOperator. I am not sure when
> > would that get merged, but does it change the failure counting logic in
> any
> > ways?
> >
> > Thanks!
> > Sagar.
> >
> >
> > On Sun, Jun 25, 2023 at 12:40 AM Ravindra Nath Kakarla <
> > ravindhran...@gmail.com> wrote:
> >
> > > Hello,
> > >
> > > I would like to start a discussion on KIP-933 to add new metrics to
> Kafka
> > > Connect that helps  monitoring polling failures with source connectors.
> > >
> > >
> > >
> >
> https://cwiki.apache.org/confluence/display/KAFKA/KIP-933%3A+Publish+metrics+when+source+connector+fails+to+poll+data
> > >
> > > Looking forward to feedback on this.
> > >
> > > Thank you,
> > > Ravindranath
> > >
> >
>


Re: [DISCUSS] KIP-933 Publish metrics when source connector fails to poll data

2023-06-28 Thread Sagar
Hey Ravindra,

Thanks for the KIP! It appears to be a useful addition to the metrics for
understanding poll-related failures, which can go untracked as of now. I just
have a couple of minor comments:

1) Does it make sense to drop the *record* part from the metric name as it
doesn't seem to serve much purpose? I would rather call the metrics
*source-poll-errors-total* and *source-poll-errors-rate* (see the sketch
after this list).
2) Staying on names, I am thinking, does it make more sense to have
*failures* in the name instead of *errors *i.e.*
source-poll-failures-total* and
*source-poll-failures-rate*? What do you think?
3) Regarding the inclusion of retriable exceptions, as of today, source
tasks don't retry even in cases of RetriableException. A PR was created to
modify this behaviour (https://github.com/apache/kafka/pull/13726) but the
reason I bring it up is that in that PR, the failures etc for retry context
would be computed from the RetryWithToleranceOperator. I am not sure when
would that get merged, but does it change the failure counting logic in any
ways?
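
For (1) and (2), just to show the shape I have in mind: a total/rate pair can be
backed by a single sensor, roughly like this. The group and sensor names here
are illustrative, not a naming proposal:

```
import org.apache.kafka.common.metrics.Metrics;
import org.apache.kafka.common.metrics.Sensor;
import org.apache.kafka.common.metrics.stats.Meter;

// Rough sketch only: registering a failures total/rate pair with the common
// Metrics API. The group and metric names are illustrative, not final.
public class PollFailureMetricsSketch {

    private final Sensor pollFailures;

    public PollFailureMetricsSketch(Metrics metrics) {
        pollFailures = metrics.sensor("source-poll-failures");
        pollFailures.add(new Meter(
                metrics.metricName("source-poll-failures-rate", "source-task-metrics"),
                metrics.metricName("source-poll-failures-total", "source-task-metrics")));
    }

    // to be called whenever SourceTask.poll() throws
    public void recordPollFailure() {
        pollFailures.record();
    }
}
```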

Thanks!
Sagar.


On Sun, Jun 25, 2023 at 12:40 AM Ravindra Nath Kakarla <
ravindhran...@gmail.com> wrote:

> Hello,
>
> I would like to start a discussion on KIP-933 to add new metrics to Kafka
> Connect that helps  monitoring polling failures with source connectors.
>
>
> https://cwiki.apache.org/confluence/display/KAFKA/KIP-933%3A+Publish+metrics+when+source+connector+fails+to+poll+data
>
> Looking forward to feedback on this.
>
> Thank you,
> Ravindranath
>


Re: How to Integrate MySQL database with Kafka? Need Demo

2023-06-27 Thread Sagar
Hi Avani,

I already shared the documentation link for Kafka Connect. Let me share it
again: https://kafka.apache.org/documentation/#connect

Regarding the connector documentation, you should be able to find them by
just searching for JDBC source connector, JDBC sink connector and Debezium
connector for MySQL.

Let me know if that works.

Thanks!
Sagar.



On Mon, Jun 26, 2023 at 2:10 PM Avani Panchal
 wrote:

> Hi Sagar,
>
> Thank you for the information, you solved our confusion.
> I also saw lots of links for documentation on Kafka, but I am confused
> which document I should use.
> So can you share the proper link from where I can read the documents.
>
> Thanks,
> Avani Panchal
>
>
> On Mon, Jun 26, 2023 at 1:48 PM Sagar  wrote:
>
> > Hey Avani,
> >
> > Kafka Connect <https://kafka.apache.org/documentation/#connect> is the
> > tool
> > to use when you want to stream data to/from Kafka via external systems.
> One
> > would typically configure connectors which allow streaming data to/from
> > Kafka. There are 2 types of connectors:
> > 1) Source Connectors: Which stream data from external systems like
> > databases etc to Kafka and
> > 2) Sink Connectors: Which stream data from Kafka to external systems.
> >
> > Since you want to stream data from MySQL to SQL Server, with Kafka
> Connect
> > it would be a 2 step process:
> >
> > 1) Capture changes from MySQL to Kafka using connectors like JDBC source
> > connector or Debezium MySQL connector.
> > 2) Once the data is in Kafka, you can use JDBC sink connectors to stream
> > data from Kafka topics to the tables in SQL Server.
> >
> > Note that this is a very simplified view of how you can achieve your goal
> > of streaming changes from MySQL to SQL Server and I would recommend
> reading
> > the documentation of the individual connectors and the Kafka Connect
> > framework to understand how to make it work for your usecase.
> >
> > Thanks for your interest on Apache Kafka!
> >
> > Thanks!
> > Sagar.
> >
> >
> > On Mon, Jun 26, 2023 at 11:42 AM Avani Panchal
> >  wrote:
> >
> > > Hi,
> > > In my application I  want to sync my client's data to my SQL server. at
> > > client place the database is MYSQL.
> > >
> > > How can I achieve this using Kafka? I read a lot of documents but I
> don't
> > > understand which setup I need and how I can achieve it.
> > >
> > > I was also wondering about "Book a demo with Kafka" but didn't find it.
> > >
> > > Please help me.
> > >
> > > Thank you,
> > > Avani
> > >
> >
>


[jira] [Created] (KAFKA-15127) Allow offsets to be reset at the same time a connector is deleted.

2023-06-27 Thread Sagar Rao (Jira)
Sagar Rao created KAFKA-15127:
-

 Summary: Allow offsets to be reset at the same time a connector is 
deleted.
 Key: KAFKA-15127
 URL: https://issues.apache.org/jira/browse/KAFKA-15127
 Project: Kafka
  Issue Type: Improvement
  Components: KafkaConnect
Reporter: Sagar Rao


This has been listed as [Future 
Work|https://cwiki.apache.org/confluence/display/KAFKA/KIP-875%3A+First-class+offsets+support+in+Kafka+Connect#KIP875:FirstclassoffsetssupportinKafkaConnect-Automaticallydeleteoffsetswithconnectors]
 in KIP-875. Now that the delete offsets mechanism is also in place, we can 
take this up which will allow connector names to be reused after connector 
deletion. 



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


Re: How to Integrate MySQL database with Kafka? Need Demo

2023-06-26 Thread Sagar
Hey Avani,

Kafka Connect <https://kafka.apache.org/documentation/#connect> is the tool
to use when you want to stream data between Kafka and external systems. One
would typically configure connectors which allow streaming data to/from
Kafka. There are 2 types of connectors:
1) Source Connectors: Which stream data from external systems like
databases etc to Kafka and
2) Sink Connectors: Which stream data from Kafka to external systems.

Since you want to stream data from MySQL to SQL Server, with Kafka Connect
it would be a 2 step process:

1) Capture changes from MySQL to Kafka using connectors like JDBC source
connector or Debezium MySQL connector.
2) Once the data is in Kafka, you can use JDBC sink connectors to stream
data from Kafka topics to the tables in SQL Server.

Note that this is a very simplified view of how you can achieve your goal
of streaming changes from MySQL to SQL Server, and I would recommend reading
the documentation of the individual connectors and the Kafka Connect
framework to understand how to make it work for your use case.

Thanks for your interest in Apache Kafka!

Thanks!
Sagar.


On Mon, Jun 26, 2023 at 11:42 AM Avani Panchal
 wrote:

> Hi,
> In my application I  want to sync my client's data to my SQL server. at
> client place the database is MYSQL.
>
> How can I achieve this using Kafka? I read a lot of documents but I don't
> understand which setup I need and how I can achieve it.
>
> I was also wondering about "Book a demo with Kafka" but didn't find it.
>
> Please help me.
>
> Thank you,
> Avani
>


Re: [DISCUSS] KIP-910: Update Source offsets for Source Connectors without producing records

2023-06-21 Thread Sagar
Hi All,

I have created this PR: https://github.com/apache/kafka/pull/13899 which
implements the approach outlined in the latest version of the KIP. I
thought I could use this to validate the approach based on my understanding
while the KIP itself gets reviewed. I can always change the implementation
once we move to a final decision on the KIP.

Thanks!
Sagar.


On Wed, Jun 14, 2023 at 4:59 PM Sagar  wrote:

> Hey All,
>
> Bumping this discussion thread again to see how the modified KIP looks
> like.
>
> Thanks!
> Sagar.
>
> On Mon, May 29, 2023 at 8:12 PM Sagar  wrote:
>
>> Hi,
>>
>> Bumping this thread again for further reviews.
>>
>> Thanks!
>> Sagar.
>>
>> On Fri, May 12, 2023 at 3:38 PM Sagar  wrote:
>>
>>> Hi All,
>>>
>>> Thanks for the comments/reviews. I have updated the KIP
>>> https://cwiki.apache.org/confluence/display/KAFKA/KIP-910%3A+Update+Source+offsets+for+Source+Connectors+without+producing+records
>>> with a newer approach which shelves the need for an explicit topic.
>>>
>>> Please review again and let me know what you think.
>>>
>>> Thanks!
>>> Sagar.
>>>
>>>
>>> On Mon, Apr 24, 2023 at 3:35 PM Yash Mayya  wrote:
>>>
>>>> Hi Sagar,
>>>>
>>>> Thanks for the KIP! I have a few questions and comments:
>>>>
>>>> 1) I agree with Chris' point about the separation of a connector
>>>> heartbeat
>>>> mechanism and allowing source connectors to generate offsets without
>>>> producing data. What is the purpose of the heartbeat topic here and are
>>>> there any concrete use cases for downstream consumers on this topic? Why
>>>> can't we instead simply introduce a mechanism to retrieve a list of
>>>> source
>>>> partition / source offset pairs from the source tasks?
>>>>
>>>> 2) With the currently described mechanism, the new
>>>> "SourceTask::produceHeartbeatRecords" method returns a
>>>> "List"
>>>> - what happens with the topic in each of these source records? Chris
>>>> pointed this out above, but it doesn't seem to have been addressed? The
>>>> "SourceRecord" class also has a bunch of other fields which will be
>>>> irrelevant here (partition, key / value schema, key / value data,
>>>> timestamp, headers). In fact, it seems like only the source partition
>>>> and
>>>> source offset are relevant here, so we should either introduce a new
>>>> abstraction or simply use a data structure like a mapping from source
>>>> partitions to source offsets (adds to the above point)?
>>>>
>>>> 3) I'm not sure I fully follow why the heartbeat timer / interval is
>>>> needed? What are the downsides of
>>>> calling "SourceTask::produceHeartbeatRecords" in every execution loop
>>>> (similar to the existing "SourceTask::poll" method)? Is this only to
>>>> prevent the generation of a lot of offset records? Since Connect's
>>>> offsets
>>>> topics are log compacted (and source partitions are used as keys for
>>>> each
>>>> source offset), I'm not sure if such concerns are valid and such a
>>>> heartbeat timer / interval mechanism is required?
>>>>
>>>> 4) The first couple of rejected alternatives state that the use of a
>>>> null
>>>> topic / key / value are preferably avoided - but the current proposal
>>>> would
>>>> also likely require connectors to use such workarounds (null topic when
>>>> the
>>>> heartbeat topic is configured at a worker level and always for the key /
>>>> value)?
>>>>
>>>> 5) The third rejected alternative talks about subclassing the
>>>> "SourceRecord" class - this presumably means allowing connectors to pass
>>>> special offset only records via the existing poll mechanism? Why was
>>>> this
>>>> considered a more invasive option? Was it because of the backward
>>>> compatibility issues that would be introduced for plugins using the new
>>>> public API class that still need to be deployed onto older Connect
>>>> workers?
>>>>
>>>> Thanks,
>>>> Yash
>>>>
>>>> On Fri, Apr 14, 2023 at 6:45 PM Sagar 
>>>> wrote:
>>>>
>>>> > One thing I forgot to mention in my previous email was 

[jira] [Resolved] (KAFKA-14913) Migrate DistributedHerder Executor shutdown to use ThreadUtils#shutdownExecutorServiceQuietly

2023-06-18 Thread Sagar Rao (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-14913?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Sagar Rao resolved KAFKA-14913.
---
Resolution: Fixed

> Migrate DistributedHerder Executor shutdown to use 
> ThreadUtils#shutdownExecutorServiceQuietly
> -
>
> Key: KAFKA-14913
> URL: https://issues.apache.org/jira/browse/KAFKA-14913
> Project: Kafka
>  Issue Type: Improvement
>  Components: KafkaConnect
>Reporter: Sagar Rao
>Assignee: Sagar Rao
>Priority: Minor
>
> Some context here: 
> https://github.com/apache/kafka/pull/13557#issuecomment-1509738740



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


Re: [DISCUSS] KIP-910: Update Source offsets for Source Connectors without producing records

2023-06-14 Thread Sagar
Hey All,

Bumping this discussion thread again to see how the modified KIP looks
like.

Thanks!
Sagar.

On Mon, May 29, 2023 at 8:12 PM Sagar  wrote:

> Hi,
>
> Bumping this thread again for further reviews.
>
> Thanks!
> Sagar.
>
> On Fri, May 12, 2023 at 3:38 PM Sagar  wrote:
>
>> Hi All,
>>
>> Thanks for the comments/reviews. I have updated the KIP
>> https://cwiki.apache.org/confluence/display/KAFKA/KIP-910%3A+Update+Source+offsets+for+Source+Connectors+without+producing+records
>> with a newer approach which shelves the need for an explicit topic.
>>
>> Please review again and let me know what you think.
>>
>> Thanks!
>> Sagar.
>>
>>
>> On Mon, Apr 24, 2023 at 3:35 PM Yash Mayya  wrote:
>>
>>> Hi Sagar,
>>>
>>> Thanks for the KIP! I have a few questions and comments:
>>>
>>> 1) I agree with Chris' point about the separation of a connector
>>> heartbeat
>>> mechanism and allowing source connectors to generate offsets without
>>> producing data. What is the purpose of the heartbeat topic here and are
>>> there any concrete use cases for downstream consumers on this topic? Why
>>> can't we instead simply introduce a mechanism to retrieve a list of
>>> source
>>> partition / source offset pairs from the source tasks?
>>>
>>> 2) With the currently described mechanism, the new
>>> "SourceTask::produceHeartbeatRecords" method returns a
>>> "List"
>>> - what happens with the topic in each of these source records? Chris
>>> pointed this out above, but it doesn't seem to have been addressed? The
>>> "SourceRecord" class also has a bunch of other fields which will be
>>> irrelevant here (partition, key / value schema, key / value data,
>>> timestamp, headers). In fact, it seems like only the source partition and
>>> source offset are relevant here, so we should either introduce a new
>>> abstraction or simply use a data structure like a mapping from source
>>> partitions to source offsets (adds to the above point)?
>>>
>>> 3) I'm not sure I fully follow why the heartbeat timer / interval is
>>> needed? What are the downsides of
>>> calling "SourceTask::produceHeartbeatRecords" in every execution loop
>>> (similar to the existing "SourceTask::poll" method)? Is this only to
>>> prevent the generation of a lot of offset records? Since Connect's
>>> offsets
>>> topics are log compacted (and source partitions are used as keys for each
>>> source offset), I'm not sure if such concerns are valid and such a
>>> heartbeat timer / interval mechanism is required?
>>>
>>> 4) The first couple of rejected alternatives state that the use of a null
>>> topic / key / value are preferably avoided - but the current proposal
>>> would
>>> also likely require connectors to use such workarounds (null topic when
>>> the
>>> heartbeat topic is configured at a worker level and always for the key /
>>> value)?
>>>
>>> 5) The third rejected alternative talks about subclassing the
>>> "SourceRecord" class - this presumably means allowing connectors to pass
>>> special offset only records via the existing poll mechanism? Why was this
>>> considered a more invasive option? Was it because of the backward
>>> compatibility issues that would be introduced for plugins using the new
>>> public API class that still need to be deployed onto older Connect
>>> workers?
>>>
>>> Thanks,
>>> Yash
>>>
>>> On Fri, Apr 14, 2023 at 6:45 PM Sagar  wrote:
>>>
>>> > One thing I forgot to mention in my previous email was that the reason
>>> I
>>> > chose to include the opt-in behaviour via configs was that the users
>>> of the
>>> > connector know their workload patterns. If the workload is such that
>>> the
>>> >  connector would receive regular valid updates then there’s ideally no
>>> need
>>> > for moving offsets since it would update automatically.
>>> >
>>> > This way they aren’t forced to use this feature and can use it only
>>> when
>>> > the workload is expected to be batchy or not frequent.
>>> >
>>> > Thanks!
>>> > Sagar.
>>> >
>>> >
>>> > On Fri, 14 Apr 2023 at 5:32 PM, Sagar 
>>> wrote:
>>> >
>>> > > Hi Chris,
>>> >

Re: [VOTE] 3.4.1 RC3

2023-06-06 Thread Prem Sagar
Please remove my mail ID from the database.


Warm Regards
K Prem Sagar
Sr.Manager - Procurements
 M: + 91 - 9100939886
p...@pidatacenters.com
 Amaravati | Bengaluru | Chennai | Delhi | Hyderabad | Kochi | Mumbai
<https://pidatacenters.com/>



On Mon, Jun 5, 2023 at 2:47 PM Josep Prat 
wrote:

> Hi Luke,
>
> Thanks a lot for the patience you had for this release!
>
> @Prem you are probably subscribed to either the user or dev mailing list
> for Apache Kafka, this is why you are receiving these emails.
>
> Best,
>
> On Mon, Jun 5, 2023 at 10:32 AM Prem Sagar  .invalid>
> wrote:
>
> > Why this mail is marked to me ?
> >
> >
> > Warm Regards
> > K Prem Sagar
> > Sr.Manager - Procurements
> >  M: + 91 - 9100939886
> > p...@pidatacenters.com
> >  Amaravati | Bengaluru | Chennai | Delhi | Hyderabad | Kochi | Mumbai
> > <https://pidatacenters.com/>
> >
> > <
> >
> https://pidatacenters.com/news/indias-best-multi-tenant-data-center-service-provider-dcd/
> > >
> > <https://www.greatplacetowork.in/great/rated/50-great/Pi-DATA/>
> > <https://www.linkedin.com/company/6437312/>
> > <https://www.facebook.com/pidatacenters/>
> > <https://twitter.com/pi_datacenters> <https://pidatacenters.com/>
> >
> >
> > On Mon, Jun 5, 2023 at 1:47 PM Luke Chen  wrote:
> >
> > > Hi Tom,
> > >
> > > Thanks for the vote.
> > > I've re-run the 3.4 jenkins build, and the
> > > `DynamicBrokerReconfigurationTest.testAdvertisedListenerUpdate` test
> > still
> > > pass.
> > > https://ci-builds.apache.org/job/Kafka/job/kafka/job/3.4/142/
> > >
> > > And now, I've got:
> > > Binding +1 PMC votes:
> > > * Chris Egerton
> > > * Mickael Maison
> > > * Tom Bentley
> > >
> > > Non-binding votes:
> > > * Federico Valeri
> > > * Jakub Scholz
> > > * Josep Prat
> > >
> > > I will close this vote thread and go ahead to complete the release
> > process.
> > >
> > >
> > > Thank you.
> > > Luke
> > >
> > > On Fri, Jun 2, 2023 at 5:06 PM Josep Prat  >
> > > wrote:
> > >
> > > > Hi Tom,
> > > > it failed for me a couple of times, I rebooted and things suddenly
> > > worked.
> > > > So maybe there was a dangling process holding a port from previous
> test
> > > > failures.
> > > >
> > > > On Fri, Jun 2, 2023 at 10:52 AM Tom Bentley 
> > wrote:
> > > >
> > > > > Hi Luke,
> > > > >
> > > > > Thanks for running the release.
> > > > >
> > > > > I've checked signatures, eyeballed the Javadocs, built from source
> > and
> > > > run
> > > > > the unit and integration tests.
> > > > > DynamicBrokerReconfigurationTest.testAdvertisedListenerUpdate fails
> > for
> > > > me
> > > > > repeatedly. I opened
> > https://issues.apache.org/jira/browse/KAFKA-15049
> > > > for
> > > > > it since I couldn't find an existing issue for this one. I note
> that
> > > > others
> > > > > seem to have run the integration tests without problems, so I don't
> > > think
> > > > > this is a blocker. I also did the Kafka, Connect and Streams
> > > quickstarts.
> > > > >
> > > > > +1 binding.
> > > > >
> > > > > Tom
> > > > >
> > > > >
> > > > >
> > > > > On Thu, 1 Jun 2023 at 08:46, Luke Chen  wrote:
> > > > >
> > > > > > Hi all,
> > > > > >
> > > > > > Thanks to everyone who has tested and voted for the RC3 so far!
> > > > > > Currently, I've got 2 binding votes and 3 non-binding votes:
> > > > > >
> > > > > > Binding +1 PMC votes:
> > > > > > * Chris Egerton
> > > > > > * Mickael Maison
> > > > > >
> > > > > > Non-binding votes:
> > > > > > * Federico Valeri
> > > > > > * Jak

Re: [VOTE] 3.4.1 RC3

2023-06-06 Thread Prem Sagar
Why is this mail addressed to me?


Warm Regards
K Prem Sagar
Sr.Manager - Procurements
 M: + 91 - 9100939886
p...@pidatacenters.com
 Amaravati | Bengaluru | Chennai | Delhi | Hyderabad | Kochi | Mumbai
<https://pidatacenters.com/>



On Mon, Jun 5, 2023 at 1:47 PM Luke Chen  wrote:

> Hi Tom,
>
> Thanks for the vote.
> I've re-run the 3.4 jenkins build, and the
> `DynamicBrokerReconfigurationTest.testAdvertisedListenerUpdate` test still
> pass.
> https://ci-builds.apache.org/job/Kafka/job/kafka/job/3.4/142/
>
> And now, I've got:
> Binding +1 PMC votes:
> * Chris Egerton
> * Mickael Maison
> * Tom Bentley
>
> Non-binding votes:
> * Federico Valeri
> * Jakub Scholz
> * Josep Prat
>
> I will close this vote thread and go ahead to complete the release process.
>
>
> Thank you.
> Luke
>
> On Fri, Jun 2, 2023 at 5:06 PM Josep Prat 
> wrote:
>
> > Hi Tom,
> > it failed for me a couple of times, I rebooted and things suddenly
> worked.
> > So maybe there was a dangling process holding a port from previous test
> > failures.
> >
> > On Fri, Jun 2, 2023 at 10:52 AM Tom Bentley  wrote:
> >
> > > Hi Luke,
> > >
> > > Thanks for running the release.
> > >
> > > I've checked signatures, eyeballed the Javadocs, built from source and
> > run
> > > the unit and integration tests.
> > > DynamicBrokerReconfigurationTest.testAdvertisedListenerUpdate fails for
> > me
> > > repeatedly. I opened https://issues.apache.org/jira/browse/KAFKA-15049
> > for
> > > it since I couldn't find an existing issue for this one. I note that
> > others
> > > seem to have run the integration tests without problems, so I don't
> think
> > > this is a blocker. I also did the Kafka, Connect and Streams
> quickstarts.
> > >
> > > +1 binding.
> > >
> > > Tom
> > >
> > >
> > >
> > > On Thu, 1 Jun 2023 at 08:46, Luke Chen  wrote:
> > >
> > > > Hi all,
> > > >
> > > > Thanks to everyone who has tested and voted for the RC3 so far!
> > > > Currently, I've got 2 binding votes and 3 non-binding votes:
> > > >
> > > > Binding +1 PMC votes:
> > > > * Chris Egerton
> > > > * Mickael Maison
> > > >
> > > > Non-binding votes:
> > > > * Federico Valeri
> > > > * Jakub Scholz
> > > > * Josep Prat
> > > >
> > > > If anyone is available (especially PMC members :)), please help
> verify
> > > the
> > > > RC build.
> > > >
> > > > Thank you.
> > > > Luke
> > > >
> > > > On Wed, May 31, 2023 at 1:53 AM Chris Egerton
>  > >
> > > > wrote:
> > > >
> > > > > Hi Luke,
> > > > >
> > > > > Many thanks for your continued work on this release!
> > > > >
> > > > > To verify, I:
> > > > > - Built from source using Java 11 with both:
> > > > > - - the 3.4.1-rc3 tag on GitHub
> > > > > - - the kafka-3.4.1-src.tgz artifact from
> > > > > https://home.apache.org/~showuon/kafka-3.4.1-rc3/
> > > > > - Checked signatures and checksums
> > > > > - Ran the quickstart using the kafka_2.13-3.4.1.tgz artifact from
> > > > > https://home.apache.org/~showuon/kafka-3.4.1-rc3/ with Java 11 and
> > > Scala
> > > > > 13
> > > > > in KRaft mode
> > > > > - Ran all unit tests
> > > > > - Ran all integration tests for Connect and MM2
> > > > >
> > > > > +1 (binding)
> > > > >
> > > > > Cheers,
> > > > >
> > > > > Chris
> > > > >
> > > > > On Tue, May 30, 2023 at 11:16 AM Mickael Maison <
> > > > mickael.mai...@gmail.com>
> > > > > wrote:
> > > > >
> > > > > > Hi Luke,
> > > > > >
> > > > > > I built from source with Java 11 and Scala 2.13 and ran the unit
> > and
> > > > > > integration tests. It took a few retries to get some of t

[jira] [Created] (KAFKA-15041) Source Connector auto topic creation fails when topic is deleted and brokers don't support auto topic creation

2023-05-31 Thread Sagar Rao (Jira)
Sagar Rao created KAFKA-15041:
-

 Summary: Source Connector auto topic creation fails when topic is 
deleted and brokers don't support auto topic creation
 Key: KAFKA-15041
 URL: https://issues.apache.org/jira/browse/KAFKA-15041
 Project: Kafka
  Issue Type: Bug
  Components: KafkaConnect
Reporter: Sagar Rao
Assignee: Sagar Rao


[KIP-158|https://cwiki.apache.org/confluence/display/KAFKA/KIP-158%3A+Kafka+Connect+should+allow+source+connectors+to+set+topic-specific+settings+for+new+topics]
 allows source connectors to create topics even when the broker doesn't allow 
automatic topic creation. It does so by checking, for every record, whether a 
topic needs to be created 
[https://github.com/apache/kafka/blob/trunk/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/AbstractWorkerSourceTask.java#L500].
 To avoid checking for topic presence via the admin client for every record, it 
also maintains a cache of the topics it has already created and doesn't try to 
create those again. This helps create topics when brokers don't support 
automatic topic creation.

However, let's say the topic gets created initially and is later deleted 
while the connector is still running, and the brokers don't support automatic 
topic creation. In such cases, the connector has cached the topic it has 
already created and won't recreate it because the cache never updates; and 
since the broker doesn't support topic creation, the logs would just be full of 
messages like 

```
Error while fetching metadata with correlation id 3260 : {connect-test=UNKNOWN_TOPIC_OR_PARTITION}
```


This can become a problem in environments where brokers don't allow topic 
creation. We need a way to refresh the topics cache for such cases.
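
One possible shape for such a refresh, purely as an illustration (this is not the 
actual AbstractWorkerSourceTask code, and the interval is a made-up value):

```
import java.util.HashMap;
import java.util.Map;

// Illustrative idea only: expire entries from the "already created" topic cache so
// that a topic deleted out-of-band is eventually re-checked and re-created. Not the
// actual AbstractWorkerSourceTask code; the refresh interval is a made-up value.
public class TopicCreationCacheSketch {

    private static final long REFRESH_INTERVAL_MS = 60_000L;
    private final Map<String, Long> createdTopics = new HashMap<>();  // topic -> last verified time

    /** Returns true if the task should (re)check/create the topic before sending to it. */
    public boolean shouldCheckTopic(String topic, long nowMs) {
        Long lastVerified = createdTopics.get(topic);
        return lastVerified == null || nowMs - lastVerified > REFRESH_INTERVAL_MS;
    }

    public void markTopicVerified(String topic, long nowMs) {
        createdTopics.put(topic, nowMs);
    }
}
```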



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


Re: [DISCUSS] KIP-910: Update Source offsets for Source Connectors without producing records

2023-05-29 Thread Sagar
Hi,

Bumping this thread again for further reviews.

Thanks!
Sagar.

On Fri, May 12, 2023 at 3:38 PM Sagar  wrote:

> Hi All,
>
> Thanks for the comments/reviews. I have updated the KIP
> https://cwiki.apache.org/confluence/display/KAFKA/KIP-910%3A+Update+Source+offsets+for+Source+Connectors+without+producing+records
> with a newer approach which shelves the need for an explicit topic.
>
> Please review again and let me know what you think.
>
> Thanks!
> Sagar.
>
>
> On Mon, Apr 24, 2023 at 3:35 PM Yash Mayya  wrote:
>
>> Hi Sagar,
>>
>> Thanks for the KIP! I have a few questions and comments:
>>
>> 1) I agree with Chris' point about the separation of a connector heartbeat
>> mechanism and allowing source connectors to generate offsets without
>> producing data. What is the purpose of the heartbeat topic here and are
>> there any concrete use cases for downstream consumers on this topic? Why
>> can't we instead simply introduce a mechanism to retrieve a list of source
>> partition / source offset pairs from the source tasks?
>>
>> 2) With the currently described mechanism, the new
>> "SourceTask::produceHeartbeatRecords" method returns a
>> "List"
>> - what happens with the topic in each of these source records? Chris
>> pointed this out above, but it doesn't seem to have been addressed? The
>> "SourceRecord" class also has a bunch of other fields which will be
>> irrelevant here (partition, key / value schema, key / value data,
>> timestamp, headers). In fact, it seems like only the source partition and
>> source offset are relevant here, so we should either introduce a new
>> abstraction or simply use a data structure like a mapping from source
>> partitions to source offsets (adds to the above point)?
>>
>> 3) I'm not sure I fully follow why the heartbeat timer / interval is
>> needed? What are the downsides of
>> calling "SourceTask::produceHeartbeatRecords" in every execution loop
>> (similar to the existing "SourceTask::poll" method)? Is this only to
>> prevent the generation of a lot of offset records? Since Connect's offsets
>> topics are log compacted (and source partitions are used as keys for each
>> source offset), I'm not sure if such concerns are valid and such a
>> heartbeat timer / interval mechanism is required?
>>
>> 4) The first couple of rejected alternatives state that the use of a null
>> topic / key / value are preferably avoided - but the current proposal
>> would
>> also likely require connectors to use such workarounds (null topic when
>> the
>> heartbeat topic is configured at a worker level and always for the key /
>> value)?
>>
>> 5) The third rejected alternative talks about subclassing the
>> "SourceRecord" class - this presumably means allowing connectors to pass
>> special offset only records via the existing poll mechanism? Why was this
>> considered a more invasive option? Was it because of the backward
>> compatibility issues that would be introduced for plugins using the new
>> public API class that still need to be deployed onto older Connect
>> workers?
>>
>> Thanks,
>> Yash
>>
>> On Fri, Apr 14, 2023 at 6:45 PM Sagar  wrote:
>>
>> > One thing I forgot to mention in my previous email was that the reason I
>> > chose to include the opt-in behaviour via configs was that the users of
>> the
>> > connector know their workload patterns. If the workload is such that the
>> >  connector would receive regular valid updates then there’s ideally no
>> need
>> > for moving offsets since it would update automatically.
>> >
>> > This way they aren’t forced to use this feature and can use it only when
>> > the workload is expected to be batchy or not frequent.
>> >
>> > Thanks!
>> > Sagar.
>> >
>> >
>> > On Fri, 14 Apr 2023 at 5:32 PM, Sagar 
>> wrote:
>> >
>> > > Hi Chris,
>> > >
>> > > Thanks for following up on the response. Sharing my thoughts further:
>> > >
>> > > If we want to add support for connectors to emit offsets without
>> > >> accompanying source records, we could (and IMO should) do that
>> without
>> > >> requiring users to manually enable that feature by adjusting worker
>> or
>> > >> connector configurations.
>> > >
>> > >
>> > > With the current KIP design, I have tried to implement this in an
>> opt-in
>> > > manner via configs. I

[jira] [Created] (KAFKA-14997) JmxToolTest failing with initializationError

2023-05-13 Thread Sagar Rao (Jira)
Sagar Rao created KAFKA-14997:
-

 Summary: JmxToolTest failing with initializationError
 Key: KAFKA-14997
 URL: https://issues.apache.org/jira/browse/KAFKA-14997
 Project: Kafka
  Issue Type: Bug
Reporter: Sagar Rao
Assignee: Federico Valeri


Noticed that JmxToolTest fails with 

```
h4. Error
java.io.IOException: Cannot bind to URL [rmi://:44743/jmxrmi]: 
javax.naming.ServiceUnavailableException [Root exception is 
java.rmi.ConnectException: Connection refused to host: 40.117.157.99; nested 
exception is: 
 java.net.ConnectException: Connection timed out (Connection timed out)]
h4. Stacktrace
java.io.IOException: Cannot bind to URL [rmi://:44743/jmxrmi]: 
javax.naming.ServiceUnavailableException [Root exception is 
java.rmi.ConnectException: Connection refused to host: 40.117.157.99; nested 
exception is: 
 java.net.ConnectException: Connection timed out (Connection timed out)]
 at 
javax.management.remote.rmi.RMIConnectorServer.newIOException(RMIConnectorServer.java:827)
 at 
javax.management.remote.rmi.RMIConnectorServer.start(RMIConnectorServer.java:432)
 at org.apache.kafka.tools.JmxToolTest.startJmxAgent(JmxToolTest.java:337)
 at org.apache.kafka.tools.JmxToolTest.beforeAll(JmxToolTest.java:55)
 at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
 at 
sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
 at 
sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
 at java.lang.reflect.Method.invoke(Method.java:498)
 at 
org.junit.platform.commons.util.ReflectionUtils.invokeMethod(ReflectionUtils.java:727)
 at 
org.junit.jupiter.engine.execution.MethodInvocation.proceed(MethodInvocation.java:60)
 at 
org.junit.jupiter.engine.execution.InvocationInterceptorChain$ValidatingInvocation.proceed(InvocationInterceptorChain.java:131)
 at 
org.junit.jupiter.engine.extension.TimeoutExtension.intercept(TimeoutExtension.java:156)
 at 
org.junit.jupiter.engine.extension.TimeoutExtension.interceptLifecycleMethod(TimeoutExtension.java:128)
 at 
org.junit.jupiter.engine.extension.TimeoutExtension.interceptBeforeAllMethod(TimeoutExtension.java:70)
 at 
org.junit.jupiter.engine.execution.InterceptingExecutableInvoker$ReflectiveInterceptorCall.lambda$ofVoidMethod$0(InterceptingExecutableInvoker.java:103)
 at 
org.junit.jupiter.engine.execution.InterceptingExecutableInvoker.lambda$invoke$0(InterceptingExecutableInvoker.java:93)
 at 
org.junit.jupiter.engine.execution.InvocationInterceptorChain$InterceptedInvocation.proceed(InvocationInterceptorChain.java:106)
 at 
org.junit.jupiter.engine.execution.InvocationInterceptorChain.proceed(InvocationInterceptorChain.java:64)
 at 
org.junit.jupiter.engine.execution.InvocationInterceptorChain.chainAndInvoke(InvocationInterceptorChain.java:45)
 at 
org.junit.jupiter.engine.execution.InvocationInterceptorChain.invoke(InvocationInterceptorChain.java:37)
 at 
org.junit.jupiter.engine.execution.InterceptingExecutableInvoker.invoke(InterceptingExecutableInvoker.java:92)
 at 
org.junit.jupiter.engine.execution.InterceptingExecutableInvoker.invoke(InterceptingExecutableInvoker.java:86)
 at 
org.junit.jupiter.engine.descriptor.ClassBasedTestDescriptor.lambda$invokeBeforeAllMethods$13(ClassBasedTestDescriptor.java:411)
 at 
org.junit.platform.engine.support.hierarchical.ThrowableCollector.execute(ThrowableCollector.java:73)
 at 
org.junit.jupiter.engine.descriptor.ClassBasedTestDescriptor.invokeBeforeAllMethods(ClassBasedTestDescriptor.java:409)
 at 
org.junit.jupiter.engine.descriptor.ClassBasedTestDescriptor.before(ClassBasedTestDescriptor.java:215)
 at 
org.junit.jupiter.engine.descriptor.ClassBasedTestDescriptor.before(ClassBasedTestDescriptor.java:84)
 at 
org.junit.platform.engine.support.hierarchical.NodeTestTask.lambda$executeRecursively$6(NodeTestTask.java:148)
 at 
org.junit.platform.engine.support.hierarchical.ThrowableCollector.execute(ThrowableCollector.java:73)
 at 
org.junit.platform.engine.support.hierarchical.NodeTestTask.lambda$executeRecursively$8(NodeTestTask.java:141)
 at org.junit.platform.engine.support.hierarchical.Node.around(Node.java:137)
 at 
org.junit.platform.engine.support.hierarchical.NodeTestTask.lambda$executeRecursively$9(NodeTestTask.java:139)
 at 
org.junit.platform.engine.support.hierarchical.ThrowableCollector.execute(ThrowableCollector.java:73)
 at 
org.junit.platform.engine.support.hierarchical.NodeTestTask.executeRecursively(NodeTestTask.java:138)
 at 
org.junit.platform.engine.support.hierarchical.NodeTestTask.execute(NodeTestTask.java:95)
 at java.util.ArrayList.forEach(ArrayList.java:1259)
 at 
org.junit.platform.engine.support.hierarchical.SameThreadHierarchicalTestExecutorService.invokeAll(SameThreadHierarchicalTestExecutorService.java:41)
 at 
org.junit.platform.engine.support.hierarchical.NodeTestTask.lambda$executeRecursively$6(NodeTestTask.java:155

Re: Test failures

2023-05-13 Thread Sagar
Hey Greg,

I see https://issues.apache.org/jira/browse/KAFKA-14905 is marked as
resolved but I saw testSyncTopicConfigs() failed in
org.apache.kafka.connect.mirror.integration.MirrorConnectorsWithCustomForwardingAdminIntegrationTest.
Here is the build :

https://ci-builds.apache.org/blue/organizations/jenkins/Kafka%2Fkafka-pr/detail/PR-13453/6/tests
.

Do you think this is a flaky test which needs to be looked at?

Thanks!
Sagar.

On Thu, Apr 27, 2023 at 2:05 AM Ismael Juma  wrote:

> Thanks!
>
> Ismael
>
> On Wed, Apr 26, 2023, 9:12 AM Sagar  wrote:
>
> > Hi,
> >
> > FYI the pr which fixes the flaky test for connect is merged to trunk
> >
> > Sagar.
> >
> > On Wed, 26 Apr 2023 at 2:45 AM, Manyanda Chitimbo <
> > manyanda.chiti...@gmail.com> wrote:
> >
> > > HI Ismail,
> > >
> > > There is a PR to fix the failure in
> > > https://github.com/apache/kafka/pull/13634
> > >
> > > On Tue, Apr 25, 2023 at 9:40 PM Ismael Juma  wrote:
> > >
> > > > I was looking at the CI builds and I came across the following test
> > > failure
> > > > that seems to be clear and consistent:
> > > >
> > > > org.mockito.exceptions.verification.TooFewActualInvocations:
> > > > kafkaBasedLog.send(, , );
> > > > Wanted 2 times:
> > > > -> at
> > > >
> > org.apache.kafka.connect.util.KafkaBasedLog.send(KafkaBasedLog.java:376)
> > > > But was 1 time:
> > > > -> at
> > > >
> > >
> >
> org.apache.kafka.connect.storage.KafkaStatusBackingStore.sendTopicStatus(KafkaStatusBackingStore.java:315)
> > > >
> > > >
> > > >
> > >
> >
> https://ci-builds.apache.org/job/Kafka/job/kafka/job/trunk/1795/testReport/junit/org.apache.kafka.connect.storage/KafkaStatusBackingStoreFormatTest/Build___JDK_11_and_Scala_2_13___putTopicStateRetriableFailure/
> > > >
> > > > :(
> > > >
> > > > Ismael
> > > >
> > > > On Fri, Apr 21, 2023 at 5:39 PM Sagar 
> > wrote:
> > > >
> > > > > Hi Greg,
> > > > >
> > > > > The fix for 14929 has already been included as part of this pr :
> > > > > https://github.com/apache/kafka/pull/13594
> > > > >
> > > > > I can create a separate pr just for that flaky test if needed. Let
> me
> > > > know
> > > > > .
> > > > >
> > > > > Sagar.
> > > > >
> > > > > On Sat, 22 Apr 2023 at 3:20 AM, Greg Harris
> > >  > > > >
> > > > > wrote:
> > > > >
> > > > > > Hey all,
> > > > > >
> > > > > > We just landed a fix for
> > > > > https://issues.apache.org/jira/browse/KAFKA-14905
> > > > > > which was causing all of
> > > > > > those MirrorConnectorsWithCustomForwardingAdminIntegrationTest
> > > > failures,
> > > > > > and we will monitor the build for any re-occurrances.
> > > > > > Unfortunately we discovered another test flake that was
> introduced
> > > > > recently
> > > > > > but that should have a straightforward resolution:
> > > > > > https://issues.apache.org/jira/browse/KAFKA-14929
> > > > > > Thanks Ismael for merging a fix for
> > > > > > https://issues.apache.org/jira/browse/KAFKA-8115 but it appears
> > that
> > > > > there
> > > > > > is still more investigation needed there, as the test is still
> > > failing
> > > > > > occasionally.
> > > > > >
> > > > > > Thanks,
> > > > > > Greg
> > > > > >
> > > > > > On Fri, Apr 14, 2023 at 12:18 PM Ismael Juma 
> > > > wrote:
> > > > > >
> > > > > > > Thanks Greg! I really appreciate the help.
> > > > > > >
> > > > > > > Ismael
> > > > > > >
> > > > > > > On Fri, Apr 14, 2023 at 12:08 PM Greg Harris
> > > > > >  > > > > > > >
> > > > > > > wrote:
> > > > > > >
> > > > > > > > Hey Ismael,
> > > > > > > >
> > > > > > > > We're working to stabilize the Connect/MM2 tests with the
> > > following
> > > > > > > 

Re: [DISCUSS] KIP-910: Update Source offsets for Source Connectors without producing records

2023-05-12 Thread Sagar
Hi All,

Thanks for the comments/reviews. I have updated the KIP
https://cwiki.apache.org/confluence/display/KAFKA/KIP-910%3A+Update+Source+offsets+for+Source+Connectors+without+producing+records
with a newer approach which shelves the need for an explicit topic.

Please review again and let me know what you think.

Thanks!
Sagar.


On Mon, Apr 24, 2023 at 3:35 PM Yash Mayya  wrote:

> Hi Sagar,
>
> Thanks for the KIP! I have a few questions and comments:
>
> 1) I agree with Chris' point about the separation of a connector heartbeat
> mechanism and allowing source connectors to generate offsets without
> producing data. What is the purpose of the heartbeat topic here and are
> there any concrete use cases for downstream consumers on this topic? Why
> can't we instead simply introduce a mechanism to retrieve a list of source
> partition / source offset pairs from the source tasks?
>
> 2) With the currently described mechanism, the new
> "SourceTask::produceHeartbeatRecords" method returns a "List<SourceRecord>"
> - what happens with the topic in each of these source records? Chris
> pointed this out above, but it doesn't seem to have been addressed? The
> "SourceRecord" class also has a bunch of other fields which will be
> irrelevant here (partition, key / value schema, key / value data,
> timestamp, headers). In fact, it seems like only the source partition and
> source offset are relevant here, so we should either introduce a new
> abstraction or simply use a data structure like a mapping from source
> partitions to source offsets (adds to the above point)?
>
> 3) I'm not sure I fully follow why the heartbeat timer / interval is
> needed? What are the downsides of
> calling "SourceTask::produceHeartbeatRecords" in every execution loop
> (similar to the existing "SourceTask::poll" method)? Is this only to
> prevent the generation of a lot of offset records? Since Connect's offsets
> topics are log compacted (and source partitions are used as keys for each
> source offset), I'm not sure if such concerns are valid and such a
> heartbeat timer / interval mechanism is required?
>
> 4) The first couple of rejected alternatives state that the use of a null
> topic / key / value are preferably avoided - but the current proposal would
> also likely require connectors to use such workarounds (null topic when the
> heartbeat topic is configured at a worker level and always for the key /
> value)?
>
> 5) The third rejected alternative talks about subclassing the
> "SourceRecord" class - this presumably means allowing connectors to pass
> special offset only records via the existing poll mechanism? Why was this
> considered a more invasive option? Was it because of the backward
> compatibility issues that would be introduced for plugins using the new
> public API class that still need to be deployed onto older Connect workers?
>
> Thanks,
> Yash
>
> On Fri, Apr 14, 2023 at 6:45 PM Sagar  wrote:
>
> > One thing I forgot to mention in my previous email was that the reason I
> > chose to include the opt-in behaviour via configs was that the users of
> the
> > connector know their workload patterns. If the workload is such that the
> >  connector would receive regular valid updates then there’s ideally no
> need
> > for moving offsets since it would update automatically.
> >
> > This way they aren’t forced to use this feature and can use it only when
> > the workload is expected to be batchy or not frequent.
> >
> > Thanks!
> > Sagar.
> >
> >
> > On Fri, 14 Apr 2023 at 5:32 PM, Sagar  wrote:
> >
> > > Hi Chris,
> > >
> > > Thanks for following up on the response. Sharing my thoughts further:
> > >
> > > If we want to add support for connectors to emit offsets without
> > >> accompanying source records, we could (and IMO should) do that without
> > >> requiring users to manually enable that feature by adjusting worker or
> > >> connector configurations.
> > >
> > >
> > > With the current KIP design, I have tried to implement this in an
> opt-in
> > > manner via configs. I guess what you are trying to say is that this
> > doesn't
> > > need a config of it's own and instead could be part of the poll ->
> > > transform etc -> produce -> commit cycle. That way, the users don't
> need
> > to
> > > set any config and if the connector supports moving offsets w/o
> producing
> > > SourceRecords, it should happen automatically. Is that correct? If that
> > > is the concern, then I can think of not exposing a config and try to
> make
> > > th

[jira] [Created] (KAFKA-14971) Flaky Test org.apache.kafka.connect.mirror.integration.IdentityReplicationIntegrationTest#testSyncTopicConfigs

2023-05-07 Thread Sagar Rao (Jira)
Sagar Rao created KAFKA-14971:
-

 Summary: Flaky Test 
org.apache.kafka.connect.mirror.integration.IdentityReplicationIntegrationTest#testSyncTopicConfigs
 Key: KAFKA-14971
 URL: https://issues.apache.org/jira/browse/KAFKA-14971
 Project: Kafka
  Issue Type: Bug
Reporter: Sagar Rao


The test 
`org.apache.kafka.connect.mirror.integration.IdentityReplicationIntegrationTest#testSyncTopicConfigs`
seems to be flaky. Found here: 
[https://ci-builds.apache.org/blue/organizations/jenkins/Kafka%2Fkafka-pr/detail/PR-13594/6/tests]

 

Ran locally against the [same PR|https://github.com/apache/kafka/pull/13594] 
and it passed.

 

```
h4. Error
org.opentest4j.AssertionFailedError: `delete.retention.ms` should be 2000, 
because it's explicitly defined on the target topic! ==> expected: <2000> but 
was: <8640>
h4. Stacktrace
org.opentest4j.AssertionFailedError: `delete.retention.ms` should be 2000, 
because it's explicitly defined on the target topic! ==> expected: <2000> but 
was: <8640>
 at 
app//org.junit.jupiter.api.AssertionFailureBuilder.build(AssertionFailureBuilder.java:151)
 at 
app//org.junit.jupiter.api.AssertionFailureBuilder.buildAndThrow(AssertionFailureBuilder.java:132)
 at app//org.junit.jupiter.api.AssertEquals.failNotEqual(AssertEquals.java:197)
 at app//org.junit.jupiter.api.AssertEquals.assertEquals(AssertEquals.java:182)
 at app//org.junit.jupiter.api.Assertions.assertEquals(Assertions.java:1153)
 at 
app//org.apache.kafka.connect.mirror.integration.MirrorConnectorsIntegrationBaseTest.lambda$testSyncTopicConfigs$8(MirrorConnectorsIntegrationBaseTest.java:758)
 at 
app//org.apache.kafka.test.TestUtils.lambda$waitForCondition$3(TestUtils.java:325)
 at 
app//org.apache.kafka.test.TestUtils.retryOnExceptionWithTimeout(TestUtils.java:373)
 at app//org.apache.kafka.test.TestUtils.waitForCondition(TestUtils.java:322)
 at app//org.apache.kafka.test.TestUtils.waitForCondition(TestUtils.java:306)
 at app//org.apache.kafka.test.TestUtils.waitForCondition(TestUtils.java:296)
 at 
app//org.apache.kafka.connect.mirror.integration.MirrorConnectorsIntegrationBaseTest.testSyncTopicConfigs(MirrorConnectorsIntegrationBaseTest.java:752)
 at 
java.base@17.0.7/jdk.internal.reflect.NativeMethodAccessorImpl.invoke0(Native 
Method)
 at 
java.base@17.0.7/jdk.internal.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:77)
 at 
java.base@17.0.7/jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
 at java.base@17.0.7/java.lang.reflect.Method.invoke(Method.java:568)
 at 
app//org.junit.platform.commons.util.ReflectionUtils.invokeMethod(ReflectionUtils.java:727)
 at 
app//org.junit.jupiter.engine.execution.MethodInvocation.proceed(MethodInvocation.java:60)
 at 
app//org.junit.jupiter.engine.execution.InvocationInterceptorChain$ValidatingInvocation.proceed(InvocationInterceptorChain.java:131)
 at 
app//org.junit.jupiter.engine.extension.TimeoutExtension.intercept(TimeoutExtension.java:156)
 at 
app//org.junit.jupiter.engine.extension.TimeoutExtension.interceptTestableMethod(TimeoutExtension.java:147)
 at 
app//org.junit.jupiter.engine.extension.TimeoutExtension.interceptTestMethod(TimeoutExtension.java:86)
 at 
app//org.junit.jupiter.engine.execution.InterceptingExecutableInvoker$ReflectiveInterceptorCall.lambda$ofVoidMethod$0(InterceptingExecutableInvoker.java:103)
 at 
app//org.junit.jupiter.engine.execution.InterceptingExecutableInvoker.lambda$invoke$0(InterceptingExecutableInvoker.java:93)
 at 
app//org.junit.jupiter.engine.execution.InvocationInterceptorChain$InterceptedInvocation.proceed(InvocationInterceptorChain.java:106)
 at 
app//org.junit.jupiter.engine.execution.InvocationInterceptorChain.proceed(InvocationInterceptorChain.java:64)
 at 
app//org.junit.jupiter.engine.execution.InvocationInterceptorChain.chainAndInvoke(InvocationInterceptorChain.java:45)
 at 
app//org.junit.jupiter.engine.execution.InvocationInterceptorChain.invoke(InvocationInterceptorChain.java:37)
 at 
app//org.junit.jupiter.engine.execution.InterceptingExecutableInvoker.invoke(InterceptingExecutableInvoker.java:92)
 at 
app//org.junit.jupiter.engine.execution.InterceptingExecutableInvoker.invoke(InterceptingExecutableInvoker.java:86)
 at 
app//org.junit.jupiter.engine.descriptor.TestMethodTestDescriptor.lambda$invokeTestMethod$7(TestMethodTestDescriptor.java:217)
 at 
app//org.junit.platform.engine.support.hierarchical.ThrowableCollector.execute(ThrowableCollector.java:73)
 at 
app//org.junit.jupiter.engine.descriptor.TestMethodTestDescriptor.invokeTestMethod(TestMethodTestDescriptor.java:213)
 at 
app//org.junit.jupiter.engine.descriptor.TestMethodTestDescriptor.execute(TestMethodTestDescriptor.java:138)
 at 
app//org.junit.jupiter.engine.descriptor.TestMethodTestDescriptor.execute(TestMethodTe

[jira] [Created] (KAFKA-14956) Flaky test org.apache.kafka.connect.integration.OffsetsApiIntegrationTest#testGetSinkConnectorOffsetsDifferentKafkaClusterTargeted

2023-05-02 Thread Sagar Rao (Jira)
Sagar Rao created KAFKA-14956:
-

 Summary: Flaky test 
org.apache.kafka.connect.integration.OffsetsApiIntegrationTest#testGetSinkConnectorOffsetsDifferentKafkaClusterTargeted
 Key: KAFKA-14956
 URL: https://issues.apache.org/jira/browse/KAFKA-14956
 Project: Kafka
  Issue Type: Bug
Reporter: Sagar Rao


```
h4. Error
org.opentest4j.AssertionFailedError: Condition not met within timeout 15000. 
Sink connector consumer group offsets should catch up to the topic end offsets 
==> expected:  but was: 
h4. Stacktrace
org.opentest4j.AssertionFailedError: Condition not met within timeout 15000. 
Sink connector consumer group offsets should catch up to the topic end offsets 
==> expected:  but was: 
 at 
app//org.junit.jupiter.api.AssertionFailureBuilder.build(AssertionFailureBuilder.java:151)
 at 
app//org.junit.jupiter.api.AssertionFailureBuilder.buildAndThrow(AssertionFailureBuilder.java:132)
 at app//org.junit.jupiter.api.AssertTrue.failNotTrue(AssertTrue.java:63)
 at app//org.junit.jupiter.api.AssertTrue.assertTrue(AssertTrue.java:36)
 at app//org.junit.jupiter.api.Assertions.assertTrue(Assertions.java:211)
 at 
app//org.apache.kafka.test.TestUtils.lambda$waitForCondition$4(TestUtils.java:337)
 at 
app//org.apache.kafka.test.TestUtils.retryOnExceptionWithTimeout(TestUtils.java:385)
 at app//org.apache.kafka.test.TestUtils.waitForCondition(TestUtils.java:334)
 at app//org.apache.kafka.test.TestUtils.waitForCondition(TestUtils.java:318)
 at app//org.apache.kafka.test.TestUtils.waitForCondition(TestUtils.java:291)
 at 
app//org.apache.kafka.connect.integration.OffsetsApiIntegrationTest.getAndVerifySinkConnectorOffsets(OffsetsApiIntegrationTest.java:150)
 at 
app//org.apache.kafka.connect.integration.OffsetsApiIntegrationTest.testGetSinkConnectorOffsetsDifferentKafkaClusterTargeted(OffsetsApiIntegrationTest.java:131)
 at 
java.base@17.0.7/jdk.internal.reflect.NativeMethodAccessorImpl.invoke0(Native 
Method)
 at 
java.base@17.0.7/jdk.internal.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:77)
 at 
java.base@17.0.7/jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
 at java.base@17.0.7/java.lang.reflect.Method.invoke(Method.java:568)
 at 
app//org.junit.runners.model.FrameworkMethod$1.runReflectiveCall(FrameworkMethod.java:59)
 at 
app//org.junit.internal.runners.model.ReflectiveCallable.run(ReflectiveCallable.java:12)
 at 
app//org.junit.runners.model.FrameworkMethod.invokeExplosively(FrameworkMethod.java:56)
 at 
app//org.junit.internal.runners.statements.InvokeMethod.evaluate(InvokeMethod.java:17)
 at 
app//org.junit.internal.runners.statements.RunBefores.evaluate(RunBefores.java:26)
 at 
app//org.junit.internal.runners.statements.RunAfters.evaluate(RunAfters.java:27)
 at app//org.junit.runners.ParentRunner$3.evaluate(ParentRunner.java:306)
 at 
app//org.junit.runners.BlockJUnit4ClassRunner$1.evaluate(BlockJUnit4ClassRunner.java:100)
 at app//org.junit.runners.ParentRunner.runLeaf(ParentRunner.java:366)
 at 
app//org.junit.runners.BlockJUnit4ClassRunner.runChild(BlockJUnit4ClassRunner.java:103)
 at 
app//org.junit.runners.BlockJUnit4ClassRunner.runChild(BlockJUnit4ClassRunner.java:63)
 at app//org.junit.runners.ParentRunner$4.run(ParentRunner.java:331)
 at app//org.junit.runners.ParentRunner$1.schedule(ParentRunner.java:79)
 at app//org.junit.runners.ParentRunner.runChildren(ParentRunner.java:329)
 at app//org.junit.runners.ParentRunner.access$100(ParentRunner.java:66)
 at app//org.junit.runners.ParentRunner$2.evaluate(ParentRunner.java:293)
 at app//org.junit.runners.ParentRunner$3.evaluate(ParentRunner.java:306)
 at app//org.junit.runners.ParentRunner.run(ParentRunner.java:413)
 at 
org.gradle.api.internal.tasks.testing.junit.JUnitTestClassExecutor.runTestClass(JUnitTestClassExecutor.java:108)
 at 
org.gradle.api.internal.tasks.testing.junit.JUnitTestClassExecutor.execute(JUnitTestClassExecutor.java:58)
 at 
org.gradle.api.internal.tasks.testing.junit.JUnitTestClassExecutor.execute(JUnitTestClassExecutor.java:40)
 at 
org.gradle.api.internal.tasks.testing.junit.AbstractJUnitTestClassProcessor.processTestClass(AbstractJUnitTestClassProcessor.java:60)
 at 
org.gradle.api.internal.tasks.testing.SuiteTestClassProcessor.processTestClass(SuiteTestClassProcessor.java:52)
 at 
java.base@17.0.7/jdk.internal.reflect.NativeMethodAccessorImpl.invoke0(Native 
Method)
 at 
java.base@17.0.7/jdk.internal.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:77)
 at 
java.base@17.0.7/jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
 at java.base@17.0.7/java.lang.reflect.Method.invoke(Method.java:568)
 at 
org.gradle.internal.dispatch.ReflectionDispatch.dispatch(ReflectionDispatch.java:36)
 at 
org.gradle.internal.dispatch.ReflectionDispatch.dispatch(ReflectionDispatch.j

Re: Test failures

2023-04-26 Thread Sagar
Hi,

FYI the pr which fixes the flaky test for connect is merged to trunk

Sagar.

On Wed, 26 Apr 2023 at 2:45 AM, Manyanda Chitimbo <
manyanda.chiti...@gmail.com> wrote:

> HI Ismail,
>
> There is a PR to fix the failure in
> https://github.com/apache/kafka/pull/13634
>
> On Tue, Apr 25, 2023 at 9:40 PM Ismael Juma  wrote:
>
> > I was looking at the CI builds and I came across the following test
> failure
> > that seems to be clear and consistent:
> >
> > org.mockito.exceptions.verification.TooFewActualInvocations:
> > kafkaBasedLog.send(, , );
> > Wanted 2 times:
> > -> at
> > org.apache.kafka.connect.util.KafkaBasedLog.send(KafkaBasedLog.java:376)
> > But was 1 time:
> > -> at
> >
> org.apache.kafka.connect.storage.KafkaStatusBackingStore.sendTopicStatus(KafkaStatusBackingStore.java:315)
> >
> >
> >
> https://ci-builds.apache.org/job/Kafka/job/kafka/job/trunk/1795/testReport/junit/org.apache.kafka.connect.storage/KafkaStatusBackingStoreFormatTest/Build___JDK_11_and_Scala_2_13___putTopicStateRetriableFailure/
> >
> > :(
> >
> > Ismael
> >
> > On Fri, Apr 21, 2023 at 5:39 PM Sagar  wrote:
> >
> > > Hi Greg,
> > >
> > > The fix for 14929 has already been included as part of this pr :
> > > https://github.com/apache/kafka/pull/13594
> > >
> > > I can create a separate pr just for that flaky test if needed. Let me
> > know
> > > .
> > >
> > > Sagar.
> > >
> > > On Sat, 22 Apr 2023 at 3:20 AM, Greg Harris
>  > >
> > > wrote:
> > >
> > > > Hey all,
> > > >
> > > > We just landed a fix for
> > > https://issues.apache.org/jira/browse/KAFKA-14905
> > > > which was causing all of
> > > > those MirrorConnectorsWithCustomForwardingAdminIntegrationTest
> > failures,
> > > > and we will monitor the build for any re-occurrances.
> > > > Unfortunately we discovered another test flake that was introduced
> > > recently
> > > > but that should have a straightforward resolution:
> > > > https://issues.apache.org/jira/browse/KAFKA-14929
> > > > Thanks Ismael for merging a fix for
> > > > https://issues.apache.org/jira/browse/KAFKA-8115 but it appears that
> > > there
> > > > is still more investigation needed there, as the test is still
> failing
> > > > occasionally.
> > > >
> > > > Thanks,
> > > > Greg
> > > >
> > > > On Fri, Apr 14, 2023 at 12:18 PM Ismael Juma 
> > wrote:
> > > >
> > > > > Thanks Greg! I really appreciate the help.
> > > > >
> > > > > Ismael
> > > > >
> > > > > On Fri, Apr 14, 2023 at 12:08 PM Greg Harris
> > > >  > > > > >
> > > > > wrote:
> > > > >
> > > > > > Hey Ismael,
> > > > > >
> > > > > > We're working to stabilize the Connect/MM2 tests with the
> following
> > > > > issues:
> > > > > >
> > > > > > * https://issues.apache.org/jira/browse/KAFKA-14905 to address
> > > > > > MirrorConectorsWithCustomForwardingAdminIntegrationTest with
> > > tentative
> > > > > open
> > > > > > PR
> > > > > > * https://issues.apache.org/jira/browse/KAFKA-14901 to address
> > > > > > ExactlyOnceSourceIntegrationTest caused by an (apparent) kafka
> bug
> > > > > >
> > > > > > Looking at the other failures in Connect/MM2 for that build in
> > > > > particular,
> > > > > > it appears that most of them include Embedded Kafka not coming
> > > > > up/shutting
> > > > > > down cleanly:
> > > > > > * MirrorConnectorsIntegrationBaseTest
> > > > > > * MirrorConnectorsIntegrationExactlyOnceTest
> > > > > > * MirrorConnectorsIntegrationSSLTest
> > > > > > * ConnectorClientPolicyIntegrationTest
> > > > > > * ConnectorTopicsIntegrationTest
> > > > > > * ExactlyOnceSourceIntegrationTest
> > > > > > * OffsetsApiIntegrationTest
> > > > > > I'll start investigating these failures to learn more.
> > > > > >
> > > > > > I also have a few older flaky test improvements that have not
> been
> > > > > reviewed
> > > > > 

[jira] [Created] (KAFKA-14938) Flaky test org.apache.kafka.connect.integration.ExactlyOnceSourceIntegrationTest#testConnectorBoundary

2023-04-26 Thread Sagar Rao (Jira)
Sagar Rao created KAFKA-14938:
-

 Summary: Flaky test 
org.apache.kafka.connect.integration.ExactlyOnceSourceIntegrationTest#testConnectorBoundary
 Key: KAFKA-14938
 URL: https://issues.apache.org/jira/browse/KAFKA-14938
 Project: Kafka
  Issue Type: Bug
  Components: KafkaConnect
Reporter: Sagar Rao


Test seems to be failing with 

```
java.lang.AssertionError: Not enough records produced by source connector. 
Expected at least: 100 + but got 72
h4. Stacktrace
java.lang.AssertionError: Not enough records produced by source connector. 
Expected at least: 100 + but got 72
 at org.junit.Assert.fail(Assert.java:89)
 at org.junit.Assert.assertTrue(Assert.java:42)
 at 
org.apache.kafka.connect.integration.ExactlyOnceSourceIntegrationTest.testConnectorBoundary(ExactlyOnceSourceIntegrationTest.java:421)
 at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
 at 
sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
 at 
sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
 at java.lang.reflect.Method.invoke(Method.java:498)
 at 
org.junit.runners.model.FrameworkMethod$1.runReflectiveCall(FrameworkMethod.java:59)
 at 
org.junit.internal.runners.model.ReflectiveCallable.run(ReflectiveCallable.java:12)
 at 
org.junit.runners.model.FrameworkMethod.invokeExplosively(FrameworkMethod.java:56)
 at 
org.junit.internal.runners.statements.InvokeMethod.evaluate(InvokeMethod.java:17)
 at 
org.junit.internal.runners.statements.RunBefores.evaluate(RunBefores.java:26)
 at org.junit.internal.runners.statements.RunAfters.evaluate(RunAfters.java:27)
 at org.junit.runners.ParentRunner$3.evaluate(ParentRunner.java:306)
 at 
org.junit.runners.BlockJUnit4ClassRunner$1.evaluate(BlockJUnit4ClassRunner.java:100)
 at org.junit.runners.ParentRunner.runLeaf(ParentRunner.java:366)
 at 
org.junit.runners.BlockJUnit4ClassRunner.runChild(BlockJUnit4ClassRunner.java:103)
 at 
org.junit.runners.BlockJUnit4ClassRunner.runChild(BlockJUnit4ClassRunner.java:63)
 at org.junit.runners.ParentRunner$4.run(ParentRunner.java:331)
 at org.junit.runners.ParentRunner$1.schedule(ParentRunner.java:79)
 at org.junit.runners.ParentRunner.runChildren(ParentRunner.java:329)
 at org.junit.runners.ParentRunner.access$100(ParentRunner.java:66)
 at org.junit.runners.ParentRunner$2.evaluate(ParentRunner.java:293)
 at org.junit.runners.ParentRunner$3.evaluate(ParentRunner.java:306)
 at org.junit.runners.ParentRunner.run(ParentRunner.java:413)
 at 
org.gradle.api.internal.tasks.testing.junit.JUnitTestClassExecutor.runTestClass(JUnitTestClassExecutor.java:108)
 at 
org.gradle.api.internal.tasks.testing.junit.JUnitTestClassExecutor.execute(JUnitTestClassExecutor.java:58)
 at 
org.gradle.api.internal.tasks.testing.junit.JUnitTestClassExecutor.execute(JUnitTestClassExecutor.java:40)
 at 
org.gradle.api.internal.tasks.testing.junit.AbstractJUnitTestClassProcessor.processTestClass(AbstractJUnitTestClassProcessor.java:60)
 at 
org.gradle.api.internal.tasks.testing.SuiteTestClassProcessor.processTestClass(SuiteTestClassProcessor.java:52)
 at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
 at 
sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
 at 
sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
 at java.lang.reflect.Method.invoke(Method.java:498)
 at 
org.gradle.internal.dispatch.ReflectionDispatch.dispatch(ReflectionDispatch.java:36)
 at 
org.gradle.internal.dispatch.ReflectionDispatch.dispatch(ReflectionDispatch.java:24)
 at 
org.gradle.internal.dispatch.ContextClassLoaderDispatch.dispatch(ContextClassLoaderDispatch.java:33)
 at 
org.gradle.internal.dispatch.ProxyDispatchAdapter$DispatchingInvocationHandler.invoke(ProxyDispatchAdapter.java:94)
 at com.sun.proxy.$Proxy2.processTestClass(Unknown Source)
 at 
org.gradle.api.internal.tasks.testing.worker.TestWorker$2.run(TestWorker.java:176)
 at 
org.gradle.api.internal.tasks.testing.worker.TestWorker.executeAndMaintainThreadName(TestWorker.java:129)
 at 
org.gradle.api.internal.tasks.testing.worker.TestWorker.execute(TestWorker.java:100)
 at 
org.gradle.api.internal.tasks.testing.worker.TestWorker.execute(TestWorker.java:60)
 at 
org.gradle.process.internal.worker.child.ActionExecutionWorker.execute(ActionExecutionWorker.java:56)
 at 
org.gradle.process.internal.worker.child.SystemApplicationClassLoaderWorker.call(SystemApplicationClassLoaderWorker.java:113)
 at 
org.gradle.process.internal.worker.child.SystemApplicationClassLoaderWorker.call(SystemApplicationClassLoaderWorker.java:65)
 at 
worker.org.gradle.process.internal.worker.GradleWorkerMain.run(GradleWorkerMain.java:69)
```



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


Re: Test failures

2023-04-21 Thread Sagar
Hi Greg,

The fix for 14929 has already been included as part of this pr :
https://github.com/apache/kafka/pull/13594

I can create a separate PR just for that flaky test if needed. Let me know.

Sagar.

On Sat, 22 Apr 2023 at 3:20 AM, Greg Harris 
wrote:

> Hey all,
>
> We just landed a fix for https://issues.apache.org/jira/browse/KAFKA-14905
> which was causing all of
> those MirrorConnectorsWithCustomForwardingAdminIntegrationTest failures,
> and we will monitor the build for any re-occurrances.
> Unfortunately we discovered another test flake that was introduced recently
> but that should have a straightforward resolution:
> https://issues.apache.org/jira/browse/KAFKA-14929
> Thanks Ismael for merging a fix for
> https://issues.apache.org/jira/browse/KAFKA-8115 but it appears that there
> is still more investigation needed there, as the test is still failing
> occasionally.
>
> Thanks,
> Greg
>
> On Fri, Apr 14, 2023 at 12:18 PM Ismael Juma  wrote:
>
> > Thanks Greg! I really appreciate the help.
> >
> > Ismael
> >
> > On Fri, Apr 14, 2023 at 12:08 PM Greg Harris
>  > >
> > wrote:
> >
> > > Hey Ismael,
> > >
> > > We're working to stabilize the Connect/MM2 tests with the following
> > issues:
> > >
> > > * https://issues.apache.org/jira/browse/KAFKA-14905 to address
> > > MirrorConectorsWithCustomForwardingAdminIntegrationTest with tentative
> > open
> > > PR
> > > * https://issues.apache.org/jira/browse/KAFKA-14901 to address
> > > ExactlyOnceSourceIntegrationTest caused by an (apparent) kafka bug
> > >
> > > Looking at the other failures in Connect/MM2 for that build in
> > particular,
> > > it appears that most of them include Embedded Kafka not coming
> > up/shutting
> > > down cleanly:
> > > * MirrorConnectorsIntegrationBaseTest
> > > * MirrorConnectorsIntegrationExactlyOnceTest
> > > * MirrorConnectorsIntegrationSSLTest
> > > * ConnectorClientPolicyIntegrationTest
> > > * ConnectorTopicsIntegrationTest
> > > * ExactlyOnceSourceIntegrationTest
> > > * OffsetsApiIntegrationTest
> > > I'll start investigating these failures to learn more.
> > >
> > > I also have a few older flaky test improvements that have not been
> > reviewed
> > > or merged yet:
> > > * https://issues.apache.org/jira/browse/KAFKA-8115 to
> > > address CoordinatorTest (reappeared in the linked build)
> > > * https://issues.apache.org/jira/browse/KAFKA-14345 to address
> > > (Dynamic)ConnectionQuotaTest
> > >
> > > It also appears that the flakey AuthorizerTest should be addressed by
> > > https://github.com/apache/kafka/pull/13543 which is on trunk now but
> > > wasn't
> > > at the time of the above run.
> > >
> > > Thanks,
> > > Greg
> > >
> > > On Fri, Apr 14, 2023 at 10:25 AM Ismael Juma 
> wrote:
> > >
> > > > Thanks Justine!
> > > >
> > > > Ismael
> > > >
> > > > On Fri, Apr 14, 2023 at 9:53 AM Justine Olshan
> > > > 
> > > > wrote:
> > > >
> > > > > Hey Ismael -- thanks for bringing this up.
> > > > > I've filed https://issues.apache.org/jira/browse/KAFKA-14904 and
> am
> > > > > working
> > > > > on it now.
> > > > >
> > > > > I hope the other tests get fixed soon.
> > > > >
> > > > > On Fri, Apr 14, 2023 at 6:47 AM Ismael Juma 
> > wrote:
> > > > >
> > > > > > Hi team,
> > > > > >
> > > > > > It looks like there are a lot of test failures in the master
> > branch.
> > > I
> > > > > > don't know which commits introduced them, but can you please
> check
> > if
> > > > > > commit(s) you merged or contributed are the reason and fix it
> asap?
> > > If
> > > > > it's
> > > > > > easy to fix the tests, let's do that - otherwise we should revert
> > the
> > > > > > faulty commit. And let's please be more careful going forward
> when
> > it
> > > > > comes
> > > > > > to the PRs we merge.
> > > > > >
> > > > > > An example from one of the builds, but there are many like this:
> > > > > >
> > > > > > Build / JDK 17 and Scala 2.13 /
> > > > > > kafka.api.TransactionsBounceTest.testWithGroupMetadata()
&

[jira] [Created] (KAFKA-14913) Migrate DistributedHerder Executor shutdown to use ThreadUtils#shutdownExecutorServiceQuietly

2023-04-17 Thread Sagar Rao (Jira)
Sagar Rao created KAFKA-14913:
-

 Summary: Migrate DistributedHerder Executor shutdown to use 
ThreadUtils#shutdownExecutorServiceQuietly
 Key: KAFKA-14913
 URL: https://issues.apache.org/jira/browse/KAFKA-14913
 Project: Kafka
  Issue Type: Improvement
Reporter: Sagar Rao






--
This message was sent by Atlassian Jira
(v8.20.10#820010)


Re: [DISCUSS] KIP-910: Update Source offsets for Source Connectors without producing records

2023-04-14 Thread Sagar
One thing I forgot to mention in my previous email was that the reason I
chose to include the opt-in behaviour via configs was that the users of the
connector know their workload patterns. If the workload is such that the
 connector would receive regular valid updates then there’s ideally no need
for moving offsets since it would update automatically.

This way they aren’t forced to use this feature and can use it only when
the workload is expected to be batchy or not frequent.

Thanks!
Sagar.


On Fri, 14 Apr 2023 at 5:32 PM, Sagar  wrote:

> Hi Chris,
>
> Thanks for following up on the response. Sharing my thoughts further:
>
> If we want to add support for connectors to emit offsets without
>> accompanying source records, we could (and IMO should) do that without
>> requiring users to manually enable that feature by adjusting worker or
>> connector configurations.
>
>
> With the current KIP design, I have tried to implement this in an opt-in
> manner via configs. I guess what you are trying to say is that this doesn't
> need a config of it's own and instead could be part of the poll ->
> transform etc -> produce -> commit cycle. That way, the users don't need to
> set any config and if the connector supports moving offsets w/o producing
> SourceRecords, it should happen automatically. Is that correct? If that
> is the concern, then I can think of not exposing a config and try to make
> this process automatically. That should ease the load on connector users,
> but your point about cognitive load on Connector developers, I am still not
> sure how to address that. The offsets are privy to a connector and the
> framework at best can provide hooks to the tasks to update their offsets.
> Connector developers would still have to consider all cases before updating
> offsets.  And if I ignore the heartbeat topic and heartbeat interval ms
> configs, then what the KIP proposes currently isn't much different in that
> regard. Just that it produces a List of SourceRecord which can be changed
> to a Map of SourcePartition and their offsets if you think that would
> simplify things. Are there other cases in your mind which need addressing?
>
> Here's my take on the usecases:
>
>1. Regarding the example about SMTs with Object Storage based
>connectors, it was one of the scenarios identified. We have some connectors
>that rely on the offsets topic to check if the next batch of files should
>be processed and because of filtering of the last record from the files,
>the eof supposedly is  never reached and the connector can't commit offsets
>for that source partition(file). If there was a mechanism to update offsets
>for such a source file, then with some moderately complex state tracking,
>the connector can mark that file as processed and proceed.
>2. There's another use case with the same class of connectors where if
>a file is malformed, then the connector couldn't produce any offsets
>because the file couldn't get processed completely. To handle such cases,
>the connector developers have introduced a dev/null sort of topic where
>they produce a record to this corrupted file topic and move the offset
>somehow. This topic ideally isn't needed and with a mechanism to update
>offsets would have helped in this case as well.
>3. Coming to CDC based connectors,
>   1. We had a similar issue with Oracle CDC source connector and
>   needed to employ the same heartbeat mechanism to get around it.
>   2. MongoDB CDC source Connector  has employed the same heartbeat
>   mechanism Check `heartbeat.interval.ms` here (
>   
> https://www.mongodb.com/docs/kafka-connector/current/source-connector/configuration-properties/error-handling/
>   ).
>   3. Another CDC connector for ScyllaDB employs a similar mechanism.
>   
> https://github.com/scylladb/scylla-cdc-source-connector/search?q=heartbeat
>   4. For CDC based connectors, you could argue that these connectors
>   have been able to solve this error then why do we need framework level
>   support. But the point I am trying to make is that this limitation from 
> the
>   framework is forcing CDC connector developers to implement per-connector
>   solutions/hacks(at times). And there could always be more CDC 
> connectors in
>   the pipeline forcing them to take a similar route as well.
>4. There's also a case at times with CDC source connectors which are
>REST Api / Web Service based(Zendesk Source Connector for example) . These
>connectors typically use timestamps from the responses as offsets. If
>there's a long period of inactivity wherein the API invocations don't
>return any data, then the offsets won't move and 

Re: [DISCUSS] KIP-910: Update Source offsets for Source Connectors without producing records

2023-04-14 Thread Sagar
mitted.


Let me know if we need a separate JIRA to track this? This somehow didn't
look related to this discussion.

Thanks!
Sagar.


On Wed, Apr 12, 2023 at 9:34 PM Chris Egerton 
wrote:

> Hi Sagar,
>
> I'm sorry, I'm still not convinced that this design solves the problem(s)
> it sets out to solve in the best way possible. I tried to highlight this in
> my last email:
>
> > In general, it seems like we're trying to solve two completely different
> problems with this single KIP: adding framework-level support for emitting
> heartbeat records for source connectors, and allowing source connectors to
> emit offsets without also emitting source records. I don't mind addressing
> the two at the same time if the result is elegant and doesn't compromise on
> the solution for either problem, but that doesn't seem to be the case here.
> Of the two problems, could we describe one as the primary and one as the
> secondary? If so, we might consider dropping the secondary problem from
> this KIP and addressing it separately.
>
> If we wanted to add support for heartbeat records, we could (and IMO
> should) do that without requiring connectors to implement any new methods
> and only require adjustments to worker or connector configurations by users
> in order to enable that feature.
>
> If we want to add support for connectors to emit offsets without
> accompanying source records, we could (and IMO should) do that without
> requiring users to manually enable that feature by adjusting worker or
> connector configurations.
>
>
> I'm also not sure that it's worth preserving the current behavior that
> offsets for records that have been filtered out via SMT are not committed.
> I can't think of a case where this would be useful and there are obviously
> plenty where it isn't. There's also a slight discrepancy in how these kinds
> of records are treated by the Connect runtime now; if a record is dropped
> because of an SMT, then its offset isn't committed, but if it's dropped
> because exactly-once support is enabled and the connector chose to abort
> the batch containing the record, then its offset is still committed. After
> thinking carefully about the aborted transaction behavior, we realized that
> it was fine to commit the offsets for those records, and I believe that the
> same logic can be applied to any record that we're done trying to send to
> Kafka (regardless of whether it was sent correctly, dropped due to producer
> error, filtered via SMT, etc.).
>
> I also find the file-based source connector example a little confusing.
> What about that kind of connector causes the offset for the last record of
> a file to be treated differently? Is there anything different about
> filtering that record via SMT vs. dropping it altogether because of an
> asynchronous producer error with "errors.tolerance" set to "all"? And
> finally, how would such a connector use the design proposed here?
>
> Finally, I don't disagree that if there are other legitimate use cases that
> would be helped by addressing KAFKA-3821, we should try to solve that issue
> in the Kafka Connect framework instead of requiring individual connectors
> to implement their own solutions. But the cognitive load added by the
> design proposed here, for connector developers and Connect cluster
> administrators alike, costs too much to justify by pointing to an
> already-solved problem encountered by a single group of connectors (i.e.,
> Debezium). This is why I think it's crucial that we identify realistic
> cases where this feature would actually be useful, and right now, I don't
> think any have been provided (at least, not ones that have already been
> addressed or could be addressed with much simpler changes).
>
> Cheers,
>
> Chris
>
> On Tue, Apr 11, 2023 at 7:30 AM Sagar  wrote:
>
> > Hi Chris,
> >
> > Thanks for your detailed feedback!
> >
> > nits: I have taken care of them now. Thanks for pointing those out.
> >
> > non-nits:
> >
> > 6) It seems (based on both the KIP and discussion on KAFKA-3821) that the
> > > only use case for being able to emit offsets without also emitting
> source
> > > records that's been identified so far is for CDC source connectors like
> > > Debezium.
> >
> >
> > I am aware of at least one more case where the non production of offsets
> > (due to non production of records ) leads to the failure of connectors
> when
> > the source purges the records of interest. This happens in File based
> > source connectors  (like s3/blob storage ) in which if the last record
> from
> > a file is filtered due to an SMT, then that particular file is never
> > committed to the source partit

Re: [DISCUSS] KIP-910: Update Source offsets for Source Connectors without producing records

2023-04-11 Thread Sagar
Hi Chris,

Thanks for your detailed feedback!

nits: I have taken care of them now. Thanks for pointing those out.

non-nits:

6) It seems (based on both the KIP and discussion on KAFKA-3821) that the
> only use case for being able to emit offsets without also emitting source
> records that's been identified so far is for CDC source connectors like
> Debezium.


I am aware of at least one more case where the non-production of offsets
(due to non-production of records) leads to the failure of connectors when
the source purges the records of interest. This happens in file-based
source connectors (like S3/blob storage): if the last record from
a file is filtered out due to an SMT, then that particular file is never
committed to the source partition, and eventually, when the file is deleted
from the source and the connector is restarted for some reason, it fails.
Moreover, I feel the reason this support should be there in the Kafka
Connect framework is that this is a restriction of the framework, and
today the framework provides no support for getting around this limitation.
Every connector has its own way of handling offsets, and having each
connector handle this restriction in its own way can make it complex.
Whether we choose to do it the way this KIP prescribes or any other way is
up for debate, but IMHO the framework should provide a way of
getting around this limitation.

7. If a task produces heartbeat records and source records that use the
> same source partition, which offset will ultimately be committed?


The idea is to add the records returned by `produceHeartbeatRecords`
to the same `toSend` list within `AbstractWorkerSourceTask#execute`.
`produceHeartbeatRecords` would be invoked before we make the `poll` call.
Hence, the offsets committed would be in the same order in which they would
be written. Note that the onus is on the connector implementation to not
return records which can lead to data loss or data going out of order; the
framework would just commit based on whatever is supplied. Also, AFAIK, two
`normal` source records can also produce the same source partition, and
they are committed in the order in which they are written.
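To make the ordering above concrete, here is a simplified, self-contained
sketch of one iteration of that loop. This is not the actual
`AbstractWorkerSourceTask` code; the interface and the String placeholders
stand in for `SourceTask` and `SourceRecord`, and `produceHeartbeatRecords` is
the hook proposed in the KIP.

```
import java.util.ArrayList;
import java.util.List;

// Illustrative only: heartbeat records are gathered before poll() and placed
// first in the same batch, so they are dispatched and their offsets committed
// in exactly the order they were handed to the framework.
interface HeartbeatCapableTask {
    List<String> produceHeartbeatRecords(); // proposed hook (SourceRecords in the KIP)
    List<String> poll();                    // existing SourceTask#poll
}

class ExecuteLoopSketch {
    static List<String> nextBatch(HeartbeatCapableTask task) {
        List<String> toSend = new ArrayList<>();
        List<String> heartbeats = task.produceHeartbeatRecords();
        if (heartbeats != null) {
            toSend.addAll(heartbeats);
        }
        List<String> polled = task.poll();
        if (polled != null) {
            toSend.addAll(polled);
        }
        return toSend; // handed to the existing produce -> offset-commit path
    }
}
```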

8. The SourceTask::produceHeartbeatRecords method returns a
> List<SourceRecord>, and users can control the heartbeat topic for a
> connector via the (connector- or worker-level) "heartbeat.records.topic"
> property. Since every constructor for the SourceRecord class [2] requires a
> topic to be supplied, what will happen to that topic? Will it be ignored?
> If so, I think we should look for a cleaner solution.


Sorry, I couldn't quite follow which topic will be ignored in this case.

9. A large concern raised in the discussion for KAFKA-3821 was the allowing
> connectors to control the ordering of these special "offsets-only"
> emissions and the regular source records returned from SourceTask::poll.
> Are we choosing to ignore that concern? If so, can you add this to the
> rejected alternatives section along with a rationale?


One thing to note is that, for every connector, the condition to emit
the heartbeat record is totally up to the connector. For example, for a
connector which is tracking transactions for an ordered log, if there are
open transactions, it might not need to emit heartbeat records when the
timer expires, while for file-based connectors, if the same file is being
processed again and again due to an SMT or some other reason, it can
choose to emit that partition. The uber point here is that every connector
has its own requirements and the framework can't really make an assumption
about them. What the KIP is trying to do is provide a mechanism for the
connector to commit new offsets. With this approach, as far as I can tell
so far, there doesn't seem to be a case of out-of-order processing. If you
have other concerns/thoughts, I would be happy to hear them.

10. If, sometime in the future, we wanted to add framework-level support
> for sending heartbeat records that doesn't require connectors to implement
> any new APIs...


The main purpose of producing heartbeat records is to be able to emit
offsets without any new records. We are using heartbeat records to solve the
primary concern of offsets getting stalled. The reason for doing it that way
was that once we get SourceRecords, the rest of the code is already in place
to write them to a topic of interest and commit offsets, and that seemed the
most non-invasive in terms of framework-level changes. If in the future we
want to add framework-only heartbeat record support, then this would create
confusion, as you pointed out. Do you think the choice of the name "heartbeat
records" is creating confusion in this case? Maybe we can call these special
records something else (not sure what at this point), which would then
decouple the two logically and implementation-wise as well?

Thanks!
Sagar.

On Tue, Mar 28, 2023 at 8:28 PM Chris Egerton 

[jira] [Resolved] (KAFKA-14586) Move StreamsResetter to tools

2023-04-04 Thread Sagar Rao (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-14586?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Sagar Rao resolved KAFKA-14586.
---
Resolution: Fixed

> Move StreamsResetter to tools
> -
>
> Key: KAFKA-14586
> URL: https://issues.apache.org/jira/browse/KAFKA-14586
> Project: Kafka
>  Issue Type: Sub-task
>Reporter: Mickael Maison
>Assignee: Sagar Rao
>Priority: Major
>




--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Created] (KAFKA-14851) Move StreamResetterTest to tools module

2023-03-27 Thread Sagar Rao (Jira)
Sagar Rao created KAFKA-14851:
-

 Summary: Move StreamResetterTest to tools module
 Key: KAFKA-14851
 URL: https://issues.apache.org/jira/browse/KAFKA-14851
 Project: Kafka
  Issue Type: Sub-task
Reporter: Sagar Rao


This came up as a suggestion here: 
[https://github.com/apache/kafka/pull/13127#issuecomment-1433155607] .



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


Re: [DISCUSS] KIP-910: Update Source offsets for Source Connectors without producing records

2023-03-25 Thread Sagar
Hi John,

Thanks for taking a look at the KIP!

The point about stream time not advancing in case of infrequent updates is
an interesting one. I can imagine that if the upstream producer to a Kafka
Streams application is a source connector which isn't sending records
frequently (due to the nature of the data ingestion, for example), then the
downstream stream processing can run into the issues you described above.

Which also brings me to the second point you made about how this would be
used by downstream consumers. IIUC, you are referring to the consumers of
the newly added topic, i.e. the heartbeat topic. In my mind, the heartbeat
topic is an internal topic (similar to the offsets/config/status topics in
Connect), the main purpose of which is to trick the framework into producing
records to the offsets topic and advancing the offsets. Since every connector
could have a different definition of offsets (LSN, BinLogID, etc. for
example), the logic to determine what the heartbeat records should be
would have to reside in the actual connector.

Now that I think of it, it could very well be consumed by downstream
consumers (Streams or Flink applications) and be further used for some
decision making. A very crude example: if the heartbeat records sent to the
new heartbeat topic include timestamps, then a downstream Streams
application could use those timestamps to close any time windows. Having
said that, it still appears to me that it's outside the scope of the Connect
framework and is something which is difficult to generalise because of the
variety of sources and the definitions of offsets.

But, I would still be more than happy to add this example if you think it
can be useful in getting a better understanding of the idea and also its
utility beyond connect. Please let me know!
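For concreteness, here is a very rough, purely illustrative sketch of that
kind of decision making in a plain consumer. The topic name
"connector-heartbeats" and the idea of treating the record timestamp as a
per-source watermark are assumptions made for this example, not part of the
KIP.

```
import java.time.Duration;
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;
import java.util.Properties;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.serialization.StringDeserializer;

public class HeartbeatWatermarkExample {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put("bootstrap.servers", "localhost:9092");
        props.put("group.id", "heartbeat-watermark-demo");
        props.put("key.deserializer", StringDeserializer.class.getName());
        props.put("value.deserializer", StringDeserializer.class.getName());

        // source identifier -> latest heartbeat timestamp seen for it
        Map<String, Long> watermarks = new HashMap<>();

        try (KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props)) {
            consumer.subscribe(Collections.singletonList("connector-heartbeats"));
            while (true) {
                ConsumerRecords<String, String> records = consumer.poll(Duration.ofSeconds(1));
                for (ConsumerRecord<String, String> record : records) {
                    // Even when no data records are flowing, the heartbeat's timestamp
                    // tells a downstream application that time has moved forward for
                    // this source, e.g. to close event-time windows or flag it as idle.
                    String source = record.key() == null ? "unknown" : record.key();
                    watermarks.merge(source, record.timestamp(), Math::max);
                }
            }
        }
    }
}
```

Whether something like this belongs in the framework or stays an
application-level concern is exactly the scope question above.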

Thanks!
Sagar.


On Fri, Mar 24, 2023 at 7:22 PM John Roesler  wrote:

> Thanks for the KIP, Sagar!
>
> At first glance, this seems like a very useful feature.
>
> A common pain point in Streams is when upstream producers don't send
> regular updates and stream time cannot advance. This causes
> stream-time-driven operations to appear to hang, like time windows not
> closing, suppressions not firing, etc.
>
> From your KIP, I have a good idea of how the feature would be integrated
> into connect, and it sounds good to me. I don't quite see how downstream
> clients, such as a downstream Streams or Flink application, or users of the
> Consumer would make use of this feature. Could you add some examples of
> that nature?
>
> Thank you,
> -John
>
> On Fri, Mar 24, 2023, at 05:23, Sagar wrote:
> > Hi All,
> >
> > Bumping the thread again.
> >
> > Sagar.
> >
> >
> > On Fri, Mar 10, 2023 at 4:42 PM Sagar  wrote:
> >
> >> Hi All,
> >>
> >> Bumping this discussion thread again.
> >>
> >> Thanks!
> >> Sagar.
> >>
> >> On Thu, Mar 2, 2023 at 3:44 PM Sagar  wrote:
> >>
> >>> Hi All,
> >>>
> >>> I wanted to create a discussion thread for KIP-910:
> >>>
> >>>
> >>>
> https://cwiki.apache.org/confluence/display/KAFKA/KIP-910%3A+Update+Source+offsets+for+Source+Connectors+without+producing+records
> >>>
> >>> Thanks!
> >>> Sagar.
> >>>
> >>
>


Re: [DISCUSS] KIP-910: Update Source offsets for Source Connectors without producing records

2023-03-24 Thread Sagar
Hi All,

Bumping the thread again.

Sagar.


On Fri, Mar 10, 2023 at 4:42 PM Sagar  wrote:

> Hi All,
>
> Bumping this discussion thread again.
>
> Thanks!
> Sagar.
>
> On Thu, Mar 2, 2023 at 3:44 PM Sagar  wrote:
>
>> Hi All,
>>
>> I wanted to create a discussion thread for KIP-910:
>>
>>
>> https://cwiki.apache.org/confluence/display/KAFKA/KIP-910%3A+Update+Source+offsets+for+Source+Connectors+without+producing+records
>>
>> Thanks!
>> Sagar.
>>
>


Re: [DISCUSS] KIP-910: Update Source offsets for Source Connectors without producing records

2023-03-10 Thread Sagar
Hi All,

Bumping this discussion thread again.

Thanks!
Sagar.

On Thu, Mar 2, 2023 at 3:44 PM Sagar  wrote:

> Hi All,
>
> I wanted to create a discussion thread for KIP-910:
>
>
> https://cwiki.apache.org/confluence/display/KAFKA/KIP-910%3A+Update+Source+offsets+for+Source+Connectors+without+producing+records
>
> Thanks!
> Sagar.
>


Re: [ANNOUNCE] New Kafka PMC Member: Chris Egerton

2023-03-10 Thread Sagar
Congrats Chris! Absolutely well deserved!

Sagar.

On Fri, Mar 10, 2023 at 1:42 PM Tom Bentley  wrote:

> Congratulations!
>
> On Fri, 10 Mar 2023 at 03:35, John Roesler  wrote:
>
> > Congratulations, Chris!
> > -John
> >
> > On Thu, Mar 9, 2023, at 20:02, Luke Chen wrote:
> > > Congratulations, Chris!
> > >
> > > On Fri, Mar 10, 2023 at 9:57 AM Yash Mayya 
> wrote:
> > >
> > >> Congratulations Chris!
> > >>
> > >> On Thu, Mar 9, 2023, 23:42 Jun Rao  wrote:
> > >>
> > >> > Hi, Everyone,
> > >> >
> > >> > Chris Egerton has been a Kafka committer since July 2022. He has
> been
> > >> very
> > >> > instrumental to the community since becoming a committer. It's my
> > >> pleasure
> > >> > to announce that Chris is now a member of Kafka PMC.
> > >> >
> > >> > Congratulations Chris!
> > >> >
> > >> > Jun
> > >> > on behalf of Apache Kafka PMC
> > >> >
> > >>
> >
> >
>


[DISCUSS] KIP-910: Update Source offsets for Source Connectors without producing records

2023-03-02 Thread Sagar
Hi All,

I wanted to create a discussion thread for KIP-910:

https://cwiki.apache.org/confluence/display/KAFKA/KIP-910%3A+Update+Source+offsets+for+Source+Connectors+without+producing+records

Thanks!
Sagar.


[jira] [Created] (KAFKA-14734) Use CommandDefaultOptions in StreamsResetter

2023-02-20 Thread Sagar Rao (Jira)
Sagar Rao created KAFKA-14734:
-

 Summary: Use CommandDefaultOptions in StreamsResetter 
 Key: KAFKA-14734
 URL: https://issues.apache.org/jira/browse/KAFKA-14734
 Project: Kafka
  Issue Type: Sub-task
Reporter: Sagar Rao


This came up as a suggestion here: 
[https://github.com/apache/kafka/pull/13127#issuecomment-1433155607] .



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Created] (KAFKA-14647) Move TopicFilter shared class

2023-01-23 Thread Sagar Rao (Jira)
Sagar Rao created KAFKA-14647:
-

 Summary: Move TopicFilter shared class
 Key: KAFKA-14647
 URL: https://issues.apache.org/jira/browse/KAFKA-14647
 Project: Kafka
  Issue Type: Sub-task
Reporter: Sagar Rao
Assignee: Federico Valeri






--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Resolved] (KAFKA-10652) Raft leader should flush accumulated writes after a min size is reached

2023-01-21 Thread Sagar Rao (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-10652?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Sagar Rao resolved KAFKA-10652.
---
Resolution: Won't Fix

Not sure this is needed anymore.

> Raft leader should flush accumulated writes after a min size is reached
> ---
>
> Key: KAFKA-10652
> URL: https://issues.apache.org/jira/browse/KAFKA-10652
> Project: Kafka
>  Issue Type: Sub-task
>Reporter: Jason Gustafson
>Assignee: Sagar Rao
>Priority: Major
>
> In KAFKA-10601, we implemented linger semantics similar to the producer to 
> let the leader accumulate a batch of writes before fsyncing them to disk. 
> Currently the fsync is only based on the linger time, but it would be helpful 
> to make it size-based as well. In other words, if we accumulate a 
> configurable N bytes, then we should not wait for linger expiration and 
> should just fsync immediately.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Resolved] (KAFKA-13296) Verify old assignment within StreamsPartitionAssignor

2023-01-21 Thread Sagar Rao (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-13296?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Sagar Rao resolved KAFKA-13296.
---
Resolution: Fixed

> Verify old assignment within StreamsPartitionAssignor
> -
>
> Key: KAFKA-13296
> URL: https://issues.apache.org/jira/browse/KAFKA-13296
> Project: Kafka
>  Issue Type: Improvement
>  Components: streams
>Reporter: Matthias J. Sax
>Assignee: Sagar Rao
>Priority: Major
>
> `StreamsPartitionAssignor` is responsible for assigning partitions and tasks to 
> all StreamsThreads within an application.
> While it ensures that a single partition/task is not assigned to two threads, 
> there is limited verification of this. In particular, we had one incident in 
> which a zombie thread/consumer did not clean up its own internal state correctly 
> due to KAFKA-12983. This unclean zombie state implied that the _old assignment_ 
> reported to `StreamsPartitionAssignor` contained a single partition for two 
> consumers. As a result, both threads/consumers later revoked the same 
> partition and the zombie thread could commit its unclean work (even though it 
> should have been fenced), leading to duplicate output under EOS_v2.
> We should consider adding a check to `StreamsPartitionAssignor` that the _old 
> assignment_ is valid, i.e., no partition should be missing and no partition 
> should be assigned to two consumers. For this case, we should log the invalid 
> _old assignment_ and send an error code back to all consumers indicating that 
> they should shut down "unclean" (i.e., without flushing and without committing 
> any offsets or transactions).



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


Re: [ANNOUNCE] New committer: Walker Carlson

2023-01-17 Thread Sagar
Congratulations Walker!

Thanks!
Sagar.

On Wed, Jan 18, 2023 at 9:32 AM Tom Bentley  wrote:

> Congratulations!
>
> On Wed, 18 Jan 2023 at 01:26, John Roesler  wrote:
>
> > Congratulations, Walker!
> > -John
> >
> > On Tue, Jan 17, 2023, at 18:50, Guozhang Wang wrote:
> > > Congrats, Walker!
> > >
> > > On Tue, Jan 17, 2023 at 2:20 PM Chris Egerton  >
> > > wrote:
> > >
> > >> Congrats, Walker!
> > >>
> > >> On Tue, Jan 17, 2023, 17:07 Bill Bejeck 
> > wrote:
> > >>
> > >> > Congratulations, Walker!
> > >> >
> > >> > -Bill
> > >> >
> > >> > On Tue, Jan 17, 2023 at 4:57 PM Matthias J. Sax 
> > >> wrote:
> > >> >
> > >> > > Dear community,
> > >> > >
> > >> > > I am pleased to announce Walker Carlson as a new Kafka committer.
> > >> > >
> > >> > > Walker has been contributing to Apache Kafka since November 2019.
> He
> > >> > > made various contributions including the following KIPs.
> > >> > >
> > >> > > KIP-671: Introduce Kafka Streams Specific Uncaught Exception
> Handler
> > >> > > KIP-696: Update Streams FSM to clarify ERROR state meaning
> > >> > > KIP-715: Expose Committed offset in streams
> > >> > >
> > >> > >
> > >> > > Congratulations Walker and welcome on board!
> > >> > >
> > >> > >
> > >> > > Thanks,
> > >> > >-Matthias (on behalf of the Apache Kafka PMC)
> > >> > >
> > >> >
> > >>
> >
> >
>


Re: [VOTE] KIP-837 Allow MultiCasting a Result Record.

2022-12-21 Thread Sagar
Hi All,

Just as an update, the changes described here:

```
Changes at a high level are:

1) KeyQueryMetadata enhanced to have a new method called partitions().
2) Lifting the restriction of a single partition for IQ. Now the
restriction holds only for FK Join.
```

have been reverted. As things stand, KeyQueryMetadata exposes only the
partition() method, and the single-partition restriction has been added back
for IQ. This has been done based on the points raised by Matthias above.

The KIP has been updated accordingly.
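
For anyone following along, this is what the reverted behaviour means for an
IQ caller: with the single-partition restriction back in place, partition()
alone identifies where a key lives. A minimal sketch (the store name and
serde below are assumptions):

```
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.streams.KafkaStreams;
import org.apache.kafka.streams.KeyQueryMetadata;

public class LocatePartitionSketch {

    // With the multicast restriction back for IQ, each key maps to exactly one
    // partition of the store, so KeyQueryMetadata#partition() fully locates it.
    public static int owningPartition(KafkaStreams streams, String key) {
        KeyQueryMetadata metadata = streams.queryMetadataForKey(
                "word-counts",                  // assumed store name
                key,
                Serdes.String().serializer());
        return metadata.partition();
    }
}
```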

Thanks!
Sagar.

On Sat, Dec 10, 2022 at 12:09 AM Sagar  wrote:

> Hey Matthias,
>
> Actually I had shared the PR link for any potential issues that might have
> gone missing. I guess it didn't come out that way in my response. Apologies
> for that!
>
> I am more than happy to incorporate any feedback/changes or address any
> concerns that are still present around this at this point as well.
>
> And I would keep in mind the feedback to provide more time in such a
> scenario.
>
> Thanks!
> Sagar.
>
> On Fri, Dec 9, 2022 at 11:41 PM Matthias J. Sax  wrote:
>
>> It is what it is.
>>
>> > we did have internal discussions on this
>>
>> We sometimes have the case that a KIP need adjustment as stuff is
>> discovered during coding. And having a discussion on the PR about it is
>> fine. -- However, before the PR gets merge, the KIP change should be
>> announced to verify that nobody has objections to he change, before we
>> carry forward.
>>
>> It's up to the committer who reviews/merges the PR to ensure that this
>> process is followed IMHO. I hope we can do better next time.
>>
>> (And yes, there was the 3.4 release KIP deadline that might explain it,
>> but it seems important that we give enough time is make "tricky" changes
>> and not rush into stuff IMHO.)
>>
>>
>> -Matthias
>>
>>
>> On 12/8/22 7:04 PM, Sagar wrote:
>> > Thanks Matthias,
>> >
>> > Well, as things stand, we did have internal discussions on this and it
>> > seemed ok to open it up for IQ and more importantly not ok to have it
>> > opened up for FK-Join. And more importantly, the PR for this is already
>> > merged and some of these things came up during that. Here's the PR link:
>> > https://github.com/apache/kafka/pull/12803.
>> >
>> > Thanks!
>> > Sagar.
>> >
>> >
>> > On Fri, Dec 9, 2022 at 5:15 AM Matthias J. Sax 
>> wrote:
>> >
>> >> Ah. Missed it as it does not have a nice "code block" similar to
>> >> `StreamPartitioner` changes.
>> >>
>> >> I understand the motivation, but I am wondering if we might head into a
>> >> tricky direction? State stores (at least the built-in ones) and IQ are
>> >> kinda build with the idea to have sharded data and that a multi-cast of
>> >> keys is an anti-pattern?
>> >>
>> >> Maybe it's fine, but I also don't want to open Pandora's Box. Are we
>> >> sure that generalizing the concepts does not cause issues in the
>> future?
>> >>
>> >> Ie, should we claim that the multi-cast feature should be used for
>> >> KStreams only, but not for KTables?
>> >>
>> >> Just want to double check that we are not doing something we regret
>> later.
>> >>
>> >>
>> >> -Matthias
>> >>
>> >>
>> >> On 12/7/22 6:45 PM, Sagar wrote:
>> >>> Hi Mathias,
>> >>>
>> >>> I did save it. The changes are added under Public Interfaces (Pt#2
>> about
>> >>> enhancing KeyQueryMetadata with partitions method) and
>> >>> throwing IllegalArgumentException when StreamPartitioner#partitions
>> >> method
>> >>> returns multiple partitions for just FK-join instead of the earlier
>> >> decided
>> >>> FK-Join and IQ.
>> >>>
>> >>> The background is that for IQ, if the users have multi casted records
>> to
>> >>> multiple partitions during ingestion but the fetch returns only a
>> single
>> >>> partition, then it would be wrong. That's why the restriction was
>> lifted
>> >>> for IQ and that's the reason KeyQueryMetadata now has another
>> >> partitions()
>> >>> method to signify the same.
>> >>>
>> >>> FK-Join also has a similar case, but while reviewing it was felt that
>> >>> FK-Join on it's own is fairly complicated and w

Re: [VOTE] KIP-889 Versioned State Stores

2022-12-20 Thread Sagar
Hi Victoria,

+1 (non-binding).

Thanks!
Sagar.

On Tue, Dec 20, 2022 at 1:39 PM Bruno Cadonna  wrote:

> Hi Victoria,
>
> Thanks for the KIP!
>
> +1 (binding)
>
> Best,
> Bruno
>
> On 19.12.22 20:03, Matthias J. Sax wrote:
> > +1 (binding)
> >
> > On 12/15/22 1:27 PM, John Roesler wrote:
> >> Thanks for the thorough KIP, Victoria!
> >>
> >> I'm +1 (binding)
> >>
> >> -John
> >>
> >> On 2022/12/15 19:56:21 Victoria Xia wrote:
> >>> Hi all,
> >>>
> >>> I'd like to start a vote on KIP-889 for introducing versioned key-value
> >>> state stores to Kafka Streams:
> >>>
> https://cwiki.apache.org/confluence/display/KAFKA/KIP-889%3A+Versioned+State+Stores
> >>>
> >>> The discussion thread has been open for a few weeks now and has
> >>> converged
> >>> among the current participants.
> >>>
> >>> Thanks,
> >>> Victoria
> >>>
>


Re: [ANNOUNCE] New Kafka PMC Member: Luke Chen

2022-12-17 Thread Sagar
Congratulations Luke! Very well deserved!

Sagar.

On Sun, 18 Dec 2022 at 6:41 AM, Sam Barker  wrote:

> Congratulations Luke!
>
> On Sat, 17 Dec 2022 at 08:41, Jun Rao  wrote:
>
> > Hi, Everyone,
> >
> > Luke Chen has been a Kafka committer since Feb. 9, 2022. He has been very
> > instrumental to the community since becoming a committer. It's my
> pleasure
> > to announce that Luke  is now a member of Kafka PMC.
> >
> > Congratulations Luke!
> >
> > Jun
> > on behalf of Apache Kafka PMC
> >
>


[jira] [Created] (KAFKA-14461) StoreQueryIntegrationTest#shouldQuerySpecificActivePartitionStores logic to check for active partitions seems brittle.

2022-12-11 Thread Sagar Rao (Jira)
Sagar Rao created KAFKA-14461:
-

 Summary: 
StoreQueryIntegrationTest#shouldQuerySpecificActivePartitionStores logic to 
check for active partitions seems brittle.
 Key: KAFKA-14461
 URL: https://issues.apache.org/jira/browse/KAFKA-14461
 Project: Kafka
  Issue Type: Bug
Reporter: Sagar Rao
Assignee: Sagar Rao


Newly added test 
KTableKTableForeignKeyInnerJoinCustomPartitionerIntegrationTest#shouldThrowIllegalArgumentExceptionWhenCustomPartionerReturnsMultiplePartitions
 as part of KIP-837 passes when run individually but fails when run as part of 
the IT class and hence is marked as Ignored. 

 



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


Re: [VOTE] KIP-837 Allow MultiCasting a Result Record.

2022-12-09 Thread Sagar
Hey Matthias,

Actually I had shared the PR link for any potential issues that might have
gone missing. I guess it didn't come out that way in my response. Apologies
for that!

I am more than happy to incorporate any feedback/changes or address any
concerns that are still present around this at this point as well.

And I would keep in mind the feedback to provide more time in such a
scenario.

Thanks!
Sagar.

On Fri, Dec 9, 2022 at 11:41 PM Matthias J. Sax  wrote:

> It is what it is.
>
> > we did have internal discussions on this
>
> We sometimes have the case that a KIP needs adjustment as stuff is
> discovered during coding. And having a discussion on the PR about it is
> fine. -- However, before the PR gets merged, the KIP change should be
> announced to verify that nobody has objections to the change, before we
> carry forward.
>
> It's up to the committer who reviews/merges the PR to ensure that this
> process is followed IMHO. I hope we can do better next time.
>
> (And yes, there was the 3.4 release KIP deadline that might explain it,
> but it seems important that we give enough time to make "tricky" changes
> and not rush into stuff IMHO.)
>
>
> -Matthias
>
>
> On 12/8/22 7:04 PM, Sagar wrote:
> > Thanks Matthias,
> >
> > Well, as things stand, we did have internal discussions on this and it
> > seemed ok to open it up for IQ and more importantly not ok to have it
> > opened up for FK-Join. And more importantly, the PR for this is already
> > merged and some of these things came up during that. Here's the PR link:
> > https://github.com/apache/kafka/pull/12803.
> >
> > Thanks!
> > Sagar.
> >
> >
> > On Fri, Dec 9, 2022 at 5:15 AM Matthias J. Sax  wrote:
> >
> >> Ah. Missed it as it does not have a nice "code block" similar to
> >> `StreamPartitioner` changes.
> >>
> >> I understand the motivation, but I am wondering if we might head into a
> >> tricky direction? State stores (at least the built-in ones) and IQ are
> >> kinda build with the idea to have sharded data and that a multi-cast of
> >> keys is an anti-pattern?
> >>
> >> Maybe it's fine, but I also don't want to open Pandora's Box. Are we
> >> sure that generalizing the concepts does not cause issues in the future?
> >>
> >> Ie, should we claim that the multi-cast feature should be used for
> >> KStreams only, but not for KTables?
> >>
> >> Just want to double check that we are not doing something we regret
> later.
> >>
> >>
> >> -Matthias
> >>
> >>
> >> On 12/7/22 6:45 PM, Sagar wrote:
> >>> Hi Mathias,
> >>>
> >>> I did save it. The changes are added under Public Interfaces (Pt#2
> about
> >>> enhancing KeyQueryMetadata with partitions method) and
> >>> throwing IllegalArgumentException when StreamPartitioner#partitions
> >> method
> >>> returns multiple partitions for just FK-join instead of the earlier
> >> decided
> >>> FK-Join and IQ.
> >>>
> >>> The background is that for IQ, if the users have multi casted records
> to
> >>> multiple partitions during ingestion but the fetch returns only a
> single
> >>> partition, then it would be wrong. That's why the restriction was
> lifted
> >>> for IQ and that's the reason KeyQueryMetadata now has another
> >> partitions()
> >>> method to signify the same.
> >>>
> >>> FK-Join also has a similar case, but while reviewing it was felt that
> >>> FK-Join on it's own is fairly complicated and we don't need this
> feature
> >>> right away so the restriction still exists.
> >>>
> >>> Thanks!
> >>> Sagar.
> >>>
> >>>
> >>> On Wed, Dec 7, 2022 at 9:42 PM Matthias J. Sax 
> wrote:
> >>>
> >>>> I don't see any update on the wiki about it. Did you forget to hit
> >> "save"?
> >>>>
> >>>> Can you also provide some background? I am not sure right now if I
> >>>> understand the proposed changes?
> >>>>
> >>>>
> >>>> -Matthias
> >>>>
> >>>> On 12/6/22 6:36 PM, Sophie Blee-Goldman wrote:
> >>>>> Thanks Sagar, this makes sense to me -- we clearly need additional
> >>>> changes
> >>>>> to
> >>>>> avoid breaking IQ when using this feature, but I agree with
> continuing
> >> to

Re: [VOTE] KIP-837 Allow MultiCasting a Result Record.

2022-12-08 Thread Sagar
Thanks Matthias,

Well, as things stand, we did have internal discussions on this and it
seemed ok to open it up for IQ and more importantly not ok to have it
opened up for FK-Join. And more importantly, the PR for this is already
merged and some of these things came up during that. Here's the PR link:
https://github.com/apache/kafka/pull/12803.

Thanks!
Sagar.


On Fri, Dec 9, 2022 at 5:15 AM Matthias J. Sax  wrote:

> Ah. Missed it as it does not have a nice "code block" similar to
> `StreamPartitioner` changes.
>
> I understand the motivation, but I am wondering if we might head into a
> tricky direction? State stores (at least the built-in ones) and IQ are
> kinda built with the idea to have sharded data and that a multi-cast of
> keys is an anti-pattern?
>
> Maybe it's fine, but I also don't want to open Pandora's Box. Are we
> sure that generalizing the concepts does not cause issues in the future?
>
> Ie, should we claim that the multi-cast feature should be used for
> KStreams only, but not for KTables?
>
> Just want to double check that we are not doing something we regret later.
>
>
> -Matthias
>
>
> On 12/7/22 6:45 PM, Sagar wrote:
> > Hi Mathias,
> >
> > I did save it. The changes are added under Public Interfaces (Pt#2 about
> > enhancing KeyQueryMetadata with partitions method) and
> > throwing IllegalArgumentException when StreamPartitioner#partitions
> method
> > returns multiple partitions for just FK-join instead of the earlier
> decided
> > FK-Join and IQ.
> >
> > The background is that for IQ, if the users have multi casted records to
> > multiple partitions during ingestion but the fetch returns only a single
> > partition, then it would be wrong. That's why the restriction was lifted
> > for IQ and that's the reason KeyQueryMetadata now has another
> partitions()
> > method to signify the same.
> >
> > FK-Join also has a similar case, but while reviewing it was felt that
> > FK-Join on it's own is fairly complicated and we don't need this feature
> > right away so the restriction still exists.
> >
> > Thanks!
> > Sagar.
> >
> >
> > On Wed, Dec 7, 2022 at 9:42 PM Matthias J. Sax  wrote:
> >
> >> I don't see any update on the wiki about it. Did you forget to hit
> "save"?
> >>
> >> Can you also provide some background? I am not sure right now if I
> >> understand the proposed changes?
> >>
> >>
> >> -Matthias
> >>
> >> On 12/6/22 6:36 PM, Sophie Blee-Goldman wrote:
> >>> Thanks Sagar, this makes sense to me -- we clearly need additional
> >> changes
> >>> to
> >>> avoid breaking IQ when using this feature, but I agree with continuing
> to
> >>> restrict
> >>> FKJ since they wouldn't stop working without it, and would become much
> >>> harder
> >>> to reason about (than they already are) if we did enable them to use
> it.
> >>>
> >>> And of course, they can still multicast the final results of a FKJ,
> they
> >>> just can't
> >>> mess with the internal workings of it in this way.
> >>>
> >>> On Tue, Dec 6, 2022 at 9:48 AM Sagar 
> wrote:
> >>>
> >>>> Hi All,
> >>>>
> >>>> I made a couple of edits to the KIP which came up during the code
> >> review.
> >>>> Changes at a high level are:
> >>>>
> >>>> 1) KeyQueryMetada enhanced to have a new method called partitions().
> >>>> 2) Lifting the restriction of a single partition for IQ. Now the
> >>>> restriction holds only for FK Join.
> >>>>
> >>>> Updated KIP:
> >>>>
> >>
> https://cwiki.apache.org/confluence/pages/viewpage.action?pageId=211883356
> >>>>
> >>>> Thanks!
> >>>> Sagar.
> >>>>
> >>>> On Mon, Sep 12, 2022 at 6:43 PM Sagar 
> >> wrote:
> >>>>
> >>>>> Thanks Bruno,
> >>>>>
> >>>>> Marking this as accepted.
> >>>>>
> >>>>> Thanks everyone for their comments/feedback.
> >>>>>
> >>>>> Thanks!
> >>>>> Sagar.
> >>>>>
> >>>>> On Mon, Sep 12, 2022 at 1:53 PM Bruno Cadonna 
> >>>> wrote:
> >>>>>
> >>>>>> Hi Sagar,
> >>>>>>
> >>>>>> Thanks for the update and the PR!
> >>>>&g

Re: [VOTE] KIP-837 Allow MultiCasting a Result Record.

2022-12-07 Thread Sagar
Hi Matthias,

I did save it. The changes are added under Public Interfaces (Pt#2, about
enhancing KeyQueryMetadata with a partitions() method) and throwing an
IllegalArgumentException when the StreamPartitioner#partitions method returns
multiple partitions for just FK-join, instead of the earlier decided FK-Join
and IQ.

The background is that for IQ, if users have multi-casted records to multiple
partitions during ingestion but the fetch returns only a single partition,
then it would be wrong. That's why the restriction was lifted for IQ, and
that's the reason KeyQueryMetadata now has another partitions() method to
signify the same.

FK-Join also has a similar case, but while reviewing it was felt that FK-Join
on its own is fairly complicated and we don't need this feature right away,
so the restriction still exists.
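
For reference, a rough sketch of the ingestion side described above, i.e. a
StreamPartitioner whose partitions() method multi-casts a record (broadcasting
to every partition of the sink topic is just an assumed example, not something
the KIP prescribes):

```
import java.util.Optional;
import java.util.Set;
import java.util.stream.Collectors;
import java.util.stream.IntStream;

import org.apache.kafka.streams.processor.StreamPartitioner;

// Sketch: fan every record out to all partitions of the sink topic.
public class BroadcastPartitioner<K, V> implements StreamPartitioner<K, V> {

    @Override
    public Integer partition(String topic, K key, V value, int numPartitions) {
        // Single-partition method; superseded by partitions() below.
        return null;
    }

    @Override
    public Optional<Set<Integer>> partitions(String topic, K key, V value, int numPartitions) {
        // Non-empty Optional with a non-empty set: send the record to all listed partitions.
        return Optional.of(IntStream.range(0, numPartitions)
                .boxed()
                .collect(Collectors.toSet()));
    }
}
```

Such a partitioner would be handed to the DSL the usual way, e.g. via
Produced.streamPartitioner(...) on the sink, which is exactly why a fetch that
only knows about a single partition could miss copies of the key.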

Thanks!
Sagar.


On Wed, Dec 7, 2022 at 9:42 PM Matthias J. Sax  wrote:

> I don't see any update on the wiki about it. Did you forget to hit "save"?
>
> Can you also provide some background? I am not sure right now if I
> understand the proposed changes?
>
>
> -Matthias
>
> On 12/6/22 6:36 PM, Sophie Blee-Goldman wrote:
> > Thanks Sagar, this makes sense to me -- we clearly need additional
> changes
> > to
> > avoid breaking IQ when using this feature, but I agree with continuing to
> > restrict
> > FKJ since they wouldn't stop working without it, and would become much
> > harder
> > to reason about (than they already are) if we did enable them to use it.
> >
> > And of course, they can still multicast the final results of a FKJ, they
> > just can't
> > mess with the internal workings of it in this way.
> >
> > On Tue, Dec 6, 2022 at 9:48 AM Sagar  wrote:
> >
> >> Hi All,
> >>
> >> I made a couple of edits to the KIP which came up during the code
> review.
> >> Changes at a high level are:
> >>
> >> 1) KeyQueryMetada enhanced to have a new method called partitions().
> >> 2) Lifting the restriction of a single partition for IQ. Now the
> >> restriction holds only for FK Join.
> >>
> >> Updated KIP:
> >>
> https://cwiki.apache.org/confluence/pages/viewpage.action?pageId=211883356
> >>
> >> Thanks!
> >> Sagar.
> >>
> >> On Mon, Sep 12, 2022 at 6:43 PM Sagar 
> wrote:
> >>
> >>> Thanks Bruno,
> >>>
> >>> Marking this as accepted.
> >>>
> >>> Thanks everyone for their comments/feedback.
> >>>
> >>> Thanks!
> >>> Sagar.
> >>>
> >>> On Mon, Sep 12, 2022 at 1:53 PM Bruno Cadonna 
> >> wrote:
> >>>
> >>>> Hi Sagar,
> >>>>
> >>>> Thanks for the update and the PR!
> >>>>
> >>>> +1 (binding)
> >>>>
> >>>> Best,
> >>>> Bruno
> >>>>
> >>>> On 10.09.22 18:57, Sagar wrote:
> >>>>> Hi Bruno,
> >>>>>
> >>>>> Thanks, I think these changes make sense to me. I have updated the
> KIP
> >>>>> accordingly.
> >>>>>
> >>>>> Thanks!
> >>>>> Sagar.
> >>>>>
> >>>>> On Wed, Sep 7, 2022 at 2:16 PM Bruno Cadonna 
> >>>> wrote:
> >>>>>
> >>>>>> Hi Sagar,
> >>>>>>
> >>>>>> I would not drop the support for dropping records. I would also not
> >>>>>> return null from partitions(). Maybe an Optional can help here. An
> >>>> empty
> >>>>>> Optional would mean to use the default partitioning behavior of the
> >>>>>> producer. So we would have:
> >>>>>>
> >>>>>> - non-empty Optional, non-empty list of integers: partitions to send
> >>>> the
> >>>>>> record to
> >>>>>> - non-empty Optional, empty list of integers: drop the record
> >>>>>> - empty Optional: use default behavior
> >>>>>>
> >>>>>> What do other think?
> >>>>>>
> >>>>>> Best,
> >>>>>> Bruno
> >>>>>>
> >>>>>> On 02.09.22 13:53, Sagar wrote:
> >>>>>>> Hello Bruno/Chris,
> >>>>>>>
> >>>>>>> Since these are the last set of changes(I am assuming haha), it
> >> would
> >>>> be
> >>>>>>> great if you c

[jira] [Created] (KAFKA-14454) KTableKTableForeignKeyInnerJoinCustomPartitionerIntegrationTest#shouldThrowIllegalArgumentExceptionWhenCustomPartionerReturnsMultiplePartitions passes when run individua

2022-12-07 Thread Sagar Rao (Jira)
Sagar Rao created KAFKA-14454:
-

 Summary: 
KTableKTableForeignKeyInnerJoinCustomPartitionerIntegrationTest#shouldThrowIllegalArgumentExceptionWhenCustomPartionerReturnsMultiplePartitions
 passes when run individually but not when is run as part of the IT
 Key: KAFKA-14454
 URL: https://issues.apache.org/jira/browse/KAFKA-14454
 Project: Kafka
  Issue Type: Bug
Reporter: Sagar Rao
Assignee: Sagar Rao


Newly added test 
KTableKTableForeignKeyInnerJoinCustomPartitionerIntegrationTest#shouldThrowIllegalArgumentExceptionWhenCustomPartionerReturnsMultiplePartitions
 as part of KIP-837 passes when run individually but fails when run as part of 
the IT class and hence is marked as Ignored. 

As part of this ticket, we can also look at moving to JUnit5 annotations for 
this class since it relies on JUnit4 ones.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


Re: [VOTE] KIP-837 Allow MultiCasting a Result Record.

2022-12-06 Thread Sagar
Hi All,

I made a couple of edits to the KIP which came up during the code review.
Changes at a high level are:

1) KeyQueryMetadata enhanced to have a new method called partitions().
2) Lifting the restriction of a single partition for IQ. Now the
restriction holds only for FK Join.

Updated KIP:
https://cwiki.apache.org/confluence/pages/viewpage.action?pageId=211883356

Thanks!
Sagar.

On Mon, Sep 12, 2022 at 6:43 PM Sagar  wrote:

> Thanks Bruno,
>
> Marking this as accepted.
>
> Thanks everyone for their comments/feedback.
>
> Thanks!
> Sagar.
>
> On Mon, Sep 12, 2022 at 1:53 PM Bruno Cadonna  wrote:
>
>> Hi Sagar,
>>
>> Thanks for the update and the PR!
>>
>> +1 (binding)
>>
>> Best,
>> Bruno
>>
>> On 10.09.22 18:57, Sagar wrote:
>> > Hi Bruno,
>> >
>> > Thanks, I think these changes make sense to me. I have updated the KIP
>> > accordingly.
>> >
>> > Thanks!
>> > Sagar.
>> >
>> > On Wed, Sep 7, 2022 at 2:16 PM Bruno Cadonna 
>> wrote:
>> >
>> >> Hi Sagar,
>> >>
>> >> I would not drop the support for dropping records. I would also not
>> >> return null from partitions(). Maybe an Optional can help here. An
>> empty
>> >> Optional would mean to use the default partitioning behavior of the
>> >> producer. So we would have:
>> >>
>> >> - non-empty Optional, non-empty list of integers: partitions to send
>> the
>> >> record to
>> >> - non-empty Optional, empty list of integers: drop the record
>> >> - empty Optional: use default behavior
>> >>
>> >> What do other think?
>> >>
>> >> Best,
>> >> Bruno
>> >>
>> >> On 02.09.22 13:53, Sagar wrote:
>> >>> Hello Bruno/Chris,
>> >>>
>> >>> Since these are the last set of changes(I am assuming haha), it would
>> be
>> >>> great if you could review the 2 options from above so that we can
>> close
>> >> the
>> >>> voting. Of course I am happy to incorporate any other requisite
>> changes.
>> >>>
>> >>> Thanks!
>> >>> Sagar.
>> >>>
>> >>> On Wed, Aug 31, 2022 at 10:07 PM Sagar 
>> >> wrote:
>> >>>
>> >>>> Thanks Bruno for the great points.
>> >>>>
>> >>>> I see 2 options here =>
>> >>>>
>> >>>> 1) As Chris suggested, drop the support for dropping records in the
>> >>>> partitioner. That way, an empty list could signify the usage of a
>> >> default
>> >>>> partitioner. Also, if the deprecated partition() method returns null
>> >>>> thereby signifying the default partitioner, the partitions() can
>> return
>> >> an
>> >>>> empty list i.e default partitioner.
>> >>>>
>> >>>> 2) OR we treat a null return type of partitions() method to signify
>> the
>> >>>> usage of the default partitioner. In the default implementation of
>> >>>> partitions() method, if partition() returns null, then even
>> partitions()
>> >>>> can return null(instead of an empty list). The RecordCollectorImpl
>> code
>> >> can
>> >>>> also be modified accordingly. @Chris, to your point, we can even drop
>> >> the
>> >>>> support of dropping of records. It came up during KIP discussion,
>> and I
>> >>>> thought it might be a useful feature. Let me know what you think.
>> >>>>
>> >>>> 3) Lastly about the partition number check. I wanted to avoid the
>> >> throwing
>> >>>> of exception so I thought adding it might be a useful feature. But as
>> >> you
>> >>>> pointed out, if it can break backwards compatibility, it's better to
>> >> remove
>> >>>> it.
>> >>>>
>> >>>> Thanks!
>> >>>> Sagar.
>> >>>>
>> >>>>
>> >>>> On Tue, Aug 30, 2022 at 6:32 PM Chris Egerton
>> 
>> >>>> wrote:
>> >>>>
>> >>>>> +1 to Bruno's concerns about backward compatibility. Do we actually
>> >> need
>> >>>>> support for dropping records in the partitioner? It doesn't seem
>> >> necessary
>> >>

Re: [DISCUSS] KIP-889 Versioned State Stores

2022-12-01 Thread Sagar
Thanks Victoria,

I guess an advantage of exposing a method like delete(key, timestamp) could
be that, from a user's standpoint, it is a single operation and not two. The
equivalent of this method, i.e. a put followed by a get, is not atomic, so
exposing it certainly sounds like a good idea.
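
A small sketch of that non-atomicity point, written against a stand-in for the
interface shape being discussed in this thread (the interface and method
signatures below are assumptions, not the final KIP API):

```
import org.apache.kafka.streams.state.ValueAndTimestamp;

// Stand-in for the versioned store interface as discussed in this thread; not the real API.
interface ProposedVersionedStore<K, V> {
    void put(K key, V value, long timestamp);
    ValueAndTimestamp<V> get(K key, long timestampTo);
}

public class VersionedDeleteSketch {

    // What callers would have to do without delete(key, timestamp): two separate
    // calls, between which another writer could interleave.
    public static <K, V> ValueAndTimestamp<V> deleteManually(
            ProposedVersionedStore<K, V> store, K key, long timestamp) {
        ValueAndTimestamp<V> replaced = store.get(key, timestamp); // record being replaced, if any
        store.put(key, null, timestamp);                           // null value acts as a tombstone
        return replaced;
    }
}
```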

Thanks!
Sagar.

On Tue, Nov 29, 2022 at 1:15 AM Victoria Xia
 wrote:

> Thanks, Sagar and Bruno, for your insights and comments!
>
> > Sagar: Can we name according to the semantics that you want to
> support like `getAsOf` or something like that? I am not sure if we do that
> in our codebase though. Maybe the experts can chime in.
>
> Because it is a new method that will be added, we should be able to name it
> whatever we like. I agree `getAsOf` is more clear, albeit wordier.
> Introducing `getAsOf(key, timestamp)` means we could leave open `get(key,
> timeFrom, timeTo)` to have an exclusive `timeTo` without introducing a
> collision. (We could introduce `getBetween(key, timeFrom, timeTo)` instead
> to delineate even more clearly, though this is better left for a future
> KIP.)
>
> I don't think there's any existing precedent in codebase to follow here but
> I'll leave that to the experts. Curious to hear what others prefer as well.
>
> > Sagar: With delete, we would stlll keep the older versions of the key
> right?
>
> We could certainly choose this for the semantics of delete(...) -- and it
> sounds like we should too, based on Bruno's confirmation below that this
> feels more natural to him as well -- but as Bruno noted in his message
> below I think we'll want the method signature to be `delete(key,
> timestamp)` then, so that there is an explicit timestamp to associate with
> the deletion. In other words, `delete(key, timestamp)` has the same effect
> as `put(key, null, timestamp)`. The only difference is that the `put(...)`
> method has a `void` return type, while `delete(key, timestamp)` can have
> `ValueAndTimestamp` as return type in order to return the record which is
> replaced (if any). In other words, `delete(key, timestamp)` is equivalent
> to `put(key, null, timestamp)` followed by `get(key, timestamp)`.
>
> > Bruno: I would also not change the semantics so that it deletes all
> versions of
> a key. I would rather add a new method purge(key) or
> deleteAllVersions(key) or similar if we want to have such a method in
> this first KIP.
>
> Makes sense; I'm convinced. Let's defer
> `purge(key)`/`deleteAllVersions(key)` to a future KIP. If there's agreement
> that `delete(key, timestamp)` (as described above) is valuable, we can keep
> it in this first KIP even though it is syntactic sugar. If this turns into
> a larger discussion, we can defer this to a future KIP as well.
>
> > Bruno: I would treat the history retention as a strict limit. [...] You
> could also add historyRetentionMs() to the VersionedKeyValueStore
> interface to make the concept of the history retention part of the
> interface.
>
> OK. That's the second vote for rewording the javadoc for
> `VersionedKeyValueStore#get(key, timestampTo)` to remove the parenthetical
> and clarify that history retention should be used to dictate this case, so
> I'll go ahead and do that. I'll leave out adding `historyRetentionMs()` to
> the interface for now, though, for the sake of consistency with other
> stores (e.g., window stores) which don't expose similar types of
> configurations from their interfaces.
>
> > Bruno: exclusive vs inclusive regarding validTo timestamp in get().
> Doesn't this decision depend on the semantics of the join for which this
> state store should be used?
>
> Yes, you are correct. As a user I would expect that a stream-side record
> with the same timestamp as a table-side record _would_ produce a join
> result, which is consistent with the proposal for timestampTo to be
> inclusive. (FWIW I tried this out with a Flink temporal join just now and
> observed this result as well. Not sure where to look for other standards to
> validate this expectation.)
>
> > Bruno: If Streams does not update min.compaction.lag.ms during
> rebalances,
> users have to do it each time they change history retention in the code,
> right? That seems odd to me. What is the actual reason for not updating
> the config? How does Streams handle updates to windowed stores?
>
> Yes, users will have to update min.compaction.lag.ms for the changelog
> topic themselves if they update history retention in their code. This is
> consistent with what happens for window stores today: e.g., if a user
> updates grace period for a windowed aggregation, then they are responsible
> for updating retention.ms on their windowed changelog topic as well.
>
> I'm not familiar with the historical context around why this is the c

Re: [DISCUSS] KIP-889 Versioned State Stores

2022-11-23 Thread Sagar
Hi Vicky,

Thanks for your response!

I would just use numbers to refer to your comments.

1) Thanks for your response. Even I am not totally sure whether these
should be supported via IQv2 or via the store interface. That said, I
definitely wouldn't qualify this as blocking the KIP, so we can live
without it :)

2) Yeah, if the two APIs for get have different semantics for timestampTo,
then it could be confusing. I went through the link for temporal tables
(TFS!) and I now get why the AS OF semantics would have it inclusive. I
think part of the problem is that the name get on its own is not as
expressive as SQL. Can we name it according to the semantics that you want
to support, like `getAsOf` or something like that? I am not sure if we do
that in our codebase though. Maybe the experts can chime in.

3) Hmm, I would have named it `validUpto`, but again I'm not very picky
about it. After going through the link and your KIP, it's a lot clearer to
me.

4) I think delete(key) should be sufficient. With delete, we would still
keep the older versions of the key, right?

Thanks!
Sagar.

On Wed, Nov 23, 2022 at 12:17 AM Victoria Xia
 wrote:

> Thanks, Matthias and Sagar, for your comments! I've responded here for now,
> and will update the KIP afterwards with the outcome of our discussions as
> they resolve.
>
> --- Matthias's comments ---
>
> > (1) Why does the new store not extend KeyValueStore, but StateStore?
> In the end, it's a KeyValueStore?
>
> A `VersionedKeyValueStore` is not a `KeyValueStore` because
> many of the KeyValueStore methods would not make sense for a versioned
> store. For example, `put(K key, V value)` is not meaningful for a versioned
> store because the record needs a timestamp associated with it.
>
> A `VersionedKeyValueStore` is more similar to a `KeyValueStore<K, ValueAndTimestamp<V>>` (i.e., `TimestampedKeyValueStore`), but some
> of the TimestampedKeyValueStore methods are still problematic. For example,
> what does it mean for `delete(K key)` to have return type
> `ValueAndTimestamp`? Does this mean that `delete(K key)` only deletes
> (and returns) the latest record version for the key? Probably we want a
> versioned store to have `delete(K key)` delete all record versions for the
> given key, in which case the return type is better suited as an
> iterator/collection of KeyValueTimestamp. `putIfAbsent(K key,
> ValueAndTimestamp value)` also has ambiguous semantics for versioned stores
> (i.e., what does it mean for the key/record to be "absent").
>
> I agree that conceptually a versioned key-value store is just a key-value
> store, though. In the future if we redesign the store interfaces, it'd be
> great to unify them by having a more generic KeyValueStore interface that
> allows for extra flexibility to support different types of key-value
> stores, including versioned stores. (Or, if you can think of a way to
> achieve this with the existing interfaces today, I'm all ears!)
>
> > (2) Should we have a ReadOnlyVersionedKeyValueStore? Even if we don't
> want to support IQ in this KIP, it might be good to add this interface
> right away to avoid complications for follow up KIPs? Or won't there by
> any complications anyway?
>
> I don't think there will be complications for refactoring to add this
> interface in the future. Refactoring out ReadOnlyVersionedKeyValueStore
> from VersionedKeyValueStore would leave VersionedKeyValueStore unchanged
> from the outside.
>
> Also, is it true that the ReadOnlyKeyValueStore interface is only used for
> IQv1 and not IQv2? I think it's an open question as to whether we should
> support IQv1 for versioned stores or only IQv2. If the latter, then maybe
> we won't need the extra interface at all.
>
> > (3) Why do we not have a `delete(key)` method? I am ok with not
> supporting all methods from existing KV-store, but a `delete(key)` seems
> to be fundamentally to have?
>
> What do you think the semantics of `delete(key)` should be for versioned
> stores? Should `delete(key)` delete (and return) all record versions for
> the key? Or should we have `delete(key, timestamp)` which is equivalent to
> `put(key, null, timestamp)` except with a return type to return
> ValueAndTimestamp representing the record it replaced?
>
> If we have ready alignment on what the interface and semantics for
> `delete(key)` should be, then adding it in this KIP sounds good. I just
> didn't want the rest of the KIP to be hung up over additional interfaces,
> given that we can always add extra interfaces in the future.
>
> > (4a) Do we need `get(key)`? It seems to be the same as `get(key,
> MAX_VALUE)`? Maybe is good to have as syntactic sugar though? Just for
> my own clarification (should we add something to the JavaDocs?).
>
> Correct, it is 

Re: [DISCUSS] KIP-889 Versioned State Stores

2022-11-22 Thread Sagar
Hi Victoria,

Thanks for the KIP. Seems like a very interesting idea!

I have a couple of questions:

1) Did you consider adding a method similar to
`List<ValueAndTimestamp<V>> get(K key, long from, long to)`?

I think this could be useful considering that this versioning scheme unlocks
time travel on a per-key basis (a rough usage sketch follows after these
questions). WDYT?

2) I have a similar question as Matthias, about the timestampTo argument
when doing a get. Is it inclusive or exclusive?

3) validFrom sounds slightly confusing to me. It is essentially the
timestamp at which the record was inserted. validFrom makes it sound like
validTo which can keep changing based on new records while *from* is fixed.
WDYT?

4) I also think a delete API should be supported.
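
As mentioned in point 1, a rough usage sketch of such a range lookup (the
interface below is hypothetical, purely to illustrate the per-key time-travel
idea):

```
import java.util.List;

import org.apache.kafka.streams.state.ValueAndTimestamp;

// Hypothetical interface shape for the method floated in point 1; not part of the KIP.
interface RangeVersionedStore<K, V> {
    List<ValueAndTimestamp<V>> get(K key, long from, long to);
}

public class KeyHistorySketch {

    // Fetch every version of a key recorded in the last hour.
    public static List<ValueAndTimestamp<String>> lastHour(
            RangeVersionedStore<String, String> store, String key, long nowMs) {
        return store.get(key, nowMs - 60 * 60 * 1000L, nowMs);
    }
}
```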

Thanks!
Sagar.

On Tue, Nov 22, 2022 at 8:02 AM Matthias J. Sax  wrote:

> Thanks for the KIP Victoria. Very well written!
>
>
> Couple of questions (many might just require to add some more details to
> the KIP):
>
>   (1) Why does the new store not extend KeyValueStore, but StateStore?
> In the end, it's a KeyValueStore?
>
>   (2) Should we have a ReadOnlyVersionedKeyValueStore? Even if we don't
> want to support IQ in this KIP, it might be good to add this interface
> right away to avoid complications for follow up KIPs? Or won't there by
> any complications anyway?
>
>   (3) Why do we not have a `delete(key)` method? I am ok with not
> supporting all methods from existing KV-store, but a `delete(key)` seems
> to be fundamental to have?
>
>   (4a) Do we need `get(key)`? It seems to be the same as `get(key,
> MAX_VALUE)`? Maybe is good to have as syntactic sugar though? Just for
> my own clarification (should we add something to the JavaDocs?).
>
>   (4b) Should we throw an exception if a user queries out-of-bound
> instead of returning `null` (in `get(key,ts)`)?
>-> You put it into "rejected alternatives", and I understand your
> argument. Would love to get input from others about this question
> though. -- It seems we also return `null` for windowed stores, so maybe
> the strongest argument is to align to existing behavior? Or do we have
> case for which the current behavior is problematic?
>
>   (4c) JavaDoc on `get(key,ts)` says: "(up to store implementation
> discretion when this is the case)" -> Should we make it a stricter
> contract such that the user can reason about it better (there is WIP to
> make retention time a strict bound for windowed stores atm)
>-> JavaDocs on `persistentVersionedKeyValueStore` seems to suggest a
> strict bound, too.
>
>   (5a) Do we need to expose `segmentInterval`? For windowed-stores, we
> also use segments but hard-code it to two (it was exposed in earlier
> versions but it seems not useful, even if we would be open to expose it
> again if there is user demand).
>
>   (5b) JavaDocs says: "Performance degrades as more record versions for
> the same key are collected in a single segment. On the other hand,
> out-of-order writes and reads which access older segments may slow down
> if there are too many segments." -- Wondering if JavaDocs should make
> any statements about expected performance? Seems to be an implementation
> detail?
>
>   (6) validTo timestamp is "exclusive", right? Ie, if I query
> `get(key,ts[=validToV1])` I would get `null` or the "next" record v2
> with validFromV2=ts?
>
>   (7) The KIP says that segments are stored in the same RocksDB -- for
> this case, how are efficient deletes handled? For windowed-store, we can
> just delete a full RocksDB.
>
>   (8) Rejected alternatives: you propose to not return the validTo
> timestamp -- if we find it useful in the future to return it, would
> there be a clean path to change it accordingly?
>
>
> -Matthias
>
>
> On 11/16/22 9:57 PM, Victoria Xia wrote:
> > Hi everyone,
> >
> > I have a proposal for introducing versioned state stores in Kafka
> Streams.
> > Versioned state stores are similar to key-value stores except they can
> > store multiple record versions for a single key. This KIP focuses on
> > interfaces only in order to limit the scope of the KIP.
> >
> >
> https://cwiki.apache.org/confluence/display/KAFKA/KIP-889%3A+Versioned+State+Stores
> >
> > Thanks,
> > Victoria
> >
>


Re: [DISCUSS] KIP-891: Running multiple versions of a connector.

2022-11-22 Thread Sagar
Hey Snehashis,

Thanks for the KIP. It looks like a very useful feature. A couple of
small-ish points; let me know what you think:

1) Should we update the GET /connectors endpoint to include the version of
the plugin that is running? It could be useful to figure out the version of
the plugin, or does it already get returned by the expand=info call?
2) I am not aware of this and hence asking: can two connectors with different
versions have the same name? Does plugin isolation allow this? This could
have a bearing when using the lifecycle endpoints for connectors, like
DELETE, etc.

Thanks!
Sagar.


On Tue, Nov 22, 2022 at 2:10 PM Ashwin  wrote:

> Hi Snehasis,
>
> > IIUC (please correct me if I am wrong here), what you highlighted above,
> is
> a versioning scheme for a connector config for the same connector (and not
> different versions of a connector plugin).
>
> Sorry for not being more precise in my wording -  I meant registering
> versions of schema for connector config.
>
> Let's take the example of a fictional connector which uses a fictional AWS
> service.
>
> Fictional Connector Config schema version:2.0
> ---
> {
>   "$schema": "http://json-schema.org/draft-04/schema#",
>   "type": "object",
>   "properties": {
> "name": {
>   "type": "string"
> },
> "schema_version": {
>   "type": "string"
> },
> "aws_access_key": {
>   "type": "string"
> },
> "aws_secret_key": {
>   "type": "string"
> }
>   },
>   "required": [
> "name",
> "schema_version",
> "aws_access_key",
> "aws_secret_key"
>   ]
> }
>
> Fictional Connector config schema version:3.0
> ---
> {
>   "$schema": "http://json-schema.org/draft-04/schema#",
>   "type": "object",
>   "properties": {
> "name": {
>   "type": "string"
> },
> "schema_version": {
>   "type": "string"
> },
> "iam_role": {
>   "type": "string"
> }
>   },
>   "required": [
> "name",
> "schema_version",
> "iam_role"
>   ]
> }
>
> The connector which supports Fictional config schema 2.0  will validate the
> access key and secret key.
> Whereas a connector which supports config with schema version 3.0 will only
> validate the IAM role.
>
> This is the alternative which I wanted to suggest. Each plugin will
> register the schema versions of connector config which it supports.
>
> The plugin paths may be optionally different i.e  we don't have to
> mandatorily add a new plugin path to support a new schema version.
>
> Thanks,
> Ashwin
>
> On Tue, Nov 22, 2022 at 12:47 PM Snehashis 
> wrote:
>
> > Thanks for the input Ashwin.
> >
> > > 1. Can you elaborate on the rejected alternatives ? Suppose connector
> > > config is versioned and has a schema. Then a single plugin (whose
> > > dependencies have not changed) can handle multiple config versions for
> > the
> > > same connector class.
> >
> > IIUC (please correct me if I am wrong here), what you highlighted above,
> is
> > a versioning scheme for a connector config for the same connector (and
> not
> > different versions of a connector plugin). That is a somewhat tangential
> > problem. While it is definitely a useful feature to have, like a log to
> > check what changes were made over time to the config which might make it
> > easier to do rollbacks, it is not the focus here. Here by version we mean
> > to say what underlying version of the plugin should the given
> configuration
> > of the connector use. Perhaps it is better to change the name of the
> > parameter from connector.version to connector.plugin.version or
> > plugin.version if it was confusing. wdyt?
> >
> > >  2. Any plans to support assisted migration e.g if a user invokes "POST
> > > connector/config?migrate=latest", the latest version __attempts__ to
> > > transform the existing config to the newer version. This would require
> > > adding a method like "boolean migrate(Version fromVersion)" to the
> > > connector interface.
> >
> > This is an enhancement we can think of doing in future. Users can simply
> do
> > a PUT call with the updated config which has the updated version number.
> > The assisted mod
