Re: EnvironmentInformation class logs secrets passed as JVM/CLI arguments

2021-06-17 Thread Jose Vargas
Hi Arvid,

I see what you mean; no solution in Flink can account for all the
different ways applications may pass in parameters, or for the external
processes and events that introspect wherever the Flink process happens
to run. I do think there is an opportunity to prevent logging secrets by
focusing on a couple of areas. The reason I think we should improve where
we can is that logs can end up in systems that a greater number of people
have access to. For example, in a given environment, perhaps only
automated systems have the ability to deploy to and introspect the
servers, but engineers across teams may have access to all logs from that
environment.

The areas where I think we can prevent logging secrets are:
1) obfuscating JVM parameters, and
2) applying the logic in ParameterTool's "fromArgs" method to parse out
arguments in the EnvironmentInformation class.

For example, one of the documented ways of passing in AWS credentials is
via JVM parameters:
https://docs.aws.amazon.com/sdk-for-java/v1/developer-guide/credentials.html
By leveraging ParameterTool's logic in the EnvironmentInformation class, we
can bridge the intent of the current code with how Flink's built-in
argument parser works.
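
To make the idea concrete, here is a minimal sketch of the kind of masking
I have in mind (illustrative only; the class name and keyword list are made
up for this example and are not Flink's actual code):

import java.util.Arrays;
import java.util.List;

public final class SensitiveArgMasker {
    // Hypothetical keyword list; a real implementation could reuse
    // Flink's existing hidden-key handling for configuration values.
    private static final List<String> KEYWORDS = Arrays.asList("password", "secret");

    private static boolean isSensitive(String key) {
        String k = key.toLowerCase();
        return KEYWORDS.stream().anyMatch(k::contains);
    }

    // Returns a copy of the arguments that is safe to log.
    public static String[] maskForLogging(String[] args) {
        String[] out = args.clone();
        for (int i = 0; i < out.length; i++) {
            int eq = out[i].indexOf('=');
            if (eq > 0 && isSensitive(out[i].substring(0, eq))) {
                // JVM style: "-Daws.secretKey=..." becomes "-Daws.secretKey=******"
                out[i] = out[i].substring(0, eq + 1) + "******";
            } else if (out[i].startsWith("-") && isSensitive(out[i])
                    && i + 1 < out.length && !out[i + 1].startsWith("-")) {
                // CLI style: "--my-password VALUE" becomes "--my-password ******"
                out[i + 1] = "******";
            }
        }
        return out;
    }
}

EnvironmentInformation could then log the masked copy instead of the raw
JVM/CLI arguments.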

On Thu, Jun 17, 2021 at 2:31 PM Arvid Heise  wrote:

> Hi Jose,
>
> Masking secrets is a recurring topic, and ultimately you won't find a
> good solution. Your secret might, for example, appear in a crash dump or in
> some process-monitoring application. To mask reliably, you'd either need
> specific application knowledge (every user supplies arguments differently)
> or have to disable logging of parameters completely.
>
> Frankly speaking, I have never seen passwords passed over the CLI be
> truly secure. The industry practice is either to use a sidecar approach or
> to fetch secrets from files (e.g., Docker mounts). Even using ENV variables
> is discouraged.
>
> On Wed, Jun 16, 2021 at 11:28 PM Jose Vargas 
> wrote:
>
>> Hi,
>>
>> I am using Flink 1.13.1 and I noticed that the logs coming from the
>> EnvironmentInformation class,
>> https://github.com/apache/flink/blob/release-1.13.1/flink-runtime/src/main/java/org/apache/flink/runtime/util/EnvironmentInformation.java#L444-L467,
>> log the value of secrets that are passed in as JVM and CLI arguments. For
>> the JVM arguments, both the secret key and value are logged. For the CLI
>> arguments, the secret key is obfuscated, but the actual value of the secret
>> is not. This also affects Flink 1.12.
>>
>> For example, with CLI arguments like "--my-password VALUE_TO_HIDE", the
>> jobmanager will log the following (assuming the cluster is in application mode):
>>
>> jobmanager | ** (sensitive information)
>> jobmanager | VALUE_TO_HIDE
>>
>> The key is obfuscated but the actual value isn't. This means that secret
>> values can end up in central logging systems. Passing in the CLI argument
>> as "--my-password=VALUE_TO_HIDE" hides the entire string but makes the
>> value unusable and differs from how the docs mention job arguments
>> should be passed [1].
>>
>> I saw that there was a ticket to obfuscate secrets [2], but that seems to
>> apply only to the UI, not to the configuration logs. Turning off, or
>> otherwise disabling, logs from the appropriate logger is one solution, but
>> it seems to me that the logger a user would need to turn off depends on
>> how the Flink cluster is running (standalone, k8s, yarn, mesos, etc.).
>> Furthermore, it can be useful to see these configuration logs.
>>
>>
>> [1]
>> https://ci.apache.org/projects/flink/flink-docs-release-1.13/docs/dev/datastream/application_parameters/#from-the-command-line-arguments
>> [2] https://issues.apache.org/jira/browse/FLINK-14047
>>
>> Thanks,
>> --
>>
>> Jose Vargas
>>
>> Software Engineer, Data Engineering
>>
>> E: jose.var...@fiscalnote.com
>>
>> fiscalnote.com | info.cq.com | rollcall.com
>>
>>

-- 

Jose Vargas

Software Engineer, Data Engineering

E: jose.var...@fiscalnote.com

fiscalnote.com | info.cq.com | rollcall.com


EnvironmentInformation class logs secrets passed as JVM/CLI arguments

2021-06-16 Thread Jose Vargas
Hi,

I am using Flink 1.13.1 and I noticed that the logs coming from the
EnvironmentInformation class,
https://github.com/apache/flink/blob/release-1.13.1/flink-runtime/src/main/java/org/apache/flink/runtime/util/EnvironmentInformation.java#L444-L467,
log the value of secrets that are passed in as JVM and CLI arguments. For
the JVM arguments, both the secret key and value are logged. For the CLI
arguments, the secret key is obfuscated, but the actual value of the secret
is not. This also affects Flink 1.12.

For example, with CLI arguments like "--my-password VALUE_TO_HIDE", the
jobmanager will log the following (assuming the cluster is in application mode):

jobmanager | ** (sensitive information)
jobmanager | VALUE_TO_HIDE

The key is obfuscated but the actual value isn't. This means that secret
values can end up in central logging systems. Passing in the CLI argument
as "--my-password=VALUE_TO_HIDE" hides the entire string but makes the
value unusable and differs from how the docs mention job arguments
should be passed [1].
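
For reference, the documented pattern from [1] for reading such an argument
looks like this (a minimal sketch; the class name is made up, and note that
fromArgs expects the space-separated "--key value" form, which is why the
"=" form above leaves the value unusable):

import org.apache.flink.api.java.utils.ParameterTool;

public class MyJob {
    public static void main(String[] args) {
        // "--my-password VALUE_TO_HIDE" is parsed into the pair
        // ("my-password", "VALUE_TO_HIDE")
        ParameterTool parameters = ParameterTool.fromArgs(args);
        String password = parameters.get("my-password");
    }
}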

I saw that there was a ticket to obfuscate secrets [2], but that seems to
apply only to the UI, not to the configuration logs. Turning off, or
otherwise disabling, logs from the appropriate logger is one solution, but
it seems to me that the logger a user would need to turn off depends on
how the Flink cluster is running (standalone, k8s, yarn, mesos, etc.).
Furthermore, it can be useful to see these configuration logs.
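
For completeness, here is roughly what that workaround would look like
under Flink's default log4j2 setup, i.e. a couple of lines in
conf/log4j.properties (a sketch; the logger name is simply the class linked
above, and this also silences the parts of the environment logs that are
useful):

logger.envinfo.name = org.apache.flink.runtime.util.EnvironmentInformation
logger.envinfo.level = OFF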


[1]
https://ci.apache.org/projects/flink/flink-docs-release-1.13/docs/dev/datastream/application_parameters/#from-the-command-line-arguments
[2] https://issues.apache.org/jira/browse/FLINK-14047

Thanks,
-- 

Jose Vargas

Software Engineer, Data Engineering

E: jose.var...@fiscalnote.com

fiscalnote.com | info.cq.com | rollcall.com


Re: RabbitMQ source does not stop unless message arrives in queue

2021-06-01 Thread Jose Vargas
Hi all,

Apologies for not following up sooner. Thank you Austin for creating
FLINK-22698. It seems that the issue is well understood and a fix is
currently under development/review. Please let me know if there is anything
additional that I can do. I look forward to testing out a new version of
Flink that includes this fix.

Thanks again,
Jose

On Tue, May 18, 2021 at 4:38 PM Austin Cawley-Edwards <
austin.caw...@gmail.com> wrote:

> Hey all,
>
> Thanks for the details, John! Hmm, that doesn't look too good either,
> but it's probably a different issue with the RMQ source/sink. Hopefully, the
> new FLIP-27 sources will help you guys out there! The upcoming HybridSource
> in FLIP-150 [1] might also be interesting to you for finely controlling
> sources.
>
> @Jose Vargas  I've created FLINK-22698 [2] to
> track your issue. Do you have a small reproducible case / GitHub repo? Also,
> would you be able to provide a little bit more detail about the Flink job in
> which you see this issue? i.e., overall parallelism, the parallelism of the
> sources/sinks, and the checkpointing mode.
>
> Best,
> Austin
>
> [1]:
> https://cwiki.apache.org/confluence/display/FLINK/FLIP-150%3A+Introduce+Hybrid+Source
> [2]: https://issues.apache.org/jira/browse/FLINK-22698
>
> On Thu, May 13, 2021 at 9:25 PM John Morrow 
> wrote:
>
>> Hi Jose, hey Austin!!
>>
>> I know we were just recently looking at consuming a fixed number
>> of messages from an RMQ source, processing them, and outputting them to an
>> RMQ sink. As a naive first attempt at stopping the job when the target
>> number of messages had been processed, we put counter state in the process
>> function and tried throwing an exception when the counter >= the target
>> message count.
>>
>> The job had:
>>
>>- parallelism: 1
>>- checkpointing: 1000 (1 sec)
>>- restartStrategy: noRestart
>>- prefetchCount: 100
>>
>> Running it with 150 messages in the input queue and 150 also as the
>> target number, at the end the queues had:
>>
>>- output queue - 150
>>- input queue - 50
>>
>> So it looks like it did transfer all the messages, but some unack'd ones
>> also got requeued back at the source and so ended up as duplicates. I know
>> throwing an exception in the Flink job is not the same as triggering a
>> stateful shutdown, but it might be hitting similar unack issues.
>>
>> John
>>
>> --
>> *From:* Austin Cawley-Edwards 
>> *Sent:* Thursday 13 May 2021 16:49
>> *To:* Jose Vargas ; John Morrow <
>> johnniemor...@hotmail.com>
>> *Cc:* user 
>> *Subject:* Re: RabbitMQ source does not stop unless message arrives in
>> queue
>>
>> Hey Jose,
>>
>> Thanks for bringing this up – it indeed sounds like a bug. There is
>> ongoing work to update the RMQ source to the new interface, which might
>> address some of these issues (or should, if it does not already), tracked in
>> FLINK-20628 [1]. Would you be able to create a JIRA issue for this, or would
>> you like me to?
>>
>> At my previous company, we only consumed one Rabbit queue per
>> application, so we didn't run into this exactly but did see other weird
>> behavior in the RMQ source that could be related. I'm going to cc @John
>> Morrow  who might be able to share what
>> he's seen working with the source, if he's around. I remember some
>> messages not properly being ack'ed during a stateful shutdown via the
>> Ververica Platform's stop-with-savepoint functionality that you mention,
>> though that might be more related to FLINK-20244 [2].
>>
>>
>> Best,
>> Austin
>>
>> [1]: https://issues.apache.org/jira/browse/FLINK-20628
>> [2]: https://issues.apache.org/jira/browse/FLINK-20244
>>
>> On Thu, May 13, 2021 at 10:23 AM Jose Vargas 
>> wrote:
>>
>> Hi,
>>
>> I am using Flink 1.12 to read from and write to a RabbitMQ cluster.
>> Flink's RabbitMQ source has some surprising behavior when a
>> stop-with-savepoint request is made.
>>
>> *Expected Behavior:*
>> The stop-with-savepoint request stops the job with a FINISHED state.
>>
>> *Actual Behavior:*
>> The stop-with-savepoint request either times out or hangs indefinitely
>> unless a message arrives in all the queues that the job consumes from after
>> the stop-with-savepoint request is made.
>>
>>
>> I know that one possible workaround is to send a sentinel value to each
>> of the queues consumed by the job that the deserialization schema checks in
>> its isEndOfStream method.

RabbitMQ source does not stop unless message arrives in queue

2021-05-13 Thread Jose Vargas
Hi,

I am using Flink 1.12 to read from and write to a RabbitMQ cluster. Flink's
RabbitMQ source has some surprising behavior when a stop-with-savepoint
request is made.

*Expected Behavior:*
The stop-with-savepoint request stops the job with a FINISHED state.

*Actual Behavior:*
The stop-with-savepoint request either times out or hangs indefinitely
unless a message arrives in all the queues that the job consumes from after
the stop-with-savepoint request is made.


I know that one possible workaround is to send a sentinel value to each of
the queues consumed by the job that the deserialization schema checks in
its isEndOfStream method. However, this is somewhat cumbersome and
complicates the continuous delivery of a Flink job. For example,
Ververica Platform will trigger a stop-with-savepoint for the user if one
of many possible Flink configurations for a job is changed. The
stop-with-savepoint can then hang indefinitely because only some of the
RabbitMQ sources will have reached a FINISHED state.
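
For reference, the sentinel approach looks roughly like this (a sketch
only; the "__END__" marker is an arbitrary value I made up, and the
producer would have to send it to every queue the job consumes):

import org.apache.flink.api.common.serialization.SimpleStringSchema;

// Ends the RabbitMQ source once the sentinel message is seen;
// isEndOfStream is checked for every deserialized element.
public class SentinelStringSchema extends SimpleStringSchema {
    private static final String SENTINEL = "__END__"; // hypothetical marker

    @Override
    public boolean isEndOfStream(String nextElement) {
        return SENTINEL.equals(nextElement);
    }
}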

I have attached the TaskManager thread dump taken after the stop-with-savepoint
request was made. Almost every thread is either sleeping or waiting around
for locks to be released, and then there are a handful of threads trying to
read data from a socket via the com.rabbitmq.client.impl.Frame.readFrom
method.

Ideally, once a stop-with-savepoint request is made, the threads trying to
read data from RabbitMQ would be interrupted so that all RabbitMQ sources
would reach a FINISHED state.

Regular checkpoints and savepoints complete successfully; it is only with
the stop-with-savepoint request that I see this behavior.


Respectfully,


Jose Vargas

Software Engineer, Data Engineering

E: jose.var...@fiscalnote.com

fiscalnote.com | info.cq.com | rollcall.com


taskmanager_thread_dump.json
Description: application/json