Re: Schema Evolution & Json Schemas

2024-02-25 Thread Andrew Otto
>  the following code generator
Oh, and FWIW we avoid code generation and POJOs, and instead rely on
Flink's Row or RowData abstractions.
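
If it helps to picture what that looks like: a minimal sketch of describing a
record's shape with Row plus named-field type information instead of a
generated POJO (the field names here are made up for illustration):

import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.common.typeinfo.Types;
import org.apache.flink.types.Row;

// The record shape is described at runtime; no code generation involved.
TypeInformation<Row> eventTypeInfo = Types.ROW_NAMED(
        new String[] {"user", "page", "dt"},
        Types.STRING, Types.STRING, Types.STRING);

Row event = Row.withNames();
event.setField("user", "otto");
event.setField("page", "Main_Page");
event.setField("dt", "2024-02-25T00:00:00Z");

// Passing eventTypeInfo explicitly (e.g. .returns(eventTypeInfo) on an
// operator) keeps Flink on its Row serializer instead of the Kryo fallback.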





On Sun, Feb 25, 2024 at 10:35 AM Andrew Otto  wrote:

> Hi!
>
> I'm not sure if this is totally relevant for you, but we use JSONSchema
> and JSON with Flink at the Wikimedia Foundation.
> We explicitly disallow the use of additionalProperties
> <https://wikitech.wikimedia.org/wiki/Event_Platform/Schemas/Guidelines#No_object_additionalProperties>,
> unless it is to define Map type fields
> <https://wikitech.wikimedia.org/wiki/Event_Platform/Schemas/Guidelines#map_types>
> (where additionalProperties itself is a schema).
>
> We have JSONSchema converters and JSON Serdes to be able to use our
> JSONSchemas and JSON records with both the DataStream API (as Row) and
> Table API (as RowData).
>
> See:
> -
> https://gerrit.wikimedia.org/r/plugins/gitiles/wikimedia-event-utilities/+/refs/heads/master/eventutilities-flink/src/main/java/org/wikimedia/eventutilities/flink/formats/json
> -
> https://gerrit.wikimedia.org/r/plugins/gitiles/wikimedia-event-utilities/+/refs/heads/master/eventutilities-flink/#managing-a-object
>
> State schema evolution is supported via the EventRowTypeInfo wrapper
> <https://gerrit.wikimedia.org/r/plugins/gitiles/wikimedia-event-utilities/+/refs/heads/master/eventutilities-flink/src/main/java/org/wikimedia/eventutilities/flink/EventRowTypeInfo.java#42>
> .
>
> Less directly about Flink: I gave a talk at Confluent's Current conf in
> 2022 about why we use JSONSchema
> <https://www.confluent.io/events/current-2022/wikipedias-event-data-platform-or-json-is-okay-too/>.
> See also this blog post series if you are interested
> <https://techblog.wikimedia.org/2020/09/10/wikimedias-event-data-platform-or-json-is-ok-too/>
> !
>
> -Andrew Otto
>  Wikimedia Foundation
>
>
> On Fri, Feb 23, 2024 at 1:58 AM Salva Alcántara 
> wrote:
>
>> I'm facing some issues related to schema evolution in combination with
>> the usage of Json Schemas and I was just wondering whether there are any
>> recommended best practices.
>>
>> In particular, I'm using the following code generator:
>>
>> - https://github.com/joelittlejohn/jsonschema2pojo
>>
>> Main gotchas so far relate to the `additionalProperties` field. When
>> setting that to true, the resulting POJO is not valid according to Flink
>> rules because the generated getter/setter methods don't follow the java
>> beans naming conventions, e.g., see here:
>>
>> - https://github.com/joelittlejohn/jsonschema2pojo/issues/1589
>>
>> This means that the Kryo fallback is used for serialization purposes,
>> which is not only bad for performance but also breaks state schema
>> evolution.
>>
>> So, because of that, setting `additionalProperties` to `false` looks like
>> a good idea but then your job will break if an upstream/producer service
>> adds a property to the messages you are reading. To solve this problem, the
>> POJOs for your job (as a reader) can be generated to ignore the
>> `additionalProperties` field (via the `@JsonIgnore` Jackson annotation).
>> This seems to be a good overall solution to the problem, but looks a bit
>> convoluted to me / didn't come without some trial & error (= pain &
>> frustration).
>>
>> Is there anyone here facing similar issues? It would be good to hear your
>> thoughts on this!
>>
>> BTW, this is a very interesting article that touches on the above-mentioned
>> difficulties:
>> -
>> https://www.creekservice.org/articles/2024/01/09/json-schema-evolution-part-2.html
>>
>>
>>


Re: Schema Evolution & Json Schemas

2024-02-25 Thread Andrew Otto
Hi!

I'm not sure if this is totally relevant for you, but we use JSONSchema and
JSON with Flink at the Wikimedia Foundation.
We explicitly disallow the use of additionalProperties
<https://wikitech.wikimedia.org/wiki/Event_Platform/Schemas/Guidelines#No_object_additionalProperties>,
unless it is to define Map type fields
<https://wikitech.wikimedia.org/wiki/Event_Platform/Schemas/Guidelines#map_types>
(where additionalProperties itself is a schema).

We have JSONSchema converters and JSON Serdes to be able to use our
JSONSchemas and JSON records with both the DataStream API (as Row) and
Table API (as RowData).

See:
-
https://gerrit.wikimedia.org/r/plugins/gitiles/wikimedia-event-utilities/+/refs/heads/master/eventutilities-flink/src/main/java/org/wikimedia/eventutilities/flink/formats/json
-
https://gerrit.wikimedia.org/r/plugins/gitiles/wikimedia-event-utilities/+/refs/heads/master/eventutilities-flink/#managing-a-object

State schema evolution is supported via the EventRowTypeInfo wrapper
<https://gerrit.wikimedia.org/r/plugins/gitiles/wikimedia-event-utilities/+/refs/heads/master/eventutilities-flink/src/main/java/org/wikimedia/eventutilities/flink/EventRowTypeInfo.java#42>
.
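
For a rough idea of what the Table API side can look like with plain Flink
(our converters generate the schema from a JSONSchema; the topic, fields, and
options below are illustrative assumptions, not our real configuration):

import org.apache.flink.table.api.DataTypes;
import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.Schema;
import org.apache.flink.table.api.TableDescriptor;
import org.apache.flink.table.api.TableEnvironment;

TableEnvironment tEnv = TableEnvironment.create(EnvironmentSettings.inStreamingMode());

// Hand-written schema standing in for what a JSONSchema converter would build.
tEnv.createTemporaryTable("events", TableDescriptor.forConnector("kafka")
        .schema(Schema.newBuilder()
                .column("meta", DataTypes.ROW(
                        DataTypes.FIELD("id", DataTypes.STRING()),
                        DataTypes.FIELD("dt", DataTypes.STRING())))
                .column("page_title", DataTypes.STRING())
                .build())
        .format("json")
        .option("topic", "mediawiki.page_change")
        .option("properties.bootstrap.servers", "kafka:9092")
        .option("scan.startup.mode", "latest-offset")
        .build());

tEnv.from("events").execute().print();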

Less directly about Flink: I gave a talk at Confluent's Current conf in
2022 about why we use JSONSchema
<https://www.confluent.io/events/current-2022/wikipedias-event-data-platform-or-json-is-okay-too/>.
See also this blog post series if you are interested
<https://techblog.wikimedia.org/2020/09/10/wikimedias-event-data-platform-or-json-is-ok-too/>
!

-Andrew Otto
 Wikimedia Foundation


On Fri, Feb 23, 2024 at 1:58 AM Salva Alcántara 
wrote:

> I'm facing some issues related to schema evolution in combination with the
> usage of Json Schemas and I was just wondering whether there are any
> recommended best practices.
>
> In particular, I'm using the following code generator:
>
> - https://github.com/joelittlejohn/jsonschema2pojo
>
> Main gotchas so far relate to the `additionalProperties` field. When
> setting that to true, the resulting POJO is not valid according to Flink
> rules because the generated getter/setter methods don't follow the java
> beans naming conventions, e.g., see here:
>
> - https://github.com/joelittlejohn/jsonschema2pojo/issues/1589
>
> This means that the Kryo fallback is used for serialization purposes,
> which is not only bad for performance but also breaks state schema
> evolution.
>
> So, because of that, setting `additionalProperties` to `false` looks like
> a good idea but then your job will break if an upstream/producer service
> adds a property to the messages you are reading. To solve this problem, the
> POJOs for your job (as a reader) can be generated to ignore the
> `additionalProperties` field (via the `@JsonIgnore` Jackson annotation).
> This seems to be a good overall solution to the problem, but looks a bit
> convoluted to me / didn't come without some trial & error (= pain &
> frustration).
>
> Is there anyone here facing similar issues? It would be good to hear your
> thoughts on this!
>
> BTW, this is a very interesting article that touches on the above-mentioned
> difficulties:
> -
> https://www.creekservice.org/articles/2024/01/09/json-schema-evolution-part-2.html
>
>
>


Re: Handling Schema Variability and Applying Regex Patterns in Flink Job Configuration

2023-11-06 Thread Andrew Otto
> unpredictable file schema (Table API) in the source directory

You'll probably have to write some logic that helps predict the schema :)

Are there actual schemas for the CSV files somewhere?  JSONSchema or
something of the like? At Wikimedia we use JSONSchema (not with CSV
data, but it could work), and we have code that can convert from JSONSchema
to Flink schemas, either TypeInformation or Table API DataType.

Here's an example in the code docs for use with Kafka.  You could use this
to read CSV files instead?  Something like:

TableDescriptor.forConnector("filesystem")
.schema(JsonSchemaFlinkConverter.toSchemaBuilder(jsonSchema).build())
...
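
Filled out a little more, a sketch of what registering and querying such a
table could look like; the explicit columns stand in for whatever the
JSONSchema converter would build, and the path and options are assumptions:

import org.apache.flink.table.api.DataTypes;
import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.Schema;
import org.apache.flink.table.api.TableDescriptor;
import org.apache.flink.table.api.TableEnvironment;

TableEnvironment tEnv = TableEnvironment.create(EnvironmentSettings.inStreamingMode());

tEnv.createTemporaryTable("incoming_csv", TableDescriptor.forConnector("filesystem")
        .schema(Schema.newBuilder()
                .column("col1", DataTypes.STRING())
                .column("col2", DataTypes.STRING())
                .build())
        .format("csv")
        .option("path", "file:///home/techuser/inputdata")   // assumed directory
        .option("source.monitor-interval", "1s")             // keep watching for new files
        .build());

tEnv.executeSql("SELECT col1, COUNT(*) FROM incoming_csv GROUP BY col1").print();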

If you are doing pure SQL (not table api), you'll need to have something
that translates from your schema to SQL... or start implementing a custom
Catalog, which uh, we kind of did, but it was not easy.









On Mon, Nov 6, 2023 at 1:30 PM arjun s  wrote:

> Thanks for your response.
> How should we address the issue of dealing with the unpredictable file
> schema(Table API)  in the source directory, as I previously mentioned in my
> email?
>
> Thanks and regards,
> Arjun
>
> On Mon, 6 Nov 2023 at 20:56, Chen Yu  wrote:
>
>> Hi Arjun,
>>
>> If you can filter files by a regex pattern, I think the config
>> `source.path.regex-pattern`[1] may be what you want.
>>
>>   'source.path.regex-pattern' = '...',
>>   -- optional: regex pattern to filter files to read under the directory of
>>   -- the `path` option. This regex pattern should be matched with the
>>   -- absolute file path. If this option is set, the connector will
>>   -- recursively read all files under the directory of the `path` option.
>>
>>
>> Best,
>> Yu Chen
>>
>>
>> [1]
>> https://nightlies.apache.org/flink/flink-docs-release-1.18/docs/connectors/table/filesystem/
>>
>> --
>> *发件人:* arjun s 
>> *发送时间:* 2023年11月6日 20:50
>> *收件人:* user@flink.apache.org 
>> *主题:* Handling Schema Variability and Applying Regex Patterns in Flink
>> Job Configuration
>>
>> Hi team,
>> I'm currently utilizing the Table API function within my Flink job, with
>> the objective of reading records from CSV files located in a source
>> directory. To obtain the file names, I'm creating a table and specifying
>> the schema using the Table API in Flink. Consequently, when the schema
>> matches, my Flink job successfully submits and executes as intended.
>> However, in cases where the schema does not match, the job fails to submit.
>> Given that the schema of the files in the source directory is
>> unpredictable, I'm seeking a method to handle this situation.
>> Create table query
>> =
>> CREATE TABLE sample (
>>   col1 STRING, col2 STRING, col3 STRING, col4 STRING,
>>   `file.path` STRING NOT NULL METADATA
>> ) WITH (
>>   'connector' = 'filesystem',
>>   'path' = 'file:///home/techuser/inputdata',
>>   'format' = 'csv',
>>   'source.monitor-interval' = '1'
>> )
>> =
>>
>> Furthermore, I have a question about whether there's a way to read files
>> from the source directory based on a specific regex pattern. This is
>> relevant in our situation because only file names that match a particular
>> pattern need to be processed by the Flink job.
>>
>> Thanks and Regards,
>> Arjun
>>
>


Re: Enhancing File Processing and Kafka Integration with Flink Jobs

2023-10-28 Thread Andrew Otto
> This is not a robust solution, I would advise against it.
Oh no?  Am curious as to why not.  It seems not dissimilar to how Kafka
topic retention works: the messages are removed after some time period
(hopefully after they are processed), so why would it be bad to remove
files that are already processed?

Or was it the querying of the checkpoints you were advising against?

To be sure, I was referring to moving the previously processed files away,
not the checkpoints themselves.

On Fri, Oct 27, 2023 at 12:45 PM Alexander Fedulov <
alexander.fedu...@gmail.com> wrote:

> > I wonder if you could use this fact to query the committed checkpoints
> and move them away after the job is done.
>
> This is not a robust solution, I would advise against it.
>
> Best,
> Alexander
>
> On Fri, 27 Oct 2023 at 16:41, Andrew Otto  wrote:
>
>> For moving the files:
>> > It will keep the files as is and remember the name of the file read in
>> checkpointed state to ensure it doesn't read the same file twice.
>>
>> I wonder if you could use this fact to query the committed checkpoints
>> and move them away after the job is done.  I think it should even be safe
>> to do this outside of the Flink job periodically (cron, whatever), because
>> on restart it won't reprocess the files that have been committed in the
>> checkpoints.
>>
>>
>> https://nightlies.apache.org/flink/flink-docs-release-1.18/docs/libs/state_processor_api/#reading-state
>>
>>
>>
>>
>> On Fri, Oct 27, 2023 at 1:13 AM arjun s  wrote:
>>
>>> Hi team, Thanks for your quick response.
>>> I have an inquiry regarding file processing in the event of a job
>>> restart. When the job is restarted, we encounter challenges in tracking
>>> which files have been processed and which remain pending. Is there a method
>>> to seamlessly resume processing files from where they were left off,
>>> particularly in situations where we need to submit and restart the job
>>> manually due to any server restart or application restart? This becomes an
>>> issue when the job processes all the files in the directory from the
>>> beginning after a restart, and I'm seeking a solution to address this.
>>>
>>> Thanks and regards,
>>> Arjun
>>>
>>> On Fri, 27 Oct 2023 at 07:29, Chirag Dewan 
>>> wrote:
>>>
>>>> Hi Arjun,
>>>>
>>>> Flink's FileSource doesn't move or delete the files as of now. It will
>>>> keep the files as-is and remember the name of the file read in checkpointed
>>>> state to ensure it doesn't read the same file twice.
>>>>
>>>> Flink's source API works in a way that a single Enumerator operates on
>>>> the JobManager. The enumerator is responsible for listing the files and
>>>> splitting these into smaller units. These units could be the complete file
>>>> (in case of row formats) or splits within a file (for bulk formats). The
>>>> reading is done by SplitReaders in the Task Managers. This way it ensures
>>>> that only reading is done concurrently and is able to track file
>>>> completions.
>>>>
>>>> You can read more about Flink Sources here
>>>> <https://nightlies.apache.org/flink/flink-docs-release-1.17/docs/dev/datastream/sources/>
>>>> and about the FileSystem connector here
>>>> <https://nightlies.apache.org/flink/flink-docs-release-1.17/docs/connectors/datastream/filesystem/>
>>>>
>>>>
>>>>
>>>>
>>>> On Thursday, 26 October, 2023 at 06:53:23 pm IST, arjun s <
>>>> arjunjoice...@gmail.com> wrote:
>>>>
>>>>
>>>> Hello team,
>>>> I'm currently in the process of configuring a Flink job. This job
>>>> entails reading files from a specified directory and then transmitting the
>>>> data to a Kafka sink. I've already successfully designed a Flink job that
>>>> reads the file contents in a streaming manner and effectively sends them to
>>>> Kafka. However, my specific requirement is a bit more intricate. I need the
>>>> job to not only read these files and push the data to Kafka but also
>>>> relocate the processed file to a different directory once all of its
>>>> contents have been processed. Following this, the job should seamlessly
>>>> transition to processing the next file in the source directory.
>>>> Additionally, I have some concerns regarding how the job will behave if it
>>>> encounters a restart. Could you please advise if this is achievable, and if
>>>> so, provide guidance or code to implement it?
>>>>
>>>> I'm also quite interested in how the job will handle situations where
>>>> the source has a parallelism greater than 2 or 3, and how it can accurately
>>>> monitor the completion of reading all contents in each file.
>>>>
>>>> Thanks and Regards,
>>>> Arjun
>>>>
>>>


Re: Enhancing File Processing and Kafka Integration with Flink Jobs

2023-10-27 Thread Andrew Otto
For moving the files:
> It will keep the files as is and remember the name of the file read in
checkpointed state to ensure it doesn't read the same file twice.

I wonder if you could use this fact to query the committed checkpoints and
move them away after the job is done.  I think it should even be safe to do
this outside of the Flink job periodically (cron, whatever), because on
restart it won't reprocess the files that have been committed in the
checkpoints.

https://nightlies.apache.org/flink/flink-docs-release-1.18/docs/libs/state_processor_api/#reading-state
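
For anyone following along, a minimal sketch of the kind of continuously
monitoring FileSource being discussed (the directory and text-line format are
assumptions); the set of already-read files is exactly what gets remembered in
the checkpointed state mentioned above:

import java.time.Duration;
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.connector.file.src.FileSource;
import org.apache.flink.connector.file.src.reader.TextLineInputFormat;
import org.apache.flink.core.fs.Path;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

FileSource<String> source = FileSource
        .forRecordStreamFormat(new TextLineInputFormat(), new Path("file:///data/incoming"))
        .monitorContinuously(Duration.ofSeconds(1))  // keep scanning the directory for new files
        .build();

// File names that have been read are tracked in checkpointed state, so a
// restore from a checkpoint will not re-read them.
env.fromSource(source, WatermarkStrategy.noWatermarks(), "file-source").print();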




On Fri, Oct 27, 2023 at 1:13 AM arjun s  wrote:

> Hi team, Thanks for your quick response.
> I have an inquiry regarding file processing in the event of a job restart.
> When the job is restarted, we encounter challenges in tracking which files
> have been processed and which remain pending. Is there a method to
> seamlessly resume processing files from where they were left off,
> particularly in situations where we need to submit and restart the job
> manually due to any server restart or application restart? This becomes an
> issue when the job processes all the files in the directory from the
> beginning after a restart, and I'm seeking a solution to address this.
>
> Thanks and regards,
> Arjun
>
> On Fri, 27 Oct 2023 at 07:29, Chirag Dewan 
> wrote:
>
>> Hi Arjun,
>>
>> Flink's FileSource doesn't move or delete the files as of now. It will
>> keep the files as-is and remember the name of the file read in checkpointed
>> state to ensure it doesn't read the same file twice.
>>
>> Flink's source API works in a way that a single Enumerator operates on the
>> JobManager. The enumerator is responsible for listing the files and
>> splitting these into smaller units. These units could be the complete file
>> (in case of row formats) or splits within a file (for bulk formats). The
>> reading is done by SplitReaders in the Task Managers. This way it ensures
>> that only reading is done concurrently and is able to track file
>> completions.
>>
>> You can read more about Flink Sources here
>> <https://nightlies.apache.org/flink/flink-docs-release-1.17/docs/dev/datastream/sources/>
>> and about the FileSystem connector here
>> <https://nightlies.apache.org/flink/flink-docs-release-1.17/docs/connectors/datastream/filesystem/>
>>
>> 
>>
>>
>>
>> On Thursday, 26 October, 2023 at 06:53:23 pm IST, arjun s <
>> arjunjoice...@gmail.com> wrote:
>>
>>
>> Hello team,
>> I'm currently in the process of configuring a Flink job. This job entails
>> reading files from a specified directory and then transmitting the data to
>> a Kafka sink. I've already successfully designed a Flink job that reads the
>> file contents in a streaming manner and effectively sends them to Kafka.
>> However, my specific requirement is a bit more intricate. I need the job to
>> not only read these files and push the data to Kafka but also relocate the
>> processed file to a different directory once all of its contents have been
>> processed. Following this, the job should seamlessly transition to
>> processing the next file in the source directory. Additionally, I have some
>> concerns regarding how the job will behave if it encounters a restart.
>> Could you please advise if this is achievable, and if so, provide guidance
>> or code to implement it?
>>
>> I'm also quite interested in how the job will handle situations where the
>> source has a parallelism greater than 2 or 3, and how it can accurately
>> monitor the completion of reading all contents in each file.
>>
>> Thanks and Regards,
>> Arjun
>>
>


Re: Using HybridSource

2023-07-05 Thread Andrew Otto
Hm. I wonder if you could implement a custom Deserializer that wraps both
the CSV and Protobuf deserializer, and conditionally chooses which one to
use. As long as the final TypeInformation returned by the Source is the
same in either case, I think it should work?

> Kafka comes from protobuf while the CSV is a POJO though both have the
same fields
This could be the hard part, I think no matter what you do you'll have to
make the TypeInformation the HybridSource uses in either case be the exact
same.  Maybe you could convert your Protobuf to the POJO, or vice versa?
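
A rough sketch of that idea, with both sources mapped onto one shared type
before they are handed to HybridSource. SharedEvent, the CSV mapping, and the
protobuf class are hypothetical stand-ins, not code from this thread:

import java.io.IOException;
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.api.common.serialization.AbstractDeserializationSchema;
import org.apache.flink.connector.base.source.hybrid.HybridSource;
import org.apache.flink.connector.file.src.FileSource;
import org.apache.flink.connector.kafka.source.KafkaSource;
import org.apache.flink.connector.kafka.source.enumerator.initializer.OffsetsInitializer;
import org.apache.flink.core.fs.Path;
import org.apache.flink.formats.csv.CsvReaderFormat;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

// Hypothetical shared POJO both the CSV rows and the protobuf messages map onto.
public class SharedEvent {
    public String id;
    public long timestamp;
}

StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

FileSource<SharedEvent> fileSource = FileSource
        .forRecordStreamFormat(CsvReaderFormat.forPojo(SharedEvent.class), new Path("s3://bucket/history/"))
        .build();

KafkaSource<SharedEvent> kafkaSource = KafkaSource.<SharedEvent>builder()
        .setBootstrapServers("kafka:9092")
        .setTopics("events")
        .setStartingOffsets(OffsetsInitializer.earliest())
        .setValueOnlyDeserializer(new AbstractDeserializationSchema<SharedEvent>() {
            @Override
            public SharedEvent deserialize(byte[] message) throws IOException {
                // Hypothetical protobuf class; copy its fields onto the shared type.
                MyEventProto proto = MyEventProto.parseFrom(message);
                SharedEvent e = new SharedEvent();
                e.id = proto.getId();
                e.timestamp = proto.getTimestamp();
                return e;
            }
        })
        .build();

// Both sources now produce SharedEvent, so HybridSource sees a single type.
HybridSource<SharedEvent> hybrid = HybridSource.builder(fileSource)
        .addSource(kafkaSource)
        .build();

env.fromSource(hybrid, WatermarkStrategy.noWatermarks(), "csv-then-kafka");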




On Wed, Jul 5, 2023 at 10:19 AM Oscar Perez via user 
wrote:

> and this is our case Alexander, it is the same data schema but different
> data format. Kafka comes from protobuf while the CSV is a POJO though both
> have the same fields. IMHO, the design of HybridSource is very limited and
> you have to do nasty workarounds if you want to combine from cold storage
> (CSV, S3) and kafka if the expectations differ a bit from the most common
> use case (e.g. using protobuf)
>
> Regards,
> Oscar
>
> On Wed, 5 Jul 2023 at 12:53, Alexander Fedulov <
> alexander.fedu...@gmail.com> wrote:
>
>> I do not think that trying to "squash" two different data types into one
>> just to use HybridSource is the right thing to do here. HybridSource is
>> primarily intended for use cases that need to read the same data from
>> different sources. A typical example: read events from "cold storage" in S3
>> up to a specific point and switch over to "live" data in Kafka.
>> Since you are already using the low-level API, you can either
>> manually pull the data in inside of the open() function, or stream it into
>> the local state using KeyedCoProcessFunction or
>> KeyedBroadcastProcessFunction (depending on the size of the lookup state).
>>
>> This video should get you covered:
>> https://www.youtube.com/watch?v=cJS18iKLUIY
>>
>> Best,
>> Alex
>>
>>
>> On Wed, 5 Jul 2023 at 07:29, Péter Váry 
>> wrote:
>>
>>> Was it a conscious decision that HybridSource only accept Sources, and
>>> does not allow mapping functions applied to them before combining them?
>>>
>>> On Tue, Jul 4, 2023, 23:53 Ken Krugler 
>>> wrote:
>>>
 Hi Oscar,

 Couldn’t you have both the Kafka and File sources return an Either<POJO
 from CSV File, Protobuf from Kafka>, and then (after the HybridSource) use
 a MapFunction to convert to the unified/correct type?

 — Ken


 On Jul 4, 2023, at 12:13 PM, Oscar Perez via user <
 user@flink.apache.org> wrote:

 Hei,
 1) We populate state based on this CSV data and do business logic based
 on this state and events coming from other unrelated streams.
 2) We are using low level process function in order to process this
 future hybrid source

 Regardless of the aforementioned points, please note that the main
 challenge is to combine, in a HybridSource, a CSV file and a Kafka topic that
 return different datatypes, so I don't know how my answers relate to the
 original problem tbh. Regards,
 Oscar

 On Tue, 4 Jul 2023 at 20:53, Alexander Fedulov <
 alexander.fedu...@gmail.com> wrote:

> @Oscar
> 1. How do you plan to use that CSV data? Is it needed for lookup from
> the "main" stream?
> 2. Which API are you using? DataStream/SQL/Table or low level
> ProcessFunction?
>
> Best,
> Alex
>
>
> On Tue, 4 Jul 2023 at 11:14, Oscar Perez via user <
> user@flink.apache.org> wrote:
>
>> ok, but is it? As I said, both sources have different data types. In
>> the example here:
>>
>>
>> https://nightlies.apache.org/flink/flink-docs-master/docs/connectors/datastream/hybridsource/
>>
>> We are using both sources as returning string but in our case, one
>> source would return a protobuf event while the other would return a pojo.
>> How can we make the 2 sources share the same datatype so that we can
>> successfully use hybrid source?
>>
>> Regards,
>> Oscar
>>
>> On Tue, 4 Jul 2023 at 12:04, Alexey Novakov 
>> wrote:
>>
>>> Hi Oscar,
>>>
>>> You could use connected streams and put your file into a special
>>> Kafka topic before starting such a job:
>>> https://nightlies.apache.org/flink/flink-docs-release-1.17/docs/dev/datastream/operators/overview/#connect
>>> But this may require more work and the event ordering (which is
>>> shuffled) in the connected streams is probably not what you are looking 
>>> for.
>>>
>>> I think HybridSource is the right solution.
>>>
>>> Best regards,
>>> Alexey
>>>
>>> On Mon, Jul 3, 2023 at 3:44 PM Oscar Perez via user <
>>> user@flink.apache.org> wrote:
>>>
 Hei, We want to bootstrap some data from a CSV file before reading
 from a kafka topic that has a retention period of 7 days.

 We believe the best tool for that would be the 

Re: Flink Stream processing with Dynamic Conditional Sinks

2023-06-05 Thread Andrew Otto
I've considered a similar question before:  Dynamic addition of Sinks /
based on some external configuration.

The answer I've mostly been given is: this is a bad idea.  The checkpoint
state that flink uses for job recovery is dependent on the topology of the
job, and dynamically adding more sinks changes this topology.

You might be able to get away with dynamically submitting multiple jobs in
a single Flink application, but you'd have to restart the application every
time you add a new job.

I've not tried this though, so hopefully someone smarter can come in and
advise as well :)
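
One static-topology alternative, if the per-application sinks happen to be
Kafka topics (which the original question leaves open): a single KafkaSink
whose target topic is chosen per record, roughly as below. The AppEvent type
and the topic naming scheme are assumptions for illustration:

import java.nio.charset.StandardCharsets;
import org.apache.flink.connector.base.DeliveryGuarantee;
import org.apache.flink.connector.kafka.sink.KafkaRecordSerializationSchema;
import org.apache.flink.connector.kafka.sink.KafkaSink;

// Hypothetical record type carrying the producing application's name.
public class AppEvent {
    public String application;
    public String payload;
}

KafkaSink<AppEvent> sink = KafkaSink.<AppEvent>builder()
        .setBootstrapServers("kafka:9092")
        .setRecordSerializer(KafkaRecordSerializationSchema.<AppEvent>builder()
                // Route each record to a per-application topic; a new application
                // C, D, E... only needs a new topic, not a new sink operator.
                .setTopicSelector(e -> "sink-" + e.application)
                .setValueSerializationSchema(e -> e.payload.getBytes(StandardCharsets.UTF_8))
                .build())
        .setDeliveryGuarantee(DeliveryGuarantee.AT_LEAST_ONCE)
        .build();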

-Andrew Otto
 Wikimedia Foundation


On Mon, Jun 5, 2023 at 8:27 AM Yogesh Rao  wrote:

> Hello,
>
> I am trying out flink for one stream processing scenario and was wondering
> if it can be achieved using Apache Flink. So any pointers regarding how it
> can be achieved will be of great help.
>
> Scenario :-
>
> A kafka topic has the input for stream processing, multiple applications
> lets say A & B would be publishing their message to the same topic (Topic
> X) with different keys (keys being application names). These messages are
> read by stream processing applications and processed eventually landing in
> sinks specific for A & B. The end result is to have this entire piece
> dynamic so that new applications C,D,E etc.. can be automatically
> accommodated.
>
> ATM I am able to figure out the Kafka source and stream processing part.
> What I am not clear about is whether, in the case of streaming, conditional
> multiple sinks would work, i.e. for application A the data lands in Sink A,
> application B -> Sink B, and so on.
>
> From an implementation standpoint, I could probably split the stream and pass
> those streams to respective tables. However, all this needs to happen dynamically.
>
> Would Apache Flink be able to support this ? if yes how?
>
> I am using Apache Flink 1.17.1 with the pipeline written in Java
>
> Thank you in advance,
>
> Regards,
> -Yogesh
>


Re: Flink Kubernetes Operator lifecycle state count metrics question

2023-05-22 Thread Andrew Otto
Also!  I do have 2 FlinkDeployments deployed with this operator, but they
are in different namespaces, and each of the per namespace metrics reports
that it has 2 Deployments in them, even though there is only one according
to kubectl.

Actually...we just tried to deploy a change (enabling some checkpointing)
that caused one of our FlinkDeployments to fail.  Now, both namespace
STABLE_Counts each report 1.

# curl -s : | grep
flink_k8soperator_namespace_Lifecycle_State_STABLE_Count
flink_k8soperator_namespace_Lifecycle_State_STABLE_Count{resourcetype="FlinkDeployment",resourcens="stream_enrichment_poc",name="flink_kubernetes_operator",host="flink_kubernetes_operator_86b888d6b6_gbrt4",namespace="flink_operator",}
1.0
flink_k8soperator_namespace_Lifecycle_State_STABLE_Count{resourcetype="FlinkDeployment",resourcens="rdf_streaming_updater",name="flink_kubernetes_operator",host="flink_kubernetes_operator_86b888d6b6_gbrt4",namespace="flink_operator",}
1.0

It looks like maybe this metric is not reporting per namespace, but a
global count.



On Mon, May 22, 2023 at 2:56 PM Andrew Otto  wrote:

> Oh, FWIW, I do have operator HA enabled with 2 replicas running, but in my
> examples there, I am curl-ing the leader flink operator pod.
>
>
>
> On Mon, May 22, 2023 at 2:47 PM Andrew Otto  wrote:
>
>> Hello!
>>
>> I'm doing some grafana+prometheus dashboarding for
>> flink-kubernetes-operator.  Reading metrics docs
>> <https://stackoverflow.com/a/61795256>, I see that I have nice per k8s
>> namespace lifecycle current count gauge metrics in Prometheus.
>>
>> Using kubectl, I can see that I have one FlinkDeployment in my namespace:
>>
>> # kubectl -n stream-enrichment-poc get flinkdeployments
>> NAME JOB STATUS   LIFECYCLE STATE
>> flink-app-main   RUNNING  STABLE
>>
>> But, prometheus is reporting that I have 2 FlinkDeployments in the STABLE
>> state.
>>
>> # curl -s :  | grep
>> flink_k8soperator_namespace_Lifecycle_State_STABLE_Count
>> flink_k8soperator_namespace_Lifecycle_State_STABLE_Count{resourcetype="FlinkDeployment",resourcens="stream_enrichment_poc",name="flink_kubernetes_operator",host="flink_kubernetes_operator_86b888d6b6_gbrt4",namespace="flink_operator",}
>> 2.0
>>
>> I'm not sure why I see 2.0 reported.
>> flink_k8soperator_namespace_JmDeploymentStatus_READY_Count reports only
>> one FlinkDeployment.
>>
>> # curl :/metrics | grep
>> flink_k8soperator_namespace_JmDeploymentStatus_READY_Count
>> flink_k8soperator_namespace_JmDeploymentStatus_READY_Count{resourcetype="FlinkDeployment",resourcens="stream_enrichment_poc",name="flink_kubernetes_operator",host="flink_kubernetes_operator_86b888d6b6_gbrt4",namespace="flink_operator",}
>> 1.0
>>
>> Is it possible that
>> flink_k8soperator_namespace_Lifecycle_State_STABLE_Count is being
>> reported as an incrementing counter instead of a gauge?
>>
>> Thanks
>> -Andrew Otto
>>  Wikimedia Foundation
>>
>>


Re: Flink Kubernetes Operator lifecycle state count metrics question

2023-05-22 Thread Andrew Otto
Oh, FWIW, I do have operator HA enabled with 2 replicas running, but in my
examples there, I am curl-ing the leader flink operator pod.



On Mon, May 22, 2023 at 2:47 PM Andrew Otto  wrote:

> Hello!
>
> I'm doing some grafana+prometheus dashboarding for
> flink-kubernetes-operator.  Reading metrics docs
> <https://stackoverflow.com/a/61795256>, I see that I have nice per k8s
> namespace lifecycle current count gauge metrics in Prometheus.
>
> Using kubectl, I can see that I have one FlinkDeployment in my namespace:
>
> # kubectl -n stream-enrichment-poc get flinkdeployments
> NAME JOB STATUS   LIFECYCLE STATE
> flink-app-main   RUNNING  STABLE
>
> But, prometheus is reporting that I have 2 FlinkDeployments in the STABLE
> state.
>
> # curl -s :  | grep
> flink_k8soperator_namespace_Lifecycle_State_STABLE_Count
> flink_k8soperator_namespace_Lifecycle_State_STABLE_Count{resourcetype="FlinkDeployment",resourcens="stream_enrichment_poc",name="flink_kubernetes_operator",host="flink_kubernetes_operator_86b888d6b6_gbrt4",namespace="flink_operator",}
> 2.0
>
> I'm not sure why I see 2.0 reported.
> flink_k8soperator_namespace_JmDeploymentStatus_READY_Count reports only
> one FlinkDeployment.
>
> # curl :/metrics | grep
> flink_k8soperator_namespace_JmDeploymentStatus_READY_Count
> flink_k8soperator_namespace_JmDeploymentStatus_READY_Count{resourcetype="FlinkDeployment",resourcens="stream_enrichment_poc",name="flink_kubernetes_operator",host="flink_kubernetes_operator_86b888d6b6_gbrt4",namespace="flink_operator",}
> 1.0
>
> Is it possible that
> flink_k8soperator_namespace_Lifecycle_State_STABLE_Count is being
> reported as an incrementing counter instead of a gauge?
>
> Thanks
> -Andrew Otto
>  Wikimedia Foundation
>
>


Flink Kubernetes Operator lifecycle state count metrics question

2023-05-22 Thread Andrew Otto
Hello!

I'm doing some grafana+prometheus dashboarding for
flink-kubernetes-operator.  Reading metrics docs
<https://stackoverflow.com/a/61795256>, I see that I have nice per k8s
namespace lifecycle current count gauge metrics in Prometheus.

Using kubectl, I can see that I have one FlinkDeployment in my namespace:

# kubectl -n stream-enrichment-poc get flinkdeployments
NAME JOB STATUS   LIFECYCLE STATE
flink-app-main   RUNNING  STABLE

But, prometheus is reporting that I have 2 FlinkDeployments in the STABLE
state.

# curl -s :  | grep
flink_k8soperator_namespace_Lifecycle_State_STABLE_Count
flink_k8soperator_namespace_Lifecycle_State_STABLE_Count{resourcetype="FlinkDeployment",resourcens="stream_enrichment_poc",name="flink_kubernetes_operator",host="flink_kubernetes_operator_86b888d6b6_gbrt4",namespace="flink_operator",}
2.0

I'm not sure why I see 2.0 reported.
flink_k8soperator_namespace_JmDeploymentStatus_READY_Count reports only one
FlinkDeployment.

# curl :/metrics | grep
flink_k8soperator_namespace_JmDeploymentStatus_READY_Count
flink_k8soperator_namespace_JmDeploymentStatus_READY_Count{resourcetype="FlinkDeployment",resourcens="stream_enrichment_poc",name="flink_kubernetes_operator",host="flink_kubernetes_operator_86b888d6b6_gbrt4",namespace="flink_operator",}
1.0

Is it possible that flink_k8soperator_namespace_Lifecycle_State_STABLE_Count
is being reported as an incrementing counter instead of a gauge?

Thanks
-Andrew Otto
 Wikimedia Foundation


Re: flink-kubernetes-operator HA k8s RoleBinding for Leases?

2023-05-09 Thread Andrew Otto
> Could you please open a jira

Done: https://issues.apache.org/jira/browse/FLINK-32041

> PR (in case you fixed this already)
Haven't fixed it yet! But if I find time to do it I will!

Thanks!


On Tue, May 9, 2023 at 4:49 AM Tamir Sagi 
wrote:

> Hey,
>
> I also encountered something similar with different error.  I enabled HA
> with RBAC.
>
> org.apache.flink.kubernetes.shaded.io.fabric8.kubernetes.client.KubernetesClientException","message":"Failure
> executing: GET at: https://172.20.0.1/api/v1/nodes. Message:
> Forbidden!Configured service account doesn't have access. Service account
> may have been revoked. nodes is forbidden: User
> "system:serviceaccount:dev-0-flink-clusters:
> *dev-0-xsight-flink-operator-sa*" cannot list resource "nodes" in API
> group "" at the cluster scope."
>
> I checked the rolebinding between the service account 
> `dev-0-flink-clusters:dev-0-xsight-flink-operator-sa`
> and the corresponded role(*flink-operator*) which has been created by the
> operator using *rbac.nodesRule.create=true.*
>
> role binding
>
>
> role: flink-operator
>
>
> am I missing something?*​*
>
>
> --
> *From:* Gyula Fóra 
> *Sent:* Tuesday, May 9, 2023 7:43 AM
> *To:* Andrew Otto 
> *Cc:* User 
> *Subject:* Re: flink-kubernetes-operator HA k8s RoleBinding for Leases?
>
>
>
>
> Hey!
>
> Sounds like a bug :) Could you please open a jira / PR (in case you fixed
> this already)?
>
> Thanks
> Gyula
>
> On Mon, 8 May 2023 at 22:20, Andrew Otto  wrote:
>
> Hi,
>
> I'm trying to enable HA for flink-kubernetes-operator
> <https://nightlies.apache.org/flink/flink-kubernetes-operator-docs-main/docs/operations/configuration/#leader-election-and-high-availability>
> with Helm.  We are using namespaced RBAC via watchedNamespaces.
>
> I've followed instructions and set
> kubernetes.operator.leader-election.enabled and
> kubernetes.operator.leader-election.lease-name, and increased replicas to
> 2.  When I deploy, the second replica comes online, but errors with:
>
> Exception occurred while acquiring lock 'LeaseLock: flink-operator -
> flink-operator-lease (flink-kubernetes-operator-86b888d6b6-8cxjs
> Failure executing: GET at:
> https://x.x.x.x/apis/coordination.k8s.io/v1/namespaces/flink-operator/leases/flink-operator-lease.
> Message: Forbidden!Configured service account doesn't have access. Service
> account may have been revoked. leases.coordination.k8s.io
> "flink-operator-lease" is forbidden: User
> "system:serviceaccount:flink-operator:flink-operator" cannot get resource
> "leases" in API group "coordination.k8s.io" in the namespace
> "flink-operator".
>
> Looking at the rbac.yaml helm template
> <https://github.com/apache/flink-kubernetes-operator/blob/main/helm/flink-kubernetes-operator/templates/rbac.yaml>,
> it looks like the Role and RoleBindings that grant access to the leases
> resource are created for the configured watchNamespaces, but not for the
> namespace in which the flink-kubernetes-operator is deployed.  I think that
> for HA, the flink-kubernetes-operator is going to be asking k8s for Leases
> in its own namespace, right?
>
> Is this a bug, or am I doing something wrong?  I'd file a JIRA, but I
> betcha I'm just doing something wrong (unless I'm the first person who's
> tried to use HA + namespaced RBAC with the helm charts?).
>
> Thanks!
> -Andrew Otto
>  Wikimedia Foundation
>
>
>
>
>
>
>


flink-kubernetes-operator HA k8s RoleBinding for Leases?

2023-05-08 Thread Andrew Otto
Hi,

I'm trying to enable HA for flink-kubernetes-operator
<https://nightlies.apache.org/flink/flink-kubernetes-operator-docs-main/docs/operations/configuration/#leader-election-and-high-availability>
with Helm.  We are using namespaced RBAC via watchedNamespaces.

I've followed instructions and set
kubernetes.operator.leader-election.enabled and
kubernetes.operator.leader-election.lease-name, and increased replicas to
2.  When I deploy, the second replica comes online, but errors with:

Exception occurred while acquiring lock 'LeaseLock: flink-operator -
flink-operator-lease (flink-kubernetes-operator-86b888d6b6-8cxjs
Failure executing: GET at:
https://x.x.x.x/apis/coordination.k8s.io/v1/namespaces/flink-operator/leases/flink-operator-lease.
Message: Forbidden!Configured service account doesn't have access. Service
account may have been revoked. leases.coordination.k8s.io
"flink-operator-lease" is forbidden: User
"system:serviceaccount:flink-operator:flink-operator" cannot get resource
"leases" in API group "coordination.k8s.io" in the namespace
"flink-operator".

Looking at the rbac.yaml helm template
<https://github.com/apache/flink-kubernetes-operator/blob/main/helm/flink-kubernetes-operator/templates/rbac.yaml>,
it looks like the Role and RoleBindings that grant access to the leases
resource are created for the configured watchNamespaces, but not for the
namespace in which the flink-kubernetes-operator is deployed.  I think that
for HA, the flink-kubernetes-operator is going to be asking k8s for Leases
in its own namespace, right?

Is this a bug, or am I doing something wrong?  I'd file a JIRA, but I
betcha I'm just doing something wrong (unless I'm the first person who's
tried to use HA + namespaced RBAC with the helm charts?).

Thanks!
-Andrew Otto
 Wikimedia Foundation


Re: Flink Job across Data Centers

2023-04-12 Thread Andrew Otto
Hi, I asked a similar question in this thread
, which
might have some relevant info.

On Wed, Apr 12, 2023 at 7:23 AM Chirag Dewan via user 
wrote:

> Hi,
>
> Can anyone share any experience on running Flink jobs across data centers?
>
> I am trying to create a multi-site/geo-replicated Kafka cluster. I want
> my Flink job to be closely colocated with my Kafka multi-site cluster.
> If the Flink job is bound to a single data center, I believe we will
> observe a lot of client latency by trying to access the broker in another
> DC.
>
> Rather, if I can make my Flink Kafka collectors rack aware and start
> fetching data from the closest Kafka broker, I should get better results.
>
> I will be deploying Flink 1.16 on Kubernetes with Strimzi managed Apache
> Kafka.
>
> Thanks.
>
>


Re: Re: KafkaSource consumer group

2023-03-31 Thread Andrew Otto
Hi,

FWIW, I asked a similar question here:
https://lists.apache.org/thread/1f01zo1lqcmhvosptpjlm6k3mgx0sv1m

:)
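
To make the behaviour discussed below concrete, a minimal KafkaSource sketch
(servers and names are placeholders). Note that group.id only affects offset
committing and reporting, so two separate Flink jobs built like this will each
read every partition of the topic:

import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.connector.kafka.source.KafkaSource;
import org.apache.flink.connector.kafka.source.enumerator.initializer.OffsetsInitializer;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

KafkaSource<String> source = KafkaSource.<String>builder()
        .setBootstrapServers("kafka:9092")
        .setTopics("topic-x")
        .setGroupId("my-flink-job")  // used for offset commits/metrics, not partition assignment
        .setStartingOffsets(OffsetsInitializer.earliest())
        .setValueOnlyDeserializer(new SimpleStringSchema())
        .build();

env.fromSource(source, WatermarkStrategy.noWatermarks(), "kafka-source").print();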


On Fri, Mar 31, 2023 at 3:57 AM Roberts, Ben (Senior Developer) via user <
user@flink.apache.org> wrote:

> Hi Gordon,
>
> Thanks for the reply!
> I think that makes sense.
>
> The reason for investigating is that generally we run our production
> workloads across 2 kubernetes clusters (each in a different cloud region)
> for availability reasons. So for instance requests to web apps are load
> balanced between servers in both clusters, and pub/sub apps will have
> consumers running in both clusters in the same consumer group (or non-kafka
> equivalent).
>
> We’ve just recently deployed our first production Flink workload, using
> the flink-kubernetes-operator and running the job(s) in HA mode, but we
> discovered that the same job running in each k8s cluster was processing the
> same messages, which was different to what we’d expected.
> It sounds like this is intentional from Flink’s POV though.
>
> I don’t suppose you’re aware of a feature that would allow us to run a
> Flink job across 2 clusters? Otherwise I guess we’ll need to just run it in
> a single cluster and be aware of the risks if we lost that cluster.
>
> Thanks,
> Ben
>
> On 2023/03/30 16:52:31 "Tzu-Li (Gordon) Tai" wrote:
> > Hi Robert,
> >
> > This is a design choice. Flink's KafkaSource doesn't rely on consumer
> > groups for assigning partitions / rebalancing / offset tracking. It
> > manually assigns whatever partitions are in the specified topic across
> its
> > consumer instances, and rebalances only when the Flink job / KafkaSink is
> > rescaled.
> >
> > Is there a specific reason that you need two Flink jobs for this? I
> believe
> > the Flink-way of doing this would be to have one job read the topic, and
> > then you'd do a stream split if you want to have two different branches
> of
> > processing business logic.
> >
> > Thanks,
> > Gordon
> >
> > On Thu, Mar 30, 2023 at 9:34 AM Roberts, Ben (Senior Developer) via user
> <
> > user@flink.apache.org> wrote:
> >
> > > Hi,
> > >
> > >
> > >
> > > Is there a way to run multiple flink jobs with the same Kafka group.id
> > > and have them join the same consumer group?
> > >
> > >
> > >
> > > It seems that setting the group.id using
> > > KafkaSource.builder().set_group_id() does not have the effect of
> creating
> > > an actual consumer group in Kafka.
> > >
> > >
> > >
> > > Running the same flink job with the same group.id, consuming from the
> > > same topic, will result in both flink jobs receiving the same messages
> from
> > > the topic, rather than only one of the jobs receiving the messages (as
> > > would be expected for consumers in a consumer group normally with
> Kafka).
> > >
> > >
> > >
> > > Is this a design choice, and is there a way to configure it so messages
> > > can be split across two jobs using the same “group.id”?
> > >
> > >
> > >
> > > Thanks in advance,
> > >
> > > Ben
> > >
> > >
> > >
> >

Re: [ANNOUNCE] Flink Table Store Joins Apache Incubator as Apache Paimon(incubating)

2023-03-27 Thread Andrew Otto
Exciting!

If this ends up working well, Wikimedia Foundation would love to try it out!

On Mon, Mar 27, 2023 at 8:39 AM Matthias Pohl via user <
user@flink.apache.org> wrote:

> Congratulations and good luck with pushing the project forward.
>
> On Mon, Mar 27, 2023 at 2:35 PM Jing Ge via user 
> wrote:
>
>> Congrats!
>>
>> Best regards,
>> Jing
>>
>> On Mon, Mar 27, 2023 at 2:32 PM Leonard Xu  wrote:
>>
>>> Congratulations!
>>>
>>>
>>> Best,
>>> Leonard
>>>
>>> On Mar 27, 2023, at 5:23 PM, Yu Li  wrote:
>>>
>>> Dear Flinkers,
>>>
>>>
>>>
>>> As you may have noticed, we are pleased to announce that Flink Table Store 
>>> has joined the Apache Incubator as a separate project called Apache 
>>> Paimon(incubating) [1] [2] [3]. The new project still aims at building a 
>>> streaming data lake platform for high-speed data ingestion, change data 
>>> tracking and efficient real-time analytics, with the vision of supporting a 
>>> larger ecosystem and establishing a vibrant and neutral open source 
>>> community.
>>>
>>>
>>>
>>> We would like to thank everyone for their great support and efforts for the 
>>> Flink Table Store project, and warmly welcome everyone to join the 
>>> development and activities of the new project. Apache Flink will continue 
>>> to be one of the first-class citizens supported by Paimon, and we believe 
>>> that the Flink and Paimon communities will maintain close cooperation.
>>>
>>>
>>> 亲爱的Flinkers,
>>>
>>>
>>> 正如您可能已经注意到的,我们很高兴地宣布,Flink Table Store 已经正式加入 Apache
>>> 孵化器独立孵化 [1] [2] [3]。新项目的名字是
>>> Apache 
>>> Paimon(incubating),仍致力于打造一个支持高速数据摄入、流式数据订阅和高效实时分析的新一代流式湖仓平台。此外,新项目将支持更加丰富的生态,并建立一个充满活力和中立的开源社区。
>>>
>>>
>>> 在这里我们要感谢大家对 Flink Table Store
>>> 项目的大力支持和投入,并热烈欢迎大家加入新项目的开发和社区活动。Apache Flink 将继续作为 Paimon 支持的主力计算引擎之一,我们也相信
>>> Flink 和 Paimon 社区将继续保持密切合作。
>>>
>>>
>>> Best Regards,
>>> Yu (on behalf of the Apache Flink PMC and Apache Paimon PPMC)
>>>
>>> 致礼,
>>> 李钰(谨代表 Apache Flink PMC 和 Apache Paimon PPMC)
>>>
>>> [1] https://paimon.apache.org/
>>> [2] https://github.com/apache/incubator-paimon
>>> [3] https://cwiki.apache.org/confluence/display/INCUBATOR/PaimonProposal
>>>
>>>
>>>


Re: Handling JSON Serialization without Kryo

2023-03-27 Thread Andrew Otto
Hi,

> The problem here is that the shape of the data can vary wildly and
dynamically. Some records may have properties unique to only that record,
which makes defining a POJO difficult

AFAIK, the only way to avoid POJOs in Flink is to use Row (DataStream) or
RowData (Table API).  These are Flink's 'schema' typesystem, and they allow
you to dynamically describe the shape of data.
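
As a rough illustration (the Jackson parsing and field names are assumptions,
not Rion's actual job): pull the fields the job really needs into a Row with
explicit type information and keep the rest as a string blob, so the Kryo
fallback never kicks in:

import com.fasterxml.jackson.databind.JsonNode;
import com.fasterxml.jackson.databind.ObjectMapper;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.common.typeinfo.Types;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.types.Row;

TypeInformation<Row> rowType = Types.ROW_NAMED(
        new String[] {"known_field", "rest_of_json"},
        Types.STRING, Types.STRING);

// jsonStrings is an assumed DataStream<String> of raw JSON records.
DataStream<Row> rows = jsonStrings
        .map(json -> {
            JsonNode node = new ObjectMapper().readTree(json);  // reuse the mapper in real code
            return Row.of(node.path("known_field").asText(), node.toString());
        })
        // Explicit type info keeps Flink on the Row serializer, not Kryo.
        .returns(rowType);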

At the Wikimedia Foundation, we also find it difficult to use JSON +
Flink.  We at least have JSONSchemas though, and that has allowed us to
write custom converters (serdes) for JSONSchema -> Row & RowData.

This might not be helpful for you (unless you also use JSONSchema), but
here's a couple of convos on this list with some relevant info:

- Flink, JSON, and JSONSchemas
<https://lists.apache.org/thread/v6622nv043x71h3jgz9zyc9oppnx2g6r>
- Converting ResolvedSchema to JSON and Protobuf Schemas
<https://lists.apache.org/thread/7gnllmggbqwnoj22dfcbrmngr16dbnxb>
Good luck!
-Andrew Otto
 Wikimedia Foundation

On Wed, Mar 22, 2023 at 8:07 AM Rion Williams  wrote:

> Hi Ken,
>
> I’m going to profile the job today to try and get a better handle on where
> the bottleneck is. The job currently just passes around JsonObjects between
> the operators, which are relying on Kryo. The job also writes to Postgres,
> Kafka, and Elasticsearch so it’s possible that one of those is causing the
> back-pressure.
>
> I’m a bit shocked at the stunningly low speeds as well. Initially, the job
> would perform fine but checkpointing sizes would gradually build up (as
> would durations for them) until performance degraded to the borderline
> unusable 1-2 records/second.
>
> On Mar 21, 2023, at 2:35 PM, Ken Krugler 
> wrote:
>
> Hi Rion,
>
> I’m using Gson to deserialize to a Map.
>
> 1-2 records/second sounds way too slow, unless each record is enormous.
>
> — Ken
>
> On Mar 21, 2023, at 6:18 AM, Rion Williams  wrote:
>
> Hi Ken,
>
> Thanks for the response. I hadn't tried exploring the use of the Record
> class, which I'm assuming you're referring to a flink.types.Record, to read
> the JSON into. Did you handle this via using a mapper to read the
> properties in (e.g. Gson, Jackson) as fields or take a different approach?
> Additionally, how has your experience been with performance? Kryo with the
> existing job leveraging JsonObjects (via Gson) is horrific (~1-2
> records/second) and can't keep up with the speed of the producers, which is
> the impetus behind reevaluating the serialization.
>
> I'll explore this a bit more.
>
> Thanks,
>
> Rion
>
> On Mon, Mar 20, 2023 at 10:28 PM Ken Krugler 
> wrote:
>
>> Hi Rion,
>>
>> For my similar use case, I was able to make a simplifying assumption that
>> my top-level JSON object was a record.
>>
>> I then registered a custom Kryo serde that knew how to handle the handful
>> of JsonPrimitive types for the record entries.
>>
>> I recently looked at extending that to support arrays and nested records,
>> but haven’t had to do that.
>>
>> — Ken
>>
>>
>> On Mar 20, 2023, at 6:56 PM, Rion Williams  wrote:
>>
>> Hi Shammon,
>>
>> Unfortunately it’s a data stream job. I’ve been exploring a few options
>> but haven’t found anything I’ve decided on yet. I’m currently looking at
>> seeing if I can leverage some type of partial serialization to bind to the
>> properties that I know the job will use and retain the rest as a JSON blob.
>> I’ve also consider trying to store the fields as a large map of
>> string-object pairs and translating thay into a string prior to writing to
>> the sinks.
>>
>> Still accepting any/all ideas that I come across to see if I can handle
>> this in an efficient, reasonable way.
>>
>> Thanks,
>>
>> Rion
>>
>> On Mar 20, 2023, at 8:40 PM, Shammon FY  wrote:
>>
>> 
>> Hi Rion
>>
>> Is your job datastream or table/sql? If it is a table/sql job, and you
>> can define all the fields in json you need, then you can directly use json
>> format [1] to parse the data.
>>
>> You can also customize udf functions to parse json data into struct data,
>> such as map, row and other types supported by flink
>>
>>
>> [1]
>> https://nightlies.apache.org/flink/flink-docs-master/docs/connectors/table/formats/json/
>>
>> Best,
>> Shammon FY
>>
>>
>> On Sun, Mar 19, 2023 at 7:44 AM Rion Williams 
>> wrote:
>>
>>> Hi all,
>>>
>>> I’m reaching out today for some suggestions (and hopefully a solution)
>>> for a Flink job that I’m working on. The job itself reads JSON strings from

Re: Kubernetes operator set container resources and limits

2023-03-13 Thread Andrew Otto
Hi,

> return to the same values from jobManager.resource
FlinkDeployment manifest parameter
I believe this is the correct way; using jobManager.resources and
taskManager.resources in the FlinkDeployment.

Is there a reason you can't change the resources values there?  I don't
think you should need to do so with podTemplate.



On Mon, Mar 13, 2023 at 9:56 AM Evgeniy Lyutikov 
wrote:

> Hi all
> Is there any way to specify different values for resources and limits for
> a jobmanager container?
> The problem is that sometimes kubernetes kills the jobmanager container
> because it exceeds the memory consumption.
>
>
> Last State: Terminated
>   Reason:   OOMKilled
>   Exit Code:137
>   Started:  Tue, 07 Mar 2023 18:06:01 +0700
>   Finished: Fri, 10 Mar 2023 23:20:54 +0700
>
> What I tried to do:
> 1. added the 'jobmanager.memory.process.size' parameter to
> flinkConfiguration with a value less than the allocated resources for the
> container, but after launch, the value of this parameter is set to the
> amount of memory allocated to the container.
> 2. I tried to set resources and limits through the jobmanager pod template,
> but for the running container, the values again return to the same values
> from jobManager.resource FlinkDeployment manifest parameter
>
> Kubernetes operator 1.2.0 and Flink 1.14.4
>
>
>
>


Re: Reusing the same OutputTag in multiple ProcessFunctions

2023-02-15 Thread Andrew Otto
Wow thank you so much!  Good to know its not just me.

At the end of my day yesterday, I started sniffing this out too.  I think I
effectively did the same thing as setting _j_typeinfo to None by manually
recreating the _j_typeinfo in a new ('cloned') output tag:

from pyflink.common.typeinfo import TypeInformation, _from_java_type
from pyflink.datastream import OutputTag

def clone_type_info(type_info: TypeInformation) -> TypeInformation:
return _from_java_type(type_info.get_java_type_info())

def clone_output_tag(tag: OutputTag) -> OutputTag:
return OutputTag(tag.tag_id, clone_type_info(tag.type_info))

Then, every time I need to use an OutputTag (or any function that will
enclos a _j_type_info) I make sure that that object is 'cloned'.

Thanks so much for the bugfiix!  Looking forward to it!


On Wed, Feb 15, 2023 at 4:41 AM Juntao Hu  wrote:

> Hi Andrew,
>
> I've found out that this's a bug brought by another bugfix FLINK-29681
> <https://issues.apache.org/jira/browse/FLINK-29681>, I've created an
> issue FLINK-31083 <https://issues.apache.org/jira/browse/FLINK-31083> for
> this problem. You could temporarily set inner java type_info to None before
> reusing the ProcessFunction to work around in your code, e.g.
> ```python
> side_output_ds1 = processed_ds1.get_side_output(output_tag1)
> output_tag1.type_info._j_typeinfo = None
> processed_ds2 = processed_ds1.process(LinesWithAndToSideOutput())
> ```
>
> Thanks for reporting!
>
> David Anderson  于2023年2月15日周三 14:03写道:
>
>> I can't respond to the python-specific aspects of this situation, but
>> I don't believe you need to use the same OutputTag instance. It should
>> be enough that the various tag instances involved all have the same
>> String id. (That's why the id exists.)
>>
>> David
>>
>> On Tue, Feb 14, 2023 at 11:51 AM Andrew Otto  wrote:
>> >
>> > Hi,
>> >
>> > I'm attempting to implement a generic error handling ProcessFunction in
>> pyflink.  Given a user provided function, I want to invoke that function
>> for each element in the DataStream, catch any errors thrown by the
>> function, convert those errors into events, and then emit those event
>> errors to a different DataStream sink.
>> >
>> > I'm trying to do this by reusing the same OutputTag in each of my
>> ProcessFunctions.
>> > However, this does not work, I believe because I am using the same
>> error_output_tag in two different functions, which causes it to have a
>> reference(?)  to _thread.Rlock, which causes the ProcessFunction instance
>> to be un-pickleable.
>> >
>> > Here's a standalone example of the failure using the canonical
>> word_count example.
>> >
>> > My question is.
>> > 1. Does Flink support re-use of the same OutputTag instance in multiple
>> ProcessFunctions?
>> > 2. If so, is my problem pyflink / python / pickle specific?
>> >
>> > Thanks!
>> > -Andrew Otto
>> >  Wikimedia Foundation
>> >
>> >
>>
>


Reusing the same OutputTag in multiple ProcessFunctions

2023-02-14 Thread Andrew Otto
Hi,

I'm attempting to implement a generic error handling ProcessFunction in
pyflink.  Given a user provided function, I want to invoke that function
for each element in the DataStream, catch any errors thrown by
the function, convert those errors into events, and then emit those event
errors to a different DataStream sink.

I'm trying to do this by reusing the same OutputTag in each of my
ProcessFunctions.
However, this does not work, I believe because I am using the same
error_output_tag in two different functions, which causes it to have a
reference(?)  to _thread.Rlock, which causes the ProcessFunction instance
to be un-pickleable.

Here's a standalone example
<https://gist.github.com/ottomata/cba55f2c65cf584ffdb933410f3b4237> of the
failure using the canonical word_count example.

My question is.
1. Does Flink support re-use of the same OutputTag instance in multiple
ProcessFunctions?
2. If so, is my problem pyflink / python / pickle specific?

Thanks!
-Andrew Otto
 Wikimedia Foundation


Re: Pyflink Side Output Question and/or suggested documentation change

2023-02-13 Thread Andrew Otto
Thank you!

On Mon, Feb 13, 2023 at 5:55 AM Dian Fu  wrote:

> Thanks Andrew, I think this is a valid advice. I will update the
> documentation~
>
> Regards,
> Dian
>
>
> On Fri, Feb 10, 2023 at 10:08 PM Andrew Otto  wrote:
>
>> Question about side outputs and OutputTags in pyflink.  The docs
>> <https://nightlies.apache.org/flink/flink-docs-master/docs/dev/datastream/side_output/>
>> say we are supposed to
>>
>> yield output_tag, value
>>
>> Docs then say:
>> > For retrieving the side output stream you use getSideOutput(OutputTag) on
>> the result of the DataStream operation.
>>
>> From this, I'd expect that calling datastream.get_side_output would be
>> optional.   However, it seems that if you do not call
>> datastream.get_side_output, then the main datastream will have the record
>> destined to the output tag still in it, as a Tuple(output_tag, value).
>> This caused me great confusion for a while, as my downstream tasks would
>> break because of the unexpected Tuple type of the record.
>>
>> Here's an example of the failure using side output and ProcessFunction
>> in the word count example.
>> <https://gist.github.com/ottomata/001df5df72eb1224c01c9827399fcbd7#file-pyflink_sideout_fail_word_count_example-py-L86-L100>
>>
>> I'd expect that just yielding an output_tag would make those records be
>> in a different datastream, but apparently this is not the case unless you
>> call get_side_output.
>>
>> If this is the expected behavior, perhaps the docs should be updated to
>> say so?
>>
>> -Andrew Otto
>>  Wikimedia Foundation
>>
>>
>>
>>
>>


Pyflink Side Output Question and/or suggested documentation change

2023-02-10 Thread Andrew Otto
Question about side outputs and OutputTags in pyflink.  The docs
<https://nightlies.apache.org/flink/flink-docs-master/docs/dev/datastream/side_output/>
say we are supposed to

yield output_tag, value

Docs then say:
> For retrieving the side output stream you use getSideOutput(OutputTag) on
the result of the DataStream operation.

From this, I'd expect that calling datastream.get_side_output would be
optional.   However, it seems that if you do not call
datastream.get_side_output, then the main datastream will have the record
destined for the output tag still in it, as a Tuple(output_tag, value).
This caused me great confusion for a while, as my downstream tasks would
break because of the unexpected Tuple type of the record.

Here's an example of the failure using side output and ProcessFunction in
the word count example.
<https://gist.github.com/ottomata/001df5df72eb1224c01c9827399fcbd7#file-pyflink_sideout_fail_word_count_example-py-L86-L100>

I'd expect that just yielding an output_tag would make those records be in
a different datastream, but apparently this is not the case unless you call
get_side_output.
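
For concreteness, a minimal sketch of what I mean (stream and function names
are hypothetical):

```python
from pyflink.common import Types
from pyflink.datastream import OutputTag

output_tag = OutputTag("side", Types.STRING())

# Yields both normal records and (output_tag, value) pairs from its ProcessFunction.
main_ds = ds.process(MyProcessFunction())

# Only after this call does main_ds stop carrying the (output_tag, value) tuples.
side_ds = main_ds.get_side_output(output_tag)
```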

If this is the expected behavior, perhaps the docs should be updated to say
so?

-Andrew Otto
 Wikimedia Foundation


Re: Kafka Sink Kafka Producer metrics?

2023-02-07 Thread Andrew Otto
Wow, not sure how I missed that.  Thank you.
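
For anyone else who lands on this thread, here's a sketch of what I'll try
from PyFlink (assuming the builder's set_property passes the flag through to
the underlying Kafka producer properties, like the Java KafkaSinkBuilder does):

```python
from pyflink.datastream.connectors.kafka import KafkaSink

sink = (
    KafkaSink.builder()
    .set_bootstrap_servers("kafka:9092")                 # placeholder brokers
    .set_record_serializer(record_serializer)            # assumed defined elsewhere
    .set_property("register.producer.metrics", "true")   # surface KafkaProducer metrics
    .build()
)
```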



On Mon, Feb 6, 2023 at 9:22 PM Mason Chen  wrote:

> Hi Andrew,
>
> I misread the docs: `register.producer.metrics` is mentioned here, but it
> is not on by default.
> https://nightlies.apache.org/flink/flink-docs-master/docs/connectors/datastream/kafka/#kafka-connector-metrics
>
> Best,
> Mason
>
> On Mon, Feb 6, 2023 at 6:19 PM Mason Chen  wrote:
>
>> Hi Andrew,
>>
>> Unfortunately, the functionality is undocumented, but you can set the
>> property `register.producer.metrics` to true in your Kafka client
>> properties map. This is a JIRA to document the feature:
>> https://issues.apache.org/jira/browse/FLINK-30932
>>
>> Best,
>> Mason
>>
>> On Mon, Feb 6, 2023 at 11:49 AM Andrew Otto  wrote:
>>
>>> Hi!
>>>
>>> Kafka Source will emit KafkaConsumer metrics
>>> <https://nightlies.apache.org/flink/flink-docs-master/docs/connectors/datastream/kafka/#kafka-consumer-metrics>
>>> .
>>>
>>> It looks like Kafka Sink
>>> <https://nightlies.apache.org/flink/flink-docs-master/docs/connectors/datastream/kafka/#monitoring-1>
>>> does not emit KafkaProducer metrics
>>> <https://kafka.apache.org/documentation/#producer_monitoring>.  Is this
>>> correct?  If so, why not?
>>>
>>> Thanks,
>>> -Andrew Otto
>>>  Wikimedia Foundation
>>>
>>


Kafka Sink Kafka Producer metrics?

2023-02-06 Thread Andrew Otto
Hi!

Kafka Source will emit KafkaConsumer metrics
<https://nightlies.apache.org/flink/flink-docs-master/docs/connectors/datastream/kafka/#kafka-consumer-metrics>
.

It looks like Kafka Sink
<https://nightlies.apache.org/flink/flink-docs-master/docs/connectors/datastream/kafka/#monitoring-1>
does not emit KafkaProducer metrics
<https://kafka.apache.org/documentation/#producer_monitoring>.  Is this
correct?  If so, why not?

Thanks,
-Andrew Otto
 Wikimedia Foundation


Re: Using pyflink from flink distribution

2023-01-31 Thread Andrew Otto
Great, thank you so much for your responses.  It all makes sense now. :)

On Mon, Jan 30, 2023 at 10:41 PM Dian Fu  wrote:

> >> What is the reason for including
> opt/python/{pyflink.zip,cloudpickle.zip,py4j.zip} in the base
> distribution then?  Oh, a guess: to make it easier for TaskManagers to run
> pyflink without having pyflink installed themselves?  Somehow I'd guess
> this wouldn't work tho; I'd assume TaskManagers would also need some python
> transitive dependencies, e.g. google protobuf.
>
> There are some historical reasons. In the first version (1.9.x), which did not
> provide Python UDF support, it was not necessary to install PyFlink on the
> TaskManager nodes. Since 1.10, which supports Python UDFs, users have to
> install PyFlink on the TaskManager nodes as there are many transitive
> dependencies, e.g. Apache Beam, protobuf, pandas, etc. However, we have not
> removed these packages as they are still useful for the client node, which is
> responsible for compiling jobs (it's not necessary to install PyFlink on the
> client node).
>
> >> Since we're building our own Docker image, I'm going the other way
> around: just install pyflink, and symlink /opt/flink ->
> /usr/lib/python3.7/dist-packages/pyflink.  So far so good, but I'm
> worried that something will be fishy when trying to run JVM apps via
> pyflink.
>
> Good idea! It contains all the things needed to run JVM apps in
> the PyFlink package and so I think you could just try this way.
>
> Regards,
> Dian
>
> On Mon, Jan 30, 2023 at 9:58 PM Andrew Otto  wrote:
>
>> Thanks Dian!
>>
>> > >> Is using pyflink from the flink distribution tarball (without pip)
>> not a supported way to use pyflink?
>> > You are right.
>>
>> What is the reason for including
>> opt/python/{pyflink.zip,cloudpickle.zip,py4j.zip} in the base
>> distribution then?  Oh, a guess: to make it easier for TaskManagers to run
>> pyflink without having pyflink installed themselves?  Somehow I'd guess
>> this wouldn't work tho; I'd assume TaskManagers would also need some python
>> transitive dependencies, e.g. google protobuf.
>>
>> > you could remove the JAR packages located under
>> /usr/local/lib/python3.7/dist-packages/pyflink/lib manually after `pip
>> install apache-flink`
>>
>> Since we're building our own Docker image, I'm going the other way
>> around: just install pyflink, and symlink /opt/flink ->
>> /usr/lib/python3.7/dist-packages/pyflink.  So far so good, but I'm worried
>> that something will be fishy when trying to run JVM apps via pyflink.
>>
>> -Ao
>>
>>
>>
>> On Sun, Jan 29, 2023 at 1:43 AM Dian Fu  wrote:
>>
>>> Hi Andrew,
>>>
>>> >> By pip installing apache-flink, this docker image will have the flink
>>> distro installed at /opt/flink and FLINK_HOME set to /opt/flink
>>> <https://github.com/apache/flink-docker/blob/master/1.16/scala_2.12-java11-ubuntu/Dockerfile>.
>>> BUT ALSO flink lib jars will be installed at e.g.
>>> /usr/local/lib/python3.7/dist-packages/pyflink/lib!
>>> So, by following those instructions, flink is effectively installed
>>> twice into the docker image.
>>>
>>> Yes, your understanding is correct. The base image `flink:1.15.2`
>>> doesn't include PyFlink and so you need to build a custom image if you want
>>> to use PyFlink. Regarding the JAR packages which are installed twice,
>>> you could remove the JAR packages located under
>>> /usr/local/lib/python3.7/dist-packages/pyflink/lib manually after `pip
>>> install apache-flink`. It will use the JAR packages located under
>>> $FLINK_HOME/lib.
>>>
>>> >> Is using pyflink from the flink distribution tarball (without pip)
>>> not a supported way to use pyflink?
>>> You are right.
>>>
>>> Regards,
>>> Dian
>>>
>>>
>>> On Thu, Jan 26, 2023 at 11:12 PM Andrew Otto  wrote:
>>>
>>>> Ah, oops and my original email had a typo:
>>>> > Some python dependencies are not included in the flink distribution
>>>> tarballs: cloudpickle, py4j and pyflink are in opt/python.
>>>>
>>>> Should read:
>>>> > Some python dependencies ARE included in the flink distribution
>>>> tarballs: cloudpickle, py4j and pyflink are in opt/python.
>>>>
>>>> On Thu, Jan 26, 2023 at 10:10 AM Andrew Otto 
>>>> wrote:
>>>>
>>>>> Let me ask a related question:
>>>>>
>>>>> We are buil

Re: Using pyflink from flink distribution

2023-01-30 Thread Andrew Otto
Thanks Dian!

> >> Is using pyflink from the flink distribution tarball (without pip) not
a supported way to use pyflink?
> You are right.

What is the reason for including
opt/python/{pyflink.zip,cloudpickle.zip,py4j.zip} in the base distribution
then?  Oh, a guess: to make it easier for TaskManagers to run
pyflink without having pyflink installed themselves?  Somehow I'd guess
this wouldn't work tho; I'd assume TaskManagers would also need some python
transitive dependencies, e.g. google protobuf.

> you could remove the JAR packages located under
/usr/local/lib/python3.7/dist-packages/pyflink/lib manually after `pip
install apache-flink`

Since we're building our own Docker image, I'm going the other way around:
just install pyflink, and symlink /opt/flink ->
/usr/lib/python3.7/dist-packages/pyflink.  So far so good, but I'm worried
that something will be fishy when trying to run JVM apps via pyflink.

-Ao



On Sun, Jan 29, 2023 at 1:43 AM Dian Fu  wrote:

> Hi Andrew,
>
> >> By pip installing apache-flink, this docker image will have the flink
> distro installed at /opt/flink and FLINK_HOME set to /opt/flink
> <https://github.com/apache/flink-docker/blob/master/1.16/scala_2.12-java11-ubuntu/Dockerfile>.
> BUT ALSO flink lib jars will be installed at e.g.
> /usr/local/lib/python3.7/dist-packages/pyflink/lib!
> So, by following those instructions, flink is effectively installed twice
> into the docker image.
>
> Yes, your understanding is correct. The base image `flink:1.15.2` doesn't
> include PyFlink and so you need to build a custom image if you want to use
> PyFlink. Regarding the JAR packages which are installed twice, you could
> remove the JAR packages located under
> /usr/local/lib/python3.7/dist-packages/pyflink/lib manually after `pip
> install apache-flink`. It will use the JAR packages located under
> $FLINK_HOME/lib.
>
> >> Is using pyflink from the flink distribution tarball (without pip) not
> a supported way to use pyflink?
> You are right.
>
> Regards,
> Dian
>
>
> On Thu, Jan 26, 2023 at 11:12 PM Andrew Otto  wrote:
>
>> Ah, oops and my original email had a typo:
>> > Some python dependencies are not included in the flink distribution
>> tarballs: cloudpickle, py4j and pyflink are in opt/python.
>>
>> Should read:
>> > Some python dependencies ARE included in the flink distribution
>> tarballs: cloudpickle, py4j and pyflink are in opt/python.
>>
>> On Thu, Jan 26, 2023 at 10:10 AM Andrew Otto  wrote:
>>
>>> Let me ask a related question:
>>>
>>> We are building our own base Flink docker image.  We will be deploying
>>> both JVM and python apps via flink-kubernetes-operator.
>>>
>>> Is there any reason not to install Flink in this image via `pip install
>>> apache-flink` and use it for JVM apps?
>>>
>>> -Andrew Otto
>>>  Wikimedia Foundation
>>>
>>>
>>>
>>> On Tue, Jan 24, 2023 at 4:26 PM Andrew Otto  wrote:
>>>
>>>> Hello,
>>>>
>>>> I'm having quite a bit of trouble running pyflink from the default
>>>> flink distribution tarballs.  I'd expect the python examples to work as
>>>> long as python is installed, and we've got the distribution.  Some python
>>>> dependencies are not included in the flink distribution tarballs:
>>>> cloudpickle, py4j and pyflink are in opt/python.  Others are not, e.g.
>>>> protobuf.
>>>>
>>>> Now that I'm looking, I see that the pyflink installation instructions
>>>> <https://nightlies.apache.org/flink/flink-docs-release-1.15/docs/dev/python/installation/>
>>>>  are
>>>> to install via pip.
>>>>
>>>> I'm doing this in Docker for use with the flink-kubernetes-operator.
>>>> In the Using Flink Python on Docker
>>>> <https://nightlies.apache.org/flink/flink-docs-release-1.15/docs/deployment/resource-providers/standalone/docker/#using-flink-python-on-docker>
>>>>  instructions,
>>>> there is a pip3 install apache-flink step.  I find this strange, since I'd
>>>> expect the 'FROM flink:1.15.2'  part to be sufficient.
>>>>
>>>> By pip installing apache-flink, this docker image will have the flink
>>>> distro installed at /opt/flink and FLINK_HOME set to /opt/flink
>>>> <https://github.com/apache/flink-docker/blob/master/1.16/scala_2.12-java11-ubuntu/Dockerfile>.
>>>> BUT ALSO flink lib jars will be installed at e.g.
>>>> /usr/local/lib/python3.7/dist-packages/pyflink/lib!
>>>> So, by following those instructions, flink is effectively installed
>>>> twice into the docker image.
>>>>
>>>> Am I correct or am I missing something?
>>>>
>>>> Is using pyflink from the flink distribution tarball (without pip) not
>>>> a supported way to use pyflink?
>>>>
>>>> Thanks!
>>>> -Andrew Otto
>>>>  Wikimedia Foundation
>>>>
>>>>


Re: Which flink version is compatible with beam

2023-01-30 Thread Andrew Otto
Hi, I'm not sure about beam, but Flink is not officially compatible with
python3.10.

https://nightlies.apache.org/flink/flink-docs-release-1.14/docs/dev/python/installation/

> Python version (3.6, 3.7 or 3.8) is required for PyFlink.


On Fri, Jan 27, 2023 at 11:50 PM P Singh 
wrote:

> Hi,
>
> It’s not working with flink 1.14 and beam 2.44 or 2.43 with python 3.10.
>
> Please suggest.
>
> Get Outlook for iOS 
> --
> *From:* Yaroslav Tkachenko 
> *Sent:* Friday, January 27, 2023 10:53:49 PM
> *To:* P Singh 
> *Cc:* user@flink.apache.org 
> *Subject:* Re: Which flink version is compatible with beam
>
> Hi! According to this
> https://beam.apache.org/documentation/runners/flink/#flink-version-compatibility,
> 1.14 is the latest supported version.
>
> On Fri, Jan 27, 2023 at 9:19 AM P Singh 
> wrote:
>
> Hi Team,
>
> I am trying to run apache beam pipeline on flink cluster. I have set up
> kubernetes locally with flink1.16and apache/beam_python3.10_sdk:2.44.0.
> When I submit the job using like
>
> python file.py
>
> The job just hangs; I'm not able to see it on the Flink UI or in the logs.
>
> Can you please suggest compatible versions?
>
>
> Looking forward to hearing from you.
>
>


Re: Using pyflink from flink distribution

2023-01-26 Thread Andrew Otto
Ah, oops and my original email had a typo:
> Some python dependencies are not included in the flink distribution
tarballs: cloudpickle, py4j and pyflink are in opt/python.

Should read:
> Some python dependencies ARE included in the flink distribution tarballs:
cloudpickle, py4j and pyflink are in opt/python.

On Thu, Jan 26, 2023 at 10:10 AM Andrew Otto  wrote:

> Let me ask a related question:
>
> We are building our own base Flink docker image.  We will be deploying
> both JVM and python apps via flink-kubernetes-operator.
>
> Is there any reason not to install Flink in this image via `pip install
> apache-flink` and use it for JVM apps?
>
> -Andrew Otto
>  Wikimedia Foundation
>
>
>
> On Tue, Jan 24, 2023 at 4:26 PM Andrew Otto  wrote:
>
>> Hello,
>>
>> I'm having quite a bit of trouble running pyflink from the default flink
>> distribution tarballs.  I'd expect the python examples to work as long as
>> python is installed, and we've got the distribution.  Some python
>> dependencies are not included in the flink distribution tarballs:
>> cloudpickle, py4j and pyflink are in opt/python.  Others are not, e.g.
>> protobuf.
>>
>> Now that I'm looking, I see that the pyflink installation instructions
>> <https://nightlies.apache.org/flink/flink-docs-release-1.15/docs/dev/python/installation/>
>>  are
>> to install via pip.
>>
>> I'm doing this in Docker for use with the flink-kubernetes-operator.  In
>> the Using Flink Python on Docker
>> <https://nightlies.apache.org/flink/flink-docs-release-1.15/docs/deployment/resource-providers/standalone/docker/#using-flink-python-on-docker>
>>  instructions,
>> there is a pip3 install apache-flink step.  I find this strange, since I'd
>> expect the 'FROM flink:1.15.2'  part to be sufficient.
>>
>> By pip installing apache-flink, this docker image will have the flink
>> distro installed at /opt/flink and FLINK_HOME set to /opt/flink
>> <https://github.com/apache/flink-docker/blob/master/1.16/scala_2.12-java11-ubuntu/Dockerfile>.
>> BUT ALSO flink lib jars will be installed at e.g.
>> /usr/local/lib/python3.7/dist-packages/pyflink/lib!
>> So, by following those instructions, flink is effectively installed twice
>> into the docker image.
>>
>> Am I correct or am I missing something?
>>
>> Is using pyflink from the flink distribution tarball (without pip) not a
>> supported way to use pyflink?
>>
>> Thanks!
>> -Andrew Otto
>>  Wikimedia Foundation
>>
>>


Re: Using pyflink from flink distribution

2023-01-26 Thread Andrew Otto
Let me ask a related question:

We are building our own base Flink docker image.  We will be deploying both
JVM and python apps via flink-kubernetes-operator.

Is there any reason not to install Flink in this image via `pip install
apache-flink` and use it for JVM apps?

-Andrew Otto
 Wikimedia Foundation



On Tue, Jan 24, 2023 at 4:26 PM Andrew Otto  wrote:

> Hello,
>
> I'm having quite a bit of trouble running pyflink from the default flink
> distribution tarballs.  I'd expect the python examples to work as long as
> python is installed, and we've got the distribution.  Some python
> dependencies are not included in the flink distribution tarballs:
> cloudpickle, py4j and pyflink are in opt/python.  Others are not, e.g.
> protobuf.
>
> Now that I'm looking, I see that the pyflink installation instructions
> <https://nightlies.apache.org/flink/flink-docs-release-1.15/docs/dev/python/installation/>
>  are
> to install via pip.
>
> I'm doing this in Docker for use with the flink-kubernetes-operator.  In
> the Using Flink Python on Docker
> <https://nightlies.apache.org/flink/flink-docs-release-1.15/docs/deployment/resource-providers/standalone/docker/#using-flink-python-on-docker>
>  instructions,
> there is a pip3 install apache-flink step.  I find this strange, since I'd
> expect the 'FROM flink:1.15.2'  part to be sufficient.
>
> By pip installing apache-flink, this docker image will have the flink
> distro installed at /opt/flink and FLINK_HOME set to /opt/flink
> <https://github.com/apache/flink-docker/blob/master/1.16/scala_2.12-java11-ubuntu/Dockerfile>.
> BUT ALSO flink lib jars will be installed at e.g.
> /usr/local/lib/python3.7/dist-packages/pyflink/lib!
> So, by following those instructions, flink is effectively installed twice
> into the docker image.
>
> Am I correct or am I missing something?
>
> Is using pyflink from the flink distribution tarball (without pip) not a
> supported way to use pyflink?
>
> Thanks!
> -Andrew Otto
>  Wikimedia Foundation
>
>


Using pyflink from flink distribution

2023-01-24 Thread Andrew Otto
Hello,

I'm having quite a bit of trouble running pyflink from the default flink
distribution tarballs.  I'd expect the python examples to work as long as
python is installed, and we've got the distribution.  Some python
dependencies are not included in the flink distribution tarballs:
cloudpickle, py4j and pyflink are in opt/python.  Others are not, e.g.
protobuf.

Now that I'm looking, I see that the pyflink installation instructions
<https://nightlies.apache.org/flink/flink-docs-release-1.15/docs/dev/python/installation/>
are
to install via pip.

I'm doing this in Docker for use with the flink-kubernetes-operator.  In
the Using Flink Python on Docker
<https://nightlies.apache.org/flink/flink-docs-release-1.15/docs/deployment/resource-providers/standalone/docker/#using-flink-python-on-docker>
instructions,
there is a pip3 install apache-flink step.  I find this strange, since I'd
expect the 'FROM flink:1.15.2'  part to be sufficient.

By pip installing apache-flink, this docker image will have the flink
distro installed at /opt/flink and FLINK_HOME set to /opt/flink
<https://github.com/apache/flink-docker/blob/master/1.16/scala_2.12-java11-ubuntu/Dockerfile>.
BUT ALSO flink lib jars will be installed at e.g.
/usr/local/lib/python3.7/dist-packages/pyflink/lib!
So, by following those instructions, flink is effectively installed twice
into the docker image.

Am I correct or am I missing something?

Is using pyflink from the flink distribution tarball (without pip) not a
supported way to use pyflink?

Thanks!
-Andrew Otto
 Wikimedia Foundation


Re: Flink Kubernetes Operator podTemplate and 'app' pod label bug?

2023-01-23 Thread Andrew Otto
Thanks all, I'm using other labels instead.  Specifically, I'm using the
component label to select the pods I need for my networkpolicies.

- I agree that it would probably be best if flink k8s native did not use
this label.

- It would be nice if there was a common label applied to all pods created
by flink and the flink kubernetes operator.  I tried to bikeshed one but didn't
come up with anything great.  The app label as-is doesn't work because it
appends the helm release name.  Something like 'engine: flink'?  Not sure.

Anyway, thank you!


On Fri, Jan 20, 2023 at 2:46 AM Gyula Fóra  wrote:

> To clarify this logic is inherited from the Flink Native Kubernetes
> integration itself. The operator specific labels we use are already fully
> qualified.
> I agree that this could be improved in Flink by a better label.
>
> Cheers,
> Gyula
>
> On Thu, Jan 19, 2023 at 11:00 PM Mason Chen 
> wrote:
>
>> @Andrew I was also confused by this earlier and FYI this line where it is
>> referenced
>> https://github.com/apache/flink-kubernetes-operator/blame/7d5bf9536bdfbf86de5803766b28e503cd32ee04/flink-kubernetes-standalone/src/main/java/org/apache/flink/kubernetes/operator/utils/StandaloneKubernetesUtils.java#L43
>>
>> On Thu, Jan 19, 2023 at 1:59 PM Őrhidi Mátyás 
>> wrote:
>>
>>> On a side note, we should probably use a qualified label name instead of
>>> the pretty common app here. WDYT Gyula?
>>>
>>> On Thu, Jan 19, 2023 at 1:48 PM Gyula Fóra  wrote:
>>>
>>>> Hi!
>>>>
>>>> The app label itself is used by Flink internally for a different
>>>> purpose so it’s overriden. This is completely expected.
>>>>
>>>> I think it would be better to use some other label :)
>>>>
>>>> Cheers,
>>>> Gyula
>>>>
>>>> On Thu, 19 Jan 2023 at 19:02, Andrew Otto  wrote:
>>>>
>>>>> Hello!
>>>>>
>>>>> I'm seeing an unexpected label value assignment happening, and I'm not
>>>>> sure how it's happening.  It is possible it is in my own helm charts and
>>>>> templates somewhere, but I'm not seeing it, so I'm beginning to think this
>>>>> is happening in the FlinkDeployment CRD in the operator code somewhere.
>>>>>
>>>>> I'm using FlinkDeployment podTemplate
>>>>> <https://nightlies.apache.org/flink/flink-kubernetes-operator-docs-main/docs/custom-resource/pod-template/>
>>>>> to add an 'app' label:
>>>>>
>>>>>  podTemplate:
>>>>> apiVersion: v1
>>>>> kind: Pod
>>>>> metadata:
>>>>>   labels:
>>>>> app: flink-app
>>>>> release: flink-example
>>>>> ...
>>>>>
>>>>> I also have this app label set in the FlinkDeployment labels:
>>>>>
>>>>> kind: FlinkDeployment
>>>>> metadata:
>>>>>   name: flink-app-flink-example
>>>>>   labels:
>>>>> app: flink-app
>>>>> chart: flink-app-0.1.1
>>>>> release: flink-example
>>>>>
>>>>> Since I've set app: flink-app in the podTemplate, I would expect all
>>>>> pods to get this label.  The FlinkDeployment resource has this label
>>>>> value as expected.  However, I see that in the pods, as well as the
>>>>> Deployment that are created by FlinkDeployment:
>>>>>
>>>>> *$ kubectl -n flink-app0 describe deployments flink-app-flink-example*
>>>>> ...
>>>>> Name:   flink-app-flink-example
>>>>> Namespace:  flink-app0
>>>>> CreationTimestamp:  Thu, 19 Jan 2023 12:42:05 -0500
>>>>> Labels: app=flink-app-flink-example
>>>>> component=jobmanager
>>>>> ...
>>>>>
>>>>> Pod Template:
>>>>>   Labels:   app=flink-app-flink-example
>>>>> component=jobmanager
>>>>> release=flink-example
>>>>> ...
>>>>>
>>>>>
>>>>> *$ kubectl -n flink-app0 describe pod
>>>>> flink-app-flink-example-d974cb595-788ch*
>>>>> ...
>>>>> Labels:   app=flink-app-flink-example
>>>>>   component=jobmanager
>>>>>   pod-template-hash=d974cb595
>>>>>   release=flink-example
>>>>> ...
>>>>>
>>>>>
>>>>> I'd expect the app label to be 'flink-app' for at least the Deployment
>>>>> PodTemplate and the Pod, if not the Deployment itself too.
>>>>>
>>>>> Something is overriding the app label in podTemplate, and I don't
>>>>> think it's my chart or installation.  I looked in 
>>>>> flink-kubernetes-operator
>>>>> code and I didn't find where this was happening either.  I am not setting
>>>>> e.g. kubernetes.jobmanager.labels
>>>>> <https://nightlies.apache.org/flink/flink-docs-release-1.15/docs/deployment/config/#kubernetes-jobmanager-labels>
>>>>> .
>>>>>
>>>>> Is this expected?
>>>>>
>>>>> Thank you!
>>>>>
>>>>> -Andrew Otto
>>>>>  Wikimedia Foundation
>>>>>
>>>>>
>>>>>
>>>>>
>>>>>
>>>>>


Flink Kubernetes Operator podTemplate and 'app' pod label bug?

2023-01-19 Thread Andrew Otto
Hello!

I'm seeing an unexpected label value assignment happening, and I'm not sure
how it's happening.  It is possible it is in my own helm charts and
templates somewhere, but I'm not seeing it, so I'm beginning to think this
is happening in the FlinkDeployment CRD in the operator code somewhere.

I'm using FlinkDeployment podTemplate
<https://nightlies.apache.org/flink/flink-kubernetes-operator-docs-main/docs/custom-resource/pod-template/>
to add an 'app' label:

 podTemplate:
apiVersion: v1
kind: Pod
metadata:
  labels:
app: flink-app
release: flink-example
...

I also have this app label set in the FlinkDeployment labels:

kind: FlinkDeployment
metadata:
  name: flink-app-flink-example
  labels:
app: flink-app
chart: flink-app-0.1.1
release: flink-example

Since I've set app: flink-app in the podTemplate, I would expect all pods
to get this label.  The FlinkDeployment resource has this label value as
expected.  However, I see that in the pods, as well as the Deployment that
are created by FlinkDeployment:

*$ kubectl -n flink-app0 describe deployments flink-app-flink-example*
...
Name:   flink-app-flink-example
Namespace:  flink-app0
CreationTimestamp:  Thu, 19 Jan 2023 12:42:05 -0500
Labels: app=flink-app-flink-example
component=jobmanager
...

Pod Template:
  Labels:   app=flink-app-flink-example
component=jobmanager
release=flink-example
...


*$ kubectl -n flink-app0 describe pod
flink-app-flink-example-d974cb595-788ch*
...
Labels:   app=flink-app-flink-example
  component=jobmanager
  pod-template-hash=d974cb595
  release=flink-example
...


I'd expect the app label to be 'flink-app' for at least the Deployment
PodTemplate and the Pod, if not the Deployment itself too.

Something is overriding the app label in podTemplate, and I don't think
it's my chart or installation.  I looked in flink-kubernetes-operator code
and I didn't find where this was happening either.  I am not setting e.g.
kubernetes.jobmanager.labels
<https://nightlies.apache.org/flink/flink-docs-release-1.15/docs/deployment/config/#kubernetes-jobmanager-labels>
.

Is this expected?

Thank you!

-Andrew Otto
 Wikimedia Foundation


Re: What is the flink-kubernetes-operator webhook for?

2022-12-09 Thread Andrew Otto
Okay, thank you both.  We will disable webhook creation unless we end up
needing it.



On Fri, Dec 9, 2022 at 9:39 AM Gyula Fóra  wrote:

> To add to what Matyas said:
>
> Validation in itself is a mandatory step for every spec change that is
> submitted to guard against broken configs (things like negative parallelism
> etc).
>
> But validation can happen in 2 places. It can be done through the webhook,
> which would result in upfront rejection of the spec on validation error.
>
> Or it can happen during regular processing/reconciliation process in which
> case errors are recorded in the status.
>
> The webhook is a nice way to get validation errors immediately, but as you
> see, it's not necessary, as validation would happen anyway.
>
> Gyula
>
> On Fri, 9 Dec 2022 at 09:21, Őrhidi Mátyás 
> wrote:
>
>> Hi Otto,
>>
>> webhooks in general are optional components of the k8s operator pattern.
>> Mostly used for validation, sometimes for changing custom resources and
>> handling multiple versions, etc. It's an optional component in the Flink
>> Kubernetes Operator too.
>>
>> Regards,
>> Matyas
>>
>> On Fri, Dec 9, 2022 at 5:59 AM Andrew Otto  wrote:
>>
>>> Hello!
>>>
>>> What is the Flink Kubernetes Webhook
>>> <https://nightlies.apache.org/flink/flink-kubernetes-operator-docs-main/docs/concepts/architecture/#admission-control>
>>> for?  I probably don't know just because I don't know k8s that well, but
>>> reading code and other docs didn't particularly enlighten me :)
>>>
>>> It looks like maybe it's for doing some extra validation of k8s API
>>> requests, and allows you to customize how those requests are validated and
>>> processed if you have special requirements to do so.
>>>
>>> Since it can be so easily disabled
>>> <https://nightlies.apache.org/flink/flink-kubernetes-operator-docs-main/docs/try-flink-kubernetes-operator/quick-start/#deploying-the-operator>,
>>> do we need to install it for production use?  FWIW, we will not be using
>>> FlinkSessionJob, so perhaps we don't need it if we don't use that?
>>>
>>> Thanks!
>>> -Andrew Otto
>>>  Wikimedia Foundation
>>>
>>


What is the flink-kubernetes-operator webhook for?

2022-12-09 Thread Andrew Otto
Hello!

What is the Flink Kubernetes Webhook
<https://nightlies.apache.org/flink/flink-kubernetes-operator-docs-main/docs/concepts/architecture/#admission-control>
for?  I probably don't know just because I don't know k8s that well, but
reading code and other docs didn't particularly enlighten me :)

It looks like maybe it's for doing some extra validation of k8s API
requests, and allows you to customize how those requests are validated and
processed if you have special requirements to do so.

Since it can be so easily disabled
<https://nightlies.apache.org/flink/flink-kubernetes-operator-docs-main/docs/try-flink-kubernetes-operator/quick-start/#deploying-the-operator>,
do we need to install it for production use?  FWIW, we will not be using
FlinkSessionJob, so perhaps we don't need it if we don't use that?

Thanks!
-Andrew Otto
 Wikimedia Foundation


Re: flink-kubernetes-operator: image entrypoint misbehaves due to inability to write

2022-12-01 Thread Andrew Otto
Ah, got it.  Thanks!

On Thu, Dec 1, 2022 at 11:34 AM Gyula Fóra  wrote:

> As I also mentioned in the email, this is on our roadmap for the operator
> but we have not implemented it yet because this feature only became
> available as of Flink 1.16.
>
> Ideally in the operator FlinkDeployment spec.flinkConfiguration section
> the user should be able to use env vars if this is added.
>
> Gyula
>
> On Thu, Dec 1, 2022 at 5:18 PM Andrew Otto  wrote:
>
>> > Andrew please see my previous response, that covers the secrets case.
>> > kubernetes.jobmanager.entrypoint.args: -D
>> datadog.secret.conf=$MY_SECRET_ENV
>>
>> This way^?  Ya that makes sense.  It'd be nice if there was a way to get
>> Secrets into the values used for rendering flink-conf.yaml too, so the
>> confs will be all in the same place.
>>
>>
>>
>>
>>
>> On Thu, Dec 1, 2022 at 9:30 AM Gyula Fóra  wrote:
>>
>>> Andrew please see my previous response, that covers the secrets case.
>>>
>>> Gyula
>>>
>>> On Thu, Dec 1, 2022 at 2:54 PM Andrew Otto  wrote:
>>>
>>>> > several failures to write into $FLINK_HOME/conf/.
>>>> I'm working on
>>>> <https://gerrit.wikimedia.org/r/c/operations/docker-images/production-images/+/858356/>
>>>> building Flink and flink-kubernetes-operator images for the Wikimedia
>>>> Foundation, and I found this strange as well.  It makes sense in a docker /
>>>> docker-compose only environment, but in k8s where you have ConfigMap
>>>> responsible for flink-conf.yaml, and (also logs all going to the console,
>>>> not FLINK_HOME/log), I'd prefer if the image was not modified by the
>>>> ENTRYPOINT.
>>>>
>>>> I believe that for flink-kubernetes-operator, the docker-entrypoint.sh
>>>> <https://github.com/apache/flink-docker/blob/master/1.16/scala_2.12-java11-ubuntu/docker-entrypoint.sh>
>>>> provided by flink-docker is not really needed.  It seems to be written more
>>>> for deployments outside of kubernetes.
>>>>  flink-kubernetes-operator never calls the built in subcommands (e.g.
>>>> standalone-job), and always runs in 'pass-through' mode, just execing the
>>>> args passed to it.  At WMF we build
>>>> <https://doc.wikimedia.org/docker-pkg/> our own images, so I'm
>>>> planning on removing all of the stuff in ENTRYPOINTs that mangles the
>>>> image.  Anything that I might want to keep from docker-entrypoint.sh (like 
>>>> enabling
>>>> jemalloc
>>>> <https://gerrit.wikimedia.org/r/c/operations/docker-images/production-images/+/858356/6/images/flink/Dockerfile.template#73>)
>>>> I should be able to do in the Dockerfile at image creation time.
>>>>
>>>> >  want to set an API key as part of the flink-conf.yaml file, but we
>>>> don't want it to be persisted in Kubernetes or in our version control
>>>> I personally am still pretty green at k8s, but would using kubernetes
>>>> Secrets
>>>> <https://kubernetes.io/docs/concepts/configuration/secret/#use-case-secret-visible-to-one-container-in-a-pod>
>>>> work for your use case? I know we use them at WMF, but from a quick glance
>>>> I'm not sure how to combine them in flink-kubernetes-operator's ConfigMap
>>>> that renders flink-conf.yaml, but I feel like there should be a way.
>>>>
>>>>
>>>>
>>>>
>>>> On Wed, Nov 30, 2022 at 4:59 PM Gyula Fóra 
>>>> wrote:
>>>>
>>>>> Hi Lucas!
>>>>>
>>>>> The Flink kubernetes integration itself is responsible for mounting
>>>>> the configmap and overwriting the entrypoint not the operator. Therefore
>>>>> this is not something we can easily change from the operator side. However
>>>>> I think we are looking at the problem from the wrong side and there may be
>>>>> a solution already :)
>>>>>
>>>>> Ideally what you want is ENV replacement in Flink configuration. This
>>>>> is not something that the Flink community has added yet unfortunately but
>>>>> we have it on our radar for the operator at least (
>>>>> https://issues.apache.org/jira/browse/FLINK-27491). It will probably
>>>>> be added in the next 1.4.0 version.
>>>>>
>>>>> This will be possible from Flink 1.16 which introduced a small feature
>>>>> that allows us to inject parameter

Re: flink-kubernetes-operator: image entrypoint misbehaves due to inability to write

2022-12-01 Thread Andrew Otto
> Andrew please see my previous response, that covers the secrets case.
> kubernetes.jobmanager.entrypoint.args: -D
datadog.secret.conf=$MY_SECRET_ENV

This way^?  Ya that makes sense.  It'd be nice if there was a way to get
Secrets into the values used for rendering flink-conf.yaml too, so the
confs will be all in the same place.





On Thu, Dec 1, 2022 at 9:30 AM Gyula Fóra  wrote:

> Andrew please see my previous response, that covers the secrets case.
>
> Gyula
>
> On Thu, Dec 1, 2022 at 2:54 PM Andrew Otto  wrote:
>
>> > several failures to write into $FLINK_HOME/conf/.
>> I'm working on
>> <https://gerrit.wikimedia.org/r/c/operations/docker-images/production-images/+/858356/>
>> building Flink and flink-kubernetes-operator images for the Wikimedia
>> Foundation, and I found this strange as well.  It makes sense in a docker /
>> docker-compose only environment, but in k8s where you have ConfigMap
>> responsible for flink-conf.yaml, and (also logs all going to the console,
>> not FLINK_HOME/log), I'd prefer if the image was not modified by the
>> ENTRYPOINT.
>>
>> I believe that for flink-kubernetes-operator, the docker-entrypoint.sh
>> <https://github.com/apache/flink-docker/blob/master/1.16/scala_2.12-java11-ubuntu/docker-entrypoint.sh>
>> provided by flink-docker is not really needed.  It seems to be written more
>> for deployments outside of kubernetes.
>>  flink-kubernetes-operator never calls the built in subcommands (e.g.
>> standalone-job), and always runs in 'pass-through' mode, just execing the
>> args passed to it.  At WMF we build
>> <https://doc.wikimedia.org/docker-pkg/> our own images, so I'm planning
>> on removing all of the stuff in ENTRYPOINTs that mangles the image.
>> Anything that I might want to keep from docker-entrypoint.sh (like enabling
>> jemalloc
>> <https://gerrit.wikimedia.org/r/c/operations/docker-images/production-images/+/858356/6/images/flink/Dockerfile.template#73>)
>> I should be able to do in the Dockerfile at image creation time.
>>
>> >  want to set an API key as part of the flink-conf.yaml file, but we
>> don't want it to be persisted in Kubernetes or in our version control
>> I personally am still pretty green at k8s, but would using kubernetes
>> Secrets
>> <https://kubernetes.io/docs/concepts/configuration/secret/#use-case-secret-visible-to-one-container-in-a-pod>
>> work for your use case? I know we use them at WMF, but from a quick glance
>> I'm not sure how to combine them in flink-kubernetes-operator's ConfigMap
>> that renders flink-conf.yaml, but I feel like there should be a way.
>>
>>
>>
>>
>> On Wed, Nov 30, 2022 at 4:59 PM Gyula Fóra  wrote:
>>
>>> Hi Lucas!
>>>
>>> The Flink kubernetes integration itself is responsible for mounting the
>>> configmap and overwriting the entrypoint not the operator. Therefore this
>>> is not something we can easily change from the operator side. However I
>>> think we are looking at the problem from the wrong side and there may be a
>>> solution already :)
>>>
>>> Ideally what you want is ENV replacement in Flink configuration. This is
>>> not something that the Flink community has added yet unfortunately but we
>>> have it on our radar for the operator at least (
>>> https://issues.apache.org/jira/browse/FLINK-27491). It will probably be
>>> added in the next 1.4.0 version.
>>>
>>> This will be possible from Flink 1.16 which introduced a small feature
>>> that allows us to inject parameters to the kubernetes entrypoints:
>>> https://issues.apache.org/jira/browse/FLINK-29123
>>>
>>> https://github.com/apache/flink/commit/c37643031dca2e6d4c299c0d704081a8bffece1d
>>>
>>> While it's not implemented in the operator yet, you could try setting
>>> the following config in Flink 1.16.0:
>>> kubernetes.jobmanager.entrypoint.args: -D
>>> datadog.secret.conf=$MY_SECRET_ENV
>>> kubernetes.taskmanager.entrypoint.args: -D
>>> datadog.secret.conf=$MY_SECRET_ENV
>>>
>>> If you use this configuration together with the default native mode in
>>> the operator, it should work I believe.
>>>
>>> Please try and let me know!
>>> Gyula
>>>
>>> On Wed, Nov 30, 2022 at 10:36 PM Lucas Caparelli <
>>> lucas.capare...@gympass.com> wrote:
>>>
>>>> Hello folks,
>>>>
>>>> Not sure if this is the best list for this, sorry if it isn't. I'd
>>>> appreciate some pointers :-)

Re: flink-kubernetes-operator: image entrypoint misbehaves due to inability to write

2022-12-01 Thread Andrew Otto
> several failures to write into $FLINK_HOME/conf/.
I'm working on

building Flink and flink-kubernetes-operator images for the Wikimedia
Foundation, and I found this strange as well.  It makes sense in a docker /
docker-compose-only environment, but in k8s, where a ConfigMap is
responsible for flink-conf.yaml (and logs all go to the console,
not FLINK_HOME/log), I'd prefer that the image not be modified by the
ENTRYPOINT.

I believe that for flink-kubernetes-operator, the docker-entrypoint.sh

provided by flink-docker is not really needed.  It seems to be written more
for deployments outside of kubernetes.
 flink-kubernetes-operator never calls the built in subcommands (e.g.
standalone-job), and always runs in 'pass-through' mode, just execing the
args passed to it.  At WMF we build 
our own images, so I'm planning on removing all of the stuff in ENTRYPOINTs
that mangles the image.  Anything that I might want to keep from
docker-entrypoint.sh (like enabling jemalloc
)
I should be able to do in the Dockerfile at image creation time.

>  want to set an API key as part of the flink-conf.yaml file, but we don't
want it to be persisted in Kubernetes or in our version control
I personally am still pretty green at k8s, but would using kubernetes
Secrets

work for your use case? I know we use them at WMF, but from a quick glance
I'm not sure how to combine them in flink-kubernetes-operator's ConfigMap
that renders flink-conf.yaml, but I feel like there should be a way.




On Wed, Nov 30, 2022 at 4:59 PM Gyula Fóra  wrote:

> Hi Lucas!
>
> The Flink kubernetes integration itself is responsible for mounting the
> configmap and overwriting the entrypoint not the operator. Therefore this
> is not something we can easily change from the operator side. However I
> think we are looking at the problem from the wrong side and there may be a
> solution already :)
>
> Ideally what you want is ENV replacement in Flink configuration. This is
> not something that the Flink community has added yet unfortunately but we
> have it on our radar for the operator at least (
> https://issues.apache.org/jira/browse/FLINK-27491). It will probably be
> added in the next 1.4.0 version.
>
> This will be possible from Flink 1.16 which introduced a small feature
> that allows us to inject parameters to the kubernetes entrypoints:
> https://issues.apache.org/jira/browse/FLINK-29123
>
> https://github.com/apache/flink/commit/c37643031dca2e6d4c299c0d704081a8bffece1d
>
> While it's not implemented in the operator yet, you could try setting the
> following config in Flink 1.16.0:
> kubernetes.jobmanager.entrypoint.args: -D
> datadog.secret.conf=$MY_SECRET_ENV
> kubernetes.taskmanager.entrypoint.args: -D
> datadog.secret.conf=$MY_SECRET_ENV
>
> If you use this configuration together with the default native mode in the
> operator, it should work I believe.
>
> Please try and let me know!
> Gyula
>
> On Wed, Nov 30, 2022 at 10:36 PM Lucas Caparelli <
> lucas.capare...@gympass.com> wrote:
>
>> Hello folks,
>>
>> Not sure if this is the best list for this, sorry if it isn't. I'd
>> appreciate some pointers :-)
>>
>> When using flink-kubernetes-operator [1], docker-entrypoint.sh [2] goes
>> through several failures to write into $FLINK_HOME/conf/. We believe this
>> is due to this volume being mounted from a ConfigMap, which means it's
>> read-only.
>>
>> This has been reported in the past in GCP's operator, but I was unable to
>> find any kind of resolution for it:
>> https://github.com/GoogleCloudPlatform/flink-on-k8s-operator/issues/213
>>
>> In our use case, we want to set an API key as part of the flink-conf.yaml
>> file, but we don't want it to be persisted in Kubernetes or in our version
>> control, since it's sensitive data. This API Key is used by Flink to report
>> metrics to Datadog [3].
>>
>> We have automation in place which allows us to accomplish this by setting
>> environment variables pointing to a path in our secret manager, which only
>> gets injected during runtime. That part is working fine.
>>
>> However, we're trying to inject this secret using the FLINK_PROPERTIES
>> variable, which is appended [4] to the flink-conf.yaml file in the
>> docker-entrypoint script, which fails due to the filesystem where the file
>> is being read-only.
>>
>> We attempted working around this in 2 different ways:
>>
>>   - providing our own .spec.containers[0].command, where we copied over
>> /opt/flink to /tmp/flink and set FLINK_HOME=/tmp/flink. This did not work
>> because the 

Re: Converting ResolvedSchema to JSON and Protobuf Schemas

2022-11-15 Thread Andrew Otto
> meaning that double and integer
I meant to write: "meaning that double and bigint ... "
:)

On Tue, Nov 15, 2022 at 8:54 AM Andrew Otto  wrote:

> > Also thanks for showing me your pattern with the SchemaConversions and
> stuff. Feels pretty clean and worked like a charm :)
> Glad to hear it, that is very cool!
>
> > converts number to double always. I wonder, did you make this up?
> Yes, we chose the mapping.  We chose to do number -> double and
> integer -> bigint because both of those are wider than their float/int
> counterparts, meaning that double and integer will work in more cases.  Of
> course, this is not an optimal usage of bits, but at least things won't
> break.
>
> > all kinds of fields like double, float, big decimal… they all get
> mapped to number by my converter
> It is possible to make some non-JSONSchema convention in the JSONSchema to
> map to more specific types.  This is done for example with format:
> date-time in our code, to map from a ISO-8601 string to a timestamp.  I
> just did a quick google to find some example of someone else already doing
> this and found this doc from IBM
> <https://www.ibm.com/docs/en/cics-ts/5.3?topic=mapping-json-schema-c-c> saying
> they use JSONSchema's format to specify a float, like
>
>   type: number
>   format: float
>
> This seems like a pretty good idea to me, and we should probably do this
> at WMF too!  However, it would be a custom convention, and not in the
> JSONSchema spec itself, so when you convert back to a JSONSchema, you'd
> have to codify this convention to do so (and nothing outside of your code
> would really respect it).
>
>
>
>
>
>
> On Tue, Nov 15, 2022 at 4:23 AM Theodor Wübker 
> wrote:
>
>> Yes, you are right. Schemas are not so nice in Json. When implementing
>> and testing my converter from DataType to JsonSchema I noticed that your
>> converter from JsonSchema to DataType converts number to double always. I
>> wonder, did you make this up? Because the table that specifies the
>> mapping
>> <https://nightlies.apache.org/flink/flink-docs-master/docs/connectors/table/formats/json/>
>>  only
>> does it for DataType -> JsonSchema.
>>
>> Its generally unfortunate that json schema only offers so little
>> possibility to specify type information… now when I have a Flink DataType
>> with all kinds of fields like double, float, big decimal… they all get
>> mapped to number by my converter - in return when I use yours they are all
>> mapped to a Flink Datatype double again. So I lose a lot of precision.
>>
>> I guess for my application it would in general be better to use Avro or
>> Protobuf, since they retain a lot more type information when you convert
>> them back and forth…
>> Also thanks for showing me your pattern with the SchemaConversions and
>> stuff. Feels pretty clean and worked like a charm :)
>>
>> -Theo
>>
>>
>> On 10. Nov 2022, at 15:02, Andrew Otto  wrote:
>>
>> >  I find it interesting that the Mapping from DataType to AvroSchema
>> does exist in Flink (see AvroSchemaConverter), but for all the other
>> formats there is no such Mapping,
>> Yah, but I guess for JSON, there isn't a clear 'schema' to be had.  There
>> of course is JSONSchema, but it isn't a real java-y type system; it's just
>> more JSON for which there exist validators.
>>
>>
>>
>> On Thu, Nov 10, 2022 at 2:12 AM Theodor Wübker <
>> theo.wueb...@inside-m2m.de> wrote:
>>
>>> Great, I will have a closer look at what you sent. Your idea seems very
>>> good, it would be a very clean solution to be able to plug in different
>>> SchemaConversions that a (Row) DataType can be mapped to. I will probably
>>> try to implement it like this. I find it interesting that the Mapping from
>>> DataType to AvroSchema does exist in Flink (see AvroSchemaConverter), but
>>> for all the other formats there is no such Mapping. Maybe this would be
>>> something that would interest more people, so I when I am finished perhaps
>>> I can suggest putting the solution into the flink-json and flink-protobuf
>>> packages.
>>>
>>> -Theo
>>>
>>> On 9. Nov 2022, at 21:24, Andrew Otto  wrote:
>>>
>>> Interesting, yeah I think you'll have to implement code to recurse
>>> through the (Row) DataType and somehow auto generate the JSONSchema you
>>> want.
>>>
>>> We abstracted the conversions from JSONSchema to other type systems in
>>> this JsonSchemaConverter
>>> <https://gerrit.wikimedia.org/r/plugins/gi

Re: Converting ResolvedSchema to JSON and Protobuf Schemas

2022-11-15 Thread Andrew Otto
> Also thanks for showing me your pattern with the SchemaConversions and
stuff. Feels pretty clean and worked like a charm :)
Glad to hear it, that is very cool!

> converts number to double always. I wonder, did you make this up?
Yes, we chose the mapping.  We chose to do number -> double and integer
-> bigint because both of those are wider than their float/int
counterparts, meaning that double and integer will work in more cases.  Of
course, this is not an optimal usage of bits, but at least things won't
break.

> all kinds of fields like double, float, big decimal… they all get mapped
to number by my converter
It is possible to make some non-JSONSchema convention in the JSONSchema to
map to more specific types.  This is done for example with format:
date-time in our code, to map from a ISO-8601 string to a timestamp.  I
just did a quick google to find some example of someone else already doing
this and found this doc from IBM
<https://www.ibm.com/docs/en/cics-ts/5.3?topic=mapping-json-schema-c-c> saying
they use JSONSchema's format to specify a float, like

  type: number
  format: float

This seems like a pretty good idea to me, and we should probably do this at
WMF too!  However, it would be a custom convention, and not in the
JSONSchema spec itself, so when you convert back to a JSONSchema, you'd
have to codify this convention to do so (and nothing outside of your code
would really respect it).
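
In converter code, the convention could look something like this (just a
sketch using PyFlink's DataTypes, not anything we've actually shipped):

```python
from pyflink.table import DataTypes

def json_number_to_datatype(json_schema: dict):
    # Prefer the (custom) `format` hint over the bare JSON `number` type.
    if json_schema.get("format") == "float":
        return DataTypes.FLOAT()
    return DataTypes.DOUBLE()
```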






On Tue, Nov 15, 2022 at 4:23 AM Theodor Wübker 
wrote:

> Yes, you are right. Schemas are not so nice in Json. When implementing and
> testing my converter from DataType to JsonSchema I noticed that your
> converter from JsonSchema to DataType converts number to double always. I
> wonder, did you make this up? Because the table that specifies the mapping
> <https://nightlies.apache.org/flink/flink-docs-master/docs/connectors/table/formats/json/>
>  only
> does it for DataType -> JsonSchema.
>
> Its generally unfortunate that json schema only offers so little
> possibility to specify type information… now when I have a Flink DataType
> with all kinds of fields like double, float, big decimal… they all get
> mapped to number by my converter - in return when I use yours they are all
> mapped to a Flink Datatype double again. So I lose a lot of precision.
>
> I guess for my application it would in general be better to use Avro or
> Protobuf, since they retain a lot more type information when you convert
> them back and forth…
> Also thanks for showing me your pattern with the SchemaConversions and
> stuff. Feels pretty clean and worked like a charm :)
>
> -Theo
>
>
> On 10. Nov 2022, at 15:02, Andrew Otto  wrote:
>
> >  I find it interesting that the Mapping from DataType to AvroSchema
> does exist in Flink (see AvroSchemaConverter), but for all the other
> formats there is no such Mapping,
> Yah, but I guess for JSON, there isn't a clear 'schema' to be had.  There
> of course is JSONSchema, but it isn't a real java-y type system; it's just
> more JSON for which there exist validators.
>
>
>
> On Thu, Nov 10, 2022 at 2:12 AM Theodor Wübker 
> wrote:
>
>> Great, I will have a closer look at what you sent. Your idea seems very
>> good, it would be a very clean solution to be able to plug in different
>> SchemaConversions that a (Row) DataType can be mapped to. I will probably
>> try to implement it like this. I find it interesting that the Mapping from
>> DataType to AvroSchema does exist in Flink (see AvroSchemaConverter), but
>> for all the other formats there is no such Mapping. Maybe this would be
>> something that would interest more people, so I when I am finished perhaps
>> I can suggest putting the solution into the flink-json and flink-protobuf
>> packages.
>>
>> -Theo
>>
>> On 9. Nov 2022, at 21:24, Andrew Otto  wrote:
>>
>> Interesting, yeah I think you'll have to implement code to recurse
>> through the (Row) DataType and somehow auto generate the JSONSchema you
>> want.
>>
>> We abstracted the conversions from JSONSchema to other type systems in
>> this JsonSchemaConverter
>> <https://gerrit.wikimedia.org/r/plugins/gitiles/wikimedia-event-utilities/+/refs/heads/master/eventutilities/src/main/java/org/wikimedia/eventutilities/core/event/types/JsonSchemaConverter.java>.
>> There's nothing special going on here, I've seen versions of this schema
>> conversion code over and over again in different frameworks. This one just
>> allows us to plug in a SchemaConversions
>> <https://gerrit.wikimedia.org/r/plugins/gitiles/wikimedia-event-utilities/+/refs/heads/master/eventutilities/src/main/java/org/wikimedia/eventutilities/core/event/types/SchemaConversions.java>
>>  implementation

Re: Converting ResolvedSchema to JSON and Protobuf Schemas

2022-11-09 Thread Andrew Otto
Interesting, yeah I think you'll have to implement code to recurse through
the (Row) DataType and somehow auto generate the JSONSchema you want.

We abstracted the conversions from JSONSchema to other type systems in this
JsonSchemaConverter
<https://gerrit.wikimedia.org/r/plugins/gitiles/wikimedia-event-utilities/+/refs/heads/master/eventutilities/src/main/java/org/wikimedia/eventutilities/core/event/types/JsonSchemaConverter.java>.
There's nothing special going on here, I've seen versions of this schema
conversion code over and over again in different frameworks. This one just
allows us to plug in a SchemaConversions
<https://gerrit.wikimedia.org/r/plugins/gitiles/wikimedia-event-utilities/+/refs/heads/master/eventutilities/src/main/java/org/wikimedia/eventutilities/core/event/types/SchemaConversions.java>
implementation
to provide the mappings to the output type system (like the Flink DataType
mappings
<https://gerrit.wikimedia.org/r/plugins/gitiles/wikimedia-event-utilities/+/refs/heads/master/eventutilities-flink/src/main/java/org/wikimedia/eventutilities/flink/formats/json/DataTypeSchemaConversions.java>
I
linked to before), rather than hardcoding the output types.

If I were trying to do what you are doing (in our codebase)...I'd create a
Flink DataTypeConverter that iterated through a (Row) DataType and a
SchemaConversions implementation that mapped to the JsonNode that
represented the JSONSchema.  (If not using Jackson... then you could use
a Java JSON object other than JsonNode.)
You could also make a SchemaConversions (with whatever
Protobuf class to use...I'm not familiar with Protobuf) and then use the
same DataTypeConverter to convert to ProtobufSchema.   AND THEN...I'd
wonder if the input schema recursion code itself could be abstracted too so
that it would work for either JsonSchema OR DataType OR whatever but anyway
that is probably too crazy and too much for what you are doing...but it
would be cool! :p





On Wed, Nov 9, 2022 at 9:52 AM Theodor Wübker 
wrote:

> I want to register the result-schema in a schema registry, as I am pushing
> the result-data to a Kafka topic. The result-schema is not known at
> compile-time, so I need to find a way to compute it at runtime from the
> resulting Flink Schema.
>
> -Theo
>
> (resent - again sorry, I forgot to add the others in the cc)
>
> On 9. Nov 2022, at 14:59, Andrew Otto  wrote:
>
> >  I want to convert the schema of a Flink table to both Protobuf *schema* and
> JSON *schema*
> Oh, you want to convert from Flink Schema TO JSONSchema?  Interesting.
> That would indeed be something that is not usually done.  Just curious, why
> do you want to do this?
>
> On Wed, Nov 9, 2022 at 8:46 AM Andrew Otto  wrote:
>
>> Hello!
>>
>> I see you are talking about JSONSchema, not just JSON itself.
>>
>> We're trying to do a similar thing at Wikimedia and have developed some
>> tooling around this.
>>
>> JsonSchemaFlinkConverter
>> <https://gerrit.wikimedia.org/r/plugins/gitiles/wikimedia-event-utilities/+/refs/heads/master/eventutilities-flink/src/main/java/org/wikimedia/eventutilities/flink/formats/json/JsonSchemaFlinkConverter.java>
>> has some logic to convert from JSONSchema Jackson ObjectNodes to Flink
>> Table DataType or Table SchemaBuilder, or Flink DataStream
>> TypeInformation[Row].  Some of the conversions from JSONSchema to Flink
>> type are opinionated.  You can see the mappings here
>> <https://gerrit.wikimedia.org/r/plugins/gitiles/wikimedia-event-utilities/+/refs/heads/master/eventutilities-flink/src/main/java/org/wikimedia/eventutilities/flink/formats/json/DataTypeSchemaConversions.java>
>> .
>>
>>
>>
>>
>>
>>
>>
>> On Wed, Nov 9, 2022 at 2:33 AM Theodor Wübker 
>> wrote:
>>
>>> Thanks for your reply Yaroslav! The way I do it with Avro seems similar
>>> to what you pointed out:
>>>
>>> ResolvedSchema resultSchema = resultTable.getResolvedSchema();
>>> DataType type = resultSchema.toSinkRowDataType();
>>> org.apache.avro.Schema converted = 
>>> AvroSchemaConverter.convertToSchema(type.getLogicalType());
>>>
>>> I mentioned the ResolvedSchema because it is my starting point after the
>>> SQL operation. It seemed to me that I can not retrieve something that
>>> contains more schema information from the table so I got myself this. About
>>> your other answers: It seems the classes you mentioned can be used to
>>> serialize actual Data? However this is not quite what I want to do.
>>> Essentially I want to convert the schema of a Flink table to both
>>> Protobuf *schema* and JSON *schema* (for Avro as you can see I have it
>>> already). It seems odd

Re: Converting ResolvedSchema to JSON and Protobuf Schemas

2022-11-09 Thread Andrew Otto
>  I want to convert the schema of a Flink table to both Protobuf *schema* and
JSON *schema*
Oh, you want to convert from Flink Schema TO JSONSchema?  Interesting.
That would indeed be something that is not usually done.  Just curious, why
do you want to do this?

On Wed, Nov 9, 2022 at 8:46 AM Andrew Otto  wrote:

> Hello!
>
> I see you are talking about JSONSchema, not just JSON itself.
>
> We're trying to do a similar thing at Wikimedia and have developed some
> tooling around this.
>
> JsonSchemaFlinkConverter
> <https://gerrit.wikimedia.org/r/plugins/gitiles/wikimedia-event-utilities/+/refs/heads/master/eventutilities-flink/src/main/java/org/wikimedia/eventutilities/flink/formats/json/JsonSchemaFlinkConverter.java>
> has some logic to convert from JSONSchema Jackson ObjectNodes to Flink
> Table DataType or Table SchemaBuilder, or Flink DataStream
> TypeInformation[Row].  Some of the conversions from JSONSchema to Flink
> type are opinionated.  You can see the mappings here
> <https://gerrit.wikimedia.org/r/plugins/gitiles/wikimedia-event-utilities/+/refs/heads/master/eventutilities-flink/src/main/java/org/wikimedia/eventutilities/flink/formats/json/DataTypeSchemaConversions.java>
> .
>
>
>
>
>
>
>
> On Wed, Nov 9, 2022 at 2:33 AM Theodor Wübker 
> wrote:
>
>> Thanks for your reply Yaroslav! The way I do it with Avro seems similar
>> to what you pointed out:
>>
>> ResolvedSchema resultSchema = resultTable.getResolvedSchema();
>> DataType type = resultSchema.toSinkRowDataType();
>> org.apache.avro.Schema converted = 
>> AvroSchemaConverter.convertToSchema(type.getLogicalType());
>>
>> I mentioned the ResolvedSchema because it is my starting point after the
>> SQL operation. It seemed to me that I can not retrieve something that
>> contains more schema information from the table so I got myself this. About
>> your other answers: It seems the classes you mentioned can be used to
>> serialize actual Data? However this is not quite what I want to do.
>> Essentially I want to convert the schema of a Flink table to both
>> Protobuf *schema* and JSON *schema* (for Avro as you can see I have it
>> already). It seems odd that this is not easily possible, because converting
>> from a JSON schema to a Schema of Flink is possible using the
>> JsonRowSchemaConverter. However the other way is not implemented it seems.
>> This is how I got a Table Schema (that I can use in a table descriptor)
>> from a JSON schema:
>>
>> TypeInformation type = JsonRowSchemaConverter.convert(json);
>> DataType row = TableSchema.fromTypeInfo(type).toPhysicalRowDataType();
>> Schema schema = Schema.newBuilder().fromRowDataType(row).build();
>>
>> Sidenote: I use deprecated methods here, so if there is a better approach
>> please let me know! But it shows that in Flink it's easily possible to
>> create a Schema for a TableDescriptor from a JSON Schema - the other way is
>> just not so trivial it seems. And for Protobuf so far I don’t have any
>> solutions, not even creating a Flink Schema from a Protobuf Schema - not to
>> mention the other way around.
>>
>> -Theo
>>
>> (resent because I accidentally only responded to you, not the Mailing
>> list - sorry)
>>
>>


Re: Converting ResolvedSchema to JSON and Protobuf Schemas

2022-11-09 Thread Andrew Otto
Hello!

I see you are talking about JSONSchema, not just JSON itself.

We're trying to do a similar thing at Wikimedia and have developed some
tooling around this.

JsonSchemaFlinkConverter has some logic to convert from JSONSchema Jackson
ObjectNodes to Flink Table DataType or Table SchemaBuilder, or Flink
DataStream TypeInformation[Row].  Some of the conversions from JSONSchema to
Flink type are opinionated.  You can see the mappings here.







On Wed, Nov 9, 2022 at 2:33 AM Theodor Wübker 
wrote:

> Thanks for your reply Yaroslav! The way I do it with Avro seems similar to
> what you pointed out:
>
> ResolvedSchema resultSchema = resultTable.getResolvedSchema();
> DataType type = resultSchema.toSinkRowDataType();
> org.apache.avro.Schema converted = 
> AvroSchemaConverter.convertToSchema(type.getLogicalType());
>
> I mentioned the ResolvedSchema because it is my starting point after the
> SQL operation. It seemed to me that I can not retrieve something that
> contains more schema information from the table so I got myself this. About
> your other answers: It seems the classes you mentioned can be used to
> serialize actual Data? However this is not quite what I want to do.
> Essentially I want to convert the schema of a Flink table to both Protobuf
> *schema* and JSON *schema* (for Avro as you can see I have it already).
> It seems odd that this is not easily possible, because converting from a
> JSON schema to a Schema of Flink is possible using the
> JsonRowSchemaConverter. However the other way is not implemented it seems.
> This is how I got a Table Schema (that I can use in a table descriptor)
> from a JSON schema:
>
> TypeInformation type = JsonRowSchemaConverter.convert(json);
> DataType row = TableSchema.fromTypeInfo(type).toPhysicalRowDataType();
> Schema schema = Schema.newBuilder().fromRowDataType(row).build();
>
> Sidenote: I use deprecated methods here, so if there is a better approach
> please let me know! But it shows that in Flink it's easily possible to
> create a Schema for a TableDescriptor from a JSON Schema - the other way is
> just not so trivial it seems. And for Protobuf so far I don’t have any
> solutions, not even creating a Flink Schema from a Protobuf Schema - not to
> mention the other way around.
>
> -Theo
>
> (resent because I accidentally only responded to you, not the Mailing list
> - sorry)
>
>


Re: KafkaSource, consumer assignment, and Kafka ‘stretch’ clusters for multi-DC streaming apps

2022-10-05 Thread Andrew Otto
(Ah, note that I am considering very simple streaming apps here, e.g. event
enrichment apps.  No windowing or complex state.  The only state is the
Kafka offsets, which I suppose would also have to be managed from Kafka,
not from Flink state.)



On Wed, Oct 5, 2022 at 9:54 AM Andrew Otto  wrote:

> Hi all,
>
> *tl;dr: Would it be possible to make a KafkaSource that uses Kafka's built
> in consumer assignment for Flink tasks?*
>
> At the Wikimedia Foundation we are evaluating
> <https://phabricator.wikimedia.org/T307944> whether we can use a Kafka
> 'stretch' cluster to simplify the multi-datacenter deployment architecture
> of streaming applications.
>
> A Kafka stretch cluster is one in which the brokers span multiple
> datacenters, relying on the usual Kafka broker replication for multi DC
> replication (rather than something like Kafka MirrorMaker).  This is
> feasible with Kafka today mostly because of follower fetching
> <https://cwiki.apache.org/confluence/display/KAFKA/KIP-392%3A+Allow+consumers+to+fetch+from+closest+replica>
> support, allowing consumers to be assigned to consume from partitions that
> are 'closest' to them, e.g. in the same 'rack' (or DC :) ).
>
> Having a single Kafka cluster makes datacenter failover for streaming
> applications a little bit simpler, as there is only one set of offsets to
> use when saving state.  We can run a streaming app in active/passive mode.
> This would allow us to stop the app in one datacenter, and then start it up
> in another, using the same state snapshot and same Kafka cluster.
>
> But, this got me wondering...would it be possible to run a streaming app
> in an active/active mode, where in normal operation, half of the work was
> being done in each DC, and in failover, all of the work would automatically
> failover to the online DC.
>
> I don't think running a single Flink application cross DC would work well;
> there's too much inter-node traffic happening, and the Flink tasks don't
> have any DC awareness.
>
> But would it be possible to run two separate streaming applications in
> each DC, but in the *same Kafka consumer group*? I believe that, if the
> streaming app was using Kafka's usual consumer assignment and rebalancing
> protocol, it would.  Kafka would just see clients connecting from either DC
> in the same consumer group, and assign each consumer an equal number of
> partitions to consume, resulting in equal partition balancing in DCs.  If
> we shut down one of the streaming apps, Kafka would automatically rebalance
> the consumers in the consumer group, assigning all of the work to the
> remaining streaming app in the other DC.
>
> I got excited about this possibility, only to learn that Flink's
> KafkaSource does not use Kafka for consumer assignment.  I think I
> understand why it does this: the Source API can do a lot more than Kafka,
> so having some kind of state management (offsets) and task assignment
> (Kafka consumer balance protocol) outside of the usual Flink Source would
> be pretty weird.  Implementing offset and task assignment inside of the
> KafkaSource allows it to work like any other Source implementation.
>
> However, this active/active multi DC streaming app idea seems pretty
> compelling, as it would greatly reduce operator/SRE overhead.  It seems to
> me that any Kafka streaming app that did use Kafka's built in consumer
> assignment protocol (like Kafka Streams) would be deployable in this way.
> But in Flink this is not possible because of the way it assigns tasks.
>
> I'm writing this email to see what others think about this, and wonder if
> it might be possible to implement a KafkaSource that assigned tasks using
> Kafka's usual consumer assignment protocol.  Hopefully someone more
> knowledgeable about Sources and TaskSplits, etc. could advise here.
>
> Thank you!
>
> - Andrew Otto
>   Wikimedia Foundation
>
>
>


KafkaSource, consumer assignment, and Kafka ‘stretch’ clusters for multi-DC streaming apps

2022-10-05 Thread Andrew Otto
Hi all,

*tl;dr: Would it be possible to make a KafkaSource that uses Kafka's built
in consumer assignment for Flink tasks?*

At the Wikimedia Foundation we are evaluating
<https://phabricator.wikimedia.org/T307944> whether we can use a Kafka
'stretch' cluster to simplify the multi-datacenter deployment architecture
of streaming applications.

A Kafka stretch cluster is one in which the brokers span multiple
datacenters, relying on the usual Kafka broker replication for multi DC
replication (rather than something like Kafka MirrorMaker).  This is
feasible with Kafka today mostly because of follower fetching
<https://cwiki.apache.org/confluence/display/KAFKA/KIP-392%3A+Allow+consumers+to+fetch+from+closest+replica>
support, allowing consumers to be assigned to consume from partitions that
are 'closest' to them, e.g. in the same 'rack' (or DC :) ).
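
For illustration: on the client side, follower fetching ends up being just a
consumer property.  A rough sketch with Flink's KafkaSource, assuming brokers
are on Kafka >= 2.4 with replica.selector.class set to
org.apache.kafka.common.replica.RackAwareReplicaSelector and broker.rack set
per DC; the names below are made up:

import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.connector.kafka.source.KafkaSource;

// client.rack should match the broker.rack of the brokers in the same DC,
// so this consumer is served from the closest replicas.
KafkaSource<String> source = KafkaSource.<String>builder()
    .setBootstrapServers("kafka-stretch.example.org:9092")
    .setTopics("my_topic")
    .setGroupId("my_enrichment_app")
    .setValueOnlyDeserializer(new SimpleStringSchema())
    .setProperty("client.rack", "eqiad")  // this consumer's datacenter
    .build();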

Having a single Kafka cluster makes datacenter failover for streaming
applications a little bit simpler, as there is only one set of offsets to
use when saving state.  We can run a streaming app in active/passive mode.
This would allow us to stop the app in one datacenter, and then start it up
in another, using the same state snapshot and same Kafka cluster.

But, this got me wondering...would it be possible to run a streaming app in
an active/active mode, where in normal operation, half of the work was
being done in each DC, and in failover, all of the work would automatically
failover to the online DC.

I don't think running a single Flink application cross DC would work well;
there's too much inter-node traffic happening, and the Flink tasks don't
have any DC awareness.

But would it be possible to run two separate streaming applications in each
DC, but in the *same Kafka consumer group*? I believe that, if the
streaming app was using Kafka's usual consumer assignment and rebalancing
protocol, it would.  Kafka would just see clients connecting from either DC
in the same consumer group, and assign each consumer an equal number of
partitions to consume, resulting in equal partition balancing in DCs.  If
we shut down one of the streaming apps, Kafka would automatically rebalance
the consumers in the consumer group, assigning all of the work to the
remaining streaming app in the other DC.
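
To be concrete about what I mean by Kafka's usual consumer assignment: a plain
KafkaConsumer only declares a group.id and subscribes, and the brokers' group
coordinator decides which member gets which partitions (rough sketch below,
names made up).  Flink's KafkaSource instead enumerates partitions itself and
hands them to readers as splits, so it never takes part in that rebalancing.

import java.time.Duration;
import java.util.List;
import java.util.Properties;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;

public class GroupManagedConsumer {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put("bootstrap.servers", "kafka-stretch.example.org:9092");
        props.put("group.id", "enrichment-app");  // same group.id in both DCs
        props.put("key.deserializer",
            "org.apache.kafka.common.serialization.StringDeserializer");
        props.put("value.deserializer",
            "org.apache.kafka.common.serialization.StringDeserializer");
        props.put("client.rack", "codfw");  // fetch from nearby replicas

        try (KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props)) {
            // The broker-side group coordinator assigns partitions to whichever
            // members are alive, regardless of which DC they run in.
            consumer.subscribe(List.of("my_topic"));
            while (true) {
                ConsumerRecords<String, String> records =
                    consumer.poll(Duration.ofSeconds(1));
                records.forEach(r -> System.out.println(r.value()));
            }
        }
    }
}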

I got excited about this possibility, only to learn that Flink's
KafkaSource does not use Kafka for consumer assignment.  I think I
understand why it does this: the Source API can do a lot more than Kafka,
so having some kind of state management (offsets) and task assignment
(Kafka consumer balance protocol) outside of the usual Flink Source would
be pretty weird.  Implementing offset and task assignment inside of the
KafkaSource allows it to work like any other Source implementation.

However, this active/active multi DC streaming app idea seems pretty
compelling, as it would greatly reduce operator/SRE overhead.  It seems to
me that any Kafka streaming app that did use Kafka's built in consumer
assignment protocol (like Kafka Streams) would be deployable in this way.
But in Flink this is not possible because of the way it assigns tasks.

I'm writing this email to see what others think about this, and wonder if
it might be possible to implement a KafkaSource that assigned tasks using
Kafka's usual consumer assignment protocol.  Hopefully someone more
knowledgeable about Sources and TaskSplits, etc. could advise here.

Thank you!

- Andrew Otto
  Wikimedia Foundation


Re: Re: [DISCUSS] Contribution of Multi Cluster Kafka Source

2022-06-27 Thread Andrew Otto
This sounds very useful!  Another potential use case:

- Consuming from multiple Kafka clusters in different datacenters/regions.

I'm not sure if we would ultimately want to do this, but having it as an
option when you need events from multiple Kafka clusters to get the full
history of changes (instead of relying on MirrorMaker) could be nice.






On Mon, Jun 27, 2022 at 1:02 PM Ryan van Huuksloot <
ryan.vanhuuksl...@shopify.com> wrote:

> Hi Mason,
>
> Thanks for starting this discussion! The proposed Source sounds awesome
> and we would be interested in taking a look at the source code and
> evaluating our use cases. We can provide information and review on a
> potential FLIP based on other use cases.
>
> Do you have a fork/branch that you are working with that is public? Could
> you attach that so we can start looking at it?
>
> Let us know if you need anything from us to help move this forward.
>
> Thanks!
> Ryan
>
> On 2022/06/27 03:08:13 Qingsheng Ren wrote:
> > Hi Mason,
> >
> > It sounds like an exciting enhancement to the Kafka source and will
> benefit a lot of users I believe.
> >
> > Would you prefer to reuse the existing flink-connector-kafka module or
> create a new one for the new multi-cluster feature? Personally I prefer the
> former one because users won’t need to introduce another dependency module
> to their projects in order to use the feature.
> >
> > Thanks for the effort on this and looking forward to your FLIP!
> >
> > Best,
> > Qingsheng
> >
> > > On Jun 24, 2022, at 09:43, Mason Chen  wrote:
> > >
> > > Hi community,
> > >
> > > We have been working on a Multi Cluster Kafka Source and are looking to
> > > contribute it upstream. I've given a talk about the features and
> design at
> > > a Flink meetup: https://youtu.be/H1SYOuLcUTI.
> > >
> > > The main features that it provides is:
> > > 1. Reading multiple Kafka clusters within a single source.
> > > 2. Adjusting the clusters and topics the source consumes from
> dynamically,
> > > without Flink job restart.
> > >
> > > Some of the challenging use cases that these features solve are:
> > > 1. Transparent Kafka cluster migration without Flink job restart.
> > > 2. Transparent Kafka topic migration without Flink job restart.
> > > 3. Direct integration with Hybrid Source.
> > >
> > > In addition, this is designed with wrapping and managing the existing
> > > KafkaSource components to enable these features, so it can continue to
> > > benefit from KafkaSource improvements and bug fixes. It can be
> considered
> > > as a form of a composite source.
> > >
> > > I think the contribution of this source could benefit a lot of users
> who
> > > have asked in the mailing list about Flink handling Kafka migrations
> and
> > > removing topics in the past. I would love to hear and address your
> thoughts
> > > and feedback, and if possible drive a FLIP!
> > >
> > > Best,
> > > Mason
> >
> >
>


Re: How to use connectors in PyFlink 1.15.0 when not defined in Python API?

2022-06-24 Thread Andrew Otto
I've had success using Java classes in PyFlink via pyflink.java_gateway.
Something like:

from pyflink.java_gateway import get_gateway
jvm = get_gateway()

# then perhaps something like:
FlinkKinesisConsumer = jvm.org.apache.flink.streaming.connectors.kinesis.FlinkKinesisConsumer

There also seems to be a nice java_utils.py with helpers that may, uh, help.

Not sure if this will work; you might need to use the Python environment's
underlying Java StreamTableEnvironment to do it?  Here's an example of how the
python StreamTableEnvironment calls out to the Java one.

BTW: I'm not an authority, nor have I really tried this, so take this
advice with a grain of salt!  :)

Good luck!






On Fri, Jun 24, 2022 at 9:06 AM John Tipper  wrote:

> Hi all,
>
> There are a number of connectors which do not appear to be in the Python
> API v1.15.0, e.g. Kinesis. I can see that it's possible to use these
> connectors by using the Table API:
>
> CREATE TABLE my_table (...)
> WITH ('connector' = 'kinesis' ...)
>
>
> I guess if you wanted the stream as a DataStream you'd I guess you'd
> create the Table and then convert into a DataStream?
>
> Is there a way of directly instantiating these connectors in PyFlink
> without needed to use SQL like this (and without having to wait until
> v1.16)? e.g. the Java API looks like this:
>
> StreamExecutionEnvironment env = 
> StreamExecutionEnvironment.getExecutionEnvironment();
> DataStream kinesis = env.addSource(new FlinkKinesisConsumer<>(
> "kinesis_stream_name", new SimpleStringSchema(), consumerConfig));
>
>
> Many thanks,
>
> John
>


Re: Flink, JSON, and JSONSchemas

2022-06-18 Thread Andrew Otto
> > *1. Is there some easy way to use deserialized JSON in DataStream
without case classes or POJOs?*
> Could you explain what you expected? Do you mean you want to just
register a DataType that is able to bridge the received bytes to the POJO
objects.
No, heh, I don't want to have any POJO objects.  I don't want users to have
to write hardcoded java classes of our canonical JSONSchemas.  I want
someone to be able to use JSON data in Flink that we know conforms to a
JSONSchema with types that map cleanly to Java types (i.e. no random
additionalProperties and $refs) without hardcoding any
duplicate information about that data that can be retrieved via our other
internal API.  (We have an HTTP 'schema registry' for JSONSchemas.).

Row (and RowData) can do this;  I just want to use them easily with JSON in
a DataStream.

> *> 2. How can I use a DeserializationSchema to get a DataStream
or even DataStreamSource in a unit test from either a file or
String[]/byte[] of serialized JSON?*
> For DeserializationSchema, you can refer to the Kafka
connector[2]. I think it should be similar to the
DeserializationSchema.

JsonRowDeserializationSchema is marked as deprecated and recommends using
the Table API instead.  I can do that, but I feel like it is overkill for just
wanting to use DataStream.   I was trying to get away with starting
and ending with the Table API always, where I can easily use DataType and
RowData, but if I do some map transformations on the DataStream to
produce a new stream, I need an explicitly declared TypeInformation that
matches the new stream schema when converting back into the Table API. If I
need to have the output TypeInformation explicitly declared anyway, I
might as well just start with TypeInformation in the first place, and
stay in DataStream world the whole time.
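
For example (a sketch with made-up field names), after a map() the result type
can't be inferred, so I have to re-declare it before handing the stream back
to the Table API:

import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.common.typeinfo.Types;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.table.api.Table;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
import org.apache.flink.types.Row;

public class BackToTable {
    public static Table remapAndBackToTable(
            StreamTableEnvironment tableEnv, DataStream<Row> input) {
        TypeInformation<Row> outType = Types.ROW_NAMED(
            new String[] {"page_id", "domain"},
            Types.LONG, Types.STRING);

        DataStream<Row> mapped = input
            .map(row -> Row.of(row.getField(0), row.getField(1)))
            .returns(outType);  // without this, the lambda's Row type is erased

        return tableEnv.fromDataStream(mapped);
    }
}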


FWIW, I think I've been able to accomplish what I was trying to do in this
patch <https://gerrit.wikimedia.org/r/c/wikimedia-event-utilities/+/806319>.
Still needs some testing, but I've written my own JSONSchema ->
TypeInformation converter, and have copy/pasted and modified Flink's
deprecated JsonRowDeserializationSchema into our code.


Thank you for your responses!
-Andrew Otto
 Wikimedia Foundation



On Fri, Jun 17, 2022 at 12:33 AM Shengkai Fang  wrote:

> Hi.
>
> > *1. Is there some easy way to use deserialized JSON in DataStream
> without case classes or POJOs?*
>
> Could you explain what you expected? Do you mean you want to just register
> a DataType that is able to bridge the received bytes to the POJO objects. I
> am not sure wether the current RAW type[1] in Flink Table is enough for
> you.
>
> *> 2. How can I use a DeserializationSchema to get a DataStream
> or even DataStreamSource in a unit test from either a file or
> String[]/byte[] of serialized JSON?*
>
> For DeserializationSchema, you can refer to the Kafka
> connector[2]. I think it should be similar to the
> DeserializationSchema.
>
> Best,
> Shengkai
>
> [1]
> https://nightlies.apache.org/flink/flink-docs-master/docs/dev/table/types/
> [2]
> https://github.com/apache/flink/blob/master/flink-connectors/flink-connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/table/KafkaDynamicSource.java#L234
>
>
>
> Andrew Otto  于2022年6月17日周五 02:26写道:
>
>> At the Wikimedia Foundation, we use JSON and JSONSchemas for our events
>> in Kafka.  There are hundreds of these schemas and topics in Kafka.  I'd
>> like to provide library level integration between our 'Event Platform' JSON
>> data and Flink.  My main goal:
>>
>> *No case classes or POJOs.  *The JSONSchemas should be enough.
>>
>> I can actually do this pretty easily with the Table API. I can
>> convert from JSONSchema to a DataType, and then create a table with that
>> DataType and format('json').
>>
>> I'd like to be able to do the same for the DataStream API.  From what I
>> can tell, to do this I should be using a Row
>> <https://nightlies.apache.org/flink/flink-docs-master/api/java/org/apache/flink/types/Row.html>
>> as the record type.  I can also convert from JSONSchema to
>> TypeInformation pretty easily, using the Types factory
>> <https://nightlies.apache.org/flink/flink-docs-master/api/java/org/apache/flink/api/common/typeinfo/Types.html>
>> .
>>
>> While I can convert to and from
>> <https://nightlies.apache.org/flink/flink-docs-master/docs/dev/table/data_stream_api/>
>> the Table API to DataStream, it seems directly using DataStream
>> of our JSON could be pretty useful, and would make it possible to use Flink
>> without instantiating a StreamTableEnvironment or requiring a 'table
>> planner'.  Also, to convert back up to the Table API from a
>> DataStream, I need the explicit Typ

Flink, JSON, and JSONSchemas

2022-06-16 Thread Andrew Otto
At the Wikimedia Foundation, we use JSON and JSONSchemas for our events in
Kafka.  There are hundreds of these schemas and topics in Kafka.  I'd like
to provide library level integration between our 'Event Platform' JSON data
and Flink.  My main goal:

*No case classes or POJOs.  *The JSONSchemas should be enough.

I can actually do this pretty easily with the Table API. I can convert from
JSONSchema to a DataType, and then create a table with that DataType and
format('json').

I'd like to be able to do the same for the DataStream API.  From what I can
tell, to do this I should be using a Row as the record type.  I can also
convert from JSONSchema to TypeInformation pretty easily, using the Types
factory.
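
A rough sketch of that conversion, assuming the JSONSchema has already been
parsed into a Jackson JsonNode and only uses types that map cleanly (no $ref,
no free-form additionalProperties):

import com.fasterxml.jackson.databind.JsonNode;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.common.typeinfo.Types;
import org.apache.flink.types.Row;

public class JsonSchemaToTypeInformation {

    // Convert a JSONSchema 'object' node into a named Row TypeInformation.
    public static TypeInformation<Row> convertObject(JsonNode schema) {
        List<String> names = new ArrayList<>();
        List<TypeInformation<?>> types = new ArrayList<>();
        schema.get("properties").fields().forEachRemaining(
            (Map.Entry<String, JsonNode> field) -> {
                names.add(field.getKey());
                types.add(convert(field.getValue()));
            });
        return Types.ROW_NAMED(
            names.toArray(new String[0]),
            types.toArray(new TypeInformation<?>[0]));
    }

    private static TypeInformation<?> convert(JsonNode schema) {
        switch (schema.get("type").asText()) {
            case "string":  return Types.STRING;
            case "boolean": return Types.BOOLEAN;
            case "integer": return Types.LONG;
            case "number":  return Types.DOUBLE;
            case "object":  return convertObject(schema);
            case "array":   return Types.OBJECT_ARRAY(convert(schema.get("items")));
            default:
                throw new IllegalArgumentException("Unsupported JSONSchema: " + schema);
        }
    }
}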

While I can convert back and forth between the Table API and DataStream, it
seems directly using DataStream
of our JSON could be pretty useful, and would make it possible to use Flink
without instantiating a StreamTableEnvironment or requiring a 'table
planner'.  Also, to convert back up to the Table API from a
DataStream, I need the explicit TypeInformation, which I need to
manually construct.

Ah but, JsonRowDeserializationSchema is deprecated. Okay, fine, I can copy it
into my code and modify it for my purposes.  But even if I do, I'm confused
about something else:

DeserializationSchema is not Table API specific (e.g. it can be used as the
value deserializer in KafkaSource).  Row is also not Table API specific
(although I know the main purpose is to bridge Table to DataStream API).
However, it seems that constructing a Source using DeserializationSchema is
not really that common?  KafkaSource uses it, but FileSource and
env.fromElements don't?  I'm trying to write integration tests for this
that use the DataStream API.

*tl;dr questions:*

*1. Is there some easy way to use deserialized JSON in DataStream without
case classes or POJOs?*

*2. How can I use a DeserializationSchema to get a DataStream or
even DataStreamSource in a unit test from either a file or
String[]/byte[] of serialized JSON?*

Thank you!


Re: Flink Shaded dependencies and extending Flink APIs

2022-06-16 Thread Andrew Otto
Hi all thanks for the responses.

> Create a module let's say "wikimedia-event-utilities-shaded"
This actually doesn't help me, as wikimedia-event-utilities is used as an
API by non-Flink stuff too, so I don't want to use the shaded ObjectNode in
the API params.

> Another solution is that you can serialize then deserialize the "different"
ObjectNode
Haha, I thought of this too and then was like...no way, too crazy!
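
For the record, the "too crazy" bridge would look roughly like this; the only
place the shaded and unshaded Jackson types can meet is as serialized JSON,
which is the performance hit Qingsheng mentioned:

import com.fasterxml.jackson.databind.ObjectMapper;
import com.fasterxml.jackson.databind.node.ObjectNode;

public class ShadedJacksonBridge {
    private static final ObjectMapper UNSHADED = new ObjectMapper();
    private static final
        org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.ObjectMapper
            SHADED =
        new org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.ObjectMapper();

    // Shaded -> unshaded: serialize to a String, re-parse with the unshaded mapper.
    public static ObjectNode toUnshaded(
            org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.node.ObjectNode shaded)
            throws Exception {
        return (ObjectNode) UNSHADED.readTree(shaded.toString());
    }

    // Unshaded -> shaded: same trick in the other direction.
    public static org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.node.ObjectNode
            toShaded(ObjectNode unshaded) throws Exception {
        return (org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.node.ObjectNode)
            SHADED.readTree(unshaded.toString());
    }
}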

> Both flink-shaded, any relocation pattern and JsonRowDataSerializationSchema
are Flink internals that users shouldn't use/rely on.
Yeah, in hindsight, I think the right solution is to make my own
SerializationSchema, even if that is mostly copy/pasting the internal Flink
one, rather than extending it.

I have another question around JSON and Flink, but I'll start a new thread
for that.

Thank you!




On Mon, Jun 13, 2022 at 7:17 AM Chesnay Schepler  wrote:

> Can we find a more robust way to support this?
>
> Both flink-shaded, any relocation pattern and
> JsonRowDataSerializationSchema are Flink internals that users shouldn't
> use/rely on.
>
> On 13/06/2022 12:26, Qingsheng Ren wrote:
> > Hi Andrew,
> >
> > This is indeed a tricky case since Flink doesn't provide non-shaded
> > JAR for flink-json. One hacky solution in my mind is like:
> >
> > 1. Create a module let's say "wikimedia-event-utilities-shaded" that
> > relocates Jackson in the same way and uses the same Jackson version as
> > flink-shaded-jackson
> > 2. Deploy the module to a local or remote Maven repository
> > 3. Let your custom format depend on the
> > "wikimedia-event-utilities-shaded" module, then all Jackson
> > dependencies are relocated in the same way.
> >
> > Another solution is that you can serialize then deserialize the
> > "different" ObjectNode to do the conversion but this sacrifices the
> > performance.
> >
> > Hope this could be helpful!
> >
> > Best regards,
> >
> > Qingsheng
> >
> > On Thu, Jun 9, 2022 at 8:29 PM Andrew Otto  wrote:
> >> Hi all,
> >>
> >> I'm working on an integration project trying to write some library code
> that will allow us at the Wikimedia Foundation to use Flink with our 'Event
> Platform'.  Specifically, I'm trying to write a reusable step near the end
> of a pipeline that will ensure our JSON events satisfy some criteria before
> producing them to Kafka.  Details here.
> >>
> >> I'm experimenting with writing my own custom format to do this.  But
> all I really need to do is override JsonRowDataSerializationSchema's
> serialize method and augment and validate the ObjectNode before it is
> serialized to byte[].
> >>
> >> I'm running into an issue where the ObjectNode that is used by Flink
> here is the shaded one:
> org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.node.ObjectNode,
> whereas the WMF code I want to use to augment the ObjectNode is using a
> regular non shaded one.  I can't pass the shaded ObjectNode instance to a
> function that takes a non shaded one, and I can't cast the shaded
> ObjectNode to non shaded either.
> >>
> >> My Q is: is there a way to extend Flink APIs that use shaded
> dependencies?  I suppose I could copy/paste the whole of the "json" format
> code that I need into my project and just make it my own, but this feels
> quite obnoxious.
> >>
> >> Thank you!
> >> -Andrew Otto
> >>   Wikimedia Foundation
> >>
> >>
>
>


Flink Shaded dependencies and extending Flink APIs

2022-06-09 Thread Andrew Otto
Hi all,

I'm working on an integration project trying to write some library code
that will allow us at the Wikimedia Foundation to use Flink with our 'Event
Platform <https://wikitech.wikimedia.org/wiki/Event_Platform>'.
Specifically, I'm trying to write a reusable step near the end of a
pipeline that will ensure our JSON events satisfy some criteria before
producing them to Kafka.  Details here
<https://phabricator.wikimedia.org/T310218>.

I'm experimenting with writing my own custom format
<https://nightlies.apache.org/flink/flink-docs-master/docs/dev/table/sourcessinks/#encoding--decoding-formats>
to
do this.  But all I really need to do is override
JsonRowDataSerializationSchema's
serialize method
<https://github.com/apache/flink/blob/master/flink-formats/flink-json/src/main/java/org/apache/flink/formats/json/JsonRowDataSerializationSchema.java#L90-L101>
and
augment and validate the ObjectNode before it is serialized to byte[].

I'm running into an issue where the ObjectNode that is used by Flink here
is the shaded one: org.apache.flink.shaded.jackson2.com.fasterxml.jackson.
databind.node.ObjectNode, whereas the WMF code
<https://gerrit.wikimedia.org/r/plugins/gitiles/wikimedia-event-utilities/+/refs/heads/master/eventutilities/src/main/java/org/wikimedia/eventutilities/core/event/JsonEventGenerator.java#85>
I want to use to augment the ObjectNode is using a regular non shaded one.
I can't pass the shaded ObjectNode instance to a function that takes a non
shaded one, and I can't cast the shaded ObjectNode to non shaded either.

My Q is: is there a way to extend Flink APIs that use shaded dependencies?
I suppose I could copy/paste the whole of the "json" format code that I
need into my project and just make it my own, but this feels quite
obnoxious.

Thank you!
-Andrew Otto
 Wikimedia Foundation


Re: Notify on 0 events in a Tumbling Event Time Window

2022-05-09 Thread Andrew Otto
This sounds similar to a non-streaming problem we had at WMF.  We ingest
all event data from Kafka into HDFS/Hive and partition the Hive tables in
hourly directories.  If there are no events in a Kafka topic for a given
hour, we have no way of knowing if the hour has been ingested
successfully.  For all we know, the upstream producer pipeline might be
broken.

We solved this by emitting artificial 'canary' events into each topic
multiple times an hour.  The canary events producer uses the same code
pathways and services that (most) of our normal event producers do.  Then,
when ingesting into Hive, we filter out the canary events.  The ingestion
code has work to do and can mark an hour as complete, but still end up
writing no events to it.

Perhaps you could do the same?  Always emit artificial events, and filter
them out in your windowing code? The window should still fire since it will
always have events, even if you don't use them?
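
In Flink terms, something roughly like this sketch (Event, Alert and
AlertingWindowFunction are hypothetical application classes):

import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.windowing.assigners.TumblingEventTimeWindows;
import org.apache.flink.streaming.api.windowing.time.Time;

public class CanaryAwareAggregation {
    public static SingleOutputStreamOperator<Alert> aggregate(DataStream<Event> events) {
        return events
            // Don't filter canaries out before windowing: they are what keeps
            // every 20 minute window non-empty, so it always fires.
            .keyBy(Event::getKey)
            .window(TumblingEventTimeWindows.of(Time.minutes(20)))
            // Inside the ProcessWindowFunction, skip canaries when computing
            // avg/max/min; if only canaries were seen, emit the "no real
            // events in this window" notification instead.
            .process(new AlertingWindowFunction());
    }
}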




On Mon, May 9, 2022 at 8:55 AM Shilpa Shankar 
wrote:

> Hello,
> We are building a flink use case where we are consuming from a kafka topic
> and performing aggregations and generating alerts based on average, max,
> min thresholds. We also need to notify the users when there are 0 events in
> a Tumbling Event Time Windows. We are having trouble coming up with a
> solution to do the same. The options we considered are below, please let us
> know if there are other ideas we haven't looked into.
>
> [1] Querable State : Save the keys in each of the Process Window
> Functions. Query the state from an external application and alert when a
> key is missing after the 20min time interval has expired. We see Queryable
> state feature is being deprecated in the future. We do not want to go down
> this path when we already know there is an EOL for it.
>
> [2] Use Processing Time Windows :  Using Processing time instead of Event
> time would have been an option if our downstream applications would send
> out events in real time. Maintenances of the downstream applications,
> delays etc would result in a lot of data loss which is undesirable.
>
> Flink version : 1.14.3
>
> Thanks,
> Shilpa
>


Re: Migrating Flink apps across cloud with state

2022-05-04 Thread Andrew Otto
Have you tried MirrorMaker 2's consumer offset translation feature?  I have
not used this myself, but it sounds like what you are looking for!
https://issues.apache.org/jira/browse/KAFKA-9076
https://kafka.apache.org/26/javadoc/org/apache/kafka/connect/mirror/Checkpoint.html
https://strimzi.io/blog/2020/03/30/introducing-mirrormaker2/

I tried to find some better docs to link for you, but that's the best I got
:)  It looks like there is just the Java API.
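
If I remember right, the translation is exposed through RemoteClusterUtils in
that same org.apache.kafka.connect.mirror package. Treat the sketch below as
an assumption about the exact signature; I'm going from memory and haven't
run it:

import java.time.Duration;
import java.util.HashMap;
import java.util.Map;
import org.apache.kafka.clients.consumer.OffsetAndMetadata;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.connect.mirror.RemoteClusterUtils;

public class Mm2OffsetTranslation {
    // Ask the *target* cluster what the source cluster consumer group's
    // offsets translate to.  The result could seed a fresh consumer group, or
    // be written into a savepoint with the State Processor API.
    public static Map<TopicPartition, OffsetAndMetadata> translate() throws Exception {
        Map<String, Object> props = new HashMap<>();
        props.put("bootstrap.servers", "kafka-target.example.org:9092");
        return RemoteClusterUtils.translateOffsets(
            props,
            "source-cluster",           // MirrorMaker 2 alias of the old cluster
            "my-flink-consumer-group",  // group id used by the Flink job
            Duration.ofMinutes(1));
    }
}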



On Wed, May 4, 2022 at 3:29 PM Hemanga Borah 
wrote:

> Thank you for the suggestions, guys!
>
> @Austin Cawley-Edwards
> Your idea is spot on! This approach would surely work. We could take a
> savepoint of each of our apps, load it using state processor apis and
> create another savepoint accounting for the delta on the offsets, and start
> the app on the new cloud using this modified savepoint.
> However, the solution will not be generic, and we have to do this for each
> of our applications. This can be quite cumbersome as we have several
> applications (around 25).
>
> We are thinking of overriding the FlinkKafkaConsumerBase to account for
> the offset deltas during the start-up of any app. Do you think it is safe
> to do that? Is there a better way of doing this?
>
> @Schwalbe Matthias
> Thank you for your suggestion. We do use exactly-once semantics, but, our
> apps can tolerate a few duplicates in rare cases like this one where we are
> migrating clouds. However, your suggestion is really helpful and we will
> use it in case some of the apps cannot tolerate duplicate data.
>
>
> On Wed, May 4, 2022 at 12:00 AM Schwalbe Matthias <
> matthias.schwa...@viseca.ch> wrote:
>
>> Hello Hemanga,
>>
>>
>>
>> MirrorMaker can cause havoc in many respects, for one, it does not have
>> strict exactly-once.semantics…
>>
>>
>>
>> The way I would tackle this problem (and have done in similar
>> situaltions):
>>
>>
>>
>>- For the source topics that need to be have exactly-once-semantics
>>and that are not intrinsically idempotent:
>>- Add one extra operator after the source that deduplicates events by
>>unique id for a rolling time range (on the source cloud provider)
>>- Take a savepoint after the rolling time-range has passed (at least
>>once completely)
>>- Move your job to the target cloud provider
>>- Reconfigure the resp. source with a new kafka consumer group.id,
>>- Change the uid() of the resp. kafka source,
>>- Configure start-by-timestamp for the resp. source with a timestamp
>>that lies within the rolling time range (of above)
>>- Configure the job to ignore  recovery for state that does not have
>>a corresponding operator in the job (the previous kafka source uid()s)
>>- Start the job on new cloud provider, wait for it to pick
>>up/back-fill
>>- Take a savepoint
>>- Remove deduplication operator if that causes too much
>>load/latency/whatever
>>
>>
>>
>> This scheme sounds more complicated than it really is … and has saved my
>> sanity quite a number of times 
>>
>>
>>
>> Good luck and ready to answer more details
>>
>>
>>
>> Thias
>>
>>
>>
>> *From:* Hemanga Borah 
>> *Sent:* Tuesday, May 3, 2022 3:12 AM
>> *To:* user@flink.apache.org
>> *Subject:* Migrating Flink apps across cloud with state
>>
>>
>>
>> Hello,
>>  We are attempting to port our Flink applications from one cloud provider
>> to another.
>>
>>  These Flink applications consume data from Kafka topics and output to
>> various destinations (Kafka or databases). The applications have states
>> stored in them. Some of these stored states are aggregations, for example,
>> at times we store hours (or days) worth of data to aggregate over time.
>> Some other applications have cached information for data enrichment, for
>> example, we store data in Flink state for days, so that we can join them
>> with newly arrived data. The amount of data on the input topics is a lot,
>> and it will be expensive to reprocess the data from the beginning of the
>> topic.
>>
>>  As such, we want to retain the state of the application when we move to
>> a different cloud provider so that we can retain the aggregations and
>> cache, and do not have to start from the beginning of the input topics.
>>
>>  We are replicating the Kafka topics using MirrorMaker 2. This is our
>> procedure:
>>
>>- Replicate the input topics of each Flink application from source
>>cloud to destination cloud.
>>- Take a savepoint of the Flink application on the source cloud
>>provider.
>>- Start the Flink application on the destination cloud provider using
>>the savepoint from the source cloud provider.
>>
>>
>> However, this does not work as we want because there is a difference in
>> offset in the new topics in the new cloud provider (because of MirrorMaker
>> implementation). The offsets of the new topic do not match the ones stored
>> on the Flink savepoint, hence, Flink cannot map to the offsets of the new
>> topic during startup.
>>
>> 

Re: [DISCUSS] Deprecate/remove Twitter connector

2022-01-31 Thread Andrew Otto
https://golb.hplar.ch/2018/02/Access-Server-Sent-Events-from-Java.html
looks like a nice tutorial.

On Mon, Jan 31, 2022 at 12:27 PM Andrew Otto  wrote:

> Any SSE/EventSource Java Client should work.  I have not personally used
> one.  From a quick search, maybe
> https://github.com/launchdarkly/okhttp-eventsource or something like it?
>
>
>
> On Mon, Jan 31, 2022 at 11:45 AM Francesco Guardiani <
> france...@ververica.com> wrote:
>
>> > Shameless plug:  Maybe the Wikipedia EventStreams
>> <https://wikitech.wikimedia.org/wiki/Event_Platform/EventStreams> SSE API
>> <https://stream.wikimedia.org/?doc#/streams> would make for a great
>> connector example in Flink?
>>
>> Sounds like a great idea! Do you have a ready to use Java Client for
>> that?
>>
>> On Mon, Jan 31, 2022 at 3:47 PM Jing Ge  wrote:
>>
>>> Thanks @Martijn for driving this! +1 for deprecating and removing it.
>>> All the concerns mentioned previously are valid. It is good to know that
>>> the upcoming connector template/archetype will help the user for the
>>> kickoff. Beyond that, speaking of using a real connector as a sample, since
>>> Flink is heading towards the unified batch and stream processing, IMHO, it
>>> would be nice to pick up a feasible connector for this trend to let the
>>> user get a sample close to the use cases.
>>>
>>> Best regards
>>> Jing
>>>
>>> On Mon, Jan 31, 2022 at 3:07 PM Andrew Otto  wrote:
>>>
>>>> Shameless plug:  Maybe the Wikipedia EventStreams
>>>> <https://wikitech.wikimedia.org/wiki/Event_Platform/EventStreams> SSE
>>>> API <https://stream.wikimedia.org/?doc#/streams> would make for a
>>>> great connector example in Flink?
>>>>
>>>> :D
>>>>
>>>> On Mon, Jan 31, 2022 at 5:41 AM Martijn Visser 
>>>> wrote:
>>>>
>>>>> Hi all,
>>>>>
>>>>> Thanks for your feedback. It's not about having this connector in the
>>>>> main repo, that has been voted on already. This is strictly about the
>>>>> connector itself, since it's not maintained and most probably also can't 
>>>>> be
>>>>> used due to changes in Twitter's API that aren't reflected in our 
>>>>> connector
>>>>> implementation. Therefore I propose to remove it.
>>>>>
>>>>> Fully agree on the template part, what's good to know is that a
>>>>> connector template/archetype is part of the goals for the external
>>>>> connector repository.
>>>>>
>>>>> Best regards,
>>>>>
>>>>> Martijn
>>>>>
>>>>> On Mon, 31 Jan 2022 at 11:32, Francesco Guardiani <
>>>>> france...@ververica.com> wrote:
>>>>>
>>>>>> Hi,
>>>>>>
>>>>>> I agree with the concern about having this connector in the main
>>>>>> repo. But I think in general it doesn't harm to have a sample connector 
>>>>>> to
>>>>>> show how to develop a custom connector, and I think that the Twitter
>>>>>> connector can be a good candidate for such a template. It needs rework 
>>>>>> for
>>>>>> sure, as it has evident issues, notably it doesn't work with table.
>>>>>>
>>>>>> So i understand if we wanna remove what we have right now, but I
>>>>>> think we should have some replacement for a "connector template", which 
>>>>>> is
>>>>>> both ready to use and easy to hack to build your own connector starting
>>>>>> from it. Twitter API is a good example for such a template, as it's both
>>>>>> "related" to the known common use cases of Flink and because is quite
>>>>>> simple to get started with.
>>>>>>
>>>>>> FG
>>>>>>
>>>>>> On Sun, Jan 30, 2022 at 12:31 PM David Anderson <
>>>>>> da...@alpinegizmo.com> wrote:
>>>>>>
>>>>>>> I agree.
>>>>>>>
>>>>>>> The Twitter connector is used in a few (unofficial) tutorials, so if
>>>>>>> we remove it that will make it more difficult for those tutorials to be
>>>>>>> maintained. On the other hand, if I recall correctly, that connector 
>>>>>>> uses
>>>

Re: [DISCUSS] Deprecate/remove Twitter connector

2022-01-31 Thread Andrew Otto
Any SSE/EventSource Java Client should work.  I have not personally used
one.  From a quick search, maybe
https://github.com/launchdarkly/okhttp-eventsource or something like it?
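
If you just want to poke at the stream without pulling in an SSE client
library, plain java.net.http (Java 11+) can read it too. A sketch (the
recentchange stream path is from memory; the full list is on
stream.wikimedia.org):

import java.net.URI;
import java.net.http.HttpClient;
import java.net.http.HttpRequest;
import java.net.http.HttpResponse;

public class EventStreamsTail {
    public static void main(String[] args) throws Exception {
        HttpClient client = HttpClient.newHttpClient();
        HttpRequest request = HttpRequest.newBuilder()
            .uri(URI.create("https://stream.wikimedia.org/v2/stream/recentchange"))
            .header("Accept", "text/event-stream")
            .build();

        // SSE is just a long-lived chunked response; each event's JSON payload
        // arrives on lines prefixed with "data: ".
        client.send(request, HttpResponse.BodyHandlers.ofLines())
            .body()
            .filter(line -> line.startsWith("data: "))
            .map(line -> line.substring("data: ".length()))
            .forEach(System.out::println);
    }
}

A proper connector would of course need reconnection and checkpointing on top
of that, which is where a real client library earns its keep.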



On Mon, Jan 31, 2022 at 11:45 AM Francesco Guardiani <
france...@ververica.com> wrote:

> > Shameless plug:  Maybe the Wikipedia EventStreams
> <https://wikitech.wikimedia.org/wiki/Event_Platform/EventStreams> SSE API
> <https://stream.wikimedia.org/?doc#/streams> would make for a great
> connector example in Flink?
>
> Sounds like a great idea! Do you have a ready to use Java Client for that?
>
> On Mon, Jan 31, 2022 at 3:47 PM Jing Ge  wrote:
>
>> Thanks @Martijn for driving this! +1 for deprecating and removing it. All
>> the concerns mentioned previously are valid. It is good to know that the
>> upcoming connector template/archetype will help the user for the kickoff.
>> Beyond that, speaking of using a real connector as a sample, since Flink is
>> heading towards the unified batch and stream processing, IMHO, it would be
>> nice to pick up a feasible connector for this trend to let the user get a
>> sample close to the use cases.
>>
>> Best regards
>> Jing
>>
>> On Mon, Jan 31, 2022 at 3:07 PM Andrew Otto  wrote:
>>
>>> Shameless plug:  Maybe the Wikipedia EventStreams
>>> <https://wikitech.wikimedia.org/wiki/Event_Platform/EventStreams> SSE
>>> API <https://stream.wikimedia.org/?doc#/streams> would make for a great
>>> connector example in Flink?
>>>
>>> :D
>>>
>>> On Mon, Jan 31, 2022 at 5:41 AM Martijn Visser 
>>> wrote:
>>>
>>>> Hi all,
>>>>
>>>> Thanks for your feedback. It's not about having this connector in the
>>>> main repo, that has been voted on already. This is strictly about the
>>>> connector itself, since it's not maintained and most probably also can't be
>>>> used due to changes in Twitter's API that aren't reflected in our connector
>>>> implementation. Therefore I propose to remove it.
>>>>
>>>> Fully agree on the template part, what's good to know is that a
>>>> connector template/archetype is part of the goals for the external
>>>> connector repository.
>>>>
>>>> Best regards,
>>>>
>>>> Martijn
>>>>
>>>> On Mon, 31 Jan 2022 at 11:32, Francesco Guardiani <
>>>> france...@ververica.com> wrote:
>>>>
>>>>> Hi,
>>>>>
>>>>> I agree with the concern about having this connector in the main repo.
>>>>> But I think in general it doesn't harm to have a sample connector to show
>>>>> how to develop a custom connector, and I think that the Twitter connector
>>>>> can be a good candidate for such a template. It needs rework for sure, as
>>>>> it has evident issues, notably it doesn't work with table.
>>>>>
>>>>> So i understand if we wanna remove what we have right now, but I think
>>>>> we should have some replacement for a "connector template", which is both
>>>>> ready to use and easy to hack to build your own connector starting from 
>>>>> it.
>>>>> Twitter API is a good example for such a template, as it's both "related"
>>>>> to the known common use cases of Flink and because is quite simple to get
>>>>> started with.
>>>>>
>>>>> FG
>>>>>
>>>>> On Sun, Jan 30, 2022 at 12:31 PM David Anderson 
>>>>> wrote:
>>>>>
>>>>>> I agree.
>>>>>>
>>>>>> The Twitter connector is used in a few (unofficial) tutorials, so if
>>>>>> we remove it that will make it more difficult for those tutorials to be
>>>>>> maintained. On the other hand, if I recall correctly, that connector uses
>>>>>> V1 of the Twitter API, which has been deprecated, so it's really not very
>>>>>> useful even for that purpose.
>>>>>>
>>>>>> David
>>>>>>
>>>>>>
>>>>>>
>>>>>> On Fri, Jan 21, 2022 at 9:34 AM Martijn Visser 
>>>>>> wrote:
>>>>>>
>>>>>>> Hi everyone,
>>>>>>>
>>>>>>> I would like to discuss deprecating Flinks' Twitter connector [1].
>>>>>>> This was one of the first connectors that was added to Flink, which 
>>>>>>> could
>>>>>>> be used to access the tweets from Twitter. Given the evolution of Flink
>>>>>>> over Twitter, I don't think that:
>>>>>>>
>>>>>>> * Users are still using this connector at all
>>>>>>> * That the code for this connector should be in the main Flink
>>>>>>> codebase.
>>>>>>>
>>>>>>> Given the circumstances, I would propose to deprecate and remove
>>>>>>> this connector. I'm looking forward to your thoughts. If you agree, 
>>>>>>> please
>>>>>>> also let me know if you think we should first deprecate it in Flink 1.15
>>>>>>> and remove it in a version after that, or if you think we can remove it
>>>>>>> directly.
>>>>>>>
>>>>>>> Best regards,
>>>>>>>
>>>>>>> Martijn Visser
>>>>>>> https://twitter.com/MartijnVisser82
>>>>>>>
>>>>>>> [1]
>>>>>>> https://nightlies.apache.org/flink/flink-docs-master/docs/connectors/datastream/twitter/
>>>>>>>
>>>>>>>


Re: [DISCUSS] Deprecate/remove Twitter connector

2022-01-31 Thread Andrew Otto
Shameless plug:  Maybe the Wikipedia EventStreams SSE API would make for a
great connector example in Flink?

:D

On Mon, Jan 31, 2022 at 5:41 AM Martijn Visser 
wrote:

> Hi all,
>
> Thanks for your feedback. It's not about having this connector in the main
> repo, that has been voted on already. This is strictly about the connector
> itself, since it's not maintained and most probably also can't be used due
> to changes in Twitter's API that aren't reflected in our connector
> implementation. Therefore I propose to remove it.
>
> Fully agree on the template part, what's good to know is that a connector
> template/archetype is part of the goals for the external
> connector repository.
>
> Best regards,
>
> Martijn
>
> On Mon, 31 Jan 2022 at 11:32, Francesco Guardiani 
> wrote:
>
>> Hi,
>>
>> I agree with the concern about having this connector in the main repo.
>> But I think in general it doesn't harm to have a sample connector to show
>> how to develop a custom connector, and I think that the Twitter connector
>> can be a good candidate for such a template. It needs rework for sure, as
>> it has evident issues, notably it doesn't work with table.
>>
>> So i understand if we wanna remove what we have right now, but I think we
>> should have some replacement for a "connector template", which is both
>> ready to use and easy to hack to build your own connector starting from it.
>> Twitter API is a good example for such a template, as it's both "related"
>> to the known common use cases of Flink and because is quite simple to get
>> started with.
>>
>> FG
>>
>> On Sun, Jan 30, 2022 at 12:31 PM David Anderson 
>> wrote:
>>
>>> I agree.
>>>
>>> The Twitter connector is used in a few (unofficial) tutorials, so if we
>>> remove it that will make it more difficult for those tutorials to be
>>> maintained. On the other hand, if I recall correctly, that connector uses
>>> V1 of the Twitter API, which has been deprecated, so it's really not very
>>> useful even for that purpose.
>>>
>>> David
>>>
>>>
>>>
>>> On Fri, Jan 21, 2022 at 9:34 AM Martijn Visser 
>>> wrote:
>>>
 Hi everyone,

 I would like to discuss deprecating Flinks' Twitter connector [1]. This
 was one of the first connectors that was added to Flink, which could be
 used to access the tweets from Twitter. Given the evolution of Flink over
 Twitter, I don't think that:

 * Users are still using this connector at all
 * That the code for this connector should be in the main Flink
 codebase.

 Given the circumstances, I would propose to deprecate and remove this
 connector. I'm looking forward to your thoughts. If you agree, please also
 let me know if you think we should first deprecate it in Flink 1.15 and
 remove it in a version after that, or if you think we can remove it
 directly.

 Best regards,

 Martijn Visser
 https://twitter.com/MartijnVisser82

 [1]
 https://nightlies.apache.org/flink/flink-docs-master/docs/connectors/datastream/twitter/




Re: [FEEDBACK] Metadata Platforms / Catalogs / Lineage integration

2022-01-13 Thread Andrew Otto
Hello!  The Wikimedia Foundation is currently doing a similar evaluation
(although we are not currently including any Flink considerations).

https://wikitech.wikimedia.org/wiki/Data_Catalog_Application_Evaluation_Rubric

More details will be published there as folks keep working on this.
Hope that helps a little bit! :)

-Andrew Otto

On Thu, Jan 13, 2022 at 10:27 AM Martijn Visser 
wrote:

> Hi everyone,
>
> I'm currently checking out different metadata platforms, such as Amundsen
> [1] and Datahub [2]. In short, these types of tools try to address problems
> related to topics such as data discovery, data lineage and an overall data
> catalogue.
>
> I'm reaching out to the Dev and User mailing lists to get some feedback.
> It would really help if you could spend a couple of minutes to let me know
> if you already use either one of the two mentioned metadata platforms or
> another one, or are you evaluating such tools? If so, is that for
> the purpose as a catalogue, for lineage or anything else? Any type of
> feedback on these types of tools is appreciated.
>
> Best regards,
>
> Martijn
>
> [1] https://github.com/amundsen-io/amundsen/
> [2] https://github.com/linkedin/datahub
>
>
>


[no subject]

2021-10-12 Thread Andrew Otto
Hello,

I'm trying to use HiveCatalog with Kerberos.  Our Hadoop cluster, our Hive
Metastore, and our Hive Server are kerberized.  I can successfully submit
Flink jobs to Yarn authenticated as my users using a cached ticket, as well
as using a keytab.

However, I can't seem to register a HiveCatalog with my TableEnvironment.
Here's my code:

import org.apache.flink.table.api._
import org.apache.flink.table.catalog.hive.HiveCatalog

val settings = EnvironmentSettings.newInstance().inStreamingMode().build()
val tableEnv = TableEnvironment.create(settings)
val catalog = new HiveCatalog("analytics_hive", "flink_test", "/etc/hive/conf")
tableEnv.registerCatalog("analytics_hive", catalog)


Which causes an exception:
Caused by: java.lang.reflect.InvocationTargetException:
org.apache.hadoop.hive.metastore.api.MetaException: Could not connect to
meta store using any of the URIs provided. Most recent failure:
org.apache.thrift.transport.TTransportException: GSS initiate failed

(Full stacktrace here
<https://gist.github.com/ottomata/79fbad1b97efebd9c71d1bf11d171ade>.)

The same error happens if I try to submit this job using my cached kerberos
ticket, or with a keytab.
I have also tried wrapping the HiveCatalog in a Hadoop UserGroupInformation
PrivilegedExceptionAction as described here
<https://blog.csdn.net/weibokong789/article/details/106427481> and got the
same result (no real idea what I'm doing here, just trying some things.)
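
(Roughly, that wrapping looked like this, in Java, with placeholder principal
and keytab path:)

import java.security.PrivilegedExceptionAction;
import org.apache.flink.table.catalog.hive.HiveCatalog;
import org.apache.hadoop.security.UserGroupInformation;

public class KerberizedHiveCatalog {
    static HiveCatalog create() throws Exception {
        UserGroupInformation ugi = UserGroupInformation.loginUserFromKeytabAndReturnUGI(
            "otto@EXAMPLE.ORG", "/etc/security/keytabs/otto.keytab");
        // Run the metastore-connecting constructor as the kerberos-authenticated user.
        return ugi.doAs(
            (PrivilegedExceptionAction<HiveCatalog>) () ->
                new HiveCatalog("analytics_hive", "flink_test", "/etc/hive/conf"));
    }
}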

Is there something more I have to do to use HiveCatalog with a kerberized
Hive Metastore?  Should Flink support this out of the box?

Thanks!
- Andrew Otto
  SRE, Wikimedia Foundation


Re: Prometheus Reporter Enhancement

2021-05-18 Thread Andrew Otto
Sounds useful!

On Tue, May 18, 2021 at 2:02 PM Mason Chen  wrote:

> Hi all,
>
> Would people appreciate enhancements to the prometheus reporter to include
> extra labels via a configuration, as a contribution to Flink? I can see it
> being useful for adding labels that are not job specific, but infra
> specific.
>
> The change would be nicely integrated with the Flink’s ConfigOptions and
> unit tested.
>
> Best,
> Mason
>