Re: flink add multiple sink in sequence

2023-01-08 Thread Great Info
Does it mean the order of writes is not guaranteed between these
independent Sink1 and Sink2?




On Fri, 6 Jan 2023, 10:44 am Shammon FY,  wrote:

> Hi @Great
>
> I think the two sinks in your example are equivalent and independent. If
> there are some logical relationships between two sinks, you may need to
> create a new combined sink and do it yourself.
>
> On Thu, Jan 5, 2023 at 11:48 PM Great Info  wrote:
>
>>
>> I have a stream from Kafka. After reading it and doing some
>> transformations/enrichment, I need to store the final data stream in the
>> database and also publish it to Kafka, so I am planning to add two sinks
>> like below:
>>
>>
>> finalInputStream.addSink(dataBaseSink);        // Sink1
>> finalInputStream.addSink(flinkKafkaProducer);  // Sink2
>>
>> Is the sequence guaranteed between Sink1 and Sink2? In my requirement,
>> writing to Sink2 should begin only after Sink1 has completed successfully;
>> if Sink1 fails, it should not write to Sink2.
>>
>
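
For illustration, a minimal sketch of the "combined sink" idea suggested
above, using plain String records; writeToDatabase(), the broker address and
the topic name are hypothetical placeholders, not part of the original thread:

import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.functions.sink.RichSinkFunction;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;

import java.util.Properties;

// Hedged sketch only: a single sink that enforces "database first, then Kafka"
// per record. If the database write throws, nothing is sent to Kafka.
public class DbThenKafkaSink extends RichSinkFunction<String> {

    private transient KafkaProducer<String, String> producer;

    @Override
    public void open(Configuration parameters) {
        Properties props = new Properties();
        props.put("bootstrap.servers", "localhost:9092"); // hypothetical brokers
        props.put("key.serializer",
                "org.apache.kafka.common.serialization.StringSerializer");
        props.put("value.serializer",
                "org.apache.kafka.common.serialization.StringSerializer");
        producer = new KafkaProducer<>(props);
    }

    @Override
    public void invoke(String value, Context context) throws Exception {
        writeToDatabase(value);  // Sink1 logic; a failure here stops the Kafka write below
        producer.send(new ProducerRecord<>("output-topic", value)); // Sink2 logic
    }

    @Override
    public void close() {
        if (producer != null) {
            producer.close();
        }
    }

    private void writeToDatabase(String value) throws Exception {
        // hypothetical database write, e.g. via a JDBC connection opened in open()
    }
}

Note that a sketch like this only gives per-record ordering and at-least-once
behaviour; it is not a substitute for Flink's transactional/two-phase-commit
sinks if end-to-end exactly-once delivery is required.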


flink add multiple sink in sequence

2023-01-05 Thread Great Info
I have a stream from Kafka. After reading it and doing some
transformations/enrichment, I need to store the final data stream in the
database and also publish it to Kafka, so I am planning to add two sinks
like below:


finalInputStream.addSink(dataBaseSink);        // Sink1
finalInputStream.addSink(flinkKafkaProducer);  // Sink2

Is the sequence guaranteed between Sink1 and Sink2? In my requirement,
writing to Sink2 should begin only after Sink1 has completed successfully;
if Sink1 fails, it should not write to Sink2.


Utilizing Kafka headers in Flink Kafka connector

2022-10-12 Thread Great Info
I have some Flink applications that read streams from Kafka. The
producer-side code has recently started adding some additional information
to the Kafka headers while producing records.
Now I need to change my consumer-side logic to process a record only if the
header contains a specific value; if the header value is different from the
one I am looking for, I just need to move on to the next record.

I found some sample reference code, but that logic needs to deserialize
the record and then verify the header. Is there any simple way to ignore
the record before deserializing it?
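
One possible approach (a sketch only, assuming the FlinkKafkaConsumer API and
a hypothetical header name/value): implement KafkaDeserializationSchema,
inspect the headers on the raw ConsumerRecord first, and only deserialize the
payload when the header matches:

import java.nio.charset.StandardCharsets;

import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.streaming.connectors.kafka.KafkaDeserializationSchema;
import org.apache.flink.util.Collector;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.common.header.Header;

// Hedged sketch: "event-type" and "expected" are hypothetical header name/value.
public class HeaderFilteringSchema implements KafkaDeserializationSchema<String> {

    @Override
    public boolean isEndOfStream(String nextElement) {
        return false;
    }

    @Override
    public String deserialize(ConsumerRecord<byte[], byte[]> record) {
        // not used; the Collector-based variant below is invoked by the consumer
        return null;
    }

    @Override
    public void deserialize(ConsumerRecord<byte[], byte[]> record, Collector<String> out) {
        Header header = record.headers().lastHeader("event-type");
        if (header == null
                || !"expected".equals(new String(header.value(), StandardCharsets.UTF_8))) {
            return; // skip this record without touching the payload
        }
        out.collect(new String(record.value(), StandardCharsets.UTF_8));
    }

    @Override
    public TypeInformation<String> getProducedType() {
        return TypeInformation.of(String.class);
    }
}

The payload deserialization itself is still up to you (here it is just a UTF-8
String), but records with a non-matching header are dropped before that step.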


Flink FaultTolerant at operator level

2022-10-05 Thread Great Info
I have a Flink job and the current flow looks like below:

Source_Kafka -> *operator-1* (key by partition) -> *operator-2* (enrich the
record) -> *Sink1-Operator* & *Sink2-Operator*
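
For reference, the flow above corresponds roughly to a pipeline like the
following (a sketch only; the key extractor, enrichment function and sinks
are hypothetical placeholders, not the actual job code):

DataStream<String> enriched = env
        .addSource(kafkaSource)                 // Source_Kafka
        .keyBy(record -> extractKey(record))    // operator-1: key by partition
        .map(new RedisEnrichmentFunction());    // operator-2: enrich from the Redis cache

enriched.addSink(sink1);   // Sink1-Operator
enriched.addSink(sink2);   // Sink2-Operator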

With this flow, the current problem is at operator-2. The core logic there
is to look up some reference status data in a Redis cache and enrich the
stream. This works fine while the job runs well, but recently I saw that if
the job fails at this operator or at the sink operators, the entire job
gets restarted and the stream gets reprocessed from the source. That causes
a different reference status in the enrichment (if the reference status in
the cache changed during the restart); as per the business requirement, I
need to enrich with the reference status at the time the stream was
received by my job.

1. Is there any way to reprocess only the Sink1/Sink2 operators?
2. How can I resume only Sink2 in cases where Sink1 was successful but
Sink2 failed?


Re: Flink running same task on different Task Manager

2022-08-18 Thread Great Info
Kindly help with this, I got stuck
-> If so, I think you can set Task1 and Task2 to the same parallelism and
set them in the same slot sharing group. In this way, Task1 and Task2 will
be deployed into the same slot(That is, the same task manager).

*Updating task details*
*Task1 - Sources some static data over HTTPS and keeps it in memory (in a
static memory block), refreshing it every hour. Since this data is huge, it
cannot be broadcast.*

*Task2 - Processes some real-time events from Kafka, uses the static data
to validate and transform them, then forwards them to another Kafka topic.*

Task2 needs more parallelism, so deploying both Task1 and Task2 on the same
node (task manager) is becoming difficult. I am using AWS KDA, which has a
limitation of running only 8 tasks per node; now I have a requirement to
run Task2 with a parallelism of 12.

1. set a different SlotSharingGroup for Task1 and Task2
2. set the parallelism to 12 for Task2 (this real-time task needs to read
from 12 different Kafka partitions, hence setting it to 12)
3. set the parallelism of Task1 to 2
4. then set cluster.evenly-spread-out-slots: true

Will this approach work? Also, I did not find a way to set a different
parallelism for each SlotSharingGroup.
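
For illustration, a minimal sketch of steps 1-3 in the DataStream API (the
source names are hypothetical placeholders, not the actual job code):

// step 1 + step 3: Task1 in its own slot sharing group with parallelism 2
DataStream<String> staticData = env
        .addSource(httpsStaticSource)
        .slotSharingGroup("static-data")
        .setParallelism(2);

// step 1 + step 2: Task2 in a separate group with parallelism 12
DataStream<String> events = env
        .addSource(kafkaSource)
        .slotSharingGroup("realtime")
        .setParallelism(12);

Note that parallelism is set per operator, as above, rather than per slot
sharing group; cluster.evenly-spread-out-slots (step 4) is a cluster-level
setting in flink-conf.yaml.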

On Thu, Jul 14, 2022 at 10:12 PM Great Info  wrote:

> -> If so, I think you can set Task1 and Task2 to the same parallelism and
> set them in the same slot sharing group. In this way, Task1 and Task2 will
> be deployed into the same slot(That is, the same task manager).
>
> *Updating task details *
> *Task1- Source some static data over HTTPS and keep it in memory(in static
> memory block), this keeps refreshing it every 1 hour, since this is huge,
> it can not be broadcasted *
>
> *  Task2- Process some real-time events from Kafka and uses
> static data to validate something and transform, then forward to other
> Kafka topic*
>
> Task2 needs more parallelism so deploying both Task1 and Task2 on the same
> node (task manager) is becoming difficult, I am using AWS KDA and that has
> the limitation to run only 8 tasks per node. now I have a requirement to
> run parallelism  of 12 for the Task2
>
> 1. set different SlotSharingGroup for task1 and Task2
> 2. set  parallelism to 12 for the task2 (this real-time task needs to read
> from 12 different Kafka partitions hence setting it to 12)
> 3. set parallelism  of task1 to 2
> 4. then set this cluster.evenly-spread-out-slots: true
>
> Will these methods work? Also, I did not find a way to set
> different parallelism for each slotSharingGourp
>
>
>
> On Thu, Jul 14, 2022 at 7:54 AM Lijie Wang 
> wrote:
>
>> Hi Great,
>>
>> -> Is there a way to set the restart strategy so that only tasks in the
>> same slot group will restart during failure?
>>
>> No. On task failover, all tasks in the same region will be restarted at
>> the same time (to ensure the data consistency).
>> You can get more details about failover strategy in [1]
>>
>> [1]
>> https://nightlies.apache.org/flink/flink-docs-release-1.15/docs/ops/state/task_failure_recovery/#failover-strategies
>>
>> Best,
>> Lijie
>>
>>
>> Great Info  于2022年7月13日周三 23:11写道:
>>
>>> thanks for helping with some inputs
>>> actually, I have created task1 and task2 in separate slot groups,
>>> thought it would be good if they run in independent slots. Also now facing
>>> some issues during restarts. whenever  task1 has any exception entire job
>>> is restarting.
>>>
>>> Is there a way to set the restart strategy so that only tasks in the
>>> same slot group will restart during failure
>>> ?
>>>
>>> On Wed, Jun 15, 2022 at 6:13 PM Lijie Wang 
>>> wrote:
>>>
>>>> Hi Great,
>>>>
>>>> Do you mean there is a Task1 and a Task2 on each task manager?
>>>>
>>>> If so, I think you can set Task1 and Task2 to the same parallelism and
>>>> set them in the same slot sharing group. In this way, the Task1 and Task2
>>>> will be deployed into the same slot(That is, the same task manager).
>>>>
>>>> You can get more details about slot sharing group in [1], and you can
>>>> get how to set slot sharing group in [2].
>>>>
>>>> [1]
>>>> https://nightlies.apache.org/flink/flink-docs-master/docs/concepts/flink-architecture/#task-slots-and-resources
>>>> [2]
>>>> https://nightlies.apache.org/flink/flink-docs-master/docs/dev/datastream/operators/overview/#set-slot-sharing-group
>>>>
>>>> Best,
>>>> Lijie
>>>>
>>>> Weihua Hu  于2022年6月15日周三 13:16写道:
>

Re: Flink running same task on different Task Manager

2022-07-14 Thread Great Info
-> If so, I think you can set Task1 and Task2 to the same parallelism and
set them in the same slot sharing group. In this way, Task1 and Task2 will
be deployed into the same slot(That is, the same task manager).

*Updating task details*
*Task1 - Sources some static data over HTTPS and keeps it in memory (in a
static memory block), refreshing it every hour. Since this data is huge, it
cannot be broadcast.*

*Task2 - Processes some real-time events from Kafka, uses the static data
to validate and transform them, then forwards them to another Kafka topic.*

Task2 needs more parallelism, so deploying both Task1 and Task2 on the same
node (task manager) is becoming difficult. I am using AWS KDA, which has a
limitation of running only 8 tasks per node; now I have a requirement to
run Task2 with a parallelism of 12.

1. set a different SlotSharingGroup for Task1 and Task2
2. set the parallelism to 12 for Task2 (this real-time task needs to read
from 12 different Kafka partitions, hence setting it to 12)
3. set the parallelism of Task1 to 2
4. then set cluster.evenly-spread-out-slots: true

Will this approach work? Also, I did not find a way to set a different
parallelism for each SlotSharingGroup.



On Thu, Jul 14, 2022 at 7:54 AM Lijie Wang  wrote:

> Hi Great,
>
> -> Is there a way to set the restart strategy so that only tasks in the
> same slot group will restart during failure?
>
> No. On task failover, all tasks in the same region will be restarted at
> the same time (to ensure the data consistency).
> You can get more details about failover strategy in [1]
>
> [1]
> https://nightlies.apache.org/flink/flink-docs-release-1.15/docs/ops/state/task_failure_recovery/#failover-strategies
>
> Best,
> Lijie
>
>
> Great Info  于2022年7月13日周三 23:11写道:
>
>> thanks for helping with some inputs
>> actually, I have created task1 and task2 in separate slot groups,
>> thought it would be good if they run in independent slots. Also now facing
>> some issues during restarts. whenever  task1 has any exception entire job
>> is restarting.
>>
>> Is there a way to set the restart strategy so that only tasks in the same
>> slot group will restart during failure
>> ?
>>
>> On Wed, Jun 15, 2022 at 6:13 PM Lijie Wang 
>> wrote:
>>
>>> Hi Great,
>>>
>>> Do you mean there is a Task1 and a Task2 on each task manager?
>>>
>>> If so, I think you can set Task1 and Task2 to the same parallelism and
>>> set them in the same slot sharing group. In this way, the Task1 and Task2
>>> will be deployed into the same slot(That is, the same task manager).
>>>
>>> You can get more details about slot sharing group in [1], and you can
>>> get how to set slot sharing group in [2].
>>>
>>> [1]
>>> https://nightlies.apache.org/flink/flink-docs-master/docs/concepts/flink-architecture/#task-slots-and-resources
>>> [2]
>>> https://nightlies.apache.org/flink/flink-docs-master/docs/dev/datastream/operators/overview/#set-slot-sharing-group
>>>
>>> Best,
>>> Lijie
>>>
>>> Weihua Hu  于2022年6月15日周三 13:16写道:
>>>
>>>> I don't really understand how task2 reads static data from task1,
>>>> but I think you can integrate the logic of getting static data from
>>>> http in
>>>> task1 into task2 and keep only one kind of task.
>>>>
>>>> Best,
>>>> Weihua
>>>>
>>>>
>>>> On Wed, Jun 15, 2022 at 10:07 AM Great Info  wrote:
>>>>
>>>> > thanks for helping with some inputs, yes I am using rich function and
>>>> > handling objects created in open, and also and network calls are
>>>> getting
>>>> > called in a run.
>>>> > but currently, I got stuck running this same task on *all task
>>>> managers*
>>>> > (nodes), when I submit the job, this task1(static data task) runs
>>>> only one
>>>> > task manager, I have 3 task managers in my Flink cluster.
>>>> >
>>>> >
>>>> > On Tue, Jun 14, 2022 at 7:20 PM Weihua Hu 
>>>> wrote:
>>>> >
>>>> >> Hi,
>>>> >>
>>>> >> IMO, Broadcast is a better way to do this, which can reduce the QPS
>>>> of
>>>> >> external access.
>>>> >> If you do not want to use Broadcast, Try using RichFunction, start a
>>>> >> thread in the open() method to refresh the data regularly. but be
>>>> careful
>>>> >> to clean up your data and threads in the close() method,

Re: Flink running same task on different Task Manager

2022-07-13 Thread Great Info
Thanks for helping with some inputs.
Actually, I have created Task1 and Task2 in separate slot groups, thinking
it would be good if they run in independent slots. I am also now facing
some issues during restarts: whenever Task1 hits an exception, the entire
job restarts.

Is there a way to set the restart strategy so that only the tasks in the
same slot group restart during a failure?

On Wed, Jun 15, 2022 at 6:13 PM Lijie Wang  wrote:

> Hi Great,
>
> Do you mean there is a Task1 and a Task2 on each task manager?
>
> If so, I think you can set Task1 and Task2 to the same parallelism and set
> them in the same slot sharing group. In this way, the Task1 and Task2 will
> be deployed into the same slot(That is, the same task manager).
>
> You can get more details about slot sharing group in [1], and you can get
> how to set slot sharing group in [2].
>
> [1]
> https://nightlies.apache.org/flink/flink-docs-master/docs/concepts/flink-architecture/#task-slots-and-resources
> [2]
> https://nightlies.apache.org/flink/flink-docs-master/docs/dev/datastream/operators/overview/#set-slot-sharing-group
>
> Best,
> Lijie
>
> Weihua Hu  于2022年6月15日周三 13:16写道:
>
>> I don't really understand how task2 reads static data from task1,
>> but I think you can integrate the logic of getting static data from http
>> in
>> task1 into task2 and keep only one kind of task.
>>
>> Best,
>> Weihua
>>
>>
>> On Wed, Jun 15, 2022 at 10:07 AM Great Info  wrote:
>>
>> > thanks for helping with some inputs, yes I am using rich function and
>> > handling objects created in open, and also and network calls are getting
>> > called in a run.
>> > but currently, I got stuck running this same task on *all task managers*
>> > (nodes), when I submit the job, this task1(static data task) runs only
>> one
>> > task manager, I have 3 task managers in my Flink cluster.
>> >
>> >
>> > On Tue, Jun 14, 2022 at 7:20 PM Weihua Hu 
>> wrote:
>> >
>> >> Hi,
>> >>
>> >> IMO, Broadcast is a better way to do this, which can reduce the QPS of
>> >> external access.
>> >> If you do not want to use Broadcast, Try using RichFunction, start a
>> >> thread in the open() method to refresh the data regularly. but be
>> careful
>> >> to clean up your data and threads in the close() method, otherwise it
>> will
>> >> lead to leaks.
>> >>
>> >> Best,
>> >> Weihua
>> >>
>> >>
>> >> On Tue, Jun 14, 2022 at 12:04 AM Great Info  wrote:
>> >>
>> >>> Hi,
>> >>> I have one flink job which has two tasks
>> >>> Task1- Source some static data over https and keep it in memory, this
>> >>> keeps refreshing it every 1 hour
>> >>> Task2- Process some real-time events from Kafka and uses static data
>> to
>> >>> validate something and transform, then forward to other Kafka topic.
>> >>>
>> >>> so far, everything was running on the same Task manager(same node),
>> but
>> >>> due to some recent scaling requirements need to enable partitioning on
>> >>> Task2 and that will make some partitions run on other task managers.
>> but
>> >>> other task managers don't have the static data
>> >>>
>> >>> is there a way to run Task1 on all the task managers? I don't want to
>> >>> enable broadcasting since it is a little huge and also I can not
>> persist
>> >>> data in DB due to data regulations.
>> >>>
>> >>>
>>
>


Flink running same task on different Task Manager

2022-06-13 Thread Great Info
Hi,
I have one Flink job which has two tasks:
Task1 - Sources some static data over HTTPS and keeps it in memory; this
data is refreshed every hour.
Task2 - Processes some real-time events from Kafka, uses the static data to
validate and transform them, then forwards them to another Kafka topic.

So far, everything has been running on the same task manager (same node),
but due to some recent scaling requirements I need to enable partitioning
on Task2, which will make some partitions run on other task managers. The
other task managers, however, don't have the static data.

Is there a way to run Task1 on all the task managers? I don't want to
enable broadcasting since the data is a little too big, and I cannot
persist it in a DB due to data regulations.
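
For reference, a minimal sketch of the RichFunction approach suggested in the
replies above (start a refresh thread in open(), stop it in close()); the
fetchStaticData() helper and the enrichment logic are hypothetical
placeholders:

import java.util.Collections;
import java.util.Map;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;

import org.apache.flink.api.common.functions.RichMapFunction;
import org.apache.flink.configuration.Configuration;

// Hedged sketch: every parallel subtask (and therefore every task manager that
// runs one) keeps its own copy of the static data and refreshes it hourly.
public class EnrichWithStaticData extends RichMapFunction<String, String> {

    private transient volatile Map<String, String> staticData;
    private transient ScheduledExecutorService refresher;

    @Override
    public void open(Configuration parameters) throws Exception {
        staticData = fetchStaticData();  // initial load over HTTPS
        refresher = Executors.newSingleThreadScheduledExecutor();
        refresher.scheduleAtFixedRate(
                () -> { staticData = fetchStaticData(); }, 1, 1, TimeUnit.HOURS);
    }

    @Override
    public String map(String event) {
        // hypothetical validation/transformation against the current static data
        return event + "|" + staticData.getOrDefault(event, "unknown");
    }

    @Override
    public void close() {
        if (refresher != null) {
            refresher.shutdownNow();  // clean up the thread to avoid leaks
        }
    }

    private Map<String, String> fetchStaticData() {
        // hypothetical: download and parse the static data over HTTPS
        return Collections.emptyMap();
    }
}

Because the data lives inside the function rather than in a separate Task1,
every task manager that runs a Task2 subtask has its own copy, at the cost of
one HTTPS refresh per subtask.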


Flink job is throwing dependency issue when submitted to cluster

2022-05-06 Thread Great Info
I have one Flink job which reads files from S3 and processes them.
Currently it is running on Flink 1.9.0; I need to upgrade my cluster to
1.13.5, so I have made the changes in my job's pom and brought up the Flink
cluster using the 1.13.5 distribution.

When I submit my application I get the below error when it tries to connect
to S3. I have updated the S3 SDK version to the latest, but I still get the
same error.

Caused by: java.lang.invoke.LambdaConversionException: Invalid receiver
type interface org.apache.http.Header; not a subtype of implementation type
interface org.apache.http.NameValuePair

It works when I just run it as a mini-cluster (running just java -jar)
and also when I submit to the Flink cluster with 1.9.0.

I am not able to work out where the dependency mismatch is happening.


Re: how to initialize few things at task managers

2022-04-19 Thread Great Info
I am deploying as a Docker container on our servers; due to some
restrictions I can only pass Keystore URLs.

One option is yarn.ship-files! Can you help by pointing me to some sample
code, and explain how the job manager can ship this file?

Would downloading the Keystore as part of the job's main function and
sending it to all task managers work?



On Mon, Apr 18, 2022 at 10:10 PM Austin Cawley-Edwards <
austin.caw...@gmail.com> wrote:

> If you are using Kubernetes to deploy Flink, you could think about an
> initContainer on the TMs or a custom Docker entry point that does this
> initialization.
>
> Best,
> Austin
>
> On Mon, Apr 18, 2022 at 7:49 AM huweihua  wrote:
>
>> Hi, Init stuff when task manager comes up is not an option.
>> But if the Keystore file is not changeable and you are using yarn mode,
>> maybe you can use ‘yarn.ship-files’[1] to localize it.
>>
>> [1]
>> https://nightlies.apache.org/flink/flink-docs-master/zh/docs/deployment/config/#yarn-ship-files
>>
>> 2022年4月16日 下午11:44,Great Info  写道:
>>
>> I need to download Keystore and use it while creating the source
>> connector, currently, I am overriding the open method
>> <https://github.com/apache/flink/blob/99c2a415e9eeefafacf70762b6f54070f7911ceb/flink-connectors/flink-connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaConsumerBase.java#L560>
>>  but
>> this gets called for each of my source connectors.
>>
>> @Override
>> public void open(Configuration configuration) throws Exception {
>>     // do a few things like download the Keystore to the default path, etc.
>>     super.open(configuration);
>> }
>>
>> Is there an option to init a few pre stuff as soon as the task manager
>> comes up?
>>
>>
>>


how to initialize few things at task managers

2022-04-16 Thread Great Info
I need to download a Keystore and use it while creating the source
connector. Currently I am overriding the open method, but this gets called
for each of my source connectors.

@Override
public void open(Configuration configuration) throws Exception {
    // do a few things like download the Keystore to the default path, etc.
    super.open(configuration);
}

Is there an option to initialize a few things as soon as the task manager
comes up?
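
One possible sketch (an assumption, not a documented Flink feature): since all
subtasks on a task manager share one JVM, the download in open() can be guarded
by a static flag so that it actually runs only once per task manager.
downloadKeystore() is a hypothetical helper:

import java.util.concurrent.atomic.AtomicBoolean;

// Hedged sketch: a per-JVM (i.e. per task manager) one-time initializer that
// every source's open() can call.
public final class KeystoreInitializer {

    private static final AtomicBoolean DONE = new AtomicBoolean(false);

    private KeystoreInitializer() {}

    public static void ensureKeystore() throws Exception {
        // only the first open() call on this task manager performs the download
        if (DONE.compareAndSet(false, true)) {
            downloadKeystore();
        }
    }

    private static void downloadKeystore() throws Exception {
        // hypothetical: fetch the Keystore from the configured URL to the default path
    }
}

Each open() would then call KeystoreInitializer.ensureKeystore() before
super.open(configuration). Note that this runs once per user-code classloader
(so again after a redeploy), and that subtasks which lose the compareAndSet
race may proceed before the download has finished unless you add extra
synchronization.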


Flink does not cleanup some disk memory after submitting jar over rest

2021-04-08 Thread Great Info
I have deployed my own Flink setup in AWS ECS: one service for the
JobManager and one service for the task managers. I am running one ECS task
for the job manager and 3 ECS tasks for the task managers.

I have a kind of batch job which I upload using the Flink REST API every
day with new arguments. Each time I submit it, disk usage grows by about
600 MB. I have configured the checkpoint location as S3, and I have also
set *historyserver.archive.clean-expired-jobs* to true.

Since I am running on ECS, I am not able to find out why the disk usage
increases on every jar upload and execution.

What are the Flink config params I should look at to make sure the disk
usage does not keep growing?