Best Practices? Fault Isolation for Processing Large Number of Same-Shaped Input Kafka Topics in a Big Flink Job

2024-05-13 Thread Kevin Lam via user
Hi everyone, I'm currently prototyping on a project where we need to process a large number of Kafka input topics (say, a couple of hundred), all of which share the same DataType/Schema. Our objective is to run the same Flink SQL on all of the input topics, but I am concerned about doing this in

Re: Issue with HybridSource recovering from Savepoint

2022-05-04 Thread Kevin Lam
. In IntelliJ we just use the build and run functionality, and don't have access to the Flink CLI. On Tue, May 3, 2022 at 2:48 PM Kevin Lam wrote: > Hi, > > We're encountering an error using a HybridSource that is composed of a > FileSource + KafkaSource, only when recovering from a

Issue with HybridSource recovering from Savepoint

2022-05-03 Thread Kevin Lam
Hi, We're encountering an error using a HybridSource that is composed of a FileSource + KafkaSource, only when recovering from a savepoint [0]. This HybridSource is used to read from a Kafka topic's archives hosted on GCS via a bounded FileSource, and then automatically switch over to the data

Reading FileSource Files in a particular order

2022-03-14 Thread Kevin Lam
Hi all, We're interested in being able to use a FileSource read from a Google Cloud Storage (GCS) archive of messages from a Kafka topic, roughly in order. Our GCS archive is

Evolving Schemas with ParquetColumnarRowInputFormat

2022-03-09 Thread Kevin Lam
Hi all, We're interested in using ParquetColumnarRowInputFormat or similar with evolving Parquet schemas. Any advice or recommendations? Specifically, the

Re: Avro BulkFormat for the new FileSource API?

2022-02-08 Thread Kevin Lam
, 2022 at 4:19 PM Kevin Lam wrote: > Hi David, > > Awesome, wasn't aware of FLINK-24565. That's the kind of thing we were > looking for and will take a look at it. Thanks for sharing that! > > > > On Fri, Jan 7, 2022 at 2:05 PM David Morávek > wrote: > >>

Re: Future support for custom FileEnumerator in FileSource?

2022-02-02 Thread Kevin Lam
Hi, Totally missed that setFileEnumerator method. That definitely helps, I checked it out and this does what we were looking for. Thanks FG! On Wed, Feb 2, 2022 at 3:07 AM Francesco Guardiani wrote: > Hi, > From what I see here >

Future support for custom FileEnumerator in FileSource?

2022-02-01 Thread Kevin Lam
Hi all, We're interested in being able to filter files using the new FileSource API. Are there plans to add it? If there's existing work, we would be happy to help push this

Re: Is FlinkKafkaProducer state compatible with KafkaSink sink? How to migrate?

2022-01-12 Thread Kevin Lam
che.org/flink/flink-docs-release-1.14/release-notes/flink-1.14/#port-kafkasink-to-new-unified-sink-api-flip-143 > > On Tue, Jan 11, 2022 at 9:58 PM Kevin Lam wrote: > > > > Hi all, > > > > We're looking to migrate from FlinkKafkaProducer to the new KafkaSink > for

Is FlinkKafkaProducer state compatible with KafkaSink sink? How to migrate?

2022-01-11 Thread Kevin Lam
Hi all, We're looking to migrate from FlinkKafkaProducer to the new KafkaSink for the new unified Sink API. Is the state compatible across the two Kafka sink APIs? If not, what's the best way to migrate from one to the other? Thanks in advance, Kevin

Re: Plans to update StreamExecutionEnvironment.readFiles to use the FLIP-27 compatible FileSource?

2022-01-11 Thread Kevin Lam
he > FileSource. I updated the ticket accordingly. Perhaps there is a way > to migrate the state but it would be a larger effort. Is this an > important feature for you? > > Best, > Fabian > > On Mon, Jan 10, 2022 at 3:58 PM Kevin Lam wrote: > > > > Hi Fabian,

Re: Avro BulkFormat for the new FileSource API?

2022-01-10 Thread Kevin Lam
it seems > that the BulkFormat for Avro [1] has been added recently and will be > released with the Flink 1.15.x. > > [1] https://issues.apache.org/jira/browse/FLINK-24565 > > Best, > D. > > On Fri, Jan 7, 2022 at 7:23 PM Kevin Lam wrote: > >> Hi all, >> >>

Re: Plans to update StreamExecutionEnvironment.readFiles to use the FLIP-27 compatible FileSource?

2022-01-10 Thread Kevin Lam
> already in the last few weeks of the release cycle for 1.15 so I > cannot guarantee that someone can implement it until then. > > Best, > Fabian > > [1] https://issues.apache.org/jira/browse/FLINK-25591 > > On Fri, Jan 7, 2022 at 5:07 PM Kevin Lam wrote: > >

Avro BulkFormat for the new FileSource API?

2022-01-07 Thread Kevin Lam
Hi all, We're looking into using the new FileSource API, we see that there is a BulkFormat

Plans to update StreamExecutionEnvironment.readFiles to use the FLIP-27 compatible FileSource?

2022-01-07 Thread Kevin Lam
Hi all, Are there any plans to update StreamExecutionEnvironment.readFiles to

Python Interop with Java defined org.apache.flink.api.common.functions.Function classes?

2021-12-14 Thread Kevin Lam
Hi all, We currently operate several Flink applications using the Scala API, and run on kubernetes in Application mode. We're interested in researching the Python API and how we can support Python for application developers that prefer to use Python. We have a common library which implements a

Re: GCS/Object Storage Rate Limiting

2021-12-08 Thread Kevin Lam
the state. > > Does the restore operation fail, or is the retry mechanism sufficient to > work around this? > > D. > > On Thu, Dec 2, 2021 at 7:54 PM Kevin Lam wrote: > >> Hi David, >> >> Thanks for your response. >> >> What's the DefaultSchedule

Re: GCS/Object Storage Rate Limiting

2021-12-02 Thread Kevin Lam
taking the checkpoint is different from the one you're trying to restore > with (especially with RocksDB). > > Best, > D. > > On Thu, Dec 2, 2021 at 4:29 PM Kevin Lam wrote: > >> Hi all, >> >> We're running a large (256 task managers with 4 task slots each) Fli

GCS/Object Storage Rate Limiting

2021-12-02 Thread Kevin Lam
Hi all, We're running a large (256 task managers with 4 task slots each) Flink Cluster with High Availability enabled, on Kubernetes, and use Google Cloud Storage (GCS) as our object storage for the HA metadata. In addition, our Flink application writes out to GCS from one of its sinks via

Is there a way to update checkpoint configuration for a job "in-place"?

2021-11-02 Thread Kevin Lam
Hi all, We run a Flink application on Kubernetes in Application Mode using Kafka with exactly-once-semantics and high availability. We are looking into a specific failure scenario: a flink job that has too short a checkpoint timeout (execution.checkpointing.timeout) and at some point during the

Re: RocksDB: Spike in Memory Usage Post Restart

2021-10-06 Thread Kevin Lam
Hi Fabian, Yes I can tell you a bit more about the job we are seeing the problem with. I'll simplify things a bit but this captures the essence: 1. Input datastreams are from a few kafka sources that we intend to join. 2. We wrap the datastreams we want to join into a common container class and

Re: RocksDB: Spike in Memory Usage Post Restart

2021-10-06 Thread Kevin Lam
Hi Fabian, Thanks for collecting feedback. Here are the answers to your questions: 1. Yes, we enabled incremental checkpoints for our job by setting `state.backend.incremental` to true. As for whether the checkpoint we recover from is incremental or not, I'm not sure how to determine that. It's

Re: RocksDB: Spike in Memory Usage Post Restart

2021-10-05 Thread Kevin Lam
y free all resources and shutdown." When a running job fails and a running TaskManager restores from checkpoint, is the old Embedded RocksDb being cleaned up properly? I wasn't really sure where to look in the Flink source code to verify this. On Mon, Oct 4, 2021 at 4:56 PM Kevin Lam wrote: >

Re: RocksDB: Spike in Memory Usage Post Restart

2021-10-04 Thread Kevin Lam
We tried with 1.14.0, unfortunately we still run into the issue. Any thoughts or suggestions? On Mon, Oct 4, 2021 at 9:09 AM Kevin Lam wrote: > Hi Fabian, > > We're using our own image built from the official Flink docker image, so > we should have the code to use jemalloc i

Re: RocksDB: Spike in Memory Usage Post Restart

2021-10-04 Thread Kevin Lam
Hi Fabian, We're using our own image built from the official Flink docker image, so we should have the code to use jemalloc in the docker entrypoint. I'm going to give 1.14 a try and will let you know how it goes. On Mon, Oct 4, 2021 at 8:29 AM Fabian Paul wrote: > Hi Kevin, > > We bumped the

Re: RocksDB: Spike in Memory Usage Post Restart

2021-10-01 Thread Kevin Lam
Hi Fabian, Thanks for your response. Sure, let me tell you a bit more about the job. - Flink version 1.13.1 (I also tried 1.13.2 because I saw FLINK-22886, but this didn't help) - We're running on kubernetes in an application

RocksDB: Spike in Memory Usage Post Restart

2021-09-30 Thread Kevin Lam
Hi all, We're debugging an issue with OOMs that occurs on our jobs shortly after a restore from checkpoint. Our application is running on kubernetes and uses RocksDB as its state backend. We reproduced the issue on a small cluster of 2 task managers. If we killed a single task manager, we

Re: Not able to avoid Dynamic Class Loading

2021-09-22 Thread Kevin Lam
rializable schema... Maybe this might also explain why it works in > one submission and not in the other? > > On Fri, Aug 27, 2021 at 4:10 PM Kevin Lam wrote: > >> There are no inner classes, and none of the fields >> of DebeziumAvroRegistryDeserializationSchema have a

Re: TaskManagers OOM'ing for Flink App with very large state only when restoring from checkpoint

2021-09-13 Thread Kevin Lam
> https://faun.pub/how-much-is-too-much-the-linux-oomkiller-and-used-memory-d32186f29c9d > > Regards, > Alexis. > > -- > *From:* Guowei Ma > *Sent:* Monday, September 13, 2021 8:35 AM > *To:* Kevin Lam > *Cc:* user > *Subject:* Re: TaskM

TaskManagers OOM'ing for Flink App with very large state only when restoring from checkpoint

2021-09-10 Thread Kevin Lam
Hi all, We've seen scenarios where TaskManagers will begin to OOM, shortly after a job restore from checkpoint. Our flink app has a very large state (100s of GB) and we use RocksDB as a backend. Our repro is something like this: run the job for an hour and let it accumulate state, kill a task

Re: Not able to avoid Dynamic Class Loading

2021-08-27 Thread Kevin Lam
There are no inner classes, and none of the fields of DebeziumAvroRegistryDeserializationSchema have an Avro schema, even when expanded, including KafkaClusterConfig. KafkaClusterConfig is just composed of Strings and Booleans. DebeziumAvroRegistryDeserializationSchema has a field that initializes

Re: Not able to avoid Dynamic Class Loading

2021-08-26 Thread Kevin Lam
I also tested serializing an instance of `OurSource` with `org.apache.commons.lang3.SerializationUtils.clone` and it worked fine. On Thu, Aug 26, 2021 at 3:27 PM Kevin Lam wrote: > Hi Arvid, > > Got it, we don't use Avro.schema inside of > DebeziumAvroRegistryDeserializationSchema,

Re: Not able to avoid Dynamic Class Loading

2021-08-26 Thread Kevin Lam
/github.com/apache/flink/blob/master/flink-formats/flink-avro/src/main/java/org/apache/flink/formats/avro/typeutils/SerializableAvroSchema.java#L34-L34 > > On Thu, Aug 26, 2021 at 3:16 PM Kevin Lam wrote: > >> Hi! >> >> We're using 1.13.1. We have a class in our user code th

Re: Not able to avoid Dynamic Class Loading

2021-08-26 Thread Kevin Lam
ot contain fields related to Avro. > > Jars in usrlib has a higher priority to be loaded than jars in lib. So if > there is another FlinkKafkaConsumer class in your user jar then it might > affect class loading and thus affect this issue. > > Kevin Lam wrote on Wed, Aug 25, 2021 at 11:18 PM: >

Not able to avoid Dynamic Class Loading

2021-08-25 Thread Kevin Lam
Hi all, I'm trying to avoid dynamic class loading my user code [0] due to a suspected classloading leak, but when I put my application jar into /lib instead of /usrlib, I run into the following error: ``` The main method caused an error: The implementation of the FlinkKafkaConsumer is not

Re: Task Managers having trouble registering after restart

2021-08-24 Thread Kevin Lam
a's Consumer API itself. > > I'll pull in @Chesnay Schepler who afaik debugged > the leak a while ago. > > On Mon, Aug 23, 2021 at 9:24 PM Kevin Lam wrote: > >> Actually, we are using the `FlinkKafkaConsumer` [0] rather than >> `KafkaSource`. Is there a

Re: Task Managers having trouble registering after restart

2021-08-23 Thread Kevin Lam
/flink/streaming/connectors/kafka/FlinkKafkaConsumer.html On Mon, Aug 23, 2021 at 2:55 PM Kevin Lam wrote: > Thanks Arvid! I will give this a try and report back. > > On Mon, Aug 23, 2021 at 11:07 AM Arvid Heise wrote: > >> Hi Kevin, >> >> "java.lang.OutOfMemoryE

Re: Task Managers having trouble registering after restart

2021-08-23 Thread Kevin Lam
ttps://ci.apache.org/projects/flink/flink-docs-release-1.13/docs/ops/debugging/debugging_classloading/#unloading-of-dynamically-loaded-classes-in-user-code > [2] > https://ci.apache.org/projects/flink/flink-docs-release-1.13/docs/connectors/datastream/kafka/#additional-properties > > On

Re: Job manager sometimes doesn't restore job from checkpoint post TaskManager failure

2021-08-23 Thread Kevin Lam
how Flink handled the TM > failure. > > On 19/08/2021 16:06, Kevin Lam wrote: > > Hi all, > > I've noticed that sometimes when task managers go down--it looks like the > job is not restored from checkpoint, but instead restarted from a fresh > state (when I go to the j

Job manager sometimes doesn't restore job from checkpoint post TaskManager failure

2021-08-19 Thread Kevin Lam
Hi all, I've noticed that sometimes when task managers go down--it looks like the job is not restored from checkpoint, but instead restarted from a fresh state (when I go to the job's checkpoint tab in the UI, I don't see the restore, and the numbers in the job overview all get reset). Under what

Task Managers having trouble registering after restart

2021-08-17 Thread Kevin Lam
Hi all, I'm observing an issue sometimes, and it's been hard to reproduce, where task managers are not able to register with the Flink cluster. We provision only the number of task managers required to run a given application, and so the absence of any of the task managers causes the job to enter

Re: FW: Error using KafkaProducer EXACTLY_ONCE semantic + TaskManager Failure

2021-08-04 Thread Kevin Lam
n are you using? (=What is effectively bundled > into your application jar?) > > On Wed, Aug 4, 2021 at 5:56 PM Kevin Lam wrote: > >> Thanks Matthias. >> >> I just tried this backport (https://github.com/apache/flink/pull/16693) >> and got the following error,

Re: FW: Error using KafkaProducer EXACTLY_ONCE semantic + TaskManager Failure

2021-08-04 Thread Kevin Lam
> > > > Thias > > > > *From:* Kevin Lam > *Sent:* Dienstag, 3. August 2021 15:56 > *To:* Schwalbe Matthias > *Cc:* user ; fabianp...@ververica.com > *Subject:* Re: FW: Error using KafkaProducer EXACTLY_ONCE semantic + > TaskManager Failure > > > > Thank

Error using KafkaProducer EXACTLY_ONCE semantic + TaskManager Failure

2021-07-29 Thread Kevin Lam
Hi user@, We're developing a Flink application, and using the FlinkKafkaProducer.Semantic.EXACTLY_ONCE producer semantic to output records to a Kafka topic in an exactly-once way. We run our flink application on kubernetes. I've observed that if a task manager fails (I've simulated this by

Monitoring Exceptions using Bugsnag

2021-06-21 Thread Kevin Lam
Hi all, I'm interested in instrumenting an Apache Flink application so that we can monitor exceptions. I was wondering what the best practices are here? Is there a good way to observe all the exceptions inside of a Flink application, including Flink internals? We are currently thinking of using

Monitoring Exceptions using Bugsnag

2021-06-18 Thread Kevin Lam
Hi all, I'm interested in instrumenting an Apache Flink application so that we can monitor exceptions. I was wondering what the best practices are here? Is there a good way to observe all the exceptions inside of a Flink application, including Flink internals? We are currently thinking of using

Re: Reducing Task Manager Count Greatly Increases Savepoint Restore

2021-04-12 Thread Kevin Lam
> range into a new RocksDB instance. This causes a lot of read and write > amplification. > > Cheers, > Till > > On Wed, Apr 7, 2021 at 4:07 PM Kevin Lam wrote: > >> Hi all, >> >> We are trying to benchmark savepoint size vs. restore time. >> >>

Reducing Task Manager Count Greatly Increases Savepoint Restore

2021-04-07 Thread Kevin Lam
Hi all, We are trying to benchmark savepoint size vs. restore time. One thing we've observed is that when we reduce the number of task managers, the time to restore from a savepoint increases drastically: 1/ Restoring from 9.7 TB savepoint onto 156 task managers takes 28 minutes 2/ Restoring
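To put the first data point in the message above into perspective, here is a rough back-of-the-envelope sketch of the restore throughput it implies. It assumes decimal terabytes (1 TB = 10^12 bytes) and state spread evenly across task managers; neither is stated in the thread, and the helper name `restore_throughput` is made up for illustration.

```python
def restore_throughput(savepoint_bytes, task_managers, minutes):
    """Return (aggregate, per-task-manager) restore throughput in bytes/second.

    Assumes the savepoint is divided evenly across task managers, which
    is a simplification and may not hold for skewed key groups.
    """
    seconds = minutes * 60
    aggregate = savepoint_bytes / seconds
    per_tm = aggregate / task_managers
    return aggregate, per_tm


# Data point 1/ from the message: 9.7 TB onto 156 task managers in 28 minutes.
agg, per_tm = restore_throughput(9.7e12, 156, 28)
print(f"aggregate: {agg / 1e9:.2f} GB/s, per task manager: {per_tm / 1e6:.1f} MB/s")
# prints: aggregate: 5.77 GB/s, per task manager: 37.0 MB/s
```

Running the same calculation for a smaller task manager count would show whether per-task-manager throughput stays flat or drops as each task manager takes on more state.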

Measuring the Size of State, Savepoint Size vs. Restore time

2021-03-31 Thread Kevin Lam
Hi all, We're interested in doing some analysis on how the size of our savepoints and state affects the time it takes to restore from a savepoint. We're running Flink 1.12 and using RocksDB as a state backend, on Kubernetes. What is the best way to measure the size of a Flink Application's

Re: OOM issues with Python Objects

2021-03-24 Thread Kevin Lam
PM Arvid Heise wrote: > >> Hi Kevin, >> >> yes I understood that, but then your Python class contains a Row field, >> where no mapping exists. I'm assuming PyFlink tries to do a deep conversion >> and fails to do so by ending in some infinite loop. >> >>

Re: OOM issues with Python Objects

2021-03-22 Thread Kevin Lam
also pulling in Dian who knows more about PyFlink. > > [1] > https://ci.apache.org/projects/flink/flink-docs-release-1.12/dev/python/datastream-api-users-guide/data_types.html > > On Fri, Mar 19, 2021 at 3:55 PM Kevin Lam wrote: > >> Hi all, >> >> I've encountere

OOM issues with Python Objects

2021-03-19 Thread Kevin Lam
Hi all, I've encountered an interesting issue where I observe an OOM issue in my Flink Application when I use a DataStream of Python Objects, but when I make that Python Object a Subclass of pyflink.common.types.Row and provide TypeInformation, there is no issue. For the OOM scenario, no

Re: Python API + Unit Testing

2021-03-19 Thread Kevin Lam
Dian > > [1] > https://cwiki.apache.org/confluence/display/FLINK/Setting+up+a+Flink+development+environment > > On Mar 18, 2021, at 10:46 PM, Kevin Lam wrote: > > Hi all, > > I noticed there isn't much in the way of testing discussed in the Python > API docs > <https://ci.apache.org/projec

Python API + Unit Testing

2021-03-18 Thread Kevin Lam
Hi all, I noticed there isn't much in the way of testing discussed in the Python API docs for Flink. Does the community have any best-practices or recommendations on how testing should be done with PyFlink? Thanks!

Re: Working with DataStreams of Java objects in Pyflink?

2021-03-15 Thread Kevin Lam
JO type in the future. > > Best, > Shuiqiang > > Kevin Lam wrote on Mon, Mar 15, 2021 at 10:46 PM: > >> Hi all, >> >> Looking to use Pyflink to work with some scala-defined objects being >> emitted from a custom source. When trying to manipulate the objects in a >> pyfli

Working with DataStreams of Java objects in Pyflink?

2021-03-15 Thread Kevin Lam
Hi all, Looking to use Pyflink to work with some scala-defined objects being emitted from a custom source. When trying to manipulate the objects in a pyflink defined MapFunction

Re: Python DataStream API Questions -- Java/Scala Interoperability?

2021-03-10 Thread Kevin Lam
A follow-up question--In the example you provided Shuiqiang, there were no arguments passed to the constructor of the custom sink/source. What's the best way to pass arguments to the constructor? On Fri, Mar 5, 2021 at 4:29 PM Kevin Lam wrote: > Thanks Shuiqiang! That's really helpful, we

Re: Running Pyflink job on K8s Flink Cluster Deployment?

2021-03-08 Thread Kevin Lam
u might need to build a custom image with Python >> and PyFlink installed; please refer to Enable Python in docker >> <https://ci.apache.org/projects/flink/flink-docs-release-1.12/deployment/resource-providers/standalone/docker.html#enabling-python> >> . >> >> Generally,

Re: Python DataStream API Questions -- Java/Scala Interoperability?

2021-03-05 Thread Kevin Lam
s('/the/path/of/your/MyBigTableSink.jar') > # ... > ds.add_sink(MyBigTableSink("com.mycompany.MyBigTableSink")) > env.execute("Application with Custom Sink") > > > if __name__ == '__main__': > example() > > Remember that you must add the jar

Running Pyflink job on K8s Flink Cluster Deployment?

2021-03-05 Thread Kevin Lam
Hello everyone, I'm looking to run a Pyflink application in a distributed fashion, using kubernetes, and am currently facing issues. I've successfully gotten a Scala Flink Application to run using the manifests provided at [0] I attempted to run the application by updating the jobmanager

Python DataStream API Questions -- Java/Scala Interoperability?

2021-03-02 Thread Kevin Lam
Hello everyone, I have some questions about the Python API that hopefully folks in the Apache Flink community can help with. A little background, I’m interested in using the Python Datastream API because of stakeholders who don’t have a background in Scala/Java, and would prefer Python if