[jira] [Commented] (BAHIR-283) InfluxDBWriter fails to write the final element in each buffer

2023-03-07 Thread ASF subversion and git services (Jira)


[ 
https://issues.apache.org/jira/browse/BAHIR-283?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17697474#comment-17697474
 ] 

ASF subversion and git services commented on BAHIR-283:
---

Commit c8b6f6122ae6411064a1ec55cf2c2fbc08d4979f in bahir-flink's branch 
refs/heads/BAHIR-308 from dave
[ https://gitbox.apache.org/repos/asf?p=bahir-flink.git;h=c8b6f61 ]

[BAHIR-283] Fix dropped elements on InfluxDbSink



> InfluxDBWriter fails to write the final element in each buffer
> ---
>
> Key: BAHIR-283
> URL: https://issues.apache.org/jira/browse/BAHIR-283
> Project: Bahir
>  Issue Type: Bug
>  Components: Flink Streaming Connectors
>Affects Versions: Flink-1.0
>Reporter: David Quigley
>Priority: Major
> Fix For: Flink-1.2.0
>
>   Original Estimate: 2h
>  Remaining Estimate: 2h
>
> {code:java}
> /**
>  * This method calls the InfluxDB write API whenever the element list
>  * reaches the {@link #bufferSize}. It keeps track of the latest timestamp
>  * of each element: it compares the latest timestamp with
>  * context.timestamp() and keeps the larger (more recent) of the two.
>  *
>  * @param in incoming data
>  * @param context current Flink context
>  * @see org.apache.flink.api.connector.sink.SinkWriter.Context
>  */
> @Override
> public void write(final IN in, final Context context) throws IOException {
>     if (this.elements.size() == this.bufferSize) {
>         LOG.debug("Buffer size reached; preparing to write the elements.");
>         this.writeCurrentElements();
>         this.elements.clear();
>     } else {
>         LOG.trace("Adding elements to buffer. Buffer size: {}", this.elements.size());
>         this.elements.add(this.schemaSerializer.serialize(in, context));
>         if (context.timestamp() != null) {
>             this.lastTimestamp = Math.max(this.lastTimestamp, context.timestamp());
>         }
>     }
> }
> {code}
> The bug is in this write method. When the buffer holds fewer elements than 
> the configured buffer size, the incoming element is added to the buffer. But 
> when the buffer is already full, the buffer is flushed and the incoming 
> element is neither written nor added to the next buffer, so the element that 
> triggered the flush is silently dropped.
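A minimal sketch of one possible fix, using a simplified stand-in class (the class, field, and method names here are illustrative, not the actual connector code): flush the full buffer first, then always add the incoming element, so the element that triggers the flush is no longer lost.

```java
import java.util.ArrayList;
import java.util.List;

// Simplified stand-in for the buffering writer described above.
// The fix: flushing and buffering are no longer mutually exclusive
// branches -- after a flush, the current element is still buffered.
public class BufferedWriter {
    private final int bufferSize;
    private final List<String> elements = new ArrayList<>();
    final List<String> written = new ArrayList<>(); // records flushed output

    public BufferedWriter(final int bufferSize) {
        this.bufferSize = bufferSize;
    }

    public void write(final String in) {
        if (elements.size() == bufferSize) {
            writeCurrentElements(); // flush the full buffer first...
            elements.clear();
        }
        elements.add(in); // ...then always buffer the current element
    }

    public void flush() { // final flush, e.g. on close or checkpoint
        writeCurrentElements();
        elements.clear();
    }

    private void writeCurrentElements() {
        written.addAll(elements); // stand-in for the InfluxDB write API call
    }
}
```

With this shape, every element eventually reaches `writeCurrentElements()`: elements 1..N fill the buffer, element N+1 triggers the flush and then seeds the next buffer.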



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Commented] (BAHIR-283) InfluxDBWriter fails to write the final element in each buffer

2023-03-07 Thread ASF subversion and git services (Jira)


[ 
https://issues.apache.org/jira/browse/BAHIR-283?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17697472#comment-17697472
 ] 

ASF subversion and git services commented on BAHIR-283:
---

Commit c8b6f6122ae6411064a1ec55cf2c2fbc08d4979f in bahir-flink's branch 
refs/heads/master from dave
[ https://gitbox.apache.org/repos/asf?p=bahir-flink.git;h=c8b6f61 ]

[BAHIR-283] Fix dropped elements on InfluxDbSink





