Hi Max,

Thanks a lot for the clarification!

Best,
Eleanore

On Wed, Mar 11, 2020 at 11:32 AM Maximilian Michels wrote:
> Please see my answers inline.
>
> -Max
>
> On 10.03.20 05:02, Eleanore Jin wrote:
> > Hi Max,
> >
> > Thanks for the response! The reason for setting up the state backend
> > is to experiment with Kafka EOS with Beam running on Flink. Reading
> > through the code and this PR
> > <https://github.com/apache/beam/pull/7991/files>, can you please help
> > me clarify my understanding?
> >
> > 1. Beam uses KafkaExactlyOnceSink to publish messages to achieve EOS.
> > The processElement method of ExactlyOnceWriter is annotated with
> > @RequiresStableInput, so all the messages will be cached by
> > KeyedBufferingElementsHandler, and only after a checkpoint succeeds
> > will those messages be processed by ExactlyOnceWriter?
>
> That's correct.
>
> > 2. Upon checkpoint, will those messages cached by
> > KeyedBufferingElementsHandler also be checkpointed?
>
> Yes, the buffered elements will be checkpointed.
>
> > 3. It seems the way Beam provides Kafka EOS introduces delays in the
> > stream processing, and the delay depends on the checkpoint interval?
> > How can the latency be reduced while still keeping the EOS guarantee?
>
> Indeed, the checkpoint interval and the checkpoint duration limit the
> latency. Given the current design and the guarantees, there is no other
> way to influence the latency.
>
> > 4. commitOffsetsInFinalize is also enabled. Does this mean that, upon
> > a successful checkpoint, the checkpointed offset will be committed
> > back to Kafka? If this commit does not finish successfully, and the
> > job then gets cancelled/stopped and is resubmitted (with the same
> > consumer group for the source topics, but a different job ID), is
> > duplicate processing still possible, because the consumed offset was
> > not committed back to Kafka?
>
> This option is for the Kafka consumer. AFAIK this is just a convenience
> method to commit the latest checkpointed offset to Kafka.
> This offset is not used when restoring from a checkpoint. However, if
> you don't restore from a checkpoint, you can resume from that offset,
> which might be convenient or not, depending on your use case.
>
> > Thanks a lot!
> > Eleanore
> >
> > On Thu, Mar 5, 2020 at 12:46 AM Maximilian Michels wrote:
> > > Hi Eleanore,
> > >
> > > Good question. I think the easiest way is to configure this in the
> > > Flink configuration file, i.e. flink-conf.yaml. Then you don't need
> > > to set anything in Beam.
> > >
> > > If you want to go with your approach, then just use
> > > getClass().getClassLoader() unless you have some custom classloader
> > > for loading your state backend.
> > >
> > > Cheers,
> > > Max
> > >
> > > On 04.03.20 01:39, Jin Yi wrote:
> > > > Hi Experts,
> > > >
> > > > I am running a Beam application with the Flink Runner. I would like
> > > > to set the state backend to FsStateBackend instead of
> > > > MemoryStateBackend.
> > > >
> > > > In FlinkPipelineOptions.java, I should be able to call
> > > > setStateBackendFactory(), but I did not find any provided
> > > > implementations of the FlinkStateBackendFactory interface. Does
> > > > that mean I have to implement my own?
> > > >
> > > > Thanks a lot!
> > > > Eleanore
> > > >
> > > > /**
> > > >  * State backend to store Beam's state during computation. Note: Only
> > > >  * applicable when executing in streaming mode.
> > > >  */
> > > > @Description(
> > > >     "Sets the state backend factory to use in streaming mode. "
> > > >         + "Defaults to the flink cluster's state.backend configuration.")
> > > > Class<? extends FlinkStateBackendFactory> getStateBackendFactory();
> > > >
> > > > void setStateBackendFactory(
> > > >     Class<? extends FlinkStateBackendFactory> stateBackendFactory);
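To make the answers to questions 1 to 3 concrete, here is a toy model of the buffering behaviour described in the thread: elements are held back and only handed to the writer once the checkpoint that contains them has completed. This is a sketch under that assumption only. The class StableInputBufferModel is hypothetical, and although its method names loosely mirror Flink's checkpoint hooks, nothing here calls Beam or Flink, and it is not Beam's KeyedBufferingElementsHandler.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.NavigableMap;
import java.util.TreeMap;

/**
 * Hypothetical, simplified model of @RequiresStableInput-style buffering:
 * elements are only "written" after the checkpoint covering them completes.
 * Not Beam's KeyedBufferingElementsHandler; names are illustrative only.
 */
final class StableInputBufferModel {
  // Buffered elements snapshotted per checkpoint id (part of checkpointed state).
  private final TreeMap<Long, List<String>> pendingByCheckpoint = new TreeMap<>();
  // Elements received since the last checkpoint barrier.
  private final List<String> currentBuffer = new ArrayList<>();
  // Elements handed to the (stand-in) exactly-once writer.
  private final List<String> emitted = new ArrayList<>();

  /** An element arrives: it is buffered instead of being processed right away. */
  void processElement(String element) {
    currentBuffer.add(element);
  }

  /** A checkpoint is taken: the current buffer becomes part of that checkpoint. */
  void snapshotState(long checkpointId) {
    pendingByCheckpoint.put(checkpointId, new ArrayList<>(currentBuffer));
    currentBuffer.clear();
  }

  /** The checkpoint completed: only now are the buffered elements processed. */
  void notifyCheckpointComplete(long checkpointId) {
    // All batches belonging to this or earlier checkpoints are now stable.
    NavigableMap<Long, List<String>> ready = pendingByCheckpoint.headMap(checkpointId, true);
    for (List<String> batch : ready.values()) {
      emitted.addAll(batch); // stands in for the writer's processElement
    }
    ready.clear(); // the view is backed by the map, so this drops released batches
  }

  /** Returns a copy of everything released to the writer so far. */
  List<String> emitted() {
    return new ArrayList<>(emitted);
  }

  public static void main(String[] args) {
    StableInputBufferModel model = new StableInputBufferModel();
    model.processElement("a");
    model.processElement("b");
    System.out.println(model.emitted()); // prints [] : nothing written before a checkpoint
    model.snapshotState(1L);
    model.processElement("c");
    System.out.println(model.emitted()); // prints [] : checkpoint 1 taken, not yet complete
    model.notifyCheckpointComplete(1L);
    System.out.println(model.emitted()); // prints [a, b] : "c" waits for checkpoint 2
  }
}
```

In this model, an element that arrives just after a checkpoint is taken waits roughly one checkpoint interval for the next checkpoint, plus that checkpoint's duration, before it reaches the writer, which matches the latency bound described in the answer to question 3.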

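The answer to question 4 can likewise be sketched as a small decision rule for where a restarted Kafka consumer resumes: a restored checkpoint wins, and only without a restore is the offset committed to Kafka for the consumer group used. The ResumeOffsetModel name and the numbers are made up for illustration. The fallback when nothing has been committed is an assumption based on the plain Kafka consumer's auto.offset.reset behaviour, modelled here as a fixed offset.

```java
import java.util.OptionalLong;

/**
 * Hypothetical model of the resume rule discussed in the thread.
 * Offsets are "next offset to read", as with Kafka committed offsets.
 */
final class ResumeOffsetModel {

  static long startOffset(
      OptionalLong restoredCheckpointOffset,
      OptionalLong committedGroupOffset,
      long autoResetOffset) {
    if (restoredCheckpointOffset.isPresent()) {
      // Restoring from a checkpoint: the offset committed to Kafka is ignored.
      return restoredCheckpointOffset.getAsLong();
    }
    if (committedGroupOffset.isPresent()) {
      // New submission without restore: resume from the group's committed offset.
      return committedGroupOffset.getAsLong();
    }
    // Assumed fallback, standing in for auto.offset.reset.
    return autoResetOffset;
  }

  public static void main(String[] args) {
    long checkpointed = 120; // made-up offset stored in the last successful checkpoint
    long committed = 100;    // made-up: the commit after that checkpoint never happened

    long restored = startOffset(OptionalLong.of(checkpointed), OptionalLong.of(committed), 0);
    long fresh = startOffset(OptionalLong.empty(), OptionalLong.of(committed), 0);

    System.out.println("restore from checkpoint -> " + restored); // prints ... 120
    System.out.println("new job, same group -> " + fresh);        // prints ... 100
    System.out.println("records read again -> " + (restored - fresh)); // prints ... 20
  }
}
```

Under these assumptions, resubmitting without restoring from a checkpoint re-reads the 20 records between the committed and the checkpointed offset, which is the duplicate-processing risk raised in question 4. Restoring from the checkpoint avoids it because the committed offset is not consulted.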