Re: Flatmap operator in an Asynchronous call

2022-03-25 Thread Diwakar Jha
. On Wed, Mar 9, 2022 at 4:52 AM Arvid Heise wrote: > You can use flatMap to flatten and have an asyncIO after it. > > On Wed, Mar 9, 2022 at 8:08 AM Diwakar Jha wrote: > >> Thanks Gen, I will look into a customized Source and SplitEnumerator. >> >> On Mon, Mar 7, 2
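
A minimal sketch of the wiring suggested in the reply above: a flatMap that flattens each file's contents into filtered records, followed by an async I/O stage for the enrichment. The EnrichmentAsyncFunction, the empty-line filter, and the timeout/capacity values are illustrative placeholders standing in for the pipeline described in the thread, not code from it.

import java.util.concurrent.TimeUnit;

import org.apache.flink.streaming.api.datastream.AsyncDataStream;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.util.Collector;

// fileContents: one String per S3 file, from whichever source is in use.
// Flatten each file into individual records and apply the filter here; real code
// would likely parse into a proper record type instead of keeping Strings.
DataStream<String> records = fileContents
        .flatMap((String content, Collector<String> out) -> {
            for (String line : content.split("\n")) {
                if (!line.isEmpty()) {          // stand-in for the actual filter
                    out.collect(line);
                }
            }
        })
        .returns(String.class);                 // needed because of lambda type erasure

// The async I/O stage goes after the flatMap, as suggested above.
// EnrichmentAsyncFunction is a placeholder for an AsyncFunction<String, String>
// that calls the enrichment service.
DataStream<String> enriched = AsyncDataStream.unorderedWait(
        records,
        new EnrichmentAsyncFunction(),
        30, TimeUnit.SECONDS,   // per-element timeout
        100);                   // max in-flight enrichment calls
// enriched then goes to the sink from the original use case.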

Flink STOP with savepoint

2022-03-24 Thread Diwakar Jha
Hello Everyone, I'm running Flink 1.11 on EMR 6.1 as a YARN application. I'm trying to use the STOP command to capture a savepoint and restart the job from the same savepoint during redeployment. flink stop -p $JOB_RUNNING -yid $YARN_APP_ID Problem: the job completes the savepoint on the Flink UI but it throws the

Re: Flatmap operator in an Asynchronous call

2022-03-08 Thread Diwakar Jha
> pollNext, so you don't have to implement it asynchronously. It would be > better if the readers read the lines and the records are enriched in a map > function that follows. > > > > On Tue, Mar 8, 2022 at 5:17 AM Diwakar Jha wrote: > >> Hello Everyone, >> >> I'm

Flatmap operator in an Asynchronous call

2022-03-07 Thread Diwakar Jha
Hello Everyone, I'm running a streaming application using Flink 1.11 and EMR 6.01. My use case is reading files from an S3 bucket, filtering the file contents (say, records), and enriching each record. Filtered records are output to a sink. I'm reading 6k files per 15 minutes and the total number of records is 3

Re: Fwd: How to get memory specific metrics for tasknodes

2022-02-18 Thread Diwakar Jha
to avoid. Thanks! On Wed, Feb 16, 2022 at 11:35 PM Chesnay Schepler wrote: > It is currently not possible to select metrics. > > What you can do however is create a custom reporter that wraps the StatsD > reporter which does this filtering. > > On 16/02/2022 17:41, Diwakar Jha
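
A rough sketch of the wrapping approach suggested above, assuming the wrapper simply extends Flink's StatsDReporter and forwards only the JVM heap metrics. The class name and the scope string used for filtering are illustrative; the reporter would still need to be registered in flink-conf.yaml (and shipped with the StatsD reporter jar) like any other reporter.

import org.apache.flink.metrics.Metric;
import org.apache.flink.metrics.MetricGroup;
import org.apache.flink.metrics.statsd.StatsDReporter;

// Forwards only the heap metrics (Status.JVM.Memory.Heap.*) to StatsD and
// silently drops everything else, so CloudWatch sees a handful of metrics per
// task node instead of millions.
public class HeapOnlyStatsDReporter extends StatsDReporter {

    @Override
    public void notifyOfAddedMetric(Metric metric, String metricName, MetricGroup group) {
        String fullName = group.getMetricIdentifier(metricName);
        if (fullName.contains("Status.JVM.Memory.Heap")) {
            super.notifyOfAddedMetric(metric, metricName, group);
        }
    }
}

The reporter entry in flink-conf.yaml would then point at this class instead of at the StatsD reporter directly.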

Fwd: How to get memory specific metrics for tasknodes

2022-02-16 Thread Diwakar Jha
to provide any other information. Thank you! -- Forwarded message - From: Diwakar Jha Date: Tue, Feb 15, 2022 at 1:31 PM Subject: How to get memory specific metrics for tasknodes To: user Hello, I'm running Flink 1.11 on AWS EMR as a YARN application. I'm trying to access memory

How to get memory specific metrics for tasknodes

2022-02-15 Thread Diwakar Jha
Hello, I'm running Flink 1.11 on AWS EMR as a YARN application. I'm trying to access memory metrics (Heap.Max, Heap.Used) per tasknode in CloudWatch. I have 50 tasknodes and it creates millions of metrics (including per-operator ones), though I need only a few metrics per tasknode (Heap.Max,

Re: TimeoutException in Flink 1.11 stop command

2021-05-12 Thread Diwakar Jha
"client.timeout"). > It could also be that the savepoint operation takes abnormally long, for > example due to IO bottlenecks. > > I suggest looking into the JobManager logs to see whether the savepoint > was actually created / the application shut down, and if so then maybe just

TimeoutException in Flink 1.11 stop command

2021-05-11 Thread Diwakar Jha
Hello, I'm trying to use the Flink 1.11 stop command to gracefully shut down the application with a savepoint. flink stop --savepointPath s3a://path_to_save_point > c5d52e0146258f80fd52a3bf002d2a1b -yid application_1620673166934_0001 > 2021-05-11 06:26:57,852 ERROR

How Flink's async-io-api parameters work?

2021-05-01 Thread Diwakar Jha
Hello, I'm trying to solve a problem using the async I/O API. I'm running a Flink job which has a sleep time of 120 sec during restart. After every deployment I see around 10K file loads time out. I suspect it is because of the TimeOut parameter being *30 sec*. I played with that number and found out
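
For reference, a minimal sketch of where the two async I/O parameters live, assuming the files are loaded inside a RichAsyncFunction; the 120-second timeout, the capacity of 100, and the loadFromS3 helper are illustrative placeholders, not values from the thread.

import java.util.Collections;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.TimeUnit;

import org.apache.flink.streaming.api.datastream.AsyncDataStream;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.functions.async.ResultFuture;
import org.apache.flink.streaming.api.functions.async.RichAsyncFunction;

public class AsyncIoSketch {

    // The API exposes exactly two knobs here: the per-element timeout and the
    // capacity (maximum number of in-flight async requests).
    public static DataStream<String> loadAsync(DataStream<String> fileKeys) {
        return AsyncDataStream.unorderedWait(
                fileKeys,
                new LoadFileAsync(),
                120, TimeUnit.SECONDS,   // timeout per element
                100);                    // capacity: max concurrent requests
    }

    public static class LoadFileAsync extends RichAsyncFunction<String, String> {

        @Override
        public void asyncInvoke(String key, ResultFuture<String> resultFuture) {
            // Do the blocking work off the operator thread and complete later.
            CompletableFuture
                    .supplyAsync(() -> loadFromS3(key))
                    .thenAccept(value -> resultFuture.complete(Collections.singleton(value)));
        }

        @Override
        public void timeout(String key, ResultFuture<String> resultFuture) {
            // Called when the timeout above elapses before a result arrives.
            // Completing with an empty collection drops the element instead of
            // failing the job; the default behaviour is to fail.
            resultFuture.complete(Collections.emptyList());
        }

        private String loadFromS3(String key) {
            return key; // placeholder for the real S3 read
        }
    }
}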

Re: Flink custom trigger use case

2021-02-24 Thread Diwakar Jha
is not sufficient, but I want to know how I can do it using a ProcessWindowFunction. Appreciate any pointers. Thanks! On Wed, Feb 24, 2021 at 10:50 AM Diwakar Jha wrote: > Hi Arvid, > > Thanks. I tried FIRE instead of FIRE_AND_PURGE and it introduced > duplicates, though the result is still the sam
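
One way the ProcessWindowFunction route could look, sketched under the assumption that the duplicates come from FIRE (without PURGE) re-processing the retained window contents on every firing; per-window state can remember how many elements earlier firings already emitted. The element/key types (String) and the class name are illustrative, not from the thread.

import org.apache.flink.api.common.state.ValueState;
import org.apache.flink.api.common.state.ValueStateDescriptor;
import org.apache.flink.streaming.api.functions.windowing.ProcessWindowFunction;
import org.apache.flink.streaming.api.windowing.windows.TimeWindow;
import org.apache.flink.util.Collector;

public class DedupingWindowFunction
        extends ProcessWindowFunction<String, String, String, TimeWindow> {

    private final ValueStateDescriptor<Integer> emittedSoFar =
            new ValueStateDescriptor<>("emittedSoFar", Integer.class);

    @Override
    public void process(String key, Context ctx, Iterable<String> elements,
                        Collector<String> out) throws Exception {
        // Per-window state: how many elements previous firings already emitted.
        ValueState<Integer> emitted = ctx.windowState().getState(emittedSoFar);
        int alreadyEmitted = emitted.value() == null ? 0 : emitted.value();

        int i = 0;
        for (String element : elements) {
            if (i++ >= alreadyEmitted) {
                out.collect(element);   // only elements not emitted by earlier firings
            }
        }
        emitted.update(i);
    }

    @Override
    public void clear(Context ctx) throws Exception {
        // Called when the window is purged; drop the per-window bookkeeping.
        ctx.windowState().getState(emittedSoFar).clear();
    }
}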

Re: Flink custom trigger use case

2021-02-24 Thread Diwakar Jha
> [1] >> >> https://ci.apache.org/projects/flink/flink-docs-stable/dev/stream/operators/windows.html#processwindowfunction >> >> Regards, >> Roman >> >> >> On Tue, Feb 23, 2021 at 3:29 AM Diwakar Jha >> wrote: >> >>> >>> Hello, >

Re: Flink custom trigger use case

2021-02-23 Thread Diwakar Jha
register an event time timer (for the window end) so that > trigger.onEventTime() will be called. And it's safer to check if the state (firstSeen) value is true, not just > exists. > > Regards, > Roman > > > On Tue, Feb 23, 2021 at 3:29 AM Diwakar Jha > wrote: > >

Fwd: Flink custom trigger use case

2021-02-23 Thread Diwakar Jha
Hello, posting again for help. I'm planning to use state TTL but would like to know if there is any other way to do it. I'm using Flink 1.11. Thanks! -- Forwarded message - From: Diwakar Jha Date: Mon, Feb 22, 2021 at 6:28 PM Subject: Flink custom trigger use case To: user
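
If state TTL does end up being the route taken, a minimal sketch of enabling it on a keyed ValueState in Flink 1.11 might look like the following; the one-hour TTL and the firstSeen flag are illustrative choices, not settings from the thread.

import org.apache.flink.api.common.state.StateTtlConfig;
import org.apache.flink.api.common.state.ValueStateDescriptor;
import org.apache.flink.api.common.time.Time;

public class TtlSketch {

    // Builds a "firstSeen" state descriptor whose entries expire one hour after
    // their last write and are never returned once expired.
    public static ValueStateDescriptor<Boolean> firstSeenWithTtl() {
        StateTtlConfig ttlConfig = StateTtlConfig
                .newBuilder(Time.hours(1))
                .setUpdateType(StateTtlConfig.UpdateType.OnCreateAndWrite)
                .setStateVisibility(StateTtlConfig.StateVisibility.NeverReturnExpired)
                .build();

        ValueStateDescriptor<Boolean> descriptor =
                new ValueStateDescriptor<>("firstSeen", Boolean.class);
        descriptor.enableTimeToLive(ttlConfig);
        return descriptor;
    }
}

The descriptor is then used as usual, e.g. via getRuntimeContext().getState(...) in a rich function.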

Flink custom trigger use case

2021-02-22 Thread Diwakar Jha
Hello, I'm trying to use a custom trigger for one of my use cases. I have some basic logic (as shown below) that uses keyBy on the input stream and a window of 1 min. .keyBy() .window(TumblingEventTimeWindows.of(Time.seconds(60))) .trigger(new CustomTrigger())
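
A rough sketch of what the CustomTrigger referenced above might look like, following the advice quoted in the replies earlier in this thread: fire early on the first element per window, register an event-time timer for the window end so onEventTime() is invoked, and check that the firstSeen value is actually true rather than merely present. The early FIRE / final FIRE_AND_PURGE choice is an assumption about the intended behaviour, not something stated in the thread.

import org.apache.flink.api.common.state.ValueState;
import org.apache.flink.api.common.state.ValueStateDescriptor;
import org.apache.flink.streaming.api.windowing.triggers.Trigger;
import org.apache.flink.streaming.api.windowing.triggers.TriggerResult;
import org.apache.flink.streaming.api.windowing.windows.TimeWindow;

public class CustomTrigger extends Trigger<Object, TimeWindow> {

    private final ValueStateDescriptor<Boolean> firstSeenDescriptor =
            new ValueStateDescriptor<>("firstSeen", Boolean.class);

    @Override
    public TriggerResult onElement(Object element, long timestamp,
                                   TimeWindow window, TriggerContext ctx) throws Exception {
        ValueState<Boolean> firstSeen = ctx.getPartitionedState(firstSeenDescriptor);
        if (!Boolean.TRUE.equals(firstSeen.value())) {
            firstSeen.update(true);
            // Make sure onEventTime() is called when the window closes.
            ctx.registerEventTimeTimer(window.maxTimestamp());
            return TriggerResult.FIRE;          // early firing on the first element
        }
        return TriggerResult.CONTINUE;
    }

    @Override
    public TriggerResult onEventTime(long time, TimeWindow window, TriggerContext ctx) {
        return time == window.maxTimestamp()
                ? TriggerResult.FIRE_AND_PURGE  // final firing at the end of the window
                : TriggerResult.CONTINUE;
    }

    @Override
    public TriggerResult onProcessingTime(long time, TimeWindow window, TriggerContext ctx) {
        return TriggerResult.CONTINUE;
    }

    @Override
    public void clear(TimeWindow window, TriggerContext ctx) throws Exception {
        ctx.getPartitionedState(firstSeenDescriptor).clear();
        ctx.deleteEventTimeTimer(window.maxTimestamp());
    }
}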

Re: how to enable metrics in Flink 1.11

2020-11-13 Thread Diwakar Jha
using "yarn app -stop > application_1603649952937_0002". To stop Flink on YARN, use: "yarn > application -kill ". > > > > On Sat, Oct 31, 2020 at 6:26 PM Diwakar Jha > wrote: > >> Hi, >> >> I wanted to check if anyone can help me with th

Re: Flink 1.11 not showing logs

2020-11-11 Thread Diwakar Jha
-Dlog4j.configuration=file:./log4j.properties > -Dlog4j.configurationFile=file:./log4j.properties > > Best, > Yang > > Diwakar Jha wrote on Mon, Nov 2, 2020 at 11:37 PM: > >> Sure. I will check that and get back to you. Could you please share how >> to check the Java dynamic options? >> >> Best, >>

Re: error in using package SubnetUtils

2020-11-11 Thread Diwakar Jha
proper dependency and use > org.apache.commons.net.util.SubnetUtils. > > On Tue, Nov 10, 2020 at 6:11 PM Diwakar Jha > wrote: > >> Hello, >> >> I'm migrating from Flink 1.8 to Flink 1.11 on an EMR cluster and I get >> this error message for using package subnetUti
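
A small sketch of the suggested fix, assuming commons-net is added as a direct dependency (the commons-net:commons-net artifact) so the class can be imported from its real package instead of the presto-shaded path; the CIDR and address values are just examples.

import org.apache.commons.net.util.SubnetUtils;

public class SubnetCheck {
    public static void main(String[] args) {
        // Same class as before, but imported from commons-net directly rather
        // than from the shaded copy inside flink-s3-fs-presto.
        SubnetUtils subnet = new SubnetUtils("10.0.0.0/24");
        boolean inRange = subnet.getInfo().isInRange("10.0.0.42");
        System.out.println("10.0.0.42 in 10.0.0.0/24: " + inRange);
    }
}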

error in using package SubnetUtils

2020-11-10 Thread Diwakar Jha
Hello, I'm migrating from Flink 1.8 to Flink 1.11 on an EMR cluster and I get this error message for using the SubnetUtils package. It's working fine for Flink 1.8. [javac] import org.apache.flink.fs.s3presto.shaded.com.facebook.presto.hadoop.$internal.org.apache.commons.net.util.SubnetUtils;

Re: Flink 1.11 not showing logs

2020-11-02 Thread Diwakar Jha
W, could you share the new yarn logs? > > Best, > Yang > > Diwakar Jha wrote on Mon, Nov 2, 2020 at 4:32 PM: > >> >> >> Hi Yang, >> >> Thank you so much for taking a look at the log files. I changed my >> log4j.properties. Below is the actual file that I go

Re: Flink 1.11 not showing logs

2020-11-02 Thread Diwakar Jha
other log files or configuration. Thanks. On Sun, Nov 1, 2020 at 10:06 PM Yang Wang wrote: > Hi Diwakar Jha, > > From the logs you have provided, everything seems to be working as expected. The > JobManager and TaskManager > Java processes have been started with the correct dynamic opti

Flink 1.11 not showing logs

2020-11-01 Thread Diwakar Jha
Hi, I'm running Flink 1.11 on EMR 6.1.0. I can see my job is running fine but I'm not seeing any taskmanager/jobmanager logs. I see the below error in stdout. 18:29:19.834 [flink-akka.actor.default-dispatcher-28] ERROR org.apache.flink.runtime.rest.handler.taskmanager.TaskManagerLogFileHandler -

Re: how to enable metrics in Flink 1.11

2020-10-31 Thread Diwakar Jha
on the TaskExecutor. Thanks! On Fri, Oct 30, 2020 at 9:04 AM Diwakar Jha wrote: > Hello, > > I see that in my classpath (below) I have both log4j-1 and log4j-api-2. Is > this why I'm not seeing any logs? If so, could someone suggest > how to fix it? > > export > CLASSPAT

Re: how to enable metrics in Flink 1.11

2020-10-30 Thread Diwakar Jha
.12-1.11.0.jar:lib/flink-table_2.12-1.11.0.jar: *lib/log4j-1.2-api-2.12.1.jar:lib/log4j-api-2.12.1.jar* :lib/log4j-core-2.12.1.jar:lib/log4j-slf4j-impl-2.12.1.jar:flink-dist_2.12-1.11.0.jar:flink-conf.yaml:" thanks. On Thu, Oct 29, 2020 at 6:21 PM Diwakar Jha wrote: > Hello Everyone, > >

Re: how to enable metrics in Flink 1.11

2020-10-29 Thread Diwakar Jha
to stop and get more logs. Could someone please help me figure this out? I'm running Flink 1.11 on the EMR 6.1 cluster. On Tue, Oct 27, 2020 at 1:06 PM Diwakar Jha wrote: > Hi Robert, > Could you please correct me. I'm not able to stop the app. Also, I > stopped the Flink job already. >

Re: how to enable metrics in Flink 1.11

2020-10-27 Thread Diwakar Jha
finished > applications ("End of LogType:prelaunch.out.This log file belongs to a > running container (container_1603649952937_0002_01_02) and so may not > be complete.") > > Please stop the app, then provide the logs. > > > On Tue, Oct 27, 2020 at 5:11 PM Diwakar

Re: how to enable metrics in Flink 1.11

2020-10-26 Thread Diwakar Jha
0_252] at java.lang.Thread.run(Thread.java:748) ~[?:1.8.0_252] I'd appreciate it if anyone has any pointers for this. On Mon, Oct 26, 2020 at 10:45 AM Chesnay Schepler wrote: > Flink 1.11 uses slf4j 1.7.15; the easiest way to check the

how to enable metrics in Flink 1.11

2020-10-25 Thread Diwakar Jha
: org.apache.flink.metrics.statsd.StatsDReporterFactory metrics.reporter.stsd.host: localhost metrics.reporter.stsd.port: 8125 b) I tried downloading the statsd jar from https://mvnrepository.com/artifact/org.apache.flink/flink-metrics-statsd and putting it inside the plugins/statsd directory. -- Best, Diwakar