[jira] [Resolved] (KAFKA-12734) LazyTimeIndex & LazyOffsetIndex may cause niobufferoverflow when skip activeSegment sanityCheck

2021-04-30 Thread Wenbing Shen (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-12734?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Wenbing Shen resolved KAFKA-12734.
--
Resolution: Duplicate

Duplicate of KAFKA-10471. This problem has been fixed in version 2.8.

> LazyTimeIndex & LazyOffsetIndex may cause niobufferoverflow when skip 
> activeSegment  sanityCheck
> 
>
> Key: KAFKA-12734
> URL: https://issues.apache.org/jira/browse/KAFKA-12734
> Project: Kafka
>  Issue Type: Bug
>  Components: log
>Affects Versions: 2.4.0, 2.5.0, 2.6.0, 2.7.0, 2.8.0
>Reporter: Wenbing Shen
>Assignee: Wenbing Shen
>Priority: Blocker
> Attachments: LoadIndex.png, image-2021-04-30-22-49-24-202.png, 
> niobufferoverflow.png
>
>
> This issue is similar to KAFKA-9156.
> We introduced lazy indexes, which let us skip the sanity check on the index files
> of all log segments when starting Kafka; this greatly improved our startup speed.
> Unfortunately, it also skips the index file check for the active segment, and the
> active segment still receives write requests from clients and from the replica
> synchronization threads.
> There is a situation where we skip the index check for all segments, no unflushed
> log segments need to be recovered, and the index files of the last (active)
> segment are damaged. When data is then appended to the active segment, the broker
> throws an error.
> Below is the problem I encountered in the production environment:
> When Kafka loaded the log segments at startup, the broker log showed that the
> memory-map position of the time index and offset index files was far into the
> file, even though the files did not actually contain that many index entries. My
> guess is that this happens when the broker is stopped again while it is still
> starting up and has not fully started: the limit of the index file's memory map
> is left at the file's maximum size instead of being truncated to the size
> actually used, so on the next startup the memory-map position is set to the limit.
>  At this time, appending data to the active segment causes a NIO BufferOverflowException.
> I agree with skipping the index check for all inactive segments, because they
> will no longer receive write requests, but for the active segment we still need
> to check the index files.
>  Another situation is that even with a clean shutdown, some factor leaves the
> active segment's index file with the memory-map position set to the limit, which
> again results in a NIO buffer overflow on write.
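> A minimal sketch of the failure mode (illustrative only, not broker code): if the
> index memory map is restored with its position already at the limit, the next
> append has no room left and NIO throws.
> {code:java}
> import java.nio.ByteBuffer;
>
> public class IndexMmapOverflowSketch {
>     public static void main(String[] args) {
>         // Stand-in for the memory map of an index file; Kafka uses a MappedByteBuffer.
>         ByteBuffer mmap = ByteBuffer.allocate(8);
>         // Simulate the bad load: position is set to the limit instead of the real entry count.
>         mmap.position(mmap.limit());
>         // Appending the next index entry now throws java.nio.BufferOverflowException.
>         mmap.putLong(42L);
>     }
> }
> {code}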



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Created] (KAFKA-12742) 5. Checkpoint all uncorrupted state stores within the subtopology

2021-04-30 Thread A. Sophie Blee-Goldman (Jira)
A. Sophie Blee-Goldman created KAFKA-12742:
--

 Summary: 5. Checkpoint all uncorrupted state stores within the 
subtopology
 Key: KAFKA-12742
 URL: https://issues.apache.org/jira/browse/KAFKA-12742
 Project: Kafka
  Issue Type: Sub-task
  Components: streams
Reporter: A. Sophie Blee-Goldman


Once we have [KAFKA-12740|https://issues.apache.org/jira/browse/KAFKA-12740], 
we can close the loop on EOS by checkpointing not only those state stores which 
are attached to processors that the record has successfully passed, but also 
any remaining state stores further downstream in the subtopology that aren't 
connected to the processor where the error occurred.
At this point, outside of a hard crash (e.g. the process is killed) or dropping out 
of the group, we’ll only ever need to restore state stores from scratch if the 
exception came from the specific processor node they’re attached to. Which is 
pretty darn cool.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Created] (KAFKA-12741) 4. Handle thread-wide errors

2021-04-30 Thread A. Sophie Blee-Goldman (Jira)
A. Sophie Blee-Goldman created KAFKA-12741:
--

 Summary: 4. Handle thread-wide errors
 Key: KAFKA-12741
 URL: https://issues.apache.org/jira/browse/KAFKA-12741
 Project: Kafka
  Issue Type: Sub-task
  Components: streams
Reporter: A. Sophie Blee-Goldman


Although just implementing 1-3 will likely be a significant improvement, we 
will eventually want to tackle the case of an exception that affects all tasks 
on that thread. I think in this case we would just need to keep retrying the 
specific call that’s failing, whatever that may be. This would almost certainly 
require solution #3/4 as a prerequisite as we would need to keep retrying on 
that thread’s Producer. Of course, with KIP-572 we’ll already retry most kinds 
of errors on the Producer, if not all. So this would most likely only apply to 
a few kinds of exceptions, such as any Consumer calls that fail for reasons 
besides having dropped out of the group.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Created] (KAFKA-12740) 3. Resume processing from last-cleared processor after soft crash

2021-04-30 Thread A. Sophie Blee-Goldman (Jira)
A. Sophie Blee-Goldman created KAFKA-12740:
--

 Summary: 3. Resume processing from last-cleared processor after 
soft crash
 Key: KAFKA-12740
 URL: https://issues.apache.org/jira/browse/KAFKA-12740
 Project: Kafka
  Issue Type: Sub-task
  Components: streams
Reporter: A. Sophie Blee-Goldman


Building off of that, we can go one step further and avoid duplicate work 
within the subtopology itself. Any time a record is partially processed through 
a subtopology before hitting an error, all of the processors up to that point 
will be applied again when the record is reprocessed. If we can keep track of 
how far along the subtopology a record was processed, then we can start 
reprocessing it from the last processor it had cleared before hitting an error.
This proposal is the most likely to benefit from letting a StreamThread recover 
after an unexpected exception rather than letting it die and starting up a new 
one, as we don’t need to worry about handing anything off from the dying thread 
to its replacement. 




--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Created] (KAFKA-12739) 2. Commit any cleanly-processed records within a corrupted task

2021-04-30 Thread A. Sophie Blee-Goldman (Jira)
A. Sophie Blee-Goldman created KAFKA-12739:
--

 Summary: 2. Commit any cleanly-processed records within a 
corrupted task
 Key: KAFKA-12739
 URL: https://issues.apache.org/jira/browse/KAFKA-12739
 Project: Kafka
  Issue Type: Sub-task
  Components: streams
Reporter: A. Sophie Blee-Goldman


Within a task, there will typically be a number of records that have been 
successfully processed through the subtopology but not yet committed. If the 
next record to be picked up hits an unexpected exception, we’ll dirty close the 
entire task and essentially throw away all the work we did on those previous 
records. We should be able to drop only the corrupted record and just commit 
the offsets up to that point. Again, for some exceptions such as 
de/serialization or user code errors, this can be straightforward as the 
thread/task is otherwise in a healthy state. Other cases such as an error in 
the Producer will need to be tackled separately, since a Producer error cannot 
be isolated to a single task.
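For intuition only (this is not the Streams implementation; plain consumer code is
used purely as an analogy, and the class and helper names below are invented):
committing the offsets of the cleanly-processed records before surfacing the poison
record would look roughly like this.
{code:java}
import java.time.Duration;
import java.util.Collections;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.clients.consumer.OffsetAndMetadata;
import org.apache.kafka.common.TopicPartition;

public class CommitUpToPoisonRecord {
    // Assumed to exist for the sake of the sketch.
    static void process(ConsumerRecord<String, String> record) { }

    static void pollOnce(KafkaConsumer<String, String> consumer) {
        for (ConsumerRecord<String, String> record : consumer.poll(Duration.ofMillis(100))) {
            try {
                process(record);
            } catch (RuntimeException fatal) {
                // Commit up to (but not including) the corrupted record so the
                // work done on the earlier records is not thrown away.
                consumer.commitSync(Collections.singletonMap(
                        new TopicPartition(record.topic(), record.partition()),
                        new OffsetAndMetadata(record.offset())));
                throw fatal;
            }
        }
        consumer.commitSync(); // everything in this batch was processed cleanly
    }
}
{code}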



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Created] (KAFKA-12738) Improved error handling for better at-least-once semantics and faster EOS

2021-04-30 Thread A. Sophie Blee-Goldman (Jira)
A. Sophie Blee-Goldman created KAFKA-12738:
--

 Summary: Improved error handling for better at-least-once 
semantics and faster EOS
 Key: KAFKA-12738
 URL: https://issues.apache.org/jira/browse/KAFKA-12738
 Project: Kafka
  Issue Type: Improvement
  Components: streams
Reporter: A. Sophie Blee-Goldman


Umbrella ticket for various ideas I had to improve the error handling behavior 
of Streams.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Created] (KAFKA-12737) Commit all healthy tasks after a task-specific error for better task isolation and reduced overcounting under ALOS

2021-04-30 Thread A. Sophie Blee-Goldman (Jira)
A. Sophie Blee-Goldman created KAFKA-12737:
--

 Summary: Commit all healthy tasks after a task-specific error for 
better task isolation and reduced overcounting under ALOS
 Key: KAFKA-12737
 URL: https://issues.apache.org/jira/browse/KAFKA-12737
 Project: Kafka
  Issue Type: Improvement
  Components: streams
Reporter: A. Sophie Blee-Goldman


At the moment, any time we hit the exception handler, an unclean shutdown is 
triggered on that thread, which means no tasks will be committed. For certain 
kinds of exceptions this is unavoidable: for example, if the consumer has dropped 
out of the group then by definition it can’t commit during shutdown, and the task 
will have already been reassigned to another StreamThread. However, there are many 
common scenarios in which we can (and should) attempt to commit all the tasks that 
are in a clean state, i.e. every task except the one being processed when the 
exception occurred. A good example is de/serialization or user code errors, as 
well as exceptions that occur during an operation like closing or suspending a 
particular task. In all those cases, there’s no need to throw away the progress 
made by the unaffected tasks that just happened to be assigned to the same 
StreamThread.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


Re: Requesting to be added to Kafka project

2021-04-30 Thread Bill Bejeck
Hi Alyssa,

I've added you to the contributors role in Jira so you should be able to
self-assign tickets now.

-Bill

On Fri, Apr 30, 2021 at 5:26 PM Alyssa Huang 
wrote:

> Oops, yes I'd like to be added to JIRA, my username is alyssahuang.
>
> Thanks!
>
> On Fri, Apr 30, 2021 at 2:06 PM Justine Olshan
> 
> wrote:
>
> > Hi Alyssa,
> > Are you asking to be added to JIRA? If so, can you provide your jira
> > username?
> >
> > Thanks,
> > Justine
> >
> > On Fri, Apr 30, 2021 at 9:48 AM Alyssa Huang  >
> > wrote:
> >
> > > Hello,
> > >
> > > I'm interested in contributing to Kafka! Can I be added to the project?
> > >
> > > Best,
> > > Alyssa
> > >
> >
>


Jenkins build is still unstable: Kafka » Kafka Branch Builder » trunk #83

2021-04-30 Thread Apache Jenkins Server
See 




Re: Requesting to be added to Kafka project

2021-04-30 Thread Alyssa Huang
Oops, yes I'd like to be added to JIRA, my username is alyssahuang.

Thanks!

On Fri, Apr 30, 2021 at 2:06 PM Justine Olshan 
wrote:

> Hi Alyssa,
> Are you asking to be added to JIRA? If so, can you provide your jira
> username?
>
> Thanks,
> Justine
>
> On Fri, Apr 30, 2021 at 9:48 AM Alyssa Huang 
> wrote:
>
> > Hello,
> >
> > I'm interested in contributing to Kafka! Can I be added to the project?
> >
> > Best,
> > Alyssa
> >
>


Re: Requesting to be added to Kafka project

2021-04-30 Thread Justine Olshan
Hi Alyssa,
Are you asking to be added to JIRA? If so, can you provide your jira
username?

Thanks,
Justine

On Fri, Apr 30, 2021 at 9:48 AM Alyssa Huang 
wrote:

> Hello,
>
> I'm interested in contributing to Kafka! Can I be added to the project?
>
> Best,
> Alyssa
>


[jira] [Resolved] (KAFKA-12726) misbehaving Task.stop() can prevent other Tasks from stopping

2021-04-30 Thread Ryanne Dolan (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-12726?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Ryanne Dolan resolved KAFKA-12726.
--
Resolution: Fixed

Closing as duplicate.

> misbehaving Task.stop() can prevent other Tasks from stopping
> -
>
> Key: KAFKA-12726
> URL: https://issues.apache.org/jira/browse/KAFKA-12726
> Project: Kafka
>  Issue Type: Bug
>  Components: KafkaConnect
>Affects Versions: 2.8.0
>Reporter: Ryanne Dolan
>Assignee: Ryanne Dolan
>Priority: Minor
>
> We've observed a misbehaving Task fail to stop in a timely manner (e.g. stuck 
> in a retry loop). Despite Connect supporting a property 
> task.shutdown.graceful.timeout.ms, this is currently not enforced – tasks can 
> take as long as they want to stop, and the only consequence is an error 
> message.
> We've seen a Worker's "task-count" metric double following a rebalance, which 
> we think is due to Tasks not getting cleaned up when Task.stop() is stuck.
> While the Connector implementation is ultimately to blame here – a Task 
> probably shouldn't loop forever in stop() – we believe the Connect runtime 
> should handle this situation more gracefully.
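> For illustration (hypothetical code, not taken from an actual connector): a
> stop() written like the following never observes any deadline, so once
> task.shutdown.graceful.timeout.ms elapses the worker can only log an error while
> the task keeps running.
> {code:java}
> public class StuckTask {
>     public void stop() {
>         // A retry loop with no deadline: if the external system never recovers,
>         // this spins forever and the worker cannot reclaim the task.
>         while (!flushToExternalSystem()) {
>             try {
>                 Thread.sleep(1_000L);
>             } catch (InterruptedException e) {
>                 Thread.currentThread().interrupt();
>             }
>         }
>     }
>
>     private boolean flushToExternalSystem() {
>         return false; // stand-in for an external call that keeps failing
>     }
> }
> {code}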



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


Requesting to be added to Kafka project

2021-04-30 Thread Alyssa Huang
Hello,

I'm interested in contributing to Kafka! Can I be added to the project?

Best,
Alyssa


Re: [VOTE] KIP-690: Add additional configuration to control MirrorMaker 2 internal topics naming convention

2021-04-30 Thread Ryanne Dolan
+1 (non-binding), thanks!

On Thu, Jan 21, 2021, 4:31 AM Omnia Ibrahim  wrote:

> Hi
> Can I get a vote on this, please?
>
> Best
> Omnia
>
> On Tue, Dec 15, 2020 at 12:16 PM Omnia Ibrahim 
> wrote:
>
>> If anyone interested in reading the discussions you can find it here
>> https://www.mail-archive.com/dev@kafka.apache.org/msg113373.html
>>
>> On Tue, Dec 8, 2020 at 4:01 PM Omnia Ibrahim 
>> wrote:
>>
>>> Hi everyone,
>>> I’m proposing a new KIP for MirrorMaker 2 to add the ability to control
>>> internal topics naming convention. The proposal details are here
>>> https://cwiki.apache.org/confluence/display/KAFKA/KIP-690%3A+Add+additional+configuration+to+control+MirrorMaker+2+internal+topics+naming+convention
>>>
>>> Please vote in this thread.
>>> Thanks
>>> Omnia
>>>
>>


[jira] [Created] (KAFKA-12736) KafkaProducer.flush holds onto completed ProducerBatch(s) until flush completed

2021-04-30 Thread Lucas Bradstreet (Jira)
Lucas Bradstreet created KAFKA-12736:


 Summary: KafkaProducer.flush holds onto completed ProducerBatch(s) 
until flush completed
 Key: KAFKA-12736
 URL: https://issues.apache.org/jira/browse/KAFKA-12736
 Project: Kafka
  Issue Type: Improvement
Reporter: Lucas Bradstreet


When flush is called, a copy of the incomplete batches is made. This means the 
full ProducerBatch(s) are held in memory until the flush has completed. For 
batches that use the existing memory pool this is not as wasteful, since the 
memory will already have been returned to the pool, but non-pool memory can only 
be GC'd after the flush has completed. Rather than using copyAll, we can build a 
new collection containing only the produceFuture(s) and await on those.

 
{code:java}
/**
 * Mark all partitions as ready to send and block until the send is complete
 */
public void awaitFlushCompletion() throws InterruptedException {
    try {
        for (ProducerBatch batch : this.incomplete.copyAll())
            batch.produceFuture.await();
    } finally {
        this.flushesInProgress.decrementAndGet();
    }
}
{code}
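A sketch of the proposed change (assumptions: the future type is the 
ProduceRequestResult returned by each batch's produceFuture, and a helper like 
requestResults() exists to snapshot just those futures; the real method name and 
plumbing may differ in an actual patch):
{code:java}
/**
 * Sketch: snapshot only the futures so the ProducerBatch objects can be
 * garbage-collected as soon as they complete, instead of being pinned by the
 * copied batch list for the duration of the flush.
 */
public void awaitFlushCompletion() throws InterruptedException {
    try {
        // Assumed helper returning the produceFuture of each incomplete batch.
        for (ProduceRequestResult result : this.incomplete.requestResults())
            result.await();
    } finally {
        this.flushesInProgress.decrementAndGet();
    }
}
{code}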
 

 



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


Re: [VOTE] KIP-717: Deprecate batch-size config from console producer

2021-04-30 Thread Kamal Chandraprakash
Bumping up this thread.

On Wed, Mar 17, 2021 at 7:44 PM Manikumar  wrote:

> Hi Kamal,
>
> It looks like we just forgot this config, when we removed old producer
> code.  I think we dont require KIP for this.
> we can directly fix with a minor PR .
>
> Thanks.
>
> On Wed, Mar 17, 2021 at 7:02 PM Dongjin Lee  wrote:
>
> > +1. (non-binding)
> >
> > Thanks,
> > Dongjin
> >
> > On Thu, Mar 11, 2021 at 5:52 PM Manikumar 
> > wrote:
> >
> > > +1 (binding). Thanks for the KIP
> > > I think we can remove the config option as the config option is unused.
> > >
> > > On Wed, Mar 10, 2021 at 3:06 PM Kamal Chandraprakash <
> > > kamal.chandraprak...@gmail.com> wrote:
> > >
> > > > Hi,
> > > >
> > > > I'd like to start a vote on KIP-717 to remove batch-size config from
> > the
> > > > console producer.
> > > >
> > > > https://cwiki.apache.org/confluence/x/DB1RCg
> > > >
> > > > Thanks,
> > > > Kamal
> > > >
> > >
> > --
> > *Dongjin Lee*
> >
> > *A hitchhiker in the mathematical world.*
> >
> >
> >
> > *github:  github.com/dongjinleekr
> > keybase:
> https://keybase.io/dongjinleekr
> > linkedin:
> kr.linkedin.com/in/dongjinleekr
> > speakerdeck:
> > speakerdeck.com/dongjin
> > *
> >
>


[jira] [Created] (KAFKA-12735) "Key Store is not initialized" after some time for a Keystore created by a transformValues processor

2021-04-30 Thread Thomas Hein (Jira)
Thomas Hein created KAFKA-12735:
---

 Summary: "Key Store is not initilalized" after some time for a 
Keystore created by a transform Values processors
 Key: KAFKA-12735
 URL: https://issues.apache.org/jira/browse/KAFKA-12735
 Project: Kafka
  Issue Type: Bug
  Components: clients
Affects Versions: 2.6.2
Reporter: Thomas Hein


Dear Kafka Fellows,

Currently, we are facing problems with Kafka Streams.
We try to transform a set of messages into a state store.

The functionality is working, but after a certain period the application 
returns the error
"Key value store is not initialized."

We tried a lot of solutions, like using the Kafka events or loops to wait until 
the store is available again, but the system is not able to heal itself.

Colleagues of ours use the Kubernetes health check to restart the application 
when this issue comes up, but we think this is not a proper solution.

What are you recommending?

 

 

Thanks a lot for your help

 

Our Code

 
{code:java}
@Cacheable(value = MYICP_NOTIFICATIONS, key = "#emailAddress", unless = "#result == null || #result.cachedObject == null || #result.cachedObject.isEmpty()")
public GenericCacheable> getMyIcpNotificationsForUser(final String uuid, final String emailAddress) throws InterruptedException {
    if (!hasText(emailAddress)) {
        LOGGER.error("[{}]: getMyIcpNotificationsForUser was called with an invalid email address.", uuid);
        return new GenericCacheable<>(Collections.emptyList(), null);
    }

    if (keyValueStore == null) {
        initializeStore(uuid);
    }

    if (keyValueStore == null) {
        LOGGER.error("[{}]: Key value store is not initialized.", uuid);
        return new GenericCacheable<>(Collections.emptyList(), null);
    }

    final List> commandList = keyValueStore.get(emailAddress);
    if (commandList == null) {
        return new GenericCacheable<>(Collections.emptyList(), null);
    }

    //@formatter:off
    final List list = commandList
            .stream()
            .map(this::mapToNotification)
            .collect(Collectors.toList());
    //@formatter:on

    return new GenericCacheable<>(list, LocalDateTime.now());
}
{code}
{code:java}
private void initializeStore(final String uuid) throws InterruptedException {
    int counter = 0;
    while (counter < 5) {
        try {
            keyValueStore = myIcpMessagesStream.store(storeName, QueryableStoreTypes.keyValueStore());
            return;
        } catch (final Exception e) {
            LOGGER.debug("[{}]: Error while loading the state store [{}]", uuid, e.getMessage());
            Thread.sleep(1000);
            counter++;
        }
    }
}
{code}
 
{code:java}
public KafkaStreams myIcpMessagesStream(@Qualifier("myIcpEvents") final StreamsBuilderFactoryBean streamsBuilderFactoryBean) throws Exception {
    final StreamsBuilder myicpQueryStreamBuilder = Objects.requireNonNull(streamsBuilderFactoryBean.getObject());

    final StoreBuilder>>> keyValueStoreBuilder = keyValueStoreBuilder(inMemoryKeyValueStore(storeName), Serdes.String(), new CommandListSerde<>());
    myicpQueryStreamBuilder.addStateStore(keyValueStoreBuilder);

    //@formatter:off
    myicpQueryStreamBuilder
            .stream(kafkaTopicNames.getMyIcpMessageTopic(), Consumed.with(Serdes.String(), new CommandSerde<>()))
            .mapValues(this::mapPayloadToMyIcpPayload)
            .transformValues(() -> commandTransformer, storeName);
    //@formatter:on

    final KafkaStreams kafkaStreams = new KafkaStreams(myicpQueryStreamBuilder.build(), Objects.requireNonNull(streamsBuilderFactoryBean.getStreamsConfiguration()));
    kafkaStreams.start();

    return kafkaStreams;
}
{code}
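For completeness, a simplified sketch of the "Kafka events" variant we tried 
(illustrative only; it assumes access to the kafkaStreams instance above and the 
same storeName, and that the listener is registered before kafkaStreams.start() 
is called):
{code:java}
// Re-fetch the store reference whenever the instance transitions to RUNNING,
// instead of polling for it in a retry loop.
kafkaStreams.setStateListener((newState, oldState) -> {
    if (newState == KafkaStreams.State.RUNNING) {
        keyValueStore = kafkaStreams.store(storeName, QueryableStoreTypes.keyValueStore());
    }
});
{code}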
 

 
{code:java}
public class CommandTransformer implements ValueTransformer, List>> {
    private static final Logger LOGGER = LoggerFactory.getLogger(CommandTransformer.class);

    @Value("${ifx.notificationService.myicp.storeName}")
    private String storeName;

    @Value("${ifx.notificationService.myicp.maxStoreSize}")
    private int maxStoreSize;

    private KeyValueStore>> keyValueStore;

    @Override
    public void init(final ProcessorContext context) {
        keyValueStore = (KeyValueStore>>) context.getStateStore(storeName);
    }

    @Override
    @CacheEvict(value = MYICP_NOTIFICATIONS, key = "#value.payload.user.emailAddress")
    public List> transform(final Command value) {
        if (value == null) {
            return Collections.emptyList();
        }

        final List> listForUser = getCommandListForUser(value);

        if (isInvalidValue(value, listForUser)) {
            return listForUser;
        }

        if (listForUser.size() >= maxStoreSize) {
            listForUser.remove(0);
        }

        LOGGER.debug("[{}] current list [{}]", value.getPayload().getUser().getEmailAddress(), listForUser.size());


Re: [DISCUSS] KIP-690 Add additional configuration to control MirrorMaker 2 internal topics naming convention

2021-04-30 Thread Omnia Ibrahim
Hi Ryanne, Can you vote for it here
https://www.mail-archive.com/dev@kafka.apache.org/msg113575.html as well,
please?

On Fri, Apr 30, 2021 at 12:44 AM Ryanne Dolan  wrote:

> Thanks Omnia. lgtm!
>
> Ryanne
>
> On Thu, Apr 29, 2021 at 10:50 AM Omnia Ibrahim 
> wrote:
>
>> I updated the KIP
>>
>> On Thu, Apr 29, 2021 at 4:43 PM Omnia Ibrahim 
>> wrote:
>>
>>> Sure, this would make it easier; we can make these functions return the
>>> original behaviour (.checkpoints.internal,
>>> "mm2-offset-syncs..internal", heartbeat) without any
>>> customisation using `replication.policy.separator`, and use the separator in
>>> the DefaultReplicationPolicy.
>>>
>>> On Wed, Apr 28, 2021 at 1:31 AM Ryanne Dolan 
>>> wrote:
>>>
 Thanks Omnia, makes sense to me.

 > Customers who have their customised ReplicationPolicy will need to
 add the definition of their internal topics naming convention

 I wonder should we include default impls in the interface to avoid that
 requirement?

 Ryanne

 On Sun, Apr 25, 2021, 2:20 PM Omnia Ibrahim 
 wrote:

> Hi Mickael and Ryanne,
> I updated the KIP to add these methods to the ReplicationPolicy
> instead of an extra interface to simplify the changes. Please have a look
> and let me know your thoughts.
>
> Thanks
>
> On Tue, Mar 30, 2021 at 7:19 PM Omnia Ibrahim 
> wrote:
>
>> (Sorry, forgot to Reply All.)
>> Hi Ryanne,
>> It's a valid concern. I was trying to separate the concerns of
>> internal and replicated policies from each other and to keep the code
>> readable, as extending ReplicationPolicy to manage both internal and
>> replicated topics is a bit odd. I'm not against simplifying things and
>> making ReplicationPolicy handle both; at the end of the day, if an MM2 user
>> has a special naming convention for topics, it will affect both
>> replicated and MM2 internal topics.
>>
>> To simplify things, we can extend `ReplicationPolicy` as
>> follows instead of adding an extra class:
>>
>>> public interface ReplicationPolicy {
>>> String topicSource(String topic);
>>> String upstreamTopic(String topic);
>>>
>>> /** Returns the heartbeats topic name. */
>>> String heartbeatsTopic();
>>>
>>> /** Returns the offset-syncs topic for the given cluster alias. */
>>> String offsetSyncTopic(String targetAlias);
>>>
>>> /** Returns the checkpoint topic name for the given cluster alias. */
>>> String checkpointTopic(String sourceAlias);
>>>
>>> default String originalTopic(String topic) {
>>> String upstream = upstreamTopic(topic);
>>> if (upstream == null) {
>>> return topic;
>>> } else {
>>> return originalTopic(upstream);
>>> }
>>> }
>>>
>>> /** Internal topics are never replicated. */
>>> boolean isInternalTopic(String topic); // the implementation will be moved to
>>> // `DefaultReplicationPolicy` to handle both Kafka topics and MM2 internal topics.
>>> }
>>>
>>
>> On Fri, Mar 26, 2021 at 3:05 PM Ryanne Dolan 
>> wrote:
>>
>>> Omnia, have we considered just adding methods to ReplicationPolicy?
>>> I'm reluctant to add a new class because, as Mickael points out, we'd 
>>> need
>>> to carry it around in client code.
>>>
>>> Ryanne
>>>
>>> On Fri, Feb 19, 2021 at 8:31 AM Mickael Maison <
>>> mickael.mai...@gmail.com> wrote:
>>>
 Hi Omnia,

 Thanks for the clarifications.

 - I'm still a bit uneasy with the overlap between these 2 methods as
 currently `ReplicationPolicy.isInternalTopic` already handles MM2
 internal topics. Should we make it only handle Kafka internal topics
 and `isMM2InternalTopic()` only handle MM2 topics?

 - I'm not sure I understand what this method is used for. There are
 no
 such methods for the other 2 topics (offset-sync and heartbeat).
 Also
 what happens if there are other MM2 instances using different naming
 schemes in the same cluster. Do all instances have to know about the
 other naming schemes? What are the expected issues if they don't?

 - RemoteClusterUtils is a client-side utility so it does not have
 access to the MM2 configuration. Since this new API can affect the
 name of the checkpoint topic, it will need to be used client-side
 too
 so users can find the checkpoint topic name. I hadn't realized this
 was the case.

 Thanks

 On Mon, Feb 15, 2021 at 9:33 PM Omnia Ibrahim <
 o.g.h.ibra...@gmail.com> wrote:
 >
 > Hi Mickael, did you have some time to check my answer?
 >