Using Kafka 0.10.x timestamps as a record value in Flink Streaming

2017-05-11 Thread Jia Teoh
Hi,

Is there a way to retrieve the timestamps that Kafka associates with each
key-value pair within Flink? I would like to be able to use these as values
within my application flow, and defining them before or after Kafka is not
acceptable for the use case due to the latency involved in sending or
receiving from Kafka.

It seems that Flink supports Kafka event time (link), but after a brief trace it
seems that KafkaConsumer010 still relies on the Kafka09Fetcher for iterating
through each Kafka record and deserializing it. The KeyedDeserializationSchema
API does not seem to have support for including the timestamp as additional
metadata (just offset, topic, and partition), so something such as
JSONKeyValueDeserializationSchema will not return the Kafka-specified timestamp.

For reference, I am using Kafka 0.10.2 and the Flink-Streaming API + Kafka
Connector (1.2.1).
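
For illustration, here is a rough sketch of how the attached timestamp could be
read downstream, assuming the 0.10 consumer forwards the Kafka record timestamp
as the element's event timestamp (class names and whether a keyed stream is
required should be double-checked against the 1.2.1 docs; this is not a
confirmed recipe):

import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.functions.ProcessFunction;
import org.apache.flink.util.Collector;

// Illustrative only: emits (value, attached timestamp) pairs.
public class AttachKafkaTimestamp extends ProcessFunction<String, Tuple2<String, Long>> {
    @Override
    public void processElement(String value, Context ctx, Collector<Tuple2<String, Long>> out) {
        // ctx.timestamp() is the timestamp attached to the element (may be null
        // if no timestamp was assigned upstream)
        out.collect(Tuple2.of(value, ctx.timestamp()));
    }
}

// usage sketch: kafkaStream.keyBy(value -> value).process(new AttachKafkaTimestamp());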

Thanks,
Jia Teoh


Re: JobManager Web UI

2017-05-11 Thread Shannon Carey
Since YARN must support people running multiple Flink clusters, the JobManager 
web UI binds to an ephemeral port by default (to prevent port usage conflicts). 
Also, the AM (and web UI) may be placed on any of the Yarn nodes. Therefore, if 
you wanted to access it directly instead of through the Yarn web proxy, you'd 
have to find what machine and port it is running on.

-Shannon
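
For what it's worth, one way to find the proxied UI from the command line is the
standard YARN CLI (the application id below is a placeholder):

yarn application -list
# note the Application-Id of the Flink session, then:
yarn application -status application_1494500000000_0001
# the tracking URL in the output points at the (proxied) JobManager web UI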

From: Shravan R >
Date: Thursday, May 11, 2017 at 12:43 PM
To: >
Subject: JobManager Web UI

I am running flink-1.1.4 on Cloudera distributed Hadoop (Yarn).
I am not able to get through JobManager webUI through 
http://:8081. I am able to get to it through
Yarn Running applications ---> application master. My flink-conf.yaml has 
jobmanager.web.port: 8081.

Am I missing something here?

- Shravan


Queryable State Client with 1.3.0-rc0

2017-05-11 Thread Fahey, Claudio
I've been using QueryableStateClient in Flink 1.2 successfully. I have now 
upgraded to release-1.3.0-rc0 and QueryableStateClient now requires a 
HighAvailabilityServices parameter. The documentation hasn't been updated on 
using HighAvailabilityServices so I'm a bit lost on what exactly I should 
specify for that parameter. For development, I want to connect to a Flink Job 
Manager that I created from a different process using 
StreamExecutionEnvironment.createLocalEnvironmentWithWebUI. Can somebody 
provide the code needed to create the appropriate HighAvailabilityServices 
parameter?

I have tried the following code:

  val jobManagerIpcAddress = "localhost"
  val jobManagerIpcPort = 6123
  configuration.setString(JobManagerOptions.ADDRESS, jobManagerIpcAddress)
  configuration.setInteger(JobManagerOptions.PORT, jobManagerIpcPort)
  private val highAvailabilityServices = new StandaloneHaServices(jobManagerIpcAddress, jobManagerIpcAddress)
  private val client = new QueryableStateClient(configuration, highAvailabilityServices)

It results in:

Exception in thread "main" akka.actor.ActorNotFound: Actor not found for: 
ActorSelection[Anchor(akka://flink/), Path(/localhost)]


Claudio Fahey
Chief Solutions Architect, Analytics
Dell EMC | Emerging Technologies Team
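
P.S. A rough, unverified sketch of one way this parameter might be constructed in
1.3, reusing the factory from flink-runtime (please treat the class and method
names as assumptions to verify against release-1.3.0-rc0 rather than a confirmed
recipe):

import org.apache.flink.configuration.Configuration;
import org.apache.flink.configuration.JobManagerOptions;
import org.apache.flink.runtime.concurrent.Executors;
import org.apache.flink.runtime.highavailability.HighAvailabilityServices;
import org.apache.flink.runtime.highavailability.HighAvailabilityServicesUtils;
import org.apache.flink.runtime.query.QueryableStateClient;

public class QueryableStateClientSketch {
    public static void main(String[] args) throws Exception {
        Configuration config = new Configuration();
        config.setString(JobManagerOptions.ADDRESS, "localhost");
        config.setInteger(JobManagerOptions.PORT, 6123);

        // Assumption: the same utility Flink uses internally can be reused here.
        HighAvailabilityServices haServices =
                HighAvailabilityServicesUtils.createHighAvailabilityServices(
                        config,
                        Executors.directExecutor(),
                        HighAvailabilityServicesUtils.AddressResolution.TRY_ADDRESS_RESOLUTION);

        QueryableStateClient client = new QueryableStateClient(config, haServices);
    }
}

The ActorNotFound error above also suggests that StandaloneHaServices may expect a
full Akka URL (something like akka.tcp://flink@localhost:6123/user/jobmanager)
rather than a bare hostname, but that is only a guess.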



Re: High Availability on Yarn

2017-05-11 Thread Jain, Ankit
Got the answer on #2, looks like that will work, still looking for suggestions 
on #1.

Thanks
Ankit

From: "Jain, Ankit" 
Date: Thursday, May 11, 2017 at 8:26 AM
To: Stephan Ewen , "user@flink.apache.org" 

Subject: Re: High Availability on Yarn

Following up further on this.


1)  We are using a long running EMR cluster to submit jobs right now and as 
you know EMR hasn’t made Yarn ResourceManager HA.

Is there any way we can use the information put in Zookeeper by Flink Job 
Manager to bring the jobs back up on a new EMR cluster if RM goes down?



We are not looking for completely automated option but maybe write a script 
which reads Zookeeper and re-starts all jobs on a fresh EMR cluster?

I am assuming that if the Yarn ResourceManager goes down, there is no way to just bring 
it back up – you have to start a new EMR cluster?



2)  Regarding elasticity, I know that for now a running flink cluster can’t make 
use of new hosts added to EMR, but I am guessing Yarn will still see the new 
hosts and new flink jobs can make use of them, is that right?


Thanks
Ankit

From: "Jain, Ankit" 
Date: Monday, May 8, 2017 at 9:09 AM
To: Stephan Ewen , "user@flink.apache.org" 

Subject: Re: High Availability on Yarn

Thanks Stephan – we will go with a central ZooKeeper Instance and hopefully 
have it started through a cloudformation script as part of EMR startup.

Is Zk also used to keep track of checkpoint metadata and the execution graph of 
the running job to recover from ApplicationMaster failure as Aljoscha was 
guessing below, or only for leader election in case of accidentally running 
multiple Application Masters?

Thanks
Ankit

From: Stephan Ewen 
Date: Monday, May 8, 2017 at 9:00 AM
To: "user@flink.apache.org" , "Jain, Ankit" 

Subject: Re: High Availability on Yarn

@Ankit:

ZooKeeper is still required in YARN setups. Even if there is only one 
JobManager in the normal case, Yarn can accidentally create a second one when 
there is a network partition. To prevent that from leading to inconsistencies, 
we use ZooKeeper.

Flink uses ZooKeeper very little, so you can just let Flink attach to any 
existing ZooKeeper, or use one ZooKeeper cluster for very many Flink 
clusters/jobs.

Stephan
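
For reference, a minimal flink-conf.yaml sketch for attaching many Flink
clusters/jobs to one shared ZooKeeper (key names should be checked against the
docs for the version in use; the quorum and storage directory below are
placeholders):

high-availability: zookeeper
high-availability.zookeeper.quorum: zk1:2181,zk2:2181,zk3:2181
high-availability.zookeeper.path.root: /flink
# JobGraphs / checkpoint metadata pointers go here; must be a shared filesystem
high-availability.zookeeper.storageDir: hdfs:///flink/recovery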


On Mon, May 8, 2017 at 2:11 PM, Aljoscha Krettek 
> wrote:
Hi,
Yes, it’s recommended to use one ZooKeeper cluster for all Flink clusters.

Best,
Aljoscha

On 5. May 2017, at 16:56, Jain, Ankit 
> wrote:

Thanks for the update Aljoscha.

@Till Rohrmann,
Can you please chime in?

Also, we currently have a long running EMR cluster where we create one flink 
cluster per job – can we just choose to install Zookeeper when creating the EMR 
cluster and use one Zookeeper instance for ALL of flink jobs?
Or
Recommendation is to have a dedicated Zookeeper instance per flink job?

Thanks
Ankit

From: Aljoscha Krettek >
Date: Thursday, May 4, 2017 at 1:19 AM
To: "Jain, Ankit" >
Cc: "user@flink.apache.org" 
>, Till Rohrmann 
>
Subject: Re: High Availability on Yarn

Hi,
Yes, for YARN there is only one running JobManager. As far as I know, in this 
case ZooKeeper is only used to keep track of checkpoint metadata and the 
execution graph of the running job, such that a restoring JobManager can pick 
up the data again.

I’m not 100 % sure on this, though, so maybe Till can shed some light on this.

Best,
Aljoscha
On 3. May 2017, at 16:58, Jain, Ankit 
> wrote:

Thanks for your reply Aljoscha.

After building a better understanding of Yarn and spending a copious amount of 
time on the Flink codebase, I think I now get how Flink & Yarn interact – I plan 
to document this soon in case it could help somebody starting afresh with 
Flink-Yarn.

Regarding Zookeeper, in YARN mode there is only one JobManager running, so do we 
still need leader election?

If the ApplicationMaster goes down (where JM runs) it is restarted by Yarn RM 
and while restarting, Flink AM will bring back previous running containers.  
So, where does Zookeeper sit in this setup?

Thanks
Ankit

From: Aljoscha Krettek >
Date: Wednesday, May 3, 2017 at 2:05 AM
To: "Jain, Ankit" >
Cc: "user@flink.apache.org" 
>, Till Rohrmann 
>
Subject: Re: High Availability on Yarn

Hi,
As a first comment, the work 

Re: Storage options for RocksDBStateBackend

2017-05-11 Thread Stephan Ewen
Small addition to Till's comment:

In the case where file:// points to a mounted distributed file system (NFS,
MapRFs, ...), then it actually works. The important thing is that the
filesystem where the checkpoints go is replicated (fault tolerant) and
accessible from all nodes.

On Thu, May 11, 2017 at 2:16 PM, Till Rohrmann  wrote:

> Hi Ayush,
>
> you’re right that RocksDB is the recommended state backend because of the
> above-mentioned reasons. In order to make the recovery properly work, you
> have to configure a shared directory for the checkpoint data via
> state.backend.fs.checkpointdir. You can basically configure any file
> system which is supported by Hadoop (no HDFS required). The reason is that
> we use Hadoop to bridge between different file systems. The only thing you
> have to make sure is that you have the respective file system
> implementation in your class path.
>
> I think you can access Windows Azure Blob Storage via Hadoop [1] similarly
> to access S3, for example.
>
> If you use S3 to store your checkpoint data, then you will benefit from
> all the advantages of S3 but also suffer from its drawbacks (e.g. that list
> operations are more costly). But these are not specific to Flink.
>
> A URL like file:// usually indicates a local file. Thus, if your Flink
> cluster is not running on a single machine, then this won’t work.
>
> [1] https://hadoop.apache.org/docs/stable/hadoop-azure/index.html
>
> Cheers,
> Till
> ​
>
> On Thu, May 11, 2017 at 10:41 AM, Ayush Goyal  wrote:
>
>> Hello,
>>
>> I had a few questions regarding checkpoint storage options using
>> RocksDBStateBackend. In the flink 1.2 documentation, it is the
>> recommended state
>> backend due to its ability to store large states and asynchronous
>> snapshotting.
>> For high availability it seems HDFS is the recommended store for state
>> backend
>> data. In AWS deployment section, it is also mentioned that s3 can be used
>> for
>> storing state backend data.
>>
>> We don't want to depend on a hadoop cluster for flink deployment, so I had
>>
>> following questions:
>>
>> 1. Can we use any storage backend supported by flink for storing RocksDB
>> StateBackend data with file urls: there are quite a few supported as
>> mentioned here:
>> https://ci.apache.org/projects/flink/flink-docs-release-1.3/
>> internals/filesystems.html
>> and here:
>> https://github.com/apache/flink/blob/master/docs/dev/batch/connectors.md
>>
>> 2. Is there some work already done to support Windows Azure Blob Storage
>> for
>> storing State backend data? There are some docs here:
>> https://github.com/apache/flink/blob/master/docs/dev/batch/connectors.md
>> can we utilize this for that?
>>
>> 3. If utilizing S3 for state backend, is there any performance impact?
>>
>> 4. For high availability can we use a NFS volume for state backend, with
>> "file://" urls? Will there be any performance impact?
>>
>> PS: I posted this email earlier via nabble, but it's not showing up in
>> apache archive. So sending again. Apologies if it results in multiple
>> threads.
>>
>> -- Ayush
>>
>
>


JobManager Web UI

2017-05-11 Thread Shravan R
I am running flink-1.1.4 on Cloudera distributed Hadoop (Yarn).
I am not able to get through JobManager webUI through
http://:8081.
I am able to get to it through
Yarn Running applications ---> application master. My flink-conf.yaml has
jobmanager.web.port: 8081.

Am I missing something here?

- Shravan


Re: High Availability on Yarn

2017-05-11 Thread Jain, Ankit
Following up further on this.


1)   We are using a long running EMR cluster to submit jobs right now and 
as you know EMR hasn’t made Yarn ResourceManager HA.

Is there any way we can use the information put in Zookeeper by Flink Job 
Manager to bring the jobs back up on a new EMR cluster if RM goes down?



We are not looking for completely automated option but maybe write a script 
which reads Zookeeper and re-starts all jobs on a fresh EMR cluster?

I am assuming that if the Yarn ResourceManager goes down, there is no way to just bring 
it back up – you have to start a new EMR cluster?



2)   Regarding elasticity, I know that for now a running flink cluster can’t 
make use of new hosts added to EMR, but I am guessing Yarn will still see 
the new hosts and new flink jobs can make use of them, is that right?


Thanks
Ankit

From: "Jain, Ankit" 
Date: Monday, May 8, 2017 at 9:09 AM
To: Stephan Ewen , "user@flink.apache.org" 

Subject: Re: High Availability on Yarn

Thanks Stephan – we will go with a central ZooKeeper Instance and hopefully 
have it started through a cloudformation script as part of EMR startup.

Is Zk also used to keep track of checkpoint metadata and the execution graph of 
the running job to recover from ApplicationMaster failure as Aljoscha was 
guessing below, or only for leader election in case of accidentally running 
multiple Application Masters?

Thanks
Ankit

From: Stephan Ewen 
Date: Monday, May 8, 2017 at 9:00 AM
To: "user@flink.apache.org" , "Jain, Ankit" 

Subject: Re: High Availability on Yarn

@Ankit:

ZooKeeper is still required in YARN setups. Even if there is only one 
JobManager in the normal case, Yarn can accidentally create a second one when 
there is a network partition. To prevent that from leading to inconsistencies, 
we use ZooKeeper.

Flink uses ZooKeeper very little, so you can just let Flink attach to any 
existing ZooKeeper, or use one ZooKeeper cluster for very many Flink 
clusters/jobs.

Stephan


On Mon, May 8, 2017 at 2:11 PM, Aljoscha Krettek 
> wrote:
Hi,
Yes, it’s recommended to use one ZooKeeper cluster for all Flink clusters.

Best,
Aljoscha

On 5. May 2017, at 16:56, Jain, Ankit 
> wrote:

Thanks for the update Aljoscha.

@Till Rohrmann,
Can you please chime in?

Also, we currently have a long running EMR cluster where we create one flink 
cluster per job – can we just choose to install Zookeeper when creating the EMR 
cluster and use one Zookeeper instance for ALL of flink jobs?
Or
Recommendation is to have a dedicated Zookeeper instance per flink job?

Thanks
Ankit

From: Aljoscha Krettek >
Date: Thursday, May 4, 2017 at 1:19 AM
To: "Jain, Ankit" >
Cc: "user@flink.apache.org" 
>, Till Rohrmann 
>
Subject: Re: High Availability on Yarn

Hi,
Yes, for YARN there is only one running JobManager. As far as I know, in this 
case ZooKeeper is only used to keep track of checkpoint metadata and the 
execution graph of the running job, such that a restoring JobManager can pick 
up the data again.

I’m not 100 % sure on this, though, so maybe Till can shed some light on this.

Best,
Aljoscha
On 3. May 2017, at 16:58, Jain, Ankit 
> wrote:

Thanks for your reply Aljoscha.

After building a better understanding of Yarn and spending a copious amount of 
time on the Flink codebase, I think I now get how Flink & Yarn interact – I plan 
to document this soon in case it could help somebody starting afresh with 
Flink-Yarn.

Regarding Zookeeper, in YARN mode there is only one JobManager running, so do we 
still need leader election?

If the ApplicationMaster goes down (where JM runs) it is restarted by Yarn RM 
and while restarting, Flink AM will bring back previous running containers.  
So, where does Zookeeper sit in this setup?

Thanks
Ankit

From: Aljoscha Krettek >
Date: Wednesday, May 3, 2017 at 2:05 AM
To: "Jain, Ankit" >
Cc: "user@flink.apache.org" 
>, Till Rohrmann 
>
Subject: Re: High Availability on Yarn

Hi,
As a first comment, the work mentioned in the FLIP-6 doc you linked is still 
work-in-progress. You cannot use these abstractions yet without going into the 
code and setting up a cluster “by hand”.

The documentation for one-step deployment of a Job to YARN is available here: 

Job submission: Fail using command line. Success using web (flink1.2.0)

2017-05-11 Thread Rami Al-Isawi
Hi,

The same exact jar on the same machine is deployed just fine in a couple of 
seconds using the web interface. On the other hand, if I use the command line, 
I get:

org.apache.flink.client.program.ProgramInvocationException: The program 
execution failed: Couldn't retrieve the JobExecutionResult from the JobManager.
at org.apache.flink.client.program.ClusterClient.run(ClusterClient.java:427)
at 
org.apache.flink.client.program.StandaloneClusterClient.submitJob(StandaloneClusterClient.java:101)
at org.apache.flink.client.program.ClusterClient.run(ClusterClient.java:400)
at 
org.apache.flink.streaming.api.environment.StreamContextEnvironment.execute(StreamContextEnvironment.java:66)
…
Caused by: 
org.apache.flink.runtime.client.JobClientActorSubmissionTimeoutException: Job 
submission to the JobManager timed out. You may increase 'akka.client.timeout' 
in case the JobManager needs more time to configure and confirm the job 
submission.
at 
org.apache.flink.runtime.client.JobSubmissionClientActor.handleCustomMessage(JobSubmissionClientActor.java:119)
at 
org.apache.flink.runtime.client.JobClientActor.handleMessage(JobClientActor.java:239)
at 
org.apache.flink.runtime.akka.FlinkUntypedActor.handleLeaderSessionID(FlinkUntypedActor.java:88)
at 
org.apache.flink.runtime.akka.FlinkUntypedActor.onReceive(FlinkUntypedActor.java:68)
at 
akka.actor.UntypedActor$$anonfun$receive$1.applyOrElse(UntypedActor.scala:167)
at akka.actor.Actor$class.aroundReceive(Actor.scala:467)
at akka.actor.UntypedActor.aroundReceive(UntypedActor.scala:97)
at akka.actor.ActorCell.receiveMessage(ActorCell.scala:516)
at akka.actor.ActorCell.invoke(ActorCell.scala:487)
at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:238)
at akka.dispatch.Mailbox.run(Mailbox.scala:220)
at 
akka.dispatch.ForkJoinExecutorConfigurator$AkkaForkJoinTask.exec(AbstractDispatcher.scala:397)
at scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
at 
scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
at scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
at 
scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)


I did increase the timeout, but it fails the same way.

I assume that submission method should not be relevant, so what is the 
difference between command-line and web submission?

I tested removing some subtasks, and that made the command line submit the job 
successfully. But then how come it worked fine using the web interface with all 
the subtasks included?

Regards,
-Rami
Disclaimer: This message and any attachments thereto are intended solely for 
the addressed recipient(s) and may contain confidential information. If you are 
not the intended recipient, please notify the sender by reply e-mail and delete 
the e-mail (including any attachments thereto) without producing, distributing 
or retaining any copies thereof. Any review, dissemination or other use of, or 
taking of any action in reliance upon, this information by persons or entities 
other than the intended recipient(s) is prohibited. Thank you.


Re: UnknownHostException during start

2017-05-11 Thread Ted Yu
Dominique:
Which hadoop release are you using ?

Please pastebin the classpath.

Cheers

On Thu, May 11, 2017 at 7:27 AM, Till Rohrmann  wrote:

> Hi Dominique,
>
> I’m not exactly sure but this looks more like a Hadoop or a Hadoop
> configuration problem to me. Could it be that the Hadoop version you’re
> running does not support the specification of multiple KMS servers via
> kms://ht...@lfrarxxx1.srv.company;lfrarXXX2.srv.company:16000/kms?
>
> Cheers,
> Till
> ​
>
> On Thu, May 11, 2017 at 4:06 PM, Dominique Rondé <
> dominique.ro...@allsecur.de> wrote:
>
>> Dear all,
>>
>> i got some trouble during the start of Flink in a Yarn-Container based
>> on Cloudera. I have a start script like that:
>>
>> sla:/applvg/home/flink/mvp $ cat run.sh
>> export FLINK_HOME_DIR=/applvg/home/flink/mvp/flink-1.2.0/
>> export FLINK_JAR_DIR=/applvg/home/flink/mvp/cache
>> export YARN_CONF_DIR=/etc/hadoop/conf
>> export HADOOP_CONF_DIR=/etc/hadoop/conf
>>
>>
>> /applvg/home/flink/mvp/flink-1.2.0/bin/yarn-session.sh -n 4 -s 3 -st -jm
>> 2048 -tm 2048 -qu root.mr-spark.avp -d
>>
>> If I execute this script it looks like following:
>>
>> sla09037:/applvg/home/flink/mvp $ ./run.sh
>> 2017-05-11 15:13:24,541 INFO
>> org.apache.flink.configuration.GlobalConfiguration- Loading
>> configuration property: jobmanager.rpc.address, localhost
>> 2017-05-11 15:13:24,542 INFO
>> org.apache.flink.configuration.GlobalConfiguration- Loading
>> configuration property: jobmanager.rpc.port, 6123
>> 2017-05-11 15:13:24,542 INFO
>> org.apache.flink.configuration.GlobalConfiguration- Loading
>> configuration property: jobmanager.heap.mb, 256
>> 2017-05-11 15:13:24,543 INFO
>> org.apache.flink.configuration.GlobalConfiguration- Loading
>> configuration property: taskmanager.heap.mb, 512
>> 2017-05-11 15:13:24,543 INFO
>> org.apache.flink.configuration.GlobalConfiguration- Loading
>> configuration property: taskmanager.numberOfTaskSlots, 1
>> 2017-05-11 15:13:24,543 INFO
>> org.apache.flink.configuration.GlobalConfiguration- Loading
>> configuration property: taskmanager.memory.preallocate, false
>> 2017-05-11 15:13:24,543 INFO
>> org.apache.flink.configuration.GlobalConfiguration- Loading
>> configuration property: parallelism.default, 1
>> 2017-05-11 15:13:24,543 INFO
>> org.apache.flink.configuration.GlobalConfiguration- Loading
>> configuration property: jobmanager.web.port, 8081
>> 2017-05-11 15:13:24,571 INFO
>> org.apache.flink.configuration.GlobalConfiguration- Loading
>> configuration property: jobmanager.rpc.address, localhost
>> 2017-05-11 15:13:24,572 INFO
>> org.apache.flink.configuration.GlobalConfiguration- Loading
>> configuration property: jobmanager.rpc.port, 6123
>> 2017-05-11 15:13:24,572 INFO
>> org.apache.flink.configuration.GlobalConfiguration- Loading
>> configuration property: jobmanager.heap.mb, 256
>> 2017-05-11 15:13:24,572 INFO
>> org.apache.flink.configuration.GlobalConfiguration- Loading
>> configuration property: taskmanager.heap.mb, 512
>> 2017-05-11 15:13:24,572 INFO
>> org.apache.flink.configuration.GlobalConfiguration- Loading
>> configuration property: taskmanager.numberOfTaskSlots, 1
>> 2017-05-11 15:13:24,572 INFO
>> org.apache.flink.configuration.GlobalConfiguration- Loading
>> configuration property: taskmanager.memory.preallocate, false
>> 2017-05-11 15:13:24,572 INFO
>> org.apache.flink.configuration.GlobalConfiguration- Loading
>> configuration property: parallelism.default, 1
>> 2017-05-11 15:13:24,572 INFO
>> org.apache.flink.configuration.GlobalConfiguration- Loading
>> configuration property: jobmanager.web.port, 8081
>> 2017-05-11 15:13:25,000 INFO
>> org.apache.flink.runtime.security.modules.HadoopModule- Hadoop
>> user set to fl...@companyde.rootdom.net (auth:KERBEROS)
>> 2017-05-11 15:13:25,030 INFO
>> org.apache.flink.configuration.GlobalConfiguration- Loading
>> configuration property: jobmanager.rpc.address, localhost
>> 2017-05-11 15:13:25,030 INFO
>> org.apache.flink.configuration.GlobalConfiguration- Loading
>> configuration property: jobmanager.rpc.port, 6123
>> 2017-05-11 15:13:25,030 INFO
>> org.apache.flink.configuration.GlobalConfiguration- Loading
>> configuration property: jobmanager.heap.mb, 256
>> 2017-05-11 15:13:25,030 INFO
>> org.apache.flink.configuration.GlobalConfiguration- Loading
>> configuration property: taskmanager.heap.mb, 512
>> 2017-05-11 15:13:25,031 INFO
>> org.apache.flink.configuration.GlobalConfiguration- Loading
>> configuration property: taskmanager.numberOfTaskSlots, 1
>> 2017-05-11 15:13:25,031 INFO
>> org.apache.flink.configuration.GlobalConfiguration- Loading
>> configuration property: taskmanager.memory.preallocate, false
>> 2017-05-11 15:13:25,031 INFO
>> 

Re: UnknownHostException during start

2017-05-11 Thread Till Rohrmann
Hi Dominique,

I’m not exactly sure but this looks more like a Hadoop or a Hadoop
configuration problem to me. Could it be that the Hadoop version you’re
running does not support the specification of multiple KMS servers via
kms://ht...@lfrarxxx1.srv.company;lfrarXXX2.srv.company:16000/kms?

Cheers,
Till
​

On Thu, May 11, 2017 at 4:06 PM, Dominique Rondé <
dominique.ro...@allsecur.de> wrote:

> Dear all,
>
> i got some trouble during the start of Flink in a Yarn-Container based
> on Cloudera. I have a start script like that:
>
> sla:/applvg/home/flink/mvp $ cat run.sh
> export FLINK_HOME_DIR=/applvg/home/flink/mvp/flink-1.2.0/
> export FLINK_JAR_DIR=/applvg/home/flink/mvp/cache
> export YARN_CONF_DIR=/etc/hadoop/conf
> export HADOOP_CONF_DIR=/etc/hadoop/conf
>
>
> /applvg/home/flink/mvp/flink-1.2.0/bin/yarn-session.sh -n 4 -s 3 -st -jm
> 2048 -tm 2048 -qu root.mr-spark.avp -d
>
> If I execute this script it looks like following:
>
> sla09037:/applvg/home/flink/mvp $ ./run.sh
> 2017-05-11 15:13:24,541 INFO
> org.apache.flink.configuration.GlobalConfiguration- Loading
> configuration property: jobmanager.rpc.address, localhost
> 2017-05-11 15:13:24,542 INFO
> org.apache.flink.configuration.GlobalConfiguration- Loading
> configuration property: jobmanager.rpc.port, 6123
> 2017-05-11 15:13:24,542 INFO
> org.apache.flink.configuration.GlobalConfiguration- Loading
> configuration property: jobmanager.heap.mb, 256
> 2017-05-11 15:13:24,543 INFO
> org.apache.flink.configuration.GlobalConfiguration- Loading
> configuration property: taskmanager.heap.mb, 512
> 2017-05-11 15:13:24,543 INFO
> org.apache.flink.configuration.GlobalConfiguration- Loading
> configuration property: taskmanager.numberOfTaskSlots, 1
> 2017-05-11 15:13:24,543 INFO
> org.apache.flink.configuration.GlobalConfiguration- Loading
> configuration property: taskmanager.memory.preallocate, false
> 2017-05-11 15:13:24,543 INFO
> org.apache.flink.configuration.GlobalConfiguration- Loading
> configuration property: parallelism.default, 1
> 2017-05-11 15:13:24,543 INFO
> org.apache.flink.configuration.GlobalConfiguration- Loading
> configuration property: jobmanager.web.port, 8081
> 2017-05-11 15:13:24,571 INFO
> org.apache.flink.configuration.GlobalConfiguration- Loading
> configuration property: jobmanager.rpc.address, localhost
> 2017-05-11 15:13:24,572 INFO
> org.apache.flink.configuration.GlobalConfiguration- Loading
> configuration property: jobmanager.rpc.port, 6123
> 2017-05-11 15:13:24,572 INFO
> org.apache.flink.configuration.GlobalConfiguration- Loading
> configuration property: jobmanager.heap.mb, 256
> 2017-05-11 15:13:24,572 INFO
> org.apache.flink.configuration.GlobalConfiguration- Loading
> configuration property: taskmanager.heap.mb, 512
> 2017-05-11 15:13:24,572 INFO
> org.apache.flink.configuration.GlobalConfiguration- Loading
> configuration property: taskmanager.numberOfTaskSlots, 1
> 2017-05-11 15:13:24,572 INFO
> org.apache.flink.configuration.GlobalConfiguration- Loading
> configuration property: taskmanager.memory.preallocate, false
> 2017-05-11 15:13:24,572 INFO
> org.apache.flink.configuration.GlobalConfiguration- Loading
> configuration property: parallelism.default, 1
> 2017-05-11 15:13:24,572 INFO
> org.apache.flink.configuration.GlobalConfiguration- Loading
> configuration property: jobmanager.web.port, 8081
> 2017-05-11 15:13:25,000 INFO
> org.apache.flink.runtime.security.modules.HadoopModule- Hadoop
> user set to fl...@companyde.rootdom.net (auth:KERBEROS)
> 2017-05-11 15:13:25,030 INFO
> org.apache.flink.configuration.GlobalConfiguration- Loading
> configuration property: jobmanager.rpc.address, localhost
> 2017-05-11 15:13:25,030 INFO
> org.apache.flink.configuration.GlobalConfiguration- Loading
> configuration property: jobmanager.rpc.port, 6123
> 2017-05-11 15:13:25,030 INFO
> org.apache.flink.configuration.GlobalConfiguration- Loading
> configuration property: jobmanager.heap.mb, 256
> 2017-05-11 15:13:25,030 INFO
> org.apache.flink.configuration.GlobalConfiguration- Loading
> configuration property: taskmanager.heap.mb, 512
> 2017-05-11 15:13:25,031 INFO
> org.apache.flink.configuration.GlobalConfiguration- Loading
> configuration property: taskmanager.numberOfTaskSlots, 1
> 2017-05-11 15:13:25,031 INFO
> org.apache.flink.configuration.GlobalConfiguration- Loading
> configuration property: taskmanager.memory.preallocate, false
> 2017-05-11 15:13:25,031 INFO
> org.apache.flink.configuration.GlobalConfiguration- Loading
> configuration property: parallelism.default, 1
> 2017-05-11 15:13:25,031 INFO
> org.apache.flink.configuration.GlobalConfiguration- Loading
> configuration property: jobmanager.web.port, 8081
> 

UnknownHostException during start

2017-05-11 Thread Dominique Rondé
Dear all,

i got some trouble during the start of Flink in a Yarn-Container based
on Cloudera. I have a start script like that:

sla:/applvg/home/flink/mvp $ cat run.sh
export FLINK_HOME_DIR=/applvg/home/flink/mvp/flink-1.2.0/
export FLINK_JAR_DIR=/applvg/home/flink/mvp/cache
export YARN_CONF_DIR=/etc/hadoop/conf
export HADOOP_CONF_DIR=/etc/hadoop/conf


/applvg/home/flink/mvp/flink-1.2.0/bin/yarn-session.sh -n 4 -s 3 -st -jm
2048 -tm 2048 -qu root.mr-spark.avp -d

If I execute this script it looks like following:

sla09037:/applvg/home/flink/mvp $ ./run.sh
2017-05-11 15:13:24,541 INFO 
org.apache.flink.configuration.GlobalConfiguration- Loading
configuration property: jobmanager.rpc.address, localhost
2017-05-11 15:13:24,542 INFO 
org.apache.flink.configuration.GlobalConfiguration- Loading
configuration property: jobmanager.rpc.port, 6123
2017-05-11 15:13:24,542 INFO 
org.apache.flink.configuration.GlobalConfiguration- Loading
configuration property: jobmanager.heap.mb, 256
2017-05-11 15:13:24,543 INFO 
org.apache.flink.configuration.GlobalConfiguration- Loading
configuration property: taskmanager.heap.mb, 512
2017-05-11 15:13:24,543 INFO 
org.apache.flink.configuration.GlobalConfiguration- Loading
configuration property: taskmanager.numberOfTaskSlots, 1
2017-05-11 15:13:24,543 INFO 
org.apache.flink.configuration.GlobalConfiguration- Loading
configuration property: taskmanager.memory.preallocate, false
2017-05-11 15:13:24,543 INFO 
org.apache.flink.configuration.GlobalConfiguration- Loading
configuration property: parallelism.default, 1
2017-05-11 15:13:24,543 INFO 
org.apache.flink.configuration.GlobalConfiguration- Loading
configuration property: jobmanager.web.port, 8081
2017-05-11 15:13:24,571 INFO 
org.apache.flink.configuration.GlobalConfiguration- Loading
configuration property: jobmanager.rpc.address, localhost
2017-05-11 15:13:24,572 INFO 
org.apache.flink.configuration.GlobalConfiguration- Loading
configuration property: jobmanager.rpc.port, 6123
2017-05-11 15:13:24,572 INFO 
org.apache.flink.configuration.GlobalConfiguration- Loading
configuration property: jobmanager.heap.mb, 256
2017-05-11 15:13:24,572 INFO 
org.apache.flink.configuration.GlobalConfiguration- Loading
configuration property: taskmanager.heap.mb, 512
2017-05-11 15:13:24,572 INFO 
org.apache.flink.configuration.GlobalConfiguration- Loading
configuration property: taskmanager.numberOfTaskSlots, 1
2017-05-11 15:13:24,572 INFO 
org.apache.flink.configuration.GlobalConfiguration- Loading
configuration property: taskmanager.memory.preallocate, false
2017-05-11 15:13:24,572 INFO 
org.apache.flink.configuration.GlobalConfiguration- Loading
configuration property: parallelism.default, 1
2017-05-11 15:13:24,572 INFO 
org.apache.flink.configuration.GlobalConfiguration- Loading
configuration property: jobmanager.web.port, 8081
2017-05-11 15:13:25,000 INFO 
org.apache.flink.runtime.security.modules.HadoopModule- Hadoop
user set to fl...@companyde.rootdom.net (auth:KERBEROS)
2017-05-11 15:13:25,030 INFO 
org.apache.flink.configuration.GlobalConfiguration- Loading
configuration property: jobmanager.rpc.address, localhost
2017-05-11 15:13:25,030 INFO 
org.apache.flink.configuration.GlobalConfiguration- Loading
configuration property: jobmanager.rpc.port, 6123
2017-05-11 15:13:25,030 INFO 
org.apache.flink.configuration.GlobalConfiguration- Loading
configuration property: jobmanager.heap.mb, 256
2017-05-11 15:13:25,030 INFO 
org.apache.flink.configuration.GlobalConfiguration- Loading
configuration property: taskmanager.heap.mb, 512
2017-05-11 15:13:25,031 INFO 
org.apache.flink.configuration.GlobalConfiguration- Loading
configuration property: taskmanager.numberOfTaskSlots, 1
2017-05-11 15:13:25,031 INFO 
org.apache.flink.configuration.GlobalConfiguration- Loading
configuration property: taskmanager.memory.preallocate, false
2017-05-11 15:13:25,031 INFO 
org.apache.flink.configuration.GlobalConfiguration- Loading
configuration property: parallelism.default, 1
2017-05-11 15:13:25,031 INFO 
org.apache.flink.configuration.GlobalConfiguration- Loading
configuration property: jobmanager.web.port, 8081
2017-05-11 15:13:25,050 INFO 
org.apache.flink.yarn.YarnClusterDescriptor   - Using
values:
2017-05-11 15:13:25,051 INFO 
org.apache.flink.yarn.YarnClusterDescriptor   -  
TaskManager count = 4
2017-05-11 15:13:25,051 INFO 
org.apache.flink.yarn.YarnClusterDescriptor   -  
JobManager memory = 2048
2017-05-11 15:13:25,051 INFO 
org.apache.flink.yarn.YarnClusterDescriptor   -  
TaskManager memory = 2048
2017-05-11 15:13:25,903 WARN 
org.apache.hadoop.util.NativeCodeLoader   - 

Streaming use case: Row enrichment

2017-05-11 Thread Flavio Pompermaier
Hi to all,
we have a particular use case where we have a tabular dataset on HDFS (e.g.
a CSV) that we want to enrich by filling some cells with the content returned
by a query to a reverse index (e.g. solr/elasticsearch).
Since we want this process to be resilient and scalable, we thought that Flink
streaming could be a good fit: we can control the "pressure" on the index by
adding/removing consumers dynamically, and there is automatic error recovery.

Right now we developed 2 different solutions to the problem:

   1. *Move the dataset from HDFS to a queue/topic* (like Kafka or
   RabbitMQ) and then let the queue consumers do the real job (pull Rows from
   the queue, enrich and then persist the enriched Rows). The questions here
   are:
      1. How to properly manage writing to HDFS? If we read a set of rows,
      enrich them and need to write the result back to HDFS, is it possible
      to automatically compact files in order to avoid the "too many small
      files" problem on HDFS? How do we avoid file name collisions (put each
      batch of rows into a different file)?
      2. How to control the number of consumers dynamically? Is it possible
      to change the parallelism once the job has started?
   2. In order to avoid useless data transfer from HDFS to a queue/topic
   (since we don't need all the Row fields to create the query..usually only
   2/5 fields are needed), we can create a Flink job that puts the *queries
   into a queue/topic* and waits for the result. The problem with this
   approach is:
      1. How to correlate queries with their responses? Creating a single
      response queue/topic implies that all consumers read all messages (and
      discard those that are not directed to them), while creating a
      queue/topic for each sub-task could be expensive (in terms of resources
      and management.. but we don't have any evidence/experience of this..
      it's just a possible problem).
   3. Maybe we can exploit *Flink async I/O* somehow...? But how? (A hedged
   sketch follows at the end of this message.)


Any suggestion/drawbacks on the 2 approaches?

Thanks in advance,
Flavio
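
Regarding point 3, below is a very rough sketch of what the async I/O API (added
in 1.2) could look like for this kind of lookup. The index client call is a
placeholder, and the exact class names (AsyncFunction, AsyncCollector,
AsyncDataStream) should be verified against the release in use:

import java.util.Collections;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.TimeUnit;

import org.apache.flink.streaming.api.datastream.AsyncDataStream;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.functions.async.AsyncFunction;
import org.apache.flink.streaming.api.functions.async.collector.AsyncCollector;
import org.apache.flink.types.Row;

// Placeholder enrichment against the reverse index (not a real client API).
public class EnrichWithIndex implements AsyncFunction<Row, Row> {
    @Override
    public void asyncInvoke(Row input, AsyncCollector<Row> collector) {
        CompletableFuture
                .supplyAsync(() -> lookupAndEnrich(input))   // hypothetical blocking lookup
                .thenAccept(enriched -> collector.collect(Collections.singleton(enriched)));
    }

    private Row lookupAndEnrich(Row row) {
        // query solr/elasticsearch with the 2-5 relevant fields and fill the cells
        return row;
    }
}

// usage sketch: at most 100 in-flight requests, 10 s timeout
// DataStream<Row> enriched =
//     AsyncDataStream.unorderedWait(rows, new EnrichWithIndex(), 10, TimeUnit.SECONDS, 100);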


Re: Deactive a job like storm

2017-05-11 Thread Till Rohrmann
To properly implement stop we have to change some internal orchestration
structure. This is not a trivial task and so far nobody has found time to
work on it. Moreover, the individual sources have to be adapted as well.

Cheers,
Till
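
For context, a minimal sketch of what a "stoppable" source looks like today; the
Kafka consumer would need something along these lines (illustrative only, not the
planned implementation):

import org.apache.flink.api.common.functions.StoppableFunction;
import org.apache.flink.streaming.api.functions.source.SourceFunction;

public class StoppableCounterSource implements SourceFunction<Long>, StoppableFunction {
    private volatile boolean running = true;
    private long counter = 0;

    @Override
    public void run(SourceContext<Long> ctx) throws Exception {
        while (running) {
            synchronized (ctx.getCheckpointLock()) {
                ctx.collect(counter++);
            }
            Thread.sleep(100);
        }
    }

    @Override
    public void cancel() { running = false; }  // hard cancel path

    @Override
    public void stop() { running = false; }    // graceful path used by "flink stop"
}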

On Thu, May 11, 2017 at 4:54 AM, yunfan123 
wrote:

> But why doesn't FlinkKafkaConsumerBase implement the StoppableFunction
> interface?
>
>
>
> --
> View this message in context: http://apache-flink-user-
> mailing-list-archive.2336050.n4.nabble.com/Deactive-a-job-
> like-storm-tp13088p13099.html
> Sent from the Apache Flink User Mailing List archive. mailing list archive
> at Nabble.com.
>


Re: Storage options for RocksDBStateBackend

2017-05-11 Thread Till Rohrmann
Hi Ayush,

you’re right that RocksDB is the recommended state backend because of the
above-mentioned reasons. In order to make the recovery properly work, you
have to configure a shared directory for the checkpoint data via
state.backend.fs.checkpointdir. You can basically configure any file system
which is supported by Hadoop (no HDFS required). The reason is that we use
Hadoop to bridge between different file systems. The only thing you have to
make sure is that you have the respective file system implementation in
your class path.

I think you can access Windows Azure Blob Storage via Hadoop [1] similarly
to access S3, for example.

If you use S3 to store your checkpoint data, then you will benefit from all
the advantages of S3 but also suffer from its drawbacks (e.g. that list
operations are more costly). But these are not specific to Flink.

A URL like file:// usually indicates a local file. Thus, if your Flink
cluster is not running on a single machine, then this won’t work.

[1] https://hadoop.apache.org/docs/stable/hadoop-azure/index.html

Cheers,
Till
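
A small sketch of the programmatic setup (the checkpoint URI is a placeholder;
any shared filesystem scheme supported through Hadoop should work the same way):

import org.apache.flink.contrib.streaming.state.RocksDBStateBackend;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class RocksDbCheckpointSetup {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        // Checkpoint data must land on a directory reachable from all nodes,
        // e.g. hdfs://, s3://, or an NFS mount exposed via file://
        env.setStateBackend(new RocksDBStateBackend("hdfs://namenode:8020/flink/checkpoints"));
        env.enableCheckpointing(60_000);  // checkpoint every 60 s

        // ... build and execute the job
    }
}

The equivalent flink-conf.yaml keys would be state.backend: rocksdb plus the
state.backend.fs.checkpointdir mentioned above.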
​

On Thu, May 11, 2017 at 10:41 AM, Ayush Goyal  wrote:

> Hello,
>
> I had a few questions regarding checkpoint storage options using
> RocksDBStateBackend. In the flink 1.2 documentation, it is the recommended
> state
> backend due to its ability to store large states and asynchronous
> snapshotting.
> For high availability it seems HDFS is the recommended store for state
> backend
> data. In AWS deployment section, it is also mentioned that s3 can be used
> for
> storing state backend data.
>
> We don't want to depend on a hadoop cluster for flink deployment, so I had
>
> following questions:
>
> 1. Can we use any storage backend supported by flink for storing RocksDB
> StateBackend data with file urls: there are quite a few supported as
> mentioned here:
> https://ci.apache.org/projects/flink/flink-docs-release-1.3/internals/
> filesystems.html
> and here:
> https://github.com/apache/flink/blob/master/docs/dev/batch/connectors.md
>
> 2. Is there some work already done to support Windows Azure Blob Storage
> for
> storing State backend data? There are some docs here:
> https://github.com/apache/flink/blob/master/docs/dev/batch/connectors.md
> can we utilize this for that?
>
> 3. If utilizing S3 for state backend, is there any performance impact?
>
> 4. For high availability can we use a NFS volume for state backend, with
> "file://" urls? Will there be any performance impact?
>
> PS: I posted this email earlier via nabble, but it's not showing up in
> apache archive. So sending again. Apologies if it results in multiple
> threads.
>
> -- Ayush
>


Re: ConnectedStream keyby issues

2017-05-11 Thread Aljoscha Krettek
Yes, that looks right.

> On 10. May 2017, at 14:56, yunfan123  wrote:
> 
> In the example above, it seems I should clear the state in the onTimer function in
> order to free resources, like follows:
> public void onTimer(long l, OnTimerContext onTimerContext,
> Collector> collector) throws Exception { 
>if (state.value() != null) { 
>collector.collect(state.value()); 
>state.update(null);
>} 
>} 
> 
> 
> 
> --
> View this message in context: 
> http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/ConnectedStream-keyby-issues-tp12999p13090.html
> Sent from the Apache Flink User Mailing List archive. mailing list archive at 
> Nabble.com.



Storage options for RocksDBStateBackend

2017-05-11 Thread Ayush Goyal
Hello,

I had a few questions regarding checkpoint storage options using
RocksDBStateBackend. In the flink 1.2 documentation, it is the recommended
state
backend due to its ability to store large states and asynchronous
snapshotting.
For high availability it seems HDFS is the recommended store for state
backend
data. In AWS deployment section, it is also mentioned that s3 can be used
for
storing state backend data.

We don't want to depend on a hadoop cluster for flink deployment, so I had
following questions:

1. Can we use any storage backend supported by flink for storing RocksDB
StateBackend data with file urls: there are quite a few supported as
mentioned here:
https://ci.apache.org/projects/flink/flink-docs-release-1.3/internals/filesystems.html
and here:
https://github.com/apache/flink/blob/master/docs/dev/batch/connectors.md

2. Is there some work already done to support Windows Azure Blob Storage for

storing State backend data? There are some docs here:
https://github.com/apache/flink/blob/master/docs/dev/batch/connectors.md
can we utilize this for that?

3. If utilizing S3 for state backend, is there any performance impact?

4. For high availability can we use a NFS volume for state backend, with
"file://" urls? Will there be any performance impact?

PS: I posted this email earlier via nabble, but it's not showing up in
apache archive. So sending again. Apologies if it results in multiple
threads.

-- Ayush


Re: issue running flink in docker

2017-05-11 Thread Stephan Ewen
Glad to hear it!

Overlay networks (as most container infras use) are tricky and we need to
add some code to make diagnostics of issues easier in those cases...

Stephan
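
In case it helps others hitting the same thing, a hedged flink-conf.yaml sketch
for pinning the addresses the processes advertise, so that JobManager and
TaskManagers resolve each other consistently across containers (hostnames are
placeholders):

jobmanager.rpc.address: jobmanager        # must resolve from every container
jobmanager.rpc.port: 6123
taskmanager.hostname: taskmanager-1       # per TaskManager: its externally reachable name
taskmanager.rpc.port: 6122
taskmanager.data.port: 6121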


On Wed, May 10, 2017 at 9:30 PM, David Brelloch  wrote:

> Stephan,
>
> Thanks for pointing us in the right direction on the different addresses.
> That was the issue.
>
> David
>
> On Wed, May 10, 2017 at 3:03 PM, Stephan Ewen  wrote:
>
>> Hi!
>>
>> Can it be that some hostname / IP address mapping / etc gets thrown off
>> somewhere in the process?
>>
>> This exception looks like the following happens:
>>
>>   - JobManager gets a message from a TaskManager that a partition is
>> ready, notifies other TaskManagers
>>   - TaskManager gets the update message, connects to the address of the
>> indicated TaskManager
>>   - That taskmanager does not have that partition
>>
>> Is it possible that JobManager / TaskManager see different names /
>> addresses?
>>
>> Also, is that Flink 1.2, DataSet job?
>>
>> Stephan
>>
>>
>>
>> On Wed, May 10, 2017 at 7:05 PM, David Brelloch 
>> wrote:
>>
>>> Hi everyone,
>>>
>>> We are attempting to run flink 1.2 in a distributed dockerized
>>> environment and are running into issues when running jobs in parallel.
>>>
>>> The exception we are getting fairly quickly after start up is:
>>>
>>> org.apache.flink.runtime.io.network.partition.PartitionNotFoundException: 
>>> Partition d3d8404aa26bedafd77e88bdfd88375b@84037703da6706cd1017f53fd8b818cd 
>>> not found.
>>> at 
>>> org.apache.flink.runtime.io.network.partition.consumer.RemoteInputChannel.failPartitionRequest(RemoteInputChannel.java:204)
>>> at 
>>> org.apache.flink.runtime.io.network.partition.consumer.RemoteInputChannel.retriggerSubpartitionRequest(RemoteInputChannel.java:129)
>>> at 
>>> org.apache.flink.runtime.io.network.partition.consumer.SingleInputGate.retriggerPartitionRequest(SingleInputGate.java:331)
>>> at 
>>> org.apache.flink.runtime.taskmanager.Task.onPartitionStateUpdate(Task.java:1244)
>>> at org.apache.flink.runtime.taskmanager.Task$2.apply(Task.java:1082)
>>> at org.apache.flink.runtime.taskmanager.Task$2.apply(Task.java:1077)
>>> at 
>>> org.apache.flink.runtime.concurrent.impl.FlinkFuture$5.onComplete(FlinkFuture.java:259)
>>> at akka.dispatch.OnComplete.internal(Future.scala:248)
>>> at akka.dispatch.OnComplete.internal(Future.scala:245)
>>> at akka.dispatch.japi$CallbackBridge.apply(Future.scala:175)
>>> at akka.dispatch.japi$CallbackBridge.apply(Future.scala:172)
>>> at scala.concurrent.impl.CallbackRunnable.run(Promise.scala:32)
>>> at 
>>> akka.dispatch.BatchingExecutor$AbstractBatch.processBatch(BatchingExecutor.scala:55)
>>> at 
>>> akka.dispatch.BatchingExecutor$BlockableBatch$$anonfun$run$1.apply$mcV$sp(BatchingExecutor.scala:91)
>>> at 
>>> akka.dispatch.BatchingExecutor$BlockableBatch$$anonfun$run$1.apply(BatchingExecutor.scala:91)
>>> at 
>>> akka.dispatch.BatchingExecutor$BlockableBatch$$anonfun$run$1.apply(BatchingExecutor.scala:91)
>>> at 
>>> scala.concurrent.BlockContext$.withBlockContext(BlockContext.scala:72)
>>> at 
>>> akka.dispatch.BatchingExecutor$BlockableBatch.run(BatchingExecutor.scala:90)
>>> at akka.dispatch.TaskInvocation.run(AbstractDispatcher.scala:40)
>>> at 
>>> akka.dispatch.ForkJoinExecutorConfigurator$AkkaForkJoinTask.exec(AbstractDispatcher.scala:397)
>>> at scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
>>> at 
>>> scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
>>> at 
>>> scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
>>> at 
>>> scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)
>>>
>>> This only occurs when running in parallel but I don't have a lot to go
>>> on from the exception. We have configured the following ports:
>>> jobmanager.rpc.port: 6123
>>> taskmanager.rpc.port: 6122
>>> taskmanager.data.port: 6121
>>>
>>> And have mapped the docker ports 6121 and 6122 on the task managers as
>>> well as 6123 on the job manager.
>>>
>>> Does anyone have any suggestions for other places to look or settings to
>>> try?
>>>
>>> Thanks,
>>> David
>>>
>>>
>>>
>>>
>>
>


Storage options for RocksDBStateBackend

2017-05-11 Thread ayush
Hello,

I had a few questions regarding checkpoint storage options using
RocksDBStateBackend. In the flink 1.2 documentation, it is the recommended
state
backend due to its ability to store large states and asynchronous
snapshotting.
For high availability it seems HDFS is the recommended store for state
backend
data. In AWS deployment section, it is also mentioned that s3 can be used
for
storing state backend data.

We don't want to depend on a hadoop cluster for flink deployment, so I had
following questions:

1. Can we use any storage backend supported by flink for storing RocksDB
StateBackend data with file urls: there are quite a few supported as
mentioned here:
https://ci.apache.org/projects/flink/flink-docs-release-1.3/internals/filesystems.html
and here: 
https://github.com/apache/flink/blob/master/docs/dev/batch/connectors.md

2. Is there some work already done to support Windows Azure Blob Storage for
storing State backend data? There are some docs here:
https://github.com/apache/flink/blob/master/docs/dev/batch/connectors.md
can we utilize this for that?

3. If utilizing S3 for state backend, is there any performance impact?

4. For high availability can we use a NFS volume for state backend, with
"file://" urls? Will there be any performance impact?

-- Ayush



--
View this message in context: 
http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/Storage-options-for-RocksDBStateBackend-tp13102.html
Sent from the Apache Flink User Mailing List archive. mailing list archive at 
Nabble.com.


Re: Re: Re: ElasticsearchSink on DataSet

2017-05-11 Thread Flavio Pompermaier
Great! I was just thinking that, in principle, a streaming sink is an
extension of a batch one. Am I wrong?
This would avoid a lot of code duplication and would improve the overall
maintainability..

On Thu, May 11, 2017 at 4:35 AM, wyphao.2007  wrote:

> Hi Flavio, I made a PR for this : https://github.com/apache/
> flink/pull/3869
> And it also support ActionRequestFailureHandler in DataSet's
> ElasticsearchSink
>
> Best
>
> On 2017-05-09 at 15:30, "Flavio Pompermaier" wrote:
>
>
> Just one note: I took a look at your connector and it doesn't provide any
> failure handling mechanism that is very useful for us.
> Maybe it could worth to add ActionRequestFailureHandler as provided now
> by the current ES streaming connector and introduced by commit
> https://github.com/apache/flink/commit/3743e898104d79a9813d444d38fa9f
> 86617bb5ef
>
> Best,
> Flavio
>
> On Tue, May 9, 2017 at 8:17 AM, Flavio Pompermaier 
> wrote:
>
>> Thanks a lot for the support!
>>
>> On 9 May 2017 07:53, "Tzu-Li (Gordon) Tai"  wrote:
>>
>>> Hi!
>>>
>>> Thanks for sharing that repo! I think that would be quite an useful
>>> contribution to Flink for the users, if you’re up to preparing a PR for it
>>> :)
>>>
>>> It also looks like you’ve adopted most of the current ElasticsearchSink
>>> APIs (RequestIndexer, ElasticsearchSinkFunction, etc.) for the
>>> ElasticsearchOutputFormat, which is nice to fit into the current code :-D
>>>
>>> Cheers,
>>> Gordon
>>>
>>>
>>> On 9 May 2017 at 1:05:14 PM, wyphao.2007 (wyphao.2...@163.com) wrote:
>>>
>>> Hi Flavio
>>>
>>> Maybe this is what you want: https://github.com/397090770/f
>>> link-elasticsearch2-connector, It can save Flink DataSet to
>>> elasticsearch.
>>>
>>> import scala.collection.JavaConversions._
>>>
>>> val config = Map("bulk.flush.max.actions" -> "1000", "cluster.name" -> "elasticsearch")
>>> val hosts = "www.iteblog.com"
>>> val transports = hosts.split(",").map(host => new InetSocketAddress(InetAddress.getByName(host), 9300)).toList
>>> val data : DataSet[String] = ...
>>>
>>> data.output(new ElasticSearchOutputFormat(config, transports, new ElasticsearchSinkFunction[String] {
>>>   def createIndexRequest(element: String): IndexRequest = {
>>>     Requests.indexRequest.index("iteblog").`type`("info").source(element)
>>>   }
>>>   override def process(element: String, ctx: RuntimeContext, indexer: RequestIndexer) {
>>>     indexer.add(createIndexRequest(element))
>>>   }
>>> }))
>>>
>>>
>>> I hope this could help you
>>>
>>> On 2017-05-09 at 12:59, "Tzu-Li (Gordon) Tai" wrote:
>>>
>>>
>>> Hi Flavio,
>>>
>>> I don’t think there is a bridge class for this. At the moment you’ll
>>> have to implement your own OutputFormat.
>>> The ElasticsearchSink is a SinkFunction which is part of the DataStream
>>> API, which generally speaking at the moment has no bridge or unification
>>> yet with the DataSet API.
>>>
>>> Cheers,
>>> Gordon
>>>
>>>
>>> On 3 May 2017 at 10:15:51 PM, Flavio Pompermaier (pomperma...@okkam.it)
>>> wrote:
>>>
>>>
>>> Hi to all,
>>> at the moment I have a Flink Job that generates a DataSet that I
>>> write to a File that is read by Logstash to index data on ES.
>>> I'd like to use the new ElasticsearchSink to index those JSON directly
>>> from Flink but ElasticsearchSink only works with streaming environment.
>>>
>>> Is there any bridge class for this?
>>>
>>> Best,
>>> Flavio
>>>
>>>
>
>
> --
> Flavio Pompermaier
> Development Department
>
> OKKAM S.r.l.
> Tel. +(39) 0461 1823908 <+39%200461%20182%203908>
>
>


-- 
Flavio Pompermaier
Development Department

OKKAM S.r.l.
Tel. +(39) 0461 1823908


Re: Gelly - generics with custom vertex value

2017-05-11 Thread Kaepke, Marc
Thanks for the hint.

I focused on it and got strange behavior.
If I change from EdgeDirection.ALL (what I need) to EdgeDirection.OUT (or .IN), 
everything seems okay. The sublist operation was still active.

Then I replaced the sublist with the entire list and there was no exception 
(EdgeDirection.All/IN/OUT worked). In case of .ALL the algorithm ran until 
„maximumNumberOfIterations“. How can I limit it and how can I set the 
termination conditions?


Best,
Marc
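
A hedged sketch of the copy-based workaround, in case it is useful (names are
illustrative): materializing the view into a plain ArrayList means Kryo
serializes a concrete, self-contained collection instead of
java.util.ArrayList$SubList.

import java.util.ArrayList;
import java.util.List;

public final class SubListCopy {
    // Copy the first n elements into a fresh ArrayList before storing them in the
    // vertex value, so no SubList view ever reaches the serializer.
    public static <T> List<T> top(List<T> all, int n) {
        return new ArrayList<>(all.subList(0, Math.min(n, all.size())));
    }
}

Alternatively, a custom Kryo serializer could be registered for the view class, as
Stephan suggests, but the copy is usually the simpler fix.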

On 10.05.2017 at 18:18, Stephan Ewen wrote:

Looks like java.util.ArrayList$SubList does not work out of the box with Kryo / 
Flink.

Try registering a custom serializer for it...

On Wed, May 10, 2017 at 4:16 PM, Kaepke, Marc 
> wrote:
Hi,

A part of my bachelor thesis is an implementation of the Semi-Clustering 
algorithm [1].
I’m using the Scatter-Gather iteration. Each vertex has to know its neighbors 
and the edge value between them. Because Gelly’s vertex doesn’t provide both 
pieces of information, I wrote a CustomVertexValue class. An object contains a 
set of edges and a list of another custom class called SemiCluster, which 
contains a list of vertices and a double value.
Now I’m able to create vertices like Vertex.

Unfortunately I get an exception if I run my Scatter-Gather-Iteration.

Exception in thread "main" 
org.apache.flink.runtime.client.JobExecutionException: Job execution failed.
at 
org.apache.flink.runtime.jobmanager.JobManager$$anonfun$handleMessage$1$$anonfun$applyOrElse$7.apply$mcV$sp(JobManager.scala:900)
at 
org.apache.flink.runtime.jobmanager.JobManager$$anonfun$handleMessage$1$$anonfun$applyOrElse$7.apply(JobManager.scala:843)
at 
org.apache.flink.runtime.jobmanager.JobManager$$anonfun$handleMessage$1$$anonfun$applyOrElse$7.apply(JobManager.scala:843)
at 
scala.concurrent.impl.Future$PromiseCompletingRunnable.liftedTree1$1(Future.scala:24)
at scala.concurrent.impl.Future$PromiseCompletingRunnable.run(Future.scala:24)
at akka.dispatch.TaskInvocation.run(AbstractDispatcher.scala:40)
at 
akka.dispatch.ForkJoinExecutorConfigurator$AkkaForkJoinTask.exec(AbstractDispatcher.scala:397)
at scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
at 
scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
at scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
at 
scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)
Caused by: java.lang.RuntimeException: Error obtaining the sorted input: Thread 
'SortMerger Reading Thread' terminated due to an exception: null
at 
org.apache.flink.runtime.operators.sort.UnilateralSortMerger.getIterator(UnilateralSortMerger.java:619)
at org.apache.flink.runtime.operators.BatchTask.getInput(BatchTask.java:1094)
at 
org.apache.flink.runtime.operators.CoGroupWithSolutionSetSecondDriver.run(CoGroupWithSolutionSetSecondDriver.java:203)
at org.apache.flink.runtime.operators.BatchTask.run(BatchTask.java:490)
at 
org.apache.flink.runtime.iterative.task.AbstractIterativeTask.run(AbstractIterativeTask.java:146)
at 
org.apache.flink.runtime.iterative.task.IterationTailTask.run(IterationTailTask.java:107)
at org.apache.flink.runtime.operators.BatchTask.invoke(BatchTask.java:355)
at org.apache.flink.runtime.taskmanager.Task.run(Task.java:655)
at java.lang.Thread.run(Thread.java:745)
Caused by: java.io.IOException: Thread 'SortMerger Reading Thread' terminated 
due to an exception: null
at 
org.apache.flink.runtime.operators.sort.UnilateralSortMerger$ThreadBase.run(UnilateralSortMerger.java:799)
Caused by: java.lang.NullPointerException
at java.util.ArrayList$SubList.checkForComodification(ArrayList.java:1230)
at java.util.ArrayList$SubList.size(ArrayList.java:1040)
at java.util.AbstractList.add(AbstractList.java:108)
at 
com.esotericsoftware.kryo.serializers.CollectionSerializer.read(CollectionSerializer.java:116)
at 
com.esotericsoftware.kryo.serializers.CollectionSerializer.read(CollectionSerializer.java:22)
at com.esotericsoftware.kryo.Kryo.readClassAndObject(Kryo.java:761)
at 
org.apache.flink.api.java.typeutils.runtime.kryo.KryoSerializer.deserialize(KryoSerializer.java:232)
at 
org.apache.flink.api.java.typeutils.runtime.kryo.KryoSerializer.deserialize(KryoSerializer.java:246)
at 
org.apache.flink.api.java.typeutils.runtime.TupleSerializer.deserialize(TupleSerializer.java:144)
at 
org.apache.flink.api.java.typeutils.runtime.TupleSerializer.deserialize(TupleSerializer.java:30)
at 
org.apache.flink.runtime.plugable.ReusingDeserializationDelegate.read(ReusingDeserializationDelegate.java:57)
at 
org.apache.flink.runtime.io.network.api.serialization.SpillingAdaptiveSpanningRecordDeserializer.getNextRecord(SpillingAdaptiveSpanningRecordDeserializer.java:109)
at