Re: Using Flink k8s operator on OKD

2023-10-05 Thread Gyula Fóra
Hey,
We don’t have a minimal supported version in the docs, as we haven’t
experienced any issues specific to Kubernetes versions so far.

We don’t really rely on any newer Kubernetes features.

Cheers
Gyula



On Fri, 6 Oct 2023 at 06:02, Krzysztof Chmielewski <
krzysiek.chmielew...@gmail.com> wrote:

> It seems that the problem was caused by k8s 1.19.
> When we deployed the Flink operator on vanilla k8s 1.19 we got the same error
> that we have on OKD 4.6.0. We are planning to update OKD to a newer version
> that will use a more up-to-date k8s.
>
> What is the minimal k8s version required for/supported by the Flink operator?
> I haven't found it in the operator docs - is it not there, or have I missed it?
>
> Thanks.
>
> Krzysztof Chmielewski
>
> Wed, 20 Sep 2023 at 22:32 Krzysztof Chmielewski <
> krzysiek.chmielew...@gmail.com> wrote:
>
>> Thank you Zach,
>> our flink-operator and flink deployments are in same namespace -> called
>> "flink". We have executed what is described in [1] before my initial
>> message.
>> We are using OKD 4.6.0, which according to the docs uses k8s 1.19. The
>> very same config works fine on "vanilla" k8s, but for some reason it
>> is failing on the system where we have OKD installed.
>>
>> I believe we do have proper roles/sa assigned, for example:
>>
>> oc get sa
>> NAME             SECRETS   AGE
>> builder          2         6d22h
>> default          2         6d22h
>> deployer         2         6d22h
>> flink            2         6d19h
>> flink-operator   2         17h
>>
>> oc get role
>> NAME    CREATED AT
>> flink   2023-09-13T11:53:42Z
>>
>> oc get rolebinding
>> NAME                 ROLE         AGE
>> flink-role-binding   Role/flink   6d19h
>>
>> [1]
>> https://nightlies.apache.org/flink/flink-kubernetes-operator-docs-release-1.6/docs/operations/rbac/
>>
>>
>> Thanks, Krzysztof Chmielewski
>>
>> Wed, 20 Sep 2023 at 05:40 Zach Lorimer wrote:
>>
>>> I haven’t used OKD but it sounds like OLM. If that’s the case, I’m
>>> assuming the operator was deployed to the “operators” namespace. In that
>>> case, you’ll need to create the RBACs and such in the Flink namespace for
>>> that deployment to work.
>>>
>>> For example this needs to be in each namespace that you want to have
>>> Flink deployments in.
>>>
>>> kubectl apply -f - <<EOF
>>> apiVersion: v1
>>> kind: ServiceAccount
>>> metadata:
>>>   labels:
>>> app.kubernetes.io/name: flink-kubernetes-operator
>>> app.kubernetes.io/version: 1.5.0
>>>   name: flink
>>> ---
>>> apiVersion: rbac.authorization.k8s.io/v1
>>> kind: Role
>>> metadata:
>>>   labels:
>>> app.kubernetes.io/name: flink-kubernetes-operator
>>> app.kubernetes.io/version: 1.5.0
>>>   name: flink
>>> rules:
>>> - apiGroups:
>>>   - ""
>>>   resources:
>>>   - pods
>>>   - configmaps
>>>   verbs:
>>>   - '*'
>>> - apiGroups:
>>>   - apps
>>>   resources:
>>>   - deployments
>>>   - deployments/finalizers
>>>   verbs:
>>>   - '*'
>>> ---
>>> apiVersion: rbac.authorization.k8s.io/v1
>>> kind: RoleBinding
>>> metadata:
>>>   labels:
>>> app.kubernetes.io/name: flink-kubernetes-operator
>>> app.kubernetes.io/version: 1.5.0
>>>   name: flink-role-binding
>>> roleRef:
>>>   apiGroup: rbac.authorization.k8s.io
>>>   kind: Role
>>>   name: flink
>>> subjects:
>>> - kind: ServiceAccount
>>>   name: flink
>>> EOF
>>>
>>> Hopefully that helps.
>>>
>>>
>>> On Tue, Sep 19, 2023 at 5:40 PM Krzysztof Chmielewski <
>>> krzysiek.chmielew...@gmail.com> wrote:
>>>
 Hi community,
 I was wondering if anyone tried to deploy Flink using Flink k8s
 operator on machine where OKD [1] is installed?

 We have tried to install Flink k8s operator version 1.6 which seems to
 succeed, however when we try to deploy simple Flink deployment we are
 getting an error.

 2023-09-19 10:11:36,440 i.j.o.p.e.ReconciliationDispatcher
 [ERROR][flink/test] Error during event processing ExecutionScope{ resource
 id: ResourceID{name='test', namespace='flink'}, version: 684949788} failed.

 io.fabric8.kubernetes.client.KubernetesClientException: Failure
 executing: PUT at:
 https://172.30.0.1:443/apis/flink.apache.org/v1beta1/namespaces/flink/flinkdeployments/test.
 Message: FlinkDeployment.flink.apache.org "test" is invalid:
 [spec.ingress: Invalid value: "null": spec.ingress in body must be of type
 object: "null", spec.mode: Invalid value: "null": spec.mode in body must be
 of type string: "null", spec.mode: Unsupported value: "null": supported
 values: "native", "standalone", spec.logConfiguration: Invalid value:
 "null": spec.logConfiguration in body must be of type object: "null",
 spec.imagePullPolicy: Invalid value: "null": spec.imagePullPolicy in body
 must be of type string: "null", spec.jobManager.podTemplate: Invalid value:
 "null": spec.jobManager.podTemplate in body must be of type object: "null",
 

Re: Using Flink k8s operator on OKD

2023-10-05 Thread Krzysztof Chmielewski
It seems that the problem was caused by k8s 1.19.
When we deployed the Flink operator on vanilla k8s 1.19 we got the same error
that we have on OKD 4.6.0. We are planning to update OKD to a newer version
that will use a more up-to-date k8s.

What is the minimal k8s version required for/supported by the Flink operator?
I haven't found it in the operator docs - is it not there, or have I missed it?

Thanks.
Krzysztof Chmielewski

Wed, 20 Sep 2023 at 22:32 Krzysztof Chmielewski <
krzysiek.chmielew...@gmail.com> wrote:

> Thank you Zach,
> our flink-operator and flink deployments are in same namespace -> called
> "flink". We have executed what is described in [1] before my initial
> message.
> We are using OKD 4.6.0, which according to the docs uses k8s 1.19. The
> very same config works fine on "vanilla" k8s, but for some reason it
> is failing on the system where we have OKD installed.
>
> I believe we do have proper roles/sa assigned, for example:
>
> oc get sa
> NAME             SECRETS   AGE
> builder          2         6d22h
> default          2         6d22h
> deployer         2         6d22h
> flink            2         6d19h
> flink-operator   2         17h
>
> oc get role
> NAME    CREATED AT
> flink   2023-09-13T11:53:42Z
>
> oc get rolebinding
> NAME                 ROLE         AGE
> flink-role-binding   Role/flink   6d19h
>
> [1]
> https://nightlies.apache.org/flink/flink-kubernetes-operator-docs-release-1.6/docs/operations/rbac/
>
>
> Thanks, Krzysztof Chmielewski
>
> Wed, 20 Sep 2023 at 05:40 Zach Lorimer wrote:
>
>> I haven’t used OKD but it sounds like OLM. If that’s the case, I’m
>> assuming the operator was deployed to the “operators” namespace. In that
>> case, you’ll need to create the RBACs and such in the Flink namespace for
>> that deployment to work.
>>
>> For example this needs to be in each namespace that you want to have
>> Flink deployments in.
>>
>> kubectl apply -f - <<EOF
>> apiVersion: v1
>> kind: ServiceAccount
>> metadata:
>>   labels:
>> app.kubernetes.io/name: flink-kubernetes-operator
>> app.kubernetes.io/version: 1.5.0
>>   name: flink
>> ---
>> apiVersion: rbac.authorization.k8s.io/v1
>> kind: Role
>> metadata:
>>   labels:
>> app.kubernetes.io/name: flink-kubernetes-operator
>> app.kubernetes.io/version: 1.5.0
>>   name: flink
>> rules:
>> - apiGroups:
>>   - ""
>>   resources:
>>   - pods
>>   - configmaps
>>   verbs:
>>   - '*'
>> - apiGroups:
>>   - apps
>>   resources:
>>   - deployments
>>   - deployments/finalizers
>>   verbs:
>>   - '*'
>> ---
>> apiVersion: rbac.authorization.k8s.io/v1
>> kind: RoleBinding
>> metadata:
>>   labels:
>> app.kubernetes.io/name: flink-kubernetes-operator
>> app.kubernetes.io/version: 1.5.0
>>   name: flink-role-binding
>> roleRef:
>>   apiGroup: rbac.authorization.k8s.io
>>   kind: Role
>>   name: flink
>> subjects:
>> - kind: ServiceAccount
>>   name: flink
>> EOF
>>
>> Hopefully that helps.
>>
>>
>> On Tue, Sep 19, 2023 at 5:40 PM Krzysztof Chmielewski <
>> krzysiek.chmielew...@gmail.com> wrote:
>>
>>> Hi community,
>>> I was wondering if anyone tried to deploy Flink using Flink k8s operator
>>> on machine where OKD [1] is installed?
>>>
>>> We have tried to install Flink k8s operator version 1.6 which seems to
>>> succeed, however when we try to deploy simple Flink deployment we are
>>> getting an error.
>>>
>>> 2023-09-19 10:11:36,440 i.j.o.p.e.ReconciliationDispatcher
>>> [ERROR][flink/test] Error during event processing ExecutionScope{ resource
>>> id: ResourceID{name='test', namespace='flink'}, version: 684949788} failed.
>>>
>>> io.fabric8.kubernetes.client.KubernetesClientException: Failure
>>> executing: PUT at:
>>> https://172.30.0.1:443/apis/flink.apache.org/v1beta1/namespaces/flink/flinkdeployments/test.
>>> Message: FlinkDeployment.flink.apache.org "test" is invalid:
>>> [spec.ingress: Invalid value: "null": spec.ingress in body must be of type
>>> object: "null", spec.mode: Invalid value: "null": spec.mode in body must be
>>> of type string: "null", spec.mode: Unsupported value: "null": supported
>>> values: "native", "standalone", spec.logConfiguration: Invalid value:
>>> "null": spec.logConfiguration in body must be of type object: "null",
>>> spec.imagePullPolicy: Invalid value: "null": spec.imagePullPolicy in body
>>> must be of type string: "null", spec.jobManager.podTemplate: Invalid value:
>>> "null": spec.jobManager.podTemplate in body must be of type object: "null",
>>> spec.jobManager.resource.ephemeralStorage: Invalid value: "null":
>>> spec.jobManager.resource.ephemeralStorage in body must be of type string:
>>> "null", spec.podTemplate: Invalid value: "null": spec.podTemplate in body
>>> must be of type object: "null", spec.restartNonce: Invalid value: "null":
>>> spec.restartNonce in body must be of type integer: "null",
>>> spec.taskManager.replicas: Invalid value: "null": spec.taskManager.replicas
>>> in 

Re: Using Flink k8s operator on OKD

2023-09-20 Thread Krzysztof Chmielewski
Thank you Zach,
our flink-operator and flink deployments are in same namespace -> called
"flink". We have executed what is described in [1] before my initial
message.
We are using OKD 4.6.0, which according to the docs uses k8s 1.19. The
very same config works fine on "vanilla" k8s, but for some reason it
is failing on the system where we have OKD installed.

I believe we do have proper roles/sa assigned, for example:

oc get sa
NAME             SECRETS   AGE
builder          2         6d22h
default          2         6d22h
deployer         2         6d22h
flink            2         6d19h
flink-operator   2         17h

oc get role
NAME    CREATED AT
flink   2023-09-13T11:53:42Z

oc get rolebinding
NAME                 ROLE         AGE
flink-role-binding   Role/flink   6d19h

[1]
https://nightlies.apache.org/flink/flink-kubernetes-operator-docs-release-1.6/docs/operations/rbac/


Thanks, Krzysztof Chmielewski

Wed, 20 Sep 2023 at 05:40 Zach Lorimer wrote:

> I haven’t used OKD but it sounds like OLM. If that’s the case, I’m
> assuming the operator was deployed to the “operators” namespace. In that
> case, you’ll need to create the RBACs and such in the Flink namespace for
> that deployment to work.
>
> For example this needs to be in each namespace that you want to have Flink
> deployments in.
>
> kubectl apply -f - <<EOF
> apiVersion: v1
> kind: ServiceAccount
> metadata:
>   labels:
> app.kubernetes.io/name: flink-kubernetes-operator
> app.kubernetes.io/version: 1.5.0
>   name: flink
> ---
> apiVersion: rbac.authorization.k8s.io/v1
> kind: Role
> metadata:
>   labels:
> app.kubernetes.io/name: flink-kubernetes-operator
> app.kubernetes.io/version: 1.5.0
>   name: flink
> rules:
> - apiGroups:
>   - ""
>   resources:
>   - pods
>   - configmaps
>   verbs:
>   - '*'
> - apiGroups:
>   - apps
>   resources:
>   - deployments
>   - deployments/finalizers
>   verbs:
>   - '*'
> ---
> apiVersion: rbac.authorization.k8s.io/v1
> kind: RoleBinding
> metadata:
>   labels:
> app.kubernetes.io/name: flink-kubernetes-operator
> app.kubernetes.io/version: 1.5.0
>   name: flink-role-binding
> roleRef:
>   apiGroup: rbac.authorization.k8s.io
>   kind: Role
>   name: flink
> subjects:
> - kind: ServiceAccount
>   name: flink
> EOF
>
> Hopefully that helps.
>
>
> On Tue, Sep 19, 2023 at 5:40 PM Krzysztof Chmielewski <
> krzysiek.chmielew...@gmail.com> wrote:
>
>> Hi community,
>> I was wondering if anyone tried to deploy Flink using Flink k8s operator
>> on machine where OKD [1] is installed?
>>
>> We have tried to install Flink k8s operator version 1.6 which seems to
>> succeed, however when we try to deploy simple Flink deployment we are
>> getting an error.
>>
>> 2023-09-19 10:11:36,440 i.j.o.p.e.ReconciliationDispatcher
>> [ERROR][flink/test] Error during event processing ExecutionScope{ resource
>> id: ResourceID{name='test', namespace='flink'}, version: 684949788} failed.
>>
>> io.fabric8.kubernetes.client.KubernetesClientException: Failure
>> executing: PUT at:
>> https://172.30.0.1:443/apis/flink.apache.org/v1beta1/namespaces/flink/flinkdeployments/test.
>> Message: FlinkDeployment.flink.apache.org "test" is invalid:
>> [spec.ingress: Invalid value: "null": spec.ingress in body must be of type
>> object: "null", spec.mode: Invalid value: "null": spec.mode in body must be
>> of type string: "null", spec.mode: Unsupported value: "null": supported
>> values: "native", "standalone", spec.logConfiguration: Invalid value:
>> "null": spec.logConfiguration in body must be of type object: "null",
>> spec.imagePullPolicy: Invalid value: "null": spec.imagePullPolicy in body
>> must be of type string: "null", spec.jobManager.podTemplate: Invalid value:
>> "null": spec.jobManager.podTemplate in body must be of type object: "null",
>> spec.jobManager.resource.ephemeralStorage: Invalid value: "null":
>> spec.jobManager.resource.ephemeralStorage in body must be of type string:
>> "null", spec.podTemplate: Invalid value: "null": spec.podTemplate in body
>> must be of type object: "null", spec.restartNonce: Invalid value: "null":
>> spec.restartNonce in body must be of type integer: "null",
>> spec.taskManager.replicas: Invalid value: "null": spec.taskManager.replicas
>> in body must be of type integer: "null",
>> spec.taskManager.resource.ephemeralStorage: Invalid value: "null":
>> spec.taskManager.resource.ephemeralStorage in body must be of type string:
>> "null", spec.taskManager.podTemplate: Invalid value: "null":
>> spec.taskManager.podTemplate in body must be of type object: "null",
>> spec.job: Invalid value: "null": spec.job in body must be of type object:
>> "null", .spec.taskManager.replicas: Invalid value: 0:
>> .spec.taskManager.replicas accessor error:  is of the type ,
>> expected int64]. Received status: Status(apiVersion=v1, code=422,
>> details=StatusDetails(causes=[StatusCause(field=spec.ingress,
>> 

Re: Using Flink k8s operator on OKD

2023-09-19 Thread Zach Lorimer
I haven’t used OKD but it sounds like OLM. If that’s the case, I’m assuming
the operator was deployed to the “operators” namespace. In that case,
you’ll need to create the RBACs and such in the Flink namespace for that
deployment to work.

For example this needs to be in each namespace that you want to have Flink
deployments in.

kubectl apply -f - <<EOF
apiVersion: v1
kind: ServiceAccount
metadata:
  labels:
    app.kubernetes.io/name: flink-kubernetes-operator
    app.kubernetes.io/version: 1.5.0
  name: flink
---
apiVersion: rbac.authorization.k8s.io/v1
kind: Role
metadata:
  labels:
    app.kubernetes.io/name: flink-kubernetes-operator
    app.kubernetes.io/version: 1.5.0
  name: flink
rules:
- apiGroups:
  - ""
  resources:
  - pods
  - configmaps
  verbs:
  - '*'
- apiGroups:
  - apps
  resources:
  - deployments
  - deployments/finalizers
  verbs:
  - '*'
---
apiVersion: rbac.authorization.k8s.io/v1
kind: RoleBinding
metadata:
  labels:
    app.kubernetes.io/name: flink-kubernetes-operator
    app.kubernetes.io/version: 1.5.0
  name: flink-role-binding
roleRef:
  apiGroup: rbac.authorization.k8s.io
  kind: Role
  name: flink
subjects:
- kind: ServiceAccount
  name: flink
EOF

On Tue, Sep 19, 2023 at 5:40 PM Krzysztof Chmielewski <
krzysiek.chmielew...@gmail.com> wrote:

> Hi community,
> I was wondering if anyone tried to deploy Flink using Flink k8s operator
> on machine where OKD [1] is installed?
>
> We have tried to install Flink k8s operator version 1.6 which seems to
> succeed, however when we try to deploy simple Flink deployment we are
> getting an error.
>
> 2023-09-19 10:11:36,440 i.j.o.p.e.ReconciliationDispatcher
> [ERROR][flink/test] Error during event processing ExecutionScope{ resource
> id: ResourceID{name='test', namespace='flink'}, version: 684949788} failed.
>
> io.fabric8.kubernetes.client.KubernetesClientException: Failure executing:
> PUT at:
> https://172.30.0.1:443/apis/flink.apache.org/v1beta1/namespaces/flink/flinkdeployments/test.
> Message: FlinkDeployment.flink.apache.org "test" is invalid:
> [spec.ingress: Invalid value: "null": spec.ingress in body must be of type
> object: "null", spec.mode: Invalid value: "null": spec.mode in body must be
> of type string: "null", spec.mode: Unsupported value: "null": supported
> values: "native", "standalone", spec.logConfiguration: Invalid value:
> "null": spec.logConfiguration in body must be of type object: "null",
> spec.imagePullPolicy: Invalid value: "null": spec.imagePullPolicy in body
> must be of type string: "null", spec.jobManager.podTemplate: Invalid value:
> "null": spec.jobManager.podTemplate in body must be of type object: "null",
> spec.jobManager.resource.ephemeralStorage: Invalid value: "null":
> spec.jobManager.resource.ephemeralStorage in body must be of type string:
> "null", spec.podTemplate: Invalid value: "null": spec.podTemplate in body
> must be of type object: "null", spec.restartNonce: Invalid value: "null":
> spec.restartNonce in body must be of type integer: "null",
> spec.taskManager.replicas: Invalid value: "null": spec.taskManager.replicas
> in body must be of type integer: "null",
> spec.taskManager.resource.ephemeralStorage: Invalid value: "null":
> spec.taskManager.resource.ephemeralStorage in body must be of type string:
> "null", spec.taskManager.podTemplate: Invalid value: "null":
> spec.taskManager.podTemplate in body must be of type object: "null",
> spec.job: Invalid value: "null": spec.job in body must be of type object:
> "null", .spec.taskManager.replicas: Invalid value: 0:
> .spec.taskManager.replicas accessor error:  is of the type ,
> expected int64]. Received status: Status(apiVersion=v1, code=422,
> details=StatusDetails(causes=[StatusCause(field=spec.ingress,
> message=Invalid value: "null": spec.ingress in body must be of type object:
> "null", reason=FieldValueInvalid, additionalProperties={}),
> StatusCause(field=spec.mode, message=Invalid value: "null": spec.mode in
> body must be of type string: "null", reason=FieldValueInvalid,
> additionalProperties={}), StatusCause(field=spec.mode, message=Unsupported
> value: "null": supported values: "native", "standalone",
> reason=FieldValueNotSupported, additionalProperties={}),
> StatusCause(field=spec.logConfiguration, message=Invalid value: "null":
> spec.logConfiguration in body must be of type object: "null",
> reason=FieldValueInvalid, additionalProperties={}),
> StatusCause(field=spec.imagePullPolicy, message=Invalid value: "null":
> spec.imagePullPolicy in body must be of type string: "null",
> reason=FieldValueInvalid, additionalProperties={}),
> StatusCause(field=spec.jobManager.podTemplate, message=Invalid value:
> "null": spec.jobManager.podTemplate in body must be of type object: "null",
> reason=FieldValueInvalid, additionalProperties={}),
> StatusCause(field=spec.jobManager.resource.ephemeralStorage,
> message=Invalid value: "null": spec.jobManager.resource.ephemeralStorage in
> body must be of type string: "null", reason=FieldValueInvalid,
> additionalProperties={}), StatusCause(field=spec.podTemplate,
> message=Invalid value: "null": spec.podTemplate in body must be of type
> object: "null", reason=FieldValueInvalid, additionalProperties={}),
> StatusCause(field=spec.restartNonce, message=Invalid value: "null":
> spec.restartNonce in body must be of type integer: "null",
> reason=FieldValueInvalid, additionalProperties={}),
> StatusCause(field=spec.taskManager.replicas, message=Invalid value: "null":
> spec.taskManager.replicas in body must be of type integer: "null",
> reason=FieldValueInvalid, additionalProperties={}),
> StatusCause(field=spec.taskManager.resource.ephemeralStorage,
> message=Invalid value: "null": spec.taskManager.resource.ephemeralStorage
> in body must be of type string: "null", reason=FieldValueInvalid,
> additionalProperties={}), StatusCause(field=spec.taskManager.podTemplate,
> message=Invalid value: "null": 

Re: using flink retract stream and rockdb, too many intermediate result of values cause checkpoint too heavy to finish

2021-12-16 Thread vtygoss
Hi  Arvid Heise,


Thanks for your reply! It's not classical sensor aggregation.  


The reason for not using a window join is the very long time gap between
a patient's behaviors.


There is a long gap of days or even months between the appointment with the doctor and
the visit, between tests, and between hospitalization and discharge. It's a
little like a special session window with a very long gap, but it won't be a
time- or count-based window.


> actual use case?
The actual use cases are based on this scenario, like doctors, patients,
orders, visits, tests, hospitalization, nursing notes and so on.
> What do I want to achieve?
As mentioned above, over a long time span, dozens of events continue to
arrive for each patient, especially testing and nursing records. I hope that
when a new record arrives, the old result will be updated automatically. And I
also hope the delay of the retraction and re-send can be within 10
minutes.
> consumers of the produced dataset?
Data developers will build a data streaming production pipeline based on
upstream datasets and produce new datasets; data analysts will analyse the data and
model things like the relationship between spending and medical outcomes; doctors
and nurses on duty will query all info for the corresponding patient.


Thanks for any reply or suggestion.


Best Regards!
2021-12-16 17:25:00


On 16 Dec 2021 at 04:09, Arvid Heise wrote:


Can you please describe your actual use case? What do you want to achieve,
low latency or high throughput? What are the consumers of the produced dataset?



It sounds to me as if this is classical sensor aggregation. I have not heard of 
any sensor aggregation that doesn't use windowing. So you'd usually include a 
TUMBLE window of 10s and output the data in small batches. This would 
significantly reduce the pressure on the sink and may already solve some of 
your problems.



On Tue, Dec 14, 2021 at 4:29 AM Caizhi Weng  wrote:

Hi!


Changes of input tables will cause corresponding changes in output table


Which sink are you using? If it is an upsert sink then Flink SQL planner will 
filter out UPDATE_BEFORE messages automatically. Also if your sink supports 
something like "ignore delete messages" it can also filter out delete messages 
and affect the downstream less.


Mini-batch will also help in this case. If mini-batch is enabled, aggregations 
will only send updates to the downstream once per batch, thus decreasing the 
number of records flowing to downstream.


For better performance on aggregations you can also try local-global 
aggregations. See [1] for details.


Row-Based Storage


This depends on the format you use. Although Flink's current calculation model
is row-based, it still supports column-based formats like Parquet and has a
number of optimizations for them. If you enable mini-batch and two-phase
aggregations, most jobs will meet their performance needs.


[1] 
https://nightlies.apache.org/flink/flink-docs-master/docs/dev/table/tuning/#local-global-aggregation


vtygoss wrote on Mon, 13 Dec 2021 at 17:13:

Hi, community!


I've run into a problem while building a streaming production pipeline
using Flink retract streams, with Hudi-HDFS/Kafka as the storage engine and RocksDB
as the state backend.


In my scenario, 
- During a patient's hospitalization, multiple measurements of vital signs are 
recorded, including temperature, pulse, blood pressure and so on. 
- Each type of vital sign contains 20+ or more records with PRIMARY 
KEY(patientId, visitId, signType, time) in table tbl_vis_vital_signs mentioned 
in below code. 


I need to aggregate all the vital sign records together through
JOIN or COLLECT with FILTER, as in the code below.


```
select pid, vid, 
collect(ROW(..., temperature,...)) filter(where signType='temprature') as 
temprature,
collect(ROW(..., pulse,..))filter(where signType='pulse') as pulse,
collect() filter(where ...) as bloodpressure
from tbl_vis_vital_signs 
group by pid, vid
```


With the help of FlinkCDC and Kafka/Hudi-Hdfs, we want to build streaming 
production pipeline, as the data flow below. 


DataBase--[CDC tools]-->   Kafka --[sync]--> Dynamic 
Table(kafka/hudi-hdfs)  --Flink SQL(retract stream) --> Dynamic Table 


The problem is contributed by three factors as following. 
1. Data Inflations:
1) major: Changes of input tables will cause corresponding changes in output 
table, e.g. join, aggregation. In the code above, every change of each row in 
tbl_vis_vital_signs will retract the out-dated result full of all vital signs' 
info and send new result. More serious, there are many vital sign records 
during per hospitalization, and cause too many times of retract and re-send 
operations which will be consumed by all downstreams.
2) minor: Each CDC update event will be split into two events: a deletion of the old
record and an insertion of the new record.
2. Kafka / Hudi-HDFS / RocksDB Append incremental data to full data: 
1) RocksDB and Hudi-HDFS use 

Re: using flink retract stream and rockdb, too many intermediate result of values cause checkpoint too heavy to finish

2021-12-15 Thread Arvid Heise
Can you please describe your actual use case? What do you want to achieve,
low latency or high throughput? What are the consumers of the produced
dataset?

It sounds to me as if this is classical sensor aggregation. I have not
heard of any sensor aggregation that doesn't use windowing. So you'd
usually include a TUMBLE window of 10s and output the data in small
batches. This would significantly reduce the pressure on the sink and may
already solve some of your problems.
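
As a rough illustration of that suggestion applied to the query from the original post (only a sketch: it assumes the table is already registered with a watermarked event-time column `ts` and a `temperature` column, neither of which appears in the original schema):

```
import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment
import org.apache.flink.table.api.bridge.scala.StreamTableEnvironment

object WindowedVitalSigns {
  def main(args: Array[String]): Unit = {
    val env = StreamExecutionEnvironment.getExecutionEnvironment
    val tEnv = StreamTableEnvironment.create(env)

    // tbl_vis_vital_signs is assumed to be registered already (e.g. a Kafka/CDC table)
    // with an event-time attribute `ts` that has a watermark defined on it.
    val windowed = tEnv.sqlQuery(
      """
        |SELECT
        |  pid, vid,
        |  TUMBLE_START(ts, INTERVAL '10' SECOND) AS window_start,
        |  COLLECT(temperature) FILTER (WHERE signType = 'temperature') AS temperatures
        |FROM tbl_vis_vital_signs
        |GROUP BY pid, vid, TUMBLE(ts, INTERVAL '10' SECOND)
      """.stripMargin)

    windowed.execute().print()
  }
}
```

With a tumbling window the sink receives one row per key and window instead of a retraction plus re-send for every incoming record.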

On Tue, Dec 14, 2021 at 4:29 AM Caizhi Weng  wrote:

> Hi!
>
> Changes of input tables will cause corresponding changes in output table
>
>
> Which sink are you using? If it is an upsert sink then Flink SQL planner
> will filter out UPDATE_BEFORE messages automatically. Also if your sink
> supports something like "ignore delete messages" it can also filter out
> delete messages and affect the downstream less.
>
> Mini-batch will also help in this case. If mini-batch is enabled,
> aggregations will only send updates to the downstream once per batch, thus
> decreasing the number of records flowing to downstream.
>
> For better performance on aggregations you can also try local-global
> aggregations. See [1] for details.
>
> Row-Based Storage
>
>
> This depends on the format you use. Although Flink's current calculation
> model is row-based, it still supports column-based formats like Parquet and
> has a number of optimizations for them. If you enable mini-batch and
> two-phase aggregations, most jobs will meet their performance needs.
>
> [1]
> https://nightlies.apache.org/flink/flink-docs-master/docs/dev/table/tuning/#local-global-aggregation
>
> vtygoss wrote on Mon, 13 Dec 2021 at 17:13:
>
>> Hi, community!
>>
>>
>> I've run into a problem while building a streaming production
>> pipeline using Flink retract streams, with Hudi-HDFS/Kafka as the storage engine
>> and RocksDB as the state backend.
>>
>>
>> In my scenario,
>>
>> - During a patient's hospitalization, multiple measurements of vital
>> signs are recorded, including temperature, pulse, blood pressure and so on.
>>
>> - Each type of vital sign contains 20+ or more records with PRIMARY
>> KEY(patientId, visitId, signType, time) in table tbl_vis_vital_signs
>> mentioned in below code.
>>
>>
>> I need to aggregate all the vital sign records together
>> through JOIN or COLLECT with FILTER, as in the code below.
>>
>>
>> ```
>>
>> select pid, vid,
>>
>> collect(ROW(..., temperature,...)) filter(where signType='temprature') as
>> temprature,
>>
>> collect(ROW(..., pulse,..))filter(where signType='pulse') as pulse,
>>
>> collect() filter(where ...) as bloodpressure
>>
>> from tbl_vis_vital_signs
>>
>> group by pid, vid
>>
>> ```
>>
>>
>> With the help of FlinkCDC and Kafka/Hudi-Hdfs, we want to build streaming
>> production pipeline, as the data flow below.
>>
>>
>> DataBase--[CDC tools]-->   Kafka --[sync]--> Dynamic
>> Table(kafka/hudi-hdfs)  --Flink SQL(retract stream) --> Dynamic Table
>>
>>
>> The problem is contributed by three factors as following.
>>
>> 1. Data Inflations:
>>
>> 1) major: Changes of input tables will cause corresponding changes in
>> output table, e.g. join, aggregation. In the code above, every change of
>> each row in tbl_vis_vital_signs will retract the out-dated result full of
>> all vital signs' info and send new result. More serious, there are many
>> vital sign records during per hospitalization, and cause too many times of
>> retract and re-send operations which will be consumed by all downstreams.
>>
>> 2) minor: Each CDC update event will be split into two events: a deletion of
>> the old record and an insertion of the new record.
>>
>> 2. Kafka / Hudi-HDFS / RocksDB Append incremental data to full data:
>>
>> 1) RocksDB and Hudi-HDFS use incremental model like LSM, they append
>> incremental events to full, no matter insertion or deletion.
>>
>> 2) Even upsert-kafka, is implemented by inserting tombstones.
>>
>> 3. Row-Based Storage
>>
>>
>> In my scenario, these factors will cause problems:
>>
>> 1. A large number of low meaning intermediate results of PrimaryKey
>> consume throughput of Flink Application.
>>
>> 2. Heavy checkpoint: In every checkpoint(aligned, every 10 sec),
>> the incremental block data of RocksDB is several GB, and it takes
>> several minutes when it succeeds. But only a few GB of data exist in the HDFS
>> checkpoint directory.
>>
>> 3. Low application performance and low stability of the TaskManager JVM.
>>
>>
>> So, does mini-batch have an improvement of this scenario?
>>
>> Thanks for any reply or suggestions.
>>
>>
>> Best Regards!
>>
>>
>> 2021-12-13 17:10:00
>>
>>
>>
>>
>>


Re: using flink retract stream and rockdb, too many intermediate result of values cause checkpoint too heavy to finish

2021-12-13 Thread Caizhi Weng
Hi!

Changes of input tables will cause corresponding changes in output table


Which sink are you using? If it is an upsert sink then Flink SQL planner
will filter out UPDATE_BEFORE messages automatically. Also if your sink
supports something like "ignore delete messages" it can also filter out
delete messages and affect the downstream less.

Mini-batch will also help in this case. If mini-batch is enabled,
aggregations will only send updates to the downstream once per batch, thus
decreasing the number of records flowing to downstream.

For better performance on aggregations you can also try local-global
aggregations. See [1] for details.

Row-Based Storage


This depends on the format you use. Although Flink's current calculation
model is row-based, it still supports column-based formats like Parquet and
has a number of optimizations for them. If you enable mini-batch and
two-phase aggregations, most jobs will meet their performance needs.

[1]
https://nightlies.apache.org/flink/flink-docs-master/docs/dev/table/tuning/#local-global-aggregation
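
Concretely, mini-batch and the local-global (two-phase) strategy from [1] are plain table configuration options. A minimal sketch, assuming a recent Flink version with the Scala Table API bridge; the latency and size values are only examples:

```
import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment
import org.apache.flink.table.api.bridge.scala.StreamTableEnvironment

object MiniBatchSettings {
  def main(args: Array[String]): Unit = {
    val env = StreamExecutionEnvironment.getExecutionEnvironment
    val tEnv = StreamTableEnvironment.create(env)

    val conf = tEnv.getConfig.getConfiguration
    // Buffer input records and emit at most one update per key per batch.
    conf.setString("table.exec.mini-batch.enabled", "true")
    conf.setString("table.exec.mini-batch.allow-latency", "5 s")
    conf.setString("table.exec.mini-batch.size", "5000")
    // Split aggregations into a local pre-aggregation plus a global aggregation.
    conf.setString("table.optimizer.agg-phase-strategy", "TWO_PHASE")

    // ... register tbl_vis_vital_signs and run the aggregation query as usual ...
  }
}
```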

vtygoss wrote on Mon, 13 Dec 2021 at 17:13:

> Hi, community!
>
>
> I've run into a problem while building a streaming production
> pipeline using Flink retract streams, with Hudi-HDFS/Kafka as the storage engine
> and RocksDB as the state backend.
>
>
> In my scenario,
>
> - During a patient's hospitalization, multiple measurements of vital signs
> are recorded, including temperature, pulse, blood pressure and so on.
>
> - Each type of vital sign contains 20+ or more records with PRIMARY
> KEY(patientId, visitId, signType, time) in table tbl_vis_vital_signs
> mentioned in below code.
>
>
> I need to aggregate all the vital sign records together
> through JOIN or COLLECT with FILTER, as in the code below.
>
>
> ```
>
> select pid, vid,
>
> collect(ROW(..., temperature,...)) filter(where signType='temprature') as
> temprature,
>
> collect(ROW(..., pulse,..))filter(where signType='pulse') as pulse,
>
> collect() filter(where ...) as bloodpressure
>
> from tbl_vis_vital_signs
>
> group by pid, vid
>
> ```
>
>
> With the help of FlinkCDC and Kafka/Hudi-Hdfs, we want to build streaming
> production pipeline, as the data flow below.
>
>
> DataBase--[CDC tools]-->   Kafka --[sync]--> Dynamic
> Table(kafka/hudi-hdfs)  --Flink SQL(retract stream) --> Dynamic Table
>
>
> The problem is contributed by three factors as following.
>
> 1. Data Inflations:
>
> 1) major: Changes of input tables will cause corresponding changes in
> output table, e.g. join, aggregation. In the code above, every change of
> each row in tbl_vis_vital_signs will retract the out-dated result full of
> all vital signs' info and send new result. More serious, there are many
> vital sign records during per hospitalization, and cause too many times of
> retract and re-send operations which will be consumed by all downstreams.
>
> 2) minor: Each CDC update event will be split into two events: a deletion of
> the old record and an insertion of the new record.
>
> 2. Kafka / Hudi-HDFS / RocksDB Append incremental data to full data:
>
> 1) RocksDB and Hudi-HDFS use incremental model like LSM, they append
> incremental events to full, no matter insertion or deletion.
>
> 2) Even upsert-kafka, is implemented by inserting tombstones.
>
> 3. Row-Based Storage
>
>
> In my scenario, these factors will cause problems:
>
> 1. A large number of low meaning intermediate results of PrimaryKey
> consume throughput of Flink Application.
>
> 2. Heavy checkpoint: In every checkpoint(aligned, every 10 sec),
> the incremental block data of RocksDB is several GB, and it takes
> several minutes when it succeeds. But only a few GB of data exist in the HDFS
> checkpoint directory.
>
> 3. Low application performance and low stability of the TaskManager JVM.
>
>
> So, does mini-batch have an improvement of this scenario?
>
> Thanks for any reply or suggestions.
>
>
> Best Regards!
>
>
> 2021-12-13 17:10:00
>
>
>
>
>


Re: Using flink-connector-kafka-1.9.1 with flink-core-1.7.2

2020-05-18 Thread Arvid Heise
Hi Nick,

yes, you may be lucky that no involved classes have changed (much), but
there is no guarantee.
You could try to fiddle around and add the respective class
(ClosureCleanerLevel) from Flink 1.9 to your jar, but it's hacky at best.

Another option is to bundle Flink 1.9 with your code if you cannot upgrade
the Flink cluster. That works, for example, when working with Yarn.

On Thu, May 14, 2020 at 3:22 PM Nick Bendtner  wrote:

> Hi Arvid,
> I had no problems using flink Kafka connector 1.8.0 with flink 1.7.2 core
> .
>
> Best
> Nick
>
> On Thu, May 7, 2020 at 1:34 AM Arvid Heise  wrote:
>
>> Hi Nick,
>>
>> all Flink dependencies are only compatible with the same major version.
>>
>> You can workaround it by checking out the code [1] and manually set the
>> dependency of the respective module to your flink-core version and revert
>> all changes that are not compiling. But there is no guarantee that this
>> will ultimately work as you are pretty much backporting some changes to the
>> old version.
>>
>> [1] https://github.com/AHeise/flink
>>
>> On Thu, May 7, 2020 at 2:35 AM Nick Bendtner  wrote:
>>
>>> Hi guys,
>>> I am using the flink 1.7.2 version. I have to deserialize data from kafka
>>> into consumer records, therefore I decided to update
>>> flink-connector-kafka to 1.9.1, which provides support for consumer
>>> records. We use child-first class loading. However, it seems I have a
>>> compatibility issue, as I get this exception: Exception in thread
>>> "main" java.lang.NoClassDefFoundError:
>>> org/apache/flink/api/common/ExecutionConfig$ClosureCleanerLevel.
>>>
>>> Any tricks to make this work without changing the version of flink-core
>>> ?
>>>
>>>
>>> Best,
>>> Nick.
>>>
>>>
>>
>> --
>>
>> Arvid Heise | Senior Java Developer
>>
>> 
>>
>> Follow us @VervericaData
>>
>> --
>>
>> Join Flink Forward  - The Apache Flink
>> Conference
>>
>> Stream Processing | Event Driven | Real Time
>>
>> --
>>
>> Ververica GmbH | Invalidenstrasse 115, 10115 Berlin, Germany
>> 
>>
>> --
>> Ververica GmbH
>> Registered at Amtsgericht Charlottenburg: HRB 158244 B
>> Managing Directors: Timothy Alexander Steinert, Yip Park Tung Jason, Ji
>> (Toni) Cheng
>>
>

-- 

Arvid Heise | Senior Java Developer



Follow us @VervericaData

--

Join Flink Forward  - The Apache Flink
Conference

Stream Processing | Event Driven | Real Time

--

Ververica GmbH | Invalidenstrasse 115, 10115 Berlin, Germany

--
Ververica GmbH
Registered at Amtsgericht Charlottenburg: HRB 158244 B
Managing Directors: Timothy Alexander Steinert, Yip Park Tung Jason, Ji
(Toni) Cheng


Re: Using flink-connector-kafka-1.9.1 with flink-core-1.7.2

2020-05-07 Thread Arvid Heise
Hi Nick,

all Flink dependencies are only compatible with the same major version.

You can workaround it by checking out the code [1] and manually set the
dependency of the respective module to your flink-core version and revert
all changes that are not compiling. But there is no guarantee that this
will ultimately work as you are pretty much backporting some changes to the
old version.

[1] https://github.com/AHeise/flink
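
In practice the robust fix is simply to keep every Flink artifact on one version (and, if the cluster cannot be upgraded, bundle that version into the job jar as suggested above). A hedged sketch in sbt; the build tool and artifact names here are assumptions, and the same idea applies to Maven:

```
// build.sbt (sketch): pin all Flink modules to a single version and bundle
// them in the job jar (i.e. do not mark them "provided") if the cluster is older.
val flinkVersion = "1.9.1"

libraryDependencies ++= Seq(
  "org.apache.flink" %% "flink-streaming-scala" % flinkVersion,
  "org.apache.flink" %% "flink-connector-kafka" % flinkVersion
)
```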

On Thu, May 7, 2020 at 2:35 AM Nick Bendtner  wrote:

> Hi guys,
> I am using the flink 1.7.2 version. I have to deserialize data from kafka
> into consumer records, therefore I decided to update
> flink-connector-kafka to 1.9.1, which provides support for consumer
> records. We use child-first class loading. However, it seems I have a
> compatibility issue, as I get this exception: Exception in thread "main"
> java.lang.NoClassDefFoundError:
> org/apache/flink/api/common/ExecutionConfig$ClosureCleanerLevel.
>
> Any tricks to make this work without changing the version of flink-core ?
>
>
> Best,
> Nick.
>
>

-- 

Arvid Heise | Senior Java Developer



Follow us @VervericaData

--

Join Flink Forward  - The Apache Flink
Conference

Stream Processing | Event Driven | Real Time

--

Ververica GmbH | Invalidenstrasse 115, 10115 Berlin, Germany

--
Ververica GmbH
Registered at Amtsgericht Charlottenburg: HRB 158244 B
Managing Directors: Timothy Alexander Steinert, Yip Park Tung Jason, Ji
(Toni) Cheng


Re: Using Flink in an university course

2019-03-06 Thread Wouter Zorgdrager
Hi all,

Thanks for the input. Much appreciated.

Regards,
Wouter

On Mon, 4 Mar 2019 at 20:40, Addison Higham wrote:

> Hi there,
>
> As far as a runtime for students, it seems like docker is your best bet.
> However, you could have them instead package a jar using some interface
> (for example, see
> https://ci.apache.org/projects/flink/flink-docs-release-1.7/dev/packaging.html,
> which details the `Program` interface) and then execute it inside a custom
> runner. That *might* result in something less prone to breakage as it would
> need to conform to an interface, but it may require a fair amount of custom
> code to reduce the boiler plate to build up a program plan as well as the
> custom runner. The code for how flink loads a jar and turns it into
> something it can execute is mostly encapsulated
> in org.apache.flink.client.program.PackagedProgram, which might be a good
> thing to read and understand if you go down this route.
>
> If you want to give more insight, you could build some tooling to traverse
> the underlying graphs that the students build up in their data stream
> application. For example, calling
> `StreamExecutionEnvironment.getStreamGraph` after the data stream is built
> will get a graph of the current job, which you can then use to traverse a
> graph and see which operators and edges are in use. This is very similar to
> the process flink uses to build the job DAG it renders in the UI. I am not
> sure what you could do as an automated analysis, but the StreamGraph API is
> quite low level and exposes a lot of information about the program.
>
> Hopefully that is a little bit helpful. Good luck and sounds like a fun
> course!
>
>
> On Mon, Mar 4, 2019 at 7:16 AM Wouter Zorgdrager <
> w.d.zorgdra...@tudelft.nl> wrote:
>
>> Hey all,
>>
>> Thanks for the replies. The issues we were running into (which are not
>> specific to Docker):
>> - Students who changed the template incorrectly broke the container run.
>> - We give full points if the output matches our solutions (and none
>> otherwise), but it would be nice if we could give partial grades per
>> assignment (and better feedback). This would require looking not only at the
>> results but also at the operators used. The pitfall is that in many
>> cases a correct solution can be achieved in multiple ways. I came across a
>> Flink test library [1] which allows testing Flink code more extensively, but
>> it seems to be Java-only.
>>
>> In retrospect, I do think using Docker is a good approach, as Fabian
>> confirms. However, the way we currently assess student solutions might be
>> improved. I assume that in your trainings manual feedback is given, but
>> unfortunately this is quite difficult for so many students.
>>
>> Cheers,
>> Wouter
>>
>> 1: https://github.com/ottogroup/flink-spector
>>
>>
>> On Mon, 4 Mar 2019 at 14:39, Fabian Hueske wrote:
>>
>>> Hi Wouter,
>>>
>>> We are using Docker Compose (Flink JM, Flink TM, Kafka, Zookeeper)
>>> setups for our trainings and it is working very well.
>>> We have an additional container that feeds a Kafka topic via the
>>> commandline producer to simulate a somewhat realistic behavior.
>>> Of course, you can do it without Kafka and use some kind of
>>> data-generating source that reads from a file that is replaced for evaluation.
>>>
>>> The biggest benefit that I see with using Docker is that the students
>>> have an environment that is close to grading situation for development and
>>> testing.
>>> You do not need to provide infrastructure but everyone is running it
>>> locally in a well-defined context.
>>>
>>> So, as Joern said, what problems do you see with Docker?
>>>
>>> Best,
>>> Fabian
>>>
>>> On Mon, 4 Mar 2019 at 13:44, Jörn Franke <
>>> jornfra...@gmail.com> wrote:
>>>
 It would help to understand the current issues that you have with this
 approach? I used a similar approach (not with Flink, but a similar big data
 technology) some years ago

> On 04.03.2019 at 11:32, Wouter Zorgdrager <
w.d.zorgdra...@tudelft.nl> wrote:
 >
 > Hi all,
 >
 > I'm working on a setup to use Apache Flink in an assignment for a Big
 Data (bachelor) university course and I'm interested in your view on this.
 To sketch the situation:
 > -  > 200 students follow this course
 > - students have to write some (simple) Flink applications using the
 DataStream API; the focus is on writing the transformation code
 > - students need to write Scala code
 > - we provide a dataset and a template (Scala class) with function
 signatures and detailed description per application.
 > e.g.: def assignment_one(input: DataStream[Event]):
 DataStream[(String, Int)] = ???
 > - we provide some setup code like parsing of data and setting up the
 streaming environment
 > - assignments need to be auto-graded, based on correct results
 >
 > In last years course edition we approached this by a custom Docker
 container. This 

Re: Using Flink in an university course

2019-03-04 Thread Addison Higham
Hi there,

As far as a runtime for students, it seems like docker is your best bet.
However, you could have them instead package a jar using some interface
(for example, see
https://ci.apache.org/projects/flink/flink-docs-release-1.7/dev/packaging.html,
which details the `Program` interface) and then execute it inside a custom
runner. That *might* result in something less prone to breakage as it would
need to conform to an interface, but it may require a fair amount of custom
code to reduce the boiler plate to build up a program plan as well as the
custom runner. The code for how flink loads a jar and turns it into
something it can execute is mostly encapsulated
in org.apache.flink.client.program.PackagedProgram, which might be a good
thing to read and understand if you go down this route.

If you want to give more insight, you could build some tooling to traverse
the underlying graphs that the students build up in their data stream
application. For example, calling
`StreamExecutionEnvironment.getStreamGraph` after the data stream is built
will get a graph of the current job, which you can then use to traverse a
graph and see which operators and edges are in use. This is very similar to
the process flink uses to build the job DAG it renders in the UI. I am not
sure what you could do as an automated analysis, but the StreamGraph API is
quite low level and exposes a lot of information about the program.
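
A small sketch of that idea, using the Java StreamExecutionEnvironment from Scala and a toy pipeline standing in for a student's solution (method names are from the 1.x APIs and may differ slightly between versions):

```
import org.apache.flink.api.common.functions.MapFunction
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment
import scala.collection.JavaConverters._

object GraphInspector {
  def main(args: Array[String]): Unit = {
    val env = StreamExecutionEnvironment.getExecutionEnvironment

    // Stand-in for the student's pipeline; a real grader would call the
    // template function (e.g. assignment_one) here instead.
    env.fromElements("a", "b", "c")
      .map(new MapFunction[String, String] {
        override def map(value: String): String = value.toUpperCase
      })
      .print()

    // List the operators the pipeline actually contains, without executing it.
    val graph = env.getStreamGraph
    graph.getStreamNodes.asScala.foreach { node =>
      println(s"${node.getId}: ${node.getOperatorName}")
    }
  }
}
```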

Hopefully that is a little bit helpful. Good luck and sounds like a fun
course!


On Mon, Mar 4, 2019 at 7:16 AM Wouter Zorgdrager 
wrote:

> Hey all,
>
> Thanks for the replies. The issues we were running into (which are not
> specific to Docker):
> - Students who changed the template incorrectly broke the container run.
> - We give full points if the output matches our solutions (and none
> otherwise), but it would be nice if we could give partial grades per
> assignment (and better feedback). This would require looking not only at the
> results but also at the operators used. The pitfall is that in many
> cases a correct solution can be achieved in multiple ways. I came across a
> Flink test library [1] which allows testing Flink code more extensively, but
> it seems to be Java-only.
>
> In retrospect, I do think using Docker is a good approach, as Fabian
> confirms. However, the way we currently assess student solutions might be
> improved. I assume that in your trainings manual feedback is given, but
> unfortunately this is quite difficult for so many students.
>
> Cheers,
> Wouter
>
> 1: https://github.com/ottogroup/flink-spector
>
>
> On Mon, 4 Mar 2019 at 14:39, Fabian Hueske wrote:
>
>> Hi Wouter,
>>
>> We are using Docker Compose (Flink JM, Flink TM, Kafka, Zookeeper) setups
>> for our trainings and it is working very well.
>> We have an additional container that feeds a Kafka topic via the
>> commandline producer to simulate a somewhat realistic behavior.
>> Of course, you can do it without Kafka and use some kind of
>> data-generating source that reads from a file that is replaced for evaluation.
>>
>> The biggest benefit that I see with using Docker is that the students
>> have an environment that is close to grading situation for development and
>> testing.
>> You do not need to provide infrastructure but everyone is running it
>> locally in a well-defined context.
>>
>> So, as Joern said, what problems do you see with Docker?
>>
>> Best,
>> Fabian
>>
>> On Mon, 4 Mar 2019 at 13:44, Jörn Franke <
>> jornfra...@gmail.com> wrote:
>>
>>> It would help to understand the current issues that you have with this
>>> approach? I used a similar approach (not with Flink, but a similar big data
>>> technology) some years ago
>>>
>>> > On 04.03.2019 at 11:32, Wouter Zorgdrager <
>>> w.d.zorgdra...@tudelft.nl> wrote:
>>> >
>>> > Hi all,
>>> >
>>> > I'm working on a setup to use Apache Flink in an assignment for a Big
>>> Data (bachelor) university course and I'm interested in your view on this.
>>> To sketch the situation:
>>> > -  > 200 students follow this course
>>> > - students have to write some (simple) Flink applications using the
>>> DataStream API; the focus is on writing the transformation code
>>> > - students need to write Scala code
>>> > - we provide a dataset and a template (Scala class) with function
>>> signatures and detailed description per application.
>>> > e.g.: def assignment_one(input: DataStream[Event]):
>>> DataStream[(String, Int)] = ???
>>> > - we provide some setup code like parsing of data and setting up the
>>> streaming environment
>>> > - assignments need to be auto-graded, based on correct results
>>> >
>>> > In last years course edition we approached this by a custom Docker
>>> container. This container first compiled the students code, run all the
>>> Flink applications against a different dataset and then verified the output
>>> against our solutions. This was turned into a grade and reported back to
>>> the student. Although this was a working approach, I think we can 

Re: Using Flink in an university course

2019-03-04 Thread Wouter Zorgdrager
Hey all,

Thanks for the replies. The issues we were running into (which are not
specific to Docker):
- Students who changed the template incorrectly broke the container run.
- We give full points if the output matches our solutions (and none
otherwise), but it would be nice if we could give partial grades per
assignment (and better feedback). This would require looking not only at the
results but also at the operators used. The pitfall is that in many
cases a correct solution can be achieved in multiple ways. I came across a
Flink test library [1] which allows testing Flink code more extensively, but
it seems to be Java-only.

In retrospect, I do think using Docker is a good approach, as Fabian
confirms. However, the way we currently assess student solutions might be
improved. I assume that in your trainings manual feedback is given, but
unfortunately this is quite difficult for so many students.

Cheers,
Wouter

1: https://github.com/ottogroup/flink-spector


On Mon, 4 Mar 2019 at 14:39, Fabian Hueske wrote:

> Hi Wouter,
>
> We are using Docker Compose (Flink JM, Flink TM, Kafka, Zookeeper) setups
> for our trainings and it is working very well.
> We have an additional container that feeds a Kafka topic via the
> commandline producer to simulate a somewhat realistic behavior.
> Of course, you can do it without Kafka and use some kind of
> data-generating source that reads from a file that is replaced for evaluation.
>
> The biggest benefit that I see with using Docker is that the students have
> an environment that is close to grading situation for development and
> testing.
> You do not need to provide infrastructure but everyone is running it
> locally in a well-defined context.
>
> So, as Joern said, what problems do you see with Docker?
>
> Best,
> Fabian
>
> On Mon, 4 Mar 2019 at 13:44, Jörn Franke <
> jornfra...@gmail.com> wrote:
>
>> It would help to understand the current issues that you have with this
>> approach? I used a similar approach (not with Flink, but a similar big data
>> technology) some years ago
>>
>> > On 04.03.2019 at 11:32, Wouter Zorgdrager <
>> w.d.zorgdra...@tudelft.nl> wrote:
>> >
>> > Hi all,
>> >
>> > I'm working on a setup to use Apache Flink in an assignment for a Big
>> Data (bachelor) university course and I'm interested in your view on this.
>> To sketch the situation:
>> > -  > 200 students follow this course
>> > - students have to write some (simple) Flink applications using the
>> DataStream API; the focus is on writing the transformation code
>> > - students need to write Scala code
>> > - we provide a dataset and a template (Scala class) with function
>> signatures and detailed description per application.
>> > e.g.: def assignment_one(input: DataStream[Event]): DataStream[(String,
>> Int)] = ???
>> > - we provide some setup code like parsing of data and setting up the
>> streaming environment
>> > - assignments need to be auto-graded, based on correct results
>> >
>> > In last years course edition we approached this by a custom Docker
>> container. This container first compiled the students code, run all the
>> Flink applications against a different dataset and then verified the output
>> against our solutions. This was turned into a grade and reported back to
>> the student. Although this was a working approach, I think we can do better.
>> >
>> > I'm wondering if any of you have experience with using Apache Flink in
>> a university course (or have seen this somewhere) as well as assessing
>> Flink code.
>> >
>> > Thanks a lot!
>> >
>> > Kind regards,
>> > Wouter Zorgdrager
>>
>


Re: Using Flink in an university course

2019-03-04 Thread Fabian Hueske
Hi Wouter,

We are using Docker Compose (Flink JM, Flink TM, Kafka, Zookeeper) setups
for our trainings and it is working very well.
We have an additional container that feeds a Kafka topic via the
commandline producer to simulate a somewhat realistic behavior.
Of course, you can do it without Kafka and use some kind of
data-generating source that reads from a file that is replaced for evaluation.

The biggest benefit that I see with using Docker is that the students have
an environment that is close to grading situation for development and
testing.
You do not need to provide infrastructure but everyone is running it
locally in a well-defined context.

So, as Joern said, what problems do you see with Docker?

Best,
Fabian

On Mon, 4 Mar 2019 at 13:44, Jörn Franke wrote:

> It would help to understand the current issues that you have with this
> approach? I used a similar approach (not with Flink, but a similar big data
> technology) some years ago
>
> > On 04.03.2019 at 11:32, Wouter Zorgdrager <
> w.d.zorgdra...@tudelft.nl> wrote:
> >
> > Hi all,
> >
> > I'm working on a setup to use Apache Flink in an assignment for a Big
> Data (bachelor) university course and I'm interested in your view on this.
> To sketch the situation:
> > -  > 200 students follow this course
> > - students have to write some (simple) Flink applications using the
> DataStream API; the focus is on writing the transformation code
> > - students need to write Scala code
> > - we provide a dataset and a template (Scala class) with function
> signatures and detailed description per application.
> > e.g.: def assignment_one(input: DataStream[Event]): DataStream[(String,
> Int)] = ???
> > - we provide some setup code like parsing of data and setting up the
> streaming environment
> > - assignments need to be auto-graded, based on correct results
> >
> > In last years course edition we approached this by a custom Docker
> container. This container first compiled the students code, run all the
> Flink applications against a different dataset and then verified the output
> against our solutions. This was turned into a grade and reported back to
> the student. Although this was a working approach, I think we can do better.
> >
> > I'm wondering if any of you have experience with using Apache Flink in a
> university course (or have seen this somewhere) as well as assessing Flink
> code.
> >
> > Thanks a lot!
> >
> > Kind regards,
> > Wouter Zorgdrager
>


Re: Using Flink in an university course

2019-03-04 Thread Jörn Franke
It would help to understand the current issues that you have with this
approach. I used a similar approach (not with Flink, but with a similar big data
technology) some years ago.

> On 04.03.2019 at 11:32, Wouter Zorgdrager wrote:
> 
> Hi all,
> 
> I'm working on a setup to use Apache Flink in an assignment for a Big Data 
> (bachelor) university course and I'm interested in your view on this. To 
> sketch the situation:
> -  > 200 students follow this course
> - students have to write some (simple) Flink applications using the 
> DataStream API; the focus is on writing the transformation code
> - students need to write Scala code
> - we provide a dataset and a template (Scala class) with function signatures 
> and detailed description per application.
> e.g.: def assignment_one(input: DataStream[Event]): DataStream[(String, Int)] 
> = ???
> - we provide some setup code like parsing of data and setting up the 
> streaming environment
> - assignments need to be auto-graded, based on correct results
> 
> In last year's course edition we approached this with a custom Docker container.
> This container first compiled the students' code, ran all the Flink
> applications against a different dataset and then verified the output against
> our solutions. This was turned into a grade and reported back to the student.
> Although this was a working approach, I think we can do better.
> 
> I'm wondering if any of you have experience with using Apache Flink in a 
> university course (or have seen this somewhere) as well as assessing Flink 
> code.
> 
> Thanks a lot!
> 
> Kind regards,
> Wouter Zorgdrager


Re: Using Flink Ml with DataStream

2017-11-03 Thread Adarsh Jain
Hi Chesnay,

Thanks for the reply, do you know how to serve using the trained model?

Where is the model saved?

Regards,
Adarsh




On Wed, Nov 1, 2017 at 4:46 PM, Chesnay Schepler  wrote:

> I don't believe this to be possible. The ML library works exclusively with
> the Batch API.
>
>
> On 30.10.2017 12:52, Adarsh Jain wrote:
>
>
> Hi,
>
> Is there a way to use Stochastic Outlier Selection (SOS) and/or SVM using
> CoCoA with streaming data?
>
> Please suggest and give pointers.
>
> Regards,
> Adarsh
>
>
>
>


Re: Using Flink Ml with DataStream

2017-11-01 Thread Chesnay Schepler
I don't believe this to be possible. The ML library works exclusively 
with the Batch API.


On 30.10.2017 12:52, Adarsh Jain wrote:


Hi,

Is there a way to use Stochastic Outlier Selection (SOS) and/or SVM
using CoCoA with streaming data?


Please suggest and give pointers.

Regards,
Adarsh






Re: Using Flink with Accumulo

2016-11-07 Thread Oliver Swoboda
Hi Josh, thank you for your quick answer!

2016-11-03 17:03 GMT+01:00 Josh Elser :

> Hi Oliver,
>
> Cool stuff. I wish I knew more about Flink to make some better
> suggestions. Some points inline, and sorry in advance if I suggest
> something outright wrong. Hopefully someone from the Flink side can help
> give context where necessary :)
>
> Oliver Swoboda wrote:
>
>> Hello,
>>
>> I'm using Flink with Accumulo and wanted to read data from the database
>> by using the createHadoopInput function. Therefore I configure an
>> AccumuloInputFormat. The source code you can find here:
>> https://github.com/OSwoboda/masterthesis/blob/master/aggregation.flink/src/main/java/de/oswoboda/aggregation/Main.java
>>
>> I'm using a 5 Node Cluster (1 Master, 4 Worker).
>> Accumulo is installed with Ambari and has 1 Master Server on the Master
>> Node and 4 Tablet Servers (one on each Worker).
>> Flink is installed standalone with the Jobmanager on the Master Node and
>> 4 Taskmanagers (one on each Worker). Every Taskmanager can have 4 Tasks,
>> so there are 32 in total.
>>
>> First problem I have:
>> If I start several Flink jobs, the client count for Zookeeper in the
>> Accumulo Overview is constantly increasing. I assume that the used
>> scanner isn't correctly closed. The client count only decreases to
>> normal values when I restart Flink.
>>
>
> Hrm, this does seem rather bad. Eventually, you'll saturate the
> connections to ZK and ZK itself will start limiting new connections (per
> the maxClientCnxns property).
>
> This sounds somewhat familiar to https://issues.apache.org/jira
> /browse/ACCUMULO-2113. The lack of a proper "close()" method on the
> Instance interface is a known deficiency. I'm not sure how Flink execution
> happens, so I am kind of just guessing.
>
> You might be able to try to use the CleanUp[1] utility to close out the
> thread pools/connections when your Flink "task" is done.


Unfortunately that didn't work. I guess that's because the tasks with the
scanners are started by a TaskManager and I can't access those tasks from my
program. So after a task is done, I can't close its connections with the
utility, because the thread where I call it isn't the one that started the
scanners.

>> Second problem I have:
>> I want to compare aggregations on time series data with Accumulo (with
>> Iterators) and with Flink. Unfortunately, the results vary inexplicably
>> when I'm using Flink. I wanted to compare the results for a full table
>> scan (called baseline in the code), but sometimes it takes 17-18 minutes
>> and sometimes it's between 30 and 60 minutes. In the longer case I can
>> see in the Accumulo Overview that after some time only one worker is
>> left with running scans and only a few entries/s are scanned (4 million
>> at the beginning when all workers are running, down to 200k when only
>> one worker is left). Because there are 2.5 billion records to scan and
>> almost 500 million left, it takes really long.
>> This problem doesn't occur with Accumulo using Iterators and a batch
>> scanner on the master node: each scan has almost identical durations, and
>> the graphs in the Accumulo Overview for entries/s, MB/s scanned and seeks
>> are the same for each scan.
>>
>
> It sounds like maybe your partitioning was sub-optimal and caused one task
> to get a majority of the data? Having the autoAdjustRanges=true (as you do
> by default) should help get many batches of work based on the tablet
> boundaries in Accumulo. I'm not sure how Flink actually executes them
> though.
>

The problem was that half of the data was on one node after a restart of
Accumulo. It seems to have something to do with the problem described
here: https://issues.apache.org/jira/browse/ACCUMULO-4353. I stopped and
then started Accumulo instead of doing a restart, and now the data is
distributed evenly across all nodes. For my tests I keep Accumulo running,
because after each restart the data distribution changes and I don't want
to upgrade to 1.8.

Yours faithfully,
>> Oliver Swoboda
>>
>
>
> [1] https://github.com/apache/accumulo/blob/e900e67425d950bd4c0c5288a6270d7b362ac458/core/src/main/java/org/apache/accumulo/core/util/CleanUp.java#L36
>
>


Re: Using Flink with Accumulo

2016-11-03 Thread Josh Elser

Hi Oliver,

Cool stuff. I wish I knew more about Flink to make some better 
suggestions. Some points inline, and sorry in advance if I suggest 
something outright wrong. Hopefully someone from the Flink side can help 
give context where necessary :)


Oliver Swoboda wrote:

Hello,

I'm using Flink with Accumulo and wanted to read data from the database
by using the createHadoopInput function. Therefore I configure an
AccumuloInputFormat. The source code you can find here:
https://github.com/OSwoboda/masterthesis/blob/master/aggregation.flink/src/main/java/de/oswoboda/aggregation/Main.java
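
For illustration, a rough sketch of the kind of setup described there (connection details, table name and credentials are made up; this assumes the Scala ExecutionEnvironment offers the same createHadoopInput wrapper as the Java one used in the linked Main.java):

import org.apache.accumulo.core.client.ClientConfiguration
import org.apache.accumulo.core.client.mapreduce.AccumuloInputFormat
import org.apache.accumulo.core.client.security.tokens.PasswordToken
import org.apache.accumulo.core.data.{Key, Value}
import org.apache.accumulo.core.security.Authorizations
import org.apache.flink.api.scala._
import org.apache.hadoop.mapreduce.Job

object AccumuloScanSketch {
  def main(args: Array[String]): Unit = {
    val env = ExecutionEnvironment.getExecutionEnvironment

    // Accumulo connection settings are carried in a Hadoop Job configuration.
    val job = Job.getInstance()
    AccumuloInputFormat.setConnectorInfo(job, "user", new PasswordToken("secret")) // hypothetical credentials
    AccumuloInputFormat.setZooKeeperInstance(job,
      ClientConfiguration.loadDefault().withInstance("accumulo").withZkHosts("zk1:2181")) // hypothetical instance/ZK
    AccumuloInputFormat.setInputTableName(job, "metrics") // hypothetical table
    AccumuloInputFormat.setScanAuthorizations(job, new Authorizations())

    // Wrap the Hadoop AccumuloInputFormat as a Flink source.
    val records: DataSet[(Key, Value)] =
      env.createHadoopInput(new AccumuloInputFormat, classOf[Key], classOf[Value], job)

    // e.g. a full-table "baseline" count of entries
    println(records.count())
  }
}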


I'm using a 5 Node Cluster (1 Master, 4 Worker).
Accumulo is installed with Ambari and has 1 Master Server on the Master
Node and 4 Tablet Servers (one on each Worker).
Flink is installed standalone with the Jobmanager on the Master Node and
4 Taskmanagers (one on each Worker). Every Taskmanager can have 4 Tasks,
so there are 32 in total.

First problem I have:
If I start several Flink jobs, the client count for Zookeeper in the
Accumulo Overview is constantly increasing. I assume that the used
scanner isn't correctly closed. The client count only decreases to
normal values when I restart Flink.


Hrm, this does seem rather bad. Eventually, you'll saturate the 
connections to ZK and ZK itself will start limiting new connections (per 
the maxClientCnxns property).


This sounds somewhat familiar to 
https://issues.apache.org/jira/browse/ACCUMULO-2113. The lack of a 
proper "close()" method on the Instance interface is a known deficiency. 
I'm not sure how Flink execution happens, so I am kind of just guessing.


You might be able to try to use the CleanUp[1] utility to close out the 
thread pools/connections when your Flink "task" is done.



Second problem I have:
I want to compare aggregations on time series data with Accumulo (with
Iterators) and with Flink. Unfortunately, the results vary inexplicably
when I'm using Flink. I wanted to compare the results for a full table
scan (called baseline in the code), but sometimes it takes 17-18 minutes
and sometimes it's between 30 and 60 minutes. In the longer case I can
see in the Accumulo Overview that after some time only one worker is
left with running scans and only a few entries/s are scanned (4 million
at the beginning when all workers are running, down to 200k when only
one worker is left). Because there are 2.5 billion records to scan and
almost 500 million left, it takes really long.
This problem doesn't occur with Accumulo using Iterators and a batch
scanner on the master node: each scan has almost identical durations, and
the graphs in the Accumulo Overview for entries/s, MB/s scanned and seeks
are the same for each scan.


It sounds like maybe your partitioning was sub-optimal and caused one 
task to get a majority of the data? Having the autoAdjustRanges=true (as 
you do by default) should help get many batches of work based on the 
tablet boundaries in Accumulo. I'm not sure how Flink actually executes 
them though.



Yours faithfully,
Oliver Swoboda



[1] 
https://github.com/apache/accumulo/blob/e900e67425d950bd4c0c5288a6270d7b362ac458/core/src/main/java/org/apache/accumulo/core/util/CleanUp.java#L36




Re: Using Flink

2016-10-04 Thread Tzu-Li (Gordon) Tai
Hi Govindarajan,

Regarding the stagnant Kafka offsets, it’ll be helpful if you can supply more 
information for the following to help us identify the cause:
1. What is your checkpointing interval set to?
2. Did you happen to have set the “max.partition.fetch.bytes” property in the 
properties given to FlinkKafkaConsumer? I’m suspecting with some recent changes 
to the offset committing, large fetches can also affect when offsets are 
committed to Kafka.
3. I’m assuming that you’ve built the Kafka connector from source. Could you 
tell which commit it was built on?

If you could, you can also reply with the taskmanager logs (or via private 
email) so we can check in detail, that would definitely be helpful!

Best Regards,
Gordon


On October 4, 2016 at 3:51:59 PM, Till Rohrmann (trohrm...@apache.org) wrote:

Hi Govindarajan,

you can broadcast the stream with debug logger information by calling 
`stream.broadcast`. Then every stream record should be sent to all sub-tasks of 
the downstream operator.

Cheers,
Till

On Mon, Oct 3, 2016 at 5:13 PM, Govindarajan Srinivasaraghavan 
 wrote:
Hi Gordon,

- I'm currently running my flink program on 1.2 SNAPSHOT with kafka source and 
I have checkpoint enabled. When I look at the consumer offsets in kafka it 
appears to be stagnant and there is a huge lag. But I can see my flink program 
is in pace with kafka source in JMX metrics and outputs. Is there a way to 
identify why the offsets are not committed to kafka?

On which commit was your Kafka connector built? There was a recent change to 
the offset committing for Kafka 0.9 consumer, so identifying the exact commit 
will help clarify whether the recent change introduced any new problems. Also, 
what is your configured checkpoint interval? When checkpointing is enabled, the 
Kafka consumer only commits to Kafka when checkpoints are completed. So, 
offsets in Kafka are not updated until the next checkpoint is triggered.

I am using 1.2-SNAPSHOT
'org.apache.flink', name: 'flink-connector-kafka-base_2.11', version: 
'1.2-SNAPSHOT'
'org.apache.flink', name: 'flink-connector-kafka-0.9_2.11', version: 
'1.2-SNAPSHOT'

- In my current application we use custom loggers for debugging purposes. Let’s say 
we want to find what’s happening for a particular user, we fire an api request 
to add the custom logger for that particular user and use it for logging along 
the data path. Is there a way to achieve this in flink? Are there any global 
mutable parameters that I can use to achieve this functionality?

I’m not sure if I understand the use case correctly, but it seems like you will 
need to change configuration / behaviour of a specific Flink operator at 
runtime. To my knowledge, the best way to do this in Flink right now is to 
translate your original logging-trigger api requests to a stream of events fed 
to Flink. This stream of events will then basically be changes of your user 
logger behaviour, and your operators can change its logging behaviour according 
to this stream.

I can send the changes as streams, but I need this change for all the operators 
in my pipeline. Instead of using coflatmap at each operator to combine the 
streams, is there a way to send a change to all the operators?

- Can I pass on state between operators? If I need the state stored on previous 
operators, how can I fetch it?
I don’t think this is possible.
Fine, thanks.

Thanks.

On Mon, Oct 3, 2016 at 12:23 AM, Tzu-Li (Gordon) Tai  
wrote:
Hi!

- I'm currently running my flink program on 1.2 SNAPSHOT with kafka source and 
I have checkpoint enabled. When I look at the consumer offsets in kafka it 
appears to be stagnant and there is a huge lag. But I can see my flink program 
is in pace with kafka source in JMX metrics and outputs. Is there a way to 
identify why the offsets are not committed to kafka?

On which commit was your Kafka connector built? There was a recent change to 
the offset committing for Kafka 0.9 consumer, so identifying the exact commit 
will help clarify whether the recent change introduced any new problems. Also, 
what is your configured checkpoint interval? When checkpointing is enabled, the 
Kafka consumer only commits to Kafka when checkpoints are completed. So, 
offsets in Kafka are not updated until the next checkpoint is triggered.

- In my current application we use custom loggers for debugging purposes. Let’s say 
we want to find what’s happening for a particular user, we fire an api request 
to add the custom logger for that particular user and use it for logging along 
the data path. Is there a way to achieve this in flink? Are there any global 
mutable parameters that I can use to achieve this functionality?

I’m not sure if I understand the use case correctly, but it seems like you will 
need to change configuration / behaviour of a specific Flink operator at 
runtime. To my knowledge, the best way to do this in Flink right now is to 
translate your original 

Re: Using Flink

2016-10-04 Thread Till Rohrmann
Hi Govindarajan,

you can broadcast the stream with debug logger information by calling
`stream.broadcast`. Then every stream record should be sent to all
sub-tasks of the downstream operator.

Cheers,
Till
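
For illustration, a minimal sketch of this broadcast-control-stream pattern (hypothetical sources and event format; the control stream carries user ids that should be debug-logged):

import org.apache.flink.streaming.api.functions.co.CoFlatMapFunction
import org.apache.flink.streaming.api.scala._
import org.apache.flink.util.Collector

// Because the control stream is broadcast before the connect, every parallel
// sub-task of this function sees every control message.
class DebugLoggingFunction extends CoFlatMapFunction[String, String, String] {
  private val usersToLog = scala.collection.mutable.Set[String]()

  override def flatMap1(event: String, out: Collector[String]): Unit = {
    val userId = event.split(",")(0) // hypothetical event format: "userId,payload"
    if (usersToLog.contains(userId)) println(s"DEBUG $event") // or a real logger
    out.collect(event)
  }

  override def flatMap2(controlMsg: String, out: Collector[String]): Unit = {
    usersToLog += controlMsg // a user id coming from the logging-trigger API
  }
}

object BroadcastControlSketch {
  def main(args: Array[String]): Unit = {
    val env = StreamExecutionEnvironment.getExecutionEnvironment
    val events  = env.socketTextStream("localhost", 9000) // hypothetical main stream
    val control = env.socketTextStream("localhost", 9001) // hypothetical control stream

    events
      .connect(control.broadcast)
      .flatMap(new DebugLoggingFunction)
      .print()

    env.execute("broadcast-control-sketch")
  }
}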

On Mon, Oct 3, 2016 at 5:13 PM, Govindarajan Srinivasaraghavan <
govindragh...@gmail.com> wrote:

> Hi Gordon,
>
> - I'm currently running my flink program on 1.2 SNAPSHOT with kafka source
> and I have checkpoint enabled. When I look at the consumer offsets in kafka
> it appears to be stagnant and there is a huge lag. But I can see my flink
> program is in pace with kafka source in JMX metrics and outputs. Is there a
> way to identify why the offsets are not committed to kafka?
>
> On which commit was your Kafka connector built? There was a recent change
> to the offset committing for Kafka 0.9 consumer, so identifying the exact
> commit will help clarify whether the recent change introduced any new
> problems. Also, what is your configured checkpoint interval? When
> checkpointing is enabled, the Kafka consumer only commits to Kafka when
> checkpoints are completed. So, offsets in Kafka are not updated until the
> next checkpoint is triggered.
>
> I am using 1.2-SNAPSHOT
> 'org.apache.flink', name: 'flink-connector-kafka-base_2.11', version:
> '1.2-SNAPSHOT'
> 'org.apache.flink', name: 'flink-connector-kafka-0.9_2.11', version:
> '1.2-SNAPSHOT'
>
> - In my current application we use custom loggers for debugging purposes.
> Let’s say we want to find what’s happening for a particular user, we fire
> an api request to add the custom logger for that particular user and use it
> for logging along the data path. Is there a way to achieve this in flink?
> Are there any global mutable parameters that I can use to achieve this
> functionality?
>
> I’m not sure if I understand the use case correctly, but it seems like you
> will need to change configuration / behaviour of a specific Flink operator
> at runtime. To my knowledge, the best way to do this in Flink right now is
> to translate your original logging-trigger api requests to a stream of
> events fed to Flink. This stream of events will then basically be changes
> of your user logger behaviour, and your operators can change its logging
> behaviour according to this stream.
>
> I can send the changes as streams, but I need this change for all the
> operators in my pipeline. Instead of using coflatmap at each operator to
> combine the streams, is there a way to send a change to all the operators?
>
> - Can I pass on state between operators? If I need the state stored on
> previous operators, how can I fetch it?
> I don’t think this is possible.
> Fine, thanks.
>
> Thanks.
>
> On Mon, Oct 3, 2016 at 12:23 AM, Tzu-Li (Gordon) Tai 
> wrote:
>
>> Hi!
>>
>> - I'm currently running my flink program on 1.2 SNAPSHOT with kafka
>> source and I have checkpoint enabled. When I look at the consumer offsets
>> in kafka it appears to be stagnant and there is a huge lag. But I can see
>> my flink program is in pace with kafka source in JMX metrics and outputs.
>> Is there a way to identify why the offsets are not committed to kafka?
>>
>> On which commit was your Kafka connector built? There was a recent change
>> to the offset committing for Kafka 0.9 consumer, so identifying the exact
>> commit will help clarify whether the recent change introduced any new
>> problems. Also, what is your configured checkpoint interval? When
>> checkpointing is enabled, the Kafka consumer only commits to Kafka when
>> checkpoints are completed. So, offsets in Kafka are not updated until the
>> next checkpoint is triggered.
>>
>> - In my current application we use custom loggers for debugging purposes.
>> Let’s say we want to find what’s happening for a particular user, we fire
>> an api request to add the custom logger for that particular user and use it
>> for logging along the data path. Is there a way to achieve this in flink?
>> Are there any global mutable parameters that I can use to achieve this
>> functionality?
>>
>> I’m not sure if I understand the use case correctly, but it seems like
>> you will need to change configuration / behaviour of a specific Flink
>> operator at runtime. To my knowledge, the best way to do this in Flink
>> right now is to translate your original logging-trigger api requests to
>> a stream of events fed to Flink. This stream of events will then basically
>> be changes of your user logger behaviour, and your operators can change its
>> logging behaviour according to this stream.
>>
>> - Can I pass on state between operators? If I need the state stored on
>> previous operators, how can I fetch it?
>>
>> I don’t think this is possible.
>>
>>
>> Best Regards,
>> Gordon
>>
>>
>> On October 3, 2016 at 2:08:31 PM, Govindarajan Srinivasaraghavan (
>> govindragh...@gmail.com) wrote:
>>
>> Hi,
>>
>>
>>
>> I have a few questions on how I need to model my use case in Flink. Please
>> advise. Thanks for the help.
>>
>>
>>
>> - I'm currently running my 

Re: Using Flink and Cassandra with Scala

2016-10-04 Thread Fabian Hueske
FYI: FLINK-4497 [1] requests Scala tuple and case class support for the
Cassandra sink and was opened about a month ago.

[1] https://issues.apache.org/jira/browse/FLINK-4497

2016-09-30 23:14 GMT+02:00 Stephan Ewen :

> How hard would it be to add case class support?
>
> Internally, tuples and case classes are treated quite similarly, so I think
> it could be a quite simple extension...
>
> On Fri, Sep 30, 2016 at 4:22 PM, Sanne de Roever <
> sanne.de.roe...@gmail.com> wrote:
>
>> Thanks Chesnay. Have a good weekend.
>>
>> On Thu, Sep 29, 2016 at 5:03 PM, Chesnay Schepler 
>> wrote:
>>
>>> the Cassandra sink only supports Java tuples and POJOs.
>>>
>>>
>>> On 29.09.2016 16:33, Sanne de Roever wrote:
>>>
 Hi,

 Does the Cassandra sink support Scala and case classes? It looks like
 using Java is at the moment best practice.

 Cheers,

 Sanne

>>>
>>>
>>
>


Re: Using Flink

2016-10-03 Thread Govindarajan Srinivasaraghavan
Hi Gordon,

- I'm currently running my flink program on 1.2 SNAPSHOT with kafka source
and I have checkpoint enabled. When I look at the consumer offsets in kafka
it appears to be stagnant and there is a huge lag. But I can see my flink
program is in pace with kafka source in JMX metrics and outputs. Is there a
way to identify why the offsets are not committed to kafka?

On which commit was your Kafka connector built? There was a recent change
to the offset committing for Kafka 0.9 consumer, so identifying the exact
commit will help clarify whether the recent change introduced any new
problems. Also, what is your configured checkpoint interval? When
checkpointing is enabled, the Kafka consumer only commits to Kafka when
checkpoints are completed. So, offsets in Kafka are not updated until the
next checkpoint is triggered.

I am using 1.2-SNAPSHOT
'org.apache.flink', name: 'flink-connector-kafka-base_2.11', version:
'1.2-SNAPSHOT'
'org.apache.flink', name: 'flink-connector-kafka-0.9_2.11', version:
'1.2-SNAPSHOT'

- In my current application we use custom loggers for debugging purposes. Let’s
say we want to find what’s happening for a particular user, we fire an api
request to add the custom logger for that particular user and use it for
logging along the data path. Is there a way to achieve this in flink? Are
there any global mutable parameters that I can use to achieve this
functionality?

I’m not sure if I understand the use case correctly, but it seems like you
will need to change configuration / behaviour of a specific Flink operator
at runtime. To my knowledge, the best way to do this in Flink right now is
to translate your original logging-trigger api requests to a stream of
events fed to Flink. This stream of events will then basically be changes
of your user logger behaviour, and your operators can change its logging
behaviour according to this stream.

I can send the changes as streams, but I need this change for all the
operators in my pipeline. Instead of using coflatmap at each operator to
combine the streams, is there a way to send a change to all the operators?

- Can I pass on state between operators? If I need the state stored on
previous operators, how can I fetch it?
I don’t think this is possible.
Fine, thanks.

Thanks.

On Mon, Oct 3, 2016 at 12:23 AM, Tzu-Li (Gordon) Tai 
wrote:

> Hi!
>
> - I'm currently running my flink program on 1.2 SNAPSHOT with kafka source
> and I have checkpoint enabled. When I look at the consumer offsets in kafka
> it appears to be stagnant and there is a huge lag. But I can see my flink
> program is in pace with kafka source in JMX metrics and outputs. Is there a
> way to identify why the offsets are not committed to kafka?
>
> On which commit was your Kafka connector built? There was a recent change
> to the offset committing for Kafka 0.9 consumer, so identifying the exact
> commit will help clarify whether the recent change introduced any new
> problems. Also, what is your configured checkpoint interval? When
> checkpointing is enabled, the Kafka consumer only commits to Kafka when
> checkpoints are completed. So, offsets in Kafka are not updated until the
> next checkpoint is triggered.
>
> - In my current application we use custom loggers for debugging purposes.
> Let’s say we want to find what’s happening for a particular user, we fire
> an api request to add the custom logger for that particular user and use it
> for logging along the data path. Is there a way to achieve this in flink?
> Are there any global mutable parameters that I can use to achieve this
> functionality?
>
> I’m not sure if I understand the use case correctly, but it seems like you
> will need to change configuration / behaviour of a specific Flink operator
> at runtime. To my knowledge, the best way to do this in Flink right now is
> to translate your original logging-trigger api requests to a stream of
> events fed to Flink. This stream of events will then basically be changes
> of your user logger behaviour, and your operators can change its logging
> behaviour according to this stream.
>
> - Can I pass on state between operators? If I need the state stored on
> previous operators, how can I fetch it?
>
> I don’t think this is possible.
>
>
> Best Regards,
> Gordon
>
>
> On October 3, 2016 at 2:08:31 PM, Govindarajan Srinivasaraghavan (
> govindragh...@gmail.com) wrote:
>
> Hi,
>
>
>
> I have a few questions on how I need to model my use case in Flink. Please
> advise. Thanks for the help.
>
>
>
> - I'm currently running my flink program on 1.2 SNAPSHOT with kafka source
> and I have checkpoint enabled. When I look at the consumer offsets in kafka
> it appears to be stagnant and there is a huge lag. But I can see my flink
> program is in pace with kafka source in JMX metrics and outputs. Is there a
> way to identify why the offsets are not committed to kafka?
>
>
>
> - In my current application we use custom loggers for debugging purposes.
> Let’s say we want to find 

Re: Using Flink

2016-10-03 Thread Tzu-Li (Gordon) Tai
Hi!

- I'm currently running my flink program on 1.2 SNAPSHOT with kafka source and 
I have checkpoint enabled. When I look at the consumer offsets in kafka it 
appears to be stagnant and there is a huge lag. But I can see my flink program 
is in pace with kafka source in JMX metrics and outputs. Is there a way to 
identify why the offsets are not committed to kafka?

On which commit was your Kafka connector built? There was a recent change to 
the offset committing for Kafka 0.9 consumer, so identifying the exact commit 
will help clarify whether the recent change introduced any new problems. Also, 
what is your configured checkpoint interval? When checkpointing is enabled, the 
Kafka consumer only commits to Kafka when checkpoints are completed. So, 
offsets in Kafka are not updated until the next checkpoint is triggered.
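
For illustration, a minimal sketch of enabling checkpointing so that the consumer commits offsets back to Kafka on completed checkpoints (hypothetical broker, group and topic names; the 0.9 connector mentioned in this thread):

import java.util.Properties

import org.apache.flink.streaming.api.scala._
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer09
import org.apache.flink.streaming.util.serialization.SimpleStringSchema

object KafkaOffsetCommitSketch {
  def main(args: Array[String]): Unit = {
    val env = StreamExecutionEnvironment.getExecutionEnvironment

    // Offsets are committed to Kafka only when a checkpoint completes, so a
    // long interval makes the externally visible offsets (and lag) look stagnant.
    env.enableCheckpointing(10000) // checkpoint, and hence commit offsets, every 10s

    val props = new Properties()
    props.setProperty("bootstrap.servers", "kafka:9092") // hypothetical broker
    props.setProperty("group.id", "my-group")            // hypothetical group id

    env
      .addSource(new FlinkKafkaConsumer09[String]("events", new SimpleStringSchema(), props)) // hypothetical topic
      .print()

    env.execute("kafka-offset-commit-sketch")
  }
}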

- In my current application we use custom loggers for debugging purposes. Let’s say 
we want to find what’s happening for a particular user, we fire an api request 
to add the custom logger for that particular user and use it for logging along 
the data path. Is there a way to achieve this in flink? Are there any global 
mutable parameters that I can use to achieve this functionality?

I’m not sure if I understand the use case correctly, but it seems like you will 
need to change configuration / behaviour of a specific Flink operator at 
runtime. To my knowledge, the best way to do this in Flink right now is to 
translate your original logging-trigger api requests to a stream of events fed 
to Flink. This stream of events will then basically be changes of your user 
logger behaviour, and your operators can change its logging behaviour according 
to this stream.

- Can I pass on state between operators? If I need the state stored on previous 
operators, how can I fetch it?

I don’t think this is possible.


Best Regards,
Gordon


On October 3, 2016 at 2:08:31 PM, Govindarajan Srinivasaraghavan 
(govindragh...@gmail.com) wrote:

Hi,

 

I have a few questions on how I need to model my use case in Flink. Please 
advise. Thanks for the help.

 

- I'm currently running my flink program on 1.2 SNAPSHOT with kafka source and 
I have checkpoint enabled. When I look at the consumer offsets in kafka it 
appears to be stagnant and there is a huge lag. But I can see my flink program 
is in pace with kafka source in JMX metrics and outputs. Is there a way to 
identify why the offsets are not committed to kafka?

 

- In my current application we use custom loggers for debugging purposes. Let’s say 
we want to find what’s happening for a particular user, we fire an api request 
to add the custom logger for that particular user and use it for logging along 
the data path. Is there a way to achieve this in flink? Are there any global 
mutable parameters that I can use to achieve this functionality?

 

- Can I pass on state between operators? If I need the state stored on previous 
operators, how can I fetch it?

 

Thanks

Re: Using Flink and Cassandra with Scala

2016-09-30 Thread Stephan Ewen
How hard would it be to add case class support?

Internally, tuples and case classes are treated quite similarly, so I think
it could be a quite simple extension...

On Fri, Sep 30, 2016 at 4:22 PM, Sanne de Roever 
wrote:

> Thanks Chesnay. Have a good weekend.
>
> On Thu, Sep 29, 2016 at 5:03 PM, Chesnay Schepler 
> wrote:
>
>> the Cassandra sink only supports Java tuples and POJOs.
>>
>>
>> On 29.09.2016 16:33, Sanne de Roever wrote:
>>
>>> Hi,
>>>
>>> Does the Cassandra sink support Scala and case classes? It looks like
>>> using Java is at the moment best practice.
>>>
>>> Cheers,
>>>
>>> Sanne
>>>
>>
>>
>


Re: Using Flink and Cassandra with Scala

2016-09-30 Thread Sanne de Roever
Thanks Chesnay. Have a good weekend.

On Thu, Sep 29, 2016 at 5:03 PM, Chesnay Schepler 
wrote:

> the Cassandra sink only supports Java tuples and POJOs.
>
>
> On 29.09.2016 16:33, Sanne de Roever wrote:
>
>> Hi,
>>
>> Does the Cassandra sink support Scala and case classes? It looks like
>> using Java is at the moment best practice.
>>
>> Cheers,
>>
>> Sanne
>>
>
>


Re: Using Flink watermarks and a large window state for scheduling

2016-06-09 Thread Josh
Ok, thanks Aljoscha.

As an alternative to using Flink to maintain the schedule state, I could
take the (e, t2) stream and write to an external key-value store with a
bucket for each minute. Then have a separate service which polls the
key-value store every minute and retrieves the current bucket, and does the
final transformation.

I just thought there might be a nicer way to do it using Flink!

On Thu, Jun 9, 2016 at 2:23 PM, Aljoscha Krettek 
wrote:

> Hi Josh,
> I'll have to think a bit about that one. Once I have something I'll get
> back to you.
>
> Best,
> Aljoscha
>
> On Wed, 8 Jun 2016 at 21:47 Josh  wrote:
>
>> This is just a question about a potential use case for Flink:
>>
>> I have a Flink job which receives tuples with an event id and a timestamp
>> (e, t) and maps them into a stream (e, t2) where t2 is a future timestamp
>> (up to 1 year in the future, which indicates when to schedule a
>> transformation of e). I then want to key by e and keep track of the max t2
>> for each e. Now the tricky bit: I want to periodically, say every minute
>> (in event time world) take all (e, t2) where t2 occurred in the last
>> minute, do a transformation and emit the result. It is important that the
>> final transformation happens after t2 (preferably as soon as possible, but
>> a delay of minutes is fine).
>>
>> Is it possible to use Flink's windowing and watermark mechanics to
>> achieve this? I want to maintain a large state for the (e, t2) window, e.g.
>> over a year (probably too large to fit in memory). And somehow use
>> watermarks to execute the scheduled transformations.
>>
>> If anyone has any views on how this could be done, (or whether it's even
>> possible/a good idea to do) with Flink then it would be great to hear!
>>
>> Thanks,
>>
>> Josh
>>
>


Re: Using Flink watermarks and a large window state for scheduling

2016-06-09 Thread Aljoscha Krettek
Hi Josh,
I'll have to think a bit about that one. Once I have something I'll get
back to you.

Best,
Aljoscha

On Wed, 8 Jun 2016 at 21:47 Josh  wrote:

> This is just a question about a potential use case for Flink:
>
> I have a Flink job which receives tuples with an event id and a timestamp
> (e, t) and maps them into a stream (e, t2) where t2 is a future timestamp
> (up to 1 year in the future, which indicates when to schedule a
> transformation of e). I then want to key by e and keep track of the max t2
> for each e. Now the tricky bit: I want to periodically, say every minute
> (in event time world) take all (e, t2) where t2 occurred in the last
> minute, do a transformation and emit the result. It is important that the
> final transformation happens after t2 (preferably as soon as possible, but
> a delay of minutes is fine).
>
> Is it possible to use Flink's windowing and watermark mechanics to achieve
> this? I want to maintain a large state for the (e, t2) window, e.g. over a
> year (probably too large to fit in memory). And somehow use watermarks to
> execute the scheduled transformations.
>
> If anyone has any views on how this could be done, (or whether it's even
> possible/a good idea to do) with Flink then it would be great to hear!
>
> Thanks,
>
> Josh
>


Re: Using Flink with Scala 2.11 and Java 8

2015-12-10 Thread Maximilian Michels
Hi Cory,

The issue has been fixed in the master and the latest Maven snapshot.
https://issues.apache.org/jira/browse/FLINK-3143

Cheers,
Max

On Tue, Dec 8, 2015 at 12:35 PM, Maximilian Michels  wrote:
> Thanks for the stack trace, Cory. Looks like you were on the right
> path with the Spark issue. We will file an issue and correct it soon.
>
> Thanks,
> Max
>
> On Mon, Dec 7, 2015 at 8:20 PM, Stephan Ewen  wrote:
>> Sorry, correcting myself:
>>
>> The ClosureCleaner uses Kryo's bundled ASM 4 without any reason - simply
>> adjusting the imports to use the common ASM (which is 5.0) should do it ;-)
>>
>> On Mon, Dec 7, 2015 at 8:18 PM, Stephan Ewen  wrote:
>>>
>>> Flink's own asm is 5.0, but the Kryo version used in Flink bundles
>>> reflectasm with a dedicated asm version 4 (no lambdas supported).
>>>
>>> Might be as simple as bumping the kryo version...
>>>
>>>
>>>
>>> On Mon, Dec 7, 2015 at 7:59 PM, Cory Monty 
>>> wrote:

 Thanks, Max.

 Here is the stack trace I receive:

 java.lang.IllegalArgumentException:
 at
 com.esotericsoftware.reflectasm.shaded.org.objectweb.asm.ClassReader.<init>(Unknown
 Source)
 at
 com.esotericsoftware.reflectasm.shaded.org.objectweb.asm.ClassReader.<init>(Unknown
 Source)
 at
 com.esotericsoftware.reflectasm.shaded.org.objectweb.asm.ClassReader.<init>(Unknown
 Source)
 at
 org.apache.flink.api.scala.ClosureCleaner$.org$apache$flink$api$scala$ClosureCleaner$$getClassReader(ClosureCleaner.scala:47)
 at
 org.apache.flink.api.scala.ClosureCleaner$.getInnerClasses(ClosureCleaner.scala:90)
 at
 org.apache.flink.api.scala.ClosureCleaner$.clean(ClosureCleaner.scala:113)
 at
 org.apache.flink.streaming.api.scala.StreamExecutionEnvironment.scalaClean(StreamExecutionEnvironment.scala:555)
 at
 org.apache.flink.streaming.api.scala.DataStream.clean(DataStream.scala:764)
 at
 org.apache.flink.streaming.api.scala.DataStream.flatMap(DataStream.scala:473)

 On Mon, Dec 7, 2015 at 11:58 AM, Maximilian Michels 
 wrote:
>
> For completeness, could you provide a stack trace of the error message?
>
> On Mon, Dec 7, 2015 at 6:56 PM, Maximilian Michels 
> wrote:
> > Hi Cory,
> >
> > Thanks for reporting the issue. Scala should run independently of the
> > Java version. We are already using ASM version 5.0.4. However, some
> > code uses the ASM4 op codes which don't seem to work with Java 8.
> > This needs to be fixed. I'm filing a JIRA.
> >
> > Cheers,
> > Max
> >
> > On Mon, Dec 7, 2015 at 4:15 PM, Cory Monty
> >  wrote:
> >> Is it possible to use Scala 2.11 and Java 8?
> >>
> >> I'm able to get our project to compile correctly, however there are
> >> runtime
> >> errors with the Reflectasm library (I'm guessing due to Kryo). I
> >> looked into
> >> the error and it seems Spark had the same issue
> >> (https://issues.apache.org/jira/browse/SPARK-6152,
> >> https://github.com/EsotericSoftware/reflectasm/issues/35) because of
> >> an
> >> outdated version of Kryo.
> >>
> >> I'm also unsure if maybe we have to build Flink with Scala 2.11
> >>
> >> (https://ci.apache.org/projects/flink/flink-docs-release-0.10/setup/building.html)
> >> in order to run Flink correctly with Java 8.
> >>
> >> Cheers,
> >>
> >> Cory


>>>
>>


Re: Using Flink with Scala 2.11 and Java 8

2015-12-10 Thread Cory Monty
Thanks!

On Thu, Dec 10, 2015 at 12:32 PM, Maximilian Michels  wrote:

> Hi Cory,
>
> The issue has been fixed in the master and the latest Maven snapshot.
> https://issues.apache.org/jira/browse/FLINK-3143
>
> Cheers,
> Max
>
> On Tue, Dec 8, 2015 at 12:35 PM, Maximilian Michels 
> wrote:
> > Thanks for the stack trace, Cory. Looks like you were on the right
> > path with the Spark issue. We will file an issue and correct it soon.
> >
> > Thanks,
> > Max
> >
> > On Mon, Dec 7, 2015 at 8:20 PM, Stephan Ewen  wrote:
> >> Sorry, correcting myself:
> >>
> >> The ClosureCleaner uses Kryo's bundled ASM 4 without any reason - simply
> >> adjusting the imports to use the common ASM (which is 5.0) should do it
> ;-)
> >>
> >> On Mon, Dec 7, 2015 at 8:18 PM, Stephan Ewen  wrote:
> >>>
> >>> Flink's own asm is 5.0, but the Kryo version used in Flink bundles
> >>> reflectasm with a dedicated asm version 4 (no lambdas supported).
> >>>
> >>> Might be as simple as bumping the kryo version...
> >>>
> >>>
> >>>
> >>> On Mon, Dec 7, 2015 at 7:59 PM, Cory Monty <
> cory.mo...@getbraintree.com>
> >>> wrote:
> 
>  Thanks, Max.
> 
>  Here is the stack trace I receive:
> 
>  java.lang.IllegalArgumentException:
>  at
> 
> com.esotericsoftware.reflectasm.shaded.org.objectweb.asm.ClassReader.<init>(Unknown
 Source)
 at
> 
> com.esotericsoftware.reflectasm.shaded.org.objectweb.asm.ClassReader.<init>(Unknown
 Source)
 at
> 
> com.esotericsoftware.reflectasm.shaded.org.objectweb.asm.ClassReader.<init>(Unknown
>  Source)
>  at
> 
> org.apache.flink.api.scala.ClosureCleaner$.org$apache$flink$api$scala$ClosureCleaner$$getClassReader(ClosureCleaner.scala:47)
>  at
> 
> org.apache.flink.api.scala.ClosureCleaner$.getInnerClasses(ClosureCleaner.scala:90)
>  at
> 
> org.apache.flink.api.scala.ClosureCleaner$.clean(ClosureCleaner.scala:113)
>  at
> 
> org.apache.flink.streaming.api.scala.StreamExecutionEnvironment.scalaClean(StreamExecutionEnvironment.scala:555)
>  at
> 
> org.apache.flink.streaming.api.scala.DataStream.clean(DataStream.scala:764)
>  at
> 
> org.apache.flink.streaming.api.scala.DataStream.flatMap(DataStream.scala:473)
> 
>  On Mon, Dec 7, 2015 at 11:58 AM, Maximilian Michels 
>  wrote:
> >
> > For completeness, could you provide a stack trace of the error
> message?
> >
> > On Mon, Dec 7, 2015 at 6:56 PM, Maximilian Michels 
> > wrote:
> > > Hi Cory,
> > >
> > > Thanks for reporting the issue. Scala should run independently of
> the
> > > Java version. We are already using ASM version 5.0.4. However, some
> > > code uses the ASM4 op codes which don't seem to work with Java
> 8.
> > > This needs to be fixed. I'm filing a JIRA.
> > >
> > > Cheers,
> > > Max
> > >
> > > On Mon, Dec 7, 2015 at 4:15 PM, Cory Monty
> > >  wrote:
> > >> Is it possible to use Scala 2.11 and Java 8?
> > >>
> > >> I'm able to get our project to compile correctly, however there
> are
> > >> runtime
> > >> errors with the Reflectasm library (I'm guessing due to Kryo). I
> > >> looked into
> > >> the error and it seems Spark had the same issue
> > >> (https://issues.apache.org/jira/browse/SPARK-6152,
> > >> https://github.com/EsotericSoftware/reflectasm/issues/35)
> because of
> > >> an
> > >> outdated version of Kryo.
> > >>
> > >> I'm also unsure if maybe we have to build Flink with Scala 2.11
> > >>
> > >> (
> https://ci.apache.org/projects/flink/flink-docs-release-0.10/setup/building.html
> )
> > >> in order to run Flink correctly with Java 8.
> > >>
> > >> Cheers,
> > >>
> > >> Cory
> 
> 
> >>>
> >>
>


Re: Using Flink with Scala 2.11 and Java 8

2015-12-07 Thread Maximilian Michels
Hi Cory,

Thanks for reporting the issue. Scala should run independently of the
Java version. We are already using ASM version 5.0.4. However, some
code uses the ASM4 op codes which don't seem to work with Java 8.
This needs to be fixed. I'm filing a JIRA.

Cheers,
Max

On Mon, Dec 7, 2015 at 4:15 PM, Cory Monty  wrote:
> Is it possible to use Scala 2.11 and Java 8?
>
> I'm able to get our project to compile correctly, however there are runtime
> errors with the Reflectasm library (I'm guessing due to Kryo). I looked into
> the error and it seems Spark had the same issue
> (https://issues.apache.org/jira/browse/SPARK-6152,
> https://github.com/EsotericSoftware/reflectasm/issues/35) because of an
> outdated version of Kryo.
>
> I'm also unsure if maybe we have to build Flink with Scala 2.11
> (https://ci.apache.org/projects/flink/flink-docs-release-0.10/setup/building.html)
> in order to run Flink correctly with Java 8.
>
> Cheers,
>
> Cory


Re: Using Flink with Scala 2.11 and Java 8

2015-12-07 Thread Stephan Ewen
Sorry, correcting myself:

The ClosureCleaner uses Kryo's bundled ASM 4 without any reason - simply
adjusting the imports to use the common ASM (which is 5.0) should do it ;-)

On Mon, Dec 7, 2015 at 8:18 PM, Stephan Ewen  wrote:

> Flink's own asm is 5.0, but the Kryo version used in Flink bundles
> reflectasm with a dedicated asm version 4 (no lambdas supported).
>
> Might be as simple as bumping the kryo version...
>
>
>
> On Mon, Dec 7, 2015 at 7:59 PM, Cory Monty 
> wrote:
>
>> Thanks, Max.
>>
>> Here is the stack trace I receive:
>>
>> java.lang.IllegalArgumentException:
>>
>> at
>> com.esotericsoftware.reflectasm.shaded.org.objectweb.asm.ClassReader.<init>(Unknown
>> Source)
>> at
>> com.esotericsoftware.reflectasm.shaded.org.objectweb.asm.ClassReader.<init>(Unknown
>> Source)
>> at
>> com.esotericsoftware.reflectasm.shaded.org.objectweb.asm.ClassReader.<init>(Unknown
>> Source)
>> at
>> org.apache.flink.api.scala.ClosureCleaner$.org$apache$flink$api$scala$ClosureCleaner$$getClassReader(ClosureCleaner.scala:47)
>> at
>> org.apache.flink.api.scala.ClosureCleaner$.getInnerClasses(ClosureCleaner.scala:90)
>>
>> at
>> org.apache.flink.api.scala.ClosureCleaner$.clean(ClosureCleaner.scala:113)
>>
>> at
>> org.apache.flink.streaming.api.scala.StreamExecutionEnvironment.scalaClean(StreamExecutionEnvironment.scala:555)
>>
>> at
>> org.apache.flink.streaming.api.scala.DataStream.clean(DataStream.scala:764)
>>
>> at
>> org.apache.flink.streaming.api.scala.DataStream.flatMap(DataStream.scala:473)
>>
>>
>> On Mon, Dec 7, 2015 at 11:58 AM, Maximilian Michels 
>> wrote:
>>
>>> For completeness, could you provide a stack trace of the error message?
>>>
>>> On Mon, Dec 7, 2015 at 6:56 PM, Maximilian Michels 
>>> wrote:
>>> > Hi Cory,
>>> >
>>> > Thanks for reporting the issue. Scala should run independently of the
>>> > Java version. We are already using ASM version 5.0.4. However, some
>>> > code uses the ASM4 op codes which don't seem to work with Java 8.
>>> > This needs to be fixed. I'm filing a JIRA.
>>> >
>>> > Cheers,
>>> > Max
>>> >
>>> > On Mon, Dec 7, 2015 at 4:15 PM, Cory Monty <
>>> cory.mo...@getbraintree.com> wrote:
>>> >> Is it possible to use Scala 2.11 and Java 8?
>>> >>
>>> >> I'm able to get our project to compile correctly, however there are
>>> runtime
>>> >> errors with the Reflectasm library (I'm guessing due to Kryo). I
>>> looked into
>>> >> the error and it seems Spark had the same issue
>>> >> (https://issues.apache.org/jira/browse/SPARK-6152,
>>> >> https://github.com/EsotericSoftware/reflectasm/issues/35) because of
>>> an
>>> >> outdated version of Kryo.
>>> >>
>>> >> I'm also unsure if maybe we have to build Flink with Scala 2.11
>>> >> (
>>> https://ci.apache.org/projects/flink/flink-docs-release-0.10/setup/building.html
>>> )
>>> >> in order to run Flink correctly with Java 8.
>>> >>
>>> >> Cheers,
>>> >>
>>> >> Cory
>>>
>>
>>
>


Re: Using Flink with Scala 2.11 and Java 8

2015-12-07 Thread Maximilian Michels
For completeness, could you provide a stack trace of the error message?

On Mon, Dec 7, 2015 at 6:56 PM, Maximilian Michels  wrote:
> Hi Cory,
>
> Thanks for reporting the issue. Scala should run independently of the
> Java version. We are already using ASM version 5.0.4. However, some
> code uses the ASM4 op codes which don't seem to work with Java 8.
> This needs to be fixed. I'm filing a JIRA.
>
> Cheers,
> Max
>
> On Mon, Dec 7, 2015 at 4:15 PM, Cory Monty  
> wrote:
>> Is it possible to use Scala 2.11 and Java 8?
>>
>> I'm able to get our project to compile correctly, however there are runtime
>> errors with the Reflectasm library (I'm guessing due to Kryo). I looked into
>> the error and it seems Spark had the same issue
>> (https://issues.apache.org/jira/browse/SPARK-6152,
>> https://github.com/EsotericSoftware/reflectasm/issues/35) because of an
>> outdated version of Kryo.
>>
>> I'm also unsure if maybe we have to build Flink with Scala 2.11
>> (https://ci.apache.org/projects/flink/flink-docs-release-0.10/setup/building.html)
>> in order to run Flink correctly with Java 8.
>>
>> Cheers,
>>
>> Cory


Re: Using Flink with Redis question

2015-09-04 Thread Márton Balassi
Hey Jerry,

Jay is on the right track: this issue has to do with the Flink operator
life cycle. When you hit execute, all your user-defined classes get
serialized so that they can be shipped to the workers on the cluster. To
execute some code before your FlatMapFunction starts processing the data,
you can use the open() method of RichFlatMapFunction, which lets you make
the Jedis attribute transient:

public static class RedisJoinBolt extends
    RichFlatMapFunction<Tuple5<String, String, String, String, String>,
        Tuple6<String, String, String, String, String, String>> {

  private transient Jedis jedis;

  private String jedisServer;

  private HashMap<String, String> ad_to_campaign;

  public RedisJoinBolt(String jedisServer) {
    // only remember the address here; Jedis itself is not serializable
    this.jedisServer = jedisServer;
  }

  @Override
  public void open(Configuration parameters) {
    // initialize jedis on the worker, after the function has been deserialized
    this.jedis = new Jedis(jedisServer);
  }

  @Override
  public void flatMap(Tuple5<String, String, String, String, String> input,
      Collector<Tuple6<String, String, String, String, String, String>> out)
      throws Exception {


On Fri, Sep 4, 2015 at 8:11 PM, Jay Vyas 
wrote:

> Maybe wrapping Jedis with a serializable class will do the trick?
>
> But in general, is there a way to reference jar classes in Flink apps
> without serializing them?
>
>
> On Sep 4, 2015, at 1:36 PM, Jerry Peng 
> wrote:
>
> Hello,
>
> So I am trying to use jedis (redis java client) with Flink streaming api,
> but I get an exception:
>
> org.apache.flink.client.program.ProgramInvocationException: The main
> method caused an error.
>
> at
> org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:452)
>
> at
> org.apache.flink.client.program.PackagedProgram.invokeInteractiveModeForExecution(PackagedProgram.java:353)
>
> at org.apache.flink.client.program.Client.run(Client.java:278)
>
> at org.apache.flink.client.CliFrontend.executeProgram(CliFrontend.java:631)
>
> at org.apache.flink.client.CliFrontend.run(CliFrontend.java:319)
>
> at
> org.apache.flink.client.CliFrontend.parseParameters(CliFrontend.java:954)
>
> at org.apache.flink.client.CliFrontend.main(CliFrontend.java:1004)
>
> Caused by: org.apache.flink.api.common.InvalidProgramException: Object
> flink.benchmark.AdvertisingTopologyNative$RedisJoinBolt@21e360a not
> serializable
>
> at
> org.apache.flink.api.java.ClosureCleaner.ensureSerializable(ClosureCleaner.java:97)
>
> at org.apache.flink.api.java.ClosureCleaner.clean(ClosureCleaner.java:59)
>
> at
> org.apache.flink.streaming.api.environment.StreamExecutionEnvironment.clean(StreamExecutionEnvironment.java:1320)
>
> at
> org.apache.flink.streaming.api.datastream.DataStream.clean(DataStream.java:144)
>
> at
> org.apache.flink.streaming.api.datastream.DataStream.flatMap(DataStream.java:624)
>
> at
> flink.benchmark.AdvertisingTopologyNative.main(AdvertisingTopologyNative.java:50)
>
> at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
>
> at
> sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
>
> at
> sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
>
> at java.lang.reflect.Method.invoke(Method.java:483)
>
> at
> org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:437)
>
> ... 6 more
>
> Caused by: java.io.NotSerializableException: redis.clients.jedis.Jedis
>
> at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1184)
>
> at
> java.io.ObjectOutputStream.defaultWriteFields(ObjectOutputStream.java:1548)
>
> at java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1509)
>
> at
> java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1432)
>
> at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1178)
>
> at java.io.ObjectOutputStream.writeObject(ObjectOutputStream.java:348)
>
> at
> org.apache.flink.util.InstantiationUtil.serializeObject(InstantiationUtil.java:306)
>
> at
> org.apache.flink.api.java.ClosureCleaner.ensureSerializable(ClosureCleaner.java:95)
>
> ... 16 more
>
>
>
>
>
> so my code I am using:
>
>
> public static class RedisJoinBolt implements FlatMapFunction<Tuple5<String, String, String, String, String>,
> Tuple6<String, String, String, String, String, String>> {
>  private Jedis jedis;
>  private HashMap<String, String> ad_to_campaign;
>
>  public RedisJoinBolt(String jedisServer) {
>   //initialize jedis
>   this.jedis = new Jedis(jedisServer);
>  }
>
>  @Override
>  public void flatMap(Tuple5<String, String, String, String, String> input,
>    Collector<Tuple6<String, String, String, String, String, String>> out)
> throws Exception {
>
> .
>
> .
>
> .
>
>
> Any one know a fix for this?
>
>


Re: Using Flink with Redis question

2015-09-04 Thread Jay Vyas
Maybe wrapping Jedis with a serializable class will do the trick?

But in general, is there a way to reference jar classes in Flink apps without
serializing them?


> On Sep 4, 2015, at 1:36 PM, Jerry Peng  wrote:
> 
> Hello,
> 
> So I am trying to use jedis (redis java client) with Flink streaming api, but 
> I get an exception:
> 
> org.apache.flink.client.program.ProgramInvocationException: The main method 
> caused an error.
>   at 
> org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:452)
>   at 
> org.apache.flink.client.program.PackagedProgram.invokeInteractiveModeForExecution(PackagedProgram.java:353)
>   at org.apache.flink.client.program.Client.run(Client.java:278)
>   at 
> org.apache.flink.client.CliFrontend.executeProgram(CliFrontend.java:631)
>   at org.apache.flink.client.CliFrontend.run(CliFrontend.java:319)
>   at 
> org.apache.flink.client.CliFrontend.parseParameters(CliFrontend.java:954)
>   at org.apache.flink.client.CliFrontend.main(CliFrontend.java:1004)
> Caused by: org.apache.flink.api.common.InvalidProgramException: Object 
> flink.benchmark.AdvertisingTopologyNative$RedisJoinBolt@21e360a not 
> serializable
>   at 
> org.apache.flink.api.java.ClosureCleaner.ensureSerializable(ClosureCleaner.java:97)
>   at 
> org.apache.flink.api.java.ClosureCleaner.clean(ClosureCleaner.java:59)
>   at 
> org.apache.flink.streaming.api.environment.StreamExecutionEnvironment.clean(StreamExecutionEnvironment.java:1320)
>   at 
> org.apache.flink.streaming.api.datastream.DataStream.clean(DataStream.java:144)
>   at 
> org.apache.flink.streaming.api.datastream.DataStream.flatMap(DataStream.java:624)
>   at 
> flink.benchmark.AdvertisingTopologyNative.main(AdvertisingTopologyNative.java:50)
>   at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
>   at 
> sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
>   at 
> sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
>   at java.lang.reflect.Method.invoke(Method.java:483)
>   at 
> org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:437)
>   ... 6 more
> Caused by: java.io.NotSerializableException: redis.clients.jedis.Jedis
>   at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1184)
>   at 
> java.io.ObjectOutputStream.defaultWriteFields(ObjectOutputStream.java:1548)
>   at 
> java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1509)
>   at 
> java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1432)
>   at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1178)
>   at java.io.ObjectOutputStream.writeObject(ObjectOutputStream.java:348)
>   at 
> org.apache.flink.util.InstantiationUtil.serializeObject(InstantiationUtil.java:306)
>   at 
> org.apache.flink.api.java.ClosureCleaner.ensureSerializable(ClosureCleaner.java:95)
>   ... 16 more
> 
> 
> 
> 
> so my code I am using: 
> 
> public static class RedisJoinBolt implements FlatMapFunction<Tuple5<String, String, String, String, String>,
> Tuple6<String, String, String, String, String, String>> {
>  private Jedis jedis;
>  private HashMap<String, String> ad_to_campaign;
> 
>  public RedisJoinBolt(String jedisServer) {
>   //initialize jedis
>   this.jedis = new Jedis(jedisServer);
>  }
> 
>  @Override
>  public void flatMap(Tuple5<String, String, String, String, String> input,
>    Collector<Tuple6<String, String, String, String, String, String>> out)
> throws Exception {
> .
> .
> .
> 
> Anyone know a fix for this?