[jira] [Created] (FLINK-5861) TaskManager's components support updating JobManagerGateway

2017-02-20 Thread Biao Liu (JIRA)
Biao Liu created FLINK-5861:
---

 Summary: TaskManager's components support updating 
JobManagerGateway
 Key: FLINK-5861
 URL: https://issues.apache.org/jira/browse/FLINK-5861
 Project: Flink
  Issue Type: Sub-task
Reporter: Biao Liu
Assignee: Biao Liu








Re: [DISCUSS] Table API / SQL indicators for event and processing time

2017-02-20 Thread Xingcan Cui
Hi all,

As I said in another thread, the main difference between stream and table
is that a stream is an ordered list while a table is an unordered set.

Setting aside out-of-order data in practice, both event time and
processing time can be treated as monotonically increasing fields, and
that is why the given query [1] would work. In other words, we must
guarantee that the "SELECT MAX(t22.rowtime) ..." subquery returns a
single value that can be retrieved from the cached dynamic table, since
joining two un-windowed streams is dangerous.

Under this circumstance, I am considering adding a "monotonic hint" (INC or
DEC) to a field of a (generalized) table (maybe via an annotation on the
registerDataXX method) to indicate whether that field is monotonically
increasing or decreasing. Then, by treating rowtime as a common
(monotonically increasing) field, there are several benefits:

1) It unifies tables and streams by imposing a total ordering relation
on an otherwise unordered set.

2) These fields can be modified arbitrarily as long as they keep the
declared monotonicity, and the watermark problem no longer exists.

3) The monotonic hint will be useful in the query optimization process.

What do you think?

Best,
Xingcan

[1]
SELECT t1.amount, t2.rate
FROM
  table1 AS t1,
  table2 AS t2
WHERE
  t1.currency = t2.currency AND
  t2.rowtime = (
    SELECT MAX(t22.rowtime)
    FROM table2 AS t22
    WHERE t22.currency = t1.currency AND t22.rowtime <= t1.rowtime)

On Tue, Feb 21, 2017 at 2:52 AM, Fabian Hueske  wrote:

> Hi everybody,
>
> When Timo wrote to the Calcite mailing list, Julian Hyde replied and gave
> good advice and explained why a system attribute for event-time would be a
> problem [1].
> I thought about this and agree with Julian.
>
> Here is a document to describe the problem, constraints in Flink and a
> proposal how to handle processing time and event time in Table API and SQL:
>
> ->
> https://docs.google.com/document/d/1MDGViWA_TCqpaVoWub7u_GY4PMFSbT8TuaNl-
> EpbTHQ
>
> Please have a look, comment and ask questions.
>
> Thank you,
> Fabian
>
> [1]
> https://lists.apache.org/thread.html/6397caf0ca37f97f2cd27d96f7a12c
> 6fa845d6fd0870214fdce18d96@%3Cdev.calcite.apache.org%3E
>
> 2017-02-16 1:18 GMT+01:00 Fabian Hueske :
>
> > Thanks everybody for the comments.
> >
> > Actually, I think we do not have much choice when deciding whether to use
> > attributes or functions.
> > Consider the following join query:
> >
> > SELECT t1.amount, t2.rate
> > FROM
> >   table1 AS t1,
> >   table2 AS t2
> > WHERE
> >   t1.currency = t2.currency AND
> >   t2.rowtime = (
> >     SELECT MAX(t22.rowtime)
> >     FROM table2 AS t22
> >     WHERE t22.currency = t1.currency AND t22.rowtime <= t1.rowtime)
> >
> > The query joins two streaming tables. Table 1 is a streaming table with
> > amounts in a certain currency. Table 2 is a (slowly changing) streaming
> > table of currency exchange rates.
> > We want to join the amounts stream with the exchange rate of the
> > corresponding currency that is valid (i.e., last received value ->
> > MAX(rowtime)) at the rowtime of the amounts row.
> > In order to specify the query, we need to refer to the rowtime of the
> > different tables. Hence, we need a way to relate the rowtime expression
> (or
> > marker) to a table.
> > This is not possible with a parameterless scalar function.
> >
> > I'd like to comment on the concerns regarding the performance:
> > In fact, the columns could be completely virtual and only exist during
> > query parsing and validation.
> > During execution, we can directly access the rowtime metadata of a Flink
> > streaming record (which is present anyway) or look up the current
> > processing time from the machine clock. So the processing overhead would
> > actually be the same as with a marker function.
> >
> > Regarding the question on what should be allowed with a system attribute:
> > IMO, it could be used as any other attribute. We need it at least in
> GROUP
> > BY, ORDER BY, and WHERE to define windows and joins. We could also allow
> to
> > access it in SELECT if we want users to give access to rowtime and
> > processing time. So @Haohui, your query could be supported.
> > However, what would not be allowed is to modify the value of the rows,
> > e.g., by naming another column rowtime: "SELECT sometimestamp AS
> > rowtime" would not be allowed, because Flink does not support modifying the
> > event time of a row (for good reasons), and processing time should not be
> > modifiable anyway.
> >
> > @Timo:
> > I think the approach to only use the system columns during parsing and
> > validation and converting them to expressions afterwards makes a lot of
> > sense.
> > The question is how this approach could be nicely integrated with
> Calcite.
> >
> > Best, Fabian
> >
> >
> >
> > 2017-02-15 16:50 GMT+01:00 Radu Tudoran :
> >
> >> Hi,
> >>
> >> My initial thought would be that it makes more se

[jira] [Created] (FLINK-5860) Replace all the file creating from java.io.tmpdir with TemporaryFolder

2017-02-20 Thread shijinkui (JIRA)
shijinkui created FLINK-5860:


 Summary: Replace all the file creating from java.io.tmpdir with 
TemporaryFolder
 Key: FLINK-5860
 URL: https://issues.apache.org/jira/browse/FLINK-5860
 Project: Flink
  Issue Type: Test
  Components: Tests
Reporter: shijinkui


Searching the whole Flink project for `System.getProperty("java.io.tmpdir")`
yields the list of unit tests below. Replace all file creation based on
`java.io.tmpdir` with JUnit's TemporaryFolder.

Who can fix this problem thoroughly?
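
A rough sketch of the intended replacement pattern (assuming JUnit 4's TemporaryFolder rule; the class and test names below are made up for illustration):

```
import java.io.File;

import org.junit.Rule;
import org.junit.Test;
import org.junit.rules.TemporaryFolder;

public class TempDirMigrationExampleTest {

    // JUnit creates the folder before each test and deletes it afterwards,
    // so nothing is left behind in java.io.tmpdir
    @Rule
    public final TemporaryFolder tempFolder = new TemporaryFolder();

    @Test
    public void writesIntoManagedTempDir() throws Exception {
        // before: File tempDir = new File(System.getProperty("java.io.tmpdir"));
        File tempDir = tempFolder.newFolder();

        File dataFile = new File(tempDir, "test-data.bin");
        // ... create and use dataFile as before ...
    }
}
```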

```

$ grep -ri 'System.getProperty("java.io.tmpdir")' .
./flink-connectors/flink-connector-cassandra/src/test/java/org/apache/flink/streaming/connectors/cassandra/example/CassandraTupleWriteAheadSinkExample.java:
env.setStateBackend(new FsStateBackend("file:///" + 
System.getProperty("java.io.tmpdir") + "/flink/backend"));
./flink-connectors/flink-connector-kafka-0.10/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaTestEnvironmentImpl.java:
  File tempDir = new File(System.getProperty("java.io.tmpdir"));
./flink-connectors/flink-connector-kafka-0.8/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaTestEnvironmentImpl.java:
   File tempDir = new File(System.getProperty("java.io.tmpdir"));
./flink-connectors/flink-connector-kafka-0.9/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaTestEnvironmentImpl.java:
   File tempDir = new File(System.getProperty("java.io.tmpdir"));
./flink-contrib/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/RocksDBStateBackendConfigTest.java:
   return getMockEnvironment(new File[] { new 
File(System.getProperty("java.io.tmpdir")) });
./flink-core/src/main/java/org/apache/flink/configuration/ConfigConstants.java: 
public static final String DEFAULT_TASK_MANAGER_TMP_PATH = 
System.getProperty("java.io.tmpdir");
./flink-core/src/test/java/org/apache/flink/api/common/io/EnumerateNestedFilesTest.java:
final String tempPath = System.getProperty("java.io.tmpdir");
./flink-core/src/test/java/org/apache/flink/testutils/TestConfigUtils.java: 
final File tempDir = new File(System.getProperty("java.io.tmpdir"));
./flink-core/src/test/java/org/apache/flink/testutils/TestFileUtils.java:   
File tempDir = new File(System.getProperty("java.io.tmpdir"));
./flink-core/src/test/java/org/apache/flink/testutils/TestFileUtils.java:   
File tempDir = new File(System.getProperty("java.io.tmpdir"));
./flink-examples/flink-examples-batch/src/main/java/org/apache/flink/examples/java/clustering/util/KMeansDataGenerator.java:
final String outDir = params.get("output", 
System.getProperty("java.io.tmpdir"));
./flink-examples/flink-examples-batch/src/main/java/org/apache/flink/examples/java/ml/util/LinearRegressionDataGenerator.java:
  final String tmpDir = System.getProperty("java.io.tmpdir");
./flink-examples/flink-examples-batch/src/main/java/org/apache/flink/examples/java/relational/util/WebLogDataGenerator.java:
final String outPath = System.getProperty("java.io.tmpdir");
./flink-java8/src/test/java/org/apache/flink/runtime/util/JarFileCreatorLambdaTest.java:
File out = new File(System.getProperty("java.io.tmpdir"), 
"jarcreatortest.jar");
./flink-java8/src/test/java/org/apache/flink/runtime/util/JarFileCreatorLambdaTest.java:
File out = new File(System.getProperty("java.io.tmpdir"), 
"jarcreatortest.jar");
./flink-java8/src/test/java/org/apache/flink/runtime/util/JarFileCreatorLambdaTest.java:
File out = new File(System.getProperty("java.io.tmpdir"), 
"jarcreatortest.jar");
./flink-java8/src/test/java/org/apache/flink/runtime/util/JarFileCreatorLambdaTest.java:
File out = new File(System.getProperty("java.io.tmpdir"), 
"jarcreatortest.jar");
./flink-libraries/flink-python/src/main/java/org/apache/flink/python/api/PythonPlanBinder.java:
 public static final String FLINK_PYTHON_FILE_PATH = 
System.getProperty("java.io.tmpdir") + File.separator + "flink_plan";
./flink-libraries/flink-python/src/main/java/org/apache/flink/python/api/PythonPlanBinder.java:
 public static final String FLINK_TMP_DATA_DIR = 
System.getProperty("java.io.tmpdir") + File.separator + "flink_data";
./flink-libraries/flink-python/src/main/java/org/apache/flink/python/api/PythonPlanBinder.java:
 FLINK_HDFS_PATH = "file:" + 
System.getProperty("java.io.tmpdir") + File.separator + "flink";
./flink-runtime/src/main/java/org/apache/flink/runtime/blob/BlobUtils.java: 
baseDir = new File(System.getProperty("java.io.tmpdir"));
./flink-runtime/src/main/java/org/apache/flink/runtime/util/EnvironmentInformation.java:
return System.getProperty("java.io.tmpdir");
./flink-runtime/src/main/java/org/apache/flink/runtime/zookeeper/FlinkZooKeeperQuorumPeer.java:
  

[jira] [Created] (FLINK-5859) support partition pruning on Table API & SQL

2017-02-20 Thread godfrey he (JIRA)
godfrey he created FLINK-5859:
-

 Summary: support partition pruning on Table API & SQL
 Key: FLINK-5859
 URL: https://issues.apache.org/jira/browse/FLINK-5859
 Project: Flink
  Issue Type: New Feature
  Components: Table API & SQL
Reporter: godfrey he
Assignee: godfrey he


Many data sources are partitionable storage, e.g. HDFS or Druid, and many queries 
only need to read a small subset of the total data. We can use partition 
information to prune or skip over files that are irrelevant to the user's query. Both 
query optimization time and execution time can be reduced significantly, especially 
for large partitioned tables.
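
For illustration only, a generic sketch of the idea (this is not Flink API; the classes below are made up): given partition metadata and a simple equality predicate on the partition column, only the matching partitions need to be scanned.

{code}
import java.util.Arrays;
import java.util.List;
import java.util.stream.Collectors;

class PartitionPruningSketch {

  static class Partition {
    final String day;   // value of the partition column, e.g. "2017-02-20"
    final String path;  // location of the partition's files

    Partition(String day, String path) {
      this.day = day;
      this.path = path;
    }
  }

  // keep only the partitions that can satisfy the predicate "day = requiredDay"
  static List<Partition> prune(List<Partition> partitions, String requiredDay) {
    return partitions.stream()
        .filter(p -> p.day.equals(requiredDay))
        .collect(Collectors.toList());
  }

  public static void main(String[] args) {
    List<Partition> partitions = Arrays.asList(
        new Partition("2017-02-19", "hdfs:///data/day=2017-02-19"),
        new Partition("2017-02-20", "hdfs:///data/day=2017-02-20"));

    // a query with WHERE day = '2017-02-20' only needs to read one of the two partitions
    System.out.println(prune(partitions, "2017-02-20").size()); // prints 1
  }
}
{code}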






[jira] [Created] (FLINK-5858) Support multiple sinks in same execution DAG

2017-02-20 Thread godfrey he (JIRA)
godfrey he created FLINK-5858:
-

 Summary: Support multiple sinks in same execution DAG
 Key: FLINK-5858
 URL: https://issues.apache.org/jira/browse/FLINK-5858
 Project: Flink
  Issue Type: Improvement
  Components: Table API & SQL
Reporter: godfrey he


When the writeToSink method is called to write a Table (backed by a TableSource) to a 
TableSink, the Table is translated into a DataSet or DataStream program. If we call 
writeToSink more than once (writing to different sinks), the Table is also 
translated more than once, and the final execution graph is split into separate 
DAGs. For example:

{code:title=Example.scala|borderStyle=solid}
val env = ExecutionEnvironment.getExecutionEnvironment
val tEnv = TableEnvironment.getTableEnvironment(env)

val csvTableSource = new CsvTableSource(
  "/tmp/words",
  Array("first", "id", "score", "last"),
  Array(
BasicTypeInfo.STRING_TYPE_INFO,
BasicTypeInfo.INT_TYPE_INFO,
BasicTypeInfo.DOUBLE_TYPE_INFO,
BasicTypeInfo.STRING_TYPE_INFO
  ),
  fieldDelim = "#"
)

tEnv.registerTableSource("csv_source", csvTableSource)
val resultTable = tEnv.scan("csv_source")
  .groupBy('first)
  .select('first, 'score.sum)

resultTable.writeToSink(new CsvTableSink("/tmp/wordcount1"))
resultTable.writeToSink(new CsvTableSink("/tmp/wordcount2"))

println(tEnv.explain(resultTable))
{code}

result:

== Abstract Syntax Tree ==
LogicalProject(first=[$0], TMP_1=[$1])
  LogicalAggregate(group=[{0}], TMP_0=[SUM($1)])
LogicalProject(first=[$0], score=[$2])
  LogicalTableScan(table=[[csv_source]])

== Optimized Logical Plan ==
DataSetAggregate(groupBy=[first], select=[first, SUM(score) AS TMP_0])
  BatchTableSourceScan(table=[[csv_source]], fields=[first, score])

== Physical Execution Plan ==
{color:red}
Stage 6 : Data Source
{color}
content : collect elements with CollectionInputFormat
Partitioning : RANDOM_PARTITIONED

Stage 5 : Map
content : prepare select: (first, SUM(score) AS TMP_0)
ship_strategy : Forward
exchange_mode : PIPELINED
driver_strategy : Map
Partitioning : RANDOM_PARTITIONED

Stage 4 : GroupCombine
content : groupBy: (first), select: (first, SUM(score) 
AS TMP_0)
ship_strategy : Forward
exchange_mode : PIPELINED
driver_strategy : Sorted Combine
Partitioning : RANDOM_PARTITIONED

Stage 3 : GroupReduce
content : groupBy: (first), select: (first, 
SUM(score) AS TMP_0)
ship_strategy : Hash Partition on [0]
exchange_mode : PIPELINED
driver_strategy : Sorted Group Reduce
Partitioning : RANDOM_PARTITIONED

Stage 2 : Map
content : to: Row(f0: String, f1: 
Double)
ship_strategy : Forward
exchange_mode : PIPELINED
driver_strategy : Map
Partitioning : RANDOM_PARTITIONED

Stage 1 : Map
content : Map at 
emitDataSet(CsvTableSink.scala:67)
ship_strategy : Forward
exchange_mode : PIPELINED
driver_strategy : Map
Partitioning : 
RANDOM_PARTITIONED

Stage 0 : Data Sink
content : 
TextOutputFormat (/tmp/wordcount1) - UTF-8
ship_strategy : Forward
exchange_mode : 
PIPELINED
Partitioning : 
RANDOM_PARTITIONED

{color:red}
Stage 13 : Data Source
{color}
content : collect elements with CollectionInputFormat
Partitioning : RANDOM_PARTITIONED

Stage 12 : Map
content : prepare select: (first, SUM(score) AS TMP_0)
ship_strategy : Forward
exchange_mode : PIPELINED
driver_strategy : Map
Partitioning : RANDOM_PARTITIONED

Stage 11 : GroupCombine
content : groupBy: (first), select: (first, SUM(score) 
AS TMP_0)
ship_strategy : Forward
exchange_mode 

[jira] [Created] (FLINK-5857) Recycle idle containers in time for yarn mode

2017-02-20 Thread shuai.xu (JIRA)
shuai.xu created FLINK-5857:
---

 Summary: Recycle idle containers in time for yarn mode
 Key: FLINK-5857
 URL: https://issues.apache.org/jira/browse/FLINK-5857
 Project: Flink
  Issue Type: Bug
  Components: YARN
Reporter: shuai.xu
Assignee: shuai.xu


When we run a Flink batch job (e.g. a map-reduce style job), the container for a 
map task may stay idle for a long time after the map finishes. We need a strategy to 
recycle these idle containers in order to reduce resource usage.





[jira] [Created] (FLINK-5856) Need return redundant containers to yarn for yarn mode

2017-02-20 Thread shuai.xu (JIRA)
shuai.xu created FLINK-5856:
---

 Summary: Need return redundant containers to yarn for yarn mode
 Key: FLINK-5856
 URL: https://issues.apache.org/jira/browse/FLINK-5856
 Project: Flink
  Issue Type: Bug
  Components: YARN
Reporter: shuai.xu
Assignee: shuai.xu


In Flink-on-YARN mode, the RM requests containers from YARN according to the 
requirements of the JM. But the AMRMClientAsync used for YARN doesn't guarantee 
that the number of containers returned exactly equals the number requested. 
So the Flink RM needs to record the number of containers it requested and return 
the redundant ones to YARN.





Re: Contribute to Flink

2017-02-20 Thread Jin Mingjian
Many thanks, Timo! I am starting to feel at home in the Flink community.
cheers,
Jin

On Mon, Feb 20, 2017 at 10:00 PM, Timo Walther  wrote:

> Welcome to the Flink community, Jin!
>
> I gave you contributor permissions, so you can assign issues to yourself.
>
> Regards,
> Timo
>
>
> On 20/02/17 at 14:47, Jin Mingjian wrote:
>
> Hi, Flink dev community,
>>
>> I'd like to contribute to Flink. In particular, I am interested in various
>> kinds of optimization work in Flink.
>>
>> To become familiar with the contribution process, I have picked up some
>> starter issues as my first contribution(s), such as [FLINK-5692](
>> https://issues.apache.org/jira/browse/FLINK-5692), among others.
>>
>> I would appreciate it if someone could give me the permissions needed so
>> that I can assign that issue to myself. Then the journey can start :)
>>
>> best regards,
>> Jin
>>
>>
>


Re: [DISCUSS] Flink ML roadmap

2017-02-20 Thread Andrea Spina
Hi all,

Thanks Stavros for pushing forward the discussion which I feel really
relevant.

Since I'm only now starting to get actively involved and I don't yet have
much experience or visibility within the Flink community, I'll limit
myself to sharing an opinion as a Flink user.

I've been using Flink for almost a year across two different projects, and in
both cases I bumped into the question "how do I handle ML workloads and keep
Flink as the main engine?". So the first point that comes to mind is: why
should I need to adopt an extra system purely for ML purposes? How great would
it be to use the Flink engine as the ML feature provider and avoid paying
the effort of maintaining an additional engine? This also links to @Timur's
opinion: I believe that users would much prefer a unified architecture
in this case. And even if a user wants to use an external tool/library - perhaps
one providing additional language support (e.g. R) - that user should be
able to run it on top of Flink.

In my work with Flink I needed to implement some ML algorithms on both
Flink and Spark, and I often struggled with Flink's performance: namely, I
think (for the sake of the bigger picture) we should first focus the effort
on solving some well-known Flink limitations, as @theodore pinpointed. I'd
like to highlight [1] and [2], which I find relevant. If the community
decides to go ahead with FlinkML, I believe fixing the issues described
above would be a good starting point. That would also definitely push forward
some important integrations such as Apache SystemML.

Given all these points, I'm increasingly convinced that online machine
learning would be the real end goal and the more suitable objective, since
we're talking about a real-time streaming engine and - from a very high
point of view - I believe Flink fits this topic more naturally
than the batch case. We have a connector for Apache SAMOA, but IMHO it seems
to be at an early stage of development and not very active. If we want to build
something within Flink instead, we need to speed up the design of some
features (e.g. side inputs [3]).

I really hope we can define a new roadmap that finally pushes this topic
forward. I will do my best to help.

Sincerely, 
Andrea

[1] Add a FlinkTools.persist style method to the Data Set
https://issues.apache.org/jira/browse/FLINK-1730
[2] Only send data to each taskmanager once for broadcasts
https://cwiki.apache.org/confluence/display/FLINK/FLIP-5%3A+Only+send+data+to+each+taskmanager+once+for+broadcasts
[3] Side inputs - Evolving or static Filter/Enriching
https://docs.google.com/document/d/1hIgxi2Zchww_5fWUHLoYiXwSBXjv-M5eOv-MKQYN3m4/edit#
http://apache-flink-mailing-list-archive.1008284.n3.nabble.com/DISCUSS-Add-Side-Input-Broadcast-Set-For-Streaming-API-td11529.html





[jira] [Created] (FLINK-5855) Unprotected access to pendingFilesPerCheckpoint in BucketingSink

2017-02-20 Thread Ted Yu (JIRA)
Ted Yu created FLINK-5855:
-

 Summary: Unprotected access to pendingFilesPerCheckpoint in 
BucketingSink
 Key: FLINK-5855
 URL: https://issues.apache.org/jira/browse/FLINK-5855
 Project: Flink
  Issue Type: Bug
Reporter: Ted Yu
Priority: Minor


{code}
handlePendingFilesForPreviousCheckpoints(restoredState.pendingFilesPerCheckpoint);

synchronized (restoredState.pendingFilesPerCheckpoint) {
  restoredState.pendingFilesPerCheckpoint.clear();
{code}
Lock on pendingFilesPerCheckpoint should be obtained prior to the call to 
handlePendingFilesForPreviousCheckpoints().
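
A minimal sketch of the suggested fix, simply reordering the snippet above so the lock is held around both operations (this is not a patch against the actual BucketingSink code):

{code}
synchronized (restoredState.pendingFilesPerCheckpoint) {
  // hold the lock during the call as well, so the handler and the clear()
  // see a consistent view of pendingFilesPerCheckpoint
  handlePendingFilesForPreviousCheckpoints(restoredState.pendingFilesPerCheckpoint);
  restoredState.pendingFilesPerCheckpoint.clear();
}
{code}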





Re: Question about Async IO

2017-02-20 Thread Stephan Ewen
You can also issue multiple calls in one "invoke()" call (i.e., have multiple
Futures), then chain these futures and only produce a result
once all Futures are complete.
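
To make both points concrete, here is a rough sketch (not code from this thread), assuming the Flink 1.2 AsyncFunction/AsyncCollector interfaces discussed here (package names from memory, they may differ); the backend client is hypothetical:

import java.util.Collections;
import java.util.concurrent.CompletableFuture;

import org.apache.flink.streaming.api.functions.async.AsyncFunction;
import org.apache.flink.streaming.api.functions.async.collector.AsyncCollector;

public class ChainedLookupFunction implements AsyncFunction<String, String> {

    @Override
    public void asyncInvoke(String key, AsyncCollector<String> collector) {
        // two asynchronous calls issued from a single invocation
        CompletableFuture<String> first = queryBackend(key + "-a");
        CompletableFuture<String> second = queryBackend(key + "-b");

        // chain the futures and complete the collector exactly once,
        // only after both of them have finished
        first.thenCombine(second, (a, b) -> a + b)
            .whenComplete((result, error) -> {
                if (error != null) {
                    collector.collect(error);
                } else if (result.isEmpty()) {
                    // the "0 elements" case: complete with an empty collection
                    collector.collect(Collections.<String>emptyList());
                } else {
                    collector.collect(Collections.singletonList(result));
                }
            });
    }

    // stand-in for a real asynchronous client call
    private CompletableFuture<String> queryBackend(String key) {
        return CompletableFuture.supplyAsync(() -> "value-for-" + key);
    }
}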

On Mon, Feb 20, 2017 at 4:01 PM, Till Rohrmann  wrote:

> In order to output 0 elements you have to pass an empty collection to the
> `collect` method.
>
> You're right that our online documentation is lacking the fact that you're
> only supposed to call `collect` once. It's actually documented in the
> JavaDocs of this method. We should change this.
>
> You're also right that the name `AsyncCollector` is definitely not the best
> name for reflecting what it actually represents. I think the initial idea
> was to make it look similar to the existing functions which are given a
> `Collector`. Actually it is a promise/completable future and as such we
> should maybe name it "ResultPromise"/"ResultFuture". I've opened a JIRA
> for
> this [1].
>
> [1] https://issues.apache.org/jira/browse/FLINK-5851
>
> Cheers,
> Till
>
> On Mon, Feb 20, 2017 at 3:41 PM, Gyula Fóra  wrote:
>
> > Hi Till,
> >
> > Thanks, for the explanation!
> >
> > How do I express if I don't want to collect any elements in the async
> > collector? Like 0 output from a flatmap.
> >
> > Also it doesn't seem to be specified anywhere that the AsyncCollector is
> > "completed", it is just a collector. You should be able to collect
> multiple
> > things to it, but it actually won't work if you try to do that from more
> > than one Future.
> >
> > I wonder if it would make sense to change the API to make this more
> > specific otherwise we might keep a lot of unnecessary state or have
> > potential leaks depending on the usage.
> >
> > Just my thoughts, now I also understand the current rationale just I
> didn't
> > completely get it for the first pass.
> >
> > Gyula
> >
> > Till Rohrmann wrote (on Mon, 20 Feb 2017, at 15:35):
> >
> > > Hi Gyula,
> > >
> > > the assumption is that the AsyncCollector is either completed by the
> user
> > > or, if you have a timeout defined, that it will be completed with a
> > timeout
> > > exception. This means that if you have no timeout defined, then you
> have
> > to
> > > make sure that the collector is completed. Otherwise you will have
> > > lingering state which is never cleared. In that sense it follows the
> > > semantics of normal futures.
> > >
> > > What do you mean by creating more than one future? More than one future
> > > which completes the AsyncCollector? If that's the case, then the first
> > > future which completes will also complete the AsyncCollector and the
> > result
> > > of the other future should be ignored.
> > >
> > > Cheers,
> > > Till
> > > ​
> > >
> > > On Mon, Feb 20, 2017 at 2:53 PM, Gyula Fóra  wrote:
> > >
> > > > Hi,
> > > >
> > > > I was looking at the AsyncFunction interface and try to wrap my head
> > > around
> > > > the implementation and the assumptions and I have some questions,
> maybe
> > > > somebody could help me out :)
> > > >
> > > > What happens if the user does not collect any data or set a future to
> > do
> > > so
> > > > in the invoke method?
> > > > Also what happens if I create more than one Future?
> > > >
> > > > It seems that the "streamRecordBufferEntry"  logic assumes that there
> > > will
> > > > be a Future that eventually collects 1 thing or the user does this
> > > > directly.
> > > > Do I understand correctly? If not I am probably missing the part
> where
> > > the
> > > > buffer entry is removed immediately if no async request was made.
> > > >
> > > > Thank you!
> > > > Gyula
> > > >
> > >
> >
>


Re: [DISCUSS] Table API / SQL indicators for event and processing time

2017-02-20 Thread Fabian Hueske
Hi everybody,

When Timo wrote to the Calcite mailing list, Julian Hyde replied and gave
good advice and explained why a system attribute for event-time would be a
problem [1].
I thought about this and agree with Julian.

Here is a document to describe the problem, constraints in Flink and a
proposal how to handle processing time and event time in Table API and SQL:

->
https://docs.google.com/document/d/1MDGViWA_TCqpaVoWub7u_GY4PMFSbT8TuaNl-EpbTHQ

Please have a look, comment and ask questions.

Thank you,
Fabian

[1]
https://lists.apache.org/thread.html/6397caf0ca37f97f2cd27d96f7a12c6fa845d6fd0870214fdce18d96@%3Cdev.calcite.apache.org%3E

2017-02-16 1:18 GMT+01:00 Fabian Hueske :

> Thanks everybody for the comments.
>
> Actually, I think we do not have much choice when deciding whether to use
> attributes or functions.
> Consider the following join query:
>
> SELECT t1.amount, t2.rate
> FROM
>   table1 AS t1,
>   table2 AS t2
> WHERE
>   t1.currency = t2.currency AND
>   t2.rowtime = (
>     SELECT MAX(t22.rowtime)
>     FROM table2 AS t22
>     WHERE t22.currency = t1.currency AND t22.rowtime <= t1.rowtime)
>
> The query joins two streaming tables. Table 1 is a streaming table with
> amounts in a certain currency. Table 2 is a (slowly changing) streaming
> table of currency exchange rates.
> We want to join the amounts stream with the exchange rate of the
> corresponding currency that is valid (i.e., last received value ->
> MAX(rowtime)) at the rowtime of the amounts row.
> In order to specify the query, we need to refer to the rowtime of the
> different tables. Hence, we need a way to relate the rowtime expression (or
> marker) to a table.
> This is not possible with a parameterless scalar function.
>
> I'd like to comment on the concerns regarding the performance:
> In fact, the columns could be completely virtual and only exist during
> query parsing and validation.
> During execution, we can directly access the rowtime metadata of a Flink
> streaming record (which is present anyway) or look up the current
> processing time from the machine clock. So the processing overhead would
> actually be the same as with a marker function.
>
> Regarding the question on what should be allowed with a system attribute:
> IMO, it could be used as any other attribute. We need it at least in GROUP
> BY, ORDER BY, and WHERE to define windows and joins. We could also allow to
> access it in SELECT if we want users to give access to rowtime and
> processing time. So @Haohui, your query could be supported.
> However, what would not be allowed is to modify the value of the rows,
> e.g., by naming another column rowtime: "SELECT sometimestamp AS
> rowtime" would not be allowed, because Flink does not support modifying the
> event time of a row (for good reasons), and processing time should not be
> modifiable anyway.
>
> @Timo:
> I think the approach to only use the system columns during parsing and
> validation and converting them to expressions afterwards makes a lot of
> sense.
> The question is how this approach could be nicely integrated with Calcite.
>
> Best, Fabian
>
>
>
> 2017-02-15 16:50 GMT+01:00 Radu Tudoran :
>
>> Hi,
>>
>> My initial thought would be that it makes more sense to have procTime()
>> and rowTime() only as functions which in fact are to be used as markers.
>> Having the value (even from special system attributes) does not make sense
>> in some scenarios, such as the ones for creating windows, e.g.,
>> if you have SELECT Count(*) OVER (ORDER BY procTime()...):
>> if you get the value of procTime you cannot do anything, as you need the
>> marker to know how to construct the window logic.
>>
>> However, your final idea of having "implement some rule/logic that
>> translates the attributes to special RexNodes internally" I believe is
>> good and gives a solution to both problems. On the one hand, for those
>> scenarios where you need the value you can access the value, while for
>> others you can see the special type of the RexNode and use it as a marker.
>>
>> Regarding keeping this data in a table... I am not sure, as you say
>> we would need to augment the data with two fields whether needed or not... this
>> is not necessarily very efficient.
>>
>>
>> Dr. Radu Tudoran
>> Senior Research Engineer - Big Data Expert
>> IT R&D Division
>>
>>
>> HUAWEI TECHNOLOGIES Duesseldorf GmbH
>> European Research Center
>> Riesstrasse 25, 80992 München
>>
>> E-mail: radu.tudo...@huawei.com
>> Mobile: +49 15209084330
>> Telephone: +49 891588344173
>>
>> HUAWEI TECHNOLOGIES Duesseldorf GmbH
>> Hansaallee 205, 40549 Düsseldorf, Germany, www.huawei.com
>> Registered Office: Düsseldorf, Register Court Düsseldorf, HRB 56063,
>> Managing Director: Bo PENG, Wanzhou MENG, Lifang CHEN
>> Sitz der Gesellschaft: Düsseldorf, Amtsgericht Düsseldorf, HRB 56063,
>> Geschäftsführer: Bo PENG, Wanzhou MENG, Lifang CHEN
>> This e-mail and its attachments contain confidential information from
>> HUAWEI, which is intended onl

[jira] [Created] (FLINK-5854) Introduce some Flink-specific base Exception types

2017-02-20 Thread Stephan Ewen (JIRA)
Stephan Ewen created FLINK-5854:
---

 Summary: Introduce some Flink-specific base Exception types
 Key: FLINK-5854
 URL: https://issues.apache.org/jira/browse/FLINK-5854
 Project: Flink
  Issue Type: Improvement
  Components: Core
Reporter: Stephan Ewen
Assignee: Stephan Ewen
 Fix For: 1.3.0


Going through the code, there are a lot of places where exception handling 
could be done a bit nicer, for example:
  - Some methods do not declare exceptions at all in their signatures. They 
simply catch everything and wrap it in a {{RuntimeException}}.
  - Some places declare, overly generically, that they throw {{Exception}}, even 
though they could type the exceptions they throw much more specifically.

I suggest introducing two new basic exceptions that at least help document a 
bit better what goes wrong:

  - {{FlinkException}} as a base class for checked exceptions that indicate 
that something related to using Flink went wrong. Letting a method throw 
{{FlinkException}} rather than {{Exception}} already helps by not including all 
of Java's runtime exceptions, which indicate programming errors rather than 
situations that should be recovered from.

  - {{FlinkUncheckedException}} as a Flink-specific subclass of 
{{RuntimeException}}. That one can come in handy in places where no exceptions 
were declared, for example when reusing an interface that does not declare 
exceptions.
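
A rough sketch of what the two proposed types could look like, plus a hypothetical usage (the service class and message below are made up for illustration):

{code}
// sketch only, not the actual implementation
class FlinkException extends Exception {

  public FlinkException(String message) {
    super(message);
  }

  public FlinkException(String message, Throwable cause) {
    super(message, cause);
  }
}

class FlinkUncheckedException extends RuntimeException {

  public FlinkUncheckedException(String message, Throwable cause) {
    super(message, cause);
  }
}

// hypothetical usage: a narrower signature than "throws Exception"
class SomeFlinkService {

  void start() throws FlinkException {
    throw new FlinkException("could not start the service");
  }
}
{code}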






Re: [DISCUSS] Flink ML roadmap

2017-02-20 Thread Stavros Kontopoulos
I think Flink ML could be a success. Many use cases out there could benefit
from such algorithms especially online ones.
I agree examples should be created showing how it could be used.

I was not aware of the project re-structuring issues. GPU support is really
important nowadays, but it is still not the major reason for not adopting
Flink ML. Flink ML has to be developed further and promoted, as previously
stated.

In the meantime, as for the reviewing part, I am investing time there, so I
would like to see if we can join forces and push things forward.

I am aware of the evaluation framework PR and will hopefully review it this
week. But can we commit to pushing anything, given the load people have?

As another option, could we propose someone to be a committer there as
well - someone Till could guide if needed?

I think we don't need to wait for all issues to be solved first. As for the
big picture, re-use makes sense, but I think the end result should be
something that benefits Flink. I would like to stay within Flink as much as
possible from a UX/features point of view. Of course, people have already
been using a number of libraries for years, and what we do by implementing
the algorithms is getting them to work on large datasets and on streams,
while keeping the UX familiar at the same time.

I think connecting to external libraries should be done, where possible, for
things outside your own domain, like databases or distributed file systems.
Is ML such a domain for a streaming engine? Use cases drive that, IMHO.
Again, any implementation should be justified by user needs; if there is no
such need, there is no reason to implement anything.

Just some thoughts...


On Mon, Feb 20, 2017 at 3:39 PM, Timur Shenkao  wrote:

> Hello guys,
>
> My couple of cents.
> All Flink presentations, articles, etc. articulate that Flink is for ETL,
> data ingestion. CEP is a maximum.
> If you visit http://flink.apache.org/usecases.html, you'll see there aren't
> any explicit ML or graph use cases there.
> It's also stated that Flink is suitable when "Data that is processed
> quickly".
> That's why people believe that Flink isn't for ML or don't even know that
> Flink has such algorithms.
> Then, folks decide: "I'd rather use good old Spark or scikit-learn than
> dive into Flink's internals and implement the algorithm myself."
>
> Sincerely yours, Timur
>
> On Mon, Feb 20, 2017 at 1:53 PM, Katherin Eri 
> wrote:
>
> > Hello guys,
> >
> >
> > May be we will be able to focus our forces on some E2E scenario or show
> > case for Flink as also ML supporting engine, and in such a way actualize
> > the roadmap?
> >
> >
> > This means: we can take some real life/production problem, like Fraud
> > detection in some area, and try to solve this problem from the point of
> > view of DataScience.
> >
> > Starting from data preprocessing and preparation, finishing
> > implementation/usage of some ML algorithm.
> >
> > Doing this we will understand which issues are showstopper for
> > implementation of such functionality. We will be able to understand
> Flink’s
> > users better.
> >
> >
> > May be community could share its ideas which show case could be the most
> > useful for Apache Flink, or may be Data artisans could lead this?
> >
> > On Mon, 20 Feb 2017 at 15:28, Theodore Vasiloudis <theodoros.vasilou...@gmail.com> wrote:
> >
> > > Hello all,
> > >
> > > thank you for opening this discussion Stavros, note that it's almost
> > > exactly 1 year since I last opened such a topic (linked by Gabor) and
> the
> > > comments there are still relevant.
> > >
> > > I think Gabor described the current state quite well, development in
> the
> > > libraries is hard without committers dedicated to each project, and as
> a
> > > result FlinkML and CEP have stalled.
> > >
> > > I think it's important to look at why development has stalled as well.
> As
> > > people have mentioned there's a multitude of ML libraries out there and
> > my
> > > impression was that not many people are looking to use Flink for ML.
> > Lately
> > > that seems to have changed (with some interest shown in the Flink
> survey
> > as
> > > well).
> > >
> > > Gabor makes some good points about future directions for the library.
> Our
> > > initial goal [1] was to make a truly scalable, easy to use library,
> > within
> > > the Flink ecosystem, providing a set of "workhorse" algorithms, sampled
> > > from what's actually being used in the industry. We planned for a
> library
> > > that has few algorithms, but does them properly.
> > >
> > > If we decide to go the way of focusing within Flink we face some major
> > > challenges, because these are system limitations that do not
> necessarily
> > > align with the goals of the community. Some issues relevant to ML on
> > Flink
> > > are:
> > >
> > >- FLINK-2396 - Review the datasets of dynamic path and static path
> in
> > >iteration.
> > >https://issues.apache.org/jira/browse/FLINK-2396
> > >This has to do with the ability to iterate over one dataset (model)
> > while
> > >changing another

[jira] [Created] (FLINK-5853) Add consecutive event based filters

2017-02-20 Thread Dawid Wysakowicz (JIRA)
Dawid Wysakowicz created FLINK-5853:
---

 Summary: Add consecutive event based filters
 Key: FLINK-5853
 URL: https://issues.apache.org/jira/browse/FLINK-5853
 Project: Flink
  Issue Type: New Feature
  Components: CEP
Reporter: Dawid Wysakowicz
Priority: Minor


Support Patterns like:

{code}
Pattern warningPattern = 
Pattern.begin("First Event")
.subtype(TemperatureEvent.class)
.next("Second Event")
.subtype(TemperatureEvent.class)
.where((firstEv, secondEv) -> firstEv.getTemperature() >= 
secondEv.getTemperature() + TEMPERATURE_THRESHOLD)
.within(Time.seconds(10));
{code}





[jira] [Created] (FLINK-5852) Move JSON generation code into static methods

2017-02-20 Thread Chesnay Schepler (JIRA)
Chesnay Schepler created FLINK-5852:
---

 Summary: Move JSON generation code into static methods
 Key: FLINK-5852
 URL: https://issues.apache.org/jira/browse/FLINK-5852
 Project: Flink
  Issue Type: Improvement
  Components: Webfrontend
Reporter: Chesnay Schepler
Assignee: Chesnay Schepler
 Fix For: 1.3.0


In order to implement the HistoryServer we need a way to generate the JSON 
responses independently of the REST API. As such, I suggest moving the main parts 
of the generation code for job-specific handlers into static methods. 
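
A generic sketch of the proposed refactoring (the handler and method names below are made up, not the actual web runtime classes): the JSON-producing part becomes a static method that both the REST handler and a future HistoryServer can call.

{code}
// made-up names, for illustration only
class JobDetailsJsonUtils {

  // REST-independent generation logic
  static String createJobDetailsJson(String jobId, String jobName) {
    return "{\"jid\":\"" + jobId + "\",\"name\":\"" + jobName + "\"}";
  }
}

class JobDetailsHandler {

  // the handler now only delegates to the static method,
  // so the HistoryServer can reuse createJobDetailsJson directly
  String handleRequest(String jobId, String jobName) {
    return JobDetailsJsonUtils.createJobDetailsJson(jobId, jobName);
  }
}
{code}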





[jira] [Created] (FLINK-5851) Renaming AsyncCollector into ResultPromise/ResultFuture

2017-02-20 Thread Till Rohrmann (JIRA)
Till Rohrmann created FLINK-5851:


 Summary: Renaming AsyncCollector into ResultPromise/ResultFuture
 Key: FLINK-5851
 URL: https://issues.apache.org/jira/browse/FLINK-5851
 Project: Flink
  Issue Type: Improvement
  Components: DataStream API
Affects Versions: 1.2.0, 1.3.0
Reporter: Till Rohrmann
 Fix For: 1.3.0


Currently, the async I/O API gives an {{AsyncCollector}} to an 
{{AsyncFunction}} implementation. The name does not really reflect what the 
{{AsyncCollector}} does since it does not collect but is actually a one time 
completable future. Therefore, I propose to rename the {{AsyncCollector}} into 
{{ResultPromise}} or {{ResultFuture}}. This is API changing.





Re: Question about Async IO

2017-02-20 Thread Till Rohrmann
In order to output 0 elements you have to pass an empty collection to the
`collect` method.

You're right that our online documentation is lacking the fact that you're
only supposed to call `collect` once. It's actually documented in the
JavaDocs of this method. We should change this.

You're also right that the name `AsyncCollector` is definitely not the best
name for reflecting what it actually represents. I think the initial idea
was to make it look similar to the existing functions which are given a
`Collector`. Actually it is a promise/completable future and as such we
should maybe name it "ResultPromise"/"ResultFuture". I've opened a JIRA for
this [1].

[1] https://issues.apache.org/jira/browse/FLINK-5851

Cheers,
Till

On Mon, Feb 20, 2017 at 3:41 PM, Gyula Fóra  wrote:

> Hi Till,
>
> Thanks, for the explanation!
>
> How do I express if I don't want to collect any elements in the async
> collector? Like 0 output from a flatmap.
>
> Also it doesn't seem to be specified anywhere that the AsyncCollector is
> "completed", it is just a collector. You should be able to collect multiple
> things to it, but it actually won't work if you try to do that from more
> than one Future.
>
> I wonder if it would make sense to change the API to make this more
> specific otherwise we might keep a lot of unnecessary state or have
> potential leaks depending on the usage.
>
> Just my thoughts, now I also understand the current rationale just I didn't
> completely get it for the first pass.
>
> Gyula
>
> Till Rohrmann wrote (on Mon, 20 Feb 2017, at 15:35):
>
> > Hi Gyula,
> >
> > the assumption is that the AsyncCollector is either completed by the user
> > or, if you have a timeout defined, that it will be completed with a
> timeout
> > exception. This means that if you have no timeout defined, then you have
> to
> > make sure that the collector is completed. Otherwise you will have
> > lingering state which is never cleared. In that sense it follows the
> > semantics of normal futures.
> >
> > What do you mean by creating more than one future? More than one future
> > which completes the AsyncCollector? If that's the case, then the first
> > future which completes will also complete the AsyncCollector and the
> result
> > of the other future should be ignored.
> >
> > Cheers,
> > Till
> > ​
> >
> > On Mon, Feb 20, 2017 at 2:53 PM, Gyula Fóra  wrote:
> >
> > > Hi,
> > >
> > > I was looking at the AsyncFunction interface and try to wrap my head
> > around
> > > the implementation and the assumptions and I have some questions, maybe
> > > somebody could help me out :)
> > >
> > > What happens if the user does not collect any data or set a future to
> do
> > so
> > > in the invoke method?
> > > Also what happens if I create more than one Future?
> > >
> > > It seems that the "streamRecordBufferEntry"  logic assumes that there
> > will
> > > be a Future that eventually collects 1 thing or the user does this
> > > directly.
> > > Do I understand correctly? If not I am probably missing the part where
> > the
> > > buffer entry is removed immediately if no async request was made.
> > >
> > > Thank you!
> > > Gyula
> > >
> >
>


Re: Question about Async IO

2017-02-20 Thread Gyula Fóra
Hi Till,

Thanks for the explanation!

How do I express if I don't want to collect any elements in the async
collector? Like 0 output from a flatmap.

Also it doesn't seem to be specified anywhere that the AsyncCollector is
"completed", it is just a collector. You should be able to collect multiple
things to it, but it actually won't work if you try to do that from more
than one Future.

I wonder if it would make sense to change the API to make this more
specific otherwise we might keep a lot of unnecessary state or have
potential leaks depending on the usage.

Just my thoughts - now I also understand the current rationale, I just didn't
completely get it on the first pass.

Gyula

Till Rohrmann wrote (on Mon, 20 Feb 2017, at 15:35):

> Hi Gyula,
>
> the assumption is that the AsyncCollector is either completed by the user
> or, if you have a timeout defined, that it will be completed with a timeout
> exception. This means that if you have no timeout defined, then you have to
> make sure that the collector is completed. Otherwise you will have
> lingering state which is never cleared. In that sense it follows the
> semantics of normal futures.
>
> What do you mean by creating more than one future? More than one future
> which completes the AsyncCollector? If that's the case, then the first
> future which completes will also complete the AsyncCollector and the result
> of the other future should be ignored.
>
> Cheers,
> Till
> ​
>
> On Mon, Feb 20, 2017 at 2:53 PM, Gyula Fóra  wrote:
>
> > Hi,
> >
> > I was looking at the AsyncFunction interface and try to wrap my head
> around
> > the implementation and the assumptions and I have some questions, maybe
> > somebody could help me out :)
> >
> > What happens if the user does not collect any data or set a future to do
> so
> > in the invoke method?
> > Also what happens if I create more than one Future?
> >
> > It seems that the "streamRecordBufferEntry"  logic assumes that there
> will
> > be a Future that eventually collects 1 thing or the user does this
> > directly.
> > Do I understand correctly? If not I am probably missing the part where
> the
> > buffer entry is removed immediately if no async request was made.
> >
> > Thank you!
> > Gyula
> >
>


Re: KeyGroupRangeAssignment ?

2017-02-20 Thread Ovidiu-Cristian MARCU
Hi,

Thank you for clarifications (I am working with KeyedStream so a custom 
partitioner does not help).

So I should set maxParallelism >= parallelism and change my keys (from 
input.keyBy(0)) such that the key group assignment works as expected, 
but I can't modify these keys to make it work.

The other option is to change Flink's internals in order to distribute 
keys evenly (changing computeKeyGroupForKeyHash: is this enough?).
What I was looking for was an API to change the way key group assignment is 
done, but without changing Flink's runtime.

I think that the maxParallelism setting is not enough (it introduces this 
inefficient way of distributing data for processing when using KeyedStream).
Is it possible to somehow expose the key group assignment?

This is how the keys are distributed (1024 keys, key = 1..1024; groups from 2 to 
16, i.e., parallelism equal to the number of slots):

{0=517, 1=507} 2
{0=881, 1=809, 2=358} 3
{0=1139, 1=1048, 2=617, 3=268} 4
{0=1319, 1=1268, 2=829, 3=473, 4=207} 5
{0=1512, 1=1425, 2=1008, 3=644, 4=352, 5=179} 6
{0=1656, 1=1586, 2=1160, 3=781, 4=512, 5=310, 6=139} 7
{0=1781, 1=1718, 2=1280, 3=908, 4=645, 5=417, 6=278, 7=141} 8
{0=1901, 1=1828, 2=1395, 3=1031, 4=738, 5=529, 6=399, 7=240, 8=131} 9
{0=1996, 1=1934, 2=1493, 3=1134, 4=846, 5=614, 6=513, 7=354, 8=233, 9=99} 10
{0=2094, 1=2017, 2=1577, 3=1226, 4=935, 5=713, 6=610, 7=434, 8=359, 9=174, 
10=101} 11
{0=2192, 1=2091, 2=1669, 3=1316, 4=1008, 5=797, 6=705, 7=517, 8=446, 9=255, 
10=173, 11=95} 12
{0=2257, 1=2175, 2=1741, 3=1396, 4=1079, 5=882, 6=785, 7=596, 8=524, 9=340, 
10=254, 11=186, 12=73} 13
{0=2330, 1=2258, 2=1816, 3=1458, 4=1160, 5=951, 6=858, 7=667, 8=602, 9=417, 
10=329, 11=265, 12=135, 13=66} 14
{0=2397, 1=2323, 2=1897, 3=1542, 4=1233, 5=1008, 6=934, 7=723, 8=671, 9=479, 
10=385, 11=344, 12=210, 13=118, 14=72} 15
{0=2454, 1=2395, 2=1949, 3=1603, 4=1296, 5=1055, 6=998, 7=803, 8=739, 9=539, 
10=453, 11=410, 12=280, 13=178, 14=147, 15=61} 16

Best,
Ovidiu
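
For anyone who wants to check such a distribution offline, here is a small sketch (it assumes the existing runtime class org.apache.flink.runtime.state.KeyGroupRangeAssignment and treats the keys as plain Integers, which only approximates the Tuple keys produced by keyBy(0)):

import java.util.Map;
import java.util.TreeMap;

import org.apache.flink.runtime.state.KeyGroupRangeAssignment;

public class KeyGroupDistribution {

    public static void main(String[] args) {
        for (int parallelism = 2; parallelism <= 16; parallelism++) {
            // maxParallelism == parallelism, as suggested above
            int maxParallelism = parallelism;
            Map<Integer, Integer> keysPerOperator = new TreeMap<>();

            for (int key = 1; key <= 1024; key++) {
                int keyGroup = KeyGroupRangeAssignment.assignToKeyGroup(key, maxParallelism);
                int operatorIndex = KeyGroupRangeAssignment.computeOperatorIndexForKeyGroup(
                        maxParallelism, parallelism, keyGroup);
                keysPerOperator.merge(operatorIndex, 1, Integer::sum);
            }

            System.out.println(keysPerOperator + " " + parallelism);
        }
    }
}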

> On 20 Feb 2017, at 12:04, Till Rohrmann  wrote:
> 
> Hi Ovidiu,
> 
> the way Flink works is to assign key group ranges to operators. For each 
> element you calculate a hash value and based on that you assign it to a key 
> group. Thus, in your example, you have either a key group with more than 1 
> key or multiple key groups with 1 or more keys assigned to an operator.
> 
> So what you could try to do is to reduce the number of key groups to your 
> parallelism via env.setMaxParallelism() and then try to figure a key out 
> whose hashes are uniformly distributed over the key groups. The key group 
> assignment is calculated via murmurHash(key.hashCode()) % maxParallelism.
> 
> Alternatively if you don’t need a keyed stream, you could try to use a custom 
> partitioner via DataStream.partitionCustom.
> 
> Cheers,
> Till
> 
> 
> On Mon, Feb 20, 2017 at 11:46 AM, Ovidiu-Cristian MARCU <ovidiu-cristian.ma...@inria.fr> wrote:
> Hi,
> 
> Can you please comment on how can I ensure stream input records are 
> distributed evenly onto task slots?
> See attached screen Records received issue.
> 
> I have a simple application which is applying some window function over a 
> stream partitioned as follows:
> (parallelism is equal to the number of keys; records with the same key are 
> streamed evenly)
> 
> // get the execution environment
> final StreamExecutionEnvironment env = 
> StreamExecutionEnvironment.getExecutionEnvironment();
> // get input data by connecting to the socket
> DataStream<String> text = env.socketTextStream("localhost", port, "\n");
> DataStream Long>> input = text.flatMap(...);
> DataStream counts1 = null;
> counts1 = input.keyBy(0).countWindow(windowSize, slideSize)
>   .apply(new WindowFunction Integer, String, Double, Long, Long>, Double, Tuple, GlobalWindow>() {
>   ...
>   });
> counts1.writeAsText(params.get("output1"));
> env.execute("Socket Window WordCount");
> 
> Best,
> Ovidiu
> 
> 



Re: Question about Async IO

2017-02-20 Thread Till Rohrmann
Hi Gyula,

the assumption is that the AsyncCollector is either completed by the user
or, if you have a timeout defined, that it will be completed with a timeout
exception. This means that if you have no timeout defined, then you have to
make sure that the collector is completed. Otherwise you will have
lingering state which is never cleared. In that sense it follows the
semantics of normal futures.

What do you mean by creating more than one future? More than one future
which completes the AsyncCollector? If that's the case, then the first
future which completes will also complete the AsyncCollector and the result
of the other future should be ignored.

Cheers,
Till
​

On Mon, Feb 20, 2017 at 2:53 PM, Gyula Fóra  wrote:

> Hi,
>
> I was looking at the AsyncFunction interface and try to wrap my head around
> the implementation and the assumptions and I have some questions, maybe
> somebody could help me out :)
>
> What happens if the user does not collect any data or set a future to do so
> in the invoke method?
> Also what happens if I create more than one Future?
>
> It seems that the "streamRecordBufferEntry"  logic assumes that there will
> be a Future that eventually collects 1 thing or the user does this
> directly.
> Do I understand correctly? If not I am probably missing the part where the
> buffer entry is removed immediately if no async request was made.
>
> Thank you!
> Gyula
>


Re: Contribute to Flink

2017-02-20 Thread Timo Walther

Welcome to the Flink community, Jin!

I gave you contributor permissions, so you can assign issues to yourself.

Regards,
Timo


On 20/02/17 at 14:47, Jin Mingjian wrote:

Hi, Flink dev community,

I'd like to contribute to Flink. In particular, I am interested in various
kinds of optimization work in Flink.

To become familiar with the contribution process, I have picked up some
starter issues as my first contribution(s), such as [FLINK-5692](
https://issues.apache.org/jira/browse/FLINK-5692), among others.

I would appreciate it if someone could give me the permissions needed so
that I can assign that issue to myself. Then the journey can start :)

best regards,
Jin





Question about Async IO

2017-02-20 Thread Gyula Fóra
Hi,

I was looking at the AsyncFunction interface and trying to wrap my head around
the implementation and the assumptions, and I have some questions; maybe
somebody could help me out :)

What happens if the user does not collect any data or set a future to do so
in the invoke method?
Also what happens if I create more than one Future?

It seems that the "streamRecordBufferEntry"  logic assumes that there will
be a Future that eventually collects 1 thing or the user does this
directly.
Do I understand correctly? If not I am probably missing the part where the
buffer entry is removed immediately if no async request was made.

Thank you!
Gyula


Contribute to Flink

2017-02-20 Thread Jin Mingjian
Hi, Flink dev community,

I'd like to contribute to Flink. In particular, I am interested in various
kinds of optimization work in Flink.

To become familiar with the contribution process, I have picked up some
starter issues as my first contribution(s), such as [FLINK-5692](
https://issues.apache.org/jira/browse/FLINK-5692), among others.

I would appreciate it if someone could give me the permissions needed so
that I can assign that issue to myself. Then the journey can start :)

best regards,
Jin


Re: [DISCUSS] Flink ML roadmap

2017-02-20 Thread Timur Shenkao
Hello guys,

My couple of cents.
All Flink presentations, articles, etc. articulate that Flink is for ETL,
data ingestion. CEP is a maximum.
If you visit http://flink.apache.org/usecases.html, you'll see there aren't any
explicit ML or graph use cases there.
It's also stated that Flink is suitable when "Data that is processed
quickly".
That's why people believe that Flink isn't for ML or don't even know that
Flink has such algorithms.
Then, folks decide: "I'd rather use good old Spark or scikit-learn than
dive into Flink's internals and implement the algorithm myself."

Sincerely yours, Timur

On Mon, Feb 20, 2017 at 1:53 PM, Katherin Eri 
wrote:

> Hello guys,
>
>
> May be we will be able to focus our forces on some E2E scenario or show
> case for Flink as also ML supporting engine, and in such a way actualize
> the roadmap?
>
>
> This means: we can take some real life/production problem, like Fraud
> detection in some area, and try to solve this problem from the point of
> view of DataScience.
>
> Starting from data preprocessing and preparation, finishing
> implementation/usage of some ML algorithm.
>
> Doing this we will understand which issues are showstopper for
> implementation of such functionality. We will be able to understand Flink’s
> users better.
>
>
> May be community could share its ideas which show case could be the most
> useful for Apache Flink, or may be Data artisans could lead this?
>
> On Mon, 20 Feb 2017 at 15:28, Theodore Vasiloudis <theodoros.vasilou...@gmail.com> wrote:
>
> > Hello all,
> >
> > thank you for opening this discussion Stavros, note that it's almost
> > exactly 1 year since I last opened such a topic (linked by Gabor) and the
> > comments there are still relevant.
> >
> > I think Gabor described the current state quite well, development in the
> > libraries is hard without committers dedicated to each project, and as a
> > result FlinkML and CEP have stalled.
> >
> > I think it's important to look at why development has stalled as well. As
> > people have mentioned there's a multitude of ML libraries out there and
> my
> > impression was that not many people are looking to use Flink for ML.
> Lately
> > that seems to have changed (with some interest shown in the Flink survey
> as
> > well).
> >
> > Gabor makes some good points about future directions for the library. Our
> > initial goal [1] was to make a truly scalable, easy to use library,
> within
> > the Flink ecosystem, providing a set of "workhorse" algorithms, sampled
> > from what's actually being used in the industry. We planned for a library
> > that has few algorithms, but does them properly.
> >
> > If we decide to go the way of focusing within Flink we face some major
> > challenges, because these are system limitations that do not necessarily
> > align with the goals of the community. Some issues relevant to ML on
> Flink
> > are:
> >
> >- FLINK-2396 - Review the datasets of dynamic path and static path in
> >iteration.
> >https://issues.apache.org/jira/browse/FLINK-2396
> >This has to do with the ability to iterate over one dataset (model)
> while
> >changing another (dataset), which is necessary for many ML algorithms
> > like
> >SGD.
> >- FLINK-1730 - Add a FlinkTools.persist style method to the Data Set.
> >https://issues.apache.org/jira/browse/FLINK-1730
> >This is again relevant to many algorithms, to create intermediate
> >results etc, for example L-BFGS development has been attempted 2-3
> > times,
> >but always abandoned because of the need to collect a DataSet kills
> the
> >performance.
> >- FLINK-5782 - Support GPU calculations
> >https://issues.apache.org/jira/browse/FLINK-5782
> >Many algorithms will benefit greatly by GPU-accelerated linear
> algebra,
> >to the point where if a library doesn't support it puts it at a severe
> >disadvantage compared to other offerings.
> >
> >
> > These issues aside, Stephan has mentioned recently the possibility of
> > re-structuring the Flink project to allow for more flexibility for the
> > libraries. I think that sounds quite promising and it should allow the
> > development to pick up in the libraries, if we can get some more people
> > reviewing and merging PRs.
> >
> > I would be all for updating our vision and roadmap to match what the
> > community desires from the library.
> >
> > [1]
> >
> > https://cwiki.apache.org/confluence/display/FLINK/
> FlinkML%3A+Vision+and+Roadmap
> >
> > On Mon, Feb 20, 2017 at 12:47 PM, Gábor Hermann 
> > wrote:
> >
> > > Hi Stavros,
> > >
> > > Thanks for bringing this up.
> > >
> > > There have been past [1] and recent [2, 3] discussions about the Flink
> > > libraries, because there are some stalling PRs and overloaded
> committers.
> > > (Actually, Till is the only committer shepherd of the both the CEP and
> ML
> > > library, and AFAIK he has a ton of other responsibilities and work to
> > do.)
> > > Thus it's hard to get code reviewed and merged, and without merged 

Re: [DISCUSS] Flink ML roadmap

2017-02-20 Thread Katherin Eri
Hello guys,


Maybe we can focus our forces on some E2E scenario or showcase
for Flink as an ML-supporting engine, and in this way actualize
the roadmap?


This means we can take some real-life/production problem, like fraud
detection in some area, and try to solve it from a data science
point of view.

Starting from data preprocessing and preparation, and finishing with the
implementation/usage of some ML algorithm.

Doing this, we will understand which issues are showstoppers for
implementing such functionality. We will also be able to understand Flink's
users better.


Maybe the community could share ideas on which showcase would be the most
useful for Apache Flink, or maybe data Artisans could lead this?

On Mon, 20 Feb 2017 at 15:28, Theodore Vasiloudis <
theodoros.vasilou...@gmail.com> wrote:

> Hello all,
>
> thank you for opening this discussion Stavros, note that it's almost
> exactly 1 year since I last opened such a topic (linked by Gabor) and the
> comments there are still relevant.
>
> I think Gabor described the current state quite well, development in the
> libraries is hard without committers dedicated to each project, and as a
> result FlinkML and CEP have stalled.
>
> I think it's important to look at why development has stalled as well. As
> people have mentioned there's a multitude of ML libraries out there and my
> impression was that not many people are looking to use Flink for ML. Lately
> that seems to have changed (with some interest shown in the Flink survey as
> well).
>
> Gabor makes some good points about future directions for the library. Our
> initial goal [1] was to make a truly scalable, easy to use library, within
> the Flink ecosystem, providing a set of "workhorse" algorithms, sampled
> from what's actually being used in the industry. We planned for a library
> that has few algorithms, but does them properly.
>
> If we decide to go the way of focusing within Flink we face some major
> challenges, because these are system limitations that do not necessarily
> align with the goals of the community. Some issues relevant to ML on Flink
> are:
>
>- FLINK-2396 - Review the datasets of dynamic path and static path in
>iteration.
>https://issues.apache.org/jira/browse/FLINK-2396
>    This has to do with the ability to iterate over one dataset (model) while
>    changing another (dataset), which is necessary for many ML algorithms
>    like SGD.
>- FLINK-1730 - Add a FlinkTools.persist style method to the Data Set.
>https://issues.apache.org/jira/browse/FLINK-1730
>    This is again relevant to many algorithms, e.g. for creating intermediate
>    results; for example, L-BFGS development has been attempted 2-3 times,
>    but was always abandoned because the need to collect a DataSet kills the
>    performance.
>- FLINK-5782 - Support GPU calculations
>https://issues.apache.org/jira/browse/FLINK-5782
>    Many algorithms will benefit greatly from GPU-accelerated linear algebra,
>    to the point where a library that doesn't support it is at a severe
>    disadvantage compared to other offerings.
>
>
> These issues aside, Stephan has mentioned recently the possibility of
> re-structuring the Flink project to allow for more flexibility for the
> libraries. I think that sounds quite promising and it should allow the
> development to pick up in the libraries, if we can get some more people
> reviewing and merging PRs.
>
> I would be all for updating our vision and roadmap to match what the
> community desires from the library.
>
> [1]
>
> https://cwiki.apache.org/confluence/display/FLINK/FlinkML%3A+Vision+and+Roadmap
>
> On Mon, Feb 20, 2017 at 12:47 PM, Gábor Hermann 
> wrote:
>
> > Hi Stavros,
> >
> > Thanks for bringing this up.
> >
> > There have been past [1] and recent [2, 3] discussions about the Flink
> > libraries, because there are some stalling PRs and overloaded committers.
> > (Actually, Till is the only committer shepherd of both the CEP and ML
> > libraries, and AFAIK he has a ton of other responsibilities and work to
> > do.)
> > Thus it's hard to get code reviewed and merged, and without merged code
> > it's hard to get a committer status, so there are not many committers who
> > can review e.g. ML algorithm implementations, and the cycle goes on.
> Until
> > this is resolved somehow, we should help the committers by reviewing
> > each other's PRs.
> >
> > I think prioritizing features (b) is a good way to start. We could
> declare
> > most blocking features and concentrate on reviewing and merging them
> before
> > moving forward. E.g. the evaluation framework is quite important for an
> ML
> > library in my opinion, and has a PR stalling for long [4].
> >
> > Regarding c), there are style guides generally for contributing to Flink,
> > so we should follow those. Is there something more ML-specific you think
> > we could follow? We should definitely declare that we follow scikit-learn
> > and make sure contributions comply with that.

Re: [DISCUSS] Flink ML roadmap

2017-02-20 Thread Theodore Vasiloudis
Hello all,

thank you for opening this discussion Stavros, note that it's almost
exactly 1 year since I last opened such a topic (linked by Gabor) and the
comments there are still relevant.

I think Gabor described the current state quite well, development in the
libraries is hard without committers dedicated to each project, and as a
result FlinkML and CEP have stalled.

I think it's important to look at why development has stalled as well. As
people have mentioned there's a multitude of ML libraries out there and my
impression was that not many people are looking to use Flink for ML. Lately
that seems to have changed (with some interest shown in the Flink survey as
well).

Gabor makes some good points about future directions for the library. Our
initial goal [1] was to make a truly scalable, easy to use library, within
the Flink ecosystem, providing a set of "workhorse" algorithms, sampled
from what's actually being used in the industry. We planned for a library
that has few algorithms, but does them properly.

If we decide to go the way of focusing within Flink we face some major
challenges, because these are system limitations that do not necessarily
align with the goals of the community. Some issues relevant to ML on Flink
are:

   - FLINK-2396 - Review the datasets of dynamic path and static path in
   iteration.
   https://issues.apache.org/jira/browse/FLINK-2396
    This has to do with the ability to iterate over one dataset (model) while
   changing another (dataset), which is necessary for many ML algorithms like
   SGD.
   - FLINK-1730 - Add a FlinkTools.persist style method to the Data Set.
   https://issues.apache.org/jira/browse/FLINK-1730
    This is again relevant to many algorithms, e.g. for creating intermediate
    results; for example, L-BFGS development has been attempted 2-3 times,
    but was always abandoned because the need to collect a DataSet kills the
    performance.
   - FLINK-5782 - Support GPU calculations
   https://issues.apache.org/jira/browse/FLINK-5782
    Many algorithms will benefit greatly from GPU-accelerated linear algebra,
    to the point where a library that doesn't support it is at a severe
    disadvantage compared to other offerings.


These issues aside, Stephan has mentioned recently the possibility of
re-structuring the Flink project to allow for more flexibility for the
libraries. I think that sounds quite promising and it should allow the
development to pick up in the libraries, if we can get some more people
reviewing and merging PRs.

I would be all for updating our vision and roadmap to match what the
community desires from the library.

[1]
https://cwiki.apache.org/confluence/display/FLINK/FlinkML%3A+Vision+and+Roadmap

On Mon, Feb 20, 2017 at 12:47 PM, Gábor Hermann 
wrote:

> Hi Stavros,
>
> Thanks for bringing this up.
>
> There have been past [1] and recent [2, 3] discussions about the Flink
> libraries, because there are some stalling PRs and overloaded committers.
> (Actually, Till is the only committer shepherd of both the CEP and ML
> libraries, and AFAIK he has a ton of other responsibilities and work to do.)
> Thus it's hard to get code reviewed and merged, and without merged code
> it's hard to get a committer status, so there are not many committers who
> can review e.g. ML algorithm implementations, and the cycle goes on. Until
> this is resolved somehow, we should help the committers by reviewing
> each other's PRs.
>
> I think prioritizing features (b) is a good way to start. We could declare
> most blocking features and concentrate on reviewing and merging them before
> moving forward. E.g. the evaluation framework is quite important for an ML
> library in my opinion, and has a PR stalling for long [4].
>
> Regarding c), there are style guides generally for contributing to Flink,
> so we should follow those. Is there something more ML-specific you think we
> could follow? We should definitely declare that we follow scikit-learn and
> make sure contributions comply with that.
>
> In terms of features (a, d), I think we should first see the bigger
> picture. That is, it would be nice to discuss a clearer direction for Flink
> ML. I've seen a lot of interest in contributing to Flink ML lately. I
> believe we should rethink our goals, to put the contribution efforts into
> making a usable and useful library. Are we trying to implement as many
> useful algorithms as possible to create a scalable ML library? That would
> seem ambitious, and of course there are a lot of frameworks and libraries
> that already have something like this as a goal (e.g. Spark MLlib, Mahout).
> Should we rather create connectors to existing libraries? Then we cannot
> really do Flink specific optimizations. Should we go for online machine
> learning (as Flink is concentrating on streaming)? We already have a
> connector to SAMOA. We could go on with questions like this. Maybe I'm
> missing something, but I haven't seen such directions declared.
>
> Cheers,
> Gabor
>
> [1] http://apache-f

Re: [DISCUSS] Flink ML roadmap

2017-02-20 Thread Gábor Hermann

Hi Stavros,

Thanks for bringing this up.

There have been past [1] and recent [2, 3] discussions about the Flink 
libraries, because there are some stalling PRs and overloaded 
committers. (Actually, Till is the only committer shepherd of both 
the CEP and ML libraries, and AFAIK he has a ton of other responsibilities 
and work to do.) Thus it's hard to get code reviewed and merged, and 
without merged code it's hard to get a committer status, so there are 
not many committers who can review e.g. ML algorithm implementations, 
and the cycle goes on. Until this is resolved somehow, we should help 
the committers by reviewing each-others PRs.


I think prioritizing features (b) is a good way to start. We could 
declare most blocking features and concentrate on reviewing and merging 
them before moving forward. E.g. the evaluation framework is quite 
important for an ML library in my opinion, and has a PR stalling for 
long [4].


Regarding c), there are style guides generally for contributing to 
Flink, so we should follow those. Is there something more ML-specific you 
think we could follow? We should definitely declare that we follow 
scikit-learn and make sure contributions comply with that.
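
To make that concrete, here is a rough sketch of the scikit-learn-style 
fit/predict contract that contributions would be expected to follow (the 
interfaces and names are illustrative only, not FlinkML's actual Scala API):

```
// Illustrative only: a scikit-learn-style contract. Names are hypothetical,
// not FlinkML's actual interfaces.
import java.util.Arrays;
import java.util.List;

interface Estimator<SELF> {
    SELF fit(List<double[]> features, List<Double> labels);
}

interface Predictor {
    double predict(double[] features);
}

// A trivial "model" that predicts the mean of the training labels.
class MeanRegressor implements Estimator<MeanRegressor>, Predictor {
    private double mean;

    public MeanRegressor fit(List<double[]> features, List<Double> labels) {
        mean = labels.stream().mapToDouble(Double::doubleValue).average().orElse(0.0);
        return this; // returning the fitted estimator enables chaining
    }

    public double predict(double[] features) {
        return mean;
    }

    public static void main(String[] args) {
        double prediction = new MeanRegressor()
            .fit(Arrays.asList(new double[]{1.0}, new double[]{2.0}),
                 Arrays.asList(10.0, 20.0))
            .predict(new double[]{3.0});
        System.out.println(prediction); // prints 15.0
    }
}
```

The key property is that fit returns the fitted estimator, so estimators, 
transformers and predictors can be chained into pipelines.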


In terms of features (a, d), I think we should first see the bigger 
picture. That is, it would be nice to discuss a clearer direction for 
Flink ML. I've seen a lot of interest in contributing to Flink ML 
lately. I believe we should rethink our goals, to put the contribution 
efforts into making a usable and useful library. Are we trying to 
implement as many useful algorithms as possible to create a scalable ML 
library? That would seem ambitious, and of course there are a lot of 
frameworks and libraries that already have something like this as a goal 
(e.g. Spark MLlib, Mahout). Should we rather create connectors to 
existing libraries? Then we cannot really do Flink specific 
optimizations. Should we go for online machine learning (as Flink is 
concentrating on streaming)? We already have a connector to SAMOA. We 
could go on with questions like this. Maybe I'm missing something, but I 
haven't seen such directions declared.


Cheers,
Gabor

[1] 
http://apache-flink-mailing-list-archive.1008284.n3.nabble.com/Opening-a-discussion-on-FlinkML-td10265.html
[2] 
http://apache-flink-mailing-list-archive.1008284.n3.nabble.com/Flink-CEP-development-is-stalling-td15237.html#a15341
[3] 
http://apache-flink-mailing-list-archive.1008284.n3.nabble.com/New-Flink-team-member-Kate-Eri-td15349.html

[4] https://github.com/apache/flink/pull/1849

On 2017-02-20 11:43, Stavros Kontopoulos wrote:


(Resending with the appropriate topic)

Hi,

I would like to start a discussion about next steps for Flink ML.
Currently there is a lot of work going on, but it needs a push forward.

Some topics to discuss:

a) How several features should be planned and get aligned with Flink
releases.
b) Priorities of what should be done.
c) Basic guidelines for code: styleguides, scikit-learn compliance etc
d) Missing features important for the success of the library, next steps
etc...

Thoughts?

Best,
Stavros





Re: KeyGroupRangeAssignment ?

2017-02-20 Thread Till Rohrmann
Hi Ovidiu,

the way Flink works is to assign key group ranges to operators. For each
element you calculate a hash value and based on that you assign it to a key
group. Thus, in your example, you have either a key group with more than 1
key or multiple key groups with 1 or more keys assigned to an operator.

So what you could try to do is to reduce the number of key groups to your
parallelism via env.setMaxParallelism() and then try to figure out a key
whose hashes are uniformly distributed over the key groups. The key group
assignment is calculated via murmurHash(key.hashCode()) % maxParallelism.
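
For illustration, here is a minimal, self-contained sketch of that
assignment (the murmur-style mix below is only a stand-in for Flink's
internal hash, so the concrete numbers are illustrative rather than Flink's
actual output):

```
// Sketch of: key -> key group -> operator subtask, as described above.
public class KeyGroupSketch {

    // MurmurHash3 finalization mix of the key's hashCode() (stand-in for
    // Flink's internal murmur hash).
    static int murmurMix(int h) {
        h ^= h >>> 16;
        h *= 0x85ebca6b;
        h ^= h >>> 13;
        h *= 0xc2b2ae35;
        h ^= h >>> 16;
        return h;
    }

    // murmurHash(key.hashCode()) % maxParallelism
    static int keyGroupFor(Object key, int maxParallelism) {
        return (murmurMix(key.hashCode()) & 0x7fffffff) % maxParallelism;
    }

    // Key groups are assigned to subtasks as contiguous ranges.
    static int subtaskFor(int keyGroup, int maxParallelism, int parallelism) {
        return keyGroup * parallelism / maxParallelism;
    }

    public static void main(String[] args) {
        int maxParallelism = 4, parallelism = 4;
        for (long key = 0; key < 4; key++) {
            int kg = keyGroupFor(key, maxParallelism);
            System.out.println("key " + key + " -> key group " + kg
                + " -> subtask " + subtaskFor(kg, maxParallelism, parallelism));
        }
    }
}
```

With maxParallelism equal to the parallelism, every subtask owns exactly one
key group, so two distinct keys can still collide in the same group and end
up on the same subtask.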

Alternatively if you don’t need a keyed stream, you could try to use a
custom partitioner via DataStream.partitionCustom.

Cheers,
Till

On Mon, Feb 20, 2017 at 11:46 AM, Ovidiu-Cristian MARCU <
ovidiu-cristian.ma...@inria.fr> wrote:

> Hi,
>
> Can you please comment on how can I ensure stream input records are
> distributed evenly onto task slots?
> See attached screen Records received issue.
>
> I have a simple application which is applying some window function over a
> stream partitioned as follows:
> (parallelism is equal to the number of keys; records with the same key are
> streamed evenly)
>
> // get the execution environment
> final StreamExecutionEnvironment env = StreamExecutionEnvironment.
> getExecutionEnvironment();
> // get input data by connecting to the socket
> DataStream text = env.socketTextStream("localhost", port, "\n");
> DataStream Long>> input = text.flatMap(...);
> DataStream counts1 = null;
> counts1 =* input.keyBy(0*).countWindow(windowSize, slideSize)
> .apply(new WindowFunction Double, Long, Long>, Double, Tuple, GlobalWindow>() {
> ...
> });
> counts1.writeAsText(params.get("output1"));
> env.execute("Socket Window WordCount”);
>
> Best,
> Ovidiu
>


Re: [Discuss] Organizing Documentation for Configuration Options

2017-02-20 Thread Ufuk Celebi
Unfortunately, I haven't had time to work on this yet, and it looks
like I won't have time this week.

I've created the following issue to track the implementation with two sub tasks:
https://issues.apache.org/jira/browse/FLINK-5779

If someone else wants to take this up in the meantime, feel free to do
so. Otherwise, I will try to have a look next week. I can certainly
help with the initial setup if there are questions.
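
For reference, a minimal sketch of what the ConfigOption-based generation
discussed in the quoted thread below could look like (the ConfigOption class,
the example option and the generator are simplified stand-ins, not the actual
Flink classes):

```
// Simplified stand-ins to illustrate generating a docs table from options.
import java.lang.reflect.Field;
import java.lang.reflect.Modifier;

class ConfigOption<T> {
    final String key;
    final T defaultValue;
    final String description; // the description field from step 2) below

    ConfigOption(String key, T defaultValue, String description) {
        this.key = key;
        this.defaultValue = defaultValue;
        this.description = description;
    }
}

class ExampleOptions {
    public static final ConfigOption<Integer> MAX_RETRIES =
        new ConfigOption<>("docs.example.max-retries", 3,
            "Example option, for illustration only.");
}

public class DocsGenerator {
    public static void main(String[] args) throws Exception {
        System.out.println("| Key | Default | Description |");
        System.out.println("| --- | --- | --- |");
        for (Field f : ExampleOptions.class.getFields()) {
            if (Modifier.isStatic(f.getModifiers())
                    && ConfigOption.class.isAssignableFrom(f.getType())) {
                ConfigOption<?> o = (ConfigOption<?>) f.get(null);
                System.out.println("| " + o.key + " | " + o.defaultValue
                    + " | " + o.description + " |");
            }
        }
    }
}
```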


On Tue, Feb 7, 2017 at 3:14 PM, Greg Hogan  wrote:
> +1 and thanks for volunteering for an initial implementation.
>
> Looking forward to auto-scaling of network buffers.
>
> On Tue, Feb 7, 2017 at 3:04 AM, Ufuk Celebi  wrote:
>
>> I fully agree with you Greg.
>>
>> Since this is doomed to get out of sync again very shortly after clean up,
>> I vote to automate this. Stephan introduced the ConfigOption type, which
>> makes it easy to define the options. It's already planned to migrate all
>> configuration options from ConfigConstants to this approach.
>>
>> For an example see here: https://github.com/apache/flink/blob/master/
>> flink-core/src/main/java/org/apache/flink/configuration/
>> HighAvailabilityOptions.java
>>
>> I think that it is possible to build the configuration docs page from this
>> with reasonable effort.
>>
>> This would translate the task to:
>> 1) Automate ConfigOption to HTML/Markdown generation
>> 2) Extend ConfigOption with description fields
>> 3) Migrate ConfigConstants to ConfigOptions
>>
>> I would also volunteer to take a first stab at this.
>>
>> Regarding the network buffers: +1 to your suggestion. Nico (cc'd) is
>> starting to work on automating the network buffer configuration in order to
>> get rid of any manual tuning for most users (because of the issues you
>> described + streaming and batch jobs require different tuning, which
>> complicates things even more).
>>
>> – Ufuk
>>
>> On 6 February 2017 at 19:21:28, Greg Hogan (c...@greghogan.com) wrote:
>> > > Hi devs,
>> >
>> > Flink's Configuration page [1] has grown intimidatingly long
>> > and complex.
>> > Options are described across three main sections: common options
>> > (single
>> > section), advanced options (multiple sections), and full reference.
>> > The
>> > trailing "background" section further describes the most impactful
>> > options
>> > in much greater detail.
>> >
>> > Several recent tickets, and a few outstanding, have added missing
>> > options
>> > to the configuration documentation. I'd like to propose a goal
>> > of
>> > organizing all options in the full reference into alphabetized,
>> > tabular
>> > form (one table per section), much like the system metrics [2].
>> > Columns
>> > would be option name, description, and default value.
>> >
>> > The common and advanced sections could also be converted to tabular
>> > form
>> > with the exception of Kerberos-based Security. Missing options
>> > would be
>> > added to the full reference.
>> >
>> > Lastly, the simple heuristic for configuring network buffers
>> > has prompted
>> > many questions on the mailing list. With the 1.3 release the total
>> > and
>> > number of available buffers is reported through metrics and
>> > in the web
>> > dashboard. My experience has been that the number of required
>> > buffers is
>> > highly dependent on job topology and cluster performance. I
>> > propose keeping
>> > the simple heuristic and description while directing users
>> > to monitor the
>> > balance of available buffers.
>> >
>> > Greg
>> >
>> > [1] https://ci.apache.org/projects/flink/flink-docs-
>> master/setup/config.html
>> > [2]
>> > https://ci.apache.org/projects/flink/flink-docs-
>> master/monitoring/metrics.html#system-metrics
>> > [3]
>> > https://ci.apache.org/projects/flink/flink-docs-
>> master/setup/config.html#configuring-the-network-buffers
>>
>>


Re: Interested to Contribute to Flink

2017-02-20 Thread Till Rohrmann
Hi Karthik,

great to hear that :-) Best you first take a look at the how to contribute
guide [1] which will help you to get started.

[1] http://flink.apache.org/how-to-contribute.html

Cheers,
Till

On Fri, Feb 17, 2017 at 11:01 PM, Karthik Ramakrishnan <
karthik.ramakrishnan...@gmail.com> wrote:

> Hello Dev Team,
>
> I am a grad student at UT Dallas and I am using Flink in my internship as
> well as in my college project. I am really liking it and I wish to
> contribute to the project and help with some issues and tasks
>
> Thanks,
> Karthik
>


[DISCUSS] Flink ML roadmap

2017-02-20 Thread Stavros Kontopoulos
(Resending with the appropriate topic)

Hi,

I would like to start a discussion about next steps for Flink ML.
Currently there is a lot of work going on, but it needs a push forward.

Some topics to discuss:

a) How several features should be planned and get aligned with Flink
releases.
b) Priorities of what should be done.
c) Basic guidelines for code: styleguides, scikit-learn compliance etc
d) Missing features important for the success of the library, next steps
etc...

Thoughts?

Best,
Stavros


[jira] [Created] (FLINK-5850) implement OAuth 2.0 check in Web Backend API

2017-02-20 Thread Fabian Wollert (JIRA)
Fabian Wollert created FLINK-5850:
-

 Summary: implement OAuth 2.0 check in Web Backend API
 Key: FLINK-5850
 URL: https://issues.apache.org/jira/browse/FLINK-5850
 Project: Flink
  Issue Type: Improvement
  Components: Web Client
Affects Versions: 1.1.4, 1.2.0
Reporter: Fabian Wollert


Currently the web frontend is open to the public. It would be helpful for us to 
have the frontend and the backend secured by OAuth 2.0.



--
This message was sent by Atlassian JIRA
(v6.3.15#6346)


[jira] [Created] (FLINK-5849) Kafka Consumer checkpointed state may contain undefined offsets

2017-02-20 Thread Tzu-Li (Gordon) Tai (JIRA)
Tzu-Li (Gordon) Tai created FLINK-5849:
--

 Summary: Kafka Consumer checkpointed state may contain undefined 
offsets
 Key: FLINK-5849
 URL: https://issues.apache.org/jira/browse/FLINK-5849
 Project: Flink
  Issue Type: Bug
  Components: Kafka Connector
Reporter: Tzu-Li (Gordon) Tai
Assignee: Tzu-Li (Gordon) Tai
Priority: Critical


This is a regression due to FLINK-4280.

In FLINK-4280, all initial offset determination was refactored to be 
consolidated at the start of {{AbstractFetcher#runFetchLoop}}. However, this 
caused checkpoints that were triggered before the method was ever reached to 
contain undefined partition offsets.
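
To illustrate the race (a purely hypothetical sketch with made-up names, not 
the actual connector code): a checkpoint that fires before the fetch loop has 
determined real offsets snapshots the "not set" placeholder, which then fails 
the restore-time validation shown in the trace below.

```
// Hypothetical sketch, not the actual Kafka connector code.
import java.util.HashMap;
import java.util.Map;

public class UndefinedOffsetSketch {
    // placeholder offset before the fetch loop determines the real one
    static final long OFFSET_NOT_SET = Long.MIN_VALUE;

    public static void main(String[] args) {
        // Partition states start out with the placeholder offset...
        Map<Integer, Long> partitionOffsets = new HashMap<>();
        partitionOffsets.put(0, OFFSET_NOT_SET);

        // ...and a checkpoint taken before the fetch loop runs snapshots it as-is.
        Map<Integer, Long> checkpointed = new HashMap<>(partitionOffsets);

        // On restore, validation rejects the undefined offset.
        for (Map.Entry<Integer, Long> e : checkpointed.entrySet()) {
            if (e.getValue() == OFFSET_NOT_SET) {
                throw new IllegalArgumentException(
                    "Restoring from a checkpoint / savepoint, but partition "
                        + e.getKey() + " does not have a defined offset.");
            }
        }
    }
}
```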

Ref:
```
org.apache.flink.client.program.ProgramInvocationException: The program 
execution failed: Job execution failed.
at org.apache.flink.client.program.ClusterClient.run(ClusterClient.java:427)
at 
org.apache.flink.client.program.StandaloneClusterClient.submitJob(StandaloneClusterClient.java:101)
at org.apache.flink.client.program.ClusterClient.run(ClusterClient.java:400)
at org.apache.flink.client.program.ClusterClient.run(ClusterClient.java:392)
at 
org.apache.flink.streaming.api.environment.RemoteStreamEnvironment.executeRemotely(RemoteStreamEnvironment.java:209)
at 
org.apache.flink.streaming.api.environment.RemoteStreamEnvironment.execute(RemoteStreamEnvironment.java:173)
at org.apache.flink.test.util.TestUtils.tryExecute(TestUtils.java:32)
at 
org.apache.flink.streaming.connectors.kafka.KafkaConsumerTestBase.runMultipleSourcesOnePartitionExactlyOnceTest(KafkaConsumerTestBase.java:942)
at 
org.apache.flink.streaming.connectors.kafka.Kafka09ITCase.testMultipleSourcesOnePartition(Kafka09ITCase.java:76)
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at 
sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57)
at 
sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:606)
at 
org.junit.runners.model.FrameworkMethod$1.runReflectiveCall(FrameworkMethod.java:50)
at 
org.junit.internal.runners.model.ReflectiveCallable.run(ReflectiveCallable.java:12)
at 
org.junit.runners.model.FrameworkMethod.invokeExplosively(FrameworkMethod.java:47)
at 
org.junit.internal.runners.statements.InvokeMethod.evaluate(InvokeMethod.java:17)
at 
org.junit.internal.runners.statements.FailOnTimeout$CallableStatement.call(FailOnTimeout.java:298)
at 
org.junit.internal.runners.statements.FailOnTimeout$CallableStatement.call(FailOnTimeout.java:292)
at java.util.concurrent.FutureTask.run(FutureTask.java:262)
at java.lang.Thread.run(Thread.java:745)
Caused by: org.apache.flink.runtime.client.JobExecutionException: Job execution 
failed.
at 
org.apache.flink.runtime.jobmanager.JobManager$$anonfun$handleMessage$1$$anonfun$applyOrElse$6.apply$mcV$sp(JobManager.scala:915)
at 
org.apache.flink.runtime.jobmanager.JobManager$$anonfun$handleMessage$1$$anonfun$applyOrElse$6.apply(JobManager.scala:858)
at 
org.apache.flink.runtime.jobmanager.JobManager$$anonfun$handleMessage$1$$anonfun$applyOrElse$6.apply(JobManager.scala:858)
at 
scala.concurrent.impl.Future$PromiseCompletingRunnable.liftedTree1$1(Future.scala:24)
at 
scala.concurrent.impl.Future$PromiseCompletingRunnable.run(Future.scala:24)
at akka.dispatch.TaskInvocation.run(AbstractDispatcher.scala:40)
at 
akka.dispatch.ForkJoinExecutorConfigurator$AkkaForkJoinTask.exec(AbstractDispatcher.scala:397)
at scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
at 
scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
at scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
at 
scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)
Caused by: java.lang.IllegalArgumentException: Restoring from a checkpoint / 
savepoint, but found a partition state Partition: 
KafkaTopicPartition{topic='manyToOneTopic', partition=2}, 
KafkaPartitionHandle=manyToOneTopic-2, offset=(not set) that does not have a 
defined offset.
at 
org.apache.flink.streaming.connectors.kafka.internal.KafkaConsumerThread.(KafkaConsumerThread.java:133)
at 
org.apache.flink.streaming.connectors.kafka.internal.Kafka09Fetcher.(Kafka09Fetcher.java:113)
at 
org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer09.createFetcher(FlinkKafkaConsumer09.java:182)
at 
org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumerBase.run(FlinkKafkaConsumerBase.java:275)
at 
org.apache.flink.streaming.api.operators.StreamSource.run(StreamSource.java:78)
at 
org.apache.flink.streaming.api.operators.StreamSource.run(StreamSource.java:55)
at 
org.apache.flink.streaming.runtime.tasks.SourceStreamTask.run(SourceStre

[jira] [Created] (FLINK-5848) make Flink Web Backend a little bit more restful

2017-02-20 Thread Fabian Wollert (JIRA)
Fabian Wollert created FLINK-5848:
-

 Summary: make Flink Web Backend a little bit more restful
 Key: FLINK-5848
 URL: https://issues.apache.org/jira/browse/FLINK-5848
 Project: Flink
  Issue Type: Improvement
  Components: Webfrontend
Affects Versions: 1.1.4, 1.2.0
Reporter: Fabian Wollert


We are using the web backend for managing Flink jobs (cancelling, starting, 
etc.). Unfortunately the backend is not completely RESTful; the responses are 
mixed.

E.g. 
https://github.com/apache/flink/blob/master/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/ClusterOverviewHandler.java
 shows that if an error occurs in the backend, it does not result in an HTTP 
error code and the response is not JSON.
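
For illustration, a simplified sketch (hypothetical names, not the existing 
handler API) of what a more RESTful behaviour could look like: a meaningful 
status code plus a JSON error body instead of a plain-text message with 
status 200.

```
// Hypothetical sketch of a handler returning a status code and a JSON body.
public class RestfulErrorExample {

    static class Response {
        int status = 200;
        String body;
    }

    static Response handleClusterOverview(boolean simulateFailure) {
        Response r = new Response();
        try {
            if (simulateFailure) {
                throw new IllegalStateException("JobManager unavailable");
            }
            r.body = "{\"taskmanagers\": 4, \"jobs-running\": 2}";
        } catch (Exception e) {
            r.status = 503; // meaningful error code instead of 200
            r.body = "{\"error\": \"" + e.getMessage() + "\"}"; // JSON error body
        }
        return r;
    }

    public static void main(String[] args) {
        Response r = handleClusterOverview(true);
        System.out.println(r.status + " " + r.body);
    }
}
```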



--
This message was sent by Atlassian JIRA
(v6.3.15#6346)


[jira] [Created] (FLINK-5847) Not able to deploy Flink 1.3-SNAPSHOT on Openshift PaaS

2017-02-20 Thread xinyang gao (JIRA)
xinyang gao created FLINK-5847:
--

 Summary: Not able to deploy Flink 1.3-SNAPSHOT on Openshift PaaS
 Key: FLINK-5847
 URL: https://issues.apache.org/jira/browse/FLINK-5847
 Project: Flink
  Issue Type: Bug
  Components: JobManager, Local Runtime
Affects Versions: 1.3.0
 Environment: Openshift Paas
Reporter: xinyang gao
Priority: Blocker
 Fix For: 1.3.0


I am now trying to deploy Flink 1.3-SNAPSHOT on OpenShift, which is a 
Docker-based environment. I was able to successfully deploy the Flink 1.1.2 
version on OpenShift, although I encountered the exception 
"org.apache.flink.runtime.util.EnvironmentInformation  - Error while 
accessing user/group information via Hadoop utils.java.io.IOException: failure 
to login"; this exception was handled and the JobManager could eventually be 
started on OpenShift.

However, with 1.3-SNAPSHOT it eventually fails with the error 
"Exception in thread "main" java.lang.Exception: unable to establish the 
security context", which is caused by "Caused by: 
javax.security.auth.login.LoginException: java.lang.NullPointerException: 
invalid null input: name", and then the JobManager fails to start. The reason 
appears to be that containers run on OpenShift are assigned a random uid rather 
than running as the default uid declared by the image, which can cause problems 
for some images. As a result, the container does not have a current user name; 
however, in Flink 1.3-SNAPSHOT Hadoop needs to extract the user name from UNIX, 
and if it is null it throws an exception and the JobManager does not start.



--
This message was sent by Atlassian JIRA
(v6.3.15#6346)


[jira] [Created] (FLINK-5846) CEP: make the operators backwards compatible.

2017-02-20 Thread Kostas Kloudas (JIRA)
Kostas Kloudas created FLINK-5846:
-

 Summary: CEP: make the operators backwards compatible.
 Key: FLINK-5846
 URL: https://issues.apache.org/jira/browse/FLINK-5846
 Project: Flink
  Issue Type: Sub-task
  Components: CEP
Affects Versions: 1.3.0
Reporter: Kostas Kloudas
Assignee: Kostas Kloudas
 Fix For: 1.3.0


This targets making the new CEP operators compatible with their previous 
versions from Flink 1.1 and Flink 1.2.



--
This message was sent by Atlassian JIRA
(v6.3.15#6346)


[jira] [Created] (FLINK-5845) CEP: unify key and non-keyed operators

2017-02-20 Thread Kostas Kloudas (JIRA)
Kostas Kloudas created FLINK-5845:
-

 Summary: CEP: unify key and non-keyed operators
 Key: FLINK-5845
 URL: https://issues.apache.org/jira/browse/FLINK-5845
 Project: Flink
  Issue Type: Bug
  Components: CEP
Affects Versions: 1.3.0
Reporter: Kostas Kloudas
Assignee: Kostas Kloudas
 Fix For: 1.3.0


Currently the keyed and non-keyed operators in the CEP library have different 
implementations. This issue aims to unify them into one. 

This new implementation will always be applied on a keyed stream, and in the 
case of non-keyed use cases, the input stream will be keyed on a dummy key, as 
done in the case of the {{DataStream.windowAll()}} method, where the input 
stream is keyed using the {{NullByteKeySelector}}.

This is a first step towards making the CEP operators rescalable.
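
For reference, the dummy-key approach boils down to something like the 
following sketch (the {{KeySelector}} interface is stubbed here so the example 
is self-contained; the real {{NullByteKeySelector}} lives in the Flink API):

```
// Simplified sketch of keying a non-keyed stream on a constant dummy key.
interface KeySelector<IN, KEY> {
    KEY getKey(IN value);
}

// Every element maps to the same key, so all elements end up in one key group
// (and therefore on a single parallel instance of the downstream operator).
class NullByteKeySelector<T> implements KeySelector<T, Byte> {
    public Byte getKey(T value) {
        return (byte) 0;
    }
}

public class DummyKeyExample {
    public static void main(String[] args) {
        KeySelector<String, Byte> selector = new NullByteKeySelector<>();
        System.out.println(selector.getKey("any element")); // always 0
    }
}
```

Keying on a constant means all elements still go to a single parallel 
instance, but the operator can then use keyed state, which is the basis for 
the rescaling work mentioned above.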



--
This message was sent by Atlassian JIRA
(v6.3.15#6346)