Re: GC overhead limit exceeded when using Prometheus exporter

2021-02-16 Thread bat man
Hi Till, I tried increasing the task manager memory to 4GB, but unfortunately the EMR nodes are going down; I am investigating that for now. I will share the results if this works out; if not, I will get the heap dump. Thanks, Hemant On Tue, Feb 16, 2021 at 10:45 PM Till Rohrmann wrote: > Hi Hemant,

Re: Understanding blocking behavior

2021-02-16 Thread Arvid Heise
Hi Andreas, Julian already offered a good explanation, so here is one possible solution: you could try to run the whole first subpipeline with parallelism X and the second with P-X. However, most likely you need to run with P>X to finish in time. Another way is to use DataStream (your program is

How to report metric based on keyed state piece

2021-02-16 Thread Salva Alcántara
I wonder what is the canonical way to accomplish the following: Given a Flink UDF, how to report a metric `y` which is a function of some (keyed) state `X`? That is, `y=f(X)`. Most commonly, we are interested in the size of the state X. For instance, consider a `CoFlatMap` function with: - `X`

ConnectedStreams paused until control stream “ready”

2021-02-16 Thread Salva Alcántara
What is the canonical way to accomplish this: >Given a ConnectedStreams, e.g., a CoFlatMap UDF, how to prevent any processing of the data stream until >the control stream is "ready", so to speak My particular use case is as follows: I have a CoFlatMap function. The data stream contains elements

Re: Unit testing Async Operator

2021-02-16 Thread Arpith Prakash
Thanks Till, your solution worked perfectly. Arpith On Wed, Feb 17, 2021 at 12:53 AM Arvid Heise wrote: > Hi Arpith, > > The operator test harness is more meant for use cases where you implement > your own operator (quite advanced). > > If you just want to test your AsyncFunction, I'd strongly

Configure classes

2021-02-16 Thread Abhinav Sharma
Hi, I am evaluating Flink for a use case where we need to create a basic Flink pipeline and inject the classes for map, reduce, process, etc. via some XML configuration (or something equivalent). E.g.: stream.keyBy(value -> value.getKey()) .window(TumblingProcessingWindow.of(Time.minutes(1)))

Re: Understanding blocking behavior

2021-02-16 Thread Jaffe, Julian
Hey Andreas, Have a read through https://ci.apache.org/projects/flink/flink-docs-stable/dev/datastream_execution_mode.html#task-scheduling-and-network-shuffle and in particular the BATCH Execution Mode section. Your intuition is mostly correct – because your operators can’t be chained due to

Understanding blocking behavior

2021-02-16 Thread Hailu, Andreas [Engineering]
Hi folks, I'm trying to get a better understanding of what operations result in blocked partitions. I've got a batch-processing job that reads from 2 sources, and then performs a series of Maps/Filters/CoGroups all with the same parallelism to create a final DataSet to be written to two

Re: How do i register a streaming table sink in 1.12?

2021-02-16 Thread Clay Teeter
Thanks Till, the tickets and links were immensely useful. With that I was able to make progress and even get things to compile. However, when I run things, a serializable exception is thrown (see below). .addSink(JdbcSink.sink[SignableTableSchema]( >

Re: Unit testing Async Operator

2021-02-16 Thread Arvid Heise
Hi Arpith, The operator test harness is more meant for use cases where you implement your own operator (quite advanced). If you just want to test your AsyncFunction, I'd strongly recommend building a small ITCase like [1] and then you don't have to fiddle with these things anymore. The tests run

Re: GC overhead limit exceeded when using Prometheus exporter

2021-02-16 Thread Till Rohrmann
Hi Hemant, Have you tried running a newer Flink version? Can you create a heap dump when the process fails? This could help us dig into whether there is a memory leak. Cheers, Till On Tue, Feb 16, 2021 at 5:21 PM bat man wrote: > Hi there, > > I am facing *java.lang.OutOfMemoryError: GC

Re: How do i register a streaming table sink in 1.12?

2021-02-16 Thread Till Rohrmann
Hi Clay, I am not a Table API expert but let me try to answer your question: With FLINK-17748 [1] the community removed the registerTableSink in favour of the connect API. The connect API has been deprecated [2] because it was not well maintained. Now the recommended way for specifying sinks is

Re: Flink’s Kubernetes HA services - NOT working

2021-02-16 Thread Till Rohrmann
If you are running a session cluster, then Flink will create a config map for every submitted job. These config maps will unfortunately only be cleaned up when you shut down the cluster. This is a known limitation which we want to fix soon [1, 2]. If you can help us with updating the

GC overhead limit exceeded when using Prometheus exporter

2021-02-16 Thread bat man
Hi there, I am facing *java.lang.OutOfMemoryError: GC overhead limit exceeded* when using the Prometheus exporter with *Flink 1.9* on *AWS EMR* emr-5.28.1. I have other jobs which run fine. This specific job fails with the error stack below. Exception in thread "pool-3-thread-2"

How do i register a streaming table sink in 1.12?

2021-02-16 Thread Clay Teeter
Hey all. Hopefully this is an easy question. I'm porting my JDBC Postgres sink from 1.10 to 1.12. I'm using: * StreamTableEnvironment * JdbcUpsertTableSink What I'm having difficulty with is how to register the sink with the streaming table environment. In 1.10:

Joining and windowing multiple streams using DataStream API or Table API & SQL

2021-02-16 Thread Pieter Bonte
Hi all, I’m trying to apply a window operator over multiple streams (more than 2) and join these streams within the validity of the window. However, I have some questions about the time semantics using both the DataStream API and the Table API/SQL. Let’s say we have 3 streams, an A, B and C

Re: Flink’s Kubernetes HA services - NOT working

2021-02-16 Thread Till Rohrmann
Hi Omar, I think Matthias is right. The K8s HA services create and edit config maps. Hence they need the rights to do this. In the native K8s documentation there is a section about how to create a service account with the right permissions [1]. I think that our K8s HA documentation currently

Re: Flink behavior when one of the kafka sinks fails or is not properly dimensioned.

2021-02-16 Thread Till Rohrmann
Hi MS, 1. If a sink cannot send the data to Kafka it will make the sink fail and trigger a Flink failover. Depending on your topology this will also cause the other sink tasks to be restarted because they are all consuming from the same producing task. If you want to tolerate failures, then you

Re: Netty LocalTransportException: Sending the partition request to 'null' failed

2021-02-16 Thread Till Rohrmann
Hi Matthias, Can you make sure that node-1 and node-2 can talk to each other? It looks to me that node-2 fails to open a connection to the other TaskManager. Maybe the logs give some more insights. You can change the log level to DEBUG to gather more information. Cheers, Till On Tue, Feb 16,

Re: lazy loading for rocksdb backend (statefun)

2021-02-16 Thread Igal Shilman
Hello Stephan, The values are loaded lazily, upon access. And this applies to both PersistedTable and PersistedValue. For a PersistedTable in particular, calling get/set/remove will only affect the specific key, and calling entries() will fetch the entire map. Cheers, Igal. On Sun, Feb 14,

Re: Unit testing Async Operator

2021-02-16 Thread Till Rohrmann
Hi Arpith, looking at the definition of the GetMetadataAsyncProcess function you need to specify the TypeSerializer for a Tuple1>>. What you could try in order to not create the serializer manually is to use: TypeInformation.of(new TypeHint>>>(){}).createSerializer(new ExecutionConfig()) This

Re: Flink docker in session cluster mode - is a local distribution needed?

2021-02-16 Thread Till Rohrmann
Hi Manas, I think the documentation assumes that you first start a session cluster and then submit jobs from outside the Docker images. If your jobs are included in the Docker image, then you could log into the master process and start the jobs from within the Docker image. Cheers, Till On Tue,

Flink docker in session cluster mode - is a local distribution needed?

2021-02-16 Thread Manas Kale
Hi, I have a project that is a set of 6 jobs out of which 4 are written in Java and 2 are written in pyFlink. I want to dockerize these so that all 6 can be run in a single Flink session cluster. I have been able to successfully set up the JobManager and TaskManager containers as per [1] after

Unit testing Async Operator

2021-02-16 Thread Arpith techy
Hi, I tried mocking the Async operator, which takes Tuple1, Tuple3 as input and output, but while creating a test harness I couldn't find the right TupleSerializer. Can anyone help me with this? public class GetMetadataAsyncProcess extends RichAsyncFunction>>, Tuple3>, Map, List>>> { ... }

Netty LocalTransportException: Sending the partition request to 'null' failed

2021-02-16 Thread Matthias Seiler
Hi Everyone, I'm trying to set up a Flink cluster in standalone mode with two machines. However, running a job throws the following exception: `org.apache.flink.runtime.io.network.netty.exception.LocalTransportException: Sending the partition request to 'null' failed` Here is some background: