Working around lack of SQL triggers

2019-04-26 Thread deklanw
I'm not sure how to express my logic simply where early triggers are a
necessity.

My application has large windows (~2 weeks) where early triggering is
absolutely required. At the same time, most of my application's logic is
relatively simple and can be expressed in SQL. There's a ton of duplication,
like the following:


```
SELECT A, B, C,
  COUNT(*) FILTER (WHERE my_condition) AS total_conditions,
  COUNT(*) AS total,
  ROUND(COUNT(*) FILTER (WHERE my_condition) / COUNT(*), 1) AS condition_rate,
  AVG(D),
  AVG(E),
  AVG(F)
FROM foo
GROUP BY A, B, C, SESSION(...)
```

Imagine these kinds of queries duplicated many times, varying only which
fields are averaged and grouped by.

This is fairly easy to do in SQL with some copying and pasting. A quick
Ctrl+F to give an idea (so far):
COUNT - 50
AVG - 27
GROUP BY - 12

Since Flink doesn't support GROUPING SETS for streaming, I'll actually need
to duplicate many of these queries, so the counts above are an underestimate.

Is writing an absolute ton of custom AggregateFunction boilerplate the only
way to solve this problem? Is there no way to abstract this while
maintaining early triggers? I feel like I'm missing something. Is Flink SQL
streaming only for short windows where triggering only at the end of the
window is acceptable?
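
One partial answer to the duplication part (it does not help with the early-trigger
limitation): the near-identical statements can be generated from a small template
rather than copy/pasted. A minimal sketch; the field names follow the example above,
and tableEnv, rowtime and the MINUTE session gap are assumptions:

```
// Sketch only: build the repeated SELECTs from a template instead of
// copy/pasting them. All identifiers below are placeholders.
def aggQuery(groupFields: Seq[String], avgFields: Seq[String], gap: String): String = {
  val groups = groupFields.mkString(", ")
  val avgs   = avgFields.map(f => s"AVG($f) AS avg_$f").mkString(",\n  ")
  s"""SELECT $groups,
     |  COUNT(*) FILTER (WHERE my_condition) AS total_conditions,
     |  COUNT(*) AS total,
     |  ROUND(COUNT(*) FILTER (WHERE my_condition) / COUNT(*), 1) AS condition_rate,
     |  $avgs
     |FROM foo
     |GROUP BY $groups, SESSION(rowtime, INTERVAL '$gap' MINUTE)""".stripMargin
}

// e.g. one registration per combination instead of one hand-written query each:
// tableEnv.sqlQuery(aggQuery(Seq("A", "B", "C"), Seq("D", "E", "F"), "30"))
```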




--
Sent from: http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/


FileInputFormat that processes files in chronological order

2019-04-26 Thread Sergei Poganshev
Given a directory with input files of the following format:

/data/shard1/file1.json
/data/shard1/file2.json
/data/shard1/file3.json
/data/shard2/file1.json
/data/shard2/file2.json
/data/shard2/file3.json

Is there a way to make FileInputFormat with parallelism 2 split processing
by "shard" (folder) and then process files in chronological order
(file1.json, file2.json, file3.json) in each shard? Will I have to
implement a custom FileInputFormat for that?


Re: Emitting current state to a sink

2019-04-26 Thread Avi Levi
Hi Timo,
I definitely did, but when broadcasting a command and trying to address the
persisted state (I mean the state of the data stream, not the broadcast
one) you get the exception that I wrote about
(java.lang.NullPointerException: No key set. This method should not be
called outside of a keyed context), e.g. doing something like

override def processBroadcastElement(value: BroadcastRequest,
    ctx: KeyedBroadcastProcessFunction[String, Request, BroadcastRequest,
      Response]#Context, out: Collector[Response]): Unit = {
  value match {
    case Command(StateCmd.Fetch, _) =>
      if (state.value() != null) {
        out.collect(state.value())
      }
  }
}

will yield that exception

BR
Avi
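
For reference, one possible way around this from the broadcast side is the
applyToKeyedState hook on the KeyedBroadcastProcessFunction Context, which runs a
callback over the keyed state of every key. A minimal sketch, reusing the (undefined
here) types from the snippet above and assuming the same ValueStateDescriptor that
the keyed side registers:

```
import org.apache.flink.api.common.state.{KeyedStateFunction, ValueState, ValueStateDescriptor}
import org.apache.flink.streaming.api.functions.co.KeyedBroadcastProcessFunction
import org.apache.flink.util.Collector

// Sketch: iterate over the keyed state of all keys when a Fetch command
// arrives on the broadcast side.
class FetchFunction(stateDescriptor: ValueStateDescriptor[Response])
    extends KeyedBroadcastProcessFunction[String, Request, BroadcastRequest, Response] {

  override def processElement(value: Request,
      ctx: KeyedBroadcastProcessFunction[String, Request, BroadcastRequest, Response]#ReadOnlyContext,
      out: Collector[Response]): Unit = {
    // regular per-key processing goes here
  }

  override def processBroadcastElement(value: BroadcastRequest,
      ctx: KeyedBroadcastProcessFunction[String, Request, BroadcastRequest, Response]#Context,
      out: Collector[Response]): Unit = value match {
    case Command(StateCmd.Fetch, _) =>
      ctx.applyToKeyedState(stateDescriptor, new KeyedStateFunction[String, ValueState[Response]] {
        override def process(key: String, state: ValueState[Response]): Unit =
          if (state.value() != null) out.collect(state.value())
      })
    case _ => // ignore other commands
  }
}
```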

On Fri, Apr 26, 2019 at 11:55 AM Timo Walther  wrote:

> This Message originated outside your organization.
>
> Hi Avi,
>
> did you have a look at the .connect() and .broadcast() API
> functionalities? They allow you to broadcast a control stream to all
> operators. Maybe this example [1] or other examples in this repository
> can help you.
>
> Regards,
> Timo
>
> [1]
>
> https://github.com/ververica/flink-training-exercises/blob/master/src/main/java/com/dataartisans/flinktraining/solutions/datastream_java/broadcast/TaxiQuerySolution.java
> 
>
> > On 26.04.19 at 07:57, Avi Levi wrote:
> > Hi,
> > We have a keyed pipeline with persisted state.
> > Is there a way to broadcast a command and collect all values that are
> > persisted in the state?
> >
> > The end result can be for example sending a fetch command to all
> > operators and emitting the results to some sink
> >
> > Why do we need it? From time to time we might want to check whether we
> > are missing keys and what the additional keys are, or simply emit the
> > current state to a table and query it.
> >
> > I tried simply broadcasting a command and addressing the persisted
> > state but that resulted with:
> > java.lang.NullPointerException: No key set. This method should not be
> > called outside of a keyed context.
> >
> > is there a good way to achieve that ?
> >
> > Cheers
> > Avi
>
>


Re: Flink CLI

2019-04-26 Thread Gary Yao
Hi Steve,

(1)

The CLI action you are looking for is called "modify" [1]. However, we
want
to temporarily disable this feature beginning from Flink 1.9 due to some
caveats with it [2]. If you have objections, it would be appreciated if
you
could comment on the respective thread on the user/dev mailing list.

(2)

There is currently no option to have the CLI output JSON. However, as
others
have pointed out, you can use the REST API to invoke actions on the
cluster,
such as drawing savepoints [3]. This is also what the CLI ultimately
does
[4].

Best,
Gary

[1] https://ci.apache.org/projects/flink/flink-docs-release-1.8/ops/cli.html
[2]
http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/DISCUSS-Temporarily-remove-support-for-job-rescaling-via-CLI-action-quot-modify-quot-td27447.html
[3]
https://ci.apache.org/projects/flink/flink-docs-release-1.8/monitoring/rest_api.html#jobs-jobid-savepoints
[4]
https://github.com/apache/flink/blob/767fe152cb69a204261a0770412c8b28d037614d/flink-clients/src/main/java/org/apache/flink/client/program/rest/RestClusterClient.java#L415-L424
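
For readers automating this, a minimal sketch of triggering a savepoint via that
REST endpoint; host, port, job id and target directory are placeholders, and error
handling is omitted:

```
import java.net.{HttpURLConnection, URL}
import java.nio.charset.StandardCharsets

// Sketch: POST to /jobs/:jobid/savepoints and read the JSON response, which
// contains a trigger id that can then be polled at /jobs/:jobid/savepoints/:triggerid.
def triggerSavepoint(host: String, port: Int, jobId: String, targetDir: String): String = {
  val url = new URL(s"http://$host:$port/jobs/$jobId/savepoints")
  val conn = url.openConnection().asInstanceOf[HttpURLConnection]
  conn.setRequestMethod("POST")
  conn.setRequestProperty("Content-Type", "application/json")
  conn.setDoOutput(true)
  val body = s"""{"target-directory": "$targetDir", "cancel-job": false}"""
  conn.getOutputStream.write(body.getBytes(StandardCharsets.UTF_8))
  scala.io.Source.fromInputStream(conn.getInputStream).mkString
}
```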

On Wed, Apr 24, 2019 at 5:06 PM Steven Nelson 
wrote:

> Hello!
>
> I am working on automating our deployments to our Flink cluster. I had a
> couple questions about the flink cli.
>
> 1) I thought there was an "update" command that would internally manage
> the cancel with savepoint, upload new jar, restart from savepoint process.
>
> 2) Is there a way to get the Flink cli to output it's result in a json
> format? Right now I would need to parse the results of the "flink list"
> command to get the job id, cancel the job with savepoint, parse the results
> of that to get the savepoint filename, then restore using that. Parsing the
> output seems brittle to me.
>
> Thought?
> -Steve
>
>


Re: Exceptions when launching counts on a Flink DataSet concurrently

2019-04-26 Thread Juan Rodríguez Hortalá
Hi Timo,

Thanks for your answer. I was surprised to have problems calling those
methods concurrently, because I thought data sets were immutable. Now I
understand that calling count or collect mutates the data set: not its
contents, but some kind of execution plan included in the data set.

I suggest adding a remark about this lack of thread safety to the
documentation. Maybe it's already there but I haven't seen it. I also
understand that repeated calls to collect and count on the same data set
are ok as long as they are done sequentially, and not concurrently.

Thanks,

Juan

On Fri, Apr 26, 2019 at 02:00 Timo Walther  wrote:

> Hi Juan,
>
> as far as I know we do not provide any concurrency guarantees for count()
> or collect(). Those methods need to be used with caution anyways as the
> result size must not exceed a certain threshold. I will loop in Fabian who
> might know more about the internals of the execution?
>
> Regards,
> Timo
>
>
> On 26.04.19 at 03:13, Juan Rodríguez Hortalá wrote:
>
> Any thoughts on this?
>
> On Sun, Apr 7, 2019, 6:56 PM Juan Rodríguez Hortalá <
> juan.rodriguez.hort...@gmail.com> wrote:
>
>> Hi,
>>
>> I have a very simple program using the local execution environment, that
>> throws NPE and other exceptions related to concurrent access when launching
>> a count for a DataSet from different threads. The program is
>> https://gist.github.com/juanrh/685a89039e866c1067a6efbfc22c753e which is
>> basically this:
>>
>> def doubleCollectConcurrent = {
>>   val env = ExecutionEnvironment.createLocalEnvironment(3)
>>   val xs = env.fromCollection(1 to 100).map{_+1}
>>   implicit val ec = 
>> ExecutionContext.fromExecutor(Executors.newFixedThreadPool(10))
>>
>>   val pendingActions = Seq.fill(10)(
>> Future { println(s"xs.count = ${xs.count}") }
>>   )
>>   val pendingActionsFinished = Future.fold(pendingActions)(Unit){ (u1, u2) =>
>> println("pending action finished")
>> Unit  }
>>   Await.result(pendingActionsFinished, 10 seconds)
>>
>>   ok}
>>
>>
>> It looks like the issue is on OperatorTranslation.java at
>> https://github.com/apache/flink/blob/master/flink-java/src/main/java/org/apache/flink/api/java/operators/OperatorTranslation.java#L51,
>> when a sink is added to the sinks list while that list is being traversed.
>> I have the impression that this is by design, so I'd like to confirm that
>> this is the expected behaviour, and whether this is happening only for the
>> local execution environment, or if this affects all execution environments
>> implementations. Other related questions I have are:
>>
>>- Is this documented somewhere? I'm quite new to Flink, so I might
>>have missed this. Is there any known workaround for concurrently launching
>>counts and other sink computations on the same DataSet?
>>- Is it safe performing a sequence of calls to DataSet sink methods
>>like count or collect, on the same DataSet, as long as they are performed
>>from the same thread? From my experience it looks like it is, but I'd like
>>to get a confirmation if possible.
>>
>> This might be related to
>> https://stackoverflow.com/questions/51035465/concurrentmodificationexception-in-flink
>> but I'm not sure.
>>
>> Thanks a lot for your help.
>>
>> Greetings,
>>
>> Juan
>>
>
>


Re: RichAsyncFunction Timer Service

2019-04-26 Thread Mikhail Pryakhin
Hi Dawid,
Thank you!

Yes, fair enough, but take for instance the BucketingSink class [1]: it is a
RichFunction which employs a timer service to execute time-based logic that is
not directly associated with the event flow, like for example closing files
every n minutes. In an AsyncFunction I intended to use a timer service the
same way, e.g. for periodically reloading configuration from outside.
Does that make sense?

[1] 
https://github.com/apache/flink/blob/24c2e17c8d52ae2f0f897a5806a3a44fdf62b0a5/flink-connectors/flink-connector-filesystem/src/main/java/org/apache/flink/streaming/connectors/fs/bucketing/BucketingSink.java#L405

Kind Regards,
Mike Pryakhin
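
One possible workaround for the periodic-reload use case described above, sketched
under the assumption that a plain JDK scheduler started in open() is good enough
(it gives none of the guarantees of Flink's TimerService); loadConfig() is a
placeholder for the real loader:

```
import java.util.concurrent.{Executors, ScheduledExecutorService, TimeUnit}

import org.apache.flink.configuration.Configuration
import org.apache.flink.streaming.api.functions.async.{ResultFuture, RichAsyncFunction}

// Sketch: since RichAsyncFunction exposes no timer service, drive the periodic,
// non-event work (here: reloading configuration) from a JDK scheduler.
class ReloadingAsyncFunction extends RichAsyncFunction[String, String] {

  @transient private var scheduler: ScheduledExecutorService = _
  @volatile private var config: Map[String, String] = Map.empty

  override def open(parameters: Configuration): Unit = {
    scheduler = Executors.newSingleThreadScheduledExecutor()
    scheduler.scheduleAtFixedRate(new Runnable {
      override def run(): Unit = config = loadConfig()
    }, 0, 5, TimeUnit.MINUTES)
  }

  override def asyncInvoke(input: String, resultFuture: ResultFuture[String]): Unit =
    resultFuture.complete(java.util.Collections.singleton(input + config.getOrElse("suffix", "")))

  override def close(): Unit = if (scheduler != null) scheduler.shutdownNow()

  private def loadConfig(): Map[String, String] = Map.empty // stand-in for the real reload
}
```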

> On 25 Apr 2019, at 10:59, Dawid Wysakowicz  wrote:
> 
> Hi Mike,
> 
> I think the reason why there is no access to TimerService in async function 
> is that as it is an async function, there are no guarantees when/and where(at 
> which stage of the pipeline) the function will actually be executed. This 
> characteristic doesn't align with TimerService and timely callbacks.
> 
> Best,
> 
> Dawid
> 
> On 19/04/2019 17:41, Mikhail Pryakhin wrote:
>> Hello, Flink community!
>> 
>> It happens that I need to access a timer service in a RichAsyncFunction 
>> implementation. I know it's normally accomplished via 
>> StreamingRuntimeContext instance available in a RichFunction, but 
>> unfortunately, RichAsyncFunction extending RichFunction overrides 
>> “setRuntimeContext” method [1] wrapping a RuntimeContext instance passed as 
>> the method argument into a RichAsyncFunctionRuntimeContext instance [2]. 
>> This RichAsyncFunction specific RuntimeContext implementation is private [2] 
>> which makes it infeasible to gain access to a wrapped original 
>> RuntimeContext thus making it impossible to leverage timer service in 
>> RichAsyncFunction implementations. Just curious is there any reason for 
>> that? Can we make this implementation public or somehow share a wrapped 
>> instance?
>> 
>> Many thanks in advance!
>> 
>> [1] 
>> https://github.com/apache/flink/blob/c96a4d7afe379a291cc538ca36af896df8dc2127/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/async/RichAsyncFunction.java#L76
>>  
>> 
>> [2] 
>> https://github.com/apache/flink/blob/c96a4d7afe379a291cc538ca36af896df8dc2127/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/async/RichAsyncFunction.java#L100
>>  
>> 
>> 
>> 
>> 
>> Kind Regards,
>> Mike Pryakhin
>> 





Re: Serialising null value in case-class

2019-04-26 Thread Timo Walther
Currently, tuples and case classes are the most efficient data types 
because they avoid the need for special null handling. Everything else 
is hard to estimate. You might need to perform micro benchmarks with the 
serializers you want to use if you have a very performance critical use 
case. Object types vs primitive types don't make a big difference (also 
for Scala) as their value is serialized by the same serializers.


I hope this helps.

Timo


On 26.04.19 at 13:17, Averell wrote:

Thank you Timo.

In terms of performance, does the use of Option[] have a performance impact?
I guess it does, because there will be one more layer of object handling,
won't there?

I am also confused about choosing between primitive types (Int, Long) vs
object types (Integer, JLong). I have seen many places in the Flink documents
where Java primitive types are recommended. But what about the Scala types?

Thanks and regards,
Averell



--
Sent from: http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/





Re: Serialising null value in case-class

2019-04-26 Thread Averell
Thank you Timo.

In terms of performance, does the use of Option[] have a performance impact?
I guess it does, because there will be one more layer of object handling,
won't there?

I am also confused about choosing between primitive types (Int, Long) vs
object types (Integer, JLong). I have seen many places in the Flink documents
where Java primitive types are recommended. But what about the Scala types?

Thanks and regards,
Averell



--
Sent from: http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/


Re: Serialising null value in case-class

2019-04-26 Thread Timo Walther

Hi Averell,

the reason for this lies in the internal serializer implementation. In
general, the composite/wrapping type serializer is responsible for
encoding nulls. The case class serializer does not support nulls, because
Scala discourages the use of nulls and promotes `Option`. Some
serializers, such as the `String` one, internally use the binary length
field to encode nulls, see [1]. For a full list of Scala types, I would
recommend this class [2].


Regards,
Timo

[1] 
https://github.com/apache/flink/blob/master/flink-core/src/main/java/org/apache/flink/types/StringValue.java#L788
[2] 
https://github.com/apache/flink/blob/master/flink-scala/src/main/scala/org/apache/flink/api/scala/typeutils/Types.scala
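
A minimal sketch of the Option-based modelling this implies, reusing the case class
from the question quoted below; None stands in for the null Integer:

```
// Sketch: model the nullable numeric fields with Option so the case-class
// serializer never has to encode a null boxed Integer.
case class MyClass(ts: Long, s1: String, s2: String, i1: Option[Int], i2: Option[Int])

object MyClass {
  val EMPTY = MyClass(0L, null, null, None, None)
  def apply(): MyClass = EMPTY
}

// MyClass(0L, null, null, None, Some(0)) serializes without the
// NullFieldException reported for MyClass(0L, null, null, null, 0).
```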


On 26.04.19 at 11:30, Averell wrote:

Good day,

I have a case-class defined like this:

case class MyClass(ts: Long, s1: String, s2: String, i1: Integer, i2: Integer)
object MyClass {
  val EMPTY = MyClass(0L, null, null, 0, 0)
  def apply(): MyClass = EMPTY
}

My code has been running fine (I was not aware of the limitation mentioned
in
https://ci.apache.org/projects/flink/flink-docs-stable/dev/types_serialization.html)

But when I tried to create the instance MyClass(0L, null, null, null, 0), I
got the following error: org.apache.flink.types.NullFieldException:
Field 3 is null, but expected to hold a value.

I am confused. Why is there a difference between a null String and a null
Integer?

Thanks and regards,
Averell



--
Sent from: http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/





How to let Flink 1.7.X run Flink session cluster on YARN in Java 7 default environment

2019-04-26 Thread 胡逸才
At present, all our YARN clusters run a Java 7 environment.

While trying to use Flink to deploy stream-processing business scenarios, we
found that Flink on YARN mode always failed to run a session task. The YARN
application log shows: Unsupported major.minor version 52.0.

I tried adding env.java.home: <JDK1.8 PATH> to flink-conf.yaml, as suggested
on the mailing list, and the startup command adds
-yD yarn.taskmanager.env.JAVA_HOME=<JDK1.8 PATH>,
-yD containerized.master.env.JAVA_HOME=<JDK1.8 PATH>, and
-yD containerized.taskmanager.env.JAVA_HOME=<JDK1.8 PATH>. The Flink session
cluster on YARN still cannot run the application in a Java 8 environment.

So can I use Flink 1.7.x to submit a Flink session cluster application on YARN
under a Java 7 environment?

Re: Identify orphan records after joining two streams

2019-04-26 Thread Averell
Hi Dawid,

I just tried to change from CoProcessFunction with onTimer() to
ProcessWindowFunction with a Trigger and a TumblingWindow, so I can key my
stream by (id) instead of (id, eventTime). With this, I can use
reinterpretAsKeyedStream and hope that it gives better performance.
I can also use the out-of-the-box function sideOutputLateData().
Not sure whether I would really benefit from that.

Thanks and regards,
Averell 



--
Sent from: http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/


Serialising null value in case-class

2019-04-26 Thread Averell
Good day,

I have a case-class defined like this:

case class MyClass(ts: Long, s1: String, s2: String, i1: Integer, i2: Integer)
object MyClass {
  val EMPTY = MyClass(0L, null, null, 0, 0)
  def apply(): MyClass = EMPTY
}

My code has been running fine (I was not aware of the limitation mentioned
in
https://ci.apache.org/projects/flink/flink-docs-stable/dev/types_serialization.html)

But when I tried to create the instance MyClass(0L, null, null, null, 0), I
got the following error: org.apache.flink.types.NullFieldException:
Field 3 is null, but expected to hold a value.

I am confused. Why is there a difference between a null String and a null
Integer?

Thanks and regards,
Averell



--
Sent from: http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/


Re: Exceptions when launching counts on a Flink DataSet concurrently

2019-04-26 Thread Timo Walther

Hi Juan,

as far as I know we do not provide any concurrency guarantees for 
count() or collect(). Those methods need to be used with caution anyways 
as the result size must not exceed a certain threshold. I will loop in 
Fabian who might know more about the internals of the execution?


Regards,
Timo


On 26.04.19 at 03:13, Juan Rodríguez Hortalá wrote:

Any thoughts on this?

On Sun, Apr 7, 2019, 6:56 PM Juan Rodríguez Hortalá 
> wrote:


Hi,

I have a very simple program using the local execution
environment, that throws NPE and other exceptions related to
concurrent access when launching a count for a DataSet from
different threads. The program is
https://gist.github.com/juanrh/685a89039e866c1067a6efbfc22c753e
which is basically this:

def doubleCollectConcurrent = {
  val env = ExecutionEnvironment.createLocalEnvironment(3)
  val xs = env.fromCollection(1 to 100).map{_ + 1}
  implicit val ec =
    ExecutionContext.fromExecutor(Executors.newFixedThreadPool(10))

  val pendingActions = Seq.fill(10)(
    Future { println(s"xs.count = ${xs.count}") }
  )
  val pendingActionsFinished = Future.fold(pendingActions)(Unit){ (u1, u2) =>
    println("pending action finished")
    Unit }
  Await.result(pendingActionsFinished, 10 seconds)

  ok }


It looks like the issue is on OperatorTranslation.java at

https://github.com/apache/flink/blob/master/flink-java/src/main/java/org/apache/flink/api/java/operators/OperatorTranslation.java#L51,
when a sink is added to the sinks list while that list is being
traversed. I have the impression that this is by design, so I'd
like to confirm that this is the expected behaviour, and whether
this is happening only for the local execution environment, or if
this affects all execution environments implementations. Other
related questions I have are:

  * Is this documented somewhere? I'm quite new to Flink, so I might have
    missed this. Is there any known workaround for concurrently launching
    counts and other sink computations on the same DataSet?
  * Is it safe performing a sequence of calls to DataSet sink methods like
    count or collect, on the same DataSet, as long as they are performed
    from the same thread? From my experience it looks like it is, but I'd
    like to get a confirmation if possible.

This might be related to

https://stackoverflow.com/questions/51035465/concurrentmodificationexception-in-flink
but I'm not sure.

Thanks a lot for your help.

Greetings,

Juan





Re: Emitting current state to a sink

2019-04-26 Thread Timo Walther

Hi Avi,

did you have a look at the .connect() and .broadcast() API 
functionalities? They allow you to broadcast a control stream to all 
operators. Maybe this example [1] or other examples in this repository 
can help you.


Regards,
Timo

[1] 
https://github.com/ververica/flink-training-exercises/blob/master/src/main/java/com/dataartisans/flinktraining/solutions/datastream_java/broadcast/TaxiQuerySolution.java


On 26.04.19 at 07:57, Avi Levi wrote:

Hi,
We have a keyed pipeline with persisted state.
Is there a way to broadcast a command and collect all values that are
persisted in the state?


The end result can be for example sending a fetch command to all 
operators and emitting the results to some sink


Why do we need it? From time to time we might want to check whether we are
missing keys and what the additional keys are, or simply emit the current
state to a table and query it.


I tried simply broadcasting a command and addressing the persisted 
state but that resulted with:
java.lang.NullPointerException: No key set. This method should not be 
called outside of a keyed context.


is there a good way to achieve that ?

Cheers
Avi





Re: kafka partitions, data locality

2019-04-26 Thread Stefan Richter
Hi Sergey,

The reason why this is flagged as beta is actually less about stability and
more about the fact that this is supposed to be more of a "power user" feature,
because bad things can happen if your data is not 100% correctly partitioned in
the same way as Flink would partition it. This is why you should typically only
use it if the data was partitioned by Flink and you are very sure what you are
doing, because there is not really anything we can do at the API level to
protect you from mistakes in using this feature. Eventually some runtime
exceptions might show you that something is going wrong, but that is not
exactly a good user experience.

On a different note, there is currently one open issue [1] to be aware of in
connection with this feature and operator chaining, but at the same time this
is something that should not be hard to fix for the next minor release.

Best,
Stefan

[1] 
https://issues.apache.org/jira/browse/FLINK-12296?focusedCommentId=16824945=com.atlassian.jira.plugin.system.issuetabpanels%3Acomment-tabpanel#comment-16824945
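
For readers following the thread, a minimal sketch of the feature under discussion.
The static helper lives on the Java DataStreamUtils class, so the Scala stream is
unwrapped via javaStream; Transaction and clientId are placeholders:

```
import org.apache.flink.api.java.functions.KeySelector
import org.apache.flink.streaming.api.datastream.DataStreamUtils
import org.apache.flink.streaming.api.scala._

case class Transaction(clientId: String, amount: Long)

// Sketch: declare that `transactions` is already partitioned by clientId
// exactly as Flink's own keyBy would partition it. If that promise does not
// hold, the runtime problems described above can occur.
def asKeyed(transactions: DataStream[Transaction]) =
  DataStreamUtils.reinterpretAsKeyedStream(
    transactions.javaStream,
    new KeySelector[Transaction, String] {
      override def getKey(t: Transaction): String = t.clientId
    })
```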
  

> On 26. Apr 2019, at 09:48, Smirnov Sergey Vladimirovich (39833) 
>  wrote:
> 
> Hi,
>  
> Dawid, great, thanks!
> Any plans to make it stable? 1.9?
>  
>  
> Regards,
> Sergey
>  
> From: Dawid Wysakowicz [mailto:dwysakow...@apache.org] 
> Sent: Thursday, April 25, 2019 10:54 AM
> To: Smirnov Sergey Vladimirovich (39833) ; Ken Krugler 
> 
> Cc: user@flink.apache.org; d...@flink.apache.org
> Subject: Re: kafka partitions, data locality
>  
> Hi Smirnov,
> 
> Actually there is a way to tell Flink that data is already partitioned. You 
> can try the reinterpretAsKeyedStream[1] method. I must warn you though this 
> is an experimental feature.
> 
> Best,
> 
> Dawid
> 
> [1] 
> https://ci.apache.org/projects/flink/flink-docs-release-1.8/dev/stream/experimental.html#experimental-features
>  
> 
> On 19/04/2019 11:48, Smirnov Sergey Vladimirovich (39833) wrote:
> Hi Ken,
>  
> It’s a bad story for us: even for a small window we have dozens of
> thousands of events per job, with 10x peaks or even more. And the number of
> jobs is known to be high. So instead of N operations (our producer/consumer
> mechanism), with shuffle/resorting (the current Flink realization) it will be
> N*ln(N) - a tenfold loss of execution speed!
> 4 all, my next step? Contribute to apache flink? Issues backlog?
>  
>  
> With best regards,
> Sergey
> From: Ken Krugler [mailto:kkrugler_li...@transpac.com 
> ] 
> Sent: Wednesday, April 17, 2019 9:23 PM
> To: Smirnov Sergey Vladimirovich (39833)  
> 
> Subject: Re: kafka partitions, data locality
>  
> Hi Sergey,
>  
> As you surmised, once you do a keyBy/max on the Kafka topic, to group by 
> clientId and find the max, then the topology will have a partition/shuffle to 
> it.
>  
> This is because Flink doesn’t know that client ids don’t span Kafka 
> partitions.
>  
> I don’t know of any way to tell Flink that the data doesn’t need to be 
> shuffled. There was a discussion about adding a keyByWithoutPartitioning a
> while back, but I don’t think that support was ever added.
>  
> A simple ProcessFunction with MapState (clientId -> max) should allow you do 
> to the same thing without too much custom code. In order to support 
> windowing, you’d use triggers to flush state/emit results.
>  
> — Ken
>  
>  
> On Apr 17, 2019, at 2:33 AM, Smirnov Sergey Vladimirovich (39833) wrote:
>  
> Hello,
>  
> We planning to use apache flink as a core component of our new streaming 
> system for internal processes (finance, banking business) based on apache 
> kafka.
> So we starting some research with apache flink and one of the question, 
> arises during that work, is how flink handle with data locality.
> I`ll try to explain: suppose we have a kafka topic with some kind of events. 
> And this events groups by topic partitions so that the handler (or a job 
> worker), consuming message from a partition, have all necessary information 
> for further processing. 
> As an example, say we have client’s payment transaction in a kafka topic. We 
> grouping by clientId (transaction with the same clientId goes to one same 
> kafka topic partition) and the task is to find max transaction per client in 
> sliding windows. In terms of map\reduce there is no needs to shuffle data 
> between all topic consumers, may be it`s worth to do within each consumer to 
> gain some speedup due to increasing number of executors within each partition 
> data.
> And my question is how flink will work in this case. Do it shuffle all data, 
> or it have some settings to avoid this extra unnecessary 

Re: How to implement custom stream operator over a window? And after the Count-Min Sketch?

2019-04-26 Thread Felipe Gutierrez
Hi Rong,

thanks for your insights. I agree with the three points that you made. My
plan is to implement an operator that computes the Count-Min sketch, and
developers can supply functions to improve the sketch's estimate (adding
more/different functions makes the sketch more precise, hence heavier). But
the operator will also hold default hash functions so the developer does not
have to add any function if he does not want to.

Like I said, I will implement it in my project. But I totally agree to keep
the discussion on the original FLINK-2147 JIRA ticket. Doing so I can
collect more opinions =)

Thanks!
Felipe
--
-- Felipe Gutierrez
-- skype: felipe.o.gutierrez
-- https://felipeogutierrez.blogspot.com
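
A minimal sketch of the AggregateFunction shape suggested in the quoted reply below;
a toy sketch implementation stands in for the CountMinSketch class of the linked
project, whose API may differ:

```
import org.apache.flink.api.common.functions.AggregateFunction
import scala.util.hashing.MurmurHash3

// Toy Count-Min sketch: `depth` hash rows of `width` counters each.
case class CMSketch(width: Int, depth: Int, table: Array[Array[Long]]) {
  private def col(x: String, row: Int): Int =
    java.lang.Math.floorMod(MurmurHash3.stringHash(x, row), width)

  def update(x: String): CMSketch = {
    (0 until depth).foreach(row => table(row)(col(x, row)) += 1)
    this
  }
  def estimate(x: String): Long =
    (0 until depth).map(row => table(row)(col(x, row))).min
  def merge(other: CMSketch): CMSketch = {
    for (row <- 0 until depth; c <- 0 until width) table(row)(c) += other.table(row)(c)
    this
  }
}
object CMSketch {
  def apply(width: Int, depth: Int): CMSketch =
    CMSketch(width, depth, Array.fill(depth, width)(0L))
}

// The accumulator *is* the sketch: add() hashes the element in, merge() adds
// the tables, and getResult() hands the sketch downstream.
class CMSketchAggregate(width: Int = 1024, depth: Int = 4)
    extends AggregateFunction[String, CMSketch, CMSketch] {
  override def createAccumulator(): CMSketch = CMSketch(width, depth)
  override def add(value: String, acc: CMSketch): CMSketch = acc.update(value)
  override def getResult(acc: CMSketch): CMSketch = acc
  override def merge(a: CMSketch, b: CMSketch): CMSketch = a.merge(b)
}

// e.g. keyedStream.timeWindow(Time.minutes(5)).aggregate(new CMSketchAggregate())
```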


On Fri, Apr 26, 2019 at 4:10 AM Rong Rong  wrote:

> Hi Felipe,
>
> I am not sure the algorithm requires to construct a new extension of the
> window operator. I think your implementation of the CountMinSketch object
> as an aggregator:
> E.g.
> 1. AggregateState (ACC) should be the aggregating accumulate
> count-min-sketch 2-D hash array (plus a few other needed fields).
> 2. accumulate method just simply do the update.
> 3. getResult simply get the frequency from sketch.
>
> Thus you will not need to use a customized ValueStateDescriptor.
>
> But I agree that maybe it is a good idea to support a class of use cases
> that requires approximate aggregate state (like HyperLogLog?), this
> might've been a good value add in my opinion.
> I think some further discussion is needed if we are going down that path.
> Do you think the original FLINK-2147
>  JIRA ticket is a good
> place to carry out that discussion? We can probably continue there or
> create a new JIRA for discussion.
>
> --
> Rong
>
> On Wed, Apr 24, 2019 at 1:32 AM Felipe Gutierrez <
> felipe.o.gutier...@gmail.com> wrote:
>
>> Hi Rong,
>>
>> thanks for your reply. I guess I already did something regarding what you
>> have told to me. I have one example on this application [1], which uses
>> this state [2]  and computes a CountMinSketch [3].
>>
>> I am seeking how to implement my own operator over a window in order to
>> have more fine-grained control over it and learn with it. And hopefully,
>> building a path to contribute to Flink in the future [4].
>>
>> [1]
>> https://github.com/felipegutierrez/explore-flink/blob/master/src/main/java/org/sense/flink/examples/stream/MultiSensorMultiStationsReadingMqtt2.java#L69
>> [2]
>> https://github.com/felipegutierrez/explore-flink/blob/master/src/main/java/org/sense/flink/examples/stream/MultiSensorMultiStationsReadingMqtt2.java#L182
>> [3]
>> https://github.com/felipegutierrez/explore-flink/blob/master/src/main/java/org/sense/flink/util/CountMinSketch.java
>> [4] https://issues.apache.org/jira/browse/FLINK-2147
>>
>> Best,
>> Felipe
>> *--*
>> *-- Felipe Gutierrez*
>>
>> *-- skype: felipe.o.gutierrez*
>> *--* *https://felipeogutierrez.blogspot.com
>> *
>>
>>
>> On Wed, Apr 24, 2019 at 2:06 AM Rong Rong  wrote:
>>
>>> Hi Felipe,
>>>
>>> In a short glance, the question can depend on how your window is (is
>>> there any overlap like sliding window) and how many data you would like to
>>> process.
>>>
>>> In general, you can always buffer all the data into a ListState and
>>> apply your window function by iterating through all those buffered elements
>>> [1]. Provided that the data size is small enough to be hold efficiently in
>>> the state-backend.
>>> If this algorithm has some sort of pre-aggregation that can simplify the
>>> buffering through an incremental, orderless aggregation, you can also think
>>> about using [2].
>>> With these two approaches, you do not necessarily need to implement your
>>> own window operator (extending window operator can be tricky), and you also
>>> have access to the internal state [3].
>>>
>>> Hope these helps your exploration.
>>>
>>> Thanks,
>>> Rong
>>>
>>> [1]
>>> https://ci.apache.org/projects/flink/flink-docs-stable/dev/stream/operators/windows.html#processwindowfunction
>>> [2]
>>> https://ci.apache.org/projects/flink/flink-docs-stable/dev/stream/operators/windows.html#processwindowfunction-with-incremental-aggregation
>>> [3]
>>> https://ci.apache.org/projects/flink/flink-docs-stable/dev/stream/operators/windows.html#using-per-window-state-in-processwindowfunction
>>>
>>> On Tue, Apr 23, 2019 at 8:16 AM Felipe Gutierrez <
>>> felipe.o.gutier...@gmail.com> wrote:
>>>
 Hi,

 I want to implement my own operator that computes the Count-Min Sketch
 over a window in Flink. Then, I found this Jira issue [1]
  which is exactly
 what I want. I believe that I have to work out my skills to arrive at a
 mature solution.

 So, the first thing that comes to my mind is to create my custom
 

RE: kafka partitions, data locality

2019-04-26 Thread Smirnov Sergey Vladimirovich (39833)
Hi,

Dawid, great, thanks!
Any plans to make it stable? 1.9?


Regards,
Sergey

From: Dawid Wysakowicz [mailto:dwysakow...@apache.org]
Sent: Thursday, April 25, 2019 10:54 AM
To: Smirnov Sergey Vladimirovich (39833) ; Ken Krugler 

Cc: user@flink.apache.org; d...@flink.apache.org
Subject: Re: kafka partitions, data locality


Hi Smirnov,

Actually there is a way to tell Flink that data is already partitioned. You can 
try the reinterpretAsKeyedStream[1] method. I must warn you though this is an 
experimental feature.

Best,

Dawid

[1] 
https://ci.apache.org/projects/flink/flink-docs-release-1.8/dev/stream/experimental.html#experimental-features
On 19/04/2019 11:48, Smirnov Sergey Vladimirovich (39833) wrote:
Hi Ken,

It’s a bad story for us: even for a small window we have dozens of thousands
of events per job, with 10x peaks or even more. And the number of jobs is
known to be high. So instead of N operations (our producer/consumer mechanism)
with shuffle/resorting (the current Flink realization) it will be N*ln(N) - a
tenfold loss of execution speed!
4 all, my next step? Contribute to apache flink? Issues backlog?


With best regards,
Sergey
From: Ken Krugler [mailto:kkrugler_li...@transpac.com]
Sent: Wednesday, April 17, 2019 9:23 PM
To: Smirnov Sergey Vladimirovich (39833) 

Subject: Re: kafka partitions, data locality

Hi Sergey,

As you surmised, once you do a keyBy/max on the Kafka topic, to group by 
clientId and find the max, then the topology will have a partition/shuffle to 
it.

This is because Flink doesn’t know that client ids don’t span Kafka partitions.

I don’t know of any way to tell Flink that the data doesn’t need to be 
shuffled. There was a discussion about adding a keyByWithoutPartitioning a
while back, but I don’t think that support was ever added.

A simple ProcessFunction with MapState (clientId -> max) should allow you do to 
the same thing without too much custom code. In order to support windowing, 
you’d use triggers to flush state/emit results.

— Ken
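
A minimal sketch of that suggestion, with a processing-time timer standing in for
the window triggers Ken mentions; Payment, clientId and the one-minute flush
interval are placeholders:

```
import org.apache.flink.api.common.state.{ValueState, ValueStateDescriptor}
import org.apache.flink.configuration.Configuration
import org.apache.flink.streaming.api.functions.KeyedProcessFunction
import org.apache.flink.util.Collector

case class Payment(clientId: String, amount: Long)

// Sketch: keep the running max per clientId in keyed state and flush it with
// a processing-time timer every minute, without any keyBy shuffle logic here.
class MaxPerClient extends KeyedProcessFunction[String, Payment, (String, Long)] {

  @transient private var maxState: ValueState[java.lang.Long] = _

  override def open(parameters: Configuration): Unit =
    maxState = getRuntimeContext.getState(
      new ValueStateDescriptor[java.lang.Long]("max", classOf[java.lang.Long]))

  override def processElement(p: Payment,
      ctx: KeyedProcessFunction[String, Payment, (String, Long)]#Context,
      out: Collector[(String, Long)]): Unit = {
    val current = Option(maxState.value()).map(_.longValue()).getOrElse(Long.MinValue)
    if (p.amount > current) maxState.update(Long.box(p.amount))
    // register a timer that emits the current max at the next minute boundary
    val nextMinute = (ctx.timerService().currentProcessingTime() / 60000 + 1) * 60000
    ctx.timerService().registerProcessingTimeTimer(nextMinute)
  }

  override def onTimer(ts: Long,
      ctx: KeyedProcessFunction[String, Payment, (String, Long)]#OnTimerContext,
      out: Collector[(String, Long)]): Unit = {
    Option(maxState.value()).foreach(m => out.collect((ctx.getCurrentKey, m.longValue())))
    maxState.clear()
  }
}
```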


On Apr 17, 2019, at 2:33 AM, Smirnov Sergey Vladimirovich (39833) wrote:

Hello,

We are planning to use Apache Flink as a core component of our new streaming
system for internal processes (finance, banking business) based on Apache Kafka.
So we are starting some research with Apache Flink, and one of the questions
that arises during that work is how Flink handles data locality.
I'll try to explain: suppose we have a Kafka topic with some kind of events,
and these events are grouped by topic partitions so that the handler (or a job
worker) consuming messages from a partition has all the necessary information
for further processing.
As an example, say we have clients' payment transactions in a Kafka topic. We
group by clientId (transactions with the same clientId go to the same Kafka
topic partition) and the task is to find the max transaction per client in
sliding windows. In terms of map\reduce there is no need to shuffle data
between all topic consumers; maybe it's worth doing it within each consumer to
gain some speedup due to an increased number of executors within each
partition's data.
And my question is how Flink will work in this case. Does it shuffle all data,
or does it have some settings to avoid these extra unnecessary shuffle/sorting
operations?
Thanks in advance!


With best regards,
Sergey Smirnov

--
Ken Krugler
+1 530-210-6378
http://www.scaleunlimited.com
Custom big data solutions & training
Flink, Solr, Hadoop, Cascading & Cassandra



RE: Zeppelin

2019-04-26 Thread Smirnov Sergey Vladimirovich (39833)
Hi,

Dawid, great, thanks for answering.

Jeff,
Flink 1.8 with default settings, standalone cluster, one JobManager node and
three TaskManager nodes.
zeppelin 0.9 config
checked "Connect to existing cluster"
host: 10.219.179.16
port: 6123
create simple notebook:
%flink
ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();

zeppelin logs:
2019-04-23 10:09:17,241 WARN  akka.remote.transport.netty.NettyTransport
- Remote connection to [/10.216.26.26:45588] failed with 
org.apache.flink.shaded.akka.org.jboss.netty.handler.codec.frame.TooLongFrameException:
 Adjusted frame length exceeds 10485760: 2147549189 - discarded
2019-04-23 10:09:29,475 WARN  akka.remote.transport.netty.NettyTransport
- Remote connection to [/10.216.26.26:45624] failed with 
org.apache.flink.shaded.akka.org.jboss.netty.handler.codec.frame.TooLongFrameException:
 Adjusted frame length exceeds 10485760: 2147549189 - discarded
flink:
org.apache.thrift.transport.TTransportException
at 
org.apache.thrift.transport.TIOStreamTransport.read(TIOStreamTransport.java:132)
at 
org.apache.thrift.transport.TTransport.readAll(TTransport.java:86)
at 
org.apache.thrift.protocol.TBinaryProtocol.readStringBody(TBinaryProtocol.java:380)
at 
org.apache.thrift.protocol.TBinaryProtocol.readMessageBegin(TBinaryProtocol.java:230)
at 
org.apache.thrift.TServiceClient.receiveBase(TServiceClient.java:77)
at 
org.apache.zeppelin.interpreter.thrift.RemoteInterpreterService$Client.recv_createInterpreter(RemoteInterpreterService.java:189)
at 
org.apache.zeppelin.interpreter.thrift.RemoteInterpreterService$Client.createInterpreter(RemoteInterpreterService.java:172)
at 
org.apache.zeppelin.interpreter.remote.RemoteInterpreter$2.call(RemoteInterpreter.java:169)
at 
org.apache.zeppelin.interpreter.remote.RemoteInterpreter$2.call(RemoteInterpreter.java:165)
at 
org.apache.zeppelin.interpreter.remote.RemoteInterpreterProcess.callRemoteFunction(RemoteInterpreterProcess.java:118)
at 
org.apache.zeppelin.interpreter.remote.RemoteInterpreter.internal_create(RemoteInterpreter.java:165)
at 
org.apache.zeppelin.interpreter.remote.RemoteInterpreter.open(RemoteInterpreter.java:132)
at 
org.apache.zeppelin.interpreter.remote.RemoteInterpreter.getFormType(RemoteInterpreter.java:290)
at 
org.apache.zeppelin.notebook.Paragraph.jobRun(Paragraph.java:443)
at 
org.apache.zeppelin.notebook.Paragraph.jobRun(Paragraph.java:75)
at org.apache.zeppelin.scheduler.Job.run(Job.java:181)
at 
org.apache.zeppelin.scheduler.AbstractScheduler.runJob(AbstractScheduler.java:123)
at 
org.apache.zeppelin.scheduler.RemoteScheduler$JobRunner.run(RemoteScheduler.java:187)
at 
java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
at java.util.concurrent.FutureTask.run(FutureTask.java:266)
at 
java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$201(ScheduledThreadPoolExecutor.java:180)
at 
java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:293)
at 
java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
at 
java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
at java.lang.Thread.run(Thread.java:748)
...

Regards,
Sergey

From: Jeff Zhang [mailto:zjf...@gmail.com]
Sent: Thursday, April 25, 2019 4:24 PM
To: Dawid Wysakowicz 
Cc: Smirnov Sergey Vladimirovich (39833) ; 
user@flink.apache.org
Subject: Re: Zeppelin

Thanks Dawid,

Hi Sergey,

I am working on updating the Flink interpreter of Zeppelin to support Flink 1.9
(supposed to be released this summer).
For the current Flink interpreter of Zeppelin 0.9, I haven't verified it
against Flink 1.8. Could you show the full interpreter log? And what is the
size of your input file?



Dawid Wysakowicz wrote on Thu, Apr 25, 2019 at 6:31 PM:

Hi Sergey,

I am not very familiar with Zepellin. But I know Jeff (cc'ed) is working on 
integrating Flink with some notebooks. He might be able to help you.

Best,

Dawid
On 25/04/2019 08:42, Smirnov Sergey Vladimirovich (39833) wrote:
Hello,

Trying to link Zeppelin 0.9 with Flink 1.8. It`s a small dev cluster deployed 
in standalone manner.
Got the same error as described here 
https://stackoverflow.com/questions/54257671/runnning-a-job-in-apache-flink-standalone-mode-on-zeppelin-i-have-this-error-to
Would appreciate for any support for helping to resolve that problem.

Regards,
Sergey



--
Best Regards

Jeff Zhang


Re: assignTimestampsAndWatermarks not work after KeyedStream.process

2019-04-26 Thread Dawid Wysakowicz
Hi,

Watermarks are meta events that travel independently of data events.

1) If you assignTimestampsAndWatermarks before keyBy, all parallel
instances of trips have some data (this is my assumption), so Watermarks
can be generated. Afterwards, even if some of the keyed partitions have
no data, Watermarks are broadcasted/forwarded anyway. In other words, if
at some point Watermarks were generated for all partitions of a single
stage, they will be forwarded beyond this point.

2) If you assignTimestampsAndWatermarks after keyBy, you try to assign
watermarks for an empty partition, which produces no Watermarks at all
for this partition, therefore there is no progress beyond this point.

I hope this clarifies it a bit.

Best,

Dawid
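
To make variant (A) from the quoted mail concrete, a minimal sketch with placeholder
types and values (Trip, driverId, a 10-second out-of-orderness bound):

```
import org.apache.flink.streaming.api.functions.timestamps.BoundedOutOfOrdernessTimestampExtractor
import org.apache.flink.streaming.api.scala._
import org.apache.flink.streaming.api.windowing.time.Time

case class Trip(driverId: Long, eventTime: Long)

// Sketch: watermarks are generated before keyBy, on a stream where every
// parallel source instance carries data, so an empty key partition
// downstream does not stall event time.
def buildPipeline(trips: DataStream[Trip]): Unit = {
  trips
    .assignTimestampsAndWatermarks(
      new BoundedOutOfOrdernessTimestampExtractor[Trip](Time.seconds(10)) {
        override def extractTimestamp(t: Trip): Long = t.eventTime
      })
    .keyBy(_.driverId)
    .timeWindow(Time.hours(1))
    .maxBy("eventTime")
    .print()
}
```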

On 25/04/2019 16:49, an0 wrote:
> If my understanding is correct, then why does `assignTimestampsAndWatermarks`
> before `keyBy` work? The `timeWindowAll` stream's input streams are task 1
> and task 2, with task 2 idling, no matter whether
> `assignTimestampsAndWatermarks` is before or after `keyBy`, because whether
> task 2 receives elements depends only on the key distribution and has nothing
> to do with timestamp assignment, right?
>
>                                                     /key 1 trips\
> (A) trips--> assignTimestampsAndWatermarks--> keyBy               timeWindowAll
>                                                     \key 2 trips/
>                                                         (idle)
>
>                   /key 1 trips--> assignTimestampsAndWatermarks\
> (B) trips--> keyBy                                               timeWindowAll
>                   \key 2 trips--> assignTimestampsAndWatermarks/
>                       (idle)
>
> How things are different between A and B from `timeWindowAll`'s perspective?
>
> BTW, thanks for the webinar link, I'll check it later.
>
> On 2019/04/25 08:30:20, Dawid Wysakowicz  wrote: 
>> Hi,
>>
>> Yes I think your explanation is correct. I can also recommend Seth's
>> webinar where he talks about debugging Watermarks[1]
>>
>> Best,
>>
>> Dawid
>>
>> [1]
>> https://www.ververica.com/resources/webinar/webinar/debugging-flink-tutorial
>>
>> On 22/04/2019 22:55, an0 wrote:
>>> Thanks, I feel I'm getting closer to the truth. 
>>>
> >>> So parallelism is the cause? Say my parallelism is 2. Does that mean I get
> >>> 2 tasks running after `keyBy` even if all elements have the same key and so
> >>> go to 1 downstream task (say task 1)? And it is the other task (task 2) with
> >>> no incoming data that causes the `timeWindowAll` stream to be unable to
> >>> progress? Because both task 1 and task 2 are its input streams and one is
> >>> idling, so its event time cannot make progress?
>>>
>>> On 2019/04/22 01:57:39, Guowei Ma  wrote: 
 HI,

 BoundedOutOfOrdernessTimestampExtractors can send a WM at least after it
 receives an element.

 For after Keyby:
 Flink uses the HashCode of key and the parallelism of down stream to decide
 which subtask would receive the element. This means if your key is always
 same, all the sources will only send the elements to the same down stream
 task, for example only no. 3 BoundedOutOfOrdernessTimestampExtractor.

 For before Keyby:
 In your case, the Source and BoundedOutOfOrdernessTimestampExtractors would
 be chained together, which means every
 BoundedOutOfOrdernessTimestampExtractors will receive elements.

 Best,
 Guowei


 an0  于2019年4月19日周五 下午10:41写道:

> Hi,
>
> First of all, thank you for the `shuffle()` tip. It works. However, I
> still don't understand why it doesn't work without calling `shuffle()`.
>
> Why would not all BoundedOutOfOrdernessTimestampExtractors receive trips?
> All the trips has keys and timestamps. As I said in my reply to Paul, I 
> see
> the same watermarks being extracted.
>
> How could calling `assignTimestampsAndWatermarks` before VS after `keyBy`
> matter? My understanding is any specific window for a specific key always
> receives the exactly same data, and the calling order of
> `assignTimestampsAndWatermarks` and `keyBy` shouldn't affect that.
>
> To make `keyBy` as irrelevant as possible, I tried letting it always
> return the same key so that there is only 1 keyed stream and it is exactly
> the same as the original unkeyed stream. It still doesn't trigger windows:
> ```java
> DataStream trips = env.addSource(consumer);
> 

Re: taskmanager faild

2019-04-26 Thread Yangze Guo
Hi,

The Apache mailing lists currently do not support attachments [1].
You can file the issue in JIRA [2] or provide a link to the image.

[1] https://commons.apache.org/mail-lists.html
[2] https://issues.apache.org/jira/projects/FLINK/summary

Best,
Yangze Guo



On Fri, Apr 26, 2019 at 11:01 AM Xintong Song  wrote:
>
> hi naisili,
>
> I do not see any attachment, screenshot, or textual description of the error in your email; please double-check.
>
> Thank you~
>
> Xintong Song
>
>
>
> On Fri, Apr 26, 2019 at 10:46 AM naisili Yuan 
> wrote:
>
> > It is still the cluster stability problem. I found this error and would like to ask whether it is caused by my HA configuration, and whether the cluster would be more stable without depending on ZooKeeper.
> > Hoping for a reply, thanks!
> >
> > naisili Yuan  wrote on Mon, Apr 22, 2019 at 2:23 PM:
> >
> >> Sorry, I forgot to attach the screenshot.
> >> My Flink standalone cluster went down; checking the logs I saw the error in the screenshot.
> >> I cannot figure it out myself, and Google turns up nothing matching. Hoping for your help, thanks!
> >>
> >> Problem description: my Flink cluster had been running jobs for two days when it went down, because all the TaskManager processes died; only one JobManager was left.
> >>
> >> Cluster environment: 5 CentOS 7 machines, 32 cores and 256 GB of memory each, 2 JobManagers, 5 TaskManagers, 32 slots per machine. The JobManagers use ZooKeeper for high availability.
> >> Preliminary analysis: a ZooKeeper problem.
> >> Also: I accidentally cleaned up the logs, so I cannot paste the text anymore.
> >>
> >> Xintong Song  wrote on Mon, Apr 22, 2019 at 1:27 PM:
> >>
> >>> Hi naisili,
> >>>
> >>> This is the user-zh mailing list, so if you speak Chinese you can ask
> >>> questions in Chinese. If you prefer using English, you can send emails to
> >>> u...@flink.apache.org. Hope that helps you.
> >>>
> >>> BTW, I think you forgot to attache the screenshot.
> >>>
> >>> Thank you~
> >>>
> >>> Xintong Song
> >>>
> >>>
> >>>
> >>> On Mon, Apr 22, 2019 at 10:53 AM naisili Yuan 
> >>> wrote:
> >>>
> >>> > I use standalone cluster on flink, and i use zookeeper for the
> >>> jobmanager
> >>> > HA.
> >>> > The Screenshot is my taskmanager proccess down log, falte a error.
> >>> > And is don't know why it failed, even i google the error.
> >>> > Ask for help, thanks.
> >>> >
> >>> >
> >>> >
> >>>
> >>