Re: Accessing StateBackend snapshots outside of Flink

2016-11-02 Thread bwong247
We're currently investigating Flink, and one of the features we'd like to
have is a TTL mechanism to time out older values in state. I saw this
thread, and it sounds like the functionality was being considered. Is there
any update?

 





Flink read avro from Kafka Connect Issue

2016-11-02 Thread Will Du
Hi folks,
I am trying to consume Avro data from Kafka in Flink. The data is produced
by Kafka Connect using AvroConverter. I have created an
AvroDeserializationSchema.java used by the Flink consumer. Then, I use the
following code to read it.

public static void main(String[] args) throws Exception {
  StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

  Properties properties = new Properties();
  properties.setProperty("bootstrap.servers", "localhost:9092");
  properties.setProperty("zookeeper.connect", "localhost:2181");

  Schema schema = new Schema.Parser().parse("{"
      + "\"name\": \"test\", "
      + "\"type\": \"record\", "
      + "\"fields\": ["
      + "  { \"name\": \"name\", \"type\": \"string\" },"
      + "  { \"name\": \"symbol\", \"type\": \"string\" },"
      + "  { \"name\": \"exchange\", \"type\": \"string\" }"
      + "]}");

  AvroDeserializationSchema<GenericRecord> avroSchema = new AvroDeserializationSchema<>(schema);
  FlinkKafkaConsumer09<GenericRecord> kafkaConsumer =
      new FlinkKafkaConsumer09<>("myavrotopic", avroSchema, properties);
  DataStream<GenericRecord> messageStream = env.addSource(kafkaConsumer);

  messageStream.rebalance().print();
  env.execute("Flink AVRO KAFKA Test");
}

Once I run the code, I am only able to get the schema information, as
follows:

{"name":"", "symbol":"", "exchange":""}
{"name":"", "symbol":"", "exchange":""}
{"name":"", "symbol":"", "exchange":""}
{"name":"", "symbol":"", "exchange":""}

Could anyone help me find out why I cannot decode it?

From further troubleshooting, I found that if I use a Kafka producer to send
the Avro data, especially using kafka.serializer.DefaultEncoder, the above
code gets the correct result. Does anybody know how to either set
DefaultEncoder in Kafka Connect, or set it when writing a customized Kafka
Connect converter? Or, alternatively, how should I modify the
AvroDeserializationSchema.java instead?
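For what it's worth, Kafka Connect's AvroConverter (from the Confluent
schema-registry stack) does not write plain Avro: it prepends a 5-byte header
(one magic byte plus a 4-byte schema id) to every record, which would explain
getting empty fields back from a plain Avro decoder. Below is a minimal sketch
of a deserialization schema that skips that header. The class name is made up,
and the wire-format assumption only holds if Connect is configured with the
Confluent AvroConverter.

import java.io.IOException;

import org.apache.avro.Schema;
import org.apache.avro.generic.GenericDatumReader;
import org.apache.avro.generic.GenericRecord;
import org.apache.avro.io.BinaryDecoder;
import org.apache.avro.io.DecoderFactory;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.java.typeutils.GenericTypeInfo;
import org.apache.flink.streaming.util.serialization.DeserializationSchema;

public class ConfluentAvroDeserializationSchema implements DeserializationSchema<GenericRecord> {

  private final String schemaString; // Schema itself is not serializable
  private transient Schema schema;

  public ConfluentAvroDeserializationSchema(Schema schema) {
    this.schemaString = schema.toString();
  }

  @Override
  public GenericRecord deserialize(byte[] message) throws IOException {
    if (schema == null) {
      schema = new Schema.Parser().parse(schemaString);
    }
    // Skip the assumed Confluent wire-format header: 1 magic byte + 4-byte schema id.
    BinaryDecoder decoder =
        DecoderFactory.get().binaryDecoder(message, 5, message.length - 5, null);
    return new GenericDatumReader<GenericRecord>(schema).read(null, decoder);
  }

  @Override
  public boolean isEndOfStream(GenericRecord nextElement) {
    return false;
  }

  @Override
  public TypeInformation<GenericRecord> getProducedType() {
    return new GenericTypeInfo<>(GenericRecord.class);
  }
}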

Thanks, I’ll post this to the Kafka user group as well.
Will






Reg. custom sink for Flink streaming

2016-11-02 Thread Sandeep Vakacharla
Hi there,

I have the following use case-

I have data coming from Kafka which I need to stream and write, message by
message, to a database. I'm using the Kafka-Flink connector for streaming
data from Kafka. I don't want to use Flink sinks to write data from the
stream.

I'm doing the following, which doesn't seem to work:

messageStream
    .rebalance()
    .map(new MapFunction<String, String>() {
        @Override
        public String map(String value) {
            getDbSession().execute("insert into TABLE_XXX (key, event_timeuuid, data) " +
                "VALUES (" + i + ", null, value);");
            return value;
        }
    });

How can I iterate over each message in the stream and do something with that 
message?
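A pattern that typically works better than issuing writes from a map() is a
RichSinkFunction that opens one database session per parallel instance in
open() and closes it in close(). Below is a minimal sketch; Session,
connect(), and the statement are placeholders modelled on the
getDbSession()/execute() calls above, not a real driver API.

import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.functions.sink.RichSinkFunction;

public class DbSink extends RichSinkFunction<String> {

    /** Placeholder for a driver session (e.g. a Cassandra Session). */
    interface Session extends AutoCloseable {
        void execute(String statement, Object... params);
    }

    private transient Session session; // not serializable, so created in open()

    @Override
    public void open(Configuration parameters) throws Exception {
        session = connect(); // one connection per parallel sink instance
    }

    @Override
    public void invoke(String value) throws Exception {
        // Bind the message instead of concatenating it into the statement.
        session.execute(
            "insert into TABLE_XXX (key, event_timeuuid, data) values (?, now(), ?)",
            value.hashCode(), value);
    }

    @Override
    public void close() throws Exception {
        if (session != null) {
            session.close();
        }
    }

    private Session connect() {
        throw new UnsupportedOperationException("wire up the real driver here");
    }
}

It would then be attached with messageStream.rebalance().addSink(new DbSink());.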

Thanks



Re: Looping over a DataSet and accesing another DataSet

2016-11-02 Thread otherwise777
I did mean the iteration, yes. I currently solved the problem by rewriting
the algorithm in Gelly's Gather-Sum-Apply model, thanks for the tips.

I had another question regarding the original message, about appending items
to a list: how would I do that? Because AFAIK it's not possible to put a
list or array in a Tuple element, right?
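For reference on the list question: a Tuple field can in fact hold a List or
an array; Flink just falls back to its generic (Kryo) serializer for such
fields instead of a dedicated one. A small illustrative sketch (names chosen
for this example):

import java.util.ArrayList;
import java.util.List;

import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.java.DataSet;
import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.api.java.tuple.Tuple2;

public class TupleWithList {
    public static void main(String[] args) throws Exception {
        ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();

        DataSet<Tuple2<Long, List<Long>>> trajectories = env
            .fromElements(1L, 2L, 3L)
            .map(new MapFunction<Long, Tuple2<Long, List<Long>>>() {
                @Override
                public Tuple2<Long, List<Long>> map(Long id) {
                    List<Long> visited = new ArrayList<>();
                    visited.add(id);      // appending to the list is plain Java
                    visited.add(id * 10);
                    return new Tuple2<>(id, visited);
                }
            });

        trajectories.print();
    }
}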







Re: watermark trigger doesn't check whether element's timestamp is passed

2016-11-02 Thread Manu Zhang
Thanks, that will be great. I'd like to test against my particular use
cases once your PR is available.

Manu

On Wed, Nov 2, 2016 at 11:09 PM Ventura Del Monte 
wrote:

> Hello,
>
> I have just opened the JIRA issue and I have almost
> completed the implementation of this feature. I will keep you posted :)
>
> Cheers,
> Ventura
>
>
>
>
> On Wed, Nov 2, 2016 at 2:18 PM, Aljoscha Krettek 
> wrote:
>
> Hi,
> a contributor (Bonaventure Del Monte) has started working on this. He
> should open a Jira this week.
>
> Cheers,
> Aljoscha
>
> On Tue, 1 Nov 2016 at 23:57 aj heller  wrote:
>
> Hi Manu, Aljoscha,
>
> I had been interested in implementing FLIP-2, but I haven't been able to
> make time for it. There is no implementation yet that I'm aware of, and
> I'll gladly step aside (or help out how I can) if you or anyone is
> interested to take charge of it.
>
> That said, I'm also not sure if discussions are ongoing. I had hoped to
> prototype the proposal as is, to have something more concrete to discuss.
>
> Cheers,
> aj
> On Nov 1, 2016 3:24 PM, "Manu Zhang"  wrote:
>
> Thanks.  The ideal case is to fire after the watermark passes each element
> from the window, but that requires a custom trigger and FLIP-2 as well. The
> enhanced window evictor will help to avoid the last firing.
>
> Are the discussions on FLIP-2 still going on?
> Are there any open JIRAs or PRs? (The proposed `ProcessWindowFunction`
> will be sufficient for my case.)
> Is there a workaround now for my case?
>
> Thanks again for following through this.
> Manu
>
> On Wed, Nov 2, 2016 at 1:07 AM Aljoscha Krettek 
> wrote:
>
> Ah, I finally understand it. You would need a way to query the current
> watermark in the window function to only emit those elements where the
> timestamp is lower than the watermark.
>
> When the window fires again, do you want to emit elements that you emitted
> during the last firing again? If not, I think you also need to use an
> evictor to evict the elements from the window where the timestamp is lower
> than the watermark. With this FLIP
> https://cwiki.apache.org/confluence/display/FLINK/FLIP-2+Extending+Window+Function+Metadata
>  we
> should be able to extend the WindowFunction Context to also provide the
> current watermark. With this recent PR
> https://github.com/apache/flink/pull/2736 you would be able to evict
> elements from the window state after the window function was called.
>
> Cheers,
> Aljoscha
>
> On Tue, 1 Nov 2016 at 02:27 Manu Zhang  wrote:
>
> Yes, here's the example
> https://github.com/manuzhang/flink/blob/pv/flink-examples/flink-examples-streaming/src/main/scala/org/apache/flink/streaming/scala/examples/session/PageViewSessionWindowing.scala
>
> If you print and compare the timestamp of timer with that of "PageView" in
> the outputs, you could see what I mean.
>
> I think the recently introduced TimelyFlatMapFunction is close to what I
> want to achieve. It will be great if we can query time information in the
> window function so I filed
> https://issues.apache.org/jira/browse/FLINK-4953
>
> Thanks for your time.
>
> Manu
>
> On Mon, Oct 31, 2016 at 10:29 PM Aljoscha Krettek 
> wrote:
>
> Hmm, I don't completely understand what's going on. Could you maybe post
> an example, with the trigger code that shows this behaviour?
>
> Cheers,
> Aljoscha
>
> On Thu, 27 Oct 2016 at 17:12 Manu Zhang  wrote:
>
> Hi,
>
> It's what I'm seeing. If timers are not fired at the end of window, a
> state (in the window) whose timestamp is *after* the timer will also be
> emitted. That's a problem for event-time trigger.
>
> Thanks,
> Manu
>
>
> On Thu, Oct 27, 2016 at 10:29 PM Aljoscha Krettek 
> wrote:
>
> Hi,
> is that example input/output what you would like to achieve or what you
> are currently seeing with Flink? I think for your use case a custom Trigger
> would be required that works like the event-time trigger but additionally
> registers timers for each element where you want to emit.
>
> Cheers,
> Aljoscha
>
> On Wed, 26 Oct 2016 at 04:04 Manu Zhang  wrote:
>
> Hi Aljoscha,
>
> Thanks for your response.  My use case is to track user trajectory based
> on page view event when they visit a website.  The input would be like a
> list of PageView(userId, url, eventTimestamp) with watermarks (= eventTimestamp - duration).

Re: Kinesis Connector Dependency Problems

2016-11-02 Thread Justin Yan
Sorry it took me a little while, but I'm happy to report back that it seems
to be working properly with EMR 4.8.  It seems so obvious in retrospect...
thanks again for the assistance!

Cheers,

Justin

On Tue, Nov 1, 2016 at 11:44 AM, Robert Metzger  wrote:

> Hi Justin,
>
> thank you for sharing the classpath of the Flink container with us. It
> contains what Till was already expecting: An older version of the AWS SDK.
>
> If you have some spare time, could you quickly try to run your program
> with a newer EMR version, just to validate our suspicion?
> If the error doesn't occur on a more recent EMR version, then we know why
> it's happening.
>
> We'll then probably need to shade (relocate) the Kinesis code to make it
> work with older EMR libraries.
>
> Regards,
> Robert
>
>
> On Tue, Nov 1, 2016 at 6:27 PM, Justin Yan  wrote:
>
>> Hi there,
>>
>> We're using EMR 4.4.0 -> I suppose this is a bit old, and I can migrate
>> forward if you think that would be best.
>>
>> I've appended the classpath that the Flink cluster was started with at
>> the end of this email (with a slight improvement to the formatting to make
>> it readable).
>>
>> Willing to poke around or fiddle with this as necessary - thanks very
>> much for the help!
>>
>> Justin
>>
>> Task Manager's classpath from logs:
>>
>> lib/flink-dist_2.11-1.1.3.jar
>> lib/flink-python_2.11-1.1.3.jar
>> lib/log4j-1.2.17.jar
>> lib/slf4j-log4j12-1.7.7.jar
>> logback.xml
>> log4j.properties
>> flink.jar
>> flink-conf.yaml
>> /etc/hadoop/conf
>> /usr/lib/hadoop/hadoop-annotations-2.7.1-amzn-1.jar
>> /usr/lib/hadoop/hadoop-extras.jar
>> /usr/lib/hadoop/hadoop-archives-2.7.1-amzn-1.jar
>> /usr/lib/hadoop/hadoop-aws-2.7.1-amzn-1.jar
>> /usr/lib/hadoop/hadoop-sls-2.7.1-amzn-1.jar
>> /usr/lib/hadoop/hadoop-auth-2.7.1-amzn-1.jar
>> /usr/lib/hadoop/hadoop-sls.jar
>> /usr/lib/hadoop/hadoop-gridmix.jar
>> /usr/lib/hadoop/hadoop-auth.jar
>> /usr/lib/hadoop/hadoop-gridmix-2.7.1-amzn-1.jar
>> /usr/lib/hadoop/hadoop-rumen.jar
>> /usr/lib/hadoop/hadoop-azure-2.7.1-amzn-1.jar
>> /usr/lib/hadoop/hadoop-common-2.7.1-amzn-1.jar
>> /usr/lib/hadoop/hadoop-azure.jar
>> /usr/lib/hadoop/hadoop-datajoin-2.7.1-amzn-1.jar
>> /usr/lib/hadoop/hadoop-nfs.jar
>> /usr/lib/hadoop/hadoop-aws.jar
>> /usr/lib/hadoop/hadoop-streaming-2.7.1-amzn-1.jar
>> /usr/lib/hadoop/hadoop-archives.jar
>> /usr/lib/hadoop/hadoop-openstack.jar
>> /usr/lib/hadoop/hadoop-distcp.jar
>> /usr/lib/hadoop/hadoop-annotations.jar
>> /usr/lib/hadoop/hadoop-distcp-2.7.1-amzn-1.jar
>> /usr/lib/hadoop/hadoop-streaming.jar
>> /usr/lib/hadoop/hadoop-rumen-2.7.1-amzn-1.jar
>> /usr/lib/hadoop/hadoop-common.jar
>> /usr/lib/hadoop/hadoop-nfs-2.7.1-amzn-1.jar
>> /usr/lib/hadoop/hadoop-common-2.7.1-amzn-1-tests.jar
>> /usr/lib/hadoop/hadoop-ant-2.7.1-amzn-1.jar
>> /usr/lib/hadoop/hadoop-datajoin.jar
>> /usr/lib/hadoop/hadoop-ant.jar
>> /usr/lib/hadoop/hadoop-extras-2.7.1-amzn-1.jar
>> /usr/lib/hadoop/hadoop-openstack-2.7.1-amzn-1.jar
>> /usr/lib/hadoop/lib/jackson-xc-1.9.13.jar
>> /usr/lib/hadoop/lib/api-asn1-api-1.0.0-M20.jar
>> /usr/lib/hadoop/lib/curator-client-2.7.1.jar
>> /usr/lib/hadoop/lib/jackson-mapper-asl-1.9.13.jar
>> /usr/lib/hadoop/lib/commons-io-2.4.jar
>> /usr/lib/hadoop/lib/jackson-jaxrs-1.9.13.jar
>> /usr/lib/hadoop/lib/log4j-1.2.17.jar
>> /usr/lib/hadoop/lib/junit-4.11.jar
>> /usr/lib/hadoop/lib/apacheds-i18n-2.0.0-M15.jar
>> /usr/lib/hadoop/lib/commons-cli-1.2.jar
>> /usr/lib/hadoop/lib/curator-recipes-2.7.1.jar
>> /usr/lib/hadoop/lib/xmlenc-0.52.jar
>> /usr/lib/hadoop/lib/zookeeper-3.4.6.jar
>> /usr/lib/hadoop/lib/jsr305-3.0.0.jar
>> /usr/lib/hadoop/lib/htrace-core-3.1.0-incubating.jar
>> /usr/lib/hadoop/lib/httpclient-4.3.4.jar
>> /usr/lib/hadoop/lib/jettison-1.1.jar
>> /usr/lib/hadoop/lib/commons-beanutils-1.7.0.jar
>> /usr/lib/hadoop/lib/commons-math3-3.1.1.jar
>> /usr/lib/hadoop/lib/jersey-core-1.9.jar
>> /usr/lib/hadoop/lib/httpcore-4.3.2.jar
>> /usr/lib/hadoop/lib/commons-compress-1.4.1.jar
>> /usr/lib/hadoop/lib/asm-3.2.jar
>> /usr/lib/hadoop/lib/slf4j-api-1.7.10.jar
>> /usr/lib/hadoop/lib/xz-1.0.jar
>> /usr/lib/hadoop/lib/commons-collections-3.2.1.jar
>> /usr/lib/hadoop/lib/commons-net-3.1.jar
>> /usr/lib/hadoop/lib/commons-configuration-1.6.jar
>> /usr/lib/hadoop/lib/jetty-util-6.1.26-emr.jar
>> /usr/lib/hadoop/lib/commons-codec-1.4.jar
>> /usr/lib/hadoop/lib/protobuf-java-2.5.0.jar
>> /usr/lib/hadoop/lib/jetty-6.1.26-emr.jar
>> /usr/lib/hadoop/lib/java-xmlbuilder-0.4.jar
>> /usr/lib/hadoop/lib/apacheds-kerberos-codec-2.0.0-M15.jar
>> /usr/lib/hadoop/lib/commons-logging-1.1.3.jar
>> /usr/lib/hadoop/lib/jersey-json-1.9.jar
>> /usr/lib/hadoop/lib/jackson-core-asl-1.9.13.jar
>> /usr/lib/hadoop/lib/gson-2.2.4.jar
>> /usr/lib/hadoop/lib/stax-api-1.0-2.jar
>> /usr/lib/hadoop/lib/commons-digester-1.8.jar
>> /usr/lib/hadoop/lib/servlet-api-2.5.jar
>> /usr/lib/hadoop/lib/curator-framework-2.7.1.jar
>> /usr/lib/hadoop/lib/commons-httpclient-3.1.jar
>> 

Why are externalized checkpoints deleted on Job Manager exit?

2016-11-02 Thread Clifford Resnick
Testing externalized checkpoints in a YARN-based cluster, configured with:

env.getCheckpointConfig.enableExternalizedCheckpoints(ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION);

I can confirm that the checkpoint is retained between cancelled jobs; however,
it's deleted when the Job Manager session is gracefully shut down. We'd really
like the persistent checkpoint to be treated like a savepoint and not be
deleted. Is there a way to enable this?



Flink Application on YARN failed on losing Job Manager | No recovery | Need help debug the cause from logs

2016-11-02 Thread Anchit Jatana
Hi All,

I started my Flink application on YARN using flink run -m yarn-cluster;
after running smoothly for 20 hrs it failed. Ideally the application should
have recovered on losing the Job Manager (which runs in the same container as
the application master), pertaining to the fault-tolerant nature of Flink on
YARN, but it didn't recover and failed.

Please help me debug the logs.

Thank you

Regards,
Anchit

Below are the logs:

2016-11-01 14:12:37,592 INFO  org.apache.flink.runtime.client.JobClientActor
   
- 11/01/2016 14:12:36   Parse & Map Record - (Visitor ID, Product List)  ->
Filtering None Objects -> Fetching Output(148/200) switched to RUNNING 
2016-11-02 10:16:42,960 INFO 
org.apache.hadoop.yarn.client.ConfiguredRMFailoverProxyProvider  - Failing
over to rm1
2016-11-02 10:17:24,026 INFO 
org.apache.hadoop.yarn.client.ConfiguredRMFailoverProxyProvider  - Failing
over to rm2
2016-11-02 10:17:40,882 INFO 
org.apache.hadoop.yarn.client.ConfiguredRMFailoverProxyProvider  - Failing
over to rm1
2016-11-02 10:24:41,964 WARN  akka.remote.ReliableDeliverySupervisor
   
- Association with remote system [akka.tcp://flink@10.66.245.26:47722] has
failed, address is now gated for [5000] ms. Reason is: [Disassociated].
2016-11-02 10:24:56,311 WARN  Remoting  
   
- Tried to associate with unreachable remote address
[akka.tcp://flink@10.66.245.26:47722]. Address is now gated for 5000 ms, all
messages to this address will be delivered to dead letters. Reason:
Connection refused: /10.66.245.26:47722
2016-11-02 10:24:56,315 INFO  org.apache.flink.runtime.client.JobClientActor
   
- Lost connection to JobManager
akka.tcp://flink@10.66.245.26:47722/user/jobmanager. Triggering connection
timeout.
2016-11-02 10:24:56,315 INFO  org.apache.flink.runtime.client.JobClientActor
   
- Disconnect from JobManager
Actor[akka.tcp://flink@10.66.245.26:47722/user/jobmanager#1251121709].
2016-11-02 10:25:56,330 INFO  org.apache.flink.runtime.client.JobClientActor
   
- Terminate JobClientActor.
2016-11-02 10:25:56,331 INFO  org.apache.flink.runtime.client.JobClientActor
   
- Disconnect from JobManager null.
2016-11-02 10:25:56,333 ERROR org.apache.flink.client.CliFrontend   
   
- Error while running the command.
org.apache.flink.client.program.ProgramInvocationException: The program
execution failed: Communication with JobManager failed: Lost connection to
the JobManager.
at
org.apache.flink.client.program.ClusterClient.run(ClusterClient.java:405)
at
org.apache.flink.yarn.YarnClusterClient.submitJob(YarnClusterClient.java:204)
at
org.apache.flink.client.program.ClusterClient.run(ClusterClient.java:378)
at
org.apache.flink.streaming.api.environment.StreamContextEnvironment.execute(StreamContextEnvironment.java:68)
at
org.apache.flink.streaming.api.scala.StreamExecutionEnvironment.execute(StreamExecutionEnvironment.scala:585)
at
com.tgt.prz.streaming.recs.drivers.SessionRecs2$.main(SessionRecs2.scala:126)
at 
com.tgt.prz.streaming.recs.drivers.SessionRecs2.main(SessionRecs2.scala)
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at
sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
at
sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:497)
at
org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:509)
at
org.apache.flink.client.program.PackagedProgram.invokeInteractiveModeForExecution(PackagedProgram.java:403)
at
org.apache.flink.client.program.ClusterClient.run(ClusterClient.java:320)
at 
org.apache.flink.client.CliFrontend.executeProgram(CliFrontend.java:777)
at org.apache.flink.client.CliFrontend.run(CliFrontend.java:253)
at org.apache.flink.client.CliFrontend$2.run(CliFrontend.java:997)
at org.apache.flink.client.CliFrontend$2.run(CliFrontend.java:994)
at
org.apache.flink.runtime.security.SecurityUtils$1.run(SecurityUtils.java:56)
at java.security.AccessController.doPrivileged(Native Method)
at javax.security.auth.Subject.doAs(Subject.java:422)
at
org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1556)
at
org.apache.flink.runtime.security.SecurityUtils.runSecured(SecurityUtils.java:53)
at
org.apache.flink.client.CliFrontend.parseParameters(CliFrontend.java:994)
at org.apache.flink.client.CliFrontend.main(CliFrontend.java:1048)
Caused by: org.apache.flink.runtime.client.JobExecutionException:
Communication with JobManager failed: Lost connection to the JobManager.
at
org.apache.flink.runtime.client.JobClient.submitJobAndWait(JobClient.java:137)
at

Re: Flink Metrics - InfluxDB + Grafana | Help with query influxDB query for Grafana to plot 'numRecordsIn' & 'numRecordsOut' for each operator/operation

2016-11-02 Thread Anchit Jatana
Hi Jamie,

Thanks for sharing your thoughts. I'll try and integrate with Graphite to
see if this gets resolved.

Regards,
Anchit





Testing DataStreams

2016-11-02 Thread Juan Rodríguez Hortalá
Hi,

I'm new to Flink, and I'm trying to write my first unit test for a simple
DataStreams job. In
https://ci.apache.org/projects/flink/flink-docs-master/api/java/org/apache/flink/streaming/util/package-summary.html
I see several promising classes, but for example I cannot import
org.apache.flink.streaming.util.StreamingMultipleProgramsTestBase from the
artifacts obtained by the following Maven dependencies:

<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-java</artifactId>
    <version>${flink.version}</version>
</dependency>
<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-streaming-java_2.10</artifactId>
    <version>${flink.version}</version>
</dependency>
<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-clients_2.10</artifactId>
    <version>${flink.version}</version>
</dependency>
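For what it's worth, StreamingMultipleProgramsTestBase lives in the test
sources of flink-streaming-java, so it ships in that module's test-jar rather
than in the main artifact, with supporting base classes in flink-test-utils.
The coordinates below are my assumption for Flink 1.1 and worth verifying:

<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-streaming-java_2.10</artifactId>
    <version>${flink.version}</version>
    <type>test-jar</type>
    <scope>test</scope>
</dependency>
<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-test-utils_2.10</artifactId>
    <version>${flink.version}</version>
    <scope>test</scope>
</dependency>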

I also see that the page
https://cwiki.apache.org/confluence/display/FLINK/Testing+Utilities+and+Mini+Clusters
is empty. Is there any documentation or tutorial about writing simple unit
tests running in local mode? I'm looking for something similar to
http://blog.cloudera.com/blog/2015/09/making-apache-spark-testing-easy-with-spark-testing-base/,
where you can specify the expected output as a collection to define an
assertion, but for Flink.

By the way, I have also implemented a source function similar to
StreamExecutionEnvironment.fromElements, but one that allows adding time gaps
between the generated elements, which I think could be useful for testing,
in case someone is interested:
https://github.com/juanrh/flink-state-eviction/blob/master/src/main/java/com/github/juanrh/streaming/source/ElementsWithGapsSource.java.


Thanks,

Juan


Re: TimelyFlatMapFunction and DataStream

2016-11-02 Thread Ken Krugler
Hi Stephan,

> On Nov 2, 2016, at 3:04am, Stephan Ewen  wrote:
> 
> Hi Ken!
> 
> It may not be obvious, so here is a bit of background:
> 
> The timers that are used in the FlatMapFunction are scoped by key. We thought 
> that this is how they are mainly useful - that's why you need to define keys 
> to use them.
> I think the docs are in error, thanks for pointing that out.
> 
> In your use case, do you need timers without keys, or only access to the 
> current processing/event time?

I was hoping to use timers as an alternative approach for async generation of 
tuples. I’ve got several functions that use multi-threading to process the 
incoming tuples. These get put into a queue, processed by threads, and the 
results placed in another queue. With timers it seemed like I could regularly 
flush the output queue to the collector.

So unkeyed data, yes.

Though it wasn’t clear from the docs if/how I would set up a timer that 
regularly fires (say every 100ms).
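For reference, TimelyFlatMapFunction later evolved into ProcessFunction, and
against that API a self-rearming processing-time timer covers the
fire-every-100ms case. A minimal sketch; since timers are keyed, unkeyed data
would first be keyed by a constant dummy key, e.g. stream.keyBy(x -> 0):

import java.util.Queue;
import java.util.concurrent.ConcurrentLinkedQueue;

import org.apache.flink.streaming.api.functions.ProcessFunction;
import org.apache.flink.util.Collector;

public class PeriodicFlush extends ProcessFunction<String, String> {

    private final Queue<String> pending = new ConcurrentLinkedQueue<>();

    @Override
    public void processElement(String value, Context ctx, Collector<String> out)
            throws Exception {
        pending.add(value); // stand-in for handing the tuple to worker threads
        // Arm a flush 100 ms from now; timers at an identical timestamp
        // are deduplicated per key.
        ctx.timerService().registerProcessingTimeTimer(
            ctx.timerService().currentProcessingTime() + 100);
    }

    @Override
    public void onTimer(long timestamp, OnTimerContext ctx, Collector<String> out)
            throws Exception {
        String result;
        while ((result = pending.poll()) != null) {
            out.collect(result); // flush whatever the workers finished
        }
        // Re-arm so the timer keeps firing every ~100 ms.
        ctx.timerService().registerProcessingTimeTimer(
            ctx.timerService().currentProcessingTime() + 100);
    }
}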

In any case I can keep using my current approach of having a “tickler” Tuple0 
stream that I use with CoFlatMapFunctions.

Regards,

— Ken
 
> On Wed, Nov 2, 2016 at 1:59 AM, Ken Krugler wrote:
> I’m curious why it seems like a TimelyFlatMapFunction can’t be used with a 
> regular DataStream, but it can be used with a KeyedStream.
> 
> Or maybe I’m missing something obvious (this is with 1.2-SNAPSHOT, pulled 
> today).
> 
> Also the documentation of TimelyFlatMapFunction
> (https://ci.apache.org/projects/flink/flink-docs-master/api/java/index.html?org/apache/flink/streaming/api/functions/TimelyFlatMapFunction.html)
> shows using it with a DataStream.flatMap(xxx) call.
> 
> Thanks,
> 
> — Ken
> 
> --
> Ken Krugler
> +1 530-210-6378 
> http://www.scaleunlimited.com 
> custom big data solutions & training
> Hadoop, Cascading, Cassandra & Solr
> 
> 
> 
> 

--
Ken Krugler
+1 530-210-6378
http://www.scaleunlimited.com
custom big data solutions & training
Hadoop, Cascading, Cassandra & Solr





Re: watermark trigger doesn't check whether element's timestamp is passed

2016-11-02 Thread Ventura Del Monte
Hello,

I have just opened the JIRA issue and I have almost
completed the implementation of this feature. I will keep you posted :)

Cheers,
Ventura




On Wed, Nov 2, 2016 at 2:18 PM, Aljoscha Krettek 
wrote:

> Hi,
> a contributor (Bonaventure Del Monte) has started working on this. He
> should open a Jira this week.
>
> Cheers,
> Aljoscha
>
> On Tue, 1 Nov 2016 at 23:57 aj heller  wrote:
>
> Hi Manu, Aljoscha,
>
> I had been interested in implementing FLIP-2, but I haven't been able to
> make time for it. There is no implementation yet that I'm aware of, and
> I'll gladly step aside (or help out how I can) if you or anyone is
> interested to take charge of it.
>
> That said, I'm also not sure if discussions are ongoing. I had hoped to
> prototype the proposal as is, to have something more concrete to discuss.
>
> Cheers,
> aj
> On Nov 1, 2016 3:24 PM, "Manu Zhang"  wrote:
>
> Thanks.  The ideal case is to fire after the watermark passes each element
> from the window, but that requires a custom trigger and FLIP-2 as well. The
> enhanced window evictor will help to avoid the last firing.
>
> Are the discussions on FLIP-2 still going on?
> Are there any open JIRAs or PRs? (The proposed `ProcessWindowFunction`
> will be sufficient for my case.)
> Is there a workaround now for my case?
>
> Thanks again for following through this.
> Manu
>
> On Wed, Nov 2, 2016 at 1:07 AM Aljoscha Krettek 
> wrote:
>
> Ah, I finally understand it. You would need a way to query the current
> watermark in the window function to only emit those elements where the
> timestamp is lower than the watermark.
>
> When the window fires again, do you want to emit elements that you emitted
> during the last firing again? If not, I think you also need to use an
> evictor to evict the elements from the window where the timestamp is lower
> than the watermark. With this FLIP
> https://cwiki.apache.org/confluence/display/FLINK/FLIP-2+Extending+Window+Function+Metadata
> we should be able to extend the WindowFunction Context to also provide the
> current watermark. With this recent PR
> https://github.com/apache/flink/pull/2736 you would be able to evict
> elements from the window state after the window function was called.
>
> Cheers,
> Aljoscha
>
> On Tue, 1 Nov 2016 at 02:27 Manu Zhang  wrote:
>
> Yes, here's the example
> https://github.com/manuzhang/flink/blob/pv/flink-examples/flink-examples-streaming/src/main/scala/org/apache/flink/streaming/scala/examples/session/PageViewSessionWindowing.scala
>
> If you print and compare the timestamp of timer with that of "PageView" in
> the outputs, you could see what I mean.
>
> I think the recently introduced TimelyFlatMapFunction is close to what I
> want to achieve. It will be great if we can query time information in the
> window function so I filed
> https://issues.apache.org/jira/browse/FLINK-4953
>
> Thanks for your time.
>
> Manu
>
> On Mon, Oct 31, 2016 at 10:29 PM Aljoscha Krettek 
> wrote:
>
> Hmm, I don't completely understand what's going on. Could you maybe post
> an example, with the trigger code that shows this behaviour?
>
> Cheers,
> Aljoscha
>
> On Thu, 27 Oct 2016 at 17:12 Manu Zhang  wrote:
>
> Hi,
>
> It's what I'm seeing. If timers are not fired at the end of window, a
> state (in the window) whose timestamp is *after* the timer will also be
> emitted. That's a problem for event-time trigger.
>
> Thanks,
> Manu
>
>
> On Thu, Oct 27, 2016 at 10:29 PM Aljoscha Krettek 
> wrote:
>
> Hi,
> is that example input/output what you would like to achieve or what you
> are currently seeing with Flink? I think for your use case a custom Trigger
> would be required that works like the event-time trigger but additionally
> registers timers for each element where you want to emit.
>
> Cheers,
> Aljoscha
>
> On Wed, 26 Oct 2016 at 04:04 Manu Zhang  wrote:
>
> Hi Aljoscha,
>
> Thanks for your response.  My use case is to track user trajectory based
> on page view event when they visit a website.  The input would be like a
> list of PageView(userId, url, eventTimestamp) with watermarks (=
> eventTimestamp - duration). I'm trying SessionWindows with some event time
> trigger. Note we can't wait for the end of session window due to latency.
> Instead, we want to emit the user trajectories whenever a buffered PageView's
> event time is passed by the watermark.

Re: NotSerializableException: jdk.nashorn.api.scripting.NashornScriptEngine

2016-11-02 Thread PedroMrChaves
Hello,

I'm having the exact same problem.
I'm using a filter function on a DataStream.
My Flink version is 1.1.3.

What could be the problem?


Regards,
Pedro Chaves.
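For reference, the usual cause is that the ScriptEngine is created as a field
of the (serializable) function and shipped with it; NashornScriptEngine is not
serializable. The standard fix is to mark the engine transient and create it
in open() of a rich function. A minimal sketch (the script itself is
illustrative):

import javax.script.ScriptEngine;
import javax.script.ScriptEngineManager;

import org.apache.flink.api.common.functions.RichFilterFunction;
import org.apache.flink.configuration.Configuration;

public class ScriptedFilter extends RichFilterFunction<String> {

    // transient: the engine is created on each worker in open(), never serialized
    private transient ScriptEngine engine;

    @Override
    public void open(Configuration parameters) throws Exception {
        engine = new ScriptEngineManager().getEngineByName("nashorn");
    }

    @Override
    public boolean filter(String value) throws Exception {
        engine.put("value", value);
        return (Boolean) engine.eval("value.length() > 0"); // illustrative rule
    }
}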





Re: Questions about FoldFunction and WindowFunction

2016-11-02 Thread Aljoscha Krettek
Hi Yassine,
I made you a contributor in the Flink Jira so you will be able to assign
issues to yourself in the future. I also assigned this issue to you.

I think you only need to make changes in WindowedStream and AllWindowedStream.
Let me know if you need anything. :-)

Cheers,
Aljoscha

On Wed, 2 Nov 2016 at 14:48 Yassine MARZOUGUI 
wrote:

> Yes, with pleasure. Could you please assign it temporarily to me? (I am not
> very familiar with the internal components of Flink and might take some time
> before contributing the code; if by the time you are ready to work on it I
> am not yet done, you can reassign it to yourself.)
>
> 2016-11-02 14:07 GMT+01:00 Aljoscha Krettek :
>
> Would you be interested in contributing a fix for that? Otherwise I'll
> probably work on a fix in the coming weeks.
>
> On Wed, 2 Nov 2016 at 13:38 Yassine MARZOUGUI 
> wrote:
>
> Thank you Aljoscha for your quick response.
>
> Best,
> Yassine
>
> 2016-11-02 12:30 GMT+01:00 Aljoscha Krettek :
>
> Hi Yassine,
>
> regarding 1. The close() method of the RichFoldFunction will only be
> called at the very end of your streaming job, so in practice it will never
> be called. This is there because of batch jobs, where you have an actual
> end in your processing.
>
> regarding 2. I'm afraid you came across a bug:
> https://issues.apache.org/jira/browse/FLINK-3869. We can't change this
> right now because we cannot break API stability, but right at the end of
> this issue I'm proposing a different solution that we'll hopefully get in
> for the next release.
>
> Cheers,
> Aljoscha
>
> On Wed, 2 Nov 2016 at 10:42 Yassine MARZOUGUI 
> wrote:
>
> Hi all,
>
> I have a couple questions about FoldFunction and WindowFunction:
>
> 1. When using a RichFoldFunction after a window as in 
> keyedStream.window().fold(new
> RichFoldFunction()), is the close() method called after each window or
> after all the windows for that key are fired?
>
> 2. When applying a FoldFunction to a window followed by a WindowFunction
> via apply(R initialValue, FoldFunction<T, R> foldFunction,
> WindowFunction<R, R, K, W> function), why should the output of the
> WindowFunction be of the same
> type as the input? It would be practical to have a different output type
> sometimes, for example one would fold tuples in the FoldFunction and then
> process the (only) aggregated tuple in the Window function and emit an
> Integer.
>
> Best,
> Yassine
>
>
>
>


Re: Flink Metrics - InfluxDB + Grafana | Help with query influxDB query for Grafana to plot 'numRecordsIn' & 'numRecordsOut' for each operator/operation

2016-11-02 Thread Jamie Grier
Hi Anchit,

That last bit is very interesting - the fact that it works fine with
subtasks <= 30. It could be that either Influx or Grafana is not able to
keep up with the data being produced. I would guess that the culprit is
Grafana, given that looking at any particular subtask index works fine and
only the full aggregation shows issues. I'm not familiar enough with Grafana
to know which parts of the queries are "pushed down" to the database and
which are done in Grafana. This might also vary by backend database.

Anecdotally, I've also seen scenarios using Grafana and Influx together
where the system seems to get overwhelmed fairly easily. I suspect the
Graphite/Grafana combo would work a lot better in production setups.

This might be relevant:

https://github.com/grafana/grafana/issues/2634

-Jamie



On Tue, Nov 1, 2016 at 5:48 PM, Anchit Jatana 
wrote:

> I've set the metric reporting frequency to InfluxDB as 10s. In the
> screenshot, I'm using Grafana query interval of 1s. I've tried 10s and more
> too, the graph shape changes a bit but the incorrect negative values are
> still plotted (makes no difference).
>
> Something to add: If the subtasks are less than equal to 30, the same query
> yields correct results. For subtask index > 30 (for my case being 50) it
> plots junk negative and positive values.
>
> Regards,
> Anchit
>
>
>
>



-- 

Jamie Grier
data Artisans, Director of Applications Engineering
@jamiegrier 
ja...@data-artisans.com


Re: Questions about FoldFunction and WindowFunction

2016-11-02 Thread Yassine MARZOUGUI
Yes, with pleasure. Could you please assign it temporarily to me? (I am not
very familiar with the internal components of Flink and might take some time
before contributing the code; if by the time you are ready to work on it I
am not yet done, you can reassign it to yourself.)

2016-11-02 14:07 GMT+01:00 Aljoscha Krettek :

> Would you be interested in contributing a fix for that? Otherwise I'll
> probably work on a fix in the coming weeks.
>
> On Wed, 2 Nov 2016 at 13:38 Yassine MARZOUGUI 
> wrote:
>
>> Thank you Aljoscha for your quick response.
>>
>> Best,
>> Yassine
>>
>> 2016-11-02 12:30 GMT+01:00 Aljoscha Krettek :
>>
>> Hi Yassine,
>>
>> regarding 1. The close() method of the RichFoldFunction will only be
>> called at the very end of your streaming job, so in practice it will never
>> be called. This is there because of batch jobs, where you have an actual
>> end in your processing.
>>
>> regarding 2. I'm afraid you came across a bug:
>> https://issues.apache.org/jira/browse/FLINK-3869. We can't change this right
>> now because we cannot break API stability, but right at the end of this issue I'm
>> proposing a different solution that we'll hopefully get in for the next
>> release.
>>
>> Cheers,
>> Aljoscha
>>
>> On Wed, 2 Nov 2016 at 10:42 Yassine MARZOUGUI 
>> wrote:
>>
>> Hi all,
>>
>> I have a couple questions about FoldFunction and WindowFunction:
>>
>> 1. When using a RichFoldFunction after a window as in
>> keyedStream.window().fold(new RichFoldFunction()), is the close() method
>> called after each window or after all the windows for that key are fired?
>>
>> 2. When applying a FoldFunction to a window followed by a WindowFunction
>> via apply(R initialValue, FoldFunction<T, R> foldFunction,
>> WindowFunction<R, R, K, W> function), why should the output of the
>> WindowFunction be of the same
>> type as the input? It would be practical to have a different output type
>> sometimes, for example one would fold tuples in the FoldFunction and then
>> process the (only) aggregated tuple in the Window function and emit an
>> Integer.
>>
>> Best,
>> Yassine
>>
>>
>>


Re: watermark trigger doesn't check whether element's timestamp is passed

2016-11-02 Thread Aljoscha Krettek
Hi,
a contributor (Bonaventure Del Monte) has started working on this. He
should open a Jira this week.

Cheers,
Aljoscha

On Tue, 1 Nov 2016 at 23:57 aj heller  wrote:

Hi Manu, Aljoscha,

I had been interested in implementing FLIP-2, but I haven't been able to
make time for it. There is no implementation yet that I'm aware of, and
I'll gladly step aside (or help out how I can) if you or anyone is
interested to take charge of it.

That said, I'm also not sure if discussions are ongoing. I had hoped to
prototype the proposal as is, to have something more concrete to discuss.

Cheers,
aj
On Nov 1, 2016 3:24 PM, "Manu Zhang"  wrote:

Thanks.  The ideal case is to fire after the watermark passes each element
from the window, but that requires a custom trigger and FLIP-2 as well. The
enhanced window evictor will help to avoid the last firing.

Are the discussions on FLIP-2 still going on?
Are there any open JIRAs or PRs? (The proposed `ProcessWindowFunction`
will be sufficient for my case.)
Is there a workaround now for my case?

Thanks again for following through this.
Manu

On Wed, Nov 2, 2016 at 1:07 AM Aljoscha Krettek  wrote:

Ah, I finally understand it. You would need a way to query the current watermark
in the window function to only emit those elements where the timestamp is
lower than the watermark.

When the window fires again, do you want to emit elements that you emitted
during the last firing again? If not, I think you also need to use an
evictor to evict the elements from the window where the timestamp is lower
than the watermark. With this FLIP
https://cwiki.apache.org/confluence/display/FLINK/FLIP-2+Extending+Window+Function+Metadata
we
should be able to extend the WindowFunction Context to also provide the
current watermark. With this recent PR
https://github.com/apache/flink/pull/2736 you would be able to evict
elements from the window state after the window function was called.

Cheers,
Aljoscha

On Tue, 1 Nov 2016 at 02:27 Manu Zhang  wrote:

Yes, here's the example
https://github.com/manuzhang/flink/blob/pv/flink-examples/flink-examples-streaming/src/main/scala/org/apache/flink/streaming/scala/examples/session/PageViewSessionWindowing.scala

If you print and compare the timestamp of timer with that of "PageView" in
the outputs, you could see what I mean.

I think the recently introduced TimelyFlatMapFunction is close to what I
want to achieve. It will be great if we can query time information in the
window function so I filed https://issues.apache.org/jira/browse/FLINK-4953

Thanks for your time.

Manu

On Mon, Oct 31, 2016 at 10:29 PM Aljoscha Krettek 
wrote:

Hmm, I don't completely understand what's going on. Could you maybe post an
example, with the trigger code that shows this behaviour?

Cheers,
Aljoscha

On Thu, 27 Oct 2016 at 17:12 Manu Zhang  wrote:

Hi,

It's what I'm seeing. If timers are not fired at the end of window, a state
(in the window) whose timestamp is *after* the timer will also be emitted.
That's a problem for event-time trigger.

Thanks,
Manu


On Thu, Oct 27, 2016 at 10:29 PM Aljoscha Krettek 
wrote:

Hi,
is that example input/output what you would like to achieve or what you are
currently seeing with Flink? I think for your use case a custom Trigger
would be required that works like the event-time trigger but additionally
registers timers for each element where you want to emit.

Cheers,
Aljoscha

On Wed, 26 Oct 2016 at 04:04 Manu Zhang  wrote:

Hi Aljoscha,

Thanks for your response.  My use case is to track user trajectory based on
page view event when they visit a website.  The input would be like a list
of PageView(userId, url, eventTimestamp) with watermarks (= eventTimestamp
- duration). I'm trying SessionWindows with some event time trigger. Note
we can't wait for the end of session window due to latency. Instead, we
want to emit the user trajectories whenever a buffered PageView's event
time is passed by watermark. I tried ContinuousEventTimeTrigger and a
custom trigger which sets timer on each element's timestamp. For both
triggers I've witnessed a problem like the following (e.g. a session gap of
5)

PageView("user1", "http://foo;, 1)
PageView("user1", "http://foo/bar;, 2)
Watermark(1) => emit UserTrajectory("user1", "http://foo -> *http://foo/bar
*", [1,6])
PageView("user1", "http://foo/bar/foobar;, 5)
Watermark(4) => emit UserTrajectory("user1", "http://foo -> http://foo/bar ->
*http://foo/bar/foobar *", [1, 10])

The URLs in bold shouldn't be included yet, since there could be events
before them that have not yet arrived.


Thanks,
Manu


On Tue, Oct 25, 2016 at 1:36 AM Aljoscha Krettek 
wrote:

Hi,
with some additional information we might be able to figure this out
together. What specific combination of WindowAssigner/Trigger are you using?

Re: Questions about FoldFunction and WindowFunction

2016-11-02 Thread Aljoscha Krettek
Would you be interested in contributing a fix for that? Otherwise I'll
probably work on a fix in the coming weeks.

On Wed, 2 Nov 2016 at 13:38 Yassine MARZOUGUI 
wrote:

> Thank you Aljoscha for your quick response.
>
> Best,
> Yassine
>
> 2016-11-02 12:30 GMT+01:00 Aljoscha Krettek :
>
> Hi Yassine,
>
> regarding 1. The close() method of the RichFoldFunction will only be
> called at the very end of your streaming job, so in practice it will never
> be called. This is there because of batch jobs, where you have an actual
> end in your processing.
>
> regarding 2. I'm afraid you came across a bug:
> https://issues.apache.org/jira/browse/FLINK-3869. We can't change this
> right now because we cannot break API stability, but right at the end of
> this issue I'm proposing a different solution that we'll hopefully get in
> for the next release.
>
> Cheers,
> Aljoscha
>
> On Wed, 2 Nov 2016 at 10:42 Yassine MARZOUGUI 
> wrote:
>
> Hi all,
>
> I have a couple questions about FoldFunction and WindowFunction:
>
> 1. When using a RichFoldFunction after a window as in 
> keyedStream.window().fold(new
> RichFoldFunction()), is the close() method called after each window or
> after all the windows for that key are fired?
>
> 2. When applying a FoldFunction to a window followed by a WindowFunction
> via apply(R initialValue, FoldFunction<T, R> foldFunction,
> WindowFunction<R, R, K, W> function), why should the output of the
> WindowFunction be of the same
> type as the input? It would be practical to have a different output type
> sometimes, for example one would fold tuples in the FoldFunction and then
> process the (only) aggregated tuple in the Window function and emit an
> Integer.
>
> Best,
> Yassine
>
>
>


Re: Questions about FoldFunction and WindowFunction

2016-11-02 Thread Yassine MARZOUGUI
Thank you Aljoscha for your quick response.

Best,
Yassine

2016-11-02 12:30 GMT+01:00 Aljoscha Krettek :

> Hi Yassine,
>
> regarding 1. The close() method of the RichFoldFunction will only be
> called at the very end of your streaming job, so in practice it will never
> be called. This is there because of batch jobs, where you have an actual
> end in your processing.
>
> regarding 2. I'm afraid you came across a bug:
> https://issues.apache.org/jira/browse/FLINK-3869. We can't change this right
> now because we cannot break API stability, but right at the end of this issue I'm
> proposing a different solution that we'll hopefully get in for the next
> release.
>
> Cheers,
> Aljoscha
>
> On Wed, 2 Nov 2016 at 10:42 Yassine MARZOUGUI 
> wrote:
>
>> Hi all,
>>
>> I have a couple questions about FoldFunction and WindowFunction:
>>
>> 1. When using a RichFoldFunction after a window as in
>> keyedStream.window().fold(new RichFoldFunction()), is the close() method
>> called after each window or after all the windows for that key are fired?
>>
>> 2. When applying a FoldFunction to a window followed by a WindowFunction
>> via apply(R initialValue, FoldFunction<T, R> foldFunction,
>> WindowFunction<R, R, K, W> function), why should the output of the
>> WindowFunction be of the same
>> type as the input? It would be practical to have a different output type
>> sometimes, for example one would fold tuples in the FoldFunction and then
>> process the (only) aggregated tuple in the Window function and emit an
>> Integer.
>>
>> Best,
>> Yassine
>>
>


Re: Questions about FoldFunction and WindowFunction

2016-11-02 Thread Aljoscha Krettek
Hi Yassine,

regarding 1. The close() method of the RichFoldFunction will only be called
at the very end of your streaming job, so in practice it will never be
called. This is there because of batch jobs, where you have an actual end
in your processing.

regarding 2. I'm afraid you came across a bug:
https://issues.apache.org/jira/browse/FLINK-3869. We can't change this
right now because we cannot break API stability, but right at the end of
this issue I'm proposing a different solution that we'll hopefully get in
for the next release.

Cheers,
Aljoscha

On Wed, 2 Nov 2016 at 10:42 Yassine MARZOUGUI 
wrote:

> Hi all,
>
> I have a couple questions about FoldFunction and WindowFunction:
>
> 1. When using a RichFoldFunction after a window as in 
> keyedStream.window().fold(new
> RichFoldFunction()), is the close() method called after each window or
> after all the windows for that key are fired?
>
> 2. When applying a FoldFunction to a window followed by a WindowFunction
> via apply(R initialValue, FoldFunction<T, R> foldFunction,
> WindowFunction<R, R, K, W> function), why should the output of the
> WindowFunction be of the same
> type as the input? It would be practical to have a different output type
> sometimes, for example one would fold tuples in the FoldFunction and then
> process the (only) aggregated tuple in the Window function and emit an
> Integer.
>
> Best,
> Yassine
>


Re: TimelyFlatMapFunction and DataStream

2016-11-02 Thread Aljoscha Krettek
There is already an open PR for fixing those Javadoc issues (along with
some other issues): https://github.com/apache/flink/pull/2715

On Wed, 2 Nov 2016 at 11:04 Stephan Ewen  wrote:

> Hi Ken!
>
> It may not be obvious, so here is a bit of background:
>
> The timers that are used in the FlatMapFunction are scoped by key. We
> thought that this is how they are mainly useful - that's why you need to
> define keys to use them.
> I think the docs are in error, thanks for pointing that out.
>
> In your use case, do you need timers without keys, or only access to the
> current processing/event time?
>
> Best,
> Stephan
>
>
> On Wed, Nov 2, 2016 at 1:59 AM, Ken Krugler 
> wrote:
>
> I’m curious why it seems like a TimelyFlatMapFunction can’t be used with a
> regular DataStream, but it can be used with a KeyedStream.
>
> Or maybe I’m missing something obvious (this is with 1.2-SNAPSHOT, pulled
> today).
>
> Also the documentation of TimelyFlatMapFunction (
> https://ci.apache.org/projects/flink/flink-docs-master/api/java/index.html?org/apache/flink/streaming/api/functions/TimelyFlatMapFunction.html)
> shows using it with a DataStream.flatMap(xxx) call.
>
> Thanks,
>
> — Ken
>
> --
> Ken Krugler
> +1 530-210-6378
> http://www.scaleunlimited.com
> custom big data solutions & training
> Hadoop, Cascading, Cassandra & Solr
>
>
>
>
>


Re: TimelyFlatMapFunction and DataStream

2016-11-02 Thread Stephan Ewen
Hi Ken!

It may not be obvious, so here is a bit of background:

The timers that are used in the FlatMapFunction are scoped by key. We
thought that this is how they are mainly useful - that's why you need to
define keys to use them.
I think the docs are in error, thanks for pointing that out.

In your use case, do you need timers without keys, or only access to the
current processing/event time?

Best,
Stephan


On Wed, Nov 2, 2016 at 1:59 AM, Ken Krugler 
wrote:

> I’m curious why it seems like a TimelyFlatMapFunction can’t be used with a
> regular DataStream, but it can be used with a KeyedStream.
>
> Or maybe I’m missing something obvious (this is with 1.2-SNAPSHOT, pulled
> today).
>
> Also the documentation of TimelyFlatMapFunction
> (https://ci.apache.org/projects/flink/flink-docs-master/api/java/index.html?org/apache/flink/streaming/api/functions/TimelyFlatMapFunction.html)
> shows using it with a DataStream.flatMap(xxx) call.
>
> Thanks,
>
> — Ken
>
> --
> Ken Krugler
> +1 530-210-6378
> http://www.scaleunlimited.com
> custom big data solutions & training
> Hadoop, Cascading, Cassandra & Solr
>
>
>
>


Questions about FoldFunction and WindowFunction

2016-11-02 Thread Yassine MARZOUGUI
Hi all,

I have a couple questions about FoldFunction and WindowFunction:

1. When using a RichFoldFunction after a window as in
keyedStream.window().fold(new
RichFoldFunction()), is the close() method called after each window or
after all the windows for that key are fired?

2. When applying a FoldFunction to a window followed by a WindowFunction
via apply(R initialValue, FoldFunction<T, R> foldFunction,
WindowFunction<R, R, K, W> function)
(https://ci.apache.org/projects/flink/flink-docs-master/api/java/org/apache/flink/streaming/api/datastream/WindowedStream.html),
why should the output of the WindowFunction be of the same
type as the input? It would be practical to have a different output type
sometimes, for example one would fold tuples in the FoldFunction and then
process the (only) aggregated tuple in the Window function and emit an
Integer.

Best,
Yassine
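To make the constraint in question 2 concrete, here is a sketch against the
Flink 1.1-era WindowedStream.apply(initialValue, foldFunction, windowFunction)
overload (types are illustrative): the fold accumulator type is forced to be
both the input and the output type of the WindowFunction.

import org.apache.flink.api.common.functions.FoldFunction;
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.java.tuple.Tuple;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.functions.windowing.WindowFunction;
import org.apache.flink.streaming.api.windowing.assigners.TumblingEventTimeWindows;
import org.apache.flink.streaming.api.windowing.time.Time;
import org.apache.flink.streaming.api.windowing.windows.TimeWindow;
import org.apache.flink.util.Collector;

// given: DataStream<Tuple2<String, Integer>> input, with timestamps assigned
DataStream<Tuple2<String, Integer>> folded = input
    .keyBy(0)
    .window(TumblingEventTimeWindows.of(Time.seconds(10)))
    .apply(
        new Tuple2<>("", 0),
        new FoldFunction<Tuple2<String, Integer>, Tuple2<String, Integer>>() {
            @Override
            public Tuple2<String, Integer> fold(Tuple2<String, Integer> acc,
                                                Tuple2<String, Integer> in) {
                return new Tuple2<>(in.f0, acc.f1 + in.f1);
            }
        },
        new WindowFunction<Tuple2<String, Integer>, Tuple2<String, Integer>,
                           Tuple, TimeWindow>() {
            @Override
            public void apply(Tuple key, TimeWindow window,
                              Iterable<Tuple2<String, Integer>> values,
                              Collector<Tuple2<String, Integer>> out) {
                // Forced to emit Tuple2 again; emitting an Integer here
                // would not compile, hence the extra map() below.
                out.collect(values.iterator().next());
            }
        });

DataStream<Integer> counts = folded.map(
    new MapFunction<Tuple2<String, Integer>, Integer>() {
        @Override
        public Integer map(Tuple2<String, Integer> t) {
            return t.f1; // the extra step the thread wishes were unnecessary
        }
    });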


Re: Question about the checkpoint mechanism in Flink.

2016-11-02 Thread Renjie Liu
Thanks for the reply.

On Wed, Nov 2, 2016 at 5:19 PM Till Rohrmann  wrote:

> Yes you're right. Whenever you have multiple input channels which could
> also be the case if you do a repartitioning between two mappers.
>
> On Tue, Nov 1, 2016 at 11:48 PM, Renjie Liu 
> wrote:
>
> Hi, Till:
> I think the multiple input should include the more general case where
> redistribution happens between subtasks, right? Since in this case we also
> need to align check barrier.
>
> On Tue, Nov 1, 2016 at 11:05 PM, Till Rohrmann wrote:
>
> The tuples are not buffered until the snapshot is globally complete (a
> snapshot is globally complete iff all operators have successfully taken a
> snapshot). They are only buffered until the corresponding checkpoint
> barrier on the second input is received. Once this is the case, the
> checkpoint barrier will directly be sent to the downstream operators. Next
> a snapshot is taken. Depending on the state backend this can happen
> asynchronously or synchronously. After this is done, the operator continues
> processing elements (for the first input, the buffered elements are
> consumed first).
>
> With multiple inputs I referred to a coFlatMap operator or a join operator
> which have both two inputs.
>
> Cheers,
> Till
>
> On Tue, Nov 1, 2016 at 3:29 PM, Renjie Liu 
> wrote:
>
> Hi, Till:
> By operator with multiple inputs, do you mean inputs from multiple
> subtasks?
>
> On Tue, Nov 1, 2016 at 8:56 PM Till Rohrmann  wrote:
>
> Hi Li,
>
> the statement refers to operators with multiple inputs (two in this case).
> With the current implementation you will indeed block one of the inputs
> after receiving a checkpoint barrier n until you've received the
> corresponding checkpoint barrier n on the other input as well. This is what
> we call checkpoint barrier alignment. If the processing time on both input
> paths is similar and thus there is no back pressure on any of the inputs,
> the alignment should not take too long. In cases where one of the inputs is
> considerably slower than the other, you should see an additional delay.
>
> For single input operators, you don't have to align the checkpoint
> barriers.
>
> The checkpoint barrier alignment is not strictly necessary, but it allows
> us to not having to store all in flight records from the second input which
> arrive between the checkpoint barrier on the first input and the
> corresponding barrier on the second input. We might change this
> implementation in the future, though.
>
> Cheers,
> Till
>
> On Tue, Nov 1, 2016 at 8:05 AM, Li Wang  wrote:
>
> Hi all,
>
> I have a question regarding the state checkpoint mechanism in Flink. I
> find the statement  "Once the last stream has received barrier n, the
> operator emits all pending outgoing records, and then emits
> snapshot n barriers itself" on the document
> https://ci.apache.org/projects/flink/flink-docs-master/internals/stream_checkpointing.html#exactly-once-vs-at-least-once.
>
> Does this mean that to achieve exactly-once semantics, instead of sending
> tuples downstream immediately the operator buffers its outgoing tuples in a
> pending queue until the current snapshot is committed? If yes, will this
> introduce significant processing delay?
>
> Thanks,
> Li
>
>
> --
> Liu, Renjie
> Software Engineer, MVAD
>
>
> --
> Liu, Renjie
> Software Engineer, MVAD
>
>
> --
Liu, Renjie
Software Engineer, MVAD
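To illustrate the alignment Till describes, here is a minimal sketch of the
bookkeeping for an operator with two input channels; this is an illustration
of the protocol, not Flink's actual implementation.

import java.util.ArrayDeque;
import java.util.Queue;

class TwoInputAlignment<T> {

    private final Queue<T> buffered = new ArrayDeque<>();
    private boolean[] barrierSeen = new boolean[2];

    void onRecord(int channel, T record) {
        if (barrierSeen[channel]) {
            buffered.add(record); // channel already delivered barrier n: block it
        } else {
            process(record);      // the other channel keeps flowing
        }
    }

    void onBarrier(int channel, long checkpointId) {
        barrierSeen[channel] = true;
        if (barrierSeen[0] && barrierSeen[1]) {   // alignment complete
            emitBarrierDownstream(checkpointId);  // forward the barrier first
            snapshotState(checkpointId);          // then take the local snapshot
            barrierSeen = new boolean[2];
            while (!buffered.isEmpty()) {         // drain records buffered meanwhile
                process(buffered.poll());
            }
        }
    }

    void process(T record) { /* user logic */ }
    void emitBarrierDownstream(long checkpointId) { /* ... */ }
    void snapshotState(long checkpointId) { /* ... */ }
}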


Best Practices/Advice - Execution of jobs

2016-11-02 Thread PedroMrChaves
Hello,

I'm trying to build a stream event correlation engine with Flink and I have
some questions regarding the execution of jobs.

In my architecture I need to have different sources of data, say for
instance:

firewallStream = environment.addSource([FirewallLogsSource]);
proxyStream = environment.addSource([ProxyLogsSource]);

and for each of these sources I need to apply a set of rules.
So let's say I have a job that has as a source the proxy stream data with the
following rules:

// Abnormal Request Method
stream.[RuleLogic].addSink([output])
// Web Service on Non-Typical Port
stream.[RuleLogic].addSink([output])
// Possible Brute Force
stream.[RuleLogic].addSink([output])

The rule set will probably grow to be on the order of 15 to 20 rules.

What is the best approach in this case:
1. Should I create 2 jobs, one for each source, with each job containing the
15-20 rules?
2. Should I split the rules into several jobs?
3. Other options?
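As a concrete reading of option 1, a single job can fan one source out into
many independent rule pipelines; a minimal sketch where Rule, the source, and
the sinks are placeholders:

import org.apache.flink.api.common.functions.FilterFunction;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.sink.SinkFunction;

public class ProxyRulesJob {

    /** Placeholder for one correlation rule: its logic plus its output. */
    interface Rule {
        FilterFunction<String> predicate();
        SinkFunction<String> sink();
    }

    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        // Placeholder source, standing in for [ProxyLogsSource] above.
        DataStream<String> proxyStream = env.socketTextStream("localhost", 9999);

        for (Rule rule : java.util.Collections.<Rule>emptyList()) { // 15-20 rules in practice
            proxyStream.filter(rule.predicate()).addSink(rule.sink());
        }

        env.execute("proxy-rules");
    }
}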


Thank you and Regards,
Pedro Chaves.







Re: Question about the checkpoint mechanism in Flink.

2016-11-02 Thread Till Rohrmann
Yes you're right. Whenever you have multiple input channels which could
also be the case if you do a repartitioning between two mappers.

On Tue, Nov 1, 2016 at 11:48 PM, Renjie Liu  wrote:

> Hi, Till:
> I think the multiple input should include the more general case where
> redistribution happens between subtasks, right? Since in this case we also
> need to align check barrier.
>
> On Tue, Nov 1, 2016 at 11:05 PM, Till Rohrmann wrote:
>
>> The tuples are not buffered until the snapshot is globally complete (a
>> snapshot is globally complete iff all operators have successfully taken a
>> snapshot). They are only buffered until the corresponding checkpoint
>> barrier on the second input is received. Once this is the case, the
>> checkpoint barrier will directly be sent to the downstream operators. Next
>> a snapshot is taken. Depending on the state backend this can happen
>> asynchronously or synchronously. After this is done, the operator continues
>> processing elements (for the first input, the buffered elements are
>> consumed first).
>>
>> With multiple inputs I referred to a coFlatMap operator or a join
>> operator which have both two inputs.
>>
>> Cheers,
>> Till
>>
>> On Tue, Nov 1, 2016 at 3:29 PM, Renjie Liu 
>> wrote:
>>
>> Hi, Till:
>> By operator with multiple inputs, do you mean inputs from multiple
>> subtasks?
>>
>> On Tue, Nov 1, 2016 at 8:56 PM Till Rohrmann 
>> wrote:
>>
>> Hi Li,
>>
>> the statement refers to operators with multiple inputs (two in this
>> case). With the current implementation you will indeed block one of the
>> inputs after receiving a checkpoint barrier n until you've received the
>> corresponding checkpoint barrier n on the other input as well. This is what
>> we call checkpoint barrier alignment. If the processing time on both input
>> paths is similar and thus there is no back pressure on any of the inputs,
>> the alignment should not take too long. In cases where one of the inputs is
>> considerably slower than the other, you should see an additional delay.
>>
>> For single input operators, you don't have to align the checkpoint
>> barriers.
>>
>> The checkpoint barrier alignment is not strictly necessary, but it allows
>> us to not having to store all in flight records from the second input which
>> arrive between the checkpoint barrier on the first input and the
>> corresponding barrier on the second input. We might change this
>> implementation in the future, though.
>>
>> Cheers,
>> Till
>>
>> On Tue, Nov 1, 2016 at 8:05 AM, Li Wang  wrote:
>>
>> Hi all,
>>
>> I have a question regarding the state checkpoint mechanism in Flink. I
>> find the statement  "Once the last stream has received barrier n, the
>> operator emits all pending outgoing records, and then emits
>> snapshot n barriers itself" on the document
>> https://ci.apache.org/projects/flink/flink-docs-master/internals/stream_checkpointing.html#exactly-once-vs-at-least-once.
>>
>> Does this mean that to achieve exactly-once semantics, instead of sending
>> tuples downstream immediately the operator buffers its outgoing tuples in a
>> pending queue until the current snapshot is committed? If yes, will this
>> introduce significant processing delay?
>>
>> Thanks,
>> Li
>>
>>
>> --
>> Liu, Renjie
>> Software Engineer, MVAD
>>
>>
>> --
> Liu, Renjie
> Software Engineer, MVAD
>