[jira] [Updated] (FLINK-22497) When using DefaultRollingPolicy in StreamingFileSink, the file will be finished delayed

2021-06-07 Thread Flink Jira Bot (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-22497?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Flink Jira Bot updated FLINK-22497:
---
  Labels: auto-deprioritized-major  (was: stale-major)
Priority: Minor  (was: Major)

This issue was labeled "stale-major" 7 days ago and has not received any updates, so 
it is being deprioritized. If this ticket is actually Major, please raise the 
priority and ask a committer to assign you the issue or revive the public 
discussion.


> When using DefaultRollingPolicy in StreamingFileSink, the file will be 
> finished delayed
> ---
>
> Key: FLINK-22497
> URL: https://issues.apache.org/jira/browse/FLINK-22497
> Project: Flink
>  Issue Type: Improvement
>  Components: Connectors / FileSystem
>Affects Versions: 1.11.2
> Environment: hadoop-2.8.4,Flink-1.11.2
>Reporter: ChangjiGuo
>Priority: Minor
>  Labels: auto-deprioritized-major
> Attachments: 1.png
>
>
> I have a question that came up while testing StreamingFileSink:
> The default 60s rolling interval in DefaultRollingPolicy is checked on a 
> procTimeService timer. If the interval has not yet elapsed when the timer 
> fires, the roll is deferred to the next timer trigger point (60s later), so 
> the check is not real-time and the file can stay in progress longer than the 
> maximum duration. For example, if the checkpoint period is set to 60s, the 
> file should be converted to finished at the second checkpoint, but it is 
> delayed to the third checkpoint.
> See the attached picture for details.
> If we add rollingPolicy.shouldRollOnProcessingTime to the if condition of 
> the Bucket.write method, the file is set to finished at the second 
> checkpoint, as expected.
> {code:java}
> void write(IN element, long currentTime) throws IOException {
>     // Roll when no part file is open, when the policy asks to roll for this element,
>     // or (the proposed addition) when the processing-time condition is already met.
>     if (inProgressPart == null
>             || rollingPolicy.shouldRollOnEvent(inProgressPart, element)
>             || rollingPolicy.shouldRollOnProcessingTime(inProgressPart, currentTime)) {
>         if (LOG.isDebugEnabled()) {
>             LOG.debug("Subtask {} closing in-progress part file for bucket id={} due to element {}.",
>                     subtaskIndex, bucketId, element);
>         }
>         rollPartFile(currentTime);
>     }
>     inProgressPart.write(element, currentTime);
> }
> {code}
> Maybe we can replace periodic detection with this?
> Is my understanding correct? Or can we do this? 
>  Thanks! ^_^
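
The delay described in the quoted report can be sketched as plain timeline arithmetic. The following is a minimal, self-contained simulation and not Flink code: the class and method names, the tick times (60s, 120s, ...), the example creation time of 10s, and the `>=` age comparison are all assumptions made for illustration.

```java
// Illustrative model only; this is not Flink source code.
final class RollingDelayDemo {
    static final long ROLLOVER_INTERVAL_MS = 60_000L; // maximum part-file age (60s, per the report)
    static final long TIMER_PERIOD_MS = 60_000L;      // assumed period of the processing-time check

    // Assumed policy check: roll once the part file is at least ROLLOVER_INTERVAL_MS old.
    static boolean shouldRollOnProcessingTime(long creationTime, long now) {
        return now - creationTime >= ROLLOVER_INTERVAL_MS;
    }

    // Timer-only checking: the age is examined only at ticks 60s, 120s, 180s, ...
    static long rollTimeWithTimerOnly(long creationTime) {
        long tick = TIMER_PERIOD_MS;
        while (!shouldRollOnProcessingTime(creationTime, tick)) {
            tick += TIMER_PERIOD_MS;
        }
        return tick;
    }

    // Checking on every write: first write time at which the file is old enough, or -1.
    static long rollTimeWithWriteCheck(long creationTime, long[] writeTimes) {
        for (long t : writeTimes) {
            if (shouldRollOnProcessingTime(creationTime, t)) {
                return t;
            }
        }
        return -1L;
    }

    public static void main(String[] args) {
        long creationTime = 10_000L; // part file opened 10s into the job (hypothetical)
        long[] writeTimes = new long[23];
        for (int i = 0; i < writeTimes.length; i++) {
            writeTimes[i] = creationTime + i * 5_000L; // one element every 5s
        }
        System.out.println("timer only:  " + rollTimeWithTimerOnly(creationTime));
        System.out.println("write check: " + rollTimeWithWriteCheck(creationTime, writeTimes));
    }
}
```

With these assumed numbers, the timer-only path rolls the file at 120s, when it is already 110s old, while the check in the write path rolls it at 70s, on the first element that arrives after the 60s limit.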



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Updated] (FLINK-22497) When using DefaultRollingPolicy in StreamingFileSink, the file will be finished delayed

2021-05-30 Thread Flink Jira Bot (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-22497?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Flink Jira Bot updated FLINK-22497:
---
Labels: stale-major  (was: )

I am the [Flink Jira Bot|https://github.com/apache/flink-jira-bot/] and I help 
the community manage its development. I see this issue has been marked as 
Major but is unassigned and neither it nor its Sub-Tasks have been updated 
for 30 days. I have gone ahead and added a "stale-major" label to the issue. If this 
ticket is a Major, please either assign yourself or give an update. Afterwards, 
please remove the label or in 7 days the issue will be deprioritized.




[jira] [Updated] (FLINK-22497) When using DefaultRollingPolicy in StreamingFileSink, the file will be finished delayed

2021-04-30 Thread ChangjiGuo (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-22497?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

ChangjiGuo updated FLINK-22497:
---
Description: 
I had a doubt when testing StreamingFileSink:

The default 60s rolling interval in DefaultRollingPolicy is detected by 
procTimeService. If the rolling interval is not met this time, it will be 
delayed to the next timer trigger point (after 60s), so this is not real-time 
and does not match the maximum duration. For example, if the checkpoint period 
is set to 60s, the file should be converted to finished at the second 
checkpoint, but it will be delayed to the third checkpoint.

You can refer to the attached picture for detail.

If we add rollingPolicy.shouldRollOnProcessingTime to the if condition of 
Bucket.write method, the file will be set to finished as we expect at the 
second checkpoint.
{code:java}
void write(IN element, long currentTime) throws IOException {
    // Roll when no part file is open, when the policy asks to roll for this element,
    // or (the proposed addition) when the processing-time condition is already met.
    if (inProgressPart == null
            || rollingPolicy.shouldRollOnEvent(inProgressPart, element)
            || rollingPolicy.shouldRollOnProcessingTime(inProgressPart, currentTime)) {
        if (LOG.isDebugEnabled()) {
            LOG.debug("Subtask {} closing in-progress part file for bucket id={} due to element {}.",
                    subtaskIndex, bucketId, element);
        }
        rollPartFile(currentTime);
    }
    inProgressPart.write(element, currentTime);
}
{code}
Maybe we can replace periodic detection with this?

Is my understanding correct? Or can we do this? 
 Thanks! ^_^

  was:
I had a doubt when testing StreamingFileSink:

The default 60s rolling interval in DefaultRollingPolicy is detected by 
procTimeService. If the rolling interval is not met this time, it will be 
delayed to the next timer trigger point (after 60s), so this is not real-time 
and does not match the maximum duration. For example, if the checkpoint period 
is set to 60s, the file should be converted to finished at the second 
checkpoint, but it will be delayed to the third checkpoint.

You can refer to the attached picture for detail.

If we add rollingPolicy.shouldRollOnProcessingTime to the if condition of 
Bucket.write method, the file will be set to finished as we expect at the 
second checkpoint.
{code:java}
void write(IN element, long currentTime) throws IOException {
    // Roll when no part file is open, when the policy asks to roll for this element,
    // or (the proposed addition) when the processing-time condition is already met.
    if (inProgressPart == null
            || rollingPolicy.shouldRollOnEvent(inProgressPart, element)
            || rollingPolicy.shouldRollOnProcessingTime(inProgressPart, currentTime)) {
        if (LOG.isDebugEnabled()) {
            LOG.debug("Subtask {} closing in-progress part file for bucket id={} due to element {}.",
                    subtaskIndex, bucketId, element);
        }
        rollPartFile(currentTime);
    }
    inProgressPart.write(element, currentTime);
}
{code}
 
 Is my understanding correct? Or can we do this? 
 Thanks! ^_^
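
On the question of whether a check in `write` could replace periodic detection altogether, one case worth looking at is a bucket that stops receiving elements, since a check inside `write` only runs when an element arrives. The following is a minimal sketch under stated assumptions and not Flink code: the class and method names, the tick times, the write times, and the `horizon` parameter are all hypothetical.

```java
// Illustrative model only; this is not Flink source code.
final class IdleBucketDemo {
    static final long ROLLOVER_INTERVAL_MS = 60_000L; // maximum part-file age
    static final long TIMER_PERIOD_MS = 60_000L;      // assumed period of the timer check

    // Assumed policy check: roll once the part file is at least ROLLOVER_INTERVAL_MS old.
    static boolean shouldRoll(long creationTime, long now) {
        return now - creationTime >= ROLLOVER_INTERVAL_MS;
    }

    // Check performed only inside write(): -1 means the file was never rolled.
    static long rollTimeWriteCheckOnly(long creationTime, long[] writeTimes) {
        for (long t : writeTimes) {
            if (shouldRoll(creationTime, t)) {
                return t;
            }
        }
        return -1L;
    }

    // Check performed only on timer ticks (60s, 120s, ...) up to the horizon.
    static long rollTimeTimerOnly(long creationTime, long horizon) {
        for (long tick = TIMER_PERIOD_MS; tick <= horizon; tick += TIMER_PERIOD_MS) {
            if (shouldRoll(creationTime, tick)) {
                return tick;
            }
        }
        return -1L;
    }

    // Both triggers active: whichever fires first rolls the file.
    static long rollTimeWithBoth(long creationTime, long[] writeTimes, long horizon) {
        long byWrite = rollTimeWriteCheckOnly(creationTime, writeTimes);
        long byTimer = rollTimeTimerOnly(creationTime, horizon);
        if (byWrite < 0) {
            return byTimer;
        }
        if (byTimer < 0) {
            return byWrite;
        }
        return Math.min(byWrite, byTimer);
    }

    public static void main(String[] args) {
        long creationTime = 10_000L;
        // Elements arrive until 50s, then the stream goes quiet.
        long[] writeTimes = {10_000L, 20_000L, 30_000L, 40_000L, 50_000L};
        long horizon = 300_000L;
        System.out.println("write check only: " + rollTimeWriteCheckOnly(creationTime, writeTimes));
        System.out.println("write check + timer: " + rollTimeWithBoth(creationTime, writeTimes, horizon));
    }
}
```

In this model the write-only variant never rolls the idle file (it returns -1), while keeping the timer alongside the write-path check still rolls it at the 120s tick. That suggests the check in `write` works as an addition to the periodic check rather than a full replacement, at least for buckets that can go idle.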




[jira] [Updated] (FLINK-22497) When using DefaultRollingPolicy in StreamingFileSink, the file will be finished delayed

2021-04-29 Thread ChangjiGuo (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-22497?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

ChangjiGuo updated FLINK-22497:
---
Description: 
I had a doubt when testing StreamingFileSink:

The default 60s rolling interval in DefaultRollingPolicy is detected by 
procTimeService. If the rolling interval is not met this time, it will be 
delayed to the next timer trigger point (after 60s), so this is not real-time 
and does not match the maximum duration. For example, if the checkpoint period 
is set to 60s, the file should be converted to finished at the second 
checkpoint, but it will be delayed to the third checkpoint.

You can refer to the attached picture for detail.

If we add rollingPolicy.shouldRollOnProcessingTime to the if condition of 
Bucket.write method, the file will be set to finished as we expect at the 
second checkpoint.
{code:java}
void write(IN element, long currentTime) throws IOException {
    // Roll when no part file is open, when the policy asks to roll for this element,
    // or (the proposed addition) when the processing-time condition is already met.
    if (inProgressPart == null
            || rollingPolicy.shouldRollOnEvent(inProgressPart, element)
            || rollingPolicy.shouldRollOnProcessingTime(inProgressPart, currentTime)) {
        if (LOG.isDebugEnabled()) {
            LOG.debug("Subtask {} closing in-progress part file for bucket id={} due to element {}.",
                    subtaskIndex, bucketId, element);
        }
        rollPartFile(currentTime);
    }
    inProgressPart.write(element, currentTime);
}
{code}
 
 Is my understanding correct? Or can we do this? 
 Thanks! ^_^

  was:
I had a doubt when testing StreamingFileSink:

The default 60s rolling interval in DefaultRollingPolicy is detected by 
procTimeService. If the rolling interval is not met this time, it will be 
delayed to the next timer trigger point (after 60s), so this is not real-time. 
For example, if the checkpoint period is set to 60s, the file should be 
converted to finished at the second checkpoint, but it will be delayed to the 
third checkpoint.

You can refer to the attached picture for detail.

If we add rollingPolicy.shouldRollOnProcessingTime to the if condition of 
Bucket.write method, the file will be set to finished as we expect at the 
second checkpoint.
{code:java}
void write(IN element, long currentTime) throws IOException {
    // Roll when no part file is open, when the policy asks to roll for this element,
    // or (the proposed addition) when the processing-time condition is already met.
    if (inProgressPart == null
            || rollingPolicy.shouldRollOnEvent(inProgressPart, element)
            || rollingPolicy.shouldRollOnProcessingTime(inProgressPart, currentTime)) {
        if (LOG.isDebugEnabled()) {
            LOG.debug("Subtask {} closing in-progress part file for bucket id={} due to element {}.",
                    subtaskIndex, bucketId, element);
        }
        rollPartFile(currentTime);
    }
    inProgressPart.write(element, currentTime);
}
{code}
 
 Is my understanding correct? Or can we do this? 
 Thanks! ^_^




[jira] [Updated] (FLINK-22497) When using DefaultRollingPolicy in StreamingFileSink, the file will be finished delayed

2021-04-28 Thread ChangjiGuo (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-22497?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

ChangjiGuo updated FLINK-22497:
---
Environment: hadoop-2.8.4,Flink-1.11.2  (was: hadoop-2.8.4)



[jira] [Updated] (FLINK-22497) When using DefaultRollingPolicy in StreamingFileSink, the file will be finished delayed

2021-04-28 Thread ChangjiGuo (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-22497?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

ChangjiGuo updated FLINK-22497:
---
Description: 
I had a doubt when testing StreamingFileSink:

The default 60s rolling interval in DefaultRollingPolicy is detected by 
procTimeService. If the rolling interval is not met this time, it will be 
delayed to the next timer trigger point (after 60s), so this is not real-time. 
For example, if the checkpoint period is set to 60s, the file should be 
converted to finished at the second checkpoint, but it will be delayed to the 
third checkpoint.

You can refer to the attached picture for detail.

If we add rollingPolicy.shouldRollOnProcessingTime to the if condition of 
Bucket.write method, the file will be set to finished as we expect at the 
second checkpoint.
{code:java}
void write(IN element, long currentTime) throws IOException {
    // Roll when no part file is open, when the policy asks to roll for this element,
    // or (the proposed addition) when the processing-time condition is already met.
    if (inProgressPart == null
            || rollingPolicy.shouldRollOnEvent(inProgressPart, element)
            || rollingPolicy.shouldRollOnProcessingTime(inProgressPart, currentTime)) {
        if (LOG.isDebugEnabled()) {
            LOG.debug("Subtask {} closing in-progress part file for bucket id={} due to element {}.",
                    subtaskIndex, bucketId, element);
        }
        rollPartFile(currentTime);
    }
    inProgressPart.write(element, currentTime);
}
{code}
 
 Is my understanding correct? Or can we do this? 
 Thanks! ^_^

  was:
I had a doubt when testing StreamingFileSink:

The default 60s rolling interval in DefaultRollingPolicy is detected by 
procTimeService. If the rolling interval is not met this time, it will be 
delayed to the next timer trigger point (after 60s), so this is not real-time. 
For example, if the checkpoint period is set to 60s, the file should be 
converted to finished at the second checkpoint, but it will be delayed to the 
third checkpoint.

You can refer to the attached picture for detail.

If we add rollingPolicy.shouldRollOnProcessingTime to the if condition of 
Bucket.write method, the file will be set to finished as we expect at the 
second checkpoint.
{code:java}
void write(IN element, long currentTime) throws IOException {
    // Roll when no part file is open, when the policy asks to roll for this element,
    // or (the proposed addition) when the processing-time condition is already met.
    if (inProgressPart == null
            || rollingPolicy.shouldRollOnEvent(inProgressPart, element)
            || rollingPolicy.shouldRollOnProcessingTime(inProgressPart, currentTime)) {
        if (LOG.isDebugEnabled()) {
            LOG.debug("Subtask {} closing in-progress part file for bucket id={} due to element {}.",
                    subtaskIndex, bucketId, element);
        }
        rollPartFile(currentTime);
    }
    inProgressPart.write(element, currentTime);
}
{code}
 
 Is my understanding correct? Or can we do this? 
 Thanks! ^_^




[jira] [Updated] (FLINK-22497) When using DefaultRollingPolicy in StreamingFileSink, the file will be finished delayed

2021-04-28 Thread ChangjiGuo (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-22497?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

ChangjiGuo updated FLINK-22497:
---
Description: 
I had a doubt when testing StreamingFileSink:

The default 60s rolling interval in DefaultRollingPolicy is detected by 
procTimeService. If the rolling interval is not met this time, it will be 
delayed to the next timer trigger point (after 60s), so this is not real-time. 
For example, if the checkpoint period is set to 60s, the file should be 
converted to finished at the second checkpoint, but it will be delayed to the 
third checkpoint.

You can refer to the attached picture for detail.

If we add rollingPolicy.shouldRollOnProcessingTime to the if condition of 
Bucket.write method, the file will be set to finished as we expect at the 
second checkpoint.
{code:java}
void write(IN element, long currentTime) throws IOException {
    // Roll when no part file is open, when the policy asks to roll for this element,
    // or (the proposed addition) when the processing-time condition is already met.
    if (inProgressPart == null
            || rollingPolicy.shouldRollOnEvent(inProgressPart, element)
            || rollingPolicy.shouldRollOnProcessingTime(inProgressPart, currentTime)) {
        if (LOG.isDebugEnabled()) {
            LOG.debug("Subtask {} closing in-progress part file for bucket id={} due to element {}.",
                    subtaskIndex, bucketId, element);
        }
        rollPartFile(currentTime);
    }
    inProgressPart.write(element, currentTime);
}
{code}
 
 Is my understanding correct? Or can we do this? 
 Thanks! ^_^

  was:
I had a doubt when testing StreamingFileSink:

The default 60s rolling interval in DefaultRollingPolicy is detected by 
procTimeService. If the rolling interval is not met this time, it will be 
delayed to the next timer trigger point (after 60s), so this is not real-time. 
For example, if the checkpoint period is set to 60s, the file should be 
converted to finished at the second checkpoint, but it will be delayed to the 
third checkpoint.

You can refer to the attached picture for detail.

If we add rollingPolicy.shouldRollOnProcessingTime to the if condition of 
Bucket.write method, the file will be set to finished in the second checkpoint.
{code:java}
void write(IN element, long currentTime) throws IOException {
    // Roll when no part file is open, when the policy asks to roll for this element,
    // or (the proposed addition) when the processing-time condition is already met.
    if (inProgressPart == null
            || rollingPolicy.shouldRollOnEvent(inProgressPart, element)
            || rollingPolicy.shouldRollOnProcessingTime(inProgressPart, currentTime)) {
        if (LOG.isDebugEnabled()) {
            LOG.debug("Subtask {} closing in-progress part file for bucket id={} due to element {}.",
                    subtaskIndex, bucketId, element);
        }
        rollPartFile(currentTime);
    }
    inProgressPart.write(element, currentTime);
}
{code}
 
Is my understanding correct? 
Thanks! ^_^




[jira] [Updated] (FLINK-22497) When using DefaultRollingPolicy in StreamingFileSink, the file will be finished delayed

2021-04-27 Thread ChangjiGuo (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-22497?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

ChangjiGuo updated FLINK-22497:
---
Attachment: (was: 企业微信截图_43e36204-0a2f-4acd-ae17-56aa4d7661e4.png)



[jira] [Updated] (FLINK-22497) When using DefaultRollingPolicy in StreamingFileSink, the file will be finished delayed

2021-04-27 Thread ChangjiGuo (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-22497?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

ChangjiGuo updated FLINK-22497:
---
Attachment: 1.png



[jira] [Updated] (FLINK-22497) When using DefaultRollingPolicy in StreamingFileSink, the file will be finished delayed

2021-04-27 Thread ChangjiGuo (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-22497?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

ChangjiGuo updated FLINK-22497:
---
Attachment: 企业微信截图_43e36204-0a2f-4acd-ae17-56aa4d7661e4.png
