FlinkKafkaConsumer configuration to consume from Multiple Kafka Topics

2018-07-17 Thread sagar loke
Hi,

We have a use case where we are consuming from several hundred Kafka
topics. Each topic has a different number of partitions.

As per the documentation, to parallelize consumption of a Kafka topic we need
to set setParallelism() equal to the number of Kafka partitions for that topic.

But if we are consuming multiple topics in Flink by providing a pattern, e.g.
*my_topic_**, and each topic has a different partition count, how should we
wire this up so that we can map Kafka partitions to Flink parallelism correctly
and programmatically (i.e. without hard-coding all the topic names and their
parallelism, considering we can access the Kafka topic <-> partition-count
mapping from Flink)?

Thanks,
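For reference, a rough sketch of one way to derive a parallelism for a pattern-based
source by probing the topic -> partition-count mapping with the plain Kafka client.
The broker address, group id and topic prefix are placeholders, and the pattern-based
FlinkKafkaConsumer011 constructor is assumed to be available in the connector version
in use; note that a single Flink source subtask can read several Kafka partitions, so
the source parallelism does not have to equal the partition count:

import java.util.List;
import java.util.Map;
import java.util.Properties;
import java.util.regex.Pattern;

import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer011;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.PartitionInfo;
import org.apache.kafka.common.serialization.ByteArrayDeserializer;

public class PatternSourceParallelism {

    public static void main(String[] args) throws Exception {
        Properties props = new Properties();
        props.setProperty("bootstrap.servers", "broker:9092"); // placeholder
        props.setProperty("group.id", "topic-probe");          // placeholder

        // ask the brokers for all topics and take the largest partition count
        // among the topics matching the prefix
        int maxPartitions = 1;
        try (KafkaConsumer<byte[], byte[]> probe = new KafkaConsumer<>(
                props, new ByteArrayDeserializer(), new ByteArrayDeserializer())) {
            for (Map.Entry<String, List<PartitionInfo>> e : probe.listTopics().entrySet()) {
                if (e.getKey().startsWith("my_topic_")) {
                    maxPartitions = Math.max(maxPartitions, e.getValue().size());
                }
            }
        }

        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.addSource(new FlinkKafkaConsumer011<>(
                        Pattern.compile("my_topic_.*"), new SimpleStringSchema(), props))
           .setParallelism(maxPartitions) // an upper bound; one subtask may read several partitions
           .print();

        env.execute("pattern source parallelism sketch");
    }
}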


Re: Parallel stream partitions

2018-07-17 Thread Ken Krugler
Hi Nick,

> On Jul 17, 2018, at 9:09 AM, Nicholas Walton wrote:
> 
> Suppose I have a data stream of tuples  
> with the sequence of ticks being 1,2,3,…. for each separate k.
> 
> I understand that keyBy(2)

I think you want keyBy(1), since it’s 0-based.

> will partition the stream so each partition has the same key in each tuple.

I don’t think that’s exactly correct.

Each tuple with the same key value will be in the same partition. But each 
partition can receive multiple key values, depending on the cardinality of the 
keys, the number of partitions, and how they get hashed.

> I now have a sequence of functions to apply to the streams say f(),g() and 
> h() in that order. 

Assuming these functions are all applied after the partitioning, I would expect
all tuples with the same key to be processed by function instances running in
the same partition.

So .keyBy(1).map(f).map(g).map(h) should partition by the key, and then chain 
the processing of tuples.
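A minimal runnable sketch of that layout, assuming placeholder tuples of
(tick, key, value) and identity map functions standing in for f, g and h (the field
positions and types are assumptions, not Nick's actual ones):

import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.java.tuple.Tuple3;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class KeyedChainSketch {

    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        // placeholder data: (tick, key, value)
        DataStream<Tuple3<Long, Integer, Double>> input = env.fromElements(
                Tuple3.of(1L, 1, 0.5), Tuple3.of(2L, 1, 0.7),
                Tuple3.of(1L, 2, 0.6), Tuple3.of(2L, 2, 0.8));

        // identity placeholders for f, g and h
        MapFunction<Tuple3<Long, Integer, Double>, Tuple3<Long, Integer, Double>> f =
                new MapFunction<Tuple3<Long, Integer, Double>, Tuple3<Long, Integer, Double>>() {
                    @Override
                    public Tuple3<Long, Integer, Double> map(Tuple3<Long, Integer, Double> t) {
                        return t; // real logic would go here
                    }
                };
        MapFunction<Tuple3<Long, Integer, Double>, Tuple3<Long, Integer, Double>> g = f;
        MapFunction<Tuple3<Long, Integer, Double>, Tuple3<Long, Integer, Double>> h = f;

        input
            .keyBy(1)   // 0-based: field 1 holds the key k
            .map(f)     // f, g and h are chained into one subtask per parallel instance,
            .map(g)     // so all tuples of a given key pass through the same f -> g -> h
            .map(h)     // instances, in tick order
            .print();

        env.execute("keyed chain sketch");
    }
}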

— Ken

> 
> With parallelism set to 1 then each partition-stream passes through f then g 
> then h (f | g | h) in order of tick.
> 
> I want to run each partition-stream in parallel, setting parallelism in the 
> Web GUI. 
> 
> My question is how do I ensure  each partition stream passes through a fixed 
> sequence (f | g | h)  rather than if parallelism is p running p instances 
> each of f g & h with no guarantee that each partition-stream flows through a 
> unique set of three instances  in tick-order, especially if p is greater than 
> the largest value of key. 
> 
> A typical use case would be to maintain a moving average over each key 
> 
> [image attachment: diagram of the three-stage pipeline]
> 
> I need to remove the crossover in the middle box, so [1] -> [1] -> [1] and 
> [2] -> [2] -> [2], instead of  [1] -> [1] -> [1 or 2] .
> 
> Nick

--
Ken Krugler
+1 530-210-6378
http://www.scaleunlimited.com
Custom big data solutions & training
Flink, Solr, Hadoop, Cascading & Cassandra



Keeping only latest row by key?

2018-07-17 Thread Porritt, James
In Spark if I want to be able to get a set of unique rows by id, using the 
criteria of keeping the row with the latest timestamp, I would do the following:

.withColumn("rn",
F.row_number().over(
Window.partitionBy('id') \
.orderBy(F.col('timestamp').desc())
)
) \
.where(F.col("rn") == 1)

I see Flink has windowing functionality, but I don't see that it has row
enumeration. How best in that case would I achieve the above?

Thanks,
James.
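One possible DataStream-API sketch (not a direct row_number() equivalent): key by id
and keep, per key, the row with the greatest timestamp seen so far. The Row POJO, its
fields and the sample data are hypothetical placeholders:

import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class LatestRowByKey {

    public static class Row {
        public String id;
        public long timestamp;
        public double value;
        public Row() {}
        public Row(String id, long timestamp, double value) {
            this.id = id; this.timestamp = timestamp; this.value = value;
        }
        @Override
        public String toString() { return id + "@" + timestamp + "=" + value; }
    }

    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        DataStream<Row> rows = env.fromElements(
                new Row("a", 1L, 1.0), new Row("a", 3L, 3.0), new Row("b", 2L, 2.0));

        // reduce() keeps the row with the larger timestamp; every update to a key
        // emits the current "latest" row for that key downstream
        rows.keyBy("id")
            .reduce((r1, r2) -> r1.timestamp >= r2.timestamp ? r1 : r2)
            .print();

        env.execute("latest row per key");
    }
}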


Re: clear method on Window Trigger

2018-07-17 Thread Hequn Cheng
Hi Soheil,

The clear() method performs any action needed upon removal of the
corresponding window; it is called when a window is purged. The
difference between FIRE and FIRE_AND_PURGE is that FIRE only triggers the
computation, while FIRE_AND_PURGE triggers the computation and then clears
the elements in the window.

As for your example, if you use FIRE, the window is triggered over all of its
data when new data arrives. Also note that some window functions, such as
AggregateFunction and FoldFunction, can incrementally aggregate the elements
of each window as they arrive.
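To make the FIRE vs. FIRE_AND_PURGE difference concrete, here is a hypothetical
trigger sketch (not taken from this thread) that fires and purges the window every
maxCount elements and releases its counter state in clear():

import org.apache.flink.api.common.state.ValueState;
import org.apache.flink.api.common.state.ValueStateDescriptor;
import org.apache.flink.streaming.api.windowing.triggers.Trigger;
import org.apache.flink.streaming.api.windowing.triggers.TriggerResult;
import org.apache.flink.streaming.api.windowing.windows.TimeWindow;

public class CountAndPurgeTrigger extends Trigger<Object, TimeWindow> {

    private final long maxCount;
    private final ValueStateDescriptor<Long> countDesc =
            new ValueStateDescriptor<>("count", Long.class);

    public CountAndPurgeTrigger(long maxCount) {
        this.maxCount = maxCount;
    }

    @Override
    public TriggerResult onElement(Object element, long timestamp, TimeWindow window,
                                   TriggerContext ctx) throws Exception {
        ValueState<Long> count = ctx.getPartitionedState(countDesc);
        long current = (count.value() == null ? 0L : count.value()) + 1;
        if (current >= maxCount) {
            count.clear();
            // evaluate the window function AND drop the buffered elements
            return TriggerResult.FIRE_AND_PURGE;
        }
        count.update(current);
        return TriggerResult.CONTINUE;
    }

    @Override
    public TriggerResult onProcessingTime(long time, TimeWindow window, TriggerContext ctx) {
        return TriggerResult.CONTINUE;
    }

    @Override
    public TriggerResult onEventTime(long time, TimeWindow window, TriggerContext ctx) {
        return TriggerResult.CONTINUE;
    }

    @Override
    public void clear(TimeWindow window, TriggerContext ctx) throws Exception {
        // called when the window is purged/disposed: release the per-window counter state
        ctx.getPartitionedState(countDesc).clear();
    }
}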

More details can be found here[1].

Best, Hequn

[1]
https://ci.apache.org/projects/flink/flink-docs-master/dev/stream/operators/windows.html#windows



On Tue, Jul 17, 2018 at 10:44 PM, Soheil Pourbafrani 
wrote:

> Hi,
>
> Can someone elaborate on when the clear method on the Trigger class will be
> called and what its duty is? Also, I don't know what the benefit of
> FIRE_AND_PURGE over FIRE is, and what its use case is.
>
> For example, in a scenario where we have a count-of-3 window that also
> triggers after a timeout: if we FIRE the window on timeout and, after a
> while, new data is added, will the window be triggered for all data or just
> for the new late data? I guess in such a case we should use FIRE_AND_PURGE.
>
>


Parallel stream partitions

2018-07-17 Thread Nicholas Walton
Suppose I have a data stream of tuples  
with the sequence of ticks being 1,2,3,…. for each separate k.

I understand that keyBy(2) will partition the stream so each partition has the
same key in each tuple. I now have a sequence of functions to apply to the
streams, say f(), g() and h(), in that order.

With parallelism set to 1 then each partition-stream passes through f then g 
then h (f | g | h) in order of tick.

I want to run each partition-stream in parallel, setting parallelism in the Web 
GUI. 

My question is: how do I ensure each partition stream passes through a fixed
sequence (f | g | h), rather than, with parallelism p, running p instances each
of f, g and h with no guarantee that each partition-stream flows through a
unique set of three instances in tick order, especially if p is greater than
the largest value of the key.

A typical use case would be to maintain a moving average over each key 



I need to remove the crossover in the middle box, so [1] -> [1] -> [1] and [2] 
-> [2] -> [2], instead of  [1] -> [1] -> [1 or 2] .

Nick

Re: clear method on Window Trigger

2018-07-17 Thread vino yang
Hi Soheil,

Did you read the documentation about Flink Window/Trigger [1]?

FIRE_AND_PURGE is usually used to implement count windows. Flink provides a
PurgingTrigger as a wrapper around other triggers that makes them purging;
one use case of this class is the count window [2][3].

About your example: do you mean the window will be triggered by both count and time?
If it is triggered by the count condition, we could use FIRE_AND_PURGE.
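A short illustrative fragment of that wrapper, assuming some keyed-able
DataStream<Tuple2<String, Integer>> named input; this mirrors what countWindow(3)
builds internally:

import org.apache.flink.streaming.api.windowing.assigners.GlobalWindows;
import org.apache.flink.streaming.api.windowing.triggers.CountTrigger;
import org.apache.flink.streaming.api.windowing.triggers.PurgingTrigger;

// fire on every 3rd element per key and purge the window contents afterwards
input
    .keyBy(0)
    .window(GlobalWindows.create())
    .trigger(PurgingTrigger.of(CountTrigger.of(3)))
    .sum(1)
    .print();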

[1]:
https://ci.apache.org/projects/flink/flink-docs-release-1.5/dev/stream/operators/windows.html#fire-and-purge
[2]:
https://github.com/apache/flink/blob/master/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/datastream/DataStream.java#L765
[3]:
https://github.com/apache/flink/blob/master/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/datastream/KeyedStream.java#L628

Thanks, vino.

2018-07-17 22:44 GMT+08:00 Soheil Pourbafrani :

> Hi,
>
> Can someone elaborate on when the clear method on the Trigger class will be
> called and what its duty is? Also, I don't know what the benefit of
> FIRE_AND_PURGE over FIRE is, and what its use case is.
>
> For example, in a scenario where we have a count-of-3 window that also
> triggers after a timeout: if we FIRE the window on timeout and, after a
> while, new data is added, will the window be triggered for all data or just
> for the new late data? I guess in such a case we should use FIRE_AND_PURGE.
>
>


Re: FlinkCEP and scientific papers ?

2018-07-17 Thread vino yang
Hi Esa,

AFAIK, the early Flink CEP implementation refers to the paper "Efficient Pattern
Matching over Event Streams" [1]. Flink absorbed two major ideas from this
paper:

1. the NFA-b model on event streams
2. a shared, versioned match buffer, which is an optimized data structure

To Till and Chesnay:

Did I miss anything given how Flink has developed since then? If yes, please
add your remarks.

[1]: https://people.cs.umass.edu/~yanlei/publications/sase-sigmod08.pdf

Thanks, vino.

2018-07-17 22:01 GMT+08:00 Esa Heikkinen :

> Hi
>
>
>
> I don't know if this is the correct forum to ask, but do there exist some good
> scientific papers about FlinkCEP (Complex Event Processing)?
>
>
>
> I know Flink is based on Stratosphere, but what is FlinkCEP based on?
>
>
>
> BR Esa
>


[ANNOUNCE] Program for Flink Forward Berlin 2018 has been announced

2018-07-17 Thread Fabian Hueske
Hi everyone,

I'd like to announce the program for Flink Forward Berlin 2018.

The program committee [1] assembled a program of about 50 talks on use
cases, operations, ecosystem, tech deep dive, and research topics.
The conference will host speakers from Airbnb, Amazon, Google, ING, Lyft,
Microsoft, Netflix, Uber, and many other organizations.

https://berlin-2018.flink-forward.org/conference-program

Flink Forward Berlin 2018 will take place on September 3-5.
As previously announced, the registration is open and as a member of the
Flink community we offer you a 20% discount on your conference pass if you
register with the code *MailingList*.

https://berlin-2018.flink-forward.org/register/

Best regards,
Fabian
(PC Chair for Flink Forward Berlin 2018)

[1] https://berlin-2018.flink-forward.org/program-committee/


Re: flink javax.xml.parser Error

2018-07-17 Thread antonio saldivar
If somebody is facing this issue: I solved it by adding the exclusion below to my
pom.xml, and I am also adding the javax.xml jaxb-api dependency.

<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-core</artifactId>
    <version>1.4.2</version>
    <exclusions>
        <exclusion>
            <groupId>xml-apis</groupId>
            <artifactId>xml-apis</artifactId>
        </exclusion>
    </exclusions>
</dependency>

<dependency>
    <groupId>javax.xml</groupId>
    <artifactId>jaxb-api</artifactId>
    <version>2.1</version>
</dependency>


On Mon, Jul 16, 2018 at 6:26 PM, antonio saldivar () wrote:

> Hello
>
>
> I am getting this error when I run my application in Ambari local-cluster and 
> I get this error at runtime.
>
> Flink 1.4.2
>
> phoenix
>
> hbase
>
>
> Does any one have any recommendation to solve this issue?
>
>
>
>
> javax.xml.parsers.FactoryConfigurationError: Provider for class 
> javax.xml.parsers.DocumentBuilderFactory cannot be created
>   at 
> javax.xml.parsers.FactoryFinder.findServiceProvider(FactoryFinder.java:311)
>   at javax.xml.parsers.FactoryFinder.find(FactoryFinder.java:267)
>   at 
> javax.xml.parsers.DocumentBuilderFactory.newInstance(DocumentBuilderFactory.java:120)
>   at 
> org.apache.hadoop.conf.Configuration.loadResource(Configuration.java:2208)
>   at 
> org.apache.hadoop.conf.Configuration.loadResources(Configuration.java:2185)
>   at 
> org.apache.hadoop.conf.Configuration.getProps(Configuration.java:2102)
>   at org.apache.hadoop.conf.Configuration.get(Configuration.java:851)
>   at 
> org.apache.hadoop.conf.Configuration.getTrimmed(Configuration.java:870)
>   at 
> org.apache.hadoop.conf.Configuration.getBoolean(Configuration.java:1268)
>   at 
> org.apache.hadoop.hbase.HBaseConfiguration.checkDefaultsVersion(HBaseConfiguration.java:68)
>   at 
> org.apache.hadoop.hbase.HBaseConfiguration.addHbaseResources(HBaseConfiguration.java:82)
>   at 
> org.apache.hadoop.hbase.HBaseConfiguration.create(HBaseConfiguration.java:97)
>   at 
> org.apache.phoenix.query.ConfigurationFactory$ConfigurationFactoryImpl$1.call(ConfigurationFactory.java:49)
>   at 
> org.apache.phoenix.query.ConfigurationFactory$ConfigurationFactoryImpl$1.call(ConfigurationFactory.java:46)
>   at 
> org.apache.phoenix.util.PhoenixContextExecutor.call(PhoenixContextExecutor.java:78)
>   at 
> org.apache.phoenix.util.PhoenixContextExecutor.callWithoutPropagation(PhoenixContextExecutor.java:93)
>   at 
> org.apache.phoenix.query.ConfigurationFactory$ConfigurationFactoryImpl.getConfiguration(ConfigurationFactory.java:46)
>   at 
> org.apache.phoenix.query.QueryServicesOptions.withDefaults(QueryServicesOptions.java:285)
>   at 
> org.apache.phoenix.query.QueryServicesImpl.(QueryServicesImpl.java:36)
>   at 
> org.apache.phoenix.jdbc.PhoenixDriver.getQueryServices(PhoenixDriver.java:178)
>   at 
> org.apache.phoenix.jdbc.PhoenixDriver.getConnectionQueryServices(PhoenixDriver.java:214)
>   at 
> org.apache.phoenix.jdbc.PhoenixEmbeddedDriver.createConnection(PhoenixEmbeddedDriver.java:142)
>   at org.apache.phoenix.jdbc.PhoenixDriver.connect(PhoenixDriver.java:202)
>   at java.sql.DriverManager.getConnection(DriverManager.java:664)
>   at java.sql.DriverManager.getConnection(DriverManager.java:270)
>   at 
> net.paladion.npci.dbsteps.FrmRawDataInsertStep$1.map(FrmRawDataInsertStep.java:49)
>   at 
> net.paladion.npci.dbsteps.FrmRawDataInsertStep$1.map(FrmRawDataInsertStep.java:1)
>   at 
> org.apache.flink.streaming.api.operators.StreamMap.processElement(StreamMap.java:41)
>   at 
> org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.pushToOperator(OperatorChain.java:549)
>   at 
> org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:524)
>   at 
> org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:504)
>   at 
> org.apache.flink.streaming.runtime.tasks.OperatorChain$BroadcastingOutputCollector.collect(OperatorChain.java:611)
>   at 
> org.apache.flink.streaming.runtime.tasks.OperatorChain$BroadcastingOutputCollector.collect(OperatorChain.java:572)
>   at 
> org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:830)
>   at 
> org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:808)
>   at 
> org.apache.flink.streaming.api.operators.StreamMap.processElement(StreamMap.java:41)
>   at 
> org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.pushToOperator(OperatorChain.java:549)
>   at 
> org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:524)
>   at 
> org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:504)
>   at 
> org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:830)
>   at 
> 

Re: AvroInputFormat NullPointerException issues

2018-07-17 Thread vino yang
Hi Porritt,

OK, it looks good.

Thanks, vino.

2018-07-17 23:13 GMT+08:00 Porritt, James :

> I got to the bottom of this – it was a namespace issue. My schema was;
>
> {
>
>   "type" : "record",
>
>   "name" : "MyAvroSchema",
>
>   "fields" : [ {
>
> "name" : "a",
>
> "type" : [ "null", "int" ]
>
>   }, {
>
> "name" : "b",
>
> "type" : [ "null", "string" ]
>
>   }]
>
> }
>
> But actually, I was putting the generated MyAvroSchema file into
> ‘my_stats’ namespace (along with my other application code) by adding a
> ‘package my_stats;’ line at the top. When I added “namespace”: “my_stats”
> to the schema and generated the Java that way it was fine.
>
>
>
> *From:* Porritt, James 
> *Sent:* 17 July 2018 15:10
> *To:* 'vino yang' 
> *Cc:* user@flink.apache.org
> *Subject:* RE: AvroInputFormat NullPointerException issues
>
>
>
> My MyAvroSchema class is as follows. It was generated using avro-tools:
>
>
>
> /**
>
> * Autogenerated by Avro
>
> *
>
> * DO NOT EDIT DIRECTLY
>
> */
>
>
>
> import org.apache.avro.specific.SpecificData;
>
> import org.apache.avro.message.BinaryMessageEncoder;
>
> import org.apache.avro.message.BinaryMessageDecoder;
>
> import org.apache.avro.message.SchemaStore;
>
>
>
> @SuppressWarnings("all")
>
> @org.apache.avro.specific.AvroGenerated
>
> public class MyAvroSchema extends org.apache.avro.specific.SpecificRecordBase
> implements org.apache.avro.specific.SpecificRecord {
>
>   private static final long serialVersionUID = 4994916517880671663L;
>
>   public static final org.apache.avro.Schema SCHEMA$ = new
> org.apache.avro.Schema.Parser().parse("{\"type\":\"record\",
> \"name\":\"MyAvroSchema\",\"fields\":[{\"name\":\"a\",\"
> type\":[\"null\",\"int\"]},{\"name\":\"b\",\"type\":[\"null\
> ",\"string\"]}]}");
>
>   public static org.apache.avro.Schema getClassSchema() { return SCHEMA$; }
>
>
>
>   private static SpecificData MODEL$ = new SpecificData();
>
>
>
>   private static final BinaryMessageEncoder ENCODER =
>
>   new BinaryMessageEncoder(MODEL$, SCHEMA$);
>
>
>
>   private static final BinaryMessageDecoder DECODER =
>
>   new BinaryMessageDecoder(MODEL$, SCHEMA$);
>
>
>
>   /**
>
>* Return the BinaryMessageDecoder instance used by this class.
>
>*/
>
>   public static BinaryMessageDecoder getDecoder() {
>
> return DECODER;
>
>   }
>
>
>
>   /**
>
>* Create a new BinaryMessageDecoder instance for this class that uses
> the specified {@link SchemaStore}.
>
>* @param resolver a {@link SchemaStore} used to find schemas by
> fingerprint
>
>*/
>
>   public static BinaryMessageDecoder
> createDecoder(SchemaStore resolver) {
>
> return new BinaryMessageDecoder(MODEL$, SCHEMA$,
> resolver);
>
>   }
>
>
>
>   /** Serializes this MyAvroSchema to a ByteBuffer. */
>
>   public java.nio.ByteBuffer toByteBuffer() throws java.io.IOException {
>
> return ENCODER.encode(this);
>
>   }
>
>
>
>   /** Deserializes a MyAvroSchema from a ByteBuffer. */
>
>   public static MyAvroSchema fromByteBuffer(
>
>   java.nio.ByteBuffer b) throws java.io.IOException {
>
> return DECODER.decode(b);
>
>   }
>
>
>
>   @Deprecated public java.lang.Integer a;
>
>   @Deprecated public java.lang.CharSequence b;
>
>
>
>   /**
>
>* Default constructor.  Note that this does not initialize fields
>
>* to their default values from the schema.  If that is desired then
>
>* one should use newBuilder().
>
>*/
>
>   public MyAvroSchema() {}
>
>
>
>   /**
>
>* All-args constructor.
>
>* @param a The new value for a
>
>* @param b The new value for b
>
>*/
>
>   public MyAvroSchema(java.lang.Integer a, java.lang.CharSequence b) {
>
> this.a = a;
>
> this.b = b;
>
>   }
>
>
>
>   public org.apache.avro.Schema getSchema() { return SCHEMA$; }
>
>   // Used by DatumWriter.  Applications should not call.
>
>   public java.lang.Object get(int field$) {
>
> switch (field$) {
>
> case 0: return a;
>
> case 1: return b;
>
> default: throw new org.apache.avro.AvroRuntimeException("Bad index");
>
> }
>
>   }
>
>
>
>   // Used by DatumReader.  Applications should not call.
>
>   @SuppressWarnings(value="unchecked")
>
>   public void put(int field$, java.lang.Object value$) {
>
> switch (field$) {
>
> case 0: a = (java.lang.Integer)value$; break;
>
> case 1: b = (java.lang.CharSequence)value$; break;
>
> default: throw new org.apache.avro.AvroRuntimeException("Bad index");
>
> }
>
>   }
>
>
>
>   /**
>
>* Gets the value of the 'a' field.
>
>* @return The value of the 'a' field.
>
>*/
>
>   public java.lang.Integer getA() {
>
> return a;
>
>   }
>
>
>
>   /**
>
>* Sets the value of the 'a' field.
>
>* @param value the value to set.
>
>*/
>
>   public void setA(java.lang.Integer value) {
>
> this.a = value;
>
>   }
>
>
>
>   /**
>
>* Gets the value of the 'b' field.
>
>* @return The value of the 'b' field.
>
>*/
>
>   public java.lang.CharSequence getB() {
>

RE: AvroInputFormat NullPointerException issues

2018-07-17 Thread Porritt, James
I got to the bottom of this – it was a namespace issue. My schema was;
{
  "type" : "record",
  "name" : "MyAvroSchema",
  "fields" : [ {
"name" : "a",
"type" : [ "null", "int" ]
  }, {
"name" : "b",
"type" : [ "null", "string" ]
  }]
}
But actually, I was putting the generated MyAvroSchema file into the 'my_stats'
namespace (along with my other application code) by adding a 'package
my_stats;' line at the top. When I added "namespace": "my_stats" to the schema
and generated the Java that way, it was fine.
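For reference, the corrected schema would then look roughly like this (same fields as
above, with only the namespace added):

{
  "namespace" : "my_stats",
  "type" : "record",
  "name" : "MyAvroSchema",
  "fields" : [
    { "name" : "a", "type" : [ "null", "int" ] },
    { "name" : "b", "type" : [ "null", "string" ] }
  ]
}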

From: Porritt, James 
Sent: 17 July 2018 15:10
To: 'vino yang' 
Cc: user@flink.apache.org
Subject: RE: AvroInputFormat NullPointerException issues

My MyAvroSchema class is as follows. It was generated using avro-tools:

/**
* Autogenerated by Avro
*
* DO NOT EDIT DIRECTLY
*/

import org.apache.avro.specific.SpecificData;
import org.apache.avro.message.BinaryMessageEncoder;
import org.apache.avro.message.BinaryMessageDecoder;
import org.apache.avro.message.SchemaStore;

@SuppressWarnings("all")
@org.apache.avro.specific.AvroGenerated
public class MyAvroSchema extends org.apache.avro.specific.SpecificRecordBase 
implements org.apache.avro.specific.SpecificRecord {
  private static final long serialVersionUID = 4994916517880671663L;
  public static final org.apache.avro.Schema SCHEMA$ = new 
org.apache.avro.Schema.Parser().parse("{\"type\":\"record\",\"name\":\"MyAvroSchema\",\"fields\":[{\"name\":\"a\",\"type\":[\"null\",\"int\"]},{\"name\":\"b\",\"type\":[\"null\",\"string\"]}]}");
  public static org.apache.avro.Schema getClassSchema() { return SCHEMA$; }

  private static SpecificData MODEL$ = new SpecificData();

  private static final BinaryMessageEncoder<MyAvroSchema> ENCODER =
      new BinaryMessageEncoder<MyAvroSchema>(MODEL$, SCHEMA$);

  private static final BinaryMessageDecoder<MyAvroSchema> DECODER =
      new BinaryMessageDecoder<MyAvroSchema>(MODEL$, SCHEMA$);

  /**
   * Return the BinaryMessageDecoder instance used by this class.
   */
  public static BinaryMessageDecoder<MyAvroSchema> getDecoder() {
return DECODER;
  }

  /**
   * Create a new BinaryMessageDecoder instance for this class that uses the 
specified {@link SchemaStore}.
   * @param resolver a {@link SchemaStore} used to find schemas by fingerprint
   */
  public static BinaryMessageDecoder<MyAvroSchema> createDecoder(SchemaStore resolver) {
    return new BinaryMessageDecoder<MyAvroSchema>(MODEL$, SCHEMA$, resolver);
  }

  /** Serializes this MyAvroSchema to a ByteBuffer. */
  public java.nio.ByteBuffer toByteBuffer() throws java.io.IOException {
return ENCODER.encode(this);
  }

  /** Deserializes a MyAvroSchema from a ByteBuffer. */
  public static MyAvroSchema fromByteBuffer(
  java.nio.ByteBuffer b) throws java.io.IOException {
return DECODER.decode(b);
  }

  @Deprecated public java.lang.Integer a;
  @Deprecated public java.lang.CharSequence b;

  /**
   * Default constructor.  Note that this does not initialize fields
   * to their default values from the schema.  If that is desired then
   * one should use newBuilder().
   */
  public MyAvroSchema() {}

  /**
   * All-args constructor.
   * @param a The new value for a
   * @param b The new value for b
   */
  public MyAvroSchema(java.lang.Integer a, java.lang.CharSequence b) {
this.a = a;
this.b = b;
  }

  public org.apache.avro.Schema getSchema() { return SCHEMA$; }
  // Used by DatumWriter.  Applications should not call.
  public java.lang.Object get(int field$) {
switch (field$) {
case 0: return a;
case 1: return b;
default: throw new org.apache.avro.AvroRuntimeException("Bad index");
}
  }

  // Used by DatumReader.  Applications should not call.
  @SuppressWarnings(value="unchecked")
  public void put(int field$, java.lang.Object value$) {
switch (field$) {
case 0: a = (java.lang.Integer)value$; break;
case 1: b = (java.lang.CharSequence)value$; break;
default: throw new org.apache.avro.AvroRuntimeException("Bad index");
}
  }

  /**
   * Gets the value of the 'a' field.
   * @return The value of the 'a' field.
   */
  public java.lang.Integer getA() {
return a;
  }

  /**
   * Sets the value of the 'a' field.
   * @param value the value to set.
   */
  public void setA(java.lang.Integer value) {
this.a = value;
  }

  /**
   * Gets the value of the 'b' field.
   * @return The value of the 'b' field.
   */
  public java.lang.CharSequence getB() {
return b;
  }

  /**
   * Sets the value of the 'b' field.
   * @param value the value to set.
   */
  public void setB(java.lang.CharSequence value) {
this.b = value;
  }

  /**
   * Creates a new MyAvroSchema RecordBuilder.
   * @return A new MyAvroSchema RecordBuilder
   */
  public static MyAvroSchema.Builder newBuilder() {
return new MyAvroSchema.Builder();
  }

  /**
   * Creates a new MyAvroSchema RecordBuilder by copying an existing Builder.
   * @param other The existing builder to copy.
   * @return A new MyAvroSchema RecordBuilder
   */
  public static MyAvroSchema.Builder newBuilder(MyAvroSchema.Builder 

Re: Flink job hangs/deadlocks (possibly related to out of memory)

2018-07-17 Thread Zhijiang(wangzhijiang999)
Hi Gerard,

From the jstack you provided, the task is serializing the output record, and
during this process it will not process any more input data.
This stack does not indicate an out-of-memory issue. If the output buffer were
exhausted, the task would instead be blocked in requestBufferBlocking.

I think the key point is that your output record is too large and has a complicated
structure: every field and collection in this class has to be traversed during
serialization, which costs a lot of time and CPU. Furthermore, the checkpoint cannot
complete because it is waiting for the checkpoint lock, which is held by the task's
output process.

As you mentioned, it makes sense to review the structure of the output record and
reduce its size or make it more lightweight to handle.

Best,

Zhijiang


--
From: Gerard Garcia 
Sent: Tuesday, July 17, 2018 21:53
To: piotr 
Cc: fhueske ; wangzhijiang999 ; user ; nico 
Subject: Re: Flink job hangs/deadlocks (possibly related to out of memory)

Yes, I'm using Flink 1.5.0 and what I'm serializing is a really big record 
(probably too big, we have already started working to reduce its size) which 
consists of several case classes which have (among others) fields of type 
String. 

I attach a CPU profile of the thread stuck serializing. I also attach the 
memory and GC telemetry that the profiler shows (which maybe is more 
informative than the one recorded from the JVM metrics). Only one node was 
actually "doing something" all others had CPU usage near zero.

The task is at the same time trying to perform a checkpoint but keeps failing. 
Would it make sense that the problem is that there is not enough memory 
available to perform the checkpoint so all operators are stuck waiting for it 
to finish, and at the same time, the operator stuck serializing is keeping all 
the memory so neither it nor the checkpoint can advance? 

I realized that I don't have a minimum pause between checkpoints so it is 
continuously trying. Maybe I can reduce the checkpoint timeout from the 10m 
default and introduce a minimum pause (e.g. 5m timeout and 5m minimum pause) 
and this way I could break the deadlock.
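A small fragment of the checkpoint settings described above (5 minute timeout,
5 minute minimum pause). The checkpoint interval is a placeholder, and the values
come from the email rather than being recommendations:

StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.enableCheckpointing(60_000);  // hypothetical checkpoint interval

CheckpointConfig checkpointConfig = env.getCheckpointConfig();
checkpointConfig.setCheckpointTimeout(5 * 60 * 1000);          // give up on a checkpoint after 5 minutes
checkpointConfig.setMinPauseBetweenCheckpoints(5 * 60 * 1000); // wait 5 minutes before starting the next one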

Gerard


On Tue, Jul 17, 2018 at 9:00 AM Piotr Nowojski  wrote:
Hi,

Thanks for the additional data. Just to make sure, are you using Flink 1.5.0?

There are a couple of threads that seem to be looping in serialisation, while
others are blocked, either waiting for new data or waiting for someone to
consume some data. Could you debug or CPU-profile the code, in particular
focusing on threads with a stack trace like the one below [1]? Aren’t you trying to
serialise some gigantic String?

Piotrek

[1]:

"(...) (19/26)" #2737 prio=5 os_prio=0 tid=0x7f52584d2800 nid=0x6819 
runnable [0x7f451a843000]
   java.lang.Thread.State: RUNNABLE
 at 
org.apache.flink.core.memory.DataOutputSerializer.write(DataOutputSerializer.java:133)
 at org.apache.flink.types.StringValue.writeString(StringValue.java:812)
 at 
org.apache.flink.api.common.typeutils.base.StringSerializer.serialize(StringSerializer.java:64)
 at 
org.apache.flink.api.common.typeutils.base.StringSerializer.serialize(StringSerializer.java:28)
 at 
org.apache.flink.api.scala.typeutils.CaseClassSerializer.serialize(CaseClassSerializer.scala:113)
 at 
org.apache.flink.api.scala.typeutils.CaseClassSerializer.serialize(CaseClassSerializer.scala:32)
 at 
org.apache.flink.api.scala.typeutils.CaseClassSerializer.serialize(CaseClassSerializer.scala:113)
 at 
org.apache.flink.api.scala.typeutils.CaseClassSerializer.serialize(CaseClassSerializer.scala:32)
 at 
org.apache.flink.api.scala.typeutils.TraversableSerializer$$anonfun$serialize$1.apply(TraversableSerializer.scala:98)
 at 
org.apache.flink.api.scala.typeutils.TraversableSerializer$$anonfun$serialize$1.apply(TraversableSerializer.scala:93)
 at 
scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
 at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:48)
 at 
org.apache.flink.api.scala.typeutils.TraversableSerializer.serialize(TraversableSerializer.scala:93)
 at 
org.apache.flink.api.scala.typeutils.TraversableSerializer.serialize(TraversableSerializer.scala:33)
 at 
org.apache.flink.api.scala.typeutils.CaseClassSerializer.serialize(CaseClassSerializer.scala:113)
 at 
org.apache.flink.api.scala.typeutils.CaseClassSerializer.serialize(CaseClassSerializer.scala:32)
 at 
org.apache.flink.api.scala.typeutils.TraversableSerializer$$anonfun$serialize$1.apply(TraversableSerializer.scala:98)
 at 
org.apache.flink.api.scala.typeutils.TraversableSerializer$$anonfun$serialize$1.apply(TraversableSerializer.scala:93)
 at 
scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
 at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:48)
 at 
org.apache.flink.api.scala.typeutils.TraversableSerializer.serialize(TraversableSerializer.scala:93)
 at 

clear method on Window Trigger

2018-07-17 Thread Soheil Pourbafrani
Hi,

Can someone elaborate on when the clear method on the Trigger class will be
called and what its duty is? Also, I don't know what the benefit of
FIRE_AND_PURGE over FIRE is, and what its use case is.

For example, in a scenario where we have a count-of-3 window that also
triggers after a timeout: if we FIRE the window on timeout and, after a
while, new data is added, will the window be triggered for all data or just
for the new late data? I guess in such a case we should use FIRE_AND_PURGE.


RE: AvroInputFormat NullPointerException issues

2018-07-17 Thread Porritt, James
My MyAvroSchema class is as follows. It was generated using avro-tools:

/**
* Autogenerated by Avro
*
* DO NOT EDIT DIRECTLY
*/

import org.apache.avro.specific.SpecificData;
import org.apache.avro.message.BinaryMessageEncoder;
import org.apache.avro.message.BinaryMessageDecoder;
import org.apache.avro.message.SchemaStore;

@SuppressWarnings("all")
@org.apache.avro.specific.AvroGenerated
public class MyAvroSchema extends org.apache.avro.specific.SpecificRecordBase 
implements org.apache.avro.specific.SpecificRecord {
  private static final long serialVersionUID = 4994916517880671663L;
  public static final org.apache.avro.Schema SCHEMA$ = new 
org.apache.avro.Schema.Parser().parse("{\"type\":\"record\",\"name\":\"MyAvroSchema\",\"fields\":[{\"name\":\"a\",\"type\":[\"null\",\"int\"]},{\"name\":\"b\",\"type\":[\"null\",\"string\"]}]}");
  public static org.apache.avro.Schema getClassSchema() { return SCHEMA$; }

  private static SpecificData MODEL$ = new SpecificData();

  private static final BinaryMessageEncoder<MyAvroSchema> ENCODER =
      new BinaryMessageEncoder<MyAvroSchema>(MODEL$, SCHEMA$);

  private static final BinaryMessageDecoder<MyAvroSchema> DECODER =
      new BinaryMessageDecoder<MyAvroSchema>(MODEL$, SCHEMA$);

  /**
   * Return the BinaryMessageDecoder instance used by this class.
   */
  public static BinaryMessageDecoder<MyAvroSchema> getDecoder() {
return DECODER;
  }

  /**
   * Create a new BinaryMessageDecoder instance for this class that uses the 
specified {@link SchemaStore}.
   * @param resolver a {@link SchemaStore} used to find schemas by fingerprint
   */
  public static BinaryMessageDecoder<MyAvroSchema> createDecoder(SchemaStore resolver) {
    return new BinaryMessageDecoder<MyAvroSchema>(MODEL$, SCHEMA$, resolver);
  }

  /** Serializes this MyAvroSchema to a ByteBuffer. */
  public java.nio.ByteBuffer toByteBuffer() throws java.io.IOException {
return ENCODER.encode(this);
  }

  /** Deserializes a MyAvroSchema from a ByteBuffer. */
  public static MyAvroSchema fromByteBuffer(
  java.nio.ByteBuffer b) throws java.io.IOException {
return DECODER.decode(b);
  }

  @Deprecated public java.lang.Integer a;
  @Deprecated public java.lang.CharSequence b;

  /**
   * Default constructor.  Note that this does not initialize fields
   * to their default values from the schema.  If that is desired then
   * one should use newBuilder().
   */
  public MyAvroSchema() {}

  /**
   * All-args constructor.
   * @param a The new value for a
   * @param b The new value for b
   */
  public MyAvroSchema(java.lang.Integer a, java.lang.CharSequence b) {
this.a = a;
this.b = b;
  }

  public org.apache.avro.Schema getSchema() { return SCHEMA$; }
  // Used by DatumWriter.  Applications should not call.
  public java.lang.Object get(int field$) {
switch (field$) {
case 0: return a;
case 1: return b;
default: throw new org.apache.avro.AvroRuntimeException("Bad index");
}
  }

  // Used by DatumReader.  Applications should not call.
  @SuppressWarnings(value="unchecked")
  public void put(int field$, java.lang.Object value$) {
switch (field$) {
case 0: a = (java.lang.Integer)value$; break;
case 1: b = (java.lang.CharSequence)value$; break;
default: throw new org.apache.avro.AvroRuntimeException("Bad index");
}
  }

  /**
   * Gets the value of the 'a' field.
   * @return The value of the 'a' field.
   */
  public java.lang.Integer getA() {
return a;
  }

  /**
   * Sets the value of the 'a' field.
   * @param value the value to set.
   */
  public void setA(java.lang.Integer value) {
this.a = value;
  }

  /**
   * Gets the value of the 'b' field.
   * @return The value of the 'b' field.
   */
  public java.lang.CharSequence getB() {
return b;
  }

  /**
   * Sets the value of the 'b' field.
   * @param value the value to set.
   */
  public void setB(java.lang.CharSequence value) {
this.b = value;
  }

  /**
   * Creates a new MyAvroSchema RecordBuilder.
   * @return A new MyAvroSchema RecordBuilder
   */
  public static MyAvroSchema.Builder newBuilder() {
return new MyAvroSchema.Builder();
  }

  /**
   * Creates a new MyAvroSchema RecordBuilder by copying an existing Builder.
   * @param other The existing builder to copy.
   * @return A new MyAvroSchema RecordBuilder
   */
  public static MyAvroSchema.Builder newBuilder(MyAvroSchema.Builder other) {
return new MyAvroSchema.Builder(other);
  }

  /**
   * Creates a new MyAvroSchema RecordBuilder by copying an existing 
MyAvroSchema instance.
   * @param other The existing instance to copy.
   * @return A new MyAvroSchema RecordBuilder
   */
  public static MyAvroSchema.Builder newBuilder(MyAvroSchema other) {
return new MyAvroSchema.Builder(other);
  }

  /**
   * RecordBuilder for MyAvroSchema instances.
   */
  public static class Builder extends org.apache.avro.specific.SpecificRecordBuilderBase<MyAvroSchema>
      implements org.apache.avro.data.RecordBuilder<MyAvroSchema> {

private java.lang.Integer a;
private java.lang.CharSequence b;

/** Creates a new 

FlinkCEP and scientific papers ?

2018-07-17 Thread Esa Heikkinen
Hi

I don't know if this is the correct forum to ask, but do there exist some good
scientific papers about FlinkCEP (Complex Event Processing)?

I know Flink is based on Stratosphere, but what is FlinkCEP based on?

BR Esa


RequiredParameters in Flink 1.5.1

2018-07-17 Thread Flavio Pompermaier
Hi to all,
I'm trying to migrate a job from Flink 1.3.1 to 1.5.1 but it seems that
RequiredParameters and ParameterTool work differently than before...

My code is the following:

ParameterTool parameters = ParameterTool.fromArgs(args);
RequiredParameters required = new RequiredParameters();
required.add(*getMyBooleanOption*());
required.applyTo(parameters);

where *getMyBooleanOption*() creates an Option 'xxx' with a default value
of "true".

In Flink 1.3.1 parameters.getBoolean(getMyBooleanOption().getName())
returns true while in 1.5.1 throws an exception:

Exception in thread "main" java.lang.RuntimeException: No data for required
key 'xxx'
at
org.apache.flink.api.java.utils.ParameterTool.getRequired(ParameterTool.java:289)
at
org.apache.flink.api.java.utils.ParameterTool.getBoolean(ParameterTool.java:427)

Is this a regression or is this the intended behaviour? This can simply be
worked around using

parameters.getBoolean(*getMyBooleanOption*().getName(),Boolean.valueOf(
*getMyBooleanOption*().getDefaultValue()));

but the previous mechanism was better (IMHO).

Thanks in advance,
Flavio


Re: Why is flink master bump version to 1.7?

2018-07-17 Thread 陈梓立
Yes I can see it now. Thank you all!

Till Rohrmann wrote on Tue, Jul 17, 2018 at 7:53 PM:

> Yes, pulling from https://git-wip-us.apache.org/repos/asf/flink.git
> should show you the release-1.6 branch.
>
> Cheers,
> Till
>
> On Tue, Jul 17, 2018 at 10:37 AM Chesnay Schepler 
> wrote:
>
>> The release-1.6 branch exists (
>> https://git-wip-us.apache.org/repos/asf?p=flink.git;a=shortlog;h=refs/heads/release-1.6
>> ),
>> but wasn't synced to GitHub yet.
>>
>> On 17.07.2018 09:33, Timo Walther wrote:
>>
>> Hi Tison,
>>
>> I guess this was a mistake that will be fixed soon. Till (in CC) forked
>> off the release-1.6 branch yesterday?
>>
>> Regards,
>> Timo
>>
> On 17.07.18 at 04:00, 陈梓立 wrote:
>>
>> Hi,
>>
>> I see no 1.6 branch or tag. What's the reason we skip 1.6 and now
>> 1.7-SNAPSHOT? or there is a 1.6 I miss.
>>
>> Best,
>> tison
>>
>>
>>
>>


[ANNOUNCE] Weekly community update #29

2018-07-17 Thread Till Rohrmann
Dear community,

this is the weekly community update thread #29. Please post any news and
updates you want to share with the community to this thread.

# Feature freeze Flink 1.6

The Flink community has cut off the release branch for Flink 1.6 [1]. From
now on, the community will concentrate on fixing the last remaining release
blockers before Flink 1.6 will be released.

# Flink 1.5.1 released

The Flink community has released the first bug fix release 1.5.1 [2]
containing more than 60 fixes and improvements.

# Improve record serialization

Zhijiang kicked off a discussion about improving the way Flink serializes
records into multiple result subpartitions [3]. His proposal aims at
reducing redundant serialization overhead by sharing buffers across
multiple subpartitions. Please join the discussion if you want to learn
more.

[1]
https://git-wip-us.apache.org/repos/asf?p=flink.git;a=shortlog;h=refs/heads/release-1.6
[2]
http://apache-flink-mailing-list-archive.1008284.n3.nabble.com/ANNOUNCE-Apache-Flink-1-5-1-released-td23300.html
[3]
http://apache-flink-mailing-list-archive.1008284.n3.nabble.com/DISCUSS-Improve-broadcast-serialization-td23295.html

Cheers,
Till


Serialization questions

2018-07-17 Thread Flavio Pompermaier
Hi to all,
I was trying to check whether our jobs are properly typed or not.
I've started disabling generic types[1] in order to discover untyped
transformations and so I added the proper returns() to operators.

Unfortunately there are jobs where we serialize Thrift and DateTime
objects, so I need to properly configure the serializers in the
ExecutionEnvironment:

env.registerTypeWithKryoSerializer(DateTime.class,
JodaDateTimeSerializer.class);
env.getConfig().addDefaultKryoSerializer(EntitonAtom.class,
TBaseSerializer.class);
env.getConfig().addDefaultKryoSerializer(EntitonQuad.class,
TBaseSerializer.class);

Those jobs don't work when I disable generic types and I get the following
exception:

Exception in thread "main" java.lang.UnsupportedOperationException: Generic
types have been
disabled in the ExecutionConfig and type xxx.EntitonAtom is treated as a
generic type.

 I have a couple of questions:

   - addDefaultKryoSerializer differs from registerTypeWithKryoSerializer
   because addDefaultKryoSerializer uses the passed serializer also for
   subclasses of the configured class. Am I right? This is not very clear in
   the method's Javadoc...
   - how to avoid that exception?

Best,
Flavio

[1] env.getConfig().disableGenericTypes();


Re: StateMigrationException when switching from TypeInformation.of to createTypeInformation

2018-07-17 Thread Till Rohrmann
Hi Elias,

I think introducing a new state and then deprecating the old one is
currently the only way to solve this problem.

The community is currently working on supporting state evolution [1]. With
this feature it should be possible to change serializers between two
savepoints. Unfortunately, the feature could not be completed for Flink
1.6. But I think it will be in the master soon.

[1] https://issues.apache.org/jira/browse/FLINK-9376

Cheers,
Till

On Sun, Jul 15, 2018 at 12:11 AM Elias Levy 
wrote:

> Apologies for the delay.  I've been traveling.
>
> On Mon, Jul 9, 2018 at 8:44 AM Till Rohrmann  wrote:
>
>> could you check whether the `TypeInformation` returned by
>> `TypeInformation.of(new TypeHint[ConfigState]() {}))` and
>> `createTypeInformation[ConfigState]` return the same `TypeInformation`
>> subtype? The problem is that the former goes through the Java TypeExtractor
>> whereas the latter goes through the Scala `TypeUtils#createTypeInfo` where
>> the resulting `TypeInformation` is created via Scala macros. It must be the
>> case that the Scala `TypeUtils` generate a different `TypeInformation`
>> (e.g. Java generating a GenericTypeInfo whereas Scala generates a
>> TraversableTypeInfo).
>>
>
> TypeInformation.of returns a GenericTypeInfo, and toString reports it
> as GenericType.
>
> createTypeInformation returns an anonymous class but toString reports it
> as interface scala.collection.mutable.Map[scala.Tuple2(_1: String, _2:
> scala.Tuple2(_1: GenericType, _2:
> byte[]))].
>
> Looks like you are correct about the Java version using GenericTypeInfo.
> I suppose the only way around this if we wanted to move over to 
> createTypeInformation
> is to release a job that supports both types and upgrade the state from one
> to the other, then drop support for the older state.  Yes?
>
> It would also be helpful if you could share the definition of
>> `ConfigState` in order to test it ourselves.
>>
>
> ConfigState is defined as type ConfigState =
> mutable.Map[String,ConfigStateValue] and ConfigStateValue is defined as type
> ConfigStateValue = (LazyObject,Array[Byte]).  LazyObject is from the
> Doubledutch LazyJSON  package.
>


Object reuse in DataStreams

2018-07-17 Thread Urs Schoenenberger
Hi all,

we came across some interesting behaviour today.
We enabled object reuse on a streaming job that looks like this:

stream = env.addSource(source)
stream.map(mapFnA).addSink(sinkA)
stream.map(mapFnB).addSink(sinkB)

Operator chaining is enabled, so the optimizer fuses all operations into
a single slot.
The same object reference gets passed to both mapFnA and mapFnB. This
makes sense when I think about the internal implementation, but it still
came as a bit of a surprise since the object reuse docs (for batch -
there are no official ones for streaming, right?) don't really deal with
splitting the DataSet/DataStream. I guess my case is *technically*
covered by the documented warning that it is unsafe to reuse an object
that has already been collected, only in this case this reuse is
"hidden" behind the stream definition DSL.

Is this the expected behaviour? Is object reuse for DataStreams
encouraged at all or is it more of a "hidden beta" feature until FLIP-21
is officially finished?

Best,
Urs

-- 
Urs Schönenberger - urs.schoenenber...@tngtech.com

TNG Technology Consulting GmbH, Beta-Straße 13a, 85774 Unterföhring
Geschäftsführer: Henrik Klagges, Dr. Robert Dahlke, Gerhard Müller
Sitz: Unterföhring * Amtsgericht München * HRB 135082


Re: rowTime from json nested timestamp field in SQL-Client

2018-07-17 Thread Timo Walther

Hi Ashwin,

if you quickly want to make this work you can look into 
org.apache.flink.table.descriptors.RowtimeValidator#getRowtimeComponents.


This is the component that converts the string property into a 
org.apache.flink.table.sources.tsextractors.TimestampExtractor. You can 
implement your custom timestamp extractor that performs some logic.


Regards,
Timo


On 17.07.18 at 14:37, Ashwin Sinha wrote:

Hi Timo,

We want to add this functionality in a forked branch. Can you guide us 
with steps to quickly apply a patch/fix for the same?


On Mon, Jul 16, 2018 at 9:06 PM Ashwin Sinha wrote:


Thanks Timo for the clarification, but our processing actually
involves aggregations on huge past data also, which won't be
served by processing time.

Is this a WIP feature?

On Mon, Jul 16, 2018 at 7:29 PM Timo Walther  wrote:

Hi Ashwin,

the SQL Client is in an early development stage right now and
has some limitations. Your problem is one of them. I filed an
issue for this: https://issues.apache.org/jira/browse/FLINK-9864

There is no easy solution to fix this problem. Maybe you can
use processing-time for your windows?

Regards,
Timo

On 16.07.18 at 14:25, Ashwin Sinha wrote:

Hi Users,

In the Flink 1.5 SQL Client, we are trying to define rowTime from a nested
JSON element, but are struggling with the syntax.

JSON data format: https://pastebin.com/ByCLhEnF
YML table config: https://pastebin.com/cgEtQPDQ

Now, in the above config, we want to access *payload.after.modifiedon* as the
rowTime column. We tried an SQL query with aggregation on
'payload.after.modifiedon' as time, but get an error.

Is there anyway where we can register nested timestamp field
as rowTime for the source table?

-- 
*Ashwin Sinha *| Data Engineer

ashwin.si...@go-mmt.com  |
9452075361








-- 
*Ashwin Sinha *| Data Engineer

ashwin.si...@go-mmt.com  | 9452075361





--
*Ashwin Sinha *| Data Engineer
ashwin.si...@go-mmt.com  | 9452075361









Re: rowTime from json nested timestamp field in SQL-Client

2018-07-17 Thread Ashwin Sinha
Hi Timo,

We want to add this functionality in a forked branch. Can you guide us with
steps to quickly apply a patch/fix for the same?

On Mon, Jul 16, 2018 at 9:06 PM Ashwin Sinha 
wrote:

> Thanks Timo for the clarification, but our processing actually involves
> aggregations on huge past data also, which won't be served by processing
> time.
>
> Is this a WIP feature?
>
> On Mon, Jul 16, 2018 at 7:29 PM Timo Walther  wrote:
>
>> Hi Ashwin,
>>
>> the SQL Client is in an early development stage right now and has some
>> limitations. Your problem is one of them. I filed an issue for this:
>> https://issues.apache.org/jira/browse/FLINK-9864
>>
>> There is no easy solution to fix this problem. Maybe you can use
>> processing-time for your windows?
>>
>> Regards,
>> Timo
>>
>> On 16.07.18 at 14:25, Ashwin Sinha wrote:
>>
>> Hi Users,
>>
>> In the Flink 1.5 SQL Client, we are trying to define rowTime from a nested
>> JSON element, but are struggling with the syntax.
>>
>> JSON data format: https://pastebin.com/ByCLhEnF
>> YML table config: https://pastebin.com/cgEtQPDQ
>>
>> Now, in the above config, we want to access *payload.after.modifiedon* as the
>> rowTime column. We tried an SQL query with aggregation on
>> 'payload.after.modifiedon' as time, but get an error.
>>
>> Is there anyway where we can register nested timestamp field as rowTime
>> for the source table?
>>
>> --
>> *Ashwin Sinha *| Data Engineer
>> ashwin.si...@go-mmt.com  | 9452075361
>>  
>> 
>>
>>
>>
>>
>
> --
> *Ashwin Sinha *| Data Engineer
> ashwin.si...@go-mmt.com  | 9452075361
>  
> 
>


-- 
*Ashwin Sinha *| Data Engineer
ashwin.si...@go-mmt.com  | 9452075361
 


-- 




Re: Flink CLI properties with HA

2018-07-17 Thread Till Rohrmann
Hi Sampath,

technically the client does not need to know the
`high-availability.storageDir` to submit a job. However, due to how we
construct the ZooKeeperHaServices it is still needed. The reason behind
this is that we use the same services for the server and the client. Thus,
the implementation needs to know the storageDir in both cases. The way it
should be done is to split the HighAvailabilityServices up into client and
server services. The former would then not depend on
`high-availability.storageDir`.

Cheers,
Till

On Tue, Jul 17, 2018 at 1:31 PM vino yang  wrote:

> Hi Sampath,
>
> It seems Flink CLI for standalone would not access
> *high-availability.storageDir.*
>
> What's the exception stack trace in your environment?
>
> Thanks, vino.
>
> 2018-07-17 15:08 GMT+08:00 Sampath Bhat :
>
>> Hi vino
>>
>> Should the flink CLI have access to the path mentioned in
>> *high-availability.storageDir*?
>> If my flink cluster is on set of machines and i submit my job from flink
>> CLI from another independent machine by giving necessary details will the
>> CLI try to access *high-availability.storageDir *path?
>>
>> I'm aware of the fact that flink client will connect to zookeeper to get
>> leader address and necessary information for job submission but my
>> confusion is with *high-availability.storageDir* and its necessity in
>> flink CLI configuration.
>>
>> On Mon, Jul 16, 2018 at 2:44 PM, vino yang  wrote:
>>
>>> Hi Sampath,
>>>
>>> Flink CLI need to retrieve the JobManager leader address, so it need  to
>>> access the HA specific configuration. Because if based on Zookeeper to
>>> implement the HA, the leader address information will fetch from Zookeeper.
>>>
>>> The main use of config item *high-availability.storageDir* is storage
>>> (Job graph, checkpoint and so on). Actually, the real data is stored under
>>> this path which used to recover purpose, zookeeper just store a state
>>> handle.
>>>
>>> ---
>>> Thanks.
>>> vino.
>>>
>>>
>>> 2018-07-16 15:28 GMT+08:00 Sampath Bhat :
>>>

 -- Forwarded message --
 From: Sampath Bhat 
 Date: Fri, Jul 13, 2018 at 3:18 PM
 Subject: Flink CLI properties with HA
 To: user 


 Hello

 When HA is enabled in the flink cluster and if I've to submit job via
 flink CLI then in the flink-conf.yaml of flink CLI should contain this
 properties -
 high-availability: zookeeper
 high-availability.cluster-id: flink
 high-availability.zookeeper.path.root: flink
 high-availability.storageDir: 
 high-availability.zookeeper.quorum: 

 What is the need of high-availability.storageDir for flink CLI. Does
 this mean that even flink client should be able to access the mentioned
 path or is it some check being done on the property name?

 Without these properties flink cli will not be able to submit job to
 flink cluster when HA is enabled.


>>>
>>
>


Re: Why is flink master bump version to 1.7?

2018-07-17 Thread Till Rohrmann
Yes, pulling from https://git-wip-us.apache.org/repos/asf/flink.git should
show you the release-1.6 branch.

Cheers,
Till

On Tue, Jul 17, 2018 at 10:37 AM Chesnay Schepler 
wrote:

> The release-1.6 branch exists (
> https://git-wip-us.apache.org/repos/asf?p=flink.git;a=shortlog;h=refs/heads/release-1.6
> ),
> but wasn't synced to GitHub yet.
>
> On 17.07.2018 09:33, Timo Walther wrote:
>
> Hi Tison,
>
> I guess this was a mistake that will be fixed soon. Till (in CC) forked
> off the release-1.6 branch yesterday?
>
> Regards,
> Timo
>
> On 17.07.18 at 04:00, 陈梓立 wrote:
>
> Hi,
>
> I see no 1.6 branch or tag. What's the reason we skip 1.6 and now
> 1.7-SNAPSHOT? or there is a 1.6 I miss.
>
> Best,
> tison
>
>
>
>


Re: Flink CLI properties with HA

2018-07-17 Thread vino yang
Hi Sampath,

It seems the Flink CLI for a standalone cluster would not access
*high-availability.storageDir*.

What's the exception stack trace in your environment?

Thanks, vino.

2018-07-17 15:08 GMT+08:00 Sampath Bhat :

> Hi vino
>
> Should the flink CLI have access to the path mentioned in
> *high-availability.storageDir*?
> If my flink cluster is on set of machines and i submit my job from flink
> CLI from another independent machine by giving necessary details will the
> CLI try to access *high-availability.storageDir *path?
>
> I'm aware of the fact that flink client will connect to zookeeper to get
> leader address and necessary information for job submission but my
> confusion is with *high-availability.storageDir* and its necessity in
> flink CLI configuration.
>
> On Mon, Jul 16, 2018 at 2:44 PM, vino yang  wrote:
>
>> Hi Sampath,
>>
>> Flink CLI need to retrieve the JobManager leader address, so it need  to
>> access the HA specific configuration. Because if based on Zookeeper to
>> implement the HA, the leader address information will fetch from Zookeeper.
>>
>> The main use of config item *high-availability.storageDir* is storage
>> (Job graph, checkpoint and so on). Actually, the real data is stored under
>> this path which used to recover purpose, zookeeper just store a state
>> handle.
>>
>> ---
>> Thanks.
>> vino.
>>
>>
>> 2018-07-16 15:28 GMT+08:00 Sampath Bhat :
>>
>>>
>>> -- Forwarded message --
>>> From: Sampath Bhat 
>>> Date: Fri, Jul 13, 2018 at 3:18 PM
>>> Subject: Flink CLI properties with HA
>>> To: user 
>>>
>>>
>>> Hello
>>>
>>> When HA is enabled in the flink cluster and if I've to submit job via
>>> flink CLI then in the flink-conf.yaml of flink CLI should contain this
>>> properties -
>>> high-availability: zookeeper
>>> high-availability.cluster-id: flink
>>> high-availability.zookeeper.path.root: flink
>>> high-availability.storageDir: 
>>> high-availability.zookeeper.quorum: 
>>>
>>> What is the need of high-availability.storageDir for flink CLI. Does
>>> this mean that even flink client should be able to access the mentioned
>>> path or is it some check being done on the property name?
>>>
>>> Without these properties flink cli will not be able to submit job to
>>> flink cluster when HA is enabled.
>>>
>>>
>>
>


Re: Global latency metrics

2018-07-17 Thread Chesnay Schepler

No, you can only get the latency for each operator.

For starters, how would a global latency even account for multiple
sources and sinks?


On 17.07.2018 10:22, shimin yang wrote:

Hi All,

Is there a method to get the global latency directly? Since I only 
find the latency for each operator in the Flink Rest API.


Best,

Shimin





Re: Why is flink master bump version to 1.7?

2018-07-17 Thread Chesnay Schepler
The release-1.6 branch exists 
(https://git-wip-us.apache.org/repos/asf?p=flink.git;a=shortlog;h=refs/heads/release-1.6),

but wasn't synced to GitHub yet.

On 17.07.2018 09:33, Timo Walther wrote:

Hi Tison,

I guess this was a mistake that will be fixed soon. Till (in CC) 
forked off the release-1.6 branch yesterday?


Regards,
Timo

On 17.07.18 at 04:00, 陈梓立 wrote:

Hi,

I see no 1.6 branch or tag. What's the reason we skipped 1.6 and are now on
1.7-SNAPSHOT? Or is there a 1.6 that I missed?


Best,
tison







Re: Flink WindowedStream - Need assistance

2018-07-17 Thread Titus Rakkesh
Friends, any assistance regarding this?

On Mon, Jul 16, 2018 at 3:34 PM, Titus Rakkesh 
wrote:

> We have 2 independent streams which will receive elements at different
> frequencies,
>
> DataStream> splittedActivationTuple;
>
> DataStream> unionReloadsStream;
>
> We have a requirement to keep "splittedActivationTuple" stream elements in
> a window with an eviction period of 24 hours. So I created a
> "WindowedStream" like below,
>
> WindowedStream, Tuple, GlobalWindow> 
> keyedWindowedActStream = splittedActivationTuple
> .assignTimestampsAndWatermarks(new 
> IngestionTimeExtractor()).keyBy(0).window(GlobalWindows.create())
> .evictor(TimeEvictor.of(Time.of(24, TimeUnit.HOURS)));
>
> Our requirements are following,
>
>1.
>
>    When "unionReloadsStream" receives data, we need to check whether the
>    corresponding "String" field matches the "String" field in the
>    WindowedStream and accumulate the "WindowedStream's" Double with the
>    "unionReloadsStream" Double. Will this be possible with Flink? I checked
>    CoGroup and CoMap, but I couldn't figure out how to do it since I am new.
>    (See the sketch after this message.)
>2.
>
>    CEP functionality to create a new stream from the WindowedStream if the
>    Double value > 100? I went through several of Flink's CEP tutorials but
>    couldn't figure out how to do this with a "WindowedStream".
>
> I am very new to flink. Any assistance would be highly appreciated.
>
> Thanks.
>
>
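
(Not an answer from the thread: a minimal Java sketch of requirement 1 above,
using connect plus a keyed CoFlatMapFunction instead of a window. The element
type Tuple2<String, Double> and all names are assumptions, and the 24-hour
expiry of activation entries is not handled here; that would need state TTL or
timers.)

import org.apache.flink.api.common.state.ValueState;
import org.apache.flink.api.common.state.ValueStateDescriptor;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.functions.co.RichCoFlatMapFunction;
import org.apache.flink.util.Collector;

public class ActivationReloadJoin {

    public static DataStream<Tuple2<String, Double>> join(
            DataStream<Tuple2<String, Double>> splittedActivationTuple,
            DataStream<Tuple2<String, Double>> unionReloadsStream) {

        return splittedActivationTuple
                .connect(unionReloadsStream)
                .keyBy(0, 0) // key both streams by the String field
                .flatMap(new RichCoFlatMapFunction<Tuple2<String, Double>, Tuple2<String, Double>, Tuple2<String, Double>>() {

                    private transient ValueState<Double> activationValue;

                    @Override
                    public void open(Configuration parameters) {
                        activationValue = getRuntimeContext().getState(
                                new ValueStateDescriptor<>("activation", Double.class));
                    }

                    @Override
                    public void flatMap1(Tuple2<String, Double> activation, Collector<Tuple2<String, Double>> out) throws Exception {
                        // Remember the latest activation amount for this key.
                        activationValue.update(activation.f1);
                    }

                    @Override
                    public void flatMap2(Tuple2<String, Double> reload, Collector<Tuple2<String, Double>> out) throws Exception {
                        Double current = activationValue.value();
                        if (current != null) {
                            // Accumulate the reload amount onto the stored activation amount.
                            out.collect(Tuple2.of(reload.f0, current + reload.f1));
                        }
                    }
                });
    }
}

Whether the accumulated value should also be written back into the keyed state
depends on the intended semantics; the sketch only emits the sum.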


Re: Parallelism and keyed streams

2018-07-17 Thread Nicholas Walton
Martin,

To clarify things, the code causing the issue is here; nothing clever. The code
fails at the line in bold. The Long index values are set earlier in the sequence
1, 2, 3, 4, 5, 6, 7, …

val scaledReadings : DataStream[(Int,Long, Double, Double)] = maxChannelReading
  .keyBy(0)
  .map { in =>
LOG.info(s"scaledReadings $in")
(in._1, in._2, in._3/in._4 + 2.0D, in._3) }


 val logRatioWindow: DataStream[(Int,Long, Int, Double)] = scaledReadings
  .keyBy(0)
  .countWindow(100, 99)
  .process(new logRatioWindowFunction() )


and

class logRatioWindowFunction extends ProcessWindowFunction[(Int, Long, Double, Double), (Int, Long, Int, Double), org.apache.flink.api.java.tuple.Tuple, GlobalWindow] {

  def process(key: Tuple, context: logRatioWindowFunction.this.Context, input: Iterable[(Int, Long, Double, Double)], out: Collector[(Int, Long, Int, Double)]) = {

    val a: Array[(Int, Long, Double, Double)] = input.toArray
    val ch = a(0)._1
    val s = a(0)._2
    val l = input.size

    if (l < 100) Job.LOG.info(s"Log ratio window length $l on channel $ch at sample $s")

    for (i <- 1 to a.size - 1) assert(a(i)._2 == a(i-1)._2 + 1, "logRatioWindowFunction:Failure non-monotonic indexes " + a(i-1)._2 + " and " + a(i)._2)

    if (l == 100) {
      for (i <- 0 to l - 2) {
        val v: Int = rint(100 * log(E + a(i+1)._3 / a(i)._3)).toInt
        assert(v > 0, "Bad minhash in medianLogRatioWindowFunction " + v)
        Job.LOG.debug("logRatioWindowFunction [" + a(i+1)._1 + ", " + a(i+1)._2 + ", " + v + ", " + a(i+1)._4 + "]")
        out.collect(scala.Tuple4(a(i+1)._1, a(i+1)._2, v, a(i+1)._4))
      }
      Job.LOG.debug("logRatioWindowFunction [" + a.head._1 + ", " + a.head._2 + " ... " + a.last._2 + "] collected")
    }
  }

}
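
(Not part of the thread: a minimal Java sketch of the same per-channel ratio idea
using keyed state instead of a count window. All names and the element type
(channel, index, value) are assumptions. Because keyed state lives with the key,
every element of a given channel is handled by exactly one parallel instance,
whatever the operator parallelism.)

import org.apache.flink.api.common.functions.RichMapFunction;
import org.apache.flink.api.common.state.ValueState;
import org.apache.flink.api.common.state.ValueStateDescriptor;
import org.apache.flink.api.java.tuple.Tuple3;
import org.apache.flink.api.java.tuple.Tuple4;
import org.apache.flink.configuration.Configuration;

// Input: (channel, index, value); output: (channel, index, ratio, value).
public class RatioFunction extends RichMapFunction<Tuple3<Integer, Long, Double>, Tuple4<Integer, Long, Double, Double>> {

    private transient ValueState<Double> previous; // last value seen for the current key (channel)

    @Override
    public void open(Configuration parameters) {
        previous = getRuntimeContext().getState(new ValueStateDescriptor<>("previous", Double.class));
    }

    @Override
    public Tuple4<Integer, Long, Double, Double> map(Tuple3<Integer, Long, Double> in) throws Exception {
        Double prev = previous.value();
        // The first element of a channel has no predecessor, so emit a neutral ratio of 1.0.
        double ratio = (prev == null) ? 1.0 : in.f2 / prev;
        previous.update(in.f2);
        return Tuple4.of(in.f0, in.f1, ratio, in.f2);
    }
}

// Usage: readings.keyBy(0).map(new RatioFunction())

Note that keyed state only guarantees per-key isolation; it does not reorder
input, so elements must still arrive in index order from upstream.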


> On 17 Jul 2018, at 00:15, Martin, Nick wrote:
> 
> Is value(index-1) stored in Keyed State, or just a local variable inside the 
> operator?
> 
> -Original Message-
> From: Nicholas Walton [mailto:nwal...@me.com ] 
> Sent: Monday, July 16, 2018 1:33 PM
> To: user@flink.apache.org 
> Subject: Parallelism and keyed streams
> 
> I have a stream of tuples  , which 
> I form into a keyedStream using keyBy on channel. I then need to process each 
> channel in parallel. Each parallel stream must be processed in strict 
> sequential order by index to calculate the ratios 
> value(index)/value(index-1). If I set parallelism to 1 all is well, each 
> channel is processed in order of index 1, 2, 3, 4…
> 
> My problem is that when I set parallelism to a value greater than 1, each channel's
> keyedStream appears to be split across multiple processes. So a channel may
> be processed wrongly, for example as value(2), value(5), value(6),
> value(9)…
> 
> The number of channels N is unknown. So how do I rig up N processing streams 
> with an unknown parallelism so that each stream processes each channel by 
> strictly increasing index v(1),v(2),…..v(t),v(t+1),…..v(t+n)
> 
> Thanks in advance
> 
> NIck Walton
> 
> 



Re: Can not get OutPutTag datastream from Windowing function

2018-07-17 Thread Xingcan Cui
Hi Soheil,

The `getSideOutput()` method is defined on the operator instead of the 
datastream.
You can invoke it after any action (e.g., map, window) performed on a 
datastream.

Best,
Xingcan

> On Jul 17, 2018, at 3:36 PM, Soheil Pourbafrani  wrote:
> 
> Hi, according to the documents I tried to get late data using side output.
> 
> final OutputTag> lateOutputTag = new 
> OutputTag>("late-data"){};
> DataStream> res = aggregatedTuple
> .assignTimestampsAndWatermarks(new Bound())
> }).keyBy(1).timeWindow(Time.milliseconds(160))/*.countWindow(3)*/
> .allowedLateness(Time.milliseconds(2))
> .sideOutputLateData(lateOutputTag)
> .reduce(Do some process);
> 
> When trying to store late data in a Datastream (As shown in document):
> DataStream> lateData = res.
> there is no predefined getSideOutput method on DataStream res!
> But if I call getSideOutput just after reduce function, it is known! But I 
> don't want to save late data on res variable and I want to save them on 
> another variable!
> DataStream> res = aggregatedTuple
> .assignTimestampsAndWatermarks(new Bound())
> }).keyBy(1).timeWindow(Time.milliseconds(160))/*.countWindow(3)*/
> .allowedLateness(Time.milliseconds(2))
> .sideOutputLateData(lateOutputTag)
> .reduce(Do some process)
>  .getSideoutput(lateOutputTag);
> What is the problem here?
> 
> 



Re: Can not get OutPutTag datastream from Windowing function

2018-07-17 Thread Dawid Wysakowicz
Hi Soheil,

The getSideOutput method is part of SingleOutputStreamOperator, which
extends DataStream. Try using SingleOutputStreamOperator as the
type for your res variable.
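
(A minimal sketch of that change, not verbatim from the thread; "aggregatedTuple"
and "Bound" are taken from the original snippet, and the element type
Tuple2<Long, String> plus the reduce logic are assumptions.)

final OutputTag<Tuple2<Long, String>> lateOutputTag =
        new OutputTag<Tuple2<Long, String>>("late-data") {};

SingleOutputStreamOperator<Tuple2<Long, String>> res = aggregatedTuple
        .assignTimestampsAndWatermarks(new Bound())
        .keyBy(1)
        .timeWindow(Time.milliseconds(160))
        .allowedLateness(Time.milliseconds(2))
        .sideOutputLateData(lateOutputTag)
        .reduce((a, b) -> new Tuple2<>(a.f0 + b.f0, a.f1)); // placeholder reduce logic

// Compiles because res is a SingleOutputStreamOperator rather than a plain DataStream.
DataStream<Tuple2<Long, String>> lateData = res.getSideOutput(lateOutputTag);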

Best,

Dawid


On 17/07/18 09:36, Soheil Pourbafrani wrote:
> Hi, according to the documents I tried to get late data using side
> output.
>
> final OutputTag> lateOutputTag = new 
> OutputTag>("late-data"){};
> DataStream> res = aggregatedTuple
> .assignTimestampsAndWatermarks(new Bound())
> }).keyBy(1).timeWindow(Time.milliseconds(160))/*.countWindow(3)*/ 
> .allowedLateness(Time.milliseconds(2))
> .sideOutputLateData(lateOutputTag)
> .reduce(Do some process);
>
> When trying to store late data in a Datastream (As shown in document):
> DataStream> lateData = res.
> there is no predefined getSideOutput method on DataStream res!
> But if I call getSideOutput just after reduce function, it is known!
> But I don't want to save late data on res variable and I want to save
> them on another variable!
> DataStream> res = aggregatedTuple
> .assignTimestampsAndWatermarks(new Bound())
> }).keyBy(1).timeWindow(Time.milliseconds(160))/*.countWindow(3)*/ 
> .allowedLateness(Time.milliseconds(2))
> .sideOutputLateData(lateOutputTag)
> .reduce(Do some process)
>  .getSideoutput(lateOutputTag);
> What is the problem here?
>
>





Can not get OutPutTag datastream from Windowing function

2018-07-17 Thread Soheil Pourbafrani
Hi, according to the documents I tried to get late data using side output.

final OutputTag> lateOutputTag = new
OutputTag>("late-data"){};

DataStream> res = aggregatedTuple
.assignTimestampsAndWatermarks(new Bound())
}).keyBy(1).timeWindow(Time.milliseconds(160))/*.countWindow(3)*/
.allowedLateness(Time.milliseconds(2))
.sideOutputLateData(lateOutputTag)
.reduce(Do some process);


When trying to store late data in a Datastream (As shown in document):

DataStream> lateData = res.

there is no getSideOutput method defined on the DataStream res!
But if I call getSideOutput right after the reduce function, it is recognized. However, I
don't want to chain it onto res; I want to save the late data in
another variable!

DataStream> res = aggregatedTuple
.assignTimestampsAndWatermarks(new Bound())
}).keyBy(1).timeWindow(Time.milliseconds(160))/*.countWindow(3)*/
.allowedLateness(Time.milliseconds(2))
.sideOutputLateData(lateOutputTag)
.reduce(Do some process)

 .getSideoutput(lateOutputTag);

What is the problem here?


Re: Why is flink master bump version to 1.7?

2018-07-17 Thread Timo Walther

Hi Tison,

I guess this was a mistake that will be fixed soon. Till (in CC) forked 
off the release-1.6 branch yesterday?


Regards,
Timo

On 17.07.18 at 04:00, 陈梓立 wrote:

Hi,

I see no 1.6 branch or tag. What's the reason we skipped 1.6 and are now on
1.7-SNAPSHOT? Or is there a 1.6 that I missed?


Best,
tison





Re: Flink CLI properties with HA

2018-07-17 Thread Sampath Bhat
Hi vino

Should the flink CLI have access to the path mentioned in
*high-availability.storageDir*?
If my Flink cluster is on a set of machines and I submit my job from the Flink
CLI on another independent machine by giving the necessary details, will the
CLI try to access the *high-availability.storageDir* path?

I'm aware of the fact that the Flink client connects to ZooKeeper to get the
leader address and the information necessary for job submission, but my
confusion is with *high-availability.storageDir* and its necessity in the Flink
CLI configuration.

On Mon, Jul 16, 2018 at 2:44 PM, vino yang  wrote:

> Hi Sampath,
>
> The Flink CLI needs to retrieve the JobManager leader address, so it needs to
> access the HA-specific configuration. If HA is implemented on top of ZooKeeper,
> the leader address will be fetched from ZooKeeper.
>
> The main use of the config item *high-availability.storageDir* is storage
> (job graph, checkpoints and so on). The real data is stored under this path
> and is used for recovery; ZooKeeper just stores a state handle.
>
> ---
> Thanks.
> vino.
>
>
> 2018-07-16 15:28 GMT+08:00 Sampath Bhat :
>
>>
>> -- Forwarded message --
>> From: Sampath Bhat 
>> Date: Fri, Jul 13, 2018 at 3:18 PM
>> Subject: Flink CLI properties with HA
>> To: user 
>>
>>
>> Hello
>>
>> When HA is enabled in the flink cluster and if I've to submit job via
>> flink CLI then in the flink-conf.yaml of flink CLI should contain this
>> properties -
>> high-availability: zookeeper
>> high-availability.cluster-id: flink
>> high-availability.zookeeper.path.root: flink
>> high-availability.storageDir: 
>> high-availability.zookeeper.quorum: 
>>
>> What is the need for high-availability.storageDir in the Flink CLI? Does this
>> mean that even the Flink client should be able to access the mentioned path, or
>> is it just a check on the property name?
>>
>> Without these properties the Flink CLI will not be able to submit a job to
>> the Flink cluster when HA is enabled.
>>
>>
>


Re: Flink job hangs/deadlocks (possibly related to out of memory)

2018-07-17 Thread Piotr Nowojski
Hi,

Thanks for the additional data. Just to make sure, are you using Flink 1.5.0?

There are a couple of threads that seem to be looping in serialisation, while
others are blocked, either waiting for new data or waiting for someone to
consume some data. Could you debug or CPU-profile the code, in particular
focusing on threads with a stack trace like the one below [1]? Aren't you trying to
serialise some gigantic String?

Piotrek

[1]:

"(...) (19/26)" #2737 prio=5 os_prio=0 tid=0x7f52584d2800 nid=0x6819 
runnable [0x7f451a843000]
   java.lang.Thread.State: RUNNABLE
at 
org.apache.flink.core.memory.DataOutputSerializer.write(DataOutputSerializer.java:133)
at org.apache.flink.types.StringValue.writeString(StringValue.java:812)
at 
org.apache.flink.api.common.typeutils.base.StringSerializer.serialize(StringSerializer.java:64)
at 
org.apache.flink.api.common.typeutils.base.StringSerializer.serialize(StringSerializer.java:28)
at 
org.apache.flink.api.scala.typeutils.CaseClassSerializer.serialize(CaseClassSerializer.scala:113)
at 
org.apache.flink.api.scala.typeutils.CaseClassSerializer.serialize(CaseClassSerializer.scala:32)
at 
org.apache.flink.api.scala.typeutils.CaseClassSerializer.serialize(CaseClassSerializer.scala:113)
at 
org.apache.flink.api.scala.typeutils.CaseClassSerializer.serialize(CaseClassSerializer.scala:32)
at 
org.apache.flink.api.scala.typeutils.TraversableSerializer$$anonfun$serialize$1.apply(TraversableSerializer.scala:98)
at 
org.apache.flink.api.scala.typeutils.TraversableSerializer$$anonfun$serialize$1.apply(TraversableSerializer.scala:93)
at 
scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:48)
at 
org.apache.flink.api.scala.typeutils.TraversableSerializer.serialize(TraversableSerializer.scala:93)
at 
org.apache.flink.api.scala.typeutils.TraversableSerializer.serialize(TraversableSerializer.scala:33)
at 
org.apache.flink.api.scala.typeutils.CaseClassSerializer.serialize(CaseClassSerializer.scala:113)
at 
org.apache.flink.api.scala.typeutils.CaseClassSerializer.serialize(CaseClassSerializer.scala:32)
at 
org.apache.flink.api.scala.typeutils.TraversableSerializer$$anonfun$serialize$1.apply(TraversableSerializer.scala:98)
at 
org.apache.flink.api.scala.typeutils.TraversableSerializer$$anonfun$serialize$1.apply(TraversableSerializer.scala:93)
at 
scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:48)
at 
org.apache.flink.api.scala.typeutils.TraversableSerializer.serialize(TraversableSerializer.scala:93)
at 
org.apache.flink.api.scala.typeutils.TraversableSerializer.serialize(TraversableSerializer.scala:33)
at 
org.apache.flink.streaming.runtime.streamrecord.StreamElementSerializer.serialize(StreamElementSerializer.java:177)
at 
org.apache.flink.streaming.runtime.streamrecord.StreamElementSerializer.serialize(StreamElementSerializer.java:49)
at 
org.apache.flink.runtime.plugable.SerializationDelegate.write(SerializationDelegate.java:54)
at 
org.apache.flink.runtime.io.network.api.serialization.SpanningRecordSerializer.addRecord(SpanningRecordSerializer.java:88)
at 
org.apache.flink.runtime.io.network.api.writer.RecordWriter.sendToTarget(RecordWriter.java:129)
at 
org.apache.flink.runtime.io.network.api.writer.RecordWriter.emit(RecordWriter.java:105)
at 
org.apache.flink.streaming.runtime.io.StreamRecordWriter.emit(StreamRecordWriter.java:81)
at 
org.apache.flink.streaming.runtime.io.RecordWriterOutput.pushToRecordWriter(RecordWriterOutput.java:107)
at 
org.apache.flink.streaming.runtime.io.RecordWriterOutput.collect(RecordWriterOutput.java:89)
at 
org.apache.flink.streaming.runtime.io.RecordWriterOutput.collect(RecordWriterOutput.java:45)
at 
org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingBroadcastingOutputCollector.collect(OperatorChain.java:667)
at 
org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingBroadcastingOutputCollector.collect(OperatorChain.java:653)
at 
org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:679)
at 
org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:657)
at 
org.apache.flink.streaming.api.operators.TimestampedCollector.collect(TimestampedCollector.java:51)
at (...)
at (...)
at 
org.apache.flink.streaming.api.operators.StreamFlatMap.processElement(StreamFlatMap.java:50)
at 
org.apache.flink.streaming.runtime.io.StreamInputProcessor.processInput(StreamInputProcessor.java:202)
- locked 

Re: Persisting Table in Flink API

2018-07-17 Thread Shivam Sharma
Thanks, Vino & Hequn.

On Mon, Jul 16, 2018 at 5:47 PM Hequn Cheng  wrote:

> Hi Shivam,
>
> I think the non-window stream-stream join can solve your problem.
> The non-window join will store all data from both inputs and output joined
> results. The semantics of the non-window join are exactly the same as a batch
> join.
> One important thing to note is that the state of the join might grow
> infinitely depending on the number of distinct input rows, so please
> provide a query configuration with a valid retention interval [1] to prevent
> excessive state size.
>
> Let me know if you have any other questions.
>
> Best, Hequn
>
> [1]
> https://ci.apache.org/projects/flink/flink-docs-master/dev/table/streaming.html#idle-state-retention-time
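
(Not from the thread: a minimal sketch of wiring the two topics from the example
further down into the Table API with an idle-state retention config, assuming a
Flink version with non-window stream joins and the Java Table API. The stream
variables, element types and field names are assumptions; the transaction field
is registered as userName here because "user" is a reserved SQL keyword and would
otherwise need quoting.)

import org.apache.flink.api.common.time.Time;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.StreamQueryConfig;
import org.apache.flink.table.api.Table;
import org.apache.flink.table.api.TableEnvironment;
import org.apache.flink.table.api.java.StreamTableEnvironment;
import org.apache.flink.types.Row;

public class UserTransactionJoin {

    public static void run(StreamExecutionEnvironment env,
                           DataStream<Tuple2<String, Integer>> userStream,   // (userName, userId)
                           DataStream<Tuple2<String, Double>> txStream)      // (userName, transactionAmount)
            throws Exception {

        StreamTableEnvironment tEnv = TableEnvironment.getTableEnvironment(env);
        tEnv.registerDataStream("userTable", userStream, "userName, userId");
        tEnv.registerDataStream("transactionTable", txStream, "userName, transactionAmount");

        Table joined = tEnv.sqlQuery(
                "SELECT t2.userName, t1.userId, t2.transactionAmount " +
                "FROM userTable AS t1 JOIN transactionTable AS t2 ON t1.userName = t2.userName");

        // Bound the join state so it does not grow forever (idle state retention, see [1] above).
        StreamQueryConfig qConfig = tEnv.queryConfig();
        qConfig.withIdleStateRetentionTime(Time.hours(12), Time.hours(24));

        tEnv.toRetractStream(joined, Row.class, qConfig).print();
        env.execute("user/transaction join");
    }
}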
>
> On Mon, Jul 16, 2018 at 5:18 PM, Shivam Sharma <28shivamsha...@gmail.com>
> wrote:
>
>> Hi Vino,
>>
>> First I want to tell you that we are working on Flink SQL so there is no
>> chance to use Data Stream API.
>>
>> I will give one example of my use case here:-
>>
>> Let's say we have two Kafka Topics:
>>
>>1. UserName to UserId Mapping => {"userName": "shivam", "userId": 123}
>>    2. User transaction information, which contains the username => {"user":
>>    "shivam", "transactionAmount": 3250}
>>
>> Final result should be like this  => {"user": "shivam", "userId": 123,
>> "transactionAmount": 3250}
>>
>> SQL Query for this: SELECT t2.user, t1.userID, t2.transactionAmount from
>> userTable as t1 join transactionTable as t2 on t1.userName = t2.user
>>
>> Now, whenever a transaction happens, we need to add the userId to the
>> record using Flink SQL. We need to join these two streams, so we need to
>> store the userName-to-userId mapping somewhere, e.g. in RocksDB.
>>
>> Thanks
>>
>> On Mon, Jul 16, 2018 at 12:04 PM vino yang  wrote:
>>
>>> Hi Shivam,
>>>
>>> Can you provide more details about your use case? Is the join for batch or
>>> streaming? Which join type (window, non-window, or stream-dimension table
>>> join)?
>>>
>>> If it is a stream-dimension table join and the table is huge, using Redis
>>> or some memory-based cache can help with your problem. You can also
>>> customize Flink's physical plan (as Hequn said) and use an async
>>> operator to optimize access to the third-party system.
>>>
>>> Thanks,
>>> Vino yang.
>>>
>>> 2018-07-16 9:17 GMT+08:00 Hequn Cheng :
>>>
 Hi Shivam,

 Currently, Flink sql/table-api supports window join and non-window
 join [1].
 If your requirements are not met by sql/table-api, you can also
 use the DataStream API to implement your own logic. You can refer to the
 non-window join implementation as an example [2][3].

 Best, Hequn

 [1]
 https://ci.apache.org/projects/flink/flink-docs-master/dev/table/sql.html#joins
 [2]
 https://github.com/apache/flink/blob/master/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/datastream/DataStreamJoin.scala
 [3]
 https://github.com/apache/flink/blob/master/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/join/NonWindowInnerJoin.scala

 On Sun, Jul 15, 2018 at 11:29 PM, Shivam Sharma <
 28shivamsha...@gmail.com> wrote:

> Hi,
>
> We have one use case in which we need to persist a Table in Flink which
> can later be joined with other tables. This table can be huge, so we
> need to store it off-heap but with fast access. Any suggestions regarding
> this?
>
> --
> Shivam Sharma
> Data Engineer @ Goibibo
> Indian Institute Of Information Technology, Design and Manufacturing
> Jabalpur
> Mobile No- (+91) 8882114744
> Email:- 28shivamsha...@gmail.com
> LinkedIn:-*https://www.linkedin.com/in/28shivamsharma
> *
>


>>>
>>
>> --
>> Shivam Sharma
>> Data Engineer @ Goibibo
>> Indian Institute Of Information Technology, Design and Manufacturing
>> Jabalpur
>> Mobile No- (+91) 8882114744
>> Email:- 28shivamsha...@gmail.com
>> LinkedIn:-*https://www.linkedin.com/in/28shivamsharma
>> *
>>
>
>

-- 
Shivam Sharma
Data Engineer @ Goibibo
Indian Institute Of Information Technology, Design and Manufacturing
Jabalpur
Mobile No- (+91) 8882114744
Email:- 28shivamsha...@gmail.com
LinkedIn:- https://www.linkedin.com/in/28shivamsharma