Custom metrics in Stateful Functions

2021-04-27 Thread Cliff Resnick
We think Embedded StateFun is a nicer fit than the DataStream API for some
problem domains, but one thing we miss is support for custom metrics/counters. Is
there a way to access the Flink support? It looks like if we want custom
metrics we'll need to roll our own.


KeyedStream and chained forward operators

2020-04-21 Thread Cliff Resnick
I'm running a massive file-sifting-by-timestamp DataStream job from S3.

The basic job is:
FileMonitor -> ContinuousFileReader -> MultipleFileOutputSink

The MultipleFileOutputSink sifts data based on timestamp to date-hour
directories

It's a lot of data, so I'm using high parallelism, but I want to maintain
reasonable output file size. If I key the stream post-ContinuousFileReader by
5-minute timestamp buckets I get the desired result of large files, at the
cost of a network shuffle (sketched below).

But since I also have timestamps on the input files, I figured I could push
the keying back to FileMonitor -> ContinuousFileReader and save the network
shuffle. I tested this and confirmed that it sort of worked: the
ContinuousFileReaders are receiving properly partitioned input, but the output
after the reader is now rebalanced and the sinks produce lots of tiny files.

The code is below. Am I missing something?

val source = env
  .addSource(fileMonitor)
  .name(s"Bucketed Log Source File Watcher: $path")
  .keyBy(new KeySelector[TimestampedFileInputSplit, Long]() {
    override def getKey(split: TimestampedFileInputSplit): Long = {
      val name = split.getPath.getName
      val r = """(\d+)\.log""".r
      r.findFirstMatchIn(name) match {
        case Some(m) ⇒
          val t = m.group(1).toLong
          t - (t % 300)
        case _ ⇒ -1
      }
    }
  })
  .transform[String]("Bucketed Log Source File Reader", fileReader)
  .forward
  .assignTimestampsAndWatermarks(WatermarkExtractor[String])
  .forward
  .addSink(SourceTrackingSink(Sift.outputBucket, BidDateFunc))


post-checkpoint watermark out of sync with event stream?

2020-04-14 Thread Cliff Resnick
We have an event-time pipeline that uses a ProcessFunction to accept events
with an allowed lateness of a number of days. We use a
BoundedOutOfOrdernessTimestampExtractor, and our event stream has a long
tail that occasionally exceeds our allowed lateness, in which case we drop
the events.

The logic is simple (a sketch follows the list):
1. OnElement, we compare the (element's event time + allowed lateness)
against the current watermark.
2. If the element is within bounds, we register a timer for
(element's event time + allowed lateness). We call this "endTime".
3. During the above window we collect and assimilate all the data for the
key and regularly UPSERT the data to a data store.
4. OnTimer for the above "endTime" we clear state for the key.
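
A minimal sketch of steps 1-4, assuming a KeyedProcessFunction keyed by the
upsert key; the Event and Accumulated types, field names, and merge logic are
hypothetical placeholders rather than our actual job code:

import org.apache.flink.api.common.state.{ValueState, ValueStateDescriptor}
import org.apache.flink.streaming.api.functions.KeyedProcessFunction
import org.apache.flink.util.Collector

// hypothetical record types, just enough to illustrate the pattern
case class Event(key: String, eventTime: Long, payload: String)
case class Accumulated(key: String, payloads: List[String])

class AllowedLatenessUpsertFunction(allowedLatenessMs: Long)
    extends KeyedProcessFunction[String, Event, Accumulated] {

  // per-key accumulated state, cleared when the endTime timer fires
  lazy val accumulated: ValueState[Accumulated] = getRuntimeContext.getState(
    new ValueStateDescriptor("accumulated", classOf[Accumulated]))

  override def processElement(
      e: Event,
      ctx: KeyedProcessFunction[String, Event, Accumulated]#Context,
      out: Collector[Accumulated]): Unit = {
    val endTime = e.eventTime + allowedLatenessMs
    // 1. drop events whose endTime is already behind the current watermark
    if (endTime > ctx.timerService.currentWatermark) {
      // 2. register the purge timer at endTime
      ctx.timerService.registerEventTimeTimer(endTime)
      // 3. assimilate the element and emit an UPSERT for the key
      val merged = Option(accumulated.value) match {
        case Some(acc) => acc.copy(payloads = e.payload :: acc.payloads)
        case None      => Accumulated(e.key, List(e.payload))
      }
      accumulated.update(merged)
      out.collect(merged)
    }
  }

  override def onTimer(
      timestamp: Long,
      ctx: KeyedProcessFunction[String, Event, Accumulated]#OnTimerContext,
      out: Collector[Accumulated]): Unit = {
    // 4. endTime fired: clear state for the key
    accumulated.clear()
  }
}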

The above has worked well for the past 1-2 years. Last week, however, we
had a bug that introduced DEBUG logging to the job config, and this caused
several failure/restarts (S3 DEBUG logging is extremely verbose!). Within a
day or two, our monitoring system restarted the pipeline several times,
sometimes from a Savepoint over an hour or two old. During this period we
noticed that a few long-tail events that should have been dropped made it
into our data store. These events did not contain assimilated Flink state,
meaning they passed through after the endTime key purge (4.) and ended up
compromising the data store by replacing assimilated values with tail-end
values.

I'm wondering how this could be possible. The only explanation I can think
of is:

4. on "endTime" timer key state is purged.
5 --- job fail ---
6.  job restarted on 2.5 hour old Savepoint
7.  watermark regresses (?) from "endTime" watermark.
8. a long tail event squeaks through under temporarily backdated watermark
9. data store data for key is replaced with long tail data,

Is the above possible, or perhaps there is another possible scenario? Any
opinions appreciated!

-Cliff


Re: GC overhead limit exceeded, memory full of DeleteOnExit hooks for S3a files

2020-01-27 Thread Cliff Resnick
I know from experience that Flink's shaded S3A FileSystem does not
reference core-site.xml, though I don't remember offhand what file(s) it
does reference. However, since it's shaded, maybe this could be fixed by
building a Flink FS referencing 3.3.0? Last I checked I think it referenced
3.1.0.

On Mon, Jan 27, 2020, 8:48 AM David Magalhães  wrote:

> Does StreamingFileSink use core-site.xml ? When I was using it, it didn't
> load any configurations from core-site.xml.
>
> On Mon, Jan 27, 2020 at 12:08 PM Mark Harris 
> wrote:
>
>> Hi Piotr,
>>
>> Thanks for the link to the issue.
>>
>> Do you know if there's a workaround? I've tried setting the following in
>> my core-site.xml:
>>
>> fs.s3a.fast.upload.buffer=true
>>
>> To try and avoid writing the buffer files, but the taskmanager breaks
>> with the same problem.
>>
>> Best regards,
>>
>> Mark
>> --
>> *From:* Piotr Nowojski  on behalf of Piotr
>> Nowojski 
>> *Sent:* 22 January 2020 13:29
>> *To:* Till Rohrmann 
>> *Cc:* Mark Harris ; flink-u...@apache.org <
>> flink-u...@apache.org>; kkloudas 
>> *Subject:* Re: GC overhead limit exceeded, memory full of DeleteOnExit
>> hooks for S3a files
>>
>> Hi,
>>
>> This is probably a known issue of Hadoop [1]. Unfortunately it was only
>> fixed in 3.3.0.
>>
>> Piotrek
>>
>> [1] https://issues.apache.org/jira/browse/HADOOP-15658
>>
>> On 22 Jan 2020, at 13:56, Till Rohrmann  wrote:
>>
>> Thanks for reporting this issue Mark. I'm pulling Klou into this
>> conversation who knows more about the StreamingFileSink. @Klou does the
>> StreamingFileSink rely on DeleteOnExitHooks to clean up files?
>>
>> Cheers,
>> Till
>>
>> On Tue, Jan 21, 2020 at 3:38 PM Mark Harris 
>> wrote:
>>
>> Hi,
>>
>> We're using Flink 1.7.2 on an EMR cluster v emr-5.22.0, which runs hadoop
>> v "Amazon 2.8.5". We've recently noticed that some TaskManagers fail
>> (causing all the jobs running on them to fail) with a
>> "java.lang.OutOfMemoryError: GC overhead limit exceeded". The taskmanager
>> (and jobs that should be running on it) remain down until manually
>> restarted.
>>
>> I managed to take and analyze a memory dump from one of the afflicted
>> taskmanagers.
>>
>> It showed that 85% of the heap was made up of
>> the java.io.DeleteOnExitHook.files hashset. The majority of the strings in
>> that hashset (9041060 out of ~9041100) pointed to files that began
>> /tmp/hadoop-yarn/s3a/s3ablock
>>
>> The problem seems to affect jobs that make use of the StreamingFileSink
>> - all of the taskmanager crashes have been on taskmanagers running at
>> least one job using this sink, and a cluster running only a single
>> taskmanager / job that uses the StreamingFileSink crashed with the GC
>> overhead limit exceeded error.
>>
>> I've had a look for advice on handling this error more broadly without
>> luck.
>>
>> Any suggestions or advice gratefully received.
>>
>> Best regards,
>>
>> Mark Harris
>>

Re: State Migration with RocksDB MapState

2019-04-25 Thread Cliff Resnick
Great news! Thanks

On Thu, Apr 25, 2019, 2:59 AM Tzu-Li (Gordon) Tai 
wrote:

> Hi Cliff,
>
> Thanks for bringing this up again.
>
> I think it would make sense to at least move this forward by only
> checking the schema for user keys in MapState, and allowing value
> schema evolution.
> I'll comment on the JIRA about this, and also make it a blocker for 1.9.0
> to make sure it will be resolved by then.
>
> Concerning your question on GenericRecord:
> The actual schema resolution is still performed using the Avro schema, so
> you will still bump into the same issue.
>
> Best,
> Gordon
>
> On Wed, Apr 24, 2019 at 7:45 PM Cliff Resnick  wrote:
>
>> Hi Gordon,
>>
>> I noticed there has been no movement on this issue and I'm wondering if I
>> can find some way to work around this.
>> My MapState value is a case class container of Avro-generated
>> SpecificRecords. If one SpecificRecord changes I am stuck.
>>
>> From the issue, it seems like the blocker is around evolving the MapState
>> key type.  That may be a nasty problem, but my key type is stable and will
>> never change. What do you think the level of difficulty would be to add
>> support for evolving only the value?
>>
>> Also, if I use GenericRecord instead of SpecificRecord will the need for
>> schema evolution still be triggered? Or does it actually go down to the
>> avro schema rather than just the class serialVersionUID?
>>
>>
>>
>>
>>
>>
>> On Mon, Mar 18, 2019 at 1:10 AM Tzu-Li (Gordon) Tai 
>> wrote:
>>
>>> Hi Cliff,
>>>
>>> Thanks for bringing this up!
>>> AFAIK, this wouldn't be a long-term blocker. I've just opened a JIRA to
>>> track this [1].
>>>
>>> As explained in the JIRA ticket, the main reason this is disallowed in
>>> the initial support for state schema evolution was due to how migration was
>>> implemented in the RocksDB state backend.
>>> Technically speaking, enabling this in the future is definitely possible.
>>>
>>> Cheers,
>>> Gordon
>>>
>>> [1]  https://issues.apache.org/jira/browse/FLINK-11947
>>>
>>> On Mon, Mar 18, 2019 at 11:20 AM Cliff Resnick  wrote:
>>>
>>>> After trying out state migration in 1.8 rc2 I ran into this hard stop
>>>> below. The comment does not give an indication why rocksdb map state cannot
>>>> be migrated, and I'm wondering what the status is, since we do need this
>>>> functionality and would like to know if this is a long-term blocker or not.
>>>> Anyone know?
>>>>
>>>>
>>>> https://github.com/apache/flink/blob/953a5ffcbdae4115f7d525f310723cf8770779df/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBKeyedStateBackend.java#L542
>>>>
>>>


Re: State Migration with RocksDB MapState

2019-04-24 Thread Cliff Resnick
Hi Gordon,

I noticed there has been no movement on this issue and I'm wondering if I
can find some way to work around this.
My MapState value is a case class container of Avro-generated
SpecificRecords. If one SpecificRecord changes I am stuck.

From the issue, it seems like the blocker is around evolving the MapState
key type.  That may be a nasty problem, but my key type is stable and will
never change. What do you think the level of difficulty would be to add
support for evolving only the value?

Also, if I use GenericRecord instead of SpecificRecord will the need for
schema evolution still be triggered? Or does it actually go down to the
avro schema rather than just the class serialVersionUID?
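
For concreteness, the state in question is shaped roughly like the sketch
below; the names are hypothetical and MyAvroRecord stands in for an
Avro-generated SpecificRecord class, so this is only an illustration of the
layout, not our actual code:

import org.apache.flink.api.common.state.MapStateDescriptor
import org.apache.flink.api.scala._

// value is a case class wrapping Avro-generated records whose schema may evolve
case class RecordContainer(primary: MyAvroRecord, history: List[MyAvroRecord])

// the MapState key type (Long) is stable and will never change
val descriptor = new MapStateDescriptor[Long, RecordContainer](
  "records-by-id",
  createTypeInformation[Long],
  createTypeInformation[RecordContainer])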






On Mon, Mar 18, 2019 at 1:10 AM Tzu-Li (Gordon) Tai 
wrote:

> Hi Cliff,
>
> Thanks for bringing this up!
> AFAIK, this wouldn't be a long-term blocker. I've just opened a JIRA to
> track this [1].
>
> As explained in the JIRA ticket, the main reason this is disallowed in the
> initial support for state schema evolution was due to how migration was
> implemented in the RocksDB state backend.
> Technically speaking, enabling this in the future is definitely possible.
>
> Cheers,
> Gordon
>
> [1]  https://issues.apache.org/jira/browse/FLINK-11947
>
> On Mon, Mar 18, 2019 at 11:20 AM Cliff Resnick  wrote:
>
>> After trying out state migration in 1.8 rc2 I ran into this hard stop
>> below. The comment does not give an indication why rocksdb map state cannot
>> be migrated, and I'm wondering what the status is, since we do need this
>> functionality and would like to know if this is a long-term blocker or not.
>> Anyone know?
>>
>>
>> https://github.com/apache/flink/blob/953a5ffcbdae4115f7d525f310723cf8770779df/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBKeyedStateBackend.java#L542
>>
>


State Migration with RocksDB MapState

2019-03-17 Thread Cliff Resnick
After trying out state migration in 1.8 rc2 I ran into this hard stop
below. The comment does not give an indication why rocksdb map state cannot
be migrated, and I'm wondering what the status is, since we do need this
functionality and would like to know if this is a long-term blocker or not.
Anyone know?

https://github.com/apache/flink/blob/953a5ffcbdae4115f7d525f310723cf8770779df/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBKeyedStateBackend.java#L542


how to use Hadoop Inputformats with flink shaded s3?

2019-01-31 Thread Cliff Resnick
I need to process some Parquet data from S3 as a unioned input in my
DataStream pipeline. From what I know, this requires using the
hadoop AvroParquetInputFormat. The problem I'm running into is that it also
requires using un-shaded hadoop classes that conflict with the Flink shaded
hadoop3 FileSystem. The pipeline otherwise runs fine with the shaded fs.

Can anyone successfully read parquet data using the Flink shaded s3 fs? If
so can you please clue me in?


Re: Problem building 1.7.1 with scala-2.12

2019-01-03 Thread Cliff Resnick
Thanks, that works. I was passing -Pscala-2.12 (for Profile).

On Thu, Jan 3, 2019 at 4:45 AM Chesnay Schepler  wrote:

> When building Flink for scala 2.12 you have to pass "-Dscala-2.12" to
> maven; see the flink-connector-kafka-0.9 pom for details. (look for the
> scala-2.11 profile)
>
> On 02.01.2019 17:48, Cliff Resnick wrote:
> > The build fails at flink-connector-kafka-0.9 because _2.12 libraries
> > apparently do not exist for kafka < 0.10. Any help appreciated!
> >
> >
> > -Cliff
>
>
>


Problem building 1.7.1 with scala-2.12

2019-01-02 Thread Cliff Resnick
The build fails at flink-connector-kafka-0.9 because _2.12 libraries
apparently do not exist for kafka < 0.10. Any help appreciated!


-Cliff


Re: Task Manager allocation issue when upgrading 1.6.0 to 1.6.2

2018-11-12 Thread Cliff Resnick
Hi Till,

Yes, it turns out the problem was
having flink-queryable-state-runtime_2.11-1.6.2.jar in flink/lib. I guess
Queryable State bootstraps itself and, in my situation, it brought the task
manager down when it found no available ports. What's a little troubling is
that I had not configured Queryable State at all, so I would not expect it
to get in the way. I haven't looked further into it, but I think that if
Queryable State wants to enable itself then it should at worst take an
unused port by default, especially since many folks will be running in
shared environments like YARN.

But anyway, thanks for that! I'm now up with 1.6.2.

Cliff

On Mon, Nov 12, 2018 at 6:04 AM Till Rohrmann  wrote:

> Hi Cliff,
>
> the TaskManger fail to start with exit code 31 which indicates an
> initialization error on startup. If you check the TaskManager logs via
> `yarn logs -applicationId ` you should see the problem why the TMs
> don't start up.
>
> Cheers,
> Till
>
> On Fri, Nov 9, 2018 at 8:32 PM Cliff Resnick  wrote:
>
>> Hi Till,
>>
>> Here are Job Manager logs, same job in both 1.6.0 and 1.6.2 at DEBUG
>> level. I saw several errors in 1.6.2, hope it's informative!
>>
>> Cliff
>>
>> On Fri, Nov 9, 2018 at 8:34 AM Till Rohrmann 
>> wrote:
>>
>>> Hi Cliff,
>>>
>>> this sounds not right. Could you share the logs of the Yarn cluster
>>> entrypoint with the community for further debugging? Ideally on DEBUG
>>> level. The Yarn logs would also be helpful to fully understand the problem.
>>> Thanks a lot!
>>>
>>> Cheers,
>>> Till
>>>
>>> On Thu, Nov 8, 2018 at 9:59 PM Cliff Resnick  wrote:
>>>
>>>> I'm running a YARN cluster of 8 * 4 core instances = 32 cores, with a
>>>> configuration of 3 slots per TM. The cluster is dedicated to a single job
>>>> that runs at full capacity in "FLIP6" mode. So in this cluster, the
>>>> parallelism is 21 (7 TMs * 3, one container dedicated for Job Manager).
>>>>
>>>> When I run the job in 1.6.0, seven Task Managers are spun up as
>>>> expected. But if I run with 1.6.2 only four Task Managers spin up and the
>>>> job hangs waiting for more resources.
>>>>
>>>> Our Flink distribution is set up by script after building from source.
>>>> So aside from flink jars, both 1.6.0 and 1.6.2 directories are identical.
>>>> The job is the same, restarting from savepoint. The problem is repeatable.
>>>>
>>>> Has something changed in 1.6.2, and if so can it be remedied with a
>>>> config change?
>>>>
>>>>
>>>>
>>>>
>>>>
>>>>


Re: Flink Web UI does not show specific exception messages when job submission fails

2018-11-09 Thread Cliff Resnick
+1!

On Fri, Nov 9, 2018 at 1:34 PM Gary Yao  wrote:

> Hi,
>
> We only propagate the exception message but not the complete stacktrace
> [1].
> Can you create a ticket for that?
>
> Best,
> Gary
>
> [1]
> https://github.com/apache/flink/blob/091cff3299aed4bb143619324f6ec8165348d3ae/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/AbstractRestHandler.java#L93
>
> On Tue, Nov 6, 2018 at 6:50 PM Luis Gustavo Oliveira Silva <
> l...@poli.ufrj.br> wrote:
>
>> Hello,
>>
>> I was using Flink 1.4.2 and when submitting jobs through the Web UI, I
>> could see exceptions that would help me debug jobs, such as:
>>
>> We're sorry, something went wrong. The server responded with:
>>>
>>> java.util.concurrent.CompletionException: 
>>> org.apache.flink.util.FlinkException: Could not run the jar.
>>> at 
>>> org.apache.flink.runtime.webmonitor.handlers.JarRunHandler.lambda$handleJsonRequest$0(JarRunHandler.java:90)
>>> at java.util.concurrent.CompletableFuture$AsyncSupply.run(Unknown 
>>> Source)
>>> at java.util.concurrent.Executors$RunnableAdapter.call(Unknown Source)
>>> at java.util.concurrent.FutureTask.run(Unknown Source)
>>> at 
>>> java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$201(Unknown
>>>  Source)
>>> at 
>>> java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(Unknown
>>>  Source)
>>> at java.util.concurrent.ThreadPoolExecutor.runWorker(Unknown Source)
>>> at java.util.concurrent.ThreadPoolExecutor$Worker.run(Unknown Source)
>>> at java.lang.Thread.run(Unknown Source)
>>> Caused by: org.apache.flink.util.FlinkException: Could not run the jar.
>>> ... 9 more
>>> Caused by: org.apache.flink.client.program.ProgramInvocationException: The 
>>> main method caused an error.
>>> at 
>>> org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:542)
>>> at 
>>> org.apache.flink.client.program.PackagedProgram.invokeInteractiveModeForExecution(PackagedProgram.java:417)
>>> at 
>>> org.apache.flink.client.program.OptimizerPlanEnvironment.getOptimizedPlan(OptimizerPlanEnvironment.java:83)
>>> at 
>>> org.apache.flink.client.program.ClusterClient.getOptimizedPlan(ClusterClient.java:334)
>>> at 
>>> org.apache.flink.runtime.webmonitor.handlers.JarActionHandler.getJobGraphAndClassLoader(JarActionHandler.java:87)
>>> at 
>>> org.apache.flink.runtime.webmonitor.handlers.JarRunHandler.lambda$handleJsonRequest$0(JarRunHandler.java:69)
>>> ... 8 more
>>> Caused by: org.apache.flink.table.api.SqlParserException: SQL parse failed. 
>>> Encountered "." at line 3, column 4.
>>> Was expecting one of:
>>> 
>>> "ORDER" ...
>>> "LIMIT" ...
>>> "OFFSET" ...
>>> "FETCH" ...
>>> "FROM" ...
>>> "," ...
>>> "UNION" ...
>>> "INTERSECT" ...
>>> "EXCEPT" ...
>>> "MINUS" ...
>>>
>>> at 
>>> org.apache.flink.table.calcite.FlinkPlannerImpl.parse(FlinkPlannerImpl.scala:81)
>>> at 
>>> org.apache.flink.table.api.TableEnvironment.sqlQuery(TableEnvironment.scala:558)
>>> at 
>>> com.stone.default.rule.Sandbox3$.delayedEndpoint$com$stone$default$rule$Sandbox3$1(Sandbox.scala:112)
>>> at 
>>> com.stone.default.rule.Sandbox3$delayedInit$body.apply(Sandbox.scala:93)
>>> at scala.Function0$class.apply$mcV$sp(Function0.scala:34)
>>> at 
>>> scala.runtime.AbstractFunction0.apply$mcV$sp(AbstractFunction0.scala:12)
>>> at scala.App$$anonfun$main$1.apply(App.scala:76)
>>> at scala.App$$anonfun$main$1.apply(App.scala:76)
>>> at scala.collection.immutable.List.foreach(List.scala:392)
>>> at 
>>> scala.collection.generic.TraversableForwarder$class.foreach(TraversableForwarder.scala:35)
>>> at scala.App$class.main(App.scala:76)
>>> at com.stone.default.rule.Sandbox3$.main(Sandbox.scala:93)
>>> at com.stone.default.rule.Sandbox3.main(Sandbox.scala)
>>> at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
>>> at sun.reflect.NativeMethodAccessorImpl.invoke(Unknown Source)
>>> at sun.reflect.DelegatingMethodAccessorImpl.invoke(Unknown Source)
>>> at java.lang.reflect.Method.invoke(Unknown Source)
>>> at 
>>> org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:525)
>>> ... 13 more
>>> Caused by: org.apache.calcite.sql.parser.SqlParseException: Encountered "." 
>>> at line 3, column 4.
>>> Was expecting one of:
>>> 
>>> "ORDER" ...
>>> "LIMIT" ...
>>> "OFFSET" ...
>>> "FETCH" ...
>>> "FROM" ...
>>> "," ...
>>> "UNION" ...
>>> "INTERSECT" ...
>>> "EXCEPT" ...
>>> "MINUS" ...
>>>
>>> at 
>>> org.apache.calcite.sql.parser.impl.SqlParserImpl.convertException(SqlParserImpl.java:350)
>>> at 
>>> org.apache.calcite.sql.parser.impl.SqlParserImpl.normalizeException(SqlParserImpl.java:131)
>>> at 
>>> org.apache.calcite.sql.parser.SqlParser.parseQuery(SqlParser.java:138)
>>> at 

Task Manager allocation issue when upgrading 1.6.0 to 1.6.2

2018-11-08 Thread Cliff Resnick
I'm running a YARN cluster of 8 * 4 core instances = 32 cores, with a
configuration of 3 slots per TM. The cluster is dedicated to a single job
that runs at full capacity in "FLIP6" mode. So in this cluster, the
parallelism is 21 (7 TMs * 3, one container dedicated for Job Manager).

When I run the job in 1.6.0, seven Task Managers are spun up as expected.
But if I run with 1.6.2 only four Task Managers spin up and the job hangs
waiting for more resources.

Our Flink distribution is set up by script after building from source. So
aside from flink jars, both 1.6.0 and 1.6.2 directories are identical. The
job is the same, restarting from savepoint. The problem is repeatable.

Has something changed in 1.6.2, and if so can it be remedied with a config
change?


Re: classloading strangeness with Avro in Flink

2018-08-21 Thread Cliff Resnick
Solved this by moving flink-avro to lib and reverting to
`classloader.resolve-order: parent-first`.  I still don't know why, but I
guess if you're reading Avro both from file and Kafka in the same pipeline
then inverted class loader delegation will not work. Thanks, Vino for your
help!

On Tue, Aug 21, 2018 at 8:02 AM Cliff Resnick  wrote:

> Hi Aljoscha,
>
> We need flink-shaded-hadoop2-uber.jar because there is no hadoop distro on
> the instance the Flink session/jobs is managed from and the process that
> launches Flink is not a java process, but execs a process that calls the
> flink script.
>
> -Cliff
>
> On Tue, Aug 21, 2018 at 5:11 AM Aljoscha Krettek 
> wrote:
>
>> Hi Cliff,
>>
>> Do you actually need the flink-shaded-hadoop2-uber.jar in lib. If you're
>> running on YARN, you should be able to just remove them because with YARN
>> you will have Hadoop in the classpath anyways.
>>
>> Aljoscha
>>
>> On 21. Aug 2018, at 03:45, vino yang  wrote:
>>
>> Hi Cliff,
>>
>> If so, you can explicitly exclude Avro's dependencies from related
>> dependencies (using ) and then directly introduce dependencies on
>> the Avro version you need.
>>
>> Thanks, vino.
>>
>> On Tue, Aug 21, 2018 at 5:13 AM Cliff Resnick  wrote:
>>
>>> Hi Vino,
>>>
>>> Unfortunately, I'm still stuck here. By moving the avro dependency chain
>>> to lib (and removing it from user jar), my OCFs decode but I get the error
>>> described here:
>>>
>>> https://github.com/confluentinc/schema-registry/pull/509
>>>
>>> However, the Flink fix described in the PR above was to move the Avro
>>> dependency to the user jar. However, since I'm using YARN, I'm required to
>>> have flink-shaded-hadoop2-uber.jar loaded from lib -- and that has
>>> avro bundled un-shaded. So I'm back to the start problem...
>>>
>>> Any advice is welcome!
>>>
>>> -Cliff
>>>
>>>
>>> On Mon, Aug 20, 2018 at 1:42 PM Cliff Resnick  wrote:
>>>
>>>> Hi Vino,
>>>>
>>>> You were right in your assumption -- unshaded avro was being added to
>>>> our application jar via third-party dependency. Excluding it in packaging
>>>> fixed the issue. For the record, it looks like flink-avro must be loaded from
>>>> the lib or there will be errors in checkpoint restores.
>>>>
>>>> On Mon, Aug 20, 2018 at 8:43 AM Cliff Resnick  wrote:
>>>>
>>>>> Hi Vino,
>>>>>
>>>>> Thanks for the explanation, but the job only ever uses the Avro
>>>>> (1.8.2) pulled in by flink-formats/avro, so it's not a class version
>>>>> conflict there.
>>>>>
>>>>> I'm using default child-first loading. It might be a further
>>>>> transitive dependency, though it's not clear by stack trace or stepping
>>>>> through the process. When I get a chance I'll look further into it but in
>>>>> case anyone is experiencing similar problems, what is clear is that
>>>>> classloader order does matter with Avro.
>>>>>
>>>>> On Sun, Aug 19, 2018, 11:36 PM vino yang 
>>>>> wrote:
>>>>>
>>>>>> Hi Cliff,
>>>>>>
>>>>>> My personal guess is that this may be caused by Job's Avro conflict
>>>>>> with the Avro that the Flink framework itself relies on.
>>>>>> Flink has provided some configuration parameters which allows you to
>>>>>> determine the order of the classloaders yourself. [1]
>>>>>> Alternatively, you can debug classloading and participate in the
>>>>>> documentation.[2]
>>>>>>
>>>>>> [1]:
>>>>>> https://ci.apache.org/projects/flink/flink-docs-release-1.6/ops/config.html
>>>>>> [2]:
>>>>>> https://ci.apache.org/projects/flink/flink-docs-stable/monitoring/debugging_classloading.html
>>>>>>
>>>>>> Thanks, vino.
>>>>>>
>>>>>> On Mon, Aug 20, 2018 at 10:40 AM Cliff Resnick  wrote:
>>>>>>
>>>>>>> Our Flink/YARN pipeline has been reading Avro from Kafka for a while
>>>>>>> now. We just introduced a source of Avro OCF (Object Container Files) 
>>>>>>> read
>>>>>>> from S3. The Kafka Avro continued to decode without incident, but the 
>>>>>>> OCF
>>>>>>> files failed 100% with anomalous parse errors in the decoding phase 
>>>>>>> after
>>>>>>> the schema and codec were successfully read from them. The pipeline 
>>>>>>> would
>>>>>>> work on my laptop, and when I submitted a test Main program to the Flink
>>>>>>> Session in YARN, that would also successfully decode. Only the actual
>>>>>>> pipeline run from the TaskManager failed. At one point I even remote
>>>>>>> debugged the TaskManager process and stepped through what looked like a
>>>>>>> normal Avro decode (if you can describe Avro code as normal!) -- until 
>>>>>>> it
>>>>>>> abruptly failed with an int decode or what-have-you.
>>>>>>>
>>>>>>> This stumped me for a while, but I finally tried moving
>>>>>>> flink-avro.jar from the lib to the application jar, and that fixed it. 
>>>>>>> I'm
>>>>>>> not sure why this is, especially since there were no typical
>>>>>>> classloader-type errors.  This issue was observed both on Flink 1.5 and 
>>>>>>> 1.6
>>>>>>> in Flip-6 mode.
>>>>>>>
>>>>>>> -Cliff
>>>>>>>
>>>>>>>
>>>>>>>
>>>>>>>
>>>>>>>
>>>>>>>
>>


Re: classloading strangeness with Avro in Flink

2018-08-21 Thread Cliff Resnick
Hi Aljoscha,

We need flink-shaded-hadoop2-uber.jar because there is no hadoop distro on
the instance the Flink session/jobs is managed from and the process that
launches Flink is not a java process, but execs a process that calls the
flink script.

-Cliff

On Tue, Aug 21, 2018 at 5:11 AM Aljoscha Krettek 
wrote:

> Hi Cliff,
>
> Do you actually need the flink-shaded-hadoop2-uber.jar in lib. If you're
> running on YARN, you should be able to just remove them because with YARN
> you will have Hadoop in the classpath anyways.
>
> Aljoscha
>
> On 21. Aug 2018, at 03:45, vino yang  wrote:
>
> Hi Cliff,
>
> If so, you can explicitly exclude Avro's dependencies from related
> dependencies (using ) and then directly introduce dependencies on
> the Avro version you need.
>
> Thanks, vino.
>
> On Tue, Aug 21, 2018 at 5:13 AM Cliff Resnick  wrote:
>
>> Hi Vino,
>>
>> Unfortunately, I'm still stuck here. By moving the avro dependency chain
>> to lib (and removing it from user jar), my OCFs decode but I get the error
>> described here:
>>
>> https://github.com/confluentinc/schema-registry/pull/509
>>
>> However, the Flink fix described in the PR above was to move the Avro
>> dependency to the user jar. However, since I'm using YARN, I'm required to
>> have flink-shaded-hadoop2-uber.jar loaded from lib -- and that has
>> avro bundled un-shaded. So I'm back to the start problem...
>>
>> Any advice is welcome!
>>
>> -Cliff
>>
>>
>> On Mon, Aug 20, 2018 at 1:42 PM Cliff Resnick  wrote:
>>
>>> Hi Vino,
>>>
>>> You were right in your assumption -- unshaded avro was being added to
>>> our application jar via third-party dependency. Excluding it in packaging
>>> fixed the issue. For the record, it looks like flink-avro must be loaded from
>>> the lib or there will be errors in checkpoint restores.
>>>
>>> On Mon, Aug 20, 2018 at 8:43 AM Cliff Resnick  wrote:
>>>
>>>> Hi Vino,
>>>>
>>>> Thanks for the explanation, but the job only ever uses the Avro (1.8.2)
>>>> pulled in by flink-formats/avro, so it's not a class version conflict
>>>> there.
>>>>
>>>> I'm using default child-first loading. It might be a further transitive
>>>> dependency, though it's not clear by stack trace or stepping through the
>>>> process. When I get a chance I'll look further into it but in case anyone
>>>> is experiencing similar problems, what is clear is that classloader order
>>>> does matter with Avro.
>>>>
>>>> On Sun, Aug 19, 2018, 11:36 PM vino yang  wrote:
>>>>
>>>>> Hi Cliff,
>>>>>
>>>>> My personal guess is that this may be caused by Job's Avro conflict
>>>>> with the Avro that the Flink framework itself relies on.
>>>>> Flink has provided some configuration parameters which allows you to
>>>>> determine the order of the classloaders yourself. [1]
>>>>> Alternatively, you can debug classloading and participate in the
>>>>> documentation.[2]
>>>>>
>>>>> [1]:
>>>>> https://ci.apache.org/projects/flink/flink-docs-release-1.6/ops/config.html
>>>>> [2]:
>>>>> https://ci.apache.org/projects/flink/flink-docs-stable/monitoring/debugging_classloading.html
>>>>>
>>>>> Thanks, vino.
>>>>>
>>>>> On Mon, Aug 20, 2018 at 10:40 AM Cliff Resnick  wrote:
>>>>>
>>>>>> Our Flink/YARN pipeline has been reading Avro from Kafka for a while
>>>>>> now. We just introduced a source of Avro OCF (Object Container Files) 
>>>>>> read
>>>>>> from S3. The Kafka Avro continued to decode without incident, but the OCF
>>>>>> files failed 100% with anomalous parse errors in the decoding phase after
>>>>>> the schema and codec were successfully read from them. The pipeline would
>>>>>> work on my laptop, and when I submitted a test Main program to the Flink
>>>>>> Session in YARN, that would also successfully decode. Only the actual
>>>>>> pipeline run from the TaskManager failed. At one point I even remote
>>>>>> debugged the TaskManager process and stepped through what looked like a
>>>>>> normal Avro decode (if you can describe Avro code as normal!) -- until it
>>>>>> abruptly failed with an int decode or what-have-you.
>>>>>>
>>>>>> This stumped me for a while, but I finally tried moving
>>>>>> flink-avro.jar from the lib to the application jar, and that fixed it. 
>>>>>> I'm
>>>>>> not sure why this is, especially since there were no typical
>>>>>> classloader-type errors.  This issue was observed both on Flink 1.5 and 
>>>>>> 1.6
>>>>>> in Flip-6 mode.
>>>>>>
>>>>>> -Cliff
>>>>>>
>>>>>>
>>>>>>
>>>>>>
>>>>>>
>>>>>>
>


Re: classloading strangeness with Avro in Flink

2018-08-20 Thread Cliff Resnick
Hi Vino,

Unfortunately, I'm still stuck here. By moving the avro dependency chain to
lib (and removing it from user jar), my OCFs decode but I get the error
described here:

https://github.com/confluentinc/schema-registry/pull/509

However, the Flink fix described in the PR above was to move the Avro
dependency to the user jar. However, since I'm using YARN, I'm required to
have flink-shaded-hadoop2-uber.jar loaded from lib -- and that has
avro bundled un-shaded. So I'm back to the start problem...

Any advice is welcome!

-Cliff


On Mon, Aug 20, 2018 at 1:42 PM Cliff Resnick  wrote:

> Hi Vino,
>
> You were right in your assumption -- unshaded avro was being added to our
> application jar via third-party dependency. Excluding it in packaging fixed
> the issue. For the record, it looks like flink-avro must be loaded from the lib
> or there will be errors in checkpoint restores.
>
> On Mon, Aug 20, 2018 at 8:43 AM Cliff Resnick  wrote:
>
>> Hi Vino,
>>
>> Thanks for the explanation, but the job only ever uses the Avro (1.8.2)
>> pulled in by flink-formats/avro, so it's not a class version conflict
>> there.
>>
>> I'm using default child-first loading. It might be a further transitive
>> dependency, though it's not clear by stack trace or stepping through the
>> process. When I get a chance I'll look further into it but in case anyone
>> is experiencing similar problems, what is clear is that classloader order
>> does matter with Avro.
>>
>> On Sun, Aug 19, 2018, 11:36 PM vino yang  wrote:
>>
>>> Hi Cliff,
>>>
>>> My personal guess is that this may be caused by Job's Avro conflict with
>>> the Avro that the Flink framework itself relies on.
>>> Flink has provided some configuration parameters which allows you to
>>> determine the order of the classloaders yourself. [1]
>>> Alternatively, you can debug classloading and participate in the
>>> documentation.[2]
>>>
>>> [1]:
>>> https://ci.apache.org/projects/flink/flink-docs-release-1.6/ops/config.html
>>> [2]:
>>> https://ci.apache.org/projects/flink/flink-docs-stable/monitoring/debugging_classloading.html
>>>
>>> Thanks, vino.
>>>
>>> On Mon, Aug 20, 2018 at 10:40 AM Cliff Resnick  wrote:
>>>
>>>> Our Flink/YARN pipeline has been reading Avro from Kafka for a while
>>>> now. We just introduced a source of Avro OCF (Object Container Files) read
>>>> from S3. The Kafka Avro continued to decode without incident, but the OCF
>>>> files failed 100% with anomalous parse errors in the decoding phase after
>>>> the schema and codec were successfully read from them. The pipeline would
>>>> work on my laptop, and when I submitted a test Main program to the Flink
>>>> Session in YARN, that would also successfully decode. Only the actual
>>>> pipeline run from the TaskManager failed. At one point I even remote
>>>> debugged the TaskManager process and stepped through what looked like a
>>>> normal Avro decode (if you can describe Avro code as normal!) -- until it
>>>> abruptly failed with an int decode or what-have-you.
>>>>
>>>> This stumped me for a while, but I finally tried moving flink-avro.jar
>>>> from the lib to the application jar, and that fixed it. I'm not sure why
>>>> this is, especially since there were no typical classloader-type errors.
>>>> This issue was observed both on Flink 1.5 and 1.6 in Flip-6 mode.
>>>>
>>>> -Cliff
>>>>
>>>>
>>>>
>>>>
>>>>
>>>>


Re: classloading strangeness with Avro in Flink

2018-08-20 Thread Cliff Resnick
Hi Jamie,

No, it was nothing of the class not found variety, just parse errors. It
had to do with Avro getting mixed up with different versions.

-Cliff

On Mon, Aug 20, 2018 at 4:18 PM Jamie Grier  wrote:

> Hey Cliff, can you provide the stack trace of the issue you were seeing?
> We recently ran into a similar issue that we're still debugging.  Did it
> look like this:
>
> java.lang.IllegalStateException: Could not initialize operator state
>> backend.
>> at
>> org.apache.flink.streaming.api.operators.AbstractStreamOperator.initOperatorState(AbstractStreamOperator.java:301)
>> at
>> org.apache.flink.streaming.api.operators.AbstractStreamOperator.initializeState(AbstractStreamOperator.java:248)
>> at
>> org.apache.flink.streaming.runtime.tasks.StreamTask.initializeOperators(StreamTask.java:692)
>> at
>> org.apache.flink.streaming.runtime.tasks.StreamTask.initializeState(StreamTask.java:679)
>> at
>> org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:253)
>> at org.apache.flink.runtime.taskmanager.Task.run(Task.java:718)
>> at java.lang.Thread.run(Thread.java:748)
>> Caused by: java.lang.UnsupportedOperationException: Could not find
>> required Avro dependency.
>> at
>> org.apache.flink.api.java.typeutils.runtime.kryo.Serializers$DummyAvroKryoSerializerClass.read(Serializers.java:170)
>> at com.esotericsoftware.kryo.Kryo.readClassAndObject(Kryo.java:761)
>> at
>> org.apache.flink.api.java.typeutils.runtime.kryo.KryoSerializer.deserialize(KryoSerializer.java:249)
>> at
>> org.apache.flink.api.java.typeutils.runtime.TupleSerializer.deserialize(TupleSerializer.java:136)
>> at
>> org.apache.flink.api.java.typeutils.runtime.TupleSerializer.deserialize(TupleSerializer.java:30)
>> at
>> org.apache.flink.runtime.state.DefaultOperatorStateBackend.deserializeStateValues(DefaultOperatorStateBackend.java:584)
>> at
>> org.apache.flink.runtime.state.DefaultOperatorStateBackend.restore(DefaultOperatorStateBackend.java:399)
>> at
>> org.apache.flink.streaming.runtime.tasks.StreamTask.createOperatorStateBackend(StreamTask.java:733)
>> at
>> org.apache.flink.streaming.api.operators.AbstractStreamOperator.initOperatorState(AbstractStreamOperator.java:299)
>> ... 6 common frames omitted
>> 00:17:19.626 INFO o.a.f.r.e.ExecutionGraph - Job
>> ClientEventToElasticsearchJob (5cec438674e9a111703c83897f7c8138) switched
>> from state RUNNING to FAILING.
>> java.lang.IllegalStateException: Could not initialize operator state
>> backend.
>> at
>> org.apache.flink.streaming.api.operators.AbstractStreamOperator.initOperatorState(AbstractStreamOperator.java:301)
>> at
>> org.apache.flink.streaming.api.operators.AbstractStreamOperator.initializeState(AbstractStreamOperator.java:248)
>> at
>> org.apache.flink.streaming.runtime.tasks.StreamTask.initializeOperators(StreamTask.java:692)
>> at
>> org.apache.flink.streaming.runtime.tasks.StreamTask.initializeState(StreamTask.java:679)
>> at
>> org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:253)
>> at org.apache.flink.runtime.taskmanager.Task.run(Task.java:718)
>> at java.lang.Thread.run(Thread.java:748)
>> Caused by: java.lang.UnsupportedOperationException: Could not find
>> required Avro dependency.
>> at
>> org.apache.flink.api.java.typeutils.runtime.kryo.Serializers$DummyAvroKryoSerializerClass.read(Serializers.java:170)
>> at com.esotericsoftware.kryo.Kryo.readClassAndObject(Kryo.java:761)
>> at
>> org.apache.flink.api.java.typeutils.runtime.kryo.KryoSerializer.deserialize(KryoSerializer.java:249)
>> at
>> org.apache.flink.api.java.typeutils.runtime.TupleSerializer.deserialize(TupleSerializer.java:136)
>> at
>> org.apache.flink.api.java.typeutils.runtime.TupleSerializer.deserialize(TupleSerializer.java:30)
>> at
>> org.apache.flink.runtime.state.DefaultOperatorStateBackend.deserializeStateValues(DefaultOperatorStateBackend.java:584)
>> at
>> org.apache.flink.runtime.state.DefaultOperatorStateBackend.restore(DefaultOperatorStateBackend.java:399)
>> at
>> org.apache.flink.streaming.runtime.tasks.StreamTask.createOperatorStateBackend(StreamTask.java:733)
>> at
>> org.apache.flink.streaming.api.operators.AbstractStreamOperator.initOperatorState(AbstractStreamOperator.java:299)
>> ... 6 common frames omitted
>
>
> -Jamie
>
>
> On Mon, Aug 20, 2018 at 10:42 AM, Cliff Resnick  wrote:
>
>> Hi Vino,
>>
>> You were right in your assumption -- unshaded avro was being added to our
>> application jar via third-party dependency. Excluding it in packaging fixed
>> the issue.

Re: classloading strangeness with Avro in Flink

2018-08-20 Thread Cliff Resnick
Hi Vino,

You were right in your assumption -- unshaded avro was being added to our
application jar via third-party dependency. Excluding it in packaging fixed
the issue. For the record, it looks like flink-avro must be loaded from the lib
or there will be errors in checkpoint restores.

On Mon, Aug 20, 2018 at 8:43 AM Cliff Resnick  wrote:

> Hi Vino,
>
> Thanks for the explanation, but the job only ever uses the Avro (1.8.2)
> pulled in by flink-formats/avro, so it's not a class version conflict
> there.
>
> I'm using default child-first loading. It might be a further transitive
> dependency, though it's not clear by stack trace or stepping through the
> process. When I get a chance I'll look further into it but in case anyone
> is experiencing similar problems, what is clear is that classloader order
> does matter with Avro.
>
> On Sun, Aug 19, 2018, 11:36 PM vino yang  wrote:
>
>> Hi Cliff,
>>
>> My personal guess is that this may be caused by Job's Avro conflict with
>> the Avro that the Flink framework itself relies on.
>> Flink has provided some configuration parameters which allows you to
>> determine the order of the classloaders yourself. [1]
>> Alternatively, you can debug classloading and participate in the
>> documentation.[2]
>>
>> [1]:
>> https://ci.apache.org/projects/flink/flink-docs-release-1.6/ops/config.html
>> [2]:
>> https://ci.apache.org/projects/flink/flink-docs-stable/monitoring/debugging_classloading.html
>>
>> Thanks, vino.
>>
>> On Mon, Aug 20, 2018 at 10:40 AM Cliff Resnick  wrote:
>>
>>> Our Flink/YARN pipeline has been reading Avro from Kafka for a while
>>> now. We just introduced a source of Avro OCF (Object Container Files) read
>>> from S3. The Kafka Avro continued to decode without incident, but the OCF
>>> files failed 100% with anomalous parse errors in the decoding phase after
>>> the schema and codec were successfully read from them. The pipeline would
>>> work on my laptop, and when I submitted a test Main program to the Flink
>>> Session in YARN, that would also successfully decode. Only the actual
>>> pipeline run from the TaskManager failed. At one point I even remote
>>> debugged the TaskManager process and stepped through what looked like a
>>> normal Avro decode (if you can describe Avro code as normal!) -- until it
>>> abruptly failed with an int decode or what-have-you.
>>>
>>> This stumped me for a while, but I finally tried moving flink-avro.jar
>>> from the lib to the application jar, and that fixed it. I'm not sure why
>>> this is, especially since there were no typical classloader-type errors.
>>> This issue was observed both on Flink 1.5 and 1.6 in Flip-6 mode.
>>>
>>> -Cliff
>>>
>>>
>>>
>>>
>>>
>>>


Re: classloading strangeness with Avro in Flink

2018-08-20 Thread Cliff Resnick
Hi Vino,

Thanks for the explanation, but the job only ever uses the Avro (1.8.2)
pulled in by flink-formats/avro, so it's not a class version conflict
there.

I'm using default child-first loading. It might be a further transitive
dependency, though it's not clear by stack trace or stepping through the
process. When I get a chance I'll look further into it but in case anyone
is experiencing similar problems, what is clear is that classloader order
does matter with Avro.

On Sun, Aug 19, 2018, 11:36 PM vino yang  wrote:

> Hi Cliff,
>
> My personal guess is that this may be caused by Job's Avro conflict with
> the Avro that the Flink framework itself relies on.
> Flink has provided some configuration parameters which allows you to
> determine the order of the classloaders yourself. [1]
> Alternatively, you can debug classloading and participate in the
> documentation.[2]
>
> [1]:
> https://ci.apache.org/projects/flink/flink-docs-release-1.6/ops/config.html
> [2]:
> https://ci.apache.org/projects/flink/flink-docs-stable/monitoring/debugging_classloading.html
>
> Thanks, vino.
>
> On Mon, Aug 20, 2018 at 10:40 AM Cliff Resnick  wrote:
>
>> Our Flink/YARN pipeline has been reading Avro from Kafka for a while now.
>> We just introduced a source of Avro OCF (Object Container Files) read from
>> S3. The Kafka Avro continued to decode without incident, but the OCF files
>> failed 100% with anomalous parse errors in the decoding phase after the
>> schema and codec were successfully read from them. The pipeline would work
>> on my laptop, and when I submitted a test Main program to the Flink Session
>> in YARN, that would also successfully decode. Only the actual pipeline run
>> from the TaskManager failed. At one point I even remote debugged the
>> TaskManager process and stepped through what looked like a normal Avro
>> decode (if you can describe Avro code as normal!) -- until it abruptly
>> failed with an int decode or what-have-you.
>>
>> This stumped me for a while, but I finally tried moving flink-avro.jar
>> from the lib to the application jar, and that fixed it. I'm not sure why
>> this is, especially since there were no typical classloader-type errors.
>> This issue was observed both on Flink 1.5 and 1.6 in Flip-6 mode.
>>
>> -Cliff
>>
>>
>>
>>
>>
>>


classloading strangeness with Avro in Flink

2018-08-19 Thread Cliff Resnick
Our Flink/YARN pipeline has been reading Avro from Kafka for a while now.
We just introduced a source of Avro OCF (Object Container Files) read from
S3. The Kafka Avro continued to decode without incident, but the OCF files
failed 100% with anomalous parse errors in the decoding phase after the
schema and codec were successfully read from them. The pipeline would work
on my laptop, and when I submitted a test Main program to the Flink Session
in YARN, that would also successfully decode. Only the actual pipeline run
from the TaskManager failed. At one point I even remote debugged the
TaskManager process and stepped through what looked like a normal Avro
decode (if you can describe Avro code as normal!) -- until it abruptly
failed with an int decode or what-have-you.

This stumped me for a while, but I finally tried moving flink-avro.jar from
the lib to the application jar, and that fixed it. I'm not sure why this
is, especially since there were no typical classloader-type errors.  This
issue was observed both on Flink 1.5 and 1.6 in Flip-6 mode.

-Cliff


Re: Mis-reported metrics using SideOutput stream + Broadcast

2018-07-03 Thread Cliff Resnick
I found the problem and, of course, it was between my desk and my chair.
For side outputs the Flink UI correctly reports (S+N) * slots. Since the
CoProcessFunction late-join was hashed, that factored to S+N. Thanks for the
help!

On Tue, Jul 3, 2018 at 3:51 AM, Chesnay Schepler  wrote:

> Let's see if I understood everything correctly:
>
> 1)
> Let's say that metadata contains N records.
>
> The UI output metrics indicate that *metadata* sends N records.
> The UI input metrics for *join* and *late-join* each include N records
> (i.e. N + whatever other data they receive).
>
> You expected that the output of *metadata* would be 2*N since the records
> are broadcast to 2 operators.
>
> If so, then the metrics work as intended; they count the number of records
> that the operator emits; the duplication happens behind the scenes
> *somewhere* outside the operator. In other words, the metric counts the
> number of *Collector#collect()* calls.
>
> 2)
> Let's say that *join* emits M records via the main output, and S records
> via the side-output.
>
> The UI input metrics for *late-join* indicate that M records have been
> received.
>
> You expected the input for *late-join* to be S + N instead, the
> side-output + broadcast data (see 1) ).
>
> If so, then yeah that's weird and shouldn't happen.
>
> For clarification:
> You use the broadcast variable for the *join* operator, but
> KeyedBroadcastMetadataJoin.broadcast(metadata) for the *late-join*.
> Is this intended, or just a copy error?
>
>
> On 03.07.2018 04:16, Cliff Resnick wrote:
>
> Our topology has a metadata source that we push via Broadcast. Because
> this metadata source is critical, but sometimes late, we added a buffering
> mechanism via a SideOutput. We call the initial look-up from Broadcast
> "join"  and the secondary, state-backed buffered  lookup, "late-join"
>
> Today I noticed that if we implement the late join using a
> KeyedBroadcastProcessFunction (so we can set TTL timers while using
> broadcast), everything seems to work. However, even though our
> internal metrics show the correct numbers, the numbers in the Flink UI
> falsely indicate that:
>
> 1) No broadcast data is sent to the late join, meaning the Flink metrics
> for the metadata operator do not indicate any extra records sent.
> 2) Primary Join's main stream (not Side Output) is indicated as being sent
> to Late Join, meaning the Flink metrics input record number from Primary
> Join matches Late Join's, even though our logs and internal metrics might
> show zero traffic.
>
> If I do the late join via CoProcessFunction using a metadata keyed stream
> instead of broadcast, then the Flink UI shows the correct numbers
> (unfortunately there is another side issue when we take that tack but I
> won't go into that here).
>
> I hope this was not too confusing. Again the issue is not that this does
> not work -- it just looks like it does not work in the Flink UI.
>
> Below is the approximate code. Perhaps I'm doing something wrong that
> causes the weird reporting?
>
> val metadata = MetadataTable
>   .streamFromKafka(env)
>
> val broadcast = createBroadcast(metadata)
>
> val metadataJoined = sourceTables
>   .union(source1Tables)
>   .union(source2Tables)
>   .connect(broadcast)
>   .process(BroadcastMetadataJoin()) // this operator will send side output data using Metadata.sideOutputTag
>   .name("join")
>
> val lateJoined = metadataJoined
>   .getSideOutput(Metadata.sideOutputTag)
>   .keyBy(_.primaryKey.getMetadataId)
>   .connect(KeyedBroadcastMetadataJoin.broadcast(metadata))
>   .process(KeyedBroadcastMetadataJoin())
>   .name("late-join")
>
>
>
>
>


Mis-reported metrics using SideOutput stream + Broadcast

2018-07-02 Thread Cliff Resnick
Our topology has a metadata source that we push via Broadcast. Because this
metadata source is critical, but sometimes late, we added a buffering
mechanism via a SideOutput. We call the initial look-up from Broadcast
"join"  and the secondary, state-backed buffered  lookup, "late-join"

Today I noticed that if we implement the late join using a
KeyedBroadcastProcessFunction (so we can set TTL timers while using
broadcast), everything seems to work. However, even though our
internal metrics show the correct numbers, the numbers in the Flink UI
falsely indicate that:

1) No broadcast data is sent to the late join, meaning the Flink metrics
for the metadata operator do not indicate any extra records sent.
2) Primary Join's main stream (not Side Output) is indicated as being sent
to Late Join, meaning the Flink metrics input record number from Primary
Join matches Late Join's, even though our logs and internal metrics might
show zero traffic.

If I do the late join via CoProcessFunction using a metadata keyed stream
instead of broadcast, then the Flink UI shows the correct numbers
(unfortunately there is another side issue when we take that tack but I
won't go into that here).

I hope this was not too confusing. Again the issue is not that this does
not work -- it just looks like it does not work in the Flink UI.

Below is the approximate code. Perhaps I'm doing something wrong that
causes the weird reporting?

val metadata = MetadataTable
  .streamFromKafka(env)

val broadcast = createBroadcast(metadata)

val metadataJoined = sourceTables
.union(source1Tables)
.union(source2Tables)
.connect(broadcast)
.process(BroadcastMetadataJoin()) // this operator will send side output
data using Metadata.sideOutputTag

  .name("join")


val lateJoined = metadataJoined
  .getSideOutput(Metadata.sideOutputTag)
  .keyBy(_.primaryKey.getMetadataId)
  .connect(KeyedBroadcastMetadataJoin.broadcast(metadata))
  .process(KeyedBroadcastMetadataJoin())
  .name("late-join")


Re: Task Manager detached under load

2018-01-30 Thread Cliff Resnick
I've seen a similar issue while running successive Flink SQL batches on
1.4. In my case, the Job Manager would fail with the log output about
unreachability (with an additional statement about something going
"horribly wrong"). Under workload pressure, I reverted to 1.3.2 where
everything works perfectly, but we will try again soon on 1.4. When we do I
will post the actual log output.

This was on YARN in AWS, with akka.ask.timeout = 60s.

On Wed, Jan 24, 2018 at 9:57 PM, Ashish Pokharel 
wrote:

> I haven’t gotten much further with this. It doesn’t look like GC related -
> at least GC counters were not that atrocious. However, my main concern was
> once the load subsides why aren’t TM and JM connecting again? That doesn’t
> look normal. I could definitely tell JM was listening on the port and from
> logs it does appear TM is trying to message JM that is still alive.
>
> Thanks, Ashish
>
> On Jan 23, 2018, at 12:31 PM, Lasse Nedergaard 
> wrote:
>
> Hi.
>
> Did you find a reason for the detaching ?
> I sometimes see the same on our system running Flink 1.4 on dc/os. I have
> enabled taskmanager.Debug.memory.startlogthread for debugging.
>
> Med venlig hilsen / Best regards
> Lasse Nedergaard
>
>
> Den 20. jan. 2018 kl. 12.57 skrev Kien Truong :
>
> Hi,
>
> You should enable and check your garbage collection log.
>
> We've encountered case where Task Manager disassociated due to long GC
> pause.
>
>
> Regards,
>
> Kien
> On 1/20/2018 1:27 AM, ashish pok wrote:
>
> Hi All,
>
> We have hit some load related issues and was wondering if any one has some
> suggestions. We are noticing task managers and job managers being detached
> from each other under load and never really sync up again. As a result,
> Flink session shows 0 slots available for processing. Even though, apps are
> configured to restart it isn't really helping as there are no slots
> available to run the apps.
>
>
> Here are excerpt from logs that seemed relevant. (I am trimming out rest
> of the logs for brevity)
>
> *Job Manager:*
> 2018-01-19 12:38:00,423 INFO  org.apache.flink.runtime.jobmanager.JobManager
>   -  Starting JobManager (Version: 1.4.0, Rev:3a9d9f2,
> Date:06.12.2017 @ 11:08:40 UTC)
>
> 2018-01-19 12:38:00,792 INFO  org.apache.flink.runtime.jobmanager.JobManager
>   -  Maximum heap size: 16384 MiBytes
> 2018-01-19 12:38:00,794 INFO  org.apache.flink.runtime.jobmanager.JobManager
>   -  Hadoop version: 2.6.5
> 2018-01-19 12:38:00,794 INFO  org.apache.flink.runtime.jobmanager.JobManager
>   -  JVM Options:
> 2018-01-19 12:38:00,794 INFO  org.apache.flink.runtime.jobmanager.JobManager
>   - -Xms16384m
> 2018-01-19 12:38:00,794 INFO  org.apache.flink.runtime.jobmanager.JobManager
>   - -Xmx16384m
> 2018-01-19 12:38:00,795 INFO  org.apache.flink.runtime.jobmanager.JobManager
>   - -XX:+UseG1GC
>
> 2018-01-19 12:38:00,908 INFO  
> org.apache.flink.configuration.GlobalConfiguration
>   - Loading configuration property: jobmanager.rpc.port, 6123
> 2018-01-19 12:38:00,908 INFO  
> org.apache.flink.configuration.GlobalConfiguration
>   - Loading configuration property: jobmanager.heap.mb, 16384
>
>
> 2018-01-19 12:53:34,671 WARN  akka.remote.RemoteWatcher
>  - Detected unreachable: [akka.tcp://flink@:37840
> ]
> 2018-01-19 12:53:34,676 INFO  org.apache.flink.runtime.jobmanager.JobManager
>   - Task manager akka.tcp://flink@:
> 37840/user/taskmanager terminated.
>
> -- So once the Flink session boots up, we hit it with a pretty heavy load,
> which typically results in the WARN above
>
> *Task Manager:*
> 2018-01-19 12:38:01,002 INFO  org.apache.flink.runtime.taskmanager.TaskManager
> -  Starting TaskManager (Version: 1.4.0, Rev:3a9d9f2,
> Date:06.12.2017 @ 11:08:40 UTC)
> 2018-01-19 12:38:01,367 INFO  org.apache.flink.runtime.taskmanager.TaskManager
> -  Hadoop version: 2.6.5
> 2018-01-19 12:38:01,367 INFO  org.apache.flink.runtime.taskmanager.TaskManager
> -  JVM Options:
> 2018-01-19 12:38:01,367 INFO  org.apache.flink.runtime.taskmanager.TaskManager
> - -Xms16384M
> 2018-01-19 12:38:01,367 INFO  org.apache.flink.runtime.taskmanager.TaskManager
> - -Xmx16384M
> 2018-01-19 12:38:01,367 INFO  org.apache.flink.runtime.taskmanager.TaskManager
> - -XX:MaxDirectMemorySize=8388607T
> 2018-01-19 12:38:01,367 INFO  org.apache.flink.runtime.taskmanager.TaskManager
> - -XX:+UseG1GC
>
> 2018-01-19 12:38:01,392 INFO  
> org.apache.flink.configuration.GlobalConfiguration
>   - Loading configuration property: jobmanager.rpc.port, 6123
> 2018-01-19 12:38:01,392 INFO  
> org.apache.flink.configuration.GlobalConfiguration
>   - Loading configuration property: jobmanager.heap.mb, 16384
>
>
> 2018-01-19 12:54:48,626 WARN  

CSV writer/parser inconsistency when using the Table API?

2017-12-22 Thread Cliff Resnick
I've been trying out the Table API for some ETL using a two-stage job of
CsvTableSink (DataSet) -> CsvInputFormat (Stream). I ran into an issue
where the first stage produces output with trailing null values (valid),
which causes a parse error in the second stage.

Looking at RowCsvInputFormatTest.java, I noticed that it expects input
lines with a trailing delimiter, e.g. "a|b|c|". Meanwhile, the CsvTableSink
creates rows in the form of "a|b|c". As long as 'c' is present, this input
does get successfully parsed by the RowCsvInputFormat. However, if 'c' is
defined as a number and is missing, e.g. the row is "a|b|", the Number parser
will fail on the empty string.

Is there something I am missing, or is there, in fact, an inconsistency
between the TableSink and the InputFormat?
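
For illustration, here is a minimal, self-contained sketch (plain Scala, not
the Flink parsing code itself) of the failure mode: the row shapes mirror the
"a|b|c" vs "a|b|" forms above, and the concrete field values are assumptions.

object TrailingFieldSketch extends App {
  val fullRow    = "a|2|3"  // sink-style row with all three fields present
  val missingRow = "a|2|"   // sink-style row whose last (numeric) field is null

  // split with limit -1 keeps trailing empty tokens, the way a per-field
  // CSV parser sees them
  def thirdField(line: String): String = line.split("\\|", -1)(2)

  println(thirdField(fullRow).toInt)     // "3" -> parses fine
  println(thirdField(missingRow).toInt)  // ""  -> NumberFormatException, like the Number parser
}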


Question on checkpoint management

2017-05-08 Thread Cliff Resnick
When a job cancel-with-savepoint finishes a successful Savepoint, the
preceding last successful Checkpoint is removed. Is this the intended
behavior? I thought that checkpoints and savepoints were separate entities
and, as such, savepoints should not infringe on checkpoints. This is
actually an issue for us because we have seen occurrences of false-positive
successful savepoints, perhaps due to S3 latency. Bottom line, we'd like to
treat savepoints as insurance rather than the critical path and would
rather they be oblivious to checkpoint management.

We are using externalized checkpoints, which may be confusing things. Also
I know checkpoint management is undergoing some changes in Flink 1.3 (we
are on Flink 1.2.0). Any insight is greatly appreciated.
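
For reference, a rough sketch of the externalized-checkpoint configuration
being discussed (Flink 1.2-era API; the checkpoint interval and the rest of
the job setup are assumptions):

import org.apache.flink.streaming.api.environment.CheckpointConfig.ExternalizedCheckpointCleanup
import org.apache.flink.streaming.api.scala._

object ExternalizedCheckpointSketch extends App {
  val env = StreamExecutionEnvironment.getExecutionEnvironment

  env.enableCheckpointing(5 * 60 * 1000)  // assumed 5-minute interval

  // keep the externalized checkpoint around on cancellation so it can act
  // as insurance independently of any savepoint taken during the cancel
  env.getCheckpointConfig.enableExternalizedCheckpoints(
    ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION)
}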


Re: REST api: how to upload jar?

2017-01-25 Thread Cliff Resnick
Thanks for that, issue here https://issues.apache.org/jira/browse/FLINK-5646

On Tue, Jan 24, 2017 at 11:30 PM, Sachin Goel <sachingoel0...@gmail.com>
wrote:

> Hey Cliff
> You can upload a jar file using http post with the file data sent under a
> form field 'jarfile'.
>
> Can you also please open a jira for fixing the documentation?
>
> - Sachin
>
>
> On Jan 25, 2017 06:55, "Cliff Resnick" <cre...@gmail.com> wrote:
>
>> The 1.2 release documentation (https://ci.apache.org/project
>> s/flink/flink-docs-release-1.2/monitoring/rest_api.html) states "It is
>> possible to upload, run, and list Flink programs via the REST APIs and web
>> frontend". However, there is no documentation about uploading a jar via the
>> REST API. Does this mean that upload is only supported via the web frontend?
>> I did notice that if I manually upload a jar to the configured upload dir and
>> prepend its name with a UUID it does get recognized and I can POST a job
>> start, but this is messy and I'd rather use the API if supported.
>>
>> -Cliff
>>
>
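
A hedged sketch of the upload Sachin describes: POST the jar as multipart form
data under the 'jarfile' field. The endpoint path, JobManager address, and jar
location below are assumptions, not anything confirmed in this thread.

import scala.sys.process._

object UploadJarSketch extends App {
  val jobManager = "http://localhost:8081"  // assumed JobManager web address
  val jarPath    = "/tmp/my-flink-job.jar"  // assumed path to the job jar

  // multipart POST with the jar under the 'jarfile' form field
  val response = Seq(
    "curl", "-s", "-X", "POST",
    "-F", s"jarfile=@$jarPath",
    s"$jobManager/jars/upload").!!

  println(response)
}

Any HTTP client that can do multipart form uploads would work the same way;
shelling out to curl is just the shortest way to show it.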


REST api: how to upload jar?

2017-01-24 Thread Cliff Resnick
The 1.2 release documentation (https://ci.apache.org/
projects/flink/flink-docs-release-1.2/monitoring/rest_api.html) states "It
is possible to upload, run, and list Flink programs via the REST APIs and
web frontend". However, there is no documentation about uploading a jar via
the REST API. Does this mean that upload is only supported via the web
frontend? I did notice that if I manually upload a jar to the configured
upload dir and prepend its name with a UUID it does get recognized and I can
POST a job start, but this is messy and I'd rather use the API if supported.

-Cliff


Re: Streaming pipeline failing to complete checkpoints at scale

2016-12-23 Thread Cliff Resnick
Thanks Stephan, those are great suggestions and I look forward to working
through them. The Thread.yield() in the tight loop looks particularly
interesting. In a similar vein, I have noticed that under load the
ContinuousFileReader sometimes lags behind the ContinuousFileWatcher and
seems to stay that way as buffers fill, despite the much lower cardinality
of the Watcher's Source data. As an experiment I added a forced timeout in
the FileWatcher to approximate the FileReader's consumption time, and I have
been able to get consistent checkpoints that way. Despite the obvious
kludgery (the success is more likely an effect of simply slowing down the
pipeline!), I'm wondering if there's an unrecoverable Barrier lag somehow
hiding here. I stopped the pipeline and resumed with more network buffers
just to see how that affects things.

One minor note about RocksDb and multiple task managers: the RocksDb
library is loaded from flink/tmp to be shared among TaskManagers, but this
creates a lot of thrash on startup as each TaskManager tries to overwrite
the library. We sometimes use 32 or 36 core instances, and this thrash would
be intolerable with that many TaskManagers. As a workaround I added random
directory creation in the flink-statebackend-rocksdb code. This then
becomes a cleanup issue as directories proliferate, but that's a useful
tradeoff for us. Maybe add a config for this?

Thanks again,
Cliff
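
A rough sketch of the random-directory idea mentioned above (assumed paths
and names, not the actual flink-statebackend-rocksdb patch): each TaskManager
extracts the native library into its own unique subdirectory so concurrent
startups don't overwrite a shared file.

import java.nio.file.{Files, Paths}

object NativeLibDirSketch extends App {
  val baseTmpDir = Paths.get("/tmp/flink")  // assumed taskmanager tmp dir
  Files.createDirectories(baseTmpDir)

  // unique per-process subdirectory, e.g. /tmp/flink/rocksdb-lib-3f2a.../
  val libDir = Files.createTempDirectory(baseTmpDir, "rocksdb-lib-")
  libDir.toFile.deleteOnExit()  // best-effort cleanup; directories still proliferate

  println(s"extract librocksdbjni into: $libDir")
}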



On Fri, Dec 23, 2016 at 2:25 PM, Stephan Ewen <se...@apache.org> wrote:

> Hi Cliff!
>
> Sorry to hear that you are running into so much trouble with the
> checkpointing.
>
> Here are a few things we found helping another user that was running into
> a similar issue:
>
> (1) Make sure you use a rather new version of the 1.2 branch (there have
> been some important fixes added to the network in the past weeks)
>
> (2) Use TaskManagers with one slot (start more TaskManagers instead,
> multiple TaskManagers can run on the same machine). That way, network
> connections do not get into each other's way. Flink's connection
> multiplexing has a vulnerability that may slow down networks in some cases,
> which can affect checkpoints.
>
> (3) This is very important: Try to move all state that is large to keyed
> state (value state, list state, reducing state, ...) and use RocksDB as the
> state backend. That way snapshots happen asynchronously, which greatly
> helps the checkpoints.
>
> (4) Briefly looking over the File Reading Operator code, it could be that
> the operator does not release the checkpoint lock long enough for barriers
> to get injected in a timely fashion. If you are running a custom Flink
> build anyways, you could try to change the following code block in the
> "ContinuousFileReaderOperator" (as an experiment, not a final fix)
>
> while (!format.reachedEnd()) {
>     synchronized (checkpointLock) {
>         nextElement = format.nextRecord(nextElement);
>         if (nextElement != null) {
>             readerContext.collect(nextElement);
>         } else {
>             break;
>         }
>     }
> }
>
> while (!format.reachedEnd()) {
>     synchronized (checkpointLock) {
>         nextElement = format.nextRecord(nextElement);
>         if (nextElement != null) {
>             readerContext.collect(nextElement);
>         } else {
>             break;
>         }
>     }
>     // give the checkpoint thread a chance to work
>     Thread.yield();
> }
>
>
> If you are already doing all of that, or if this does not help, we have
> two courses of action (if you can bear with us for a bit):
>
>   - There is some much-enhanced Checkpoint Monitoring almost ready to be
> merged. That should help in finding out where the barriers get delayed.
>
>   - Finally, we are experimenting with some other checkpoint alignment
> variants (alternatives to the BarrierBuffer). We can ping you when we have
> an experimental branch with that.
>
> Hope we get this under control!
>
> Best,
> Stephan
>
>
> On Fri, Dec 23, 2016 at 3:47 PM, Cliff Resnick <cre...@gmail.com> wrote:
>
>> We are running a DataStream pipeline using Exactly Once/Event Time
>> semantics on 1.2-SNAPSHOT. The pipeline sources from S3 using the
>> ContinuousFileReaderOperator. We use a custom version of the
>> ContinuousFileMonitoringFunction since our source directory changes over
>> time. The pipeline transforms and aggregates tuples of data that is steady
>> over time (spike-less), windowing by hour with an allowed lateness of 6
>> hours. We are running on ec2 c4 instances in a simple YARN setup.
>>
>> What we are seeing is that as we scale the system to hundreds of cores
>> reading around 10 million events per second, the pipeline might checkpoint
>> up to a few times before it reaches a state where it becomes unable to
>> complete a checkpoint. The checkpoint interval does not seem 
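
As a side note on Stephan's point (3) above, a minimal sketch of wiring in the
RocksDB state backend on the 1.2-era API; the checkpoint URI is an assumption.

import org.apache.flink.contrib.streaming.state.RocksDBStateBackend
import org.apache.flink.streaming.api.scala._

object RocksDbBackendSketch extends App {
  val env = StreamExecutionEnvironment.getExecutionEnvironment

  // keyed state (value, list, reducing, ...) held in RocksDB is snapshotted
  // asynchronously, which is what point (3) is after; requires the
  // flink-statebackend-rocksdb dependency on the classpath
  env.setStateBackend(new RocksDBStateBackend("s3://my-bucket/flink-checkpoints"))
}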

Streaming pipeline failing to complete checkpoints at scale

2016-12-23 Thread Cliff Resnick
We are running a DataStream pipeline using Exactly Once/Event Time
semantics on 1.2-SNAPSHOT. The pipeline sources from S3 using the
ContinuousFileReaderOperator. We use a custom version of the
ContinuousFileMonitoringFunction since our source directory changes over
time. The pipeline transforms and aggregates tuples of data that is steady
over time (spike-less), windowing by hour with an allowed lateness of 6
hours. We are running on ec2 c4 instances in a simple YARN setup.

What we are seeing is that as we scale the system to hundreds of cores
reading around 10 million events per second, the pipeline might checkpoint
up to a few times before it reaches a state where it becomes unable to
complete a checkpoint. The checkpoint interval does not seem to matter as
we've tested intervals from 5 minutes to one hour and timeouts up to one
hour as well.
What we'll usually see is something like:

checkpoint 1 (2G) 1 minute
checkpoint 2 (3-4G)  1-5 minutes
checkpoint 3 (5-6G) 1-6 minutes
checkpoint 4-5 (mixed bag, 4-25 minutes, or never)
checkpoint 6-n never

We've added debugging output to Flink internal code, e.g.
BarrierBuffer.java. From the debugging output it's clear that the actual
checkpoint per operator always completes in at most several seconds. What
seems to be happening, however, is that CheckpointBarriers start to become
slower to arrive, and after a few checkpoints it gets worse, with
CheckpointBarriers going greatly askew and finally never arriving.
Meanwhile we can see that downstream counts are closely tailing upstream
counts, < 0.01 behind, but once the barrier flow seemingly stops, the
downstream slows to a stop as (I guess) network buffers fill, and then the
pipeline is dead.

Meanwhile, cluster resources are not being stressed.

At this point we've stripped down the pipeline, tried various StateBackend
configs, etc. but the result is invariably the same sad story. It would be
great if somebody could provide more insight into where things might be
going wrong. Hopefully this is a simple config issue, but we'd be open to
any and all suggestions regarding testing, tweaking, etc.

-Cliff
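
For concreteness, a sketch of how the interval and timeout knobs mentioned
above are set; the specific values are just examples from the ranges described.

import java.util.concurrent.TimeUnit
import org.apache.flink.streaming.api.scala._

object CheckpointTuningSketch extends App {
  val env = StreamExecutionEnvironment.getExecutionEnvironment

  // interval and timeout values are examples from the ranges tested above
  env.enableCheckpointing(TimeUnit.MINUTES.toMillis(5))
  env.getCheckpointConfig.setCheckpointTimeout(TimeUnit.HOURS.toMillis(1))
}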


Re: Resource under-utilization when using RocksDb state backend [SOLVED]

2016-12-08 Thread Cliff Resnick
It turns out that most of the time in RocksDBFoldingState was spent on
serialization/deserialization. RocksDb read/write was performing well. By
moving from Kryo to custom serialization we were able to increase
throughput dramatically. Load is now where it should be.
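
As an illustration of the general idea (hand-rolled, field-by-field binary
serialization in place of generic Kryo), a small sketch follows. The record
shape is an assumption, and the real fix would implement Flink's
TypeSerializer for the state type rather than this standalone round trip.

import java.io.{ByteArrayInputStream, ByteArrayOutputStream, DataInputStream, DataOutputStream}

object CustomSerializationSketch extends App {
  case class Bid(campaign: String, impressions: Int, spendMicros: Long)

  def write(bid: Bid): Array[Byte] = {
    val bytes = new ByteArrayOutputStream()
    val out   = new DataOutputStream(bytes)
    out.writeUTF(bid.campaign)     // fixed, known field order: no class
    out.writeInt(bid.impressions)  // names or type tags on the wire
    out.writeLong(bid.spendMicros)
    out.flush()
    bytes.toByteArray
  }

  def read(data: Array[Byte]): Bid = {
    val in = new DataInputStream(new ByteArrayInputStream(data))
    Bid(in.readUTF(), in.readInt(), in.readLong())
  }

  val roundTripped = read(write(Bid("summer-sale", 42, 1250000L)))
  println(roundTripped)
}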

On Mon, Dec 5, 2016 at 1:15 PM, Robert Metzger <rmetz...@apache.org> wrote:

> Another Flink user using RocksDB with large state on SSDs recently posted
> this video for optimizing the performance of RocksDB on SSDs:
> https://www.youtube.com/watch?v=pvUqbIeoPzM
> That could be relevant for you.
>
> For how long did you look at iotop? It could be that the IO access happens
> in bursts, depending on how data is cached.
>
> I'll also add Stefan Richter to the conversation; he may have some more
> ideas about what we can do here.
>
>
> On Mon, Dec 5, 2016 at 6:19 PM, Cliff Resnick <cre...@gmail.com> wrote:
>
>> Hi Robert,
>>
>> We're following 1.2-SNAPSHOT, using event time. I have tried "iotop" and
>> I usually see less than 1% IO. The most I've seen was a quick flash here
>> or there of something substantial (e.g. 19%, 52%) then back to nothing. I
>> also assumed we were disk-bound, but to use your metaphor I'm having
>> trouble finding any smoke. However, I'm not very experienced in sussing out
>> IO issues so perhaps there is something else I'm missing.
>>
>> I'll keep investigating. If I continue to come up empty then I guess my
>> next steps may be to stage some independent tests directly against RocksDb.
>>
>> -Cliff
>>
>>
>> On Mon, Dec 5, 2016 at 5:52 AM, Robert Metzger <rmetz...@apache.org>
>> wrote:
>>
>>> Hi Cliff,
>>>
>>> which Flink version are you using?
>>> Are you using Eventtime or processing time windows?
>>>
>>> I suspect that your disks are "burning" (= your job is IO bound). Can
>>> you check with a tool like "iotop" how much disk IO Flink is producing?
>>> Then, I would set this number in relation to the theoretical maximum
>>> of your SSDs (a good rough estimate is to use dd for that).
>>>
>>> If you find that your disk bandwidth is saturated by Flink, you could
>>> look into tuning the RocksDB settings so that it uses more memory for
>>> caching.
>>>
>>> Regards,
>>> Robert
>>>
>>>
>>> On Fri, Dec 2, 2016 at 11:34 PM, Cliff Resnick <cre...@gmail.com> wrote:
>>>
>>>> In tests comparing RocksDb to fs state backend we observe much lower
>>>> throughput, around 10x slower. While the lowered throughput is expected,
>>>> what's perplexing is that machine load is also very low with RocksDb,
>>>> typically falling to  < 25% CPU and negligible IO wait (around 0.1%). Our
>>>> test instances are EC2 c3.xlarge which are 4 virtual CPUs and 7.5G RAM,
>>>> each running a single TaskManager in YARN, with 6.5G allocated memory per
>>>> TaskManager. The instances also have 2x40G attached SSDs which we have
>>>> mapped to `taskmanager.tmp.dir`.
>>>>
>>>> With FS state and 4 slots per TM, we will easily max out with an
>>>> average load average around 5 or 6, so we actually need to throttle down the
>>>> slots to 3. With RocksDb using the Flink SSD configured options we see a
>>>> load average at around 1. Also, load (and actual) throughput remain more or
>>>> less constant no matter how many slots we use. The weak load is spread over
>>>> all CPUs.
>>>>
>>>> Here is a sample top:
>>>>
>>>> Cpu0  : 20.5%us,  0.0%sy,  0.0%ni, 79.5%id,  0.0%wa,  0.0%hi,  0.0%si,
>>>>  0.0%st
>>>> Cpu1  : 18.5%us,  0.0%sy,  0.0%ni, 81.5%id,  0.0%wa,  0.0%hi,  0.0%si,
>>>>  0.0%st
>>>> Cpu2  : 11.6%us,  0.7%sy,  0.0%ni, 87.0%id,  0.7%wa,  0.0%hi,  0.0%si,
>>>>  0.0%st
>>>> Cpu3  : 12.5%us,  0.3%sy,  0.0%ni, 86.8%id,  0.0%wa,  0.0%hi,  0.3%si,
>>>>  0.0%st
>>>>
>>>> Our pipeline uses tumbling windows, each with a ValueState keyed to a
>>>> 3-tuple of one string and two ints. Each ValueState comprises a small set
>>>> of tuples of around 5-7 fields each. The WindowFunction simply diffs against
>>>> the set and updates state if there is a diff.
>>>>
>>>> Any ideas as to what the bottleneck is here? Any suggestions welcomed!
>>>>
>>>> -Cliff
>>>>
>>>>
>>>>
>>>>
>>>>
>>>>
>>>>
>>>
>>
>


Re: Resource under-utilization when using RocksDb state backend

2016-12-05 Thread Cliff Resnick
Hi Robert,

We're following 1.2-SNAPSHOT, using event time. I have tried "iotop" and I
usually see less than 1% IO. The most I've seen was a quick flash here or
there of something substantial (e.g. 19%, 52%) then back to nothing. I also
assumed we were disk-bound, but to use your metaphor I'm having trouble
finding any smoke. However, I'm not very experienced in sussing out IO
issues so perhaps there is something else I'm missing.

I'll keep investigating. If I continue to come up empty then I guess my
next steps may be to stage some independent tests directly against RocksDb.

-Cliff


On Mon, Dec 5, 2016 at 5:52 AM, Robert Metzger <rmetz...@apache.org> wrote:

> Hi Cliff,
>
> which Flink version are you using?
> Are you using Eventtime or processing time windows?
>
> I suspect that your disks are "burning" (= your job is IO bound). Can you
> check with a tool like "iotop" how much disk IO Flink is producing?
> Then, I would set this number in relation to the theoretical maximum of
> your SSDs (a good rough estimate is to use dd for that).
>
> If you find that your disk bandwidth is saturated by Flink, you could look
> into tuning the RocksDB settings so that it uses more memory for caching.
>
> Regards,
> Robert
>
>
> On Fri, Dec 2, 2016 at 11:34 PM, Cliff Resnick <cre...@gmail.com> wrote:
>
>> In tests comparing RocksDb to fs state backend we observe much lower
>> throughput, around 10x slower. While the lowered throughput is expected,
>> what's perplexing is that machine load is also very low with RocksDb,
>> typically falling to  < 25% CPU and negligible IO wait (around 0.1%). Our
>> test instances are EC2 c3.xlarge which are 4 virtual CPUs and 7.5G RAM,
>> each running a single TaskManager in YARN, with 6.5G allocated memory per
>> TaskManager. The instances also have 2x40G attached SSDs which we have
>> mapped to `taskmanager.tmp.dir`.
>>
>> With FS state and 4 slots per TM, we will easily max out with an average
>> load average around 5 or 6, so we actually need to throttle down the slots to
>> 3. With RocksDb using the Flink SSD configured options we see a load
>> average at around 1. Also, load (and actual) throughput remain more or less
>> constant no matter how many slots we use. The weak load is spread over all
>> CPUs.
>>
>> Here is a sample top:
>>
>> Cpu0  : 20.5%us,  0.0%sy,  0.0%ni, 79.5%id,  0.0%wa,  0.0%hi,  0.0%si,
>>  0.0%st
>> Cpu1  : 18.5%us,  0.0%sy,  0.0%ni, 81.5%id,  0.0%wa,  0.0%hi,  0.0%si,
>>  0.0%st
>> Cpu2  : 11.6%us,  0.7%sy,  0.0%ni, 87.0%id,  0.7%wa,  0.0%hi,  0.0%si,
>>  0.0%st
>> Cpu3  : 12.5%us,  0.3%sy,  0.0%ni, 86.8%id,  0.0%wa,  0.0%hi,  0.3%si,
>>  0.0%st
>>
>> Our pipeline uses tumbling windows, each with a ValueState keyed to a
>> 3-tuple of one string and two ints. Each ValueState comprises a small set
>> of tuples of around 5-7 fields each. The WindowFunction simply diffs against
>> the set and updates state if there is a diff.
>>
>> Any ideas as to what the bottleneck is here? Any suggestions welcomed!
>>
>> -Cliff
>>
>>
>>
>>
>>
>>
>>
>


Resource under-utilization when using RocksDb state backend

2016-12-02 Thread Cliff Resnick
In tests comparing RocksDb to fs state backend we observe much lower
throughput, around 10x slower. While the lowered throughput is expected,
what's perplexing is that machine load is also very low with RocksDb,
typically falling to  < 25% CPU and negligible IO wait (around 0.1%). Our
test instances are EC2 c3.xlarge which are 4 virtual CPUs and 7.5G RAM,
each running a single TaskManager in YARN, with 6.5G allocated memory per
TaskManager. The instances also have 2x40G attached SSDs which we have
mapped to `taskmanager.tmp.dir`.

With FS state and 4 slots per TM, we will easily max out with an average
load average around 5 or 6, so we actually need to throttle down the slots to
3. With RocksDb using the Flink SSD configured options we see a load
average at around 1. Also, load (and actual) throughput remain more or less
constant no matter how many slots we use. The weak load is spread over all
CPUs.

Here is a sample top:

Cpu0  : 20.5%us,  0.0%sy,  0.0%ni, 79.5%id,  0.0%wa,  0.0%hi,  0.0%si,
 0.0%st
Cpu1  : 18.5%us,  0.0%sy,  0.0%ni, 81.5%id,  0.0%wa,  0.0%hi,  0.0%si,
 0.0%st
Cpu2  : 11.6%us,  0.7%sy,  0.0%ni, 87.0%id,  0.7%wa,  0.0%hi,  0.0%si,
 0.0%st
Cpu3  : 12.5%us,  0.3%sy,  0.0%ni, 86.8%id,  0.0%wa,  0.0%hi,  0.3%si,
 0.0%st

Our pipeline uses tumbling windows, each with a ValueState keyed to a
3-tuple of one string and two ints. Each ValueState comprises a small set
of tuples of around 5-7 fields each. The WindowFunction simply diffs against
the set and updates state if there is a diff.
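
A rough sketch of that shape on the Flink Scala API; the record type, field
names, and the exact diff/emit rule are assumptions for illustration.

import org.apache.flink.api.common.state.{ValueState, ValueStateDescriptor}
import org.apache.flink.streaming.api.scala.function.RichWindowFunction
import org.apache.flink.streaming.api.windowing.windows.TimeWindow
import org.apache.flink.util.Collector

case class Record(fields: Vector[String])

// keyed by a 3-tuple of one string and two ints, as described above
class DiffingWindowFunction
    extends RichWindowFunction[Record, Record, (String, Int, Int), TimeWindow] {

  private lazy val stateDesc =
    new ValueStateDescriptor[Set[Record]]("seen", classOf[Set[Record]])

  override def apply(key: (String, Int, Int),
                     window: TimeWindow,
                     input: Iterable[Record],
                     out: Collector[Record]): Unit = {
    val state: ValueState[Set[Record]] = getRuntimeContext.getState(stateDesc)
    val seen  = Option(state.value()).getOrElse(Set.empty[Record])
    val fresh = input.toSet.diff(seen)   // only what we haven't seen for this key
    if (fresh.nonEmpty) {
      fresh.foreach(r => out.collect(r)) // assumed emit rule: forward the diff
      state.update(seen ++ fresh)        // persist the updated set
    }
  }
}

Keyed state obtained this way is scoped to the key rather than the window, so
the "seen" set survives across windows for the same key, which matches the
per-key diffing described above.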

Any ideas as to what the bottleneck is here? Any suggestions welcomed!

-Cliff


Re: Why are externalized checkpoints deleted on Job Manager exit?

2016-11-15 Thread Cliff Resnick
Hi,

Anything keeping this from being merged into master?

On Thu, Nov 3, 2016 at 10:56 AM, Ufuk Celebi  wrote:

> A fix is pending here: https://github.com/apache/flink/pull/2750
>
> The behaviour on graceful shut down/suspension respects the
> cancellation behaviour with this change.
>
> On Thu, Nov 3, 2016 at 3:23 PM, Ufuk Celebi  wrote:
> > I don't need the logs. Externalized checkpoints have been configured
> > to be deleted when the job is suspended, too. When the YARN session is
> > terminated, all jobs are suspended.
> >
> > The behaviour seems like a bug. As a work around you have to cancel
> > the job before you shut down the YARN session. Let me think for a
> > minute whether there is a good reason to discard externalized
> > checkpoints on suspension, but I don't think so.
> >
> > On Thu, Nov 3, 2016 at 3:00 PM, Ufuk Celebi  wrote:
> >> They should actually not be deleted.
> >>
> >> Could you please share the logs with me? In the meantime, I will try
> >> to reproduce this.
> >>
> >> On Thu, Nov 3, 2016 at 2:04 PM, Aljoscha Krettek 
> wrote:
> >>> +Ufuk
> >>>
> >>> Ufuk recently worked on that, if I'm not mistaken. Do you have an Idea
> what
> >>> could be going on here?
> >>>
> >>>
> >>> On Wed, 2 Nov 2016 at 21:52 Clifford Resnick 
> wrote:
> 
>  Testing externalized checkpoints in a YARN-based cluster, configured
> with:
> 
> 
>  env.getCheckpointConfig.enableExternalizedCheckpoints(
>      ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION);
> 
>  I can confirm that the checkpoint is retained between cancelled jobs;
>  however, it’s deleted when the Job Manager session is gracefully shut
>  down. We’d really like for the persistent checkpoint to be treated like
>  a Savepoint and not be deleted. Is there a way to enable this?
> 
> 
> >>>
>