Hi Mason,

Definitely! Feel free to open a PR and ping me for a review.

Cheers, Martijn

On Tue, Oct 4, 2022 at 3:51 PM Mason Chen <mas.chen6...@gmail.com> wrote:

> Hi Martijn,
>
> I notice that this question comes up quite often. Would this be a good
> addition to the KafkaSource documentation? I'd be happy to contribute to
> the documentation.
>
> Best,
> Mason
>
> On Tue, Oct 4, 2022 at 11:23 AM Martijn Visser <martijnvis...@apache.org>
> wrote:
>
>> Hi Robert,
>>
>> Based on
>> https://stackoverflow.com/questions/72870074/apache-flink-restoring-state-from-checkpoint-with-changes-kafka-topic
>> I think you'll need to change the UID for your KafkaSource and restart your
>> job with allowNonRestoredState enabled.
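
A minimal sketch of that suggestion, assuming a DataStream job that builds its KafkaSource roughly like this. The broker address, topic name, group id, and UID string below are hypothetical placeholders, not values from this thread. The idea, as far as I understand it, is that the checkpointed source state is keyed by the operator UID, so a new UID keeps the splits for the old topic from being restored:

```java
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.connector.kafka.source.KafkaSource;
import org.apache.flink.connector.kafka.source.enumerator.initializer.OffsetsInitializer;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class NewTopicJob {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env =
                StreamExecutionEnvironment.getExecutionEnvironment();

        // Source pointing at the new topic (all string values are placeholders).
        KafkaSource<String> source = KafkaSource.<String>builder()
                .setBootstrapServers("broker:9092")
                .setTopics("new-topic")
                .setGroupId("my-group")
                .setStartingOffsets(OffsetsInitializer.earliest())
                .setValueOnlyDeserializer(new SimpleStringSchema())
                .build();

        // A UID that differs from the one used before the topic change, so the
        // old source state in the checkpoint no longer maps to this operator.
        DataStream<String> stream = env
                .fromSource(source, WatermarkStrategy.noWatermarks(), "Grokfailures")
                .uid("grokfailures-source-v2");

        stream.print();
        env.execute("new-topic-job");
    }
}
```

Because the old source state then has no matching operator, the restore has to be told to skip it, for example with `flink run -s <checkpoint-or-savepoint-path> --allowNonRestoredState ...` when resubmitting from the CLI. Note that the new source starts from its configured starting offsets rather than from restored ones.
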
>>
>> Best regards,
>>
>> Martijn
>>
>> On Tue, Oct 4, 2022 at 12:40 PM Robert Cullen <cinquate...@gmail.com>
>> wrote:
>>
>>> We've changed the KafkaSource to ingest from a new topic but the old
>>> name is still being referenced:
>>>
>>> 2022-10-04 07:03:41
>>> org.apache.flink.util.FlinkException: Global failure triggered by OperatorCoordinator for 'Source: Grokfailures' (operator feca28aff5a3958840bee985ee7de4d3).
>>>     at org.apache.flink.runtime.operators.coordination.OperatorCoordinatorHolder$LazyInitializedCoordinatorContext.failJob(OperatorCoordinatorHolder.java:553)
>>>     at org.apache.flink.runtime.operators.coordination.RecreateOnResetOperatorCoordinator$QuiesceableContext.failJob(RecreateOnResetOperatorCoordinator.java:223)
>>>     at org.apache.flink.runtime.source.coordinator.SourceCoordinatorContext.failJob(SourceCoordinatorContext.java:285)
>>>     at org.apache.flink.runtime.source.coordinator.SourceCoordinatorContext.handleUncaughtExceptionFromAsyncCall(SourceCoordinatorContext.java:298)
>>>     at org.apache.flink.util.ThrowableCatchingRunnable.run(ThrowableCatchingRunnable.java:42)
>>>     at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
>>>     at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
>>>     at java.lang.Thread.run(Thread.java:748)
>>> Caused by: org.apache.flink.util.FlinkRuntimeException: Failed to handle partition splits change due to
>>>     at org.apache.flink.connector.kafka.source.enumerator.KafkaSourceEnumerator.handlePartitionSplitChanges(KafkaSourceEnumerator.java:239)
>>>     at org.apache.flink.runtime.source.coordinator.ExecutorNotifier.lambda$null$1(ExecutorNotifier.java:86)
>>>     at org.apache.flink.util.ThrowableCatchingRunnable.run(ThrowableCatchingRunnable.java:40)
>>>     ... 3 more
>>> Caused by: java.lang.RuntimeException: Failed to get topic metadata.
>>>     at org.apache.flink.connector.kafka.source.enumerator.subscriber.TopicListSubscriber.getPartitionChanges(TopicListSubscriber.java:59)
>>>     at org.apache.flink.connector.kafka.source.enumerator.KafkaSourceEnumerator.discoverAndInitializePartitionSplit(KafkaSourceEnumerator.java:212)
>>>     at org.apache.flink.connector.kafka.source.enumerator.KafkaSourceEnumerator.lambda$start$0(KafkaSourceEnumerator.java:158)
>>>     at org.apache.flink.runtime.source.coordinator.ExecutorNotifier.lambda$notifyReadyAsync$2(ExecutorNotifier.java:83)
>>>     at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
>>>     at java.util.concurrent.FutureTask.run(FutureTask.java:266)
>>>     at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$201(ScheduledThreadPoolExecutor.java:180)
>>>     at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:293)
>>>     ... 3 more
>>> Caused by: java.util.concurrent.ExecutionException: org.apache.kafka.common.errors.UnknownTopicOrPartitionException: This server does not host this topic-partition.
>>>     at org.apache.kafka.common.internals.KafkaFutureImpl.wrapAndThrow(KafkaFutureImpl.java:45)
>>>     at org.apache.kafka.common.internals.KafkaFutureImpl.access$000(KafkaFutureImpl.java:32)
>>>     at org.apache.kafka.common.internals.KafkaFutureImpl$SingleWaiter.await(KafkaFutureImpl.java:89)
>>>     at org.apache.kafka.common.internals.KafkaFutureImpl.get(KafkaFutureImpl.java:260)
>>>     at org.apache.flink.connector.kafka.source.enumerator.subscriber.TopicListSubscriber.getPartitionChanges(TopicListSubscriber.java:57)
>>>     ... 10 more
>>> Caused by: org.apache.kafka.common.errors.UnknownTopicOrPartitionException: This server does not host this topic-partition.
>>>
>>>
>>> --
>>> Robert Cullen
>>> 240-475-4490
>>>
>>
