Re: Custom Metrics not showing in prometheus

2023-09-18 Thread Matt Wang
Hi,


1. Check whether other metrics show up in Prometheus and whether the 
metrics reporter is working;
2. Check whether any metrics filter logic is configured[1]. If there is 
filter logic, it also affects which metrics are reported;
3. Check the TaskManager log for any error messages about the metrics;



[1] 
https://nightlies.apache.org/flink/flink-docs-master/docs/deployment/config/#metrics-reporter-%3Cname%3E-filter-excludes
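One more thing that may be worth checking, sketched here under assumptions: the Prometheus reporter typically exposes a metric under a longer, scope-prefixed name rather than the bare name passed to the counter, so searching for the exact string "myCounter" can miss it. The small JDK-only helper below scans a Prometheus text-format body for metric names containing a fragment. The sample body and the name `flink_taskmanager_job_task_operator_myCounter` are illustrative assumptions, not output captured from the original poster's setup.

```java
import java.util.ArrayList;
import java.util.List;

class MetricNameCheck {
    /** Returns the distinct metric names in a Prometheus text-format body that contain the fragment. */
    static List<String> findMetrics(String body, String fragment) {
        List<String> names = new ArrayList<>();
        for (String line : body.split("\n")) {
            String trimmed = line.trim();
            if (trimmed.isEmpty() || trimmed.startsWith("#")) {
                continue; // skip blank lines and comment lines
            }
            // The metric name ends at the first '{' (labels) or ' ' (value).
            int end = trimmed.length();
            int brace = trimmed.indexOf('{');
            int space = trimmed.indexOf(' ');
            if (brace >= 0) {
                end = Math.min(end, brace);
            }
            if (space >= 0) {
                end = Math.min(end, space);
            }
            String name = trimmed.substring(0, end);
            if (name.contains(fragment) && !names.contains(name)) {
                names.add(name);
            }
        }
        return names;
    }

    public static void main(String[] args) {
        // Hypothetical sample of what a TaskManager reporter endpoint might return.
        String sample = String.join("\n",
                "# sample comment line",
                "flink_taskmanager_job_task_operator_myCounter{job_name=\"demo\"} 42.0",
                "flink_taskmanager_Status_JVM_CPU_Load 0.01");
        System.out.println(findMetrics(sample, "myCounter"));
    }
}
```

In practice the body would come from the reporter port of the TaskManager container (9250 in the configuration quoted below), and the same substring search can be done in the Prometheus explorer.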


--

Best,
Matt Wang


 Replied Message 
| From | patricia lee |
| Date | 09/18/2023 16:58 |
| To |  |
| Subject | Custom Metrics not showing in prometheus |
Hi,



I have created a counter of records in my RichSinkFunction


myCounter.inc()


I can see the value exists in the job manager web ui > running jobs > sink 
function > task > metrics. However, I am not able to see it in my 
prometheus web ui.


I am running Flink in Docker locally, as well as Prometheus. The Flink 
jobmanager and taskmanager are in UP status in the Prometheus targets, but 
when I check the list of metrics in the explorer, the "myCounter" custom 
metric cannot be found.


metrics.reporter.prom.factory.class: org.apache.flink.metrics.prometheus.PrometheusReporterFactory
metrics.reporter.prom.port: 9250-9260

- job_name: "jobmanager"
  static_configs:
    - targets: ["jobmanager:9250"]
- job_name: "taskmanager"
  static_configs:
    - targets: ["taskmanager:9250"]

Is there any configuration/steps I missed? Thanks


Regards,
Pat

Re: How to write custom serializer for dynamodb connector

2022-11-08 Thread Matt Fysh
Thanks Hong, I moved the AttributeValue creation into the ElementConverter
and it started working without any custom serde work!
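For anyone landing on this thread later, here is a minimal JDK-only sketch of the failure mode Hong describes in the quoted reply below. It does not use Kryo or the connector; `canRefill` is a hypothetical stand-in for a deserializer that rebuilds a map by calling put() on it, and `Collections.unmodifiableMap` stands in for ImmutableMap.

```java
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;

class ImmutableMapDemo {
    /** Mimics a deserializer that fills an existing map instance by calling put(). */
    static boolean canRefill(Map<String, String> target) {
        try {
            target.put("innerkey", "innervalue");
            return true;
        } catch (UnsupportedOperationException e) {
            // Same exception type as in the reported Kryo trace.
            return false;
        }
    }

    public static void main(String[] args) {
        Map<String, String> mutable = new HashMap<String, String>();
        Map<String, String> immutable =
                Collections.unmodifiableMap(new HashMap<String, String>());
        System.out.println("mutable map accepts put(): " + canRefill(mutable));
        System.out.println("immutable map accepts put(): " + canRefill(immutable));
    }
}
```

Building the AttributeValue inside the ElementConverter sidesteps this because the immutable map is then created on the sink side and never has to cross an operator boundary.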

The reason for creating AttributeValue instances in a previous operator is
that I was closely following the example code:
https://github.com/apache/flink-connector-aws/blob/main/flink-connector-dynamodb/src/test/java/org/apache/flink/connector/dynamodb/sink/examples/SinkIntoDynamoDb.java

Thank you again for your help and sharing those resources.

Cheers,
Matt.


On Wed, 9 Nov 2022 at 03:51, Teoh, Hong  wrote:

> Hi Matt,
>
>
>
> First of all, awesome that you are using the DynamoDB sink!
>
>
>
> To resolve your issue with serialization in the DDB sink, you are right,
> the issue only happens when you create the AttributeValue object in a
> previous operator and send it to the sink.
>
> The issue here is with the serialization of ImmutableMap. Kryo tries to
> call put(), which is unsupported because the map is immutable, so you can
> register a specific serializer for it, like below:
>
>
>
> env.getConfig().registerTypeWithKryoSerializer(ImmutableMap.class,
> ImmutableMapSerializer.class);
>
>
>
> You can get ImmutableMapSerializer.class from a pre-package library like
> this: https://github.com/magro/kryo-serializers
>
> Just add the following to your pom.xml
>
>
>
> <dependency>
>   <groupId>de.javakaffee</groupId>
>   <artifactId>kryo-serializers</artifactId>
>   <version>0.45</version>
> </dependency>
>
>
>
> Regarding resources, I find the following helpful:
>
>- Article on serialization
>- The FlinkForward youtube channel has a couple of useful deep dives
>on Flink in general :
>https://www.youtube.com/channel/UCY8_lgiZLZErZPF47a2hXMA/playlists
>
>
>
> Hope the above helps.
>
>
>
>
>
> A more general question on your use case, what is the reason you want to
> generate the AttributeValue in a previous operator rather than in the sink
> directly? Is it for some dynamic generation of objects to write into DDB?
>
>
>
> Regards,
>
> Hong
>
>
>
>
>
> *From: *Matt Fysh 
> *Date: *Tuesday, 8 November 2022 at 14:04
> *To: *User 
> *Subject: *[EXTERNAL] How to write custom serializer for dynamodb
> connector
>
>
>
>
> I'm attempting to use the dynamodb sink located at
> https://github.com/apache/flink-connector-aws
>
>
>
> The example
> <https://github.com/apache/flink-connector-aws/blob/main/flink-connector-dynamodb/src/test/java/org/apache/flink/connector/dynamodb/sink/examples/SinkIntoDynamoDb.java>
> in the repo is working as expected, however when I try to create a nested
> data structure, I receive a Kryo serialization error message:
>
>
>
> Caused by: com.esotericsoftware.kryo.KryoException:
> java.lang.UnsupportedOperationException
>
> Serialization trace:
>
> m (software.amazon.awssdk.services.dynamodb.model.AttributeValue)
>
> at
> com.esotericsoftware.kryo.serializers.ObjectField.read(ObjectField.java:125)
>
>
>
> The value that cannot be serialized is produced by this code:
>
> import software.amazon.awssdk.services.dynamodb.model.AttributeValue;
>
> AttributeValue.builder().m(
>   ImmutableMap.of(
>     "innerkey", AttributeValue.builder().s("innervalue").build()
>   )
> ).build();
>
>
>
> There are tests in the connector repo
> <https://github.com/apache/flink-connector-aws/blob/3798aabfcc6f78645bf3d7255dfd6c336cd497f0/flink-connector-dynamodb/src/test/java/org/apache/flink/connector/dynamodb/util/DynamoDbSerializationUtilTest.java#L70-L84>
> for nested map structures, but they do not test that the structure can be
> ser/de by Flink, which I believe occurs when the operator that produces the
> value is separate to the sink operator.
>
>
>
> Given that this is a fairly simple data type, I should be able to register
> a custom serializer with Flink, but since I'm new to java I'm having
> trouble making sense of the docs
> <https://nightlies.apache.org/flink/flink-docs-release-1.16/docs/dev/datastream/fault-tolerance/serialization/types_serialization/>
> and was hoping to find someone more knowledgeable in this area for some
> pointers on what else I could start reading
>
>
>
> Thanks
>
> Matt
>


How to write custom serializer for dynamodb connector

2022-11-08 Thread Matt Fysh
I'm attempting to use the dynamodb sink located at
https://github.com/apache/flink-connector-aws

The example
<https://github.com/apache/flink-connector-aws/blob/main/flink-connector-dynamodb/src/test/java/org/apache/flink/connector/dynamodb/sink/examples/SinkIntoDynamoDb.java>
in the repo is working as expected, however when I try to create a nested
data structure, I receive a Kryo serialization error message:

Caused by: com.esotericsoftware.kryo.KryoException:
java.lang.UnsupportedOperationException
Serialization trace:
m (software.amazon.awssdk.services.dynamodb.model.AttributeValue)
at
com.esotericsoftware.kryo.serializers.ObjectField.read(ObjectField.java:125)


The value that cannot be serialized is produced by this code:

import software.amazon.awssdk.services.dynamodb.model.AttributeValue;

AttributeValue.builder().m(
  ImmutableMap.of(
    "innerkey", AttributeValue.builder().s("innervalue").build()
  )
).build();


There are tests in the connector repo
<https://github.com/apache/flink-connector-aws/blob/3798aabfcc6f78645bf3d7255dfd6c336cd497f0/flink-connector-dynamodb/src/test/java/org/apache/flink/connector/dynamodb/util/DynamoDbSerializationUtilTest.java#L70-L84>
for nested map structures, but they do not test that the structure can be
ser/de by Flink, which I believe occurs when the operator that produces the
value is separate to the sink operator.

Given that this is a fairly simple data type, I should be able to register
a custom serializer with Flink, but since I'm new to java I'm having
trouble making sense of the docs
<https://nightlies.apache.org/flink/flink-docs-release-1.16/docs/dev/datastream/fault-tolerance/serialization/types_serialization/>
and was hoping to find someone more knowledgeable in this area for some
pointers on what else I could start reading

Thanks
Matt


Re: Kinesis Connector does not work

2022-11-08 Thread Matt Fysh
Ok thanks, will give that a try. Is that something that should be added to
the Kinesis connector docs page? There are existing instructions there for
adding the flink-connector-kinesis jar as a dependency, but no instructions
for adding commons-logging

Or if this is something more general, it might be something to talk about
in the Python section of the docs because most Python users are not going
to understand the interplay between Java classes.
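As a rough diagnostic for this kind of NoClassDefFoundError, the sketch below checks whether a class can be located by the class loader that loaded the check itself. `ClasspathCheck` is a hypothetical helper, not part of Flink or the connector; since Flink loads user jars through its own class loaders, the check is only meaningful when run from inside the same user jar as the job.

```java
class ClasspathCheck {
    /** Returns true if the named class can be located by this class's class loader. */
    static boolean isOnClasspath(String className) {
        try {
            // initialize=false: only locate the class, do not run its static initializers.
            Class.forName(className, false, ClasspathCheck.class.getClassLoader());
            return true;
        } catch (ClassNotFoundException | LinkageError e) {
            return false;
        }
    }

    public static void main(String[] args) {
        String needed = "org.apache.commons.logging.LogFactory";
        System.out.println(needed + " present: " + isOnClasspath(needed));
    }
}
```

If it reports the class as missing, that is consistent with the advice quoted below: commons-logging has to be bundled into the user jar alongside the connector.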

On Tue, 8 Nov 2022 at 18:50, Chesnay Schepler  wrote:

> Said dependency (on commons-logging) is not meant to be provided by the
> docker image, but bundled in your user-jar (along with the connector).
>
> On 08/11/2022 02:14, Matt Fysh wrote:
> > Hi, I'm following the kinesis connector instructions as documented
> > here:
> >
> https://nightlies.apache.org/flink/flink-docs-release-1.16/docs/connectors/datastream/kinesis/
> >
> >
> > I'm also running Flink in standalone session mode using docker compose
> > and the Python images, as described in the Flink docs (Deployment
> section)
> >
> > When I try to run a basic datastream.print() / env.execute() example
> > with a kinesis source, I get the following error. From my limited
> > understanding of Java, it seems the Kinesis connector is using a
> > shaded version of the AWS Java SDK, and that older version of the SDK
> > is trying to load a class that is no longer present in the 1.16.0
> > Flink docker images. Is there a workaround for this? Thanks
> >
> > Caused by: java.lang.NoClassDefFoundError:
> > org/apache/commons/logging/LogFactory
> > at
> >
> org.apache.flink.kinesis.shaded.com.amazonaws.ClientConfiguration.(ClientConfiguration.java:47)
> > at
> >
> org.apache.flink.kinesis.shaded.com.amazonaws.ClientConfigurationFactory.getDefaultConfig(ClientConfigurationFactory.java:46)
> > at
> >
> org.apache.flink.kinesis.shaded.com.amazonaws.ClientConfigurationFactory.getConfig(ClientConfigurationFactory.java:36)
> > at
> >
> org.apache.flink.streaming.connectors.kinesis.proxy.KinesisProxy.createKinesisClient(KinesisProxy.java:268)
> > at
> >
> org.apache.flink.streaming.connectors.kinesis.proxy.KinesisProxy.(KinesisProxy.java:152)
>
>
>


Kinesis Connector does not work

2022-11-07 Thread Matt Fysh
Hi, I'm following the kinesis connector instructions as documented here:
https://nightlies.apache.org/flink/flink-docs-release-1.16/docs/connectors/datastream/kinesis/

I'm also running Flink in standalone session mode using docker compose and
the Python images, as described in the Flink docs (Deployment section)

When I try to run a basic datastream.print() / env.execute() example with a
kinesis source, I get the following error. From my limited understanding of
Java, it seems the Kinesis connector is using a shaded version of the AWS
Java SDK, and that older version of the SDK is trying to load a class that
is no longer present in the 1.16.0 Flink docker images. Is there a
workaround for this? Thanks

Caused by: java.lang.NoClassDefFoundError:
org/apache/commons/logging/LogFactory
at
org.apache.flink.kinesis.shaded.com.amazonaws.ClientConfiguration.(ClientConfiguration.java:47)
at
org.apache.flink.kinesis.shaded.com.amazonaws.ClientConfigurationFactory.getDefaultConfig(ClientConfigurationFactory.java:46)
at
org.apache.flink.kinesis.shaded.com.amazonaws.ClientConfigurationFactory.getConfig(ClientConfigurationFactory.java:36)
at
org.apache.flink.streaming.connectors.kinesis.proxy.KinesisProxy.createKinesisClient(KinesisProxy.java:268)
at
org.apache.flink.streaming.connectors.kinesis.proxy.KinesisProxy.(KinesisProxy.java:152)


Re: OutOfMemoryError (java heap space) on small, local test

2022-10-31 Thread Matt Fysh
Thanks Leonard for taking a look. It seems odd that returning a list of
objects can cause a fatal error such as this, and since I am new to Flink
and also relatively new to Python, I assume that I am doing
something wrong as returning a list of objects is a fairly common data
modelling scenario.

Please let me know which sections of the docs, or which areas of Python, I
should read to learn how to find a solution to this problem

Thanks

On Mon, 31 Oct 2022 at 18:49, Leonard Xu  wrote:

> Hi, Matt
>
> I've checked and your job is pretty simple. I've CC'd Xingbo, who is a
> PyFlink expert, to help take a quick look.
>
>
> Best,
> Leonard
>
> On 31 Oct 2022, at 11:47 AM, Matt Fysh wrote:
>
> Hi there,
>
> I am running a local test with:
> * source = env.from_collection
> * sink = datastream.execute_and_collect
> with a map function between, and two very small data points in the
> collection
>
> I'm able to generate an OutOfMemoryError, and due to the nature of this
> test using simple source and sink, plus not having large data size
> requirements, I suspect this is due to a bug.
>
> I'm running v1.13.2 and have created a docker-based reproduction
> repository here: https://github.com/mattfysh/pyflink-oom
>
> Please take a look and let me know what you think
>
> Thanks!
> Matt
>
>
>


OutOfMemoryError (java heap space) on small, local test

2022-10-30 Thread Matt Fysh
Hi there,

I am running a local test with:
* source = env.from_collection
* sink = datastream.execute_and_collect
with a map function between, and two very small data points in the
collection

I'm able to generate an OutOfMemoryError, and due to the nature of this
test using simple source and sink, plus not having large data size
requirements, I suspect this is due to a bug.

I'm running v1.13.2 and have created a docker-based reproduction repository
here: https://github.com/mattfysh/pyflink-oom

Please take a look and let me know what you think

Thanks!
Matt


AWS Kinesis Analytics and Pyflink OutputTag

2022-09-11 Thread Matt Fysh
Hi there,

I am looking at using Flink to implement a fan-out of events, but
discovered a couple of issues attempting to implement on AWS Kinesis
Analytics:
- the version of Flink is pinned to 1.13.2
- there is currently no version of Pyflink that supports OutputTag

Does anyone know if it is possible to overcome these issues and have event
fan-out working on AWS?

Thanks,
Matt


Re: Flink k8s Operator on AWS?

2022-06-27 Thread Matt Casters
The problem was a misconfiguration of the initContainer which would copy my
artifacts from s3 to an ephemeral volume.  This caused the task manager to
get started for a bit and then to be shut down.  It was hard to get logging
about this since the pods were gone before I could get logging from it.  I
chalk all that up to just me lacking a bit of experience with k8s.

That being said... It's all working now and I documented the deployment
over here:

https://hop.apache.org/manual/next/pipeline/beam/flink-k8s-operator-running-hop-pipeline.html

A big thank you to everyone that helped me out!

Cheers,
Matt

On Mon, Jun 27, 2022 at 4:59 AM Yang Wang  wrote:

> Could you please share the JobManager logs of failed deployment? It will
> also help a lot if you could show the pending pod status via "kubectl
> describe ".
>
> Given that the current Flink Kubernetes Operator is built on top of native
> K8s integration[1], the Flink ResourceManager should allocate enough
> TaskManager pods automatically.
> We need to find out what is wrong via the logs. Maybe the service account
> or taint or something else.
>
>
> [1]. https://flink.apache.org/2021/02/10/native-k8s-with-ha.html
>
>
> Best,
> Yang
>
> On Fri, 24 Jun 2022 at 23:48, Matt Casters wrote:
>
>> Yes of-course.  I already feel a bit less intelligent for having asked
>> the question ;-)
>>
>> The status now is that I managed to have it all puzzled together.
>> Copying the files from s3 to an ephemeral volume takes all of 2 seconds so
>> it's really not an issue.  The cluster starts and our fat jar and Apache
>> Hop MainBeam class is found and started.
>>
>> The only thing that remains is figuring out how to configure the Flink
>> cluster itself.  I have a couple of m5.large ec2 instances in a node group
>> on EKS and I set taskmanager.numberOfTaskSlots to "4".  However, the tasks
>> in the pipeline can't seem to find resources to start.
>>
>> Caused by:
>> org.apache.flink.runtime.jobmanager.scheduler.NoResourceAvailableException:
>> Slot request bulk is not fulfillable! Could not allocate the required slot
>> within slot request timeout
>>
>> Parallelism was set to 1 for the runner and there are only 2 tasks in my
>> first Beam pipeline so it should be simple enough but it just times out.
>>
>> Next step for me is to document the result which will end up on
>> hop.apache.org.   I'll probably also want to demo this in Austin at the
>> upcoming Beam summit.
>>
>> Thanks a lot for your time and help so far!
>>
>> Cheers,
>> Matt
>>
>>


Re: Flink k8s Operator on AWS?

2022-06-24 Thread Matt Casters
Yes, of course.  I already feel a bit less intelligent for having asked the
question ;-)

The status now is that I managed to have it all puzzled together.  Copying
the files from s3 to an ephemeral volume takes all of 2 seconds so it's
really not an issue.  The cluster starts and our fat jar and Apache Hop
MainBeam class is found and started.

The only thing that remains is figuring out how to configure the Flink
cluster itself.  I have a couple of m5.large ec2 instances in a node group
on EKS and I set taskmanager.numberOfTaskSlots to "4".  However, the tasks
in the pipeline can't seem to find resources to start.

Caused by:
org.apache.flink.runtime.jobmanager.scheduler.NoResourceAvailableException:
Slot request bulk is not fulfillable! Could not allocate the required slot
within slot request timeout

Parallelism was set to 1 for the runner and there are only 2 tasks in my
first Beam pipeline so it should be simple enough but it just times out.
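The slot arithmetic behind this expectation can be sketched as follows, assuming Flink's default slot sharing, where a job needs as many slots as its highest operator parallelism. `SlotMath` and its method names are hypothetical and only illustrate the calculation.

```java
class SlotMath {
    /** Slots offered by the cluster: each registered TaskManager contributes its configured slots. */
    static int availableSlots(int registeredTaskManagers, int slotsPerTaskManager) {
        return registeredTaskManagers * slotsPerTaskManager;
    }

    /** With default slot sharing, a job needs as many slots as its highest operator parallelism. */
    static int requiredSlots(int... operatorParallelism) {
        int max = 0;
        for (int p : operatorParallelism) {
            max = Math.max(max, p);
        }
        return max;
    }

    public static void main(String[] args) {
        int required = requiredSlots(1, 1); // two tasks, parallelism 1 each
        System.out.println("required slots: " + required);
        System.out.println("available with 1 TaskManager: " + availableSlots(1, 4));
        System.out.println("available with 0 TaskManagers: " + availableSlots(0, 4));
    }
}
```

By this reckoning a single registered TaskManager with 4 slots is more than enough, so a slot request timeout points toward no TaskManager staying registered rather than toward the numberOfTaskSlots setting.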

Next step for me is to document the result which will end up on
hop.apache.org.   I'll probably also want to demo this in Austin at the
upcoming Beam summit.

Thanks a lot for your time and help so far!

Cheers,
Matt


Re: Flink k8s Operator on AWS?

2022-06-24 Thread Matt Casters
Hi Mátyás & all,

Thanks again for the advice so far. On a related note I noticed Java 8
being used, indicated in the log.

org.apache.flink.runtime.entrypoint.ClusterEntrypoint[] -
 JAVA_HOME: /usr/local/openjdk-8

Is there a way to use Java 11 to start Flink with?

Kind regards,

Matt

On Tue, Jun 21, 2022 at 4:53 PM Őrhidi Mátyás 
wrote:

> Hi Matt,
>
> I believe an artifact fetcher (e.g
> https://hub.docker.com/r/agiledigital/s3-artifact-fetcher ) + the pod
> template (
> https://nightlies.apache.org/flink/flink-kubernetes-operator-docs-main/docs/custom-resource/pod-template/#pod-template)
> is an elegant way to solve your problem.
>
> The operator uses K8s native integration under the hood:
> https://nightlies.apache.org/flink/flink-docs-release-1.13/docs/deployment/resource-providers/native_kubernetes/#application-mode
> In application mode, the main() method of the application is executed on
> the JobManager, hence we need the jar locally.
>
> You can launch a session cluster (without job spec) on the operator that
> allows submitting jars if you would like to avoid dealing with
> authentication, but the recommended and safe approach is to use
> sessionjobs for this purpose.
>
>
> Cheers,
> Matyas
>
> On Tue, Jun 21, 2022 at 4:03 PM Matt Casters <
> matt.cast...@neotechnology.com> wrote:
>
>> Thank you very much for the help Matyas and Gyula!
>>
>> I just saw a video today where you were presenting the FKO.  Really nice
>> stuff!
>>
>> So I'm guessing we're executing "flink run" at some point on the master
>> and that this is when we need the jar file to be local?
>> Am I right in assuming that this happens after the flink cluster in
>> question was started, as part of the job execution?
>>
>> On the one hand I agree with the underlying idea that authentication and
>> security should not be a responsibility of the operator.   On the other
>> hand I could add a flink-s3 driver but then I'd also have to configure it
>> and so on and it's just hard to get that configuration to be really clean.
>>
>> Do we have some service running on the flink cluster which would allow us
>> to post/copy files from the client (running kubectl) to the master?  If so,
>> could we add an option to the job specification to that effect?  Just
>> brainstorming ;-) (and forking apache/flink-kubernetes-operator)
>>
>> All the best,
>> Matt
>>
>> On Tue, Jun 21, 2022 at 2:52 PM Őrhidi Mátyás 
>> wrote:
>>
>>> Hi Matt,
>>>
>>> - In FlinkDeployments you can utilize an init container to download your
>>> artifact onto a shared volume, then you can refer to it as local:/.. from
>>> the main container. FlinkDeployments comes with pod template support
>>> https://nightlies.apache.org/flink/flink-kubernetes-operator-docs-main/docs/custom-resource/pod-template/#pod-template
>>>
>>> - FlinkSessionJobs comes with an artifact fetcher, but it may need some
>>> tweaking to make it work on your environment:
>>>
>>> https://nightlies.apache.org/flink/flink-kubernetes-operator-docs-main/docs/custom-resource/overview/#flinksessionjob-spec-overview
>>>
>>> I hope it helps, let us know if you have further questions.
>>>
>>> Cheers,
>>> Matyas
>>>
>>>
>>>
>>> On Tue, Jun 21, 2022 at 2:35 PM Matt Casters <
>>> matt.cast...@neotechnology.com> wrote:
>>>
>>>> Hi Flink team!
>>>>
>>>> I'm interested in getting the new Flink Kubernetes Operator to work on
>>>> AWS EKS.  Following the documentation I got pretty far.  However, when
>>>> trying to run a job I got the following error:
>>>>
>>>>> Only "local" is supported as schema for application mode. This assumes
>>>>> that the jar is located in the image, not the Flink client. An example
>>>>> of such path is: local:///opt/flink/examples/streaming/WindowJoin.jar
>>>>
>>>>
>>>>  I have an Apache Hop/Beam fat jar capable of running the Flink
>>>> pipeline in my yml file:
>>>>
>>>> jarURI: s3://hop-eks/hop/hop-2.1.0-fat.jar
>>>>
>>>> So how could I go about getting the fat jar in a desired location for
>>>> the operator?
>>>>
>>>> Getting this to work would be really cool for both short and long-lived
>>>> pipelines in the service of all sorts of data integration work.  It would
>>>> do away with the complexity of setting up and maintaining your own Flink
>>>> cluster.
>>>>
>>>> Thanks in advance!
>>>>
>>>> All the best,
>>>>
>>>> Matt (mcasters, Apache Hop PMC)
>>>>
>>>>


Re: Flink k8s Operator on AWS?

2022-06-22 Thread Matt Casters
Hi Yang,

Thanks for the suggestion!  I looked into this volume sharing on EKS
yesterday but I couldn't figure it out right away.
The way that people come into the Apache Hop project is often with very
little technical knowledge since that's sort of the goal of the project:
make things easy.  Following page after page of complicated instructions
just to get a few files into a pod container... I feel it's just a bit
much.
But again, this is my frustration with k8s, not with Flink ;-)

Cheers,
Matt

On Wed, Jun 22, 2022 at 5:32 AM Yang Wang  wrote:

> Matyas and Gyula have shared a lot of great information about how to make
> the Flink Kubernetes Operator work on EKS.
>
> One more input about how to prepare the user jars. If you are more
> familiar with K8s, you could use a persistent volume to provide the user
> jars and then mount the volume to the JobManager and TaskManager.
> I think EKS supports EBS, NFS and other PV types.
>
> Best,
> Yang
>

Re: Flink k8s Operator on AWS?

2022-06-22 Thread Matt Casters
Hi Matyas,

Again, thank you very much for the information.  I'm a beginner and all
the help is really appreciated.  After some diving into the script
behind s3-artifact-fetcher I kind of figured it out: have a folder
sync'ed into the pod container of the task manager.  Then I guess we should
be able to find the files locally.
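To make the "find the files locally" step concrete, here is a small sketch of mapping the remote artifact URI to the local:// form that application mode accepts, once the folder has been synced. `LocalJarUri` is a hypothetical helper and the mount directory `/opt/flink/artifacts` is an assumed example path, not something prescribed by the operator.

```java
import java.net.URI;

class LocalJarUri {
    /** True if the URI uses the "local" scheme that application mode expects. */
    static boolean isLocalScheme(String jarUri) {
        return "local".equals(URI.create(jarUri).getScheme());
    }

    /** Maps a remote artifact URI to the local:// URI of its synced copy inside mountDir. */
    static String toLocalUri(String remoteUri, String mountDir) {
        String path = URI.create(remoteUri).getPath(); // e.g. /hop/hop-2.1.0-fat.jar
        String fileName = path.substring(path.lastIndexOf('/') + 1);
        String dir = mountDir.endsWith("/") ? mountDir : mountDir + "/";
        return "local://" + dir + fileName;
    }

    public static void main(String[] args) {
        String remote = "s3://hop-eks/hop/hop-2.1.0-fat.jar";
        System.out.println("remote is local scheme: " + isLocalScheme(remote));
        // /opt/flink/artifacts is an assumed mount point for the synced folder.
        System.out.println(toLocalUri(remote, "/opt/flink/artifacts"));
    }
}
```

The resulting value is what would go into jarURI in place of the s3:// address, provided the synced folder is mounted at that path in the container that runs the job's main() method.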

At its core what we're trying to do with a project like Apache Hop is sit
on the side of the organizations that use the software since we want to
lower complexity, maintenance costs, learning curves and so on.  Every time
I see a cryptic scarcely documented Yaml file or complicated k8s setup I
need to ask myself in which way I'm sending our users on a week-long
mission.

In a way it makes me appreciate the work Google did with Dataflow a bit
more because they looked at this problem in a holistic way and considered
the platform (GCP), the engine (Dataflow cluster on GCP k8s) and the
executing pipeline (Beam API Jar files) to be different facets of the same
problem.  Jar files get uploaded automatically, the cluster automatically
instantiated, the pipeline run, monitored and scaled automatically and at
the end shut down properly.

I want to figure out a way to do this with Flink as well since I believe,
especially on AWS (even with Spark centric options on EMR, EMR serverless),
that running a pipeline is just too complicated.  Your work really helps!

All the best,
Matt


Re: Flink k8s Operator on AWS?

2022-06-21 Thread Matt Casters
Thank you very much for the help Matyas and Gyula!

I just saw a video today where you were presenting the FKO.  Really nice
stuff!

So I'm guessing we're executing "flink run" at some point on the master and
that this is when we need the jar file to be local?
Am I right in assuming that this happens after the flink cluster in
question was started, as part of the job execution?

On the one hand I agree with the underlying idea that authentication and
security should not be a responsibility of the operator.   On the other
hand I could add a flink-s3 driver but then I'd also have to configure it
and so on and it's just hard to get that configuration to be really clean.

Do we have some service running on the flink cluster which would allow us
to post/copy files from the client (running kubectl) to the master?  If so,
could we add an option to the job specification to that effect?  Just
brainstorming ;-) (and forking apache/flink-kubernetes-operator)

All the best,
Matt

On Tue, Jun 21, 2022 at 2:52 PM Őrhidi Mátyás 
wrote:

> Hi Matt,
>
> - In FlinkDeployments you can utilize an init container to download your
> artifact onto a shared volume, then you can refer to it as local:/.. from
> the main container. FlinkDeployments comes with pod template support
> https://nightlies.apache.org/flink/flink-kubernetes-operator-docs-main/docs/custom-resource/pod-template/#pod-template
>
> - FlinkSessionJobs comes with an artifact fetcher, but it may need some
> tweaking to make it work on your environment:
>
> https://nightlies.apache.org/flink/flink-kubernetes-operator-docs-main/docs/custom-resource/overview/#flinksessionjob-spec-overview
>
> I hope it helps, let us know if you have further questions.
>
> Cheers,
> Matyas
>
>
>
> On Tue, Jun 21, 2022 at 2:35 PM Matt Casters <
> matt.cast...@neotechnology.com> wrote:
>
>> Hi Flink team!
>>
>> I'm interested in getting the new Flink Kubernetes Operator to work on
>> AWS EKS.  Following the documentation I got pretty far.  However, when
>> trying to run a job I got the following error:
>>
>> > Only "local" is supported as schema for application mode. This assumes
>> > that the jar is located in the image, not the Flink client. An example of
>> > such path is: local:///opt/flink/examples/streaming/WindowJoin.jar
>>
>>
>>  I have an Apache Hop/Beam fat jar capable of running the Flink pipeline
>> in my yml file:
>>
>> jarURI: s3://hop-eks/hop/hop-2.1.0-fat.jar
>>
>> So how could I go about getting the fat jar in a desired location for the
>> operator?
>>
>> Getting this to work would be really cool for both short and long-lived
>> pipelines in the service of all sorts of data integration work.  It would
>> do away with the complexity of setting up and maintaining your own Flink
>> cluster.
>>
>> Thanks in advance!
>>
>> All the best,
>>
>> Matt (mcasters, Apache Hop PMC)
>>
>>


Flink k8s Operator on AWS?

2022-06-21 Thread Matt Casters
Hi Flink team!

I'm interested in getting the new Flink Kubernetes Operator to work on AWS
EKS.  Following the documentation I got pretty far.  However, when trying
to run a job I got the following error:

> Only "local" is supported as schema for application mode. This assumes
> that the jar is located in the image, not the Flink client. An example of
> such path is: local:///opt/flink/examples/streaming/WindowJoin.jar


 I have an Apache Hop/Beam fat jar capable of running the Flink pipeline in
my yml file:

jarURI: s3://hop-eks/hop/hop-2.1.0-fat.jar

So how could I go about getting the fat jar in a desired location for the
operator?

Getting this to work would be really cool for both short and long-lived
pipelines in the service of all sorts of data integration work.  It would
do away with the complexity of setting up and maintaining your own Flink
cluster.

Thanks in advance!

All the best,

Matt (mcasters, Apache Hop PMC)


Re: Flink ignoring latest checkpoint on restart?

2021-10-19 Thread LeVeck, Matt
Thanks David.  We're running ZooKeeper 3.4.10.  I don't see anything immediately 
standing out in the ZooKeeper logs.

What would perhaps help as much as diagnosing the root cause is the following.  
When Flink fails to find the latest checkpoint in ZooKeeper, it seems to fall 
back to the checkpoint in the recovery directory.  But in this instance the 
checkpoint in the recovery directory is quite old.  Is there a way to ensure 
that it is at least somewhat newer, so that less reprocessing occurs when we 
hit this issue?
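
Editor's sketch, not a Flink feature: one way to make the fallback newer is for the launcher (something like the ArgsPreprocessor seen in the log) to look for the newest retained checkpoint instead of the savepoint taken at deployment time. The helper below assumes that checkpoints are retained, that the job's checkpoint directory can be listed like a local filesystem (for S3 an object-listing call would be needed instead), and that the entrypoint accepts a retained checkpoint path where it accepts a savepoint path. The function name is hypothetical. It relies on Flink's convention of naming checkpoint directories chk-<id> and writing a _metadata file into each completed one.

```python
import re
import tempfile
from pathlib import Path

# Completed checkpoints live in chk-<id> directories that contain a _metadata file.
CHECKPOINT_DIR = re.compile(r"^chk-(\d+)$")


def latest_completed_checkpoint(job_checkpoint_dir):
    """Return the chk-<id> directory with the highest id and a _metadata file, or None."""
    best_id, best_path = -1, None
    for child in Path(job_checkpoint_dir).iterdir():
        match = CHECKPOINT_DIR.match(child.name)
        if match is None or not (child / "_metadata").is_file():
            continue  # skips shared/, taskowned/ and checkpoints without metadata
        checkpoint_id = int(match.group(1))  # compare numerically, not as strings
        if checkpoint_id > best_id:
            best_id, best_path = checkpoint_id, child
    return best_path


if __name__ == "__main__":
    # Build a throwaway directory layout to exercise the helper.
    with tempfile.TemporaryDirectory() as root:
        for name, complete in [("chk-61633", True), ("chk-64996", True), ("chk-64997", False)]:
            directory = Path(root, name)
            directory.mkdir()
            if complete:
                (directory / "_metadata").write_text("")
        Path(root, "shared").mkdir()
        print(latest_completed_checkpoint(root).name)
```

The launcher could then pass the returned path as its restore argument and fall back to the deployment-time savepoint only when the helper returns None.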


From: David Morávek 
Sent: Tuesday, October 19, 2021 3:03 AM
To: LeVeck, Matt 
Cc: user@flink.apache.org 
Subject: Re: Flink ignoring latest checkpoint on restart?

Hi Matt,

this seems interesting, I'm aware of some possible inconsistency issues with 
unstable connections [1], but I have yet to find out if this could be related. 
I'll do some research on this and will get back to you.

In the meantime, can you see anything relevant in the zookeeper logs? Also 
which ZK version are you using?

[1] https://issues.apache.org/jira/browse/FLINK-24543

Best,
D.

On Tue, Oct 19, 2021 at 7:49 AM LeVeck, Matt <matt_lev...@intuit.com> wrote:
My team and I could use some help debugging the following issue, and maybe 
understanding Flink's full checkpoint recovery decision tree:

We've seen a few times a scenario where a task restarts (but not the job 
manager) and a recent checkpoint has been saved, but upon coming back up Flink 
chooses a much older checkpoint.  Below is a log of one such event.  In it 
checkpoint 64996 is written (the log indicates this, and checking S3 confirms), 
but the job restarts with 61634.  Looking at the log I'm wondering:

  1.  Is it likely that Flink failed to update Zookeeper, despite writing the 
checkpoint to S3?
  2.  In the event where Flink fails to find an entry in Zookeeper, what is its 
fallback algorithm (where does it look next for a recovery point?)
  3.  It seems to ultimately have ended up in the checkpoint that existed at 
the time when the job started.  Is there a configuration that would allow the 
fallback checkpoint to be something more recent?

Thanks,
Matt


2021/10/11 12:22:28.137 INFO  c.i.strmprocess.ArgsPreprocessor - 
latestSavepointPrefix:desanalytics-7216-doc-comprehension-analytics-7216-prd/4/savepoints/savepoint-00-abb450590ca7/_metadata
 LastModified:2021-10-11T12:03:47Z

2021/10/11 12:22:43.188 INFO  o.a.f.r.c.CheckpointCoordinator - Starting job 
 from savepoint 
s3://spp-state-371299021277-tech-core-idx/desanalytics-7216-doc-comprehension-analytics-7216-prd/4/savepoints/savepoint-00-abb450590ca7
 (allowing non restored state)

Starting standalonejob as a console application on host 
doc-comprehension-analytics-7216-7dbfbc4bbd-rzvpw.

args: --job-id  --allowNonRestoredState 
--job-classname 
com.intuit.ifdp.doccomprehension.analytics.DocComprehensionAnalyticsProcessor  
--fromSavepoint 
s3://spp-state-371299021277-tech-core-idx/desanalytics-7216-doc-comprehension-analytics-7216-prd/4/savepoints/savepoint-00-abb450590ca7

2021/10/11 12:22:51.777 INFO  o.a.f.r.c.CheckpointCoordinator - Reset the 
checkpoint ID of job  to 61634.

2021/10/11 12:22:51.777 INFO  o.a.f.r.c.ZooKeeperCompletedCheckpointStore - 
Recovering checkpoints from ZooKeeper.

2021/10/11 12:22:51.788 INFO  o.a.f.r.c.ZooKeeperCompletedCheckpointStore - 
Found 1 checkpoints in ZooKeeper.

2021/10/11 12:22:51.788 INFO  o.a.f.r.c.ZooKeeperCompletedCheckpointStore - 
Trying to fetch 1 checkpoints from storage.

2021/10/11 12:22:51.788 INFO  o.a.f.r.c.ZooKeeperCompletedCheckpointStore - 
Trying to retrieve checkpoint 61633.

2021/10/11 12:22:51.895 INFO  o.a.f.r.c.CheckpointCoordinator - Restoring job 
 from latest valid checkpoint: Checkpoint 61633 
@ 0 for .

<<<<<<<< long run here, with a few errors & recover

>>>>>>>> NO restarts on JM

2021/10/13 23:10:37.918 INFO  o.a.f.r.c.CheckpointCoordinator - Triggering 
checkpoint 64996 @ 1634166637914 for job .

2021/10/13 23:10:49.933 INFO  o.a.f.r.c.CheckpointCoordinator - Completed 
checkpoint 64996 for job  (20015670 bytes in 
11759 ms).

2021/10/13 23:10:59.200 INFO  o.a.f.s.z.o.a.zookeeper.ClientCnxn - Unable to 
read additional data from server sessionid 0x17c099f647f1e69, likely server has 
closed socket, closing socket connection and attempting reconnect

2021/10/13 23:10:59.301 INFO  o.a.f.s.c.o.a.c.f.s.ConnectionStateManager - 
State change: SUSPENDED

2021/10/13 23:10:59.301 WARN  o.a.f.r.l.ZooKeeperLeaderRetrievalService - 
Connection to ZooKeeper suspended. Can no longer retrieve the leader from 
ZooKeeper.

2021/10/13 23:10:59.323 INFO  o.a.f.r.e.ExecutionGrap

Flink ignoring latest checkpoint on restart?

2021-10-19 Thread LeVeck, Matt
My team and I could use some help debugging the following issue, and maybe 
understanding Flink's full checkpoint recovery decision tree:

We've seen a few times a scenario where a task restarts (but not the job 
manager) and a recent checkpoint has been saved, but upon coming back up Flink 
chooses a much older checkpoint.  Below is a log of one such event.  In it 
checkpoint 64996 is written (the log indicates this, and checking S3 confirms), 
but the job restarts with 61634.  Looking at the log I'm wondering:

  1.  Is it likely that Flink failed to update Zookeeper, despite writing the 
checkpoint to S3?
  2.  In the event where Flink fails to find an entry in Zookeeper, what is its 
fallback algorithm (where does it look next for a recovery point?)
  3.  It seems to ultimately have ended up in the checkpoint that existed at 
the time when the job started.  Is there a configuration that would allow the 
fallback checkpoint to be something more recent?

Thanks,
Matt


2021/10/11 12:22:28.137 INFO  c.i.strmprocess.ArgsPreprocessor - 
latestSavepointPrefix:desanalytics-7216-doc-comprehension-analytics-7216-prd/4/savepoints/savepoint-00-abb450590ca7/_metadata
 LastModified:2021-10-11T12:03:47Z

2021/10/11 12:22:43.188 INFO  o.a.f.r.c.CheckpointCoordinator - Starting job 
 from savepoint 
s3://spp-state-371299021277-tech-core-idx/desanalytics-7216-doc-comprehension-analytics-7216-prd/4/savepoints/savepoint-00-abb450590ca7
 (allowing non restored state)

Starting standalonejob as a console application on host 
doc-comprehension-analytics-7216-7dbfbc4bbd-rzvpw.

args: --job-id  --allowNonRestoredState 
--job-classname 
com.intuit.ifdp.doccomprehension.analytics.DocComprehensionAnalyticsProcessor  
--fromSavepoint 
s3://spp-state-371299021277-tech-core-idx/desanalytics-7216-doc-comprehension-analytics-7216-prd/4/savepoints/savepoint-00-abb450590ca7

2021/10/11 12:22:51.777 INFO  o.a.f.r.c.CheckpointCoordinator - Reset the 
checkpoint ID of job  to 61634.

2021/10/11 12:22:51.777 INFO  o.a.f.r.c.ZooKeeperCompletedCheckpointStore - 
Recovering checkpoints from ZooKeeper.

2021/10/11 12:22:51.788 INFO  o.a.f.r.c.ZooKeeperCompletedCheckpointStore - 
Found 1 checkpoints in ZooKeeper.

2021/10/11 12:22:51.788 INFO  o.a.f.r.c.ZooKeeperCompletedCheckpointStore - 
Trying to fetch 1 checkpoints from storage.

2021/10/11 12:22:51.788 INFO  o.a.f.r.c.ZooKeeperCompletedCheckpointStore - 
Trying to retrieve checkpoint 61633.

2021/10/11 12:22:51.895 INFO  o.a.f.r.c.CheckpointCoordinator - Restoring job 
 from latest valid checkpoint: Checkpoint 61633 
@ 0 for .

<<<<<<<< long run here, with a few errors & recover

>>>>>>>> NO restarts on JM

2021/10/13 23:10:37.918 INFO  o.a.f.r.c.CheckpointCoordinator - Triggering 
checkpoint 64996 @ 1634166637914 for job .

2021/10/13 23:10:49.933 INFO  o.a.f.r.c.CheckpointCoordinator - Completed 
checkpoint 64996 for job  (20015670 bytes in 
11759 ms).

2021/10/13 23:10:59.200 INFO  o.a.f.s.z.o.a.zookeeper.ClientCnxn - Unable to 
read additional data from server sessionid 0x17c099f647f1e69, likely server has 
closed socket, closing socket connection and attempting reconnect

2021/10/13 23:10:59.301 INFO  o.a.f.s.c.o.a.c.f.s.ConnectionStateManager - 
State change: SUSPENDED

2021/10/13 23:10:59.301 WARN  o.a.f.r.l.ZooKeeperLeaderRetrievalService - 
Connection to ZooKeeper suspended. Can no longer retrieve the leader from 
ZooKeeper.

2021/10/13 23:10:59.323 INFO  o.a.f.r.e.ExecutionGraph - Could not restart the 
job doc-comprehension-analytics-7216 () because 
the restart strategy prevented it.

2021/10/13 23:10:59.543 INFO  o.a.f.r.z.ZooKeeperStateHandleStore - Removing 
/flink/prd/desanalytics-7216/doc-comprehension-analytics-7216/checkpoints/
 from ZooKeeper

2021/10/13 23:10:59.555 INFO  o.a.f.r.e.ExecutionGraph - Job recovers via 
failover strategy: full graph restart

2021/10/13 23:10:59.622 INFO  o.a.f.r.c.ZooKeeperCompletedCheckpointStore - 
Recovering checkpoints from ZooKeeper.

2021/10/13 23:10:59.643 INFO  o.a.f.r.c.ZooKeeperCompletedCheckpointStore - 
Found 0 checkpoints in ZooKeeper.

2021/10/13 23:10:59.643 INFO  o.a.f.r.c.ZooKeeperCompletedCheckpointStore - 
Trying to fetch 0 checkpoints from storage.

2021/10/13 23:10:59.643 INFO  o.a.f.r.c.CheckpointCoordinator - Starting job 
 from savepoint 
s3://spp-state-371299021277-tech-core-idx/desanalytics-7216-doc-comprehension-analytics-7216-prd/4/savepoints/savepoint-00-abb450590ca7
 (allowing non restored state)

2021/10/13 23:10:59.744 INFO  o.a.f.r.r.StandaloneResourceManager - Registering 
TaskManager with ResourceID 085da1dcfff1103b0f054dbd8a27

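Re: Elastic resource scaling for Flink on native k8s

2021-09-28 Thread Matt Wang
You could check whether the k8s cluster you are using has VPA (Vertical Pod Autoscaler) capability.


--

Best,
Matt Wang


On 09/26/2021 14:23, 刘建刚 wrote:
This is not supported; you can achieve it with external tooling. For example, detect when CPU usage reaches a certain level and automatically restart the job to scale up.

赵旭晨 wrote on Thu, Sep 23, 2021 at 9:14 PM:

Our current production job parameters are 0.2 (CPU) and 5G.
During normal incremental runs the CPU usage is below 5%, but a full initialization of the upstream data often maxes out the CPU.

My question: can Flink scale elastically? That is, automatically add CPU when the pod's requested CPU is maxed out, and reclaim some of the pod resources once the peak has passed and the job is back in the incremental phase?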







Re: [ANNOUNCE] New PMC member: Dian Fu

2020-08-27 Thread Matt Wang
Congratulations Dian!


--

Best,
Matt Wang


On 08/27/2020 23:01,Benchao Li wrote:
Congratulations Dian!

Cranmer, Danny wrote on Thu, Aug 27, 2020 at 10:55 PM:

Congratulations Dian! :D

On 27/08/2020, 15:25, "Robert Metzger"  wrote:

Congratulations Dian!

On Thu, Aug 27, 2020 at 3:09 PM Congxian Qiu 
wrote:

Congratulations Dian
Best,
Congxian


Xintong Song wrote on Thu, Aug 27, 2020 at 7:50 PM:

Congratulations Dian~!

Thank you~

Xintong Song



On Thu, Aug 27, 2020 at 7:42 PM Jark Wu  wrote:

Congratulations Dian!

Best,
Jark

On Thu, 27 Aug 2020 at 19:37, Leonard Xu 
wrote:

Congrats, Dian!  Well deserved.

Best
Leonard

On Aug 27, 2020, at 19:34, Kurt Young wrote:

Congratulations Dian!

Best,
Kurt


On Thu, Aug 27, 2020 at 7:28 PM Rui Li <
lirui.fu...@gmail.com>
wrote:

Congratulations Dian!

On Thu, Aug 27, 2020 at 5:39 PM Yuan Mei <
yuanmei.w...@gmail.com>
wrote:

Congrats!

On Thu, Aug 27, 2020 at 5:38 PM Xingbo Huang <
hxbks...@gmail.com

wrote:

Congratulations Dian!

Best,
Xingbo

jincheng sun wrote on Thu, Aug 27, 2020 at 5:24 PM:

Hi all,

On behalf of the Flink PMC, I'm happy to announce that Dian Fu is now
part of the Apache Flink Project Management Committee (PMC).

Dian Fu has been very active on PyFlink component, working on various
important features, such as the Python UDF and Pandas integration, and
keeps checking and voting for our releases, and also has successfully
produced two releases(1.9.3&1.11.1) as RM, currently working as RM
to push forward the release of Flink 1.12.

Please join me in congratulating Dian Fu for becoming a Flink PMC
Member!
Best,
Jincheng(on behalf of the Flink PMC)



--
Best regards!
Rui Li









--

Best,
Benchao Li


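Re: Flink cluster setup

2020-08-10 Thread Matt Wang
Per-job mode is generally used more; the main consideration is resource isolation. Its drawback is that a job can only run after its JM/TM have started (so job startup is somewhat slower). Session 
mode is exactly the opposite. You need to evaluate your own scenario.


--

Best,
Matt Wang


On 08/10/2020 16:21, Dream-底限 wrote:
Hi,
For a Flink on YARN deployment, is it recommended to use yarn-session mode, where all jobs share one cluster, or per-job mode, where each job starts its own small cluster?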


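Re: How to configure Hadoop for Flink 1.11.1 on k8s

2020-08-07 Thread Matt Wang
The official image only contains Flink itself. If you need to connect to HDFS, you have to add the Hadoop packages and configuration to the image.


--

Best,
Matt Wang


On 08/07/2020 12:49, caozhen wrote:
For reference, here is the Hadoop integration page for Flink 1.11.1:
https://ci.apache.org/projects/flink/flink-docs-release-1.11/ops/deployment/hadoop.html

According to the official site, flink-shaded-hadoop-2-uber is no longer provided, and two alternatives are given:

1. Use HADOOP_CLASSPATH to load the Hadoop dependencies (recommended)
2. Or put the Hadoop dependencies into the lib directory of the Flink client

*When I ran Flink 1.11.1 on YARN I used the second approach: I downloaded the
hadoop-src package and copied some commonly used dependencies into the lib
directory. (This may cause class conflicts with your main jar, so some debugging is needed.)

I don't think this approach is good; it only solves the problem temporarily. There should still be a flink-shaded-hadoop package. I am trying to build one, but some issues are not fully resolved yet.
*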



--
Sent from: http://apache-flink.147419.n8.nabble.com/

Re: Flink 1.11 submit job timed out

2020-07-31 Thread Matt Wang
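I ran into the same problem: the job could only be submitted successfully after I started the taskmanager-query-state-service.yaml service. Also, I was testing on a 
k8s cluster installed locally; if it were a GC problem, whether or not the TM service is started should make no difference.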


--

Best,
Matt Wang


On 07/27/2020 15:01,Yang Wang wrote:
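I suggest first setting heartbeat.timeout to a larger value and then enabling the GC log
to see whether full GCs happen frequently and how long each one lasts. From the log you have provided so far, even the in-process JM->RM heartbeat times out,
so I still suspect this is GC-related.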

env.java.opts.jobmanager: -Xloggc:/jobmanager-gc.log
-XX:+UseGCLogFileRotation -XX:NumberOfGCLogFiles=2 -XX:GCLogFileSize=512M


Best,
Yang

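SmileSmile wrote on Mon, Jul 27, 2020 at 1:50 PM:

Hi, Yang Wang

Because the log was too long, I removed some repeated content.
I initially suspected a JM GC problem, but the behavior is the same after raising the JM memory to 10g.

Best



a511955993
Email: a511955...@163.com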

On 07/27/2020 11:36, Yang Wang wrote:
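Looking at this job, the root cause of the failure is not "No hostname could be resolved".
The cause of that WARNING can be discussed separately (if it does not occur in 1.10).
If you start a standalone cluster locally you will see the same WARNING, and it does not affect normal use.


The failure is caused by the slot request timing out after 5 minutes. In the log you provided, the period from 2020-07-23 13:55:45,519 to 2020-07-23
13:58:18,037 is blank; you did not omit anything there, did you?
Tasks should normally have started deploying during that period. The log shows a JM->RM heartbeat timeout, i.e. communication within the same process in the same Pod timed out as well,
so I suspect the JM is constantly in full GC. You need to confirm this.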


Best,
Yang

SmileSmile wrote on Thu, Jul 23, 2020 at 2:43 PM:

Hi Yang Wang

First, my environment versions:


kubernetes: 1.17.4.   CNI: weave


Items 1, 2 and 3 are my open questions.

Item 4 is the JM log.


1. After removing taskmanager-query-state-service.yaml, nslookup indeed fails

kubectl exec -it busybox2 -- /bin/sh
/ # nslookup 10.47.96.2
Server:  10.96.0.10
Address: 10.96.0.10:53

** server can't find 2.96.47.10.in-addr.arpa: NXDOMAIN



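2. Flink 1.11 vs. Flink 1.10

In the row "detail subtasks taskmanagers xxx x":

In 1.11 it became 172-20-0-50, whereas in 1.10 it is flink-taskmanager-7b5d6958b6-sfzlk:36459. What changed here? (This cluster currently runs both 1.10 and 1.11, and 1.10 works fine. If CoreDNS had a problem, Flink 1.10 should show the same behavior, shouldn't it?)

3. Does CoreDNS need special configuration?

Resolving domain names inside the container works; only reverse lookups fail when there is no service. Is there anything that needs to be configured in CoreDNS?


4. The JM log at the time of the timeout: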



2020-07-23 13:53:00,228 INFO
org.apache.flink.runtime.resourcemanager.StandaloneResourceManager [] -
ResourceManager akka.tcp://flink@flink-jobmanager
:6123/user/rpc/resourcemanager_0
was granted leadership with fencing token

2020-07-23 13:53:00,232 INFO
org.apache.flink.runtime.rpc.akka.AkkaRpcService [] -
Starting
RPC endpoint for org.apache.flink.runtime.dispatcher.StandaloneDispatcher
at akka://flink/user/rpc/dispatcher_1 .
2020-07-23 13:53:00,233 INFO
org.apache.flink.runtime.resourcemanager.slotmanager.SlotManagerImpl []
-
Starting the SlotManager.
2020-07-23 13:53:03,472 INFO
org.apache.flink.runtime.resourcemanager.StandaloneResourceManager [] -
Registering TaskManager with ResourceID 1f9ae0cd95a28943a73be26323588696
(akka.tcp://flink@10.34.128.9:6122/user/rpc/taskmanager_0) at
ResourceManager
2020-07-23 13:53:03,777 INFO
org.apache.flink.runtime.resourcemanager.StandaloneResourceManager [] -
Registering TaskManager with ResourceID cac09e751264e61615329c20713a84b4
(akka.tcp://flink@10.32.160.6:6122/user/rpc/taskmanager_0) at
ResourceManager
2020-07-23 13:53:03,787 INFO
org.apache.flink.runtime.resourcemanager.StandaloneResourceManager [] -
Registering TaskManager with ResourceID 93c72d01d09f9ae427c5fc980ed4c1e4
(akka.tcp://flink@10.39.0.8:6122/user/rpc/taskmanager_0) at
ResourceManager
2020-07-23 13:53:04,044 INFO
org.apache.flink.runtime.resourcemanager.StandaloneResourceManager [] -
Registering TaskManager with ResourceID 8adf2f8e81b77a16d5418a9e252c61e2
(akka.tcp://flink@10.38.64.7:6122/user/rpc/taskmanager_0) at
ResourceManager
2020-07-23 13:53:04,099 INFO
org.apache.flink.runtime.resourcemanager.StandaloneResourceManager [] -
Registering TaskManager with ResourceID 23e9d2358f6eb76b9ae718d879d4f330
(akka.tcp://flink@10.42.160.6:6122/user/rpc/taskmanager_0) at
ResourceManager
2020-07-23 13:53:04,146 INFO
org.apache.flink.runtime.resourcemanager.StandaloneResourceManager [] -
Registering TaskManager with ResourceID 092f8dee299e32df13db3111662b61f8
(akka.tcp://flink@10.33.192.14:6122/user/rpc/taskmanager_0) at
ResourceManager


2020-07-23 13:55:44,220 INFO
org.apache.flink.runtime.dispatcher.StandaloneDispatcher [] -
Received
JobGraph submission 99a030d0e3f428490a501c0132f27a56 (JobTest).
2020-07-23 13:55:44,222 INFO
org.apache.flink.runtime.dispatcher.StandaloneDispatcher [] -
Submitting job 99a030d0e3f428490a501c0132f27a56 (JobTest).
2020-07-23 13:55:44,251 INFO
org.apache.flink.runtime.rpc.akka.AkkaRpcService [] -
Starting
RPC endpoint for org.apache.flink.runtime.jobmaster.JobMaster at
akka://flink/user/rpc/jobmanager_2 .
2020-07-23 13:55:44,260 INFO
org.apache.flink.runtime.jobmaster.JobMaster
[] - Initializing job JobTest
(99a030d0e3f428490a501c0132f27a56).
2020-07-23 13:55:44,278 INFO
org.apache.flink.runtime.jobmaster.JobMaster
[] - Using restart back off time strategy
NoRestartBackoffTimeStrategy for JobTest
(99a030d0e3f428490a501c0132f27a56).
2020-07-23 13:55:44,319 INFO
org.apache.flink.runtime.jobmaster.JobMaster
[] - Running initialization on master for job JobTest
(99a030d0e3f428490a501c0132f27a56).
2020-07-23 13:55:44,319 INFO
org.apache.flink.runtime.jobmaster.JobMaster
[] - Successfully ran initialization on master in 0 ms.
2020-07-23 13:55:44,42

Re: Running Kubernetes on Flink with Savepoint

2020-06-17 Thread Matt Magsombol
Yeah, our setup is a bit outdated (since Flink 1.7-ish) but we're 
effectively just using helm templates... when upgrading to 1.10, I just ended up 
looking at diffs and change logs for changes...
Anyway, thanks. I was hoping that Flink had a community-supported way of doing 
this, but I think I know what to do internally.
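
Editor's sketch of the kind of tooling Robert suggests in the quoted reply below: trigger a savepoint through the JobManager's REST API before shutting the job down. It uses the documented POST /jobs/:jobid/savepoints and GET /jobs/:jobid/savepoints/:triggerid endpoints. The function names, the polling interval and the example base URL are assumptions, and a real deployment hook would also want timeouts and retries.

```python
import json
import time
import urllib.request


def savepoint_trigger_request(base_url, job_id, target_dir, cancel_job=False):
    """Build the POST request that asks the JobManager to take a savepoint."""
    body = json.dumps({"target-directory": target_dir, "cancel-job": cancel_job})
    return urllib.request.Request(
        f"{base_url.rstrip('/')}/jobs/{job_id}/savepoints",
        data=body.encode("utf-8"),
        headers={"Content-Type": "application/json"},
        method="POST",
    )


def savepoint_location(status_response):
    """Return the savepoint path once completed, None while still in progress."""
    if status_response["status"]["id"] != "COMPLETED":
        return None
    operation = status_response["operation"]
    if "failure-cause" in operation:
        raise RuntimeError(f"savepoint failed: {operation['failure-cause']}")
    return operation["location"]


def take_savepoint(base_url, job_id, target_dir, poll_seconds=2.0):
    """Trigger a savepoint and block until the JobManager reports its location."""
    request = savepoint_trigger_request(base_url, job_id, target_dir)
    with urllib.request.urlopen(request) as response:
        trigger_id = json.load(response)["request-id"]
    status_url = f"{base_url.rstrip('/')}/jobs/{job_id}/savepoints/{trigger_id}"
    while True:
        with urllib.request.urlopen(status_url) as response:
            location = savepoint_location(json.load(response))
        if location is not None:
            return location
        time.sleep(poll_seconds)
```

A pre-upgrade hook could call take_savepoint("http://jobmanager:8081", job_id, savepoint_dir), record the returned location, and hand it to the next deployment as its restore path.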

On 2020/06/15 15:11:32, Robert Metzger  wrote: 
> Hi Matt,
> 
> sorry for the late reply. Why are you using the "flink-docker" helm example
> instead of
> https://ci.apache.org/projects/flink/flink-docs-master/ops/deployment/kubernetes.html
>  or
> https://ci.apache.org/projects/flink/flink-docs-master/ops/deployment/native_kubernetes.html
>  ?
> I don't think that the helm charts you are mentioning are actively
> maintained or recommended for production use.
> 
> If you want to create a savepoint in Flink, you'll need to trigger it via
> the JobManager's REST API (independent of how you deploy it). I guess
> you'll have to come up with some tooling that orchestrates triggering a
> savepoint before shutting down / upgrading the job.
> See also:
> https://ci.apache.org/projects/flink/flink-docs-master/monitoring/rest_api.html#jobs-jobid-savepoints
> 
> Best,
> Robert
> 
> 
> 
> On Wed, Jun 10, 2020 at 2:48 PM Matt Magsombol  wrote:
> 
> > We're currently using this template:
> > https://github.com/docker-flink/examples/tree/master/helm/flink for
> > running kubernetes flink for running a job specific cluster ( with a nit of
> > specifying the class as the main runner for the cluster ).
> >
> >
> > How would I go about setting up adding savepoints, so that we can edit our
> > currently existing running jobs to add pipes to the flink job without
> > having to restart our state? Reasoning is that our state has a 1 day TTL
> > and updating our code without state will have to restart this from scratch.
> >
> > Through documentation, I see that I'd need to run some sort of command.
> > This is not possible to be consistent if we're using the helm charts
> > specified in the link.
> >
> > I see this email thread talking about a certain problem with savepoints +
> > kubernetes but doesn't quite specify how to set this up with helm:
> > https://lists.apache.org/thread.html/4299518f4da2810aa88fe6b21f841880b619f3f8ac264084a318c034%40%3Cuser.flink.apache.org%3E
> >
> >
> > According to hasun@zendesk from that thread, they mention that "We always
> > make a savepoint before we shutdown the job-cluster. So the savepoint is
> > always the latest. When we fix a bug or change the job graph, it can resume
> > well."
> >
> > This is the exact use case that I'm looking to appease. Other than
> > specifying configs, are there any other additional parameters that I'd need
> > to add within helm to specify that it needs to take in the latest savepoint
> > upon starting?
> >
> 


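Re: Starting consumption from a specified timestamp with the Kafka connector

2020-06-11 Thread Matt Wang
Hi, this feature has already been implemented in Flink, see [1]; it is supported starting from 1.11.0.


[1] https://issues.apache.org/jira/browse/FLINK-15220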
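
Editor's sketch of what this looks like with the connector options introduced in Flink 1.11, where the startup mode 'timestamp' takes an epoch-milliseconds value. The helper below only renders the DDL string and computes the milliseconds. The table columns, the JSON format and the function names are hypothetical, and the values are interpolated without escaping, so it is meant for illustration rather than for untrusted input.

```python
from datetime import datetime, timedelta, timezone

EPOCH = datetime(1970, 1, 1, tzinfo=timezone.utc)


def epoch_millis(moment):
    """Convert a timezone-aware datetime to milliseconds since the Unix epoch."""
    if moment.tzinfo is None:
        raise ValueError("use a timezone-aware datetime to avoid local-time surprises")
    return (moment - EPOCH) // timedelta(milliseconds=1)


def kafka_source_ddl(table, topic, bootstrap_servers, group_id, start_at):
    """Render a CREATE TABLE statement that starts reading at the given time."""
    return f"""CREATE TABLE {table} (
  user_id BIGINT,   -- hypothetical columns
  message STRING
) WITH (
  'connector' = 'kafka',
  'topic' = '{topic}',
  'properties.bootstrap.servers' = '{bootstrap_servers}',
  'properties.group.id' = '{group_id}',
  'scan.startup.mode' = 'timestamp',
  'scan.startup.timestamp-millis' = '{epoch_millis(start_at)}',
  'format' = 'json'
)"""


if __name__ == "__main__":
    start = datetime(2020, 6, 12, tzinfo=timezone.utc)
    print(kafka_source_ddl("MyUserTable", "topic_name", "localhost:9092", "testGroup", start))
```

The rendered statement can be passed to the SQL client or to a table environment's SQL execution call.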




---
Best,
Matt Wang


On 06/12/2020 10:37,Kyle Zhang wrote:
Hi,
Can the Kafka connector DDL start consuming from a specified timestamp, like 
flinkKafkaConsumer.setStartFromTimestamp(xx)? The documentation only mentions earliest-offset, latest-offset, group-offsets and specific-offsets.

CREATE TABLE MyUserTable (
...
) WITH (
'connector.type' = 'kafka',   

'connector.version' = '0.11', -- required: valid connector versions are
-- "0.8", "0.9", "0.10", "0.11", and "universal"

'connector.topic' = 'topic_name', -- required: topic name from which the table 
is read

'connector.properties.zookeeper.connect' = 'localhost:2181', -- required: 
specify the ZooKeeper connection string
'connector.properties.bootstrap.servers' = 'localhost:9092', -- required: 
specify the Kafka server connection string
'connector.properties.group.id' = 'testGroup', --optional: required in Kafka 
consumer, specify consumer group
'connector.startup-mode' = 'earliest-offset',-- optional: valid modes are 
"earliest-offset",
-- "latest-offset", "group-offsets",
-- or "specific-offsets"

-- optional: used in case of startup mode with specific offsets
'connector.specific-offsets' = 'partition:0,offset:42;partition:1,offset:300',

'connector.sink-partitioner' = '...',  -- optional: output partitioning from 
Flink's partitions
-- into Kafka's partitions valid are "fixed"
-- (each Flink partition ends up in at most one Kafka partition),
-- "round-robin" (a Flink partition is distributed to
-- Kafka partitions round-robin)
-- "custom" (use a custom FlinkKafkaPartitioner subclass)
-- optional: used in case of sink partitioner custom
'connector.sink-partitioner-class' = 'org.mycompany.MyPartitioner',

'format.type' = '...', -- required: Kafka connector requires to 
specify a format,
...-- the supported formats are 'csv', 
'json' and 'avro'.
-- Please refer to Table Formats section for more details.
)



Re: Challenges Deploying Flink With Savepoints On Kubernetes

2020-06-11 Thread Matt Magsombol
I'm not the original poster, but I'm running into this same issue. What you 
just described is exactly what I want. I presume you guys are using some 
variant of this helm 
https://github.com/docker-flink/examples/tree/master/helm/flink to configure 
your k8s cluster? I'm also assuming that this cluster is running as a job 
cluster and not a session cluster right?
If so, how did you guys set up the deployments.yaml file such that it picks up 
the latest savepoint from a savepoint directory ( and what happens if that 
savepoint directory is empty? This is for cases when we're starting a new 
cluster, new job from scratch and there's no need to recover from previous 
savepoint ).
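
Editor's sketch of one possible answer to the question above, assuming a custom container entrypoint is acceptable: before starting the job cluster, look for the newest savepoint under a known directory and add the restore argument only if one exists, so a brand-new job starts from scratch. The function name is hypothetical. The helper relies on Flink's savepoint-<...> directory naming and _metadata file, assumes the savepoint directory is mounted like a local filesystem (object stores would need a listing call instead), and uses the --fromSavepoint argument of the standalone job entrypoint.

```python
import os
import tempfile
from pathlib import Path


def job_start_args(savepoint_root, base_args):
    """Append --fromSavepoint <newest savepoint> when one exists, else start fresh."""
    root = Path(savepoint_root)
    candidates = []
    if root.is_dir():
        candidates = [
            directory for directory in root.iterdir()
            if directory.name.startswith("savepoint-")
            and (directory / "_metadata").is_file()  # ignore incomplete savepoints
        ]
    if not candidates:
        return list(base_args)  # empty or missing directory: new job, no restore
    newest = max(candidates, key=lambda d: (d / "_metadata").stat().st_mtime)
    return list(base_args) + ["--fromSavepoint", str(newest)]


if __name__ == "__main__":
    base = ["--job-classname", "com.example.MyJob"]  # hypothetical job class
    with tempfile.TemporaryDirectory() as root:
        print(job_start_args(root, base))
        for age, name in [(200, "savepoint-aaaaaa-111111111111"),
                          (100, "savepoint-aaaaaa-222222222222")]:
            metadata = Path(root, name, "_metadata")
            metadata.parent.mkdir()
            metadata.write_text("")
            stamp = 1_600_000_000 - age  # fixed modification times for the demo
            os.utime(metadata, (stamp, stamp))
        print(job_start_args(root, base))
```

Picking the newest savepoint by modification time only makes sense if, as Hao Sun describes below, a savepoint is always taken before a planned shutdown. It does not by itself solve the unplanned-restart case Sean raises, where the latest checkpoint should win.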

On 2019/09/24 16:23:52, Hao Sun  wrote: 
> We always make a savepoint before we shutdown the job-cluster. So the
> savepoint is always the latest. When we fix a bug or change the job graph,
> it can resume well.
> We only use checkpoints for unplanned downtime, e.g. K8S killed JM/TM,
> uncaught exception, etc.
> 
> Maybe I do not understand your use case well, I do not see a need to start
> from checkpoint after a bug fix.
> From what I know, currently you can use checkpoint as a savepoint as well
> 
> Hao Sun
> 
> 
> On Tue, Sep 24, 2019 at 7:48 AM Yuval Itzchakov  wrote:
> 
> > AFAIK there's currently nothing implemented to solve this problem, but
> > working on a possible fix can be implemented on top of
> > https://github.com/lyft/flinkk8soperator
> >  which already
> > has a pretty fancy state machine for rolling upgrades. I'd love to be
> > involved as this is an issue I've been thinking about as well.
> >
> > Yuval
> >
> > On Tue, Sep 24, 2019 at 5:02 PM Sean Hester 
> > wrote:
> >
> >> hi all--we've run into a gap (knowledge? design? tbd?) for our use cases
> >> when deploying Flink jobs to start from savepoints using the job-cluster
> >> mode in Kubernetes.
> >>
> >> we're running ~15 different jobs, all in job-cluster mode, using a mix
> >> of Flink 1.8.1 and 1.9.0, under GKE (Google Kubernetes Engine). these are
> >> all long-running streaming jobs, all essentially acting as microservices.
> >> we're using Helm charts to configure all of our deployments.
> >>
> >> we have a number of use cases where we want to restart jobs from a
> >> savepoint to replay recent events, i.e. when we've enhanced the job logic
> >> or fixed a bug. but after the deployment we want to have the job resume
> >> its "long-running" behavior, where any unplanned restarts resume from the
> >> latest checkpoint.
> >>
> >> the issue we run into is that any obvious/standard/idiomatic Kubernetes
> >> deployment includes the savepoint argument in the configuration. if the Job
> >> Manager container(s) have an unplanned restart, when they come back up they
> >> will start from the savepoint instead of resuming from the latest
> >> checkpoint. everything is working as configured, but that's not exactly
> >> what we want. we want the savepoint argument to be transient somehow (only
> >> used during the initial deployment), but Kubernetes doesn't really support
> >> the concept of transient configuration.
> >>
> >> i can see a couple of potential solutions that either involve custom code
> >> in the jobs or custom logic in the container (i.e. a custom entrypoint
> >> script that records that the configured savepoint has already been used in
> >> a file on a persistent volume or GCS, and potentially when/why/by which
> >> deployment). but these seem like unexpected and hacky solutions. before we
> >> head down that road i wanted to ask:
> >>
> >>- is this already a solved problem that i've missed?
> >>- is this issue already on the community's radar?
> >>
> >> thanks in advance!
> >>
> >> --
> >> *Sean Hester* | Senior Staff Software Engineer | m. 404-828-0865
> >> 3525 Piedmont Rd. NE, Building 6, Suite 500, Atlanta, GA 30305
> >>
> >>
> >
> > --
> > Best Regards,
> > Yuval Itzchakov.
> >
> 


Running Kubernetes on Flink with Savepoint

2020-06-10 Thread Matt Magsombol
We're currently using this template: 
https://github.com/docker-flink/examples/tree/master/helm/flink to run Flink on 
Kubernetes as a job-specific cluster (with the tweak of specifying the job's 
main class as the entry point for the cluster).


How would I go about setting up savepoints, so that we can edit our 
currently running jobs to add pipes to the Flink job without losing our 
state? The reasoning is that our state has a 1 day TTL, and updating our 
code without state would mean rebuilding it from scratch.

Through the documentation, I see that I'd need to run some sort of command. This 
cannot be done consistently if we're using the helm charts specified in the 
link.

I see this email thread talking about a certain problem with savepoints + 
kubernetes but doesn't quite specify how to set this up with helm: 
https://lists.apache.org/thread.html/4299518f4da2810aa88fe6b21f841880b619f3f8ac264084a318c034%40%3Cuser.flink.apache.org%3E


According to hasun@zendesk from that thread, they mention that "We always make 
a savepoint before we shutdown the job-cluster. So the savepoint is always the 
latest. When we fix a bug or change the job graph, it can resume well."

This is the exact use case that I'm looking to address. Other than specifying 
configs, are there any other additional parameters that I'd need to add within 
helm to specify that it needs to take in the latest savepoint upon starting?


Re:flink??????????????????

2020-06-10 Thread Matt Wang
kafka ?? 0.11.0 ?? flink  EXACTLY-ONCE?? 
send ?? kafka commit 
?? kafka 
??  isolation.level 
 commit 


---
Best,
Matt Wang


On 06/10/2020 14:28, Yichao Yang <1048262...@qq.com> wrote:
Hi


sink 
??kafkakafka1.0??kafkaEXACTLY-ONCE


Best,
Yichao Yang




----
??:"??"

Re: Tumbling windows - increasing checkpoint size over time

2020-06-05 Thread Wissman, Matt
Guowei,

I had a different Flink app that was using 10 or 15s intervals – it had a 
similar behavior but not nearly as bad as the 2s interval pipeline. Both have 
much longer checkpoint intervals now.

Here is the state config:

state.backend: rocksdb
state.checkpoints.dir: {{ .Values.flink.checkpointUrl }}/checkpoints
state.savepoints.dir: {{ .Values.flink.checkpointUrl }}/savepoints
state.backend.incremental: true
state.backend.rocksdb.localdir: /tmp/taskmanager

Thanks!

-Matt

From: Guowei Ma 
Date: Monday, June 1, 2020 at 1:01 AM
To: "Wissman, Matt" 
Cc: Till Rohrmann , "user@flink.apache.org" 

Subject: Re: Tumbling windows - increasing checkpoint size over time

Hi,
1. I am not an expert on RocksDB. However, I think the state garbage 
collection depends on RocksDB compaction, especially if the checkpoint 
interval is 2s. This is because the window elements are still in the SST files 
even after the window has been triggered.
2. Have you tried a checkpoint interval of 15s? I guess it might reduce the state 
size.
3. Would you like to share your RocksDB configuration? I think this could help 
others who work on state to know whether it is related to RocksDB or not.
Best,
Guowei


On Fri, May 29, 2020 at 10:30 PM, Wissman, Matt <matt.wiss...@here.com> wrote:
Till,

I’ll have to calculate the theoretical upper bound for our window state. Our 
data distribution and rate have a predictable pattern but the data rate pattern 
didn’t match the checkpoint size growth.

[cid:image001.png@01D63B59.521B7E70]


Here is a screenshot of the checkpoint size for the pipeline. The yellow 
section is when we had the checkpoint interval at 2 secs – the size seems to 
grow linearly and indefinitely. The blue, red and orange lines are in line with 
what I’d expect in terms of checkpoint size (100KB-2 MB).

The incoming stream data for the whole time period is consistent (follows the 
same pattern).

Changing the checkpoint interval seemed to fix the problem of the large and 
growing checkpoint size but I’m not sure why.

Thanks!

-Matt

From: Till Rohrmann <trohrm...@apache.org>
Date: Thursday, May 28, 2020 at 10:48 AM
To: "Wissman, Matt" <matt.wiss...@here.com>
Cc: Guowei Ma <guowei@gmail.com>, "user@flink.apache.org" <user@flink.apache.org>
Subject: Re: Tumbling windows - increasing checkpoint size over time

Hi Matt,

when using tumbling windows, the checkpoint size depends not only on 
the number of keys (which is equivalent to the number of open windows) but also 
on how many events arrive for each open window, because each window stores every 
event in its state. Hence, you may see different 
checkpoint sizes depending on the actual data distribution, which can change 
over time. Have you checked whether the data distribution and rate are constant 
over time?

What is the expected number of keys, size of events and number of events per 
key per second? Based on this information one could try to estimate an upper 
state size bound.
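
A rough sketch of that estimate in Java, assuming the window buffers every 
element until it fires (which is what the message above describes). The class 
name, the method name and the sample workload numbers are hypothetical and not 
taken from this thread; serialization and RocksDB overhead are ignored.

```java
/** Back-of-the-envelope upper bound for tumbling-window state (illustrative only). */
final class WindowStateEstimate {

    /**
     * Upper bound in bytes, assuming one open window per key and that each
     * window keeps every event that arrives while it is open.
     */
    static long upperBoundBytes(long keys, double eventsPerKeyPerSecond,
                                long windowSeconds, long bytesPerEvent) {
        double eventsPerWindow = eventsPerKeyPerSecond * windowSeconds;
        return (long) Math.ceil(keys * eventsPerWindow * bytesPerEvent);
    }

    public static void main(String[] args) {
        // Hypothetical workload: 10,000 keys, 2 events per key per second,
        // 15-second windows, 200 bytes per event.
        long bytes = upperBoundBytes(10_000, 2.0, 15, 200);
        System.out.println("upper bound: " + bytes + " bytes");
    }
}
```

If the observed checkpoint size keeps growing well past a bound computed this 
way, the growth is unlikely to come from live window contents alone.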

Cheers,
Till

On Wed, May 27, 2020 at 8:19 PM Wissman, Matt <matt.wiss...@here.com> wrote:

Hello Till & Guowei,



Thanks for the replies! Here is a snippet of the window function:



  SingleOutputStreamOperator aggregatedStream = dataStream
      .keyBy(idKeySelector())
      .window(TumblingProcessingTimeWindows.of(seconds(15)))
      .apply(new Aggregator())
      .name("Aggregator")
      .setParallelism(3);

Checkpoint interval: 2 secs when the checkpoint size grew from 100KB to 100MB 
(we’ve since changed it to 5 minutes, which has slowed the checkpoint size growth)

Lateness allowed: 0

Watermarks: nothing is set in terms of watermarks – do they apply to processing 
time?

The set of keys processed in the stream is stable over time

The checkpoint size actually looks pretty stable now that the interval was 
increased. Is it possible that the short checkpoint interval prevented 
compaction?

Thanks!

-Matt


From: Till Rohrmann <trohrm...@apache.org>
Date: Wednesday, May 27, 2020 at 9:00 AM
To: Guowei Ma <guowei@gmail.com>
Cc: "Wissman, Matt" <matt.wiss...@here.com>, "user@flink.apache.org" <user@flink.apache.org>
Subject: Re: Tumbling windows - increasing checkpoint size over time

LEARN FAST: This email originated outside of HERE.
Please do not click on links or open attachments unless you recognize the 
sender and know the content is safe. Thank you.

Hi Matt,

could you give us a bit more information about the windows you are using? They 
are tumbling windows. What's the size of the windows? Do you allow lateness of 
events? What's your checkpoint interval?

Are you using event time? If yes, how is the watermark

Re: Tumbling windows - increasing checkpoint size over time

2020-05-29 Thread Wissman, Matt
Till,

I’ll have to calculate the theoretical upper bound for our window state. Our 
data distribution and rate have a predictable pattern but the data rate pattern 
didn’t match the checkpoint size growth.

[cid:image001.png@01D6359B.BE0FD540]

Here is a screenshot of the checkpoint size for the pipeline. The yellow 
section is when we had the checkpoint interval at 2 secs – the size seems to 
grow linearly and indefinitely. The blue, red and orange lines are in line with 
what I’d expect in terms of checkpoint size (100KB-2 MB).

The incoming stream data for the whole time period is consistent (follows the 
same pattern).

Changing the checkpoint interval seemed to fix the problem of the large and 
growing checkpoint size but I’m not sure why.

Thanks!

-Matt

From: Till Rohrmann 
Date: Thursday, May 28, 2020 at 10:48 AM
To: "Wissman, Matt" 
Cc: Guowei Ma , "user@flink.apache.org" 

Subject: Re: Tumbling windows - increasing checkpoint size over time

Hi Matt,

when using tumbling windows, the checkpoint size depends not only on 
the number of keys (which is equivalent to the number of open windows) but also 
on how many events arrive for each open window, because each window stores every 
event in its state. Hence, you may see different 
checkpoint sizes depending on the actual data distribution, which can change 
over time. Have you checked whether the data distribution and rate are constant 
over time?

What is the expected number of keys, size of events and number of events per 
key per second? Based on this information one could try to estimate an upper 
state size bound.

Cheers,
Till

On Wed, May 27, 2020 at 8:19 PM Wissman, Matt <matt.wiss...@here.com> wrote:

Hello Till & Guowei,



Thanks for the replies! Here is a snippet of the window function:



  SingleOutputStreamOperator aggregatedStream = dataStream
      .keyBy(idKeySelector())
      .window(TumblingProcessingTimeWindows.of(seconds(15)))
      .apply(new Aggregator())
      .name("Aggregator")
      .setParallelism(3);

Checkpoint interval: 2 secs when the checkpoint size grew from 100KB to 100MB 
(we’ve since changed it to 5 minutes, which has slowed the checkpoint size growth)

Lateness allowed: 0

Watermarks: nothing is set in terms of watermarks – do they apply to processing 
time?

The set of keys processed in the stream is stable over time

The checkpoint size actually looks pretty stable now that the interval was 
increased. Is it possible that the short checkpoint interval prevented 
compaction?

Thanks!

-Matt


From: Till Rohrmann <trohrm...@apache.org>
Date: Wednesday, May 27, 2020 at 9:00 AM
To: Guowei Ma <guowei@gmail.com>
Cc: "Wissman, Matt" <matt.wiss...@here.com>, "user@flink.apache.org" <user@flink.apache.org>
Subject: Re: Tumbling windows - increasing checkpoint size over time

Hi Matt,

could you give us a bit more information about the windows you are using? They 
are tumbling windows. What's the size of the windows? Do you allow lateness of 
events? What's your checkpoint interval?

Are you using event time? If yes, how is the watermark generated?

You said that the number of events per window is more or less constant. Does 
this also apply to the size of the individual events?

Cheers,
Till

On Wed, May 27, 2020 at 1:21 AM Guowei Ma <guowei@gmail.com> wrote:
Hi, Matt
The total size of the state of the window operator is related to the
number of windows. For example, if you use keyBy + tumbling window there
would be as many windows as keys.
Hope this helps.
Best,
Guowei

On Wed, May 27, 2020 at 3:35 AM, Wissman, Matt <matt.wiss...@here.com> wrote:
>
> Hello Flink Community,
>
>
>
> I’m running a Flink pipeline that uses a tumbling window and incremental 
> checkpoint with RocksDB backed by s3. The number of objects in the window is 
> stable but over time the checkpoint size grows seemingly unbounded. Within the 
> first few hours after bringing the Flink pipeline up, the checkpoint size is 
> around 100K but after a week of operation it grows to around 100MB. The 
> pipeline isn’t using any other Flink state besides the state that the window 
> uses. I think this has something to do with RocksDB’s compaction but 
> shouldn’t the tumbling window state expire and be purged from the checkpoint?
>
>
>
> Flink Version 1.7.1
>
>
>
> Thanks!
>
>
>
> -Matt


Re: Tumbling windows - increasing checkpoint size over time

2020-05-27 Thread Wissman, Matt
Hello Till & Guowei,



Thanks for the replies! Here is a snippet of the window function:



  SingleOutputStreamOperator aggregatedStream = dataStream
      .keyBy(idKeySelector())
      .window(TumblingProcessingTimeWindows.of(seconds(15)))
      .apply(new Aggregator())
      .name("Aggregator")
      .setParallelism(3);

Checkpoint interval: 2 secs when the checkpoint size grew from 100KB to 100MB 
(we’ve since changed it to 5 minutes, which has slowed the checkpoint size growth)

Lateness allowed: 0

Watermarks: nothing is set in terms of watermarks – do they apply to processing 
time?

The set of keys processed in the stream is stable over time

The checkpoint size actually looks pretty stable now that the interval was 
increased. Is it possible that the short checkpoint interval prevented 
compaction?

Thanks!

-Matt


From: Till Rohrmann 
Date: Wednesday, May 27, 2020 at 9:00 AM
To: Guowei Ma 
Cc: "Wissman, Matt" , "user@flink.apache.org" 

Subject: Re: Tumbling windows - increasing checkpoint size over time

Hi Matt,

could you give us a bit more information about the windows you are using? They 
are tumbling windows. What's the size of the windows? Do you allow lateness of 
events? What's your checkpoint interval?

Are you using event time? If yes, how is the watermark generated?

You said that the number of events per window is more or less constant. Does 
this also apply to the size of the individual events?

Cheers,
Till

On Wed, May 27, 2020 at 1:21 AM Guowei Ma <guowei@gmail.com> wrote:
Hi, Matt
The total size of the state of the window operator is related to the
number of windows. For example, if you use keyBy + tumbling window there
would be as many windows as keys.
Hope this helps.
Best,
Guowei

On Wed, May 27, 2020 at 3:35 AM, Wissman, Matt <matt.wiss...@here.com> wrote:
>
> Hello Flink Community,
>
>
>
> I’m running a Flink pipeline that uses a tumbling window and incremental 
> checkpoint with RocksDB backed by s3. The number of objects in the window is 
> stable but over time the checkpoint size grows seemingly unbounded. Within the 
> first few hours after bringing the Flink pipeline up, the checkpoint size is 
> around 100K but after a week of operation it grows to around 100MB. The 
> pipeline isn’t using any other Flink state besides the state that the window 
> uses. I think this has something to do with RocksDB’s compaction but 
> shouldn’t the tumbling window state expire and be purged from the checkpoint?
>
>
>
> Flink Version 1.7.1
>
>
>
> Thanks!
>
>
>
> -Matt


Tumbling windows - increasing checkpoint size over time

2020-05-26 Thread Wissman, Matt
Hello Flink Community,

I’m running a Flink pipeline that uses a tumbling window and incremental 
checkpoint with RocksDB backed by s3. The number of objects in the window is 
stable but over time the checkpoint size grows seemingly unbounded. Within the 
first few hours after bringing the Flink pipeline up, the checkpoint size is 
around 100K but after a week of operation it grows to around 100MB. The 
pipeline isn’t using any other Flink state besides the state that the window 
uses. I think this has something to do with RocksDB’s compaction but shouldn’t 
the tumbling window state expire and be purged from the checkpoint?

Flink Version 1.7.1

Thanks!

-Matt


Flink long state TTL Concerns

2020-03-19 Thread Matt Magsombol
Suppose I'm using state stored in-memory that has a TTL of 7 days max. Should I 
expect any issues with state kept this long other than potential OOM?

Let's suppose I extend this by adding RocksDB... any concerns with this 
with respect to maintenance?

Most of the examples that I've been seeing seem to pair state with time windows, 
but I'll only read from this state every 15 seconds (or some small time window). 
After each time window, I *won't* be cleaning up the data within the state 
b/c I'll need to look it up again in future time windows. I'll 
effectively rely on TTL based on key expiration time, and I was wondering what 
potential issues I should watch out for with this approach.
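
To make the memory concern concrete, here is a toy Java sketch of a store that 
expires entries lazily, only when they are read. It is not Flink's state TTL 
implementation; the class TtlMap and its methods are hypothetical, and whether 
Flink cleans up expired entries that are never read again depends on the 
version and the configured cleanup strategy.

```java
import java.time.Duration;
import java.util.HashMap;
import java.util.Map;
import java.util.function.LongSupplier;

/** Toy key/value store with a time-to-live; expired entries are dropped only on read. */
final class TtlMap<K, V> {

    private static final class Entry<V> {
        final V value;
        final long writtenAtMillis;

        Entry(V value, long writtenAtMillis) {
            this.value = value;
            this.writtenAtMillis = writtenAtMillis;
        }
    }

    private final Map<K, Entry<V>> entries = new HashMap<>();
    private final long ttlMillis;
    private final LongSupplier clock; // injected so the example is deterministic

    TtlMap(long ttlMillis, LongSupplier clock) {
        this.ttlMillis = ttlMillis;
        this.clock = clock;
    }

    void put(K key, V value) {
        entries.put(key, new Entry<>(value, clock.getAsLong()));
    }

    /** Returns the value, or null if it is absent or older than the TTL. */
    V get(K key) {
        Entry<V> entry = entries.get(key);
        if (entry == null) {
            return null;
        }
        if (clock.getAsLong() - entry.writtenAtMillis >= ttlMillis) {
            entries.remove(key); // lazy cleanup: happens only because of this read
            return null;
        }
        return entry.value;
    }

    /** Number of entries physically held, including expired ones not yet read. */
    int storedEntries() {
        return entries.size();
    }

    public static void main(String[] args) {
        long[] now = {0L};
        TtlMap<String, Integer> state =
                new TtlMap<>(Duration.ofDays(7).toMillis(), () -> now[0]);
        state.put("read-often", 1);
        state.put("never-read-again", 2);

        now[0] = Duration.ofDays(8).toMillis(); // both entries are now past the TTL
        System.out.println(state.get("read-often"));   // expired, removed by this read
        System.out.println(state.storedEntries());     // the unread key is still held
    }
}
```

In a store like this, keys that are written once and never read again stay in 
memory past their TTL, which is the kind of growth to watch for with long 
expiration times.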


standalone flink savepoint restoration

2019-10-16 Thread Matt Anger
Hello everyone,
I am running a flink job in k8s as a standalone HA job. Now I updated my
job w/ some additional sinks, which I guess have made the checkpoints
incompatible with the newer version, meaning flink now crashes on bootup
with the following:
 Caused by: java.lang.IllegalStateException: There is no operator for the
state c9b81dfc309f1368ac7efb5864e7b693

So I rolled back the deployment, logged into the pod, created a savepoint, and
then modified my args to add

--allowNonRestoredState
and
-s 

but it doesn't look like the standalone cluster is respecting those
arguments. I've tried searching around, but haven't found any solutions.
The docker image I have is running the docker-entrypoint.sh and the full
arg list is below, copy-pasted out of my k8s yaml file:

- job-cluster
- -Djobmanager.rpc.address=$(SERVICE_NAME)
- -Djobmanager.rpc.port=6123
- -Dresourcemanager.rpc.port=6123
- -Dparallelism.default=$(NUM_WORKERS)
- -Dblob.server.port=6124
- -Dqueryable-state.server.ports=6125
- -Ds3.access-key=$(AWS_ACCESS_KEY_ID)
- -Ds3.secret-key=$(AWS_SECRET_ACCESS_KEY)
- -Dhigh-availability=zookeeper
- -Dhigh-availability.jobmanager.port=50010
- -Dhigh-availability.storageDir=$(S3_HA)
- -Dhigh-availability.zookeeper.quorum=$(ZK_QUORUM)
- -Dstate.backend=filesystem
- -Dstate.checkpoints.dir=$(S3_CHECKPOINT)
- -Dstate.savepoints.dir=$(S3_SAVEPOINT)
- --allowNonRestoredState
- -s $(S3_SAVEPOINT)

I originally didn't have the last 2 args, I added them based upon various
emails I saw on this list and other google search results, to no avail.
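
One thing that may be worth ruling out, as an assumption rather than a 
confirmed diagnosis: in a Kubernetes args list every list item becomes exactly 
one argv entry, so the item "-s $(S3_SAVEPOINT)" reaches the process as a 
single string containing a space. The Java sketch below uses a deliberately 
simple, hypothetical lookup (not Flink's actual argument parser) to show how a 
parser that expects the flag and its value as separate tokens would see the 
two forms. The s3 path is a made-up placeholder.

```java
import java.util.Arrays;
import java.util.List;

/** Toy argument lookup; illustrates token boundaries only, not Flink's parser. */
final class ArgTokens {

    /** Returns the token after "-s" if flag and value are separate tokens, else null. */
    static String savepointPath(List<String> args) {
        int flagIndex = args.indexOf("-s");
        if (flagIndex >= 0 && flagIndex + 1 < args.size()) {
            return args.get(flagIndex + 1);
        }
        return null;
    }

    public static void main(String[] args) {
        // Flag and value in one list item, as in the YAML above.
        List<String> oneItem = Arrays.asList(
                "job-cluster", "--allowNonRestoredState", "-s s3://bucket/savepoint-1");
        // Flag and value as two separate list items.
        List<String> twoItems = Arrays.asList(
                "job-cluster", "--allowNonRestoredState", "-s", "s3://bucket/savepoint-1");

        System.out.println("one item:  " + savepointPath(oneItem));
        System.out.println("two items: " + savepointPath(twoItems));
    }
}
```

If the entrypoint behaves like this toy lookup, splitting the flag and the path 
into two list items would be the thing to try; whether it does is not 
established in this thread.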

Thanks
-Matt


Using sensitive configuration/credentials

2018-08-08 Thread Matt Moore
I'm wondering what the best practice is for using secrets in a Flink program, 
and I can't find any info in the docs or posted anywhere else.

I need to store an access token for one of my APIs so that Flink can write 
results to it. Right now I'm passing it through as a configuration 
parameter, but that doesn't seem like the most secure thing to do, and the value 
shows up in the Flink Dashboard under Configuration, which is less than ideal.

Has anyone else dealt with a situation like this?

Thanks,



Re: [BUG?] Cannot Load User Class on Local Environment

2017-05-29 Thread Matt
Thanks for looking into it Till!

I'll try changing that line locally and then send a JIRA issue. When it
gets officially fixed I'll probably create an Ignite-Flink connector to
replace the older and less efficient one [1]. Users will be able to create
Flink jobs on Ignite nodes, right where the data is stored.
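
The change Till describes in the quoted message below amounts to giving the 
user-code class loader the thread context class loader (TCCL) as its parent. A 
minimal plain-Java sketch of that idea follows; it uses java.net.URLClassLoader 
directly instead of Flink's FlinkUserCodeClassLoader, and the class and method 
names are hypothetical.

```java
import java.net.URL;
import java.net.URLClassLoader;

/** Sketch: a URL class loader that delegates to the thread context class loader. */
final class ContextParentLoader {

    /** Builds a loader for the given jar URLs with the caller's TCCL as parent. */
    static URLClassLoader create(URL[] userJars) {
        ClassLoader parent = Thread.currentThread().getContextClassLoader();
        return new URLClassLoader(userJars, parent);
    }

    public static void main(String[] args) throws Exception {
        try (URLClassLoader loader = create(new URL[0])) {
            // With no jars of its own, the loader resolves classes via its parent chain.
            Class<?> loaded = loader.loadClass("java.lang.String");
            boolean parentIsTccl =
                    loader.getParent() == Thread.currentThread().getContextClassLoader();
            System.out.println(loaded.getName() + ", parent is TCCL: " + parentIsTccl);
        }
    }
}
```

With this parent, classes that the embedding application (Ignite here) makes 
visible through the context class loader can be resolved even when they are 
not in the system class loader.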

[1] https://apacheignite-mix.readme.io/docs/flink-streamer

Best,
Matt

On Mon, May 29, 2017 at 9:37 AM, Till Rohrmann <trohrm...@apache.org> wrote:

> Hi Matt,
>
> I looked into it and it seems that the Task does not respect the context
> class loader. The problem is that the local mode was not developed with the
> intention to be executed within something like Ignite or an application
> server. It rather assumes that you have a user code jar which is sent to
> the TaskManager. This jar is then added to an URLClassLoader which is used
> for user code class loading. In the case of the local execution mode, Flink
> assumes that all user code jars are in the system class loader (which
> usually holds true when running examples from the IDE). That is the reason
> why we don’t check the TCCL. In order to fix your problem you can replace
> BlobLibraryCacheManager.java:298 with
>
>     this.classLoader = new FlinkUserCodeClassLoader(libraryURLs,
>         Thread.currentThread().getContextClassLoader());
>
> Alternatively, you can build your job, copy the user code jar to
> IGNITE_HOME/libs and then restart Ignite.
>
> If you want to get the TCCL problem properly fixed, I suggest to open a
> JIRA issue here [1].
>
> [1] https://issues.apache.org/jira/browse/FLINK/?selectedTab=com.atlassian.jira.jira-projects-plugin:summary-panel
>
> Cheers,
> Till
>
> On Mon, May 29, 2017 at 12:02 PM, Matt <dromitl...@gmail.com> wrote:
>
>> Hi Till,
>>
>> Have you found anything or are you still busy with the release? I have no
>> idea what may be wrong, but let me know if I can help you in any way to
>> find what may be going on.
>>
>> Best,
>> Matt
>>
>> On Wed, May 24, 2017 at 5:37 AM, Till Rohrmann <trohrm...@apache.org>
>> wrote:
>>
>>> Hi Matt,
>>>
>>> sorry for not coming back to you sooner. We're currently in the release
>>> phase and this consumes a lot of capacity.
>>>
>>> I tried to go to the linked repo, but Github tells me that it does not
>>> exist. Have you removed it?
>>>
>>> Cheers,
>>> Till
>>>
>>> On Wed, May 17, 2017 at 10:56 PM, Matt <dromitl...@gmail.com> wrote:
>>>
>>>> Check the repo at [1].
>>>>
>>>> The important step which I think is what you missed is running an
>>>> Ignite node on your computer so the Java code, which launches an Ignite
>>>> client on the JVM, connects to it and executes Flink on that node on a
>>>> local environment.
>>>>
>>>> Be aware "peerClassLoadingEnabled" should be enabled (as in
>>>> ignite.xml), because it must match the config on the client node.
>>>>
>>>> Everything is in the Readme file; if you have any
>>>> problem let me know!
>>>>
>>>> Cheers,
>>>> Matt
>>>>
>>>> [1] https://github.com/Dromit/FlinkTest
>>>>
>>>> On Wed, May 17, 2017 at 3:49 PM, Matt <dromitl...@gmail.com> wrote:
>>>>
>>>>> Thanks for your help Till.
>>>>>
>>>>> I will create a self contained test case in a moment and send you the
>>>>> link, wait for it.
>>>>>
>>>>> Cheers,
>>>>> Matt
>>>>>
>>>>> On Wed, May 17, 2017 at 4:38 AM, Till Rohrmann <trohrm...@apache.org>
>>>>> wrote:
>>>>>
>>>>>> Hi Matt,
>>>>>>
>>>>>> alright, then we have to look into it again. I tried to run your
>>>>>> example, however, it does not seem to be self-contained. Using Ignite 
>>>>>> 2.0.0
>>>>>> with -DIGNITE_QUIET=false -Xms512m the Ignite object seems to be stuck in
>>>>>> Ignite#start. In the logs I see the following warning:
>>>>>>
>>>>>> May 17, 2017 9:36:22 AM org.apache.ignite.logger.java.JavaLogger warning
>>>>>> WARNING: TcpDiscoveryMulticastIpFinder has no pre-configured addresses 
>>>>>> (it is recommended in production to specify at least one address in 
>>>>>> TcpDiscoveryMulticastIpFinder.getAddresses() configuration property)
>>>>&

Re: [BUG?] Cannot Load User Class on Local Environment

2017-05-17 Thread Matt
Check the repo at [1].

The important step which I think is what you missed is running an Ignite
node on your computer so the Java code, which launches an Ignite client on
the JVM, connects to it and executes Flink on that node on a local
environment.

Be aware "peerClassLoadingEnabled" should be enabled (as in ignite.xml),
because it must match the config on the client node.

Everything is in the Readme file; if you have any
problem let me know!

Cheers,
Matt

[1] https://github.com/Dromit/FlinkTest

On Wed, May 17, 2017 at 3:49 PM, Matt <dromitl...@gmail.com> wrote:

> Thanks for your help Till.
>
> I will create a self contained test case in a moment and send you the
> link, wait for it.
>
> Cheers,
> Matt
>
> On Wed, May 17, 2017 at 4:38 AM, Till Rohrmann <trohrm...@apache.org>
> wrote:
>
>> Hi Matt,
>>
>> alright, then we have to look into it again. I tried to run your example,
>> however, it does not seem to be self-contained. Using Ignite 2.0.0 with
>> -DIGNITE_QUIET=false -Xms512m the Ignite object seems to be stuck in
>> Ignite#start. In the logs I see the following warning:
>>
>> May 17, 2017 9:36:22 AM org.apache.ignite.logger.java.JavaLogger warning
>> WARNING: TcpDiscoveryMulticastIpFinder has no pre-configured addresses (it 
>> is recommended in production to specify at least one address in 
>> TcpDiscoveryMulticastIpFinder.getAddresses() configuration property)
>> May 17, 2017 9:36:24 AM org.apache.ignite.logger.java.JavaLogger warning
>> WARNING: IP finder returned empty addresses list. Please check IP finder 
>> configuration and make sure multicast works on your network. Will retry 
>> every 2 secs.
>>
>> However, I assume that this is not critical.
>>
>> Maybe you can tell me how I can run your example in order to debug it.
>>
>> Cheers,
>> Till
>>
>> On Mon, May 15, 2017 at 10:05 PM, Matt <dromitl...@gmail.com> wrote:
>>
>>> Hi Till,
>>>
>>> I just tried with Flink 1.4 by compiling the current master branch on
>>> GitHub (as of this morning) and I still find the same problem as before. If
>>> I'm not wrong your PR was merged already, so your fixes should be part of
>>> the binary.
>>>
>>> I hope you have time to have a look at the test case in [1].
>>>
>>> Best,
>>> Matt
>>>
>>> [1] https://gist.github.com/17d82ee7dd921a0d649574a361cc017d
>>>
>>> On Thu, Apr 27, 2017 at 10:09 AM, Matt <dromitl...@gmail.com> wrote:
>>>
>>>> Hi Till,
>>>>
>>>> Great! Do you know if it's planned to be included in v1.2.x or should
>>>> we wait for v1.3? I'll give it a try as soon as it's merged.
>>>>
>>>> You're right about this approach launching a mini cluster on each
>>>> Ignite node. That is intentional, as described in my previous message on
>>>> the list [1].
>>>>
>>>> The idea is to collocate Flink jobs on Ignite nodes, so each dataflow
>>>> only processes the elements stored on the local in-memory database. I get
>>>> the impression this should be much faster than randomly picking a Flink
>>>> node and sending all the data over the network.
>>>>
>>>> Any insight on this?
>>>>
>>>> Cheers,
>>>> Matt
>>>>
>>>> [1] http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/Flink-on-Ignite-Collocation-td12780.html
>>>>
>>>> On Thu, Apr 27, 2017 at 5:33 AM, Till Rohrmann <trohrm...@apache.org>
>>>> wrote:
>>>>
>>>>> I just copied my response because my other email address is not
>>>>> accepted on the user mailing list.
>>>>>
>>>>> Hi Matt,
>>>>>
>>>>> I think Stefan's analysis is correct. I have a PR open [1], where I
>>>>> fix the issue with the class loader.
>>>>>
>>>>> As a side note, by doing what you're doing, you will spawn on each
>>>>> Ignite node a new Flink mini cluster. These mini clusters won't communicate
>>>>> with each other and run independently. Is this what you intend to do?
>>>>>
>>>>> [1] https://github.com/apache/flink/pull/3781
>>>>>
>>>>> Cheers,
>>>>> Till
>>>>>
>>>>> On Wed, Apr 26, 2017 at 11:12 PM, Matt <dromitl...@gmail.com> wrote:
>>>>>
>>>>>> Let's wait for Till then, I 

Re: [BUG?] Cannot Load User Class on Local Environment

2017-05-17 Thread Matt
Thanks for your help Till.

I will create a self contained test case in a moment and send you the link,
wait for it.

Cheers,
Matt

On Wed, May 17, 2017 at 4:38 AM, Till Rohrmann <trohrm...@apache.org> wrote:

> Hi Matt,
>
> alright, then we have to look into it again. I tried to run your example,
> however, it does not seem to be self-contained. Using Ignite 2.0.0 with
> -DIGNITE_QUIET=false -Xms512m the Ignite object seems to be stuck in
> Ignite#start. In the logs I see the following warning:
>
> May 17, 2017 9:36:22 AM org.apache.ignite.logger.java.JavaLogger warning
> WARNING: TcpDiscoveryMulticastIpFinder has no pre-configured addresses (it is 
> recommended in production to specify at least one address in 
> TcpDiscoveryMulticastIpFinder.getAddresses() configuration property)
> May 17, 2017 9:36:24 AM org.apache.ignite.logger.java.JavaLogger warning
> WARNING: IP finder returned empty addresses list. Please check IP finder 
> configuration and make sure multicast works on your network. Will retry every 
> 2 secs.
>
> However, I assume that this is not critical.
>
> Maybe you can tell me how I can run your example in order to debug it.
>
> Cheers,
> Till
>
> On Mon, May 15, 2017 at 10:05 PM, Matt <dromitl...@gmail.com> wrote:
>
>> Hi Till,
>>
>> I just tried with Flink 1.4 by compiling the current master branch on
>> GitHub (as of this morning) and I still find the same problem as before. If
>> I'm not wrong your PR was merged already, so your fixes should be part of
>> the binary.
>>
>> I hope you have time to have a look at the test case in [1].
>>
>> Best,
>> Matt
>>
>> [1] https://gist.github.com/17d82ee7dd921a0d649574a361cc017d
>>
>> On Thu, Apr 27, 2017 at 10:09 AM, Matt <dromitl...@gmail.com> wrote:
>>
>>> Hi Till,
>>>
>>> Great! Do you know if it's planned to be included in v1.2.x or should we
>>> wait for v1.3? I'll give it a try as soon as it's merged.
>>>
>>> You're right about this approach launching a mini cluster on each Ignite
>>> node. That is intentional, as described in my previous message on the list
>>> [1].
>>>
>>> The idea is to collocate Flink jobs on Ignite nodes, so each dataflow
>>> only processes the elements stored on the local in-memory database. I get
>>> the impression this should be much faster than randomly picking a Flink
>>> node and sending all the data over the network.
>>>
>>> Any insight on this?
>>>
>>> Cheers,
>>> Matt
>>>
>>> [1] http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/Flink-on-Ignite-Collocation-td12780.html
>>>
>>> On Thu, Apr 27, 2017 at 5:33 AM, Till Rohrmann <trohrm...@apache.org>
>>> wrote:
>>>
>>>> I just copied my response because my other email address is not
>>>> accepted on the user mailing list.
>>>>
>>>> Hi Matt,
>>>>
>>>> I think Stefan's analysis is correct. I have a PR open [1], where I fix
>>>> the issue with the class loader.
>>>>
>>>> As a side note, by doing what you're doing, you will spawn on each
>>>> Ignite node a new Flink mini cluster. These mini clusters won't communicate
>>>> with each other and run independently. Is this what you intend to do?
>>>>
>>>> [1] https://github.com/apache/flink/pull/3781
>>>>
>>>> Cheers,
>>>> Till
>>>>
>>>> On Wed, Apr 26, 2017 at 11:12 PM, Matt <dromitl...@gmail.com> wrote:
>>>>
>>>>> Let's wait for Till then, I hope he can figure this out.
>>>>>
>>>>> On Wed, Apr 26, 2017 at 11:03 AM, Stefan Richter <
>>>>> s.rich...@data-artisans.com> wrote:
>>>>>
>>>>>> Ok, now the question is also about what classloaders Ignite is
>>>>>> creating and how they are used, but the relevant code line in Flink is
>>>>>> probably in FlinkMiniCluster.scala, line 538 (current master):
>>>>>>
>>>>>> try {
>>>>>>   JobClient.submitJobAndWait(
>>>>>>     clientActorSystem,
>>>>>>     configuration,
>>>>>>     leaderRetrievalService,
>>>>>>     jobGraph,
>>>>>>     timeout,
>>>>>>     printUpdates,
>>>>>>     this.getClass.getClassLoader())
>>>>>> } finally {
>>>>>>   if (!useSingleActorSystem) {
>>>

Re: [BUG?] Cannot Load User Class on Local Environment

2017-05-15 Thread Matt
Hi Till,

I just tried with Flink 1.4 by compiling the current master branch on
GitHub (as of this morning) and I still find the same problem as before. If
I'm not wrong your PR was merged already, so your fixes should be part of
the binary.

I hope you have time to have a look at the test case in [1].

Best,
Matt

[1] https://gist.github.com/17d82ee7dd921a0d649574a361cc017d

On Thu, Apr 27, 2017 at 10:09 AM, Matt <dromitl...@gmail.com> wrote:

> Hi Till,
>
> Great! Do you know if it's planned to be included in v1.2.x or should we
> wait for v1.3? I'll give it a try as soon as it's merged.
>
> You're right about this approach launching a mini cluster on each Ignite
> node. That is intentional, as described in my previous message on the list
> [1].
>
> The idea is to collocate Flink jobs on Ignite nodes, so each dataflow only
> processes the elements stored on the local in-memory database. I get the
> impression this should be much faster than randomly picking a Flink node
> and sending all the data over the network.
>
> Any insight on this?
>
> Cheers,
> Matt
>
> [1] http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/Flink-on-Ignite-Collocation-td12780.html
>
> On Thu, Apr 27, 2017 at 5:33 AM, Till Rohrmann <trohrm...@apache.org>
> wrote:
>
>> I just copied my response because my other email address is not accepted
>> on the user mailing list.
>>
>> Hi Matt,
>>
>> I think Stefan's analysis is correct. I have a PR open [1], where I fix
>> the issue with the class loader.
>>
>> As a side note, by doing what you're doing, you will spawn on each Ignite
>> node a new Flink mini cluster. These mini clusters won't communicate with
>> each other and run independently. Is this what you intend to do?
>>
>> [1] https://github.com/apache/flink/pull/3781
>>
>> Cheers,
>> Till
>>
>> On Wed, Apr 26, 2017 at 11:12 PM, Matt <dromitl...@gmail.com> wrote:
>>
>>> Let's wait for Till then, I hope he can figure this out.
>>>
>>> On Wed, Apr 26, 2017 at 11:03 AM, Stefan Richter <
>>> s.rich...@data-artisans.com> wrote:
>>>
>>>> Ok, now the question is also about what classloaders Ignite is creating
>>>> and how they are used, but the relevant code line in Flink is probably in
>>>> FlinkMiniCluster.scala, line 538 (current master):
>>>>
>>>> try {
>>>>   JobClient.submitJobAndWait(
>>>>     clientActorSystem,
>>>>     configuration,
>>>>     leaderRetrievalService,
>>>>     jobGraph,
>>>>     timeout,
>>>>     printUpdates,
>>>>     this.getClass.getClassLoader())
>>>> } finally {
>>>>   if (!useSingleActorSystem) {
>>>>     // we have to shutdown the just created actor system
>>>>     shutdownJobClientActorSystem(clientActorSystem)
>>>>   }
>>>> }
>>>>
>>>>
>>>> This is what is executed as part of executing a job through
>>>> LocalEnvironment. As we can see, the classloader is set to the classloader
>>>> of FlinkMiniCluster. Depending on the classloader structure inside Ignite,
>>>> this classloader might not know your user code. What you could do is
>>>> change this line in a custom Flink build, for example changing line 538
>>>> to Thread.currentThread().getContextClassLoader() and ensuring that
>>>> the context classloader in the runnable is a classloader that a) knows the
>>>> user code and b) is a child of the classloader that knows the Ignite and
>>>> Flink classes. Notice that this is not a general solution and should not
>>>> become a general fix.
>>>>
>>>> I have heard that Till is about to change some things about local
>>>> execution, so I included him in CC. Maybe he can provide additional hints
>>>> how your use case might be better supported in the upcoming Flink 1.3.
>>>>
>>>> Best,
>>>> Stefan
>>>>
>>>> Am 25.04.2017 um 22:50 schrieb Matt <dromitl...@gmail.com>:
>>>>
>>>> I updated the code a little bit for clarity, now the line #56 mentioned
>>>> in my previous message is line #25.
>>>>
>>>> In summary the error I'm getting is this:
>>>>
>>>> ---
>>>> Caused by: org.apache.flink.streaming.runtime.tasks.StreamTaskException:
>>>> Cannot load user class: com.test.Test
>>>> ClassLoader info: URL ClassLoader:
>>>> Class not res

Re: [BUG?] Cannot Load User Class on Local Environment

2017-04-27 Thread Matt
Hi Till,

Great! Do you know if it's planned to be included in v1.2.x or should we
wait for v1.3? I'll give it a try as soon as it's merged.

You're right about this approach launching a mini cluster on each Ignite
node. That is intentional, as described in my previous message on the list
[1].

The idea is to collocate Flink jobs on Ignite nodes, so each dataflow only
processes the elements stored on the local in-memory database. I get the
impression this should be much faster than randomly picking a Flink node
and sending all the data over the network.

Any insight on this?

Cheers,
Matt

[1]
http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/Flink-on-Ignite-Collocation-td12780.html

On Thu, Apr 27, 2017 at 5:33 AM, Till Rohrmann <trohrm...@apache.org> wrote:

> I just copied my response because my other email address is not accepted
> on the user mailing list.
>
> Hi Matt,
>
> I think Stefan's analysis is correct. I have a PR open [1], where I fix
> the issue with the class loader.
>
> As a side note, by doing what you're doing, you will spawn a new Flink mini
> cluster on each Ignite node. These mini clusters won't communicate with
> each other and will run independently. Is this what you intend to do?
>
> [1] https://github.com/apache/flink/pull/3781
>
> Cheers,
> Till
>
> On Wed, Apr 26, 2017 at 11:12 PM, Matt <dromitl...@gmail.com> wrote:
>
>> Let's wait for Till then, I hope he can figure this out.
>>
>> On Wed, Apr 26, 2017 at 11:03 AM, Stefan Richter <
>> s.rich...@data-artisans.com> wrote:
>>
>>> Ok, now the question is also about what classloaders Ignite is creating
>>> and how they are used, but the relevant code line in Flink is probably in
>>> FlinkMiniCluster.scala, line 538 (current master):
>>>
>>>   try {
>>>     JobClient.submitJobAndWait(
>>>       clientActorSystem,
>>>       configuration,
>>>       leaderRetrievalService,
>>>       jobGraph,
>>>       timeout,
>>>       printUpdates,
>>>       this.getClass.getClassLoader())
>>>   } finally {
>>>     if (!useSingleActorSystem) {
>>>       // we have to shutdown the just created actor system
>>>       shutdownJobClientActorSystem(clientActorSystem)
>>>     }
>>>   }
>>>
>>>
>>> This is what is executed as part of executing a job through
>>> LocalEnvironment. As we can see, the classloader is set to the classloader
>>> of FlinkMiniCluster. Depending on the classloader structure inside Ignite,
>>> this classloader might not know your user code. What you could do is
>>> changing this line in a custom Flink build, changing line 538 for example
>>> to Thread.currentThread().getContextClassLoader() and ensuring that the
>>> context classloader in the runnable is a classloader that a) knows the
>>> user code and b) is a child of the classloader that knows the Ignite and
>>> Flink classes. Notice that this is not a general solution and should not
>>> become a general fix.
>>>
>>> I have heard that Till is about to change some things about local
>>> execution, so I included him in CC. Maybe he can provide additional hints
>>> how your use case might be better supported in the upcoming Flink 1.3.
>>>
>>> Best,
>>> Stefan
>>>
>>> Am 25.04.2017 um 22:50 schrieb Matt <dromitl...@gmail.com>:
>>>
>>> I updated the code a little bit for clarity, now the line #56 mentioned
>>> in my previous message is line #25.
>>>
>>> In summary the error I'm getting is this:
>>>
>>> ---
>>> Caused by: org.apache.flink.streaming.runtime.tasks.StreamTaskException:
>>> Cannot load user class: com.test.Test
>>> ClassLoader info: URL ClassLoader:
>>> Class not resolvable through given classloader.
>>> ---
>>>
>>> But if I'm not wrong, after trying to load the class through
>>> URLClassLoader, Flink should try loading it with its parent ClassLoader,
>>> which should be the same ClassLoader that executed the environment, and it
>>> does have access to the class.
>>>
>>> Not sure what is wrong.
>>>
>>> On Tue, Apr 25, 2017 at 5:38 PM, Matt <dromitl...@gmail.com> wrote:
>>>
>>>> Hi Stefan,
>>>>
>>>> Check the code here: https://gist.github.com/
>>>> 17d82ee7dd921a0d649574a361cc017d , the output is at the bottom of the
>>>> page.
>>>>
>>>> Here are the results of the additional tests you mentioned:
>>>>
>>>> 1. I was a

Re: [BUG?] Cannot Load User Class on Local Environment

2017-04-26 Thread Matt
Let's wait for Till then, I hope he can figure this out.

On Wed, Apr 26, 2017 at 11:03 AM, Stefan Richter <
s.rich...@data-artisans.com> wrote:

> Ok, now the question is also about what classloaders Ignite is creating
> and how they are used, but the relevant code line in Flink is probably in
> FlinkMiniCluster.scala, line 538 (current master):
>
>   try {
>     JobClient.submitJobAndWait(
>       clientActorSystem,
>       configuration,
>       leaderRetrievalService,
>       jobGraph,
>       timeout,
>       printUpdates,
>       this.getClass.getClassLoader())
>   } finally {
>     if (!useSingleActorSystem) {
>       // we have to shutdown the just created actor system
>       shutdownJobClientActorSystem(clientActorSystem)
>     }
>   }
>
>
> This is what is executed as part of executing a job through
> LocalEnvironment. As we can see, the classloader is set to the classloader
> of FlinkMiniCluster. Depending on the classloader structure inside Ignite,
> this classloader might not know your user code. What you could do is
> changing this line in a custom Flink build, changing line 538 for example
> to Thread.currentThread().getContextClassLoader() and ensuring that the
> context classloader in the runnable is a classloader that a) knows the
> user code and b) is a child of the classloader that knows the Ignite and
> Flink classes. Notice that this is not a general solution and should not
> become a general fix.
>
> I have heard that Till is about to change some things about local
> execution, so I included him in CC. Maybe he can provide additional hints
> how your use case might be better supported in the upcoming Flink 1.3.
>
> Best,
> Stefan
>
> Am 25.04.2017 um 22:50 schrieb Matt <dromitl...@gmail.com>:
>
> I updated the code a little bit for clarity, now the line #56 mentioned in
> my previous message is line #25.
>
> In summary the error I'm getting is this:
>
> ---
> Caused by: org.apache.flink.streaming.runtime.tasks.StreamTaskException:
> Cannot load user class: com.test.Test
> ClassLoader info: URL ClassLoader:
> Class not resolvable through given classloader.
> ---
>
> But if I'm not wrong, after trying to load the class through
> URLClassLoader, Flink should try loading it with its parent ClassLoader,
> which should be the same ClassLoader that executed the environment, and it
> does have access to the class.
>
> Not sure what is wrong.
>
> On Tue, Apr 25, 2017 at 5:38 PM, Matt <dromitl...@gmail.com> wrote:
>
>> Hi Stefan,
>>
>> Check the code here: https://gist.github.com/
>> 17d82ee7dd921a0d649574a361cc017d , the output is at the bottom of the
>> page.
>>
>> Here are the results of the additional tests you mentioned:
>>
>> 1. I was able to instantiate an inner class (Test$Foo) inside the Ignite
>> closure, no problem with that
>> 2. I tried implementing SourceFunction and SinkFunction in Test itself, I
>> was able to instantiate the class inside the Ignite closure
>> 3. I'm not sure what you meant in this point, is it something like what I
>> tried in line #56?
>>
>> Additionally, I tried implementing the SourceFunction and SinkFunction in
>> Test$Foo with the same result: it says "Cannot load user class:
>> com.test.Test$Foo"
>>
>> Looks like Flink is not using the correct ClassLoader. Any idea?
>>
>> Regards,
>> Matt
>>
>> On Tue, Apr 25, 2017 at 7:00 AM, Stefan Richter <
>> s.rich...@data-artisans.com> wrote:
>>
>>> Hi,
>>>
>>> I would expect that the local environment picks up the class path from
>>> the code that launched it. So I think the question is what happens behind
>>> the scenes when you call ignite.compute().broadcast(runnable): which
>>> classes are shipped, and how is the classpath built in the environment that
>>> runs the code? Your example is also not fully conclusive, because
>>> com.myproj.Test (which you can successfully instantiate) and
>>> com.myproj.Test$1$2 (which fails) are different classes, so maybe only the
>>> outer class is shipped with the broadcast call. My theory is that not all
>>> classes are shipped (e.g. inner classes), but only Test. You could try
>>> three things to analyze the problem a little more:
>>>
>>> 1) Create another inner class inside Test and try if you are still able
>>> to instantiate also this class via reflection.
>>> 2) Let Test class itself implement the map function (avoiding the usage
>>> of other/inner classes) and see if this works.
>>> 3) Check and set the thread’s context classloader inside the runnable to
>>>

Re: [BUG?] Cannot Load User Class on Local Environment

2017-04-25 Thread Matt
I updated the code a little for clarity; line #56 mentioned in my
previous message is now line #25.

In summary the error I'm getting is this:

---
Caused by: org.apache.flink.streaming.runtime.tasks.StreamTaskException:
Cannot load user class: com.test.Test
ClassLoader info: URL ClassLoader:
Class not resolvable through given classloader.
---

But if I'm not wrong, after trying to load the class through
URLClassLoader, Flink should try loading it with its parent ClassLoader,
which should be the same ClassLoader that executed the environment, and it
does have access to the class.

Not sure what is wrong.

On Tue, Apr 25, 2017 at 5:38 PM, Matt <dromitl...@gmail.com> wrote:

> Hi Stefan,
>
> Check the code here: https://gist.github.com/
> 17d82ee7dd921a0d649574a361cc017d , the output is at the bottom of the
> page.
>
> Here are the results of the additional tests you mentioned:
>
> 1. I was able to instantiate an inner class (Test$Foo) inside the Ignite
> closure, no problem with that
> 2. I tried implementing SourceFunction and SinkFunction in Test itself, I
> was able to instantiate the class inside the Ignite closure
> 3. I'm not sure what you meant in this point, is it something like what I
> tried in line #56?
>
> Additionally, I tried implementing the SourceFunction and SinkFunction in
> Test$Foo with the same result: it says "Cannot load user class:
> com.test.Test$Foo"
>
> Looks like Flink is not using the correct ClassLoader. Any idea?
>
> Regards,
> Matt
>
> On Tue, Apr 25, 2017 at 7:00 AM, Stefan Richter <
> s.rich...@data-artisans.com> wrote:
>
>> Hi,
>>
>> I would expect that the local environment picks up the class path from
>> the code that launched it. So I think the question is what happens behind
>> the scenes when you call ignite.compute().broadcast(runnable): which
>> classes are shipped, and how is the classpath built in the environment that
>> runs the code? Your example is also not fully conclusive, because
>> com.myproj.Test (which you can successfully instantiate) and
>> com.myproj.Test$1$2 (which fails) are different classes, so maybe only the
>> outer class is shipped with the broadcast call. My theory is that not all
>> classes are shipped (e.g. inner classes), but only Test. You could try
>> three things to analyze the problem a little more:
>>
>> 1) Create another inner class inside Test and try if you are still able
>> to instantiate also this class via reflection.
>> 2) Let Test class itself implement the map function (avoiding the usage
>> of other/inner classes) and see if this works.
>> 3) Check and set the thread’s context classloader inside the runnable to
>> something that contains all required classes and see if this gets picked up
>> by Flink.
>>
>> Best,
>> Stefan
>>
>> Am 25.04.2017 um 07:27 schrieb Matt <dromitl...@gmail.com>:
>>
>> Hi all,
>>
>> I'm trying to run Flink using a local environment, but on an Ignite node
>> to achieve collocation (as mentioned in my previous message on this list).
>>
>> Have a look at the code in [1]. It's pretty simple, but I'm getting a
>> "cannot load user class" error as shown in [2].
>>
>> If you check line #29 on the code, I'm able to create an instance of
>> class Test, and it's the same context from which I'm creating the Flink
>> job. Shouldn't it work provided I'm using a local environment?
>>
>> It would be really nice to be able to inject a ClassLoader into the chunk
>> of code that creates the job. Is this currently possible?
>>
>> Any fix or workaround is appreciated!
>>
>> Best,
>> Matt
>>
>> [1] https://gist.github.com/f248187b9638023b95ba8bd9d7f06215
>> [2] https://gist.github.com/796ee05425535ece1736df7b1e884cce
>>
>>
>>
>
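
On the parent-delegation question raised in this message, here is a minimal, Flink-free sketch of how a URLClassLoader falls back to its parent, and what happens when that parent does not know the class. The class name ParentDelegationDemo and the helpers canLoad and demo are made up for illustration. It only shows standard JDK behavior and does not reproduce what FlinkMiniCluster or Ignite actually do with their classloaders.

```java
import java.io.IOException;
import java.io.UncheckedIOException;
import java.net.URL;
import java.net.URLClassLoader;

public class ParentDelegationDemo {

    // Returns true if the given loader (or one of its parents) can resolve the class.
    static boolean canLoad(ClassLoader loader, String className) {
        try {
            Class.forName(className, false, loader);
            return true;
        } catch (ClassNotFoundException e) {
            return false;
        }
    }

    // Builds two URLClassLoaders with no URLs of their own and different parents.
    // result[0]: parent is the loader that knows this class (the "user code" loader).
    // result[1]: parent is null, i.e. only the bootstrap loader is consulted.
    static boolean[] demo() {
        String userClass = ParentDelegationDemo.class.getName();
        ClassLoader userCodeLoader = ParentDelegationDemo.class.getClassLoader();
        try (URLClassLoader withUserParent = new URLClassLoader(new URL[0], userCodeLoader);
             URLClassLoader withoutUserParent = new URLClassLoader(new URL[0], null)) {
            return new boolean[] {
                canLoad(withUserParent, userClass),
                canLoad(withoutUserParent, userClass)
            };
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    public static void main(String[] args) {
        boolean[] result = demo();
        System.out.println("parent knows user code:         " + result[0]);
        System.out.println("parent does not know user code: " + result[1]);
    }
}
```

The point of the sketch is that delegation only helps when the parent chain actually contains the loader that shipped the user class. If the parent is a loader that never saw com.test.Test, the empty URL list in the error message gives the child nothing else to try.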


Re: [BUG?] Cannot Load User Class on Local Environment

2017-04-25 Thread Matt
Hi Stefan,

Check the code here:
https://gist.github.com/17d82ee7dd921a0d649574a361cc017d , the output is at
the bottom of the page.

Here are the results of the additional tests you mentioned:

1. I was able to instantiate an inner class (Test$Foo) inside the Ignite
closure, no problem with that
2. I tried implementing SourceFunction and SinkFunction in Test itself, I
was able to instantiate the class inside the Ignite closure
3. I'm not sure what you meant in this point, is it something like what I
tried in line #56?

Additionally, I tried implementing the SourceFunction and SinkFunction in
Test$Foo with the same result: it says "Cannot load user class:
com.test.Test$Foo"

Looks like Flink is not using the correct ClassLoader. Any idea?

Regards,
Matt

On Tue, Apr 25, 2017 at 7:00 AM, Stefan Richter <s.rich...@data-artisans.com
> wrote:

> Hi,
>
> I would expect that the local environment picks up the class path from the
> code that launched it. So I think the question is what happens behind the
> scenes when you call ignite.compute().broadcast(runnable): which
> classes are shipped, and how is the classpath built in the environment that
> runs the code? Your example is also not fully conclusive, because
> com.myproj.Test (which you can successfully instantiate) and
> com.myproj.Test$1$2 (which fails) are different classes, so maybe only the
> outer class is shipped with the broadcast call. My theory is that not all
> classes are shipped (e.g. inner classes), but only Test. You could try
> three things to analyze the problem a little more:
>
> 1) Create another inner class inside Test and try if you are still able to
> instantiate also this class via reflection.
> 2) Let Test class itself implement the map function (avoiding the usage of
> other/inner classes) and see if this works.
> 3) Check and set the thread’s context classloader inside the runnable to
> something that contains all required classes and see if this gets picked up
> by Flink.
>
> Best,
> Stefan
>
> Am 25.04.2017 um 07:27 schrieb Matt <dromitl...@gmail.com>:
>
> Hi all,
>
> I'm trying to run Flink using a local environment, but on an Ignite node
> to achieve collocation (as mentioned in my previous message on this list).
>
> Have a look at the code in [1]. It's pretty simple, but I'm getting a
> "cannot load user class" error as shown in [2].
>
> If you check line #29 on the code, I'm able to create an instance of class
> Test, and it's the same context from which I'm creating the Flink job.
> Shouldn't it work provided I'm using a local environment?
>
> It would be really nice to be able to inject a ClassLoader into the chunk
> of code that creates the job. Is this currently possible?
>
> Any fix or workaround is appreciated!
>
> Best,
> Matt
>
> [1] https://gist.github.com/f248187b9638023b95ba8bd9d7f06215
> [2] https://gist.github.com/796ee05425535ece1736df7b1e884cce
>
>
>
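
Suggestion 3 in the quoted message (set the thread's context classloader inside the runnable) can be sketched with plain JDK calls. The class ContextClassLoaderDemo and its helpers withContextClassLoader and loadViaContext are hypothetical names used only here. Whether Flink's local execution actually consults the context classloader is the open question of this thread, so treat this as a sketch of the swap-and-restore pattern, not as a confirmed fix.

```java
import java.util.function.Supplier;

public class ContextClassLoaderDemo {

    // Resolves a class by name through the current thread's context classloader.
    static Class<?> loadViaContext(String className) {
        ClassLoader loader = Thread.currentThread().getContextClassLoader();
        try {
            return Class.forName(className, false, loader);
        } catch (ClassNotFoundException e) {
            throw new IllegalStateException(
                "Not resolvable via context classloader: " + className, e);
        }
    }

    // Runs body with the given context classloader and always restores the previous one.
    static <T> T withContextClassLoader(ClassLoader loader, Supplier<T> body) {
        Thread current = Thread.currentThread();
        ClassLoader previous = current.getContextClassLoader();
        current.setContextClassLoader(loader);
        try {
            return body.get();
        } finally {
            current.setContextClassLoader(previous);
        }
    }

    public static void main(String[] args) {
        // Assumption: the loader that defined this class stands in for the
        // loader that knows the user code inside the Ignite closure.
        ClassLoader userCodeLoader = ContextClassLoaderDemo.class.getClassLoader();
        Class<?> loaded = withContextClassLoader(
            userCodeLoader,
            () -> loadViaContext(ContextClassLoaderDemo.class.getName()));
        System.out.println("Loaded " + loaded.getName());
    }
}
```

Inside an Ignite runnable, the body passed to withContextClassLoader would be the code that builds and executes the job. Restoring the previous loader in the finally block keeps the swap from leaking into other work scheduled on the same thread.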


Flink on Ignite - Collocation?

2017-04-24 Thread Matt
Hi all,

I've been playing around with Apache Ignite and I want to run Flink on top
of it but there's something I'm not getting.

Ignite has its own support for clustering, and data is distributed across
different nodes using a partitioned key. We can then run a closure and do
some computation on the nodes that own the data (collocation of
computation [1]), saving time and bandwidth. It all looks good,
but I'm not sure how it would play with Flink's own clustering capability.

My initial idea (which I haven't tried yet) is to use collocation to run a
closure where the data resides, and use that closure to execute a Flink
pipeline locally on that node (running it in a local environment). Then,
with a custom-made data source, I should be able to plug the data from the
local Ignite cache into the Flink pipeline and back into a cache using an
Ignite sink.

I'm not sure it's a good idea to disable Flink's distribution and run it
in a local environment just so the data is not transferred to another node.
I think Kafka has the same problem: if it partitions the data across
different nodes, how do you guarantee that Flink jobs are executed where
the data resides? If there's no way to guarantee that other than using a
local environment, what do you think of that approach (in terms of
performance)?

Any additional insight regarding stream processing on Ignite or any other
distributed storage is very welcome!

Best regards,
Matt

[1] https://apacheignite.readme.io/docs/collocate-compute-and-data


Flink + Druid example?

2017-04-07 Thread Matt
Hi all,

I'm looking for an example of Tranquility (Druid's lib) as a Flink sink.

I'm trying to follow the code in [1] but I feel it's incomplete or maybe
outdated: it doesn't mention another method (tranquilizer) that seems to
be part of the BeamFactory interface in the current version.

If anyone has any code or a working project to use as a reference that
would be awesome for me and for the rest of us looking for a time-series
database solution!

Best regards,
Matt

[1] https://github.com/druid-io/tranquility/blob/master/docs/flink.md


Re: Fw: Flink Kinesis Connector

2017-02-27 Thread Matt
Hi,


>Am I missing something obvious?
So it was that!


Thanks very much for the help. I'm sure I'll be able to figure that out.


Matt


From: Tzu-Li (Gordon) Tai <tzuli...@apache.org>
Sent: 27 February 2017 12:17
To: user@flink.apache.org
Subject: Re: Fw: Flink Kinesis Connector

Hi Matt!

As mentioned in the docs, due to the ASL license, we do not deploy the artifact 
to the Maven central repository on Flink releases.
You will need to build the Kinesis connector by yourself (the instructions to 
do so are also in the Flink Kinesis connector docs :)), and install it to your 
local Maven repository.
Then, you’ll be able to add it as a Maven dependency in your projects.

Cheers,
Gordon



On February 27, 2017 at 8:10:52 PM, Matt (mattmcgowan1...@hotmail.com) wrote:

Hi,


I'm working through trying to connect flink up to a kinesis stream, off of 
this: 
https://ci.apache.org/projects/flink/flink-docs-release-1.2/dev/connectors/kinesis.html


It gives the following Maven dependency:


<dependency>
  <groupId>org.apache.flink</groupId>
  <artifactId>flink-connector-kinesis_2.10</artifactId>
  <version>1.2.0</version>
</dependency>



However, I'm struggling to find that. It doesn't appear to be up on 
Maven<http://search.maven.org/#search%7Cga%7C1%7Cflink-connector>, nor is it 
listed in the Apache 
repository<https://repository.apache.org/#nexus-search;quick~flink-connector->.

Does this still exist? Am I missing something obvious?


Thanks in advance for any help,


Matt


Re: Cyclic ConnectedStream

2017-02-05 Thread Matt
I really don't know what you mean. I've been reading the documentation and
examples showing iterations, but I don't believe they will work for me. Maybe
you can write a quick example? The details don't matter, only the
topology.

If anyone else has an idea it's very welcome!

Matt

On Tue, Jan 31, 2017 at 3:07 PM, Gábor Gévay <gga...@gmail.com> wrote:

> I somehow still suspect that iterations might work for your use case.
> Note, that in the streaming API, iterations are currently nothing more than
> a back-edge in the topology, i.e. a low-level tool to create a cyclic
> topology, like as you say with your hypothetical setter syntax. (It's quite
> different from the iterations of the batch API.)
>
> The tricky part for your use-case is that you would want a ConnectedStream
> as your iteration head, which should get the elements from the back-edge in
> a separated way from the normal input. You could simulate this by using not
> ConnectedStream.flatMap, but a just a simple Stream.flatMap whose input
> element type is an Either type, whose two components would be the normal
> input and the back-edge input. (And you add maps before the closeWith and
> to your input1, which would appropriately wrap into the two alternatives of
> the Either type.)
>
> Best,
> Gábor
>
>
>
> 2017-01-29 15:39 GMT+01:00 Matt <dromitl...@gmail.com>:
>
>> Check this image for clarification, this is what I'm trying to do:
>> http://i.imgur.com/iZxPv04.png
>>
>> [image: Inline image 1]
>>
>> The rectangles are the two CoFlatMapFunction, sharing a state between
>> process and update (map1 and map2). It's clear from the image that I need
>> input1 and the green box to create the blue box, and input2 and the blue
>> box to create the green one.
>>
>> ---
>> *blue*  = *input1*.connect(*green*).keyBy(...).flatMap(...);
>> *green* = *input2*.connect(*blue*).keyBy(...).flatMap(...);
>> ---
>>
>> As you can see there's no cycle in the flow of data so I guess this
>> topology is valid. The problem is not having a way to define such flow.
>>
>> For instance, with the appropriate setters we would be able to do this:
>>
>> ---
>> *blue*  = *input1*.connect();
>> *green* = *input2*.connect();
>>
>> *blue.*setConnection(*green*);
>> *green*.setConnection(*blue*);
>>
>> *blue*.keyBy(...).flatMap(...);
>> *green*.keyBy(...).flatMap(...);
>> ---
>>
>> Any idea is welcome.
>>
>> Matt
>>
>> On Sat, Jan 28, 2017 at 5:31 PM, Matt <dromitl...@gmail.com> wrote:
>>
>>> I'm aware of IterativeStream but I don't think it's useful in this case.
>>>
>>> As shown in the example above, my use case is "cyclic" in that the same
>>> object goes from *Input* to *predictionStream* (flatMap1), then to
>>> *statsStream* (flatMap2, where it's updated with an object from *Input2*)
>>> and finally to *predictionStream* (flatMap2).
>>>
>>> The same operator is never applied twice to the object, thus I would say
>>> this dataflow is cyclic only in the dependencies of the stream
>>> (predictionStream depends on statsStream, but it depends on
>>> predictionStream in the first place).
>>>
>>> I hope it is clear now.
>>>
>>> Matt
>>>
>>> On Sat, Jan 28, 2017 at 3:17 PM, Gábor Gévay <gga...@gmail.com> wrote:
>>>
>>>> Hello,
>>>>
>>>> Cyclic dataflows can be built using iterations:
>>>> https://ci.apache.org/projects/flink/flink-docs-release-1.2/
>>>> dev/datastream_api.html#iterations
>>>>
>>>> Best,
>>>> Gábor
>>>>
>>>>
>>>>
>>>>
>>>> 2017-01-28 18:39 GMT+01:00 Matt <dromitl...@gmail.com>:
>>>> > I have a ConnectedStream (A) that depends on another ConnectedStream
>>>> (B),
>>>> > which depends on the first one (A).
>>>> >
>>>> > Simplified code:
>>>> >
>>>> > predictionStream = input
>>>> >   .connect(statsStream)
>>>> >   .keyBy(...)
>>>> >   .flatMap(CoFlatMapFunction {
>>>> >  flatMap1(obj, output) {
>>>> >  p = prediction(obj)
>>>> >  output.collect(p)
>>>> >  }
>>>> >  flatMap2(stat, output) {
>>>> >  updateModel(stat)
>>>> >  }
>>>> >   })
>>>> >
>>>> > statsStream = input2
>>>> >   .connect(predictionStream)
>>>> >   .keyBy(...)
>>>> >   .flatMap(CoFlatMapFunction {
>>>> >  flatMap1(obj2, output) {
>>>> > s = getStats(obj2, p)
>>>> > output.collect(s)
>>>> >  }
>>>> >  flatMap2(prediction, output) {
>>>> > p = prediction
>>>> >  }
>>>> >   })
>>>> >
>>>> > I'm guessing this should be possible to achieve, one way would be to
>>>> add a
>>>> > sink on statsStream to save the elements into Kafka and read from
>>>> that topic
>>>> > on predictionStream instead of initializing it with a reference of
>>>> > statsStream. But I would rather avoid writing unnecessarily into
>>>> kafka.
>>>> >
>>>> > Is there any other way to achieve this?
>>>> >
>>>> > Thanks,
>>>> > Matt
>>>>
>>>
>>>
>>
>
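
The Either-based workaround quoted in this message (one plain flatMap whose input type is an Either of the normal input and the back-edge input) can be sketched without Flink. Everything below is a stand-in defined for illustration: the Either class, PredictionOperator, the Double payloads, and the "model offset" state are hypothetical and are not taken from the original job. The sketch only shows how a single operator can tell the two kinds of input apart and share state between them.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Consumer;

public class EitherInputDemo {

    // Minimal tagged union, defined here only for the sketch.
    static final class Either<L, R> {
        final L left;
        final R right;
        final boolean isLeft;

        private Either(L left, R right, boolean isLeft) {
            this.left = left;
            this.right = right;
            this.isLeft = isLeft;
        }

        static <L, R> Either<L, R> left(L value) {
            return new Either<>(value, null, true);
        }

        static <L, R> Either<L, R> right(R value) {
            return new Either<>(null, value, false);
        }
    }

    // Plays the role of the single flatMap at the iteration head:
    // Left = element from the normal input, Right = element from the back-edge.
    static final class PredictionOperator {
        private double modelOffset = 0.0; // state shared by both branches

        void flatMap(Either<Double, Double> element, Consumer<Double> out) {
            if (element.isLeft) {
                out.accept(element.left + modelOffset); // emit a "prediction"
            } else {
                modelOffset = element.right;            // update the "model"
            }
        }
    }

    // Feeds an already interleaved sequence through one operator instance.
    static List<Double> run(List<Either<Double, Double>> interleaved) {
        PredictionOperator operator = new PredictionOperator();
        List<Double> emitted = new ArrayList<>();
        for (Either<Double, Double> element : interleaved) {
            operator.flatMap(element, emitted::add);
        }
        return emitted;
    }

    public static void main(String[] args) {
        List<Either<Double, Double>> input = new ArrayList<>();
        input.add(Either.left(1.0));   // normal input
        input.add(Either.right(0.5));  // feedback from the back-edge
        input.add(Either.left(1.0));   // normal input, now sees the updated state
        System.out.println(run(input)); // prints [1.0, 1.5]
    }
}
```

In a real topology the wrapping into Left and Right would happen in the map steps placed before the iteration head and before closeWith, as described in the quoted message.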


Re: Cyclic ConnectedStream

2017-01-29 Thread Matt
Check this image for clarification, this is what I'm trying to do:
http://i.imgur.com/iZxPv04.png

The rectangles are the two CoFlatMapFunctions, each sharing a state between
process and update (map1 and map2). It's clear from the image that I need
input1 and the green box to create the blue box, and input2 and the blue
box to create the green one.

---
blue  = input1.connect(green).keyBy(...).flatMap(...);
green = input2.connect(blue).keyBy(...).flatMap(...);
---

As you can see there's no cycle in the flow of data so I guess this
topology is valid. The problem is not having a way to define such flow.

For instance, with the appropriate setters we would be able to do this:

---
blue  = input1.connect();
green = input2.connect();

blue.setConnection(green);
green.setConnection(blue);

blue.keyBy(...).flatMap(...);
green.keyBy(...).flatMap(...);
---

Any idea is welcome.

Matt

On Sat, Jan 28, 2017 at 5:31 PM, Matt <dromitl...@gmail.com> wrote:

> I'm aware of IterativeStream but I don't think it's useful in this case.
>
> As shown in the example above, my use case is "cyclic" in that the same
> object goes from *Input* to *predictionStream* (flatMap1), then to
> *statsStream* (flatMap2, where it's updated with an object from *Input2*)
> and finally to *predictionStream* (flatMap2).
>
> The same operator is never applied twice to the object, thus I would say
> this dataflow is cyclic only in the dependencies of the stream
> (predictionStream depends on statsStream, but it depends on
> predictionStream in the first place).
>
> I hope it is clear now.
>
> Matt
>
> On Sat, Jan 28, 2017 at 3:17 PM, Gábor Gévay <gga...@gmail.com> wrote:
>
>> Hello,
>>
>> Cyclic dataflows can be built using iterations:
>> https://ci.apache.org/projects/flink/flink-docs-release-1.2/
>> dev/datastream_api.html#iterations
>>
>> Best,
>> Gábor
>>
>>
>>
>>
>> 2017-01-28 18:39 GMT+01:00 Matt <dromitl...@gmail.com>:
>> > I have a ConnectedStream (A) that depends on another ConnectedStream
>> (B),
>> > which depends on the first one (A).
>> >
>> > Simplified code:
>> >
>> > predictionStream = input
>> >   .connect(statsStream)
>> >   .keyBy(...)
>> >   .flatMap(CoFlatMapFunction {
>> >  flatMap1(obj, output) {
>> >  p = prediction(obj)
>> >  output.collect(p)
>> >  }
>> >  flatMap2(stat, output) {
>> >  updateModel(stat)
>> >  }
>> >   })
>> >
>> > statsStream = input2
>> >   .connect(predictionStream)
>> >   .keyBy(...)
>> >   .flatMap(CoFlatMapFunction {
>> >  flatMap1(obj2, output) {
>> > s = getStats(obj2, p)
>> > output.collect(s)
>> >  }
>> >  flatMap2(prediction, output) {
>> > p = prediction
>> >  }
>> >   })
>> >
>> > I'm guessing this should be possible to achieve, one way would be to
>> add a
>> > sink on statsStream to save the elements into Kafka and read from that
>> topic
>> > on predictionStream instead of initializing it with a reference of
>> > statsStream. But I would rather avoid writing unnecessarily into kafka.
>> >
>> > Is there any other way to achieve this?
>> >
>> > Thanks,
>> > Matt
>>
>
>


Re: Cyclic ConnectedStream

2017-01-28 Thread Matt
I'm aware of IterativeStream but I don't think it's useful in this case.

As shown in the example above, my use case is "cyclic" in that the same
object goes from *Input* to *predictionStream* (flatMap1), then to
*statsStream* (flatMap2, where it's updated with an object from *Input2*)
and finally to *predictionStream* (flatMap2).

The same operator is never applied twice to the object, thus I would say
this dataflow is cyclic only in the dependencies between the streams
(predictionStream depends on statsStream, which in turn depends on
predictionStream).

I hope it is clear now.

Matt

On Sat, Jan 28, 2017 at 3:17 PM, Gábor Gévay <gga...@gmail.com> wrote:

> Hello,
>
> Cyclic dataflows can be built using iterations:
> https://ci.apache.org/projects/flink/flink-docs-
> release-1.2/dev/datastream_api.html#iterations
>
> Best,
> Gábor
>
>
>
>
> 2017-01-28 18:39 GMT+01:00 Matt <dromitl...@gmail.com>:
> > I have a ConnectedStream (A) that depends on another ConnectedStream (B),
> > which depends on the first one (A).
> >
> > Simplified code:
> >
> > predictionStream = input
> >   .connect(statsStream)
> >   .keyBy(...)
> >   .flatMap(CoFlatMapFunction {
> >  flatMap1(obj, output) {
> >  p = prediction(obj)
> >  output.collect(p)
> >  }
> >  flatMap2(stat, output) {
> >  updateModel(stat)
> >  }
> >   })
> >
> > statsStream = input2
> >   .connect(predictionStream)
> >   .keyBy(...)
> >   .flatMap(CoFlatMapFunction {
> >  flatMap1(obj2, output) {
> > s = getStats(obj2, p)
> > output.collect(s)
> >  }
> >  flatMap2(prediction, output) {
> > p = prediction
> >  }
> >   })
> >
> > I'm guessing this should be possible to achieve, one way would be to add
> a
> > sink on statsStream to save the elements into Kafka and read from that
> topic
> > on predictionStream instead of initializing it with a reference of
> > statsStream. But I would rather avoid writing unnecessarily into kafka.
> >
> > Is there any other way to achieve this?
> >
> > Thanks,
> > Matt
>


Cyclic ConnectedStream

2017-01-28 Thread Matt
I have a ConnectedStream (A) that depends on another ConnectedStream (B),
which depends on the first one (A).

Simplified code:

predictionStream = input
  .connect(statsStream)
  .keyBy(...)
  .flatMap(CoFlatMapFunction {
    flatMap1(obj, output) {
      p = prediction(obj)
      output.collect(p)
    }
    flatMap2(stat, output) {
      updateModel(stat)
    }
  })

statsStream = input2
  .connect(predictionStream)
  .keyBy(...)
  .flatMap(CoFlatMapFunction {
    flatMap1(obj2, output) {
      s = getStats(obj2, p)
      output.collect(s)
    }
    flatMap2(prediction, output) {
      p = prediction
    }
  })

I'm guessing this should be possible to achieve. One way would be to add a
sink on statsStream to save the elements into Kafka and read from that
topic on predictionStream instead of initializing it with a reference to
statsStream. But I would rather avoid writing unnecessarily into Kafka.

Is there any other way to achieve this?

Thanks,
Matt


Re: .keyBy() on ConnectedStream

2017-01-28 Thread Matt
I'll create a new thread with my last message since it's not completely
related to the original question here.

On Sat, Jan 28, 2017 at 11:55 AM, Matt <dromitl...@gmail.com> wrote:

> Aha, ok, got it!
>
> I just realized that this ConnectedStream I was talking about (A) depends
> on another ConnectedStream (B), which depends on the first one (A). So it's
> even trickier than I first thought.
>
> For instance (simplified):
>
> *predictionStream = **input*
>   .connect(*statsStream*)
>   .keyBy(...)
>   .flatMap(CoFlatMapFunction {
>  flatMap1(obj, output) {
>  p = prediction(obj)
> * output.collect(p)*
>  }
>  flatMap2(stat, output) {
>  updateModel(stat)
>  }
>   })
>
> *statsStream = input2*
>   .connect(*predictionStream*)
>   .keyBy(...)
>   .flatMap(CoFlatMapFunction {
>  flatMap1(obj2, output) {
> s = getStats(obj2, p)
> *output.collect(s)*
>  }
>  flatMap2(prediction, output) {
> p = prediction
>  }
>   })
>
> I'm guessing it should be possible to achieve, one way would be to add a
> sink on statsStream to save the elements into Kafka and read from that
> topic on predictionStream instead of initializing it with a reference of
> statsStream. I would rather avoid writing unnecessarily into kafka.
>
> Is there any other way to achieve this?
>
> Thanks,
> Matt
>
> On Fri, Jan 27, 2017 at 6:35 AM, Timo Walther <twal...@apache.org> wrote:
>
>> Hi Matt,
>>
>> the keyBy() on ConnectedStream has two parameters to specify the key of
>> the left and of the right stream. Same keys end up in the same
>> CoMapFunction/CoFlatMapFunction. If you want to group both streams on a
>> common key, then you can use .union() instead of .connect().
>>
>> I hope that helps.
>>
>> Timo
>>
>>
>> Am 27/01/17 um 07:21 schrieb Matt:
>>
>> Hi all,
>>>
>>> What's the purpose of .keyBy() on ConnectedStream? How does it affect
>>> .map() and .flatMap()?
>>>
>>> I'm not finding a way to group stream elements based on a key, something
>>> like a Window on a normal Stream, but for a ConnectedStream.
>>>
>>> Regards,
>>> Matt
>>>
>>
>>
>>
>


Re: .keyBy() on ConnectedStream

2017-01-28 Thread Matt
Aha, ok, got it!

I just realized that this ConnectedStream I was talking about (A) depends
on another ConnectedStream (B), which depends on the first one (A). So it's
even trickier than I first thought.

For instance (simplified):

predictionStream = input
  .connect(statsStream)
  .keyBy(...)
  .flatMap(CoFlatMapFunction {
    flatMap1(obj, output) {
      p = prediction(obj)
      output.collect(p)
    }
    flatMap2(stat, output) {
      updateModel(stat)
    }
  })

statsStream = input2
  .connect(predictionStream)
  .keyBy(...)
  .flatMap(CoFlatMapFunction {
    flatMap1(obj2, output) {
      s = getStats(obj2, p)
      output.collect(s)
    }
    flatMap2(prediction, output) {
      p = prediction
    }
  })

I'm guessing this should be possible to achieve. One way would be to add a
sink on statsStream that saves the elements into Kafka, and to read from that
topic in predictionStream instead of initializing it with a reference to
statsStream. I would rather avoid writing into Kafka unnecessarily.

Is there any other way to achieve this?

Thanks,
Matt

On Fri, Jan 27, 2017 at 6:35 AM, Timo Walther <twal...@apache.org> wrote:

> Hi Matt,
>
> the keyBy() on ConnectedStream has two parameters to specify the key of
> the left and of the right stream. Same keys end up in the same
> CoMapFunction/CoFlatMapFunction. If you want to group both streams on a
> common key, then you can use .union() instead of .connect().
>
> I hope that helps.
>
> Timo
>
>
> Am 27/01/17 um 07:21 schrieb Matt:
>
> Hi all,
>>
>> What's the purpose of .keyBy() on ConnectedStream? How does it affect
>> .map() and .flatMap()?
>>
>> I'm not finding a way to group stream elements based on a key, something
>> like a Window on a normal Stream, but for a ConnectedStream.
>>
>> Regards,
>> Matt
>>
>
>
>


.keyBy() on ConnectedStream

2017-01-26 Thread Matt
Hi all,

What's the purpose of .keyBy() on ConnectedStream? How does it affect
.map() and .flatMap()?

I'm not finding a way to group stream elements based on a key, something
like a Window on a normal Stream, but for a ConnectedStream.

Regards,
Matt


Re: Objects accessible from all Flink nodes

2017-01-13 Thread Matt
Errata: How can an *object (such as the classifier, line 1)* be accessed by
any Flink node [...]

Just in case: I believe the classifier itself can't be serialized, since it's
part of a framework which I can't modify. In any case, even if it could be
serialized, I guess the cost of moving it from one node to another would
make the whole data flow impractical. It's better to move all created
instances to one single node where only one instance of the classifier
is maintained.

I'm not sure if this is possible or how to do this.

On Thu, Jan 12, 2017 at 11:11 PM, Matt <dromitl...@gmail.com> wrote:

> Hello,
>
> I have a stream of objects which I use to update the model of a
> classification algorithm and another stream with the objects I need to
> classify in real time.
>
> The problem is that the instances for training and evaluation are
> processed on potentially different Flink nodes, but the classifier should
> be applied to all instances no matter in what node it was generated (ie,
> the classifier should be accessible from any Flink node).
>
> Just to make it clearer, here is what would NOT work since these sink
> functions are not serializable:
> https://gist.github.com/b979bf742b0d2f3da8cc8e5e91207151
>
> Two questions here:
>
> *1. How can an instance be accessed by any Flink node like this (line 11
> and 19)? Maybe there's a better approach to this problem.*
>
> *2. In the example the second stream (line 15) is started right away but
> at startup the classifier is not ready to use until it has been trained
> with enough instances. Is it possible to do this? If I'm not wrong
> env.execute (line 24) can be used only once.*
>
> Regards,
> Matt
>


Objects accessible from all Flink nodes

2017-01-12 Thread Matt
Hello,

I have a stream of objects which I use to update the model of a
classification algorithm and another stream with the objects I need to
classify in real time.

The problem is that the instances for training and evaluation are processed
on potentially different Flink nodes, but the classifier should be applied
to all instances no matter which node they were generated on (i.e., the
classifier should be accessible from any Flink node).

Just to make it clearer, here is what would NOT work since these sink
functions are not serializable:
https://gist.github.com/b979bf742b0d2f3da8cc8e5e91207151

Two questions here:

*1. How can an instance be accessed by any Flink node like this (line 11
and 19)? Maybe there's a better approach to this problem.*

*2. In the example the second stream (line 15) is started right away but at
startup the classifier is not ready to use until it has been trained with
enough instances. Is it possible to do this? If I'm not wrong env.execute
(line 24) can be used only once.*

Regards,
Matt


Re: Caching collected objects in .apply()

2017-01-05 Thread Matt
I'm still looking for an answer to this question. Hope you can give me some
insight!

On Thu, Dec 22, 2016 at 6:17 PM, Matt <dromitl...@gmail.com> wrote:

> Just to be clear, the stream is of String elements. The first part of the
> pipeline (up to the first .apply) receives those strings, and returns
> objects of another class ("A" let's say).
>
> On Thu, Dec 22, 2016 at 6:04 PM, Matt <dromitl...@gmail.com> wrote:
>
>> Hello,
>>
>> I have a window processing 10 objects at a time, and creating 1 as a
>> result. The problem is in order to create that object I need the object
>> from the previous window.
>>
>> I'm doing this:
>>
>> stream
>>   .keyBy(...some key...)
>>   .countWindow(10, 1)
>>   .apply(...creates an element A...)
>>   .keyBy(...same key as above...)
>>   .countWindow(2, 1)
>>   .apply(...updates A with the value of the previous element A...)
>>   .addSink(...)
>>
>> Probably there is a way to retrieve the last collected object inside the
>> first .apply(), or to cache it somehow.
>>
>> Is there a better way to achieve the same? How inefficient is this?
>>
>> Regards,
>> Matt
>>
>
>


Data Stream Mining

2016-12-29 Thread Matt
Does anyone have any experience mining a Flink+Kafka stream?

I'm looking for an online analysis framework to apply some classifiers to a
time series.

Any example of how to integrate Flink with MOA, Samoa, ADAMS, DataSketches
or any other framework is appreciated.

Regards,
Matt


Re: Caching collected objects in .apply()

2016-12-22 Thread Matt
Just to be clear, the stream is of String elements. The first part of the
pipeline (up to the first .apply) receives those strings, and returns
objects of another class ("A" let's say).

On Thu, Dec 22, 2016 at 6:04 PM, Matt <dromitl...@gmail.com> wrote:

> Hello,
>
> I have a window processing 10 objects at a time, and creating 1 as a
> result. The problem is in order to create that object I need the object
> from the previous window.
>
> I'm doing this:
>
> stream
>   .keyBy(...some key...)
>   .countWindow(10, 1)
>   .apply(...creates an element A...)
>   .keyBy(...same key as above...)
>   .countWindow(2, 1)
>   .apply(...updates A with the value of the previous element A...)
>   .addSink(...)
>
> Probably there is a way to retrieve the last collected object inside the
> first .apply(), or to cache it somehow.
>
> Is there a better way to achieve the same? How inefficient is this?
>
> Regards,
> Matt
>


Caching collected objects in .apply()

2016-12-22 Thread Matt
Hello,

I have a window processing 10 objects at a time, and creating 1 as a
result. The problem is in order to create that object I need the object
from the previous window.

I'm doing this:

stream
  .keyBy(...some key...)
  .countWindow(10, 1)
  .apply(...creates an element A...)
  .keyBy(...same key as above...)
  .countWindow(2, 1)
  .apply(...updates A with the value of the previous element A...)
  .addSink(...)

Probably there is a way to retrieve the last collected object inside the
first .apply(), or to cache it somehow.

Is there a better way to achieve the same? How inefficient is this?
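
One possible alternative to the second window, sketched here in plain Java
rather than against the Flink API: remember the last result per key and hand
it to the function that builds the next one. The class and method names are
hypothetical, and in a real job the map below would presumably be replaced by
keyed state owned by the operator instead of a HashMap.

```java
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.function.BiFunction;

/** Remembers the last result produced for each key (hypothetical helper, not a Flink class). */
final class PreviousResultCache<K, IN, OUT> {
    private final Map<K, OUT> lastByKey = new HashMap<>();
    private final BiFunction<List<IN>, OUT, OUT> builder;

    /** The builder receives the window contents and the previous result (null for the first window). */
    PreviousResultCache(BiFunction<List<IN>, OUT, OUT> builder) {
        this.builder = builder;
    }

    /** Builds the result for one window and stores it for the next call with the same key. */
    OUT apply(K key, List<IN> window) {
        OUT previous = lastByKey.get(key);
        OUT current = builder.apply(window, previous);
        lastByKey.put(key, current);
        return current;
    }
}
```

The trade-off is that the previous element is held explicitly instead of
being re-delivered by a countWindow(2, 1), so each A is only handled once.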

Regards,
Matt


Re: Serializing NULLs

2016-12-22 Thread Matt
Here is the code of a Double wrapper with null support [1].

[1] https://gist.github.com/a8e8aa377957d3d51eadf36fe5c92a9e

On Tue, Dec 20, 2016 at 4:26 PM, Anirudh Mallem <anirudh.mal...@247-inc.com>
wrote:

> If you are using Avro generated classes then you cannot have your values
> null.
> https://cwiki.apache.org/confluence/display/AVRO/FAQ#FAQ-Whyisn'teveryvalueinAvronullable?
>
> From: Stephan Ewen
> Reply-To: "user@flink.apache.org"
> Date: Tuesday, December 20, 2016 at 8:17 AM
> To: "user@flink.apache.org"
> Subject: Re: Serializing NULLs
>
> Thanks for sharing the stack trace.
>
> This seems not really Flink related, it is part of the specific Avro
> encoding logic.
> The Avro Generic Record Type apparently does not allow the map value to be
> null.
>
>
>
> On Tue, Dec 20, 2016 at 4:55 PM, Matt <dromitl...@gmail.com> wrote:
>
>> Here is the back trace:
>> https://gist.github.com/56af4818bcf5dee6b97c248fd9233c67
>>
>> In the meanwhile I've solved the issue by creating a POJO class where
>> null is just Long.MIN_VALUE, that with a custom equals() made the trick. I
>> guess it's not as fast as de/serializing Double though.
>>
>> If you need any other information let me know.
>>
>> Regards,
>> Matt
>>
>> On Tue, Dec 20, 2016 at 6:46 AM, Stephan Ewen <se...@apache.org> wrote:
>>
>>> The "null" support in some types is not fully developed. However in that
>>> case I am wondering why it does not work. Can you share the stack trace, so
>>> we can take a look at the serializer?
>>>
>>>
>>>
>>> On Mon, Dec 19, 2016 at 9:56 PM, Matt <dromitl...@gmail.com> wrote:
>>>
>>>> Hello list,
>>>>
>>>> I'm getting this error:
>>>>
>>>>
>>>> *java.lang.RuntimeException: Could not forward element to next operator
>>>> *
>>>> *...*
>>>> *Caused by: java.lang.NullPointerException: in com.entities.Sector in
>>>> map in double null of double of map in field properties of
>>>> com.entities.Sector*
>>>> *...*
>>>> *Caused by: java.lang.NullPointerException*
>>>>
>>>> The field mentioned is a HashMap<String, Double>, and some keys are
>>>> mapped to null values.
>>>>
>>>> Why isn't it possible to forward/serialize those elements with null
>>>> values?
>>>> What do you do when your elements may contain nulls?
>>>>
>>>> Regards,
>>>> Matt
>>>>
>>>
>>>
>>
>


Re: Serializing NULLs

2016-12-20 Thread Matt
Here is the back trace:
https://gist.github.com/56af4818bcf5dee6b97c248fd9233c67

In the meantime I've solved the issue by creating a POJO class where null
is just Long.MIN_VALUE; that, together with a custom equals(), did the trick.
I guess it's not as fast as de/serializing Double though.
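
A minimal sketch of a related idea, assuming a wrapper class is acceptable:
instead of a sentinel value it keeps an explicit "is null" flag next to a
primitive double, so no field ever holds null. NullableDouble and its
methods are hypothetical names for illustration, not the class from the
gist, and the sketch does not check what a particular serializer requires
of the class (public fields, a no-argument constructor and so on).

```java
/** Hypothetical wrapper: a primitive double plus a flag marking the value as absent. */
final class NullableDouble {
    private final boolean isNull;
    private final double value;

    private NullableDouble(boolean isNull, double value) {
        this.isNull = isNull;
        this.value = value;
    }

    /** Wraps a boxed Double; a null argument becomes the "absent" wrapper. */
    static NullableDouble of(Double boxed) {
        return boxed == null ? new NullableDouble(true, 0.0) : new NullableDouble(false, boxed);
    }

    /** Converts back to a boxed Double, restoring null for absent values. */
    Double toBoxed() {
        return isNull ? null : value;
    }

    @Override
    public boolean equals(Object o) {
        if (!(o instanceof NullableDouble)) return false;
        NullableDouble other = (NullableDouble) o;
        // Two absent values are equal; otherwise compare the numbers.
        return isNull == other.isNull && (isNull || Double.compare(value, other.value) == 0);
    }

    @Override
    public int hashCode() {
        return isNull ? 0 : Double.hashCode(value);
    }
}
```

Compared with a sentinel, the flag costs one extra boolean per value but
cannot collide with a legitimate number.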

If you need any other information let me know.

Regards,
Matt

On Tue, Dec 20, 2016 at 6:46 AM, Stephan Ewen <se...@apache.org> wrote:

> The "null" support in some types is not fully developed. However in that
> case I am wondering why it does not work. Can you share the stack trace, so
> we can take a look at the serializer?
>
>
>
> On Mon, Dec 19, 2016 at 9:56 PM, Matt <dromitl...@gmail.com> wrote:
>
>> Hello list,
>>
>> I'm getting this error:
>>
>>
>> *java.lang.RuntimeException: Could not forward element to next operator*
>> *...*
>> *Caused by: java.lang.NullPointerException: in com.entities.Sector in map
>> in double null of double of map in field properties of com.entities.Sector*
>> *...*
>> *Caused by: java.lang.NullPointerException*
>>
>> The field mentioned is a HashMap<String, Double>, and some keys are
>> mapped to null values.
>>
>> Why isn't it possible to forward/serialize those elements with null
>> values?
>> What do you do when your elements may contain nulls?
>>
>> Regards,
>> Matt
>>
>
>


Serializing NULLs

2016-12-19 Thread Matt
Hello list,

I'm getting this error:


*java.lang.RuntimeException: Could not forward element to next operator*
*...*
*Caused by: java.lang.NullPointerException: in com.entities.Sector in map
in double null of double of map in field properties of com.entities.Sector*
*...*
*Caused by: java.lang.NullPointerException*

The field mentioned is a HashMap<String, Double>, and some keys are mapped
to null values.

Why isn't it possible to forward/serialize those elements with null values?
What do you do when your elements may contain nulls?

Regards,
Matt


Re: Updating a Tumbling Window every second?

2016-12-19 Thread Matt
Fabian,

Thanks for your answer. Since elements in B are expensive to create, I
wanted to reuse them. I understand I can plug two consumers into stream A,
but in that case -if I'm not wrong- I would have to create repeated
elements of B: one to save them into stream B and one to create C objects
for stream C.

Anyway, I've already solved this problem a few days back.

Regards,
Matt

On Mon, Dec 19, 2016 at 5:57 AM, Fabian Hueske <fhue...@gmail.com> wrote:

> Hi Matt,
>
> the combination of a tumbling time window and a count window is one way to
> define a sliding window.
> In your example, a 30 secs tumbling window and a (3,1) count window
> result in a sliding time window of 90 secs width and 30 secs slide.
>
> You could define a time sliding window of 90 secs width and 1 secs slide
> on stream A to get a stream C with faster updates.
> If you still need stream B with the 30 secs tumbling window, you can have
> both windows defined on stream A.
>
> Hope this helps,
> Fabian
>
> 2016-12-16 12:58 GMT+01:00 Matt <dromitl...@gmail.com>:
>
>> I have reduced the problem to a simple image [1].
>>
>> Those shown on the image are the streams I have, and the problem now is
>> how to create a custom window assigner such that objects in B that *don't
>> share* elements in A, are put together in the same window.
>>
>> Why? Because in order to create elements in C (triangles), I have to
>> process n *independent* elements of B (n=2 in the example).
>>
>> Maybe there's a better or simpler way to do this. Any idea is appreciated!
>>
>> Regards,
>> Matt
>>
>> [1] http://i.imgur.com/dG5AkJy.png
>>
>> On Thu, Dec 15, 2016 at 3:22 AM, Matt <dromitl...@gmail.com> wrote:
>>
>>> Hello,
>>>
>>> I have a rather simple problem with a difficult explanation...
>>>
>>> I have 3 streams, one of objects of class A (stream A), one of class B
>>> (stream B) and one of class C (stream C). The elements of A are generated
>>> at a rate of about 3 times every second. Elements of type B encapsulates
>>> some key features of the stream A (like the number of elements of A in the
>>> window) during the last 30 seconds (tumbling window 30s). Finally, the
>>> elements of type C contains statistics (for simplicity let's say the
>>> average of elements processed by each element in B) of the last 3 elements
>>> in B and are produced on every new element of B (count window 3, 1).
>>>
>>> Illustrative example, () and [] denotes windows:
>>>
>>> ... [a10 a9 a8] [a7 a6] [a5 a4] [a3 a2 a1]
>>> ... (b4 [b3 b2) b1]
>>> ... [c2] [c1]
>>>
>>> This works fine, except for a dashboard that depends on the elements of
>>> C to be updated, and 30s is way too big of a delay. I thought I could
>>> change the tumbling window for a sliding window of size 30s and a slide of
>>> 1s, but this doesn't work.
>>>
>>> If I use a sliding window to create elements of B as mentioned, each
>>> count window would contain 3 elements of B, and I would get one element of
>>> C every second as intended, but those elements in B encapsulates almost the
>>> same elements of A. This results in stats that are wrong.
>>>
>>> For instance, c1 may have the average of b1, b2 and b3. But b1, b2 and
>>> b3 share most of the elements from stream A.
>>>
>>> Question: is there any way to create a count window with the last 3
>>> elements of B that would have gone into the same tumbling window, not with
>>> the last 3 consecutive elements?
>>>
>>> I hope the problem is clear, don't hesitate to ask for further
>>> clarification!
>>>
>>> Regards,
>>> Matt
>>>
>>>
>>
>


Re: Updating a Tumbling Window every second?

2016-12-16 Thread Matt
I have reduced the problem to a simple image [1].

The image shows the streams I have, and the problem now is how to create a
custom window assigner such that objects in B that *don't share* elements
of A are put together in the same window.

Why? Because in order to create elements in C (triangles), I have to
process n *independent* elements of B (n=2 in the example).

Maybe there's a better or simpler way to do this. Any idea is appreciated!

Regards,
Matt

[1] http://i.imgur.com/dG5AkJy.png

On Thu, Dec 15, 2016 at 3:22 AM, Matt <dromitl...@gmail.com> wrote:

> Hello,
>
> I have a rather simple problem with a difficult explanation...
>
> I have 3 streams, one of objects of class A (stream A), one of class B
> (stream B) and one of class C (stream C). The elements of A are generated
> at a rate of about 3 times every second. Elements of type B encapsulates
> some key features of the stream A (like the number of elements of A in the
> window) during the last 30 seconds (tumbling window 30s). Finally, the
> elements of type C contains statistics (for simplicity let's say the
> average of elements processed by each element in B) of the last 3 elements
> in B and are produced on every new element of B (count window 3, 1).
>
> Illustrative example, () and [] denotes windows:
>
> ... [a10 a9 a8] [a7 a6] [a5 a4] [a3 a2 a1]
> ... (b4 [b3 b2) b1]
> ... [c2] [c1]
>
> This works fine, except for a dashboard that depends on the elements of C
> to be updated, and 30s is way too big of a delay. I thought I could change
> the tumbling window for a sliding window of size 30s and a slide of 1s, but
> this doesn't work.
>
> If I use a sliding window to create elements of B as mentioned, each count
> window would contain 3 elements of B, and I would get one element of C
> every second as intended, but those elements in B encapsulates almost the
> same elements of A. This results in stats that are wrong.
>
> For instance, c1 may have the average of b1, b2 and b3. But b1, b2 and b3
> share most of the elements from stream A.
>
> Question: is there any way to create a count window with the last 3
> elements of B that would have gone into the same tumbling window, not with
> the last 3 consecutive elements?
>
> I hope the problem is clear, don't hesitate to ask for further
> clarification!
>
> Regards,
> Matt
>
>


Updating a Tumbling Window every second?

2016-12-14 Thread Matt
Hello,

I have a rather simple problem with a difficult explanation...

I have 3 streams, one of objects of class A (stream A), one of class B
(stream B) and one of class C (stream C). The elements of A are generated
at a rate of about 3 per second. Elements of type B encapsulate
some key features of stream A (like the number of elements of A in the
window) during the last 30 seconds (tumbling window 30s). Finally, the
elements of type C contain statistics (for simplicity let's say the
average of elements processed by each element in B) of the last 3 elements
in B and are produced on every new element of B (count window 3, 1).

Illustrative example, () and [] denote windows:

... [a10 a9 a8] [a7 a6] [a5 a4] [a3 a2 a1]
... (b4 [b3 b2) b1]
... [c2] [c1]

This works fine, except for a dashboard that depends on the elements of C
to be updated, and 30s is way too big of a delay. I thought I could change
the tumbling window for a sliding window of size 30s and a slide of 1s, but
this doesn't work.

If I use a sliding window to create elements of B as mentioned, each count
window would contain 3 elements of B, and I would get one element of C
every second as intended, but those elements of B encapsulate almost the
same elements of A. This results in stats that are wrong.

For instance, c1 may have the average of b1, b2 and b3. But b1, b2 and b3
share most of the elements from stream A.

Question: is there any way to create a count window with the last 3
elements of B that would have gone into the same tumbling window, not with
the last 3 consecutive elements?

I hope the problem is clear, don't hesitate to ask for further
clarification!
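
To make the question concrete, here is a plain-Java sketch of the grouping I
have in mind, under the assumption that with a 30s window sliding by 1s the
elements of B arrive one per slide, so elements 30 positions apart cover
disjoint parts of A. StridedGroups is a hypothetical helper, not a Flink
window assigner; with stride 30 and group size 3 it would pair each new b
with the ones that arrived 30 and 60 positions earlier.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Deque;
import java.util.List;

/** Hypothetical helper: groups each new element with earlier ones that are `stride` positions apart. */
final class StridedGroups<T> {
    private final int stride;
    private final int needed; // buffer length required to form one full group
    private final Deque<T> buffer = new ArrayDeque<>();

    StridedGroups(int stride, int groupSize) {
        this.stride = stride;
        this.needed = stride * (groupSize - 1) + 1;
    }

    /** Adds an element; returns the group (oldest first), or an empty list while warming up. */
    List<T> add(T element) {
        buffer.addLast(element);
        if (buffer.size() > needed) {
            buffer.removeFirst();
        }
        if (buffer.size() < needed) {
            return List.of();
        }
        List<T> group = new ArrayList<>();
        int index = 0;
        for (T buffered : buffer) {
            // Positions 0, stride, 2 * stride, ... never overlap in the underlying stream.
            if (index % stride == 0) {
                group.add(buffered);
            }
            index++;
        }
        return group;
    }
}
```

This walks the whole buffer on every element, which is fine for a sketch but
could be replaced by index arithmetic on a ring buffer.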

Regards,
Matt


Multiple consumers and custom triggers

2016-12-14 Thread Matt
Hello people,

I've written down some quick questions for which I couldn't find much or
anything in the documentation. I hope you can answer some of them!

*# Multiple consumers*

*1.* Is it possible to .union() streams of different classes? It is useful
to create a consumer that counts elements on different topics for example,
using a key such as the class name of the element, and a tumbling window of
5 mins let's say.

*2.* In case #1 is not possible, I need to launch multiple consumers to
achieve the same effect. However, I'm getting a "Factory already
initialized" error if I run environment.execute() for two consumers on
different threads. How do you .execute() more than one consumer on the same
application?

*# Custom triggers*

*3.* If a custom .trigger() overwrites the trigger of the WindowAssigner
used previously, why do we have to specify a WindowAssigner (such as
TumblingProcessingTimeWindows) in order to be able to specify a custom
trigger? Shouldn't it be possible to send a trigger to .window()?

*4.* I need a stream with a CountWindow (size 10, slide 1 let's say) that
may take more than 10 hours to fill for the first time, but in the meantime
I want to process whatever elements have already been generated. I guess the
way to do this is to create a custom trigger that fires on every new element,
with up to 10 elements at a time. The result would be windows of sizes: 1
element, then 2, 3, ..., 9, 10, 10, 10, ... Is there a way to achieve this
with predefined triggers, or is a custom trigger the only way to go here?
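
For question 4, the behaviour I'm after can be sketched in plain Java as
"fire on every element, keep at most the last N". GrowingCountWindow is a
hypothetical name used only for illustration. As far as I can tell this is
the same combination as a count trigger of 1 together with a count evictor
of N on a global window, but I haven't verified that against the Flink
sources, so treat that mapping as an assumption.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Deque;
import java.util.List;

/** Hypothetical helper: emits a window on every element, holding at most maxSize elements. */
final class GrowingCountWindow<T> {
    private final int maxSize;
    private final Deque<T> buffer = new ArrayDeque<>();

    GrowingCountWindow(int maxSize) {
        this.maxSize = maxSize;
    }

    /** Adds an element, evicts the oldest one beyond maxSize, and returns a snapshot of the window. */
    List<T> add(T element) {
        buffer.addLast(element);
        if (buffer.size() > maxSize) {
            buffer.removeFirst();
        }
        return new ArrayList<>(buffer);
    }
}
```

With maxSize 10 the snapshots grow from 1 element up to 10 and then stay at
10, dropping the oldest element each time.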

Best regards,
Matt


Re: Incremental aggregations - Example not working

2016-12-12 Thread Matt
Err, I meant if I'm not wrong *

On Mon, Dec 12, 2016 at 2:02 PM, Matt <dromitl...@gmail.com> wrote:

> I just checked with version 1.1.3 and it works fine, the problem is that
> in that version we can't use Kafka 0.10 if I'm not work. Thank you for the
> workaround!
>
> Best,
> Matt
>
> On Mon, Dec 12, 2016 at 1:52 PM, Yassine MARZOUGUI <
> y.marzou...@mindlytix.com> wrote:
>
>> Yes, it was suppoed to work. I looked into this, and as Chesnay said,
>> this is a bug in the fold function. I opened an issue in JIRA :
>> https://issues.apache.org/jira/browse/FLINK-5320, and will fix it very
>> soon, thank you for reporting it.
>> In the mean time you can workaround the problem by specifying the
>> TypeInformation along with the fold function as follows : fold(ACC,
>> FoldFunction, WindowFunction, foldAccumulatorType, resultType). In the
>> example, the foldAccumulatorType is new TupleTypeInfo<Tuple3<String,
>> Long, Integer>>(), and the resultType is also new
>> TupleTypeInfo<Tuple3<String, Long, Integer>>().
>>
>> Best,
>> Yassine
>>
>> 2016-12-12 16:38 GMT+01:00 Matt <dromitl...@gmail.com>:
>>
>>> I'm using 1.2-SNAPSHOT, should it work in that version?
>>>
>>> On Mon, Dec 12, 2016 at 12:18 PM, Yassine MARZOUGUI <
>>> y.marzou...@mindlytix.com> wrote:
>>>
>>>> Hi Matt,
>>>>
>>>> What version of Flink are you using?
>>>> The incremental agregation with fold(ACC, FoldFunction, WindowFunction)
>>>> in a new change that will be part of Flink 1.2, for Flink 1.1 the correct
>>>> way to perform incrementation aggregations is : apply(ACC,
>>>> FoldFunction, WindowFunction) (see the docs for 1.1 [1])
>>>>
>>>> [1] : https://ci.apache.org/projects/flink/flink-docs-release-1.
>>>> 1/apis/streaming/windows.html#windowfunction-with-incrementa
>>>> l-aggregation
>>>>
>>>> Best,
>>>> Yassine
>>>>
>>>> 2016-12-12 15:37 GMT+01:00 Chesnay Schepler <ches...@apache.org>:
>>>>
>>>>> Hello Matt,
>>>>>
>>>>> This looks like a bug in the fold() function to me.
>>>>>
>>>>> I'm adding Timo to the discussion, he can probably shed some light on
>>>>> this.
>>>>>
>>>>> Regards,
>>>>> Chesnay
>>>>>
>>>>>
>>>>> On 12.12.2016 15:13, Matt wrote:
>>>>>
>>>>> In case this is important, if I remove the WindowFunction, and only
>>>>> use the FoldFunction it works fine.
>>>>>
>>>>> I don't see what is wrong...
>>>>>
>>>>> On Mon, Dec 12, 2016 at 10:53 AM, Matt <dromitl...@gmail.com> wrote:
>>>>>
>>>>>> Hi,
>>>>>>
>>>>>> I'm following the documentation [1] of window functions with
>>>>>> incremental aggregations, but I'm getting an "input mismatch" error.
>>>>>>
>>>>>> The code [2] is almost identical to the one in the documentation, at
>>>>>> the bottom you can find the exact error.
>>>>>>
>>>>>> What am I missing? Can you provide a working example of a fold
>>>>>> function with both a FoldFunction and a WindowFunction?
>>>>>>
>>>>>> Regards,
>>>>>> Matt
>>>>>>
>>>>>> [1] https://ci.apache.org/projects/flink/flink-docs-master/dev/w
>>>>>> indows.html#windowfunction-with-incremental-aggregation
>>>>>>
>>>>>> [2] https://gist.github.com/cc7ed5570e4ce30c3a482ab835e3983d
>>>>>>
>>>>>
>>>>>
>>>>>
>>>>
>>>
>>
>


Re: Incremental aggregations - Example not working

2016-12-12 Thread Matt
I just checked with version 1.1.3 and it works fine, the problem is that in
that version we can't use Kafka 0.10 if I'm not work. Thank you for the
workaround!

Best,
Matt

On Mon, Dec 12, 2016 at 1:52 PM, Yassine MARZOUGUI <
y.marzou...@mindlytix.com> wrote:

> Yes, it was supposed to work. I looked into this, and as Chesnay said,
> this is a bug in the fold function. I opened an issue in JIRA:
> https://issues.apache.org/jira/browse/FLINK-5320, and will fix it very
> soon, thank you for reporting it.
> In the meantime you can work around the problem by specifying the
> TypeInformation along with the fold function as follows: fold(ACC,
> FoldFunction, WindowFunction, foldAccumulatorType, resultType). In the
> example, the foldAccumulatorType is new TupleTypeInfo<Tuple3<String,
> Long, Integer>>(), and the resultType is also new
> TupleTypeInfo<Tuple3<String, Long, Integer>>().
>
> Best,
> Yassine
>
> 2016-12-12 16:38 GMT+01:00 Matt <dromitl...@gmail.com>:
>
>> I'm using 1.2-SNAPSHOT, should it work in that version?
>>
>> On Mon, Dec 12, 2016 at 12:18 PM, Yassine MARZOUGUI <
>> y.marzou...@mindlytix.com> wrote:
>>
>>> Hi Matt,
>>>
>>> What version of Flink are you using?
>>> The incremental agregation with fold(ACC, FoldFunction, WindowFunction)
>>> in a new change that will be part of Flink 1.2, for Flink 1.1 the correct
>>> way to perform incrementation aggregations is : apply(ACC,
>>> FoldFunction, WindowFunction) (see the docs for 1.1 [1])
>>>
>>> [1] : https://ci.apache.org/projects/flink/flink-docs-release-1.
>>> 1/apis/streaming/windows.html#windowfunction-with-incrementa
>>> l-aggregation
>>>
>>> Best,
>>> Yassine
>>>
>>> 2016-12-12 15:37 GMT+01:00 Chesnay Schepler <ches...@apache.org>:
>>>
>>>> Hello Matt,
>>>>
>>>> This looks like a bug in the fold() function to me.
>>>>
>>>> I'm adding Timo to the discussion, he can probably shed some light on
>>>> this.
>>>>
>>>> Regards,
>>>> Chesnay
>>>>
>>>>
>>>> On 12.12.2016 15:13, Matt wrote:
>>>>
>>>> In case this is important, if I remove the WindowFunction, and only use
>>>> the FoldFunction it works fine.
>>>>
>>>> I don't see what is wrong...
>>>>
>>>> On Mon, Dec 12, 2016 at 10:53 AM, Matt <dromitl...@gmail.com> wrote:
>>>>
>>>>> Hi,
>>>>>
>>>>> I'm following the documentation [1] of window functions with
>>>>> incremental aggregations, but I'm getting an "input mismatch" error.
>>>>>
>>>>> The code [2] is almost identical to the one in the documentation, at
>>>>> the bottom you can find the exact error.
>>>>>
>>>>> What am I missing? Can you provide a working example of a fold
>>>>> function with both a FoldFunction and a WindowFunction?
>>>>>
>>>>> Regards,
>>>>> Matt
>>>>>
>>>>> [1] https://ci.apache.org/projects/flink/flink-docs-master/dev/w
>>>>> indows.html#windowfunction-with-incremental-aggregation
>>>>>
>>>>> [2] https://gist.github.com/cc7ed5570e4ce30c3a482ab835e3983d
>>>>>
>>>>
>>>>
>>>>
>>>
>>
>


Re: Incremental aggregations - Example not working

2016-12-12 Thread Matt
I'm using 1.2-SNAPSHOT, should it work in that version?

On Mon, Dec 12, 2016 at 12:18 PM, Yassine MARZOUGUI <
y.marzou...@mindlytix.com> wrote:

> Hi Matt,
>
> What version of Flink are you using?
> The incremental aggregation with fold(ACC, FoldFunction, WindowFunction)
> is a new change that will be part of Flink 1.2. For Flink 1.1 the correct
> way to perform incremental aggregations is: apply(ACC, FoldFunction,
> WindowFunction) (see the docs for 1.1 [1])
>
> [1]:
> https://ci.apache.org/projects/flink/flink-docs-release-1.1/apis/streaming/windows.html#windowfunction-with-incremental-aggregation
>
> Best,
> Yassine
>
> 2016-12-12 15:37 GMT+01:00 Chesnay Schepler <ches...@apache.org>:
>
>> Hello Matt,
>>
>> This looks like a bug in the fold() function to me.
>>
>> I'm adding Timo to the discussion, he can probably shed some light on
>> this.
>>
>> Regards,
>> Chesnay
>>
>>
>> On 12.12.2016 15:13, Matt wrote:
>>
>> In case this is important, if I remove the WindowFunction, and only use
>> the FoldFunction it works fine.
>>
>> I don't see what is wrong...
>>
>> On Mon, Dec 12, 2016 at 10:53 AM, Matt <dromitl...@gmail.com> wrote:
>>
>>> Hi,
>>>
>>> I'm following the documentation [1] of window functions with incremental
>>> aggregations, but I'm getting an "input mismatch" error.
>>>
>>> The code [2] is almost identical to the one in the documentation, at the
>>> bottom you can find the exact error.
>>>
>>> What am I missing? Can you provide a working example of a fold function
>>> with both a FoldFunction and a WindowFunction?
>>>
>>> Regards,
>>> Matt
>>>
>>> [1] https://ci.apache.org/projects/flink/flink-docs-master/dev/w
>>> indows.html#windowfunction-with-incremental-aggregation
>>>
>>> [2] https://gist.github.com/cc7ed5570e4ce30c3a482ab835e3983d
>>>
>>
>>
>>
>


Re: Incremental aggregations - Example not working

2016-12-12 Thread Matt
In case this is important, if I remove the WindowFunction, and only use the
FoldFunction it works fine.

I don't see what is wrong...

On Mon, Dec 12, 2016 at 10:53 AM, Matt <dromitl...@gmail.com> wrote:

> Hi,
>
> I'm following the documentation [1] of window functions with incremental
> aggregations, but I'm getting an "input mismatch" error.
>
> The code [2] is almost identical to the one in the documentation, at the
> bottom you can find the exact error.
>
> What am I missing? Can you provide a working example of a fold function
> with both a FoldFunction and a WindowFunction?
>
> Regards,
> Matt
>
> [1]
> https://ci.apache.org/projects/flink/flink-docs-master/dev/windows.html#windowfunction-with-incremental-aggregation
>
> [2] https://gist.github.com/cc7ed5570e4ce30c3a482ab835e3983d
>


Incremental aggregations - Example not working

2016-12-12 Thread Matt
Hi,

I'm following the documentation [1] of window functions with incremental
aggregations, but I'm getting an "input mismatch" error.

The code [2] is almost identical to the one in the documentation, at the
bottom you can find the exact error.

What am I missing? Can you provide a working example of a fold function
with both a FoldFunction and a WindowFunction?

Regards,
Matt

[1]
https://ci.apache.org/projects/flink/flink-docs-master/dev/windows.html#windowfunction-with-incremental-aggregation

[2] https://gist.github.com/cc7ed5570e4ce30c3a482ab835e3983d


Re: Serializers and Schemas

2016-12-08 Thread Matt
Hi people,

This is what I was talking about regarding a generic de/serializer for POJO
classes [1].

The Serde class in [2] can be used in both Kafka [3] and Flink [4], and it
works out of the box for any POJO class.

Do you see anything wrong with this approach? Is there any way to improve it?
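
For readers who don't want to open the repository, here is a minimal sketch
of one generic approach, assuming the POJOs implement java.io.Serializable.
It uses plain Java serialization and is not necessarily how the linked Serde
class works; JavaSerde is a hypothetical name, and the Kafka and Flink
interfaces a real class would implement are left out.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.io.Serializable;
import java.io.UncheckedIOException;

/** Hypothetical generic de/serializer based on built-in Java serialization. */
final class JavaSerde<T extends Serializable> {
    private final Class<T> type;

    JavaSerde(Class<T> type) {
        this.type = type;
    }

    /** Turns any Serializable value into a byte array. */
    byte[] serialize(T value) {
        try (ByteArrayOutputStream bytes = new ByteArrayOutputStream();
             ObjectOutputStream out = new ObjectOutputStream(bytes)) {
            out.writeObject(value);
            out.flush(); // make sure buffered data reaches the byte array
            return bytes.toByteArray();
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    /** Rebuilds the value and checks that it has the expected class. */
    T deserialize(byte[] data) {
        try (ObjectInputStream in = new ObjectInputStream(new ByteArrayInputStream(data))) {
            return type.cast(in.readObject());
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        } catch (ClassNotFoundException e) {
            throw new IllegalStateException(e);
        }
    }
}
```

Java serialization is convenient but usually neither the fastest nor the most
compact format, and it ties the bytes to the class version.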

Cheers,
Matt

[1] https://github.com/Dromit/StreamTest/
[2]
https://github.com/Dromit/StreamTest/blob/master/src/main/java/com/stream/Serde.java
[3]
https://github.com/Dromit/StreamTest/blob/master/src/main/java/com/stream/MainProducer.java#L19
[4]
https://github.com/Dromit/StreamTest/blob/master/src/main/java/com/stream/MainConsumer.java#L19



On Thu, Dec 8, 2016 at 4:15 AM, Tzu-Li (Gordon) Tai <tzuli...@apache.org>
wrote:

> Hi Matt,
>
> 1. There’s some in-progress work on wrapper util classes for Kafka
> de/serializers here [1] that allows
> Kafka de/serializers to be used with the Flink Kafka Consumers/Producers
> with minimal user overhead.
> The PR also has some proposed additions to the documentation for the wrappers.
>
> 2. I feel that it would be good to have more documentation on Flink’s
> de/serializers because they’ve been
> frequently asked about on the mailing lists, but at the same time,
> probably the fastest / efficient de/serialization
> approach would be tailored for each use case, so we’d need to think more
> on the presentation and the purpose
> of the documentation.
>
> Cheers,
> Gordon
>
> [1] https://github.com/apache/flink/pull/2705
>
> On December 8, 2016 at 5:00:19 AM, milind parikh (milindspar...@gmail.com)
> wrote:
>
> Why not use a self-describing format (JSON), stream it as a String, read it
> through a JSON reader, and avoid top-level reflection?
>
> Github.com/milindparikh/streamingsi
>
> https://github.com/milindparikh/streamingsi/tree/master/epic-poc/sprint-2-simulated-data-no-cdc-advanced-eventing/2-dataprocessing
>
> ?
>
> Apologies if I misunderstood the question. But I can quite see how to
> model your Product class (or indeed POJO) in a fairly generic way ( assumes
> JSON).
>
> The real issue arises when you have different versions of the same POJO
> class: you then need to store enough information to dynamically instantiate
> the actual version of the class, which I believe is beyond the simple use case.
>
> Milind
> On Dec 7, 2016 2:42 PM, "Matt" <dromitl...@gmail.com> wrote:
>
>> I've read your example, but I've found the same problem. You're
>> serializing your POJO as a string, where all fields are separated by "\t".
>> This may work for you, but not in general.
>>
>> https://github.com/luigiselmi/pilot-sc4-fcd-producer/blob/master/src/main/java/eu/bde/sc4pilot/fcd/FcdTaxiEvent.java#L60
>>
>> I would like to see a more "generic" approach for the class Product in my
>> last message. I believe a more general purpose de/serializer for POJOs
>> should be possible to achieve using reflection.
>>
>> On Wed, Dec 7, 2016 at 1:16 PM, Luigi Selmi <luigise...@gmail.com> wrote:
>>
>>> Hi Matt,
>>>
>>> I had the same problem, trying to read some records in event time using
>>> a POJO, doing some transformation, and saving the result into Kafka for
>>> further processing. I am not yet done, but maybe the code I wrote starting
>>> from the Flink Forward 2016 training docs
>>> <http://dataartisans.github.io/flink-training/exercises/popularPlaces.html>
>>> could be useful.
>>>
>>> https://github.com/luigiselmi/pilot-sc4-fcd-producer
>>>
>>>
>>> Best,
>>>
>>> Luigi
>>>
>>> On 7 December 2016 at 16:35, Matt <dromitl...@gmail.com> wrote:
>>>
>>>> Hello,
>>>>
>>>> I don't quite understand how to integrate Kafka and Flink. After a lot
>>>> of thought and hours of reading, I feel I'm still missing something
>>>> important.
>>>>
>>>> So far I haven't found a non-trivial but simple example of a stream of
>>>> a custom class (POJO). It would be good to have such an example in the
>>>> Flink docs. I can think of many scenarios in which using
>>>> SimpleStringSchema is not an option, but all Kafka+Flink guides insist
>>>> on using it.
>>>>
>>>> Maybe we can add a simple example to the documentation [1]; it would be
>>>> really helpful for many of us. Also, explaining how to create a Flink
>>>> De/SerializationSchema from a Kafka De/Serializer would be really useful
>>>> and would save a lot of people a lot of time. It's not clear why you
>>>> need both of them, or whether you need both at all.
>>>>

Re: Serializers and Schemas

2016-12-07 Thread Matt
I've read your example, but I've found the same problem. You're serializing
your POJO as a string, where all fields are separated by "\t". This may
work for you, but not in general.

https://github.com/luigiselmi/pilot-sc4-fcd-producer/blob/master/src/main/java/eu/bde/sc4pilot/fcd/FcdTaxiEvent.java#L60

I would like to see a more "generic" approach for the class Product in my
last message. I believe a more general purpose de/serializer for POJOs
should be possible to achieve using reflection.
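
A minimal sketch of what such a reflection-based de/serializer could look like,
using only the JDK. ReflectiveCodec is a hypothetical name, not a Flink or Kafka
class, and the sketch assumes public, non-static fields of type String, int,
long or double, non-null strings, and a no-argument constructor. Fields are
sorted by name because the reflection API gives no ordering guarantee.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.IOException;
import java.lang.reflect.Field;
import java.lang.reflect.Modifier;
import java.util.Arrays;
import java.util.Comparator;

// The POJO from the earlier message in this thread.
class Product {
    public String code;
    public double price;
    public String description;
    public long created;
}

// Hypothetical generic codec: walks the public fields of any simple POJO.
class ReflectiveCodec<T> {
    private final Class<T> type;
    private final Field[] fields;

    ReflectiveCodec(Class<T> type) {
        this.type = type;
        // Sort by name so writer and reader always agree on the field order.
        this.fields = Arrays.stream(type.getFields())
                .filter(f -> !Modifier.isStatic(f.getModifiers()))
                .sorted(Comparator.comparing(Field::getName))
                .toArray(Field[]::new);
    }

    byte[] toBytes(T value) {
        ByteArrayOutputStream buffer = new ByteArrayOutputStream();
        try (DataOutputStream out = new DataOutputStream(buffer)) {
            for (Field f : fields) {
                Class<?> t = f.getType();
                // writeUTF length-prefixes the string, so no separator is needed.
                if (t == String.class) out.writeUTF((String) f.get(value));
                else if (t == double.class) out.writeDouble(f.getDouble(value));
                else if (t == long.class) out.writeLong(f.getLong(value));
                else if (t == int.class) out.writeInt(f.getInt(value));
                else throw new IllegalArgumentException("Unsupported field type: " + t);
            }
        } catch (IOException | IllegalAccessException e) {
            throw new IllegalStateException(e);
        }
        return buffer.toByteArray();
    }

    T fromBytes(byte[] bytes) {
        try (DataInputStream in = new DataInputStream(new ByteArrayInputStream(bytes))) {
            T value = type.getDeclaredConstructor().newInstance();
            for (Field f : fields) {
                Class<?> t = f.getType();
                if (t == String.class) f.set(value, in.readUTF());
                else if (t == double.class) f.setDouble(value, in.readDouble());
                else if (t == long.class) f.setLong(value, in.readLong());
                else if (t == int.class) f.setInt(value, in.readInt());
                else throw new IllegalArgumentException("Unsupported field type: " + t);
            }
            return value;
        } catch (IOException | ReflectiveOperationException e) {
            throw new IllegalStateException(e);
        }
    }

    public static void main(String[] args) {
        Product p = new Product();
        p.code = "A-1";
        p.price = 9.5;
        p.description = "contains\ta tab";
        p.created = 1481100000000L;
        ReflectiveCodec<Product> codec = new ReflectiveCodec<>(Product.class);
        Product copy = codec.fromBytes(codec.toBytes(p));
        System.out.println(copy.code + " / " + copy.price + " / " + copy.created);
    }
}
```

The trade-off is that the byte layout depends on the field names and types, so
renaming or adding a field changes the format. Nothing here handles versioning.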

On Wed, Dec 7, 2016 at 1:16 PM, Luigi Selmi <luigise...@gmail.com> wrote:

> Hi Matt,
>
> I had the same problem, trying to read some records in event time using a
> POJO, doing some transformation, and saving the result into Kafka for further
> processing. I am not yet done, but maybe the code I wrote starting from the
> Flink Forward 2016 training docs
> <http://dataartisans.github.io/flink-training/exercises/popularPlaces.html>
> could be useful.
>
> https://github.com/luigiselmi/pilot-sc4-fcd-producer
>
>
> Best,
>
> Luigi
>
> On 7 December 2016 at 16:35, Matt <dromitl...@gmail.com> wrote:
>
>> Hello,
>>
>> I don't quite understand how to integrate Kafka and Flink. After a lot of
>> thought and hours of reading, I feel I'm still missing something important.
>>
>> So far I haven't found a non-trivial but simple example of a stream of a
>> custom class (POJO). It would be good to have such an example in the Flink
>> docs. I can think of many scenarios in which using SimpleStringSchema
>> is not an option, but all Kafka+Flink guides insist on using it.
>>
>> Maybe we can add a simple example to the documentation [1]; it would be
>> really helpful for many of us. Also, explaining how to create a Flink
>> De/SerializationSchema from a Kafka De/Serializer would be really useful
>> and would save a lot of people a lot of time. It's not clear why you
>> need both of them, or whether you need both at all.
>>
>> As far as I know Avro is a common choice for serialization, but I've read
>> that Kryo's performance is much better (true?). I guess, though, that the
>> fastest serialization approach is writing your own de/serializer.
>>
>> 1. What do you think about adding some thoughts on this to the
>> documentation?
>> 2. Can anyone provide an example for the following class?
>>
>> ---
>> public class Product {
>>     public String code;
>>     public double price;
>>     public String description;
>>     public long created;
>> }
>> ---
>>
>> Regards,
>> Matt
>>
>> [1] http://data-artisans.com/kafka-flink-a-practical-how-to/
>>
>
>
>
> --
> Luigi Selmi, M.Sc.
> Fraunhofer IAIS Schloss Birlinghoven .
> 53757 Sankt Augustin, Germany
> Phone: +49 2241 14-2440
>
>


Serializers and Schemas

2016-12-07 Thread Matt
Hello,

I don't quite understand how to integrate Kafka and Flink. After a lot of
thought and hours of reading, I feel I'm still missing something important.

So far I haven't found a non-trivial but simple example of a stream of a
custom class (POJO). It would be good to have such an example in the Flink
docs. I can think of many scenarios in which using SimpleStringSchema
is not an option, but all Kafka+Flink guides insist on using it.

Maybe we can add a simple example to the documentation [1]; it would be
really helpful for many of us. Also, explaining how to create a Flink
De/SerializationSchema from a Kafka De/Serializer would be really useful
and would save a lot of people a lot of time. It's not clear why you
need both of them, or whether you need both at all.

As far as I know Avro is a common choice for serialization, but I've read
that Kryo's performance is much better (true?). I guess, though, that the
fastest serialization approach is writing your own de/serializer.

1. What do you think about adding some thoughts on this to the
documentation?
2. Can anyone provide an example for the following class?

---
public class Product {
    public String code;
    public double price;
    public String description;
    public long created;
}
---

Regards,
Matt

[1] http://data-artisans.com/kafka-flink-a-practical-how-to/
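
For the Product class above, one possible hand-written approach is to write the
fields in a fixed order with the JDK's DataOutputStream and read them back in
the same order. This is only a sketch, not an official Flink or Kafka recipe.
ProductCodec is a hypothetical helper name, and the sketch assumes non-null
strings that fit within writeUTF's 65535-byte limit. The resulting byte[]
methods are the part a Kafka Serializer or a Flink SerializationSchema
implementation would delegate to.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.IOException;
import java.io.UncheckedIOException;

// The POJO from the message above.
class Product {
    public String code;
    public double price;
    public String description;
    public long created;
}

// Hypothetical helper: a fixed-order binary encoding of Product.
class ProductCodec {
    static byte[] toBytes(Product p) {
        ByteArrayOutputStream buffer = new ByteArrayOutputStream();
        try (DataOutputStream out = new DataOutputStream(buffer)) {
            // writeUTF stores a length prefix before the string bytes, so the
            // string may contain any character, including tabs or newlines.
            out.writeUTF(p.code);
            out.writeDouble(p.price);
            out.writeUTF(p.description);
            out.writeLong(p.created);
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
        return buffer.toByteArray();
    }

    static Product fromBytes(byte[] bytes) {
        try (DataInputStream in = new DataInputStream(new ByteArrayInputStream(bytes))) {
            Product p = new Product();
            // Fields must be read in exactly the order they were written.
            p.code = in.readUTF();
            p.price = in.readDouble();
            p.description = in.readUTF();
            p.created = in.readLong();
            return p;
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    public static void main(String[] args) {
        Product p = new Product();
        p.code = "A-1";
        p.price = 9.5;
        p.description = "a description\twith a tab";
        p.created = 1481100000000L;
        Product copy = fromBytes(toBytes(p));
        System.out.println(copy.code + " / " + copy.price + " / " + copy.created);
    }
}
```

Because each string carries its own length, this avoids the problem of
separator characters showing up inside field values.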


Re: Why use Kafka after all?

2016-11-17 Thread Matt
Just to be clear, what I'm looking for is a way to serialize a POJO class
for both Kafka and Flink. I'm not sure whether the interfaces of the two
frameworks are compatible, but it seems they aren't.

For Kafka (producer) I need a Serializer and a Deserializer class, and for
Flink (consumer) a SerializationSchema and DeserializationSchema class.

Any example of how to put this together would be greatly appreciated.
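
One way to picture putting these together is a single class that implements
both pairs of interfaces and delegates to one encoder/decoder. The sketch below
uses local stand-in interfaces so that it compiles without Kafka or Flink on
the classpath. The KafkaStyle* and FlinkStyle* names and DualSerde are invented
for illustration. The stand-ins copy only the core method shapes: Kafka's
methods take the topic as an extra argument, Flink's do not.

```java
import java.nio.charset.StandardCharsets;
import java.util.function.Function;

// Stand-ins mirroring only the core methods of the real interfaces.
interface KafkaStyleSerializer<T> { byte[] serialize(String topic, T data); }
interface KafkaStyleDeserializer<T> { T deserialize(String topic, byte[] data); }
interface FlinkStyleSerializationSchema<T> { byte[] serialize(T element); }
interface FlinkStyleDeserializationSchema<T> { T deserialize(byte[] message); }

// Hypothetical adapter: one wire format exposed through all four interfaces.
class DualSerde<T> implements KafkaStyleSerializer<T>, KafkaStyleDeserializer<T>,
        FlinkStyleSerializationSchema<T>, FlinkStyleDeserializationSchema<T> {

    private final Function<T, byte[]> encoder;
    private final Function<byte[], T> decoder;

    DualSerde(Function<T, byte[]> encoder, Function<byte[], T> decoder) {
        this.encoder = encoder;
        this.decoder = decoder;
    }

    // Kafka side: the topic is ignored, the bytes are the same either way.
    @Override public byte[] serialize(String topic, T data) { return encoder.apply(data); }
    @Override public T deserialize(String topic, byte[] data) { return decoder.apply(data); }

    // Flink side: same encoder and decoder, no topic argument.
    @Override public byte[] serialize(T element) { return encoder.apply(element); }
    @Override public T deserialize(byte[] message) { return decoder.apply(message); }

    // Example instance for plain UTF-8 strings.
    static DualSerde<String> utf8Strings() {
        return new DualSerde<>(
                s -> s.getBytes(StandardCharsets.UTF_8),
                b -> new String(b, StandardCharsets.UTF_8));
    }

    public static void main(String[] args) {
        DualSerde<String> serde = utf8Strings();
        // Written by the "Kafka" side, read back by the "Flink" side.
        byte[] bytes = serde.serialize("products", "A-1");
        System.out.println(serde.deserialize(bytes));
    }
}
```

With the real libraries there is more to implement. Flink's
DeserializationSchema also requires isEndOfStream and getProducedType, and
schema instances must be java.io.Serializable because they are shipped to the
workers, so plain Function fields like the ones above would need a
serializable replacement.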

On Thu, Nov 17, 2016 at 9:12 PM, Dromit <dromitl...@gmail.com> wrote:

> Tzu-Li Tai, thanks for your response.
>
> I've seen the example you mentioned before, TaxiRideSchema.java, but it's
> way too simplified.
>
> In a real POJO class you may have multiple fields such as integers,
> strings, doubles, etc., so serializing them as a string like in the example
> wouldn't work: you can't put together two arbitrary strings and later split
> the byte array to get each of them back, and the same goes for two integers
> and nearly any other type.
>
> I feel there should be a more general way of doing this regardless of the
> fields on the class you're de/serializing.
>
> What do you do in these cases? It should be a pretty common scenario!
>
> Regards,
> Matt
>
> On Wed, Nov 16, 2016 at 2:01 PM, Philipp Bussche <
> philipp.buss...@gmail.com> wrote:
>
>> Hi Dromit
>>
>> I started using Flink with Kafka but am currently looking into Kinesis to
>> replace Kafka.
>> The reason is that eventually my application will run in somebody's cloud,
>> and if I go for AWS then I don't have to take care of operating Kafka and
>> Zookeeper myself. I understand this can be a challenging task.
>> Up to now, with the Kafka bit only running in a local test environment, I
>> am happy running it, as I just start 2 Docker containers and it does the
>> job.
>> But this also means I have no clue how Kafka really works and what I need
>> to be careful with.
>> Besides the knowledge that Kafka seems to require, cost is another aspect
>> here.
>> If one wants to operate a Kafka cluster plus Zookeeper on, let's say, the
>> Amazon cloud, this might actually be more expensive than "just" using
>> Kinesis as a service.
>> There are apparently drawbacks in terms of functionality and performance,
>> but for my use case that does not seem to matter.
>>
>> Philipp
>>
>>
>>
>> --
>> View this message in context:
>> http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/Why-use-Kafka-after-all-tp10112p10155.html
>> Sent from the Apache Flink User Mailing List archive at Nabble.com.
>>
>
>