Hi Tobias,
Checkpointing should be enabled when you set it in the Flink config or via the
Beam option `checkpointingInterval`. Did you set `runner` to `FlinkRunner`?
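For reference, a minimal sketch of setting these options programmatically rather
than on the command line, assuming the standard `FlinkPipelineOptions` from the
Beam Flink runner (the 300000 ms value just mirrors a typical
`--checkpointingInterval` flag):

```java
import org.apache.beam.runners.flink.FlinkPipelineOptions;
import org.apache.beam.runners.flink.FlinkRunner;
import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.options.PipelineOptionsFactory;

// Sketch only: parse any command-line args, then force the Flink runner and
// a checkpointing interval in code so the setting cannot be silently dropped.
FlinkPipelineOptions options =
    PipelineOptionsFactory.fromArgs(args).withValidation().as(FlinkPipelineOptions.class);
options.setRunner(FlinkRunner.class);
options.setCheckpointingInterval(300000L); // milliseconds

Pipeline pipeline = Pipeline.create(options);
```

If the interval is set this way and Flink still shows no checkpoints in the web
UI, the runner was likely not `FlinkRunner`, which is why I ask about that below.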
If possible, could you share parts of the Flink logs?
Thanks,
Max
On 25.01.19 15:14, Kaymak, Tobias wrote:
Hi,
I am trying to migrate my existing KafkaToGCS pipeline to a KafkaToBigQuery
pipeline to skip the loading step from GCS which is currently handled externally
from Beam.
I noticed that the pipeline, written in Beam 2.9.0 (Java), does not trigger any
checkpoints on Flink (1.5.5), even though it's configured to do so when I launch
it. Is this normal? How does Beam then guarantee exactly-once when there are no
checkpoints in Flink? (During my tests it seemed to start from scratch after a
crash, but I am not 100% sure.)
This is my pipeline:
pipeline
    .apply(
        KafkaIO.<String, String>read()
            .withBootstrapServers(bootstrap)
            .withTopics(topics)
            .withKeyDeserializer(StringDeserializer.class)
            .withValueDeserializer(ConfigurableDeserializer.class)
            .updateConsumerProperties(
                ImmutableMap.of(InputMessagesConfig.CONFIG_PROPERTY_NAME, inputMessagesConfig))
            .updateConsumerProperties(ImmutableMap.of("auto.offset.reset", "earliest"))
            .updateConsumerProperties(ImmutableMap.of("group.id", groupId))
            .updateConsumerProperties(ImmutableMap.of("enable.auto.commit", "true"))
            .withReadCommitted()
            .withTimestampPolicyFactory(withEventTs)
            .commitOffsetsInFinalize())
    .apply(ParDo.of(new ToEventFn()))
    .apply(
        Window.into(new ZurichTimePartitioningWindowFn())
            .triggering(
                Repeatedly.forever(
                    AfterFirst.of(
                        AfterPane.elementCountAtLeast(bundleSize),
                        AfterProcessingTime.pastFirstElementInPane()
                            .plusDelayOf(refreshFrequency))))
            .withAllowedLateness(Duration.standardDays(14))
            .discardingFiredPanes())
    .apply(
        BigQueryIO.<Event>write()
            .withMethod(BigQueryIO.Write.Method.FILE_LOADS)
            .withTriggeringFrequency(refreshFrequency)
            .withNumFileShards(1)
            .to(partitionedTableDynamicDestinations)
            .withFormatFunction(
                (SerializableFunction<Event, TableRow>)
                    KafkaToBigQuery::convertUserEventToTableRow)
            .withCreateDisposition(BigQueryIO.Write.CreateDisposition.CREATE_IF_NEEDED)
            .withWriteDisposition(BigQueryIO.Write.WriteDisposition.WRITE_APPEND));

pipeline.run().waitUntilFinish();
It's launched like the other (GCS) one via:
...--checkpointingMode=EXACTLY_ONCE --checkpointingInterval=300000
--parallelism=1 --tempLocation=gs://foo..
Any idea why checkpointing does not work here?
Best,
Tobias