Making external calls from a FlinkKafkaPartitioner

2017-11-02 Thread Ron Crocker
We have a system where the Kafka partition a message should go into is a 
function of a value in the message. Often it's value % #partitions, but for 
some values it's not: it's a specified list of partitions that changes over 
time. Our “simple Java library” that produces messages for this system also has 
a background thread that periodically polls an HTTP endpoint (once per minute 
by default) to refresh that list of special cases.

It’s easy to create a FlinkKafkaPartitioner that does the mod operation; what 
I’m not so sure about is how to get this polling operation into the 
partitioner. I’m about to try it the obvious way (create a background thread 
that polls the URL and updates the partition map), but I wonder if that’s 
actually going to cause a bunch of problems for the Flink runtime.

Here’s the code that I have right now:
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ThreadLocalRandom;
import java.util.concurrent.TimeUnit;

import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.connectors.kafka.partitioner.KafkaPartitioner;

// Note: the generic parameters did not survive the mail archive; the Tuple2 value
// type (String) is assumed here.
public class EventInsertPartitioner extends KafkaPartitioner<Tuple2<Long, String>> {

    private final String partitionerURL;
    private final long updateIntervalInMillis;
    // volatile so the map swapped in by the refresh thread is visible to partition()
    private volatile Map<Long, List<Long>> partitionMap;
    private ScheduledExecutorService executor;

    public EventInsertPartitioner(String partitionerURL, long updateIntervalInMillis) {
        this.partitionerURL = partitionerURL;
        this.updateIntervalInMillis = updateIntervalInMillis;
        this.partitionMap = new HashMap<>();
    }

    @Override
    public void open(int parallelInstanceId, int parallelInstances, int[] partitions) {
        executor = Executors.newScheduledThreadPool(1);
        executor.scheduleAtFixedRate(
                () -> updatePartitionMapRunnable(),
                updateIntervalInMillis,
                updateIntervalInMillis,
                TimeUnit.MILLISECONDS);
    }

    private void updatePartitionMapRunnable() {
        // Make synchronous request to partitionerURL
        // This is a simple JSON that matches our data
        String response = "{1:[1,2,3],2:[2]}";
        // Replace current partitionMap with a new HashMap built from the response.
        // Replacing the reference doesn't require further synchronization; the volatile
        // field makes the new map visible to the task thread.
        this.partitionMap = convertResponseToMap(response);
    }

    private Map<Long, List<Long>> convertResponseToMap(String response) {
        Map<Long, List<Long>> hashMap = new HashMap<>();
        // Convert response to JSON structure and just use that?
        // or iterate and add to the local hashMap
        return hashMap;
    }

    @Override
    public int partition(Tuple2<Long, String> next, byte[] serializedKey,
                         byte[] serializedValue, int numPartitions) {
        long myKey = next.f0;

        if (partitionMap.containsKey(myKey)) {
            List<Long> partitions = partitionMap.get(myKey);
            myKey = partitions.get(ThreadLocalRandom.current().nextInt(partitions.size()));
        }

        return (int) (myKey % numPartitions);
    }
}
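
For the "convert response to JSON structure" TODO above, a minimal sketch of 
convertResponseToMap using Jackson could look like the following. This is only an 
illustration, not what we run: it assumes the endpoint returns standard JSON with 
quoted keys (e.g. {"1":[1,2,3],"2":[2]}), whereas the inline sample string above 
uses unquoted keys, which a strict JSON parser would reject.

import java.util.HashMap;
import java.util.List;
import java.util.Map;

import com.fasterxml.jackson.core.type.TypeReference;
import com.fasterxml.jackson.databind.ObjectMapper;

public class PartitionMapParser {

    private static final ObjectMapper MAPPER = new ObjectMapper();

    // Parses {"1":[1,2,3],"2":[2]} into a Map<Long, List<Long>>. On a parse failure it
    // falls back to an empty map; a caller that prefers to keep the last good map
    // should catch the exception itself instead.
    public static Map<Long, List<Long>> parse(String response) {
        try {
            return MAPPER.readValue(response, new TypeReference<Map<Long, List<Long>>>() {});
        } catch (Exception e) {
            return new HashMap<>();
        }
    }
}
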
Ron
—
Ron Crocker
Principal Engineer & Architect
( ( •)) New Relic
rcroc...@newrelic.com
M: +1 630 363 8835



Incremental checkpointing documentation

2017-11-02 Thread Elias Levy
There doesn't appear to be much in the way of documentation for incremental
checkpointing other than how to turn it on.  That leaves a lot of questions
unanswered.

What is the interaction of incremental checkpointing and external
checkpoints?

Any interaction with the state.checkpoints.num-retained config?

Does incremental checkpointing require any maintenance?

Any interaction with savepoints?

Does it perform better against certain "file systems"?  E.g. is S3 not
recommended for it?  How about EFS?


Initialise side input state

2017-11-02 Thread Maxim Parkachov
Hi Flink users,

I'm struggling with a basic concept and would appreciate some help. I
have two input streams: one is a fast event stream and the other is a slowly
changing dimension. They have the same key, and I use a CoProcessFunction to
store the slow data in state and enrich the fast data from this state.
Everything works as expected.

Before I start processing the fast stream on the first run, I would like to
completely initialise the state. I thought this could be done in open(), but I
don't understand how that state would be re-distributed across the parallel
operators.

Another alternative would be to create a custom source and push all the slow
dimension data downstream, but I could not find how to hold back processing of
the fast data until the state is initialised. One way I can imagine holding the
fast records back is sketched below.
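
A rough, untested sketch of that idea (the types are made up): buffer the fast
events in keyed state inside the CoProcessFunction until the dimension for that
key has arrived, then flush them.

import org.apache.flink.api.common.state.ListState;
import org.apache.flink.api.common.state.ListStateDescriptor;
import org.apache.flink.api.common.state.ValueState;
import org.apache.flink.api.common.state.ValueStateDescriptor;
import org.apache.flink.api.common.typeinfo.TypeHint;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.functions.co.CoProcessFunction;
import org.apache.flink.util.Collector;

// Fast stream: (key, value); slow dimension: (key, dimensionValue); output: enriched String.
public class BufferingEnricher
        extends CoProcessFunction<Tuple2<String, Long>, Tuple2<String, String>, String> {

    private transient ValueState<String> dimension;               // latest dimension per key
    private transient ListState<Tuple2<String, Long>> buffer;     // fast events held back per key

    @Override
    public void open(Configuration parameters) {
        dimension = getRuntimeContext().getState(
                new ValueStateDescriptor<>("dimension", String.class));
        buffer = getRuntimeContext().getListState(
                new ListStateDescriptor<>("buffer",
                        TypeInformation.of(new TypeHint<Tuple2<String, Long>>() {})));
    }

    @Override
    public void processElement1(Tuple2<String, Long> event, Context ctx,
                                Collector<String> out) throws Exception {
        String dim = dimension.value();
        if (dim == null) {
            buffer.add(event);                    // state not initialised yet: hold the event back
        } else {
            out.collect(enrich(event, dim));
        }
    }

    @Override
    public void processElement2(Tuple2<String, String> dim, Context ctx,
                                Collector<String> out) throws Exception {
        dimension.update(dim.f1);
        Iterable<Tuple2<String, Long>> pending = buffer.get();
        if (pending != null) {
            for (Tuple2<String, Long> event : pending) {   // flush anything buffered for this key
                out.collect(enrich(event, dim.f1));
            }
        }
        buffer.clear();
    }

    private String enrich(Tuple2<String, Long> event, String dim) {
        return event.f0 + "," + event.f1 + "," + dim;
    }
}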

I realise that FLIP-17 (Side Inputs) is what I need, but is there some other
way to implement it now ?

Thanks,
Maxim.


Re: Docker-Flink Project: TaskManagers can't talk to JobManager if they are on different nodes

2017-11-02 Thread Vergilio, Thalita
I think I may be getting somewhere with this. I have opened the 
blob.server.port and the query.server.port on the TaskManager service, and I 
can now connect to JobManager from nodes in the same subnet.

However, nodes that are located in different clouds don't seem to be able to 
resolve the 'jobmanager' host by name:

ubuntu@osdc-swarm-worker-1:~$ sudo docker ps
CONTAINER ID   IMAGE          COMMAND                  CREATED          STATUS         PORTS                NAMES
e6a26caf81b4   flink:latest   "/docker-entrypoin..."   21 seconds ago   Up 1 second    6123/tcp, 8081/tcp   taskmanager.7.k0nc3tb7pxv4ppfuaxg155ku5
e12a280860a7   flink:latest   "/docker-entrypoin..."   21 seconds ago   Up 2 seconds   6123/tcp, 8081/tcp   taskmanager.8.si7f8wk132jn9z5hwbx568nbj
b459162a8ef6   flink:latest   "/docker-entrypoin..."   22 seconds ago   Up 5 seconds   6123/tcp, 8081/tcp   taskmanager.3.x2s45mt0qyx2eucirxwj0wmyx
ubuntu@osdc-swarm-worker-1:~$ sudo docker logs e12a280860a7
Starting Task Manager
config file:
jobmanager.rpc.address: jobmanager
jobmanager.rpc.port: 6123
jobmanager.heap.mb: 1024
taskmanager.heap.mb: 1024
taskmanager.numberOfTaskSlots: 2
taskmanager.memory.preallocate: false
parallelism.default: 1
jobmanager.web.port: 8081
blob.server.port: 6124
query.server.port: 6125
Starting taskmanager as a console application on host e12a280860a7.
2017-11-02 18:46:35,481 WARN  org.apache.hadoop.util.NativeCodeLoader - Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
2017-11-02 18:46:35,744 INFO  org.apache.flink.runtime.taskmanager.TaskManager - 
2017-11-02 18:46:35,744 INFO  org.apache.flink.runtime.taskmanager.TaskManager -  Starting TaskManager (Version: 1.3.2, Rev:0399bee, Date:03.08.2017 @ 10:23:11 UTC)
2017-11-02 18:46:35,744 INFO  org.apache.flink.runtime.taskmanager.TaskManager -  Current user: flink
2017-11-02 18:46:35,744 INFO  org.apache.flink.runtime.taskmanager.TaskManager -  JVM: OpenJDK 64-Bit Server VM - Oracle Corporation - 1.8/25.141-b15
2017-11-02 18:46:35,745 INFO  org.apache.flink.runtime.taskmanager.TaskManager -  Maximum heap size: 1024 MiBytes
2017-11-02 18:46:35,745 INFO  org.apache.flink.runtime.taskmanager.TaskManager -  JAVA_HOME: /docker-java-home/jre
2017-11-02 18:46:35,752 INFO  org.apache.flink.runtime.taskmanager.TaskManager -  Hadoop version: 2.7.2
2017-11-02 18:46:35,753 INFO  org.apache.flink.runtime.taskmanager.TaskManager -  JVM Options:
2017-11-02 18:46:35,753 INFO  org.apache.flink.runtime.taskmanager.TaskManager - -XX:+UseG1GC
2017-11-02 18:46:35,753 INFO  org.apache.flink.runtime.taskmanager.TaskManager - -Xms1024M
2017-11-02 18:46:35,753 INFO  org.apache.flink.runtime.taskmanager.TaskManager - -Xmx1024M
2017-11-02 18:46:35,753 INFO  org.apache.flink.runtime.taskmanager.TaskManager - -XX:MaxDirectMemorySize=8388607T
2017-11-02 18:46:35,753 INFO  org.apache.flink.runtime.taskmanager.TaskManager - -Dlog4j.configuration=file:/opt/flink/conf/log4j-console.properties
2017-11-02 18:46:35,753 INFO  org.apache.flink.runtime.taskmanager.TaskManager - -Dlogback.configurationFile=file:/opt/flink/conf/logback-console.xml
2017-11-02 18:46:35,753 INFO  org.apache.flink.runtime.taskmanager.TaskManager -  Program Arguments:
2017-11-02 18:46:35,753 INFO  org.apache.flink.runtime.taskmanager.TaskManager - --configDir
2017-11-02 18:46:35,754 INFO  org.apache.flink.runtime.taskmanager.TaskManager - /opt/flink/conf
2017-11-02 18:46:35,754 INFO  org.apache.flink.runtime.taskmanager.TaskManager -  Classpath: /opt/flink/lib/flink-python_2.11-1.3.2.jar:/opt/flink/lib/flink-shaded-hadoop2-uber-1.3.2.jar:/opt/flink/lib/log4j-1.2.17.jar:/opt/flink/lib/slf4j-log4j12-1.7.7.jar:/opt/flink/lib/flink-dist_2.11-1.3.2.jar:::
2017-11-02 18:46:35,754 INFO  org.apache.flink.runtime.taskmanager.TaskManager - 
2017-11-02 18:46:35,757 INFO  org.apache.flink.runtime.taskmanager.TaskManager - Registered UNIX signal handlers for [TERM, HUP, INT]
2017-11-02 18:46:35,781 INFO  org.apache.flink.runtime.taskmanager.TaskManager - Maximum number of open file descriptors is 1048576
2017-11-02 18:46:35,834 INFO  org.apache.flink.runtime.taskmanager.TaskManager - Loading configuration from /opt/flink/conf
2017-11-02 18:46:35,843 INFO  org.apache.flink.configuration.GlobalConfiguration - Loading configuration property: jobmanager.rpc.address, jobmanager

Re: Docker-Flink Project: TaskManagers can't talk to JobManager if they are on different nodes

2017-11-02 Thread Vergilio, Thalita
Hi Piotr,


Thank you very much for your reply.


Yes, I have tried to open these ports when I create the services.  If I create 
them with:


docker service create --name jobmanager --env 
JOB_MANAGER_RPC_ADDRESS=jobmanager  -p 8081:8081 -p 6123:6123 -p 48081:48081  
--network overlay --constraint 'node.hostname == ubuntu-swarm-manager' flink 
jobmanager

docker service create --name taskmanager --env 
JOB_MANAGER_RPC_ADDRESS=jobmanager -p 6121:6121 -p 6122:6122 --network overlay 
--constraint 'node.hostname != ubuntu-swarm-manager' flink taskmanager

I still get the same issue.


Thank you very much for taking your time to look at this.


Best wishes,


Thalita


From: Piotr Nowojski 
Sent: 02 November 2017 14:26:32
To: Vergilio, Thalita
Cc: user@flink.apache.org
Subject: Re: Docker-Flink Project: TaskManagers can't talk to JobManager if 
they are on different nodes

Did you try to expose required ports that are listed in the README when 
starting the containers?

https://github.com/apache/flink/tree/master/flink-contrib/docker-flink

Ports:
• The Web Client is on port 48081
• JobManager RPC port 6123 (default, not exposed to host)
• TaskManagers RPC port 6122 (default, not exposed to host)
• TaskManagers Data port 6121 (default, not exposed to host)

Piotrek

On 2 Nov 2017, at 14:44, javalass wrote:

I am using the Docker-Flink project in:
https://github.com/apache/flink/tree/master/flink-contrib/docker-flink

I am creating the services with the following commands:
docker network create -d overlay overlay
docker service create --name jobmanager --env
JOB_MANAGER_RPC_ADDRESS=jobmanager -p 8081:8081 --network overlay
--constraint 'node.hostname == ubuntu-swarm-manager' flink jobmanager
docker service create --name taskmanager --env
JOB_MANAGER_RPC_ADDRESS=jobmanager --network overlay --constraint
'node.hostname != ubuntu-swarm-manager' flink taskmanager

I wonder if there's any configuration I'm missing. This is the error I get:
- Trying to register at JobManager akka.tcp://flink@jobmanager:6123/user/jobmanager (attempt 4, timeout: 4000 milliseconds)

--
Sent from: http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/



Re: Flink send checkpointing message in IT

2017-11-02 Thread Rinat
Chesnay, thanks for your reply, it was very helpful. I took the logic from that 
test template and tried to reuse it in my IT case, but found one more issue.
I’ve registered an accumulator in my source function and wait for its value, as 
in the example you pointed to.
When the accumulator reaches the expected value, I trigger a savepoint and wait 
for its completion using the following code:

ActorGateway jobManager = (ActorGateway) Await.result(cluster.leaderGateway().future(), DEADLINE.timeLeft());
Future savepointResultFuture = jobManager.ask(
    new JobManagerMessages.TriggerSavepoint(jobId, Option.empty()), DEADLINE.timeLeft());
Object savepointResult = Await.result(savepointResultFuture, DEADLINE.timeLeft());
Afterwards, if no failures have been detected, I cancel my job and shut down the 
cluster.

I found that the notifyCheckpointComplete method is not always called before the 
savepointResult is ready, so the part of my logic that lives in the 
implementation of that method doesn’t run and the test fails.
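
For context, the sink-side part of my logic follows roughly this pattern (a 
simplified sketch; the class and field names here are made up): a sink that 
implements CheckpointListener and flags completion from notifyCheckpointComplete.

import java.util.concurrent.CountDownLatch;

import org.apache.flink.runtime.state.CheckpointListener;
import org.apache.flink.streaming.api.functions.sink.RichSinkFunction;

public class CheckpointAwareSink extends RichSinkFunction<String> implements CheckpointListener {

    // Test-only shortcut: works with a single sink instance in a local MiniCluster.
    public static final CountDownLatch CHECKPOINT_COMPLETED = new CountDownLatch(1);

    @Override
    public void invoke(String value) {
        // The real sink writes the value somewhere; omitted in this sketch.
    }

    @Override
    public void notifyCheckpointComplete(long checkpointId) {
        // Called once the JobManager has acknowledged the checkpoint/savepoint;
        // as described above, this may happen after the TriggerSavepoint future completes.
        CHECKPOINT_COMPLETED.countDown();
    }
}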

So could you or someone else explain whether Flink guarantees that the 
notifyCheckpointComplete method will be called before the savepointResult 
becomes accessible?
For me it’s rather strange behaviour, and I think I’m doing something 
wrong.

Thx.

> On 1 Nov 2017, at 14:26, Chesnay Schepler  wrote:
> 
> You could trigger a savepoint, which from the viewpoint of 
> sources/operators/sinks is the same thing as a checkpoint.
> 
> How to do this depends a bit on how your test case is written, but you can 
> take a look at the SavepointMigrationTestBase#executeAndSavepoint which is 
> all about running jobs and triggering
> savepoints once certain conditions have been met.
> 
> On 30.10.2017 16:01, Rinat wrote:
>> Hi guys, I’ve got a question about working with checkpointing.
>> I would like to implement IT test, where source is a fixed collection of 
>> items and sink performs additional logic, when checkpointing is completed.
>> 
>> I would like to force executing checkpointing, when all messages from my 
>> test source were sent and processed by sink.
>> Please tell me, whether such logic could be performed or not, and how.
>> 
>> Thx !
> 
> 



Negative values using latency marker

2017-11-02 Thread Sofer, Tovi
Hi group,

Can someone elaborate on how the latency gauge shown by the latency marker can 
be negative?

2017-11-02 18:54:56,842 INFO  com.citi.artemis.flink.reporters.ArtemisReporter - [Flink-MetricRegistry-1] 127.0.0.1.taskmanager.f40ca57104c8642218e5b8979a380ee4.Flink Streaming Job.Sink: FinalSink.0.latency: {LatencySourceDescriptor{vertexID=1, subtaskIndex=0}={p99=-5.0, p50=-5.0, min=-5.0, max=-5.0, p95=-5.0, mean=-5.0}, LatencySourceDescriptor{vertexID=1, subtaskIndex=1}={p99=-5.0, p50=-5.0, min=-5.0, max=-5.0, p95=-5.0, mean=-5.0}, LatencySourceDescriptor{vertexID=1, subtaskIndex=2}={p99=-6.0, p50=-6.0, min=-6.0, max=-6.0, p95=-6.0, mean=-6.0}, LatencySourceDescriptor{vertexID=1, subtaskIndex=3}={p99=-6.0, p50=-6.0, min=-6.0, max=-6.0, p95=-6.0, mean=-6.0}}
2017-11-02 18:54:56,843 INFO  com.citi.artemis.flink.reporters.ArtemisReporter - [Flink-MetricRegistry-1] 127.0.0.1.taskmanager.f40ca57104c8642218e5b8979a380ee4.Flink Streaming Job.AverageE2ELatencyChecker.0.60SecWarmUpRecordsCounter: 2858446
2017-11-02 18:54:56,843 INFO  com.citi.artemis.flink.reporters.ArtemisReporter - [Flink-MetricRegistry-1] 127.0.0.1.taskmanager.f40ca57104c8642218e5b8979a380ee4.Flink Streaming Job.Source: fixTopicConsumerSource.3.numRecordsOut: 1954784
2017-11-02 18:54:56,843 INFO  com.citi.artemis.flink.reporters.ArtemisReporter - [Flink-MetricRegistry-1] 127.0.0.1.taskmanager.f40ca57104c8642218e5b8979a380ee4.Flink Streaming Job.AverageE2ELatencyChecker.0.ActualRecordsCounter: 4962675
2017-11-02 18:54:56,843 INFO  com.citi.artemis.flink.reporters.ArtemisReporter - [Flink-MetricRegistry-1] 127.0.0.1.taskmanager.f40ca57104c8642218e5b8979a380ee4.Flink Streaming Job.AverageE2ELatencyChecker.0.AverageLatencyMs: 0.0753785
2017-11-02 18:54:56,843 INFO  com.citi.artemis.flink.reporters.ArtemisReporter - [Flink-MetricRegistry-1] 127.0.0.1.taskmanager.f40ca57104c8642218e5b8979a380ee4.Flink Streaming Job.AverageE2ELatencyChecker.0.HighLatencyMsgPercentage: 0.5918576
2017-11-02 18:54:56,843 INFO  com.citi.artemis.flink.reporters.ArtemisReporter - [Flink-MetricRegistry-1] 127.0.0.1.taskmanager.f40ca57104c8642218e5b8979a380ee4.Flink Streaming Job.Source: fixTopicConsumerSource.0.numRecordsOutPerSecond: 12943.1167
2017-11-02 18:54:56,843 INFO  com.citi.artemis.flink.reporters.ArtemisReporter - [Flink-MetricRegistry-1] 127.0.0.1.taskmanager.f40ca57104c8642218e5b8979a380ee4.Flink Streaming Job.AverageE2ELatencyChecker.0.numRecordsInPerSecond: 51751.4
2017-11-02 18:54:56,843 INFO  com.citi.artemis.flink.reporters.ArtemisReporter - [Flink-MetricRegistry-1] 127.0.0.1.taskmanager.f40ca57104c8642218e5b8979a380ee4.Flink Streaming Job.Source: fixTopicConsumerSource.3.numRecordsOutPerSecond: 12935.05
2017-11-02 18:54:56,843 INFO  com.citi.artemis.flink.reporters.ArtemisReporter - [Flink-MetricRegistry-1] 127.0.0.1.taskmanager.f40ca57104c8642218e5b8979a380ee4.Flink Streaming Job.Source: fixTopicConsumerSource.2.numRecordsOutPerSecond: 12946.9166
2017-11-02 18:54:56,843 INFO  com.citi.artemis.flink.reporters.ArtemisReporter - [Flink-MetricRegistry-1] 127.0.0.1.taskmanager.f40ca57104c8642218e5b8979a380ee4.Flink Streaming Job.Source: fixTopicConsumerSource.1.numRecordsOutPerSecond: 12926.3168
2017-11-02 18:54:56,844 INFO  com.citi.artemis.flink.reporters.ArtemisReporter - [Flink-MetricRegistry-1] 127.0.0.1.taskmanager.f40ca57104c8642218e5b8979a380ee4.Flink Streaming Job.AverageE2ELatencyChecker.0.LatencyHistogram: count:1 min:24753 max:19199891 mean:77637.6484 stddev:341333.9414842662 p50:40752.0 p75:49809.0 p95:190480.95 p98:539110.819994 p99:749224.889995 p999:3817927.9259998496

Regards,
Tovi



Re: Batch job per stream message?

2017-11-02 Thread Fabian Hueske
Hi Tomas,

I'm not familiar with the details of the AsyncFunction, but I'd interpret
this as follows:

- you can make one async call in the asyncInvoke method.
- this call will result in a callback and from that one callback you can
emit a single result by calling AsyncCollector.collect()

The asyncInvoke method is called once per event in the stream, so each
stream event results in one async call and one result.
It's kind of like a MapFunction that talks to an external service.
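
Roughly, such a function could look like the sketch below (the lookup() helper is 
a placeholder for the REST/JDBC call, and the import paths follow the 1.3-era 
async API, where results are handed to an AsyncCollector):

import java.util.Collections;
import java.util.concurrent.CompletableFuture;

import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.functions.async.RichAsyncFunction;
import org.apache.flink.streaming.api.functions.async.collector.AsyncCollector;

// One asyncInvoke() per stream record; the single collect() completes that record.
public class EnrichmentFunction extends RichAsyncFunction<String, Tuple2<String, String>> {

    @Override
    public void asyncInvoke(String key, AsyncCollector<Tuple2<String, String>> collector) {
        CompletableFuture
                .supplyAsync(() -> lookup(key))           // the one external call for this record
                .whenComplete((value, error) -> {
                    if (error != null) {
                        collector.collect(error);         // fail this record
                    } else {
                        // first (and only) collect() completes the record; later calls are ignored
                        collector.collect(Collections.singletonList(Tuple2.of(key, value)));
                    }
                });
    }

    // Placeholder: a real implementation would call the REST service or JDBC here,
    // ideally via a truly asynchronous client rather than supplyAsync on the common pool.
    private String lookup(String key) {
        return "value-for-" + key;
    }
}

It would be wired into the stream with AsyncDataStream.unorderedWait(stream, new 
EnrichmentFunction(), timeout, TimeUnit.MILLISECONDS), or orderedWait if output 
order matters.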

So if you need to make multiple calls per event, you need multiple
AsyncFunctions.

Best, Fabian

2017-11-01 16:12 GMT+01:00 Tomas Mazukna :

> Hi Fabian,
>
> thanks for pointing me in the right direction
> reading through the documentation here:
> https://ci.apache.org/projects/flink/flink-docs-release-1.3/
> dev/stream/asyncio.html
>
> seems like I can accomplish what I need with async call to a rest service
> or jdbc query per stream item being processed.
> The only confusion for me is this statement:
>
> The AsyncCollector is completed with the first call of
> AsyncCollector.collect. All subsequent collect calls will be ignored.
>
> so basically there has to be an accumulator implemented inside the
> AsyncFunction to gather up all results and return them in a single
> .collect() call. But how do I know when to do so? Or am I completely off
> track here?
>
>
>
> On Wed, 1 Nov 2017 at 03:57 Fabian Hueske  wrote:
>
>> Hi Tomas,
>>
>> triggering a batch DataSet job from a DataStream program for each input
>> record doesn't sound like a good idea to me.
>> You would have to make sure that the cluster always has sufficient
>> resources and handle failures.
>>
>> It would be preferable to have all data processing in a DataStream job.
>> You mentioned that the challenge is to join the data of the files with a
>> JDBC database.
>> I see two ways to do that in a DataStream program:
>> - replicate the JDBC table in a stateful operator. This means that you
>> have to publish updates on the database to the Flink program.
>> - query the JDBC table with an AsyncFunction. This operator concurrently
>> executes multiple calls to an external service which improves latency and
>> throughput. The operator ensures that checkpoints and watermarks are
>> correctly handled.
>>
>> Best, Fabian
>>
>> 2017-10-30 19:11 GMT+01:00 Tomas Mazukna :
>>
>>> Trying to figure out the best design in Flink.
>>> Reading from a kafka topic which has messages with pointers to files to
>>> be processed.
>>> I am thinking to somehow kick off a batch job per file... unless there
>>> is an easy way to get a separate dataset per file.
>>> I can do almost all of this in the stream, parse file with flat map ->
>>> explode its contents into multiple data elements -> map, etc...
>>> One of these steps would be to grab another dataset from a JDBC source and
>>> join with the stream's contents...
>>> I think I am mixing the two concepts here and the right approach would
>>> be to kick off this mini batch job per file,
>>> where I have a file dataset + JDBC dataset to join with.
>>>
>>> So how would I go about kicking off a batch job from a streaming job?
>>>
>>> Thanks,
>>> Tomas
>>>
>>
>>


Re: Docker-Flink Project: TaskManagers can't talk to JobManager if they are on different nodes

2017-11-02 Thread Piotr Nowojski
Did you try to expose required ports that are listed in the README when 
starting the containers?

https://github.com/apache/flink/tree/master/flink-contrib/docker-flink 

Ports:
• The Web Client is on port 48081
• JobManager RPC port 6123 (default, not exposed to host)
• TaskManagers RPC port 6122 (default, not exposed to host)
• TaskManagers Data port 6121 (default, not exposed to host)

Piotrek

> On 2 Nov 2017, at 14:44, javalass  
> wrote:
> 
> I am using the Docker-Flink project in:
> https://github.com/apache/flink/tree/master/flink-contrib/docker-flink 
> 
> I am creating the services with the following commands:
> docker network create -d overlay overlay
> docker service create --name jobmanager --env
> JOB_MANAGER_RPC_ADDRESS=jobmanager -p 8081:8081 --network overlay
> --constraint 'node.hostname == ubuntu-swarm-manager' flink jobmanager
> docker service create --name taskmanager --env
> JOB_MANAGER_RPC_ADDRESS=jobmanager --network overlay --constraint
> 'node.hostname != ubuntu-swarm-manager' flink taskmanager
> 
> I wonder if there's any configuration I'm missing. This is the error I get:
> - Trying to register at JobManager akka.tcp://flink@jobmanager:6123/user/jobmanager (attempt 4, timeout: 4000 milliseconds)
> 
> --
> Sent from: 
> http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/



Docker-Flink Project: TaskManagers can't talk to JobManager if they are on different nodes

2017-11-02 Thread javalass
I am using the Docker-Flink project in:
https://github.com/apache/flink/tree/master/flink-contrib/docker-flink 

I am creating the services with the following commands:
docker network create -d overlay overlay
docker service create --name jobmanager --env
JOB_MANAGER_RPC_ADDRESS=jobmanager -p 8081:8081 --network overlay
--constraint 'node.hostname == ubuntu-swarm-manager' flink jobmanager
docker service create --name taskmanager --env
JOB_MANAGER_RPC_ADDRESS=jobmanager --network overlay --constraint
'node.hostname != ubuntu-swarm-manager' flink taskmanager

I wonder if there's any configuration I'm missing. This is the error I get:
- Trying to register at JobManager akka.tcp://flink@jobmanager:6123/user/jobmanager (attempt 4, timeout: 4000 milliseconds)

--
Sent from: http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/


Re: Use keyBy to deterministically hash each record to a processor/task/slot

2017-11-02 Thread m@xi
Hello Dongwon,

Thanks a lot for your excellent reply! It seems we have the same problem; still,
your solution is less hard-coded than mine.

Thanks a lot!

I am also looking forward to seeing the ability to create a custom
partitioner for keyBy() in Flink.

Best,
Max



--
Sent from: http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/


Re: Use keyBy to deterministically hash each record to a processor/task/slot

2017-11-02 Thread Dongwon Kim
Hello,

As I need to generate the same number of keys as there are partitions, I also
suffer from this problem [1].
My current solution is to generate keys until I have at least one
key per partition, which looks very stupid to me (I copy and paste my code
below).
If Flink changes the way it computes a partition from a given key for the keyBy
operator, I will need to modify my vulnerable key generator.
Stephan once mentioned that a custom partitioner for keyBy can make things
complicated in [2].

Hope Flink can provide a way to specify a custom partitioner for keyBy.
I know Flink is primarily targeting data intensive applications as
mentioned in [3],
but compute-intensive applications (especially from the
MachineLearning/DeepLearning domain) can require this feature for evenly
distributing a small number of keys over another small number of
partitions.

Below is my *vulnerable* key generator written in Scala:

import org.apache.flink.util.MathUtils
import scala.collection.mutable

class KeyGenerator(val partitions: Int,
                   val maxPartitions: Int) {

  def this(partitions: Int) = this(partitions, 128)

  val ids = Stream.from(1).iterator
  val cache = mutable.HashMap[Int, mutable.Queue[Int]]()

  def next(targetPartition: Int): Int = {
    val queue = cache.getOrElseUpdate(targetPartition, mutable.Queue[Int]())
    if (queue.size == 0) {
      var found = false
      while (!found) {
        val id = ids.next
        val partition =
          (MathUtils.murmurHash(id) % maxPartitions) * partitions / maxPartitions

        cache
          .getOrElseUpdate(partition, mutable.Queue[Int]())
          .enqueue(id)

        if (partition == targetPartition) {
          found = true
        }
      }
    }
    queue.dequeue()
  }
}


I use it like this:

import org.apache.flink.runtime.state.KeyGroupRangeAssignment
...
val numPartitions = 10
val numKeys = 10
val parallelism = 10

val keyGenerator = new KeyGenerator(numPartitions,
  KeyGroupRangeAssignment.computeDefaultMaxParallelism(parallelism))
val desiredKeys = (0 until numKeys) map keyGenerator.next
...
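
For reference, the key-to-subtask mapping the generator searches against can, if 
I read KeyGroupRangeAssignment correctly, also be queried directly; a small Java 
sketch (not part of my job code):

import org.apache.flink.runtime.state.KeyGroupRangeAssignment;

public class KeyToSubtask {
    public static void main(String[] args) {
        int parallelism = 10;
        int maxParallelism = KeyGroupRangeAssignment.computeDefaultMaxParallelism(parallelism);

        // key -> key group -> operator index: the same computation the generator relies on
        for (int key = 1; key <= 20; key++) {
            int subtask = KeyGroupRangeAssignment.assignKeyToParallelOperator(key, maxParallelism, parallelism);
            System.out.println("key " + key + " -> subtask " + subtask);
        }
    }
}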


Thanks,

[1]
https://image.slidesharecdn.com/pdm-with-apache-flink-flinkforward-170919021613/95/predictive-maintenance-with-deep-learning-and-apache-flink-41-638.jpg?cb=1505787617
[2]
http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/keyBy-using-custom-partitioner-tt5379.html#a5389
[3]
http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/Using-CustomPartitionerWrapper-with-KeyedStream-td8481.htm

- Dongwon


On Thu, Nov 2, 2017 at 8:00 PM, m@xi  wrote:

> Hello Tony,
>
> Thanks a lot for your answer. Now I know exactly what happens with the keyBy
> function, yet I still haven't figured out a proper (non-hard-coded) way to
> deterministically send a tuple to each key.
>
> If someone from the Flink team could help, it would be great!
>
> Max
>
>
>
> --
> Sent from: http://apache-flink-user-mailing-list-archive.2336050.
> n4.nabble.com/
>


Re: Use keyBy to deterministically hash each record to a processor/task/slot

2017-11-02 Thread m@xi
Hello Tony,

Thanks a lot for your answer. Now I know exactly what happens with the keyBy
function, yet I still haven't figured out a proper (non-hard-coded) way to
deterministically send a tuple to each key.

If someone from the Flink team could help, it would be great!

Max



--
Sent from: http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/