Unsubscribe

2022-11-15 Thread jack zhang



Unsubscribe

2022-08-08 Thread jack zhang



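How to drop a proportion of the data consumed from Kafka

2022-02-25 Thread jack zhang
1. Our Flink job has limited resources and the Kafka topic holds a lot of data. We would like to drop records at a fixed ratio (or by some other strategy) to reduce the load on the Flink job. Is there a way to do this?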
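
One possible approach (not something this thread confirms) is to filter deterministically by hashing a record key, so that roughly a chosen fraction of records survives. The sketch below is plain Python rather than Flink code; `keep_record`, its parameters, and the sample keys are all illustrative assumptions. In a job, a predicate like this could sit in a filter step right after the Kafka source.

```python
import hashlib


def keep_record(key: bytes, keep_ratio: float, buckets: int = 10_000) -> bool:
    """Decide deterministically whether to keep a record (hypothetical helper).

    The key is hashed into one of `buckets` slots; the record is kept when its
    slot falls below keep_ratio * buckets, so about keep_ratio of all keys pass.
    """
    if not 0.0 <= keep_ratio <= 1.0:
        raise ValueError("keep_ratio must be between 0 and 1")
    digest = hashlib.sha256(key).digest()
    bucket = int.from_bytes(digest[:8], "big") % buckets
    return bucket < keep_ratio * buckets


# Illustrative data: 20,000 made-up record keys, keeping about a quarter.
keys = [f"record-{i}".encode() for i in range(20_000)]
kept = [k for k in keys if keep_record(k, keep_ratio=0.25)]
print(f"kept {len(kept)} of {len(keys)} records")
```

Hashing the key instead of drawing a random number means the same record gets the same decision if it is replayed after a restart.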


Re: Interactive Programming in Flink (Cache operation)

2021-05-04 Thread Jack Kolokasis

Hi Matthias,

Thank you for your reply. Are you going to include it in future versions?

Best,
Iacovos

On 4/5/21 9:10 π.μ., Matthias Pohl wrote:

Hi Iacovos,
Unfortunately, it doesn't, as the related FLINK-19343 [1] is not
resolved yet. The release notes for Flink 1.13 can be found in [2].


Best,
Matthias

[1] https://issues.apache.org/jira/browse/FLINK-19343
[2] https://flink.apache.org/news/2021/05/03/release-1.13.0.html


On Mon, May 3, 2021 at 8:12 PM Jack Kolokasis <koloka...@ics.forth.gr> wrote:


Hello,

Does the new release, Flink 1.13.0, include the cache operation
feature
(https://cwiki.apache.org/confluence/display/FLINK/FLIP-36%3A+Support+Interactive+Programming+in+Flink)?

Thank you,
Iacovos



Interactive Programming in Flink (Cache operation)

2021-05-03 Thread Jack Kolokasis

Hello,

Does the new release, Flink 1.13.0, include the cache operation
feature
(https://cwiki.apache.org/confluence/display/FLINK/FLIP-36%3A+Support+Interactive+Programming+in+Flink)?


Thank you,
Iacovos



Re: Caching Mechanism in Flink

2020-11-11 Thread Jack Kolokasis

Hi Matthias,

Yes, I am referring to the task's off-heap configuration value.

Best,
Iacovos

On 11/11/20 1:37 μ.μ., Matthias Pohl wrote:
When talking about the "off-heap" in your most recent message, are you 
still referring to the task's off-heap configuration value?
AFAIK, the HybridMemorySegment shouldn't be directly related to the 
off-heap parameter.


The HybridMemorySegment can be used as a wrapper around any kind of
memory, e.g. byte[]. It can be used for heap memory, but also for
DirectByteBuffers (located in the JVM's direct memory pool, which is not
part of the JVM's heap) or for memory allocated through
Unsafe's allocation methods (so-called native memory, which is also not
part of the JVM's heap).
The HybridMemorySegments are utilized within the MemoryManager class. 
The MemoryManager instances are responsible for maintaining the 
managed memory used in each of the TaskSlots. Managed Memory is used 
in different settings (e.g. for the RocksDB state backend in streaming 
applications). It can be configured using 
taskmanager.memory.managed.size (or the corresponding *.fraction 
parameter) [1]. See more details on that in [2].


I'm going to pull in Andrey as he has worked on that topic recently.

Best,
Matthias

[1] 
https://ci.apache.org/projects/flink/flink-docs-release-1.11/ops/config.html#taskmanager-memory-managed-size
[2] 
https://ci.apache.org/projects/flink/flink-docs-release-1.11/ops/memory/mem_setup_tm.html#managed-memory
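
As a rough illustration of how the two managed memory options mentioned above relate, here is a small plain-Python sketch. It assumes, based on a reading of the 1.11 memory setup documentation, that an explicit `taskmanager.memory.managed.size` takes precedence and that otherwise `taskmanager.memory.managed.fraction` (default 0.4) is applied to the total Flink memory. The function name and arguments are made up for this example.

```python
def managed_memory_bytes(total_flink_memory, size=None, fraction=0.4):
    """Resolve a managed memory budget in bytes (hypothetical helper).

    size     -- stands in for taskmanager.memory.managed.size, if set
    fraction -- stands in for taskmanager.memory.managed.fraction
    """
    if size is not None:
        return size  # an explicit size wins over the fraction
    if not 0.0 <= fraction <= 1.0:
        raise ValueError("fraction must be between 0 and 1")
    return int(total_flink_memory * fraction)


one_gib = 1024 ** 3
print(managed_memory_bytes(one_gib))                         # derived from the fraction
print(managed_memory_bytes(one_gib, size=128 * 1024 ** 2))   # explicit size
```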


On Wed, Nov 11, 2020 at 12:00 PM Jack Kolokasis
<koloka...@ics.forth.gr> wrote:


Hi Matthias,

Thank you for your reply and the useful information. I found that
off-heap memory is used when Flink uses HybridMemorySegments. How
does Flink decide when to use these HybridMemorySegments, and in
which operations does this happen?

Best,
Iacovos

On 11/11/20 11:41 π.μ., Matthias Pohl wrote:

Hi Iacovos,
The task's off-heap configuration value is used when spinning up
TaskManager containers in a clustered environment. It will
contribute to the overall memory reserved for a TaskManager
container during deployment. This parameter can be used to
influence the amount of memory allocated if the user code relies
on DirectByteBuffers and/or native memory allocation. There is no
active memory pool management beyond that from Flink's side. The
configuration parameter is ignored if you run a Flink cluster
locally.

Besides this, Flink itself also uses the JVM's direct memory via
DirectByteBuffers (for network buffers) and native memory
(through Flink's internally used managed memory).

You can find a more detailed description of Flink's memory model
in [1]. I hope that helps.

Best,
Matthias

[1]

https://ci.apache.org/projects/flink/flink-docs-release-1.11/ops/memory/mem_setup_tm.html#detailed-memory-model

On Tue, Nov 10, 2020 at 3:57 AM Jack Kolokasis
<koloka...@ics.forth.gr> wrote:

Thank you Xuannan for the reply.

I also want to ask how Flink uses off-heap memory.
If I set taskmanager.memory.task.off-heap.size, which
data does Flink allocate off-heap? Is this handled by the
programmer?

Best,
Iacovos

On 10/11/20 4:42 π.μ., Xuannan Su wrote:

Hi Jack,

At the moment, Flink doesn't support caching the
intermediate result. However, there is some ongoing effort
to support caching in Flink.
FLIP-36[1] proposes to add the caching mechanism to the Table
API, and it is planned for 1.13.

Best,
Xuannan

On Nov 10, 2020, 4:29 AM +0800, Jack Kolokasis
<koloka...@ics.forth.gr>, wrote:

Hello all,

I am new to Flink and I want to ask whether Flink supports a
caching mechanism to store intermediate results in memory for
machine learning workloads.

If yes, how can I enable it and how can I use it?

Thank you,
Iacovos




Re: Caching Mechanism in Flink

2020-11-11 Thread Jack Kolokasis

Hi Matthias,

Thank you for your reply and the useful information. I found that
off-heap memory is used when Flink uses HybridMemorySegments. How does
Flink decide when to use these HybridMemorySegments, and in which
operations does this happen?


Best,
Iacovos

On 11/11/20 11:41 π.μ., Matthias Pohl wrote:

Hi Iacovos,
The task's off-heap configuration value is used when spinning up 
TaskManager containers in a clustered environment. It will contribute 
to the overall memory reserved for a TaskManager container during 
deployment. This parameter can be used to influence the amount of 
memory allocated if the user code relies on DirectByteBuffers and/or 
native memory allocation. There is no active memory pool management 
beyond that from Flink's side. The configuration parameter is ignored 
if you run a Flink cluster locally.


Besides this, Flink itself also uses the JVM's direct memory via
DirectByteBuffers (for network buffers) and native memory (through
Flink's internally used managed memory).


You can find a more detailed description of Flink's memory model in 
[1]. I hope that helps.


Best,
Matthias

[1] 
https://ci.apache.org/projects/flink/flink-docs-release-1.11/ops/memory/mem_setup_tm.html#detailed-memory-model


On Tue, Nov 10, 2020 at 3:57 AM Jack Kolokasis <koloka...@ics.forth.gr> wrote:


Thank you Xuannan for the reply.

I also want to ask how Flink uses off-heap memory. If I
set taskmanager.memory.task.off-heap.size, which data does
Flink allocate off-heap? Is this handled by the programmer?

Best,
Iacovos

On 10/11/20 4:42 π.μ., Xuannan Su wrote:

    Hi Jack,

At the moment, Flink doesn't support caching the intermediate
result. However, there is some ongoing effort to support caching
in Flink.
FLIP-36[1] proposes to add the caching mechanism to the Table API,
and it is planned for 1.13.

Best,
Xuannan

On Nov 10, 2020, 4:29 AM +0800, Jack Kolokasis
<koloka...@ics.forth.gr>, wrote:

Hello all,

I am new to Flink and I want to ask whether Flink supports a caching
mechanism to store intermediate results in memory for machine
learning workloads.

If yes, how can I enable it and how can I use it?

Thank you,
Iacovos




Re: Caching Mechanism in Flink

2020-11-09 Thread Jack Kolokasis

Thank you Xuannan for the reply.

I also want to ask how Flink uses off-heap memory. If I set
taskmanager.memory.task.off-heap.size, which data does Flink
allocate off-heap? Is this handled by the programmer?


Best,
Iacovos

On 10/11/20 4:42 π.μ., Xuannan Su wrote:

Hi Jack,

At the moment, Flink doesn't support caching the intermediate result. 
However, there is some ongoing effort to support caching in Flink.
FLIP-36[1] proposes to add the caching mechanism to the Table API, and
it is planned for 1.13.


Best,
Xuannan

On Nov 10, 2020, 4:29 AM +0800, Jack Kolokasis <koloka...@ics.forth.gr>, wrote:

Hello all,

I am new to Flink and I want to ask whether Flink supports a caching
mechanism to store intermediate results in memory for machine learning
workloads.

If yes, how can I enable it and how can I use it?

Thank you,
Iacovos



Caching Mechanism in Flink

2020-11-09 Thread Jack Kolokasis

Hello all,

I am new to Flink and I want to ask whether Flink supports a caching
mechanism to store intermediate results in memory for machine learning
workloads.


If yes, how can I enable it and how can I use it?

Thank you,
Iacovos



Kafka source, committing and retries

2020-07-31 Thread Jack Phelan
Scenario
===

A partition that Flink is reading:
[ 1 - 2 - 3 - 4 - 5 - 6 - 7 - |  8 _ 9 _ 10 _ 11 | 12 ~ 13 ]
[.   Committed.   | In flight  | unread  ]

Kafka basically breaks off pieces from the end of the queue and pushes them
downstream for processing?

Suppose that, semantically:
- 8 & 10 succeed (API call success)
- 9 & 11 fail (API call failure).

Failure Handling options
==

Basically we have two options to handle failures?

A. Try/catch to deadletter queue
```
try {
    api.write(8, 9, 10, 11);
} catch (E e) {
    // 9 and 11 failed to write to the API, so we dead-letter them
    deadletterQueue.write(e.failed_set());
}
```

B. Or it can fail - which will retry the batch?
```
api.write(8, 9, 10, 11);
// 9, 11 failed to write to the api
```

In situation (B.), we're rewriting 8 and 10 to the api, which is bad, so
situation (A.) seems better.


Challenge I can't understand
==

However in (A.) we then do something with the queue:

A2. Try/catch to another deadletter queue?
```
try {
    api.write(9, 11);
} catch (E e) {
    // 11 failed to write to the API
    deadletterQueue2.write(e.failed_set());
}
```

Do you see what I mean? Is it turtles all the way down?

Should I create a separate index of semantic outcome? Where should it live?
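
One way to avoid an endless chain of dead-letter queues is to cap the number of attempts per record and to keep a small index of outcomes, so that a replayed batch skips records that were already handled. The sketch below is plain Python and only an illustration under assumptions: `FlakyApi`, `process_batch`, and the in-memory `outcomes` dict are hypothetical, and a real job would need a durable store for that index.

```python
from dataclasses import dataclass, field


@dataclass
class FlakyApi:
    """Hypothetical stand-in for the external API; ids in `always_fail` never succeed."""
    always_fail: set
    written: list = field(default_factory=list)

    def write(self, record_id: int) -> bool:
        if record_id in self.always_fail:
            return False
        self.written.append(record_id)
        return True


def process_batch(api, batch, outcomes, dead_letters, max_attempts=3):
    """Write each record at most once; park records that keep failing."""
    for record_id in batch:
        if record_id in outcomes:
            continue  # already decided in an earlier run: neither rewrite nor re-park
        for _ in range(max_attempts):
            if api.write(record_id):
                outcomes[record_id] = "ok"
                break
        else:
            # All attempts failed: this is the single, terminal dead-letter stop.
            outcomes[record_id] = "dead"
            dead_letters.append(record_id)


api = FlakyApi(always_fail={9, 11})
outcomes, dead_letters = {}, []
process_batch(api, [8, 9, 10, 11], outcomes, dead_letters)
# Simulate the same batch being replayed after a restart.
process_batch(api, [8, 9, 10, 11], outcomes, dead_letters)
print(api.written, dead_letters)
```

With the outcome index in place, replaying the batch does not rewrite 8 and 10, and 9 and 11 land in one dead-letter list instead of a second queue.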

Should I just keep things in the queue until




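Re:Re: Can the PyFlink Table API insert into different sink tables based on a field's value?

2020-06-22 Thread jack
Hello Jincheng, I have verified the split-processing approach you suggested, and it solves my problem. Thank you very much for clearing this up.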




Best,
Jack







At 2020-06-22 14:28:04, "jincheng sun" wrote:

Hello jack,


With the Table API you don't need if/else; logic like the following does the job directly:


val t1 = table.filter('x  > 2).groupBy(..)
val t2 = table.filter('x <= 2).groupBy(..)
t1.insert_into("sink1")
t2.insert_into("sink2")
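
The idea behind this answer can be sketched without Flink: every branch applies its own filter to the same input and writes to its own sink, so no if/else is needed. The plain-Python sketch below is only an analogy; `route`, the sample rows, and the list-based sinks are assumptions made for illustration.

```python
def route(rows, routes):
    """Apply every (predicate, sink) pair to the full input, like independent filter branches."""
    for predicate, sink in routes:
        sink.extend(row for row in rows if predicate(row))


rows = [
    {"logType": "syslog", "message": "a"},
    {"logType": "alarm", "message": "b"},
    {"logType": "syslog", "message": "c"},
]
sink1, sink2 = [], []
route(rows, [
    (lambda r: r["logType"] == "syslog", sink1),
    (lambda r: r["logType"] == "alarm", sink2),
])
print(len(sink1), len(sink2))
```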





Best,
Jincheng






On Fri, Jun 19, 2020 at 10:35 AM, jack wrote:





I tested the following structure:
table = t_env.from_path("source")


if table.filter("logType=syslog"):
    table.filter("logType=syslog").insert_into("sink1")
elif table.filter("logType=alarm"):
    table.filter("logType=alarm").insert_into("sink2")




In my test, table.filter("logType=syslog").insert_into("sink1") seems to take effect, but the elif branch below it does not. Is that because table.filter("logType=syslog") (or table.where) already filters the data while evaluating the condition, so the branch below is never reached?








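At 2020-06-19 10:08:25, "jack" wrote:
>Can the PyFlink Table API insert into different sink tables based on a field's value?
>
>
>Scenario: with PyFlink, filter rows by a condition and then insert them into a sink.
>For example, the two messages below have different logType values; the filter API can be used to filter them before inserting into a sink table:
>{
>"logType":"syslog",
>"message":"sla;flkdsjf"
>}
>{
>"logType":"alarm",
>"message":"sla;flkdsjf"
>}
>  t_env.from_path("source")\
>  .filter("logType=syslog")\
>  .insert_into("sink1")
>Is there a way to implement if/else-like logic directly in the code above by checking the value of the logType field:
>if logType=="syslog":
>   insert_into(sink1)
>elif logType=="alarm":
>   insert_into(sink2)
>
>
>It would be easy if insert_into returned a value that offers .filter, .select and similar methods; then I could keep filtering further down the chain, with code like this:
>
>
>  t_env.from_path("source")\
>  .filter("logType=syslog")\
>  .insert_into("sink1")\
>  .filter("logType=alarm")\
>  .insert_into("sink2")
>Any pointers would be appreciated. Thanks.
>
>
>
>
>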


Re: Can the PyFlink Table API insert into different sink tables based on a field's value?

2020-06-18 Thread jack



I tested the following structure:
table = t_env.from_path("source")


if table.filter("logType=syslog"):
    table.filter("logType=syslog").insert_into("sink1")
elif table.filter("logType=alarm"):
    table.filter("logType=alarm").insert_into("sink2")




In my test, table.filter("logType=syslog").insert_into("sink1") seems to take effect, but the elif branch below it does not. Is that because table.filter("logType=syslog") (or table.where) already filters the data while evaluating the condition, so the branch below is never reached?
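
A likely explanation, offered here as an assumption rather than something confirmed in the thread, is ordinary Python truthiness: filter() returns a new table object without evaluating any rows, and an object that defines no custom truth value is always true in an `if`. The toy class below is not PyFlink; `LazyTable` is a made-up stand-in that only records predicates.

```python
class LazyTable:
    """Toy stand-in for a Table: filter() records a predicate and evaluates nothing."""

    def __init__(self, predicates=()):
        self.predicates = tuple(predicates)

    def filter(self, predicate):
        # Returns a new object describing the query; no data is touched here.
        return LazyTable(self.predicates + (predicate,))


table = LazyTable()
branch_taken = None
if table.filter("logType=syslog"):   # a plain object is truthy, so this always runs
    branch_taken = "syslog"
elif table.filter("logType=alarm"):  # never reached, whatever the data contains
    branch_taken = "alarm"
print(branch_taken)
```

Under that assumption the first branch is taken for the whole job, not per row, which matches the observed behaviour.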








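At 2020-06-19 10:08:25, "jack" wrote:
>Can the PyFlink Table API insert into different sink tables based on a field's value?
>
>
>Scenario: with PyFlink, filter rows by a condition and then insert them into a sink.
>For example, the two messages below have different logType values; the filter API can be used to filter them before inserting into a sink table:
>{
>"logType":"syslog",
>"message":"sla;flkdsjf"
>}
>{
>"logType":"alarm",
>"message":"sla;flkdsjf"
>}
>  t_env.from_path("source")\
>  .filter("logType=syslog")\
>  .insert_into("sink1")
>Is there a way to implement if/else-like logic directly in the code above by checking the value of the logType field:
>if logType=="syslog":
>   insert_into(sink1)
>elif logType=="alarm":
>   insert_into(sink2)
>
>
>It would be easy if insert_into returned a value that offers .filter, .select and similar methods; then I could keep filtering further down the chain, with code like this:
>
>
>  t_env.from_path("source")\
>  .filter("logType=syslog")\
>  .insert_into("sink1")\
>  .filter("logType=alarm")\
>  .insert_into("sink2")
>Any pointers would be appreciated. Thanks.
>
>
>
>
>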


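Can the PyFlink Table API insert into different sink tables based on a field's value?

2020-06-18 Thread jack
Can the PyFlink Table API insert into different sink tables based on a field's value?


Scenario: with PyFlink, filter rows by a condition and then insert them into a sink.
For example, the two messages below have different logType values; the filter API can be used to filter them before inserting into a sink table:
{
"logType":"syslog",
"message":"sla;flkdsjf"
}
{
"logType":"alarm",
"message":"sla;flkdsjf"
}
  t_env.from_path("source")\
  .filter("logType=syslog")\
  .insert_into("sink1")
Is there a way to implement if/else-like logic directly in the code above by checking the value of the logType field:
if logType=="syslog":
   insert_into(sink1)
elif logType=="alarm":
   insert_into(sink2)


It would be easy if insert_into returned a value that offers .filter, .select and similar methods; then I could keep filtering further down the chain, with code like this:


  t_env.from_path("source")\
  .filter("logType=syslog")\
  .insert_into("sink1")\
  .filter("logType=alarm")\
  .insert_into("sink2")
Any pointers would be appreciated. Thanks.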







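Re:Re: Problem connecting PyFlink to Elasticsearch 5.4

2020-06-16 Thread jack
I had already changed the version in the connector section to 5 locally, and the error below still occurred: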
>> st_env.connect(
>> Elasticsearch()
>> .version("5")
>> .host("localhost", 9200, "http")
>> .index("taxiid-cnts")
>> .document_type('taxiidcnt')
>> .key_delimiter("$")) \














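At 2020-06-16 15:38:28, "Dian Fu" wrote:
>I guess it's because the ES version specified in the job is `6`, however, the 
>jar used is `5`.
>
>> On Jun 16, 2020, at 1:47 PM, jack wrote:
>> 
>> I am following an example that connects PyFlink to Elasticsearch. My ES version is 5.4.1 and PyFlink is 1.10.1. The connector jar I use is 
>> flink-sql-connector-elasticsearch5_2.11-1.10.1.jar. I also downloaded the Kafka and JSON connector jars, and the Kafka connection test succeeded.
>> Connecting to ES fails with: findAndCreateTableSink failed.
>> Is this caused by the ES connector jar? If anyone has run into a similar problem, please advise. Thanks.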
>> 
>> Caused by Could not find a suitable  factory for   
>> ‘org.apache.flink.table.factories.TableSinkFactory’ in the classpath.
>> Reason: Required context properties mismatch
>> 
>> 
>> 
>> from pyflink.datastream import StreamExecutionEnvironment, TimeCharacteristic
>> from pyflink.table import StreamTableEnvironment, DataTypes, 
>> EnvironmentSettings
>> from pyflink.table.descriptors import Schema, Kafka, Json, Rowtime, 
>> Elasticsearch
>> 
>> 
>> def area_cnts():
>> s_env = StreamExecutionEnvironment.get_execution_environment()
>> s_env.set_stream_time_characteristic(TimeCharacteristic.EventTime)
>> s_env.set_parallelism(1)
>> 
>> # use blink table planner
>> st_env = StreamTableEnvironment \
>> .create(s_env, environment_settings=EnvironmentSettings
>> .new_instance()
>> .in_streaming_mode()
>> .use_blink_planner().build())
>> 
>> # register source and sink
>> register_rides_source(st_env)
>> register_cnt_sink(st_env)
>> 
>> # query
>> st_env.from_path("source")\
>> .group_by("taxiId")\
>> .select("taxiId, count(1) as cnt")\
>> .insert_into("sink")
>> 
>> # execute
>> st_env.execute("6-write_with_elasticsearch")
>> 
>> 
>> def register_rides_source(st_env):
>> st_env \
>> .connect(  # declare the external system to connect to
>> Kafka()
>> .version("universal")
>> .topic("Rides")
>> .start_from_earliest()
>> .property("zookeeper.connect", "zookeeper:2181")
>> .property("bootstrap.servers", "kafka:9092")) \
>> .with_format(  # declare a format for this system
>> Json()
>> .fail_on_missing_field(True)
>> .schema(DataTypes.ROW([
>> DataTypes.FIELD("rideId", DataTypes.BIGINT()),
>> DataTypes.FIELD("isStart", DataTypes.BOOLEAN()),
>> DataTypes.FIELD("eventTime", DataTypes.TIMESTAMP()),
>> DataTypes.FIELD("lon", DataTypes.FLOAT()),
>> DataTypes.FIELD("lat", DataTypes.FLOAT()),
>> DataTypes.FIELD("psgCnt", DataTypes.INT()),
>> DataTypes.FIELD("taxiId", DataTypes.BIGINT())]))) \
>> .with_schema(  # declare the schema of the table
>> Schema()
>> .field("rideId", DataTypes.BIGINT())
>> .field("taxiId", DataTypes.BIGINT())
>> .field("isStart", DataTypes.BOOLEAN())
>> .field("lon", DataTypes.FLOAT())
>> .field("lat", DataTypes.FLOAT())
>> .field("psgCnt", DataTypes.INT())
>> .field("rideTime", DataTypes.TIMESTAMP())
>> .rowtime(
>> Rowtime()
>> .timestamps_from_field("eventTime")
>> .watermarks_periodic_bounded(6))) \
>> .in_append_mode() \
>> .register_table_source("source")
>> 
>> 
>> def register_cnt_sink(st_env):
>> st_env.connect(
>> Elasticsearch()
>> .version("6")
>> .host("elasticsearch", 9200, "http")
>> .index("taxiid-cnts")
>> .document_type('taxiidcnt')
>> .key_delimiter("$")) \
>> .with_schema(
>> Schema()
>> .field("taxiId", DataTypes.BIGINT())
>> .field("cnt", DataTypes.BIGINT())) \
>> .with_format(
>>Json()
>>.derive_schema()) \
>> .in_upsert_mode() \
>> .register_table_sink("sink")
>> 
>> 
>> if __name__ == '__main__':
>> area_cnts()
>> 
>


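Problem connecting PyFlink to Elasticsearch 5.4

2020-06-15 Thread jack
I am following an example that connects PyFlink to Elasticsearch. My ES version is 5.4.1 and PyFlink is 1.10.1. The connector jar I use is
flink-sql-connector-elasticsearch5_2.11-1.10.1.jar. I also downloaded the Kafka and JSON connector jars, and the Kafka connection test succeeded.
Connecting to ES fails with: findAndCreateTableSink failed.
Is this caused by the ES connector jar? If anyone has run into a similar problem, please advise. Thanks.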


Caused by Could not find a suitable  factory for   
‘org.apache.flink.table.factories.TableSinkFactory’ in the classpath.
Reason: Required context properties mismatch
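
The reply in this thread attributes the error to a mismatch between the version declared in the job and the connector jar. The plain-Python sketch below is a simplified model of that kind of lookup, not Flink's actual implementation: a factory is chosen only if every property it requires matches the table's properties. The factory name and the registry contents are hypothetical; the property keys are used purely for illustration.

```python
def find_suitable_factory(factories, table_properties):
    """Return the name of the first factory whose required context fully matches."""
    for name, required_context in factories.items():
        if all(table_properties.get(k) == v for k, v in required_context.items()):
            return name
    raise LookupError("Required context properties mismatch")


# Hypothetical registry: one sink factory that only accepts version "5".
factories = {
    "example-es5-sink": {
        "connector.type": "elasticsearch",
        "connector.version": "5",
    },
}

job_properties = {"connector.type": "elasticsearch", "connector.version": "6"}
try:
    find_suitable_factory(factories, job_properties)
except LookupError as err:
    print(err)  # the declared version "6" matches no registered factory
```

In this model, changing the declared version only helps if a factory for that exact version is actually on the classpath.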






from pyflink.datastream import StreamExecutionEnvironment, TimeCharacteristic
from pyflink.table import StreamTableEnvironment, DataTypes, EnvironmentSettings
from pyflink.table.descriptors import Schema, Kafka, Json, Rowtime, Elasticsearch


def area_cnts():
    s_env = StreamExecutionEnvironment.get_execution_environment()
    s_env.set_stream_time_characteristic(TimeCharacteristic.EventTime)
    s_env.set_parallelism(1)

    # use blink table planner
    st_env = StreamTableEnvironment \
        .create(s_env, environment_settings=EnvironmentSettings
                .new_instance()
                .in_streaming_mode()
                .use_blink_planner().build())

    # register source and sink
    register_rides_source(st_env)
    register_cnt_sink(st_env)

    # query
    st_env.from_path("source")\
        .group_by("taxiId")\
        .select("taxiId, count(1) as cnt")\
        .insert_into("sink")

    # execute
    st_env.execute("6-write_with_elasticsearch")


def register_rides_source(st_env):
    st_env \
        .connect(  # declare the external system to connect to
            Kafka()
            .version("universal")
            .topic("Rides")
            .start_from_earliest()
            .property("zookeeper.connect", "zookeeper:2181")
            .property("bootstrap.servers", "kafka:9092")) \
        .with_format(  # declare a format for this system
            Json()
            .fail_on_missing_field(True)
            .schema(DataTypes.ROW([
                DataTypes.FIELD("rideId", DataTypes.BIGINT()),
                DataTypes.FIELD("isStart", DataTypes.BOOLEAN()),
                DataTypes.FIELD("eventTime", DataTypes.TIMESTAMP()),
                DataTypes.FIELD("lon", DataTypes.FLOAT()),
                DataTypes.FIELD("lat", DataTypes.FLOAT()),
                DataTypes.FIELD("psgCnt", DataTypes.INT()),
                DataTypes.FIELD("taxiId", DataTypes.BIGINT())]))) \
        .with_schema(  # declare the schema of the table
            Schema()
            .field("rideId", DataTypes.BIGINT())
            .field("taxiId", DataTypes.BIGINT())
            .field("isStart", DataTypes.BOOLEAN())
            .field("lon", DataTypes.FLOAT())
            .field("lat", DataTypes.FLOAT())
            .field("psgCnt", DataTypes.INT())
            .field("rideTime", DataTypes.TIMESTAMP())
            .rowtime(
                Rowtime()
                .timestamps_from_field("eventTime")
                .watermarks_periodic_bounded(6))) \
        .in_append_mode() \
        .register_table_source("source")


def register_cnt_sink(st_env):
    st_env.connect(
        Elasticsearch()
        .version("6")
        .host("elasticsearch", 9200, "http")
        .index("taxiid-cnts")
        .document_type('taxiidcnt')
        .key_delimiter("$")) \
        .with_schema(
            Schema()
            .field("taxiId", DataTypes.BIGINT())
            .field("cnt", DataTypes.BIGINT())) \
        .with_format(
            Json()
            .derive_schema()) \
        .in_upsert_mode() \
        .register_table_sink("sink")


if __name__ == '__main__':
    area_cnts()



Re: PyFlink data query

2020-06-15 Thread jack
hi


Thank you for the suggestion. I will try implementing a custom sink.




Best,
Jack










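At 2020-06-15 18:08:15, "godfrey he" wrote:

hi jack, jincheng


Flink 1.11 supports collecting the result of a select directly to the client, for example:
CloseableIterator<Row> it = tEnv.executeSql("select ...").collect();
while (it.hasNext()) {
    it.next();
}


However, PyFlink has not yet introduced the collect() interface. (Will this be added later? @jincheng)


Note that the TableResult#collect implementation in 1.11 does not fully support streaming queries (only append-only queries are supported); master already supports them fully.


You can follow jincheng's suggestion (or build on the TableResult#collect implementation) and write your own sink.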


Best,
Godfrey






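On Mon, Jun 15, 2020 at 4:14 PM, jincheng sun wrote:

Hello Jack,


>  PyFlink reads from a source and uses SQL to query and aggregate the data;
> rather than writing to a sink, the output should be directly available as a result, so that I can build a web API that queries this result directly without having to query a sink


As I understand it, the "directly available as a result" plus "query via a web API" that you describe already includes the act of a "sink"; the "sink" is simply implemented in that particular way. For your scenario:
1. If you want to push the results straight to a web page without persisting them, you can write a custom WebSocket sink.
2. If you do not want to push to a web page directly but rather pull the results with queries, then the phrase
"directly available as a result" needs some elaboration: in what form do you want the result? My understanding is that it has to be persisted, so that persistence is in essence also a sink. Flink supports many kinds of sinks, for example databases, file systems, message queues, and so on. You can refer to the official documentation:
https://ci.apache.org/projects/flink/flink-docs-release-1.10/dev/table/connect.html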
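
To make the "the query endpoint is itself a kind of sink" point concrete, here is a minimal plain-Python sketch of the storage side only. It is not a Flink sink; `LatestResultStore` and its methods are hypothetical, and the sample updates are made up. A custom sink would call `upsert` for each result row, and a web handler would call `get` or `snapshot`.

```python
import threading


class LatestResultStore:
    """Keeps the latest aggregate per key so that a query endpoint can read it."""

    def __init__(self):
        self._lock = threading.Lock()
        self._rows = {}

    def upsert(self, key, value):
        # Called by the writing side for every new or updated result row.
        with self._lock:
            self._rows[key] = value

    def get(self, key, default=None):
        # Called by the reading side, e.g. a web request handler.
        with self._lock:
            return self._rows.get(key, default)

    def snapshot(self):
        with self._lock:
            return dict(self._rows)


store = LatestResultStore()
for taxi_id, cnt in [(1, 1), (2, 1), (1, 2)]:  # made-up upsert stream
    store.upsert(taxi_id, cnt)
print(store.snapshot())
```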



If the reply above does not solve your problem, feel free to follow up at any time.


Best,
Jincheng






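On Tue, Jun 9, 2020 at 5:39 PM, Jeff Zhang wrote:

You can use Zeppelin's z.show to query the job result. Here is an introductory tutorial on PyFlink in Zeppelin:
https://www.bilibili.com/video/BV1Te411W73b?p=20
You are welcome to join the DingTalk group for discussion: 30022475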







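On Tue, Jun 9, 2020 at 5:28 PM, jack wrote:

Question:
Description: PyFlink reads from a source and uses SQL to query and aggregate the data;
rather than writing to a sink, the output should be directly available as a result, so that I can build a web API that queries this result directly without having to query a sink.


Can Flink do this?
Thanks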




--

Best Regards

Jeff Zhang

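Re:Re: PyFlink data query

2020-06-15 Thread jack
Thank you for your suggestions. I am currently learning PyFlink and trying out various interesting things with it, including using UDFs for log parsing. I have also read through
the current official documentation; the documentation and examples for PyFlink are still rather sparse, so when I run into problems I still need to ask the experts here.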




Best,
Jack











在 2020-06-15 16:13:32,"jincheng sun"  写道:
>你好 Jack,
>
>>  pyflink 从source通过sql对数据进行查询聚合等操作 不输出到sink中,而是可以直接作为结果,
>我这边可以通过开发web接口直接查询这个结果,不必去sink中进行查询
>
>我理解你上面说的 【直接作为结果】+ 【web接口查询】已经包含了“sink”的动作。只是这个“sink” 是这样的实现而已。对于您的场景:
>1. 如果您想直接将结果不落地(不存储)执行推送的 web页面,可以自定义一个Web Socket的Sink。
>2. 如果您不是想直接推送到web页面,而是通过查询拉取结果,那么您上面说的
>【直接作为结果】这句话就要描述一下,您想怎样作为结果?我理解是要落盘的(持久化),所以这里持久化本质也是一个sink。Flink可以支持很多中sink,比如:数据库,文件系统,消息队列等等。您可以参考官方文档:
>https://ci.apache.org/projects/flink/flink-docs-release-1.10/dev/table/connect.html
>
>如果上面回复 没有解决你的问题,欢迎随时反馈~~
>
>Best,
>Jincheng
>
>
>
>Jeff Zhang  于2020年6月9日周二 下午5:39写道:
>
>> 可以用zeppelin的z.show 来查询job结果。这边有pyflink在zeppelin上的入门教程
>> https://www.bilibili.com/video/BV1Te411W73b?p=20
>> 可以加入钉钉群讨论:30022475
>>
>>
>>
>> jack  于2020年6月9日周二 下午5:28写道:
>>
>>> 问题请教:
>>> 描述: pyflink 从source通过sql对数据进行查询聚合等操作
>>> 不输出到sink中,而是可以直接作为结果,我这边可以通过开发web接口直接查询这个结果,不必去sink中进行查询。
>>>
>>> flink能否实现这样的方式?
>>> 感谢
>>>
>>
>>
>> --
>> Best Regards
>>
>> Jeff Zhang
>>


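Querying data with PyFlink

2020-06-09 Thread jack
Question:
Description: PyFlink reads from a source and runs SQL queries, aggregations, and so on against the data.
Instead of writing the output to a sink, I would like to use the result directly, so that I can build a web API that queries this result without having to query a sink.


Can Flink do this?
Thanks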

Re: PyFlink: format problem when a UDF produces a JSON string that is sent to Kafka

2020-06-01 Thread jack
Thank you very much for clearing that up. I have only just started using PyFlink and am not very familiar with it yet, so any further guidance is appreciated.


At 2020-06-01 20:50:53, "Xingbo Huang" wrote:

Hi,
This is actually the effect of quote_character, an optional parameter of the CSV
connector. Its default value is ", so every field gets wrapped in an extra layer of quotes like this. Set the parameter to \0 and you will get the result you want.
st_env.connect(
    Kafka()
    .version("0.11")
    .topic("logSink")
    .start_from_earliest()
    .property("zookeeper.connect", "localhost:2181")
    .property("bootstrap.servers", "localhost:9092")) \
    .with_format(  # declare a format for this system
        Csv()
        .schema(DataTypes.ROW([DataTypes.FIELD("log", DataTypes.STRING())]))
        .quote_character("\0")
    ) \
    .with_schema(  # declare the schema of the table
        Schema()
        .field("log", DataTypes.STRING())) \
    .in_append_mode() \
    .register_table_sink("sink")



Best,
Xingbo



PyFlink: format problem when a UDF produces a JSON string that is sent to Kafka

2020-06-01 Thread jack
Hi all, I am using PyFlink to consume JSON strings from Kafka and send them back to Kafka, treating each Kafka message as a single field.

Input data:
{"topic": "logSource", "message": "x=1,y=1,z=1"}


The data that ends up in Kafka looks like this:
"{""topic"": ""logSource"", ""message"": ""x=1,y=1,z=1""}"


It has been wrapped in another layer of double quotes. My UDF uses the json module for the processing. Could someone help me see how to turn this into a normal JSON string?


import json
import re

# Imports as used with PyFlink 1.10.
from pyflink.datastream import StreamExecutionEnvironment, TimeCharacteristic
from pyflink.table import DataTypes, EnvironmentSettings, StreamTableEnvironment
from pyflink.table.descriptors import Csv, Kafka, Schema
from pyflink.table.udf import udf


@udf(input_types=[DataTypes.STRING(), DataTypes.STRING(), DataTypes.STRING()],
     result_type=DataTypes.STRING())
def kv(log, pair_sep=',', kv_sep='='):
    # Split the "message" field into key/value pairs and merge them into the record.
    log = json.loads(log)
    ret = {}
    items = re.split(pair_sep, log.get("message"))
    for item in items:
        ret[re.split(kv_sep, item)[0]] = re.split(kv_sep, item)[1]
    log.update(ret)
    log = json.dumps(log)
    return log


def register_source(st_env):
    st_env \
        .connect(  # declare the external system to connect to
            Kafka()
            .version("0.10")
            .topic("logSource")
            .start_from_latest()
            .property("zookeeper.connect", "localhost:2181")
            .property("bootstrap.servers", "localhost:9092")) \
        .with_format(  # declare a format for this system
            Csv()
            .schema(DataTypes.ROW([DataTypes.FIELD("log", DataTypes.STRING())]))
            .field_delimiter("\n")) \
        .with_schema(  # declare the schema of the table
            Schema()
            .field("log", DataTypes.STRING())) \
        .in_append_mode() \
        .register_table_source("source")


def register_sink(st_env):
    st_env.connect(
        Kafka()
        .version("0.10")
        .topic("logSink")
        .start_from_earliest()
        .property("zookeeper.connect", "localhost:2181")
        .property("bootstrap.servers", "localhost:9092")) \
        .with_format(  # declare a format for this system
            Csv()
            .schema(DataTypes.ROW([DataTypes.FIELD("log", DataTypes.STRING())]))) \
        .with_schema(  # declare the schema of the table
            Schema()
            .field("log", DataTypes.STRING())) \
        .in_append_mode() \
        .register_table_sink("sink")


if __name__ == '__main__':
    s_env = StreamExecutionEnvironment.get_execution_environment()
    s_env.set_stream_time_characteristic(TimeCharacteristic.EventTime)
    s_env.set_parallelism(1)
    st_env = StreamTableEnvironment.create(
        s_env,
        environment_settings=EnvironmentSettings
        .new_instance()
        .in_streaming_mode()
        .use_blink_planner()
        .build())

    # Register the UDF under the name used in the query below.
    st_env.register_function('kv', kv)
    register_source(st_env)
    register_sink(st_env)

    st_env \
        .from_path("source") \
        .select("kv(log, ',', '=') as log") \
        .insert_into("sink")

    st_env.execute("test")
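
The parsing step inside the UDF can be checked on its own, without Kafka or PyFlink. The sketch below is a plain-Python approximation, not the exact UDF: `parse_kv` is a hypothetical name, and it treats the separators as literal strings rather than regular expressions.

```python
import json


def parse_kv(log, pair_sep=",", kv_sep="="):
    """Merge the key/value pairs found in record["message"] into the record."""
    record = json.loads(log)
    for item in record.get("message", "").split(pair_sep):
        key, sep, value = item.partition(kv_sep)
        if sep:  # skip fragments that contain no key/value separator
            record[key] = value
    return json.dumps(record)


result = parse_kv('{"topic": "logSource", "message": "x=1,y=1,z=1"}')
print(result)
```

The function returns an ordinary JSON string, which suggests that the extra layer of quotes is added after the UDF, when the sink serializes the row.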





Re: PyFlink Table API UDF error: Given parameters do not match any signature.

2020-06-01 Thread jack

Your understanding is correct. I tested it, and it seems that PyFlink UDFs do not really support Python variable-length arguments.


At 2020-06-01 14:47:21, "Dian Fu" wrote:
>The second argument you pass is a string. Could you try it like this?
>select("drop_fields(message, array('x'))")
>
>I am not sure I understood your question correctly (if the above does not work, please post the exception).
>
>> On Jun 1, 2020, at 1:59 PM, jack wrote:
>>
>> Yes, the corresponding parameter was not filled in correctly, thanks.
>> One more question: do UDFs currently not support Python variable-length arguments? When I use them I still get an error about mismatched parameters.
>>
>> At 2020-06-01 11:01:34, "Dian Fu" wrote:
>>> The input types should be as follows:
>>>
>>> input_types=[DataTypes.STRING(), DataTypes.ARRAY(DataTypes.STRING())]
>>>
>>> Regards,
>>> Dian
>>>
>>>> On Jun 1, 2020, at 10:49 AM, 刘亚坤 wrote:
>>>>
>>>> I am currently learning the PyFlink Table API and have a question about a PyFlink UDF error:
>>>>
>>>> @udf(input_types=[DataTypes.STRING()], result_type=DataTypes.STRING())
>>>> def drop_fields(message, *fields):
>>>>     import json
>>>>     message = json.loads(message)
>>>>     for field in fields:
>>>>         message.pop(field)
>>>>     return json.dumps(message)
>>>>
>>>>
>>>> st_env \
>>>>     .from_path("source") \
>>>>     .select("drop_fields(message,'x')") \
>>>>     .insert_into("sink")
>>>>
>>>> message format:
>>>> {"a":"1","x":"2"}
>>>>
>>>> The error reports a parameter type mismatch:
>>>> Actual:(java.lang.String, java.lang.String)
>>>> Expected:(org.apache.flink.table.dataformat.BinaryString)
>>>>
>>>> I am a beginner, so any guidance is appreciated. Thanks.
>>>
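
The shape suggested in this thread, one explicit list parameter instead of `*fields`, can be sketched in plain Python as follows. The function is shown without the `@udf` decorator so that it runs on its own; ignoring fields that are absent is an assumption added here, not something the thread asks for.

```python
import json


def drop_fields(message, fields):
    """Remove the listed keys from a JSON object given as a string."""
    record = json.loads(message)
    for field in fields:
        record.pop(field, None)  # ignore fields that are not present
    return json.dumps(record)


result = drop_fields('{"a": "1", "x": "2"}', ["x"])
print(result)
```

With a fixed two-parameter signature, the declared input types (a string and an array of strings) line up one-to-one with the function's arguments.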


Re:Re: PyFlink Table API UDF error: Given parameters do not match any signature.

2020-05-31 Thread jack

Yes, the corresponding parameter was not filled in correctly, thanks.
One more question: do UDFs currently not support Python variable-length arguments? When I use them I still get an error about mismatched parameters.


At 2020-06-01 11:01:34, "Dian Fu" wrote:
>The input types should be as follows:
>
>input_types=[DataTypes.STRING(), DataTypes.ARRAY(DataTypes.STRING())]
>
>Regards,
>Dian
>
>> On Jun 1, 2020, at 10:49 AM, 刘亚坤 wrote:
>>
>> I am currently learning the PyFlink Table API and have a question about a PyFlink UDF error:
>>
>> @udf(input_types=[DataTypes.STRING()], result_type=DataTypes.STRING())
>> def drop_fields(message, *fields):
>>     import json
>>     message = json.loads(message)
>>     for field in fields:
>>         message.pop(field)
>>     return json.dumps(message)
>>
>>
>> st_env \
>>     .from_path("source") \
>>     .select("drop_fields(message,'x')") \
>>     .insert_into("sink")
>>
>> message format:
>> {"a":"1","x":"2"}
>>
>> The error reports a parameter type mismatch:
>> Actual:(java.lang.String, java.lang.String)
>> Expected:(org.apache.flink.table.dataformat.BinaryString)
>>
>> I am a beginner, so any guidance is appreciated. Thanks.
>


How to monitor Apache Flink in AWS EMR (ElasticMapReduce)?

2019-03-06 Thread Jack Tuck
I currently have Flink set up with a job running on EMR, and I'm now trying 
to add monitoring by sending metrics off to Prometheus.

I have come across an issue with running Flink on EMR. I'm using Terraform to 
provision EMR (I run Ansible afterwards to download and run a job). Out of the box, it 
does not look like EMR's Flink distribution includes the optional jars 
(flink-metrics-prometheus, flink-cep, etc.).

Looking at Flink's documentation, it says
> "In order to use this reporter you must copy 
> `/opt/flink-metrics-prometheus-1.6.1.jar` into the `/lib` folder of your 
> Flink distribution"
https://ci.apache.org/projects/flink/flink-docs-release-1.6/monitoring/metrics.html#prometheuspushgateway-orgapacheflinkmetricsprometheusprometheuspushgatewayreporter

But when I log into the EMR master node, neither /etc/flink nor /usr/lib/flink 
has a directory called `opt`, and I cannot see 
`flink-metrics-prometheus-1.6.1.jar` anywhere.

I know Flink has other optional libs you'd usually have to copy if you want to 
use them, such as flink-cep, but I'm not sure how to do this when using EMR.

This is the exception I get, which I believe is because it cannot find the 
metrics jar on its classpath.
```
java.lang.ClassNotFoundException: org.apache.flink.metrics.prometheus.PrometheusPushGatewayReporter
    at java.net.URLClassLoader.findClass(URLClassLoader.java:382)
    at java.lang.ClassLoader.loadClass(ClassLoader.java:424)
    at sun.misc.Launcher$AppClassLoader.loadClass(Launcher.java:349)
    at java.lang.ClassLoader.loadClass(ClassLoader.java:357)
    at java.lang.Class.forName0(Native Method)
    at java.lang.Class.forName(Class.java:264)
    at org.apache.flink.runtime.metrics.MetricRegistryImpl.<init>(MetricRegistryImpl.java:144)
    at org.apache.flink.runtime.entrypoint.ClusterEntrypoint.createMetricRegistry(ClusterEntrypoint.java:419)
    at org.apache.flink.runtime.entrypoint.ClusterEntrypoint.initializeServices(ClusterEntrypoint.java:276)
    at org.apache.flink.runtime.entrypoint.ClusterEntrypoint.runCluster(ClusterEntrypoint.java:227)
    at org.apache.flink.runtime.entrypoint.ClusterEntrypoint.lambda$startCluster$0(ClusterEntrypoint.java:191)
    at java.security.AccessController.doPrivileged(Native Method)
    at javax.security.auth.Subject.doAs(Subject.java:422)
    at org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1844)
    at org.apache.flink.runtime.security.HadoopSecurityContext.runSecured(HadoopSecurityContext.java:41)
    at org.apache.flink.runtime.entrypoint.ClusterEntrypoint.startCluster(ClusterEntrypoint.java:190)
    at org.apache.flink.yarn.entrypoint.YarnSessionClusterEntrypoint.main(YarnSessionClusterEntrypoint.java:137)
```

EMR resource in Terraform
```
resource "aws_emr_cluster" "emr_flink" {
  name          = "ce-emr-flink-arn"
  release_label = "emr-5.20.0" # 5.21.0 is not found, could be a region thing
  applications  = ["Flink"]

  ec2_attributes {
    key_name                          = "ce_test"
    subnet_id                         = "${aws_subnet.ce_test_subnet_public.id}"
    instance_profile                  = "${aws_iam_instance_profile.emr_profile.arn}"
    emr_managed_master_security_group = "${aws_security_group.allow_all_vpc.id}"
    emr_managed_slave_security_group  = "${aws_security_group.allow_all_vpc.id}"
    additional_master_security_groups = "${aws_security_group.external_connectivity.id}"
    additional_slave_security_groups  = "${aws_security_group.external_connectivity.id}"
  }

  ebs_root_volume_size = 100
  master_instance_type = "m4.xlarge"
  core_instance_type   = "m4.xlarge"
  core_instance_count  = 2

  service_role = "${aws_iam_role.iam_emr_service_role.arn}"

  configurations_json = <
```

Re: Metrics not reported to graphite

2016-09-01 Thread Jack Huang
Found the solution to the follow-up question:
https://ci.apache.org/projects/flink/flink-docs-release-1.1/setup/config.html#metrics

On Thu, Sep 1, 2016 at 3:46 PM, Jack Huang <jackhu...@mz.com> wrote:

> Hi Greg,
>
> Following your hint, I found the solution here (
> https://issues.apache.org/jira/browse/FLINK-4396) and resolved the issue.
> I had to put all three jars to the lib directory to get it to work.
>
> A follow-up question: can I put a prefix (e.g. flink) on all Flink
> metrics instead of having their names start with the host IP address?
>
> Thanks,
> Jack
>
>
> On Thu, Sep 1, 2016 at 3:04 PM, Greg Hogan <c...@greghogan.com> wrote:
>
>> Have you copied the required jar files into your lib/ directory? Only JMX
>> support is provided in the distribution.
>>
>> On Thu, Sep 1, 2016 at 5:07 PM, Jack Huang <jackhu...@mz.com> wrote:
>>
>>> Hi all,
>>>
>>> I followed the instructions for reporting metrics to a Graphite server in
>>> the official documentation
>>> (https://ci.apache.org/projects/flink/flink-docs-release-1.1/apis/metrics.html#metric-types).
>>>
>>> Specifically, I have the following config/code in my project:
>>>
>>>
>>> metrics.reporters: graphite
>>> metrics.reporter.graphite.class: org.apache.flink.metrics.graphite.GraphiteReporter
>>> metrics.reporter.graphite.host: node12
>>> metrics.reporter.graphite.port: 2003
>>>
>>> events.filter(new RichFilterFunction[Event]() {
>>>   lazy val incomingCounter =
>>>     getRuntimeContext.getMetricGroup.counter("event.incoming")
>>>   lazy val downsampledCounter =
>>>     getRuntimeContext.getMetricGroup.counter("event.downsampled")
>>>   def filter(event: Event): Boolean = {
>>>     incomingCounter.inc
>>>     if (!event.bid_id.startsWith("0")) {
>>>       return false
>>>     }
>>>     downsampledCounter.inc
>>>     return true
>>>   }
>>> })
>>>
>>> However, I don't see anything on my Graphite server. What am I missing?
>>>
>>>
>>> Thanks,
>>> Jack
>>>
>>
>>
>
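
The follow-up question about prefixing metric names comes down to how a scope format string is expanded into a metric name. The sketch below illustrates that expansion in plain Python and is not Flink code: `expand_scope` is a hypothetical helper, the host and TaskManager id are made-up values, and the `<host>.taskmanager.<tm_id>` pattern is assumed to follow the scope formats described on the configuration page linked in this message.

```python
import re


def expand_scope(scope_format, variables):
    """Replace each <name> placeholder with its value; unknown names stay as-is."""
    return re.sub(r"<(\w+)>",
                  lambda m: variables.get(m.group(1), m.group(0)),
                  scope_format)


# Hypothetical values for one TaskManager.
variables = {"host": "10-0-0-12", "tm_id": "abc123"}

default_name = expand_scope("<host>.taskmanager.<tm_id>", variables)
prefixed_name = expand_scope("flink.<host>.taskmanager.<tm_id>", variables)

print(default_name)
print(prefixed_name)
```

Putting a constant segment such as `flink.` at the front of the format string is what moves all metrics under a common prefix.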


Re: Metrics not reported to graphite

2016-09-01 Thread Jack Huang
Hi Greg,

Following your hint, I found the solution here
(https://issues.apache.org/jira/browse/FLINK-4396) and resolved the issue. I had
to put all three jars in the lib directory to get it to work.

A follow-up question: can I put a prefix (e.g. flink) on all Flink metrics
instead of having their names start with the host IP address?

Thanks,
Jack


On Thu, Sep 1, 2016 at 3:04 PM, Greg Hogan <c...@greghogan.com> wrote:

> Have you copied the required jar files into your lib/ directory? Only JMX
> support is provided in the distribution.
>
> On Thu, Sep 1, 2016 at 5:07 PM, Jack Huang <jackhu...@mz.com> wrote:
>
>> Hi all,
>>
>> I followed the instructions for reporting metrics to a Graphite server in
>> the official documentation
>> (https://ci.apache.org/projects/flink/flink-docs-release-1.1/apis/metrics.html#metric-types).
>>
>> Specifically, I have the following config/code in my project:
>>
>>
>> metrics.reporters: graphite
>> metrics.reporter.graphite.class: org.apache.flink.metrics.graphite.GraphiteReporter
>> metrics.reporter.graphite.host: node12
>> metrics.reporter.graphite.port: 2003
>>
>> events.filter(new RichFilterFunction[Event]() {
>>   lazy val incomingCounter =
>>     getRuntimeContext.getMetricGroup.counter("event.incoming")
>>   lazy val downsampledCounter =
>>     getRuntimeContext.getMetricGroup.counter("event.downsampled")
>>   def filter(event: Event): Boolean = {
>>     incomingCounter.inc
>>     if (!event.bid_id.startsWith("0")) {
>>       return false
>>     }
>>     downsampledCounter.inc
>>     return true
>>   }
>> })
>>
>> However, I don't see anything on my Graphite server. What am I missing?
>>
>>
>> Thanks,
>> Jack
>>
>
>


Metrics not reported to graphite

2016-09-01 Thread Jack Huang
Hi all,

I followed the instructions for reporting metrics to a Graphite server in
the official documentation
(https://ci.apache.org/projects/flink/flink-docs-release-1.1/apis/metrics.html#metric-types).

Specifically, I have the following config/code in my project:


metrics.reporters: graphite
metrics.reporter.graphite.class: org.apache.flink.metrics.graphite.GraphiteReporter
metrics.reporter.graphite.host: node12
metrics.reporter.graphite.port: 2003

events.filter(new RichFilterFunction[Event]() {
  lazy val incomingCounter =
    getRuntimeContext.getMetricGroup.counter("event.incoming")
  lazy val downsampledCounter =
    getRuntimeContext.getMetricGroup.counter("event.downsampled")
  def filter(event: Event): Boolean = {
    incomingCounter.inc
    if (!event.bid_id.startsWith("0")) {
      return false
    }
    downsampledCounter.inc
    return true
  }
})

However, I don't see anything on my Graphite server. What am I missing?


Thanks,
Jack


Re: Handle deserialization error

2016-09-01 Thread Jack Huang
Hi Yassine,

For now my workaround is catching exceptions in my custom deserializer and
producing some default object to the downstream. It would still be very
nice to avoid this inefficiency by not producing an object at all.

Thanks,
Jack


On Fri, Aug 26, 2016 at 6:51 PM, Yassine Marzougui <yassmar...@gmail.com>
wrote:

> Hi Jack,
>
> As Robert Metzger mentioned in a previous thread, there's an ongoing
> discussion about the issue in this JIRA:
> https://issues.apache.org/jira/browse/FLINK-3679.
>
> A possible workaround is to use a SimpleStringSchema in the Kafka source,
> and chain it with a flatMap operator where you can use your custom
> deserializer and handle deserialization errors.
>
> Best,
> Yassine
>
> On Aug 27, 2016 02:37, "Jack Huang" <jackhu...@mz.com> wrote:
>
>> Hi all,
>>
>> I have a custom deserializer which I pass to a Kafka source to transform
>> JSON strings into Scala case classes.
>>
>> val events = env.addSource(new FlinkKafkaConsumer09[Event]("events",
>>   new JsonSerde(classOf[Event], new Event), kafkaProp))
>>
>> There are times when the JSON message is malformed, in which case I want
>> to catch the exception, log an error message, and go on to the next
>> message without producing an event downstream. It doesn't seem like
>> the DeserializationSchema interface allows such behavior. How could I
>> achieve this?
>>
>> Thanks,
>> Jack
>>
>
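
The workaround suggested in this thread, reading raw strings and parsing them in a flatMap-style step that emits zero or one element per input, can be sketched in plain Python. This is a conceptual illustration only: `parse_events` is a hypothetical name, and a Python generator stands in for the flatMap operator.

```python
import json
import logging

logger = logging.getLogger("deserializer")


def parse_events(lines):
    """Yield one parsed event per well-formed line; log and skip the rest."""
    for line in lines:
        try:
            event = json.loads(line)
        except json.JSONDecodeError as err:
            logger.warning("skipping malformed message %r: %s", line, err)
            continue
        yield event


events = list(parse_events(['{"id": 1}', '{"id": ', '{"id": 2}']))
print(events)
```

Unlike the default-object workaround mentioned in the reply, nothing is emitted for the malformed message, so downstream operators never see a placeholder.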


Re: Cannot pass objects with null-valued fields to the next operator

2016-09-01 Thread Jack Huang
Hi Stephan,

In the end I decided to specify a default value (e.g. empty string) when a
field is null.

On Mon, Aug 29, 2016 at 11:25 AM, Stephan Ewen <se...@apache.org> wrote:

> Hi!
>
> Null is indeed not supported for some basic data types (tuples / case
> classes).
>
> Can you use Option for nullable fields?
>
> Stephan
>
>
> On Mon, Aug 29, 2016 at 8:04 PM, Jack Huang <jackhu...@mz.com> wrote:
>
>> Hi all,
>>
>> It seems like flink does not allow passing case class objects with
>> null-valued fields to the next operators. I am getting the following error
>> message:
>>
>> Caused by: java.lang.RuntimeException: Could not forward element to next operator
>>     at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:376)
>>     at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:358)
>>     at org.apache.flink.streaming.runtime.tasks.OperatorChain$BroadcastingOutputCollector.collect(OperatorChain.java:399)
>>     at org.apache.flink.streaming.runtime.tasks.OperatorChain$BroadcastingOutputCollector.collect(OperatorChain.java:381)
>>     at org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:346)
>>     at org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:329)
>>     at org.apache.flink.streaming.api.operators.StreamSource$ManualWatermarkContext.collect(StreamSource.java:340)
>>     at org.apache.flink.streaming.connectors.kafka.internals.AbstractFetcher.emitRecord(AbstractFetcher.java:225)
>>     at org.apache.flink.streaming.connectors.kafka.internal.Kafka09Fetcher.run(Kafka09Fetcher.java:239)
>>     at java.lang.Thread.run(Thread.java:745)
>> Caused by: java.lang.NullPointerException
>>     at org.apache.flink.api.scala.typeutils.TraversableSerializer.copy(TraversableSerializer.scala:67)
>>     at org.apache.flink.api.scala.typeutils.TraversableSerializer.copy(TraversableSerializer.scala:32)
>>     at org.apache.flink.api.scala.typeutils.CaseClassSerializer.copy(CaseClassSerializer.scala:90)
>>     at org.apache.flink.api.scala.typeutils.CaseClassSerializer.copy(CaseClassSerializer.scala:30)
>>     at org.apache.flink.api.scala.typeutils.CaseClassSerializer.copy(CaseClassSerializer.scala:90)
>>     at org.apache.flink.api.scala.typeutils.CaseClassSerializer.copy(CaseClassSerializer.scala:30)
>>     at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:371)
>>     ... 9 more
>>
>> This error goes away when I force all objects to not have fields with
>> null values. However, null is a valid value in my use case. Is there a way
>> to make it work? I am using flink-1.1.1.
>>
>>
>> Thanks,
>> Jack
>>
>
>
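
The approach described at the top of this message, substituting a default value wherever a field is null before the record moves on, can be sketched in plain Python. This is an analogue rather than Flink or Scala code: the `Event` fields, the `DEFAULTS` table, and `fill_defaults` are hypothetical.

```python
from dataclasses import dataclass, fields, replace
from typing import Optional


@dataclass(frozen=True)
class Event:
    id: str
    referrer: Optional[str] = None
    tags: Optional[tuple] = None


# Hypothetical per-field defaults used in place of None.
DEFAULTS = {"referrer": "", "tags": ()}


def fill_defaults(event):
    """Return a copy of the event with None fields replaced by their defaults."""
    updates = {
        f.name: DEFAULTS[f.name]
        for f in fields(event)
        if getattr(event, f.name) is None and f.name in DEFAULTS
    }
    return replace(event, **updates)


cleaned = fill_defaults(Event(id="a"))
print(cleaned)
```

The cost of this approach is that an empty string can no longer be told apart from a missing value; using an Option-like type, as suggested in the quoted reply, keeps that distinction.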


Cannot pass objects with null-valued fields to the next operator

2016-08-29 Thread Jack Huang
Hi all,

It seems like flink does not allow passing case class objects with
null-valued fields to the next operators. I am getting the following error
message:

Caused by: java.lang.RuntimeException: Could not forward element to next operator
    at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:376)
    at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:358)
    at org.apache.flink.streaming.runtime.tasks.OperatorChain$BroadcastingOutputCollector.collect(OperatorChain.java:399)
    at org.apache.flink.streaming.runtime.tasks.OperatorChain$BroadcastingOutputCollector.collect(OperatorChain.java:381)
    at org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:346)
    at org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:329)
    at org.apache.flink.streaming.api.operators.StreamSource$ManualWatermarkContext.collect(StreamSource.java:340)
    at org.apache.flink.streaming.connectors.kafka.internals.AbstractFetcher.emitRecord(AbstractFetcher.java:225)
    at org.apache.flink.streaming.connectors.kafka.internal.Kafka09Fetcher.run(Kafka09Fetcher.java:239)
    at java.lang.Thread.run(Thread.java:745)
Caused by: java.lang.NullPointerException
    at org.apache.flink.api.scala.typeutils.TraversableSerializer.copy(TraversableSerializer.scala:67)
    at org.apache.flink.api.scala.typeutils.TraversableSerializer.copy(TraversableSerializer.scala:32)
    at org.apache.flink.api.scala.typeutils.CaseClassSerializer.copy(CaseClassSerializer.scala:90)
    at org.apache.flink.api.scala.typeutils.CaseClassSerializer.copy(CaseClassSerializer.scala:30)
    at org.apache.flink.api.scala.typeutils.CaseClassSerializer.copy(CaseClassSerializer.scala:90)
    at org.apache.flink.api.scala.typeutils.CaseClassSerializer.copy(CaseClassSerializer.scala:30)
    at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:371)
    ... 9 more

This error goes away when I force all objects to not have fields with null
values. However, null is a valid value in my use case. Is there a way to
make it work? I am using flink-1.1.1.


Thanks,
Jack


Handle deserialization error

2016-08-26 Thread Jack Huang
Hi all,

I have a custom deserializer which I pass to a Kafka source to transform
JSON strings into Scala case classes.

val events = env.addSource(new FlinkKafkaConsumer09[Event]("events",
  new JsonSerde(classOf[Event], new Event), kafkaProp))

There are times when the JSON message is malformed, in which case I want to
catch the exception, log an error message, and go on to the next message
without producing an event downstream. It doesn't seem like the
DeserializationSchema interface allows such behavior. How could I achieve this?

Thanks,
Jack


Re: Cannot use WindowedStream.fold with EventTimeSessionWindows

2016-08-17 Thread Jack Huang
Hi Till,

The sessions I am dealing with do not have a reliable "end-of-session"
event. A session could stop sending events all of a sudden, or it could keep
sending events forever. I need to be able to determine when a session expires
due to inactivity, or to kill off a session if it lives longer than it should.
Therefore I need to perform incremental aggregation.

If I can assume ascending timestamps for the incoming events, would there be
a workaround?

Thanks,
Jack



On Wed, Aug 17, 2016 at 2:17 AM, Till Rohrmann <trohrm...@apache.org> wrote:

> Hi Jack,
>
> the problem with session windows and a fold operation, which is an
> incremental operation, is that you don't have a way to combine partial
> folds when merging windows. As a workaround you have to specify a window
> function where you get an iterator over all your window elements and then
> perform the fold manually in the window function.
>
> Cheers,
> Till
>
> On Wed, Aug 17, 2016 at 3:21 AM, Jack Huang <jackhu...@mz.com> wrote:
>
>> Hi all,
>>
>> I want to window a series of events using SessionWindow and use the fold
>> function to incrementally aggregate the result.
>>
>> events
>>   .keyBy(_.id)
>>   .window(EventTimeSessionWindows.withGap(Time.minutes(1)))
>>   .fold(new Session)(eventFolder)
>>
>> However I get
>>
>> java.lang.UnsupportedOperationException: Fold cannot be used with a merging WindowAssigner.
>>
>> Does anyone have a workaround?
>>
>>
>>
>> Thanks,
>> Jack
>>
>>
>>
>>
>
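
The workaround described in the quoted reply, collecting all elements of a session and folding them only once the session is complete, can be illustrated in plain Python. This is a conceptual sketch, not the Flink window API: it handles a single key in memory, `sessionize` and `fold_session` are hypothetical names, and the events are made-up `(timestamp, value)` pairs.

```python
from functools import reduce


def sessionize(events, gap):
    """Group (timestamp, value) pairs into sessions split by idle gaps >= `gap`."""
    sessions = []
    for ts, value in sorted(events):
        if sessions and ts - sessions[-1][-1][0] < gap:
            sessions[-1].append((ts, value))  # still within the current session
        else:
            sessions.append([(ts, value)])    # idle gap reached: start a new one
    return sessions


def fold_session(session, initial, folder):
    """Fold over the buffered values of one complete session."""
    return reduce(folder, (value for _, value in session), initial)


events = [(0, 1), (30, 2), (200, 5), (220, 7)]
totals = [fold_session(s, 0, lambda acc, v: acc + v)
          for s in sessionize(events, gap=60)]
print(totals)
```

Because the fold runs over the full buffer, it does not matter how the session was assembled, which is what sidesteps the need to combine partial folds when windows merge. The trade-off is that every element of a session is held until the session closes.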


Cannot use WindowedStream.fold with EventTimeSessionWindows

2016-08-16 Thread Jack Huang
Hi all,

I want to window a series of events using SessionWindow and use the fold
function to incrementally aggregate the result.

events
  .keyBy(_.id)
  .window(EventTimeSessionWindows.withGap(Time.minutes(1)))
  .fold(new Session)(eventFolder)

However I get

java.lang.UnsupportedOperationException: Fold cannot be used with a merging WindowAssigner.

Does anyone have a workaround?



Thanks,
Jack


Re: Parsing source JSON String as Scala Case Class

2016-08-05 Thread Jack Huang
Thanks Stephan. "lazy val" does the trick.

On Thu, Aug 4, 2016 at 2:33 AM, Stephan Ewen <se...@apache.org> wrote:

> If the class has non-serializable members, you need to initialize them
> "lazily" when the objects are already in the distributed execution (after
> serializing / distributing them).
>
> Making a Scala 'val' a 'lazy val' often does the trick (at minimal
> performance cost).
>
> On Thu, Aug 4, 2016 at 3:56 AM, Jack Huang <jackhu...@mz.com> wrote:
>
>> Hi all,
>>
>> I want to read a source of JSON String as Scala Case Class. I don't want
>> to have to write a serde for every case class I have. The idea is:
>>
>> val events = env.addSource(new FlinkKafkaConsumer09[Event]("events", new 
>> JsonSerde(classOf[Event]), kafkaProp))
>>
>> ​
>>
>> I was implementing my own JsonSerde with Jackson/Gson, but in both cases I
>> get the error
>>
>> Task not serializable
>> 
>> org.apache.flink.api.scala.ClosureCleaner$.ensureSerializable(ClosureCleaner.scala:172)
>> 
>> org.apache.flink.api.scala.ClosureCleaner$.clean(ClosureCleaner.scala:164)
>> 
>> org.apache.flink.streaming.api.scala.StreamExecutionEnvironment.scalaClean(StreamExecutionEnvironment.scala:568)
>> 
>> org.apache.flink.streaming.api.scala.StreamExecutionEnvironment.addSource(StreamExecutionEnvironment.scala:498)
>> com.fractionalmedia.stream.PricerEvent$.main(PricerEvent.scala:100)
>>
>> ​
>> It seems that both Jackson and Gson have classes that are not serializable.
>>
>> I couldn't find any other solution to perform this JSON-to-Case-Class
>> parsing, yet it seems a very basic need. What am I missing?
>>
>>
>> Thanks,
>> Jack
>>
>>
>>
>>
>
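
The "lazy val" suggestion quoted above can be illustrated with plain Java serialization. This is a minimal sketch, not the JsonSerde from the thread: Parser is a hypothetical stand-in for a non-serializable helper such as a JSON mapper, ParsingSchema is a hypothetical wrapper, and the @transient annotation is an addition that keeps the helper out of the serialized form even if it was already used before serialization.

```scala
import java.io.{ByteArrayInputStream, ByteArrayOutputStream, ObjectInputStream, ObjectOutputStream}

// Hypothetical non-serializable helper (stand-in for a JSON mapper).
class Parser {
  def parse(s: String): Int = s.trim.toInt
}

// Serializable wrapper: the parser is not shipped with the object.
// It is created lazily on first use, which also works after deserialization.
class ParsingSchema extends Serializable {
  @transient private lazy val parser = new Parser
  def deserialize(s: String): Int = parser.parse(s)
}

object LazyValSketch {
  // Serializes and deserializes an object in memory, roughly what happens
  // when a function object is shipped to a worker.
  def roundTrip[A <: java.io.Serializable](a: A): A = {
    val bytes = new ByteArrayOutputStream()
    val out = new ObjectOutputStream(bytes)
    out.writeObject(a)
    out.close()
    val in = new ObjectInputStream(new ByteArrayInputStream(bytes.toByteArray))
    try in.readObject().asInstanceOf[A] finally in.close()
  }
}
```

With a plain `val parser = new Parser`, writeObject would throw a NotSerializableException, because the helper would be part of the object's serialized state.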


Parsing source JSON String as Scala Case Class

2016-08-03 Thread Jack Huang
Hi all,

I want to read a source of JSON strings as Scala case classes. I don't want to
have to write a serde for every case class I have. The idea is:

val events = env.addSource(new FlinkKafkaConsumer09[Event]("events",
  new JsonSerde(classOf[Event]), kafkaProp))

I was implementing my own JsonSerde with Jackson/Gson, but in both cases I
get the error

Task not serializable

org.apache.flink.api.scala.ClosureCleaner$.ensureSerializable(ClosureCleaner.scala:172)
org.apache.flink.api.scala.ClosureCleaner$.clean(ClosureCleaner.scala:164)

org.apache.flink.streaming.api.scala.StreamExecutionEnvironment.scalaClean(StreamExecutionEnvironment.scala:568)

org.apache.flink.streaming.api.scala.StreamExecutionEnvironment.addSource(StreamExecutionEnvironment.scala:498)
com.fractionalmedia.stream.PricerEvent$.main(PricerEvent.scala:100)

It seems that both Jackson and Gson have classes that are not serializable.

I couldn't find any other solution to perform this JSON-to-Case-Class
parsing, yet it seems a very basic need. What am I missing?


Thanks,
Jack


Re: Container running beyond physical memory limits when processing DataStream

2016-08-03 Thread Jack Huang
Hi Max,

Changing yarn.heap-cutoff-ratio seems to suffice for the time being.
Thanks for your help.

Regards,
Jack
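
The effect of the cutoff ratio can be worked through as arithmetic. The sketch below assumes that the heap is the container size minus a cutoff, and that the cutoff is the larger of a minimum value and ratio * container size. The default ratio of 0.25 and the 600 MB minimum are assumptions, chosen because they reproduce the -Xmx1448m that the log in this thread shows for a 2048 MB container; the configuration reference for the Flink version in use is the authority here.

```scala
object HeapCutoffSketch {
  // Assumed rule: cutoff = max(minCutoffMb, ratio * containerMb),
  // JVM heap = containerMb - cutoff.
  def heapMb(containerMb: Int, cutoffRatio: Double, minCutoffMb: Int): Int = {
    val cutoff = math.max(minCutoffMb, (containerMb * cutoffRatio).toInt)
    containerMb - cutoff
  }
}
```

Under these assumptions, raising the ratio to 0.4 for a 2048 MB container reserves about 819 MB for non-heap memory and leaves about 1229 MB of heap, so the JVM process has more headroom before YARN's physical memory check kills the container.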

On Tue, Aug 2, 2016 at 11:11 AM, Jack Huang <jackhu...@mz.com> wrote:

> Hi Max,
>
> Is there a way to limit the JVM memory usage (something like the -Xmx
> flag) for the task manager so that it won't go over the YARN limit but will
> just run GC until there is memory to use? Trying to allocate "enough"
> memory for this stream task is not ideal because I could have indefinitely
> many messages backed up in the source to be processed.
>
> Thanks,
> Jack
>
>
> On Tue, Aug 2, 2016 at 5:21 AM, Maximilian Michels <m...@apache.org> wrote:
>
>> Your job creates a lot of String objects which need to be garbage
>> collected. It could be that the JVM is not fast enough and Yarn kills
>> the JVM for consuming too much memory.
>>
>> You can try two things:
>>
>> 1) Give the task manager more memory
>> 2) Increase the Yarn heap cutoff ratio (e.g yarn.heap-cutoff-ratio: 0.4)
>>
>> If the error still occurs then we need to investigate further.
>>
>> Thanks,
>> Max
>>
>>
>> >
>> >
>> >
>> >
>> > On Fri, Jul 29, 2016 at 11:19 AM, Jack Huang <jackhu...@mz.com> wrote:
>> >>
>> >> Hi Max,
>> >>
>> >> Each event is only a few hundred bytes. I am reading from a Kafka topic
>> >> from some offset in the past, so the events should be flowing in as fast as
>> >> Flink can process them.
>> >>
>> >> The entire YARN task log, which contains both JobManager and TaskManager
>> >> outputs, is attached.
>> >>
>> >> Thanks a lot,
>> >> Jack
>> >>
>> >>
>> >> On Fri, Jul 29, 2016 at 2:04 AM, Maximilian Michels <m...@apache.org>
>> >> wrote:
>> >>>
>> >>> Hi Jack,
>> >>>
>> >>> Considering the type of job you're running, you shouldn't run out of
>> >>> memory. Could it be that the events are quite large strings? It could
>> >>> be that the TextOutputFormat doesn't write to disk fast enough and
>> >>> accumulates memory. Actually, it doesn't perform regular flushing
>> >>> which could be an issue.
>> >>>
>> >>> I'm just guessing, we need to investigate further. Could you please
>> >>> supply the entire JobManager log file output?
>> >>>
>> >>> Thanks,
>> >>> Max
>> >>>
>> >>> On Fri, Jul 29, 2016 at 12:59 AM, Jack Huang <jackhu...@mz.com>
>> wrote:
>> >>> > Hi all,
>> >>> >
>> >>> > I am running a test Flink streaming task under YARN. It reads
>> messages
>> >>> > from
>> >>> > a Kafka topic and writes them to local file system.
>> >>> >
>> >>> > object PricerEvent {
>> >>> > def main(args:Array[String]) {
>> >>> > val kafkaProp = new Properties()
>> >>> > kafkaProp.setProperty("bootstrap.servers", "localhost:6667")
>> >>> > kafkaProp.setProperty("auto.offset.reset", "earliest")
>> >>> >
>> >>> > val env = StreamExecutionEnvironment.getExecutionEnvironment
>> >>> > env.setStateBackend(new MemoryStateBackend)
>> >>> >
>> >>> > val wins = env.addSource(new
>> >>> > FlinkKafkaConsumer09[String]("events",
>> >>> > new SimpleStringSchema, kafkaProp))
>> >>> > wins.writeAsText("/home/user/flink_out/" + new
>> >>> > SimpleDateFormat("-MM-dd_HH-mm-ss").format(new Date))
>> >>> >
>> >>> > env.execute
>> >>> > }
>> >>> > }
>> >>> >
>> >>> > With the command
>> >>> >
>> >>> > flink run -m yarn-cluster -yn 1 -ytm 2048 -c PricerEvent
>> >>> > /home/user/flink-example/build/libs/flink-example-1.0-all.jar
>> >>> >
>> >>> >
>> >>> > The task runs fine for a moment and then terminates. I looked into
>> the
>> >>> > error
>> >>> > log and found following out-of-memory error message:
>> >>

Re: Container running beyond physical memory limits when processing DataStream

2016-08-02 Thread Jack Huang
Hi Max,

Is there a way to limit the JVM memory usage (something like the -Xmx flag)
for the task manager so that it won't go over the YARN limit but will just
run GC until there is memory to use? Trying to allocate "enough" memory for
this stream task is not ideal because I could have indefinitely many
messages backed up in the source to be processed.

Thanks,
Jack


On Tue, Aug 2, 2016 at 5:21 AM, Maximilian Michels <m...@apache.org> wrote:

> Your job creates a lot of String objects which need to be garbage
> collected. It could be that the JVM is not fast enough and Yarn kills
> the JVM for consuming too much memory.
>
> You can try two things:
>
> 1) Give the task manager more memory
> 2) Increase the Yarn heap cutoff ratio (e.g yarn.heap-cutoff-ratio: 0.4)
>
> If the error still occurs then we need to investigate further.
>
> Thanks,
> Max
>
>
> >
> >
> >
> >
> > On Fri, Jul 29, 2016 at 11:19 AM, Jack Huang <jackhu...@mz.com> wrote:
> >>
> >> Hi Max,
> >>
> >> Each event is only a few hundred bytes. I am reading from a Kafka topic
> >> from some offset in the past, so the events should be flowing in as fast as
> >> Flink can process them.
> >>
> >> The entire YARN task log, which contains both JobManager and TaskManager
> >> outputs, is attached.
> >>
> >> Thanks a lot,
> >> Jack
> >>
> >>
> >> On Fri, Jul 29, 2016 at 2:04 AM, Maximilian Michels <m...@apache.org>
> >> wrote:
> >>>
> >>> Hi Jack,
> >>>
> >>> Considering the type of job you're running, you shouldn't run out of
> >>> memory. Could it be that the events are quite large strings? It could
> >>> be that the TextOutputFormat doesn't write to disk fast enough and
> >>> accumulates memory. Actually, it doesn't perform regular flushing
> >>> which could be an issue.
> >>>
> >>> I'm just guessing, we need to investigate further. Could you please
> >>> supply the entire JobManager log file output?
> >>>
> >>> Thanks,
> >>> Max
> >>>
> >>> On Fri, Jul 29, 2016 at 12:59 AM, Jack Huang <jackhu...@mz.com> wrote:
> >>> > Hi all,
> >>> >
> >>> > I am running a test Flink streaming task under YARN. It reads
> messages
> >>> > from
> >>> > a Kafka topic and writes them to local file system.
> >>> >
> >>> > object PricerEvent {
> >>> > def main(args:Array[String]) {
> >>> > val kafkaProp = new Properties()
> >>> > kafkaProp.setProperty("bootstrap.servers", "localhost:6667")
> >>> > kafkaProp.setProperty("auto.offset.reset", "earliest")
> >>> >
> >>> > val env = StreamExecutionEnvironment.getExecutionEnvironment
> >>> > env.setStateBackend(new MemoryStateBackend)
> >>> >
> >>> > val wins = env.addSource(new
> >>> > FlinkKafkaConsumer09[String]("events",
> >>> > new SimpleStringSchema, kafkaProp))
> >>> > wins.writeAsText("/home/user/flink_out/" + new
> >>> > SimpleDateFormat("-MM-dd_HH-mm-ss").format(new Date))
> >>> >
> >>> > env.execute
> >>> > }
> >>> > }
> >>> >
> >>> > With the command
> >>> >
> >>> > flink run -m yarn-cluster -yn 1 -ytm 2048 -c PricerEvent
> >>> > /home/user/flink-example/build/libs/flink-example-1.0-all.jar
> >>> >
> >>> >
> >>> > The task runs fine for a moment and then terminates. I looked into
> the
> >>> > error
> >>> > log and found following out-of-memory error message:
> >>> >
> >>> > 2016-07-28 22:34:40,397 INFO  org.apache.flink.yarn.YarnJobManager
> >>> > - Container container_e05_1467433388200_0136_01_02 is completed
> >>> > with
> >>> > diagnostics: Container
> >>> > [pid=5832,containerID=container_e05_1467433388200_0136_01_02] is
> >>> > running
> >>> > beyond physical memory limits. Current usage: 2.3 GB of 2 GB physical
> >>> > memory
> >>> > used; 6.1 GB of 4.2 GB virtual memory used. Killing container.
> >>> > Dump of the process-tree for
> contai

Container running beyond physical memory limits when processing DataStream

2016-07-28 Thread Jack Huang
Hi all,

I am running a test Flink streaming task under YARN. It reads messages from
a Kafka topic and writes them to the local file system.

object PricerEvent {
  def main(args: Array[String]) {
    val kafkaProp = new Properties()
    kafkaProp.setProperty("bootstrap.servers", "localhost:6667")
    kafkaProp.setProperty("auto.offset.reset", "earliest")

    val env = StreamExecutionEnvironment.getExecutionEnvironment
    env.setStateBackend(new MemoryStateBackend)

    val wins = env.addSource(new FlinkKafkaConsumer09[String](
      "events", new SimpleStringSchema, kafkaProp))
    wins.writeAsText("/home/user/flink_out/" +
      new SimpleDateFormat("-MM-dd_HH-mm-ss").format(new Date))

    env.execute
  }
}

With the command

flink run -m yarn-cluster -yn 1 -ytm 2048 -c PricerEvent
/home/user/flink-example/build/libs/flink-example-1.0-all.jar

The task runs fine for a moment and then terminates. I looked into the
error log and found the following out-of-memory error message:

2016-07-28 22:34:40,397 INFO  org.apache.flink.yarn.YarnJobManager
 - Container
container_e05_1467433388200_0136_01_02 is completed with
diagnostics: Container
[pid=5832,containerID=container_e05_1467433388200_0136_01_02] is
running beyond physical memory limits. Current usage: 2.3 GB of 2 GB
physical memory used; 6.1 GB of 4.2 GB virtual memory used. Killing
container.
Dump of the process-tree for container_e05_1467433388200_0136_01_02 :
|- PID PPID PGRPID SESSID CMD_NAME USER_MODE_TIME(MILLIS)
SYSTEM_TIME(MILLIS) VMEM_USAGE(BYTES) RSSMEM_USAGE(PAGES)
FULL_CMD_LINE
|- 5838 5832 5832 5832 (java) 2817 481 6553391104 592779
/usr/jdk64/jdk1.8.0_60/bin/java -Xms1448m -Xmx1448m
-XX:MaxDirectMemorySize=1448m
-Dlog.file=/mnt/a/hadoop/yarn/log/application_1467433388200_0136/container_e05_1467433388200_0136_01_02/taskmanager.log
-Dlogback.configurationFile=file:logback.xml
-Dlog4j.configuration=file:log4j.properties
org.apache.flink.yarn.YarnTaskManagerRunner --configDir .
|- 5832 5830 5832 5832 (bash) 0 0 12759040 357 /bin/bash -c
/usr/jdk64/jdk1.8.0_60/bin/java -Xms1448m -Xmx1448m
-XX:MaxDirectMemorySize=1448m
-Dlog.file=/mnt/a/hadoop/yarn/log/application_1467433388200_0136/container_e05_1467433388200_0136_01_02/taskmanager.log
-Dlogback.configurationFile=file:logback.xml
-Dlog4j.configuration=file:log4j.properties
org.apache.flink.yarn.YarnTaskManagerRunner --configDir . 1>
/mnt/a/hadoop/yarn/log/application_1467433388200_0136/container_e05_1467433388200_0136_01_02/taskmanager.out
2> 
/mnt/a/hadoop/yarn/log/application_1467433388200_0136/container_e05_1467433388200_0136_01_02/taskmanager.err

Container killed on request. Exit code is 143
Container exited with a non-zero exit code 143

​

I don't understand why it would fail with an out-of-memory error. I would
expect the task to slow down if there are too many messages to process, but
not to fail altogether. I am not storing any state either.

Does anyone know the reason and the way to fix/avoid this issue?


Thanks,
Jack


Periodically evicting internal states when using mapWithState()

2016-06-06 Thread Jack Huang
Hi all,

I have an incoming stream of event objects, each with its session ID. I am
writing a task that aggregates the events by session. The general logic
looks like

case class Event(sessionId: Int, data: String)
case class Session(id: Int, var events: List[Event])

val events = ... // some source
events
  .keyBy((event: Event) => event.sessionId)
  .mapWithState((event: Event, state: Option[Session]) => {
    val session = state.getOrElse(Session(id = event.sessionId, events = List()))
    session.events = session.events :+ event
    (session, Some(session))
  })

The problem is that there is no reliable way of knowing the end of a
session, since events are likely to get lost. If I keep this process
running, the number of stored sessions will keep growing until it fills up
the disk.

Is there a recommended way of periodically evicting sessions that are too
old (e.g. a day old)?



Thanks,
Jack


Replays message in Kafka topics with FlinkKafkaConsumer09

2016-04-21 Thread Jack Huang
Hi all,

I am trying to force my job to reprocess old messages in my Kafka topics
but couldn't get it to work. Here is my FlinkKafkaConsumer09 setup:

val kafkaProp = new Properties()
kafkaProp.setProperty("bootstrap.servers", "localhost:6667")
kafkaProp.setProperty("auto.offset.reset", "earliest")

env.addSource(new FlinkKafkaConsumer09[String](input,
    new SimpleStringSchema, kafkaProp))
  .print

I thought *auto.offset.reset* would do the trick. What am I missing
here?


Thanks,

Jack Huang
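
One possible explanation, not confirmed anywhere in this thread: Kafka applies auto.offset.reset only when the consumer group has no committed offset (or the committed offset is out of range). If the group used by the job already has committed offsets, the consumer resumes from them. A minimal sketch of building properties with a fresh group.id follows; the "replay-" prefix and the helper name are illustrative, and a job restored from a Flink checkpoint or savepoint may still take its offsets from the restored state.

```scala
import java.util.{Properties, UUID}

object ReplayProps {
  // Builds consumer properties with a group id that has no committed
  // offsets yet, so that auto.offset.reset=earliest can take effect.
  def freshGroupProps(bootstrapServers: String): Properties = {
    val props = new Properties()
    props.setProperty("bootstrap.servers", bootstrapServers)
    props.setProperty("group.id", "replay-" + UUID.randomUUID().toString)
    props.setProperty("auto.offset.reset", "earliest")
    props
  }
}
```

The resulting Properties object would be passed to the consumer in place of kafkaProp.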


Re: Checkpoint and restore states

2016-04-21 Thread Jack Huang
@Stefano, Aljoscha:

Thank you for pointing that out. With the following steps I verified that
the state of the job gets restored

   1. Use HDFS as state backend with env.setStateBackend(new
   FsStateBackend("hdfs:///home/user/flink/KafkaWordCount"))
   2. Start the job. In my case the job ID is
   e4b5316ae4ea0c8ed6fab4fa238b4b2f
   3. Observe that
   hdfs:///home/user/flink/KafkaWordCount/e4b5316ae4ea0c8ed6fab4fa238b4b2f
   is created
   4. Kill all TaskManagers, but leave the JobManager running
   5. Restart all TaskManagers with bin/start-cluster.sh
   6. Observe that the JobManager automatically restarts the job under the
   same job ID
   7. Observe from the output that the state is restored


Jack



Jack Huang

On Thu, Apr 21, 2016 at 1:40 AM, Aljoscha Krettek <aljos...@apache.org>
wrote:

> Hi,
> yes Stefano is spot on! The state is only restored if a job is restarted
> because of abnormal failure. For state that survives stopping/canceling a
> job you can look at savepoints:
> https://ci.apache.org/projects/flink/flink-docs-master/apis/streaming/savepoints.html
>  This
> essentially uses the same mechanisms as the fault-tolerance stuff for state
> but makes it explicit and allows restarting from different savepoints.
>
> Cheers,
> Aljoscha
>
> On Wed, 20 Apr 2016 at 22:43 Stefano Baghino <
> stefano.bagh...@radicalbit.io> wrote:
>
>> Hello again,
>>
>> thanks for giving a shot at my advice anyway but Aljoscha is far more
>> knowledgeable than me regarding Flink. :)
>>
>> I hope I'm not getting mixed up again but I think gracefully canceling
>> your job means you lose your job state. Am I right in saying that the state
>> is preserved in case of abnormal termination (e.g.: the JobManager crashes)
>> or if you explicitly create a savepoint?
>>
>> On Wed, Apr 20, 2016 at 10:13 PM, Jack Huang <jackhu...@machinezone.com>
>> wrote:
>>
>>> @Aljoscha:
>>> For this word count example I am using a kafka topic as the input
>>> stream. The problem is that when I cancel the task and restart it, the task
>>> loses the accumulated word counts so far and start counting from 1 again.
>>> Am I missing something basic here?
>>>
>>> @Stefano:
>>> I also tried to implements the Checkpointed interface but had no luck
>>> either. Canceling and restarting the task did not restore the states. Here
>>> is my class:
>>>
>>> inStream.flatMap({ _.toLowerCase.split("\\W+") filter { _.nonEmpty } })
>>>>   .keyBy({s => s})
>>>>   .map(new StatefulCounter)
>>>
>>>
>>> class StatefulCounter extends RichMapFunction[String, (String,Int)] with
>>>> Checkpointed[Integer] {
>>>>   private var count: Integer = 0
>>>>
>>>>   def map(in: String): (String,Int) = {
>>>> count += 1
>>>> return (in, count)
>>>>   }
>>>>   def snapshotState(l: Long, l1: Long): Integer = {
>>>> count
>>>>   }
>>>>   def restoreState(state: Integer) {
>>>> count = state
>>>>   }
>>>> }
>>>
>>>
>>>
>>> Thanks,
>>>
>>>
>>> Jack Huang
>>>
>>> On Wed, Apr 20, 2016 at 5:36 AM, Stefano Baghino <
>>> stefano.bagh...@radicalbit.io> wrote:
>>>
>>>> My bad, thanks for pointing that out.
>>>>
>>>> On Wed, Apr 20, 2016 at 1:49 PM, Aljoscha Krettek <aljos...@apache.org>
>>>> wrote:
>>>>
>>>>> Hi,
>>>>> the *withState() family of functions use the Key/Value state interface
>>>>> internally, so that should work.
>>>>>
>>>>> On Wed, 20 Apr 2016 at 12:33 Stefano Baghino <
>>>>> stefano.bagh...@radicalbit.io> wrote:
>>>>>
>>>>>> Hi Jack,
>>>>>>
>>>>>> it seems you correctly enabled the checkpointing by calling
>>>>>> `env.enableCheckpointing`. However, your UDFs have to either implement 
>>>>>> the
>>>>>> Checkpointed interface or use the Key/Value State interface to make sure
>>>>>> the state of the computation is snapshotted.
>>>>>>
>>>>>> The documentation explains how to define your functions so that they
>>>>>> checkpoint the state far better than I could in this post:
>>>>>> https://ci.apache.org/projects/flink/flink-docs-release-1.0/apis/streaming/state.html
>>>>>>
>>&g

Re: Checkpoint and restore states

2016-04-20 Thread Jack Huang
@Aljoscha:
For this word count example I am using a Kafka topic as the input stream.
The problem is that when I cancel the task and restart it, the task loses
the word counts accumulated so far and starts counting from 1 again. Am I
missing something basic here?

@Stefano:
I also tried implementing the Checkpointed interface but had no luck
either. Canceling and restarting the task did not restore the state. Here
is my class:

inStream.flatMap({ _.toLowerCase.split("\\W+") filter { _.nonEmpty } })
  .keyBy({s => s})
  .map(new StatefulCounter)


class StatefulCounter extends RichMapFunction[String, (String, Int)]
    with Checkpointed[Integer] {
  private var count: Integer = 0

  def map(in: String): (String, Int) = {
    count += 1
    return (in, count)
  }
  def snapshotState(l: Long, l1: Long): Integer = {
    count
  }
  def restoreState(state: Integer) {
    count = state
  }
}



Thanks,


Jack Huang

On Wed, Apr 20, 2016 at 5:36 AM, Stefano Baghino <
stefano.bagh...@radicalbit.io> wrote:

> My bad, thanks for pointing that out.
>
> On Wed, Apr 20, 2016 at 1:49 PM, Aljoscha Krettek <aljos...@apache.org>
> wrote:
>
>> Hi,
>> the *withState() family of functions use the Key/Value state interface
>> internally, so that should work.
>>
>> On Wed, 20 Apr 2016 at 12:33 Stefano Baghino <
>> stefano.bagh...@radicalbit.io> wrote:
>>
>>> Hi Jack,
>>>
>>> it seems you correctly enabled the checkpointing by calling
>>> `env.enableCheckpointing`. However, your UDFs have to either implement the
>>> Checkpointed interface or use the Key/Value State interface to make sure
>>> the state of the computation is snapshotted.
>>>
>>> The documentation explains how to define your functions so that they
>>> checkpoint the state far better than I could in this post:
>>> https://ci.apache.org/projects/flink/flink-docs-release-1.0/apis/streaming/state.html
>>>
>>> I hope I've been of some help, I'll gladly help you further if you need
>>> it.
>>>
>>> On Wed, Apr 20, 2016 at 11:34 AM, Aljoscha Krettek <aljos...@apache.org>
>>> wrote:
>>>
>>>> Hi,
>>>> what seems to be the problem?
>>>>
>>>> Cheers,
>>>> Aljoscha
>>>>
>>>> On Wed, 20 Apr 2016 at 03:52 Jack Huang <jackhu...@machinezone.com>
>>>> wrote:
>>>>
>>>>> Hi all,
>>>>>
>>>>> I am doing a simple word count example and want to checkpoint the
>>>>> accumulated word counts. I am not having any luck getting the counts saved
>>>>> and restored. Can someone help?
>>>>>
>>>>> env.enableCheckpointing(1000)
>>>>>
>>>>> env.setStateBackend(new MemoryStateBackend())
>>>>>
>>>>>
>>>>>>  ...
>>>>>
>>>>>
>>>>>
>>>>> inStream
>>>>>> .keyBy({s => s})
>>>>>>
>>>>>>
>>>>>>
>>>>>> *.mapWithState((in:String, count:Option[Int]) => {val
>>>>>> newCount = count.getOrElse(0) + 1((in, newCount), Some(newCount))
>>>>>>   })*
>>>>>> .print()
>>>>>
>>>>>
>>>>>
>>>>> Thanks,
>>>>>
>>>>> Jack Huang
>>>>>
>>>>
>>>
>>>
>>> --
>>> BR,
>>> Stefano Baghino
>>>
>>> Software Engineer @ Radicalbit
>>>
>>
>
>
> --
> BR,
> Stefano Baghino
>
> Software Engineer @ Radicalbit
>


Checkpoint and restore states

2016-04-19 Thread Jack Huang
Hi all,

I am doing a simple word count example and want to checkpoint the
accumulated word counts. I am not having any luck getting the counts saved
and restored. Can someone help?

env.enableCheckpointing(1000)
env.setStateBackend(new MemoryStateBackend())

...

inStream
  .keyBy({s => s})
  .mapWithState((in: String, count: Option[Int]) => {
    val newCount = count.getOrElse(0) + 1
    ((in, newCount), Some(newCount))
  })
  .print()



Thanks,

Jack Huang


Re: Immutable data

2015-09-23 Thread Jack
Hi Stephan!

Here's the trace (flink 0.9.1 + scala 2.10.5)

Exception in thread "main" 
org.apache.flink.runtime.client.JobExecutionException: Job execution failed.
at 
org.apache.flink.runtime.jobmanager.JobManager$$anonfun$receiveWithLogMessages$1.applyOrElse(JobManager.scala:314)
at 
scala.runtime.AbstractPartialFunction$mcVL$sp.apply$mcVL$sp(AbstractPartialFunction.scala:33)
at 
scala.runtime.AbstractPartialFunction$mcVL$sp.apply(AbstractPartialFunction.scala:33)
at 
scala.runtime.AbstractPartialFunction$mcVL$sp.apply(AbstractPartialFunction.scala:25)
at 
org.apache.flink.runtime.ActorLogMessages$$anon$1.apply(ActorLogMessages.scala:36)
at 
org.apache.flink.runtime.ActorLogMessages$$anon$1.apply(ActorLogMessages.scala:29)
at scala.PartialFunction$class.applyOrElse(PartialFunction.scala:118)
at 
org.apache.flink.runtime.ActorLogMessages$$anon$1.applyOrElse(ActorLogMessages.scala:29)
at akka.actor.Actor$class.aroundReceive(Actor.scala:465)
at 
org.apache.flink.runtime.jobmanager.JobManager.aroundReceive(JobManager.scala:92)
at akka.actor.ActorCell.receiveMessage(ActorCell.scala:516)
at akka.actor.ActorCell.invoke(ActorCell.scala:487)
at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:254)
at akka.dispatch.Mailbox.run(Mailbox.scala:221)
at akka.dispatch.Mailbox.exec(Mailbox.scala:231)
at scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
at 
scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
at scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
at 
scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)
Caused by: java.lang.RuntimeException: Cannot instantiate StreamRecord.
at 
org.apache.flink.streaming.runtime.streamrecord.StreamRecordSerializer.createInstance(StreamRecordSerializer.java:63)
at 
org.apache.flink.streaming.runtime.tasks.OneInputStreamTask.readNext(OneInputStreamTask.java:66)
at 
org.apache.flink.streaming.runtime.tasks.OneInputStreamTask.invoke(OneInputStreamTask.java:101)
at org.apache.flink.runtime.taskmanager.Task.run(Task.java:559)
at java.lang.Thread.run(Thread.java:745)
Caused by: java.lang.RuntimeException: Cannot instantiate class.
at 
org.apache.flink.api.java.typeutils.runtime.PojoSerializer.createInstance(PojoSerializer.java:225)
at 
org.apache.flink.streaming.runtime.streamrecord.StreamRecordSerializer.createInstance(StreamRecordSerializer.java:60)
... 4 more
Caused by: java.lang.IllegalArgumentException: Can not set int field 
org.myorg.quickstart.Test$X.id to org.myorg.quickstart.Test$Id
at 
sun.reflect.UnsafeFieldAccessorImpl.throwSetIllegalArgumentException(UnsafeFieldAccessorImpl.java:167)
at 
sun.reflect.UnsafeFieldAccessorImpl.throwSetIllegalArgumentException(UnsafeFieldAccessorImpl.java:171)
at 
sun.reflect.UnsafeIntegerFieldAccessorImpl.set(UnsafeIntegerFieldAccessorImpl.java:98)
at java.lang.reflect.Field.set(Field.java:764)
at 
org.apache.flink.api.java.typeutils.runtime.PojoSerializer.initializeFields(PojoSerializer.java:232)
at 
org.apache.flink.api.java.typeutils.runtime.PojoSerializer.createInstance(PojoSerializer.java:221)
... 5 more

> On 23 Sep 2015, at 14:37, Stephan Ewen <se...@apache.org> wrote:
> 
> Hi Jack!
> 
> This should be supported, there is no strict requirement for mutable types.
> 
> The POJO rules apply only if you want to use the "by-field-name" addressing 
> for keys. In Scala, you should be able to use case classes as well, even if 
> they are immutable.
> 
> Can you post the exception that you get?
> 
> Greetings,
> Stephan
> 
> 
>> On Wed, Sep 23, 2015 at 1:29 PM, Jack <jack-kn...@marmelandia.com> wrote:
>> Hi,
>> 
>> I'm having trouble integrating existing Scala code with Flink, due to 
>> POJO-only requirement.
>> 
>> We're using AnyVal heavily for type safety, and immutable classes as a 
>> default. For example, the following does not work:
>> 
>> object Test {
>>   class Id(val underlying: Int) extends AnyVal
>> 
>>   class X(var id: Id) {
>> def this() { this(new Id(0)) }
>>   }
>> 
>>   class MySource extends SourceFunction[X] {
>> def run(ctx: SourceFunction.SourceContext[X]) {
>>   ctx.collect(new X(new Id(1)))
>> }
>> def cancel() {}
>>   }
>> 
>>   def main(args: Array[String]) {
>> val env = StreamExecutionContext.getExecutionContext
>> env.addSource(new MySource).print
>> env.execute("Test")
>>   }
>> }
>> 
>> Currently I'm thinking that I would need to have duplicate classes and code 
>> for Flink and for non-Flink code, or somehow use immutable interfaces for 
>> non-Flink code. Both ways are expensive in terms of development time.
>> 
>> Would you have any guidance on how to integrate Flink with a code base that 
>> has immutability as a norm?
>> 
>> Thanks
> 


Re: Immutable data

2015-09-23 Thread Jack
Thanks Aljoscha, that works!

I tried passing values to the base class constructor. Modifying the previous 
example slightly:

class Base(var a: Int)
class X(b: Int) extends Base(b) {
  def this() { this(0) }
}

The code runs (even without Kryo), but the compiler complains about b being 
immutable. 

> On 23 Sep 2015, at 15:02, Aljoscha Krettek <aljos...@apache.org> wrote:
> 
> Hi Jack,
> Stephan is right, this should work. Unfortunately the TypeAnalyzer does not 
> correctly detect that it cannot treat your Id class as a Pojo. I will add a 
> Jira issue for that. For the time being you can use this command to force the 
> system to use Kryo:
> 
> env.getConfig.enableForceKryo();
> 
> I hope this helps.
> 
> Regards,
> Aljoscha
> 
> 
>> On Wed, 23 Sep 2015 at 13:37 Stephan Ewen <se...@apache.org> wrote:
>> Hi Jack!
>> 
>> This should be supported, there is no strict requirement for mutable types.
>> 
>> The POJO rules apply only if you want to use the "by-field-name" addressing 
>> for keys. In Scala, you should be able to use case classes as well, even if 
>> they are immutable.
>> 
>> Can you post the exception that you get?
>> 
>> Greetings,
>> Stephan
>> 
>> 
>>> On Wed, Sep 23, 2015 at 1:29 PM, Jack <jack-kn...@marmelandia.com> wrote:
>>> Hi,
>>> 
>>> I'm having trouble integrating existing Scala code with Flink, due to 
>>> POJO-only requirement.
>>> 
>>> We're using AnyVal heavily for type safety, and immutable classes as a 
>>> default. For example, the following does not work:
>>> 
>>> object Test {
>>>   class Id(val underlying: Int) extends AnyVal
>>> 
>>>   class X(var id: Id) {
>>> def this() { this(new Id(0)) }
>>>   }
>>> 
>>>   class MySource extends SourceFunction[X] {
>>> def run(ctx: SourceFunction.SourceContext[X]) {
>>>   ctx.collect(new X(new Id(1)))
>>> }
>>> def cancel() {}
>>>   }
>>> 
>>>   def main(args: Array[String]) {
>>> val env = StreamExecutionContext.getExecutionContext
>>> env.addSource(new MySource).print
>>> env.execute("Test")
>>>   }
>>> }
>>> 
>>> Currently I'm thinking that I would need to have duplicate classes and code 
>>> for Flink and for non-Flink code, or somehow use immutable interfaces for 
>>> non-Flink code. Both ways are expensive in terms of development time.
>>> 
>>> Would you have any guidance on how to integrate Flink with a code base that 
>>> has immutability as a norm?
>>> 
>>> Thanks
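
The "Can not set int field ... X.id to ... Id" error from the trace in this thread can be reproduced without Flink. This is a sketch of the likely mechanism, using the Id and X classes from the original post: a value class used as a field is stored as its underlying type, so the backing field of X.id is a plain int, and a reflective write of a boxed Id instance is rejected. The object and helper names are illustrative, and the reflection here only imitates what a field-by-field serializer might do.

```scala
object ValueClassFieldSketch {
  class Id(val underlying: Int) extends AnyVal

  class X(var id: Id) {
    def this() = this(new Id(0))
  }

  // JVM type of the field that backs X.id.
  def fieldType: Class[_] = classOf[X].getDeclaredField("id").getType

  // Tries to write a boxed Id into the field via reflection and
  // returns the error message if the JVM rejects it.
  def setWithBoxedId(): Either[String, Unit] = {
    val field = classOf[X].getDeclaredField("id")
    field.setAccessible(true)
    val boxed: Any = new Id(1) // typed as Any, so the value class is boxed
    try {
      field.set(new X(), boxed)
      Right(())
    } catch {
      case e: IllegalArgumentException => Left(e.getMessage)
    }
  }
}
```

Forcing a serializer that does not set fields one by one through reflection, as with the enableForceKryo call suggested above, avoids this path.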


Immutable data

2015-09-23 Thread Jack
Hi,

I'm having trouble integrating existing Scala code with Flink, due to the
POJO-only requirement.

We're using AnyVal heavily for type safety, and immutable classes as a default.
For example, the following does not work:

object Test {
  class Id(val underlying: Int) extends AnyVal

  class X(var id: Id) {
    def this() { this(new Id(0)) }
  }

  class MySource extends SourceFunction[X] {
    def run(ctx: SourceFunction.SourceContext[X]) {
      ctx.collect(new X(new Id(1)))
    }
    def cancel() {}
  }

  def main(args: Array[String]) {
    val env = StreamExecutionEnvironment.getExecutionEnvironment
    env.addSource(new MySource).print
    env.execute("Test")
  }
}

Currently I'm thinking that I would need to have duplicate classes and code for
Flink and for non-Flink code, or somehow use immutable interfaces for non-Flink
code. Both ways are expensive in terms of development time.

Would you have any guidance on how to integrate Flink with a code base that has 
immutability as a norm?

Thanks