Re: How does flink know which data is modified in dynamic table?

2018-08-20 Thread 徐涛
Hi Hequn,
Another question: in some cases I think updating the timestamp of the
retract row is reasonable. For example, some users do not want a hard
delete but a soft delete, so I write code so that when the retract row comes I
only do the soft delete, but I want the update_timestamp to be different so the
ETL program can know that this row has changed.


For example, if the value is updated from 1 to 2, 
previous row:  add (a, 1, 2018-08-20 20:18:10.286)
retract row: delete (a, 1, 2018-08-20 20:18:10.386)
new row: add (a, 2, 2018-08-20 20:18:10.486)
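
One way to wire that up, as a rough Java sketch (the registered table name, the row
layout and the SQL statements in the comments are assumptions, not the actual job):
convert the result table to a retract stream and branch on the add/retract flag,
turning the retraction into a soft delete that stamps its own update time.

    import org.apache.flink.api.java.tuple.Tuple2;
    import org.apache.flink.streaming.api.datastream.DataStream;
    import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
    import org.apache.flink.streaming.api.functions.sink.RichSinkFunction;
    import org.apache.flink.table.api.Table;
    import org.apache.flink.table.api.TableEnvironment;
    import org.apache.flink.table.api.java.StreamTableEnvironment;
    import org.apache.flink.types.Row;

    public class SoftDeleteRetractJob {

        public static void main(String[] args) throws Exception {
            StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
            StreamTableEnvironment tEnv = TableEnvironment.getTableEnvironment(env);

            // Assumes the aggregated table was registered as "finalTable", as in the thread.
            Table praiseAggr = tEnv.scan("finalTable");

            // Each element is (flag, row): true = add, false = retract.
            DataStream<Tuple2<Boolean, Row>> retractStream =
                    tEnv.toRetractStream(praiseAggr, Row.class);

            retractStream.addSink(new RichSinkFunction<Tuple2<Boolean, Row>>() {
                @Override
                public void invoke(Tuple2<Boolean, Row> value) {
                    Object articleId = value.f1.getField(0);  // article_id assumed at position 0
                    if (value.f0) {
                        // add/update: upsert the new row and stamp our own update time,
                        // e.g. INSERT ... ON DUPLICATE KEY UPDATE pu = ?, update_timestamp = NOW()
                    } else {
                        // retraction: soft delete instead of a hard DELETE,
                        // e.g. UPDATE sink_table SET deleted = 1, update_timestamp = NOW() WHERE article_id = ?
                    }
                }
            });

            env.execute("soft-delete retract sink (sketch)");
        }
    }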

> On Aug 21, 2018, at 12:25 PM, Hequn Cheng wrote:
> 
> Hi Henry,
> 
> You are right that, in MySQL, SYSDATE returns the time at which it executes 
> while LOCALTIMESTAMP returns a constant time that indicates the time at which 
> the statement began to execute.
> But other database system seems don't have this constraint(correct me if I'm 
> wrong). Sometimes we don't have to follow MySQL.
> 
> Best, Hequn
> 
> On Tue, Aug 21, 2018 at 10:21 AM, 徐涛 wrote:
> Hi Hequn,
>   Maybe I do not express clearly. I mean if only the update_timestamp of 
> the increment data is updated, it is not enough. Because from the sql, it 
> express the idea “all the time in the table is the same”, but actually each 
> item in the table may be different. It is a bit weird.
> 
> Best, Henry
> 
> 
> 
>> On Aug 21, 2018, at 10:09 AM, Hequn Cheng wrote:
>> 
>> Hi Henry,
>> 
>> If you upsert by key 'article_id', the result is correct, i.e, the result is 
>> (a, 2, 2018-08-20 20:18:10.486). What do you think?
>> 
>> Best, Hequn
>> 
>> 
>> 
>> On Tue, Aug 21, 2018 at 9:44 AM, 徐涛 wrote:
>> Hi Hequn,
>>  However is it semantically correct? because the sql result is not equal 
>> to the bounded table.
>>  
>> 
>>> On Aug 20, 2018, at 8:34 PM, Hequn Cheng wrote:
>>> 
>>> Hi Henry,
>>> 
>>> Both sql output incrementally. 
>>> 
>>> However there are some problems if you use retract sink. You have to pay 
>>> attention to the timestamp field since each time the value is different.  
>>> For example, if the value is updated from 1 to 2, 
>>> previous row:  add (a, 1, 2018-08-20 20:18:10.286)
>>> retract row: delete (a, 1, 2018-08-20 20:18:10.386)
>>> new row: add (a, 2, 2018-08-20 20:18:10.486)
>>> The retract row is different from the previous row because of the time 
>>> field.
>>> 
>>> Of course, this problem should be fixed later.
>>> 
>>> Best, Hequn
>>> 
>>> On Mon, Aug 20, 2018 at 6:43 PM, 徐涛 wrote:
>>> Hi All,
>>> Like the following code,If I use retract stream, I think Flink is able 
>>> to know which item is modified( if praise has 1 items now, when one 
>>> item comes to the stream, only very small amount of data is write to sink) 
>>> var praiseAggr = tableEnv.sqlQuery(s"SELECT article_id,hll(uid) as PU 
>>> FROM praise group by article_id” )
>>> tableEnv.registerTable("finalTable", praiseAggr)
>>> tableEnv.sqlUpdate(s"insert into sinkTableName SELECT * from 
>>> finalTable")
>>> 
>>> But if I use the following sql, by adding a dynamic timestamp field:
>>> var praiseAggr = tableEnv.sqlQuery(s"SELECT article_id,hll(uid) 
>>> as PU,LOCALTIMESTAMP as update_timestamp FROM praise group by article_id” )
>>>   Is the whole table flush to the sink? Or only the incremental value 
>>> will flush to the sink? Why?
>>> 
>>> Thanks,
>>> Henry
>>> 
>>> 
>> 
>> 
> 
> 



Re: Flink checkpointing to Google Cloud Storage

2018-08-20 Thread Oleksandr Serdiukov
Hi Vino,

I don't think this is a lack of dependencies.
If you look at the last frame before the NoClassDefFoundError you'll see that the
class being loaded actually is GoogleCloudStorageImpl and the missing class is
GoogleCloudStorageImpl$6. I can see both classes in the shaded jar, so it seems like
a classloader issue. But I am still lost as to what I can try next.

Best regards,
Oleksandr


> On Aug 21, 2018, at 5:00 AM, vino yang  wrote:
> 
> Hi Oleksandr,
> 
> From the exception log, you seem to lack the relevant dependencies? 
> You can check again which dependency the related class belongs to.
> 
> Thanks, vino.
> 
> Oleksandr Serdiukov <d...@serdukoff.me> wrote on Tue, Aug 21, 2018 at 12:04 AM:
> Hello All!
> 
> I am trying to configure checkpoints for flink jobs in GCS. 
> Now I am able to write checkpoints but cannot restore from it:
> 
> java.lang.NoClassDefFoundError: 
> com/google/cloud/hadoop/gcsio/GoogleCloudStorageImpl$6
>   at 
> com.google.cloud.hadoop.gcsio.GoogleCloudStorageImpl.open(GoogleCloudStorageImpl.java:666)
>   at 
> com.google.cloud.hadoop.gcsio.GoogleCloudStorageFileSystem.open(GoogleCloudStorageFileSystem.java:323)
>   at 
> com.google.cloud.hadoop.fs.gcs.GoogleHadoopFSInputStream.<init>(GoogleHadoopFSInputStream.java:136)
>   at 
> com.google.cloud.hadoop.fs.gcs.GoogleHadoopFileSystemBase.open(GoogleHadoopFileSystemBase.java:1102)
>   at org.apache.hadoop.fs.FileSystem.open(FileSystem.java:787)
>   at 
> org.apache.flink.runtime.fs.hdfs.HadoopFileSystem.open(HadoopFileSystem.java:119)
>   at 
> org.apache.flink.runtime.fs.hdfs.HadoopFileSystem.open(HadoopFileSystem.java:36)
>   at 
> org.apache.flink.core.fs.SafetyNetWrapperFileSystem.open(SafetyNetWrapperFileSystem.java:80)
>   at 
> org.apache.flink.runtime.state.filesystem.FileStateHandle.openInputStream(FileStateHandle.java:68)
>   at 
> org.apache.flink.contrib.streaming.state.RocksDBKeyedStateBackend$RocksDBIncrementalRestoreOperation.copyStateDataHandleData(RocksDBKeyedStateBackend.java:1005)
>   at 
> org.apache.flink.contrib.streaming.state.RocksDBKeyedStateBackend$RocksDBIncrementalRestoreOperation.transferAllDataFromStateHandles(RocksDBKeyedStateBackend.java:988)
>   at 
> org.apache.flink.contrib.streaming.state.RocksDBKeyedStateBackend$RocksDBIncrementalRestoreOperation.transferAllStateDataToDirectory(RocksDBKeyedStateBackend.java:974)
>   at 
> org.apache.flink.contrib.streaming.state.RocksDBKeyedStateBackend$RocksDBIncrementalRestoreOperation.restoreInstance(RocksDBKeyedStateBackend.java:758)
>   at 
> org.apache.flink.contrib.streaming.state.RocksDBKeyedStateBackend$RocksDBIncrementalRestoreOperation.restore(RocksDBKeyedStateBackend.java:732)
>   at 
> org.apache.flink.contrib.streaming.state.RocksDBKeyedStateBackend.restore(RocksDBKeyedStateBackend.java:443)
>   at 
> org.apache.flink.contrib.streaming.state.RocksDBKeyedStateBackend.restore(RocksDBKeyedStateBackend.java:149)
>   at 
> org.apache.flink.streaming.api.operators.BackendRestorerProcedure.attemptCreateAndRestore(BackendRestorerProcedure.java:151)
>   at 
> org.apache.flink.streaming.api.operators.BackendRestorerProcedure.createAndRestore(BackendRestorerProcedure.java:123)
>   at 
> org.apache.flink.streaming.api.operators.StreamTaskStateInitializerImpl.keyedStatedBackend(StreamTaskStateInitializerImpl.java:276)
>   at 
> org.apache.flink.streaming.api.operators.StreamTaskStateInitializerImpl.streamOperatorStateContext(StreamTaskStateInitializerImpl.java:132)
>   at 
> org.apache.flink.streaming.api.operators.AbstractStreamOperator.initializeState(AbstractStreamOperator.java:227)
>   at 
> org.apache.flink.streaming.runtime.tasks.StreamTask.initializeState(StreamTask.java:730)
>   at 
> org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:295)
>   at org.apache.flink.runtime.taskmanager.Task.run(Task.java:703)
>   at java.lang.Thread.run(Thread.java:748)
> 
> My current setup:
> 
> <dependency>
>     <groupId>com.google.cloud.bigdataoss</groupId>
>     <artifactId>gcs-connector</artifactId>
>     <version>hadoop2-1.9.5</version>
> </dependency>
> 
> Flink image: flink:1.5.2-hadoop28
> 
> Thank you in advance!



Re: How does flink know which data is modified in dynamic table?

2018-08-20 Thread Hequn Cheng
Hi Henry,

You are right that, in MySQL, SYSDATE returns the time at which it executes
while LOCALTIMESTAMP returns a constant time that indicates the time at
which the statement began to execute.
But other database systems don't seem to have this constraint (correct me if
I'm wrong). Sometimes we don't have to follow MySQL.

Best, Hequn

On Tue, Aug 21, 2018 at 10:21 AM, 徐涛  wrote:

> Hi Hequn,
> Maybe I do not express clearly. I mean if only the update_timestamp of the
> increment data is updated, it is not enough. Because from the sql, it
> express the idea “all the time in the table is the same”, but actually each
> item in the table may be different. It is a bit weird.
>
> Best, Henry
>
>
>
> On Aug 21, 2018, at 10:09 AM, Hequn Cheng wrote:
>
> Hi Henry,
>
> If you upsert by key 'article_id', the result is correct, i.e, the result
> is (a, 2, 2018-08-20 20:18:10.486). What do you think?
>
> Best, Hequn
>
>
>
> On Tue, Aug 21, 2018 at 9:44 AM, 徐涛  wrote:
>
>> Hi Hequn,
>> However is it semantically correct? because the sql result is not equal
>> to the bounded table.
>>
>>
>> On Aug 20, 2018, at 8:34 PM, Hequn Cheng wrote:
>>
>> Hi Henry,
>>
>> Both sql output incrementally.
>>
>> However there are some problems if you use retract sink. You have to pay
>> attention to the timestamp field since each time the value is different.
>> For example, if the value is updated from 1 to 2,
>>
>> previous row:  add (a, 1, 2018-08-20 20:18:10.286)
>> retract row: delete (a, 1, 2018-08-20 20:18:10.386)
>> new row: add (a, 2, 2018-08-20 20:18:10.486)
>>
>> The retract row is different from the previous row because of the time
>> field.
>>
>> Of course, this problem should be fixed later.
>>
>> Best, Hequn
>>
>> On Mon, Aug 20, 2018 at 6:43 PM, 徐涛  wrote:
>>
>>> Hi All,
>>> Like the following code,If I use retract stream, I think Flink is able
>>> to know which item is modified( if praise has 1 items now, when one
>>> item comes to the stream, only very small amount of data is write to sink)
>>>
>>> var praiseAggr = tableEnv.sqlQuery(*s"SELECT article_id,hll(uid) as PU 
>>> FROM praise group by article_id**”* )
>>>
>>> tableEnv.registerTable("finalTable", praiseAggr)
>>>
>>> tableEnv.sqlUpdate(s"insert into sinkTableName SELECT * from 
>>> finalTable")
>>>
>>>
>>> But if I use the following sql, by adding a dynamic timestamp
>>> field:
>>> var praiseAggr = tableEnv.sqlQuery(s"SELECT article_id,hll(uid) as 
>>> PU,LOCALTIMESTAMP
>>> as update_timestamp* FROM praise group by article_id**”* )
>>>   Is the whole table flush to the sink? Or only the incremental
>>> value will flush to the sink? Why?
>>>
>>> Thanks,
>>> Henry
>>>
>>>
>>
>>
>
>


Re: How does flink know which data is modified in dynamic table?

2018-08-20 Thread 徐涛
Hi Hequn,
Maybe I did not express this clearly. I mean that if only the update_timestamp of
the incremental data is updated, it is not enough, because the SQL expresses the
idea that "all the timestamps in the table are the same", while actually each
item in the table may be different. It is a bit weird.

Best, Henry



> On Aug 21, 2018, at 10:09 AM, Hequn Cheng wrote:
> 
> Hi Henry,
> 
> If you upsert by key 'article_id', the result is correct, i.e, the result is 
> (a, 2, 2018-08-20 20:18:10.486). What do you think?
> 
> Best, Hequn
> 
> 
> 
> On Tue, Aug 21, 2018 at 9:44 AM, 徐涛 wrote:
> Hi Hequn,
>   However is it semantically correct? because the sql result is not equal 
> to the bounded table.
>   
> 
>> On Aug 20, 2018, at 8:34 PM, Hequn Cheng wrote:
>> 
>> Hi Henry,
>> 
>> Both sql output incrementally. 
>> 
>> However there are some problems if you use retract sink. You have to pay 
>> attention to the timestamp field since each time the value is different.  
>> For example, if the value is updated from 1 to 2, 
>> previous row:  add (a, 1, 2018-08-20 20:18:10.286)
>> retract row: delete (a, 1, 2018-08-20 20:18:10.386)
>> new row: add (a, 2, 2018-08-20 20:18:10.486)
>> The retract row is different from the previous row because of the time field.
>> 
>> Of course, this problem should be fixed later.
>> 
>> Best, Hequn
>> 
>> On Mon, Aug 20, 2018 at 6:43 PM, 徐涛 wrote:
>> Hi All,
>>  Like the following code,If I use retract stream, I think Flink is able 
>> to know which item is modified( if praise has 1 items now, when one item 
>> comes to the stream, only very small amount of data is write to sink) 
>>  var praiseAggr = tableEnv.sqlQuery(s"SELECT article_id,hll(uid) as PU 
>> FROM praise group by article_id” )
>> tableEnv.registerTable("finalTable", praiseAggr)
>>  tableEnv.sqlUpdate(s"insert into sinkTableName SELECT * from 
>> finalTable")
>> 
>> But if I use the following sql, by adding a dynamic timestamp field:
>>  var praiseAggr = tableEnv.sqlQuery(s"SELECT article_id,hll(uid) 
>> as PU,LOCALTIMESTAMP as update_timestamp FROM praise group by article_id” )
>>   Is the whole table flush to the sink? Or only the incremental value 
>> will flush to the sink? Why?
>> 
>> Thanks,
>> Henry
>> 
>> 
> 
> 



Re: How does flink know which data is modified in dynamic table?

2018-08-20 Thread Hequn Cheng
Hi Henry,

If you upsert by key 'article_id', the result is correct, i.e., the result
is (a, 2, 2018-08-20 20:18:10.486). What do you think?
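
A rough Java sketch of that idea (the class name and field position are assumptions):
key the retract stream by article_id and let a key-aware sink overwrite the previous
value in place, so only (a, 2, 2018-08-20 20:18:10.486) remains for key a.

    import org.apache.flink.api.java.functions.KeySelector;
    import org.apache.flink.api.java.tuple.Tuple2;
    import org.apache.flink.streaming.api.datastream.DataStream;
    import org.apache.flink.streaming.api.datastream.KeyedStream;
    import org.apache.flink.types.Row;

    public class UpsertByArticleId {

        /** Keys the (flag, row) retract stream by article_id so an upsert sink can overwrite per key. */
        public static KeyedStream<Tuple2<Boolean, Row>, String> keyByArticleId(
                DataStream<Tuple2<Boolean, Row>> retractStream) {
            return retractStream
                    // drop retraction messages; for upsert semantics only the newest value per key matters
                    .filter(t -> t.f0)
                    .keyBy(new KeySelector<Tuple2<Boolean, Row>, String>() {
                        @Override
                        public String getKey(Tuple2<Boolean, Row> t) {
                            // article_id is assumed to be the first field of the result row
                            return String.valueOf(t.f1.getField(0));
                        }
                    });
        }
    }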

Best, Hequn



On Tue, Aug 21, 2018 at 9:44 AM, 徐涛  wrote:

> Hi Hequn,
> However is it semantically correct? because the sql result is not equal to
> the bounded table.
>
>
> On Aug 20, 2018, at 8:34 PM, Hequn Cheng wrote:
>
> Hi Henry,
>
> Both sql output incrementally.
>
> However there are some problems if you use retract sink. You have to pay
> attention to the timestamp field since each time the value is different.
> For example, if the value is updated from 1 to 2,
>
> previous row:  add (a, 1, 2018-08-20 20:18:10.286)
> retract row: delete (a, 1, 2018-08-20 20:18:10.386)
> new row: add (a, 2, 2018-08-20 20:18:10.486)
>
> The retract row is different from the previous row because of the time
> field.
>
> Of course, this problem should be fixed later.
>
> Best, Hequn
>
> On Mon, Aug 20, 2018 at 6:43 PM, 徐涛  wrote:
>
>> Hi All,
>> Like the following code,If I use retract stream, I think Flink is able to
>> know which item is modified( if praise has 1 items now, when one item
>> comes to the stream, only very small amount of data is write to sink)
>>
>>  var praiseAggr = tableEnv.sqlQuery(*s"SELECT article_id,hll(uid) as PU 
>> FROM praise group by article_id**”* )
>>
>> tableEnv.registerTable("finalTable", praiseAggr)
>>
>>  tableEnv.sqlUpdate(s"insert into sinkTableName SELECT * from 
>> finalTable")
>>
>>
>> But if I use the following sql, by adding a dynamic timestamp
>> field:
>> var praiseAggr = tableEnv.sqlQuery(s"SELECT article_id,hll(uid) as 
>> PU,LOCALTIMESTAMP
>> as update_timestamp* FROM praise group by article_id**”* )
>>   Is the whole table flush to the sink? Or only the incremental
>> value will flush to the sink? Why?
>>
>> Thanks,
>> Henry
>>
>>
>
>


Re: Flink checkpointing to Google Cloud Storage

2018-08-20 Thread vino yang
Hi Oleksandr,

From the exception log, it looks like you may be missing the relevant dependencies.
You can check again which dependency the related class belongs to.

Thanks, vino.

Oleksandr Serdiukov wrote on Tue, Aug 21, 2018 at 12:04 AM:

> Hello All!
>
> I am trying to configure checkpoints for flink jobs in GCS.
> Now I am able to write checkpoints but cannot restore from it:
>
> java.lang.NoClassDefFoundError: 
> com/google/cloud/hadoop/gcsio/GoogleCloudStorageImpl$6
>   at 
> com.google.cloud.hadoop.gcsio.GoogleCloudStorageImpl.open(GoogleCloudStorageImpl.java:666)
>   at 
> com.google.cloud.hadoop.gcsio.GoogleCloudStorageFileSystem.open(GoogleCloudStorageFileSystem.java:323)
>   at 
> com.google.cloud.hadoop.fs.gcs.GoogleHadoopFSInputStream.<init>(GoogleHadoopFSInputStream.java:136)
>   at 
> com.google.cloud.hadoop.fs.gcs.GoogleHadoopFileSystemBase.open(GoogleHadoopFileSystemBase.java:1102)
>   at org.apache.hadoop.fs.FileSystem.open(FileSystem.java:787)
>   at 
> org.apache.flink.runtime.fs.hdfs.HadoopFileSystem.open(HadoopFileSystem.java:119)
>   at 
> org.apache.flink.runtime.fs.hdfs.HadoopFileSystem.open(HadoopFileSystem.java:36)
>   at 
> org.apache.flink.core.fs.SafetyNetWrapperFileSystem.open(SafetyNetWrapperFileSystem.java:80)
>   at 
> org.apache.flink.runtime.state.filesystem.FileStateHandle.openInputStream(FileStateHandle.java:68)
>   at 
> org.apache.flink.contrib.streaming.state.RocksDBKeyedStateBackend$RocksDBIncrementalRestoreOperation.copyStateDataHandleData(RocksDBKeyedStateBackend.java:1005)
>   at 
> org.apache.flink.contrib.streaming.state.RocksDBKeyedStateBackend$RocksDBIncrementalRestoreOperation.transferAllDataFromStateHandles(RocksDBKeyedStateBackend.java:988)
>   at 
> org.apache.flink.contrib.streaming.state.RocksDBKeyedStateBackend$RocksDBIncrementalRestoreOperation.transferAllStateDataToDirectory(RocksDBKeyedStateBackend.java:974)
>   at 
> org.apache.flink.contrib.streaming.state.RocksDBKeyedStateBackend$RocksDBIncrementalRestoreOperation.restoreInstance(RocksDBKeyedStateBackend.java:758)
>   at 
> org.apache.flink.contrib.streaming.state.RocksDBKeyedStateBackend$RocksDBIncrementalRestoreOperation.restore(RocksDBKeyedStateBackend.java:732)
>   at 
> org.apache.flink.contrib.streaming.state.RocksDBKeyedStateBackend.restore(RocksDBKeyedStateBackend.java:443)
>   at 
> org.apache.flink.contrib.streaming.state.RocksDBKeyedStateBackend.restore(RocksDBKeyedStateBackend.java:149)
>   at 
> org.apache.flink.streaming.api.operators.BackendRestorerProcedure.attemptCreateAndRestore(BackendRestorerProcedure.java:151)
>   at 
> org.apache.flink.streaming.api.operators.BackendRestorerProcedure.createAndRestore(BackendRestorerProcedure.java:123)
>   at 
> org.apache.flink.streaming.api.operators.StreamTaskStateInitializerImpl.keyedStatedBackend(StreamTaskStateInitializerImpl.java:276)
>   at 
> org.apache.flink.streaming.api.operators.StreamTaskStateInitializerImpl.streamOperatorStateContext(StreamTaskStateInitializerImpl.java:132)
>   at 
> org.apache.flink.streaming.api.operators.AbstractStreamOperator.initializeState(AbstractStreamOperator.java:227)
>   at 
> org.apache.flink.streaming.runtime.tasks.StreamTask.initializeState(StreamTask.java:730)
>   at 
> org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:295)
>   at org.apache.flink.runtime.taskmanager.Task.run(Task.java:703)
>   at java.lang.Thread.run(Thread.java:748)
>
>
> My current setup:
>
> <dependency>
>     <groupId>com.google.cloud.bigdataoss</groupId>
>     <artifactId>gcs-connector</artifactId>
>     <version>hadoop2-1.9.5</version>
> </dependency>
>
> Flink image: flink:1.5.2-hadoop28
>
> Thank you in advance!
>


Re: classloading strangeness with Avro in Flink

2018-08-20 Thread vino yang
Hi Cliff,

If so, you can explicitly exclude Avro from the related
dependencies (using <exclusions> in the POM) and then directly add a dependency on
the Avro version you need.

Thanks, vino.

Cliff Resnick wrote on Tue, Aug 21, 2018 at 5:13 AM:

> Hi Vino,
>
> Unfortunately, I'm still stuck here. By moving the avro dependency chain
> to lib (and removing it from user jar), my OCFs decode but I get the error
> described here:
>
> https://github.com/confluentinc/schema-registry/pull/509
>
> However, the Flink fix described in the PR above was to move the Avro
> dependency to the user jar. However, since I'm using YARN, I'm required to
> have flink-shaded-hadoop2-uber.jar loaded from lib -- and that has
> avro bundled un-shaded. So I'm back to the start problem...
>
> Any advice is welcome!
>
> -Cliff
>
>
> On Mon, Aug 20, 2018 at 1:42 PM Cliff Resnick  wrote:
>
>> Hi Vino,
>>
>> You were right in your assumption -- unshaded avro was being added to our
>> application jar via third-party dependency. Excluding it in packaging fixed
>> the issue. For the record, it looks flink-avro must be loaded from the lib
>> or there will be errors in checkpoint restores.
>>
>> On Mon, Aug 20, 2018 at 8:43 AM Cliff Resnick  wrote:
>>
>>> Hi Vino,
>>>
>>> Thanks for the explanation, but the job only ever uses the Avro (1.8.2)
>>> pulled in by flink-formats/avro, so it's not a class version conflict
>>> there.
>>>
>>> I'm using default child-first loading. It might be a further transitive
>>> dependency, though it's not clear by stack trace or stepping through the
>>> process. When I get a chance I'll look further into it but in case anyone
>>> is experiencing similar problems, what is clear is that classloader order
>>> does matter with Avro.
>>>
>>> On Sun, Aug 19, 2018, 11:36 PM vino yang  wrote:
>>>
 Hi Cliff,

 My personal guess is that this may be caused by Job's Avro conflict
 with the Avro that the Flink framework itself relies on.
 Flink has provided some configuration parameters which allows you to
 determine the order of the classloaders yourself. [1]
 Alternatively, you can debug classloading and participate in the
 documentation.[2]

 [1]:
 https://ci.apache.org/projects/flink/flink-docs-release-1.6/ops/config.html
 [2]:
 https://ci.apache.org/projects/flink/flink-docs-stable/monitoring/debugging_classloading.html

 Thanks, vino.

Cliff Resnick wrote on Mon, Aug 20, 2018 at 10:40 AM:

> Our Flink/YARN pipeline has been reading Avro from Kafka for a while
> now. We just introduced a source of Avro OCF (Object Container Files) read
> from S3. The Kafka Avro continued to decode without incident, but the OCF
> files failed 100% with anomalous parse errors in the decoding phase after
> the schema and codec were successfully read from them. The pipeline would
> work on my laptop, and when I submitted a test Main program to the Flink
> Session in YARN, that would also successfully decode. Only the actual
> pipeline run from the TaskManager failed. At one point I even remote
> debugged the TaskManager process and stepped through what looked like a
> normal Avro decode (if you can describe Avro code as normal!) -- until it
> abruptly failed with an int decode or what-have-you.
>
> This stumped me for a while, but I finally tried moving flink-avro.jar
> from the lib to the application jar, and that fixed it. I'm not sure why
> this is, especially since there were no typical classloader-type errors.
> This issue was observed both on Flink 1.5 and 1.6 in Flip-6 mode.
>
> -Cliff
>
>
>
>
>
>


Re: How does flink know which data is modified in dynamic table?

2018-08-20 Thread 徐涛
Hi Hequn,
However, is it semantically correct? Because the SQL result is not equal
to the bounded table.


> On Aug 20, 2018, at 8:34 PM, Hequn Cheng wrote:
> 
> Hi Henry,
> 
> Both sql output incrementally. 
> 
> However there are some problems if you use retract sink. You have to pay 
> attention to the timestamp field since each time the value is different.  
> For example, if the value is updated from 1 to 2, 
> previous row:  add (a, 1, 2018-08-20 20:18:10.286)
> retract row: delete (a, 1, 2018-08-20 20:18:10.386)
> new row: add (a, 2, 2018-08-20 20:18:10.486)
> The retract row is different from the previous row because of the time field.
> 
> Of course, this problem should be fixed later.
> 
> Best, Hequn
> 
> On Mon, Aug 20, 2018 at 6:43 PM, 徐涛 wrote:
> Hi All,
>   Like the following code,If I use retract stream, I think Flink is able 
> to know which item is modified( if praise has 1 items now, when one item 
> comes to the stream, only very small amount of data is write to sink) 
>   var praiseAggr = tableEnv.sqlQuery(s"SELECT article_id,hll(uid) as PU 
> FROM praise group by article_id” )
> tableEnv.registerTable("finalTable", praiseAggr)
>   tableEnv.sqlUpdate(s"insert into sinkTableName SELECT * from 
> finalTable")
> 
> But if I use the following sql, by adding a dynamic timestamp field:
>   var praiseAggr = tableEnv.sqlQuery(s"SELECT article_id,hll(uid) 
> as PU,LOCALTIMESTAMP as update_timestamp FROM praise group by article_id” )
>   Is the whole table flush to the sink? Or only the incremental value 
> will flush to the sink? Why?
> 
> Thanks,
> Henry
> 
> 



RE: Access to kafka partition per record

2018-08-20 Thread John O
Found it. Using KeyedDeserializationSchema, I can get access to the partition, 
offset, key and value information.
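
For the archive, a minimal sketch of that approach; the class name, tuple layout and
charset below are choices made for the example, not taken from the actual job:

    import java.nio.charset.StandardCharsets;

    import org.apache.flink.api.common.typeinfo.TypeHint;
    import org.apache.flink.api.common.typeinfo.TypeInformation;
    import org.apache.flink.api.java.tuple.Tuple3;
    import org.apache.flink.streaming.util.serialization.KeyedDeserializationSchema;

    /** Emits (partition, key, value) for every Kafka record so the job can keyBy the partition. */
    public class PartitionAwareSchema
            implements KeyedDeserializationSchema<Tuple3<Integer, String, String>> {

        @Override
        public Tuple3<Integer, String, String> deserialize(
                byte[] messageKey, byte[] message, String topic, int partition, long offset) {
            String key = messageKey == null ? null : new String(messageKey, StandardCharsets.UTF_8);
            String value = message == null ? null : new String(message, StandardCharsets.UTF_8);
            return Tuple3.of(partition, key, value);
        }

        @Override
        public boolean isEndOfStream(Tuple3<Integer, String, String> nextElement) {
            return false;  // keep consuming
        }

        @Override
        public TypeInformation<Tuple3<Integer, String, String>> getProducedType() {
            return TypeInformation.of(new TypeHint<Tuple3<Integer, String, String>>() {});
        }
    }

The schema can then be handed to the Kafka consumer constructor in place of a plain
DeserializationSchema, and the resulting stream keyed with keyBy(0) on the partition field.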


From: John O
Sent: Monday, August 20, 2018 3:15 PM
To: user 
Subject: Access to kafka partition per record

I am consuming data from a kafka topic that has multiple partitions. I would 
like to keyby(record.partition). What would be the best way to get access to 
the partition information?


Jo


Access to kafka partition per record

2018-08-20 Thread John O
I am consuming data from a kafka topic that has multiple partitions. I would 
like to keyby(record.partition). What would be the best way to get access to 
the partition information?


Jo


Re: classloading strangeness with Avro in Flink

2018-08-20 Thread Cliff Resnick
Hi Vino,

Unfortunately, I'm still stuck here. By moving the avro dependency chain to
lib (and removing it from user jar), my OCFs decode but I get the error
described here:

https://github.com/confluentinc/schema-registry/pull/509

However, the Flink fix described in the PR above was to move the Avro
dependency to the user jar. But since I'm using YARN, I'm required to
have flink-shaded-hadoop2-uber.jar loaded from lib -- and that has
Avro bundled un-shaded. So I'm back to the original problem...

Any advice is welcome!

-Cliff


On Mon, Aug 20, 2018 at 1:42 PM Cliff Resnick  wrote:

> Hi Vino,
>
> You were right in your assumption -- unshaded avro was being added to our
> application jar via third-party dependency. Excluding it in packaging fixed
> the issue. For the record, it looks flink-avro must be loaded from the lib
> or there will be errors in checkpoint restores.
>
> On Mon, Aug 20, 2018 at 8:43 AM Cliff Resnick  wrote:
>
>> Hi Vino,
>>
>> Thanks for the explanation, but the job only ever uses the Avro (1.8.2)
>> pulled in by flink-formats/avro, so it's not a class version conflict
>> there.
>>
>> I'm using default child-first loading. It might be a further transitive
>> dependency, though it's not clear by stack trace or stepping through the
>> process. When I get a chance I'll look further into it but in case anyone
>> is experiencing similar problems, what is clear is that classloader order
>> does matter with Avro.
>>
>> On Sun, Aug 19, 2018, 11:36 PM vino yang  wrote:
>>
>>> Hi Cliff,
>>>
>>> My personal guess is that this may be caused by Job's Avro conflict with
>>> the Avro that the Flink framework itself relies on.
>>> Flink has provided some configuration parameters which allows you to
>>> determine the order of the classloaders yourself. [1]
>>> Alternatively, you can debug classloading and participate in the
>>> documentation.[2]
>>>
>>> [1]:
>>> https://ci.apache.org/projects/flink/flink-docs-release-1.6/ops/config.html
>>> [2]:
>>> https://ci.apache.org/projects/flink/flink-docs-stable/monitoring/debugging_classloading.html
>>>
>>> Thanks, vino.
>>>
>>> Cliff Resnick wrote on Mon, Aug 20, 2018 at 10:40 AM:
>>>
 Our Flink/YARN pipeline has been reading Avro from Kafka for a while
 now. We just introduced a source of Avro OCF (Object Container Files) read
 from S3. The Kafka Avro continued to decode without incident, but the OCF
 files failed 100% with anomalous parse errors in the decoding phase after
 the schema and codec were successfully read from them. The pipeline would
 work on my laptop, and when I submitted a test Main program to the Flink
 Session in YARN, that would also successfully decode. Only the actual
 pipeline run from the TaskManager failed. At one point I even remote
 debugged the TaskManager process and stepped through what looked like a
 normal Avro decode (if you can describe Avro code as normal!) -- until it
 abruptly failed with an int decode or what-have-you.

 This stumped me for a while, but I finally tried moving flink-avro.jar
 from the lib to the application jar, and that fixed it. I'm not sure why
 this is, especially since there were no typical classloader-type errors.
 This issue was observed both on Flink 1.5 and 1.6 in Flip-6 mode.

 -Cliff








Re: classloading strangeness with Avro in Flink

2018-08-20 Thread Cliff Resnick
Hi Jamie,

No, it was nothing of the class not found variety, just parse errors. It
had to do with Avro getting mixed up with different versions.

-Cliff

On Mon, Aug 20, 2018 at 4:18 PM Jamie Grier  wrote:

> Hey Cliff, can you provide the stack trace of the issue you were seeing?
> We recently ran into a similar issue that we're still debugging.  Did it
> look like this:
>
> java.lang.IllegalStateException: Could not initialize operator state
>> backend.
>> at
>> org.apache.flink.streaming.api.operators.AbstractStreamOperator.initOperatorState(AbstractStreamOperator.java:301)
>> at
>> org.apache.flink.streaming.api.operators.AbstractStreamOperator.initializeState(AbstractStreamOperator.java:248)
>> at
>> org.apache.flink.streaming.runtime.tasks.StreamTask.initializeOperators(StreamTask.java:692)
>> at
>> org.apache.flink.streaming.runtime.tasks.StreamTask.initializeState(StreamTask.java:679)
>> at
>> org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:253)
>> at org.apache.flink.runtime.taskmanager.Task.run(Task.java:718)
>> at java.lang.Thread.run(Thread.java:748)
>> Caused by: java.lang.UnsupportedOperationException: Could not find
>> required Avro dependency.
>> at
>> org.apache.flink.api.java.typeutils.runtime.kryo.Serializers$DummyAvroKryoSerializerClass.read(Serializers.java:170)
>> at com.esotericsoftware.kryo.Kryo.readClassAndObject(Kryo.java:761)
>> at
>> org.apache.flink.api.java.typeutils.runtime.kryo.KryoSerializer.deserialize(KryoSerializer.java:249)
>> at
>> org.apache.flink.api.java.typeutils.runtime.TupleSerializer.deserialize(TupleSerializer.java:136)
>> at
>> org.apache.flink.api.java.typeutils.runtime.TupleSerializer.deserialize(TupleSerializer.java:30)
>> at
>> org.apache.flink.runtime.state.DefaultOperatorStateBackend.deserializeStateValues(DefaultOperatorStateBackend.java:584)
>> at
>> org.apache.flink.runtime.state.DefaultOperatorStateBackend.restore(DefaultOperatorStateBackend.java:399)
>> at
>> org.apache.flink.streaming.runtime.tasks.StreamTask.createOperatorStateBackend(StreamTask.java:733)
>> at
>> org.apache.flink.streaming.api.operators.AbstractStreamOperator.initOperatorState(AbstractStreamOperator.java:299)
>> ... 6 common frames omitted
>> 00:17:19.626 INFO o.a.f.r.e.ExecutionGraph - Job
>> ClientEventToElasticsearchJob (5cec438674e9a111703c83897f7c8138) switched
>> from state RUNNING to FAILING.
>> java.lang.IllegalStateException: Could not initialize operator state
>> backend.
>> at
>> org.apache.flink.streaming.api.operators.AbstractStreamOperator.initOperatorState(AbstractStreamOperator.java:301)
>> at
>> org.apache.flink.streaming.api.operators.AbstractStreamOperator.initializeState(AbstractStreamOperator.java:248)
>> at
>> org.apache.flink.streaming.runtime.tasks.StreamTask.initializeOperators(StreamTask.java:692)
>> at
>> org.apache.flink.streaming.runtime.tasks.StreamTask.initializeState(StreamTask.java:679)
>> at
>> org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:253)
>> at org.apache.flink.runtime.taskmanager.Task.run(Task.java:718)
>> at java.lang.Thread.run(Thread.java:748)
>> Caused by: java.lang.UnsupportedOperationException: Could not find
>> required Avro dependency.
>> at
>> org.apache.flink.api.java.typeutils.runtime.kryo.Serializers$DummyAvroKryoSerializerClass.read(Serializers.java:170)
>> at com.esotericsoftware.kryo.Kryo.readClassAndObject(Kryo.java:761)
>> at
>> org.apache.flink.api.java.typeutils.runtime.kryo.KryoSerializer.deserialize(KryoSerializer.java:249)
>> at
>> org.apache.flink.api.java.typeutils.runtime.TupleSerializer.deserialize(TupleSerializer.java:136)
>> at
>> org.apache.flink.api.java.typeutils.runtime.TupleSerializer.deserialize(TupleSerializer.java:30)
>> at
>> org.apache.flink.runtime.state.DefaultOperatorStateBackend.deserializeStateValues(DefaultOperatorStateBackend.java:584)
>> at
>> org.apache.flink.runtime.state.DefaultOperatorStateBackend.restore(DefaultOperatorStateBackend.java:399)
>> at
>> org.apache.flink.streaming.runtime.tasks.StreamTask.createOperatorStateBackend(StreamTask.java:733)
>> at
>> org.apache.flink.streaming.api.operators.AbstractStreamOperator.initOperatorState(AbstractStreamOperator.java:299)
>> ... 6 common frames omitted
>
>
> -Jamie
>
>
> On Mon, Aug 20, 2018 at 10:42 AM, Cliff Resnick  wrote:
>
>> Hi Vino,
>>
>> You were right in your assumption -- unshaded avro was being added to our
>> application jar via third-party dependency. Excluding it in packaging fixed
>> the issue. For the record, it looks flink-avro must be loaded from the lib
>> or there will be errors in checkpoint restores.
>>
>> On Mon, Aug 20, 2018 at 8:43 AM Cliff Resnick  wrote:
>>
>>> Hi Vino,
>>>
>>> Thanks for the explanation, but the job only ever uses the Avro (1.8.2)
>>> pulled in by flink-formats/avro, so it's not a class version conflict
>>> there.
>>>
>>> I'm using default child-first loading. It might be a further transitive
>>> dependency, though it's not clear by stack 

Re: classloading strangeness with Avro in Flink

2018-08-20 Thread Jamie Grier
Hey Cliff, can you provide the stack trace of the issue you were seeing?
We recently ran into a similar issue that we're still debugging.  Did it
look like this:

java.lang.IllegalStateException: Could not initialize operator state
> backend.
> at
> org.apache.flink.streaming.api.operators.AbstractStreamOperator.initOperatorState(AbstractStreamOperator.java:301)
> at
> org.apache.flink.streaming.api.operators.AbstractStreamOperator.initializeState(AbstractStreamOperator.java:248)
> at
> org.apache.flink.streaming.runtime.tasks.StreamTask.initializeOperators(StreamTask.java:692)
> at
> org.apache.flink.streaming.runtime.tasks.StreamTask.initializeState(StreamTask.java:679)
> at
> org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:253)
> at org.apache.flink.runtime.taskmanager.Task.run(Task.java:718)
> at java.lang.Thread.run(Thread.java:748)
> Caused by: java.lang.UnsupportedOperationException: Could not find
> required Avro dependency.
> at
> org.apache.flink.api.java.typeutils.runtime.kryo.Serializers$DummyAvroKryoSerializerClass.read(Serializers.java:170)
> at com.esotericsoftware.kryo.Kryo.readClassAndObject(Kryo.java:761)
> at
> org.apache.flink.api.java.typeutils.runtime.kryo.KryoSerializer.deserialize(KryoSerializer.java:249)
> at
> org.apache.flink.api.java.typeutils.runtime.TupleSerializer.deserialize(TupleSerializer.java:136)
> at
> org.apache.flink.api.java.typeutils.runtime.TupleSerializer.deserialize(TupleSerializer.java:30)
> at
> org.apache.flink.runtime.state.DefaultOperatorStateBackend.deserializeStateValues(DefaultOperatorStateBackend.java:584)
> at
> org.apache.flink.runtime.state.DefaultOperatorStateBackend.restore(DefaultOperatorStateBackend.java:399)
> at
> org.apache.flink.streaming.runtime.tasks.StreamTask.createOperatorStateBackend(StreamTask.java:733)
> at
> org.apache.flink.streaming.api.operators.AbstractStreamOperator.initOperatorState(AbstractStreamOperator.java:299)
> ... 6 common frames omitted
> 00:17:19.626 INFO o.a.f.r.e.ExecutionGraph - Job
> ClientEventToElasticsearchJob (5cec438674e9a111703c83897f7c8138) switched
> from state RUNNING to FAILING.
> java.lang.IllegalStateException: Could not initialize operator state
> backend.
> at
> org.apache.flink.streaming.api.operators.AbstractStreamOperator.initOperatorState(AbstractStreamOperator.java:301)
> at
> org.apache.flink.streaming.api.operators.AbstractStreamOperator.initializeState(AbstractStreamOperator.java:248)
> at
> org.apache.flink.streaming.runtime.tasks.StreamTask.initializeOperators(StreamTask.java:692)
> at
> org.apache.flink.streaming.runtime.tasks.StreamTask.initializeState(StreamTask.java:679)
> at
> org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:253)
> at org.apache.flink.runtime.taskmanager.Task.run(Task.java:718)
> at java.lang.Thread.run(Thread.java:748)
> Caused by: java.lang.UnsupportedOperationException: Could not find
> required Avro dependency.
> at
> org.apache.flink.api.java.typeutils.runtime.kryo.Serializers$DummyAvroKryoSerializerClass.read(Serializers.java:170)
> at com.esotericsoftware.kryo.Kryo.readClassAndObject(Kryo.java:761)
> at
> org.apache.flink.api.java.typeutils.runtime.kryo.KryoSerializer.deserialize(KryoSerializer.java:249)
> at
> org.apache.flink.api.java.typeutils.runtime.TupleSerializer.deserialize(TupleSerializer.java:136)
> at
> org.apache.flink.api.java.typeutils.runtime.TupleSerializer.deserialize(TupleSerializer.java:30)
> at
> org.apache.flink.runtime.state.DefaultOperatorStateBackend.deserializeStateValues(DefaultOperatorStateBackend.java:584)
> at
> org.apache.flink.runtime.state.DefaultOperatorStateBackend.restore(DefaultOperatorStateBackend.java:399)
> at
> org.apache.flink.streaming.runtime.tasks.StreamTask.createOperatorStateBackend(StreamTask.java:733)
> at
> org.apache.flink.streaming.api.operators.AbstractStreamOperator.initOperatorState(AbstractStreamOperator.java:299)
> ... 6 common frames omitted


-Jamie


On Mon, Aug 20, 2018 at 10:42 AM, Cliff Resnick  wrote:

> Hi Vino,
>
> You were right in your assumption -- unshaded avro was being added to our
> application jar via third-party dependency. Excluding it in packaging fixed
> the issue. For the record, it looks flink-avro must be loaded from the lib
> or there will be errors in checkpoint restores.
>
> On Mon, Aug 20, 2018 at 8:43 AM Cliff Resnick  wrote:
>
>> Hi Vino,
>>
>> Thanks for the explanation, but the job only ever uses the Avro (1.8.2)
>> pulled in by flink-formats/avro, so it's not a class version conflict
>> there.
>>
>> I'm using default child-first loading. It might be a further transitive
>> dependency, though it's not clear by stack trace or stepping through the
>> process. When I get a chance I'll look further into it but in case anyone
>> is experiencing similar problems, what is clear is that classloader order
>> does matter with Avro.
>>
>> On Sun, Aug 19, 2018, 11:36 PM vino yang  wrote:
>>
>>> Hi Cliff,
>>>
>>> My personal guess is 

Re: classloading strangeness with Avro in Flink

2018-08-20 Thread Cliff Resnick
Hi Vino,

You were right in your assumption -- unshaded avro was being added to our
application jar via a third-party dependency. Excluding it in packaging fixed
the issue. For the record, it looks like flink-avro must be loaded from the lib
or there will be errors in checkpoint restores.

On Mon, Aug 20, 2018 at 8:43 AM Cliff Resnick  wrote:

> Hi Vino,
>
> Thanks for the explanation, but the job only ever uses the Avro (1.8.2)
> pulled in by flink-formats/avro, so it's not a class version conflict
> there.
>
> I'm using default child-first loading. It might be a further transitive
> dependency, though it's not clear by stack trace or stepping through the
> process. When I get a chance I'll look further into it but in case anyone
> is experiencing similar problems, what is clear is that classloader order
> does matter with Avro.
>
> On Sun, Aug 19, 2018, 11:36 PM vino yang  wrote:
>
>> Hi Cliff,
>>
>> My personal guess is that this may be caused by Job's Avro conflict with
>> the Avro that the Flink framework itself relies on.
>> Flink has provided some configuration parameters which allows you to
>> determine the order of the classloaders yourself. [1]
>> Alternatively, you can debug classloading and participate in the
>> documentation.[2]
>>
>> [1]:
>> https://ci.apache.org/projects/flink/flink-docs-release-1.6/ops/config.html
>> [2]:
>> https://ci.apache.org/projects/flink/flink-docs-stable/monitoring/debugging_classloading.html
>>
>> Thanks, vino.
>>
>> Cliff Resnick wrote on Mon, Aug 20, 2018 at 10:40 AM:
>>
>>> Our Flink/YARN pipeline has been reading Avro from Kafka for a while
>>> now. We just introduced a source of Avro OCF (Object Container Files) read
>>> from S3. The Kafka Avro continued to decode without incident, but the OCF
>>> files failed 100% with anomalous parse errors in the decoding phase after
>>> the schema and codec were successfully read from them. The pipeline would
>>> work on my laptop, and when I submitted a test Main program to the Flink
>>> Session in YARN, that would also successfully decode. Only the actual
>>> pipeline run from the TaskManager failed. At one point I even remote
>>> debugged the TaskManager process and stepped through what looked like a
>>> normal Avro decode (if you can describe Avro code as normal!) -- until it
>>> abruptly failed with an int decode or what-have-you.
>>>
>>> This stumped me for a while, but I finally tried moving flink-avro.jar
>>> from the lib to the application jar, and that fixed it. I'm not sure why
>>> this is, especially since there were no typical classloader-type errors.
>>> This issue was observed both on Flink 1.5 and 1.6 in Flip-6 mode.
>>>
>>> -Cliff
>>>
>>>
>>>
>>>
>>>
>>>


Re: Override CaseClassSerializer with custom serializer

2018-08-20 Thread Gerard Garcia
Hi Timo,

I see. Yes, we already use the "Object Reuse" option. It was a nice
performance improvement when we first set it!
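
For anyone landing on this thread later, the option Gerard refers to is a one-liner
on the execution config; a minimal sketch (the class name is illustrative):

    import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

    public class ObjectReuseSetup {
        public static void main(String[] args) {
            StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
            // Reuse objects between operators instead of deep-copying them.
            // Only safe if no operator holds on to or mutates records it has already emitted or received.
            env.getConfig().enableObjectReuse();
        }
    }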

I guess another option we can try is to somehow make things "easier" for
Flink so it can chain operators together. Most of them are not chained; I
think it's because they have a control stream as a source together with the
main stream. I'll need to check that and see if we can re-architect them.

Thanks,

Gerard

On Fri, Aug 17, 2018 at 11:21 PM Timo Walther  wrote:

> Hi Gerard,
>
> you are correct, Kryo serializers are only used when no built-in Flink
> serializer is available.
>
> Actually, the tuple and case class serializers are one of the most
> performant serializers in Flink (due to their fixed length, no null
> support). If you really want to reduce the serialization overhead you
> could look into the object reuse mode. We had this topic on the mailing
> list recently, I will just copy it here:
>
> If you want to improve the performance of a collect() between operators,
> you could also enable object reuse. You can read more about this here
> [1] (section "Issue 2: Object Reuse"), but make sure your implementation
> is correct because an operator could modify the objects of follwing
> operators.
>
> I hope this helps.
>
> Regards,
> Timo
>
> [1]
>
> https://data-artisans.com/blog/curious-case-broken-benchmark-revisiting-apache-flink-vs-databricks-runtime
>
> Am 17.08.18 um 17:29 schrieb gerardg:
> > Hello,
> >
> > I can't seem to be able to override the CaseClassSerializer with my
> custom
> > serializer. I'm using env.getConfig.addDefaultKryoSerializer() to add the
> > custom serializer but I don't see it being used. I guess it is because it
> > only uses Kryo based serializers if it can't find a Flink serializer?
> >
> > Is then worth it to replace the CaseClassSerializer with a custom
> > serializer? (when I profile the CaseClassSerializer.(de)serialize method
> > appears as the most used so I wanted to give it a try) If so, how can I
> do
> > it?
> >
> > Thanks,
> >
> > Gerard
> >
> >
> >
> > --
> > Sent from:
> http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/
>
>
>


Flink checkpointing to Google Cloud Storage

2018-08-20 Thread Oleksandr Serdiukov
Hello All!

I am trying to configure checkpoints for Flink jobs in GCS.
I am now able to write checkpoints but cannot restore from them:

java.lang.NoClassDefFoundError: 
com/google/cloud/hadoop/gcsio/GoogleCloudStorageImpl$6
at 
com.google.cloud.hadoop.gcsio.GoogleCloudStorageImpl.open(GoogleCloudStorageImpl.java:666)
at 
com.google.cloud.hadoop.gcsio.GoogleCloudStorageFileSystem.open(GoogleCloudStorageFileSystem.java:323)
at 
com.google.cloud.hadoop.fs.gcs.GoogleHadoopFSInputStream.<init>(GoogleHadoopFSInputStream.java:136)
at 
com.google.cloud.hadoop.fs.gcs.GoogleHadoopFileSystemBase.open(GoogleHadoopFileSystemBase.java:1102)
at org.apache.hadoop.fs.FileSystem.open(FileSystem.java:787)
at 
org.apache.flink.runtime.fs.hdfs.HadoopFileSystem.open(HadoopFileSystem.java:119)
at 
org.apache.flink.runtime.fs.hdfs.HadoopFileSystem.open(HadoopFileSystem.java:36)
at 
org.apache.flink.core.fs.SafetyNetWrapperFileSystem.open(SafetyNetWrapperFileSystem.java:80)
at 
org.apache.flink.runtime.state.filesystem.FileStateHandle.openInputStream(FileStateHandle.java:68)
at 
org.apache.flink.contrib.streaming.state.RocksDBKeyedStateBackend$RocksDBIncrementalRestoreOperation.copyStateDataHandleData(RocksDBKeyedStateBackend.java:1005)
at 
org.apache.flink.contrib.streaming.state.RocksDBKeyedStateBackend$RocksDBIncrementalRestoreOperation.transferAllDataFromStateHandles(RocksDBKeyedStateBackend.java:988)
at 
org.apache.flink.contrib.streaming.state.RocksDBKeyedStateBackend$RocksDBIncrementalRestoreOperation.transferAllStateDataToDirectory(RocksDBKeyedStateBackend.java:974)
at 
org.apache.flink.contrib.streaming.state.RocksDBKeyedStateBackend$RocksDBIncrementalRestoreOperation.restoreInstance(RocksDBKeyedStateBackend.java:758)
at 
org.apache.flink.contrib.streaming.state.RocksDBKeyedStateBackend$RocksDBIncrementalRestoreOperation.restore(RocksDBKeyedStateBackend.java:732)
at 
org.apache.flink.contrib.streaming.state.RocksDBKeyedStateBackend.restore(RocksDBKeyedStateBackend.java:443)
at 
org.apache.flink.contrib.streaming.state.RocksDBKeyedStateBackend.restore(RocksDBKeyedStateBackend.java:149)
at 
org.apache.flink.streaming.api.operators.BackendRestorerProcedure.attemptCreateAndRestore(BackendRestorerProcedure.java:151)
at 
org.apache.flink.streaming.api.operators.BackendRestorerProcedure.createAndRestore(BackendRestorerProcedure.java:123)
at 
org.apache.flink.streaming.api.operators.StreamTaskStateInitializerImpl.keyedStatedBackend(StreamTaskStateInitializerImpl.java:276)
at 
org.apache.flink.streaming.api.operators.StreamTaskStateInitializerImpl.streamOperatorStateContext(StreamTaskStateInitializerImpl.java:132)
at 
org.apache.flink.streaming.api.operators.AbstractStreamOperator.initializeState(AbstractStreamOperator.java:227)
at 
org.apache.flink.streaming.runtime.tasks.StreamTask.initializeState(StreamTask.java:730)
at 
org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:295)
at org.apache.flink.runtime.taskmanager.Task.run(Task.java:703)
at java.lang.Thread.run(Thread.java:748)

My current setup:

<dependency>
    <groupId>com.google.cloud.bigdataoss</groupId>
    <artifactId>gcs-connector</artifactId>
    <version>hadoop2-1.9.5</version>
</dependency>

Flink image: flink:1.5.2-hadoop28

Thank you in advance!

Re: Cluster die when one of the TM killed

2018-08-20 Thread Lasse Nedergaard
Hi. 
We have seen the same behaviour on YARN. It turned out that the default
settings were not optimal:
yarn.maximum-failed-containers: the maximum number of failed containers the
ApplicationMaster accepts until it fails the YARN session. Default: the number
of initially requested TaskManagers (-n).
So try to look up that configuration for your system.
Next step is to investigate why the task manager is killed. 


Med venlig hilsen / Best regards
Lasse Nedergaard


> On Aug 20, 2018, at 4:34 PM, Dominik Wosiński wrote:
> 
> Hey, 
> Can You please provide a little more information about your setup and maybe 
> logs showing when the crash occurs? 
> Best Regards,
> Dominik
> 
> 2018-08-20 16:23 GMT+02:00 Siew Wai Yow :
>> Hi,
>> 
>> When one of the task manager is killed, the whole cluster die, is this 
>> something expected? We are using Flink 1.4. Thank you.
>> 
>> Regards,
>> Yow
> 


Re: Cluster die when one of the TM killed

2018-08-20 Thread Dominik Wosiński
Hey,
Can you please provide a little more information about your setup and maybe
logs showing when the crash occurs?
Best Regards,
Dominik

2018-08-20 16:23 GMT+02:00 Siew Wai Yow :

> Hi,
>
>
> When one of the task manager is killed, the whole cluster die, is this
> something expected? We are using Flink 1.4. Thank you.
>
>
> Regards,
>
> Yow
>


Cluster die when one of the TM killed

2018-08-20 Thread Siew Wai Yow
Hi,


When one of the task managers is killed, the whole cluster dies. Is this
something expected? We are using Flink 1.4. Thank you.


Regards,

Yow


Re: classloading strangeness with Avro in Flink

2018-08-20 Thread Cliff Resnick
Hi Vino,

Thanks for the explanation, but the job only ever uses the Avro (1.8.2)
pulled in by flink-formats/avro, so it's not a class version conflict
there.

I'm using default child-first loading. It might be a further transitive
dependency, though it's not clear from the stack trace or from stepping through the
process. When I get a chance I'll look further into it, but in case anyone
is experiencing similar problems, what is clear is that classloader order
does matter with Avro.

On Sun, Aug 19, 2018, 11:36 PM vino yang  wrote:

> Hi Cliff,
>
> My personal guess is that this may be caused by Job's Avro conflict with
> the Avro that the Flink framework itself relies on.
> Flink has provided some configuration parameters which allows you to
> determine the order of the classloaders yourself. [1]
> Alternatively, you can debug classloading and participate in the
> documentation.[2]
>
> [1]:
> https://ci.apache.org/projects/flink/flink-docs-release-1.6/ops/config.html
> [2]:
> https://ci.apache.org/projects/flink/flink-docs-stable/monitoring/debugging_classloading.html
>
> Thanks, vino.
>
> Cliff Resnick wrote on Mon, Aug 20, 2018 at 10:40 AM:
>
>> Our Flink/YARN pipeline has been reading Avro from Kafka for a while now.
>> We just introduced a source of Avro OCF (Object Container Files) read from
>> S3. The Kafka Avro continued to decode without incident, but the OCF files
>> failed 100% with anomalous parse errors in the decoding phase after the
>> schema and codec were successfully read from them. The pipeline would work
>> on my laptop, and when I submitted a test Main program to the Flink Session
>> in YARN, that would also successfully decode. Only the actual pipeline run
>> from the TaskManager failed. At one point I even remote debugged the
>> TaskManager process and stepped through what looked like a normal Avro
>> decode (if you can describe Avro code as normal!) -- until it abruptly
>> failed with an int decode or what-have-you.
>>
>> This stumped me for a while, but I finally tried moving flink-avro.jar
>> from the lib to the application jar, and that fixed it. I'm not sure why
>> this is, especially since there were no typical classloader-type errors.
>> This issue was observed both on Flink 1.5 and 1.6 in Flip-6 mode.
>>
>> -Cliff
>>
>>
>>
>>
>>
>>


Re: How does flink know which data is modified in dynamic table?

2018-08-20 Thread Hequn Cheng
Hi Henry,

Both SQL queries output incrementally.

However, there are some problems if you use a retract sink. You have to pay
attention to the timestamp field, since its value is different each time.
For example, if the value is updated from 1 to 2,

previous row:  add (a, 1, 2018-08-20 20:18:10.286)
retract row: delete (a, 1, 2018-08-20 20:18:10.386)
new row: add (a, 2, 2018-08-20 20:18:10.486)

The retract row is different from the previous row because of the time
field.

Of course, this problem should be fixed later.
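
To make that concrete, here is a minimal Java sketch that converts such a query into a
retract stream and prints the (flag, row) pairs. It replaces hll(uid) with COUNT(uid)
and assumes a table named praise is already registered, so it is an illustration
rather than the original job:

    import org.apache.flink.api.java.tuple.Tuple2;
    import org.apache.flink.streaming.api.datastream.DataStream;
    import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
    import org.apache.flink.table.api.Table;
    import org.apache.flink.table.api.TableEnvironment;
    import org.apache.flink.table.api.java.StreamTableEnvironment;
    import org.apache.flink.types.Row;

    public class RetractStreamDemo {

        public static void main(String[] args) throws Exception {
            StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
            StreamTableEnvironment tEnv = TableEnvironment.getTableEnvironment(env);

            // "praise" is assumed to be a registered table with columns article_id and uid.
            Table praiseAggr = tEnv.sqlQuery(
                    "SELECT article_id, COUNT(uid) AS pu, LOCALTIMESTAMP AS update_timestamp "
                            + "FROM praise GROUP BY article_id");

            // Each element is (flag, row): true = add, false = retract (delete).
            DataStream<Tuple2<Boolean, Row>> retractStream =
                    tEnv.toRetractStream(praiseAggr, Row.class);

            // Printing makes the (false, old row) / (true, new row) pairs above visible.
            retractStream.print();

            env.execute("retract stream demo (sketch)");
        }
    }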

Best, Hequn

On Mon, Aug 20, 2018 at 6:43 PM, 徐涛  wrote:

> Hi All,
> Like the following code,If I use retract stream, I think Flink is able to
> know which item is modified( if praise has 1 items now, when one item
> comes to the stream, only very small amount of data is write to sink)
>
>   var praiseAggr = tableEnv.sqlQuery(*s"SELECT article_id,hll(uid) as PU 
> FROM praise group by article_id**”* )
>
> tableEnv.registerTable("finalTable", praiseAggr)
>
>   tableEnv.sqlUpdate(s"insert into sinkTableName SELECT * from 
> finalTable")
>
>
> But if I use the following sql, by adding a dynamic timestamp
> field:
> var praiseAggr = tableEnv.sqlQuery(s"SELECT article_id,hll(uid) as 
> PU,LOCALTIMESTAMP
> as update_timestamp* FROM praise group by article_id**”* )
>   Is the whole table flush to the sink? Or only the incremental value
> will flush to the sink? Why?
>
> Thanks,
> Henry
>
>


Re: Flink Jobmanager Failover in HA mode

2018-08-20 Thread Helmut Zechmann
Hi Dominik,

all jobs on the cluster (batch-only jobs without state) were in status
FINISHED.

Best,

Helmut

On Fri, Aug 17, 2018 at 8:04 PM Dominik Wosiński  wrote:

> I have faced this issue, but in 1.4.0 IIRC. This seems to be related to
> https://issues.apache.org/jira/browse/FLINK-10011. What was the status of
> the jobs when the main Job Manager has been stopped ?
>
> 2018-08-17 17:08 GMT+02:00 Helmut Zechmann :
>
>> Hi all,
>>
>> we have a problem with flink 1.5.2 high availability in standalone mode.
>>
>> We have two jobmanagers running. When I shut down the main job manager,
>> the failover job manager encounters an error during failover.
>>
>> Logs:
>>
>>
>> 2018-08-17 14:38:16,478 WARN  akka.remote.ReliableDeliverySupervisor
>>   - Association with remote system [akka.tcp://
>> fl...@seg-1.adjust.com:29095] has failed, address is now gated for [50]
>> ms. Reason: [Disassociated]
>> 2018-08-17 14:38:31,449 WARN  akka.remote.transport.netty.NettyTransport
>>   - Remote connection to [null] failed with
>> java.net.ConnectException: Connection refused:
>> seg-1.adjust.com/178.162.219.66:29095
>> 2018-08-17 
>> 14:38:31,451 WARN  akka.remote.ReliableDeliverySupervisor
>>   - Association with remote system [akka.tcp://
>> fl...@seg-1.adjust.com:29095] has failed, address is now gated for [50]
>> ms. Reason: [Association failed with [akka.tcp://
>> fl...@seg-1.adjust.com:29095]] Caused by: [Connection refused:
>> seg-1.adjust.com/178.162.219.66:29095]
>> 2018-08-17 14:38:41,379 ERROR
>> org.apache.flink.runtime.rest.handler.legacy.files.StaticFileServerHandler
>> - Could not retrieve the redirect address.
>> java.util.concurrent.CompletionException:
>> akka.pattern.AskTimeoutException: Ask timed out on [Actor[akka.tcp://
>> fl...@seg-1.adjust.com:29095/user/dispatcher#-1599908403]] after [1
>> ms]. Sender[null] sent message of type
>> "org.apache.flink.runtime.rpc.messages.RemoteFencedMessage".
>> at
>> java.util.concurrent.CompletableFuture.encodeThrowable(CompletableFuture.java:292)
>> [... shortened ...]
>> Caused by: akka.pattern.AskTimeoutException: Ask timed out on
>> [Actor[akka.tcp://
>> fl...@seg-1.adjust.com:29095/user/dispatcher#-1599908403]] after [1
>> ms]. Sender[null] sent message of type
>> "org.apache.flink.runtime.rpc.messages.RemoteFencedMessage".
>> at
>> akka.pattern.PromiseActorRef$$anonfun$1.apply$mcV$sp(AskSupport.scala:604)
>> ... 9 more
>> 2018-08-17 14:38:48,005 INFO
>> org.apache.flink.runtime.dispatcher.DispatcherRestEndpoint-
>> http://seg-2.adjust.com:8083 was granted leadership with
>> leaderSessionID=708d1a64-c353-448b-9101-7eb3f910970e
>> 2018-08-17 14:38:48,005 INFO
>> org.apache.flink.runtime.resourcemanager.StandaloneResourceManager  -
>> ResourceManager akka.tcp://
>> fl...@seg-2.adjust.com:30169/user/resourcemanager was granted leadership
>> with fencing token 8de829de14876a367a80d37194b944ee
>> 2018-08-17 14:38:48,006 INFO
>> org.apache.flink.runtime.resourcemanager.slotmanager.SlotManager  -
>> Starting the SlotManager.
>> 2018-08-17 14:38:48,007 INFO
>> org.apache.flink.runtime.dispatcher.StandaloneDispatcher  - Dispatcher
>> akka.tcp://fl...@seg-2.adjust.com:30169/user/dispatcher was granted
>> leadership with fencing token 684f50f8-327c-47e1-a53c-931c4f4ea3e5
>> 2018-08-17 14:38:48,007 INFO
>> org.apache.flink.runtime.dispatcher.StandaloneDispatcher  - Recovering
>> all persisted jobs.
>> 2018-08-17 14:38:48,021 INFO
>> org.apache.flink.runtime.jobmanager.ZooKeeperSubmittedJobGraphStore  -
>> Recovered SubmittedJobGraph(b951bbf518bcf6cc031be6d2ccc441bb, null).
>> 2018-08-17 14:38:48,028 INFO
>> org.apache.flink.runtime.jobmanager.ZooKeeperSubmittedJobGraphStore  -
>> Recovered SubmittedJobGraph(06ed64f48fa0a7cffde53b99cbaa073f, null).
>> 2018-08-17 14:38:48,035 ERROR
>> org.apache.flink.runtime.entrypoint.ClusterEntrypoint - Fatal error
>> occurred in the cluster entrypoint.
>> java.lang.RuntimeException:
>> org.apache.flink.runtime.client.JobExecutionException: Could not set up
>> JobManager
>> at
>> org.apache.flink.util.ExceptionUtils.rethrow(ExceptionUtils.java:199)
>> [... shortened ...]
>> Caused by: org.apache.flink.runtime.client.JobExecutionException: Could
>> not set up JobManager
>> at
>> org.apache.flink.runtime.jobmaster.JobManagerRunner.(JobManagerRunner.java:176)
>> at
>> org.apache.flink.runtime.dispatcher.Dispatcher$DefaultJobManagerRunnerFactory.createJobManagerRunner(Dispatcher.java:936)
>> at
>> org.apache.flink.runtime.dispatcher.Dispatcher.createJobManagerRunner(Dispatcher.java:291)
>> at
>> org.apache.flink.runtime.dispatcher.Dispatcher.runJob(Dispatcher.java:281)
>> at
>> org.apache.flink.util.function.ConsumerWithException.accept(ConsumerWithException.java:38)
>> ... 21 more
>> Caused by: java.lang.Exception: Cannot 

How does flink know which data is modified in dynamic table?

2018-08-20 Thread 徐涛
Hi All,
Like the following code: if I use a retract stream, I think Flink is able
to know which item is modified (if praise has 1 items now, when one item
comes to the stream, only a very small amount of data is written to the sink):

    var praiseAggr = tableEnv.sqlQuery(s"SELECT article_id,hll(uid) as PU FROM praise group by article_id")
    tableEnv.registerTable("finalTable", praiseAggr)
    tableEnv.sqlUpdate(s"insert into sinkTableName SELECT * from finalTable")

But if I use the following sql, by adding a dynamic timestamp field:

    var praiseAggr = tableEnv.sqlQuery(s"SELECT article_id,hll(uid) as PU,LOCALTIMESTAMP as update_timestamp FROM praise group by article_id")

Is the whole table flushed to the sink, or is only the incremental value
flushed to the sink? Why?

Thanks,
Henry
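
A minimal sketch of the behaviour being asked about, assuming a Flink 1.6 Java
Table API setup (the snippets above are Scala) and a hypothetical "praise" table
that is already registered; COUNT(uid) stands in for the hll(uid) UDF used above.
Converting the grouped result with toRetractStream shows that an update arrives
as a retraction of the old row (flag = false) followed by the new row
(flag = true), so only the changed rows reach a retract sink, not the whole table:

import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.Table;
import org.apache.flink.table.api.TableEnvironment;
import org.apache.flink.table.api.java.StreamTableEnvironment;
import org.apache.flink.types.Row;

public class RetractInspection {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        StreamTableEnvironment tableEnv = TableEnvironment.getTableEnvironment(env);

        // Hypothetical: a table "praise" with columns article_id and uid is
        // assumed to be registered on tableEnv already.
        Table praiseAggr = tableEnv.sqlQuery(
                "SELECT article_id, COUNT(uid) AS pu FROM praise GROUP BY article_id");

        // toRetractStream emits Tuple2<Boolean, Row>:
        //   true  -> the row is added
        //   false -> the row is retracted (deleted)
        DataStream<Tuple2<Boolean, Row>> changes =
                tableEnv.toRetractStream(praiseAggr, Row.class);

        // An update to one article_id shows up as (false, oldRow) then (true, newRow);
        // unchanged groups are not re-emitted.
        changes.print();

        env.execute("retract-inspection");
    }
}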




Re: processWindowFunction

2018-08-20 Thread antonio saldivar
Maybe the usage of that function has changed; now I have to use it as shown here [1]


[1]
https://ci.apache.org/projects/flink/flink-docs-stable/dev/stream/operators/process_function.html#the-keyedprocessfunction

On Mon, Aug 20, 2018 at 5:56, vino yang ()
wrote:

> Hi antonio,
>
> Oh, if you can't use KeyedProcessFunction, then this would be a pity.
> Then you can use MapState, where Key is used to store the key of your
> partition.
> But I am not sure if this will achieve the effect you want.
>
> Thanks, vino.
>
> antonio saldivar  于2018年8月20日周一 下午4:32写道:
>
>> Hello
>>
>> Thank you for the information, for some reason this KeyedProcessFunction
>> is not found in my Flink version 1.4.2 I can only find ProcessFunction and
>> work like this
>>
>> public class TxnProcessFn extends ProcessFunction {
>>
>>  public void open(Configuration parameters) throws Exception {
>>
>> state1 = getRuntimeContext().getState(new ValueStateDescriptor<>(
>> "objState1", Object.class));
>>
>> state2 = getRuntimeContext().getState(new ValueStateDescriptor<>(
>> "objState2", Object.class));
>>
>> state3 = getRuntimeContext().getState(new ValueStateDescriptor<>(
>> "objState3", Object.class));
>>
>> }
>>
>> @Override
>>
>> public void processElement(
>>
>> Object obj,
>>
>> Context ctx,
>>
>> Collector out) throws Exception {
>>
>> // TODO Auto-generated method stub
>>
>> Object current = state.value();
>>
>> if (current == null) {
>>
>> current = new Object();
>>
>> current.id=obj.id();
>>
>>
>>
>> }
>>
>> }
>>
>> El lun., 20 ago. 2018 a las 2:24, vino yang ()
>> escribió:
>>
>>> Hi antonio,
>>>
>>> First, I suggest you use KeyedProcessFunction if you have an operation
>>> similar to keyBy.
>>> The implementation is similar to the Fixed window.
>>> You can create three state collections to determine whether the time of
>>> each element belongs to a state collection.
>>> At the time of the trigger, the elements in the collection are evaluated.
>>>
>>> Thanks, vino.
>>>
>>> antonio saldivar  于2018年8月20日周一 上午11:54写道:
>>>
 Thank you fro the references

 I have now my processFunction and getting the state but now how can i
 do for the threshold times to group the elements and also as this is a
 global window, how to purge because if going to keep increasing

 El dom., 19 ago. 2018 a las 8:57, vino yang ()
 escribió:

> Hi antonio,
>
> Regarding your scenario, I think maybe you can consider using the
> ProcessFunction (or keyed ProcessFunction) function directly on the 
> Stream.
> [1]
> It can handle each of your elements with a Timer, and you can combine
> Flink's state API[2] to store your data.
>
> [1]:
> https://ci.apache.org/projects/flink/flink-docs-stable/dev/stream/operators/process_function.html#process-function-low-level-operations
> [2]:
> https://ci.apache.org/projects/flink/flink-docs-release-1.6/dev/stream/state/state.html#working-with-state
>
> Thanks, vino.
>
> antonio saldivar  于2018年8月19日周日 上午10:18写道:
>
>> hi Vino
>>
>> it is possible to use global window, then set the trigger onElement
>> comparing the element that has arrived with for example 10 mins, 20 mins
>> and 60 mins of data?
>>
>> I have rules evaluating sum of amount for 10,20 or 60 mins for the
>> same keyed element if the same id sum like $200 total within those
>> thresholds and count more or equals to 3 I need to be able to set some
>> values to the object if the object does not reach those thresholds i do 
>> not
>> set the values and keep sending the output with or without those value.
>>
>> just processing the object on the fly and send output
>>
>>
>>
>>
>>
>>
>>
>> El vie., 17 ago. 2018 a las 22:14, vino yang ()
>> escribió:
>>
>>> Hi antonio,
>>>
>>> Yes, ProcessWindowFunction is a very low level window function.
>>> It allows you to access the data in the window and allows you to
>>> customize the output of the window.
>>> So if you use it, while giving you flexibility, you need to think
>>> about other things, which may require you to write more processing 
>>> logic.
>>>
>>> Generally speaking, sliding windows usually have some data that is
>>> repeated, but a common mode is to apply a reduce function on it to get 
>>> your
>>> calculation results.
>>> If you only send data, there will definitely be some duplication.
>>>
>>> Thanks, vino.
>>>
>>> antonio saldivar  于2018年8月17日周五 下午12:01写道:
>>>
 Hi Vino
 thank you for the information, actually I am using a trigger alert
 and processWindowFunction to send my results, but when my window 
 slides or
 ends it sends again the objects and I an getting duplicated data

 El jue., 16 ago. 2018 a las 22:05, vino yang (<
 

Re: processWindowFunction

2018-08-20 Thread vino yang
Hi antonio,

Oh, if you can't use KeyedProcessFunction, that is a pity.
In that case you can use MapState, where the map key stores the key of your
partition.
But I am not sure whether this will achieve the effect you want.

Thanks, vino.
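
A short sketch of that MapState idea, assuming Flink 1.4.2 and a hypothetical
input of (id, amount) tuples; the record's own id is used as the map key, as
suggested above. Note that MapState is keyed state, so the stream still has to
be keyed somehow (for example by a coarse or constant key):

import org.apache.flink.api.common.state.MapState;
import org.apache.flink.api.common.state.MapStateDescriptor;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.functions.ProcessFunction;
import org.apache.flink.util.Collector;

// Input: (id, amount); output: (id, runningSum). The logical partition key
// ("id") is stored as the map key, while the stream itself may be keyed on a
// coarser key before .process(new PerIdSumFn()) is applied.
public class PerIdSumFn extends ProcessFunction<Tuple2<String, Double>, Tuple2<String, Double>> {

    private transient MapState<String, Double> sums;

    @Override
    public void open(Configuration parameters) throws Exception {
        sums = getRuntimeContext().getMapState(
                new MapStateDescriptor<>("perIdSums", String.class, Double.class));
    }

    @Override
    public void processElement(Tuple2<String, Double> value, Context ctx,
                               Collector<Tuple2<String, Double>> out) throws Exception {
        Double current = sums.get(value.f0);
        double updated = (current == null ? 0.0 : current) + value.f1;
        sums.put(value.f0, updated);
        out.collect(Tuple2.of(value.f0, updated));
    }
}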

antonio saldivar wrote on Mon, Aug 20, 2018 at 4:32 PM:

> Hello
>
> Thank you for the information, for some reason this KeyedProcessFunction
> is not found in my Flink version 1.4.2 I can only find ProcessFunction and
> work like this
>
> public class TxnProcessFn extends ProcessFunction {
>
>  public void open(Configuration parameters) throws Exception {
>
> state1 = getRuntimeContext().getState(new ValueStateDescriptor<>(
> "objState1", Object.class));
>
> state2 = getRuntimeContext().getState(new ValueStateDescriptor<>(
> "objState2", Object.class));
>
> state3 = getRuntimeContext().getState(new ValueStateDescriptor<>(
> "objState3", Object.class));
>
> }
>
> @Override
>
> public void processElement(
>
> Object obj,
>
> Context ctx,
>
> Collector out) throws Exception {
>
> // TODO Auto-generated method stub
>
> Object current = state.value();
>
> if (current == null) {
>
> current = new Object();
>
> current.id=obj.id();
>
>
>
> }
>
> }
>
> El lun., 20 ago. 2018 a las 2:24, vino yang ()
> escribió:
>
>> Hi antonio,
>>
>> First, I suggest you use KeyedProcessFunction if you have an operation
>> similar to keyBy.
>> The implementation is similar to the Fixed window.
>> You can create three state collections to determine whether the time of
>> each element belongs to a state collection.
>> At the time of the trigger, the elements in the collection are evaluated.
>>
>> Thanks, vino.
>>
>> antonio saldivar  于2018年8月20日周一 上午11:54写道:
>>
>>> Thank you fro the references
>>>
>>> I have now my processFunction and getting the state but now how can i do
>>> for the threshold times to group the elements and also as this is a global
>>> window, how to purge because if going to keep increasing
>>>
>>> El dom., 19 ago. 2018 a las 8:57, vino yang ()
>>> escribió:
>>>
 Hi antonio,

 Regarding your scenario, I think maybe you can consider using the
 ProcessFunction (or keyed ProcessFunction) function directly on the Stream.
 [1]
 It can handle each of your elements with a Timer, and you can combine
 Flink's state API[2] to store your data.

 [1]:
 https://ci.apache.org/projects/flink/flink-docs-stable/dev/stream/operators/process_function.html#process-function-low-level-operations
 [2]:
 https://ci.apache.org/projects/flink/flink-docs-release-1.6/dev/stream/state/state.html#working-with-state

 Thanks, vino.

 antonio saldivar  于2018年8月19日周日 上午10:18写道:

> hi Vino
>
> it is possible to use global window, then set the trigger onElement
> comparing the element that has arrived with for example 10 mins, 20 mins
> and 60 mins of data?
>
> I have rules evaluating sum of amount for 10,20 or 60 mins for the
> same keyed element if the same id sum like $200 total within those
> thresholds and count more or equals to 3 I need to be able to set some
> values to the object if the object does not reach those thresholds i do 
> not
> set the values and keep sending the output with or without those value.
>
> just processing the object on the fly and send output
>
>
>
>
>
>
>
> El vie., 17 ago. 2018 a las 22:14, vino yang ()
> escribió:
>
>> Hi antonio,
>>
>> Yes, ProcessWindowFunction is a very low level window function.
>> It allows you to access the data in the window and allows you to
>> customize the output of the window.
>> So if you use it, while giving you flexibility, you need to think
>> about other things, which may require you to write more processing logic.
>>
>> Generally speaking, sliding windows usually have some data that is
>> repeated, but a common mode is to apply a reduce function on it to get 
>> your
>> calculation results.
>> If you only send data, there will definitely be some duplication.
>>
>> Thanks, vino.
>>
>> antonio saldivar  于2018年8月17日周五 下午12:01写道:
>>
>>> Hi Vino
>>> thank you for the information, actually I am using a trigger alert
>>> and processWindowFunction to send my results, but when my window slides 
>>> or
>>> ends it sends again the objects and I an getting duplicated data
>>>
>>> El jue., 16 ago. 2018 a las 22:05, vino yang ()
>>> escribió:
>>>
 Hi Antonio,

 What results do not you want to get when creating each window?
 Examples of the use of ProcessWindowFunction are included in many
 test files in Flink's project, such as SideOutputITCase.scala or
 WindowTranslationTest.scala.

 For more information on ProcessWindowFunction, you can refer to the
 official website.[1]

 [1]:

Re: Flink socketTextStream UDP connection

2018-08-20 Thread Soheil Pourbafrani
Thank you for the information.

On Mon, Aug 13, 2018 at 1:51 PM Fabian Hueske  wrote:

> Hi,
>
> ExecutionEnvironment.socketTextStream is deprecated and it is very likely
> that it will be removed because of its limited use.
> I would recommend to have at the implementation of the SourceFunction [1]
> and adapt it to your needs.
>
> Best, Fabian
>
> [1]
> https://github.com/apache/flink/blob/master/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/source/SocketTextStreamFunction.java
>
> 2018-08-13 10:04 GMT+02:00 Soheil Pourbafrani :
>
>> Flink socketTextStream received data using the TCP protocol. Is there any
>> way to get data using the UDP protocol?
>>
>
>
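
Since the built-in socket source only speaks TCP, a common approach for UDP is a
custom SourceFunction along these lines; a minimal sketch (not an adaptation of
SocketTextStreamFunction itself) that treats each datagram as one UTF-8 text
record, with a hypothetical port chosen by the caller:

import java.net.DatagramPacket;
import java.net.DatagramSocket;
import java.net.SocketException;
import java.nio.charset.StandardCharsets;

import org.apache.flink.streaming.api.functions.source.SourceFunction;

// Usage (illustrative): env.addSource(new UdpTextSource(9999)).
// Note: UDP cannot be replayed, so datagrams arriving while the job is down are lost.
public class UdpTextSource implements SourceFunction<String> {

    private final int port;
    private volatile boolean running = true;
    private transient DatagramSocket socket;

    public UdpTextSource(int port) {
        this.port = port;
    }

    @Override
    public void run(SourceContext<String> ctx) throws Exception {
        socket = new DatagramSocket(port);
        byte[] buffer = new byte[65507]; // maximum UDP payload size
        while (running) {
            DatagramPacket packet = new DatagramPacket(buffer, buffer.length);
            try {
                socket.receive(packet); // blocks until a datagram arrives
            } catch (SocketException e) {
                if (!running) {
                    break; // socket was closed by cancel()
                }
                throw e;
            }
            String line = new String(packet.getData(), 0, packet.getLength(), StandardCharsets.UTF_8);
            synchronized (ctx.getCheckpointLock()) {
                ctx.collect(line);
            }
        }
    }

    @Override
    public void cancel() {
        running = false;
        if (socket != null) {
            socket.close(); // unblocks the pending receive()
        }
    }
}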


Re: processWindowFunction

2018-08-20 Thread antonio saldivar
Hello

Thank you for the information. For some reason KeyedProcessFunction is not
found in my Flink version (1.4.2); I can only find ProcessFunction, so I
work with it like this:

public class TxnProcessFn extends ProcessFunction {

 public void open(Configuration parameters) throws Exception {

state1 = getRuntimeContext().getState(new ValueStateDescriptor<>("objState1",
Object.class));

state2 = getRuntimeContext().getState(new ValueStateDescriptor<>("objState2",
Object.class));

state3 = getRuntimeContext().getState(new ValueStateDescriptor<>("objState3",
Object.class));

}

@Override

public void processElement(

Object obj,

Context ctx,

Collector out) throws Exception {

// TODO Auto-generated method stub

Object current = state.value();

if (current == null) {

current = new Object();

current.id=obj.id();



}

}
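
The snippet above is missing its generic type parameters (likely stripped by the
archive) and reads a field named "state" that is never declared, so it will not
compile as shown. Below is a sketch of a compilable variant of the same idea,
assuming a hypothetical input of (id, amount) tuples keyed by id and keeping a
single piece of state; it also clears the state from a processing-time timer so
it does not grow without bound (the purge question raised earlier in the thread).
Plain ProcessFunction on a keyed stream is enough for this on 1.4.2:

import org.apache.flink.api.common.state.ValueState;
import org.apache.flink.api.common.state.ValueStateDescriptor;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.functions.ProcessFunction;
import org.apache.flink.util.Collector;

// Usage (illustrative): stream.keyBy(t -> t.f0).process(new TxnProcessFn());
public class TxnProcessFn extends ProcessFunction<Tuple2<String, Double>, Tuple2<String, Double>> {

    private static final long TTL_MS = 60 * 60 * 1000L; // keep per-key state for one hour

    private transient ValueState<Double> runningSum;

    @Override
    public void open(Configuration parameters) throws Exception {
        runningSum = getRuntimeContext().getState(
                new ValueStateDescriptor<>("runningSum", Double.class));
    }

    @Override
    public void processElement(Tuple2<String, Double> txn, Context ctx,
                               Collector<Tuple2<String, Double>> out) throws Exception {
        Double current = runningSum.value();
        if (current == null) {
            current = 0.0;
            // First element for this key: schedule a purge so the state is eventually dropped.
            ctx.timerService().registerProcessingTimeTimer(
                    ctx.timerService().currentProcessingTime() + TTL_MS);
        }
        current += txn.f1;
        runningSum.update(current);
        out.collect(Tuple2.of(txn.f0, current));
    }

    @Override
    public void onTimer(long timestamp, OnTimerContext ctx,
                        Collector<Tuple2<String, Double>> out) throws Exception {
        runningSum.clear(); // purge this key's state after the TTL
    }
}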

On Mon, Aug 20, 2018 at 2:24, vino yang ()
wrote:

> Hi antonio,
>
> First, I suggest you use KeyedProcessFunction if you have an operation
> similar to keyBy.
> The implementation is similar to the Fixed window.
> You can create three state collections to determine whether the time of
> each element belongs to a state collection.
> At the time of the trigger, the elements in the collection are evaluated.
>
> Thanks, vino.
>
> antonio saldivar  于2018年8月20日周一 上午11:54写道:
>
>> Thank you fro the references
>>
>> I have now my processFunction and getting the state but now how can i do
>> for the threshold times to group the elements and also as this is a global
>> window, how to purge because if going to keep increasing
>>
>> El dom., 19 ago. 2018 a las 8:57, vino yang ()
>> escribió:
>>
>>> Hi antonio,
>>>
>>> Regarding your scenario, I think maybe you can consider using the
>>> ProcessFunction (or keyed ProcessFunction) function directly on the Stream.
>>> [1]
>>> It can handle each of your elements with a Timer, and you can combine
>>> Flink's state API[2] to store your data.
>>>
>>> [1]:
>>> https://ci.apache.org/projects/flink/flink-docs-stable/dev/stream/operators/process_function.html#process-function-low-level-operations
>>> [2]:
>>> https://ci.apache.org/projects/flink/flink-docs-release-1.6/dev/stream/state/state.html#working-with-state
>>>
>>> Thanks, vino.
>>>
>>> antonio saldivar  于2018年8月19日周日 上午10:18写道:
>>>
 hi Vino

 it is possible to use global window, then set the trigger onElement
 comparing the element that has arrived with for example 10 mins, 20 mins
 and 60 mins of data?

 I have rules evaluating sum of amount for 10,20 or 60 mins for the same
 keyed element if the same id sum like $200 total within those thresholds
 and count more or equals to 3 I need to be able to set some values to the
 object if the object does not reach those thresholds i do not set the
 values and keep sending the output with or without those value.

 just processing the object on the fly and send output







 El vie., 17 ago. 2018 a las 22:14, vino yang ()
 escribió:

> Hi antonio,
>
> Yes, ProcessWindowFunction is a very low level window function.
> It allows you to access the data in the window and allows you to
> customize the output of the window.
> So if you use it, while giving you flexibility, you need to think
> about other things, which may require you to write more processing logic.
>
> Generally speaking, sliding windows usually have some data that is
> repeated, but a common mode is to apply a reduce function on it to get 
> your
> calculation results.
> If you only send data, there will definitely be some duplication.
>
> Thanks, vino.
>
> antonio saldivar  于2018年8月17日周五 下午12:01写道:
>
>> Hi Vino
>> thank you for the information, actually I am using a trigger alert
>> and processWindowFunction to send my results, but when my window slides 
>> or
>> ends it sends again the objects and I an getting duplicated data
>>
>> El jue., 16 ago. 2018 a las 22:05, vino yang ()
>> escribió:
>>
>>> Hi Antonio,
>>>
>>> What results do not you want to get when creating each window?
>>> Examples of the use of ProcessWindowFunction are included in many
>>> test files in Flink's project, such as SideOutputITCase.scala or
>>> WindowTranslationTest.scala.
>>>
>>> For more information on ProcessWindowFunction, you can refer to the
>>> official website.[1]
>>>
>>> [1]:
>>> https://ci.apache.org/projects/flink/flink-docs-release-1.6/dev/stream/operators/windows.html#processwindowfunction
>>>
>>> Thanks, vino.
>>>
>>> antonio saldivar  于2018年8月17日周五 上午6:24写道:
>>>
 Hello

 I am implementing a data stream where I use sliding windows but I
 am stuck because I need to set values to my object based on some if
 statements in my process function  and send the object to the next 

Re: What's the advantage of using BroadcastState?

2018-08-20 Thread Fabian Hueske
Hi,

I've recently published a blog post about Broadcast State [1].

Cheers,
Fabian

[1]
https://data-artisans.com/blog/a-practical-guide-to-broadcast-state-in-apache-flink
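
A minimal sketch of the pattern covered in that post, assuming Flink 1.5+ and a
hypothetical low-throughput stream of (ruleName, threshold) pairs broadcast
against a keyed stream of (key, amount) events. Note the asymmetry mentioned
later in this thread: the broadcast side gets read-write access to the broadcast
state, the keyed side only read access, which is what keeps the contents
identical across all parallel instances:

import org.apache.flink.api.common.state.MapStateDescriptor;
import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
import org.apache.flink.api.java.functions.KeySelector;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.datastream.BroadcastStream;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.co.KeyedBroadcastProcessFunction;
import org.apache.flink.util.Collector;

public class BroadcastStateExample {

    // Shared by the broadcast stream and the process function.
    static final MapStateDescriptor<String, Double> RULES = new MapStateDescriptor<>(
            "rules", BasicTypeInfo.STRING_TYPE_INFO, BasicTypeInfo.DOUBLE_TYPE_INFO);

    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        DataStream<Tuple2<String, Double>> events =
                env.fromElements(Tuple2.of("a", 120.0), Tuple2.of("a", 90.0), Tuple2.of("b", 10.0));
        DataStream<Tuple2<String, Double>> rules =
                env.fromElements(Tuple2.of("limit", 100.0));

        BroadcastStream<Tuple2<String, Double>> broadcastRules = rules.broadcast(RULES);

        events
            .keyBy(new KeySelector<Tuple2<String, Double>, String>() {
                @Override
                public String getKey(Tuple2<String, Double> value) {
                    return value.f0;
                }
            })
            .connect(broadcastRules)
            .process(new KeyedBroadcastProcessFunction<String, Tuple2<String, Double>, Tuple2<String, Double>, String>() {

                @Override
                public void processElement(Tuple2<String, Double> event, ReadOnlyContext ctx,
                                           Collector<String> out) throws Exception {
                    // Read-only access on the keyed (non-broadcast) side.
                    Double limit = ctx.getBroadcastState(RULES).get("limit");
                    if (limit != null && event.f1 > limit) {
                        out.collect("key " + event.f0 + " exceeded limit " + limit);
                    }
                }

                @Override
                public void processBroadcastElement(Tuple2<String, Double> rule, Context ctx,
                                                    Collector<String> out) throws Exception {
                    // Read-write access on the broadcast side; every parallel
                    // instance applies the same update.
                    ctx.getBroadcastState(RULES).put(rule.f0, rule.f1);
                }
            })
            .print();

        env.execute("broadcast-state-example");
    }
}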

2018-08-20 3:58 GMT+02:00 Paul Lam :

> Hi Rong, Hequn
>
> Your answers are very helpful! Thank you!
>
> Best Regards,
> Paul Lam
>
> 在 2018年8月19日,23:30,Rong Rong  写道:
>
> Hi Paul,
>
> To add to Hequn's answer. Broadcast state can typically be used as "a
> low-throughput stream containing a set of rules which we want to evaluate
> against all elements coming from another stream" [1]
> So to add to the difference list is: whether it is "broadcast" across all
> keys if processing a keyed stream. This is typically when it is not
> possible to derive same key field using KeySelector in CoStream.
> Another additional difference is performance: BroadcastStream is "stored
> locally and is used to process all incoming elements on the other stream"
> thus requires to carefully manage the size of the BroadcastStream.
>
> [1]: https://ci.apache.org/projects/flink/flink-docs-
> release-1.6/dev/stream/state/broadcast_state.html
>
> On Sun, Aug 19, 2018 at 1:40 AM Hequn Cheng  wrote:
>
>> Hi Paul,
>>
>> There are some differences:
>> 1. The BroadcastStream can broadcast data for you, i.e, data will be
>> broadcasted to all downstream tasks automatically.
>> 2. To guarantee that the contents in the Broadcast State are the same
>> across all parallel instances of our operator, read-write access is only
>> given to the broadcast side
>> 3. For BroadcastState, flink guarantees that upon restoring/rescaling
>> there will be no duplicates and no missing data. In case of recovery with
>> the same or smaller parallelism, each task reads its checkpointed state.
>> Upon scaling up, each task reads its own state, and the remaining tasks
>> (p_new-p_old) read checkpoints of previous tasks in a round-robin manner.
>> While MapState doesn't have such abilities.
>>
>> Best, Hequn
>>
>> On Sun, Aug 19, 2018 at 11:18 AM, Paul Lam  wrote:
>>
>>> Hi,
>>>
>>> AFAIK, the difference between a BroadcastStream and a normal DataStream
>>> is that the BroadcastStream is with a BroadcastState, but it seems that the
>>> functionality of BroadcastState can also be achieved by MapState in a
>>> CoMapFunction or something since the control stream is still broadcasted
>>> without being turned into BroadcastStream. So, I’m wondering what’s the
>>> advantage of using BroadcastState? Thanks a lot!
>>>
>>> Best Regards,
>>> Paul Lam
>>>
>>
>>
>


Re: processWindowFunction

2018-08-20 Thread vino yang
Hi antonio,

First, I suggest you use KeyedProcessFunction if you have an operation
similar to keyBy.
The implementation is similar to a fixed window:
you can create three state collections and, for each element, determine
which collection its timestamp belongs to.
When the timer fires, the elements in each collection are evaluated.

Thanks, vino.
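
A sketch of that structure, assuming Flink 1.5+ (for KeyedProcessFunction), a
hypothetical input of (id, amount) tuples keyed by id, and minute-granularity
buckets kept in a single MapState rather than three separate collections. A
processing-time timer evaluates the 10/20/60-minute sums mentioned earlier in
the thread (the $200 threshold is used as an example) and drops buckets older
than the largest window so the state stays bounded:

import java.util.ArrayList;
import java.util.List;
import java.util.Map;

import org.apache.flink.api.common.state.MapState;
import org.apache.flink.api.common.state.MapStateDescriptor;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.functions.KeyedProcessFunction;
import org.apache.flink.util.Collector;

// Per key: minute-bucketed sums in MapState, evaluated by processing-time timers.
// Usage (illustrative): stream.keyBy(t -> t.f0).process(new MultiWindowSumFn());
public class MultiWindowSumFn extends KeyedProcessFunction<String, Tuple2<String, Double>, String> {

    private static final long MINUTE = 60_000L;
    private static final double THRESHOLD = 200.0; // example threshold from the thread

    private transient MapState<Long, Double> buckets; // bucket start (ms) -> sum

    @Override
    public void open(Configuration parameters) throws Exception {
        buckets = getRuntimeContext().getMapState(
                new MapStateDescriptor<>("minuteBuckets", Long.class, Double.class));
    }

    @Override
    public void processElement(Tuple2<String, Double> txn, Context ctx,
                               Collector<String> out) throws Exception {
        long now = ctx.timerService().currentProcessingTime();
        long bucket = now - (now % MINUTE);
        Double sum = buckets.get(bucket);
        buckets.put(bucket, (sum == null ? 0.0 : sum) + txn.f1);
        // Evaluate (and clean up) once the current minute bucket closes;
        // repeated registrations for the same timestamp are de-duplicated.
        ctx.timerService().registerProcessingTimeTimer(bucket + MINUTE);
    }

    @Override
    public void onTimer(long timestamp, OnTimerContext ctx, Collector<String> out) throws Exception {
        double last10 = 0, last20 = 0, last60 = 0;
        List<Long> expired = new ArrayList<>();
        for (Map.Entry<Long, Double> e : buckets.entries()) {
            long age = timestamp - e.getKey();
            if (age <= 10 * MINUTE) last10 += e.getValue();
            if (age <= 20 * MINUTE) last20 += e.getValue();
            if (age <= 60 * MINUTE) last60 += e.getValue();
            else expired.add(e.getKey()); // older than the largest window
        }
        for (Long key : expired) {
            buckets.remove(key); // purge so the state does not keep growing
        }
        if (last10 >= THRESHOLD || last20 >= THRESHOLD || last60 >= THRESHOLD) {
            out.collect("threshold hit for key " + ctx.getCurrentKey()
                    + ": 10m=" + last10 + ", 20m=" + last20 + ", 60m=" + last60);
        }
    }
}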

antonio saldivar wrote on Mon, Aug 20, 2018 at 11:54 AM:

> Thank you fro the references
>
> I have now my processFunction and getting the state but now how can i do
> for the threshold times to group the elements and also as this is a global
> window, how to purge because if going to keep increasing
>
> El dom., 19 ago. 2018 a las 8:57, vino yang ()
> escribió:
>
>> Hi antonio,
>>
>> Regarding your scenario, I think maybe you can consider using the
>> ProcessFunction (or keyed ProcessFunction) function directly on the Stream.
>> [1]
>> It can handle each of your elements with a Timer, and you can combine
>> Flink's state API[2] to store your data.
>>
>> [1]:
>> https://ci.apache.org/projects/flink/flink-docs-stable/dev/stream/operators/process_function.html#process-function-low-level-operations
>> [2]:
>> https://ci.apache.org/projects/flink/flink-docs-release-1.6/dev/stream/state/state.html#working-with-state
>>
>> Thanks, vino.
>>
>> antonio saldivar  于2018年8月19日周日 上午10:18写道:
>>
>>> hi Vino
>>>
>>> it is possible to use global window, then set the trigger onElement
>>> comparing the element that has arrived with for example 10 mins, 20 mins
>>> and 60 mins of data?
>>>
>>> I have rules evaluating sum of amount for 10,20 or 60 mins for the same
>>> keyed element if the same id sum like $200 total within those thresholds
>>> and count more or equals to 3 I need to be able to set some values to the
>>> object if the object does not reach those thresholds i do not set the
>>> values and keep sending the output with or without those value.
>>>
>>> just processing the object on the fly and send output
>>>
>>>
>>>
>>>
>>>
>>>
>>>
>>> El vie., 17 ago. 2018 a las 22:14, vino yang ()
>>> escribió:
>>>
 Hi antonio,

 Yes, ProcessWindowFunction is a very low level window function.
 It allows you to access the data in the window and allows you to
 customize the output of the window.
 So if you use it, while giving you flexibility, you need to think about
 other things, which may require you to write more processing logic.

 Generally speaking, sliding windows usually have some data that is
 repeated, but a common mode is to apply a reduce function on it to get your
 calculation results.
 If you only send data, there will definitely be some duplication.

 Thanks, vino.

 antonio saldivar  于2018年8月17日周五 下午12:01写道:

> Hi Vino
> thank you for the information, actually I am using a trigger alert and
> processWindowFunction to send my results, but when my window slides or 
> ends
> it sends again the objects and I an getting duplicated data
>
> El jue., 16 ago. 2018 a las 22:05, vino yang ()
> escribió:
>
>> Hi Antonio,
>>
>> What results do not you want to get when creating each window?
>> Examples of the use of ProcessWindowFunction are included in many
>> test files in Flink's project, such as SideOutputITCase.scala or
>> WindowTranslationTest.scala.
>>
>> For more information on ProcessWindowFunction, you can refer to the
>> official website.[1]
>>
>> [1]:
>> https://ci.apache.org/projects/flink/flink-docs-release-1.6/dev/stream/operators/windows.html#processwindowfunction
>>
>> Thanks, vino.
>>
>> antonio saldivar  于2018年8月17日周五 上午6:24写道:
>>
>>> Hello
>>>
>>> I am implementing a data stream where I use sliding windows but I am
>>> stuck because I need to set values to my object based on some if 
>>> statements
>>> in my process function  and send the object to the next step but I don't
>>> want results every time a window is creating
>>>
>>> if anyone has a good example on this that can help me
>>>
>>