Re: Semantic when table joins table from window

2018-08-21 Thread Hequn Cheng
Hi Hery,

As for choice 1:

   - The state size of a join depends on its input table sizes, not on the
   result table, so the state size of the join in choice 1 depends on how many
   article ids, praise ids and response ids there are.
   - Also, a non-window join merges identical rows in its state, so the state
   size won't grow if you keep pouring in the same article id.
   I think the problem here is that you need a DISTINCT before the join, so
   that a praise id won't join multiple identical article ids, which would
   influence the correctness of the result.
   - I think you need to aggregate before the join to ensure the correctness
   of the result, because there are duplicated article ids after article joins
   praise, and this influences the value of count(r.response_id).
   - You can't use a window or other bounded operators after a non-window join.
   The time attribute fields cannot be passed through because of a semantic
   conflict.
   - A hop window with a large size and a small hop interval should be avoided,
   because data is duplicated across the overlapping windows. For example, a
   hopping window of 15 minutes size and 5 minutes hop interval assigns each row
   to 3 different windows of 15 minutes size.

As for choice 2:

   - I think you need another field (for example, HOP_START) when joining the
   three tables, so that only records in the same window are joined.

To solve your problem, I think we can do a non-window GROUP BY first and then
join the three result tables. Furthermore, a state retention time can be set to
keep the state from growing indefinitely.
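
For illustration, a minimal sketch of that approach with the Table API, reusing
the table and column names from your query (it assumes tableEnv is the
StreamTableEnvironment from your program; the retention times and the final
conversion to a retract stream are assumptions, not a tested implementation):

import org.apache.flink.api.common.time.Time
import org.apache.flink.api.scala._
import org.apache.flink.table.api.scala._
import org.apache.flink.types.Row

// Non-window aggregations first; their state holds one row per article_id.
val praiseAggr = tableEnv.sqlQuery(
  "SELECT article_id, COUNT(praise_id) AS pCount FROM praise GROUP BY article_id")
val responseAggr = tableEnv.sqlQuery(
  "SELECT article_id, COUNT(response_id) AS rCount FROM response GROUP BY article_id")
tableEnv.registerTable("praiseAggr", praiseAggr)
tableEnv.registerTable("responseAggr", responseAggr)

// Then join the three (already compact) tables without a window.
val scores = tableEnv.sqlQuery(
  "SELECT a.article_id, p.pCount, r.rCount FROM article a " +
  "LEFT JOIN praiseAggr p ON a.article_id = p.article_id " +
  "LEFT JOIN responseAggr r ON a.article_id = r.article_id")

// Bound the state: drop state for keys that have been idle for roughly 3 days.
val qConfig = tableEnv.queryConfig
qConfig.withIdleStateRetentionTime(Time.days(3), Time.days(4))
tableEnv.toRetractStream[Row](scores, qConfig)  // emit the updates to a sink from here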

Best, Hequn

On Tue, Aug 21, 2018 at 10:07 PM 徐涛  wrote:

> Hi Fabian,
> So maybe I cannot join a table that is generated from a window, because the
> table gets larger and larger as time goes by, and maybe the system will
> crash one day.
>
> I am working on a system that calculates the “score" of an article, which
> consists of the count of article praises, the count of article responses, etc.
> Because I cannot use Flink to keep all the articles, I decided to only update
> the score of articles created in the last 3 days.
>
> I have two choices:
> 1. join the article table and praise table, response table then window
> select a.article_id, count(p.praise_id) as pCount, count(r.response_id) as
> rCount
> from
> article a
> left join
> praise p on a.article_id = p.article_id
> left join
> response r on a.article_id = r.article_id
> group by hop(updated_time, interval '1' minute,interval '3' day) ,
> article_id
> 2. window the article table, window the praise table, window the response
> table, then join them together
> select aAggr.article_id, pAggr.pCount, rAggr.rCount
> (select article_id from article group by hop(updated_time, interval '1'
> minute,interval '3' day) , article_id) aAggr
> left join
> (select article_id,count(praise_id) as pCount from praise group by hop(
> updated_time, interval '1' minute,interval '3' day) , article_id) pAggr
> on aAggr.article_id=pAggr.article_id
> left join
> (select article_id,count(response_id) as rCount from response group by hop
> (updated_time, interval '1' minute,interval '3' day) , article_id) rAggr on
> aAggr.article_id=rAggr.article_id
>
> Maybe I should choose 1: join then window, not window then join.
> Please correct me if I am wrong.
>
> I have some worries when choosing 1:
> I do not know how Flink works internally; it seems that in the SQL, the tables
> article, praise and response keep growing as time goes by. Will
> it introduce performance issues?
>
> Best,
> Henry
>
> On Aug 21, 2018, at 9:29 PM, Hequn Cheng wrote:
>
> Hi Henry,
>
> praiseAggr is an append table, so it contains
> "100,101,102,100,101,103,100".
> 1. if you change your sql to s"SELECT article_id FROM praise GROUP BY
> article_id", the answer is "101,102,103"
> 2. if you change your sql to s"SELECT last_value(article_id) FROM
> praise", the answer is "100"
>
> Best, Hequn
>
> On Tue, Aug 21, 2018 at 8:52 PM, 徐涛  wrote:
>
>> Hi Fabian,
>> Thanks for your response. This question has puzzled me for quite a long time.
>> If the praiseAggr has the following value:
>> window-1 100,101,102
>> window-2 100,101,103
>> window-3 100
>>
>> the last time the article table joins praiseAggr, which of the following
>> values does the praiseAggr table have?
>> 1—  100,101,102,100,101,103,100    all the elements of all the windows
>> 2—  100                            the element of the latest window
>> 3—  101,102,103                    the distinct values across all the windows
>>
>>
>> Best,
>> Henry
>>
>>
>> On Aug 21, 2018, at 8:02 PM, Fabian Hueske wrote:
>>
>> Hi,
>>
>> The semantics of a query do not depend on the way that it is used.
>> praiseAggr is a table that grows by one row per second and article_id. If
>> you use that table in a join, the join will fully materialize the table.
>> This is a special case because the same row is added multiple times, so
>> the state won't grow that quickly, but the performance will decrease
>> because for each row from article will join with multiple (a growing
>> number) of rows from praiseAggr.
>>
>> Best, Fabian
>>
>> 

Re: getting error while running flink job on yarn cluster

2018-08-21 Thread vino yang
Hi Yubraj,

The solution to a similar problem on StackOverflow is to explicitly
define the serialVersionUID in your class. For more information, please
see [1].

[1]:
https://stackoverflow.com/questions/27647992/how-resolve-java-io-invalidclassexception-local-class-incompatible-stream-clas
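
As a minimal sketch, declaring the serialVersionUID on a serializable user
function in Scala looks like the class below. The function and its logic are
hypothetical, only for illustration; building the Jackson ObjectMapper lazily
on the worker instead of serializing it with the function is a common companion
measure, not something prescribed by the linked answer.

import com.fasterxml.jackson.databind.ObjectMapper
import org.apache.flink.api.common.functions.RichMapFunction

// Pin the serialVersionUID so the class serialized on the client matches the
// class loaded on the TaskManagers.
@SerialVersionUID(1L)
class JsonParsingMapper extends RichMapFunction[String, String] {
  // Created on the worker at first use; never serialized with the function.
  @transient private lazy val mapper: ObjectMapper = new ObjectMapper()

  override def map(value: String): String = mapper.readTree(value).toString
}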

Thanks, vino.

yuvraj singh <19yuvrajsing...@gmail.com> wrote on Wed, Aug 22, 2018 at 1:42 AM:

> it works , but now i am getting
>
> org.apache.flink.streaming.runtime.tasks.StreamTaskException: Cannot 
> instantiate user function.
>   at 
> org.apache.flink.streaming.api.graph.StreamConfig.getStreamOperator(StreamConfig.java:239)
>   at 
> org.apache.flink.streaming.runtime.tasks.OperatorChain.(OperatorChain.java:104)
>   at 
> org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:267)
>   at org.apache.flink.runtime.taskmanager.Task.run(Task.java:711)
>   at java.lang.Thread.run(Thread.java:745)
> Caused by: java.io.InvalidClassException: 
> com.fasterxml.jackson.databind.ObjectMapper; local class incompatible: stream 
> classdesc serialVersionUID = 1, local class serialVersionUID = 2
>   at java.io.ObjectStreamClass.initNonProxy(ObjectStreamClass.java:616)
>   at 
> java.io.ObjectInputStream.readNonProxyDesc(ObjectInputStream.java:1630)
>   at java.io.ObjectInputStream.readClassDesc(ObjectInputStream.java:1521)
>   at 
> java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1781)
>   at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1353)
>   at 
> java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:2018)
>   at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1942)
>   at 
> java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1808)
>   at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1353)
>   at 
> java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:2018)
>   at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1942)
>   at 
> java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1808)
>   at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1353)
>   at 
> java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:2018)
>   at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1942)
>   at 
> java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1808)
>   at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1353)
>   at 
> java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:2018)
>   at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1942)
>   at 
> java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1808)
>   at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1353)
>   at java.io.ObjectInputStream.readObject(ObjectInputStream.java:373)
>   at 
> org.apache.flink.util.InstantiationUtil.deserializeObject(InstantiationUtil.java:502)
>   at 
> org.apache.flink.util.InstantiationUtil.deserializeObject(InstantiationUtil.java:489)
>   at 
> org.apache.flink.util.InstantiationUtil.deserializeObject(InstantiationUtil.java:477)
>   at 
> org.apache.flink.util.InstantiationUtil.readObjectFromConfig(InstantiationUtil.java:438)
>   at 
> org.apache.flink.streaming.api.graph.StreamConfig.getStreamOperator(StreamConfig.java:224)
>   ... 4 more
>
>
> thanks
>
> Yubraj Singh
>
>
>
> On Tue, Aug 21, 2018 at 10:54 PM Gary Yao  wrote:
>
>> Hi Yubraj Singh,
>>
>> Can you try submitting with HADOOP_CLASSPATH=`hadoop classpath` set? [1]
>> For example:
>>   HADOOP_CLASSPATH=`hadoop classpath` bin/flink run [...]
>>
>> Best,
>> Gary
>>
>> [1]
>> https://ci.apache.org/projects/flink/flink-docs-master/ops/deployment/hadoop.html#configuring-flink-with-hadoop-classpaths
>>
>> On Tue, Aug 21, 2018 at 4:23 PM, yuvraj singh <19yuvrajsing...@gmail.com>
>> wrote:
>>
>>> Hi ,
>>>
>>> i am getting a error while running a flink job on yarn cluster , its
>>> running fine when i run it on flink standalone cluster
>>>
>>> java.lang.NoClassDefFoundError:
>>> com/sun/jersey/core/util/FeaturesAndProperties
>>>
>>> at java.lang.ClassLoader.defineClass1(Native Method)
>>>
>>> at java.lang.ClassLoader.defineClass(ClassLoader.java:763)
>>>
>>> at
>>> java.security.SecureClassLoader.defineClass(SecureClassLoader.java:142)
>>>
>>> at java.net.URLClassLoader.defineClass(URLClassLoader.java:467)
>>>
>>> at java.net.URLClassLoader.access$100(URLClassLoader.java:73)
>>>
>>> at java.net.URLClassLoader$1.run(URLClassLoader.java:368)
>>>
>>> at java.net.URLClassLoader$1.run(URLClassLoader.java:362)
>>>
>>> at java.security.AccessController.doPrivileged(Native Method)
>>>
>>> at java.net.URLClassLoader.findClass(URLClassLoader.java:361)
>>>
>>> at java.lang.ClassLoader.loadClass(ClassLoader.java:424)
>>>
>>> at sun.misc.Launcher$AppClassLoader.loadClass(Launcher.java:335)
>>>

Re: classloading strangeness with Avro in Flink

2018-08-21 Thread vino yang
Hi Cliff,

You are welcome, I am very happy to hear this message.

Thanks, vino.

Cliff Resnick wrote on Tue, Aug 21, 2018 at 11:46 PM:

> Solved this by moving flink-avro to lib and reverting to
> `classloader.resolve-order: parent-first`.  I still don't know why, but I
> guess if you're reading Avro both from file and Kafka in the same pipeline
> then inverted class loader delegation will not work. Thanks, Vino for your
> help!
>
> On Tue, Aug 21, 2018 at 8:02 AM Cliff Resnick  wrote:
>
>> Hi Aljoscha,
>>
>> We need flink-shaded-hadoop2-uber.jar because there is no hadoop distro
>> on the instance the Flink session/jobs is managed from and the process that
>> launches Flink is not a java process, but execs a process that calls the
>> flink script.
>>
>> -Cliff
>>
>> On Tue, Aug 21, 2018 at 5:11 AM Aljoscha Krettek 
>> wrote:
>>
>>> Hi Cliff,
>>>
>>> Do you actually need the flink-shaded-hadoop2-uber.jar in lib. If you're
>>> running on YARN, you should be able to just remove them because with YARN
>>> you will have Hadoop in the classpath anyways.
>>>
>>> Aljoscha
>>>
>>> On 21. Aug 2018, at 03:45, vino yang  wrote:
>>>
>>> Hi Cliff,
>>>
>>> If so, you can explicitly exclude Avro from the related
>>> dependencies (using dependency exclusions) and then directly depend on
>>> the Avro version you need.
>>>
>>> Thanks, vino.
>>>
>>> Cliff Resnick wrote on Tue, Aug 21, 2018 at 5:13 AM:
>>>
 Hi Vino,

 Unfortunately, I'm still stuck here. By moving the avro dependency
 chain to lib (and removing it from user jar), my OCFs decode but I get the
 error described here:

 https://github.com/confluentinc/schema-registry/pull/509

 However, the Flink fix described in the PR above was to move the Avro
 dependency to the user jar, and since I'm using YARN, I'm required to
 have flink-shaded-hadoop2-uber.jar loaded from lib -- and that has
 Avro bundled un-shaded. So I'm back to the starting problem...

 Any advice is welcome!

 -Cliff


 On Mon, Aug 20, 2018 at 1:42 PM Cliff Resnick  wrote:

> Hi Vino,
>
> You were right in your assumption -- unshaded avro was being added to
> our application jar via third-party dependency. Excluding it in packaging
> fixed the issue. For the record, it looks like flink-avro must be loaded from
> lib, or there will be errors in checkpoint restores.
>
> On Mon, Aug 20, 2018 at 8:43 AM Cliff Resnick 
> wrote:
>
>> Hi Vino,
>>
>> Thanks for the explanation, but the job only ever uses the Avro
>> (1.8.2) pulled in by flink-formats/avro, so it's not a class version
>> conflict there.
>>
>> I'm using default child-first loading. It might be a further
>> transitive dependency, though it's not clear by stack trace or stepping
>> through the process. When I get a chance I'll look further into it but in
>> case anyone is experiencing similar problems, what is clear is that
>> classloader order does matter with Avro.
>>
>> On Sun, Aug 19, 2018, 11:36 PM vino yang 
>> wrote:
>>
>>> Hi Cliff,
>>>
>>> My personal guess is that this may be caused by Job's Avro conflict
>>> with the Avro that the Flink framework itself relies on.
>>> Flink has provided some configuration parameters which allows you to
>>> determine the order of the classloaders yourself. [1]
>>> Alternatively, you can debug classloading and participate in the
>>> documentation.[2]
>>>
>>> [1]:
>>> https://ci.apache.org/projects/flink/flink-docs-release-1.6/ops/config.html
>>> [2]:
>>> https://ci.apache.org/projects/flink/flink-docs-stable/monitoring/debugging_classloading.html
>>>
>>> Thanks, vino.
>>>
>>> Cliff Resnick wrote on Mon, Aug 20, 2018 at 10:40 AM:
>>>
 Our Flink/YARN pipeline has been reading Avro from Kafka for a
 while now. We just introduced a source of Avro OCF (Object Container 
 Files)
 read from S3. The Kafka Avro continued to decode without incident, but 
 the
 OCF files failed 100% with anomalous parse errors in the decoding phase
 after the schema and codec were successfully read from them. The 
 pipeline
 would work on my laptop, and when I submitted a test Main program to 
 the
 Flink Session in YARN, that would also successfully decode. Only the 
 actual
 pipeline run from the TaskManager failed. At one point I even remote
 debugged the TaskManager process and stepped through what looked like a
 normal Avro decode (if you can describe Avro code as normal!) -- until 
 it
 abruptly failed with an int decode or what-have-you.

 This stumped me for a while, but I finally tried moving
 flink-avro.jar from the lib to the application jar, and that fixed it. 
 I'm
 not sure why this is, especially since there were no typical

Some questions about the StreamingFileSink

2018-08-21 Thread Benoit MERIAUX
Hi,

I have some questions about the new StreamingFileSink in 1.6.

My use case is pretty simple.
I have a Cassandra table with 150 million rows.
They are partitioned into buckets of 100,000 rows.

My job is to export each "bucket" to a file (1 bucket = 1 file), so the job
is designed like this:

The source gets the bucket list,
then a flatmap task fetches the rows matching the bucket and emits all
100,000 rows from Cassandra to the collector,
then a StreamingFileSink writes each row into a file per bucket (RowFormat).

Checkpointing is enabled, every 10s.
The rollingPolicy is OnCheckpointRollingPolicy, and the bucketAssigner is
implemented per bucket id (my bucketId, not the sink's one :).

My problem is that at the end of the job, I only have in-progress part files for
each bucket.

I do not understand how I can trigger the finalization of the sink and have
the bucket part files committed.

So I read the code of the StreamingFileSink and the Bucket classes.

If I have understood correctly, the in-progress bucket files can be closed and
committed (closePartFile method of the Bucket) and moved to the "pending" state
according to the rollingPolicy, where they wait for a checkpoint to be moved to
the "finished" state.

So the rollingPolicy can roll part files on each record, on each
bucket interval, or on each checkpoint.
In my case, with the OnCheckpointRollingPolicy, it is only on each
checkpoint. Am I right?

And when a checkpoint is successful, all the pending files are moved to the
"finished" state and can be used by other jobs.

This is where I start to get lost.

Indeed, one thing surprised me in the code of the close method of the
StreamingFileSink: it discards all active buckets since the last successful
checkpoint! But at the end of a successful job, no checkpoint is triggered
automatically if the minimal interval since the last checkpoint has not
expired, so what happens to the data written since the last checkpoint?
(-> Is this sink only for endless streams?)

How do I get all my files, with all my data, into the "finished" state when my
job finishes successfully?
Do I need to trigger a checkpoint manually?
Is there a better-fitting sink for my use case?
Should I use another rollingPolicy? Even with the bucket interval, there
is still a window between the last roll and the end of the job during which
some part files are not closed and committed.

Even in the case of an endless stream, I suggest improving the behavior of the
close method by calling closePartFile on each active bucket, so all valid
data since the last checkpoint can be committed to the pending state, waiting
for the checkpoint at the end of the job.
This seems to be the behavior of the BucketingSink.

-> I can do a PR for this.


I'm open to all suggestions :)

regards,

--

Benoit *MERIAUX*
*OCTO Technology*
Consultant Confirmé - Tribu NAD
34, Avenue de l'Opéra
75002 Paris
+33 (0)786 59 12 30
www.octo.com - blog.octo.com
www.usievents.com
@USIEvents - @OCTOTechnology


error running flink job on yarn cluster

2018-08-21 Thread yuvraj singh
Hi all,

I am getting this error:

java.io.InvalidClassException: com.fasterxml.jackson.databind.ObjectMapper;
local class incompatible: stream classdesc serialVersionUID = 1, local
class serialVersionUID = 2

at java.io.ObjectStreamClass.initNonProxy(ObjectStreamClass.java:616)

at java.io.ObjectInputStream.readNonProxyDesc(ObjectInputStream.java:1630)

at java.io.ObjectInputStream.readClassDesc(ObjectInputStream.java:1521)

at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1781)

at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1353)

at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:2018)

at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1942)

at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1808)

at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1353)

at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:2018)

at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1942)

at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1808)

at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1353)

at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:2018)

at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1942)

at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1808)

at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1353)

at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:2018)

at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1942)

at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1808)

at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1353)

at java.io.ObjectInputStream.readObject(ObjectInputStream.java:373)

at
org.apache.flink.util.InstantiationUtil.deserializeObject(InstantiationUtil.java:502)

at
org.apache.flink.util.InstantiationUtil.deserializeObject(InstantiationUtil.java:489)

at
org.apache.flink.util.InstantiationUtil.deserializeObject(InstantiationUtil.java:477)

at
org.apache.flink.util.InstantiationUtil.readObjectFromConfig(InstantiationUtil.java:438)

at
org.apache.flink.streaming.api.graph.StreamConfig.getStreamOperator(StreamConfig.java:224)


Please help me. Is it because of the Jackson library?


thanks

Yubraj singh


Question about QueryableState

2018-08-21 Thread Pierre Zemb
Hi!

I’ve started to deploy a small Flink cluster (4 TMs and 1 JM for now, on
1.6.0) and deployed a small job on it. Because of the current load, the job is
completely handled by a single TM. I’ve created a small proxy that uses
QueryableStateClient to access the current state. It is working nicely, except
under certain circumstances: it seems to me that I can only access the state
through a node that is holding a part of the job. Here’s an example:

   - job on tm1. Pointing QueryableStateClient to tm1. State accessible.
   - job still on tm1. Pointing QueryableStateClient to tm2 (for example).
   State inaccessible.
   - killing tm1, job is now on tm2. State accessible.
   - job still on tm2. Pointing QueryableStateClient to tm3. State inaccessible.
   - adding some parallelism to spread the job on tm1 and tm2. Pointing
   QueryableStateClient to either tm1 or tm2 works.
   - job still on tm1 and tm2. Pointing QueryableStateClient to tm3. State
   inaccessible.
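
(For reference, a minimal sketch of how such a lookup is issued against the
client proxy of one TaskManager; the host, the key, and the ValueState[Long]
type are assumptions, only the state name and the job id come from the stack
trace below, and 9069 is the default proxy port:)

import org.apache.flink.api.common.JobID
import org.apache.flink.api.common.state.ValueStateDescriptor
import org.apache.flink.api.common.typeinfo.BasicTypeInfo
import org.apache.flink.queryablestate.client.QueryableStateClient

// The client talks to the client proxy of one TaskManager.
val client = new QueryableStateClient("tm1.example.com", 9069)
val descriptor =
  new ValueStateDescriptor[java.lang.Long]("repo-status", classOf[java.lang.Long])

// Ask for the state registered as "repo-status" in the given job, for one key.
val future = client.getKvState(
  JobID.fromHexString("3ac3bc00b2d5bc0752917186a288d40a"),
  "repo-status",
  "some-key",                       // hypothetical key
  BasicTypeInfo.STRING_TYPE_INFO,
  descriptor)
val state = future.join()           // blocks for the sketch; handle errors in real code
println(state.value())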

When the state is inaccessible, I can see this (generated here):

java.lang.RuntimeException: Failed request 0.
 Caused by: org.apache.flink.queryablestate.exceptions.UnknownLocationException:
Could not retrieve location of state=repo-status of
job=3ac3bc00b2d5bc0752917186a288d40a. Potential reasons are: i) the
state is not ready, or ii) the job does not exist.
at 
org.apache.flink.queryablestate.client.proxy.KvStateClientProxyHandler.getKvStateLookupInfo(KvStateClientProxyHandler.java:228)
at 
org.apache.flink.queryablestate.client.proxy.KvStateClientProxyHandler.getState(KvStateClientProxyHandler.java:162)
at 
org.apache.flink.queryablestate.client.proxy.KvStateClientProxyHandler.executeActionAsync(KvStateClientProxyHandler.java:129)
at 
org.apache.flink.queryablestate.client.proxy.KvStateClientProxyHandler.handleRequest(KvStateClientProxyHandler.java:119)
at 
org.apache.flink.queryablestate.client.proxy.KvStateClientProxyHandler.handleRequest(KvStateClientProxyHandler.java:63)
at 
org.apache.flink.queryablestate.network.AbstractServerHandler$AsyncRequestTask.run(AbstractServerHandler.java:236)
at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
at java.util.concurrent.FutureTask.run(FutureTask.java:266)
at 
java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
at 
java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
at java.lang.Thread.run(Thread.java:745)

From the documentation, I can see that:

The client connects to a Client Proxy running on a given Task Manager. The
proxy is the entry point of the client to the Flink cluster. It forwards
the requests of the client to the Job Manager and the required Task
Manager, and forwards the final response back to the client.

Did I miss something? Is the QueryableStateClientProxy only fetching info
from a job that is running on its local TM? If so, is there a way to
retrieve the job graph? Or maybe another solution?

Thanks!
Pierre Zemb
-- 
Cordialement,
Pierre Zemb
pierrezemb.fr
Software Engineer, Metrics Data Platform @OVH


Re: getting error while running flink job on yarn cluster

2018-08-21 Thread yuvraj singh
It works, but now I am getting:

org.apache.flink.streaming.runtime.tasks.StreamTaskException: Cannot
instantiate user function.
at 
org.apache.flink.streaming.api.graph.StreamConfig.getStreamOperator(StreamConfig.java:239)
at 
org.apache.flink.streaming.runtime.tasks.OperatorChain.(OperatorChain.java:104)
at 
org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:267)
at org.apache.flink.runtime.taskmanager.Task.run(Task.java:711)
at java.lang.Thread.run(Thread.java:745)
Caused by: java.io.InvalidClassException:
com.fasterxml.jackson.databind.ObjectMapper; local class incompatible:
stream classdesc serialVersionUID = 1, local class serialVersionUID =
2
at java.io.ObjectStreamClass.initNonProxy(ObjectStreamClass.java:616)
at 
java.io.ObjectInputStream.readNonProxyDesc(ObjectInputStream.java:1630)
at java.io.ObjectInputStream.readClassDesc(ObjectInputStream.java:1521)
at 
java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1781)
at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1353)
at 
java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:2018)
at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1942)
at 
java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1808)
at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1353)
at 
java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:2018)
at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1942)
at 
java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1808)
at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1353)
at 
java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:2018)
at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1942)
at 
java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1808)
at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1353)
at 
java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:2018)
at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1942)
at 
java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1808)
at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1353)
at java.io.ObjectInputStream.readObject(ObjectInputStream.java:373)
at 
org.apache.flink.util.InstantiationUtil.deserializeObject(InstantiationUtil.java:502)
at 
org.apache.flink.util.InstantiationUtil.deserializeObject(InstantiationUtil.java:489)
at 
org.apache.flink.util.InstantiationUtil.deserializeObject(InstantiationUtil.java:477)
at 
org.apache.flink.util.InstantiationUtil.readObjectFromConfig(InstantiationUtil.java:438)
at 
org.apache.flink.streaming.api.graph.StreamConfig.getStreamOperator(StreamConfig.java:224)
... 4 more


thanks

Yubraj Singh



On Tue, Aug 21, 2018 at 10:54 PM Gary Yao  wrote:

> Hi Yubraj Singh,
>
> Can you try submitting with HADOOP_CLASSPATH=`hadoop classpath` set? [1]
> For example:
>   HADOOP_CLASSPATH=`hadoop classpath` bin/flink run [...]
>
> Best,
> Gary
>
> [1]
> https://ci.apache.org/projects/flink/flink-docs-master/ops/deployment/hadoop.html#configuring-flink-with-hadoop-classpaths
>
> On Tue, Aug 21, 2018 at 4:23 PM, yuvraj singh <19yuvrajsing...@gmail.com>
> wrote:
>
>> Hi ,
>>
>> i am getting a error while running a flink job on yarn cluster , its
>> running fine when i run it on flink standalone cluster
>>
>> java.lang.NoClassDefFoundError:
>> com/sun/jersey/core/util/FeaturesAndProperties
>>
>> at java.lang.ClassLoader.defineClass1(Native Method)
>>
>> at java.lang.ClassLoader.defineClass(ClassLoader.java:763)
>>
>> at java.security.SecureClassLoader.defineClass(SecureClassLoader.java:142)
>>
>> at java.net.URLClassLoader.defineClass(URLClassLoader.java:467)
>>
>> at java.net.URLClassLoader.access$100(URLClassLoader.java:73)
>>
>> at java.net.URLClassLoader$1.run(URLClassLoader.java:368)
>>
>> at java.net.URLClassLoader$1.run(URLClassLoader.java:362)
>>
>> at java.security.AccessController.doPrivileged(Native Method)
>>
>> at java.net.URLClassLoader.findClass(URLClassLoader.java:361)
>>
>> at java.lang.ClassLoader.loadClass(ClassLoader.java:424)
>>
>> at sun.misc.Launcher$AppClassLoader.loadClass(Launcher.java:335)
>>
>> at java.lang.ClassLoader.loadClass(ClassLoader.java:357)
>>
>> at
>> org.apache.hadoop.yarn.client.api.TimelineClient.createTimelineClient(TimelineClient.java:55)
>>
>> at
>> org.apache.hadoop.yarn.client.api.impl.YarnClientImpl.createTimelineClient(YarnClientImpl.java:181)
>>
>> at
>> org.apache.hadoop.yarn.client.api.impl.YarnClientImpl.serviceInit(YarnClientImpl.java:168)
>>
>> at
>> org.apache.hadoop.service.AbstractService.init(AbstractService.java:163)
>>
>> at
>> 

Re: getting error while running flink job on yarn cluster

2018-08-21 Thread Gary Yao
Hi Yubraj Singh,

Can you try submitting with HADOOP_CLASSPATH=`hadoop classpath` set? [1]
For example:
  HADOOP_CLASSPATH=`hadoop classpath` bin/flink run [...]

Best,
Gary

[1]
https://ci.apache.org/projects/flink/flink-docs-master/ops/deployment/hadoop.html#configuring-flink-with-hadoop-classpaths

On Tue, Aug 21, 2018 at 4:23 PM, yuvraj singh <19yuvrajsing...@gmail.com>
wrote:

> Hi ,
>
> i am getting a error while running a flink job on yarn cluster , its
> running fine when i run it on flink standalone cluster
>
> java.lang.NoClassDefFoundError: com/sun/jersey/core/util/
> FeaturesAndProperties
>
> at java.lang.ClassLoader.defineClass1(Native Method)
>
> at java.lang.ClassLoader.defineClass(ClassLoader.java:763)
>
> at java.security.SecureClassLoader.defineClass(SecureClassLoader.java:142)
>
> at java.net.URLClassLoader.defineClass(URLClassLoader.java:467)
>
> at java.net.URLClassLoader.access$100(URLClassLoader.java:73)
>
> at java.net.URLClassLoader$1.run(URLClassLoader.java:368)
>
> at java.net.URLClassLoader$1.run(URLClassLoader.java:362)
>
> at java.security.AccessController.doPrivileged(Native Method)
>
> at java.net.URLClassLoader.findClass(URLClassLoader.java:361)
>
> at java.lang.ClassLoader.loadClass(ClassLoader.java:424)
>
> at sun.misc.Launcher$AppClassLoader.loadClass(Launcher.java:335)
>
> at java.lang.ClassLoader.loadClass(ClassLoader.java:357)
>
> at org.apache.hadoop.yarn.client.api.TimelineClient.createTimelineClient(
> TimelineClient.java:55)
>
> at org.apache.hadoop.yarn.client.api.impl.YarnClientImpl.
> createTimelineClient(YarnClientImpl.java:181)
>
> at org.apache.hadoop.yarn.client.api.impl.YarnClientImpl.
> serviceInit(YarnClientImpl.java:168)
>
> at org.apache.hadoop.service.AbstractService.init(
> AbstractService.java:163)
>
> at org.apache.flink.yarn.cli.FlinkYarnSessionCli.getClusterDescriptor(
> FlinkYarnSessionCli.java:966)
>
> at org.apache.flink.yarn.cli.FlinkYarnSessionCli.createDescriptor(
> FlinkYarnSessionCli.java:269)
>
> at org.apache.flink.yarn.cli.FlinkYarnSessionCli.createClusterDescriptor(
> FlinkYarnSessionCli.java:444)
>
> at org.apache.flink.yarn.cli.FlinkYarnSessionCli.createClusterDescriptor(
> FlinkYarnSessionCli.java:92)
>
> at org.apache.flink.client.cli.CliFrontend.runProgram(
> CliFrontend.java:225)
>
> at org.apache.flink.client.cli.CliFrontend.run(CliFrontend.java:214)
>
> at org.apache.flink.client.cli.CliFrontend.parseParameters(
> CliFrontend.java:1025)
>
> at org.apache.flink.client.cli.CliFrontend.lambda$main$9(
> CliFrontend.java:1101)
>
> at java.security.AccessController.doPrivileged(Native Method)
>
> at javax.security.auth.Subject.doAs(Subject.java:422)
>
> at org.apache.hadoop.security.UserGroupInformation.doAs(
> UserGroupInformation.java:1754)
>
> at org.apache.flink.runtime.security.HadoopSecurityContext.runSecured(
> HadoopSecurityContext.java:41)
>
> at org.apache.flink.client.cli.CliFrontend.main(CliFrontend.java:1101)
>
> Caused by: java.lang.ClassNotFoundException: com.sun.jersey.core.util.
> FeaturesAndProperties
>
> at java.net.URLClassLoader.findClass(URLClassLoader.java:381)
>
> at java.lang.ClassLoader.loadClass(ClassLoader.java:424)
>
> at sun.misc.Launcher$AppClassLoader.loadClass(Launcher.java:335)
>
> at java.lang.ClassLoader.loadClass(ClassLoader.java:357)
>
> ... 29 more
>
>
> please help me
>
>
> thanks
>
> Yubraj singh
>


Re: getting error while running flink job on yarn cluster

2018-08-21 Thread yuvraj singh
It's the staging env for the company and other jobs are running on the same
cluster, so I can't change anything.

Thanks
Yubraj Singh

On Tue, Aug 21, 2018, 9:31 PM Paul Lam  wrote:

> Hi yuvraj,
>
> Please try turning off timeline server in yarn-site.xml. Currently Flink
> does not ship the required dependencies for timeline server, which I think
> could be a bug.
>
> Best Regards,
> Paul Lam
>
> On Aug 21, 2018, at 22:23, yuvraj singh <19yuvrajsing...@gmail.com> wrote:
>
> Hi ,
>
> i am getting a error while running a flink job on yarn cluster , its
> running fine when i run it on flink standalone cluster
>
> java.lang.NoClassDefFoundError:
> com/sun/jersey/core/util/FeaturesAndProperties
> at java.lang.ClassLoader.defineClass1(Native Method)
> at java.lang.ClassLoader.defineClass(ClassLoader.java:763)
> at java.security.SecureClassLoader.defineClass(SecureClassLoader.java:142)
> at java.net.URLClassLoader.defineClass(URLClassLoader.java:467)
> at java.net.URLClassLoader.access$100(URLClassLoader.java:73)
> at java.net.URLClassLoader$1.run(URLClassLoader.java:368)
> at java.net.URLClassLoader$1.run(URLClassLoader.java:362)
> at java.security.AccessController.doPrivileged(Native Method)
> at java.net.URLClassLoader.findClass(URLClassLoader.java:361)
> at java.lang.ClassLoader.loadClass(ClassLoader.java:424)
> at sun.misc.Launcher$AppClassLoader.loadClass(Launcher.java:335)
> at java.lang.ClassLoader.loadClass(ClassLoader.java:357)
> at
> org.apache.hadoop.yarn.client.api.TimelineClient.createTimelineClient(TimelineClient.java:55)
> at
> org.apache.hadoop.yarn.client.api.impl.YarnClientImpl.createTimelineClient(YarnClientImpl.java:181)
> at
> org.apache.hadoop.yarn.client.api.impl.YarnClientImpl.serviceInit(YarnClientImpl.java:168)
> at org.apache.hadoop.service.AbstractService.init(AbstractService.java:163)
> at
> org.apache.flink.yarn.cli.FlinkYarnSessionCli.getClusterDescriptor(FlinkYarnSessionCli.java:966)
> at
> org.apache.flink.yarn.cli.FlinkYarnSessionCli.createDescriptor(FlinkYarnSessionCli.java:269)
> at
> org.apache.flink.yarn.cli.FlinkYarnSessionCli.createClusterDescriptor(FlinkYarnSessionCli.java:444)
> at
> org.apache.flink.yarn.cli.FlinkYarnSessionCli.createClusterDescriptor(FlinkYarnSessionCli.java:92)
> at org.apache.flink.client.cli.CliFrontend.runProgram(CliFrontend.java:225)
> at org.apache.flink.client.cli.CliFrontend.run(CliFrontend.java:214)
> at
> org.apache.flink.client.cli.CliFrontend.parseParameters(CliFrontend.java:1025)
> at
> org.apache.flink.client.cli.CliFrontend.lambda$main$9(CliFrontend.java:1101)
> at java.security.AccessController.doPrivileged(Native Method)
> at javax.security.auth.Subject.doAs(Subject.java:422)
> at
> org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1754)
> at
> org.apache.flink.runtime.security.HadoopSecurityContext.runSecured(HadoopSecurityContext.java:41)
> at org.apache.flink.client.cli.CliFrontend.main(CliFrontend.java:1101)
> Caused by: java.lang.ClassNotFoundException:
> com.sun.jersey.core.util.FeaturesAndProperties
> at java.net.URLClassLoader.findClass(URLClassLoader.java:381)
> at java.lang.ClassLoader.loadClass(ClassLoader.java:424)
> at sun.misc.Launcher$AppClassLoader.loadClass(Launcher.java:335)
> at java.lang.ClassLoader.loadClass(ClassLoader.java:357)
> ... 29 more
>
> please help me
>
> thanks
> Yubraj singh
>
>
>


Re: getting error while running flink job on yarn cluster

2018-08-21 Thread Paul Lam
Hi yuvraj,

Please try turning off the timeline server in yarn-site.xml. Currently Flink
does not ship the required dependencies for the timeline server, which I think
could be a bug.

Best Regards,
Paul Lam

> On Aug 21, 2018, at 22:23, yuvraj singh <19yuvrajsing...@gmail.com> wrote:
> 
> Hi , 
> 
> i am getting a error while running a flink job on yarn cluster , its running 
> fine when i run it on flink standalone cluster 
> 
> java.lang.NoClassDefFoundError: com/sun/jersey/core/util/FeaturesAndProperties
>   at java.lang.ClassLoader.defineClass1(Native Method)
>   at java.lang.ClassLoader.defineClass(ClassLoader.java:763)
>   at 
> java.security.SecureClassLoader.defineClass(SecureClassLoader.java:142)
>   at java.net.URLClassLoader.defineClass(URLClassLoader.java:467)
>   at java.net.URLClassLoader.access$100(URLClassLoader.java:73)
>   at java.net.URLClassLoader$1.run(URLClassLoader.java:368)
>   at java.net.URLClassLoader$1.run(URLClassLoader.java:362)
>   at java.security.AccessController.doPrivileged(Native Method)
>   at java.net.URLClassLoader.findClass(URLClassLoader.java:361)
>   at java.lang.ClassLoader.loadClass(ClassLoader.java:424)
>   at sun.misc.Launcher$AppClassLoader.loadClass(Launcher.java:335)
>   at java.lang.ClassLoader.loadClass(ClassLoader.java:357)
>   at 
> org.apache.hadoop.yarn.client.api.TimelineClient.createTimelineClient(TimelineClient.java:55)
>   at 
> org.apache.hadoop.yarn.client.api.impl.YarnClientImpl.createTimelineClient(YarnClientImpl.java:181)
>   at 
> org.apache.hadoop.yarn.client.api.impl.YarnClientImpl.serviceInit(YarnClientImpl.java:168)
>   at 
> org.apache.hadoop.service.AbstractService.init(AbstractService.java:163)
>   at 
> org.apache.flink.yarn.cli.FlinkYarnSessionCli.getClusterDescriptor(FlinkYarnSessionCli.java:966)
>   at 
> org.apache.flink.yarn.cli.FlinkYarnSessionCli.createDescriptor(FlinkYarnSessionCli.java:269)
>   at 
> org.apache.flink.yarn.cli.FlinkYarnSessionCli.createClusterDescriptor(FlinkYarnSessionCli.java:444)
>   at 
> org.apache.flink.yarn.cli.FlinkYarnSessionCli.createClusterDescriptor(FlinkYarnSessionCli.java:92)
>   at 
> org.apache.flink.client.cli.CliFrontend.runProgram(CliFrontend.java:225)
>   at org.apache.flink.client.cli.CliFrontend.run(CliFrontend.java:214)
>   at 
> org.apache.flink.client.cli.CliFrontend.parseParameters(CliFrontend.java:1025)
>   at 
> org.apache.flink.client.cli.CliFrontend.lambda$main$9(CliFrontend.java:1101)
>   at java.security.AccessController.doPrivileged(Native Method)
>   at javax.security.auth.Subject.doAs(Subject.java:422)
>   at 
> org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1754)
>   at 
> org.apache.flink.runtime.security.HadoopSecurityContext.runSecured(HadoopSecurityContext.java:41)
>   at org.apache.flink.client.cli.CliFrontend.main(CliFrontend.java:1101)
> Caused by: java.lang.ClassNotFoundException: 
> com.sun.jersey.core.util.FeaturesAndProperties
>   at java.net.URLClassLoader.findClass(URLClassLoader.java:381)
>   at java.lang.ClassLoader.loadClass(ClassLoader.java:424)
>   at sun.misc.Launcher$AppClassLoader.loadClass(Launcher.java:335)
>   at java.lang.ClassLoader.loadClass(ClassLoader.java:357)
>   ... 29 more
> 
> please help me 
> 
> thanks 
> Yubraj singh 



Re: classloading strangeness with Avro in Flink

2018-08-21 Thread Cliff Resnick
Solved this by moving flink-avro to lib and reverting to
`classloader.resolve-order: parent-first`.  I still don't know why, but I
guess if you're reading Avro both from file and Kafka in the same pipeline
then inverted class loader delegation will not work. Thanks, Vino for your
help!

On Tue, Aug 21, 2018 at 8:02 AM Cliff Resnick  wrote:

> Hi Aljoscha,
>
> We need flink-shaded-hadoop2-uber.jar because there is no hadoop distro on
> the instance the Flink session/jobs is managed from and the process that
> launches Flink is not a java process, but execs a process that calls the
> flink script.
>
> -Cliff
>
> On Tue, Aug 21, 2018 at 5:11 AM Aljoscha Krettek 
> wrote:
>
>> Hi Cliff,
>>
>> Do you actually need the flink-shaded-hadoop2-uber.jar in lib. If you're
>> running on YARN, you should be able to just remove them because with YARN
>> you will have Hadoop in the classpath anyways.
>>
>> Aljoscha
>>
>> On 21. Aug 2018, at 03:45, vino yang  wrote:
>>
>> Hi Cliff,
>>
>> If so, you can explicitly exclude Avro from the related
>> dependencies (using dependency exclusions) and then directly depend on
>> the Avro version you need.
>>
>> Thanks, vino.
>>
>> Cliff Resnick wrote on Tue, Aug 21, 2018 at 5:13 AM:
>>
>>> Hi Vino,
>>>
>>> Unfortunately, I'm still stuck here. By moving the avro dependency chain
>>> to lib (and removing it from user jar), my OCFs decode but I get the error
>>> described here:
>>>
>>> https://github.com/confluentinc/schema-registry/pull/509
>>>
>>> However, the Flink fix described in the PR above was to move the Avro
>>> dependency to the user jar, and since I'm using YARN, I'm required to
>>> have flink-shaded-hadoop2-uber.jar loaded from lib -- and that has
>>> Avro bundled un-shaded. So I'm back to the starting problem...
>>>
>>> Any advice is welcome!
>>>
>>> -Cliff
>>>
>>>
>>> On Mon, Aug 20, 2018 at 1:42 PM Cliff Resnick  wrote:
>>>
 Hi Vino,

 You were right in your assumption -- unshaded avro was being added to
 our application jar via third-party dependency. Excluding it in packaging
 fixed the issue. For the record, it looks like flink-avro must be loaded from
 lib, or there will be errors in checkpoint restores.

 On Mon, Aug 20, 2018 at 8:43 AM Cliff Resnick  wrote:

> Hi Vino,
>
> Thanks for the explanation, but the job only ever uses the Avro
> (1.8.2) pulled in by flink-formats/avro, so it's not a class version
> conflict there.
>
> I'm using default child-first loading. It might be a further
> transitive dependency, though it's not clear by stack trace or stepping
> through the process. When I get a chance I'll look further into it but in
> case anyone is experiencing similar problems, what is clear is that
> classloader order does matter with Avro.
>
> On Sun, Aug 19, 2018, 11:36 PM vino yang 
> wrote:
>
>> Hi Cliff,
>>
>> My personal guess is that this may be caused by Job's Avro conflict
>> with the Avro that the Flink framework itself relies on.
>> Flink has provided some configuration parameters which allows you to
>> determine the order of the classloaders yourself. [1]
>> Alternatively, you can debug classloading and participate in the
>> documentation.[2]
>>
>> [1]:
>> https://ci.apache.org/projects/flink/flink-docs-release-1.6/ops/config.html
>> [2]:
>> https://ci.apache.org/projects/flink/flink-docs-stable/monitoring/debugging_classloading.html
>>
>> Thanks, vino.
>>
>> Cliff Resnick wrote on Mon, Aug 20, 2018 at 10:40 AM:
>>
>>> Our Flink/YARN pipeline has been reading Avro from Kafka for a while
>>> now. We just introduced a source of Avro OCF (Object Container Files) 
>>> read
>>> from S3. The Kafka Avro continued to decode without incident, but the 
>>> OCF
>>> files failed 100% with anomalous parse errors in the decoding phase 
>>> after
>>> the schema and codec were successfully read from them. The pipeline 
>>> would
>>> work on my laptop, and when I submitted a test Main program to the Flink
>>> Session in YARN, that would also successfully decode. Only the actual
>>> pipeline run from the TaskManager failed. At one point I even remote
>>> debugged the TaskManager process and stepped through what looked like a
>>> normal Avro decode (if you can describe Avro code as normal!) -- until 
>>> it
>>> abruptly failed with an int decode or what-have-you.
>>>
>>> This stumped me for a while, but I finally tried moving
>>> flink-avro.jar from the lib to the application jar, and that fixed it. 
>>> I'm
>>> not sure why this is, especially since there were no typical
>>> classloader-type errors.  This issue was observed both on Flink 1.5 and 
>>> 1.6
>>> in Flip-6 mode.
>>>
>>> -Cliff
>>>
>>>
>>>
>>>
>>>
>>>
>>


Re: Unable to start Flink HA cluster with Zookeeper

2018-08-21 Thread mozer
Yeah, you are right. I have already tried setting jobmanager.rpc.address and
it works in that case, but if I use this setting I will not be able to use
HA, am I right?
How can the job manager register itself in ZooKeeper with the right address
instead of localhost?





--
Sent from: http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/


getting error while running flink job on yarn cluster

2018-08-21 Thread yuvraj singh
Hi,

I am getting an error while running a Flink job on a YARN cluster; it
runs fine when I run it on a standalone Flink cluster.

java.lang.NoClassDefFoundError:
com/sun/jersey/core/util/FeaturesAndProperties

at java.lang.ClassLoader.defineClass1(Native Method)

at java.lang.ClassLoader.defineClass(ClassLoader.java:763)

at java.security.SecureClassLoader.defineClass(SecureClassLoader.java:142)

at java.net.URLClassLoader.defineClass(URLClassLoader.java:467)

at java.net.URLClassLoader.access$100(URLClassLoader.java:73)

at java.net.URLClassLoader$1.run(URLClassLoader.java:368)

at java.net.URLClassLoader$1.run(URLClassLoader.java:362)

at java.security.AccessController.doPrivileged(Native Method)

at java.net.URLClassLoader.findClass(URLClassLoader.java:361)

at java.lang.ClassLoader.loadClass(ClassLoader.java:424)

at sun.misc.Launcher$AppClassLoader.loadClass(Launcher.java:335)

at java.lang.ClassLoader.loadClass(ClassLoader.java:357)

at
org.apache.hadoop.yarn.client.api.TimelineClient.createTimelineClient(TimelineClient.java:55)

at
org.apache.hadoop.yarn.client.api.impl.YarnClientImpl.createTimelineClient(YarnClientImpl.java:181)

at
org.apache.hadoop.yarn.client.api.impl.YarnClientImpl.serviceInit(YarnClientImpl.java:168)

at org.apache.hadoop.service.AbstractService.init(AbstractService.java:163)

at
org.apache.flink.yarn.cli.FlinkYarnSessionCli.getClusterDescriptor(FlinkYarnSessionCli.java:966)

at
org.apache.flink.yarn.cli.FlinkYarnSessionCli.createDescriptor(FlinkYarnSessionCli.java:269)

at
org.apache.flink.yarn.cli.FlinkYarnSessionCli.createClusterDescriptor(FlinkYarnSessionCli.java:444)

at
org.apache.flink.yarn.cli.FlinkYarnSessionCli.createClusterDescriptor(FlinkYarnSessionCli.java:92)

at org.apache.flink.client.cli.CliFrontend.runProgram(CliFrontend.java:225)

at org.apache.flink.client.cli.CliFrontend.run(CliFrontend.java:214)

at
org.apache.flink.client.cli.CliFrontend.parseParameters(CliFrontend.java:1025)

at
org.apache.flink.client.cli.CliFrontend.lambda$main$9(CliFrontend.java:1101)

at java.security.AccessController.doPrivileged(Native Method)

at javax.security.auth.Subject.doAs(Subject.java:422)

at
org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1754)

at
org.apache.flink.runtime.security.HadoopSecurityContext.runSecured(HadoopSecurityContext.java:41)

at org.apache.flink.client.cli.CliFrontend.main(CliFrontend.java:1101)

Caused by: java.lang.ClassNotFoundException:
com.sun.jersey.core.util.FeaturesAndProperties

at java.net.URLClassLoader.findClass(URLClassLoader.java:381)

at java.lang.ClassLoader.loadClass(ClassLoader.java:424)

at sun.misc.Launcher$AppClassLoader.loadClass(Launcher.java:335)

at java.lang.ClassLoader.loadClass(ClassLoader.java:357)

... 29 more


please help me


thanks

Yubraj singh


Re: [ANNOUNCE] Apache Flink 1.5.3 released

2018-08-21 Thread Till Rohrmann
Great news. Thanks a lot for managing the release Chesnay and to all who
have contributed to this release.

Cheers,
Till

On Tue, Aug 21, 2018 at 2:12 PM Chesnay Schepler  wrote:

> The Apache Flink community is very happy to announce the release of
> Apache Flink 1.5.3, which is the third bugfix release for the Apache
> Flink 1.5 series.
>
> Apache Flink® is an open-source stream processing framework for
> distributed, high-performing, always-available, and accurate data
> streaming applications.
>
> The release is available for download at:
> https://flink.apache.org/downloads.html
>
> Please check out the release blog post for an overview of the
> improvements for this bugfix release:
> https://flink.apache.org/news/2018/08/21/release-1.5.3.html
>
> The full release notes are available in Jira:
>
> https://issues.apache.org/jira/secure/ReleaseNote.jspa?projectId=12315522&version=12343777
>
> We would like to thank all contributors of the Apache Flink community
> who made this release possible!
>
> Regards,
> Chesnay
>


Re: Semantic when table joins table from window

2018-08-21 Thread 徐涛
Hi Fabian,
So maybe I cannot join a table that is generated from a window, because
the table gets larger and larger as time goes by, and maybe the system will
crash one day.

I am working on a system that calculates the “score" of an article, which
consists of the count of article praises, the count of article responses, etc.
Because I cannot use Flink to keep all the articles, I decided to only update
the score of articles created in the last 3 days.

I have two choices:
1. join the article table and praise table, response table then window
select a.article_id, count(p.praise_id) as pCount, 
count(r.response_id) as rCount
from
article a
left join
praise p on a.article_id = p.article_id
left join
response r on a.article_id = r.article_id
group by hop(updated_time, interval '1' minute,interval '3' 
day) , article_id
2. window the article table, window the praise table, window the
response table, then join them together
select aAggr.article_id, pAggr.pCount, rAggr.rCount
(select article_id from article group by hop(updated_time, 
interval '1' minute,interval '3' day) , article_id) aAggr
left join
(select article_id,count(praise_id) as pCount from praise group 
by hop(updated_time, interval '1' minute,interval '3' day) , article_id) pAggr 
on aAggr.article_id=pAggr.article_id
left join
(select article_id,count(response_id) as rCount from response 
group by hop(updated_time, interval '1' minute,interval '3' day) , article_id) 
rAggr on aAggr.article_id=rAggr.article_id

Maybe I should choose 1: join then window, not window then join.

Please correct me if I am wrong.

I have some worries when choosing 1:
I do not know how Flink works internally; it seems that in the SQL, the
tables article, praise and response keep growing as time goes by.
Will it introduce performance issues?

Best,
Henry

> On Aug 21, 2018, at 9:29 PM, Hequn Cheng wrote:
> 
> Hi Henry,
> 
> praiseAggr is an append table, so it contains "100,101,102,100,101,103,100".
> 1. if you change your sql to s"SELECT article_id FROM praise GROUP BY 
> article_id", the answer is "101,102,103"
> 2. if you change your sql to s"SELECT last_value(article_id) FROM praise", 
> the answer is "100" 
> 
> Best, Hequn
> 
> On Tue, Aug 21, 2018 at 8:52 PM, 徐涛  > wrote:
> Hi Fabian,
> Thanks for your response. This question has puzzled me for quite a long
> time.
> If the praiseAggr has the following value:
> window-1  100,101,102
> window-2  100,101,103
> window-3  100
>
> the last time the article table joins praiseAggr, which of the
> following values does the praiseAggr table have?
> 1—  100,101,102,100,101,103,100    all the elements of all the windows
> 2—  100                            the element of the latest window
> 3—  101,102,103                    the distinct values across all the windows
> 
> 
> Best,
> Henry
> 
> 
>> On Aug 21, 2018, at 8:02 PM, Fabian Hueske wrote:
>> 
>> Hi,
>> 
>> The semantics of a query do not depend on the way that it is used.
>> praiseAggr is a table that grows by one row per second and article_id. If 
>> you use that table in a join, the join will fully materialize the table.
>> This is a special case because the same row is added multiple times, so the 
>> state won't grow that quickly, but the performance will decrease because
>> each row from article will join with multiple (a growing number) of rows
>> from praiseAggr.
>> 
>> Best, Fabian
>> 
>> 2018-08-21 12:19 GMT+02:00 徐涛 > >:
>> Hi All,
>>  var praiseAggr = tableEnv.sqlQuery(s"SELECT article_id FROM praise 
>> GROUP BY HOP(updated_time, INTERVAL '1' SECOND,INTERVAL '3' MINUTE) , 
>> article_id" )
>>  tableEnv.registerTable("praiseAggr", praiseAggr)
>> var finalTable = tableEnv.sqlQuery(s”SELECT 1 FROM article a join 
>> praiseAggr p on a.article_id=p.article_id" )
>> tableEnv.registerTable("finalTable", finalTable)
>> I know that praiseAggr, if written to a sink, is in append mode, so if a
>> table joins praiseAggr, does the table “see” a table containing the latest
>> values, or a table that grows larger and larger? If it is the latter, will it
>> introduce performance problems?
>>   Thanks a lot.
>> 
>> 
>> Best, 
>> Henry
>> 
> 
> 



Re: Unable to start Flink HA cluster with Zookeeper

2018-08-21 Thread Dawid Wysakowicz
Hi,
In your case the jobmanager binds itself to localhost and that's what it
writes to zookeeper. Try starting the jobmanager manually with
jobmanager.rpc.address set to the IP of the machine you are running the
jobmanager on. In other words, make sure the jobmanager binds itself to the
right IP.
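
For example, in flink-conf.yaml on the machine that runs this jobmanager (the
address below is a placeholder for that machine's reachable IP or hostname):

# use the real IP/hostname of this JobManager host, not localhost
jobmanager.rpc.address: 192.168.1.10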

Regards
Dawid

On Tue, 21 Aug 2018 at 15:32, mozer 
wrote:

> FQDN or full IP; tried all of them, still no changes ...
> For ssh connection, I can connect to each machine without passwords.
>
>
> Do you think that the problem can come from :
>
> *high-availability.storageDir: file:///shareflink/recovery* ?
>
> I don't use a HDFS storage but NAS file system which is common for two
> machines.
>
> I also added ;
>
>
> state.backend: filesystem
> state.checkpoints.fs.dir: file:///shareflink/recovery/checkpoint
> blob.storage.directory: file:///shareflink/recovery/blob
>
> Logs for zookeeper file :
>
> 2018-08-21 14:59:32,652 INFO
> org.apache.flink.shaded.zookeeper.org.apache.zookeeper.server.ZooKeeperServer
>
> - tickTime set to 2000
> 2018-08-21 14:59:32,653 INFO
> org.apache.flink.shaded.zookeeper.org.apache.zookeeper.server.ZooKeeperServer
>
> - minSessionTimeout set to -1
> 2018-08-21 14:59:32,653 INFO
> org.apache.flink.shaded.zookeeper.org.apache.zookeeper.server.ZooKeeperServer
>
> - maxSessionTimeout set to -1
> 2018-08-21 14:59:32,661 INFO
> org.apache.flink.shaded.zookeeper.org.apache.zookeeper.server.NIOServerCnxnFactory
>
> - binding to port 0.0.0.0/0.0.0.0:2181
> 2018-08-21 14:59:39,940 INFO
> org.apache.flink.shaded.zookeeper.org.apache.zookeeper.server.NIOServerCnxnFactory
>
> - Accepted socket connection from /Machine1:60186
> 2018-08-21 14:59:40,015 INFO
> org.apache.flink.shaded.zookeeper.org.apache.zookeeper.server.NIOServerCnxnFactory
>
> - Accepted socket connection from /Machine2:54466
> 2018-08-21 14:59:40,017 INFO
> org.apache.flink.shaded.zookeeper.org.apache.zookeeper.server.ZooKeeperServer
>
> - Client attempting to establish new session at /Machine1:60186
> 2018-08-21 14:59:40,017 INFO
> org.apache.flink.shaded.zookeeper.org.apache.zookeeper.server.ZooKeeperServer
>
> - Client attempting to establish new session at /Machine2:54466
>
> Log for Job Manager :
>
> 2018-08-21 14:59:39,327 INFO
> org.apache.flink.runtime.entrypoint.ClusterEntrypoint - Trying to
> start actor system at 127.0.0.1:50101
> 2018-08-21 14:59:39,723 INFO  akka.event.slf4j.Slf4jLogger
>
> - Slf4jLogger started
> 2018-08-21 14:59:39,766 INFO  akka.remote.Remoting
>
> - Starting remoting
> 2018-08-21 14:59:39,859 INFO  akka.remote.Remoting
>
> - Remoting started; listening on addresses
> :[akka.tcp://flink@127.0.0.1:50101]
> 2018-08-21 14:59:39,865 INFO
> org.apache.flink.runtime.entrypoint.ClusterEntrypoint - Actor
> system
> started at akka.tcp://flink@127.0.0.1:50101
> 2018-08-21 14:59:39,872 INFO
> org.apache.flink.runtime.blob.FileSystemBlobStore - Creating
> highly available BLOB storage directory at
> file:///shareflink/recovery///blob
> 2018-08-21 14:59:39,876 INFO
> org.apache.flink.runtime.util.ZooKeeperUtils
> - Enforcing default ACL for ZK connections
> 2018-08-21 14:59:39,876 INFO
> org.apache.flink.runtime.util.ZooKeeperUtils
> - Using '/usr/flink-1.5.1/' as Zookeeper namespace.
> 2018-08-21 14:59:39,919 INFO
> org.apache.flink.shaded.curator.org.apache.curator.framework.imps.CuratorFrameworkImpl
>
> - Starting
>
>
>
>
>
> --
> Sent from:
> http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/
>


Re: Unable to start Flink HA cluster with Zookeeper

2018-08-21 Thread mozer
FQDN or full IP; tried all of them, still no changes...
For SSH connections, I can connect to each machine without passwords.


Do you think that the problem can come from : 

*high-availability.storageDir: file:///shareflink/recovery* ? 

I don't use HDFS storage but a NAS file system which is shared between the two
machines.

I also added ; 


state.backend: filesystem
state.checkpoints.fs.dir: file:///shareflink/recovery/checkpoint
blob.storage.directory: file:///shareflink/recovery/blob

Logs for zookeeper file : 

2018-08-21 14:59:32,652 INFO 
org.apache.flink.shaded.zookeeper.org.apache.zookeeper.server.ZooKeeperServer 
- tickTime set to 2000
2018-08-21 14:59:32,653 INFO 
org.apache.flink.shaded.zookeeper.org.apache.zookeeper.server.ZooKeeperServer 
- minSessionTimeout set to -1
2018-08-21 14:59:32,653 INFO 
org.apache.flink.shaded.zookeeper.org.apache.zookeeper.server.ZooKeeperServer 
- maxSessionTimeout set to -1
2018-08-21 14:59:32,661 INFO 
org.apache.flink.shaded.zookeeper.org.apache.zookeeper.server.NIOServerCnxnFactory
 
- binding to port 0.0.0.0/0.0.0.0:2181
2018-08-21 14:59:39,940 INFO 
org.apache.flink.shaded.zookeeper.org.apache.zookeeper.server.NIOServerCnxnFactory
 
- Accepted socket connection from /Machine1:60186
2018-08-21 14:59:40,015 INFO 
org.apache.flink.shaded.zookeeper.org.apache.zookeeper.server.NIOServerCnxnFactory
 
- Accepted socket connection from /Machine2:54466
2018-08-21 14:59:40,017 INFO 
org.apache.flink.shaded.zookeeper.org.apache.zookeeper.server.ZooKeeperServer 
- Client attempting to establish new session at /Machine1:60186
2018-08-21 14:59:40,017 INFO 
org.apache.flink.shaded.zookeeper.org.apache.zookeeper.server.ZooKeeperServer 
- Client attempting to establish new session at /Machine2:54466

Job Manager log file:

2018-08-21 14:59:39,327 INFO 
org.apache.flink.runtime.entrypoint.ClusterEntrypoint - Trying to
start actor system at 127.0.0.1:50101
2018-08-21 14:59:39,723 INFO  akka.event.slf4j.Slf4jLogger  
   
- Slf4jLogger started
2018-08-21 14:59:39,766 INFO  akka.remote.Remoting  
   
- Starting remoting
2018-08-21 14:59:39,859 INFO  akka.remote.Remoting  
   
- Remoting started; listening on addresses
:[akka.tcp://flink@127.0.0.1:50101]
2018-08-21 14:59:39,865 INFO 
org.apache.flink.runtime.entrypoint.ClusterEntrypoint - Actor system
started at akka.tcp://flink@127.0.0.1:50101
2018-08-21 14:59:39,872 INFO 
org.apache.flink.runtime.blob.FileSystemBlobStore - Creating
highly available BLOB storage directory at
file:///shareflink/recovery///blob
2018-08-21 14:59:39,876 INFO  org.apache.flink.runtime.util.ZooKeeperUtils  
   
- Enforcing default ACL for ZK connections
2018-08-21 14:59:39,876 INFO  org.apache.flink.runtime.util.ZooKeeperUtils  
   
- Using '/usr/flink-1.5.1/' as Zookeeper namespace.
2018-08-21 14:59:39,919 INFO 
org.apache.flink.shaded.curator.org.apache.curator.framework.imps.CuratorFrameworkImpl
 
- Starting





--
Sent from: http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/


Re: Semantic when table joins table from window

2018-08-21 Thread Hequn Cheng
Hi Henry,

praiseAggr is an append-only table, so it contains "100,101,102,100,101,103,100".
1. If you change your SQL to s"SELECT article_id FROM praise GROUP BY
article_id", the answer is "101,102,103".
2. If you change your SQL to s"SELECT last_value(article_id) FROM praise",
the answer is "100". (Both variants are sketched below.)
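
A minimal sketch of the two variants in the Scala Table API, assuming the praise
table is registered as in your earlier snippet (note: last_value is not a
built-in aggregate in every Flink version, so it may have to be provided as a
user-defined aggregate function):

    // 1) one row per distinct article_id over the whole stream
    val distinctIds = tableEnv.sqlQuery(
      "SELECT article_id FROM praise GROUP BY article_id")

    // 2) only the most recent article_id, as a single continuously updated row
    //    (assumes a last_value aggregate is available)
    val latestId = tableEnv.sqlQuery(
      "SELECT last_value(article_id) FROM praise")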

Best, Hequn

On Tue, Aug 21, 2018 at 8:52 PM, 徐涛  wrote:

> Hi Fabian,
> Thanks for your response. This question puzzles me for quite a long time.
> If the praiseAggr has the following value:
> window-1 100,101,102
> window-2 100,101,103
> window-3 100
>
> the last time the article table joins praiseAggr, which of the following
> value does praiseAggr table has?
> 1— 100,101,102,100,101,103,100   collect all the element of all
> the window
> 2—  100the element of the latest window
> 3—  101,102,103the distinct value of all the window
>
>
> Best,
> Henry
>
>
> On Aug 21, 2018, at 8:02 PM, Fabian Hueske wrote:
>
> Hi,
>
> The semantics of a query do not depend on the way that it is used.
> praiseAggr is a table that grows by one row per second and article_id. If
> you use that table in a join, the join will fully materialize the table.
> This is a special case because the same row is added multiple times, so
> the state won't grow that quickly, but the performance will decrease
> because for each row from article will join with multiple (a growing
> number) of rows from praiseAggr.
>
> Best, Fabian
>
> 2018-08-21 12:19 GMT+02:00 徐涛 :
>
>> Hi All,
>> var praiseAggr = tableEnv.sqlQuery(s"SELECT article_id FROM praise GROUP
>> BY HOP(updated_time, INTERVAL '1' SECOND,INTERVAL '3' MINUTE) , article_id"
>> )
>> tableEnv.registerTable("praiseAggr", praiseAggr)
>>
>> var finalTable = tableEnv.sqlQuery(s"SELECT 1 FROM article a join 
>> praiseAggr p on a.article_id=p.article_id")
>> tableEnv.registerTable("finalTable", finalTable)
>>
>>  I know that praiseAggr, if written to sink, is append mode , so if a
>> table joins praiseAggr, what the table “see”, is a table contains the
>> latest value, or a table that grows larger and larger? If it is the later,
>> will it introduce performance problem?
>>  Thanks a lot.
>>
>>
>> Best,
>> Henry
>>
>
>
>


Re: Unable to start Flink HA cluster with Zookeeper

2018-08-21 Thread miki haiat
First of all, try with the FQDN or the full IP.
Also, in order to run an HA cluster, you need to make sure that you have
passwordless SSH access between the master and the slaves.

On Tue, Aug 21, 2018 at 4:15 PM mozer 
wrote:

> I am trying to install a Flink HA cluster (Zookeeper mode) but the task
> manager cannot find the job manager.
>
> Here I give you the architecture;
>
> - Machine 1 : Job Manager + Zookeeper
> - Machine 2 : Task Manager
>
> masters:
>
> Machine1
>
> slaves :
>
> Machine2
>
> flink-conf.yaml:
>
> #jobmanager.rpc.address: localhost
> jobmanager.rpc.port: 6123
> blob.server.port: 50100-50200
> taskmanager.data.port: 6121
> high-availability: zookeeper
> high-availability.zookeeper.quorum: Machine1:2181
> high-availability.zookeeper.path.root: /flink-1.5.1
> high-availability.cluster-id: /default_b
> high-availability.storageDir: file:///shareflink/recovery
>
> Here this is the log of Task Manager, it tries to connect to localhost
> instead of Machine1:
>
> 2018-08-17 10:46:44,875 INFO
> org.apache.flink.runtime.util.LeaderRetrievalUtils- Trying to
> select the network interface and address to use by connecting to the
> leading
> JobManager.
> 2018-08-17 10:46:44,876 INFO
> org.apache.flink.runtime.util.LeaderRetrievalUtils- TaskManager
> will try to connect for 1 milliseconds before falling back to
> heuristics
> 2018-08-17 10:46:44,966 INFO
> org.apache.flink.runtime.net.ConnectionUtils  - Retrieved
> new target address /127.0.0.1:37133.
> 2018-08-17 10:46:45,324 INFO
> org.apache.flink.runtime.net.ConnectionUtils  - Trying to
> connect to address /127.0.0.1:37133
> 2018-08-17 10:46:45,325 INFO
> org.apache.flink.runtime.net.ConnectionUtils  - Failed to
> connect from address 'Machine2/IP-Machine2': Connection refused
> 2018-08-17 10:46:45,325 INFO
> org.apache.flink.runtime.net.ConnectionUtils  - Failed to
> connect from address '/127.0.0.1': Connection refused
> 2018-08-17 10:46:45,325 INFO
> org.apache.flink.runtime.net.ConnectionUtils  - Failed to
> connect from address '/IP_Machine2': Connection refused
> 2018-08-17 10:46:45,325 INFO
> org.apache.flink.runtime.net.ConnectionUtils  - Failed to
> connect from address '/127.0.0.1': Connection refused
> 2018-08-17 10:46:45,326 INFO
> org.apache.flink.runtime.net.ConnectionUtils  - Failed to
> connect from address '/IP_Machine2': Connection refused
> 2018-08-17 10:46:45,326 INFO
> org.apache.flink.runtime.net.ConnectionUtils  - Failed to
> connect from address '/127.0.0.1': Connection refused
> 2018-08-17 10:46:45,726 INFO
> org.apache.flink.runtime.net.ConnectionUtils  - Trying to
> connect to address /127.0.0.1:37133
> 2018-08-17 10:46:45,727 INFO
> org.apache.flink.runtime.net.ConnectionUtils  - Failed to
> connect from address 'Machine2/IP-Machine2
>
> 2018-08-17 10:47:22,022 WARN  akka.remote.ReliableDeliverySupervisor
>
> - Association with remote system [akka.tcp://flink@127.0.0.1:36515] has
> failed, address is now gated for [50] ms. Reason: [Association failed with
> [akka.tcp://flink@127.0.0.1:36515]] Caused by: [Connection refused:
> /127.0.0.1:36515]
>
> 2018-08-17 10:47:22,022 INFO
> org.apache.flink.runtime.taskexecutor.TaskExecutor- Could not
> resolve ResourceManager address
> akka.tcp://flink@127.0.0.1:36515/user/resourcemanager, retrying in 1
> ms:
> Could not connect to rpc endpoint under address
> akka.tcp://flink@127.0.0.1:36515/user/resourcemanager..
> 2018-08-17 10:47:32,037 WARN
> akka.remote.transport.netty.NettyTransport
> - Remote connection to [null] failed with java.net.ConnectException:
> Connection refused: /127.0.0.1:36515
>
>
>
> PS. : **/etc/hosts** contains the **localhost, Machine1 and Machine2**
>
>
> Can you please tell me how the Task Manager can connect to Job Manager ?
>
> Regards
>
>
>
>
>
> --
> Sent from:
> http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/
>


Unable to start Flink HA cluster with Zookeeper

2018-08-21 Thread mozer
I am trying to install a Flink HA cluster (Zookeeper mode) but the task
manager cannot find the job manager. 

Here is the architecture:

- Machine 1 : Job Manager + Zookeeper
- Machine 2 : Task Manager

masters: 

Machine1

slaves : 

Machine2

flink-conf.yaml: 

#jobmanager.rpc.address: localhost
jobmanager.rpc.port: 6123
blob.server.port: 50100-50200
taskmanager.data.port: 6121
high-availability: zookeeper
high-availability.zookeeper.quorum: Machine1:2181
high-availability.zookeeper.path.root: /flink-1.5.1
high-availability.cluster-id: /default_b
high-availability.storageDir: file:///shareflink/recovery

Here is the Task Manager log; it tries to connect to localhost
instead of Machine1:

2018-08-17 10:46:44,875 INFO 
org.apache.flink.runtime.util.LeaderRetrievalUtils- Trying to
select the network interface and address to use by connecting to the leading
JobManager.
2018-08-17 10:46:44,876 INFO 
org.apache.flink.runtime.util.LeaderRetrievalUtils- TaskManager
will try to connect for 1 milliseconds before falling back to heuristics
2018-08-17 10:46:44,966 INFO 
org.apache.flink.runtime.net.ConnectionUtils  - Retrieved
new target address /127.0.0.1:37133.
2018-08-17 10:46:45,324 INFO 
org.apache.flink.runtime.net.ConnectionUtils  - Trying to
connect to address /127.0.0.1:37133
2018-08-17 10:46:45,325 INFO 
org.apache.flink.runtime.net.ConnectionUtils  - Failed to
connect from address 'Machine2/IP-Machine2': Connection refused
2018-08-17 10:46:45,325 INFO 
org.apache.flink.runtime.net.ConnectionUtils  - Failed to
connect from address '/127.0.0.1': Connection refused
2018-08-17 10:46:45,325 INFO 
org.apache.flink.runtime.net.ConnectionUtils  - Failed to
connect from address '/IP_Machine2': Connection refused
2018-08-17 10:46:45,325 INFO 
org.apache.flink.runtime.net.ConnectionUtils  - Failed to
connect from address '/127.0.0.1': Connection refused
2018-08-17 10:46:45,326 INFO 
org.apache.flink.runtime.net.ConnectionUtils  - Failed to
connect from address '/IP_Machine2': Connection refused
2018-08-17 10:46:45,326 INFO 
org.apache.flink.runtime.net.ConnectionUtils  - Failed to
connect from address '/127.0.0.1': Connection refused
2018-08-17 10:46:45,726 INFO 
org.apache.flink.runtime.net.ConnectionUtils  - Trying to
connect to address /127.0.0.1:37133
2018-08-17 10:46:45,727 INFO 
org.apache.flink.runtime.net.ConnectionUtils  - Failed to
connect from address 'Machine2/IP-Machine2

2018-08-17 10:47:22,022 WARN  akka.remote.ReliableDeliverySupervisor
   
- Association with remote system [akka.tcp://flink@127.0.0.1:36515] has
failed, address is now gated for [50] ms. Reason: [Association failed with
[akka.tcp://flink@127.0.0.1:36515]] Caused by: [Connection refused:
/127.0.0.1:36515]

2018-08-17 10:47:22,022 INFO 
org.apache.flink.runtime.taskexecutor.TaskExecutor- Could not
resolve ResourceManager address
akka.tcp://flink@127.0.0.1:36515/user/resourcemanager, retrying in 1 ms:
Could not connect to rpc endpoint under address
akka.tcp://flink@127.0.0.1:36515/user/resourcemanager..
2018-08-17 10:47:32,037 WARN  akka.remote.transport.netty.NettyTransport
   
- Remote connection to [null] failed with java.net.ConnectException:
Connection refused: /127.0.0.1:36515



PS. : **/etc/hosts** contains the **localhost, Machine1 and Machine2**


Can you please tell me how the Task Manager can connect to the Job Manager?

Regards





--
Sent from: http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/


Side effect of DataStreamRel#translateToPlan

2018-08-21 Thread wangsan
Hi all,

I noticed that DataStreamRel#translateToPlan is non-idempotent, and that
may cause the execution plan to differ from what we expect. Every time we call
DataStreamRel#translateToPlan (in TableEnvironment#explain,
TableEnvironment#writeToSink, etc.), we add the same operators to the execution
environment again.

Should we eliminate the side effect of DataStreamRel#translateToPlan?

Best,  Wangsan

appendix

tenv.registerTableSource("test_source", sourceTable)

val t = tenv.sqlQuery("SELECT * from test_source")
println(tenv.explain(t))
println(tenv.explain(t))

implicit val typeInfo = TypeInformation.of(classOf[Row])
tenv.toAppendStream(t)
println(tenv.explain(t))
We call explain three times, and the Physical Execution Plans are all different.

== Abstract Syntax Tree ==
LogicalProject(f1=[$0], f2=[$1])
  LogicalTableScan(table=[[test_source]])

== Optimized Logical Plan ==
StreamTableSourceScan(table=[[test_source]], fields=[f1, f2], 
source=[CsvTableSource(read fields: f1, f2)])

== Physical Execution Plan ==
Stage 1 : Data Source
content : collect elements with CollectionInputFormat

Stage 2 : Operator
content : CsvTableSource(read fields: f1, f2)
ship_strategy : FORWARD

Stage 3 : Operator
content : Map
ship_strategy : FORWARD


== Abstract Syntax Tree ==
LogicalProject(f1=[$0], f2=[$1])
  LogicalTableScan(table=[[test_source]])

== Optimized Logical Plan ==
StreamTableSourceScan(table=[[test_source]], fields=[f1, f2], 
source=[CsvTableSource(read fields: f1, f2)])

== Physical Execution Plan ==
Stage 1 : Data Source
content : collect elements with CollectionInputFormat

Stage 2 : Operator
content : CsvTableSource(read fields: f1, f2)
ship_strategy : FORWARD

Stage 3 : Operator
content : Map
ship_strategy : FORWARD

Stage 4 : Data Source
content : collect elements with CollectionInputFormat

Stage 5 : Operator
content : CsvTableSource(read fields: f1, f2)
ship_strategy : FORWARD

Stage 6 : Operator
content : Map
ship_strategy : FORWARD


== Abstract Syntax Tree ==
LogicalProject(f1=[$0], f2=[$1])
  LogicalTableScan(table=[[test_source]])

== Optimized Logical Plan ==
StreamTableSourceScan(table=[[test_source]], fields=[f1, f2], 
source=[CsvTableSource(read fields: f1, f2)])

== Physical Execution Plan ==
Stage 1 : Data Source
content : collect elements with CollectionInputFormat

Stage 2 : Operator
content : CsvTableSource(read fields: f1, f2)
ship_strategy : FORWARD

Stage 3 : Operator
content : Map
ship_strategy : FORWARD

Stage 4 : Data Source
content : collect elements with CollectionInputFormat

Stage 5 : Operator
content : CsvTableSource(read fields: f1, f2)
ship_strategy : FORWARD

Stage 6 : Operator
content : Map
ship_strategy : FORWARD

Stage 7 : Data Source
content : collect elements with CollectionInputFormat

Stage 8 : Operator
content : CsvTableSource(read fields: f1, f2)
ship_strategy : FORWARD

Stage 9 : Operator
content : Map
ship_strategy : FORWARD

Stage 10 : Operator
content : to: Row
ship_strategy : FORWARD

Stage 11 : Data Source
content : collect elements with CollectionInputFormat

Stage 12 : Operator
content : CsvTableSource(read fields: f1, f2)
ship_strategy : FORWARD

Stage 13 : Operator
content : Map
ship_strategy : FORWARD
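
For the explain-only case, a hedged workaround (it does not help for
writeToSink, which also triggers a translation) is to translate once and reuse
the returned plan string:

    // call explain once ...
    val plan: String = tenv.explain(t)
    // ... and reuse the returned string instead of calling explain() again,
    // so no additional source/map operators are added to the environment
    println(plan)
    println(plan)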



Re: Semantic when table joins table from window

2018-08-21 Thread 徐涛
Hi Fabian,
Thanks for your response. This question has puzzled me for quite a long
time.
If praiseAggr has the following values:
window-1    100,101,102
window-2    100,101,103
window-3    100

The last time the article table joins praiseAggr, which of the
following values does the praiseAggr table have?
1 - 100,101,102,100,101,103,100    (all the elements of all the windows)
2 - 100                            (the elements of the latest window only)
3 - 101,102,103                    (the distinct values across all the windows)


Best,
Henry

> On Aug 21, 2018, at 8:02 PM, Fabian Hueske wrote:
> 
> Hi,
> 
> The semantics of a query do not depend on the way that it is used.
> praiseAggr is a table that grows by one row per second and article_id. If you 
> use that table in a join, the join will fully materialize the table.
> This is a special case because the same row is added multiple times, so the 
> state won't grow that quickly, but the performance will decrease because for 
> each row from article will join with multiple (a growing number) of rows from 
> praiseAggr.
> 
> Best, Fabian
> 
> 2018-08-21 12:19 GMT+02:00 徐涛  >:
> Hi All,
>   var praiseAggr = tableEnv.sqlQuery(s"SELECT article_id FROM praise 
> GROUP BY HOP(updated_time, INTERVAL '1' SECOND,INTERVAL '3' MINUTE) , 
> article_id" )
>   tableEnv.registerTable("praiseAggr", praiseAggr)
> var finalTable = tableEnv.sqlQuery(s"SELECT 1 FROM article a join 
> praiseAggr p on a.article_id=p.article_id")
> tableEnv.registerTable("finalTable", finalTable)
>I know that praiseAggr, if written to sink, is append mode , so if a 
> table joins praiseAggr, what the table “see”, is a table contains the latest 
> value, or a table that grows larger and larger? If it is the later, will it 
> introduce performance problem?
>Thanks a lot.
> 
> 
> Best, 
> Henry
> 



[ANNOUNCE] Apache Flink 1.5.3 released

2018-08-21 Thread Chesnay Schepler
The Apache Flink community is very happy to announce the release of 
Apache Flink 1.5.3, which is the third bugfix release for the Apache 
Flink 1.5 series.


Apache Flink® is an open-source stream processing framework for 
distributed, high-performing, always-available, and accurate data 
streaming applications.


The release is available for download at:
https://flink.apache.org/downloads.html

Please check out the release blog post for an overview of the 
improvements for this bugfix release:

https://flink.apache.org/news/2018/08/21/release-1.5.3.html

The full release notes are available in Jira:
https://issues.apache.org/jira/secure/ReleaseNote.jspa?projectId=12315522&version=12343777

We would like to thank all contributors of the Apache Flink community 
who made this release possible!


Regards,
Chesnay


Re: Semantic when table joins table from window

2018-08-21 Thread Fabian Hueske
Hi,

The semantics of a query do not depend on the way that it is used.
praiseAggr is a table that grows by one row per second per article_id. If
you use that table in a join, the join will fully materialize the table.
This is a special case because the same row is added multiple times, so the
state won't grow that quickly, but the performance will decrease because
each row from article will join with a growing number of rows from
praiseAggr.
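
One rough way to keep the join input bounded (a sketch only, reusing the table
names from the quoted question; whether it fits depends on the semantics you
actually want and on how your sink handles updates) is to aggregate praise per
article_id first, so the right side of the join holds one updating row per key
instead of an ever-growing set of appended rows:

    // one updating row per article_id instead of one appended row per window firing
    val praiseCnt = tableEnv.sqlQuery(
      "SELECT article_id, COUNT(1) AS pCount FROM praise GROUP BY article_id")
    tableEnv.registerTable("praiseCnt", praiseCnt)

    val scored = tableEnv.sqlQuery(
      "SELECT a.article_id, p.pCount FROM article a JOIN praiseCnt p " +
      "ON a.article_id = p.article_id")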

Best, Fabian

2018-08-21 12:19 GMT+02:00 徐涛 :

> Hi All,
> var praiseAggr = tableEnv.sqlQuery(s"SELECT article_id FROM praise GROUP
> BY HOP(updated_time, INTERVAL '1' SECOND,INTERVAL '3' MINUTE) , article_id"
> )
> tableEnv.registerTable("praiseAggr", praiseAggr)
>
> var finalTable = tableEnv.sqlQuery(s"SELECT 1 FROM article a join 
> praiseAggr p on a.article_id=p.article_id")
> tableEnv.registerTable("finalTable", finalTable)
>
>  I know that praiseAggr, if written to sink, is append mode , so if a
> table joins praiseAggr, what the table “see”, is a table contains the
> latest value, or a table that grows larger and larger? If it is the later,
> will it introduce performance problem?
>  Thanks a lot.
>
>
> Best,
> Henry
>


Re: classloading strangeness with Avro in Flink

2018-08-21 Thread Cliff Resnick
Hi Aljoscha,

We need flink-shaded-hadoop2-uber.jar because there is no Hadoop distro on
the instance the Flink session/jobs are managed from, and the process that
launches Flink is not a Java process; it execs a process that calls the
flink script.

-Cliff

On Tue, Aug 21, 2018 at 5:11 AM Aljoscha Krettek 
wrote:

> Hi Cliff,
>
> Do you actually need the flink-shaded-hadoop2-uber.jar in lib. If you're
> running on YARN, you should be able to just remove them because with YARN
> you will have Hadoop in the classpath anyways.
>
> Aljoscha
>
> On 21. Aug 2018, at 03:45, vino yang  wrote:
>
> Hi Cliff,
>
> If so, you can explicitly exclude Avro's dependencies from related
> dependencies (using <exclusions>) and then directly introduce dependencies on
> the Avro version you need.
>
> Thanks, vino.
>
> On Tue, Aug 21, 2018 at 5:13 AM, Cliff Resnick wrote:
>
>> Hi Vino,
>>
>> Unfortunately, I'm still stuck here. By moving the avro dependency chain
>> to lib (and removing it from user jar), my OCFs decode but I get the error
>> described here:
>>
>> https://github.com/confluentinc/schema-registry/pull/509
>>
>> However, the Flink fix described in the PR above was to move the Avro
>> dependency to the user jar. However, since I'm using YARN, I'm required to
>> have flink-shaded-hadoop2-uber.jar loaded from lib -- and that has
>> avro bundled un-shaded. So I'm back to the start problem...
>>
>> Any advice is welcome!
>>
>> -Cliff
>>
>>
>> On Mon, Aug 20, 2018 at 1:42 PM Cliff Resnick  wrote:
>>
>>> Hi Vino,
>>>
>>> You were right in your assumption -- unshaded avro was being added to
>>> our application jar via third-party dependency. Excluding it in packaging
>>> fixed the issue. For the record, it looks flink-avro must be loaded from
>>> the lib or there will be errors in checkpoint restores.
>>>
>>> On Mon, Aug 20, 2018 at 8:43 AM Cliff Resnick  wrote:
>>>
 Hi Vino,

 Thanks for the explanation, but the job only ever uses the Avro (1.8.2)
 pulled in by flink-formats/avro, so it's not a class version conflict
 there.

 I'm using default child-first loading. It might be a further transitive
 dependency, though it's not clear by stack trace or stepping through the
 process. When I get a chance I'll look further into it but in case anyone
 is experiencing similar problems, what is clear is that classloader order
 does matter with Avro.

 On Sun, Aug 19, 2018, 11:36 PM vino yang  wrote:

> Hi Cliff,
>
> My personal guess is that this may be caused by Job's Avro conflict
> with the Avro that the Flink framework itself relies on.
> Flink has provided some configuration parameters which allows you to
> determine the order of the classloaders yourself. [1]
> Alternatively, you can debug classloading and participate in the
> documentation.[2]
>
> [1]:
> https://ci.apache.org/projects/flink/flink-docs-release-1.6/ops/config.html
> [2]:
> https://ci.apache.org/projects/flink/flink-docs-stable/monitoring/debugging_classloading.html
>
> Thanks, vino.
>
> On Mon, Aug 20, 2018 at 10:40 AM, Cliff Resnick wrote:
>
>> Our Flink/YARN pipeline has been reading Avro from Kafka for a while
>> now. We just introduced a source of Avro OCF (Object Container Files) 
>> read
>> from S3. The Kafka Avro continued to decode without incident, but the OCF
>> files failed 100% with anomalous parse errors in the decoding phase after
>> the schema and codec were successfully read from them. The pipeline would
>> work on my laptop, and when I submitted a test Main program to the Flink
>> Session in YARN, that would also successfully decode. Only the actual
>> pipeline run from the TaskManager failed. At one point I even remote
>> debugged the TaskManager process and stepped through what looked like a
>> normal Avro decode (if you can describe Avro code as normal!) -- until it
>> abruptly failed with an int decode or what-have-you.
>>
>> This stumped me for a while, but I finally tried moving
>> flink-avro.jar from the lib to the application jar, and that fixed it. 
>> I'm
>> not sure why this is, especially since there were no typical
>> classloader-type errors.  This issue was observed both on Flink 1.5 and 
>> 1.6
>> in Flip-6 mode.
>>
>> -Cliff
>>
>>
>>
>>
>>
>>
>


Re: Will idle state retention trigger retract in dynamic table?

2018-08-21 Thread Fabian Hueske
Hi,

No, I don't think this behavior is weird.

If we retracted when idle state is discarded, the result would no
longer correspond to the query.
So we would produce incorrect results even if the removed state were never
used again.

If you want consistent, exact results, you need to either provide
the necessary resources to hold the complete state, or configure idle state
retention in a way that the deleted state is not needed again.
Another solution, which is not supported yet, would be to change the semantics
of your query and remove records that are older than a certain threshold.
In that case, the query would only operate on the tail of the stream, e.g., the
last day or week.
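
For the second option, a rough sketch in the Scala Table API (assuming a
StreamTableEnvironment named tableEnv, a result table named result, and the
usual Table API Scala imports; the retention times are placeholders and must be
larger than the longest period a key can stay inactive and still receive
updates):

    import org.apache.flink.api.common.time.Time
    import org.apache.flink.types.Row

    val qConf = tableEnv.queryConfig
    // state untouched for longer than the configured window may be dropped
    qConf.withIdleStateRetentionTime(Time.hours(36), Time.hours(48))

    // pass the config when emitting the table
    tableEnv.toRetractStream[Row](result, qConf)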

Best, Fabian



2018-08-21 12:03 GMT+02:00 徐涛 :

> Hi Fabian,
> Is the behavior a bit weird? Because it leads to data inconsistency.
>
> Best,
> Henry
>
>
> 在 2018年8月21日,下午5:14,Fabian Hueske  写道:
>
> Hi,
>
> In the given example, article_id 123 will always remain in the external
> storage. The state is removed and hence it cannot be retracted anymore.
> Once the state was removed and the count reaches 10, a second record for
> article_id 123 will be emitted to the data store.
>
> As soon as you enable state retention and state is needed that was
> removed, the query result can become inconsistent.
>
> Best, Fabian
>
> 2018-08-21 10:52 GMT+02:00 徐涛 :
>
>> Hi Fabian,
>> SELECT article_id FROM praise GROUP BY article_id having count(1)>=10
>> If article_id 123 has 100 praises and remains its state in the dynamic
>> table ,and when the time passed, its state is removed, but later the
>> article_id 123 has never reached to 10 praises.
>> How can other program know that the state is been removed? Because the
>> sink currently has the praises count stored as 100, it is not consistent as
>> the dynamic table.
>>
>> Best,
>> Henry
>>
>>
>> 在 2018年8月21日,下午4:16,Fabian Hueske  写道:
>>
>> Hi,
>>
>> No, it won't. I will simply remove state that has not been accessed for
>> the configured time but not change the result.
>> For example, if you have a GROUP BY aggregation and the state for a
>> grouping key is removed, the operator will start a new aggregation if a
>> record with the removed grouping key arrives.
>>
>> Idle state retention is not meant to affect the semantics of a query.
>> The semantics of updating the result should be defined in the query,
>> e.g., with a WHERE clause that removes all records that are older than 1
>> day (note, this is not supported yet).
>>
>> Best, Fabian
>>
>> 2018-08-21 10:04 GMT+02:00 徐涛 :
>>
>>> Hi All,
>>> Will idle state retention trigger retract in dynamic table?
>>>
>>> Best,
>>> Henry
>>>
>>
>>
>>
>
>


Re: Will idle state retention trigger retract in dynamic table?

2018-08-21 Thread Xingcan Cui
Hi Henry,

Idle state retention is a trade-off between accuracy and
storage consumption. It can meet some of the computation requirements in a
streaming environment, but not all. For instance, in your use case, if there
is a TTL for each article, its praise state can be safely removed after
that period of time. Otherwise, inconsistencies are unavoidable.

We admit that there should be other state retention mechanisms which can be
applied in different scenarios. However, for now, setting a larger retention
time or simply omitting this config seem to be the only choices.

Best,
Xingcan

> On Aug 21, 2018, at 6:03 PM, 徐涛  wrote:
> 
> Hi Fabian,
>   Is the behavior a bit weird? Because it leads to data inconsistency.
> 
> Best,
> Henry
> 
>> 在 2018年8月21日,下午5:14,Fabian Hueske > > 写道:
>> 
>> Hi,
>> 
>> In the given example, article_id 123 will always remain in the external 
>> storage. The state is removed and hence it cannot be retracted anymore.
>> Once the state was removed and the count reaches 10, a second record for 
>> article_id 123 will be emitted to the data store.
>> 
>> As soon as you enable state retention and state is needed that was removed, 
>> the query result can become inconsistent.
>> 
>> Best, Fabian
>> 
>> 2018-08-21 10:52 GMT+02:00 徐涛 > >:
>> Hi Fabian,
>>  SELECT article_id FROM praise GROUP BY article_id having count(1)>=10
>>  If article_id 123 has 100 praises and remains its state in the dynamic 
>> table ,and when the time passed, its state is removed, but later the 
>> article_id 123 has never reached to 10 praises.
>>  How can other program know that the state is been removed? Because the 
>> sink currently has the praises count stored as 100, it is not consistent as 
>> the dynamic table.
>> 
>> Best, 
>> Henry
>> 
>> 
>>> 在 2018年8月21日,下午4:16,Fabian Hueske >> > 写道:
>>> 
>>> Hi,
>>> 
>>> No, it won't. I will simply remove state that has not been accessed for the 
>>> configured time but not change the result.
>>> For example, if you have a GROUP BY aggregation and the state for a 
>>> grouping key is removed, the operator will start a new aggregation if a 
>>> record with the removed grouping key arrives.
>>> 
>>> Idle state retention is not meant to affect the semantics of a query. 
>>> The semantics of updating the result should be defined in the query, e.g., 
>>> with a WHERE clause that removes all records that are older than 1 day 
>>> (note, this is not supported yet).
>>> 
>>> Best, Fabian
>>> 
>>> 2018-08-21 10:04 GMT+02:00 徐涛 >> >:
>>> Hi All,
>>> Will idle state retention trigger retract in dynamic table?
>>> 
>>> Best,
>>> Henry
>>> 
>> 
>> 
> 



Semantic when table joins table from window

2018-08-21 Thread 徐涛
Hi All,
var praiseAggr = tableEnv.sqlQuery(s"SELECT article_id FROM praise 
GROUP BY HOP(updated_time, INTERVAL '1' SECOND, INTERVAL '3' MINUTE), 
article_id")
tableEnv.registerTable("praiseAggr", praiseAggr)
var finalTable = tableEnv.sqlQuery(s"SELECT 1 FROM article a join 
praiseAggr p on a.article_id=p.article_id")
tableEnv.registerTable("finalTable", finalTable)
 I know that praiseAggr, if written to a sink, is in append mode, so if a 
table joins praiseAggr, what does that table "see": a table that contains only 
the latest values, or a table that grows larger and larger? If it is the latter, 
will it introduce a performance problem?
 Thanks a lot.


Best, 
Henry

Re: Could not cancel job (with savepoint) "Ask timed out"

2018-08-21 Thread Till Rohrmann
Just a small addition: a concurrent cancel call will interfere with the
cancel-with-savepoint command and directly cancel the job. So it is better
to use the cancel-with-savepoint call in order to take a savepoint and then
cancel the job automatically.
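
For reference, the combined call is also available programmatically; a rough
sketch (Flink 1.5/1.6, assuming an already constructed ClusterClient named
client for the target cluster; the returned string is the savepoint path):

    import org.apache.flink.api.common.JobID

    val jobId = JobID.fromHexString("b7c7d19d25e16a952d3afa32841024e5")
    // triggers a savepoint and, once it has completed, cancels the job
    val savepointPath: String = client.cancelWithSavepoint(jobId, null)
    println(s"Savepoint stored at $savepointPath")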

Cheers,
Till

On Thu, Aug 9, 2018 at 9:53 AM vino yang  wrote:

> Hi Juho,
>
> We use REST client API : triggerSavepoint(), this API returns a
> CompletableFuture, then we call it's get() API.
>
> You can understand that I am waiting for it to complete in sync.
> Because cancelWithSavepoint is actually waiting for savepoint to complete
> synchronization, and then execute the cancel command.
>
> We do not use CLI. I think since you are through the CLI, you can observe
> whether the savepoint is complete by combining the log or the web UI.
>
> Thanks, vino.
>
>
> Juho Autio  于2018年8月9日周四 下午3:07写道:
>
>> Thanks for the suggestion. Is the separate savepoint triggering async?
>> Would you then separately poll for the savepoint's completion before
>> executing cancel? If additional polling is needed, then I would say that
>> for my purpose it's still easier to call cancel with savepoint and simply
>> ignore the result of the call. I would assume that it won't do any harm if
>> I keep retrying cancel with savepoint until the job stops – I expect that
>> an overlapping cancel request is ignored if the job is already creating a
>> savepoint. Please correct if my assumption is wrong.
>>
>> On Thu, Aug 9, 2018 at 5:04 AM vino yang  wrote:
>>
>>> Hi Juho,
>>>
>>> This problem does exist, I suggest you separate these two steps to
>>> temporarily deal with this problem:
>>> 1) Trigger Savepoint separately;
>>> 2) execute the cancel command;
>>>
>>> Hi Till, Chesnay:
>>>
>>> Our internal environment and multiple users on the mailing list have
>>> encountered similar problems.
>>>
>>> In our environment, it seems that JM shows that the save point is
>>> complete and JM has stopped itself, but the client will still connect to
>>> the old JM and report a timeout exception.
>>>
>>> Thanks, vino.
>>>
>>>
>>> Juho Autio  于2018年8月8日周三 下午9:18写道:
>>>
 I was trying to cancel a job with savepoint, but the CLI command failed
 with "akka.pattern.AskTimeoutException: Ask timed out".

 The stack trace reveals that ask timeout is 10 seconds:

 Caused by: akka.pattern.AskTimeoutException: Ask timed out on
 [Actor[akka://flink/user/jobmanager_0#106635280]] after [1 ms].
 Sender[null] sent message of type
 "org.apache.flink.runtime.rpc.messages.LocalFencedMessage".

 Indeed it's documented that the default value for akka.ask.timeout="10
 s" in

 https://ci.apache.org/projects/flink/flink-docs-stable/ops/config.html#distributed-coordination-via-akka

 Behind the scenes the savepoint creation & job cancellation succeeded,
 that was to be expected, kind of. So my problem is just getting a proper
 response back from the CLI call instead of timing out so eagerly.

 To be exact, what I ran was:

 flink-1.5.2/bin/flink cancel b7c7d19d25e16a952d3afa32841024e5 -m
 yarn-cluster -yid application_1533676784032_0001 --withSavepoint

 Should I change the akka.ask.timeout to have a longer timeout? If yes,
 can I override it just for the CLI call somehow? Maybe it might have
 undesired side-effects if set globally for the actual flink jobs to use?

 What about akka.client.timeout? The default for it is also rather
 low: "60 s". Should it also be increased accordingly if I want to accept
 longer than 60 s for savepoint creation?

 Finally, that default timeout is so low that I would expect this to be
 a common problem. I would say that Flink CLI should have higher default
 timeout for cancel and savepoint creation ops.

 Thanks!

>>>
>>


Re: Will idle state retention trigger retract in dynamic table?

2018-08-21 Thread 徐涛
Hi Fabian,
Is the behavior a bit weird? Because it leads to data inconsistency.

Best,
Henry

> On Aug 21, 2018, at 5:14 PM, Fabian Hueske wrote:
> 
> Hi,
> 
> In the given example, article_id 123 will always remain in the external 
> storage. The state is removed and hence it cannot be retracted anymore.
> Once the state was removed and the count reaches 10, a second record for 
> article_id 123 will be emitted to the data store.
> 
> As soon as you enable state retention and state is needed that was removed, 
> the query result can become inconsistent.
> 
> Best, Fabian
> 
> 2018-08-21 10:52 GMT+02:00 徐涛  >:
> Hi Fabian,
>   SELECT article_id FROM praise GROUP BY article_id having count(1)>=10
>   If article_id 123 has 100 praises and remains its state in the dynamic 
> table ,and when the time passed, its state is removed, but later the 
> article_id 123 has never reached to 10 praises.
>   How can other program know that the state is been removed? Because the 
> sink currently has the praises count stored as 100, it is not consistent as 
> the dynamic table.
> 
> Best, 
> Henry
> 
> 
>> 在 2018年8月21日,下午4:16,Fabian Hueske > > 写道:
>> 
>> Hi,
>> 
>> No, it won't. I will simply remove state that has not been accessed for the 
>> configured time but not change the result.
>> For example, if you have a GROUP BY aggregation and the state for a grouping 
>> key is removed, the operator will start a new aggregation if a record with 
>> the removed grouping key arrives.
>> 
>> Idle state retention is not meant to affect the semantics of a query. 
>> The semantics of updating the result should be defined in the query, e.g., 
>> with a WHERE clause that removes all records that are older than 1 day 
>> (note, this is not supported yet).
>> 
>> Best, Fabian
>> 
>> 2018-08-21 10:04 GMT+02:00 徐涛 > >:
>> Hi All,
>> Will idle state retention trigger retract in dynamic table?
>> 
>> Best,
>> Henry
>> 
> 
> 



Re: 答复: Best way to find the current alive jobmanager with HA mode zookeeper

2018-08-21 Thread Till Rohrmann
Hi Martin,

When Flink is configured to use the ZooKeeper HA mode, it is not
necessary to specify the leader's address manually. The CLI will ask
ZooKeeper for the leader information and send the request to the current
leader. This should work with Flink >= 1.5 and also with Flink 1.4.

Cheers,
Till

On Tue, Aug 21, 2018 at 10:20 AM Martin Eden 
wrote:

> Hi guys,
>
> Just to close the loop, with the Flink 1.3.2 cli you have to provide the
> Flink Job Manager host address in order to submit a job like so:
> ${FLINK_HOME}/bin/flink run -d -m ${FLINK_JOBMANAGER_ADDRESS} ${JOB_JAR}
>
> Since we are running the DCOS Flink package we use the Marathon rest api
> to fetch the FLINK_JOBMANAGER_ADDRESS which solved our problem.
>
> We are now thinking of upgrading to the latest 1.6 release. From looking
> at the cli docs and from the previous messages it seems you still need to
> provide the Job Manager address explicitly. Are there any plans to support
> job submission that just takes a zookeeper ensemble and zookeeperNamespace
> (which is currently accepted) without having to provide explicit Job
> Manager address? This would be more user friendly and would eliminate the
> extra step of figuring out the Job Manager address.
>
> Thanks,
> M
>
>
>
> On Tue, Jul 31, 2018 at 3:54 PM, Till Rohrmann 
> wrote:
>
>> I think that the web ui automatically redirects to the current leader. So
>> if you should access the JobManager which is not leader, then you should
>> get an HTTP redirect to the current leader. Due to that it should not be
>> strictly necessary to know which of the JobManagers is the leader.
>>
>> The RestClusterClient uses the ZooKeeperLeaderRetrievalService to
>> retrieve the leader address. You could try the same. Using the
>> RestClusterClient with Flink 1.4 won't work, though. Alternatively, you
>> should be able to directly read the address from the leader ZNode in
>> ZooKeeper.
>>
>> Cheers,
>> Till
>>
>>
>>
>> On Thu, Jul 26, 2018 at 4:14 AM vino yang  wrote:
>>
>>> Hi Youjun,
>>>
>>> Thanks, you can try this but I am not sure if it works correctly.
>>> Because for the REST Client, there are quite a few changes from 1.4 to 1.5.
>>>
>>> Maybe you can customize the source code in 1.4 refer to specific
>>> implementation of 1.5? Another option, upgrade your Flink version.
>>>
>>> To Chesnay and Till:  any suggestion or opinion?
>>>
>>> Thanks, vino.
>>>
>>> 2018-07-26 10:01 GMT+08:00 Yuan,Youjun :
>>>
 Thanks for the information. Forgot to mention, I am using Flink 1.4,
 the RestClusterClient seems don’t have the ability to retrieve the leader
 address. I did notice there is webMonitorRetrievalService member in Flink
 1.5.



 I wonder if I can use RestClusterClient@v1.5 on my client side, to
 retrieve the leader JM of Flink v1.4 Cluster.



 Thanks

 Youjun



 *发件人**:* vino yang 
 *发送时间:* Wednesday, July 25, 2018 7:11 PM
 *收件人:* Martin Eden 
 *抄送:* Yuan,Youjun ; user@flink.apache.org
 *主题:* Re: Best way to find the current alive jobmanager with HA mode
 zookeeper



 Hi Martin,





 For a standalone cluster which exists multiple JM instances, If you do
 not use Rest API, but use Flink provided Cluster client. The client can
 perceive which one this the JM leader from multiple JM instances.



 For example, you can use CLI to submit flink job in a non-Leader node.



 But I did not verify this case for Flink on Mesos.



 Thanks, vino.



 2018-07-25 17:22 GMT+08:00 Martin Eden :

 Hi,



 This is actually very relevant to us as well.



 We want to deploy Flink 1.3.2 on a 3 node DCOS cluster. In the case of
 Mesos/DCOS, Flink HA runs only one JobManager which gets restarted on
 another node by Marathon in case of failure and re-load it's state from
 Zookeeper.



 Yuan I am guessing you are using Flink in standalone mode and there it
 is actually running 3 instances of the Job Manager, 1 active and 2
 stand-bys.



 Either way, in both cases there is the need to "discover" the hostname
 and port of the Job Manager at runtime. This is needed when you want to use
 the cli to submit jobs for instance. Is there an elegant mode to submit
 jobs other than say just trying out all the possible nodes in your cluster?



 Grateful if anyone could clarify any of the above, thanks,

 M



 On Wed, Jul 25, 2018 at 11:37 AM, Yuan,Youjun 
 wrote:

 Hi all,



 I have a standalone cluster with 3 jobmanagers, and set *high-availability
 to zookeeper*. Our client submits job by REST API(POST
 /jars/:jarid/run), which means we need to know the host of the any of the
 current alive jobmanagers. The problem is that, how 

Re: Job Manager killed by Kubernetes during recovery

2018-08-21 Thread Till Rohrmann
Hi Bruno,

in order to debug this problem we would need a bit more information. In
particular, the logs of the cluster entrypoint and your K8s deployment
specification would be helpful. If you have some memory limits specified
these would also be interesting to know.

Cheers,
Till

On Sun, Aug 19, 2018 at 2:43 PM vino yang  wrote:

> Hi Bruno,
>
> Ping Till for you, he may give you some useful information.
>
> Thanks, vino.
>
> Bruno Aranda  于2018年8月19日周日 上午6:57写道:
>
>> Hi,
>>
>> I am experiencing an issue when a job manager is trying to recover using
>> a HA setup. When the job manager starts again and tries to resume from the
>> last checkpoints, it gets killed by Kubernetes (I guess), since I can see
>> the following in the logs while the jobs are deployed:
>>
>> INFO  org.apache.flink.runtime.entrypoint.ClusterEntrypoint -
>> RECEIVED SIGNAL 15: SIGTERM. Shutting down as requested.
>>
>> I am requesting enough memory for it, 3000Gi, and it is configured to use
>> 2048Gb of memory. I have tried to increase the max perm size, but did not
>> see an improvement.
>>
>> Any suggestions to help diagnose this?
>>
>> I have the following:
>>
>> Flink 1.6.0 (same with 1.5.1)
>> Azure AKS with Kubernetes 1.11
>> State management using RocksDB with checkpoints stored in Azure Data Lake
>>
>> Thanks!
>>
>> Bruno
>>
>>


Re: Will idle state retention trigger retract in dynamic table?

2018-08-21 Thread Fabian Hueske
Hi,

In the given example, article_id 123 will always remain in the external
storage. The state is removed and hence it cannot be retracted anymore.
Once the state has been removed and the count reaches 10, a second record for
article_id 123 will be emitted to the data store.

As soon as you enable state retention and state that was removed is needed
again, the query result can become inconsistent.

Best, Fabian

2018-08-21 10:52 GMT+02:00 徐涛 :

> Hi Fabian,
> SELECT article_id FROM praise GROUP BY article_id having count(1)>=10
> If article_id 123 has 100 praises and remains its state in the dynamic
> table ,and when the time passed, its state is removed, but later the
> article_id 123 has never reached to 10 praises.
> How can other program know that the state is been removed? Because the
> sink currently has the praises count stored as 100, it is not consistent as
> the dynamic table.
>
> Best,
> Henry
>
>
> On Aug 21, 2018, at 4:16 PM, Fabian Hueske wrote:
>
> Hi,
>
> No, it won't. I will simply remove state that has not been accessed for
> the configured time but not change the result.
> For example, if you have a GROUP BY aggregation and the state for a
> grouping key is removed, the operator will start a new aggregation if a
> record with the removed grouping key arrives.
>
> Idle state retention is not meant to affect the semantics of a query.
> The semantics of updating the result should be defined in the query, e.g.,
> with a WHERE clause that removes all records that are older than 1 day
> (note, this is not supported yet).
>
> Best, Fabian
>
> 2018-08-21 10:04 GMT+02:00 徐涛 :
>
>> Hi All,
>> Will idle state retention trigger retract in dynamic table?
>>
>> Best,
>> Henry
>>
>
>
>


Re: classloading strangeness with Avro in Flink

2018-08-21 Thread Aljoscha Krettek
Hi Cliff,

Do you actually need the flink-shaded-hadoop2-uber.jar in lib? If you're
running on YARN, you should be able to just remove it, because with YARN you
will have Hadoop on the classpath anyway.

Aljoscha

> On 21. Aug 2018, at 03:45, vino yang  wrote:
> 
> Hi Cliff,
> 
> If so, you can explicitly exclude Avro's dependencies from related 
> dependencies (using <exclusions>) and then directly introduce dependencies on 
> the Avro version you need.
> 
> Thanks, vino.
> 
> On Tue, Aug 21, 2018 at 5:13 AM, Cliff Resnick wrote:
> Hi Vino,
> 
> Unfortunately, I'm still stuck here. By moving the avro dependency chain to 
> lib (and removing it from user jar), my OCFs decode but I get the error 
> described here:
> 
> https://github.com/confluentinc/schema-registry/pull/509 
> 
> 
> However, the Flink fix described in the PR above was to move the Avro 
> dependency to the user jar. However, since I'm using YARN, I'm required to 
> have flink-shaded-hadoop2-uber.jar loaded from lib -- and that has avro 
> bundled un-shaded. So I'm back to the start problem...
> 
> Any advice is welcome!
> 
> -Cliff
> 
> 
> On Mon, Aug 20, 2018 at 1:42 PM Cliff Resnick  > wrote:
> Hi Vino,
> 
> You were right in your assumption -- unshaded avro was being added to our 
> application jar via third-party dependency. Excluding it in packaging fixed 
> the issue. For the record, it looks flink-avro must be loaded from the lib or 
> there will be errors in checkpoint restores.
> 
> On Mon, Aug 20, 2018 at 8:43 AM Cliff Resnick  > wrote:
> Hi Vino,
> 
> Thanks for the explanation, but the job only ever uses the Avro (1.8.2) 
> pulled in by flink-formats/avro, so it's not a class version conflict there. 
> 
> I'm using default child-first loading. It might be a further transitive 
> dependency, though it's not clear by stack trace or stepping through the 
> process. When I get a chance I'll look further into it but in case anyone is 
> experiencing similar problems, what is clear is that classloader order does 
> matter with Avro.
> 
> On Sun, Aug 19, 2018, 11:36 PM vino yang  > wrote:
> Hi Cliff,
> 
> My personal guess is that this may be caused by Job's Avro conflict with the 
> Avro that the Flink framework itself relies on. 
> Flink has provided some configuration parameters which allows you to 
> determine the order of the classloaders yourself. [1]
> Alternatively, you can debug classloading and participate in the 
> documentation.[2]
> 
> [1]: 
> https://ci.apache.org/projects/flink/flink-docs-release-1.6/ops/config.html 
> 
> [2]: 
> https://ci.apache.org/projects/flink/flink-docs-stable/monitoring/debugging_classloading.html
>  
> 
> 
> Thanks, vino.
> 
> On Mon, Aug 20, 2018 at 10:40 AM, Cliff Resnick wrote:
> Our Flink/YARN pipeline has been reading Avro from Kafka for a while now. We 
> just introduced a source of Avro OCF (Object Container Files) read from S3. 
> The Kafka Avro continued to decode without incident, but the OCF files failed 
> 100% with anomalous parse errors in the decoding phase after the schema and 
> codec were successfully read from them. The pipeline would work on my laptop, 
> and when I submitted a test Main program to the Flink Session in YARN, that 
> would also successfully decode. Only the actual pipeline run from the 
> TaskManager failed. At one point I even remote debugged the TaskManager 
> process and stepped through what looked like a normal Avro decode (if you can 
> describe Avro code as normal!) -- until it abruptly failed with an int decode 
> or what-have-you.
> 
> This stumped me for a while, but I finally tried moving flink-avro.jar from 
> the lib to the application jar, and that fixed it. I'm not sure why this is, 
> especially since there were no typical classloader-type errors.  This issue 
> was observed both on Flink 1.5 and 1.6 in Flip-6 mode.
> 
> -Cliff
> 
> 
> 
> 
> 



Re: UTF-16 support for TextInputFormat

2018-08-21 Thread Fabian Hueske
Thanks for creating FLINK-10134 and adding your suggestions!

Best, Fabian

2018-08-13 23:55 GMT+02:00 David Dreyfus :

> Hi Fabian,
>
> I've added FLINK-10134. I'm not sure you'd
> consider it a blocker or that I've identified the right component.
> I'm afraid I don't have the bandwidth or knowledge to make the kind of
> pull request you really need. I do hope my suggestions prove a little
> useful.
>
> Thank you,
> David
>
> On Fri, Aug 10, 2018 at 5:41 AM Fabian Hueske  wrote:
>
>> Hi David,
>>
>> Thanks for digging into the code! I had a quick look into the classes as
>> well.
>> As far as I can see, your analysis is correct and the BOM handling in
>> DelimitedInputFormat and TextInputFormat (and other text-based IFs such as
>> CsvInputFormat) is broken.
>> In fact, its obvious that nobody paid attention to this yet.
>>
>> It would be great if you could open a Jira issue and copy your analysis
>> and solution proposal into it.
>> While on it, we could also deprecated the (duplicated) setCharsetName()
>> method from TextInputFormat and redirect it to DelimitedInputFormat.
>> setCharset().
>>
>> Would you also be interested in contributing a fix for this problem?
>>
>> Best, Fabian
>>
>> [1] https://github.com/apache/flink/blob/master/flink-java/
>> src/main/java/org/apache/flink/api/java/io/TextInputFormat.java#L95
>>
>> 2018-08-09 14:55 GMT+02:00 David Dreyfus :
>>
>>> Hi Fabian,
>>>
>>> Thank you for taking my email.
>>> TextInputFormat.setCharsetName("UTF-16") appears to set the private
>>> variable TextInputFormat.charsetName.
>>> It doesn't appear to cause additional behavior that would help interpret
>>> UTF-16 data.
>>>
>>> The method I've tested is calling DelimitedInputFormat.setCharset("UTF-16"),
>>> which then sets TextInputFormat.charsetName and then modifies the
>>> previously set delimiterString to construct the proper byte string encoding
>>> of the the delimiter. This same charsetName is also used in
>>> TextInputFormat.readRecord() to interpret the bytes read from the file.
>>>
>>> There are two problems that this implementation would seem to have when
>>> using UTF-16.
>>>
>>>1. delimiterString.getBytes(getCharset()) in
>>>DelimitedInputFormat.java will return a Big Endian byte sequence 
>>> including
>>>the Byte Order Mark (BOM). The actual text file will not contain a BOM at
>>>each line ending, so the delimiter will never be read. Moreover, if the
>>>actual byte encoding of the file is Little Endian, the bytes will be
>>>interpreted incorrectly.
>>>2. TextInputFormat.readRecord() will not see a BOM each time it
>>>decodes a byte sequence with the String(bytes, offset, numBytes, charset)
>>>call. Therefore, it will assume Big Endian, which may not always be 
>>> correct.
>>>
>>> While there are likely many solutions, I would think that all of them
>>> would have to start by reading the BOM from the file when a Split is opened
>>> and then using that BOM to modify the specified encoding to a BOM specific
>>> one when the caller doesn't specify one, and to overwrite the caller's
>>> specification if the BOM is in conflict with the caller's specification.
>>> That is, if the BOM indicates Little Endian and the caller indicates
>>> UTF-16BE, Flink should rewrite the charsetName as UTF-16LE.
>>>
>>> I hope this makes sense and that I haven't been testing incorrectly or
>>> misreading the code.
>>>
>>> Thank you,
>>> David
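
A quick way to see problem (1) above from a Scala REPL (this only illustrates
the JDK charset behaviour, it is not Flink-specific):

    import java.nio.charset.Charset

    def hex(bytes: Array[Byte]): String =
      bytes.map(b => f"${b & 0xff}%02X").mkString(" ")

    // "UTF-16" encodes big-endian *with* a byte order mark,
    // so the delimiter bytes start with FE FF and never match the file contents
    println(hex("\n".getBytes(Charset.forName("UTF-16"))))    // FE FF 00 0A
    // the endianness-specific variants encode without a BOM
    println(hex("\n".getBytes(Charset.forName("UTF-16BE"))))  // 00 0A
    println(hex("\n".getBytes(Charset.forName("UTF-16LE"))))  // 0A 00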
>>>
>>> On Thu, Aug 9, 2018 at 4:04 AM Fabian Hueske  wrote:
>>>
 Hi David,

 Did you try to set the encoding on the TextInputFormat with

 TextInputFormat tif = ...
 tif.setCharsetName("UTF-16");

 Best, Fabian

 2018-08-08 17:45 GMT+02:00 David Dreyfus :

> Hello -
>
> It does not appear that Flink supports a charset encoding of "UTF-16".
> It particular, it doesn't appear that Flink consumes the Byte Order Mark
> (BOM) to establish whether a UTF-16 file is UTF-16LE or UTF-16BE. Are 
> there
> any plans to enhance Flink to handle UTF-16 with BOM?
>
> Thank you,
> David
>


>>


Re: Will idle state retention trigger retract in dynamic table?

2018-08-21 Thread 徐涛
Hi Fabian,
SELECT article_id FROM praise GROUP BY article_id HAVING count(1) >= 10
Suppose article_id 123 has 100 praises and keeps its state in the dynamic 
table; as time passes, its state is removed, and afterwards article_id 123 
never reaches 10 praises again.
How can another program know that the state has been removed? The sink 
currently has the praise count stored as 100, so it is not consistent with the 
dynamic table.

Best, 
Henry


> On Aug 21, 2018, at 4:16 PM, Fabian Hueske wrote:
> 
> Hi,
> 
> No, it won't. I will simply remove state that has not been accessed for the 
> configured time but not change the result.
> For example, if you have a GROUP BY aggregation and the state for a grouping 
> key is removed, the operator will start a new aggregation if a record with 
> the removed grouping key arrives.
> 
> Idle state retention is not meant to affect the semantics of a query. 
> The semantics of updating the result should be defined in the query, e.g., 
> with a WHERE clause that removes all records that are older than 1 day (note, 
> this is not supported yet).
> 
> Best, Fabian
> 
> 2018-08-21 10:04 GMT+02:00 徐涛  >:
> Hi All,
> Will idle state retention trigger retract in dynamic table?
> 
> Best,
> Henry
> 



Re: 答复: Best way to find the current alive jobmanager with HA mode zookeeper

2018-08-21 Thread Martin Eden
Hi guys,

Just to close the loop, with the Flink 1.3.2 cli you have to provide the
Flink Job Manager host address in order to submit a job like so:
${FLINK_HOME}/bin/flink run -d -m ${FLINK_JOBMANAGER_ADDRESS} ${JOB_JAR}

Since we are running the DC/OS Flink package, we use the Marathon REST API to
fetch the FLINK_JOBMANAGER_ADDRESS, which solved our problem.

We are now thinking of upgrading to the latest 1.6 release. From looking at
the cli docs and from the previous messages it seems you still need to
provide the Job Manager address explicitly. Are there any plans to support
job submission that just takes a zookeeper ensemble and zookeeperNamespace
(which is currently accepted) without having to provide explicit Job
Manager address? This would be more user friendly and would eliminate the
extra step of figuring out the Job Manager address.

Thanks,
M



On Tue, Jul 31, 2018 at 3:54 PM, Till Rohrmann  wrote:

> I think that the web ui automatically redirects to the current leader. So
> if you should access the JobManager which is not leader, then you should
> get an HTTP redirect to the current leader. Due to that it should not be
> strictly necessary to know which of the JobManagers is the leader.
>
> The RestClusterClient uses the ZooKeeperLeaderRetrievalService to
> retrieve the leader address. You could try the same. Using the
> RestClusterClient with Flink 1.4 won't work, though. Alternatively, you
> should be able to directly read the address from the leader ZNode in
> ZooKeeper.
>
> Cheers,
> Till
>
>
>
> On Thu, Jul 26, 2018 at 4:14 AM vino yang  wrote:
>
>> Hi Youjun,
>>
>> Thanks. You can try this, but I am not sure whether it works correctly, because
>> there are quite a few changes to the REST client between 1.4 and 1.5.
>>
>> Maybe you can customize the 1.4 source code, referring to the specific
>> implementation in 1.5? Another option is to upgrade your Flink version.
>>
>> To Chesnay and Till:  any suggestion or opinion?
>>
>> Thanks, vino.
>>
>> 2018-07-26 10:01 GMT+08:00 Yuan,Youjun :
>>
>>> Thanks for the information. I forgot to mention that I am using Flink 1.4; the
>>> RestClusterClient doesn't seem to have the ability to retrieve the leader
>>> address. I did notice there is a webMonitorRetrievalService member in Flink
>>> 1.5.
>>>
>>>
>>>
>>> I wonder if I can use RestClusterClient@v1.5 on my client side, to
>>> retrieve the leader JM of Flink v1.4 Cluster.
>>>
>>>
>>>
>>> Thanks
>>>
>>> Youjun
>>>
>>>
>>>
>>> *From:* vino yang 
>>> *Sent:* Wednesday, July 25, 2018 7:11 PM
>>> *To:* Martin Eden 
>>> *Cc:* Yuan,Youjun ; user@flink.apache.org
>>> *Subject:* Re: Best way to find the current alive jobmanager with HA mode
>>> zookeeper
>>>
>>>
>>>
>>> Hi Martin,
>>>
>>>
>>>
>>>
>>>
>>> For a standalone cluster with multiple JM instances, if you do
>>> not use the REST API but the Flink-provided cluster client, the client can
>>> figure out which of the JM instances is the leader.
>>>
>>>
>>>
>>> For example, you can use the CLI to submit a Flink job from a non-leader node.
>>>
>>>
>>>
>>> But I did not verify this case for Flink on Mesos.
>>>
>>>
>>>
>>> Thanks, vino.
>>>
>>>
>>>
>>> 2018-07-25 17:22 GMT+08:00 Martin Eden :
>>>
>>> Hi,
>>>
>>>
>>>
>>> This is actually very relevant to us as well.
>>>
>>>
>>>
>>> We want to deploy Flink 1.3.2 on a 3 node DCOS cluster. In the case of
>>> Mesos/DCOS, Flink HA runs only one JobManager which gets restarted on
>>> another node by Marathon in case of failure and re-loads its state from
>>> Zookeeper.
>>>
>>>
>>>
>>> Yuan, I am guessing you are using Flink in standalone mode, where it
>>> actually runs 3 instances of the Job Manager: 1 active and 2
>>> stand-bys.
>>>
>>>
>>>
>>> Either way, in both cases there is the need to "discover" the hostname
>>> and port of the Job Manager at runtime. This is needed, for instance, when you want to
>>> use the cli to submit jobs. Is there a more elegant way to submit
>>> jobs than, say, just trying out all the possible nodes in your cluster?
>>>
>>>
>>>
>>> Grateful if anyone could clarify any of the above, thanks,
>>>
>>> M
>>>
>>>
>>>
>>> On Wed, Jul 25, 2018 at 11:37 AM, Yuan,Youjun 
>>> wrote:
>>>
>>> Hi all,
>>>
>>>
>>>
>>> I have a standalone cluster with 3 jobmanagers, and set *high-availability
>>> to zookeeper*. Our client submits jobs via the REST API (POST
>>> /jars/:jarid/run), which means we need to know the host of any of the
>>> currently alive jobmanagers. The problem is: how can we know which job
>>> manager is alive, or the host of the current leader?  We don't want to access a
>>> dead JM.
>>>
>>>
>>>
>>> Thanks.
>>>
>>> Youjun Yuan
>>>
>>>
>>>
>>>
>>>
>>
>>


Re: Will idle state retention trigger retract in dynamic table?

2018-08-21 Thread Fabian Hueske
Hi,

No, it won't. It will simply remove state that has not been accessed for the
configured time but will not change the result.
For example, if you have a GROUP BY aggregation and the state for a
grouping key is removed, the operator will start a new aggregation if a
record with the removed grouping key arrives.

Idle state retention is not meant to affect the semantics of a query.
The semantics of updating the result should be defined in the query, e.g.,
with a WHERE clause that removes all records that are older than 1 day
(note, this is not supported yet).
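For completeness, idle state retention is configured per query. A minimal Java sketch, assuming an existing StreamTableEnvironment named tableEnv; the 12h/24h values are only examples:

import org.apache.flink.api.common.time.Time;
import org.apache.flink.table.api.StreamQueryConfig;
import org.apache.flink.table.api.Table;

// Keep state for at least 12 hours and at most 24 hours of inactivity.
StreamQueryConfig qConfig = tableEnv.queryConfig();
qConfig.withIdleStateRetentionTime(Time.hours(12), Time.hours(24));

Table result = tableEnv.sqlQuery(
    "SELECT article_id FROM praise GROUP BY article_id HAVING count(1) >= 10");

// The config is applied when the table is emitted, e.g.:
// tableEnv.toRetractStream(result, Row.class, qConfig);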

Best, Fabian

2018-08-21 10:04 GMT+02:00 徐涛 :

> Hi All,
> Will idle state retention trigger retract in dynamic table?
>
> Best,
> Henry
>


Will idle state retention trigger retract in dynamic table?

2018-08-21 Thread 徐涛
Hi All,
Will idle state retention trigger retract in dynamic table?

Best,
Henry


Re: Flink checkpointing to Google Cloud Storage

2018-08-21 Thread Dominik Wosiński
Hey,
From my perspective, such issues have always meant clashing dependencies in the case
of Flink. Have you checked the full dependency tree for conflicts
there?
Best Regards,
Dominik.


Re: Data loss when restoring from savepoint

2018-08-21 Thread Juho Autio
I realized that BucketingSink cannot play any role in this problem. This
is because BucketingSink only gets a burst of input when the 24-hour window
triggers. Around the state restoring point (middle of the day) it doesn't
get any input, so it can't lose anything either (right?).

I will next try removing the allowedLateness entirely from the equation.

In the meanwhile, please let me know if you have any suggestions for
debugging the lost data, for example what logs to enable.

We use FlinkKafkaConsumer010 btw. Are there any known issues with that,
that could contribute to lost data when restoring a savepoint?
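One way to narrow this down (only a sketch, assuming the element type is Map<String, String> as in the simplified job quoted below): hang a counter metric off the late-data side output so the web UI shows whether any records get classified as late right after the restore. The class and metric names here are made up for illustration:

import java.util.Map;

import org.apache.flink.api.common.functions.RichMapFunction;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.metrics.Counter;

// Counts every record that the window operator routes to the late-data side output.
public class LateDataCounter extends RichMapFunction<Map<String, String>, Map<String, String>> {

    private transient Counter lateCounter;

    @Override
    public void open(Configuration parameters) {
        lateCounter = getRuntimeContext().getMetricGroup().counter("lateRecordsDropped");
    }

    @Override
    public Map<String, String> map(Map<String, String> value) {
        lateCounter.inc();
        return value;
    }
}

// Usage sketch, given the lateDataTag and windowed result from the job below:
// windowedResult.getSideOutput(lateDataTag).map(new LateDataCounter()).addSink(new DiscardingSink<>());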

On Fri, Aug 17, 2018 at 4:23 PM Juho Autio  wrote:

> Some data is silently lost on my Flink stream job when state is restored
> from a savepoint.
>
> Do you have any debugging hints to find out where exactly the data gets
> dropped?
>
> My job gathers distinct values using a 24-hour window. It doesn't have any
> custom state management.
>
> When I cancel the job with savepoint and restore from that savepoint, some
> data is missed. It seems to be losing just a small amount of data. The
> event time of lost data is probably around the time of savepoint. In other
> words the rest of the time window is not entirely missed – collection works
> correctly also for (most of the) events that come in after restoring.
>
> When the job processes a full 24-hour window without interruptions it
> doesn't miss anything.
>
> Usually the problem doesn't happen in test environments that have smaller
> parallelism and smaller data volumes. But in production volumes the job
> seems to be consistently missing at least something on every restore.
>
> This issue has consistently happened since the job was initially created.
> It was at first run on an older version of Flink 1.5-SNAPSHOT and it still
> happens on both Flink 1.5.2 & 1.6.0.
>
> I'm wondering if this could be for example some synchronization issue
> between the kafka consumer offsets and what has been written by BucketingSink?
>
> 1. Job content, simplified
>
> kafkaStream
> .flatMap(new ExtractFieldsFunction())
> .keyBy(new MapKeySelector(1, 2, 3, 4))
> .timeWindow(Time.days(1))
> .allowedLateness(allowedLateness)
> .sideOutputLateData(lateDataTag)
> .reduce(new DistinctFunction())
> .addSink(sink)
> // use a fixed number of output partitions
> .setParallelism(8);
>
> /**
>  * Usage: .keyBy("the", "distinct", "fields").reduce(new
> DistinctFunction())
>  */
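> // Element type assumed to be Map<String, String> here and in the sink below.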
> public class DistinctFunction implements
> ReduceFunction<Map<String, String>> {
> @Override
> public Map<String, String> reduce(Map<String, String> value1,
> Map<String, String> value2) {
> return value1;
> }
> }
>
> 2. State configuration
>
> boolean enableIncrementalCheckpointing = true;
> String statePath = "s3n://bucket/savepoints";
> new RocksDBStateBackend(statePath, enableIncrementalCheckpointing);
>
> Checkpointing Mode Exactly Once
> Interval 1m 0s
> Timeout 10m 0s
> Minimum Pause Between Checkpoints 1m 0s
> Maximum Concurrent Checkpoints 1
> Persist Checkpoints Externally Enabled (retain on cancellation)
>
> 3. BucketingSink configuration
>
> We use BucketingSink, I don't think there's anything special here, other than
> the fact that we're writing to S3.
>
> String outputPath = "s3://bucket/output";
> BucketingSink<Map<String, String>> sink = new
> BucketingSink<Map<String, String>>(outputPath)
> .setBucketer(new ProcessdateBucketer())
> .setBatchSize(batchSize)
> .setInactiveBucketThreshold(inactiveBucketThreshold)
>
> .setInactiveBucketCheckInterval(inactiveBucketCheckInterval);
> sink.setWriter(new IdJsonWriter());
>
> 4. Kafka & event time
>
> My flink job reads the data from Kafka, using a
> BoundedOutOfOrdernessTimestampExtractor on the kafka consumer to
> synchronize watermarks across all kafka partitions. We also write late
> data to side output, but nothing is written there – if it would, it could
> explain missed data in the main output (I'm also sure that our late data
> writing works, because we previously had some actual late data which ended
> up there).
>
> 5. allowedLateness
>
> It may be or may not be relevant that I have also enabled allowedLateness
> with 1 minute lateness on the 24-hour window:
>
> If that makes sense, I could try removing allowedLateness entirely? That
> would be just to rule out that Flink doesn't have a bug that's related to
> restoring state in combination with the allowedLateness feature. After all,
> all of our data should be in a good enough order to not be late, given the
> max out of orderness used on kafka consumer timestamp extractor.
>
> Thank you in advance!
>


Re: How do I investigate checkpoints failures

2018-08-21 Thread Dawid Wysakowicz
Hi Alex,

First thing to do in such cases is to analyze logs for jobmanager and
taskmanagers and look for exceptions there.

The cause for the latest failed checkpoint says the checkpoint expired. You
can try increasing the checkpoint timeout (you can check more
configuration options for checkpoints here [1]).
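A minimal sketch of what that looks like in code; the interval and timeout values are only examples:

import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

// Take a checkpoint every minute, but allow each one up to 15 minutes
// before it is declared expired.
env.enableCheckpointing(60 * 1000);
env.getCheckpointConfig().setCheckpointTimeout(15 * 60 * 1000);
env.getCheckpointConfig().setMinPauseBetweenCheckpoints(60 * 1000);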

Best,

Dawid


[1]
https://ci.apache.org/projects/flink/flink-docs-release-1.6/dev/stream/state/checkpointing.html#enabling-and-configuring-checkpointing


On 21/08/18 09:10, Alexander Smirnov wrote:
> Hello,
>
> I have a cluster with multiple jobs running on it. One of the jobs has
> checkpoints constantly failing
> image.png
>
> How do I investigate it? 
>
> Thank you,
> Alex





How do I investigate checkpoints failures

2018-08-21 Thread Alexander Smirnov
Hello,

I have a cluster with multiple jobs running on it. One of the jobs has
checkpoints constantly failing
[image: image.png]

How do I investigate it?

Thank you,
Alex


How to pass a dynamic path while writing to files using writeFileAsText(path)?

2018-08-21 Thread HarshithBolar
Let's say I have a Stream with elements of type `String`. I want to write
each element in the stream to a separate file in some folder. I'm using the
following set up.

> filteredStream.writeAsText(path).setParallelism(1);

How do I make this path variable? I even tried adding `System.nanoTime()` to
the path to make it variable, but it still doesn't seem to work; everything
gets written to a single file.
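One possible direction, sketched under the assumption that the BucketingSink from flink-connector-filesystem is acceptable instead of writeAsText: a custom Bucketer can choose a different target directory per element, so elements end up under different paths. The bucket naming below is purely illustrative:

import org.apache.flink.core.fs.Path;
import org.apache.flink.streaming.connectors.fs.Clock;
import org.apache.flink.streaming.connectors.fs.bucketing.Bucketer;
import org.apache.flink.streaming.connectors.fs.bucketing.BucketingSink;

// Routes each element into its own sub-directory of the base path.
public class PerElementBucketer implements Bucketer<String> {
    @Override
    public Path getBucketPath(Clock clock, Path basePath, String element) {
        return new Path(basePath, "element-" + Math.abs(element.hashCode()));
    }
}

// Usage sketch:
// BucketingSink<String> sink = new BucketingSink<>("/base/output/path");
// sink.setBucketer(new PerElementBucketer());
// filteredStream.addSink(sink).setParallelism(1);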






Re: How does flink know which data is modified in dynamic table?

2018-08-21 Thread Hequn Cheng
Hi,

You are right. We can make use of it to do a soft delete.
But there will be problems in other cases, for example, retracting messages by
the whole row. I opened a jira [1] about this problem. Thanks for bringing up
this discussion.

[1] https://issues.apache.org/jira/browse/FLINK-10188

Best, Hequn
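To make the soft-delete idea from the quoted mails concrete, a rough Java sketch of a sink consuming a retract stream. The field handling and the external-store calls are placeholders rather than a worked-out implementation:

import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.functions.sink.RichSinkFunction;
import org.apache.flink.types.Row;

// Turns retract messages into soft deletes in the external store.
public class SoftDeleteSink extends RichSinkFunction<Tuple2<Boolean, Row>> {

    @Override
    public void invoke(Tuple2<Boolean, Row> value, Context context) {
        boolean isAccumulate = value.f0;   // false => the row is being retracted
        Row row = value.f1;
        if (isAccumulate) {
            // upsert the row, stamping the update time at write time
        } else {
            // soft delete: mark the row as deleted and refresh its update timestamp,
            // instead of relying on the timestamp carried inside the retract row
        }
    }
}

// Usage sketch, assuming tableEnv and the praiseAggr table from the quoted mails:
// tableEnv.toRetractStream(praiseAggr, Row.class).addSink(new SoftDeleteSink());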

On Tue, Aug 21, 2018 at 12:34 PM, 徐涛  wrote:

> Hi Hequn,
> Another question: in some cases, I think updating the timestamp of the
> retract row is reasonable. For example, some users do not want a hard
> delete but a soft delete, so I write code that only does a soft delete when the
> retract row comes, but I want the update_timestamp to be different so the
> ETL program can know that this row has changed.
>
>
> For example, if the value is updated from 1 to 2,
>
> previous row:  add (a, 1, 2018-08-20 20:18:10.286)
> retract row: delete (a, 1, 2018-08-20 20:18:10.386)
> new row: add (a, 2, 2018-08-20 20:18:10.486)
>
>
> On Aug 21, 2018, at 12:25 PM, Hequn Cheng  wrote:
>
> Hi Henry,
>
> You are right that, in MySQL, SYSDATE returns the time at which it
> executes while LOCALTIMESTAMP returns a constant time that indicates the
> time at which the statement began to execute.
> But other database systems don't seem to have this constraint (correct me if
> I'm wrong). Sometimes we don't have to follow MySQL.
>
> Best, Hequn
>
> On Tue, Aug 21, 2018 at 10:21 AM, 徐涛  wrote:
>
>> Hi Hequn,
>> Maybe I did not express it clearly. I mean that if only the update_timestamp of
>> the incremental data is updated, it is not enough, because the sql
>> expresses the idea that "all the timestamps in the table are the same", while actually each
>> item in the table may be different. It is a bit weird.
>>
>> Best, Henry
>>
>>
>>
>> On Aug 21, 2018, at 10:09 AM, Hequn Cheng  wrote:
>>
>> Hi Henry,
>>
>> If you upsert by key 'article_id', the result is correct, i.e., the result
>> is (a, 2, 2018-08-20 20:18:10.486). What do you think?
>>
>> Best, Hequn
>>
>>
>>
>> On Tue, Aug 21, 2018 at 9:44 AM, 徐涛  wrote:
>>
>>> Hi Hequn,
>>> However, is it semantically correct? Because the sql result is not equal
>>> to the bounded table.
>>>
>>>
>>> On Aug 20, 2018, at 8:34 PM, Hequn Cheng  wrote:
>>>
>>> Hi Henry,
>>>
>>> Both sql output incrementally.
>>>
>>> However, there are some problems if you use a retract sink. You have to pay
>>> attention to the timestamp field since its value is different each time.
>>> For example, if the value is updated from 1 to 2,
>>>
>>> previous row:  add (a, 1, 2018-08-20 20:18:10.286)
>>> retract row: delete (a, 1, 2018-08-20 20:18:10.386)
>>> new row: add (a, 2, 2018-08-20 20:18:10.486)
>>>
>>> The retract row is different from the previous row because of the time
>>> field.
>>>
>>> Of course, this problem should be fixed later.
>>>
>>> Best, Hequn
>>>
>>> On Mon, Aug 20, 2018 at 6:43 PM, 徐涛  wrote:
>>>
 Hi All,
 Like the following code: if I use a retract stream, I think Flink is able
 to know which item is modified (if praise already has some items, when one new
 item comes into the stream, only a very small amount of data is written to the sink)

var praiseAggr = tableEnv.sqlQuery(s"SELECT article_id, hll(uid) as PU FROM praise group by article_id")

 tableEnv.registerTable("finalTable", praiseAggr)

tableEnv.sqlUpdate(s"insert into sinkTableName SELECT * from finalTable")


 But if I use the following sql, adding a dynamic timestamp
 field:

 var praiseAggr = tableEnv.sqlQuery(s"SELECT article_id, hll(uid) as PU, LOCALTIMESTAMP as update_timestamp FROM praise group by article_id")

   Is the whole table flushed to the sink, or only the incremental
 value? Why?

 Thanks,
 Henry


>>>
>>>
>>
>>
>
>