Dedicated ExecutionContext inside Flink

2016-06-07 Thread Soumya Simanta
What is the recommended practice for using dedicated ExecutionContexts
inside Flink code?

We are making some external network calls using Futures. Currently all of
them are running on the global execution context (import
scala.concurrent.ExecutionContext.Implicits.global).
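For what it's worth, a minimal sketch of one way to give each parallel operator
instance its own pool instead of the global context; the class name, pool size,
timeout and the blocking call are assumptions for illustration, not an
established recommendation:

import java.util.concurrent.{ExecutorService, Executors}

import scala.concurrent.duration._
import scala.concurrent.{Await, ExecutionContext, Future}

import org.apache.flink.api.common.functions.RichMapFunction
import org.apache.flink.configuration.Configuration

// Hypothetical enrichment mapper; pool size, timeout and the external call are placeholders.
class EnrichingMapper extends RichMapFunction[String, String] {
  @transient private var pool: ExecutorService = _
  @transient private implicit var ec: ExecutionContext = _

  override def open(parameters: Configuration): Unit = {
    // one dedicated pool per parallel subtask instead of the JVM-wide global context
    pool = Executors.newFixedThreadPool(4)
    ec = ExecutionContext.fromExecutorService(pool)
  }

  override def map(value: String): String = {
    val response = Future { callExternalService(value) } // runs on the dedicated pool
    Await.result(response, 10.seconds) // blocking here only to keep the sketch simple
  }

  override def close(): Unit = if (pool != null) pool.shutdown()

  // stand-in for the real network call
  private def callExternalService(v: String): String = v
}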

Thanks
-Soumya


Uploaded jar disappears when web monitor restarts

2016-06-07 Thread Emanuele Cesena
Hi,

When the web monitor restarts the uploaded jars disappear — in fact, every time 
it restarts the upload directory is different.

This seems intentional:
https://github.com/apache/flink/blob/master/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/WebRuntimeMonitor.java#L162

Could anyone confirm?

Wouldn’t it be useful to have a config param to set it permanently
(and thus avoid the jars being deleted)? Or, what is the intended way to “add
jars to my cluster”?

Thank you,
E.



Re: Reading whole files (from S3)

2016-06-07 Thread Suneel Marthi
You can use Mahout's XMLInputFormat with Flink's HadoopInputFormat
wrapper. See


http://stackoverflow.com/questions/29429428/xmlinputformat-for-apache-flink
http://apache-flink-mailing-list-archive.1008284.n3.nabble.com/Read-XML-from-HDFS-td7023.html
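For illustration, a rough sketch of wiring an XmlInputFormat into Flink through
the HadoopInputFormat wrapper (Scala API). The Mahout package name, the
xmlinput.start/xmlinput.end keys, the S3 path and the <trk> tag are assumptions
and will depend on the Mahout version and the GPX schema:

import org.apache.flink.api.scala._
import org.apache.flink.api.scala.hadoop.mapreduce.HadoopInputFormat
import org.apache.hadoop.fs.Path
import org.apache.hadoop.io.{LongWritable, Text}
import org.apache.hadoop.mapreduce.Job
import org.apache.hadoop.mapreduce.lib.input.{FileInputFormat => HadoopFileInputFormat}
// assumed location of Mahout's XmlInputFormat; it has moved between Mahout releases
import org.apache.mahout.text.wikipedia.XmlInputFormat

object GpxFromS3 {
  def main(args: Array[String]): Unit = {
    val env = ExecutionEnvironment.getExecutionEnvironment

    val job = Job.getInstance()
    // XmlInputFormat emits everything between a start and an end tag as one record
    job.getConfiguration.set("xmlinput.start", "<trk>")
    job.getConfiguration.set("xmlinput.end", "</trk>")
    HadoopFileInputFormat.addInputPath(job, new Path("s3://my-bucket/gpx/"))

    // wrap the Hadoop input format so Flink can run it; records arrive as (offset, xml) pairs
    val records = env.createInput(
      new HadoopInputFormat[LongWritable, Text](
        new XmlInputFormat, classOf[LongWritable], classOf[Text], job))

    records.map(_._2.toString).print() // each string is one <trk>...</trk> fragment to parse
  }
}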


On Tue, Jun 7, 2016 at 10:11 PM, Jamie Grier 
wrote:

> Hi Andrea,
>
> How large are these data files?  The implementation you've mentioned here
> is only usable if they are very small.  If so, you're fine.  If not, read
> on...
>
> Processing XML input files in parallel is tricky.  It's not a great format
> for this type of processing as you've seen.  They are tricky to split and
> more complex to iterate through than simpler formats. However, others have
> implemented XMLInputFormat classes for Hadoop.  Have you looked at these?
> Mahout has an XMLInputFormat implementation for example but I haven't used
> it directly.
>
> Anyway, you can reuse Hadoop InputFormat implementations in Flink
> directly.  This is likely a good route.  See Flink's HadoopInputFormat
> class.
>
> -Jamie
>
>
> On Tue, Jun 7, 2016 at 7:35 AM, Andrea Cisternino 
> wrote:
>
>> Hi all,
>>
>> I am evaluating Apache Flink for processing large sets of Geospatial data.
>> The use case I am working on will involve reading a certain number of GPX
>> files stored on Amazon S3.
>>
>> GPX files are actually XML files and therefore cannot be read on a line
>> by line basis.
>> One GPX file will produce one or more Java objects that will contain the
>> geospatial data we need to process (mostly a list of geographical points).
>>
>> To cover this use case I tried to extend the FileInputFormat class:
>>
>> public class WholeFileInputFormat extends FileInputFormat<String>
>> {
>>   private boolean hasReachedEnd = false;
>>
>>   public WholeFileInputFormat() {
>> unsplittable = true;
>>   }
>>
>>   @Override
>>   public void open(FileInputSplit fileSplit) throws IOException {
>> super.open(fileSplit);
>> hasReachedEnd = false;
>>   }
>>
>>   @Override
>>   public String nextRecord(String reuse) throws IOException {
>> // uses apache.commons.io.IOUtils
>> String fileContent = IOUtils.toString(stream, StandardCharsets.UTF_8);
>> hasReachedEnd = true;
>> return fileContent;
>>   }
>>
>>   @Override
>>   public boolean reachedEnd() throws IOException {
>> return hasReachedEnd;
>>   }
>> }
>>
>> This class returns the content of the whole file as a string.
>>
>> Is this the right approach?
>> It seems to work when run locally with local files but I wonder if it
>> would
>> run into problems when tested in a cluster.
>>
>> Thanks in advance.
>>   Andrea.
>>
>> --
>> Andrea Cisternino, Erlangen, Germany
>> GitHub: http://github.com/acisternino
>> GitLab: https://gitlab.com/u/acisternino
>>
>
>
>
> --
>
> Jamie Grier
> data Artisans, Director of Applications Engineering
> @jamiegrier 
> ja...@data-artisans.com
>
>


Re: Reading whole files (from S3)

2016-06-07 Thread Jamie Grier
Hi Andrea,

How large are these data files?  The implementation you've mentioned here
is only usable if they are very small.  If so, you're fine.  If not, read
on...

Processing XML input files in parallel is tricky.  It's not a great format
for this type of processing as you've seen.  They are tricky to split and
more complex to iterate through than simpler formats. However, others have
implemented XMLInputFormat classes for Hadoop.  Have you looked at these?
Mahout has an XMLInputFormat implementation for example but I haven't used
it directly.

Anyway, you can reuse Hadoop InputFormat implementations in Flink
directly.  This is likely a good route.  See Flink's HadoopInputFormat
class.

-Jamie


On Tue, Jun 7, 2016 at 7:35 AM, Andrea Cisternino 
wrote:

> Hi all,
>
> I am evaluating Apache Flink for processing large sets of Geospatial data.
> The use case I am working on will involve reading a certain number of GPX
> files stored on Amazon S3.
>
> GPX files are actually XML files and therefore cannot be read on a line by
> line basis.
> One GPX file will produce one or more Java objects that will contain the
> geospatial data we need to process (mostly a list of geographical points).
>
> To cover this use case I tried to extend the FileInputFormat class:
>
> public class WholeFileInputFormat extends FileInputFormat<String>
> {
>   private boolean hasReachedEnd = false;
>
>   public WholeFileInputFormat() {
> unsplittable = true;
>   }
>
>   @Override
>   public void open(FileInputSplit fileSplit) throws IOException {
> super.open(fileSplit);
> hasReachedEnd = false;
>   }
>
>   @Override
>   public String nextRecord(String reuse) throws IOException {
> // uses apache.commons.io.IOUtils
> String fileContent = IOUtils.toString(stream, StandardCharsets.UTF_8);
> hasReachedEnd = true;
> return fileContent;
>   }
>
>   @Override
>   public boolean reachedEnd() throws IOException {
> return hasReachedEnd;
>   }
> }
>
> This class returns the content of the whole file as a string.
>
> Is this the right approach?
> It seems to work when run locally with local files but I wonder if it would
> run into problems when tested in a cluster.
>
> Thanks in advance.
>   Andrea.
>
> --
> Andrea Cisternino, Erlangen, Germany
> GitHub: http://github.com/acisternino
> GitLab: https://gitlab.com/u/acisternino
>



-- 

Jamie Grier
data Artisans, Director of Applications Engineering
@jamiegrier 
ja...@data-artisans.com


Re: Does Flink allows for encapsulation of transformations?

2016-06-07 Thread Ser Kho

Chesnay: Just want to thank you. I might have one or two related questions later
on, but for now just thanks.

 

On Tuesday, June 7, 2016 8:18 AM, Greg Hogan  wrote:
 

 "The question is how to encapsulate numerous transformations into one object 
or may be a function in Apache Flink Java setting."

Implement CustomUnaryOperation. This can then be applied to a DataSet by 
calling `DataSet result = DataSet.runOperation(new MyOperation<>(...));`.

On Mon, Jun 6, 2016 at 3:14 PM, Ser Kho  wrote:

The question is how to encapsulate numerous transformations into one object, or
maybe a function, in an Apache Flink Java setting. I have tried to investigate
this question using an example of Pi calculation (see below). I am wondering
whether or not the suggested approach is valid from Flink's point of view.
It works on one computer, however, I do not know how it will behave in a
cluster setup. The code is given below, and the main idea behind it is as follows:

   - Create a class, named classPI, whose method compute() does all data
     transformations; see more about it below.
   - In the main method create a DataSet, as in: DataSet<classPI> opi =
     env.fromElements(new classPI());
   - Create DataSet<Double> PI, which equals the output of the transformation map()
     that calls the object PI's method compute(), as in: DataSet<Double> PI =
     opi.map(new MapFunction<classPI, Double>() { public Double map(classPI objPI)
     { return objPI.compute(); }});
   - Now about classPI:
     - Its constructor instantiates an ExecutionEnvironment, which is local to this
       class, as in: public classPI(){ this.NumIter=100; env =
       ExecutionEnvironment.getExecutionEnvironment();}

Thus, the code has two ExecutionEnvironment objects: one in main and another in
the class classPI.

   - classPI has a method compute() that runs all data transformations (in this
     example it is just several lines but potentially it might contain tons of
     Flink transformations): public Double compute(){ DataSet<Long> count =
     env.generateSequence(1, NumIter) .map(new Sampler()) .reduce(new SumReducer());
     PI = 4.0*count.collect().get(0)/NumIter;
     return PI;}

The whole code is given below. Again, the question is whether this is a valid
approach for encapsulating data transformations into a class in a Flink setup
that is supposed to be parallelizable to work on a cluster. Is there a better
way to hide the details of data transformations? Thanks a lot!
-The code --
public class PiEstimation{

public static void main(String[] args) throws Exception 
{
// this is one ExecutionEnvironment
 final ExecutionEnvironment env = 
ExecutionEnvironment.getExecutionEnvironment();   
// this is the critical DataSet with my classPI that computes PI
 DataSet<classPI> opi = env.fromElements(new classPI());
// this map calls the method compute() of class classPI that computes PI
 DataSet<Double> PI = opi.map(new MapFunction<classPI, Double>() 
{
   public Double map(classPI objPI) throws Exception { 
   // this is how I call method compute() that calculates PI using transformations
   return objPI.compute(); } });

   double pi = PI.collect().get(0);
   System.out.println("We estimate Pi to be: " + pi);   
}

// this class is of no importance for my question, however, it is relevant for pi calculation 
public static class Sampler implements MapFunction<Long, Long> {
@Override
public Long map(Long value) {
double x = Math.random();
double y = Math.random();
return (x * x + y * y) < 1 ? 1L : 0L;}}

// this class is of no importance for my question, however, it is relevant for pi calculation 
public static final class SumReducer implements ReduceFunction<Long>{
  @Override
  public Long reduce(Long value1, Long value2) {
  return value1 + value2;}}

// this is my class that computes PI; my question is whether such a class is 
// valid in Flink on a cluster with parallel computation 
public static final class classPI
{
   public Integer NumIter;
   private final ExecutionEnvironment env;
   public Double PI;

   // this is the constructor with another ExecutionEnvironment
   public classPI(){
   this.NumIter=100;
   env = ExecutionEnvironment.getExecutionEnvironment();
   }
   // This is the method that contains all data transformations
   public Double compute() throws Exception{
 DataSet<Long> count = env.generateSequence(1, NumIter)
   .map(new Sampler())
   .reduce(new SumReducer());
 PI = 4.0*count.collect().get(0)/NumIter;   

 return PI;}}}



  

Re: Multi-field "sum" function just like "keyBy"

2016-06-07 Thread Jamie Grier
I'm assuming what you're trying to do is essentially sum over two different
fields of your data.  I would do this with my own ReduceFunction.


stream
  .keyBy("someKey")
  .reduce(CustomReduceFunction) // sum whatever fields you want and return
the result

I think it does make sense that Flink could provide a generic sum function
that could sum over multiple fields, though.
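For illustration, a self-contained sketch of that approach in the Scala
DataStream API; the (key, value1, value2) tuple shape and the sample records
are assumptions:

import org.apache.flink.streaming.api.scala._

object MultiFieldSum {
  def main(args: Array[String]): Unit = {
    val env = StreamExecutionEnvironment.getExecutionEnvironment

    // hypothetical (key, value1, value2) records
    val stream: DataStream[(String, Int, Int)] =
      env.fromElements(("a", 1, 10), ("a", 2, 20), ("b", 3, 30))

    stream
      .keyBy(0)
      // one ReduceFunction that sums both value fields at the same time
      .reduce((left, right) => (left._1, left._2 + right._2, left._3 + right._3))
      .print()

    env.execute("multi-field sum sketch")
  }
}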

-Jamie


On Tue, Jun 7, 2016 at 5:41 AM, Al-Isawi Rami 
wrote:

> Hi,
>
> Is there any reason why “keyBy" accepts multi-field, while for example
> “sum” does not.
>
> -Rami
> Disclaimer: This message and any attachments thereto are intended solely
> for the addressed recipient(s) and may contain confidential information. If
> you are not the intended recipient, please notify the sender by reply
> e-mail and delete the e-mail (including any attachments thereto) without
> producing, distributing or retaining any copies thereof. Any review,
> dissemination or other use of, or taking of any action in reliance upon,
> this information by persons or entities other than the intended
> recipient(s) is prohibited. Thank you.
>



-- 

Jamie Grier
data Artisans, Director of Applications Engineering
@jamiegrier 
ja...@data-artisans.com


Re: Submit Flink Jobs to YARN running on AWS

2016-06-07 Thread Ashutosh Kumar
If you use open vpn for accessing aws then you can use private IP of ec2
machine from your laptop.


Thanks
Ashutosh

On Tue, Jun 7, 2016 at 11:00 PM, Shannon Carey  wrote:

> We're also starting to look at automating job deployment/start to Flink
> running on EMR. There are a few options:
>
>- Use RemoteExecutionEnvironment (per the examples). Problems: not
>sure best way to upload JAR, not sure how to run it detached so that the
>Java program that starts the job is asynchronous with the long-running
>cluster job.
>- Use the CLI. Problems: need to run it locally on the YARN node,
>otherwise you encounter the problems discussed below? It requires a Flink
>distro. Logs of the launch will remain local to the machine that executes
>it (eg. if it's on a Jenkins slave)
>- Use the HTTP API
>
> Is using the HTTP API a reasonable approach? Is that API considered stable
> enough that we could rely on it continuing to be present?
>
> Thanks,
> Shannon
>
>
> From: "Bajaj, Abhinav" 
> Date: Monday, June 6, 2016 at 12:10 PM
> To: Josh 
> Cc: "user@flink.apache.org" 
>
> Subject: Re: Submit Flink Jobs to YARN running on AWS
>
> Hi Josh,
>
> I have not yet :-( I am working on getting a REST service set up on AWS
> that can do it, rather than using the Flink client remotely.
> This way the Akka communication stays within AWS.
>
> However, I still need the solution for running some of the
> integration/system tests.
>
> ~ Abhi
>
> From: Josh 
> Reply-To: "user@flink.apache.org" 
> Date: Monday, June 6, 2016 at 11:55 AM
> To: "user@flink.apache.org" 
> Subject: Re: Submit Flink Jobs to YARN running on AWS
>
> Hi Abhi,
>
> I'm also looking to deploy Flink jobs remotely to YARN, and eventually
> automate it - just wondering if you found a way to do it?
>
> Thanks,
> Josh
>
> On Wed, May 25, 2016 at 12:36 AM, Bajaj, Abhinav 
> wrote:
>
>> Hi,
>>
>> Has anyone tried to submit a Flink Job remotely to Yarn running in AWS ?
>> The case I am stuck with is where the Flink client is on my laptop and
>> YARN is running on AWS.
>>
>> @Robert, Did you get a chance to try this out?
>>
>> Regards,
>> Abhi
>>
>> From: "Bajaj, Abhinav" 
>> Date: Friday, April 29, 2016 at 3:50 PM
>>
>> To: "user@flink.apache.org" 
>> Subject: Re: Submit Flink Jobs to YARN running on AWS
>>
>> Hi Robert,
>>
>> Thanks for your reply.
>>
>> I am using the Public DNS for the EC2 machines in the yarn and hdfs
>> configuration files. It looks like "
>> ec2-203-0-113-25.compute-1.amazonaws.com”
>> You should be able to connect then.
>>
>> I have hadoop installed locally and the YARN_CONF_DIR is pointing to it.
>> The yarn-site.xml and core-site.xml files use the resource manager
>> address(Public DNS) running in AWS.
>>
>> So, whenever I submit the job using the client on my laptop, it connects
>> to RM.
>> The RM starts the YARN application and starts the Job manager.
>> The job manager starts the actor system using the internal IP of the
>> nodemanager. In my understanding, this is where the problem lies.
>>
>> The local client tries to connect to the Job manager actor system but the
>> messages are dropped by the actor system as the IP address(EC2 internal IP)
>> that actor system started with does not match the external IP
>> address(Public IP) that was used by Flink client to send the message.
>> Please see my first mail below for detailed logs.
>>
>> Please keep me posted with your progress.
>>
>> I plan to move the cluster to VPC for other reasons.
>> I have limited knowledge of VPC but I guess the difference in internal
>> and external IP address will not be resolved.
>> Please correct if your views are different.
>>
>> It will be great if you are able to reproduce the issue.
>>
>> Thanks again.
>> Abhi
>>
>>
>>
>>
>> *Abhinav Bajaj*
>>
>> Senior Engineer
>>
>> HERE Predictive Analytics
>>
>> Office:  +12062092767
>>
>> Mobile: +17083299516
>>
>> *HERE Seattle*
>>
>> 701 Pike Street, #2000, Seattle, WA 98101, USA
>>
>> *47° 36' 41" N. 122° 19' 57" W*
>>
>> *HERE Maps*
>>
>>
>>
>>
>> From: Robert Metzger 
>> Reply-To: "user@flink.apache.org" 
>> Date: Tuesday, April 26, 2016 at 3:16 AM
>> To: "user@flink.apache.org" 
>> Subject: Re: Submit Flink Jobs to YARN running on AWS
>>
>> I've started my own EMR cluster and tried to launch a Flink job from my
>> local machine on it.
>> I have to admit that configuring the EMR-launched Hadoop for external
>> access is quite a hassle.
>>
>> I'm not even able to submit Flink to the YARN cluster because the client
>> can not connect to the ResourceManager. I've change the resource manager
>> hostname to the public one in the yarn-site.xml on the cluster and

Re: Window start and end issue with TumblingProcessingTimeWindows

2016-06-07 Thread Soumya Simanta
Thanks for the clarification.

On Tue, Jun 7, 2016 at 9:15 PM, Aljoscha Krettek 
wrote:

> Hi,
> I'm afraid you're running into a bug in the special processing-time
> window operator. A suggested workaround would be to switch to
> characteristic IngestionTime and use TumblingEventTimeWindows.
>
> I also opened a Jira issue for the bug so that we can keep track of it:
> https://issues.apache.org/jira/browse/FLINK-4028
>
> Cheers,
> Aljoscha
>
> On Tue, 7 Jun 2016 at 14:57 Soumya Simanta 
> wrote:
>
>> The problem is why is the window end time in the future ?
>>
>> For example if my window size is 60 seconds and my window is being
>> evaluated at 3.00 pm then why is the window end time 3.01 pm and not 3.00
>> pm even when the data that is being evaluated falls in the window 2.59 -
>> 3.00.
>>
>> Sent from my iPhone
>>
>> On Jun 7, 2016, at 3:47 PM, Chesnay Schepler  wrote:
>>
>> could you state a specific problem?
>>
>> On 07.06.2016 06:40, Soumya Simanta wrote:
>>
>> I've a simple program which takes some inputs from a command line (Socket
>> stream) and then aggregates based on the key.
>>
>> When running this program on my local machine I see some output that is
>> counter intuitive to my understanding of windows in Flink.
>>
>> The start time of the Window is around the time the Functions are being
>> evaluated. However, *the window end time is around 60 s (window size)
>> after the current time (please see below). *
>>
>> Can someone explain this behaviour please?
>>
>> import org.apache.flink.api.scala._
>> import org.apache.flink.streaming.api.TimeCharacteristic
>> import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment
>> import org.apache.flink.streaming.api.windowing.assigners.TumblingProcessingTimeWindows
>> import org.apache.flink.streaming.api.windowing.time.Time
>> import org.apache.flink.streaming.api.windowing.windows.TimeWindow
>> import org.apache.flink.util.Collector
>> case class EventAgg(start: Long, end: Long, key: String, value: Int)
>> object Processor {
>>
>>   val window_length = 6 // milliseconds
>>
>>   def aggregateEvents(key: String, window: TimeWindow, in: Iterable[Event], out: Collector[EventAgg]): Unit = {
>> var sum = 0
>> for (e <- in) {
>>   sum = sum + e.value
>> }
>> val start = window.getStart
>> val end = window.getEnd
>> val diff = (end - start)
>> println(s" windowId: ${window.hashCode()} currenttime: 
>> ${System.currentTimeMillis()} key:[$key] start: $start end: $end diff: 
>> $diff")
>>
>>
>> out.collect(
>>   new EventAgg(
>> start = window.getStart,
>> end = window.getEnd,
>> key = key,
>> value = sum
>>   )
>> )
>>   }
>>
>>   def main(Args: Array[String]): Unit = {
>> val env = StreamExecutionEnvironment.getExecutionEnvironment
>> //env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)
>> env.setStreamTimeCharacteristic(TimeCharacteristic.ProcessingTime)
>> //env.setStreamTimeCharacteristic(TimeCharacteristic.IngestionTime)
>> val sevents = env.socketTextStream("localhost", 9000)
>> sevents
>>   .map(x => parseEvent(x))
>>   .keyBy(_.key)
>>   
>> .window(TumblingProcessingTimeWindows.of(Time.milliseconds(window_length)))
>>   .apply(aggregateEvents(_, _, _, _: Collector[EventAgg]))
>>   .map("Default Assigner: " + System.currentTimeMillis().toString + " - 
>> " + _.toString)
>>   .print()
>>
>> env.execute("Event time windows")
>>   }
>>
>>   def parseEvent(s: String): Event = {
>> if (s == null || s.trim().length == 0)
>>   Event("default", 0, 0L)
>> else {
>>   val parts = s.split(",")
>>   Event(parts(0), parts(1).toInt, 1L)
>> }
>>   }
>> }
>>
>>
>> *Output*
>>
>>  windowId: -663519360 currenttime: 146523427 key:[a] start:
>> 146523420 end: 146523426 diff: 6
>>  windowId: -663519360 currenttime: 146523426 key:[b] start:
>> 146523420 end: 146523426 diff: 6
>> 3> Default Assigner: 1465234200010 -
>> EventAgg(146523420,146523426,a,3)
>> 7> Default Assigner: 1465234200010 -
>> EventAgg(146523420,146523426,b,4)
>>
>>
>>
>>


Re: Window start and end issue with TumblingProcessingTimeWindows

2016-06-07 Thread Aljoscha Krettek
Hi,
I'm afraid you're running into a bug in the special processing-time
window operator. A suggested workaround would be to switch to
characteristic IngestionTime and use TumblingEventTimeWindows.
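For anyone who wants the workaround spelled out, a minimal self-contained
sketch in the Scala API; the socket source, the key and the 60-second size are
placeholders rather than the exact program from this thread:

import org.apache.flink.streaming.api.TimeCharacteristic
import org.apache.flink.streaming.api.scala._
import org.apache.flink.streaming.api.windowing.assigners.TumblingEventTimeWindows
import org.apache.flink.streaming.api.windowing.time.Time

object IngestionTimeWindows {
  def main(args: Array[String]): Unit = {
    val env = StreamExecutionEnvironment.getExecutionEnvironment
    // ingestion time instead of processing time, as a workaround for FLINK-4028
    env.setStreamTimeCharacteristic(TimeCharacteristic.IngestionTime)

    env.socketTextStream("localhost", 9000)
      .map(line => (line, 1))
      .keyBy(0)
      // event-time tumbling windows, driven here by the ingestion timestamps
      .window(TumblingEventTimeWindows.of(Time.seconds(60)))
      .sum(1)
      .print()

    env.execute("ingestion-time windows sketch")
  }
}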

I also opened a Jira issue for the bug so that we can keep track of it:
https://issues.apache.org/jira/browse/FLINK-4028

Cheers,
Aljoscha

On Tue, 7 Jun 2016 at 14:57 Soumya Simanta  wrote:

> The problem is why is the window end time in the future ?
>
> For example if my window size is 60 seconds and my window is being
> evaluated at 3.00 pm then why is the window end time 3.01 pm and not 3.00
> pm even when the data that is being evaluated falls in the window 2.59 -
> 3.00.
>
> Sent from my iPhone
>
> On Jun 7, 2016, at 3:47 PM, Chesnay Schepler  wrote:
>
> could you state a specific problem?
>
> On 07.06.2016 06:40, Soumya Simanta wrote:
>
> I've a simple program which takes some inputs from a command line (Socket
> stream) and then aggregates based on the key.
>
> When running this program on my local machine I see some output that is
> counter intuitive to my understanding of windows in Flink.
>
> The start time of the Window is around the time the Functions are being
> evaluated. However, *the window end time is around 60 s (window size)
> after the current time (please see below). *
>
> Can someone explain this behaviour please?
>
> import org.apache.flink.api.scala._
> import org.apache.flink.streaming.api.TimeCharacteristic
> import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment
> import org.apache.flink.streaming.api.windowing.assigners.TumblingProcessingTimeWindows
> import org.apache.flink.streaming.api.windowing.time.Time
> import org.apache.flink.streaming.api.windowing.windows.TimeWindow
> import org.apache.flink.util.Collector
> case class EventAgg(start: Long, end: Long, key: String, value: Int)
> object Processor {
>
>   val window_length = 6 // milliseconds
>
>   def aggregateEvents(key: String, window: TimeWindow, in: Iterable[Event], out: Collector[EventAgg]): Unit = {
> var sum = 0
> for (e <- in) {
>   sum = sum + e.value
> }
> val start = window.getStart
> val end = window.getEnd
> val diff = (end - start)
> println(s" windowId: ${window.hashCode()} currenttime: 
> ${System.currentTimeMillis()} key:[$key] start: $start end: $end diff: $diff")
>
>
> out.collect(
>   new EventAgg(
> start = window.getStart,
> end = window.getEnd,
> key = key,
> value = sum
>   )
> )
>   }
>
>   def main(Args: Array[String]): Unit = {
> val env = StreamExecutionEnvironment.getExecutionEnvironment
> //env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)
> env.setStreamTimeCharacteristic(TimeCharacteristic.ProcessingTime)
> //env.setStreamTimeCharacteristic(TimeCharacteristic.IngestionTime)
> val sevents = env.socketTextStream("localhost", 9000)
> sevents
>   .map(x => parseEvent(x))
>   .keyBy(_.key)
>   
> .window(TumblingProcessingTimeWindows.of(Time.milliseconds(window_length)))
>   .apply(aggregateEvents(_, _, _, _: Collector[EventAgg]))
>   .map("Default Assigner: " + System.currentTimeMillis().toString + " - " 
> + _.toString)
>   .print()
>
> env.execute("Event time windows")
>   }
>
>   def parseEvent(s: String): Event = {
> if (s == null || s.trim().length == 0)
>   Event("default", 0, 0L)
> else {
>   val parts = s.split(",")
>   Event(parts(0), parts(1).toInt, 1L)
> }
>   }
> }
>
>
> *Output*
>
>  windowId: -663519360 currenttime: 146523427 key:[a] start:
> 146523420 end: 146523426 diff: 6
>  windowId: -663519360 currenttime: 146523426 key:[b] start:
> 146523420 end: 146523426 diff: 6
> 3> Default Assigner: 1465234200010 -
> EventAgg(146523420,146523426,a,3)
> 7> Default Assigner: 1465234200010 -
> EventAgg(146523420,146523426,b,4)
>
>
>
>


Re: Kafka producer sink message loss?

2016-06-07 Thread Elias Levy
On Tue, Jun 7, 2016 at 4:52 AM, Stephan Ewen  wrote:

> The concern you raised about the sink being synchronous is exactly what my
> last suggestion should address:
>
> The internal state backends can return a handle that can do the sync in a
> background thread. The sink would continue processing messages, and the
> checkpoint would only be acknowledged after the background sync did
> complete.
> We should allow user code to return such a handle as well.
>

Sorry.  Apparently I hadn't had enough coffee and completely missed the
last paragraph of your response.  The async solution you propose seems
ideal.

What message ordering guarantees are you worried about?

I don't think you can do much about guaranteeing message ordering within
Kafka in case of failure, and you'll replay some messages.  And there isn't
any guarantee if you are writing to a Kafka topic with multiple partitions
from multiple sinks using a message key distinct from the key you used in a
keyBy in Flink, as you'll be writing from multiple sink instances in
parallel in what is essentially a shuffle.  It would seem the only ordering
guarantee is if you write from a sink into a Kafka topic using a message
key that is the same as the key used in a keyBy in Flink, and even that
will be violated during a failure and replay by the sink.


Re: Multi-field "sum" function just like "keyBy"

2016-06-07 Thread Gábor Gévay
Ah, sorry, you are right. You could also call keyBy again before the
second sum, but maybe someone else has a better idea.

Best,
Gábor



2016-06-07 16:18 GMT+02:00 Al-Isawi Rami :
> Thanks Gábor, but the first sum call will return
>
> SingleOutputStreamOperator
>
> I could not do another sum call on that. Would you tell me how you managed
> to do
>
> stream.sum().sum()
>
> Regards,
> -Rami
>
> On 7 Jun 2016, at 16:13, Gábor Gévay  wrote:
>
> Hello,
>
> In the case of "sum", you can just specify them one after the other, like:
>
> stream.sum(1).sum(2)
>
> This works, because summing the two fields are independent. However,
> in the case of "keyBy", the information is needed from both fields at
> the same time to produce the key.
>
> Best,
> Gábor
>
>
>
> 2016-06-07 14:41 GMT+02:00 Al-Isawi Rami :
>
> Hi,
>
> Is there any reason why “keyBy" accepts multi-field, while for example “sum”
> does not.
>
> -Rami
> Disclaimer: This message and any attachments thereto are intended solely for
> the addressed recipient(s) and may contain confidential information. If you
> are not the intended recipient, please notify the sender by reply e-mail and
> delete the e-mail (including any attachments thereto) without producing,
> distributing or retaining any copies thereof. Any review, dissemination or
> other use of, or taking of any action in reliance upon, this information by
> persons or entities other than the intended recipient(s) is prohibited.
> Thank you.
>
>
> Disclaimer: This message and any attachments thereto are intended solely for
> the addressed recipient(s) and may contain confidential information. If you
> are not the intended recipient, please notify the sender by reply e-mail and
> delete the e-mail (including any attachments thereto) without producing,
> distributing or retaining any copies thereof. Any review, dissemination or
> other use of, or taking of any action in reliance upon, this information by
> persons or entities other than the intended recipient(s) is prohibited.
> Thank you.


Reading whole files (from S3)

2016-06-07 Thread Andrea Cisternino
Hi all,

I am evaluating Apache Flink for processing large sets of Geospatial data.
The use case I am working on will involve reading a certain number of GPX
files stored on Amazon S3.

GPX files are actually XML files and therefore cannot be read on a line by
line basis.
One GPX file will produce one or more Java objects that will contain the
geospatial data we need to process (mostly a list of geographical points).

To cover this use case I tried to extend the FileInputFormat class:

public class WholeFileInputFormat extends FileInputFormat<String>
{
  private boolean hasReachedEnd = false;

  public WholeFileInputFormat() {
unsplittable = true;
  }

  @Override
  public void open(FileInputSplit fileSplit) throws IOException {
super.open(fileSplit);
hasReachedEnd = false;
  }

  @Override
  public String nextRecord(String reuse) throws IOException {
// uses apache.commons.io.IOUtils
String fileContent = IOUtils.toString(stream, StandardCharsets.UTF_8);
hasReachedEnd = true;
return fileContent;
  }

  @Override
  public boolean reachedEnd() throws IOException {
return hasReachedEnd;
  }
}

This class returns the content of the whole file as a string.

Is this the right approach?
It seems to work when run locally with local files but I wonder if it would
run into problems when tested in a cluster.
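For reference, a minimal sketch of plugging a format like the one above into
the Scala DataSet API; the S3 path and the final parsing step are placeholders:

import org.apache.flink.api.scala._

object GpxJob {
  def main(args: Array[String]): Unit = {
    val env = ExecutionEnvironment.getExecutionEnvironment

    // one String per file; with unsplittable = true each file becomes a single split
    val files: DataSet[String] =
      env.readFile(new WholeFileInputFormat, "s3://my-bucket/gpx/")

    // placeholder for the real GPX parsing into domain objects
    files.map(xml => xml.length).print()
  }
}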

Thanks in advance.
  Andrea.

-- 
Andrea Cisternino, Erlangen, Germany
GitHub: http://github.com/acisternino
GitLab: https://gitlab.com/u/acisternino


Re: Multi-field "sum" function just like "keyBy"

2016-06-07 Thread Al-Isawi Rami
Thanks Gábor, but the first sum call will return

SingleOutputStreamOperator

I could not do another sum call on that. Would you tell me how you managed to do

stream.sum().sum()

Regards,
-Rami

On 7 Jun 2016, at 16:13, Gábor Gévay 
> wrote:

Hello,

In the case of "sum", you can just specify them one after the other, like:

stream.sum(1).sum(2)

This works, because summing the two fields are independent. However,
in the case of "keyBy", the information is needed from both fields at
the same time to produce the key.

Best,
Gábor



2016-06-07 14:41 GMT+02:00 Al-Isawi Rami 
>:
Hi,

Is there any reason why “keyBy" accepts multi-field, while for example “sum” 
does not.

-Rami
Disclaimer: This message and any attachments thereto are intended solely for 
the addressed recipient(s) and may contain confidential information. If you are 
not the intended recipient, please notify the sender by reply e-mail and delete 
the e-mail (including any attachments thereto) without producing, distributing 
or retaining any copies thereof. Any review, dissemination or other use of, or 
taking of any action in reliance upon, this information by persons or entities 
other than the intended recipient(s) is prohibited. Thank you.

Disclaimer: This message and any attachments thereto are intended solely for 
the addressed recipient(s) and may contain confidential information. If you are 
not the intended recipient, please notify the sender by reply e-mail and delete 
the e-mail (including any attachments thereto) without producing, distributing 
or retaining any copies thereof. Any review, dissemination or other use of, or 
taking of any action in reliance upon, this information by persons or entities 
other than the intended recipient(s) is prohibited. Thank you.


Re: Weird Kryo exception (Unable to find class: java.ttil.HashSet)

2016-06-07 Thread Flavio Pompermaier
After a second look at KryoSerializer I fear that Input and Output are
never closed... am I right?

On Tue, Jun 7, 2016 at 3:06 PM, Flavio Pompermaier 
wrote:

> Hi Aljoscha,
> of course I can :)
> Thanks for helping me... do you think calling reset() is the right thing
> to do?
> Actually, I don't know whether this is meaningful or not, but I already
> ran the job successfully once on the cluster (a second attempt is currently
> running) after my accidental modification to the KryoException handling in
> the KryoSerializer.deserialize()...
> My intention was to reset the input buffer calling the clear() method on
> it so I copied the line from above but I forgot to change the variable so I
> called output.clear() instead of input.reset()...
> For this reason I say that I don't know if this is meaningful or not...
>
>
> On Tue, Jun 7, 2016 at 2:50 PM, Aljoscha Krettek 
> wrote:
>
>> That's nice. Can you try it on your cluster with an added "reset" call on
>> the buffer?
>>
>> On Tue, 7 Jun 2016 at 14:35 Flavio Pompermaier 
>> wrote:
>>
>>> After "some" digging into this problem I'm quite convinced that the
>>> problem is caused by a missing reset of the buffer during the Kryo
>>> deserialization,
>>> likewise to what has been fixed by FLINK-2800 (
>>> https://github.com/apache/flink/pull/1308/files).
>>> That fix added an output.clear() in the KryoException handling in
>>> KryoSerializer.serialize() but, for the deserialization part there's no
>>> such a call for the Input/NoFetchingInput object (there's a reset() method
>>> but I don't know whether it is the right one to call..).
>>> Do you think that's reasonable?
>>> Could someone help me in writing a test to see whether this situation is
>>> correctly handled by Flink?
>>> I saw for example that in KryoGenericTypeSerializerTest there's a test
>>> to test the EOFException triggered by the deserialization but it doesn't
>>> test what happens making another call to the serializer after such
>>> Exception occurs (and thus check whether the buffers are correctly cleared
>>> or not).
>>> I'll try to start my testing part from there for the moment if anybody
>>> has no objections..
>>>
>>> Best,
>>> Flavio
>>>
>>>
>>> On Mon, Jun 6, 2016 at 4:08 PM, Ufuk Celebi  wrote:
>>>
 Unless someone really invests time into debugging this I fear that the
 different misspellings are not really helpful, Flavio.

 On Mon, Jun 6, 2016 at 10:31 AM, Flavio Pompermaier
  wrote:
 > This time I had the following exception (obviously
 > it.okkam.flinj.model.pojo.TipoSoggetto should be
 > it.okkam.flink.model.pojo.TipoSoggetto).
 >
 > java.lang.RuntimeException: Cannot instantiate class.
 >   at
 >
 org.apache.flink.api.java.typeutils.runtime.PojoSerializer.deserialize(PojoSerializer.java:407)
 >   at
 >
 org.apache.flink.runtime.plugable.NonReusingDeserializationDelegate.read(NonReusingDeserializationDelegate.java:55)
 >   at
 >
 org.apache.flink.runtime.io.network.api.serialization.SpillingAdaptiveSpanningRecordDeserializer.getNextRecord(SpillingAdaptiveSpanningRecordDeserializer.java:124)
 >   at
 >
 org.apache.flink.runtime.io.network.api.reader.AbstractRecordReader.getNextRecord(AbstractRecordReader.java:65)
 >   at
 >
 org.apache.flink.runtime.io.network.api.reader.MutableRecordReader.next(MutableRecordReader.java:34)
 >   at
 >
 org.apache.flink.runtime.operators.util.ReaderIterator.next(ReaderIterator.java:73)
 >   at
 >
 org.apache.flink.runtime.operators.FlatMapDriver.run(FlatMapDriver.java:101)
 >   at
 org.apache.flink.runtime.operators.BatchTask.run(BatchTask.java:480)
 >   at
 org.apache.flink.runtime.operators.BatchTask.invoke(BatchTask.java:345)
 >   at org.apache.flink.runtime.taskmanager.Task.run(Task.java:559)
 >   at java.lang.Thread.run(Thread.java:745)
 > Caused by: java.lang.ClassNotFoundException:
 > it.okkam.flinj.model.pojo.TipoSoggetto
 >   at java.net.URLClassLoader.findClass(URLClassLoader.java:381)
 >   at java.lang.ClassLoader.loadClass(ClassLoader.java:424)
 >   at java.lang.ClassLoader.loadClass(ClassLoader.java:357)
 >   at java.lang.Class.forName0(Native Method)
 >   at java.lang.Class.forName(Class.java:348)
 >   at
 >
 org.apache.flink.api.java.typeutils.runtime.PojoSerializer.deserialize(PojoSerializer.java:405)
 >   ... 10 more
 >
 >
 >
 > On Wed, Jun 1, 2016 at 5:44 PM, Flavio Pompermaier <
 pomperma...@okkam.it>
 > wrote:
 >>
 >> The last week I've been able to run the job several times without any
 >> error. then I just recompiled it and the error reappered :(
 >> This time I had:
 >>
 >> java.lang.Exception: The data preparation 

Re: Multi-field "sum" function just like "keyBy"

2016-06-07 Thread Gábor Gévay
Hello,

In the case of "sum", you can just specify them one after the other, like:

stream.sum(1).sum(2)

This works, because summing the two fields are independent. However,
in the case of "keyBy", the information is needed from both fields at
the same time to produce the key.

Best,
Gábor



2016-06-07 14:41 GMT+02:00 Al-Isawi Rami :
> Hi,
>
> Is there any reason why “keyBy" accepts multi-field, while for example “sum” 
> does not.
>
> -Rami
> Disclaimer: This message and any attachments thereto are intended solely for 
> the addressed recipient(s) and may contain confidential information. If you 
> are not the intended recipient, please notify the sender by reply e-mail and 
> delete the e-mail (including any attachments thereto) without producing, 
> distributing or retaining any copies thereof. Any review, dissemination or 
> other use of, or taking of any action in reliance upon, this information by 
> persons or entities other than the intended recipient(s) is prohibited. Thank 
> you.


Re: Weird Kryo exception (Unable to find class: java.ttil.HashSet)

2016-06-07 Thread Flavio Pompermaier
Hi Aljoscha,
of course I can :)
Thanks for helping me... do you think calling reset() is the right thing
to do?
Actually, I don't know whether this is meaningful or not, but I already ran
the job successfully once on the cluster (a second attempt is currently
running) after my accidental modification to the KryoException handling in
the KryoSerializer.deserialize()...
My intention was to reset the input buffer calling the clear() method on it
so I copied the line from above but I forgot to change the variable so I
called output.clear() instead of input.reset()...
For this reason I say that I don't know if this is meaningful or not...

On Tue, Jun 7, 2016 at 2:50 PM, Aljoscha Krettek 
wrote:

> That's nice. Can you try it on your cluster with an added "reset" call on
> the buffer?
>
> On Tue, 7 Jun 2016 at 14:35 Flavio Pompermaier 
> wrote:
>
>> After "some" digging into this problem I'm quite convinced that the
>> problem is caused by a missing reset of the buffer during the Kryo
>> deserialization,
>> likewise to what has been fixed by FLINK-2800 (
>> https://github.com/apache/flink/pull/1308/files).
>> That fix added an output.clear() in the KryoException handling in
>> KryoSerializer.serialize() but, for the deserialization part there's no
>> such a call for the Input/NoFetchingInput object (there's a reset() method
>> but I don't know whether it is the right one to call..).
>> Do you think that's reasonable?
>> Could someone help me in writing a test to see whether this situation is
>> correctly handled by Flink?
>> I saw for example that in KryoGenericTypeSerializerTest there's a test to
>> test the EOFException triggered by the deserialization but it doesn't test
>> what happens making another call to the serializer after such Exception
>> occurs (and thus check whether the buffers are correctly cleared or not).
>> I'll try to start my testing part from there for the moment if anybody
>> has no objections..
>>
>> Best,
>> Flavio
>>
>>
>> On Mon, Jun 6, 2016 at 4:08 PM, Ufuk Celebi  wrote:
>>
>>> Unless someone really invests time into debugging this I fear that the
>>> different misspellings are not really helpful, Flavio.
>>>
>>> On Mon, Jun 6, 2016 at 10:31 AM, Flavio Pompermaier
>>>  wrote:
>>> > This time I had the following exception (obviously
>>> > it.okkam.flinj.model.pojo.TipoSoggetto should be
>>> > it.okkam.flink.model.pojo.TipoSoggetto).
>>> >
>>> > java.lang.RuntimeException: Cannot instantiate class.
>>> >   at
>>> >
>>> org.apache.flink.api.java.typeutils.runtime.PojoSerializer.deserialize(PojoSerializer.java:407)
>>> >   at
>>> >
>>> org.apache.flink.runtime.plugable.NonReusingDeserializationDelegate.read(NonReusingDeserializationDelegate.java:55)
>>> >   at
>>> >
>>> org.apache.flink.runtime.io.network.api.serialization.SpillingAdaptiveSpanningRecordDeserializer.getNextRecord(SpillingAdaptiveSpanningRecordDeserializer.java:124)
>>> >   at
>>> >
>>> org.apache.flink.runtime.io.network.api.reader.AbstractRecordReader.getNextRecord(AbstractRecordReader.java:65)
>>> >   at
>>> >
>>> org.apache.flink.runtime.io.network.api.reader.MutableRecordReader.next(MutableRecordReader.java:34)
>>> >   at
>>> >
>>> org.apache.flink.runtime.operators.util.ReaderIterator.next(ReaderIterator.java:73)
>>> >   at
>>> >
>>> org.apache.flink.runtime.operators.FlatMapDriver.run(FlatMapDriver.java:101)
>>> >   at
>>> org.apache.flink.runtime.operators.BatchTask.run(BatchTask.java:480)
>>> >   at
>>> org.apache.flink.runtime.operators.BatchTask.invoke(BatchTask.java:345)
>>> >   at org.apache.flink.runtime.taskmanager.Task.run(Task.java:559)
>>> >   at java.lang.Thread.run(Thread.java:745)
>>> > Caused by: java.lang.ClassNotFoundException:
>>> > it.okkam.flinj.model.pojo.TipoSoggetto
>>> >   at java.net.URLClassLoader.findClass(URLClassLoader.java:381)
>>> >   at java.lang.ClassLoader.loadClass(ClassLoader.java:424)
>>> >   at java.lang.ClassLoader.loadClass(ClassLoader.java:357)
>>> >   at java.lang.Class.forName0(Native Method)
>>> >   at java.lang.Class.forName(Class.java:348)
>>> >   at
>>> >
>>> org.apache.flink.api.java.typeutils.runtime.PojoSerializer.deserialize(PojoSerializer.java:405)
>>> >   ... 10 more
>>> >
>>> >
>>> >
>>> > On Wed, Jun 1, 2016 at 5:44 PM, Flavio Pompermaier <
>>> pomperma...@okkam.it>
>>> > wrote:
>>> >>
>>> >> The last week I've been able to run the job several times without any
>>> >> error. then I just recompiled it and the error reappered :(
>>> >> This time I had:
>>> >>
>>> >> java.lang.Exception: The data preparation for task 'CHAIN CoGroup
>>> (CoGroup
>>> >> at main(DataInference.java:372)) -> Map (Map at
>>> >> writeEntitonPojos(ParquetThriftEntitons.java:170))' , caused an
>>> error: Error
>>> >> obtaining the sorted input: Thread 'SortMerger Reading Thread'
>>> terminated
>>> >> due to an exception: Serializer 

Re: Window start and end issue with TumblingProcessingTimeWindows

2016-06-07 Thread Soumya Simanta
The problem is why is the window end time in the future ? 

For example if my window size is 60 seconds and my window is being evaluated at 
3.00 pm then why is the window end time 3.01 pm and not 3.00 pm even when the 
data that is being evaluated falls in the window 2.59 - 3.00. 

Sent from my iPhone

> On Jun 7, 2016, at 3:47 PM, Chesnay Schepler  wrote:
> 
> could you state a specific problem?
> 
>> On 07.06.2016 06:40, Soumya Simanta wrote:
>> I've a simple program which takes some inputs from a command line (Socket 
>> stream) and then aggregates based on the key. 
>> 
>> When running this program on my local machine I see some output that is 
>> counter intuitive to my understanding of windows in Flink. 
>> 
>> The start time of the Window is around the time the Functions are being 
>> evaluated. However, the window end time is around 60 s (window size) after 
>> the current time (please see below).  
>> 
>> Can someone explain this behaviour please? 
>> import org.apache.flink.api.scala._
>> import org.apache.flink.streaming.api.TimeCharacteristic
>> import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment
>> import 
>> org.apache.flink.streaming.api.windowing.assigners.TumblingProcessingTimeWindows
>> import org.apache.flink.streaming.api.windowing.time.Time
>> import org.apache.flink.streaming.api.windowing.windows.TimeWindow
>> import org.apache.flink.util.Collector
>> 
>> case class EventAgg(start: Long, end: Long, key: String, value: Int)
>> 
>> object Processor {
>> 
>>   val window_length = 6 // milliseconds
>> 
>>   def aggregateEvents(key: String, window: TimeWindow, in: Iterable[Event], 
>> out: Collector[EventAgg]): Unit = {
>> var sum = 0
>> for (e <- in) {
>>   sum = sum + e.value
>> }
>> val start = window.getStart
>> val end = window.getEnd
>> val diff = (end - start)
>> println(s" windowId: ${window.hashCode()} currenttime: 
>> ${System.currentTimeMillis()} key:[$key] start: $start end: $end diff: 
>> $diff")
>> 
>> 
>> out.collect(
>>   new EventAgg(
>> start = window.getStart,
>> end = window.getEnd,
>> key = key,
>> value = sum
>>   )
>> )
>>   }
>> 
>>   def main(Args: Array[String]): Unit = {
>> val env = StreamExecutionEnvironment.getExecutionEnvironment
>> //env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)
>> env.setStreamTimeCharacteristic(TimeCharacteristic.ProcessingTime)
>> //env.setStreamTimeCharacteristic(TimeCharacteristic.IngestionTime)
>> 
>> val sevents = env.socketTextStream("localhost", 9000)
>> sevents
>>   .map(x => parseEvent(x))
>>   .keyBy(_.key)
>>   
>> .window(TumblingProcessingTimeWindows.of(Time.milliseconds(window_length)))
>>   .apply(aggregateEvents(_, _, _, _: Collector[EventAgg]))
>>   .map("Default Assigner: " + System.currentTimeMillis().toString + " - 
>> " + _.toString)
>>   .print()
>> 
>> env.execute("Event time windows")
>>   }
>> 
>>   def parseEvent(s: String): Event = {
>> if (s == null || s.trim().length == 0)
>>   Event("default", 0, 0L)
>> else {
>>   val parts = s.split(",")
>>   Event(parts(0), parts(1).toInt, 1L)
>> }
>>   }
>> }
>> 
>> Output
>> 
>>  windowId: -663519360 currenttime: 146523427 key:[a] start: 
>> 146523420 end: 146523426 diff: 6
>>  windowId: -663519360 currenttime: 146523426 key:[b] start: 
>> 146523420 end: 146523426 diff: 6
>> 3> Default Assigner: 1465234200010 - 
>> EventAgg(146523420,146523426,a,3)
>> 7> Default Assigner: 1465234200010 - 
>> EventAgg(146523420,146523426,b,4)
>> 
>> 
> 


Re: Weird Kryo exception (Unable to find class: java.ttil.HashSet)

2016-06-07 Thread Aljoscha Krettek
That's nice. Can you try it on your cluster with an added "reset" call on
the buffer?

On Tue, 7 Jun 2016 at 14:35 Flavio Pompermaier  wrote:

> After "some" digging into this problem I'm quite convinced that the
> problem is caused by a missing reset of the buffer during the Kryo
> deserialization,
> likewise to what has been fixed by FLINK-2800 (
> https://github.com/apache/flink/pull/1308/files).
> That fix added an output.clear() in the KryoException handling in
> KryoSerializer.serialize() but, for the deserialization part there's no
> such a call for the Input/NoFetchingInput object (there's a reset() method
> but I don't know whether it is the right one to call..).
> Do you think that's reasonable?
> Could someone help me in writing a test to see whether this situation is
> correctly handled by Flink?
> I saw for example that in KryoGenericTypeSerializerTest there's a test to
> test the EOFException triggered by the deserialization but it doesn't test
> what happens making another call to the serializer after such Exception
> occurs (and thus check whether the buffers are correctly cleared or not).
> I'll try to start my testing part from there for the moment if anybody has
> no objections..
>
> Best,
> Flavio
>
>
> On Mon, Jun 6, 2016 at 4:08 PM, Ufuk Celebi  wrote:
>
>> Unless someone really invests time into debugging this I fear that the
>> different misspellings are not really helpful, Flavio.
>>
>> On Mon, Jun 6, 2016 at 10:31 AM, Flavio Pompermaier
>>  wrote:
>> > This time I had the following exception (obviously
>> > it.okkam.flinj.model.pojo.TipoSoggetto should be
>> > it.okkam.flink.model.pojo.TipoSoggetto).
>> >
>> > java.lang.RuntimeException: Cannot instantiate class.
>> >   at
>> >
>> org.apache.flink.api.java.typeutils.runtime.PojoSerializer.deserialize(PojoSerializer.java:407)
>> >   at
>> >
>> org.apache.flink.runtime.plugable.NonReusingDeserializationDelegate.read(NonReusingDeserializationDelegate.java:55)
>> >   at
>> >
>> org.apache.flink.runtime.io.network.api.serialization.SpillingAdaptiveSpanningRecordDeserializer.getNextRecord(SpillingAdaptiveSpanningRecordDeserializer.java:124)
>> >   at
>> >
>> org.apache.flink.runtime.io.network.api.reader.AbstractRecordReader.getNextRecord(AbstractRecordReader.java:65)
>> >   at
>> >
>> org.apache.flink.runtime.io.network.api.reader.MutableRecordReader.next(MutableRecordReader.java:34)
>> >   at
>> >
>> org.apache.flink.runtime.operators.util.ReaderIterator.next(ReaderIterator.java:73)
>> >   at
>> >
>> org.apache.flink.runtime.operators.FlatMapDriver.run(FlatMapDriver.java:101)
>> >   at
>> org.apache.flink.runtime.operators.BatchTask.run(BatchTask.java:480)
>> >   at
>> org.apache.flink.runtime.operators.BatchTask.invoke(BatchTask.java:345)
>> >   at org.apache.flink.runtime.taskmanager.Task.run(Task.java:559)
>> >   at java.lang.Thread.run(Thread.java:745)
>> > Caused by: java.lang.ClassNotFoundException:
>> > it.okkam.flinj.model.pojo.TipoSoggetto
>> >   at java.net.URLClassLoader.findClass(URLClassLoader.java:381)
>> >   at java.lang.ClassLoader.loadClass(ClassLoader.java:424)
>> >   at java.lang.ClassLoader.loadClass(ClassLoader.java:357)
>> >   at java.lang.Class.forName0(Native Method)
>> >   at java.lang.Class.forName(Class.java:348)
>> >   at
>> >
>> org.apache.flink.api.java.typeutils.runtime.PojoSerializer.deserialize(PojoSerializer.java:405)
>> >   ... 10 more
>> >
>> >
>> >
>> > On Wed, Jun 1, 2016 at 5:44 PM, Flavio Pompermaier <
>> pomperma...@okkam.it>
>> > wrote:
>> >>
>> >> The last week I've been able to run the job several times without any
>> >> error. then I just recompiled it and the error reappered :(
>> >> This time I had:
>> >>
>> >> java.lang.Exception: The data preparation for task 'CHAIN CoGroup
>> (CoGroup
>> >> at main(DataInference.java:372)) -> Map (Map at
>> >> writeEntitonPojos(ParquetThriftEntitons.java:170))' , caused an error:
>> Error
>> >> obtaining the sorted input: Thread 'SortMerger Reading Thread'
>> terminated
>> >> due to an exception: Serializer consumed more bytes than the record
>> had.
>> >> This indicates broken serialization. If you are using custom
>> serialization
>> >> types (Value or Writable), check their serialization methods. If you
>> are
>> >> using a Kryo-serialized type, check the corresponding Kryo serializer.
>> >>  at
>> org.apache.flink.runtime.operators.BatchTask.run(BatchTask.java:456)
>> >>  at
>> >> org.apache.flink.runtime.operators.BatchTask.invoke(BatchTask.java:345)
>> >>  at org.apache.flink.runtime.taskmanager.Task.run(Task.java:559)
>> >>  at java.lang.Thread.run(Thread.java:745)
>> >> Caused by: java.lang.RuntimeException: Error obtaining the sorted
>> input:
>> >> Thread 'SortMerger Reading Thread' terminated due to an exception:
>> >> Serializer consumed more bytes than the record had. This indicates
>> broken

Multi-field "sum" function just like "keyBy"

2016-06-07 Thread Al-Isawi Rami
Hi,

Is there any reason why “keyBy" accepts multi-field, while for example “sum” 
does not.

-Rami
Disclaimer: This message and any attachments thereto are intended solely for 
the addressed recipient(s) and may contain confidential information. If you are 
not the intended recipient, please notify the sender by reply e-mail and delete 
the e-mail (including any attachments thereto) without producing, distributing 
or retaining any copies thereof. Any review, dissemination or other use of, or 
taking of any action in reliance upon, this information by persons or entities 
other than the intended recipient(s) is prohibited. Thank you.


Re: Weird Kryo exception (Unable to find class: java.ttil.HashSet)

2016-06-07 Thread Flavio Pompermaier
After "some" digging into this problem I'm quite convinced that the problem
is caused by a missing reset of the buffer during the Kryo deserialization,
likewise to what has been fixed by FLINK-2800 (
https://github.com/apache/flink/pull/1308/files).
That fix added an output.clear() in the KryoException handling in
KryoSerializer.serialize() but, for the deserialization part there's no
such a call for the Input/NoFetchingInput object (there's a reset() method
but I don't know whether it is the right one to call..).
Do you think that's reasonable?
Could someone help me in writing a test to see whether this situation is
correctly handled by Flink?
I saw for example that in KryoGenericTypeSerializerTest there's a test to
test the EOFException triggered by the deserialization but it doesn't test
what happens making another call to the serializer after such Exception
occurs (and thus check whether the buffers are correctly cleared or not).
I'll try to start my testing part from there for the moment if anybody has
no objections..

Best,
Flavio

On Mon, Jun 6, 2016 at 4:08 PM, Ufuk Celebi  wrote:

> Unless someone really invests time into debugging this I fear that the
> different misspellings are not really helpful, Flavio.
>
> On Mon, Jun 6, 2016 at 10:31 AM, Flavio Pompermaier
>  wrote:
> > This time I had the following exception (obviously
> > it.okkam.flinj.model.pojo.TipoSoggetto should be
> > it.okkam.flink.model.pojo.TipoSoggetto).
> >
> > java.lang.RuntimeException: Cannot instantiate class.
> >   at
> >
> org.apache.flink.api.java.typeutils.runtime.PojoSerializer.deserialize(PojoSerializer.java:407)
> >   at
> >
> org.apache.flink.runtime.plugable.NonReusingDeserializationDelegate.read(NonReusingDeserializationDelegate.java:55)
> >   at
> >
> org.apache.flink.runtime.io.network.api.serialization.SpillingAdaptiveSpanningRecordDeserializer.getNextRecord(SpillingAdaptiveSpanningRecordDeserializer.java:124)
> >   at
> >
> org.apache.flink.runtime.io.network.api.reader.AbstractRecordReader.getNextRecord(AbstractRecordReader.java:65)
> >   at
> >
> org.apache.flink.runtime.io.network.api.reader.MutableRecordReader.next(MutableRecordReader.java:34)
> >   at
> >
> org.apache.flink.runtime.operators.util.ReaderIterator.next(ReaderIterator.java:73)
> >   at
> >
> org.apache.flink.runtime.operators.FlatMapDriver.run(FlatMapDriver.java:101)
> >   at
> org.apache.flink.runtime.operators.BatchTask.run(BatchTask.java:480)
> >   at
> org.apache.flink.runtime.operators.BatchTask.invoke(BatchTask.java:345)
> >   at org.apache.flink.runtime.taskmanager.Task.run(Task.java:559)
> >   at java.lang.Thread.run(Thread.java:745)
> > Caused by: java.lang.ClassNotFoundException:
> > it.okkam.flinj.model.pojo.TipoSoggetto
> >   at java.net.URLClassLoader.findClass(URLClassLoader.java:381)
> >   at java.lang.ClassLoader.loadClass(ClassLoader.java:424)
> >   at java.lang.ClassLoader.loadClass(ClassLoader.java:357)
> >   at java.lang.Class.forName0(Native Method)
> >   at java.lang.Class.forName(Class.java:348)
> >   at
> >
> org.apache.flink.api.java.typeutils.runtime.PojoSerializer.deserialize(PojoSerializer.java:405)
> >   ... 10 more
> >
> >
> >
> > On Wed, Jun 1, 2016 at 5:44 PM, Flavio Pompermaier  >
> > wrote:
> >>
> >> The last week I've been able to run the job several times without any
> >> error. then I just recompiled it and the error reappered :(
> >> This time I had:
> >>
> >> java.lang.Exception: The data preparation for task 'CHAIN CoGroup
> (CoGroup
> >> at main(DataInference.java:372)) -> Map (Map at
> >> writeEntitonPojos(ParquetThriftEntitons.java:170))' , caused an error:
> Error
> >> obtaining the sorted input: Thread 'SortMerger Reading Thread'
> terminated
> >> due to an exception: Serializer consumed more bytes than the record had.
> >> This indicates broken serialization. If you are using custom
> serialization
> >> types (Value or Writable), check their serialization methods. If you are
> >> using a Kryo-serialized type, check the corresponding Kryo serializer.
> >>  at
> org.apache.flink.runtime.operators.BatchTask.run(BatchTask.java:456)
> >>  at
> >> org.apache.flink.runtime.operators.BatchTask.invoke(BatchTask.java:345)
> >>  at org.apache.flink.runtime.taskmanager.Task.run(Task.java:559)
> >>  at java.lang.Thread.run(Thread.java:745)
> >> Caused by: java.lang.RuntimeException: Error obtaining the sorted input:
> >> Thread 'SortMerger Reading Thread' terminated due to an exception:
> >> Serializer consumed more bytes than the record had. This indicates
> broken
> >> serialization. If you are using custom serialization types (Value or
> >> Writable), check their serialization methods. If you are using a
> >> Kryo-serialized type, check the corresponding Kryo serializer.
> >>  at
> >>
> 

Re: Does Flink allows for encapsulation of transformations?

2016-06-07 Thread Greg Hogan
"The question is how to encapsulate numerous transformations into one
object or may be a function in Apache Flink Java setting."

Implement CustomUnaryOperation. This can then be applied to a DataSet by
calling `DataSet result = DataSet.runOperation(new MyOperation<>(...));`.
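For illustration, a minimal sketch of a CustomUnaryOperation; since the
interface belongs to the Java DataSet API, the sketch calls the Java classes
from Scala, and the operation itself (doubling longs) is just a stand-in for a
longer chain of transformations:

import org.apache.flink.api.common.functions.MapFunction
import org.apache.flink.api.java.operators.CustomUnaryOperation
import org.apache.flink.api.java.{DataSet, ExecutionEnvironment}

// hypothetical operation that encapsulates a (here trivial) chain of transformations
class DoubleValues extends CustomUnaryOperation[java.lang.Long, java.lang.Long] {
  private var input: DataSet[java.lang.Long] = _

  override def setInput(inputData: DataSet[java.lang.Long]): Unit = {
    input = inputData
  }

  override def createResult(): DataSet[java.lang.Long] = {
    // the encapsulated transformations go here; they reuse the input's environment
    input.map(new MapFunction[java.lang.Long, java.lang.Long] {
      override def map(v: java.lang.Long): java.lang.Long = java.lang.Long.valueOf(v * 2)
    })
  }
}

object RunOperationSketch {
  def main(args: Array[String]): Unit = {
    val env = ExecutionEnvironment.getExecutionEnvironment
    val numbers: DataSet[java.lang.Long] = env.generateSequence(1, 10)

    // runOperation hands the DataSet to the operation and returns its result
    val doubled = numbers.runOperation(new DoubleValues)
    doubled.print()
  }
}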

On Mon, Jun 6, 2016 at 3:14 PM, Ser Kho  wrote:

> The question is how to encapsulate numerous transformations into one
> object or may be a function in Apache Flink Java setting. I have tried to
> investigate this question using an example of Pi calculation (see below). I
> am wondering whether or not the suggested approach is valid from the
> Flink's point of view. It works on one computer, however, I do not know how
> it will behave in a cluster setup. The code is given below, and the main
> idea behind it as follows:
>
>1. Create a class, named classPI, which method compute() does all data
>transformations, see more about it below.
>2. In the main method create a DataSet as in *DataSet< classPI > opi =
>env.fromElements(new classPI());*
>3. Create *DataSet< Double > PI*, which equals output of
>transformation map() that calls the object PI's method compute() as in
>*DataSet< Double > PI = opi.map(new MapFunction< classPI , Double>() {
>public Double map(classPI objPI) { return objPI.compute(); }});*
>4. Now about ClassPI
>- Constructor instantiates ExecutionEnvironment, which is local for
>   this class, as in
>   *public classPI(){ this.NumIter=100; env =
>   ExecutionEnvironment.getExecutionEnvironment();}*
>
> Thus, the code has two ExecutionEnvironment objects: one in main and
> another in the class classPI.
>
   >- Has method compute() that runs all data transformations (in this
   >example it is just several lines but potentially it might contain tons of
   >Flink transformations)
>
> *public Double compute(){ DataSet count = env.generateSequence(1, NumIter)
>.map(new Sampler()) .reduce(new SumReducer()); PI =
>4.0*count.collect().get(0)/NumIter; return PI;}*
>
> the whole code is given below. Again, the question is if this is a valid
> approach for encapsulation of data transformation into a class in Flink
> setup that is supposed to be parallelizable to work on a cluster. Is there
> a better way to hide details of data transformations?
> Thanks a lot!
>
> -The code --
>
> public class PiEstimation{
>
>  public static void main(String[] args) throws Exception {
>   // this is one ExecutionEnvironment
>   final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
>   // this is the critical DataSet with my classPI that computes PI
>   DataSet<classPI> opi = env.fromElements(new classPI());
>   // this map calls the method compute() of class classPI that computes PI
>   DataSet<Double> PI = opi.map(new MapFunction<classPI, Double>() {
>     public Double map(classPI objPI) throws Exception {
>       // this is how I call method compute() that calculates PI using transformations
>       return objPI.compute(); } });
>
>   double pi = PI.collect().get(0);
>   System.out.println("We estimate Pi to be: " + pi);   }
>
> // this class is of no importance for my question, however, it is relevant for pi calculation
> public static class Sampler implements MapFunction<Long, Long> {
>   @Override
>   public Long map(Long value) {
>     double x = Math.random();
>     double y = Math.random();
>     return (x * x + y * y) < 1 ? 1L : 0L;}}
>
> // this class is of no importance for my question, however, it is relevant for pi calculation
> public static final class SumReducer implements ReduceFunction<Long> {
>   @Override
>   public Long reduce(Long value1, Long value2) {
>     return value1 + value2;}}
>
> // this is my class that computes PI, my question is whether such a class is
> // valid in Flink on a cluster with parallel computation
> public static final class classPI {
>    public Integer NumIter;
>    private final ExecutionEnvironment env;
>    public Double PI;
>
>    // this is the constructor with another ExecutionEnvironment
>    public classPI(){
>      this.NumIter=100;
>      env = ExecutionEnvironment.getExecutionEnvironment();
>    }
>    // This is the method that contains all data transformations
>    public Double compute() throws Exception{
>      DataSet<Long> count = env.generateSequence(1, NumIter)
>        .map(new Sampler())
>        .reduce(new SumReducer());
>      PI = 4.0*count.collect().get(0)/NumIter;
>      return PI;}}}
>
>


Re: Does Flink allows for encapsulation of transformations?

2016-06-07 Thread Chesnay Schepler
1a. ah. yeah i see how it could work, but i wouldn't count on it in a 
cluster.
you would (most likely) run the sub-job (calculating pi) only on a 
single node.


1b. different execution environments generally imply different flink 
programs.


2. sure it does, since it's a normal flink job. yours on the other hand 
doesn't, since the job calculating PI only runs on a single TaskManager.


3. there are 2 ways. you can either chain jobs like this: (effectively 
running 2 flink programs in succession)


public static void main(String[] args) throws Exception {
    double pi = new classPI().compute();
    System.out.println("We estimate Pi to be: " + pi);
    new classThatNeedsPI().computeWhatever(pi); // feeds pi into an env.fromElements call and proceeds from there
}


or (if all building blocks are flink programs) build a single job:

public static void main(String[] args) throws Exception {
    ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
    DataSet<Double> pi = new classPI(env).compute();
    new classThatNeedsPI(env).computeWhatever(pi); // append your transformations to pi
    env.execute();
}
...
public DataSet<Double> compute() throws Exception {
    return this.env.generateSequence(1, NumIter)
        .map(new Sampler())
        .reduce(new SumReducer())
        .map(/* return 4 * x */);
}
...
public ? computeWhatever(DataSet<Double> pi) throws Exception { ... }



On 07.06.2016 13:35, Ser Kho wrote:

Chesnay:
1a. The code actually works, that is the point.
1b. What restricts a Flink program from having several execution 
environments?

2. I am not sure that your modification allows for parallelism. Does it?
3. This code is a simple example of writing/organizing large and 
complicated programs, where the result of this pi needs to be used in 
other DataSet transformations beyond classPI(). What to do in this case?

Thanks a lot for the suggestions.


On Tuesday, June 7, 2016 6:15 AM, Chesnay Schepler 
 wrote:



from what i can tell from your code you are trying to execute a job 
within a job. This just doesn't work.


your main method should look like this:

public static void main(String[] args) throws Exception {
    double pi = new classPI().compute();
    System.out.println("We estimate Pi to be: " + pi);
}




On 06.06.2016 21:14, Ser Kho wrote:
The question is how to encapsulate numerous transformations into one 
object or may be a function in Apache Flink Java setting. I have 
tried to investigate this question using an example of Pi calculation 
(see below). I am wondering whether or not the suggested approach is 
valid from the Flink's point of view. It works on one computer, 
however, I do not know how it will behave in a cluster setup. The 
code is given below, and the main idea behind it as follows:


 1. Create a class, named classPI, which method compute() does all
data transformations, see more about it below.
 2. In the main method create a DataSet as in *DataSet< classPI > opi
= env.fromElements(new classPI());*
3.
Create *DataSet< Double > PI*, which equals output of
transformation map() that calls the object PI's method compute()
as in
*DataSet< Double > PI = opi.map(new MapFunction< classPI ,
Double>() { public Double map(classPI objPI) { return
objPI.compute(); }});*
4.
Now about ClassPI
 *
Constructor instantiates ExecutionEnvironment, which is local
for this class, as in
*public classPI(){ this.NumIter=100; env =
ExecutionEnvironment.getExecutionEnvironment();}*

Thus, the code has two ExecutionEnvironment objects: one in main and 
another in the class classPI.


 *
Has method compute() that runs all data transformations (in this
example it is just several lines but potentially it might contain
tons of Flink transformations)
*public Double compute(){ DataSet count = env.generateSequence(1,
NumIter) .map(new Sampler()) .reduce(new SumReducer()); PI =
4.0*count.collect().get(0)/NumIter;
return PI;}*

the whole code is given below. Again, the question is if this is a 
valid approach for encapsulation of data transformation into a class 
in Flink setup that is supposed to be parallelizable to work on a 
cluster. Is there a better way to hide details of data transformations?

Thanks a lot!

-The code --

public class PiEstimation {
  public static void main(String[] args) throws Exception {
    // this is one ExecutionEnvironment
    final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
    // this is the critical DataSet with my classPI that computes PI
    DataSet<classPI> opi = env.fromElements(new classPI());
    // this map calls the method compute() of class classPI that computes PI
    DataSet<Double> PI = opi.map(new MapFunction<classPI, Double>() {
      public Double map(classPI objPI) throws Exception {
        // this is how I call method compute() that calculates PI using transformations
        return objPI.compute();
      }
    });
    double pi = PI.collect().get(0);
    System.out.println("We estimate Pi to be: " + pi);
  }
  // this class is of no importance for my question, however, it is

Re: Kafka producer sink message loss?

2016-06-07 Thread Stephan Ewen
Hi Elias!

The concern you raised about the sink being synchronous is exactly what my
last suggestion should address:

The internal state backends can return a handle that can do the sync in a
background thread. The sink would continue processing messages, and the
checkpoint would only be acknowledged after the background sync did
complete.
We should allow user code to return such a handle as well.

We have to think about implications concerning message order, though...

Greetings,
Stephan


On Mon, Jun 6, 2016 at 11:58 PM, Elias Levy 
wrote:

> On Sun, Jun 5, 2016 at 3:16 PM, Stephan Ewen  wrote:
>
>> You raised a good point. Fortunately, there should be a simple way to fix
>> this.
>>
>> The Kafka Sink Function should implement the "Checkpointed" interface. It
>> will get a call to the "snapshotState()" method whenever a checkpoint
>> happens. Inside that call, it should then sync on the callbacks, and only
>> return once all have completed. It may return null (no need to store
>> anything in the checkpoint).
>>
>> While the "Checkpointed" method has not returned, the checkpoint will not
>> complete. That way, there will be a "synchronization" point per checkpoint.
>>
>> We can even improve this further in the future: The checkpoint method can
>> return an async state handle. While the async state handle completes its
>> "wait for callbacks" in the background (and only acks the checkpoint after
>> that has complete), the sink function can continue processing.
>>
>> What do you think?
>>
>
> I opened FLINK-4027  to
> track the issue.
>
> That seems like an acceptable solution.  Presumably an exception can be
> raised in snapshotState() if there is a Kafka publishing error when calling
> flush() on the Kafka producer, which will cause the checkpoint to fail.
>
> I do wonder what sort of performance penalty using flush() will incur, as
> it is a synchronous call.  I assume no other messages can be processed by
> the sink while inside snapshotState().  In theory a sink could continue
> processing messages, so long as it kept track of pending messages that
> occurred before the barrier and responded to the snapshotState() call when
> there no longer were any pending messages from before the barrier.
>
>
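
To make the idea concrete, here is a rough sketch of a flush-on-checkpoint sink along
the lines discussed above. It is an assumption-laden illustration, not the actual
FlinkKafkaProducer: the class name, topic/serialization handling and error bookkeeping
are invented, the producer config is assumed to contain byte-array serializers, and
producer.flush() needs a Kafka 0.9+ client. Only Checkpointed, RichSinkFunction and the
Kafka producer calls shown are real APIs.

import java.io.Serializable;
import java.util.Properties;
import java.util.concurrent.atomic.AtomicReference;

import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.checkpoint.Checkpointed;
import org.apache.flink.streaming.api.functions.sink.RichSinkFunction;
import org.apache.kafka.clients.producer.Callback;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.clients.producer.RecordMetadata;

public class FlushingKafkaSink extends RichSinkFunction<String>
        implements Checkpointed<Serializable> {

    private final String topic;
    private final Properties producerConfig; // assumed to contain byte[] serializers

    private transient KafkaProducer<byte[], byte[]> producer;
    private transient AtomicReference<Exception> asyncError;

    public FlushingKafkaSink(String topic, Properties producerConfig) {
        this.topic = topic;
        this.producerConfig = producerConfig;
    }

    @Override
    public void open(Configuration parameters) {
        producer = new KafkaProducer<>(producerConfig);
        asyncError = new AtomicReference<>();
    }

    @Override
    public void invoke(String value) {
        // Asynchronous send; the callback only records broker-side failures.
        producer.send(new ProducerRecord<byte[], byte[]>(topic, value.getBytes()),
                new Callback() {
                    @Override
                    public void onCompletion(RecordMetadata metadata, Exception exception) {
                        if (exception != null) {
                            asyncError.compareAndSet(null, exception);
                        }
                    }
                });
    }

    @Override
    public Serializable snapshotState(long checkpointId, long checkpointTimestamp) throws Exception {
        // Block until all in-flight sends are acknowledged, so the checkpoint
        // only completes once everything before the barrier is in Kafka.
        producer.flush();
        Exception error = asyncError.get();
        if (error != null) {
            throw new Exception("Kafka send failed before checkpoint " + checkpointId, error);
        }
        return null; // nothing needs to be stored in the checkpoint itself
    }

    @Override
    public void restoreState(Serializable state) {
        // nothing to restore
    }

    @Override
    public void close() {
        if (producer != null) {
            producer.close();
        }
    }
}

As noted above, flush() blocks the sink inside snapshotState(); the async state handle
idea would move that wait into the background so processing can continue.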


Re: Does Flink allows for encapsulation of transformations?

2016-06-07 Thread Ser Kho
Chesnay:
1a. The code actually works, that is the point.
1b. What restricts a Flink program from having several execution environments?
2. I am not sure that your modification allows for parallelism. Does it?
3. This code is a simple example of writing/organizing large and complicated 
programs, where the result of this pi needs to be used in other DataSet 
transformations beyond classPI(). What to do in this case?
Thanks a lot for the suggestions. 

On Tuesday, June 7, 2016 6:15 AM, Chesnay Schepler  
wrote:
 

  from what i can tell from your code you are trying to execute a job within a 
job. This just doesn't work.
 
 your main method should look like this:
 
 public static void main(String[] args) throws Exception {
   double pi = new classPI().compute();
   System.out.println("We estimate Pi to be: " + pi);
 }
 
 
 On 06.06.2016 21:14, Ser Kho wrote:
  
  The question is how to encapsulate numerous transformations into one object 
or may be a function in Apache Flink Java setting. I have tried to investigate 
this question using an example of Pi calculation (see below). I am wondering 
whether or not the suggested approach is valid from the Flink's point of view. 
It works on  one computer, however, I do not know how it will behave in a 
cluster setup. The code is given below, and the main idea behind it as follows: 
   
   - Create a class, named classPI, which method compute() does all data 
transformations, see more about it below.
   - In the main method create a DataSet as in DataSet< classPI > opi = 
env.fromElements(new classPI());
   -  Create DataSet< Double > PI, which equals output of transformation map() 
that calls the object PI's method compute() as in DataSet< Double > PI = 
opi.map(new MapFunction< classPI , Double>() { public Double map(classPI objPI) 
{ return objPI.compute(); }}); 
   -  Now about ClassPI   
  -  Constructor instantiates ExecutionEnvironment, which is local for this 
class, as in public classPI(){ this.NumIter=100; env = 
ExecutionEnvironment.getExecutionEnvironment();} 
 
 Thus, the code has two ExecutionEnvironment objects: one in main and another 
in the class classPI.
   -  Has method compute() that runs all data transformations (in this example 
it is just several lines but potentially it might contain tons of Flink 
transformations) public Double compute(){ DataSet<Long> count = 
env.generateSequence(1, NumIter) .map(new Sampler()) .reduce(new SumReducer()); 
PI = 4.0*count.collect().get(0)/NumIter;   
 return PI;} 
 the whole code is given below. Again, the question is if this is a valid 
approach for encapsulation of data transformation into a class in Flink setup 
that is supposed to be parallelizable to work on a cluster. Is there a better 
way to hide details of data transformations? Thanks a lot! 
  -The code -- 
  public class PiEstimation {

  public static void main(String[] args) throws Exception {
    // this is one ExecutionEnvironment
    final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
    // this is the critical DataSet with my classPI that computes PI
    DataSet<classPI> opi = env.fromElements(new classPI());
    // this map calls the method compute() of class classPI that computes PI
    DataSet<Double> PI = opi.map(new MapFunction<classPI, Double>() {
      public Double map(classPI objPI) throws Exception {
        // this is how I call method compute() that calculates PI using transformations
        return objPI.compute(); } });

    double pi = PI.collect().get(0);
    System.out.println("We estimate Pi to be: " + pi);
  }

  // this class is of no importance for my question, however, it is relevant for pi calculation
  public static class Sampler implements MapFunction<Long, Long> {
    @Override
    public Long map(Long value) {
      double x = Math.random();
      double y = Math.random();
      return (x * x + y * y) < 1 ? 1L : 0L;}}

  // this class is of no importance for my question, however, it is relevant for pi calculation
  public static final class SumReducer implements ReduceFunction<Long> {
    @Override
    public Long reduce(Long value1, Long value2) {
      return value1 + value2;}}

  // this is my class that computes PI, my question is whether such a class is
  // valid in Flink on a cluster with parallel computation
  public static final class classPI {
    public Integer NumIter;
    private final ExecutionEnvironment env;
    public Double PI;

    // this is the constructor with another ExecutionEnvironment
    public classPI(){
      this.NumIter=100;
      env = ExecutionEnvironment.getExecutionEnvironment();
    }
    // This is the method that contains all data transformations
    public Double compute() throws Exception{
      DataSet<Long> count = env.generateSequence(1, NumIter)
        .map(new Sampler())
        .reduce(new SumReducer());
      PI = 4.0*count.collect().get(0)/NumIter;
      return PI;}}}
 
 

Re: Window start and end issue with TumblingProcessingTimeWindows

2016-06-07 Thread Chesnay Schepler

could you state a specific problem?

On 07.06.2016 06:40, Soumya Simanta wrote:
I've a simple program which takes some inputs from a command line 
(Socket stream) and then aggregates based on the key.


When running this program on my local machine I see some output that 
is counter intuitive to my understanding of windows in Flink.


The start time of the Window is around the time the Functions are 
being evaluated. However, *the window end time is around 60 s (window 
size) after the current time (please see below). *


Can someone explain this behaviour please?
import org.apache.flink.api.scala._
import org.apache.flink.streaming.api.TimeCharacteristic
import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment
import 
org.apache.flink.streaming.api.windowing.assigners.TumblingProcessingTimeWindows
import org.apache.flink.streaming.api.windowing.time.Time
import org.apache.flink.streaming.api.windowing.windows.TimeWindow
import org.apache.flink.util.Collector

case class EventAgg(start: Long, end: Long, key:String, value: Int)

object Processor {

  val window_length = 60000 // milliseconds

  def aggregateEvents(key: String, window: TimeWindow, in: Iterable[Event], out: Collector[EventAgg]): Unit = {
    var sum = 0
    for (e <- in) {
      sum = sum + e.value
    }
    val start = window.getStart
    val end = window.getEnd
    val diff = (end - start)
    println(s" windowId: ${window.hashCode()} currenttime: ${System.currentTimeMillis()} key:[$key] start: $start end: $end diff: $diff")

    out.collect(
      new EventAgg(
        start = window.getStart,
        end = window.getEnd,
        key = key,
        value = sum
      )
    )
  }

  def main(Args: Array[String]): Unit = {
    val env = StreamExecutionEnvironment.getExecutionEnvironment
    //env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)
    env.setStreamTimeCharacteristic(TimeCharacteristic.ProcessingTime)
    //env.setStreamTimeCharacteristic(TimeCharacteristic.IngestionTime)

    val sevents = env.socketTextStream("localhost", 9000)
    sevents
      .map(x => parseEvent(x))
      .keyBy(_.key)
      .window(TumblingProcessingTimeWindows.of(Time.milliseconds(window_length)))
      .apply(aggregateEvents(_, _, _, _: Collector[EventAgg]))
      .map("Default Assigner: " + System.currentTimeMillis().toString + " - " + _.toString)
      .print()

    env.execute("Event time windows")
  }

  def parseEvent(s: String): Event = {
    if (s == null || s.trim().length == 0)
      Event("default", 0, 0L)
    else {
      val parts = s.split(",")
      Event(parts(0), parts(1).toInt, 1L)
    }
  }
}

*_Output_*

 windowId: -663519360 currenttime: 146523427 key:[a] start: 
146523420 end: 146523426 diff: 6
 windowId: -663519360 currenttime: 146523426 key:[b] start: 
146523420 end: 146523426 diff: 6
3> Default Assigner: 1465234200010 - 
EventAgg(146523420,146523426,a,3)
7> Default Assigner: 1465234200010 - 
EventAgg(146523420,146523426,b,4)
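
For what it's worth, a minimal sketch (in Java, and an assumption about the assigner's
behaviour rather than something stated in this thread) of the alignment behind those
numbers: processing-time tumbling windows are aligned to multiples of the window size,
so the window an element falls into starts at or before "now" and ends up to one window
length in the future.

public class WindowAlignmentSketch {
    public static void main(String[] args) {
        long windowSizeMs = 60000L;                    // 60 s tumbling window
        long now = System.currentTimeMillis();         // processing time when the element arrives
        long windowStart = now - (now % windowSizeMs); // rounded down to a multiple of the size
        long windowEnd = windowStart + windowSizeMs;   // up to one window length after "now"
        System.out.println("now=" + now + " start=" + windowStart + " end=" + windowEnd);
    }
}

That would explain an end timestamp roughly one window size ahead of the time at which
the element was processed.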







Re: Does Flink allows for encapsulation of transformations?

2016-06-07 Thread Chesnay Schepler
from what i can tell from your code you are trying to execute a job 
within a job. This just doesn't work.


your main method should look like this:

public static void main(String[] args) throws Exception {
    double pi = new classPI().compute();
    System.out.println("We estimate Pi to be: " + pi);
}





On 06.06.2016 21:14, Ser Kho wrote:
The question is how to encapsulate numerous transformations into one 
object or may be a function in Apache Flink Java setting. I have tried 
to investigate this question using an example of Pi calculation (see 
below). I am wondering whether or not the suggested approach is valid 
from the Flink's point of view. It works on one computer, however, I 
do not know how it will behave in a cluster setup. The code is given 
below, and the main idea behind it as follows:


 1. Create a class, named classPI, which method compute() does all
data transformations, see more about it below.
 2. In the main method create a DataSet as in *DataSet< classPI > opi
= env.fromElements(new classPI());*
3.
Create *DataSet< Double > PI*, which equals output of
transformation map() that calls the object PI's method compute() as in
*DataSet< Double > PI = opi.map(new MapFunction< classPI ,
Double>() { public Double map(classPI objPI) { return
objPI.compute(); }});*
4.
Now about ClassPI
 *
Constructor instantiates ExecutionEnvironment, which is local
for this class, as in
*public classPI(){ this.NumIter=100; env =
ExecutionEnvironment.getExecutionEnvironment();}*

Thus, the code has two ExecutionEnvironment objects: one in main and 
another in the class classPI.


 *
Has method compute() that runs all data transformations (in this
example it is just several lines but potentially it might contain
tons of Flink transformations)
*public Double compute(){ DataSet count = env.generateSequence(1,
NumIter) .map(new Sampler()) .reduce(new SumReducer()); PI =
4.0*count.collect().get(0)/NumIter;
return PI;}*

the whole code is given below. Again, the question is if this is a 
valid approach for encapsulation of data transformation into a class 
in Flink setup that is supposed to be parallelizable to work on a 
cluster. Is there a better way to hide details of data transformations?

Thanks a lot!

-The code --

public class PiEstimation {
  public static void main(String[] args) throws Exception {
    // this is one ExecutionEnvironment
    final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
    // this is the critical DataSet with my classPI that computes PI
    DataSet<classPI> opi = env.fromElements(new classPI());
    // this map calls the method compute() of class classPI that computes PI
    DataSet<Double> PI = opi.map(new MapFunction<classPI, Double>() {
      public Double map(classPI objPI) throws Exception {
        // this is how I call method compute() that calculates PI using transformations
        return objPI.compute();}});
    double pi = PI.collect().get(0);
    System.out.println("We estimate Pi to be: " + pi);}

  // this class is of no importance for my question, however, it is relevant for pi calculation
  public static class Sampler implements MapFunction<Long, Long> {
    @Override
    public Long map(Long value) {
      double x = Math.random();
      double y = Math.random();
      return (x * x + y * y) < 1 ? 1L : 0L;}}

  // this class is of no importance for my question, however, it is relevant for pi calculation
  public static final class SumReducer implements ReduceFunction<Long> {
    @Override
    public Long reduce(Long value1, Long value2) {
      return value1 + value2;}}

  // this is my class that computes PI, my question is whether such a class is
  // valid in Flink on a cluster with parallel computation
  public static final class classPI {
    public Integer NumIter;
    private final ExecutionEnvironment env;
    public Double PI;

    // this is the constructor with another ExecutionEnvironment
    public classPI() {
      this.NumIter = 100;
      env = ExecutionEnvironment.getExecutionEnvironment();}

    // This is the method that contains all data transformations
    public Double compute() throws Exception {
      DataSet<Long> count = env.generateSequence(1, NumIter)
        .map(new Sampler())
        .reduce(new SumReducer());
      PI = 4.0 * count.collect().get(0) / NumIter;
      return PI;}}}




Re: Logs show `Marking the coordinator 2147483637 dead` in Flink-Kafka conn

2016-06-07 Thread Sendoh
Hi Robert,

Thank you for checking the issue. That INFO is the only information Flink
workers say. 

I agree with your point of view. It looks like it closes the connections to all
other topics that are not used (idle), although the message is a bit misleading.

Ref: https://github.com/edenhill/librdkafka/issues/437

Best,

Sendoh




--
View this message in context: 
http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/Logs-show-Marking-the-coordinator-2147483637-dead-in-Flink-Kafka-conn-tp7396p7420.html
Sent from the Apache Flink User Mailing List archive. mailing list archive at 
Nabble.com.


Re: Periodically evicting internal states when using mapWithState()

2016-06-07 Thread Aljoscha Krettek
Hi Jack,
right now this is not possible except when writing a custom operator. We
are working on support for a time-to-live setting on states, this should
solve your problem.

For writing a custom operator, check out DataStream.transform() and
StreamMap, which is the operator implementation for Map. Please let me know
if you have any further questions.

Best,
Aljoscha

On Tue, 7 Jun 2016 at 03:05 Jack Huang  wrote:

> Hi all,
>
> I have an incoming stream of event objects, each with its session ID. I am
> writing a task that aggregate the events by session. The general logics
> looks like
>
> case class Event(sessionId: Int, data: String)
> case class Session(id: Int, var events: List[Event])
>
> val events = ... // some source
> events
>   .keyBy((event: Event) => event.sessionId)
>   .mapWithState((event: Event, state: Option[Session]) => {
>     val session = state.getOrElse(Session(id = event.sessionId, events = List()))
>     session.events = session.events :+ event
>     (session, Some(session))
>   })
>
> The problem is that there is no reliable way of knowing the end of a
> session, since events are likely to get lost. If I keep this process
> running, the number of stored sessions will keep growing until it fills up
> the disk.
>
> Is there a recommended way of periodically evicting sessions that are too
> old (e.g. a day old)?
>
>
>
> Thanks,
> Jack
>
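
Until such a time-to-live setting exists, one possible stop-gap (sketched in Java rather
than Scala, and different from the custom-operator route Aljoscha describes) is lazy
eviction on access: keep the session together with a last-seen timestamp in keyed state
and clear it when the next event for that key arrives after the cut-off. All class and
field names below are illustrative, and keys that never receive another event are still
never cleaned up, which is exactly why the operator/TTL approach is the real fix.

import java.util.ArrayList;
import java.util.List;

import org.apache.flink.api.common.functions.RichFlatMapFunction;
import org.apache.flink.api.common.state.ValueState;
import org.apache.flink.api.common.state.ValueStateDescriptor;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.util.Collector;

// usage (assumed): events.keyBy("sessionId").flatMap(new SessionAggregator())
public class SessionAggregator extends RichFlatMapFunction<SessionAggregator.Event, SessionAggregator.Session> {

    private static final long MAX_AGE_MS = 24L * 60 * 60 * 1000; // one day

    private transient ValueState<TimestampedSession> state;

    @Override
    public void open(Configuration parameters) {
        state = getRuntimeContext().getState(
                new ValueStateDescriptor<>("session", TimestampedSession.class, null));
    }

    @Override
    public void flatMap(Event event, Collector<Session> out) throws Exception {
        long now = System.currentTimeMillis();
        TimestampedSession current = state.value();

        // Drop a session that has been idle for too long instead of extending it.
        if (current != null && now - current.lastSeen > MAX_AGE_MS) {
            state.clear();
            current = null;
        }
        if (current == null) {
            current = new TimestampedSession(new Session(event.sessionId, new ArrayList<Event>()));
        }

        current.session.events.add(event);
        current.lastSeen = now;
        state.update(current);
        out.collect(current.session);
    }

    // Simple POJOs standing in for the case classes from the question.
    public static class Event {
        public int sessionId;
        public String data;
    }

    public static class Session {
        public int id;
        public List<Event> events;
        public Session() {}
        public Session(int id, List<Event> events) { this.id = id; this.events = events; }
    }

    public static class TimestampedSession {
        public Session session;
        public long lastSeen;
        public TimestampedSession() {}
        public TimestampedSession(Session session) {
            this.session = session;
            this.lastSeen = System.currentTimeMillis();
        }
    }
}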


Re: Custom keyBy(), look for similaties

2016-06-07 Thread iñaki williams
Thanks for your answer Ufuk.

However, I have been reading about KeySelector and I don't understand
completely how it works with my idea.

I am using an algorithm that gives me a score between some different
strings. My idea is: if the score is higher than 0.80, for example, then
those two strings will be considered the same, and when I apply
keyBy("name") those similar strings will be keyed as if they had the exact
same name.
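
As a concrete reading of Ufuk's point below, here is a small sketch (in Java) of what
"knowing the similarities upfront" could look like: the similarity decision (score >=
0.80) is turned into a precomputed, deterministic canonical key, because a KeySelector
must return the same key for the same element every time. The class name and the example
entries are made up.

import java.util.HashMap;
import java.util.Map;

import org.apache.flink.api.java.functions.KeySelector;

public class CanonicalNameKeySelector implements KeySelector<String, String> {

    // Precomputed from the similarity scores (e.g. >= 0.80 means "same entity").
    private final Map<String, String> canonicalNames = new HashMap<>();

    public CanonicalNameKeySelector() {
        canonicalNames.put("john l", "john locke");
        canonicalNames.put("john locke", "john locke");
    }

    @Override
    public String getKey(String name) {
        String normalized = name.toLowerCase().trim();
        // Fall back to the normalized name itself if no canonical form is known.
        return canonicalNames.getOrDefault(normalized, normalized);
    }
}

// usage on both streams (assumed to be DataStream<String> of names):
//   stream.keyBy(new CanonicalNameKeySelector())

Keying both streams with the same selector means "John L" and "John Locke" end up in the
same key group.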

On Monday, June 6, 2016, Ufuk Celebi  wrote:

> Hey Iñaki,
>
> you can use the KeySelector as described here:
>
> https://ci.apache.org/projects/flink/flink-docs-release-1.0/apis/common/index.html#specifying-keys
>
> But you only have a local view for the current element, e.g. the library
> you use to determine the similarity has to know the similarities
> upfront.
>
> – Ufuk
>
>
> On Mon, Jun 6, 2016 at 9:31 AM, iñaki williams  > wrote:
> > Hi guys,
> >
> > I am using Flink on my project and I have a question. (I am using Java)
> >
> > Is it possible to modify the keyby method in order to key by similarities
> > and not by the exact name?
> >
> > Example: I recieve 2 DataStreams, in the first one , the name of the
> field
> > that I want to KeyBy is "John Locke", while in the Datastream the field
> > value is "John L". Can I use some java library to find for similarities
> > between strings and if the similitude is high, then key those elements
> > together.
>