Re: Handling environment variables in Flink and within the Flink Operator

2022-08-23 Thread Francis Conroy
Hi Edgar,

This is how we've been doing it, and it's working well.

apiVersion: flink.apache.org/v1beta1
kind: FlinkDeployment
metadata:
  namespace: flink
  name: redacted-job
spec:
  image: flink:1.15-java11
  flinkVersion: v1_15
  flinkConfiguration:
    taskmanager.numberOfTaskSlots: "1"
    kubernetes.env.secretKeyRef: "env:DJANGO_TOKEN,secret:special-secret-token,key:token"


Unfortunately, I don't think this is in the docs yet.
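
For what it's worth, once the operator has injected the secret like that, the
job just reads it from its environment at startup. A minimal sketch of a
PyFlink entry point, assuming the DJANGO_TOKEN name from the config above (a
Java job would use System.getenv() instead):

import os

def main():
    # Injected into the jobmanager/taskmanager containers by the
    # kubernetes.env.secretKeyRef entry in flinkConfiguration above.
    django_token = os.environ.get("DJANGO_TOKEN")
    if django_token is None:
        raise RuntimeError("DJANGO_TOKEN was not injected into the container")
    # ... build the pipeline and pass django_token to whatever needs it ...

if __name__ == '__main__':
    main()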

Francis Conroy | Software Engineer

Level 1, Building B, 91 Parry Street

Newcastle NSW 2302

P (02) 4786 0426 Ext 233

E francis.con...@switchdin.com

www.switchdin.com




On Tue, 23 Aug 2022 at 22:54, Edgar H  wrote:

> Morning all!
>
> Recently we've been trying to migrate from some custom in-house code we've
> got to manage our k8s cluster with Flink running into the recently released
> operator.
>
> However, we found an issue when parsing environment variables into the
> arguments which are sent into the jobs - not being able to read those when
> building that - https://issues.apache.org/jira/browse/FLINK-27491
>
> Trying different workarounds we were thinking if environment variables in
> the deployment could work but still, no luck so far.
>
> So, my question is:
>
> Which is the proper way to send environment variables into Flink jobs? Is
> it even possible?
> If it isn't, are there any plans on FLINK-27491 short-term?
>
> Could it be possible to mount a file as a secret and read it from the job?
> I assume we would be in the same situation as before and not even having
> the file in the launched job.
>
> Thanks!
>



Re: Decompressing RMQ streaming messages

2022-07-24 Thread Francis Conroy
Hi Venkat,

I guess you're using a compression algorithm other than zlib; you'll have to
adapt the code to work with your algorithm of choice.

Kind regards,
Francis

On Fri, 22 Jul 2022 at 17:27, Ramana  wrote:

> Hi Francis - Thanks for the snippet. I tried using the same, however I get
> an error.
>
> Following is the error -
>
> java.util.zip.DataFormatException: incorrect header check.
>
> I see multiple errors; I believe I am seeing this stack trace for every
> message?
>
> Any idea as to what could be causing this?
>
> Thanks
> Venkat
>
> On Fri, Jul 22, 2022, 06:05 Francis Conroy 
> wrote:
>
>> Hi Venkat,
>>
>> there's nothing that I know of, but I've written a zlib decompressor for
>> our payloads which was pretty straightforward.
>>
>> public class ZlibDeserializationSchema extends AbstractDeserializationSchema<byte[]> {
>>     @Override
>>     public byte[] deserialize(byte[] message) throws IOException {
>>         Inflater decompressor = new Inflater();
>>         ByteArrayOutputStream bos = new ByteArrayOutputStream();
>>         decompressor.setInput(message);
>>         byte[] buffer = new byte[1024];
>>         int len = 0;
>>         do {
>>             try {
>>                 len = decompressor.inflate(buffer);
>>             } catch (DataFormatException e) {
>>                 e.printStackTrace();
>>             }
>>             bos.write(buffer, 0, len);
>>         } while (len > 0);
>>         decompressor.end();
>>         bos.close();
>>         return bos.toByteArray();
>>     }
>> }
>>
>> hope that helps.
>>
>> On Thu, 21 Jul 2022 at 21:13, Ramana  wrote:
>>
>>> Hi - We have a requirement to read the compressed messages emitting out
>>> of RabbitMQ and to have them processed using PyFlink. However, I am not
>>> finding any out of the box functionality in PyFlink which can help
>>> decompress the messages.
>>>
>>> Could anybody help me with an example of how to go about this?
>>>
>>> Appreciate any help here.
>>>
>>> Thanks
>>>
>>> ~Venkat
>>>
>>>
>>
>



Re: Decompressing RMQ streaming messages

2022-07-21 Thread Francis Conroy
Hi Venkat,

There's nothing that I know of, but I've written a zlib decompressor for
our payloads, which was pretty straightforward.

import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.util.zip.DataFormatException;
import java.util.zip.Inflater;

import org.apache.flink.api.common.serialization.AbstractDeserializationSchema;

public class ZlibDeserializationSchema extends AbstractDeserializationSchema<byte[]> {
    @Override
    public byte[] deserialize(byte[] message) throws IOException {
        Inflater decompressor = new Inflater();
        ByteArrayOutputStream bos = new ByteArrayOutputStream();
        decompressor.setInput(message);
        byte[] buffer = new byte[1024];
        int len = 0;
        do {
            try {
                len = decompressor.inflate(buffer);
            } catch (DataFormatException e) {
                e.printStackTrace();
            }
            // len is the number of bytes actually inflated into the buffer.
            bos.write(buffer, 0, len);
        } while (len > 0);
        decompressor.end();
        bos.close();
        return bos.toByteArray();
    }
}

hope that helps.
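
Since your pipeline is PyFlink, a rough Python equivalent of the schema above,
written as a plain MapFunction over the raw bytes, is sketched below (untested,
and it assumes the payloads really are zlib/DEFLATE; the class name is just
illustrative):

import zlib

from pyflink.datastream import MapFunction


class ZlibDecompressMap(MapFunction):
    """Decompresses a zlib-compressed bytes payload, mirroring the Java schema above."""

    def map(self, value: bytes) -> bytes:
        # zlib.error is the Python counterpart of DataFormatException and is
        # raised here if the payload is not valid zlib data.
        return zlib.decompress(value)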

On Thu, 21 Jul 2022 at 21:13, Ramana  wrote:

> Hi - We have a requirement to read the compressed messages emitted from
> RabbitMQ and to have them processed using PyFlink. However, I am not
> finding any out-of-the-box functionality in PyFlink which can help
> decompress the messages.
>
> Could anybody help me with an example of how to go about this?
>
> Appreciate any help here.
>
> Thanks
>
> ~Venkat
>
>



Re: Using the official flink operator and kubernetes secrets

2022-05-04 Thread Francis Conroy
Hi all,

Thanks for looking into this. Yeah, I kept trying different variations of
the replacement fields with no success. I'm trying to use the .getenv()
technique now but our cluster is having problems and I haven't been able to
reinstall the operator.
I'll reply once it's all working.

Thanks,
Francis

On Thu, 5 May 2022 at 03:23, Meissner, Dylan 
wrote:

> Flink deployment resources support env interpolation natively using $()
> syntax. I expected this to "just work" like other resources when using the
> operator, but it does not.
>
>
> https://kubernetes.io/docs/tasks/inject-data-application/_print/#use-environment-variables-to-define-arguments
>
> job:
>   jarURI: local:///my.jar
>   entryClass: my.JobMainKt
>   args:
> - "--kafka.bootstrap.servers"
> - "my.kafka.host:9093"
> - "--kafka.sasl.username"
> - "$(KAFKA_SASL_USERNAME)"
> - "--kafka.sasl.password"
> - "$(KAFKA_SASL_PASSWORD)"
> ​
>
> It would be a great addition, simplifying job startup decision-making
> while following existing conventions.
>
> --
> *From:* Yang Wang 
> *Sent:* Tuesday, May 3, 2022 7:22 AM
> *To:* Őrhidi Mátyás 
> *Cc:* Francis Conroy ; user <
> user@flink.apache.org>
> *Subject:* Re: Using the official flink operator and kubernetes secrets
>
> Flink does not support environment variable replacement in the args. I think
> you could access the env via "System.getenv()" in the user main method.
> It should work since the user main method is executed on the JobManager
> side.
>
> Best,
> Yang
>
> Őrhidi Mátyás  于2022年4月28日周四 19:27写道:
>
> Also,
>
> just declaring it in the flink configs should be sufficient, no need to
> define it in the pod templates:
>
> flinkConfiguration:
>   kubernetes.env.secretKeyRef: "env:DJANGO_TOKEN,secret:switchdin-django-token,key:token"
>
>
> Best,
> Matyas
>
> On Thu, Apr 28, 2022 at 1:17 PM Őrhidi Mátyás 
> wrote:
>
> Hi Francis,
>
> I suggest accessing the environment variables directly, no need to pass
> them as command arguments I guess.
>
> Best,
> Matyas
>
> On Thu, Apr 28, 2022 at 11:31 AM Francis Conroy <
> francis.con...@switchdin.com> wrote:
>
> Hi all,
>
> I'm trying to use a kubernetes secret as a command line argument in my job
> and the text replacement doesn't seem to be happening. I've verified
> passing the custom args via the command line on my local flink cluster but
> can't seem to get the environment var replacement to work.
>
> apiVersion: flink.apache.org/v1alpha1
> kind: FlinkDeployment
> metadata:
>   namespace: default
>   name: http-over-mqtt
> spec:
>   image: flink:1.14.4-scala_2.12-java11
>   flinkVersion: v1_14
>   flinkConfiguration:
> taskmanager.numberOfTaskSlots: "2"
> kubernetes.env.secretKeyRef: 
> "env:DJANGO_TOKEN,secret:switchdin-django-token,key:token"
> #containerized.taskmanager.env.DJANGO_TOKEN: "$DJANGO_TOKEN"
>   serviceAccount: flink
>   jobManager:
> replicas: 1
> resource:
>   memory: "1024m"
>   cpu: 1
>   taskManager:
> resource:
>   memory: "1024m"
>   cpu: 1
>   podTemplate:
> spec:
>   serviceAccount: flink
>   containers:
> - name: flink-main-container
>   volumeMounts:
> - mountPath: /flink-job
>   name: flink-jobs
>   env:
> - name: DJANGO_TOKEN  # kubectl create secret generic 
> switchdin-django-token --from-literal=token='[TOKEN]'
>   valueFrom:
> secretKeyRef:
>   name: switchdin-django-token
>   key: token
>   optional: false
>   initContainers:
> - name: grab-mqtt-over-http-jar
>   image: docker-push.k8s.local/test/switchdin/platform_flink:job-41
>   command: [ "/bin/sh", "-c",
>  "cp /opt/switchdin/* /tmp/job/." ]  # Copies the jar in 
> the init container to the flink-jobs volume
>   volumeMounts:
> - name: flink-jobs
>   mountPath: /tmp/job
>   volumes:
> - name: flink-jobs
>   emptyDir: { }
>   job:
> jarURI: local:///flink-job/switchdin-topologies-1.0-SNAPSHOT.jar
> args: ["--swit-django-token", "$DJANGO_TOKEN",
>"--swit-prod","false"]
> entryClass: org.switchdin.HTTPOverMQTT
> parallelism: 1
> upgradeMode: stateless
> state: running

Re: How should I call external HTTP services with PyFlink?

2022-05-02 Thread Francis Conroy
Hi Dhavan,

We have looked at using pyflink for data stream enrichment and found the
performance lacking compared to the java counterpart. One option for you
might be to use statefun for the enrichment stages. We've also changed our
model for enrichment: we're pushing the enrichment data into the pipeline
instead of pulling it, but this won't work in a lot of situations.

Hope that gives you some ideas.
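
To make the push model a little more concrete, the rough shape of it in PyFlink
is sketched below (untested, and the names, types and tuple layouts are made up
for illustration): key both the data stream and the enrichment stream by the
same id, connect them, and keep the latest enrichment record in keyed state.

from pyflink.common import Types
from pyflink.datastream.functions import KeyedCoProcessFunction
from pyflink.datastream.state import ValueStateDescriptor


class EnrichFromState(KeyedCoProcessFunction):
    """Remembers the latest enrichment value per key and attaches it to data records."""

    def open(self, runtime_context):
        self.enrichment = runtime_context.get_state(
            ValueStateDescriptor("enrichment", Types.STRING()))

    def process_element1(self, value, ctx):
        # Data record: (key, payload). Attach whatever enrichment we have seen so far.
        yield value[0], value[1], self.enrichment.value()

    def process_element2(self, value, ctx):
        # Enrichment record: (key, enrichment_value). Just remember the latest one.
        self.enrichment.update(value[1])


# Wiring sketch:
# enriched = data_stream.key_by(lambda v: v[0]) \
#     .connect(enrichment_stream.key_by(lambda v: v[0])) \
#     .process(EnrichFromState())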

On Mon, 2 May 2022 at 22:54, Dhavan Vaidya 
wrote:

> Hello!
>
> I want to make HTTP(S) calls to enrich data streams. The HTTP services are
> running on our VPC, so the delay is limited, but sometimes these services
> end up calling third party APIs, and latencies become high.
>
> From documentation (
> https://nightlies.apache.org/flink/flink-docs-release-1.14/docs/dev/python/overview/)
> it seems PyFlink does not support "asyncio operator" like Java does (
> https://nightlies.apache.org/flink/flink-docs-release-1.14/docs/dev/datastream/operators/asyncio/).
> Am I missing something? How should this be approached?
>
> Thanks!
>
> --
> Dhavan
>



Using the official flink operator and kubernetes secrets

2022-04-28 Thread Francis Conroy
Hi all,

I'm trying to use a kubernetes secret as a command line argument in my job
and the text replacement doesn't seem to be happening. I've verified
passing the custom args via the command line on my local flink cluster but
can't seem to get the environment var replacement to work.

apiVersion: flink.apache.org/v1alpha1
kind: FlinkDeployment
metadata:
  namespace: default
  name: http-over-mqtt
spec:
  image: flink:1.14.4-scala_2.12-java11
  flinkVersion: v1_14
  flinkConfiguration:
    taskmanager.numberOfTaskSlots: "2"
    kubernetes.env.secretKeyRef: "env:DJANGO_TOKEN,secret:switchdin-django-token,key:token"
    # containerized.taskmanager.env.DJANGO_TOKEN: "$DJANGO_TOKEN"
  serviceAccount: flink
  jobManager:
    replicas: 1
    resource:
      memory: "1024m"
      cpu: 1
  taskManager:
    resource:
      memory: "1024m"
      cpu: 1
  podTemplate:
    spec:
      serviceAccount: flink
      containers:
        - name: flink-main-container
          volumeMounts:
            - mountPath: /flink-job
              name: flink-jobs
          env:
            - name: DJANGO_TOKEN  # kubectl create secret generic switchdin-django-token --from-literal=token='[TOKEN]'
              valueFrom:
                secretKeyRef:
                  name: switchdin-django-token
                  key: token
                  optional: false
      initContainers:
        - name: grab-mqtt-over-http-jar
          image: docker-push.k8s.local/test/switchdin/platform_flink:job-41
          command: [ "/bin/sh", "-c",
                     "cp /opt/switchdin/* /tmp/job/." ]  # Copies the jar in the init container to the flink-jobs volume
          volumeMounts:
            - name: flink-jobs
              mountPath: /tmp/job
      volumes:
        - name: flink-jobs
          emptyDir: { }
  job:
    jarURI: local:///flink-job/switchdin-topologies-1.0-SNAPSHOT.jar
    args: ["--swit-django-token", "$DJANGO_TOKEN",
           "--swit-prod","false"]
    entryClass: org.switchdin.HTTPOverMQTT
    parallelism: 1
    upgradeMode: stateless
    state: running

In the logs I can see:

2022-04-28 08:43:02,329 WARN org.switchdin.HTTPOverMQTT [] - ARGS ARE {}
2022-04-28 08:43:02,329 WARN org.switchdin.HTTPOverMQTT [] -
--swit-django-token
2022-04-28 08:43:02,330 WARN org.switchdin.HTTPOverMQTT [] - $DJANGO_TOKEN
2022-04-28 08:43:02,330 WARN org.switchdin.HTTPOverMQTT [] - --swit-prod
2022-04-28 08:43:02,330 WARN org.switchdin.HTTPOverMQTT [] - false

Anyone know how I can do this? I'm considering mounting it in a volume, but
that seems like a lot of hassle for such a small thing.

Thanks in advance!



Re: UUID on TableAPI

2022-04-25 Thread Francis Conroy
 Hi Quynh,

My understanding is mostly based on the documentation I linked in the first
reply. If the Flink version and the query both remain the same, then you can
restart a job from a savepoint; this means that it might be workable for
running a low-criticality job on, say, an AWS spot instance. That's about all
I know.

On Tue, 26 Apr 2022 at 10:17, lan tran  wrote:

> Hi Francis,
>
> Thanks for the reply. However, can you elaborate more on the part ‘work
> for cases where you wish to pause/resume a job’? Is there another way
> besides savepoints that provides this mechanism and can be implemented in
> the Table API?
>
> Best,
> Quynh
>
>
>
> Sent from Mail <https://go.microsoft.com/fwlink/?LinkId=550986> for
> Windows
>
>
>
> *From: *Francis Conroy 
> *Sent: *Tuesday, April 26, 2022 7:07 AM
> *To: *lan tran 
> *Cc: *user@flink.apache.org
> *Subject: *Re: UUID on TableAPI
>
>
>
> Hi  Quynh,
>
>
>
> Set the savepoint UUID? Is that what you mean? The savepoint UUIDs are
> issued dynamically when you request them, flink won't know automatically
> what the last savepoint was, but you can start a new job and restore from a
> savepoint by passing in the UUID. All that said there are limitations
> around using savepoints and Flink SQL because of the way the planner works
> https://nightlies.apache.org/flink/flink-docs-release-1.14/docs/dev/table/concepts/overview/#stateful-upgrades-and-evolution.
> However it might work for cases where you wish to pause/resume a job.
>
>
>
> On Fri, 22 Apr 2022 at 13:54, lan tran  wrote:
>
> Hi team,
> Currently, I want to use savepoints in Flink. However, one of the things
> that concerns me is: is there any way we can set the UUID while using the
> Table API (SQL API)? If not, is there any mechanism so that when we start
> Flink again, it will know that it was that UUID?
>
> Best,
> Quynh.
>
>
>
> Sent from Mail <https://go.microsoft.com/fwlink/?LinkId=550986> for
> Windows
>
>
>
>
>
>
>
>



Re: UUID on TableAPI

2022-04-25 Thread Francis Conroy
Hi  Quynh,

Set the savepoint UUID? Is that what you mean? The savepoint UUIDs are
issued dynamically when you request them; Flink won't automatically know
what the last savepoint was, but you can start a new job and restore from a
savepoint by passing in the UUID. All that said, there are limitations
around using savepoints and Flink SQL because of the way the planner works:
https://nightlies.apache.org/flink/flink-docs-release-1.14/docs/dev/table/concepts/overview/#stateful-upgrades-and-evolution.
However, it might work for cases where you wish to pause/resume a job.

On Fri, 22 Apr 2022 at 13:54, lan tran  wrote:

> Hi team,
> Currently, I want to use savepoints in Flink. However, one of the things
> that concerns me is: is there any way we can set the UUID while using the
> Table API (SQL API)? If not, is there any mechanism so that when we start
> Flink again, it will know that it was that UUID?
>
> Best,
> Quynh.
>
>
>
> Sent from Mail  for
> Windows
>
>
>



Re: Official Flink operator additional class paths

2022-04-07 Thread Francis Conroy
Hi Yang,

Thanks a lot for your help. It turned out that my command in the
initContainer was specified incorrectly.


On Thu, 7 Apr 2022 at 18:41, Yang Wang  wrote:

> It seems that you have a typo when specifying the pipeline classpath.
> "file:///flink-jar/flink-connector-rabbitmq_2.12-1.14.4.jar" ->
> "file:///flink-jars/flink-connector-rabbitmq_2.12-1.14.4.jar"
>
> If this is not the root cause, maybe you could have a try with downloading
> the connector jars to /opt/flink/usrlib. The usrlib will be loaded to the
> user classloader automatically without any configuration.
>
> BTW, I am not aware of any other bugs which will cause pipeline classpath
> not take effect except FLINK-21289[1].
>
> [1]. https://issues.apache.org/jira/browse/FLINK-21289
>
> Best,
> Yang
>
> Francis Conroy  于2022年4月7日周四 15:14写道:
>
>> Hi all,
>> thanks in advance for any tips.
>>
>> I've been trying to specify some additional classpaths in my kubernetes
>> yaml file when using the official flink operator and nothing seems to work.
>>
>> I know the technique for getting my job jar works fine since it's finding
>> the class ok, but I cannot get the RabbitMQ connector jar to load.
>>
>> apiVersion: flink.apache.org/v1alpha1
>> kind: FlinkDeployment
>> metadata:
>>   namespace: default
>>   name: http-over-mqtt
>> spec:
>>   image: flink:1.14.4-scala_2.12-java11
>>   flinkVersion: v1_14
>>   flinkConfiguration:
>> taskmanager.numberOfTaskSlots: "2"
>> pipeline.classpaths: 
>> "file:///flink-jar/flink-connector-rabbitmq_2.12-1.14.4.jar"
>>   serviceAccount: flink
>>   jobManager:
>> replicas: 1
>> resource:
>>   memory: "1024m"
>>   cpu: 1
>>   taskManager:
>> resource:
>>   memory: "1024m"
>>   cpu: 1
>>   podTemplate:
>> spec:
>>   serviceAccount: flink
>>   containers:
>> - name: flink-main-container
>>   volumeMounts:
>> - mountPath: /flink-job
>>   name: flink-jobs
>> - mountPath: /flink-jars
>>   name: flink-jars
>>   initContainers:
>> - name: grab-mqtt-over-http-jar
>>   image: busybox
>>   command: [ '/bin/sh', '-c',
>>  'cd /tmp/job; wget 
>> https://jenkins/job/platform_flink/job/master/39/artifact/src-java/switchdin-topologies/target/switchdin-topologies-1.0-SNAPSHOT.jar
>>  --no-check-certificate;',
>>  'cd /tmp/jar; wget 
>> https://repo1.maven.org/maven2/org/apache/flink/flink-connector-rabbitmq_2.12/1.14.4/flink-connector-rabbitmq_2.12-1.14.4.jar'
>>  ]
>>   volumeMounts:
>> - name: flink-jobs
>>   mountPath: /tmp/job
>> - name: flink-jars
>>   mountPath: /tmp/jar
>>   volumes:
>> - name: flink-jobs
>>   emptyDir: { }
>> - name: flink-jars
>>   emptyDir: { }
>>   job:
>> jarURI: local:///flink-job/switchdin-topologies-1.0-SNAPSHOT.jar
>> entryClass: org.switchdin.HTTPOverMQTT
>> parallelism: 1
>> upgradeMode: stateless
>> state: running
>>
>> Any ideas? I've looked at the ConfigMaps that result and they also look
>> fine.
>> apiVersion: v1
>> data:
>>   flink-conf.yaml: "blob.server.port: 6124\nkubernetes.jobmanager.replicas:
>> 1\njobmanager.rpc.address:
>> http-over-mqtt.default\nkubernetes.taskmanager.cpu: 1.0\n
>> kubernetes.service-account:
>> flink\nkubernetes.cluster-id: http-over-mqtt\n
>> $internal.application.program-args:
>> \nkubernetes.container.image: flink:1.14.4-scala_2.12-java11\n
>> parallelism.default:
>> 1\nkubernetes.namespace: default\ntaskmanager.numberOfTaskSlots: 2\n
>> kubernetes.rest-service.exposed.type:
>> ClusterIP\n$internal.application.main: org.switchdin.HTTPOverMQTT\n
>> taskmanager.memory.process.size:
>> 1024m\nkubernetes.internal.jobmanager.entrypoint.class:
>> org.apache.flink.kubernetes.entrypoint.KubernetesApplicationClusterEntrypoint
>> \nkubernetes.pod-template-file:
>> /tmp/podTemplate_11292791104169595925.yaml\n
>> kubernetes.pod-template-file.taskmanager:
>> /tmp/podTemplate_17362225267763549900.yaml\nexecution.target:
>> kubernetes-application\njobmanager.memory.process.size:
>> 1024m\njobmanager.rpc.port: 6123\ntaskmanager.rpc.port: 6122\n
>> internal.cluster.execution

Official Flink operator additional class paths

2022-04-07 Thread Francis Conroy
Hi all,
thanks in advance for any tips.

I've been trying to specify some additional classpaths in my kubernetes
yaml file when using the official flink operator and nothing seems to work.

I know the technique for getting my job jar works fine since it's finding
the class ok, but I cannot get the RabbitMQ connector jar to load.

apiVersion: flink.apache.org/v1alpha1
kind: FlinkDeployment
metadata:
  namespace: default
  name: http-over-mqtt
spec:
  image: flink:1.14.4-scala_2.12-java11
  flinkVersion: v1_14
  flinkConfiguration:
    taskmanager.numberOfTaskSlots: "2"
    pipeline.classpaths: "file:///flink-jar/flink-connector-rabbitmq_2.12-1.14.4.jar"
  serviceAccount: flink
  jobManager:
    replicas: 1
    resource:
      memory: "1024m"
      cpu: 1
  taskManager:
    resource:
      memory: "1024m"
      cpu: 1
  podTemplate:
    spec:
      serviceAccount: flink
      containers:
        - name: flink-main-container
          volumeMounts:
            - mountPath: /flink-job
              name: flink-jobs
            - mountPath: /flink-jars
              name: flink-jars
      initContainers:
        - name: grab-mqtt-over-http-jar
          image: busybox
          command: [ '/bin/sh', '-c',
                     'cd /tmp/job; wget https://jenkins/job/platform_flink/job/master/39/artifact/src-java/switchdin-topologies/target/switchdin-topologies-1.0-SNAPSHOT.jar --no-check-certificate;',
                     'cd /tmp/jar; wget https://repo1.maven.org/maven2/org/apache/flink/flink-connector-rabbitmq_2.12/1.14.4/flink-connector-rabbitmq_2.12-1.14.4.jar' ]
          volumeMounts:
            - name: flink-jobs
              mountPath: /tmp/job
            - name: flink-jars
              mountPath: /tmp/jar
      volumes:
        - name: flink-jobs
          emptyDir: { }
        - name: flink-jars
          emptyDir: { }
  job:
    jarURI: local:///flink-job/switchdin-topologies-1.0-SNAPSHOT.jar
    entryClass: org.switchdin.HTTPOverMQTT
    parallelism: 1
    upgradeMode: stateless
    state: running

Any ideas? I've looked at the ConfigMaps that result and they also look
fine.
apiVersion: v1
data:
  flink-conf.yaml: "blob.server.port: 6124\nkubernetes.jobmanager.replicas:
1\njobmanager.rpc.address:
http-over-mqtt.default\nkubernetes.taskmanager.cpu: 1.0\n
kubernetes.service-account:
flink\nkubernetes.cluster-id: http-over-mqtt\n
$internal.application.program-args:
\nkubernetes.container.image: flink:1.14.4-scala_2.12-java11\n
parallelism.default:
1\nkubernetes.namespace: default\ntaskmanager.numberOfTaskSlots: 2\n
kubernetes.rest-service.exposed.type:
ClusterIP\n$internal.application.main: org.switchdin.HTTPOverMQTT\n
taskmanager.memory.process.size:
1024m\nkubernetes.internal.jobmanager.entrypoint.class:
org.apache.flink.kubernetes.entrypoint.KubernetesApplicationClusterEntrypoint
\nkubernetes.pod-template-file:
/tmp/podTemplate_11292791104169595925.yaml\n
kubernetes.pod-template-file.taskmanager:
/tmp/podTemplate_17362225267763549900.yaml\nexecution.target:
kubernetes-application\njobmanager.memory.process.size:
1024m\njobmanager.rpc.port: 6123\ntaskmanager.rpc.port: 6122\n
internal.cluster.execution-mode:
NORMAL\nqueryable-state.proxy.ports: 6125\npipeline.jars:
local:///flink-job/switchdin-topologies-1.0-SNAPSHOT.jar\n
kubernetes.jobmanager.cpu:
1.0\npipeline.classpath:
file:///flink-jars/flink-connector-rabbitmq_2.12-1.14.4.jar\n
kubernetes.pod-template-file.jobmanager:
/tmp/podTemplate_17029501154997462433.yaml\n"



Re: pyflink object to java object

2022-02-28 Thread Francis Conroy
Hi Xingbo,

I think that might work for me; I'll give it a try.
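
In case it helps anyone reading the archive later: besides the RowUtils method
Xingbo mentioned, a named Java Row can also be built field by field over py4j.
A rough sketch, assuming Flink 1.13+ where Row.withNames() and
setField(String, Object) exist (the field names are the ones from my
default_row):

from pyflink.java_gateway import get_gateway

jvm = get_gateway().jvm

# Build a Java Row in "named" mode and fill in the default values.
j_default_row = jvm.org.apache.flink.types.Row.withNames()
j_default_row.setField("ep_uuid", "")
j_default_row.setField("unit_uuid", None)
j_default_row.setField("pf_uuid", None)
# ... remaining fields ...

# The Java object can then be handed to the Java process function's constructor:
# jvm.org.switchdin.operators.TableEnrich(j_default_row)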

On Tue, 1 Mar 2022 at 15:06, Xingbo Huang  wrote:

> Hi,
> With py4j, you can call any Java method. On how to create a Java Row, you
> can call the `createRowWithNamedPositions` method of `RowUtils`[1].
>
> [1]
> https://github.com/apache/flink/blob/master/flink-core/src/main/java/org/apache/flink/types/RowUtils.java#
>
> Best,
> Xingbo
>
> Francis Conroy  于2022年2月25日周五 14:35写道:
>
>> Hi all,
>>
>> we're using pyflink for most of our flink work and sometimes call into a
>> java process function.
>> Our new java process function takes an argument in the constructor
>> which is a Row containing default values. I've declared my Row in pyflink
>> like this:
>>
>> default_row = Row(ep_uuid="",
>>   unit_uuid=None,
>>   unit_longitude=None,
>>   unit_latitude=None,
>>   unit_state=None,
>>   unit_country=None,
>>   pf_uuid=None,
>>   pf_name=None)
>>
>> row_type_information = RowTypeInfo([Types.STRING(),  # ep_uuid
>>Types.STRING(),  # unit_uuid
>>Types.DOUBLE(),  # unit_longitude
>>Types.DOUBLE(),  # unit_latitude
>>Types.STRING(),  # unit_state
>>Types.STRING(),  # unit_country
>>Types.STRING(),  # pf_uuid
>>Types.STRING()  # pf_name
>>])
>>
>> I'm now trying to get a handle to a java row object in the jvm so I can
>> pass that into the process function's constructor.
>>
>> endpoint_info_enriched_stream = 
>> DataStream(ds._j_data_stream.connect(endpoint_info_stream._j_data_stream).process(
>> jvm.org.switchdin.operators.TableEnrich(j_obj)))
>>
>> I've tried a few approaches, but I really can't figure out how to do
>> this, I'm not sure what I need on each side for this, a coder, serializer,
>> pickler?
>>
>>
>>
>



pyflink object to java object

2022-02-24 Thread Francis Conroy
Hi all,

we're using pyflink for most of our flink work and sometimes call into a
java process function.
Our new java process function takes an argument in the constructor which
is a Row containing default values. I've declared my Row in pyflink like
this:

default_row = Row(ep_uuid="",
  unit_uuid=None,
  unit_longitude=None,
  unit_latitude=None,
  unit_state=None,
  unit_country=None,
  pf_uuid=None,
  pf_name=None)

row_type_information = RowTypeInfo([Types.STRING(),  # ep_uuid
   Types.STRING(),  # unit_uuid
   Types.DOUBLE(),  # unit_longitude
   Types.DOUBLE(),  # unit_latitude
   Types.STRING(),  # unit_state
   Types.STRING(),  # unit_country
   Types.STRING(),  # pf_uuid
   Types.STRING()  # pf_name
   ])

I'm now trying to get a handle to a java row object in the jvm so I can
pass that into the process function's constructor.

endpoint_info_enriched_stream =
DataStream(ds._j_data_stream.connect(endpoint_info_stream._j_data_stream).process(
jvm.org.switchdin.operators.TableEnrich(j_obj)))

I've tried a few approaches, but I really can't figure out how to do this;
I'm not sure what I need on each side for this: a coder, serializer, or
pickler?



Joining a flink materialised view table with a streaming table

2022-02-22 Thread Francis Conroy
Hi all,
I recently put up a question about a deduplication query related to a join
and realised that I was probably asking the wrong question. I'm using Flink
1.15-SNAPSHOT (97ddc39945cda9bf1f52ab159852fdb606201cf2) as we're using the
RabbitMQ connector with pyflink. We won't go to prod until 1.15 is released.

I've got a materialised view based on some CDC tables, and I've also run a
deduplication query on it to ensure that the primary key is unique. I
just want to join this with a table created from a dataStream; I don't
care about it being deterministic or having a temporal join on a versioned
table, I just want to join with the corresponding value in the table at
that time (wall-clock time).

I think I want something like a lookup join(
https://nightlies.apache.org/flink/flink-docs-master/docs/dev/table/sql/queries/joins/#lookup-join)
but where I'm joining with a dynamic deduplicated view instead of a JDBC
table.

I tried doing a temporal join against a versioned table but the join is
always empty for some reason.
Thanks for your help.



Re: Flink 1.15 deduplication view and lookup join

2022-02-22 Thread Francis Conroy
Hi Yun,

The joined data is the versioned table in this case. I managed to get it as
far as fixing all of the static errors, but the temporal join just doesn't
produce a result... No idea what's going on.
In reality I don't think we even want a temporal join; we just want to add
a few extra columns to each row in the streaming table and not keep the
result in state.
I'll ask this question specifically to the user ML and see if anyone has an
idea.

On Fri, 18 Feb 2022 at 15:15, Yun Gao  wrote:

>
> Hi Francis,
>
> I think requiring a primary key for the versioned table[1] used in a
> temporal join[2] should be expected. May I double-check which table serves
> as the versioned table in this case? Is it the streaming table from the
> rabbitmq or the joined data?
>
> Best,
> Yun
>
>
>
> [1]
> https://nightlies.apache.org/flink/flink-docs-master/docs/dev/table/concepts/versioned_tables/
> [2]
> https://nightlies.apache.org/flink/flink-docs-master/docs/dev/table/sql/queries/joins/#temporal-joins
>
>
>
> ------Original Mail --
> *Sender:*Francis Conroy 
> *Send Date:*Thu Feb 17 11:27:01 2022
> *Recipients:*user 
> *Subject:*Flink 1.15 deduplication view and lookup join
>
>> Hi user group,
>>
>> I'm using flink 1.15 currently (we're waiting for it to be released) to
>> build up some streaming pipelines and I'm trying to do a temporal lookup
>> join.
>>
>> I've got several tables(all with primary keys) defined which are
>> populated by Debezium CDC data, let's call them a, b and c.
>>
>> I've defined a view which joins all three tables to give some
>> hierarchical association data rows like in the diagram.
>> [image: image.png]
>> This all works fine so far.
>> I'm trying to join this table with a table from a datastream, using a
>> lookup join (
>> https://nightlies.apache.org/flink/flink-docs-master/docs/dev/table/sql/queries/joins/#lookup-join)
>> like follows:
>> [image: image.png]
>> I've added a time field to both tables now and I'm getting the following
>> validation exception:
>> *Temporal Table Join requires primary key in versioned table, but no
>> primary key can be found.*
>>
>>  I went and implemented another view on the joined data which
>> implemented the deduplication query (
>> https://nightlies.apache.org/flink/flink-docs-master/docs/dev/table/sql/queries/deduplication/#deduplication)
>>
>>
>> Here is my view definition:
>>
>> CREATE VIEW versioned_endpoint_association AS
>> SELECT device_id,
>>leg_dt_id,
>>ldt_id,
>>ep_uuid,
>>unit_uuid,
>>pf_uuid,
>>update_time
>> FROM (
>> SELECT *,
>>ROW_NUMBER() OVER (PARTITION BY device_id
>>ORDER BY update_time DESC) as rownum
>>   FROM endpoint_association)
>> WHERE rownum = 1;
>>
>> After taking all steps I cannot get the temporal join to work, am I
>> missing some detail which will tell flink that
>> versioned_endpoint_association should in-fact be interpreted as a versioned
>> table?
>>
>> Looking at the log it's important that there is a LogicalRank node which
>> can convert to a Deduplicate node, but the conversion isn't happening.
>>
>> [image: image.png]
>>
>>
>>
>



Re: CVE-2021-44228 - Log4j2 vulnerability

2022-02-20 Thread Francis Conroy
The release notification email came out a few days ago.



On Mon, 21 Feb 2022 at 14:18, Surendra Lalwani 
wrote:

> Hi Team,
>
> Any updates on Flink 1.13.6 version release?
>
> Regards,
> Surendra Lalwani
>
>
> On Fri, Feb 4, 2022 at 1:23 PM Martijn Visser 
> wrote:
>
>> Hi Surendra,
>>
>> You can follow the discussion on this topic in the Dev mailing list [1].
>> I would expect it in the next couple of weeks.
>>
>> Best regards,
>>
>> Martijn
>>
>> [1] https://lists.apache.org/thread/n417406j125n080vopljgfflc45yygh4
>>
>> On Fri, 4 Feb 2022 at 08:49, Surendra Lalwani 
>> wrote:
>>
>>> Hi Team,
>>>
>>> Any ETA on Flink version 1.13.6 release.
>>>
>>> Thanks and Regards ,
>>> Surendra Lalwani
>>>
>>>
>>> On Sun, Jan 9, 2022 at 3:50 PM David Morávek  wrote:
>>>
 Flink community officially only supports current and previous minor
 versions [1] (1.13, 1.14) with bug fixes. Personally I wouldn’t expect
 there will be another patch release for 1.12.

 If you really need an extra release for the unsupported version, the
 most straightforward approach would be manually building the Flink
 distribution from sources [2] with the patches you need.

 [1]
 https://flink.apache.org/downloads.html#update-policy-for-old-releases
 [2]

 https://github.com/apache/flink/tree/release-1.12#building-apache-flink-from-source

 D.

 On Sun 9. 1. 2022 at 10:10, V N, Suchithra (Nokia - IN/Bangalore) <
 suchithra@nokia.com> wrote:

> Hi David,
>
>
>
> As per the below comments, Flink 1.14.3 is in preparation and this
> hasn't started yet for Flink 1.13.6. Flink 1.12.8 release will be
> planned after this? If there is no current plan, could you please let us
> know what will be the regular release timing for 1.12.8 version.
>
>
>
> Regards,
>
> Suchithra
>
>
>
> *From:* David Morávek 
> *Sent:* Sunday, January 9, 2022 12:11 AM
> *To:* V N, Suchithra (Nokia - IN/Bangalore) 
> *Cc:* Chesnay Schepler ; Martijn Visser <
> mart...@ververica.com>; Michael Guterl ; Parag
> Somani ; patrick.eif...@sony.com; Richard
> Deurwaarder ; User ;
> subharaj.ma...@gmail.com; swamy.haj...@gmail.com
> *Subject:* Re: CVE-2021-44228 - Log4j2 vulnerability
>
>
>
> Hi Suchithra,
>
>
>
> there is currently no plan on doing another 1.12 release
>
>
>
> D.
>
>
>
> On Sat 8. 1. 2022 at 18:02, V N, Suchithra (Nokia - IN/Bangalore) <
> suchithra@nokia.com> wrote:
>
> Hi,
>
>
>
> When can we expect the flink 1.12 releases with log4j 2.17.1?
>
>
>
> Thanks,
>
> Suchithra
>
>
>
> *From:* Martijn Visser 
> *Sent:* Thursday, January 6, 2022 7:45 PM
> *To:* patrick.eif...@sony.com
> *Cc:* David Morávek ; swamy.haj...@gmail.com;
> subharaj.ma...@gmail.com; V N, Suchithra (Nokia - IN/Bangalore) <
> suchithra@nokia.com>; Chesnay Schepler ; User
> ; Michael Guterl ; Richard
> Deurwaarder ; Parag Somani 
> *Subject:* Re: CVE-2021-44228 - Log4j2 vulnerability
>
>
>
> Hi all,
>
>
>
> The ticket for upgrading Log4J to 2.17.0 is
> https://issues.apache.org/jira/browse/FLINK-25375. There's also the
> update to Log4j 2.17.1 which is tracked under
> https://issues.apache.org/jira/browse/FLINK-25472
>
>
>
> As you can see, both have a fix version set to 1.14.3 and 1.13.6.
> These versions haven't been released yet. Flink 1.14.3 is in preparation,
> this hasn't started yet for Flink 1.13.6.
>
>
>
> Best regards,
>
>
>
> Martijn
>
>
>
> On Thu, 6 Jan 2022 at 15:05,  wrote:
>
> Hi,
>
>
>
> just to be sure: Which Flink Releases for 1.14 and 1.13 have the
> upgraded log4j version 2.17.0?
>
> Are those already deployed to docker?
>
>
>
> Many Thanks in Advance.
>
>
>
> Kind Regards,
>
>
>
> Patrick
>
> --
>
> Patrick Eifler
>
>
>
> Senior Software Engineer (BI)
>
> Cloud Gaming Engineering & Infrastructure
> Sony Interactive Entertainment LLC
>
> Wilhelmstraße 118, 10963 Berlin
>
>
> Germany
>
> E: patrick.eif...@sony.com
>
>
>
> *From: *David Morávek 
> *Date: *Wednesday, 29. December 2021 at 09:35
> *To: *narasimha 
> *Cc: *Debraj Manna , Martijn Visser <
> mart...@ververica.com>, V N, Suchithra (Nokia - IN/Bangalore) <
> suchithra@nokia.com>, Chesnay Schepler , user
> , Michael Guterl , Richard
> Deurwaarder , Parag Somani 
> *Subject: *Re: CVE-2021-44228 - Log4j2 vulnerability
>
> Please follow the above mentioned ML thread for more details. Please
> note that this is a REGULAR release that is not 

Re: Change column names Pyflink Table/Datastream API

2022-02-16 Thread Francis Conroy
Hi Dian,

Using .alias ended up working for me. Thanks for getting back to me.
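
For anyone finding this thread later, combining the from_data_stream call from
my original post with Dian's suggestion gives roughly:

inputmetrics = table_env.from_data_stream(ds, Schema.new_builder()
                                          .column("f0", "BIGINT")
                                          .column("f1", "STRING")
                                          .column("f2", "STRING")
                                          .column("f3", "DOUBLE")
                                          .build()) \
    .alias("timestamp, device, name, value")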


On Thu, 17 Feb 2022 at 01:15, Dian Fu  wrote:

> Hi Francis,
>
> There should be multiple ways to achieve this. Do you mean that all these
> methods don't work for you? If so, could you show the sample code? Besides,
> another way you may try is `inputmetrics.alias("timestamp, device, name,
> value")`.
>
> Regards,
> Dian
>
> On Wed, Feb 16, 2022 at 8:14 AM Francis Conroy <
> francis.con...@switchdin.com> wrote:
>
>> Hi all,
>>
>> I'm hoping to be able to change the column names when creating a table
>> from a datastream, the flatmap function generating the stream is returning
>> a Tuple4.
>>
>> It's currently working as follows:
>>
>> inputmetrics = table_env.from_data_stream(ds, Schema.new_builder()
>>   .column("f0", "BIGINT")
>>   .column("f1", "STRING")
>>   .column("f2", "STRING")
>>   .column("f3", "DOUBLE")
>>   .build())
>>
>> I'm trying to rename the columns f0, f1, f2, f3 to proper names e.g.
>> timestamp, device, name, value. So far I've tried using from_fields, and
>>
>> column_by_expression("timestamp", "f0")
>>
>> I'd prefer not to change the output type of my previous flatMapFunction
>> (to say a named Row) for performance purposes.
>>
>> Thanks,
>> Francis
>>
>>
>



Change column names Pyflink Table/Datastream API

2022-02-15 Thread Francis Conroy
Hi all,

I'm hoping to be able to change the column names when creating a table from
a datastream; the flatmap function generating the stream is returning a
Tuple4.

It's currently working as follows:

inputmetrics = table_env.from_data_stream(ds, Schema.new_builder()
  .column("f0", "BIGINT")
  .column("f1", "STRING")
  .column("f2", "STRING")
  .column("f3", "DOUBLE")
  .build())

I'm trying to rename the columns f0, f1, f2, f3 to proper names e.g.
timestamp, device, name, value. So far I've tried using from_fields, and

column_by_expression("timestamp", "f0")

I'd prefer not to change the output type of my previous flatMapFunction (to
say a named Row) for performance purposes.

Thanks,
Francis



Flink SQL kafka debezium CDC and postgreSQL

2022-02-09 Thread Francis Conroy
Hi all,

I'm using flink 1.13.5 (as I was originally using the ververica Flink CDC
connector) and am trying to understand something.
I'm just using the Flink SQL CLI at this stage to verify that I can stream
a PostgreSQL table into Flink SQL to compute a continuous materialised
view. I was inspecting the kafka messages that were being published by
debezium originally and noticed that they were incredibly verbose including
all new/old values, this was because I was using REPLICA IDENTITY FULL on
the source tables.

I've now created unique indexes using the primary keys and REPLICA IDENTITY
USING INDEX [INDEX]. I understand that the changed rows can now be matched
using their index row, meaning we don't need to send the before contents of
the row to identify it. When running my simple select * query on the table
I get the following error:

Flink SQL> select * from devices_device;

[ERROR] Could not execute SQL statement.
Reason: java.lang.IllegalStateException: The "before" field of UPDATE
message is null, if you are using Debezium Postgres Connector, please check
the Postgres table has been set REPLICA IDENTITY to FULL level.

My table definition:

CREATE TABLE devices_device
(
id   INT  NOT NULL,
legacy_device_type_id   INT,
endpoint_id  INT  NOT NULL,
logical_device_idINT,
PRIMARY KEY (id) NOT ENFORCED
) WITH (
  'connector' = 'kafka',
  'topic' = 'django-db.public.devices_device',
  'properties.bootstrap.servers' = 'kafka:9092',
  'format' = 'debezium-json'
);

The message makes perfect sense, but I can't quite understand why I can't
use REPLICA IDENTITY USING INDEX. Does anyone know if this was a decision
that was made at some point, or whether it's not technically possible for
some reason?

Note: I will change to using REPLICA IDENTITY FULL so I can continue
working for now, but it's not something I want to put into production.

Thanks for your consideration!



Re: Socket stream source in Python?

2022-01-30 Thread Francis Conroy
Hi Philippe,

After checking the Flink master source, I think you're right: there is
currently no binding from Python to Flink's socketTextStream (via py4j) in
pyflink. The py4j interface isn't too complicated to modify for some tasks,
and I suspect that it should be fairly trivial to extend pyflink to support
this. I imagine that you could take read_text_file in
'stream_execution_environment.py' as a starting point.
Happy to provide some more information on this if you'd like.
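
For reference, a very rough sketch of what that shim could look like without
modifying pyflink at all, by calling the underlying Java method through py4j
and wrapping the result (untested; it assumes the Java
socketTextStream(host, port, delimiter) overload):

from pyflink.datastream import DataStream, StreamExecutionEnvironment

env = StreamExecutionEnvironment.get_execution_environment()

# Call the Java method directly and wrap the resulting Java DataStream in the
# Python DataStream class, the same trick pyflink uses internally.
j_stream = env._j_stream_execution_environment.socketTextStream("localhost", 9000, "\n")
stream = DataStream(j_stream)

stream.print()
env.execute("socket_text_stream_sketch")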

Kind regards,
Francis

On Sat, 29 Jan 2022 at 01:20, Philippe Rigaux 
wrote:

> Hi there
>
> I would like to use a socket stream as input for my Flink workflow in
> Python. This works in scala with the socketTextStream() method, for instance
>
> val stream = senv.socketTextStream("localhost", 9000, '\n')
>
> I cannot find an equivalent in PyFlink, although it is briefly mentioned
> in the documentation.
>
> Any help is much appreciated.
>
> Philippe
>
>



Re: MAP data type (PyFlink)

2022-01-30 Thread Francis Conroy
Hi Philippe,

I don't think it's quite that simple, unfortunately. A Python dict can map
from any hashable type to any value; however, the 'equivalent' POJO, a 'Map'
in this case, requires all key types to be the same and all value types to
be the same. You cannot specify multiple types for the key or value in one
map object. This makes me think that the Map is not actually what you are
looking for.
If you have multiple named fields with different types you could use a Row;
for these you specify a name and a value type for each column in the row,
as in the sketch below.
I'm very green in the Flink/Java area, but I hope that at least gives you
something to move forward with for now.
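
Concretely, the two positional arguments are the key and value type infos, and
the named-Row alternative takes a list of field names plus a list of field
types. A quick sketch using the fields from your example:

from pyflink.common import Types

# A map whose keys and values are all strings, e.g. {'url': '...', 'label': '...'}
map_type = Types.MAP(Types.STRING(), Types.STRING())

# A row with differently typed, named fields, e.g. {'url': '...', 'count': 2}
row_type = Types.ROW_NAMED(['url', 'count'], [Types.STRING(), Types.INT()])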

On Sat, 29 Jan 2022 at 02:12, Philippe Rigaux 
wrote:

> Hello
>
> I want to send and receive dict Python values. According to the PyFlink
> doc, this is specified with Types.MAP(). Unfortunately I
> found no example of the required arguments,  and I am stuck with the
> following error:
>
> TypeError: MAP() missing 2 required positional arguments: 'key_type_info'
> and 'value_type_info'
>
> How should I specify for instance the type for {‘url’: ‘’, ‘count’: 2}
> ?
>
> Thanks for your help.
>
> Philippe
>
>



Re: pyflink mixed with Java operators

2022-01-10 Thread Francis Conroy
Thanks for this Dian. I'll give that a try.
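
For the archive, applying that pattern to the hypothetical operator from my
original mail looks roughly like this (SimpleCountMeasurementsPerUUID is our
own Java MapFunction, so the class path is only illustrative):

from pyflink.datastream import DataStream
from pyflink.java_gateway import get_gateway

jvm = get_gateway().jvm

# Wrap the Java MapFunction directly, in place of the made-up .jMap(...) call
# from the original job below.
dstream = DataStream(
    dstream._j_data_stream.map(
        jvm.org.switchdin.operators.SimpleCountMeasurementsPerUUID()))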

On Mon, 10 Jan 2022 at 22:51, Dian Fu  wrote:

> Hi,
>
> You could try the following method:
>
> ```
> from pyflink.java_gateway import get_gateway
>
> jvm = get_gateway().jvm
> ds = (
> DataStream(ds._j_data_stream.map(jvm.com.example.MyJavaMapFunction()))
> )
> ```
>
> Regards,
> Dian
>
> On Fri, Jan 7, 2022 at 1:00 PM Francis Conroy <
> francis.con...@switchdin.com> wrote:
>
>> Hi all,
>>
>> Does anyone know if it's possible to specify a java map function at some
>> intermediate point in a pyflink job? In this case
>>
>> SimpleCountMeasurementsPerUUID
>>
>> is a flink java MapFunction. The reason we want to do this is that
>> performance in pyflink seems quite poor.
>> e.g.
>>
>> import logging
>> import os
>> import sys
>> import zlib
>>
>> import Measurements_pb2
>> from pyflink.common import Types
>> from pyflink.common.serialization import KafkaRecordSerializationSchemaBuilder, SimpleStringSchema
>> from pyflink.datastream import StreamExecutionEnvironment, RuntimeExecutionMode, MapFunction, RuntimeContext, \
>>     CheckpointingMode
>> from pyflink.datastream.connectors import RMQConnectionConfig, RMQSource, KafkaSink
>>
>> from functions.common import KeyByUUID
>> from functions.file_lister import auto_load_python_files
>> from customisations.serialisation import ZlibDeserializationSchema
>>
>>
>> class ZlibDecompressor(MapFunction):
>>     def map(self, value):
>>         decomp = zlib.decompress(value[1])
>>         return value[0], decomp
>>
>>
>> class MeasurementSnapshotCountMapFunction(MapFunction):
>>     def map(self, value):
>>         pb_body = Measurements_pb2.MeasurementSnapshot()
>>         pb_body.ParseFromString(value)
>>         meas_count = len(pb_body.measurements)
>>         if meas_count > 0:
>>             first_measurement = pb_body.measurements[0]
>>             point_uuid = first_measurement.point_uuid.value
>>             timestamp = first_measurement.time
>>
>>             return timestamp, point_uuid, meas_count
>>
>>         return None
>>
>>
>> def word_count():
>>     env = StreamExecutionEnvironment.get_execution_environment()
>>     jarpath = f"file://{os.getcwd()}/../src-java/switchdin-flink-serialization/target/serialization-1.0-SNAPSHOT.jar"
>>     env.add_jars(jarpath)
>>     auto_load_python_files(env)
>>     env.set_runtime_mode(RuntimeExecutionMode.STREAMING)
>>     # write all the data to one file
>>     env.set_parallelism(1)
>>     env.enable_checkpointing(1000, CheckpointingMode.AT_LEAST_ONCE)
>>
>>     connection_config = RMQConnectionConfig.Builder() \
>>         .set_host("rabbitmq") \
>>         .set_port(5672) \
>>         .set_virtual_host("/") \
>>         .set_user_name("guest") \
>>         .set_password("guest") \
>>         .set_connection_timeout(60) \
>>         .set_prefetch_count(5000) \
>>         .build()
>>
>>     deserialization_schema = ZlibDeserializationSchema()
>>
>>     stream = env.add_source(RMQSource(
>>         connection_config,
>>         "flink-test",
>>         False,
>>         deserialization_schema,
>>     )).set_parallelism(1)
>>
>>     # compute word count
>>     dstream = stream.map(MeasurementSnapshotCountMapFunction()).uid("DecompressRMQData") \
>>         .key_by(KeyByUUID(), key_type=Types.STRING()) \
>>         .jMap("org.switchdin.operators.SimpleCountMeasurementsPerUUID")  # Hypothetical
>>
>>     kafka_serialisation_schema = KafkaRecordSerializationSchemaBuilder() \
>>         .set_value_serialization_schema(SimpleStringSchema()) \
>>         .set_topic("flink-test-kafka") \
>>         .build()
>>
>>     dstream.sink_to(
>>         KafkaSink.builder()
>>             .set_record_serializer(kafka_serialisation_schema)
>>             .set_bootstrap_servers("kafka:9092")
>>             .build()
>>     )
>>
>>     # submit for execution
>>     env.execute()
>>
>>
>> if __name__ == '__main__':
>>     logging.basicConfig(stream=sys.stdout, level=logging.INFO, format="%(message)s")
>>     word_count()
>>
>>
>>
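
For anyone landing on this thread later: the gateway trick above generalises to a
small helper. This is only a sketch and has not been tested against a running job;
it assumes the jar containing the Java class (here the hypothetical
org.switchdin.operators.SimpleCountMeasurementsPerUUID from the question) was
already registered with env.add_jars().

```python
from functools import reduce

from pyflink.datastream import DataStream
from pyflink.java_gateway import get_gateway


def j_map(ds: DataStream, java_class_name: str) -> DataStream:
    """Apply a Java MapFunction, given its fully-qualified class name, to a
    PyFlink DataStream by going through the Py4J gateway."""
    jvm = get_gateway().jvm
    # Walk the package path on the JVM view and instantiate the Java operator.
    j_map_function = reduce(getattr, java_class_name.split("."), jvm)()
    # Call map() on the underlying Java DataStream and re-wrap the result.
    return DataStream(ds._j_data_stream.map(j_map_function))


# Hypothetical usage inside the job above; note that the Java operator must
# understand whatever serialized form the upstream Python operators hand it,
# which is the awkward part in practice:
# counted = j_map(keyed_stream, "org.switchdin.operators.SimpleCountMeasurementsPerUUID")
```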

pyflink mixed with Java operators

2022-01-06 Thread Francis Conroy
Hi all,

Does anyone know if it's possible to specify a Java map function at some
intermediate point in a PyFlink job? In this case

SimpleCountMeasurementsPerUUID

is a Flink Java MapFunction. The reason we want to do this is that
performance in PyFlink seems quite poor.
e.g.

import logging
import os
import sys
import zlib

import Measurements_pb2
from pyflink.common import Types
from pyflink.common.serialization import KafkaRecordSerializationSchemaBuilder, SimpleStringSchema
from pyflink.datastream import StreamExecutionEnvironment, RuntimeExecutionMode, MapFunction, RuntimeContext, \
    CheckpointingMode
from pyflink.datastream.connectors import RMQConnectionConfig, RMQSource, KafkaSink

from functions.common import KeyByUUID
from functions.file_lister import auto_load_python_files
from customisations.serialisation import ZlibDeserializationSchema


class ZlibDecompressor(MapFunction):
    def map(self, value):
        decomp = zlib.decompress(value[1])
        return value[0], decomp


class MeasurementSnapshotCountMapFunction(MapFunction):
    def map(self, value):
        pb_body = Measurements_pb2.MeasurementSnapshot()
        pb_body.ParseFromString(value)
        meas_count = len(pb_body.measurements)
        if meas_count > 0:
            first_measurement = pb_body.measurements[0]
            point_uuid = first_measurement.point_uuid.value
            timestamp = first_measurement.time

            return timestamp, point_uuid, meas_count

        return None


def word_count():
    env = StreamExecutionEnvironment.get_execution_environment()
    jarpath = f"file://{os.getcwd()}/../src-java/switchdin-flink-serialization/target/serialization-1.0-SNAPSHOT.jar"
    env.add_jars(jarpath)
    auto_load_python_files(env)
    env.set_runtime_mode(RuntimeExecutionMode.STREAMING)
    # write all the data to one file
    env.set_parallelism(1)
    env.enable_checkpointing(1000, CheckpointingMode.AT_LEAST_ONCE)

    connection_config = RMQConnectionConfig.Builder() \
        .set_host("rabbitmq") \
        .set_port(5672) \
        .set_virtual_host("/") \
        .set_user_name("guest") \
        .set_password("guest") \
        .set_connection_timeout(60) \
        .set_prefetch_count(5000) \
        .build()

    deserialization_schema = ZlibDeserializationSchema()

    stream = env.add_source(RMQSource(
        connection_config,
        "flink-test",
        False,
        deserialization_schema,
    )).set_parallelism(1)

    # compute word count
    dstream = stream.map(MeasurementSnapshotCountMapFunction()).uid("DecompressRMQData") \
        .key_by(KeyByUUID(), key_type=Types.STRING()) \
        .jMap("org.switchdin.operators.SimpleCountMeasurementsPerUUID")  # Hypothetical

    kafka_serialisation_schema = KafkaRecordSerializationSchemaBuilder() \
        .set_value_serialization_schema(SimpleStringSchema()) \
        .set_topic("flink-test-kafka") \
        .build()

    dstream.sink_to(
        KafkaSink.builder()
            .set_record_serializer(kafka_serialisation_schema)
            .set_bootstrap_servers("kafka:9092")
            .build()
    )

    # submit for execution
    env.execute()


if __name__ == '__main__':
    logging.basicConfig(stream=sys.stdout, level=logging.INFO, format="%(message)s")
    word_count()



Re: PyFlink Performance

2021-12-21 Thread Francis Conroy
Hi Dian, I'll build up something similar and post it; my current test code
contains proprietary information.

On Wed, 22 Dec 2021 at 14:49, Dian Fu  wrote:

> Hi Francis,
>
> Could you share the benchmark code you use?
>
> Regards,
> Dian
>
> On Wed, Dec 22, 2021 at 11:31 AM Francis Conroy <
> francis.con...@switchdin.com> wrote:
>
>> I've just run an analysis using a similar example which involves a single
>> Python flatmap operator and we're getting 100x less throughput by using
>> Python over Java. I'm interested to know if you can do such a comparison.
>> I'm using Flink 1.14.0.
>>
>> Thanks,
>> Francis
>>
>> On Thu, 18 Nov 2021 at 02:20, Thomas Portugal 
>> wrote:
>>
>>> Hello community,
>>> My team is developing an application using PyFlink with the DataStream
>>> API. Basically, we read from a Kafka topic, do some maps, and write to
>>> another Kafka topic. One restriction is that the first map has to be
>>> serialized and run with a parallelism of one. This is causing a
>>> throughput bottleneck, and we are achieving approximately 2k msgs/sec.
>>> Monitoring the CPU usage and the number of records on each operator, it
>>> seems that the first operator is causing the issue.
>>> The first operator is like a buffer that groups the messages from Kafka
>>> and sends them to the next operators. We are using a deque from Python's
>>> collections module. Since we are stuck on this issue, could you answer
>>> some questions about this matter?
>>>
>>> 1 - Can using data structures from Python introduce some latency or
>>> increase the CPU usage?
>>> 2 - Are there alternatives to this approach? We were thinking about
>>> Flink's window structure, but in our case it's not time based, and we
>>> didn't find an equivalent in the Python API.
>>> 3 - Can using the Table API to read from the Kafka topic and do the
>>> windowing improve our performance?
>>>
>>> We already set some parameters like python.fn-execution.bundle.time and
>>> buffer.timeout to improve our performance.
>>>
>>> Thanks for your attention.
>>> Best Regards
>>>
>>
>>
>
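
A note for readers hitting similar limits: most of the Python operator overhead
comes from shipping records between the JVM and the Python worker in bundles, so
the bundle-related options mentioned in this thread are the usual first knobs. A
minimal, untested sketch of setting them programmatically, assuming a PyFlink
version (1.15+) where get_execution_environment accepts a Configuration; on 1.14
the same keys can be set in flink-conf.yaml instead:

```python
from pyflink.common import Configuration
from pyflink.datastream import StreamExecutionEnvironment

config = Configuration()
# Larger bundles amortise the Python <-> JVM round trips at the cost of latency.
config.set_string("python.fn-execution.bundle.size", "10000")
config.set_string("python.fn-execution.bundle.time", "1000")  # milliseconds

env = StreamExecutionEnvironment.get_execution_environment(config)
# A higher buffer timeout batches more records per network flush
# (more throughput, more latency).
env.set_buffer_timeout(500)
```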



Re: FlinkKafkaProducer deprecated in 1.14 but pyflink binding still present?

2021-12-21 Thread Francis Conroy
Thanks for the response Dian,

I made the changes to a fork of Flink and have been using them. The changes
aren't ready to be merged back yet though, as a lot is still missing:
documentation updates, testing, etc.
Thanks,
Francis

On Wed, 27 Oct 2021 at 13:40, Dian Fu  wrote:

> Hi Francis,
>
> Yes, you are right. It's still not updated in PyFlink as
> KafkaSource/KafkaSink are still not supported in PyFlink. Hopefully we could
> add that support in 1.15 and then we could deprecate/remove the legacy
> interfaces.
>
> Regards,
> Dian
>
> On Tue, Oct 26, 2021 at 12:53 PM Francis Conroy <
> francis.con...@switchdin.com> wrote:
>
>> Looks like this got deprecated in 1.14 in favour of KafkaSink/KafkaSource
>> but the python binding didn't get updated? Can someone confirm this?
>>
>> Francis Conroy
>>
>>
>



Re: PyFlink Performance

2021-12-21 Thread Francis Conroy
I've just run an analysis using a similar example which involves a single
Python flatmap operator and we're getting 100x less throughput by using
Python over Java. I'm interested to know if you can do such a comparison.
I'm using Flink 1.14.0.

Thanks,
Francis

On Thu, 18 Nov 2021 at 02:20, Thomas Portugal 
wrote:

> Hello community,
> My team is developing an application using PyFlink with the DataStream
> API. Basically, we read from a Kafka topic, do some maps, and write to
> another Kafka topic. One restriction is that the first map has to be
> serialized and run with a parallelism of one. This is causing a
> throughput bottleneck, and we are achieving approximately 2k msgs/sec.
> Monitoring the CPU usage and the number of records on each operator, it
> seems that the first operator is causing the issue.
> The first operator is like a buffer that groups the messages from Kafka
> and sends them to the next operators. We are using a deque from Python's
> collections module. Since we are stuck on this issue, could you answer
> some questions about this matter?
>
> 1 - Can using data structures from Python introduce some latency or
> increase the CPU usage?
> 2 - Are there alternatives to this approach? We were thinking about
> Flink's window structure, but in our case it's not time based, and we
> didn't find an equivalent in the Python API.
> 3 - Can using the Table API to read from the Kafka topic and do the
> windowing improve our performance?
>
> We already set some parameters like python.fn-execution.bundle.time and
> buffer.timeout to improve our performance.
>
> Thanks for your attention.
> Best Regards
>
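
One alternative to holding the batch in a plain Python deque, for readers with
the same problem, is to keep the per-key buffer in Flink-managed keyed state, so
it is checkpointed and bounded by a count rather than by time. A rough, untested
sketch of that idea; the batch size and element type are placeholders:

```python
from pyflink.common import Types
from pyflink.datastream import KeyedProcessFunction, RuntimeContext
from pyflink.datastream.state import ListStateDescriptor


class CountBatcher(KeyedProcessFunction):
    """Groups incoming records per key and emits them in fixed-size batches."""

    def __init__(self, batch_size: int = 100):
        self.batch_size = batch_size
        self.buffer = None

    def open(self, runtime_context: RuntimeContext):
        # Keyed list state survives checkpoints, unlike a plain Python deque.
        self.buffer = runtime_context.get_list_state(
            ListStateDescriptor("pending", Types.STRING()))

    def process_element(self, value, ctx):
        self.buffer.add(value)
        pending = list(self.buffer.get())
        if len(pending) >= self.batch_size:
            self.buffer.clear()
            yield pending  # one batched record sent downstream


# e.g. stream.key_by(lambda record: record[0]).process(CountBatcher(200))
```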



FlinkKafkaProducer deprecated in 1.14 but pyflink binding still present?

2021-10-25 Thread Francis Conroy
Looks like this got deprecated in 1.14 in favour of KafkaSink/KafkaSource,
but the Python binding didn't get updated. Can someone confirm this?

Francis Conroy
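
For anyone on 1.14 in the meantime: the legacy classes are still what PyFlink
exposes, so a job can keep using them along these lines. Topic names and the
broker address are placeholders, and the Kafka connector jar still has to be on
the classpath (e.g. added via env.add_jars()):

```python
from pyflink.common.serialization import SimpleStringSchema
from pyflink.datastream import StreamExecutionEnvironment
from pyflink.datastream.connectors import FlinkKafkaConsumer, FlinkKafkaProducer

env = StreamExecutionEnvironment.get_execution_environment()

# Deprecated on the Java side in 1.14, but still the available PyFlink binding.
source = FlinkKafkaConsumer(
    topics="input-topic",
    deserialization_schema=SimpleStringSchema(),
    properties={"bootstrap.servers": "kafka:9092", "group.id": "example"})

sink = FlinkKafkaProducer(
    topic="output-topic",
    serialization_schema=SimpleStringSchema(),
    producer_config={"bootstrap.servers": "kafka:9092"})

env.add_source(source).add_sink(sink)
env.execute("legacy-kafka-connector-example")
```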



Using the flink CLI option --pyRequirements

2021-10-18 Thread Francis Conroy
Hi,

I'm trying to install some required modules by supplying a requirements
file when submitting to the cluster, and the CLI just seems to stall. I've
built 1.15-SNAPSHOT@7578758fa8c84314b8b3206629b3afa9ff41b636 and have run
the wordcount example; everything else seems to work, I just can't submit a
PyFlink job to my cluster when using the --pyRequirements option.

I started going down the path of debugging the Flink CLI using IntelliJ
IDEA, but wasn't able to figure out how to make my venv (with PyFlink
installed) available to the debug environment.

Thanks,
Francis Conroy
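
In case it helps others hitting the same stall: the requirements can also be
declared inside the job itself rather than through the CLI flag. A minimal
sketch; the file name and cache directory are placeholders, and this assumes the
cluster nodes can either reach PyPI or use a pre-populated cache directory:

```python
from pyflink.datastream import StreamExecutionEnvironment

env = StreamExecutionEnvironment.get_execution_environment()

# Similar in spirit to `flink run --python job.py --pyRequirements requirements.txt`,
# but resolved when the job is built rather than by the CLI.
env.set_python_requirements("requirements.txt")
# Optionally point at a directory of pre-downloaded packages for offline clusters:
# env.set_python_requirements("requirements.txt", "cached_packages_dir")
```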
