Couldn't figure out - How to do this in Flink? - Please assist with suggestions

2019-02-07 Thread Titus Rakkesh
Dears,

I have a data stream continuously coming in:

DataStream<Tuple3<Long, Long, Double>> splitZTuple;

E.g. (775168263, 113182, 0.0)

I have to store this somewhere (a window or other state) with a 24-hour expiry,
to check against another stream.

The second stream is

DataStream<Tuple2<Long, Double>> splittedVomsTuple

which is also arriving continuously.

E.g. (775168263, 100.0)


We need to accumulate into the third element of the tuple held in the window,
e.g. (775168263, 113182, *0.0*): whenever the first element of an incoming
tuple from the second stream (775168263, *100.0*) matches the first element of
a stored tuple, its second element is added to the stored tuple's third
element.

While this window/session is kept, if the third element of any stored tuple,
e.g. (775168263, 113182, *175*), exceeds a threshold (e.g. > 150), we need to
call another REST endpoint to send an alert for the tuple that matched the
criteria - essentially a CEP-style callback.

How can we do this kind of operation in Flink? Or do I need to consider
another framework? Please advise.

Thanks...
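
A minimal sketch of one possible approach (not from the original thread),
assuming the element types reconstructed above (Tuple3<Long, Long, Double> and
Tuple2<Long, Double>): key both streams by their first field, connect them,
keep the tracked tuple in keyed state with a 24-hour TTL in place of an
explicit window, and emit the tuple downstream once the threshold is exceeded
so that a sink or async operator can make the REST call. The class name and
the 150 threshold are illustrative.

import org.apache.flink.api.common.state.StateTtlConfig;
import org.apache.flink.api.common.state.ValueState;
import org.apache.flink.api.common.state.ValueStateDescriptor;
import org.apache.flink.api.common.time.Time;
import org.apache.flink.api.common.typeinfo.TypeHint;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.api.java.tuple.Tuple3;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.functions.co.CoProcessFunction;
import org.apache.flink.util.Collector;

public class AccumulateAndAlert extends CoProcessFunction<
        Tuple3<Long, Long, Double>, Tuple2<Long, Double>, Tuple3<Long, Long, Double>> {

    private static final double THRESHOLD = 150.0;   // illustrative alert threshold

    private transient ValueState<Tuple3<Long, Long, Double>> current;

    @Override
    public void open(Configuration parameters) {
        StateTtlConfig ttl = StateTtlConfig
            .newBuilder(Time.hours(24))               // the "24 hrs expiry"
            .setUpdateType(StateTtlConfig.UpdateType.OnCreateAndWrite)
            .build();
        ValueStateDescriptor<Tuple3<Long, Long, Double>> descriptor =
            new ValueStateDescriptor<>("accumulated",
                TypeInformation.of(new TypeHint<Tuple3<Long, Long, Double>>() {}));
        descriptor.enableTimeToLive(ttl);
        current = getRuntimeContext().getState(descriptor);
    }

    @Override
    public void processElement1(Tuple3<Long, Long, Double> value, Context ctx,
                                Collector<Tuple3<Long, Long, Double>> out) throws Exception {
        current.update(value);                        // remember the tuple from the first stream
    }

    @Override
    public void processElement2(Tuple2<Long, Double> value, Context ctx,
                                Collector<Tuple3<Long, Long, Double>> out) throws Exception {
        Tuple3<Long, Long, Double> stored = current.value();
        if (stored == null) {
            return;                                   // no matching tuple from the first stream yet
        }
        stored.f2 += value.f1;                        // accumulate the second stream's value
        current.update(stored);
        if (stored.f2 > THRESHOLD) {
            out.collect(stored);                      // emit for the alerting sink / REST call
        }
    }
}

Keying both streams on their first field and connecting them, e.g.
stream1.keyBy(0).connect(stream2.keyBy(0)).process(new AccumulateAndAlert()),
should give the per-key matching described above; the REST call itself is
better done in a downstream (async) sink than inside the process function.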


Re: MapState - TypeSerializer

2019-02-07 Thread Alexey Trenikhun
But won't there then be two TypeSerializerConfigSnapshots? Otherwise it is
unclear how TypeSerializer2 will be able to check compatibility.

Thanks,
Alexey



From: Congxian Qiu 
Sent: Thursday, February 7, 2019 8:14 PM
To: Alexey Trenikhun
Cc: user@flink.apache.org
Subject: Re: MapState - TypeSerializer

Hi, Alexey
In your case, only TypeSerializer2 will be stored in the meta information, and
TypeSerializer2 and TypeSerializer1 have to be compatible.

Best,
Congxian


Alexey Trenikhun <yen...@msn.com> wrote on Fri, Feb 8, 2019 at 10:39 AM:
What if I’m using RocksDB, and the MapState had a single entry written with
TypeSerializer1; then we take a savepoint, upgrade the job (to TypeSerializer2)
and put a new entry. At that point we have two entries written by different
serializers, so shouldn't both TypeSerializers be stored in the meta
information?
Thanks,
Alexey



From: Andrey Zagrebin <and...@data-artisans.com>
Sent: Wednesday, November 28, 2018 2:23 AM
To: Alexey Trenikhun
Cc: user@flink.apache.org
Subject: Re: MapState - TypeSerializer

Hi Alexey,

it is written once per state name in its meta information, apart from user data 
entries.

Best,
Andrey

On 28 Nov 2018, at 04:56, Alexey Trenikhun <yen...@msn.com> wrote:

Hello,
The Flink documentation states that “TypeSerializers and
TypeSerializerConfigSnapshots are written as part of checkpoints along with
the state values”. In the context of MapState, does this mean one
TypeSerializer per MapState entry, or only one per state?
Alexey
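
A small illustrative Java sketch of where these serializers come from (the
state name, types and counting logic below are made up, not from the thread):
the MapStateDescriptor supplies one key serializer and one value serializer
for the state name, and those are what end up in the checkpoint meta
information, while the individual map entries are stored only as serialized
key/value bytes.

import org.apache.flink.api.common.functions.RichFlatMapFunction;
import org.apache.flink.api.common.state.MapState;
import org.apache.flink.api.common.state.MapStateDescriptor;
import org.apache.flink.api.common.typeinfo.Types;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.util.Collector;

// Counts occurrences of a sub-key (f1) per keyed-stream key, purely to show
// where the serializers live: one pair per state name, not one per map entry.
public class CountPerSubKey extends RichFlatMapFunction<Tuple2<Long, String>, Tuple2<Long, String>> {

    private transient MapState<String, Long> countsPerSubKey;

    @Override
    public void open(Configuration parameters) {
        // The key/value TypeInformation given here determines the two serializers
        // that are written once into this state's checkpoint meta information.
        MapStateDescriptor<String, Long> descriptor =
            new MapStateDescriptor<>("countsPerSubKey", Types.STRING, Types.LONG);
        countsPerSubKey = getRuntimeContext().getMapState(descriptor);
    }

    @Override
    public void flatMap(Tuple2<Long, String> value, Collector<Tuple2<Long, String>> out) throws Exception {
        Long current = countsPerSubKey.get(value.f1);   // entries themselves are plain key/value bytes
        countsPerSubKey.put(value.f1, current == null ? 1L : current + 1L);
        out.collect(value);
    }
}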



Re: MapState - TypeSerializer

2019-02-07 Thread Congxian Qiu
Hi, Alexey
In your case, only TypeSerializer2 will be stored in the meta information,
and TypeSerializer2 and TypeSerializer1 have to be compatible.

Best,
Congxian


Alexey Trenikhun wrote on Fri, Feb 8, 2019 at 10:39 AM:

> What if I’m using RocksDB, and the MapState had a single entry written with
> TypeSerializer1; then we take a savepoint, upgrade the job (to
> TypeSerializer2) and put a new entry. At that point we have two entries
> written by different serializers, so shouldn't both TypeSerializers be
> stored in the meta information?
> Thanks,
> Alexey
>
>
> --
> *From:* Andrey Zagrebin 
> *Sent:* Wednesday, November 28, 2018 2:23 AM
> *To:* Alexey Trenikhun
> *Cc:* user@flink.apache.org
> *Subject:* Re: MapState - TypeSerializer
>
> Hi Alexey,
>
> it is written once per state name in its meta information, apart from user
> data entries.
>
> Best,
> Andrey
>
> On 28 Nov 2018, at 04:56, Alexey Trenikhun  wrote:
>
> Hello,
> The Flink documentation states that “TypeSerializers and
> TypeSerializerConfigSnapshots are written as part of checkpoints along with
> the state values”. In the context of MapState, does this mean one
> TypeSerializer per MapState entry, or only one per state?
> Alexey
>
>
>


Re: MapState - TypeSerializer

2019-02-07 Thread Alexey Trenikhun
What if I’m using RocksDB, and the MapState had a single entry written with
TypeSerializer1; then we take a savepoint, upgrade the job (to TypeSerializer2)
and put a new entry. At that point we have two entries written by different
serializers, so shouldn't both TypeSerializers be stored in the meta
information?
Thanks,
Alexey



From: Andrey Zagrebin 
Sent: Wednesday, November 28, 2018 2:23 AM
To: Alexey Trenikhun
Cc: user@flink.apache.org
Subject: Re: MapState - TypeSerializer

Hi Alexey,

it is written once per state name in its meta information, apart from user data 
entries.

Best,
Andrey

On 28 Nov 2018, at 04:56, Alexey Trenikhun <yen...@msn.com> wrote:

Hello,
The Flink documentation states that “TypeSerializers and
TypeSerializerConfigSnapshots are written as part of checkpoints along with
the state values”. In the context of MapState, does this mean one
TypeSerializer per MapState entry, or only one per state?
Alexey



Flink Job and Watermarking

2019-02-07 Thread Kaustubh Rudrawar
Hi,

I'm writing a job that wants to make an HTTP request once a watermark has
reached all tasks of an operator. It would be great if this could be
determined from outside the Flink job, but I don't think it's possible to
access watermark information for the job as a whole. Below is a workaround
I've come up with:

   1. Read messages from Kafka using the provided KafkaSource. Event time
   will be defined as a timestamp within the message.
   2. Key the stream based on an id from the message.
   3. DedupOperator that dedupes messages. This operator will run with a
   parallelism of N.
   4. An operator that persists the messages to S3. It doesn't need to
   output anything - it should ideally be a Sink (if it were a sink we could
   use the StreamingFileSink).
   5. Implement an operator that will make an HTTP request once
   processWatermark is called for time T. A parallelism of 1 will be used for
   this operator as it will do very little work. Because it has a parallelism
   of 1, the operator in step 4 cannot send anything to it, as that could make
   it a throughput bottleneck. (A rough sketch of such an operator follows
   this list.)
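
A rough, illustrative sketch of step 5 (the names WatermarkNotifier and
targetTimestamp, and the HTTP stub, are mine, not part of the original plan):
a pass-through stream operator that overrides processWatermark and fires once
when the watermark passes a target time.

import org.apache.flink.streaming.api.operators.AbstractStreamOperator;
import org.apache.flink.streaming.api.operators.OneInputStreamOperator;
import org.apache.flink.streaming.api.watermark.Watermark;
import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;

public class WatermarkNotifier<T> extends AbstractStreamOperator<T>
        implements OneInputStreamOperator<T, T> {

    private final long targetTimestamp;   // the time T we are waiting for
    private boolean notified = false;

    public WatermarkNotifier(long targetTimestamp) {
        this.targetTimestamp = targetTimestamp;
    }

    @Override
    public void processElement(StreamRecord<T> element) {
        output.collect(element);           // pass records through unchanged
    }

    @Override
    public void processWatermark(Watermark mark) throws Exception {
        super.processWatermark(mark);      // keep normal watermark forwarding
        if (!notified && mark.getTimestamp() >= targetTimestamp) {
            notified = true;
            sendHttpNotification(mark.getTimestamp());
        }
    }

    private void sendHttpNotification(long watermarkTimestamp) {
        // placeholder for the actual HTTP request
    }
}

It can be attached with something like stream.transform("watermark notifier",
stream.getType(), new WatermarkNotifier<>(targetTimestamp)).setParallelism(1).
Note that the "notified" flag is not checkpointed in this sketch, so a restart
could trigger the request again.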

Does this implementation seem like a valid workaround? Any other
alternatives I should consider?

Thanks for your help,
Kaustubh


stream of large objects

2019-02-07 Thread Aggarwal, Ajay
In my use case my source stream contains small messages, but as part of the
Flink processing I will be aggregating them into large messages, and further
processing will happen on these large messages. The structure of this large
message will be something like this:

    class LargeMessage {
        String key;
        List messages; // this is where the aggregation of the smaller messages happens
    }

In some cases this list field of LargeMessage can get very large (thousands of
messages). Is it OK to create an intermediate stream of these LargeMessages?
What should I be concerned about while designing the Flink job, specifically
with parallelism in mind? As these LargeMessages flow from one Flink subtask to
another, do they get serialized/deserialized?
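
For what it's worth, a minimal sketch of one way such an aggregation could be
expressed with an AggregateFunction on a keyed/windowed stream (the
SmallMessage type and its fields are invented for the example). An incremental
aggregate keeps a single accumulator per key/window instead of buffering every
element, which matters when the list can reach thousands of entries.

import java.util.ArrayList;
import java.util.List;

import org.apache.flink.api.common.functions.AggregateFunction;

public class LargeMessageAggregator
        implements AggregateFunction<LargeMessageAggregator.SmallMessage,
                                     LargeMessageAggregator.LargeMessage,
                                     LargeMessageAggregator.LargeMessage> {

    // Illustrative element type; the real source messages may look different.
    public static class SmallMessage {
        public String key;
        public String payload;
    }

    // Mirrors the LargeMessage structure from the question.
    public static class LargeMessage {
        public String key;
        public List<String> messages = new ArrayList<>();
    }

    @Override
    public LargeMessage createAccumulator() {
        return new LargeMessage();
    }

    @Override
    public LargeMessage add(SmallMessage value, LargeMessage acc) {
        acc.key = value.key;
        acc.messages.add(value.payload);   // aggregation of the smaller messages
        return acc;
    }

    @Override
    public LargeMessage getResult(LargeMessage acc) {
        return acc;
    }

    @Override
    public LargeMessage merge(LargeMessage a, LargeMessage b) {
        a.messages.addAll(b.messages);
        return a;
    }
}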

Thanks.



Re: [External] Re: Flink and S3 AWS keys rotation

2019-02-07 Thread Antonio Verardi
Hi Bruno,

The problem with such a solution would be that those permissions will apply
to any application running on the Kubernetes cluster, not only to Flink.
Sharing resources with other applications is one of the cool things about
Kubernetes and it would be ideal not to lose such a property. That's why I
was looking into those IAM roles for pods/containers.

Cheers,
Antonio

On Thu, Feb 7, 2019 at 2:38 PM Bruno Aranda  wrote:

> Hi,
>
> You can give specific IAM instance roles to the instances running Flink.
> This way you never expose access keys anywhere. As the docs say, that is
> the recommended way (and not just for Flink, but for any service you want
> to use, never set it up with AWS credentials in config). IAM will
> transparently deal with the security, and you can be extremely restrictive
> on what policies you attach to the instance roles.
>
> Cheers,
>
> Bruno
>
> On Thu, 7 Feb 2019 at 13:38, Kostas Kloudas  wrote:
>
>> Hi Antonio,
>>
>> I am  cc'ing Till who may have something to say on this.
>>
>> Cheers,
>> Kostas
>>
>> On Thu, Feb 7, 2019 at 1:32 PM Antonio Verardi  wrote:
>>
>>> Hi there,
>>>
>>> I'm trying out running Flink on Kubernetes and I ran into a problem with
>>> the way Flink sets up AWS credentials to talk to S3 and the way we manage
>>> AWS secrets in my company.
>>>
>>> To give permissions to Flink I am using AWS keys embedded in flink.conf,
>>> as per
>>> https://ci.apache.org/projects/flink/flink-docs-stable/ops/deployment/aws.html#configure-access-credentials.
>>> The problem there is that we rotate daily our AWS keys in order to mitigate
>>> any eventual leak of keys. In order to make Flink pick up the new keys I
>>> understand I have to restart it, but that means downtime, especially for
>>> the jobs which have a large state to save.
>>>
>>> I know that in Kubernetes land there are these two projects,
>>> https://github.com/uswitch/kiam and https://github.com/jtblin/kube2iam,
>>> that make it possible to associate IAM policies with pods/containers. But
>>> they are not part of the "official" Kubernetes software, which kinda
>>> surprises me.
>>>
>>> Did anyone run into a similar problem? If so, how did you solve it?
>>>
>>> Cheers,
>>> Antonio
>>>
>>


Re: Flink and S3 AWS keys rotation

2019-02-07 Thread Bruno Aranda
Hi,

You can give specific IAM instance roles to the instances running Flink.
This way you never expose access keys anywhere. As the docs say, that is
the recommended way (and not just for Flink, but for any service you want
to use, never set it up with AWS credentials in config). IAM will
transparently deal with the security, and you can be extremely restrictive
on what policies you attach to the instance roles.

Cheers,

Bruno

On Thu, 7 Feb 2019 at 13:38, Kostas Kloudas  wrote:

> Hi Antonio,
>
> I am  cc'ing Till who may have something to say on this.
>
> Cheers,
> Kostas
>
> On Thu, Feb 7, 2019 at 1:32 PM Antonio Verardi  wrote:
>
>> Hi there,
>>
>> I'm trying out running Flink on Kubernetes and I ran into a problem with
>> the way Flink sets up AWS credentials to talk to S3 and the way we manage
>> AWS secrets in my company.
>>
>> To give permissions to Flink I am using AWS keys embedded in flink.conf,
>> as per
>> https://ci.apache.org/projects/flink/flink-docs-stable/ops/deployment/aws.html#configure-access-credentials.
>> The problem there is that we rotate daily our AWS keys in order to mitigate
>> any eventual leak of keys. In order to make Flink pick up the new keys I
>> understand I have to restart it, but that means downtime, especially for
>> the jobs which have a large state to save.
>>
>> I know that in Kubernetes land there are these two projects,
>> https://github.com/uswitch/kiam and https://github.com/jtblin/kube2iam,
>> that make it possible to associate IAM policies with pods/containers. But
>> they are not part of the "official" Kubernetes software, which kinda
>> surprises me.
>>
>> Did anyone run into a similar problem? If so, how did you solve it?
>>
>> Cheers,
>> Antonio
>>
>


Re: Exactly Once Guarantees with StreamingFileSink to S3

2019-02-07 Thread Kostas Kloudas
No problem!

On Wed, Feb 6, 2019 at 6:38 PM Kaustubh Rudrawar  wrote:

> Hi Kostas,
>
> Thanks for the response! Yes - I see the commitAfterRecovery being called
> when a Bucket is restored. I confused myself in thinking that
> 'onSuccessfulCompletionOfCheckpoint' is called on restore as well, which
> led me to believe that we were only calling commit and not
> commitAfterRecovery.
>
> Thanks for the clarification!
> -Kaustubh
>
> On Wed, Feb 6, 2019 at 2:16 AM Kostas Kloudas  wrote:
>
>> Hi Kaustubh,
>>
>> Your general understanding is correct.
>>
>> In this case though, the sink will call the
>> S3Committer#commitAfterRecovery() method.
>> This method, after failing to commit the MPU, checks whether the file is
>> there and whether its length is correct; if everything is OK (which is the
>> case in your example), it will continue with normal execution.
>>
>> I hope this helps.
>>
>> Kostas
>>
>> On Wed, Feb 6, 2019 at 7:47 AM Kaustubh Rudrawar 
>> wrote:
>>
>>> Hi,
>>>
>>> I'm trying to understand the exactly once semantics of the
>>> StreamingFileSink with S3 in Flink 1.7.1 and am a bit confused on how it
>>> guarantees exactly once under a very specific failure scenario.
>>>
>>> For simplicity, let's say we roll the current part file on checkpoint
>>> (and only on checkpoint); the process is as follows:
>>> 1. Framework tells the sink to prepare for a checkpoint. This ultimately
>>> results in 'onReceptionOfCheckpoint' being called on Bucket.java.
>>> 2. This takes the current file, and based on our roll policy of rolling
>>> on checkpoint, it closes and uploads it to S3 as part of a MPU and the
>>> reference to this upload is stored as part of 'pendingPartsPerCheckpoint'.
>>> 3. Once the checkpoint successfully completes, the bucket is notified
>>> via 'onSuccessfulCompletionOfCheckpoint'. At this point, the bucket goes
>>> through all pendingPartsPerCheckpoint and for each of them: recovers the in
>>> progress part (which doesn't exist in this scenario) and then commits the
>>> upload.
>>> 4. The AmazonS3Client is ultimately called to perform the upload and it
>>> retries the attempt up to N times. If it exhausts retries, it will throw an
>>> Exception.
>>> 5. Upon successful commit of the MPU, Bucket clears out its references
>>> to these uploads from its state.
>>>
>>> Given this flow, I'm having trouble understanding how the following
>>> scenario works:
>>>
>>>- Step 4: The commit on the MPU succeeds,
>>>- Step 5: Before this step completes, the task crashes. So at this
>>>point, S3 has successfully completed the MPU but to the client (the
>>>Flink job), it has not completed.
>>>- Flink will then recover from the checkpoint we just took and steps
>>>3 and 4 will be repeated. My understanding is that, since the MPU 
>>> succeeded
>>>previously, any attempts at re-committing that upload will result in a 
>>> 404
>>>('NoSuchUpload'). So Step 4 should throw an exception. Which would then 
>>> get
>>>retried by the framework and this process repeats itself.
>>>
>>> So how is this case handled?
>>>
>>> Really appreciate the help!
>>> -Kaustubh
>>>
>>>
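
For reference, a small illustrative snippet of the roll-on-checkpoint setup
assumed in this scenario (the bucket path and the String element type are
placeholders); it uses OnCheckpointRollingPolicy from the Flink 1.7
StreamingFileSink API:

import org.apache.flink.api.common.serialization.SimpleStringEncoder;
import org.apache.flink.core.fs.Path;
import org.apache.flink.streaming.api.functions.sink.filesystem.StreamingFileSink;
import org.apache.flink.streaming.api.functions.sink.filesystem.rollingpolicies.OnCheckpointRollingPolicy;

public class RollOnCheckpointSink {

    // Builds a row-format sink that rolls its part files only when a checkpoint
    // is taken, matching the "roll on checkpoint (and only on checkpoint)" setup.
    public static StreamingFileSink<String> build() {
        return StreamingFileSink
            .forRowFormat(new Path("s3://my-bucket/output"), new SimpleStringEncoder<String>("UTF-8"))
            .withRollingPolicy(OnCheckpointRollingPolicy.build())
            .build();
    }
}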
>>>


Re: Avro serialization and deserialization to Kafka in Scala

2019-02-07 Thread Kostas Kloudas
Hi Wouter,

I think Gordon or Igal are the best to answer this question.

Cheers,
Kostas

On Thu, Feb 7, 2019 at 11:04 AM Wouter Zorgdrager 
wrote:

> Hello all,
>
>
> I saw the recent updates in Flink related to supporting Avro schema
> evolution in state. I'm curious how Flink handles this internally for Scala
> case classes. I'm working on custom (de-)serialization schemas to write
> and read from Kafka. However, I'm currently stuck because of the fact that
> Avro doesn't natively support Scala. This means that in order to support
> case class serialization using Scala specific types (like Option, Either,
> etc.) I need a library like Avro4s [1] or AvroHugger [2] which on
> compile-time generates schemas using macros. These macro-extensions are
> extremely slow for complex case classes (compile-time of 15 minutes for a
> few nested types). I'm looking for an approach without the use of these
> libraries and therefore curious how Flink handles this.
>
>
> Does anyone have some good leads for this?
>
>
> Thanks in advance!
>
>
> Kind regards,
> Wouter Zorgdrager
>
>
> [1] https://github.com/sksamuel/avro4s
> [2] https://github.com/julianpeeters/avrohugger
>
>


Re: Flink and S3 AWS keys rotation

2019-02-07 Thread Kostas Kloudas
Hi Antonio,

I am  cc'ing Till who may have something to say on this.

Cheers,
Kostas

On Thu, Feb 7, 2019 at 1:32 PM Antonio Verardi  wrote:

> Hi there,
>
> I'm trying out running Flink on Kubernetes and I ran into a problem with
> the way Flink sets up AWS credentials to talk to S3 and the way we manage
> AWS secrets in my company.
>
> To give permissions to Flink I am using AWS keys embedded in flink.conf,
> as per
> https://ci.apache.org/projects/flink/flink-docs-stable/ops/deployment/aws.html#configure-access-credentials.
> The problem there is that we rotate daily our AWS keys in order to mitigate
> any eventual leak of keys. In order to make Flink pick up the new keys I
> understand I have to restart it, but that means downtime, especially for
> the jobs which have a large state to save.
>
> I know that in Kubernetes land there are these two projects,
> https://github.com/uswitch/kiam and https://github.com/jtblin/kube2iam,
> that make it possible to associate IAM policies with pods/containers. But
> they are not part of the "official" Kubernetes software, which kinda
> surprises me.
>
> Did anyone run into a similar problem? If so, how did you solve it?
>
> Cheers,
> Antonio
>


Flink and S3 AWS keys rotation

2019-02-07 Thread Antonio Verardi
Hi there,

I'm trying out running Flink on Kubernetes and I ran into a problem with the
way Flink sets up AWS credentials to talk to S3 and the way we manage AWS
secrets in my company.

To give permissions to Flink I am using AWS keys embedded in flink.conf, as
per
https://ci.apache.org/projects/flink/flink-docs-stable/ops/deployment/aws.html#configure-access-credentials.
The problem there is that we rotate daily our AWS keys in order to mitigate
any eventual leak of keys. In order to make Flink pick up the new keys I
understand I have to restart it, but that means downtime, especially for
the jobs which have a large state to save.

I know that in Kubernetes land there are these two projects,
https://github.com/uswitch/kiam and https://github.com/jtblin/kube2iam,
that make it possible to associate IAM policies with pods/containers. But they
are not part of the "official" Kubernetes software, which kinda surprises me.

Did anyone run into a similar problem? If so, how did you solve it?

Cheers,
Antonio


Re: How to use ExceptionUtils.findThrowable() in Scala

2019-02-07 Thread Averell
P.S.: This is the full stack trace:

2019-02-07 01:53:12.790 [I/O dispatcher 16] ERROR
o.a.f.s.connectors.elasticsearch.ElasticsearchSinkBase  - Failed
Elasticsearch item request: [...][[...][1]]
ElasticsearchException[Elasticsearch exception
[type=version_conflict_engine_exception, reason=[_doc][...]: version
conflict, document already exists (current version [1])]]
org.elasticsearch.ElasticsearchException: Elasticsearch exception
[type=version_conflict_engine_exception, reason=[_doc][...]: version
conflict, document already exists (current version [1])]
at
org.elasticsearch.ElasticsearchException.innerFromXContent(ElasticsearchException.java:510)
at
org.elasticsearch.ElasticsearchException.fromXContent(ElasticsearchException.java:421)
at
org.elasticsearch.action.bulk.BulkItemResponse.fromXContent(BulkItemResponse.java:135)
at
org.elasticsearch.action.bulk.BulkResponse.fromXContent(BulkResponse.java:198)
at
org.elasticsearch.client.RestHighLevelClient.parseEntity(RestHighLevelClient.java:653)
at
org.elasticsearch.client.RestHighLevelClient.lambda$performRequestAsyncAndParseEntity$3(RestHighLevelClient.java:549)
at
org.elasticsearch.client.RestHighLevelClient$1.onSuccess(RestHighLevelClient.java:580)
at
org.elasticsearch.client.RestClient$FailureTrackingResponseListener.onSuccess(RestClient.java:621)
at org.elasticsearch.client.RestClient$1.completed(RestClient.java:375)
at org.elasticsearch.client.RestClient$1.completed(RestClient.java:366)
at 
org.apache.http.concurrent.BasicFuture.completed(BasicFuture.java:119)
at
org.apache.http.impl.nio.client.DefaultClientExchangeHandlerImpl.responseCompleted(DefaultClientExchangeHandlerImpl.java:177)
at
org.apache.http.nio.protocol.HttpAsyncRequestExecutor.processResponse(HttpAsyncRequestExecutor.java:436)
at
org.apache.http.nio.protocol.HttpAsyncRequestExecutor.inputReady(HttpAsyncRequestExecutor.java:326)
at
org.apache.http.impl.nio.DefaultNHttpClientConnection.consumeInput(DefaultNHttpClientConnection.java:265)
at
org.apache.http.impl.nio.client.InternalIODispatch.onInputReady(InternalIODispatch.java:81)
at
org.apache.http.impl.nio.client.InternalIODispatch.onInputReady(InternalIODispatch.java:39)
at
org.apache.http.impl.nio.reactor.AbstractIODispatch.inputReady(AbstractIODispatch.java:121)
at
org.apache.http.impl.nio.reactor.BaseIOReactor.readable(BaseIOReactor.java:162)
at
org.apache.http.impl.nio.reactor.AbstractIOReactor.processEvent(AbstractIOReactor.java:337)
at
org.apache.http.impl.nio.reactor.AbstractIOReactor.processEvents(AbstractIOReactor.java:315)
at
org.apache.http.impl.nio.reactor.AbstractIOReactor.execute(AbstractIOReactor.java:276)
at
org.apache.http.impl.nio.reactor.BaseIOReactor.execute(BaseIOReactor.java:104)
at
org.apache.http.impl.nio.reactor.AbstractMultiworkerIOReactor$Worker.run(AbstractMultiworkerIOReactor.java:588)
at java.lang.Thread.run(Thread.java:748)





Re: No resource available error while testing HA

2019-02-07 Thread Averell
Hi Gary,

I am trying to reproduce that problem.
BTW, is it possible to change the log level (I'm using logback) for a running
job?

Thanks and regards,
Averell





How to use ExceptionUtils.findThrowable() in Scala

2019-02-07 Thread Averell
Hello,

I am trying to implement error handling in the Elasticsearch sink (following
the seemingly outdated Flink documentation [1]):


override def onFailure(actionRequest: ActionRequest, failure: Throwable,
                       restStatusCode: Int, indexer: RequestIndexer): Unit = {
  if (ExceptionUtils.findThrowable(failure,
      classOf[org.elasticsearch.index.engine.VersionConflictEngineException]) !=
      Optional.empty()) {
    LOG.warn("Failed inserting record to ElasticSearch: statusCode {} message: {} record: {} stacktrace {}.\nRetrying",
      restStatusCode.toString, failure.getMessage, actionRequest.toString, failure.getStackTrace)
    // Do something here
  } else {
    LOG.error(s"ELASTICSEARCH FAILED:\n  statusCode $restStatusCode\n  message: ${failure.getMessage}\n${failure.getStackTrace}")
  }
}


I tried to have different handling for the case of
VersionConflictEngineException, but failed. It always falls through to the
"else" branch, so my log message is:

ELASTICSEARCH FAILED:
statusCode 409
message: Elasticsearch exception
[type=version_conflict_engine_exception, reason=[_doc][...]: version
conflict, document already exists (current version [1])]
Thanks and best regards,
Averell

[1] handling-failing-elasticsearch-requests






Avro serialization and deserialization to Kafka in Scala

2019-02-07 Thread Wouter Zorgdrager
Hello all,


I saw the recent updates in Flink related to supporting Avro schema evolution 
in state. I'm curious how Flink handles this internally for Scala case classes. 
I'm working on custom (de-)serialization schemas to write and read from Kafka.
However, I'm currently stuck because of the fact that Avro doesn't natively 
support Scala. This means that in order to support case class serialization 
using Scala specific types (like Option, Either, etc.) I need a library like 
Avro4s [1] or AvroHugger [2] which on compile-time generates schemas using 
macros. These macro-extensions are extremely slow for complex case classes 
(compile-time of 15 minutes for a few nested types). I'm looking for an 
approach without the use of these libraries and therefore curious how Flink 
handles this.


Does anyone have some good leads for this?


Thanks in advance!


Kind regards,
Wouter Zorgdrager


[1] https://github.com/sksamuel/avro4s
[2] https://github.com/julianpeeters/avrohugger



Re: graceful shutdown of taskmanager

2019-02-07 Thread Till Rohrmann
Hi Bernd,

at the moment this is not supported out of the box by Flink. What you can
do is the following: First cancel a job with savepoint. After the job has
been terminated, terminate TaskManagers and then resume the job from the
savepoint you've just taken. This assumes that you have a single job
running on a cluster. If you have multiple jobs running, then you might
need to cancel with savepoint every running job.

Cheers,
Till

On Wed, Feb 6, 2019 at 11:54 AM  wrote:

> Hi
> Is there a possibility to gracefully remove a taskmanager from a running
> cluster?
> My idea would be to trigger affected jobs to restart via a savepoint on
> the remaining taskmanagers. When the taskmanager is idle it can be stopped
> without jobs falling back to an older checkpoint.
>
> Regards
>
> Bernd
>


Re: Running JobManager as Deployment instead of Job

2019-02-07 Thread Till Rohrmann
Hi Sergey,

the rationale why we are using a K8s job instead of a deployment is that a
Flink job cluster should terminate after it has successfully executed the
Flink job. This is unlike a session cluster which should run forever and
for which a K8s deployment would be better suited.

If in your use case a K8s deployment would better work, then I would
suggest to change the `job-cluster-job.yaml` accordingly.

Cheers,
Till

On Tue, Feb 5, 2019 at 4:12 PM Sergey Belikov 
wrote:

> Hi,
>
> my team is currently experimenting with Flink running in Kubernetes (job
> cluster setup). And we found out that with JobManager being deployed as
> "Job" we can't just simply update certain values in job's yaml, e.g.
> spec.template.spec.containers.image (
> https://github.com/kubernetes/kubernetes/issues/48388#issuecomment-319493817).
> This causes certain troubles in our CI/CD pipelines so we are thinking
> about using "Deployment" instead of "Job".
>
> With that being said I'm wondering what was the motivation behind using
> "Job" resource for deploying JobManager? And are there any pitfalls related
> to using Deployment and not Job for JobManager?
>
> Thank you in advance.
> --
> Best regards,
> Sergey Belikov
>