This is an automated email from the ASF dual-hosted git repository.

joellubi pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/arrow.git


The following commit(s) were added to refs/heads/main by this push:
     new cc3e2db300 GH-41697: [Go][Parquet] Release BufferWriter when BufferedPageWriter is closed (#41698)
cc3e2db300 is described below

commit cc3e2db300947aaf777f1814ce5ee61f42410d4e
Author: Joel Lubinitsky <33523178+joell...@users.noreply.github.com>
AuthorDate: Mon May 20 09:14:50 2024 -0400

    GH-41697: [Go][Parquet] Release BufferWriter when BufferedPageWriter is closed (#41698)
    
    <!--
    Thanks for opening a pull request!
    If this is your first pull request you can find detailed information on
    how to contribute here:
    * [New Contributor's Guide](https://arrow.apache.org/docs/dev/developers/guide/step_by_step/pr_lifecycle.html#reviews-and-merge-of-the-pull-request)
    * [Contributing Overview](https://arrow.apache.org/docs/dev/developers/overview.html)
    
    
    If this is not a [minor PR](https://github.com/apache/arrow/blob/main/CONTRIBUTING.md#Minor-Fixes),
    could you open an issue for this pull request on GitHub?
    https://github.com/apache/arrow/issues/new/choose
    
    Opening GitHub issues ahead of time contributes to the
    [Openness](http://theapacheway.com/open/#:~:text=Openness%20allows%20new%20users%20the,must%20happen%20in%20the%20open.)
    of the Apache Arrow project.
    
    Then could you also rename the pull request title in the following
    format?
    
        GH-${GITHUB_ISSUE_ID}: [${COMPONENT}] ${SUMMARY}
    
    or
    
        MINOR: [${COMPONENT}] ${SUMMARY}
    
    In the case of PARQUET issues on JIRA the title also supports:
    
        PARQUET-${JIRA_ISSUE_ID}: [${COMPONENT}] ${SUMMARY}
    
    -->
    
    ### Rationale for this change
    
    A small buffer gets reallocated after calling `Finish()`, causing that
    memory to leak.
    
    <!--
    Why are you proposing this change? If this is already explained clearly
    in the issue then this section is not needed.
    Explaining clearly why changes are proposed helps reviewers understand
    your changes and offer better suggestions for fixes.
    -->
    
    ### What changes are included in this PR?
    Release the buffer when the pagewriter is closed.
    
    <!--
    There is no need to duplicate the description in the issue here but it
    is sometimes worth providing a summary of the individual changes in this
    PR.
    -->
    
    ### Are these changes tested?
    Yes
    
    <!--
    We typically require tests for all PRs in order to:
    1. Prevent the code from being accidentally broken by subsequent changes
    2. Serve as another way to document the expected behavior of the code
    
    If tests are not included in your PR, please explain why (for example,
    are they covered by existing tests)?
    -->
    
    ### Are there any user-facing changes?
    Memory will not leak on this code path.
    
    <!--
    If there are user-facing changes then we may require documentation to be
    updated before approving the PR.
    -->
    
    <!--
    If there are any breaking changes to public APIs, please uncomment the
    line below and explain which changes are breaking.
    -->
    <!-- **This PR includes breaking changes to public APIs.** -->
    
    <!--
    Please uncomment the line below (and provide explanation) if the changes
    fix either (a) a security vulnerability, (b) a bug that caused incorrect
    or invalid data to be produced, or (c) a bug that causes a crash (even
    when the API contract is upheld). We use this to highlight fixes to
    issues that may affect users without their knowledge. For this reason,
    fixing bugs that cause errors doesn't count, since those are usually
    obvious.
    -->
    <!-- **This PR contains a "Critical Fix".** -->
    * GitHub Issue: #41697
---
 go/parquet/internal/encoding/types.go  |  7 +++++-
 go/parquet/pqarrow/file_writer_test.go | 42 ++++++++++++++++++++++++++++++++++
 2 files changed, 48 insertions(+), 1 deletion(-)

diff --git a/go/parquet/internal/encoding/types.go b/go/parquet/internal/encoding/types.go
index 147c1746c5..2d7a5d6b1d 100644
--- a/go/parquet/internal/encoding/types.go
+++ b/go/parquet/internal/encoding/types.go
@@ -361,11 +361,16 @@ func (b *BufferWriter) Truncate() {
 func (b *BufferWriter) Reset(initial int) {
        if b.buffer != nil {
                b.buffer.Release()
+       } else {
+               b.buffer = memory.NewResizableBuffer(b.mem)
        }
 
        b.pos = 0
        b.offset = 0
-       b.Reserve(initial)
+
+       if initial > 0 {
+               b.Reserve(initial)
+       }
 }
 
 // Reserve ensures that there is at least enough capacity to write nbytes
diff --git a/go/parquet/pqarrow/file_writer_test.go b/go/parquet/pqarrow/file_writer_test.go
index 425e4479f6..fc965279a9 100644
--- a/go/parquet/pqarrow/file_writer_test.go
+++ b/go/parquet/pqarrow/file_writer_test.go
@@ -18,6 +18,7 @@ package pqarrow_test
 
 import (
        "bytes"
+       "math"
        "strings"
        "testing"
 
@@ -87,3 +88,44 @@ func TestFileWriterNumRows(t *testing.T) {
        require.NoError(t, writer.Close())
        assert.Equal(t, 4, writer.NumRows())
 }
+
+func TestFileWriterBuffered(t *testing.T) {
+       schema := arrow.NewSchema([]arrow.Field{
+               {Name: "one", Nullable: true, Type: arrow.PrimitiveTypes.Float64},
+               {Name: "two", Nullable: true, Type: arrow.PrimitiveTypes.Float64},
+       }, nil)
+
+       data := `[
+               {"one": 1, "two": 2},
+               {"one": 1, "two": null},
+               {"one": null, "two": 2},
+               {"one": null, "two": null}
+       ]`
+
+       alloc := memory.NewCheckedAllocator(memory.DefaultAllocator)
+       defer alloc.AssertSize(t, 0)
+
+       record, _, err := array.RecordFromJSON(alloc, schema, strings.NewReader(data))
+       require.NoError(t, err)
+       defer record.Release()
+
+       output := &bytes.Buffer{}
+       writer, err := pqarrow.NewFileWriter(
+               schema,
+               output,
+               parquet.NewWriterProperties(
+                       parquet.WithAllocator(alloc),
+                       // Ensure enough space so we can close the writer with rows still buffered
+                       parquet.WithMaxRowGroupLength(math.MaxInt64),
+               ),
+               pqarrow.NewArrowWriterProperties(
+                       pqarrow.WithAllocator(alloc),
+               ),
+       )
+       require.NoError(t, err)
+
+       require.NoError(t, writer.WriteBuffered(record))
+
+       require.NoError(t, writer.Close())
+       assert.Equal(t, 4, writer.NumRows())
+}

Reply via email to