Hi Maximilian,

I can confirm that checkpoints work with Beam 2.10-SNAPSHOT and do not work
with version 2.9. I am very sure it is related to this issue:
https://issues.apache.org/jira/browse/FLINK-2491 - which has been addressed
in 2.10 - since parts of the pipeline reach the FINISHED state after a
couple of minutes, and that used to trigger the shutdown of checkpointing.
However, executing the pipeline on a Flink 1.5.5 cluster no longer yields
any metrics about the processed elements in the web interface:

2019-01-30 09:14:53,934 WARN  org.apache.beam.sdk.metrics.MetricsEnvironment - Reporting metrics are not supported in the current execution environment.

Is this a known issue? I want to switch to Flink 1.6 to see whether it is
fixed there, but I am unsure at this point how to achieve this. Is it
something I can set in my pom.xml?

[image: image.png]
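
For reference, this is roughly the change I imagine - assuming the 2.10
runner is published as one artifact per supported Flink version (the
artifact id and version below are my guesses, so please correct me if they
are named differently):

    <!-- Assumed artifact name for a Flink-1.6-compatible Beam runner. -->
    <dependency>
      <groupId>org.apache.beam</groupId>
      <artifactId>beam-runners-flink-1.6</artifactId>
      <version>2.10.0-SNAPSHOT</version>
    </dependency>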

Best,
Tobi



On Tue, Jan 29, 2019 at 4:27 PM Maximilian Michels <[email protected]> wrote:

> Hi Tobias,
>
> It is normal to see "No restore state for UnbounedSourceWrapper" when not
> restoring from a checkpoint/savepoint.
>
> Just checking. You mentioned you set the checkpoint interval via:
> --checkpointingInterval=300000
>
> That means you have to wait 5 minutes until the first checkpoint will be
> taken.
> You should be seeing an INFO message like this:
> "INFO: Triggering checkpoint 1 @ 1548775459114 for job 3b5bdb811f1923bf49db24403e9c1ae9."
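>
> As an illustration, a shorter interval such as --checkpointingInterval=60000
> would trigger the first checkpoint after one minute, which can be handy for
> testing.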
>
> Thanks,
> Max
>
> On 29.01.19 16:13, Kaymak, Tobias wrote:
> > Even after altering the pipeline and making it much simpler, it still does
> > not checkpoint. I used a single Kafka topic as a source and altered the IO
> > step as follows:
> >
> >     .apply(
> >         BigQueryIO.<Event>write()
> >             .withMethod(BigQueryIO.Write.Method.FILE_LOADS)
> >             .withTriggeringFrequency(refreshFrequency)
> >             .withNumFileShards(1)
> >             .to(projectId + ":" + dataset + "." + tableName)
> >             .withTimePartitioning(new TimePartitioning().setField("event_date"))
> >             .withSchema(tableSchema)
> >             .withFormatFunction(
> >                 (SerializableFunction<Event, TableRow>)
> >                     KafkaToBigQuery::convertUserEventToTableRow)
> >             .withCreateDisposition(BigQueryIO.Write.CreateDisposition.CREATE_IF_NEEDED)
> >             .withWriteDisposition(BigQueryIO.Write.WriteDisposition.WRITE_APPEND));
> >
> > The graph that Flink 1.5.5 generated looked exactly the same, and
> > checkpointing still did not work:
> > [image: image.png]
> >
> > On Tue, Jan 29, 2019 at 11:05 AM Kaymak, Tobias <[email protected]> wrote:
> >
> >     If I have a pipeline running and I restart the taskmanager on which
> >     it's executing, the log below is what I see. I find the "No restore
> >     state for UnbounedSourceWrapper." message interesting, as it seems to
> >     indicate that the pipeline never stored any state in the first place?
> >
> >     Starting taskexecutor as a console application on host flink-taskmanager-5d85dd6854-pm5bl.
> >     2019-01-29 09:20:48,706 WARN  org.apache.hadoop.util.NativeCodeLoader - Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
> >     2019-01-29 09:20:51,253 WARN  org.apache.flink.shaded.zookeeper.org.apache.zookeeper.ClientCnxn - SASL configuration failed: javax.security.auth.login.LoginException: No JAAS configuration section named 'Client' was found in specified JAAS configuration file: '/tmp/jaas-7768141350028767113.conf'. Will continue connection to Zookeeper server without SASL authentication, if Zookeeper server allows it.
> >     2019-01-29 09:20:51,281 ERROR org.apache.flink.shaded.curator.org.apache.curator.ConnectionState - Authentication failed
> >     2019-01-29 09:21:53,814 WARN  org.apache.flink.metrics.MetricGroup - The operator name BigQueryIO.Write/BatchLoads/View.AsSingleton/Combine.GloballyAsSingletonView/DropInputs/ParMultiDo(NoOp) exceeded the 80 characters length limit and was truncated.
> >     2019-01-29 09:21:53,828 WARN  org.apache.flink.metrics.MetricGroup - The operator name BigQueryIO.Write/BatchLoads/MultiPartitionsWriteTables/Window.Into()/Window.Assign.out exceeded the 80 characters length limit and was truncated.
> >     2019-01-29 09:21:53,834 WARN  org.apache.flink.metrics.MetricGroup - The operator name BigQueryIO.Write/BatchLoads/SinglePartitionWriteTables/Window.Into()/Window.Assign.out exceeded the 80 characters length limit and was truncated.
> >     2019-01-29 09:21:53,917 INFO  org.apache.beam.runners.flink.translation.wrappers.streaming.io.UnboundedSourceWrapper - No restore state for UnbounedSourceWrapper.
> >     2019-01-29 09:21:53,929 WARN  org.apache.flink.metrics.MetricGroup - The operator name BigQueryIO.Write/BatchLoads/MultiPartitionsWriteTables/ParDo(GarbageCollectTemporaryFiles)/ParMultiDo(GarbageCollectTemporaryFiles) exceeded the 80 characters length limit and was truncated.
> >     2019-01-29 09:21:53,937 WARN  org.apache.flink.metrics.MetricGroup - The operator name BigQueryIO.Write/BatchLoads/MultiPartitionsReshuffle/ReifyOriginalTimestamps/ParDo(Anonymous)/ParMultiDo(Anonymous) exceeded the 80 characters length limit and was truncated.
> >     2019-01-29 09:21:53,978 INFO  org.apache.beam.runners.flink.translation.wrappers.streaming.io.UnboundedSourceWrapper - Unbounded Flink Source 0/1 is reading from sources: [org.apache.beam.runners.core.construction.UnboundedReadFromBoundedSource$BoundedToUnboundedSourceAdapter@69908217]
> >     2019-01-29 09:21:54,002 WARN  org.apache.flink.metrics.MetricGroup - The operator name BigQueryIO.Write/BatchLoads/MultiPartitionsReshuffle/Window.Into()/Window.Assign.out exceeded the 80 characters length limit and was truncated.
> >     2019-01-29 09:21:54,008 WARN  org.apache.flink.metrics.MetricGroup - The operator name BigQueryIO.Write/BatchLoads/View.AsSingleton/Combine.GloballyAsSingletonView/View.CreatePCollectionView/Combine.globally(Concatenate)/WithKeys/AddKeys/Map/ParMultiDo(Anonymous) exceeded the 80 characters length limit and was truncated.
> >     2019-01-29 09:21:54,011 WARN  org.apache.flink.metrics.MetricGroup - The operator name BigQueryIO.Write/BatchLoads/TempFilePrefixView/Combine.GloballyAsSingletonView/View.CreatePCollectionView/Combine.globally(Concatenate)/WithKeys/AddKeys/Map/ParMultiDo(Anonymous) exceeded the 80 characters length limit and was truncated.
> >     2019-01-29 09:21:54,020 WARN  org.apache.flink.metrics.MetricGroup - The operator name BigQueryIO.Write/BatchLoads/SinglePartitionWriteTables/ParDo(GarbageCollectTemporaryFiles)/ParMultiDo(GarbageCollectTemporaryFiles) exceeded the 80 characters length limit and was truncated.
> >     2019-01-29 09:21:54,080 WARN  org.apache.flink.metrics.MetricGroup - The operator name BigQueryIO.Write/BatchLoads/TempFilePrefixView/Combine.GloballyAsSingletonView/View.CreatePCollectionView/Combine.globally(Concatenate)/Values/Values/Map/ParMultiDo(Anonymous) exceeded the 80 characters length limit and was truncated.
> >     2019-01-29 09:21:54,091 WARN  org.apache.flink.metrics.MetricGroup - The operator name BigQueryIO.Write/BatchLoads/MultiPartitionsReshuffle/RestoreOriginalTimestamps/Reify.ExtractTimestampsFromValues/ParDo(Anonymous)/ParMultiDo(Anonymous) exceeded the 80 characters length limit and was truncated.
> >     2019-01-29 09:21:54,099 WARN  org.apache.flink.metrics.MetricGroup - The operator name BigQueryIO.Write/BatchLoads/MultiPartitionsWriteTables/WithKeys/AddKeys/Map/ParMultiDo(Anonymous) exceeded the 80 characters length limit and was truncated.
> >     2019-01-29 09:21:54,107 WARN  org.apache.flink.metrics.MetricGroup - The operator name BigQueryIO.Write/BatchLoads/View.AsSingleton/Combine.GloballyAsSingletonView/Combine.globally(Singleton)/WithKeys/AddKeys/Map/ParMultiDo(Anonymous) exceeded the 80 characters length limit and was truncated.
> >     2019-01-29 09:21:54,109 WARN  org.apache.flink.metrics.MetricGroup - The operator name BigQueryIO.Write/BatchLoads/MultiPartitionsWriteTables/Values/Values/Map/ParMultiDo(Anonymous) exceeded the 80 characters length limit and was truncated.
> >     2019-01-29 09:21:54,119 WARN  org.apache.flink.metrics.MetricGroup - The operator name BigQueryIO.Write/BatchLoads/SinglePartitionWriteTables/Values/Values/Map/ParMultiDo(Anonymous) exceeded the 80 characters length limit and was truncated.
> >     2019-01-29 09:21:54,118 WARN  org.apache.flink.metrics.MetricGroup - The operator name BigQueryIO.Write/BatchLoads/TempFilePrefixView/Combine.GloballyAsSingletonView/View.VoidKeyToMultimapMaterialization/ParDo(VoidKeyToMultimapMaterialization)/ParMultiDo(VoidKeyToMultimapMaterialization) exceeded the 80 characters length limit and was truncated.
> >     2019-01-29 09:21:54,115 WARN  org.apache.flink.metrics.MetricGroup - The operator name BigQueryIO.Write/BatchLoads/SinglePartitionsReshuffle/ReifyOriginalTimestamps/ParDo(Anonymous)/ParMultiDo(Anonymous) exceeded the 80 characters length limit and was truncated.
> >     2019-01-29 09:21:54,114 WARN  org.apache.flink.metrics.MetricGroup - The operator name BigQueryIO.Write/BatchLoads/TempFilePrefixView/Combine.GloballyAsSingletonView/View.CreatePCollectionView/Combine.globally(Concatenate)/Combine.perKey(Concatenate) exceeded the 80 characters length limit and was truncated.
> >     2019-01-29 09:21:54,111 WARN  org.apache.flink.metrics.MetricGroup - The operator name BigQueryIO.Write/BatchLoads/SinglePartitionWriteTables/WithKeys/AddKeys/Map/ParMultiDo(Anonymous) exceeded the 80 characters length limit and was truncated.
> >     2019-01-29 09:21:54,111 WARN  org.apache.flink.metrics.MetricGroup - The operator name BigQueryIO.Write/BatchLoads/SinglePartitionsReshuffle/RestoreOriginalTimestamps/Reify.ExtractTimestampsFromValues/ParDo(Anonymous)/ParMultiDo(Anonymous) exceeded the 80 characters length limit and was truncated.
> >     2019-01-29 09:21:54,110 WARN  org.apache.flink.metrics.MetricGroup - The operator name BigQueryIO.Write/BatchLoads/View.AsSingleton/Combine.GloballyAsSingletonView/View.CreatePCollectionView/Combine.globally(Concatenate)/Values/Values/Map/ParMultiDo(Anonymous) exceeded the 80 characters length limit and was truncated.
> >     2019-01-29 09:21:54,110 WARN  org.apache.flink.metrics.MetricGroup - The operator name BigQueryIO.Write/BatchLoads/TempFilePrefixView/Combine.GloballyAsSingletonView/Combine.globally(Singleton)/WithKeys/AddKeys/Map/ParMultiDo(Anonymous) exceeded the 80 characters length limit and was truncated.
> >     2019-01-29 09:21:54,109 WARN  org.apache.flink.metrics.MetricGroup - The operator name BigQueryIO.Write/BatchLoads/View.AsSingleton/Combine.GloballyAsSingletonView/View.VoidKeyToMultimapMaterialization/ParDo(VoidKeyToMultimapMaterialization)/ParMultiDo(VoidKeyToMultimapMaterialization) exceeded the 80 characters length limit and was truncated.
> >     2019-01-29 09:21:54,144 WARN  org.apache.flink.metrics.MetricGroup - The operator name BigQueryIO.Write/BatchLoads/SinglePartitionsReshuffle/Window.Into()/Window.Assign.out exceeded the 80 characters length limit and was truncated.
> >     2019-01-29 09:21:54,172 WARN  org.apache.flink.metrics.MetricGroup - The operator name BigQueryIO.Write/BatchLoads/WriteGroupedRecords/ParMultiDo(WriteGroupedRecordsToFiles) exceeded the 80 characters length limit and was truncated.
> >     2019-01-29 09:21:54,176 WARN  org.apache.flink.metrics.MetricGroup - The operator name BigQueryIO.Write/BatchLoads/View.AsSingleton/Combine.GloballyAsSingletonView/Combine.globally(Singleton)/Values/Values/Map/ParMultiDo(Anonymous) exceeded the 80 characters length limit and was truncated.
> >     2019-01-29 09:21:54,179 WARN  org.apache.flink.metrics.MetricGroup - The operator name BigQueryIO.Write/BatchLoads/View.AsSingleton/Combine.GloballyAsSingletonView/View.CreatePCollectionView/Combine.globally(Concatenate)/Combine.perKey(Concatenate) exceeded the 80 characters length limit and was truncated.
> >     2019-01-29 09:21:54,189 WARN  org.apache.flink.metrics.MetricGroup - The operator name BigQueryIO.Write/BatchLoads/View.AsSingleton/Combine.GloballyAsSingletonView/Combine.globally(Singleton)/Combine.perKey(Singleton) exceeded the 80 characters length limit and was truncated.
> >     2019-01-29 09:21:54,191 WARN  org.apache.flink.metrics.MetricGroup - The operator name BigQueryIO.Write/BatchLoads/TempFilePrefixView/Combine.GloballyAsSingletonView/Combine.globally(Singleton)/Values/Values/Map/ParMultiDo(Anonymous) exceeded the 80 characters length limit and was truncated.
> >     2019-01-29 09:21:54,203 WARN  org.apache.flink.metrics.MetricGroup - The operator name BigQueryIO.Write/BatchLoads/SinglePartitionsReshuffle/RestoreOriginalTimestamps/ReifyTimestamps.RemoveWildcard/ParDo(Anonymous)/ParMultiDo(Anonymous) exceeded the 80 characters length limit and was truncated.
> >     2019-01-29 09:21:54,210 WARN  org.apache.flink.metrics.MetricGroup - The operator name BigQueryIO.Write/BatchLoads/TempFilePrefixView/Combine.GloballyAsSingletonView/Combine.globally(Singleton)/Combine.perKey(Singleton) exceeded the 80 characters length limit and was truncated.
> >     2019-01-29 09:21:54,217 WARN  org.apache.flink.metrics.MetricGroup - The operator name BigQueryIO.Write/BatchLoads/MultiPartitionsReshuffle/RestoreOriginalTimestamps/ReifyTimestamps.RemoveWildcard/ParDo(Anonymous)/ParMultiDo(Anonymous) exceeded the 80 characters length limit and was truncated.
> >     2019-01-29 09:21:54,238 WARN  org.apache.flink.metrics.MetricGroup - The operator name BigQueryIO.Write/BatchLoads/MultiPartitionsReshuffle/ExpandIterable/ParMultiDo(Anonymous) exceeded the 80 characters length limit and was truncated.
> >     2019-01-29 09:21:54,242 WARN  org.apache.flink.metrics.MetricGroup - The operator name BigQueryIO.Write/BatchLoads/SinglePartitionsReshuffle/ExpandIterable/ParMultiDo(Anonymous) exceeded the 80 characters length limit and was truncated.
> >     2019-01-29 09:21:54,339 INFO  org.apache.beam.runners.flink.translation.wrappers.streaming.io.UnboundedSourceWrapper - No restore state for UnbounedSourceWrapper.
> >     2019-01-29 09:21:54,371 INFO  org.apache.beam.runners.flink.translation.wrappers.streaming.io.UnboundedSourceWrapper - No restore state for UnbounedSourceWrapper.
> >     2019-01-29 09:21:54,479 INFO  org.apache.beam.runners.flink.translation.wrappers.streaming.io.UnboundedSourceWrapper - No restore state for UnbounedSourceWrapper.
> >     2019-01-29 09:21:55,509 INFO  org.apache.beam.runners.flink.translation.wrappers.streaming.io.UnboundedSourceWrapper - Unbounded Flink Source 0/1 is reading from sources: [org.apache.beam.runners.core.construction.UnboundedReadFromBoundedSource$BoundedToUnboundedSourceAdapter@2a501a64]
> >     2019-01-29 09:21:55,535 INFO  org.apache.beam.runners.flink.translation.wrappers.streaming.io.UnboundedSourceWrapper - Unbounded Flink Source 0/1 is reading from sources: [org.apache.beam.runners.core.construction.UnboundedReadFromBoundedSource$BoundedToUnboundedSourceAdapter@60345813]
> >     2019-01-29 09:21:55,770 INFO  org.apache.beam.runners.flink.translation.wrappers.streaming.io.UnboundedSourceWrapper - Unbounded Flink Source 0/1 is reading from sources: [org.apache.beam.sdk.io.kafka.KafkaUnboundedSource@75aab48]
> >     2019-01-29 09:21:56,280 WARN  org.apache.kafka.clients.consumer.ConsumerConfig - The configuration 'metis.input.messages.config' was supplied but isn't a known config.
> >     2019-01-29 09:21:57,387 INFO  org.apache.beam.sdk.io.gcp.bigquery.BatchLoads - Writing BigQuery temporary files to gs://bucket/BigQueryWriteTemp/beam_load_ratingsflink01290921554a632112_85202e449b2d45729d384e3ac5f8cc2b/ before loading them.
> >     2019-01-29 09:21:58,118 INFO  org.apache.beam.sdk.io.gcp.bigquery.BigQueryHelpers - Waiting for jobs to complete.
> >     2019-01-29 09:21:58,118 INFO  org.apache.beam.sdk.io.gcp.bigquery.BigQueryHelpers - Waiting for jobs to complete.
> >     2019-01-29 09:21:58,118 INFO  org.apache.beam.sdk.io.gcp.bigquery.BigQueryHelpers - Waiting for jobs to complete.
> >     2019-01-29 09:21:58,140 INFO  org.apache.beam.sdk.io.kafka.KafkaUnboundedSource - Reader-0: reading from ratings-0 starting at offset 13112
> >     2019-01-29 09:21:58,141 INFO  org.apache.beam.sdk.io.kafka.KafkaUnboundedSource - Reader-0: reading from ratings-1 starting at offset 13407
> >     2019-01-29 09:21:58,142 INFO  org.apache.beam.sdk.io.kafka.KafkaUnboundedSource - Reader-0: reading from ratings-2 starting at offset 13034
> >     2019-01-29 09:21:58,142 INFO  org.apache.beam.sdk.io.kafka.KafkaUnboundedSource - Reader-0: reading from ratings-3 starting at offset 13271
> >     2019-01-29 09:21:58,142 INFO  org.apache.beam.sdk.io.kafka.KafkaUnboundedSource - Reader-0: reading from ratings-4 starting at offset 12813
> >     2019-01-29 09:21:58,142 INFO  org.apache.beam.sdk.io.kafka.KafkaUnboundedSource - Reader-0: reading from ratings-5 starting at offset 13211
> >     2019-01-29 09:21:58,144 INFO  org.apache.beam.sdk.io.kafka.KafkaUnboundedSource - Reader-0: reading from ratings-6 starting at offset 13394
> >     2019-01-29 09:21:58,145 INFO  org.apache.beam.sdk.io.kafka.KafkaUnboundedSource - Reader-0: reading from ratings-7 starting at offset 13194
> >     2019-01-29 09:21:58,145 INFO  org.apache.beam.sdk.io.kafka.KafkaUnboundedSource - Reader-0: reading from ratings-8 starting at offset 13478
> >     2019-01-29 09:21:58,145 INFO  org.apache.beam.sdk.io.kafka.KafkaUnboundedSource - Reader-0: reading from ratings-9 starting at offset 12966
> >
> >
> >     On Mon, Jan 28, 2019 at 3:36 PM Kaymak, Tobias <[email protected]> wrote:
> >
> >         Hi Maximilian,
> >
> >         yes, I've set --runner to FlinkRunner when launching the pipeline,
> >         and it does work for a GCS sink, but it seems to be ignored for a
> >         BigQuery sink somehow - even though it looks like the system
> >         magically handles it by itself.
> >
> >         This is the full command line to launch the Beam 2.9.0 pipeline
> >         on Flink 1.5.5:
> >
> >         bin/flink run -d -c di.beam.KafkaToBigQuery -j lib/beam_pipelines.jar \
> >             --runner=FlinkRunner --appName=ratings --checkpointingMode=EXACTLY_ONCE \
> >             --checkpointingInterval=300000 --parallelism=1 \
> >             --tempLocation=gs://somebucket
> >
> >         Here are the logs from the taskmanager; I can share the full code
> >         of the pipeline if you want:
> >
> >         2019-01-28 14:33:31,287 WARN  org.apache.flink.metrics.MetricGroup - The operator name BigQueryIO.Write/BatchLoads/View.AsSingleton/Combine.GloballyAsSingletonView/DropInputs/ParMultiDo(NoOp) exceeded the 80 characters length limit and was truncated.
> >         2019-01-28 14:33:31,911 WARN  org.apache.flink.metrics.MetricGroup - The operator name BigQueryIO.Write/BatchLoads/SinglePartitionWriteTables/ParDo(GarbageCollectTemporaryFiles)/ParMultiDo(GarbageCollectTemporaryFiles) exceeded the 80 characters length limit and was truncated.
> >         2019-01-28 14:33:31,976 WARN  org.apache.flink.metrics.MetricGroup - The operator name BigQueryIO.Write/BatchLoads/MultiPartitionsWriteTables/ParDo(GarbageCollectTemporaryFiles)/ParMultiDo(GarbageCollectTemporaryFiles) exceeded the 80 characters length limit and was truncated.
> >         2019-01-28 14:33:32,217 WARN  org.apache.flink.metrics.MetricGroup - The operator name BigQueryIO.Write/BatchLoads/SinglePartitionWriteTables/Values/Values/Map/ParMultiDo(Anonymous) exceeded the 80 characters length limit and was truncated.
> >         2019-01-28 14:33:32,227 WARN  org.apache.flink.metrics.MetricGroup - The operator name BigQueryIO.Write/BatchLoads/View.AsSingleton/Combine.GloballyAsSingletonView/Combine.globally(Singleton)/WithKeys/AddKeys/Map/ParMultiDo(Anonymous) exceeded the 80 characters length limit and was truncated.
> >         2019-01-28 14:33:32,228 WARN  org.apache.flink.metrics.MetricGroup - The operator name BigQueryIO.Write/BatchLoads/SinglePartitionsReshuffle/RestoreOriginalTimestamps/Reify.ExtractTimestampsFromValues/ParDo(Anonymous)/ParMultiDo(Anonymous) exceeded the 80 characters length limit and was truncated.
> >         2019-01-28 14:33:32,276 WARN  org.apache.flink.metrics.MetricGroup - The operator name BigQueryIO.Write/BatchLoads/MultiPartitionsWriteTables/Values/Values/Map/ParMultiDo(Anonymous) exceeded the 80 characters length limit and was truncated.
> >         2019-01-28 14:33:32,282 INFO  org.apache.beam.runners.flink.translation.wrappers.streaming.io.UnboundedSourceWrapper - No restore state for UnbounedSourceWrapper.
> >         2019-01-28 14:33:32,288 INFO  org.apache.beam.runners.flink.translation.wrappers.streaming.io.UnboundedSourceWrapper - Unbounded Flink Source 0/1 is reading from sources: [org.apache.beam.runners.core.construction.UnboundedReadFromBoundedSource$BoundedToUnboundedSourceAdapter@5fecdf95]
> >         2019-01-28 14:33:32,296 INFO  org.apache.beam.runners.flink.translation.wrappers.streaming.io.UnboundedSourceWrapper - No restore state for UnbounedSourceWrapper.
> >         2019-01-28 14:33:32,318 WARN  org.apache.flink.metrics.MetricGroup - The operator name BigQueryIO.Write/BatchLoads/TempFilePrefixView/Combine.GloballyAsSingletonView/View.CreatePCollectionView/Combine.globally(Concatenate)/WithKeys/AddKeys/Map/ParMultiDo(Anonymous) exceeded the 80 characters length limit and was truncated.
> >         2019-01-28 14:33:32,321 WARN  org.apache.flink.metrics.MetricGroup - The operator name BigQueryIO.Write/BatchLoads/SinglePartitionsReshuffle/RestoreOriginalTimestamps/ReifyTimestamps.RemoveWildcard/ParDo(Anonymous)/ParMultiDo(Anonymous) exceeded the 80 characters length limit and was truncated.
> >         2019-01-28 14:33:32,324 WARN  org.apache.flink.metrics.MetricGroup - The operator name BigQueryIO.Write/BatchLoads/TempFilePrefixView/Combine.GloballyAsSingletonView/View.CreatePCollectionView/Combine.globally(Concatenate)/Values/Values/Map/ParMultiDo(Anonymous) exceeded the 80 characters length limit and was truncated.
> >         2019-01-28 14:33:32,329 WARN  org.apache.flink.metrics.MetricGroup - The operator name BigQueryIO.Write/BatchLoads/TempFilePrefixView/Combine.GloballyAsSingletonView/View.CreatePCollectionView/Combine.globally(Concatenate)/Combine.perKey(Concatenate) exceeded the 80 characters length limit and was truncated.
> >         2019-01-28 14:33:32,357 WARN  org.apache.flink.metrics.MetricGroup - The operator name BigQueryIO.Write/BatchLoads/TempFilePrefixView/Combine.GloballyAsSingletonView/View.VoidKeyToMultimapMaterialization/ParDo(VoidKeyToMultimapMaterialization)/ParMultiDo(VoidKeyToMultimapMaterialization) exceeded the 80 characters length limit and was truncated.
> >         2019-01-28 14:33:32,482 WARN  org.apache.flink.metrics.MetricGroup - The operator name BigQueryIO.Write/BatchLoads/SinglePartitionsReshuffle/ExpandIterable/ParMultiDo(Anonymous) exceeded the 80 characters length limit and was truncated.
> >         2019-01-28 14:33:32,483 WARN  org.apache.flink.metrics.MetricGroup - The operator name BigQueryIO.Write/BatchLoads/TempFilePrefixView/Combine.GloballyAsSingletonView/Combine.globally(Singleton)/Values/Values/Map/ParMultiDo(Anonymous) exceeded the 80 characters length limit and was truncated.
> >         2019-01-28 14:33:32,493 WARN  org.apache.flink.metrics.MetricGroup - The operator name BigQueryIO.Write/BatchLoads/TempFilePrefixView/Combine.GloballyAsSingletonView/Combine.globally(Singleton)/Combine.perKey(Singleton) exceeded the 80 characters length limit and was truncated.
> >         2019-01-28 14:33:32,697 WARN  org.apache.flink.metrics.MetricGroup - The operator name BigQueryIO.Write/BatchLoads/WriteGroupedRecords/ParMultiDo(WriteGroupedRecordsToFiles) exceeded the 80 characters length limit and was truncated.
> >         2019-01-28 14:33:32,782 WARN  org.apache.flink.metrics.MetricGroup - The operator name BigQueryIO.Write/BatchLoads/View.AsSingleton/Combine.GloballyAsSingletonView/View.CreatePCollectionView/Combine.globally(Concatenate)/Values/Values/Map/ParMultiDo(Anonymous) exceeded the 80 characters length limit and was truncated.
> >         2019-01-28 14:33:32,789 WARN  org.apache.flink.metrics.MetricGroup - The operator name BigQueryIO.Write/BatchLoads/View.AsSingleton/Combine.GloballyAsSingletonView/View.CreatePCollectionView/Combine.globally(Concatenate)/Combine.perKey(Concatenate) exceeded the 80 characters length limit and was truncated.
> >         2019-01-28 14:33:33,093 WARN  org.apache.flink.metrics.MetricGroup - The operator name BigQueryIO.Write/BatchLoads/View.AsSingleton/Combine.GloballyAsSingletonView/View.CreatePCollectionView/Combine.globally(Concatenate)/WithKeys/AddKeys/Map/ParMultiDo(Anonymous) exceeded the 80 characters length limit and was truncated.
> >         2019-01-28 14:33:33,122 WARN  org.apache.flink.metrics.MetricGroup - The operator name BigQueryIO.Write/BatchLoads/View.AsSingleton/Combine.GloballyAsSingletonView/View.VoidKeyToMultimapMaterialization/ParDo(VoidKeyToMultimapMaterialization)/ParMultiDo(VoidKeyToMultimapMaterialization) exceeded the 80 characters length limit and was truncated.
> >         2019-01-28 14:33:33,162 INFO  org.apache.beam.runners.flink.translation.wrappers.streaming.io.UnboundedSourceWrapper - No restore state for UnbounedSourceWrapper.
> >         2019-01-28 14:33:33,179 WARN  org.apache.flink.metrics.MetricGroup - The operator name BigQueryIO.Write/BatchLoads/MultiPartitionsReshuffle/RestoreOriginalTimestamps/Reify.ExtractTimestampsFromValues/ParDo(Anonymous)/ParMultiDo(Anonymous) exceeded the 80 characters length limit and was truncated.
> >         2019-01-28 14:33:33,187 WARN  org.apache.flink.metrics.MetricGroup - The operator name BigQueryIO.Write/BatchLoads/MultiPartitionsReshuffle/RestoreOriginalTimestamps/ReifyTimestamps.RemoveWildcard/ParDo(Anonymous)/ParMultiDo(Anonymous) exceeded the 80 characters length limit and was truncated.
> >         2019-01-28 14:33:33,192 WARN  org.apache.flink.metrics.MetricGroup - The operator name BigQueryIO.Write/BatchLoads/MultiPartitionsReshuffle/ExpandIterable/ParMultiDo(Anonymous) exceeded the 80 characters length limit and was truncated.
> >         2019-01-28 14:33:33,218 WARN  org.apache.flink.metrics.MetricGroup - The operator name BigQueryIO.Write/BatchLoads/View.AsSingleton/Combine.GloballyAsSingletonView/Combine.globally(Singleton)/Values/Values/Map/ParMultiDo(Anonymous) exceeded the 80 characters length limit and was truncated.
> >         2019-01-28 14:33:33,220 WARN  org.apache.flink.metrics.MetricGroup - The operator name BigQueryIO.Write/BatchLoads/View.AsSingleton/Combine.GloballyAsSingletonView/Combine.globally(Singleton)/Combine.perKey(Singleton) exceeded the 80 characters length limit and was truncated.
> >         2019-01-28 14:33:33,298 WARN  org.apache.flink.metrics.MetricGroup - The operator name BigQueryIO.Write/BatchLoads/SinglePartitionWriteTables/Window.Into()/Window.Assign.out exceeded the 80 characters length limit and was truncated.
> >         2019-01-28 14:33:33,304 WARN  org.apache.flink.metrics.MetricGroup - The operator name BigQueryIO.Write/BatchLoads/MultiPartitionsWriteTables/Window.Into()/Window.Assign.out exceeded the 80 characters length limit and was truncated.
> >         2019-01-28 14:33:33,323 WARN  org.apache.flink.metrics.MetricGroup - The operator name BigQueryIO.Write/BatchLoads/MultiPartitionsWriteTables/WithKeys/AddKeys/Map/ParMultiDo(Anonymous) exceeded the 80 characters length limit and was truncated.
> >         2019-01-28 14:33:33,326 WARN  org.apache.flink.metrics.MetricGroup - The operator name BigQueryIO.Write/BatchLoads/MultiPartitionsReshuffle/ReifyOriginalTimestamps/ParDo(Anonymous)/ParMultiDo(Anonymous) exceeded the 80 characters length limit and was truncated.
> >         2019-01-28 14:33:33,357 WARN  org.apache.flink.metrics.MetricGroup - The operator name BigQueryIO.Write/BatchLoads/SinglePartitionWriteTables/WithKeys/AddKeys/Map/ParMultiDo(Anonymous) exceeded the 80 characters length limit and was truncated.
> >         2019-01-28 14:33:33,377 WARN  org.apache.flink.metrics.MetricGroup - The operator name BigQueryIO.Write/BatchLoads/MultiPartitionsReshuffle/Window.Into()/Window.Assign.out exceeded the 80 characters length limit and was truncated.
> >         2019-01-28 14:33:33,395 WARN  org.apache.flink.metrics.MetricGroup - The operator name BigQueryIO.Write/BatchLoads/TempFilePrefixView/Combine.GloballyAsSingletonView/Combine.globally(Singleton)/WithKeys/AddKeys/Map/ParMultiDo(Anonymous) exceeded the 80 characters length limit and was truncated.
> >         2019-01-28 14:33:33,477 WARN  org.apache.flink.metrics.MetricGroup - The operator name BigQueryIO.Write/BatchLoads/SinglePartitionsReshuffle/ReifyOriginalTimestamps/ParDo(Anonymous)/ParMultiDo(Anonymous) exceeded the 80 characters length limit and was truncated.
> >         2019-01-28 14:33:33,487 WARN  org.apache.flink.metrics.MetricGroup - The operator name BigQueryIO.Write/BatchLoads/SinglePartitionsReshuffle/Window.Into()/Window.Assign.out exceeded the 80 characters length limit and was truncated.
> >         2019-01-28 14:33:33,748 INFO  org.apache.beam.runners.flink.translation.wrappers.streaming.io.UnboundedSourceWrapper - No restore state for UnbounedSourceWrapper.
> >         2019-01-28 14:33:34,577 INFO  org.apache.beam.runners.flink.translation.wrappers.streaming.io.UnboundedSourceWrapper - Unbounded Flink Source 0/1 is reading from sources: [org.apache.beam.runners.core.construction.UnboundedReadFromBoundedSource$BoundedToUnboundedSourceAdapter@1892a35d]
> >         2019-01-28 14:33:34,610 INFO  org.apache.beam.runners.flink.translation.wrappers.streaming.io.UnboundedSourceWrapper - Unbounded Flink Source 0/1 is reading from sources: [org.apache.beam.runners.core.construction.UnboundedReadFromBoundedSource$BoundedToUnboundedSourceAdapter@7adcb05b]
> >         2019-01-28 14:33:34,747 INFO  org.apache.beam.runners.flink.translation.wrappers.streaming.io.UnboundedSourceWrapper - Unbounded Flink Source 0/1 is reading from sources: [org.apache.beam.sdk.io.kafka.KafkaUnboundedSource@71389814]
> >         2019-01-28 14:33:34,896 WARN  org.apache.kafka.clients.consumer.ConsumerConfig - The configuration 'metis.input.messages.config' was supplied but isn't a known config.
> >         2019-01-28 14:33:35,462 INFO  org.apache.beam.sdk.io.gcp.bigquery.BatchLoads - Writing BigQuery temporary files to gs://tempspace_eu_regional/dataflow_dev/BigQueryWriteTemp/beam_load_ratingsflink01281433342cd8f9b4_7ce229196b1d41e3ad63d0ef6234e0f6/ before loading them.
> >         2019-01-28 14:33:35,544 INFO  org.apache.beam.sdk.io.kafka.KafkaUnboundedSource - Reader-0: reading from ratings-0 starting at offset 2945
> >         2019-01-28 14:33:35,544 INFO  org.apache.beam.sdk.io.kafka.KafkaUnboundedSource - Reader-0: reading from ratings-1 starting at offset 3101
> >         2019-01-28 14:33:35,544 INFO  org.apache.beam.sdk.io.kafka.KafkaUnboundedSource - Reader-0: reading from ratings-2 starting at offset 3031
> >         2019-01-28 14:33:35,545 INFO  org.apache.beam.sdk.io.kafka.KafkaUnboundedSource - Reader-0: reading from ratings-3 starting at offset 3009
> >         2019-01-28 14:33:35,545 INFO  org.apache.beam.sdk.io.kafka.KafkaUnboundedSource - Reader-0: reading from ratings-4 starting at offset 2903
> >         2019-01-28 14:33:35,545 INFO  org.apache.beam.sdk.io.kafka.KafkaUnboundedSource - Reader-0: reading from ratings-5 starting at offset 3068
> >         2019-01-28 14:33:35,545 INFO  org.apache.beam.sdk.io.kafka.KafkaUnboundedSource - Reader-0: reading from ratings-6 starting at offset 3160
> >         2019-01-28 14:33:35,545 INFO  org.apache.beam.sdk.io.kafka.KafkaUnboundedSource - Reader-0: reading from ratings-7 starting at offset 3014
> >         2019-01-28 14:33:35,546 INFO  org.apache.beam.sdk.io.kafka.KafkaUnboundedSource - Reader-0: reading from ratings-8 starting at offset 3096
> >         2019-01-28 14:33:35,546 INFO  org.apache.beam.sdk.io.kafka.KafkaUnboundedSource - Reader-0: reading from ratings-9 starting at offset 2885
> >         2019-01-28 14:33:35,577 WARN  org.apache.kafka.clients.consumer.ConsumerConfig - The configuration 'metis.input.messages.config' was supplied but isn't a known config.
> >         2019-01-28 14:33:35,801 INFO  org.apache.beam.sdk.io.gcp.bigquery.BigQueryHelpers - Waiting for jobs to complete.
> >         2019-01-28 14:33:35,803 INFO  org.apache.beam.sdk.io.gcp.bigquery.BigQueryHelpers - Waiting for jobs to complete.
> >         2019-01-28 14:33:35,801 INFO  org.apache.beam.sdk.io.gcp.bigquery.BigQueryHelpers - Waiting for jobs to complete.
> >         2019-01-28 14:33:36,217 INFO  org.apache.beam.sdk.io.kafka.KafkaUnboundedSource - Reader-0: first record offset 3014
> >
> >
> >         Best,
> >         Tobi
> >
> >
> >         On Mon, Jan 28, 2019 at 11:52 AM Maximilian Michels <[email protected]> wrote:
> >
> >             Hi Tobias,
> >
> >             Checkpointing should be enabled when you set it in the Flink
> >             config or via the Beam option `checkpointingInterval`. Did you
> >             set `runner` to `FlinkRunner`?
> >
> >             If possible, could you share parts of the Flink logs?
> >
> >             Thanks,
> >             Max
> >
> >             On 25.01.19 15:14, Kaymak, Tobias wrote:
> >              > Hi,
> >              >
> >              > I am trying to migrate my existing KafkaToGCS pipeline to a
> >              > KafkaToBigQuery pipeline, to skip the loading step from GCS
> >              > that is currently handled outside of Beam.
> >              >
> >              > I noticed that the pipeline, written in Beam 2.9.0 (Java),
> >              > does not trigger any checkpoints on Flink (1.5.5), even
> >              > though it's configured to do so when I launch it. Is this
> >              > normal? How does Beam then guarantee exactly-once when there
> >              > are no checkpoints in Flink? (It seems to start from scratch
> >              > when it crashes during my tests, but I am not 100% sure.)
> >              >
> >              >
> >              > This is my pipeline:
> >              >
> >              >   pipeline
> >              >       .apply(
> >              >           KafkaIO.<String, String>read()
> >              >               .withBootstrapServers(bootstrap)
> >              >               .withTopics(topics)
> >              >               .withKeyDeserializer(StringDeserializer.class)
> >              >               .withValueDeserializer(ConfigurableDeserializer.class)
> >              >               .updateConsumerProperties(
> >              >                   ImmutableMap.of(InputMessagesConfig.CONFIG_PROPERTY_NAME,
> >              >                       inputMessagesConfig))
> >              >               .updateConsumerProperties(ImmutableMap.of("auto.offset.reset", "earliest"))
> >              >               .updateConsumerProperties(ImmutableMap.of("group.id", groupId))
> >              >               .updateConsumerProperties(ImmutableMap.of("enable.auto.commit", "true"))
> >              >               .withReadCommitted()
> >              >               .withTimestampPolicyFactory(withEventTs)
> >              >               .commitOffsetsInFinalize())
> >              >       .apply(ParDo.of(new ToEventFn()))
> >              >       .apply(
> >              >           Window.into(new ZurichTimePartitioningWindowFn())
> >              >               .triggering(
> >              >                   Repeatedly.forever(
> >              >                       AfterFirst.of(
> >              >                           AfterPane.elementCountAtLeast(bundleSize),
> >              >                           AfterProcessingTime.pastFirstElementInPane()
> >              >                               .plusDelayOf(refreshFrequency))))
> >              >               .withAllowedLateness(Duration.standardDays(14))
> >              >               .discardingFiredPanes())
> >              >       .apply(
> >              >           BigQueryIO.<Event>write()
> >              >               .withMethod(BigQueryIO.Write.Method.FILE_LOADS)
> >              >               .withTriggeringFrequency(refreshFrequency)
> >              >               .withNumFileShards(1)
> >              >               .to(partitionedTableDynamicDestinations)
> >              >               .withFormatFunction(
> >              >                   (SerializableFunction<Event, TableRow>)
> >              >                       KafkaToBigQuery::convertUserEventToTableRow)
> >              >               .withCreateDisposition(BigQueryIO.Write.CreateDisposition.CREATE_IF_NEEDED)
> >              >               .withWriteDisposition(BigQueryIO.Write.WriteDisposition.WRITE_APPEND));
> >              >
> >              >   pipeline.run().waitUntilFinish();
> >              >
> >              > It's launched like the other (GCS) one via:
> >              >
> >              > ...--checkpointingMode=EXACTLY_ONCE --checkpointingInterval=300000
> >              > --parallelism=1 --tempLocation=gs://foo..
> >              >
> >              > Any idea why checkpointing does not work here?
> >              >
> >              > Best,
> >              > Tobias
> >
> >
> >
> >         --
> >         Tobias Kaymak
> >         Data Engineer
> >
> >         [email protected]
> >         www.ricardo.ch
> >
> >
> >
> >     --
> >     Tobias Kaymak
> >     Data Engineer
> >
> >     [email protected]
> >     www.ricardo.ch
> >
> >
> >
> > --
> > Tobias Kaymak
> > Data Engineer
> >
> > [email protected]
> > www.ricardo.ch
> >
>
