[jira] [Updated] (FLINK-15476) Update StreamingFileSink documentation -- bulk encoded writer now supports customized checkpoint policy

2021-06-22 Thread Ying Xu (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-15476?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Ying Xu updated FLINK-15476:

Priority: Minor  (was: Major)

> Update StreamingFileSink documentation -- bulk encoded writer now supports 
> customized checkpoint policy
> ---
>
> Key: FLINK-15476
> URL: https://issues.apache.org/jira/browse/FLINK-15476
> Project: Flink
>  Issue Type: Improvement
>  Components: Connectors / FileSystem, Documentation
>Affects Versions: 1.10.0
>Reporter: Ying Xu
>Priority: Minor
>  Labels: auto-unassigned, stale-major
>
> Per FLINK-13027, {{StreamingFileSink}}'s bulk encoded writer (created with 
> {{forBulkFormat}}) now supports customized checkpoint policies which roll 
> file at the checkpoint epoch. 
> The {{StreamingFileSink}} documentation needs to be updated accordingly. 



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Commented] (FLINK-15476) Update StreamingFileSink documentation -- bulk encoded writer now supports customized checkpoint policy

2020-02-07 Thread Ying Xu (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-15476?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17032198#comment-17032198
 ] 

Ying Xu commented on FLINK-15476:
-

Thanks for asking [~kkl0u]. I was busy with a few other work items and would 
like to take it on in the next few days. Thanks!

> Update StreamingFileSink documentation -- bulk encoded writer now supports 
> customized checkpoint policy
> ---
>
> Key: FLINK-15476
> URL: https://issues.apache.org/jira/browse/FLINK-15476
> Project: Flink
>  Issue Type: Task
>  Components: Connectors / FileSystem, Documentation
>Affects Versions: 1.10.0
>Reporter: Ying Xu
>Assignee: Ying Xu
>Priority: Major
>
> Per FLINK-13027, {{StreamingFileSink}}'s bulk encoded writer (created with 
> {{forBulkFormat}}) now supports customized checkpoint policies which roll 
> file at the checkpoint epoch. 
> The {{StreamingFileSink}} documentation needs to be updated accordingly. 





[jira] [Commented] (FLINK-15476) Update StreamingFileSink documentation -- bulk encoded writer now supports customized checkpoint policy

2020-01-16 Thread Ying Xu (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-15476?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17017522#comment-17017522
 ] 

Ying Xu commented on FLINK-15476:
-

Hi [~kkl0u], is it OK to pursue this Jira based on [the 
comment|https://github.com/apache/flink/pull/10653#issuecomment-568616531] in 
FLINK-13027?

Thanks!

> Update StreamingFileSink documentation -- bulk encoded writer now supports 
> customized checkpoint policy
> ---
>
> Key: FLINK-15476
> URL: https://issues.apache.org/jira/browse/FLINK-15476
> Project: Flink
>  Issue Type: Task
>  Components: Documentation
>Reporter: Ying Xu
>Priority: Major
>
> Per FLINK-13027, {{StreamingFileSink}}'s bulk encoded writer (created with 
> {{forBulkFormat}}) now supports customized checkpoint policies which roll 
> file at the checkpoint epoch. 
> The {{StreamingFileSink}} documentation needs to be updated accordingly. 





[jira] [Updated] (FLINK-15476) Update StreamingFileSink documentation -- bulk encoded writer now supports customized checkpoint policy

2020-01-03 Thread Ying Xu (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-15476?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Ying Xu updated FLINK-15476:

Component/s: Documentation
Description: 
Per FLINK-13027, {{StreamingFileSink}}'s bulk encoded writer (created with 
{{forBulkFormat}}) now supports customized checkpoint policies which roll 
files at checkpoint epochs. 

The {{StreamingFileSink}} documentation needs to be updated accordingly. 
 Issue Type: Task  (was: Improvement)

> Update StreamingFileSink documentation -- bulk encoded writer now supports 
> customized checkpoint policy
> ---
>
> Key: FLINK-15476
> URL: https://issues.apache.org/jira/browse/FLINK-15476
> Project: Flink
>  Issue Type: Task
>  Components: Documentation
>Reporter: Ying Xu
>Priority: Major
>
> Per FLINK-13027, {{StreamingFileSink}}'s bulk encoded writer (created with 
> {{forBulkFormat}}) now supports customized checkpoint policies which roll 
> file at the checkpoint epoch. 
> The {{StreamingFileSink}} documentation needs to be updated accordingly. 





[jira] [Created] (FLINK-15476) Update StreamingFileSink documentation -- bulk encoded writer now supports customized checkpoint policy

2020-01-03 Thread Ying Xu (Jira)
Ying Xu created FLINK-15476:
---

 Summary: Update StreamingFileSink documentation -- bulk encoded 
writer now supports customized checkpoint policy
 Key: FLINK-15476
 URL: https://issues.apache.org/jira/browse/FLINK-15476
 Project: Flink
  Issue Type: Improvement
Reporter: Ying Xu








[jira] [Comment Edited] (FLINK-15301) Flink Kinesis AsyncRecordEmitter needs to handle unchecked exception gracefully

2019-12-17 Thread Ying Xu (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-15301?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16998418#comment-16998418
 ] 

Ying Xu edited comment on FLINK-15301 at 12/17/19 5:45 PM:
---

Example stacktrace
{code:java}
org.apache.flink.streaming.runtime.tasks.ExceptionInChainedOperatorException: Could not forward element to next operator
	at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.pushToOperator(OperatorChain.java:596)
	at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:554)
	at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:534)
	at org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:718)
	at org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:696)
	at org.apache.flink.streaming.api.operators.StreamSourceContexts$ManualWatermarkContext.processAndCollectWithTimestamp(StreamSourceContexts.java:310)
	at org.apache.flink.streaming.api.operators.StreamSourceContexts$WatermarkContext.collectWithTimestamp(StreamSourceContexts.java:409)
	at org.apache.flink.streaming.connectors.kinesis.internals.KinesisDataFetcher.emitRecordAndUpdateState(KinesisDataFetcher.java:772)
	at org.apache.flink.streaming.connectors.kinesis.internals.KinesisDataFetcher.access$000(KinesisDataFetcher.java:91)
	at org.apache.flink.streaming.connectors.kinesis.internals.KinesisDataFetcher$AsyncKinesisRecordEmitter.emit(KinesisDataFetcher.java:272)
	at org.apache.flink.streaming.connectors.kinesis.internals.KinesisDataFetcher$AsyncKinesisRecordEmitter.emit(KinesisDataFetcher.java:260)
	at org.apache.flink.streaming.connectors.kinesis.util.RecordEmitter.run(RecordEmitter.java:230)
	at java.lang.Thread.run(Thread.java:748)
...{code}


was (Author: yxu-apache):
Example stacktrace
{code:java}
05:39:32.393 INFO o.a.f.f.s.c.w.S3Committer - Committing f/event_name=dynamic_translation_missing/ds=2019-12-15/hr=05/part-450-96064 with MPU ID y5e5QQ3kTzi7TkuEirSdr.enmM7GaIkIxvjVRqIT0kXaMSPzhVQKLRu3vQuzZ.oziFr2vuXXGXEThkpsOiCCV17UQxGm5AtgDqslw33uc1aTHyIKFQXwNRpQCZCttZ_AxcLbltEyjd8m7ea15Bhf.A--
Exception in thread "recordEmitter-Source: json-events_source -> json-events_flatten_json_to_persistermessage -> json-events_local_record_count (188/512)" org.apache.flink.streaming.runtime.tasks.ExceptionInChainedOperatorException: Could not forward element to next operator
	at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.pushToOperator(OperatorChain.java:596)
	at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:554)
	at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:534)
	at org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:718)
	at org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:696)
	at org.apache.flink.streaming.api.operators.StreamSourceContexts$ManualWatermarkContext.processAndCollectWithTimestamp(StreamSourceContexts.java:310)
	at org.apache.flink.streaming.api.operators.StreamSourceContexts$WatermarkContext.collectWithTimestamp(StreamSourceContexts.java:409)
	at org.apache.flink.streaming.connectors.kinesis.internals.KinesisDataFetcher.emitRecordAndUpdateState(KinesisDataFetcher.java:772)
	at org.apache.flink.streaming.connectors.kinesis.internals.KinesisDataFetcher.access$000(KinesisDataFetcher.java:91)
	at org.apache.flink.streaming.connectors.kinesis.internals.KinesisDataFetcher$AsyncKinesisRecordEmitter.emit(KinesisDataFetcher.java:272)
	at org.apache.flink.streaming.connectors.kinesis.internals.KinesisDataFetcher$AsyncKinesisRecordEmitter.emit(KinesisDataFetcher.java:260)
	at org.apache.flink.streaming.connectors.kinesis.util.RecordEmitter.run(RecordEmitter.java:230)
	at java.lang.Thread.run(Thread.java:748)
...{code}

> Flink Kinesis AsyncRecordEmitter needs to handle unchecked exception 
> gracefully
> ---
>
> Key: FLINK-15301
> URL: https://issues.apache.org/jira/browse/FLINK-15301
> Project: Flink
>  Issue Type: Improvement
>  Components: Connectors / Kinesis
>Reporter: Ying Xu
>Priority: Major
>
> Currently, any runtime exception encountered inside the 
> `AsyncRecordEmitter.emitRecordAndUpdateState()` function can cause the 
> thread to exit silently. The Flink job would continue to run, but the stopped 

[jira] [Commented] (FLINK-15301) Flink Kinesis AsyncRecordEmitter needs to handle unchecked exception gracefully

2019-12-17 Thread Ying Xu (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-15301?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16998418#comment-16998418
 ] 

Ying Xu commented on FLINK-15301:
-

Example stacktrace
{code:java}
05:39:32.393 INFO o.a.f.f.s.c.w.S3Committer - Committing f/event_name=dynamic_translation_missing/ds=2019-12-15/hr=05/part-450-96064 with MPU ID y5e5QQ3kTzi7TkuEirSdr.enmM7GaIkIxvjVRqIT0kXaMSPzhVQKLRu3vQuzZ.oziFr2vuXXGXEThkpsOiCCV17UQxGm5AtgDqslw33uc1aTHyIKFQXwNRpQCZCttZ_AxcLbltEyjd8m7ea15Bhf.A--
Exception in thread "recordEmitter-Source: json-events_source -> json-events_flatten_json_to_persistermessage -> json-events_local_record_count (188/512)" org.apache.flink.streaming.runtime.tasks.ExceptionInChainedOperatorException: Could not forward element to next operator
	at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.pushToOperator(OperatorChain.java:596)
	at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:554)
	at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:534)
	at org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:718)
	at org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:696)
	at org.apache.flink.streaming.api.operators.StreamSourceContexts$ManualWatermarkContext.processAndCollectWithTimestamp(StreamSourceContexts.java:310)
	at org.apache.flink.streaming.api.operators.StreamSourceContexts$WatermarkContext.collectWithTimestamp(StreamSourceContexts.java:409)
	at org.apache.flink.streaming.connectors.kinesis.internals.KinesisDataFetcher.emitRecordAndUpdateState(KinesisDataFetcher.java:772)
	at org.apache.flink.streaming.connectors.kinesis.internals.KinesisDataFetcher.access$000(KinesisDataFetcher.java:91)
	at org.apache.flink.streaming.connectors.kinesis.internals.KinesisDataFetcher$AsyncKinesisRecordEmitter.emit(KinesisDataFetcher.java:272)
	at org.apache.flink.streaming.connectors.kinesis.internals.KinesisDataFetcher$AsyncKinesisRecordEmitter.emit(KinesisDataFetcher.java:260)
	at org.apache.flink.streaming.connectors.kinesis.util.RecordEmitter.run(RecordEmitter.java:230)
	at java.lang.Thread.run(Thread.java:748)
...{code}

> Flink Kinesis AsyncRecordEmitter needs to handle unchecked exception 
> gracefully
> ---
>
> Key: FLINK-15301
> URL: https://issues.apache.org/jira/browse/FLINK-15301
> Project: Flink
>  Issue Type: Improvement
>  Components: Connectors / Kinesis
>Reporter: Ying Xu
>Priority: Major
>
> Currently, any runtime exception encountered inside the 
> `AsyncRecordEmitter.emitRecordAndUpdateState()` function can cause the 
> thread to exit silently. The Flink job would continue to run, but the stopped 
> record emitter would subsequently cause Kinesis data consumption to stall. 
>  
> The AsyncRecordEmitter needs to catch unchecked exceptions, log the errors, 
> and perhaps trigger a job restart. 





[jira] [Created] (FLINK-15301) Flink Kinesis AsyncRecordEmitter needs to handle unchecked exception gracefully

2019-12-17 Thread Ying Xu (Jira)
Ying Xu created FLINK-15301:
---

 Summary: Flink Kinesis AsyncRecordEmitter needs to handle 
unchecked exception gracefully
 Key: FLINK-15301
 URL: https://issues.apache.org/jira/browse/FLINK-15301
 Project: Flink
  Issue Type: Improvement
  Components: Connectors / Kinesis
Reporter: Ying Xu


Currently, any runtime exception encountered inside the 
`AsyncRecordEmitter.emitRecordAndUpdateState()` function can cause the thread 
to exit silently. The Flink job would continue to run, but the stopped record 
emitter would subsequently cause Kinesis data consumption to stall.

The AsyncRecordEmitter needs to catch unchecked exceptions, log the errors, 
and perhaps trigger a job restart.
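The fix described above can be sketched as a guard around the emitter loop: catch unchecked exceptions (any `Throwable`), record them, and let the fetcher thread rethrow instead of letting the emitter die silently. This is a standalone illustration, not the actual flink-connector-kinesis code; the class and method names below are hypothetical.

```java
import java.util.concurrent.atomic.AtomicReference;

/** Hypothetical sketch of a guarded emitter thread. */
public final class SafeEmit {
    /**
     * Runs the emit loop on its own thread; any unchecked exception is
     * captured and returned so the caller (the fetcher) can fail the job,
     * instead of the thread exiting silently.
     */
    public static Throwable runGuarded(Runnable emitLoop) {
        AtomicReference<Throwable> error = new AtomicReference<>();
        Thread emitter = new Thread(() -> {
            try {
                emitLoop.run();
            } catch (Throwable t) {      // also catches RuntimeException etc.
                error.set(t);            // surface instead of swallowing
            }
        }, "recordEmitter");
        emitter.start();
        try {
            emitter.join();
        } catch (InterruptedException ie) {
            Thread.currentThread().interrupt();
        }
        return error.get();              // non-null => caller should rethrow
    }
}
```

In the real connector the captured error would be rethrown from the fetcher's main loop so the Flink job restarts rather than stalling.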





[jira] [Created] (FLINK-14039) Flink Kinesis consumer: configurable per-shard consumption rate when running in adaptive mode

2019-09-10 Thread Ying Xu (Jira)
Ying Xu created FLINK-14039:
---

 Summary: Flink Kinesis consumer: configurable per-shard 
consumption rate when running in adaptive mode
 Key: FLINK-14039
 URL: https://issues.apache.org/jira/browse/FLINK-14039
 Project: Flink
  Issue Type: Improvement
  Components: Connectors / Kinesis
Reporter: Ying Xu


Currently, the Flink Kinesis connector has a fixed 
[2MB|https://github.com/apache/flink/blob/78748ea1aee8f9d0c0499180a2ef455490b32b24/flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/internals/ShardConsumer.java#L59-L61]
 per-shard target rate when running in adaptive rate mode. In some scenarios, 
users may want a different target rate. For example, when two Kinesis 
consumers share a common stream, the user may want to de-prioritize one of 
them so that it runs with a target rate < 2MB. 

It is relatively straightforward to implement this feature: simply add a 
per-shard target rate consumer config with the default set to 2MB. 
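A minimal sketch of such a config knob might look like the following. The property key name here is hypothetical (the connector's real keys live in its `ConsumerConfigConstants`); only the default of 2MB comes from the issue.

```java
import java.util.Properties;

/** Sketch of a configurable per-shard target rate with a 2MB default. */
public final class ShardRateConfig {
    // Hypothetical key name, for illustration only.
    static final String KEY = "flink.shard.adaptivereads.targetrate.bytespersec";
    static final long DEFAULT_TARGET_RATE = 2L * 1024 * 1024; // current fixed 2 MB/sec

    /** Reads the per-shard target rate, falling back to the 2MB default. */
    static long targetRate(Properties config) {
        return Long.parseLong(
            config.getProperty(KEY, Long.toString(DEFAULT_TARGET_RATE)));
    }
}
```

The adaptive-rate loop would then use `targetRate(config)` wherever it currently uses the hard-coded constant.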

 

 





[jira] [Commented] (FLINK-13864) StreamingFileSink: Allow inherited classes to extend StreamingFileSink correctly

2019-08-30 Thread Ying Xu (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-13864?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16919902#comment-16919902
 ] 

Ying Xu commented on FLINK-13864:
-

Hi [~kkl0u], this is Ying; I have been working with [~kailashhd] on this 
feature. I posted a PR [https://github.com/apache/flink/pull/9581] where you 
can find more details.

We've tested a customized StreamingFileSink built on top of the new interface, 
and it works fine. Would love your comments there. 

> StreamingFileSink: Allow inherited classes to extend StreamingFileSink 
> correctly
> 
>
> Key: FLINK-13864
> URL: https://issues.apache.org/jira/browse/FLINK-13864
> Project: Flink
>  Issue Type: Improvement
>  Components: Connectors / FileSystem
>Reporter: Kailash Hassan Dayanand
>Priority: Minor
>  Labels: pull-request-available
>  Time Spent: 10m
>  Remaining Estimate: 0h
>
> Currently the StreamingFileSink can't be extended correctly; there are a few 
> issues with the [PR|https://github.com/apache/flink/pull/8469] merged for 
> this [Jira|https://issues.apache.org/jira/browse/FLINK-12539]
> Mailing list discussion: 
> [http://mail-archives.apache.org/mod_mbox/flink-dev/201908.mbox/%3CCACGLQUAxXjr2mBOf-6hbXcwmWoH5ib_0YEy-Vyjj%3DEPyQ25Qiw%40mail.gmail.com%3E]
>  





[jira] [Assigned] (FLINK-13027) StreamingFileSink bulk-encoded writer supports file rolling upon customized events

2019-07-03 Thread Ying Xu (JIRA)


 [ 
https://issues.apache.org/jira/browse/FLINK-13027?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Ying Xu reassigned FLINK-13027:
---

Assignee: Ying Xu

> StreamingFileSink bulk-encoded writer supports file rolling upon customized 
> events
> --
>
> Key: FLINK-13027
> URL: https://issues.apache.org/jira/browse/FLINK-13027
> Project: Flink
>  Issue Type: New Feature
>  Components: API / DataStream
>Reporter: Ying Xu
>Assignee: Ying Xu
>Priority: Major
>
> When writing in a bulk-encoded format such as Parquet, StreamingFileSink only 
> supports OnCheckpointRollingPolicy, which rolls files at checkpoint time. 
>  
> In many scenarios, it is beneficial for the sink to roll files upon certain 
> events, for example, when the file size reaches a limit. Such a rolling 
> policy can also alleviate some of the side effects of 
> OnCheckpointRollingPolicy, e.g., that most of the heavy lifting, including 
> file uploading, happens at checkpoint time. 
> Specifically, this Jira calls for a new rolling policy that rolls files: 
>  # whenever a customized event happens, e.g., the file size reaches a 
> certain limit; 
>  # whenever a checkpoint happens. This is needed to provide exactly-once 
> guarantees when writing bulk-encoded files. 
> Users of this rolling policy need to be aware that a customized event and 
> the next checkpoint epoch may be close to each other, and thus may yield a 
> tiny file per checkpoint in the worst case. 
>  





[jira] [Assigned] (FLINK-13027) StreamingFileSink bulk-encoded writer supports file rolling upon customized events

2019-06-28 Thread Ying Xu (JIRA)


 [ 
https://issues.apache.org/jira/browse/FLINK-13027?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Ying Xu reassigned FLINK-13027:
---

Assignee: (was: Ying Xu)

> StreamingFileSink bulk-encoded writer supports file rolling upon customized 
> events
> --
>
> Key: FLINK-13027
> URL: https://issues.apache.org/jira/browse/FLINK-13027
> Project: Flink
>  Issue Type: New Feature
>  Components: API / DataStream
>Reporter: Ying Xu
>Priority: Major
>
> When writing in a bulk-encoded format such as Parquet, StreamingFileSink only 
> supports OnCheckpointRollingPolicy, which rolls files at checkpoint time. 
>  
> In many scenarios, it is beneficial for the sink to roll files upon certain 
> events, for example, when the file size reaches a limit. Such a rolling 
> policy can also alleviate some of the side effects of 
> OnCheckpointRollingPolicy, e.g., that most of the heavy lifting, including 
> file uploading, happens at checkpoint time. 
> Specifically, this Jira calls for a new rolling policy that rolls files: 
>  # whenever a customized event happens, e.g., the file size reaches a 
> certain limit; 
>  # whenever a checkpoint happens. This is needed to provide exactly-once 
> guarantees when writing bulk-encoded files. 
> Users of this rolling policy need to be aware that a customized event and 
> the next checkpoint epoch may be close to each other, and thus may yield a 
> tiny file per checkpoint in the worst case. 
>  





[jira] [Assigned] (FLINK-13027) StreamingFileSink bulk-encoded writer supports file rolling upon customized events

2019-06-28 Thread Ying Xu (JIRA)


 [ 
https://issues.apache.org/jira/browse/FLINK-13027?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Ying Xu reassigned FLINK-13027:
---

Assignee: Ying Xu

> StreamingFileSink bulk-encoded writer supports file rolling upon customized 
> events
> --
>
> Key: FLINK-13027
> URL: https://issues.apache.org/jira/browse/FLINK-13027
> Project: Flink
>  Issue Type: New Feature
>  Components: API / DataStream
>Reporter: Ying Xu
>Assignee: Ying Xu
>Priority: Major
>
> When writing in a bulk-encoded format such as Parquet, StreamingFileSink only 
> supports OnCheckpointRollingPolicy, which rolls files at checkpoint time. 
>  
> In many scenarios, it is beneficial for the sink to roll files upon certain 
> events, for example, when the file size reaches a limit. Such a rolling 
> policy can also alleviate some of the side effects of 
> OnCheckpointRollingPolicy, e.g., that most of the heavy lifting, including 
> file uploading, happens at checkpoint time. 
> Specifically, this Jira calls for a new rolling policy that rolls files: 
>  # whenever a customized event happens, e.g., the file size reaches a 
> certain limit; 
>  # whenever a checkpoint happens. This is needed to provide exactly-once 
> guarantees when writing bulk-encoded files. 
> Users of this rolling policy need to be aware that a customized event and 
> the next checkpoint epoch may be close to each other, and thus may yield a 
> tiny file per checkpoint in the worst case. 
>  





[jira] [Created] (FLINK-13027) StreamingFileSink bulk-encoded writer supports file rolling upon customized events

2019-06-28 Thread Ying Xu (JIRA)
Ying Xu created FLINK-13027:
---

 Summary: StreamingFileSink bulk-encoded writer supports file 
rolling upon customized events
 Key: FLINK-13027
 URL: https://issues.apache.org/jira/browse/FLINK-13027
 Project: Flink
  Issue Type: New Feature
  Components: API / DataStream
Reporter: Ying Xu


When writing in a bulk-encoded format such as Parquet, StreamingFileSink only 
supports OnCheckpointRollingPolicy, which rolls files at checkpoint time.

In many scenarios, it is beneficial for the sink to roll files upon certain 
events, for example, when the file size reaches a limit. Such a rolling policy 
can also alleviate some of the side effects of OnCheckpointRollingPolicy, 
e.g., that most of the heavy lifting, including file uploading, happens at 
checkpoint time.

Specifically, this Jira calls for a new rolling policy that rolls files: 
 # whenever a customized event happens, e.g., the file size reaches a certain 
limit; 
 # whenever a checkpoint happens. This is needed to provide exactly-once 
guarantees when writing bulk-encoded files. 

Users of this rolling policy need to be aware that a customized event and the 
next checkpoint epoch may be close to each other, and thus may yield a tiny 
file per checkpoint in the worst case. 
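The two-trigger rule above can be modeled in a few lines. In Flink the actual hook would be the sink's rolling-policy interface; this standalone class (with illustrative names) just captures the decision logic: roll on a customized event such as a size limit, and always roll on checkpoint.

```java
/** Standalone model of the proposed rolling rule; names are illustrative. */
public final class RollDecision {
    private final long maxPartSizeBytes;

    public RollDecision(long maxPartSizeBytes) {
        this.maxPartSizeBytes = maxPartSizeBytes;
    }

    /** Customized event: the in-progress part file reached the size limit. */
    public boolean shouldRollOnEvent(long currentPartSizeBytes) {
        return currentPartSizeBytes >= maxPartSizeBytes;
    }

    /** Checkpoint epoch: always roll, to keep bulk formats exactly-once. */
    public boolean shouldRollOnCheckpoint() {
        return true;
    }
}
```

Because `shouldRollOnCheckpoint()` always returns true, the worst case noted above follows directly: a size-triggered roll just before a checkpoint still forces a second, tiny file at the checkpoint.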

 





[jira] [Commented] (FLINK-4582) Allow FlinkKinesisConsumer to adapt for AWS DynamoDB Streams

2019-02-04 Thread Ying Xu (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-4582?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16760284#comment-16760284
 ] 

Ying Xu commented on FLINK-4582:


Thanks [~tinder-dthomson]. Internally we use the *new ObjectMapper()* 
initialization but didn't observe a similar issue. We may, however, be running 
an older Flink distribution.

Yes, please report this as a separate bug and perhaps attach the full stack 
trace. If you already have a fix, feel free to post the PR as well. 

> Allow FlinkKinesisConsumer to adapt for AWS DynamoDB Streams
> 
>
> Key: FLINK-4582
> URL: https://issues.apache.org/jira/browse/FLINK-4582
> Project: Flink
>  Issue Type: New Feature
>  Components: Kinesis Connector, Streaming Connectors
>Reporter: Tzu-Li (Gordon) Tai
>Assignee: Ying Xu
>Priority: Major
>  Labels: pull-request-available
> Fix For: 1.8.0
>
>  Time Spent: 10m
>  Remaining Estimate: 0h
>
> AWS DynamoDB is a NoSQL database service that has a CDC-like (change data 
> capture) feature called DynamoDB Streams 
> (http://docs.aws.amazon.com/amazondynamodb/latest/developerguide/Streams.html),
>  which is a stream feed of item-level table activities.
> The DynamoDB Streams shard abstraction follows that of Kinesis Streams with 
> only a slight difference in resharding behaviours, so it is possible to build 
> on the internals of our Flink Kinesis Consumer for an exactly-once DynamoDB 
> Streams source.
> I propose an API something like this:
> {code}
> DataStream dynamoItemsCdc = 
>   FlinkKinesisConsumer.asDynamoDBStream(tableNames, schema, config)
> {code}
> The feature adds more connectivity to popular AWS services for Flink, and 
> combining what Flink has for exactly-once semantics, out-of-core state 
> backends, and queryable state with CDC can have very strong use cases. For 
> this feature there should only be an extra dependency to the AWS Java SDK for 
> DynamoDB, which has Apache License 2.0.





[jira] [Commented] (FLINK-4582) Allow FlinkKinesisConsumer to adapt for AWS DynamoDB Streams

2018-11-01 Thread Ying Xu (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-4582?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16671216#comment-16671216
 ] 

Ying Xu commented on FLINK-4582:


[~tinder-dthomson] I posted the PR here: 
https://github.com/apache/flink/pull/6968

> Allow FlinkKinesisConsumer to adapt for AWS DynamoDB Streams
> 
>
> Key: FLINK-4582
> URL: https://issues.apache.org/jira/browse/FLINK-4582
> Project: Flink
>  Issue Type: New Feature
>  Components: Kinesis Connector, Streaming Connectors
>Reporter: Tzu-Li (Gordon) Tai
>Assignee: Ying Xu
>Priority: Major
>  Labels: pull-request-available
>
> AWS DynamoDB is a NoSQL database service that has a CDC-like (change data 
> capture) feature called DynamoDB Streams 
> (http://docs.aws.amazon.com/amazondynamodb/latest/developerguide/Streams.html),
>  which is a stream feed of item-level table activities.
> The DynamoDB Streams shard abstraction follows that of Kinesis Streams with 
> only a slight difference in resharding behaviours, so it is possible to build 
> on the internals of our Flink Kinesis Consumer for an exactly-once DynamoDB 
> Streams source.
> I propose an API something like this:
> {code}
> DataStream dynamoItemsCdc = 
>   FlinkKinesisConsumer.asDynamoDBStream(tableNames, schema, config)
> {code}
> The feature adds more connectivity to popular AWS services for Flink, and 
> combining what Flink has for exactly-once semantics, out-of-core state 
> backends, and queryable state with CDC can have very strong use cases. For 
> this feature there should only be an extra dependency to the AWS Java SDK for 
> DynamoDB, which has Apache License 2.0.





[jira] [Commented] (FLINK-4582) Allow FlinkKinesisConsumer to adapt for AWS DynamoDB Streams

2018-10-29 Thread Ying Xu (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-4582?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16667569#comment-16667569
 ] 

Ying Xu commented on FLINK-4582:


Thanks [~tinder-dthomson] for the detailed comments. Yes, that's exactly why I 
felt _efficient multi-stream_ support is somewhat lacking :).

Actually, we are running Flink 1.5.2 internally. For contributing upstream, 
I'm currently adapting the patch to fit master (1.7-SNAPSHOT). The main 
difference is that the Flink 1.7 Kinesis connector uses the _listShards_ API 
to retrieve the shard list. For DynamoDB Streams, we must use the 
_describeStream_ API to retrieve that information, since _listShards_ is not 
supported. I am currently porting the related _describeStream_ logic from 
Flink 1.5 to my patch. I should be able to post a meaningful PR in 1-2 days.  
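The describeStream-style discovery mentioned here boils down to paging through shards while passing the last seen shard id as the exclusive start of the next page. The sketch below is a standalone model of that loop: the `Pager` interface stands in for the AWS `DescribeStream` call and is not an SDK type.

```java
import java.util.ArrayList;
import java.util.List;

/** Standalone model of paged shard discovery via a describeStream-like call. */
public final class ShardDiscovery {
    /** Stand-in for DescribeStream: returns shards strictly after the given
     *  shard id (null = from the beginning); empty list when exhausted. */
    interface Pager {
        List<String> page(String exclusiveStartShardId);
    }

    /** Collects all shards after {@code lastSeenShardId}, page by page. */
    static List<String> discoverFrom(Pager pager, String lastSeenShardId) {
        List<String> shards = new ArrayList<>();
        String cursor = lastSeenShardId;
        while (true) {
            List<String> page = pager.page(cursor);
            if (page.isEmpty()) {
                break;                            // no more shards
            }
            shards.addAll(page);
            cursor = page.get(page.size() - 1);   // advance the exclusive start
        }
        return shards;
    }
}
```

Seeding the loop with the last seen shard id is what lets the consumer pick up new child shards after a resharding, per the assumption discussed in this thread.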

> Allow FlinkKinesisConsumer to adapt for AWS DynamoDB Streams
> 
>
> Key: FLINK-4582
> URL: https://issues.apache.org/jira/browse/FLINK-4582
> Project: Flink
>  Issue Type: New Feature
>  Components: Kinesis Connector, Streaming Connectors
>Reporter: Tzu-Li (Gordon) Tai
>Assignee: Ying Xu
>Priority: Major
>
> AWS DynamoDB is a NoSQL database service that has a CDC-like (change data 
> capture) feature called DynamoDB Streams 
> (http://docs.aws.amazon.com/amazondynamodb/latest/developerguide/Streams.html),
>  which is a stream feed of item-level table activities.
> The DynamoDB Streams shard abstraction follows that of Kinesis Streams with 
> only a slight difference in resharding behaviours, so it is possible to build 
> on the internals of our Flink Kinesis Consumer for an exactly-once DynamoDB 
> Streams source.
> I propose an API something like this:
> {code}
> DataStream dynamoItemsCdc = 
>   FlinkKinesisConsumer.asDynamoDBStream(tableNames, schema, config)
> {code}
> The feature adds more connectivity to popular AWS services for Flink, and 
> combining what Flink has for exactly-once semantics, out-of-core state 
> backends, and queryable state with CDC can have very strong use cases. For 
> this feature there should only be an extra dependency to the AWS Java SDK for 
> DynamoDB, which has Apache License 2.0.





[jira] [Commented] (FLINK-4582) Allow FlinkKinesisConsumer to adapt for AWS DynamoDB Streams

2018-10-26 Thread Ying Xu (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-4582?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16665527#comment-16665527
 ] 

Ying Xu commented on FLINK-4582:


Hi [~tinder-dthomson], thanks for raising this issue, and sorry for the delay 
in responding to the original request.

We actually implemented a version of the flink-dynamodbstreams connector on 
top of the existing flink-kinesis connector. The work is currently in 
production and was presented at a meetup event back in September. I hadn't had 
a chance to contribute it back because of other work priorities, my bad!

I looked at your PR. The use of _DynamodbProxy.getShardList()_ is interesting. 
We took a slightly different approach, which plugs a dynamodbstreams-kinesis 
adapter object into KinesisProxy and makes it an equivalent _DynamodbProxy_ 
(the approach mentioned in another thread, titled *Consuming data from 
dynamoDB streams to flink*). We rely on the assumption that during 
re-sharding, one can retrieve all the new child shard ids by passing the last 
seen shard id. Although DynamoDB Streams does not officially guarantee this, 
we have consistently observed this behavior in production during resharding.

Another benefit of directly embedding a dynamodbstreams-kinesis adapter is 
that it allows *ONE* source (consumer) to consume from multiple data streams 
(which is important for our use cases), plus it reuses other error handling in 
the existing KinesisProxy. I agree that if _DynamodbProxy_ provides an 
_efficient multi-stream_ implementation, it is an interesting direction to 
look into.

If you can wait a few days, I can adapt my PR on top of OSS Flink and post it 
by early next week. We can discuss more then. What do you think?

Thank you very much!
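The shard-discovery assumption described above (passing the last seen shard ID returns every newer child shard) can be illustrated with a small stand-in. Everything here is hypothetical: `describeStreamAfter` and the shard list model the assumption only; a real adapter would call the AWS DescribeStream API with an exclusive start shard ID.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

// Illustrative model of the assumption only -- not the AWS SDK or the Flink
// connector API. A real implementation would call DescribeStream with
// ExclusiveStartShardId and page through the results.
public class ShardDiscoverySketch {

    // Stand-in for one DescribeStream call: return all shards strictly
    // after the given shard id, i.e. the children created by resharding.
    static List<String> describeStreamAfter(List<String> allShards,
                                            String exclusiveStartShardId) {
        int idx = allShards.indexOf(exclusiveStartShardId);
        return new ArrayList<>(allShards.subList(idx + 1, allShards.size()));
    }

    public static void main(String[] args) {
        List<String> shards = Arrays.asList(
                "shardId-000", "shardId-001", "shardId-002", "shardId-003");
        // Last seen shard before resharding was shardId-001; discover the rest.
        System.out.println(describeStreamAfter(shards, "shardId-001"));
        // prints [shardId-002, shardId-003]
    }
}
```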

 

> Allow FlinkKinesisConsumer to adapt for AWS DynamoDB Streams
> 
>
> Key: FLINK-4582
> URL: https://issues.apache.org/jira/browse/FLINK-4582
> Project: Flink
>  Issue Type: New Feature
>  Components: Kinesis Connector, Streaming Connectors
>Reporter: Tzu-Li (Gordon) Tai
>Assignee: Ying Xu
>Priority: Major
>
> AWS DynamoDB is a NoSQL database service that has a CDC-like (change data 
> capture) feature called DynamoDB Streams 
> (http://docs.aws.amazon.com/amazondynamodb/latest/developerguide/Streams.html),
>  which is a stream feed of item-level table activities.
> The DynamoDB Streams shard abstraction follows that of Kinesis Streams with 
> only a slight difference in resharding behaviours, so it is possible to build 
> on the internals of our Flink Kinesis Consumer for an exactly-once DynamoDB 
> Streams source.
> I propose an API something like this:
> {code}
> DataStream dynamoItemsCdc = 
>   FlinkKinesisConsumer.asDynamoDBStream(tableNames, schema, config)
> {code}
> The feature adds more connectivity to popular AWS services for Flink, and 
> combining what Flink has for exactly-once semantics, out-of-core state 
> backends, and queryable state with CDC can have very strong use cases. For 
> this feature there should only be an extra dependency to the AWS Java SDK for 
> DynamoDB, which has Apache License 2.0.





[jira] [Commented] (FLINK-10358) Flink kinesis connector could throw NPE during getRecords() call

2018-09-20 Thread Ying Xu (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-10358?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16622739#comment-16622739
 ] 

Ying Xu commented on FLINK-10358:
-

[PR 6708|https://github.com/apache/flink/pull/6708] has been merged.

> Flink kinesis connector could throw NPE during getRecords() call 
> -
>
> Key: FLINK-10358
> URL: https://issues.apache.org/jira/browse/FLINK-10358
> Project: Flink
>  Issue Type: Bug
>  Components: Kinesis Connector
>Reporter: Ying Xu
>Assignee: Ying Xu
>Priority: Major
>  Labels: pull-request-available
>
> When extending the flink kinesis connector to consume from a dynamodb stream, 
> it was found that an NPE could be thrown 
> [here|https://github.com/apache/flink/blob/e3c98f05d3544d0165c2d97d2d00fcd295cef8c8/flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/internals/ShardConsumer.java#L376].
> This is because the [getRecords 
> API|https://docs.aws.amazon.com/amazondynamodb/latest/APIReference/API_streams_GetRecords.html]
>  in dynamodb streams does not return the millisBehindLatest field, leaving it 
> set to null. A null check is probably needed here.
> See FLINK-4582 for the context of building a dynamodb streams connector on top 
> of the Kinesis connector. 
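The null guard suggested in the issue description can be sketched as follows. This is a minimal, hypothetical stand-in: the class, method name, and the -1 sentinel are assumptions for illustration, not the actual ShardConsumer fix.

```java
// Minimal sketch of the null guard discussed above. Hypothetical names:
// this is not the actual Flink ShardConsumer code. DynamoDB Streams'
// GetRecords response may omit millisBehindLatest, so the boxed Long must
// be checked before unboxing to avoid a NullPointerException.
public class MillisBehindLatestGuard {

    /** Returns the reported lag, or -1 when the service omitted the field. */
    static long safeMillisBehindLatest(Long millisBehindLatest) {
        return millisBehindLatest != null ? millisBehindLatest : -1L;
    }

    public static void main(String[] args) {
        System.out.println(safeMillisBehindLatest(1500L)); // prints 1500
        System.out.println(safeMillisBehindLatest(null));  // prints -1 instead of throwing
    }
}
```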





[jira] [Closed] (FLINK-10358) Flink kinesis connector could throw NPE during getRecords() call

2018-09-20 Thread Ying Xu (JIRA)


 [ 
https://issues.apache.org/jira/browse/FLINK-10358?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Ying Xu closed FLINK-10358.
---
Resolution: Fixed

> Flink kinesis connector could throw NPE during getRecords() call 
> -
>
> Key: FLINK-10358
> URL: https://issues.apache.org/jira/browse/FLINK-10358
> Project: Flink
>  Issue Type: Bug
>  Components: Kinesis Connector
>Reporter: Ying Xu
>Assignee: Ying Xu
>Priority: Major
>  Labels: pull-request-available
>
> When extending the flink kinesis connector to consume from a dynamodb stream, 
> it was found that an NPE could be thrown 
> [here|https://github.com/apache/flink/blob/e3c98f05d3544d0165c2d97d2d00fcd295cef8c8/flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/internals/ShardConsumer.java#L376].
> This is because the [getRecords 
> API|https://docs.aws.amazon.com/amazondynamodb/latest/APIReference/API_streams_GetRecords.html]
>  in dynamodb streams does not return the millisBehindLatest field, leaving it 
> set to null. A null check is probably needed here.
> See FLINK-4582 for the context of building a dynamodb streams connector on top 
> of the Kinesis connector. 





[jira] [Assigned] (FLINK-10358) Flink kinesis connector could throw NPE during getRecords() call

2018-09-17 Thread Ying Xu (JIRA)


 [ 
https://issues.apache.org/jira/browse/FLINK-10358?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Ying Xu reassigned FLINK-10358:
---

Assignee: Ying Xu

> Flink kinesis connector could throw NPE during getRecords() call 
> -
>
> Key: FLINK-10358
> URL: https://issues.apache.org/jira/browse/FLINK-10358
> Project: Flink
>  Issue Type: Bug
>  Components: Kinesis Connector
>Reporter: Ying Xu
>Assignee: Ying Xu
>Priority: Major
>  Labels: pull-request-available
>
> When extending the flink kinesis connector to consume from a dynamodb stream, 
> it was found that an NPE could be thrown 
> [here|https://github.com/apache/flink/blob/e3c98f05d3544d0165c2d97d2d00fcd295cef8c8/flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/internals/ShardConsumer.java#L376].
> This is because the [getRecords 
> API|https://docs.aws.amazon.com/amazondynamodb/latest/APIReference/API_streams_GetRecords.html]
>  in dynamodb streams does not return the millisBehindLatest field, leaving it 
> set to null. A null check is probably needed here.
> See FLINK-4582 for the context of building a dynamodb streams connector on top 
> of the Kinesis connector. 





[jira] [Updated] (FLINK-10358) Flink kinesis connector could throw NPE during getRecords() call

2018-09-17 Thread Ying Xu (JIRA)


 [ 
https://issues.apache.org/jira/browse/FLINK-10358?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Ying Xu updated FLINK-10358:

Description: 
When extending the flink kinesis connector to consume from a dynamodb stream, 
it was found NPE could be thrown at 
[here|https://github.com/apache/flink/blob/e3c98f05d3544d0165c2d97d2d00fcd295cef8c8/flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/internals/ShardConsumer.java#L376]
 . 

This is because the [getRecords 
API|https://docs.aws.amazon.com/amazondynamodb/latest/APIReference/API_streams_GetRecords.html]
 in dynamodb streams does not return the millisBehindLatest field and has it 
set to null.  Null check is probably needed here.

See FLINK-4582 for the context of building dynamodb streams connector on top of 
the Kinesis connector. 

 

  was:
When extending the flink kinesis connector to consume from a dynamodb stream, 
it was found NPE could be thrown at 
[here|https://github.com/apache/flink/blob/e3c98f05d3544d0165c2d97d2d00fcd295cef8c8/flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/internals/ShardConsumer.java#L376]
 . 

This is because the [getRecords 
API|https://docs.aws.amazon.com/amazondynamodb/latest/APIReference/API_streams_GetRecords.html]
 in dynamodb streams does not return the millisBehindLatest field and has it 
set to null.  Null check is probably needed here.

 


> Flink kinesis connector could throw NPE during getRecords() call 
> -
>
> Key: FLINK-10358
> URL: https://issues.apache.org/jira/browse/FLINK-10358
> Project: Flink
>  Issue Type: Bug
>  Components: Kinesis Connector
>Reporter: Ying Xu
>Priority: Major
>  Labels: pull-request-available
>
> When extending the flink kinesis connector to consume from a dynamodb stream, 
> it was found NPE could be thrown at 
> [here|https://github.com/apache/flink/blob/e3c98f05d3544d0165c2d97d2d00fcd295cef8c8/flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/internals/ShardConsumer.java#L376]
>  . 
> This is because the [getRecords 
> API|https://docs.aws.amazon.com/amazondynamodb/latest/APIReference/API_streams_GetRecords.html]
>  in dynamodb streams does not return the millisBehindLatest field and has it 
> set to null.  Null check is probably needed here.
> See FLINK-4582 for the context of building dynamodb streams connector on top 
> of the Kinesis connector. 
>  





[jira] [Updated] (FLINK-10358) Flink kinesis connector could throw NPE during getRecords() call

2018-09-17 Thread Ying Xu (JIRA)


 [ 
https://issues.apache.org/jira/browse/FLINK-10358?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Ying Xu updated FLINK-10358:

Description: 
When extending the flink kinesis connector to consume from a dynamodb stream, 
it was found NPE could be thrown at 
[here|https://github.com/apache/flink/blob/e3c98f05d3544d0165c2d97d2d00fcd295cef8c8/flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/internals/ShardConsumer.java#L376]
 . 

This is because the [getRecords 
API|https://docs.aws.amazon.com/amazondynamodb/latest/APIReference/API_streams_GetRecords.html]
 in dynamodb streams does not return the millisBehindLatest field and has it 
set to null.  Null check is probably needed here.

 

  was:
When extending the flink kinesis connector to consume from a dynamodb stream, 
it was found NPE could be thrown at 
[here|[https://github.com/apache/flink/blob/e3c98f05d3544d0165c2d97d2d00fcd295cef8c8/flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/internals/ShardConsumer.java#L376]]
 . 

This is because the [getRecords 
API|https://docs.aws.amazon.com/amazondynamodb/latest/APIReference/API_streams_GetRecords.html]
 in dynamodb streams does not return the millisBehindLatest field and has it 
set to null.  Null check is probably needed here.

 


> Flink kinesis connector could throw NPE during getRecords() call 
> -
>
> Key: FLINK-10358
> URL: https://issues.apache.org/jira/browse/FLINK-10358
> Project: Flink
>  Issue Type: Bug
>  Components: Kinesis Connector
>Reporter: Ying Xu
>Priority: Major
>  Labels: pull-request-available
>
> When extending the flink kinesis connector to consume from a dynamodb stream, 
> it was found NPE could be thrown at 
> [here|https://github.com/apache/flink/blob/e3c98f05d3544d0165c2d97d2d00fcd295cef8c8/flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/internals/ShardConsumer.java#L376]
>  . 
> This is because the [getRecords 
> API|https://docs.aws.amazon.com/amazondynamodb/latest/APIReference/API_streams_GetRecords.html]
>  in dynamodb streams does not return the millisBehindLatest field and has it 
> set to null.  Null check is probably needed here.
>  





[jira] [Updated] (FLINK-10358) Flink kinesis connector could throw NPE during getRecords() call

2018-09-17 Thread Ying Xu (JIRA)


 [ 
https://issues.apache.org/jira/browse/FLINK-10358?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Ying Xu updated FLINK-10358:

Description: 
When extending the flink kinesis connector to consume from a dynamodb stream, 
it was found NPE could be thrown at 
[here|[https://github.com/apache/flink/blob/e3c98f05d3544d0165c2d97d2d00fcd295cef8c8/flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/internals/ShardConsumer.java#L376]]
 . 

This is because the [getRecords 
API|https://docs.aws.amazon.com/amazondynamodb/latest/APIReference/API_streams_GetRecords.html]
 in dynamodb streams does not return the millisBehindLatest field and has it 
set to null.  Null check is probably needed here.

 

  was:
When extending the flink kinesis connector to consume from a dynamodb stream, 
it was found NPE could be thrown at [this 
line|[https://github.com/apache/flink/blob/e3c98f05d3544d0165c2d97d2d00fcd295cef8c8/flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/internals/ShardConsumer.java#L376]]
 . 

This is because the [getRecords 
API|https://docs.aws.amazon.com/amazondynamodb/latest/APIReference/API_streams_GetRecords.html]
 in dynamodb streams does not return the millisBehindLatest field and has it 
set to null.  Null check is probably needed here.

 


> Flink kinesis connector could throw NPE during getRecords() call 
> -
>
> Key: FLINK-10358
> URL: https://issues.apache.org/jira/browse/FLINK-10358
> Project: Flink
>  Issue Type: Bug
>  Components: Kinesis Connector
>Reporter: Ying Xu
>Priority: Major
>  Labels: pull-request-available
>
> When extending the flink kinesis connector to consume from a dynamodb stream, 
> it was found NPE could be thrown at 
> [here|[https://github.com/apache/flink/blob/e3c98f05d3544d0165c2d97d2d00fcd295cef8c8/flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/internals/ShardConsumer.java#L376]]
>  . 
> This is because the [getRecords 
> API|https://docs.aws.amazon.com/amazondynamodb/latest/APIReference/API_streams_GetRecords.html]
>  in dynamodb streams does not return the millisBehindLatest field and has it 
> set to null.  Null check is probably needed here.
>  





[jira] [Updated] (FLINK-10358) Flink kinesis connector could throw NPE during getRecords() call

2018-09-17 Thread Ying Xu (JIRA)


 [ 
https://issues.apache.org/jira/browse/FLINK-10358?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Ying Xu updated FLINK-10358:

Description: 
When extending the flink kinesis connector to consume from a dynamodb stream, 
it was found NPE could be thrown at [this 
line|[https://github.com/apache/flink/blob/e3c98f05d3544d0165c2d97d2d00fcd295cef8c8/flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/internals/ShardConsumer.java#L376]]
 . 

This is because the [getRecords 
API|https://docs.aws.amazon.com/amazondynamodb/latest/APIReference/API_streams_GetRecords.html]
 in dynamodb streams does not return the millisBehindLatest field and has it 
set to null.  Null check is probably needed here.

 

  was:
When extending the flink kinesis connector to consume from a dynamodb stream, 
it was found NPE could be thrown at [this 
line|[https://github.com/apache/flink/blob/e3c98f05d3544d0165c2d97d2d00fcd295cef8c8/flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/internals/ShardConsumer.java#L376|https://github.com/apache/flink/blob/e3c98f05d3544d0165c2d97d2d00fcd295cef8c8/flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/internals/ShardConsumer.java#L376].]]
 . 

This is because the [getRecords 
API|https://docs.aws.amazon.com/amazondynamodb/latest/APIReference/API_streams_GetRecords.html]
 in dynamodb streams does not return the millisBehindLatest field and has it 
set to null.  Null check is probably needed here.

 


> Flink kinesis connector could throw NPE during getRecords() call 
> -
>
> Key: FLINK-10358
> URL: https://issues.apache.org/jira/browse/FLINK-10358
> Project: Flink
>  Issue Type: Bug
>  Components: Kinesis Connector
>Reporter: Ying Xu
>Priority: Major
>  Labels: pull-request-available
>
> When extending the flink kinesis connector to consume from a dynamodb stream, 
> it was found NPE could be thrown at [this 
> line|[https://github.com/apache/flink/blob/e3c98f05d3544d0165c2d97d2d00fcd295cef8c8/flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/internals/ShardConsumer.java#L376]]
>  . 
> This is because the [getRecords 
> API|https://docs.aws.amazon.com/amazondynamodb/latest/APIReference/API_streams_GetRecords.html]
>  in dynamodb streams does not return the millisBehindLatest field and has it 
> set to null.  Null check is probably needed here.
>  





[jira] [Created] (FLINK-10358) Flink kinesis connector could throw NPE during getRecords() call

2018-09-17 Thread Ying Xu (JIRA)
Ying Xu created FLINK-10358:
---

 Summary: Flink kinesis connector could throw NPE during 
getRecords() call 
 Key: FLINK-10358
 URL: https://issues.apache.org/jira/browse/FLINK-10358
 Project: Flink
  Issue Type: Bug
  Components: Kinesis Connector
Reporter: Ying Xu


When extending the flink kinesis connector to consume from a dynamodb stream, 
it was found NPE could be thrown at [this 
line|[https://github.com/apache/flink/blob/e3c98f05d3544d0165c2d97d2d00fcd295cef8c8/flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/internals/ShardConsumer.java#L376|https://github.com/apache/flink/blob/e3c98f05d3544d0165c2d97d2d00fcd295cef8c8/flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/internals/ShardConsumer.java#L376].]]
 . 

This is because the [getRecords 
API|https://docs.aws.amazon.com/amazondynamodb/latest/APIReference/API_streams_GetRecords.html]
 in dynamodb streams does not return the millisBehindLatest field and has it 
set to null.  Null check is probably needed here.

 





[jira] [Commented] (FLINK-4582) Allow FlinkKinesisConsumer to adapt for AWS DynamoDB Streams

2018-07-27 Thread Ying Xu (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-4582?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16560327#comment-16560327
 ] 

Ying Xu commented on FLINK-4582:


[~tzulitai]  Just an update: we are very close to having a working version. 

> Allow FlinkKinesisConsumer to adapt for AWS DynamoDB Streams
> 
>
> Key: FLINK-4582
> URL: https://issues.apache.org/jira/browse/FLINK-4582
> Project: Flink
>  Issue Type: New Feature
>  Components: Kinesis Connector, Streaming Connectors
>Reporter: Tzu-Li (Gordon) Tai
>Priority: Major
>
> AWS DynamoDB is a NoSQL database service that has a CDC-like (change data 
> capture) feature called DynamoDB Streams 
> (http://docs.aws.amazon.com/amazondynamodb/latest/developerguide/Streams.html),
>  which is a stream feed of item-level table activities.
> The DynamoDB Streams shard abstraction follows that of Kinesis Streams with 
> only a slight difference in resharding behaviours, so it is possible to build 
> on the internals of our Flink Kinesis Consumer for an exactly-once DynamoDB 
> Streams source.
> I propose an API something like this:
> {code}
> DataStream dynamoItemsCdc = 
>   FlinkKinesisConsumer.asDynamoDBStream(tableNames, schema, config)
> {code}
> The feature adds more connectivity to popular AWS services for Flink, and 
> combining what Flink has for exactly-once semantics, out-of-core state 
> backends, and queryable state with CDC can have very strong use cases. For 
> this feature there should only be an extra dependency to the AWS Java SDK for 
> DynamoDB, which has Apache License 2.0.





[jira] [Commented] (FLINK-4582) Allow FlinkKinesisConsumer to adapt for AWS DynamoDB Streams

2018-06-27 Thread Ying Xu (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-4582?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16525650#comment-16525650
 ] 

Ying Xu commented on FLINK-4582:


Hi: 

[~tzulitai]  [~mingdaoy]  

I'm following up on this JIRA as we currently have a production use case which 
requires injecting DynamoDB changelogs into Kafka. I'm interested in 
contributing to related efforts as well. I have raised a request on the dev 
mailing list ([raw 
message|https://mail-archives.apache.org/mod_mbox/flink-dev/201806.mbox/raw/%3CCAJ5M44_FC8u713SWHCZx02FtEfyM8RpDF%2BeTNS9W%3DTC4JkVicQ%40mail.gmail.com%3E]).
 

 

Thanks. 

> Allow FlinkKinesisConsumer to adapt for AWS DynamoDB Streams
> 
>
> Key: FLINK-4582
> URL: https://issues.apache.org/jira/browse/FLINK-4582
> Project: Flink
>  Issue Type: New Feature
>  Components: Kinesis Connector, Streaming Connectors
>Reporter: Tzu-Li (Gordon) Tai
>Assignee: Mingdao Yang
>Priority: Major
>
> AWS DynamoDB is a NoSQL database service that has a CDC-like (change data 
> capture) feature called DynamoDB Streams 
> (http://docs.aws.amazon.com/amazondynamodb/latest/developerguide/Streams.html),
>  which is a stream feed of item-level table activities.
> The DynamoDB Streams shard abstraction follows that of Kinesis Streams with 
> only a slight difference in resharding behaviours, so it is possible to build 
> on the internals of our Flink Kinesis Consumer for an exactly-once DynamoDB 
> Streams source.
> I propose an API something like this:
> {code}
> DataStream dynamoItemsCdc = 
>   FlinkKinesisConsumer.asDynamoDBStream(tableNames, schema, config)
> {code}
> The feature adds more connectivity to popular AWS services for Flink, and 
> combining what Flink has for exactly-once semantics, out-of-core state 
> backends, and queryable state with CDC can have very strong use cases. For 
> this feature there should only be an extra dependency to the AWS Java SDK for 
> DynamoDB, which has Apache License 2.0.


