Re: Performance issue when writing to HDFS

2020-05-25 Thread Mu Kong
Hi Gao, Thanks for helping out. It turns out to be a networking issue on our HDFS datanode. But I was hoping to find a way to mitigate the impact of such an issue. Things I considered were to shorten the waiting time for a file to close when there is no incoming data, so that there will be fewer ope

Re: close file on job crash

2020-05-25 Thread Piotr Nowojski
Hi, One clarification. `RichFunction#close` is of course called always, not only after internal failure. It's called after internal failure, external failure or clean shutdown. `SourceFunction#cancel` is intended to inform the `SourceFunction` to cleanly exit its `#run` method/loop (note SIGI

Re: Multiple Sinks for a Single Source

2020-05-25 Thread Prasanna kumar
Piotr, Thanks for the reply. There is one other case, where some events have to be written to multiple sinks while others have to be written to just one sink. How could I have a common code flow/DAG for the same? I do not want multiple jobs doing the same; I want to accomplish it in a single job.

Re: stateful-fun2.0 checkpointing

2020-05-25 Thread Tzu-Li (Gordon) Tai
Hi, I replied to your question on this in your other email thread. Let us know if you have other questions! Cheers, Gordon On Sun, May 24, 2020, 1:01 AM C DINESH wrote: > Hi Team, > > 1. How can we enable checkpointing in stateful-fun2.0 > 2. How to set parallelism > > Thanks, > Dinesh. > >

Re: Stateful-fun-Basic-Hello

2020-05-25 Thread Tzu-Li (Gordon) Tai
Hi, You're right, maybe the documentation needs a bit more direction there, especially for people who are newer to Flink. 1. How to increase parallelism There are two ways to do this. Either set the `parallelism.default` also in the flink-conf.yaml, or use the -p command line option when starti
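The two options Gordon describes can be sketched as follows (the value 4 is purely illustrative):

```yaml
# flink-conf.yaml -- cluster-wide default parallelism
parallelism.default: 4
```

Alternatively, override the default for a single job at submission time with `./bin/flink run -p 4 <job-jar>`.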

Re: Inconsistent Checkpoint API response

2020-05-25 Thread Vijay Bhaskar
Hi Yun, Understood the issue now: "restored" always shows only the checkpoint that is used for restoring the previous state. In all the attempts < 6 (in my case max attempts are 5, 6 is the last attempt) Flink HA is restoring the state, so restored and latest are the same value if the last attempt == 6

Re: Stateful-fun-Basic-Hello

2020-05-25 Thread C DINESH
Hi Team, I mean to say that now I understood. But in the documentation page flink-conf.yaml is not mentioned. On Mon, May 25, 2020 at 7:18 PM C DINESH wrote: > Thanks Gordon, > > I read the documentation several times. But I didn't understand at that > time, flink-conf.yaml is not there. > > ca

Re: Inconsistent Checkpoint API response

2020-05-25 Thread Yun Tang
Hi Bhaskar, It seems I still do not totally understand your case-5. Your job failed 6 times, and recovered from the previous checkpoint to restart again. However, you found the REST API gave the wrong answer. How do you ensure the "restored" field is giving the wrong checkpoint file, i.e. not the latest one?

Re: How to control the balance by using FlinkKafkaConsumer.setStartFromTimestamp(timestamp)

2020-05-25 Thread C DINESH
Hi Jary, My first point is wrong. If we give these settings also, Flink will consume the whole data from the last one day. That is what we want, right? Late data is defined by window length and watermark strategy. Are you combining your streams? Please provide these details so that we can understa

Re: Apache Flink - Error on creating savepoints using REST interface

2020-05-25 Thread M Singh
Hi Chesnay: The SavepointTriggerRequestBody indicates a defaultValue for the cancel-job attribute, so is it not being honored? https://github.com/apache/flink/blob/release-1.6/flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/job/savepoints/SavepointTriggerRequestBody.java Thanks
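For context, the request body that the linked class deserializes has two fields; a trigger request against `/jobs/:jobid/savepoints` might look like the following (the target path is purely illustrative, and per the linked 1.6 sources `cancel-job` is meant to default to false when omitted):

```json
{
  "target-directory": "hdfs:///savepoints",
  "cancel-job": false
}
```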

Re: Apache Flink - Question about application restart

2020-05-25 Thread M Singh
Hi Zhu Zhu: Just to clarify - from what I understand, EMR also has a default number of restarts (I think it is 3). So if EMR restarts the job, the job id is the same since the job graph is the same. Thanks for the clarification. On Monday, May 25, 2020, 04:01:17 AM EDT, Yang Wang wrote:

Re: close file on job crash

2020-05-25 Thread Piotr Nowojski
Hi, The cancel method is invoked only when the SourceTask is being cancelled from the outside, by the JobManager - for example after detecting a failure of a different Task. > What is the proper way to handle this issue? Is there some kind of closable > source interface we should implement? Have yo

Re: Single task backpressure problem with Credit-based Flow Control

2020-05-25 Thread Piotr Nowojski
Hi Weihua, > After dumping the memory and analyzing it, I found: > Sink (121)'s RemoteInputChannel.unannouncedCredit = 0, > Map (242)'s CreditBasedSequenceNumberingViewReader.numCreditsAvailable = 0. > This is not consistent with my understanding of the Flink network > transmission mechanism. It

Re: Multiple Sinks for a Single Source

2020-05-25 Thread Piotr Nowojski
Hi, To the best of my knowledge the following pattern should work just fine: DataStream myStream = env.addSource(…).foo().bar() // for custom source, but any ; myStream.addSink(sink1); myStream.addSink(sink2); myStream.addSink(sink3); All of the records from `myStream` would be passed to each o
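Piotr's fan-out pattern, combined with one possible answer to Prasanna's "some events to multiple sinks, some to one" follow-up via side outputs, could be sketched as below. This is an assumption-laden sketch, not from the thread: the print sinks, the length-based routing predicate, and all names are invented, and it assumes the flink-streaming-java dependency on the classpath.

```java
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.ProcessFunction;
import org.apache.flink.util.Collector;
import org.apache.flink.util.OutputTag;

public class MultiSinkJob {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        // Placeholder source; any source works here.
        DataStream<String> myStream = env.fromElements("a", "bb", "ccc");

        // Plain fan-out: every record from myStream reaches every attached sink.
        myStream.print("sink1");
        myStream.print("sink2");

        // Conditional routing: emit only some records to a side output.
        final OutputTag<String> special = new OutputTag<String>("special") {};
        SingleOutputStreamOperator<String> main =
            myStream.process(new ProcessFunction<String, String>() {
                @Override
                public void processElement(String value, Context ctx, Collector<String> out) {
                    out.collect(value);              // main output gets every record
                    if (value.length() > 1) {        // assumed routing predicate
                        ctx.output(special, value);  // side output gets a subset
                    }
                }
            });
        main.print("mainSink");
        main.getSideOutput(special).print("specialSink");

        env.execute("multi-sink demo");
    }
}
```

All of this stays one job with one DAG, which is what the follow-up question asked for.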

Re: Collecting operators real output cardinalities as json files

2020-05-25 Thread Piotr Nowojski
Hi Francesco, Have you taken a look at the metrics? [1] And IO metrics [2] in particular? You can use one of the pre-existing metric reporters [3] or implement a custom one. You could export metrics to some 3rd party system and get JSONs from there, or export them to JSON directly via a custom

Re: How can I set the parallelism higher than the task slot number in more machines?

2020-05-25 Thread Felipe Gutierrez
Solved! that was because I was using slotSharingGroup() in all operators to ensure that they stay in the same task slot. I guess Flink was creating dummy operators to ensure that. Thanks anyway. -- -- Felipe Gutierrez -- skype: felipe.o.gutierrez -- https://felipeogutierrez.blogspot.com On Mon, M

Re: java.lang.AbstractMethodError when implementing KafkaSerializationSchema

2020-05-25 Thread Piotr Nowojski
Hi, It would be helpful if you could provide the full stack trace. What Flink version and which Kafka connector version are you using? It sounds like either a dependency convergence error (mixing Kafka dependencies/various versions of flink-connector-kafka inside a single job/jar) or some shading

How can I set the parallelism higher than the task slot number in more machines?

2020-05-25 Thread Felipe Gutierrez
Hi, I deployed Flink 1.10 standalone on a cluster of 4 machines with 8 cores each. Then I configured each machine to have 8 task slots and a default parallelism of 8. taskmanager.numberOfTaskSlots: 8 parallelism.default: 8 I want to run my stream app with a parallelism of 16 for each subtask. But not h

Re: Stateful-fun-Basic-Hello

2020-05-25 Thread C DINESH
Thanks Gordon, I read the documentation several times. But I didn't understand at that time; flink-conf.yaml is not there. Can you please suggest 1. how to increase parallelism 2. how to give checkpoints to the job. As far as I know there is no documentation regarding this. Or are these features

Re: How to control the balance by using FlinkKafkaConsumer.setStartFromTimestamp(timestamp)

2020-05-25 Thread C DINESH
Hi Jary, The easiest and simplest solution is: while creating the consumer you can pass a different config based on your requirements. Example: for creating the consumer for topic A you can pass a config such as max.poll.records: "1" max.poll.interval.ms: "1000". For creating the consumer for topic B you can p
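Dinesh's suggestion of per-topic configs can be sketched as below. The helper name, group ids, and bootstrap address are invented for illustration; the property keys are standard Kafka consumer settings, and the resulting `Properties` would typically be handed to a `FlinkKafkaConsumer` per topic. Whether these particular settings achieve the balancing Jary wants is exactly the open question in the thread.

```java
import java.util.Properties;

public class KafkaConsumerConfigs {
    // Hypothetical helper: builds per-topic consumer properties so a
    // high-volume topic and a low-volume topic can be tuned independently.
    static Properties consumerProps(String groupId, int maxPollRecords, int maxPollIntervalMs) {
        Properties props = new Properties();
        props.setProperty("bootstrap.servers", "localhost:9092"); // assumed address
        props.setProperty("group.id", groupId);
        props.setProperty("max.poll.records", String.valueOf(maxPollRecords));
        props.setProperty("max.poll.interval.ms", String.valueOf(maxPollIntervalMs));
        return props;
    }

    public static void main(String[] args) {
        // Topic A: ~1 record/s; topic B: ~3600 records/s (numbers from the thread).
        Properties topicA = consumerProps("group-a", 1, 1000);
        Properties topicB = consumerProps("group-b", 3600, 1000);
        System.out.println(topicA.getProperty("max.poll.records"));
        System.out.println(topicB.getProperty("max.poll.records"));
    }
}
```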

Re: Query Rest API from IDE during runtime

2020-05-25 Thread Chesnay Schepler
If you set DeploymentOptions.ATTACHED to false then execute() does not block until the job finishes, and returns a DetachedJobExecutionResult from which you can retrieve the Job ID. If you need to know when the job finishes you will have to continuously query the REST API. This is the only way

Re: Query Rest API from IDE during runtime

2020-05-25 Thread Annemarie Burger
Hi, Thanks for your reply and explanation! Do you know of any way to have a job retrieve its own jobID while it's still running? Best, Annemarie -- Sent from: http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/

Re: Timeout Callbacks issue -Flink

2020-05-25 Thread Jaswin Shah
Yes, I got what you tried to convey. From: Dawid Wysakowicz Sent: Monday, May 25, 2020 16:48 To: Jaswin Shah; user@flink.apache.org; ankit.sing...@paytm.com; isha.sing...@paytm.com Subject: Re: Timeout Callbacks issue -Flink Where did I say you cannot get

Re: Timeout Callbacks issue -Flink

2020-05-25 Thread Dawid Wysakowicz
Where did I say you cannot get the key for which the onTimer method is called? You can and you do it with OnTimerContext.getCurrentKey, the way you do it. My only point is that if I understand your code correctly you will only ever have a single entry in the MapState for every key. Remember that *m

Re: Timeout Callbacks issue -Flink

2020-05-25 Thread Jaswin Shah
Like, the callback mechanism is designed to give a callback for a registered key when the timer reaches the registered expiry. So, at the moment Flink gives the callback, there must be a mechanism to get the key for which I am receiving the callback, right? If this is not possible, what is the sole purpose o

Re: Query Rest API from IDE during runtime

2020-05-25 Thread Chesnay Schepler
StreamExecutionEnvironment.createLocalEnvironmentWithWebUI(config) does not actually create any resources yet, this only happens when you run a job. Upon execute() the Flink cluster is started, the job is run, and once the job finishes (and execute() returns) the cluster shuts down. So, you can

Re: Query Rest API from IDE during runtime

2020-05-25 Thread Annemarie Burger
Hi, Thanks for your response! I can't seem to get past a "java.net.ConnectException: Connection refused" though. Below is the relevant code and exception, any idea what I'm doing wrong? Configuration config = new Configuration(); config.setString(JobManagerOptions.ADDRESS, "localhost"); confi

Re: Timeout Callbacks issue -Flink

2020-05-25 Thread Jaswin Shah
OMG!!! If this is the case, Dawid, I think I am solving the problem in an incorrect way. Here I would like to explain my use-case: Basically, I have two streams 1 and 2, and I am storing events of stream1 in MapState, and whenever events arrive from stream2, I check in MapState if that event

Re: [EXTERNAL] Re: Memory growth from TimeWindows

2020-05-25 Thread Aljoscha Krettek
Just to double check: the issue was resolved by using a different GC? Because the default GC was too "lazy". ;-) Best, Aljoscha On 21.05.20 18:09, Slotterback, Chris wrote: For those who are interested or googling the mail archives in 8 months, the issue was garbage collection related. The d

Re: Performance impact of many open windows at the same time

2020-05-25 Thread Aljoscha Krettek
Hi, I don't think this will immediately degrade performance. State is essentially stored in a HashMap (for the FileStateBackend) or RocksDB (for the RocksDB backend). If these data structures don't degrade with size then your performance also shouldn't degrade. There are of course some effec

Re: Timeout Callbacks issue -Flink

2020-05-25 Thread Dawid Wysakowicz
I don't know how I can describe it better. The MapState/ValueState is *always implicitly scoped to the current key*. It will be scoped this way in all functions of the operator: in processElement1, processElement2, onTimer. It will always hold whatever you stored there for the current k
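Dawid's point about implicit key scoping can be illustrated with a sketch. Everything here is invented for illustration (class name, event types, the 60s timeout) and assumes the flink-streaming-java dependency; it is not the code from the thread. Because state is scoped per key, a single ValueState suffices where the thread's MapState would only ever hold one entry per key.

```java
import org.apache.flink.api.common.state.ValueState;
import org.apache.flink.api.common.state.ValueStateDescriptor;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.functions.co.KeyedCoProcessFunction;
import org.apache.flink.util.Collector;

// Hypothetical two-stream matcher: state is implicitly scoped to the current key.
public class MatchFunction extends KeyedCoProcessFunction<String, String, String, String> {
    private transient ValueState<String> pending;

    @Override
    public void open(Configuration parameters) {
        pending = getRuntimeContext().getState(
            new ValueStateDescriptor<>("pending", String.class));
    }

    @Override
    public void processElement1(String cart, Context ctx, Collector<String> out) throws Exception {
        pending.update(cart); // stored under the current key only
        // register a timeout for this key (interval is illustrative)
        ctx.timerService().registerProcessingTimeTimer(
            ctx.timerService().currentProcessingTime() + 60_000);
    }

    @Override
    public void processElement2(String pg, Context ctx, Collector<String> out) throws Exception {
        String cart = pending.value(); // the same key's pending value, if any
        if (cart != null) {
            out.collect(cart + " matched " + pg);
            pending.clear();
        }
    }

    @Override
    public void onTimer(long ts, OnTimerContext ctx, Collector<String> out) throws Exception {
        // ctx.getCurrentKey() identifies the key whose timer fired;
        // 'pending' here is already scoped to that same key.
        if (pending.value() != null) {
            out.collect("timeout for key " + ctx.getCurrentKey());
            pending.clear();
        }
    }
}
```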

Re: Timeout Callbacks issue -Flink

2020-05-25 Thread Jaswin Shah
Correcting: By ctx.getCurrentKey() I meant to get the key registered at the timestamp when the callback timeout for a key was registered. This was the reason to use getCurrentKey(). So, with that I am fetching the events registered at that point of time for which, until the current moment, I didn't receiv

Re: Timeout Callbacks issue -Flink

2020-05-25 Thread Jaswin Shah
By ctx.getCurrentKey() I meant to get the key registered at the timestamp when the callback timeout for a key was registered. This was the reason to use getCurrentKey(). So, with that I am fetching the events registered at that point of time for which, until the current moment, I didn't receive the callba

Re: Timeout Callbacks issue -Flink

2020-05-25 Thread Dawid Wysakowicz
You are right that a ValueState can keep a single value at any point of time. It is scoped to the current key of the operator though. So it keeps a single value for a key. If your cartMessage.createJoinStringCondition()/pgMessage.createJoinStringCondition()/new CartJoinColumnsSelector()/new PGJoi

Re: Timeout Callbacks issue -Flink

2020-05-25 Thread Jaswin Shah
One question, Dawid: If I maintain a ValueState of Maps, if this is what you were referring to: 1. In my use case, I am registering the timeout for a key when I store it in state, i.e. if I do not receive the matching event with the same key from the other stream, then I would like to receive a callb

Re: Timeout Callbacks issue -Flink

2020-05-25 Thread Jaswin Shah
If I understand correctly, you are suggesting that I should have a ValueState of Map? From: Jaswin Shah Sent: 25 May 2020 14:43 To: Dawid Wysakowicz ; user@flink.apache.org ; ankit.sing...@paytm.com ; isha.sing...@paytm.com Subject: Re: Timeout Callbacks issue

Re: Timeout Callbacks issue -Flink

2020-05-25 Thread Jaswin Shah
Thanks for responding, Dawid. I would like to know more about the MapState solution you talked about. As per my understanding, ValueState maintains a single value at any point of time. So, here what I want to maintain is the first stream's information until the matching event is found in the second strea

Re: Inconsistent Checkpoint API response

2020-05-25 Thread Vijay Bhaskar
Thanks Yun. Here is the problem I am facing: I am using the jobs/:jobid/checkpoints API to recover the failed job. We have a remote manager which monitors the jobs. We are using the "restored" field of the API response to get the latest checkpoint file to use. It's giving the correct checkpoint file for
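An abridged sketch of what `/jobs/:jobid/checkpoints` returns may help frame the confusion in this thread (field names follow the Flink REST API; ids and paths are purely illustrative). Note that `restored` reports the checkpoint used at the *last restore*, which is exactly why it can lag behind `completed` once the job has taken new checkpoints:

```json
{
  "latest": {
    "completed": { "id": 12, "external_path": "hdfs:///ckpts/chk-12" },
    "restored":  { "id": 7,  "external_path": "hdfs:///ckpts/chk-7",
                   "restore_timestamp": 1590400000000, "is_savepoint": false }
  }
}
```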

Re: Stateful-fun-Basic-Hello

2020-05-25 Thread Tzu-Li (Gordon) Tai
Hi, It seems like you are trying to package your Stateful Functions app as a Flink job, and submit that to an existing cluster. If that indeed is the case, Stateful Functions apps have some required configurations that need to be set via the flink-conf.yaml file for your existing cluster. Please

Re: Timeout Callbacks issue -Flink

2020-05-25 Thread Dawid Wysakowicz
Hi Jaswin, I can't see any obvious problems in your code. It looks rather correct. What exactly do you mean that "callback is coming earlier than registered callback timeout"? Could you explain that with some examples? As for the different timezones. Flink does not make any assumptions on the ti

close file on job crash

2020-05-25 Thread Laurent Exsteens
Hello, we had to implement a specific source to read files in a certain way. The files we read are on a NAS mounted through NFS. If an error occurs in a map after this specific source while the file is still being read, the file is never closed, resulting in the task manager keeping the file open (ap

Re: How do I get the IP of the master and slave files programmatically in Flink?

2020-05-25 Thread Yangze Guo
I'm not quite familiar with that. I'd like to cc @Aljoscha Krettek here. Best, Yangze Guo On Mon, May 25, 2020 at 4:39 PM Felipe Gutierrez wrote: > > ok, I see. > > Do you suggest a better approach to send messages from the JobManager > to the TaskManagers and my specific operator? > > Thanks,

Re: How do I get the IP of the master and slave files programmatically in Flink?

2020-05-25 Thread Felipe Gutierrez
ok, I see. Do you suggest a better approach to send messages from the JobManager to the TaskManagers and my specific operator? Thanks, Felipe -- -- Felipe Gutierrez -- skype: felipe.o.gutierrez -- https://felipeogutierrez.blogspot.com On Mon, May 25, 2020 at 4:23 AM Yangze Guo wrote: > > Glad t

Re: java.lang.NoSuchMethodError while writing to Kafka from Flink

2020-05-25 Thread Chesnay Schepler
Please double-check that your distribution and application jar were built against the same Flink version. This looks related to a binary-compatibility issue reported in FLINK-13586.

Re: Apache Flink - Question about application restart

2020-05-25 Thread Yang Wang
Just to share some additional information. When deploying a Flink application on Yarn, once it has exhausted the restart policy, the whole application will fail. If you start another instance (Yarn application), even if high availability is configured, we could not recover from the latest checkpoint becau

Re: Inconsistent Checkpoint API response

2020-05-25 Thread Yun Tang
Hi Vijay, If I understand correctly, do you mean your last "restored" checkpoint is null via the REST API when the job failed 6 times and then recovered successfully with another several successful checkpoints? First of all, if your job just recovered successfully, can you observe the "last restored" c

[ANNOUNCE] Weekly Community Update 2020/21

2020-05-25 Thread Konstantin Knauf
Dear community, happy to share this week's community update! Flink Development == * [releases] Zhijiang has published a first release candidate for Apache Flink 1.11.0. This is a "preview-only" release candidate to facilitate current testing efforts. There will not be a vote on this

Re: How to control the balance by using FlinkKafkaConsumer.setStartFromTimestamp(timestamp)

2020-05-25 Thread Jary Zhen
Hi Dinesh, thanks for your reply. For example, there are two topics: topic A produces 1 record per second and topic B produces 3600 records per second. If I set the Kafka consume config like this: max.poll.records: "3600" max.poll.interval.ms: "1000", which means I can get the whole re