[jira] [Created] (FLINK-5804) Add [non-partitioned] processing time OVER RANGE BETWEEN UNBOUNDED PRECEDING aggregation to SQL

2017-02-14 Thread sunjincheng (JIRA)
sunjincheng created FLINK-5804:
--

 Summary: Add [non-partitioned] processing time OVER RANGE BETWEEN 
UNBOUNDED PRECEDING aggregation to SQL
 Key: FLINK-5804
 URL: https://issues.apache.org/jira/browse/FLINK-5804
 Project: Flink
  Issue Type: Sub-task
  Components: Table API & SQL
Reporter: sunjincheng
Assignee: sunjincheng






--
This message was sent by Atlassian JIRA
(v6.3.15#6346)


[jira] [Created] (FLINK-5803) Add [partitioned] processing time OVER RANGE BETWEEN UNBOUNDED PRECEDING aggregation to SQL

2017-02-14 Thread sunjincheng (JIRA)
sunjincheng created FLINK-5803:
--

 Summary: Add [partitioned] processing time OVER RANGE BETWEEN 
UNBOUNDED PRECEDING aggregation to SQL
 Key: FLINK-5803
 URL: https://issues.apache.org/jira/browse/FLINK-5803
 Project: Flink
  Issue Type: Sub-task
  Components: Table API & SQL
Reporter: sunjincheng
Assignee: sunjincheng






--
This message was sent by Atlassian JIRA
(v6.3.15#6346)


[jira] [Created] (FLINK-5802) Flink SQL calling Hive User-Defined Functions

2017-02-14 Thread Zhuoluo Yang (JIRA)
Zhuoluo Yang created FLINK-5802:
---

 Summary: Flink SQL calling Hive User-Defined Functions
 Key: FLINK-5802
 URL: https://issues.apache.org/jira/browse/FLINK-5802
 Project: Flink
  Issue Type: New Feature
Reporter: Zhuoluo Yang


It's important to be able to call Hive UDFs from Flink SQL. A great many UDFs 
have been written in Hive over the last ten years, and reusing them is valuable: 
this feature will reduce the cost of migration and bring more users to Flink.
Spark SQL already supports calling Hive UDFs.

https://docs.hortonworks.com/HDPDocuments/HDP2/HDP-2.4.0/bk_spark-guide/content/calling-udfs.html
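One possible shape for such a bridge is sketched below. The interfaces here are illustrative stand-ins, not the real Hive or Flink APIs: the wrapper locates a UDF's `evaluate` method reflectively, which is roughly how simple Hive UDFs are dispatched.

```java
// Hypothetical sketch of adapting a Hive-style UDF so a SQL engine can call it.
// UpperUdf and HiveUdfBridge are stand-ins, NOT the actual Hive or Flink classes.
import java.lang.reflect.Method;

public class HiveUdfBridge {
    /** Stand-in for a Hive-style UDF: any object exposing an `evaluate` method. */
    public static class UpperUdf {
        public String evaluate(String s) { return s == null ? null : s.toUpperCase(); }
    }

    /** Reflectively invokes the UDF's `evaluate`, as a wrapper function might. */
    public static Object call(Object udf, Object... args) {
        try {
            for (Method m : udf.getClass().getMethods()) {
                if (m.getName().equals("evaluate") && m.getParameterCount() == args.length) {
                    return m.invoke(udf, args);
                }
            }
        } catch (ReflectiveOperationException e) {
            throw new RuntimeException(e);
        }
        throw new IllegalArgumentException("no matching evaluate() method");
    }

    public static void main(String[] args) {
        System.out.println(call(new UpperUdf(), "flink"));  // FLINK
    }
}
```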



--
This message was sent by Atlassian JIRA
(v6.3.15#6346)


Handling skewness and heterogeneity

2017-02-14 Thread Anis Nasir
Dear All,

I have a few use cases for Flink streaming where the cluster consists of
heterogeneous machines.

Additionally, there is skew present in both the input distribution (e.g.,
each tuple is drawn from a zipf distribution) and the service time (e.g.,
the service time required for each tuple comes from a zipf distribution).

I want to know how Flink will handle such use cases, assuming that the
distribution of both the workload and the cluster is unknown a priori.

Any help will be highly appreciated!


Regards,
Anis
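Flink's keyBy uses static hash partitioning, so a zipf-hot key lands on a single subtask regardless of machine speed. One common application-level mitigation (not something Flink does automatically) is key "salting": split a hot key into several salted sub-keys, pre-aggregate per sub-key, and merge in a second stage. The sketch below, with purely illustrative names and an assumed zipf-like input, compares the maximum per-partition load with and without salting.

```java
// Illustrative sketch of key salting under a zipf-like key distribution.
// Not a Flink API; a real job would do this with two keyBy/aggregate stages.
import java.util.Random;

public class KeySalting {
    static final int PARALLELISM = 8;

    static int plainPartition(int key) {
        return Math.floorMod(Integer.hashCode(key), PARALLELISM);
    }

    // Append a random salt so a hot key is spread over all subtasks;
    // a second aggregation stage would merge the per-salt partial results.
    static int saltedPartition(int key, Random rnd) {
        int salt = rnd.nextInt(PARALLELISM);
        return Math.floorMod(31 * Integer.hashCode(key) + salt, PARALLELISM);
    }

    /** Returns {max plain load, max salted load} over 100k zipf-like draws. */
    public static long[] run(long seed) {
        Random rnd = new Random(seed);
        long[] plain = new long[PARALLELISM], salted = new long[PARALLELISM];
        for (int i = 0; i < 100_000; i++) {
            // crude zipf-like draw: P(key >= k) ~ 1/k, so key 1 dominates
            int key = (int) Math.min(100.0, 1.0 / (rnd.nextDouble() + 1e-9));
            plain[plainPartition(key)]++;
            salted[saltedPartition(key, rnd)]++;
        }
        return new long[] { max(plain), max(salted) };
    }

    static long max(long[] a) { long m = 0; for (long v : a) m = Math.max(m, v); return m; }

    public static void main(String[] args) {
        long[] r = run(42);
        System.out.println("max load, plain hash partitioning: " + r[0]);
        System.out.println("max load, salted partitioning:     " + r[1]);
    }
}
```

The trade-off is the extra merge stage; salting only helps aggregations that can be computed in two phases.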


[jira] [Created] (FLINK-5801) Queryable State from Scala job/client fails with key of type Long

2017-02-14 Thread Patrick Lucas (JIRA)
Patrick Lucas created FLINK-5801:


 Summary: Queryable State from Scala job/client fails with key of 
type Long
 Key: FLINK-5801
 URL: https://issues.apache.org/jira/browse/FLINK-5801
 Project: Flink
  Issue Type: Bug
  Components: Queryable State
Affects Versions: 1.2.0
 Environment: Flink 1.2.0
Scala 2.10
Reporter: Patrick Lucas
 Attachments: OrderFulfillment.scala, OrderFulfillmentStateQuery.scala

While working on a demo Flink job to try out Queryable State, I exposed some 
state of type Long -> custom class via the query server. However, the query 
server returned an exception when I tried to send a query:

{noformat}
Exception in thread "main" java.lang.RuntimeException: Failed to query state 
backend for query 0. Caused by: java.io.IOException: Unable to deserialize key 
and namespace. This indicates a mismatch in the key/namespace serializers used 
by the KvState instance and this access.
at 
org.apache.flink.runtime.query.netty.message.KvStateRequestSerializer.deserializeKeyAndNamespace(KvStateRequestSerializer.java:392)
at 
org.apache.flink.runtime.state.heap.AbstractHeapState.getSerializedValue(AbstractHeapState.java:130)
at 
org.apache.flink.runtime.query.netty.KvStateServerHandler$AsyncKvStateQueryTask.run(KvStateServerHandler.java:220)
at 
java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:471)
at java.util.concurrent.FutureTask.run(FutureTask.java:262)
at 
java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
at 
java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
at java.lang.Thread.run(Thread.java:745)
Caused by: java.io.EOFException
at 
org.apache.flink.runtime.util.DataInputDeserializer.readLong(DataInputDeserializer.java:217)
at 
org.apache.flink.api.common.typeutils.base.LongSerializer.deserialize(LongSerializer.java:69)
at 
org.apache.flink.api.common.typeutils.base.LongSerializer.deserialize(LongSerializer.java:27)
at 
org.apache.flink.runtime.query.netty.message.KvStateRequestSerializer.deserializeKeyAndNamespace(KvStateRequestSerializer.java:379)
... 7 more

at 
org.apache.flink.runtime.query.netty.KvStateServerHandler$AsyncKvStateQueryTask.run(KvStateServerHandler.java:257)
at 
java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:471)
at java.util.concurrent.FutureTask.run(FutureTask.java:262)
at 
java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
at 
java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
at java.lang.Thread.run(Thread.java:745)
{noformat}

I banged my head against this for a while, then per [~jgrier]'s suggestion I 
tried simply changing the key from Long to String (modifying the two {{keyBy}} 
calls and the {{keySerializer}} {{TypeHint}} in the attached code) and it 
started working perfectly.

cc [~uce]
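The failure mode can be reproduced without Flink at all. The sketch below (a hypothetical stand-in, not Flink's code) writes a key with one serializer and reads it back with a wider one, so the read runs off the end of the buffer and throws the same `EOFException` as above.

```java
// Minimal reproduction of a key-serializer mismatch: the "client" writes a
// 4-byte int while the "server" side reads an 8-byte long, hitting EOF.
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.EOFException;
import java.io.IOException;

public class SerializerMismatch {
    /** Writes the key with a 4-byte serializer, reads it with an 8-byte one. */
    public static boolean mismatchFails() {
        try {
            ByteArrayOutputStream buf = new ByteArrayOutputStream();
            new DataOutputStream(buf).writeInt(42);      // "client-side" serializer
            DataInputStream in =
                new DataInputStream(new ByteArrayInputStream(buf.toByteArray()));
            in.readLong();                               // "server-side" serializer
            return false;
        } catch (EOFException e) {
            return true;                                 // ran off the end of the key bytes
        } catch (IOException e) {
            return false;
        }
    }

    public static void main(String[] args) {
        System.out.println("mismatch raised EOFException: " + mismatchFails());
    }
}
```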



--
This message was sent by Atlassian JIRA
(v6.3.15#6346)


Re: [DISCUSS] Table API / SQL indicators for event and processing time

2017-02-14 Thread Haohui Mai
Hi,

Thanks for starting the discussion. I can see there are multiple trade-offs
in these two approaches. One question I have is to what extent Flink
wants to open its APIs to allow users to access both processing and event
time.

Before we talk about joins, my understanding of the two approaches you
mentioned is that they are essentially (1) treating the value of event /
processing time as a first-class field of each row, and (2) limiting the scope of time
indicators to only specifying windows. Take the following query as an
example:

SELECT (PROCTIME() - ROWTIME()) / 1000 AS latency FROM table GROUP BY
FLOOR(PROCTIME() TO MINUTES)

There are several questions we can ask:

(1) Is it a valid query?
(2) How efficient will the query be?

For this query I can see arguments from both sides. I think at the end of
the day it really comes down to what Flink wants to support. After working
on FLINK-5624 I'm more inclined to support the second approach (i.e.,
built-in functions). The main reason is that the APIs of Flink are
designed to separate time from the real payloads. It probably makes sense
for the Table / SQL APIs to have the same design.

For joins I don't have a clear answer off the top of my head. Flink requires two
streams to be put in the same window before doing the joins. This is
essentially a subset of what SQL can express. I don't know what would be
the best approach here.

Regards,
Haohui


On Tue, Feb 14, 2017 at 12:26 AM Fabian Hueske  wrote:

> Hi,
>
> It would work as in the query I gave as an example before:
>
> SELECT
>   a,
>   SUM(b) OVER (PARTITION BY c ORDER BY proctime ROWS BETWEEN 2
> PRECEDING AND CURRENT ROW) AS sumB
> FROM myStream
>
> Here "proctime" would be a system attribute of the table "myStream".
> The table would also have another system attribute called "rowtime" which
> would be used to indicate event time semantics.
> These attributes would always be present in tables which are derived from
> streams.
> Because we still require that streams have timestamps and watermarks
> assigned (either by the StreamTableSource or somewhere downstream in the
> DataStream program) when they are converted into a table, there is no need
> to register anything.
>
> Does that answer your questions?
>
> Best, Fabian
>
>
>
> 2017-02-14 2:04 GMT+01:00 Radu Tudoran :
>
> > Hi Fabian,
> >
> > Thanks for starting the discussion. Before I give my thoughts on this can
> > you please give some examples of how would you see option of using
> "system
> > attributes"?
> > Do you use this when you register the stream as a table, do you use if
> > when you call an SQL query, do you use it when you translate back a table
> > to a stream / write it to a dynamic table?
> >
> > Dr. Radu Tudoran
> > Senior Research Engineer - Big Data Expert
> > IT R Division
> >
> >
> > HUAWEI TECHNOLOGIES Duesseldorf GmbH
> > European Research Center
> > Riesstrasse 25, 80992 München
> >
> > E-mail: radu.tudo...@huawei.com
> > Mobile: +49 15209084330 <+49%201520%209084330>
> > Telephone: +49 891588344173 <+49%2089%201588344173>
> >
> >
> > -Original Message-
> > From: Fabian Hueske [mailto:fhue...@gmail.com]
> > Sent: Tuesday, February 14, 2017 1:01 AM
> > To: dev@flink.apache.org
> > Subject: [DISCUSS] Table API / SQL indicators for event and processing
> time
> >
> > Hi,
> >
> > I'd like to start a discussion about how Table API / SQL queries indicate
> > whether an operation is done in event or processing time.
> >
> > 1) Why do we need to indicate the time mode?
> >
> > We need to distinguish event time and processing time mode for operations
> > in queries in order to have the semantics of a query fully defined.
> > This cannot be globally done in the TableEnvironment because some queries
> > explicitly reference a time attribute in an expression, such as the ORDER BY
> > clause of an OVER window with PRECEDING / FOLLOWING clauses.
> > So we need a way to specify something like the following query:
> >
> > SELECT
> >   a,
> >   SUM(b) OVER (PARTITION BY c ORDER BY proctime ROWS BETWEEN 2 PRECEDING
> > AND CURRENT ROW) AS sumB FROM myStream

[jira] [Created] (FLINK-5800) Make sure that the CheckpointStreamFactory is instantiated once per operator only

2017-02-14 Thread Stephan Ewen (JIRA)
Stephan Ewen created FLINK-5800:
---

 Summary: Make sure that the CheckpointStreamFactory is 
instantiated once per operator only
 Key: FLINK-5800
 URL: https://issues.apache.org/jira/browse/FLINK-5800
 Project: Flink
  Issue Type: Improvement
  Components: State Backends, Checkpointing
Affects Versions: 1.2.0
Reporter: Stephan Ewen
Assignee: Stephan Ewen
 Fix For: 1.3.0, 1.2.1


Previously, the {{CheckpointStreamFactory}} was created once per checkpoint, and 
its repeated initialization logic (like ensuring the existence of the parent 
directory) caused unnecessary load on some file systems at very large scale.
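A minimal sketch of the intended pattern (assumed from the description above, not the actual Flink patch): cache the factory in the operator and reuse it across checkpoints, so per-instance setup runs once instead of once per checkpoint.

```java
// Toy illustration: lazy, once-per-operator creation of a checkpoint factory.
// The expensive setup (standing in for mkdirs on the checkpoint directory)
// runs a single time, however many checkpoints the operator takes.
public class FactoryPerOperator {
    public static class Operator {
        private Object factory;       // cached factory instance, created lazily
        public int setupCalls;        // how often the expensive setup ran

        public void snapshot(long checkpointId) {
            if (factory == null) {    // once per operator, not once per checkpoint
                setupCalls++;         // stands in for ensuring the parent dir exists
                factory = new Object();
            }
            // factory would now be used to open the checkpoint output stream
        }
    }

    public static void main(String[] args) {
        Operator op = new Operator();
        for (long cp = 1; cp <= 100; cp++) op.snapshot(cp);
        System.out.println("setup ran " + op.setupCalls + " time(s) for 100 checkpoints");
    }
}
```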




--
This message was sent by Atlassian JIRA
(v6.3.15#6346)


[jira] [Created] (FLINK-5799) Let RpcService.scheduleRunnable return ScheduledFuture

2017-02-14 Thread Till Rohrmann (JIRA)
Till Rohrmann created FLINK-5799:


 Summary: Let RpcService.scheduleRunnable return ScheduledFuture
 Key: FLINK-5799
 URL: https://issues.apache.org/jira/browse/FLINK-5799
 Project: Flink
  Issue Type: Sub-task
  Components: Distributed Coordination
Affects Versions: 1.3.0
Reporter: Till Rohrmann
Assignee: Till Rohrmann
Priority: Minor
 Fix For: 1.3.0


Currently, the method {{RpcService.scheduleRunnable}} does not return a control 
instance for the scheduled runnable. I think it would be good to return a 
{{ScheduledFuture}} with which one can cancel the runnable after it has been 
scheduled, e.g. for a timeout registration that has become obsolete. This API 
is also more in line with the {{ScheduledExecutorService}}, where one also 
receives a {{ScheduledFuture}} after scheduling a runnable.
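The JDK pattern referred to looks as follows; this is plain `ScheduledExecutorService` usage, not Flink's {{RpcService}}.

```java
// Schedule a "timeout" action, then cancel it via the returned ScheduledFuture
// once it becomes obsolete -- the control handle the issue asks for.
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;

public class CancelScheduled {
    public static boolean scheduleAndCancel() {
        ScheduledExecutorService exec = Executors.newSingleThreadScheduledExecutor();
        // register a "timeout" action for 10 seconds from now
        ScheduledFuture<?> timeout =
            exec.schedule(() -> System.out.println("timed out!"), 10, TimeUnit.SECONDS);
        // ... the awaited response arrives, making the registration obsolete:
        boolean cancelled = timeout.cancel(false);
        exec.shutdown();
        return cancelled && timeout.isCancelled();
    }

    public static void main(String[] args) {
        System.out.println("timeout cancelled before firing: " + scheduleAndCancel());
    }
}
```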



--
This message was sent by Atlassian JIRA
(v6.3.15#6346)


[jira] [Created] (FLINK-5798) Let the RPCService provide a ScheduledExecutorService

2017-02-14 Thread Till Rohrmann (JIRA)
Till Rohrmann created FLINK-5798:


 Summary: Let the RPCService provide a ScheduledExecutorService
 Key: FLINK-5798
 URL: https://issues.apache.org/jira/browse/FLINK-5798
 Project: Flink
  Issue Type: Sub-task
  Components: Distributed Coordination
Affects Versions: 1.3.0
Reporter: Till Rohrmann
Assignee: Till Rohrmann
Priority: Minor
 Fix For: 1.3.0


Currently the {{RPCService}} interface provides a {{scheduleRunnable}} method 
to schedule {{Runnables}}. I would like to generalize this functionality by 
letting the {{RPCService}} provide a {{ScheduledExecutorService}} to the user. 
That way other components which require such an executor service could simply 
use the one provided by the {{RPCService}}.
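A sketch of what this generalization buys (names here are illustrative, not Flink's actual interfaces): components depend only on the standard {{ScheduledExecutorService}} interface and share whatever executor the RPC service hands out.

```java
// Illustrative stand-ins for the proposal: the service exposes a standard
// ScheduledExecutorService, and components take it by constructor injection.
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

public class SharedScheduler {
    /** Hypothetical stand-in for the proposed RPCService accessor. */
    public interface RpcService {
        ScheduledExecutorService getScheduledExecutor();
    }

    /** A component that only needs the standard interface, not the RPC service. */
    public static class HeartbeatMonitor {
        public final AtomicInteger checks = new AtomicInteger();
        public HeartbeatMonitor(ScheduledExecutorService exec) {
            exec.scheduleAtFixedRate(checks::incrementAndGet, 0, 50, TimeUnit.MILLISECONDS);
        }
    }

    public static void main(String[] args) throws InterruptedException {
        ScheduledExecutorService shared = Executors.newSingleThreadScheduledExecutor();
        RpcService rpc = () -> shared;                 // the service exposes its executor
        HeartbeatMonitor monitor = new HeartbeatMonitor(rpc.getScheduledExecutor());
        Thread.sleep(200);
        shared.shutdown();
        System.out.println("heartbeat checks ran: " + monitor.checks.get());
    }
}
```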



--
This message was sent by Atlassian JIRA
(v6.3.15#6346)


Re: FLINK-5734 : Code Generation for NormalizedKeySorter

2017-02-14 Thread Greg Hogan
Pat,

Thanks for adding the new test results. The idea for this implementation
was Gábor's, from the FLINK-3722 description.

Since you will be filing a FLIP, I recommend including these benchmarks for
consideration and discussion on the mailing list, in part because the PR is
4 months old and in need of further review, but also because there may be
new ideas or questions as to the form of the new code.

Greg

On Tue, Feb 14, 2017 at 5:40 AM, Gábor Gévay  wrote:

> Hello,
>
> Pat, the table in your email is somehow not visible in my gmail, but
> it is visible here:
> http://apache-flink-mailing-list-archive.1008284.n3.nabble.com/FLINK-5734-Code-Generation-for-NormalizedKeySorter-tt15804.html#a15936
> Maybe the problem is caused by the formatting.
>
> > FLINK-3722
> > approach seems to be the fastest one.
>
> OK, then I would suggest implementing the code generation on top of the
> PR for FLINK-3722. I guess we can assume that that PR will be merged
> sooner than the code generation, so there won't be any serious merge
> conflicts this way.
>
> Best,
> Gábor
>
>
>
>
> 2017-02-14 11:16 GMT+01:00 pat.chormai :
> > Hi [~greghogan]
> >
> > I have done a benchmark comparing FLINK-3722 and our approaches.
> > As you can see in the *Score* column, which represents sorting time, the
> > FLINK-3722 approach seems to be the fastest one.
> >
> >
> >
> >
> >
> > --
> > View this message in context: http://apache-flink-mailing-list-archive.1008284.n3.nabble.com/FLINK-5734-Code-Generation-for-NormalizedKeySorter-tp15804p15936.html
> > Sent from the Apache Flink Mailing List archive. mailing list archive at
> Nabble.com.
>


[jira] [Created] (FLINK-5797) incorrect use of port range selector in BootstrapTool

2017-02-14 Thread Yelei Feng (JIRA)
Yelei Feng created FLINK-5797:
-

 Summary: incorrect use of port range selector in BootstrapTool
 Key: FLINK-5797
 URL: https://issues.apache.org/jira/browse/FLINK-5797
 Project: Flink
  Issue Type: Bug
  Components: YARN
Affects Versions: 1.2.0, 1.3.0
Reporter: Yelei Feng
 Fix For: 1.3.0


In method {{BootstrapTool.startActorSystem}}, port range is iterated twice.
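A sketch of the intended single-pass behavior (assumed from the issue summary, not the actual fix): parse the port range into one iterator of candidate ports and walk it exactly once, never restarting from the beginning on a bind failure.

```java
// Hypothetical sketch of single-pass port-range iteration, in the spirit of
// what BootstrapTools.startActorSystem should do; not the actual Flink code.
import java.util.Iterator;
import java.util.NoSuchElementException;

public class PortRange {
    /** Parses "50100-50200" (or a single port like "8080") into candidate ports. */
    public static Iterator<Integer> parse(String range) {
        String[] parts = range.split("-");
        final int start = Integer.parseInt(parts[0].trim());
        final int end = parts.length > 1 ? Integer.parseInt(parts[1].trim()) : start;
        return new Iterator<Integer>() {
            int next = start;
            public boolean hasNext() { return next <= end; }
            public Integer next() {
                if (!hasNext()) throw new NoSuchElementException();
                return next++;
            }
        };
    }

    public static void main(String[] args) {
        // walk the range exactly once; on a bind failure, move to the NEXT
        // candidate rather than iterating the range a second time
        Iterator<Integer> ports = parse("50100-50102");
        while (ports.hasNext()) {
            System.out.println("candidate port: " + ports.next());
        }
    }
}
```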



--
This message was sent by Atlassian JIRA
(v6.3.15#6346)


Re: Using QueryableState inside Flink jobs (and Parameter Server implementation)

2017-02-14 Thread Theodore Vasiloudis
Hello all,

I would also be really interested in how a PS-like architecture would work
in Flink. Note that we are not necessarily talking about a PS, but generally
about how QueryableState can be used for ML tasks, with, I guess, a focus on
model-parallel training.

One suggestion I would make is to take a look at TensorFlow, which is also a
dataflow model that has support for distributed computation, both data and
model parallel.

I don't know too much about the internal workings of the system, but I
would point out this from the TF whitepaper [1], Section 11 (related work):

It also permits a significant simplification by
> allowing the expression of stateful parameter nodes as
> variables, and variable update operations that are just
> additional nodes in the graph; in contrast, DistBelief,
> Project Adam and the Parameter Server systems all have
> whole separate parameter server subsystems devoted to
> communicating and updating parameter values.
>

I think the Related work section is the most relevant to this discussion as
it discusses the differences between the programming models in Spark, Naiad
etc. to the TF model.

Also re. fault tolerance:

When a failure is detected, the entire graph execution
> is aborted and restarted from scratch. Recall however
> that Variable nodes refer to tensors that persist across
> executions of the graph. We support consistent checkpointing
> and recovery of this state on a restart. In particular,
> each Variable node is connected to a Save node. These
> Save nodes are executed periodically, say once every N
> iterations, or once every N seconds. When they execute,
> the contents of the variables are written to persistent
> storage, e.g., a distributed file system. Similarly each
> Variable is connected to a Restore node that is only enabled
> in the first iteration after a restart.
>

[1] http://download.tensorflow.org/paper/whitepaper2015.pdf
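The Save/Restore scheme quoted above can be shown with a toy sketch (hypothetical, not TensorFlow's implementation): a variable saved every N iterations is restored from its most recent save after a crash, instead of restarting from zero.

```java
// Toy version of periodic variable checkpointing: save every n iterations,
// then restore the last save after a simulated failure.
import java.util.HashMap;
import java.util.Map;

public class PeriodicCheckpoint {
    /** Runs `iters` updates, saving every `n` iterations; returns the restored value. */
    public static int runAndRestore(int iters, int n) {
        Map<String, Integer> store = new HashMap<>();  // stands in for a distributed FS
        int variable = 0;                              // the stateful "Variable" node
        for (int iter = 1; iter <= iters; iter++) {
            variable += 1;                             // the training-step update
            if (iter % n == 0) {
                store.put("variable", variable);       // the periodic "Save" node
            }
        }
        // after a crash, the "Restore" node reloads the most recent save
        return store.getOrDefault("variable", 0);
    }

    public static void main(String[] args) {
        // 35 iterations saved every 10: a restart resumes from 30, not 0 or 35
        System.out.println("restored: " + runAndRestore(35, 10));
    }
}
```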

On Tue, Feb 14, 2017 at 3:18 PM, Gábor Hermann 
wrote:

> Hey Ufuk,
>
> I'm happy to contribute. At least I'll get a bit more understanding of the
> details.
>
> Breaking the assumption that only a single thread updates state would
> bring us from strong isolation guarantees (i.e. serializability at the
> updates and read committed at the external queries) to no isolation
> guarantees. That's not something to be taken lightly. I think that these
> guarantees would be more easily provided for inside queries that modify
> (setKvState), but that's still not trivial.
>
> Indeed, the iteration approach works better for the use-cases I mentioned,
> at least for now.
>
> Cheers,
> Gabor
>
>
> On 2017-02-14 14:43, Ufuk Celebi wrote:
>
> Hey Gabor,
>>
>> great ideas here. It's only slightly related, but I'm currently working
>> on a proposal to improve the queryable state APIs for lookups (partly along
>> the lines of what you suggested with higher level accessors). Maybe you are
>> interested in contributing there?
>>
>> I really like your ideas for the use cases you describe, but I'm unsure
>> about the write path (setKvState), because of the discussed implications to
>> the state backends. I think that this needs more discussion and
>> coordination with the contributors working on the backends. For example,
>> one assumption so far was that only a single thread updates state and we
>> don't scope state per checkpoint (to provide "isolation levels" for the
>> queries; read committed vs. read uncommitted) and probably more.
>>
>> Because of this I would actually lean towards the iteration approach in a
>> first version. Would that be a feasible starting point for you?
>>
>> – Ufuk
>>
>> On 14 February 2017 at 14:01:21, Gábor Hermann (m...@gaborhermann.com)
>> wrote:
>>
>>> Hi Gyula, Jinkui Shi,
>>>   Thanks for your thoughts!
>>>   @Gyula: I'll try and explain a bit more detail.
>>>   The API could be almost like the QueryableState's. It could be
>>> higher-level though: returning Java objects instead of serialized data
>>> (because there would not be issues with class loading). Also, it could
>>> support setKvState (see my 5. point). This could lead to both a
>>> potential performance improvements and easier usage (see my points 2.
>>> and 3.).
>>>   A use-case could be anything where we'd use an external KV-store.
>>> For instance we are updating user states based on another user state, so
>>> in the map function we do a query (in pseudo-ish Scala code):
>>>   users.keyBy(0).flatMapWithState { (userEvent, collector) =>
>>> val thisUser: UserState = state.get()
>>> val otherUser: Future[UserState] =
>>> qsClient.getKvState("users", userEvent.otherUserId)
>>>   otherUser.onSuccess { case otherUserState =>
>>> state.update(someFunc1(thisUser, otherUserState))
>>> collector.collect(someFunc2(thisUser, otherUserState))
>>> }
>>> }
>>>   Another example could be (online) distributed matrix factorization,
>>> where the two factor matrices are represented by distributed states. One
>>> is 

Re: Using QueryableState inside Flink jobs (and Parameter Server implementation)

2017-02-14 Thread Gábor Hermann

Hey Ufuk,

I'm happy to contribute. At least I'll get a bit more understanding of 
the details.


Breaking the assumption that only a single thread updates state would 
bring us from strong isolation guarantees (i.e. serializability at the 
updates and read committed at the external queries) to no isolation 
guarantees. That's not something to be taken lightly. I think that these 
guarantees would be more easily provided for inside queries that modify 
(setKvState), but that's still not trivial.


Indeed, the iteration approach works better for the use-cases I 
mentioned, at least for now.


Cheers,
Gabor


Re: Using QueryableState inside Flink jobs (and Parameter Server implementation)

2017-02-14 Thread Ufuk Celebi
Hey Gabor,

great ideas here. It's only slightly related, but I'm currently working on a 
proposal to improve the queryable state APIs for lookups (partly along the 
lines of what you suggested with higher level accessors). Maybe you are 
interested in contributing there?

I really like your ideas for the use cases you describe, but I'm unsure about 
the write path (setKvState), because of the discussed implications to the state 
backends. I think that this needs more discussion and coordination with the 
contributors working on the backends. For example, one assumption so far was 
that only a single thread updates state and we don't scope state per checkpoint 
(to provide "isolation levels" for the queries; read committed vs. read 
uncommitted) and probably more. 

Because of this I would actually lean towards the iteration approach in a first 
version. Would that be a feasible starting point for you?

– Ufuk

On 14 February 2017 at 14:01:21, Gábor Hermann (m...@gaborhermann.com) wrote:
> Hi Gyula, Jinkui Shi,
>  
> Thanks for your thoughts!
>  
> @Gyula: I'll try and explain in a bit more detail.
>  
> The API could be almost like the QueryableState's. It could be
> higher-level though: returning Java objects instead of serialized data
> (because there would not be issues with class loading). Also, it could
> support setKvState (see my point 5). This could lead to both potential
> performance improvements and easier usage (see my points 2 and 3).
>  
> A use-case could be anything where we'd use an external KV-store.
> For instance we are updating user states based on another user state, so
> in the map function we do a query (in pseudo-ish Scala code):
>  
> users.keyBy(0).flatMapWithState { (userEvent, collector) =>
> val thisUser: UserState = state.get()
> val otherUser: Future[UserState] =
> qsClient.getKvState("users", userEvent.otherUserId)
>  
> otherUser.onSuccess { case otherUserState =>
> state.update(someFunc1(thisUser, otherUserState))
> collector.collect(someFunc2(thisUser, otherUserState))
> }
> }
>  
> Another example could be (online) distributed matrix factorization,
> where the two factor matrices are represented by distributed states. One
> is updated by querying the other (with getKvState), and computing some
> function (i.e. SGD), while the other is updated at the same place (with
> setKvState).
>  
> I see the motivation behind the QueryableState as a way to make further
> use of the KV-store we practically have at stateful operators (but
> please correct me if I'm wrong). I think we could make even more use of
> it if the KV-store is used inside the same job.
>  
> 1) Order and timeliness
> As you've mentioned it's hard to guarantee any ordering when working
> with two states on possibly distinct machines. This could bring us to
> distributed transaction processing, which is a complex topic in itself. I
> can imagine using watermarks and keeping different state versions to
> only allow querying state from the past, and not from the future. For
> now, let's just assume that order does not matter.
>  
> 2) Fault-tolerance
> There might be other things we could do, but there are two simple
> guarantees that we can surely provide. First, by using the current
> QueryableState the task could fail with incomplete futures. If the
> records producing those futures are received before the previous
> checkpoint barrier, those updates will be completely lost. We could
> solve this by waiting for the futures to complete before starting a
> checkpoint, thus providing exactly-once guarantees. This would ensure
> that, although the UDF has side-effects, every record has its effect
> exactly-once. I don't see a good way to provide this guarantee with the
> current QueryableState. Second, we can guarantee that the query will
> eventually succeed (or else the whole topology would fail).
>  
> 3) Optimizations
> I've also got two examples for optimizations. First, we can do a
> best-effort to co-locate the two stateful operators to save on network
> overhead. The user could try to co-locate the querying machines when
> externally querying the state, but could not move the machines with the
> state being queried. Second, we could provide a user-interface for
> (loose) guarantees on the latency of sending and returning queries, just
> like setting the buffer timeout.
>  
> 4) Concurrent reading/writing
> Key-value states and collectors might be accessed concurrently. While
> the user could use locks, the system could handle this instead of the user,
> e.g. by using a thread-safe collector whenever an internal KV-state
> query is registered at the UDF.
>  
> 5) setKvState
> We could not give exactly-once guarantees if we allowed external queries
> to modify the state. When a Flink topology fails and restarts, the
> modifications coming from the outside would not be replayed. However, we
> can simply give exactly-once guarantees if the modifications are done
> inside (set aside ordering), as the records would be 

[jira] [Created] (FLINK-5795) Improve "UDTF" to support constructors with parameters

2017-02-14 Thread sunjincheng (JIRA)
sunjincheng created FLINK-5795:
--

 Summary: Improve "UDTF" to support constructors with parameters
 URL: https://issues.apache.org/jira/browse/FLINK-5795
 Project: Flink
  Issue Type: Sub-task
  Components: Table API & SQL
Reporter: sunjincheng
Assignee: sunjincheng






--
This message was sent by Atlassian JIRA
(v6.3.15#6346)


Re: Using QueryableState inside Flink jobs (and Parameter Server implementation)

2017-02-14 Thread Gábor Hermann

Hi Gyula, Jinkui Shi,

Thanks for your thoughts!

@Gyula: I'll try and explain in a bit more detail.

The API could be almost like the QueryableState's. It could be 
higher-level though: returning Java objects instead of serialized data 
(because there would not be issues with class loading). Also, it could 
support setKvState (see my point 5). This could lead to both potential 
performance improvements and easier usage (see my points 2 
and 3).


A use-case could be anything where we'd otherwise use an external KV-store.
For instance, we are updating user states based on another user's state, so 
in the flatMap function we do a query (in pseudo-ish Scala code):


users.keyBy(0).flatMapWithState { (userEvent, collector) =>
  val thisUser: UserState = state.get()
  val otherUser: Future[UserState] =
qsClient.getKvState("users", userEvent.otherUserId)

  otherUser.onSuccess { case otherUserState =>
state.update(someFunc1(thisUser, otherUserState))
collector.collect(someFunc2(thisUser, otherUserState))
  }
}

Another example could be (online) distributed matrix factorization, 
where the two factor matrices are represented by distributed states. One 
is updated by querying the other (with getKvState) and computing some 
function (e.g. an SGD step), while the other is updated at the same place 
(with setKvState).


I see the motivation behind the QueryableState as a way to make further 
use of the KV-store we practically have at stateful operators (but 
please correct me if I'm wrong). I think we could make even more use of 
it if the KV-store is used inside the same job.


1) Order and timeliness
As you've mentioned, it's hard to guarantee any ordering when working 
with two states on possibly distinct machines. This could bring us to 
distributed transaction processing, which is a complex topic in itself. I 
can imagine using watermarks and keeping different state versions to 
only allow querying state from the past, and not from the future. For 
now, let's just assume that order does not matter.


2) Fault-tolerance
There might be other things we could do, but there are two simple 
guarantees that we can surely provide. First, by using the current 
QueryableState the task could fail with incomplete futures. If the 
records producing those futures are received before the previous 
checkpoint barrier, those updates will be completely lost. We could 
solve this by waiting for the futures to complete before starting a 
checkpoint, thus providing exactly-once guarantees. This would ensure 
that, although the UDF has side-effects, every record has its effect 
exactly-once. I don't see a good way to provide this guarantee with the 
current QueryableState. Second, we can guarantee that the query will 
eventually succeed (or else the whole topology would fail).
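A minimal sketch of the "drain pending futures before checkpointing" idea. The class and method names are invented for illustration; real integration would happen in the operator's barrier handling, not in user code:

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CompletableFuture;

// Sketch: an operator tracks its in-flight state queries and blocks the
// checkpoint until every outstanding query (and its side effect) is done,
// so each record's effect is covered by the checkpoint exactly once.
public class CheckpointDrainSketch {
    private final List<CompletableFuture<?>> pending = new ArrayList<>();

    // Called for every state query the UDF issues.
    public void register(CompletableFuture<?> queryResult) {
        pending.add(queryResult);
    }

    // Called when a checkpoint barrier arrives: wait for all outstanding
    // queries, then snapshot. Returns the number of drained queries.
    public int onCheckpointBarrier() {
        CompletableFuture.allOf(pending.toArray(new CompletableFuture[0])).join();
        int drained = pending.size();
        pending.clear();
        // ... snapshot the operator state here ...
        return drained;
    }

    public static void main(String[] args) {
        CheckpointDrainSketch op = new CheckpointDrainSketch();
        CompletableFuture<String> query = new CompletableFuture<>();
        op.register(query);
        query.complete("otherUserState");             // the queried state arrives
        System.out.println(op.onCheckpointBarrier()); // 1
    }
}
```

Note the trade-off: an incomplete future delays the checkpoint, which is exactly the "eventually succeed or fail the topology" guarantee above.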


3) Optimizations
I've also got two examples for optimizations. First, we can make a 
best effort to co-locate the two stateful operators to save on network 
overhead. The user could try to co-locate the querying machines when 
externally querying the state, but could not move the machines holding 
the state being queried. Second, we could provide a user-interface for 
(loose) guarantees on the latency of sending and returning queries, just 
like setting the buffer timeout.


4) Concurrent reading/writing
Key-value states and collectors might be accessed concurrently. While 
the user could use locks, we could let the system handle this instead of 
the user, e.g. by using a thread-safe collector whenever an internal 
KV-state query is registered at the UDF.
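The thread-safe collector could be a simple synchronizing decorator that the runtime installs when it detects an internal KV-state query. Collector is modeled here as a minimal interface, not Flink's actual one:

```java
import java.util.ArrayList;
import java.util.List;

public class ThreadSafeCollectorSketch {
    // Minimal stand-in for a collector interface (not Flink's).
    interface Collector<T> { void collect(T record); }

    // Decorator that serializes emissions from concurrent callers, e.g.
    // the main processing thread and a query-callback thread.
    static class SynchronizedCollector<T> implements Collector<T> {
        private final Collector<T> delegate;
        SynchronizedCollector(Collector<T> delegate) { this.delegate = delegate; }
        public synchronized void collect(T record) { delegate.collect(record); }
    }

    public static void main(String[] args) throws InterruptedException {
        List<Integer> out = new ArrayList<>();
        Collector<Integer> safe = new SynchronizedCollector<>(out::add);
        Thread[] threads = new Thread[4];
        for (int t = 0; t < threads.length; t++) {
            threads[t] = new Thread(() -> {
                for (int i = 0; i < 1000; i++) safe.collect(i);
            });
            threads[t].start();
        }
        for (Thread t : threads) t.join();
        // Without synchronization, a plain ArrayList would likely lose records.
        System.out.println(out.size()); // 4000
    }
}
```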


5) setKvState
We could not give exactly-once guarantees if we allowed external queries 
to modify the state. When a Flink topology fails and restarts, the 
modifications coming from the outside would not be replayed. However, we 
can simply give exactly-once guarantees if the modifications are done 
inside (setting aside ordering), as the records would be replayed if the 
modification failed.


I believe it would not take much effort to make these improvements. 
However, it would affect the runtime (integration with FT), and it 
might not be worth working towards these goals. What do you think about 
this?


It's also worth considering that the same use-cases could be similarly 
achieved with the iteration/loops API, but in a bit less natural way, by 
imitating two-way communication.


Should we move this discussion to a JIRA issue, to avoid flooding the 
mailing list?



@Jinkui Shi:
1. I think we should definitely support a flexible update strategy, i.e. 
allow choosing between sync, async, and bounded-delay updates.
2. I really like your idea of using PS externally and connecting to it 
with a source and a sink. Fault-tolerance could also be achieved by 
versioning at the PS and resending older parameters if the Flink job 
fails (i.e. making PS a durable source). Although, the question is then 
how to implement the PS? Do you think we could use the implementations 
you've mentioned?
3. Good idea. It's just been proposed to support GPU calculations 
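The bounded-delay option from point 1 is essentially the stale-synchronous-parallel condition used by parameter servers; a minimal sketch (the names and the logical-clock representation are illustrative, not any existing PS API):

```java
// Stale-synchronous check: a worker asking to read parameters at logical
// clock `clock` may proceed only if the slowest worker is at most
// `staleness` clocks behind. staleness = 0 gives sync, a large bound
// approaches fully async updates.
public class BoundedDelaySketch {
    public static boolean mayProceed(int[] workerClocks, int clock, int staleness) {
        int min = Integer.MAX_VALUE;
        for (int c : workerClocks) min = Math.min(min, c);
        return clock - min <= staleness;
    }

    public static void main(String[] args) {
        int[] clocks = {3, 5, 7};
        System.out.println(mayProceed(clocks, 5, 2)); // true  (5 - 3 = 2)
        System.out.println(mayProceed(clocks, 7, 2)); // false (7 - 3 = 4)
    }
}
```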

Re: Support for daylight saving timezone changes in Flink

2017-02-14 Thread Till Rohrmann
Hi Swapnil,

Flink does not have explicit support for this. It's the responsibility of
the user to make sure that the right watermarks are extracted from the
incoming events. This also means the correct handling of different time
zones.
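For instance, extracting timestamps as epoch milliseconds (UTC) keeps watermarks monotone even when a local wall clock repeats an hour at the daylight-saving fall-back. A standalone java.time illustration (not Flink code; zone and times are example values):

```java
import java.time.LocalDateTime;
import java.time.ZoneId;
import java.time.ZonedDateTime;

public class DstEpochSketch {
    public static void main(String[] args) {
        ZoneId berlin = ZoneId.of("Europe/Berlin");
        // At the 2017 fall-back transition, the wall-clock time 02:30
        // occurs twice; atZone resolves the overlap to the earlier offset.
        ZonedDateTime first = LocalDateTime.of(2017, 10, 29, 2, 30).atZone(berlin);
        ZonedDateTime second = first.withLaterOffsetAtOverlap();
        // Epoch millis keep advancing while the local clock repeats, so
        // watermarks derived from them never go backwards.
        long gapMillis = second.toInstant().toEpochMilli()
                       - first.toInstant().toEpochMilli();
        System.out.println(gapMillis); // 3600000 (one hour)
    }
}
```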

Cheers,
Till

On Tue, Feb 14, 2017 at 8:41 AM, Swapnil Chougule 
wrote:

> I want to know the behavior of Flink streaming systems during daylight
> saving changes in multiple timezones, as streaming systems may run in such
> timezones.
> Is any built-in support needed? Can anybody answer?
> Thanks in advance
>
> --Swapnil
>


Re: FLINK-5734 : Code Generation for NormalizedKeySorter

2017-02-14 Thread Gábor Gévay
Hello,

Pat, the table in your email is somehow not visible in my gmail, but
it is visible here:
http://apache-flink-mailing-list-archive.1008284.n3.nabble.com/FLINK-5734-Code-Generation-for-NormalizedKeySorter-tt15804.html#a15936
Maybe the problem is caused by the formatting.

> FLINK-3722
> approach seems to be the fastest one.

OK, then I would suggest to do the implementation of the code
generation on top of the PR for FLINK-3722. I guess we can assume that
that PR will be merged sooner than the code generation, so there won't
be any serious merge conflicts this way.

Best,
Gábor




2017-02-14 11:16 GMT+01:00 pat.chormai :
> Hi [~greghogan]
>
> I have done a benchmark comparing FLINK-3722 and our approaches.
> As you can see in the *Score* column, which represents sorting time, the
> FLINK-3722 approach seems to be the fastest one.
> [benchmark table not rendered in the plain-text archive]
> --
> View this message in context: 
> http://apache-flink-mailing-list-archive.1008284.n3.nabble.com/FLINK-5734-Code-Generation-for-NormalizedKeySorter-tp15804p15936.html
> Sent from the Apache Flink Mailing List archive. mailing list archive at 
> Nabble.com.