Re: Flink on K8s job submission best practices

2017-12-22 Thread Martin Eden
The above applies to Mesos/DCOS as well, so if someone could also share
insights into automatic job deployment in that setup, it would be very useful.
Thanks,
M

On Fri, Dec 22, 2017 at 6:56 PM, Maximilian Bode <
maximilian.b...@tngtech.com> wrote:

> Hi everyone,
>
> We are beginning to run Flink on K8s and found the basic templates [1] as
> well as the example Helm chart [2] very helpful. Also the discussion about
> JobManager HA [3] and Patrick's talk [4] was very interesting. All in all
> it is delightful how easy everything can be set up and works out of the box.
>
> Now we are looking for some best practices as far as job submission is
> concerned. Having played with a few alternative options, we would like to
> get some input on what other people are using. What we have looked into so
> far:
>
>1. Packaging the job jar into e.g. the JM image and submitting
>manually (either from the UI or via `kubectl exec`). Ideally, we would like
>to establish a more automated setup, preferably using native Kubernetes
>objects.
>2. Building a separate image whose responsibility it is to submit the
>job and keep it running. This could either use the API [5] or share the
>Flink config so that CLI calls connect to the existing cluster. When
>scheduling this as a Kubernetes deployment [6] and e.g. the node running
>this client pod fails, one ends up with duplicate jobs. One could build
>custom logic (poll if job exists, only submit if it does not), but this
>seems fragile and it is conceivable that this could lead to weird timing
>issues like different containers trying to submit at the same time. One
>solution would be to implement an atomic submit-if-not-exists, but I
>suppose this would need to involve some level of locking on the JM.
>3. Schedule the client container from the step above as a Kubernetes
>job [7]. This seems somewhat unidiomatic for streaming jobs that are not
>expected to terminate, but one would not have to deal with duplicate Flink
>jobs. In the failure scenario described above, the (Flink) job would still
>be running on the Flink cluster, there just would not be a client attached
>to it (as the Kubernetes job would not be restarted). On the other hand,
>should the (Flink) job fail for some reason, there is no fashion of
>restarting it automatically.
>
> Are we missing something obvious? Has the Flink community come up with a
> default way of submitting Flink jobs on Kubernetes yet or are there people
> willing to share their experiences?
>
> Best regards and happy holidays,
> Max
>
> [1] https://ci.apache.org/projects/flink/flink-docs-release-1.4/ops/deployment/kubernetes.html
> [2] https://github.com/docker-flink/examples/tree/master/helm/flink
> [3] http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/Flink-HA-with-Kubernetes-without-Zookeeper-td15033.html
> [4] https://www.youtube.com/watch?v=w721NI-mtAA Slides: https://www.slideshare.net/FlinkForward/flink-forward-berlin-2017-patrick-lucas-flink-in-containerland
> [5] https://ci.apache.org/projects/flink/flink-docs-master/monitoring/rest_api.html#submitting-programs
> [6] https://kubernetes.io/docs/concepts/workloads/controllers/deployment/
> [7] https://kubernetes.io/docs/concepts/workloads/controllers/jobs-run-to-completion/
> --
> Maximilian Bode * maximilian.b...@tngtech.com
> TNG Technology Consulting GmbH, Betastr. 13a, 85774 Unterföhring
> Geschäftsführer: Henrik Klagges, Dr. Robert Dahlke, Gerhard Müller
> Sitz: Unterföhring * Amtsgericht München * HRB 135082
>


Re: flink eventTime, lateness, maxoutoforderness

2017-12-22 Thread chen
Thanks Gordon, please see the reply. I used the code, but the result is not
what the documentation explains.




--
Sent from: http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/


Re: flink eventTime, lateness, maxoutoforderness

2017-12-22 Thread chen

Code showing the maxOutOfOrderness effect:

dataStream.keyBy(row -> (String) row.getField(0))
    .window(TumblingEventTimeWindows.of(Time.seconds(5)))
    .fold(initRow(), new FoldFunction<Row, Row>() {
        @Override
        public Row fold(Row ret, Row o) throws Exception {
            ret.setField(0, (int) ret.getField(0) + 1);
            ret.setField(1, (String) ret.getField(1) + o.getField(1) + "|");
            return ret;
        }
    });

public Watermark getCurrentWatermark() {
    return new Watermark(currentTime - 5000);
}
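
For reference, here is a minimal sketch of what a complete periodic watermark
assigner around the getCurrentWatermark() fragment above could look like. The
class name and the assumption that field 1 of the Row carries the event-time
timestamp (in milliseconds) are illustrative only, not taken from the actual job:

import org.apache.flink.streaming.api.functions.AssignerWithPeriodicWatermarks;
import org.apache.flink.streaming.api.watermark.Watermark;
import org.apache.flink.types.Row;

// Hypothetical assigner: tracks the highest timestamp seen so far and emits
// watermarks that trail it by maxOutOfOrderness (5 seconds, as in the snippet above).
public class BoundedRowTimestampAssigner implements AssignerWithPeriodicWatermarks<Row> {

    private static final long MAX_OUT_OF_ORDERNESS = 5000L; // 5 seconds
    private long currentTime = Long.MIN_VALUE + MAX_OUT_OF_ORDERNESS;

    @Override
    public long extractTimestamp(Row element, long previousElementTimestamp) {
        // Assumption: field 1 of the Row holds the event timestamp in millis.
        long ts = Long.parseLong(element.getField(1).toString());
        currentTime = Math.max(currentTime, ts);
        return ts;
    }

    @Override
    public Watermark getCurrentWatermark() {
        // The watermark lags the highest seen timestamp by 5 seconds, so a
        // 5-second window only fires once an element at least 5 seconds past
        // the window end has been seen.
        return new Watermark(currentTime - MAX_OUT_OF_ORDERNESS);
    }
}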

1. Send data *WITHOUT* Thread.sleep(); the result is like this:
1,1483250636000|
4,148325064|1483250642000|1483250641000|1483250643000|
4,1483250649000|1483250648000|1483250645000|1483250647000|
2,148325065|1483250653000|
1,1483250658000|
3,1483250661000|1483250662000|1483250663000|
1,1483250667000|

2. Send data WITH Thread.sleep(); the result is like this. Here we can see the
effect of maxOutOfOrderness: the calculation is delayed, and then the result
comes out.
1,1483250636000|
2,148325064|1483250642000|
3,1483250649000|1483250648000|1483250645000|
2,148325065|1483250653000|
1,1483250658000|
3,1483250661000|1483250662000|1483250663000|
1,1483250667000|

I don't know how to explain the behavior of eventTime, lateness, and maxOutOfOrderness.



--
Sent from: http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/


Re: flink eventTime, lateness, maxoutoforderness

2017-12-22 Thread chen
Hi Eron,
Thanks for your help. Actually I know that maxOutOfOrderness and lateness are based
on event time, but in my test it does not behave that way. Following are my code and test data.
 "key1|148325064|",
 "key1|1483250636000|",
 "key1|1483250649000|",
 "key1|1483250642000|",
 "key1|148325065|",
 "key1|1483250641000|",
 "key1|1483250653000|",
 "key1|1483250648000|",
 "key1|1483250645000|",
 "key1|1483250658000|",
 "key1|1483250647000|",
 "key1|1483250643000|",
 "key1|1483250661000|",
 "key1|1483250662000|",
 "key1|1483250667000|",
 "key1|1483250663000|",

 dataStream.keyBy(row -> (String) row.getField(0))
    .window(TumblingEventTimeWindows.of(Time.seconds(5)))
    .allowedLateness(Time.seconds(5))
    .fold(initRow(), new FoldFunction<Row, Row>() {
        @Override
        public Row fold(Row ret, Row o) throws Exception {
            ret.setField(0, (int) ret.getField(0) + 1);
            ret.setField(1, (String) ret.getField(1) + o.getField(1) + "|");
            return ret;
        }
    });

1. Send data *WITHOUT* Thread.sleep(); the result is like this:
 1,1483250636000|
 4,148325064|1483250642000|1483250641000|1483250643000|
 4,1483250649000|1483250648000|1483250645000|1483250647000|
 2,148325065|1483250653000|
 1,1483250658000|
 3,1483250661000|1483250662000|1483250663000|
 1,1483250667000|
2. Send data WITH Thread.sleep(); the result is like this. Here we can see the
effect of allowedLateness: a late element triggers the window to fire again,
and the result comes out again.
  1,1483250636000|
  1,148325064|
  2,148325064|1483250642000|
  1,1483250649000|
  2,1483250649000|1483250648000|
  3,1483250649000|1483250648000|1483250645000|
  2,148325065|1483250653000|
  1,1483250658000|
  2,1483250661000|1483250662000|
  3,1483250661000|1483250662000|1483250663000|
  1,1483250667000|




--
Sent from: http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/


Re: CSV writer/parser inconsistency when using the Table API?

2017-12-22 Thread Fabian Hueske
Hi Cliff,

you are right.
The CsvTableSink and the CsvInputFormat are not in sync. It would be great
if you could open a JIRA ticket for this issue.
As a workaround, you could implement your own CsvTableSink to add a
delimiter after the last field.
The code is straightforward, less than 150 lines of simple Scala code [1] (see
the CsvFormatter at the end of the file).
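
For illustration only (this is not the actual CsvTableSink code, and the class
name is made up), the formatting change boils down to appending the delimiter
after every field, including the last one, so that a missing trailing value
still produces an empty but parseable column:

import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.types.Row;

// Sketch of a row formatter that always emits a trailing delimiter.
public class TrailingDelimiterCsvFormatter implements MapFunction<Row, String> {

    private final String fieldDelim;

    public TrailingDelimiterCsvFormatter(String fieldDelim) {
        this.fieldDelim = fieldDelim;
    }

    @Override
    public String map(Row row) {
        StringBuilder builder = new StringBuilder();
        for (int i = 0; i < row.getArity(); i++) {
            Object field = row.getField(i);
            if (field != null) {
                builder.append(field);
            }
            builder.append(fieldDelim); // delimiter after every field, incl. the last
        }
        return builder.toString();
    }
}

Until the two formats are aligned, something like
dataSet.map(new TrailingDelimiterCsvFormatter("|")).writeAsText(path) could
stand in for the CsvTableSink.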

Best, Fabian

[1]
https://github.com/apache/flink/blob/master/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/sinks/CsvTableSink.scala

2017-12-22 21:34 GMT+01:00 Cliff Resnick:

> I've been trying out the Table API for some ETL using a two-stage job of
> CsvTableSink (DataSet) -> CsvInputFormat (Stream). I ran into an issue
> where the first stage produces output with trailing null values (valid),
> which causes a parse error in the second stage.
>
> Looking at RowCsvInputFormatTest.java, I noticed that it expects input
> lines with a trailing delimiter, e.g. "a|b|c|". Meanwhile, the CsvTableSink
> creates rows in the form of "a|b|c". As long as 'c' is present, this input
> does get successfully parsed by the RowCsvInputFormat. However, if 'c' is
> defined as a number and missing, e.g. the row is "a|b|", the Number parser
> will fail on the empty string.
>
> Is there something I am missing, or is there, in fact, an inconsistency
> between the TableSink and the InputFormat?
>


CSV writer/parser inconsistency when using the Table API?

2017-12-22 Thread Cliff Resnick
I've been trying out the Table API for some ETL using a two-stage job of
CsvTableSink (DataSet) -> CsvInputFormat (Stream). I ran into an issue
where the first stage produces output with trailing null values (valid),
which causes a parse error in the second stage.

Looking at RowCsvInputFormatTest.java, I noticed that it expects input
lines with a trailing delimiter, e.g. "a|b|c|". Meanwhile, the CsvTableSink
creates rows in the form of "a|b|c". As long as 'c' is present, this input
does get successfully parsed by the RowCsvInputFormat. However, if 'c' is
defined as a number and missing, e.g. the row is "a|b|", the Number parser
will fail on the empty string.

Is there something I am missing, or is there, in fact, an inconsistency
between the TableSink and the InputFormat?


Flink network access control documentation

2017-12-22 Thread Elias Levy
There is a need for better documentation on what connects to what over
which ports in a Flink cluster to allow users to configure network access
control rules.

I was under the impression that in a ZK HA configuration the Job Managers
were essentially independent and only coordinated via ZK.  But starting
multiple JMs in HA with the JM RPC port blocked between JMs shows that the
second JM's Akka subsystem is trying to connect to the leading JM:

INFO  akka.remote.transport.ProtocolStateActor - No response from remote for outbound association. Associate timed out after [2 ms].
WARN  akka.remote.ReliableDeliverySupervisor - Association with remote system [akka.tcp://flink@10.210.210.127:6123] has failed, address is now gated for [5000] ms. Reason: [Association failed with [akka.tcp://flink@10.210.210.127:6123]] Caused by: [No response from remote for outbound association. Associate timed out after [2 ms].]
WARN  akka.remote.transport.netty.NettyTransport - Remote connection to [null] failed with org.apache.flink.shaded.akka.org.jboss.netty.channel.ConnectTimeoutException: connection timed out: /10.210.210.127:6123
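
Until such documentation exists, a rough sketch of flink-conf.yaml entries that
pin the ports involved may help when writing access control rules. The key
names below are from the Flink configuration reference; the concrete port
numbers are only examples, and defaults differ between versions (several of
them default to random ports):

# JobManager RPC (Akka) port - used by TaskManagers and, as the logs above
# suggest, also between JobManagers in an HA setup
jobmanager.rpc.port: 6123
# Blob server port, used to distribute job artifacts to the TaskManagers
blob.server.port: 6124
# TaskManager data exchange (shuffle) port
taskmanager.data.port: 6121
# JobManager web UI / monitoring REST API
jobmanager.web.port: 8081

In HA mode the JobManager port can also be constrained to a range via
high-availability.jobmanager.port, and TaskManager RPC ports are picked
randomly unless pinned.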


Flink on K8s job submission best practices

2017-12-22 Thread Maximilian Bode
Hi everyone,

We are beginning to run Flink on K8s and found the basic templates [1]
as well as the example Helm chart [2] very helpful. Also the discussion
about JobManager HA [3] and Patrick's talk [4] was very interesting. All
in all it is delightful how easy everything can be set up and works out
of the box.

Now we are looking for some best practices as far as job submission is
concerned. Having played with a few alternative options, we would like
to get some input on what other people are using. What we have looked
into so far:

 1. Packaging the job jar into e.g. the JM image and submitting manually
(either from the UI or via `kubectl exec`). Ideally, we would like
to establish a more automated setup, preferably using native
Kubernetes objects.
 2. Building a separate image whose responsibility it is to submit the
job and keep it running. This could either use the API [5] or share
the Flink config so that CLI calls connect to the existing cluster.
When scheduling this as a Kubernetes deployment [6], if e.g. the
node running this client pod fails, one ends up with duplicate jobs.
One could build custom logic (poll if the job exists, only submit if it
does not), but this seems fragile, and it is conceivable that this
could lead to weird timing issues such as different containers trying
to submit at the same time. One solution would be to implement an
atomic submit-if-not-exists, but I suppose this would need to
involve some level of locking on the JM (a rough, non-atomic sketch
of such a check follows after this list).
 3. Schedule the client container from the step above as a Kubernetes
job [7]. This seems somewhat unidiomatic for streaming jobs that are
not expected to terminate, but one would not have to deal with
duplicate Flink jobs. In the failure scenario described above, the
(Flink) job would still be running on the Flink cluster, there just
would not be a client attached to it (as the Kubernetes job would
not be restarted). On the other hand, should the (Flink) job fail
for some reason, there is no fashion of restarting it automatically.
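
Regarding the submit-if-not-exists idea in option 2, a very rough sketch of the
client-side check against the monitoring REST API [5] is shown below. The
endpoint path is taken from the Flink 1.4 monitoring REST API docs, the job
name matching is naive, and the jar path and class name are placeholders; note
that this only narrows the race window, it does not make the submission atomic:

import java.io.BufferedReader;
import java.io.InputStreamReader;
import java.net.HttpURLConnection;
import java.net.URL;
import java.nio.charset.StandardCharsets;

// Hypothetical "poll, then submit" helper: ask the JobManager which jobs are
// running and only hand off to the CLI if the expected job is not among them.
public class SubmitIfNotRunning {

    public static void main(String[] args) throws Exception {
        String jobManagerUrl = args[0];   // e.g. "http://flink-jobmanager:8081"
        String expectedJobName = args[1]; // e.g. "my-streaming-job"

        URL url = new URL(jobManagerUrl + "/joboverview/running");
        HttpURLConnection conn = (HttpURLConnection) url.openConnection();
        conn.setRequestMethod("GET");

        StringBuilder body = new StringBuilder();
        try (BufferedReader reader = new BufferedReader(
                new InputStreamReader(conn.getInputStream(), StandardCharsets.UTF_8))) {
            String line;
            while ((line = reader.readLine()) != null) {
                body.append(line);
            }
        }

        // Naive check: a real implementation should parse the JSON and compare
        // the "name" fields instead of doing a substring match.
        if (body.toString().contains("\"" + expectedJobName + "\"")) {
            System.out.println("Job already running, nothing to do.");
            return;
        }

        // Hand off to the Flink CLI, which shares the cluster's flink-conf.yaml.
        new ProcessBuilder("flink", "run", "-d", "/opt/job/my-job.jar")
                .inheritIO()
                .start()
                .waitFor();
    }
}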

Are we missing something obvious? Has the Flink community come up with a
default way of submitting Flink jobs on Kubernetes yet or are there
people willing to share their experiences?

Best regards and happy holidays,
Max

[1]
https://ci.apache.org/projects/flink/flink-docs-release-1.4/ops/deployment/kubernetes.html
[2] https://github.com/docker-flink/examples/tree/master/helm/flink
[3]
http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/Flink-HA-with-Kubernetes-without-Zookeeper-td15033.html
[4] https://www.youtube.com/watch?v=w721NI-mtAA Slides:
https://www.slideshare.net/FlinkForward/flink-forward-berlin-2017-patrick-lucas-flink-in-containerland
[5]
https://ci.apache.org/projects/flink/flink-docs-master/monitoring/rest_api.html#submitting-programs
[6] https://kubernetes.io/docs/concepts/workloads/controllers/deployment/
[7]
https://kubernetes.io/docs/concepts/workloads/controllers/jobs-run-to-completion/
-- 
Maximilian Bode * maximilian.b...@tngtech.com
TNG Technology Consulting GmbH, Betastr. 13a, 85774 Unterföhring
Geschäftsführer: Henrik Klagges, Dr. Robert Dahlke, Gerhard Müller
Sitz: Unterföhring * Amtsgericht München * HRB 135082


Re: periodic trigger

2017-12-22 Thread Plamen Paskov
I think it will not solve the problem: if I set
ContinuousEventTimeTrigger to 10 seconds and
allowedLateness(Time.seconds(60)) (as I don't want to discard events from
different users that arrive later), then I might receive more than one row
for a single user, depending on the number of windows created by that
user's events. That would make the average computations wrong.



On 22.12.2017 12:10, Piotr Nowojski wrote:

Ok, I think now I understand your problem.

Wouldn’t it be enough if you changed the last global window to something 
like this:


lastUserSession
 .timeWindowAll(*Time.seconds(10)*)
 .aggregate(new AverageSessionLengthAcrossAllUsers())
 .print();

(As a side note, maybe you should use ContinuousEventTimeTrigger in the 
first window.) This way it will aggregate and calculate the average 
session length of only the last “preview results” of the 60-second user 
windows (emitted every 10 seconds from the first aggregation).


Piotrek

On 21 Dec 2017, at 15:18, Plamen Paskov wrote:


Imagine a case where I want to run a computation every X seconds for 
a 1-day window. I want to calculate the average session length for the 
current day every X seconds. Is there an easy way to achieve that?



On 21.12.2017 16:06, Piotr Nowojski wrote:

Hi,

You defined a tumbling window 
(https://ci.apache.org/projects/flink/flink-docs-release-1.3/dev/windows.html#tumbling-windows) 
of 60 seconds, triggered every 10 seconds. This means that each 
input element can be processed/averaged up to 6 times (there is no 
other way if you trigger each window multiple times).


I am not sure what you are trying to achieve, but please refer to 
the documentation about the different window types (tumbling, sliding, 
session); maybe it will clarify things for you:

https://ci.apache.org/projects/flink/flink-docs-release-1.3/dev/windows.html#window-assigners

If you want to avoid duplicated processing, use either a tumbling 
window with the default trigger (triggering at the end of the window), 
or session windows.


Piotrek


On 21 Dec 2017, at 13:29, Plamen Paskov wrote:


Hi guys,
I have the following code:

SingleOutputStreamOperator<Event> lastUserSession = env
    .socketTextStream("localhost", 9000, "\n")
    .map(new MapFunction<String, Event>() {
        @Override
        public Event map(String value) throws Exception {
            String[] row = value.split(",");
            return new Event(Long.valueOf(row[0]), row[1],
                    Long.valueOf(row[2]), Timestamp.valueOf(row[3]).getTime());
        }
    })
    .assignTimestampsAndWatermarks(
        new BoundedOutOfOrdernessTimestampExtractor<Event>(Time.seconds(10)) {
            @Override
            public long extractTimestamp(Event element) {
                return element.timestamp;
            }
        })
    .keyBy("userId", "sessionId")
    .window(TumblingEventTimeWindows.of(Time.seconds(60)))
    .trigger(ContinuousProcessingTimeTrigger.of(Time.seconds(10)))
    .maxBy("length", false);

lastUserSession
    .timeWindowAll(Time.seconds(60))
    .aggregate(new AverageSessionLengthAcrossAllUsers())
    .print();

What I'm trying to achieve is to calculate the average session length every 10 
seconds. The problem is that since the window length is 60 seconds and a 
computation is triggered every 10 seconds, I will receive duplicate events in my 
average calculation method, so the average will not be correct. If I move 
ContinuousProcessingTimeTrigger down before
AverageSessionLengthAcrossAllUsers() then it is not triggering every 10 seconds.
Any other suggestions on how to work around this?

Thanks










Re: entrypoint for executing job in task manager

2017-12-22 Thread Piotr Nowojski
I don’t think there is such a hook in the Flink code right now. You will have to work 
around this issue somehow in user space. 

Maybe you could make a contract that every operator, before touching Guice, 
should call a static synchronized method `initializeGuiceContext`. This method 
could search the classpath for classes with some specific annotation, for 
example `@MyInitializationHook`, and install/add all such hooks before 
actually using Guice.
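
For illustration, a minimal sketch of such a static context. Instead of an
annotation scan it discovers modules via java.util.ServiceLoader (the class
name, the method shape, and the ServiceLoader convention are assumptions, not
an existing Flink or Guice facility):

import com.google.inject.Guice;
import com.google.inject.Injector;
import com.google.inject.Module;

import java.util.ArrayList;
import java.util.List;
import java.util.ServiceLoader;

// Hypothetical static holder: every operator calls initializeGuiceContext()
// (e.g. from its open() method) before touching Guice for the first time.
public final class GuiceContext {

    private static Injector injector;

    private GuiceContext() {}

    public static synchronized Injector initializeGuiceContext() {
        if (injector == null) {
            List<Module> modules = new ArrayList<>();
            // Each jar on the classpath can contribute bindings by registering
            // a provider in META-INF/services/com.google.inject.Module.
            for (Module module : ServiceLoader.load(Module.class)) {
                modules.add(module);
            }
            injector = Guice.createInjector(modules);
        }
        return injector;
    }
}

This only partially addresses the "which operator opens first" problem: it
still relies on every operator, including platform-provided ones, calling the
initializer before use.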

Piotrek

> On 21 Dec 2017, at 17:49, Steven Wu  wrote:
> 
> We use Guice for dependency injection. We need to install additional Guice 
> modules (for bindings) when setting up this static context of the Guice injector.
> 
> Calling the static initializer from the operator open method won't really help. 
> Not all operators are implemented by the app developers who want to install 
> additional Guice modules. E.g. the Kafka source operator is implemented/provided 
> by our platform. I think the source operator will open first, which means the app 
> operator won't get a chance to initialize the static context. What would 
> really help is an entry hook (at the task manager) that is executed 
> before any operator opens.
> 
> On Thu, Dec 21, 2017 at 12:27 AM, Piotr Nowojski wrote:
> The open method is called just before any elements are processed. You can hook in 
> any initialisation logic there, including initialisation of a static context. 
> However, keep in mind that since this context is static, it will be shared 
> between multiple operators (if you are running parallelism > number of task 
> managers), so accesses to it must be synchronized (including initialisation). 
> Another thing to consider is that managing the life cycle of a static context 
> can be tricky (when to close it and release its resources).
> 
> The questions is, whether you really need a static context?
> 
> Thanks,
> Piotrek
> 
> 
> > On 21 Dec 2017, at 07:53, Steven Wu wrote:
> >
> > Here is my understanding of how job submission works in Flink. When 
> > submitting a job to the job manager via the REST API, we provide an entry class. 
> > The job manager then evaluates the job graph and ships serialized operators to 
> > the task managers. The task managers then open the operators and run the tasks.
> >
> > My app typically requires an initialization phase to set up my own 
> > running context in the task manager (e.g. calling a static method of some 
> > class). Does Flink provide any entry hook in the task manager when executing a 
> > job (and tasks)? As for the job manager, the entry class provides such a hook 
> > where I can initialize my static context.
> >
> > Thanks,
> > Steven
> 
> 



Re: periodic trigger

2017-12-22 Thread Piotr Nowojski
Ok, I think now I understand your problem. 

Wouldn’t it be enough if you changed the last global window to something like this:

lastUserSession
.timeWindowAll(Time.seconds(10))
.aggregate(new AverageSessionLengthAcrossAllUsers())
.print();

(As a side note, maybe you should use ContinuousEventTimeTrigger in the first 
window.) This way it will aggregate and calculate the average session length of 
only the last “preview results” of the 60-second user windows (emitted every 10 
seconds from the first aggregation).

Piotrek

> On 21 Dec 2017, at 15:18, Plamen Paskov  wrote:
> 
> Imagine a case where I want to run a computation every X seconds for a 1-day 
> window. I want to calculate the average session length for the current day every X 
> seconds. Is there an easy way to achieve that?
> 
> On 21.12.2017 16:06, Piotr Nowojski wrote:
>> Hi,
>> 
>> You defined a tumbling window 
>> (https://ci.apache.org/projects/flink/flink-docs-release-1.3/dev/windows.html#tumbling-windows)
>> of 60 seconds, triggered every 10 seconds. This means that each input 
>> element can be processed/averaged up to 6 times (there is no other way if 
>> you trigger each window multiple times).
>> 
>> I am not sure what you are trying to achieve, but please refer to the 
>> documentation about the different window types (tumbling, sliding, session); 
>> maybe it will clarify things for you:
>> https://ci.apache.org/projects/flink/flink-docs-release-1.3/dev/windows.html#window-assigners
>> 
>> 
>> If you want to avoid duplicated processing, use either a tumbling window with 
>> the default trigger (triggering at the end of the window), or session 
>> windows.
>> 
>> Piotrek
>> 
>> 
>>> On 21 Dec 2017, at 13:29, Plamen Paskov wrote:
>>> 
>>> Hi guys,
>>> I have the following code:
>>> 
>>> SingleOutputStreamOperator<Event> lastUserSession = env
>>>     .socketTextStream("localhost", 9000, "\n")
>>>     .map(new MapFunction<String, Event>() {
>>>         @Override
>>>         public Event map(String value) throws Exception {
>>>             String[] row = value.split(",");
>>>             return new Event(Long.valueOf(row[0]), row[1], 
>>>                     Long.valueOf(row[2]), Timestamp.valueOf(row[3]).getTime());
>>>         }
>>>     })
>>>     .assignTimestampsAndWatermarks(
>>>         new BoundedOutOfOrdernessTimestampExtractor<Event>(Time.seconds(10)) {
>>>             @Override
>>>             public long extractTimestamp(Event element) {
>>>                 return element.timestamp;
>>>             }
>>>         })
>>>     .keyBy("userId", "sessionId")
>>>     .window(TumblingEventTimeWindows.of(Time.seconds(60)))
>>>     .trigger(ContinuousProcessingTimeTrigger.of(Time.seconds(10)))
>>>     .maxBy("length", false);
>>> 
>>> lastUserSession
>>>     .timeWindowAll(Time.seconds(60))
>>>     .aggregate(new AverageSessionLengthAcrossAllUsers())
>>>     .print();
>>> 
>>> What I'm trying to achieve is to calculate the average session length every 
>>> 10 seconds. The problem is that since the window length is 60 seconds and a 
>>> computation is triggered every 10 seconds, I will receive duplicate events 
>>> in my average calculation method, so the average will not be correct. If I 
>>> move ContinuousProcessingTimeTrigger down before 
>>> AverageSessionLengthAcrossAllUsers() then it is not triggering every 10 
>>> seconds.
>>> Any other suggestions on how to work around this?
>>> 
>>> Thanks
>> 
>