Jobmanager not properly fenced when killed by YARN RM

2019-12-13 Thread Paul Lam
Hi,

Recently I've seen a situation where a JobManager received a stop signal
from the YARN RM but failed to exit and got stuck in a restart loop. It kept
failing because the TaskManager containers had been disconnected (killed by
the RM as well), and it finally exited only when it hit the limit of the
restart policy. This further resulted in the Flink job being marked with final
status FAILED and the ZooKeeper paths being cleaned up, so when a new
JobManager started up it found no checkpoint to restore and performed a
stateless restart. For context, the application runs with Flink 1.7.1 in HA
job cluster mode on Hadoop 2.6.5.

As far as I can remember, I've seen a similar issue relating to the fencing of
the JobManager, but I searched JIRA and couldn't find it. It would be great
if someone could point me in the right direction. Any other comments are also
welcome! Thanks!

Best,
Paul Lam


Questions about taskmanager.memory.off-heap and taskmanager.memory.preallocate

2019-12-13 Thread Ethan Li
Hi Community,

I have a question about the taskmanager.memory.preallocate config in the doc 
https://ci.apache.org/projects/flink/flink-docs-release-1.8/ops/config.html#taskmanager-memory-preallocate
 


We have boxes with large memory, so as the doc suggests we should use off-heap
memory for Flink managed memory. The doc then also suggests setting
taskmanager.memory.preallocate to true. However:

 "For streaming setups it is highly recommended to set this value to false as
the core state backends currently do not use the managed memory."


Our Flink setup is mainly for streaming jobs, so I think the above applies to
our case. Should I use off-heap memory with "preallocate" set to false? What
would be the impact of these configs?
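
For reference, this is the combination I mean, as a minimal flink-conf.yaml
sketch (the heap size and fraction below are placeholders, not recommendations):

# TaskManager memory settings (Flink 1.8) -- illustrative values only
taskmanager.heap.size: 4096m
# allocate managed memory off-heap, as suggested for large-memory boxes
taskmanager.memory.off-heap: true
# but do not pre-allocate it, since streaming state backends don't use it
taskmanager.memory.preallocate: false
# fraction of free memory reserved as managed memory (default is 0.7)
taskmanager.memory.fraction: 0.7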


Thank you very much!


Best,
Ethan

Re: Join a datastream with tables stored in Hive

2019-12-13 Thread Krzysztof Zarzycki
Very interesting, Kurt! Yes, I also imagined it's a rather common case; in my
company we currently have 3 clients wanting this functionality.
I also just realized the slight difference between a Temporal Table Join and a
Temporal Table Function Join, i.e. that there are actually two methods :)

Regarding option 1:
So I would need to:
* write a DataStream API source that pulls the Hive dictionary table every,
let's say, day, assigns an event-time column to the rows and creates a stream
of it. It does that and only that.
* create a table (via the Table API) out of it, assigning one of the columns
as the event-time column.
* then use table.createTemporalTableFunction()
* finally join my main data stream with the temporal table function (let me
use the short name TTF from now on) of my dictionary, using Flink SQL and the
LATERAL TABLE (Rates(o.rowtime)) AS r construct.
And so I should achieve my temporal, event-time-based join with versioned
dictionaries (roughly as sketched below)!
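
A minimal Scala sketch of what I mean (all names here, such as Dict, Events,
d_currency, e_rowtime, are made up for illustration and are not from any
existing code):

import org.apache.flink.streaming.api.TimeCharacteristic
import org.apache.flink.streaming.api.scala._
import org.apache.flink.table.api.Table
import org.apache.flink.table.api.scala._

val env = StreamExecutionEnvironment.getExecutionEnvironment
env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)
val tEnv = StreamTableEnvironment.create(env)

// stand-in for the Hive-backed dictionary stream: (key, rate, eventTimeMillis)
val dictStream: DataStream[(String, Double, Long)] = env
  .fromElements(("EUR", 1.10, 1000L), ("EUR", 1.12, 5000L))
  .assignAscendingTimestamps(_._3)
val dictTable: Table =
  tEnv.fromDataStream(dictStream, 'd_currency, 'd_rate, 'd_rowtime.rowtime)

// temporal table function: dictionary versions keyed by d_currency over d_rowtime
tEnv.registerFunction("Dict", dictTable.createTemporalTableFunction('d_rowtime, 'd_currency))

// stand-in for the main Kafka stream, registered as "Events"
val eventStream: DataStream[(String, Long)] = env
  .fromElements(("EUR", 2000L))
  .assignAscendingTimestamps(_._2)
tEnv.registerTable("Events", tEnv.fromDataStream(eventStream, 'currency, 'e_rowtime.rowtime))

// event-time temporal join against the dictionary version valid at e.e_rowtime
val joined: Table = tEnv.sqlQuery(
  "SELECT e.currency, e.e_rowtime, d.d_rate " +
  "FROM Events AS e, LATERAL TABLE (Dict(e.e_rowtime)) AS d " +
  "WHERE e.currency = d.d_currency")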
Question 1: do I need to write that Hive source or can I use something
ready, like Hive catalog integration? Or maybe reuse e.g. HiveTableSource
class?

Question/worry 2: One thing that worries me is this comment in the docs:

*Note: State retention defined in a query configuration is not yet implemented
for temporal joins. This means that the required state to compute the query
result might grow infinitely depending on the number of distinct primary keys
for the history table.*

On the other side, I find this comment: *By definition of event time,
watermarks allow the join operation to move forward in time and discard
versions of the build table that are no longer necessary because no incoming
row with lower or equal timestamp is expected.*

So I believe that the state would grow infinitely only if I had an infinite
number of keys, but not if I merely had an infinite number of versions of all
keys. Which is fine. Do you confirm?

Question 3: I need to be able to cover also reprocessing or backfilling of
historical data. Let's say I would need to join data stream and
(versioned/snapshotted) dictionaries stored on HDFS. Do you imagine that I
could use the same logic for both stream processing and reprocessing just
by replacing sources and sinks? Maybe after some slight modifications?


Regarding option 2:
Here I understand the current limitation (which will stay for some time) is
that the join can happen only on processing time, which means joining only
with the latest version of the dictionaries.
Accepting that, I understand I would need to:
a) load the Hive table into e.g. HBase and then use HBaseTableSource on it, OR
b) maybe implement a Hive/JDBC-based LookupableTableSource that pulls the
whole dictionary into memory (or even into Flink state, if it is possible to
use it from a TableFunction).
Then use this table and my Kafka stream table in a temporal join expressed
with Flink SQL (roughly as sketched below).
What do you think, is that feasible?
Do I understand correctly that this option is available only with the Blink
engine, and also only with Flink SQL, not the Table API?
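
The processing-time variant I have in mind would look roughly like this (a
hedged sketch only; "LatestDict" stands for a hypothetical lookup-capable
table, e.g. backed by HBase, and "Events" for the Kafka stream table, so this
is not runnable as-is):

import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment
import org.apache.flink.table.api.EnvironmentSettings
import org.apache.flink.table.api.scala.StreamTableEnvironment

val env = StreamExecutionEnvironment.getExecutionEnvironment
// the temporal table join syntax below is supported by the Blink planner
val settings = EnvironmentSettings.newInstance().useBlinkPlanner().inStreamingMode().build()
val tEnv = StreamTableEnvironment.create(env, settings)

// Assumed to be registered elsewhere (illustration only):
//   "Events"     -- the Kafka stream table, with a processing-time attribute `proctime`
//   "LatestDict" -- a lookup-capable dimension table (e.g. an HBaseTableSource)
val joined = tEnv.sqlQuery(
  "SELECT e.currency, d.d_rate " +
  "FROM Events AS e " +
  "JOIN LatestDict FOR SYSTEM_TIME AS OF e.proctime AS d " +
  "ON e.currency = d.d_currency")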

Same question comes up regarding reprocessing: do you think it would be
possible to use the same logic / SQL for reprocessing?

Thank you for continuing the discussion with me. I believe we're touching on a
really important piece of design for the community.
Krzysztof

On Fri, Dec 13, 2019 at 09:39, Kurt Young wrote:

> Sorry I forgot to paste the reference url.
>
> Best,
> Kurt
>
> [1]
> https://ci.apache.org/projects/flink/flink-docs-master/dev/table/streaming/joins.html#join-with-a-temporal-table-function
> [2]
> https://ci.apache.org/projects/flink/flink-docs-master/dev/table/streaming/joins.html#join-with-a-temporal-table
>
> On Fri, Dec 13, 2019 at 4:37 PM Kurt Young  wrote:
>
>> Hi Krzysztof,
>>
>> What you raised also interested us a lot to achieve in Flink.
>> Unfortunately, there
>> is no in place solution in Table/SQL API yet, but you have 2 options
>> which are both
>> close to this thus need some modifications.
>>
>> 1. The first one is use temporal table function [1]. It needs you to
>> write the logic of
>> reading hive tables and do the daily update inside the table function.
>> 2. The second choice is to use temporal table join [2], which only works
>> with processing
>> time now (just like the simple solution you mentioned), and need the
>> table source has
>> look up capability (like hbase). Currently, hive connector doesn't
>> support look up, so to
>> make this work, you need to sync the content to other storages which
>> support look up,
>> like HBase.
>>
>> Both solutions are not ideal now, and we also aims to improve this maybe
>> in the following
>> release.
>>
>> Best,
>> Kurt
>>
>>
>> On Fri, Dec 13, 2019 at 1:44 AM Krzysztof Zarzycki 
>> wrote:
>>
>>> Hello dear Flinkers,
>>> If this kind of question was asked on the groups, I'm sorry for a
>>> duplicate. Feel free to just point me 

Re: [ANNOUNCE] Zhu Zhu becomes a Flink committer

2019-12-13 Thread Peter Huang
Congratulations!:)

On Fri, Dec 13, 2019 at 9:45 AM Piotr Nowojski  wrote:

> Congratulations! :)
>
> > On 13 Dec 2019, at 18:05, Fabian Hueske  wrote:
> >
> > Congrats Zhu Zhu and welcome on board!
> >
> > Best, Fabian
> >
> > On Fri, Dec 13, 2019 at 17:51, Till Rohrmann <
> > trohrm...@apache.org>:
> >
> >> Hi everyone,
> >>
> >> I'm very happy to announce that Zhu Zhu accepted the offer of the Flink
> PMC
> >> to become a committer of the Flink project.
> >>
> >> Zhu Zhu has been an active community member for more than a year now.
> Zhu
> >> Zhu played an essential role in the scheduler refactoring, helped
> >> implementing fine grained recovery, drives FLIP-53 and fixed various
> bugs
> >> in the scheduler and runtime. Zhu Zhu also helped the community by
> >> reporting issues, answering user mails and being active on the dev
> mailing
> >> list.
> >>
> >> Congratulations Zhu Zhu!
> >>
> >> Best, Till
> >> (on behalf of the Flink PMC)
> >>
>
>


Re: [ANNOUNCE] Zhu Zhu becomes a Flink committer

2019-12-13 Thread Piotr Nowojski
Congratulations! :)

> On 13 Dec 2019, at 18:05, Fabian Hueske  wrote:
> 
> Congrats Zhu Zhu and welcome on board!
> 
> Best, Fabian
> 
> On Fri, Dec 13, 2019 at 17:51, Till Rohrmann <
> trohrm...@apache.org>:
> 
>> Hi everyone,
>> 
>> I'm very happy to announce that Zhu Zhu accepted the offer of the Flink PMC
>> to become a committer of the Flink project.
>> 
>> Zhu Zhu has been an active community member for more than a year now. Zhu
>> Zhu played an essential role in the scheduler refactoring, helped
>> implementing fine grained recovery, drives FLIP-53 and fixed various bugs
>> in the scheduler and runtime. Zhu Zhu also helped the community by
>> reporting issues, answering user mails and being active on the dev mailing
>> list.
>> 
>> Congratulations Zhu Zhu!
>> 
>> Best, Till
>> (on behalf of the Flink PMC)
>> 



Re: Sample Code for querying Flink's default metrics

2019-12-13 Thread Pankaj Chand
Additionally, when an old job completes and I run a new job on the Flink YARN
session-mode cluster, if I query for metrics before they become available for
the new job, I sometimes get the last metrics of the old job instead. This
happens even if I wait for the TaskManager to be released by Flink (as shown
in Flink's dashboard web UI).

This shouldn't happen, since the TaskManager ID "should" be different, though
it would have the old index in the TaskManagers list.
Would this be a bug?
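
For context, the kind of polling I mean looks roughly like this (a simplified
Scala sketch rather than my actual code; the host, port and TaskManager id are
placeholders), where an empty "[]" body is treated as "metric not available
yet" instead of being piped onwards:

import scala.io.Source
import scala.util.Try

// Query one TaskManager metric via the REST API; None means "not available yet".
def cpuLoad(taskManagerId: String): Option[String] = {
  val url = s"http://localhost:8081/taskmanagers/$taskManagerId/metrics?get=Status.JVM.CPU.Load"
  Try(Source.fromURL(url).mkString.trim).toOption match {
    // a normal answer looks like: [{"id":"Status.JVM.CPU.Load","value":"0.37"}]
    case Some(body) if body.nonEmpty && body != "[]" => Some(body)
    case _ => None // empty array / connection error: metric not registered yet
  }
}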

Thanks!

Pankaj

On Thu, Dec 12, 2019 at 5:59 AM Pankaj Chand 
wrote:

> Thank you, Chesnay!
>
> On Thu, Dec 12, 2019 at 5:46 AM Chesnay Schepler 
> wrote:
>
>> Yes, when a cluster was started it takes a few seconds for (any) metrics
>> to be available.
>>
>> On 12/12/2019 11:36, Pankaj Chand wrote:
>>
>> Hi Vino,
>>
>> Thank you for the links regarding backpressure!
>>
>> I am currently using code to get metrics by calling the REST API via curl.
>> However, many times the REST API call returns an empty JSON object/array.
>> Piped through jq (for filtering JSON) it produces a null value, which
>> breaks my code.
>> For example, in a YARN session-mode cluster, the metric
>> "metrics?get=Status.JVM.CPU.Load" randomly (I think) returns either an empty
>> JSON object/array or an actual value.
>>
>> Is it possible that for CPU load the empty JSON object is returned when the
>> job was started less than 10 seconds ago?
>>
>> Thanks,
>>
>> Pankaj
>>
>>
>>
>> On Mon, Dec 9, 2019 at 4:21 AM vino yang  wrote:
>>
>>> Hi Pankaj,
>>>
>>> > Is there any sample code for how to read such default metrics?  Is
>>> there any way to query the default metrics, such as CPU usage and Memory,
>>> without using REST API or Reporters?
>>>
>>> What's your real requirement? Can you use code to call REST API?  Why
>>> does it not match your requirements?
>>>
>>> > Additionally, how do I query Backpressure using code, or is it still
>>> only visually available via the dashboard UI? Consequently, is there any
>>> way to infer Backpressure by querying one (or more) of the Memory metrics
>>> of the TaskManager?
>>>
>>> The backpressure is related to not only memory metrics but also IO and
>>> network metrics, for more details about measure backpressure please see
>>> this blog.[1][2]
>>>
>>> [1]: https://flink.apache.org/2019/06/05/flink-network-stack.html
>>> [2]: https://flink.apache.org/2019/07/23/flink-network-stack-2.html
>>>
>>> Best,
>>> Vino
>>>
>>> Pankaj Chand wrote on Mon, Dec 9, 2019 at 12:07 PM:
>>>
 Hello,

 Using Flink on Yarn, I could not understand the documentation for how
 to read the default metrics via code. In particular, I want to read
 throughput, i.e. CPU usage, Task/Operator's numRecordsOutPerSecond, and
 Memory.

 Is there any sample code for how to read such default metrics?  Is
 there any way to query the default metrics, such as CPU usage and Memory,
 without using REST API or Reporters?

 Additionally, how do I query Backpressure using code, or is it still
 only visually available via the dashboard UI? Consequently, is there any
 way to infer Backpressure by querying one (or more) of the Memory metrics
 of the TaskManager?

 Thank you,

 Pankaj

>>>
>>


Re: [ANNOUNCE] Zhu Zhu becomes a Flink committer

2019-12-13 Thread Fabian Hueske
Congrats Zhu Zhu and welcome on board!

Best, Fabian

On Fri, Dec 13, 2019 at 17:51, Till Rohrmann <
trohrm...@apache.org>:

> Hi everyone,
>
> I'm very happy to announce that Zhu Zhu accepted the offer of the Flink PMC
> to become a committer of the Flink project.
>
> Zhu Zhu has been an active community member for more than a year now. Zhu
> Zhu played an essential role in the scheduler refactoring, helped
> implementing fine grained recovery, drives FLIP-53 and fixed various bugs
> in the scheduler and runtime. Zhu Zhu also helped the community by
> reporting issues, answering user mails and being active on the dev mailing
> list.
>
> Congratulations Zhu Zhu!
>
> Best, Till
> (on behalf of the Flink PMC)
>


[ANNOUNCE] Zhu Zhu becomes a Flink committer

2019-12-13 Thread Till Rohrmann
Hi everyone,

I'm very happy to announce that Zhu Zhu accepted the offer of the Flink PMC
to become a committer of the Flink project.

Zhu Zhu has been an active community member for more than a year now. Zhu
Zhu played an essential role in the scheduler refactoring, helped
implementing fine grained recovery, drives FLIP-53 and fixed various bugs
in the scheduler and runtime. Zhu Zhu also helped the community by
reporting issues, answering user mails and being active on the dev mailing
list.

Congratulations Zhu Zhu!

Best, Till
(on behalf of the Flink PMC)


Flink slot utilization

2019-12-13 Thread Andrés Garagiola
Hi

I'm testing Flink for stream processing; in my use case there are multiple
pipelines processing messages from multiple Kafka sources. I have some
questions regarding jobs and slots.

1) When I deploy a new job, it takes a task slot in the TM. The job never ends
(I think it doesn't end because it is a streaming pipeline), and the slot is
never released. This means the slot stays busy even when no new messages are
coming from the Kafka topic. Is that OK, or am I doing something wrong?
Is there a way to use the job slots more efficiently?

2) In my use case, I need good job scalability. Potentially I could have many
pipelines running in the Flink environment, but on the other hand, increased
latency would not be a serious problem for me. Are there any recommendations
regarding memory per slot? I saw that the CPU recommendation is one core per
slot; taking into account that increased latency would not be a big problem,
do you see another good reason to follow this recommendation?

Thank you
Regards


TypeInformation problem

2019-12-13 Thread Nicholas Walton
I was refactoring some Flink code to use IndexedSeq rather than Array. When I
compiled the code I had failures that, according to the URL below, required
the following to be inserted:
/*
 * Type information (see 
https://stackoverflow.com/questions/37920023/could-not-find-implicit-value-for-evidence-parameter-of-type-org-apache-flink-ap)
 *
 * Code when ported to use IndexedSeq rather than Array
 * and similar refuses to build without this information
 */
implicit val typeInfo1 = TypeInformation.of(classOf[(Int, Long, Double, Int)])
implicit val typeInfo2 = TypeInformation.of(classOf[(Int, Long, Double, Double)])
implicit val typeInfo3 = TypeInformation.of(classOf[(Int, Long, Double, IndexedSeq[Long])])
implicit val typeInfo4 = TypeInformation.of(classOf[(Int, Long, Double, IndexedSeq[BigInt])])
implicit val typeInfo5 = TypeInformation.of(classOf[(Int, Long, Double, IndexedSeq[String])])
implicit val typeInfo6 = TypeInformation.of(classOf[(String, Int, Long, Double)])
implicit val typeInfo7 = TypeInformation.of(classOf[(Int, Long, Double, IndexedSeq[String], Int)])
implicit val typeInfo8 = TypeInformation.of(classOf[(Int, Long, Double, String, Int)])

The code now compiles fine, but I now have a problem with the code below, which 
was working perfectly fine before I added the above and made the IndexedSeq 
refactor

val readings: DataStream[(Int, Long, Double, Int)] = stream
  .flatMap(new splitReadings())
  .setParallelism(1)
  .assignTimestampsAndWatermarks(new readingTimstamps)
  .setParallelism(1)


val maxChannelScaled: DataStream[(Int, Long, Double, Double)] = readings
  .keyBy(0)
  .countWindow(runmaxWinLen, 1)
  .process(new runningMax())
  .setParallelism(2 * env.getParallelism)

When I submit the job I find the following in the log

2019-12-13 15:37:35,600 INFO  org.apache.flink.api.java.typeutils.TypeExtractor 
- class scala.Tuple4 does not contain a setter for field _1
2019-12-13 15:37:35,601 INFO  org.apache.flink.api.java.typeutils.TypeExtractor 
- Class class scala.Tuple4 cannot be used as a POJO type because 
not all fields are valid POJO fields, and must be processed as GenericType. 
Please read the Flink documentation on "Data Types & Serialization" for details 
of the effect on performance.
2019-12-13 15:37:35,602 ERROR 
org.apache.flink.runtime.webmonitor.handlers.JarPlanHandler   - Unhandled 
exception.
org.apache.flink.client.program.ProgramInvocationException: The main method 
caused an error: Specifying keys via field positions is only valid for tuple 
data types. Type: GenericType
at 
org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:546)
at 
org.apache.flink.client.program.PackagedProgram.invokeInteractiveModeForExecution(PackagedProgram.java:421)
at 
org.apache.flink.client.program.OptimizerPlanEnvironment.getOptimizedPlan(OptimizerPlanEnvironment.java:83)
at 
org.apache.flink.client.program.PackagedProgramUtils.createJobGraph(PackagedProgramUtils.java:80)
at 
org.apache.flink.runtime.webmonitor.handlers.utils.JarHandlerUtils$JarHandlerContext.toJobGraph(JarHandlerUtils.java:126)
at 
org.apache.flink.runtime.webmonitor.handlers.JarPlanHandler.lambda$handleRequest$1(JarPlanHandler.java:100)
at 
java.util.concurrent.CompletableFuture$AsyncSupply.run(CompletableFuture.java:1590)
at 
java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
at 
java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
at java.lang.Thread.run(Thread.java:745)
Caused by: org.apache.flink.api.common.InvalidProgramException: Specifying keys 
via field positions is only valid for tuple data types. Type: 
GenericType
at 
org.apache.flink.api.common.operators.Keys$ExpressionKeys.(Keys.java:232)
at 
org.apache.flink.api.common.operators.Keys$ExpressionKeys.(Keys.java:223)
at 
org.apache.flink.streaming.api.datastream.DataStream.keyBy(DataStream.java:321)
at 
org.apache.flink.streaming.api.scala.DataStream.keyBy(DataStream.scala:392)
at org.example.Job$.main(Job.scala:99)
at org.example.Job.main(Job.scala)
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at 
sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
at 
sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:497)
at 
org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:529)
... 9 more

What is happening, and more importantly how can I fix the problem?
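
(I wonder whether the fix is simply to let the Scala API derive the
TypeInformation implicitly instead of going through classOf, which erases the
tuple's type parameters and leaves a GenericType that keyBy(0) cannot use. A
sketch of what I mean, not tested:)

// Rely on the Scala API's createTypeInformation macro so the tuples keep
// proper tuple (CaseClass) type information instead of becoming GenericType.
import org.apache.flink.api.common.typeinfo.TypeInformation
import org.apache.flink.streaming.api.scala._

implicit val typeInfo1: TypeInformation[(Int, Long, Double, Int)] =
  createTypeInformation[(Int, Long, Double, Int)]
implicit val typeInfo3: TypeInformation[(Int, Long, Double, IndexedSeq[Long])] =
  createTypeInformation[(Int, Long, Double, IndexedSeq[Long])]
// ...and likewise for the other tuple shapes, instead of TypeInformation.of(classOf[...]).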

TIA

Nick

Re: [ANNOUNCE] Apache Flink 1.8.3 released

2019-12-13 Thread Till Rohrmann
Thanks a lot Hequn for being our release manager and to the community for
making this release happen :-)

Cheers,
Till

On Thu, Dec 12, 2019 at 9:02 AM Zhu Zhu  wrote:

> Thanks Hequn for driving the release and everyone who makes this release
> possible!
>
> Thanks,
> Zhu Zhu
>
> Wei Zhong wrote on Thu, Dec 12, 2019 at 3:45 PM:
>
>> Thanks Hequn for being the release manager. Great work!
>>
>> Best,
>> Wei
>>
>> On Dec 12, 2019, at 15:27, Jingsong Li wrote:
>>
>> Thanks Hequn for your driving, 1.8.3 fixed a lot of issues and it is very
>> useful to users.
>> Great work!
>>
>> Best,
>> Jingsong Lee
>>
>> On Thu, Dec 12, 2019 at 3:25 PM jincheng sun 
>> wrote:
>>
>>> Thanks for being the release manager and the great work Hequn :)
>>> Also thanks to the community making this release possible!
>>>
>>> Best,
>>> Jincheng
>>>
>>> Jark Wu wrote on Thu, Dec 12, 2019 at 3:23 PM:
>>>
 Thanks Hequn for helping out this release and being the release manager.
 Great work!

 Best,
 Jark

 On Thu, 12 Dec 2019 at 15:02, Jeff Zhang  wrote:

 > Great work, Hequn
 >
 > Dian Fu wrote on Thu, Dec 12, 2019 at 2:32 PM:
 >
 >> Thanks Hequn for being the release manager and everyone who
 contributed
 >> to this release.
 >>
 >> Regards,
 >> Dian
 >>
 >> On Dec 12, 2019, at 2:24 PM, Hequn Cheng wrote:
 >>
 >> Hi,
 >>
 >> The Apache Flink community is very happy to announce the release of
 >> Apache Flink 1.8.3, which is the third bugfix release for the Apache
 Flink
 >> 1.8 series.
 >>
 >> Apache Flink® is an open-source stream processing framework for
 >> distributed, high-performing, always-available, and accurate data
 streaming
 >> applications.
 >>
 >> The release is available for download at:
 >> https://flink.apache.org/downloads.html
 >>
 >> Please check out the release blog post for an overview of the
 >> improvements for this bugfix release:
 >> https://flink.apache.org/news/2019/12/11/release-1.8.3.html
 >>
 >> The full release notes are available in Jira:
 >> https://issues.apache.org/jira/projects/FLINK/versions/12346112
 >>
 >> We would like to thank all contributors of the Apache Flink
 community who
 >> made this release possible!
 >> Great thanks to @Jincheng as a mentor during this release.
 >>
 >> Regards,
 >> Hequn
 >>
 >>
 >>
 >
 > --
 > Best Regards
 >
 > Jeff Zhang
 >

>>>
>>
>> --
>> Best, Jingsong Lee
>>
>>
>>


Re: Flink 'Job Cluster' mode Ui Access

2019-12-13 Thread Chesnay Schepler

Thank you for the logs.

Flink can indeed find the WebUI files in the distribution, which is a bit odd.
Since no static files are served in this case, the StaticFileServerHandler is
never set up in the first place (hence why we didn't find any log statements).


What I also found in the logs (and, looking back, in one of your earlier 
replies) was this: Version: , Rev:ceba8af, Date:11.02.2019 @ 
22:17:09 CST


This tells us a few things.
a) You are not using 1.8.1, but 1.7.2 (based on the revision)
b) You are not using an official release, since the build-date differs 
from the official releases


I tried one of the official 1.7.2 releases, and the WebUI is shown both 
when using:

(after copying the wordcount example into /lib)
./bin/standalone-job.sh start-foreground -j 
org.apache.flink.examples.java.wordcount.WordCount
./bin/standalone-job.sh start -j 
org.apache.flink.examples.java.wordcount.WordCount


Right now I don't know what else to look for; there are some discrepancies
between what your environment actually is and what you described, so I can
only recommend carefully evaluating what you actually have running and
possibly trying again with an official release.


Regards,
Chesnay

On 13/12/2019 09:58, Jatin Banger wrote:

Sure, here it is.
Job Manager Logs with logging level as DEBUG

On Wed, Dec 11, 2019 at 3:14 PM Chesnay Schepler wrote:


Would it be possible for you to provide us with full debug log file?

On 10/12/2019 18:07, Jatin Banger wrote:

Yes, I did.

On Tue, Dec 10, 2019 at 3:47 PM Arvid Heise <ar...@ververica.com> wrote:

Hi Jatin,

just to be sure. Did you increase the log level to debug [1]
before checking for *StaticFileServerHandler*?

Best,

Arvid

[1]

https://ci.apache.org/projects/flink/flink-docs-stable/monitoring/logging.html#configuring-log4j

On Mon, Dec 9, 2019 at 7:54 AM Jatin Banger
<bangerjatinrm...@gmail.com> wrote:

Hi,

I have checked the logs with this keyword
*StaticFileServerHandler *in it, But there were no logs
coming for "Flink Job Cluster".
Then i checked for Flink Session Cluster, i was able to
find the logs for the *StaticFileServerHandler *keyword.

Can i raise this as bug ?

Best Regards,
Jatin


On Thu, Dec 5, 2019 at 8:59 PM Chesnay Schepler
<ches...@apache.org> wrote:

Ok, it's good to know that the WebUI files are there.

Please enable DEBUG logging and try again, searching
for messages from the StaticFileServerHandler.

This handler logs every file that is requested (which
effectively happens when the WebUI is being served);
let's see what is actually being requested.

On 05/12/2019 05:57, Jatin Banger wrote:

I have tried that already using
'$FLINK_HOME/bin/jobmanager.sh" start-foreground
Ui comes fine with this one.
Which means web/index.html is present.


On Wed, Dec 4, 2019 at 9:01 PM Chesnay Schepler
<ches...@apache.org> wrote:

hmm...this is quite odd.

Let's try to narrow things down a bit.

Could you try starting a local cluster (using
the same distribution) and checking whether the
UI is accessible?

Could you also check whether the flink-dist.jar
in /lib contains web/index.html?

On 04/12/2019 06:02, Jatin Banger wrote:

Hi,

I am using flink binary directly.

I am using this command to deploy the script.

"$FLINK_HOME/bin/standalone-job.sh"
start-foreground --job-classname ${ARGS_FOR_JOB}
where ARGS_FOR_JOB contain job class name and
all other necessary details needed by the job.

Best regards,
Jatin


On Fri, Nov 29, 2019 at 4:18 PM Chesnay
Schepler <ches...@apache.org> wrote:

To clarify, you ran "mvn package -pl
flink-dist -am" to build Fink?

If so, could you run that again and provide
us with the maven output?
On 29/11/2019 11:23, Jatin Banger wrote:

Hi,

@vino yang  
I am using flink 1.8.1

I am using the following procedure for the
deployment:
  

Re: State Processor API: StateMigrationException for keyed state

2019-12-13 Thread Peter Westermann
Sorry I posted it but I guess it got dropped when the message was formatted.  
Here’s another attempt:
2019-12-12 12:50:08
java.io.IOException: Failed to restore state backend
at 
org.apache.flink.state.api.input.KeyedStateInputFormat.getStreamOperatorStateContext(KeyedStateInputFormat.java:231)
at 
org.apache.flink.state.api.input.KeyedStateInputFormat.open(KeyedStateInputFormat.java:177)
at 
org.apache.flink.state.api.input.KeyedStateInputFormat.open(KeyedStateInputFormat.java:79)
at 
org.apache.flink.runtime.operators.DataSourceTask.invoke(DataSourceTask.java:173)
at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:705)
at org.apache.flink.runtime.taskmanager.Task.run(Task.java:530)
at java.lang.Thread.run(Thread.java:748)
Caused by: java.lang.Exception: Exception while creating 
StreamOperatorStateContext.
at 
org.apache.flink.streaming.api.operators.StreamTaskStateInitializerImpl.streamOperatorStateContext(StreamTaskStateInitializerImpl.java:195)
at 
org.apache.flink.state.api.input.KeyedStateInputFormat.getStreamOperatorStateContext(KeyedStateInputFormat.java:223)
... 6 more
Caused by: org.apache.flink.util.FlinkException: Could not restore keyed state 
backend for 
ebe1b56bc6601c8bccba93887bec8059_ebe1b56bc6601c8bccba93887bec8059_(1/1) from 
any of the 1 provided restore options.
at 
org.apache.flink.streaming.api.operators.BackendRestorerProcedure.createAndRestore(BackendRestorerProcedure.java:135)
at 
org.apache.flink.streaming.api.operators.StreamTaskStateInitializerImpl.keyedStatedBackend(StreamTaskStateInitializerImpl.java:307)
at 
org.apache.flink.streaming.api.operators.StreamTaskStateInitializerImpl.streamOperatorStateContext(StreamTaskStateInitializerImpl.java:135)
... 7 more
Caused by: org.apache.flink.runtime.state.BackendBuildingException: Caught 
unexpected exception.
at 
org.apache.flink.contrib.streaming.state.RocksDBKeyedStateBackendBuilder.build(RocksDBKeyedStateBackendBuilder.java:326)
at 
org.apache.flink.contrib.streaming.state.RocksDBStateBackend.createKeyedStateBackend(RocksDBStateBackend.java:520)
at 
org.apache.flink.streaming.api.operators.StreamTaskStateInitializerImpl.lambda$keyedStatedBackend$1(StreamTaskStateInitializerImpl.java:291)
at 
org.apache.flink.streaming.api.operators.BackendRestorerProcedure.attemptCreateAndRestore(BackendRestorerProcedure.java:142)
at 
org.apache.flink.streaming.api.operators.BackendRestorerProcedure.createAndRestore(BackendRestorerProcedure.java:121)
... 9 more
Caused by: org.apache.flink.util.StateMigrationException: The new key 
serializer must be compatible.
at 
org.apache.flink.contrib.streaming.state.restore.AbstractRocksDBRestoreOperation.readMetaData(AbstractRocksDBRestoreOperation.java:194)
at 
org.apache.flink.contrib.streaming.state.restore.RocksDBFullRestoreOperation.restoreKVStateMetaData(RocksDBFullRestoreOperation.java:170)
at 
org.apache.flink.contrib.streaming.state.restore.RocksDBFullRestoreOperation.restoreKeyGroupsInStateHandle(RocksDBFullRestoreOperation.java:157)
at 
org.apache.flink.contrib.streaming.state.restore.RocksDBFullRestoreOperation.restore(RocksDBFullRestoreOperation.java:141)
at 
org.apache.flink.contrib.streaming.state.RocksDBKeyedStateBackendBuilder.build(RocksDBKeyedStateBackendBuilder.java:270)
... 13 more


From: vino yang 
Date: Thursday, December 12, 2019 at 8:46 PM
To: Peter Westermann 
Cc: user 
Subject: Re: State Processor API: StateMigrationException for keyed state

Hi pwestermann,

Can you share the relevant detailed exception message?

Best,
Vino

pwestermann <no.westerm...@genesys.com> wrote on Fri, Dec 13, 2019 at 2:00 AM:
I am trying to get the new State Processor API to work, but I am having trouble
with keyed state (this is for Flink 1.9.1 with RocksDB on S3 as the backend).
I can read keyed state for a simple key type such as String, but whenever I
try to read state with a more complex key type - such as a named tuple
type (for example ), I get a StateMigrationException:



Any idea what could be wrong?



--
Sent from: 
http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/


Table API questions

2019-12-13 Thread chendan
I have read the Table API section of the documentation carefully and have the
following conceptual and practical programming questions. I would appreciate
answers to each of them:

1. Compared with BatchTableEnvironment and StreamTableEnvironment, in which
scenarios should TableEnvironment be used?

2. The docs mention "Register an External Catalog". In which situations would
an external catalog be used? In the API docs, registerExternalCatalog is
already marked as deprecated, so only registerCatalog can be used. What is the
difference between an internal and an external catalog? Why are different
catalogs needed? In which situations does one need to register multiple
catalogs?

3. After registering a Table or a TableSource, how is it actually used? How
should the code be written? The documentation is vague on this. What is the
purpose of registration?

4. In the API docs, the scan method of TableEnvironment takes a tablePath
parameter. What exactly is a tablePath?

In the documentation
https://ci.apache.org/projects/flink/flink-docs-release-1.9/dev/table/streaming/time_attributes.html
I saw the following example code (screenshot not included):

From that, it looks like UserActions is the name of a TableSource and scan
takes the TableSource name as its argument. But when I wrote my own code the
same way, it reported an error.
This is my code (screenshot not included):

This is the result (screenshot not included):

I clearly registered user_moid as a TableSource and passed the TableSource
name to scan.
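
For reference, this is roughly what I understood the docs pattern to be (a
minimal sketch with made-up data, not my actual code):

import org.apache.flink.streaming.api.scala._
import org.apache.flink.table.api.scala._

val env = StreamExecutionEnvironment.getExecutionEnvironment
val tEnv = StreamTableEnvironment.create(env)

// register a table under a name...
val stream = env.fromElements(("user1", 10L), ("user2", 20L))
tEnv.registerTable("UserActions", tEnv.fromDataStream(stream, 'Username, 'Data))

// ...and scan() then takes that registered name (the "tablePath") as its argument
val t = tEnv.scan("UserActions")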


Table API question

2019-12-13 Thread Chen Dan
In the API docs, the scan method of TableEnvironment takes a tablePath
parameter. What is a tablePath?

I saw the example code in the documentation at
https://ci.apache.org/projects/flink/flink-docs-release-1.9/dev/table/streaming/time_attributes.html

From it, UserActions looks like the name of a TableSource, and scan takes the
TableSource name as its argument. But when I wrote my own code the same way it
reported an error (my code and the output were screenshots that are not
included here).

I registered user_moid as a TableSource and passed the TableSource name to
scan.

Re: [DISCUSS] have separate Flink distributions with built-in Hive dependencies

2019-12-13 Thread Jeff Zhang
+1, this is definitely necessary for a better user experience. Setting up the
environment is always painful for many big data tools.



Bowen Li wrote on Fri, Dec 13, 2019 at 5:02 PM:

> cc user ML in case anyone want to chime in
>
> On Fri, Dec 13, 2019 at 00:44 Bowen Li  wrote:
>
>> Hi all,
>>
>> I want to propose to have a couple separate Flink distributions with Hive
>> dependencies on specific Hive versions (2.3.4 and 1.2.1). The distributions
>> will be provided to users on Flink download page [1].
>>
>> A few reasons to do this:
>>
>> 1) Flink-Hive integration is important to many many Flink and Hive users
>> in two dimensions:
>>  a) for Flink metadata: HiveCatalog is the only persistent catalog to
>> manage Flink tables. With Flink 1.10 supporting more DDL, the persistent
>> catalog would be playing even more critical role in users' workflow
>>  b) for Flink data: Hive data connector (source/sink) helps both
>> Flink and Hive users to unlock new use cases in streaming,
>> near-realtime/realtime data warehouse, backfill, etc.
>>
>> 2) currently users have to go thru a *really* tedious process to get
>> started, because it requires lots of extra jars (see [2]) that are absent
>> in Flink's lean distribution. We've had so many users from public mailing
>> list, private email, DingTalk groups who got frustrated on spending lots of
>> time figuring out the jars themselves. They would rather have a more "right
>> out of box" quickstart experience, and play with the catalog and
>> source/sink without hassle.
>>
>> 3) it's easier for users to replace those Hive dependencies for their own
>> Hive versions - just replace those jars with the right versions and no need
>> to find the doc.
>>
>> * Hive 2.3.4 and 1.2.1 are two versions that represent lots of user base
>> out there, and that's why we are using them as examples for dependencies in
>> [1] even though we've supported almost all Hive versions [3] now.
>>
>> I want to hear what the community think about this, and how to achieve it
>> if we believe that's the way to go.
>>
>> Cheers,
>> Bowen
>>
>> [1] https://flink.apache.org/downloads.html
>> [2]
>> https://ci.apache.org/projects/flink/flink-docs-master/dev/table/hive/#dependencies
>> [3]
>> https://ci.apache.org/projects/flink/flink-docs-master/dev/table/hive/#supported-hive-versions
>>
>

-- 
Best Regards

Jeff Zhang


Re: [DISCUSS] have separate Flink distributions with built-in Hive dependencies

2019-12-13 Thread Bowen Li
cc user ML in case anyone want to chime in

On Fri, Dec 13, 2019 at 00:44 Bowen Li  wrote:

> Hi all,
>
> I want to propose to have a couple separate Flink distributions with Hive
> dependencies on specific Hive versions (2.3.4 and 1.2.1). The distributions
> will be provided to users on Flink download page [1].
>
> A few reasons to do this:
>
> 1) Flink-Hive integration is important to many many Flink and Hive users
> in two dimensions:
>  a) for Flink metadata: HiveCatalog is the only persistent catalog to
> manage Flink tables. With Flink 1.10 supporting more DDL, the persistent
> catalog would be playing even more critical role in users' workflow
>  b) for Flink data: Hive data connector (source/sink) helps both Flink
> and Hive users to unlock new use cases in streaming, near-realtime/realtime
> data warehouse, backfill, etc.
>
> 2) currently users have to go thru a *really* tedious process to get
> started, because it requires lots of extra jars (see [2]) that are absent
> in Flink's lean distribution. We've had so many users from public mailing
> list, private email, DingTalk groups who got frustrated on spending lots of
> time figuring out the jars themselves. They would rather have a more "right
> out of box" quickstart experience, and play with the catalog and
> source/sink without hassle.
>
> 3) it's easier for users to replace those Hive dependencies for their own
> Hive versions - just replace those jars with the right versions and no need
> to find the doc.
>
> * Hive 2.3.4 and 1.2.1 are two versions that represent lots of user base
> out there, and that's why we are using them as examples for dependencies in
> [1] even though we've supported almost all Hive versions [3] now.
>
> I want to hear what the community think about this, and how to achieve it
> if we believe that's the way to go.
>
> Cheers,
> Bowen
>
> [1] https://flink.apache.org/downloads.html
> [2]
> https://ci.apache.org/projects/flink/flink-docs-master/dev/table/hive/#dependencies
> [3]
> https://ci.apache.org/projects/flink/flink-docs-master/dev/table/hive/#supported-hive-versions
>


Re: Flink 1.9.1 SQL backward-incompatibility issue

2019-12-13 Thread Kurt Young
Hi,

I'd suggest translating this into English and filing an issue in JIRA.

Best,
Kurt


On Thu, Dec 12, 2019 at 11:39 PM 李佟  wrote:

> We recently upgraded Flink and migrated a program from an old cluster (where
> it ran fine on 1.8.0) to a new cluster (1.9.1). When deploying it, we found
> that the previously working Flink SQL program fails on the 1.9.1 cluster
> with the following exception:
>
>
> org.apache.flink.table.api.ValidationException: *Window can only be
> defined over a time attribute column.*
> at
> org.apache.flink.table.plan.rules.datastream.DataStreamLogicalWindowAggregateRule.getOperandAsTimeIndicator$1(DataStreamLogicalWindowAggregateRule.scala:85)
>
> at
> org.apache.flink.table.plan.rules.datastream.DataStreamLogicalWindowAggregateRule.translateWindowExpression(DataStreamLogicalWindowAggregateRule.scala:90)
>
>
>
> Tracing into the decompiled Scala file with a breakpoint, we found that the
> code highlighted in the (omitted) screenshot is never executed; it is simply
> skipped.
>
>
> The logic is very simple: sum a value over a time window. The test case is
> as follows:
>
>
> package org.flowmatrix.isp.traffic.accounting.test;
>
> import org.apache.flink.api.common.typeinfo.TypeInformation;
> import org.apache.flink.api.scala.typeutils.Types;
> import org.apache.flink.streaming.api.TimeCharacteristic;
> import org.apache.flink.streaming.api.datastream.DataStream;
> import
> org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
> import
> org.apache.flink.streaming.api.functions.AssignerWithPunctuatedWatermarks;
> import org.apache.flink.streaming.api.watermark.Watermark;
> import org.apache.flink.table.api.Table;
> import org.apache.flink.table.api.TableSchema;
> import org.apache.flink.table.api.java.StreamTableEnvironment;
> import org.apache.flink.table.sinks.CsvTableSink;
> import org.apache.flink.table.sinks.TableSink;
> import org.apache.flink.table.sources.DefinedRowtimeAttributes;
> import org.apache.flink.table.sources.RowtimeAttributeDescriptor;
> import org.apache.flink.table.sources.StreamTableSource;
> import org.apache.flink.table.sources.tsextractors.ExistingField;
> import org.apache.flink.table.sources.wmstrategies.AscendingTimestamps;
> import org.apache.flink.types.Row;
> import org.junit.Test;
>
> import javax.annotation.Nullable;
> import java.sql.Timestamp;
> import java.util.ArrayList;
> import java.util.Collections;
> import java.util.List;
>
> public class TestSql {
> @Test
> public void testAccountingSql() {
> StreamExecutionEnvironment env =
> StreamExecutionEnvironment.getExecutionEnvironment();
> env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
> env.setParallelism(1);
>
> try {
> StreamTableEnvironment tableEnv =
> StreamTableEnvironment.create(env);
>
> SimpleTableSource source = new SimpleTableSource();
> Table t = tableEnv.fromTableSource(source);
>
> String interval = "5"; //5 second
> System.out.println("source schema is " +
> source.getTableSchema());
>
> Table sqlResult = tableEnv.sqlQuery("SELECT " +
> " TUMBLE_START(UserActionTime, INTERVAL '" + interval
> + "' SECOND) as rowTime, " +
> " Username," +
> " SUM(Data) as Data " +
> " FROM  " + t +
> " GROUP BY TUMBLE(UserActionTime, INTERVAL '" +
> interval + "' SECOND),Username");
>
>
> String[] fieldNames = {
> "rowTime",
> "Username", "Data"};
> TypeInformation[] fieldTypes = {
> TypeInformation.of(Timestamp.class),
> TypeInformation.of(String.class),
> TypeInformation.of(Long.class)};
>
> TableSink sink1 = new CsvTableSink("/tmp/data.log", ",");
> sink1 = sink1.configure(fieldNames, fieldTypes);
> tableEnv.registerTableSink("EsSinkTable", sink1);
> System.out.println("sql result schema is " +
> sqlResult.getSchema());
>
> tableEnv.sqlUpdate("insert into EsSinkTable select  " +
> "rowTime,Username,Data from " + sqlResult + "");
>
> env.execute("test");
> } catch (Exception e) {
> e.printStackTrace();
> System.err.println("start program error. FlowMatrix
> --zookeeper  --config " +
> " --name  --interval 
> --indexName ");
> System.err.println(e.toString());
> return;
> }
> }
>
> public static class SimpleTableSource implements
> StreamTableSource, DefinedRowtimeAttributes {
> @Override
> public DataStream getDataStream(StreamExecutionEnvironment
> env) {
> return
> env.fromCollection(genertateData()).assignTimestampsAndWatermarks(new
> AssignerWithPunctuatedWatermarks() {
> private long lastWaterMarkMillSecond = -1;
> private long waterMarkPeriodMillSecond = 1000;
> @Nullable
> @Override
> public Watermark checkAndGetNextWatermark(Row lastElement,
> long extractedTimestamp) {
> if(extractedTimestamp - lastWaterMarkMillSecond >=
> waterMarkPeriodMillSecond){
> lastWaterMarkMillSecond = 

Re: Join a datastream with tables stored in Hive

2019-12-13 Thread Kurt Young
Sorry I forgot to paste the reference url.

Best,
Kurt

[1]
https://ci.apache.org/projects/flink/flink-docs-master/dev/table/streaming/joins.html#join-with-a-temporal-table-function
[2]
https://ci.apache.org/projects/flink/flink-docs-master/dev/table/streaming/joins.html#join-with-a-temporal-table

On Fri, Dec 13, 2019 at 4:37 PM Kurt Young  wrote:

> Hi Krzysztof,
>
> What you raised also interested us a lot to achieve in Flink.
> Unfortunately, there
> is no in place solution in Table/SQL API yet, but you have 2 options which
> are both
> close to this thus need some modifications.
>
> 1. The first one is use temporal table function [1]. It needs you to write
> the logic of
> reading hive tables and do the daily update inside the table function.
> 2. The second choice is to use temporal table join [2], which only works
> with processing
> time now (just like the simple solution you mentioned), and need the table
> source has
> look up capability (like hbase). Currently, hive connector doesn't support
> look up, so to
> make this work, you need to sync the content to other storages which
> support look up,
> like HBase.
>
> Both solutions are not ideal now, and we also aims to improve this maybe
> in the following
> release.
>
> Best,
> Kurt
>
>
> On Fri, Dec 13, 2019 at 1:44 AM Krzysztof Zarzycki 
> wrote:
>
>> Hello dear Flinkers,
>> If this kind of question was asked on the groups, I'm sorry for a
>> duplicate. Feel free to just point me to the thread.
>> I have to solve a probably pretty common case of joining a datastream to
>> a dataset.
>> Let's say I have the following setup:
>> * I have a high pace stream of events coming in Kafka.
>> * I have some dimension tables stored in Hive. These tables are changed
>> daily. I can keep a snapshot for each day.
>>
>> Now conceptually, I would like to join the stream of incoming events to
>> the dimension tables (simple hash join). we can consider two cases:
>> 1) simpler, where I join the stream with the most recent version of the
>> dictionaries. (So the result is accepted to be nondeterministic if the job
>> is retried).
>> 2) more advanced, where I would like to do temporal join of the stream
>> with dictionaries snapshots that were valid at the time of the event. (This
>> result should be deterministic).
>>
>> The end goal is to do aggregation of that joined stream, store results in
>> Hive or more real-time analytical store (Druid).
>>
>> Now, could you please help me understand is any of these cases
>> implementable with declarative Table/SQL API? With use of temporal joins,
>> catalogs, Hive integration, JDBC connectors, or whatever beta features
>> there are now. (I've read quite a lot of Flink docs about each of those,
>> but I have a problem to compile this information in the final design.)
>> Could you please help me understand how these components should
>> cooperate?
>> If that is impossible with Table API, can we come up with the easiest
>> implementation using Datastream API ?
>>
>> Thanks a lot for any help!
>> Krzysztof
>>
>


Re: Join a datastream with tables stored in Hive

2019-12-13 Thread Kurt Young
Hi Krzysztof,

What you raised also interested us a lot to achieve in Flink.
Unfortunately, there
is no in place solution in Table/SQL API yet, but you have 2 options which
are both
close to this thus need some modifications.

1. The first one is use temporal table function [1]. It needs you to write
the logic of
reading hive tables and do the daily update inside the table function.
2. The second choice is to use temporal table join [2], which only works
with processing
time now (just like the simple solution you mentioned), and need the table
source has
look up capability (like hbase). Currently, hive connector doesn't support
look up, so to
make this work, you need to sync the content to other storages which
support look up,
like HBase.

Both solutions are not ideal now, and we also aims to improve this maybe in
the following
release.

Best,
Kurt


On Fri, Dec 13, 2019 at 1:44 AM Krzysztof Zarzycki 
wrote:

> Hello dear Flinkers,
> If this kind of question was asked on the groups, I'm sorry for a
> duplicate. Feel free to just point me to the thread.
> I have to solve a probably pretty common case of joining a datastream to a
> dataset.
> Let's say I have the following setup:
> * I have a high pace stream of events coming in Kafka.
> * I have some dimension tables stored in Hive. These tables are changed
> daily. I can keep a snapshot for each day.
>
> Now conceptually, I would like to join the stream of incoming events to
> the dimension tables (simple hash join). we can consider two cases:
> 1) simpler, where I join the stream with the most recent version of the
> dictionaries. (So the result is accepted to be nondeterministic if the job
> is retried).
> 2) more advanced, where I would like to do temporal join of the stream
> with dictionaries snapshots that were valid at the time of the event. (This
> result should be deterministic).
>
> The end goal is to do aggregation of that joined stream, store results in
> Hive or more real-time analytical store (Druid).
>
> Now, could you please help me understand is any of these cases
> implementable with declarative Table/SQL API? With use of temporal joins,
> catalogs, Hive integration, JDBC connectors, or whatever beta features
> there are now. (I've read quite a lot of Flink docs about each of those,
> but I have a problem to compile this information in the final design.)
> Could you please help me understand how these components should cooperate?
> If that is impossible with Table API, can we come up with the easiest
> implementation using Datastream API ?
>
> Thanks a lot for any help!
> Krzysztof
>


Re: How to understand create watermark for Kafka partitions

2019-12-13 Thread vino yang
Hi Alex,

>> But why do the docs also say that watermarks are created for each Kafka topic partition?

IMO, the official documentation has explained the reason. Just copied here:

When using Apache Kafka as a data source, each Kafka partition may have a
simple event time pattern (ascending timestamps or bounded out-of-orderness).
However, when consuming streams from Kafka, multiple partitions often get
consumed in parallel, interleaving the events from the partitions and
destroying the per-partition patterns (this is inherent in how Kafka's
consumer clients work).

In that case, you can use Flink’s Kafka-partition-aware watermark
generation. Using that feature, watermarks are generated inside the Kafka
consumer, per Kafka partition, and the per-partition watermarks are merged
in the same way as watermarks are merged on stream shuffles.


>> As I tested, watermarks also seem to be created globally, even when I run my
job in parallel and assign watermarks on the Kafka consumer.


Did you follow the official example? Can you share your program?
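
For reference, the per-partition setup the docs describe looks roughly like
this (a minimal Scala sketch; topic, properties and the timestamp extraction
are placeholders): the assigner is attached to the FlinkKafkaConsumer itself,
so watermarks are generated inside the consumer per Kafka partition and then
merged, instead of being generated after the parallel source.

import java.util.Properties
import org.apache.flink.api.common.serialization.SimpleStringSchema
import org.apache.flink.streaming.api.scala._
import org.apache.flink.streaming.api.functions.timestamps.BoundedOutOfOrdernessTimestampExtractor
import org.apache.flink.streaming.api.windowing.time.Time
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer

val props = new Properties()
props.setProperty("bootstrap.servers", "localhost:9092")
props.setProperty("group.id", "example-group")

val consumer = new FlinkKafkaConsumer[String]("example-topic", new SimpleStringSchema(), props)
// per-partition watermarks: the assigner is set on the consumer, not on the stream
consumer.assignTimestampsAndWatermarks(
  new BoundedOutOfOrdernessTimestampExtractor[String](Time.seconds(5)) {
    // placeholder: parse the event time out of the record payload
    override def extractTimestamp(element: String): Long = element.split(",")(0).toLong
  })

val env = StreamExecutionEnvironment.getExecutionEnvironment
val stream: DataStream[String] = env.addSource(consumer)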


Best,

Vino

qq <471237...@qq.com> wrote on Fri, Dec 13, 2019 at 9:57 AM:

> Hi all,
>
> I am confused about watermarks for Kafka partitions. As I understand it,
> watermarks are created at the data stream level, so why do the docs also say
> that watermarks are created for each Kafka topic partition? As I tested,
> watermarks also seem to be created globally, even when I run my job in
> parallel and assign watermarks on the Kafka consumer. Thanks.
>
> Below text copied from flink web.
>
>
> you can use Flink’s Kafka-partition-aware watermark generation. Using that
> feature, watermarks are generated inside the Kafka consumer, per Kafka
> partition, and the per-partition watermarks are merged in the same way as
> watermarks are merged on stream shuffles.
>
> For example, if event timestamps are strictly ascending per Kafka
> partition, generating per-partition watermarks with the ascending
> timestamps watermark generator will result in perfect overall watermarks.
>
> The illustrations below show how to use the per-Kafka-partition watermark
> generation, and how watermarks propagate through the streaming dataflow in
> that case.
>
>
>
> Thanks
> Alex Fu
>