Re: load + update global state

2017-08-07 Thread Tzu-Li (Gordon) Tai
Hi Peter!

One thing I’d like to understand first after reading about your use case:
Why exactly do you need the lookup table to be globally accessible? From what I 
understand, you are using this lookup table for stream event enrichment, so whatever 
processing you need to perform downstream on this enriched stream would already have 
the corresponding information for each session attached.

Regarding a solution for efficient stream enriching in your case:
In your case, the enrichment data comes from the input events themselves, so it can 
be fairly straightforward: key the stream by session id and use a RichMapFunction (or 
RichFlatMapFunction) that keeps the lookup table as managed keyed state [1].
By using RocksDB as your state backend [2], the table would not be held in memory, so 
your state size is only bounded by disk size. Each state access is scoped to the 
currently processed key (i.e., in your case the session id), meaning that you'd only 
be accessing the email set of that session.
With RocksDB as the state backend, each state access and update requires 
de-/serialization (of the state of a single key), but that is always local access and 
in general outperforms remote lookups to an external store.
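
To make this concrete, here is a minimal sketch of such an enrichment operator. The 
Request/Enriched types and their sessionId/getEmail() accessors are hypothetical 
placeholders for your own event classes; the keyBy assumes a POJO field named 
sessionId:

DataStream<Enriched> enriched = requests
    .keyBy("sessionId")   // co-partition events and state by session id
    .flatMap(new RichFlatMapFunction<Request, Enriched>() {

        private transient ValueState<Set<String>> emailsState;

        @Override
        public void open(Configuration parameters) {
            emailsState = getRuntimeContext().getState(
                new ValueStateDescriptor<>("emails",
                    TypeInformation.of(new TypeHint<Set<String>>() {})));
        }

        @Override
        public void flatMap(Request r, Collector<Enriched> out) throws Exception {
            Set<String> emails = emailsState.value();            // local (RocksDB) state access
            if (emails == null) {
                emails = new HashSet<>();
            }
            out.collect(new Enriched(r, new HashSet<>(emails))); // enrich with emails seen so far
            if (r.getEmail() != null) {                          // newly discovered candidate
                emails.add(r.getEmail());
                emailsState.update(emails);                      // update goes into checkpointed state
            }
        }
    });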

So, to wrap this up, the answers to your doubts, when using Flink, would be:

(1) loading the state as a whole from the data store into memory is a huge burden on 
memory (also making changes cluster-wide visible is an issue) 
Apart from the “cluster-wide visibility” aspect, which needs to be clarified, you can 
use RocksDB as the state backend so that the state is not kept in memory.
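
A minimal sketch of switching the backend (the checkpoint URI below is a made-up 
example):

StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
// working state is then kept in local RocksDB instances, bounded only by local disk
env.setStateBackend(new RocksDBStateBackend("hdfs:///flink/checkpoints"));

Alternatively, the backend can be configured globally in flink-conf.yaml via 
state.backend: rocksdb.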

(2) not loading into memory but using something like cassandra / redis as a 
lookup store would certainly work but introduces a lot of network requests 
(possible ideas: use a distributed cache? broadcast updates in flink cluster?) 
Remote lookups are not required if you keep the lookup store as managed keyed state 
in Flink. All session lookups would be local state accesses. You can think of it as 
setting up a K-V store within Flink that is always co-partitioned by session id with 
your incoming events.

(3) how should I integrate the changes to the table with flink's checkpointing? 
Simply by registering managed keyed state. Flink handles checkpointing it for fault 
tolerance and ensures exactly-once semantics for the state. The “Working with State” 
docs [1] should cover that quite well!


Hope this helps :)

Cheers,
Gordon

[1] 
https://ci.apache.org/projects/flink/flink-docs-release-1.3/dev/stream/state.html#using-managed-keyed-state
[2] 
https://ci.apache.org/projects/flink/flink-docs-release-1.3/ops/state_backends.html#the-rocksdbstatebackend


On 8 August 2017 at 3:00:57 AM, Peter Ertl (peter.e...@gmx.net) wrote:

Hi folks,  

I am coding a streaming task that processes HTTP requests from our web site and 
enriches them with additional information.

The lookup table contains session ids from historic requests and the related emails 
that were used within these sessions in the past.


lookup - hashtable: session_id: String => emails: Set[String]  


During processing of these NEW HTTP requests

- the lookup table should be used to get previous emails and enrich the current 
stream item
- new candidates for the lookup table will be discovered during processing of these 
items and should be added to the lookup table (these changes should also be visible 
across the cluster)

I see at least the following issues:  

(1) loading the state as a whole from the data store into memory is a huge burden on 
memory (also making changes cluster-wide visible is an issue)

(2) not loading into memory but using something like cassandra / redis as a 
lookup store would certainly work but introduces a lot of network requests 
(possible ideas: use a distributed cache? broadcast updates in flink cluster?)  

(3) how should I integrate the changes to the table with flink's checkpointing? 
 

I really can't figure out how to solve this best, and my current solution is far from 
elegant.

So is there any best practice for supporting "large lookup tables that change during 
stream processing"?

Cheers  
Peter  






Re: Flink streaming Parallelism

2017-08-07 Thread Tzu-Li (Gordon) Tai
Hi,

The equivalent would be setting a parallelism on your sink operator, e.g. 
stream.addSink(…).setParallelism(…).
By default, the parallelism of all operators in the pipeline is whatever parallelism 
was set for the whole job, unless parallelism is explicitly set for a specific 
operator. For more details on the distributed runtime concepts you can take a look at 
[1].

        I saw the implementation of the Elasticsearch sink in Flink which can do 
batching of messages before writes. How can I batch data based on custom logic? 
E.g., batch writes grouped on one of the message keys. This is possible in Storm via 
FieldGrouping.
The equivalent of partitioning streams in Flink is `stream.keyBy(…)`. All messages of 
the same key would then go to the same parallel downstream operator instance. If it's 
an ElasticsearchSink, then following a keyBy all messages of the same key will be 
batched by the same Elasticsearch writer.
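
As a rough sketch (kafkaStream, the Message type, the messageKey field and 
elasticsearchSink are placeholders for your own pipeline):

kafkaStream                          // DataStream<Message> coming from your Kafka source
    .keyBy("messageKey")             // the equivalent of Storm's FieldGrouping
    .addSink(elasticsearchSink)      // your ElasticsearchSink<Message>
    .setParallelism(16);             // e.g. more ES writer subtasks than Kafka consumers

The sink parallelism is set independently of the source parallelism, so you can run 
more ES writers than Kafka consumer subtasks.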

Hope this helps!

Cheers,
Gordon

[1] 
https://ci.apache.org/projects/flink/flink-docs-release-1.3/concepts/runtime.html


On 8 August 2017 at 8:58:30 AM, Jakes John (jakesjohn12...@gmail.com) wrote:

       I am coming from the Apache Storm world. I am planning to switch from Storm 
to Flink. I was reading the Flink documentation but couldn't find some requirements 
in Flink which were present in Storm.

I need to have a streaming pipeline Kafka -> Flink -> Elasticsearch. In Storm, I have 
seen that I can specify the number of tasks per bolt. Typically databases are slow on 
writes, and hence I need more writers to the database. Reading from Kafka is pretty 
fast when compared to ES writes. This means that I need to have more ES writer tasks 
than Kafka consumers. How can I achieve this in Flink? What are the concepts in Flink 
similar to Storm parallelism concepts like workers, executors, tasks?
        I saw the implementation of the Elasticsearch sink in Flink which can do 
batching of messages before writes. How can I batch data based on custom logic? 
E.g., batch writes grouped on one of the message keys. This is possible in Storm via 
FieldGrouping. But I couldn't find an equivalent way to do grouping in Flink and 
control the overall number of writes to ES.

Please help me with the above questions and some pointers on Flink parallelism.





Flink streaming Parallelism

2017-08-07 Thread Jakes John
   I am coming from the Apache Storm world. I am planning to switch from
Storm to Flink. I was reading the Flink documentation but couldn't find some
requirements in Flink which were present in Storm.

I need to have a streaming pipeline Kafka -> Flink -> Elasticsearch. In
Storm, I have seen that I can specify the number of tasks per bolt. Typically
databases are slow on writes, and hence I need more writers to the
database. Reading from Kafka is pretty fast when compared to ES writes.
This means that I need to have more ES writer tasks than Kafka consumers.
How can I achieve this in Flink? What are the concepts in Flink similar to
Storm parallelism concepts like workers, executors, tasks?
I saw the implementation of the Elasticsearch sink in Flink which can
do batching of messages before writes. How can I batch data based on
custom logic? E.g., batch writes grouped on one of the message keys.
This is possible in Storm via FieldGrouping. But I couldn't find an
equivalent way to do grouping in Flink and control the overall number of
writes to ES.

Please help me with the above questions and some pointers on Flink parallelism.


Re: Experiencing long latency while using sockets

2017-08-07 Thread Chao Wang
Following the original post, I've tried stripping my Flink app down to only the 
following, and it still exhibits long latencies: after the second source socket 
write, it took 90+ milliseconds from the data source to the socket-front in Flink. I 
would like to ask for pointers on how to investigate a latency issue like this, and 
in general how to properly benchmark Flink latencies. Thank you very much!



The main method:


  public static void main(String[] args) throws Exception {
    final StreamExecutionEnvironment env =
        StreamExecutionEnvironment.getExecutionEnvironment();
    DataStream<EventGroup> inEventGroupStream =
        env.addSource(new SocketEventGroupStreamFunction(6065, 512));
    inEventGroupStream.writeToSocket("DestHost", 6066, new MySeGroup());

    env.execute("event processing");
  }


where all the custom classes are as follows (for 
serialization/deserialization and socket server functionality):



  public static class MySeGroup implements SerializationSchema<EventGroup> {

    @Override
    public byte[] serialize(EventGroup arg0) {
      int tLength = EKFFFTAES.getSizeTimepoint();
      //Note: report error if tLength != arg0.getT().length
      if (tLength != arg0.getT().length) {
        System.out.println("Serialization error: Timepoint size discrepancy.");
        System.out.println("tLength = " + tLength);
        System.out.println("arg0.getT().length = " + arg0.getT().length);
      }
      byte[] buffer = new byte[1 + arg0.getT().length + arg0.getP().length];

      buffer[0] = arg0.type;
      System.arraycopy(arg0.getT(), 0, buffer, 1, tLength);
      System.arraycopy(arg0.getP(), 0, buffer, 1 + tLength, arg0.getP().length);

      return buffer;
    }
  }

  public static class Event extends SimpleImmutableEntry<byte[], byte[]> {

Event(byte[] timestamp, byte[] payload){
  super(timestamp, payload);
}
public byte[] getT() { // get the timestamp
  return getKey();
}
public byte[] getP() { // get the payload
  return getValue();
}
  }

  public static class EventGroup extends Event {
public byte type;
EventGroup(byte type, byte[] timestamp, byte[] payload){
  super(timestamp, payload);
  this.type = type;
}
  }


  public static class SocketEventGroupStreamFunction implements SourceFunction<EventGroup> {


private transient ServerSocket serverSocket;
private int serverPort;
private int dataLength;
private byte[] inbuf;
private byte[] timestamp;
private byte[] payload;
private int tLength = EKFFFTAES.getSizeTimepoint();
private volatile boolean isRunning = true;

public SocketEventGroupStreamFunction(int port, int length) {
  serverPort = port;
  dataLength = length;
  inbuf = new byte[1 + dataLength + tLength];
  timestamp = new byte[tLength];
  payload = new byte[dataLength];
}

@Override
public void run(SourceContext<EventGroup> ctx) throws Exception {
  while(isRunning) {
serverSocket = new ServerSocket(serverPort, 100, 
InetAddress.getByName("192.168.1.13"));

serverSocket.setSoTimeout(100);
System.out.println("Waiting for incoming connections on port " +
  serverSocket.getLocalPort() + "...");
Socket server = serverSocket.accept();

System.out.println("Just connected to " + 
server.getRemoteSocketAddress());

DataInputStream in = new DataInputStream(server.getInputStream());

while(isRunning) {
  in.readFully(inbuf, 0, inbuf.length);
  System.arraycopy(inbuf, 1, timestamp, 0, tLength);
  System.arraycopy(inbuf, 1+tLength, payload, 0, dataLength);

  System.out.print("Got an event " + inbuf[0] + ": ");
  displayElapsedTime(timestamp);

  ctx.collect(new EventGroup(inbuf[0], timestamp, payload));
}
  }
}

@Override
public void cancel() {
  isRunning = false;
  ServerSocket theSocket = this.serverSocket;
  if (theSocket != null) {
try {
  theSocket.close();
}catch(SocketTimeoutException s) {
  System.out.println("Socket timed out!");
}catch(IOException e) {
  e.printStackTrace();
}
  }
}
  }


and finally, EKFFFTAES is my C++ library implementing the timestamping 
facility:



int timePointLength = sizeof(std::chrono::system_clock::time_point);

JNIEXPORT jint JNICALL Java_eventProcessing_EKFFFTAES_getSizeTimepoint
  (JNIEnv *, jclass)
{
  return ::timePointLength;
}

JNIEXPORT void JNICALL Java_eventProcessing_EKFFFTAES_displayElapsedTime
  (JNIEnv *env, jclass, jbyteArray inArray)
{
  std::chrono::system_clock::time_point end =
std::chrono::system_clock::now();
  jbyte *inCArray = env->GetByteArrayElements(inArray, NULL);
  std::chrono::system_clock::time_point start;
  std::memcpy(&start, inCArray, ::timePointLength);
  std::cout << std::chrono::duration_cast<std::chrono::microseconds>(end - start).count()
            << std::endl;

}


Thank you,

Chao


Experiencing long latency while using sockets

2017-08-07 Thread Chao Wang

Hi,

I have been trying to benchmark the end-to-end latency of a Flink 1.3.1 
application, but got confused regarding the amount of time spent in Flink. In my 
setting, the data source and data sink reside on separate machines, as in the 
following topology:


Machine 1                           Machine 2    Machine 3
data source (via a socket client)   ->  Flink -> data sink (via a socket server)


I observed 200-400 milliseconds of end-to-end latency, while the execution time of my 
stream transformations took no more than two milliseconds, the socket-only networking 
latency between machines was no more than one millisecond, and I used ptpd so that 
the clock offset between machines was also no more than one millisecond.


Question: What took those hundreds of milliseconds?

Here are the details of my setting and my observation so far:

On Machine 2, I implemented a socket server as a data source to Flink (by 
implementing SourceFunction), and I split the incoming stream into several streams 
(via SplitStream) for some transformations (implementing MapFunction and 
CoFlatMapFunction), where the results were fed to a socket (using writeToSocket). I 
used C++11's chrono time library (through JNI) to take timestamps and determine the 
elapsed time, and I have verified that the overhead of timestamping this way is no 
more than one millisecond.


I observed that for four consecutive writes from Machine 1, with no more than 0.3 
milliseconds between writes, Flink on Machine 2 got the first write within 0.2 
milliseconds, but then it took 90 milliseconds for Flink to get the next write, 
another 4 milliseconds for the third write, and yet another 4 milliseconds for the 
fourth write.


And then it took more than 70 milliseconds before Flink started 
processing my plan's first stream transformation. And after my last 
transformation, it took more than 70 milliseconds before the result was 
received at Machine 3.



Thank you,

Chao




load + update global state

2017-08-07 Thread Peter Ertl
Hi folks,

I am coding a streaming task that processes HTTP requests from our web site and 
enriches them with additional information.

The lookup table contains session ids from historic requests and the related emails 
that were used within these sessions in the past.


lookup - hashtable: session_id: String => emails: Set[String]


During processing of these NEW HTTP requests

- the lookup table should be used to get previous emails and enrich the current 
stream item
- new candidates for the lookup table will be discovered during processing of these 
items and should be added to the lookup table (these changes should also be visible 
across the cluster)

I see at least the following issues:

(1) loading the state as a whole from the data store into memory is a huge burden on 
memory (also making changes cluster-wide visible is an issue)

(2) not loading into memory but using something like cassandra / redis as a 
lookup store would certainly work but introduces a lot of network requests 
(possible ideas: use a distributed cache? broadcast updates in flink cluster?)

(3) how should I integrate the changes to the table with flink's checkpointing?

I really can't figure out how to solve this best, and my current solution is far from 
elegant.

So is there any best practice for supporting "large lookup tables that change during 
stream processing"?

Cheers
Peter






Re: [EXTERNAL] Re: schema to just read as "byte[] array" from kafka

2017-08-07 Thread Raja . Aravapalli

Thank you very much Chao. That helps me.


Regards,
Raja.

From: Chao Wang 
Date: Monday, August 7, 2017 at 12:28 PM
To: Raja Aravapalli 
Cc: "user@flink.apache.org" 
Subject: [EXTERNAL] Re: schema to just read as "byte[] array" from kafka


A quick update, in class MyDe:
public static class MyDe extends AbstractDeserializationSchema<byte[]> {
  @Override
  public byte[] deserialize(byte[] arg0) {
    // Perform deserialization here, if needed;
    // otherwise, probably we can simply return arg0 as raw byte[]
    return arg0;
  }
}


Chao
On 08/07/2017 12:23 PM, Chao Wang wrote:

Hi Raja,

I just happened to work on a similar thing, and here is how to do it in 
general, I think (in my case, I did a bit more, to deserialize a tuple):
FlinkKafkaConsumer010<byte[]> consumer = new 
FlinkKafkaConsumer010<>("topic_name", new MyDe(), properties);

and for MyDe the schema:

public static class MyDe extends AbstractDeserializationSchema {
  @Override
  public byte[] deserialize(byte[] arg0) {
return new e;
  }
}


Chao
On 08/07/2017 10:47 AM, Raja.Aravapalli wrote:

Hi

I am using SimpleStringSchema to deserialize a message read from Kafka, but need some 
help to know if there is any schema available I can use rather than 
“SimpleStringSchema()” to just get “byte[]” without any deserialization happening!

Below is the code I am currently using; instead of SimpleStringSchema(), which gives 
me Strings, I want a raw byte array byte[]:

FlinkKafkaConsumer08<String> myConsumer = new 
FlinkKafkaConsumer08<>("xxx_topic", new SimpleStringSchema(), properties);


Thanks a lot.


Regards,
Raja.





Re: schema to just read as "byte[] array" from kafka

2017-08-07 Thread Chao Wang

A quick update, in class MyDe:

public static class MyDe extends AbstractDeserializationSchema<byte[]> {
  @Override
  public byte[] deserialize(byte[] arg0) {
    // Perform deserialization here, if needed;
    // otherwise, probably we can simply return arg0 as raw byte[]
    return arg0;
  }
}
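
For Raja's case, a usage sketch with the consumer from the original post would then be:

FlinkKafkaConsumer08<byte[]> myConsumer = new 
FlinkKafkaConsumer08<>("xxx_topic", new MyDe(), properties);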


Chao

On 08/07/2017 12:23 PM, Chao Wang wrote:


Hi Raja,

I just happened to work on a similar thing, and here is how to do it 
in general, I think (in my case, I did a bit more, to deserialize a 
tuple):


FlinkKafkaConsumer010<byte[]> consumer = new 
FlinkKafkaConsumer010<>("topic_name", new MyDe(), properties);


and for MyDe the schema:

public static class MyDe extends 
AbstractDeserializationSchema {

  @Override
  public byte[] deserialize(byte[] arg0) {
return new e;
  }
}


Chao

On 08/07/2017 10:47 AM, Raja.Aravapalli wrote:


Hi

I am using SimpleStringSchema to deserialize a message read from 
Kafka, but need some help to know if there is any schema available I 
can use rather than “SimpleStringSchema()” to just get 
“byte[]” without any deserialization happening!


Below is the code I am currently using; instead of 
SimpleStringSchema(), which gives me Strings, I want a raw 
byte array byte[]:


FlinkKafkaConsumer08<String> myConsumer = new 
FlinkKafkaConsumer08<>("xxx_topic", new SimpleStringSchema(), properties);


Thanks a lot.

Regards,

Raja.







Re: schema to just read as "byte[] array" from kafka

2017-08-07 Thread Chao Wang

Hi Raja,

I just happened to work on a similar thing, and here is how to do it 
in general, I think (in my case, I did a bit more, to deserialize a 
tuple):


FlinkKafkaConsumer010<byte[]> consumer = new 
FlinkKafkaConsumer010<>("topic_name", new MyDe(), properties);


and for MyDe the schema:

public static class MyDe extends AbstractDeserializationSchema {
  @Override
  public byte[] deserialize(byte[] arg0) {
return new e;
  }
}


Chao

On 08/07/2017 10:47 AM, Raja.Aravapalli wrote:


Hi

I am using SimpleStringSchema to deserialize a message read from 
Kafka, but need some help to know if there is any schema available I 
can use rather than “SimpleStringSchema()” to just get 
“byte[]” without any deserialization happening!


Below is the code I am currently using; instead of 
SimpleStringSchema(), which gives me Strings, I want a raw 
byte array byte[]:


FlinkKafkaConsumer08<String> myConsumer = new 
FlinkKafkaConsumer08<>("xxx_topic", new SimpleStringSchema(), properties);


Thanks a lot.

Regards,

Raja.





Re: WaterMark & Eventwindow not fired correctly

2017-08-07 Thread aitozi

Hi,

My Flink version is 1.2.

I have been working on this problem for the last few days. Below is what I found.

When I use "assignTimestampsAndWatermarks" with the same parallelism (240) as the
preceding operator, and that preceding operator has two inputs (it is a "connected"
Co-FlatMap operator with parallelism 240), I run into the situation that the
watermark does not advance.

Then I looked into the source code: StreamTwoInputProcessor.java#processInput,
called by TwoInputStreamTask, has processElement1() and processElement2() methods,
but neither of them runs processElement as StreamInputProcessor does to extract the
timestamp (as done in TimestampsAndPeriodicWatermarksOperator).

As a result, the timestamp is not updated, and my watermark is updated just as in
the class BoundedOutOfOrdernessTimestampExtractor.

So, is it a bug that the timestamp is not updated when dealing with a two-input
stream?

PS: My English is not very good, I hope you can understand me :)

thanks,
aitozi





Re: Flink REST API async?

2017-08-07 Thread Eron Wright
When you submit a program via the REST API, the main method executes inside the
JobManager process. Unfortunately, a static variable is used to establish the
execution environment that the program obtains from
`ExecutionEnvironment.getExecutionEnvironment()`. From the stack trace it appears
that two main methods are executing simultaneously and one is corrupting the other.

On Mon, Aug 7, 2017 at 8:21 AM, Francisco Gonzalez Barea <
francisco.gonza...@piksel.com> wrote:

> Hi there!
>
> We are doing some POCs submitting jobs remotely to Flink. We tried with
> the Flink CLI and now we're testing the REST API.
>
> So the point is that when we try to execute a set of requests in an async
> way (using CompletableFutures) only a couple of them run successfully. For
> the rest we get the exception copied at the end of the email.
>
> Do you know the reason for this?
>
> Thanks in advance!!
> Regards,
>
> org.apache.flink.client.program.ProgramInvocationException: The main
> method caused an error.
>   at org.apache.flink.client.program.PackagedProgram.callMainMethod(
> PackagedProgram.java:545)
>   at org.apache.flink.client.program.PackagedProgram.
> invokeInteractiveModeForExecution(PackagedProgram.java:419)
>   at org.apache.flink.client.program.OptimizerPlanEnvironment.
> getOptimizedPlan(OptimizerPlanEnvironment.java:80)
>  at org.apache.flink.client.program.ClusterClient.getOptimizedPlan(
> ClusterClient.java:318)
>  at org.apache.flink.runtime.webmonitor.handlers.JarActionHandler.
> getJobGraphAndClassLoader(JarActionHandler.java:72)
>  at org.apache.flink.runtime.webmonitor.handlers.JarRunHandler.
> handleJsonRequest(JarRunHandler.java:61)
>  at org.apache.flink.runtime.webmonitor.handlers.
> AbstractJsonRequestHandler.handleRequest(AbstractJsonRequestHandler.
> java:41)
>  at org.apache.flink.runtime.webmonitor.RuntimeMonitorHandler.
> respondAsLeader(RuntimeMonitorHandler.java:109)
>  at org.apache.flink.runtime.webmonitor.RuntimeMonitorHandlerBase.
> channelRead0(RuntimeMonitorHandlerBase.java:97)
>  at org.apache.flink.runtime.webmonitor.RuntimeMonitorHandlerBase.
> channelRead0(RuntimeMonitorHandlerBase.java:44)
>  at io.netty.channel.SimpleChannelInboundHandler.channelRead(
> SimpleChannelInboundHandler.java:105)
>  at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(
> AbstractChannelHandlerContext.java:339)
>  at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(
> AbstractChannelHandlerContext.java:324)
>  at io.netty.handler.codec.http.router.Handler.routed(Handler.java:62)
>  at io.netty.handler.codec.http.router.DualAbstractHandler.channelRead0(
> DualAbstractHandler.java:57)
>  at io.netty.handler.codec.http.router.DualAbstractHandler.channelRead0(
> DualAbstractHandler.java:20)
>  at io.netty.channel.SimpleChannelInboundHandler.channelRead(
> SimpleChannelInboundHandler.java:105)
>  at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(
> AbstractChannelHandlerContext.java:339)
>  at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(
> AbstractChannelHandlerContext.java:324)
>  at org.apache.flink.runtime.webmonitor.HttpRequestHandler.channelRead0(
> HttpRequestHandler.java:159)
>  at org.apache.flink.runtime.webmonitor.HttpRequestHandler.channelRead0(
> HttpRequestHandler.java:65)
>  at io.netty.channel.SimpleChannelInboundHandler.channelRead(
> SimpleChannelInboundHandler.java:105)
>  at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(
> AbstractChannelHandlerContext.java:339)
>  at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(
> AbstractChannelHandlerContext.java:324)
>  at io.netty.channel.ChannelInboundHandlerAdapter.channelRead(
> ChannelInboundHandlerAdapter.java:86)
>  at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(
> AbstractChannelHandlerContext.java:339)
>  at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(
> AbstractChannelHandlerContext.java:324)
>  at io.netty.handler.codec.ByteToMessageDecoder.channelRead(
> ByteToMessageDecoder.java:242)
>  at io.netty.channel.CombinedChannelDuplexHandler.channelRead(
> CombinedChannelDuplexHandler.java:147)
>  at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(
> AbstractChannelHandlerContext.java:339)
>  at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(
> AbstractChannelHandlerContext.java:324)
>  at io.netty.channel.DefaultChannelPipeline.fireChannelRead(
> DefaultChannelPipeline.java:847)
>  at io.netty.channel.nio.AbstractNioByteChannel$NioByteUnsafe.read(
> AbstractNioByteChannel.java:131)
>  at io.netty.channel.nio.NioEventLoop.processSelectedKey(
> NioEventLoop.java:511)
>  at io.netty.channel.nio.NioEventLoop.processSelectedKeysOptimized(
> NioEventLoop.java:468)
>  at io.netty.channel.nio.NioEventLoop.processSelectedKeys(
> NioEventLoop.java:382)
>  at io.netty.channel.nio.NioEventLoop.run(NioEventLoop.java:354)
>  at io.netty.util.concurrent.SingleThreadEventExecutor$2.
> 

schema to just read as "byte[] array" from kafka

2017-08-07 Thread Raja . Aravapalli

Hi

I am using SimpleStringSchema to deserialize a message read from Kafka, but need some 
help to know if there is any schema available I can use rather than 
“SimpleStringSchema()” to just get “byte[]” without any deserialization happening!

Below is the code I am currently using; instead of SimpleStringSchema(), which gives 
me Strings, I want a raw byte array byte[]:

FlinkKafkaConsumer08<String> myConsumer = new 
FlinkKafkaConsumer08<>("xxx_topic", new SimpleStringSchema(), properties);


Thanks a lot.


Regards,
Raja.


Re: [EXTERNAL] Re: Help required - "BucketingSink" usage to write HDFS Files

2017-08-07 Thread Raja . Aravapalli
Thanks very much for the pointers Vinay. That helps ☺


-Raja.

From: vinay patil 
Date: Monday, August 7, 2017 at 1:56 AM
To: "user@flink.apache.org" 
Subject: Re: [EXTERNAL] Re: Help required - "BucketingSink" usage to write HDFS 
Files

Hi Raja,

That is why they are in the pending state. You can enable checkpointing by 
setting env.enableCheckpointing().

After doing this, they will not remain in the pending state.

Check this out : 
https://ci.apache.org/projects/flink/flink-docs-release-1.3/api/java/org/apache/flink/streaming/connectors/fs/bucketing/BucketingSink.html
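
A minimal sketch (the checkpoint interval is an arbitrary example; pick one that fits 
your latency/overhead trade-off):

StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
// with checkpointing enabled, the BucketingSink moves finished part files
// from .pending to their final state once the corresponding checkpoint completes
env.enableCheckpointing(60000); // checkpoint every 60 seconds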

Regards,
Vinay Patil

On Mon, Aug 7, 2017 at 9:15 AM, Raja.Aravapalli [via Apache Flink User Mailing 
List archive.] <[hidden 
email]> wrote:
Hi Vinay,

Thanks for the response.

I have NOT enabled any checkpointing.

Files are rolling out correctly at every 2 MB, but the files remain as below:

-rw-r--r--   3 2097424 2017-08-06 21:10 ////Test/part-0-0.pending
-rw-r--r--   3 1431430 2017-08-06 21:12 ////Test/part-0-1.pending


Regards,
Raja.

From: vinay patil <[hidden 
email]>
Date: Sunday, August 6, 2017 at 10:40 PM
To: "[hidden email]" 
<[hidden email]>
Subject: [EXTERNAL] Re: Help required - "BucketingSink" usage to write HDFS 
Files

Hi Raja,

Have you enabled checkpointing?
The files will be rolled to complete state when the batch size is reached (in 
your case 2 MB) or when the bucket is inactive for a certain amount of time.

Regards,
Vinay Patil

On Mon, Aug 7, 2017 at 7:53 AM, Raja.Aravapalli [via Apache Flink User Mailing 
List archive.] <[hidden email]> wrote:

Hi,

I am working on a POC to write HDFS files using the BucketingSink class. Even though 
the data is being written to HDFS files, the files are left with a “.pending” suffix 
on HDFS.


Below is the code I am using. Can someone please help me identify the issue and help 
me fix this?


BucketingSink HdfsSink = new BucketingSink("hdfs://///Test/");
HdfsSink.setBucketer(new DateTimeBucketer("-MM-dd--HHmm"));
HdfsSink.setBatchSize(1024 * 1024 * 2); // this is 2 MB
HdfsSink.setInactiveBucketCheckInterval(1L);
HdfsSink.setInactiveBucketThreshold(1L);


Thanks a lot.


Regards,
Raja.





Flink REST API async?

2017-08-07 Thread Francisco Gonzalez Barea
Hi there!

We are doing some POCs submitting jobs remotely to Flink. We tried with the Flink 
CLI and now we're testing the REST API.

So the point is that when we try to execute a set of requests in an async way 
(using CompletableFutures) only a couple of them run successfully. For the rest 
we get the exception copied at the end of the email.

Do you know the reason for this?

Thanks in advance!!
Regards,

org.apache.flink.client.program.ProgramInvocationException: The main method 
caused an error.
  at 
org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:545)
  at 
org.apache.flink.client.program.PackagedProgram.invokeInteractiveModeForExecution(PackagedProgram.java:419)
  at 
org.apache.flink.client.program.OptimizerPlanEnvironment.getOptimizedPlan(OptimizerPlanEnvironment.java:80)
 at 
org.apache.flink.client.program.ClusterClient.getOptimizedPlan(ClusterClient.java:318)
 at 
org.apache.flink.runtime.webmonitor.handlers.JarActionHandler.getJobGraphAndClassLoader(JarActionHandler.java:72)
 at 
org.apache.flink.runtime.webmonitor.handlers.JarRunHandler.handleJsonRequest(JarRunHandler.java:61)
 at 
org.apache.flink.runtime.webmonitor.handlers.AbstractJsonRequestHandler.handleRequest(AbstractJsonRequestHandler.java:41)
 at 
org.apache.flink.runtime.webmonitor.RuntimeMonitorHandler.respondAsLeader(RuntimeMonitorHandler.java:109)
 at 
org.apache.flink.runtime.webmonitor.RuntimeMonitorHandlerBase.channelRead0(RuntimeMonitorHandlerBase.java:97)
 at 
org.apache.flink.runtime.webmonitor.RuntimeMonitorHandlerBase.channelRead0(RuntimeMonitorHandlerBase.java:44)
 at 
io.netty.channel.SimpleChannelInboundHandler.channelRead(SimpleChannelInboundHandler.java:105)
 at 
io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:339)
 at 
io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:324)
 at io.netty.handler.codec.http.router.Handler.routed(Handler.java:62)
 at 
io.netty.handler.codec.http.router.DualAbstractHandler.channelRead0(DualAbstractHandler.java:57)
 at 
io.netty.handler.codec.http.router.DualAbstractHandler.channelRead0(DualAbstractHandler.java:20)
 at 
io.netty.channel.SimpleChannelInboundHandler.channelRead(SimpleChannelInboundHandler.java:105)
 at 
io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:339)
 at 
io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:324)
 at 
org.apache.flink.runtime.webmonitor.HttpRequestHandler.channelRead0(HttpRequestHandler.java:159)
 at 
org.apache.flink.runtime.webmonitor.HttpRequestHandler.channelRead0(HttpRequestHandler.java:65)
 at 
io.netty.channel.SimpleChannelInboundHandler.channelRead(SimpleChannelInboundHandler.java:105)
 at 
io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:339)
 at 
io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:324)
 at 
io.netty.channel.ChannelInboundHandlerAdapter.channelRead(ChannelInboundHandlerAdapter.java:86)
 at 
io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:339)
 at 
io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:324)
 at 
io.netty.handler.codec.ByteToMessageDecoder.channelRead(ByteToMessageDecoder.java:242)
 at 
io.netty.channel.CombinedChannelDuplexHandler.channelRead(CombinedChannelDuplexHandler.java:147)
 at 
io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:339)
 at 
io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:324)
 at 
io.netty.channel.DefaultChannelPipeline.fireChannelRead(DefaultChannelPipeline.java:847)
 at 
io.netty.channel.nio.AbstractNioByteChannel$NioByteUnsafe.read(AbstractNioByteChannel.java:131)
 at io.netty.channel.nio.NioEventLoop.processSelectedKey(NioEventLoop.java:511)
 at 
io.netty.channel.nio.NioEventLoop.processSelectedKeysOptimized(NioEventLoop.java:468)
 at io.netty.channel.nio.NioEventLoop.processSelectedKeys(NioEventLoop.java:382)
 at io.netty.channel.nio.NioEventLoop.run(NioEventLoop.java:354)
 at 
io.netty.util.concurrent.SingleThreadEventExecutor$2.run(SingleThreadEventExecutor.java:111)
 at 
io.netty.util.concurrent.DefaultThreadFactory$DefaultRunnableDecorator.run(DefaultThreadFactory.java:137)
 at java.lang.Thread.run(Thread.java:748)
 Caused by: java.util.ConcurrentModificationException
 at java.util.ArrayList$Itr.checkForComodification(ArrayList.java:901)
 at java.util.ArrayList$Itr.next(ArrayList.java:851)
 at 
org.apache.flink.api.java.operators.OperatorTranslation.translateToPlan(OperatorTranslation.java:49)
 at 
org.apache.flink.api.java.ExecutionEnvironment.createProgramPlan(ExecutionEnvironment.java:1065)
 at 

Re: Test example and lambdas

2017-08-07 Thread Timo Walther

Hi,

Row is a very special data type because Flink cannot extract the field 
types automatically based on Java generics. By default it is serialized 
by Kryo; you need to specify the field types using 
Types.ROW(Types.STRING, ...) and pass this information to your 
`.returns()` method instead of `Row.class`.


Lambdas are sometimes a problem for Flink, see 
https://ci.apache.org/projects/flink/flink-docs-release-1.4/dev/java8.html.
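
For illustration, a minimal sketch of declaring the Row field types explicitly; the 
input DataStream<String> `lines`, the CSV layout and the field types are made up, and 
RowTypeInfo is the TypeInformation describing a Row's fields:

DataStream<Row> rows = lines
    .map(line -> {
        String[] parts = line.split(",");
        Row row = new Row(3);
        row.setField(0, parts[0]);
        row.setField(1, parts[1]);
        row.setField(2, Double.parseDouble(parts[2]));
        return row;
    })
    // without this, the lambda's return type cannot be extracted and Row falls back to Kryo
    .returns(new RowTypeInfo(
        BasicTypeInfo.STRING_TYPE_INFO,
        BasicTypeInfo.STRING_TYPE_INFO,
        BasicTypeInfo.DOUBLE_TYPE_INFO));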


In your example it might make sense to use the Table API for 
pre-processing. The Table API has a built-in CSV input format and deals 
with all the type handling for you. You can convert a table back to a 
DataStream.


Regards,
Timo


On 05.08.17 at 13:49, Егор Литвиненко wrote:

> No errors, and no data in the table.

2017-08-05 14:49 GMT+03:00 Егор Литвиненко:


Hi

I am trying to test Flink and have some problems.
Why doesn't this example work?

https://github.com/egorlitvinenko/testparsing/blob/master/test-flink/src/main/java/org/egorlitvinenko/testflink/StreamingJob9C.java


Logs - https://pastebin.com/iuBZhfeG
No errors, and no data in the table.

One more question: the examples intentionally use anonymous classes
instead of lambdas, because with a lambda it also doesn't work. I did
a small investigation and found out that Flink has different
processing for lambdas, and at one point, when Flink processes the
types, they are empty. Do you have full lambda support?

Best regards, Egor Litvinenko.






Re: [ANNOUNCE] Apache Flink 1.3.2 released

2017-08-07 Thread Fabian Hueske
Thanks Aljoscha and everybody who contributed with bug reports and fixes!

Best, Fabian

2017-08-07 11:07 GMT+02:00 Till Rohrmann :

> Thanks to the community and Aljoscha for the hard work to complete Flink
> 1.3.2.
>
> Cheers,
> Till
>
> On Sat, Aug 5, 2017 at 9:12 AM, Aljoscha Krettek 
> wrote:
>
>> The Apache Flink community is pleased to announce the release of Apache
>> Flink 1.3.2.
>>
>> Apache Flink® is an open-source stream processing framework for
>> distributed, high-performing, always-available, and accurate data streaming
>> applications.
>>
>> The release is available for download at:
>>
>>  https://flink.apache.org/downloads.html
>>
>> There is a blog post available on the Flink site listing all the
>> addressed issues:
>>
>>  - http://flink.apache.org/news/2017/08/05/release-1.3.2.html
>>
>> We would like to thank all contributors who made this release possible!
>
>
>


Re: [ANNOUNCE] Apache Flink 1.3.2 released

2017-08-07 Thread Till Rohrmann
Thanks to the community and Aljoscha for the hard work to complete Flink
1.3.2.

Cheers,
Till

On Sat, Aug 5, 2017 at 9:12 AM, Aljoscha Krettek 
wrote:

> The Apache Flink community is pleased to announce the release of Apache
> Flink 1.3.2.
>
> Apache Flink® is an open-source stream processing framework for
> distributed, high-performing, always-available, and accurate data streaming
> applications.
>
> The release is available for download at:
>
>  https://flink.apache.org/downloads.html
>
> There is a blog post available on the Flink site listing all the addressed
> issues:
>
>  - http://flink.apache.org/news/2017/08/05/release-1.3.2.html
>
> We would like to thank all contributors who made this release possible!


Re: JMX stats reporter with all task manager/job manager stats aggregated?

2017-08-07 Thread Chesnay Schepler

Hello,

there is no central place where JMX metrics are aggregated.

You can configure a port range for the reporter to prevent port 
conflicts on the same machine.


metrics.reporter.jmx.port: 8789-8790

You can find out which port was used by checking the logs.

Regards,
Chesnay

On 05.08.2017 03:06, Ajay Tripathy wrote:
Sorry: neglected to include the stack trace for JMX failing to 
instantiate from a taskmanager:


2017-08-05 00:59:09,388 INFO  org.apache.flink.runtime.metrics.MetricRegistry
   - Configuring JMXReporter with {port=8789, 
class=org.apache.flink.metrics.jmx.JMXReporter}.
2017-08-05 00:59:09,402 ERROR org.apache.flink.runtime.metrics.MetricRegistry   
- Could not instantiate metrics reporter jmx. Metrics might not be 
exposed/reported.
java.lang.RuntimeException: Could not start JMX server on any configured port. 
Ports: 8789
at org.apache.flink.metrics.jmx.JMXReporter.open(JMXReporter.java:127)
at 
org.apache.flink.runtime.metrics.MetricRegistry.<init>(MetricRegistry.java:120)
at 
org.apache.flink.runtime.taskmanager.TaskManager$.createTaskManagerComponents(TaskManager.scala:2114)
at 
org.apache.flink.runtime.taskmanager.TaskManager$.startTaskManagerComponentsAndActor(TaskManager.scala:1873)
at 
org.apache.flink.runtime.taskmanager.TaskManager$.runTaskManager(TaskManager.scala:1769)
at 
org.apache.flink.runtime.taskmanager.TaskManager$.selectNetworkInterfaceAndRunTaskManager(TaskManager.scala:1637)
at 
org.apache.flink.runtime.taskmanager.TaskManager.selectNetworkInterfaceAndRunTaskManager(TaskManager.scala)
at 
org.apache.flink.yarn.YarnTaskManagerRunner$1.call(YarnTaskManagerRunner.java:146)
at 
org.apache.flink.yarn.YarnTaskManagerRunner$1.call(YarnTaskManagerRunner.java:142)
at 
org.apache.flink.runtime.security.HadoopSecurityContext$1.run(HadoopSecurityContext.java:43)
at java.security.AccessController.doPrivileged(Native Method)
at javax.security.auth.Subject.doAs(Subject.java:422)
at 
org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1698)
at 
org.apache.flink.runtime.security.HadoopSecurityContext.runSecured(HadoopSecurityContext.java:40)
at 
org.apache.flink.yarn.YarnTaskManagerRunner.runYarnTaskManager(YarnTaskManagerRunner.java:142)
at org.apache.flink.yarn.YarnTaskManager$.main(YarnTaskManager.scala:64)
at org.apache.flink.yarn.YarnTaskManager.main(YarnTaskManager.scala)

On Fri, Aug 4, 2017 at 3:51 PM, Ajay Tripathy > wrote:


Hi, I'm running Flink JobManagers/TaskManagers with YARN. I've
turned on the JMX reporter in my flink-conf.yaml as follows:

metrics.reporters: jmx

metrics.reporter.jmx.class: org.apache.flink.metrics.jmx.JMXReporter


I was wondering:

Is there a JMX server with the aggregated stats across all jobs /
tasks? If so, where is it located? It appears that a JMX server starts
for every single TaskManager, and the JobManagers do not have the
data reported from the TaskManagers.


I'm not sure if this is related, but when I try to specify a port
for the jmx reporter, like this:

metrics.reporter.jmx.port: 8789

I'm receiving an error where JMX servers from different task
managers fight for that port, and fail to start.






Re: Access Sliding window

2017-08-07 Thread Raj Kumar
Hi Fabian,

Can you please answer the last set of questions I posted on the forum?

Thanks.

On Friday, August 4, 2017, Fabian Hueske-2 [via Apache Flink User Mailing
List archive.]  wrote:

> TimeWindow.getStart() or TimeWindow.getEnd()
>
> -> https://ci.apache.org/projects/flink/flink-docs-
> release-1.3/dev/windows.html#incremental-window-aggregation-with-
> reducefunction
>
> 2017-08-04 22:43 GMT+02:00 Raj Kumar <[hidden email]
> >:
>
>> Thanks Fabian.
>>
>> The incoming events have the timestamps. Once I aggregate in the first
>> stream to get counts and calculate the mean/standard deviation in the
>> second
>> the new timestamps should be window start time ? How to tackle this issue
>> ?
>>
>>
>>
>>





Re: [EXTERNAL] Re: Help required - "BucketingSink" usage to write HDFS Files

2017-08-07 Thread vinay patil
Hi Raja,

That is why they are in the pending state. You can enable checkpointing by
setting env.enableCheckpointing().

After doing this, they will not remain in the pending state.

Check this out :
https://ci.apache.org/projects/flink/flink-docs-release-1.3/api/java/org/apache/flink/streaming/connectors/fs/bucketing/BucketingSink.html

Regards,
Vinay Patil

On Mon, Aug 7, 2017 at 9:15 AM, Raja.Aravapalli [via Apache Flink User
Mailing List archive.]  wrote:

> Hi Vinay,
>
>
>
> Thanks for the response.
>
>
>
> I have NOT enabled any checkpointing.
>
>
>
> Files are rolling out correctly at every 2 MB, but the files remain
> as below:
>
>
>
> -rw-r--r--   3 2097424 2017-08-06 21:10 ////Test/part-0-0.pending
>
> -rw-r--r--   3 1431430 2017-08-06 21:12 ////Test/part-0-1.pending
>
>
>
>
>
> Regards,
>
> Raja.
>
>
>
> *From: *vinay patil <[hidden email]
> >
> *Date: *Sunday, August 6, 2017 at 10:40 PM
> *To: *"[hidden email]
> " <[hidden email]
> >
> *Subject: *[EXTERNAL] Re: Help required - "BucketingSink" usage to write
> HDFS Files
>
>
>
> Hi Raja,
>
> Have you enabled checkpointing?
>
> The files will be rolled to complete state when the batch size is reached
> (in your case 2 MB) or when the bucket is inactive for a certain amount of
> time.
>
>
> Regards,
>
> Vinay Patil
>
>
>
> On Mon, Aug 7, 2017 at 7:53 AM, Raja.Aravapalli [via Apache Flink User
> Mailing List archive.] <[hidden email]> wrote:
>
>
>
> Hi,
>
>
>
> I am working on a POC to write HDFS files using the BucketingSink class.
> Even though the data is being written to HDFS files, the files are left
> with a “.pending” suffix on HDFS.
>
>
>
>
>
> Below is the code I am using. Can someone please help me identify the issue
> and help me fix this?
>
>
>
>
>
> BucketingSink HdfsSink = new BucketingSink("hdfs://///Test/");
>
> HdfsSink.setBucketer(new DateTimeBucketer("-MM-dd--HHmm"));
> HdfsSink.setBatchSize(1024 * 1024 * 2); // this is 2 MB
> HdfsSink.setInactiveBucketCheckInterval(1L);
> HdfsSink.setInactiveBucketThreshold(1L);
>
>
>
>
>
> Thanks a lot.
>
>
>
>
>
> Regards,
>
> Raja.
>
>
>
>
>
>
>



