Thanks for creating a Jira. I know this is not exactly what you are looking for, but did you try "commitOffsetsInFinalize()"? It does not provide an exactly-once guarantee for consumption, but it minimises the gap between new and already-processed records in case of a pipeline restart. Perhaps it could be helpful.
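For reference, a minimal sketch of what that option looks like on a KafkaIO read (the broker address, topic, and consumer group below are placeholder assumptions, and `updateConsumerProperties` is the config method available as of Beam 2.16):

```java
import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.io.kafka.KafkaIO;
import org.apache.kafka.common.serialization.LongDeserializer;
import org.apache.kafka.common.serialization.StringDeserializer;
import com.google.common.collect.ImmutableMap;

public class ReadWithCommittedOffsets {
  public static void main(String[] args) {
    Pipeline p = Pipeline.create();
    p.apply(KafkaIO.<Long, String>read()
        .withBootstrapServers("broker-1:9092")           // placeholder
        .withTopic("my-topic")                           // placeholder
        .withKeyDeserializer(LongDeserializer.class)
        .withValueDeserializer(StringDeserializer.class)
        // A group.id is required for offsets to be committed to Kafka:
        .updateConsumerProperties(
            ImmutableMap.of("group.id", "my-consumer-group")) // placeholder
        // Commit consumed offsets back to Kafka when output bundles are
        // finalized, so a restarted pipeline resumes near the last
        // processed position (at-least-once, not exactly-once).
        .commitOffsetsInFinalize());
    // ... downstream transforms and p.run() go here ...
  }
}
```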
> On 10 Dec 2019, at 03:19, 郑 洁锋 <[email protected]> wrote:
>
> Hi,
> Thank you very much for your reply. I have added the Jira; the URL is:
> https://issues.apache.org/jira/browse/BEAM-8934
>
> This is the first time I have created a Jira, and I apologize if there
> is a problem. Please tell me and I will make changes.
>
> [email protected]
>
> From: Pablo Estrada
> Date: 2019-12-10 00:49
> To: user
> Subject: Re: Re: How to store&read offset with kafkaio
>
> So if I understand correctly, you specify a map<partition, offset>, and pass
> it to the IO to start from there, right?
> I don't think our KafkaIO supports that ATM [1], but it sounds like a
> desirable feature.
>
> Would you please create a JIRA to track adding that?
>
> [1] https://beam.apache.org/releases/javadoc/2.16.0/org/apache/beam/sdk/io/kafka/KafkaIO.Read.html
>
> On Sun, Dec 8, 2019 at 6:17 PM 郑 洁锋 <[email protected]> wrote:
>
> Hi,
> The main reason I need this feature is to support exactly-once
> semantics in Spark Streaming:
> <Catch4F3E(12-10-10-16-18).jpg>
>
> According to our existing business, we chose to use the following
> method to implement it (also in the Spark Streaming official documentation):
> <CatchBC9F(12-10-10-16-18).jpg>
> The complete document URL is:
> http://spark.apache.org/docs/latest/streaming-kafka-0-10-integration.html
>
> [email protected]
>
> From: Chamikara Jayalath
> Date: 2019-12-05 03:17
> To: user
> Subject: Re: How to store&read offset with kafkaio
>
> I assume you meant Kafka offsets -
> https://kafka.apache.org/documentation/#intro_topics
>
> Currently I don't think this is possible, for two reasons.
>
> (1) Currently the Kafka source can read either from a given topic or from a
> set of topic partitions, but not from a given offset -
> https://github.com/apache/beam/blob/master/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaIO.java#L523
>
> (2) Currently a source has to be at the top of the pipeline graph to operate.
> For example, you cannot initiate a Kafka source from a topic/partition read
> from a database.
>
> (2) should be possible when we have our next-generation source framework,
> SplittableDoFn. Maybe when we have that we can consider adding (1) as well,
> if there are good justifications for it. I think the policy regarding the
> offset to start reading from is configured in the Kafka cluster, and
> specifying a specific offset will not work if the corresponding messages have
> been purged by the Kafka cluster, so I'm not sure how useful adding support
> for reading from a given offset would be.
>
> Thanks,
> Cham
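For anyone reading this thread in the archive: the partition-level read that point (1) above refers to looks roughly like the sketch below. The broker address, topic name, and partition numbers are placeholders; the point is that `withTopicPartitions` pins the reader to explicit partitions, but there is no corresponding method to start each partition at a caller-supplied offset.

```java
import java.util.Arrays;
import org.apache.beam.sdk.io.kafka.KafkaIO;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.serialization.StringDeserializer;

// Sketch: read from an explicit set of topic partitions. Starting
// positions still come from Kafka's committed offsets / auto.offset.reset
// policy, not from a map<partition, offset> supplied by the pipeline.
KafkaIO.Read<String, String> read =
    KafkaIO.<String, String>read()
        .withBootstrapServers("broker-1:9092")   // placeholder
        .withTopicPartitions(Arrays.asList(
            new TopicPartition("my-topic", 0),   // placeholder
            new TopicPartition("my-topic", 1)))  // placeholder
        .withKeyDeserializer(StringDeserializer.class)
        .withValueDeserializer(StringDeserializer.class);
```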
