Re: Where are the TaskManagers IPs and Ports stored?

2018-10-15 Thread vino yang
Hi Chris,

Please refer to the official documentation [1][2].

The TaskManager's RPC port is dynamic by default, and Flink does not persist it
to any file. You can view the TaskManagers' addresses and ports from the "Task
Managers" menu on the left side of the Flink web UI.
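
If you need the same information programmatically rather than via the UI, the
monitoring REST API also exposes it. A rough sketch (assuming the JobManager's
REST/web port is the default 8081 on localhost; adjust host and port to your setup):

import java.io.BufferedReader;
import java.io.InputStreamReader;
import java.net.HttpURLConnection;
import java.net.URL;

public class ListTaskManagers {
    public static void main(String[] args) throws Exception {
        // GET /taskmanagers returns a JSON list of all registered TaskManagers,
        // including the host they run on and their data port
        URL url = new URL("http://localhost:8081/taskmanagers");
        HttpURLConnection conn = (HttpURLConnection) url.openConnection();
        try (BufferedReader in = new BufferedReader(new InputStreamReader(conn.getInputStream()))) {
            String line;
            while ((line = in.readLine()) != null) {
                System.out.println(line);
            }
        }
    }
}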

Thanks, vino.

[1]:
https://ci.apache.org/projects/flink/flink-docs-release-1.6/ops/config.html#taskmanager-rpc-port
[2]:
https://ci.apache.org/projects/flink/flink-docs-release-1.6/ops/config.html#taskmanager-host

Chris Miller wrote on Mon, Oct 15, 2018 at 8:18 PM:

>
>
> Hi,
>
> I'm looking for the property file where IP and Port of TaskManagers are
> stored in Flink.
> Does anyone know where it is located and when it's updated?
>
> (And for the case that there should not be such a file, where does the
> JobManager and TaskManagers take this information from?)
>
>
>
> Thanks.
>
>
>
> Chris
>
>


Re: Can't start cluster

2018-10-15 Thread vino yang
Hi Mar_zieh,

For questions about Python, Chesnay would be the best person to ask, but I will try to answer.

pyflink.bat is a script for running Flink programs packaged in flink-python*.jar;
you can look at its implementation for details. Flink supports Jython.
Also, if Python is your main programming language, you can try using the
Apache Beam Python SDK + Flink Runner, which seems to be a better choice.

You can run Flink on Windows (the JVM is cross-platform), but it is not
recommended; it is recommended to run on Unix/Linux.

Thanks, vino.

Mar_zieh wrote on Mon, Oct 15, 2018 at 9:07 PM:

> Hello
>
> I have two question, would you please answer to me?
> Please tell me "what is "pyflink.bat" for?" Does Flink support python or
> not?
> Moreover, I want to have multiprocessing with Flink on windows. Is it
> possible?
>
> Thank you in advance
>
>
>
>
>
> --
> Sent from:
> http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/
>


Re: Flink1.6 Confirm that flink supports the plan function?

2018-10-15 Thread wangziyu
Hi,
    1. I think I have some questions. Even when I have not submitted my jobs, I need
to read a job's information in order to control jobs. That means that if I want to cancel a job which
is running, I can control it. And even if my jobs are not running, I would also like to
control them.
    2. When I run a job in the terminal, I pass program arguments. I don't want
to change my program arguments. I want to get my arguments via Java. If I
have several jobs, I want to get all of the jobs' arguments.
Thanks for your letter, I am looking forward to your next letter!



--
Sent from: http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/


Re: Flink1.6 Confirm that flink supports the plan function?

2018-10-15 Thread wangziyu
Hi,
Thanks for your reply! I will read [1] and [2] carefully. If I have further
questions, I hope to hear from you again.



--
Sent from: http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/


Re: The best way to get processing time of each operator?

2018-10-15 Thread Hequn Cheng
Hi Folani,

One option is to achieve this through metrics [1].
Each operator can report its processing time as a metric. These metrics
can be gathered and queried later. For example, you can get a metric for a
specific TaskManager or get the max/min/avg value across all TaskManagers.
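
As a rough illustration of that idea (only a sketch; the metric name and the
map() logic are made up, and measuring time inside a RichMapFunction is just one
possible place to do it):

import org.apache.flink.api.common.functions.RichMapFunction;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.metrics.Gauge;

public class TimedMapper extends RichMapFunction<String, String> {

    // accumulated time spent in map() by this parallel instance
    private transient long totalNanos;

    @Override
    public void open(Configuration parameters) {
        // expose the accumulated time as a gauge named "mapProcessingNanos"
        getRuntimeContext()
                .getMetricGroup()
                .gauge("mapProcessingNanos", (Gauge<Long>) () -> totalNanos);
    }

    @Override
    public String map(String value) {
        long start = System.nanoTime();
        String result = value.toUpperCase();  // the actual per-record work
        totalNanos += System.nanoTime() - start;
        return result;
    }
}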

Best, Hequn

[1]
https://ci.apache.org/projects/flink/flink-docs-master/monitoring/metrics.html


On Mon, Oct 15, 2018 at 10:26 PM Folani  wrote:

> I'm going to work on effect of parallelism for different operators on
> heterogeneous machines.
> I need to know the processing time of each operator instance as well as
> overall processing time of all instances of each specific operator.
> I think there are different ways for this purpose.
> However, what is the best way to get these times as precisely as
> possible?
>
>
>
> --
> Sent from:
> http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/
>


Re: [DISCUSS] Integrate Flink SQL well with Hive ecosystem

2018-10-15 Thread Zhang, Xuefu
Hi Shuyi,

Thank you for your input. Yes, I agree with a phased approach and would like to move 
forward fast. :) We did some work internally on DDL utilizing the babel parser in 
Calcite. While babel makes Calcite's grammar extensible, at first impression it 
still seems too cumbersome for a project when too many extensions are made. 
It's even challenging to find where the extension is needed! It would 
certainly be better if Calcite could magically support Hive QL by just turning on a 
flag, such as the one for MYSQL_5. I can also see that this could mean a lot of 
work on Calcite. Nevertheless, I will bring up the discussion over there and 
see what their community thinks.

Would you mind sharing more info about the proposal on DDL that you mentioned? We 
can certainly collaborate on this.

Thanks,
Xuefu


--
Sender:Shuyi Chen 
Sent at:2018 Oct 14 (Sun) 08:30
Recipient:Xuefu 
Cc:yanghua1127 ; Fabian Hueske ; dev 
; user 
Subject:Re: [DISCUSS] Integrate Flink SQL well with Hive ecosystem

Welcome to the community and thanks for the great proposal, Xuefu! I think the 
proposal can be divided into 2 stages: making Flink support Hive features, 
and making Hive work with Flink. I agree with Timo on starting with a 
smaller scope, so we can make progress faster. As for [6], a proposal for DDL 
is already in progress, and will come after the unified SQL connector API is 
done. For supporting Hive syntax, we might need to work with the Calcite 
community, and a recent effort called babel 
(https://issues.apache.org/jira/browse/CALCITE-2280) in Calcite might help here.

Thanks
Shuyi
On Wed, Oct 10, 2018 at 8:02 PM Zhang, Xuefu  wrote:

Hi Fabian/Vno,

Thank you very much for your encouragement and inquiry. Sorry that I didn't see 
Fabian's email until I read Vino's response just now. (Somehow Fabian's went to 
the spam folder.)

My proposal contains long-term and short-term goals. Nevertheless, the effort 
will focus on the following areas, including Fabian's list:

1. Hive metastore connectivity - This covers both read/write access, which 
means Flink can make full use of Hive's metastore as its catalog (at least for 
the batch but can extend for streaming as well).
2. Metadata compatibility - Objects (databases, tables, partitions, etc) 
created by Hive can be understood by Flink and the reverse direction is true 
also.
3. Data compatibility - Similar to #2, data produced by Hive can be consumed by 
Flink and vise versa.
4. Support Hive UDFs - For all of Hive's native UDFs, Flink either provides its 
own implementation or makes Hive's implementation work in Flink. Further, for 
user-created UDFs in Hive, Flink SQL should provide a mechanism allowing users 
to import them into Flink without any code change required.
5. Data types -  Flink SQL should support all data types that are available in 
Hive.
6. SQL Language - Flink SQL should support SQL standard (such as SQL2003) with 
extension to support Hive's syntax and language features, around DDL, DML, and 
SELECT queries.
7.  SQL CLI - this is currently developing in Flink but more effort is needed.
8. Server - provide a server that's compatible with Hive's HiveServer2 in 
thrift APIs, such that HiveServer2 users can reuse their existing clients (such 
as beeline) but connect to Flink's thrift server instead.
9. JDBC/ODBC drivers - Flink may provide its own JDBC/ODBC drivers for other 
applications to connect to its thrift server
10. Support other user's customizations in Hive, such as Hive Serdes, storage 
handlers, etc.
11. Better task failure tolerance and task scheduling at Flink runtime.

As you can see, achieving all those requires significant effort and across all 
layers in Flink. However, a short-term goal could  include only core areas 
(such as 1, 2, 4, 5, 6, 7) or start  at a smaller scope (such as #3, #6).

Please share your further thoughts. If we generally agree that this is the 
right direction, I could come up with a formal proposal quickly and then we can 
follow up with broader discussions.

Thanks,
Xuefu



--
Sender:vino yang 
Sent at:2018 Oct 11 (Thu) 09:45
Recipient:Fabian Hueske 
Cc:dev ; Xuefu ; user 

Subject:Re: [DISCUSS] Integrate Flink SQL well with Hive ecosystem

Hi Xuefu,

I appreciate this proposal, and like Fabian, I think it would be better if you could give 
more details of the plan.

Thanks, vino.
Fabian Hueske wrote on Wed, Oct 10, 2018 at 5:27 PM:
Hi Xuefu,

Welcome to the Flink community and thanks for starting this discussion! Better 
Hive integration would be really great!
Can you go into details of what you are proposing? I can think of a couple ways 
to improve Flink in that regard:

* Support for Hive UDFs
* Support for Hive metadata catalog
* Support for HiveQL syntax
* ???

Best, Fabian

On Tue, Oct 9, 2018 at 19:22, Zhang, Xuefu wrote:
Hi all,

 Along with the community's effort, inside Alibaba we have 

Re: [DISCUSS] Integrate Flink SQL well with Hive ecosystem

2018-10-15 Thread Zhang, Xuefu
Hi Bowen,

Thank you for your feedback and interest in the project. Your contribution is 
certainly welcome. Per your suggestion, I have created an Uber JIRA 
(https://issues.apache.org/jira/browse/FLINK-10556) to track our overall effort 
on this. For each subtask, we'd like to see a short description on the status 
quo and what is planned to add or change. Design doc should be provided when 
it's deemed necessary.

I'm looking forward to seeing your contributions!

Thanks,
Xuefu



Thanks,
Xuefu 


--
Sender:Bowen 
Sent at:2018 Oct 13 (Sat) 21:55
Recipient:Xuefu ; Fabian Hueske 
Cc:dev ; user 
Subject:Re: [DISCUSS] Integrate Flink SQL well with Hive ecosystem


Thank you Xuefu, for bringing up this awesome, detailed proposal! It will 
resolve lots of existing pain for users like me.

In general, I totally agree that improving FlinkSQL's completeness would be a 
much better start point than building 'Hive on Flink', as the Hive community is 
concerned about Flink's SQL incompleteness and lack of proven batch performance 
shown in https://issues.apache.org/jira/browse/HIVE-10712. Improving FlinkSQL 
seems a more natural direction to start with in order to achieve the 
integration.

Xuefu and Timo have laid out a quite clear path of what to tackle next. Given that 
there're already some efforts going on, for item 1,2,5,3,4,6 in Xuefu's list, 
shall we:

identify gaps between a) Xuefu's proposal/discussion result in this thread and 
b) all the ongoing work/discussions?
then, create some new top-level JIRA tickets to keep track of and start more 
detailed discussions with?
It's gonna be a great and influential project, and I'd love to participate 
in it to move FlinkSQL's adoption and ecosystem even further.

Thanks,
Bowen


On Oct 12, 2018, at 3:37 PM, Jörn Franke wrote:


Thank you, very nice. I fully agree with that. 

On 11.10.2018 at 19:31, Zhang, Xuefu wrote:

Hi Jörn,

Thanks for your feedback. Yes, I think Hive on Flink makes sense and in fact it 
is one of the two approaches that I named in the beginning of the thread. As 
also pointed out there, this isn't mutually exclusive with the work we proposed 
inside Flink, and they target different user groups and use cases. Further, 
what we proposed to do in Flink should be a good showcase that demonstrates 
Flink's capabilities in batch processing and convinces the Hive community of the 
worth of a new engine. As you might know, the idea encountered some doubt and 
resistance. Nevertheless, we do have a solid plan for Hive on Flink, which we 
will execute once Flink SQL is in a good shape.

I also agree with you that Flink SQL shouldn't be closely coupled with Hive. 
While we mentioned Hive in many of the proposed items, most of them are coupled 
only in concepts and functionality rather than code or libraries. We are taking 
the advantage of the connector framework in Flink. The only thing that might be 
exceptional is to support Hive built-in UDFs, which we may not make it work out 
of the box to avoid the coupling. We could, for example, require users bring in 
Hive library and register themselves. This is subject to further discussion.

#11 is about Flink runtime enhancements that are meant to make task failures more 
tolerable (so that the job doesn't have to start from the beginning in case of 
task failures) and to make task scheduling more resource-efficient. Flink's 
current design in those two aspects leans more to stream processing, which may 
not be good enough for batch processing. We will provide more detailed design 
when we get to them.

Please let me know if you have further thoughts or feedback.

Thanks,
Xuefu


--
Sender:Jörn Franke 
Sent at:2018 Oct 11 (Thu) 13:54
Recipient:Xuefu 
Cc:vino yang ; Fabian Hueske ; dev 
; user 
Subject:Re: [DISCUSS] Integrate Flink SQL well with Hive ecosystem

Would it maybe make sense to provide Flink as an engine on Hive 
(„flink-on-Hive“)? E.g. to address 4, 5, 6, 8, 9, 10. This could be more loosely 
coupled than integrating Hive in all possible Flink core modules and thus 
introducing a very tight dependency on Hive in the core.
1, 2, 3 could be achieved via a connector based on the Flink Table API.
Just as a proposal: start this endeavour as independent projects (hive 
engine, connector) to avoid too tight a coupling with Flink. Maybe in a more 
distant future, if the Hive integration is heavily demanded, one could then 
integrate it more tightly if needed. 

What is meant by 11?
On 11.10.2018 at 05:01, Zhang, Xuefu wrote:

Hi Fabian/Vno,

Thank you very much for your encouragement and inquiry. Sorry that I didn't see 
Fabian's email until I read Vino's response just now. (Somehow Fabian's went to 
the spam folder.)

My proposal contains long-term and short-term goals. Nevertheless, the effort 
will focus on the following areas, including Fabian's list:

1. Hive metastore connectivity - This covers both 

Re: Job Records In/Out metrics clarification

2018-10-15 Thread Rafi Aroch
Awesome, thanks!

On Mon, Oct 15, 2018, 17:36 Chesnay Schepler  wrote:

> There is a known issue in 1.5.0 with the numRecordsIn/Out metrics for
> chained tasks: https://issues.apache.org/jira/browse/FLINK-9530
>
> This has been fixed in 1.5.1.
>
> On 15.10.2018 14:37, Rafi Aroch wrote:
>
> Hi,
>
> Below is the In/Out metrics as they appear in the Flink UI.
> I was wondering what are the possible reasons that the "Records sent" of
> one operator is different than the "Records received"  of the next one. I
> would expect to see the same number...
>
> [image: image.png]
>
> * We're using Flink 1.5.0 version.
>
> Thanks,
> Rafi
>
>
>


[Flink 1.6.1] local app infinitely hangs

2018-10-15 Thread Rinat
Hi mates,

During the migration to Flink 1.6.1 I've faced the following issue: the Flink application hangs infinitely when it is run from the IDE, while at the same time everything works fine on the cluster. The problem seems to occur during the termination of the Flink cluster; I've checked that in version 1.6.0 everything works fine. I was trying to find the problem, but I didn't find anything valuable yet. Maybe someone can help me.

During research, I've found the following difference: in both apps the job is moved into the FINISHED state, but in the hanging one I couldn't find the following log message:

org.apache.flink.runtime.dispatcher.StandaloneDispatcher | Job d961010e1e73b804b48f6adf0f102345 reached globally terminal state FINISHED.

Maybe it will be useful: I've also attached logs from both versions of my application and a thread dump of the hanging one. I'll continue my research; all ideas and suggestions will be very useful. Thx!

Here is a code snippet that I'm trying to run:

import org.apache.flink.api.common.typeinfo.TypeInformation
import org.apache.flink.api.common.typeutils.base.StringSerializer
import org.apache.flink.streaming.api.functions.source.FromElementsFunction
import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment
import org.scalatest.Matchers
import org.scalatest.WordSpec

class ApplicationJobSpec extends WordSpec with Matchers {
  "The ApplicationJob" should {
    "integration test" in {
      val serializer = new StringSerializer()
      val fixedSource = new FromElementsFunction[String](serializer, "my element")
      val environment = StreamExecutionEnvironment.createLocalEnvironment(1)
      implicit val typeInfo = TypeInformation.of(classOf[String])

      environment.addSource(fixedSource).print()
      environment.execute()
    }
  }
}

flink-1.6.0.log
Description: Binary data


flink-1.6.1.log
Description: Binary data
Attaching to process ID 19156, please wait...
2018-10-15 17:45:42
Full thread dump Java HotSpot(TM) 64-Bit Server VM (25.31-b07 mixed mode):

"Attach Listener" #71 daemon prio=9 os_prio=31 tid=0x7f9b69f39000 
nid=0x852f waiting on condition [0x]
   java.lang.Thread.State: RUNNABLE

   Locked ownable synchronizers:
- None

"ForkJoinPool.commonPool-worker-1" #70 daemon prio=5 os_prio=31 
tid=0x7f9b757f7000 nid=0x7733 waiting on condition [0x7240d000]
   java.lang.Thread.State: TIMED_WAITING (parking)
at sun.misc.Unsafe.park(Native Method)
- parking to wait for  <0x000740463608> (a 
java.util.concurrent.ForkJoinPool)
at java.util.concurrent.ForkJoinPool.awaitWork(ForkJoinPool.java:1756)
at java.util.concurrent.ForkJoinPool.scan(ForkJoinPool.java:1696)
at java.util.concurrent.ForkJoinPool.runWorker(ForkJoinPool.java:1644)
at 
java.util.concurrent.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:157)

   Locked ownable synchronizers:
- None

"Flink-MetricRegistry-thread-1" #37 daemon prio=5 os_prio=31 
tid=0x7f9b58b1a800 nid=0x7303 waiting on condition [0x72c25000]
   java.lang.Thread.State: TIMED_WAITING (parking)
at sun.misc.Unsafe.park(Native Method)
- parking to wait for  <0x00074011d1e0> (a 
java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject)
at 
java.util.concurrent.locks.LockSupport.parkNanos(LockSupport.java:215)
at 
java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject.awaitNanos(AbstractQueuedSynchronizer.java:2078)
at 
java.util.concurrent.ScheduledThreadPoolExecutor$DelayedWorkQueue.take(ScheduledThreadPoolExecutor.java:1093)
at 
java.util.concurrent.ScheduledThreadPoolExecutor$DelayedWorkQueue.take(ScheduledThreadPoolExecutor.java:809)
at 
java.util.concurrent.ThreadPoolExecutor.getTask(ThreadPoolExecutor.java:1067)
at 
java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1127)
at 
java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
at java.lang.Thread.run(Thread.java:745)

   Locked ownable synchronizers:
- None

"jobmanager-future-thread-4" #36 daemon prio=5 os_prio=31 
tid=0x7f9b777ec000 nid=0x7103 waiting on condition [0x72b22000]
   java.lang.Thread.State: TIMED_WAITING (parking)
at sun.misc.Unsafe.park(Native Method)
- parking to wait for  <0x000797b40668> (a 
java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject)
at 
java.util.concurrent.locks.LockSupport.parkNanos(LockSupport.java:215)
at 
java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject.awaitNanos(AbstractQueuedSynchronizer.java:2078)
at 
java.util.concurrent.ScheduledThreadPoolExecutor$DelayedWorkQueue.take(ScheduledThreadPoolExecutor.java:1093)
at 
java.util.concurrent.ScheduledThreadPoolExecutor$DelayedWorkQueue.take(ScheduledThreadPoolExecutor.java:809)
at 

Re: Job Records In/Out metrics clarification

2018-10-15 Thread Chesnay Schepler
There is a known issue in 1.5.0 with the numRecordsIn/Out metrics for 
chained tasks: https://issues.apache.org/jira/browse/FLINK-9530


This has been fixed in 1.5.1.

On 15.10.2018 14:37, Rafi Aroch wrote:

Hi,

Below is the In/Out metrics as they appear in the Flink UI.
I was wondering what are the possible reasons that the "Records sent" 
of one operator is different than the "Records received"  of the next 
one. I would expect to see the same number...


image.png

* We're using Flink 1.5.0 version.

Thanks,
Rafi





The best way to get processing time of each operator?

2018-10-15 Thread Folani
I'm going to work on the effect of parallelism for different operators on
heterogeneous machines.
I need to know the processing time of each operator instance as well as the
overall processing time of all instances of each specific operator.
I think there are different ways to achieve this.
However, what is the best way to get these times as precisely as possible? 



--
Sent from: http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/


Re: Can't start cluster

2018-10-15 Thread Mar_zieh
Hello

I have two questions; would you please answer them for me?
Please tell me what "pyflink.bat" is for. Does Flink support Python or
not?
Moreover, I want to have multiprocessing with Flink on Windows. Is it
possible? 

Thank you in advance





--
Sent from: http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/


Job Records In/Out metrics clarification

2018-10-15 Thread Rafi Aroch
Hi,

Below are the In/Out metrics as they appear in the Flink UI.
I was wondering what the possible reasons are that the "Records sent" of
one operator is different from the "Records received" of the next one. I
would expect to see the same number...

[image: image.png]

* We're using Flink 1.5.0 version.

Thanks,
Rafi


Re: Re: Why am I getting AWS access denied error for request type [DeleteObjectRequest] in S3?

2018-10-15 Thread Kumar Bolar, Harshith
Thanks Amit,

I’m now in the process of checking our IAM roles to see if the user has been 
given DeleteObject permission to S3. I’m guessing that’s the most likely cause 
for this error.
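
For reference, a minimal IAM policy statement covering the failing request could
look like the following (the bucket name is taken from the redacted log and is
only a placeholder; the exact resource ARN depends on your setup):

{
  "Version": "2012-10-17",
  "Statement": [
    {
      "Effect": "Allow",
      "Action": ["s3:DeleteObject"],
      "Resource": "arn:aws:s3:::xxx-xxx-prod/*"
    }
  ]
}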

- Harshith

From: Amit Jain 
Date: Monday, 15 October 2018 at 4:46 PM
To: Harshith Kumar Bolar 
Cc: "user@flink.apache.org" 
Subject: [External] Re: Why am I getting AWS access denied error for request 
type [DeleteObjectRequest] in S3?

Hi Harshith,


Did you enable delete permission on S3 for running machines? Are you using IAM 
roles or access key id and secret access key combo?


--
Thanks,
Amit

On Mon, Oct 15, 2018 at 3:15 PM Kumar Bolar, Harshith 
mailto:hk...@arity.com>> wrote:
Hi all,

We store Flink checkpoints in Amazon S3. Flink periodically sends out GET, PUT, 
LIST, DELETE requests to S3, to store-clear checkpoints. From the logs, we see 
that GET, PUT and LIST requests are successful but it throws an AWS access 
denied error for DELETE request.

Here’s a snippet of the logs for DELETE request –

2018-10-15 04:13:22,819 INFO  
org.apache.flink.fs.s3presto.shaded.com.amazonaws.latency - 
ServiceName=[Amazon S3], AWSErrorCode=[AccessDenied], StatusCode=[403], 
ServiceEndpoint=[https://xxx-xxx-prod.s3.amazonaws.com],
 
Exception=[org.apache.flink.fs.s3presto.shaded.com.amazonaws.services.s3.model.AmazonS3Exception:
 Access Denied (Service: Amazon S3; Status Code: 403; Error Code: AccessDenied; 
Request ID: x), S3 Extended Request ID: 
xxx], 
RequestType=[DeleteObjectRequest], AWSRequestID=[XX], 
HttpClientPoolPendingCount=0, RetryCapacityConsumed=0, 
HttpClientPoolAvailableCount=1, RequestCount=1, Exception=1, 
HttpClientPoolLeasedCount=0, ClientExecuteTime=[4.984], 
HttpClientSendRequestTime=[0.029], HttpRequestTime=[4.84], 
RequestSigningTime=[0.038], CredentialsRequestTime=[0.0, 0.0], 
HttpClientReceiveResponseTime=[4.78]

Is there some configuration that we’re forgetting that is preventing Flink from 
sending DELETE requests to S3?

I’d be happy to provide more information if needed.

Thanks,
Harshith




Where are the TaskManagers IPs and Ports stored?

2018-10-15 Thread Chris Miller
 

Hi, 

I'm looking for the property file where IP and Port of TaskManagers are
stored in Flink.
Does anyone know where it is located and when it's updated? 

(And for the case that there should not be such a file, where does the
JobManager and TaskManagers take this information from?) 

Thanks. 

Chris 
 

Re: Data loss when restoring from savepoint

2018-10-15 Thread Juho Autio
Hi Stefan,

Sorry but it doesn't seem immediately clear to me what's a good way to use
https://github.com/king/bravo.

How are people using it? Would you for example modify build.gradle somehow
to publish the bravo as a library locally/internally? Or add code directly
in the bravo project (locally) and run it from there (using an IDE, for
example)? Also it doesn't seem like the bravo gradle project supports
building a flink job jar, but if it does, how do I do it?

Thanks.

On Thu, Oct 4, 2018 at 9:30 PM Juho Autio  wrote:

> Good then, I'll try to analyze the savepoints with Bravo. Thanks!
>
> > How would you assume that backpressure would influence your updates?
> Updates to each local state still happen event-by-event, in a single
> reader/writing thread.
>
> Sure, just an ignorant guess by me. I'm not familiar with most of Flink's
> internals. Anyway, high backpressure is not seen on this job after it has
> caught up the lag, so I thought it would be worth mentioning.
>
> On Thu, Oct 4, 2018 at 6:24 PM Stefan Richter 
> wrote:
>
>> Hi,
>>
>> On 04.10.2018 at 16:08, Juho Autio wrote:
>>
>> > you could take a look at Bravo [1] to query your savepoints and to
>> check if the state in the savepoint complete w.r.t your expectations
>>
>> Thanks. I'm not 100% sure if this is the case, but to me it seemed like the
>> missed ids were being logged by the reducer soon after the job had started
>> (after restoring a savepoint). But on the other hand, after that I also
>> made another savepoint & restored that, so what I could check is: does that
>> next savepoint have the missed ids that were logged (a couple of minutes
>> before the savepoint was created, so there should've been more than enough
>> time to add them to the state before the savepoint was triggered) or not.
>> Any way, if I would be able to verify with Bravo that the ids are missing
>> from the savepoint (even though reduced logged that it saw them), would
>> that help in figuring out where they are lost? Is there some major
>> difference compared to just looking at the final output after window has
>> been triggered?
>>
>>
>>
>> I think that makes a difference. For example, you can investigate if
>> there is a state loss or a problem with the windowing. In the savepoint you
>> could see which keys exists and to which windows they are assigned. Also
>> just to make sure there is no misunderstanding: only elements that are in
>> the state at the start of a savepoint are expected to be part of the
>> savepoint; all elements between start and completion of the savepoint are
>> not expected to be part of the savepoint.
>>
>>
>> > I also doubt that the problem is about backpressure after restore,
>> because the job will only continue running after the state restore is
>> already completed.
>>
>> Yes, I'm not suspecting that the state restoring would be the problem
>> either. My concern was about backpressure possibly messing with the updates
>> of reducing state? I would tend to suspect that updating the state
>> consistently is what fails, where heavy load / backpressure might be a
>> factor.
>>
>>
>>
>> How would you assume that backpressure would influence your updates?
>> Updates to each local state still happen event-by-event, in a single
>> reader/writing thread.
>>
>>
>> On Thu, Oct 4, 2018 at 4:18 PM Stefan Richter <
>> s.rich...@data-artisans.com> wrote:
>>
>>> Hi,
>>>
>>> you could take a look at Bravo [1] to query your savepoints and to check
>>> if the state in the savepoint complete w.r.t your expectations. I somewhat
>>> doubt that there is a general problem with the state/savepoints because
>>> many users are successfully running it on a large state and I am not aware
>>> of any data loss problems, but nothing is impossible. What the savepoint
>>> does is also straight forward: iterate a db snapshot and write all
>>> key/value pairs to disk, so all data that was in the db at the time of the
>>> savepoint, should show up. I also doubt that the problem is about
>>> backpressure after restore, because the job will only continue running
>>> after the state restore is already completed. Did you check if you are
>>> using exactly-once semantics or at-least-once semantics? Also did you check
>>> that the kafka consumer start position is configured properly [2]? Are
>>> watermarks generated as expected after restore?
>>>
>>> One more unrelated high-level comment that I have: for a granularity of
>>> 24h windows, I wonder if it would not make sense to use a batch job instead?
>>>
>>> Best,
>>> Stefan
>>>
>>> [1] https://github.com/king/bravo
>>> [2]
>>> https://ci.apache.org/projects/flink/flink-docs-stable/dev/connectors/kafka.html#kafka-consumers-start-position-configuration
>>>
>>> On 04.10.2018 at 14:53, Juho Autio wrote:
>>>
>>> Thanks for the suggestions!
>>>
>>> > In general, it would be tremendously helpful to have a minimal working
>>> example which allows to reproduce the problem.
>>>
>>> Definitely. The problem with reproducing has 

Re: Why am I getting AWS access denied error for request type [DeleteObjectRequest] in S3?

2018-10-15 Thread Amit Jain
Hi Harshith,

Did you enable delete permission on S3 for running machines? Are you using
IAM roles or access key id and secret access key combo?

--
Thanks,
Amit

On Mon, Oct 15, 2018 at 3:15 PM Kumar Bolar, Harshith 
wrote:

> Hi all,
>
>
>
> We store Flink checkpoints in Amazon S3. Flink periodically sends out GET,
> PUT, LIST, DELETE requests to S3, to store-clear checkpoints. From the
> logs, we see that GET, PUT and LIST requests are successful but it throws
> an AWS access denied error for DELETE request.
>
>
>
> Here’s a snippet of the logs for DELETE request –
>
>
>
> 2018-10-15 04:13:22,819 INFO
> org.apache.flink.fs.s3presto.shaded.com.amazonaws.latency -
> ServiceName=[Amazon S3], AWSErrorCode=[AccessDenied], StatusCode=[403],
> ServiceEndpoint=[https://xxx-xxx-prod.s3.amazonaws.com],
> Exception=[org.apache.flink.fs.s3presto.shaded.com.amazonaws.services.s3.model.AmazonS3Exception:
> Access Denied (Service: Amazon S3; *Status Code: 403; Error Code:
> AccessDenied;* Request ID: x), S3 Extended Request ID:
> xxx],
> *RequestType=[DeleteObjectRequest]*, AWSRequestID=[XX],
> HttpClientPoolPendingCount=0, RetryCapacityConsumed=0,
> HttpClientPoolAvailableCount=1, RequestCount=1, Exception=1,
> HttpClientPoolLeasedCount=0, ClientExecuteTime=[4.984],
> HttpClientSendRequestTime=[0.029], HttpRequestTime=[4.84],
> RequestSigningTime=[0.038], CredentialsRequestTime=[0.0, 0.0],
> HttpClientReceiveResponseTime=[4.78]
>
>
>
> Is there some configuration that we’re forgetting that is preventing Flink
> from sending DELETE requests to S3?
>
>
>
> I’d be happy to provide more information if needed.
>
>
>
> Thanks,
>
> Harshith
>
>
>
>
>


Re: Error restoring from savepoint while there's no modification to the job

2018-10-15 Thread Averell
Thank you Stefan, I'll try to follow your guide to debug.

And sorry for being confusing in the previous email. When I said "different
builds", I meant different versions of my application, not different builds
of Flink. 

Between versions of my application, I do add/remove some operators. However,
as I mentioned in the 1st email, I got errors when restoring a savepoint
created by the same version of my application.

Thanks and best regards,
Averell



--
Sent from: http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/


Re: Flink does not checkpoint if operator is in Finished state

2018-10-15 Thread Chesnay Schepler

Known issue: https://issues.apache.org/jira/browse/FLINK-2491

On 15.10.2018 12:23, Kien Truong wrote:



Hi,
As mentioned in the title, my Flink job will not checkpoint if there 
are any finished source operators.


Is this a bug or is it working as intended ?

Regards,
Kien





Flink does not checkpoint if operator is in Finished state

2018-10-15 Thread Kien Truong
Hi,
As mentioned in the title, my Flink job will not checkpoint if there are
any finished source operators.

Is this a bug or is it working as intended ?

Regards,
Kien


Re: Error restoring from savepoint while there's no modification to the job

2018-10-15 Thread Stefan Richter
Hi,

I see, then the important question for me is if the problem exists on the 
release/master code or just on your branches. Of course we can hardly give any 
advice for custom builds and without any code. In general, you should debug in 
HeapKeyedStateBackend lines 774-776 (the write part) and check against 
472-474 (the read part). What happens there is very straightforward: remember 
the offset of the output stream, write the key-group. The read part then seeks 
to the remembered offset and reads the key-group. They must match.

Best,
Stefan

> On 15. Oct 2018, at 11:35, Averell  wrote:
> 
> Hi Kostas, Stefan,
> 
> The problem doesn't come on all of my builds, so it is a little bit
> difficult to track. Are there any specific classes that I can turn DEBUG on
> to help in finding the problem? (Turning DEBUG on globally seems too much).
> Will try to minimize the code and post it.
> 
> One more point that I notice is the error doesn't stay on one single
> operator but changes from time to time (even within the same build). For
> example, the previous exception I quoted was with a Window operator, while
> the one below is with CoStreamFlatMap.
> 
> Thanks and best regards,
> Averell
> 
> Caused by: java.lang.Exception: Exception while creating
> StreamOperatorStateContext.
>   at
> org.apache.flink.streaming.api.operators.StreamTaskStateInitializerImpl.streamOperatorStateContext(StreamTaskStateInitializerImpl.java:192)
>   at
> org.apache.flink.streaming.api.operators.AbstractStreamOperator.initializeState(AbstractStreamOperator.java:250)
>   at
> org.apache.flink.streaming.runtime.tasks.StreamTask.initializeState(StreamTask.java:738)
>   at
> org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:289)
>   at org.apache.flink.runtime.taskmanager.Task.run(Task.java:711)
>   at java.lang.Thread.run(Thread.java:748)
> Caused by: org.apache.flink.util.FlinkException: Could not restore keyed
> state backend for *CoStreamFlatMap*_68cd726422cf10170c4d6c7fd52ed309_(12/64)
> from any of the 1 provided restore options.
>   at
> org.apache.flink.streaming.api.operators.BackendRestorerProcedure.createAndRestore(BackendRestorerProcedure.java:137)
>   at
> org.apache.flink.streaming.api.operators.StreamTaskStateInitializerImpl.keyedStatedBackend(StreamTaskStateInitializerImpl.java:279)
>   at
> org.apache.flink.streaming.api.operators.StreamTaskStateInitializerImpl.streamOperatorStateContext(StreamTaskStateInitializerImpl.java:133)
>   ... 5 more
> Caused by: java.lang.IllegalStateException: Unexpected key-group in restore.
>   at 
> org.apache.flink.util.Preconditions.checkState(Preconditions.java:195)
>   at
> org.apache.flink.runtime.state.heap.HeapKeyedStateBackend.readStateHandleStateData(HeapKeyedStateBackend.java:475)
>   at
> org.apache.flink.runtime.state.heap.HeapKeyedStateBackend.restorePartitionedState(HeapKeyedStateBackend.java:438)
>   at
> org.apache.flink.runtime.state.heap.HeapKeyedStateBackend.restore(HeapKeyedStateBackend.java:377)
>   at
> org.apache.flink.runtime.state.heap.HeapKeyedStateBackend.restore(HeapKeyedStateBackend.java:105)
>   at
> org.apache.flink.streaming.api.operators.BackendRestorerProcedure.attemptCreateAndRestore(BackendRestorerProcedure.java:151)
>   at
> org.apache.flink.streaming.api.operators.BackendRestorerProcedure.createAndRestore(BackendRestorerProcedure.java:123)
>   ... 7 more
> 
> 
> 
> 
> 
> 
> --
> Sent from: 
> http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/



Why am I getting AWS access denied error for request type [DeleteObjectRequest] in S3?

2018-10-15 Thread Kumar Bolar, Harshith
Hi all,

We store Flink checkpoints in Amazon S3. Flink periodically sends out GET, PUT, 
LIST, DELETE requests to S3, to store-clear checkpoints. From the logs, we see 
that GET, PUT and LIST requests are successful but it throws an AWS access 
denied error for DELETE request.

Here’s a snippet of the logs for DELETE request –

2018-10-15 04:13:22,819 INFO  
org.apache.flink.fs.s3presto.shaded.com.amazonaws.latency - 
ServiceName=[Amazon S3], AWSErrorCode=[AccessDenied], StatusCode=[403], 
ServiceEndpoint=[https://xxx-xxx-prod.s3.amazonaws.com], 
Exception=[org.apache.flink.fs.s3presto.shaded.com.amazonaws.services.s3.model.AmazonS3Exception:
 Access Denied (Service: Amazon S3; Status Code: 403; Error Code: AccessDenied; 
Request ID: x), S3 Extended Request ID: 
xxx], 
RequestType=[DeleteObjectRequest], AWSRequestID=[XX], 
HttpClientPoolPendingCount=0, RetryCapacityConsumed=0, 
HttpClientPoolAvailableCount=1, RequestCount=1, Exception=1, 
HttpClientPoolLeasedCount=0, ClientExecuteTime=[4.984], 
HttpClientSendRequestTime=[0.029], HttpRequestTime=[4.84], 
RequestSigningTime=[0.038], CredentialsRequestTime=[0.0, 0.0], 
HttpClientReceiveResponseTime=[4.78]

Is there some configuration that we’re forgetting that is preventing Flink from 
sending DELETE requests to S3?

I’d be happy to provide more information if needed.

Thanks,
Harshith




Re: Error restoring from savepoint while there's no modification to the job

2018-10-15 Thread Averell
Hi Kostas, Stefan,

The problem doesn't come on all of my builds, so it is a little bit
difficult to track. Are there any specific classes that I can turn DEBUG on
to help in finding the problem? (Turning DEBUG on globally seems too much).
Will try to minimize the code and post it.

One more point that I notice is the error doesn't stay on one single
operator but changes from time to time (even within the same build). For
example, the previous exception I quoted was with a Window operator, while
the one below is with CoStreamFlatMap.

Thanks and best regards,
Averell

Caused by: java.lang.Exception: Exception while creating
StreamOperatorStateContext.
at
org.apache.flink.streaming.api.operators.StreamTaskStateInitializerImpl.streamOperatorStateContext(StreamTaskStateInitializerImpl.java:192)
at
org.apache.flink.streaming.api.operators.AbstractStreamOperator.initializeState(AbstractStreamOperator.java:250)
at
org.apache.flink.streaming.runtime.tasks.StreamTask.initializeState(StreamTask.java:738)
at
org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:289)
at org.apache.flink.runtime.taskmanager.Task.run(Task.java:711)
at java.lang.Thread.run(Thread.java:748)
Caused by: org.apache.flink.util.FlinkException: Could not restore keyed
state backend for *CoStreamFlatMap*_68cd726422cf10170c4d6c7fd52ed309_(12/64)
from any of the 1 provided restore options.
at
org.apache.flink.streaming.api.operators.BackendRestorerProcedure.createAndRestore(BackendRestorerProcedure.java:137)
at
org.apache.flink.streaming.api.operators.StreamTaskStateInitializerImpl.keyedStatedBackend(StreamTaskStateInitializerImpl.java:279)
at
org.apache.flink.streaming.api.operators.StreamTaskStateInitializerImpl.streamOperatorStateContext(StreamTaskStateInitializerImpl.java:133)
... 5 more
Caused by: java.lang.IllegalStateException: Unexpected key-group in restore.
at 
org.apache.flink.util.Preconditions.checkState(Preconditions.java:195)
at
org.apache.flink.runtime.state.heap.HeapKeyedStateBackend.readStateHandleStateData(HeapKeyedStateBackend.java:475)
at
org.apache.flink.runtime.state.heap.HeapKeyedStateBackend.restorePartitionedState(HeapKeyedStateBackend.java:438)
at
org.apache.flink.runtime.state.heap.HeapKeyedStateBackend.restore(HeapKeyedStateBackend.java:377)
at
org.apache.flink.runtime.state.heap.HeapKeyedStateBackend.restore(HeapKeyedStateBackend.java:105)
at
org.apache.flink.streaming.api.operators.BackendRestorerProcedure.attemptCreateAndRestore(BackendRestorerProcedure.java:151)
at
org.apache.flink.streaming.api.operators.BackendRestorerProcedure.createAndRestore(BackendRestorerProcedure.java:123)
... 7 more






--
Sent from: http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/


Re: Error restoring from savepoint while there's no modification to the job

2018-10-15 Thread Stefan Richter
Hi,

I think it is rather unlikely that this is the problem because it should  give 
a different kind of exception. Would it be possible to provide a minimal and 
self-contained example code for a problematic job?

Best,
Stefan

> On 15. Oct 2018, at 08:29, Averell  wrote:
> 
> Hi everyone,
> 
> In the StreamExecutionEnvironment.createFileInput method, a file source is
> created as following:
>   SingleOutputStreamOperator source = addSource(monitoringFunction, sourceName)
>       .transform("Split Reader: " + sourceName, typeInfo, reader);
> 
> Does this create two different operators? If yes, then it seems impossible
> to assign a UID to the 1st operator. And might it be the cause for my
> problem?
> 
> Thanks and best regards,
> Averell
> 
> 
> 
> --
> Sent from: 
> http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/



Re: Error restoring from savepoint while there's no modification to the job

2018-10-15 Thread Kostas Kloudas
Hi Averell,

This could be the root cause of your problem!
Thanks for digging into it.

Would it be possible for you to verify that this is your problem by manually 
setting 
the UUID and seeing if the problem disappears? In addition, please file a JIRA.
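
For reference, a rough sketch of what explicitly pinning UIDs looks like on the
operators that expose the handle (the pipeline below is a made-up placeholder,
not Averell's job; the hidden split-reader source created by createFileInput is
exactly the part that offers no such handle):

import org.apache.flink.api.java.functions.KeySelector;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class UidExample {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        DataStream<String> source = env
                .socketTextStream("localhost", 9999)
                .uid("socket-source");                 // stable UID for the source

        source.keyBy(new KeySelector<String, String>() {
                    @Override
                    public String getKey(String value) {
                        return value;
                    }
                })
                .countWindow(10)
                .reduce((a, b) -> a + b)
                .uid("count-window-reduce")            // stable UID for the stateful window operator
                .print();

        env.execute("uid example");
    }
}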

Thanks a lot,
Kostas
 
> On Oct 15, 2018, at 8:29 AM, Averell  wrote:
> 
> Hi everyone,
> 
> In the StreamExecutionEnvironment.createFileInput method, a file source is
> created as following:
>   SingleOutputStreamOperator source = addSource(monitoringFunction, sourceName)
>       .transform("Split Reader: " + sourceName, typeInfo, reader);
> 
> Does this create two different operators? If yes, then it seems impossible
> to assign a UID to the 1st operator. And might it be the cause for my
> problem?
> 
> Thanks and best regards,
> Averell
> 
> 
> 
> --
> Sent from: 
> http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/



Re: Flink 1.4: Queryable State Client

2018-10-15 Thread Kostas Kloudas
Hi Seye,

Thanks for digging into the problem.

As Vino and Jorn suggested, this looks like a bug, so please file a JIRA issue.
It would also be nice if you could post it here so that we know about the related 
discussion.

Cheers,
Kostas

> On Oct 14, 2018, at 9:46 AM, Jörn Franke  wrote:
> 
> You have to file an issue. One workaround to see if this really fixes your 
> problem could be to use reflection to mark this method as public and then 
> call it (it is of course nothing for production code). You can also try a 
> newer Flink version.
> 
>> On 13.10.2018 at 18:02, Seye Jin wrote:
>> 
>> I recently upgraded to flink 1.4 from 1.3 and leverage Queryable State 
>> client in my application. I have 1 jm and 5 tm all serviced behind 
>> kubernetes. A large state is built and distributed evenly across task 
>> mangers and the client can query state for specified key
>> 
>> Issue: if a task manager dies and a new one gets spun up(automatically) and 
>> the QS states successfully recover in new nodes/task slots. I start to get 
>> time out exception when the client tries to query for key, even if I try to 
>> reset or re-deploy the client jobs
>> 
>> I have been trying to triage this and figure out a way to remediate this 
>> issue and I found that in KvStateClientProxyHandler which is not exposed in 
>> code, there is a forceUpdate flag that can help reset KvStateLocations(plus 
>> inetAddresses) but the default is false and can't be overriden
>> 
>> I was wondering if anyone knows how to remediate this kind of issue or if 
>> there is a way to have the jobmanager know that the task manager location in 
>> cache is no more valid.
>> 
>> Any tip to resolve this will be appreciated (I can't downgrade back to 1.3 
>> or upgrade from 1.4)
>> 



Re: Flink1.6 Confirm that flink supports the plan function?

2018-10-15 Thread Amit Jain
Hi,

2) You may also want to look into the ParameterTool [1] class to parse and read
a passed properties file [2].
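
A small sketch of both variants (the argument names and the properties-file path
are placeholders):

import org.apache.flink.api.java.utils.ParameterTool;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class ParameterExample {
    public static void main(String[] args) throws Exception {
        // either parse "--topic h1 --output /ziyu/h1.txt" style arguments ...
        ParameterTool params = ParameterTool.fromArgs(args);
        // ... or read them from a properties file instead:
        // ParameterTool params = ParameterTool.fromPropertiesFile("/path/to/job.properties");

        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        // makes the parameters visible in the web UI and in rich functions
        env.getConfig().setGlobalJobParameters(params);

        String topic = params.getRequired("topic");
        String output = params.get("output", "/tmp/out.txt");

        // a trivial pipeline just to make the sketch runnable
        env.fromElements(topic, output).print();
        env.execute("job reading parameters");
    }
}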

--
Thanks,
Amit

[1]
https://ci.apache.org/projects/flink/flink-docs-master/api/java/org/apache/flink/api/java/utils/ParameterTool.html
[2]
https://ci.apache.org/projects/flink/flink-docs-release-1.6/dev/best_practices.html#getting-your-configuration-values-into-the-parametertool

On Mon, Oct 15, 2018 at 1:28 PM Till Rohrmann  wrote:

> Hi,
>
> 1) you currently cannot merge multiple jobs into one after they have been
> submitted. What you can do though, is to combine multiple jobs in your
> Flink program before you submit it.
>
> 2) you can pass program arguments when you submit your job. After it
> has been submitted, it is no longer possible to change the command line
> arguments.
>
> Cheers,
> Till
>
> On Mon, Oct 15, 2018 at 9:11 AM wangziyu <2375900...@qq.com> wrote:
>
>> Dear Friend:
>>   Now ,I am a learn flink for 20 days.I would to trouble
>> friends
>> to help solve two problems.
>> Questions are as follows:
>>   1. If I have some jobs,How can I merge the some jobs to One
>> that convenient for me to manage?
>> I have look for some restful api in
>> "
>> https://ci.apache.org/projects/flink/flink-docs-release-1.6/monitoring/rest_api.html
>> "。I
>> see "/jars/:jarid/plan" it seem say "Returns the dataflow plan of a job
>> contained in a jar previously uploaded via '/jars/upload'."I think it is
>> not
>> my purpose.
>>   2.When I run a job,I need pass in several  parameters.For
>> example "./flink run -d -c streaming.Kafka010NumCountConsumer
>> /ziyu/flink/kafkaFlink-1.0-SNAPSHOT.jar h1 /ziyu/h1.txt" .Now If I have
>> know
>> JobId,Can I get the job pass in several  parameters by java.I think it is
>> has some interface can use,But I can't get it.
>>  That is all.Can you help me that give me some
>> information.Thanks so mach.
>>
>>
>>
>>
>>
>>
>> --
>> Sent from:
>> http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/
>>
>


Re: Flink1.6 Confirm that flink supports the plan function?

2018-10-15 Thread Till Rohrmann
Hi,

1) you currently cannot merge multiple jobs into one after they have been
submitted. What you can do though, is to combine multiple jobs in your
Flink program before you submit it.

2) you can pass program arguments when you submit your job. After it
has been submitted, it is no longer possible to change the command line
arguments.
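
A minimal sketch of point 1), i.e. several independent source/sink pipelines built
against one StreamExecutionEnvironment and submitted as a single job (sources and
sinks here are toy placeholders):

import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class CombinedPipelines {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        // pipeline 1: its own source and sink
        env.fromElements("a", "b", "c")
                .map(new MapFunction<String, String>() {
                    @Override
                    public String map(String value) {
                        return value.toUpperCase();
                    }
                })
                .print();

        // pipeline 2: a second, independent source/sink pair in the same program
        env.fromElements(1, 2, 3).print();

        // one submission runs both pipelines as a single job
        env.execute("combined pipelines");
    }
}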

Cheers,
Till

On Mon, Oct 15, 2018 at 9:11 AM wangziyu <2375900...@qq.com> wrote:

> Dear Friend:
>   Now ,I am a learn flink for 20 days.I would to trouble
> friends
> to help solve two problems.
> Questions are as follows:
>   1. If I have some jobs,How can I merge the some jobs to One
> that convenient for me to manage?
> I have look for some restful api in
> "
> https://ci.apache.org/projects/flink/flink-docs-release-1.6/monitoring/rest_api.html
> "。I
> see "/jars/:jarid/plan" it seem say "Returns the dataflow plan of a job
> contained in a jar previously uploaded via '/jars/upload'."I think it is
> not
> my purpose.
>   2.When I run a job,I need pass in several  parameters.For
> example "./flink run -d -c streaming.Kafka010NumCountConsumer
> /ziyu/flink/kafkaFlink-1.0-SNAPSHOT.jar h1 /ziyu/h1.txt" .Now If I have
> know
> JobId,Can I get the job pass in several  parameters by java.I think it is
> has some interface can use,But I can't get it.
>  That is all.Can you help me that give me some
> information.Thanks so mach.
>
>
>
>
>
>
> --
> Sent from:
> http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/
>


Re: When does Trigger.clear() get called?

2018-10-15 Thread Averell
Thank you Fabian.

All my doubts are cleared now.

Best regards,
Averell



--
Sent from: http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/


Re: org.apache.flink.runtime.rpc.exceptions.FencingTokenException:

2018-10-15 Thread Till Rohrmann
This means that the Dispatcher has not set its leader session id, which it
obtains once it gains leadership. This can also happen if the Dispatcher
just lost its leadership after you've sent the message. This problem should
resolve itself once the new leadership information has been propagated.

Cheers,
Till

On Fri, Oct 12, 2018 at 9:04 PM Samir Tusharbhai Chauhan <
samir.tusharbhai.chau...@prudential.com.sg> wrote:

> Hi Till,
>
>
>
> Can you tell me when I would receive the below error message?
>
>
>
> 2018-10-13 03:02:01,337 ERROR
> org.apache.flink.runtime.rest.handler.taskmanager.TaskManagersHandler  -
> Could not retrieve the redirect address.
>
> java.util.concurrent.CompletionException:
> org.apache.flink.runtime.rpc.exceptions.FencingTokenException: Fencing
> token not set: Ignoring message
> LocalFencedMessage(8b79d4540b45b3e622748b813d3a464b,
> LocalRpcInvocation(requestRestAddress(Time))) sent to akka.tcp://
> flink@127.0.0.1:50010/user/dispatcher because the fencing token is null.
>
>
>
> Warm Regards,
>
> *Samir Chauhan*
>
>
>
> *From:* Till Rohrmann [mailto:trohrm...@apache.org]
> *Sent:* Sunday, October 07, 2018 1:24 AM
> *To:* Samir Tusharbhai Chauhan  >
> *Cc:* user 
> *Subject:* Re:
> org.apache.flink.runtime.rpc.exceptions.FencingTokenException:
>
>
>
> Hi Samir,
>
>
>
> 1. In your setup (not running on top of Yarn or Mesos) you need to set the
> jobmanager.rpc.address such that the JM process knows where to bind to. The
> other components use ZooKeeper to find out the addresses. The other
> properties should not be needed.
>
> 3. You can take a look at the ZooKeeper leader latch node. Alternatively,
> you can take a look at the address to which you are redirected when
> accessing the web UI.
>
> 4.
> https://ci.apache.org/projects/flink/flink-docs-master/ops/security-ssl.html
> 
>
>
>
> Cheers,
>
> Till
>
>
>
> On Sat, Oct 6, 2018 at 5:57 PM Samir Tusharbhai Chauhan <
> samir.tusharbhai.chau...@prudential.com.sg> wrote:
>
> Hi Till,
>
>
>
> Thanks for identifying the issue. My cluster is up and running now.
>
>
>
> I have a few queries. Could you help answer them?
>
>
>
>1. Do I need to set below properties in my cluster?
>
> jobmanager.rpc.address
>
> rest.address
>
> rest.bind-address
>
> jobmanager.web.address
>
>1. Is there anything I should take care of while setting it up?
>2. How do I know which job manager is active?
>3. How do I secure my cluster?
>
>
>
> Samir Chauhan
>
>
>
> *From:* Till Rohrmann [mailto:trohrm...@apache.org]
> *Sent:* Friday, October 05, 2018 11:09 PM
> *To:* Samir Tusharbhai Chauhan  >
> *Cc:* user 
> *Subject:* Re:
> org.apache.flink.runtime.rpc.exceptions.FencingTokenException:
>
>
>
> Hi Samir,
>
>
>
> could you share the logs of the two JMs and the log where you saw the
> FencingTokenException with us?
>
>
>
> It looks to me as if the TM had an outdated fencing token (an outdated
> leader session id) with which it contacted the ResourceManager. This can
> happen and the TM should try to reconnect to the RM once it learns about
> the new leader session id via ZooKeeper. You could, for example check in
> ZooKeeper that it contains the valid leader information.
>
>
>
> Cheers,
>
> Till
>
>
>
> On Fri, Oct 5, 2018 at 9:58 AM Samir Tusharbhai Chauhan <
> samir.tusharbhai.chau...@prudential.com.sg> wrote:
>
> Hi,
>
>
>
> I am having issue in setting up cluster for Flink. I have 2 nodes for Job
> Manager and 2 nodes for Task Manager.
>
>
>
> My configuration file looks like this.
>
>
>
> jobmanager.rpc.port: 6123
>
> jobmanager.heap.size: 2048m
>
> taskmanager.heap.size: 2048m
>
> taskmanager.numberOfTaskSlots: 64
>
> parallelism.default: 1
>
> rest.port: 8081
>
> high-availability.jobmanager.port: 50010
>
> high-availability: zookeeper
>
> high-availability.storageDir: file:///sharedflink/state_dir/ha/
>
> high-availability.zookeeper.quorum: host1:2181,host2:2181,host3:2181
>
> high-availability.zookeeper.path.root: /flink
>
> high-availability.cluster-id: /flick_ns
>
>
>
> state.backend: rocksdb
>
> state.checkpoints.dir: file:///sharedflink/state_dir/backend
>
> state.savepoints.dir: file:///sharedflink/state_dir/savepoint
>
> state.backend.incremental: false
>
> state.backend.rocksdb.timer-service.factory: rocksdb
>
> state.backend.local-recovery: false
>
>
>
> But when I start services, I get this error message.
>
>
>
> 

Re: When does Trigger.clear() get called?

2018-10-15 Thread Fabian Hueske
Hi,

Re Q1: The main purpose of the Trigger.clear() method is to remove all
custom state of the Trigger. State must be explicitly removed, otherwise
the program leaks memory.
Re Q3: If you are using a keyed stream, you need to manually clean up the
state by calling State.clear(). If you are using a ProcessFunction, you can
do that in processElement() or register a timer and clean up in onTimer().
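
For illustration, a rough sketch of that pattern on a keyed stream: a per-key
counter whose state is removed if no element arrives for an hour (the state name,
the one-hour timeout and the counting logic are placeholders):

import org.apache.flink.api.common.state.ValueState;
import org.apache.flink.api.common.state.ValueStateDescriptor;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.functions.KeyedProcessFunction;
import org.apache.flink.util.Collector;

public class CountWithCleanup extends KeyedProcessFunction<String, String, Long> {

    private transient ValueState<Long> count;

    @Override
    public void open(Configuration parameters) {
        count = getRuntimeContext().getState(
                new ValueStateDescriptor<>("count", Long.class));
    }

    @Override
    public void processElement(String value, Context ctx, Collector<Long> out) throws Exception {
        Long current = count.value();
        current = (current == null) ? 1L : current + 1;
        count.update(current);
        out.collect(current);

        // schedule a cleanup one hour from now (a new timer per element; a real job
        // would typically remember and overwrite a single cleanup timestamp instead)
        ctx.timerService().registerProcessingTimeTimer(
                ctx.timerService().currentProcessingTime() + 60 * 60 * 1000);
    }

    @Override
    public void onTimer(long timestamp, OnTimerContext ctx, Collector<Long> out) {
        // explicit removal of this key's state, as described above
        count.clear();
    }
}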

Best, Fabian

On Sun, Oct 14, 2018 at 06:06, Averell wrote:

> Hello Hequn,
>
> Thanks for the answers.
> Regarding question no.2, I am now clear.
> Regarding question no.1, does your answer apply to those custom states as
> well? This concern of mine came from Flink's implementation of
> CountTrigger,
> in which a custom state is being cleared explicitly in Trigger.clear():
>
> /   public void clear(W window, TriggerContext ctx) throws Exception {
> ctx.getPartitionedState(stateDesc).clear();
> }
> /
>
> My 3rd question was for ordinary, non-windowed keyed streams, where I don't
> see in Flink's document any mention of using Trigger, so how can I clear
> those streams?
>
> Thank you very much for your help.
> Regards,
> Averell
>
>
>
>
> --
> Sent from:
> http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/
>


Re: Taskmanager times out continuously for registration with Jobmanager

2018-10-15 Thread Till Rohrmann
Hi Abdul,

in Flink 1.4 we use Akka's death watch to detect no longer reachable hosts.
The downside of the death watch mechanism is that hosts which were detected
to be dead are being quarantined. Once in this state you need to restart
the ActorSystem in order to receive messages again. The idea behind this is
to not let the system go into an inconsistent state. You can mitigate this
problem by setting the death watch settings to higher values.
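
For reference, these are the flink-conf.yaml keys involved (the values below are
only illustrative; please check the 1.4 configuration docs for the defaults and
exact semantics):

akka.watch.heartbeat.interval: 10 s
akka.watch.heartbeat.pause: 120 s
akka.watch.threshold: 12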

Cheers,
Till

On Fri, Oct 12, 2018 at 11:27 PM Abdul Qadeer  wrote:

> We were able to fix it by passing IP address instead of hostname for actor
> system listen address when starting taskmanager:
>
> def runTaskManager(
> taskManagerHostname: String,
> resourceID: ResourceID,
> actorSystemPort: Int,
> configuration: Configuration,
> highAvailabilityServices: HighAvailabilityServices)
> : Unit = {
>
>
> The following log message at jobmanager gave some clue:
>
> {"timeMillis":1539297842333,"thread":"jobmanager-future-thread-2","level":"DEBUG","loggerName":"org.apache.flink.runtime.rest.handler.legacy.metrics.MetricFetcher","message":"Could
>  not retrieve 
> QueryServiceGateway.","thrown":{"commonElementCount":0,"localizedMessage":"akka.actor.ActorNotFound:
>  Actor not found for: 
> ActorSelection[Anchor(akka.tcp://flink@taskmgr-6b59f97748-fmgwn:8070/), 
> Path(/user/MetricQueryService_5261ccab66b86b53a4edd64f26c1f282)]"...
>
> ...
>
>
> We figured there is some problem with hostname resolution after the actor is 
> quarantined, would you know why this happens? Is it some cache problem in 
> Flink or Akka code JobManager is using?
>
>
> On Fri, Oct 12, 2018 at 1:05 AM Till Rohrmann 
> wrote:
>
>> It is hard to tell without all logs but it could easily be a K8s setup
>> problem. Also problematic is that you are running a Flink version which is
>> no longer actively supported. Try at least to use the latest bug fix
>> release for 1.4.
>>
>> Cheers,
>> Till
>>
>> On Fri, Oct 12, 2018, 09:43 Abdul Qadeer  wrote:
>>
>>> Hi Till,
>>>
>>> A few more data points:
>>>
>>> In a rerun of the same versions with fresh deployment, I see *log*
>>> .debug(*s"RegisterTaskManager: $*msg*"*) in JobManager, however the
>>> *AcknowledgeRegistration/AlreadyRegistered *messages are never sent, I
>>> have taken tcpdump for the taskmanager which doesn't recover and compared
>>> it with another taskmanager which recovers after restart (i.e. receives
>>> *AcknowledgeRegistration *message).
>>>
>>> Restarting the docker container of bad taskmanager doesn't work. The
>>> only workaround right now is to delete the kubernetes pod holding the bad
>>> taskmanager container. Does it have to do something with the akka address
>>> the jobmanager stores for a taskmanager? The only variable I see between
>>> restarting container vs pod is the change in the akka address.
>>>
>>> Also, the infinite retries for registration start after the taskmanager
>>> container restarts with Jobmanager actor system quarantined:
>>>
>>> {"timeMillis":1539282282329,"thread":"flink-akka.actor.default-dispatcher-3","level":"ERROR","loggerName":"org.apache.flink.runtime.taskmanager.TaskManager","message":"The
>>> actor system akka.tcp://flink@taskmgr-6b59f97748-fmgwn:8070 has
>>> quarantined the remote actor system akka.tcp://flink@192.168.83.52:6123.
>>> Shutting the actor system down to be able to reestablish a
>>> connection!","endOfBatch":false,"loggerFqcn":"org.apache.logging.slf4j.Log4jLogger","threadId":49,"threadPriority":5}
>>>
>>>
>>> A manual restart by docker restart or killing the JVM doesn't reproduce
>>> this problem.
>>>
>>> On Thu, Oct 11, 2018 at 11:15 AM Abdul Qadeer 
>>> wrote:
>>>
 Hi Till,

 I didn't try with newer versions as it is not possible to update the
 Flink version atm.
 If you could give any pointers for debugging that would be great.

 On Thu, Oct 11, 2018 at 2:44 AM Till Rohrmann 
 wrote:

> Hi Abdul,
>
> have you tried whether this problem also occurs with newer Flink
> versions (1.5.4 or 1.6.1)?
>
> Cheers,
> Till
>
> On Thu, Oct 11, 2018 at 9:24 AM Dawid Wysakowicz <
> dwysakow...@apache.org> wrote:
>
>> Hi Abdul,
>>
>> I've added Till and Gary to cc, who might be able to help you.
>>
>> Best,
>>
>> Dawid
>>
>> On 11/10/18 03:05, Abdul Qadeer wrote:
>>
>> Hi,
>>
>>
>> We are facing an issue in standalone HA mode in Flink 1.4.0 where
>> Taskmanager restarts and is not able to register with the Jobmanager. It
>> times out awaiting AcknowledgeRegistration/AlreadyRegistered
>> messages from the Jobmanager Actor and keeps sending RegisterTaskManager
>> messages. The logs at the Jobmanager don’t show anything about
>> registration failure/request. It doesn’t print
>> log.debug(s"RegisterTaskManager:
>> $msg") (from 

Flink1.6 Confirm that flink supports the plan function?

2018-10-15 Thread wangziyu
Dear Friends,
  I have been learning Flink for about 20 days and would like to ask for help
with two problems.
Questions are as follows:
  1. If I have several jobs, how can I merge them into one so that it is more
convenient for me to manage them?
I have looked at the REST API in
"https://ci.apache.org/projects/flink/flink-docs-release-1.6/monitoring/rest_api.html".
I saw "/jars/:jarid/plan", which "Returns the dataflow plan of a job
contained in a jar previously uploaded via '/jars/upload'", but I don't think
that is what I need.
  2. When I run a job, I pass in several program arguments, for
example "./flink run -d -c streaming.Kafka010NumCountConsumer
/ziyu/flink/kafkaFlink-1.0-SNAPSHOT.jar h1 /ziyu/h1.txt". Now, if I know the
JobId, can I get those arguments back from Java? I think there is some
interface I can use, but I haven't found it.
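
What I imagine for question 2 is only a rough sketch (I am not sure that
ParameterTool and global job parameters are the intended interface): register
the arguments inside the job,

    ParameterTool params = ParameterTool.fromArgs(args);
    StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
    env.getConfig().setGlobalJobParameters(params);  // should make the arguments visible to the web UI / REST API

and then, for a known JobId, read them back from the REST endpoint
"/jobs/:jobid/config", where I expect them under "user-config". Is that the
right way, or is there a better interface?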
  That is all. Could you give me some information? Thanks so much.






--
Sent from: http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/


Re: Flink streaming-job with redis sink: SocketTimeoutException

2018-10-15 Thread Amit Jain
Hi Marke,

The stack trace suggests this is more of a Redis connection issue than
something in Flink. Could you share the JedisPool configuration of your Redis
sink? Are you writing into Redis continuously or in batches? It looks
like the Redis connections are timing out here.
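
If the sink is the Bahir Redis connector shown in the stack trace, one knob
worth checking is the connection timeout on the pool config. A rough sketch
(builder method names as I remember them from the connector; the host and pool
values are only placeholders, so please verify against your version):

    FlinkJedisPoolConfig jedisConf = new FlinkJedisPoolConfig.Builder()
            .setHost("redis-host")   // placeholder for your Redis host
            .setPort(6379)
            .setTimeout(30000)       // connect/read timeout in ms; the Jedis default is only 2000
            .setMaxTotal(16)         // pool size, roughly matching the sink parallelism
            .build();

    stream.addSink(new RedisSink<>(jedisConf, new MyRedisMapper()));  // MyRedisMapper stands for your existing RedisMapper

If the Redis server itself is busy (for example a blocking SAVE or a very
large keyspace), raising the timeout only hides the problem, so it is also
worth checking the server side (e.g. SLOWLOG).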

--
Thanks,
Amit

On Mon, Oct 15, 2018 at 12:19 PM Marke Builder 
wrote:

> Hi,
>
> what can be the reasons for the following exception?
> We are using Flink with a Redis sink, but from time to time the Flink job
> fails with the following exception.
>
> Thanks, Builder.
>
>
> 10/13/2018 15:37:48 Flat Map -> (Sink: Unnamed, Sink: Unnamed)(9/10)
> switched to FAILED
> redis.clients.jedis.exceptions.JedisConnectionException:
> java.net.SocketTimeoutException: Read timed out
> at
> redis.clients.util.RedisInputStream.ensureFill(RedisInputStream.java:201)
> at
> redis.clients.util.RedisInputStream.readByte(RedisInputStream.java:40)
> at redis.clients.jedis.Protocol.process(Protocol.java:141)
> at redis.clients.jedis.Protocol.read(Protocol.java:205)
> at
> redis.clients.jedis.Connection.readProtocolWithCheckingBroken(Connection.java:297)
> at
> redis.clients.jedis.Connection.getStatusCodeReply(Connection.java:196)
> at redis.clients.jedis.Jedis.set(Jedis.java:69)
> at
> org.apache.flink.streaming.connectors.redis.common.container.RedisContainer.set(RedisContainer.java:178)
> at
> org.apache.flink.streaming.connectors.redis.RedisSink.invoke(RedisSink.java:143)
> at
> org.apache.flink.streaming.api.functions.sink.SinkFunction.invoke(SinkFunction.java:52)
> at
> org.apache.flink.streaming.api.operators.StreamSink.processElement(StreamSink.java:56)
> at
> org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.pushToOperator(OperatorChain.java:549)
> at
> org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:524)
> at
> org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:504)
> at
> org.apache.flink.streaming.runtime.tasks.OperatorChain$BroadcastingOutputCollector.collect(OperatorChain.java:611)
> at
> org.apache.flink.streaming.runtime.tasks.OperatorChain$BroadcastingOutputCollector.collect(OperatorChain.java:572)
> at
> org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:830)
> at
> org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:808)
> at
> org.apache.flink.streaming.api.operators.TimestampedCollector.collect(TimestampedCollector.java:51)
> at
> com.voith.cloud.cache.operators.FlatMapMeasurements.flatMap(FlatMapMeasurements.java:27)
> at
> com.voith.cloud.cache.operators.FlatMapMeasurements.flatMap(FlatMapMeasurements.java:12)
> at
> org.apache.flink.streaming.api.operators.StreamFlatMap.processElement(StreamFlatMap.java:50)
> at
> org.apache.flink.streaming.runtime.io.StreamInputProcessor.processInput(StreamInputProcessor.java:207)
> at
> org.apache.flink.streaming.runtime.tasks.OneInputStreamTask.run(OneInputStreamTask.java:69)
> at
> org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:264)
> at org.apache.flink.runtime.taskmanager.Task.run(Task.java:718)
> at java.lang.Thread.run(Thread.java:748)
> Caused by: java.net.SocketTimeoutException: Read timed out
> at java.net.SocketInputStream.socketRead0(Native Method)
> at
> java.net.SocketInputStream.socketRead(SocketInputStream.java:116)
> at java.net.SocketInputStream.read(SocketInputStream.java:171)
> at java.net.SocketInputStream.read(SocketInputStream.java:141)
> at java.net.SocketInputStream.read(SocketInputStream.java:127)
> at
> redis.clients.util.RedisInputStream.ensureFill(RedisInputStream.java:195)
> ... 26 more
>
>


Flink streaming-job with redis sink: SocketTimeoutException

2018-10-15 Thread Marke Builder
Hi,

what can be the reasons for the following exception?
We are using Flink with a Redis sink, but from time to time the Flink job
fails with the following exception.

Thanks, Builder.


10/13/2018 15:37:48 Flat Map -> (Sink: Unnamed, Sink: Unnamed)(9/10)
switched to FAILED
redis.clients.jedis.exceptions.JedisConnectionException:
java.net.SocketTimeoutException: Read timed out
at
redis.clients.util.RedisInputStream.ensureFill(RedisInputStream.java:201)
at
redis.clients.util.RedisInputStream.readByte(RedisInputStream.java:40)
at redis.clients.jedis.Protocol.process(Protocol.java:141)
at redis.clients.jedis.Protocol.read(Protocol.java:205)
at
redis.clients.jedis.Connection.readProtocolWithCheckingBroken(Connection.java:297)
at
redis.clients.jedis.Connection.getStatusCodeReply(Connection.java:196)
at redis.clients.jedis.Jedis.set(Jedis.java:69)
at
org.apache.flink.streaming.connectors.redis.common.container.RedisContainer.set(RedisContainer.java:178)
at
org.apache.flink.streaming.connectors.redis.RedisSink.invoke(RedisSink.java:143)
at
org.apache.flink.streaming.api.functions.sink.SinkFunction.invoke(SinkFunction.java:52)
at
org.apache.flink.streaming.api.operators.StreamSink.processElement(StreamSink.java:56)
at
org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.pushToOperator(OperatorChain.java:549)
at
org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:524)
at
org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:504)
at
org.apache.flink.streaming.runtime.tasks.OperatorChain$BroadcastingOutputCollector.collect(OperatorChain.java:611)
at
org.apache.flink.streaming.runtime.tasks.OperatorChain$BroadcastingOutputCollector.collect(OperatorChain.java:572)
at
org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:830)
at
org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:808)
at
org.apache.flink.streaming.api.operators.TimestampedCollector.collect(TimestampedCollector.java:51)
at
com.voith.cloud.cache.operators.FlatMapMeasurements.flatMap(FlatMapMeasurements.java:27)
at
com.voith.cloud.cache.operators.FlatMapMeasurements.flatMap(FlatMapMeasurements.java:12)
at
org.apache.flink.streaming.api.operators.StreamFlatMap.processElement(StreamFlatMap.java:50)
at
org.apache.flink.streaming.runtime.io.StreamInputProcessor.processInput(StreamInputProcessor.java:207)
at
org.apache.flink.streaming.runtime.tasks.OneInputStreamTask.run(OneInputStreamTask.java:69)
at
org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:264)
at org.apache.flink.runtime.taskmanager.Task.run(Task.java:718)
at java.lang.Thread.run(Thread.java:748)
Caused by: java.net.SocketTimeoutException: Read timed out
at java.net.SocketInputStream.socketRead0(Native Method)
at java.net.SocketInputStream.socketRead(SocketInputStream.java:116)
at java.net.SocketInputStream.read(SocketInputStream.java:171)
at java.net.SocketInputStream.read(SocketInputStream.java:141)
at java.net.SocketInputStream.read(SocketInputStream.java:127)
at
redis.clients.util.RedisInputStream.ensureFill(RedisInputStream.java:195)
... 26 more


Re: Error restoring from savepoint while there's no modification to the job

2018-10-15 Thread Averell
Hi everyone,

In the StreamExecutionEnvironment.createFileInput method, a file source is
created as follows:
SingleOutputStreamOperator<OUT> source = addSource(monitoringFunction, sourceName)
        .transform("Split Reader: " + sourceName, typeInfo, reader);

Does this create two different operators? If so, it seems impossible
to assign a UID to the first operator. Could that be the cause of my
problem?
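
If that is the case, the workaround I am considering is to rebuild that chain
manually so that both operators get explicit UIDs. This is only a rough sketch
based on what createFileInput does in the 1.6 sources
(ContinuousFileMonitoringFunction and ContinuousFileReaderOperator are
internal classes, and the path and interval below are just placeholders), so I
am not sure it is a supported approach:

    TextInputFormat format = new TextInputFormat(new Path("hdfs:///some/dir"));
    TypeInformation<String> typeInfo = BasicTypeInfo.STRING_TYPE_INFO;

    ContinuousFileMonitoringFunction<String> monitor = new ContinuousFileMonitoringFunction<>(
            format, FileProcessingMode.PROCESS_CONTINUOUSLY, env.getParallelism(), 10000L);
    ContinuousFileReaderOperator<String> reader = new ContinuousFileReaderOperator<>(format);

    DataStream<String> lines = env
            .addSource(monitor, "file-monitor").uid("file-monitor-uid")
            .transform("Split Reader", typeInfo, reader).uid("split-reader-uid");

Would that be a reasonable way to make the operator-to-state mapping explicit?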

Thanks and best regards,
Averell



--
Sent from: http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/