Re: Apache Flink - High Available K8 Setup Wrongly Marked Dead TaskManager

2019-11-28 Thread Eray Arslan
Hi Chesnay,
Thank you for your reply.
I figured out that issue by using a livenessProbe on the TaskManager
deployment, but I think it is still a workaround.

I am using Flink 1.9.1 (currently the latest version),
and I am getting a "connection unexpectedly closed by remote task manager"
error on the TaskManager.
Because of that, the cluster loses the TaskManager and the job cannot restart
because there are not enough task managers in the cluster.

Thanks

Chesnay Schepler wrote on Thu, 28 Nov 2019 at 18:55:

> The akka.watch configuration options haven't been used for a while
> irrespective of FLINK-13883 (but I can't quite tell atm since when).
>
> Let's start with what version of Flink you are using, and what the
> taskmanager/jobmanager logs say.
>
> On 25/11/2019 12:05, Eray Arslan wrote:
> > Hi,
> >
> > I have some trouble with my HA K8 cluster.
> > Current my Flink application has infinite stream. (With 12 parallelism)
> > After few days I am losing my task managers. And they never reconnect
> > to job manager.
> > Because of this, application cannot get restored with restart policy.
> >
> > I did few searches and I found “akka.watch” configurations. But they
> > didn’t work.
> > I think this issue will solve the problem. Am I right?
> > (https://issues.apache.org/jira/browse/FLINK-13883). Is there any
> > workaround I can apply to solve this problem?
> >
> > Thanks
> >
> > Eray
> >
> >
>
>

-- 

*Eray Arslan*
Yazılım Uzmanı  / Software Specialists
eray.ars...@hepsiburada.com

*+90 537 738 14 34*
Trump Towers Mecidiyeköy Yolu No: 12 Kule 2, Mecidiyeköy - Şişli / İstanbul
- Türkiye


Re: ProcessFunction collect and close, when to use?

2019-11-28 Thread bupt_ljy
Hi Shuwen,


> When to call close() ? After every element processed? Or on 
> ProcessFunction.close() ? Or never to use it? 


IMO, the #close() function is used to manage the lifecycle of the #Collector 
rather than that of a single element. I think it should not be called in a user function 
unless you have some special use case (no such use case comes to my mind).


> If it's been closed already, can the collector collect() anymore data?


No. If it’s closed, it usually means the writer is closed, or maybe the operator 
is closed.


> If processElement received a message but consider it as discard and does not 
> call collect(), will this block checkpoint's barrier until next element was 
> sent by collect() ?


No. 
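For what it's worth, here is a minimal sketch (class name and the "keep:" marker are made up for illustration) of a ProcessFunction whose user code only calls collect() and leaves the Collector's lifecycle, including close(), entirely to the framework:

```java
import org.apache.flink.streaming.api.functions.ProcessFunction;
import org.apache.flink.util.Collector;

public class KeepOnlyMarkedRecords extends ProcessFunction<String, String> {

    @Override
    public void processElement(String value, Context ctx, Collector<String> out) {
        if (value.startsWith("keep:")) {
            out.collect(value);  // emit zero or more records per input element
        }
        // Discarding a record (not calling collect) is fine and does not block
        // checkpoint barriers; out.close() is never called from user code here.
    }
}
```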




Best,
Jiayi Liao




 Original Message 
Sender: shuwen zhou
Recipient: user
Date: Friday, Nov 29, 2019 12:29
Subject: ProcessFunction collect and close, when to use?


Hi Community,
In the ProcessFunction class, in the processElement function, there is a Collector that 
has 2 methods: collect() and close(). I would like to know:


1. When should close() be called? After every element is processed? Or in 
ProcessFunction.close()? Or never? If it's been closed already, can 
the collector collect() any more data?
2. If processElement receives a message but decides to discard it and does not 
call collect(), will this block the checkpoint barrier until the next element is 
sent by collect()?




-- 

Best Wishes,
Shuwen Zhou

ProcessFunction collect and close, when to use?

2019-11-28 Thread shuwen zhou
Hi Community,
In the ProcessFunction class, in the processElement function, there is a Collector
that has 2 methods: collect() and close(). I would like to know:

1. When should close() be called? After every element is processed? Or
in ProcessFunction.close()? Or never? If it's been closed
already, can the collector collect() any more data?
2. If processElement receives a message but decides to discard it and does
not call collect(), will this block the checkpoint barrier until the next element
is sent by collect()?


-- 
Best Wishes,
Shuwen Zhou


Re: Auto Scaling in Flink

2019-11-28 Thread Caizhi Weng
Hi Akash,

Flink doesn't support auto scaling in its core currently. It may be supported
in the future, when the new scheduling architecture is implemented:
https://issues.apache.org/jira/browse/FLINK-10407 .

You can do it externally by cancelling the job with a savepoint, updating the
parallelism, and restarting the job according to the rate of data, like what
Pravega suggests in its docs:
http://pravega.io/docs/latest/key-features/#auto-scaling.

vino yang wrote on Fri, Nov 29, 2019 at 11:12 AM:

> Hi Akash,
>
> You can use Pravega connector to integrate with Flink, the source code is
> here[1].
>
> In short, relying on its rescalable state feature[2] flink supports
> scalable streaming jobs.
>
> Currently, the mainstream solution about auto-scaling is Flink + K8S, I
> can share some resources with you[3].
>
> Best,
> Vino
>
> [1]: https://github.com/pravega/flink-connectors
> [2]:
> https://flink.apache.org/features/2017/07/04/flink-rescalable-state.html
> [3]:
> https://www.slideshare.net/FlinkForward/flink-forward-san-francisco-2019-scaling-a-realtime-streaming-warehouse-with-apache-flink-parquet-and-kubernetes-aditi-verma-ramesh-shanmugam
>
> Akash Goel wrote on Fri, Nov 29, 2019 at 9:52 AM:
>
>> Hi,
>>
>> Does Flink support auto scaling? I read that it is supported using
>> Pravega. Is it incorporated in any version?
>>
>> Thanks,
>> Akash Goel
>>
>


Re: Auto Scaling in Flink

2019-11-28 Thread vino yang
Hi Akash,

You can use Pravega connector to integrate with Flink, the source code is
here[1].

In short, relying on its rescalable state feature [2], Flink supports
scalable streaming jobs.

Currently, the mainstream solution for auto-scaling is Flink + K8s; I can
share some resources with you [3].

Best,
Vino

[1]: https://github.com/pravega/flink-connectors
[2]:
https://flink.apache.org/features/2017/07/04/flink-rescalable-state.html
[3]:
https://www.slideshare.net/FlinkForward/flink-forward-san-francisco-2019-scaling-a-realtime-streaming-warehouse-with-apache-flink-parquet-and-kubernetes-aditi-verma-ramesh-shanmugam

Akash Goel wrote on Fri, Nov 29, 2019 at 9:52 AM:

> Hi,
>
> Does Flink support auto scaling? I read that it is supported using
> Pravega. Is it incorporated in any version?
>
> Thanks,
> Akash Goel
>


Auto Scaling in Flink

2019-11-28 Thread Akash Goel
Hi,

Does Flink support auto scaling? I read that it is supported using Pravega.
Is it incorporated in any version?

Thanks,
Akash Goel


Re: How to recover state from savepoint on embedded mode?

2019-11-28 Thread Arvid Heise
Just to add on: if you use LocalStreamEnvironment, you can pass a
configuration and set "execution.savepoint.path" to point to your
savepoint.
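A minimal sketch of what that could look like (the savepoint path is a placeholder, and whether the local environment honors "execution.savepoint.path" depends on your Flink version, so treat that as an assumption to verify):

```java
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class LocalResumeFromSavepoint {
    public static void main(String[] args) throws Exception {
        Configuration conf = new Configuration();
        // assumption: this key is picked up by the local environment in your Flink version
        conf.setString("execution.savepoint.path", "file:///flink/savepoints/savepoint-xyz");

        StreamExecutionEnvironment env =
                StreamExecutionEnvironment.createLocalEnvironment(1, conf);

        env.fromElements(1, 2, 3).print();  // placeholder pipeline
        env.execute("local-job-resumed-from-savepoint");
    }
}
```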

Best,

Arvid

On Wed, Nov 27, 2019 at 1:00 PM Congxian Qiu  wrote:

> Hi,
>
> You can recover from a checkpoint/savepoint if the JM can read from the
> given path, no matter which mode the job is running in.
>
> Best,
> Congxian
>
>
> Reo Lei wrote on Tue, Nov 26, 2019 at 12:18 PM:
>
>>
>>
>> -- Forwarded message -
>> From: Reo Lei
>> Date: Tue, Nov 26, 2019 at 9:53 AM
>> Subject: Re: How to recover state from savepoint on embedded mode?
>> To: Yun Tang 
>>
>>
>> Hi Yun,
>> Thanks for your reply. what I say the embedded mode is the whole flink
>> cluster and job, include jobmanager, taskmanager and the job application
>> itself, running within a local JVM progress, which is use the "
>> LocalStreamEnvironment" within the job. And the start command look like
>> this: "java -Xmx512M -XX:... -Dlog.file=... -cp flink-job.jar
>> com.a.b.c.MyJob > /dev/null &"
>>
>> why I am not use the standalnoe mode to run the job is because the
>> running env haven't zookeeper, and would not install the zookeeper. So I
>> need to depend on the embedded mode to run my job.
>>
>> BR,
>> Reo
>>
>> Yun Tang wrote on Tue, Nov 26, 2019 at 2:38 AM:
>>
>>> What does the embedded mode mean here? If you refer to SQL embedded mode,
>>> you cannot resume from a savepoint now; if you refer to a local standalone
>>> cluster, you could use `bin/flink run -s` to resume on a local cluster.
>>>
>>>
>>>
>>> Best
>>>
>>> Yun Tang
>>>
>>>
>>>
>>> *From: *Reo Lei 
>>> *Date: *Tuesday, November 26, 2019 at 12:37 AM
>>> *To: *"user@flink.apache.org" 
>>> *Subject: *How to recover state from savepoint on embedded mode?
>>>
>>>
>>>
>>> Hi,
>>>
>>> I have a job that needs to run in embedded mode, but it needs to initialize some rule
>>> data from a database before starting. So I used the State Processor API to
>>> construct my state data and save it to the local disk. When I wanted to use
>>> this savepoint to recover my job, I found that resuming a job from a savepoint
>>> needs the command `bin/flink run -s :savepointPath [:runArgs]`
>>> to submit the job to a Flink cluster. That means the job runs in remote
>>> mode, not embedded mode.
>>>
>>>
>>>
>>> And I was wondering why I can't resume a job from a savepoint on
>>> embedded mode. If that is possible, what should I do?
>>>
>>> BTW, if we cannot resume a job from a savepoint in embedded mode, how
>>> can we know the savepoint was constructed correctly in a development environment and
>>> use the IDE to debug it?
>>>
>>>
>>>
>>> BR,
>>>
>>> Reo
>>>
>>>
>>>
>>


Re: Flink 'Job Cluster' mode Ui Access

2019-11-28 Thread Chesnay Schepler

Could you try accessing :/#/overview ?

The REST API is obviously accessible, and hence the WebUI should be too.

How did you set up the session cluster? Are you using some custom Flink 
build or something that potentially excluded flink-runtime-web from 
the classpath?


On 28/11/2019 10:02, Jatin Banger wrote:

Hi,

I checked the log file there is no error.
And I checked the pods internal ports by using rest api.

# curl : 4081
{"errors":["Not found."]}
4081 is the Ui port

# curl :4081/config
{"refresh-interval":3000,"timezone-name":"Coordinated Universal 
Time","timezone-offset":0,"flink-version":"","flink-revision":"ceba8af 
@ 11.02.2019 @ 22:17:09 CST"}


# curl :4081/jobs
{"jobs":[{"id":"___job_Id_","status":"RUNNING"}]}

Which shows the state of the job as running.

What else can we do ?

Best regards,
Jatin

On Thu, Nov 28, 2019 at 1:28 PM vino yang wrote:


Hi Jatin,

Flink web UI does not depend on any deployment mode.

You should check if there are error logs in the log file and the
job status is running state.

Best,
Vino

Jatin Banger <bangerjatinrm...@gmail.com> wrote on Thu, Nov 28, 2019 at 3:43 PM:

Hi,

It seems there is a Web UI for the Flink Session cluster, but for the
Flink Job Cluster it is showing

{"errors":["Not found."]}

Is it the expected behavior for Flink Job Cluster Mode ?

Best Regards,
Jatin





Re: ***UNCHECKED*** Error while confirming Checkpoint

2019-11-28 Thread Piotr Nowojski
Thank you all for the investigation/reporting/discussion. I have merged an older PR 
[1] that fixes this issue; it was previously rejected as we didn’t 
realise this was a production issue.

With that merged, the issue should be fixed in the Flink 1.10, 1.9.2 and 1.8.3 
releases.

Piotrek

[1] https://github.com/apache/flink/pull/6723 


> On 28 Nov 2019, at 02:52, Tony Wei  wrote:
> 
> Hi Piotrek,
> 
> There was already an issue [1] and PR for this thread. Should we mark it as 
> duplicated or related issue?
> 
> Best,
> Tony Wei
> 
> [1] https://issues.apache.org/jira/browse/FLINK-10377 
> 
> Piotr Nowojski <pi...@ververica.com> wrote on Thu, Nov 28, 2019 at 12:17 AM:
> Hi Tony,
> 
> Thanks for the explanation. Assuming that’s what’s happening, then I agree, 
> this checkStyle should be removed. I created a ticket for this issue 
> https://issues.apache.org/jira/browse/FLINK-14979 
> 
> 
> Piotrek
> 
>> On 27 Nov 2019, at 16:28, Tony Wei wrote:
>> 
>> Hi Piotrek,
>> 
>> The case here was that the first snapshot is a savepoint. I know that if the 
>> following checkpoint succeeds before the previous one, the previous one 
>> will be subsumed by the JobManager. However, if that previous one is a 
>> savepoint, it won't be subsumed. That leads to the case that Chesnay described: 
>> the following checkpoint succeeded before the previous savepoint, handling 
>> both of their pending transactions, but the savepoint still succeeded and sent 
>> the notification to each TaskManager. That led to this exception. Could you 
>> double check if this is the case? Thank you. 
>> 
>> Best,
>> Tony Wei
>> 
>> Piotr Nowojski <pi...@ververica.com> wrote on Wed, Nov 27, 2019 at 8:50 PM:
>> Hi,
>> 
>> Maybe Chesnay you are right, but I’m not sure. TwoPhaseCommitSink was based 
>> on Pravega’s sink for Flink, which was implemented by Stephan, and it has 
>> the same logic [1]. If I remember the discussions with Stephan/Till, the way 
>> Flink uses Akka probably guarantees that messages will always be 
>> delivered, except in case of some failure, so `notifyCheckpointComplete` could be 
>> missed probably only if a failure happens between the snapshot and the arrival of 
>> the notification. Receiving the same notification twice should be impossible 
>> (based on the knowledge passed to me from Till/Stephan).
>> 
>> However, for one thing, if that’s possible, then the code should be adjusted 
>> accordingly. On the other hand, maybe there is no harm in relaxing the 
>> contract? Even if we miss this notification (because of some re-ordering?), 
>> the next one will subsume the missed one and commit everything. 
>> 
>> Piotrek
>> 
>> [1] 
>> https://github.com/pravega/flink-connectors/blob/master/src/main/java/io/pravega/connectors/flink/FlinkPravegaWriter.java#L567
>>  
>> 
>> 
>>> On 27 Nov 2019, at 13:02, Chesnay Schepler wrote:
>>> 
>>> This looks to me like the TwoPhaseCommitSinkFunction is a bit too strict. 
>>> The notification for complete checkpoints is not reliable; it may be late, 
>>> not come at all, possibly even in different order than expected.
>>> 
>>> As such, if you have a simple case of snapshot -> snapshot -> notify -> notify, 
>>> the sink will always fail with an exception.
>>> 
>>> What it should do imo is either a) don't check that there is a pending 
>>> transaction or b) track the highest checkpoint id received and optionally 
>>> don't fail if the notification is for an older CP.
>>> 
>>> @piotr WDYT?
>>> 
>>> On 27/11/2019 08:59, Tony Wei wrote:
 Hi, 
 
 As a follow-up, it seems that a savepoint can't be subsumed, so its 
 notification can still be sent to each TM.
 Is this a bug that needs to be fixed in TwoPhaseCommitSinkFunction?
 
 Best,
 Tony Wei
 
 Tony Wei <tony19920...@gmail.com> wrote on Wed, Nov 27, 2019 at 3:43 PM:
 Hi, 
 
 I want to raise this question again, since I have had this exception on my 
 production job.
 
 The exception is as follows
  
 2019-11-27 14:47:29
  
 java.lang.RuntimeException: Error while confirming checkpoint
 at org.apache.flink.runtime.taskmanager.Task$2.run(Task.java:1205)
 at 
 java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
 at java.util.concurrent.FutureTask.run(FutureTask.java:266)
 at 
 java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
 at 
 java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
 at java.lang.Thread.run(Thread.java:748)
 Caused by: java.lang.IllegalStateException: checkpoint completed, but no 
 transaction 

Re: Apache Flink - High Available K8 Setup Wrongly Marked Dead TaskManager

2019-11-28 Thread Chesnay Schepler
The akka.watch configuration options haven't been used for a while 
irrespective of FLINK-13883 (but I can't quite tell atm since when).


Let's start with what version of Flink you are using, and what the 
taskmanager/jobmanager logs say.


On 25/11/2019 12:05, Eray Arslan wrote:

Hi,

I have some trouble with my HA K8 cluster.
Currently my Flink application has an infinite stream (with parallelism 12).
After a few days I lose my task managers, and they never reconnect 
to the job manager.

Because of this, the application cannot be restored by the restart policy.

I did a few searches and found the “akka.watch” configuration options, but they 
didn’t work.
I think this issue will solve the problem. Am I right? 
(https://issues.apache.org/jira/browse/FLINK-13883). Is there any 
workaround I can apply to solve this problem?


Thanks

Eray






Re: Flink 1.9.1 KafkaConnector missing data (1M+ records)

2019-11-28 Thread Kostas Kloudas
Hi Harrison,

One thing to keep in mind is that Flink will only write files if there
is data to write. If, for example, your partition is not active for a
period of time, then no files will be written.
Could this explain the fact that dt 2019-11-24T01 and 2019-11-24T02
are entirely skipped?

In addition, for the "duplicates", it would help if you could share a
bit more information about your BucketAssigner.
How are these names assigned to the files and what does TT stand for?
Can it be that there are a lot of events for partition 4 that fill up
2 part files for that duration? I am
asking because the counter of the 2 part files differ.

Cheers,
Kostas

On Tue, Nov 26, 2019 at 1:09 AM Harrison Xu  wrote:
>
> Hello,
>
> We're seeing some strange behavior with flink's KafkaConnector010 (Kafka 
> 0.10.1.1) arbitrarily skipping data.
>
> Context
> KafkaConnector010 is used as source, and StreamingFileSink/BulkPartWriter 
> (S3) as sink with no intermediate operators. Recently, we noticed that 
> millions of Kafka records were missing for one topic partition (this job is 
> running for 100+ topic partitions, and such behavior was only observed for 
> one). This job is run on YARN, and hosts were healthy with no hardware faults 
> observed. No exceptions in jobmanager or taskmanager logs at this time.
>
> How was this detected?
> As a sanity check, we dual-write Kafka metadata (offsets) to a separate 
> location in S3, and have monitoring to ensure that written offsets are 
> contiguous with no duplicates.
> Each Kafka record is bucketed into hourly datetime partitions (UTC) in S3.
>
> (Condensed) Taskmanager logs
> 2019-11-24 02:36:50,140 INFO  
> org.apache.flink.fs.s3.common.writer.S3Committer  - Committing 
> kafka_V2/meta/digest_features/dt=2019-11-24T00/partition_4/part-12-5252 with 
> MPU ID 3XG...
> 2019-11-24 02:41:27,966 INFO  
> org.apache.flink.fs.s3.common.writer.S3Committer  - Committing 
> kafka_V2/meta/digest_features/dt=2019-11-24T00/partition_4/part-12-5253 with 
> MPU ID 9MW...
> 2019-11-24 02:46:29,153 INFO  
> org.apache.flink.fs.s3.common.writer.S3Committer  - Committing 
> kafka_V2/meta/digest_features/dt=2019-11-24T00/partition_4/part-12-5254 with 
> MPU ID 7AP...
> 2019-11-24 02:51:32,602 INFO  
> org.apache.flink.fs.s3.common.writer.S3Committer  - Committing 
> kafka_V2/meta/digest_features/dt=2019-11-24T00/partition_4/part-12-5255 with 
> MPU ID xQU...
> 2019-11-24 02:56:35,183 INFO  
> org.apache.flink.fs.s3.common.writer.S3Committer  - Committing 
> kafka_V2/meta/digest_features/dt=2019-11-24T00/partition_4/part-12-5256 with 
> MPU ID pDL...
> 2019-11-24 03:01:26,059 INFO  
> org.apache.flink.fs.s3.common.writer.S3Committer  - Committing 
> kafka_V2/meta/digest_features/dt=2019-11-24T00/partition_4/part-12-5257 with 
> MPU ID Itf...
> 2019-11-24 03:01:26,510 INFO  
> org.apache.flink.fs.s3.common.writer.S3Committer  - Committing 
> kafka_V2/meta/digest_features/dt=2019-11-24T03/partition_4/part-12-5263 with 
> MPU ID e3l...
> 2019-11-24 03:06:26,230 INFO  
> org.apache.flink.fs.s3.common.writer.S3Committer  - Committing 
> kafka_V2/meta/digest_features/dt=2019-11-24T03/partition_4/part-12-5264 with 
> MPU ID 5z4...
> 2019-11-24 03:11:22,711 INFO  
> org.apache.flink.fs.s3.common.writer.S3Committer  - Committing 
> kafka_V2/meta/digest_features/dt=2019-11-24T03/partition_4/part-12-5265 with 
> MPU ID NfP...
>
> Two observations stand out from the above logs:
> - Datetime 2019-11-24T01 and 2019-11-24T02 are entirely skipped, resulting in 
> millions of missing offsets. They are never written in future commits (and 
> data in S3 shows this).
> - Two commits for the same topic partition ("digest_features", partition 4), 
> happened nearly simultaneously on 2019-11-24 03:03, despite our commit 
> interval being set at 5 minutes. Why was the same TopicPartition read from 
> and committed twice in such a short interval?
>
> Would greatly appreciate if anyone is able to shed light on this issue. Happy 
> to provide full logs if needed.
> Thanks
>
>
>
>
>
>
>
>


Re: Streaming Files to S3

2019-11-28 Thread Arvid Heise
Hi Li,

S3 file sink will write data into prefixes, with as many part-files as the
degree of parallelism. This structure comes from the good ol' Hadoop days,
where an output folder also contained part-files and is independent of S3.
However, each of the part-files will be uploaded in a multipart fashion,
which is S3 specific.

For your questions:
1. It will create one part-file for each parallel instance of the window
operator. If you run the window operator and sink with parallelism of 1,
you will receive exactly 1 file as you wished.
2. RollingPolicy can indeed be used, but again it works on the part-file level. So
you would need to use a parallelism of 1 again. Also, the RollingPolicy will
signal when the next part is started. So you probably need to roll when the
file is something like 99 MB and hope that the last record will not go over
100 MB. Alternatively, you live with some files being a tad larger than 100
MB. (See the sketch after this list.)
3. Yes, exactly. If you also want to use presto s3 system (in the future)
for checkpoints, it's safer to specify "s3a://file".
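To make points 1 and 2 concrete, here is a rough sketch (bucket path and sizes are placeholders, and the builder method names differ slightly between Flink versions) of a row-format sink with a size- and time-based rolling policy, run at a parallelism of 1:

```java
import org.apache.flink.api.common.serialization.SimpleStringEncoder;
import org.apache.flink.core.fs.Path;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.functions.sink.filesystem.StreamingFileSink;
import org.apache.flink.streaming.api.functions.sink.filesystem.rollingpolicies.DefaultRollingPolicy;

public class S3CsvSinkSketch {
    public static void attachSink(DataStream<String> csvLines) {
        StreamingFileSink<String> sink = StreamingFileSink
                .forRowFormat(new Path("s3a://my-bucket/output"),
                        new SimpleStringEncoder<String>("UTF-8"))
                .withRollingPolicy(
                        DefaultRollingPolicy.create()
                                .withRolloverInterval(60 * 1000L)      // roll at least once per minute
                                .withMaxPartSize(90L * 1024L * 1024L)  // stay safely below the 100 MB target
                                .withInactivityInterval(60 * 1000L)
                                .build())
                .build();

        // parallelism 1 => a single part-file per roll, as discussed in point 1
        csvLines.addSink(sink).setParallelism(1);
    }
}
```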

Best,

Arvid

On Tue, Nov 26, 2019 at 2:59 AM Li Peng  wrote:

> Hey folks, I'm trying to stream large volume data and write them as csv
> files to S3, and one of the restrictions is to try and keep the files to
> below 100MB (compressed) and write one file per minute. I wanted to verify
> with you guys regarding my understanding of StreamingFileSink:
>
> 1. From the docs, StreamingFileSink will use multipart upload with s3, so
> even with many workers writing to s3, it will still output only one file
> for all of them for each time window, right?
> 2. StreamingFileSink.forRowFormat can be configured to write individual
> rows and then commit to disk as per the above rules, by specifying a
> RollingPolicy with the file size limit and the rollover interval, correct?
> And the limit and the interval applies to the entire file, not to each part
> file?
> 3. To write to s3, is it enough to just add flink-s3-fs-hadoop as a
> dependency and specify the file path as "s3://file"?
>
> Thanks,
> Li
>


Re: Side output from Flink Sink

2019-11-28 Thread Robert Metzger
What do you mean by "from within a sink"? Do you have a custom sink?

If you want to write to different Kafka topics from the same sink, you can
do that using a custom KafkaSerializationSchema. It allows you to return a
ProducerRecord with a custom target topic set. (A Kafka sink can write to
multiple topics).
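A rough sketch of that idea (the topic names and the "bad record" criterion are made up; the schema is then handed to a FlinkKafkaProducer of the universal connector together with a default topic):

```java
import java.nio.charset.StandardCharsets;
import org.apache.flink.streaming.connectors.kafka.KafkaSerializationSchema;
import org.apache.kafka.clients.producer.ProducerRecord;

public class TopicRoutingSchema implements KafkaSerializationSchema<String> {

    @Override
    public ProducerRecord<byte[], byte[]> serialize(String element, Long timestamp) {
        // route records we consider "bad" to a dead-letter topic, the rest to the main topic
        String topic = element.startsWith("ERROR") ? "my-dead-letter-topic" : "my-main-topic";
        return new ProducerRecord<>(topic, element.getBytes(StandardCharsets.UTF_8));
    }
}
```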

Why can't you split the stream into "good" and "bad" records before the
sink, and then define two different sinks?


On Thu, Nov 28, 2019 at 1:41 PM Arvid Heise  wrote:

> Hi Victor,
>
> you could implement your own SinkFunction that wraps the KafkaProducer.
> However, since you may need to check if the write operation is successful,
> you probably need to subclass KafkaProducer and implement your own error
> handling.
>
> Best,
>
> Arvid
>
> On Mon, Nov 25, 2019 at 7:51 AM vino yang  wrote:
>
>> Hi Victor,
>>
>> Currently, it seems the "side output" feature does not been supported by
>> the streaming sink.
>>
>> IMO, you can customize your sink via selecting different types of events
>> to output to different places.
>>
>> WDYT?
>>
>> Best,
>> Vino
>>
>> Victor Villa Dev wrote on Mon, Nov 25, 2019 at 1:37 PM:
>>
>>> Hi Vino,
>>>
>>> Thanks a lot for your reply!
>>> However I'm not quite sure my question was clear enough.
>>> I'm aware I can create/get side outputs using output tags from within
>>> operators (Process Functions) as documentation also states.
>>>
>>> The main point in my question is whether creating a side output is even
>>> possible from within a Sink?
>>> If so, would you mind pointing to an example of how to correctly get
>>> the context necessary to add the "output" from within the "invoke()" method?
>>> In case it isn't, what are the usual/suggested strategies?
>>>
>>> I know the Sink is usually the "last" portion of a data stream as its
>>> name indicates, but I was wondering if for some reason something can't be
>>> sinked (after retries, etc), what is the usual way to deal with such cases?
>>>
>>> Thanks again for your kind support.
>>>
>>> On 2019/11/25 02:23:15, vino yang  wrote:
>>> > Hi Victor,
>>> >
>>> > Firstly, you can get your side output stream via OutputTag. Please
>>> refer to
>>> > the official documentation[1].
>>> > Then, specify a sink for your side output stream. Of course, you can
>>> > specify a Kafka sink.
>>> >
>>> > Best,
>>> > Vino
>>> >
>>> > [1]:
>>> >
>>> https://ci.apache.org/projects/flink/flink-docs-stable/dev/stream/side_output.html
>>> >
>>> > Victor Villa Dev wrote on Mon, Nov 25, 2019 at 2:27 AM:
>>> >
>>> > > I'd like to know if there's a way to generate a side output and/or sink
>>> to an
>>> > > alternate kafka topic from within a Sink?
>>> > > The use case is the datastream sinks to a storage and on particular
>>> failed
>>> > > attempts I'd like to deadletter to a kafka topic.
>>> > > Any suggestions?
>>> > >
>>> > > Thanks
>>> > >
>>> >
>>>
>>


Re: OOM caused by using FsStateBackend

2019-11-28 Thread Congxian Qiu
Hi

Checkpointing does not cause state to be cleaned up. A checkpoint only backs up the state to remote
storage for later recovery. If there are a large number of
CopyOnWriteStateTable$StateTableEntry objects, you first need to confirm: does your job really hold that much state? Also,
what is your checkpoint-related configuration, and were there checkpoint failures or similar before the OOM?

Best,
Congxian


李茂伟 wrote on Thu, Nov 28, 2019 at 9:02 PM:

> hi all ~
>
> I am using FsStateBackend, and checkpointing with it leads to an OOM. Inspecting the heap memory shows a large number of CopyOnWriteStateTable$StateTableEntry objects that are not cleaned up. After checkpointing with FsStateBackend, will the state in memory be cleaned up?


OOM caused by using FsStateBackend

2019-11-28 Thread 李茂伟
hi all ~

I am using FsStateBackend, and checkpointing with it leads to an OOM. Inspecting the heap memory shows a large number of CopyOnWriteStateTable$StateTableEntry objects that are not cleaned up. After checkpointing with FsStateBackend, will the state in memory be cleaned up?

Re: Flink Kudu Connector

2019-11-28 Thread Arvid Heise
Hi Rahul,

can you check if the KuduSink tests of Apache Bahir shed any light? [1]

Best,

Arvid

[1]
https://github.com/apache/bahir-flink/blob/55240a993df999d66aefa36e587be719c29be92a/flink-connector-kudu/src/test/java/org/apache/flink/connectors/kudu/streaming/KuduSinkTest.java


On Mon, Nov 25, 2019 at 11:53 AM vino yang  wrote:

> Hi Rahul,
>
> I only found some resources on the Internet that you can consider. [1][2]
>
> Best,
> Vino
>
> [1]: https://bahir.apache.org/docs/flink/current/flink-streaming-kudu/
> [2]:
> https://www.slideshare.net/0xnacho/apache-flink-kudu-a-connector-to-develop-kappa-architectures
>
> Rahul Jain wrote on Mon, Nov 25, 2019 at 6:32 PM:
>
>> Hi,
>>
>> We are trying to use the Flink Kudu connector. Is there any documentation
>> available that we can read to understand how to use it ?
>>
>> We found some sample code but that was not very helpful.
>>
>> Thanks,
>> -rahul
>>
>


Re: Idiomatic way to split pipeline

2019-11-28 Thread Arvid Heise
Hi Avi,

it seems to me that you don't really need any split feature. As far as
I can see in your picture, you want to apply two different windows to the
same input data.

In that case you simply use two different subgraphs.

stream = ...

stream1 = stream.window(...).addSink()

stream2 = stream.window(...).addSink()

In Flink, you can compose arbitrary directed acyclic graphs, so consuming
the output of one operator on several downstream operators is completely
normal.
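A compact, self-contained sketch of that idea (source, sinks and window sizes are placeholders):

```java
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.windowing.time.Time;

public class TwoWindowsOneInput {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        DataStream<String> stream = env.socketTextStream("localhost", 9999);  // placeholder source

        // subgraph 1: 10-second windows over the input
        stream.timeWindowAll(Time.seconds(10))
              .reduce((a, b) -> a + "," + b)
              .print();  // placeholder sink

        // subgraph 2: 1-minute windows over the very same input
        stream.timeWindowAll(Time.minutes(1))
              .reduce((a, b) -> a + "," + b)
              .print();

        env.execute("two-windows-one-input");
    }
}
```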

Best,

Arvid

On Mon, Nov 25, 2019 at 10:50 AM Avi Levi  wrote:

> Thanks, I'll check it out.
>
> On Mon, Nov 25, 2019 at 11:46 AM vino yang  wrote:
>
>> *This Message originated outside your organization.*
>> --
>> Hi Avi,
>>
>> The side output provides a superset of split's functionality. So anything
>> can be implemented via split also can be implemented via side output.[1]
>>
>> Best,
>> Vino
>>
>> [1]:
>> https://stackoverflow.com/questions/51440677/apache-flink-whats-the-difference-between-side-outputs-and-split-in-the-data
>>
>> Avi Levi wrote on Mon, Nov 25, 2019 at 5:32 PM:
>>
>>> Thank you for your quick reply, I appreciate that. But this is not
>>> exactly a "side output" per se, it is simple splitting. IIUC the side output
>>> is more for splitting the records by something that differentiates them
>>> (lateness, value, etc.). I thought there was something more idiomatic, but if this is
>>> it, then I will go with that.
>>>
>>> On Mon, Nov 25, 2019 at 10:42 AM vino yang 
>>> wrote:
>>>
 *This Message originated outside your organization.*
 --
 Hi Avi,

 As the doc of DataStream#split said, you can use the "side output"
 feature to replace it.[1]

 [1]:
 https://ci.apache.org/projects/flink/flink-docs-stable/dev/stream/side_output.html

 Best,
 Vino

 Avi Levi wrote on Mon, Nov 25, 2019 at 4:12 PM:

> Hi,
> I want to split the output of one of the operators to two pipelines.
> Since the *split* method is deprecated, what is the idiomatic way to
> do that without duplicating the operator ?
>
> [image: Screen Shot 2019-11-25 at 10.05.38.png]
>
>
>


Re: JobGraphs not cleaned up in HA mode

2019-11-28 Thread 曾祥才
The chk-* directory is not found. I think it is missing because the jobmanager
removes it automatically, but why is it still in ZooKeeper?






------------------ Original message ------------------
From: "Vijay Bhaskar"
http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/

Re: Side output from Flink Sink

2019-11-28 Thread Arvid Heise
Hi Victor,

you could implement your own SinkFunction that wraps the KafkaProducer.
However, since you may need to check if the write operation is successful,
you probably need to subclass KafkaProducer and implement your own error
handling.
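One way to read that suggestion, as a rough sketch (class name, broker address and dead-letter topic are made up, and the primary write plus its retries is only a placeholder):

```java
import java.util.Properties;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.functions.sink.RichSinkFunction;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.serialization.StringSerializer;

public class DeadLetteringStorageSink extends RichSinkFunction<String> {

    private transient KafkaProducer<String, String> deadLetterProducer;

    @Override
    public void open(Configuration parameters) {
        Properties props = new Properties();
        props.put("bootstrap.servers", "kafka:9092");  // assumption: your broker address
        props.put("key.serializer", StringSerializer.class.getName());
        props.put("value.serializer", StringSerializer.class.getName());
        deadLetterProducer = new KafkaProducer<>(props);
    }

    @Override
    public void invoke(String value, Context context) {
        try {
            writeToStorage(value);  // the real storage write and its retries go here
        } catch (Exception e) {
            // could not be written after retries -> forward to the dead-letter topic
            deadLetterProducer.send(new ProducerRecord<>("my-dead-letter-topic", value));
        }
    }

    private void writeToStorage(String value) {
        // placeholder for the actual storage write
    }

    @Override
    public void close() {
        if (deadLetterProducer != null) {
            deadLetterProducer.close();
        }
    }
}
```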

Best,

Arvid

On Mon, Nov 25, 2019 at 7:51 AM vino yang  wrote:

> Hi Victor,
>
> Currently, it seems the "side output" feature does not been supported by
> the streaming sink.
>
> IMO, you can customize your sink via selecting different types of events
> to output to different places.
>
> WDYT?
>
> Best,
> Vino
>
> Victor Villa Dev wrote on Mon, Nov 25, 2019 at 1:37 PM:
>
>> Hi Vino,
>>
>> Thanks a lot for your reply!
>> However I'm not quite sure my question was clear enough.
>> I'm aware I can create/get side outputs using output tags from within
>> operators (Process Functions) as documentation also states.
>>
>> The main point in my question is whether creating a side output is even
>> possible from within a Sink?
>> If so, would you mind pointing to an example of how to correctly get the
>> context necessary to add the "output" from within the "invoke()" method?
>> In case it isn't, what are the usual/suggested strategies?
>>
>> I know the Sink is usually the "last" portion of a data stream as its
>> name indicates, but I was wondering if for some reason something can't be
>> sinked (after retries, etc), what is the usual way to deal with such cases?
>>
>> Thanks again for your kind support.
>>
>> On 2019/11/25 02:23:15, vino yang  wrote:
>> > Hi Victor,
>> >
>> > Firstly, you can get your side output stream via OutputTag. Please
>> refer to
>> > the official documentation[1].
>> > Then, specify a sink for your side output stream. Of course, you can
>> > specify a Kafka sink.
>> >
>> > Best,
>> > Vino
>> >
>> > [1]:
>> >
>> https://ci.apache.org/projects/flink/flink-docs-stable/dev/stream/side_output.html
>> >
>> > Victor Villa Dev wrote on Mon, Nov 25, 2019 at 2:27 AM:
>> >
>> > > I'd like to know if there's a way to generate a side output and/or sink
>> to an
>> > > alternate kafka topic from within a Sink?
>> > > The use case is the datastream sinks to a storage and on particular
>> failed
>> > > attempts I'd like to deadletter to a kafka topic.
>> > > Any suggestions?
>> > >
>> > > Thanks
>> > >
>> >
>>
>


Re: Retrieving Flink job ID/YARN Id programmatically

2019-11-28 Thread Arvid Heise
Hi Lei,

if you use

public JobExecutionResult StreamExecutionEnvironment#execute()

You can retrieve the job id through the result.

result.getJobID()
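For example (job name and pipeline are placeholders):

```java
import org.apache.flink.api.common.JobExecutionResult;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class PrintJobId {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.fromElements(1, 2, 3).print();  // placeholder pipeline

        JobExecutionResult result = env.execute("print-job-id-example");
        System.out.println("Job ID: " + result.getJobID());
    }
}
```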

Best,

Arvid

On Mon, Nov 25, 2019 at 3:50 AM Ana  wrote:

> Hi Lei,
>
> To add, you may use Hadoop Resource Manager REST APIs
> https://hadoop.apache.org/docs/current/hadoop-yarn/hadoop-yarn-site/ResourceManagerRest.html.
> I'm also running Flink application on YARN and use this API for that
> purpose. If you find other way or a much better solution, please let me
> know!
>
> Regards,
> Ana
>
> On Fri, Nov 22, 2019 at 10:58 AM vino yang  wrote:
>
>> Hi Lei,
>>
>> It would be better to use Flink's RESTful API to fetch the information of
>> the running jobs[1].
>>
>> [1]:
>> https://ci.apache.org/projects/flink/flink-docs-stable/monitoring/rest_api.html#jobs-1
>>
>> Best,
>> Vino
>>
>>> Lei Nie wrote on Fri, Nov 22, 2019 at 4:14 AM:
>>
>>> I looked at the code, and
>>> StreamExecutionEnvironment#getStreamGraph#getJobGraph#getJobID is
>>> generating a random ID unrelated to the actual ID used.
>>>
>>> Is there any way to fetch the real ID at runtime?
>>> Use case: fetch most recent checkpoint from stable storage for
>>> automated restarts. Most recent checkpoint has form
>>> ".../checkpoints/flink_app_id/chk-123"
>>>
>>> On Thu, Nov 21, 2019 at 11:28 AM Lei Nie  wrote:
>>> >
>>> > This does not get the correct id:
>>> > StreamExecutionEnvironment#getStreamGraph#getJobGraph#getJobID =
>>> > eea5abc21dd8743a4090f4a3a660f9e8
>>> > Actual job ID (from webUI): 1357d21be640b6a3b8a86a063f4bba8a
>>> >
>>> >
>>> >
>>> > On Thu, Nov 7, 2019 at 6:56 PM vino yang 
>>> wrote:
>>> > >
>>> > > Hi Lei Nie,
>>> > >
>>> > > You can use
>>> `StreamExecutionEnvironment#getStreamGraph#getJobGraph#getJobID` to get the
>>> job id.
>>> > >
>>> > > Best,
>>> > > Vino
>>> > >
>>> > >> Lei Nie wrote on Fri, Nov 8, 2019 at 8:38 AM:
>>> > >>
>>> > >> Hello,
>>> > >> I am currently executing streaming jobs via
>>> StreamExecutionEnvironment. Is it possible to retrieve the Flink job
>>> ID/YARN ID within the context of a job? I'd like to be able to
>>> automatically register the job such that monitoring jobs can run (REST api
>>> requires for example job id).
>>> > >>
>>> > >> Thanks
>>>
>>


Re: JobGraphs not cleaned up in HA mode

2019-11-28 Thread Vijay Bhaskar
One more thing:
You configured:
high-availability.cluster-id: /cluster-test
It should be:
high-availability.cluster-id: cluster-test
I don't think this is a major issue, but in case it helps, you can check it.
Can you check one more thing:
Is checkpointing happening or not?
Were you able to see the chk-* folders under the checkpoint directory?

Regards
Bhaskar

On Thu, Nov 28, 2019 at 5:00 PM 曾祥才  wrote:

> Hi,
> Is there any difference (for me, using NAS is more convenient to test
> currently)?
> From the docs it seems HDFS, S3, NFS, etc. will all be fine.
>
>
>
> -- Original message --
> *From:* "vino yang";
> *Date:* Thursday, Nov 28, 2019, 7:17 PM
> *To:* "曾祥才";
> *Cc:* "Vijay Bhaskar"; "User-Flink"<
> user@flink.apache.org>;
> *Subject:* Re: JobGraphs not cleaned up in HA mode
>
> Hi,
>
> Why do you not use HDFS directly?
>
> Best,
> Vino
>
> 曾祥才 wrote on Thu, Nov 28, 2019 at 6:48 PM:
>
>>
>> anyone have the same problem? pls help, thks
>>
>>
>>
>> -- Original message --
>> *From:* "曾祥才";
>> *Date:* Thursday, Nov 28, 2019, 2:46 PM
>> *To:* "Vijay Bhaskar";
>> *Cc:* "User-Flink";
>> *Subject:* Re: JobGraphs not cleaned up in HA mode
>>
>> the config (/flink is the NAS directory):
>>
>> jobmanager.rpc.address: flink-jobmanager
>> taskmanager.numberOfTaskSlots: 16
>> web.upload.dir: /flink/webUpload
>> blob.server.port: 6124
>> jobmanager.rpc.port: 6123
>> taskmanager.rpc.port: 6122
>> jobmanager.heap.size: 1024m
>> taskmanager.heap.size: 1024m
>> high-availability: zookeeper
>> high-availability.cluster-id: /cluster-test
>> high-availability.storageDir: /flink/ha
>> high-availability.zookeeper.quorum: :2181
>> high-availability.jobmanager.port: 6123
>> high-availability.zookeeper.path.root: /flink/risk-insight
>> high-availability.zookeeper.path.checkpoints: /flink/zk-checkpoints
>> state.backend: filesystem
>> state.checkpoints.dir: file:///flink/checkpoints
>> state.savepoints.dir: file:///flink/savepoints
>> state.checkpoints.num-retained: 2
>> jobmanager.execution.failover-strategy: region
>> jobmanager.archive.fs.dir: file:///flink/archive/history
>>
>>
>>
>> -- Original message --
>> *From:* "Vijay Bhaskar";
>> *Date:* Thursday, Nov 28, 2019, 3:12 PM
>> *To:* "曾祥才";
>> *Cc:* "User-Flink";
>> *Subject:* Re: JobGraphs not cleaned up in HA mode
>>
>> Can you share the flink configuration once?
>>
>> Regards
>> Bhaskar
>>
>> On Thu, Nov 28, 2019 at 12:09 PM 曾祥才  wrote:
>>
>>> If I clean the ZooKeeper data, it runs fine. But the next time the
>>> jobmanager fails and is redeployed, the error occurs again.
>>>
>>>
>>>
>>>
>>> -- Original message --
>>> *From:* "Vijay Bhaskar";
>>> *Date:* Thursday, Nov 28, 2019, 3:05 PM
>>> *To:* "曾祥才";
>>> *Subject:* Re: JobGraphs not cleaned up in HA mode
>>>
>>> Again it could not find the state store file: "Caused by:
>>> java.io.FileNotFoundException: /flink/ha/submittedJobGraph0c6bcff01199 "
>>>  Check why its unable to find.
>>> Better thing is: Clean up zookeeper state and check your configurations,
>>> correct them and restart cluster.
>>> Otherwise it always picks up corrupted state from zookeeper and it will
>>> never restart
>>>
>>> Regards
>>> Bhaskar
>>>
>>> On Thu, Nov 28, 2019 at 11:51 AM 曾祥才  wrote:
>>>
 I've made a mistake (the log before is from another cluster). The full
 exception log is:


 INFO  org.apache.flink.runtime.dispatcher.StandaloneDispatcher  -
 Recovering all persisted jobs.
 2019-11-28 02:33:12,726 INFO
 org.apache.flink.runtime.resourcemanager.slotmanager.SlotManagerImpl  -
 Starting the SlotManager.
 2019-11-28 02:33:12,743 INFO
 org.apache.flink.runtime.jobmanager.ZooKeeperSubmittedJobGraphStore  -
 Released locks of job graph 639170a9d710bacfd113ca66b2aacefa from
 ZooKeeper.
 2019-11-28 02:33:12,744 ERROR
 org.apache.flink.runtime.entrypoint.ClusterEntrypoint - Fatal error
 occurred in the cluster entrypoint.
 org.apache.flink.runtime.dispatcher.DispatcherException: Failed to take
 leadership with session id 607e1fe0-f1b4-4af0-af16-4137d779defb.
 at
 org.apache.flink.runtime.dispatcher.Dispatcher.lambda$null$30(Dispatcher.java:915)

 at
 java.util.concurrent.CompletableFuture.uniWhenComplete(CompletableFuture.java:774)

 at
 java.util.concurrent.CompletableFuture$UniWhenComplete.tryFire(CompletableFuture.java:750)

 at
 java.util.concurrent.CompletableFuture.postComplete(CompletableFuture.java:488)

 at
 java.util.concurrent.CompletableFuture.postFire(CompletableFuture.java:575)
 at
 java.util.concurrent.CompletableFuture$UniApply.tryFire(CompletableFuture.java:594)

 at
 java.util.concurrent.CompletableFuture$Completion.run(CompletableFuture.java:456)

 at akka.dispatch.TaskInvocation.run(AbstractDispatcher.scala:40)
 at
 akka.dispatch.ForkJoinExecutorConfigurator$AkkaForkJoinTask.exec(ForkJoinExecutorConfigurator.scala:44)

 at 

Re: JobGraphs not cleaned up in HA mode

2019-11-28 Thread 曾祥才
Hi,
Is there any difference (for me, using NAS is more convenient to test
currently)?
From the docs it seems HDFS, S3, NFS, etc. will all be fine.





------------------ Original message ------------------
From: "vino yang"
http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/

Re: JobGraphs not cleaned up in HA mode

2019-11-28 Thread vino yang
Hi,

Why do you not use HDFS directly?

Best,
Vino

曾祥才 wrote on Thu, Nov 28, 2019 at 6:48 PM:

>
> anyone have the same problem? pls help, thks
>
>
>
> -- Original message --
> *From:* "曾祥才";
> *Date:* Thursday, Nov 28, 2019, 2:46 PM
> *To:* "Vijay Bhaskar";
> *Cc:* "User-Flink";
> *Subject:* Re: JobGraphs not cleaned up in HA mode
>
> the config (/flink is the NAS directory):
>
> jobmanager.rpc.address: flink-jobmanager
> taskmanager.numberOfTaskSlots: 16
> web.upload.dir: /flink/webUpload
> blob.server.port: 6124
> jobmanager.rpc.port: 6123
> taskmanager.rpc.port: 6122
> jobmanager.heap.size: 1024m
> taskmanager.heap.size: 1024m
> high-availability: zookeeper
> high-availability.cluster-id: /cluster-test
> high-availability.storageDir: /flink/ha
> high-availability.zookeeper.quorum: :2181
> high-availability.jobmanager.port: 6123
> high-availability.zookeeper.path.root: /flink/risk-insight
> high-availability.zookeeper.path.checkpoints: /flink/zk-checkpoints
> state.backend: filesystem
> state.checkpoints.dir: file:///flink/checkpoints
> state.savepoints.dir: file:///flink/savepoints
> state.checkpoints.num-retained: 2
> jobmanager.execution.failover-strategy: region
> jobmanager.archive.fs.dir: file:///flink/archive/history
>
>
>
> -- Original message --
> *From:* "Vijay Bhaskar";
> *Date:* Thursday, Nov 28, 2019, 3:12 PM
> *To:* "曾祥才";
> *Cc:* "User-Flink";
> *Subject:* Re: JobGraphs not cleaned up in HA mode
>
> Can you share the flink configuration once?
>
> Regards
> Bhaskar
>
> On Thu, Nov 28, 2019 at 12:09 PM 曾祥才  wrote:
>
>> If I clean the ZooKeeper data, it runs fine. But the next time the
>> jobmanager fails and is redeployed, the error occurs again.
>>
>>
>>
>>
>> -- Original message --
>> *From:* "Vijay Bhaskar";
>> *Date:* Thursday, Nov 28, 2019, 3:05 PM
>> *To:* "曾祥才";
>> *Subject:* Re: JobGraphs not cleaned up in HA mode
>>
>> Again it could not find the state store file: "Caused by:
>> java.io.FileNotFoundException: /flink/ha/submittedJobGraph0c6bcff01199 "
>>  Check why its unable to find.
>> Better thing is: Clean up zookeeper state and check your configurations,
>> correct them and restart cluster.
>> Otherwise it always picks up corrupted state from zookeeper and it will
>> never restart
>>
>> Regards
>> Bhaskar
>>
>> On Thu, Nov 28, 2019 at 11:51 AM 曾祥才  wrote:
>>
>>> I've made a mistake (the log before is from another cluster). The full
>>> exception log is:
>>>
>>>
>>> INFO  org.apache.flink.runtime.dispatcher.StandaloneDispatcher  -
>>> Recovering all persisted jobs.
>>> 2019-11-28 02:33:12,726 INFO
>>> org.apache.flink.runtime.resourcemanager.slotmanager.SlotManagerImpl  -
>>> Starting the SlotManager.
>>> 2019-11-28 02:33:12,743 INFO
>>> org.apache.flink.runtime.jobmanager.ZooKeeperSubmittedJobGraphStore  -
>>> Released locks of job graph 639170a9d710bacfd113ca66b2aacefa from
>>> ZooKeeper.
>>> 2019-11-28 02:33:12,744 ERROR
>>> org.apache.flink.runtime.entrypoint.ClusterEntrypoint - Fatal error
>>> occurred in the cluster entrypoint.
>>> org.apache.flink.runtime.dispatcher.DispatcherException: Failed to take
>>> leadership with session id 607e1fe0-f1b4-4af0-af16-4137d779defb.
>>> at
>>> org.apache.flink.runtime.dispatcher.Dispatcher.lambda$null$30(Dispatcher.java:915)
>>>
>>> at
>>> java.util.concurrent.CompletableFuture.uniWhenComplete(CompletableFuture.java:774)
>>>
>>> at
>>> java.util.concurrent.CompletableFuture$UniWhenComplete.tryFire(CompletableFuture.java:750)
>>>
>>> at
>>> java.util.concurrent.CompletableFuture.postComplete(CompletableFuture.java:488)
>>>
>>> at
>>> java.util.concurrent.CompletableFuture.postFire(CompletableFuture.java:575)
>>> at
>>> java.util.concurrent.CompletableFuture$UniApply.tryFire(CompletableFuture.java:594)
>>>
>>> at
>>> java.util.concurrent.CompletableFuture$Completion.run(CompletableFuture.java:456)
>>>
>>> at akka.dispatch.TaskInvocation.run(AbstractDispatcher.scala:40)
>>> at
>>> akka.dispatch.ForkJoinExecutorConfigurator$AkkaForkJoinTask.exec(ForkJoinExecutorConfigurator.scala:44)
>>>
>>> at akka.dispatch.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
>>> at
>>> akka.dispatch.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
>>>
>>> at akka.dispatch.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
>>> at
>>> akka.dispatch.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)
>>>
>>> Caused by: java.lang.RuntimeException:
>>> org.apache.flink.util.FlinkException: Could not retrieve submitted JobGraph
>>> from state handle under /639170a9d710bacfd113ca66b2aacefa. This indicates
>>> that the retrieved state handle is broken. Try cleaning the state handle
>>> store.
>>> at org.apache.flink.util.ExceptionUtils.rethrow(ExceptionUtils.java:199)
>>> at
>>> org.apache.flink.util.function.FunctionUtils.lambda$uncheckedFunction$2(FunctionUtils.java:75)
>>>
>>> at
>>> java.util.concurrent.CompletableFuture.uniApply(CompletableFuture.java:616)
>>> at
>>> 

Re: Flink 'Job Cluster' mode Ui Access

2019-11-28 Thread vino yang
Hi Jatin,

Which version do you use?

Best,
Vino

Jatin Banger wrote on Thu, Nov 28, 2019 at 5:03 PM:

> Hi,
>
> I checked the log file there is no error.
> And I checked the pods internal ports by using rest api.
>
> # curl : 4081
> {"errors":["Not found."]}
> 4081 is the Ui port
>
> # curl :4081/config
> {"refresh-interval":3000,"timezone-name":"Coordinated Universal
> Time","timezone-offset":0,"flink-version":"","flink-revision":"ceba8af
> @ 11.02.2019 @ 22:17:09 CST"}
>
> # curl :4081/jobs
> {"jobs":[{"id":"___job_Id_","status":"RUNNING"}]}
>
> Which shows the state of the job as running.
>
> What else can we do ?
>
> Best regards,
> Jatin
>
> On Thu, Nov 28, 2019 at 1:28 PM vino yang  wrote:
>
>> Hi Jatin,
>>
>> Flink web UI does not depend on any deployment mode.
>>
>> You should check if there are error logs in the log file and the job
>> status is running state.
>>
>> Best,
>> Vino
>>
>> Jatin Banger wrote on Thu, Nov 28, 2019 at 3:43 PM:
>>
>>> Hi,
>>>
>>> It seems there is Web Ui for Flink Session cluster, But for Flink Job
>>> Cluster it is Showing
>>>
>>> {"errors":["Not found."]}
>>>
>>> Is it the expected behavior for Flink Job Cluster Mode ?
>>>
>>> Best Regards,
>>> Jatin
>>>
>>


Re: JobGraphs not cleaned up in HA mode

2019-11-28 Thread 曾祥才
Does anyone have the same problem? Please help, thanks.





------------------ Original message ------------------
From: "曾祥才"
http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/

Flink 1.9.1 CEP backpressure and memory problems

2019-11-28 Thread 宋倚天
Hi all, I ran into the following problems while using Flink CEP (an "A followedBy B" case).

Case 1: the data source I constructed contains only event A, sent at full speed.

The job blocks after a few seconds of execution (no more data exchange between tasks) and the backpressure at the source is 1. After a while the TM stops responding to heartbeats and the job terminates abnormally. At that point the TaskManager process is still alive, but inspecting memory usage with jstat shows that the memory is exhausted and Full GCs run continuously.

Case 2: the data source sends event A and event B alternately, at full speed.
The job runs normally, but the backpressure at the source is 1 and the overall processing performance is very low, at less than about 5000 eps.

My questions:
1. Does Flink CEP have potential performance problems?
2. Why does the OOM happen? Is it caused by incorrect usage?
3. The total data volume should be less than 3 GB. Is the OOM caused by accumulated CEP operator
state? If so, is there a way to solve it (use the RocksDB state backend? see the sketch after the code below)?

FYI:
   The data type is JSONObject (fastjson), each record is about 20 bytes, and 15,000,000 records are sent in total.
   The event time is the real system time; since the sending thread has no delay configured, in theory all records have very close timestamps (to simulate a high-eps scenario).
   The Flink version is 1.9.1.
   The cluster consists of 1 JobManager and 2 TaskManagers, and a single TaskManager heap is about 16 GB.
   The parallelism is 5.
   When the same case is implemented with the Flink SQL MATCH_RECOGNIZE keyword, there is no OOM and the backpressure is relatively normal (0.4-0.5).

The code is as follows:

```java
final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);

/**
 * {"event_name": "事件A", "src_address": "1.1.1.1", "occur_time": 1574934481585, "payload": "..."}
 * {"event_name": "事件B", "src_address": "2.2.2.2", "occur_time": 1574934481586, "payload": "..."}
 */
DataStream<JSONObject> stream = env.addSource(new IncreasingFastJsonSource(15_000_000, 10))
        .assignTimestampsAndWatermarks(
                new BoundedOutOfOrdernessTimestampExtractor<JSONObject>(Time.milliseconds(3000)) {
                    @Override
                    public long extractTimestamp(JSONObject element) {
                        return element.getLongValue("occur_time");
                    }
                });

stream = stream.map(element -> element.fluentRemove("payload"))
        .keyBy(element -> element.getIntValue("key"));

AfterMatchSkipStrategy skipStrategy = AfterMatchSkipStrategy.skipPastLastEvent();
Pattern<JSONObject, ?> pattern = Pattern.<JSONObject>begin("A", skipStrategy)
        .where(new SimpleCondition<JSONObject>() {
            @Override
            public boolean filter(JSONObject value) throws Exception {
                return StringUtils.equals(value.getString("event_name"), "事件A");
            }
        })
        .followedBy("B")
        .where(new SimpleCondition<JSONObject>() {
            @Override
            public boolean filter(JSONObject value) throws Exception {
                return StringUtils.equals(value.getString("event_name"), "事件B");
            }
        })
        .within(Time.seconds(10));

CEP.pattern(stream, pattern)
        .select(FollowedBy::select)
        .print();

env.execute("Benchmark");
```
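Regarding question 3, here is a minimal sketch of switching to the RocksDB state backend (it assumes the flink-statebackend-rocksdb dependency is on the classpath; the checkpoint URI and interval are placeholders):

```java
import org.apache.flink.contrib.streaming.state.RocksDBStateBackend;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class RocksDbBackendSketch {
    // returns an environment configured with the RocksDB backend; the CEP pipeline
    // from the mail above would then be built on top of it
    public static StreamExecutionEnvironment createEnvironment() throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        // keyed/operator state lives in RocksDB on local disk instead of the JVM heap,
        // which bounds heap growth from accumulated CEP state; incremental checkpoints enabled
        env.setStateBackend(new RocksDBStateBackend("file:///tmp/flink-checkpoints", true));
        env.enableCheckpointing(60_000);  // checkpoint every 60 seconds (placeholder interval)
        return env;
    }
}
```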


REST API: /jobs/:jobid/stop returns 405

2019-11-28 Thread Moc
Hi everyone,

I saw the /jobs/:jobid/stop endpoint in the official documentation, which says this endpoint can generate a savepoint. So I wanted to use it to stop a Flink job running on YARN, but it returns an error.
Is this endpoint not usable yet? Is there another endpoint that can be used instead?

The endpoint URLs I tried:
http://host:8088/proxy/application_1572228642684_1907/v1/jobs/a1f207a50cdebd531824d9a4402162ca/stop
and
http://host:8088/proxy/application_1572228642684_1907/jobs/a1f207a50cdebd531824d9a4402162ca/stop
Using POST, both calls fail with: 405 HTTP method POST is not supported by this URL

Flink version: 1.9.1


Re: Flink 'Job Cluster' mode Ui Access

2019-11-28 Thread Jatin Banger
Hi,

I checked the log file there is no error.
And I checked the pods internal ports by using rest api.

# curl : 4081
{"errors":["Not found."]}
4081 is the Ui port

# curl :4081/config
{"refresh-interval":3000,"timezone-name":"Coordinated Universal
Time","timezone-offset":0,"flink-version":"","flink-revision":"ceba8af
@ 11.02.2019 @ 22:17:09 CST"}

# curl :4081/jobs
{"jobs":[{"id":"___job_Id_","status":"RUNNING"}]}

Which shows the state of the job as running.

What else can we do ?

Best regards,
Jatin

On Thu, Nov 28, 2019 at 1:28 PM vino yang  wrote:

> Hi Jatin,
>
> Flink web UI does not depend on any deployment mode.
>
> You should check if there are error logs in the log file and the job
> status is running state.
>
> Best,
> Vino
>
> Jatin Banger wrote on Thu, Nov 28, 2019 at 3:43 PM:
>
>> Hi,
>>
>> It seems there is Web Ui for Flink Session cluster, But for Flink Job
>> Cluster it is Showing
>>
>> {"errors":["Not found."]}
>>
>> Is it the expected behavior for Flink Job Cluster Mode ?
>>
>> Best Regards,
>> Jatin
>>
>


Re: What happens to the channels when there is backpressure?

2019-11-28 Thread Felipe Gutierrez
Hi Yingjie,

I read this post and the next one as well (
https://flink.apache.org/2019/07/23/flink-network-stack-2.html).

I mean the bandwidth of the channels between two physical operators, when
they are on different hosts, i.e. when the channels are network channels.

Thanks
*--*
*-- Felipe Gutierrez*

*-- skype: felipe.o.gutierrez*
*--* *https://felipeogutierrez.blogspot.com
*


On Thu, Nov 28, 2019 at 5:07 AM yingjie cao  wrote:

> Hi Felipe,
>
> That depends on what do you mean by 'bandwidth'. If you mean the
> capability of the network stack, the answer would be no.
>
> Here is a post about Flink network stack which may help:
> https://flink.apache.org/2019/06/05/flink-network-stack.html.
>
> Thanks,
> Yingjie
>
> Felipe Gutierrez wrote on Wed, Nov 27, 2019 at 11:13 PM:
>
>> Hi community,
>>
>> I have a question about backpressure. Suppose a scenario that I have a
>> map and a reducer, and the reducer is back pressuring the map operator. I
>> know that the reducer is processing tuples at a lower rate than it is
>> receiving.
>>
>> However, can I say that at least one channel between the map and the
>> reducer is totally using its available bandwidth?
>>
>> My guess is it is not, at least in the beginning. But as the time goes on
>> the tuples will be queued in the network buffer of the reducer and then the
>> bandwidth will be 100% of usage. Am I right?
>>
>> Thanks,
>> Felipe
>>
>>
>>