Confusing debug level log output with Flink 1.5

2018-04-18 Thread Ken Krugler
Hi Till,

I just saw https://issues.apache.org/jira/browse/FLINK-9215 


I’ve been trying out 1.5, and noticed similar output in my logs, e.g.

18/04/18 17:33:47 DEBUG slotpool.SlotPool:751 - Releasing slot with slot 
request id 2c9fba45bd28940c46502d2b003fc437.
org.apache.flink.util.FlinkException: Release multi task slot because all 
children have been released.
at 
org.apache.flink.runtime.jobmaster.slotpool.SlotSharingManager$MultiTaskSlot.releaseChild(SlotSharingManager.java:507)
…
28 or so more lines
at 
scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)

It seems to be a result of this change in SlotPool.java:

107c8e04be8 
flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/slotpool/SlotPool.java
 (Till Rohrmann  2018-02-22 14:10:29 +0100  741)   
log.debug("Releasing slot with slot request id {}.", slotRequestId, cause);

While I understand the value of the call stack, it’s pretty confusing to see 
what looks like a bunch of stack traces showing up in the logs.

I can obviously edit the log4j.properties file to hide this, but that’s not so 
easy when running in EMR, for example.
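
For anyone who does want to silence it via configuration, a one-line logger override 
in log4j.properties should do it (a sketch, assuming the stock Flink log4j setup):

# Raise the SlotPool logger above DEBUG so the release stack traces disappear
log4j.logger.org.apache.flink.runtime.jobmaster.slotpool.SlotPool=INFO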

And I think you’ll continue to get bug reports :)

Could this be turned into a trace-level msg?

Thanks,

— Ken


http://about.me/kkrugler
+1 530-210-6378



Re: Help with OneInputStreamOperatorTestHarness

2018-04-18 Thread Chris Schneider
Hi Ted,

I should have written that we’re using Flink 1.4.0.

Thanks for the suggestion re: FLINK-8268; it could well be the issue 
(though the pull request appears fairly complex, so I’ll need some time to study it).

Best Regards,

- Chris

> On Apr 18, 2018, at 6:33 PM, Ted Yu  wrote:
> 
> Which release are you using ?
> 
> See if the work around from FLINK-8268 helps.
> 
> Cheers
> 
> On Wed, Apr 18, 2018 at 6:26 PM, Chris Schneider 
> > wrote:
> Hi Gang,
> 
> I’m having trouble getting my streaming unit test to work. The following code:
> 
> @Test
> public void testDemo() throws Throwable {
> OneInputStreamOperatorTestHarness<CrawlStateUrl, CrawlStateUrl> testHarness =
> new KeyedOneInputStreamOperatorTestHarness<String, CrawlStateUrl, CrawlStateUrl>(
> new StreamFlatMap<>(new DomainDBFunction()),
> new PldKeySelector(),
> BasicTypeInfo.STRING_TYPE_INFO,
> 1,
> 1,
> 0);
> testHarness.setup();
> testHarness.open();
> 
> for (int i = 0; i < 10; i++) {
> String urlString = String.format("https://domain-%d.com/page1", i);
> CrawlStateUrl url = new CrawlStateUrl(new RawUrl(urlString));
> testHarness.processElement(new StreamRecord<>(url));
> }
> testHarness.snapshot(0L, 0L);
> }
> 
> 
> Generates the following exception:
> 
> DomainDBFunctionTest.testDemo
> testDemo(com.scaleunlimited.flinkcrawler.functions.DomainDBFunctionTest)
> java.lang.Exception: Could not complete snapshot 0 for operator MockTask 
> (1/1).
>   at 
> org.apache.flink.streaming.api.operators.AbstractStreamOperator.snapshotState(AbstractStreamOperator.java:379)
>   at 
> org.apache.flink.streaming.util.AbstractStreamOperatorTestHarness.snapshot(AbstractStreamOperatorTestHarness.java:459)
>   at 
> com.scaleunlimited.flinkcrawler.functions.DomainDBFunctionTest.testDemo(DomainDBFunctionTest.java:51)
>   at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
>   at 
> sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
>   at 
> sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
>   at java.lang.reflect.Method.invoke(Method.java:498)
>   at 
> org.junit.runners.model.FrameworkMethod$1.runReflectiveCall(FrameworkMethod.java:50)
>   at 
> org.junit.internal.runners.model.ReflectiveCallable.run(ReflectiveCallable.java:12)
>   at 
> org.junit.runners.model.FrameworkMethod.invokeExplosively(FrameworkMethod.java:47)
>   at 
> org.junit.internal.runners.statements.InvokeMethod.evaluate(InvokeMethod.java:17)
>   at 
> org.junit.internal.runners.statements.RunBefores.evaluate(RunBefores.java:26)
>   at org.junit.runners.ParentRunner.runLeaf(ParentRunner.java:325)
>   at 
> org.junit.runners.BlockJUnit4ClassRunner.runChild(BlockJUnit4ClassRunner.java:78)
>   at 
> org.junit.runners.BlockJUnit4ClassRunner.runChild(BlockJUnit4ClassRunner.java:57)
>   at org.junit.runners.ParentRunner$3.run(ParentRunner.java:290)
>   at org.junit.runners.ParentRunner$1.schedule(ParentRunner.java:71)
>   at org.junit.runners.ParentRunner.runChildren(ParentRunner.java:288)
>   at org.junit.runners.ParentRunner.access$000(ParentRunner.java:58)
>   at org.junit.runners.ParentRunner$2.evaluate(ParentRunner.java:268)
>   at org.junit.runners.ParentRunner.run(ParentRunner.java:363)
>   at 
> org.eclipse.jdt.internal.junit4.runner.JUnit4TestReference.run(JUnit4TestReference.java:50)
>   at 
> org.eclipse.jdt.internal.junit.runner.TestExecution.run(TestExecution.java:38)
>   at 
> org.eclipse.jdt.internal.junit.runner.RemoteTestRunner.runTests(RemoteTestRunner.java:459)
>   at 
> org.eclipse.jdt.internal.junit.runner.RemoteTestRunner.runTests(RemoteTestRunner.java:675)
>   at 
> org.eclipse.jdt.internal.junit.runner.RemoteTestRunner.run(RemoteTestRunner.java:382)
>   at 
> org.eclipse.jdt.internal.junit.runner.RemoteTestRunner.main(RemoteTestRunner.java:192)
> Caused by: java.lang.NullPointerException
>   at 
> org.apache.flink.util.Preconditions.checkNotNull(Preconditions.java:58)
>   at 
> org.apache.flink.streaming.util.functions.StreamingFunctionUtils.snapshotFunctionState(StreamingFunctionUtils.java:95)
>   at 
> org.apache.flink.streaming.api.operators.AbstractUdfStreamOperator.snapshotState(AbstractUdfStreamOperator.java:90)
>   at 
> org.apache.flink.streaming.api.operators.AbstractStreamOperator.snapshotState(AbstractStreamOperator.java:357)
>   ... 26 more
> 
> I tried explicitly calling testHarness.setStateBackend(new 
> MemoryStateBackend()), 

Help with OneInputStreamOperatorTestHarness

2018-04-18 Thread Chris Schneider
Hi Gang,

I’m having trouble getting my streaming unit test to work. The following code:

@Test
public void testDemo() throws Throwable {
OneInputStreamOperatorTestHarness<CrawlStateUrl, CrawlStateUrl> testHarness =
new KeyedOneInputStreamOperatorTestHarness<String, CrawlStateUrl, CrawlStateUrl>(
new StreamFlatMap<>(new DomainDBFunction()),
new PldKeySelector(),
BasicTypeInfo.STRING_TYPE_INFO,
1,
1,
0);
testHarness.setup();
testHarness.open();

for (int i = 0; i < 10; i++) {
String urlString = String.format("https://domain-%d.com/page1", i);
CrawlStateUrl url = new CrawlStateUrl(new RawUrl(urlString));
testHarness.processElement(new StreamRecord<>(url));
}
testHarness.snapshot(0L, 0L);
}


Generates the following exception:

DomainDBFunctionTest.testDemo
testDemo(com.scaleunlimited.flinkcrawler.functions.DomainDBFunctionTest)
java.lang.Exception: Could not complete snapshot 0 for operator MockTask (1/1).
at 
org.apache.flink.streaming.api.operators.AbstractStreamOperator.snapshotState(AbstractStreamOperator.java:379)
at 
org.apache.flink.streaming.util.AbstractStreamOperatorTestHarness.snapshot(AbstractStreamOperatorTestHarness.java:459)
at 
com.scaleunlimited.flinkcrawler.functions.DomainDBFunctionTest.testDemo(DomainDBFunctionTest.java:51)
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at 
sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
at 
sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:498)
at 
org.junit.runners.model.FrameworkMethod$1.runReflectiveCall(FrameworkMethod.java:50)
at 
org.junit.internal.runners.model.ReflectiveCallable.run(ReflectiveCallable.java:12)
at 
org.junit.runners.model.FrameworkMethod.invokeExplosively(FrameworkMethod.java:47)
at 
org.junit.internal.runners.statements.InvokeMethod.evaluate(InvokeMethod.java:17)
at 
org.junit.internal.runners.statements.RunBefores.evaluate(RunBefores.java:26)
at org.junit.runners.ParentRunner.runLeaf(ParentRunner.java:325)
at 
org.junit.runners.BlockJUnit4ClassRunner.runChild(BlockJUnit4ClassRunner.java:78)
at 
org.junit.runners.BlockJUnit4ClassRunner.runChild(BlockJUnit4ClassRunner.java:57)
at org.junit.runners.ParentRunner$3.run(ParentRunner.java:290)
at org.junit.runners.ParentRunner$1.schedule(ParentRunner.java:71)
at org.junit.runners.ParentRunner.runChildren(ParentRunner.java:288)
at org.junit.runners.ParentRunner.access$000(ParentRunner.java:58)
at org.junit.runners.ParentRunner$2.evaluate(ParentRunner.java:268)
at org.junit.runners.ParentRunner.run(ParentRunner.java:363)
at 
org.eclipse.jdt.internal.junit4.runner.JUnit4TestReference.run(JUnit4TestReference.java:50)
at 
org.eclipse.jdt.internal.junit.runner.TestExecution.run(TestExecution.java:38)
at 
org.eclipse.jdt.internal.junit.runner.RemoteTestRunner.runTests(RemoteTestRunner.java:459)
at 
org.eclipse.jdt.internal.junit.runner.RemoteTestRunner.runTests(RemoteTestRunner.java:675)
at 
org.eclipse.jdt.internal.junit.runner.RemoteTestRunner.run(RemoteTestRunner.java:382)
at 
org.eclipse.jdt.internal.junit.runner.RemoteTestRunner.main(RemoteTestRunner.java:192)
Caused by: java.lang.NullPointerException
at 
org.apache.flink.util.Preconditions.checkNotNull(Preconditions.java:58)
at 
org.apache.flink.streaming.util.functions.StreamingFunctionUtils.snapshotFunctionState(StreamingFunctionUtils.java:95)
at 
org.apache.flink.streaming.api.operators.AbstractUdfStreamOperator.snapshotState(AbstractUdfStreamOperator.java:90)
at 
org.apache.flink.streaming.api.operators.AbstractStreamOperator.snapshotState(AbstractStreamOperator.java:357)
... 26 more

I tried explicitly calling testHarness.setStateBackend(new 
MemoryStateBackend()), but that didn’t seem to help. I could provide more of my 
code (e.g., PldKeySelector, DomainDBFunction, CrawlStateUrl, RawUrl, etc.), but 
that doesn’t seem like it would have much to do with the problem.

Any advice would be most welcome.

Thanks,

- Chris

-
Chris Schneider
http://www.scaleunlimited.com
custom big data solutions
-



debug for Flink

2018-04-18 Thread Qian Ye
Hi

I’m wondering if new debugging methods/tools are urgently needed for Flink development. 
I know there already exist some debugging methods for Flink, e.g., remote debugging of 
Flink clusters
(https://cwiki.apache.org/confluence/display/FLINK/Remote+Debugging+of+Flink+Clusters).
But are they convenient enough? 

Best regards.

Re: FlinkML

2018-04-18 Thread Christophe Salperwyck
Hi,

You could try to plug in the MOA/Weka library too. I did some preliminary work
with that:
https://moa.cms.waikato.ac.nz/moa-with-apache-flink/

but then they are no longer FlinkML algorithms.

Best regards,
Christophe


2018-04-18 21:13 GMT+02:00 shashank734 :

> There are no active discussions or guide on that. But I found this example
> in
> the repo :
>
> https://github.com/apache/flink/blob/master/flink-examples/flink-examples-streaming/src/main/java/org/apache/flink/streaming/examples/ml/IncrementalLearningSkeleton.java
>
> Which is trying to do the same thing. Although I haven't checked this yet.
>
>
>
> --
> Sent from: http://apache-flink-user-mailing-list-archive.2336050.n4.
> nabble.com/
>


Re: why doesn't the over-window-aggregation sort the element(considering watermark) before processing?

2018-04-18 Thread Yan Zhou [FDS Science]
Never mind, I figured it out. The event is not processed as soon as it arrives; it's 
registered to be processed in event time. It makes sense.


best

Yan


From: Yan Zhou [FDS Science] 
Sent: Wednesday, April 18, 2018 12:56:58 PM
To: Fabian Hueske
Cc: user
Subject: Re: why doesn't the over-window-aggregation sort the 
element(considering watermark) before processing?


Hi Fabian,


Thanks for the reply.



I think here is the problem: currently, the timestamp of an event is compared 
with the previously processed element's timestamp, instead of the watermark, to 
determine whether it's late.


To my understanding, even if the order of events emitted by the preceding operator is 
perfectly sorted by event time, the order of arrival at the current operator is not 
guaranteed because of shuffling/buffering or something else. And since the watermark is 
not considered within the over-window operator, I would imagine that a substantial 
portion of the elements might be dropped. How can I avoid that?


Looking forward to hearing your reply.


Best

Yan



From: Fabian Hueske 
Sent: Wednesday, April 18, 2018 1:04:43 AM
To: Yan Zhou [FDS Science]
Cc: user
Subject: Re: why doesn't the over-window-aggregation sort the 
element(considering watermark) before processing?

The over window operates on an unbounded stream of data. Hence it is not 
possible to sort the complete stream.
Instead we can sort ranges of the stream. Flink uses watermarks to define these 
ranges.

The operator processes the records in timestamp order that are not late, i.e., 
have timestamps larger than the last watermark.
In principle there are different ways to handle records that violate this 
condition. In the current implementation of the operator we simply drop these 
records.

At the current state, the only way to avoid records being dropped is to 
use more conservative watermarks. Note that this will increase the processing 
latency.

Best, Fabian

2018-04-18 8:55 GMT+02:00 Yan Zhou [FDS Science] 
>:

Hi,


I use bounded over-window aggregation in my application. However, sometimes 
some input elements are "discarded" and do not generate output. By reading the 
source code of RowTimeBoundedRangeOver.scala, I realized the record is actually 
discarded if it is out of order. Please see the quoted code block below. Please 
help me understand why we don't sort the records first. Say we are using 
BoundedOutOfOrdernessTimestampExtractor: we could use the watermark to select a 
portion of the elements to sort. When the watermark proceeds, process the 
elements that are before the watermark and extend the portion of elements for 
sorting.



Best

Yan



override def processElement(
inputC: CRow,
ctx: ProcessFunction[CRow, CRow]#Context,
out: Collector[CRow]): Unit = {
// triggering timestamp for trigger calculation
val triggeringTs = input.getField(rowTimeIdx).asInstanceOf[Long]

val lastTriggeringTs = lastTriggeringTsState.value

// check if the data is expired, if not, save the data and register event time 
timer
if (triggeringTs > lastTriggeringTs) {
// put in cache, and register timer to process/clean
// ...
}else{
// DISCARD
}
}




Re: why doesn't the over-window-aggregation sort the element(considering watermark) before processing?

2018-04-18 Thread Yan Zhou [FDS Science]
Hi Fabian,


Thanks for the reply.



I think here is the problem: currently, the timestamp of an event is compared 
with the previously processed element's timestamp, instead of the watermark, to 
determine whether it's late.


To my understanding, even if the order of events emitted by the preceding operator is 
perfectly sorted by event time, the order of arrival at the current operator is not 
guaranteed because of shuffling/buffering or something else. And since the watermark is 
not considered within the over-window operator, I would imagine that a substantial 
portion of the elements might be dropped. How can I avoid that?


Looking forward to hearing your reply.


Best

Yan



From: Fabian Hueske 
Sent: Wednesday, April 18, 2018 1:04:43 AM
To: Yan Zhou [FDS Science]
Cc: user
Subject: Re: why doesn't the over-window-aggregation sort the 
element(considering watermark) before processing?

The over window operates on an unbounded stream of data. Hence it is not 
possible to sort the complete stream.
Instead we can sort ranges of the stream. Flink uses watermarks to define these 
ranges.

The operator processes the records in timestamp order that are not late, i.e., 
have timestamps larger than the last watermark.
In principle there are different ways to handle records that violate this 
condition. In the current implementation of the operator we simply drop these 
records.

At the current state, the only way to avoid records being dropped is to 
use more conservative watermarks. Note that this will increase the processing 
latency.

Best, Fabian

2018-04-18 8:55 GMT+02:00 Yan Zhou [FDS Science] 
>:

Hi,


I use bounded over-window aggregation in my application. However, sometimes 
some input elements are "discarded" and do not generate output. By reading the 
source code of RowTimeBoundedRangeOver.scala, I realized the record is actually 
discarded if it is out of order. Please see the quoted code block below. Please 
help me understand why we don't sort the records first. Say we are using 
BoundedOutOfOrdernessTimestampExtractor: we could use the watermark to select a 
portion of the elements to sort. When the watermark proceeds, process the 
elements that are before the watermark and extend the portion of elements for 
sorting.



Best

Yan



override def processElement(
inputC: CRow,
ctx: ProcessFunction[CRow, CRow]#Context,
out: Collector[CRow]): Unit = {
// triggering timestamp for trigger calculation
val triggeringTs = input.getField(rowTimeIdx).asInstanceOf[Long]

val lastTriggeringTs = lastTriggeringTsState.value

// check if the data is expired, if not, save the data and register event time 
timer
if (triggeringTs > lastTriggeringTs) {
// put in cache, and register timer to process/clean
// ...
}else{
// DISCARD
}
}




Re: FlinkML

2018-04-18 Thread shashank734
There are no active discussions or guide on that. But I found this example in
the repo :

https://github.com/apache/flink/blob/master/flink-examples/flink-examples-streaming/src/main/java/org/apache/flink/streaming/examples/ml/IncrementalLearningSkeleton.java

  

Which is trying to do the same thing. Although I haven't checked this yet.



--
Sent from: http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/


Re: FlinkML

2018-04-18 Thread Christophe Jolif
Szymon,

The short answer is no. See:
http://mail-archives.apache.org/mod_mbox/flink-user/201802.mbox/%3ccaadrtt39ciiec1uzwthzgnbkjxs-_h5yfzowhzph_zbidux...@mail.gmail.com%3E


On Mon, Apr 16, 2018 at 11:25 PM, Szymon Szczypiński 
wrote:

> Hi,
>
> i wonder if there is a possibility to build a FlinkML streaming job, not a
> batch job. Examples on
> https://ci.apache.org/projects/flink/flink-docs-release-1.4/dev/libs/ml/ are only batch examples.
>
> Is there any possibility?
>
>
> Best regards.
>
>


-- 
Christophe


Re: Tracking deserialization errors

2018-04-18 Thread Elias Levy
Either proposal would work.  In the latter case, at a minimum we'd need a
way to identify the source within the metric.  The basic error metric would
then allow us to go into the logs to determine the cause of the error, as
we already record the message causing trouble in the log.


On Mon, Apr 16, 2018 at 4:42 AM, Fabian Hueske  wrote:

> Thanks for starting the discussion Elias.
>
> I see two ways to address this issue.
>
> 1) Add an interface that a deserialization schema can implement to
> register metrics. Each source would need to check for the interface and
> call it to setup metrics.
> 2) Check for null returns in the source functions and increment a
> respective counter.
>
> In both cases, we need to touch the source connectors.
>
> I see that passing information such as topic name, partition, and offset
> are important debugging information. However, I don't think that metrics
> would be good to capture them.
> In that case, log files might be a better approach.
>
> I'm not sure to what extend the source functions (Kafka, Kinesis) support
> such error tracking.
> Adding Gordon to the thread who knows the internals of the connectors.
>
>


Re: Substasks - Uneven allocation

2018-04-18 Thread Ken Krugler
Hi Pedro,

That’s interesting, and something we’d like to be able to control as well.

I did a little research, and it seems like (with some stunts) there could be a 
way to achieve this via CoLocationConstraint/CoLocationGroup magic.

Though CoLocationConstraint is for ensuring the different subtasks of different 
JobVertices are executed on the same Instance (Task Manager), versus ensuring 
they’re executed on different Task Managers.

The only thing I found on the list was this snippet (from Till), a few years 
back...

> If your requirement is that O_i will be executed in the same slot as P_i, 
> then you have to add the corresponding JobVertices to a CoLocationGroup. At 
> the moment this is not really exposed but you could try to get the JobGraph 
> from the StreamGraph.getJobGraph and then use JobGraph.getVertices to get the 
> JobVertices. Then you have to find out which JobVertices accommodate your 
> operators. Once this is done, you can colocate them via the 
> JobVertex.setStrictlyCoLocatedWith method. This might solve your problem, but 
> I haven’t tested it myself.
> 
Hoping someone with actual knowledge of the task to slot allocation logic can 
chime in here with a solution :)

— Ken



> On Apr 18, 2018, at 9:10 AM, PedroMrChaves  wrote:
> 
> Hello,
> 
> I have a job that has one async operational node (i.e. implements
> AsyncFunction). This Operational node will spawn multiple threads that
> perform heavy tasks (cpu bound). 
> 
> I have a Flink Standalone cluster deployed on two machines of 32 cores and
> 128 gb of RAM, each machine has one task manager and one Job Manager. When I
> deploy the job, all of the subtasks from the async operational node end up
> on the same machine, which causes it to have a much higher cpu load then the
> other. 
> 
> I've researched ways to overcome this issue, but I haven't found a solution
> to my problem. 
> Ideally, the subtasks would be evenly split across both machines. 
> 
> Can this problem be solved somehow? 
> 
> Regards,
> Pedro Chaves. 
> 
> 
> 
> -
> Best Regards,
> Pedro Chaves
> --
> Sent from: 
> http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/


http://about.me/kkrugler
+1 530-210-6378



Substasks - Uneven allocation

2018-04-18 Thread PedroMrChaves
Hello,

I have a job that has one async operational node (i.e. implements
AsyncFunction). This Operational node will spawn multiple threads that
perform heavy tasks (cpu bound). 

I have a Flink standalone cluster deployed on two machines with 32 cores and
128 GB of RAM; each machine has one task manager and one job manager. When I
deploy the job, all of the subtasks from the async operational node end up
on the same machine, which causes it to have a much higher CPU load than the
other. 

I've researched ways to overcome this issue, but I haven't found a solution
to my problem. 
Ideally, the subtasks would be evenly split across both machines. 

Can this problem be solved somehow? 

Regards,
Pedro Chaves. 



-
Best Regards,
Pedro Chaves
--
Sent from: http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/


Re: State-machine-based search logic in Flink ?

2018-04-18 Thread Fabian Hueske
As I said before, this is work in progress and there is a pending pull
request (PR) to add this feature.
So no, MATCH_RECOGNIZE is not supported by Flink yet and hence also not
documented.

Best, Fabian
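
For completeness: the CEP library itself is already usable today, from Java (there is a 
Scala API as well), for "series of consecutive events" style patterns. A minimal sketch 
follows; the Event type, its getTemperature() accessor, and the threshold are placeholders, 
not something from this thread:

import java.util.List;
import java.util.Map;

import org.apache.flink.cep.CEP;
import org.apache.flink.cep.PatternSelectFunction;
import org.apache.flink.cep.PatternStream;
import org.apache.flink.cep.pattern.Pattern;
import org.apache.flink.cep.pattern.conditions.SimpleCondition;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.windowing.time.Time;

// Assumes an existing DataStream<Event> called input.
Pattern<Event, ?> twoHighReadings = Pattern.<Event>begin("first")
        .where(new SimpleCondition<Event>() {
            @Override
            public boolean filter(Event e) {
                return e.getTemperature() > 100;
            }
        })
        .next("second")  // strictly the next event, i.e. consecutive
        .where(new SimpleCondition<Event>() {
            @Override
            public boolean filter(Event e) {
                return e.getTemperature() > 100;
            }
        })
        .within(Time.seconds(10));

PatternStream<Event> matches = CEP.pattern(input, twoHighReadings);

DataStream<String> alerts = matches.select(
        new PatternSelectFunction<Event, String>() {
            @Override
            public String select(Map<String, List<Event>> match) {
                return "matched: " + match.get("first").get(0) + " -> " + match.get("second").get(0);
            }
        });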

2018-04-18 10:12 GMT+02:00 Esa Heikkinen :

> Hi
>
>
>
> I did mean like “finding series of consecutive events”, as it was
> described in [2].
>
>
>
> Are these features already in Flink and how well they are documented ?
>
>
>
> Can I use Scala or only Java ?
>
>
>
> I would like some example codes, it they are exist ?
>
>
>
> Best, Esa
>
>
>
> *From:* Fabian Hueske 
> *Sent:* Tuesday, April 17, 2018 1:41 PM
> *To:* Esa Heikkinen 
> *Cc:* user@flink.apache.org
> *Subject:* Re: State-machine-based search logic in Flink ?
>
>
>
> Hi Esa,
>
> What do you mean by "individual searches in the Table API"?
>
> There is some work (a pending PR [1]) to integrate the MATCH_RECOGNIZE
> clause (SQL 2016) [2] into Flink's SQL which basically adds a SQL syntax
> for the CEP library.
>
> Best, Fabian
>
>
> [1] https://github.com/apache/flink/pull/4502
> [2] https://modern-sql.com/feature/match_recognize
>
>
>
> 2018-04-17 10:07 GMT+02:00 Esa Heikkinen :
>
> Hi
>
>
>
> I am not sure I have understand all, but it is possible to build some kind
> of state-machine-based search logic for
>
> example on top of the individual searches in Table API (using
> CsvTableSource) ?
>
>
>
> Best, Esa
>
>
>
>
>


Jars uploaded to jobmanager are deleted but not free'ed by OS

2018-04-18 Thread Jeroen Steggink | knowsy

Sorry, I meant the jobmanager, not the taskmanager.


On 18-Apr-18 15:44, Jeroen Steggink | knowsy wrote:

Hi,

I'm having some trouble running the Flink taskmanager in a Docker 
container (OpenShift). The container's internal storage is filling up 
because the deleted jar files in blob storage are probably still in 
use and therefore the resources are not freed.


We are using Apache Beam to start an Apache Flink process, so the jars 
are sent to Apache Flink every time we fire a batch.


I enabled debug logging, but I can't seem to find anything showing 
these deletes. Maybe someone has an idea why the resources are not 
freed? I checked the blob store, and it is indeed the jars.


208875129    0 lr-x--   1 100015 root   64 Apr 18 
12:58 /proc/1/fd/142 -> 
/var/tmp/flink/blobStore-580cc38d-44e4-45a1-8922-e21c00d73dec/job_90964be94a2f4471844a00284e44fb32/blob_p-5202910b36af8c12548df97a7e4a057b77786217-ffa3f85003b1f124cd1cccdb0f72a8e0\ 
(deleted)


208875130    0 lr-x--   1 100015 root   64 Apr 18 
12:58 /proc/1/fd/143 -> 
/var/tmp/flink/blobStore-580cc38d-44e4-45a1-8922-e21c00d73dec/job_b7c00268b488411a8f6e1af984bcdcc2/blob_p-5202910b36af8c12548df97a7e4a057b77786217-8bab07adb34d4ce8de20846ec72059ce\ 
(deleted)


208875131    0 lr-x--   1 100015 root   64 Apr 18 
12:58 /proc/1/fd/144 -> 
/var/tmp/flink/blobStore-580cc38d-44e4-45a1-8922-e21c00d73dec/job_46183ac02f1dcd3543f8e481f59948b5/blob_p-5202910b36af8c12548df97a7e4a057b77786217-ac6bc86d8932e7d631416d9bafab4ab4\ 
(deleted)


208875132    0 lr-x--   1 100015 root   64 Apr 18 
12:58 /proc/1/fd/145 -> 
/var/tmp/flink/blobStore-580cc38d-44e4-45a1-8922-e21c00d73dec/job_717bf3f4b3f80700c1cc44d6076c2aca/blob_p-5202910b36af8c12548df97a7e4a057b77786217-780dd2383dee11a2361ac20a5da7bbb8\ 
(deleted)


208875133    0 lr-x--   1 100015 root   64 Apr 18 
12:58 /proc/1/fd/146 -> 
/var/tmp/flink/blobStore-580cc38d-44e4-45a1-8922-e21c00d73dec/job_22e67caac65c9c4e537caa3b072b8cc3/blob_p-5202910b36af8c12548df97a7e4a057b77786217-e0b523663672c641b368e5d1440b0b70\ 
(deleted)


208875134    0 lr-x--   1 100015 root   64 Apr 18 
12:58 /proc/1/fd/147 -> 
/var/tmp/flink/blobStore-580cc38d-44e4-45a1-8922-e21c00d73dec/job_3afe5b02ccb95b3494a1acd8677c66f0/blob_p-5202910b36af8c12548df97a7e4a057b77786217-9a8cd48c09a4b518adf0309a0255b339\ 
(deleted)


208875135    0 lr-x--   1 100015 root   64 Apr 18 
12:58 /proc/1/fd/148 -> 
/var/tmp/flink/blobStore-580cc38d-44e4-45a1-8922-e21c00d73dec/job_cb024c561531905e81c9768ec62a2fe0/blob_p-5202910b36af8c12548df97a7e4a057b77786217-0addc83aaf9a2f781528ad035fd79cc8\ 
(deleted)


208875136    0 lr-x--   1 100015 root   64 Apr 18 
12:58 /proc/1/fd/149 -> 
/var/tmp/flink/blobStore-580cc38d-44e4-45a1-8922-e21c00d73dec/job_d3dc0b0608d71ffa77575771f088e80e/blob_p-5202910b36af8c12548df97a7e4a057b77786217-c9015b012ec4b249f32872471a31a500\ 
(deleted)


208875137    0 lr-x--   1 100015 root   64 Apr 18 
12:58 /proc/1/fd/150 -> 
/var/tmp/flink/blobStore-580cc38d-44e4-45a1-8922-e21c00d73dec/job_1b4cdb127bb2c345e1b099e3e446bf58/blob_p-5202910b36af8c12548df97a7e4a057b77786217-ac4457b393b7ff0565c47c1e38786005\ 
(deleted)


208875138    0 lr-x--   1 100015 root   64 Apr 18 
12:58 /proc/1/fd/151 -> 
/var/tmp/flink/blobStore-580cc38d-44e4-45a1-8922-e21c00d73dec/job_8c23503c614a88e8c8f7a54a31e41886/blob_p-5202910b36af8c12548df97a7e4a057b77786217-d096b3ef150bf7e8e98224e0b8f17292\ 
(deleted)


208875139    0 lr-x--   1 100015 root   64 Apr 18 
12:58 /proc/1/fd/152 -> 
/var/tmp/flink/blobStore-580cc38d-44e4-45a1-8922-e21c00d73dec/job_e7c8132da483bd14e5abfe9390adeeb1/blob_p-5202910b36af8c12548df97a7e4a057b77786217-f370d8dcad0cb36581f9a5f1568e1487\ 
(deleted)


208875140    0 lr-x--   1 100015 root   64 Apr 18 
12:58 /proc/1/fd/153 -> 
/var/tmp/flink/blobStore-580cc38d-44e4-45a1-8922-e21c00d73dec/job_cbee9f15b0c6adba0f5ddb67b587b607/blob_p-5202910b36af8c12548df97a7e4a057b77786217-9ae77c3419d77adab8f44258ca4290c5\ 
(deleted)


208875141    0 lr-x--   1 100015 root   64 Apr 18 
12:58 /proc/1/fd/154 -> 
/var/tmp/flink/blobStore-580cc38d-44e4-45a1-8922-e21c00d73dec/job_29c5a145ae231be4c0d53717625c3938/blob_p-5202910b36af8c12548df97a7e4a057b77786217-76bb4d83f962a887d41effb2646bd63d\ 
(deleted)




There are several places in the code where the boolean returned by the 
file delete is not read, so we have no clue whether the file was deleted 
successfully. Maybe it can be changed to something like 
java.nio.file.Files.delete to get an IOException when something goes 
wrong. Though this is not a solution, it can make it more 
transparent when things go wrong.


Thanks,
Jeroen





Jars uploaded to taskmanager are deleted but not free'ed by OS

2018-04-18 Thread Jeroen Steggink | knowsy

Hi,

I'm having some trouble running the Flink taskmanager in a Docker 
container (OpenShift). The container's internal storage is filling up 
because the deleted jar files in blob storage are probably still in use 
and therefore the resources are not freed.


We are using Apache Beam to start an Apache Flink process, so the jars 
are sent to Apache Flink every time we fire a batch.


I enabled debug logging, but I can't seem to find anything showing 
these deletes. Maybe someone has an idea why the resources are not freed? 
I checked the blob store, and it is indeed the jars.


208875129    0 lr-x--   1 100015 root   64 Apr 18 12:58 
/proc/1/fd/142 -> 
/var/tmp/flink/blobStore-580cc38d-44e4-45a1-8922-e21c00d73dec/job_90964be94a2f4471844a00284e44fb32/blob_p-5202910b36af8c12548df97a7e4a057b77786217-ffa3f85003b1f124cd1cccdb0f72a8e0\
 (deleted)

208875130    0 lr-x--   1 100015 root   64 Apr 18 12:58 
/proc/1/fd/143 -> 
/var/tmp/flink/blobStore-580cc38d-44e4-45a1-8922-e21c00d73dec/job_b7c00268b488411a8f6e1af984bcdcc2/blob_p-5202910b36af8c12548df97a7e4a057b77786217-8bab07adb34d4ce8de20846ec72059ce\
 (deleted)

208875131    0 lr-x--   1 100015 root   64 Apr 18 12:58 
/proc/1/fd/144 -> 
/var/tmp/flink/blobStore-580cc38d-44e4-45a1-8922-e21c00d73dec/job_46183ac02f1dcd3543f8e481f59948b5/blob_p-5202910b36af8c12548df97a7e4a057b77786217-ac6bc86d8932e7d631416d9bafab4ab4\
 (deleted)

208875132    0 lr-x--   1 100015 root   64 Apr 18 12:58 
/proc/1/fd/145 -> 
/var/tmp/flink/blobStore-580cc38d-44e4-45a1-8922-e21c00d73dec/job_717bf3f4b3f80700c1cc44d6076c2aca/blob_p-5202910b36af8c12548df97a7e4a057b77786217-780dd2383dee11a2361ac20a5da7bbb8\
 (deleted)

208875133    0 lr-x--   1 100015 root   64 Apr 18 12:58 
/proc/1/fd/146 -> 
/var/tmp/flink/blobStore-580cc38d-44e4-45a1-8922-e21c00d73dec/job_22e67caac65c9c4e537caa3b072b8cc3/blob_p-5202910b36af8c12548df97a7e4a057b77786217-e0b523663672c641b368e5d1440b0b70\
 (deleted)

208875134    0 lr-x--   1 100015 root   64 Apr 18 12:58 
/proc/1/fd/147 -> 
/var/tmp/flink/blobStore-580cc38d-44e4-45a1-8922-e21c00d73dec/job_3afe5b02ccb95b3494a1acd8677c66f0/blob_p-5202910b36af8c12548df97a7e4a057b77786217-9a8cd48c09a4b518adf0309a0255b339\
 (deleted)

208875135    0 lr-x--   1 100015 root   64 Apr 18 12:58 
/proc/1/fd/148 -> 
/var/tmp/flink/blobStore-580cc38d-44e4-45a1-8922-e21c00d73dec/job_cb024c561531905e81c9768ec62a2fe0/blob_p-5202910b36af8c12548df97a7e4a057b77786217-0addc83aaf9a2f781528ad035fd79cc8\
 (deleted)

208875136    0 lr-x--   1 100015 root   64 Apr 18 12:58 
/proc/1/fd/149 -> 
/var/tmp/flink/blobStore-580cc38d-44e4-45a1-8922-e21c00d73dec/job_d3dc0b0608d71ffa77575771f088e80e/blob_p-5202910b36af8c12548df97a7e4a057b77786217-c9015b012ec4b249f32872471a31a500\
 (deleted)

208875137    0 lr-x--   1 100015 root   64 Apr 18 12:58 
/proc/1/fd/150 -> 
/var/tmp/flink/blobStore-580cc38d-44e4-45a1-8922-e21c00d73dec/job_1b4cdb127bb2c345e1b099e3e446bf58/blob_p-5202910b36af8c12548df97a7e4a057b77786217-ac4457b393b7ff0565c47c1e38786005\
 (deleted)

208875138    0 lr-x--   1 100015 root   64 Apr 18 12:58 
/proc/1/fd/151 -> 
/var/tmp/flink/blobStore-580cc38d-44e4-45a1-8922-e21c00d73dec/job_8c23503c614a88e8c8f7a54a31e41886/blob_p-5202910b36af8c12548df97a7e4a057b77786217-d096b3ef150bf7e8e98224e0b8f17292\
 (deleted)

208875139    0 lr-x--   1 100015 root   64 Apr 18 12:58 
/proc/1/fd/152 -> 
/var/tmp/flink/blobStore-580cc38d-44e4-45a1-8922-e21c00d73dec/job_e7c8132da483bd14e5abfe9390adeeb1/blob_p-5202910b36af8c12548df97a7e4a057b77786217-f370d8dcad0cb36581f9a5f1568e1487\
 (deleted)

208875140    0 lr-x--   1 100015 root   64 Apr 18 12:58 
/proc/1/fd/153 -> 
/var/tmp/flink/blobStore-580cc38d-44e4-45a1-8922-e21c00d73dec/job_cbee9f15b0c6adba0f5ddb67b587b607/blob_p-5202910b36af8c12548df97a7e4a057b77786217-9ae77c3419d77adab8f44258ca4290c5\
 (deleted)

208875141    0 lr-x--   1 100015 root   64 Apr 18 12:58 
/proc/1/fd/154 -> 
/var/tmp/flink/blobStore-580cc38d-44e4-45a1-8922-e21c00d73dec/job_29c5a145ae231be4c0d53717625c3938/blob_p-5202910b36af8c12548df97a7e4a057b77786217-76bb4d83f962a887d41effb2646bd63d\
 (deleted)



There are several places in the code where the boolean returned by the 
file delete is not read, so we have no clue whether the file was deleted 
successfully. Maybe it can be changed to something like 
java.nio.file.Files.delete to get an IOException when something goes 
wrong. Though this is not a solution, it can make it more 
transparent when things go wrong.
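
A minimal illustration of that suggestion (the path is a placeholder):

import java.io.File;
import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.Paths;

public class DeleteExample {
    public static void main(String[] args) {
        String path = "/path/to/blob";  // placeholder

        // java.io.File.delete() only reports failure through a boolean,
        // which is easy to ignore:
        boolean deleted = new File(path).delete();
        if (!deleted) {
            System.err.println("delete failed, but we don't know why");
        }

        // java.nio.file.Files.delete() instead throws an IOException that
        // says why the delete failed, making the failure visible in logs:
        try {
            Files.delete(Paths.get(path));
        } catch (IOException e) {
            e.printStackTrace();
        }
    }
}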


Thanks,
Jeroen



Outputting the content of in flight session windows

2018-04-18 Thread jelmer
I defined a session window and I would like to write the contents of the
window to storage before the window closes

Initially I was doing this by setting a CountTrigger.of(1) on the session
window. But this leads to very frequent writes.

To remedy this i switched to
a ContinuousProcessingTimeTrigger.of(Time.minutes(1)) but this seems to not
perform well, and if i understand it correctly it will fire even if no new
elements have been added to the window since the last fire.

I then proceeded to create my own trigger implementation that aims to fire at
most once every minute (processing time).

This works in simple test scenarios on my developer machine but somehow
fails in production.

So I have two questions.

1. What would be the best way to periodically output content of a session
window
2. Can anyone come up with a plausible reason why my custom trigger never
fires ?
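
For reference, a throttled processing-time trigger along those lines could look like the
sketch below (Java; the interval handling and names are illustrative, not the poster's code).
One thing worth checking for question 2: session windows are merging windows, so the trigger
needs canMerge()/onMerge() and has to re-register its timer for the merged window there,
otherwise pending timers can be lost when windows merge and the trigger never fires.

import org.apache.flink.api.common.state.ValueState;
import org.apache.flink.api.common.state.ValueStateDescriptor;
import org.apache.flink.streaming.api.windowing.triggers.Trigger;
import org.apache.flink.streaming.api.windowing.triggers.TriggerResult;
import org.apache.flink.streaming.api.windowing.windows.TimeWindow;

public class ThrottledProcessingTimeTrigger extends Trigger<Object, TimeWindow> {

    private final long intervalMs;
    private final ValueStateDescriptor<Long> nextFireDesc =
            new ValueStateDescriptor<>("next-fire", Long.class);

    public ThrottledProcessingTimeTrigger(long intervalMs) {
        this.intervalMs = intervalMs;
    }

    @Override
    public TriggerResult onElement(Object element, long timestamp, TimeWindow window,
                                   TriggerContext ctx) throws Exception {
        ValueState<Long> nextFire = ctx.getPartitionedState(nextFireDesc);
        if (nextFire.value() == null) {
            // First element since the last fire: schedule one timer, intervalMs from now.
            long t = ctx.getCurrentProcessingTime() + intervalMs;
            nextFire.update(t);
            ctx.registerProcessingTimeTimer(t);
        }
        return TriggerResult.CONTINUE;
    }

    @Override
    public TriggerResult onProcessingTime(long time, TimeWindow window, TriggerContext ctx)
            throws Exception {
        // Emit the window contents; the next element will schedule the next timer.
        ctx.getPartitionedState(nextFireDesc).clear();
        return TriggerResult.FIRE;
    }

    @Override
    public TriggerResult onEventTime(long time, TimeWindow window, TriggerContext ctx) {
        return TriggerResult.CONTINUE;
    }

    @Override
    public boolean canMerge() {
        return true;
    }

    @Override
    public void onMerge(TimeWindow window, OnMergeContext ctx) throws Exception {
        // Session windows merge: timers registered for the pre-merge windows are lost,
        // so schedule a fresh timer for the merged window.
        long t = ctx.getCurrentProcessingTime() + intervalMs;
        ctx.getPartitionedState(nextFireDesc).update(t);
        ctx.registerProcessingTimeTimer(t);
    }

    @Override
    public void clear(TimeWindow window, TriggerContext ctx) throws Exception {
        ValueState<Long> nextFire = ctx.getPartitionedState(nextFireDesc);
        Long t = nextFire.value();
        if (t != null) {
            ctx.deleteProcessingTimeTimer(t);
            nextFire.clear();
        }
    }
}

On the session-windowed stream this would be attached with
.trigger(new ThrottledProcessingTimeTrigger(60_000)).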


Re: Flink job testing with

2018-04-18 Thread Tzu-Li (Gordon) Tai
Hi,

The docs here [1] provide some example snippets of using the Kafka connector
to consume from / write to Kafka topics.

Once you consumed a `DataStream` from a Kafka topic using the Kafka
consumer, you can use Flink transformations such as map, flatMap, etc. to
perform processing on the records.

Hope this helps!

Cheers,
Gordon

[1]
https://ci.apache.org/projects/flink/flink-docs-release-1.4/dev/connectors/kafka.html
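
Not from the docs verbatim, but a rough sketch of the usual shape, with the broker
address, topic names, and the 0.11 connector assumed:

import java.util.Properties;

import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer011;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer011;

public class KafkaPipeline {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        Properties props = new Properties();
        props.setProperty("bootstrap.servers", "localhost:9092");  // placeholder
        props.setProperty("group.id", "test-group");                // placeholder

        // Consume strings from an input topic ...
        DataStream<String> in = env.addSource(
                new FlinkKafkaConsumer011<>("input-topic", new SimpleStringSchema(), props));

        // ... apply an arbitrary transformation ...
        DataStream<String> out = in.map(s -> s.toUpperCase());

        // ... and write the results back to another topic.
        out.addSink(new FlinkKafkaProducer011<>("localhost:9092", "output-topic",
                new SimpleStringSchema()));

        env.execute("kafka-test-job");
    }
}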



--
Sent from: http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/


Re: Tracking deserialization errors

2018-04-18 Thread Tzu-Li (Gordon) Tai
Hi,

These are valid concerns. And yes, AFAIK users have been writing to logs within 
the deserialization schema to track this. The connectors as of now have no 
logging themselves in case of a skipped record.

I think we can implement both logging and metrics to track this, most of which 
you have already brought up.
For logging, the information should contain topic, partition, and offset for 
debugging.
For metrics, we should be able to use the user variable functionality to have 
skip counters that can be grouped by topic / partition / offset.

Though, I’m not sure how helpful this would be in practice.
I’ve opened a JIRA for this issue for further discussion: 
https://issues.apache.org/jira/browse/FLINK-9204

Cheers,
Gordon
On 16 April 2018 at 7:43:00 PM, Fabian Hueske (fhue...@gmail.com) wrote:

Thanks for starting the discussion Elias.

I see two ways to address this issue.

1) Add an interface that a deserialization schema can implement to register 
metrics. Each source would need to check for the interface and call it to setup 
metrics.
2) Check for null returns in the source functions and increment a respective 
counter.

In both cases, we need to touch the source connectors.

I see that passing information such as topic name, partition, and offset is 
important debugging information. However, I don't think that metrics would be 
a good way to capture it.
In that case, log files might be a better approach.

I'm not sure to what extend the source functions (Kafka, Kinesis) support such 
error tracking.
Adding Gordon to the thread who knows the internals of the connectors.

Best, Fabian

2018-04-08 17:53 GMT+02:00 Alexander Smirnov :
I have the same question. In the case of a Kafka source, it would be good to know 
the topic name and offset of the corrupted message for further investigation.
It looks like the only option is to write the messages into a log file.

On Fri, Apr 6, 2018 at 9:12 PM Elias Levy  wrote:
I was wondering how folks are tracking deserialization errors.  The 
AbstractDeserializationSchema interface provides no mechanism for the 
deserializer to instantiate a metric counter, and "deserialize" must return 
null instead of raising an exception in case of error if you want your job to 
continue functioning during a deserialization error.  But that means such 
errors are invisible.

Thoughts?
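
In the meantime, the per-schema logging that Gordon mentions can be done by wrapping
the schema. A rough sketch (the wrapper class, the logger, and skipping by returning
null are illustrative, not an existing Flink utility):

import java.io.IOException;

import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.streaming.util.serialization.KeyedDeserializationSchema;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class LoggingDeserializationSchema<T> implements KeyedDeserializationSchema<T> {

    private static final Logger LOG = LoggerFactory.getLogger(LoggingDeserializationSchema.class);

    private final KeyedDeserializationSchema<T> wrapped;

    public LoggingDeserializationSchema(KeyedDeserializationSchema<T> wrapped) {
        this.wrapped = wrapped;
    }

    @Override
    public T deserialize(byte[] messageKey, byte[] message, String topic, int partition, long offset)
            throws IOException {
        try {
            return wrapped.deserialize(messageKey, message, topic, partition, offset);
        } catch (Exception e) {
            // Log topic / partition / offset so the bad record can be found later,
            // and return null so the consumer skips it and the job keeps running.
            LOG.warn("Failed to deserialize record at {}-{} offset {}", topic, partition, offset, e);
            return null;
        }
    }

    @Override
    public boolean isEndOfStream(T nextElement) {
        return wrapped.isEndOfStream(nextElement);
    }

    @Override
    public TypeInformation<T> getProducedType() {
        return wrapped.getProducedType();
    }
}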



Re: Managing state migrations with Flink and Avro

2018-04-18 Thread Timo Walther
Thank you. Maybe we already identified the issue (see 
https://issues.apache.org/jira/browse/FLINK-9202). I will use your code 
to verify it.


Regards,
Timo


Am 18.04.18 um 14:07 schrieb Petter Arvidsson:

Hi Timo,

Please find the generated class (for the second schema) attached.

Regards,
Petter

On Wed, Apr 18, 2018 at 11:32 AM, Timo Walther > wrote:


Hi Petter,

could you share the source code of the class that Avro generates
out of this schema?

Thank you.

Regards,
Timo

Am 18.04.18 um 11:00 schrieb Petter Arvidsson:

Hello everyone,

I am trying to figure out how to set up Flink with Avro for state
management (especially the content of snapshots) to enable state
migrations (e.g. adding a nullable fields to a class). So far, in
version 1.4.2, I tried to explicitly provide an instance of "new
AvroTypeInfo(Accumulator.getClass())" where accumulator is a very
simple Avro generated SpecificRecordBase of the following schema:

{"namespace": "io.relayr.flink",
 "type": "record",
 "name": "Accumulator",
 "fields": [
 {"name": "accumulator", "type": "int"}
 ]
}

This successfully saves the state to the snapshot. When I then
try to load the snapshot with an updated schema (adding the
nullable field) it fails. Schema looks like this:

{"namespace": "io.relayr.flink",
 "type": "record",
 "name": "Accumulator",
 "fields": [
 {"name": "accumulator", "type": "int"},
 {"name": "newStuff", "type": ["int", "null"]}
 ]
}

When I try to restart the Job from the snapshot, I get the
following exception:
2018-04-17 09:35:23,519 WARN
org.apache.flink.api.common.typeutils.TypeSerializerSerializationUtil
- Deserialization of serializer errored; replacing with null.
java.io.IOException: Unloadable class for type serializer.
...
Caused by: java.io.InvalidClassException:
io.relayr.flink.Accumulator; local class incompatible: stream
classdesc serialVersionUID = -3555733236161157838, local class
serialVersionUID = 5291033088112484292

Which is true, Avro tools do generate a new serialization ID for
the bean, I just didn't expect it to be used and expected the
Avro schema to be used instead? Did anyone get this working? What
am I getting wrong?

Best regards,
Petter








Re: Managing state migrations with Flink and Avro

2018-04-18 Thread Petter Arvidsson
Hi Timo,

Please find the generated class (for the second schema) attached.

Regards,
Petter

On Wed, Apr 18, 2018 at 11:32 AM, Timo Walther  wrote:

> Hi Petter,
>
> could you share the source code of the class that Avro generates out of
> this schema?
>
> Thank you.
>
> Regards,
> Timo
>
> Am 18.04.18 um 11:00 schrieb Petter Arvidsson:
>
> Hello everyone,
>
> I am trying to figure out how to set up Flink with Avro for state
> management (especially the content of snapshots) to enable state migrations
> (e.g. adding a nullable fields to a class). So far, in version 1.4.2, I
> tried to explicitly provide an instance of "new 
> AvroTypeInfo(Accumulator.getClass())"
> where accumulator is a very simple Avro generated SpecificRecordBase of the
> following schema:
>
> {"namespace": "io.relayr.flink",
>  "type": "record",
>  "name": "Accumulator",
>  "fields": [
>  {"name": "accumulator", "type": "int"}
>  ]
> }
>
> This successfully saves the state to the snapshot. When I then try to load
> the snapshot with an updated schema (adding the nullable field) it fails.
> Schema looks like this:
>
> {"namespace": "io.relayr.flink",
>  "type": "record",
>  "name": "Accumulator",
>  "fields": [
>  {"name": "accumulator", "type": "int"},
>  {"name": "newStuff", "type": ["int", "null"]}
>  ]
> }
>
> When I try to restart the Job from the snapshot, I get the following
> exception:
> 2018-04-17 09:35:23,519 WARN  org.apache.flink.api.common.ty
> peutils.TypeSerializerSerializationUtil  - Deserialization of serializer
> errored; replacing with null.
> java.io.IOException: Unloadable class for type serializer.
> ...
> Caused by: java.io.InvalidClassException: io.relayr.flink.Accumulator;
> local class incompatible: stream classdesc serialVersionUID =
> -3555733236161157838, local class serialVersionUID = 5291033088112484292
>
> Which is true, Avro tools do generate a new serialization ID for the bean,
> I just didn't expect it to be used and expected the Avro schema to be used
> instead? Did anyone get this working? What am I getting wrong?
>
> Best regards,
> Petter
>
>
>
/**
 * Autogenerated by Avro
 *
 * DO NOT EDIT DIRECTLY
 */
package io.relayr.flink;

import org.apache.avro.specific.SpecificData;
import org.apache.avro.message.BinaryMessageEncoder;
import org.apache.avro.message.BinaryMessageDecoder;
import org.apache.avro.message.SchemaStore;

@SuppressWarnings("all")
@org.apache.avro.specific.AvroGenerated
public class Accumulator extends org.apache.avro.specific.SpecificRecordBase implements org.apache.avro.specific.SpecificRecord {
  private static final long serialVersionUID = 5291033088112484292L;
  public static final org.apache.avro.Schema SCHEMA$ = new org.apache.avro.Schema.Parser().parse("{\"type\":\"record\",\"name\":\"Accumulator\",\"namespace\":\"io.relayr.flink\",\"fields\":[{\"name\":\"accumulator\",\"type\":\"int\"},{\"name\":\"newStuff\",\"type\":[\"int\",\"null\"]}]}");
  public static org.apache.avro.Schema getClassSchema() { return SCHEMA$; }

  private static SpecificData MODEL$ = new SpecificData();

  private static final BinaryMessageEncoder<Accumulator> ENCODER =
      new BinaryMessageEncoder<Accumulator>(MODEL$, SCHEMA$);

  private static final BinaryMessageDecoder<Accumulator> DECODER =
      new BinaryMessageDecoder<Accumulator>(MODEL$, SCHEMA$);

  /**
   * Return the BinaryMessageDecoder instance used by this class.
   */
  public static BinaryMessageDecoder<Accumulator> getDecoder() {
return DECODER;
  }

  /**
   * Create a new BinaryMessageDecoder instance for this class that uses the specified {@link SchemaStore}.
   * @param resolver a {@link SchemaStore} used to find schemas by fingerprint
   */
  public static BinaryMessageDecoder<Accumulator> createDecoder(SchemaStore resolver) {
return new BinaryMessageDecoder<Accumulator>(MODEL$, SCHEMA$, resolver);
  }

  /** Serializes this Accumulator to a ByteBuffer. */
  public java.nio.ByteBuffer toByteBuffer() throws java.io.IOException {
return ENCODER.encode(this);
  }

  /** Deserializes a Accumulator from a ByteBuffer. */
  public static Accumulator fromByteBuffer(
  java.nio.ByteBuffer b) throws java.io.IOException {
return DECODER.decode(b);
  }

  @Deprecated public int accumulator;
  @Deprecated public java.lang.Integer newStuff;

  /**
   * Default constructor.  Note that this does not initialize fields
   * to their default values from the schema.  If that is desired then
   * one should use newBuilder().
   */
  public Accumulator() {}

  /**
   * All-args constructor.
   * @param accumulator The new value for accumulator
   * @param newStuff The new value for newStuff
   */
  public Accumulator(java.lang.Integer accumulator, java.lang.Integer newStuff) {
this.accumulator = accumulator;
this.newStuff = newStuff;
  }

  public org.apache.avro.Schema getSchema() { return SCHEMA$; }
  // Used by DatumWriter.  Applications should not call.
  public java.lang.Object get(int field$) {
switch (field$) {
case 0: return accumulator;
case 1: return 

Re: CaseClassSerializer and/or TraversableSerializer may still not be threadsafe?

2018-04-18 Thread Stefan Richter
Hi,

I agree that this looks like a serializer is shared between two threads, one of 
them being the event processing loop. I doubt that the problem is with 
the async fs backend, because there is code in place that will duplicate all 
serializers for the async snapshot thread, and this is also checked by a test. 
Furthermore, the code of this backend has not been changed in ways that could affect 
this behaviour for many months, and there have not been similar reports. The 
fact that it works for async snapshots with RocksDB does not have any 
implications for the fs backend, because the RocksDB backend does not use any 
serializers to write snapshots. Mutating the bitset objects should also not be 
a problem; this would not cause such an exception in a serializer.

I find it more likely that there is still some problem with the duplication of 
stateful serializers, especially since your serializer looks like a more 
complex, nested structure. I have quickly checked through 
TraversableSerializer, CaseClassSerializer, and KryoSerializer but could not 
spot any obvious errors in their deep copy code. Except there might be a (known 
and discussed) case that registering stateful default serializers for Kryo can 
cause problems. Are you registering default serializers for Kryo?

As a further step to debug this problem, could you run this job with debug 
logging on the flink 1.5 RC? This would show if the problem is still present 
and flink 1.5 would also generate better logs to debug this problem.

Best,
Stefan

> Am 17.04.2018 um 18:14 schrieb joshlemer :
> 
> Hello all, I am running Flink 1.4.0 on Amazon EMR, and find that asynchronous
> snapshots fail when using the Filesystem back-end. Synchronous snapshots
> succeed, and RocksDB snapshots succeed (both async and sync), but async
> Filesystem snapshots fail with this error:
> 
> java.lang.IndexOutOfBoundsException: Index: 0, Size: 0
>at java.util.ArrayList.rangeCheck(ArrayList.java:657)
>at java.util.ArrayList.set(ArrayList.java:448)
>at
> com.esotericsoftware.kryo.util.MapReferenceResolver.setReadObject(MapReferenceResolver.java:56)
>at com.esotericsoftware.kryo.Kryo.reference(Kryo.java:875)
>at com.esotericsoftware.kryo.Kryo.readObject(Kryo.java:710)
>at
> org.apache.flink.api.java.typeutils.runtime.kryo.KryoSerializer.copy(KryoSerializer.java:189)
>at
> org.apache.flink.api.scala.typeutils.CaseClassSerializer.copy(CaseClassSerializer.scala:101)
>at
> org.apache.flink.api.scala.typeutils.CaseClassSerializer.copy(CaseClassSerializer.scala:32)
>at
> org.apache.flink.api.scala.typeutils.TraversableSerializer$$anonfun$copy$1.apply(TraversableSerializer.scala:69)
>at
> org.apache.flink.api.scala.typeutils.TraversableSerializer$$anonfun$copy$1.apply(TraversableSerializer.scala:69)
>at scala.collection.Iterator$class.foreach(Iterator.scala:891)
>at scala.collection.AbstractIterator.foreach(Iterator.scala:1334)
>at
> scala.collection.IterableLike$class.foreach(IterableLike.scala:72)
>at scala.collection.AbstractIterable.foreach(Iterable.scala:54)
>at
> org.apache.flink.api.scala.typeutils.TraversableSerializer.copy(TraversableSerializer.scala:69)
>at
> org.apache.flink.api.scala.typeutils.TraversableSerializer.copy(TraversableSerializer.scala:33)
>at
> org.apache.flink.runtime.state.heap.CopyOnWriteStateTable.get(CopyOnWriteStateTable.java:282)
>at
> org.apache.flink.runtime.state.heap.CopyOnWriteStateTable.get(CopyOnWriteStateTable.java:306)
>at
> org.apache.flink.runtime.state.heap.HeapValueState.value(HeapValueState.java:55)
>at
> net.districtm.segmentsync.processing.JoinSegmentMappingWithSegmentAssignments.enqueueSegmentAssignment(JoinSegmentMappingWithSegmentAssignments.scala:102)
>at
> net.districtm.segmentsync.processing.JoinSegmentMappingWithSegmentAssignments.processElement2(JoinSegmentMappingWithSegmentAssignments.scala:218)
>at
> net.districtm.segmentsync.processing.JoinSegmentMappingWithSegmentAssignments.processElement2(JoinSegmentMappingWithSegmentAssignments.scala:76)
>at
> org.apache.flink.streaming.api.operators.co.KeyedCoProcessOperator.processElement2(KeyedCoProcessOperator.java:86)
>at
> org.apache.flink.streaming.runtime.io.StreamTwoInputProcessor.processInput(StreamTwoInputProcessor.java:270)
>at
> org.apache.flink.streaming.runtime.tasks.TwoInputStreamTask.run(TwoInputStreamTask.java:91)
>at
> org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:264)
>at org.apache.flink.runtime.taskmanager.Task.run(Task.java:718)
>at java.lang.Thread.run(Thread.java:748)
> 
> This stack trace occurs when I am trying to access the value of a
> 
> `ValueState[scala.collection.mutable.PriorityQueue[(AJavaObjectThatUsesTwitterChillProtoSerialization,
> Long, scala.collection.mutable.BitSet)]` 

Re: rest.port is reset to 0 by YarnEntrypointUtils

2018-04-18 Thread Dongwon Kim
Hi Gary,

Thanks a lot for the reply.
Hope the issue is resolved soon.

I have a suggestion regarding the rest port.
Considering the role of the dispatcher, it needs to have its own port range that is 
not shared by the job managers spawned by the dispatcher.
If I understand FLIP-6 correctly, only a few dispatchers are going to be launched as a 
kind of gateway and they should be visible outside the cluster.
It seems quite reasonable to give them a range of ports that are only consumed 
by the dispatcher.

Best,

- Dongwon



> On Apr 18, 2018, at 6:40 PM, Gary Yao  wrote:
> 
> Hi Dongwon,
> 
> I think the rationale was to avoid conflicts between multiple Flink instances
> running on the same YARN cluster. There is a ticket that proposes to allow
> configuring a port range instead [1]. 
>   
> Best,
> Gary
> 
> [1] https://issues.apache.org/jira/browse/FLINK-5758 
> 
> 
> On Tue, Apr 17, 2018 at 9:56 AM, Dongwon Kim  > wrote:
> Hi,
> 
> I'm trying to launch a dispatcher on top of YARN by executing 
> "yarn-session.sh" on the command line.
> 
> To access the rest endpoint outside the cluster, I need to assign a port from 
> an allowed range.
> 
> YarnEntrypointUtils, however, sets rest.port to 0 for random binding.
> 
> Is there any reason on it?
> 
> 
> Best,
> 
> - Dongwon
> 



Re: rest.port is reset to 0 by YarnEntrypointUtils

2018-04-18 Thread Gary Yao
Hi Dongwon,

I think the rationale was to avoid conflicts between multiple Flink
instances
running on the same YARN cluster. There is a ticket that proposes to allow
configuring a port range instead [1].

Best,
Gary

[1] https://issues.apache.org/jira/browse/FLINK-5758

On Tue, Apr 17, 2018 at 9:56 AM, Dongwon Kim  wrote:

> Hi,
>
> I'm trying to launch a dispatcher on top of YARN by executing
> "yarn-session.sh" on the command line.
>
> To access the rest endpoint outside the cluster, I need to assign a port
> from an allowed range.
>
> YarnEntrypointUtils, however, sets rest.port to 0 for random binding.
>
> Is there any reason on it?
>
>
> Best,
>
> - Dongwon


Managing state migrations with Flink and Avro

2018-04-18 Thread Petter Arvidsson
Hello everyone,

I am trying to figure out how to set up Flink with Avro for state
management (especially the content of snapshots) to enable state migrations
(e.g. adding a nullable field to a class). So far, in version 1.4.2, I
tried to explicitly provide an instance of "new
AvroTypeInfo(Accumulator.getClass())" where accumulator is a very simple
Avro generated SpecificRecordBase of the following schema:

{"namespace": "io.relayr.flink",
 "type": "record",
 "name": "Accumulator",
 "fields": [
 {"name": "accumulator", "type": "int"}
 ]
}

This successfully saves the state to the snapshot. When I then try to load
the snapshot with an updated schema (adding the nullable field) it fails.
Schema looks like this:

{"namespace": "io.relayr.flink",
 "type": "record",
 "name": "Accumulator",
 "fields": [
 {"name": "accumulator", "type": "int"},
 {"name": "newStuff", "type": ["int", "null"]}
 ]
}

When I try to restart the Job from the snapshot, I get the following
exception:
2018-04-17 09:35:23,519 WARN  org.apache.flink.api.common.typeutils.
TypeSerializerSerializationUtil  - Deserialization of serializer errored;
replacing with null.
java.io.IOException: Unloadable class for type serializer.
...
Caused by: java.io.InvalidClassException: io.relayr.flink.Accumulator;
local class incompatible: stream classdesc serialVersionUID =
-3555733236161157838, local class serialVersionUID = 5291033088112484292

Which is true, Avro tools do generate a new serialization ID for the bean,
I just didn't expect it to be used and expected the Avro schema to be used
instead? Did anyone get this working? What am I getting wrong?

Best regards,
Petter
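
One Avro-level detail worth noting, independent of the serialVersionUID problem above: 
for Avro's own schema resolution, a field that only exists in the reader's schema needs 
a default value, and with a union type the default must match the first branch. So the 
evolved schema would usually be written along these lines (a sketch):

{"namespace": "io.relayr.flink",
 "type": "record",
 "name": "Accumulator",
 "fields": [
     {"name": "accumulator", "type": "int"},
     {"name": "newStuff", "type": ["null", "int"], "default": null}
 ]
}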


RE: State-machine-based search logic in Flink ?

2018-04-18 Thread Esa Heikkinen
Hi

I meant something like “finding a series of consecutive events”, as described in 
[2].

Are these features already in Flink, and how well are they documented?

Can I use Scala, or only Java?

I would like some example code, if it exists.

Best, Esa

From: Fabian Hueske 
Sent: Tuesday, April 17, 2018 1:41 PM
To: Esa Heikkinen 
Cc: user@flink.apache.org
Subject: Re: State-machine-based search logic in Flink ?

Hi Esa,
What do you mean by "individual searches in the Table API"?
There is some work (a pending PR [1]) to integrate the MATCH_RECOGNIZE clause 
(SQL 2016) [2] into Flink's SQL which basically adds a SQL syntax for the CEP 
library.
Best, Fabian

[1] https://github.com/apache/flink/pull/4502
[2] https://modern-sql.com/feature/match_recognize

2018-04-17 10:07 GMT+02:00 Esa Heikkinen 
>:
Hi

I am not sure I have understand all, but it is possible to build some kind of 
state-machine-based search logic for
example on top of the individual searches in Table API (using CsvTableSource) ?

Best, Esa




Re: why doesn't the over-window-aggregation sort the element(considering watermark) before processing?

2018-04-18 Thread Fabian Hueske
The over window operates on an unbounded stream of data. Hence it is not
possible to sort the complete stream.
Instead we can sort ranges of the stream. Flink uses watermarks to define
these ranges.

The operator processes the records in timestamp order that are not late,
i.e., have timestamps larger than the last watermark.
In principle there are different ways to handle records that violate this
condition. In the current implementation of the operator we simply drop
these records.

At the current state, the only way to avoid records being dropped is
to use more conservative watermarks. Note that this will increase the
processing latency.
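
"More conservative" here just means assigning watermarks with a larger out-of-orderness
bound than the pipeline currently uses, for example (the one-minute bound and the MyEvent
type are placeholders, not a recommendation from this thread):

import org.apache.flink.streaming.api.functions.timestamps.BoundedOutOfOrdernessTimestampExtractor;
import org.apache.flink.streaming.api.windowing.time.Time;

// Allows events to arrive up to one minute out of order before the watermark
// passes them; anything later than that is still dropped by the over window.
public class ConservativeTimestampExtractor
        extends BoundedOutOfOrdernessTimestampExtractor<MyEvent> {

    public ConservativeTimestampExtractor() {
        super(Time.minutes(1));
    }

    @Override
    public long extractTimestamp(MyEvent element) {
        return element.getEventTimeMillis();  // placeholder accessor
    }
}

// Applied before the Table/SQL query:
// DataStream<MyEvent> withTimestamps =
//     stream.assignTimestampsAndWatermarks(new ConservativeTimestampExtractor());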

Best, Fabian

2018-04-18 8:55 GMT+02:00 Yan Zhou [FDS Science] :

> Hi,
>
>
> I use bounded over-window  aggregation in my application. However,
> sometimes some input elements are "discarded" and not generating output. By
> reading the source code of *RowTimeBoundedRangeOver.scala, *I realize the
> record is actually discarded if it is out of order. Please see the quoted
> code block below. Please help me to understand why don't we sort the record
> first? Said we are using *BoundedOutOfOrdernessTimestampExtractor*. we
> can use watermark to select a portion of the elements to do the sorting.
> when watermark proceeds, process the elements that are before the watermark
> and extend the portion of elements for sorting.
>
>
>
> Best
>
> Yan
>
>
>
> *override def processElement(*
> *inputC: CRow,*
> *ctx: ProcessFunction[CRow, CRow]#Context,*
> *out: Collector[CRow]): Unit = {*
> *// triggering timestamp for trigger calculation*
> *val triggeringTs = input.getField(rowTimeIdx).asInstanceOf[Long]*
>
> *val lastTriggeringTs = lastTriggeringTsState.value*
>
> *// check if the data is expired, if not, save the data and register event
> time timer*
> *if (triggeringTs > lastTriggeringTs) {*
> *// put in cache, and register timer to process/clean*
> *// ...*
> *}else{*
> *// DISCARD*
> *}*
> *} *
>
>


Re: assign time attribute after first window group when using Flink SQL

2018-04-18 Thread Fabian Hueske
This sounds like a windowed join between the raw stream and the aggregated
stream.
It might be possible to do the "lookup" in the second raw stream with
another windowed join. If not, you can fall back to the DataStream API /
ProcessFunction and implement the lookup logic as you need it.

Best, Fabian

2018-04-18 3:03 GMT+02:00 Ivan Wang :

> Thanks Fabian. I tried to use "rowtime" and Flink tells me below exception:
>
> *Exception in thread "main"
> org.apache.flink.table.api.ValidationException: SlidingGroupWindow('w2,
> 'end, 150.rows, 1.rows) is invalid: Event-time grouping windows on row
> intervals in a stream environment are currently not supported.*
>
> Then I tried to OverWindows, luckily it can serve my requirement as well.
> Now my table query is like below
>
> .window(Tumble.over("15.seconds").on("timeMill").as("w1"))
> .groupBy("symbol, w1").select("(w1.rowtime) as end, symbol, price.max as 
> p_max, price.min as p_min")
> .window(Over.partitionBy("symbol").orderBy("end").preceding("149.rows").as("w2"))
> .select("symbol as symbol_, end, p_max.max over w2 as max, p_min.min over w2 
> as min");
>
>
> It works and I can get what I want. However, the result is not ordered by
> the rowtime (here I use "end" as alias). Is this by default and any thing
> to get it ordered?
>
> Below is the entire requirement,
>
> Basically there's one raw stream (r1), and I group it first by time as w1
> then by window count as w2. I'd like to compare the "price" field in every
> raw event with the same field in the most close preceding event in w2.
> If condition meets, I'd like to use the price value and timestamp in that
> event to get one matching event from another raw stream (r2).
>
> CEP sounds to be a good idea. But I need to refer to event in other stream
> (r2) in current pattern condition (r1). Is it possible to do this using CEP?
>
> Thanks
> Ivan
>
>
>
> On Mon, Apr 16, 2018 at 4:01 PM, Fabian Hueske  wrote:
>
>> Sorry, I forgot to CC the user mailing list in my reply.
>>
>> 2018-04-12 17:27 GMT+02:00 Fabian Hueske :
>>
>>> Hi,
>>>
>>> Assuming you are using event time, the right function to generate a row
>>> time attribute from a window would be "w1.rowtime" instead of "w1.start".
>>>
>>> The reason why Flink is picky about this is that we must ensure that the
>>> result rows of the windows are aligned with the watermarks of the stream.
>>>
>>> Best, Fabian
>>>
>>>
>>> Ivan Wang  schrieb am So., 8. Apr. 2018, 22:26:
>>>
 Hi all,



 I'd like to use 2 window group in a chain in my program as below.



 Table myTable = cTable
 .window(Tumble.*over*("15.seconds").on("timeMill").as("w1"))
 .groupBy("symbol, w1").select("w1.start as start, w1.end as
 end, symbol, price.max as p_max, price.min as p_min")
 .window(Slide.*over*("150.rows").every("1.rows").on("start").a
 s("w2"))
 .groupBy("symbol, w2").select("w2.start, w2.end, symbol,
 p_max.max, p_min.min")
 ;





 However, it throws error: SlidingGroupWindow('w2, 'start, 150.rows,
 1.rows) is invalid: Sliding window expects a time attribute for grouping in
 a stream environment.

  at org.apache.flink.table.plan.lo
 gical.LogicalNode.failValidation(LogicalNode.scala:149)

  at org.apache.flink.table.plan.lo
 gical.WindowAggregate.validate(operators.scala:658)

  at org.apache.flink.table.api.Win
 dowGroupedTable.select(table.scala:1159)

  at org.apache.flink.table.api.Win
 dowGroupedTable.select(table.scala:1179)

  at minno.gundam.ReadPattern.main(ReadPattern.java:156)



 Is there any way to assign time attribute after the first groupBy (w1)?



 Thanks

 Ivan




>>
>


why doesn't the over-window-aggregation sort the element(considering watermark) before processing?

2018-04-18 Thread Yan Zhou [FDS Science]
Hi,


I use bounded over-window aggregation in my application. However, sometimes 
some input elements are "discarded" and do not generate output. By reading the 
source code of RowTimeBoundedRangeOver.scala, I realized the record is actually 
discarded if it is out of order. Please see the quoted code block below. Please 
help me understand why we don't sort the records first. Say we are using 
BoundedOutOfOrdernessTimestampExtractor: we could use the watermark to select a 
portion of the elements to sort. When the watermark proceeds, process the 
elements that are before the watermark and extend the portion of elements for 
sorting.



Best

Yan



override def processElement(
inputC: CRow,
ctx: ProcessFunction[CRow, CRow]#Context,
out: Collector[CRow]): Unit = {
// triggering timestamp for trigger calculation
val triggeringTs = input.getField(rowTimeIdx).asInstanceOf[Long]

val lastTriggeringTs = lastTriggeringTsState.value

// check if the data is expired, if not, save the data and register event time 
timer
if (triggeringTs > lastTriggeringTs) {
// put in cache, and register timer to process/clean
// ...
}else{
// DISCARD
}
}



Re: Need Help/Code Examples with reading/writing Parquet File with Flink ?

2018-04-18 Thread Jörn Franke
You can use the corresponding HadoopInputFormat within Flink.

> On 18. Apr 2018, at 07:23, sohimankotia  wrote:
> 
> Hi ..
> 
> I have file in hdfs in format file.snappy.parquet . Can someone please
> point/help with code example of reading parquet files .
> 
> 
> -Sohi
> 
> 
> 
> --
> Sent from: 
> http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/
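
Expanding on Jörn's suggestion, one way this is commonly wired up is the DataSet API plus
parquet-avro's AvroParquetInputFormat through Flink's Hadoop compatibility layer. A rough,
untested sketch (the path is a placeholder, and the flink-hadoop-compatibility and
parquet-avro dependencies are assumed to be on the classpath):

import org.apache.avro.generic.GenericRecord;
import org.apache.flink.api.java.DataSet;
import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.api.java.hadoop.mapreduce.HadoopInputFormat;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.hadoopcompatibility.HadoopInputs;
import org.apache.hadoop.mapreduce.Job;
import org.apache.parquet.avro.AvroParquetInputFormat;

public class ReadParquet {
    public static void main(String[] args) throws Exception {
        ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();

        Job job = Job.getInstance();
        HadoopInputFormat<Void, GenericRecord> parquetInput =
                HadoopInputs.readHadoopFile(
                        new AvroParquetInputFormat<GenericRecord>(),
                        Void.class,
                        GenericRecord.class,
                        "hdfs:///path/to/file.snappy.parquet",  // placeholder
                        job);

        // Each record arrives as (null key, Avro GenericRecord value); in practice
        // you would likely map the GenericRecord to a POJO right away.
        DataSet<Tuple2<Void, GenericRecord>> records = env.createInput(parquetInput);

        records.map(t -> t.f1.toString()).print();
    }
}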


masters file only needed when using start-cluster.sh script?

2018-04-18 Thread David Corley
The HA documentation is a little confusing in that it suggests JM
registration and discovery is done via Zookeeper, but it also recommends
creating a static `masters` file listing all JMs.
The only use I can currently see for the masters file is by the
`start-cluster.sh` script.
Thus, if we're not using that script, do we need the masters file at all?


Re: Need Help/Code Examples with reading/writing Parquet File with Flink ?

2018-04-18 Thread Shuyi Chen
As far as I remember, there is no ParquetInputFormat in Flink. But there is a JIRA
logged and an attempt in this PR, but it was never
merged. We do have an internal implementation that is being used in our
company, and we can contribute it back to the community.

+Peter, could we help check how we could contribute the ParquetInputFormat
implementation back?

Also, FYI, if you want to read Parquet with Flink SQL, here is the JIRA for adding
ParquetTableSource.

On Tue, Apr 17, 2018 at 10:23 PM, sohimankotia 
wrote:

> Hi ..
>
> I have file in hdfs in format file.snappy.parquet . Can someone please
> point/help with code example of reading parquet files .
>
>
> -Sohi
>
>
>
> --
> Sent from: http://apache-flink-user-mailing-list-archive.2336050.n4.
> nabble.com/
>



-- 
"So you have to trust that the dots will somehow connect in your future."