Re: BlinkPlanner limitation related clarification

2020-01-23 Thread Jingsong Li
Hi RKandoji,

IMO, yes, you cannot reuse the table environment; you should create a new tEnv after
executing. 1.9.1 still has this problem.
The related issue is [1]; it is fixed in 1.9.2 and 1.10.

[1] https://issues.apache.org/jira/browse/FLINK-13708
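
A minimal sketch of that workaround on 1.9.0 / 1.9.1 (illustrative names, assuming
the Blink planner in streaming mode): build a fresh table environment before the
next SQL statement instead of reusing the old one.

import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.java.StreamTableEnvironment;

public class FreshTableEnvPerStatement {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment bsEnv = StreamExecutionEnvironment.getExecutionEnvironment();
        EnvironmentSettings bsSettings = EnvironmentSettings.newInstance()
                .useBlinkPlanner().inStreamingMode().build();

        // First statement gets its own table environment.
        StreamTableEnvironment tEnv1 = StreamTableEnvironment.create(bsEnv, bsSettings);
        // ... register streams/tables on tEnv1, run SQL, execute ...

        // After executing, create a new environment for the next statement instead of
        // reusing tEnv1 (no longer necessary from 1.9.2 / 1.10 on, see FLINK-13708).
        StreamTableEnvironment tEnv2 = StreamTableEnvironment.create(bsEnv, bsSettings);
        // ... register streams/tables on tEnv2, run the next SQL statement ...
    }
}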

Best,
Jingsong Lee

On Fri, Jan 24, 2020 at 11:14 AM RKandoji  wrote:

> Hi Team,
>
> I've been using Blink Planner and just came across this page
> https://ci.apache.org/projects/flink/flink-docs-stable/release-notes/flink-1.9.html#known-shortcomings-or-limitations-for-new-features
>  and
> saw below limitation:
>
> Due to a bug with how transformations are not being cleared on execution,
>> TableEnvironment instances should not be reused across multiple SQL
>> statements when using the Blink planner.
>
>
> In my code I've created a StreamTableEnvironment (as shown below) and I'm
> reusing this instance everywhere for registering data streams, registering
> tables, and performing multiple SQL queries. So I'm a bit concerned: do I
> need to change anything? Would the above limitation affect
> StreamTableEnvironment as well?
>
> private static StreamTableEnvironment bsTableEnv =
>     StreamTableEnvironment.create(bsEnv, bsSettings);
>
> Could someone please clarify and provide more details about the
> implications?
>
>
> Thanks,
> RKandoji
>


-- 
Best, Jingsong Lee


BlinkPlanner limitation related clarification

2020-01-23 Thread RKandoji
Hi Team,

I've been using Blink Planner and just came across this page
https://ci.apache.org/projects/flink/flink-docs-stable/release-notes/flink-1.9.html#known-shortcomings-or-limitations-for-new-features
and
saw below limitation:

Due to a bug with how transformations are not being cleared on execution,
> TableEnvironment instances should not be reused across multiple SQL
> statements when using the Blink planner.


In my code I've created a StreamTableEnvironment (as shown below) and I'm
reusing this instance everywhere for registering data streams, registering
tables, and performing multiple SQL queries. So I'm a bit concerned: do I
need to change anything? Would the above limitation affect
StreamTableEnvironment as well?

private static StreamTableEnvironment bsTableEnv =
    StreamTableEnvironment.create(bsEnv, bsSettings);

Could someone please clarify and provide more details about the
implications?


Thanks,
RKandoji


How to debug a job stuck in a deployment/run loop?

2020-01-23 Thread Jason Kania
I am attempting to migrate from 1.7.1 to 1.9.1 and I have hit a problem where 
previously working jobs can no longer launch after being submitted. In the UI, 
the submitted jobs show up as deploying for a period, then go into a run state 
before returning to the deploy state and this repeats regularly with the job 
bouncing between states. No exceptions or errors are visible in the logs. There 
is no data coming in for the job to process and the kafka queues are empty.
If I look at the thread activity of the task manager running the job in top, I 
see that the busiest threads are flink-akka threads, sometimes jumping to very 
high CPU numbers. That is all I have for info.
Any suggestions on how to debug this? I can set break points and connect if 
that helps, just not sure at this point where to start.
Thanks,
Jason

FileStreamingSink is using the same counter for different files

2020-01-23 Thread Pawel Bartoszek
Hi,


The Flink streaming file sink is designed to use a global counter when creating files
to avoid overwrites. I am running Flink 1.8.2 with Kinesis Analytics
(managed Flink provided by AWS) with bulk writes (the rolling policy is
hardcoded to roll over on checkpoint).
My job is configured to checkpoint every minute. The job is running with
parallelism 1.

The problem is that the same counter 616 is used for both
files invalid-records/2020-01-22T15_06_00Z/part-0-616
and invalid-records/2020-01-22T15_05_00Z/part-0-616.

15:06:37
{ "locationInformation":
"org.apache.flink.fs.s3.common.writer.S3Committer.commit(S3Committer.java:64)",
"logger": "org.apache.flink.fs.s3.common.writer.S3Committer",
"message": "Committing
invalid-records/2020-01-22T15_06_00Z/part-0-616 with MPU ID
f7PQc2D82.kKaDRS.RXYYS8AkLd5q_9ogw3WZJJg2KGABhYgjtv.eJbqQ_UwpzciYb.TDTIkixulkmaTMyyuwmr6c5eC61aenoo2m4cj7wAT9v0JXB3i6gBArw.HpSLxpUBTEW6PT3aN9XKPZmT2kg--",
"threadName": "Async calls on Source: Custom Source -> Extract Td-agent
message -> To DataRecord -> Find invalid records -> Sink: Unnamed (1/1)",
"applicationARN":
"arn:aws:kinesisanalytics:eu-west-1:679291754673:application/pawel",
"applicationVersionId": "33", "messageSchemaVersion": "1", "messageType":
"INFO"}
}
15:07:37
{ "locationInformation":
"org.apache.flink.fs.s3.common.writer.S3Committer.commit(S3Committer.java:64)",
"logger": "org.apache.flink.fs.s3.common.writer.S3Committer",
"message": "Committing
invalid-records/2020-01-22T15_05_00Z/part-0-616 with MPU ID
XoliYkdvP1Cc3gePyteIGhTqF1LrID8rEyddaPXRNPQYkWDNKpDF0tnYuhDBqywAqCWf4nJTOJ2Kx_a_91KTyVTvZ7GkKs25nseGs4jDR6Y5Nxuai47aKNeWeS4bs9imMJ1iAxbd7lRQyxnM5qwDeA--",
"threadName": "Async calls on Source: Custom Source -> Extract Td-agent
message -> To DataRecord -> Find invalid records -> Sink: Unnamed (1/1)",
"applicationARN":
"arn:aws:kinesisanalytics:eu-west-1:679291754673:application/pawel",
"applicationVersionId": "33", "messageSchemaVersion": "1", "messageType":
"INFO" }

Thanks,
Pawel
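
For context, a minimal sketch of the kind of bulk-format sink setup described
above (Parquet/Avro writers and a date-time bucket assigner are assumptions;
paths and names are illustrative):

import org.apache.avro.Schema;
import org.apache.avro.generic.GenericRecord;
import org.apache.flink.core.fs.Path;
import org.apache.flink.formats.parquet.avro.ParquetAvroWriters;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.functions.sink.filesystem.StreamingFileSink;
import org.apache.flink.streaming.api.functions.sink.filesystem.bucketassigners.DateTimeBucketAssigner;

public class InvalidRecordsSink {
    // With a bulk format the sink always rolls the part files on checkpoint, and
    // each part file name carries the subtask index and a counter, e.g. part-0-616.
    public static void attach(DataStream<GenericRecord> invalidRecords, Schema schema) {
        StreamingFileSink<GenericRecord> sink = StreamingFileSink
                .forBulkFormat(new Path("s3://my-bucket/invalid-records"),
                        ParquetAvroWriters.forGenericRecord(schema))
                .withBucketAssigner(new DateTimeBucketAssigner<>("yyyy-MM-dd'T'HH_mm_00'Z'"))
                .build();
        invalidRecords.addSink(sink);
    }
}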


Re: Location of flink-s3-fs-hadoop plugin (Flink 1.9.0 on EMR)

2020-01-23 Thread Aaron Langford
When creating your cluster, you can provide configurations that EMR will
find the right home for. Example for the aws cli:

aws emr create-cluster ... --configurations '[{
> "Classification": "flink-log4j",
> "Properties": {
>   "log4j.rootLogger": "DEBUG,file"
> }
>   },{
> "Classification": "flink-log4j-yarn-session",
> "Properties": {
>   "log4j.rootLogger": "DEBUG,stdout"
> }
>   }]'


If you can't take down your existing EMR cluster for some reason, you can
ask AWS to modify these configurations for you on the cluster. They should
take effect when you start a new Flink job (new job manager as well as a
new job in that job manager). It is my understanding that configuration
changes require a restart of a flink jobmanager + topology in order to take
effect. Here's an example of how to modify an existing cluster (I just
threw this together, so beware malformed JSON):

aws emr modify-instance-groups --cli-input-json '{
> "ClusterId": "",
> "InstanceGroups": [{
> "InstanceGroupId": "",
> "Configurations": [{
> "Classification": "flink-log4j",
> "Properties": {
> "log4j.rootLogger": "DEBUG,file"
> }
> },{
> "Classification": "flink-log4j-yarn-session",
> "Properties": {
> "log4j.rootLogger": "DEBUG,stdout"
> }
> }]
> },{
> "InstanceGroupId": "",
> "Configurations": [{
> "Classification": "flink-log4j",
> "Properties": {
> "log4j.rootLogger": "DEBUG,file"
> }
> },{
> "Classification": "flink-log4j-yarn-session",
> "Properties": {
> "log4j.rootLogger": "DEBUG,stdout"
> }
> }]
>  }]
> }'


On Thu, Jan 23, 2020 at 11:03 AM Senthil Kumar  wrote:

> Could you tell us how to turn on debug level logs?
>
>
>
> We attempted this (on driver)
>
>
>
> sudo stop hadoop-yarn-resourcemanager
>
>
>
> followed the instructions here
>
>
> https://stackoverflow.com/questions/27853974/how-to-set-debug-log-level-for-resourcemanager
>
>
>
> and
>
>
>
> sudo start hadoop-yarn-resourcemanager
>
>
>
> but we still don’t see any debug level logs
>
>
>
> Any further info is much appreciated!
>
>
>
>
>
> *From: *Aaron Langford 
> *Date: *Tuesday, January 21, 2020 at 10:54 AM
> *To: *Senthil Kumar 
> *Cc: *Yang Wang , "user@flink.apache.org" <
> user@flink.apache.org>
> *Subject: *Re: Location of flink-s3-fs-hadoop plugin (Flink 1.9.0 on EMR)
>
>
>
> Senthil,
>
>
>
> One of the key steps in debugging this for me was enabling debug level
> logs on my cluster, and then looking at the logs in the resource manager.
> The failure you are after happens before the exceptions you have reported
> here. When your Flink application is starting, it will attempt to load
> various file system implementations. You can see which ones it successfully
> loaded when you have the debug level of logs configured. You will have to
> do some digging, but this is a good place to start. Try to discover if your
> application is indeed loading the s3 file system, or if that is not
> happening. You should be able to find the file system implementations that
> were loaded by searching for the string "Added file system".
>
>
>
> Also, do you mind sharing the bootstrap action script that you are using
> to get the s3 file system in place?
>
>
>
> Aaron
>
>
>
> On Tue, Jan 21, 2020 at 8:39 AM Senthil Kumar 
> wrote:
>
> Yang, I appreciate your help! Please let me know if I can provide with any
> other info.
>
>
>
> I resubmitted my executable jar file as a step to the Flink EMR cluster and here
> are all the exceptions. I see two of them.
>
>
>
> I fished them out of /var/log/Hadoop//syslog
>
>
>
> 2020-01-21 16:31:37,587 ERROR
> org.apache.flink.streaming.runtime.tasks.StreamTask (Split Reader: Custom
> File Source -> Sink: Unnamed (11/16)): Error during di
>
> sposal of stream operator.
>
> java.lang.NullPointerException
>
> at
> org.apache.flink.streaming.api.functions.source.ContinuousFileReaderOperator.dispose(ContinuousFileReaderOperator.java:165)
>
> at
> org.apache.flink.streaming.runtime.tasks.StreamTask.disposeAllOperators(StreamTask.java:582)
>
> at
> org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:481)
>
> at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:705)
>
> at org.apache.flink.runtime.taskmanager.Task.run(Task.java:530)
>
> at java.lang.Thread.run(Thread.java:748)
>
>
>
>
>
> 2020-01-21 16:31:37,591 INFO org.apache.flink.runtime.taskmanager.Task
> (Split Reader: Custom File Source -> Sink: Unnamed (8/16)): Split Reader:
> Custom File Source -> Sink: Unnamed (8/16)
> (865a5a078c3e40cbe2583afeaa0c601e) switched from RUNNING to FAILED.
>
> java.lang.UnsupportedOperationException: Recoverable writers on Hadoop
> are only supported for HDFS and for Hadoop version 2.7 or newer

Re: Location of flink-s3-fs-hadoop plugin (Flink 1.9.0 on EMR)

2020-01-23 Thread Senthil Kumar
Could you tell us how to turn on debug level logs?

We attempted this (on driver)

sudo stop hadoop-yarn-resourcemanager

followed the instructions here
https://stackoverflow.com/questions/27853974/how-to-set-debug-log-level-for-resourcemanager

and

sudo start hadoop-yarn-resourcemanager

but we still don’t see any debug level logs

Any further info is much appreciated!


From: Aaron Langford 
Date: Tuesday, January 21, 2020 at 10:54 AM
To: Senthil Kumar 
Cc: Yang Wang , "user@flink.apache.org" 

Subject: Re: Location of flink-s3-fs-hadoop plugin (Flink 1.9.0 on EMR)

Senthil,

One of the key steps in debugging this for me was enabling debug level logs on 
my cluster, and then looking at the logs in the resource manager. The failure 
you are after happens before the exceptions you have reported here. When your 
Flink application is starting, it will attempt to load various file system 
implementations. You can see which ones it successfully loaded when you have 
the debug level of logs configured. You will have to do some digging, but this 
is a good place to start. Try to discover if your application is indeed loading 
the s3 file system, or if that is not happening. You should be able to find the 
file system implementations that were loaded by searching for the string "Added 
file system".

Also, do you mind sharing the bootstrap action script that you are using to get 
the s3 file system in place?

Aaron

On Tue, Jan 21, 2020 at 8:39 AM Senthil Kumar
wrote:
Yang, I appreciate your help! Please let me know if I can provide with any 
other info.

I resubmitted my executable jar file as a step to the Flink EMR cluster and here are
all the exceptions. I see two of them.

I fished them out of /var/log/Hadoop//syslog


2020-01-21 16:31:37,587 ERROR 
org.apache.flink.streaming.runtime.tasks.StreamTask (Split Reader: Custom File 
Source -> Sink: Unnamed (11/16)): Error during di

sposal of stream operator.

java.lang.NullPointerException

at 
org.apache.flink.streaming.api.functions.source.ContinuousFileReaderOperator.dispose(ContinuousFileReaderOperator.java:165)

at 
org.apache.flink.streaming.runtime.tasks.StreamTask.disposeAllOperators(StreamTask.java:582)

at 
org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:481)

at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:705)

at org.apache.flink.runtime.taskmanager.Task.run(Task.java:530)

at java.lang.Thread.run(Thread.java:748)



2020-01-21 16:31:37,591 INFO org.apache.flink.runtime.taskmanager.Task (Split 
Reader: Custom File Source -> Sink: Unnamed (8/16)): Split Reader: Custom File 
Source -> Sink: Unnamed (8/16) (865a5a078c3e40cbe2583afeaa0c601e) switched from 
RUNNING to FAILED.

java.lang.UnsupportedOperationException: Recoverable writers on Hadoop are only 
supported for HDFS and for Hadoop version 2.7 or newer

at 
org.apache.flink.runtime.fs.hdfs.HadoopRecoverableWriter.<init>(HadoopRecoverableWriter.java:57)

at 
org.apache.flink.runtime.fs.hdfs.HadoopFileSystem.createRecoverableWriter(HadoopFileSystem.java:202)

at 
org.apache.flink.core.fs.SafetyNetWrapperFileSystem.createRecoverableWriter(SafetyNetWrapperFileSystem.java:69)

at 
org.apache.flink.streaming.api.functions.sink.filesystem.Buckets.<init>(Buckets.java:112)

at 
org.apache.flink.streaming.api.functions.sink.filesystem.StreamingFileSink$RowFormatBuilder.createBuckets(StreamingFileSink.java:242)

at 
org.apache.flink.streaming.api.functions.sink.filesystem.StreamingFileSink.initializeState(StreamingFileSink.java:327)

at 
org.apache.flink.streaming.util.functions.StreamingFunctionUtils.tryRestoreFunction(StreamingFunctionUtils.java:178)

at 
org.apache.flink.streaming.util.functions.StreamingFunctionUtils.restoreFunctionState(StreamingFunctionUtils.java:160)

at 
org.apache.flink.streaming.api.operators.AbstractUdfStreamOperator.initializeState(AbstractUdfStreamOperator.java:96)

at 
org.apache.flink.streaming.api.operators.AbstractStreamOperator.initializeState(AbstractStreamOperator.java:281)

at 
org.apache.flink.streaming.runtime.tasks.StreamTask.initializeState(StreamTask.java:878)

at 
org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:392)

at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:705)

at org.apache.flink.runtime.taskmanager.Task.run(Task.java:530)

at java.lang.Thread.run(Thread.java:748)


From: Yang Wang
Date: Saturday, January 18, 2020 at 7:58 PM
To: Senthil Kumar
Cc: "user@flink.apache.org"
Subject: Re: Location of flink-s3-fs-hadoop plugin (Flink 1.9.0 on EMR)

I think this exception is not because the hadoop version isn't high enough.
It seems that the "s3" URI scheme could not be recognized b

PostgreSQL JDBC connection drops after inserting some records

2020-01-23 Thread Soheil Pourbafrani
Hi,
I have a piece of Flink streaming code that reads data from files and
inserts the records into a PostgreSQL table. After inserting 6 to 11 million
records, I got the following errors:

*Caused by: java.lang.RuntimeException: Execution of JDBC statement failed.
at
org.apache.flink.api.java.io.jdbc.JDBCOutputFormat.flush(JDBCOutputFormat.java:219)
at
org.apache.flink.api.java.io.jdbc.JDBCOutputFormat.writeRecord(JDBCOutputFormat.java:210)
at
org.apache.flink.api.java.io.jdbc.JDBCOutputFormat.writeRecord(JDBCOutputFormat.java:41)
at
org.apache.flink.streaming.api.functions.sink.OutputFormatSinkFunction.invoke(OutputFormatSinkFunction.java:86)
at
org.apache.flink.streaming.api.functions.sink.SinkFunction.invoke(SinkFunction.java:52)
at
org.apache.flink.streaming.api.operators.StreamSink.processElement(StreamSink.java:56)
at
org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.pushToOperator(OperatorChain.java:579)
... 15 moreCaused by: java.sql.BatchUpdateException: Batch entry 0 INSERT
INTO csv_data(asset, tag, t, q, v, backfill, createdAt, createdBy) VALUES
('SST', 'XC_XC', '2015-04-11 21:36:23+03', 12.0, '1.00', 'FALSE',
'2020-01-23 19:22:14.469+03', 'system') ON CONFLICT DO NOTHING was aborted:
An I/O error occurred while sending to the backend.  Call getNextException
to see other errors in the batch. at
org.postgresql.jdbc.BatchResultHandler.handleError(BatchResultHandler.java:148)
at
org.postgresql.core.v3.QueryExecutorImpl.execute(QueryExecutorImpl.java:515)
at org.postgresql.jdbc.PgStatement.executeBatch(PgStatement.java:853) at
org.postgresql.jdbc.PgPreparedStatement.executeBatch(PgPreparedStatement.java:1546)
at
org.apache.flink.api.java.io.jdbc.JDBCOutputFormat.flush(JDBCOutputFormat.java:216)
... 21 moreCaused by: org.postgresql.util.PSQLException: An I/O error
occurred while sending to the backend. at
org.postgresql.core.v3.QueryExecutorImpl.execute(QueryExecutorImpl.java:516)
... 24 moreCaused by: java.io.EOFException at
org.postgresql.core.PGStream.receiveChar(PGStream.java:337) at
org.postgresql.core.v3.QueryExecutorImpl.processResults(QueryExecutorImpl.java:2000)*
at
org.postgresql.core.v3.QueryExecutorImpl.execute(QueryExecutorImpl.java:510)
... 24 more

However, since I enabled a restart strategy, the app is automatically
restarted and reconnects to the database.
My code simply reads data from files and, after transforming it into the
table schema, inserts the rows into the table.

It would be great if anyone can help me with this
Thanks
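
For reference, a minimal sketch of the kind of setup described above (connection
details, batch size, and restart-strategy values are illustrative assumptions,
not taken from the job):

import java.util.concurrent.TimeUnit;
import org.apache.flink.api.common.restartstrategy.RestartStrategies;
import org.apache.flink.api.common.time.Time;
import org.apache.flink.api.java.io.jdbc.JDBCOutputFormat;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.types.Row;

public class CsvToPostgres {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        // Restart strategy so the job comes back up and reconnects after the
        // JDBC connection to the backend drops.
        env.setRestartStrategy(RestartStrategies.fixedDelayRestart(3, Time.of(10, TimeUnit.SECONDS)));

        // In the real job the rows come from the input files; a single literal row here.
        DataStream<Row> rows = env.fromElements(
                Row.of("SST", "XC_XC", "2015-04-11 21:36:23+03", 12.0, "1.00", "FALSE",
                        "2020-01-23 19:22:14.469+03", "system"));

        JDBCOutputFormat outputFormat = JDBCOutputFormat.buildJDBCOutputFormat()
                .setDrivername("org.postgresql.Driver")
                .setDBUrl("jdbc:postgresql://host:5432/db")
                .setQuery("INSERT INTO csv_data(asset, tag, t, q, v, backfill, createdAt, createdBy) "
                        + "VALUES (?, ?, ?, ?, ?, ?, ?, ?) ON CONFLICT DO NOTHING")
                .setBatchInterval(1000)
                .finish();

        rows.writeUsingOutputFormat(outputFormat);
        env.execute("csv-to-postgres");
    }
}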


Re: where does flink store the intermediate results of a join and what is the key?

2020-01-23 Thread Benoît Paris
Hi all!

@Jark, out of curiosity, would you be so kind as to expand a bit on "Query
on the intermediate state is on the roadmap"?
Are you referring to working on QueryableStateStream/QueryableStateClient
[1], or around "FOR SYSTEM_TIME AS OF" [2], or on other APIs/concepts (is
there a FLIP?)?

Cheers
Ben

[1]
https://ci.apache.org/projects/flink/flink-docs-stable/dev/stream/state/queryable_state.html#querying-state
[2]
https://ci.apache.org/projects/flink/flink-docs-release-1.9/dev/table/streaming/temporal_tables.html#temporal-table


On Thu, Jan 23, 2020 at 6:40 AM kant kodali  wrote:

> Is it a common practice to have a custom state backend? if so, what would
> be a popular custom backend?
>
> Can I use Elasticsearch as a state backend?
>
> Thanks!
>
> On Wed, Jan 22, 2020 at 1:42 AM Jark Wu  wrote:
>
>> Hi Kant,
>>
>> 1) List of row is also sufficient in this case. Using a MapState is in
>> order to retract a row faster, and save the storage size.
>>
>> 2) State Process API is usually used to process save point. I’m afraid
>> the performance is not good to use it for querying.
>> On the other side, AFAIK, State Process API requires the uid of
>> operator. However, uid of operators is not set in Table API & SQL.
>> So I’m not sure whether it works or not.
>>
>> 3) You can have a custom state backend by
>> implementing the org.apache.flink.runtime.state.StateBackend interface, and use it
>> via `env.setStateBackend(…)`.
>>
>> Best,
>> Jark
>>
>> On Wed, 22 Jan 2020 at 14:16, kant kodali  wrote:
>>
>>> Hi Jark,
>>>
>>> 1) shouldn't it be a col1 to List of row? multiple rows can have the
>>> same joining key right?
>>>
>>> 2) Can I use state processor API
>>> 
>>> from an external application to query the intermediate results in near
>>> real-time? I thought querying rocksdb state is a widely requested feature.
>>> It would be really great to consider this feature for 1.11
>>>
>>> 3) Is there any interface where I can implement my own state backend?
>>>
>>> Thanks!
>>>
>>>
>>> On Tue, Jan 21, 2020 at 6:08 PM Jark Wu  wrote:
>>>
 Hi Kant,

 1) Yes, it will be stored in rocksdb statebackend.
 2) In the old planner, the left state is the same as the right state; both
 are a `MapState` mapping each input row to its `(count, expiredTime)`.
 It is a 2-level map structure, where the `col1` is the join key, it
 is the first-level key of the state. The key of the MapState is the input
 row,
 and the `count` is the number of this row, the expiredTime
 indicates when to cleanup this row (avoid infinite state size). You can
 find the source code here[1].
 In blink planner, the state structure will be more complex which is
 determined by the meta-information of upstream. You can see the source code
 of blink planner here [2].
 3) Currently, the intermediate state is not exposed to users. Usually,
 users should write the query result to an external system (like Mysql) and
 query the external system.
 Query on the intermediate state is on the roadmap, but I guess it
 is not in 1.11 plan.

 Best,
 Jark

 [1]:
 http://github.com/apache/flink/blob/master/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/runtime/join/NonWindowJoin.scala#L61
 [2]:
 https://github.com/apache/flink/blob/master/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/runtime/operators/join/stream/state/JoinRecordStateViews.java#L45


 On Jan 21, 2020, at 18:01, kant kodali wrote:

 Hi All,

 If I run a query like this

 StreamTableEnvironment.sqlQuery("select * from table1 join table2 on
 table1.col1 = table2.col1")

 1) Where will flink store the intermediate result? Imagine
 flink-conf.yaml says state.backend = 'rocksdb'

 2) If the intermediate results are stored in rockdb then what is the
 key and value in this case(given the query above)?

 3) What is the best way to query these intermediate results from an
 external application? while the job is running and while the job is not
 running?

 Thanks!




-- 
Benoît Paris
Ingénieur Machine Learning Explicable
Tél : +33 6 60 74 23 00
http://benoit.paris
http://explicable.ml
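
As a side note, a minimal sketch of the `env.setStateBackend(…)` call Jark mentions
in the quoted reply, using the bundled RocksDB backend as a stand-in for a custom
StateBackend implementation (the checkpoint URI is illustrative):

import org.apache.flink.contrib.streaming.state.RocksDBStateBackend;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class StateBackendSetup {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        // A custom backend would implement org.apache.flink.runtime.state.StateBackend
        // and be passed in exactly the same way.
        env.setStateBackend(new RocksDBStateBackend("hdfs:///flink/checkpoints"));
        // ... build the table environment and the join query on top of env ...
    }
}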


Re: REST rescale with Flink on YARN

2020-01-23 Thread Vasily Melnik
Hi all,
I've found a solution for this issue.
The problem is that with the YARN ApplicationMaster URL we communicate with the
JobManager via a proxy, which is implemented on Jetty 6 (for Hadoop 2.6).
So to use the PATCH method we need to locate the original JobManager URL.
Using the /jobmanager/config API we could get only the host, but web.port is
displayed as 0 (???).
To find the actual web port, we have to parse the YARN logs of the JobManager, where we
can find something like this:

*INFO org.apache.flink.runtime.jobmaster.MiniDispatcherRestEndpoint - Rest
endpoint listening at :.*

Maybe someone knows a less complicated way to find the actual REST URL under
YARN?




Best regards,
Vasily Melnik


On Thu, 23 Jan 2020 at 15:32, Chesnay Schepler  wrote:

> Older versions of Jetty don't support PATCH requests. You will either have
> to update it or create a custom Flink version that uses POST for the
> rescale operation.
>
> On 23/01/2020 13:23, Vasily Melnik wrote:
>
> Hi all.
> I'm using Flink 1.8 on YARN with CDH 5.12
> When i try to perform rescale request:
>
> curl -v -X PATCH 
> '/proxy/application_1576854986116_0079/jobs/11dcfc3163936fc019e049fc841b075b/rescaling?parallelism=3
>  
> '
>
> i get a mistake:
>
> *Method PATCH is not defined in RFC 2068 and is not supported by the
> Servlet API.* GET and POST methods work well.
> The Server type in response is Jetty(6.1.26.cloudera.4).
>
> How can i deal with this situation?
>
> Best regards,
> Vasily Melnik
>
>
>


Re: Old offsets consumed from Kafka after Flink upgrade to 1.9.1 (from 1.2.1)

2020-01-23 Thread Tzu-Li (Gordon) Tai
Hi Somya,

I'll have to take a closer look at the JIRA history to refresh my memory on
potential past changes that caused this.

My first suspicion is this:
It is expected that the Kafka consumer will *ignore* the configured startup
position if the job was restored from a savepoint.
It will always use the offsets that were persisted at the time of the
savepoint.
Would this probably already explain what you are seeing?

What I'm not sure of yet is whether this was a behavioural change that
occurred between versions 1.2.x and 1.3.x or later versions.
I'll take a closer look once I'm back from travelling tomorrow and get back
to you on that.

Cheers,
Gordon
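
To make that concrete, a small sketch (topic name and schema are illustrative) of
the startup-position API and the caveat above:

import java.util.Properties;
import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer010;

public class ConsumerStartup {
    public static FlinkKafkaConsumer010<String> build(Properties kafkaProps) {
        FlinkKafkaConsumer010<String> consumer =
                new FlinkKafkaConsumer010<>("events", new SimpleStringSchema(), kafkaProps);
        // Only honored on a fresh start; when the job is restored from a
        // savepoint/checkpoint, the offsets stored in that snapshot win.
        consumer.setStartFromLatest();
        return consumer;
    }
}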

On Thu, Jan 23, 2020, 7:52 PM Chesnay Schepler  wrote:

> @gordon Do you remember whether we changed any behavior of the Kafka 0.10
> consumer after 1.3.3?
>
> On 23/01/2020 12:02, Somya Maithani wrote:
>
> Hey,
>
> Any ideas about this? We are blocked on the upgrade because we want async
> timer checkpointing.
>
> Regards,
>
> Somya Maithani
> Software Developer II
> Helpshift Pvt Ltd
>
>
> On Fri, Jan 17, 2020 at 10:37 AM Somya Maithani 
> wrote:
>
>> Hey Team,
>>
>> *Problem*
>> Recently, we were trying to upgrade Flink infrastructure to version 1.9.1
>> and we noticed that a week old offset was consumed from Kafka even though
>> the configuration says latest.
>>
>> *Pretext*
>> 1. Our current Flink version in production is 1.2.1.
>> 2. We use RocksDB + Hadoop as our backend / checkpointing data store.
>> 3. We consume and produce messages to / from Kafka.
>>
>> *Release Plan*
>> 1. Upgrade Flink 1.2.1 to 1.3.3.
>> 2. Upgrade Flink 1.3.3 to 1.9.1
>> Note: We have a transitioning version (1.3.3) because of the
>> serialisation change in checkpointing.
>>
>> After performing step 1, the service was consuming latest Kafka events
>> but after performing step 2 we noticed that the service was consuming one
>> week old Kafka messages from the source topic. We did not see any
>> exceptions but since the number of messages consumed increased a lot for
>> our Flink infrastructure, our task managers started crashing eventually.
>>
>> We did not change Kafka configuration in the service for the upgrade but
>> we did upgrade the Flink dependencies for Kafka.
>>
>> Old dependency:
>>
>>> <dependency>
>>>   <groupId>org.apache.flink</groupId>
>>>   <artifactId>flink-streaming-java_2.10</artifactId>
>>>   <version>${flink.version}</version>
>>> </dependency>
>>> <dependency>
>>>   <groupId>org.apache.flink</groupId>
>>>   <artifactId>flink-clients_2.10</artifactId>
>>>   <version>${flink.version}</version>
>>> </dependency>
>>> <dependency>
>>>   <groupId>org.apache.flink</groupId>
>>>   <artifactId>flink-connector-kafka-0.10_2.10</artifactId>
>>>   <version>${flink.version}</version>
>>> </dependency>
>>> <dependency>
>>>   <groupId>org.apache.flink</groupId>
>>>   <artifactId>flink-statebackend-rocksdb_2.10</artifactId>
>>>   <version>${flink.version}</version>
>>> </dependency>
>>>
>>
>>
>> New dependency:
>>
>>> <dependency>
>>>   <groupId>org.apache.flink</groupId>
>>>   <artifactId>flink-streaming-java_2.12</artifactId>
>>>   <version>${flink.version}</version>
>>> </dependency>
>>> <dependency>
>>>   <groupId>org.apache.flink</groupId>
>>>   <artifactId>flink-clients_2.12</artifactId>
>>>   <version>${flink.version}</version>
>>> </dependency>
>>> <dependency>
>>>   <groupId>org.apache.flink</groupId>
>>>   <artifactId>flink-connector-kafka-0.10_2.11</artifactId>
>>>   <version>${flink.version}</version>
>>> </dependency>
>>> <dependency>
>>>   <groupId>org.apache.flink</groupId>
>>>   <artifactId>flink-statebackend-rocksdb_2.12</artifactId>
>>>   <version>${flink.version}</version>
>>> </dependency>
>>>
>>
>>
>> Do we know why this would be happening?
>>
>> Regards,
>>
>> Somya Maithani
>> Software Developer II
>> Helpshift Pvt Ltd
>>
>
>


Re: REST rescale with Flink on YARN

2020-01-23 Thread Chesnay Schepler
Older versions of Jetty don't support PATCH requests. You will either 
have to update it or create a custom Flink version that uses POST for 
the rescale operation.


On 23/01/2020 13:23, Vasily Melnik wrote:

Hi all.
I'm using Flink 1.8 on YARN with CDH 5.12
When i try to perform rescale request:
curl -v -X PATCH 
'/proxy/application_1576854986116_0079/jobs/11dcfc3163936fc019e049fc841b075b/rescaling?parallelism=3
  
'
i get a mistake:
/Method PATCH is not defined in RFC 2068 and is not supported by the
Servlet API./

GET and POST methods work well.
The Server type in response is Jetty(6.1.26.cloudera.4).

How can i deal with this situation?

Best regards,
Vasily Melnik





REST rescale with Flink on YARN

2020-01-23 Thread Vasily Melnik
Hi all.
I'm using Flink 1.8 on YARN with CDH 5.12
When i try to perform rescale request:

curl -v -X PATCH
'/proxy/application_1576854986116_0079/jobs/11dcfc3163936fc019e049fc841b075b/rescaling?parallelism=3
'

i get a mistake:

*Method PATCH is not defined in RFC 2068 and is not supported by the
Servlet API.* GET and POST methods work well.
The Server type in response is Jetty(6.1.26.cloudera.4).

How can i deal with this situation?

Best regards,
Vasily Melnik


Re: Old offsets consumed from Kafka after Flink upgrade to 1.9.1 (from 1.2.1)

2020-01-23 Thread Chesnay Schepler
@gordon Do you remember whether we changed any behavior of the Kafka 
0.10 consumer after 1.3.3?


On 23/01/2020 12:02, Somya Maithani wrote:

Hey,

Any ideas about this? We are blocked on the upgrade because we want 
async timer checkpointing.


Regards,

Somya Maithani
Software Developer II
Helpshift Pvt Ltd


On Fri, Jan 17, 2020 at 10:37 AM Somya Maithani
wrote:


Hey Team,

*Problem*
Recently, we were trying to upgrade Flink infrastructure to
version 1.9.1 and we noticed that a week old offset was consumed
from Kafka even though the configuration says latest.

*Pretext*
1. Our current Flink version in production is 1.2.1.
2. We use RocksDB + Hadoop as our backend / checkpointing data store.
3. We consume and produce messages to / from Kafka.
*
*
*Release Plan*
1. Upgrade Flink 1.2.1 to 1.3.3.
2. Upgrade Flink 1.3.3 to 1.9.1
Note: We have a transitioning version (1.3.3) because of the
serialisation change in checkpointing.

After performing step 1, the service was consuming latest Kafka
events but after performing step 2 we noticed that the service was
consuming one week old Kafka messages from the source topic. We
did not see any exceptions but since the number of messages
consumed increased a lot for our Flink infrastructure, our task
managers started crashing eventually.

We did not change Kafka configuration in the service for the
upgrade but we did upgrade the Flink dependencies for Kafka.

Old dependency:


    <dependency>
      <groupId>org.apache.flink</groupId>
      <artifactId>flink-streaming-java_2.10</artifactId>
      <version>${flink.version}</version>
    </dependency>
    <dependency>
      <groupId>org.apache.flink</groupId>
      <artifactId>flink-clients_2.10</artifactId>
      <version>${flink.version}</version>
    </dependency>
    <dependency>
      <groupId>org.apache.flink</groupId>
      <artifactId>flink-connector-kafka-0.10_2.10</artifactId>
      <version>${flink.version}</version>
    </dependency>
    <dependency>
      <groupId>org.apache.flink</groupId>
      <artifactId>flink-statebackend-rocksdb_2.10</artifactId>
      <version>${flink.version}</version>
    </dependency>



New dependency:


    <dependency>
      <groupId>org.apache.flink</groupId>
      <artifactId>flink-streaming-java_2.12</artifactId>
      <version>${flink.version}</version>
    </dependency>
    <dependency>
      <groupId>org.apache.flink</groupId>
      <artifactId>flink-clients_2.12</artifactId>
      <version>${flink.version}</version>
    </dependency>
    <dependency>
      <groupId>org.apache.flink</groupId>
      <artifactId>flink-connector-kafka-0.10_2.11</artifactId>
      <version>${flink.version}</version>
    </dependency>
    <dependency>
      <groupId>org.apache.flink</groupId>
      <artifactId>flink-statebackend-rocksdb_2.12</artifactId>
      <version>${flink.version}</version>
    </dependency>


Do we know why this would be happening?

Regards,

Somya Maithani
Software Developer II
Helpshift Pvt Ltd





Re: batch job OOM

2020-01-23 Thread Jingsong Li
Fanbin,

I have no idea right now; could you create a JIRA to track it? Please describe the
complete SQL and some information about the data there.

Best,
Jingsong Lee

On Thu, Jan 23, 2020 at 4:13 PM Fanbin Bu  wrote:

> Jingsong,
>
> Do you have any suggestions to debug the above mentioned
> IndexOutOfBoundsException error?
> Thanks,
>
> Fanbin
>
> On Wed, Jan 22, 2020 at 11:07 PM Fanbin Bu  wrote:
>
>> I got the following error when running another job. any suggestions?
>>
>> Caused by: java.lang.IndexOutOfBoundsException
>> at
>> org.apache.flink.core.memory.MemorySegment.getInt(MemorySegment.java:701)
>> at org.apache.flink.table.dataformat.BinaryRow.getInt(BinaryRow.java:264)
>> at HashWinAggWithKeys$538.endInput(Unknown Source)
>> at
>> org.apache.flink.streaming.runtime.tasks.OperatorChain.endInput(OperatorChain.java:276)
>> at
>> org.apache.flink.streaming.runtime.io.StreamOneInputProcessor.checkFinished(StreamOneInputProcessor.java:151)
>> at
>> org.apache.flink.streaming.runtime.io.StreamOneInputProcessor.processInput(StreamOneInputProcessor.java:138)
>> at
>> org.apache.flink.streaming.runtime.tasks.StreamTask.performDefaultAction(StreamTask.java:276)
>> at
>> org.apache.flink.streaming.runtime.tasks.StreamTask.run(StreamTask.java:298)
>> at
>> org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:403)
>> at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:705)
>> at org.apache.flink.runtime.taskmanager.Task.run(Task.java:530)
>> at java.lang.Thread.run(Thread.java:748)
>>
>> On Wed, Jan 22, 2020 at 8:57 PM Fanbin Bu  wrote:
>>
>>> Jingsong,
>>>
>>> I set the config value to be too large. After I changed it to a smaller
>>> number it works now!
>>> thanks you for the help. really appreciate it!
>>>
>>> Fanbin
>>>
>>> On Wed, Jan 22, 2020 at 8:50 PM Jingsong Li 
>>> wrote:
>>>
 Fanbin,

 Looks like your config is wrong, can you show your config code?

 Best,
 Jingsong Lee

 On Thu, Jan 23, 2020 at 12:41 PM Fanbin Bu 
 wrote:

> Jingsong,
>
> Great, now i got a different error:
>
> java.lang.NullPointerException: Initial Segment may not be null
>   at 
> org.apache.flink.runtime.memory.AbstractPagedOutputView.<init>(AbstractPagedOutputView.java:65)
>   at 
> org.apache.flink.runtime.io.disk.SimpleCollectingOutputView.<init>(SimpleCollectingOutputView.java:49)
>   at 
> org.apache.flink.table.runtime.operators.aggregate.BytesHashMap$RecordArea.<init>(BytesHashMap.java:522)
>   at 
> org.apache.flink.table.runtime.operators.aggregate.BytesHashMap.<init>(BytesHashMap.java:190)
>   at 
> org.apache.flink.table.runtime.operators.aggregate.BytesHashMap.<init>(BytesHashMap.java:149)
>   at LocalHashWinAggWithKeys$292.open(Unknown Source)
>   at 
> org.apache.flink.streaming.runtime.tasks.StreamTask.openAllOperators(StreamTask.java:529)
>   at 
> org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:393)
>   at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:705)
>   at org.apache.flink.runtime.taskmanager.Task.run(Task.java:530)
>   at java.lang.Thread.run(Thread.java:748)
>
>
> is there any other config i should add?
>
> thanks,
>
> Fanbin
>
>
> On Wed, Jan 22, 2020 at 8:07 PM Fanbin Bu 
> wrote:
>
>> you beat me to it.
>> let's me try that.
>>
>> On Wed, Jan 22, 2020 at 7:57 PM Jingsong Li 
>> wrote:
>>
>>> Fanbin,
>>>
>>> Document is here:
>>> https://ci.apache.org/projects/flink/flink-docs-release-1.9/dev/table/config.html
>>> NOTE: you need configure this into TableConfig.
>>>
>>> Best,
>>> Jingsong Lee
>>>
>>> On Thu, Jan 23, 2020 at 11:54 AM Fanbin Bu 
>>> wrote:
>>>
 Jingsong,

 Thank you for the response.
 Since I'm using flink on EMR and the latest version is 1.9 now. the
 second option is ruled out. but will keep that in mind for future 
 upgrade.

 I'm going to try the first option. It's probably a good idea to add
 that in the doc for example:
 https://ci.apache.org/projects/flink/flink-docs-stable/ops/config.html

 Thanks,
 Fanbin

 On Wed, Jan 22, 2020 at 7:08 PM Jingsong Li 
 wrote:

> Hi Fanbin,
>
> Thanks for using blink batch mode.
>
> The OOM is caused by the manage memory not enough in Hash
> aggregation.
>
> There are three options you can choose from:
>
> 1.Is your version Flink 1.9? 1.9 still use fix memory
> configuration. So you need increase hash memory:
> - table.exec.resource.hash-agg.memory: 1024 mb
>
> 2.In 1.10, we use slot manage memory to dynamic config real
> operator memory, so operator can use more manage memory, so you don't 
> need
> configure h
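
For reference, a minimal sketch of setting that option into the TableConfig on 1.9,
as suggested in the quoted reply (the key comes from the thread; the value and the
batch-mode settings are just an example):

import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.TableEnvironment;

public class HashAggMemoryConfig {
    public static void main(String[] args) {
        EnvironmentSettings settings =
                EnvironmentSettings.newInstance().useBlinkPlanner().inBatchMode().build();
        TableEnvironment tEnv = TableEnvironment.create(settings);
        // Managed memory for the hash aggregation; per the thread, a value that is
        // far too large can also make the operator fail to initialize.
        tEnv.getConfig().getConfiguration()
                .setString("table.exec.resource.hash-agg.memory", "1024 mb");
        // ... register tables and run the batch SQL on tEnv ...
    }
}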

Re: Usage of KafkaDeserializationSchema and KafkaSerializationSchema

2020-01-23 Thread Aljoscha Krettek

Hi,

the reason the new schema feels a bit weird is that it implements a new 
paradigm in a FlinkKafkaProducer that still follows a somewhat older 
paradigm. In the old paradigm, partitioning and topic were configured 
on the sink, which made them fixed for all produced records. The new 
schema allows setting the partition and topic dynamically, which, as I 
said, doesn't play well with the old way.


As Chesnay said, you can either pass the topic to the schema, to make it 
fixed for all records or encode it in the data and then forward it to 
the record.


Best,
Aljoscha

On 23.01.20 11:35, Chesnay Schepler wrote:
That's a fair question; the interface is indeed weird in this regard and 
does have some issues.


 From what I can tell you have 2 options:
a) have the user pass the topic to the serialization schema constructor, 
which in practice would be identical to the topic they pass to the 
producer.
b) Additionally implement KafkaContextAware, and have the schema 
determine the topic _somehow_.


Both options are quite "eh" from a usability perspective;
a) requires the user to pass the same information around to multiple 
places,
b) is inefficient since the topic has to be determined twice per record 
(once when KCA#getTargetTopic is called, once again when serialize is 
called), and can maybe(?) result in subtle issues if the 2 calls 
determine different topics.


The Table API version of the sink handles this better since its
serialization schema only returns a byte array, and not a producer record.


You could also use one of the deprecated constructors that accept a
"SerializationSchema" or "KeyedSerializationSchema", which handle this
case better.


I've CC'd Aljoscha who was involved in the introduction of current 
iteration of the schema.


On 23/01/2020 03:13, Jason Kania wrote:

Thanks for responding.

I am aware where the topic is used. What I do not see is how to set 
the topic within the class that implements the
KafkaSerializationSchema.serialize(  T classObject, Long timestamp ) 
method.


The method must create and return a value of type 
ProducerRecord<byte[], byte[]>, but all the constructors for 
ProducerRecord expect "String topic" as the first argument. This will 
not be passed to the method so the question is where the 
implementation of the class is supposed to get the topic?


On Wednesday, January 22, 2020, 08:29:49 p.m. EST, David Magalhães 
 wrote:



Hi Jason,

The topic is used in *FlinkKafkaConsumer*, following the 
*KafkaDeserializationSchema* and then *Properties*.


https://ci.apache.org/projects/flink/flink-docs-release-1.9/api/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaConsumer.html 



new FlinkKafkaConsumer(kafkaTopic, new MessageDeserializer, 
kafkaProperties)

...
class MessageDeserializer extends 
KafkaDeserializationSchema[GenericRecord] {




On Thu, Jan 23, 2020 at 1:20 AM Jason Kania > wrote:


    Hello,

    I was looking for documentation in 1.9.1 on how to create
    implementations of the KafkaSerializationSchema and
    KafkaDeserializationSchema interfaces. I have created
    implementations in the past for the SerializationSchema and
    DeserializationSchema interface. Unfortunately, I can find no
    examples and the code contains no documentation for this purpose
    but some information appears missing.

    Can someone please answer the following:

    1) When creating a ProducerRecord with the
    KafkaSerializationSchema.serialize() method, how is the topic
    String supposed to be obtained by the implementing class? All of
    the constructors require that the topic be specified, but the
    topic is not passed in. Is there another interface that should be
    implemented to get the topic or get a callback? Or is expected
    that the topic has to be fixed in the interface's implementation
    class? Some of the constructors also ask for a partition. Again,
    where is this information expected to come from?

    2) The interfaces specify that ConsumerRecord<byte[], byte[]> is
    received and ProducerRecord<byte[], byte[]> is to be generated.
    What are the 2 byte arrays referencing in the type definitions?

    Thanks,

    Jason






Re: Old offsets consumed from Kafka after Flink upgrade to 1.9.1 (from 1.2.1)

2020-01-23 Thread Somya Maithani
Hey,

Any ideas about this? We are blocked on the upgrade because we want async
timer checkpointing.

Regards,

Somya Maithani
Software Developer II
Helpshift Pvt Ltd


On Fri, Jan 17, 2020 at 10:37 AM Somya Maithani 
wrote:

> Hey Team,
>
> *Problem*
> Recently, we were trying to upgrade Flink infrastructure to version 1.9.1
> and we noticed that a week old offset was consumed from Kafka even though
> the configuration says latest.
>
> *Pretext*
> 1. Our current Flink version in production is 1.2.1.
> 2. We use RocksDB + Hadoop as our backend / checkpointing data store.
> 3. We consume and produce messages to / from Kafka.
>
> *Release Plan*
> 1. Upgrade Flink 1.2.1 to 1.3.3.
> 2. Upgrade Flink 1.3.3 to 1.9.1
> Note: We have a transitioning version (1.3.3) because of the serialisation
> change in checkpointing.
>
> After performing step 1, the service was consuming latest Kafka events but
> after performing step 2 we noticed that the service was consuming one week
> old Kafka messages from the source topic. We did not see any exceptions but
> since the number of messages consumed increased a lot for our Flink
> infrastructure, our task managers started crashing eventually.
>
> We did not change Kafka configuration in the service for the upgrade but
> we did upgrade the Flink dependencies for Kafka.
>
> Old dependency:
>
>> <dependency>
>>   <groupId>org.apache.flink</groupId>
>>   <artifactId>flink-streaming-java_2.10</artifactId>
>>   <version>${flink.version}</version>
>> </dependency>
>> <dependency>
>>   <groupId>org.apache.flink</groupId>
>>   <artifactId>flink-clients_2.10</artifactId>
>>   <version>${flink.version}</version>
>> </dependency>
>> <dependency>
>>   <groupId>org.apache.flink</groupId>
>>   <artifactId>flink-connector-kafka-0.10_2.10</artifactId>
>>   <version>${flink.version}</version>
>> </dependency>
>> <dependency>
>>   <groupId>org.apache.flink</groupId>
>>   <artifactId>flink-statebackend-rocksdb_2.10</artifactId>
>>   <version>${flink.version}</version>
>> </dependency>
>>
>
>
> New dependency:
>
>> <dependency>
>>   <groupId>org.apache.flink</groupId>
>>   <artifactId>flink-streaming-java_2.12</artifactId>
>>   <version>${flink.version}</version>
>> </dependency>
>> <dependency>
>>   <groupId>org.apache.flink</groupId>
>>   <artifactId>flink-clients_2.12</artifactId>
>>   <version>${flink.version}</version>
>> </dependency>
>> <dependency>
>>   <groupId>org.apache.flink</groupId>
>>   <artifactId>flink-connector-kafka-0.10_2.11</artifactId>
>>   <version>${flink.version}</version>
>> </dependency>
>> <dependency>
>>   <groupId>org.apache.flink</groupId>
>>   <artifactId>flink-statebackend-rocksdb_2.12</artifactId>
>>   <version>${flink.version}</version>
>> </dependency>
>>
>
>
> Do we know why this would be happening?
>
> Regards,
>
> Somya Maithani
> Software Developer II
> Helpshift Pvt Ltd
>


Re: Apache Flink - Sharing state in processors

2020-01-23 Thread Chesnay Schepler
1. a/b) No, they are deserialized into separate instances in any case 
and are independent afterwards.


2. a/b) No, see 1).

3. a/b) No, as individual tasks are isolated by different class-loaders.
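
To illustrate point 1, a small self-contained sketch (names are illustrative): the
same function object used in two places is serialized at job submission and
deserialized into separate instances per parallel task, so its fields are not
shared state at runtime.

import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class SharedInstanceSketch {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        MapFunction<Long, Long> counter = new MapFunction<Long, Long>() {
            private long seen; // every deserialized task instance gets its own copy
            @Override
            public Long map(Long value) {
                return ++seen;
            }
        };

        DataStream<Long> source = env.fromElements(1L, 2L, 3L);
        source.map(counter).print(); // runtime instance A
        source.map(counter).print(); // runtime instance B, independent of A

        env.execute("shared-instance-sketch");
    }
}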

On 23/01/2020 09:25, M Singh wrote:

Thanks Yun for your answers.

By processor I did mean user defined processor function. Keeping that 
in view, do you have any advice on how the shared state - ie, the 
parameters passed to the processor as mentioned above (not the key 
state or operator state) will be affected in a distributed runtime env ?


Mans

On Sunday, January 12, 2020, 09:51:10 PM EST, Yun Tang 
 wrote:



Hi Mans

What's the meaning of 'processor' you defined here? A user defined 
function?


When talking about shared state, I'm afraid it's not so easy to 
implement in Flink. Whether keyed state or operator state, both are 
instantiated, used, and only thread-safe within operator scope. The only 
way to read read-only state during runtime is via queryable state [1].


For the question of keyBy, each message is only sent to one of the 
downstream tasks according to the hashcode [2].


[1] 
https://ci.apache.org/projects/flink/flink-docs-stable/dev/stream/state/queryable_state.html
[2] 
https://github.com/apache/flink/blob/7a6ca9c03f67f488e40a114e94c389a5cfb67836/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/partitioner/KeyGroupStreamPartitioner.java#L58



Best
Yun Tang


*From:* M Singh 
*Sent:* Friday, January 10, 2020 23:29
*To:* User 
*Subject:* Apache Flink - Sharing state in processors
Hi:

I have a few question about how state is shared in processors in Flink.

1. If I have a processor instantiated in the Flink app, and apply use 
in multiple times in the Flink -
    (a) if the tasks are in the same slot - do they share the same 
processor on the taskmanager ?
    (b) if the tasks are on same node but different slots - do they 
share the same processor on the taskmanager ?


2. If I instantiate a single processor with local state and use it in 
multiple times in Flink
    (a) if the tasks are in the same slot - do they share the same 
processor and state on the taskmanager ?
    (b) if the tasks are on same node but different slots - do they 
share the same processor and state on the taskmanager ?


3. If I instantiate a multiple processors with shared collection and 
use it in multiple times in Flink
    (a) if the tasks are in the same slot - do they share the state on 
the taskmanager ?
    (b) if the tasks are on same node but different slots - do they 
share the state on the taskmanager ?


4. How do the above scenarios affect sharing
(a) operator state
(b) keyed state

5. If I have a parallelism of > 1, and use keyBy - is each key 
handled by only one instance of the processor ?  I believe so, but 
wanted to confirm.



Thanks

Mans









Re: Usage of KafkaDeserializationSchema and KafkaSerializationSchema

2020-01-23 Thread Chesnay Schepler
That's a fair question; the interface is indeed weird in this regard and 
does have some issues.


From what I can tell you have 2 options:
a) have the user pass the topic to the serialization schema constructor, 
which in practice would be identical to the topic they pass to the producer.
b) Additionally implement KafkaContextAware, and have the schema 
determine the topic _somehow_.


Both options are quite "eh" from a usability perspective;
a) requires the user to pass the same information around to multiple places,
b) is inefficient since the topic has to be determined twice per record 
(once when KCA#getTargetTopic is called, once again when serialize is 
called), and can maybe(?) result in subtle issues if the 2 calls 
determine different topics.


The Table API version of the sink handles this better since its
serialization schema only returns a byte array, and not a producer record.


You could also use one of the deprecated constructors that accept a
"SerializationSchema" or "KeyedSerializationSchema", which handle this
case better.


I've CC'd Aljoscha who was involved in the introduction of current 
iteration of the schema.
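
A minimal sketch of option a), with the topic passed to the schema's constructor
(class and field names are illustrative):

import java.nio.charset.StandardCharsets;
import org.apache.flink.streaming.connectors.kafka.KafkaSerializationSchema;
import org.apache.kafka.clients.producer.ProducerRecord;

public class FixedTopicSerializationSchema implements KafkaSerializationSchema<String> {

    private final String topic;

    public FixedTopicSerializationSchema(String topic) {
        this.topic = topic; // the same value the user passes to the FlinkKafkaProducer
    }

    @Override
    public ProducerRecord<byte[], byte[]> serialize(String element, Long timestamp) {
        // No key and no explicit partition: the partitioner decides.
        return new ProducerRecord<>(topic, element.getBytes(StandardCharsets.UTF_8));
    }
}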


On 23/01/2020 03:13, Jason Kania wrote:

Thanks for responding.

I am aware where the topic is used. What I do not see is how to set 
the topic within the class that implements the
KafkaSerializationSchema.serialize(  T classObject, Long timestamp ) 
method.


The method must create and return a value of type 
ProducerRecord<byte[], byte[]>, but all the constructors for 
ProducerRecord expect "String topic" as the first argument. This will 
not be passed to the method so the question is where the 
implementation of the class is supposed to get the topic?


On Wednesday, January 22, 2020, 08:29:49 p.m. EST, David Magalhães 
 wrote:



Hi Jason,

The topic is used in *FlinkKafkaConsumer*, following the 
*KafkaDeserializationSchema* and then *Properties*.


https://ci.apache.org/projects/flink/flink-docs-release-1.9/api/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaConsumer.html

new FlinkKafkaConsumer(kafkaTopic, new MessageDeserializer, 
kafkaProperties)

...
class MessageDeserializer extends 
KafkaDeserializationSchema[GenericRecord] {




On Thu, Jan 23, 2020 at 1:20 AM Jason Kania > wrote:


Hello,

I was looking for documentation in 1.9.1 on how to create
implementations of the KafkaSerializationSchema and
KafkaDeserializationSchema interfaces. I have created
implementations in the past for the SerializationSchema and
DeserializationSchema interface. Unfortunately, I can find no
examples and the code contains no documentation for this purpose
but some information appears missing.

Can someone please answer the following:

1) When creating a ProducerRecord with the
KafkaSerializationSchema.serialize() method, how is the topic
String supposed to be obtained by the implementing class? All of
the constructors require that the topic be specified, but the
topic is not passed in. Is there another interface that should be
implemented to get the topic or get a callback? Or is expected
that the topic has to be fixed in the interface's implementation
class? Some of the constructors also ask for a partition. Again,
where is this information expected to come from?

2) The interfaces specify that ConsumerRecord<byte[], byte[]> is
received and ProducerRecord<byte[], byte[]> is to be generated.
What are the 2 byte arrays referencing in the type definitions?

Thanks,

Jason





Re: Flink ParquetAvroWriters Sink

2020-01-23 Thread aj
Hi Arvid,

I am not clear on this: "Note that I still recommend to just bundle the
schema with your Flink application and not reinvent the wheel."

Can you please help with some sample code showing how it should be written? Or
can we connect some way so that I can understand it with you?


On Thu, Jan 23, 2020 at 2:09 PM Arvid Heise  wrote:

> The issue is that you are not providing any meaningful type information,
> so that Flink has to resort to Kryo. You need to extract the schema during
> query compilation (in your main) and pass it to your deserialization schema.
>
> public TypeInformation getProducedType() {
>   return (TypeInformation) new GenericRecordAvroTypeInfo(this.schema);
> }
>
> If you don't want to extract it statically you need to tell Flink how to
> handle arbitrary GenericRecords. You could implement your own serializer
> [1], which would write GenericRecords to byte[] and vice versa.
>
> Note that I still recommend to just bundle the schema with your Flink
> application and not reinvent the wheel.
>
> [1]
> https://ci.apache.org/projects/flink/flink-docs-release-1.9/dev/custom_serializers.html
>
> On Thu, Jan 23, 2020 at 2:22 AM aj  wrote:
>
>>  Hi Arvid,
>>
>> I want to keep generic records only, and I do not want to keep the schema
>> definition on the consumer side; it should be resolved from the schema
>> registry only. I am following the post below
>>
>>
>> https://stackoverflow.com/questions/58849635/is-it-possible-to-deserialize-avro-messageconsuming-message-from-kafka-without/59865360#59865360
>>
>> so please help me what is wrong with my code.
>>
>>
>>
>> On Thu, Jan 23, 2020, 00:38 Arvid Heise  wrote:
>>
>>> Hi Anuj,
>>>
>>> I recommend using the ConfluentRegistryAvroDeserializationSchema [1]
>>> with a specific record that has been generated with the Avro Maven Plugin
>>> [2] or Avro Gradle Plugin [3]. That should result into almost no code and
>>> maximal maintainability.
>>>
>>> [1]
>>> https://ci.apache.org/projects/flink/flink-docs-release-1.9/dev/connectors/kafka.html#the-deserializationschema
>>> [2] https://avro.apache.org/docs/1.8.2/gettingstartedjava.html
>>> [3] https://github.com/davidmc24/gradle-avro-plugin
>>>
>>> On Wed, Jan 22, 2020 at 6:43 PM aj  wrote:
>>>
 Hi Arvid,

 I have implemented the code with envelope schema as you suggested but
 now I am facing issues with the consumer . I have written code like this:

 FlinkKafkaConsumer010 kafkaConsumer010 = new
 FlinkKafkaConsumer010(KAFKA_TOPICS,
 new
 KafkaGenericAvroDeserializationSchema(schemaRegistryUrl),
 properties);

 And the Deserialization class looks like this :

 pblic class KafkaGenericAvroDeserializationSchema implements
 KeyedDeserializationSchema {

 private final String registryUrl;
 private transient KafkaAvroDeserializer inner;

 public KafkaGenericAvroDeserializationSchema(String registryUrl) {
 this.registryUrl = registryUrl;
 }

 @Override
 public GenericRecord deserialize(byte[] messageKey, byte[] message,
 String topic, int partition, long offset) {
 checkInitialized();
 return (GenericRecord) inner.deserialize(topic, message);
 }

 @Override
 public boolean isEndOfStream(GenericRecord nextElement) {
 return false;
 }

 @Override
 public TypeInformation getProducedType() {
 return TypeExtractor.getForClass(GenericRecord.class);
 }

 private void checkInitialized() {
 if (inner == null) {
 Map props = new HashMap<>();

 props.put(AbstractKafkaAvroSerDeConfig.SCHEMA_REGISTRY_URL_CONFIG,
 registryUrl);

 props.put(KafkaAvroDeserializerConfig.SPECIFIC_AVRO_READER_CONFIG, false);
 SchemaRegistryClient client =
 new CachedSchemaRegistryClient(
 registryUrl,
 AbstractKafkaAvroSerDeConfig.MAX_SCHEMAS_PER_SUBJECT_DEFAULT);
 inner = new KafkaAvroDeserializer(client, props);
 }
 }
 }


 It's working locally on my machine but when I deployed it on yarn
 cluster I am getting below exception:


 java.lang.Exception: org.apache.flink.streaming.runtime.tasks.
 ExceptionInChainedOperatorException: Could not forward element to next
 operator
 at org.apache.flink.streaming.runtime.tasks.
 SourceStreamTask$LegacySourceFunctionThread
 .checkThrowSourceExecutionException(SourceStreamTask.java:212)
 at org.apache.flink.streaming.runtime.tasks.SourceStreamTask
 .performDefaultAction(SourceStreamTask.java:132)
 at org.apache.flink.streaming.runtime.tasks.StreamTask.run(
 StreamTask.java:298)
 at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(
 StreamTask.java:403)
  

Re: Flink ParquetAvroWriters Sink

2020-01-23 Thread Arvid Heise
The issue is that you are not providing any meaningful type information,
so that Flink has to resort to Kryo. You need to extract the schema during
query compilation (in your main) and pass it to your deserialization schema.

public TypeInformation getProducedType() {
  return (TypeInformation) new GenericRecordAvroTypeInfo(this.schema);
}

If you don't want to extract it statically you need to tell Flink how to
handle arbitrary GenericRecords. You could implement your own serializer
[1], which would write GenericRecords to byte[] and vice versa.

Note that I still recommend to just bundle the schema with your Flink
application and not reinvent the wheel.

[1]
https://ci.apache.org/projects/flink/flink-docs-release-1.9/dev/custom_serializers.html
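
A small sketch of extracting the schema once in main() via the Confluent client
(the subject name and helper class are illustrative assumptions), so that it can
be handed to the deserialization schema and used in getProducedType():

import io.confluent.kafka.schemaregistry.client.CachedSchemaRegistryClient;
import io.confluent.kafka.schemaregistry.client.SchemaRegistryClient;
import org.apache.avro.Schema;

public class SchemaLookup {
    public static Schema fetchLatestSchema(String registryUrl, String subject) throws Exception {
        SchemaRegistryClient client = new CachedSchemaRegistryClient(registryUrl, 1000);
        // Latest schema registered for the subject, e.g. "my-topic-value".
        String schemaString = client.getLatestSchemaMetadata(subject).getSchema();
        return new Schema.Parser().parse(schemaString);
    }
}

The resulting Schema can then be passed into the deserialization schema's constructor
and wrapped in a GenericRecordAvroTypeInfo as shown in the snippet above.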

On Thu, Jan 23, 2020 at 2:22 AM aj  wrote:

>  Hi Arvid,
>
> I want to keep generic records only, and I do not want to keep the schema
> definition on the consumer side; it should be resolved from the schema
> registry only. I am following the post below
>
>
> https://stackoverflow.com/questions/58849635/is-it-possible-to-deserialize-avro-messageconsuming-message-from-kafka-without/59865360#59865360
>
> so please help me what is wrong with my code.
>
>
>
> On Thu, Jan 23, 2020, 00:38 Arvid Heise  wrote:
>
>> Hi Anuj,
>>
>> I recommend using the ConfluentRegistryAvroDeserializationSchema [1] with
>> a specific record that has been generated with the Avro Maven Plugin [2] or
>> Avro Gradle Plugin [3]. That should result into almost no code and maximal
>> maintainability.
>>
>> [1]
>> https://ci.apache.org/projects/flink/flink-docs-release-1.9/dev/connectors/kafka.html#the-deserializationschema
>> [2] https://avro.apache.org/docs/1.8.2/gettingstartedjava.html
>> [3] https://github.com/davidmc24/gradle-avro-plugin
>>
>> On Wed, Jan 22, 2020 at 6:43 PM aj  wrote:
>>
>>> Hi Arvid,
>>>
>>> I have implemented the code with envelope schema as you suggested but
>>> now I am facing issues with the consumer . I have written code like this:
>>>
>>> FlinkKafkaConsumer010 kafkaConsumer010 = new
>>> FlinkKafkaConsumer010(KAFKA_TOPICS,
>>> new
>>> KafkaGenericAvroDeserializationSchema(schemaRegistryUrl),
>>> properties);
>>>
>>> And the Deserialization class looks like this :
>>>
>>> pblic class KafkaGenericAvroDeserializationSchema implements
>>> KeyedDeserializationSchema {
>>>
>>> private final String registryUrl;
>>> private transient KafkaAvroDeserializer inner;
>>>
>>> public KafkaGenericAvroDeserializationSchema(String registryUrl) {
>>> this.registryUrl = registryUrl;
>>> }
>>>
>>> @Override
>>> public GenericRecord deserialize(byte[] messageKey, byte[] message,
>>> String topic, int partition, long offset) {
>>> checkInitialized();
>>> return (GenericRecord) inner.deserialize(topic, message);
>>> }
>>>
>>> @Override
>>> public boolean isEndOfStream(GenericRecord nextElement) {
>>> return false;
>>> }
>>>
>>> @Override
>>> public TypeInformation getProducedType() {
>>> return TypeExtractor.getForClass(GenericRecord.class);
>>> }
>>>
>>> private void checkInitialized() {
>>> if (inner == null) {
>>> Map props = new HashMap<>();
>>>
>>> props.put(AbstractKafkaAvroSerDeConfig.SCHEMA_REGISTRY_URL_CONFIG,
>>> registryUrl);
>>>
>>> props.put(KafkaAvroDeserializerConfig.SPECIFIC_AVRO_READER_CONFIG, false);
>>> SchemaRegistryClient client =
>>> new CachedSchemaRegistryClient(
>>> registryUrl,
>>> AbstractKafkaAvroSerDeConfig.MAX_SCHEMAS_PER_SUBJECT_DEFAULT);
>>> inner = new KafkaAvroDeserializer(client, props);
>>> }
>>> }
>>> }
>>>
>>>
>>> It's working locally on my machine but when I deployed it on yarn
>>> cluster I am getting below exception:
>>>
>>>
>>> java.lang.Exception: org.apache.flink.streaming.runtime.tasks.
>>> ExceptionInChainedOperatorException: Could not forward element to next
>>> operator
>>> at org.apache.flink.streaming.runtime.tasks.
>>> SourceStreamTask$LegacySourceFunctionThread
>>> .checkThrowSourceExecutionException(SourceStreamTask.java:212)
>>> at org.apache.flink.streaming.runtime.tasks.SourceStreamTask
>>> .performDefaultAction(SourceStreamTask.java:132)
>>> at org.apache.flink.streaming.runtime.tasks.StreamTask.run(
>>> StreamTask.java:298)
>>> at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(
>>> StreamTask.java:403)
>>> at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:705)
>>> at org.apache.flink.runtime.taskmanager.Task.run(Task.java:530)
>>> at java.lang.Thread.run(Thread.java:748)
>>> Caused by: org.apache.flink.streaming.runtime.tasks.
>>> ExceptionInChainedOperatorException: Could not forward element to next
>>> operator
>>> at org.apache.flink.streaming.runtime.tasks.
>>> OperatorChain$CopyingChainingOutput.pushToOperator(OperatorChain.java:
>>> 654)
>>>  

Re: Apache Flink - Sharing state in processors

2020-01-23 Thread M Singh
Thanks Yun for your answers.

By processor I did mean a user-defined processor function. Keeping that in
view, do you have any advice on how the shared state - i.e., the parameters
passed to the processor as mentioned above (not the keyed state or operator
state) - will be affected in a distributed runtime environment?

Mans
On Sunday, January 12, 2020, 09:51:10 PM EST, Yun Tang wrote:
Hi Mans,

What's the meaning of 'processor' you defined here? A user-defined function?

When talking about shared state, I'm afraid it's not so easy to implement in
Flink. No matter whether it is keyed state or operator state, both are
instantiated, used and only thread-safe within the operator scope. The only way
to read read-only state during runtime is via queryable state [1].

For the question about keyBy, each message is sent to exactly one downstream
task, according to the hash code of its key [2].

[1] https://ci.apache.org/projects/flink/flink-docs-stable/dev/stream/state/queryable_state.html
[2] https://github.com/apache/flink/blob/7a6ca9c03f67f488e40a114e94c389a5cfb67836/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/partitioner/KeyGroupStreamPartitioner.java#L58
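
To make that concrete, here is a minimal sketch of exposing keyed state via
queryable state (the class name CountPerKey, the state name "shared-count" and
the counting logic are just placeholders, not something from your job):

public class CountPerKey extends KeyedProcessFunction<String, String, String> {

    private transient ValueState<Long> count;

    @Override
    public void open(Configuration parameters) {
        ValueStateDescriptor<Long> descriptor =
                new ValueStateDescriptor<>("count", Long.class);
        // expose this keyed state so an external QueryableStateClient can read it
        descriptor.setQueryable("shared-count");
        count = getRuntimeContext().getState(descriptor);
    }

    @Override
    public void processElement(String value, Context ctx, Collector<String> out) throws Exception {
        // per-key counter: each key is handled by exactly one parallel instance
        Long current = count.value();
        count.update(current == null ? 1L : current + 1L);
        out.collect(value);
    }
}

Note that other operators in the same job still cannot read this state
directly; only an external process using the QueryableStateClient described in
[1] can query it, by the same key.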


Best,
Yun Tang

From: M Singh 
Sent: Friday, January 10, 2020 23:29
To: User 
Subject: Apache Flink - Sharing state in processors

Hi:

I have a few questions about how state is shared in processors in Flink.

1. If I have a processor instantiated in the Flink app and use it multiple times in the job:
    (a) if the tasks are in the same slot - do they share the same processor on the taskmanager?
    (b) if the tasks are on the same node but different slots - do they share the same processor on the taskmanager?

2. If I instantiate a single processor with local state and use it multiple times in the job:
    (a) if the tasks are in the same slot - do they share the same processor and state on the taskmanager?
    (b) if the tasks are on the same node but different slots - do they share the same processor and state on the taskmanager?

3. If I instantiate multiple processors with a shared collection and use them multiple times in the job:
    (a) if the tasks are in the same slot - do they share the state on the taskmanager?
    (b) if the tasks are on the same node but different slots - do they share the state on the taskmanager?

4. How do the above scenarios affect sharing of (a) operator state and (b) keyed state?

5. If I have a parallelism of > 1 and use keyBy - is each key handled by only one instance of the processor? I believe so, but wanted to confirm.

Thanks,
Mans




  

debug flink in intelliJ on EMR

2020-01-23 Thread Fanbin Bu
Hi,

I am following
https://cwiki.apache.org/confluence/display/FLINK/Remote+Debugging+of+Flink+Clusters
to debug a Flink program running on EMR.

How do I specify the host in the `edit configurations` dialog if the terminal
on the EMR master is hadoop@ip-10-200-46-186?

Thanks,
Fanbin


Re: batch job OOM

2020-01-23 Thread Fanbin Bu
Jingsong,

Do you have any suggestions to debug the above mentioned
IndexOutOfBoundsException error?
Thanks,

Fanbin

On Wed, Jan 22, 2020 at 11:07 PM Fanbin Bu  wrote:

> I got the following error when running another job. any suggestions?
>
> Caused by: java.lang.IndexOutOfBoundsException
> at
> org.apache.flink.core.memory.MemorySegment.getInt(MemorySegment.java:701)
> at org.apache.flink.table.dataformat.BinaryRow.getInt(BinaryRow.java:264)
> at HashWinAggWithKeys$538.endInput(Unknown Source)
> at
> org.apache.flink.streaming.runtime.tasks.OperatorChain.endInput(OperatorChain.java:276)
> at
> org.apache.flink.streaming.runtime.io.StreamOneInputProcessor.checkFinished(StreamOneInputProcessor.java:151)
> at
> org.apache.flink.streaming.runtime.io.StreamOneInputProcessor.processInput(StreamOneInputProcessor.java:138)
> at
> org.apache.flink.streaming.runtime.tasks.StreamTask.performDefaultAction(StreamTask.java:276)
> at
> org.apache.flink.streaming.runtime.tasks.StreamTask.run(StreamTask.java:298)
> at
> org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:403)
> at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:705)
> at org.apache.flink.runtime.taskmanager.Task.run(Task.java:530)
> at java.lang.Thread.run(Thread.java:748)
>
> On Wed, Jan 22, 2020 at 8:57 PM Fanbin Bu  wrote:
>
>> Jingsong,
>>
>> I had set the config value too large. After I changed it to a smaller
>> number, it works now!
>> Thank you for the help, I really appreciate it!
>>
>> Fanbin
>>
>> On Wed, Jan 22, 2020 at 8:50 PM Jingsong Li 
>> wrote:
>>
>>> Fanbin,
>>>
>>> Looks like your config is wrong, can you show your config code?
>>>
>>> Best,
>>> Jingsong Lee
>>>
>>> On Thu, Jan 23, 2020 at 12:41 PM Fanbin Bu 
>>> wrote:
>>>
 Jingsong,

 Great, now I got a different error:

 java.lang.NullPointerException: Initial Segment may not be null
at org.apache.flink.runtime.memory.AbstractPagedOutputView.<init>(AbstractPagedOutputView.java:65)
at org.apache.flink.runtime.io.disk.SimpleCollectingOutputView.<init>(SimpleCollectingOutputView.java:49)
at org.apache.flink.table.runtime.operators.aggregate.BytesHashMap$RecordArea.<init>(BytesHashMap.java:522)
at org.apache.flink.table.runtime.operators.aggregate.BytesHashMap.<init>(BytesHashMap.java:190)
at org.apache.flink.table.runtime.operators.aggregate.BytesHashMap.<init>(BytesHashMap.java:149)
at LocalHashWinAggWithKeys$292.open(Unknown Source)
at org.apache.flink.streaming.runtime.tasks.StreamTask.openAllOperators(StreamTask.java:529)
at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:393)
at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:705)
at org.apache.flink.runtime.taskmanager.Task.run(Task.java:530)
at java.lang.Thread.run(Thread.java:748)


 Is there any other config I should add?

 thanks,

 Fanbin


 On Wed, Jan 22, 2020 at 8:07 PM Fanbin Bu 
 wrote:

> you beat me to it.
> let's me try that.
>
> On Wed, Jan 22, 2020 at 7:57 PM Jingsong Li 
> wrote:
>
>> Fanbin,
>>
>> Document is here:
>> https://ci.apache.org/projects/flink/flink-docs-release-1.9/dev/table/config.html
>> NOTE: you need to configure this in the TableConfig.
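>>
>> For example, a minimal sketch of setting it programmatically (tEnv stands
>> for whatever variable holds your TableEnvironment, and the 512 mb value is
>> only an illustration, size it to what your slots can afford):
>>
>> // increase the managed memory available to the hash aggregation
>> tEnv.getConfig().getConfiguration()
>>     .setString("table.exec.resource.hash-agg.memory", "512 mb");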
>>
>> Best,
>> Jingsong Lee
>>
>> On Thu, Jan 23, 2020 at 11:54 AM Fanbin Bu 
>> wrote:
>>
>>> Jingsong,
>>>
>>> Thank you for the response.
>>> Since I'm using Flink on EMR and the latest version there is 1.9 for now,
>>> the second option is ruled out, but I will keep it in mind for a future
>>> upgrade.
>>>
>>> I'm going to try the first option. It's probably a good idea to add
>>> that in the doc for example:
>>> https://ci.apache.org/projects/flink/flink-docs-stable/ops/config.html
>>>
>>> Thanks,
>>> Fanbin
>>>
>>> On Wed, Jan 22, 2020 at 7:08 PM Jingsong Li 
>>> wrote:
>>>
 Hi Fanbin,

 Thanks for using blink batch mode.

 The OOM is caused by the managed memory not being enough for the hash
 aggregation.

 There are three options you can choose from:

 1. Is your version Flink 1.9? 1.9 still uses a fixed memory
 configuration, so you need to increase the hash-aggregation memory:
 - table.exec.resource.hash-agg.memory: 1024 mb

 2. In 1.10, we use the slot's managed memory to dynamically size the real
 operator memory, so operators can use more managed memory and you don't need
 to configure the hash-agg memory anymore. You can try 1.10 RC0 [1]

 3. We can also use sort aggregation to avoid the OOM, but there is no
 config option for it yet; I created a JIRA to track it. [2]

 [1]
 http://apache-flink-mailing-list-archive.1008284.n3.nabble.com/ANNOUNCE-Apache-Flink-1-10-0-release-candidate-0