Re: NoClassDefFoundError of an Avro class after cancel then resubmit the same job

2018-02-22 Thread Elias Levy
Something seems to be off with the user code class loader.  The only way I
can get my job to start is if I drop the job jar into the JM's lib folder
and configure the JM's classloader.resolve-order to parent-first.
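
For reference, that is this line in the JM's flink-conf.yaml:

    classloader.resolve-order: parent-first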

Suggestions?

On Thu, Feb 22, 2018 at 12:52 PM, Elias Levy wrote:

> I am currently suffering through similar issues.
>
> Had a job running happily, but when the cluster tried to restart it, it
> could not find the JSON serializer in the jar. The job kept trying to
> restart in a loop.
>
> Just today I was running a job I built locally.  The job ran fine.  I
> added two commits and rebuilt the jar.  Now the job dies when it tries to
> start, claiming it can't find the time assigner class.  I've unzipped the
> job jar, both locally and in the TM blob directory and have confirmed the
> class is in it.
>
> This is the backtrace:
>
> java.lang.ClassNotFoundException: com.foo.flink.common.util.TimeAssigner
>   at java.net.URLClassLoader.findClass(Unknown Source)
>   at java.lang.ClassLoader.loadClass(Unknown Source)
>   at sun.misc.Launcher$AppClassLoader.loadClass(Unknown Source)
>   at java.lang.ClassLoader.loadClass(Unknown Source)
>   at java.lang.Class.forName0(Native Method)
>   at java.lang.Class.forName(Unknown Source)
>   at org.apache.flink.util.InstantiationUtil$ClassLoaderObjectInputStream.resolveClass(InstantiationUtil.java:73)
>   at java.io.ObjectInputStream.readNonProxyDesc(Unknown Source)
>   at java.io.ObjectInputStream.readClassDesc(Unknown Source)
>   at java.io.ObjectInputStream.readOrdinaryObject(Unknown Source)
>   at java.io.ObjectInputStream.readObject0(Unknown Source)
>   at java.io.ObjectInputStream.readObject(Unknown Source)
>   at org.apache.flink.util.InstantiationUtil.deserializeObject(InstantiationUtil.java:393)
>   at org.apache.flink.util.InstantiationUtil.deserializeObject(InstantiationUtil.java:380)
>   at org.apache.flink.util.InstantiationUtil.deserializeObject(InstantiationUtil.java:368)
>   at org.apache.flink.util.SerializedValue.deserializeValue(SerializedValue.java:58)
>   at org.apache.flink.streaming.connectors.kafka.internals.AbstractFetcher.createPartitionStateHolders(AbstractFetcher.java:542)
>   at org.apache.flink.streaming.connectors.kafka.internals.AbstractFetcher.<init>(AbstractFetcher.java:167)
>   at org.apache.flink.streaming.connectors.kafka.internal.Kafka09Fetcher.<init>(Kafka09Fetcher.java:89)
>   at org.apache.flink.streaming.connectors.kafka.internal.Kafka010Fetcher.<init>(Kafka010Fetcher.java:62)
>   at org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer010.createFetcher(FlinkKafkaConsumer010.java:203)
>   at org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumerBase.run(FlinkKafkaConsumerBase.java:564)
>   at org.apache.flink.streaming.api.operators.StreamSource.run(StreamSource.java:86)
>   at org.apache.flink.streaming.api.operators.StreamSource.run(StreamSource.java:55)
>   at org.apache.flink.streaming.runtime.tasks.SourceStreamTask.run(SourceStreamTask.java:94)
>   at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:264)
>   at org.apache.flink.runtime.taskmanager.Task.run(Task.java:718)
>   at java.lang.Thread.run(Unknown Source)
>
>
> On Tue, Jan 23, 2018 at 7:51 AM, Stephan Ewen  wrote:
>
>> Hi!
>>
>> We changed a few things between 1.3 and 1.4 concerning Avro. One of the
>> main things is that Avro is no longer part of the core Flink class library,
>> but needs to be packaged into your application jar file.
>>
>> The class loading / caching issues of 1.3 with respect to Avro should
>> disappear in Flink 1.4, because Avro classes and caches are scoped to the
>> job classloaders, so the caches do not go across different jobs, or even
>> different operators.
>>
>>
>> *Please check: Make sure you have Avro as a dependency in your jar file
>> (in scope "compile").*
>>
>> Hope that solves the issue.
>>
>> Stephan
>>
>>
>> On Mon, Jan 22, 2018 at 2:31 PM, Edward  wrote:
>>
>>> Yes, we've seen this issue as well, though it usually takes many more
>>> resubmits before the error pops up. Interestingly, of the 7 jobs we run
>>> (all
>>> of which use different Avro schemas), we only see this issue on 1 of
>>> them.
>>> Once the NoClassDefFoundError crops up though, it is necessary to
>>> recreate
>>> the task managers.
>>>
>>> There's a whole page on the Flink documentation on debugging
>>> classloading,
>>> and Avro is mentioned several times on that page:
>>> https://ci.apache.org/projects/flink/flink-docs-release-1.3/
>>> monitoring/debugging_classloading.html
>>>
>>> It seems that (in 1.3 at least) each submitted job has its own
>>> classloader,
>>> and its own instance of the Avro class definitions. However, the Avro
>>> class
>>> cache will keep references to the Avro classes from classloaders for the
>>> previous cancelled jobs.

Re: NoClassDefFoundError of an Avro class after cancel then resubmit the same job

2018-02-22 Thread Elias Levy
I am currently suffering through similar issues.

Had a job running happily, but when the cluster tried to restart it, it
could not find the JSON serializer in the jar. The job kept trying to restart in
a loop.

Just today I was running a job I built locally.  The job ran fine.  I added
two commits and rebuilt the jar.  Now the job dies when it tries to start,
claiming it can't find the time assigner class.  I've unzipped the job jar,
both locally and in the TM blob directory and have confirmed the class is
in it.

This is the backtrace:

java.lang.ClassNotFoundException: com.foo.flink.common.util.TimeAssigner
at java.net.URLClassLoader.findClass(Unknown Source)
at java.lang.ClassLoader.loadClass(Unknown Source)
at sun.misc.Launcher$AppClassLoader.loadClass(Unknown Source)
at java.lang.ClassLoader.loadClass(Unknown Source)
at java.lang.Class.forName0(Native Method)
at java.lang.Class.forName(Unknown Source)
at org.apache.flink.util.InstantiationUtil$ClassLoaderObjectInputStream.resolveClass(InstantiationUtil.java:73)
at java.io.ObjectInputStream.readNonProxyDesc(Unknown Source)
at java.io.ObjectInputStream.readClassDesc(Unknown Source)
at java.io.ObjectInputStream.readOrdinaryObject(Unknown Source)
at java.io.ObjectInputStream.readObject0(Unknown Source)
at java.io.ObjectInputStream.readObject(Unknown Source)
at org.apache.flink.util.InstantiationUtil.deserializeObject(InstantiationUtil.java:393)
at org.apache.flink.util.InstantiationUtil.deserializeObject(InstantiationUtil.java:380)
at org.apache.flink.util.InstantiationUtil.deserializeObject(InstantiationUtil.java:368)
at org.apache.flink.util.SerializedValue.deserializeValue(SerializedValue.java:58)
at org.apache.flink.streaming.connectors.kafka.internals.AbstractFetcher.createPartitionStateHolders(AbstractFetcher.java:542)
at org.apache.flink.streaming.connectors.kafka.internals.AbstractFetcher.<init>(AbstractFetcher.java:167)
at org.apache.flink.streaming.connectors.kafka.internal.Kafka09Fetcher.<init>(Kafka09Fetcher.java:89)
at org.apache.flink.streaming.connectors.kafka.internal.Kafka010Fetcher.<init>(Kafka010Fetcher.java:62)
at org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer010.createFetcher(FlinkKafkaConsumer010.java:203)
at org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumerBase.run(FlinkKafkaConsumerBase.java:564)
at org.apache.flink.streaming.api.operators.StreamSource.run(StreamSource.java:86)
at org.apache.flink.streaming.api.operators.StreamSource.run(StreamSource.java:55)
at org.apache.flink.streaming.runtime.tasks.SourceStreamTask.run(SourceStreamTask.java:94)
at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:264)
at org.apache.flink.runtime.taskmanager.Task.run(Task.java:718)
at java.lang.Thread.run(Unknown Source)


On Tue, Jan 23, 2018 at 7:51 AM, Stephan Ewen  wrote:

> Hi!
>
> We changed a few things between 1.3 and 1.4 concerning Avro. One of the
> main things is that Avro is no longer part of the core Flink class library,
> but needs to be packaged into your application jar file.
>
> The class loading / caching issues of 1.3 with respect to Avro should
> disappear in Flink 1.4, because Avro classes and caches are scoped to the
> job classloaders, so the caches do not go across different jobs, or even
> different operators.
>
>
> *Please check: Make sure you have Avro as a dependency in your jar file
> (in scope "compile").*
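> 
> For example (the version is only illustrative; use the one that matches
> your setup):
> 
> <dependency>
>   <groupId>org.apache.avro</groupId>
>   <artifactId>avro</artifactId>
>   <version>1.8.2</version>
>   <scope>compile</scope>
> </dependency>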
>
> Hope that solves the issue.
>
> Stephan
>
>
> On Mon, Jan 22, 2018 at 2:31 PM, Edward  wrote:
>
>> Yes, we've seen this issue as well, though it usually takes many more
>> resubmits before the error pops up. Interestingly, of the 7 jobs we run
>> (all
>> of which use different Avro schemas), we only see this issue on 1 of them.
>> Once the NoClassDefFoundError crops up though, it is necessary to recreate
>> the task managers.
>>
>> There's a whole page on the Flink documentation on debugging classloading,
>> and Avro is mentioned several times on that page:
>> https://ci.apache.org/projects/flink/flink-docs-release-1.3/
>> monitoring/debugging_classloading.html
>>
>> It seems that (in 1.3 at least) each submitted job has its own
>> classloader,
>> and its own instance of the Avro class definitions. However, the Avro
>> class
>> cache will keep references to the Avro classes from classloaders for the
>> previous cancelled jobs. That said, we haven't been able to find a
>> solution
>> to this error yet. Flink 1.4 would be worth a try because of the
>> changes
>> to the default classloading behaviour (child-first is the new default, not
>> parent-first).
>>
>>
>>
>>
>>
>> --
>> Sent from: http://apache-flink-user-mailing-list-archive.2336050.n4.
>> nabble.com/
>>
>
>


Re: TaskManager crashes with PageRank algorithm in Gelly

2018-02-22 Thread santoshg
An update - I was able to overcome these issues by setting the preallocate
flag to true. Not sure why this fixes the problem. Need to dig a bit deeper.
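
For reference, the flag in question is presumably taskmanager.memory.preallocate
in flink-conf.yaml:

    taskmanager.memory.preallocate: true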



--
Sent from: http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/


Re: Window with recent messages

2018-02-22 Thread Krzysztof Białek
Hi Fabian,

Thank you for your suggestion. In the meantime I rethought this problem and
implemented an alternative solution without using windows at all.
I used plain ProcessFunction with
1. Keyed state (by companyId) - to keep ratings per key
2. EventTime timers - to remove outdated ratings from state and emit
recalculated score immediately

This solution gives results in real time; windows would delay the results
by 1 day, I think.
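
Roughly, the skeleton looks like this (a trimmed sketch; Rating, Score and the
scoring itself are simplified placeholders):

import org.apache.flink.api.common.state.{ListState, ListStateDescriptor}
import org.apache.flink.streaming.api.functions.ProcessFunction
import org.apache.flink.util.Collector
import scala.collection.JavaConverters._

case class Rating(companyId: String, value: Double, timestamp: Long)
case class Score(companyId: String, score: Double)

class RecentScoreFunction(maxAge: Long) extends ProcessFunction[Rating, Score] {

  // ratings of the current key (companyId) that may still be within the horizon
  private lazy val ratings: ListState[Rating] = getRuntimeContext.getListState(
    new ListStateDescriptor[Rating]("ratings", classOf[Rating]))

  override def processElement(r: Rating,
      ctx: ProcessFunction[Rating, Score]#Context,
      out: Collector[Score]): Unit = {
    ratings.add(r)
    // fire a timer when this rating falls out of the 90-day horizon
    ctx.timerService().registerEventTimeTimer(r.timestamp + maxAge)
    emit(ctx.timerService().currentWatermark(), out)
  }

  override def onTimer(ts: Long,
      ctx: ProcessFunction[Rating, Score]#OnTimerContext,
      out: Collector[Score]): Unit =
    emit(ts, out) // drop the expired rating(s) and re-emit the score

  private def emit(now: Long, out: Collector[Score]): Unit = {
    val live = ratings.get().asScala.filter(_.timestamp + maxAge > now).toSeq
    ratings.clear()
    live.foreach(ratings.add)
    if (live.nonEmpty)
      out.collect(Score(live.head.companyId, live.map(_.value).sum / live.size))
  }
}

// usage: ratingStream.keyBy(_.companyId)
//   .process(new RecentScoreFunction(90L * 24 * 60 * 60 * 1000))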

Regards,
Krzysztof


On Thu, Feb 22, 2018 at 9:44 AM, Fabian Hueske  wrote:

> Hi Krzysztof,
>
> you could compute the stats in two stages:
>
> 1) compute a daily window. You should use a ReduceFunction or
> AggregateFunction here if possible to perform the computation eagerly.
> 2) compute a sliding window of 90 days with a 1 day hop (or 90 rows with a
> 1 row hop).
>
> That will crunch down the data in the first window and the second window
> will combine the pre-aggregated results.
>
> Hope this helps,
> Fabian
>
> 2018-02-19 16:36 GMT+01:00 Krzysztof Białek :
>
>> Hi,
>>
>> My app is calculating Companies scores from Ratings given by users.
>> Only ratings from last 90 days should be considered.
>>
>> 1. Is it possible to construct a window processing ratings from the last 90
>> days?
>> I've started with *misusing* countWindow but this solution looks ugly for
>> me.
>>
>> ratingStream
>>   .filter(new OutdatedRatingsFilter(maxRatingAge))
>>   .keyBy(_.companyId)
>>   .countWindow(0L).trigger(new OnEventTrigger).evictor(new 
>> OutdatedRatingsEvictor(maxRatingAge))
>>   .process(ratingFunction)
>>
>>
>> 2. How to recalculate score once the rating expires (after 90 days)?
>> I don't want to put artificial ratings into the stream to trigger the
>> recalculation.
>>
>> Any idea how can I do it better?
>>
>> Regards,
>> Krzysztof
>>
>>
>>
>


How to use Dimension table in Flink TableAPI with StreamExecutionEnvironment ?

2018-02-22 Thread 叶振宝
Hey, I am new to Flink and have a question; I want to see if anyone can
help here.

How to use Dimension table in Flink TableAPI with StreamExecutionEnvironment ?

I used a TableFunction to handle this, but it has a problem when debugging,
like this:
LogicalProject(col_1=[$0])
  LogicalJoin(condition=[true], joinType=[left])
LogicalTableScan(table=[[test]])
LogicalTableFunctionScan(invocation=[dim_test()], 
rowType=[RecordType(VARCHAR(65536) col, VARCHAR(65536) name)], 
elementType=[class [Ljava.lang.Object;])

This exception indicates that the query uses an unsupported SQL feature.
Please check the documentation for the set of currently supported SQL features.
at org.apache.flink.table.api.TableEnvironment.runVolcanoPlanner(TableEnvironment.scala:274)
at org.apache.flink.table.api.StreamTableEnvironment.optimize(StreamTableEnvironment.scala:674)
at org.apache.flink.table.api.StreamTableEnvironment.translate(StreamTableEnvironment.scala:730)
at org.apache.flink.table.api.StreamTableEnvironment.writeToSink(StreamTableEnvironment.scala:216)
at org.apache.flink.table.api.TableEnvironment.insertInto(TableEnvironment.scala:692)
at com.bigdata.stream.streamsql.FlinkSqlTest.main(FlinkSqlTest.java:64)

SQL : select t.col_1 from test t left join lateral table(dim_test()) b on true

Main Code:
public static void main(String[] args) throws Exception {
String sql = "select t.col_1 from test t left join lateral 
table(dim_test()) b on true";
StreamExecutionEnvironment streamEnv = 
StreamExecutionEnvironment.getExecutionEnvironment();
StreamTableEnvironment stEnv = 
TableEnvironment.getTableEnvironment(streamEnv);
Properties kafkaProps = new Properties();
kafkaProps.setProperty("zookeeper.connect", "localhost:2181");
kafkaProps.setProperty("bootstrap.servers", "localhost:9092");
kafkaProps.setProperty("group.id", "test");
Kafka010JsonTableSource tableSource = Kafka010JsonTableSource.builder()
.forTopic("test")
.withKafkaProperties(kafkaProps)
.withSchema(TableSchema.builder()
.field("col_1", Types.STRING)
.field("col_2",Types.STRING).build())
.build();
stEnv.registerTableSource("test", tableSource);
String[] columns = {"col","name"};
TypeInformation[] typeInformations = 
{TypeInformation.of(String.class),TypeInformation.of(String.class)};
TableSchema tableSchema = new TableSchema(columns,typeInformations);
Map<String, String> context = new HashMap<>();
context.put("mysql.url","jdbc:mysql://localhost:3306/test");
context.put("mysql.driver","com.mysql.jdbc.Driver");
context.put("mysql.user","test");
context.put("mysql.password","test");
context.put("mysql.table","dim_test");
StreamSqlDim dim = new MySqlDimFactory().getInstance(tableSchema,new 
StreamSqlContext(context));
stEnv.registerFunction("dim_test",dim);

String[] outColumns = {"col"};
TypeInformation[] outType = {TypeInformation.of(String.class)};
TableSink tableSink = new Kafka010JsonTableSink("test_out",kafkaProps);
stEnv.registerTableSink("test_out",outColumns,outType,tableSink);
Table t = stEnv.sql(sql);
stEnv.insertInto(t,"test_out",stEnv.queryConfig());
streamEnv.execute();
}

MySqlDim extends TableFunction, and its eval() method is empty, like this:
public void eval(){

} 
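
Note: a TableFunction only produces join rows via collect(), so an empty eval()
can never emit anything. A minimal Scala sketch of an eval() emitting the
(col, name) row type (field values made up for illustration):

import org.apache.flink.api.common.typeinfo.TypeInformation
import org.apache.flink.api.java.typeutils.RowTypeInfo
import org.apache.flink.table.api.Types
import org.apache.flink.table.functions.TableFunction
import org.apache.flink.types.Row

class DimTest extends TableFunction[Row] {
  def eval(): Unit = {
    val row = new Row(2)
    row.setField(0, "someCol")  // col
    row.setField(1, "someName") // name
    collect(row)
  }

  // tell the planner the produced row type: (col, name), both strings
  override def getResultType: TypeInformation[Row] =
    new RowTypeInfo(Types.STRING, Types.STRING)
}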





RE: Problems to use toAppendStream

2018-02-22 Thread Esa Heikkinen
Hi

But next problem ☹

When I try to run it, I get this error:

Exception in thread "main" java.lang.NoClassDefFoundError: scala/collection/Seq
 at pack.CepTest2.main(CepTest2.scala)
Caused by: java.lang.ClassNotFoundException: scala.collection.Seq
 at java.net.URLClassLoader.findClass(URLClassLoader.java:381)
 at java.lang.ClassLoader.loadClass(ClassLoader.java:424)
 at sun.misc.Launcher$AppClassLoader.loadClass(Launcher.java:335)
 at java.lang.ClassLoader.loadClass(ClassLoader.java:357)

What would be the reason for that?
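
Could it be that scala-library is missing from the runtime classpath (e.g.
marked "provided", or not packaged into the jar)? In Maven that dependency
would be:

<dependency>
  <groupId>org.scala-lang</groupId>
  <artifactId>scala-library</artifactId>
  <version>${scala.version}</version>
</dependency>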

BR Esa

From: Esa Heikkinen [mailto:esa.heikki...@student.tut.fi]
Sent: Thursday, February 22, 2018 1:01 PM
To: Xingcan Cui 
Cc: Fabian Hueske; user@flink.apache.org
Subject: RE: Problems to use toAppendStream

Hi

It works now. Thank you ☺

How can I know which imports are incompatible, or something like that?

BR Esa

From: Xingcan Cui [mailto:xingc...@gmail.com]
Sent: Thursday, February 22, 2018 12:00 PM
To: Esa Heikkinen
Cc: Fabian Hueske; user@flink.apache.org
Subject: Re: Problems to use toAppendStream

Hi Esa and Fabian,

sorry for my inaccurate conclusion before, but I think the reason is clear now.
The org.apache.flink.streaming.api.scala._ and org.apache.flink.api.scala._
imports should not be used simultaneously because they conflict. Just remove
either of them.

Best,
Xingcan

On 22 Feb 2018, at 5:20 PM, Xingcan Cui wrote:

Hi Fabian and Esa,

I ran the code myself and also noticed the strange behavior. It seems that only
if I explicitly import the function, i.e.,
org.apache.flink.streaming.api.scala.asScalaStream, does it work. In other
words, the underscore import becomes useless. I also checked other package
objects (e.g., org.apache.flink.table.api.scala._) and they are the same.

@Esa, you can temporarily solve the problem by importing 
org.apache.flink.streaming.api.scala.asScalaStream in your code and we'll 
continue working on this issue.

Best,
Xingcan

On 22 Feb 2018, at 4:47 PM, Esa Heikkinen wrote:

Hi

How to check versions ?

In pom.xml there are lines:

 
  
<properties>
  <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
  <flink.version>1.4.0</flink.version>
  <slf4j.version>1.7.7</slf4j.version>
  <log4j.version>1.2.17</log4j.version>
  <scala.binary.version>2.11</scala.binary.version>
  <scala.version>2.11.11</scala.version>
</properties>

BR Esa

From: Fabian Hueske [mailto:fhue...@gmail.com]
Sent: Thursday, February 22, 2018 10:35 AM
To: Esa Heikkinen
Cc: Xingcan Cui; user@flink.apache.org
Subject: Re: Problems to use toAppendStream

Hi Esa,
which Scala version do you use?
Flink supports Scala 2.11 (and Scala 2.10 support was dropped with Flink 1.4.0).
Fabian

2018-02-22 9:28 GMT+01:00 Esa Heikkinen:


It should be ok. This is the list of all my imports. The first part of it has been
highlighted weaker; I don't know why.

import org.apache.flink.streaming.api.windowing.time.Time
import org.apache.flink.api.java.utils.ParameterTool
import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment
import org.apache.flink.streaming.api.windowing.time.Time
import org.apache.flink.cep.scala.{CEP, PatternStream}
import org.apache.flink.cep.scala.pattern.Pattern
import org.apache.flink.cep.{PatternFlatSelectFunction, 
PatternFlatTimeoutFunction}
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator
import org.apache.flink.streaming.api.functions.source.ParallelSourceFunction
import 
org.apache.flink.streaming.api.functions.source.SourceFunction.SourceContext
import org.apache.flink.util.Collector
import org.apache.flink.streaming.api.scala._
import org.apache.flink.api.scala._
import org.apache.flink.table.api.scala._
import org.apache.flink.table.api.scala.StreamTableEnvironment
import org.apache.flink.table.api.java.StreamTableEnvironment


import org.apache.flink.types.Row
import org.apache.flink.streaming.api.TimeCharacteristic
import org.apache.flink.streaming.api.scala.{DataStream, 
StreamExecutionEnvironment}
import org.apache.flink.table.api.TableEnvironment
import org.apache.flink.table.sources.CsvTableSource
import org.apache.flink.api.common.typeinfo.Types

BR Esa

From: Xingcan Cui 

Timestamp from Kafka record and watermark generation

2018-02-22 Thread Federico D'Ambrosio
Hello everyone,

I'm consuming from a Kafka topic, on which I'm writing with a
FlinkKafkaProducer, with the timestamp relative flag set to true.

From what I gather from the documentation [1], Flink is aware of Kafka
Record's timestamp and only the watermark should be set with an appropriate
TimestampExtractor, but I'm still failing to understand how to implement it in
the right way.

I thought that it would be possible to use the already existent
AscendingTimestampExtractor, overriding the extractTimestamp method, but
it's marked final.

new FlinkKafkaConsumer010[Event](ingestion_topic, new JSONDeserializationSchema(), consumerConfig)
  .setStartFromLatest()
  .assignTimestampsAndWatermarks(new AscendingTimestampExtractor[Event]() {
    def extractAscendingTimestamp(element: Event): Long = ???
  })

Should I implement my own TimestampExtractor (with the appropriate
getCurrentWatermark and extractTimestamp methods)?
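
That is, something like this (a minimal periodic assigner, assuming strictly
ascending timestamps; with the 0.10 consumer, previousElementTimestamp should
carry the Kafka record timestamp):

import org.apache.flink.streaming.api.functions.AssignerWithPeriodicWatermarks
import org.apache.flink.streaming.api.watermark.Watermark

class KafkaRecordTimestampAssigner extends AssignerWithPeriodicWatermarks[Event] {
  private var maxTs = Long.MinValue

  override def extractTimestamp(element: Event, previousElementTimestamp: Long): Long = {
    // the Kafka 0.10 consumer passes the record timestamp as previousElementTimestamp
    maxTs = math.max(maxTs, previousElementTimestamp)
    previousElementTimestamp
  }

  override def getCurrentWatermark(): Watermark = new Watermark(maxTs)
}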

[1] 
https://ci.apache.org/projects/flink/flink-docs-release-1.4/dev/connectors/kafka.html#using-kafka-timestamps-and-flink-event-time-in-kafka-010


Thank you,
Federico


RE: Problems to use toAppendStream

2018-02-22 Thread Esa Heikkinen
Hi

It works now. Thank you ☺

How can I know which imports are incompatible, or something like that?

BR Esa

From: Xingcan Cui [mailto:xingc...@gmail.com]
Sent: Thursday, February 22, 2018 12:00 PM
To: Esa Heikkinen 
Cc: Fabian Hueske; user@flink.apache.org
Subject: Re: Problems to use toAppendStream

Hi Esa and Fabian,

sorry for my inaccurate conclusion before, but I think the reason is clear now.
The org.apache.flink.streaming.api.scala._ and org.apache.flink.api.scala._
imports should not be used simultaneously because they conflict. Just remove
either of them.

Best,
Xingcan

On 22 Feb 2018, at 5:20 PM, Xingcan Cui wrote:

Hi Fabian and Esa,

I ran the code myself and also noticed the strange behavior. It seems that only
if I explicitly import the function, i.e.,
org.apache.flink.streaming.api.scala.asScalaStream, does it work. In other
words, the underscore import becomes useless. I also checked other package
objects (e.g., org.apache.flink.table.api.scala._) and they are the same.

@Esa, you can temporarily solve the problem by importing 
org.apache.flink.streaming.api.scala.asScalaStream in your code and we'll 
continue working on this issue.

Best,
Xingcan


On 22 Feb 2018, at 4:47 PM, Esa Heikkinen wrote:

Hi

How to check versions ?

In pom.xml there are lines:

 
  
<properties>
  <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
  <flink.version>1.4.0</flink.version>
  <slf4j.version>1.7.7</slf4j.version>
  <log4j.version>1.2.17</log4j.version>
  <scala.binary.version>2.11</scala.binary.version>
  <scala.version>2.11.11</scala.version>
</properties>

BR Esa

From: Fabian Hueske [mailto:fhue...@gmail.com]
Sent: Thursday, February 22, 2018 10:35 AM
To: Esa Heikkinen
Cc: Xingcan Cui; user@flink.apache.org
Subject: Re: Problems to use toAppendStream

Hi Esa,
which Scala version do you use?
Flink supports Scala 2.11 (and Scala 2.10 support was dropped with Flink 1.4.0).
Fabian

2018-02-22 9:28 GMT+01:00 Esa Heikkinen:


It should be ok. This is the list of all my imports. The first part of it has been
highlighted weaker; I don't know why.

import org.apache.flink.streaming.api.windowing.time.Time
import org.apache.flink.api.java.utils.ParameterTool
import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment
import org.apache.flink.streaming.api.windowing.time.Time
import org.apache.flink.cep.scala.{CEP, PatternStream}
import org.apache.flink.cep.scala.pattern.Pattern
import org.apache.flink.cep.{PatternFlatSelectFunction, 
PatternFlatTimeoutFunction}
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator
import org.apache.flink.streaming.api.functions.source.ParallelSourceFunction
import 
org.apache.flink.streaming.api.functions.source.SourceFunction.SourceContext
import org.apache.flink.util.Collector
import org.apache.flink.streaming.api.scala._
import org.apache.flink.api.scala._
import org.apache.flink.table.api.scala._
import org.apache.flink.table.api.scala.StreamTableEnvironment
import org.apache.flink.table.api.java.StreamTableEnvironment


import org.apache.flink.types.Row
import org.apache.flink.streaming.api.TimeCharacteristic
import org.apache.flink.streaming.api.scala.{DataStream, 
StreamExecutionEnvironment}
import org.apache.flink.table.api.TableEnvironment
import org.apache.flink.table.sources.CsvTableSource
import org.apache.flink.api.common.typeinfo.Types

BR Esa

From: Xingcan Cui [mailto:xingc...@gmail.com]
Sent: Thursday, February 22, 2018 10:09 AM

To: Esa Heikkinen
Cc: user@flink.apache.org
Subject: Re: Problems to use toAppendStream

Hi Esa,

just to remind that don’t miss the dot and underscore.

Best,
Xingcan

On 22 Feb 2018, at 3:59 PM, Esa Heikkinen wrote:

Hi

Actually I have also line “import org.apache.flink.streaming.api.scala” on my 
code, but this line seems to be highlighted weaker in window of IDEA IntelliJ 
editor. What does this mean ?

But the same errors will still be generated.

Esa

From: Fabian Hueske [mailto:fhue...@gmail.com]
Sent: Wednesday, February 21, 2018 9:41 PM
To: Esa Heikkinen
Cc: user@flink.apache.org
Subject: Re: Problems to use toAppendStream

Hi Esa,
whenever you observe the error "could not find implicit value for evidence
parameter of type X" in a streaming program, you need to add the following
import: import org.apache.flink.streaming.api.scala._

Re: Problems to use toAppendStream

2018-02-22 Thread Xingcan Cui
Hi Esa and Fabian,

sorry for my inaccurate conclusion before, but I think the reason is clear now.
The org.apache.flink.streaming.api.scala._ and org.apache.flink.api.scala._
imports should not be used simultaneously because they conflict. Just remove
either of them.
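
For example, keeping only the streaming one:

import org.apache.flink.streaming.api.scala._
// import org.apache.flink.api.scala._  // removed: conflicts with the import above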

Best,
Xingcan

> On 22 Feb 2018, at 5:20 PM, Xingcan Cui  wrote:
> 
> Hi Fabian and Esa,
> 
> I ran the code myself and also noticed the strange behavior. It seems that
> only if I explicitly import the function, i.e.,
> org.apache.flink.streaming.api.scala.asScalaStream, does it work. In other
> words, the underscore import becomes useless. I also checked other package
> objects (e.g., org.apache.flink.table.api.scala._) and they are the same.
> 
> @Esa, you can temporarily solve the problem by importing 
> org.apache.flink.streaming.api.scala.asScalaStream in your code and we'll 
> continue working on this issue.
> 
> Best,
> Xingcan
> 
>> On 22 Feb 2018, at 4:47 PM, Esa Heikkinen wrote:
>> 
>> Hi
>>  
>> How to check versions ?
>>  
>> In pom.xml there are lines:
>>  
>>  
>>   
>> <properties>
>>   <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
>>   <flink.version>1.4.0</flink.version>
>>   <slf4j.version>1.7.7</slf4j.version>
>>   <log4j.version>1.2.17</log4j.version>
>>   <scala.binary.version>2.11</scala.binary.version>
>>   <scala.version>2.11.11</scala.version>
>> </properties>
>>  
>> BR Esa
>>  
>> From: Fabian Hueske [mailto:fhue...@gmail.com ] 
>> Sent: Thursday, February 22, 2018 10:35 AM
>> To: Esa Heikkinen
>> Cc: Xingcan Cui; user@flink.apache.org
>> Subject: Re: Problems to use toAppendStream
>>  
>> Hi Esa,
>> 
>> which Scala version do you use?
>> Flink supports Scala 2.11 (and Scala 2.10 support was dropped with Flink 
>> 1.4.0).
>> 
>> Fabian
>>  
>> 2018-02-22 9:28 GMT+01:00 Esa Heikkinen:
>>  
>> 
>> It should be ok. This is the list of all my imports. The first part of it
>> has been highlighted weaker; I don't know why.
>>  
>> import org.apache.flink.streaming.api.windowing.time.Time
>> import org.apache.flink.api.java.utils.ParameterTool
>> import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment
>> import org.apache.flink.streaming.api.windowing.time.Time
>> import org.apache.flink.cep.scala.{CEP, PatternStream}
>> import org.apache.flink.cep.scala.pattern.Pattern
>> import org.apache.flink.cep.{PatternFlatSelectFunction, 
>> PatternFlatTimeoutFunction}
>> import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator
>> import org.apache.flink.streaming.api.functions.source.ParallelSourceFunction
>> import 
>> org.apache.flink.streaming.api.functions.source.SourceFunction.SourceContext
>> import org.apache.flink.util.Collector
>> import org.apache.flink.streaming.api.scala._
>> import org.apache.flink.api.scala._
>> import org.apache.flink.table.api.scala._
>> import org.apache.flink.table.api.scala.StreamTableEnvironment
>> import org.apache.flink.table.api.java.StreamTableEnvironment
>>  
>>  
>> import org.apache.flink.types.Row
>> import org.apache.flink.streaming.api.TimeCharacteristic
>> import org.apache.flink.streaming.api.scala.{DataStream, 
>> StreamExecutionEnvironment}
>> import org.apache.flink.table.api.TableEnvironment
>> import org.apache.flink.table.sources.CsvTableSource
>> import org.apache.flink.api.common.typeinfo.Types
>>  
>> BR Esa
>>  
>> From: Xingcan Cui [mailto:xingc...@gmail.com]
>> Sent: Thursday, February 22, 2018 10:09 AM
>> To: Esa Heikkinen
>> Cc: user@flink.apache.org
>> Subject: Re: Problems to use toAppendStream
>>  
>> Hi Esa,
>>  
>> just to remind that don’t miss the dot and underscore.
>>  
>> Best,
>> Xingcan
>>  
>> 
>> On 22 Feb 2018, at 3:59 PM, Esa Heikkinen wrote:
>>  
>> Hi
>>  
>> Actually I have also line “import org.apache.flink.streaming.api.scala” on 
>> my code, but this line seems to be highlighted weaker in window of IDEA 
>> IntelliJ editor. What does this mean ?
>>  
>> But the same errors will still be generated.
>>  
>> Esa
>>  
>> From: Fabian Hueske [mailto:fhue...@gmail.com]
>> Sent: Wednesday, February 21, 2018 9:41 PM
>> To: Esa Heikkinen
>> Cc: user@flink.apache.org
>> Subject: Re: 

Problem when uploading a java flink program to aws lambda

2018-02-22 Thread Kulasangar
I have created a Java application using the Flink API and Table API. I can
provide the source code if needed.

The application works perfectly locally. But when I uploaded the jar to AWS
Lambda and executed it, I got the following error:

*reference.conf: 804: Could not resolve substitution to a value:
${akka.stream.materializer}:
com.typesafe.config.ConfigException$UnresolvedSubstitution
com.typesafe.config.ConfigException$UnresolvedSubstitution: reference.conf:
804: Could not resolve substitution to a value: ${akka.stream.materializer}
at
com.typesafe.config.impl.ConfigReference.resolveSubstitutions(ConfigReference.java:87)
at
com.typesafe.config.impl.ResolveSource.resolveCheckingReplacement(ResolveSource.java:110)*

I have posted only the first few lines of the exception. Has anyone come
across this type of error?
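
From what I've read so far, this ConfigException can show up when a fat jar
overwrites Akka's reference.conf. If the jar is built with maven-shade-plugin,
the usual remedy seems to be merging those files with an AppendingTransformer:

<transformer implementation="org.apache.maven.plugins.shade.resource.AppendingTransformer">
  <resource>reference.conf</resource>
</transformer>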

Any help would be appreciated.

Regards.



--
Sent from: http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/


Re: Problems to use toAppendStream

2018-02-22 Thread Xingcan Cui
Hi Fabian and Esa,

I ran the code myself and also noticed the strange behavior. It seems that only
if I explicitly import the function, i.e.,
org.apache.flink.streaming.api.scala.asScalaStream, does it work. In other
words, the underscore import becomes useless. I also checked other package
objects (e.g., org.apache.flink.table.api.scala._) and they are the same.

@Esa, you can temporarily solve the problem by importing 
org.apache.flink.streaming.api.scala.asScalaStream in your code and we'll 
continue working on this issue.

Best,
Xingcan

> On 22 Feb 2018, at 4:47 PM, Esa Heikkinen wrote:
> 
> Hi
>  
> How to check versions ?
>  
> In pom.xml there are lines:
>  
>  
>   
> <properties>
>   <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
>   <flink.version>1.4.0</flink.version>
>   <slf4j.version>1.7.7</slf4j.version>
>   <log4j.version>1.2.17</log4j.version>
>   <scala.binary.version>2.11</scala.binary.version>
>   <scala.version>2.11.11</scala.version>
> </properties>
>  
> BR Esa
>  
> From: Fabian Hueske [mailto:fhue...@gmail.com] 
> Sent: Thursday, February 22, 2018 10:35 AM
> To: Esa Heikkinen 
> Cc: Xingcan Cui; user@flink.apache.org
> Subject: Re: Problems to use toAppendStream
>  
> Hi Esa,
> 
> which Scala version do you use?
> Flink supports Scala 2.11 (and Scala 2.10 support was dropped with Flink 
> 1.4.0).
> 
> Fabian
>  
> 2018-02-22 9:28 GMT+01:00 Esa Heikkinen:
>  
> 
> It should be ok. This is the list of all my imports. The first part of it
> has been highlighted weaker; I don't know why.
>  
> import org.apache.flink.streaming.api.windowing.time.Time
> import org.apache.flink.api.java.utils.ParameterTool
> import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment
> import org.apache.flink.streaming.api.windowing.time.Time
> import org.apache.flink.cep.scala.{CEP, PatternStream}
> import org.apache.flink.cep.scala.pattern.Pattern
> import org.apache.flink.cep.{PatternFlatSelectFunction, 
> PatternFlatTimeoutFunction}
> import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator
> import org.apache.flink.streaming.api.functions.source.ParallelSourceFunction
> import 
> org.apache.flink.streaming.api.functions.source.SourceFunction.SourceContext
> import org.apache.flink.util.Collector
> import org.apache.flink.streaming.api.scala._
> import org.apache.flink.api.scala._
> import org.apache.flink.table.api.scala._
> import org.apache.flink.table.api.scala.StreamTableEnvironment
> import org.apache.flink.table.api.java.StreamTableEnvironment
>  
>  
> import org.apache.flink.types.Row
> import org.apache.flink.streaming.api.TimeCharacteristic
> import org.apache.flink.streaming.api.scala.{DataStream, 
> StreamExecutionEnvironment}
> import org.apache.flink.table.api.TableEnvironment
> import org.apache.flink.table.sources.CsvTableSource
> import org.apache.flink.api.common.typeinfo.Types
>  
> BR Esa
>  
> From: Xingcan Cui [mailto:xingc...@gmail.com]
> Sent: Thursday, February 22, 2018 10:09 AM
> To: Esa Heikkinen
> Cc: user@flink.apache.org
> Subject: Re: Problems to use toAppendStream
>  
> Hi Esa,
>  
> just to remind that don’t miss the dot and underscore.
>  
> Best,
> Xingcan
>  
> 
> On 22 Feb 2018, at 3:59 PM, Esa Heikkinen wrote:
>  
> Hi
>  
> Actually I have also line “import org.apache.flink.streaming.api.scala” on my 
> code, but this line seems to be highlighted weaker in window of IDEA IntelliJ 
> editor. What does this mean ?
>  
> But the same errors will still be generated.
>  
> Esa
>  
> From: Fabian Hueske [mailto:fhue...@gmail.com]
> Sent: Wednesday, February 21, 2018 9:41 PM
> To: Esa Heikkinen
> Cc: user@flink.apache.org
> Subject: Re: Problems to use toAppendStream
>  
> Hi Esa,
> 
> whenever you observe the error "could not find implicit value for evidence 
> parameter of type X" in a streaming program, you need to add the following 
> import:
> 
> import org.apache.flink.streaming.api.scala._
> 
> Best, Fabian
>  
> 2018-02-21 19:49 GMT+01:00 Esa Heikkinen:
>  
> Hi
>  
>  
> I have tried to solve the errors below for a long time, with no success yet.
> Could you give some hint on how to solve them?
>  
> Errors in compiling:
> --
> Error:(56, 46) could not find implicit value for evidence parameter of type 
> org.apache.flink.api.common.typeinfo.TypeInformation[org.apache.flink.types.Row]

RE: Problems to use toAppendStream

2018-02-22 Thread Esa Heikkinen
Hi

How to check versions ?

In pom.xml there are lines:

 
  
<properties>
  <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
  <flink.version>1.4.0</flink.version>
  <slf4j.version>1.7.7</slf4j.version>
  <log4j.version>1.2.17</log4j.version>
  <scala.binary.version>2.11</scala.binary.version>
  <scala.version>2.11.11</scala.version>
</properties>

BR Esa

From: Fabian Hueske [mailto:fhue...@gmail.com]
Sent: Thursday, February 22, 2018 10:35 AM
To: Esa Heikkinen 
Cc: Xingcan Cui; user@flink.apache.org
Subject: Re: Problems to use toAppendStream

Hi Esa,
which Scala version do you use?
Flink supports Scala 2.11 (and Scala 2.10 support was dropped with Flink 1.4.0).
Fabian

2018-02-22 9:28 GMT+01:00 Esa Heikkinen:


It should be ok. This is the list of all my imports. The first part of it has
been highlighted weaker; I don't know why.

import org.apache.flink.streaming.api.windowing.time.Time
import org.apache.flink.api.java.utils.ParameterTool
import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment
import org.apache.flink.streaming.api.windowing.time.Time
import org.apache.flink.cep.scala.{CEP, PatternStream}
import org.apache.flink.cep.scala.pattern.Pattern
import org.apache.flink.cep.{PatternFlatSelectFunction, 
PatternFlatTimeoutFunction}
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator
import org.apache.flink.streaming.api.functions.source.ParallelSourceFunction
import 
org.apache.flink.streaming.api.functions.source.SourceFunction.SourceContext
import org.apache.flink.util.Collector
import org.apache.flink.streaming.api.scala._
import org.apache.flink.api.scala._
import org.apache.flink.table.api.scala._
import org.apache.flink.table.api.scala.StreamTableEnvironment
import org.apache.flink.table.api.java.StreamTableEnvironment


import org.apache.flink.types.Row
import org.apache.flink.streaming.api.TimeCharacteristic
import org.apache.flink.streaming.api.scala.{DataStream, 
StreamExecutionEnvironment}
import org.apache.flink.table.api.TableEnvironment
import org.apache.flink.table.sources.CsvTableSource
import org.apache.flink.api.common.typeinfo.Types

BR Esa

From: Xingcan Cui [mailto:xingc...@gmail.com]
Sent: Thursday, February 22, 2018 10:09 AM

To: Esa Heikkinen
Cc: user@flink.apache.org
Subject: Re: Problems to use toAppendStream

Hi Esa,

just to remind that don’t miss the dot and underscore.

Best,
Xingcan

On 22 Feb 2018, at 3:59 PM, Esa Heikkinen wrote:

Hi

Actually I have also line “import org.apache.flink.streaming.api.scala” on my 
code, but this line seems to be highlighted weaker in window of IDEA IntelliJ 
editor. What does this mean ?

But the same errors will still be generated.

Esa

From: Fabian Hueske [mailto:fhue...@gmail.com]
Sent: Wednesday, February 21, 2018 9:41 PM
To: Esa Heikkinen
Cc: user@flink.apache.org
Subject: Re: Problems to use toAppendStream

Hi Esa,
whenever you observe the error "could not find implicit value for evidence 
parameter of type X" in a streaming program, you need to add the following 
import:
import org.apache.flink.streaming.api.scala._
Best, Fabian

2018-02-21 19:49 GMT+01:00 Esa Heikkinen:



Hi

I have tried to solve the errors below for a long time, with no success yet.
Could you give some hint on how to solve them?

Errors in compiling:
--
Error:(56, 46) could not find implicit value for evidence parameter of type
org.apache.flink.api.common.typeinfo.TypeInformation[org.apache.flink.types.Row]
val stream = tableEnv.toAppendStream[Row](tableTest)

Error:(56, 46) not enough arguments for method toAppendStream: (implicit
evidence$3: org.apache.flink.api.common.typeinfo.TypeInformation[org.apache.flink.types.Row])org.apache.flink.streaming.api.scala.DataStream[org.apache.flink.types.Row].
Unspecified value parameter evidence$3.
val stream = tableEnv.toAppendStream[Row](tableTest)

Code:
-
import org.apache.flink.types.Row
import org.apache.flink.streaming.api.TimeCharacteristic
import org.apache.flink.streaming.api.scala.{DataStream, StreamExecutionEnvironment}
import org.apache.flink.table.api.TableEnvironment
import org.apache.flink.table.sources.CsvTableSource
import org.apache.flink.api.common.typeinfo.Types

object CepTest2 {

  def main(args: Array[String]) {

    println("Start ...")

    val env = StreamExecutionEnvironment.getExecutionEnvironment

Re: Window with recent messages

2018-02-22 Thread Fabian Hueske
Hi Krzysztof,

you could compute the stats in two stages:

1) compute a daily window. You should use a ReduceFunction or
AggregateFunction here if possible to perform the computation eagerly.
2) compute a sliding window of 90 days with a 1 day hop (or 90 rows with a
1 row hop).

That will crunch down the data in the first window and the second window
will combine the pre-aggregated results.
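
As a sketch (DailyAgg and the combine function are placeholders for your
aggregation logic):

import org.apache.flink.streaming.api.windowing.assigners.{SlidingEventTimeWindows, TumblingEventTimeWindows}
import org.apache.flink.streaming.api.windowing.time.Time

ratingStream
  .keyBy(_.companyId)
  .window(TumblingEventTimeWindows.of(Time.days(1)))
  .aggregate(new DailyAgg)          // stage 1: eager daily pre-aggregation
  .keyBy(_.companyId)
  .window(SlidingEventTimeWindows.of(Time.days(90), Time.days(1)))
  .reduce((a, b) => combine(a, b))  // stage 2: combine the 90 daily partials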

Hope this helps,
Fabian

2018-02-19 16:36 GMT+01:00 Krzysztof Białek :

> Hi,
>
> My app is calculating Companies scores from Ratings given by users.
> Only ratings from last 90 days should be considered.
>
> 1. Is it possible to construct a window processing ratings from the last 90 days?
> I've started with *misusing* countWindow but this solution looks ugly for
> me.
>
> ratingStream
>   .filter(new OutdatedRatingsFilter(maxRatingAge))
>   .keyBy(_.companyId)
>   .countWindow(0L).trigger(new OnEventTrigger).evictor(new 
> OutdatedRatingsEvictor(maxRatingAge))
>   .process(ratingFunction)
>
>
> 2. How to recalculate score once the rating expires (after 90 days)?
> I don't want to put artificial ratings into the stream to trigger the
> recalculation.
>
> Any idea how can I do it better?
>
> Regards,
> Krzysztof
>
>
>


RE: Problems to use toAppendStream

2018-02-22 Thread Esa Heikkinen


It should be ok. This is the list of all my imports. The first part of it has
been highlighted weaker; I don't know why.

import org.apache.flink.streaming.api.windowing.time.Time
import org.apache.flink.api.java.utils.ParameterTool
import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment
import org.apache.flink.streaming.api.windowing.time.Time
import org.apache.flink.cep.scala.{CEP, PatternStream}
import org.apache.flink.cep.scala.pattern.Pattern
import org.apache.flink.cep.{PatternFlatSelectFunction, 
PatternFlatTimeoutFunction}
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator
import org.apache.flink.streaming.api.functions.source.ParallelSourceFunction
import 
org.apache.flink.streaming.api.functions.source.SourceFunction.SourceContext
import org.apache.flink.util.Collector
import org.apache.flink.streaming.api.scala._
import org.apache.flink.api.scala._
import org.apache.flink.table.api.scala._
import org.apache.flink.table.api.scala.StreamTableEnvironment
import org.apache.flink.table.api.java.StreamTableEnvironment


import org.apache.flink.types.Row
import org.apache.flink.streaming.api.TimeCharacteristic
import org.apache.flink.streaming.api.scala.{DataStream, 
StreamExecutionEnvironment}
import org.apache.flink.table.api.TableEnvironment
import org.apache.flink.table.sources.CsvTableSource
import org.apache.flink.api.common.typeinfo.Types

BR Esa

From: Xingcan Cui [mailto:xingc...@gmail.com]
Sent: Thursday, February 22, 2018 10:09 AM
To: Esa Heikkinen 
Cc: user@flink.apache.org
Subject: Re: Problems to use toAppendStream

Hi Esa,

just to remind that don’t miss the dot and underscore.

Best,
Xingcan


On 22 Feb 2018, at 3:59 PM, Esa Heikkinen wrote:

Hi

Actually I have also line “import org.apache.flink.streaming.api.scala” on my 
code, but this line seems to be highlighted weaker in window of IDEA IntelliJ 
editor. What does this mean ?

But the same errors will still be generated.

Esa

From: Fabian Hueske [mailto:fhue...@gmail.com]
Sent: Wednesday, February 21, 2018 9:41 PM
To: Esa Heikkinen
Cc: user@flink.apache.org
Subject: Re: Problems to use toAppendStream

Hi Esa,
whenever you observe the error "could not find implicit value for evidence 
parameter of type X" in a streaming program, you need to add the following 
import:
import org.apache.flink.streaming.api.scala._
Best, Fabian

2018-02-21 19:49 GMT+01:00 Esa Heikkinen:



Hi





I have tried to solve below Errors for long time, but no succeed yet. Could you 
give some hint how to solve it ?



Errors in compiling:

--

Error:(56, 46) could not find implicit value for evidence parameter of type 
org.apache.flink.api.common.typeinfo.TypeInformation[org.apache.flink.types.Row]

val stream = tableEnv.toAppendStream[Row](tableTest)



Error:(56, 46) not enough arguments for method toAppendStream: (implicit 
evidence$3: 
org.apache.flink.api.common.typeinfo.TypeInformation[org.apache.flink.types.Row])org.apache.flink.streaming.api.scala.DataStream[org.apache.flink.types.Row].

Unspecified value parameter evidence$3.

val stream = tableEnv.toAppendStream[Row](tableTest)



Code:

-

import org.apache.flink.types.Row

import org.apache.flink.streaming.api.TimeCharacteristic

import org.apache.flink.streaming.api.scala.{DataStream, 
StreamExecutionEnvironment}

import org.apache.flink.table.api.TableEnvironment

import org.apache.flink.table.sources.CsvTableSource

import org.apache.flink.api.common.typeinfo.Types



object CepTest2 {



  def main(args: Array[String]) {



println("Start ...")



val env = StreamExecutionEnvironment.getExecutionEnvironment

env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)



//val tableEnv = StreamTableEnvironment.getTableEnvironment(env)

val tableEnv = TableEnvironment.getTableEnvironment(env)



val csvtable = CsvTableSource

  .builder

  .path("/home/esa/Log_EX1_gen_track_5.csv")

  .ignoreFirstLine

  .fieldDelimiter(",")

  .field("time", Types.INT)

  .field("id", Types.STRING)

  .field("sources", Types.STRING)

  .field("targets", Types.STRING)

  .field("attr", Types.STRING)

  .field("data", Types.STRING)

  .build



tableEnv.registerTableSource("test", csvtable)



val tableTest = 
tableEnv.scan("test").where("id='5'").select("id,sources,targets")



val stream = tableEnv.toAppendStream[Row](tableTest)



stream.print

env.execute()

  }

}





Re: Problems to use toAppendStream

2018-02-22 Thread Xingcan Cui
Hi Esa,

just to remind that don’t miss the dot and underscore.

Best,
Xingcan

> On 22 Feb 2018, at 3:59 PM, Esa Heikkinen wrote:
> 
> Hi
>  
> Actually I have also line “import org.apache.flink.streaming.api.scala” on my 
> code, but this line seems to be highlighted weaker in window of IDEA IntelliJ 
> editor. What does this mean ?
>  
> But the same errors will still be generated.
>  
> Esa
>  
> From: Fabian Hueske [mailto:fhue...@gmail.com ] 
> Sent: Wednesday, February 21, 2018 9:41 PM
> To: Esa Heikkinen
> Cc: user@flink.apache.org 
> Subject: Re: Problems to use toAppendStream
>  
> Hi Esa,
> 
> whenever you observe the error "could not find implicit value for evidence 
> parameter of type X" in a streaming program, you need to add the following 
> import:
> 
> import org.apache.flink.streaming.api.scala._
> 
> Best, Fabian
>  
> 2018-02-21 19:49 GMT+01:00 Esa Heikkinen:
>  
> Hi
>  
>  
> I have tried to solve the errors below for a long time, with no success yet.
> Could you give some hint on how to solve them?
>  
> Errors in compiling:
> --
> Error:(56, 46) could not find implicit value for evidence parameter of type 
> org.apache.flink.api.common.typeinfo.TypeInformation[org.apache.flink.types.Row]
> val stream = tableEnv.toAppendStream[Row](tableTest)
>  
> Error:(56, 46) not enough arguments for method toAppendStream: (implicit 
> evidence$3: 
> org.apache.flink.api.common.typeinfo.TypeInformation[org.apache.flink.types.Row])org.apache.flink.streaming.api.scala.DataStream[org.apache.flink.types.Row].
> Unspecified value parameter evidence$3.
> val stream = tableEnv.toAppendStream[Row](tableTest)
>  
> Code:
> -
> import org.apache.flink.types.Row
> import org.apache.flink.streaming.api.TimeCharacteristic
> import org.apache.flink.streaming.api.scala.{DataStream, 
> StreamExecutionEnvironment}
> import org.apache.flink.table.api.TableEnvironment
> import org.apache.flink.table.sources.CsvTableSource
> import org.apache.flink.api.common.typeinfo.Types
>  
> object CepTest2 {
>  
>   def main(args: Array[String]) {
>  
> println("Start ...")
>  
> val env = StreamExecutionEnvironment.getExecutionEnvironment
> env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)
>  
> //val tableEnv = StreamTableEnvironment.getTableEnvironment(env)
> val tableEnv = TableEnvironment.getTableEnvironment(env)
>  
> val csvtable = CsvTableSource
>   .builder
>   .path("/home/esa/Log_EX1_gen_track_5.csv")
>   .ignoreFirstLine
>   .fieldDelimiter(",")
>   .field("time", Types.INT)
>   .field("id", Types.STRING)
>   .field("sources", Types.STRING)
>   .field("targets", Types.STRING)
>   .field("attr", Types.STRING)
>   .field("data", Types.STRING)
>   .build
>  
> tableEnv.registerTableSource("test", csvtable)
>  
> val tableTest = 
> tableEnv.scan("test").where("id='5'").select("id,sources,targets")
>  
> val stream = tableEnv.toAppendStream[Row](tableTest)
>  
> stream.print
> env.execute()
>   }
> }
>