Re: Question about flink checkpoint

2018-02-07 Thread Chengzhi Zhao
Thanks, Fabian,

I opened a JIRA ticket and I'd like to work on it if people think this
would be an improvement:
https://issues.apache.org/jira/browse/FLINK-8599

Best,
Chengzhi

On Wed, Feb 7, 2018 at 4:17 AM, Fabian Hueske  wrote:

> Hi Chengzhi Zhao,
>
> I think this is rather an issue with the ContinuousFileReaderOperator than
> with the checkpointing algorithm in general.
> A source can decide which information to store as state and also how to
> handle failures such as file paths that have been put into state but have
> been removed from the file system.
>
> It would be great if you could open a JIRA issue with a feature request to
> improve the failure behavior of the ContinuousFileReaderOperator.
> It could, for example, check if a path exists before trying to read a
> file and ignore the input split instead of throwing an exception and
> causing a failure.
> If you want to, you can also work on a fix and contribute it back.
>
> Best, Fabian
>
> 2018-02-06 19:15 GMT+01:00 Chengzhi Zhao :
>
>> Hey, I am new to flink and I have a question and want to see if anyone
>> can help here.
>>
>> So we have an S3 path that Flink is monitoring to see when new files become
>> available.
>>
>> val avroInputStream_activity = env.readFile(format, path,
>> FileProcessingMode.PROCESS_CONTINUOUSLY, 1)
>>
>> I am doing both internal and external checkpointing. Let's say a bad file
>> arrives at the path and Flink does several retries. I want to remove those
>> bad files and let the process continue. However, since the file path
>> persists in the checkpoint, when I try to resume from the external
>> checkpoint, it throws the following error because the file is not found.
>>
>> java.io.IOException: Error opening the Input Split s3a://myfile [0,904]:
>> No such file or directory: s3a://myfile
>>
>> Is there a way to skip this bad file and move on?
>> Thanks in advance.
>>
>> Best,
>> Chengzhi Zhao
>>
>>
>


RE: S3 for state backend in Flink 1.4.0

2018-02-07 Thread Marchant, Hayden
We actually got it working. Essentially, it's an implementation of 
HadoopFileSystem, and it was written with the idea that it can be used with Spark 
(since Spark has broader adoption than Flink as of now). We managed to get it 
configured, and found the latency to be much lower than with the s3 
connector. There are a lot fewer copy operations happening under the 
hood when using this native API, which explains the better performance.

Happy to provide assistance offline if you're interested.

Thanks
Hayden

-Original Message-
From: Edward Rojas [mailto:edward.roja...@gmail.com] 
Sent: Thursday, February 01, 2018 6:09 PM
To: user@flink.apache.org
Subject: RE: S3 for state backend in Flink 1.4.0

Hi Hayden,

It seems like a good alternative. But I see it's intended to work with Spark; 
did you manage to get it working with Flink?

I ran some tests but I get several errors when trying to create a file, either for 
checkpointing or for saving data.

Thanks in advance,
Regards,
Edward



--
Sent from: 
http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/


RE: Latest version of Kafka

2018-02-07 Thread Marchant, Hayden
Thanks for the info!

-Original Message-
From: Piotr Nowojski [mailto:pi...@data-artisans.com] 
Sent: Friday, February 02, 2018 4:37 PM
To: Marchant, Hayden [ICG-IT] 
Cc: user@flink.apache.org
Subject: Re: Latest version of Kafka

Hi,

Flink currently provides only a connector for Kafka 0.11, which uses the 
KafkaClient in version 0.11.x. However, you should be able to use it for reading 
from/writing to Kafka 1.0 - Kafka claims (and as far as I know it’s true) that 
Kafka 1.0 is backward compatible with 0.11.
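
For illustration, wiring the 0.11 connector classes against a Kafka 1.0 broker looks the
same as against a 0.11 broker; the broker address, group id, and topic names below are
placeholders, not values from this thread:

import java.util.Properties
import org.apache.flink.api.common.serialization.SimpleStringSchema
import org.apache.flink.streaming.api.scala._
import org.apache.flink.streaming.connectors.kafka.{FlinkKafkaConsumer011, FlinkKafkaProducer011}

object Kafka10ViaConnector011 {
  def main(args: Array[String]): Unit = {
    val env = StreamExecutionEnvironment.getExecutionEnvironment

    val props = new Properties()
    props.setProperty("bootstrap.servers", "kafka10-broker:9092") // a Kafka 1.0 broker
    props.setProperty("group.id", "flink-group")

    // 0.11 connector classes, relying on Kafka 1.0's backward compatibility with 0.11
    val source = new FlinkKafkaConsumer011[String]("input-topic", new SimpleStringSchema(), props)
    val sink = new FlinkKafkaProducer011[String]("output-topic", new SimpleStringSchema(), props)

    env.addSource(source).map(_.toUpperCase).addSink(sink)
    env.execute("kafka-1.0-via-0.11-connector")
  }
}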

Piotrek

> On 1 Feb 2018, at 14:46, Marchant, Hayden  wrote:
> 
> What is the newest version of Kafka that is compatible with Flink 1.4.0? I 
> see from the documentation that the last supported Kafka version is 0.11, but has 
> any testing been done with Kafka 1.0?
> 
> 
> Hayden Marchant
> 



Re: How to handle multiple yarn sessions and choose at runtime the one to submit a ha streaming job ?

2018-02-07 Thread Chesnay Schepler
For future reference, the created JIRA: 
https://issues.apache.org/jira/browse/FLINK-8580


On 07.02.2018 10:48, LINZ, Arnaud wrote:


Hi,

Without any other solution, I made a shell script that copies the 
original content of FLINK_CONF_DIR into a temporary directory, modifies 
flink-conf.yaml to set yarn.properties-file.location, and changes 
FLINK_CONF_DIR to that temporary directory before executing flink.


I am now able to select the container I want, but I think it should be 
made simpler…


I’ll open a Jira.

Best regards,

Arnaud

From: LINZ, Arnaud
Sent: Thursday, 1 February 2018 16:23
To: user@flink.apache.org
Subject: How to handle multiple yarn sessions and choose at runtime 
the one to submit a ha streaming job ?


Hello,

I am using Flink 1.3.2 and I’m struggling to achieve something that 
should be simple.


For isolation reasons, I want to start multiple long living yarn 
session containers (with the same user) and choose at run-time, when I 
start a HA streaming app, which container will hold it.


I start my yarn session with the command line option : 
-Dyarn.properties-file.location=mydir


The session is created and a .yarn-properties-$USER file is generated.

And I’ve tried the following to submit my job:

CASE 1

flink-conf.yaml: yarn.properties-file.location: mydir

flink run options: none

  * Uses zookeeper and works – but I cannot choose the container as the property file is global.

CASE 2

flink-conf.yaml: nothing

flink run options: -yid applicationId

  * Does not use zookeeper; tries to connect to the yarn job manager but fails with a “Job submission to the JobManager timed out” error.

CASE 3

flink-conf.yaml: nothing

flink run options: -yid applicationId and -yD with all dynamic properties found in the “dynamicPropertiesString” of the .yarn-properties-$USER file

  * Same as case 2.

CASE 4

flink-conf.yaml: nothing

flink run options: -yD yarn.properties-file.location=mydir

  * Tries to connect to the local (non-yarn) job manager (and fails).

CASE 5

Even weirder:

flink-conf.yaml: yarn.properties-file.location: mydir

flink run options: -yD yarn.properties-file.location=mydir

  * Still tries to connect to the local (non-yarn) job manager!

What am I doing wrong?

Logs extracts :

*CASE 1:*

2018:02:01 15:43:20 - Waiting until all TaskManagers have connected

2018:02:01 15:43:20 - Starting client actor system.

2018:02:01 15:43:20 - Starting ZooKeeperLeaderRetrievalService.

2018:02:01 15:43:20 - Trying to select the network interface and 
address to use by connecting to the leading JobManager.


2018:02:01 15:43:20 - TaskManager will try to connect for 1 
milliseconds before falling back to heuristics


2018:02:01 15:43:21 - Retrieved new target address 
elara-data-u2-n01.dev.mlb.jupiter.nbyt.fr/10.136.170.193:33970.


2018:02:01 15:43:21 - Stopping ZooKeeperLeaderRetrievalService.

2018:02:01 15:43:21 - Slf4jLogger started

2018:02:01 15:43:21 - Starting remoting

2018:02:01 15:43:21 - Remoting started; listening on addresses 
:[akka.tcp://fl...@elara-edge-u2-n01.dev.mlb.jupiter.nbyt.fr:36340]


2018:02:01 15:43:21 - Starting ZooKeeperLeaderRetrievalService.

2018:02:01 15:43:21 - Stopping ZooKeeperLeaderRetrievalService.

2018:02:01 15:43:21 - TaskManager status (2/1)

2018:02:01 15:43:21 - All TaskManagers are connected

2018:02:01 15:43:21 - Submitting job with JobID: 
f69197b0b80a76319a87bde10c1e3f77. Waiting for job completion.


2018:02:01 15:43:21 - Starting ZooKeeperLeaderRetrievalService.

2018:02:01 15:43:21 - Received SubmitJobAndWait(JobGraph(jobId: 
f69197b0b80a76319a87bde10c1e3f77)) but there is no connection to a 
JobManager yet.


2018:02:01 15:43:21 - Received job SND-IMP-SIGNAST 
(f69197b0b80a76319a87bde10c1e3f77).


2018:02:01 15:43:21 - Disconnect from JobManager null.

2018:02:01 15:43:21 - Connect to JobManager 
Actor[akka.tcp://fl...@elara-data-u2-n01.dev.mlb.jupiter.nbyt.fr:33970/user/jobmanager#-1554418245].


2018:02:01 15:43:21 - Connected to JobManager at 
Actor[akka.tcp://fl...@elara-data-u2-n01.dev.mlb.jupiter.nbyt.fr:33970/user/jobmanager#-1554418245] 
with leader session id 388af5b8--4923-8ee4-8a4b9bfbb0b9.


2018:02:01 15:43:21 - Sending message to JobManager 
akka.tcp://fl...@elara-data-u2-n01.dev.mlb.jupiter.nbyt.fr:33970/user/jobmanager 
to submit job SND-IMP-SIGNAST (f69197b0b80a76319a87bde10c1e3f77) and 
wait for progress


2018:02:01 15:43:21 - Upload jar files to job manager 
akka.tcp://fl...@elara-data-u2-n01.dev.mlb.jupiter.nbyt.fr:33970/user/jobmanager.


2018:02:01 15:43:21 - Blob client connecting to 
akka.tcp://fl...@elara-data-u2-n01.dev.mlb.jupiter.nbyt.fr:33970/user/jobmanager


2018:02:01 15:43:22 - Submit job to the job manager 
akka.tcp://fl...@elara-data-u2-n01.dev.mlb.jupiter.nbyt.fr:33970/user/jobmanager.


2018:02:01 15:43:22 - Job f69197b0b80a76319a87bde10c1e3f77 was 
successfully submitted to the JobManager akka://flink/deadLetters.


2018:02:01 15:43:22 - 02/01/2018 15:43:22   Job execution switched to status RUNNING.

Re: Flink CEP with files and no streams?

2018-02-07 Thread Fabian Hueske
Hi,

I'm not aware of a good example but I can give you some pointers.

- Implement the SourceFunction interface. This function will not be
executed in parallel, so you don't have to worry about parallelism.
- Since you said you want to run it as a batch job, you might not need to
implement checkpointing functionality.
- In the run method, you open the file that you need to read. Start parsing
the file; when you have a record, extract the timestamp and emit both by
passing them to the SourceContext.
- Every n-th record, you can emit a watermark. The watermark timestamp must
be smaller than the timestamps of all records that will be emitted in the future.

I'd start with processing a single file and extend the source from there.
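
For illustration, here is a minimal sketch of such a source in Scala. It assumes event
time, input lines that start with an epoch-millisecond timestamp, and a file sorted by
time; none of these assumptions come from this thread.

import org.apache.flink.streaming.api.functions.source.SourceFunction
import org.apache.flink.streaming.api.watermark.Watermark
import scala.io.Source

case class LogEvent(timestamp: Long, line: String)

// Non-parallel source: read one file, emit each record with its event-time timestamp,
// and emit a watermark every n records.
class FileEventSource(path: String, watermarkEvery: Int = 100)
    extends SourceFunction[LogEvent] {

  @volatile private var running = true

  override def run(ctx: SourceFunction.SourceContext[LogEvent]): Unit = {
    val lines = Source.fromFile(path).getLines()
    var count = 0
    var maxTs = Long.MinValue
    for (line <- lines if running) {
      val ts = line.takeWhile(_ != ',').toLong          // leading epoch-millis timestamp
      maxTs = math.max(maxTs, ts)
      ctx.collectWithTimestamp(LogEvent(ts, line), ts)  // record plus its timestamp
      count += 1
      if (count % watermarkEvery == 0) {
        // safe only if the file is sorted by time: no later record will be smaller
        ctx.emitWatermark(new Watermark(maxTs))
      }
    }
  }

  override def cancel(): Unit = { running = false }
}

With env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime), the stream returned by
env.addSource(new FileEventSource("events.log")) can then feed a CEP pattern like any
other stream.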

Hope this helps,
Fabian

2018-02-07 13:59 GMT+01:00 Esa Heikkinen :

> Hi
>
>
>
> Thanks for the reply, but because I am a newbie with Flink, do you have
> any good Scala code examples about this ?
>
>
>
> Esa
>
>
>
> *From:* Fabian Hueske [mailto:fhue...@gmail.com]
> *Sent:* Wednesday, February 7, 2018 11:21 AM
> *To:* Esa Heikkinen 
> *Cc:* user@flink.apache.org
> *Subject:* Re: Flink CEP with files and no streams?
>
>
>
> Hi Esa,
>
> you can also read files as a stream.
> However, you have to be careful in which order you read the files and how
> you generate watermarks.
>
> The easiest approach is to implement a non-parallel source function that
> reads the files in the right order and generates watermarks.
>
> Things become more tricky when you try to read the files in parallel.
>
> Best, Fabian
>
>
>
> 2018-02-07 9:40 GMT+01:00 Esa Heikkinen :
>
> Hello
>
>
>
> I am trying to use CEP of Flink for log files (as batch job), but not for
> streams (as realtime).
>
> Is that possible ? If yes, do you know examples Scala codes about that ?
>
>
>
> Or should I convert the log files (with time stamps) into streams ?
>
> But how to handle time stamps in Flink ?
>
>
>
> If I can not use Flink at all for this purpose, do you have any
> recommendations of other tools ?
>
>
>
> I would want CEP type analysis for log files.
>
>
>
>
>
>
>


RE: kafka as recovery only source

2018-02-07 Thread Sofer, Tovi
Hi Fabian,

Thank you for the suggestion. We will consider it.
Would be glad to hear other ideas on how to handle such a requirement.

Thanks again,
Tovi
From: Fabian Hueske [mailto:fhue...@gmail.com]
Sent: Wednesday, 07 February 2018 11:47
To: Sofer, Tovi [ICG-IT] 
Cc: user@flink.apache.org; Tzu-Li (Gordon) Tai 
Subject: Re: kafka as recovery only source

Hi Tovi,
I've been thinking about this idea.
It might be possible, but I think you have to implement a custom source for 
this.

I don't think it would work to have the JMSConsumer, KafkaSink, and 
RecoverySource in separate operators because otherwise it would not be possible 
to share the Kafka write offset of the recovery topic at checkpoints.
QueryableState as you suggested only works for keyed state which is (AFAIK) not 
available for sources.
The custom source operator would consume from JMS and directly write all 
records to Kafka. In case of a recovery, it starts reading from Kafka and 
continues with JMS once the recovery topic has been completely consumed.
If you run the source in parallel, you need to handle the partitions of Kafka 
recovery topic.

I'm adding Gordon to this thread who might have additional comments or ideas.
Best, Fabian


2018-02-06 15:31 GMT+01:00 Sofer, Tovi 
>:
Hi group,

I wanted to get your suggestion on how to implement two requirements we have:

• One is to read from an external message queue (JMS) with very low latency

• Second is to support zero data loss, so that in case of restart and 
recovery, messages not checkpointed (and not part of state) will be replayed 
again.

(which implies some kind of replayable source)

Because of the first requirement we can’t write JMS messages to Kafka first and 
only then read from kafka, because it will increase latency.
Instead we thought to consume the JMS messages and forward them both to job and 
to KafkaSink.
Then in case of failure and recovery, we want to start in recovery mode, and 
read message from offset matching the state\checkpoint.
How can this be done? We thought to somehow save in the state the last flushed 
Kafka offset.
The problem is this information is available only via future\interceptor and we 
don’t know how to connect it to state, so RecoverySource can use it…

So current suggestion looks something like:

Happy path:
JMSQueue-> JMSConsumer -> JobMessageParser(and additional operators), KafkaSink
(Here maybe we can add ProducerInterceptor-> which saves offset to state 
somehow)

Failure path: (will run before HappyPath to recover data)
RecoverySource-> JobMessageParser(and additional operators)
(Here maybe add Queryable state client which reads offsets from other operator 
state)

Thanks,
Tovi




Re: Classloader leak with Kafka Mbeans / JMX Reporter

2018-02-07 Thread Chesnay Schepler
This could be a bug in Kafka's JmxReporter class: 
https://issues.apache.org/jira/browse/KAFKA-6307


On 07.02.2018 13:37, Edward wrote:

We are using FlinkKafkaConsumer011 and FlinkKafkaProducer011, but we also
experienced the same behavior with FlinkKafkaConsumer010 and
FlinkKafkaProducer010.



--
Sent from: http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/





RE: Flink CEP with files and no streams?

2018-02-07 Thread Esa Heikkinen
Hi

Thanks for the reply, but because I am a newbie with Flink, do you have any 
good Scala code examples of this?

Esa

From: Fabian Hueske [mailto:fhue...@gmail.com]
Sent: Wednesday, February 7, 2018 11:21 AM
To: Esa Heikkinen 
Cc: user@flink.apache.org
Subject: Re: Flink CEP with files and no streams?

Hi Esa,
you can also read files as a stream.
However, you have to be careful in which order you read the files and how you 
generate watermarks.
The easiest approach is to implement a non-parallel source function that reads 
the files in the right order and generates watermarks.
Things become more tricky when you try to read the files in parallel.
Best, Fabian

2018-02-07 9:40 GMT+01:00 Esa Heikkinen 
>:
Hello

I am trying to use Flink's CEP on log files (as a batch job), but not on 
streams (in real time).
Is that possible? If yes, do you know of example Scala code for that?

Or should I convert the log files (with timestamps) into streams?
But how do I handle timestamps in Flink?

If I cannot use Flink at all for this purpose, do you have any recommendations 
for other tools?

I would like CEP-type analysis of the log files.





Re: Classloader leak with Kafka Mbeans / JMX Reporter

2018-02-07 Thread Edward
We are using FlinkKafkaConsumer011 and FlinkKafkaProducer011, but we also
experienced the same behavior with FlinkKafkaConsumer010 and
FlinkKafkaProducer010.



--
Sent from: http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/


Re: Rebalance to subtasks in same TaskManager instance

2018-02-07 Thread johannes.barn...@clarivate.com
Hi Piotrek

Yes I've compared rebalance with rescale. I adjusted the parallelism of the
source and target operators so that rescale would behave more or less like
the "local or shuffle grouping" option. I was able to show that for my use
case a "local or shuffle grouping" option would yield at least about 30%
performance improvement over rebalance.

I will submit a feature request.

Thanks for your prompt reply.

Hans



--
Sent from: http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/


Re: Kafka and parallelism

2018-02-07 Thread Christophe Jolif
Ok thanks! I should have seen this. Sorry.

--
Christophe

On Wed, Feb 7, 2018 at 10:27 AM, Tzu-Li (Gordon) Tai 
wrote:

> Hi Christophe,
>
> Yes, you can achieve writing to different topics per-message using the
> `KeyedSerializationSchema` provided to the Kafka producer.
> The schema interface has a `getTargetTopic` method which allows you to
> override the default target topic for a given record.
> I agree that the method is somewhat odd to be part of the serialization
> schema, so I have also been thinking about moving that elsewhere (maybe as
> part of the partitioner).
>
> If you want to route a record to some topic depending on which topic it
> came from on the consumer side, you’ll have to wrap the source topic
> information within the records so that it is available to the producer.
> You can access that in the `KeyedDeserializationSchema#deserialize`
> method, which exposes information about which topic and partition each
> record came from.
>
> Cheers,
> Gordon
>
> On 7 February 2018 at 9:40:50 AM, Christophe Jolif (cjo...@gmail.com)
> wrote:
>
> Hi Gordon, or anyone else reading this,
>
> Still on this idea that I consume a Kafka topic pattern.
>
> I want to then to sink the result of the processing in a set of topics
> depending on from where the original message came from (i.e. if this comes
> from origin-topic-1 I will serialize the result in destination-topic-1, if
> from topic-2 to topic-2 etc...). However the KafkaProducer is working on a
> fixed topic. You can provide a partitioning function
> (FlinkKafkaPartitioner) but not a "topic" function that would allow to
> decide to witch topic sending the message a bit like a BucketingSink would
> decide the bucket or ElasticsearchSinkFunction allows you to choose the
> index.
>
> Am I missing something? The reason I'm asking is that some of the sink
> ctor are talking about "defaultTopicId" and some about "topicId" just like
> if in some case there was some ability to override the topic. Is there
> there a feature that allows me to do that?
>
> If not do you think this would be a worthwhile addition?
>
> Thanks again,
> --
> Christophe
>
> On Mon, Feb 5, 2018 at 9:52 AM, Tzu-Li (Gordon) Tai 
> wrote:
>
>> Hi Christophe,
>>
>> You can set the parallelism of the FlinkKafkaConsumer independently of
>> the total number of Kafka partitions (across all subscribed streams,
>> including newly created streams that match a subscribed pattern).
>>
>> The consumer deterministically assigns each partition to a single
>> consumer subtask, in a round-robin fashion.
>> E.g. if the parallelism of your FlinkKafkaConsumer is 2, and there is 6
>> partitions, each consumer subtask will be assigned 3 partitions.
>>
>> As for topic pattern subscription, FlinkKafkaConsumers starting from
>> version 1.4.0 support this feature. You can take a look at [1] on how to do
>> that.
>>
>> Hope this helps!
>>
>> Cheers,
>> Gordon
>>
>> [1] https://ci.apache.org/projects/flink/flink-docs-release-
>> 1.4/dev/connectors/kafka.html#kafka-consumers-topic-and-
>> partition-discovery
>>
>> On 3 February 2018 at 6:53:47 PM, Christophe Jolif (cjo...@gmail.com)
>> wrote:
>>
>> Hi,
>>
>> If I'm sourcing from a KafkaConsumer do I have to explicitly set the
>> Flink job parallelism to the number of partions or will it adjust
>> automatically accordingly? In other word if I don't call setParallelism
>> will get 1 or the number of partitions?
>>
>> The reason I'm asking is that I'm listening to a topic pattern not a
>> single topic and the number of actual topic (and so partitions) behind the
>> pattern can change so it is not possible to know ahead ot time how many
>> partitions I will get.
>>
>> Thanks!
>> --
>> Christophe
>>
>>
>
>
> --
> Christophe
>
>


-- 
Christophe


RE: How to handle multiple yarn sessions and choose at runtime the one to submit a ha streaming job ?

2018-02-07 Thread LINZ, Arnaud
Hi,

Without any other solution, I made a shell script that copies the original 
content of FLINK_CONF_DIR into a temporary directory, modifies flink-conf.yaml to set 
yarn.properties-file.location, and changes FLINK_CONF_DIR to that temporary directory 
before executing flink.
I am now able to select the container I want, but I think it should be made 
simpler...
I'll open a Jira.

Best regards,
Arnaud


From: LINZ, Arnaud
Sent: Thursday, 1 February 2018 16:23
To: user@flink.apache.org
Subject: How to handle multiple yarn sessions and choose at runtime the one to 
submit a ha streaming job ?

Hello,

I am using Flink 1.3.2 and I'm struggling to achieve something that should be 
simple.
For isolation reasons, I want to start multiple long living yarn session 
containers (with the same user) and choose at run-time, when I start a HA 
streaming app, which container will hold it.

I start my yarn session with the command line option : 
-Dyarn.properties-file.location=mydir
The session is created and a .yarn-properties-$USER file is generated.

And I've tried the following to submit my job:

CASE 1
flink-conf.yaml : yarn.properties-file.location: mydir
flink run options : none

  *   Uses zookeeper and works  - but I cannot choose the container as the 
property file is global.

CASE 2
flink-conf.yaml : nothing
flink run options : -yid applicationId

  *   Does not use zookeeper; tries to connect to the yarn job manager but fails with a 
"Job submission to the JobManager timed out" error

CASE 3
flink-conf.yaml : nothing
flink run options : -yid applicationId and -yD with all dynamic properties 
found in the "dynamicPropertiesString" of .yarn-properties-$USER file

  *   Same as case 2

CASE 4
flink-conf.yaml : nothing
flink run options : -yD yarn.properties-file.location=mydir

  *   Tries to connect to the local (non-yarn) job manager (and fails)

CASE 5
Even weirder:
flink-conf.yaml : yarn.properties-file.location: mydir
flink run options : -yD yarn.properties-file.location=mydir

  *   Still tries to connect to the local (non-yarn) job manager!

What am I doing wrong?

Logs extracts :
CASE 1:
2018:02:01 15:43:20 - Waiting until all TaskManagers have connected
2018:02:01 15:43:20 - Starting client actor system.
2018:02:01 15:43:20 - Starting ZooKeeperLeaderRetrievalService.
2018:02:01 15:43:20 - Trying to select the network interface and address to use 
by connecting to the leading JobManager.
2018:02:01 15:43:20 - TaskManager will try to connect for 1 milliseconds 
before falling back to heuristics
2018:02:01 15:43:21 - Retrieved new target address 
elara-data-u2-n01.dev.mlb.jupiter.nbyt.fr/10.136.170.193:33970.
2018:02:01 15:43:21 - Stopping ZooKeeperLeaderRetrievalService.
2018:02:01 15:43:21 - Slf4jLogger started
2018:02:01 15:43:21 - Starting remoting
2018:02:01 15:43:21 - Remoting started; listening on addresses 
:[akka.tcp://fl...@elara-edge-u2-n01.dev.mlb.jupiter.nbyt.fr:36340]
2018:02:01 15:43:21 - Starting ZooKeeperLeaderRetrievalService.
2018:02:01 15:43:21 - Stopping ZooKeeperLeaderRetrievalService.
2018:02:01 15:43:21 - TaskManager status (2/1)
2018:02:01 15:43:21 - All TaskManagers are connected
2018:02:01 15:43:21 - Submitting job with JobID: 
f69197b0b80a76319a87bde10c1e3f77. Waiting for job completion.
2018:02:01 15:43:21 - Starting ZooKeeperLeaderRetrievalService.
2018:02:01 15:43:21 - Received SubmitJobAndWait(JobGraph(jobId: 
f69197b0b80a76319a87bde10c1e3f77)) but there is no connection to a JobManager 
yet.
2018:02:01 15:43:21 - Received job SND-IMP-SIGNAST 
(f69197b0b80a76319a87bde10c1e3f77).
2018:02:01 15:43:21 - Disconnect from JobManager null.
2018:02:01 15:43:21 - Connect to JobManager 
Actor[akka.tcp://fl...@elara-data-u2-n01.dev.mlb.jupiter.nbyt.fr:33970/user/jobmanager#-1554418245].
2018:02:01 15:43:21 - Connected to JobManager at 
Actor[akka.tcp://fl...@elara-data-u2-n01.dev.mlb.jupiter.nbyt.fr:33970/user/jobmanager#-1554418245]
 with leader session id 388af5b8--4923-8ee4-8a4b9bfbb0b9.
2018:02:01 15:43:21 - Sending message to JobManager 
akka.tcp://fl...@elara-data-u2-n01.dev.mlb.jupiter.nbyt.fr:33970/user/jobmanager
 to submit job SND-IMP-SIGNAST (f69197b0b80a76319a87bde10c1e3f77) and wait for 
progress
2018:02:01 15:43:21 - Upload jar files to job manager 
akka.tcp://fl...@elara-data-u2-n01.dev.mlb.jupiter.nbyt.fr:33970/user/jobmanager.
2018:02:01 15:43:21 - Blob client connecting to 
akka.tcp://fl...@elara-data-u2-n01.dev.mlb.jupiter.nbyt.fr:33970/user/jobmanager
2018:02:01 15:43:22 - Submit job to the job manager 
akka.tcp://fl...@elara-data-u2-n01.dev.mlb.jupiter.nbyt.fr:33970/user/jobmanager.
2018:02:01 15:43:22 - Job f69197b0b80a76319a87bde10c1e3f77 was successfully 
submitted to the JobManager akka://flink/deadLetters.
2018:02:01 15:43:22 - 02/01/2018 15:43:22   Job execution switched to status 
RUNNING.

CASE 2:
2018:02:01 15:48:43 - Waiting until all TaskManagers have connected
2018:02:01 15:48:43 - Starting client actor system.
2018:02:01 15:48:43 - Trying to select the network interface and address to use by 
connecting to the leading JobManager.

Re: kafka as recovery only source

2018-02-07 Thread Fabian Hueske
Hi Tovi,

I've been thinking about this idea.
It might be possible, but I think you have to implement a custom source for
this.

I don't think it would work to have the JMSConsumer, KafkaSink, and
RecoverySource in separate operators because otherwise it would not be
possible to share the Kafka write offset of the recovery topic at
checkpoints.
QueryableState as you suggested only works for keyed state which is (AFAIK)
not available for sources.
The custom source operator would consume from JMS and directly write all
records to Kafka. In case of a recovery, it starts reading from Kafka and
continues with JMS once the recovery topic has been completely consumed.
If you run the source in parallel, you need to handle the partitions of the
Kafka recovery topic.
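
For what it's worth, a very rough skeleton of that single-operator shape. The JMS consumer
and the Kafka producer are replaced by stand-ins (a blocking queue and a local counter), so
this only illustrates where the last flushed offset would live (in operator state,
snapshotted together with what has been emitted); it is not a working connector:

import java.util.concurrent.{BlockingQueue, LinkedBlockingQueue}
import java.util.{Collections, List => JList}
import org.apache.flink.streaming.api.checkpoint.ListCheckpointed
import org.apache.flink.streaming.api.functions.source.SourceFunction

class JmsWithKafkaRecoverySource(
    incoming: BlockingQueue[String] = new LinkedBlockingQueue[String]()) // stand-in for JMS
  extends SourceFunction[String] with ListCheckpointed[java.lang.Long] {

  @volatile private var running = true
  private var lastFlushedOffset: Long = -1L  // last offset written to the recovery topic

  override def run(ctx: SourceFunction.SourceContext[String]): Unit = {
    // 1) Recovery phase (omitted): if lastFlushedOffset >= 0, first re-read the Kafka
    //    recovery topic from lastFlushedOffset + 1 and emit those records downstream.
    // 2) Live phase: consume from JMS, write to the recovery topic, then emit; the offset
    //    update and the emission happen under the checkpoint lock so they stay consistent.
    while (running) {
      val msg = incoming.take()            // would be a JMS receive()
      val offset = lastFlushedOffset + 1   // would come from the Kafka producer callback
      ctx.getCheckpointLock.synchronized {
        lastFlushedOffset = offset
        ctx.collect(msg)
      }
    }
  }

  override def cancel(): Unit = { running = false }

  override def snapshotState(checkpointId: Long, timestamp: Long): JList[java.lang.Long] =
    Collections.singletonList(java.lang.Long.valueOf(lastFlushedOffset))

  override def restoreState(state: JList[java.lang.Long]): Unit =
    if (!state.isEmpty) lastFlushedOffset = state.get(0)
}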

I'm adding Gordon to this thread who might have additional comments or
ideas.

Best, Fabian


2018-02-06 15:31 GMT+01:00 Sofer, Tovi :

> Hi group,
>
>
>
> I wanted to get your suggestion on how to implement two requirements we
> have:
>
> · One is to read from an external message queue (JMS) with very low
> latency
>
> · Second is to support zero data loss, so that in case of restart
> and recovery, messages not checkpointed (and not part of state) will be
> replayed again.
>
> (which implies some kind of replayable source)
>
>
>
> Because of the first requirement we can’t write JMS messages to Kafka
> first and only then read from kafka, because it will increase latency.
>
> Instead we thought to consume the JMS messages and forward them both to
> job and to KafkaSink.
>
> Then in case of failure and recovery, we want to start in recovery mode,
> and read message from offset matching the state\checkpoint.
>
> How can this be done? We thought to somehow save in the state the last
> flushed Kafka offset.
>
> The problem is this information is available only via future\interceptor
> and we don’t know how to connect it to state, so RecoverySource can use it…
>
>
>
> So current suggestion looks something like:
>
>
>
> Happy path:
>
> JMSQueue-> JMSConsumer -> JobMessageParser(and additional operators),
> KafkaSink
>
> (Here maybe we can add ProducerInterceptor-> which saves offset to state
> somehow)
>
>
>
> Failure path: (will run before HappyPath to recover data)
>
> RecoverySource-> JobMessageParser(and additional operators)
>
> (Here maybe add Queryable state client which reads offsets from other
> operator state)
>
>
>
> Thanks,
>
> Tovi
>
>
>


Re: Triggering a Savepoint

2018-02-07 Thread Fabian Hueske
Hi Gregory,

IMO, that would be a viable approach.
You have to ensure that all operators (except the sources) have the same
UIDs and state types but I guess you don't want to change the application
logic and just replace the sources.
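
For example, a small sketch of pinning an operator UID (types and names are made up for
illustration; only the .uid() call is the point):

import org.apache.flink.streaming.api.scala._
import org.apache.flink.streaming.api.functions.ProcessFunction
import org.apache.flink.util.Collector

case class Event(userId: String, amount: Long)

object UidExample {
  def main(args: Array[String]): Unit = {
    val env = StreamExecutionEnvironment.getExecutionEnvironment

    // In the bootstrap job this would be the historic-file source, in the live job the
    // stream source; everything downstream keeps the same UIDs in both variants.
    val events: DataStream[Event] = env.fromElements(Event("a", 1L), Event("b", 2L))

    events
      .keyBy(_.userId)
      .process(new ProcessFunction[Event, Event] {
        override def processElement(e: Event, ctx: ProcessFunction[Event, Event]#Context,
                                    out: Collector[Event]): Unit = out.collect(e)
      })
      .uid("my-aggregates") // must be identical in the historic job and the live job
      .print()

    env.execute("uid-example")
  }
}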

What might be tricky is to perform the savepoint at the right point in time,
when all historic data has been processed and before the job is shut down.
You might need to add an idle source that ensures that the job keeps
running even after all files have been read.
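
Such an idle source could look roughly like this (a sketch of the idea, not something that
ships with Flink):

import org.apache.flink.streaming.api.functions.source.SourceFunction

// A source that never emits anything and never finishes; union it with the historic
// stream so the job stays alive until the savepoint has been taken.
class IdleSource[T] extends SourceFunction[T] {
  @volatile private var running = true

  override def run(ctx: SourceFunction.SourceContext[T]): Unit = {
    while (running) {
      Thread.sleep(1000)
    }
  }

  override def cancel(): Unit = { running = false }
}
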
Another challenge could be to have a seamless handover between historic and
live data which also depends on how you persist the historic data. For
example, do you know the offset until which point the files are written?

Let me know if you have more questions,
Fabian

2018-01-25 20:52 GMT+01:00 Gregory Fee :

> Hi group, I want to bootstrap some aggregates based on historic data in S3
> and then keep them updated based on a stream. To do this I was thinking of
> doing something like processing all of the historic data, doing a save
> point, then restoring my program from that save point but with a stream
> source instead. Does this seem like a reasonable approach or is there a
> better way to approach this functionality? There does not appear to be a
> straightforward way of doing it the way I was thinking so any advice would
> be appreciated. Thanks!


Re: Kafka and parallelism

2018-02-07 Thread Tzu-Li (Gordon) Tai
Hi Christophe,

Yes, you can achieve writing to different topics per-message using the 
`KeyedSerializationSchema` provided to the Kafka producer.
The schema interface has a `getTargetTopic` method which allows you to override 
the default target topic for a given record.
I agree that the method is somewhat odd to be part of the serialization schema, 
so I have also been thinking about moving that elsewhere (maybe as part of the 
partitioner).

If you want to route a record to some topic depending on which topic it came 
from on the consumer side, you’ll have to wrap the source topic information 
within the records so that it is available to the producer.
You can access that in the `KeyedDeserializationSchema#deserialize` method, 
which exposes information about which topic and partition each record came from.
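
A rough sketch of both sides in Scala; the RoutedEvent type and the origin-/destination-
naming convention are assumptions made up for illustration:

import org.apache.flink.api.common.typeinfo.TypeInformation
import org.apache.flink.api.scala._ // for createTypeInformation
import org.apache.flink.streaming.util.serialization.{KeyedDeserializationSchema, KeyedSerializationSchema}

// Record type that carries the topic a message was read from.
case class RoutedEvent(sourceTopic: String, payload: String)

// Consumer side: capture the source topic while deserializing.
class TopicAwareDeserializer extends KeyedDeserializationSchema[RoutedEvent] {
  override def deserialize(key: Array[Byte], value: Array[Byte],
                           topic: String, partition: Int, offset: Long): RoutedEvent =
    RoutedEvent(topic, new String(value, "UTF-8"))

  override def isEndOfStream(next: RoutedEvent): Boolean = false

  override def getProducedType: TypeInformation[RoutedEvent] = createTypeInformation[RoutedEvent]
}

// Producer side: derive the destination topic from the source topic.
class TopicRoutingSerializer extends KeyedSerializationSchema[RoutedEvent] {
  override def serializeKey(e: RoutedEvent): Array[Byte] = null // no message key
  override def serializeValue(e: RoutedEvent): Array[Byte] = e.payload.getBytes("UTF-8")

  // A non-null return value here overrides the producer's default topic.
  override def getTargetTopic(e: RoutedEvent): String =
    e.sourceTopic.replace("origin-", "destination-") // assumed naming convention
}

The producer is then created with a default topic and this schema, e.g.
new FlinkKafkaProducer011[RoutedEvent]("default-topic", new TopicRoutingSerializer, producerProps),
and the non-null getTargetTopic result overrides that default per record.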

Cheers,
Gordon

On 7 February 2018 at 9:40:50 AM, Christophe Jolif (cjo...@gmail.com) wrote:

Hi Gordon, or anyone else reading this,

Still on this idea that I consume a Kafka topic pattern. 

I then want to sink the result of the processing into a set of topics 
depending on where the original message came from (i.e. if it comes from 
origin-topic-1 I will serialize the result to destination-topic-1, if from 
topic-2 to topic-2, etc.). However, the KafkaProducer works on a fixed 
topic. You can provide a partitioning function (FlinkKafkaPartitioner) but not 
a "topic" function that would allow deciding to which topic to send the 
message, a bit like a BucketingSink decides the bucket or an 
ElasticsearchSinkFunction lets you choose the index. 

Am I missing something? The reason I'm asking is that some of the sink ctors 
mention "defaultTopicId" and some "topicId", as if in some cases there were 
an ability to override the topic. Is there a feature that allows me to do that?

If not do you think this would be a worthwhile addition?

Thanks again,
--
Christophe

On Mon, Feb 5, 2018 at 9:52 AM, Tzu-Li (Gordon) Tai  wrote:
Hi Christophe,

You can set the parallelism of the FlinkKafkaConsumer independently of the 
total number of Kafka partitions (across all subscribed streams, including 
newly created streams that match a subscribed pattern).

The consumer deterministically assigns each partition to a single consumer 
subtask, in a round-robin fashion.
E.g. if the parallelism of your FlinkKafkaConsumer is 2, and there are 6 
partitions, each consumer subtask will be assigned 3 partitions.

As for topic pattern subscription, FlinkKafkaConsumers starting from version 
1.4.0 support this feature. You can take a look at [1] on how to do that.

Hope this helps!

Cheers,
Gordon

[1] 
https://ci.apache.org/projects/flink/flink-docs-release-1.4/dev/connectors/kafka.html#kafka-consumers-topic-and-partition-discovery
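
As a rough illustration of that pattern subscription together with an independently chosen
source parallelism (broker address, group id, pattern, and discovery interval are
placeholders):

import java.util.Properties
import java.util.regex.Pattern
import org.apache.flink.api.common.serialization.SimpleStringSchema
import org.apache.flink.streaming.api.scala._
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer011

object PatternConsumerExample {
  def main(args: Array[String]): Unit = {
    val env = StreamExecutionEnvironment.getExecutionEnvironment

    val props = new Properties()
    props.setProperty("bootstrap.servers", "broker:9092")
    props.setProperty("group.id", "my-group")
    // pick up new topics/partitions matching the pattern every 30 seconds
    props.setProperty("flink.partition-discovery.interval-millis", "30000")

    val consumer = new FlinkKafkaConsumer011[String](
      Pattern.compile("origin-topic-.*"), new SimpleStringSchema(), props)

    // parallelism is chosen independently of the (changing) partition count;
    // partitions are assigned round-robin to the consumer subtasks
    env.addSource(consumer).setParallelism(2).print()

    env.execute("pattern-consumer")
  }
}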

On 3 February 2018 at 6:53:47 PM, Christophe Jolif (cjo...@gmail.com) wrote:

Hi,

If I'm sourcing from a KafkaConsumer, do I have to explicitly set the Flink job 
parallelism to the number of partitions, or will it adjust automatically? 
In other words, if I don't call setParallelism, will I get 1 or the 
number of partitions?

The reason I'm asking is that I'm listening to a topic pattern, not a single 
topic, and the number of actual topics (and so partitions) behind the pattern can 
change, so it is not possible to know ahead of time how many partitions I will 
get.

Thanks!
--
Christophe



--
Christophe

Re: Spurious warning in logs about flink-queryable-state-runtime

2018-02-07 Thread Chesnay Schepler
This is expected behavior; we try to load the queryable state classes 
via reflection as it is an optional feature.
I'll open a jira to make it less verbose if the classes cannot be found, 
in which case the stacktrace isn't particularly interesting anyway.

On 05.02.2018 10:18, Fabian Hueske wrote:

Hmm, this seems indeed strange.
Thanks for reporting the issue Ken.

@Kostas: Do you know what is happening here? Can it be avoided or 
should we file a JIRA for this?


Thanks,
Fabian

2018-01-31 23:13 GMT+01:00 Ken Krugler >:


Hi all,

In unit tests that use the LocalFlinkMiniCluster, with Flink 1.4,
I now get this warning in my logs:

> 18/01/31 13:28:19 WARN query.QueryableStateUtils:76 - Could not
load Queryable State Client Proxy. Probable reason:
flink-queryable-state-runtime is not in the classpath. Please put
the corresponding jar from the opt to the lib folder.
> 18/01/31 13:28:19 DEBUG query.QueryableStateUtils:79 - Caught
exception
> java.lang.ClassNotFoundException:
org.apache.flink.queryablestate.client.proxy.KvStateClientProxyImpl
>   at java.net.URLClassLoader.findClass(URLClassLoader.java:381)
>   at java.lang.ClassLoader.loadClass(ClassLoader.java:424)
>   at
sun.misc.Launcher$AppClassLoader.loadClass(Launcher.java:335)
>   at java.lang.ClassLoader.loadClass(ClassLoader.java:357)
>   at java.lang.Class.forName0(Native Method)
>   at java.lang.Class.forName(Class.java:264)
>   at

org.apache.flink.runtime.query.QueryableStateUtils.createKvStateClientProxy(QueryableStateUtils.java:67)
>   at

org.apache.flink.runtime.taskexecutor.TaskManagerServices.createNetworkEnvironment(TaskManagerServices.java:339)
>   at

org.apache.flink.runtime.taskexecutor.TaskManagerServices.fromConfiguration(TaskManagerServices.java:159)
>   at

org.apache.flink.runtime.minicluster.LocalFlinkMiniCluster.startTaskManager(LocalFlinkMiniCluster.scala:240)

From what I see in the code, this warning will always be
triggered, yes?

It seems odd, if I’m not explicitly setting up to use queryable
state, that this is flagged as a problem.

Is there any way in test code to explicitly configure things to
avoid it?

Thanks,

— Ken


http://about.me/kkrugler
+1 530-210-6378 






Re: Flink CEP with files and no streams?

2018-02-07 Thread Fabian Hueske
Hi Esa,

you can also read files as a stream.
However, you have to be careful in which order you read the files and how
you generate watermarks.
The easiest approach is to implement a non-parallel source function that
reads the files in the right order and generates watermarks.
Things become more tricky when you try to read the files in parallel.

Best, Fabian

2018-02-07 9:40 GMT+01:00 Esa Heikkinen :

> Hello
>
>
>
> I am trying to use CEP of Flink for log files (as batch job), but not for
> streams (as realtime).
>
> Is that possible ? If yes, do you know examples Scala codes about that ?
>
>
>
> Or should I convert the log files (with time stamps) into streams ?
>
> But how to handle time stamps in Flink ?
>
>
>
> If I can not use Flink at all for this purpose, do you have any
> recommendations of other tools ?
>
>
>
> I would want CEP type analysis for log files.
>
>
>
>
>


Re: Question about flink checkpoint

2018-02-07 Thread Fabian Hueske
Hi Chengzhi Zhao,

I think this is rather an issue with the ContinuousFileReaderOperator than
with the checkpointing algorithm in general.
A source can decide which information to store as state and also how to
handle failures such as file paths that have been put into state but have
been removed from the file system.

It would be great if you could open a JIRA issue with a feature request to
improve the failure behavior of the ContinuousFileReaderOperator.
It could, for example, check if a path exists before trying to read a
file and ignore the input split instead of throwing an exception and
causing a failure.
If you want to, you can also work on a fix and contribute it back.
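
As a rough sketch of that check (not the actual ContinuousFileReaderOperator code; the
object and method names are made up):

import org.apache.flink.core.fs.{FSDataInputStream, Path}

object SplitGuard {
  // Verify that the split's file still exists before opening it, and skip it otherwise.
  def openSplitIfPresent(splitPath: Path): Option[FSDataInputStream] = {
    val fs = splitPath.getFileSystem
    if (fs.exists(splitPath)) {
      Some(fs.open(splitPath)) // file is still there: read the split as usual
    } else {
      None // file was removed after going into state: ignore the split instead of failing
    }
  }
}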

Best, Fabian

2018-02-06 19:15 GMT+01:00 Chengzhi Zhao :

> Hey, I am new to flink and I have a question and want to see if anyone can
> help here.
>
> So we have an S3 path that Flink is monitoring to see when new files become
> available.
>
> val avroInputStream_activity = env.readFile(format, path,
> FileProcessingMode.PROCESS_CONTINUOUSLY, 1)
>
> I am doing both internal and external checkpointing. Let's say a bad file
> arrives at the path and Flink does several retries. I want to remove those
> bad files and let the process continue. However, since the file path
> persists in the checkpoint, when I try to resume from the external
> checkpoint, it throws the following error because the file is not found.
>
> java.io.IOException: Error opening the Input Split s3a://myfile [0,904]:
> No such file or directory: s3a://myfile
>
> Is there a way to skip this bad file and move on?
> Thanks in advance.
>
> Best,
> Chengzhi Zhao
>
>


Re: Kafka and parallelism

2018-02-07 Thread Christophe Jolif
Hi Gordon, or anyone else reading this,

Still on this idea that I consume a Kafka topic pattern.

I then want to sink the result of the processing into a set of topics
depending on where the original message came from (i.e. if it comes
from origin-topic-1 I will serialize the result to destination-topic-1, if
from topic-2 to topic-2, etc.). However, the KafkaProducer works on a
fixed topic. You can provide a partitioning function
(FlinkKafkaPartitioner) but not a "topic" function that would allow deciding
to which topic to send the message, a bit like a BucketingSink decides
the bucket or an ElasticsearchSinkFunction lets you choose the index.

Am I missing something? The reason I'm asking is that some of the sink ctors
mention "defaultTopicId" and some "topicId", as if in some cases there were
an ability to override the topic. Is there a feature that allows me to do that?

If not do you think this would be a worthwhile addition?

Thanks again,
--
Christophe

On Mon, Feb 5, 2018 at 9:52 AM, Tzu-Li (Gordon) Tai 
wrote:

> Hi Christophe,
>
> You can set the parallelism of the FlinkKafkaConsumer independently of the
> total number of Kafka partitions (across all subscribed streams, including
> newly created streams that match a subscribed pattern).
>
> The consumer deterministically assigns each partition to a single consumer
> subtask, in a round-robin fashion.
> E.g. if the parallelism of your FlinkKafkaConsumer is 2, and there is 6
> partitions, each consumer subtask will be assigned 3 partitions.
>
> As for topic pattern subscription, FlinkKafkaConsumers starting from
> version 1.4.0 support this feature. You can take a look at [1] on how to do
> that.
>
> Hope this helps!
>
> Cheers,
> Gordon
>
> [1] https://ci.apache.org/projects/flink/flink-docs-
> release-1.4/dev/connectors/kafka.html#kafka-consumers-
> topic-and-partition-discovery
>
> On 3 February 2018 at 6:53:47 PM, Christophe Jolif (cjo...@gmail.com)
> wrote:
>
> Hi,
>
> If I'm sourcing from a KafkaConsumer do I have to explicitly set the Flink
> job parallelism to the number of partions or will it adjust automatically
> accordingly? In other word if I don't call setParallelism will get 1 or the
> number of partitions?
>
> The reason I'm asking is that I'm listening to a topic pattern not a
> single topic and the number of actual topic (and so partitions) behind the
> pattern can change so it is not possible to know ahead ot time how many
> partitions I will get.
>
> Thanks!
> --
> Christophe
>
>


-- 
Christophe


Flink CEP with files and no streams?

2018-02-07 Thread Esa Heikkinen
Hello

I am trying to use Flink's CEP on log files (as a batch job), but not on 
streams (in real time).
Is that possible? If yes, do you know of example Scala code for that?

Or should I convert the log files (with timestamps) into streams?
But how do I handle timestamps in Flink?

If I cannot use Flink at all for this purpose, do you have any recommendations 
for other tools?

I would like CEP-type analysis of the log files.