Re: MODERATE for d...@flink.apache.org

2017-11-06 Thread Jordan Kuan
Dear Flink Dev Team

I have encountered a problem and cannot find any solution on Google.
I have also created a question on Stack Overflow, but received no response:
https://stackoverflow.com/questions/47123371/flink-windows-ha

I would really appreciate it if you could give some suggestions to me.

Thanks,

Jordan

On Tue, Nov 7, 2017 at 1:53 AM, Robert Metzger  wrote:

> Hi,
> I would suggest sending your question to the user@flink.apache.org list
> (make sure to subscribe first)
>


-- 
Best Regards,
Jordan Kuan


[Attachment: flink-conf.yaml]


Re: DataStream to Table Api idioms

2017-11-06 Thread Seth Wiesman
Not a problem, thanks for the quick feedback.

https://issues.apache.org/jira/browse/FLINK-7999

Seth Wiesman

From: Fabian Hueske 
Date: Monday, November 6, 2017 at 9:14 AM
To: Seth Wiesman 
Cc: user 
Subject: Re: DataStream to Table Api idioms

Hi Seth,

I think the Table API is not there yet to address your use case.

1. Allowed lateness cannot be configured but it is on the list of features that 
we plan to add in the future.
2. Custom triggers are not supported. We are planning to add an option to 
support your use case (early firing and updates).
3. The window joins that will be released with 1.4 require constant boundaries 
(left.time > right.time - X and left.time < right.time + Y).
Variable join window boundaries have not been considered yet and would be quite 
tricky to implement. Would you mind opening a JIRA issue for this feature?
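A constant-boundary window join of the kind described in point 3 can be sketched in Flink SQL; this is only an illustration, and the table and column names (l, r, id, rowtime) are hypothetical:

```sql
-- Hypothetical tables l and r with event-time attributes l.rowtime and
-- r.rowtime. The join boundaries are constant intervals (X = 10 minutes,
-- Y = 5 minutes), not values derived from row attributes such as
-- campaign start/end dates.
SELECT l.id, l.payload, r.payload
FROM l JOIN r ON l.id = r.id
WHERE l.rowtime > r.rowtime - INTERVAL '10' MINUTE
  AND l.rowtime < r.rowtime + INTERVAL '5' MINUTE
```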

Best, Fabian

2017-11-06 14:56 GMT+01:00 Seth Wiesman:
Hi,

I am experimenting with rewriting some of my datastream projects with the table 
api and I had some questions on how to express certain idioms. I am using 
1.4-SNAPSHOT.


1)   Can I express allowed lateness?

2)   Can I use a custom trigger? More specifically, I have a 24hr window 
but would like to receive partial results say every hour.

3)   Do window join time intervals have to be constant or can they depend 
on row attributes. I am running campaigns that have start and end dates and so 
I would like my join window to be that interval.

Thank you,

Seth Wiesman




Re: Savepoints and migrating value state data types

2017-11-06 Thread Aljoscha Krettek
Actually, Flink 1.4 will come with improved Avro support. See especially:

 - https://issues.apache.org/jira/browse/FLINK-7420: Move all Avro code to flink-avro
 - https://issues.apache.org/jira/browse/FLINK-7997: Avro should be always in the user code
 - https://issues.apache.org/jira/browse/FLINK-6022: Improve support for Avro GenericRecord

This makes AvroTypeInfo and AvroSerializer quite usable.

> On 6. Nov 2017, at 15:13, mrooding  wrote:
> 
> Hi Gordon
> 
> I've been looking into creating a custom AvroSerializer without Kryo which
> would support Avro schemas and I'm starting to wonder if this is actually
> the most straightforward way to do it. 
> 
> If I extend a class from TypeSerializer I would also need to implement a
> TypeInformation class to be able to provide my serializer. Implementing all
> these classes seems to be quite the ordeal without proper documentation. Are
> you sure that this is the right way forward and that there's no other option
> of using Avro serialization with schema support for Flink?
> 
> Thanks again
> 
> Marc
> 
> 
> 
> 
> 
> --
> Sent from: 
> http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/



Re: DataStream to Table Api idioms

2017-11-06 Thread Fabian Hueske
Hi Seth,

I think the Table API is not there yet to address your use case.

1. Allowed lateness cannot be configured but it is on the list of features
that we plan to add in the future.
2. Custom triggers are not supported. We are planning to add an option to
support your use case (early firing and updates).
3. The window joins that will be released with 1.4 require constant
boundaries (left.time > right.time - X and left.time < right.time + Y).
Variable join window boundaries have not been considered yet and would be
quite tricky to implement. Would you mind opening a JIRA issue for this
feature?

Best, Fabian

2017-11-06 14:56 GMT+01:00 Seth Wiesman :

> Hi,
>
>
>
> I am experimenting with rewriting some of my datastream projects with the
> table api and I had some questions on how to express certain idioms. I am
> using 1.4-SNAPSHOT.
>
>
>
> 1)   Can I express allowed lateness?
>
> 2)   Can I use a custom trigger? More specifically, I have a 24hr
> window but would like to receive partial results say every hour.
>
> 3)   Do window join time intervals have to be constant or can they
> depend on row attributes. I am running campaigns that have start and end
> dates and so I would like my join window to be that interval.
>
>
>
> Thank you,
>
>
>
> Seth Wiesman
>
>
>


Re: Savepoints and migrating value state data types

2017-11-06 Thread mrooding
Hi Gordon

I've been looking into creating a custom AvroSerializer without Kryo which
would support Avro schemas and I'm starting to wonder if this is actually
the most straightforward way to do it. 

If I extend a class from TypeSerializer I would also need to implement a
TypeInformation class to be able to provide my serializer. Implementing all
these classes seems to be quite the ordeal without proper documentation. Are
you sure that this is the right way forward and that there's no other option
of using Avro serialization with schema support for Flink?

Thanks again

Marc





--
Sent from: http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/


DataStream to Table Api idioms

2017-11-06 Thread Seth Wiesman
Hi,

I am experimenting with rewriting some of my datastream projects with the table 
api and I had some questions on how to express certain idioms. I am using 
1.4-SNAPSHOT.


1)   Can I express allowed lateness?

2)   Can I use a custom trigger? More specifically, I have a 24hr window 
but would like to receive partial results say every hour.

3)   Do window join time intervals have to be constant or can they depend 
on row attributes. I am running campaigns that have start and end dates and so 
I would like my join window to be that interval.

Thank you,

Seth Wiesman



Re: Docker-Flink Project: TaskManagers can't talk to JobManager if they are on different nodes

2017-11-06 Thread Till Rohrmann
I'm not entirely sure how docker swarm works but from the Flink perspective
there mustn't be two TaskManagers running on the same host (meaning an
entity where you share the same address) if you set the TaskManager data
port to a fixed value (otherwise only one of them can be started due to
port conflicts). If you can ensure that this is the case, then it should be
safe to specify a fixed port for data transmission.
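Assuming at most one TaskManager per Swarm node, pinning the data port as Till describes can be sketched in flink-conf.yaml; the port numbers below are only illustrative:

```yaml
# flink-conf.yaml (illustrative values): pin the TaskManager ports so they
# can be published in the Swarm service definition with -p. This only works
# if at most one TaskManager runs per host; a second TaskManager on the same
# host would fail to start due to a port conflict.
taskmanager.rpc.port: 6122
taskmanager.data.port: 6121
```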

Cheers,
Till

On Mon, Nov 6, 2017 at 2:37 PM, Vergilio, Thalita <
t.vergilio4...@student.leedsbeckett.ac.uk> wrote:

> Hi Till,
>
>
> Thanks a lot for your answer.
>
>
> Is the taskmanager.data.port unique per TaskManager? The documentation
> says it is assigned at runtime by the OS. My thinking here is that you
> would need to know what that is at service creation time, which would go
> against the whole idea of how services are scaled in Docker Swarm.
>
>
> When you create a Swarm service using 'docker stack deploy' or 'docker
> service create', the configuration that is used at that point is the same
> that will be used by all instances of the service. If you then scale
> TaskManager to 8 or 10 containers, each of them gets the same service
> configuration (the one used to create the service).
>
>
> I have in fact tried to map specific ports in the TaskManager service
> configuration, but then I got "port already in use" when I tried to scale
> up the service.
>
>
> I wonder if there is a way around it.
>
>
> Maybe the people who developed the create-docker-swarm-service.sh script
> in the docker-flink project would be able to shed some light?
>
>
> --
> *From:* Till Rohrmann 
> *Sent:* 06 November 2017 12:40:33
> *To:* Piotr Nowojski
> *Cc:* Vergilio, Thalita; user@flink.apache.org; Patrick Lucas
>
> *Subject:* Re: Docker-Flink Project: TaskManagers can't talk to
> JobManager if they are on different nodes
>
> Hi Thalita,
>
> in order to make Flink work, I think you have to expose the JobManager RPC
> port, the Blob server port and make sure that the TaskManager can talk to
> each other by exposing the `taskmanager.data.port`. The query server port
> is only necessary if you want to use queryable state.
>
> I've pulled in Patrick who has more experience with running Flink on top
> of Docker. He'll definitely be able to provide more detailed
> recommendations.
>
> Cheers,
> Till
>
> On Mon, Nov 6, 2017 at 9:22 AM, Piotr Nowojski wrote:
>
> Till, is there somewhere a list of ports that need to exposed that’s more
> up to date compared to the docker-flink README?
>
> Piotrek
>
> On 3 Nov 2017, at 10:23, Vergilio, Thalita wrote:
>
> Just an update: by changing the JOB_MANAGER_RPC_ADDRESS to the public IP
> of the JobManager and exposing port 6123 as {{PUBLIC_IP}}:6123:6123, I
> managed to get the TaskManagers from different nodes and even different
> subnets to talk to the JobManager.
>
> This is how I created the services:
>
> docker network create -d overlay overlay
>
> docker service create --name jobmanager --env JOB_MANAGER_RPC_ADDRESS=
> {{PUBLIC_IP}}  -p 8081:8081 -p{{PUBLIC_IP}}:6123:6123 -p 48081:48081 -p
> 6124:6124 -p 6125:6125 --network overlay --constraint 'node.hostname ==
> ubuntu-swarm-manager' flink jobmanager
>
> docker service create --name taskmanager --env JOB_MANAGER_RPC_ADDRESS=
> {{PUBLIC_IP}}  -p 6121:6121 -p 6122:6122  --network overlay --constraint
> 'node.hostname != ubuntu-swarm-manager' flink taskmanager
>
> However, I am still encountering errors further down the line. When I
> submit a job using the Web UI, it fails because the JobManager can't talk
> to the TaskManager on port 35033. I presume this is the
> taskmanager.data.port, which needs to be set to a range and this range
> exposed when I create the service?
>
> Are there any other ports that I need to open at service creation time?
>
> Connecting the channel failed: Connecting to remote task manager + 
> '/{{IP_ADDRESS_OF_MANAGER}}:35033' has failed. This might indicate that the 
> remote task manager has been lost.
>   at 
> org.apache.flink.runtime.io.network.netty.PartitionRequestClientFactory$ConnectingChannel.waitForChannel(PartitionRequestClientFactory.java:196)
>   at 
> org.apache.flink.runtime.io.network.netty.PartitionRequestClientFactory$ConnectingChannel.access$000(PartitionRequestClientFactory.java:131)
>   at 
> org.apache.flink.runtime.io.network.netty.PartitionRequestClientFactory.createPartitionRequestClient(PartitionRequestClientFactory.java:83)
>   at 
> org.apache.flink.runtime.io.network.netty.NettyConnectionManager.createPartitionRequestClient(NettyConnectionManager.java:59)
>   at 
> org.apache.flink.runtime.io.network.partition.consumer.RemoteInputChannel.requestSubpartition(RemoteInputChannel.java:112)
>   at 
> org.apache.flink.runtime.io.network.partition.consumer.SingleInputGate.requestPartitions(SingleInputGate.java:433)
>   at 
> 

Re: Docker-Flink Project: TaskManagers can't talk to JobManager if they are on different nodes

2017-11-06 Thread Vergilio, Thalita
Hi Till,


Thanks a lot for your answer.


Is the taskmanager.data.port unique per TaskManager? The documentation says it 
is assigned at runtime by the OS. My thinking here is that you would need to 
know what that is at service creation time, which would go against the whole 
idea of how services are scaled in Docker Swarm.


When you create a Swarm service using 'docker stack deploy' or 'docker service 
create', the configuration that is used at that point is the same that will be 
used by all instances of the service. If you then scale TaskManager to 8 or 10 
containers, each of them gets the same service configuration (the one used to 
create the service).


I have in fact tried to map specific ports in the TaskManager service 
configuration, but then I got "port already in use" when I tried to scale up 
the service.


I wonder if there is a way around it.


Maybe the people who developed the create-docker-swarm-service.sh script in the 
docker-flink project would be able to shed some light?




From: Till Rohrmann 
Sent: 06 November 2017 12:40:33
To: Piotr Nowojski
Cc: Vergilio, Thalita; user@flink.apache.org; Patrick Lucas
Subject: Re: Docker-Flink Project: TaskManagers can't talk to JobManager if 
they are on different nodes

Hi Thalita,

in order to make Flink work, I think you have to expose the JobManager RPC 
port, the Blob server port and make sure that the TaskManager can talk to each 
other by exposing the `taskmanager.data.port`. The query server port is only 
necessary if you want to use queryable state.

I've pulled in Patrick who has more experience with running Flink on top of 
Docker. He'll definitely be able to provide more detailed recommendations.

Cheers,
Till

On Mon, Nov 6, 2017 at 9:22 AM, Piotr Nowojski wrote:
Till, is there somewhere a list of ports that need to exposed that’s more up to 
date compared to the docker-flink README?

Piotrek

On 3 Nov 2017, at 10:23, Vergilio, Thalita wrote:

Just an update: by changing the JOB_MANAGER_RPC_ADDRESS to the public IP of the 
JobManager and exposing port 6123 as {{PUBLIC_IP}}:6123:6123, I managed to get 
the TaskManagers from different nodes and even different subnets to talk to the 
JobManager.

This is how I created the services:


docker network create -d overlay overlay

docker service create --name jobmanager --env 
JOB_MANAGER_RPC_ADDRESS={{PUBLIC_IP}}  -p 8081:8081 -p{{PUBLIC_IP}}:6123:6123 
-p 48081:48081 -p 6124:6124 -p 6125:6125 --network overlay --constraint 
'node.hostname == ubuntu-swarm-manager' flink jobmanager

docker service create --name taskmanager --env 
JOB_MANAGER_RPC_ADDRESS={{PUBLIC_IP}}  -p 6121:6121 -p 6122:6122  --network 
overlay --constraint 'node.hostname != ubuntu-swarm-manager' flink taskmanager

However, I am still encountering errors further down the line. When I submit a 
job using the Web UI, it fails because the JobManager can't talk to the 
TaskManager on port 35033. I presume this is the taskmanager.data.port, which 
needs to be set to a range and this range exposed when I create the service?

Are there any other ports that I need to open at service creation time?


Connecting the channel failed: Connecting to remote task manager + 
'/{{IP_ADDRESS_OF_MANAGER}}:35033' has failed. This might indicate that the 
remote task manager has been lost.
at 
org.apache.flink.runtime.io.network.netty.PartitionRequestClientFactory$ConnectingChannel.waitForChannel(PartitionRequestClientFactory.java:196)
at 
org.apache.flink.runtime.io.network.netty.PartitionRequestClientFactory$ConnectingChannel.access$000(PartitionRequestClientFactory.java:131)
at 
org.apache.flink.runtime.io.network.netty.PartitionRequestClientFactory.createPartitionRequestClient(PartitionRequestClientFactory.java:83)
at 
org.apache.flink.runtime.io.network.netty.NettyConnectionManager.createPartitionRequestClient(NettyConnectionManager.java:59)
at 
org.apache.flink.runtime.io.network.partition.consumer.RemoteInputChannel.requestSubpartition(RemoteInputChannel.java:112)
at 
org.apache.flink.runtime.io.network.partition.consumer.SingleInputGate.requestPartitions(SingleInputGate.java:433)
at 
org.apache.flink.runtime.io.network.partition.consumer.SingleInputGate.getNextBufferOrEvent(SingleInputGate.java:455)
at 
org.apache.flink.streaming.runtime.io.BarrierTracker.getNextNonBlocked(BarrierTracker.java:91)
at 

Re: JobManager web interface redirect strategy when running in HA

2017-11-06 Thread mrooding
Chesnay, your solution is definitely the best approach. I was already
wondering why the decision was made to support the UI only through the
leading job manager.

Jürgen, I don't think that your solution will work in our setup. We're
currently running 3 services, one for each job manager. We need a service
per job manager because they obviously need to be able to talk to each
other. In the latest version of OpenShift you can use a StatefulSet to
handle these situations but unfortunately, StatefulSets seem to rely on each
node receiving its own persistent volume claim whereas Flink seems to share
1 persistent volume claim for all nodes.

I've been going through the Kubernetes documentation about Load Balancers
but I'm unable to find a solution which handles both cases:
- each node being available through a cluster name (e.g.
flink-jobmanager-1.env.svc.cluster.local)
- exposing 1 URL which uses the load balancing solution proposed by you

Worst case is that we would have to wait for Flink 1.5 and keep using 3
distinct URLs. It's not ideal but there are also bigger fish to tackle.
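For reference, the two access patterns above are commonly covered in Kubernetes by pairing a headless Service (stable per-pod DNS names) with a regular Service that load-balances; a minimal sketch, assuming a job manager StatefulSet labelled app: flink-jobmanager (all names are hypothetical, and this does not by itself solve the leader-redirect problem):

```yaml
# Headless service: gives each job manager pod a stable DNS name, e.g.
# flink-jobmanager-0.flink-jobmanager.env.svc.cluster.local
apiVersion: v1
kind: Service
metadata:
  name: flink-jobmanager
spec:
  clusterIP: None
  selector:
    app: flink-jobmanager
  ports:
    - name: ui
      port: 8081
---
# Regular service: a single URL that load-balances the UI across all
# job manager pods.
apiVersion: v1
kind: Service
metadata:
  name: flink-jobmanager-ui
spec:
  selector:
    app: flink-jobmanager
  ports:
    - name: ui
      port: 8081
```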

Marc



--
Sent from: http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/


Re: Docker-Flink Project: TaskManagers can't talk to JobManager if they are on different nodes

2017-11-06 Thread Till Rohrmann
Hi Thalita,

in order to make Flink work, I think you have to expose the JobManager RPC
port, the Blob server port and make sure that the TaskManager can talk to
each other by exposing the `taskmanager.data.port`. The query server port
is only necessary if you want to use queryable state.

I've pulled in Patrick who has more experience with running Flink on top of
Docker. He'll definitely be able to provide more detailed recommendations.

Cheers,
Till

On Mon, Nov 6, 2017 at 9:22 AM, Piotr Nowojski 
wrote:

> Till, is there somewhere a list of ports that need to exposed that’s more
> up to date compared to the docker-flink README?
>
> Piotrek
>
> On 3 Nov 2017, at 10:23, Vergilio, Thalita wrote:
>
> Just an update: by changing the JOB_MANAGER_RPC_ADDRESS to the public IP
> of the JobManager and exposing port 6123 as {{PUBLIC_IP}}:6123:6123, I
> managed to get the TaskManagers from different nodes and even different
> subnets to talk to the JobManager.
>
> This is how I created the services:
>
> docker network create -d overlay overlay
>
> docker service create --name jobmanager --env JOB_MANAGER_RPC_ADDRESS={{
> PUBLIC_IP}}  -p 8081:8081 -p{{PUBLIC_IP}}:6123:6123 -p 48081:48081 -p
> 6124:6124 -p 6125:6125 --network overlay --constraint 'node.hostname ==
> ubuntu-swarm-manager' flink jobmanager
>
> docker service create --name taskmanager --env JOB_MANAGER_RPC_ADDRESS={{
> PUBLIC_IP}}  -p 6121:6121 -p 6122:6122  --network overlay --constraint
> 'node.hostname != ubuntu-swarm-manager' flink taskmanager
>
> However, I am still encountering errors further down the line. When I
> submit a job using the Web UI, it fails because the JobManager can't talk
> to the TaskManager on port 35033. I presume this is the
> taskmanager.data.port, which needs to be set to a range and this range
> exposed when I create the service?
>
> Are there any other ports that I need to open at service creation time?
>
> Connecting the channel failed: Connecting to remote task manager + 
> '/{{IP_ADDRESS_OF_MANAGER}}:35033' has failed. This might indicate that the 
> remote task manager has been lost.
>   at 
> org.apache.flink.runtime.io.network.netty.PartitionRequestClientFactory$ConnectingChannel.waitForChannel(PartitionRequestClientFactory.java:196)
>   at 
> org.apache.flink.runtime.io.network.netty.PartitionRequestClientFactory$ConnectingChannel.access$000(PartitionRequestClientFactory.java:131)
>   at 
> org.apache.flink.runtime.io.network.netty.PartitionRequestClientFactory.createPartitionRequestClient(PartitionRequestClientFactory.java:83)
>   at 
> org.apache.flink.runtime.io.network.netty.NettyConnectionManager.createPartitionRequestClient(NettyConnectionManager.java:59)
>   at 
> org.apache.flink.runtime.io.network.partition.consumer.RemoteInputChannel.requestSubpartition(RemoteInputChannel.java:112)
>   at 
> org.apache.flink.runtime.io.network.partition.consumer.SingleInputGate.requestPartitions(SingleInputGate.java:433)
>   at 
> org.apache.flink.runtime.io.network.partition.consumer.SingleInputGate.getNextBufferOrEvent(SingleInputGate.java:455)
>   at 
> org.apache.flink.streaming.runtime.io.BarrierTracker.getNextNonBlocked(BarrierTracker.java:91)
>   at 
> org.apache.flink.streaming.runtime.io.StreamInputProcessor.processInput(StreamInputProcessor.java:213)
>   at 
> org.apache.flink.streaming.runtime.tasks.OneInputStreamTask.run(OneInputStreamTask.java:69)
>   at 
> org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:263)
>   at org.apache.flink.runtime.taskmanager.Task.run(Task.java:702)
>   at java.lang.Thread.run(Thread.java:748)
>
>
>
> --
> *From:* Piotr Nowojski 
> *Sent:* 02 November 2017 14:26:32
> *To:* Vergilio, Thalita
> *Cc:* user@flink.apache.org
> *Subject:* Re: Docker-Flink Project: TaskManagers can't talk to
> JobManager if they are on different nodes
>
> Did you try to expose required ports that are listed in the README when
> starting the containers?
>
> https://github.com/apache/flink/tree/master/flink-contrib/docker-flink
>
> Ports:
> • The Web Client is on port 48081
> • JobManager RPC port 6123 (default, not exposed to host)
> • TaskManagers RPC port 6122 (default, not exposed to host)
> • TaskManagers Data port 6121 (default, not exposed to host)
>
> Piotrek
>
> On 2 Nov 2017, at 14:44, javalass wrote:
>
> I am using the Docker-Flink project in:
> https://github.com/apache/flink/tree/master/flink-contrib/docker-flink
>
> I am creating the services with the following commands:
> docker network create -d overlay overlay
> docker service create --name jobmanager --env
> JOB_MANAGER_RPC_ADDRESS=jobmanager -p 8081:8081 --network overlay
> --constraint 'node.hostname == ubuntu-swarm-manager' flink jobmanager
> docker service create --name taskmanager --env
> 

FlinkCEP behaviour with time constraints not as expected

2017-11-06 Thread Federico D'Ambrosio
Hi everyone,

I wanted to ask if FlinkCEP in the following scenario is working as it
should, or I have misunderstood its functioning.

I've got a keyedstream associated with the following pattern:

Pattern[Event].begin("start").where(_.value >= 100).oneOrMore
  .notNext("end").where(_.value >= 100).within(Time.minutes(30))

Considering a single key in the stream, for simplicity, I've got the
following sequence of events (using EventTime on the "time" field of the
json event):

{value: 100, time: "2017-11-05 03:50:02.000"}
{value: 100, time: "2017-11-05 03:52:02.000"}
{value: 100, time: "2017-11-05 03:54:02.000"}
{value: 100, time: "2017-11-05 03:56:02.000"} // end of events within the
30 minutes from the first event
{value: 100, time: "2017-11-05 06:00:02.000"}

Now, when it comes to the select/flatselect function, I tried printing the
content of the pattern map and what I noticed is that, for example, the
first 2 events weren't considered in the same pattern as the map was like
the following:

{start=[{value: 100, time: 2017-11-05 03:50:02.000}]}
{start=[{value: 100, time: 2017-11-05 03:52:02.000}]}

Now, shouldn't they be in the same List, as they belong to the same
iterative pattern, defined with the oneOrMore clause?

Thank you for your insight,
Federico D'Ambrosio


Re: Negative values using latency marker

2017-11-06 Thread Nico Kruber
Ok, digging into it a bit further:

The LatencyMarker is scheduled at a certain period with some initialDelay. Its 
initial time is `System.currentTimeMillis() + initialDelay` (when it should 
first be run). Depending on your system's load, this run may actually be 
delayed (but then the marker's time will fall behind, not explaining a 
negative value) but from the Executor's documentation, I don't think, it 
should execute it too early. For future markers, their time will simply be 
increased by the period (which may fall behind for the same reason).

Before emitting the metric, the difference to `System.currentTimeMillis()` 
will be used which is based on system time and may decrease if the clock is 
adjusted, e.g. via NTP. Also, this is probably called from a different thread 
and `System.currentTimeMillis()` apparently may jump backwards there as well 
[1].


Nico

[1] 
https://stackoverflow.com/questions/2978598/will-system-currenttimemillis-always-return-a-value-previous-calls
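The effect described above can be reproduced without Flink: if the sink machine's wall clock lags behind the source's, the computed latency comes out negative. A minimal sketch (all names are hypothetical; this is not Flink code):

```java
public class ClockSkewDemo {
    // Latency as a latency-marker-style metric computes it: the sink's wall
    // clock reading minus the timestamp stamped on the marker at the source.
    static long latencyMillis(long markerTimestamp, long sinkWallClock) {
        return sinkWallClock - markerTimestamp;
    }

    public static void main(String[] args) {
        long markerTs = 1_000_000L; // stamped on the source machine's clock
        long transit = 2;           // real transit time: 2 ms
        long skew = 5;              // sink clock runs 5 ms behind the source
        long sinkNow = markerTs + transit - skew;
        // 2 ms of real latency is reported as -3 ms because of the skew.
        System.out.println(latencyMillis(markerTs, sinkNow));
    }
}
```

The same thing can happen on a single machine if the clock is stepped backwards (e.g. by NTP) between stamping and measuring, which is why monotonic clocks such as System.nanoTime() are preferred for interval measurement.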


On Sunday, 5 November 2017 09:22:05 CET Sofer, Tovi  wrote:
> Hi Nico,
> 
> Actually the run below is on my local machine, and both Kafka and Flink run
> on it.
> 
> Thanks and regards,
> Tovi
> -Original Message-
> From: Nico Kruber [mailto:n...@data-artisans.com]
> Sent: Friday, 03 November 2017 15:22
> To: user@flink.apache.org
> Cc: Sofer, Tovi [ICG-IT] 
> Subject: Re: Negative values using latency marker
> 
> Hi Tovi,
> if I see this correctly, the LatencyMarker gets its initial timestamp during
> creation at the source and the latency is reported as a metric at a sink by
> comparing the initial timestamp with the current time. If the clocks
> between the two machines involved diverge, e.g. the sinks clock falling
> behind, the difference may be negative.
> 
> 
> Nico
> 
> On Thursday, 2 November 2017 17:58:51 CET Sofer, Tovi  wrote:
> > Hi group,
> > 
> > Can someone maybe elaborate how can latency gauge shown by latency
> > marker be negative?
> > 
> > 2017-11-02 18:54:56,842 INFO
> > com.citi.artemis.flink.reporters.ArtemisReporter -
> > [Flink-MetricRegistry-1]
> > 127.0.0.1.taskmanager.f40ca57104c8642218e5b8979a380ee4.Flink Streaming
> > Job.Sink: FinalSink.0.latency: {LatencySourceDescriptor{vertexID=1,
> > subtaskIndex=0}={p99=-5.0, p50=-5.0, min=-5.0, max=-5.0, p95=-5.0,
> > mean=-5.0}, LatencySourceDescriptor{vertexID=1,
> > subtaskIndex=1}={p99=-5.0, p50=-5.0, min=-5.0, max=-5.0, p95=-5.0,
> > mean=-5.0}, LatencySourceDescriptor{vertexID=1,
> > subtaskIndex=2}={p99=-6.0, p50=-6.0, min=-6.0, max=-6.0, p95=-6.0,
> > mean=-6.0}, LatencySourceDescriptor{vertexID=1,
> > subtaskIndex=3}={p99=-6.0, p50=-6.0, min=-6.0, max=-6.0, p95=-6.0,
> > mean=-6.0}} 2017-11-02 18:54:56,843 INFO
> > com.citi.artemis.flink.reporters.ArtemisReporter -
> > [Flink-MetricRegistry-1]
> > 127.0.0.1.taskmanager.f40ca57104c8642218e5b8979a380ee4.Flink Streaming
> > Job.AverageE2ELatencyChecker.0.60SecWarmUpRecordsCounter: 2858446
> > 2017-11-02 18:54:56,843 INFO
> > com.citi.artemis.flink.reporters.ArtemisReporter -
> > [Flink-MetricRegistry-1]
> > 127.0.0.1.taskmanager.f40ca57104c8642218e5b8979a380ee4.Flink Streaming
> > Job.Source: fixTopicConsumerSource.3.numRecordsOut: 1954784 2017-11-02
> > 18:54:56,843 INFO  com.citi.artemis.flink.reporters.ArtemisReporter -
> > [Flink-MetricRegistry-1]
> > 127.0.0.1.taskmanager.f40ca57104c8642218e5b8979a380ee4.Flink Streaming
> > Job.AverageE2ELatencyChecker.0.ActualRecordsCounter: 4962675
> > 2017-11-02
> > 18:54:56,843 INFO  com.citi.artemis.flink.reporters.ArtemisReporter -
> > [Flink-MetricRegistry-1]
> > 127.0.0.1.taskmanager.f40ca57104c8642218e5b8979a380ee4.Flink Streaming
> > Job.AverageE2ELatencyChecker.0.AverageLatencyMs: 0.0753785 2017-11-02
> > 18:54:56,843 INFO  com.citi.artemis.flink.reporters.ArtemisReporter -
> > [Flink-MetricRegistry-1]
> > 127.0.0.1.taskmanager.f40ca57104c8642218e5b8979a380ee4.Flink Streaming
> > Job.AverageE2ELatencyChecker.0.HighLatencyMsgPercentage: 0.5918576
> > 2017-11-02 18:54:56,843 INFO
> > com.citi.artemis.flink.reporters.ArtemisReporter -
> > [Flink-MetricRegistry-1]
> > 127.0.0.1.taskmanager.f40ca57104c8642218e5b8979a380ee4.Flink Streaming
> > Job.Source: fixTopicConsumerSource.0.numRecordsOutPerSecond:
> > 12943.1167 2017-11-02 18:54:56,843 INFO
> > com.citi.artemis.flink.reporters.ArtemisReporter -
> > [Flink-MetricRegistry-1]
> > 127.0.0.1.taskmanager.f40ca57104c8642218e5b8979a380ee4.Flink Streaming
> > Job.AverageE2ELatencyChecker.0.numRecordsInPerSecond: 51751.4
> > 2017-11-02
> > 18:54:56,843 INFO  com.citi.artemis.flink.reporters.ArtemisReporter -
> > [Flink-MetricRegistry-1]
> > 127.0.0.1.taskmanager.f40ca57104c8642218e5b8979a380ee4.Flink Streaming
> > Job.Source: fixTopicConsumerSource.3.numRecordsOutPerSecond: 12935.05
> > 2017-11-02 18:54:56,843 INFO
> > com.citi.artemis.flink.reporters.ArtemisReporter -
> > [Flink-MetricRegistry-1]
> > 127.0.0.1.taskmanager.f40ca57104c8642218e5b8979a380ee4.Flink Streaming
> 

Re: Facing issues with Logback

2017-11-06 Thread Fabian Hueske
Hi Teena,

thanks for reaching out to the mailing list for this issue. This sounds
indeed like a bug in Flink and should be investigated.
We are currently working on a new release 1.4 and the testing phase will
start soon. So it would make sense to include this problem in the testing
and hopefully include a bugfix for the next release.

I've created a JIRA issue to track the problem [1].

I left out the "affects version" field because you didn't mention your
Flink version.
Can you update the JIRA issue or reply with your version?

Thank you,
Fabian

[1] https://issues.apache.org/jira/browse/FLINK-7990

2017-10-31 8:37 GMT+01:00 Teena K :

> I have a single node Flink instance which has the required jars for
> logback in the lib folder (logback-classic.jar, logback-core.jar,
> log4j-over-slf4j.jar). I have removed the jars for log4j from the lib
> folder (log4j-1.2.17.jar, slf4j-log4j12-1.7.7.jar). 'logback.xml' is also
> correctly updated in 'conf' folder. I have also included 'logback.xml' in
> the classpath, although this does not seem to be considered while the job
> is run. Flink refers to logback.xml inside the conf folder only. I have
> updated pom.xml as per Flink's documentation in order to exclude log4j. I
> have some log entries set inside a few map and flatmap functions and some
> log entries outside those functions (eg: "program execution started").
>
> When I run the job, Flink writes only those logs that are coded outside
> the transformations. Those logs that are coded inside the transformations
> (map, flatmap etc) are not getting written to the log file. If this was
> happening always, I could have assumed that the Task Manager is not writing
> the logs. But Flink displays a strange behavior regarding this. Whenever I
> update the logback jars inside the lib folder (due to version changes),
> during the next job run, all logs (even those inside map and flatmap) are
> written correctly into the log file. But the logs don't get written in any
> of the runs after that. This means that my 'logback.xml' file is correct
> and the settings are also correct. But I don't understand why the same
> settings don't work while the same job is run again.
>
>
>
>
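For reference, a minimal logback.xml of the kind described above might look like the following sketch; the appender name and pattern are illustrative, and ${log.file} is assumed to be supplied by the Flink startup scripts:

```xml
<configuration>
  <appender name="file" class="ch.qos.logback.core.FileAppender">
    <!-- ${log.file} is assumed to be set by the Flink startup scripts -->
    <file>${log.file}</file>
    <append>false</append>
    <encoder>
      <pattern>%d{yyyy-MM-dd HH:mm:ss,SSS} %-5level %logger{60} - %msg%n</pattern>
    </encoder>
  </appender>
  <root level="INFO">
    <appender-ref ref="file"/>
  </root>
</configuration>
```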


Re: Docker-Flink Project: TaskManagers can't talk to JobManager if they are on different nodes

2017-11-06 Thread Piotr Nowojski
Till, is there somewhere a list of ports that need to exposed that’s more up to 
date compared to the docker-flink README?

Piotrek

> On 3 Nov 2017, at 10:23, Vergilio, Thalita wrote:
> 
> Just an update: by changing the JOB_MANAGER_RPC_ADDRESS to the public IP of 
> the JobManager and exposing port 6123 as {{PUBLIC_IP}}:6123:6123, I managed to 
> get the TaskManagers from different nodes and even different subnets to talk 
> to the JobManager.
> 
> This is how I created the services:
> 
> docker network create -d overlay overlay
> 
> docker service create --name jobmanager --env 
> JOB_MANAGER_RPC_ADDRESS={{PUBLIC_IP}} -p 8081:8081 -p {{PUBLIC_IP}}:6123:6123 
> -p 48081:48081 -p 6124:6124 -p 6125:6125 --network overlay --constraint 
> 'node.hostname == ubuntu-swarm-manager' flink jobmanager
> 
> docker service create --name taskmanager --env 
> JOB_MANAGER_RPC_ADDRESS={{PUBLIC_IP}}  -p 6121:6121 -p 6122:6122  --network 
> overlay --constraint 'node.hostname != ubuntu-swarm-manager' flink taskmanager
> 
> However, I am still encountering errors further down the line. When I submit 
> a job using the Web UI, it fails because the JobManager can't talk to the 
> TaskManager on port 35033. I presume this is the taskmanager.data.port, which 
> needs to be set to a range and this range exposed when I create the service?
> 
> Are there any other ports that I need to open at service creation time?
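One way to make that data port predictable is to pin it instead of letting Flink bind an ephemeral port (such as the 35033 in the error below). A sketch, under the assumption that taskmanager.data.port is indeed the port involved; the choice of 6121 simply reuses the README's default:

```shell
# Hypothetical sketch: pin the data port in flink-conf.yaml so it is known
# in advance, then publish that same port on the task manager service.
echo "taskmanager.data.port: 6121" >> flink-conf.yaml

docker service create --name taskmanager \
  --env JOB_MANAGER_RPC_ADDRESS={{PUBLIC_IP}} \
  -p 6121:6121 -p 6122:6122 \
  --network overlay \
  --constraint 'node.hostname != ubuntu-swarm-manager' \
  flink taskmanager
```

With a fixed port, the JobManager-to-TaskManager data connection no longer depends on a randomly chosen port being reachable across nodes.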
> 
> Connecting the channel failed: Connecting to remote task manager + 
> '/{{IP_ADDRESS_OF_MANAGER}}:35033' has failed. This might indicate that the 
> remote task manager has been lost.
>   at 
> org.apache.flink.runtime.io.network.netty.PartitionRequestClientFactory$ConnectingChannel.waitForChannel(PartitionRequestClientFactory.java:196)
>   at 
> org.apache.flink.runtime.io.network.netty.PartitionRequestClientFactory$ConnectingChannel.access$000(PartitionRequestClientFactory.java:131)
>   at 
> org.apache.flink.runtime.io.network.netty.PartitionRequestClientFactory.createPartitionRequestClient(PartitionRequestClientFactory.java:83)
>   at 
> org.apache.flink.runtime.io.network.netty.NettyConnectionManager.createPartitionRequestClient(NettyConnectionManager.java:59)
>   at 
> org.apache.flink.runtime.io.network.partition.consumer.RemoteInputChannel.requestSubpartition(RemoteInputChannel.java:112)
>   at 
> org.apache.flink.runtime.io.network.partition.consumer.SingleInputGate.requestPartitions(SingleInputGate.java:433)
>   at 
> org.apache.flink.runtime.io.network.partition.consumer.SingleInputGate.getNextBufferOrEvent(SingleInputGate.java:455)
>   at 
> org.apache.flink.streaming.runtime.io.BarrierTracker.getNextNonBlocked(BarrierTracker.java:91)
>   at 
> org.apache.flink.streaming.runtime.io.StreamInputProcessor.processInput(StreamInputProcessor.java:213)
>   at 
> org.apache.flink.streaming.runtime.tasks.OneInputStreamTask.run(OneInputStreamTask.java:69)
>   at 
> org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:263)
>   at org.apache.flink.runtime.taskmanager.Task.run(Task.java:702)
>   at java.lang.Thread.run(Thread.java:748)
> 
> 
> From: Piotr Nowojski 
> Sent: 02 November 2017 14:26:32
> To: Vergilio, Thalita
> Cc: user@flink.apache.org
> Subject: Re: Docker-Flink Project: TaskManagers can't talk to JobManager if 
> they are on different nodes
>  
> Did you try to expose required ports that are listed in the README when 
> starting the containers?
> 
> https://github.com/apache/flink/tree/master/flink-contrib/docker-flink 
> Ports:
> • The Web Client is on port 48081
> • JobManager RPC port 6123 (default, not exposed to host)
> • TaskManagers RPC port 6122 (default, not exposed to host)
> • TaskManagers Data port 6121 (default, not exposed to host)
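Putting the ports above together, a job manager service that publishes all of them could be created along these lines. This is a sketch only: whether each port must be published depends on your swarm topology, and the meanings given for 6124/6125 (BLOB server and query service) are assumptions based on common Flink defaults, not stated in this thread:

```shell
# Hypothetical sketch: publish every port from the README's list so that
# task managers on other nodes can reach the job manager.
# 8081 = Web UI, 48081 = Web Client, 6123 = JobManager RPC,
# 6124/6125 = BLOB server and query service (assumed defaults)
docker service create --name jobmanager \
  --env JOB_MANAGER_RPC_ADDRESS=jobmanager \
  -p 8081:8081 -p 48081:48081 -p 6123:6123 -p 6124:6124 -p 6125:6125 \
  --network overlay \
  --constraint 'node.hostname == ubuntu-swarm-manager' \
  flink jobmanager
```

The overlay network already makes these ports reachable between swarm services; publishing them additionally exposes them on the host, which matters when task managers resolve the job manager by public IP rather than by service name.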
> 
> Piotrek
> 
>> On 2 Nov 2017, at 14:44, javalass wrote:
>> 
>> I am using the Docker-Flink project in:
>> https://github.com/apache/flink/tree/master/flink-contrib/docker-flink 
>> 
>> I am creating the services with the following commands:
>> docker network create -d overlay overlay
>> docker service create --name jobmanager --env
>> JOB_MANAGER_RPC_ADDRESS=jobmanager -p 8081:8081 --network overlay
>> --constraint 'node.hostname == ubuntu-swarm-manager' flink jobmanager
>> docker service create --name taskmanager --env
>> JOB_MANAGER_RPC_ADDRESS=jobmanager --network overlay --constraint
>> 'node.hostname != ubuntu-swarm-manager' flink taskmanager
>> 
>> I wonder if there's any configuration I'm missing. This is the error I get:
>> - Trying to register at JobManager akka.tcp://flink@jobmanager:6123/user/jobmanager (attempt 4, timeout: 4000 milliseconds)
>>