Re: There is no Open and Close method in Async I/O API of Scala

2017-02-12 Thread Tzu-Li (Gordon) Tai
Hi Howard,

I don’t think there is a rich variant of the async I/O function for Scala yet. We should 
perhaps add support for it.

I’ve looped in Till, who worked on the async I/O feature and its Scala support, to clarify 
whether there were any concerns behind not supporting it initially.

Cheers,
Gordon


On February 13, 2017 at 9:49:32 AM, Howard,Li(vip.com) (howard...@vipshop.com) 
wrote:

Hi,

I’m going to test the Scala version of the async I/O API. As described in 
https://ci.apache.org/projects/flink/flink-docs-release-1.2/dev/stream/asyncio.html, 
the Java version of the async I/O API has open and close methods, in which I can do some 
initialization and cleanup work. The Scala API, however, has neither open nor close. Even if 
I can do the initialization work while constructing the class, the cleanup work can’t be done 
because there is no close method.

When I looked into the RichAsyncFunction that the Java API extends, I found it is a 
subclass of AbstractRichFunction, which provides the open and close methods. I tried to make 
my async I/O function extend both AbstractRichFunction and AsyncFunction, but found that the 
open method is not called by Flink, so it won’t work.

I managed to find a workaround by getting the javaStream from the Scala stream and using 
the Java API instead, but I don’t think it’s ideal.

Did I miss something, or is it just a bug? If it is a bug, I can open an issue and try to 
fix it.

 

Thanks.

 

Howard
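
A minimal Scala sketch of the workaround described above, assuming Flink 1.2: the function
extends the Java RichAsyncFunction (which has open/close), and the job drops down to
javaStream to apply the Java AsyncDataStream helper. The class names, the thread-pool
"client", and the AsyncCollector import path (taken from the 1.2 Java docs) are illustrative
and may need adjusting:

    import java.util.Collections
    import java.util.concurrent.{ExecutorService, Executors, TimeUnit}

    import org.apache.flink.configuration.Configuration
    import org.apache.flink.streaming.api.datastream.AsyncDataStream
    import org.apache.flink.streaming.api.functions.async.RichAsyncFunction
    import org.apache.flink.streaming.api.functions.async.collector.AsyncCollector
    import org.apache.flink.streaming.api.scala._

    // RichAsyncFunction (Java API) provides the open()/close() hooks that the plain
    // Scala AsyncFunction lacks; the thread pool stands in for whatever client
    // needs initialization and cleanup.
    class UppercaseLookup extends RichAsyncFunction[String, String] {

      @transient private var executor: ExecutorService = _

      override def open(parameters: Configuration): Unit = {
        executor = Executors.newFixedThreadPool(4)      // init work
      }

      override def close(): Unit = {
        if (executor != null) executor.shutdown()       // cleanup work
      }

      override def asyncInvoke(input: String, collector: AsyncCollector[String]): Unit = {
        executor.submit(new Runnable {
          override def run(): Unit =
            collector.collect(Collections.singleton(input.toUpperCase))
        })
      }
    }

    object AsyncWorkaround {
      def main(args: Array[String]): Unit = {
        val env = StreamExecutionEnvironment.getExecutionEnvironment
        val input: DataStream[String] = env.fromElements("a", "b", "c")

        // Drop down to the underlying Java stream, apply the Java AsyncDataStream
        // helper, then wrap the result back into a Scala DataStream.
        val javaResult = AsyncDataStream.unorderedWait(
          input.javaStream, new UppercaseLookup, 1000L, TimeUnit.MILLISECONDS)
        val result: DataStream[String] = new DataStream(javaResult)

        result.print()
        env.execute("async I/O workaround")
      }
    }

A rich variant of the Scala AsyncFunction, as discussed above, would remove the need for
this detour.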

 


Re: Specifying Schema dynamically

2017-02-12 Thread Tzu-Li (Gordon) Tai
Hi Luqman,

From your description, it seems like you want to infer the type (case 
class, tuple, etc.) of a stream dynamically at runtime.
AFAIK, this is not supported in Flink: you’re required to have 
defined types for your DataStreams.

Could you also provide example code of what the functionality you have in 
mind would look like?
That would help clarify whether I have misunderstood and there’s actually a way to 
do it.

- Gordon

On February 12, 2017 at 4:30:56 PM, Luqman Ghani (lgsa...@gmail.com) wrote:

Like if a file has a header: id, first_name, last_name, last_login
and we infer schema as: Int, String, String, Long
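
A minimal Scala sketch of what is supported today, using the header and types from the
quoted example: the schema is declared up front as a case class, parsed from the file, and
keyed by field name rather than by index. The file name and the CSV parsing are illustrative:

    import org.apache.flink.streaming.api.scala._

    // Schema fixed at compile time as a case class matching the inferred header.
    case class Login(id: Int, firstName: String, lastName: String, lastLogin: Long)

    object StaticSchemaExample {
      def main(args: Array[String]): Unit = {
        val env = StreamExecutionEnvironment.getExecutionEnvironment

        // "logins.csv" is a hypothetical input with the header id,first_name,last_name,last_login
        val logins: DataStream[Login] = env
          .readTextFile("logins.csv")
          .filter(line => !line.startsWith("id,"))        // drop the header row
          .map { line =>
            val Array(id, first, last, ts) = line.split(",")
            Login(id.trim.toInt, first.trim, last.trim, ts.trim.toLong)
          }

        // keyBy accepts field names for case classes, so indices are not needed.
        val perUser = logins.keyBy("id")
        perUser.print()

        env.execute("static schema example")
      }
    }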



There is no Open and Close method in Async I/O API of Scala

2017-02-12 Thread Howard,Li(vip.com)
Hi,

I’m going to test the Scala version of the async I/O API. As described in 
https://ci.apache.org/projects/flink/flink-docs-release-1.2/dev/stream/asyncio.html, 
the Java version of the async I/O API has open and close methods, in which I can do some 
initialization and cleanup work. The Scala API, however, has neither open nor close. Even if 
I can do the initialization work while constructing the class, the cleanup work can’t be done 
because there is no close method.

When I looked into the RichAsyncFunction that the Java API extends, I found it is a 
subclass of AbstractRichFunction, which provides the open and close methods. I tried to make 
my async I/O function extend both AbstractRichFunction and AsyncFunction, but found that the 
open method is not called by Flink, so it won’t work.

I managed to find a workaround by getting the javaStream from the Scala stream and using 
the Java API instead, but I don’t think it’s ideal.

Did I miss something, or is it just a bug? If it is a bug, I can open an issue and try to 
fix it.

Thanks.

Howard



Re: Start streaming tuples depending on another streams rate

2017-02-12 Thread Jonas
For 2: you can also simply not read the source (i.e. Kafka) while doing that. This
way you don't have to buffer.





Re: Flink 1.2 Maven dependency

2017-02-12 Thread Robert Metzger
Hi Dominik,

I hope the artifacts were distributed properly.
Did you get download errors for the 1.2.0 version from any official Maven
servers?
Maybe mvnrepository.com is slow at indexing new artifacts?

Best,
Robert

On Fri, Feb 10, 2017 at 12:02 AM, Yassine MARZOUGUI <
y.marzou...@mindlytix.com> wrote:

> Hi,
>
> I could find the dependency here:
> https://search.maven.org/#artifactdetails%7Corg.apache.flink%7Cflink-core%7C1.2.0%7Cjar
> I wonder why it still doesn't show up at
> http://mvnrepository.com/artifact/org.apache.flink/flink-core.
>
> The dependency version for Flink 1.2 is 1.2.0.
>
> <dependency>
>   <groupId>org.apache.flink</groupId>
>   <artifactId>flink-core</artifactId>
>   <version>1.2.0</version>
> </dependency>
>
> Best,
> Yassine
>
>
> On Feb 9, 2017 20:39, "Dominik Safaric"  wrote:
>
> Hi,
>
> I’ve been trying to use the Flink 1.2 Maven dependency, but unfortunately
> I was not able to retrieve it.
>
> In addition, I cannot find the 1.2 version on the repository
> website either (e.g. Flink core:
> http://mvnrepository.com/artifact/org.apache.flink/flink-core).
>
> Could someone explain why there isn’t a Maven dependency available yet?
>
> Thanks,
> Dominik
>
>
>
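
For anyone resolving the same artifact with sbt rather than Maven, an equivalent sketch of
the coordinates quoted above (note that flink-core carries no Scala-version suffix):

    // build.sbt (sketch): the flink-core 1.2.0 coordinates from the thread above
    libraryDependencies += "org.apache.flink" % "flink-core" % "1.2.0"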


Re: Flink 1.2 and Cassandra Connector

2017-02-12 Thread Robert Metzger
Hi Nico,
The cassandra connector should be available on Maven central:
http://search.maven.org/#artifactdetails%7Corg.apache.flink%7Cflink-connector-cassandra_2.10%7C1.2.0%7Cjar

The issue you've mentioned is potentially due to a shading problem. Is
the "com/codahale/metrics/Metric" class in your user code jar?

On Thu, Feb 9, 2017 at 2:56 PM, Nico  wrote:

> Hi,
>
> I would like to upgrade to the new stable version 1.2, but I get a
> ClassNotFound exception when I start the application.
>
> Caused by: java.lang.NoClassDefFoundError: com/codahale/metrics/Metric
> at com.datastax.driver.core.Cluster$Manager.init(Cluster.java:1367)
> at com.datastax.driver.core.Cluster.init(Cluster.java:162)
> at com.datastax.driver.core.Cluster.connectAsync(Cluster.java:333)
> at com.datastax.driver.core.Cluster.connectAsync(Cluster.java:308)
> at com.datastax.driver.core.Cluster.connect(Cluster.java:250)
> at org.apache.flink.streaming.connectors.cassandra.CassandraSinkBase.open(CassandraSinkBase.java:67)
> at org.apache.flink.streaming.connectors.cassandra.CassandraTupleSink.open(CassandraTupleSink.java:42)
> at org.apache.flink.api.common.functions.util.FunctionUtils.openFunction(FunctionUtils.java:36)
> at org.apache.flink.streaming.api.operators.AbstractUdfStreamOperator.open(AbstractUdfStreamOperator.java:112)
> at org.apache.flink.streaming.runtime.tasks.StreamTask.openAllOperators(StreamTask.java:386)
> at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:262)
> at org.apache.flink.runtime.taskmanager.Task.run(Task.java:655)
> at java.lang.Thread.run(Thread.java:745)
>
>
> So I think the Cassandra connector is the reason for it. Moreover, I don't
> see version 1.2 of the connector (as mentioned in the docs) in the Maven
> repository.
>
> <dependency>
>   <groupId>org.apache.flink</groupId>
>   <artifactId>flink-connector-cassandra_2.10</artifactId>
>   <version>1.2.0</version>
> </dependency>
>
> Is there a plan to release a new version?
>
> Best,
> Nico
>


Specifying Schema dynamically

2017-02-12 Thread Luqman Ghani
Hi,

I hope everyone is doing well.

I have a use case where we infer the schema from file headers and other
information. Now, in Flink, we can specify the schema of a stream with case
classes and tuples. With tuples we cannot give names to the fields, while with
case classes we would have to generate them on the fly. Is there any way of
specifying the schema to Flink as a Map[String, Any], so it can infer the
schema from this map?

Like if a file has a header: id, first_name, last_name, last_login
and we infer schema as: Int, String, String, Long

Can we specify it as Map[String, Any]("id" -> Int, "first_name" -> String,
"last_name" -> String, "last_login" -> Long)

We want to use keyBy with field names instead of their indices. I hope
there is a way :)

I was looking into dynamically creating case classes in Scala using
scala-reflect, but I'm facing problems getting that class and forwarding it to
the Flink program.

Thanks,
Luqman
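
One possible approximation of the Map[String, Any] idea, sketched under the assumption that
a generic (Kryo-serialized) map type and a key selector are acceptable in place of
field-name keyBy; the sample rows are illustrative:

    import org.apache.flink.streaming.api.scala._

    object MapSchemaSketch {
      def main(args: Array[String]): Unit = {
        val env = StreamExecutionEnvironment.getExecutionEnvironment

        // Map[String, Any] falls back to Flink's generic (Kryo) serialization,
        // trading performance and field-name keyBy for schema flexibility.
        val rows: DataStream[Map[String, Any]] = env.fromElements(
          Map[String, Any]("id" -> 1, "first_name" -> "Ada", "last_name" -> "Lovelace", "last_login" -> 1486000000L),
          Map[String, Any]("id" -> 2, "first_name" -> "Alan", "last_name" -> "Turing", "last_login" -> 1486000001L)
        )

        // keyBy cannot use field names on a Map, but a key selector that looks
        // the field up by name comes close.
        val byId = rows.keyBy(row => row("id").asInstanceOf[Int])
        byId.print()

        env.execute("map schema sketch")
      }
    }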