Re: Add me to slack

2022-06-05 Thread Zain Haider Nemati
Hi Jing,
Could you also send the invite to me?
zain.hai...@retailo.co


On Mon, 6 Jun 2022 at 7:04 AM Jing Ge  wrote:

> Hi Xiao,
>
> Just done, please check. Thanks!
>
> Best regards,
> Jing
>
>
> On Mon, Jun 6, 2022 at 3:59 AM Xiao Ma  wrote:
>
>> Hi Jing,
>>
>> Could you please add me to the slack channel also?
>>
>> Thank you.
>>
>>
>> Best,
>> Mark Ma
>>
>> On Sun, Jun 5, 2022 at 9:57 PM Jing Ge  wrote:
>>
>>> Hi Raghunadh,
>>>
>>> Just did, please check your email. Thanks!
>>>
>>> Best regards,
>>> Jing
>>>
>>> On Mon, Jun 6, 2022 at 3:51 AM Raghunadh Nittala 
>>> wrote:
>>>
 Team, Kindly add me to the slack channel.

 Best Regards.

>>> --
>> Xiao Ma
>> Geotab
>> Software Developer, Data Engineering | B.Sc, M.Sc
>> Direct +1 (416) 836 - 3541
>> Toll-free  +1 (877) 436 - 8221
>> Visit   www.geotab.com
>> Twitter | Facebook | YouTube | LinkedIn
>>
>


Not able to see std output in console/.out files with table API

2022-06-02 Thread Zain Haider Nemati
Hi,
We are using the Table API to integrate and transform data sources and then
converting the result to a DataStream. We want to inspect the data formatting,
so we added a .print() sink to the DataStream, but the .out files do not show
any output.
We do see records coming in according to the metrics in the Flink UI, though.
Any suggestions on where to look for potential issues?

code:
tEnv.executeSql("CREATE TABLE orders (\n" +
    "  id  BIGINT,\n" +
    "  customer_id BIGINT\n" +
    ") WITH (\n" +
    "  'connector' = 'kafka',\n" +
    "  'topic' = 'orders',\n" +
    "  'properties.bootstrap.servers' = '...',\n" +
    "  'scan.startup.mode' = 'earliest-offset',\n" +
    "  'format' = 'json'\n" +
    ")");
Table result = tEnv.sqlQuery("SELECT o.id AS order_id,\n" +
    "  dbo.batch_id AS batch_id,\n" +
    "  o.customer_id AS customer_id,\n" +
    "  dbo.delivery_priority AS delivery_priority\n" +
    "  FROM orders o\n" +
    "  INNER JOIN delivery_batch_orders dbo ON o.id = dbo.order_id\n"
);

tEnv.toAppendStream(result, StringValue.class).print();
env.execute();

Flink Version : 1.13.1
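
Is something along these lines the recommended way to print for debugging
instead? A minimal sketch, assuming the StreamTableEnvironment tEnv and the
query above, and that toDataStream is available in 1.13:

// Collect the result back to the client and print it there,
// instead of relying on the TaskManager .out files:
result.execute().print();

// Or convert to a DataStream of Row and print from the DataStream side:
tEnv.toDataStream(result).print();
env.execute();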


Re: Flink UI in Application Mode

2022-05-23 Thread Zain Haider Nemati
Hi David,
Thanks for your response.
When submitting a job in application mode, a URL is printed at the end, but it
differs per job, i.e. different ports when you submit different jobs in
application mode. Is there a single port/UI where I can see a consolidated list
of running jobs instead of checking each one on a different port?

Noted regarding the mailing list concern, thank you for letting me know!


On Mon, May 23, 2022 at 1:49 PM David Morávek  wrote:

> Hi Zain,
>
> you can find a link to web-ui either in the CLI output after the job
> submission or in the YARN ResourceManager web ui [1]. With YARN Flink needs
> to choose the application master port at random (could be somehow
> controlled by setting _yarn.application-master.port_) as there might be
> multiple JMs running on the same NodeManager.
>
> OT: I've seen you've opened multiple threads on both dev and user mailing
> list. As these are all "user" related questions, can you please focus them
> on the user ML only? Separating user & development (the Flink
> contributions) threads into separate lists allows community to work more
> efficiently.
>
> Best,
> D.
>
> On Sun, May 22, 2022 at 7:44 PM Zain Haider Nemati 
> wrote:
>
>> Hi,
>> Which port does flink UI run on in application mode?
>> If I am running 5 yarn jobs in application mode would the UI be same for
>> each or different ports for each?
>>
>


Flink UI in Application Mode

2022-05-22 Thread Zain Haider Nemati
Hi,
Which port does the Flink UI run on in application mode?
If I am running 5 YARN jobs in application mode, would the UI be the same for
each, or on a different port for each?


Application mode - YARN dependency error

2022-05-22 Thread Zain Haider Nemati
Hi,
I'm getting this error in YARN application mode when submitting my job.

Caused by: java.lang.ClassCastException: cannot assign instance of org.apache.commons.collections.map.LinkedMap to field org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumerBase.pendingOffsetsToCommit of type org.apache.commons.collections.map.LinkedMap in instance of org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer
    at java.io.ObjectStreamClass$FieldReflector.setObjFieldValues(ObjectStreamClass.java:2301) ~[?:1.8.0_332]
    at java.io.ObjectStreamClass.setObjFieldValues(ObjectStreamClass.java:1431) ~[?:1.8.0_332]
    at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:2437) ~[?:1.8.0_332]
    at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:2355) ~[?:1.8.0_332]
    at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:2213) ~[?:1.8.0_332]
    at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1669) ~[?:1.8.0_332]
    at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:2431) ~[?:1.8.0_332]
    at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:2355) ~[?:1.8.0_332]
    at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:2213) ~[?:1.8.0_332]
    at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1669) ~[?:1.8.0_332]
    at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:2431) ~[?:1.8.0_332]
    at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:2355) ~[?:1.8.0_332]
    at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:2213) ~[?:1.8.0_332]
    at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1669) ~[?:1.8.0_332]
    at java.io.ObjectInputStream.readObject(ObjectInputStream.java:503) ~[?:1.8.0_332]
    at java.io.ObjectInputStream.readObject(ObjectInputStream.java:461) ~[?:1.8.0_332]
    at org.apache.flink.util.InstantiationUtil.deserializeObject(InstantiationUtil.java:615) ~[streamingjobs-1.13.jar:?]
    at org.apache.flink.util.InstantiationUtil.deserializeObject(InstantiationUtil.java:600) ~[streamingjobs-1.13.jar:?]
    at org.apache.flink.util.InstantiationUtil.deserializeObject(InstantiationUtil.java:587) ~[streamingjobs-1.13.jar:?]
    at org.apache.flink.util.InstantiationUtil.readObjectFromConfig(InstantiationUtil.java:541) ~[streamingjobs-1.13.jar:?]
    at org.apache.flink.streaming.api.graph.StreamConfig.getStreamOperatorFactory(StreamConfig.java:322) ~[flink-dist_2.12-1.13.1.jar:1.13.1]
    at org.apache.flink.streaming.runtime.tasks.OperatorChain.<init>(OperatorChain.java:154) ~[flink-dist_2.12-1.13.1.jar:1.13.1]
    at org.apache.flink.streaming.runtime.tasks.StreamTask.executeRestore(StreamTask.java:548) ~[flink-dist_2.12-1.13.1.jar:1.13.1]
    at org.apache.flink.streaming.runtime.tasks.StreamTask.runWithCleanUpOnFail(StreamTask.java:647) ~[flink-dist_2.12-1.13.1.jar:1.13.1]
    at org.apache.flink.streaming.runtime.tasks.StreamTask.restore(StreamTask.java:537) ~[flink-dist_2.12-1.13.1.jar:1.13.1]
    at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:759) ~[streamingjobs-1.13.jar:?]
    at org.apache.flink.runtime.taskmanager.Task.run(Task.java:566) ~[streamingjobs-1.13.jar:?]
    at java.lang.Thread.run(Thread.java:750) ~[?:1.8.0_332]


Json Deserialize in DataStream API with array length not fixed

2022-05-21 Thread Zain Haider Nemati
Hi Folks,
I have data coming in this format:

{
  "data": {
    "oid__id": "61de4f26f01131783f162453",
    "array_coordinates": "[ { \"speed\" : \"xxx\", \"accuracy\" : \"xxx\", \"bearing\" : \"xxx\", \"altitude\" : \"xxx\", \"longitude\" : \"xxx\", \"latitude\" : \"xxx\", \"dateTimeStamp\" : \"xxx\", \"_id\" : { \"$oid\" : \"xxx\" } }, { \"speed\" : \"xxx\", \"isFromMockProvider\" : \"false\", \"accuracy\" : \"xxx\", \"bearing\" : \"xxx\", \"altitude\" : \"xxx\", \"longitude\" : \"xxx\", \"latitude\" : \"xxx\", \"dateTimeStamp\" : \"xxx\", \"_id\" : { \"$oid\" : \"xxx\" } } ]",
    "batchId": "xxx",
    "agentId": "xxx",
    "routeKey": "40042-12-01-2022",
    "__v": 0
  },
  "metadata": {
    "timestamp": "2022-05-02T18:49:52.619827Z",
    "record-type": "data",
    "operation": "load",
    "partition-key-type": "primary-key",
    "schema-name": "xxx",
    "table-name": "xxx"
  }
}

The length of the array_coordinates array varies and is not fixed in the
source. Is there any way to define a JSON deserializer for this? If so, I would
really appreciate some help with it.
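
Something along these lines is what I have been sketching (a minimal, untested
sketch that assumes Jackson is on the classpath, the field names shown above,
and an upstream DataStream<String> named source carrying the raw JSON):

import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.util.Collector;
import com.fasterxml.jackson.databind.JsonNode;
import com.fasterxml.jackson.databind.ObjectMapper;

// Emits one record per element of the variable-length array_coordinates array.
DataStream<String> coordinates = source.flatMap(new FlatMapFunction<String, String>() {
    private transient ObjectMapper mapper;

    @Override
    public void flatMap(String value, Collector<String> out) throws Exception {
        if (mapper == null) {
            mapper = new ObjectMapper();
        }
        JsonNode root = mapper.readTree(value);
        // array_coordinates is itself a JSON-encoded string, so parse it a second time.
        String rawArray = root.path("data").path("array_coordinates").asText();
        JsonNode array = mapper.readTree(rawArray);
        for (JsonNode point : array) {
            out.collect(point.toString());
        }
    }
});

Is relying on two readTree passes like this a reasonable approach, or is there
a built-in deserializer that handles arrays of unknown length?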


Re: Job Logs - Yarn Application Mode

2022-05-20 Thread Zain Haider Nemati
Hi,
Thanks for your responses, folks. Is it possible to access logs via the Flink
UI in YARN application mode, similar to how we can access them in standalone
mode?

On Fri, May 20, 2022 at 11:06 AM Shengkai Fang  wrote:

> Thanks for Biao's explanation.
>
> Best,
> Shengkai
>
> Biao Geng  于2022年5月20日周五 11:16写道:
>
>> Hi there,
>> @Zain, Weihua's suggestion should be able to fulfill the request to check
>> JM logs. If you do want to use YARN cli for running Flink applications, it
>> is possible to check JM's log with the YARN command like:
>> *yarn logs -applicationId application_xxx_yyy -am -1 -logFiles
>> jobmanager.log*
>> For TM log, command would be like:
>> * yarn logs -applicationId  -containerId 
>> -logFiles taskmanager.log*
>> Note, it is not super easy to find the container id of TM. Some
>> workaround would be to check JM's log first and get the container id for TM
>> from that. You can also learn more about the details of above commands from 
>> *yarn
>> logs -help*
>>
>> @Shengkai, yes, you are right the actual JM address is managed by YARN.
>> To access the JM launched by YARN, users need to access YARN web ui to find
>> the YARN application by applicationId and then click 'application master
>> url' of that application to be redirected to Flink web ui.
>>
>> Best,
>> Biao Geng
>>
>> Shengkai Fang  于2022年5月20日周五 10:59写道:
>>
>>> Hi.
>>>
>>> I am not familiar with the YARN application mode. Because the job
>>> manager is started when submit the jobs. So how can users know the address
>>> of the JM? Do we need to look up the Yarn UI to search the submitted job
>>> with the JobID?
>>>
>>> Best,
>>> Shengkai
>>>
>>> Weihua Hu  于2022年5月20日周五 10:23写道:
>>>
>>>> Hi,
>>>> You can get the logs from Flink Web UI if job is running.
>>>> Best,
>>>> Weihua
>>>>
>>>> 2022年5月19日 下午10:56,Zain Haider Nemati  写道:
>>>>
>>>> Hey All,
>>>> How can I check logs for my job when it is running in application mode
>>>> via yarn
>>>>
>>>>
>>>>


Re: Kinesis Sink - Data being received with intermittent breaks

2022-05-20 Thread Zain Haider Nemati
Hi Danny,
I looked into it in a bit more detail; the bottleneck seems to be the transform
function, which is running at 100% and causing backpressure. I'm looking into
that.
Thanks for your help, much appreciated!

On Fri, May 20, 2022 at 1:24 AM Ber, Jeremy  wrote:

> Hi Zain—
>
>
>
> Are you seeing any data loss present within the Flink Dashboard subtasks
> of each task? On the bottom of your dashboard you should see data going
> from each blue box to the next. Is this a comprehensive set of data?
> Meaning do you see 80M from the source -> first operator -> second operator
> -> sink?
>
>
>
> Secondly, it may be easier to troubleshoot this by removing a few
> variables. Would you be able to remove the operator which segregates your
> data into 100 length records and simply forward that data to the next
> operator? Simultaneously, could you leave the Kinesis Producer
> configuration settings (apart from queue limit) at their defaults? This
> will give a good baseline from which to improve upon.
>
>
>
> Jeremy
>
>
>
> *From: *Zain Haider Nemati 
> *Date: *Wednesday, May 18, 2022 at 6:15 AM
> *To: *Danny Cranmer 
> *Cc: *Alexander Preuß , user <
> user@flink.apache.org>
> *Subject: *RE: [EXTERNAL]Kinesis Sink - Data being received with
> intermittent breaks
>
>
> Hey Danny,
>
> Thanks for getting back to me.
>
> - You are seeing bursty throughput, but the job is keeping up? There is no
> backpressure? --> Correct I'm not seeing any backpressure in any of the
> metrics
>
> - What is the throughput at the sink? --> num of records out -- 1100 per
> 10 seconds
>
> - On the graph screenshot, what is the period and stat (sum/average/etc)?
> -->It is incoming data (MB/s) each second
>
>
>
> So let me explain this in totality, the number of records residing in the
> source are about 80 million and the number of records i see in the kinesis
> data stream after it has consumed the data from source is about 20 million
> so im seeing alot of data loss and I think this potentially has to do with
> the intermediate dips im seeing in the records coming in the data stream.
>
>
>
> What are the configurations you guys generally suggest for data of this
> range and sinking to a kinesis data stream?
>
>
>
> On Wed, May 18, 2022 at 2:00 AM Danny Cranmer 
> wrote:
>
> Hello Zain,
>
>
>
> Thanks for providing the additional information. Going back to the
> original issue:
>
> - You are seeing bursty throughput, but the job is keeping up? There is no
> backpressure?
>
> - What is the throughput at the sink?
>
> - On the graph screenshot, what is the period and stat (sum/average/etc)?
>
>
>
> Let me shed some light on the log messages, let's take this example:
>
>
>
> LogInputStreamReader ... Stage 1 Triggers ...  { stream:
> 'flink-kafka-tracer', manual: 0, count: 0, size: 0, matches: 0, timed: 3,
> UserRecords: 6, KinesisRecords: 3 }
>
>
>
> Flush trigger reason:
>
> - manual: the flush was manually triggered
>
> - count: flush was triggered by the number of records in the container
>
> - size: the flush was triggered by the number of bytes in the container
>
> - matches: the predicate was matched
>
> - timed: the flush is triggered by elapsed timer
>
>
>
> Input/Output:
>
> - UserRecords: Number of input records KPL flushed (this can be higher
> than KinesisRecords when aggregation is enabled)
>
> - KinesisRecords: Number of records shipped to Kinesis Data Streams
>
>
>
> Stage 2 triggers tells us the number of API invocations via the PutRecords
> field.
>
>
>
> I can see from your logs that the majority of flushes are due to the
> timer, and it does not look overly bursty. Seems to sit at around 3 records
> per 15 seconds, or 1 record every 5 seconds. This seems very low, is it
> expected?
>
>
>
> Thanks,
>
> Danny Cranmer
>
>
>
> On Mon, May 16, 2022 at 10:57 PM Zain Haider Nemati <
> zain.hai...@retailo.co> wrote:
>
> Hey Danny,
>
> Thanks for having a look at the issue.
>
> I am using a custom flink operator to segregate the data into a consistent
> format of length 100 which is no more than 1 MB. The configurations I
> shared were after I was exploring tweaking some of them to see if it
> improves the throughput.
>
>
>
> Regarding your queries :
>
> - Which Flink version is this? -- > *Version 1.13*
>
> - Can you see any errors in the

Job Logs - Yarn Application Mode

2022-05-19 Thread Zain Haider Nemati
Hey All,
How can I check the logs for my job when it is running in application mode via
YARN?


Re: Flink application on yarn cluster - main method not found

2022-05-19 Thread Zain Haider Nemati
Hi Folks,
Would appreciate it if someone could help me out with this !

Cheers

On Thu, May 19, 2022 at 1:49 PM Zain Haider Nemati 
wrote:

> Hi,
> Im running flink application on yarn cluster it is giving me this error,
> it is working fine on standalone cluster. Any idea what could be causing
> this?
>
> Exception in thread "main" java.lang.NoSuchMethodError: org.apache.flink.client.deployment.application.ClassPathPackagedProgramRetriever.newBuilder([Ljava/lang/String;)Lorg/apache/flink/client/deployment/application/ClassPathPackagedProgramRetriever$Builder;
>     at org.apache.flink.yarn.entrypoint.YarnApplicationClusterEntryPoint.getPackagedProgramRetriever(YarnApplicationClusterEntryPoint.java:137)
>     at org.apache.flink.yarn.entrypoint.YarnApplicationClusterEntryPoint.getPackagedProgram(YarnApplicationClusterEntryPoint.java:121)
>     at org.apache.flink.yarn.entrypoint.YarnApplicationClusterEntryPoint.main(YarnApplicationClusterEntryPoint.java:95)
>


Flink application on yarn cluster - main method not found

2022-05-19 Thread Zain Haider Nemati
Hi,
I'm running a Flink application on a YARN cluster and it is giving me this
error; it works fine on a standalone cluster. Any idea what could be causing
this?

Exception in thread "main" java.lang.NoSuchMethodError: org.apache.flink.client.deployment.application.ClassPathPackagedProgramRetriever.newBuilder([Ljava/lang/String;)Lorg/apache/flink/client/deployment/application/ClassPathPackagedProgramRetriever$Builder;
    at org.apache.flink.yarn.entrypoint.YarnApplicationClusterEntryPoint.getPackagedProgramRetriever(YarnApplicationClusterEntryPoint.java:137)
    at org.apache.flink.yarn.entrypoint.YarnApplicationClusterEntryPoint.getPackagedProgram(YarnApplicationClusterEntryPoint.java:121)
    at org.apache.flink.yarn.entrypoint.YarnApplicationClusterEntryPoint.main(YarnApplicationClusterEntryPoint.java:95)


Re: Kinesis Sink - Data being received with intermittent breaks

2022-05-18 Thread Zain Haider Nemati
Hey Danny,
Thanks for getting back to me.
- You are seeing bursty throughput, but the job is keeping up? There is no
backpressure? --> Correct, I'm not seeing any backpressure in any of the
metrics.
- What is the throughput at the sink? --> Number of records out: 1100 per 10
seconds.
- On the graph screenshot, what is the period and stat (sum/average/etc)?
--> It is incoming data (MB/s) each second.

So let me explain this in totality: the number of records residing in the
source is about 80 million, and the number of records I see in the Kinesis data
stream after it has consumed the data from the source is about 20 million, so
I'm seeing a lot of data loss. I think this potentially has to do with the
intermittent dips I'm seeing in the records coming into the data stream.

What configurations would you generally suggest for data in this range when
sinking to a Kinesis data stream?

On Wed, May 18, 2022 at 2:00 AM Danny Cranmer 
wrote:

> Hello Zain,
>
> Thanks for providing the additional information. Going back to the
> original issue:
> - You are seeing bursty throughput, but the job is keeping up? There is no
> backpressure?
> - What is the throughput at the sink?
> - On the graph screenshot, what is the period and stat (sum/average/etc)?
>
> Let me shed some light on the log messages, let's take this example:
>
> LogInputStreamReader ... Stage 1 Triggers ...  { stream:
> 'flink-kafka-tracer', manual: 0, count: 0, size: 0, matches: 0, timed: 3,
> UserRecords: 6, KinesisRecords: 3 }
>
> Flush trigger reason:
> - manual: the flush was manually triggered
> - count: flush was triggered by the number of records in the container
> - size: the flush was triggered by the number of bytes in the container
> - matches: the predicate was matched
> - timed: the flush is triggered by elapsed timer
>
> Input/Output:
> - UserRecords: Number of input records KPL flushed (this can be higher
> than KinesisRecords when aggregation is enabled)
> - KinesisRecords: Number of records shipped to Kinesis Data Streams
>
> Stage 2 triggers tells us the number of API invocations via the PutRecords
> field.
>
> I can see from your logs that the majority of flushes are due to the
> timer, and it does not look overly bursty. Seems to sit at around 3 records
> per 15 seconds, or 1 record every 5 seconds. This seems very low, is it
> expected?
>
> Thanks,
> Danny Cranmer
>
> On Mon, May 16, 2022 at 10:57 PM Zain Haider Nemati <
> zain.hai...@retailo.co> wrote:
>
>> Hey Danny,
>> Thanks for having a look at the issue.
>> I am using a custom flink operator to segregate the data into a
>> consistent format of length 100 which is no more than 1 MB. The
>> configurations I shared were after I was exploring tweaking some of them to
>> see if it improves the throughput.
>>
>> Regarding your queries :
>> - Which Flink version is this? -- > *Version 1.13*
>> - Can you see any errors in the Flink logs?  -->* No, Im attaching flink
>> logs after I have set all the configurations to default*
>> - Do you see any errors/throttles in the Kinesis Data Stream metrics?
>> --> *I was before segregating into smaller chunks not anymore*
>> - How many shards does your stream have? --> *It has 4 shards*
>> - What is your sink operator parallelism? --> *1*
>> - What is the general health of your job graph? --> *This is the only
>> job running at the moment, it isn't unhealthy*
>>   - Are the operators upstream of the sink backpressured? --> *No*
>>   - Are you sure the sink is actually the issue here? --> *I have used
>> the .print() as a sink and Im seeing all the records in real time it chokes
>> when paired with sink*
>>   - Are there any other potential bottlenecks? --> *So data is coming in
>> from source correctly, I have a flatmap transformation enabled which reads
>> and segments it into chunks of <=1MB which is also tested using the
>> .print() sink*
>> - When you say you are trying to achieve "1MB chunks", I assume this is
>> per Kinesis record, not per PutRecords batch? --> *Correct*
>>
>> Attaching a small chunk of the log file from when the job is started [It
>> goes down to 0 records for some periods of time as well, in the log file it
>> shows mostly between 3-6 records]
>>
>> Really appreciate your response on this, since I have not been able to
>> gather much help from other resources online. Would be great if you can let
>> me know what the issue here could be, let me know if you need to know
>> anything else as well !
>>
>> Cheers
>>
>>
>> On Tue, May 17, 2022 at 12:34 AM Danny Cranmer 
>> wrote:
>>

Could not copy native libraries - Permission denied

2022-05-18 Thread Zain Haider Nemati
Hi,
We are using Flink 1.13 with a Kafka source and a Kinesis sink with a
parallelism of 3.
On submitting the job I get this error:

Could not copy native binaries to temp directory
/tmp/amazon-kinesis-producer-native-binaries

followed by "permission denied", even though all the permissions have been
granted and the job is being run as the root user. What could be causing this?
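
Would pointing the Kinesis Producer Library at a different, writable temp
directory be a reasonable workaround? A minimal sketch, assuming the producer
config passed to FlinkKinesisProducer honours a "TempDirectory" property (both
the property name and the path below are assumptions on my side):

// Assumption: the KPL picks up "TempDirectory" from the producer config;
// replace the path with a directory the Flink/YARN user can actually write to.
producerConfig.put("TempDirectory", "/var/tmp/kpl-native-binaries");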


Memory configuration for Queue

2022-05-17 Thread Zain Haider Nemati
Hi,
I am using a Kafka source with a Kinesis sink, and the speed of data coming in
is not the same as the speed of data flowing out, hence the need to configure a
relatively large queue to hold the data before backpressure kicks in. Which
memory configuration corresponds to this, and what do I need to configure?
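
For example, is the producer queue limit the right knob here (a minimal sketch,
assuming the FlinkKinesisProducer instance from my other thread), or is this
buffering governed by the TaskManager network memory options such as
taskmanager.memory.network.fraction/min/max?

// Upper bound on records buffered inside the Kinesis producer before
// backpressure is applied upstream; 10000 is just an illustrative value.
kinesis.setQueueLimit(10000);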


Metrics in Flink UI

2022-05-17 Thread Zain Haider Nemati
Hi,
I'm running a job on a local Flink cluster, but the metrics (bytes received,
records received, bytes sent, backpressure) all show 0 in the Flink UI, even
though I'm receiving data in the sink.
Do I need to configure anything additional to see these metrics update in real
time?


Channel became inactive while submitting job

2022-05-17 Thread Zain Haider Nemati
Hi,
I am trying to run a job in my local cluster and facing this issue.


org.apache.flink.client.program.ProgramInvocationException: The main method caused an error: Failed to execute job 'Tracer Processor'.
    at org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:372)
    at org.apache.flink.client.program.PackagedProgram.invokeInteractiveModeForExecution(PackagedProgram.java:222)
    at org.apache.flink.client.ClientUtils.executeProgram(ClientUtils.java:114)
    at org.apache.flink.client.cli.CliFrontend.executeProgram(CliFrontend.java:812)
    at org.apache.flink.client.cli.CliFrontend.run(CliFrontend.java:246)
    at org.apache.flink.client.cli.CliFrontend.parseAndRun(CliFrontend.java:1054)
    at org.apache.flink.client.cli.CliFrontend.lambda$main$10(CliFrontend.java:1132)
    at java.security.AccessController.doPrivileged(Native Method)
    at javax.security.auth.Subject.doAs(Subject.java:422)
    at org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1730)
    at org.apache.flink.runtime.security.contexts.HadoopSecurityContext.runSecured(HadoopSecurityContext.java:41)
    at org.apache.flink.client.cli.CliFrontend.main(CliFrontend.java:1132)
*Caused by: org.apache.flink.util.FlinkException: Failed to execute job 'Tracer Processor'.*
    at org.apache.flink.streaming.api.environment.StreamExecutionEnvironment.executeAsync(StreamExecutionEnvironment.java:1970)
    at org.apache.flink.client.program.StreamContextEnvironment.executeAsync(StreamContextEnvironment.java:135)
    at org.apache.flink.client.program.StreamContextEnvironment.execute(StreamContextEnvironment.java:76)
    at *org.apache.flink.streaming.api.environment.StreamExecutionEnvironment.execute(StreamExecutionEnvironment.java:1834)*
    at basicpackage.StreamingJob.main(StreamingJob.java:284)
    at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
    at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
    at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
    at java.lang.reflect.Method.invoke(Method.java:498)
    at org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:355)
    ... 11 more
*Caused by: org.apache.flink.runtime.client.JobSubmissionException: Failed to submit JobGraph.*
    at org.apache.flink.client.program.rest.RestClusterClient.lambda$submitJob$9(RestClusterClient.java:405)
    at java.util.concurrent.CompletableFuture.uniExceptionally(CompletableFuture.java:884)
    at java.util.concurrent.CompletableFuture$UniExceptionally.tryFire(CompletableFuture.java:866)
    at java.util.concurrent.CompletableFuture.postComplete(CompletableFuture.java:488)
    at java.util.concurrent.CompletableFuture.completeExceptionally(CompletableFuture.java:1990)
    at org.apache.flink.runtime.concurrent.FutureUtils.lambda$retryOperationWithDelay$9(FutureUtils.java:390)
    at java.util.concurrent.CompletableFuture.uniWhenComplete(CompletableFuture.java:774)
    at java.util.concurrent.CompletableFuture$UniWhenComplete.tryFire(CompletableFuture.java:750)
    at java.util.concurrent.CompletableFuture.postComplete(CompletableFuture.java:488)
    at java.util.concurrent.CompletableFuture.completeExceptionally(CompletableFuture.java:1990)
    at org.apache.flink.runtime.rest.RestClient$ClientHandler.channelInactive(RestClient.java:588)
    at org.apache.flink.shaded.netty4.io.netty.channel.AbstractChannelHandlerContext.invokeChannelInactive(AbstractChannelHandlerContext.java:262)
    at org.apache.flink.shaded.netty4.io.netty.channel.AbstractChannelHandlerContext.invokeChannelInactive(AbstractChannelHandlerContext.java:248)
    at org.apache.flink.shaded.netty4.io.netty.channel.AbstractChannelHandlerContext.fireChannelInactive(AbstractChannelHandlerContext.java:241)
    at org.apache.flink.shaded.netty4.io.netty.channel.ChannelInboundHandlerAdapter.channelInactive(ChannelInboundHandlerAdapter.java:81)
    at org.apache.flink.shaded.netty4.io.netty.handler.timeout.IdleStateHandler.channelInactive(IdleStateHandler.java:277)
    at org.apache.flink.shaded.netty4.io.netty.channel.AbstractChannelHandlerContext.invokeChannelInactive(AbstractChannelHandlerContext.java:262)
    at org.apache.flink.shaded.netty4.io.netty.channel.AbstractChannelHandlerContext.invokeChannelInactive(AbstractChannelHandlerContext.java:248)
    at org.apache.flink.shaded.netty4.io.netty.channel.AbstractChannelHandlerContext.fireChannelInactive(AbstractChannelHandlerContext.java:241)
    at org.apache.flink.shaded.netty4.io.netty.handler.stream.ChunkedWriteHandler.channelInactive(ChunkedWriteHandler.java:138)

Kinesis Sink - Data being received with intermittent breaks

2022-05-15 Thread Zain Haider Nemati
Hi,
I'm fetching data from Kafka topics, converting it into chunks of <= 1 MB, and
sinking them to a Kinesis data stream.
The streaming job is functional, but I see bursts of data in the Kinesis stream
with intermittent dips where the data received is 0. I'm attaching the
configuration parameters for the Kinesis sink. What could be the cause of this
issue?
The data is being fed into the DataStream by a Kafka topic, which is in turn
fed by a MongoDB instance and has about 60 million records, all of which are
loaded.
I am trying to configure the parameters in such a way that the 1 MB per-record
payload limit of Kinesis is not breached. I would appreciate help on this!

producerConfig.put(AWSConfigConstants.AWS_REGION, "xxx");
producerConfig.put(AWSConfigConstants.AWS_ACCESS_KEY_ID, "xxx");
producerConfig.put(AWSConfigConstants.AWS_SECRET_ACCESS_KEY, "xxx");
producerConfig.put("AggregationMaxCount", "3");
producerConfig.put("AggregationMaxSize", "256");
producerConfig.put("CollectionMaxCount", "3");
producerConfig.put("CollectionMaxSize", "10");
producerConfig.put("AggregationEnabled", true);
producerConfig.put("RateLimit", "50");
producerConfig.put("RecordMaxBufferedTime", "1000");
producerConfig.put("ThreadingModel", "POOLED");
FlinkKinesisProducer<String> kinesis = new
FlinkKinesisProducer<>(new SimpleStringSchema(), producerConfig);
kinesis.setFailOnError(false);
kinesis.setDefaultStream("xxx");
kinesis.setDefaultPartition("0");
kinesis.setQueueLimit(1000);

*Data in Kinesis :*
[image: image.png]


taskexecutor .out files

2022-05-15 Thread Zain Haider Nemati
Hi,
I have been running a streaming job which prints data to .out files the
size of the file has gotten really large and is choking the root memory for
my VM. Is it ok to delete the .out files? Would that affect any other
operation or functionality?


Split string flatmap -> Arraylist/List

2022-05-13 Thread Zain Haider Nemati
Hi,
I have a comma-separated string of the format
x,y,z \n
a,b,c ...

I want to split the string on '\n' and insert each resulting string into the
data stream separately using *flatmap*. Any example of how to do that?
If I add the chunks to an ArrayList or List and return that from the flatmap
function, would that work?
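
Something like this is what I have in mind (a minimal sketch, assuming the
input is a DataStream<String> named input where each element holds the whole
comma-separated blob):

import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.util.Collector;

// Splits each incoming blob on '\n' and emits every line as its own record.
DataStream<String> lines = input.flatMap(new FlatMapFunction<String, String>() {
    @Override
    public void flatMap(String value, Collector<String> out) {
        for (String line : value.split("\n")) {
            if (!line.trim().isEmpty()) {
                out.collect(line.trim());
            }
        }
    }
});

If I understand flatMap correctly, the Collector already emits each element as
a separate stream record, so returning an ArrayList from the function should
not be necessary; is that right?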


Re: Batching in kinesis sink

2022-05-12 Thread Zain Haider Nemati
Thanks for your response! Much appreciated.

On Thu, May 12, 2022 at 5:51 PM Teoh, Hong  wrote:

> Hi Zain,
>
> For Flink 1.13, we use the KinesisProducerLibrary. If you are using
> aggregation, you can control the maximum size of aggregated records by
> configuring the AggregationMaxSize in the producer config when constructing
> the FlinkKinesisProducer. (See [1] for more docs)
>
> producerConfig.put("AggregationMaxSize", "1048576”);
>
>
> However, since the default value is actually <1MB here, I doubt this is
> the issue. A possibility I can think of is that a single record is larger
> than 1MB, so the aggregation limit doesn’t apply. If this is the case,
> the way forward would be to change the record size to be lower than 1MB.
>
> In general, I would recommend upgrading to Flink 1.15 and using the newer
> KinesisStreamsSink. That sink is more configurable (see below and see [2]
> for more docs), and will surface the problem explicitly if the issue is
> really that a single record is larger than 1MB.
>
> (Note that we use the PutRecords API, so individual records still need to
> be smaller than 1MB, but batches can be up to 5MB) See [3] for more info.
>
> .setMaxBatchSizeInBytes(5 * 1024 * 1024)  
>  .setMaxRecordSizeInBytes(1 * 1024 * 1024)
>
>
>
> Thanks,
> Hong
>
>
> [1]
> https://nightlies.apache.org/flink/flink-docs-release-1.13/docs/connectors/datastream/kinesis/#kinesis-producer
> [2]
> https://nightlies.apache.org/flink/flink-docs-master/docs/connectors/datastream/kinesis/#kinesis-streams-sink
> [3]
> https://docs.aws.amazon.com/streams/latest/dev/service-sizes-and-limits.html
>
>
>
>
> On 2022/05/12 10:30:47 Zain Haider Nemati wrote:
> > Hi,
> > I am using a kinesis sink with flink 1.13.
> > The amount of data is in millions and it choke the 1MB cap for kinesis
> data
> > streams.
> > Is there any way to send data to kinesis sink in batches of less than
> 1MB?
> > or any other workaround
> >
>


Batching in kinesis sink

2022-05-12 Thread Zain Haider Nemati
Hi,
I am using a Kinesis sink with Flink 1.13.
The amount of data is in the millions and it exceeds the 1 MB cap for Kinesis
data streams.
Is there any way to send data to the Kinesis sink in batches of less than 1 MB,
or any other workaround?


Re: Incompatible data types while using firehose sink

2022-05-12 Thread Zain Haider Nemati
Hi, I appreciate your response.
My Flink version is 1.13.
Is there any other way to sink data to Kinesis without having to update to
1.15?

On Thu, May 12, 2022 at 12:25 PM Martijn Visser 
wrote:

> I'm guessing this must be Flink 1.15 since Firehose was added in that
> version :)
>
> On Thu, 12 May 2022 at 08:41, yu'an huang  wrote:
>
>> Hi,
>>
>> Your code is working fine in my computer. What is the Flink version you
>> are using.
>>
>>
>>
>>
>> On 12 May 2022, at 3:39 AM, Zain Haider Nemati 
>> wrote:
>>
>> Hi Folks,
>> Getting this error when sinking data to a firehosesink, would really
>> appreciate some help !
>>
>> DataStream inputStream = env.addSource(new
>> FlinkKafkaConsumer<>("xxx", new SimpleStringSchema(), properties));
>>
>> Properties sinkProperties = new Properties();
>> sinkProperties.put(AWSConfigConstants.AWS_REGION, "xxx");
>> sinkProperties.put(AWSConfigConstants.AWS_ACCESS_KEY_ID,
>> "xxx");
>> sinkProperties.put(AWSConfigConstants.AWS_SECRET_ACCESS_KEY,
>> "xxx");
>> KinesisFirehoseSink kdfSink =
>> KinesisFirehoseSink.builder()
>> .setFirehoseClientProperties(sinkProperties)
>> .setSerializationSchema(new SimpleStringSchema())
>> .setDeliveryStreamName("xxx")
>> .setMaxBatchSize(350)
>> .build();
>>
>> inputStream.sinkTo(kdfSink);
>>
>> incompatible types:
>> org.apache.flink.connector.firehose.sink.KinesisFirehoseSink
>> cannot be converted to
>> org.apache.flink.api.connector.sink.Sink
>>
>>
>>


Incompatible data types while using firehose sink

2022-05-11 Thread Zain Haider Nemati
Hi Folks,
I'm getting this error when sinking data to a Firehose sink and would really
appreciate some help!

DataStream inputStream = env.addSource(
        new FlinkKafkaConsumer<>("xxx", new SimpleStringSchema(), properties));

Properties sinkProperties = new Properties();
sinkProperties.put(AWSConfigConstants.AWS_REGION, "xxx");
sinkProperties.put(AWSConfigConstants.AWS_ACCESS_KEY_ID, "xxx");
sinkProperties.put(AWSConfigConstants.AWS_SECRET_ACCESS_KEY, "xxx");

KinesisFirehoseSink kdfSink = KinesisFirehoseSink.builder()
        .setFirehoseClientProperties(sinkProperties)
        .setSerializationSchema(new SimpleStringSchema())
        .setDeliveryStreamName("xxx")
        .setMaxBatchSize(350)
        .build();

inputStream.sinkTo(kdfSink);

incompatible types:
org.apache.flink.connector.firehose.sink.KinesisFirehoseSink
cannot be converted to
org.apache.flink.api.connector.sink.Sink


Pyflink -> Redshift/S3/Firehose

2022-05-02 Thread Zain Haider Nemati
Hi,
I am working on writing a Flink processor which has to send transformed data to
Redshift/S3.
I cannot find any documentation for PyFlink on how to send data to Firehose,
S3, or Redshift. I would appreciate some help here.