Re: Task manager count goes the expand then converge process when running flink on YARN

2018-10-24 Thread vino yang
Hi Henry,

The phenomenon you described does exist. It is a known bug, but I can't remember
its JIRA number.

Thanks, vino.

徐涛 wrote on Wed, Oct 24, 2018 at 11:27 PM:

> Hi experts
> I am running flink job on YARN in job cluster mode, the job is divided
> into 2 tasks, the following are some configs of the job:
> parallelism.default => 16
> taskmanager.numberOfTaskSlots => 8
> -yn => 2
>
> when the program starts, I found that the count of task managers does not
> settle immediately; it first expands and then converges. I recorded the numbers
> during the process:
> Task Managers | Task Slots | Available Task Slots
> 1. 14 | 104 | 88
> 2. 15 | 120 | 104
> 3. 16 | 128 | 112
> 4. 6  | 48  | 32
> 5. 3  | 24  | 8
> 6. 2  | 16  | 0
>
> The final state is correct. There are 2 tasks, 32 subtasks in total; due to
> slot sharing, 16 slots are enough. The number of task slots per TM is 8, so
> 2 TMs are needed.
> I have the following question:
> *Because I specify -yn=2, why doesn't Flink directly allocate 2 TMs instead of
> going through this expand-then-converge process? Why does it request up to 16
> task managers? If this is not necessary, how can I avoid it?*
>
> Thanks a lot!
>
> Best
> Henry
>


Re: FlinkCEP, circular references and checkpointing failures

2018-10-24 Thread Shailesh Jain
Hi Dawid,

I've upgraded to Flink 1.6.1 and rebased my changes against the tag 1.6.1;
the only commit on top of 1.6 is this:
https://github.com/jainshailesh/flink/commit/797e3c4af5b28263fd98fb79daaba97cabf3392c

I ran two separate, identical jobs (one with and one without checkpointing
enabled). I'm hitting an ArrayIndexOutOfBoundsException (and sometimes an NPE) *only when
checkpointing (HDFS backend) is enabled*, with the stack trace below.

I did see a similar problem with different operators here (
http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/IndexOutOfBoundsException-on-deserialization-after-updating-to-1-6-1-td23933.html).
Is this a known issue which is getting addressed?

Any ideas on what could be causing this?

Thanks,
Shailesh


2018-10-24 17:04:13,365 INFO
org.apache.flink.runtime.taskmanager.Task -
SelectCepOperatorMixedTime (1/1) - SelectCepOperatorMixedTime (1/1)
(3d984b7919342a3886593401088ca2cd) switched from RUNNING to FAILED.
org.apache.flink.util.FlinkRuntimeException: Failure happened in filter
function.
at org.apache.flink.cep.nfa.NFA.createDecisionGraph(NFA.java:731)
at org.apache.flink.cep.nfa.NFA.computeNextStates(NFA.java:541)
at org.apache.flink.cep.nfa.NFA.doProcess(NFA.java:284)
at org.apache.flink.cep.nfa.NFA.process(NFA.java:220)
at
org.apache.flink.cep.operator.AbstractKeyedCEPPatternOperator.processEvent(AbstractKeyedCEPPatternOperator.java:379)
at
org.apache.flink.cep.operator.AbstractKeyedCEPPatternMixedTimeApproachOperator.processElement(AbstractKeyedCEPPatternMixedTimeApproachOperator.java:45)
at
org.apache.flink.streaming.runtime.io.StreamInputProcessor.processInput(StreamInputProcessor.java:202)
at
org.apache.flink.streaming.runtime.tasks.OneInputStreamTask.run(OneInputStreamTask.java:105)
at
org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:300)
at org.apache.flink.runtime.taskmanager.Task.run(Task.java:711)
at java.lang.Thread.run(Thread.java:748)
Caused by: org.apache.flink.util.WrappingRuntimeException:
java.lang.ArrayIndexOutOfBoundsException: -1
at
org.apache.flink.cep.nfa.sharedbuffer.SharedBuffer.lambda$materializeMatch$1(SharedBuffer.java:305)
at java.util.HashMap.computeIfAbsent(HashMap.java:1127)
at
org.apache.flink.cep.nfa.sharedbuffer.SharedBuffer.materializeMatch(SharedBuffer.java:301)
at
org.apache.flink.cep.nfa.sharedbuffer.SharedBuffer.materializeMatch(SharedBuffer.java:291)
at
org.apache.flink.cep.nfa.NFA$ConditionContext.getEventsForPattern(NFA.java:811)
at
com.stellapps.contrakcep.flink.patterns.critical.AgitatorMalfunctionAfterChillingPattern$1.filter(AgitatorMalfunctionAfterChillingPattern.java:70)
at
com.stellapps.contrakcep.flink.patterns.critical.AgitatorMalfunctionAfterChillingPattern$1.filter(AgitatorMalfunctionAfterChillingPattern.java:62)
at org.apache.flink.cep.nfa.NFA.checkFilterCondition(NFA.java:742)
at org.apache.flink.cep.nfa.NFA.createDecisionGraph(NFA.java:716)
... 10 more
Caused by: java.lang.ArrayIndexOutOfBoundsException: -1
at com.esotericsoftware.kryo.util.IntArray.add(IntArray.java:61)
at com.esotericsoftware.kryo.Kryo.readReferenceOrNull(Kryo.java:800)
at com.esotericsoftware.kryo.Kryo.readObject(Kryo.java:655)
at
org.apache.flink.api.java.typeutils.runtime.kryo.KryoSerializer.copy(KryoSerializer.java:231)
at
org.apache.flink.cep.nfa.sharedbuffer.Lockable$LockableTypeSerializer.copy(Lockable.java:120)
at
org.apache.flink.cep.nfa.sharedbuffer.Lockable$LockableTypeSerializer.copy(Lockable.java:95)
at
org.apache.flink.api.common.typeutils.base.MapSerializer.copy(MapSerializer.java:113)
at
org.apache.flink.api.common.typeutils.base.MapSerializer.copy(MapSerializer.java:49)
at
org.apache.flink.runtime.state.heap.CopyOnWriteStateTable.get(CopyOnWriteStateTable.java:287)
at
org.apache.flink.runtime.state.heap.CopyOnWriteStateTable.get(CopyOnWriteStateTable.java:311)
at
org.apache.flink.runtime.state.heap.HeapMapState.get(HeapMapState.java:85)
at
org.apache.flink.runtime.state.UserFacingMapState.get(UserFacingMapState.java:47)
at
org.apache.flink.cep.nfa.sharedbuffer.SharedBuffer.lambda$materializeMatch$1(SharedBuffer.java:303)
... 18 more

On Fri, Sep 28, 2018 at 11:00 AM Shailesh Jain 
wrote:

> Hi Dawid,
>
> Thanks for your time on this. The diff should have pointed out only the
> top 3 commits, but since it did not, it is possible I did not rebase my
> branch against 1.4.2 correctly. I'll check this out and get back to you if
> I hit the same issue again.
>
> Thanks again,
> Shailesh
>
> On Thu, Sep 27, 2018 at 1:00 PM Dawid Wysakowicz 
> wrote:
>
>> Hi Shailesh,
>>
>> I am afraid it is gonna be hard to help you, as this branch differs
>> significantly from 1.4.2 release (I've done diff across your 

RocksDB State Backend Exception

2018-10-24 Thread Ning Shi
Hi,

We are doing some performance testing on a 12-node cluster with 8 task
slots per TM. Every 15 minutes or so, the job runs into the
following exception.

java.lang.IllegalArgumentException: Illegal value provided for SubCode.
at org.rocksdb.Status$SubCode.getSubCode(Status.java:109)
at org.rocksdb.Status.<init>(Status.java:30)
at org.rocksdb.RocksDB.put(Native Method)
at org.rocksdb.RocksDB.put(RocksDB.java:511)
at 
org.apache.flink.contrib.streaming.state.AbstractRocksDBAppendingState.updateInternal(AbstractRocksDBAppendingState.java:80)
at 
org.apache.flink.contrib.streaming.state.RocksDBReducingState.add(RocksDBReducingState.java:99)
at 
org.apache.flink.streaming.runtime.operators.windowing.WindowOperator.processElement(WindowOperator.java:358)
at 
org.apache.flink.streaming.runtime.io.StreamInputProcessor.processInput(StreamInputProcessor.java:202)
at 
org.apache.flink.streaming.runtime.tasks.OneInputStreamTask.run(OneInputStreamTask.java:105)
at 
org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:300)
at org.apache.flink.runtime.taskmanager.Task.run(Task.java:711)
at java.lang.Thread.run(Thread.java:745)

I saw an outstanding issue with a similar exception in [1]. The ticket
description suggests that it was due to an out-of-disk error, but in our
case, we have plenty of disk space left on all TMs.

Has anyone run into this before? If so, is there a fix or workaround?

Thanks,

[1] https://issues.apache.org/jira/browse/FLINK-9233

--
Ning


Re: [DISCUSS] Integrate Flink SQL well with Hive ecosystem

2018-10-24 Thread Zhang, Xuefu
Hi all,

To wrap up the discussion, I have attached a PDF describing the proposal, which 
is also attached to FLINK-10556 [1]. Please feel free to watch that JIRA to 
track the progress.

Please also let me know if you have additional comments or questions.

Thanks,
Xuefu

[1] https://issues.apache.org/jira/browse/FLINK-10556



--
Sender:Xuefu 
Sent at:2018 Oct 16 (Tue) 03:40
Recipient:Shuyi Chen 
Cc:yanghua1127 ; Fabian Hueske ; dev 
; user 
Subject:Re: [DISCUSS] Integrate Flink SQL well with Hive ecosystem

Hi Shuyi,

Thank you for your input. Yes, I agree with a phased approach and would like to move 
forward fast. :) We did some work internally on DDL utilizing the Babel parser in 
Calcite. While Babel makes Calcite's grammar extensible, at first impression it 
still seems too cumbersome for a project when too many extensions are made. 
It's even challenging to find where the extension is needed! It would certainly be 
better if Calcite could magically support Hive QL just by turning on a flag, such as 
the one for MYSQL_5. I can also see that this could mean a lot of work on Calcite. 
Nevertheless, I will bring up the discussion over there and see what their community 
thinks.

Would you mind sharing more info about the proposal on DDL that you mentioned? We 
can certainly collaborate on this.

Thanks,
Xuefu


--
Sender:Shuyi Chen 
Sent at:2018 Oct 14 (Sun) 08:30
Recipient:Xuefu 
Cc:yanghua1127 ; Fabian Hueske ; dev 
; user 
Subject:Re: [DISCUSS] Integrate Flink SQL well with Hive ecosystem

Welcome to the community and thanks for the great proposal, Xuefu! I think the 
proposal can be divided into 2 stages: making Flink support Hive features, 
and making Hive work with Flink. I agree with Timo on starting with a 
smaller scope, so we can make progress faster. As for [6], a proposal for DDL 
is already in progress, and will come after the unified SQL connector API is 
done. For supporting Hive syntax, we might need to work with the Calcite 
community, and a recent effort called Babel 
(https://issues.apache.org/jira/browse/CALCITE-2280) in Calcite might help here.

Thanks
Shuyi
On Wed, Oct 10, 2018 at 8:02 PM Zhang, Xuefu  wrote:
Hi Fabian/Vino,

Thank you very much for your encouragement and inquiry. Sorry that I didn't see 
Fabian's email until I read Vino's response just now. (Somehow Fabian's went to 
the spam folder.)

My proposal contains long-term and short-term goals. Nevertheless, the effort 
will focus on the following areas, including Fabian's list:

1. Hive metastore connectivity - This covers both read/write access, which 
means Flink can make full use of Hive's metastore as its catalog (at least for 
batch, but this can be extended to streaming as well).
2. Metadata compatibility - Objects (databases, tables, partitions, etc.) 
created by Hive can be understood by Flink, and the reverse direction is true 
as well.
3. Data compatibility - Similar to #2, data produced by Hive can be consumed by 
Flink and vice versa.
4. Support Hive UDFs - For all of Hive's native UDFs, Flink either provides its 
own implementation or makes Hive's implementation work in Flink. Further, for 
user-created UDFs in Hive, Flink SQL should provide a mechanism allowing users 
to import them into Flink without any code change required.
5. Data types - Flink SQL should support all data types that are available in 
Hive.
6. SQL language - Flink SQL should support the SQL standard (such as SQL:2003) with 
extensions to support Hive's syntax and language features, around DDL, DML, and 
SELECT queries.
7. SQL CLI - this is currently being developed in Flink, but more effort is needed.
8. Server - provide a server that's compatible with Hive's HiveServer2 in its 
Thrift APIs, such that HiveServer2 users can reuse their existing clients (such 
as Beeline) but connect to Flink's Thrift server instead.
9. JDBC/ODBC drivers - Flink may provide its own JDBC/ODBC drivers for other 
applications to use to connect to its Thrift server.
10. Support other user customizations in Hive, such as Hive SerDes, storage 
handlers, etc.
11. Better task failure tolerance and task scheduling in the Flink runtime.

As you can see, achieving all of this requires significant effort across all 
layers of Flink. However, a short-term goal could include only the core areas 
(such as 1, 2, 4, 5, 6, 7) or start with a smaller scope (such as #3, #6).

Please share your further thoughts. If we generally agree that this is the 
right direction, I could come up with a formal proposal quickly and then we can 
follow up with broader discussions.

Thanks,
Xuefu



--
Sender:vino yang 
Sent at:2018 Oct 11 (Thu) 09:45
Recipient:Fabian Hueske 
Cc:dev ; Xuefu ; user 

Subject:Re: [DISCUSS] Integrate Flink SQL well with Hive ecosystem

Hi Xuefu,

Appreciate this proposal, and like Fabian, it would look better if 

Re: Dynamically Generated Classes - Cannot load user class

2018-10-24 Thread shkob1
OK, I think I figured it out, though I'm not sure of the exact reason:

It seems that I need to give the stream a generic type of the super class,
rather than a POJO type of the concrete generated class. Otherwise the
operator definition apparently cannot load the POJO class during task
creation.
So: I don't declare the map's produced type as the concrete generated class,
and I work around the keyBy (which then cannot use a field name) by using a
key selector, as in the sketch below.
Doing all of that seems to work. I'd be happy to hear a more in-depth
explanation of the reason if anyone knows.
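A minimal, self-contained sketch of that workaround (the SuperClass / GeneratedClass
names and the key field below are made up for illustration, not my actual generated
classes):

import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.java.functions.KeySelector;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class GenericTypeKeyByExample {

    // Stand-ins for the dynamically generated class and its super class.
    public static class SuperClass {
        public String key;
    }

    public static class GeneratedClass extends SuperClass { }

    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        DataStream<String> input = env.fromElements("a", "b", "a");

        // Declare the produced type as the super class rather than the concrete
        // generated class, so the operator is not typed on the generated POJO.
        DataStream<SuperClass> mapped = input
                .map(value -> {
                    GeneratedClass g = new GeneratedClass();
                    g.key = value;
                    return (SuperClass) g;
                })
                .returns(TypeInformation.of(SuperClass.class));

        // keyBy on a field name is not available for the generic type,
        // so use a KeySelector instead.
        mapped.keyBy(new KeySelector<SuperClass, String>() {
            @Override
            public String getKey(SuperClass value) {
                return value.key;
            }
        }).print();

        env.execute("generic-type-keyby-example");
    }
}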



--
Sent from: http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/


Re: Question over Incremental Snapshot vs Full Snapshot in rocksDb state backend

2018-10-24 Thread chandan prakash
Thanks Tzu-Li for redirecting.
I would also like to be corrected if any of my inferences from the code are
incorrect or incomplete.
I am sure it will help clear the doubts of more developers like me :)
Thanks in advance.

Regards,
Chandan


On Wed, Oct 24, 2018 at 9:19 PM Tzu-Li (Gordon) Tai 
wrote:

> Hi,
>
> I’m forwarding this question to Stefan (cc’ed).
> He would most likely be able to answer your question, as he has done
> substantial work in the RocksDB state backends.
>
> Cheers,
> Gordon
>
>
> On 24 October 2018 at 8:47:24 PM, chandan prakash (
> chandanbaran...@gmail.com) wrote:
>
> Hi,
> I am new to Flink.
> Was looking into the code to understand how Flink does FullSnapshot and
> Incremental Snapshot using RocksDB
>
> What I understood:
> 1. *For full snapshot, we call RocksDb snapshot api* which basically an
> iterator handle to the entries in RocksDB instance. We iterate over every
> entry one by one and serialize that to some distributed file system.
> Similarly in restore for fullSnapshot, we read the file to get every entry
> and apply that to the rocksDb instance one by one to fully construct the db
> instance.
>
> 2. On the other hand in *for Incremental Snapshot, we rely on RocksDB
> Checkpoint api* to copy the sst files to HDFS/S3 incrementally.
> Similarly on restore, we copy the sst files to local directory and
> instantiate rocksDB instance with the path of the directory.
>
> *My Question is:*
> 1. Why did we took 2 different approaches using different RocksDB apis ?
> We could have used Checkpoint api of RocksDB for fullSnapshot as well .
> 2. Is there any specific reason to use *Snapshot API of rocksDB*  over 
> *Checkpoint
> api of RocksDB* for *fullSnapshot*?
>
> I am sure, I am missing some important point, really curious to know that.
> Any explanation will be really great. Thanks in advance.
>
>
> Regards,
> Chandan
>
>
>
>
>
> --
> Chandan Prakash
>
>

-- 
Chandan Prakash


[ANNOUNCE] Flink Forward San Francisco 2019 - Call for Presentations is now open

2018-10-24 Thread Till Rohrmann
Hi everybody,

the Call for Presentations for Flink Forward San Francisco 2019 is now
open! Apply by November 30 to share your compelling Flink use case, best
practices, and latest developments with the community on April 1-2 in San
Francisco, CA.

Submit your proposal:
https://flink-forward.org/call-for-presentations-submit-talk

Cheers,
Till


Re: Fail to recover Keyed State after ReinterpretAsKeyedStream

2018-10-24 Thread Tzu-Li (Gordon) Tai
Hi Jose,

As far as I know, you should be able to use keyed state on a stream returned by the 
DataStreamUtils.reinterpretAsKeyedStream function. That shouldn't be the issue 
here.

Have you looked into the logs for any meaningful exceptions explaining why the restore 
failed?
That would help in understanding whether or not this is a bug.

Cheers,
Gordon
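For reference, a minimal, self-contained sketch of using keyed state on a reinterpreted 
stream (the element type and counting logic are made up for illustration; in a real job 
the input must already be partitioned exactly as a keyBy on the same key would partition 
it, which is the precondition for reinterpretAsKeyedStream):

import org.apache.flink.api.common.state.ValueState;
import org.apache.flink.api.common.state.ValueStateDescriptor;
import org.apache.flink.api.java.functions.KeySelector;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.DataStreamUtils;
import org.apache.flink.streaming.api.datastream.KeyedStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.KeyedProcessFunction;
import org.apache.flink.util.Collector;

public class ReinterpretKeyedStateExample {

    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        // Placeholder input; assumed to be pre-partitioned by the key in a real job.
        DataStream<String> preKeyed = env.fromElements("a", "b", "a");

        KeyedStream<String, String> keyed = DataStreamUtils.reinterpretAsKeyedStream(
                preKeyed,
                new KeySelector<String, String>() {
                    @Override
                    public String getKey(String value) {
                        return value;
                    }
                });

        // Keyed state is used on the reinterpreted stream just as after a regular keyBy.
        keyed.process(new KeyedProcessFunction<String, String, Long>() {
            private transient ValueState<Long> count;

            @Override
            public void open(Configuration parameters) {
                count = getRuntimeContext().getState(
                        new ValueStateDescriptor<>("count", Long.class));
            }

            @Override
            public void processElement(String value, Context ctx, Collector<Long> out) throws Exception {
                Long current = count.value();
                current = (current == null ? 0L : current) + 1L;
                count.update(current);
                out.collect(current);
            }
        }).print();

        env.execute("reinterpret-as-keyed-stream-example");
    }
}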


On 24 October 2018 at 9:58:54 PM, Jose Cisneros (jacisnero...@googlemail.com) 
wrote:

Hi,

To avoid reshuffling in my job, I started using  
DataStreamUtils.reinterpretAsKeyedStream to avoid having to do another keyBy 
for the same key.  The BackEndState is RocksDB.

When the job recovers after a failure, the ProcessFunction after the keyBy 
restores its Keyed State correctly, while the Process function after 
reinterpretAsKeyedStream does not recover the Keyed State.

I have checked the data written by the checkpoints and there is a reference to 
the state.

If I change and use keyBy instead of DataStreamUtils.reinterpretAsKeyedStream  
the Keyed State is recovered as expected.

Is the DataStreamUtils.reinterpretAsKeyedStream function not intended to use 
Keyed State? 

Thank you.
Regards,

Jose

Re: cannot find symbol of "fromargs"

2018-10-24 Thread Tzu-Li (Gordon) Tai
Hi!

How are you packaging your Flink program? This looks like a simple dependency 
error.
If you don't know where to start when beginning to write your Flink program, 
the quickstart Maven templates are always a good place to begin [1].

Cheers,
Gordon

[1] 
https://ci.apache.org/projects/flink/flink-docs-master/dev/projectsetup/java_api_quickstart.html
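Also note that fromArgs is a static factory method on 
org.apache.flink.api.java.utils.ParameterTool, so it is called without new. A minimal 
sketch of the usual usage (the --input argument is just an example):

import org.apache.flink.api.java.utils.ParameterTool;

public class ParameterToolExample {
    public static void main(String[] args) {
        // Static factory call: no 'new' in front of ParameterTool.fromArgs(...)
        ParameterTool params = ParameterTool.fromArgs(args);

        // Example: read a named argument with a default value,
        // e.g. pass --input /tmp/in.txt on the command line.
        String input = params.get("input", "/tmp/in.txt");
        System.out.println("input = " + input);
    }
}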
On 24 October 2018 at 9:32:29 PM, Mar_zieh (m.marzieh.ghas...@gmail.com) wrote:

Hello  

I am new in Flink. I want to write a program in stream processing. I added  
this line to my program:  
ParameterTool mmm = new ParameterTool.fromArgs(args);  

But I got this error:  

cannot find symbol of "fromargs"  

would you please let me know how to solve this error?  

Thank you in advance.  



--  
Sent from: http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/ 
 


Re: Question over Incremental Snapshot vs Full Snapshot in rocksDb state backend

2018-10-24 Thread Tzu-Li (Gordon) Tai
Hi,

I’m forwarding this question to Stefan (cc’ed).
He would most likely be able to answer your question, as he has done 
substantial work in the RocksDB state backends.

Cheers,
Gordon


On 24 October 2018 at 8:47:24 PM, chandan prakash (chandanbaran...@gmail.com) 
wrote:

Hi,
I am new to Flink.
Was looking into the code to understand how Flink does FullSnapshot and 
Incremental Snapshot using RocksDB

What I understood:
1. For full snapshot, we call RocksDb snapshot api which basically an iterator 
handle to the entries in RocksDB instance. We iterate over every entry one by 
one and serialize that to some distributed file system. 
Similarly in restore for fullSnapshot, we read the file to get every entry and 
apply that to the rocksDb instance one by one to fully construct the db 
instance.

2. On the other hand in for Incremental Snapshot, we rely on RocksDB Checkpoint 
api to copy the sst files to HDFS/S3 incrementally.
Similarly on restore, we copy the sst files to local directory and instantiate 
rocksDB instance with the path of the directory.

My Question is:
1. Why did we took 2 different approaches using different RocksDB apis ?
We could have used Checkpoint api of RocksDB for fullSnapshot as well .
2. Is there any specific reason to use Snapshot API of rocksDB  over Checkpoint 
api of RocksDB for fullSnapshot?

I am sure, I am missing some important point, really curious to know that.
Any explanation will be really great. Thanks in advance.


Regards,
Chandan





--
Chandan Prakash



Re: Get request header from Kinesis

2018-10-24 Thread Tzu-Li (Gordon) Tai
Hi,

Could you point to the AWS Kinesis Java API that exposes record headers?
As far as I can tell from the Javadoc, I can’t seem to find how to retrieve 
headers from Kinesis records.

If there is a way to do that, then it might make sense to expose that from the 
Kinesis connector’s serialization / deserialization schema interfaces.

Cheers,
Gordon
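As a side note, the connector's KinesisDeserializationSchema does expose per-record 
metadata (partition key, sequence number, approximate arrival timestamp, stream name, 
shard id), though not HTTP request headers. A rough sketch of using it in place of 
SimpleStringSchema; the exact method signature below is written from memory and should 
be checked against the flink-connector-kinesis 1.6.1 sources:

import java.nio.charset.StandardCharsets;

import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.streaming.connectors.kinesis.serialization.KinesisDeserializationSchema;

public class RecordWithMetadataSchema implements KinesisDeserializationSchema<String> {

    @Override
    public String deserialize(byte[] recordValue, String partitionKey, String seqNum,
                              long approxArrivalTimestamp, String stream, String shardId) {
        // Prepend the record metadata to the payload, purely for illustration.
        return shardId + "/" + seqNum + "/" + approxArrivalTimestamp + ": "
                + new String(recordValue, StandardCharsets.UTF_8);
    }

    @Override
    public TypeInformation<String> getProducedType() {
        return TypeInformation.of(String.class);
    }
}

Such a schema can then be passed to the FlinkKinesisConsumer constructor where 
SimpleStringSchema is used in the code below.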

On 23 October 2018 at 10:22:45 PM, Madhusudan Shastri 
(testmailmadhusu...@gmail.com) wrote:

Hi,
    I am using the code below to read data from an AWS Kinesis stream, but it is 
giving me the request body and not the request header. How do I get the request 
header from Kinesis? My Flink jar versions are:
flink-java - 1.6.1
flink-streaming-java_2.11 - 1.6.1
flink-connector-kinesis_2.11 - 1.6.1

My code is: 
Properties consumerConfig = new Properties();
consumerConfig.put(ConsumerConfigConstants.AWS_REGION, "");
consumerConfig.put(ConsumerConfigConstants.AWS_ACCESS_KEY_ID, "");
consumerConfig.put(ConsumerConfigConstants.AWS_SECRET_ACCESS_KEY, "");
consumerConfig.put(ConsumerConfigConstants.STREAM_INITIAL_POSITION, "LATEST");
StreamExecutionEnvironment env = 
StreamExecutionEnvironment.getExecutionEnvironment();
DataStream<String> kinesis = env.addSource(
        new FlinkKinesisConsumer<>("", new SimpleStringSchema(),
        consumerConfig));
kinesis.addSink(new SinkFunction<String>() {
    @Override
    public void invoke(String value) throws Exception {
        System.out.println("value= " + value);
    }
});
env.execute();

Thanks and Regards,
Madhusudan B. Shastri

Guava conflict when using flink kinesis consumer with grpc protobuf

2018-10-24 Thread Vijay Balakrishnan
Hi,
I have a dependency on guava in grpc protobuf as follows:


<dependency>
    <groupId>com.google.guava</groupId>
    <artifactId>guava</artifactId>
    <version>26.0-jre</version>
</dependency>

I also use Flink Kinesis Connector in the same project:

<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-connector-kinesis_${scala.binary.version}</artifactId>
    <version>${flink.version}</version>
</dependency>

This Flink Kinesis connector has a dependency on an older version of guava,
and this is causing issues with 2 versions of guava being loaded by the
classloaders. How do I avoid this issue?

Details:
While building the Flink Kinesis Connector, I changed the pom.xml to try to
shade the guava library, but this didn't help. The Flink Kinesis Connector
pom.xml clearly says not to shade guava.



<relocation>
    <pattern>com.google.protobuf</pattern>
    <shadedPattern>org.apache.flink.kinesis.shaded.com.google.protobuf</shadedPattern>
</relocation>
...
I attempted the following in my project's pom.xml, but it didn't work:




<relocation>
    <pattern>com.google.guava</pattern>
    <shadedPattern>org.apache.flink.kinesis.shaded.com.google.guava</shadedPattern>
</relocation>
...

Using JDK 8
TIA,


Task manager count goes the expand then converge process when running flink on YARN

2018-10-24 Thread 徐涛
Hi experts
I am running flink job on YARN in job cluster mode, the job is divided 
into 2 tasks, the following are some configs of the job:
parallelism.default => 16
taskmanager.numberOfTaskSlots => 8
-yn => 2

when the program starts, I found that the count of task managers does not 
settle immediately; it first expands and then converges. I recorded the numbers 
during the process:
Task Managers | Task Slots | Available Task Slots
1. 14 | 104 | 88
2. 15 | 120 | 104
3. 16 | 128 | 112
4. 6  | 48  | 32
5. 3  | 24  | 8
6. 2  | 16  | 0

The final state is correct. There are 2 tasks, 32 subtasks in total; due 
to slot sharing, 16 slots are enough. The number of task slots per TM is 
8, so 2 TMs are needed.
I have the following question:
Because I specify -yn=2, why doesn't Flink directly allocate 2 TMs instead of 
going through this expand-then-converge process? Why does it request up to 16 
task managers? If this is not necessary, how can I avoid it?

Thanks a lot!

Best
Henry


Flink weird checkpointing behaviour

2018-10-24 Thread Pawel Bartoszek
Hi,

We have just upgraded to Flink 1.5.2 on EMR from Flink 1.3.2. We have
noticed that some checkpoints are taking a very long time to complete, and some
of them even fail with an exception:

Caused by: akka.pattern.AskTimeoutException: Ask timed out on
[Actor[akka://flink/user/jobmanager_0#-665361795]] after [6 ms].


We have noticed that *Checkpoint Duration (Async)* is taking most of the
checkpoint time compared to *Checkpoint Duration (Sync)*. I thought that
async checkpoints are only offered by the RocksDB state backend. We use the
filesystem state backend.

We didn't have such problems on Flink 1.3.2

Thanks,
Pawel

*Flink configuration*

akka.ask.timeout 60 s
classloader.resolve-order parent-first
containerized.heap-cutoff-ratio 0.15
env.hadoop.conf.dir /etc/hadoop/conf
env.yarn.conf.dir /etc/hadoop/conf
high-availability zookeeper
high-availability.cluster-id application_1540292869184_0001
high-availability.zookeeper.path.root /flink
high-availability.zookeeper.quorum
ip-10-4-X-X.eu-west-1.compute.internal:2181
high-availability.zookeeper.storageDir hdfs:///flink/recovery
internal.cluster.execution-mode NORMAL
internal.io.tmpdirs.use-local-default true
io.tmp.dirs
/mnt/yarn/usercache/hadoop/appcache/application_1540292869184_0001
jobmanager.heap.mb 3072
jobmanager.rpc.address ip-10-4-X-X.eu-west-1.compute.internal
jobmanager.rpc.port 41219
jobmanager.web.checkpoints.history 1000
parallelism.default 32
rest.address ip-10-4-X-X.eu-west-1.compute.internal
rest.port 0
state.backend filesystem
state.backend.fs.checkpointdir s3a://
state.checkpoints.dir s3a://...
state.savepoints.dir s3a://...
taskmanager.heap.mb 6600
taskmanager.numberOfTaskSlots 1
web.port 0
web.tmpdir /tmp/flink-web-c3d16e22-1a33-46a2-9825-a6e268892199
yarn.application-attempts 10
yarn.maximum-failed-containers -1
zookeeper.sasl.disable true


Re: Error with custom `SourceFunction` wrapping `InputFormatSourceFunction` (Flink 1.6)

2018-10-24 Thread Aaron Levin
Hey,

First, I appreciate everyone's help! Thank you!

I wrote several wrappers to try and debug this, including one which is an
exact copy of `InputFormatSourceFunction` which also failed. They all
failed with the same error I detail above. I'll post two of them below.
They all extended `RichParallelSourceFunction` and, as far as I could tell,
were properly initialized (though I may have missed something!).
Additionally, for the two below, if I change `extends
RichParallelSourceFunction` to `extends
InputFormatSourceFunction(...,...)`, I no longer receive the exception.
This is what led me to believe the source of the issue was casting and how
I found the line of code where the stream graph is given the input format.

Quick explanation of the wrappers:
1. `WrappedInputFormat` does a basic wrap around
`InputFormatSourceFunction` and delegates all methods to the underlying
`InputFormatSourceFunction`
2. `ClonedInputFormatSourceFunction` is a ~exact copy of the
`InputFormatSourceFunction` source.
3. They're being used in a test which looks vaguely like:
`DataStreamUtils.collect(env.addSource(new WrappedInputFormat(new
InputFormatSourceFunction[String](source,
implicitly[TypeInformation[String]]))).javaStream).asScala.toSeq`

class WrappedInputFormat[A](
  inputFormat: InputFormatSourceFunction[A]
)(
  implicit typeInfo: TypeInformation[A]
) extends RichParallelSourceFunction[A] {

  override def run(sourceContext: SourceFunction.SourceContext[A]): Unit = {
    inputFormat.run(sourceContext)
  }
  override def setRuntimeContext(t: RuntimeContext): Unit = {
    inputFormat.setRuntimeContext(t)
  }
  override def equals(obj: scala.Any) = {
    inputFormat.equals(obj)
  }
  override def hashCode() = { inputFormat.hashCode() }
  override def toString = { inputFormat.toString }
  override def getRuntimeContext(): RuntimeContext = {
    inputFormat.getRuntimeContext
  }
  override def getIterationRuntimeContext = {
    inputFormat.getIterationRuntimeContext
  }
  override def open(parameters: Configuration): Unit = {
    inputFormat.open(parameters)
  }
  override def cancel(): Unit = {
    inputFormat.cancel()
  }
  override def close(): Unit = {
    inputFormat.close()
  }
}

And the other one:

class ClonedInputFormatSourceFunction[A](val format: InputFormat[A,
InputSplit], val typeInfo: TypeInformation[A]) extends
RichParallelSourceFunction[A] {

  @transient private var provider: InputSplitProvider = _
  @transient private var serializer: TypeSerializer[A] = _
  @transient private var splitIterator: Iterator[InputSplit] = _
  private var isRunning: Boolean = _

  override def open(parameters: Configuration): Unit = {
val context = getRuntimeContext.asInstanceOf[StreamingRuntimeContext]
if(format.isInstanceOf[RichInputFormat[_,_]]) {
  format.asInstanceOf[RichInputFormat[_,_]].setRuntimeContext(context)
}
format.configure(parameters)

provider = context.getInputSplitProvider
serializer =
typeInfo.createSerializer(getRuntimeContext.getExecutionConfig)
splitIterator = getInputSplits()
isRunning = splitIterator.hasNext
  }

  override def run(sourceContext: SourceFunction.SourceContext[A]): Unit = {
if(isRunning && format.isInstanceOf[RichInputFormat[_,_]]) {
  format.asInstanceOf[RichInputFormat[_,_]].openInputFormat()
}

var nextElement: A = serializer.createInstance()
try {
  while (isRunning) {
format.open(splitIterator.next())
while (isRunning && !format.reachedEnd()) {
  nextElement = format.nextRecord(nextElement)
  if (nextElement != null) {
sourceContext.collect(nextElement)
  } else {
break
  }
  format.close()
  if (isRunning) {
isRunning = splitIterator.hasNext
  }
}
  }
} finally {

  format.close()
  if (format.isInstanceOf[RichInputFormat[_,_]]) {
format.asInstanceOf[RichInputFormat[_,_]].closeInputFormat()
  }
  isRunning = false
}
  }

  override def cancel(): Unit = {
isRunning = false
  }

  override def close(): Unit = {
format.close()
if(format.isInstanceOf[RichInputFormat[_,_]]) {
  format.asInstanceOf[RichInputFormat[_,_]].closeInputFormat()
}
  }

  private def getInputSplits(): Iterator[InputSplit] = {
new Iterator[InputSplit] {
  private var nextSplit: InputSplit = _
  private var exhausted: Boolean = _

  override def hasNext: Boolean = {
if(exhausted) { return false }
if(nextSplit != null) { return true }
var split: InputSplit = null

try {
  split =
provider.getNextInputSplit(getRuntimeContext.getUserCodeClassLoader)
} catch {
  case e: InputSplitProviderException =>
throw new RuntimeException("No InputSplit Provider", e)
}

if(split != null) {
  nextSplit = split
  true
} else {
  exhausted = true
  false
}
  }

  

Fail to recover Keyed State after ReinterpretAsKeyedStream

2018-10-24 Thread Jose Cisneros
Hi,


To avoid reshuffling in my job, I started using  DataStreamUtils.
reinterpretAsKeyedStream to avoid having to do another keyBy for the same
key.  The BackEndState is RocksDB.


When the job recovers after a failure, the ProcessFunction after the keyBy
restores its Keyed State correctly, while the Process function
after reinterpretAsKeyedStream does not recover the Keyed State.


I have checked the data written by the checkpoints and there is a reference
to the state.


If I change and use keyBy instead of DataStreamUtils.
reinterpretAsKeyedStream  the Keyed State is recovered as expected.


Is the DataStreamUtils.reinterpretAsKeyedStream function not intended to
use Keyed State?


Thank you.

Regards,


Jose


Re: Clean shutdown of streaming job

2018-10-24 Thread Ning Shi
Hi Neils,

Thanks for the response.

> I think your problem is that the Cassandra sink doesn't support exactly
> once guarantees when the Cassandra query isn't idempotent. If possible, the
> cleanest solution would be implementing a new or extending the existing
> Cassandra sink with the
> https://ci.apache.org/projects/flink/flink-docs-stable/api/java/index.html?org/apache/flink/streaming/api/functions/sink/TwoPhaseCommitSinkFunction.html
> interface, and setting your environment to exactly-once guarantee.

You are right. The culprit is that the Cassandra queries are not
idempotent.

I did consider implementing a custom sink that implements the two-phase
commit sink function. However, working with an external system that
doesn't have any transaction support is non-trivial. We have to build
undo logs and roll them back from the application side in case the
transaction aborts.

That is what led me to think that pausing the Kafka stream might be the
simplest and cleanest solution here. It doesn't mandate that the sink
has to be exactly-once, and it still provides a clean shutdown approach,
which may have broader applications.
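For context, a bare-bones skeleton of the TwoPhaseCommitSinkFunction approach being 
weighed here (method signatures as in Flink 1.6's TwoPhaseCommitSinkFunction; the 
CassandraTxn type and the buffering/undo strategy are hypothetical placeholders, not 
an actual implementation):

import java.io.Serializable;
import java.util.ArrayList;
import java.util.List;

import org.apache.flink.api.common.ExecutionConfig;
import org.apache.flink.api.common.typeutils.base.VoidSerializer;
import org.apache.flink.api.java.typeutils.runtime.kryo.KryoSerializer;
import org.apache.flink.streaming.api.functions.sink.TwoPhaseCommitSinkFunction;

// Hypothetical per-checkpoint transaction handle: buffers statements until commit.
class CassandraTxn implements Serializable {
    final List<String> bufferedStatements = new ArrayList<>();
}

public class TwoPhaseCassandraSink extends TwoPhaseCommitSinkFunction<String, CassandraTxn, Void> {

    public TwoPhaseCassandraSink() {
        super(new KryoSerializer<>(CassandraTxn.class, new ExecutionConfig()),
              VoidSerializer.INSTANCE);
    }

    @Override
    protected CassandraTxn beginTransaction() {
        // A new logical transaction is started for each checkpoint interval.
        return new CassandraTxn();
    }

    @Override
    protected void invoke(CassandraTxn txn, String value, Context context) {
        // Buffer the write instead of executing it immediately.
        txn.bufferedStatements.add(value);
    }

    @Override
    protected void preCommit(CassandraTxn txn) {
        // Stage the buffered writes; called as part of the checkpoint.
    }

    @Override
    protected void commit(CassandraTxn txn) {
        // Execute the buffered statements only after the checkpoint has completed.
    }

    @Override
    protected void abort(CassandraTxn txn) {
        // Discard buffered statements on failure/rollback.
        txn.bufferedStatements.clear();
    }
}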

--
Ning


cannot find symbol of "fromargs"

2018-10-24 Thread Mar_zieh
Hello

I am new to Flink. I want to write a stream processing program. I added
this line to my program:
ParameterTool mmm = new ParameterTool.fromArgs(args);

But I got this error:

cannot find symbol of "fromargs"

would you please let me know how to solve this error? 

Thank you in advance. 



--
Sent from: http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/


Re: Flink Task Allocation on Nodes

2018-10-24 Thread Kien Truong
Hi,

You can have multiple Flink clusters on the same set of physical machines. In
our experience, it's best to deploy a separate Flink cluster for each job
and adjust the resources accordingly.

Best regards,
Kien

On Oct 24, 2018 at 20:17, > wrote:

Flink Cluster in standalone with HA configuration. It has 6 Task managers
and each has 8 slots. Overall, 48 slots for the cluster.

>>If you cluster only have one task manager with one slot in each node,
then the job should be spread evenly.
Agree, this will solve the issue. However, the cluster is running other
jobs and in this case it won't have hardware resource for other jobs.

On Wed, Oct 24, 2018 at 2:20 PM Kien Truong  wrote:

> Hi,
>
> How are your task managers deploy ?
>
> If you cluster only have one task manager with one slot in each node,
> then the job should be spread evenly.
>
> Regards,
>
> Kien
>
> On 10/24/2018 4:35 PM, Sayat Satybaldiyev wrote:
> > Is there any way to indicate flink not to allocate all parallel tasks
> > on one node?  We have a stateless flink job that reading from 10
> > partition topic and have a parallelism of 6. Flink job manager
> > allocates all 6 parallel operators to one machine, causing all traffic
> > from Kafka allocated to only one machine. We have a cluster of 6 nodes
> > and ideal to spread one parallel operator to one machine. Is there a
> > way to do than in Flink?
>


Re: Flink Task Allocation on Nodes

2018-10-24 Thread Sayat Satybaldiyev
The Flink cluster is standalone with an HA configuration. It has 6 task managers,
each with 8 slots. Overall, 48 slots for the cluster.

>>If you cluster only have one task manager with one slot in each node,
then the job should be spread evenly.
Agreed, this would solve the issue. However, the cluster is running other
jobs, and in that case it won't have hardware resources for them.

On Wed, Oct 24, 2018 at 2:20 PM Kien Truong  wrote:

> Hi,
>
> How are your task managers deploy ?
>
> If you cluster only have one task manager with one slot in each node,
> then the job should be spread evenly.
>
> Regards,
>
> Kien
>
> On 10/24/2018 4:35 PM, Sayat Satybaldiyev wrote:
> > Is there any way to indicate flink not to allocate all parallel tasks
> > on one node?  We have a stateless flink job that reading from 10
> > partition topic and have a parallelism of 6. Flink job manager
> > allocates all 6 parallel operators to one machine, causing all traffic
> > from Kafka allocated to only one machine. We have a cluster of 6 nodes
> > and ideal to spread one parallel operator to one machine. Is there a
> > way to do than in Flink?
>


HighAvailability :: FsNegativeRunningJobsRegistry

2018-10-24 Thread Mikhail Pryakhin
Hi guys, 
I'm trying to substitute the ZooKeeper-based HA registry with a YARN-based HA 
registry. (The idea was taken from the issue 
https://issues.apache.org/jira/browse/FLINK-5254.)
In Flink 1.6.1, there exists an 
org.apache.flink.runtime.highavailability.FsNegativeRunningJobsRegistry, which 
claims to be tailored towards the YARN deployment mode. I've looked through 
org.apache.flink.configuration.HighAvailabilityOptions in order to figure out 
how to enable the YARN-based HA registry, but haven't found anything about it. The 
Flink documentation mentions nothing about it either.

Do I miss something? Is there a way to use this exact registry for YARN 
deployments?

Thank you in advance.

Kind Regards,
Mike Pryakhin





Question over Incremental Snapshot vs Full Snapshot in rocksDb state backend

2018-10-24 Thread chandan prakash
Hi,
I am new to Flink.
Was looking into the code to understand how Flink does FullSnapshot and
Incremental Snapshot using RocksDB

What I understood:
1. *For a full snapshot, we call the RocksDB snapshot API*, which is basically an
iterator handle to the entries in the RocksDB instance. We iterate over every
entry one by one and serialize it to some distributed file system.
Similarly, on restore for a full snapshot, we read the file to get every entry
and apply it to the RocksDB instance one by one to fully construct the db
instance.

2. On the other hand, *for an incremental snapshot, we rely on the RocksDB
Checkpoint API* to copy the SST files to HDFS/S3 incrementally.
Similarly, on restore, we copy the SST files to a local directory and
instantiate the RocksDB instance with the path of that directory.

*My question is:*
1. Why did we take 2 different approaches using different RocksDB APIs?
We could have used the Checkpoint API of RocksDB for the full snapshot as well.
2. Is there any specific reason to use the *Snapshot API of RocksDB* over the
*Checkpoint API of RocksDB* for the *full snapshot*?

I am sure, I am missing some important point, really curious to know that.
Any explanation will be really great. Thanks in advance.


Regards,
Chandan





-- 
Chandan Prakash
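For reference, a minimal sketch of the two RocksDB JNI entry points being contrasted
above (the paths and the single-column-family setup are simplified assumptions, not
Flink's actual backend code):

import org.rocksdb.Checkpoint;
import org.rocksdb.Options;
import org.rocksdb.ReadOptions;
import org.rocksdb.RocksDB;
import org.rocksdb.RocksDBException;
import org.rocksdb.RocksIterator;
import org.rocksdb.Snapshot;

public class RocksDbSnapshotVsCheckpoint {

    public static void main(String[] args) throws RocksDBException {
        RocksDB.loadLibrary();

        try (Options options = new Options().setCreateIfMissing(true);
             RocksDB db = RocksDB.open(options, "/tmp/rocksdb-demo")) {

            db.put("key".getBytes(), "value".getBytes());

            // (1) Full-snapshot style: take a consistent snapshot and iterate
            // over every entry, serializing key/value pairs one by one.
            Snapshot snapshot = db.getSnapshot();
            try (ReadOptions readOptions = new ReadOptions().setSnapshot(snapshot);
                 RocksIterator it = db.newIterator(readOptions)) {
                for (it.seekToFirst(); it.isValid(); it.next()) {
                    byte[] key = it.key();
                    byte[] value = it.value();
                    // ... write key/value to the distributed file system ...
                }
            } finally {
                db.releaseSnapshot(snapshot);
            }

            // (2) Incremental style: ask RocksDB for a checkpoint, which
            // hard-links/copies the current SST files into a directory that
            // can then be uploaded (only newly created files need to be shipped).
            try (Checkpoint checkpoint = Checkpoint.create(db)) {
                checkpoint.createCheckpoint("/tmp/rocksdb-demo-checkpoint");
            }
        }
    }
}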


Re: Flink Task Allocation on Nodes

2018-10-24 Thread Kien Truong

Hi,

How are your task managers deployed?

If your cluster only has one task manager with one slot on each node, 
then the job should be spread evenly.


Regards,

Kien

On 10/24/2018 4:35 PM, Sayat Satybaldiyev wrote:
Is there any way to indicate flink not to allocate all parallel tasks 
on one node?  We have a stateless flink job that reading from 10 
partition topic and have a parallelism of 6. Flink job manager 
allocates all 6 parallel operators to one machine, causing all traffic 
from Kafka allocated to only one machine. We have a cluster of 6 nodes 
and ideal to spread one parallel operator to one machine. Is there a 
way to do than in Flink?


Re: Checkpoint acknowledge takes too long

2018-10-24 Thread Hequn Cheng
Hi Henry,

@Kien is right. Take a thread dump to see what the TaskManager was doing.
Also check whether GC happens frequently.

Best, Hequn


On Wed, Oct 24, 2018 at 5:03 PM 徐涛  wrote:

> Hi
> I am running a flink application with parallelism 64, I left the
> checkpoint timeout default value, which is 10minutes, the state size is
> less than 1MB, I am using the FsStateBackend.
> The application triggers some checkpoints but all of them fails
> due to "Checkpoint expired before completing”, I check the checkpoint
> history, found that there are 63 subtask acknowledge, but one left n/a, and
> also the alignment duration is quite long, about 5m27s.
> I want to know why there is one subtask does not acknowledge? And
> because the alignment duration is long, what will influent the alignment
> duration?
> Thank a lot.
>
> Best
> Henry


Re: Size of Checkpoints increasing with time

2018-10-24 Thread Kien Truong

Hi,

Do you use incremental checkpoints?

RocksDB is an append-only DB, so you will experience a steady increase 
in state size until a compaction occurs and old values of keys are 
garbage-collected.


However, the average state size should stabilize after a while, if the 
load doesn't change.


Regards,

Kien
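As a related aside, incremental checkpoints are enabled through the RocksDB state 
backend constructor flag; a minimal sketch (the checkpoint URI and interval are 
placeholders):

import org.apache.flink.contrib.streaming.state.RocksDBStateBackend;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class IncrementalCheckpointSetup {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        // 'true' enables incremental checkpoints: only newly created SST files
        // are uploaded, instead of the full state on every checkpoint.
        env.setStateBackend(new RocksDBStateBackend("s3://my-bucket/checkpoints", true));
        env.enableCheckpointing(60_000L);

        // ... build and execute the job ...
    }
}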


On 10/23/2018 7:03 PM, Sameer W wrote:

Hi,

We are using ValueState to maintain state. It is a pretty simple job 
with a keyBy operator on a stream and the subsequent map operator 
maintains state in a ValueState instance. The transaction load is in 
billion transactions per day. However the amount of state per key is a 
list of 18x6 long values which are constantly updated. We have about 
20 million keys and transactions are uniformly distributed across 
those keys.


When the job starts the size of the checkpoints (Using RocksDB backed 
by S3) is low (order of 500 MB). However, after 12 hours of operation 
the checkpoint sizes have increased to about 4-5 GB. Time taken to 
complete the checkpoint starts around 15-20 seconds and after 12 hours 
reaches about a minute.


What is the reason behind the increasing size of checkpoints?

Thanks,
Sameer


Re: Checkpoint acknowledge takes too long

2018-10-24 Thread Kien Truong

Hi,

In my experience, this is most likely due to one sub-task being blocked 
on some long-running operation.


Try running the task manager with a profiler (like VisualVM) and check 
for hot spots.



Regards,

Kien

On 10/24/2018 4:02 PM, 徐涛 wrote:

Hi
I am running a flink application with parallelism 64, I left the 
checkpoint timeout default value, which is 10minutes, the state size is less 
than 1MB, I am using the FsStateBackend.
The application triggers some checkpoints but all of them fails due to 
"Checkpoint expired before completing”, I check the checkpoint history, found 
that there are 63 subtask acknowledge, but one left n/a, and also the alignment 
duration is quite long, about 5m27s.
I want to know why there is one subtask does not acknowledge? And 
because the alignment duration is long, what will influent the alignment 
duration?
Thank a lot.

Best
Henry


Re: Error with custom `SourceFunction` wrapping `InputFormatSourceFunction` (Flink 1.6)

2018-10-24 Thread Kien Truong

Hi,

Since InputFormatSourceFunction is a subclass of 
RichParallelSourceFunction, your wrapper should also extend this class.

In addition, remember to override the methods defined in 
AbstractRichFunction and proxy the calls to the underlying 
InputFormatSourceFunction, in order to initialize the underlying source 
correctly.



Best regards,

Kien


On 10/20/2018 1:06 AM, Aaron Levin wrote:

Hi,

I'm writing a custom `SourceFunction` which wraps an underlying 
`InputFormatSourceFunction`. When I try to use this `SourceFunction` 
in a stream (via `env.addSource` and a subsequent sink) I get errors 
related to the `InputSplitAssigner` not being initialized for a 
particular vertex ID. Full error here[1].


I believe the underlying error is related to this[0] call to 
`instanceof InputFormatSourceFunction`.


_My questions_:

1. how can I wrap a `InputFormatSourceFunction` which avoids this 
error? Am I missing a chunk of the API covering this?
2. is the error I'm experiencing related to that casting call? If so, 
would ya'll be open to a PR which adds an interface one can extend 
which will set the input format in the stream graph? Or is there a 
preferred way of achieving this?


Thanks!

Aaron Levin

[0] 
https://github.com/apache/flink/blob/release-1.6/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/graph/StreamGraphGenerator.java#L480

[1]
java.lang.RuntimeException: Could not retrieve next input split.
    at 
org.apache.flink.streaming.api.functions.source.InputFormatSourceFunction$1.hasNext(InputFormatSourceFunction.java:157)
    at 
org.apache.flink.streaming.api.functions.source.InputFormatSourceFunction.open(InputFormatSourceFunction.java:71)

    at REDACTED
    at 
org.apache.flink.api.common.functions.util.FunctionUtils.openFunction(FunctionUtils.java:36)
    at 
org.apache.flink.streaming.api.operators.AbstractUdfStreamOperator.open(AbstractUdfStreamOperator.java:102)
    at 
org.apache.flink.streaming.runtime.tasks.StreamTask.openAllOperators(StreamTask.java:424)
    at 
org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:290)

    at org.apache.flink.runtime.taskmanager.Task.run(Task.java:711)
    at java.lang.Thread.run(Thread.java:748)
Caused by: 
org.apache.flink.runtime.jobgraph.tasks.InputSplitProviderException: 
Requesting the next input split failed.
    at 
org.apache.flink.runtime.taskexecutor.rpc.RpcInputSplitProvider.getNextInputSplit(RpcInputSplitProvider.java:69)
    at 
org.apache.flink.streaming.api.functions.source.InputFormatSourceFunction$1.hasNext(InputFormatSourceFunction.java:155)

    ... 8 more
Caused by: java.util.concurrent.ExecutionException: 
java.lang.Exception: No InputSplitAssigner for vertex ID 
cbc357ccb763df2852fee8c4fc7d55f2
    at 
java.util.concurrent.CompletableFuture.reportGet(CompletableFuture.java:357)
    at 
java.util.concurrent.CompletableFuture.get(CompletableFuture.java:1915)
    at 
org.apache.flink.runtime.taskexecutor.rpc.RpcInputSplitProvider.getNextInputSplit(RpcInputSplitProvider.java:61)

    ... 9 more
Caused by: java.lang.Exception: No InputSplitAssigner for vertex ID 
cbc357ccb763df2852fee8c4fc7d55f2
    at 
org.apache.flink.runtime.jobmaster.JobMaster.requestNextInputSplit(JobMaster.java:575)

    at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
    at 
sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
    at 
sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)

    at java.lang.reflect.Method.invoke(Method.java:498)
    at 
org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleRpcInvocation(AkkaRpcActor.java:247)

...


Flink Task Allocation on Nodes

2018-10-24 Thread Sayat Satybaldiyev
Is there any way to tell Flink not to allocate all parallel tasks on
one node? We have a stateless Flink job that reads from a 10-partition
topic and has a parallelism of 6. The Flink job manager allocates all 6
parallel operators to one machine, causing all traffic from Kafka to be
directed to only one machine. We have a cluster of 6 nodes, and it would be
ideal to spread one parallel operator to each machine. Is there a way to do
that in Flink?


Checkpoint acknowledge takes too long

2018-10-24 Thread 徐涛
Hi 
I am running a Flink application with parallelism 64. I left the 
checkpoint timeout at the default value, which is 10 minutes; the state size is less 
than 1MB, and I am using the FsStateBackend.
The application triggers some checkpoints, but all of them fail due to 
"Checkpoint expired before completing". I checked the checkpoint history and found 
that 63 subtasks acknowledged, but one was left n/a, and also the alignment 
duration is quite long, about 5m27s.
I want to know why one subtask does not acknowledge. And 
because the alignment duration is long, what influences the alignment 
duration?
Thank a lot.

Best
Henry

Error migrating to 1.6

2018-10-24 Thread Juan Gentile
Hello!

We are trying to migrate from 1.4 to 1.6 and we are getting the following 
exception in our jobs:

org.apache.flink.util.FlinkException: The assigned slot 
container_e293_1539164595645_3455869_01_011241_2 was removed.
   at 
org.apache.flink.runtime.resourcemanager.slotmanager.SlotManager.removeSlot(SlotManager.java:786)
   at 
org.apache.flink.runtime.resourcemanager.slotmanager.SlotManager.removeSlots(SlotManager.java:756)
   at 
org.apache.flink.runtime.resourcemanager.slotmanager.SlotManager.internalUnregisterTaskManager(SlotManager.java:948)
   at 
org.apache.flink.runtime.resourcemanager.slotmanager.SlotManager.unregisterTaskManager(SlotManager.java:372)
   at 
org.apache.flink.runtime.resourcemanager.ResourceManager.closeTaskManagerConnection(ResourceManager.java:802)
   at 
org.apache.flink.runtime.resourcemanager.ResourceManager.disconnectTaskManager(ResourceManager.java:395)
   at sun.reflect.GeneratedMethodAccessor106.invoke(Unknown Source)
   at 
sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
   at java.lang.reflect.Method.invoke(Method.java:498)
   at 
org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleRpcInvocation(AkkaRpcActor.java:242)
   at 
org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleRpcMessage(AkkaRpcActor.java:162)
   at 
org.apache.flink.runtime.rpc.akka.FencedAkkaRpcActor.handleRpcMessage(FencedAkkaRpcActor.java:70)
   at 
org.apache.flink.runtime.rpc.akka.AkkaRpcActor.onReceive(AkkaRpcActor.java:142)
   at 
org.apache.flink.runtime.rpc.akka.FencedAkkaRpcActor.onReceive(FencedAkkaRpcActor.java:40)
   at 
akka.actor.UntypedActor$$anonfun$receive$1.applyOrElse(UntypedActor.scala:165)
   at akka.actor.Actor$class.aroundReceive(Actor.scala:502)
   at akka.actor.UntypedActor.aroundReceive(UntypedActor.scala:95)
   at akka.actor.ActorCell.receiveMessage(ActorCell.scala:526)
   at akka.actor.ActorCell.invoke(ActorCell.scala:495)
   at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:257)
   at akka.dispatch.Mailbox.run(Mailbox.scala:224)
   at akka.dispatch.Mailbox.exec(Mailbox.scala:234)
   at scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
   at 
scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
   at 
scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
   at 
scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)

Do you know the reason? Any help is appreciated.

Thank you,
Juan


Re: Error with custom `SourceFunction` wrapping `InputFormatSourceFunction` (Flink 1.6)

2018-10-24 Thread Dawid Wysakowicz
Hi Aaron,

Could you share the code of your custom function?

I am also adding Aljosha and Kostas to cc, who should be more helpful on
that topic.

Best,

Dawid

On 19/10/2018 20:06, Aaron Levin wrote:
> Hi,
>
> I'm writing a custom `SourceFunction` which wraps an underlying
> `InputFormatSourceFunction`. When I try to use this `SourceFunction`
> in a stream (via `env.addSource` and a subsequent sink) I get errors
> related to the `InputSplitAssigner` not being initialized for a
> particular vertex ID. Full error here[1].
>
> I believe the underlying error is related to this[0] call to
> `instanceof InputFormatSourceFunction`.
>
> _My questions_:
>
> 1. how can I wrap a `InputFormatSourceFunction` which avoids this
> error? Am I missing a chunk of the API covering this?
> 2. is the error I'm experiencing related to that casting call? If so,
> would ya'll be open to a PR which adds an interface one can extend
> which will set the input format in the stream graph? Or is there a
> preferred way of achieving this?
>
> Thanks!
>
> Aaron Levin
>
> [0] 
> https://github.com/apache/flink/blob/release-1.6/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/graph/StreamGraphGenerator.java#L480
> [1] 
> java.lang.RuntimeException: Could not retrieve next input split.
>     at
> org.apache.flink.streaming.api.functions.source.InputFormatSourceFunction$1.hasNext(InputFormatSourceFunction.java:157)
>     at
> org.apache.flink.streaming.api.functions.source.InputFormatSourceFunction.open(InputFormatSourceFunction.java:71)
>     at REDACTED
>     at
> org.apache.flink.api.common.functions.util.FunctionUtils.openFunction(FunctionUtils.java:36)
>     at
> org.apache.flink.streaming.api.operators.AbstractUdfStreamOperator.open(AbstractUdfStreamOperator.java:102)
>     at
> org.apache.flink.streaming.runtime.tasks.StreamTask.openAllOperators(StreamTask.java:424)
>     at
> org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:290)
>     at org.apache.flink.runtime.taskmanager.Task.run(Task.java:711)
>     at java.lang.Thread.run(Thread.java:748)
> Caused by:
> org.apache.flink.runtime.jobgraph.tasks.InputSplitProviderException:
> Requesting the next input split failed.
>     at
> org.apache.flink.runtime.taskexecutor.rpc.RpcInputSplitProvider.getNextInputSplit(RpcInputSplitProvider.java:69)
>     at
> org.apache.flink.streaming.api.functions.source.InputFormatSourceFunction$1.hasNext(InputFormatSourceFunction.java:155)
>     ... 8 more
> Caused by: java.util.concurrent.ExecutionException:
> java.lang.Exception: No InputSplitAssigner for vertex ID
> cbc357ccb763df2852fee8c4fc7d55f2
>     at
> java.util.concurrent.CompletableFuture.reportGet(CompletableFuture.java:357)
>     at
> java.util.concurrent.CompletableFuture.get(CompletableFuture.java:1915)
>     at
> org.apache.flink.runtime.taskexecutor.rpc.RpcInputSplitProvider.getNextInputSplit(RpcInputSplitProvider.java:61)
>     ... 9 more
> Caused by: java.lang.Exception: No InputSplitAssigner for vertex ID
> cbc357ccb763df2852fee8c4fc7d55f2
>     at
> org.apache.flink.runtime.jobmaster.JobMaster.requestNextInputSplit(JobMaster.java:575)
>     at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
>     at
> sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
>     at
> sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
>     at java.lang.reflect.Method.invoke(Method.java:498)
>     at
> org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleRpcInvocation(AkkaRpcActor.java:247)
> ...




Re: Manual trigger the window in fold operator or incremental aggregation

2018-10-24 Thread Dominik Wosiński
Hey Zhen Li,

What are you trying to do exactly? Maybe Flink has a more suitable method
available than manually triggering windows.

Best Regards,
Dom.

On Wed, 24 Oct 2018 at 09:25, Dawid Wysakowicz 
wrote:

> Hi Zhen Li,
>
> As far as I know that is not possible. For such custom handling I would
> recommend having a look at ProcessFunction[1], where you have access to
> timers and state.
>
> Best,
>
> Dawid
> [1]
> https://ci.apache.org/projects/flink/flink-docs-release-1.6/dev/stream/operators/process_function.html#process-function-low-level-operations
>
> On 17/10/2018 14:18, Ahmad Hassan wrote:
>
> Hi Niels,
>
> Can we distinguish within apply function of 'RichWindowFunction' whether
> it was called due to onElement trigger call or onProcessingtime trigger
> call of a custom Trigger ?
>
> Thanks!
>
> On Wed, 17 Oct 2018 at 12:51, Niels van Kaam  wrote:
>
>> Hi Zhen Li,
>>
>> You can control when a windowed stream emits data with "Triggers". See:
>> https://ci.apache.org/projects/flink/flink-docs-stable/dev/stream/operators/windows.html#triggers
>>
>> Flink comes with a couple of default triggers, but you can also create
>> your own by implementing
>> https://ci.apache.org/projects/flink/flink-docs-stable/api/java/org/apache/flink/streaming/api/windowing/triggers/Trigger.html
>> .
>>
>> Note that this does not change the window, but just causes the
>> windowedstream to emit intermediate results to the next operator.
>>
>> Does this answer your question?
>>
>> Cheers,
>> Niels
>>
>> On Wed, Oct 17, 2018 at 12:34 PM zhen li  wrote:
>>
>>> Hi all:
>>> How can I trigger the window manually in a fold operator or
>>> incremental aggregation? For example, when some condition is met, although
>>> the count window or time window has not been reached
>>
>>


Flink Error - Remote system has been silent for too long

2018-10-24 Thread Anil
The Flink jobs are deployed in a YARN cluster. I am seeing the following log
for some of my jobs in the Job Manager. I'm using Flink 1.4. The job has
taskmanager.exit-on-fatal-akka-error=true,
but I don't see the task manager being restarted.

I made the following observations - 
1. One job does a join on two Kafka topics. One of the streams didn't have any
data in the last 24 hours.
2. Two jobs have the same log in JobManager.out but are working fine and
the records are being generated.

{"debug_level":"ERROR","debug_timestamp":"2018-10-24
05:23:25,092","debug_thread":"flink-akka.actor.default-dispatcher-20","debug_file":"MarkerIgnoringBase.java",
"debug_line":"161","debug_message":"Association to
[akka.tcp://flink@ip-*-*-*-*.ap-southeast-1.compute.internal:58208] with UID
[930934199] irrecoverably failed. Quarantining address.", "job_name":
"eb99e094-74c9-4036-aa08-d379d62b7ff2" }
java.util.concurrent.TimeoutException: Remote system has been silent for too
long. (more than 48.0 hours)
at
akka.remote.ReliableDeliverySupervisor$$anonfun$idle$1.applyOrElse(Endpoint.scala:375)
at akka.actor.Actor$class.aroundReceive(Actor.scala:502)
at 
akka.remote.ReliableDeliverySupervisor.aroundReceive(Endpoint.scala:203)
at akka.actor.ActorCell.receiveMessage(ActorCell.scala:526)
at akka.actor.ActorCell.invoke(ActorCell.scala:495)
at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:257)
at akka.dispatch.Mailbox.run(Mailbox.scala:224)
at akka.dispatch.Mailbox.exec(Mailbox.scala:234)
at scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
at
scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
at 
scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
at
scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)


I read about other people having the same kind of issue, and the
taskmanager.exit-on-fatal-akka-error setting worked for them. I'm not sure
why I'm seeing this issue, or why the stream keeps working fine with the error
and without a restart. Will appreciate any help. Thanks!




--
Sent from: http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/


Re: Manual trigger the window in fold operator or incremental aggregation

2018-10-24 Thread Dawid Wysakowicz
Hi Zhen Li,

As far as I know that is not possible. For such custom handling I would
recommend having a look at ProcessFunction[1], where you have access to
timers and state.

Best,

Dawid

[1]
https://ci.apache.org/projects/flink/flink-docs-release-1.6/dev/stream/operators/process_function.html#process-function-low-level-operations
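As a rough illustration of that approach (the condition, threshold, and timeout below 
are made up for the example; this is a sketch of a keyed ProcessFunction with state and 
a processing-time timer, not a drop-in replacement for a window):

import org.apache.flink.api.common.state.ValueState;
import org.apache.flink.api.common.state.ValueStateDescriptor;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.functions.ProcessFunction;
import org.apache.flink.util.Collector;

// Apply on a keyed stream, e.g. stream.keyBy(...).process(new EarlyFiringAggregate()).
public class EarlyFiringAggregate extends ProcessFunction<Long, Long> {

    private transient ValueState<Long> sum;

    @Override
    public void open(Configuration parameters) {
        sum = getRuntimeContext().getState(new ValueStateDescriptor<>("sum", Long.class));
    }

    @Override
    public void processElement(Long value, Context ctx, Collector<Long> out) throws Exception {
        Long current = sum.value();
        current = (current == null ? 0L : current) + value;
        sum.update(current);

        // "Manual trigger": emit and reset as soon as a custom condition is met,
        // instead of waiting for a count or time window to close.
        if (current >= 100L) {
            out.collect(current);
            sum.clear();
        } else {
            // Fallback: fire at the latest 60s (processing time) after this element.
            ctx.timerService().registerProcessingTimeTimer(
                    ctx.timerService().currentProcessingTime() + 60_000L);
        }
    }

    @Override
    public void onTimer(long timestamp, OnTimerContext ctx, Collector<Long> out) throws Exception {
        Long current = sum.value();
        if (current != null) {
            out.collect(current);
            sum.clear();
        }
    }
}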

On 17/10/2018 14:18, Ahmad Hassan wrote:
> Hi Niels,
>
> Can we distinguish within apply function of 'RichWindowFunction'
> whether it was called due to onElement trigger call or
> onProcessingtime trigger call of a custom Trigger ?
>
> Thanks!
>
> On Wed, 17 Oct 2018 at 12:51, Niels van Kaam  > wrote:
>
> Hi Zhen Li,
>
> You can control when a windowed stream emits data with "Triggers".
> See:
> 
> https://ci.apache.org/projects/flink/flink-docs-stable/dev/stream/operators/windows.html#triggers
>  
> Flink comes with a couple of default triggers, but you can also
> create your own by implementing
> 
> https://ci.apache.org/projects/flink/flink-docs-stable/api/java/org/apache/flink/streaming/api/windowing/triggers/Trigger.html.
>
> Note that this does not change the window, but just causes the
> windowedstream to emit intermediate results to the next operator.
>
> Does this answer your question?
>
> Cheers,
> Niels
>
> On Wed, Oct 17, 2018 at 12:34 PM zhen li  > wrote:
>
> Hi all:
>     How can I trigger the window manually in a fold operator or
> incremental aggregation? For example, when some condition is
> met, although the count window or time window has not been reached
>




Re: Window State is not being store on check-pointing

2018-10-24 Thread Dawid Wysakowicz
Hi,

Do you mean that you stop your job manually and then start it again?
Checkpoints are used in case of failures and are 1) by default not
persisted across separate job runs (unless you set them to be
externalized) and 2) not automatically picked up when starting your job.
For your case, when you stop and then want to start a job with the state
from a previous run, you should use savepoints.

For a more thorough explanation of those concepts please have a look here[1]

Best,

Dawid

[1]
https://ci.apache.org/projects/flink/flink-docs-release-1.6/ops/state/savepoints.html#what-is-a-savepoint-how-is-a-savepoint-different-from-a-checkpoint

On 17/10/2018 05:37, sohimankotia wrote:
> Hi Hequn,
>
> I tried with following :
>
> Configuration conf = new Configuration();
>
> conf.setString("state.checkpoints.dir","file:///home/sohanvir/Desktop/flink/checkpoints2");
> final StreamExecutionEnvironment env =
> StreamExecutionEnvironment.createLocalEnvironment(1,conf);
> CheckpointConfig config = env.getCheckpointConfig();
>
> config.enableExternalizedCheckpoints(ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION);
> env.setParallelism(1);
> env.enableCheckpointing(20 * SECOND);
>
> env.getCheckpointConfig().setCheckpointingMode(CheckpointingMode.EXACTLY_ONCE);
> env.setStateBackend(new
> RocksDBStateBackend("file:///home/sohanvir/Desktop/flink/checkpoints"));
>
>
> Still issue persists. 
>
> Any idea ?
>
>
>
>
> --
> Sent from: 
> http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/


