Re: DI in flink

2023-02-14 Thread Austin Cawley-Edwards
(note: please keep user@flink.apache.org included in replies)

Ah, I see. Then no, this is not provided by Flink. When I've used
dependency injection with Flink in the past, I instantiated everything in the
`open()` method of the Flink Rich* classes. Could you solve this by having
a common base Sink class or builder that does the configuring? I'm just
wondering why it's necessary to solve it in Flink itself.
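A minimal sketch of what I mean, reusing the S3ConnProvider/S3Util names from your description (their constructors and the put() call are just placeholders I'm assuming):

import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.functions.sink.RichSinkFunction;

public class MyS3Sink extends RichSinkFunction<String> {

    // transient: wired on the task manager in open(), never shipped with the job graph
    private transient S3ConnProvider connProvider;
    private transient S3Util s3Util;

    @Override
    public void open(Configuration parameters) throws Exception {
        // "manual" dependency injection: build the object graph once per subtask
        connProvider = new S3ConnProvider();   // hypothetical constructor
        s3Util = new S3Util(connProvider);     // hypothetical constructor
    }

    @Override
    public void invoke(String value, Context context) throws Exception {
        s3Util.put(value);                     // hypothetical method
    }
}

A common base Sink class (or a builder passed into the constructor) could hold exactly this open() wiring, so each concrete sink only declares what it needs.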

Best,
Austin

On Tue, Feb 14, 2023 at 11:05 AM Yashoda Krishna T <
yashoda.kris...@unbxd.com> wrote:

> This is my use case.
> I have a sink function to push streaming data to S3. And I have a class
> lets call S3ConnProvider that provides me a connection object to S3, and a
> class lets say S3Util that has functions over S3 which injects
> S3ConnProvider.
> If dependency injection works I can inject S3Util alone in my SinkFunction
> class. If not I have to initialize S3ConnProvider first and then S3Util.
> This can become complex if there are too many initializations
> required depending on the use case.
>
>>


Re: DI in flink

2023-02-14 Thread Austin Cawley-Edwards
What would be the benefits and features over what can be done in user land?

On Tue, Feb 14, 2023 at 10:41 Yashoda Krishna T 
wrote:

> Hi Austin
>
> Yes this can be done in user land.
> Can we do it in flink land too?
>
> Thanks
> Yashoda
>
> On Tue, 14 Feb 2023, 9:05 pm Austin Cawley-Edwards, <
> austin.caw...@gmail.com> wrote:
>
>> Hey Yashoda,
>>
>> This can be done in userland (eg with Dagger <https://dagger.dev/>)
>> unless you're wanting Flink to do something in addition?
>>
>> Best,
>> Austin
>>
>> On Tue, Feb 14, 2023 at 10:01 AM Yashoda Krishna T <
>> yashoda.kris...@unbxd.com> wrote:
>>
>>> Does flink support dependency injection in flink task functions in java?
>>> If not is there an alternative?
>>>
>>


Re: DI in flink

2023-02-14 Thread Austin Cawley-Edwards
Hey Yashoda,

This can be done in userland (eg with Dagger <https://dagger.dev/>) unless
you're wanting Flink to do something in addition?

Best,
Austin

On Tue, Feb 14, 2023 at 10:01 AM Yashoda Krishna T <
yashoda.kris...@unbxd.com> wrote:

> Does flink support dependency injection in flink task functions in java?
> If not is there an alternative?
>


Re: How to process mini-batch events in Flink with Datastream API

2023-02-10 Thread Austin Cawley-Edwards
It's been a while, but I think I've done something similar before with
Async I/O [1] and batching records with a window.

This was years ago, so no idea if this was/is good practice, but
essentially it was:

-> Window by batch size (with a timeout trigger to maintain some SLA)
-> Process function that just collects all records in the window
-> Send the entire batch to the AsyncFunction

This approach definitely has some downsides, since you don't get to take
advantage of some of the nice per-record things Async I/O gives you
(ordering, retries, etc.) but it does greatly reduce the load on external
services.
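
Roughly, the shape was something like the following untested sketch (String elements and countWindowAll for brevity; a keyed countWindow plus a custom timeout trigger is what you'd want for the SLA part, and the batch "enrichment" here is just a placeholder):

import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.TimeUnit;

import org.apache.flink.streaming.api.datastream.AsyncDataStream;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.async.ResultFuture;
import org.apache.flink.streaming.api.functions.async.RichAsyncFunction;
import org.apache.flink.streaming.api.functions.windowing.ProcessAllWindowFunction;
import org.apache.flink.streaming.api.windowing.windows.GlobalWindow;
import org.apache.flink.util.Collector;

public class BatchedAsyncSketch {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        DataStream<String> events = env.fromElements("a", "b", "c", "d");

        // 1) Window by batch size; a custom trigger could add a processing-time
        //    timeout so small batches still flush within an SLA.
        DataStream<List<String>> batches = events
                .countWindowAll(2)
                .process(new ProcessAllWindowFunction<String, List<String>, GlobalWindow>() {
                    @Override
                    public void process(Context ctx, Iterable<String> in, Collector<List<String>> out) {
                        List<String> batch = new ArrayList<>();
                        in.forEach(batch::add);
                        out.collect(batch);
                    }
                });

        // 2) One async call per batch instead of per record.
        DataStream<List<String>> enriched = AsyncDataStream.unorderedWait(
                batches,
                new RichAsyncFunction<List<String>, List<String>>() {
                    @Override
                    public void asyncInvoke(List<String> batch, ResultFuture<List<String>> result) {
                        // call the external service with the whole batch here (placeholder)
                        result.complete(Collections.singleton(batch));
                    }
                },
                5, TimeUnit.SECONDS,  // timeout per batch
                10);                  // max in-flight batches

        enriched.print();
        env.execute("batched-async-io-sketch");
    }
}

Ordering, retries, and timeouts then apply per batch rather than per record, which is the trade-off mentioned above.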

Hope that helps,
Austin

[1]:
https://nightlies.apache.org/flink/flink-docs-release-1.16/docs/dev/datastream/operators/asyncio/

On Fri, Feb 10, 2023 at 3:22 PM Leon Xu  wrote:

> I wonder if windows will be the solution when it comes to datastream API.
>
> On Fri, Feb 10, 2023 at 12:07 PM Leon Xu  wrote:
>
>> Hi Flink Users,
>>
>> We wanted to use Flink to run a decoration pipeline, where we would like
>> to make calls to some external service to fetch data and alter the event in
>> the Flink pipeline.
>>
>> Since there's an external service call involved, we want to do batch calls
>> so that it can reduce the load on the external service.(batching multiple
>> flink events and just make one external service call)
>>
>> It looks like mini-batch might be something we can leverage to achieve
>> that but that feature seems to only exist in table API. We are using
>> datastream API and we are wondering if there's any solution/workaround for
>> this?
>>
>>
>> Thanks
>> Leon
>>
>


Re: Deploying Jobmanager on k8s as a Deployment

2022-09-07 Thread Austin Cawley-Edwards
Hey Gil,

I'm referring to when a pod exits on its own, not when being deleted.
Deployments only support the "Always" restart policy [1].

In my understanding, the JM only cleans up HA data when it is shut down [2],
after which the process will exit, which leads to the problem with k8s
Deployment restart policies.

Best,
Austin

[1]:
https://kubernetes.io/docs/concepts/workloads/controllers/deployment/#pod-template
[2]:
https://nightlies.apache.org/flink/flink-docs-release-1.15/docs/ops/rest_api/#cluster

On Wed, Sep 7, 2022 at 4:43 PM Gil De Grove  wrote:

> Hello Austin,
>
> I'm not aware of any limitation of Deployments not letting pods exit
> (correctly or incorrectly). What do you mean by that exactly? Would it be
> possible for you to point to the piece of documentation that makes you think
> that?
>
> A pod, if correctly set up, will exit when receiving its SIGTERM or
> SIGKILL from the orchestrator.
> So when "deleting" the Deployment, the pods are terminated correctly. In the
> case Flink did trigger a savepoint before, you can then restart from that
> savepoint.
> Usually, when a pod is not being terminated, this means that the signal is not
> delivered to the correct process.
>
> Hopes this helps.
>
> Regards,
> Gil
>
>
> On Wed, Sep 7, 2022, 21:16 Austin Cawley-Edwards 
> wrote:
>
>> Cool, thanks! How does it clean up the HA data, if the cluster is never
>> able to shut down (due to the k8s Deployment restriction)?
>>
>> Best,
>> Austin
>>
>> On Mon, Sep 5, 2022 at 6:51 PM Gyula Fóra  wrote:
>>
>>> Hi!
>>>
>>> The operator supports both Flink native and standalone deployment modes
>>> and in both cases the JM is deployed as k8s Deployment.
>>>
>>> During upgrade Flink/operator deletes the deployment after savepoint and
>>> waits for termination before it creates a new one with the updated spec.
>>>
>>> Cheers,
>>> Gyula
>>>
>>> On Mon, 5 Sep 2022 at 07:41, Austin Cawley-Edwards <
>>> austin.caw...@gmail.com> wrote:
>>>
>>>> Hey Marco,
>>>>
>>>> Unfortunately there is no built in k8s API that models an application
>>>> mode JM exactly but Deployments should be fine, in general. As Gyula notes,
>>>> where they can be difficult is during application upgrades as Deployments
>>>> never let their pods exit, even if successful, so there is no way to stop
>>>> the cluster gracefully.
>>>>
>>>> Is stopping your application with a savepoint and redeploying a
>>>> workable solution for image upgrades? In this way a Job could still be
>>>> used.
>>>>
>>>>
>>>> @Gyula, how are JMs handled in the operator? Job, Deployment, or
>>>> something custom?
>>>>
>>>>
>>>> Best,
>>>> Austin
>>>>
>>>>
>>>>
>>>> On Mon, Sep 5, 2022 at 6:15 AM Gyula Fóra  wrote:
>>>>
>>>>> You can use deployments of course , the operator and native k8s
>>>>> integration does exactly that.
>>>>>
>>>>> Even then job updates can be tricky so I believe you are much better
>>>>> off with the operator.
>>>>>
>>>>> Gyula
>>>>>
>>>>> On Sun, 4 Sep 2022 at 11:11, marco andreas 
>>>>> wrote:
>>>>>
>>>>>> Hello,
>>>>>>
>>>>>> Thanks for the response, I will take a look at it.
>>>>>>
>>>>>> But if we aren't able to use the flink operator due to technical
>>>>>> constraints is it possible to deploy the JM as deployment without any
>>>>>> consequences that I am not aware of?
>>>>>>
>>>>>> Sincerely,
>>>>>>
>>>>>> Le sam. 3 sept. 2022 à 23:27, Gyula Fóra  a
>>>>>> écrit :
>>>>>>
>>>>>>> Hi!
>>>>>>> You should check out the Flink Kubernetes Operator. I think that
>>>>>>> covers all your needs .
>>>>>>>
>>>>>>>
>>>>>>> https://nightlies.apache.org/flink/flink-kubernetes-operator-docs-main/
>>>>>>>
>>>>>>> Cheers,
>>>>>>> Gyula
>>>>>>>
>>>>>>> On Sat, 3 Sep 2022 at 13:45, marco andreas <
>>>>>>> marcoandreas...@gmail.com> wrote:
>>>>>>>
>>>>>>>>
>>>>>>>> We are deploying a Flink application cluster on k8s. Following the
>>>>>>>> official documentation the JM is deployed as a Job resource; however,
>>>>>>>> we are deploying a long-running Flink job that is not supposed to be
>>>>>>>> terminated, and we also need to update the image of the Flink job.
>>>>>>>>
>>>>>>>>  The problem is that the Job is an immutable resource, so we
>>>>>>>> can't update it.
>>>>>>>>
>>>>>>>> So I'm wondering if it's possible to use a deployment resource for
>>>>>>>> the jobmanager and if there will be any side effects or repercussions.
>>>>>>>>
>>>>>>>> Thanks,
>>>>>>>>
>>>>>>> Gil De Grove
>


Re: Deploying Jobmanager on k8s as a Deployment

2022-09-07 Thread Austin Cawley-Edwards
Cool, thanks! How does it clean up the HA data, if the cluster is never
able to shut down (due to the k8s Deployment restriction)?

Best,
Austin

On Mon, Sep 5, 2022 at 6:51 PM Gyula Fóra  wrote:

> Hi!
>
> The operator supports both Flink native and standalone deployment modes
> and in both cases the JM is deployed as k8s Deployment.
>
> During upgrade Flink/operator deletes the deployment after savepoint and
> waits for termination before it creates a new one with the updated spec.
>
> Cheers,
> Gyula
>
> On Mon, 5 Sep 2022 at 07:41, Austin Cawley-Edwards <
> austin.caw...@gmail.com> wrote:
>
>> Hey Marco,
>>
>> Unfortunately there is no built in k8s API that models an application
>> mode JM exactly but Deployments should be fine, in general. As Gyula notes,
>> where they can be difficult is during application upgrades as Deployments
>> never let their pods exit, even if successful, so there is no way to stop
>> the cluster gracefully.
>>
>> Is stopping your application with a savepoint and redeploying a workable
>> solution for image upgrades? In this way a Job could still be used.
>>
>>
>> @Gyula, how are JMs handled in the operator? Job, Deployment, or
>> something custom?
>>
>>
>> Best,
>> Austin
>>
>>
>>
>> On Mon, Sep 5, 2022 at 6:15 AM Gyula Fóra  wrote:
>>
>>> You can use deployments of course , the operator and native k8s
>>> integration does exactly that.
>>>
>>> Even then job updates can be tricky so I believe you are much better off
>>> with the operator.
>>>
>>> Gyula
>>>
>>> On Sun, 4 Sep 2022 at 11:11, marco andreas 
>>> wrote:
>>>
>>>> Hello,
>>>>
>>>> Thanks for the response, I will take a look at it.
>>>>
>>>> But if we aren't able to use the flink operator due to technical
>>>> constraints is it possible to deploy the JM as deployment without any
>>>> consequences that I am not aware of?
>>>>
>>>> Sincerely,
>>>>
>>>> Le sam. 3 sept. 2022 à 23:27, Gyula Fóra  a
>>>> écrit :
>>>>
>>>>> Hi!
>>>>> You should check out the Flink Kubernetes Operator. I think that
>>>>> covers all your needs .
>>>>>
>>>>> https://nightlies.apache.org/flink/flink-kubernetes-operator-docs-main/
>>>>>
>>>>> Cheers,
>>>>> Gyula
>>>>>
>>>>> On Sat, 3 Sep 2022 at 13:45, marco andreas 
>>>>> wrote:
>>>>>
>>>>>>
>>>>>> We are deploying a Flink application cluster on k8s. Following the
>>>>>> official documentation the JM is deployed as a Job resource; however, we
>>>>>> are deploying a long-running Flink job that is not supposed to be
>>>>>> terminated, and we also need to update the image of the Flink job.
>>>>>>
>>>>>>  The problem is that the Job is an immutable resource, so we can't update
>>>>>> it.
>>>>>>
>>>>>> So I'm wondering if it's possible to use a deployment resource for
>>>>>> the jobmanager and if there will be any side effects or repercussions.
>>>>>>
>>>>>> Thanks,
>>>>>>
>>>>>


Re: Deploying Jobmanager on k8s as a Deployment

2022-09-05 Thread Austin Cawley-Edwards
Hey Marco,

Unfortunately there is no built in k8s API that models an application mode
JM exactly but Deployments should be fine, in general. As Gyula notes,
where they can be difficult is during application upgrades as Deployments
never let their pods exit, even if successful, so there is no way to stop
the cluster gracefully.

Is stopping your application with a savepoint and redeploying a workable
solution for image upgrades? In this way a Job could still be used.


@Gyula, how are JMs handled in the operator? Job, Deployment, or something
custom?


Best,
Austin



On Mon, Sep 5, 2022 at 6:15 AM Gyula Fóra  wrote:

> You can use deployments of course , the operator and native k8s
> integration does exactly that.
>
> Even then job updates can be tricky so I believe you are much better off
> with the operator.
>
> Gyula
>
> On Sun, 4 Sep 2022 at 11:11, marco andreas 
> wrote:
>
>> Hello,
>>
>> Thanks for the response, I will take a look at it.
>>
>> But if we aren't able to use the flink operator due to technical
>> constraints is it possible to deploy the JM as deployment without any
>> consequences that I am not aware of?
>>
>> Sincerely,
>>
>> Le sam. 3 sept. 2022 à 23:27, Gyula Fóra  a écrit :
>>
>>> Hi!
>>> You should check out the Flink Kubernetes Operator. I think that covers
>>> all your needs .
>>>
>>> https://nightlies.apache.org/flink/flink-kubernetes-operator-docs-main/
>>>
>>> Cheers,
>>> Gyula
>>>
>>> On Sat, 3 Sep 2022 at 13:45, marco andreas 
>>> wrote:
>>>

 We are deploying a Flink application cluster on k8s. Following the
 official documentation the JM is deployed as a Job resource; however, we
 are deploying a long-running Flink job that is not supposed to be
 terminated, and we also need to update the image of the Flink job.

  The problem is that the Job is an immutable resource, so we can't update
 it.

 So I'm wondering if it's possible to use a deployment resource for the
 jobmanager and if there will be any side effects or repercussions.

 Thanks,

>>>


Re: Flink config driven tool ?

2022-06-07 Thread Austin Cawley-Edwards
They support Flink as well. Looks like they even support the new Flink k8s
operator.[1]

Austin

[1]:
https://seatunnel.apache.org/docs/2.1.1/start/kubernetes#deploying-the-operator

On Tue, Jun 7, 2022 at 3:11 PM sri hari kali charan Tummala <
kali.tumm...@gmail.com> wrote:

> Thanks, but that looks like a Spark tool. Is there something similar for Flink?
>
> Thanks
> Sri
>
> On Tue, Jun 7, 2022 at 12:07 PM Austin Cawley-Edwards <
> austin.caw...@gmail.com> wrote:
>
>> Hey there,
>>
>> No idea if it's any good, but just saw Apache SeaTunnel[1] today which
>> seems to fit your requirements.
>>
>> Best,
>> Austin
>>
>> [1]: https://seatunnel.apache.org/
>>
>> On Tue, Jun 7, 2022 at 2:19 PM sri hari kali charan Tummala <
>> kali.tumm...@gmail.com> wrote:
>>
>>> Hi Flink Community,
>>>
>>> can someone point me to some good config-driven Flink data movement tool
>>> GitHub repos? Imagine I build my ETL DAG connecting source -->
>>> transformations --> target just using a config file.
>>>
>>> below are a few spark examples:-
>>> https://github.com/mvrpl/big-shipper
>>> https://github.com/BitwiseInc/Hydrograph
>>>
>>> Thanks & Regards
>>> Sri Tummala
>>>
>>>
>
> --
> Thanks & Regards
> Sri Tummala
>
>


Re: Flink config driven tool ?

2022-06-07 Thread Austin Cawley-Edwards
Hey there,

No idea if it's any good, but just saw Apache SeaTunnel[1] today which
seems to fit your requirements.

Best,
Austin

[1]: https://seatunnel.apache.org/

On Tue, Jun 7, 2022 at 2:19 PM sri hari kali charan Tummala <
kali.tumm...@gmail.com> wrote:

> Hi Flink Community,
>
> can someone point me to some good config-driven Flink data movement tool
> GitHub repos? Imagine I build my ETL DAG connecting source -->
> transformations --> target just using a config file.
>
> below are a few spark examples:-
> https://github.com/mvrpl/big-shipper
> https://github.com/BitwiseInc/Hydrograph
>
> Thanks & Regards
> Sri Tummala
>
>


Re: HTTP REST API as Ingress/Egress

2022-05-19 Thread Austin Cawley-Edwards
Hi Himanshu,

Unfortunately, this is not supported by Statefun, though this type of
application should not be too difficult to build using something like the Kafka
Request/Reply pattern [1] and putting that in front of a Statefun cluster.
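
As a very rough, untested sketch of the front side of that pattern with spring-kafka's ReplyingKafkaTemplate (the topic names and the service class are made up, the template still needs a reply-listener container configured as per the spring-kafka docs, and the stateful function / egress has to carry the correlation header back to the reply topic, or you need to correlate replies yourself):

import java.util.concurrent.TimeUnit;

import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.springframework.kafka.requestreply.ReplyingKafkaTemplate;
import org.springframework.kafka.requestreply.RequestReplyFuture;

public class StatefunRequestReplyService {

    private final ReplyingKafkaTemplate<String, String, String> template;

    public StatefunRequestReplyService(ReplyingKafkaTemplate<String, String, String> template) {
        // assumed to be wired with a listener container on the Statefun egress
        // topic ("statefun-replies" or similar, hypothetical)
        this.template = template;
    }

    /** Blocks the HTTP handler thread until the function's reply arrives or times out. */
    public String invoke(String key, String payload) throws Exception {
        ProducerRecord<String, String> request =
                new ProducerRecord<>("statefun-requests", key, payload); // hypothetical ingress topic
        RequestReplyFuture<String, String, String> future = template.sendAndReceive(request);
        ConsumerRecord<String, String> reply = future.get(10, TimeUnit.SECONDS);
        return reply.value();
    }
}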

Best,
Austin

[1]:
https://dzone.com/articles/synchronous-kafka-using-spring-request-reply-1


On Thu, May 19, 2022 at 5:35 AM Himanshu Sareen 
wrote:

> Hi All,
>
>
> It will be of great help if someone can share views.
>
> As per the application design: synchronous access to a stateful function.
>
>1. Application will access/invoke a stateful function via a HTTP call.
>2. Application will wait for an response.
>3. Once Stateful function completes the execution return the response
>back to the Application.
>
>
> Regards
> Himanshu
> --
> *From:* Himanshu Sareen
> *Sent:* Sunday, April 24, 2022 6:59 AM
> *To:* user@flink.apache.org 
> *Subject:* HTTP REST API as Ingress/Egress
>
> Team,
>
> Does flink-statefun support HTTP REST as Ingress ( like Kafka and kinesis )
>
> I'm looking for a fault tolerant solution where an external API can invoke
> stateful function , access state and return response.
>
> We are using python sdk for statefun application
>
> Regards,
> Himanshu
>
>


Re: [Discuss] Creating an Apache Flink slack workspace

2022-05-13 Thread Austin Cawley-Edwards
Hi all,

Would just like to share an interesting article from the dbt community[1],
which in part describes some of their challenges in managing Slack in a
large community. The biggest point it seems to make is that their Slack has
become a marketing tool for dbt/data vendors instead of a community space —
given the diversity of vendors in the Flink space, we may face similar
challenges. Perhaps their experience can help us with the initial
setup/guidelines.

Cheers,
Austin

[1]: https://pedram.substack.com/p/we-need-to-talk-about-dbt?s=r

On Thu, May 12, 2022 at 6:04 AM Robert Metzger  wrote:

> +1 on setting up our own Slack instance (PMC owned)
> +1 for having a separate discussion about setting up a discussion forum (I
> like the idea of using GH discussions)
>
> Besides, we still need to investigate how
>> http://apache-airflow.slack-archives.org works, I think
>> a slack of our own can be easier to set up the archive.
>
>
> This is the code used by airflow: https://github.com/ashb/slackarchive.
> I'm happy to look into setting up the archive for the community.
>
>
> On Thu, May 12, 2022 at 11:00 AM Jark Wu  wrote:
>
>> Hi,
>>
>> I would +1 to create Apache Flink Slack for the lower barriers to entry
>> as Jingsong mentioned.
>> Besides, we still need to investigate how
>> http://apache-airflow.slack-archives.org works, I think
>> a slack of our own can be easier to set up the archive.
>>
>> Regarding Discourse vs Slack, I think they are not exclusive, but
>> complementary.
>> Someday in the future, we might be able to provide them both. But what we
>> are seeking today
>> is a tool that can provide real-time communication and ad-hoc questions
>> and interactions.
>> A forum is more similar to a mailing list. A forum is a modern mailing list
>> but can't solve the problems
>> mentioned above. With slack-archives, the information and thoughtful
>> discussion in Slack can also be searchable.
>>
>> I think we can open another thread to discuss creating a forum for Flink
>> and keep this thread focused
>> on Slack. IMO, we can investigate more kinds of forums, for example
>> GitHub Discussion which is free, powerful
>>  and fully-managed. Airflow[1] and Next.JS also use GitHub Discussion as
>> their forum.
>>
>> Best,
>> Jark
>>
>> [1]: https://github.com/apache/airflow/discussions
>> [2]: https://github.com/vercel/next.js/discussions
>>
>>
>> On Thu, 12 May 2022 at 15:24, Martijn Visser 
>> wrote:
>>
>>> Hi,
>>>
>>> I would +1 setting up our own Slack. It will allow us to provide the best
>>> experience for those in the community who want to use Slack.
>>> More than happy to help with setting up community guidelines on how to
>>> use
>>> Slack.
>>>
>>> Best regards,
>>>
>>> Martijn
>>>
>>> On Thu, 12 May 2022 at 05:22, Yun Tang  wrote:
>>>
>>> > Hi all,
>>> >
>>> > I think forum might be a good choice for search and maintain. However,
>>> > unlike slack workspace, it seems no existing popular product could be
>>> > leveraged easily.
>>> >
>>> > Thus, I am +1 to create an Apache Flink slack channel. If the ASF slack
>>> > cannot be joined easily for most of users, I prefer to set up our own
>>> slack
>>> > workspace.
>>> >
>>> > Best
>>> > Yun Tang
>>> > --
>>> > *From:* Jingsong Li 
>>> > *Sent:* Thursday, May 12, 2022 10:49
>>> > *To:* Xintong Song 
>>> > *Cc:* dev ; user 
>>> > *Subject:* Re: [Discuss] Creating an Apache Flink slack workspace
>>> >
>>> > Hi all,
>>> >
>>> > Regarding using ASF slack. I share the problems I saw in the Apache
>>> Druid
>>> > community. [1]
>>> >
>>> > > As you may have heard, it’s become increasingly difficult for new
>>> users
>>> > without an @apache.org email address to join the ASF #druid Slack
>>> channel.
>>> > ASF Infra disabled the option to publicly provide a link to the
>>> workspace
>>> > to anyone who wanted it, after encountering issues with spammers.
>>> >
>>> > > Per Infra’s guidance (https://infra.apache.org/slack.html), new
>>> > community
>>> > members should only be invited as single-channel guests. Unfortunately,
>>> > single-channel guests are unable to extend invitations to new members,
>>> > including their colleagues who are using Druid. Only someone with full
>>> > member privileges is able to extend an invitation to new members. This
>>> lack
>>> > of consistency doesn’t make the community feel inclusive.
>>> >
>>> > > There is a workaround in place (
>>> > https://github.com/apache/druid-website-src/pull/278) – users can
>>> send an
>>> > email to druid-u...@googlegroups.com to request an invite to the Slack
>>> > channel from an existing member – but this still poses a barrier to
>>> entry,
>>> > and isn’t a viable permanent solution. It also creates potential
>>> privacy
>>> > issues as not everyone is at liberty to announce they’re using Druid
>>> nor
>>> > wishes to display their email address in a public forum.
>>> >
>>> > [1] https://lists.apache.org/thread/f36tvfwfo2ssf1x3jb4q0v2pftdyo5z5
>>> >
>>> > Best,
>>> > 

Re: Migrating Flink apps across cloud with state

2022-05-03 Thread Austin Cawley-Edwards
Hey Hemanga,

That's quite annoying of MirrorMaker to change the offsets on you. One
solution would be to use the State Processor API[1] to read the savepoint
and update the offsets to the new ones — does MirrorMaker give you these
ahead of time? There might also be more specific tricks people could give
if you're able to share which cloud/ cloud services you're migrating to and
from.

Best,
Austin

[1]:
https://nightlies.apache.org/flink/flink-docs-release-1.14/docs/libs/state_processor_api/

On Tue, May 3, 2022 at 5:11 PM Hemanga Borah 
wrote:

> Any ideas, guys?
>
> On Mon, May 2, 2022 at 6:11 PM Hemanga Borah 
> wrote:
>
>> Hello,
>>  We are attempting to port our Flink applications from one cloud provider
>> to another.
>>
>>  These Flink applications consume data from Kafka topics and output to
>> various destinations (Kafka or databases). The applications have states
>> stored in them. Some of these stored states are aggregations, for example,
>> at times we store hours (or days) worth of data to aggregate over time.
>> Some other applications have cached information for data enrichment, for
>> example, we store data in Flink state for days, so that we can join them
>> with newly arrived data. The amount of data on the input topics is a lot,
>> and it will be expensive to reprocess the data from the beginning of the
>> topic.
>>
>>  As such, we want to retain the state of the application when we move to
>> a different cloud provider so that we can retain the aggregations and
>> cache, and do not have to start from the beginning of the input topics.
>>
>>  We are replicating the Kafka topics using MirrorMaker 2. This is our
>> procedure:
>>
>>- Replicate the input topics of each Flink application from source
>>cloud to destination cloud.
>>- Take a savepoint of the Flink application on the source cloud
>>provider.
>>- Start the Flink application on the destination cloud provider using
>>the savepoint from the source cloud provider.
>>
>>
>> However, this does not work as we want because there is a difference in
>> offset in the new topics in the new cloud provider (because of MirrorMaker
>> implementation). The offsets of the new topic do not match the ones stored
>> on the Flink savepoint, hence, Flink cannot map to the offsets of the new
>> topic during startup.
>>
>> Has anyone tried to move clouds while retaining the Flink state?
>>
>> Thanks,
>> Hemanga
>>
>


Re: how to initialize few things at task managers

2022-04-18 Thread Austin Cawley-Edwards
If you are using Kubernetes to deploy Flink, you could think about an
initContainer on the TMs or a custom Docker entry point that does this
initialization.

Best,
Austin

On Mon, Apr 18, 2022 at 7:49 AM huweihua  wrote:

> Hi, initializing stuff when the task manager comes up is not an option.
> But if the Keystore file is not changeable and you are using yarn mode,
> maybe you can use ‘yarn.ship-files’[1] to localize it.
>
> [1]
> https://nightlies.apache.org/flink/flink-docs-master/zh/docs/deployment/config/#yarn-ship-files
>
> 2022年4月16日 下午11:44,Great Info  写道:
>
> I need to download a Keystore and use it while creating the source
> connector. Currently, I am overriding the open method, but
> this gets called for each of my source connectors.
>
>  @Override
> public void open(Configuration configuration) throws Exception {
>
>   // do few things like download Keystore to default path etc
>  super.open(configuration);
> }
>
> Is there an option to init a few pre stuff as soon as the task manager
> comes up?
>
>
>


Re: DataStream request / response

2022-04-08 Thread Austin Cawley-Edwards
Good suggestion – though a common misconception with Statefun is that HTTP
ingestion is possible. Last time I checked it was still under theoretical
discussion. Do you know the current  state there?

Austin

On Fri, Apr 8, 2022 at 1:19 PM Roman Khachatryan  wrote:

> Hi,
>
> Besides the solution suggested by Austing, you might also want to look
> at Stateful Functions [1]. They provide a more convenient programming
> model for the use-case I think, while DataStream is a relatively
> low-level API.
>
> [1]
> https://nightlies.apache.org/flink/flink-statefun-docs-stable/
>
> Regards,
> Roman
>
> On Fri, Apr 8, 2022 at 6:56 PM Austin Cawley-Edwards
>  wrote:
> >
> > Hi Jason,
> >
> > No, there is no HTTP source/ sink support that I know of for Flink.
> Would running the Spring + Kafka solution in front of Flink work for you?
> >
> > On a higher level, what drew you to migrating the microservice to Flink?
> >
> > Best,
> > Austin
> >
> > On Fri, Apr 8, 2022 at 12:35 PM Jason Thomas 
> wrote:
> >>
> >> I'm taking an existing REST based microservice application and moving
> all of the logic into Flink DataStreams.
> >>
> >> Is there an easy way to get a request/response from a Flink DataStream
> so I can 'call' into it from a REST service?   For example, something
> similar to this Kafka streams example that uses Spring
> ReplyingKafkaTemplate - https://stackoverflow.com/a/58202587.
> >>
> >> Thanks for any help!
> >>
> >> -Jason
> >>
>


Re: DataStream request / response

2022-04-08 Thread Austin Cawley-Edwards
Hi Jason,

No, there is no HTTP source/ sink support that I know of for Flink. Would
running the Spring + Kafka solution in front of Flink work for you?

On a higher level, what drew you to migrating the microservice to Flink?

Best,
Austin

On Fri, Apr 8, 2022 at 12:35 PM Jason Thomas 
wrote:

> I'm taking an existing REST based microservice application and moving all
> of the logic into Flink DataStreams.
>
> Is there an easy way to get a request/response from a Flink DataStream
> so I can 'call' into it from a REST service?   For example, something
> similar to this Kafka streams example that uses Spring
> ReplyingKafkaTemplate - https://stackoverflow.com/a/58202587.
>
> Thanks for any help!
>
> -Jason
>
>


Re: Cannot upgrade helm chart

2022-02-21 Thread Austin Cawley-Edwards
Hey Marco,

There’s unfortunately no perfect fit here, at least that I know of. A
Deployment will make it possible to upgrade the image, but does not support
container exits (eg if the Flink job completes, even successfully, K8s will
still restart the container). If you are only running long lived streaming
jobs, this may be acceptable for you, but makes it difficult to stop the
job with a final savepoint (since it will exit after completion).

What you could look into with the Job approach is building an upgrade
procedure that takes the final savepoint, allows the Job to exit, then
deploys a new helm release with the upgraded image and savepoint path. It
is more expensive, but may be more flexible.


Hope that helps,
Austin

On Mon, Feb 21, 2022 at 2:40 PM marco andreas 
wrote:

> Hello flink community,
>
> I am deploying a flink application cluster using a helm chart. The
> problem is that the jobmanager component type is a "Job", and with helm I
> can't do an upgrade of the chart in order to change the application image
> version, because helm is unable to upgrade the docker image of the kind
> "Job". So I am wondering if I can change the jobmanager kind to a
> "deployment" and if there are any drawbacks in doing so.
>
> Thanks.
>


Re: RMQSource non-parallel, seems inconsistent with documentation

2022-02-18 Thread Austin Cawley-Edwards
Hey Daniel,

I think you're right that the docs are misleading in this case – anything
that extends SourceFunction will always execute at parallelism 1, set
parallelism is ignored. Explicitly setting parallelism in the example in
the docs is unnecessary and confusing. I personally have only used this
with exactly-once semantics (thus always configured parallelism 1), so not
sure what performance limitations there may be with high volume streams. It
might be worth a try to build your own on RichParallelSourceFunction if
you're up for it – I'm sure other people would be interested in the
response.
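
If you do try it, a very rough, untested skeleton might look like this (host and queue names are placeholders; it acks right after emitting, so you get the parallelism but none of RMQSource's exactly-once machinery, and you'd need checkpoint-aware acking for real delivery guarantees):

import java.nio.charset.StandardCharsets;

import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import com.rabbitmq.client.GetResponse;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.functions.source.RichParallelSourceFunction;

public class ParallelRmqSource extends RichParallelSourceFunction<String> {

    private volatile boolean running = true;
    private transient Connection connection;
    private transient Channel channel;

    @Override
    public void open(Configuration parameters) throws Exception {
        ConnectionFactory factory = new ConnectionFactory();
        factory.setHost("rabbitmq-host"); // placeholder
        connection = factory.newConnection();
        channel = connection.createChannel();
    }

    @Override
    public void run(SourceContext<String> ctx) throws Exception {
        while (running) {
            GetResponse response = channel.basicGet("my-queue", false); // placeholder queue
            if (response == null) {
                Thread.sleep(50); // queue empty, back off briefly
                continue;
            }
            synchronized (ctx.getCheckpointLock()) {
                ctx.collect(new String(response.getBody(), StandardCharsets.UTF_8));
            }
            // acking here means records emitted but not yet checkpointed can be lost on failure
            channel.basicAck(response.getEnvelope().getDeliveryTag(), false);
        }
    }

    @Override
    public void cancel() {
        running = false;
    }

    @Override
    public void close() throws Exception {
        if (channel != null) channel.close();
        if (connection != null) connection.close();
    }
}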

Best,
Austin

On Fri, Feb 18, 2022 at 5:09 PM Daniel Hristov  wrote:

> Hello,
>
>
>
> I’ve noticed that
> https://nightlies.apache.org/flink/flink-docs-release-1.14/docs/connectors/datastream/rabbitmq/
> suggests that the RabbitMQ Source can be used with parallelization bigger
> than 1, but one can’t get exactly-once delivery guarantee there. However,
> the inheritance chain seems to show RMQSource ->
> MultipleIdsMessageAcknowledgingSourceBase -> MessageAcknowledgingSourceBase
> -> RichSourceFunction (rather than RichParallelSourceFunction)
>
> I imagine either the documentation or the implementation has to be
> corrected here. Any thoughts on what imposes that parallelism on 1 here?
> The only place I found it to be checked up the hierarchy seems to be
> MessageAcknowledgingSourceBase::initializeState
>
>
>
> Have you found this to impose a limitation on the performance of pulling
> messages from Rabbit (assuming that heavier enrichments down the chain are
> properly parallelized)?
>
>
>
> Best, Daniel
>


Re: No effect from --allowNonRestoredState or "execution.savepoint.ignore-unclaimed-state" in K8S application mode

2022-02-18 Thread Austin Cawley-Edwards
Hi Andrey,

It's unclear to me from the docs[1] if the flink native-kubernetes
integration supports setting arbitrary config keys via the CLI. I'm cc'ing
Yang Wang, who has worked a lot in this area and can hopefully help us out.

Best,
Austin

[1]:
https://nightlies.apache.org/flink/flink-docs-release-1.14/docs/deployment/resource-providers/native_kubernetes/#configuring-flink-on-kubernetes

On Fri, Feb 18, 2022 at 5:14 PM Andrey Bulgakov  wrote:

> Hey all,
>
> I'm working on migrating our Flink job away from Hadoop session mode to
> K8S application mode.
> It's been going great so far but I'm hitting a wall with this seemingly
> simple thing.
>
> In the first phase of the migration I want to remove some operators (their
> state can be discarded) and focus on getting the primary pipeline running
> first.
> For that I have to start the cluster from a savepoint with the
> "allowNonRestoredState" parameter turned on.
>
> The problem is that I can't set it in any way that I'm aware of. I tried 4
> ways separately and simultaneously:
>
> 1) Adding --allowNonRestoredState to flink run-application
> -t kubernetes-application
> 2) Adding -Dexecution.savepoint.ignore-unclaimed-state=true to flink
> run-application -t kubernetes-application
> 3) Adding "execution.savepoint.ignore-unclaimed-state: true" to my local
> flink-conf.yaml where I'm running flink run-application
> 4) Overriding it in the application code:
> val sigh = new Configuration()
> sigh.setBoolean(SavepointConfigOptions.SAVEPOINT_IGNORE_UNCLAIMED_STATE,
> true)
> env.configure(sigh)
>
> Every time the resulting pod ends up with "false" value for this setting
> in its configmap:
> $ kc describe cm/flink-config-flink-test | grep ignore
> execution.savepoint.ignore-unclaimed-state: false
>
> And I get the exception:
> java.lang.IllegalStateException: Failed to rollback to
> checkpoint/savepoint . Cannot map checkpoint/savepoint state for
> operator 68895e9129981bfc6d96d1dad715298e to the new program, because the
> operator is not available in the new program. If you want to allow to skip
> this, you can set the --allowNonRestoredState option on the CLI.
>
> It seems like something overrides it to false and it never has any effect.
>
> Can this be a bug or am I doing something wrong?
>
> For context, the savepoint is produced by Flink 1.8.2 and the version I'm
> trying to run on K8S is 1.14.3.
>
> --
> With regards,
> Andrey Bulgakov
>
>


Re: Flink Overwrite parameters in ExecutorUtils

2022-02-18 Thread Austin Cawley-Edwards
Hi Dan,

I'm not exactly sure why, but could you help me understand the use case for
changing these parameters in Flink SQL?

Thanks,
Austin

On Fri, Feb 18, 2022 at 8:01 AM Zou Dan  wrote:

> Hi,
> I am using Flink Batch SQL in version 1.11. I find that Flink will
> overwrite some configurations in ExecutorUtils, which means this parameters
> bacome un-configurable, such as `pipeline.object-reuse` and
> `execution.buffer-timeout`, and the default value for this parameters will
> be not align with the doc
> https://nightlies.apache.org/flink/flink-docs-release-1.11/ops/config.html.
> I wonder if there is any special reason for this?
>
> Best,
> Dan Zou
>
>


Re: stream consume from kafka after DB scan is done

2021-11-05 Thread Austin Cawley-Edwards
Hey Qihua,

If I understand correctly, you should be able to with the HybridSource,
released in 1.14 [1]
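
An untested sketch of the wiring (it assumes the DB side is exposed through the new Source API; here a FileSource over an exported dump stands in for the DB scan, the paths and topic are placeholders, and class names like TextLineInputFormat may differ slightly between Flink versions):

import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.connector.base.source.hybrid.HybridSource;
import org.apache.flink.connector.file.src.FileSource;
import org.apache.flink.connector.file.src.reader.TextLineInputFormat;
import org.apache.flink.connector.kafka.source.KafkaSource;
import org.apache.flink.connector.kafka.source.enumerator.initializer.OffsetsInitializer;
import org.apache.flink.core.fs.Path;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class DbThenKafkaSketch {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        // Bounded "DB" part: a file dump of the table scan (placeholder path).
        FileSource<String> dbDump = FileSource
                .forRecordStreamFormat(new TextLineInputFormat(), new Path("s3://bucket/db-dump/"))
                .build();

        // Unbounded Kafka part, read only after the bounded source finishes.
        KafkaSource<String> kafka = KafkaSource.<String>builder()
                .setBootstrapServers("kafka:9092")   // placeholder
                .setTopics("events")                 // placeholder
                .setStartingOffsets(OffsetsInitializer.earliest())
                .setValueOnlyDeserializer(new SimpleStringSchema())
                .build();

        HybridSource<String> dbThenKafka = HybridSource.builder(dbDump)
                .addSource(kafka)
                .build();

        DataStream<String> stream =
                env.fromSource(dbThenKafka, WatermarkStrategy.noWatermarks(), "db-then-kafka");
        stream.print();
        env.execute("hybrid-source-sketch");
    }
}

The HybridSource only switches over to the Kafka part once the bounded first source reaches its end.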

Best,
Austin

[1]:
https://nightlies.apache.org/flink/flink-docs-release-1.14/docs/connectors/datastream/hybridsource/

On Fri, Nov 5, 2021 at 3:53 PM Qihua Yang  wrote:

> Hi,
>
> Our stream has two sources. One is a Kafka topic, one is a database. Is it
> possible to consume from kafka topic only after DB scan is completed? We
> configured it in batch mode.
>
> Thanks,
> Qihua
>


Re: Running a performance benchmark load test of Flink on new CPUs

2021-11-05 Thread Austin Cawley-Edwards
Hi Vijay,

I'm not too familiar with the subject, but maybe you could have a look at
the flink-faker[1], which generates fake data. I would think you could use
it to write to kafka in one Flink job, and then have another Flink job to
ingest and run your benchmarks.

There is also this microbenchmark repo[2], perhaps that could be useful to
run on different CPUs.

Hope those help,
Austin

[1]: https://github.com/knaufk/flink-faker
[2]: https://github.com/apache/flink-benchmarks

On Fri, Nov 5, 2021 at 5:14 PM Vijay Balakrishnan 
wrote:

> Hi,
> I am a newbie to running a performance benchmark load test of Flink on new
> CPUs.
> Is there an *existing workload generator* that I can use with Kafka and
> then ingest it with Flink KafkaConnector & test the performance against
> various new chips on servers ?
>
> Measuring CPU performance etc, vCPU usage, Latency, throughput etc.
> Pls pardon my ignorance in a lot of these performance related topics.
>
> TIA,
> Vijay
>


Re: IterativeCondition instead of SimpleCondition not matching pattern

2021-11-04 Thread Austin Cawley-Edwards
Thanks for the update, the requirements make sense.

Some follow up questions:
* What time characteristic are you using? Processing or Event?
* Can you describe a bit more what you mean by "input like the one I have
commented below"? What is special about the one you have commented out?

Best,
Austin

On Thu, Nov 4, 2021 at 4:09 PM Isidoros Ioannou  wrote:

>
>
> -- Forwarded message -
> From: Isidoros Ioannou 
> Date: Thu, Nov 4, 2021 at 10:01 PM
> Subject: Re: IterativeCondition instead of SimpleCondition not matching
> pattern
> To: Austin Cawley-Edwards 
>
>
> Hi Austin,
> thank you for your answer and I really appreciate your willingness to help.
>
> Actually the desired output is the one below
>
> {start=[Model{id=4, text='A', symbol='AU'}], second=[Model{id=5, text='B',
> symbol='AU'}], third=[Model{id=6, text='C', symbol='AU'}],
> fourth=[Model{id=8, text='D', symbol='AU'}]} {start=[Model{id=9, text='A',
> symbol='GB'}], second=[Model{id=10, text='B', symbol='GB'}],
> third=[Model{id=11, text='C', symbol='GB'}], fourth=[Model{id=12, text='D',
> symbol='GB'}]}
> I would like only to generate sequences of Models that have the same
> symbol. I noticed that if an event does not come as input,
> like the one I have commented out below, it breaks the whole pattern match and
> the desired output is never produced.
>
> DataStream inputStream = env.fromElements(
> Model.of(1, "A", "US"),
> Model.of(2, "B", "US"),
> Model.of(3, "C", "US"),
> Model.of(4, "A", "AU"),
> Model.of(5, "B", "AU"),
> Model.of(6, "C", "AU"),
>   //Model.of(7, "D", "US"),
> Model.of(8, "D", "AU"),
>         Model.of(9, "A", "GB"),
> Model.of(10, "B", "GB"),
> Model.of(13, "D", "GB"),
> Model.of(11, "C", "GB"),
> Model.of(12, "D", "GB")
>
> Kind Regards,
> Isidoros
>
>
> On Thu, Nov 4, 2021 at 8:40 PM Austin Cawley-Edwards <
> austin.caw...@gmail.com> wrote:
>
>> Hi Isidoros,
>>
>> Thanks for reaching out to the mailing list. I haven't worked with the
>> CEP library in a long time but can try to help. I'm having a little trouble
>> understanding the desired output + rules. Can you mock up the desired
>> output like you have for the fulfilled pattern sequence?
>>
>> Best,
>> Austin
>>
>> On Thu, Nov 4, 2021 at 4:12 AM Isidoros Ioannou 
>> wrote:
>>
>>>
>>> I face an issue when try to match some elements in a Pattern sequence.
>>> Flink 1.11.1 version. Here is my case:
>>>
>>> final StreamExecutionEnvironment env = EnvironmentProvider.getEnvironment();
>>> DataStream inputStream = env.fromElements(
>>> Model.of(1, "A", "US"),
>>> Model.of(2, "B", "US"),
>>> Model.of(3, "C", "US"),
>>> Model.of(4, "A", "AU"),
>>> Model.of(5, "B", "AU"),
>>> Model.of(6, "C", "AU"),
>>>   //Model.of(7, "D"),
>>> Model.of(8, "D", "AU"),
>>> Model.of(9, "A", "GB"),
>>> Model.of(10, "B", "GB"),
>>> Model.of(13, "D", "GB"),
>>> Model.of(11, "C", "GB"),
>>> Model.of(12, "D", "GB")
>>>
>>>
>>> 
>>> ).assignTimestampsAndWatermarks(WatermarkStrategy.forMonotonousTimestamps())
>>> .forceNonParallel();
>>>
>>> Pattern pattern = Pattern.begin("start", 
>>> AfterMatchSkipStrategy.skipToNext())
>>> .where(new IterativeCondition() {
>>> @Override
>>> public boolean filter(Model value, Context ctx) 
>>> throws Exception {
>>> return value.getText().equalsIgnoreCase("A");
>>> }
>>> }).followedBy("second")
>>> .where(new IterativeCondition() {
>>> @Override
>>> public boolean filter(Model value, Context ctx) 

Re: GenericWriteAheadSink, declined checkpoint for a finished source

2021-11-04 Thread Austin Cawley-Edwards
Hi James,

You are correct that since Flink 1.14 [1] (which included FLIP-147 [2])
there is support for checkpointing after some tasks have finished, which
sounds like it will solve this use case.

You may also want to look at the JDBC sink[3] which also supports batching,
as well as some other nice things like retries and batch intervals.
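
For example, a rough sketch of what that could look like dropped into your checkpointTest() in place of the transform(...) call (the SQL, connection details, and MyObj getters are placeholders I'm assuming):

// from flink-connector-jdbc
import org.apache.flink.connector.jdbc.JdbcConnectionOptions;
import org.apache.flink.connector.jdbc.JdbcExecutionOptions;
import org.apache.flink.connector.jdbc.JdbcSink;

jdbcStreamIn.addSink(JdbcSink.sink(
        "INSERT INTO trades (id, name) VALUES (?, ?)",   // placeholder SQL
        (statement, myObj) -> {
            statement.setInt(1, myObj.getId());          // assumed getters on MyObj
            statement.setString(2, myObj.getName());
        },
        JdbcExecutionOptions.builder()
                .withBatchSize(1000)       // rows per batch
                .withBatchIntervalMs(200)  // flush at least this often
                .withMaxRetries(3)
                .build(),
        new JdbcConnectionOptions.JdbcConnectionOptionsBuilder()
                .withUrl("jdbc:sqlserver://host:1433;databaseName=mydb")   // placeholder
                .withDriverName("com.microsoft.sqlserver.jdbc.SQLServerDriver")
                .withUsername("user")
                .withPassword("password")
                .build()));

Note the plain JdbcSink is at-least-once; there is also an exactly-once variant built on XA transactions if you need stronger guarantees later.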

Hope that helps,
Austin


[1]:
https://flink.apache.org/news/2021/09/29/release-1.14.0.html#checkpointing-and-bounded-streams
[2]:
https://cwiki.apache.org/confluence/display/FLINK/FLIP-147%3A+Support+Checkpoints+After+Tasks+Finished
[3]:
https://nightlies.apache.org/flink/flink-docs-release-1.14/docs/connectors/datastream/jdbc/

On Wed, Nov 3, 2021 at 12:25 PM James Sandys-Lumsdaine 
wrote:

> Hello,
>
> I have a Flink workflow where I need to upload the output data into a
> legacy SQL Server database and so I have read the section in the Flink book
> about data sinks and utilizing the GenericWriteAheadSink base class. I am
> currently using Flink 1.12.3 although we plan to upgrade to 1.14 shortly.
>
> Firstly, given I will be generating a large amount of data I feel it best
> to use the GenericWriteAheadSink base class so I can bulk copy all the data
> into my SQL Server database rather than attempt a row by row insertion
> which would be too slow. Hopefully this is a good use case for this class
> or is there now a better approach?
>
> Secondly, one thing I noticed is my JDBC source emits ~50,000 rows but the
> program actually exits before a final checkpoint is taken so I miss many
> of the final rows - I have to put in a Thread.sleep(5000) before allowing
> the JDBC source to exit. This might be related to FLINK-21215 as I see the
> following error:
> org.apache.flink.util.SerializedThrowable: Task NameSource: Trade JDBC
> Source (1/1)#0 Failure reason: Checkpoint was declined (tasks not ready)
> With the extra Thread.sleep(5000) I see all the rows handled by the
> sendValues() method.
>
> I have included the test code below which just logs the "insertions" for
> now (and doesn't do real db access) but demonstrates the problem:
>
> private void checkpointTest() throws Exception {
>     Configuration conf = new Configuration();
>     conf.setBoolean(ConfigConstants.LOCAL_START_WEBSERVER, true);
>     final StreamExecutionEnvironment env =
>         StreamExecutionEnvironment.createLocalEnvironmentWithWebUI(conf);
>     env.setParallelism(1);
>     env.enableCheckpointing(500);
>
>     MyJDBCSource myJDBCSource = new MyJDBCSource(tradesDBConnectionParams,
>         fromDttm, toDttm, asOf);
>     DataStream<MyObj> jdbcStreamIn = env.addSource(myJDBCSource, "My JDBC Source");
>
>     jdbcStreamIn.transform("SqlServerSink", TypeInformation.of(MyObj.class),
>         new SqlServerBulkCopySink(
>             new FileCheckpointCommitter("c:\\temp\\FlinkTemp"),
>             TypeExtractor.createTypeInfo(MyObj.class).createSerializer(new ExecutionConfig()),
>             UUID.randomUUID().toString()));
>
>     env.execute();
> }
>
> private static class SqlServerBulkCopySink extends GenericWriteAheadSink<MyObj> {
>     public SqlServerBulkCopySink(CheckpointCommitter committer,
>             TypeSerializer<MyObj> serializer, String jobID) throws Exception {
>         super(committer, serializer, jobID);
>     }
>
>     @Override
>     protected boolean sendValues(Iterable<MyObj> objects, long checkpointId, long timestamp) {
>         logger.info("Sending {},{} ---", checkpointId, timestamp);
>         for (MyObj myObj : objects)
>             logger.info("  {},{}: {}", checkpointId, timestamp, myObj); // this will eventually be a bulk copy insert into the SQL Server database
>         return true;
>     }
> }
>
>
>
> Am I right in thinking the latest versions of Flink will not suffer from
> this problem or am I hitting something else? To be clear, I am expecting a
> checkpoint to be invoked by Flink to cover all the data I want to insert
> into my DB - how else would I do the final bulk copy if my sendValues() is
> not called?
>
>
> I have more questions about my data sink but I will wait to hear your
> answers.
>
>
> Many thanks in advance,
>
>
> James.
>
>
>


Re: IterativeCondition instead of SimpleCondition not matching pattern

2021-11-04 Thread Austin Cawley-Edwards
Hi Isidoros,

Thanks for reaching out to the mailing list. I haven't worked with the CEP
library in a long time but can try to help. I'm having a little trouble
understanding the desired output + rules. Can you mock up the desired
output like you have for the fulfilled pattern sequence?

Best,
Austin

On Thu, Nov 4, 2021 at 4:12 AM Isidoros Ioannou  wrote:

>
> I face an issue when try to match some elements in a Pattern sequence.
> Flink 1.11.1 version. Here is my case:
>
> final StreamExecutionEnvironment env = EnvironmentProvider.getEnvironment();
> DataStream<Model> inputStream = env.fromElements(
> Model.of(1, "A", "US"),
> Model.of(2, "B", "US"),
> Model.of(3, "C", "US"),
> Model.of(4, "A", "AU"),
> Model.of(5, "B", "AU"),
> Model.of(6, "C", "AU"),
>   //Model.of(7, "D"),
> Model.of(8, "D", "AU"),
> Model.of(9, "A", "GB"),
> Model.of(10, "B", "GB"),
> Model.of(13, "D", "GB"),
> Model.of(11, "C", "GB"),
> Model.of(12, "D", "GB")
>
>
> 
> ).assignTimestampsAndWatermarks(WatermarkStrategy.forMonotonousTimestamps())
> .forceNonParallel();
>
> Pattern<Model, Model> pattern = Pattern.<Model>begin("start",
> AfterMatchSkipStrategy.skipToNext())
>         .where(new IterativeCondition<Model>() {
>             @Override
>             public boolean filter(Model value, Context ctx) throws Exception {
>                 return value.getText().equalsIgnoreCase("A");
>             }
>         }).followedBy("second")
>         .where(new IterativeCondition<Model>() {
>             @Override
>             public boolean filter(Model value, Context ctx) throws Exception {
>                 return value.getText().equalsIgnoreCase("B");
>             }
>         }).followedBy("third")
>         .where(new IterativeCondition<Model>() {
>             @Override
>             public boolean filter(Model value, Context ctx) throws Exception {
>                 return value.getText().equalsIgnoreCase("C");
>             }
>         }).followedBy("fourth")
>         .where(new IterativeCondition<Model>() {
>             @Override
>             public boolean filter(Model value, Context ctx) throws Exception {
>                 var list = StreamSupport.stream(ctx.getEventsForPattern("third").spliterator(), false)
>                         .collect(Collectors.toList());
>                 var val = list.get(0).getSymbol().equalsIgnoreCase(value.getSymbol());
>                 return value.getText().equalsIgnoreCase("D") && val;
>             }
>         });
>
>
> PatternStream marketOpenPatternStream = 
> CEP.pattern(inputStream, pattern);
>
>  SingleOutputStreamOperator> marketOpenOutput =
> marketOpenPatternStream
> .process(new PatternProcessFunction>() 
> {
> @Override
> public void processMatch(Map> 
> match, Context ctx, Collector> out) throws Exception {
> System.out.println(match);
> out.collect(new ArrayList(match.values()));
> }
> })
>
> What I am trying to achieve is to match only patterns that have the same
> symbol. If I use SimpleCondition with checks only on the text of the
> Model (A, B, C...) without the symbol check in the last pattern, the pattern
> sequence is fulfilled and I get the following output:
>
> {start=[Model{id=1, text='A', symbol='US'}], second=[Model{id=2, text='B',
> symbol='US'}], third=[Model{id=3, text='C', symbol='US'}], 
> fourth=[Model{id=8,text='D',symbol='AU'}]}
>
> {start=[Model{id=4, text='A', symbol='AU'}], second=[Model{id=5, text='B', 
> symbol='AU'}], third=[Model{id=6, text='C', symbol='AU'}], 
> fourth=[Model{id=8, text='D', symbol='AU'}]}
>
> {start=[Model{id=9, text='A', symbol='GB'}], second=[Model{id=10, text='B', 
> symbol='GB'}], third=[Model{id=11, text='C', symbol='GB'}], 
> fourth=[Model{id=12, text='D', symbol='GB'}]}
>
> However, I want to avoid the match of elements with id = 1 (A), 2 (B), 3 (C) with
> the element with id = 8 (D). For this reason I put the symbol check against the
> event matched in the previous pattern into the last condition, so that I don't get a
> match when they do not have the same symbol. But after applying the
> condition, I do not get any output at all; none of the elements match the
> pattern. What am I missing? Could someone help?
>
>


Re: Flink + K8s

2021-11-02 Thread Austin Cawley-Edwards
Hi Rommel,

That’s correct that K8s will restart the JM pod (assuming it’s been created
by a K8s Job or Deployment), and it will pick up the HA data and resume
work. The only use case for having multiple replicas is faster failover, so
you don’t have to wait for K8s to provision that new pod (which can be
potentially a decent amount of time, if the cluster is scaling up, etc.).

Hope that helps,
Austin

On Tue, Nov 2, 2021 at 4:36 PM Rommel Holmes  wrote:

> Hi,
>
> From my understanding, when I set Flink in HA mode in K8s, I don't need to
> set up more than 1 job manager, because once the job manager dies, K8s will
> restart it for me. Is that the correct understanding, or for the HA purpose,
> do I still need to set up more than 1 job manager?
>
> Thanks.
>
> Rommel
>
> --
>  Yours
>  Rommel
>
>


Re: HTTP or REST SQL Client

2021-10-02 Thread Austin Cawley-Edwards
Hi Declan,

I think the Flink-sql-gateway[1] is what you’re after, though I’m not sure
of its current state. I’m cc’ing Ingo, who may be able to help direct us.

Best,
Austin

[1]: https://github.com/ververica/flink-sql-gateway

On Sat, Oct 2, 2021 at 10:56 AM Declan Harrison 
wrote:

> Hi All
>
> I know Flink has support for a command line SQL client however I was
> wondering if there is any support for providing a RESTful SQL client over
> HTTP for example?  If not is this something the team would consider
> supporting in the near future?
>
> Thanks
> Declan
>


Re: Can we release new flink-connector-redis? Thanks!

2021-08-17 Thread Austin Cawley-Edwards
Hi Hongbo,

Thanks for your interest in the Redis connector! I'm not entirely sure what
the release process is like for Bahir, but I've pulled in @Robert Metzger
 who has been involved in the project in the past and
can give an update there.

Best,
Austin

On Tue, Aug 17, 2021 at 10:41 AM Hongbo Miao 
wrote:

> Hi Flink friends,
>
> I recently have a question about how to set TTL to make Redis keys expire
> in flink-connector-redis.
> I originally posted at Stack Overflow at
> https://stackoverflow.com/questions/68795044/how-to-set-ttl-to-make-redis-keys-expire-in-flink-connector-redis
>
> Then I found there is a pull request added this feature about 2 years ago
> at https://github.com/apache/bahir-flink/pull/66
> However, it didn’t got released, which confirmed by David in Stack
> Overflow.
>
> I opened a requesting release ticket at
> https://issues.apache.org/jira/browse/BAHIR-279
> Please let me know if I there is a better way to request. Thanks!
>
> Best
> Hongbo
> www.hongbomiao.com
>


Re: Recover from savepoints with Kubernetes HA

2021-07-23 Thread Austin Cawley-Edwards
Great, glad it was an easy fix :) Thanks for following up!

On Fri, Jul 23, 2021 at 3:54 AM Thms Hmm  wrote:

> Finally I found the mistake. I put the "--host 10.1.2.3" param as one
> argument. I think the savepoint argument was not interpreted correctly or
> ignored. It might be that the "-s" param was used as the value for "--host
> 10.1.2.3" and "s3p://…" as a new param, and because these are not valid
> arguments they were ignored.
>
> Not working:
>
> 23.07.2021 09:19:54.546 INFO
> org.apache.flink.runtime.entrypoint.ClusterEntrypoint -  Program Arguments:
>
> ...
>
> 23.07.2021 09:19:54.549 INFO
> org.apache.flink.runtime.entrypoint.ClusterEntrypoint - --host 10.1.2.3
>
> 23.07.2021 09:19:54.549 INFO
> org.apache.flink.runtime.entrypoint.ClusterEntrypoint - -s
>
> 23.07.2021 09:19:54.549 INFO
> org.apache.flink.runtime.entrypoint.ClusterEntrypoint -
> s3p://bucket/job1/savepoints/savepoint-00-1234
>
> -
>
> Working:
>
> 23.07.2021 09:19:54.546 INFO
> org.apache.flink.runtime.entrypoint.ClusterEntrypoint -  Program Arguments:
>
> ...
>
> 23.07.2021 09:19:54.549 INFO
> org.apache.flink.runtime.entrypoint.ClusterEntrypoint - --host
>
> 23.07.2021 09:19:54.549 INFO
> org.apache.flink.runtime.entrypoint.ClusterEntrypoint - 10.1.2.3
>
> 23.07.2021 09:19:54.549 INFO
> org.apache.flink.runtime.entrypoint.ClusterEntrypoint - -s
>
> 23.07.2021 09:19:54.549 INFO
> org.apache.flink.runtime.entrypoint.ClusterEntrypoint -
> s3p://bucket/job1/savepoints/savepoint-00-1234
>
> ...
>
> 23.07.2021 09:37:12.932 INFO
> org.apache.flink.runtime.checkpoint.CheckpointCoordinator - Starting job
>  from savepoint
> s3p://bucket/job1/savepoints/savepoint-00-1234 ()
>
> Thanks again for your help.
>
> Kr Thomas
>
> Yang Wang  schrieb am Fr. 23. Juli 2021 um 04:34:
>
>> Please note that when the job is canceled, the HA data(including the
>> checkpoint pointers) stored in the ConfigMap/ZNode will be deleted.
>>
>> But it is strange that the "-s/--fromSavepoint" does not take effect when
>> redeploying the Flink application. The JobManager logs could help a lot to
>> find the root cause.
>>
>> Best,
>> Yang
>>
>> Austin Cawley-Edwards  于2021年7月22日周四 下午11:09写道:
>>
>>> Hey Thomas,
>>>
>>> Hmm, I see no reason why you should not be able to update the checkpoint
>>> interval at runtime, and don't believe that information is stored in a
>>> savepoint. Can you share the JobManager logs of the job where this is
>>> ignored?
>>>
>>> Thanks,
>>> Austin
>>>
>>> On Wed, Jul 21, 2021 at 11:47 AM Thms Hmm  wrote:
>>>
>>>> Hey Austin,
>>>>
>>>> Thanks for your help.
>>>>
>>>> I tried to change the checkpoint interval as example. The value for it
>>>> comes from an additional config file and is read and set within main() of
>>>> the job.
>>>>
>>>> The job is running in Application mode. Basically the same
>>>> configuration as from the official Flink website but instead of running the
>>>> JobManager as job it is created as deployment.
>>>>
>>>> For the redeployment of the job the REST API is triggered to create a
>>>> savepoint and cancel the job. After completion the deployment is updated
>>>> and the pods are recreated. The -s  is always added as a
>>>> parameter to start the JobManager (standalone-job.sh). CLI is not involved.
>>>> We have automated these steps. But I tried the steps manually and have the
>>>> same results.
>>>>
>>>> I also tried to trigger a savepoint, scale the pods down, update the
>>>> start parameter with the recent savepoint and renamed
>>>> 'kubernetes.cluster-id' as well as 'high-availability.storageDir'.
>>>>
>>>> When I trigger a savepoint with cancel, I also see that the HA config
>>>> maps are cleaned up.
>>>>
>>>>
>>>> Kr Thomas
>>>>
>>>> Austin Cawley-Edwards  schrieb am Mi. 21.
>>>> Juli 2021 um 16:52:
>>>>
>>>>> Hi Thomas,
>>>>>
>>>>> I've got a few questions that will hopefully help get to find an
>>>>> answer:
>>>>>
>>>>> What job properties are you trying to change? Something like
>>>>> parallelism?
>>>>>
>>>>> What mode is your job running in? i.e., Session, Per

Re: Recover from savepoints with Kubernetes HA

2021-07-22 Thread Austin Cawley-Edwards
Hey Thomas,

Hmm, I see no reason why you should not be able to update the checkpoint
interval at runtime, and don't believe that information is stored in a
savepoint. Can you share the JobManager logs of the job where this is
ignored?

Thanks,
Austin

On Wed, Jul 21, 2021 at 11:47 AM Thms Hmm  wrote:

> Hey Austin,
>
> Thanks for your help.
>
> I tried to change the checkpoint interval as example. The value for it
> comes from an additional config file and is read and set within main() of
> the job.
>
> The job is running in Application mode. Basically the same configuration
> as from the official Flink website but instead of running the JobManager as
> job it is created as deployment.
>
> For the redeployment of the job the REST API is triggered to create a
> savepoint and cancel the job. After completion the deployment is updated
> and the pods are recreated. The -s  is always added as a
> parameter to start the JobManager (standalone-job.sh). CLI is not involved.
> We have automated these steps. But I tried the steps manually and have the
> same results.
>
> I also tried to trigger a savepoint, scale the pods down, update the start
> parameter with the recent savepoint and renamed 'kubernetes.cluster-id' as
> well as 'high-availability.storageDir'.
>
> When I trigger a savepoint with cancel, I also see that the HA config maps
> are cleaned up.
>
>
> Kr Thomas
>
> Austin Cawley-Edwards  schrieb am Mi. 21. Juli
> 2021 um 16:52:
>
>> Hi Thomas,
>>
>> I've got a few questions that will hopefully help get to find an answer:
>>
>> What job properties are you trying to change? Something like parallelism?
>>
>> What mode is your job running in? i.e., Session, Per-Job, or Application?
>>
>> Can you also describe how you're redeploying the job? Are you using the
>> Native Kubernetes integration or Standalone (i.e. writing k8s  manifest
>> files yourself)? It sounds like you are using the Flink CLI as well, is
>> that correct?
>>
>> Thanks,
>> Austin
>>
>> On Wed, Jul 21, 2021 at 4:05 AM Thms Hmm  wrote:
>>
>>> Hey,
>>>
>>> we have some application clusters running on Kubernetes and explore the
>>> HA mode which is working as expected. When we try to upgrade a job, e.g.
>>> trigger a savepoint, cancel the job and redeploy, Flink is not restarting
>>> from the savepoint we provide using the -s parameter. So all state is lost.
>>>
>>> If we just trigger the savepoint without canceling the job and redeploy
>>> the HA mode picks up from the latest savepoint.
>>>
>>> But this way we can not upgrade job properties as they were picked up
>>> from the savepoint as it seems.
>>>
>>> Is there any advice on how to do upgrades with HA enabled?
>>>
>>> Flink version is 1.12.2.
>>>
>>> Thanks for your help.
>>>
>>> Kr thomas
>>>
>>


Re: Recover from savepoints with Kubernetes HA

2021-07-21 Thread Austin Cawley-Edwards
Hi Thomas,

I've got a few questions that will hopefully help get to find an answer:

What job properties are you trying to change? Something like parallelism?

What mode is your job running in? i.e., Session, Per-Job, or Application?

Can you also describe how you're redeploying the job? Are you using the
Native Kubernetes integration or Standalone (i.e. writing k8s  manifest
files yourself)? It sounds like you are using the Flink CLI as well, is
that correct?

Thanks,
Austin

On Wed, Jul 21, 2021 at 4:05 AM Thms Hmm  wrote:

> Hey,
>
> we have some application clusters running on Kubernetes and explore the HA
> mode which is working as expected. When we try to upgrade a job, e.g.
> trigger a savepoint, cancel the job and redeploy, Flink is not restarting
> from the savepoint we provide using the -s parameter. So all state is lost.
>
> If we just trigger the savepoint without canceling the job and redeploy
> the HA mode picks up from the latest savepoint.
>
> But this way we can not upgrade job properties as they were picked up from
> the savepoint as it seems.
>
> Is there any advice on how to do upgrades with HA enabled?
>
> Flink version is 1.12.2.
>
> Thanks for your help.
>
> Kr thomas
>


Re: Running two versions of Flink with testcontainers

2021-07-13 Thread Austin Cawley-Edwards
Great, glad it worked out for you!

On Tue, Jul 13, 2021 at 10:32 AM Farouk  wrote:

> Thanks
>
> Finally I tried by running docker commands (thanks for the documentation)
> and it works fine.
>
> Thanks
> Farouk
>
> Le mar. 13 juil. 2021 à 15:48, Austin Cawley-Edwards <
> austin.caw...@gmail.com> a écrit :
>
>> You might also be able to put them in separate networks[1] to get around
>> changing all the ports and still ensuring that they don't see eachother.
>>
>> [1]:
>> https://www.testcontainers.org/features/networking#advanced-networking
>>
>> On Tue, Jul 13, 2021 at 9:07 AM Chesnay Schepler 
>> wrote:
>>
>>> It is possible but you need to make sure that all ports are configured
>>> such that the 2 clusters don't see each other.
>>>
>>> On 13/07/2021 13:21, Farouk wrote:
>>> > Hi
>>> >
>>> > For e2e testing, we run tests with testcontainers. We have several
>>> > jobs and we want to upgrade them one by one
>>> >
>>> > Do you know if it is possible in Docker to run one JM + one TM for
>>> > version 1 and version 2 at the same time?
>>> >
>>> > It looks like either the taskmanager registration is failing for the
>>> > second cluster, or there is a ClassCastException
>>> >
>>> > ERROR org.apache.flink.runtime.taskexecutor.TaskExecutor  -
>>> > Registration at ResourceManager failed due to an error
>>> > java.util.concurrent.CompletionException:
>>> > java.lang.ClassCastException: class
>>> > org.apache.flink.util.SerializedValue cannot be cast to class
>>> > org.apache.flink.runtime.registration.RegistrationResponse
>>> > (org.apache.flink.util.SerializedValue and
>>> > org.apache.flink.runtime.registration.RegistrationResponse are in
>>> > unnamed module of loader 'app')
>>> > at
>>> >
>>> java.base/java.util.concurrent.CompletableFuture.encodeThrowable(Unknown
>>> > Source)
>>> > at
>>> >
>>> java.base/java.util.concurrent.CompletableFuture.completeThrowable(Unknown
>>> > Source)
>>> > at
>>> >
>>> java.base/java.util.concurrent.CompletableFuture$UniAccept.tryFire(Unknown
>>> > Source)
>>> > at
>>> >
>>> java.base/java.util.concurrent.CompletableFuture$Completion.run(Unknown
>>> > Source)
>>> > at akka.dispatch.TaskInvocation.run(AbstractDispatcher.scala:40)
>>> > at
>>> >
>>> akka.dispatch.ForkJoinExecutorConfigurator$AkkaForkJoinTask.exec(ForkJoinExecutorConfigurator.scala:44)
>>> > at akka.dispatch.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
>>> > at
>>> >
>>> akka.dispatch.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
>>> > at
>>> akka.dispatch.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
>>> > at
>>> >
>>> akka.dispatch.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)
>>> > Caused by: java.lang.ClassCastException: class
>>> > org.apache.flink.util.SerializedValue cannot be cast to class
>>> > org.apache.flink.runtime.registration.RegistrationResponse
>>> > (org.apache.flink.util.SerializedValue and
>>> > org.apache.flink.runtime.registration.RegistrationResponse are in
>>> > unnamed module of loader 'app')
>>> > ... 8 common frames omitted
>>> >
>>> >
>>> > Thanks
>>> > Farouk
>>>
>>>
>>>


Re: Running two versions of Flink with testcontainers

2021-07-13 Thread Austin Cawley-Edwards
You might also be able to put them in separate networks[1] to get around
changing all the ports while still ensuring that they don't see each other.

[1]: https://www.testcontainers.org/features/networking#advanced-networking
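
For what it's worth, here is a rough sketch of that two-network setup with the
Testcontainers Network API (the image tags, the "jobmanager" alias, and the
FLINK_PROPERTIES value are my assumptions, not a tested configuration):

```java
import org.testcontainers.containers.GenericContainer;
import org.testcontainers.containers.Network;
import org.testcontainers.utility.DockerImageName;

// One isolated Docker network per Flink version, so a TaskManager can only
// ever register with the JobManager of its own cluster.
Network netV1 = Network.newNetwork();
Network netV2 = Network.newNetwork();

GenericContainer<?> jmV1 =
        new GenericContainer<>(DockerImageName.parse("flink:1.12.2-scala_2.12"))
                .withCommand("jobmanager")
                .withNetwork(netV1)
                .withNetworkAliases("jobmanager")
                .withEnv("FLINK_PROPERTIES", "jobmanager.rpc.address: jobmanager")
                .withExposedPorts(8081); // REST port for submitting the 1.12 jobs

GenericContainer<?> tmV1 =
        new GenericContainer<>(DockerImageName.parse("flink:1.12.2-scala_2.12"))
                .withCommand("taskmanager")
                .withNetwork(netV1)
                .withEnv("FLINK_PROPERTIES", "jobmanager.rpc.address: jobmanager");

// Repeat the two containers above with netV2 and a 1.13.x image for the
// second cluster, then start() all of them.
```

With each pair on its own network, a TaskManager can only resolve the
"jobmanager" alias of its own cluster, so the cross-version registration error
below can't happen.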

On Tue, Jul 13, 2021 at 9:07 AM Chesnay Schepler  wrote:

> It is possible but you need to make sure that all ports are configured
> such that the 2 clusters don't see each other.
>
> On 13/07/2021 13:21, Farouk wrote:
> > Hi
> >
> > For e2e testing, we run tests with testcontainers. We have several
> > jobs and we want to upgrade them one by one
> >
> > Do you know if it is possible in Docker to run one JM + one TM for
> > version 1 and version 2 at the same time?
> >
> > It looks like either the taskmanager registration is failing for the
> > second cluster, or there is a ClassCastException
> >
> > ERROR org.apache.flink.runtime.taskexecutor.TaskExecutor  -
> > Registration at ResourceManager failed due to an error
> > java.util.concurrent.CompletionException:
> > java.lang.ClassCastException: class
> > org.apache.flink.util.SerializedValue cannot be cast to class
> > org.apache.flink.runtime.registration.RegistrationResponse
> > (org.apache.flink.util.SerializedValue and
> > org.apache.flink.runtime.registration.RegistrationResponse are in
> > unnamed module of loader 'app')
> > at
> > java.base/java.util.concurrent.CompletableFuture.encodeThrowable(Unknown
> > Source)
> > at
> >
> java.base/java.util.concurrent.CompletableFuture.completeThrowable(Unknown
> > Source)
> > at
> >
> java.base/java.util.concurrent.CompletableFuture$UniAccept.tryFire(Unknown
> > Source)
> > at
> > java.base/java.util.concurrent.CompletableFuture$Completion.run(Unknown
> > Source)
> > at akka.dispatch.TaskInvocation.run(AbstractDispatcher.scala:40)
> > at
> >
> akka.dispatch.ForkJoinExecutorConfigurator$AkkaForkJoinTask.exec(ForkJoinExecutorConfigurator.scala:44)
> > at akka.dispatch.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
> > at
> >
> akka.dispatch.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
> > at akka.dispatch.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
> > at
> >
> akka.dispatch.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)
> > Caused by: java.lang.ClassCastException: class
> > org.apache.flink.util.SerializedValue cannot be cast to class
> > org.apache.flink.runtime.registration.RegistrationResponse
> > (org.apache.flink.util.SerializedValue and
> > org.apache.flink.runtime.registration.RegistrationResponse are in
> > unnamed module of loader 'app')
> > ... 8 common frames omitted
> >
> >
> > Thanks
> > Farouk
>
>
>


Re: Jobmanagers are in a crash loop after upgrade from 1.12.2 to 1.13.1

2021-07-01 Thread Austin Cawley-Edwards
Hi Shilpa,

I've confirmed that "recovered" jobs are not compatible between minor
versions of Flink (e.g., between 1.12 and 1.13). I believe the issue is
that the session cluster was upgraded to 1.13 without first stopping the
jobs running on it.

If this is the case, the workaround is to stop each job on the 1.12 session
cluster with a savepoint, upgrade the session cluster to 1.13, and then
resubmit each job with the desired savepoint.

Is that the case / does the procedure make sense?

Best,
Austin

On Thu, Jul 1, 2021 at 7:52 AM Shilpa Shankar 
wrote:

> Hi Zhu,
>
> Does this mean our upgrades are going to fail and the jobs are not backward
> compatible?
> I did verify the job itself is built using 1.13.0.
>
> Is there a workaround for this?
>
> Thanks,
> Shilpa
>
>
> On Wed, Jun 30, 2021 at 11:14 PM Zhu Zhu  wrote:
>
>> Hi Shilpa,
>>
>> JobType was introduced in 1.13. So I guess the cause is that the client
>> which creates and submits
>> the job is still 1.12.2. The client generates an outdated job graph which
>> does not have its JobType
>> set, resulting in this NPE problem.
>>
>> Thanks,
>> Zhu
>>
>> Austin Cawley-Edwards  wrote on Thu, Jul 1, 2021 at 1:54 AM:
>>
>>> Hi Shilpa,
>>>
>>> Thanks for reaching out to the mailing list and providing those logs!
>>> The NullPointerException looks odd to me, but in order to better guess
>>> what's happening, can you tell me a little bit more about what your setup
>>> looks like? How are you deploying, i.e., standalone with your own
>>> manifests, the Kubernetes integration of the Flink CLI, some open-source
>>> operator, etc.?
>>>
>>> Also, are you using a High Availability setup for the JobManager?
>>>
>>> Best,
>>> Austin
>>>
>>>
>>> On Wed, Jun 30, 2021 at 12:31 PM Shilpa Shankar 
>>> wrote:
>>>
>>>> Hello,
>>>>
>>>> We have a flink session cluster in kubernetes running on 1.12.2. We
>>>> attempted an upgrade to v 1.13.1, but the jobmanager pods are continuously
>>>> restarting and are in a crash loop.
>>>>
>>>> Logs are attached for reference.
>>>>
>>>> How do we recover from this state?
>>>>
>>>> Thanks,
>>>> Shilpa
>>>>
>>>


Re: Jobmanagers are in a crash loop after upgrade from 1.12.2 to 1.13.1

2021-06-30 Thread Austin Cawley-Edwards
Hi Shilpa,

Thanks for reaching out to the mailing list and providing those logs! The
NullPointerException looks odd to me, but in order to better guess what's
happening, can you tell me a little bit more about what your setup looks
like? How are you deploying, i.e., standalone with your own manifests, the
Kubernetes integration of the Flink CLI, some open-source operator, etc.?

Also, are you using a High Availability setup for the JobManager?

Best,
Austin


On Wed, Jun 30, 2021 at 12:31 PM Shilpa Shankar 
wrote:

> Hello,
>
> We have a flink session cluster in kubernetes running on 1.12.2. We
> attempted an upgrade to v 1.13.1, but the jobmanager pods are continuously
> restarting and are in a crash loop.
>
> Logs are attached for reference.
>
> How do we recover from this state?
>
> Thanks,
> Shilpa
>


Re: Protobuf + Confluent Schema Registry support

2021-06-30 Thread Austin Cawley-Edwards
Hi Vishal,

I don't believe there is another way to solve the problem currently besides
rolling your own serializer.
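
For what it's worth, the wrapping you describe usually ends up looking
something like the sketch below (untested; the class name and the contents of
the config map are placeholders, and the Confluent serializer is created
lazily on the task managers so its configure() call can still register
schemas against the registry):

```java
import com.google.protobuf.Message;
import io.confluent.kafka.serializers.protobuf.KafkaProtobufSerializer;
import org.apache.flink.streaming.connectors.kafka.KafkaSerializationSchema;
import org.apache.kafka.clients.producer.ProducerRecord;

import java.util.Map;

public class ProtobufRegistrySerializationSchema<T extends Message>
        implements KafkaSerializationSchema<T> {

    private final String topic;
    // Must be a serializable map (e.g. HashMap) containing schema.registry.url etc.
    private final Map<String, Object> serializerConfig;
    private transient KafkaProtobufSerializer<T> serializer;

    public ProtobufRegistrySerializationSchema(String topic, Map<String, Object> serializerConfig) {
        this.topic = topic;
        this.serializerConfig = serializerConfig;
    }

    @Override
    public ProducerRecord<byte[], byte[]> serialize(T element, Long timestamp) {
        if (serializer == null) {
            // Created per task, not on the client; configure() wires up the registry client.
            serializer = new KafkaProtobufSerializer<>();
            serializer.configure(serializerConfig, false); // false = value serializer
        }
        return new ProducerRecord<>(topic, serializer.serialize(topic, element));
    }
}
```

An instance of this can then be handed to one of the FlinkKafkaProducer
constructors that accept a KafkaSerializationSchema.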

For the Avro + Schema Registry format, is this Table API format[1] what
you're referring to? It doesn't look like there have been discussions around
adding a similar format for Protobuf yet, but perhaps you could start one
based on the avro one[2]?

Best,
Austin

[1]:
https://ci.apache.org/projects/flink/flink-docs-release-1.13/docs/connectors/table/formats/avro-confluent/
[2]:
https://issues.apache.org/jira/browse/FLINK-11160?jql=project%20%3D%20FLINK%20AND%20text%20~%20%22avro%20schema%22


On Wed, Jun 30, 2021 at 4:50 AM Vishal Surana  wrote:

> Using the vanilla kafka producer, I can write protobuf messages to kafka
> while leveraging schema registry support as well. A flink kafka producer
> requires us to explicitly provide a serializer which converts the message to
> a producerrecord containing the serialized bytes of the message. We can't
> make use of the KafkaProtobufSerializer[T] provided by Confluent. Thus the
> only way I could think of would be to create an instance of
> KafkaProtobufSerializer inside a FlinkSerializationSchema class and use it
> to serialize my messages. The problem with that would be that I would have
> to implement registration of the schema and other tasks done by
> KafkaProtobufSerializer.
>
> Is there any other way to solve this problem?
> Is there a plan to support protobuf serialization along with schema
> registry support?
> I noticed you've recently added Avro + Schema Registry support to your
> codebase but haven't documented it. Is it ready for use?
>


Re: Flink PrometheusReporter support for HTTPS

2021-06-16 Thread Austin Cawley-Edwards
Hi Ashutosh,

Sorry for the delayed response + thanks Robert for the good links and idea.

Alternatively, Flink on K8s is a perfect scenario for running a sidecar
proxy or gateway that handles HTTPS connections. The advantage here is that
you decouple managing SSL certifications + rotation from your actual Flink
applications. If you were to run an HTTPS server directly within Flink, any
changes to that server would inherently cause you to modify your running
Flink application (downtime). In either of the other solutions, Flink
applications stay running independently of "SSL management".

Running a gateway that exposes an HTTPS server, terminates TLS, and
forwards traffic to the Flink pods over HTTP is the simpler approach. You
will still have HTTP traffic to/from your Flink pods, though this may be
acceptable in some situations. For this approach, I would consider either a
Kong[1] or Nginx[2] gateway, which both have good K8s support.

The second option would be to introduce a service mesh like Kuma[3] or
Istio[4] (disclaimer: I'm a Kuma maintainer), which runs a proxy container
next to each of your Flink containers (i.e., on each JobManager and
TaskManager). This ensures that HTTPS traffic goes directly to each Pod,
but can be a bit more cumbersome to set up though they automate much more
of the SSL lifecycle than gateways.

Do those ideas make sense? There is a nice article on the CNCF blog about
API Gateways vs. Service Meshes[5], which I've found helpful in the past.

Best,
Austin

[1]: https://github.com/kong/kong
[2]: https://nginx.org/en/docs/http/configuring_https_servers.html
[3]: https://kuma.io/
[4]: https://istio.io/latest/
[5]:
https://www.cncf.io/blog/2020/03/06/the-difference-between-api-gateways-and-service-mesh/

On Wed, Jun 16, 2021 at 8:50 AM Robert Metzger  wrote:

> It seems like the PrometheusReporter doesn't support HTTPS.
>
> The Flink reporter seems to be based on the HttpServer prometheus client.
> I wonder if using the servlet client would allow us to add HTTPS support:
> https://github.com/prometheus/client_java/blob/master/simpleclient_servlet/src/main/java/io/prometheus/client/exporter/MetricsServlet.java
> / https://github.com/prometheus/client_java#http
> Running the Servlet inside an SSL enabled Jetty should do the trick.
>
> If this is reliable, you could consider contributing this back to Flink.
>
> On Sun, Jun 13, 2021 at 4:27 PM Ashutosh Uttam 
> wrote:
>
>> Hi Austin,
>>
>> I am deploying Flink on K8s with multiple Job Manager pods (For HA)  &
>> Task Manager pods.
>>
>> Each JobManager & Task Manager is running a PrometheusReporter instance
>> and using Prometheus’ service discovery support for Kubernetes to discover
>> all pods (Job Manager & Task Manager) and expose the containers as targets.
>>
>> Please let me know if a reverse proxy can work on this deployment as we
>> have multiple JMs & TMs and cannot use static scrape targets
>>
>> Regards,
>> Ashutosh
>>
>> On Sun, Jun 13, 2021 at 2:25 AM Austin Cawley-Edwards <
>> austin.caw...@gmail.com> wrote:
>>
>>> Hi Ashutosh,
>>>
>>> How are you deploying your Flink apps? Would running a reverse proxy
>>> like Nginx or Envoy that handles the HTTPS connection work for you?
>>>
>>> Best,
>>> Austin
>>>
>>> On Sat, Jun 12, 2021 at 1:11 PM Ashutosh Uttam 
>>> wrote:
>>>
>>>> Hi All,
>>>>
>>>> Does PrometheusReporter provide support for HTTPS? I couldn't find any
>>>> information in the Flink documentation.
>>>>
>>>> Is there any way we can achieve the same?
>>>>
>>>> Thanks & Regards,
>>>> Ashutosh
>>>>
>>>>
>>>>


Re: Flink PrometheusReporter support for HTTPS

2021-06-12 Thread Austin Cawley-Edwards
Hi Ashutosh,

How are you deploying your Flink apps? Would running a reverse proxy like
Nginx or Envoy that handles the HTTPS connection work for you?

Best,
Austin

On Sat, Jun 12, 2021 at 1:11 PM Ashutosh Uttam 
wrote:

> Hi All,
>
> Does PrometheusReporter provide support for HTTPS? I couldn't find any
> information in the Flink documentation.
>
> Is there any way we can achieve the same?
>
> Thanks & Regards,
> Ashutosh
>
>
>


Re: RabbitMQ source does not stop unless message arrives in queue

2021-05-18 Thread Austin Cawley-Edwards
Hey all,

Thanks for the details, John! Hmm, that doesn't look too good either, but it
is probably a different issue with the RMQ source/sink. Hopefully, the new
FLIP-27 sources will help you guys out there! The upcoming HybridSource in
FLIP-150 [1] might also be interesting to you for finely controlling sources.

@Jose Vargas  I've created FLINK-22698 [2] to
track your issue. Do you have a small reproducible case/ GitHub repo? Also,
would you be able to provide a little bit more about the Flink job that you
see this issue in? i.e. overall parallelism, the parallelism of the
sources/ sinks, checkpointing mode.

Best,
Austin

[1]:
https://cwiki.apache.org/confluence/display/FLINK/FLIP-150%3A+Introduce+Hybrid+Source
[2]: https://issues.apache.org/jira/browse/FLINK-22698

On Thu, May 13, 2021 at 9:25 PM John Morrow 
wrote:

> Hi Jose, hey Austin!!
>
> I know we were just recently looking at trying to consume a fixed number
> of messages from an RMQ source, process them and output them to an RMQ
> sink. As a naive first attempt at stopping the job when the target number
> of messaged had been processed, we put a counter state in the process
> function and tried throwing an exception when the counter >= the target
> message count.
>
> The job had:
>
>- parallelism: 1
>- checkpointing: 1000 (1 sec)
>- restartStrategy: noRestart
>- prefetchCount: 100
>
> Running it with 150 messages in the input queue and 150 also as the target
> number, at the end the queues had:
>
>- output queue - 150
>- input queue - 50
>
> So it looks like it did transfer all the messages, but some unack'd ones
> also got requeued back at the source, so they end up as duplicates. I know
> throwing an exception in the Flink job is not the same as triggering a
> stateful shutdown, but it might be hitting similar unack issues.
>
> John
>
> --
> *From:* Austin Cawley-Edwards 
> *Sent:* Thursday 13 May 2021 16:49
> *To:* Jose Vargas ; John Morrow <
> johnniemor...@hotmail.com>
> *Cc:* user 
> *Subject:* Re: RabbitMQ source does not stop unless message arrives in
> queue
>
> Hey Jose,
>
> Thanks for bringing this up – it indeed sounds like a bug. There is
> ongoing work to update the RMQ source to the new interface, which might
> address some of these issues (or should, if it is not already), tracked in
> FLINK-20628[1]. Would you be able to create a JIRA issue for this, or would
> you like me to?
>
> At my previous company, we only consumed one Rabbit queue per application,
> so we didn't run into this exactly but did see other weird behavior in the
> RMQ source that could be related. I'm going to cc @John Morrow
>  who might be able to contribute to what he's
> seen working with the source, if he's around. I remember some messages not
> properly being ack'ed during a stateful shutdown via the Ververica
> Platform's stop-with-savepoint functionality that you mention, though that
> might be more related to FLINK-20244[2], perhaps.
>
>
> Best,
> Austin
>
> [1]: https://issues.apache.org/jira/browse/FLINK-20628
> [2]: https://issues.apache.org/jira/browse/FLINK-20244
>
> On Thu, May 13, 2021 at 10:23 AM Jose Vargas 
> wrote:
>
> Hi,
>
> I am using Flink 1.12 to read from and write to a RabbitMQ cluster.
> Flink's RabbitMQ source has some surprising behavior when a
> stop-with-savepoint request is made.
>
> *Expected Behavior:*
> The stop-with-savepoint request stops the job with a FINISHED state.
>
> *Actual Behavior:*
> The stop-with-savepoint request either times out or hangs indefinitely
> unless a message arrives in all the queues that the job consumes from after
> the stop-with-savepoint request is made.
>
>
> I know that one possible workaround is to send a sentinel value to each of
> the queues consumed by the job that the deserialization schema checks in
> its isEndOfStream method. However, this is somewhat cumbersome and
> complicates the continuous delivery of a Flink job. For example,
> Ververica Platform will trigger a stop-with-savepoint for the user if one
> of many possible Flink configurations for a job are changed. The
> stop-with-savepoint can then hang indefinitely because only some of the
> RabbitMQ sources will have reached a FINISHED state.
>
> I have attached the TaskManager thread dump after the save-with-savepoint
> request was made. Most every thread is either sleeping or waiting around
> for locks to be released, and then there are a handful of threads trying to
> read data from a socket via the com.rabbitmq.client.impl.Frame.readFrom
> method.
>
> Ideally, once a stop-with-savepoint request is made, the threads trying to
> read data from RabbitMQ would be interrupted so that all RabbitMQ sources
> would reach a FINISHED state.

Re: Helm chart for Flink

2021-05-18 Thread Austin Cawley-Edwards
Hey all,

Yeah, I'd be interested to see the Helm pre-upgrade hook setup, though I'd
agree with you, Alexey, that it does not provide enough control to be a
stable solution.

@Pedro Silva  I don't know if there are talks for an
official operator yet, but Kubernetes support is important to the community
and has heavy investment, as seen on the Roadmap[1], so it will only get
better! Currently, the recommended way to get started on Kubernetes is with
the "Native Kubernetes" resource manager[2], which allows running Flink
Applications on Kubernetes from the command line and might work for you.

Hope that helps!
Austin

[1]: https://flink.apache.org/roadmap.html
[2]:
https://ci.apache.org/projects/flink/flink-docs-release-1.13/docs/deployment/resource-providers/native_kubernetes

On Mon, May 17, 2021 at 7:31 PM Alexey Trenikhun  wrote:

> I think it should be possible to use a Helm pre-upgrade hook to take a
> savepoint and stop the currently running job, and then Helm will upgrade the
> image tags. The problem is that if you hit a timeout while taking the
> savepoint, it is not clear how to recover from this situation.
>
> Alexey
> ----------
> *From:* Austin Cawley-Edwards 
> *Sent:* Monday, May 17, 2021 1:02 PM
> *To:* Pedro Silva
> *Cc:* user
> *Subject:* Re: Helm chart for Flink
>
> Hi Pedro,
>
> There is currently no official Kubernetes Operator for Flink and, by
> extension, there is no official Helm chart. It would be relatively easy to
> create a chart for simply deploying standalone Flink resources via the
> Kubernetes manifests described here[1], though it would leave out the
> ability to upgrade your Flink application via Helm.
>
> If you need upgrade capabilities (which most people do) *and* need to use
> Helm, the Kubernetes Operator approach is the only option for an
> "all-in-one" experience. In addition to the GCP Operator you mentioned,
> there's also a Helm chart for Lyft's Operator by lightbend[2] as well as an
> operator for the Ververica Platform with support for Helm that I've built
> here[3].
>
>
> Are you already running Flink on Kubernetes, or just looking to get
> started easily?
>
> Best,
> Austin
>
>
> [1]:
> https://ci.apache.org/projects/flink/flink-docs-release-1.13/docs/deployment/resource-providers/standalone/kubernetes/
> [2]: https://github.com/lightbend/flink-operator
> [3]:
> https://github.com/fintechstudios/ververica-platform-k8s-operator/blob/master/docs/guides/deployment.md
>
>
> On Mon, May 17, 2021 at 11:01 AM Pedro Silva 
> wrote:
>
>> Hello,
>>
>> Forwarding this question from the dev mailing list in case this is a more
>> appropriate list.
>>
>> Does flink have an official Helm Chart? I haven't been able to find any,
>> the closest most up-to-date one seems to be
>> https://github.com/GoogleCloudPlatform/flink-on-k8s-operator.
>> Is this correct or is there a more mature and/or recommended Helm chart to
>> use?
>>
>> Thank you.
>>
>


Re: Helm chart for Flink

2021-05-17 Thread Austin Cawley-Edwards
Hi Pedro,

There is currently no official Kubernetes Operator for Flink and, by
extension, there is no official Helm chart. It would be relatively easy to
create a chart for simply deploying standalone Flink resources via the
Kubernetes manifests described here[1], though it would leave out the
ability to upgrade your Flink application via Helm.

If you need upgrade capabilities (which most people do) *and* need to use
Helm, the Kubernetes Operator approach is the only option for an
"all-in-one" experience. In addition to the GCP Operator you mentioned,
there's also a Helm chart for Lyft's Operator by lightbend[2] as well as an
operator for the Ververica Platform with support for Helm that I've built
here[3].


Are you already running Flink on Kubernetes, or just looking to get started
easily?

Best,
Austin


[1]:
https://ci.apache.org/projects/flink/flink-docs-release-1.13/docs/deployment/resource-providers/standalone/kubernetes/
[2]: https://github.com/lightbend/flink-operator
[3]:
https://github.com/fintechstudios/ververica-platform-k8s-operator/blob/master/docs/guides/deployment.md


On Mon, May 17, 2021 at 11:01 AM Pedro Silva  wrote:

> Hello,
>
> Forwarding this question from the dev mailing list in case this is a more
> appropriate list.
>
> Does flink have an official Helm Chart? I haven't been able to find any,
> the closest most up-to-date one seems to be
> https://github.com/GoogleCloudPlatform/flink-on-k8s-operator.
> Is this correct or is there a more mature and/or recommended Helm chart to
> use?
>
> Thank you.
>


Re: RabbitMQ source does not stop unless message arrives in queue

2021-05-13 Thread Austin Cawley-Edwards
Hey Jose,

Thanks for bringing this up – it indeed sounds like a bug. There is ongoing
work to update the RMQ source to the new interface, which might address
some of these issues (or should, if it is not already), tracked in
FLINK-20628[1]. Would you be able to create a JIRA issue for this, or would
you like me to?

At my previous company, we only consumed one Rabbit queue per application,
so we didn't run into this exactly but did see other weird behavior in the
RMQ source that could be related. I'm going to cc @John Morrow
 who might be able to contribute to what he's
seen working with the source, if he's around. I remember some messages not
properly being ack'ed during a stateful shutdown via the Ververica
Platform's stop-with-savepoint functionality that you mention, though that
might be more related to FLINK-20244[2], perhaps.


Best,
Austin

[1]: https://issues.apache.org/jira/browse/FLINK-20628
[2]: https://issues.apache.org/jira/browse/FLINK-20244

On Thu, May 13, 2021 at 10:23 AM Jose Vargas 
wrote:

> Hi,
>
> I am using Flink 1.12 to read from and write to a RabbitMQ cluster.
> Flink's RabbitMQ source has some surprising behavior when a
> stop-with-savepoint request is made.
>
> *Expected Behavior:*
> The stop-with-savepoint request stops the job with a FINISHED state.
>
> *Actual Behavior:*
> The stop-with-savepoint request either times out or hangs indefinitely
> unless a message arrives in all the queues that the job consumes from after
> the stop-with-savepoint request is made.
>
>
> I know that one possible workaround is to send a sentinel value to each of
> the queues consumed by the job that the deserialization schema checks in
> its isEndOfStream method. However, this is somewhat cumbersome and
> complicates the continuous delivery of a Flink job. For example,
> Ververica Platform will trigger a stop-with-savepoint for the user if one
> of many possible Flink configurations for a job are changed. The
> stop-with-savepoint can then hang indefinitely because only some of the
> RabbitMQ sources will have reached a FINISHED state.
>
> I have attached the TaskManager thread dump after the save-with-savepoint
> request was made. Most every thread is either sleeping or waiting around
> for locks to be released, and then there are a handful of threads trying to
> read data from a socket via the com.rabbitmq.client.impl.Frame.readFrom
> method.
>
> Ideally, once a stop-with-savepoint request is made, the threads trying to
> read data from RabbitMQ would be interrupted so that all RabbitMQ sources
> would reach a FINISHED state.
>
> Regular checkpoints and savepoints complete successfully, it is only the
> stop-with-savepoint request where I see this behavior.
>
>
> Respectfully,
>
>
> Jose Vargas
>
> Software Engineer, Data Engineering
>
> E: jose.var...@fiscalnote.com
>
> fiscalnote.com  |  info.cq.com  |  rollcall.com
>
>
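
The sentinel-value workaround described above boils down to a
DeserializationSchema along these lines (minimal sketch; the sentinel payload
is made up and has to be agreed on with whatever publishes to the queues):

```java
import org.apache.flink.api.common.serialization.DeserializationSchema;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.common.typeinfo.Types;

import java.io.IOException;
import java.nio.charset.StandardCharsets;

public class SentinelAwareSchema implements DeserializationSchema<String> {

    // Hypothetical sentinel payload; it must be sent once per consumed queue
    // before shutting the job down.
    private static final String SENTINEL = "__STOP__";

    @Override
    public String deserialize(byte[] message) throws IOException {
        return new String(message, StandardCharsets.UTF_8);
    }

    @Override
    public boolean isEndOfStream(String nextElement) {
        // Returning true makes the RMQ source finish instead of blocking on the queue.
        return SENTINEL.equals(nextElement);
    }

    @Override
    public TypeInformation<String> getProducedType() {
        return Types.STRING;
    }
}
```

Each source subtask reaches FINISHED as soon as isEndOfStream() returns true
for a deserialized element, which is what lets the stop-with-savepoint
complete, at the cost of the operational overhead discussed above.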


Re: Apache Flink - A question about Tables API and SQL interfaces

2021-05-13 Thread Austin Cawley-Edwards
Hi Mans,

There are currently no public APIs for doing so, though if you're willing
to deal with some breaking changes there are some experimental config
options for late events in the Table API and SQL, seen in the
WIndowEmitStrategy class[1].

Best,
Austin

[1]:
https://github.com/apache/flink/blob/607919c6ea6983ae5ad3f82d63b7d6455c73d225/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/utils/WindowEmitStrategy.scala#L173-L211

On Wed, May 12, 2021 at 5:12 PM M Singh  wrote:

> Thanks Austin for your helpful references.
>
> I did take a look at [2]/[3] - but did not find anything relevant on
> searching for string 'late' (for allowed lateness etc) or side output.  So
> from my understanding the late events will be dropped if I am using Table
> API or SQL and the only option is to use datastream interface.  Please let
> me know if I missed anything.
>
> Thanks again.
>
>
> On Wednesday, May 12, 2021, 04:36:06 PM EDT, Austin Cawley-Edwards <
> austin.caw...@gmail.com> wrote:
>
>
> Hi Mans,
>
> I don't believe there are explicit triggers/evictors/timers in the Table
> API/ SQL, as that is abstracted away from the lower-level DataStream API.
> If you need to get into the fine-grained details, Flink 1.13 has made some
> good improvements in going from the Table API to the DataStream API, and
> back again. [1]
>
> For working with time and lateness with Table API and SQL, some good
> places to look are the GroupBy Window Aggregation section of the Table API
> docs[2], as well as the SQL cookbook[3] and Ververica's SQL training
> wiki[4].
>
> Hope that helps,
> Austin
>
> [1]:
> https://flink.apache.org/news/2021/05/03/release-1.13.0.html#improved-interoperability-between-datastream-api-and-table-apisql
> [2]:
> https://ci.apache.org/projects/flink/flink-docs-release-1.13/docs/dev/table/tableapi/#groupby-window-aggregation
> [3]:
> https://github.com/ververica/flink-sql-cookbook#aggregations-and-analytics
> [4]: https://github.com/ververica/sql-training/wiki/Queries-and-Time
>
> On Wed, May 12, 2021 at 1:30 PM M Singh  wrote:
>
> Hey Folks:
>
> I have the following questions regarding Table API/SQL in streaming mode:
>
> 1. Is there a notion of triggers/evictors/timers when using the Table API or
> SQL interfaces?
> 2. Is there anything like side outputs and the ability to define allowed
> lateness when dealing with the Table API or SQL interfaces?
>
> If there are any alternate ways for the above when using Table API or SQL,
> please let me know where I can find the relevant documentation/examples.
>
> Thanks for your help.
>
> Mans
>
>
>
>
>


Re: Regarding Stateful Functions

2021-05-12 Thread Austin Cawley-Edwards
Hey Jessy,

I'm not a Statefun expert but, hopefully, I can point you in the right
direction for some of your questions. I'll also cc Gordon, who helps to
maintain Statefun.

*1. Is the stateful function a good candidate for a system(as above) that
> should process incoming requests at the rate of 10K/s depending on various
> dynamic rules and static rules*
> *? *
>

The scale is definitely manageable in a Statefun cluster, and could
possibly be a good fit for dynamic and static rules. Hopefully Gordon can
comment more there. For the general Flink solution to this problem, I
always turn to this great series of blog posts around fraud detection with
dynamic rules[1].
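
The heart of that pattern is a keyed event stream connected to a broadcast
stream of rule updates. A rough, self-contained sketch is below; the
Rule/Event types, the example sources, and the matching logic are placeholders
for whatever your rules really look like:

```java
import org.apache.flink.api.common.state.MapStateDescriptor;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.common.typeinfo.Types;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.co.KeyedBroadcastProcessFunction;
import org.apache.flink.util.Collector;

import java.util.Map;

public class DynamicRulesSketch {

    // Placeholder types; real rules/events will be richer.
    public static class Rule { public String id; public String expression; }
    public static class Event { public String key; public String payload; }

    static final MapStateDescriptor<String, Rule> RULES =
            new MapStateDescriptor<>("rules", Types.STRING, TypeInformation.of(Rule.class));

    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        // Stand-ins for the real 10K/s event source and the low-volume rule-update source.
        Event event = new Event(); event.key = "user-1"; event.payload = "...";
        Rule rule = new Rule(); rule.id = "rule-1"; rule.expression = "...";
        DataStream<Event> events = env.fromElements(event);
        DataStream<Rule> ruleUpdates = env.fromElements(rule);

        events.keyBy(e -> e.key)
                .connect(ruleUpdates.broadcast(RULES))
                .process(new ApplyRules())
                .print();

        env.execute("dynamic-rules-sketch");
    }

    static class ApplyRules extends KeyedBroadcastProcessFunction<String, Event, Rule, String> {
        @Override
        public void processBroadcastElement(Rule rule, Context ctx, Collector<String> out) throws Exception {
            // Every parallel instance sees every rule update and keeps it in broadcast state.
            ctx.getBroadcastState(RULES).put(rule.id, rule);
        }

        @Override
        public void processElement(Event event, ReadOnlyContext ctx, Collector<String> out) throws Exception {
            for (Map.Entry<String, Rule> e : ctx.getBroadcastState(RULES).immutableEntries()) {
                // Evaluate e.getValue() against the event here; emit whatever the job needs.
                out.collect(e.getKey() + " evaluated for key " + event.key);
            }
        }
    }
}
```

The blog series in [1] walks through a much more complete version of this
pattern, including rule scoping and state cleanup.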

2.* Is Flink capable of accommodating the above-mentioned dynamic rules in
> its states (about 1500 rules per Keyed Event ) for the faster
> transformation of incoming streams? *
>

This may be manageable as well, depending on how you are applying these
rules and what they look like (size, etc.). Can you give any more
information there?


*3.** I**f we are not interested in using AWS lambda or Azure functions,
> what are the other options?. What about using co-located functions and
> embedded functions? * *Is there any benefit in using one over the other
> for my data processing flow?*
>

Yes, you can embed JVM functions via Embedded Modules[2], which in your
case might benefit from the Flink DataStream integration[3]. You can also
host remote functions anywhere, e.g. on Kubernetes, behind an NGINX server,
etc. The Module Configuration section[4] will likely shed more light on
what is available. I think the main tradeoffs here are availability,
scalability, and network latency for external functions.

4*.If we are going with embedded functions/co-located functions, is it
> possible to autoscale the application using the recently released reactive
> mode in Flink 1.13?*
>

Statefun 3.0 uses Flink 1.12 but is expected to upgrade to Flink 1.13 in
the next release cycle. There are a few other changes that are necessary to
be compatible with Reactive Mode (i.e., making the Statefun Cluster a regular
Flink Application, tracked in FLINK-16930 [5]), but it's coming!


On a higher note, what made you interested in Statefun for this use case?
The community is currently trying to expand our understanding of potential
users, so it would be great to hear a bit more!

Best,
Austin

[1]: https://flink.apache.org/news/2020/01/15/demo-fraud-detection.html
[2]:
https://ci.apache.org/projects/flink/flink-statefun-docs-release-3.0/docs/deployment/embedded/#embedded-module-configuration
[3]:
https://ci.apache.org/projects/flink/flink-statefun-docs-release-3.0/docs/sdk/flink-datastream/
[4]:
https://ci.apache.org/projects/flink/flink-statefun-docs-release-3.0/docs/deployment/module/#module-configuration
[5]: https://issues.apache.org/jira/browse/FLINK-16930

On Wed, May 12, 2021 at 11:53 AM Jessy Ping 
wrote:

> Hi all,
>
>
> I have gone through the stateful function's documentation and required
> some expert advice or clarification regarding the following points.
>
>
> *Note: My data processing flow is as follows,*
>
>
> *ingress(10k/s)--> First transformation based on certain static rules -->
> second transformation based on certain dynamic rules --> Third and final
> transformation based on certain dynamic and static rules --> egress*
>
>
> *Questions*
>
> *1. Is the stateful function a good candidate for a system(as above) that
> should process incoming requests at the rate of 10K/s depending on various
> dynamic rules and static rules**? *
>
>
> 2.* Is Flink capable of accommodating the above-mentioned dynamic rules
> in its states (about 1500 rules per Keyed Event ) for the faster
> transformation of incoming streams? *
>
>
> *3.** I**f we are not interested in using AWS lambda or Azure functions,
> what are the other options?. What about using co-located functions and
> embedded functions? * *Is there any benefit in using one over the other
> for my data processing flow?*
>
>
> 4*.If we are going with embedded functions/co-located functions, is it
> possible to autoscale the application using the recently released reactive
> mode in Flink 1.13?*
>
>
> *Thanks*
>
> *Jessy*
>
>
>


Re: Apache Flink - A question about Tables API and SQL interfaces

2021-05-12 Thread Austin Cawley-Edwards
Hi Mans,

I don't believe there are explicit triggers/evictors/timers in the Table
API/ SQL, as that is abstracted away from the lower-level DataStream API.
If you need to get into the fine-grained details, Flink 1.13 has made some
good improvements in going from the Table API to the DataStream API, and
back again. [1]

For working with time and lateness with Table API and SQL, some good places
to look are the GroupBy Window Aggregation section of the Table API
docs[2], as well as the SQL cookbook[3] and Ververica's SQL training
wiki[4].
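
As a small illustration of the GroupBy window route (the table and column
names are made up), a tumbling event-time window in the Java Table API looks
roughly like:

```java
import static org.apache.flink.table.api.Expressions.$;
import static org.apache.flink.table.api.Expressions.lit;

import org.apache.flink.table.api.Table;
import org.apache.flink.table.api.Tumble;

// `orders` is assumed to be a Table with an event-time attribute `rowtime`.
Table result = orders
        .window(Tumble.over(lit(10).minutes()).on($("rowtime")).as("w"))
        .groupBy($("w"), $("user"))
        .select(
                $("user"),
                $("w").start().as("window_start"),
                $("amount").sum().as("total_amount"));
```

The same query can also be written in SQL with the TUMBLE group window if you
prefer that route.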

Hope that helps,
Austin

[1]:
https://flink.apache.org/news/2021/05/03/release-1.13.0.html#improved-interoperability-between-datastream-api-and-table-apisql
[2]:
https://ci.apache.org/projects/flink/flink-docs-release-1.13/docs/dev/table/tableapi/#groupby-window-aggregation
[3]:
https://github.com/ververica/flink-sql-cookbook#aggregations-and-analytics
[4]: https://github.com/ververica/sql-training/wiki/Queries-and-Time

On Wed, May 12, 2021 at 1:30 PM M Singh  wrote:

> Hey Folks:
>
> I have the following questions regarding Table API/SQL in streaming mode:
>
> 1. Is there a notion of triggers/evictors/timers when using the Table API or
> SQL interfaces?
> 2. Is there anything like side outputs and the ability to define allowed
> lateness when dealing with the Table API or SQL interfaces?
>
> If there are any alternate ways for the above when using Table API or SQL,
> please let me know where I can find the relevant documentation/examples.
>
> Thanks for your help.
>
> Mans
>
>
>
>
>


Re: Setup of Scala/Flink project using Bazel

2021-05-12 Thread Austin Cawley-Edwards
I know @Aaron Levin  is using `rules_scala` for
building Flink apps, perhaps he can help us out here (and hope he doesn't
mind the ping).



On Wed, May 12, 2021 at 4:13 PM Austin Cawley-Edwards <
austin.caw...@gmail.com> wrote:

> Yikes, I see what you mean. I also cannot get `neverlink`, or adding the
> org.scala.lang artifacts to the deploy_env, to remove them from the uber jar.
>
> I'm not super familiar with sbt/ scala, but do you know how exactly the
> assembly `includeScala` works? Is it just a flag that is passed to scalac?
>
> I've found where rules_scala defines how to call `scalac`, but am lost
> here[1].
>
> Best,
> Austin
>
> [1]:
> https://github.com/bazelbuild/rules_scala/blob/c9cc7c261d3d740eb91ef8ef048b7cd2229d12ec/scala/private/rule_impls.bzl#L72-L139
>
> On Wed, May 12, 2021 at 3:20 PM Salva Alcántara 
> wrote:
>
>> Hi Austin,
>>
>> Yep, removing Flink dependencies is working well as you pointed out.
>>
>> The problem now is that I would also need to remove the scala library...by
>> inspecting the jar you will see a lot of scala-related classes. If you
>> take
>> a look at the end of the build.sbt file, I have
>>
>> ```
>> // exclude Scala library from assembly
>> assembly / assemblyOption  := (assembly /
>> assemblyOption).value.copy(includeScala = false)
>> ```
>>
>> so the fat jar generated by running `sbt assembly` does not contain
>> scala-related classes, which are also "provided". You can compare the
>> bazel-built jar with the one built by sbt
>>
>> ```
>> $ jar tf target/scala-2.12/bazel-flink-scala-assembly-0.1-SNAPSHOT.jar
>> META-INF/MANIFEST.MF
>> org/
>> org/example/
>> BUILD
>> log4j.properties
>> org/example/WordCount$$anon$1$$anon$2.class
>> org/example/WordCount$$anon$1.class
>> org/example/WordCount$.class
>> org/example/WordCount.class
>> ```
>>
>> Note that there are neither Flink nor Scala classes. In the jar generated
>> by
>> bazel, however, I can still see Scala classes...
>>
>>
>>
>> --
>> Sent from:
>> http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/
>>
>


Re: Setup of Scala/Flink project using Bazel

2021-05-12 Thread Austin Cawley-Edwards
Yikes, I see what you mean. I also cannot get `neverlink`, or adding the
org.scala.lang artifacts to the deploy_env, to remove them from the uber jar.

I'm not super familiar with sbt/ scala, but do you know how exactly the
assembly `includeScala` works? Is it just a flag that is passed to scalac?

I've found where rules_scala defines how to call `scalac`, but am lost
here[1].

Best,
Austin

[1]:
https://github.com/bazelbuild/rules_scala/blob/c9cc7c261d3d740eb91ef8ef048b7cd2229d12ec/scala/private/rule_impls.bzl#L72-L139

On Wed, May 12, 2021 at 3:20 PM Salva Alcántara 
wrote:

> Hi Austin,
>
> Yep, removing Flink dependencies is working well as you pointed out.
>
> The problem now is that I would also need to remove the scala library...by
> inspecting the jar you will see a lot of scala-related classes. If you take
> a look at the end of the build.sbt file, I have
>
> ```
> // exclude Scala library from assembly
> assembly / assemblyOption  := (assembly /
> assemblyOption).value.copy(includeScala = false)
> ```
>
> so the fat jar generated by running `sbt assembly` does not contain
> scala-related classes, which are also "provided". You can compare the
> bazel-built jar with the one built by sbt
>
> ```
> $ jar tf target/scala-2.12/bazel-flink-scala-assembly-0.1-SNAPSHOT.jar
> META-INF/MANIFEST.MF
> org/
> org/example/
> BUILD
> log4j.properties
> org/example/WordCount$$anon$1$$anon$2.class
> org/example/WordCount$$anon$1.class
> org/example/WordCount$.class
> org/example/WordCount.class
> ```
>
> Note that there are neither Flink nor Scala classes. In the jar generated
> by
> bazel, however, I can still see Scala classes...
>
>
>
> --
> Sent from:
> http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/
>


Re: Setup of Scala/Flink project using Bazel

2021-05-12 Thread Austin Cawley-Edwards
Hi Salva,

I think you're almost there. Confusion is definitely not helped by the
ADDONS/ PROVIDED_ADDONS thingy – I think I tried to get too fancy with that
in the linked thread.

I think the only thing you have to do differently is to adjust the target
you are building/ deploying – instead of
`//src/main/scala/org/example:flink_app_deploy.jar`, your target with the
provided env applied is
`//src/main/scala/org/example:word_count_deploy.jar`. I've verified this in
the following ways:

1. Building and checking the JAR itself
* bazel build //src/main/scala/org/example:word_count_deploy.jar
* jar -tf bazel-bin/src/main/scala/org/example/word_count_deploy.jar | grep
flink
  * Shows only the tools/flink/NoOp class

2. Running the word count jar locally, to ensure the main class is picked
up correctly:
./bazel-bin/src/main/scala/org/example/word_count
USAGE:
WordCount  

3. Had fun with the Bazel query language[1], inspecting the difference in
the dependencies between the deploy env and the word_count_deploy.jar:

bazel query 'filter("@maven//:org_apache_flink.*",
deps(//src/main/scala/org/example:word_count_deploy.jar) except
deps(//:default_flink_deploy_env))'
INFO: Empty results
Loading: 0 packages loaded

This is to say that there are no Flink dependencies in the deploy JAR that
are not accounted for in the deploy env.


So I think you're all good, but let me know if I've misunderstood! Or if
you find a better way of doing the provided deps – I'd be very interested!

Best,
Austin

[1]: https://docs.bazel.build/versions/master/query.htm

On Wed, May 12, 2021 at 10:28 AM Salva Alcántara 
wrote:

> Hi Austin,
>
> I followed your instructions and gave `rules_jvm_external` a try.
>
> Overall, I think I advanced a bit, but I'm not quite there yet. I have
> followed the link [1] given by Matthias, making the necessary changes to my
> repo:
>
> https://github.com/salvalcantara/bazel-flink-scala
>
> In particular, the relevant (bazel) BUILD file looks like this:
>
> ```
> package(default_visibility = ["//visibility:public"])
>
> load("@io_bazel_rules_scala//scala:scala.bzl", "scala_library",
> "scala_test")
>
> filegroup(
> name = "scala-main-srcs",
> srcs = glob(["*.scala"]),
> )
>
> scala_library(
> name = "flink_app",
> srcs = [":scala-main-srcs"],
> deps = [
> "@maven//:org_apache_flink_flink_core",
> "@maven//:org_apache_flink_flink_clients_2_12",
> "@maven//:org_apache_flink_flink_scala_2_12",
> "@maven//:org_apache_flink_flink_streaming_scala_2_12",
> "@maven//:org_apache_flink_flink_streaming_java_2_12",
> ],
> )
>
> java_binary(
> name = "word_count",
> srcs = ["//tools/flink:noop"],
> deploy_env = ["//:default_flink_deploy_env"],
> main_class = "org.example.WordCount",
> deps = [
> ":flink_app",
> ],
> )
> ```
>
> The idea is to use `deploy_env` within `java_binary` for providing the
> flink
> dependencies. This causes those dependencies to get removed from the final
> fat jar that one gets by running:
>
> ```
> bazel build //src/main/scala/org/example:flink_app_deploy.jar
> ```
>
> The problem now is that the jar still includes the Scala library, which
> should also be dropped from the jar as it is part of the provided
> dependencies within the Flink cluster. I am reading this blog post in [2]
> without luck yet...
>
> Regards,
>
> Salva
>
> [1]
>
> http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/Does-anyone-have-an-example-of-Bazel-working-with-Flink-td35898.html
>
> [2]
>
> https://yishanhe.net/address-dependency-conflict-for-bazel-built-scala-spark/
>
>
>
> --
> Sent from:
> http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/
>


Re: StopWithSavepoint() method doesn't work in Java based flink native k8s operator

2021-05-04 Thread Austin Cawley-Edwards
Hey all,

Thanks for the ping, Matthias. I'm not super familiar with the details of @Yang
Wang 's operator, to be honest :(. Can you share
some of your FlinkApplication specs?

For the `kubectl logs` command, I believe that just reads stdout from the
container. Which logging framework are you using in your application and
how have you configured it? There's a good guide for configuring the
popular ones in the Flink docs[1]. For instance, if you're using the
default Log4j 2 framework you should configure a ConsoleAppender[2].

Hope that helps a bit,
Austin

[1]:
https://ci.apache.org/projects/flink/flink-docs-release-1.13/docs/deployment/advanced/logging/
[2]:
https://logging.apache.org/log4j/2.x/manual/appenders.html#ConsoleAppender

On Tue, May 4, 2021 at 1:59 AM Matthias Pohl  wrote:

> Hi Fuyao,
> sorry for not replying earlier. The stop-with-savepoint operation
> shouldn't only suspend but terminate the job. Is it that you might have a
> larger state that makes creating the savepoint take longer? Even though,
> considering that you don't experience this behavior with your 2nd solution,
> I'd assume that we could ignore this possibility.
>
> I'm gonna add Austin to the conversation as he worked with k8s operators
> as well already. Maybe, he can also give you more insights on the logging
> issue which would enable us to dig deeper into what's going on with
> stop-with-savepoint.
>
> Best,
> Matthias
>
> On Tue, May 4, 2021 at 4:33 AM Fuyao Li  wrote:
>
>> Hello,
>>
>>
>>
>> Update:
>>
>> I think stopWithSavepoint() only suspends the job. It doesn't actually
>> terminate (./bin/flink cancel) the job. I switched to cancelWithSavepoint()
>> and it works here.
>>
>>
>>
>> Maybe stopWithSavepoint() should only be used to update the
>> configurations like parallelism? For updating the image, this seems to be
>> not suitable, please correct me if I am wrong.
>>
>>
>>
>> For the log issue, I am still a bit confused. Why it is not available in
>> kubectl logs. How should I get access to it?
>>
>>
>>
>> Thanks.
>>
>> Best,
>>
>> Fuyao
>>
>>
>>
>> *From: *Fuyao Li 
>> *Date: *Sunday, May 2, 2021 at 00:36
>> *To: *user , Yang Wang 
>> *Subject: *[External] : Re: StopWithSavepoint() method doesn't work in
>> Java based flink native k8s operator
>>
>> Hello,
>>
>>
>>
>> I noticed that first trigger a savepoint and then delete the deployment
>> might cause the duplicate data issue. That could pose a bad influence to
>> the semantic correctness. Please give me some hints on how to make the
>> stopWithSavepoint() work correctly with Fabric8io Java k8s client to
>> perform this image update operation. Thanks!
>>
>>
>>
>> Best,
>>
>> Fuyao
>>
>>
>>
>>
>>
>>
>>
>> *From: *Fuyao Li 
>> *Date: *Friday, April 30, 2021 at 18:03
>> *To: *user , Yang Wang 
>> *Subject: *[External] : Re: StopWithSavepoint() method doesn't work in
>> Java based flink native k8s operator
>>
>> Hello Community, Yang,
>>
>>
>>
>> I have one more question about logging. I also noticed that if I execute
>> the kubectl logs command against the JM, the pods provisioned by the operator can’t
>> print out the internal Flink logs in the kubectl logs output. I can only get
>> something like the logs below. No actual Flink logs are printed here… Where
>> can I find the path to the logs? Maybe use a sidecar container to get it
>> out? How can I get the logs without checking the Flink WebUI? Also, the sed
>> error makes me confused here. In fact, the application is already up and
>> running correctly if I access the WebUI through Ingress.
>>
>>
>>
>> Reference:
>> https://github.com/wangyang0918/flink-native-k8s-operator/issues/4
>> 
>>
>>
>>
>>
>>
>> [root@bastion deploy]# kubectl logs -f flink-demo-594946fd7b-822xk
>>
>>
>>
>> sed: couldn't open temporary file /opt/flink/conf/sedh1M3oO: Read-only
>> file system
>>
>> sed: couldn't open temporary file /opt/flink/conf/sed8TqlNR: Read-only
>> file system
>>
>> /docker-entrypoint.sh: line 75: /opt/flink/conf/flink-conf.yaml:
>> Read-only file system
>>
>> sed: couldn't open temporary file /opt/flink/conf/sedvO2DFU: Read-only
>> file system
>>
>> /docker-entrypoint.sh: line 88: /opt/flink/conf/flink-conf.yaml:
>> Read-only file system
>>
>> /docker-entrypoint.sh: line 90: /opt/flink/conf/flink-conf.yaml.tmp:
>> Read-only file system
>>
>> Start command: $JAVA_HOME/bin/java -classpath $FLINK_CLASSPATH
>> -Xmx3462817376 -Xms3462817376 -XX:MaxMetaspaceSize=268435456
>> org.apache.flink.kubernetes.entrypoint.KubernetesApplicationClusterEntrypoint
>> -D jobmanager.memory.off-heap.size=134217728b -D
>> jobmanager.memory.jvm-overhead.min=429496736b -D
>> jobmanager.memory.jvm-metaspace.size=268435456b -D
>> jobmanager.memory.heap.size=3462817376b -D
>> jobmanager.memory.jvm-overhead.max=429496736b
>>
>> ERROR StatusLogger No Log4j 2 configuration file found. Using 

Re: Setup of Scala/Flink project using Bazel

2021-05-04 Thread Austin Cawley-Edwards
Great! Feel free to post back if you run into anything else or come up with
a nice template – I agree it would be a nice thing for the community to
have.

Best,
Austin

On Tue, May 4, 2021 at 12:37 AM Salva Alcántara 
wrote:

> Hey Austin,
>
> There was no special reason for vendoring using `bazel-deps`, really. I
> just
> took another project as a reference for mine and that project was already
> using `bazel-deps`. I am going to give `rules_jvm_external` a try, and
> hopefully I can make it work!
>
> Regards,
>
> Salva
>
>
>
> --
> Sent from:
> http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/
>


Re: Setup of Scala/Flink project using Bazel

2021-05-03 Thread Austin Cawley-Edwards
Hey Salva,

This appears to be a bug in the `bazel-deps` tool, caused by mixing scala
and Java dependencies. The tool seems to use the same target name for both,
and thus produces duplicate targets (one for scala and one for java).

If you look at the dict lines that are reported as conflicting, you'll see
the duplicate "vendor/org/apache/flink:flink_clients" target:

*"vendor/org/apache/flink:flink_clients":
["lang||java","name||//vendor/org/apache/flink:flink_clients",*
...],
*"vendor/org/apache/flink:flink_clients":
["lang||scala:2.12.11","name||//vendor/org/apache/flink:flink_clients",
*...],

Can I ask what made you choose the `bazel-deps` tool instead of the official
bazelbuild/rules_jvm_external[1]? That might be a bit more verbose, but has
better support and supports scala as well.


Alternatively, you might look into customizing the target templates for
`bazel-deps` to suffix targets with the lang? Something like:

_JAVA_LIBRARY_TEMPLATE = """
java_library(
  name = "{name}_java",
..."""

_SCALA_IMPORT_TEMPLATE = """
scala_import(
name = "{name}_scala",
..."""


Best,
Austin

[1]: https://github.com/bazelbuild/rules_jvm_external

On Mon, May 3, 2021 at 1:20 PM Salva Alcántara 
wrote:

> Hi Matthias,
>
> Thanks a lot for your reply. I am already aware of that reference, but it's
> not exactly what I need. What I'd like to have is the typical word count
> (hello world) app migrated from sbt to bazel, in order to use it as a
> template for my Flink/Scala apps.
>
>
>
>
>
> --
> Sent from:
> http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/
>


Re: Alternatives to JDBCAppendTableSink in Flink 1.11

2021-04-21 Thread Austin Cawley-Edwards
Great to hear!

Austin

On Wed, Apr 21, 2021 at 6:19 AM Sambaran  wrote:

> Hi Austin,
>
> Many thanks, we indeed were using the Api incorrectly. Now in local tests
> we can see the data population happened in the postgres.
>
> Have a nice day!
>
> Regards
> Sambaran
>
> On Tue, Apr 20, 2021 at 8:11 PM Austin Cawley-Edwards <
> austin.caw...@gmail.com> wrote:
>
>> Hi Sambaran,
>>
>> I'm not sure if this is the best approach, though I don't know your full
>> use case/ implementation.
>>
>> What kind of error do you get when trying to map into a
>> PreparedStatement? I assume you tried something like this?
>>
>> SingleOutputStreamOperator stream =
>> env.fromElements(Row.of("YourProcedureA"), Row.of("YourProcedureB"));
>>
>> stream.addSink(JdbcSink.sink(
>>"EXEC ?",
>>(preparedStatement, row) -> {
>>   // extend `preparedStatement` with row info
>>   preparedStatement.setString(1, (String) row.getField(0)); // JDBC indices are 1-based
>>},
>>new JdbcConnectionOptions.JdbcConnectionOptionsBuilder()
>>   .withUrl("jdbc:derby:memory:ebookshop")
>>   .withDriverName("org.apache.derby.jdbc.EmbeddedDriver")
>>   .build()));
>>
>> Best,
>> Austin
>>
>> On Tue, Apr 20, 2021 at 12:42 PM Sambaran  wrote:
>>
>>> Hi Austin,
>>>
>>> We are using this for jdbc interfacing to populate postgres tables based
>>> on the data coming from an event source.
>>>
>>> We tried with the approach mentioned in the doc
>>> https://ci.apache.org/projects/flink/flink-docs-release-1.11/dev/connectors/jdbc.html
>>> but did not find a suitable way to map SingleOutputStreamOperator .
>>> Can you please let me know if this is the right approach and if yes, how do
>>> we map the SingleOutputStreamOperator to the preparedstatement in
>>> JdbcStatementBuilder?
>>>
>>> Thanks for your help!
>>>
>>> Regards
>>> Sambaran
>>>
>>> On Tue, Apr 20, 2021 at 6:30 PM Austin Cawley-Edwards <
>>> austin.caw...@gmail.com> wrote:
>>>
>>>> Great, thanks for the clarification! I'm checking with others now. Are
>>>> you using other parts of the Table/SQL APIs, or just this for JDBC
>>>> interfacing?
>>>>
>>>> Best,
>>>> Austin
>>>>
>>>> On Tue, Apr 20, 2021 at 12:20 PM Sambaran 
>>>> wrote:
>>>>
>>>>> Hi Austin,
>>>>>
>>>>> Thanks for replying. This is exactly as you mentioned here. Do we have
>>>>> a way to execute the procedure with 1.11 or upper version as
>>>>> JDBCAppendTableSink is no longer available with these?
>>>>>
>>>>> Regards
>>>>> Sambaran
>>>>>
>>>>> On Tue, Apr 20, 2021 at 6:11 PM Austin Cawley-Edwards <
>>>>> austin.caw...@gmail.com> wrote:
>>>>>
>>>>>> Hey Sambaran,
>>>>>>
>>>>>> I'm not too familiar with the 1.7 JDBCAppendTableSink, but to make
>>>>>> sure I understand what you're current solution looks like, it's something
>>>>>> like the following, where you're triggering a procedure on each element 
>>>>>> of
>>>>>> a stream?
>>>>>>
>>>>>>   JDBCAppendTableSink sink = JDBCAppendTableSink.builder()
>>>>>> .setDrivername("org.apache.derby.jdbc.EmbeddedDriver")
>>>>>> .setDBUrl("jdbc:derby:memory:ebookshop")
>>>>>> .setQuery("EXEC YourProcedure")
>>>>>> .build();
>>>>>>
>>>>>> SingleOutputStreamOperator stream =
>>>>>> env.fromElements(Row.of("a"), Row.of("b"));
>>>>>> sink.emitDataStream(stream);
>>>>>>
>>>>>> Or something else?
>>>>>>
>>>>>> Best,
>>>>>> Austin
>>>>>>
>>>>>>
>>>>>>
>>>>>>
>>>>>> On Tue, Apr 20, 2021 at 11:10 AM Sambaran 
>>>>>> wrote:
>>>>>>
>>>>>>> Hi,
>>>>>>>
>>>>>>> I am currently using JDBCAppendTableSink to execute database stored
>>>>>>> procedures from flink to populate data to external tables using
>>>>>>>  SingleOutputStreamOperator (version 1.7). Now we are trying to update 
>>>>>>> to
>>>>>>> Flink 1.11/ later and found JDBCAppendTableSink has been removed, 
>>>>>>> currently
>>>>>>> when looking for an alternative I could not find any suitable approach
>>>>>>> which would call database stored procedure. Is there any alternative
>>>>>>> approach to resolve this?
>>>>>>>
>>>>>>> Regards
>>>>>>> Sambaran
>>>>>>>
>>>>>>


Re: Alternatives to JDBCAppendTableSink in Flink 1.11

2021-04-20 Thread Austin Cawley-Edwards
Hi Sambaran,

I'm not sure if this is the best approach, though I don't know your full
use case/ implementation.

What kind of error do you get when trying to map into a PreparedStatement?
I assume you tried something like this?

SingleOutputStreamOperator stream =
env.fromElements(Row.of("YourProcedureA"), Row.of("YourProcedureB"));

stream.addSink(JdbcSink.sink(
   "EXEC ?",
   (preparedStatement, row) -> {
  // extend `preparedStatement` with row info
  preparedStatement.setString(1, (String) row.getField(0));
   },
   new JdbcConnectionOptions.JdbcConnectionOptionsBuilder()
  .withUrl("jdbc:derby:memory:ebookshop")
  .withDriverName("org.apache.derby.jdbc.EmbeddedDriver")
  .build()));

Best,
Austin

On Tue, Apr 20, 2021 at 12:42 PM Sambaran  wrote:

> Hi Austin,
>
> We are using this for jdbc interfacing to populate postgres tables based
> on the data coming from an event source.
>
> We tried with the approach mentioned in the doc
> https://ci.apache.org/projects/flink/flink-docs-release-1.11/dev/connectors/jdbc.html
> but did not find a suitable way to map SingleOutputStreamOperator .
> Can you please let me know if this is the right approach and if yes, how do
> we map the SingleOutputStreamOperator to the preparedstatement in
> JdbcStatementBuilder?
>
> Thanks for your help!
>
> Regards
> Sambaran
>
> On Tue, Apr 20, 2021 at 6:30 PM Austin Cawley-Edwards <
> austin.caw...@gmail.com> wrote:
>
>> Great, thanks for the clarification! I'm checking with others now. Are
>> you using other parts of the Table/SQL APIs, or just this for JDBC
>> interfacing?
>>
>> Best,
>> Austin
>>
>> On Tue, Apr 20, 2021 at 12:20 PM Sambaran  wrote:
>>
>>> Hi Austin,
>>>
>>> Thanks for replying. This is exactly as you mentioned here. Do we have a
>>> way to execute the procedure with 1.11 or upper version as
>>> JDBCAppendTableSink is no longer available with these?
>>>
>>> Regards
>>> Sambaran
>>>
>>> On Tue, Apr 20, 2021 at 6:11 PM Austin Cawley-Edwards <
>>> austin.caw...@gmail.com> wrote:
>>>
>>>> Hey Sambaran,
>>>>
>>>> I'm not too familiar with the 1.7 JDBCAppendTableSink, but to make sure
>>>> I understand what you're current solution looks like, it's something like
>>>> the following, where you're triggering a procedure on each element of a
>>>> stream?
>>>>
>>>>   JDBCAppendTableSink sink = JDBCAppendTableSink.builder()
>>>> .setDrivername("org.apache.derby.jdbc.EmbeddedDriver")
>>>> .setDBUrl("jdbc:derby:memory:ebookshop")
>>>> .setQuery("EXEC YourProcedure")
>>>> .build();
>>>>
>>>> SingleOutputStreamOperator stream =
>>>> env.fromElements(Row.of("a"), Row.of("b"));
>>>> sink.emitDataStream(stream);
>>>>
>>>> Or something else?
>>>>
>>>> Best,
>>>> Austin
>>>>
>>>>
>>>>
>>>>
>>>> On Tue, Apr 20, 2021 at 11:10 AM Sambaran 
>>>> wrote:
>>>>
>>>>> Hi,
>>>>>
>>>>> I am currently using JDBCAppendTableSink to execute database stored
>>>>> procedures from flink to populate data to external tables using
>>>>>  SingleOutputStreamOperator (version 1.7). Now we are trying to update to
>>>>> Flink 1.11/ later and found JDBCAppendTableSink has been removed, 
>>>>> currently
>>>>> when looking for an alternative I could not find any suitable approach
>>>>> which would call database stored procedure. Is there any alternative
>>>>> approach to resolve this?
>>>>>
>>>>> Regards
>>>>> Sambaran
>>>>>
>>>>


Re: Max-parellelism limitation

2021-04-20 Thread Austin Cawley-Edwards
Hi Olivier,

Someone will correct me if I'm wrong, but I believe the max-parallelism
limitation, where you cannot scale up past the previously defined
max-parallelism, applies to all stateful jobs no matter which type of state
you are using.

If you haven't seen it already, I think the Production Readiness checklist
[1] offers a good explanation for this behavior/ best practices for setting
it.
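
For reference, here's a minimal sketch of pinning it explicitly when the job
is first deployed (the value 128 and `MyMapper` are just placeholders):

StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

// Set the max parallelism (i.e. the number of key groups) once, up front.
// The job can later be rescaled anywhere up to this bound without losing keyed state.
env.setMaxParallelism(128);

// It can also be set per operator if one operator needs a different bound:
// stream.keyBy(record -> record.getKey()).map(new MyMapper()).setMaxParallelism(128);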

Best,
Austin

[1]:
https://ci.apache.org/projects/flink/flink-docs-master/docs/ops/production_ready/#set-an-explicit-max-parallelism

On Tue, Apr 20, 2021 at 9:33 AM Olivier Nouguier 
wrote:

> Hi, thank you all for your reply, by limitation I meant the impossibility
> to resume a job when scaling up because of this max-parallelism.
> To be more precise, in our deployment, the operator (max) parallelism is
> computed from the number of available slots ( ~~ task-manager * core ),
> this approach is probably too naive, at least with keyed state.
>
>
>
> On Tue, Apr 20, 2021 at 10:46 AM Chesnay Schepler 
> wrote:
>
>> @Olivier Could you clarify which limitation you are referring to?
>>
>> On 4/20/2021 5:23 AM, Guowei Ma wrote:
>>
>> Hi, Olivier
>> Yes. The introduction of this concept is to solve the problem of
>> rescaling the keystate.
>> Best,
>> Guowei
>>
>>
>> On Mon, Apr 19, 2021 at 8:56 PM Olivier Nouguier <
>> olivier.nougu...@teads.com> wrote:
>>
>>> Hi,
>>>   May I have the confirmation that the max-parallelism limitation only
>>> occurs when keyed states are used ?
>>>
>>>
>>> --
>>>
>>> Olivier Nouguier
>>>
>>> SSE
>>>
>>> e | olivier.nougu...@teads.com m | 0651383971
>>>
>>> Teads France SAS, 159 rue de Thor, Business Plaza, Bat. 4, 34000
>>> Montpellier, France
>>> [image: image] 
>>> The information in this email is confidential and intended only for the
>>> addressee(s) named above. If you are not the intended recipient any
>>> disclosure, copying, distribution or any action taken or omitted to be
>>> taken in reliance on it is prohibited and may be unlawful. Teads does
>>> not warrant that any attachment(s) are free from viruses or other defects
>>> and accept no liability for any losses resulting from infected email
>>> transmission. Please note that any views expressed in this email may be
>>> those of the originator and do not necessarily reflect those of the
>>> organization.
>>>
>>
>>
>
> --
>
> Olivier Nouguier
>
> SSE
>
> e | olivier.nougu...@teads.com m | 0651383971
>
> Teads France SAS, 159 rue de Thor, Business Plaza, Bat. 4, 34000
> Montpellier, France
> [image: image] 
>
> The information in this email is confidential and intended only for the
> addressee(s) named above. If you are not the intended recipient any
> disclosure, copying, distribution or any action taken or omitted to be
> taken in reliance on it is prohibited and may be unlawful. Teads does not
> warrant that any attachment(s) are free from viruses or other defects and
> accept no liability for any losses resulting from infected email
> transmission. Please note that any views expressed in this email may be
> those of the originator and do not necessarily reflect those of the
> organization.
>


Re: Are configs stored as part of savepoints

2021-04-20 Thread Austin Cawley-Edwards
Hi Gaurav,

Which configs are you referring to? Everything usually stored in
`flink-conf.yaml`[1]? The State Processor API[2] is also a good resource to
understand what is actually stored, and how you can access it outside of a
running job. The SavepointMetadata class[3] is another place to reference,
which seems to say that the only stored bit that could be influenced by the
config is the `maxParallelism`.

When restoring from a savepoint, you usually want to specify the
configuration as well.
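
If it helps, here's a rough sketch of loading a savepoint with the State
Processor API to see what's actually in it (the path and state backend are
placeholders; the classes come from flink-state-processor-api):

ExecutionEnvironment batchEnv = ExecutionEnvironment.getExecutionEnvironment();

// Load an existing savepoint for inspection.
ExistingSavepoint savepoint = Savepoint.load(
    batchEnv,
    "s3://my-bucket/savepoints/savepoint-xxxxxx",
    new MemoryStateBackend());

// From here you can read back operator/keyed state for a given operator uid
// (savepoint.readKeyedState(...), savepoint.readListState(...), etc.) -- which is
// the actual savepoint content, as opposed to anything from flink-conf.yaml.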

Hope that helps,
Austin


[1]:
https://ci.apache.org/projects/flink/flink-docs-master/docs/deployment/config/#configuration
[2]:
https://ci.apache.org/projects/flink/flink-docs-master/docs/libs/state_processor_api/
[3]:
https://github.com/apache/flink/blob/c6997c97c575d334679915c328792b8a3067cfb5/flink-libraries/flink-state-processing-api/src/main/java/org/apache/flink/state/api/runtime/metadata/SavepointMetadata.java

On Mon, Apr 19, 2021 at 8:54 PM gaurav kulkarni 
wrote:

> Hi,
>
> I was wondering if configs applied while creating a flink application are
> also stored as part of a savepoint? If yes, when an app is restored from a
> savepoint, does it start with the same configs?
>
> Thanks
>
>


Re: Alternatives to JDBCAppendTableSink in Flink 1.11

2021-04-20 Thread Austin Cawley-Edwards
Hey Sambaran,

I'm not too familiar with the 1.7 JDBCAppendTableSink, but to make sure I
understand what your current solution looks like, it's something like the
following, where you're triggering a procedure on each element of a stream?

JDBCAppendTableSink sink = JDBCAppendTableSink.builder()
    .setDrivername("org.apache.derby.jdbc.EmbeddedDriver")
    .setDBUrl("jdbc:derby:memory:ebookshop")
    .setQuery("EXEC YourProcedure")
    .build();

SingleOutputStreamOperator stream =
    env.fromElements(Row.of("a"), Row.of("b"));
sink.emitDataStream(stream);

Or something else?

Best,
Austin




On Tue, Apr 20, 2021 at 11:10 AM Sambaran  wrote:

> Hi,
>
> I am currently using JDBCAppendTableSink to execute database stored
> procedures from flink to populate data to external tables using
>  SingleOutputStreamOperator (version 1.7). Now we are trying to update to
> Flink 1.11/ later and found JDBCAppendTableSink has been removed, currently
> when looking for an alternative I could not find any suitable approach
> which would call database stored procedure. Is there any alternative
> approach to resolve this?
>
> Regards
> Sambaran
>


Re: Flink support for Kafka versions

2021-04-20 Thread Austin Cawley-Edwards
Hi Prasanna,

It looks like the Kafka 2.5.0 connector upgrade is tied to dropping support
for Scala 2.11. The best place to track that would be the ticket for Scala
2.13 support, FLINK-13414 [1], and its subtask FLINK-20845 [2].

I have listed FLINK-20845 as a blocker for FLINK-19168 for better
visibility.

Best,
Austin

[1]: https://issues.apache.org/jira/browse/FLINK-13414
[2]: https://issues.apache.org/jira/browse/FLINK-20845

On Tue, Apr 20, 2021 at 9:08 AM Prasanna kumar <
prasannakumarram...@gmail.com> wrote:

> Hi Flinksters,
>
> We are researching about if we could use the latest version of kafka
> (2.6.1 or 2.7.0)
>
> Since we are using Flink as a processor , we came across this
> https://issues.apache.org/jira/browse/FLINK-19168.
>
> It says that it does not support version 2.5.0 and beyond.
>
> That was created 8 months back , just checking if there is any effort on
> that front.
>
> Thanks,
> Prasanna
>


Re: CRD compatible with native and standalone mode

2021-04-20 Thread Austin Cawley-Edwards
Hi Gaurav,

I think the name "Native Kubernetes" is a bit misleading – this just means
that you can use the Flink CLI/ scripts to run Flink applications on
Kubernetes without using the Kubernetes APIs/ kubectl directly. What
features are you looking to use in the native mode?

I think it would be difficult to use this directly inside an operator, but
keeping "feature parity" with it is a good goal for your CRDs. Since CRDs
are essentially just a new API, the design approach should be user/
feature-first. By feature parity, I mean taking currently supported "Native
Kubernetes" functionality as the feature list for your CRDs, for example:
* Allowing Secrets to be mounted as files and environment variables [1]
* Allowing templating of the JobManager and TaskManager Pods (containers,
etc.) [2]
* Easy use of built-in plugins [3]
* etc.

Other "Native Kubernetes" "features", like RBAC and logs, will come "out of
the box" by defining Custom Resources.

Yang's resources are a great place to start, though I'd suggest defining
your API spec within the CRD explicitly[4], which will be clearer for your
users and will allow for schema validation by other tools.

Best,
Austin


[1]:
https://ci.apache.org/projects/flink/flink-docs-release-1.13/docs/deployment/resource-providers/native_kubernetes/#using-secrets
[2]:
https://ci.apache.org/projects/flink/flink-docs-release-1.13/docs/deployment/resource-providers/native_kubernetes/#pod-template
[3]:
https://ci.apache.org/projects/flink/flink-docs-release-1.13/docs/deployment/resource-providers/native_kubernetes/#using-plugins
[4]:
https://kubernetes.io/docs/tasks/extend-kubernetes/custom-resources/custom-resource-definitions/#specifying-a-structural-schema

On Tue, Apr 20, 2021 at 2:13 AM Yang Wang  wrote:

> I think the compatibility depends on you. For example, you could have the
> same
> CustomResourceDefinition for standalone and native Flink applications.
> They could
> look like this[1].
>
> Since the CR is defined in yaml[2], native and standalone could have some
> dedicated fields.
> And you could easily parse them in your K8s operator.
>
> [1].
> https://github.com/wangyang0918/flink-native-k8s-operator/blob/master/deploy/crd.yaml
> [2].
> https://github.com/wangyang0918/flink-native-k8s-operator/blob/master/deploy/cr.yaml
>
>
> Best,
> Yang
>
> gaurav kulkarni  于2021年4月20日周二 上午8:57写道:
>
>> Hi,
>>
>> I plan to create a flink K8s operator which supports standalone mode, and
>> and switch to native mode sometime later. I was wondering what are some of
>> the approaches to ensure that CRD is compatible with both native and
>> standalone mode?
>>
>> Thanks
>>
>


Re: Async + Broadcast?

2021-04-07 Thread Austin Cawley-Edwards
Hey Alex,

I'm not sure if there is a best practice here, but what I can tell you is
that I worked on a job that did exactly what you're suggesting with a
non-async operator to create a [record, config] tuple, which was then
passed to the async stage. Our config objects were also not tiny (~500kb)
and our pipeline not huge (~1M records/day and 1GB data/ day), but this
setup worked quite well. I'd say if latency isn't your most important
metric, or if your pipeline is a similar size, the ease of async IO is
worth it.

One thing you'll have to look out for (if you haven't already) is
bootstrapping the config objects when the job starts, since the broadcast
from the polling source can happen later than receiving the first record –
we solved this by calling the polling source's service in the `open()`
method of the non-async operator and storing the initial configs in memory.
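
In case it's useful, here's a rough sketch of that setup -- the Record,
ConfigSnapshot, and ConfigServiceClient types are made up for illustration,
and the same MapStateDescriptor has to be used when calling
`configStream.broadcast(...)`:

public class EnrichWithConfig extends
        BroadcastProcessFunction<Record, ConfigSnapshot, Tuple2<Record, ConfigSnapshot>> {

    // must be the same descriptor passed to configStream.broadcast(CONFIG_DESCRIPTOR)
    static final MapStateDescriptor<String, ConfigSnapshot> CONFIG_DESCRIPTOR =
            new MapStateDescriptor<>("configs", String.class, ConfigSnapshot.class);

    private transient ConfigSnapshot bootstrapConfig;

    @Override
    public void open(Configuration parameters) throws Exception {
        // bootstrap so early records aren't processed with a null config
        bootstrapConfig = ConfigServiceClient.fetchLatest();
    }

    @Override
    public void processElement(Record value, ReadOnlyContext ctx,
            Collector<Tuple2<Record, ConfigSnapshot>> out) throws Exception {
        ConfigSnapshot latest = ctx.getBroadcastState(CONFIG_DESCRIPTOR).get("latest");
        out.collect(Tuple2.of(value, latest != null ? latest : bootstrapConfig));
    }

    @Override
    public void processBroadcastElement(ConfigSnapshot value, Context ctx,
            Collector<Tuple2<Record, ConfigSnapshot>> out) throws Exception {
        ctx.getBroadcastState(CONFIG_DESCRIPTOR).put("latest", value);
    }
}

// ...and the stateless async stage then just unpacks the tuple, e.g.
// AsyncDataStream.unorderedWait(enriched, new MyAsyncFunction(), 30, TimeUnit.SECONDS);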

Hope that helps a bit,
Austin

On Wed, Apr 7, 2021 at 4:39 PM Alex Cruise  wrote:

> Hi folks,
>
> I have a somewhat complex Flink job that has a few async stages, and a few
> stateful stages. It currently loads its configuration on startup, and
> doesn't attempt to refresh it.
>
> Now I'm working on dynamic reconfiguration. I've written a polling source
> which sends a configuration snapshot whenever anything has changed, I've
> set up a broadcast of that source, and I'm updating the operators in the
> data (i.e. not config) stream to be BroadcastProcessFunctions. But now I've
> reached the first async operator, and I recall that async functions aren't
> allowed to be stateful.
>
> I've tried to find a best practice for this situation, without much luck.
> My best idea so far is to insert a new stage before the async one, which
> would tuple up each record with its corresponding config snapshot from the
> most recent broadcast state. This would increase the amount of data that
> needs to be serialized, and some of the configs are quite large, but would
> allow me to continue using async IO.
>
> Any suggestions?
>
> Thanks!
>
> -0xe1a
>


Re: Flink - Pod Identity

2021-04-06 Thread Austin Cawley-Edwards
Great, glad to hear it Swagat!

Did you end up using Flink 1.6 or were you able to upgrade to Flink 1.12?
Could you also link the ticket back here if you've already made it/ make
sure it is not a duplicate of FLINK-18676
<https://issues.apache.org/jira/browse/FLINK-18676>?

Best,
Austin

On Tue, Apr 6, 2021 at 12:29 PM Swagat Mishra  wrote:

> I was able to solve the issue by providing a custom version of the presto
> jar. I will create a ticket and raise a pull request so that others can
> benefit from it. I will share the details here shortly.
>
> Thanks everyone for your help and support. Especially Austin, he stands
> out due to his interest in the issue and helping to find ways to resolve it.
>
> Regards,
> Swagat
>
> On Tue, Apr 6, 2021 at 2:35 AM Austin Cawley-Edwards <
> austin.caw...@gmail.com> wrote:
>
>> And actually, I've found that the correct version of the AWS SDK *is*
>> included in Flink 1.12, which was reported and fixed in FLINK-18676
>> (see[1]). Since you said you saw this also occur in 1.12, can you share
>> more details about what you saw there?
>>
>> Best,
>> Austin
>>
>> [1]: https://issues.apache.org/jira/browse/FLINK-18676
>>
>> On Mon, Apr 5, 2021 at 4:53 PM Austin Cawley-Edwards <
>> austin.caw...@gmail.com> wrote:
>>
>>> That looks interesting! I've also found the full list of S3
>>> properties[1] for the version of presto-hive bundled with Flink 1.12 (see
>>> [2]), which includes an option for a KMS key (hive.s3.kms-key-id).
>>>
>>> (also, adding back the user list)
>>>
>>> [1]:
>>> https://prestodb.io/docs/0.187/connector/hive.html#amazon-s3-configuration
>>> [2]:
>>> https://ci.apache.org/projects/flink/flink-docs-release-1.12/deployment/filesystems/s3.html#hadooppresto-s3-file-systems-plugins
>>>
>>> On Mon, Apr 5, 2021 at 4:21 PM Swagat Mishra  wrote:
>>>
>>>> Btw, there is also an option to provide a custom credential provider,
>>>> what are your thoughts on this?
>>>>
>>>> presto.s3.credentials-provider
>>>>
>>>>
>>>> On Tue, Apr 6, 2021 at 12:43 AM Austin Cawley-Edwards <
>>>> austin.caw...@gmail.com> wrote:
>>>>
>>>>> I've confirmed that for the bundled + shaded aws dependency, the only
>>>>> way to upgrade it is to build a flink-s3-fs-presto jar with the updated
>>>>> dependency. Let me know if this is feasible for you, if the KMS key
>>>>> solution doesn't work.
>>>>>
>>>>> Best,
>>>>> Austin
>>>>>
>>>>> On Mon, Apr 5, 2021 at 2:18 PM Austin Cawley-Edwards <
>>>>> austin.caw...@gmail.com> wrote:
>>>>>
>>>>>> Hi Swagat,
>>>>>>
>>>>>> I don't believe there is an explicit configuration option for the KMS
>>>>>> key – please let me know if you're able to make that work!
>>>>>>
>>>>>> Best,
>>>>>> Austin
>>>>>>
>>>>>> On Mon, Apr 5, 2021 at 1:45 PM Swagat Mishra 
>>>>>> wrote:
>>>>>>
>>>>>>> Hi Austin,
>>>>>>>
>>>>>>> Let me know what you think on my latest email, if the approach might
>>>>>>> work, or if it is already supported and I am not using the 
>>>>>>> configurations
>>>>>>> properly.
>>>>>>>
>>>>>>> Thanks for your interest and support.
>>>>>>>
>>>>>>> Regards,
>>>>>>> Swagat
>>>>>>>
>>>>>>> On Mon, Apr 5, 2021 at 10:39 PM Austin Cawley-Edwards <
>>>>>>> austin.caw...@gmail.com> wrote:
>>>>>>>
>>>>>>>> Hi Swagat,
>>>>>>>>
>>>>>>>> It looks like Flink 1.6 bundles the 1.11.165 version of the
>>>>>>>> aws-java-sdk-core with the Presto implementation (transitively from 
>>>>>>>> Presto
>>>>>>>> 0.185[1]).
>>>>>>>> The minimum support version for the ServiceAccount authentication
>>>>>>>> approach is 1.11.704 (see [2]) which was released on Jan 9th, 2020[3], 
>>>>>>>> long
>>>>>>>> after Flink 1.6 was released. It looks like even the most recent 
>>>>>>

Re: Flink - Pod Identity

2021-04-05 Thread Austin Cawley-Edwards
And actually, I've found that the correct version of the AWS SDK *is*
included in Flink 1.12, which was reported and fixed in FLINK-18676
(see[1]). Since you said you saw this also occur in 1.12, can you share
more details about what you saw there?

Best,
Austin

[1]: https://issues.apache.org/jira/browse/FLINK-18676

On Mon, Apr 5, 2021 at 4:53 PM Austin Cawley-Edwards <
austin.caw...@gmail.com> wrote:

> That looks interesting! I've also found the full list of S3 properties[1]
> for the version of presto-hive bundled with Flink 1.12 (see [2]), which
> includes an option for a KMS key (hive.s3.kms-key-id).
>
> (also, adding back the user list)
>
> [1]:
> https://prestodb.io/docs/0.187/connector/hive.html#amazon-s3-configuration
> [2]:
> https://ci.apache.org/projects/flink/flink-docs-release-1.12/deployment/filesystems/s3.html#hadooppresto-s3-file-systems-plugins
>
> On Mon, Apr 5, 2021 at 4:21 PM Swagat Mishra  wrote:
>
>> Btw, there is also an option to provide a custom credential provider,
>> what are your thoughts on this?
>>
>> presto.s3.credentials-provider
>>
>>
>> On Tue, Apr 6, 2021 at 12:43 AM Austin Cawley-Edwards <
>> austin.caw...@gmail.com> wrote:
>>
>>> I've confirmed that for the bundled + shaded aws dependency, the only
>>> way to upgrade it is to build a flink-s3-fs-presto jar with the updated
>>> dependency. Let me know if this is feasible for you, if the KMS key
>>> solution doesn't work.
>>>
>>> Best,
>>> Austin
>>>
>>> On Mon, Apr 5, 2021 at 2:18 PM Austin Cawley-Edwards <
>>> austin.caw...@gmail.com> wrote:
>>>
>>>> Hi Swagat,
>>>>
>>>> I don't believe there is an explicit configuration option for the KMS
>>>> key – please let me know if you're able to make that work!
>>>>
>>>> Best,
>>>> Austin
>>>>
>>>> On Mon, Apr 5, 2021 at 1:45 PM Swagat Mishra 
>>>> wrote:
>>>>
>>>>> Hi Austin,
>>>>>
>>>>> Let me know what you think on my latest email, if the approach might
>>>>> work, or if it is already supported and I am not using the configurations
>>>>> properly.
>>>>>
>>>>> Thanks for your interest and support.
>>>>>
>>>>> Regards,
>>>>> Swagat
>>>>>
>>>>> On Mon, Apr 5, 2021 at 10:39 PM Austin Cawley-Edwards <
>>>>> austin.caw...@gmail.com> wrote:
>>>>>
>>>>>> Hi Swagat,
>>>>>>
>>>>>> It looks like Flink 1.6 bundles the 1.11.165 version of the
>>>>>> aws-java-sdk-core with the Presto implementation (transitively from 
>>>>>> Presto
>>>>>> 0.185[1]).
>>>>>> The minimum support version for the ServiceAccount authentication
>>>>>> approach is 1.11.704 (see [2]) which was released on Jan 9th, 2020[3], 
>>>>>> long
>>>>>> after Flink 1.6 was released. It looks like even the most recent Presto 
>>>>>> is
>>>>>> on a version below that, concretely 1.11.697 in the master branch[4], so 
>>>>>> I
>>>>>> don't think even upgrading Flink to 1.6+ will solve this though it looks 
>>>>>> to
>>>>>> me like the AWS dependency is managed better in more recent Flink 
>>>>>> versions.
>>>>>> I'll have more for you on that front tomorrow, after the Easter break.
>>>>>>
>>>>>> I think what you would have to do to make this authentication
>>>>>> approach work for Flink 1.6 is building a custom version of the
>>>>>> flink-s3-fs-presto jar, replacing the bundled AWS dependency with the
>>>>>> 1.11.704 version, and then shading it the same way.
>>>>>>
>>>>>> In the meantime, would you mind creating a JIRA ticket with this use
>>>>>> case? That'll give you the best insight into the status of fixing this :)
>>>>>>
>>>>>> Let me know if that makes sense,
>>>>>> Austin
>>>>>>
>>>>>> [1]:
>>>>>> https://github.com/prestodb/presto/blob/1d4ee196df4327568c0982811d8459a44f1792b9/pom.xml#L53
>>>>>> [2]:
>>>>>> https://docs.aws.amazon.com/eks/latest/userguide/iam-roles-for-service-accounts-minimum-sdk.html
>>>>>> [3]: https://github.com/aws/aws-sdk-java/re

Re: Flink - Pod Identity

2021-04-05 Thread Austin Cawley-Edwards
That looks interesting! I've also found the full list of S3 properties[1]
for the version of presto-hive bundled with Flink 1.12 (see [2]), which
includes an option for a KMS key (hive.s3.kms-key-id).
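
On the custom credentials-provider idea: something along these lines might
work once a new-enough AWS SDK is on the plugin's classpath. This is just an
untested sketch -- you'd need to check the Presto S3 filesystem source for the
exact constructor it instantiates reflectively:

public class WebIdentityCredentialsProvider implements AWSCredentialsProvider {

    // delegates to the SDK's web-identity (IRSA) support, which needs aws-java-sdk >= 1.11.704
    private final AWSCredentialsProvider delegate = WebIdentityTokenCredentialsProvider.create();

    @Override
    public AWSCredentials getCredentials() {
        return delegate.getCredentials();
    }

    @Override
    public void refresh() {
        delegate.refresh();
    }
}

You'd then point presto.s3.credentials-provider at the fully-qualified class
name and make sure the class is loadable by the s3 plugin.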

(also, adding back the user list)

[1]:
https://prestodb.io/docs/0.187/connector/hive.html#amazon-s3-configuration
[2]:
https://ci.apache.org/projects/flink/flink-docs-release-1.12/deployment/filesystems/s3.html#hadooppresto-s3-file-systems-plugins

On Mon, Apr 5, 2021 at 4:21 PM Swagat Mishra  wrote:

> Btw, there is also an option to provide a custom credential provider,
> what are your thoughts on this?
>
> presto.s3.credentials-provider
>
>
> On Tue, Apr 6, 2021 at 12:43 AM Austin Cawley-Edwards <
> austin.caw...@gmail.com> wrote:
>
>> I've confirmed that for the bundled + shaded aws dependency, the only way
>> to upgrade it is to build a flink-s3-fs-presto jar with the updated
>> dependency. Let me know if this is feasible for you, if the KMS key
>> solution doesn't work.
>>
>> Best,
>> Austin
>>
>> On Mon, Apr 5, 2021 at 2:18 PM Austin Cawley-Edwards <
>> austin.caw...@gmail.com> wrote:
>>
>>> Hi Swagat,
>>>
>>> I don't believe there is an explicit configuration option for the KMS
>>> key – please let me know if you're able to make that work!
>>>
>>> Best,
>>> Austin
>>>
>>> On Mon, Apr 5, 2021 at 1:45 PM Swagat Mishra  wrote:
>>>
>>>> Hi Austin,
>>>>
>>>> Let me know what you think on my latest email, if the approach might
>>>> work, or if it is already supported and I am not using the configurations
>>>> properly.
>>>>
>>>> Thanks for your interest and support.
>>>>
>>>> Regards,
>>>> Swagat
>>>>
>>>> On Mon, Apr 5, 2021 at 10:39 PM Austin Cawley-Edwards <
>>>> austin.caw...@gmail.com> wrote:
>>>>
>>>>> Hi Swagat,
>>>>>
>>>>> It looks like Flink 1.6 bundles the 1.11.165 version of the
>>>>> aws-java-sdk-core with the Presto implementation (transitively from Presto
>>>>> 0.185[1]).
>>>>> The minimum support version for the ServiceAccount authentication
>>>>> approach is 1.11.704 (see [2]) which was released on Jan 9th, 2020[3], 
>>>>> long
>>>>> after Flink 1.6 was released. It looks like even the most recent Presto is
>>>>> on a version below that, concretely 1.11.697 in the master branch[4], so I
>>>>> don't think even upgrading Flink to 1.6+ will solve this though it looks 
>>>>> to
>>>>> me like the AWS dependency is managed better in more recent Flink 
>>>>> versions.
>>>>> I'll have more for you on that front tomorrow, after the Easter break.
>>>>>
>>>>> I think what you would have to do to make this authentication approach
>>>>> work for Flink 1.6 is building a custom version of the flink-s3-fs-presto
>>>>> jar, replacing the bundled AWS dependency with the 1.11.704 version, and
>>>>> then shading it the same way.
>>>>>
>>>>> In the meantime, would you mind creating a JIRA ticket with this use
>>>>> case? That'll give you the best insight into the status of fixing this :)
>>>>>
>>>>> Let me know if that makes sense,
>>>>> Austin
>>>>>
>>>>> [1]:
>>>>> https://github.com/prestodb/presto/blob/1d4ee196df4327568c0982811d8459a44f1792b9/pom.xml#L53
>>>>> [2]:
>>>>> https://docs.aws.amazon.com/eks/latest/userguide/iam-roles-for-service-accounts-minimum-sdk.html
>>>>> [3]: https://github.com/aws/aws-sdk-java/releases/tag/1.11.704
>>>>> [4]: https://github.com/prestodb/presto/blob/master/pom.xml#L52
>>>>>
>>>>> On Sun, Apr 4, 2021 at 3:32 AM Swagat Mishra 
>>>>> wrote:
>>>>>
>>>>>> Austin -
>>>>>>
>>>>>> In my case the set up is such that services are deployed on
>>>>>> Kubernetes with Docker, running on EKS. There is also an istio service
>>>>>> mesh. So all the services communicate and access AWS resources like S3
>>>>>> using the service account. Service account is associated with IAM roles. 
>>>>>> I
>>>>>> have verified that the service account has access to S3, by running a
>>>>>> program that connects to S3 to read a file al

Re: Flink - Pod Identity

2021-04-05 Thread Austin Cawley-Edwards
Hi Swagat,

It looks like Flink 1.6 bundles the 1.11.165 version of the
aws-java-sdk-core with the Presto implementation (transitively from Presto
0.185[1]).
The minimum supported version for the ServiceAccount authentication approach
is 1.11.704 (see [2]) which was released on Jan 9th, 2020[3], long after
Flink 1.6 was released. It looks like even the most recent Presto is on a
version below that, concretely 1.11.697 in the master branch[4], so I don't
think even upgrading Flink to 1.6+ will solve this though it looks to me
like the AWS dependency is managed better in more recent Flink versions.
I'll have more for you on that front tomorrow, after the Easter break.

I think what you would have to do to make this authentication approach work
for Flink 1.6 is building a custom version of the flink-s3-fs-presto jar,
replacing the bundled AWS dependency with the 1.11.704 version, and then
shading it the same way.

In the meantime, would you mind creating a JIRA ticket with this use case?
That'll give you the best insight into the status of fixing this :)

Let me know if that makes sense,
Austin

[1]:
https://github.com/prestodb/presto/blob/1d4ee196df4327568c0982811d8459a44f1792b9/pom.xml#L53
[2]:
https://docs.aws.amazon.com/eks/latest/userguide/iam-roles-for-service-accounts-minimum-sdk.html
[3]: https://github.com/aws/aws-sdk-java/releases/tag/1.11.704
[4]: https://github.com/prestodb/presto/blob/master/pom.xml#L52

On Sun, Apr 4, 2021 at 3:32 AM Swagat Mishra  wrote:

> Austin -
>
> In my case the set up is such that services are deployed on Kubernetes
> with Docker, running on EKS. There is also an istio service mesh. So all
> the services communicate and access AWS resources like S3 using the service
> account. Service account is associated with IAM roles. I have verified that
> the service account has access to S3, by running a program that connects to
> S3 to read a file also aws client when packaged into the pod is able to
> access S3. So that means the roles and policies are good.
>
> When I am running flink, I am following the same configuration for job
> manager and task manager as provided here:
>
>
> https://ci.apache.org/projects/flink/flink-docs-stable/deployment/resource-providers/standalone/kubernetes.html
>
> The exception we are getting is -
> org.apache.flink.fs.s3presto.shaded.com.amazonaws.SDKClientException:
> Unable to load credentials from service end point.
>
> This happens in the EC2CredentialFetcher class method fetchCredentials -
> line number 66, when it tries to read resource, effectively executing
> CURL 169.254.170.2/AWS_CONTAINER_CREDENTIALS_RELATIVE_URI
>
> I am not setting the variable AWS_CONTAINER_CREDENTIALS_RELATIVE_URI
> because its not the right way to do it for us, we are on EKS. Similarly any
> of the ~/.aws/credentials file approach will also not work for us.
>
>
> Atm, I haven't tried the kuberenetes service account property you
> mentioned above. I will try and let you know how it goes.
>
> Question - do i need to provide any parameters while building the docker
> image or any configuration in the flink config to tell flink that for all
> purposes it should be using the service account and not try to get into
> the EC2CredentialFetcher class.
>
> One more thing - we were trying this on the 1.6 version of Flink and not
> the 1.12 version.
>
> Regards,
> Swagat
>
> On Sun, Apr 4, 2021 at 8:56 AM Sameer Wadkar  wrote:
>
>> Kube2Iam needs to modify IPtables to proxy calls to ec2 metadata to a
>> daemonset which runs privileged pods which maps a IP Address of the pods
>> and its associated service account to make STS calls and return temporary
>> AWS credentials. Your pod “thinks” the ec2 metadata url works locally like
>> in an ec2 instance.
>>
>> I have found that mutating webhooks are easier to deploy (when you have
>> no control over the Kubernetes environment - say you cannot change iptables
>> or run privileged pods). These can configure the ~/.aws/credentials file.
>> The webhook can make the STS call for the service account to role mapping.
>> A side car container to which the main container has no access can even
>> renew credentials becoz STS returns temp credentials.
>>
>> Sent from my iPhone
>>
>> On Apr 3, 2021, at 10:29 PM, Austin Cawley-Edwards <
>> austin.caw...@gmail.com> wrote:
>>
>> 
>> If you’re just looking to attach a service account to a pod using the
>> native AWS EKS IAM mapping[1], you should be able to attach the service
>> account to the pod via the `kubernetes.service-account` configuration
>> option[2].
>>
>> Let me know if that works for you!
>>
>> Best,
>> Austin
>>
>> [1]:
>> https://docs.aws.amazon.com/e

Re: Flink - Pod Identity

2021-04-03 Thread Austin Cawley-Edwards
If you’re just looking to attach a service account to a pod using the
native AWS EKS IAM mapping[1], you should be able to attach the service
account to the pod via the `kubernetes.service-account` configuration
option[2].

Let me know if that works for you!

Best,
Austin

[1]:
https://docs.aws.amazon.com/eks/latest/userguide/iam-roles-for-service-accounts.html
[2]:
https://ci.apache.org/projects/flink/flink-docs-release-1.12/deployment/config.html#kubernetes-service-account

On Sat, Apr 3, 2021 at 10:18 PM Austin Cawley-Edwards <
austin.caw...@gmail.com> wrote:

> Can you describe your setup a little bit more? And perhaps how you use
> this setup to grant access to other non-Flink pods?
>
> On Sat, Apr 3, 2021 at 2:29 PM Swagat Mishra  wrote:
>
>> Yes I looked at kube2iam, I haven't experimented with it.
>>
>> Given that the service account has access to S3, shouldn't we have a
>> simpler mechanism to connect to underlying resources based on the service
>> account authorization?
>>
>> On Sat, Apr 3, 2021, 10:10 PM Austin Cawley-Edwards <
>> austin.caw...@gmail.com> wrote:
>>
>>> Hi Swagat,
>>>
>>> I’ve used kube2iam[1] for granting AWS access to Flink pods in the past
>>> with good results. It’s all based on mapping pod annotations to AWS IAM
>>> roles. Is this something that might work for you?
>>>
>>> Best,
>>> Austin
>>>
>>> [1]: https://github.com/jtblin/kube2iam
>>>
>>> On Sat, Apr 3, 2021 at 10:40 AM Swagat Mishra 
>>> wrote:
>>>
>>>> No we are running on aws. The mechanisms supported by flink to connect
>>>> to resources like S3, need us to make changes that will impact all
>>>> services, something that we don't want to do. So providing the aws secret
>>>> key ID and passcode upfront or iam rules where it connects by executing
>>>> curl/ http calls to connect to S3 , don't work for me.
>>>>
>>>> I want to be able to connect to S3, using aws Api's and if that
>>>> connection can be leveraged by the presto library, that is what I am
>>>> looking for.
>>>>
>>>> Regards,
>>>> Swagat
>>>>
>>>>
>>>> On Sat, Apr 3, 2021, 7:37 PM Israel Ekpo  wrote:
>>>>
>>>>> Are you running on Azure Kubernetes Service.
>>>>>
>>>>> You should be able to do it because the identity can be mapped to the
>>>>> labels of the pods not necessary Flink.
>>>>>
>>>>> On Sat, Apr 3, 2021 at 6:31 AM Swagat Mishra 
>>>>> wrote:
>>>>>
>>>>>> Hi,
>>>>>>
>>>>>> I think flink doesn't support pod identity, any plans tk achieve it
>>>>>> in any subsequent release.
>>>>>>
>>>>>> Regards,
>>>>>> Swagat
>>>>>>
>>>>>>
>>>>>>


Re: Flink - Pod Identity

2021-04-03 Thread Austin Cawley-Edwards
Can you describe your setup a little bit more? And perhaps how you use this
setup to grant access to other non-Flink pods?

On Sat, Apr 3, 2021 at 2:29 PM Swagat Mishra  wrote:

> Yes I looked at kube2iam, I haven't experimented with it.
>
> Given that the service account has access to S3, shouldn't we have a
> simpler mechanism to connect to underlying resources based on the service
> account authorization?
>
> On Sat, Apr 3, 2021, 10:10 PM Austin Cawley-Edwards <
> austin.caw...@gmail.com> wrote:
>
>> Hi Swagat,
>>
>> I’ve used kube2iam[1] for granting AWS access to Flink pods in the past
>> with good results. It’s all based on mapping pod annotations to AWS IAM
>> roles. Is this something that might work for you?
>>
>> Best,
>> Austin
>>
>> [1]: https://github.com/jtblin/kube2iam
>>
>> On Sat, Apr 3, 2021 at 10:40 AM Swagat Mishra  wrote:
>>
>>> No we are running on aws. The mechanisms supported by flink to connect
>>> to resources like S3, need us to make changes that will impact all
>>> services, something that we don't want to do. So providing the aws secret
>>> key ID and passcode upfront or iam rules where it connects by executing
>>> curl/ http calls to connect to S3 , don't work for me.
>>>
>>> I want to be able to connect to S3, using aws Api's and if that
>>> connection can be leveraged by the presto library, that is what I am
>>> looking for.
>>>
>>> Regards,
>>> Swagat
>>>
>>>
>>> On Sat, Apr 3, 2021, 7:37 PM Israel Ekpo  wrote:
>>>
>>>> Are you running on Azure Kubernetes Service.
>>>>
>>>> You should be able to do it because the identity can be mapped to the
>>>> labels of the pods not necessary Flink.
>>>>
>>>> On Sat, Apr 3, 2021 at 6:31 AM Swagat Mishra 
>>>> wrote:
>>>>
>>>>> Hi,
>>>>>
>>>>> I think flink doesn't support pod identity, any plans tk achieve it in
>>>>> any subsequent release.
>>>>>
>>>>> Regards,
>>>>> Swagat
>>>>>
>>>>>
>>>>>


Re: Flink - Pod Identity

2021-04-03 Thread Austin Cawley-Edwards
Hi Swagat,

I’ve used kube2iam[1] for granting AWS access to Flink pods in the past
with good results. It’s all based on mapping pod annotations to AWS IAM
roles. Is this something that might work for you?

Best,
Austin

[1]: https://github.com/jtblin/kube2iam

On Sat, Apr 3, 2021 at 10:40 AM Swagat Mishra  wrote:

> No we are running on aws. The mechanisms supported by flink to connect to
> resources like S3, need us to make changes that will impact all services,
> something that we don't want to do. So providing the aws secret key ID and
> passcode upfront or iam rules where it connects by executing curl/ http
> calls to connect to S3 , don't work for me.
>
> I want to be able to connect to S3, using aws Api's and if that connection
> can be leveraged by the presto library, that is what I am looking for.
>
> Regards,
> Swagat
>
>
> On Sat, Apr 3, 2021, 7:37 PM Israel Ekpo  wrote:
>
>> Are you running on Azure Kubernetes Service.
>>
>> You should be able to do it because the identity can be mapped to the
>> labels of the pods not necessary Flink.
>>
>> On Sat, Apr 3, 2021 at 6:31 AM Swagat Mishra  wrote:
>>
>>> Hi,
>>>
>>> I think flink doesn't support pod identity, any plans tk achieve it in
>>> any subsequent release.
>>>
>>> Regards,
>>> Swagat
>>>
>>>
>>>


Re: Print on screen DataStream content

2020-11-23 Thread Austin Cawley-Edwards
Hey Simone,

I'd suggest trying out the `DataStream#print()` function to start, but
there are a few other easy-to-integrate sinks for testing that you can
check out in the docs here[1]
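
For example, a quick sketch based on the variable names in your snippet
(MyProcessWindowFunction is just a placeholder -- a WindowedStream has to be
processed/applied before it can be printed):

stream.print();              // prints each element to the TaskManager stdout
enriched.print("enriched");  // optional prefix to tell the streams apart

// a WindowedStream isn't a DataStream yet, so apply a window function first:
result
    .process(new MyProcessWindowFunction())
    .print("result");

env.execute();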

Best,
Austin

[1]:
https://ci.apache.org/projects/flink/flink-docs-release-1.11/dev/datastream_api.html#data-sinks

On Mon, Nov 23, 2020 at 3:56 PM Simone Cavallarin 
wrote:

> Hi All,
>
> On my code I have a DataStream that I would like to access. I need to
> understand what I'm getting for each transformation to check if the data
> that I'm working on make sense. How can I print into the console or get a
> file (csv, txt) for the variables: "stream", "enriched" and "result"?
>
> I have tried different way but no way to get the data.
>
> Thanks!
>
>
> FlinkKafkaConsumer kafkaData =
>     new FlinkKafkaConsumer("CorID_1", new EventDeserializationSchema(), p);
> WatermarkStrategy wmStrategy =
>     WatermarkStrategy
>         .forMonotonousTimestamps()
>         .withIdleness(Duration.ofMinutes(1))
>         .withTimestampAssigner((event, timestamp) -> {
>             return event.get_Time();
>         });
> DataStream stream = env.addSource(
>     kafkaData.assignTimestampsAndWatermarks(wmStrategy));
>
> DataStream> enriched = stream
>     .keyBy((Event KafkaMSG) -> KafkaMSG.CorrID)
>     .map(new StatefulSessionCalculator());
>
> WindowedStream, String, TimeWindow> result = enriched
>     .keyBy(new MyKeySelector())
>     .window(EventTimeSessionWindows.withDynamicGap(new DynamicSessionWindows()));
>


Re: Un-ignored Parsing Exceptions in the CsvFormat

2020-10-22 Thread Austin Cawley-Edwards
Hey Roman,

Sorry to miss this -- thanks for the confirmation and making the ticket.
I'm happy to propose a fix if someone is able to assign the ticket to me.

Best,
Austin

On Mon, Oct 19, 2020 at 6:56 AM Khachatryan Roman <
khachatryan.ro...@gmail.com> wrote:

> Hey Austin,
>
> I think you are right. The problematic row contains an odd number of
> delimiters in which case skipFields will return -1, which in turn leads to
> an exception.
>
> I opened a bug ticket https://issues.apache.org/jira/browse/FLINK-19711
> to fix it.
>
> Regards,
> Roman
>
>
> On Fri, Oct 16, 2020 at 8:32 PM Austin Cawley-Edwards <
> austin.caw...@gmail.com> wrote:
>
>> Hey all,
>>
>> I'm ingesting CSV files with Flink 1.10.2 using SQL and the CSV
>> Format[1].
>>
>> Even with the `ignoreParseErrors()` set, the job fails when it encounters
>> some types of malformed rows. The root cause is indeed a `ParseException`,
>> so I'm wondering if there's anything more I need to do to ignore these
>> rows. Each field in the schema is a STRING.
>>
>>
>> I've configured the CSV format and table like so:
>>
>> tableEnv.connect(
>> new FileSystem()
>> .path(path)
>> )
>> .withFormat(
>> new Csv()
>> .quoteCharacter('"')
>> .ignoreParseErrors()
>> )
>> .withSchema(schema)
>> .inAppendMode()
>>
>>
>> Shot in the dark, but should `RowCsvInputFormat#parseRecord` have a check
>> to `isLenient()` if there is an unexpected parser position?[2]
>>
>> Example error:
>>
>> 2020-10-16 12:50:18
>> org.apache.flink.streaming.runtime.tasks.AsynchronousException: Caught
>> exception when processing split: null
>> at
>> org.apache.flink.streaming.runtime.tasks.StreamTask$StreamTaskAsyncExceptionHandler.handleAsyncException(StreamTask.java:1098)
>> at
>> org.apache.flink.streaming.runtime.tasks.StreamTask.handleAsyncException(StreamTask.java:1066)
>> at
>> org.apache.flink.streaming.api.functions.source.ContinuousFileReaderOperator$SplitReader.run(ContinuousFileReaderOperator.java:351)
>> Caused by: org.apache.flink.api.common.io.ParseException: Unexpected
>> parser position for column 1 of row '",
>> https://www.facebook.com/GoingOn-Networks-154758847925524/,https://www.linkedin.com/company/goingon,,
>> ""company,'
>> at
>> org.apache.flink.api.java.io.RowCsvInputFormat.parseRecord(RowCsvInputFormat.java:204)
>> at
>> org.apache.flink.api.java.io.CsvInputFormat.readRecord(CsvInputFormat.java:111)
>> at
>> org.apache.flink.api.common.io.DelimitedInputFormat.nextRecord(DelimitedInputFormat.java:520)
>> at
>> org.apache.flink.api.java.io.CsvInputFormat.nextRecord(CsvInputFormat.java:79)
>> at
>> org.apache.flink.streaming.api.functions.source.ContinuousFileReaderOperator$SplitReader.run(ContinuousFileReaderOperator.java:329)
>>
>>
>> Thanks,
>> Austin
>>
>> [1]:
>> https://ci.apache.org/projects/flink/flink-docs-release-1.10/dev/table/connect.html#csv-format
>> [2]:
>> https://github.com/apache/flink/blob/c09e959cf55c549ca4a3673f72deeb12a34e12f5/flink-java/src/main/java/org/apache/flink/api/java/io/RowCsvInputFormat.java#L203-L206
>>
>


Re: Flink Kubernetes / Helm

2020-10-16 Thread Austin Cawley-Edwards
We use the Ververica Platform and have built an operator for it here[1] :)
and use Helm with it as well.

Best,
Austin

[1]: https://github.com/fintechstudios/ververica-platform-k8s-operator


On Fri, Oct 16, 2020 at 3:12 PM Dan Hill  wrote:

> What libraries do people use for running Flink on Kubernetes?
>
> Some links I've found:
>
>- Flink official documentation
>
> 
>- Ververica documentation
>
>- https://github.com/lightbend/flink-operator
>- https://github.com/riskfocus/helm-charts-public
>- https://github.com/docker-flink/examples
>- different K8 proposal
>
> 
>
>


Un-ignored Parsing Exceptions in the CsvFormat

2020-10-16 Thread Austin Cawley-Edwards
Hey all,

I'm ingesting CSV files with Flink 1.10.2 using SQL and the CSV Format[1].

Even with the `ignoreParseErrors()` set, the job fails when it encounters
some types of malformed rows. The root cause is indeed a `ParseException`,
so I'm wondering if there's anything more I need to do to ignore these
rows. Each field in the schema is a STRING.


I've configured the CSV format and table like so:

tableEnv.connect(
        new FileSystem()
            .path(path)
    )
    .withFormat(
        new Csv()
            .quoteCharacter('"')
            .ignoreParseErrors()
    )
    .withSchema(schema)
    .inAppendMode()


Shot in the dark, but should `RowCsvInputFormat#parseRecord` have a check
to `isLenient()` if there is an unexpected parser position?[2]
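
The general shape I have in mind is something like this (placeholder names,
not the actual Flink source):

private boolean parseFieldLeniently(boolean unexpectedParserPosition) throws ParseException {
    if (unexpectedParserPosition) {
        if (isLenient()) {
            return false; // skip the malformed row instead of failing the job
        }
        throw new ParseException("Unexpected parser position");
    }
    return true;
}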

Example error:

2020-10-16 12:50:18
org.apache.flink.streaming.runtime.tasks.AsynchronousException: Caught
exception when processing split: null
at
org.apache.flink.streaming.runtime.tasks.StreamTask$StreamTaskAsyncExceptionHandler.handleAsyncException(StreamTask.java:1098)
at
org.apache.flink.streaming.runtime.tasks.StreamTask.handleAsyncException(StreamTask.java:1066)
at
org.apache.flink.streaming.api.functions.source.ContinuousFileReaderOperator$SplitReader.run(ContinuousFileReaderOperator.java:351)
Caused by: org.apache.flink.api.common.io.ParseException: Unexpected parser
position for column 1 of row '",
https://www.facebook.com/GoingOn-Networks-154758847925524/,https://www.linkedin.com/company/goingon,,
""company,'
at
org.apache.flink.api.java.io.RowCsvInputFormat.parseRecord(RowCsvInputFormat.java:204)
at
org.apache.flink.api.java.io.CsvInputFormat.readRecord(CsvInputFormat.java:111)
at
org.apache.flink.api.common.io.DelimitedInputFormat.nextRecord(DelimitedInputFormat.java:520)
at
org.apache.flink.api.java.io.CsvInputFormat.nextRecord(CsvInputFormat.java:79)
at
org.apache.flink.streaming.api.functions.source.ContinuousFileReaderOperator$SplitReader.run(ContinuousFileReaderOperator.java:329)


Thanks,
Austin

[1]:
https://ci.apache.org/projects/flink/flink-docs-release-1.10/dev/table/connect.html#csv-format
[2]:
https://github.com/apache/flink/blob/c09e959cf55c549ca4a3673f72deeb12a34e12f5/flink-java/src/main/java/org/apache/flink/api/java/io/RowCsvInputFormat.java#L203-L206


Re: Streaming SQL Job Switches to FINISHED before all records processed

2020-10-09 Thread Austin Cawley-Edwards
Hey Timo,

Hah, that's a fair point about using time. I guess I should update my
statement to "as a user, I don't want to worry about *manually managing*
time".

That's a nice suggestion with the KeyedProcessFunction and no windows, I'll
give that a shot. If I don't want to emit any duplicates, I'd have to
essentially buffer the "last seen duplicate" for each key in that process
function until the MAX_WATERMARK is sent through though, right? I could
emit early results if I assume the max number of possible duplicates, but
for records with no duplicates, I'd have to wait until no more records are
coming -- am I missing something?
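
Something like this is what I'm picturing (a rough sketch, names made up --
the timer just needs a timestamp that the final MAX_WATERMARK will pass, so
it fires once the input ends):

public class DedupUntilEndOfInput extends KeyedProcessFunction<String, Row, Row> {

    private transient ValueState<Row> lastSeen;

    @Override
    public void open(Configuration parameters) {
        lastSeen = getRuntimeContext().getState(
                new ValueStateDescriptor<>("last-seen", Row.class));
    }

    @Override
    public void processElement(Row value, Context ctx, Collector<Row> out) throws Exception {
        // keep only the latest record per key; nothing is emitted yet
        lastSeen.update(value);
        // re-registering the same timer for the same key is a no-op
        ctx.timerService().registerEventTimeTimer(Long.MAX_VALUE - 1);
    }

    @Override
    public void onTimer(long timestamp, OnTimerContext ctx, Collector<Row> out) throws Exception {
        Row buffered = lastSeen.value();
        if (buffered != null) {
            out.collect(buffered);
            lastSeen.clear();
        }
    }
}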

Thanks so much,
Austin

On Fri, Oct 9, 2020 at 10:44 AM Timo Walther  wrote:

> Hi Austin,
>
> if you don't want to worry about time at all, you should probably not
> use any windows because those are a time-based operation.
>
> A solution that would look a bit nicer could be to use a pure
> KeyedProcessFunction and implement the deduplication logic without
> reusing windows. In ProcessFunctions you can register an event-time
> timer. The timer would be triggered by the MAX_WATERMARK when the
> pipeline shuts down even without having a timestamp assigned in the
> StreamRecord. Watermark will leave SQL also without a time attribute as
> far as I know.
>
> Regards,
> Timo
>
>
> On 08.10.20 17:38, Austin Cawley-Edwards wrote:
> > Hey Timo,
> >
> > Sorry for the delayed reply. I'm using the Blink planner and using
> > non-time-based joins. I've got an example repo here that shows my query/
> > setup [1]. It's got the manual timestamp assignment commented out for
> > now, but that does indeed solve the issue.
> >
> > I'd really like to not worry about time at all in this job hah -- I
> > started just using processing time, but Till pointed out that processing
> > time timers won't be fired when input ends, which is the case for this
> > streaming job processing CSV files, so I should be using event time.
> > With that suggestion, I switched to ingestion time, where I then
> > discovered the issue converting from SQL to data stream.
> >
> > IMO, as a user manually assigning timestamps on conversion makes sense
> > if you're using event time and already handling time attributes
> > yourself, but for ingestion time you really don't want to think about
> > time at all, which is why it might make sense to propigate the
> > automatically assigned timestamps in that case. Though not sure how
> > difficult that would be. Let me know what you think!
> >
> >
> > Best + thanks again,
> > Austin
> >
> > [1]: https://github.com/austince/flink-1.10-sql-windowing-error
> >
> > On Mon, Oct 5, 2020 at 4:24 AM Timo Walther  > <mailto:twal...@apache.org>> wrote:
> >
> > Btw which planner are you using?
> >
> > Regards,
> > Timo
> >
> > On 05.10.20 10:23, Timo Walther wrote:
> >  > Hi Austin,
> >  >
> >  > could you share some details of your SQL query with us? The
> > reason why
> >  > I'm asking is because I guess that the rowtime field is not
> inserted
> >  > into the `StreamRecord` of DataStream API. The rowtime field is
> only
> >  > inserted if there is a single field in the output of the query
> > that is a
> >  > valid "time attribute".
> >  >
> >  > Esp. after non-time-based joins and aggregations, time attributes
> > loose
> >  > there properties and become regular timestamps. Because timestamp
> > and
> >  > watermarks might have diverged.
> >  >
> >  > If you know what you're doing, you can also assign the timestamp
> >  > manually after `toRetractStream.assignTimestampAndWatermarks` and
> >  > reinsert the field into the stream record. But before you do
> that, I
> >  > think it is better to share more information about the query with
> us.
> >  >
> >  > I hope this helps.
> >      >
> >  > Regards,
> >  > Timo
> >  >
> >  >
> >  >
> >  > On 05.10.20 09:25, Till Rohrmann wrote:
> >  >> Hi Austin,
> >  >>
> >  >> thanks for offering to help. First I would suggest asking Timo
> > whether
> >  >> this is an aspect which is still missing or whether we
> > overlooked it.
> >  >> Based on that we can then take the next steps.
> >  >>
> >  >> Cheers,
> >  >> Till
>

Re: Issue with Flink - unrelated executeSql causes other executeSqls to fail.

2020-10-08 Thread Austin Cawley-Edwards
Can't comment on the SQL issues, but here's our exact setup for Bazel and
Junit5 w/ the resource files approach:
https://github.com/fintechstudios/vp-operator-demo-ff-virtual-2020/tree/master/tools/junit

Best,
Austin

On Thu, Oct 8, 2020 at 2:41 AM Dan Hill  wrote:

> I was able to get finer grained logs showing.  I switched from
> -Dlog4j.configuration to -Dlog4j.configurationFile and it worked.  With my
> larger test case, I was hitting a silent log4j error.  When I created a
> small test case to just test logging, I received a log4j error.
>
> Here is a tar
> <https://drive.google.com/file/d/1b6vJR_hfaRZwA28jKNlUBxDso7YiTIbk/view?usp=sharing>
> with the info logs for:
> - (test-nojoin.log) this one works as expected
> - (test-join.log) this does not work as expected
>
> I don't see an obvious issue just by scanning the logs.  I'll take a
> deeper look in 9 hours.
>
>
>
>
> On Wed, Oct 7, 2020 at 8:28 PM Dan Hill  wrote:
>
>> Switching to junit4 did not help.
>>
>> If I make a request to the url returned from
>> MiniClusterWithClientResource.flinkCluster.getClusterClient().getWebInterfaceURL(),
>> I get
>> {"errors":["Not found."]}.  I'm not sure if this is intentional.
>>
>>
>>
>>
>> On Tue, Oct 6, 2020 at 4:16 PM Dan Hill  wrote:
>>
>>> @Aljoscha - Thanks!  That setup lets fixing the hacky absolute path
>>> reference.  However, the actual log calls are not printing to the console.
>>> Only errors appear in my terminal window and the test logs.  Maybe console
>>> logger does not work for this junit setup.  I'll see if the file version
>>> works.
>>>
>>> On Tue, Oct 6, 2020 at 4:08 PM Austin Cawley-Edwards <
>>> austin.caw...@gmail.com> wrote:
>>>
>>>> What Aljoscha suggested is what works for us!
>>>>
>>>> On Tue, Oct 6, 2020 at 6:58 PM Aljoscha Krettek 
>>>> wrote:
>>>>
>>>>> Hi Dan,
>>>>>
>>>>> to make the log properties file work this should do it: assuming the
>>>>> log4j.properties is in //src/main/resources. You will need a
>>>>> BUILD.bazel
>>>>> in that directory that has only the line
>>>>> "exports_files(["log4j.properties"]). Then you can reference it in
>>>>> your
>>>>> test via "resources = ["//src/main/resources:log4j.properties"],". Of
>>>>> course you also need to have the right log4j deps (or slf4j if you're
>>>>> using that)
>>>>>
>>>>> Hope that helps!
>>>>>
>>>>> Aljoscha
>>>>>
>>>>> On 07.10.20 00:41, Dan Hill wrote:
>>>>> > I'm trying to use Table API for my job.  I'll soon try to get a test
>>>>> > working for my stream job.
>>>>> > - I'll parameterize so I can have different sources and sink for
>>>>> tests.
>>>>> > How should I mock out a Kafka source?  For my test, I was planning on
>>>>> > changing the input to be from a temp file (instead of Kafka).
>>>>> > - What's a good way of forcing a watermark using the Table API?
>>>>> >
>>>>> >
>>>>> > On Tue, Oct 6, 2020 at 3:35 PM Dan Hill 
>>>>> wrote:
>>>>> >
>>>>> >> Thanks!
>>>>> >>
>>>>> >> Great to know.  I copied this junit5-jupiter-starter-bazel
>>>>> >> <
>>>>> https://github.com/junit-team/junit5-samples/tree/main/junit5-jupiter-starter-bazel>
>>>>> rule
>>>>> >> into my repository (I don't think junit5 is supported directly with
>>>>> >> java_test yet).  I tried a few ways of bundling `log4j.properties`
>>>>> into the
>>>>> >> jar and didn't get them to work.  My current iteration hacks the
>>>>> >> log4j.properties file as an absolute path.  My failed attempts
>>>>> would spit
>>>>> >> an error saying log4j.properties file was not found.  This route
>>>>> finds it
>>>>> >> but the log properties are not used for the java logger.
>>>>> >>
>>>>> >> Are there a better set of rules to use for junit5?
>>>>> >>
>>>>> >> # build rule
>>>>> >> java_junit5_test(
>>>>> >>  name = "tests",
>>>>> >>

Re: Streaming SQL Job Switches to FINISHED before all records processed

2020-10-08 Thread Austin Cawley-Edwards
Hey Timo,

Sorry for the delayed reply. I'm using the Blink planner and using
non-time-based joins. I've got an example repo here that shows my query/
setup [1]. It's got the manual timestamp assignment commented out for now,
but that does indeed solve the issue.
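
For anyone else who hits this, the workaround looks roughly like the
following (a sketch against Flink 1.10; resultTable, the field index, and the
Timestamp cast are specific to my query):

DataStream<Tuple2<Boolean, Row>> retractStream =
        tableEnv.toRetractStream(resultTable, Row.class);

DataStream<Tuple2<Boolean, Row>> withTimestamps = retractStream
        .assignTimestampsAndWatermarks(new AscendingTimestampExtractor<Tuple2<Boolean, Row>>() {
            @Override
            public long extractAscendingTimestamp(Tuple2<Boolean, Row> element) {
                // re-extract the (ingestion) timestamp from the row itself
                return ((java.sql.Timestamp) element.f1.getField(2)).getTime();
            }
        });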

I'd really like to not worry about time at all in this job hah -- I started
just using processing time, but Till pointed out that processing time
timers won't be fired when input ends, which is the case for this streaming
job processing CSV files, so I should be using event time. With that
suggestion, I switched to ingestion time, where I then discovered the issue
converting from SQL to data stream.

IMO, as a user manually assigning timestamps on conversion makes sense if
you're using event time and already handling time attributes yourself, but
for ingestion time you really don't want to think about time at all, which
is why it might make sense to propagate the automatically assigned
timestamps in that case. Though not sure how difficult that would be. Let
me know what you think!


Best + thanks again,
Austin

[1]: https://github.com/austince/flink-1.10-sql-windowing-error

On Mon, Oct 5, 2020 at 4:24 AM Timo Walther  wrote:

> Btw which planner are you using?
>
> Regards,
> Timo
>
> On 05.10.20 10:23, Timo Walther wrote:
> > Hi Austin,
> >
> > could you share some details of your SQL query with us? The reason why
> > I'm asking is because I guess that the rowtime field is not inserted
> > into the `StreamRecord` of DataStream API. The rowtime field is only
> > inserted if there is a single field in the output of the query that is a
> > valid "time attribute".
> >
> > Esp. after non-time-based joins and aggregations, time attributes loose
> > there properties and become regular timestamps. Because timestamp and
> > watermarks might have diverged.
> >
> > If you know what you're doing, you can also assign the timestamp
> > manually after `toRetractStream.assignTimestampAndWatermarks` and
> > reinsert the field into the stream record. But before you do that, I
> > think it is better to share more information about the query with us.
> >
> > I hope this helps.
> >
> > Regards,
> > Timo
> >
> >
> >
> > On 05.10.20 09:25, Till Rohrmann wrote:
> >> Hi Austin,
> >>
> >> thanks for offering to help. First I would suggest asking Timo whether
> >> this is an aspect which is still missing or whether we overlooked it.
> >> Based on that we can then take the next steps.
> >>
> >> Cheers,
> >> Till
> >>
> >> On Fri, Oct 2, 2020 at 7:05 PM Austin Cawley-Edwards
> >> mailto:austin.caw...@gmail.com>> wrote:
> >>
> >> Hey Till,
> >>
> >> Thanks for the notes. Yeah, the docs don't mention anything specific
> >> to this case, not sure if it's an uncommon one. Assigning timestamps
> >> on conversion does solve the issue. I'm happy to take a stab at
> >> implementing the feature if it is indeed missing and you all think
> >> it'd be worthwhile. I think it's definitely a confusing aspect of
> >> working w/ the Table & DataStream APIs together.
> >>
> >> Best,
> >> Austin
> >>
> >> On Fri, Oct 2, 2020 at 6:05 AM Till Rohrmann  >> <mailto:trohrm...@apache.org>> wrote:
> >>
> >> Hi Austin,
> >>
> >> yes, it should also work for ingestion time.
> >>
> >> I am not entirely sure whether event time is preserved when
> >> converting a Table into a retract stream. It should be possible
> >> and if it is not working, then I guess it is a missing feature.
> >> But I am sure that @Timo Walther
> >> <mailto:twal...@apache.org> knows more about it. In doubt, you
> >>     could assign a new watermark generator when having obtained the
> >> retract stream.
> >>
> >> Here is also a link to some information about event time and
> >> watermarks [1]. Unfortunately, it does not state anything about
> >> the direction Table => DataStream.
> >>
> >> [1]
> >>
> >>
> https://ci.apache.org/projects/flink/flink-docs-stable/dev/table/streaming/time_attributes.html
> >>
> >>
> >> Cheers,
> >> Till
> >>
> >> On Fri, Oct 2, 2020 at 12:10 AM Austin Cawley-Edwards
> >> mailto:austin.caw...@gmail.com>>
> wrote:
> >>
> >>

Re: Issue with Flink - unrelated executeSql causes other executeSqls to fail.

2020-10-06 Thread Austin Cawley-Edwards
What Aljoscha suggested is what works for us!

On Tue, Oct 6, 2020 at 6:58 PM Aljoscha Krettek  wrote:

> Hi Dan,
>
> to make the log properties file work this should do it: assuming the
> log4j.properties is in //src/main/resources. You will need a BUILD.bazel
> in that directory that has only the line
> "exports_files(["log4j.properties"]). Then you can reference it in your
> test via "resources = ["//src/main/resources:log4j.properties"],". Of
> course you also need to have the right log4j deps (or slf4j if you're
> using that)
>
> Hope that helps!
>
> Aljoscha
>
> On 07.10.20 00:41, Dan Hill wrote:
> > I'm trying to use Table API for my job.  I'll soon try to get a test
> > working for my stream job.
> > - I'll parameterize so I can have different sources and sink for tests.
> > How should I mock out a Kafka source?  For my test, I was planning on
> > changing the input to be from a temp file (instead of Kafka).
> > - What's a good way of forcing a watermark using the Table API?
> >
> >
> > On Tue, Oct 6, 2020 at 3:35 PM Dan Hill  wrote:
> >
> >> Thanks!
> >>
> >> Great to know.  I copied this junit5-jupiter-starter-bazel
> >> <
> https://github.com/junit-team/junit5-samples/tree/main/junit5-jupiter-starter-bazel>
> rule
> >> into my repository (I don't think junit5 is supported directly with
> >> java_test yet).  I tried a few ways of bundling `log4j.properties` into
> the
> >> jar and didn't get them to work.  My current iteration hacks the
> >> log4j.properties file as an absolute path.  My failed attempts would
> spit
> >> an error saying log4j.properties file was not found.  This route finds
> it
> >> but the log properties are not used for the java logger.
> >>
> >> Is there a better set of rules to use for junit5?
> >>
> >> # build rule
> >> java_junit5_test(
> >>  name = "tests",
> >>  srcs = glob(["*.java"]),
> >>  test_package = "ai.promoted.logprocessor.batch",
> >>  deps = [...],
> >>  jvm_flags =
> >>
> ["-Dlog4j.configuration=file:///Users/danhill/code/src/ai/promoted/logprocessor/batch/log4j.properties"],
> >> )
> >>
> >> # log4j.properties
> >> status = error
> >> name = Log4j2PropertiesConfig
> >> appenders = console
> >> appender.console.type = Console
> >> appender.console.name = LogToConsole
> >> appender.console.layout.type = PatternLayout
> >> appender.console.layout.pattern = %d [%t] %-5p %c - %m%n
> >> rootLogger.level = info
> >> rootLogger.appenderRefs = stdout
> >> rootLogger.appenderRef.stdout.ref = LogToConsole
> >>
> >> On Tue, Oct 6, 2020 at 3:34 PM Austin Cawley-Edwards <
> >> austin.caw...@gmail.com> wrote:
> >>
> >>> Oops, this is actually the JOIN issue thread [1]. Guess I should revise
> >>> my previous "haven't had issues" statement hah. Sorry for the spam!
> >>>
> >>> [1]:
> >>>
> apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/Streaming-SQL-Job-Switches-to-FINISHED-before-all-records-processed-td38382.html
> >>>
> >>> On Tue, Oct 6, 2020 at 6:32 PM Austin Cawley-Edwards <
> >>> austin.caw...@gmail.com> wrote:
> >>>
> >>>> Unless it's related to this issue[1], which was w/ my JOIN and time
> >>>> characteristics, though not sure that applies for batch.
> >>>>
> >>>> Best,
> >>>> Austin
> >>>>
> >>>> [1]:
> >>>>
> apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/Flink-SQL-Streaming-Join-Creates-Duplicates-td37764.html
> >>>>
> >>>>
> >>>> On Tue, Oct 6, 2020 at 6:20 PM Austin Cawley-Edwards <
> >>>> austin.caw...@gmail.com> wrote:
> >>>>
> >>>>> Hey Dan,
> >>>>>
> >>>>> We use Junit5 and Bazel to run Flink SQL tests on a mini cluster and
> >>>>> haven’t had issues, though we’re only testing on streaming jobs.
> >>>>>
> >>>>> Happy to help setting up logging with that if you’d like.
> >>>>>
> >>>>> Best,
> >>>>> Austin
> >>>>>
> >>>>> On Tue, Oct 6, 2020 at 6:02 PM Dan Hill 
> wrote:
> >>>>>

Re: Issue with Flink - unrelated executeSql causes other executeSqls to fail.

2020-10-06 Thread Austin Cawley-Edwards
Oops, this is actually the JOIN issue thread [1]. Guess I should revise my
previous "haven't had issues" statement hah. Sorry for the spam!

[1]:
apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/Streaming-SQL-Job-Switches-to-FINISHED-before-all-records-processed-td38382.html

On Tue, Oct 6, 2020 at 6:32 PM Austin Cawley-Edwards <
austin.caw...@gmail.com> wrote:

> Unless it's related to this issue[1], which was w/ my JOIN and time
> characteristics, though not sure that applies for batch.
>
> Best,
> Austin
>
> [1]:
> apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/Flink-SQL-Streaming-Join-Creates-Duplicates-td37764.html
>
>
> On Tue, Oct 6, 2020 at 6:20 PM Austin Cawley-Edwards <
> austin.caw...@gmail.com> wrote:
>
>> Hey Dan,
>>
>> We use Junit5 and Bazel to run Flink SQL tests on a mini cluster and
>> haven’t had issues, though we’re only testing on streaming jobs.
>>
>> Happy to help setting up logging with that if you’d like.
>>
>> Best,
>> Austin
>>
>> On Tue, Oct 6, 2020 at 6:02 PM Dan Hill  wrote:
>>
>>> I don't think any of the gotchas apply to me (at the bottom of this
>>> link).
>>>
>>> https://ci.apache.org/projects/flink/flink-docs-stable/dev/stream/testing.html#junit-rule-miniclusterwithclientresource
>>>
>>> I'm assuming for a batch job that I don't have to do anything for: "You
>>> can implement a custom parallel source function for emitting watermarks if
>>> your job uses event time timers."
>>>
>>> On Tue, Oct 6, 2020 at 2:42 PM Dan Hill  wrote:
>>>
>>>> I've tried to enable additional logging for a few hours today.  I think
>>>> something with junit5 is swallowing the logs.  I'm using Bazel and junit5.
>>>> I setup MiniClusterResourceConfiguration using a custom extension.  Are
>>>> there any known issues with Flink and junit5?  I can try switching to
>>>> junit4.
>>>>
>>>> When I've binary searched this issue, this failure happens if my query
> >>>> in step 3 has a join in it.  If I remove the join, I can remove step 4 and the
>>>> code still works.  I've renamed a bunch of my tables too and the problem
>>>> still exists.
>>>>
>>>>
>>>>
>>>>
>>>>
>>>> On Tue, Oct 6, 2020, 00:42 Aljoscha Krettek 
>>>> wrote:
>>>>
>>>>> Hi Dan,
>>>>>
>>>>> there were some bugs and quirks in the MiniCluster that we recently
>>>>> fixed:
>>>>>
>>>>>   - https://issues.apache.org/jira/browse/FLINK-19123
>>>>>   - https://issues.apache.org/jira/browse/FLINK-19264
>>>>>
>>>>> But I think they are probably unrelated to your case. Could you enable
>>>>> logging and see from the logs whether the 2) and 3) jobs execute
>>>>> correctly on the MiniCluster?
>>>>>
>>>>> Best,
>>>>> Aljoscha
>>>>>
>>>>> On 06.10.20 08:08, Dan Hill wrote:
>>>>> > I'm writing a test for a batch job using
>>>>> MiniClusterResourceConfiguration.
>>>>> >
>>>>> > Here's a simple description of my working test case:
>>>>> > 1) I use TableEnvironment.executeSql(...) to create a source and
>>>>> sink table
>>>>> > using tmp filesystem directory.
> >>>>> > 2) I use executeSql to insert some test data into the source table.
>>>>> > 3) I use executeSql to select from source and insert into sink.
>>>>> > 4) I use executeSql from the same source to a different sink.
>>>>> >
>>>>> > When I do these steps, it works.  If I remove step 4, no data gets
>>>>> written
>>>>> > to the sink.  My actual code is more complex than this (has create
>>>>> view,
>>>>> > join and more tables).  This is a simplified description but
>>>>> highlights the
>>>>> > weird error.
>>>>> >
>>>>> > Has anyone hit issues like this?  I'm assuming I have a small code
>>>>> bug in
>>>>> > my queries that's causing issues.  These queries appear to work in
>>>>> > production so I'm confused.  Are there ways of viewing failed jobs or
>>>>> > queries with MiniClusterResourceConfiguration?
>>>>> >
>>>>> > Thanks!
>>>>> > - Dan
>>>>> >
>>>>>
>>>>>


Re: Issue with Flink - unrelated executeSql causes other executeSqls to fail.

2020-10-06 Thread Austin Cawley-Edwards
Unless it's related to this issue[1], which was w/ my JOIN and time
characteristics, though not sure that applies for batch.

Best,
Austin

[1]:
apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/Flink-SQL-Streaming-Join-Creates-Duplicates-td37764.html


On Tue, Oct 6, 2020 at 6:20 PM Austin Cawley-Edwards <
austin.caw...@gmail.com> wrote:

> Hey Dan,
>
> We use Junit5 and Bazel to run Flink SQL tests on a mini cluster and
> haven’t had issues, though we’re only testing on streaming jobs.
>
> Happy to help setting up logging with that if you’d like.
>
> Best,
> Austin
>
> On Tue, Oct 6, 2020 at 6:02 PM Dan Hill  wrote:
>
>> I don't think any of the gotchas apply to me (at the bottom of this link).
>>
>> https://ci.apache.org/projects/flink/flink-docs-stable/dev/stream/testing.html#junit-rule-miniclusterwithclientresource
>>
>> I'm assuming for a batch job that I don't have to do anything for: "You
>> can implement a custom parallel source function for emitting watermarks if
>> your job uses event time timers."
>>
>> On Tue, Oct 6, 2020 at 2:42 PM Dan Hill  wrote:
>>
>>> I've tried to enable additional logging for a few hours today.  I think
>>> something with junit5 is swallowing the logs.  I'm using Bazel and junit5.
>>> I setup MiniClusterResourceConfiguration using a custom extension.  Are
>>> there any known issues with Flink and junit5?  I can try switching to
>>> junit4.
>>>
>>> When I've binary searched this issue, this failure happens if my query
> >>> in step 3 has a join in it. If I remove the join, I can remove step 4 and the
>>> code still works.  I've renamed a bunch of my tables too and the problem
>>> still exists.
>>>
>>>
>>>
>>>
>>>
>>> On Tue, Oct 6, 2020, 00:42 Aljoscha Krettek  wrote:
>>>
>>>> Hi Dan,
>>>>
>>>> there were some bugs and quirks in the MiniCluster that we recently
>>>> fixed:
>>>>
>>>>   - https://issues.apache.org/jira/browse/FLINK-19123
>>>>   - https://issues.apache.org/jira/browse/FLINK-19264
>>>>
>>>> But I think they are probably unrelated to your case. Could you enable
>>>> logging and see from the logs whether the 2) and 3) jobs execute
>>>> correctly on the MiniCluster?
>>>>
>>>> Best,
>>>> Aljoscha
>>>>
>>>> On 06.10.20 08:08, Dan Hill wrote:
>>>> > I'm writing a test for a batch job using
>>>> MiniClusterResourceConfiguration.
>>>> >
>>>> > Here's a simple description of my working test case:
>>>> > 1) I use TableEnvironment.executeSql(...) to create a source and sink
>>>> table
>>>> > using tmp filesystem directory.
> >>>> > 2) I use executeSql to insert some test data into the source table.
>>>> > 3) I use executeSql to select from source and insert into sink.
>>>> > 4) I use executeSql from the same source to a different sink.
>>>> >
>>>> > When I do these steps, it works.  If I remove step 4, no data gets
>>>> written
>>>> > to the sink.  My actual code is more complex than this (has create
>>>> view,
>>>> > join and more tables).  This is a simplified description but
>>>> highlights the
>>>> > weird error.
>>>> >
>>>> > Has anyone hit issues like this?  I'm assuming I have a small code
>>>> bug in
>>>> > my queries that's causing issues.  These queries appear to work in
>>>> > production so I'm confused.  Are there ways of viewing failed jobs or
>>>> > queries with MiniClusterResourceConfiguration?
>>>> >
>>>> > Thanks!
>>>> > - Dan
>>>> >
>>>>
>>>>


Re: Issue with Flink - unrelated executeSql causes other executeSqls to fail.

2020-10-06 Thread Austin Cawley-Edwards
Hey Dan,

We use Junit5 and Bazel to run Flink SQL tests on a mini cluster and
haven’t had issues, though we’re only testing on streaming jobs.
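
For reference, the shape of our setup is roughly the sketch below -- not our
exact code, and it assumes flink-test-utils' MiniClusterWithClientResource
(whose before()/after() methods are public in the versions we've used); the
class name and parallelism are made up:

import org.apache.flink.runtime.testutils.MiniClusterResourceConfiguration;
import org.apache.flink.test.util.MiniClusterWithClientResource;
import org.junit.jupiter.api.extension.AfterAllCallback;
import org.junit.jupiter.api.extension.BeforeAllCallback;
import org.junit.jupiter.api.extension.ExtensionContext;

// Adapts the JUnit 4-style resource so it can be used with @ExtendWith in JUnit 5.
public class FlinkMiniClusterExtension implements BeforeAllCallback, AfterAllCallback {

  private final MiniClusterWithClientResource miniCluster =
      new MiniClusterWithClientResource(
          new MiniClusterResourceConfiguration.Builder()
              .setNumberTaskManagers(1)
              .setNumberSlotsPerTaskManager(2)
              .build());

  @Override
  public void beforeAll(ExtensionContext context) throws Exception {
    miniCluster.before(); // starts the mini cluster once for the test class
  }

  @Override
  public void afterAll(ExtensionContext context) {
    miniCluster.after(); // shuts it down after all tests have run
  }
}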

Happy to help setting up logging with that if you’d like.

Best,
Austin

On Tue, Oct 6, 2020 at 6:02 PM Dan Hill  wrote:

> I don't think any of the gotchas apply to me (at the bottom of this link).
>
> https://ci.apache.org/projects/flink/flink-docs-stable/dev/stream/testing.html#junit-rule-miniclusterwithclientresource
>
> I'm assuming for a batch job that I don't have to do anything for: "You
> can implement a custom parallel source function for emitting watermarks if
> your job uses event time timers."
>
> On Tue, Oct 6, 2020 at 2:42 PM Dan Hill  wrote:
>
>> I've tried to enable additional logging for a few hours today.  I think
>> something with junit5 is swallowing the logs.  I'm using Bazel and junit5.
>> I setup MiniClusterResourceConfiguration using a custom extension.  Are
>> there any known issues with Flink and junit5?  I can try switching to
>> junit4.
>>
>> When I've binary searched this issue, this failure happens if my query in
>> step 3 has a join in it.  If I remove the join, I can remove step 4 and the
>> code still works.  I've renamed a bunch of my tables too and the problem
>> still exists.
>>
>>
>>
>>
>>
>> On Tue, Oct 6, 2020, 00:42 Aljoscha Krettek  wrote:
>>
>>> Hi Dan,
>>>
>>> there were some bugs and quirks in the MiniCluster that we recently
>>> fixed:
>>>
>>>   - https://issues.apache.org/jira/browse/FLINK-19123
>>>   - https://issues.apache.org/jira/browse/FLINK-19264
>>>
>>> But I think they are probably unrelated to your case. Could you enable
>>> logging and see from the logs whether the 2) and 3) jobs execute
>>> correctly on the MiniCluster?
>>>
>>> Best,
>>> Aljoscha
>>>
>>> On 06.10.20 08:08, Dan Hill wrote:
>>> > I'm writing a test for a batch job using
>>> MiniClusterResourceConfiguration.
>>> >
>>> > Here's a simple description of my working test case:
>>> > 1) I use TableEnvironment.executeSql(...) to create a source and sink
>>> table
>>> > using tmp filesystem directory.
> >>> > 2) I use executeSql to insert some test data into the source table.
>>> > 3) I use executeSql to select from source and insert into sink.
>>> > 4) I use executeSql from the same source to a different sink.
>>> >
>>> > When I do these steps, it works.  If I remove step 4, no data gets
>>> written
>>> > to the sink.  My actual code is more complex than this (has create
>>> view,
>>> > join and more tables).  This is a simplified description but
>>> highlights the
>>> > weird error.
>>> >
>>> > Has anyone hit issues like this?  I'm assuming I have a small code bug
>>> in
>>> > my queries that's causing issues.  These queries appear to work in
>>> > production so I'm confused.  Are there ways of viewing failed jobs or
>>> > queries with MiniClusterResourceConfiguration?
>>> >
>>> > Thanks!
>>> > - Dan
>>> >
>>>
>>>


Re: Streaming SQL Job Switches to FINISHED before all records processed

2020-10-02 Thread Austin Cawley-Edwards
Hey Till,

Thanks for the notes. Yeah, the docs don't mention anything specific to
this case, not sure if it's an uncommon one. Assigning timestamps on
conversion does solve the issue. I'm happy to take a stab at implementing
the feature if it is indeed missing and you all think it'd be worthwhile. I
think it's definitely a confusing aspect of working w/ the Table &
DataStream APIs together.

Best,
Austin

On Fri, Oct 2, 2020 at 6:05 AM Till Rohrmann  wrote:

> Hi Austin,
>
> yes, it should also work for ingestion time.
>
> I am not entirely sure whether event time is preserved when converting a
> Table into a retract stream. It should be possible and if it is not
> working, then I guess it is a missing feature. But I am sure that @Timo
> Walther  knows more about it. In doubt, you could
> assign a new watermark generator when having obtained the retract stream.
>
> Here is also a link to some information about event time and watermarks
> [1]. Unfortunately, it does not state anything about the direction Table =>
> DataStream.
>
> [1]
> https://ci.apache.org/projects/flink/flink-docs-stable/dev/table/streaming/time_attributes.html
>
> Cheers,
> Till
>
> On Fri, Oct 2, 2020 at 12:10 AM Austin Cawley-Edwards <
> austin.caw...@gmail.com> wrote:
>
>> Hey Till,
>>
>> Just a quick question on time characteristics -- this should work for
>> IngestionTime as well, correct? Is there anything special I need to do to
>> have the CsvTableSource/ toRetractStream call to carry through the assigned
>> timestamps, or do I have to re-assign timestamps during the conversion? I'm
>> currently getting the `Record has Long.MIN_VALUE timestamp (= no timestamp
>> marker)` error, though I'm seeing timestamps being assigned if I step
>> through the AutomaticWatermarkContext.
>>
>> Thanks,
>> Austin
>>
>> On Thu, Oct 1, 2020 at 10:52 AM Austin Cawley-Edwards <
>> austin.caw...@gmail.com> wrote:
>>
>>> Perfect, thanks so much Till!
>>>
>>> On Thu, Oct 1, 2020 at 5:13 AM Till Rohrmann 
>>> wrote:
>>>
>>>> Hi Austin,
>>>>
>>>> I believe that the problem is the processing time window. Unlike for
>>>> event time where we send a MAX_WATERMARK at the end of the stream to
>>>> trigger all remaining windows, this does not happen for processing time
>>>> windows. Hence, if your stream ends and you still have an open processing
>>>> time window, then it will never get triggered.
>>>>
>>>> The problem should disappear if you use event time or if you process
>>>> unbounded streams which never end.
>>>>
>>>> Cheers,
>>>> Till
>>>>
>>>> On Thu, Oct 1, 2020 at 12:01 AM Austin Cawley-Edwards <
>>>> austin.caw...@gmail.com> wrote:
>>>>
>>>>> Hey all,
>>>>>
>>>>> Thanks for your patience. I've got a small repo that reproduces the
>>>>> issue here: https://github.com/austince/flink-1.10-sql-windowing-error
>>>>>
>>>>> Not sure what I'm doing wrong but it feels silly.
>>>>>
>>>>> Thanks so much!
>>>>> Austin
>>>>>
>>>>> On Tue, Sep 29, 2020 at 3:48 PM Austin Cawley-Edwards <
>>>>> austin.caw...@gmail.com> wrote:
>>>>>
>>>>>> Hey Till,
>>>>>>
>>>>>> Thanks for the reply -- I'll try to see if I can reproduce this in a
>>>>>> small repo and share it with you.
>>>>>>
>>>>>> Best,
>>>>>> Austin
>>>>>>
>>>>>> On Tue, Sep 29, 2020 at 3:58 AM Till Rohrmann 
>>>>>> wrote:
>>>>>>
>>>>>>> Hi Austin,
>>>>>>>
>>>>>>> could you share with us the exact job you are running (including the
>>>>>>> custom window trigger)? This would help us to better understand your
>>>>>>> problem.
>>>>>>>
>>>>>>> I am also pulling in Klou and Timo who might help with the windowing
>>>>>>> logic and the Table to DataStream conversion.
>>>>>>>
>>>>>>> Cheers,
>>>>>>> Till
>>>>>>>
>>>>>>> On Mon, Sep 28, 2020 at 11:49 PM Austin Cawley-Edwards <
>>>>>>> austin.caw...@gmail.com> wrote:
>>>>>>>
>>>>>>>> Hey all,
>>>&

Re: Streaming SQL Job Switches to FINISHED before all records processed

2020-10-01 Thread Austin Cawley-Edwards
Hey Till,

Just a quick question on time characteristics -- this should work for
IngestionTime as well, correct? Is there anything special I need to do to
have the CsvTableSource/ toRetractStream call to carry through the assigned
timestamps, or do I have to re-assign timestamps during the conversion? I'm
currently getting the `Record has Long.MIN_VALUE timestamp (= no timestamp
marker)` error, though I'm seeing timestamps being assigned if I step
through the AutomaticWatermarkContext.

Thanks,
Austin

On Thu, Oct 1, 2020 at 10:52 AM Austin Cawley-Edwards <
austin.caw...@gmail.com> wrote:

> Perfect, thanks so much Till!
>
> On Thu, Oct 1, 2020 at 5:13 AM Till Rohrmann  wrote:
>
>> Hi Austin,
>>
>> I believe that the problem is the processing time window. Unlike for
>> event time where we send a MAX_WATERMARK at the end of the stream to
>> trigger all remaining windows, this does not happen for processing time
>> windows. Hence, if your stream ends and you still have an open processing
>> time window, then it will never get triggered.
>>
>> The problem should disappear if you use event time or if you process
>> unbounded streams which never end.
>>
>> Cheers,
>> Till
>>
>> On Thu, Oct 1, 2020 at 12:01 AM Austin Cawley-Edwards <
>> austin.caw...@gmail.com> wrote:
>>
>>> Hey all,
>>>
>>> Thanks for your patience. I've got a small repo that reproduces the
>>> issue here: https://github.com/austince/flink-1.10-sql-windowing-error
>>>
>>> Not sure what I'm doing wrong but it feels silly.
>>>
>>> Thanks so much!
>>> Austin
>>>
>>> On Tue, Sep 29, 2020 at 3:48 PM Austin Cawley-Edwards <
>>> austin.caw...@gmail.com> wrote:
>>>
>>>> Hey Till,
>>>>
>>>> Thanks for the reply -- I'll try to see if I can reproduce this in a
>>>> small repo and share it with you.
>>>>
>>>> Best,
>>>> Austin
>>>>
>>>> On Tue, Sep 29, 2020 at 3:58 AM Till Rohrmann 
>>>> wrote:
>>>>
>>>>> Hi Austin,
>>>>>
>>>>> could you share with us the exact job you are running (including the
>>>>> custom window trigger)? This would help us to better understand your
>>>>> problem.
>>>>>
>>>>> I am also pulling in Klou and Timo who might help with the windowing
>>>>> logic and the Table to DataStream conversion.
>>>>>
>>>>> Cheers,
>>>>> Till
>>>>>
>>>>> On Mon, Sep 28, 2020 at 11:49 PM Austin Cawley-Edwards <
>>>>> austin.caw...@gmail.com> wrote:
>>>>>
>>>>>> Hey all,
>>>>>>
>>>>>> I'm not sure if I've missed something in the docs, but I'm having a
>>>>>> bit of trouble with a streaming SQL job that starts w/ raw SQL queries 
>>>>>> and
>>>>>> then transitions to a more traditional streaming job. I'm on Flink 1.10
>>>>>> using the Blink planner, running locally with no checkpointing.
>>>>>>
>>>>>> The job looks roughly like:
>>>>>>
>>>>>> CSV 1 -->
>>>>>> CSV 2 -->  SQL Query to Join --> toRetractStream --> keyed time
>>>>>> window w/ process func & custom trigger --> some other ops
>>>>>> CSV 3 -->
>>>>>>
>>>>>>
>>>>>> When I remove the windowing directly after the `toRetractStream`, the
>>>>>> records make it to the "some other ops" stage, but with the windowing,
>>>>>> those operations are sometimes not sent any data. I can also get data 
>>>>>> sent
>>>>>> to the downstream operators by putting in a no-op map before the window 
>>>>>> and
>>>>>> placing some breakpoints in there to manually slow down processing.
>>>>>>
>>>>>>
>>>>>> The logs don't seem to indicate anything went wrong and generally
>>>>>> look like:
>>>>>>
>>>>>> 4819 [Source: Custom File source (1/1)] INFO
>>>>>>  org.apache.flink.runtime.taskmanager.Task  - Source: Custom File source
>>>>>> (1/1) (3578629787c777320d9ab030c004abd4) switched from RUNNING to
>>>>>> FINISHED.\4819 [Source: Custom File source (1/1)] INFO
>>>>>>  org.apache.flink.runtime.task

Re: Streaming SQL Job Switches to FINISHED before all records processed

2020-09-30 Thread Austin Cawley-Edwards
Hey all,

Thanks for your patience. I've got a small repo that reproduces the issue
here: https://github.com/austince/flink-1.10-sql-windowing-error

Not sure what I'm doing wrong but it feels silly.

Thanks so much!
Austin

On Tue, Sep 29, 2020 at 3:48 PM Austin Cawley-Edwards <
austin.caw...@gmail.com> wrote:

> Hey Till,
>
> Thanks for the reply -- I'll try to see if I can reproduce this in a small
> repo and share it with you.
>
> Best,
> Austin
>
> On Tue, Sep 29, 2020 at 3:58 AM Till Rohrmann 
> wrote:
>
>> Hi Austin,
>>
>> could you share with us the exact job you are running (including the
>> custom window trigger)? This would help us to better understand your
>> problem.
>>
>> I am also pulling in Klou and Timo who might help with the windowing
>> logic and the Table to DataStream conversion.
>>
>> Cheers,
>> Till
>>
>> On Mon, Sep 28, 2020 at 11:49 PM Austin Cawley-Edwards <
>> austin.caw...@gmail.com> wrote:
>>
>>> Hey all,
>>>
>>> I'm not sure if I've missed something in the docs, but I'm having a bit
>>> of trouble with a streaming SQL job that starts w/ raw SQL queries and then
>>> transitions to a more traditional streaming job. I'm on Flink 1.10 using
>>> the Blink planner, running locally with no checkpointing.
>>>
>>> The job looks roughly like:
>>>
>>> CSV 1 -->
>>> CSV 2 -->  SQL Query to Join --> toRetractStream --> keyed time window
>>> w/ process func & custom trigger --> some other ops
>>> CSV 3 -->
>>>
>>>
>>> When I remove the windowing directly after the `toRetractStream`, the
>>> records make it to the "some other ops" stage, but with the windowing,
>>> those operations are sometimes not sent any data. I can also get data sent
>>> to the downstream operators by putting in a no-op map before the window and
>>> placing some breakpoints in there to manually slow down processing.
>>>
>>>
>>> The logs don't seem to indicate anything went wrong and generally look
>>> like:
>>>
>>> 4819 [Source: Custom File source (1/1)] INFO
>>>  org.apache.flink.runtime.taskmanager.Task  - Source: Custom File source
>>> (1/1) (3578629787c777320d9ab030c004abd4) switched from RUNNING to
>>> FINISHED.\4819 [Source: Custom File source (1/1)] INFO
>>>  org.apache.flink.runtime.taskmanager.Task  - Freeing task resources for
>>> Source: Custom File source (1/1) (3578629787c777320d9ab030c004abd4).
>>> 4819 [Source: Custom File source (1/1)] INFO
>>>  org.apache.flink.runtime.taskmanager.Task  - Ensuring all FileSystem
>>> streams are closed for task Source: Custom File source (1/1)
>>> (3578629787c777320d9ab030c004abd4) [FINISHED]
>>> 4820 [flink-akka.actor.default-dispatcher-5] INFO
>>>  org.apache.flink.runtime.taskexecutor.TaskExecutor  - Un-registering task
>>> and sending final execution state FINISHED to JobManager for task Source:
>>> Custom File source (1/1) 3578629787c777320d9ab030c004abd4.
>>> ...
>>> 4996 [Window(TumblingProcessingTimeWindows(6), TimedCountTrigger,
>>> ProcessWindowFunction$1) (1/1)] INFO
>>>  org.apache.flink.runtime.taskmanager.Task  -
>>> Window(TumblingProcessingTimeWindows(6), TimedCountTrigger,
>>> ProcessWindowFunction$1) (1/1) (907acf9bfa2f4a9bbd13997b8b34d91f) switched
>>> from RUNNING to FINISHED.
>>> 4996 [Window(TumblingProcessingTimeWindows(6), TimedCountTrigger,
>>> ProcessWindowFunction$1) (1/1)] INFO
>>>  org.apache.flink.runtime.taskmanager.Task  - Freeing task resources for
>>> Window(TumblingProcessingTimeWindows(6), TimedCountTrigger,
>>> ProcessWindowFunction$1) (1/1) (907acf9bfa2f4a9bbd13997b8b34d91f).
>>> 4996 [Window(TumblingProcessingTimeWindows(6), TimedCountTrigger,
>>> ProcessWindowFunction$1) (1/1)] INFO
>>>  org.apache.flink.runtime.taskmanager.Task  - Ensuring all FileSystem
>>> streams are closed for task Window(TumblingProcessingTimeWindows(6),
>>> TimedCountTrigger, ProcessWindowFunction$1) (1/1)
>>> (907acf9bfa2f4a9bbd13997b8b34d91f) [FINISHED]
>>> ...
>>> rest of the shutdown
>>> ...
>>> Program execution finished
>>> Job with JobID 889b161e432c0e69a8d760bbed205d5d has finished.
>>> Job Runtime: 783 ms
>>>
>>>
>>> Is there something I'm missing in my setup? Could it be my custom window
>>> trigger? Bug? I'm stumped.
>>>
>>>
>>> Thanks,
>>> Austin
>>>
>>>
>>>


Re: Streaming SQL Job Switches to FINISHED before all records processed

2020-09-29 Thread Austin Cawley-Edwards
Hey Till,

Thanks for the reply -- I'll try to see if I can reproduce this in a small
repo and share it with you.

Best,
Austin

On Tue, Sep 29, 2020 at 3:58 AM Till Rohrmann  wrote:

> Hi Austin,
>
> could you share with us the exact job you are running (including the
> custom window trigger)? This would help us to better understand your
> problem.
>
> I am also pulling in Klou and Timo who might help with the windowing logic
> and the Table to DataStream conversion.
>
> Cheers,
> Till
>
> On Mon, Sep 28, 2020 at 11:49 PM Austin Cawley-Edwards <
> austin.caw...@gmail.com> wrote:
>
>> Hey all,
>>
>> I'm not sure if I've missed something in the docs, but I'm having a bit
>> of trouble with a streaming SQL job that starts w/ raw SQL queries and then
>> transitions to a more traditional streaming job. I'm on Flink 1.10 using
>> the Blink planner, running locally with no checkpointing.
>>
>> The job looks roughly like:
>>
>> CSV 1 -->
>> CSV 2 -->  SQL Query to Join --> toRetractStream --> keyed time window w/
>> process func & custom trigger --> some other ops
>> CSV 3 -->
>>
>>
>> When I remove the windowing directly after the `toRetractStream`, the
>> records make it to the "some other ops" stage, but with the windowing,
>> those operations are sometimes not sent any data. I can also get data sent
>> to the downstream operators by putting in a no-op map before the window and
>> placing some breakpoints in there to manually slow down processing.
>>
>>
>> The logs don't seem to indicate anything went wrong and generally look
>> like:
>>
>> 4819 [Source: Custom File source (1/1)] INFO
>>  org.apache.flink.runtime.taskmanager.Task  - Source: Custom File source
>> (1/1) (3578629787c777320d9ab030c004abd4) switched from RUNNING to
>> FINISHED.\4819 [Source: Custom File source (1/1)] INFO
>>  org.apache.flink.runtime.taskmanager.Task  - Freeing task resources for
>> Source: Custom File source (1/1) (3578629787c777320d9ab030c004abd4).
>> 4819 [Source: Custom File source (1/1)] INFO
>>  org.apache.flink.runtime.taskmanager.Task  - Ensuring all FileSystem
>> streams are closed for task Source: Custom File source (1/1)
>> (3578629787c777320d9ab030c004abd4) [FINISHED]
>> 4820 [flink-akka.actor.default-dispatcher-5] INFO
>>  org.apache.flink.runtime.taskexecutor.TaskExecutor  - Un-registering task
>> and sending final execution state FINISHED to JobManager for task Source:
>> Custom File source (1/1) 3578629787c777320d9ab030c004abd4.
>> ...
>> 4996 [Window(TumblingProcessingTimeWindows(6), TimedCountTrigger,
>> ProcessWindowFunction$1) (1/1)] INFO
>>  org.apache.flink.runtime.taskmanager.Task  -
>> Window(TumblingProcessingTimeWindows(6), TimedCountTrigger,
>> ProcessWindowFunction$1) (1/1) (907acf9bfa2f4a9bbd13997b8b34d91f) switched
>> from RUNNING to FINISHED.
>> 4996 [Window(TumblingProcessingTimeWindows(6), TimedCountTrigger,
>> ProcessWindowFunction$1) (1/1)] INFO
>>  org.apache.flink.runtime.taskmanager.Task  - Freeing task resources for
>> Window(TumblingProcessingTimeWindows(6), TimedCountTrigger,
>> ProcessWindowFunction$1) (1/1) (907acf9bfa2f4a9bbd13997b8b34d91f).
>> 4996 [Window(TumblingProcessingTimeWindows(6), TimedCountTrigger,
>> ProcessWindowFunction$1) (1/1)] INFO
>>  org.apache.flink.runtime.taskmanager.Task  - Ensuring all FileSystem
>> streams are closed for task Window(TumblingProcessingTimeWindows(6),
>> TimedCountTrigger, ProcessWindowFunction$1) (1/1)
>> (907acf9bfa2f4a9bbd13997b8b34d91f) [FINISHED]
>> ...
>> rest of the shutdown
>> ...
>> Program execution finished
>> Job with JobID 889b161e432c0e69a8d760bbed205d5d has finished.
>> Job Runtime: 783 ms
>>
>>
>> Is there something I'm missing in my setup? Could it be my custom window
>> trigger? Bug? I'm stumped.
>>
>>
>> Thanks,
>> Austin
>>
>>
>>


Streaming SQL Job Switches to FINISHED before all records processed

2020-09-28 Thread Austin Cawley-Edwards
Hey all,

I'm not sure if I've missed something in the docs, but I'm having a bit of
trouble with a streaming SQL job that starts w/ raw SQL queries and then
transitions to a more traditional streaming job. I'm on Flink 1.10 using
the Blink planner, running locally with no checkpointing.

The job looks roughly like:

CSV 1 -->
CSV 2 -->  SQL Query to Join --> toRetractStream --> keyed time window w/
process func & custom trigger --> some other ops
CSV 3 -->


When I remove the windowing directly after the `toRetractStream`, the
records make it to the "some other ops" stage, but with the windowing,
those operations are sometimes not sent any data. I can also get data sent
to the downstream operators by putting in a no-op map before the window and
placing some breakpoints in there to manually slow down processing.


The logs don't seem to indicate anything went wrong and generally look like:

4819 [Source: Custom File source (1/1)] INFO
 org.apache.flink.runtime.taskmanager.Task  - Source: Custom File source
(1/1) (3578629787c777320d9ab030c004abd4) switched from RUNNING to
FINISHED.\4819 [Source: Custom File source (1/1)] INFO
 org.apache.flink.runtime.taskmanager.Task  - Freeing task resources for
Source: Custom File source (1/1) (3578629787c777320d9ab030c004abd4).
4819 [Source: Custom File source (1/1)] INFO
 org.apache.flink.runtime.taskmanager.Task  - Ensuring all FileSystem
streams are closed for task Source: Custom File source (1/1)
(3578629787c777320d9ab030c004abd4) [FINISHED]
4820 [flink-akka.actor.default-dispatcher-5] INFO
 org.apache.flink.runtime.taskexecutor.TaskExecutor  - Un-registering task
and sending final execution state FINISHED to JobManager for task Source:
Custom File source (1/1) 3578629787c777320d9ab030c004abd4.
...
4996 [Window(TumblingProcessingTimeWindows(6), TimedCountTrigger,
ProcessWindowFunction$1) (1/1)] INFO
 org.apache.flink.runtime.taskmanager.Task  -
Window(TumblingProcessingTimeWindows(6), TimedCountTrigger,
ProcessWindowFunction$1) (1/1) (907acf9bfa2f4a9bbd13997b8b34d91f) switched
from RUNNING to FINISHED.
4996 [Window(TumblingProcessingTimeWindows(6), TimedCountTrigger,
ProcessWindowFunction$1) (1/1)] INFO
 org.apache.flink.runtime.taskmanager.Task  - Freeing task resources for
Window(TumblingProcessingTimeWindows(6), TimedCountTrigger,
ProcessWindowFunction$1) (1/1) (907acf9bfa2f4a9bbd13997b8b34d91f).
4996 [Window(TumblingProcessingTimeWindows(6), TimedCountTrigger,
ProcessWindowFunction$1) (1/1)] INFO
 org.apache.flink.runtime.taskmanager.Task  - Ensuring all FileSystem
streams are closed for task Window(TumblingProcessingTimeWindows(6),
TimedCountTrigger, ProcessWindowFunction$1) (1/1)
(907acf9bfa2f4a9bbd13997b8b34d91f) [FINISHED]
...
rest of the shutdown
...
Program execution finished
Job with JobID 889b161e432c0e69a8d760bbed205d5d has finished.
Job Runtime: 783 ms


Is there something I'm missing in my setup? Could it be my custom window
trigger? Bug? I'm stumped.


Thanks,
Austin


Re: Apache Qpid connector.

2020-09-25 Thread Austin Cawley-Edwards
Hey (Master) Parag,

I don't know anything about Apache Qpid, but from the homepage[1], it looks
like the protocol is just AMQP? Are there more specifics than that? If it
is just AMQP, would the RabbitMQ connector[2] work for you?
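
If the broker does speak AMQP 0-9-1 (the dialect the RabbitMQ connector is
built on), wiring it up might look like the sketch below -- untested against
Qpid, and the host, queue, and credentials are placeholders:

import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.connectors.rabbitmq.RMQSource;
import org.apache.flink.streaming.connectors.rabbitmq.common.RMQConnectionConfig;

StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

RMQConnectionConfig connectionConfig = new RMQConnectionConfig.Builder()
    .setHost("qpid-broker.example.com")
    .setPort(5672)
    .setUserName("guest")
    .setPassword("guest")
    .setVirtualHost("/")
    .build();

// Consumes string messages from the named queue; exactly-once would
// additionally require correlation ids and checkpointing.
DataStream<String> messages = env.addSource(
    new RMQSource<>(
        connectionConfig,
        "my-queue",
        false, // usesCorrelationId
        new SimpleStringSchema()));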

Best,
Austin

[1]: https://qpid.apache.org/
[2]:
https://ci.apache.org/projects/flink/flink-docs-stable/dev/connectors/rabbitmq.html

On Fri, Sep 25, 2020 at 11:26 AM Master Yoda 
wrote:

> Hello, is there a Flink source and sink from/to
> Apache Qpid? I searched around a bit but could not find one. Would I need
> to write one if there isn't one already?
>
> thanks,
> Parag
>


Re: Flink SQL Streaming Join Creates Duplicates

2020-08-31 Thread Austin Cawley-Edwards
Hey Arvid,

Yes, I was able to self-answer this one. I was just confused by the
non-deterministic behavior of the FULL OUTER join statement. After thinking
it through and taking a harder read through the Dynamic Tables doc
section[1], where "Result Updating" is hinted at, the behavior makes total
sense in a streaming env.
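
For anyone who lands on this thread later, the gist of the change, using the
placeholder table/field names from the original message and the tEnv defined
there (a sketch, not the production query):

// An INNER JOIN only emits a row once both sides have arrived, so no
// add/retract pair ("result updating") shows up downstream as duplicates:
Table joined = tEnv.sqlQuery(
    "SELECT table_1.a, table_1.b, table_2.c "
        + "FROM table_1 "
        + "INNER JOIN table_2 ON table_1.b = table_2.b");

// If the LEFT OUTER JOIN is kept instead, the Boolean flag of the retract
// stream has to be honored downstream: f0 == false retracts a previously
// emitted row (e.g. the earlier one with c = null).
DataStream<Tuple2<Boolean, Row>> updates = tEnv.toRetractStream(joined, Row.class);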

Thanks,
Austin

[1]:
https://ci.apache.org/projects/flink/flink-docs-release-1.11/dev/table/streaming/dynamic_tables.html

On Mon, Aug 31, 2020 at 5:16 AM Arvid Heise  wrote:

> Hi Austin,
>
> Do I assume correctly, that you self-answered your question? If not, could
> you please update your current progress?
>
> Best,
>
> Arvid
>
> On Thu, Aug 27, 2020 at 11:41 PM Austin Cawley-Edwards <
> austin.caw...@gmail.com> wrote:
>
>> Ah, I think the "Result Updating" is what got me -- INNER joins do the
>> job!
>>
>> On Thu, Aug 27, 2020 at 3:38 PM Austin Cawley-Edwards <
>> austin.caw...@gmail.com> wrote:
>>
>>> oops, the example query should actually be:
>>>
>>> SELECT table_1.a, table_1.b, table_2.c
>>> FROM table_1
>>> LEFT OUTER JOIN table_2 ON table_1.b = table_2.b;
>>>
>>> and duplicate results should actually be:
>>>
>>> Record(a = "data a 1", b = "data b 1", c = "data c 1")
>>> Record(a = "data a 1", b = "data b 1", c = null)
>>> Record(a = "data a 2", b = "data b 2", c = "data c 2")
>>> Record(a = "data a 2", b = "data b 2", c = null)
>>>
>>> On Thu, Aug 27, 2020 at 3:34 PM Austin Cawley-Edwards <
>>> austin.caw...@gmail.com> wrote:
>>>
>>>> Hey all,
>>>>
>>>> I've got a Flink 1.10 Streaming SQL job using the Blink Planner that is
>>>> reading from a few CSV files and joins some records across them into a
>>>> couple of data streams (yes, this could be a batch job won't get into why
>>>> we chose streams unless it's relevant). These joins are producing some
>>>> duplicate records, one with the joined field present and one with the
>>>> joined field as `null`, though this happens only ~25% of the time. Reading
>>>> the docs on joins[1], I thought this could be caused by too strict Idle
>>>> State Retention[2], so I increased that to min, max (15min, 24h) but that
>>>> doesn't seem to have an effect, and the problem still occurs when testing
>>>> on a subset of data that finishes processing in under a minute.
>>>>
>>>> The query roughly looks like:
>>>>
>>>> table_1 has fields a, b
>>>> table_2 has fields b, c
>>>>
>>>> SELECT table_1.a, table_1.b, table_1.c
>>>> FROM table_1
>>>> LEFT OUTER JOIN table_2 ON table_1.b = table_2.b;
>>>>
>>>> Correct result:
>>>> Record(a = "data a 1", b = "data b 1", c = "data c 1")
>>>> Record(a = "data a 2", b = "data b 2", c = "data c 2")
>>>>
>>>> Results seem to be anywhere between all possible dups and the correct
>>>> result.
>>>>
>>>> Record(a = "data a 1", b = "data b 1", c = "data c 1")
>>>> Record(a = "data a 1", b = null, c = "data c 1")
>>>> Record(a = "data a 2", b = "data b 2", c = "data c 2")
>>>> Record(a = "data a 2", b = null, c = "data c 2")
>>>>
>>>> The CSV files are registered as Flink Tables with the following:
>>>>
>>>> tableEnv.connect(
>>>> new FileSystem()
>>>> .path(path)
>>>> )
>>>> .withFormat(
>>>> new Csv()
>>>> .quoteCharacter('"')
>>>> .ignoreParseErrors()
>>>> )
>>>> .withSchema(schema)
>>>> .inAppendMode()
>>>> .createTemporaryTable(tableName);
>>>>
>>>>
>>>> I'm creating my table environment like so:
>>>>
>>>> EnvironmentSettings tableEnvSettings = EnvironmentSettings.newInstance()
>>>> .useBlinkPlanner()
>>>> .build();
>>>>
>>>> StreamTableEnvironment tEnv = StreamTableEnvironment.create(env, 
>>>> tableEnvSettings);
>>>>
>>>> TableConfig tConfig = tEnv.getConfig();
>>>> tConfig.setIdleStateRetentionTime(Time.minutes(15), Time.hours(24));
>>>>
>>>>
>>>> Is there something I'm misconfiguring or have misunderstood the docs?
>>>>
>>>> Thanks,
>>>> Austin
>>>>
>>>> [1]:
>>>> https://ci.apache.org/projects/flink/flink-docs-release-1.10/dev/table/sql/queries.html#joins
>>>> [2]:
>>>> https://ci.apache.org/projects/flink/flink-docs-release-1.10/dev/table/streaming/query_configuration.html#idle-state-retention-time
>>>>
>>>
>
> --
>
> Arvid Heise | Senior Java Developer
>
> <https://www.ververica.com/>
>
> Follow us @VervericaData
>
> --
>
> Join Flink Forward <https://flink-forward.org/> - The Apache Flink
> Conference
>
> Stream Processing | Event Driven | Real Time
>
> --
>
> Ververica GmbH | Invalidenstrasse 115, 10115 Berlin, Germany
>
> --
> Ververica GmbH
> Registered at Amtsgericht Charlottenburg: HRB 158244 B
> Managing Directors: Timothy Alexander Steinert, Yip Park Tung Jason, Ji
> (Toni) Cheng
>


Re: Flink SQL Streaming Join Creates Duplicates

2020-08-27 Thread Austin Cawley-Edwards
Ah, I think the "Result Updating" is what got me -- INNER joins do the job!

On Thu, Aug 27, 2020 at 3:38 PM Austin Cawley-Edwards <
austin.caw...@gmail.com> wrote:

> oops, the example query should actually be:
>
> SELECT table_1.a, table_1.b, table_2.c
> FROM table_1
> LEFT OUTER JOIN table_2 ON table_1.b = table_2.b;
>
> and duplicate results should actually be:
>
> Record(a = "data a 1", b = "data b 1", c = "data c 1")
> Record(a = "data a 1", b = "data b 1", c = null)
> Record(a = "data a 2", b = "data b 2", c = "data c 2")
> Record(a = "data a 2", b = "data b 2", c = null)
>
> On Thu, Aug 27, 2020 at 3:34 PM Austin Cawley-Edwards <
> austin.caw...@gmail.com> wrote:
>
>> Hey all,
>>
>> I've got a Flink 1.10 Streaming SQL job using the Blink Planner that is
>> reading from a few CSV files and joins some records across them into a
>> couple of data streams (yes, this could be a batch job won't get into why
>> we chose streams unless it's relevant). These joins are producing some
>> duplicate records, one with the joined field present and one with the
>> joined field as `null`, though this happens only ~25% of the time. Reading
>> the docs on joins[1], I thought this could be caused by too strict Idle
>> State Retention[2], so I increased that to min, max (15min, 24h) but that
>> doesn't seem to have an effect, and the problem still occurs when testing
>> on a subset of data that finishes processing in under a minute.
>>
>> The query roughly looks like:
>>
>> table_1 has fields a, b
>> table_2 has fields b, c
>>
>> SELECT table_1.a, table_1.b, table_1.c
>> FROM table_1
>> LEFT OUTER JOIN table_2 ON table_1.b = table_2.b;
>>
>> Correct result:
>> Record(a = "data a 1", b = "data b 1", c = "data c 1")
>> Record(a = "data a 2", b = "data b 2", c = "data c 2")
>>
>> Results seem to be anywhere between all possible dups and the correct
>> result.
>>
>> Record(a = "data a 1", b = "data b 1", c = "data c 1")
>> Record(a = "data a 1", b = null, c = "data c 1")
>> Record(a = "data a 2", b = "data b 2", c = "data c 2")
>> Record(a = "data a 2", b = null, c = "data c 2")
>>
>> The CSV files are registered as Flink Tables with the following:
>>
>> tableEnv.connect(
>> new FileSystem()
>> .path(path)
>> )
>> .withFormat(
>> new Csv()
>> .quoteCharacter('"')
>> .ignoreParseErrors()
>> )
>> .withSchema(schema)
>> .inAppendMode()
>> .createTemporaryTable(tableName);
>>
>>
>> I'm creating my table environment like so:
>>
>> EnvironmentSettings tableEnvSettings = EnvironmentSettings.newInstance()
>> .useBlinkPlanner()
>> .build();
>>
>> StreamTableEnvironment tEnv = StreamTableEnvironment.create(env, 
>> tableEnvSettings);
>>
>> TableConfig tConfig = tEnv.getConfig();
>> tConfig.setIdleStateRetentionTime(Time.minutes(15), Time.hours(24));
>>
>>
>> Is there something I'm misconfiguring or have misunderstood the docs?
>>
>> Thanks,
>> Austin
>>
>> [1]:
>> https://ci.apache.org/projects/flink/flink-docs-release-1.10/dev/table/sql/queries.html#joins
>> [2]:
>> https://ci.apache.org/projects/flink/flink-docs-release-1.10/dev/table/streaming/query_configuration.html#idle-state-retention-time
>>
>


Flink SQL Streaming Join Creates Duplicates

2020-08-27 Thread Austin Cawley-Edwards
Hey all,

I've got a Flink 1.10 Streaming SQL job using the Blink Planner that is
reading from a few CSV files and joining some records across them into a
couple of data streams (yes, this could be a batch job; I won't get into why
we chose streams unless it's relevant). These joins are producing some
duplicate records, one with the joined field present and one with the
joined field as `null`, though this happens only ~25% of the time. Reading
the docs on joins[1], I thought this could be caused by too strict Idle
State Retention[2], so I increased that to min, max (15min, 24h) but that
doesn't seem to have an effect, and the problem still occurs when testing
on a subset of data that finishes processing in under a minute.

The query roughly looks like:

table_1 has fields a, b
table_2 has fields b, c

SELECT table_1.a, table_1.b, table_1.c
FROM table_1
LEFT OUTER JOIN table_2 ON table_1.b = table_2.b;

Correct result:
Record(a = "data a 1", b = "data b 1", c = "data c 1")
Record(a = "data a 2", b = "data b 2", c = "data c 2")

Results seem to be anywhere between all possible dups and the correct
result.

Record(a = "data a 1", b = "data b 1", c = "data c 1")
Record(a = "data a 1", b = null, c = "data c 1")
Record(a = "data a 2", b = "data b 2", c = "data c 2")
Record(a = "data a 2", b = null, c = "data c 2")

The CSV files are registered as Flink Tables with the following:

tableEnv.connect(
new FileSystem()
.path(path)
)
.withFormat(
new Csv()
.quoteCharacter('"')
.ignoreParseErrors()
)
.withSchema(schema)
.inAppendMode()
.createTemporaryTable(tableName);


I'm creating my table environment like so:

EnvironmentSettings tableEnvSettings = EnvironmentSettings.newInstance()
.useBlinkPlanner()
.build();

StreamTableEnvironment tEnv = StreamTableEnvironment.create(env,
tableEnvSettings);

TableConfig tConfig = tEnv.getConfig();
tConfig.setIdleStateRetentionTime(Time.minutes(15), Time.hours(24));


Is there something I'm misconfiguring or have misunderstood the docs?

Thanks,
Austin

[1]:
https://ci.apache.org/projects/flink/flink-docs-release-1.10/dev/table/sql/queries.html#joins
[2]:
https://ci.apache.org/projects/flink/flink-docs-release-1.10/dev/table/streaming/query_configuration.html#idle-state-retention-time


Re: K8s operator for flink

2020-08-14 Thread Austin Cawley-Edwards
Hey Narasimha,

We use an operator at FinTech Studios[1] (built by me) to deploy Flink via
the Ververica Platform[2]. We've been using it in production for the past 7
months with no "show-stopping" bugs, and know some others have been
experimenting with bringing it to production as well.

Best,
Austin


[1]: https://github.com/fintechstudios/ververica-platform-k8s-operator
[2]: https://www.ververica.com/platform

On Fri, Aug 14, 2020 at 12:42 PM narasimha  wrote:

> Hi all,
>
> Checking if anyone has deployed flink using k8s operator.
>
> If so, what has been the experience, and how well has it eased job updates?
>
> Also was there any comparison among other available operators like
>
> • lyft
> • Google cloud
>
>
> Thanks in advance, some insights into the above will save a lot of time.
>


Re: Two Queries and a Kafka Topic

2020-08-10 Thread Austin Cawley-Edwards
for each operator that needs the *data* 🤦‍♂️

On Mon, Aug 10, 2020 at 3:58 PM Austin Cawley-Edwards <
austin.caw...@gmail.com> wrote:

> Hey all,
>
> A bit late here and I’m not sure it’s totally valuable, but we have a
> similar job where we need to query an external data source on startup
> before processing the main stream as well.
>
> Instead of making that request in the Jobmanager process when building the
> graph, we make those requests from the operator “open()” methods, and then
> store it in broadcast state.
>
> Our queries aren’t that expensive to run, so we run multiple on startup
> for each operator that needs the fat to avoid a race condition. The
> blocking until the broadcast state is received downstream sounds like a
> reasonable way to do it to.
>
> Hope that helps a bit, or at least as another example!
>
> Best,
> Austin
>
> On Mon, Aug 10, 2020 at 3:03 PM Theo Diefenthal <
> theo.diefent...@scoop-software.de> wrote:
>
>> Hi Marco,
>>
>> Sadly, I myself haven't ever used StateProcessorAPI either.
>>
>> I thought, the documentation here [1] is rather straight forward to be
>> used, but I never tried it myself.
>>
>> Also, I don' get what you mean with "tieing" DataSet and DataStream? For
>> what I understand, the StateProcessorAPI is internally upon DataSets at the
>> moment. So if you have a Streaming job (DataStream), you would run your
>> custom state migration program before even starting the streaming job using
>> the StateProcessor API and initialize your state as a  DataSet. After all,
>> initializing a state doesn't require you to have an infinite job running,
>> but only a finite one (Thus batch/DataSet). Once that program finished
>> execution, you would submit your streaming job starting from the written
>> savepoint. I guess the API works with either HDFS or filesystem, but maybe
>> someone who has already used the API might shed some more light for us
>> here.
>>
>> Best regards
>> Theo
>>
>> [1]
>> https://ci.apache.org/projects/flink/flink-docs-stable/dev/libs/state_processor_api.html
>>
>> --
>> *Von: *"Marco Villalobos" 
>> *An: *"Theo Diefenthal" 
>> *CC: *"user" 
>> *Gesendet: *Donnerstag, 6. August 2020 23:47:13
>>
>> *Betreff: *Re: Two Queries and a Kafka Topic
>>
>> I am trying to use the State Processor API.  Does that require HDFS or a
>> filesystem?
>> I wish there was a complete example that ties in both DataSet and
>> DataStream API, and the State Processor API.
>>
>> So far I have not been able to get it to work.
>>
>> Does anybody know where I can find examples of these type of techniques?
>>
>>
>>
>> On Wed, Aug 5, 2020 at 3:52 AM Theo Diefenthal <
>> theo.diefent...@scoop-software.de> wrote:
>>
>>> Hi Marco,
>>>
>>> In general, I see three solutions here you could approach:
>>>
>>> 1. Use the StateProcessorAPI: You can run a program with the
>>> stateProcessorAPI that loads the data from JDBC and stores it into a Flink
>>> SavePoint. Afterwards, you start your streaming job from that savepoint
>>> which will load its state and within find all the data from JDBC stored
>>> already.
>>> 2. Load from master, distribute with the job: When you build up your
>>> jobgraph, you could execute the JDBC queries and put the result into some
>>> Serializable class which in turn you plug in a an operator in your stream
>>> (e.g. a map stage). The class along with all the queried data will be
>>> serialized and deserialized on the taskmanagers (Usually, I use this for
>>> configuration parameters, but it might be ok in this case as well)
>>> 3. Load from TaskManager: In your map-function, if the very first event
>>> is received, you can block processing and synchronously load the data from
>>> JDBC (So each Taskmanager performs the JDBC query itself). You then keep
>>> the data to be used for all subsequent map steps.
>>>
>>> I think, option 3 is the easiest to be implemented while option 1 might
>>> be the most elegant way in my opinion.
>>>
>>> Best regards
>>> Theo
>>>
>>> --
>>> *Von: *"Marco Villalobos" 
>>> *An: *"Leonard Xu" 
>>> *CC: *"user" 
>>> *Gesendet: *Mittwoch, 5. August 2020 04:33:23
>>> *Betreff: *Re: Two Queries and a Kafka Topic
>>>
>>> Hi Leonard,
>>>

Re: Two Queries and a Kafka Topic

2020-08-10 Thread Austin Cawley-Edwards
Hey all,

A bit late here and I’m not sure it’s totally valuable, but we have a
similar job where we need to query an external data source on startup
before processing the main stream as well.

Instead of making that request in the Jobmanager process when building the
graph, we make those requests from the operator “open()” methods, and then
store it in broadcast state.

Our queries aren’t that expensive to run, so we run multiple on startup for
each operator that needs the fat to avoid a race condition. The blocking
until the broadcast state is received downstream sounds like a reasonable
way to do it too.
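
In case it helps, the open()-time lookup is roughly the sketch below -- the
JDBC URL, query, and field names are made up for illustration (and it assumes
the Postgres driver is on the classpath), not our actual code:

import org.apache.flink.api.common.functions.RichMapFunction;
import org.apache.flink.configuration.Configuration;

import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.ResultSet;
import java.sql.Statement;
import java.util.HashMap;
import java.util.Map;

public class EnrichWithReferenceData extends RichMapFunction<String, String> {

  private transient Map<String, String> referenceData;

  @Override
  public void open(Configuration parameters) throws Exception {
    // Runs once per parallel instance, before any records are processed.
    referenceData = new HashMap<>();
    try (Connection conn = DriverManager.getConnection(
            "jdbc:postgresql://db-host:5432/app", "user", "password");
        Statement stmt = conn.createStatement();
        ResultSet rs = stmt.executeQuery("SELECT k, v FROM reference_table")) {
      while (rs.next()) {
        referenceData.put(rs.getString("k"), rs.getString("v"));
      }
    }
  }

  @Override
  public String map(String key) {
    // Enrich each incoming record with the preloaded reference data.
    return key + "," + referenceData.getOrDefault(key, "unknown");
  }
}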

Hope that helps a bit, or at least as another example!

Best,
Austin

On Mon, Aug 10, 2020 at 3:03 PM Theo Diefenthal <
theo.diefent...@scoop-software.de> wrote:

> Hi Marco,
>
> Sadly, I myself haven't ever used StateProcessorAPI either.
>
> I thought, the documentation here [1] is rather straight forward to be
> used, but I never tried it myself.
>
> Also, I don' get what you mean with "tieing" DataSet and DataStream? For
> what I understand, the StateProcessorAPI is internally upon DataSets at the
> moment. So if you have a Streaming job (DataStream), you would run your
> custom state migration program before even starting the streaming job using
> the StateProcessor API and initialize your state as a  DataSet. After all,
> initializing a state doesn't require you to have an infinite job running,
> but only a finite one (Thus batch/DataSet). Once that program finished
> execution, you would submit your streaming job starting from the written
> savepoint. I guess the API works with either HDFS or filesystem, but maybe
> someone who has already used the API might shed some more light for us
> here.
>
> Best regards
> Theo
>
> [1]
> https://ci.apache.org/projects/flink/flink-docs-stable/dev/libs/state_processor_api.html
>
> --
> *Von: *"Marco Villalobos" 
> *An: *"Theo Diefenthal" 
> *CC: *"user" 
> *Gesendet: *Donnerstag, 6. August 2020 23:47:13
>
> *Betreff: *Re: Two Queries and a Kafka Topic
>
> I am trying to use the State Processor API.  Does that require HDFS or a
> filesystem?
> I wish there was a complete example that ties in both DataSet and
> DataStream API, and the State Processor API.
>
> So far I have not been able to get it to work.
>
> Does anybody know where I can find examples of these type of techniques?
>
>
>
> On Wed, Aug 5, 2020 at 3:52 AM Theo Diefenthal <
> theo.diefent...@scoop-software.de> wrote:
>
>> Hi Marco,
>>
>> In general, I see three solutions here you could approach:
>>
>> 1. Use the StateProcessorAPI: You can run a program with the
>> stateProcessorAPI that loads the data from JDBC and stores it into a Flink
>> SavePoint. Afterwards, you start your streaming job from that savepoint
>> which will load its state and within find all the data from JDBC stored
>> already.
>> 2. Load from master, distribute with the job: When you build up your
>> jobgraph, you could execute the JDBC queries and put the result into some
>> Serializable class which in turn you plug in a an operator in your stream
>> (e.g. a map stage). The class along with all the queried data will be
>> serialized and deserialized on the taskmanagers (Usually, I use this for
>> configuration parameters, but it might be ok in this case as well)
>> 3. Load from TaskManager: In your map-function, if the very first event
>> is received, you can block processing and synchronously load the data from
>> JDBC (So each Taskmanager performs the JDBC query itself). You then keep
>> the data to be used for all subsequent map steps.
>>
>> I think, option 3 is the easiest to be implemented while option 1 might
>> be the most elegant way in my opinion.
>>
>> Best regards
>> Theo
>>
>> --
>> *Von: *"Marco Villalobos" 
>> *An: *"Leonard Xu" 
>> *CC: *"user" 
>> *Gesendet: *Mittwoch, 5. August 2020 04:33:23
>> *Betreff: *Re: Two Queries and a Kafka Topic
>>
>> Hi Leonard,
>>
>> First, Thank you.
>>
>> I am currently trying to restrict my solution to Apache Flink 1.10
>> because it's the current version supported by Amazon EMR.
>> I am not ready to change our operational environment to solve this.
>>
>> Second, I am using the DataStream API.  The Kafka Topic is not in a
>> table, it is in a DataStream.
>>
>> The SQL queries are literally from a PostgresSQL database, and only need
>> to be run exactly once in the lifetime of the job.
>>
>> I am struggling to determine where this happens.
>>
>> JDBCInputFormat seems to query the SQL table repetitively, and also
>> connecting streams and aggregating into one object is very complicated.
>>
>> Thus, I am wondering what is the right approach.
>>
>> Let me restate the parameters.
>>
>> SQL Query One = data in PostgreSQL (200K records) that is used for
>> business logic.
>> SQL Query Two = data in PostgreSQL (1000 records) that is used for
>> business logic.
>> Kafka Topic One = unlimited data-stream that uses the data-stream api 

Re: Decompressing Tar Files for Batch Processing

2020-07-07 Thread Austin Cawley-Edwards
On Tue, Jul 7, 2020 at 10:53 AM Austin Cawley-Edwards <
austin.caw...@gmail.com> wrote:

> Hey Xiaolong,
>
> Thanks for the suggestions. Just to make sure I understand, are you saying
> to run the download and decompression in the Job Manager before executing
> the job?
>
> I think another way to ensure the tar file is not downloaded more than
> once is a source w/ parallelism 1. The issue I can't get past is after
> decompressing the tarball, how would I pass those OutputStreams for each
> entry through Flink?
>
> Best,
> Austin
>
>
>
> On Tue, Jul 7, 2020 at 5:56 AM Xiaolong Wang 
> wrote:
>
>> It seems to me that it cannot be done by Flink, since that code will be
>> run across all task managers. That way, there will be multiple downloads of
>> your tar file, which is unnecessary.
>>
>> However, you can do it in your code before initializing a Flink runtime,
>> and the code will be run only on the client side.
>>
>> On Tue, Jul 7, 2020 at 7:31 AM Austin Cawley-Edwards <
>> austin.caw...@gmail.com> wrote:
>>
>>> Hey all,
>>>
>>> I need to ingest a tar file containing ~1GB of data in around 10 CSVs.
>>> The data is fairly connected and needs some cleaning, which I'd like to do
>>> with the Batch Table API + SQL (but have never used before). I've got a
>>> small prototype loading the uncompressed CSVs and applying the necessary
>>> SQL, which works well.
>>>
>>> I'm wondering about the task of downloading the tar file and unzipping
>>> it into the CSVs. Does this sound like something I can/ should do in Flink,
>>> or should I set up another process to download, unzip, and store in a
>>> filesystem to then read with the Flink Batch job? My research is leading me
>>> towards doing it separately but I'd like to do it all in the same job if
>>> there's a creative way.
>>>
>>> Thanks!
>>> Austin
>>>
>>


Re: Decompressing Tar Files for Batch Processing

2020-07-07 Thread Austin Cawley-Edwards
Hey Chesnay,

Thanks for the advice, and easy enough to do it in a separate process.
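
For anyone else who hits this, the separate step can be just a few lines with
Apache Commons Compress -- a rough sketch, with placeholder paths and assuming
a gzipped tar:

import org.apache.commons.compress.archivers.tar.TarArchiveEntry;
import org.apache.commons.compress.archivers.tar.TarArchiveInputStream;
import org.apache.commons.compress.compressors.gzip.GzipCompressorInputStream;

import java.io.BufferedInputStream;
import java.io.IOException;
import java.io.InputStream;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.Paths;

public class UntarCsvs {
  public static void main(String[] args) throws IOException {
    Path outDir = Paths.get("/tmp/csvs");
    Files.createDirectories(outDir);
    try (InputStream fileIn = Files.newInputStream(Paths.get("/tmp/data.tar.gz"));
        TarArchiveInputStream tarIn = new TarArchiveInputStream(
            new GzipCompressorInputStream(new BufferedInputStream(fileIn)))) {
      TarArchiveEntry entry;
      while ((entry = tarIn.getNextTarEntry()) != null) {
        if (!entry.isDirectory()) {
          // Each CSV entry is written out for the Flink batch job to read.
          Files.copy(tarIn, outDir.resolve(Paths.get(entry.getName()).getFileName()));
        }
      }
    }
  }
}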

Best,
Austin

On Tue, Jul 7, 2020 at 10:29 AM Chesnay Schepler  wrote:

> I would probably go with a separate process.
>
> Downloading the file could work with Flink if it is already present in
> some supported filesystem. Decompressing the file is supported for
> selected formats (deflate, gzip, bz2, xz), but this seems to be an
> undocumented feature, so I'm not sure how usable it is in reality.
>
> On 07/07/2020 01:30, Austin Cawley-Edwards wrote:
> > Hey all,
> >
> > I need to ingest a tar file containing ~1GB of data in around 10 CSVs.
> > The data is fairly connected and needs some cleaning, which I'd like
> > to do with the Batch Table API + SQL (but have never used before).
> > I've got a small prototype loading the uncompressed CSVs and applying
> > the necessary SQL, which works well.
> >
> > I'm wondering about the task of downloading the tar file and unzipping
> > it into the CSVs. Does this sound like something I can/ should do in
> > Flink, or should I set up another process to download, unzip, and
> > store in a filesystem to then read with the Flink Batch job? My
> > research is leading me towards doing it separately but I'd like to do
> > it all in the same job if there's a creative way.
> >
> > Thanks!
> > Austin
>
>
>
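
A minimal sketch of the client-side approach suggested above: extract the tarball with Apache Commons Compress before the Flink job is built and submitted, then point the batch job at the extracted CSVs. Paths and the archive layout are placeholders, and entries are assumed to be trusted (no path-traversal check beyond normalize()).

import org.apache.commons.compress.archivers.tar.TarArchiveEntry;
import org.apache.commons.compress.archivers.tar.TarArchiveInputStream;
import org.apache.commons.compress.compressors.gzip.GzipCompressorInputStream;

import java.io.BufferedInputStream;
import java.io.FileInputStream;
import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.Paths;
import java.nio.file.StandardCopyOption;

class TarExtractor {

  /** Extracts every regular file in a .tgz into targetDir. */
  static void extractTgz(Path tgzFile, Path targetDir) throws IOException {
    try (TarArchiveInputStream tar = new TarArchiveInputStream(
        new GzipCompressorInputStream(
            new BufferedInputStream(new FileInputStream(tgzFile.toFile()))))) {
      TarArchiveEntry entry;
      while ((entry = tar.getNextTarEntry()) != null) {
        if (!entry.isFile()) {
          continue;
        }
        Path out = targetDir.resolve(entry.getName()).normalize();
        Files.createDirectories(out.getParent());
        // The stream only exposes the current entry's bytes, so copy it before moving on.
        Files.copy(tar, out, StandardCopyOption.REPLACE_EXISTING);
      }
    }
  }

  public static void main(String[] args) throws Exception {
    // Runs on the client, before the Flink job is built, so the download/unpack happens once.
    extractTgz(Paths.get("/tmp/data.tgz"), Paths.get("/tmp/extracted"));
    // ... then build the batch job against file:///tmp/extracted/*.csv
  }
}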


Decompressing Tar Files for Batch Processing

2020-07-06 Thread Austin Cawley-Edwards
Hey all,

I need to ingest a tar file containing ~1GB of data in around 10 CSVs. The
data is fairly connected and needs some cleaning, which I'd like to do with
the Batch Table API + SQL (but have never used before). I've got a small
prototype loading the uncompressed CSVs and applying the necessary SQL,
which works well.

I'm wondering about the task of downloading the tar file and unzipping it
into the CSVs. Does this sound like something I can/ should do in Flink, or
should I set up another process to download, unzip, and store in a
filesystem to then read with the Flink Batch job? My research is leading me
towards doing it separately but I'd like to do it all in the same job if
there's a creative way.

Thanks!
Austin


Re: Does anyone have an example of Bazel working with Flink?

2020-06-18 Thread Austin Cawley-Edwards
Ok, no worries Aaron, that's still good advice :)

One last question - are you using JAR-based or image-based deployments? In
our experience, the only real problem with Flink & Bazel and a JAR-based
deployment is removing the Flink libs that are already present in the deploy
environment from the deploy jar, while still having them available when we
want to do local debugging/integration testing. It's also possible that we're
just not using Bazel entirely correctly!

Thank you!
Austin


On Thu, Jun 18, 2020 at 12:32 PM Aaron Levin  wrote:

> Hi Austin,
>
> In our experience, `rules_scala` and `rules_java` are enough for us at
> this point.
>
> It's entirely possible I'm not thinking far enough into the future,
> though, so don't take our lack of investment as a sign it's not worth
> investing in :)
>
> Best,
>
> Aaron Levin
>
> On Thu, Jun 18, 2020 at 10:27 AM Austin Cawley-Edwards <
> austin.caw...@gmail.com> wrote:
>
>> Great to hear Dan!
>>
>> @Aaron - would you/ your team be interested in a `rules_flink` project?
>> I'm still fairly new to Bazel and know enough to contribute, but could
>> definitely use guidance on design as well.
>>
>> Best,
>> Austin
>>
>> On Mon, Jun 15, 2020 at 11:07 PM Dan Hill  wrote:
>>
>>> Thanks for the replies!  I was able to use the provided answers to get a
>>> setup working (maybe not the most efficiently).  The main change I made was
>>> to switch to including the deploy jar in the image (rather than the default
>>> one).
>>>
>>> I'm open to contributing to a "rules_flink" project.  I don't know
>>> enough yet to help design it.
>>>
>>> On Sat, Jun 13, 2020 at 4:39 AM Till Rohrmann 
>>> wrote:
>>>
>>>> Hi Dan,
>>>>
>>>> if you want to run a Flink job without specifying the main class via
>>>> `bin/flink run --class org.a.b.Foobar` then you have to add a MANIFEST.MF
>>>> file to your jar under META-INF and this file needs to contain `Main-Class:
>>>> org.a.b.Foobar`.
>>>>
>>>> Cheers,
>>>> Till
>>>>
>>>> On Fri, Jun 12, 2020 at 12:30 AM Austin Cawley-Edwards <
>>>> austin.caw...@gmail.com> wrote:
>>>>
>>>>> Hey all,
>>>>>
>>>>> Adding to Aaron's response, we use Bazel to build our Flink apps.
>>>>> We've open-sourced some of our setup here[1] though a bit outdated. There
>>>>> are definitely rough edges/ probably needs a good deal of work to fit 
>>>>> other
>>>>> setups. We have written a wrapper around the `java_library` and
>>>>> `java_binary` and could do the same for `rules_scala`, though we just
>>>>> started using Bazel last November and have a lot to learn in terms of best
>>>>> practices there.
>>>>>
>>>>> If you're interested in contributing to a `rules_flink` project, I
>>>>> would be as well!
>>>>>
>>>>> Best,
>>>>> Austin
>>>>>
>>>>> [1]:
>>>>> https://github.com/fintechstudios/vp-operator-demo-ff-virtual-2020
>>>>>
>>>>> On Thu, Jun 11, 2020 at 6:14 PM Aaron Levin 
>>>>> wrote:
>>>>>
>>>>>> Hi Dan,
>>>>>>
>>>>>> We use Bazel to compile our Flink applications. We're using
>>>>>> "rules_scala" (https://github.com/bazelbuild/rules_scala) to manage
>>>>>> the dependencies and produce jars. We haven't had any issues. However, I
>>>>>> have found that sometimes it's difficult to figure out exactly what Flink
>>>>>> target or dependency my application needs.
>>>>>>
>>>>>> Unfortunately I'm not sure what issue you're seeing here. I would
>>>>>> guess that your flink application wasn't compiled into the jar
>>>>>> you're executing. If you can paste the bazel target used to generate your
>>>>>> jar and how you're launching the application, that will be helpful
>>>>>> for diagnosis.
>>>>>>
>>>>>> On Thu, Jun 11, 2020 at 5:21 PM Dan Hill 
>>>>>> wrote:
>>>>>>
>>>>>>> I took the Flink playground and I'm trying to swap out Maven for
>>>>>>> Bazel.  I got to the point where I'm hitting the following error.  I 
>>>>>>> want
>>>>>>> to diff my code with an existing, working setup.

Re: Does anyone have an example of Bazel working with Flink?

2020-06-18 Thread Austin Cawley-Edwards
Great to hear Dan!

@Aaron - would you/ your team be interested in a `rules_flink` project? I'm
still fairly new to Bazel and know enough to contribute, but could
definitely use guidance on design as well.

Best,
Austin

On Mon, Jun 15, 2020 at 11:07 PM Dan Hill  wrote:

> Thanks for the replies!  I was able to use the provided answers to get a
> setup working (maybe not the most efficiently).  The main change I made was
> to switch to including the deploy jar in the image (rather than the default
> one).
>
> I'm open to contributing to a "rules_flink" project.  I don't know enough
> yet to help design it.
>
> On Sat, Jun 13, 2020 at 4:39 AM Till Rohrmann 
> wrote:
>
>> Hi Dan,
>>
>> if you want to run a Flink job without specifying the main class via
>> `bin/flink run --class org.a.b.Foobar` then you have to add a MANIFEST.MF
>> file to your jar under META-INF and this file needs to contain `Main-Class:
>> org.a.b.Foobar`.
>>
>> Cheers,
>> Till
>>
>> On Fri, Jun 12, 2020 at 12:30 AM Austin Cawley-Edwards <
>> austin.caw...@gmail.com> wrote:
>>
>>> Hey all,
>>>
>>> Adding to Aaron's response, we use Bazel to build our Flink apps. We've
>>> open-sourced some of our setup here[1] though a bit outdated. There are
>>> definitely rough edges/ probably needs a good deal of work to fit other
>>> setups. We have written a wrapper around the `java_library` and
>>> `java_binary` and could do the same for `rules_scala`, though we just
>>> started using Bazel last November and have a lot to learn in terms of best
>>> practices there.
>>>
>>> If you're interested in contributing to a `rules_flink` project, I would
>>> be as well!
>>>
>>> Best,
>>> Austin
>>>
>>> [1]: https://github.com/fintechstudios/vp-operator-demo-ff-virtual-2020
>>>
>>> On Thu, Jun 11, 2020 at 6:14 PM Aaron Levin 
>>> wrote:
>>>
>>>> Hi Dan,
>>>>
>>>> We use Bazel to compile our Flink applications. We're using
>>>> "rules_scala" (https://github.com/bazelbuild/rules_scala) to manage
>>>> the dependencies and produce jars. We haven't had any issues. However, I
>>>> have found that sometimes it's difficult to figure out exactly what Flink
>>>> target or dependency my application needs.
>>>>
>>>> Unfortunately I'm not sure what issue you're seeing here. I would guess
>>>> that your flink application wasn't compiled into the jar
>>>> you're executing. If you can paste the bazel target used to generate your
>>>> jar and how you're launching the application, that will be helpful
>>>> for diagnosis.
>>>>
>>>> On Thu, Jun 11, 2020 at 5:21 PM Dan Hill  wrote:
>>>>
>>>>> I took the Flink playground and I'm trying to swap out Maven for
>>>>> Bazel.  I got to the point where I'm hitting the following error.  I want
>>>>> to diff my code with an existing, working setup.
>>>>>
>>>>> Thanks! - Dan
>>>>>
>>>>>
>>>>> client_1| 
>>>>> org.apache.flink.client.program.ProgramInvocationException:
>>>>> Neither a 'Main-Class', nor a 'program-class' entry was found in the jar
>>>>> file.
>>>>>
>>>>> client_1| at
>>>>> org.apache.flink.client.program.PackagedProgram.getEntryPointClassNameFromJar(PackagedProgram.java:596)
>>>>>
>>>>> client_1| at
>>>>> org.apache.flink.client.program.PackagedProgram.<init>(PackagedProgram.java:190)
>>>>>
>>>>> client_1| at
>>>>> org.apache.flink.client.program.PackagedProgram.<init>(PackagedProgram.java:128)
>>>>>
>>>>> client_1| at
>>>>> org.apache.flink.client.cli.CliFrontend.buildProgram(CliFrontend.java:862)
>>>>>
>>>>> client_1| at
>>>>> org.apache.flink.client.cli.CliFrontend.run(CliFrontend.java:204)
>>>>>
>>>>> client_1| at
>>>>> org.apache.flink.client.cli.CliFrontend.parseParameters(CliFrontend.java:1050)
>>>>>
>>>>> client_1| at
>>>>> org.apache.flink.client.cli.CliFrontend.lambda$main$11(CliFrontend.java:1126)
>>>>>
>>>>> client_1| at
>>>>> org.apache.flink.runtime.security.NoOpSecurityContext.runSecured(NoOpSecurityContext.java:30)
>>>>>
>>>>> client_1| at
>>>>> org.apache.flink.client.cli.CliFrontend.main(CliFrontend.java:1126)
>>>>>
>>>>


Re: Does anyone have an example of Bazel working with Flink?

2020-06-11 Thread Austin Cawley-Edwards
Hey all,

Adding to Aaron's response, we use Bazel to build our Flink apps. We've
open-sourced some of our setup here[1] though a bit outdated. There are
definitely rough edges/ probably needs a good deal of work to fit other
setups. We have written a wrapper around the `java_library` and
`java_binary` and could do the same for `rules_scala`, though we just
started using Bazel last November and have a lot to learn in terms of best
practices there.

If you're interested in contributing to a `rules_flink` project, I would be
as well!

Best,
Austin

[1]: https://github.com/fintechstudios/vp-operator-demo-ff-virtual-2020

On Thu, Jun 11, 2020 at 6:14 PM Aaron Levin  wrote:

> Hi Dan,
>
> We use Bazel to compile our Flink applications. We're using "rules_scala" (
> https://github.com/bazelbuild/rules_scala) to manage the dependencies and
> produce jars. We haven't had any issues. However, I have found that
> sometimes it's difficult to figure out exactly what Flink target or
> dependency my application needs.
>
> Unfortunately I'm not sure what issue you're seeing here. I would guess
> that your flink application wasn't compiled into the jar
> you're executing. If you can paste the bazel target used to generate your
> jar and how you're launching the application, that will be helpful
> for diagnosis.
>
> On Thu, Jun 11, 2020 at 5:21 PM Dan Hill  wrote:
>
>> I took the Flink playground and I'm trying to swap out Maven for Bazel.
>> I got to the point where I'm hitting the following error.  I want to diff
>> my code with an existing, working setup.
>>
>> Thanks! - Dan
>>
>>
>> client_1| 
>> org.apache.flink.client.program.ProgramInvocationException:
>> Neither a 'Main-Class', nor a 'program-class' entry was found in the jar
>> file.
>>
>> client_1| at
>> org.apache.flink.client.program.PackagedProgram.getEntryPointClassNameFromJar(PackagedProgram.java:596)
>>
>> client_1| at
>> org.apache.flink.client.program.PackagedProgram.<init>(PackagedProgram.java:190)
>>
>> client_1| at
>> org.apache.flink.client.program.PackagedProgram.<init>(PackagedProgram.java:128)
>>
>> client_1| at
>> org.apache.flink.client.cli.CliFrontend.buildProgram(CliFrontend.java:862)
>>
>> client_1| at
>> org.apache.flink.client.cli.CliFrontend.run(CliFrontend.java:204)
>>
>> client_1| at
>> org.apache.flink.client.cli.CliFrontend.parseParameters(CliFrontend.java:1050)
>>
>> client_1| at
>> org.apache.flink.client.cli.CliFrontend.lambda$main$11(CliFrontend.java:1126)
>>
>> client_1| at
>> org.apache.flink.runtime.security.NoOpSecurityContext.runSecured(NoOpSecurityContext.java:30)
>>
>> client_1| at
>> org.apache.flink.client.cli.CliFrontend.main(CliFrontend.java:1126)
>>
>


Re: Single stream, two sinks

2020-03-05 Thread Austin Cawley-Edwards
We have the same setup and it works quite well. One thing to take into
account is that your HTTP call may happen multiple times if you’re using
the checkpointing/fault-tolerance mechanism, so it’s important that those
calls are idempotent and won’t duplicate data.

Also we’ve found that it’s important to make the max number of parallel
requests in your async operator runtime-configurable so you can control
that bottleneck.

Hope that is helpful!

Austin

On Thu, Mar 5, 2020 at 6:18 PM Gadi Katsovich 
wrote:

> Guys, thanks for the great advice. It works!
> I used HttpAsyncClient from Apache HttpComponents.
> At first I tried to implement the async http client by implementing
> AsyncFunction. I implemented the asyncInvoke method and used
> try-with-resources to instantiate the client (because it's a
> CloseableHttpAsyncClient). That didn't work and I got an "Async function
> call has timed out" exception.
> Then I followed the example in the link and had my async http client
> extend RichAsyncFunction, which opens and closes the http client instance in
> the corresponding methods, and everything started working.
>
> On Tue, Mar 3, 2020 at 7:13 PM John Smith  wrote:
>
>> If I understand correctly he wants the HTTP result in the DB. So I do not
>> think side output works here. The DB would have to be the sink. Also sinks
>> in Flink are the final destination.
>>
>> So it would have to be RabbitMQ -> Some Cool Business Logic Operators
>> Here > Async I/O HTTP Operator -> JDBC Sink.
>>
>> Take look here also:
>> https://ci.apache.org/projects/flink/flink-docs-stable/dev/stream/operators/asyncio.html
>>  <--
>> The Example shows database client, but you can easily replace that with
>> HTTP client.
>>
>> But basically...
>> 1- Get input from RabbitMQ Source.
>> 2- Do whatever type of stream computations/business logic you need.
>> 3- Use the Async I/O operator to send HTTP
>> - If HTTP 200 OK, create a Flink record tagged as SUCCESS and whatever
>> other info you want. Maybe the response body.
>> - If NOT HTTP 200 OK, create a Flink record tagged as FAILED plus other
>> info.
>> 4- Sink the output record from #3 to JDBC.
>>
>> On Sun, 1 Mar 2020 at 10:28, miki haiat  wrote:
>>
>>> So you have a RabbitMQ source and an HTTP sink?
>>> If so you can use side output in order to dump your data to db.
>>>
>>>
>>> https://ci.apache.org/projects/flink/flink-docs-stable/dev/stream/side_output.html
>>>
>>> On Sat, Feb 29, 2020, 23:01 Gadi Katsovich 
>>> wrote:
>>>
 Hi,
 I'm new to flink and am evaluating it to replace our existing streaming
 application.
 The use case I'm working on is reading messages from a RabbitMQ queue,
 applying some transformation and filtering logic, and sending the result via
 HTTP to a 3rd party.
 A must-have requirement of this flow is to write the data that was
 sent to an SQL db, for audit and troubleshooting purposes.
 I'm currently basing my HTTP solution on a PR with needed adjustments:
 https://github.com/apache/flink/pull/5866/files
 How can I add an insertion to a DB after a successful HTTP request?
 Thank you.

>>>
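
A minimal sketch of the Async I/O shape described in this thread, assuming the Java 11 HttpClient rather than the Apache async client: each element is tagged SUCCESS or FAILED based on the HTTP response and then handed to a JDBC sink for auditing. The endpoint, record types, and numbers are placeholders; the capacity argument at the end is the knob mentioned above for capping parallel in-flight requests.

import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.functions.async.ResultFuture;
import org.apache.flink.streaming.api.functions.async.RichAsyncFunction;

import java.net.URI;
import java.net.http.HttpClient;
import java.net.http.HttpRequest;
import java.net.http.HttpResponse;
import java.util.Collections;

public class HttpTagFunction extends RichAsyncFunction<String, Tuple2<String, String>> {

  private transient HttpClient client;

  @Override
  public void open(Configuration parameters) {
    client = HttpClient.newHttpClient();
  }

  @Override
  public void asyncInvoke(String payload, ResultFuture<Tuple2<String, String>> resultFuture) {
    HttpRequest request = HttpRequest.newBuilder(URI.create("https://third-party.example/api"))
        .POST(HttpRequest.BodyPublishers.ofString(payload))
        .build();
    client.sendAsync(request, HttpResponse.BodyHandlers.ofString())
        .whenComplete((response, error) -> {
          // Tag rather than fail the record so the audit row always reaches the JDBC sink.
          String tag = (error == null && response.statusCode() == 200) ? "SUCCESS" : "FAILED";
          resultFuture.complete(Collections.singleton(Tuple2.of(payload, tag)));
        });
  }
}

// Wiring, with capacity (the last argument) made configurable in practice:
// DataStream<Tuple2<String, String>> tagged =
//     AsyncDataStream.unorderedWait(input, new HttpTagFunction(), 30, TimeUnit.SECONDS, 100);
// tagged.addSink(/* JDBC sink for the audit table */);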


Re: StreamingFileSink Not Flushing All Data

2020-03-04 Thread Austin Cawley-Edwards
Hey Kostas,

We’re a little bit off from a 1.10 update but I can certainly see if that
CompressWriterFactory might solve my use case for when we do.

If there is anything I can do to help document that feature, please let me
know.

Thanks!

Austin

On Wed, Mar 4, 2020 at 4:58 AM Kostas Kloudas  wrote:

> Hi Austin,
>
> I will have a look at your repo. In the meantime, given that [1] is
> already merged in 1.10,
> would upgrading to 1.10 and using the newly introduced
> CompressWriterFactory be an option for you?
>
> It is unfortunate that this feature was not documented.
>
> Cheers,
> Kostas
>
> [1] https://issues.apache.org/jira/browse/FLINK-13634
>
>
> On Tue, Mar 3, 2020 at 11:13 PM Austin Cawley-Edwards <
> austin.caw...@gmail.com> wrote:
>
>> Hi all,
>>
>> Thanks for the docs pointer/ FLIP Rafi, and the workaround strategy
>> Kostas -- strange though, as I wasn't using a bounded source when I first
>> ran into this issue. I have updated the example repo to use an unbounded
>> source[1], and the same file corruption problems remain.
>>
>> Anything else I could be doing wrong with the compression stream?
>>
>> Thanks again,
>> Austin
>>
>> [1]:
>> https://github.com/austince/flink-streaming-file-sink-compression/tree/unbounded
>>
>> On Tue, Mar 3, 2020 at 3:50 AM Kostas Kloudas 
>> wrote:
>>
>>> Hi Austin and Rafi,
>>>
>>> @Rafi Thanks for providing the pointers!
>>> Unfortunately there is no progress on the FLIP (or the issue).
>>>
>>> @Austin In the meantime, what you could do -- assuming that your input
>>> is bounded -- is simply not stop the job after the whole input is
>>> processed, then wait until the output is committed, and then cancel the
>>> job. I know and I agree that this is not an elegant solution but it is a
>>> temporary workaround.
>>>
>>> Hopefully the FLIP and related issue is going to be prioritised soon.
>>>
>>> Cheers,
>>> Kostas
>>>
>>> On Tue, Mar 3, 2020 at 8:04 AM Rafi Aroch  wrote:
>>>
>>>> Hi,
>>>>
>>>> This happens because StreamingFileSink does not support a finite input
>>>> stream.
>>>> In the docs it's mentioned under "Important Considerations":
>>>>
>>>> [image: image.png]
>>>>
>>>> This behaviour often surprises users...
>>>>
>>>> There's a FLIP
>>>> <https://cwiki.apache.org/confluence/display/FLINK/FLIP-46%3A+Graceful+Shutdown+Handling+by+UDFs>
>>>>  and
>>>> an issue <https://issues.apache.org/jira/browse/FLINK-13103> about
>>>> fixing this. I'm not sure what's the status though, maybe Kostas can share.
>>>>
>>>> Thanks,
>>>> Rafi
>>>>
>>>>
>>>> On Mon, Mar 2, 2020 at 5:05 PM Austin Cawley-Edwards <
>>>> austin.caw...@gmail.com> wrote:
>>>>
>>>>> Hi Dawid and Kostas,
>>>>>
>>>>> Sorry for the late reply + thank you for the troubleshooting. I put
>>>>> together an example repo that reproduces the issue[1], because I did have
>>>>> checkpointing enabled in my previous case -- still must be doing something
>>>>> wrong with that config though.
>>>>>
>>>>> Thanks!
>>>>> Austin
>>>>>
>>>>> [1]: https://github.com/austince/flink-streaming-file-sink-compression
>>>>>
>>>>>
>>>>> On Mon, Feb 24, 2020 at 5:28 AM Kostas Kloudas 
>>>>> wrote:
>>>>>
>>>>>> Hi Austin,
>>>>>>
>>>>>> Dawid is correct in that you need to enable checkpointing for the
>>>>>> StreamingFileSink to work.
>>>>>>
>>>>>> I hope this solves the problem,
>>>>>> Kostas
>>>>>>
>>>>>> On Mon, Feb 24, 2020 at 11:08 AM Dawid Wysakowicz
>>>>>>  wrote:
>>>>>> >
>>>>>> > Hi Austin,
>>>>>> >
>>>>>> > If I am not mistaken the StreamingFileSink by default flushes on
>>>>>> checkpoints. If you don't have checkpoints enabled it might happen that 
>>>>>> not
>>>>>> all data is flushed.
>>>>>> >
>>>>>> > I think you can also adjust that behavior with:

Re: StreamingFileSink Not Flushing All Data

2020-03-03 Thread Austin Cawley-Edwards
Hi all,

Thanks for the docs pointer/ FLIP Rafi, and the workaround strategy Kostas
-- strange though, as I wasn't using a bounded source when I first ran into
this issue. I have updated the example repo to use an unbounded source[1],
and the same file corruption problems remain.

Anything else I could be doing wrong with the compression stream?

Thanks again,
Austin

[1]:
https://github.com/austince/flink-streaming-file-sink-compression/tree/unbounded

On Tue, Mar 3, 2020 at 3:50 AM Kostas Kloudas  wrote:

> Hi Austin and Rafi,
>
> @Rafi Thanks for providing the pointers!
> Unfortunately there is no progress on the FLIP (or the issue).
>
> @Austin In the meantime, what you could do -- assuming that your input is
> bounded -- is simply not stop the job after the whole input is
> processed, then wait until the output is committed, and then cancel the
> job. I know and I agree that this is not an elegant solution but it is a
> temporary workaround.
>
> Hopefully the FLIP and related issue is going to be prioritised soon.
>
> Cheers,
> Kostas
>
> On Tue, Mar 3, 2020 at 8:04 AM Rafi Aroch  wrote:
>
>> Hi,
>>
>> This happens because StreamingFileSink does not support a finite input
>> stream.
>> In the docs it's mentioned under "Important Considerations":
>>
>> [image: image.png]
>>
>> This behaviour often surprises users...
>>
>> There's a FLIP
>> <https://cwiki.apache.org/confluence/display/FLINK/FLIP-46%3A+Graceful+Shutdown+Handling+by+UDFs>
>>  and
>> an issue <https://issues.apache.org/jira/browse/FLINK-13103> about
>> fixing this. I'm not sure what's the status though, maybe Kostas can share.
>>
>> Thanks,
>> Rafi
>>
>>
>> On Mon, Mar 2, 2020 at 5:05 PM Austin Cawley-Edwards <
>> austin.caw...@gmail.com> wrote:
>>
>>> Hi Dawid and Kostas,
>>>
>>> Sorry for the late reply + thank you for the troubleshooting. I put
>>> together an example repo that reproduces the issue[1], because I did have
>>> checkpointing enabled in my previous case -- still must be doing something
>>> wrong with that config though.
>>>
>>> Thanks!
>>> Austin
>>>
>>> [1]: https://github.com/austince/flink-streaming-file-sink-compression
>>>
>>>
>>> On Mon, Feb 24, 2020 at 5:28 AM Kostas Kloudas 
>>> wrote:
>>>
>>>> Hi Austin,
>>>>
>>>> Dawid is correct in that you need to enable checkpointing for the
>>>> StreamingFileSink to work.
>>>>
>>>> I hope this solves the problem,
>>>> Kostas
>>>>
>>>> On Mon, Feb 24, 2020 at 11:08 AM Dawid Wysakowicz
>>>>  wrote:
>>>> >
>>>> > Hi Austin,
>>>> >
>>>> > If I am not mistaken the StreamingFileSink by default flushes on
>>>> checkpoints. If you don't have checkpoints enabled it might happen that not
>>>> all data is flushed.
>>>> >
>>>> > I think you can also adjust that behavior with:
>>>> >
>>>> > forBulkFormat(...)
>>>> >
>>>> > .withRollingPolicy(/* your custom logic */)
>>>> >
>>>> > I also cc Kostas who should be able to correct me if I am wrong.
>>>> >
>>>> > Best,
>>>> >
>>>> > Dawid
>>>> >
>>>> > On 22/02/2020 01:59, Austin Cawley-Edwards wrote:
>>>> >
>>>> > Hi there,
>>>> >
>>>> > Using Flink 1.9.1, trying to write .tgz files with the
>>>> StreamingFileSink#BulkWriter. It seems like flushing the output stream
>>>> doesn't flush all the data written. I've verified I can create valid files
>>>> using the same APIs and data on their own, so thinking it must be something
>>>> I'm doing wrong with the bulk format. I'm writing to the local filesystem,
>>>> with the `file://` protocol.
>>>> >
>>>> > For Tar/ Gzipping, I'm using the Apache Commons Compression library,
>>>> version 1.20.
>>>> >
>>>> > Here's a runnable example of the issue:
>>>> >
>>>> > import org.apache.commons.compress.archivers.tar.TarArchiveEntry;
>>>> > import
>>>> org.apache.commons.compress.archivers.tar.TarArchiveOutputStream;
>>>> > import
>>>> org.apache.commons.compress.compressors.gzip.GzipCompressorOutputStream;
>>>> > import org.apache.fl

Re: StreamingFileSink Not Flushing All Data

2020-03-02 Thread Austin Cawley-Edwards
Hi Dawid and Kostas,

Sorry for the late reply + thank you for the troubleshooting. I put
together an example repo that reproduces the issue[1], because I did have
checkpointing enabled in my previous case -- still must be doing something
wrong with that config though.

Thanks!
Austin

[1]: https://github.com/austince/flink-streaming-file-sink-compression


On Mon, Feb 24, 2020 at 5:28 AM Kostas Kloudas  wrote:

> Hi Austin,
>
> Dawid is correct in that you need to enable checkpointing for the
> StreamingFileSink to work.
>
> I hope this solves the problem,
> Kostas
>
> On Mon, Feb 24, 2020 at 11:08 AM Dawid Wysakowicz
>  wrote:
> >
> > Hi Austin,
> >
> > If I am not mistaken the StreamingFileSink by default flushes on
> checkpoints. If you don't have checkpoints enabled it might happen that not
> all data is flushed.
> >
> > I think you can also adjust that behavior with:
> >
> > forBulkFormat(...)
> >
> > .withRollingPolicy(/* your custom logic */)
> >
> > I also cc Kostas who should be able to correct me if I am wrong.
> >
> > Best,
> >
> > Dawid
> >
> > On 22/02/2020 01:59, Austin Cawley-Edwards wrote:
> >
> > Hi there,
> >
> > Using Flink 1.9.1, trying to write .tgz files with the
> StreamingFileSink#BulkWriter. It seems like flushing the output stream
> doesn't flush all the data written. I've verified I can create valid files
> using the same APIs and data on their own, so thinking it must be something
> I'm doing wrong with the bulk format. I'm writing to the local filesystem,
> with the `file://` protocol.
> >
> > For Tar/ Gzipping, I'm using the Apache Commons Compression library,
> version 1.20.
> >
> > Here's a runnable example of the issue:
> >
> > import org.apache.commons.compress.archivers.tar.TarArchiveEntry;
> > import org.apache.commons.compress.archivers.tar.TarArchiveOutputStream;
> > import
> org.apache.commons.compress.compressors.gzip.GzipCompressorOutputStream;
> > import org.apache.flink.api.common.serialization.BulkWriter;
> > import org.apache.flink.core.fs.FSDataOutputStream;
> > import org.apache.flink.core.fs.Path;
> > import
> org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
> > import
> org.apache.flink.streaming.api.functions.sink.filesystem.StreamingFileSink;
> >
> > import java.io.FileOutputStream;
> > import java.io.IOException;
> > import java.io.Serializable;
> > import java.nio.charset.StandardCharsets;
> >
> > class Scratch {
> >   public static class Record implements Serializable {
> > private static final long serialVersionUID = 1L;
> >
> > String id;
> >
> > public Record() {}
> >
> > public Record(String id) {
> >   this.id = id;
> > }
> >
> > public String getId() {
> >   return id;
> > }
> >
> > public void setId(String id) {
> >   this.id = id;
> > }
> >   }
> >
> >   public static void main(String[] args) throws Exception {
> > final StreamExecutionEnvironment env =
> StreamExecutionEnvironment.getExecutionEnvironment();
> >
> > TarArchiveOutputStream taos = new TarArchiveOutputStream(new
> GzipCompressorOutputStream(new
> FileOutputStream("/home/austin/Downloads/test.tgz")));
> > TarArchiveEntry fileEntry = new
> TarArchiveEntry(String.format("%s.txt", "test"));
> > String fullText = "hey\nyou\nwork";
> > byte[] fullTextData = fullText.getBytes();
> > fileEntry.setSize(fullTextData.length);
> > taos.putArchiveEntry(fileEntry);
> > taos.write(fullTextData, 0, fullTextData.length);
> > taos.closeArchiveEntry();
> > taos.flush();
> > taos.close();
> >
> > StreamingFileSink<Record> textSink = StreamingFileSink
> > .forBulkFormat(new
> Path("file:///home/austin/Downloads/text-output"),
> > new BulkWriter.Factory<Record>() {
> >   @Override
> >   public BulkWriter<Record> create(FSDataOutputStream out)
> throws IOException {
> > final TarArchiveOutputStream compressedOutputStream =
> new TarArchiveOutputStream(new GzipCompressorOutputStream(out));
> >
> > return new BulkWriter<Record>() {
> >   @Override
> >   public void addElement(Record record) throws
> IOException {
> > TarArchiveEntry fileEntry = new
> TarArchiveEntry(String.format("%s.txt", record.id));
> >   

StreamingFileSink Not Flushing All Data

2020-02-21 Thread Austin Cawley-Edwards
Hi there,

Using Flink 1.9.1, trying to write .tgz files with the
StreamingFileSink#BulkWriter. It seems like flushing the output stream
doesn't flush all the data written. I've verified I can create valid files
using the same APIs and data on their own, so thinking it must be something
I'm doing wrong with the bulk format. I'm writing to the local filesystem,
with the `file://` protocol.

For Tar/ Gzipping, I'm using the Apache Commons Compression library,
version 1.20.

Here's a runnable example of the issue:

import org.apache.commons.compress.archivers.tar.TarArchiveEntry;
import org.apache.commons.compress.archivers.tar.TarArchiveOutputStream;
import org.apache.commons.compress.compressors.gzip.GzipCompressorOutputStream;
import org.apache.flink.api.common.serialization.BulkWriter;
import org.apache.flink.core.fs.FSDataOutputStream;
import org.apache.flink.core.fs.Path;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import 
org.apache.flink.streaming.api.functions.sink.filesystem.StreamingFileSink;

import java.io.FileOutputStream;
import java.io.IOException;
import java.io.Serializable;
import java.nio.charset.StandardCharsets;

class Scratch {
  public static class Record implements Serializable {
private static final long serialVersionUID = 1L;

String id;

public Record() {}

public Record(String id) {
  this.id = id;
}

public String getId() {
  return id;
}

public void setId(String id) {
  this.id = id;
}
  }

  public static void main(String[] args) throws Exception {
final StreamExecutionEnvironment env =
StreamExecutionEnvironment.getExecutionEnvironment();

TarArchiveOutputStream taos = new TarArchiveOutputStream(new
GzipCompressorOutputStream(new
FileOutputStream("/home/austin/Downloads/test.tgz")));
TarArchiveEntry fileEntry = new
TarArchiveEntry(String.format("%s.txt", "test"));
String fullText = "hey\nyou\nwork";
byte[] fullTextData = fullText.getBytes();
fileEntry.setSize(fullTextData.length);
taos.putArchiveEntry(fileEntry);
taos.write(fullTextData, 0, fullTextData.length);
taos.closeArchiveEntry();
taos.flush();
taos.close();

StreamingFileSink<Record> textSink = StreamingFileSink
.forBulkFormat(new Path("file:///home/austin/Downloads/text-output"),
new BulkWriter.Factory<Record>() {
  @Override
  public BulkWriter<Record> create(FSDataOutputStream out)
throws IOException {
final TarArchiveOutputStream compressedOutputStream =
new TarArchiveOutputStream(new GzipCompressorOutputStream(out));

return new BulkWriter<Record>() {
  @Override
  public void addElement(Record record) throws IOException {
TarArchiveEntry fileEntry = new
TarArchiveEntry(String.format("%s.txt", record.id));
byte[] fullTextData =
"hey\nyou\nplease\nwork".getBytes(StandardCharsets.UTF_8);
fileEntry.setSize(fullTextData.length);
compressedOutputStream.putArchiveEntry(fileEntry);
compressedOutputStream.write(fullTextData, 0,
fullTextData.length);
compressedOutputStream.closeArchiveEntry();
  }

  @Override
  public void flush() throws IOException {
compressedOutputStream.flush();
  }

  @Override
  public void finish() throws IOException {
this.flush();
  }
};
  }
})
.withBucketCheckInterval(1000)
.build();

env
.fromElements(new Record("1"), new Record("2"))
.addSink(textSink)
.name("Streaming File Sink")
.uid("streaming-file-sink");
env.execute("streaming file sink test");
  }
}


From the stat/hex dumps, you can see that the first bits are there, but
are then cut off:

~/Downloads » stat test.tgz
  File: test.tgz
  Size: 114   Blocks: 8  IO Block: 4096   regular file
Device: 801h/2049d Inode: 30041077Links: 1
Access: (0664/-rw-rw-r--)  Uid: ( 1000/  austin)   Gid: ( 1000/  austin)
Access: 2020-02-21 19:30:06.009028283 -0500
Modify: 2020-02-21 19:30:44.509424406 -0500
Change: 2020-02-21 19:30:44.509424406 -0500
 Birth: -

~/Downloads » tar -tvf test.tgz
-rw-r--r-- 0/0  12 2020-02-21 19:35 test.txt

~/Downloads » hd test.tgz
  1f 8b 08 00 00 00 00 00  00 ff ed cf 31 0e 80 20
 |1.. |
0010  0c 85 61 66 4f c1 09 cc  2b 14 3c 8f 83 89 89 03
 |..afO...+.<.|
0020  09 94 a8 b7 77 30 2e ae  8a 2e fd 96 37 f6 af 4c
 |w0..7..L|
0030  45 7a d9 c4 34 04 02 22  b3 c5 e9 be 00 b1 25 1f
 |Ez..4.."..%.|
0040  1d 63 f0 81 82 05 91 77  d1 58 b4 8c ba d4 22 63
 |.c.w.X"c|
0050  36 78 7c eb fe dc 0b 69  5f 98 a7 bd db 53 ed d6
 |6x|i_S..|
0060  94 97 bf 5b 94 52 4a 7d  
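
Sketching the two fixes discussed elsewhere in this thread, as an assumption rather than a verified patch: with a bulk format the in-progress part files are only rolled and finalized on checkpoints, so the job needs checkpointing enabled; and the custom BulkWriter's finish() should also finish both compression wrappers (kept as separate fields) so the tar EOF records and gzip trailer get written, without closing the sink-owned output stream.

import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

// Job setup:
final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.enableCheckpointing(10_000L); // e.g. every 10 seconds; tune to your latency needs

// Inside the custom BulkWriter, assuming fields for both wrappers:
// @Override
// public void finish() throws IOException {
//   tarOut.finish();   // commons-compress TarArchiveOutputStream#finish writes the EOF records
//   gzipOut.finish();  // commons-compress GzipCompressorOutputStream#finish writes the trailer
// }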

Re: CSV StreamingFileSink

2020-02-19 Thread Austin Cawley-Edwards
Hey Timo,

Thanks for the assignment link! Looks like most of my issues can be solved
by getting better acquainted with Java file APIs and not in Flink-land.


Best,
Austin

On Wed, Feb 19, 2020 at 6:48 AM Timo Walther  wrote:

> Hi Austin,
>
> the StreamingFileSink allows bucketing the output data.
>
> This should help for your use case:
>
>
> https://ci.apache.org/projects/flink/flink-docs-stable/dev/connectors/streamfile_sink.html#bucket-assignment
>
> Regards,
> Timo
>
>
> On 19.02.20 01:00, Austin Cawley-Edwards wrote:
> > Following up on this -- does anyone know if it's possible to stream
> > individual files to a directory using the StreamingFileSink? For
> > instance, if I want all records that come in during a certain day to be
> > partitioned into daily directories:
> >
> > 2020-02-18/
> > large-file-1.txt
> > large-file-2.txt
> > 2020-02-19/
> > large-file-3.txt
> >
> > Or is there another way to accomplish this?
> >
> > Thanks!
> > Austin
> >
> > On Tue, Feb 18, 2020 at 5:33 PM Austin Cawley-Edwards
> > <austin.caw...@gmail.com> wrote:
> >
> > Hey all,
> >
> > Has anyone had success using the StreamingFileSink[1] to write CSV
> > files? And if so, what about compressed (Gzipped, ideally) files/
> > which libraries did you use?
> >
> >
> > Best,
> > Austin
> >
> >
> > [1]:
> >
> https://ci.apache.org/projects/flink/flink-docs-stable/dev/connectors/streamfile_sink.html
> >
>
>
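
A minimal sketch of the bucket-assignment approach Timo points to, covering the daily directory layout from the question; the encoder and output path are placeholders.

import org.apache.flink.api.common.serialization.SimpleStringEncoder;
import org.apache.flink.core.fs.Path;
import org.apache.flink.streaming.api.functions.sink.filesystem.StreamingFileSink;
import org.apache.flink.streaming.api.functions.sink.filesystem.bucketassigners.DateTimeBucketAssigner;

// One bucket (directory) per day, e.g. 2020-02-18/, 2020-02-19/; the default assigner
// uses "yyyy-MM-dd--HH", so passing "yyyy-MM-dd" switches it to daily buckets.
StreamingFileSink<String> sink = StreamingFileSink
    .forRowFormat(
        new Path("file:///home/austin/Downloads/daily-output"),  // placeholder path
        new SimpleStringEncoder<String>("UTF-8"))
    .withBucketAssigner(new DateTimeBucketAssigner<>("yyyy-MM-dd"))
    .build();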

