Thank you for verifying this. This is manifested in https://jira.apache.org/jira/browse/BEAM-5386 and has indeed been fixed already for 2.10.0.

This likely warrants a 2.9.1 release. I'll check on the dev mailing list.

Thanks,
Max

On 30.01.19 10:27, Kaymak, Tobias wrote:
Hi Maximilian,

I can confirm that checkpoints work with Beam 2.10-SNAPSHOT and do not work with version 2.9. I am very sure it is related to this issue: https://issues.apache.org/jira/browse/FLINK-2491 - parts of the pipeline reach the FINISHED state after a couple of minutes, which then triggers the shutdown of checkpointing; Beam 2.10 works around this. However, executing the pipeline on a Flink 1.5.5 cluster no longer yields any metrics about the processed elements in the web interface:

2019-01-30 09:14:53,934 WARN  org.apache.beam.sdk.metrics.MetricsEnvironment - Reporting metrics are not supported in the current execution environment.

Is this a known issue? I want to switch my Flink version to 1.6 to see whether this is fixed, but I am unsure at this point how to achieve that. Is it something I can set in my pom.xml?
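
I was thinking of something along these lines in my pom.xml - just a sketch, assuming the 2.10 development line splits the Flink runner into per-version artifacts (I have not verified the artifact id or version):

    <!-- Hypothetical: swap the Flink-1.5-compatible runner artifact for a
         1.6-compatible one; artifact id and version are assumptions. -->
    <dependency>
      <groupId>org.apache.beam</groupId>
      <artifactId>beam-runners-flink-1.6</artifactId>
      <version>2.10.0-SNAPSHOT</version>
      <scope>runtime</scope>
    </dependency>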




[attached screenshot: image.png]

Best,
Tobi



On Tue, Jan 29, 2019 at 4:27 PM Maximilian Michels <[email protected]> wrote:

    Hi Tobias,

    It is normal to see "No restore state for UnbounedSourceWrapper" when not
    restoring from a checkpoint/savepoint.
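
    As a sanity check you could also trigger a savepoint and resume from it;
    on restore that log line should change accordingly. A sketch with a
    placeholder job id and savepoint path:

    # trigger a savepoint for the running job (job id is a placeholder)
    bin/flink savepoint 3b5bdb811f1923bf49db24403e9c1ae9 gs://bucket/savepoints
    # resume the pipeline from the created savepoint
    bin/flink run -s gs://bucket/savepoints/savepoint-xyz -d -c di.beam.KafkaToBigQuery -j lib/beam_pipelines.jar ...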

    Just checking. You mentioned you set the checkpoint interval via:
    --checkpointingInterval=300000

    That means you have to wait 5 minutes until the first checkpoint is taken.
    You should then see an INFO message like this: "INFO: Triggering checkpoint
    1 @ 1548775459114 for job 3b5bdb811f1923bf49db24403e9c1ae9."
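
    For reference, the same options can also be set programmatically when
    constructing the pipeline - a minimal sketch, assuming you build your
    options from the command-line args:

    import org.apache.beam.runners.flink.FlinkPipelineOptions;
    import org.apache.beam.runners.flink.FlinkRunner;
    import org.apache.beam.sdk.Pipeline;
    import org.apache.beam.sdk.options.PipelineOptionsFactory;

    FlinkPipelineOptions options =
        PipelineOptionsFactory.fromArgs(args).as(FlinkPipelineOptions.class);
    options.setRunner(FlinkRunner.class);
    // Same effect as --checkpointingInterval=300000 (milliseconds).
    options.setCheckpointingInterval(300_000L);
    Pipeline pipeline = Pipeline.create(options);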

    Thanks,
    Max

    On 29.01.19 16:13, Kaymak, Tobias wrote:
     > Even after altering the pipeline and making it much simpler, it still
     > does not checkpoint. I used a single Kafka topic as a source and altered
     > the IO step in the following way:
     >
     >       .apply(
     >           BigQueryIO.<Event>write()
     >               .withMethod(BigQueryIO.Write.Method.FILE_LOADS)
     >               .withTriggeringFrequency(refreshFrequency)
     >               .withNumFileShards(1)
     >               .to(projectId + ":" + dataset + "." + tableName)
     >               .withTimePartitioning(new TimePartitioning().setField("event_date"))
     >               .withSchema(tableSchema)
     >               .withFormatFunction(
     >                   (SerializableFunction<Event, TableRow>)
     >                       KafkaToBigQuery::convertUserEventToTableRow)
     >               .withCreateDisposition(BigQueryIO.Write.CreateDisposition.CREATE_IF_NEEDED)
     >               .withWriteDisposition(BigQueryIO.Write.WriteDisposition.WRITE_APPEND));
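     >
     > (For context, refreshFrequency here is a Joda-Time Duration; the exact
     > value comes from my configuration, but think of something like:
     >
     >     Duration refreshFrequency = Duration.standardMinutes(5);
     >
     > where Duration is org.joda.time.Duration.)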
     >
     > The graph that Flink 1.5.5 generated looked exactly the same, and
     > checkpointing still did not work:
     > [attached screenshot: image.png]
     >
     > On Tue, Jan 29, 2019 at 11:05 AM Kaymak, Tobias <[email protected]> wrote:
     >
     >     If I have a pipeline running and I restart the taskmanager on which
     >     it's executing, the log shows the following. I find the "No restore
     >     state for UnbounedSourceWrapper." message interesting, as it seems to
     >     indicate that the pipeline never stored any state in the first place?
     >
     >     Starting taskexecutor as a console application on host flink-taskmanager-5d85dd6854-pm5bl.
     >     2019-01-29 09:20:48,706 WARN  org.apache.hadoop.util.NativeCodeLoader - Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
     >     2019-01-29 09:20:51,253 WARN  org.apache.flink.shaded.zookeeper.org.apache.zookeeper.ClientCnxn - SASL configuration failed: javax.security.auth.login.LoginException: No JAAS configuration section named 'Client' was found in specified JAAS configuration file: '/tmp/jaas-7768141350028767113.conf'. Will continue connection to Zookeeper server without SASL authentication, if Zookeeper server allows it.
     >     2019-01-29 09:20:51,281 ERROR org.apache.flink.shaded.curator.org.apache.curator.ConnectionState - Authentication failed
     >     2019-01-29 09:21:53,814 WARN  org.apache.flink.metrics.MetricGroup - The operator name BigQueryIO.Write/BatchLoads/View.AsSingleton/Combine.GloballyAsSingletonView/DropInputs/ParMultiDo(NoOp) exceeded the 80 characters length limit and was truncated.
     >     2019-01-29 09:21:53,828 WARN  org.apache.flink.metrics.MetricGroup - The operator name BigQueryIO.Write/BatchLoads/MultiPartitionsWriteTables/Window.Into()/Window.Assign.out exceeded the 80 characters length limit and was truncated.
     >     2019-01-29 09:21:53,834 WARN  org.apache.flink.metrics.MetricGroup - The operator name BigQueryIO.Write/BatchLoads/SinglePartitionWriteTables/Window.Into()/Window.Assign.out exceeded the 80 characters length limit and was truncated.
     >     2019-01-29 09:21:53,917 INFO  org.apache.beam.runners.flink.translation.wrappers.streaming.io.UnboundedSourceWrapper - No restore state for UnbounedSourceWrapper.
     >     2019-01-29 09:21:53,929 WARN  org.apache.flink.metrics.MetricGroup - The operator name BigQueryIO.Write/BatchLoads/MultiPartitionsWriteTables/ParDo(GarbageCollectTemporaryFiles)/ParMultiDo(GarbageCollectTemporaryFiles) exceeded the 80 characters length limit and was truncated.
     >     2019-01-29 09:21:53,937 WARN  org.apache.flink.metrics.MetricGroup - The operator name BigQueryIO.Write/BatchLoads/MultiPartitionsReshuffle/ReifyOriginalTimestamps/ParDo(Anonymous)/ParMultiDo(Anonymous) exceeded the 80 characters length limit and was truncated.
     >     2019-01-29 09:21:53,978 INFO  org.apache.beam.runners.flink.translation.wrappers.streaming.io.UnboundedSourceWrapper - Unbounded Flink Source 0/1 is reading from sources: [org.apache.beam.runners.core.construction.UnboundedReadFromBoundedSource$BoundedToUnboundedSourceAdapter@69908217]
     >     2019-01-29 09:21:54,002 WARN  org.apache.flink.metrics.MetricGroup - The operator name BigQueryIO.Write/BatchLoads/MultiPartitionsReshuffle/Window.Into()/Window.Assign.out exceeded the 80 characters length limit and was truncated.
     >     2019-01-29 09:21:54,008 WARN  org.apache.flink.metrics.MetricGroup - The operator name BigQueryIO.Write/BatchLoads/View.AsSingleton/Combine.GloballyAsSingletonView/View.CreatePCollectionView/Combine.globally(Concatenate)/WithKeys/AddKeys/Map/ParMultiDo(Anonymous) exceeded the 80 characters length limit and was truncated.
     >     2019-01-29 09:21:54,011 WARN  org.apache.flink.metrics.MetricGroup - The operator name BigQueryIO.Write/BatchLoads/TempFilePrefixView/Combine.GloballyAsSingletonView/View.CreatePCollectionView/Combine.globally(Concatenate)/WithKeys/AddKeys/Map/ParMultiDo(Anonymous) exceeded the 80 characters length limit and was truncated.
     >     2019-01-29 09:21:54,020 WARN  org.apache.flink.metrics.MetricGroup - The operator name BigQueryIO.Write/BatchLoads/SinglePartitionWriteTables/ParDo(GarbageCollectTemporaryFiles)/ParMultiDo(GarbageCollectTemporaryFiles) exceeded the 80 characters length limit and was truncated.
     >     2019-01-29 09:21:54,080 WARN  org.apache.flink.metrics.MetricGroup - The operator name BigQueryIO.Write/BatchLoads/TempFilePrefixView/Combine.GloballyAsSingletonView/View.CreatePCollectionView/Combine.globally(Concatenate)/Values/Values/Map/ParMultiDo(Anonymous) exceeded the 80 characters length limit and was truncated.
     >     2019-01-29 09:21:54,091 WARN  org.apache.flink.metrics.MetricGroup - The operator name BigQueryIO.Write/BatchLoads/MultiPartitionsReshuffle/RestoreOriginalTimestamps/Reify.ExtractTimestampsFromValues/ParDo(Anonymous)/ParMultiDo(Anonymous) exceeded the 80 characters length limit and was truncated.
     >     2019-01-29 09:21:54,099 WARN  org.apache.flink.metrics.MetricGroup - The operator name BigQueryIO.Write/BatchLoads/MultiPartitionsWriteTables/WithKeys/AddKeys/Map/ParMultiDo(Anonymous) exceeded the 80 characters length limit and was truncated.
     >     2019-01-29 09:21:54,107 WARN  org.apache.flink.metrics.MetricGroup - The operator name BigQueryIO.Write/BatchLoads/View.AsSingleton/Combine.GloballyAsSingletonView/Combine.globally(Singleton)/WithKeys/AddKeys/Map/ParMultiDo(Anonymous) exceeded the 80 characters length limit and was truncated.
     >     2019-01-29 09:21:54,109 WARN  org.apache.flink.metrics.MetricGroup - The operator name BigQueryIO.Write/BatchLoads/MultiPartitionsWriteTables/Values/Values/Map/ParMultiDo(Anonymous) exceeded the 80 characters length limit and was truncated.
     >     2019-01-29 09:21:54,119 WARN  org.apache.flink.metrics.MetricGroup - The operator name BigQueryIO.Write/BatchLoads/SinglePartitionWriteTables/Values/Values/Map/ParMultiDo(Anonymous) exceeded the 80 characters length limit and was truncated.
     >     2019-01-29 09:21:54,118 WARN  org.apache.flink.metrics.MetricGroup - The operator name BigQueryIO.Write/BatchLoads/TempFilePrefixView/Combine.GloballyAsSingletonView/View.VoidKeyToMultimapMaterialization/ParDo(VoidKeyToMultimapMaterialization)/ParMultiDo(VoidKeyToMultimapMaterialization) exceeded the 80 characters length limit and was truncated.
     >     2019-01-29 09:21:54,115 WARN  org.apache.flink.metrics.MetricGroup - The operator name BigQueryIO.Write/BatchLoads/SinglePartitionsReshuffle/ReifyOriginalTimestamps/ParDo(Anonymous)/ParMultiDo(Anonymous) exceeded the 80 characters length limit and was truncated.
     >     2019-01-29 09:21:54,114 WARN  org.apache.flink.metrics.MetricGroup - The operator name BigQueryIO.Write/BatchLoads/TempFilePrefixView/Combine.GloballyAsSingletonView/View.CreatePCollectionView/Combine.globally(Concatenate)/Combine.perKey(Concatenate) exceeded the 80 characters length limit and was truncated.
     >     2019-01-29 09:21:54,111 WARN  org.apache.flink.metrics.MetricGroup - The operator name BigQueryIO.Write/BatchLoads/SinglePartitionWriteTables/WithKeys/AddKeys/Map/ParMultiDo(Anonymous) exceeded the 80 characters length limit and was truncated.
     >     2019-01-29 09:21:54,111 WARN  org.apache.flink.metrics.MetricGroup - The operator name BigQueryIO.Write/BatchLoads/SinglePartitionsReshuffle/RestoreOriginalTimestamps/Reify.ExtractTimestampsFromValues/ParDo(Anonymous)/ParMultiDo(Anonymous) exceeded the 80 characters length limit and was truncated.
     >     2019-01-29 09:21:54,110 WARN  org.apache.flink.metrics.MetricGroup - The operator name BigQueryIO.Write/BatchLoads/View.AsSingleton/Combine.GloballyAsSingletonView/View.CreatePCollectionView/Combine.globally(Concatenate)/Values/Values/Map/ParMultiDo(Anonymous) exceeded the 80 characters length limit and was truncated.
     >     2019-01-29 09:21:54,110 WARN  org.apache.flink.metrics.MetricGroup - The operator name BigQueryIO.Write/BatchLoads/TempFilePrefixView/Combine.GloballyAsSingletonView/Combine.globally(Singleton)/WithKeys/AddKeys/Map/ParMultiDo(Anonymous) exceeded the 80 characters length limit and was truncated.
     >     2019-01-29 09:21:54,109 WARN  org.apache.flink.metrics.MetricGroup - The operator name BigQueryIO.Write/BatchLoads/View.AsSingleton/Combine.GloballyAsSingletonView/View.VoidKeyToMultimapMaterialization/ParDo(VoidKeyToMultimapMaterialization)/ParMultiDo(VoidKeyToMultimapMaterialization) exceeded the 80 characters length limit and was truncated.
     >     2019-01-29 09:21:54,144 WARN  org.apache.flink.metrics.MetricGroup - The operator name BigQueryIO.Write/BatchLoads/SinglePartitionsReshuffle/Window.Into()/Window.Assign.out exceeded the 80 characters length limit and was truncated.
     >     2019-01-29 09:21:54,172 WARN  org.apache.flink.metrics.MetricGroup - The operator name BigQueryIO.Write/BatchLoads/WriteGroupedRecords/ParMultiDo(WriteGroupedRecordsToFiles) exceeded the 80 characters length limit and was truncated.
     >     2019-01-29 09:21:54,176 WARN  org.apache.flink.metrics.MetricGroup - The operator name BigQueryIO.Write/BatchLoads/View.AsSingleton/Combine.GloballyAsSingletonView/Combine.globally(Singleton)/Values/Values/Map/ParMultiDo(Anonymous) exceeded the 80 characters length limit and was truncated.
     >     2019-01-29 09:21:54,179 WARN  org.apache.flink.metrics.MetricGroup - The operator name BigQueryIO.Write/BatchLoads/View.AsSingleton/Combine.GloballyAsSingletonView/View.CreatePCollectionView/Combine.globally(Concatenate)/Combine.perKey(Concatenate) exceeded the 80 characters length limit and was truncated.
     >     2019-01-29 09:21:54,189 WARN  org.apache.flink.metrics.MetricGroup - The operator name BigQueryIO.Write/BatchLoads/View.AsSingleton/Combine.GloballyAsSingletonView/Combine.globally(Singleton)/Combine.perKey(Singleton) exceeded the 80 characters length limit and was truncated.
     >     2019-01-29 09:21:54,191 WARN  org.apache.flink.metrics.MetricGroup - The operator name BigQueryIO.Write/BatchLoads/TempFilePrefixView/Combine.GloballyAsSingletonView/Combine.globally(Singleton)/Values/Values/Map/ParMultiDo(Anonymous) exceeded the 80 characters length limit and was truncated.
     >     2019-01-29 09:21:54,203 WARN  org.apache.flink.metrics.MetricGroup - The operator name BigQueryIO.Write/BatchLoads/SinglePartitionsReshuffle/RestoreOriginalTimestamps/ReifyTimestamps.RemoveWildcard/ParDo(Anonymous)/ParMultiDo(Anonymous) exceeded the 80 characters length limit and was truncated.
     >     2019-01-29 09:21:54,210 WARN  org.apache.flink.metrics.MetricGroup - The operator name BigQueryIO.Write/BatchLoads/TempFilePrefixView/Combine.GloballyAsSingletonView/Combine.globally(Singleton)/Combine.perKey(Singleton) exceeded the 80 characters length limit and was truncated.
     >     2019-01-29 09:21:54,217 WARN  org.apache.flink.metrics.MetricGroup - The operator name BigQueryIO.Write/BatchLoads/MultiPartitionsReshuffle/RestoreOriginalTimestamps/ReifyTimestamps.RemoveWildcard/ParDo(Anonymous)/ParMultiDo(Anonymous) exceeded the 80 characters length limit and was truncated.
     >     2019-01-29 09:21:54,238 WARN  org.apache.flink.metrics.MetricGroup - The operator name BigQueryIO.Write/BatchLoads/MultiPartitionsReshuffle/ExpandIterable/ParMultiDo(Anonymous) exceeded the 80 characters length limit and was truncated.
     >     2019-01-29 09:21:54,242 WARN  org.apache.flink.metrics.MetricGroup - The operator name BigQueryIO.Write/BatchLoads/SinglePartitionsReshuffle/ExpandIterable/ParMultiDo(Anonymous) exceeded the 80 characters length limit and was truncated.
     >     2019-01-29 09:21:54,339 INFO  org.apache.beam.runners.flink.translation.wrappers.streaming.io.UnboundedSourceWrapper - No restore state for UnbounedSourceWrapper.
     >     2019-01-29 09:21:54,371 INFO  org.apache.beam.runners.flink.translation.wrappers.streaming.io.UnboundedSourceWrapper - No restore state for UnbounedSourceWrapper.
     >     2019-01-29 09:21:54,479 INFO  org.apache.beam.runners.flink.translation.wrappers.streaming.io.UnboundedSourceWrapper - No restore state for UnbounedSourceWrapper.
     >     2019-01-29 09:21:55,509 INFO  org.apache.beam.runners.flink.translation.wrappers.streaming.io.UnboundedSourceWrapper - Unbounded Flink Source 0/1 is reading from sources: [org.apache.beam.runners.core.construction.UnboundedReadFromBoundedSource$BoundedToUnboundedSourceAdapter@2a501a64]
     >     2019-01-29 09:21:55,535 INFO  org.apache.beam.runners.flink.translation.wrappers.streaming.io.UnboundedSourceWrapper - Unbounded Flink Source 0/1 is reading from sources: [org.apache.beam.runners.core.construction.UnboundedReadFromBoundedSource$BoundedToUnboundedSourceAdapter@60345813]
     >     2019-01-29 09:21:55,770 INFO  org.apache.beam.runners.flink.translation.wrappers.streaming.io.UnboundedSourceWrapper - Unbounded Flink Source 0/1 is reading from sources: [org.apache.beam.sdk.io.kafka.KafkaUnboundedSource@75aab48]
     >     2019-01-29 09:21:56,280 WARN  org.apache.kafka.clients.consumer.ConsumerConfig - The configuration 'metis.input.messages.config' was supplied but isn't a known config.
     >     2019-01-29 09:21:57,387 INFO  org.apache.beam.sdk.io.gcp.bigquery.BatchLoads - Writing BigQuery temporary files to gs://bucket/BigQueryWriteTemp/beam_load_ratingsflink01290921554a632112_85202e449b2d45729d384e3ac5f8cc2b/ before loading them.
     >     2019-01-29 09:21:58,118 INFO  org.apache.beam.sdk.io.gcp.bigquery.BigQueryHelpers - Waiting for jobs to complete.
     >     2019-01-29 09:21:58,118 INFO  org.apache.beam.sdk.io.gcp.bigquery.BigQueryHelpers - Waiting for jobs to complete.
     >     2019-01-29 09:21:58,118 INFO  org.apache.beam.sdk.io.gcp.bigquery.BigQueryHelpers - Waiting for jobs to complete.
     >     2019-01-29 09:21:58,140 INFO  org.apache.beam.sdk.io.kafka.KafkaUnboundedSource - Reader-0: reading from ratings-0 starting at offset 13112
     >     2019-01-29 09:21:58,141 INFO  org.apache.beam.sdk.io.kafka.KafkaUnboundedSource - Reader-0: reading from ratings-1 starting at offset 13407
     >     2019-01-29 09:21:58,142 INFO  org.apache.beam.sdk.io.kafka.KafkaUnboundedSource - Reader-0: reading from ratings-2 starting at offset 13034
     >     2019-01-29 09:21:58,142 INFO  org.apache.beam.sdk.io.kafka.KafkaUnboundedSource - Reader-0: reading from ratings-3 starting at offset 13271
     >     2019-01-29 09:21:58,142 INFO  org.apache.beam.sdk.io.kafka.KafkaUnboundedSource - Reader-0: reading from ratings-4 starting at offset 12813
     >     2019-01-29 09:21:58,142 INFO  org.apache.beam.sdk.io.kafka.KafkaUnboundedSource - Reader-0: reading from ratings-5 starting at offset 13211
     >     2019-01-29 09:21:58,144 INFO  org.apache.beam.sdk.io.kafka.KafkaUnboundedSource - Reader-0: reading from ratings-6 starting at offset 13394
     >     2019-01-29 09:21:58,145 INFO  org.apache.beam.sdk.io.kafka.KafkaUnboundedSource - Reader-0: reading from ratings-7 starting at offset 13194
     >     2019-01-29 09:21:58,145 INFO  org.apache.beam.sdk.io.kafka.KafkaUnboundedSource - Reader-0: reading from ratings-8 starting at offset 13478
     >     2019-01-29 09:21:58,145 INFO  org.apache.beam.sdk.io.kafka.KafkaUnboundedSource - Reader-0: reading from ratings-9 starting at offset 12966
     >
     >
     >     On Mon, Jan 28, 2019 at 3:36 PM Kaymak, Tobias <[email protected]> wrote:
     >
     >         Hi Maximilian,
     >
     >         yes, I've set the --runner to FlinkRunner when launching the
     >         pipeline, and it does work for a GCS sink, but it seems to be
     >         ignored for a BigQuery sink somehow - even though it looks like
     >         the system magically handles it by itself.
     >
     >         This is the full command line to launch the Beam 2.9.0 pipeline
     >         on Flink 1.5.5:
     >
     >         bin/flink run -d -c di.beam.KafkaToBigQuery -j lib/beam_pipelines.jar --runner=FlinkRunner --appName=ratings --checkpointingMode=EXACTLY_ONCE --checkpointingInterval=300000 --parallelism=1 --tempLocation=gs://somebucket
     >
     >         Here are the logs from the taskmanager; I can share the full code
     >         of the pipeline if you want:
     >
     >         2019-01-28 14:33:31,287 WARN  org.apache.flink.metrics.MetricGroup - The operator name BigQueryIO.Write/BatchLoads/View.AsSingleton/Combine.GloballyAsSingletonView/DropInputs/ParMultiDo(NoOp) exceeded the 80 characters length limit and was truncated.
     >         2019-01-28 14:33:31,911 WARN  org.apache.flink.metrics.MetricGroup - The operator name BigQueryIO.Write/BatchLoads/SinglePartitionWriteTables/ParDo(GarbageCollectTemporaryFiles)/ParMultiDo(GarbageCollectTemporaryFiles) exceeded the 80 characters length limit and was truncated.
     >         2019-01-28 14:33:31,976 WARN  org.apache.flink.metrics.MetricGroup - The operator name BigQueryIO.Write/BatchLoads/MultiPartitionsWriteTables/ParDo(GarbageCollectTemporaryFiles)/ParMultiDo(GarbageCollectTemporaryFiles) exceeded the 80 characters length limit and was truncated.
     >         2019-01-28 14:33:32,217 WARN  org.apache.flink.metrics.MetricGroup - The operator name BigQueryIO.Write/BatchLoads/SinglePartitionWriteTables/Values/Values/Map/ParMultiDo(Anonymous) exceeded the 80 characters length limit and was truncated.
     >         2019-01-28 14:33:32,227 WARN  org.apache.flink.metrics.MetricGroup - The operator name BigQueryIO.Write/BatchLoads/View.AsSingleton/Combine.GloballyAsSingletonView/Combine.globally(Singleton)/WithKeys/AddKeys/Map/ParMultiDo(Anonymous) exceeded the 80 characters length limit and was truncated.
     >         2019-01-28 14:33:32,228 WARN  org.apache.flink.metrics.MetricGroup - The operator name BigQueryIO.Write/BatchLoads/SinglePartitionsReshuffle/RestoreOriginalTimestamps/Reify.ExtractTimestampsFromValues/ParDo(Anonymous)/ParMultiDo(Anonymous) exceeded the 80 characters length limit and was truncated.
     >         2019-01-28 14:33:32,276 WARN  org.apache.flink.metrics.MetricGroup - The operator name BigQueryIO.Write/BatchLoads/MultiPartitionsWriteTables/Values/Values/Map/ParMultiDo(Anonymous) exceeded the 80 characters length limit and was truncated.
     >         2019-01-28 14:33:32,282 INFO  org.apache.beam.runners.flink.translation.wrappers.streaming.io.UnboundedSourceWrapper - No restore state for UnbounedSourceWrapper.
     >         2019-01-28 14:33:32,288 INFO  org.apache.beam.runners.flink.translation.wrappers.streaming.io.UnboundedSourceWrapper - Unbounded Flink Source 0/1 is reading from sources: [org.apache.beam.runners.core.construction.UnboundedReadFromBoundedSource$BoundedToUnboundedSourceAdapter@5fecdf95]
     >         2019-01-28 14:33:32,296 INFO  org.apache.beam.runners.flink.translation.wrappers.streaming.io.UnboundedSourceWrapper - No restore state for UnbounedSourceWrapper.
     >         2019-01-28 14:33:32,318 WARN  org.apache.flink.metrics.MetricGroup - The operator name BigQueryIO.Write/BatchLoads/TempFilePrefixView/Combine.GloballyAsSingletonView/View.CreatePCollectionView/Combine.globally(Concatenate)/WithKeys/AddKeys/Map/ParMultiDo(Anonymous) exceeded the 80 characters length limit and was truncated.
     >         2019-01-28 14:33:32,321 WARN  org.apache.flink.metrics.MetricGroup - The operator name BigQueryIO.Write/BatchLoads/SinglePartitionsReshuffle/RestoreOriginalTimestamps/ReifyTimestamps.RemoveWildcard/ParDo(Anonymous)/ParMultiDo(Anonymous) exceeded the 80 characters length limit and was truncated.
     >         2019-01-28 14:33:32,324 WARN  org.apache.flink.metrics.MetricGroup - The operator name BigQueryIO.Write/BatchLoads/TempFilePrefixView/Combine.GloballyAsSingletonView/View.CreatePCollectionView/Combine.globally(Concatenate)/Values/Values/Map/ParMultiDo(Anonymous) exceeded the 80 characters length limit and was truncated.
     >         2019-01-28 14:33:32,329 WARN  org.apache.flink.metrics.MetricGroup - The operator name BigQueryIO.Write/BatchLoads/TempFilePrefixView/Combine.GloballyAsSingletonView/View.CreatePCollectionView/Combine.globally(Concatenate)/Combine.perKey(Concatenate) exceeded the 80 characters length limit and was truncated.
     >         2019-01-28 14:33:32,357 WARN  org.apache.flink.metrics.MetricGroup - The operator name BigQueryIO.Write/BatchLoads/TempFilePrefixView/Combine.GloballyAsSingletonView/View.VoidKeyToMultimapMaterialization/ParDo(VoidKeyToMultimapMaterialization)/ParMultiDo(VoidKeyToMultimapMaterialization) exceeded the 80 characters length limit and was truncated.
     >         2019-01-28 14:33:32,482 WARN  org.apache.flink.metrics.MetricGroup - The operator name BigQueryIO.Write/BatchLoads/SinglePartitionsReshuffle/ExpandIterable/ParMultiDo(Anonymous) exceeded the 80 characters length limit and was truncated.
     >         2019-01-28 14:33:32,483 WARN  org.apache.flink.metrics.MetricGroup - The operator name BigQueryIO.Write/BatchLoads/TempFilePrefixView/Combine.GloballyAsSingletonView/Combine.globally(Singleton)/Values/Values/Map/ParMultiDo(Anonymous) exceeded the 80 characters length limit and was truncated.
     >         2019-01-28 14:33:32,493 WARN  org.apache.flink.metrics.MetricGroup - The operator name BigQueryIO.Write/BatchLoads/TempFilePrefixView/Combine.GloballyAsSingletonView/Combine.globally(Singleton)/Combine.perKey(Singleton) exceeded the 80 characters length limit and was truncated.
     >         2019-01-28 14:33:32,697 WARN  org.apache.flink.metrics.MetricGroup - The operator name BigQueryIO.Write/BatchLoads/WriteGroupedRecords/ParMultiDo(WriteGroupedRecordsToFiles) exceeded the 80 characters length limit and was truncated.
     >         2019-01-28 14:33:32,782 WARN  org.apache.flink.metrics.MetricGroup - The operator name BigQueryIO.Write/BatchLoads/View.AsSingleton/Combine.GloballyAsSingletonView/View.CreatePCollectionView/Combine.globally(Concatenate)/Values/Values/Map/ParMultiDo(Anonymous) exceeded the 80 characters length limit and was truncated.
     >         2019-01-28 14:33:32,789 WARN  org.apache.flink.metrics.MetricGroup - The operator name BigQueryIO.Write/BatchLoads/View.AsSingleton/Combine.GloballyAsSingletonView/View.CreatePCollectionView/Combine.globally(Concatenate)/Combine.perKey(Concatenate) exceeded the 80 characters length limit and was truncated.
     >         2019-01-28 14:33:33,093 WARN  org.apache.flink.metrics.MetricGroup - The operator name BigQueryIO.Write/BatchLoads/View.AsSingleton/Combine.GloballyAsSingletonView/View.CreatePCollectionView/Combine.globally(Concatenate)/WithKeys/AddKeys/Map/ParMultiDo(Anonymous) exceeded the 80 characters length limit and was truncated.
     >         2019-01-28 14:33:33,122 WARN  org.apache.flink.metrics.MetricGroup - The operator name BigQueryIO.Write/BatchLoads/View.AsSingleton/Combine.GloballyAsSingletonView/View.VoidKeyToMultimapMaterialization/ParDo(VoidKeyToMultimapMaterialization)/ParMultiDo(VoidKeyToMultimapMaterialization) exceeded the 80 characters length limit and was truncated.
     >         2019-01-28 14:33:33,162 INFO  org.apache.beam.runners.flink.translation.wrappers.streaming.io.UnboundedSourceWrapper - No restore state for UnbounedSourceWrapper.
     >         2019-01-28 14:33:33,179 WARN  org.apache.flink.metrics.MetricGroup - The operator name BigQueryIO.Write/BatchLoads/MultiPartitionsReshuffle/RestoreOriginalTimestamps/Reify.ExtractTimestampsFromValues/ParDo(Anonymous)/ParMultiDo(Anonymous) exceeded the 80 characters length limit and was truncated.
     >         2019-01-28 14:33:33,187 WARN  org.apache.flink.metrics.MetricGroup - The operator name BigQueryIO.Write/BatchLoads/MultiPartitionsReshuffle/RestoreOriginalTimestamps/ReifyTimestamps.RemoveWildcard/ParDo(Anonymous)/ParMultiDo(Anonymous) exceeded the 80 characters length limit and was truncated.
     >         2019-01-28 14:33:33,192 WARN  org.apache.flink.metrics.MetricGroup - The operator name BigQueryIO.Write/BatchLoads/MultiPartitionsReshuffle/ExpandIterable/ParMultiDo(Anonymous) exceeded the 80 characters length limit and was truncated.
     >         2019-01-28 14:33:33,218 WARN  org.apache.flink.metrics.MetricGroup - The operator name BigQueryIO.Write/BatchLoads/View.AsSingleton/Combine.GloballyAsSingletonView/Combine.globally(Singleton)/Values/Values/Map/ParMultiDo(Anonymous) exceeded the 80 characters length limit and was truncated.
     >         2019-01-28 14:33:33,220 WARN  org.apache.flink.metrics.MetricGroup - The operator name BigQueryIO.Write/BatchLoads/View.AsSingleton/Combine.GloballyAsSingletonView/Combine.globally(Singleton)/Combine.perKey(Singleton) exceeded the 80 characters length limit and was truncated.
     >         2019-01-28 14:33:33,298 WARN  org.apache.flink.metrics.MetricGroup - The operator name BigQueryIO.Write/BatchLoads/SinglePartitionWriteTables/Window.Into()/Window.Assign.out exceeded the 80 characters length limit and was truncated.
     >         2019-01-28 14:33:33,304 WARN  org.apache.flink.metrics.MetricGroup - The operator name BigQueryIO.Write/BatchLoads/MultiPartitionsWriteTables/Window.Into()/Window.Assign.out exceeded the 80 characters length limit and was truncated.
     >         2019-01-28 14:33:33,323 WARN  org.apache.flink.metrics.MetricGroup - The operator name BigQueryIO.Write/BatchLoads/MultiPartitionsWriteTables/WithKeys/AddKeys/Map/ParMultiDo(Anonymous) exceeded the 80 characters length limit and was truncated.
     >         2019-01-28 14:33:33,326 WARN  org.apache.flink.metrics.MetricGroup - The operator name BigQueryIO.Write/BatchLoads/MultiPartitionsReshuffle/ReifyOriginalTimestamps/ParDo(Anonymous)/ParMultiDo(Anonymous) exceeded the 80 characters length limit and was truncated.
     >         2019-01-28 14:33:33,357 WARN  org.apache.flink.metrics.MetricGroup - The operator name BigQueryIO.Write/BatchLoads/SinglePartitionWriteTables/WithKeys/AddKeys/Map/ParMultiDo(Anonymous) exceeded the 80 characters length limit and was truncated.
     >         2019-01-28 14:33:33,377 WARN  org.apache.flink.metrics.MetricGroup - The operator name BigQueryIO.Write/BatchLoads/MultiPartitionsReshuffle/Window.Into()/Window.Assign.out exceeded the 80 characters length limit and was truncated.
     >         2019-01-28 14:33:33,395 WARN  org.apache.flink.metrics.MetricGroup - The operator name BigQueryIO.Write/BatchLoads/TempFilePrefixView/Combine.GloballyAsSingletonView/Combine.globally(Singleton)/WithKeys/AddKeys/Map/ParMultiDo(Anonymous) exceeded the 80 characters length limit and was truncated.
     >         2019-01-28 14:33:33,477 WARN  org.apache.flink.metrics.MetricGroup - The operator name BigQueryIO.Write/BatchLoads/SinglePartitionsReshuffle/ReifyOriginalTimestamps/ParDo(Anonymous)/ParMultiDo(Anonymous) exceeded the 80 characters length limit and was truncated.
     >         2019-01-28 14:33:33,487 WARN  org.apache.flink.metrics.MetricGroup - The operator name BigQueryIO.Write/BatchLoads/SinglePartitionsReshuffle/Window.Into()/Window.Assign.out exceeded the 80 characters length limit and was truncated.
     >         2019-01-28 14:33:33,748 INFO  org.apache.beam.runners.flink.translation.wrappers.streaming.io.UnboundedSourceWrapper - No restore state for UnbounedSourceWrapper.
     >         2019-01-28 14:33:34,577 INFO  org.apache.beam.runners.flink.translation.wrappers.streaming.io.UnboundedSourceWrapper - Unbounded Flink Source 0/1 is reading from sources: [org.apache.beam.runners.core.construction.UnboundedReadFromBoundedSource$BoundedToUnboundedSourceAdapter@1892a35d]
     >         2019-01-28 14:33:34,610 INFO  org.apache.beam.runners.flink.translation.wrappers.streaming.io.UnboundedSourceWrapper - Unbounded Flink Source 0/1 is reading from sources: [org.apache.beam.runners.core.construction.UnboundedReadFromBoundedSource$BoundedToUnboundedSourceAdapter@7adcb05b]
     >         2019-01-28 14:33:34,747 INFO  org.apache.beam.runners.flink.translation.wrappers.streaming.io.UnboundedSourceWrapper - Unbounded Flink Source 0/1 is reading from sources: [org.apache.beam.sdk.io.kafka.KafkaUnboundedSource@71389814]
     >         2019-01-28 14:33:34,896 WARN  org.apache.kafka.clients.consumer.ConsumerConfig - The configuration 'metis.input.messages.config' was supplied but isn't a known config.
     >         2019-01-28 14:33:35,462 INFO  org.apache.beam.sdk.io.gcp.bigquery.BatchLoads - Writing BigQuery temporary files to gs://tempspace_eu_regional/dataflow_dev/BigQueryWriteTemp/beam_load_ratingsflink01281433342cd8f9b4_7ce229196b1d41e3ad63d0ef6234e0f6/ before loading them.
     >         2019-01-28 14:33:35,544 INFO  org.apache.beam.sdk.io.kafka.KafkaUnboundedSource - Reader-0: reading from ratings-0 starting at offset 2945
     >         2019-01-28 14:33:35,544 INFO  org.apache.beam.sdk.io.kafka.KafkaUnboundedSource - Reader-0: reading from ratings-1 starting at offset 3101
     >         2019-01-28 14:33:35,544 INFO  org.apache.beam.sdk.io.kafka.KafkaUnboundedSource - Reader-0: reading from ratings-2 starting at offset 3031
     >         2019-01-28 14:33:35,545 INFO  org.apache.beam.sdk.io.kafka.KafkaUnboundedSource - Reader-0: reading from ratings-3 starting at offset 3009
     >         2019-01-28 14:33:35,545 INFO  org.apache.beam.sdk.io.kafka.KafkaUnboundedSource - Reader-0: reading from ratings-4 starting at offset 2903
     >         2019-01-28 14:33:35,545 INFO  org.apache.beam.sdk.io.kafka.KafkaUnboundedSource - Reader-0: reading from ratings-5 starting at offset 3068
     >         2019-01-28 14:33:35,545 INFO  org.apache.beam.sdk.io.kafka.KafkaUnboundedSource - Reader-0: reading from ratings-6 starting at offset 3160
     >         2019-01-28 14:33:35,545 INFO  org.apache.beam.sdk.io.kafka.KafkaUnboundedSource - Reader-0: reading from ratings-7 starting at offset 3014
     >         2019-01-28 14:33:35,546 INFO  org.apache.beam.sdk.io.kafka.KafkaUnboundedSource - Reader-0: reading from ratings-8 starting at offset 3096
     >         2019-01-28 14:33:35,546 INFO  org.apache.beam.sdk.io.kafka.KafkaUnboundedSource - Reader-0: reading from ratings-9 starting at offset 2885
     >         2019-01-28 14:33:35,577 WARN  org.apache.kafka.clients.consumer.ConsumerConfig - The configuration 'metis.input.messages.config' was supplied but isn't a known config.
     >         2019-01-28 14:33:35,801 INFO  org.apache.beam.sdk.io.gcp.bigquery.BigQueryHelpers - Waiting for jobs to complete.
     >         2019-01-28 14:33:35,803 INFO  org.apache.beam.sdk.io.gcp.bigquery.BigQueryHelpers - Waiting for jobs to complete.
     >         2019-01-28 14:33:35,801 INFO  org.apache.beam.sdk.io.gcp.bigquery.BigQueryHelpers - Waiting for jobs to complete.
     >         2019-01-28 14:33:36,217 INFO  org.apache.beam.sdk.io.kafka.KafkaUnboundedSource - Reader-0: first record offset 3014
     >
     >
     >         Best,
     >         Tobi
     >
     >
     >         On Mon, Jan 28, 2019 at 11:52 AM Maximilian Michels <[email protected]> wrote:
     >
     >             Hi Tobias,
     >
     >             Checkpointing should be enabled when you set it in the Flink
     >             config or via the Beam option `checkpointingInterval`. Did you
     >             set `runner` to `FlinkRunner`?
     >
     >             If possible, could you share parts of the Flink logs?
     >
     >             Thanks,
     >             Max
     >
     >             On 25.01.19 15:14, Kaymak, Tobias wrote:
     >              > Hi,
     >              >
     >              > I am trying to migrate my existing KafkaToGCS pipeline to a
     >              > KafkaToBigQuery pipeline to skip the loading step from GCS,
     >              > which is currently handled outside of Beam.
     >              >
     >              > I noticed that the pipeline, written in Beam 2.9.0 (Java),
     >              > does not trigger any checkpoint on Flink (1.5.5), even though
     >              > it's configured to do so when I launch it. Is this normal?
     >              > How does Beam then guarantee exactly-once when there are no
     >              > checkpoints in Flink? (It seemed to start from scratch when
     >              > it crashed during my tests, but I am not 100% sure.)
     >              >
     >              > This is my pipeline:
     >              >
     >              >   pipeline
     >              >       .apply(
     >              >           KafkaIO.<String, String>read()
     >              >               .withBootstrapServers(bootstrap)
     >              >               .withTopics(topics)
     >              >               .withKeyDeserializer(StringDeserializer.class)
     >              >               .withValueDeserializer(ConfigurableDeserializer.class)
     >              >               .updateConsumerProperties(
     >              >                   ImmutableMap.of(InputMessagesConfig.CONFIG_PROPERTY_NAME, inputMessagesConfig))
     >              >               .updateConsumerProperties(ImmutableMap.of("auto.offset.reset", "earliest"))
     >              >               .updateConsumerProperties(ImmutableMap.of("group.id", groupId))
     >              >               .updateConsumerProperties(ImmutableMap.of("enable.auto.commit", "true"))
     >              >               .withReadCommitted()
     >              >               .withTimestampPolicyFactory(withEventTs)
     >              >               .commitOffsetsInFinalize())
     >              >       .apply(ParDo.of(new ToEventFn()))
     >              >       .apply(
     >              >           Window.into(new ZurichTimePartitioningWindowFn())
     >              >               .triggering(
     >              >                   Repeatedly.forever(
     >              >                       AfterFirst.of(
     >              >                           AfterPane.elementCountAtLeast(bundleSize),
     >              >                           AfterProcessingTime.pastFirstElementInPane()
     >              >                               .plusDelayOf(refreshFrequency))))
     >              >               .withAllowedLateness(Duration.standardDays(14))
     >              >               .discardingFiredPanes())
     >              >       .apply(
     >              >           BigQueryIO.<Event>write()
     >              >               .withMethod(BigQueryIO.Write.Method.FILE_LOADS)
     >              >               .withTriggeringFrequency(refreshFrequency)
     >              >               .withNumFileShards(1)
     >              >               .to(partitionedTableDynamicDestinations)
     >              >               .withFormatFunction(
     >              >                   (SerializableFunction<Event, TableRow>)
     >              >                       KafkaToBigQuery::convertUserEventToTableRow)
     >              >               .withCreateDisposition(BigQueryIO.Write.CreateDisposition.CREATE_IF_NEEDED)
     >              >               .withWriteDisposition(BigQueryIO.Write.WriteDisposition.WRITE_APPEND));
     >              >
     >              >   pipeline.run().waitUntilFinish();
     >              > It's launched like the other (GCS) one via:
     >              >
     >              > ...--checkpointingMode=EXACTLY_ONCE --checkpointingInterval=300000 --parallelism=1 --tempLocation=gs://foo..
     >              >
     >              > Any idea why checkpointing does not work here?
     >              >
     >              > Best,
     >              > Tobias
     >
     >
     >
     >         --
     >         Tobias Kaymak
     >         Data Engineer
     >
     >         [email protected]
     >         www.ricardo.ch
     >
     >
     >
     >     --
     >     Tobias Kaymak
     >     Data Engineer
     >
     >     [email protected]
     >     www.ricardo.ch
     >
     >
     >
     > --
     > Tobias Kaymak
     > Data Engineer
     >
     > [email protected]
     > www.ricardo.ch
     >
