Re: Can checkpoints be used to migrate jobs between Flink versions?

2019-01-09 Thread Edward Alexander Rojas Clavijo
Thanks very much for your quick answer, Stefan.

Regards,
Edward

On Wed, Jan 9, 2019 at 3:26 PM, Stefan Richter () wrote:

> Hi,
>
> I would assume that this should currently work, because the format of basic
> savepoints and checkpoints is the same right now. The restriction in the
> doc is probably there in case the checkpoint format diverges more in the
> future.
>
> Best,
> Stefan
>
> > On 9. Jan 2019, at 13:12, Edward Rojas  wrote:
> >
> > Hello,
> >
> > For upgrading jobs between Flink versions I follow the guide in the doc
> > here:
> >
> https://ci.apache.org/projects/flink/flink-docs-release-1.7/ops/upgrading.html#upgrading-the-flink-framework-version
> >
> > It states that we should always use savepoints for this procedure; I
> > followed it and it works perfectly.
> >
> > I would just like to know if there is a reason why it is not advised to use
> > checkpoints for this procedure.
> >
> > Say, for example, that the job has externalized checkpoints with the
> > RETAIN_ON_CANCELLATION policy: one could cancel the job before the upgrade
> > and use the retained checkpoint to restart the job once the Flink
> > cluster is upgraded... or maybe I'm missing something?
> >
> > I performed some tests and we were able to upgrade using a checkpoint, by
> > passing the checkpoint path to the "flink run -s" parameter.
> >
> > Could you help clarify whether this is advised (and supported), or whether
> > we should stick to savepoints for this kind of operation?
> >
> >
> > Thanks in advance for your help.
> >
> > Edward
> >
> >
> >
> > --
> > Sent from:
> http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/
>
>
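
For reference, a minimal sketch of enabling externalized checkpoints with the
RETAIN_ON_CANCELLATION policy discussed in the quoted message (the checkpoint
interval is illustrative; this goes in the job's main method):

import org.apache.flink.streaming.api.environment.CheckpointConfig.ExternalizedCheckpointCleanup;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
// Take a checkpoint every 60 seconds
env.enableCheckpointing(60000);
// Keep the last completed checkpoint when the job is cancelled
env.getCheckpointConfig()
   .enableExternalizedCheckpoints(ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION);

The retained checkpoint path can then be passed via the "flink run -s" parameter
on the upgraded cluster, as described above.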

-- 
*Edward Alexander Rojas Clavijo*



*Software Engineer, Hybrid Cloud, IBM France*


Re: How to migrate Kafka Producer?

2018-12-19 Thread Edward Alexander Rojas Clavijo
Hi Dawid, Piotr,

I see that for the Kafka consumer base class there are some migration tests here:
https://github.com/apache/flink/blob/master/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaConsumerBaseMigrationTest.java

As the Kafka consumer state is managed in the FlinkKafkaConsumerBase class,
I assumed this would also cover migration between connector versions, but maybe
I'm missing something (?)

I performed some tests on my own and the migration of the Kafka consumer
connector worked.


Regarding the Kafka producer, I am updating the job to the new connector,
removing the previous one, and upgrading the job using a savepoint with
--allowNonRestoredState (see the sketch below).
So far my tests with this option have been successful.
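
A minimal sketch of the producer swap (the topic, stream and uid names are
hypothetical; the old 0.11 sink is removed and the universal connector is added
under a new uid, so its incompatible state is simply dropped when restoring
with --allowNonRestoredState):

import java.util.Properties;
import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer;

Properties props = new Properties();
props.setProperty("bootstrap.servers", "kafka:9092");

// Old sink, removed from the job:
// stream.addSink(new FlinkKafkaProducer011<>("output-topic", new SimpleStringSchema(), props))
//       .uid("kafka-sink-011");

// New universal sink, added with a new uid ("stream" is the existing DataStream<String>):
stream.addSink(new FlinkKafkaProducer<>("output-topic", new SimpleStringSchema(), props))
      .uid("kafka-sink-universal");

The job is then resumed from the savepoint with the --allowNonRestoredState
flag, as described above.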

I appreciate any help here to clarify my understanding.

Regards,
Edward

On Wed, Dec 19, 2018 at 10:28 AM, Dawid Wysakowicz () wrote:

> Hi Edward,
>
> AFAIK we do not support migrating state from one connector to another
> one, which is in fact the case for Kafka 0.11 and the "universal" one.
>
> You might try to use the bravo project [1] to migrate the state manually,
> but unfortunately you have to understand the internals of both
> connectors. I'm also pulling Piotr into the thread; maybe he can provide a
> more straightforward workaround.
>
> Best,
>
> Dawid
>
> [1] https://github.com/king/bravo
>
> On 18/12/2018 14:33, Edward Rojas wrote:
> > Hi,
> >
> > I'm planning to migrate from the Kafka connector 0.11 to the new universal
> > Kafka connector (1.0.0+), but I'm having some trouble.
> >
> > The Kafka consumer seems to be compatible, but when trying to migrate the
> > Kafka producer I get an incompatibility error for the state migration.
> > It looks like the producer uses a list state of type
> > "NextTransactionalIdHint", but this class is specific to each producer
> > (FlinkKafkaProducer011.NextTransactionalIdHint vs
> > FlinkKafkaProducer.NextTransactionalIdHint) and therefore the states are
> > not compatible.
> >
> >
> > I would like to know: what is the recommended way to perform this kind of
> > migration without losing state?
> >
> > Thanks in advance,
> > Edward
> >
> >
> >
> > --
> > Sent from:
> http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/
>
>


Re: BucketingSink vs StreamingFileSink

2018-11-21 Thread Edward Alexander Rojas Clavijo
Thank you very much for the information Andrey.

I'll try on my side to migrate what we have now and to add the Parquet sink,
and I'll get back to you if I have more questions :)

Edward

On Fri, Nov 16, 2018 at 7:54 PM, Andrey Zagrebin (<
and...@data-artisans.com>) wrote:

> Hi,
>
> StreamingFileSink is supposed to subsume BucketingSink, which will be
> deprecated.
>
> StreamingFileSink fixes some issues of BucketingSink, especially with AWS
> S3, and adds more flexibility in defining the rolling policy.
>
> StreamingFileSink does not support older Hadoop versions at the moment,
> but there are ideas on how to resolve this.
>
> You can have a look how to use StreamingFileSink with Parquet here [1].
>
> I also cc’ed Kostas; he might add more on this topic.
>
> Best,
> Andrey
>
> [1]
> https://github.com/apache/flink/blob/0b4947b6142f813d2f1e0e662d0fefdecca0e382/flink-formats/flink-parquet/src/test/java/org/apache/flink/formats/parquet/avro/ParquetStreamingFileSinkITCase.java
>
> > On 16 Nov 2018, at 11:31, Edward Rojas  wrote:
> >
> > Hello,
> > We are currently using Flink 1.5 and we use the BucketingSink to save the
> > result of job processing to HDFS.
> > The data is in JSON format and we store one object per line in the
> resulting
> > files.
> >
> > We are planning to upgrade to Flink 1.6 and we see that there is this new
> > StreamingFileSink. From the description it looks very similar to
> > BucketingSink when using a row-encoded output format, so my question is:
> > should we consider moving to StreamingFileSink?
> >
> > I would like to better understand the suggested use cases for each
> > of the two options now (?)
> >
> > We are also considering additionally outputting the data in Parquet format
> > for data scientists (to be stored in HDFS as well). For this I see some
> > utilities to work with StreamingFileSink, so I guess for this case it's
> > recommended to use that option(?).
> > Is it possible to use the Parquet writers even when the schema of the
> > data may evolve?
> >
> > Thanks in advance for your help.
> > (Sorry if I put too many questions in the same message)
> >
> >
> >
> > --
> > Sent from:
> http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/
>
>
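
For reference, a minimal sketch of the row-encoded StreamingFileSink usage
discussed above (Flink 1.6+; the HDFS paths, the "jsonStream" variable and the
record class are hypothetical):

import org.apache.flink.api.common.serialization.SimpleStringEncoder;
import org.apache.flink.core.fs.Path;
import org.apache.flink.streaming.api.functions.sink.filesystem.StreamingFileSink;

// Row-encoded output: one JSON string per line, closest to our BucketingSink usage
StreamingFileSink<String> jsonSink = StreamingFileSink
    .forRowFormat(new Path("hdfs:///output/json"), new SimpleStringEncoder<String>("UTF-8"))
    .build();
jsonStream.addSink(jsonSink);

// Bulk-encoded Parquet output would instead use the flink-parquet module, e.g.:
// StreamingFileSink<MyRecord> parquetSink = StreamingFileSink
//     .forBulkFormat(new Path("hdfs:///output/parquet"),
//                    ParquetAvroWriters.forReflectRecord(MyRecord.class))
//     .build();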

-- 
*Edward Alexander Rojas Clavijo*



*Software Engineer, Hybrid Cloud, IBM France*


Re: After OutOfMemoryError state cannot be read

2018-09-07 Thread Edward Alexander Rojas Clavijo
Hi Stefan, Vino,
Thanks for your answers.

We are using full checkpointing, not incremental. We are using custom
serializers for the operator state classes. The serializers perform
encryption when writing and decryption when reading, and they are
stateless.
We register the serializers by using:

env.getConfig()
   .registerTypeWithKryoSerializer(ProcessState.class,
                                   ProcessStateSerializer.class);

In normal cases the serialization works correctly, even after recovering
from a failure. We get this error only when a taskmanager fails due to memory
problems.
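
For context, a minimal sketch of the kind of stateless Kryo serializer described
above (not the actual implementation; the encrypt/decrypt/toBytes/fromBytes
helpers are hypothetical placeholders for the real crypto and serialization
code):

import com.esotericsoftware.kryo.Kryo;
import com.esotericsoftware.kryo.Serializer;
import com.esotericsoftware.kryo.io.Input;
import com.esotericsoftware.kryo.io.Output;

public class ProcessStateSerializer extends Serializer<ProcessState> {

    @Override
    public void write(Kryo kryo, Output output, ProcessState state) {
        // Serialize the state to bytes and encrypt before writing
        byte[] encrypted = encrypt(toBytes(state));
        output.writeInt(encrypted.length);
        output.writeBytes(encrypted);
    }

    @Override
    public ProcessState read(Kryo kryo, Input input, Class<ProcessState> type) {
        // Read the encrypted bytes, then decrypt and deserialize
        int length = input.readInt();
        byte[] encrypted = input.readBytes(length);
        return fromBytes(decrypt(encrypted));
    }

    // Hypothetical placeholders; no mutable state is kept in the serializer
    private static byte[] encrypt(byte[] plain) { /* real cipher code */ return plain; }
    private static byte[] decrypt(byte[] data)  { /* real cipher code */ return data; }
    private static byte[] toBytes(ProcessState state) { /* real serialization */ return new byte[0]; }
    private static ProcessState fromBytes(byte[] bytes) { /* real deserialization */ return null; }
}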

Thanks again for your help,
Edward

On Fri, Sep 7, 2018 at 11:51 AM, Stefan Richter (<
s.rich...@data-artisans.com>) wrote:

> Hi,
>
> what I can say is that any failures like OOMs should not corrupt
> checkpoint files, because only successfully completed checkpoints are used
> for recovery by the job manager. Just to get a bit more info, are you using
> full or incremental checkpoints? Unfortunately, it is a bit hard to say
> from the given information what the cause of the problem is. Typically,
> these problems have been observed when something was wrong with a
> serializer or a stateful serializer was used from multiple threads.
>
> Best,
> Stefan
>
> On 07.09.2018 at 05:04, vino yang  wrote:
>
> Hi Edward,
>
> From this log: Caused by: java.io.EOFException, it seems that the state
> metadata file has been corrupted.
> But I can't confirm it; maybe Stefan knows more details. Pinging him for you.
>
> Thanks, vino.
>
> Edward Rojas  wrote on Friday, September 7, 2018 at 1:22 AM:
>
>> Hello all,
>>
>> We are running Flink 1.5.3 on Kubernetes with RocksDB as the state backend.
>> When performing some load testing we got an "OutOfMemoryError: native memory
>> exhausted", causing the job to fail and be restarted.
>>
>> After the TaskManager is restarted, the job is recovered from a checkpoint,
>> but it seems that there is a problem when trying to access the state. We
>> get the error from the *onTimer* function of an *onProcessingTime* timer.
>>
>> Could the OOM error have caused a corrupted state to be checkpointed?
>>
>> We get Exceptions like:
>>
>> TimerException{java.lang.RuntimeException: Error while retrieving data
>> from
>> RocksDB.}
>> at
>>
>> org.apache.flink.streaming.runtime.tasks.SystemProcessingTimeService$TriggerTask.run(SystemProcessingTimeService.java:288)
>> at
>> java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:522)
>> at java.util.concurrent.FutureTask.run(FutureTask.java:277)
>> at
>>
>> java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$201(ScheduledThreadPoolExecutor.java:191)
>> at
>>
>> java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:304)
>> at
>>
>> java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1160)
>> at
>>
>> java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:635)
>> at java.lang.Thread.run(Thread.java:811)
>> Caused by: java.lang.RuntimeException: Error while retrieving data from
>> RocksDB.
>> at
>>
>> org.apache.flink.contrib.streaming.state.RocksDBValueState.value(RocksDBValueState.java:89)
>> at com.xxx.ProcessFunction.*onTimer*(ProcessFunction.java:279)
>> at
>>
>> org.apache.flink.streaming.api.operators.KeyedProcessOperator.invokeUserFunction(KeyedProcessOperator.java:94)
>> at
>>
>> org.apache.flink.streaming.api.operators.KeyedProcessOperator.*onProcessingTime*(KeyedProcessOperator.java:78)
>> at
>>
>> org.apache.flink.streaming.api.operators.HeapInternalTimerService.*onProcessingTime*(HeapInternalTimerService.java:266)
>> at
>>
>> org.apache.flink.streaming.runtime.tasks.SystemProcessingTimeService$TriggerTask.run(SystemProcessingTimeService.java:285)
>> ... 7 more
>> Caused by: java.io.EOFException
>> at java.io.DataInputStream.readFully(DataInputStream.java:208)
>> at java.io.DataInputStream.readUTF(DataInputStream.java:618)
>> at java.io.DataInputStream.readUTF(DataInputStream.java:573)
>> at
>>
>> org.apache.flink.api.java.typeutils.runtime.PojoSerializer.deserialize(PojoSerializer.java:381)
>> at
>>
>> org.apache.flink.contrib.streaming.state.RocksDBValueState.value(RocksDBValueState.java:87)
>> ... 12 more
>>
>>
>> Thanks in advance for any help
>>
>>
>>
>>
>> --
>> Sent from:
>> http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/
>>
>
>

-- 
*Edward Alexander Rojas Clavijo*



*Software Engineer, Hybrid Cloud, IBM France*


Re: Dynamically deleting kafka topics does not remove partitions from kafkaConsumer

2018-05-07 Thread Edward Alexander Rojas Clavijo
Hello,
I've been working on a fix for this; I posted more details on the JIRA
ticket.

Regards,
Edward

2018-05-07 5:51 GMT+02:00 Tzu-Li (Gordon) Tai :

> Ah, correct, sorry for the incorrect link.
> Thanks Ted!
>
>
> On 7 May 2018 at 11:43:12 AM, Ted Yu (yuzhih...@gmail.com) wrote:
>
> It seems the correct JIRA should be FLINK-9303
> 
>
> On Sun, May 6, 2018 at 8:29 PM, Tzu-Li (Gordon) Tai 
> wrote:
>
>> Hi Edward,
>>
>> Thanks for bringing this up, and I think your suggestion makes sense.
>> The problem is that the Kafka consumer has no notion of "closed" partitions
>> at the moment, so partitions statically assigned to the Kafka client are
>> never removed and are always continuously requested for records.
>>
>> For example, on the Kinesis consumer there is a notion of closed shards,
>> and therefore this is not an issue there.
>>
>> I've created a JIRA to track this:
>> https://issues.apache.org/jira/browse/FLINK-5720
>>
>> Cheers,
>> Gordon
>>
>>
>>
>> --
>> Sent from: http://apache-flink-user-mailing-list-archive.2336050.n4.
>> nabble.com/
>>
>
>


Re: Regression: On Flink 1.5 CLI -m,--jobmanager option not working

2018-04-27 Thread Edward Alexander Rojas Clavijo
Thank you

2018-04-27 14:55 GMT+02:00 Chesnay Schepler :

> I've responded in the JIRA.
>
>
> On 27.04.2018 14:26, Edward Rojas wrote:
>
>> I'm preparing to migrate my environment from Flink 1.4 to 1.5 and I found
>> this issue.
>>
>> Every time I try to use the flink CLI with the -m option to specify the
>> jobmanager address, the CLI gets stuck on "Waiting for response..." and I
>> get the following error on the JobManager:
>>
>> WARN  akka.remote.transport.netty.NettyTransport - Remote
>> connection to [/x.x.x.x:] failed with
>> org.apache.flink.shaded.akka.org.jboss.netty.handler.codec.frame.TooLongFrameException:
>> Adjusted frame length exceeds 10485760: 1195725860 - discarded
>>
>> I get the error even when I run it locally and try something like "flink
>> list -m localhost:6123". But "flink list" works as expected.
>>
>> I'm using the version from the "release-1.5" branch.
>>
>> I tested on the tag release 1.5.0-rc1 and it's working as expected.
>>
>> I did a git bisect and it seems like the commit introducing the regression
>> is 47909f4
>> <https://github.com/apache/flink/commit/47909f466b9c9ee1f4caf94e9f6862a21b628817>
>>
>> I created a JIRA ticket for this:
>> https://issues.apache.org/jira/browse/FLINK-9255.
>>
>> Do you have any thoughts about it?
>>
>> Regards,
>> Edward
>>
>>
>>
>> --
>> Sent from: http://apache-flink-user-mailing-list-archive.2336050.n4.
>> nabble.com/
>>
>>
>


-- 
*Edward Alexander Rojas Clavijo*



*Software Engineer, Hybrid Cloud, IBM France*


Re: SSL config on Kubernetes - Dynamic IP

2018-03-29 Thread Edward Alexander Rojas Clavijo
Hi all,

I did some tests based on the PR Christophe mentioned above, and by making a
change in the NettyClient to use the canonical host name instead of the host
address (IP) to identify the server, the SSL validation works!

I created a PR with this change: https://github.com/apache/flink/pull/5789
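
To illustrate the difference (a standalone sketch, not the actual NettyClient
code; the task manager DNS name is hypothetical):

import java.net.InetAddress;

public class HostnameVsIp {
    public static void main(String[] args) throws Exception {
        InetAddress address =
            InetAddress.getByName("flink-taskmanager-1.flink-taskmanager-svc");

        // Identifying the server by the raw IP fails the SAN check,
        // because the certificate only contains DNS entries
        System.out.println("host address:        " + address.getHostAddress());

        // Identifying it by the canonical host name matches the DNS SAN entries
        System.out.println("canonical host name: " + address.getCanonicalHostName());
    }
}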

Regards,
Edward

2018-03-28 17:22 GMT+02:00 Edward Alexander Rojas Clavijo <
edward.roja...@gmail.com>:

> Hi Till,
>
> I just created the JIRA ticket:
> https://issues.apache.org/jira/browse/FLINK-9103
>
> I added the JobManager and TaskManager logs; hope this helps to resolve
> the issue.
>
> Regards,
> Edward
>
> 2018-03-27 17:48 GMT+02:00 Till Rohrmann :
>
>> Hi Edward,
>>
>> could you please file a JIRA issue for this problem. It might be as
>> simple as that the TaskManager's network stack uses the IP instead of the
>> hostname as you suggested. But we have to look into this to be sure. Also
>> the logs of the JobManager as well as the TaskManagers could be helpful.
>>
>> Cheers,
>> Till
>>
>> On Tue, Mar 27, 2018 at 5:17 PM, Christophe Jolif 
>> wrote:
>>
>>>
>>> I suspect this relates to: https://issues.apache.org/
>>> jira/browse/FLINK-5030
>>>
>>> For which there was a PR at some point but nothing has been done so far.
>>> It seems the current code explicitly uses the IP vs Hostname for Netty SSL
>>> configuration.
>>>
>>> Without that I'm really wondering how people are reasonably using SSL on
>>> a Kubernetes Flink-based cluster as every time a pod is (re-started) it can
>>> theoretically take a different IP? Or do I miss something?
>>>
>>> --
>>> Christophe
>>>
>>> On Tue, Mar 27, 2018 at 3:24 PM, Edward Alexander Rojas Clavijo <
>>> edward.roja...@gmail.com> wrote:
>>>
>>>> Hi all,
>>>>
>>>> Currently I have a Flink 1.4 cluster running on kubernetes and with SSL
>>>> configuration based on https://ci.apache.org/proje
>>>> cts/flink/flink-docs-master/ops/security-ssl.html.
>>>>
>>>> However, as the IP of the nodes are dynamic (from the nature of
>>>> kubernetes), we are using only the DNS which we can control using
>>>> kubernetes services. So we add to the Subject Alternative Name(SAN) the
>>>> flink-jobmanager DNS and also the DNS for the task managers
>>>> *.flink-taskmanager-svc (each task manager has a DNS in the form
>>>> flink-taskmanager-0.flink-taskmanager-svc).
>>>>
>>>> Additionally we set the jobmanager.rpc.address property on all the
>>>> nodes and each task manager sets the taskmanager.host property, all
>>>> matching the ones on the certificate.
>>>>
>>>> This is working well when using Job with Parallelism set to 1. The SSL
>>>> validations are good and the Jobmanager can communicate with Task manager
>>>> and vice versa.
>>>>
>>>> But when we set the parallelism to more than 1 we have exceptions on
>>>> the SSL validation like this:
>>>>
>>>> Caused by: java.security.cert.CertificateException: No subject
>>>> alternative names matching IP address 172.30.247.163 found
>>>> at sun.security.util.HostnameChecker.matchIP(HostnameChecker.java:168)
>>>> at sun.security.util.HostnameChecker.match(HostnameChecker.java:94)
>>>> at sun.security.ssl.X509TrustManagerImpl.checkIdentity(X509Trus
>>>> tManagerImpl.java:455)
>>>> at sun.security.ssl.X509TrustManagerImpl.checkIdentity(X509Trus
>>>> tManagerImpl.java:436)
>>>> at sun.security.ssl.X509TrustManagerImpl.checkTrusted(X509Trust
>>>> ManagerImpl.java:252)
>>>> at sun.security.ssl.X509TrustManagerImpl.checkServerTrusted(X50
>>>> 9TrustManagerImpl.java:136)
>>>> at sun.security.ssl.ClientHandshaker.serverCertificate(ClientHa
>>>> ndshaker.java:1601)
>>>> ... 21 more
>>>>
>>>>
>>>> From the logs I see the Jobmanager is correctly registering the
>>>> taskmanagers:
>>>>
>>>> org.apache.flink.runtime.instance.InstanceManager   - Registered
>>>> TaskManager at flink-taskmanager-1 (akka.ssl.tcp://flink@taiga-fl
>>>> ink-taskmanager-1.flink-taskmanager-svc.default.svc.cluster.local:6122/user/taskmanager)
>>>> as 1a3f59693cec8b3929ed8898edcc2700. Current number of registered
>>>> hosts is 3. Current number of alive task slots is 6.
>>>>
>>>> And also each taskma

Re: SSL config on Kubernetes - Dynamic IP

2018-03-28 Thread Edward Alexander Rojas Clavijo
Hi Till,

I just created the JIRA ticket:
https://issues.apache.org/jira/browse/FLINK-9103

I added the JobManager and TaskManager logs; hope this helps to resolve the
issue.

Regards,
Edward

2018-03-27 17:48 GMT+02:00 Till Rohrmann :

> Hi Edward,
>
> could you please file a JIRA issue for this problem. It might be as simple
> as that the TaskManager's network stack uses the IP instead of the hostname
> as you suggested. But we have to look into this to be sure. Also the logs
> of the JobManager as well as the TaskManagers could be helpful.
>
> Cheers,
> Till
>
> On Tue, Mar 27, 2018 at 5:17 PM, Christophe Jolif 
> wrote:
>
>>
>> I suspect this relates to: https://issues.apache.org/
>> jira/browse/FLINK-5030
>>
>> For which there was a PR at some point but nothing has been done so far.
>> It seems the current code explicitly uses the IP vs Hostname for Netty SSL
>> configuration.
>>
>> Without that I'm really wondering how people are reasonably using SSL on
>> a Kubernetes Flink-based cluster as every time a pod is (re-started) it can
>> theoretically take a different IP? Or do I miss something?
>>
>> --
>> Christophe
>>
>> On Tue, Mar 27, 2018 at 3:24 PM, Edward Alexander Rojas Clavijo <
>> edward.roja...@gmail.com> wrote:
>>
>>> Hi all,
>>>
>>> Currently I have a Flink 1.4 cluster running on kubernetes and with SSL
>>> configuration based on https://ci.apache.org/proje
>>> cts/flink/flink-docs-master/ops/security-ssl.html.
>>>
>>> However, as the IP of the nodes are dynamic (from the nature of
>>> kubernetes), we are using only the DNS which we can control using
>>> kubernetes services. So we add to the Subject Alternative Name(SAN) the
>>> flink-jobmanager DNS and also the DNS for the task managers
>>> *.flink-taskmanager-svc (each task manager has a DNS in the form
>>> flink-taskmanager-0.flink-taskmanager-svc).
>>>
>>> Additionally we set the jobmanager.rpc.address property on all the nodes
>>> and each task manager sets the taskmanager.host property, all matching the
>>> ones on the certificate.
>>>
>>> This is working well when using Job with Parallelism set to 1. The SSL
>>> validations are good and the Jobmanager can communicate with Task manager
>>> and vice versa.
>>>
>>> But when we set the parallelism to more than 1 we have exceptions on the
>>> SSL validation like this:
>>>
>>> Caused by: java.security.cert.CertificateException: No subject
>>> alternative names matching IP address 172.30.247.163 found
>>> at sun.security.util.HostnameChecker.matchIP(HostnameChecker.java:168)
>>> at sun.security.util.HostnameChecker.match(HostnameChecker.java:94)
>>> at sun.security.ssl.X509TrustManagerImpl.checkIdentity(X509Trus
>>> tManagerImpl.java:455)
>>> at sun.security.ssl.X509TrustManagerImpl.checkIdentity(X509Trus
>>> tManagerImpl.java:436)
>>> at sun.security.ssl.X509TrustManagerImpl.checkTrusted(X509Trust
>>> ManagerImpl.java:252)
>>> at sun.security.ssl.X509TrustManagerImpl.checkServerTrusted(X50
>>> 9TrustManagerImpl.java:136)
>>> at sun.security.ssl.ClientHandshaker.serverCertificate(ClientHa
>>> ndshaker.java:1601)
>>> ... 21 more
>>>
>>>
>>> From the logs I see the Jobmanager is correctly registering the
>>> taskmanagers:
>>>
>>> org.apache.flink.runtime.instance.InstanceManager   - Registered
>>> TaskManager at flink-taskmanager-1 (akka.ssl.tcp://flink@taiga-fl
>>> ink-taskmanager-1.flink-taskmanager-svc.default.svc.cluster.local:6122/user/taskmanager)
>>> as 1a3f59693cec8b3929ed8898edcc2700. Current number of registered hosts
>>> is 3. Current number of alive task slots is 6.
>>>
>>> And also each taskmanager is correctly registered to use the hostname
>>> for communication:
>>>
>>> org.apache.flink.runtime.taskmanager.TaskManager   - TaskManager will
>>> use hostname/address 'flink-taskmanager-1.flink-tas
>>> kmanager-svc.default.svc.cluster.local' (172.30.247.163) for
>>> communication.
>>> ...
>>> akka.remote.Remoting   - Remoting started; listening on addresses
>>> :[akka.ssl.tcp://flink@flink-taskmanager-1.flink-taskmanager
>>> -svc.default.svc.cluster.local:6122]
>>> ...
>>> org.apache.flink.runtime.io.network.netty.NettyConfig   - NettyConfig
>>> [server address: flink-taskmanager-1.flink-task
>>> manager-svc.default.svc.cluster.local

SSL config on Kubernetes - Dynamic IP

2018-03-27 Thread Edward Alexander Rojas Clavijo
Hi all,

Currently I have a Flink 1.4 cluster running on Kubernetes, with SSL
configuration based on
https://ci.apache.org/projects/flink/flink-docs-master/ops/security-ssl.html.

However, as the IPs of the nodes are dynamic (by the nature of
Kubernetes), we are using only the DNS names, which we can control using
Kubernetes services. So we add to the Subject Alternative Name (SAN) the
flink-jobmanager DNS name and also the DNS names for the task managers,
*.flink-taskmanager-svc (each task manager has a DNS name of the form
flink-taskmanager-0.flink-taskmanager-svc).

Additionally, we set the jobmanager.rpc.address property on all the nodes,
and each task manager sets the taskmanager.host property, all matching the
names on the certificate (a configuration sketch is below).
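
For reference, a sketch of the relevant configuration (Flink 1.4 option names
as I understand them from the SSL documentation; the keystore/truststore paths
and passwords are placeholders):

# flink-conf.yaml (all nodes)
security.ssl.enabled: true
security.ssl.keystore: /etc/flink/ssl/keystore.jks
security.ssl.keystore-password: ****
security.ssl.key-password: ****
security.ssl.truststore: /etc/flink/ssl/truststore.jks
security.ssl.truststore-password: ****
security.ssl.verify-hostname: true
jobmanager.rpc.address: flink-jobmanager

# per task manager (e.g. derived from the pod's DNS name)
taskmanager.host: flink-taskmanager-0.flink-taskmanager-svc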

This works well when using a job with parallelism set to 1. The SSL
validations pass and the JobManager can communicate with the TaskManagers
and vice versa.

But when we set the parallelism to more than 1, we get exceptions on the
SSL validation like this:

Caused by: java.security.cert.CertificateException: No subject alternative
names matching IP address 172.30.247.163 found
at sun.security.util.HostnameChecker.matchIP(HostnameChecker.java:168)
at sun.security.util.HostnameChecker.match(HostnameChecker.java:94)
at sun.security.ssl.X509TrustManagerImpl.checkIdentity(X509TrustManagerImpl.java:455)
at sun.security.ssl.X509TrustManagerImpl.checkIdentity(X509TrustManagerImpl.java:436)
at sun.security.ssl.X509TrustManagerImpl.checkTrusted(X509TrustManagerImpl.java:252)
at sun.security.ssl.X509TrustManagerImpl.checkServerTrusted(X509TrustManagerImpl.java:136)
at sun.security.ssl.ClientHandshaker.serverCertificate(ClientHandshaker.java:1601)
... 21 more


From the logs I see the JobManager is correctly registering the
taskmanagers:

org.apache.flink.runtime.instance.InstanceManager   - Registered TaskManager at flink-taskmanager-1
(akka.ssl.tcp://flink@taiga-flink-taskmanager-1.flink-taskmanager-svc.default.svc.cluster.local:6122/user/taskmanager)
as 1a3f59693cec8b3929ed8898edcc2700. Current number of registered hosts is 3.
Current number of alive task slots is 6.

And also each taskmanager is correctly registered to use the hostname for
communication:

org.apache.flink.runtime.taskmanager.TaskManager   - TaskManager will use hostname/address
'flink-taskmanager-1.flink-taskmanager-svc.default.svc.cluster.local' (172.30.247.163) for communication.
...
akka.remote.Remoting   - Remoting started; listening on addresses
:[akka.ssl.tcp://flink@flink-taskmanager-1.flink-taskmanager-svc.default.svc.cluster.local:6122]
...
org.apache.flink.runtime.io.network.netty.NettyConfig   - NettyConfig
[server address: flink-taskmanager-1.flink-taskmanager-svc.default.svc.cluster.local/172.30.247.163,
server port: 6121, ssl enabled: true, memory segment size (bytes): 32768, transport type: NIO,
number of server threads: 2 (manual), number of client threads: 2 (manual),
server connect backlog: 0 (use Netty's default), client connect timeout (sec): 120,
send/receive buffer size (bytes): 0 (use Netty's default)]
...
org.apache.flink.runtime.taskmanager.TaskManager   - TaskManager data connection information:
bf4a9b50e57c99c17049adb66d65f685 @ flink-taskmanager-1.flink-taskmanager-svc.default.svc.cluster.local (dataPort=6121)



But even with that, it seems like the taskmanagers are using the IP to
communicate with each other, and the SSL validation fails.

Do you know if it's possible to make the taskmanagers use the hostname
to communicate instead of the IP?
Or do you have any advice for getting the SSL configuration to work in this
environment?

Thanks in advance.

Regards,
Edward