Hi Maximilian,
yes, I've set --runner to FlinkRunner when launching the pipeline, and it
does work for a GCS sink, but it somehow seems to be ignored for a BigQuery
sink - even though it looks like the system magically handles it by itself.
This is the full command line used to launch the Beam 2.9.0 pipeline on
Flink 1.5.5:
bin/flink run -d -c di.beam.KafkaToBigQuery -j lib/beam_pipelines.jar
--runner=FlinkRunner --appName=ratings --checkpointingMode=EXACTLY_ONCE
--checkpointingInterval=300000 --parallelism=1
--tempLocation=gs://somebucket
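
For context, the flags above are picked up in the pipeline's main() in the
usual way - a trimmed sketch of it (only the option parsing shown, the
transforms are the ones quoted further down):

import org.apache.beam.runners.flink.FlinkPipelineOptions;
import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.options.PipelineOptionsFactory;

public class KafkaToBigQuery {
  public static void main(String[] args) {
    // --runner, --checkpointingMode, --checkpointingInterval etc. from the
    // command line above end up in the FlinkPipelineOptions here.
    FlinkPipelineOptions options =
        PipelineOptionsFactory.fromArgs(args)
            .withValidation()
            .as(FlinkPipelineOptions.class);
    Pipeline pipeline = Pipeline.create(options);
    // ... apply the transforms quoted below ...
    pipeline.run().waitUntilFinish();
  }
}
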
Here are the logs from the taskmanager; I can share the full code of the
pipeline if you want:
2019-01-28 14:33:31,287 WARN org.apache.flink.metrics.MetricGroup
- The operator name
BigQueryIO.Write/BatchLoads/View.AsSingleton/Combine.GloballyAsSingletonView/DropInputs/ParMultiDo(NoOp)
exceeded the 80 characters length limit and was truncated.
2019-01-28 14:33:31,911 WARN org.apache.flink.metrics.MetricGroup
- The operator name
BigQueryIO.Write/BatchLoads/SinglePartitionWriteTables/ParDo(GarbageCollectTemporaryFiles)/ParMultiDo(GarbageCollectTemporaryFiles)
exceeded the 80 characters length limit and was truncated.
2019-01-28 14:33:31,976 WARN org.apache.flink.metrics.MetricGroup
- The operator name
BigQueryIO.Write/BatchLoads/MultiPartitionsWriteTables/ParDo(GarbageCollectTemporaryFiles)/ParMultiDo(GarbageCollectTemporaryFiles)
exceeded the 80 characters length limit and was truncated.
2019-01-28 14:33:32,217 WARN org.apache.flink.metrics.MetricGroup
- The operator name
BigQueryIO.Write/BatchLoads/SinglePartitionWriteTables/Values/Values/Map/ParMultiDo(Anonymous)
exceeded the 80 characters length limit and was truncated.
2019-01-28 14:33:32,227 WARN org.apache.flink.metrics.MetricGroup
- The operator name
BigQueryIO.Write/BatchLoads/View.AsSingleton/Combine.GloballyAsSingletonView/Combine.globally(Singleton)/WithKeys/AddKeys/Map/ParMultiDo(Anonymous)
exceeded the 80 characters length limit and was truncated.
2019-01-28 14:33:32,228 WARN org.apache.flink.metrics.MetricGroup
- The operator name
BigQueryIO.Write/BatchLoads/SinglePartitionsReshuffle/RestoreOriginalTimestamps/Reify.ExtractTimestampsFromValues/ParDo(Anonymous)/ParMultiDo(Anonymous)
exceeded the 80 characters length limit and was truncated.
2019-01-28 14:33:32,276 WARN org.apache.flink.metrics.MetricGroup
- The operator name
BigQueryIO.Write/BatchLoads/MultiPartitionsWriteTables/Values/Values/Map/ParMultiDo(Anonymous)
exceeded the 80 characters length limit and was truncated.
2019-01-28 14:33:32,282 INFO
org.apache.beam.runners.flink.translation.wrappers.streaming.io.UnboundedSourceWrapper
- No restore state for UnbounedSourceWrapper.
2019-01-28 14:33:32,288 INFO
org.apache.beam.runners.flink.translation.wrappers.streaming.io.UnboundedSourceWrapper
- Unbounded Flink Source 0/1 is reading from sources:
[org.apache.beam.runners.core.construction.UnboundedReadFromBoundedSource$BoundedToUnboundedSourceAdapter@5fecdf95
]
2019-01-28 14:33:32,296 INFO
org.apache.beam.runners.flink.translation.wrappers.streaming.io.UnboundedSourceWrapper
- No restore state for UnbounedSourceWrapper.
2019-01-28 14:33:32,318 WARN org.apache.flink.metrics.MetricGroup
- The operator name
BigQueryIO.Write/BatchLoads/TempFilePrefixView/Combine.GloballyAsSingletonView/View.CreatePCollectionView/Combine.globally(Concatenate)/WithKeys/AddKeys/Map/ParMultiDo(Anonymous)
exceeded the 80 characters length limit and was truncated.
2019-01-28 14:33:32,321 WARN org.apache.flink.metrics.MetricGroup
- The operator name
BigQueryIO.Write/BatchLoads/SinglePartitionsReshuffle/RestoreOriginalTimestamps/ReifyTimestamps.RemoveWildcard/ParDo(Anonymous)/ParMultiDo(Anonymous)
exceeded the 80 characters length limit and was truncated.
2019-01-28 14:33:32,324 WARN org.apache.flink.metrics.MetricGroup
- The operator name
BigQueryIO.Write/BatchLoads/TempFilePrefixView/Combine.GloballyAsSingletonView/View.CreatePCollectionView/Combine.globally(Concatenate)/Values/Values/Map/ParMultiDo(Anonymous)
exceeded the 80 characters length limit and was truncated.
2019-01-28 14:33:32,329 WARN org.apache.flink.metrics.MetricGroup
- The operator name
BigQueryIO.Write/BatchLoads/TempFilePrefixView/Combine.GloballyAsSingletonView/View.CreatePCollectionView/Combine.globally(Concatenate)/Combine.perKey(Concatenate)
exceeded the 80 characters length limit and was truncated.
2019-01-28 14:33:32,357 WARN org.apache.flink.metrics.MetricGroup
- The operator name
BigQueryIO.Write/BatchLoads/TempFilePrefixView/Combine.GloballyAsSingletonView/View.VoidKeyToMultimapMaterialization/ParDo(VoidKeyToMultimapMaterialization)/ParMultiDo(VoidKeyToMultimapMaterialization)
exceeded the 80 characters length limit and was truncated.
2019-01-28 14:33:32,482 WARN org.apache.flink.metrics.MetricGroup
- The operator name
BigQueryIO.Write/BatchLoads/SinglePartitionsReshuffle/ExpandIterable/ParMultiDo(Anonymous)
exceeded the 80 characters length limit and was truncated.
2019-01-28 14:33:32,483 WARN org.apache.flink.metrics.MetricGroup
- The operator name
BigQueryIO.Write/BatchLoads/TempFilePrefixView/Combine.GloballyAsSingletonView/Combine.globally(Singleton)/Values/Values/Map/ParMultiDo(Anonymous)
exceeded the 80 characters length limit and was truncated.
2019-01-28 14:33:32,493 WARN org.apache.flink.metrics.MetricGroup
- The operator name
BigQueryIO.Write/BatchLoads/TempFilePrefixView/Combine.GloballyAsSingletonView/Combine.globally(Singleton)/Combine.perKey(Singleton)
exceeded the 80 characters length limit and was truncated.
2019-01-28 14:33:32,697 WARN org.apache.flink.metrics.MetricGroup
- The operator name
BigQueryIO.Write/BatchLoads/WriteGroupedRecords/ParMultiDo(WriteGroupedRecordsToFiles)
exceeded the 80 characters length limit and was truncated.
2019-01-28 14:33:32,782 WARN org.apache.flink.metrics.MetricGroup
- The operator name
BigQueryIO.Write/BatchLoads/View.AsSingleton/Combine.GloballyAsSingletonView/View.CreatePCollectionView/Combine.globally(Concatenate)/Values/Values/Map/ParMultiDo(Anonymous)
exceeded the 80 characters length limit and was truncated.
2019-01-28 14:33:32,789 WARN org.apache.flink.metrics.MetricGroup
- The operator name
BigQueryIO.Write/BatchLoads/View.AsSingleton/Combine.GloballyAsSingletonView/View.CreatePCollectionView/Combine.globally(Concatenate)/Combine.perKey(Concatenate)
exceeded the 80 characters length limit and was truncated.
2019-01-28 14:33:33,093 WARN org.apache.flink.metrics.MetricGroup
- The operator name
BigQueryIO.Write/BatchLoads/View.AsSingleton/Combine.GloballyAsSingletonView/View.CreatePCollectionView/Combine.globally(Concatenate)/WithKeys/AddKeys/Map/ParMultiDo(Anonymous)
exceeded the 80 characters length limit and was truncated.
2019-01-28 14:33:33,122 WARN org.apache.flink.metrics.MetricGroup
- The operator name
BigQueryIO.Write/BatchLoads/View.AsSingleton/Combine.GloballyAsSingletonView/View.VoidKeyToMultimapMaterialization/ParDo(VoidKeyToMultimapMaterialization)/ParMultiDo(VoidKeyToMultimapMaterialization)
exceeded the 80 characters length limit and was truncated.
2019-01-28 14:33:33,162 INFO
org.apache.beam.runners.flink.translation.wrappers.streaming.io.UnboundedSourceWrapper
- No restore state for UnbounedSourceWrapper.
2019-01-28 14:33:33,179 WARN org.apache.flink.metrics.MetricGroup
- The operator name
BigQueryIO.Write/BatchLoads/MultiPartitionsReshuffle/RestoreOriginalTimestamps/Reify.ExtractTimestampsFromValues/ParDo(Anonymous)/ParMultiDo(Anonymous)
exceeded the 80 characters length limit and was truncated.
2019-01-28 14:33:33,187 WARN org.apache.flink.metrics.MetricGroup
- The operator name
BigQueryIO.Write/BatchLoads/MultiPartitionsReshuffle/RestoreOriginalTimestamps/ReifyTimestamps.RemoveWildcard/ParDo(Anonymous)/ParMultiDo(Anonymous)
exceeded the 80 characters length limit and was truncated.
2019-01-28 14:33:33,192 WARN org.apache.flink.metrics.MetricGroup
- The operator name
BigQueryIO.Write/BatchLoads/MultiPartitionsReshuffle/ExpandIterable/ParMultiDo(Anonymous)
exceeded the 80 characters length limit and was truncated.
2019-01-28 14:33:33,218 WARN org.apache.flink.metrics.MetricGroup
- The operator name
BigQueryIO.Write/BatchLoads/View.AsSingleton/Combine.GloballyAsSingletonView/Combine.globally(Singleton)/Values/Values/Map/ParMultiDo(Anonymous)
exceeded the 80 characters length limit and was truncated.
2019-01-28 14:33:33,220 WARN org.apache.flink.metrics.MetricGroup
- The operator name
BigQueryIO.Write/BatchLoads/View.AsSingleton/Combine.GloballyAsSingletonView/Combine.globally(Singleton)/Combine.perKey(Singleton)
exceeded the 80 characters length limit and was truncated.
2019-01-28 14:33:33,298 WARN org.apache.flink.metrics.MetricGroup
- The operator name
BigQueryIO.Write/BatchLoads/SinglePartitionWriteTables/Window.Into()/Window.Assign.out
exceeded the 80 characters length limit and was truncated.
2019-01-28 14:33:33,304 WARN org.apache.flink.metrics.MetricGroup
- The operator name
BigQueryIO.Write/BatchLoads/MultiPartitionsWriteTables/Window.Into()/Window.Assign.out
exceeded the 80 characters length limit and was truncated.
2019-01-28 14:33:33,323 WARN org.apache.flink.metrics.MetricGroup
- The operator name
BigQueryIO.Write/BatchLoads/MultiPartitionsWriteTables/WithKeys/AddKeys/Map/ParMultiDo(Anonymous)
exceeded the 80 characters length limit and was truncated.
2019-01-28 14:33:33,326 WARN org.apache.flink.metrics.MetricGroup
- The operator name
BigQueryIO.Write/BatchLoads/MultiPartitionsReshuffle/ReifyOriginalTimestamps/ParDo(Anonymous)/ParMultiDo(Anonymous)
exceeded the 80 characters length limit and was truncated.
2019-01-28 14:33:33,357 WARN org.apache.flink.metrics.MetricGroup
- The operator name
BigQueryIO.Write/BatchLoads/SinglePartitionWriteTables/WithKeys/AddKeys/Map/ParMultiDo(Anonymous)
exceeded the 80 characters length limit and was truncated.
2019-01-28 14:33:33,377 WARN org.apache.flink.metrics.MetricGroup
- The operator name
BigQueryIO.Write/BatchLoads/MultiPartitionsReshuffle/Window.Into()/Window.Assign.out
exceeded the 80 characters length limit and was truncated.
2019-01-28 14:33:33,395 WARN org.apache.flink.metrics.MetricGroup
- The operator name
BigQueryIO.Write/BatchLoads/TempFilePrefixView/Combine.GloballyAsSingletonView/Combine.globally(Singleton)/WithKeys/AddKeys/Map/ParMultiDo(Anonymous)
exceeded the 80 characters length limit and was truncated.
2019-01-28 14:33:33,477 WARN org.apache.flink.metrics.MetricGroup
- The operator name
BigQueryIO.Write/BatchLoads/SinglePartitionsReshuffle/ReifyOriginalTimestamps/ParDo(Anonymous)/ParMultiDo(Anonymous)
exceeded the 80 characters length limit and was truncated.
2019-01-28 14:33:33,487 WARN org.apache.flink.metrics.MetricGroup
- The operator name
BigQueryIO.Write/BatchLoads/SinglePartitionsReshuffle/Window.Into()/Window.Assign.out
exceeded the 80 characters length limit and was truncated.
2019-01-28 14:33:33,748 INFO
org.apache.beam.runners.flink.translation.wrappers.streaming.io.UnboundedSourceWrapper
- No restore state for UnbounedSourceWrapper.
2019-01-28 14:33:34,577 INFO
org.apache.beam.runners.flink.translation.wrappers.streaming.io.UnboundedSourceWrapper
- Unbounded Flink Source 0/1 is reading from sources:
[org.apache.beam.runners.core.construction.UnboundedReadFromBoundedSource$BoundedToUnboundedSourceAdapter@1892a35d
]
2019-01-28 14:33:34,610 INFO
org.apache.beam.runners.flink.translation.wrappers.streaming.io.UnboundedSourceWrapper
- Unbounded Flink Source 0/1 is reading from sources:
[org.apache.beam.runners.core.construction.UnboundedReadFromBoundedSource$BoundedToUnboundedSourceAdapter@7adcb05b
]
2019-01-28 14:33:34,747 INFO
org.apache.beam.runners.flink.translation.wrappers.streaming.io.UnboundedSourceWrapper
- Unbounded Flink Source 0/1 is reading from sources:
[org.apache.beam.sdk.io.kafka.KafkaUnboundedSource@71389814]
2019-01-28 14:33:34,896 WARN
org.apache.kafka.clients.consumer.ConsumerConfig - The
configuration 'metis.input.messages.config' was supplied but isn't a known
config.
2019-01-28 14:33:35,462 INFO
org.apache.beam.sdk.io.gcp.bigquery.BatchLoads - Writing
BigQuery temporary files to
gs://tempspace_eu_regional/dataflow_dev/BigQueryWriteTemp/beam_load_ratingsflink01281433342cd8f9b4_7ce229196b1d41e3ad63d0ef6234e0f6/
before loading them.
2019-01-28 14:33:35,544 INFO
org.apache.beam.sdk.io.kafka.KafkaUnboundedSource - Reader-0:
reading from ratings-0 starting at offset 2945
2019-01-28 14:33:35,544 INFO
org.apache.beam.sdk.io.kafka.KafkaUnboundedSource - Reader-0:
reading from ratings-1 starting at offset 3101
2019-01-28 14:33:35,544 INFO
org.apache.beam.sdk.io.kafka.KafkaUnboundedSource - Reader-0:
reading from ratings-2 starting at offset 3031
2019-01-28 14:33:35,545 INFO
org.apache.beam.sdk.io.kafka.KafkaUnboundedSource - Reader-0:
reading from ratings-3 starting at offset 3009
2019-01-28 14:33:35,545 INFO
org.apache.beam.sdk.io.kafka.KafkaUnboundedSource - Reader-0:
reading from ratings-4 starting at offset 2903
2019-01-28 14:33:35,545 INFO
org.apache.beam.sdk.io.kafka.KafkaUnboundedSource - Reader-0:
reading from ratings-5 starting at offset 3068
2019-01-28 14:33:35,545 INFO
org.apache.beam.sdk.io.kafka.KafkaUnboundedSource - Reader-0:
reading from ratings-6 starting at offset 3160
2019-01-28 14:33:35,545 INFO
org.apache.beam.sdk.io.kafka.KafkaUnboundedSource - Reader-0:
reading from ratings-7 starting at offset 3014
2019-01-28 14:33:35,546 INFO
org.apache.beam.sdk.io.kafka.KafkaUnboundedSource - Reader-0:
reading from ratings-8 starting at offset 3096
2019-01-28 14:33:35,546 INFO
org.apache.beam.sdk.io.kafka.KafkaUnboundedSource - Reader-0:
reading from ratings-9 starting at offset 2885
2019-01-28 14:33:35,577 WARN
org.apache.kafka.clients.consumer.ConsumerConfig - The
configuration 'metis.input.messages.config' was supplied but isn't a known
config.
2019-01-28 14:33:35,801 INFO
org.apache.beam.sdk.io.gcp.bigquery.BigQueryHelpers - Waiting for
jobs to complete.
2019-01-28 14:33:35,803 INFO
org.apache.beam.sdk.io.gcp.bigquery.BigQueryHelpers - Waiting for
jobs to complete.
2019-01-28 14:33:35,801 INFO
org.apache.beam.sdk.io.gcp.bigquery.BigQueryHelpers - Waiting for
jobs to complete.
2019-01-28 14:33:36,217 INFO
org.apache.beam.sdk.io.kafka.KafkaUnboundedSource - Reader-0:
first record offset 3014
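
For what it's worth, my understanding is that the FlinkRunner translates
those two checkpointing flags into the equivalent of the following on
Flink's StreamExecutionEnvironment (just a sketch of what I expect the
runner to do, not code from our pipeline; the class name is made up):

import org.apache.flink.streaming.api.CheckpointingMode;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class CheckpointingSketch {
  public static void main(String[] args) {
    StreamExecutionEnvironment env =
        StreamExecutionEnvironment.getExecutionEnvironment();
    // Equivalent of --checkpointingInterval=300000 --checkpointingMode=EXACTLY_ONCE:
    env.enableCheckpointing(300_000L, CheckpointingMode.EXACTLY_ONCE);
  }
}

So I'd expect Flink to trigger a checkpoint every 5 minutes, which is what
I see with the GCS sink but not here.
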
Best,
Tobi
On Mon, Jan 28, 2019 at 11:52 AM Maximilian Michels <[email protected]> wrote:
> Hi Tobias,
>
> Checkpointing should be enabled when you set it in the Flink config or via
> the
> Beam option `checkpointingInterval`. Did you set `runner` to `FlinkRunner`?
>
> If possible, could you share parts of the Flink logs?
>
> Thanks,
> Max
>
> On 25.01.19 15:14, Kaymak, Tobias wrote:
> > Hi,
> >
> > I am trying to migrate my existing KafkaToGCS pipeline to a
> > KafkaToBigQuery pipeline, to skip the loading step from GCS which is
> > currently handled outside of Beam.
> >
> > I noticed that the pipeline, written in Beam 2.9.0 (Java), does not
> > trigger any checkpoint on Flink (1.5.5), even though it's configured to
> > do so when I launch it. Is this normal? How does Beam then guarantee
> > exactly-once when there are no checkpoints in Flink? (During my tests it
> > seemed to start from scratch after a crash, but I am not 100% sure.)
> >
> >
> > This is my pipeline:
> >
> > pipeline
> >     .apply(
> >         KafkaIO.<String, String>read()
> >             .withBootstrapServers(bootstrap)
> >             .withTopics(topics)
> >             .withKeyDeserializer(StringDeserializer.class)
> >             .withValueDeserializer(ConfigurableDeserializer.class)
> >             .updateConsumerProperties(
> >                 ImmutableMap.of(InputMessagesConfig.CONFIG_PROPERTY_NAME,
> >                     inputMessagesConfig))
> >             .updateConsumerProperties(ImmutableMap.of("auto.offset.reset", "earliest"))
> >             .updateConsumerProperties(ImmutableMap.of("group.id", groupId))
> >             .updateConsumerProperties(ImmutableMap.of("enable.auto.commit", "true"))
> >             .withReadCommitted()
> >             .withTimestampPolicyFactory(withEventTs)
> >             .commitOffsetsInFinalize())
> >     .apply(ParDo.of(new ToEventFn()))
> >     .apply(
> >         Window.into(new ZurichTimePartitioningWindowFn())
> >             .triggering(
> >                 Repeatedly.forever(
> >                     AfterFirst.of(
> >                         AfterPane.elementCountAtLeast(bundleSize),
> >                         AfterProcessingTime.pastFirstElementInPane()
> >                             .plusDelayOf(refreshFrequency))))
> >             .withAllowedLateness(Duration.standardDays(14))
> >             .discardingFiredPanes())
> >     .apply(
> >         BigQueryIO.<Event>write()
> >             .withMethod(BigQueryIO.Write.Method.FILE_LOADS)
> >             .withTriggeringFrequency(refreshFrequency)
> >             .withNumFileShards(1)
> >             .to(partitionedTableDynamicDestinations)
> >             .withFormatFunction(
> >                 (SerializableFunction<Event, TableRow>)
> >                     KafkaToBigQuery::convertUserEventToTableRow)
> >             .withCreateDisposition(BigQueryIO.Write.CreateDisposition.CREATE_IF_NEEDED)
> >             .withWriteDisposition(BigQueryIO.Write.WriteDisposition.WRITE_APPEND));
> >
> > pipeline.run().waitUntilFinish();
> > It's launched like the other (GCS) one via:
> >
> > ...--checkpointingMode=EXACTLY_ONCE --checkpointingInterval=300000
> > --parallelism=1 --tempLocation=gs://foo..
> >
> > Any idea why checkpointing does not work here?
> >
> > Best,
> > Tobias
>
--
Tobias Kaymak
Data Engineer
[email protected]
www.ricardo.ch