Re: Flink restart strategy on specific exception

2020-05-14 Thread Zhu Zhu
Ticket FLINK-17714 has been created to track this requirement.

Thanks,
Zhu Zhu

Till Rohrmann wrote on Wed, May 13, 2020 at 8:30 PM:

> Yes, you are right Zhu Zhu. Extending
> the RestartBackoffTimeStrategyFactoryLoader to also load custom
> RestartBackoffTimeStrategies sounds like a good improvement for the future.
>
> @Ken Krugler , the old RestartStrategy
> interface did not provide the cause of the failure, unfortunately.
>
> Cheers,
> Till
>
> On Wed, May 13, 2020 at 7:55 AM Zhu Zhu  wrote:
>
>> Hi Ken,
>>
>> The custom restart-strategy was an experimental feature and has been deprecated
>> since 1.10. [1]
>> That's why you cannot find any documentation for it.
>>
>> The old RestartStrategy was deprecated and replaced by
>> RestartBackoffTimeStrategy since 1.10
>> (unless you are using the legacy scheduler which was also deprecated).
>> The new restart strategy, RestartBackoffTimeStrategy, will be able to
>> know the exact failure cause.
>> However, the new restart strategy does not support customization at the
>> moment.
>> Your requirement sounds reasonable to me, and I think a custom (new) restart
>> strategy could be something to support later.
>>
>> @Till Rohrmann  @Gary Yao  what
>> do you think?
>>
>> [1]
>> https://lists.apache.org/thread.html/6ed95eb6a91168dba09901e158bc1b6f4b08f1e176db4641f79de765%40%3Cdev.flink.apache.org%3E
>>
>> Thanks,
>> Zhu Zhu
>>
>> Ken Krugler wrote on Wed, May 13, 2020 at 7:34 AM:
>>
>>> Hi Till,
>>>
>>> Sorry, missed the key question…in the RestartStrategy.restart() method,
>>> I don’t see any good way to get at the underlying exception.
>>>
>>> I can cast the RestartCallback to an ExecutionGraphRestartCallback, but
>>> I still need access to the private execGraph to be able to get at the
>>> failure info. Is there some other way in the restart handler to get at this?
>>>
>>> And yes, I meant to note you’d mentioned the required static method in
>>> your email, I was asking about documentation for it.
>>>
>>> Thanks,
>>>
>>> — Ken
>>>
>>> ===
>>> Sorry to resurface an ancient question, but is there a working example
>>> anywhere of setting a custom restart strategy?
>>>
>>> Asking because I’ve been wandering through the Flink 1.9 code base for a
>>> while, and the restart strategy implementation is…pretty tangled.
>>>
>>> From what I’ve been able to figure out, you have to provide a factory
>>> class, something like this:
>>>
>>> Configuration config = new Configuration();
>>> config.setString(ConfigConstants.RESTART_STRATEGY,
>>>     MyRestartStrategyFactory.class.getCanonicalName());
>>> StreamExecutionEnvironment env =
>>>     StreamExecutionEnvironment.createLocalEnvironment(4, config);
>>>
>>> That factory class should extend RestartStrategyFactory, but it also
>>> needs to implement a static method that looks like:
>>>
>>> public static MyRestartStrategyFactory createFactory(Configuration config) {
>>>     return new MyRestartStrategyFactory();
>>> }
>>>
>>> I wasn’t able to find any documentation that mentioned this particular
>>> method being a requirement.
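
For reference, here is a rough sketch of the wiring described above, written against
the 1.9-era (now deprecated) restart strategy SPI. Only RestartStrategyFactory, the
static createFactory method, RestartStrategy.restart() and RestartCallback are
confirmed by this thread; the package paths, exact method signatures, and the
triggerFullRecovery() call are assumptions that should be checked against the
Flink version actually in use.

import org.apache.flink.configuration.Configuration;
import org.apache.flink.runtime.concurrent.ScheduledExecutor;
import org.apache.flink.runtime.executiongraph.restart.RestartCallback;
import org.apache.flink.runtime.executiongraph.restart.RestartStrategy;
import org.apache.flink.runtime.executiongraph.restart.RestartStrategyFactory;

public class MyRestartStrategyFactory extends RestartStrategyFactory {

    // Flink looks this static method up reflectively when restart-strategy
    // is set to this class name.
    public static MyRestartStrategyFactory createFactory(Configuration config) {
        return new MyRestartStrategyFactory();
    }

    @Override
    public RestartStrategy createRestartStrategy() {
        return new RestartStrategy() {
            @Override
            public boolean canRestart() {
                // Always allow restarts; as discussed above, the failure cause
                // is not exposed through this legacy interface.
                return true;
            }

            @Override
            public void restart(RestartCallback restarter, ScheduledExecutor executor) {
                restarter.triggerFullRecovery();
            }
        };
    }
}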
>>>
>>> Also, the documentation at
>>> https://ci.apache.org/projects/flink/flink-docs-stable/ops/config.html#fault-tolerance
>>> doesn't mention that you can set a custom class name for the restart-strategy.
>>>
>>> Thanks,
>>>
>>> — Ken
>>>
>>>
>>> On Nov 22, 2018, at 8:18 AM, Till Rohrmann  wrote:
>>>
>>> Hi Kasif,
>>>
>>> I think in this situation it is best if you define your own custom
>>> RestartStrategy by specifying a class which has a `RestartStrategyFactory
>>> createFactory(Configuration configuration)` method as `restart-strategy:
>>> MyRestartStrategyFactoryFactory` in `flink-conf.yaml`.
>>>
>>> Cheers,
>>> Till
>>>
>>> On Thu, Nov 22, 2018 at 7:18 AM Ali, Kasif  wrote:
>>>
 Hello,



 Looking at the existing restart strategies, they are kind of generic. We
 have a requirement to restart the job only in the case of specific
 exceptions/issues.

 What would be the best way to have a restart strategy which is based
 on a few rules, like looking at a particular type of exception or some extra
 application-specific condition checks?



 Just as background, one specific issue which prompted this requirement
 is slots not getting released when the job finishes. In our applications,
 we keep track of submitted jobs along with the amount of parallelism allotted
 to each. Once a job finishes we assume that its slots are free and try to
 submit the next set of jobs, which at times fail with the error “not enough
 slots available”.



 So we think a job restart can solve this issue, but we want to restart
 only if this particular situation is encountered.



 Please let us know if there are better ways to solve this problem other
 than a restart strategy.



 Thanks,

 Kasif



 --

Flink suggestions;

2020-05-14 Thread Aissa Elaffani
Hello Guys,
I am a beginner in the field of real-time streaming and I am working with
Apache Flink, and I am not yet familiar with many of its features. I am
building an application in which I receive some sensor data in this format:
{"status": "Alerte", "classe": " ", "value": {"temperature": 15.7},
"mode": "ON", "equipementID": 1, "date": "2019-03-20 22:00", "sensorID":
9157}. Each sensor is installed on an equipment in a workshop in a factory
somewhere. My goal is:
If one sensor of a factory goes down (status = "Alerte"), I want the
status of the whole factory to become "Alerte". But the stream does not contain
the factory ID; there is another static data set source that contains the data
of the factories and the sensors that belong to each one.
So please, I would like to know the best way to do this, and the
aggregation that I need to do!
Sorry for disturbing you, I wish you all the best! And I hope you share
some of your experiences with me!
Best regards,
Aissa
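
A possible shape for the enrichment described above, sketched under the assumption
that the static sensor-to-factory mapping is small enough to load into memory in
each subtask. The SensorReading type, its fields, and the loading logic are
placeholders, not part of the original question; after enrichment the stream can be
keyed by factory ID and the per-factory status aggregated with a window or keyed
state. Flink's broadcast state pattern is the usual alternative when the mapping
itself changes over time.

import java.util.HashMap;
import java.util.Map;
import org.apache.flink.api.common.functions.RichMapFunction;
import org.apache.flink.configuration.Configuration;

// Hypothetical POJO with sensorID, status, and a factoryID field to be filled in.
public class EnrichWithFactory extends RichMapFunction<SensorReading, SensorReading> {

    private transient Map<Integer, Integer> sensorToFactory;

    @Override
    public void open(Configuration parameters) {
        // Load the static sensor -> factory mapping once per subtask,
        // e.g. from a file or a small reference table.
        sensorToFactory = new HashMap<>();
        // sensorToFactory.put(9157, 1); ...
    }

    @Override
    public SensorReading map(SensorReading reading) {
        // Attach the factory ID so the stream can be keyed by factory downstream.
        reading.setFactoryID(sensorToFactory.get(reading.getSensorID()));
        return reading;
    }
}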


Flink performance tuning on operators

2020-05-14 Thread Ivan Yang
Hi,

We have a Flink job that reads data from an input stream, then converts each
event from a JSON string to an Avro object, and finally writes to parquet files using
StreamingFileSink with an OnCheckpointRollingPolicy of 5 mins. It is basically a
stateless job. Initially, we used one map operator to convert the JSON string to an
Avro object; inside the map function, it goes from String -> JsonObject -> Avro
object.

DataStream avroData = data.map(new JsonToAVRO());

We then tried breaking the map operator into two, one for String to JsonObject,
and another for JsonObject to Avro:

DataStream jsonData = data.map(new StringToJson());
DataStream avroData = jsonData.map(new JsonToAvroSchema());

The benchmark shows a significant performance hit when breaking it down into two
operators. We are trying to understand the Flink internals to see why there is such a big
difference. The setup uses state backend = filesystem and checkpoints on an S3
bucket. Our event object has 300+ attributes.
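
One factor that might be worth ruling out, offered as a hedged suggestion rather
than a diagnosis: even if the two map operators end up chained into the same task,
records handed from one chained operator to the next are, by default, defensively
copied through their serializers, and copying a 300+ attribute object per element is
not free. A minimal sketch of the object-reuse toggle that skips that copy (the rest
of the pipeline is assumed to be as described above):

import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
// Skip the defensive copy between chained operators. Only safe if the user
// functions do not cache or mutate input records after emitting them.
env.getConfig().enableObjectReuse();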


Thanks
Ivan

Re: Protection against huge values in RocksDB List State

2020-05-14 Thread Yun Tang
Hi Robin

First of all, the root cause is not that RocksDB cannot store a large list state when
you merge, but the JNI limitation of 2^31 bytes [1].
Moreover, RocksDB Java does not return anything when you call the merge [2]
operator.

Did you merge too many elements, or just merge elements that are too large? Last but
not least, even if you could merge a large list, I think reading back a value larger
than 2^31 bytes would not behave well anyway.


[1] 
https://ci.apache.org/projects/flink/flink-docs-stable/ops/state/state_backends.html#the-rocksdbstatebackend
[2] 
https://github.com/facebook/rocksdb/blob/50d63a2af01a46dd938dc1b717067339c92da040/java/src/main/java/org/rocksdb/RocksDB.java#L1382

Best
Yun Tang

From: Robin Cassan 
Sent: Friday, May 15, 2020 0:37
To: user 
Subject: Protection against huge values in RocksDB List State

Hi all!

I cannot seem to find any setting to limit the number of records appended in a 
RocksDBListState that is used when we use SessionWindows with a ProcessFunction.
It seems that, for each incoming element, the new element will be appended to 
the value with the RocksDB `merge` operator, without any safeguard to make sure 
that it doesn't grow infinitely. RocksDB merge seems to support returning false 
in case of error, so I guess we could implement a limit by returning false in 
the merge operator, but since Flink seems to use the "stringappendtest" merge 
operator ( 
https://github.com/facebook/rocksdb/blob/fdf882ded218344c136c97daf76dfb59e4bc155f/utilities/merge_operators/string_append/stringappend2.cc
 ), we always return true no matter what.

This is troublesome for us because it would make a lot of sense to specify an 
acceptable limit to how many elements can be aggregated under a given key, and 
because when we happen to have too many elements we get an exception from 
RocksDB:
```
Caused by: org.apache.flink.util.FlinkRuntimeException: Error while retrieving 
data from RocksDB
at 
org.apache.flink.contrib.streaming.state.RocksDBListState.getInternal(RocksDBListState.java:121)
at 
org.apache.flink.contrib.streaming.state.RocksDBListState.get(RocksDBListState.java:111)
at 
org.apache.flink.contrib.streaming.state.RocksDBListState.get(RocksDBListState.java:60)
at 
org.apache.flink.streaming.runtime.operators.windowing.WindowOperator.onProcessingTime(WindowOperator.java:501)
at 
org.apache.flink.streaming.api.operators.InternalTimerServiceImpl.onProcessingTime(InternalTimerServiceImpl.java:260)
at 
org.apache.flink.streaming.runtime.tasks.SystemProcessingTimeService$TriggerTask.run(SystemProcessingTimeService.java:281)
... 7 more
Caused by: org.rocksdb.RocksDBException: Requested array size exceeds VM limit
at org.rocksdb.RocksDB.get(Native Method)
at org.rocksdb.RocksDB.get(RocksDB.java:810)
at 
org.apache.flink.contrib.streaming.state.RocksDBListState.getInternal(RocksDBListState.java:118)
... 12 more
```

We are currently bypassing this by using a Reduce operator instead, which 
ensures that we only store one element per key, but this gives us degraded 
performance.

Thanks for your input!
Robin


Re: upgrade flink from 1.9.1 to 1.10.0 on EMR

2020-05-14 Thread aj
Hi Yang,

I was able to resolve the issue by removing the Hadoop dependency as you
mentioned.

1. Removed the hadoop-common dependency and excluded flink-hadoop-fs from
flink-streaming-java:

<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-streaming-java_${scala.binary.version}</artifactId>
    <version>${flink.version}</version>
    <exclusions>
        <exclusion>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-hadoop-fs</artifactId>
        </exclusion>
    </exclusions>
</dependency>

2. After the above changes, I am able to submit the job on YARN, but I faced
issues with the s3 plugin, which I resolved by putting
flink-s3-fs-hadoop-1.9.0.jar in the plugins/s3-fs-hadoop directory.

Thanks for your support.

Any update on when Flink 1.10 will be officially supported in EMR? Even in the new EMR
version (EMR 6.0), Flink has been removed.



On Sat, May 9, 2020 at 1:36 PM aj  wrote:

> Hello Yang,
>
> I have attached my pom file and I do not see that I am using any Hadoop
> dependency. Can you please help me?
>
> On Wed, May 6, 2020 at 1:22 PM Yang Wang  wrote:
>
>> Hi aj,
>>
>> From the logs you have provided, the hadoop version is still 2.4.1.
>> Could you check whether the user jar (i.e. events-processor-1.0-SNAPSHOT.jar) has
>> some hadoop classes? If it does, you need to exclude the hadoop dependency.
>>
>>
>> Best,
>> Yang
>>
>> aj wrote on Wed, May 6, 2020 at 3:38 PM:
>>
>>> Hello,
>>>
>>> Please help me upgrade to 1.10 in AWS EMR.
>>>
>>> On Fri, May 1, 2020 at 4:05 PM aj  wrote:
>>>
 Hi Yang,

 I am attaching the logs for your reference; please help me figure out what I am
 doing wrong.

 Thanks,
 Anuj

 On Wed, Apr 29, 2020 at 9:06 AM Yang Wang 
 wrote:

> Hi Anuj,
>
> I think the exception you came across is still because the hadoop version
> is 2.4.1. I have checked the hadoop code; the code lines are exactly the
> same.
> For 2.8.1, I have also checked the ruleParser. It should work.
>
> /**
>  * A pattern for parsing a auth_to_local rule.
>  */
> private static final Pattern ruleParser =
>     Pattern.compile("\\s*((DEFAULT)|(RULE:\\[(\\d*):([^\\]]*)](\\(([^)]*)\\))?"+
>         "(s/([^/]*)/([^/]*)/(g)?)?))/?(L)?");
>
>
> Could you share the jobmanager logs so that i could check the
> classpath and hadoop version?
>
> Best,
> Yang
>
> aj wrote on Tue, Apr 28, 2020 at 1:01 AM:
>
>> Hello Yang,
>> My Hadoop version is Hadoop 3.2.1-amzn-0,
>> and I have put flink-shaded-hadoop-2-uber-2.8.3-10.0.jar in flink/lib.
>>
>> Then I am getting the below error:
>>
>> SLF4J: Class path contains multiple SLF4J bindings.
>> SLF4J: Found binding in
>> [jar:file:/mnt/yarn/usercache/hadoop/appcache/application_1587983834922_0002/filecache/10/slf4j-log4j12-1.7.15.jar!/org/slf4j/impl/StaticLoggerBinder.class]
>> SLF4J: Found binding in
>> [jar:file:/usr/lib/hadoop/lib/slf4j-log4j12-1.7.25.jar!/org/slf4j/impl/StaticLoggerBinder.class]
>> SLF4J: See http://www.slf4j.org/codes.html#multiple_bindings for an
>> explanation.
>> SLF4J: Actual binding is of type [org.slf4j.impl.Log4jLoggerFactory]
>> Exception in thread "main" java.lang.IllegalArgumentException:
>> Invalid rule: /L
>>   RULE:[2:$1@$0](.*@)s/@.*///L
>>   DEFAULT
>> at
>> org.apache.hadoop.security.authentication.util.KerberosName.parseRules(KerberosName.java:321)
>> at
>> org.apache.hadoop.security.authentication.util.KerberosName.setRules(KerberosName.java:386)
>> at
>> org.apache.hadoop.security.HadoopKerberosName.setConfiguration(HadoopKerberosName.java:75)
>> at
>> org.apache.hadoop.security.UserGroupInformation.initialize(UserGroupInformation.java:247)
>> at
>> org.apache.hadoop.security.UserGroupInformation.ensureInitialized(UserGroupInformation.java:232)
>> at
>> org.apache.hadoop.security.UserGroupInformation.loginUserFromSubject(UserGroupInformation.java:718)
>> at
>> org.apache.hadoop.security.UserGroupInformation.getLoginUser(UserGroupInformation.java:703)
>> at
>> org.apache.hadoop.security.UserGroupInformation.getCurrentUser(UserGroupInformation.java:605)
>> at
>> org.apache.flink.yarn.entrypoint.YarnEntrypointUtils.logYarnEnvironmentInformation(YarnEntrypointUtils.java:136)
>> at
>> org.apache.flink.yarn.entrypoint.YarnJobClusterEntrypoint.main(YarnJobClusterEntrypoint.java:109)
>>
>>
>> If I remove the flink-shaded-hadoop-2-uber-2.8.3-10.0.jar from lib,
>> then I get the below error:
>>
>> 2020-04-27 16:59:37,293 INFO  org.apache.flink.client.cli.CliFrontend
>>   -  Classpath:
>> /usr/lib/flink/lib/flink-table-blink_2.11-1.10.0.jar:/usr/lib/flink/lib/flink-table_2.11-1.10.0.jar:/usr/lib/flink/lib/log4j-1.2.17.jar:/usr/lib/flink/lib/slf4j-log4j12-1.7.15.jar:/usr/lib/flink/lib/flink-dist_2.11-1.10.0.jar::/etc/hadoop/conf:/etc/hadoop/conf
>> 2020-04-27 16:59:37,293 INFO  org.apache.flink.client.cli.CliFrontend
>>   -
>>

How to read UUID out of a JDBC table source

2020-05-14 Thread Bonino Dario

Dear list,

I need to use a Table Source to extract data from a PostgreSQL table
that includes a column of type uuid. Data in the column is converted to
java.util.UUID by the postgresql jdbc driver (I guess); however, I was not
able to find a way to define a Table schema for correctly reading that
column. Selecting RAW(TypeInformation.of(UUID.class)) does not work,
producing the following error:


org.apache.flink.table.api.ValidationException: Type 
RAW('java.util.UUID', ?) of table field 'datastream' does not match with 
the physical type LEGACY('RAW', 'ANY') of the 
'datastream' field of the TableSource return type.


Did someone encounter the same issue? Does someone have any suggestions
on how to proceed?



Thank you very much

Dario


--
Ing. Dario Bonino, Ph.D

e-m@il: dario.bon...@gmail.com
www: https://www.linkedin.com/in/dariobonino





Protection against huge values in RocksDB List State

2020-05-14 Thread Robin Cassan
Hi all!

I cannot seem to find any setting to limit the number of records appended
in a RocksDBListState that is used when we use SessionWindows with a
ProcessFunction.
It seems that, for each incoming element, the new element will be appended
to the value with the RocksDB `merge` operator, without any safeguard to
make sure that it doesn't grow infinitely. RocksDB merge seems to support
returning false in case of error, so I guess we could implement a limit by
returning false in the merge operator, but since Flink seems to use the
"stringappendtest" merge operator (
https://github.com/facebook/rocksdb/blob/fdf882ded218344c136c97daf76dfb59e4bc155f/utilities/merge_operators/string_append/stringappend2.cc
),
we always return true no matter what.

This is troublesome for us because it would make a lot of sense to specify
an acceptable limit to how many elements can be aggregated under a given
key, and because when we happen to have too many elements we get an
exception from RocksDB:
```
Caused by: org.apache.flink.util.FlinkRuntimeException: Error while
retrieving data from RocksDB
at
org.apache.flink.contrib.streaming.state.RocksDBListState.getInternal(RocksDBListState.java:121)
at
org.apache.flink.contrib.streaming.state.RocksDBListState.get(RocksDBListState.java:111)
at
org.apache.flink.contrib.streaming.state.RocksDBListState.get(RocksDBListState.java:60)
at
org.apache.flink.streaming.runtime.operators.windowing.WindowOperator.onProcessingTime(WindowOperator.java:501)
at
org.apache.flink.streaming.api.operators.InternalTimerServiceImpl.onProcessingTime(InternalTimerServiceImpl.java:260)
at
org.apache.flink.streaming.runtime.tasks.SystemProcessingTimeService$TriggerTask.run(SystemProcessingTimeService.java:281)
... 7 more
Caused by: org.rocksdb.RocksDBException: Requested array size exceeds VM
limit
at org.rocksdb.RocksDB.get(Native Method)
at org.rocksdb.RocksDB.get(RocksDB.java:810)
at
org.apache.flink.contrib.streaming.state.RocksDBListState.getInternal(RocksDBListState.java:118)
... 12 more
```

We are currently bypassing this by using a Reduce operator instead, which
ensures that we only store one element per key, but this gives us degraded
performance.
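
A hedged sketch of that Reduce-based workaround, assuming a session window and a
placeholder Event type with a user-defined merge; only the running aggregate is
kept in state per key, so the RocksDB value can no longer grow without bound:

import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.windowing.assigners.EventTimeSessionWindows;
import org.apache.flink.streaming.api.windowing.time.Time;

// 'events' is the incoming DataStream<Event>; Event, getKey() and merge() are
// placeholders for the job's real types and aggregation logic.
DataStream<Event> aggregated = events
        .keyBy(e -> e.getKey())
        .window(EventTimeSessionWindows.withGap(Time.minutes(30)))
        .reduce((a, b) -> Event.merge(a, b));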

Thanks for your input!
Robin


Re: Watermarks and parallelism

2020-05-14 Thread Alexander Fedulov
Hi Gnana,

1. No, watermarks are generated independently per subtask. I think this
section of the docs might make things clearer - [1].

2. The same watermark from the input of the keyBy will be dispatched to all
of the instances of the downstream keyed operator. That said, there is no
global coordination between the subtasks. The same watermark can arrive at
the downstream subtask at a different time, depending on how much time
they'd spend on the input channels. Notice also that watermarks are managed
on the subtask level, not at the level of the individual keys.

3. I am not quite sure I get what you mean by this one and what exactly you
are trying to achieve. I assume you want to basically have parallel windows that
are scoped to all of the items coming from a corresponding subtask of the
previous non-keyed operator. As Flink windows can be executed in parallel
only on keyed streams, you could do a little trick - use
`reinterpretAsKeyedStream` [2].
This will make it possible to basically have a "passthrough" partitioning,
without an actual data shuffle. Another alternative would be to implement
your Map function as a RichMapFunction, which gives you the access to the
runtime context. From there:
1) use `getRuntimeContext().getIndexOfThisSubtask();` to retrieve the ID of
the current subtask
2) enrich your events with a new field, containing the subtask ID
3) use this ID as the key in your keyBy operator
The problem is that both of those approaches will be non-deterministic in
terms of state recovery when, for instance, you would like to scale out
your job to a higher degree of parallelism. You'd need to decide if this is
relevant for your use case.
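
A minimal sketch of the RichMapFunction variant described in steps 1)-3) above;
the MyEvent type and its subtaskId field are placeholders for the actual event class:

import org.apache.flink.api.common.functions.RichMapFunction;

public class TagWithSubtask extends RichMapFunction<MyEvent, MyEvent> {
    @Override
    public MyEvent map(MyEvent event) {
        // 1) retrieve the ID of the current subtask
        int subtask = getRuntimeContext().getIndexOfThisSubtask();
        // 2) enrich the event with the subtask ID
        event.setSubtaskId(subtask);
        return event;
    }
}

// 3) downstream: stream.map(new TagWithSubtask()).keyBy(e -> e.getSubtaskId()).window(...)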

[1]
https://ci.apache.org/projects/flink/flink-docs-master/concepts/timely-stream-processing.html#watermarks-in-parallel-streams
[2]
https://ci.apache.org/projects/flink/flink-docs-stable/dev/stream/experimental.html#reinterpreting-a-pre-partitioned-data-stream-as-keyed-stream

Best,

--

Alexander Fedulov | Solutions Architect

+49 1514 6265796



Follow us @VervericaData

--

Join Flink Forward  - The Apache Flink
Conference

Stream Processing | Event Driven | Real Time

--

Ververica GmbH | Invalidenstrasse 115, 10115 Berlin, Germany

--

Ververica GmbH
Registered at Amtsgericht Charlottenburg: HRB 158244 B
Managing Directors: Timothy Alexander Steinert, Yip Park Tung Jason, Ji
(Tony) Cheng



On Thu, May 14, 2020 at 6:14 AM Gnanasoundari Soundarajan <
gnanasoundari.soundara...@man-es.com> wrote:

> Hi all,
>
>
>
> I have below queries in flink. Could anyone help me to understand?
>
>
>
> *Query:*
>
> 1. Is the watermark maintained globally at the operator level?
>
> 2. When we have a keyBy operator with parallelism > 1, is there a single
> watermark maintained across all the parallel subtasks, or one for each of the
> parallel subtasks?
>
> 3. Assuming I have a keyBy operator with parallelism > 1, is it possible
> to feed data to this operator from only one stream from the previous
> operator (say map(1) always goes to window(1))?
>
>
>
> Regards,
>
> Gnana
>


[Announce] Flink Forward Global 2020 - Call for Proposals

2020-05-14 Thread Seth Wiesman
Hi Everyone!


After a successful Virtual Flink Forward in April, we have decided to
present our October edition in the same way. In these uncertain times, we
are conscious of everyone's health and safety and want to make sure our
events are accessible for everyone.


Flink Forward Global Conference 2020 will take place virtually, October
19-21, and the Call for Presentations and pre-registration is now open!


Flink Forward Global will include a virtual training day, followed by two
days of keynotes and breakout sessions. The conference will be free to
attend, and there will be a limited number of paid spots available for the
training.


Conference tracks include:

   - Use Cases
   - Operations
   - Technology Deep Dive
   - Ecosystem
   - Community and Industry Impact


You can find more detailed track descriptions and the form to submit a
proposal on the Flink Forward website at:


https://www.flink-forward.org/global-2020/call-for-presentations


The deadline for submissions is June 19th, 2020. If you have any questions
please feel free to reach out to me here or directly.


Best,


Seth

PC Chair - Flink Forward Global 2020

Twitter: @sjwiesman


Re: ProducerRecord with Kafka Sink for 1.8.0

2020-05-14 Thread Nick Bendtner
Hi Gary,
I have used this technique before. I deleted the flink-avro jar from lib and
packed it into the application jar, and there are no problems.

Best,
Nick

On Thu, May 14, 2020 at 6:11 AM Gary Yao  wrote:

> Its because the flink distribution of the cluster is 1.7.2. We use a
>> standalone cluster , so in the lib directory in flink the artifact is
>> flink-core-1.7.2.jar . I need to pack flink-core-1.9.0.jar from application
>> and use child first class loading to use newer version of flink-core.
>>
>
> Do you have experience with this technique in production? In general I do
> not think this can work; a job pretending to run a newer version of Flink
> generally cannot communicate with an older JobManager, which normally does
> not even run user code.
>
> If you are stuck with Flink 1.8, maybe it is an option for you to backport
> FLINK-11693 to Flink 1.8 yourself and build a custom Kafka connector.
>
>
> On Tue, May 12, 2020 at 10:04 PM Nick Bendtner  wrote:
> >
> > Hi Gary,
> > Its because the flink distribution of the cluster is 1.7.2. We use a
> standalone cluster , so in the lib directory in flink the artifact is
> flink-core-1.7.2.jar . I need to pack flink-core-1.9.0.jar from application
> and use child first class loading to use newer version of flink-core. If I
> have it as provided scope, sure it will work in IntelliJ but not outside of
> it .
> >
> > Best,
> > Nick
> >
> > On Tue, May 12, 2020 at 2:53 PM Gary Yao  wrote:
> >>
> >> Hi Nick,
> >>
> >> Can you explain why it is required to package flink-core into your
> >> application jar? Usually flink-core is a dependency with provided
> >> scope [1]
> >>
> >> Best,
> >> Gary
> >>
> >> [1]
> https://maven.apache.org/guides/introduction/introduction-to-dependency-mechanism.html#dependency-scope
> >>
> >> On Tue, May 12, 2020 at 5:41 PM Nick Bendtner 
> wrote:
> >> >
> >> > Hi Gary,
> >> > Thanks for the info. I am aware this feature is available in 1.9.0
> onwards. Our cluster is still very old and have CICD challenges,I was
> hoping not to bloat up the application jar by packaging even flink-core
> with it. If its not possible to do this with older version without writing
> our own kafka sink implementation similar to the flink provided version in
> 1.9.0 then I think we will pack flink-core 1.9.0 with the application and
> follow the approach that you suggested. Thanks again for getting back to me
> so quickly.
> >> >
> >> > Best,
> >> > Nick
> >> >
> >> > On Tue, May 12, 2020 at 3:37 AM Gary Yao  wrote:
> >> >>
> >> >> Hi Nick,
> >> >>
> >> >> Are you able to upgrade to Flink 1.9? Beginning with Flink 1.9 you
> can use
> >> >> KafkaSerializationSchema to produce a ProducerRecord [1][2].
> >> >>
> >> >> Best,
> >> >> Gary
> >> >>
> >> >> [1] https://issues.apache.org/jira/browse/FLINK-11693
> >> >> [2]
> https://ci.apache.org/projects/flink/flink-docs-release-1.9/api/java/org/apache/flink/streaming/connectors/kafka/KafkaSerializationSchema.html
> >> >>
> >> >> On Mon, May 11, 2020 at 10:59 PM Nick Bendtner 
> wrote:
> >> >> >
> >> >> > Hi guys,
> >> >> > I use 1.8.0 version for flink-connector-kafka. Do you have any
> recommendations on how to produce a ProducerRecord from a kafka sink.
> Looking to add support to kafka headers therefore thinking about
> ProducerRecord. If you have any thoughts its highly appreciated.
> >> >> >
> >> >> > Best,
> >> >> > Nick.
>


Re: Statefun 2.0 questions

2020-05-14 Thread Igal Shilman
Hi,
I'm glad things are getting clearer, looking forward to seeing how statefun
is working out for you :-)

To change the parallelism you can simply set the "parallelism.default" [1]
key in flink-conf.yaml.
It is located in the statefun container at /opt/flink/conf/flink-conf.yaml.
To avoid rebuilding the container you can mount the flink-conf.yaml
externally, and if you are using Kubernetes then
simply define flink-conf.yaml as a config map.

[1]
https://ci.apache.org/projects/flink/flink-docs-stable/ops/config.html#basic-setup

Good luck,
Igal.

On Wed, May 13, 2020 at 11:55 AM Wouter Zorgdrager 
wrote:

> Dear Igal, all,
>
> Thanks a lot. This is very helpful. I understand the architecture a bit
> more now. We can just scale the stateful functions and put a load balancer
> in front and Flink will contact them. The only part of the scaling I don't
> understand yet is how to scale the 'Flink side'. So if I understand
> correctly, the Kafka ingress/egress part runs on the Flink cluster and
> contacts the remote workers through HTTP. How can I scale this Kafka part
> then? For a normal Flink job I would just change the parallelism, but I
> couldn't really find that option yet. Is there some value I need to set in
> the module.yaml?
>
> Once again, thanks for the help so far. It has been useful.
>
> Regards,
> Wouter
>
> On Wed, May 13, 2020 at 00:03, Igal Shilman wrote:
>
>> Hi Wouter,
>>
>> Triggering a stateful function from a frontend indeed requires an ingress
>> between them, so the way you've approached this is also the way we were
>> thinking of.
>> As Gordon mentioned a potential improvement might be an HTTP ingress,
>> that would allow triggering stateful functions directly from the front end
>> servers.
>> But this kind of ingress is not implemented yet.
>>
>> Regarding scaling: Your understanding is correct, you can scale both the
>> Flink cluster and the remote "python-stateful-function" cluster
>> independently.
>> Scaling the Flink cluster, though, requires taking a savepoint, bumping the
>> job parallelism, and starting the cluster with more workers from the
>> savepoint taken previously.
>>
>> Scaling "python-stateful-function" workers can be done transparently to
>> the Flink cluster, but the exact details are deployment specific.
>> - For example the python workers are a k8s service.
>> - Or the python workers are deployed behind a load balancer
>> - Or you add new entries to the DNS record of your python worker.
>>
>> I didn't understand "ensuring that it ends up in the correct Flink job" -
>> can you please clarify?
>> Flink would be the one contacting the remote workers and not the other
>> way around. So as long as the new instances
>> are visible to Flink they would be reached with the same shared state.
>>
>> I'd recommend watching [1] and the demo at the end, and [2] for a demo
>> using stateful functions on AWS lambda.
>>
>> [1] https://youtu.be/NF0hXZfUyqE
>> [2] https://www.youtube.com/watch?v=tuSylBadNSo
>>
>> It seems like you are on the correct path!
>> Good luck!
>> Igal.
>>
>>
>> On Tue, May 12, 2020 at 11:18 PM Wouter Zorgdrager 
>> wrote:
>>
>>> Hi Igal, all,
>>>
>>> In the meantime we found a way to serve Flink stateful functions in a
>>> frontend. We decided to add another (set of) Flask application(s) which
>>> link to Kafka topics. These Kafka topics then serve as ingress and egress
>>> for the statefun cluster. However, we're wondering how we can scale this
>>> cluster. On the documentation page some nice figures are provided for
>>> different setups but no implementation details are given. In our case we
>>> are using a remote cluster so we have a Docker instance containing the
>>> `python-stateful-function` and of course the Flink cluster containing a
>>> `master` and `worker`. If I understood correctly, in a remote setting, we
>>> can scale both the Flink cluster and the `python-stateful-function`.
>>> Scaling the Flink cluster is trivial because I can add just more
>>> workers/task-managers (providing more taskslots) just by scaling the worker
>>> instance. However, how can I scale the stateful function while also ensuring that
>>> it ends up in the correct Flink job (because we need shared state there)? I
>>> tried scaling the Docker instance as well, but that didn't seem to work.
>>>
>>> Hope you can give me some leads there.
>>> Thanks in advance!
>>>
>>> Kind regards,
>>> Wouter
>>>
>>> On Thu, May 7, 2020 at 17:17, Wouter Zorgdrager <
>>> zorgdrag...@gmail.com> wrote:
>>>
 Hi Igal,

 Thanks for your quick reply. Getting back to point 2, I was wondering
 if you could indeed trigger a stateful function directly from Flask and
 also get the reply there, instead of using Kafka in between. We want to
 experiment with running stateful functions behind a front-end (which should be
 able to trigger a function), but we're a bit afraid that using Kafka
 doesn't scale well if on the frontend side a user has to consume all Kafka
 messages to find the co

Re: ProducerRecord with Kafka Sink for 1.8.0

2020-05-14 Thread Gary Yao
>
> Its because the flink distribution of the cluster is 1.7.2. We use a
> standalone cluster , so in the lib directory in flink the artifact is
> flink-core-1.7.2.jar . I need to pack flink-core-1.9.0.jar from application
> and use child first class loading to use newer version of flink-core.
>

Do you have experience with this technique in production? In general I do
not think this can work; a job pretending to run a newer version of Flink
generally cannot communicate with an older JobManager, which normally does
not even run user code.

If you are stuck with Flink 1.8, maybe it is an option for you to backport
FLINK-11693 to Flink 1.8 yourself and build a custom Kafka connector.
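
For reference on the 1.9+ route mentioned above, here is a hedged sketch of a
KafkaSerializationSchema that builds a ProducerRecord with a header, which is the
use case Nick described. The event type, topic name, and header are placeholders;
such a schema would be handed to the universal FlinkKafkaProducer constructor that
accepts a KafkaSerializationSchema.

import java.nio.charset.StandardCharsets;
import org.apache.flink.streaming.connectors.kafka.KafkaSerializationSchema;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.header.internals.RecordHeader;

// MyEvent, "my-topic" and the "source" header are illustrative placeholders.
public class MyEventSerializationSchema implements KafkaSerializationSchema<MyEvent> {

    @Override
    public ProducerRecord<byte[], byte[]> serialize(MyEvent element, Long timestamp) {
        ProducerRecord<byte[], byte[]> record = new ProducerRecord<>(
                "my-topic",
                element.getKey().getBytes(StandardCharsets.UTF_8),
                element.toJson().getBytes(StandardCharsets.UTF_8));
        // Kafka headers can be attached directly on the record.
        record.headers().add(new RecordHeader("source", "flink".getBytes(StandardCharsets.UTF_8)));
        return record;
    }
}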

On Tue, May 12, 2020 at 10:04 PM Nick Bendtner  wrote:
>
> Hi Gary,
> Its because the flink distribution of the cluster is 1.7.2. We use a
standalone cluster , so in the lib directory in flink the artifact is
flink-core-1.7.2.jar . I need to pack flink-core-1.9.0.jar from application
and use child first class loading to use newer version of flink-core. If I
have it as provided scope, sure it will work in IntelliJ but not outside of
it .
>
> Best,
> Nick
>
> On Tue, May 12, 2020 at 2:53 PM Gary Yao  wrote:
>>
>> Hi Nick,
>>
>> Can you explain why it is required to package flink-core into your
>> application jar? Usually flink-core is a dependency with provided
>> scope [1]
>>
>> Best,
>> Gary
>>
>> [1]
https://maven.apache.org/guides/introduction/introduction-to-dependency-mechanism.html#dependency-scope
>>
>> On Tue, May 12, 2020 at 5:41 PM Nick Bendtner  wrote:
>> >
>> > Hi Gary,
>> > Thanks for the info. I am aware this feature is available in 1.9.0
onwards. Our cluster is still very old and have CICD challenges,I was
hoping not to bloat up the application jar by packaging even flink-core
with it. If its not possible to do this with older version without writing
our own kafka sink implementation similar to the flink provided version in
1.9.0 then I think we will pack flink-core 1.9.0 with the application and
follow the approach that you suggested. Thanks again for getting back to me
so quickly.
>> >
>> > Best,
>> > Nick
>> >
>> > On Tue, May 12, 2020 at 3:37 AM Gary Yao  wrote:
>> >>
>> >> Hi Nick,
>> >>
>> >> Are you able to upgrade to Flink 1.9? Beginning with Flink 1.9 you
can use
>> >> KafkaSerializationSchema to produce a ProducerRecord [1][2].
>> >>
>> >> Best,
>> >> Gary
>> >>
>> >> [1] https://issues.apache.org/jira/browse/FLINK-11693
>> >> [2]
https://ci.apache.org/projects/flink/flink-docs-release-1.9/api/java/org/apache/flink/streaming/connectors/kafka/KafkaSerializationSchema.html
>> >>
>> >> On Mon, May 11, 2020 at 10:59 PM Nick Bendtner 
wrote:
>> >> >
>> >> > Hi guys,
>> >> > I use 1.8.0 version for flink-connector-kafka. Do you have any
recommendations on how to produce a ProducerRecord from a kafka sink.
Looking to add support to kafka headers therefore thinking about
ProducerRecord. If you have any thoughts its highly appreciated.
>> >> >
>> >> > Best,
>> >> > Nick.


Re: How To subscribe a Kinesis Stream using enhance fanout?

2020-05-14 Thread Xiaolong Wang
Thanks, I'll check it out.

On Thu, May 14, 2020 at 6:26 PM Tzu-Li (Gordon) Tai 
wrote:

> Hi Xiaolong,
>
> You are right, the way the Kinesis connector is implemented / the way the
> AWS APIs are used, does not allow it to consume Kinesis streams with
> enhanced fan-out enabled consumers [1].
> Could you open a JIRA ticket for this?
> As far as I can tell, this could be a valuable contribution to the
> connector for Kinesis users who require dedicated throughput isolated from
> other running consumers.
>
> Cheers,
> Gordon
>
> [1]
> https://docs.aws.amazon.com/streams/latest/dev/building-enhanced-consumers-api.html
>
> On Wed, May 13, 2020 at 1:44 PM Xiaolong Wang 
> wrote:
>
>> Hello Flink Community!
>>
>>   I'm currently coding on a project relying on AWS Kinesis. With the
>> provided connector (flink-connector-kinesis_2.11;1.10.0), I can consume the
>> message.
>>
>>  But as the main stream is used among several other teams, I was
>> required to use the enhanced fan-out of Kinesis. I checked the connector code
>> and found no implementation of it.
>>
>>  Has this issue occurred to anyone before?
>>
>> Thanks for your help.
>>
>


Re: How To subscribe a Kinesis Stream using enhance fanout?

2020-05-14 Thread Tzu-Li (Gordon) Tai
Hi Xiaolong,

You are right, the way the Kinesis connector is implemented / the way the
AWS APIs are used, does not allow it to consume Kinesis streams with
enhanced fan-out enabled consumers [1].
Could you open a JIRA ticket for this?
As far as I can tell, this could be a valuable contribution to the
connector for Kinesis users who require dedicated throughput isolated from
other running consumers.

Cheers,
Gordon

[1]
https://docs.aws.amazon.com/streams/latest/dev/building-enhanced-consumers-api.html

On Wed, May 13, 2020 at 1:44 PM Xiaolong Wang 
wrote:

> Hello Flink Community!
>
>   I'm currently coding on a project relying on AWS Kinesis. With the
> provided connector (flink-connector-kinesis_2.11;1.10.0), I can consume the
> message.
>
>  But as the main stream is used among several other teams, I was
> required to use the enhanced fan-out of Kinesis. I checked the connector code
> and found no implementation of it.
>
>  Has this issue occurred to anyone before?
>
> Thanks for your help.
>


Re: Flink operator throttle

2020-05-14 Thread Benchao Li
AFAIK, `FlinkKafkaConsumer010#setRateLimiter` can configure the Kafka
source to have a rate limiter.
(I assume you use Kafka.)
However, it only exists in the Kafka 0.10 DataStream connector, not in other
versions nor in the Table API.
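
A hedged sketch of what that hook looks like, assuming the Guava-based rate limiter
that ships with the connector; the topic, properties, and rate value are
placeholders, and the exact rate semantics should be checked against the connector
version in use:

import java.util.Properties;
import org.apache.flink.api.common.io.ratelimiting.GuavaFlinkConnectorRateLimiter;
import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer010;

Properties props = new Properties();
props.setProperty("bootstrap.servers", "localhost:9092");
props.setProperty("group.id", "my-group");

FlinkKafkaConsumer010<String> consumer =
        new FlinkKafkaConsumer010<>("my-topic", new SimpleStringSchema(), props);

// Target rate in bytes per second for the source (assumption: check the limiter's
// javadoc for the exact unit and how it is split across parallel subtasks).
GuavaFlinkConnectorRateLimiter rateLimiter = new GuavaFlinkConnectorRateLimiter();
rateLimiter.setRate(1_000_000L);
consumer.setRateLimiter(rateLimiter);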

王雷 wrote on Thu, May 14, 2020 at 5:31 PM:

> hi, All
>
> Does Flink support rate limitation?
> How can we limit the rate when the external database connected by the sink
> operator has a throughput limitation?
> Instead of relying on passive back pressure after reaching the limit of the external
> database, we want to limit the rate actively.
>
> Thanks
> Ray
>


-- 

Benchao Li
School of Electronics Engineering and Computer Science, Peking University
Tel:+86-15650713730
Email: libenc...@gmail.com; libenc...@pku.edu.cn


Flink operator throttle

2020-05-14 Thread 王雷
hi, All

Does Flink support rate limitation?
How can we limit the rate when the external database connected by the sink
operator has a throughput limitation?
Instead of relying on passive back pressure after reaching the limit of the external
database, we want to limit the rate actively.

Thanks
Ray