Re: [PR] feat: support append data file and add e2e test [iceberg-rust]

2024-12-01 Thread via GitHub


liurenjie1024 commented on PR #349:
URL: https://github.com/apache/iceberg-rust/pull/349#issuecomment-2510452060

   > @ZENOTME thanks for getting back to me here. I don't want to be the one 
holding this up, so I would suggest that we get this in. There are still some 
gaps I see because I think it is pretty easy to brick a table by adding some 
incompatible files. I'll raise new issues for that so we can pick up those 
independently.
   > 
   > Any concerns @c-thiel @liurenjie1024 @Xuanwo ?
   
   Hi @Fokko, sorry for the late reply. I took a pass over the PR and agree that 
there are still some improvements to make; we can resolve them in follow-up issues.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@iceberg.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


-
To unsubscribe, e-mail: issues-unsubscr...@iceberg.apache.org
For additional commands, e-mail: issues-h...@iceberg.apache.org



Re: [PR] feat: support append data file and add e2e test [iceberg-rust]

2024-12-01 Thread via GitHub


liurenjie1024 commented on code in PR #349:
URL: https://github.com/apache/iceberg-rust/pull/349#discussion_r1865150243


##
crates/iceberg/src/transaction.rs:
##
@@ -122,6 +172,365 @@ impl<'a> Transaction<'a> {
 }
 }
 
+/// FastAppendAction is a transaction action for fast append data files to the 
table.
+pub struct FastAppendAction<'a> {
+snapshot_produce_action: SnapshotProduceAction<'a>,
+}
+
+impl<'a> FastAppendAction<'a> {
+#[allow(clippy::too_many_arguments)]
+pub(crate) fn new(
+tx: Transaction<'a>,
+snapshot_id: i64,
+commit_uuid: Uuid,
+key_metadata: Vec<u8>,
+snapshot_properties: HashMap<String, String>,
+) -> Result<Self> {
+Ok(Self {
+snapshot_produce_action: SnapshotProduceAction::new(
+tx,
+snapshot_id,
+key_metadata,
+commit_uuid,
+snapshot_properties,
+)?,
+})
+}
+
+/// Add data files to the snapshot.
+pub fn add_data_files(
+&mut self,
+data_files: impl IntoIterator<Item = DataFile>,
+) -> Result<&mut Self> {
+self.snapshot_produce_action.add_data_files(data_files)?;
+Ok(self)
+}
+
+/// Finished building the action and apply it to the transaction.
+pub async fn apply(self) -> Result<Transaction<'a>> {
+self.snapshot_produce_action
+.apply(FastAppendOperation, DefaultManifestProcess)
+.await
+}
+}
+
+struct FastAppendOperation;
+
+impl SnapshotProduceOperation for FastAppendOperation {
+fn operation(&self) -> Operation {
+Operation::Append
+}
+
+async fn delete_entries(
+&self,
+_snapshot_produce: &SnapshotProduceAction<'_>,
+) -> Result<Vec<ManifestEntry>> {
+Ok(vec![])
+}
+
+async fn existing_manifest(
+&self,
+snapshot_produce: &SnapshotProduceAction<'_>,
+) -> Result<Vec<ManifestFile>> {
+let Some(snapshot) = 
snapshot_produce.tx.table.metadata().current_snapshot() else {
+return Ok(vec![]);
+};
+
+let manifest_list = snapshot
+.load_manifest_list(
+snapshot_produce.tx.table.file_io(),
+&snapshot_produce.tx.table.metadata_ref(),
+)
+.await?;
+
+Ok(manifest_list
+.entries()
+.iter()
+.filter(|entry| entry.has_added_files() || 
entry.has_existing_files())
+.cloned()
+.collect())
+}
+}
+
+trait SnapshotProduceOperation: Send + Sync {
+fn operation(&self) -> Operation;
+#[allow(unused)]
+fn delete_entries(
+&self,
+snapshot_produce: &SnapshotProduceAction,
+) -> impl Future<Output = Result<Vec<ManifestEntry>>> + Send;
+fn existing_manifest(
+&self,
+snapshot_produce: &SnapshotProduceAction,
+) -> impl Future<Output = Result<Vec<ManifestFile>>> + Send;
+}
+
+struct DefaultManifestProcess;
+
+impl ManifestProcess for DefaultManifestProcess {
+fn process_manifeset(&self, manifests: Vec<ManifestFile>) -> Vec<ManifestFile> {
+manifests
+}
+}
+
+trait ManifestProcess: Send + Sync {
+fn process_manifeset(&self, manifests: Vec<ManifestFile>) -> Vec<ManifestFile>;
+}
+
+struct SnapshotProduceAction<'a> {

Review Comment:
   Usually an action is a transaction action; I think naming it `SnapshotProducer` would be enough?
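For context, the hunk under review splits responsibilities across two strategy traits: `SnapshotProduceOperation` decides what kind of snapshot is produced, and `ManifestProcess` is a hook over the manifest list before commit. A minimal, self-contained sketch of that split (synchronous, with simplified stand-in types, not the crate's actual API):

```rust
// Simplified stand-ins for the crate's types, for illustration only.
#[derive(Debug, Clone, PartialEq)]
enum Operation {
    Append,
}

#[derive(Debug, Clone, PartialEq)]
struct ManifestFile {
    path: String,
}

// Strategy trait: which snapshot operation is being produced.
trait SnapshotProduceOperation {
    fn operation(&self) -> Operation;
}

// Hook trait: post-process the manifest list before the commit.
trait ManifestProcess {
    fn process_manifests(&self, manifests: Vec<ManifestFile>) -> Vec<ManifestFile>;
}

struct FastAppendOperation;

impl SnapshotProduceOperation for FastAppendOperation {
    fn operation(&self) -> Operation {
        Operation::Append
    }
}

struct DefaultManifestProcess;

impl ManifestProcess for DefaultManifestProcess {
    // The default process leaves the manifest list untouched.
    fn process_manifests(&self, manifests: Vec<ManifestFile>) -> Vec<ManifestFile> {
        manifests
    }
}

fn main() {
    assert_eq!(FastAppendOperation.operation(), Operation::Append);
    let manifests = vec![ManifestFile { path: "m-0.avro".to_string() }];
    assert_eq!(
        DefaultManifestProcess.process_manifests(manifests.clone()),
        manifests
    );
}
```

The point of the split is that a future merge-append could supply a different `ManifestProcess` while reusing the same snapshot-production plumbing.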



##
crates/iceberg/src/transaction.rs:
##
@@ -122,6 +172,365 @@ impl<'a> Transaction<'a> {
 }
 }
 
+/// FastAppendAction is a transaction action for fast append data files to the 
table.
+pub struct FastAppendAction<'a> {

Review Comment:
   It would be better to move this part to a standalone module under the 
transaction module.



##
crates/iceberg/src/transaction.rs:
##
@@ -122,6 +172,365 @@ impl<'a> Transaction<'a> {
 }
 }
 
+/// FastAppendAction is a transaction action for fast append data files to the 
table.
+pub struct FastAppendAction<'a> {
+snapshot_produce_action: SnapshotProduceAction<'a>,
+}
+
+impl<'a> FastAppendAction<'a> {
+#[allow(clippy::too_many_arguments)]
+pub(crate) fn new(
+tx: Transaction<'a>,
+snapshot_id: i64,
+commit_uuid: Uuid,
+key_metadata: Vec<u8>,
+snapshot_properties: HashMap<String, String>,
+) -> Result<Self> {
+Ok(Self {
+snapshot_produce_action: SnapshotProduceAction::new(
+tx,
+snapshot_id,
+key_metadata,
+commit_uuid,
+snapshot_properties,
+)?,
+})
+}
+
+/// Add data files to the snapshot.
+pub fn add_data_files(
+&mut self,
+data_files: impl IntoIterator<Item = DataFile>,
+) -> Result<&mut Self> {
+self.snapshot_produce_action.add_data_files(data_files)?;
+Ok(self)
+}
+
+/// Finished building the action and apply it to the transaction.
+pub async fn apply(self) -> Result<Transaction<'a>> {
+self.snapshot_produce_action
+.apply(FastAppendOperation, DefaultManifestProcess)
+.await
+}
+}
+
+struct FastA

Re: [PR] feat: support append data file and add e2e test [iceberg-rust]

2024-11-28 Thread via GitHub


Fokko merged PR #349:
URL: https://github.com/apache/iceberg-rust/pull/349





Re: [PR] feat: support append data file and add e2e test [iceberg-rust]

2024-11-27 Thread via GitHub


Fokko commented on PR #349:
URL: https://github.com/apache/iceberg-rust/pull/349#issuecomment-2503865388

   @ZENOTME thanks for getting back to me here. I don't want to be the one 
holding this up, so I would suggest that we get this in. There are still some 
gaps I see because I think it is pretty easy to brick a table by adding some 
incompatible files. I'll raise new issues for that so we can pick up those 
independently.
   
   Any concerns @c-thiel @liurenjie1024 @Xuanwo ?
   





Re: [PR] feat: support append data file and add e2e test [iceberg-rust]

2024-11-27 Thread via GitHub


ZENOTME commented on PR #349:
URL: https://github.com/apache/iceberg-rust/pull/349#issuecomment-2503852172

   > @ZENOTME I think you're on Linux? I'm on Mac, and I think the docker 
networking is different.
   > 
   > First I have to change the networking quite a bit, because the loop that 
waits for the rest catalog to become ready is pinging a `172.` IP address, 
which is not responding, so it is a `while true`. If I change it to 
`127.0.0.1`, and expose the ports in docker, then I get the weird error around 
the security token.
   
   I use a Mac M1. I tried changing the following to `127.0.0.1` directly, and it 
also works.
   
https://github.com/apache/iceberg-rust/blob/6293aacddf60a3f846c98d7adb3fbb7e4a48f001/crates/integration_tests/src/lib.rs#L44
 





Re: [PR] feat: support append data file and add e2e test [iceberg-rust]

2024-11-27 Thread via GitHub


ZENOTME commented on code in PR #349:
URL: https://github.com/apache/iceberg-rust/pull/349#discussion_r1860543850


##
crates/iceberg/src/transaction.rs:
##
@@ -122,6 +189,383 @@ impl<'a> Transaction<'a> {
 }
 }
 
+/// FastAppendAction is a transaction action for fast append data files to the 
table.
+pub struct FastAppendAction<'a> {
+snapshot_produce_action: SnapshotProduceAction<'a>,
+}
+
+impl<'a> FastAppendAction<'a> {
+#[allow(clippy::too_many_arguments)]
+pub(crate) fn new(
+tx: Transaction<'a>,
+parent_snapshot_id: Option<i64>,
+snapshot_id: i64,
+schema: Schema,
+schema_id: i32,
+format_version: FormatVersion,
+partition_spec: Arc<PartitionSpec>,
+key_metadata: Vec<u8>,
+commit_uuid: Uuid,
+snapshot_properties: HashMap<String, String>,
+) -> Result<Self> {
+Ok(Self {
+snapshot_produce_action: SnapshotProduceAction::new(
+tx,
+snapshot_id,
+parent_snapshot_id,
+schema_id,
+format_version,
+partition_spec,
+schema,
+key_metadata,
+commit_uuid,
+snapshot_properties,
+)?,
+})
+}
+
+/// Add data files to the snapshot.
+pub fn add_data_files(
+&mut self,
+data_files: impl IntoIterator<Item = DataFile>,
+) -> Result<&mut Self> {
+self.snapshot_produce_action.add_data_files(data_files)?;
+Ok(self)
+}
+
+/// Finished building the action and apply it to the transaction.
+pub async fn apply(self) -> Result<Transaction<'a>> {
+self.snapshot_produce_action
+.apply(FastAppendOperation, DefaultManifestProcess)
+.await
+}
+}
+
+struct FastAppendOperation;
+
+impl SnapshotProduceOperation for FastAppendOperation {
+fn operation(&self) -> Operation {
+Operation::Append
+}
+
+async fn delete_entries(
+&self,
+_snapshot_produce: &SnapshotProduceAction<'_>,
+) -> Result<Vec<ManifestEntry>> {
+Ok(vec![])
+}
+
+async fn existing_manifest(
+&self,
+snapshot_produce: &SnapshotProduceAction<'_>,
+) -> Result<Vec<ManifestFile>> {
+let Some(snapshot) = snapshot_produce
+.parent_snapshot_id
+.and_then(|id| 
snapshot_produce.tx.table.metadata().snapshot_by_id(id))
+else {
+return Ok(vec![]);
+};
+
+let manifest_list = snapshot
+.load_manifest_list(
+snapshot_produce.tx.table.file_io(),
+&snapshot_produce.tx.table.metadata_ref(),
+)
+.await?;
+
+Ok(manifest_list
+.entries()
+.iter()
+.filter(|entry| entry.has_added_files() || 
entry.has_existing_files())
+.cloned()
+.collect())
+}
+}
+
+trait SnapshotProduceOperation: Send + Sync {
+fn operation(&self) -> Operation;
+#[allow(unused)]
+fn delete_entries(
+&self,
+snapshot_produce: &SnapshotProduceAction,
+) -> impl Future<Output = Result<Vec<ManifestEntry>>> + Send;
+fn existing_manifest(
+&self,
+snapshot_produce: &SnapshotProduceAction,
+) -> impl Future<Output = Result<Vec<ManifestFile>>> + Send;
+}
+
+struct DefaultManifestProcess;
+
+impl ManifestProcess for DefaultManifestProcess {
+fn process_manifeset(&self, manifests: Vec<ManifestFile>) -> Vec<ManifestFile> {
+manifests
+}
+}
+
+trait ManifestProcess: Send + Sync {
+fn process_manifeset(&self, manifests: Vec<ManifestFile>) -> Vec<ManifestFile>;
+}
+
+struct SnapshotProduceAction<'a> {
+tx: Transaction<'a>,
+
+parent_snapshot_id: Option<i64>,
+snapshot_id: i64,
+schema_id: i32,
+format_version: FormatVersion,
+partition_spec: Arc<PartitionSpec>,
+schema: Schema,
+key_metadata: Vec<u8>,
+
+commit_uuid: Uuid,
+
+snapshot_properties: HashMap<String, String>,
+added_data_files: Vec<DataFile>,
+
+// A counter used to generate unique manifest file names.
+// It starts from 0 and increments for each new manifest file.
+// Note: This counter is limited to the range of (0..u64::MAX).
+manifest_counter: RangeFrom<u64>,
+}
+
+impl<'a> SnapshotProduceAction<'a> {
+#[allow(clippy::too_many_arguments)]
+pub(crate) fn new(
+tx: Transaction<'a>,
+snapshot_id: i64,
+parent_snapshot_id: Option<i64>,
+schema_id: i32,
+format_version: FormatVersion,
+partition_spec: Arc<PartitionSpec>,
+schema: Schema,
+key_metadata: Vec<u8>,
+commit_uuid: Uuid,
+snapshot_properties: HashMap<String, String>,
+) -> Result<Self> {
+Ok(Self {
+tx,
+parent_snapshot_id,
+snapshot_id,
+schema_id,
+format_version,
+commit_uuid,
+snapshot_properties,
+added_data_files: vec![],
+manifest_counter: (0..),
+partition_spec,
+schema,
+key_metadata,
+})
+}
+
+// Check if the partition value is compatible with the partition type.
+  
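The quoted hunk stores `manifest_counter: RangeFrom<u64>` and the comment explains it generates unique manifest file names. A `RangeFrom` is an infinite iterator, so each `.next()` hands out the next suffix. A self-contained sketch of that idea (the name format below is illustrative, not the crate's actual layout):

```rust
use std::ops::RangeFrom;

// Sketch of the manifest-name counter: `RangeFrom<u64>` starts at 0
// and increments on every `.next()`, giving each new manifest file
// a unique suffix within a commit.
struct ManifestNamer {
    counter: RangeFrom<u64>,
    commit_uuid: String,
}

impl ManifestNamer {
    fn new(commit_uuid: &str) -> Self {
        Self {
            counter: 0..,
            commit_uuid: commit_uuid.to_string(),
        }
    }

    fn next_manifest_name(&mut self) -> String {
        // `RangeFrom` only stops yielding after u64::MAX iterations,
        // which is unreachable in practice.
        let n = self.counter.next().expect("counter exhausted");
        format!("{}-m{}.avro", self.commit_uuid, n)
    }
}

fn main() {
    let mut namer = ManifestNamer::new("uuid-1234");
    assert_eq!(namer.next_manifest_name(), "uuid-1234-m0.avro");
    assert_eq!(namer.next_manifest_name(), "uuid-1234-m1.avro");
}
```

Using an iterator rather than a bare `u64` field makes the "increments for each new manifest file" invariant impossible to forget at the call site.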

Re: [PR] feat: support append data file and add e2e test [iceberg-rust]

2024-11-27 Thread via GitHub


Fokko commented on code in PR #349:
URL: https://github.com/apache/iceberg-rust/pull/349#discussion_r1860405562


##
crates/e2e_test/testdata/docker-compose.yaml:
##
@@ -0,0 +1,62 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+version: '3.8'
+
+services:
+  rest:
+image: tabulario/iceberg-rest:0.10.0
+environment:
+  - AWS_ACCESS_KEY_ID=admin
+  - AWS_SECRET_ACCESS_KEY=password
+  - AWS_REGION=us-east-1
+  - CATALOG_URI=jdbc:sqlite:file:/tmp/iceberg_rest_mode=memory
+  - CATALOG_WAREHOUSE=s3://icebergdata/demo
+  - CATALOG_IO__IMPL=org.apache.iceberg.aws.s3.S3FileIO
+  - CATALOG_S3_ENDPOINT=http://minio:9000
+depends_on:
+  - minio
+links:
+  - minio:icebergdata.minio
+ports:
+  - 8181:8181
+expose:
+  - 8181
+
+  minio:
+image: minio/minio:latest
+environment:
+  - MINIO_ROOT_USER=admin
+  - MINIO_ROOT_PASSWORD=password
+  - MINIO_DOMAIN=minio
+ports:
+  - 9001:9001
+expose:
+  - 9001
+  - 9000
+command: [ "server", "/data", "--console-address", ":9001" ]
+
+  mc:

Review Comment:
   We also use the network in PyIceberg for the integration tests: 
https://github.com/apache/iceberg-python/blob/main/dev/docker-compose-integration.yml



##
crates/e2e_test/testdata/docker-compose.yaml:
##
@@ -0,0 +1,62 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+version: '3.8'
+
+services:
+  rest:
+image: tabulario/iceberg-rest:0.10.0
+environment:
+  - AWS_ACCESS_KEY_ID=admin
+  - AWS_SECRET_ACCESS_KEY=password
+  - AWS_REGION=us-east-1
+  - CATALOG_URI=jdbc:sqlite:file:/tmp/iceberg_rest_mode=memory
+  - CATALOG_WAREHOUSE=s3://icebergdata/demo
+  - CATALOG_IO__IMPL=org.apache.iceberg.aws.s3.S3FileIO
+  - CATALOG_S3_ENDPOINT=http://minio:9000
+depends_on:
+  - minio
+links:
+  - minio:icebergdata.minio

Review Comment:
   We also use the network in PyIceberg for the integration tests: 
https://github.com/apache/iceberg-python/blob/main/dev/docker-compose-integration.yml







Re: [PR] feat: support append data file and add e2e test [iceberg-rust]

2024-11-27 Thread via GitHub


Fokko commented on code in PR #349:
URL: https://github.com/apache/iceberg-rust/pull/349#discussion_r1860359778


##
crates/iceberg/src/transaction.rs:
##
@@ -122,6 +189,383 @@ impl<'a> Transaction<'a> {
 }
 }
 
+/// FastAppendAction is a transaction action for fast append data files to the 
table.
+pub struct FastAppendAction<'a> {
+snapshot_produce_action: SnapshotProduceAction<'a>,
+}
+
+impl<'a> FastAppendAction<'a> {
+#[allow(clippy::too_many_arguments)]
+pub(crate) fn new(
+tx: Transaction<'a>,
+parent_snapshot_id: Option<i64>,
+snapshot_id: i64,
+schema: Schema,
+schema_id: i32,
+format_version: FormatVersion,
+partition_spec: Arc<PartitionSpec>,
+key_metadata: Vec<u8>,
+commit_uuid: Uuid,
+snapshot_properties: HashMap<String, String>,
+) -> Result<Self> {
+Ok(Self {
+snapshot_produce_action: SnapshotProduceAction::new(
+tx,
+snapshot_id,
+parent_snapshot_id,
+schema_id,
+format_version,
+partition_spec,
+schema,
+key_metadata,
+commit_uuid,
+snapshot_properties,
+)?,
+})
+}
+
+/// Add data files to the snapshot.
+pub fn add_data_files(
+&mut self,
+data_files: impl IntoIterator<Item = DataFile>,
+) -> Result<&mut Self> {
+self.snapshot_produce_action.add_data_files(data_files)?;
+Ok(self)
+}
+
+/// Finished building the action and apply it to the transaction.
+pub async fn apply(self) -> Result<Transaction<'a>> {
+self.snapshot_produce_action
+.apply(FastAppendOperation, DefaultManifestProcess)
+.await
+}
+}
+
+struct FastAppendOperation;
+
+impl SnapshotProduceOperation for FastAppendOperation {
+fn operation(&self) -> Operation {
+Operation::Append
+}
+
+async fn delete_entries(
+&self,
+_snapshot_produce: &SnapshotProduceAction<'_>,
+) -> Result<Vec<ManifestEntry>> {
+Ok(vec![])
+}
+
+async fn existing_manifest(
+&self,
+snapshot_produce: &SnapshotProduceAction<'_>,
+) -> Result<Vec<ManifestFile>> {
+let Some(snapshot) = snapshot_produce
+.parent_snapshot_id
+.and_then(|id| 
snapshot_produce.tx.table.metadata().snapshot_by_id(id))
+else {
+return Ok(vec![]);
+};
+
+let manifest_list = snapshot
+.load_manifest_list(
+snapshot_produce.tx.table.file_io(),
+&snapshot_produce.tx.table.metadata_ref(),
+)
+.await?;
+
+Ok(manifest_list
+.entries()
+.iter()
+.filter(|entry| entry.has_added_files() || 
entry.has_existing_files())
+.cloned()
+.collect())
+}
+}
+
+trait SnapshotProduceOperation: Send + Sync {
+fn operation(&self) -> Operation;
+#[allow(unused)]
+fn delete_entries(
+&self,
+snapshot_produce: &SnapshotProduceAction,
+) -> impl Future<Output = Result<Vec<ManifestEntry>>> + Send;
+fn existing_manifest(
+&self,
+snapshot_produce: &SnapshotProduceAction,
+) -> impl Future<Output = Result<Vec<ManifestFile>>> + Send;
+}
+
+struct DefaultManifestProcess;
+
+impl ManifestProcess for DefaultManifestProcess {
+fn process_manifeset(&self, manifests: Vec<ManifestFile>) -> Vec<ManifestFile> {
+manifests
+}
+}
+
+trait ManifestProcess: Send + Sync {
+fn process_manifeset(&self, manifests: Vec<ManifestFile>) -> Vec<ManifestFile>;
+}
+
+struct SnapshotProduceAction<'a> {
+tx: Transaction<'a>,
+
+parent_snapshot_id: Option<i64>,
+snapshot_id: i64,
+schema_id: i32,
+format_version: FormatVersion,
+partition_spec: Arc<PartitionSpec>,
+schema: Schema,
+key_metadata: Vec<u8>,
+
+commit_uuid: Uuid,
+
+snapshot_properties: HashMap<String, String>,
+added_data_files: Vec<DataFile>,
+
+// A counter used to generate unique manifest file names.
+// It starts from 0 and increments for each new manifest file.
+// Note: This counter is limited to the range of (0..u64::MAX).
+manifest_counter: RangeFrom<u64>,
+}
+
+impl<'a> SnapshotProduceAction<'a> {
+#[allow(clippy::too_many_arguments)]
+pub(crate) fn new(
+tx: Transaction<'a>,
+snapshot_id: i64,
+parent_snapshot_id: Option<i64>,
+schema_id: i32,
+format_version: FormatVersion,
+partition_spec: Arc<PartitionSpec>,
+schema: Schema,
+key_metadata: Vec<u8>,
+commit_uuid: Uuid,
+snapshot_properties: HashMap<String, String>,
+) -> Result<Self> {
+Ok(Self {
+tx,
+parent_snapshot_id,
+snapshot_id,
+schema_id,
+format_version,
+commit_uuid,
+snapshot_properties,
+added_data_files: vec![],
+manifest_counter: (0..),
+partition_spec,
+schema,
+key_metadata,
+})
+}
+
+// Check if the partition value is compatible with the partition type.
+
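The hunk above ends at the comment "Check if the partition value is compatible with the partition type." The check the full PR performs has two parts: an arity check between value fields and type fields, then a zipped per-field compatibility test. A self-contained sketch of that shape (the enum types below are simplified stand-ins for the crate's `Struct`/`StructType`, not its real API):

```rust
// Simplified stand-ins for illustration only.
#[derive(Debug, Clone, Copy, PartialEq)]
enum PrimitiveType {
    Int,
    Str,
}

#[derive(Debug, Clone)]
enum Literal {
    Int(i64),
    Str(&'static str),
}

// A value is compatible when its variant matches the declared type.
fn compatible(field_type: PrimitiveType, value: &Literal) -> bool {
    matches!(
        (field_type, value),
        (PrimitiveType::Int, Literal::Int(_)) | (PrimitiveType::Str, Literal::Str(_))
    )
}

fn validate_partition_value(
    values: &[Literal],
    types: &[PrimitiveType],
) -> Result<(), String> {
    // Reject on arity mismatch first...
    if values.len() != types.len() {
        return Err("partition value is not compatible with partition type".to_string());
    }
    // ...then reject if any zipped (value, type) pair is incompatible.
    if values.iter().zip(types).any(|(v, t)| !compatible(*t, v)) {
        return Err("partition value is not compatible with partition type".to_string());
    }
    Ok(())
}

fn main() {
    assert!(validate_partition_value(&[Literal::Int(1)], &[PrimitiveType::Int]).is_ok());
    assert!(validate_partition_value(&[Literal::Str("a")], &[PrimitiveType::Int]).is_err());
    assert!(validate_partition_value(&[], &[PrimitiveType::Int]).is_err());
}
```

This is exactly the kind of guard the thread discusses: without it, appending a data file whose partition tuple does not match the spec could brick the table.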

Re: [PR] feat: support append data file and add e2e test [iceberg-rust]

2024-11-27 Thread via GitHub


c-thiel commented on code in PR #349:
URL: https://github.com/apache/iceberg-rust/pull/349#discussion_r1860342931


##
crates/e2e_test/testdata/docker-compose.yaml:
##
@@ -0,0 +1,62 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+version: '3.8'
+
+services:
+  rest:
+image: tabulario/iceberg-rest:0.10.0
+environment:
+  - AWS_ACCESS_KEY_ID=admin
+  - AWS_SECRET_ACCESS_KEY=password
+  - AWS_REGION=us-east-1
+  - CATALOG_URI=jdbc:sqlite:file:/tmp/iceberg_rest_mode=memory
+  - CATALOG_WAREHOUSE=s3://icebergdata/demo
+  - CATALOG_IO__IMPL=org.apache.iceberg.aws.s3.S3FileIO
+  - CATALOG_S3_ENDPOINT=http://minio:9000
+depends_on:
+  - minio
+links:
+  - minio:icebergdata.minio
+ports:
+  - 8181:8181
+expose:
+  - 8181
+
+  minio:
+image: minio/minio:latest
+environment:
+  - MINIO_ROOT_USER=admin
+  - MINIO_ROOT_PASSWORD=password
+  - MINIO_DOMAIN=minio

Review Comment:
   ```suggestion
 - MINIO_DOMAIN=minio
 - MINIO_DEFAULT_BUCKETS=icebergdata
   healthcheck:
 test: [ "CMD", "mc", "ls", "local", "|", "grep", "icebergdata" ]
 interval: 2s
 timeout: 10s
 retries: 2
 start_period: 15s
   ```






Re: [PR] feat: support append data file and add e2e test [iceberg-rust]

2024-11-27 Thread via GitHub


c-thiel commented on code in PR #349:
URL: https://github.com/apache/iceberg-rust/pull/349#discussion_r1860337732


##
crates/e2e_test/testdata/docker-compose.yaml:
##
@@ -0,0 +1,62 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+version: '3.8'
+
+services:
+  rest:
+image: tabulario/iceberg-rest:0.10.0
+environment:
+  - AWS_ACCESS_KEY_ID=admin
+  - AWS_SECRET_ACCESS_KEY=password
+  - AWS_REGION=us-east-1
+  - CATALOG_URI=jdbc:sqlite:file:/tmp/iceberg_rest_mode=memory
+  - CATALOG_WAREHOUSE=s3://icebergdata/demo
+  - CATALOG_IO__IMPL=org.apache.iceberg.aws.s3.S3FileIO
+  - CATALOG_S3_ENDPOINT=http://minio:9000
+depends_on:
+  - minio
+links:
+  - minio:icebergdata.minio
+ports:
+  - 8181:8181
+expose:
+  - 8181
+
+  minio:
+image: minio/minio:latest
+environment:
+  - MINIO_ROOT_USER=admin
+  - MINIO_ROOT_PASSWORD=password
+  - MINIO_DOMAIN=minio
+ports:
+  - 9001:9001
+expose:
+  - 9001
+  - 9000
+command: [ "server", "/data", "--console-address", ":9001" ]
+
+  mc:

Review Comment:
   Instead of having this pod here, we could also just add 
`MINIO_DEFAULT_BUCKETS=icebergdata` to the `minio` image above? This is how 
we do it with lakekeeper:
   
https://github.com/lakekeeper/lakekeeper/blob/8e2420f0ad4a993508a91a6f4abd756bb24457af/examples/self-contained/docker-compose.yaml#L126-L145



##
crates/e2e_test/testdata/docker-compose.yaml:
##
@@ -0,0 +1,62 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+version: '3.8'
+
+services:
+  rest:
+image: tabulario/iceberg-rest:0.10.0
+environment:
+  - AWS_ACCESS_KEY_ID=admin
+  - AWS_SECRET_ACCESS_KEY=password
+  - AWS_REGION=us-east-1
+  - CATALOG_URI=jdbc:sqlite:file:/tmp/iceberg_rest_mode=memory
+  - CATALOG_WAREHOUSE=s3://icebergdata/demo
+  - CATALOG_IO__IMPL=org.apache.iceberg.aws.s3.S3FileIO
+  - CATALOG_S3_ENDPOINT=http://minio:9000
+depends_on:
+  - minio
+links:
+  - minio:icebergdata.minio
+ports:
+  - 8181:8181
+expose:
+  - 8181
+
+  minio:
+image: minio/minio:latest
+environment:
+  - MINIO_ROOT_USER=admin
+  - MINIO_ROOT_PASSWORD=password
+  - MINIO_DOMAIN=minio

Review Comment:
   ```suggestion
 - MINIO_DOMAIN=minio
 - MINIO_DEFAULT_BUCKETS=icebergdata
   healthcheck:
 test: [ "CMD", "mc", "ls", "local", "|", "grep", "examples" ]
 interval: 2s
 timeout: 10s
 retries: 2
 start_period: 15s
   ```



##
crates/e2e_test/testdata/docker-compose.yaml:
##
@@ -0,0 +1,62 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+

Re: [PR] feat: support append data file and add e2e test [iceberg-rust]

2024-11-27 Thread via GitHub


c-thiel commented on code in PR #349:
URL: https://github.com/apache/iceberg-rust/pull/349#discussion_r1860331145


##
crates/iceberg/src/transaction.rs:
##
@@ -122,6 +172,364 @@ impl<'a> Transaction<'a> {
 }
 }
 
+/// FastAppendAction is a transaction action for fast append data files to the 
table.
+pub struct FastAppendAction<'a> {
+snapshot_produce_action: SnapshotProduceAction<'a>,
+}
+
+impl<'a> FastAppendAction<'a> {
+#[allow(clippy::too_many_arguments)]
+pub(crate) fn new(
+tx: Transaction<'a>,
+snapshot_id: i64,
+commit_uuid: Uuid,
+key_metadata: Vec<u8>,
+snapshot_properties: HashMap<String, String>,
+) -> Result<Self> {
+Ok(Self {
+snapshot_produce_action: SnapshotProduceAction::new(
+tx,
+snapshot_id,
+key_metadata,
+commit_uuid,
+snapshot_properties,
+)?,
+})
+}
+
+/// Add data files to the snapshot.
+pub fn add_data_files(
+&mut self,
+data_files: impl IntoIterator<Item = DataFile>,
+) -> Result<&mut Self> {
+self.snapshot_produce_action.add_data_files(data_files)?;
+Ok(self)
+}
+
+/// Finished building the action and apply it to the transaction.
+pub async fn apply(self) -> Result<Transaction<'a>> {
+self.snapshot_produce_action
+.apply(FastAppendOperation, DefaultManifestProcess)
+.await
+}
+}
+
+struct FastAppendOperation;
+
+impl SnapshotProduceOperation for FastAppendOperation {
+fn operation(&self) -> Operation {
+Operation::Append
+}
+
+async fn delete_entries(
+&self,
+_snapshot_produce: &SnapshotProduceAction<'_>,
+) -> Result<Vec<ManifestEntry>> {
+Ok(vec![])
+}
+
+async fn existing_manifest(
+&self,
+snapshot_produce: &SnapshotProduceAction<'_>,
+) -> Result<Vec<ManifestFile>> {
+let Some(snapshot) = 
snapshot_produce.tx.table.metadata().current_snapshot() else {
+return Ok(vec![]);
+};
+
+let manifest_list = snapshot
+.load_manifest_list(
+snapshot_produce.tx.table.file_io(),
+&snapshot_produce.tx.table.metadata_ref(),
+)
+.await?;
+
+Ok(manifest_list
+.entries()
+.iter()
+.filter(|entry| entry.has_added_files() || 
entry.has_existing_files())
+.cloned()
+.collect())
+}
+}
+
+trait SnapshotProduceOperation: Send + Sync {
+fn operation(&self) -> Operation;
+#[allow(unused)]
+fn delete_entries(
+&self,
+snapshot_produce: &SnapshotProduceAction,
+) -> impl Future<Output = Result<Vec<ManifestEntry>>> + Send;
+fn existing_manifest(
+&self,
+snapshot_produce: &SnapshotProduceAction,
+) -> impl Future<Output = Result<Vec<ManifestFile>>> + Send;
+}
+
+struct DefaultManifestProcess;
+
+impl ManifestProcess for DefaultManifestProcess {
+fn process_manifeset(&self, manifests: Vec<ManifestFile>) -> Vec<ManifestFile> {
+manifests
+}
+}
+
+trait ManifestProcess: Send + Sync {
+fn process_manifeset(&self, manifests: Vec<ManifestFile>) -> Vec<ManifestFile>;
+}
+
+struct SnapshotProduceAction<'a> {
+tx: Transaction<'a>,
+snapshot_id: i64,
+key_metadata: Vec<u8>,
+commit_uuid: Uuid,
+snapshot_properties: HashMap<String, String>,
+added_data_files: Vec<DataFile>,
+// A counter used to generate unique manifest file names.
+// It starts from 0 and increments for each new manifest file.
+// Note: This counter is limited to the range of (0..u64::MAX).
+manifest_counter: RangeFrom<u64>,
+}
+
+impl<'a> SnapshotProduceAction<'a> {
+pub(crate) fn new(
+tx: Transaction<'a>,
+snapshot_id: i64,
+key_metadata: Vec<u8>,
+commit_uuid: Uuid,
+snapshot_properties: HashMap<String, String>,
+) -> Result<Self> {
+Ok(Self {
+tx,
+snapshot_id,
+commit_uuid,
+snapshot_properties,
+added_data_files: vec![],
+manifest_counter: (0..),
+key_metadata,
+})
+}
+
+// Check if the partition value is compatible with the partition type.
+fn validate_partition_value(
+partition_value: &Struct,
+partition_type: &StructType,
+) -> Result<()> {
+if partition_value.fields().len() != partition_type.fields().len() {
+return Err(Error::new(
+ErrorKind::DataInvalid,
+"Partition value is not compatitable with partition type",
+));
+}
+if partition_value
+.fields()
+.iter()
+.zip(partition_type.fields())
+.any(|(value, field)| {
+!field
+.field_type
+.as_primitive_type()
+.unwrap()
+.compatible(&value.as_primitive_literal().unwrap())
+})
+{
+return Err(Error::new(
+ErrorKind::DataInvalid,
+"Partition value is

Re: [PR] feat: support append data file and add e2e test [iceberg-rust]

2024-11-27 Thread via GitHub


c-thiel commented on code in PR #349:
URL: https://github.com/apache/iceberg-rust/pull/349#discussion_r1860328978


##
crates/iceberg/src/transaction.rs:
##
@@ -122,6 +189,383 @@ impl<'a> Transaction<'a> {
 }
 }
 
+/// FastAppendAction is a transaction action for fast append data files to the 
table.
+pub struct FastAppendAction<'a> {
+snapshot_produce_action: SnapshotProduceAction<'a>,
+}
+
+impl<'a> FastAppendAction<'a> {
+#[allow(clippy::too_many_arguments)]
+pub(crate) fn new(
+tx: Transaction<'a>,
+parent_snapshot_id: Option<i64>,
+snapshot_id: i64,
+schema: Schema,
+schema_id: i32,
+format_version: FormatVersion,
+partition_spec: Arc<PartitionSpec>,
+key_metadata: Vec<u8>,
+commit_uuid: Uuid,
+snapshot_properties: HashMap<String, String>,
+) -> Result<Self> {
+Ok(Self {
+snapshot_produce_action: SnapshotProduceAction::new(
+tx,
+snapshot_id,
+parent_snapshot_id,
+schema_id,
+format_version,
+partition_spec,
+schema,
+key_metadata,
+commit_uuid,
+snapshot_properties,
+)?,
+})
+}
+
+/// Add data files to the snapshot.
+pub fn add_data_files(
+&mut self,
+data_files: impl IntoIterator<Item = DataFile>,
+) -> Result<&mut Self> {
+self.snapshot_produce_action.add_data_files(data_files)?;
+Ok(self)
+}
+
+/// Finished building the action and apply it to the transaction.
+pub async fn apply(self) -> Result<Transaction<'a>> {
+self.snapshot_produce_action
+.apply(FastAppendOperation, DefaultManifestProcess)
+.await
+}
+}
+
+struct FastAppendOperation;
+
+impl SnapshotProduceOperation for FastAppendOperation {
+fn operation(&self) -> Operation {
+Operation::Append
+}
+
+async fn delete_entries(
+&self,
+_snapshot_produce: &SnapshotProduceAction<'_>,
+) -> Result<Vec<ManifestEntry>> {
+Ok(vec![])
+}
+
+async fn existing_manifest(
+&self,
+snapshot_produce: &SnapshotProduceAction<'_>,
+) -> Result<Vec<ManifestFile>> {
+let Some(snapshot) = snapshot_produce
+.parent_snapshot_id
+.and_then(|id| snapshot_produce.tx.table.metadata().snapshot_by_id(id))
+else {
+return Ok(vec![]);
+};
+
+let manifest_list = snapshot
+.load_manifest_list(
+snapshot_produce.tx.table.file_io(),
+&snapshot_produce.tx.table.metadata_ref(),
+)
+.await?;
+
+Ok(manifest_list
+.entries()
+.iter()
+.filter(|entry| entry.has_added_files() || entry.has_existing_files())
+.cloned()
+.collect())
+}
+}
+
+trait SnapshotProduceOperation: Send + Sync {
+fn operation(&self) -> Operation;
+#[allow(unused)]
+fn delete_entries(
+&self,
+snapshot_produce: &SnapshotProduceAction,
+) -> impl Future<Output = Result<Vec<ManifestEntry>>> + Send;
+fn existing_manifest(
+&self,
+snapshot_produce: &SnapshotProduceAction,
+) -> impl Future<Output = Result<Vec<ManifestFile>>> + Send;
+}
+
+struct DefaultManifestProcess;
+
+impl ManifestProcess for DefaultManifestProcess {
+fn process_manifeset(&self, manifests: Vec<ManifestFile>) -> Vec<ManifestFile> {
+manifests
+}
+}
+
+trait ManifestProcess: Send + Sync {
+fn process_manifeset(&self, manifests: Vec<ManifestFile>) -> Vec<ManifestFile>;
+}
+
+struct SnapshotProduceAction<'a> {
+tx: Transaction<'a>,
+
+parent_snapshot_id: Option<i64>,
+snapshot_id: i64,
+schema_id: i32,
+format_version: FormatVersion,
+partition_spec: Arc<PartitionSpec>,
+schema: Schema,
+key_metadata: Vec<u8>,
+
+commit_uuid: Uuid,
+
+snapshot_properties: HashMap<String, String>,
+added_data_files: Vec<DataFile>,
+
+// A counter used to generate unique manifest file names.
+// It starts from 0 and increments for each new manifest file.
+// Note: This counter is limited to the range of (0..u64::MAX).
+manifest_counter: RangeFrom<u64>,
+}
+
+impl<'a> SnapshotProduceAction<'a> {
+#[allow(clippy::too_many_arguments)]
+pub(crate) fn new(
+tx: Transaction<'a>,
+snapshot_id: i64,
+parent_snapshot_id: Option<i64>,
+schema_id: i32,
+format_version: FormatVersion,
+partition_spec: Arc<PartitionSpec>,
+schema: Schema,
+key_metadata: Vec<u8>,
+commit_uuid: Uuid,
+snapshot_properties: HashMap<String, String>,
+) -> Result<Self> {
+Ok(Self {
+tx,
+parent_snapshot_id,
+snapshot_id,
+schema_id,
+format_version,
+commit_uuid,
+snapshot_properties,
+added_data_files: vec![],
+manifest_counter: (0..),
+partition_spec,
+schema,
+key_metadata,
+})
+}
+
+// Check if the partition value is compatible with the partition type.
+  

Re: [PR] feat: support append data file and add e2e test [iceberg-rust]

2024-11-27 Thread via GitHub


Fokko commented on PR #349:
URL: https://github.com/apache/iceberg-rust/pull/349#issuecomment-2503385681

   @ZENOTME I think you're on Linux? I'm on Mac, and I think the Docker 
networking behaves differently there.
   
   First I had to change the networking quite a bit: the loop that waits for 
the REST catalog to become ready pings a `172.` IP address that never 
responds, so it effectively becomes a `while true`. If I change it to 
`127.0.0.1` and expose the ports in Docker, I then get the weird error around 
the security token.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@iceberg.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


-
To unsubscribe, e-mail: issues-unsubscr...@iceberg.apache.org
For additional commands, e-mail: issues-h...@iceberg.apache.org



Re: [PR] feat: support append data file and add e2e test [iceberg-rust]

2024-11-27 Thread via GitHub


Fokko commented on code in PR #349:
URL: https://github.com/apache/iceberg-rust/pull/349#discussion_r1860256574


##
crates/iceberg/src/transaction.rs:
##
@@ -122,6 +172,364 @@ impl<'a> Transaction<'a> {
 }
 }
 
+/// FastAppendAction is a transaction action for fast append data files to the 
table.
+pub struct FastAppendAction<'a> {
+snapshot_produce_action: SnapshotProduceAction<'a>,
+}
+
+impl<'a> FastAppendAction<'a> {
+#[allow(clippy::too_many_arguments)]
+pub(crate) fn new(
+tx: Transaction<'a>,
+snapshot_id: i64,
+commit_uuid: Uuid,
+key_metadata: Vec<u8>,
+snapshot_properties: HashMap<String, String>,
+) -> Result<Self> {
+Ok(Self {
+snapshot_produce_action: SnapshotProduceAction::new(
+tx,
+snapshot_id,
+key_metadata,
+commit_uuid,
+snapshot_properties,
+)?,
+})
+}
+
+/// Add data files to the snapshot.
+pub fn add_data_files(
+&mut self,
+data_files: impl IntoIterator<Item = DataFile>,
+) -> Result<&mut Self> {
+self.snapshot_produce_action.add_data_files(data_files)?;
+Ok(self)
+}
+
+/// Finished building the action and apply it to the transaction.
+pub async fn apply(self) -> Result<Transaction<'a>> {
+self.snapshot_produce_action
+.apply(FastAppendOperation, DefaultManifestProcess)
+.await
+}
+}
+
+struct FastAppendOperation;
+
+impl SnapshotProduceOperation for FastAppendOperation {
+fn operation(&self) -> Operation {
+Operation::Append
+}
+
+async fn delete_entries(
+&self,
+_snapshot_produce: &SnapshotProduceAction<'_>,
+) -> Result<Vec<ManifestEntry>> {
+Ok(vec![])
+}
+
+async fn existing_manifest(
+&self,
+snapshot_produce: &SnapshotProduceAction<'_>,
+) -> Result<Vec<ManifestFile>> {
+let Some(snapshot) = snapshot_produce.tx.table.metadata().current_snapshot() else {
+return Ok(vec![]);
+};
+
+let manifest_list = snapshot
+.load_manifest_list(
+snapshot_produce.tx.table.file_io(),
+&snapshot_produce.tx.table.metadata_ref(),
+)
+.await?;
+
+Ok(manifest_list
+.entries()
+.iter()
+.filter(|entry| entry.has_added_files() || entry.has_existing_files())
+.cloned()
+.collect())
+}
+}
+
+trait SnapshotProduceOperation: Send + Sync {
+fn operation(&self) -> Operation;
+#[allow(unused)]
+fn delete_entries(
+&self,
+snapshot_produce: &SnapshotProduceAction,
+) -> impl Future<Output = Result<Vec<ManifestEntry>>> + Send;
+fn existing_manifest(
+&self,
+snapshot_produce: &SnapshotProduceAction,
+) -> impl Future<Output = Result<Vec<ManifestFile>>> + Send;
+}
+
+struct DefaultManifestProcess;
+
+impl ManifestProcess for DefaultManifestProcess {
+fn process_manifeset(&self, manifests: Vec<ManifestFile>) -> Vec<ManifestFile> {
+manifests
+}
+}
+
+trait ManifestProcess: Send + Sync {
+fn process_manifeset(&self, manifests: Vec<ManifestFile>) -> Vec<ManifestFile>;
+}
+
+struct SnapshotProduceAction<'a> {
+tx: Transaction<'a>,
+snapshot_id: i64,
+key_metadata: Vec<u8>,
+commit_uuid: Uuid,
+snapshot_properties: HashMap<String, String>,
+added_data_files: Vec<DataFile>,
+// A counter used to generate unique manifest file names.
+// It starts from 0 and increments for each new manifest file.
+// Note: This counter is limited to the range of (0..u64::MAX).
+manifest_counter: RangeFrom<u64>,
+}
+
+impl<'a> SnapshotProduceAction<'a> {
+pub(crate) fn new(
+tx: Transaction<'a>,
+snapshot_id: i64,
+key_metadata: Vec<u8>,
+commit_uuid: Uuid,
+snapshot_properties: HashMap<String, String>,
+) -> Result<Self> {
+Ok(Self {
+tx,
+snapshot_id,
+commit_uuid,
+snapshot_properties,
+added_data_files: vec![],
+manifest_counter: (0..),
+key_metadata,
+})
+}
+
+// Check if the partition value is compatible with the partition type.
+fn validate_partition_value(
+partition_value: &Struct,
+partition_type: &StructType,
+) -> Result<()> {
+if partition_value.fields().len() != partition_type.fields().len() {
+return Err(Error::new(
+ErrorKind::DataInvalid,
+"Partition value is not compatitable with partition type",
+));
+}
+if partition_value
+.fields()
+.iter()
+.zip(partition_type.fields())
+.any(|(value, field)| {
+!field
+.field_type
+.as_primitive_type()
+.unwrap()
+.compatible(&value.as_primitive_literal().unwrap())
+})
+{
+return Err(Error::new(
+ErrorKind::DataInvalid,
+"Partition value is n

Re: [PR] feat: support append data file and add e2e test [iceberg-rust]

2024-11-27 Thread via GitHub


c-thiel commented on code in PR #349:
URL: https://github.com/apache/iceberg-rust/pull/349#discussion_r1860178688


##
crates/iceberg/src/transaction.rs:
##
@@ -122,6 +189,383 @@ impl<'a> Transaction<'a> {
 }
 }
 
+/// FastAppendAction is a transaction action for fast append data files to the 
table.
+pub struct FastAppendAction<'a> {
+snapshot_produce_action: SnapshotProduceAction<'a>,
+}
+
+impl<'a> FastAppendAction<'a> {
+#[allow(clippy::too_many_arguments)]
+pub(crate) fn new(
+tx: Transaction<'a>,
+parent_snapshot_id: Option<i64>,
+snapshot_id: i64,
+schema: Schema,
+schema_id: i32,
+format_version: FormatVersion,
+partition_spec: Arc<PartitionSpec>,
+key_metadata: Vec<u8>,
+commit_uuid: Uuid,
+snapshot_properties: HashMap<String, String>,
+) -> Result<Self> {
+Ok(Self {
+snapshot_produce_action: SnapshotProduceAction::new(
+tx,
+snapshot_id,
+parent_snapshot_id,
+schema_id,
+format_version,
+partition_spec,
+schema,
+key_metadata,
+commit_uuid,
+snapshot_properties,
+)?,
+})
+}
+
+/// Add data files to the snapshot.
+pub fn add_data_files(
+&mut self,
+data_files: impl IntoIterator<Item = DataFile>,
+) -> Result<&mut Self> {
+self.snapshot_produce_action.add_data_files(data_files)?;
+Ok(self)
+}
+
+/// Finished building the action and apply it to the transaction.
+pub async fn apply(self) -> Result<Transaction<'a>> {
+self.snapshot_produce_action
+.apply(FastAppendOperation, DefaultManifestProcess)
+.await
+}
+}
+
+struct FastAppendOperation;
+
+impl SnapshotProduceOperation for FastAppendOperation {
+fn operation(&self) -> Operation {
+Operation::Append
+}
+
+async fn delete_entries(
+&self,
+_snapshot_produce: &SnapshotProduceAction<'_>,
+) -> Result<Vec<ManifestEntry>> {
+Ok(vec![])
+}
+
+async fn existing_manifest(
+&self,
+snapshot_produce: &SnapshotProduceAction<'_>,
+) -> Result<Vec<ManifestFile>> {
+let Some(snapshot) = snapshot_produce
+.parent_snapshot_id
+.and_then(|id| snapshot_produce.tx.table.metadata().snapshot_by_id(id))
+else {
+return Ok(vec![]);
+};
+
+let manifest_list = snapshot
+.load_manifest_list(
+snapshot_produce.tx.table.file_io(),
+&snapshot_produce.tx.table.metadata_ref(),
+)
+.await?;
+
+Ok(manifest_list
+.entries()
+.iter()
+.filter(|entry| entry.has_added_files() || entry.has_existing_files())
+.cloned()
+.collect())
+}
+}
+
+trait SnapshotProduceOperation: Send + Sync {
+fn operation(&self) -> Operation;
+#[allow(unused)]
+fn delete_entries(
+&self,
+snapshot_produce: &SnapshotProduceAction,
+) -> impl Future<Output = Result<Vec<ManifestEntry>>> + Send;
+fn existing_manifest(
+&self,
+snapshot_produce: &SnapshotProduceAction,
+) -> impl Future<Output = Result<Vec<ManifestFile>>> + Send;
+}
+
+struct DefaultManifestProcess;
+
+impl ManifestProcess for DefaultManifestProcess {
+fn process_manifeset(&self, manifests: Vec<ManifestFile>) -> Vec<ManifestFile> {
+manifests
+}
+}
+
+trait ManifestProcess: Send + Sync {
+fn process_manifeset(&self, manifests: Vec<ManifestFile>) -> Vec<ManifestFile>;
+}
+
+struct SnapshotProduceAction<'a> {
+tx: Transaction<'a>,
+
+parent_snapshot_id: Option<i64>,
+snapshot_id: i64,
+schema_id: i32,
+format_version: FormatVersion,
+partition_spec: Arc<PartitionSpec>,
+schema: Schema,
+key_metadata: Vec<u8>,
+
+commit_uuid: Uuid,
+
+snapshot_properties: HashMap<String, String>,
+added_data_files: Vec<DataFile>,
+
+// A counter used to generate unique manifest file names.
+// It starts from 0 and increments for each new manifest file.
+// Note: This counter is limited to the range of (0..u64::MAX).
+manifest_counter: RangeFrom<u64>,
+}
+
+impl<'a> SnapshotProduceAction<'a> {
+#[allow(clippy::too_many_arguments)]
+pub(crate) fn new(
+tx: Transaction<'a>,
+snapshot_id: i64,
+parent_snapshot_id: Option<i64>,
+schema_id: i32,
+format_version: FormatVersion,
+partition_spec: Arc<PartitionSpec>,
+schema: Schema,
+key_metadata: Vec<u8>,
+commit_uuid: Uuid,
+snapshot_properties: HashMap<String, String>,
+) -> Result<Self> {
+Ok(Self {
+tx,
+parent_snapshot_id,
+snapshot_id,
+schema_id,
+format_version,
+commit_uuid,
+snapshot_properties,
+added_data_files: vec![],
+manifest_counter: (0..),
+partition_spec,
+schema,
+key_metadata,
+})
+}
+
+// Check if the partition value is compatible with the partition type.
+  

Re: [PR] feat: support append data file and add e2e test [iceberg-rust]

2024-11-27 Thread via GitHub


c-thiel commented on code in PR #349:
URL: https://github.com/apache/iceberg-rust/pull/349#discussion_r1860177985


##
crates/iceberg/src/transaction.rs:
##
@@ -122,6 +189,383 @@ impl<'a> Transaction<'a> {
 }
 }
 
+/// FastAppendAction is a transaction action for fast append data files to the 
table.
+pub struct FastAppendAction<'a> {
+snapshot_produce_action: SnapshotProduceAction<'a>,
+}
+
+impl<'a> FastAppendAction<'a> {
+#[allow(clippy::too_many_arguments)]
+pub(crate) fn new(
+tx: Transaction<'a>,
+parent_snapshot_id: Option<i64>,
+snapshot_id: i64,
+schema: Schema,
+schema_id: i32,
+format_version: FormatVersion,
+partition_spec: Arc<PartitionSpec>,
+key_metadata: Vec<u8>,
+commit_uuid: Uuid,
+snapshot_properties: HashMap<String, String>,
+) -> Result<Self> {
+Ok(Self {
+snapshot_produce_action: SnapshotProduceAction::new(
+tx,
+snapshot_id,
+parent_snapshot_id,
+schema_id,
+format_version,
+partition_spec,
+schema,
+key_metadata,
+commit_uuid,
+snapshot_properties,
+)?,
+})
+}
+
+/// Add data files to the snapshot.
+pub fn add_data_files(
+&mut self,
+data_files: impl IntoIterator<Item = DataFile>,
+) -> Result<&mut Self> {
+self.snapshot_produce_action.add_data_files(data_files)?;
+Ok(self)
+}
+
+/// Finished building the action and apply it to the transaction.
+pub async fn apply(self) -> Result<Transaction<'a>> {
+self.snapshot_produce_action
+.apply(FastAppendOperation, DefaultManifestProcess)
+.await
+}
+}
+
+struct FastAppendOperation;
+
+impl SnapshotProduceOperation for FastAppendOperation {
+fn operation(&self) -> Operation {
+Operation::Append
+}
+
+async fn delete_entries(
+&self,
+_snapshot_produce: &SnapshotProduceAction<'_>,
+) -> Result<Vec<ManifestEntry>> {
+Ok(vec![])
+}
+
+async fn existing_manifest(
+&self,
+snapshot_produce: &SnapshotProduceAction<'_>,
+) -> Result<Vec<ManifestFile>> {
+let Some(snapshot) = snapshot_produce
+.parent_snapshot_id
+.and_then(|id| snapshot_produce.tx.table.metadata().snapshot_by_id(id))
+else {
+return Ok(vec![]);
+};
+
+let manifest_list = snapshot
+.load_manifest_list(
+snapshot_produce.tx.table.file_io(),
+&snapshot_produce.tx.table.metadata_ref(),
+)
+.await?;
+
+Ok(manifest_list
+.entries()
+.iter()
+.filter(|entry| entry.has_added_files() || entry.has_existing_files())
+.cloned()
+.collect())
+}
+}
+
+trait SnapshotProduceOperation: Send + Sync {
+fn operation(&self) -> Operation;
+#[allow(unused)]
+fn delete_entries(
+&self,
+snapshot_produce: &SnapshotProduceAction,
+) -> impl Future<Output = Result<Vec<ManifestEntry>>> + Send;
+fn existing_manifest(
+&self,
+snapshot_produce: &SnapshotProduceAction,
+) -> impl Future<Output = Result<Vec<ManifestFile>>> + Send;
+}
+
+struct DefaultManifestProcess;
+
+impl ManifestProcess for DefaultManifestProcess {
+fn process_manifeset(&self, manifests: Vec<ManifestFile>) -> Vec<ManifestFile> {
+manifests
+}
+}
+
+trait ManifestProcess: Send + Sync {
+fn process_manifeset(&self, manifests: Vec<ManifestFile>) -> Vec<ManifestFile>;
+}
+
+struct SnapshotProduceAction<'a> {
+tx: Transaction<'a>,
+
+parent_snapshot_id: Option<i64>,
+snapshot_id: i64,
+schema_id: i32,
+format_version: FormatVersion,
+partition_spec: Arc<PartitionSpec>,
+schema: Schema,
+key_metadata: Vec<u8>,
+
+commit_uuid: Uuid,
+
+snapshot_properties: HashMap<String, String>,
+added_data_files: Vec<DataFile>,
+
+// A counter used to generate unique manifest file names.
+// It starts from 0 and increments for each new manifest file.
+// Note: This counter is limited to the range of (0..u64::MAX).
+manifest_counter: RangeFrom<u64>,
+}
+
+impl<'a> SnapshotProduceAction<'a> {
+#[allow(clippy::too_many_arguments)]
+pub(crate) fn new(
+tx: Transaction<'a>,
+snapshot_id: i64,
+parent_snapshot_id: Option<i64>,
+schema_id: i32,
+format_version: FormatVersion,
+partition_spec: Arc<PartitionSpec>,
+schema: Schema,
+key_metadata: Vec<u8>,
+commit_uuid: Uuid,
+snapshot_properties: HashMap<String, String>,
+) -> Result<Self> {
+Ok(Self {
+tx,
+parent_snapshot_id,
+snapshot_id,
+schema_id,
+format_version,
+commit_uuid,
+snapshot_properties,
+added_data_files: vec![],
+manifest_counter: (0..),
+partition_spec,
+schema,
+key_metadata,
+})
+}
+
+// Check if the partition value is compatible with the partition type.
+  

Re: [PR] feat: support append data file and add e2e test [iceberg-rust]

2024-11-27 Thread via GitHub


c-thiel commented on code in PR #349:
URL: https://github.com/apache/iceberg-rust/pull/349#discussion_r1860174591


##
crates/iceberg/src/transaction.rs:
##
@@ -122,6 +189,383 @@ impl<'a> Transaction<'a> {
 }
 }
 
+/// FastAppendAction is a transaction action for fast append data files to the 
table.
+pub struct FastAppendAction<'a> {
+snapshot_produce_action: SnapshotProduceAction<'a>,
+}
+
+impl<'a> FastAppendAction<'a> {
+#[allow(clippy::too_many_arguments)]
+pub(crate) fn new(
+tx: Transaction<'a>,
+parent_snapshot_id: Option,
+snapshot_id: i64,
+schema: Schema,
+schema_id: i32,

Review Comment:
   I can't mark this as resolved, but it's fixed.






Re: [PR] feat: support append data file and add e2e test [iceberg-rust]

2024-11-26 Thread via GitHub


Fokko commented on code in PR #349:
URL: https://github.com/apache/iceberg-rust/pull/349#discussion_r1860112766


##
crates/iceberg/src/transaction.rs:
##
@@ -96,6 +109,60 @@ impl<'a> Transaction<'a> {
 Ok(self)
 }
 
+fn generate_unique_snapshot_id(&self) -> i64 {
+let generate_random_id = || -> i64 {
+let (lhs, rhs) = Uuid::new_v4().as_u64_pair();
+let snapshot_id = (lhs ^ rhs) as i64;
+if snapshot_id < 0 {
+-snapshot_id
+} else {
+snapshot_id
+}
+};
+let mut snapshot_id = generate_random_id();
+while self
+.table
+.metadata()
+.snapshots()
+.any(|s| s.snapshot_id() == snapshot_id)
+{
+snapshot_id = generate_random_id();
+}
+snapshot_id
+}
+
+/// Creates a fast append action.
+pub fn fast_append(
+self,
+commit_uuid: Option<Uuid>,
+key_metadata: Vec<u8>,
+) -> Result<FastAppendAction<'a>> {
+let parent_snapshot_id = self
+.table
+.metadata()
+.current_snapshot()
+.map(|s| s.snapshot_id());
+let snapshot_id = self.generate_unique_snapshot_id();
+let schema = self.table.metadata().current_schema().as_ref().clone();
+let schema_id = schema.schema_id();
+let format_version = self.table.metadata().format_version();
+let partition_spec = self.table.metadata().default_partition_spec().clone();
+let commit_uuid = commit_uuid.unwrap_or_else(Uuid::new_v4);

Review Comment:
   I don't think this is mentioned in the spec; I think v7 is fine as well, as 
it randomizes much better. See 
https://materializedview.io/i/142011675/uuid-shootout
   
   
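   For readers following along, the retry loop quoted above can be exercised 
without the `uuid` crate. The sketch below is a dependency-free approximation: 
a splitmix64 PRNG stands in for `Uuid::new_v4().as_u64_pair()`, and the sign 
bit is masked off rather than negated (both are assumptions of this sketch, 
not the PR's exact code; masking also sidesteps the `i64::MIN` negation edge 
case).

```rust
use std::collections::HashSet;

// splitmix64 stands in for the entropy of Uuid::new_v4() in this sketch.
fn splitmix64(state: &mut u64) -> u64 {
    *state = state.wrapping_add(0x9E37_79B9_7F4A_7C15);
    let mut z = *state;
    z = (z ^ (z >> 30)).wrapping_mul(0xBF58_476D_1CE4_E5B9);
    z = (z ^ (z >> 27)).wrapping_mul(0x94D0_49BB_1331_11EB);
    z ^ (z >> 31)
}

// Mirrors generate_unique_snapshot_id: derive a candidate id from two random
// words, force it non-negative, and retry while it collides with an existing
// snapshot id. Masking the sign bit avoids negating i64::MIN, which overflows.
fn generate_unique_snapshot_id(existing: &HashSet<i64>, state: &mut u64) -> i64 {
    loop {
        let (lhs, rhs) = (splitmix64(state), splitmix64(state));
        let id = ((lhs ^ rhs) & (i64::MAX as u64)) as i64;
        if !existing.contains(&id) {
            return id;
        }
    }
}

fn main() {
    let mut state = 0xDEAD_BEEF_u64;
    let mut existing = HashSet::new();
    for _ in 0..1000 {
        let id = generate_unique_snapshot_id(&existing, &mut state);
        assert!(id >= 0);
        existing.insert(id);
    }
    assert_eq!(existing.len(), 1000);
    println!("ok");
}
```

   The v4-vs-v7 question only changes where the random words come from; the 
collision check against existing snapshots is the same either way.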






Re: [PR] feat: support append data file and add e2e test [iceberg-rust]

2024-11-26 Thread via GitHub


Fokko commented on code in PR #349:
URL: https://github.com/apache/iceberg-rust/pull/349#discussion_r1860102313


##
crates/iceberg/src/spec/manifest_list.rs:
##
@@ -106,34 +106,38 @@ impl std::fmt::Debug for ManifestListWriter {
 
 impl ManifestListWriter {
 /// Construct a v1 [`ManifestListWriter`] that writes to a provided 
[`OutputFile`].
-pub fn v1(output_file: OutputFile, snapshot_id: i64, parent_snapshot_id: i64) -> Self {
-let metadata = HashMap::from_iter([
+pub fn v1(output_file: OutputFile, snapshot_id: i64, parent_snapshot_id: Option<i64>) -> Self {
+let mut metadata = HashMap::from_iter([
 ("snapshot-id".to_string(), snapshot_id.to_string()),
-(
-"parent-snapshot-id".to_string(),
-parent_snapshot_id.to_string(),
-),
 ("format-version".to_string(), "1".to_string()),
 ]);
+if let Some(parent_snapshot_id) = parent_snapshot_id {
+metadata.insert(
+"parent-snapshot-id".to_string(),
+parent_snapshot_id.to_string(),
+);
+}
 Self::new(FormatVersion::V1, output_file, metadata, 0, snapshot_id)
 }
 
 /// Construct a v2 [`ManifestListWriter`] that writes to a provided 
[`OutputFile`].
 pub fn v2(
 output_file: OutputFile,
 snapshot_id: i64,
-parent_snapshot_id: i64,
+parent_snapshot_id: Option<i64>,
 sequence_number: i64,
 ) -> Self {
-let metadata = HashMap::from_iter([
+let mut metadata = HashMap::from_iter([
 ("snapshot-id".to_string(), snapshot_id.to_string()),
-(
-"parent-snapshot-id".to_string(),
-parent_snapshot_id.to_string(),
-),
 ("sequence-number".to_string(), sequence_number.to_string()),
 ("format-version".to_string(), "2".to_string()),
 ]);
+if let Some(parent_snapshot_id) = parent_snapshot_id {

Review Comment:
   Let's get this fixed on the PyIceberg side: 
https://github.com/apache/iceberg-python/pull/1383






Re: [PR] feat: support append data file and add e2e test [iceberg-rust]

2024-11-26 Thread via GitHub


Fokko commented on code in PR #349:
URL: https://github.com/apache/iceberg-rust/pull/349#discussion_r1860074706


##
crates/iceberg/src/spec/manifest_list.rs:
##
@@ -106,34 +106,38 @@ impl std::fmt::Debug for ManifestListWriter {
 
 impl ManifestListWriter {
 /// Construct a v1 [`ManifestListWriter`] that writes to a provided 
[`OutputFile`].
-pub fn v1(output_file: OutputFile, snapshot_id: i64, parent_snapshot_id: i64) -> Self {
-let metadata = HashMap::from_iter([
+pub fn v1(output_file: OutputFile, snapshot_id: i64, parent_snapshot_id: Option<i64>) -> Self {
+let mut metadata = HashMap::from_iter([
 ("snapshot-id".to_string(), snapshot_id.to_string()),
-(
-"parent-snapshot-id".to_string(),
-parent_snapshot_id.to_string(),
-),
 ("format-version".to_string(), "1".to_string()),
 ]);
+if let Some(parent_snapshot_id) = parent_snapshot_id {
+metadata.insert(
+"parent-snapshot-id".to_string(),
+parent_snapshot_id.to_string(),
+);
+}
 Self::new(FormatVersion::V1, output_file, metadata, 0, snapshot_id)
 }
 
 /// Construct a v2 [`ManifestListWriter`] that writes to a provided 
[`OutputFile`].
 pub fn v2(
 output_file: OutputFile,
 snapshot_id: i64,
-parent_snapshot_id: i64,
+parent_snapshot_id: Option<i64>,
 sequence_number: i64,
 ) -> Self {
-let metadata = HashMap::from_iter([
+let mut metadata = HashMap::from_iter([
 ("snapshot-id".to_string(), snapshot_id.to_string()),
-(
-"parent-snapshot-id".to_string(),
-parent_snapshot_id.to_string(),
-),
 ("sequence-number".to_string(), sequence_number.to_string()),
 ("format-version".to_string(), "2".to_string()),
 ]);
+if let Some(parent_snapshot_id) = parent_snapshot_id {

Review Comment:
   Since the Java implementation is the reference implementation, I fully agree 
with @c-thiel to set it to `null`:
   
   
![image](https://github.com/user-attachments/assets/47da8d0c-6c1d-4f75-a2f7-a72add51d7fa)
   
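   A minimal, self-contained sketch of the behavior being agreed on here 
(names are illustrative, not the PR's exact writer internals): the 
`parent-snapshot-id` key is only emitted when a parent actually exists, 
matching the Java reference implementation's `null` rather than writing a 
sentinel value.

```rust
use std::collections::HashMap;

// Build the manifest-list Avro key/value metadata; the parent-snapshot-id
// entry is omitted entirely for a table's first snapshot.
fn manifest_list_metadata(
    snapshot_id: i64,
    parent_snapshot_id: Option<i64>,
) -> HashMap<String, String> {
    let mut metadata = HashMap::from([
        ("snapshot-id".to_string(), snapshot_id.to_string()),
        ("format-version".to_string(), "1".to_string()),
    ]);
    if let Some(parent) = parent_snapshot_id {
        metadata.insert("parent-snapshot-id".to_string(), parent.to_string());
    }
    metadata
}

fn main() {
    // First snapshot of a table: no parent, so the key is absent.
    let root = manifest_list_metadata(100, None);
    assert!(!root.contains_key("parent-snapshot-id"));

    // Subsequent snapshot: the parent id is written as a string.
    let child = manifest_list_metadata(200, Some(100));
    assert_eq!(child.get("parent-snapshot-id").map(String::as_str), Some("100"));
    println!("ok");
}
```

   Readers that treat a missing key the same as `null` will then see identical 
behavior across the Java, Python, and Rust writers.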






Re: [PR] feat: support append data file and add e2e test [iceberg-rust]

2024-11-26 Thread via GitHub


Fokko commented on code in PR #349:
URL: https://github.com/apache/iceberg-rust/pull/349#discussion_r1860068128


##
crates/e2e_test/Cargo.toml:
##
@@ -0,0 +1,37 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+[package]
+name = "iceberg-e2e_test"

Review Comment:
   I would go with `iceberg-integration-tests` since that's what we use for 
PyIceberg as well :)






Re: [PR] feat: support append data file and add e2e test [iceberg-rust]

2024-11-24 Thread via GitHub


ZENOTME commented on code in PR #349:
URL: https://github.com/apache/iceberg-rust/pull/349#discussion_r1855708285


##
crates/iceberg/src/transaction.rs:
##
@@ -96,6 +109,60 @@ impl<'a> Transaction<'a> {
 Ok(self)
 }
 
+fn generate_unique_snapshot_id(&self) -> i64 {
+let generate_random_id = || -> i64 {
+let (lhs, rhs) = Uuid::new_v4().as_u64_pair();
+let snapshot_id = (lhs ^ rhs) as i64;
+if snapshot_id < 0 {
+-snapshot_id
+} else {
+snapshot_id
+}
+};
+let mut snapshot_id = generate_random_id();
+while self
+.table
+.metadata()
+.snapshots()
+.any(|s| s.snapshot_id() == snapshot_id)
+{
+snapshot_id = generate_random_id();
+}
+snapshot_id
+}
+
+/// Creates a fast append action.
+pub fn fast_append(
+self,
+commit_uuid: Option<Uuid>,
+key_metadata: Vec<u8>,
+) -> Result<FastAppendAction<'a>> {
+let parent_snapshot_id = self
+.table
+.metadata()
+.current_snapshot()
+.map(|s| s.snapshot_id());
+let snapshot_id = self.generate_unique_snapshot_id();
+let schema = self.table.metadata().current_schema().as_ref().clone();
+let schema_id = schema.schema_id();
+let format_version = self.table.metadata().format_version();
+let partition_spec = self.table.metadata().default_partition_spec().clone();
+let commit_uuid = commit_uuid.unwrap_or_else(Uuid::new_v4);

Review Comment:
   Actually, I'm not too familiar with this area; the v4 choice follows the 
reference implementation in 
https://github.com/apache/iceberg-python/blob/c21aefde15cbc3ff9fbb3aaddb17e3855ced7032/pyiceberg/table/update/snapshot.py#L116.
 cc @Fokko, what do you think?






Re: [PR] feat: support append data file and add e2e test [iceberg-rust]

2024-11-24 Thread via GitHub


ZENOTME commented on code in PR #349:
URL: https://github.com/apache/iceberg-rust/pull/349#discussion_r1855695281


##
crates/iceberg/src/transaction.rs:
##
@@ -122,6 +189,383 @@ impl<'a> Transaction<'a> {
 }
 }
 
+/// FastAppendAction is a transaction action for fast append data files to the 
table.
+pub struct FastAppendAction<'a> {
+snapshot_produce_action: SnapshotProduceAction<'a>,
+}
+
+impl<'a> FastAppendAction<'a> {
+#[allow(clippy::too_many_arguments)]
+pub(crate) fn new(
+tx: Transaction<'a>,
+parent_snapshot_id: Option<i64>,
+snapshot_id: i64,
+schema: Schema,
+schema_id: i32,
+format_version: FormatVersion,
+partition_spec: Arc<PartitionSpec>,
+key_metadata: Vec<u8>,
+commit_uuid: Uuid,
+snapshot_properties: HashMap<String, String>,
+) -> Result<Self> {
+Ok(Self {
+snapshot_produce_action: SnapshotProduceAction::new(
+tx,
+snapshot_id,
+parent_snapshot_id,
+schema_id,
+format_version,
+partition_spec,
+schema,
+key_metadata,
+commit_uuid,
+snapshot_properties,
+)?,
+})
+}
+
+/// Add data files to the snapshot.
+pub fn add_data_files(
+&mut self,
+data_files: impl IntoIterator<Item = DataFile>,
+) -> Result<&mut Self> {
+self.snapshot_produce_action.add_data_files(data_files)?;
+Ok(self)
+}
+
+/// Finished building the action and apply it to the transaction.
+pub async fn apply(self) -> Result<Transaction<'a>> {
+self.snapshot_produce_action
+.apply(FastAppendOperation, DefaultManifestProcess)
+.await
+}
+}
+
+struct FastAppendOperation;
+
+impl SnapshotProduceOperation for FastAppendOperation {
+fn operation(&self) -> Operation {
+Operation::Append
+}
+
+async fn delete_entries(
+&self,
+_snapshot_produce: &SnapshotProduceAction<'_>,
+) -> Result> {
+Ok(vec![])
+}
+
+async fn existing_manifest(
+&self,
+snapshot_produce: &SnapshotProduceAction<'_>,
+) -> Result> {
+let Some(snapshot) = snapshot_produce
+.parent_snapshot_id
+.and_then(|id| 
snapshot_produce.tx.table.metadata().snapshot_by_id(id))
+else {
+return Ok(vec![]);
+};
+
+let manifest_list = snapshot
+.load_manifest_list(
+snapshot_produce.tx.table.file_io(),
+&snapshot_produce.tx.table.metadata_ref(),
+)
+.await?;
+
+Ok(manifest_list
+.entries()
+.iter()
+.filter(|entry| entry.has_added_files() || 
entry.has_existing_files())
+.cloned()
+.collect())
+}
+}
+
+trait SnapshotProduceOperation: Send + Sync {
+fn operation(&self) -> Operation;
+#[allow(unused)]
+fn delete_entries(
+&self,
+snapshot_produce: &SnapshotProduceAction,
+) -> impl Future>> + Send;
+fn existing_manifest(
+&self,
+snapshot_produce: &SnapshotProduceAction,
+) -> impl Future>> + Send;
+}
+
+struct DefaultManifestProcess;
+
+impl ManifestProcess for DefaultManifestProcess {
+fn process_manifeset(&self, manifests: Vec) -> 
Vec {
+manifests
+}
+}
+
+trait ManifestProcess: Send + Sync {
+fn process_manifeset(&self, manifests: Vec) -> 
Vec;
+}
+
+struct SnapshotProduceAction<'a> {
+tx: Transaction<'a>,
+
+parent_snapshot_id: Option,
+snapshot_id: i64,
+schema_id: i32,
+format_version: FormatVersion,
+partition_spec: Arc,
+schema: Schema,
+key_metadata: Vec,
+
+commit_uuid: Uuid,
+
+snapshot_properties: HashMap,
+added_data_files: Vec,
+
+// A counter used to generate unique manifest file names.
+// It starts from 0 and increments for each new manifest file.
+// Note: This counter is limited to the range of (0..u64::MAX).
+manifest_counter: RangeFrom,
+}
+
+impl<'a> SnapshotProduceAction<'a> {
+#[allow(clippy::too_many_arguments)]
+pub(crate) fn new(
+tx: Transaction<'a>,
+snapshot_id: i64,
+parent_snapshot_id: Option,
+schema_id: i32,
+format_version: FormatVersion,
+partition_spec: Arc,
+schema: Schema,
+key_metadata: Vec,
+commit_uuid: Uuid,
+snapshot_properties: HashMap,
+) -> Result {
+Ok(Self {
+tx,
+parent_snapshot_id,
+snapshot_id,
+schema_id,
+format_version,
+commit_uuid,
+snapshot_properties,
+added_data_files: vec![],
+manifest_counter: (0..),
+partition_spec,
+schema,
+key_metadata,
+})
+}
+
+// Check if the partition value is compatible with the partition type.
+  
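The `manifest_counter: RangeFrom<u64>` field described in the comment above is just an infinite counter handing out unique suffixes. A self-contained sketch of the pattern (the `{uuid}-m{n}.avro` file-name shape below is a hypothetical illustration for the example, not necessarily the crate's exact naming):

```rust
use std::ops::RangeFrom;

// Illustration of the manifest_counter pattern: an infinite RangeFrom<u64>
// produces a fresh suffix for every new manifest file written in a commit.
struct ManifestNamer {
    commit_uuid: String,
    counter: RangeFrom<u64>,
}

impl ManifestNamer {
    fn next_name(&mut self) -> String {
        // RangeFrom::next only returns None once u64::MAX values have been
        // taken, matching the "limited to (0..u64::MAX)" note above.
        let n = self.counter.next().unwrap();
        format!("{}-m{}.avro", self.commit_uuid, n)
    }
}

fn main() {
    let mut namer = ManifestNamer {
        commit_uuid: "uuid".to_string(),
        counter: 0..,
    };
    assert_eq!(namer.next_name(), "uuid-m0.avro");
    assert_eq!(namer.next_name(), "uuid-m1.avro");
}
```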

Re: [PR] feat: support append data file and add e2e test [iceberg-rust]

2024-11-24 Thread via GitHub


ZENOTME commented on code in PR #349:
URL: https://github.com/apache/iceberg-rust/pull/349#discussion_r1855684639


##
crates/iceberg/src/transaction.rs:
##

Re: [PR] feat: support append data file and add e2e test [iceberg-rust]

2024-11-24 Thread via GitHub


ZENOTME commented on code in PR #349:
URL: https://github.com/apache/iceberg-rust/pull/349#discussion_r1855682480


##
crates/iceberg/src/transaction.rs:
##

Re: [PR] feat: support append data file and add e2e test [iceberg-rust]

2024-11-22 Thread via GitHub


ZENOTME commented on code in PR #349:
URL: https://github.com/apache/iceberg-rust/pull/349#discussion_r1854528111


##
crates/iceberg/src/spec/manifest_list.rs:
##
@@ -106,34 +106,38 @@ impl std::fmt::Debug for ManifestListWriter {
 
 impl ManifestListWriter {
     /// Construct a v1 [`ManifestListWriter`] that writes to a provided [`OutputFile`].
-    pub fn v1(output_file: OutputFile, snapshot_id: i64, parent_snapshot_id: i64) -> Self {
-        let metadata = HashMap::from_iter([
+    pub fn v1(output_file: OutputFile, snapshot_id: i64, parent_snapshot_id: Option<i64>) -> Self {
+        let mut metadata = HashMap::from_iter([
             ("snapshot-id".to_string(), snapshot_id.to_string()),
-            (
-                "parent-snapshot-id".to_string(),
-                parent_snapshot_id.to_string(),
-            ),
             ("format-version".to_string(), "1".to_string()),
         ]);
+        if let Some(parent_snapshot_id) = parent_snapshot_id {
+            metadata.insert(
+                "parent-snapshot-id".to_string(),
+                parent_snapshot_id.to_string(),
+            );
+        }
         Self::new(FormatVersion::V1, output_file, metadata, 0, snapshot_id)
     }
 
     /// Construct a v2 [`ManifestListWriter`] that writes to a provided [`OutputFile`].
     pub fn v2(
         output_file: OutputFile,
         snapshot_id: i64,
-        parent_snapshot_id: i64,
+        parent_snapshot_id: Option<i64>,
         sequence_number: i64,
     ) -> Self {
-        let metadata = HashMap::from_iter([
+        let mut metadata = HashMap::from_iter([
             ("snapshot-id".to_string(), snapshot_id.to_string()),
-            (
-                "parent-snapshot-id".to_string(),
-                parent_snapshot_id.to_string(),
-            ),
             ("sequence-number".to_string(), sequence_number.to_string()),
             ("format-version".to_string(), "2".to_string()),
         ]);
+        if let Some(parent_snapshot_id) = parent_snapshot_id {

Review Comment:
   Good catch! The value here in PyIceberg can be `None`. I don't see any 
reference to the ManifestList metadata in the Iceberg spec. cc @Fokko 
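A minimal, self-contained sketch of the conditional-metadata pattern under discussion, using plain std types (v1 key set only; the real writer also carries the v2 `sequence-number` key):

```rust
use std::collections::HashMap;

// Only emit "parent-snapshot-id" when a parent snapshot actually exists,
// instead of writing a placeholder value for the first snapshot of a table.
fn manifest_list_metadata(
    snapshot_id: i64,
    parent_snapshot_id: Option<i64>,
) -> HashMap<String, String> {
    let mut metadata = HashMap::from([
        ("snapshot-id".to_string(), snapshot_id.to_string()),
        ("format-version".to_string(), "1".to_string()),
    ]);
    if let Some(parent) = parent_snapshot_id {
        metadata.insert("parent-snapshot-id".to_string(), parent.to_string());
    }
    metadata
}

fn main() {
    // First snapshot of a table: no parent, so the key is absent.
    assert!(!manifest_list_metadata(1, None).contains_key("parent-snapshot-id"));
    // Subsequent snapshots carry their parent's id.
    assert_eq!(manifest_list_metadata(2, Some(1))["parent-snapshot-id"], "1");
}
```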






Re: [PR] feat: support append data file and add e2e test [iceberg-rust]

2024-11-21 Thread via GitHub


c-thiel commented on code in PR #349:
URL: https://github.com/apache/iceberg-rust/pull/349#discussion_r1852590840


##
crates/iceberg/src/transaction.rs:
##
@@ -96,6 +109,60 @@ impl<'a> Transaction<'a> {
 Ok(self)
 }
 
+fn generate_unique_snapshot_id(&self) -> i64 {
+let generate_random_id = || -> i64 {
+let (lhs, rhs) = Uuid::new_v4().as_u64_pair();
+let snapshot_id = (lhs ^ rhs) as i64;
+if snapshot_id < 0 {
+-snapshot_id
+} else {
+snapshot_id
+}
+};
+let mut snapshot_id = generate_random_id();
+while self
+.table
+.metadata()
+.snapshots()
+.any(|s| s.snapshot_id() == snapshot_id)
+{
+snapshot_id = generate_random_id();
+}
+snapshot_id
+}
+
+/// Creates a fast append action.
+pub fn fast_append(
+self,
+commit_uuid: Option<String>,
+key_metadata: Vec<u8>,
+) -> Result<FastAppendAction<'a>> {
+let parent_snapshot_id = self
+.table
+.metadata()
+.current_snapshot()
+.map(|s| s.snapshot_id());
+let snapshot_id = self.generate_unique_snapshot_id();
+let schema = self.table.metadata().current_schema().as_ref().clone();
+let schema_id = schema.schema_id();
+let format_version = self.table.metadata().format_version();
+let partition_spec = self.table.metadata().default_partition_spec().clone();

Review Comment:
   Is there an advantage to extracting `parent_snapshot_id`, `schema`, 
`schema_id`, `format_version` and `partition_spec` here? The `tx` is passed 
owned to the FastAppendAction and in turn owns `table`, so we have a bit of 
redundancy. I would just calculate those values when needed instead of 
duplicating them eagerly. This would make the `SnapshotProduceAction` a bit 
slimmer.
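A hypothetical sketch of the suggestion, with stub types standing in for the real iceberg-rust `Transaction`/`TableMetadata`: derive the values from the owned table on demand instead of storing copies.

```rust
// Stand-in stubs; the real types live in the iceberg crate.
struct Metadata {
    format_version: u8,
    current_schema_id: i32,
}

struct Table {
    metadata: Metadata,
}

struct Transaction {
    table: Table,
}

struct SnapshotProduceAction {
    tx: Transaction,
    snapshot_id: i64,
}

impl SnapshotProduceAction {
    // Accessors replace the duplicated fields; the action stays slim and the
    // values can never drift out of sync with the table metadata.
    fn format_version(&self) -> u8 {
        self.tx.table.metadata.format_version
    }

    fn schema_id(&self) -> i32 {
        self.tx.table.metadata.current_schema_id
    }
}

fn main() {
    let action = SnapshotProduceAction {
        tx: Transaction {
            table: Table {
                metadata: Metadata {
                    format_version: 2,
                    current_schema_id: 1,
                },
            },
        },
        snapshot_id: 42,
    };
    assert_eq!(action.format_version(), 2);
    assert_eq!(action.schema_id(), 1);
    assert_eq!(action.snapshot_id, 42);
}
```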



##
crates/e2e_test/Cargo.toml:
##
@@ -0,0 +1,37 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+[package]
+name = "iceberg-e2e_test"

Review Comment:
   should we mix "-" and "_" here?
   As our main crate is "iceberg-catalog-rest", I would have opted for 
"iceberg-e2e-test".



##
crates/iceberg/src/transaction.rs:
##
@@ -122,6 +189,383 @@ impl<'a> Transaction<'a> {
 }
 }
 
+/// FastAppendAction is a transaction action for fast append data files to the table.
+pub struct FastAppendAction<'a> {
+snapshot_produce_action: SnapshotProduceAction<'a>,
+}
+
+impl<'a> FastAppendAction<'a> {
+#[allow(clippy::too_many_arguments)]
+pub(crate) fn new(
+tx: Transaction<'a>,
+parent_snapshot_id: Option<i64>,
+snapshot_id: i64,
+schema: Schema,
+schema_id: i32,

Review Comment:
   And we have it again in `tx.table.schema` - see also my comment in the 
`fast_append` method.



##
crates/iceberg/src/spec/manifest_list.rs:
##
@@ -106,34 +106,38 @@ impl std::fmt::Debug for ManifestListWriter {
 
 impl ManifestListWriter {
 /// Construct a v1 [`ManifestListWriter`] that writes to a provided 
[`OutputFile`].
-pub fn v1(output_file: OutputFile, snapshot_id: i64, parent_snapshot_id: i64) -> Self {
-let metadata = HashMap::from_iter([
+pub fn v1(output_file: OutputFile, snapshot_id: i64, parent_snapshot_id: Option<i64>) -> Self {
+let mut metadata = HashMap::from_iter([
 ("snapshot-id".to_string(), snapshot_id.to_string()),
-(
-"parent-snapshot-id".to_string(),
-parent_snapshot_id.to_string(),
-),
 ("format-version".to_string(), "1".to_string()),
 ]);
+if let Some(parent_snapshot_id) = parent_snapshot_id {
+metadata.insert(
+"parent-snapshot-id".to_string(),
+parent_snapshot_id.to_string(),
+);
+}
 Self::new(FormatVersion::V1, output_file, metadata, 0, snapshot_id)
 }
 
 /// Construct a v2 [`ManifestListWriter`] that writes to a provided 
[`OutputFile`].
 pub fn v2(
 output_file: OutputFile,
 snapshot_id: i64,
-parent_snapshot_id

Re: [PR] feat: support append data file and add e2e test [iceberg-rust]

2024-11-21 Thread via GitHub


c-thiel commented on code in PR #349:
URL: https://github.com/apache/iceberg-rust/pull/349#discussion_r1852078486


##
crates/e2e_test/Cargo.toml:
##
@@ -0,0 +1,37 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+[package]
+name = "iceberg-e2e_test"

Review Comment:
   should we mix "-" and "_" here?
   As our main crate is "iceberg-catalog-rest", I would have opted for 
"iceberg-e2e-test".






Re: [PR] feat: support append data file and add e2e test [iceberg-rust]

2024-11-19 Thread via GitHub


ZENOTME commented on PR #349:
URL: https://github.com/apache/iceberg-rust/pull/349#issuecomment-2487346035

   > @ZENOTME Thanks, `make test` also runs successfully for me. I'm pretty 
sure that the test works, but I want to assert certain things on the metadata. 
Having the IDE to set breakpoints makes life much easier. When I run it in my 
IDE it gets stuck on waiting for the REST catalog. When I open up the ports I 
get the exception that the security token is invalid, which is probably an 
issue with the signing.
   > 
   > Edit: I'm unable to get it to work on my local machine, could you check if 
you can fix that? We want to be able to run these kind of tests from our IDE as 
well. Looping in @Xuanwo since this might be related to OpenDAL.
   
   Hi, I tried setting breakpoints in VS Code and it works. I set the breakpoint at 
https://github.com/apache/iceberg-rust/blob/4262a09ecac2c98bda22860e6e45b80f3010a82c/crates/e2e_test/src/lib.rs#L68,
 and then the test continues to run successfully.
   
   Could you give more detail about how to reproduce this, e.g. where to set 
the breakpoint?





Re: [PR] feat: support append data file and add e2e test [iceberg-rust]

2024-11-19 Thread via GitHub


Fokko commented on PR #349:
URL: https://github.com/apache/iceberg-rust/pull/349#issuecomment-2485342410

   @ZENOTME Thanks, `make test` also runs successfully for me. I'm pretty sure 
that the test works, but I want to assert certain things on the metadata. 
Having the IDE to set breakpoints makes life much easier. When I run it in my 
IDE it gets stuck on waiting for the REST catalog. When I open up the ports I 
get the exception that the security token is invalid, which is probably an 
issue with the signing.





Re: [PR] feat: support append data file and add e2e test [iceberg-rust]

2024-11-19 Thread via GitHub


Fokko commented on code in PR #349:
URL: https://github.com/apache/iceberg-rust/pull/349#discussion_r1847922844


##
crates/iceberg/src/spec/manifest.rs:
##
@@ -1328,7 +1334,7 @@ mod _serde {
 Ok(Self {
 content: value.content as i32,
 file_path: value.file_path,
-file_format: value.file_format.to_string(),
+file_format: value.file_format.to_string().to_ascii_uppercase(),

Review Comment:
   Good catch, I've created an issue to get that fixed: 
https://github.com/apache/iceberg-python/issues/1340






Re: [PR] feat: support append data file and add e2e test [iceberg-rust]

2024-11-18 Thread via GitHub


ZENOTME commented on code in PR #349:
URL: https://github.com/apache/iceberg-rust/pull/349#discussion_r1847748258


##
crates/iceberg/src/spec/manifest.rs:
##
@@ -1328,7 +1334,7 @@ mod _serde {
 Ok(Self {
 content: value.content as i32,
 file_path: value.file_path,
-file_format: value.file_format.to_string(),
+file_format: value.file_format.to_string().to_ascii_uppercase(),

Review Comment:
   PyIceberg fails to read if this file format is not uppercase. BTW, we can add 
a test using PyIceberg later to ensure the written data is valid for other SDKs.
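A small self-contained sketch of the round-trip concern: a `Display` impl that renders lowercase, uppercased at serialization time as in the diff above. The enum here is a stub for illustration, not the crate's real `DataFileFormat`.

```rust
use std::fmt;

// Stub file-format enum: Display renders lowercase, and the serialization
// path upper-cases it so PyIceberg can parse the value back.
#[derive(Debug, Clone, Copy)]
enum DataFileFormat {
    Parquet,
}

impl fmt::Display for DataFileFormat {
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        match self {
            DataFileFormat::Parquet => write!(f, "parquet"),
        }
    }
}

// Mirrors the serialization line in the diff: uppercase on the way out.
fn serialize_file_format(format: DataFileFormat) -> String {
    format.to_string().to_ascii_uppercase()
}

fn main() {
    assert_eq!(DataFileFormat::Parquet.to_string(), "parquet");
    assert_eq!(serialize_file_format(DataFileFormat::Parquet), "PARQUET");
}
```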






Re: [PR] feat: support append data file and add e2e test [iceberg-rust]

2024-11-18 Thread via GitHub


ZENOTME commented on PR #349:
URL: https://github.com/apache/iceberg-rust/pull/349#issuecomment-2484580413

   > @ZENOTME Are you able to run the tests from your IDE?
   > 
   > When running from RustRover, the test is stuck on starting the containers. 
When I change the code to exclude the starting of the container, I get:
   > 
   > ```json
   > { code: "InvalidTokenId", message: "The security token included in the 
request is invalid", resource: 
"/icebergdata/demo/apple/ios/t1/data/test-0.parquet", request_id: 
"18092B0D0CE2D09A" }
   > ```
   > 
   > I'll dig deeper
   
   I can run the test `make test` in the terminal successfully.🤔





Re: [PR] feat: support append data file and add e2e test [iceberg-rust]

2024-11-18 Thread via GitHub


Fokko commented on PR #349:
URL: https://github.com/apache/iceberg-rust/pull/349#issuecomment-2484102346

   @ZENOTME Are you able to run the tests from your IDE?
   
   When running from RustRover, the test is stuck on starting the containers. 
When I change the code to exclude the starting of the container, I get:
   
   ```json
   { code: "InvalidTokenId", message: "The security token included in the 
request is invalid", resource: 
"/icebergdata/demo/apple/ios/t1/data/test-0.parquet", request_id: 
"18092B0D0CE2D09A" }
   ```
   
   I'll dig deeper





Re: [PR] feat: support append data file and add e2e test [iceberg-rust]

2024-11-18 Thread via GitHub


Fokko commented on code in PR #349:
URL: https://github.com/apache/iceberg-rust/pull/349#discussion_r1847225254


##
crates/e2e_test/tests/conflict_commit_test.rs:
##
@@ -0,0 +1,196 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+//! Integration tests for rest catalog.
+
+use std::collections::HashMap;
+use std::sync::Arc;
+
+use arrow_array::{ArrayRef, BooleanArray, Int32Array, RecordBatch, StringArray};
+use futures::TryStreamExt;
+use iceberg::io::{S3_ACCESS_KEY_ID, S3_ENDPOINT, S3_REGION, S3_SECRET_ACCESS_KEY};
+use iceberg::spec::{NestedField, PrimitiveType, Schema, Type};
+use iceberg::transaction::Transaction;
+use iceberg::writer::base_writer::data_file_writer::{DataFileWriterBuilder, DataFileWriterConfig};
+use iceberg::writer::file_writer::location_generator::{
+DefaultFileNameGenerator, DefaultLocationGenerator,
+};
+use iceberg::writer::file_writer::ParquetWriterBuilder;
+use iceberg::writer::{IcebergWriter, IcebergWriterBuilder};
+use iceberg::{Catalog, Namespace, NamespaceIdent, TableCreation};
+use iceberg_catalog_rest::{RestCatalog, RestCatalogConfig};
+use iceberg_test_utils::docker::DockerCompose;
+use iceberg_test_utils::{normalize_test_name, set_up};
+use parquet::file::properties::WriterProperties;
+use port_scanner::scan_port_addr;
+use tokio::time::sleep;
+
+const REST_CATALOG_PORT: u16 = 8181;
+
+struct TestFixture {
+_docker_compose: DockerCompose,
+rest_catalog: RestCatalog,
+}
+
+async fn set_test_fixture(func: &str) -> TestFixture {
+set_up();
+let docker_compose = DockerCompose::new(
+normalize_test_name(format!("{}_{func}", module_path!())),
+format!("{}/testdata", env!("CARGO_MANIFEST_DIR")),
+);
+
+// Start docker compose
+docker_compose.run();
+
+let rest_catalog_ip = docker_compose.get_container_ip("rest");
+
+let read_port = format!("{}:{}", rest_catalog_ip, REST_CATALOG_PORT);
+loop {
+if !scan_port_addr(&read_port) {
+log::info!("Waiting for 1s rest catalog to ready...");
+sleep(std::time::Duration::from_millis(1000)).await;
+} else {
+break;
+}
+}
+
+let container_ip = docker_compose.get_container_ip("minio");
+let read_port = format!("{}:{}", container_ip, 9000);
+
+let config = RestCatalogConfig::builder()
+.uri(format!("http://{}:{}", rest_catalog_ip, REST_CATALOG_PORT))
+.props(HashMap::from([
+(S3_ENDPOINT.to_string(), format!("http://{}", read_port)),
+(S3_ACCESS_KEY_ID.to_string(), "admin".to_string()),
+(S3_SECRET_ACCESS_KEY.to_string(), "password".to_string()),
+(S3_REGION.to_string(), "us-east-1".to_string()),
+]))
+.build();
+let rest_catalog = RestCatalog::new(config);
+
+TestFixture {
+_docker_compose: docker_compose,
+rest_catalog,
+}
+}
+
+#[tokio::test]
+async fn test_append_data_file_conflict() {
+let fixture = set_test_fixture("test_create_table").await;
+
+let ns = Namespace::with_properties(
+NamespaceIdent::from_strs(["apple", "ios"]).unwrap(),
+HashMap::from([
+("owner".to_string(), "ray".to_string()),
+("community".to_string(), "apache".to_string()),
+]),
+);
+
+fixture
+.rest_catalog
+.create_namespace(ns.name(), ns.properties().clone())
+.await
+.unwrap();
+
+let schema = Schema::builder()
+.with_schema_id(1)
+.with_identifier_field_ids(vec![2])
+.with_fields(vec![
+NestedField::optional(1, "foo", Type::Primitive(PrimitiveType::String)).into(),
+NestedField::required(2, "bar", Type::Primitive(PrimitiveType::Int)).into(),
+NestedField::optional(3, "baz", Type::Primitive(PrimitiveType::Boolean)).into(),
+])
+.build()
+.unwrap();
+
+let table_creation = TableCreation::builder()
+.name("t1".to_string())
+.schema(schema.clone())
+.build();
+
+let table = fixture
+.rest_catalog
+.create_table(ns.name(), table_creation)
+.await
+.unwrap();
+
+// Create the writer and write the da

Re: [PR] feat: support append data file and add e2e test [iceberg-rust]

2024-11-18 Thread via GitHub


Fokko commented on code in PR #349:
URL: https://github.com/apache/iceberg-rust/pull/349#discussion_r1847168059


##
crates/iceberg/src/transaction.rs:
##
@@ -96,6 +109,60 @@ impl<'a> Transaction<'a> {
 Ok(self)
 }
 
+fn generate_unique_snapshot_id(&self) -> i64 {
+let generate_random_id = || -> i64 {
+let (lhs, rhs) = Uuid::new_v4().as_u64_pair();
+let snapshot_id = (lhs ^ rhs) as i64;
+if snapshot_id < 0 {
+-snapshot_id
+} else {
+snapshot_id
+}
+};
+let mut snapshot_id = generate_random_id();
+while self
+.table
+.metadata()
+.snapshots()
+.any(|s| s.snapshot_id() == snapshot_id)
+{
+snapshot_id = generate_random_id();
+}
+snapshot_id
+}
+
+/// Creates a fast append action.
+pub fn fast_append(
+self,
+commit_uuid: Option<String>,
+key_metadata: Vec<u8>,
+) -> Result<FastAppendAction<'a>> {
+let parent_snapshot_id = self
+.table
+.metadata()
+.current_snapshot()
+.map(|s| s.snapshot_id());
+let snapshot_id = self.generate_unique_snapshot_id();
+let schema = self.table.metadata().current_schema().as_ref().clone();
+let schema_id = schema.schema_id();
+let format_version = self.table.metadata().format_version();
+let partition_spec = 
self.table.metadata().default_partition_spec().clone();
+let commit_uuid = commit_uuid.unwrap_or_else(|| 
uuid::Uuid::new_v4().to_string());

Review Comment:
   Not needed
   ```suggestion
   let commit_uuid = commit_uuid.unwrap_or_else(|| 
Uuid::new_v4().to_string());
   ```
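
As an aside for readers of this thread, the `generate_unique_snapshot_id` retry loop quoted above can be sketched as a standalone function. This is a minimal sketch, assuming a clock-based stand-in for `Uuid::new_v4().as_u64_pair()` so it runs with the standard library alone; the PR itself uses the `uuid` crate.

```rust
use std::collections::HashSet;
use std::time::{SystemTime, UNIX_EPOCH};

// Hypothetical stand-in for `Uuid::new_v4().as_u64_pair()`, used only to
// keep this sketch dependency-free.
fn random_u64_pair(salt: u64) -> (u64, u64) {
    let nanos = SystemTime::now()
        .duration_since(UNIX_EPOCH)
        .expect("clock before UNIX_EPOCH")
        .as_nanos() as u64;
    (
        nanos.wrapping_mul(0x9E37_79B9_7F4A_7C15),
        salt.wrapping_add(1).wrapping_mul(0xD1B5_4A32_D192_ED03),
    )
}

// Mirrors the PR's logic: XOR the pair into an i64, force it positive,
// and retry while the id collides with an existing snapshot id.
fn generate_unique_snapshot_id(existing: &HashSet<i64>) -> i64 {
    let mut salt = 0u64;
    loop {
        let (lhs, rhs) = random_u64_pair(salt);
        let mut snapshot_id = (lhs ^ rhs) as i64;
        if snapshot_id < 0 {
            // Negating i64::MIN would panic, but like the PR code this
            // ignores that 2^-64 edge case.
            snapshot_id = -snapshot_id;
        }
        if !existing.contains(&snapshot_id) {
            return snapshot_id;
        }
        salt += 1;
    }
}

fn main() {
    let existing: HashSet<i64> = [1, 2, 3].into_iter().collect();
    let id = generate_unique_snapshot_id(&existing);
    assert!(id >= 0);
    assert!(!existing.contains(&id));
    println!("generated snapshot id: {}", id);
}
```

The collision check matters because snapshot ids key the table's snapshot log; a duplicate would make two commits indistinguishable.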



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@iceberg.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


-
To unsubscribe, e-mail: issues-unsubscr...@iceberg.apache.org
For additional commands, e-mail: issues-h...@iceberg.apache.org



Re: [PR] feat: support append data file and add e2e test [iceberg-rust]

2024-11-18 Thread via GitHub


Fokko commented on code in PR #349:
URL: https://github.com/apache/iceberg-rust/pull/349#discussion_r1847167614


##
crates/iceberg/src/transaction.rs:
##
@@ -96,6 +109,60 @@ impl<'a> Transaction<'a> {
 Ok(self)
 }
 
+fn generate_unique_snapshot_id(&self) -> i64 {
+let generate_random_id = || -> i64 {
+let (lhs, rhs) = Uuid::new_v4().as_u64_pair();
+let snapshot_id = (lhs ^ rhs) as i64;
+if snapshot_id < 0 {
+-snapshot_id
+} else {
+snapshot_id
+}
+};
+let mut snapshot_id = generate_random_id();
+while self
+.table
+.metadata()
+.snapshots()
+.any(|s| s.snapshot_id() == snapshot_id)
+{
+snapshot_id = generate_random_id();
+}
+snapshot_id
+}
+
+/// Creates a fast append action.
+pub fn fast_append(
+self,
+commit_uuid: Option<String>,

Review Comment:
   ```suggestion
   commit_uuid: Option<Uuid>,
   ```






Re: [PR] feat: support append data file and add e2e test [iceberg-rust]

2024-11-18 Thread via GitHub


ZENOTME commented on PR #349:
URL: https://github.com/apache/iceberg-rust/pull/349#issuecomment-2483287118

   > @ZENOTME Sorry for leaving this hanging for so long, I'll do another pass 
today. I wanted to do it earlier, but I ran into some issues with RustRover. 
I have to step out for some other issues, but I'll do another check of the 
metadata today.
   
   Thanks!
   





Re: [PR] feat: support append data file and add e2e test [iceberg-rust]

2024-11-18 Thread via GitHub


ZENOTME commented on code in PR #349:
URL: https://github.com/apache/iceberg-rust/pull/349#discussion_r1846727927


##
crates/e2e_test/tests/conflict_commit_test.rs:
##
@@ -0,0 +1,196 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+//! Integration tests for rest catalog.
+
+use std::collections::HashMap;
+use std::sync::Arc;
+
+use arrow_array::{ArrayRef, BooleanArray, Int32Array, RecordBatch, 
StringArray};
+use futures::TryStreamExt;
+use iceberg::io::{S3_ACCESS_KEY_ID, S3_ENDPOINT, S3_REGION, 
S3_SECRET_ACCESS_KEY};
+use iceberg::spec::{NestedField, PrimitiveType, Schema, Type};
+use iceberg::transaction::Transaction;
+use iceberg::writer::base_writer::data_file_writer::{DataFileWriterBuilder, 
DataFileWriterConfig};
+use iceberg::writer::file_writer::location_generator::{
+DefaultFileNameGenerator, DefaultLocationGenerator,
+};
+use iceberg::writer::file_writer::ParquetWriterBuilder;
+use iceberg::writer::{IcebergWriter, IcebergWriterBuilder};
+use iceberg::{Catalog, Namespace, NamespaceIdent, TableCreation};
+use iceberg_catalog_rest::{RestCatalog, RestCatalogConfig};
+use iceberg_test_utils::docker::DockerCompose;
+use iceberg_test_utils::{normalize_test_name, set_up};
+use parquet::file::properties::WriterProperties;
+use port_scanner::scan_port_addr;
+use tokio::time::sleep;
+
+const REST_CATALOG_PORT: u16 = 8181;
+
+struct TestFixture {
+_docker_compose: DockerCompose,
+rest_catalog: RestCatalog,
+}
+
+async fn set_test_fixture(func: &str) -> TestFixture {
+set_up();
+let docker_compose = DockerCompose::new(
+normalize_test_name(format!("{}_{func}", module_path!())),
+format!("{}/testdata", env!("CARGO_MANIFEST_DIR")),
+);
+
+// Start docker compose
+docker_compose.run();
+
+let rest_catalog_ip = docker_compose.get_container_ip("rest");
+
+let read_port = format!("{}:{}", rest_catalog_ip, REST_CATALOG_PORT);
+loop {
+if !scan_port_addr(&read_port) {
+log::info!("Waiting 1s for the rest catalog to become ready...");
+sleep(std::time::Duration::from_millis(1000)).await;
+} else {
+break;
+}
+}
+
+let container_ip = docker_compose.get_container_ip("minio");
+let read_port = format!("{}:{}", container_ip, 9000);
+
+let config = RestCatalogConfig::builder()
+.uri(format!("http://{}:{}", rest_catalog_ip, REST_CATALOG_PORT))
+.props(HashMap::from([
+(S3_ENDPOINT.to_string(), format!("http://{}", read_port)),
+(S3_ACCESS_KEY_ID.to_string(), "admin".to_string()),
+(S3_SECRET_ACCESS_KEY.to_string(), "password".to_string()),
+(S3_REGION.to_string(), "us-east-1".to_string()),
+]))
+.build();
+let rest_catalog = RestCatalog::new(config);
+
+TestFixture {
+_docker_compose: docker_compose,
+rest_catalog,
+}
+}
+
+#[tokio::test]
+async fn test_append_data_file_conflict() {
+let fixture = set_test_fixture("test_create_table").await;
+
+let ns = Namespace::with_properties(
+NamespaceIdent::from_strs(["apple", "ios"]).unwrap(),
+HashMap::from([
+("owner".to_string(), "ray".to_string()),
+("community".to_string(), "apache".to_string()),
+]),
+);
+
+fixture
+.rest_catalog
+.create_namespace(ns.name(), ns.properties().clone())
+.await
+.unwrap();
+
+let schema = Schema::builder()
+.with_schema_id(1)
+.with_identifier_field_ids(vec![2])
+.with_fields(vec![
+NestedField::optional(1, "foo", 
Type::Primitive(PrimitiveType::String)).into(),
+NestedField::required(2, "bar", 
Type::Primitive(PrimitiveType::Int)).into(),
+NestedField::optional(3, "baz", 
Type::Primitive(PrimitiveType::Boolean)).into(),
+])
+.build()
+.unwrap();
+
+let table_creation = TableCreation::builder()
+.name("t1".to_string())
+.schema(schema.clone())
+.build();
+
+let table = fixture
+.rest_catalog
+.create_table(ns.name(), table_creation)
+.await
+.unwrap();
+
+// Create the writer and write the data.
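
The readiness loop in the fixture above retries forever; a bounded wait avoids hanging CI if the container never comes up. Below is a dependency-free sketch of such a helper using only `std::net` (an assumption for illustration — the test uses `port_scanner::scan_port_addr`, and the helper name here is hypothetical).

```rust
use std::net::{SocketAddr, TcpListener, TcpStream};
use std::thread::sleep;
use std::time::Duration;

// Poll a TCP port until it accepts a connection or the attempt budget
// runs out, instead of looping indefinitely.
fn wait_for_port(addr: &str, attempts: u32) -> bool {
    let addr: SocketAddr = addr.parse().expect("valid socket address");
    for _ in 0..attempts {
        if TcpStream::connect_timeout(&addr, Duration::from_millis(200)).is_ok() {
            return true;
        }
        sleep(Duration::from_millis(100));
    }
    false
}

fn main() {
    // Bind an ephemeral local port to stand in for the rest catalog.
    let listener = TcpListener::bind("127.0.0.1:0").unwrap();
    let addr = listener.local_addr().unwrap().to_string();
    assert!(wait_for_port(&addr, 5));
    println!("port is ready");
}
```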

Re: [PR] feat: support append data file and add e2e test [iceberg-rust]

2024-11-18 Thread via GitHub


Fokko commented on PR #349:
URL: https://github.com/apache/iceberg-rust/pull/349#issuecomment-2483245755

   @ZENOTME Sorry for leaving this hanging for so long, I'll do another pass 
today. I wanted to do it earlier, but I ran into some issues with RustRover. 
I have to step out for some other issues, but I'll do another check of the 
metadata today.





Re: [PR] feat: support append data file and add e2e test [iceberg-rust]

2024-11-18 Thread via GitHub


Fokko commented on code in PR #349:
URL: https://github.com/apache/iceberg-rust/pull/349#discussion_r1846645067


##
crates/iceberg/src/transaction.rs:
##
@@ -122,6 +189,387 @@ impl<'a> Transaction<'a> {
 }
 }
 
+/// FastAppendAction is a transaction action for fast appending data files to the 
table.
+pub struct FastAppendAction<'a> {
+snapshot_produce_action: SnapshotProduceAction<'a>,
+}
+
+impl<'a> FastAppendAction<'a> {
+#[allow(clippy::too_many_arguments)]
+pub(crate) fn new(
+tx: Transaction<'a>,
+parent_snapshot_id: Option<i64>,
+snapshot_id: i64,
+schema: Schema,
+schema_id: i32,
+format_version: FormatVersion,
+partition_spec: Arc<PartitionSpec>,
+key_metadata: Vec<u8>,
+commit_uuid: String,
+snapshot_properties: HashMap<String, String>,
+) -> Result<Self> {
+Ok(Self {
+snapshot_produce_action: SnapshotProduceAction::new(
+tx,
+snapshot_id,
+parent_snapshot_id,
+schema_id,
+format_version,
+partition_spec,
+schema,
+key_metadata,
+commit_uuid,
+snapshot_properties,
+)?,
+})
+}
+
+/// Add data files to the snapshot.
+pub fn add_data_files(
+&mut self,
+data_files: impl IntoIterator<Item = DataFile>,
+) -> Result<&mut Self> {
+self.snapshot_produce_action.add_data_files(data_files)?;
+Ok(self)
+}
+
+/// Finish building the action and apply it to the transaction.
+pub async fn apply(self) -> Result<Transaction<'a>> {
+self.snapshot_produce_action
+.apply(FastAppendOperation, DefaultManifestProcess)
+.await
+}
+}
+
+struct FastAppendOperation;
+
+impl SnapshotProduceOperation for FastAppendOperation {
+fn operation(&self) -> Operation {
+Operation::Append
+}
+
+async fn delete_entries(
+&self,
+_snapshot_produce: &SnapshotProduceAction<'_>,
+) -> Result<Vec<ManifestEntry>> {
+Ok(vec![])
+}
+
+async fn existing_manifest(
+&self,
+snapshot_produce: &SnapshotProduceAction<'_>,
+) -> Result<Vec<ManifestFile>> {
+let Some(snapshot) = snapshot_produce
+.parent_snapshot_id
+.and_then(|id| 
snapshot_produce.tx.table.metadata().snapshot_by_id(id))
+else {
+return Ok(vec![]);
+};
+
+let manifest_list = snapshot
+.load_manifest_list(
+snapshot_produce.tx.table.file_io(),
+&snapshot_produce.tx.table.metadata_ref(),
+)
+.await?;
+
+Ok(manifest_list
+.entries()
+.iter()
+.filter(|entry| {
+entry.has_added_files()
+|| entry.has_existing_files()
+|| entry.added_snapshot_id == snapshot_produce.snapshot_id

Review Comment:
   Thanks for checking that, I would be in favor of leaving it out for now.






Re: [PR] feat: support append data file and add e2e test [iceberg-rust]

2024-11-18 Thread via GitHub


Fokko commented on code in PR #349:
URL: https://github.com/apache/iceberg-rust/pull/349#discussion_r1846625846


##
crates/e2e_test/tests/conflict_commit_test.rs:
##
@@ -0,0 +1,196 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+//! Integration tests for rest catalog.
+
+use std::collections::HashMap;
+use std::sync::Arc;
+
+use arrow_array::{ArrayRef, BooleanArray, Int32Array, RecordBatch, 
StringArray};
+use futures::TryStreamExt;
+use iceberg::io::{S3_ACCESS_KEY_ID, S3_ENDPOINT, S3_REGION, 
S3_SECRET_ACCESS_KEY};
+use iceberg::spec::{NestedField, PrimitiveType, Schema, Type};
+use iceberg::transaction::Transaction;
+use iceberg::writer::base_writer::data_file_writer::{DataFileWriterBuilder, 
DataFileWriterConfig};
+use iceberg::writer::file_writer::location_generator::{
+DefaultFileNameGenerator, DefaultLocationGenerator,
+};
+use iceberg::writer::file_writer::ParquetWriterBuilder;
+use iceberg::writer::{IcebergWriter, IcebergWriterBuilder};
+use iceberg::{Catalog, Namespace, NamespaceIdent, TableCreation};
+use iceberg_catalog_rest::{RestCatalog, RestCatalogConfig};
+use iceberg_test_utils::docker::DockerCompose;
+use iceberg_test_utils::{normalize_test_name, set_up};
+use parquet::file::properties::WriterProperties;
+use port_scanner::scan_port_addr;
+use tokio::time::sleep;
+
+const REST_CATALOG_PORT: u16 = 8181;
+
+struct TestFixture {
+_docker_compose: DockerCompose,
+rest_catalog: RestCatalog,
+}
+
+async fn set_test_fixture(func: &str) -> TestFixture {
+set_up();
+let docker_compose = DockerCompose::new(
+normalize_test_name(format!("{}_{func}", module_path!())),
+format!("{}/testdata", env!("CARGO_MANIFEST_DIR")),
+);
+
+// Start docker compose
+docker_compose.run();
+
+let rest_catalog_ip = docker_compose.get_container_ip("rest");
+
+let read_port = format!("{}:{}", rest_catalog_ip, REST_CATALOG_PORT);
+loop {
+if !scan_port_addr(&read_port) {
+log::info!("Waiting 1s for the rest catalog to become ready...");
+sleep(std::time::Duration::from_millis(1000)).await;
+} else {
+break;
+}
+}
+
+let container_ip = docker_compose.get_container_ip("minio");
+let read_port = format!("{}:{}", container_ip, 9000);
+
+let config = RestCatalogConfig::builder()
+.uri(format!("http://{}:{}", rest_catalog_ip, REST_CATALOG_PORT))
+.props(HashMap::from([
+(S3_ENDPOINT.to_string(), format!("http://{}", read_port)),
+(S3_ACCESS_KEY_ID.to_string(), "admin".to_string()),
+(S3_SECRET_ACCESS_KEY.to_string(), "password".to_string()),
+(S3_REGION.to_string(), "us-east-1".to_string()),
+]))
+.build();
+let rest_catalog = RestCatalog::new(config);
+
+TestFixture {
+_docker_compose: docker_compose,
+rest_catalog,
+}
+}
+
+#[tokio::test]
+async fn test_append_data_file_conflict() {
+let fixture = set_test_fixture("test_create_table").await;
+
+let ns = Namespace::with_properties(
+NamespaceIdent::from_strs(["apple", "ios"]).unwrap(),
+HashMap::from([
+("owner".to_string(), "ray".to_string()),
+("community".to_string(), "apache".to_string()),
+]),
+);
+
+fixture
+.rest_catalog
+.create_namespace(ns.name(), ns.properties().clone())
+.await
+.unwrap();
+
+let schema = Schema::builder()
+.with_schema_id(1)
+.with_identifier_field_ids(vec![2])
+.with_fields(vec![
+NestedField::optional(1, "foo", 
Type::Primitive(PrimitiveType::String)).into(),
+NestedField::required(2, "bar", 
Type::Primitive(PrimitiveType::Int)).into(),
+NestedField::optional(3, "baz", 
Type::Primitive(PrimitiveType::Boolean)).into(),
+])
+.build()
+.unwrap();
+
+let table_creation = TableCreation::builder()
+.name("t1".to_string())
+.schema(schema.clone())
+.build();
+
+let table = fixture
+.rest_catalog
+.create_table(ns.name(), table_creation)
+.await
+.unwrap();
+
+// Create the writer and write the data.

Re: [PR] feat: support append data file and add e2e test [iceberg-rust]

2024-11-18 Thread via GitHub


Fokko commented on code in PR #349:
URL: https://github.com/apache/iceberg-rust/pull/349#discussion_r1846624096


##
crates/e2e_test/tests/conflict_commit_test.rs:
##
@@ -0,0 +1,196 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+//! Integration tests for rest catalog.
+
+use std::collections::HashMap;
+use std::sync::Arc;
+
+use arrow_array::{ArrayRef, BooleanArray, Int32Array, RecordBatch, 
StringArray};
+use futures::TryStreamExt;
+use iceberg::io::{S3_ACCESS_KEY_ID, S3_ENDPOINT, S3_REGION, 
S3_SECRET_ACCESS_KEY};
+use iceberg::spec::{NestedField, PrimitiveType, Schema, Type};
+use iceberg::transaction::Transaction;
+use iceberg::writer::base_writer::data_file_writer::{DataFileWriterBuilder, 
DataFileWriterConfig};
+use iceberg::writer::file_writer::location_generator::{
+DefaultFileNameGenerator, DefaultLocationGenerator,
+};
+use iceberg::writer::file_writer::ParquetWriterBuilder;
+use iceberg::writer::{IcebergWriter, IcebergWriterBuilder};
+use iceberg::{Catalog, Namespace, NamespaceIdent, TableCreation};
+use iceberg_catalog_rest::{RestCatalog, RestCatalogConfig};
+use iceberg_test_utils::docker::DockerCompose;
+use iceberg_test_utils::{normalize_test_name, set_up};
+use parquet::file::properties::WriterProperties;
+use port_scanner::scan_port_addr;
+use tokio::time::sleep;
+
+const REST_CATALOG_PORT: u16 = 8181;
+
+struct TestFixture {
+_docker_compose: DockerCompose,
+rest_catalog: RestCatalog,
+}
+
+async fn set_test_fixture(func: &str) -> TestFixture {
+set_up();
+let docker_compose = DockerCompose::new(
+normalize_test_name(format!("{}_{func}", module_path!())),
+format!("{}/testdata", env!("CARGO_MANIFEST_DIR")),
+);
+
+// Start docker compose
+docker_compose.run();
+
+let rest_catalog_ip = docker_compose.get_container_ip("rest");
+
+let read_port = format!("{}:{}", rest_catalog_ip, REST_CATALOG_PORT);
+loop {
+if !scan_port_addr(&read_port) {
+log::info!("Waiting 1s for the rest catalog to become ready...");
+sleep(std::time::Duration::from_millis(1000)).await;
+} else {
+break;
+}
+}
+
+let container_ip = docker_compose.get_container_ip("minio");
+let read_port = format!("{}:{}", container_ip, 9000);
+
+let config = RestCatalogConfig::builder()
+.uri(format!("http://{}:{}", rest_catalog_ip, REST_CATALOG_PORT))
+.props(HashMap::from([
+(S3_ENDPOINT.to_string(), format!("http://{}", read_port)),

Review Comment:
   I noticed that we have the configuration twice (also in 
`append_data_file_test.rs`), should we consolidate these?
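
One way to consolidate would be a shared helper that both e2e tests call. A minimal sketch, assuming the string keys below match the `S3_ENDPOINT` / `S3_ACCESS_KEY_ID` / `S3_SECRET_ACCESS_KEY` / `S3_REGION` constants imported from `iceberg::io` (the helper name is hypothetical):

```rust
use std::collections::HashMap;

// Build the catalog's S3 properties once, shared by every e2e test.
// The literal keys stand in for the S3_* constants from `iceberg::io`.
fn s3_catalog_props(minio_endpoint: &str) -> HashMap<String, String> {
    HashMap::from([
        ("s3.endpoint".to_string(), format!("http://{}", minio_endpoint)),
        ("s3.access-key-id".to_string(), "admin".to_string()),
        ("s3.secret-access-key".to_string(), "password".to_string()),
        ("s3.region".to_string(), "us-east-1".to_string()),
    ])
}

fn main() {
    let props = s3_catalog_props("127.0.0.1:9000");
    assert_eq!(props["s3.endpoint"], "http://127.0.0.1:9000");
    assert_eq!(props.len(), 4);
    println!("props built");
}
```

Each test would then pass `s3_catalog_props(&read_port)` to `RestCatalogConfig::builder().props(...)`, so the credentials and endpoint live in a single place.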






Re: [PR] feat: support append data file and add e2e test [iceberg-rust]

2024-11-15 Thread via GitHub


ZENOTME commented on code in PR #349:
URL: https://github.com/apache/iceberg-rust/pull/349#discussion_r1844241268


##
crates/iceberg/src/transaction.rs:
##
@@ -122,6 +189,387 @@ impl<'a> Transaction<'a> {
 }
 }
 
+/// FastAppendAction is a transaction action for fast appending data files to the 
table.
+pub struct FastAppendAction<'a> {
+snapshot_produce_action: SnapshotProduceAction<'a>,
+}
+
+impl<'a> FastAppendAction<'a> {
+#[allow(clippy::too_many_arguments)]
+pub(crate) fn new(
+tx: Transaction<'a>,
+parent_snapshot_id: Option<i64>,
+snapshot_id: i64,
+schema: Schema,
+schema_id: i32,
+format_version: FormatVersion,
+partition_spec: Arc<PartitionSpec>,
+key_metadata: Vec<u8>,
+commit_uuid: String,
+snapshot_properties: HashMap<String, String>,
+) -> Result<Self> {
+Ok(Self {
+snapshot_produce_action: SnapshotProduceAction::new(
+tx,
+snapshot_id,
+parent_snapshot_id,
+schema_id,
+format_version,
+partition_spec,
+schema,
+key_metadata,
+commit_uuid,
+snapshot_properties,
+)?,
+})
+}
+
+/// Add data files to the snapshot.
+pub fn add_data_files(
+&mut self,
+data_files: impl IntoIterator<Item = DataFile>,
+) -> Result<&mut Self> {
+self.snapshot_produce_action.add_data_files(data_files)?;
+Ok(self)
+}
+
+/// Finish building the action and apply it to the transaction.
+pub async fn apply(self) -> Result<Transaction<'a>> {
+self.snapshot_produce_action
+.apply(FastAppendOperation, DefaultManifestProcess)
+.await
+}
+}
+
+struct FastAppendOperation;
+
+impl SnapshotProduceOperation for FastAppendOperation {
+fn operation(&self) -> Operation {
+Operation::Append
+}
+
+async fn delete_entries(
+&self,
+_snapshot_produce: &SnapshotProduceAction<'_>,
+) -> Result<Vec<ManifestEntry>> {
+Ok(vec![])
+}
+
+async fn existing_manifest(
+&self,
+snapshot_produce: &SnapshotProduceAction<'_>,
+) -> Result<Vec<ManifestFile>> {
+let Some(snapshot) = snapshot_produce
+.parent_snapshot_id
+.and_then(|id| 
snapshot_produce.tx.table.metadata().snapshot_by_id(id))
+else {
+return Ok(vec![]);
+};
+
+let manifest_list = snapshot
+.load_manifest_list(
+snapshot_produce.tx.table.file_io(),
+&snapshot_produce.tx.table.metadata_ref(),
+)
+.await?;
+
+Ok(manifest_list
+.entries()
+.iter()
+.filter(|entry| {
+entry.has_added_files()
+|| entry.has_existing_files()
+|| entry.added_snapshot_id == snapshot_produce.snapshot_id

Review Comment:
   For this check, the explanation comes from @Fokko:
   
   > We've copied this from the Java code: 
https://github.com/apache/iceberg/blob/307593ffd99752b2d62cc91f4928285fc0c62b75/core/src/main/java/org/apache/iceberg/MergingSnapshotProducer.java#L934-L939
   > I don't think this is used in PyIceberg today. In Java it is possible to 
add existing manifests to a commit, while in PyIceberg we only accept 
datafiles. This is on purpose since the ability to add manifests also comes 
with problems of its own.
   
   I think this check is also useless for iceberg-rust right now. To avoid 
confusion, maybe we should:
   1. Add a comment to explain it 
   2. Or discard it for now and add it back when we support adding existing 
manifests, like Java does.
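
For readers following the discussion, the filter in question reduces to a small predicate over each manifest-list entry. A simplified sketch (the struct and field names here are illustrative stand-ins; the real `ManifestFile` exposes `has_added_files()` / `has_existing_files()`):

```rust
// Simplified stand-in for a manifest-list entry.
struct ManifestFileSketch {
    added_files_count: u32,
    existing_files_count: u32,
    added_snapshot_id: i64,
}

// Keep a manifest if it still contributes live files, or if it was written
// by the snapshot currently being produced.
fn keep_manifest(entry: &ManifestFileSketch, current_snapshot_id: i64) -> bool {
    entry.added_files_count > 0
        || entry.existing_files_count > 0
        || entry.added_snapshot_id == current_snapshot_id
}

fn main() {
    // A manifest whose files were all deleted, written by an older snapshot:
    // it is dropped from the new snapshot's manifest list.
    let stale = ManifestFileSketch {
        added_files_count: 0,
        existing_files_count: 0,
        added_snapshot_id: 41,
    };
    assert!(!keep_manifest(&stale, 42));

    // The third clause only fires when a manifest written for the *current*
    // snapshot is passed back in, which iceberg-rust does not do yet -- hence
    // the suggestion above to comment or drop it.
    let own = ManifestFileSketch {
        added_files_count: 0,
        existing_files_count: 0,
        added_snapshot_id: 42,
    };
    assert!(keep_manifest(&own, 42));
    println!("predicate behaves as described");
}
```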



##
crates/iceberg/src/transaction.rs:
##
@@ -122,6 +189,387 @@ impl<'a> Transaction<'a> {
 }
 }
 
+/// FastAppendAction is a transaction action for fast appending data files to the 
table.
+pub struct FastAppendAction<'a> {
+snapshot_produce_action: SnapshotProduceAction<'a>,
+}
+
+impl<'a> FastAppendAction<'a> {
+#[allow(clippy::too_many_arguments)]
+pub(crate) fn new(
+tx: Transaction<'a>,
+parent_snapshot_id: Option<i64>,
+snapshot_id: i64,
+schema: Schema,
+schema_id: i32,
+format_version: FormatVersion,
+partition_spec: Arc<PartitionSpec>,
+key_metadata: Vec<u8>,
+commit_uuid: String,
+snapshot_properties: HashMap<String, String>,
+) -> Result<Self> {
+Ok(Self {
+snapshot_produce_action: SnapshotProduceAction::new(
+tx,
+snapshot_id,
+parent_snapshot_id,
+schema_id,
+format_version,
+partition_spec,
+schema,
+key_metadata,
+commit_uuid,
+snapshot_properties,
+   

Re: [PR] feat: support append data file and add e2e test [iceberg-rust]

2024-11-14 Thread via GitHub


ZENOTME commented on PR #349:
URL: https://github.com/apache/iceberg-rust/pull/349#issuecomment-2477887517

   Hi, I think this PR has been blocked for a long time. Recently, AFAIK, some 
users have been asking for write support. Can we make progress on this? 
cc @Fokko @liurenjie1024 @Xuanwo





Re: [PR] feat: support append data file and add e2e test [iceberg-rust]

2024-11-14 Thread via GitHub


ZENOTME commented on PR #696:
URL: https://github.com/apache/iceberg-rust/pull/696#issuecomment-2475882857

   Sorry, please ignore this mistaken operation.





Re: [PR] feat: support append data file and add e2e test [iceberg-rust]

2024-11-14 Thread via GitHub


ZENOTME closed pull request #696: feat: support append data file and add e2e 
test
URL: https://github.com/apache/iceberg-rust/pull/696





Re: [PR] feat: support append data file and add e2e test [iceberg-rust]

2024-11-14 Thread via GitHub


ZENOTME commented on code in PR #349:
URL: https://github.com/apache/iceberg-rust/pull/349#discussion_r1841846043


##
crates/e2e_test/tests/append_data_file_test.rs:
##
@@ -0,0 +1,234 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+//! Integration tests for rest catalog.
+
+use arrow_array::{ArrayRef, BooleanArray, Int32Array, RecordBatch, 
StringArray};
+use futures::TryStreamExt;
+use iceberg::io::{S3_ACCESS_KEY_ID, S3_ENDPOINT, S3_REGION, 
S3_SECRET_ACCESS_KEY};
+use iceberg::spec::{NestedField, PrimitiveType, Schema, Type};
+use iceberg::transaction::Transaction;
+use iceberg::writer::base_writer::data_file_writer::{DataFileWriterBuilder, 
DataFileWriterConfig};
+use iceberg::writer::file_writer::location_generator::{
+DefaultFileNameGenerator, DefaultLocationGenerator,
+};
+use iceberg::writer::file_writer::ParquetWriterBuilder;
+use iceberg::writer::{IcebergWriter, IcebergWriterBuilder};
+use iceberg::{Catalog, Namespace, NamespaceIdent, TableCreation};
+use iceberg_catalog_rest::{RestCatalog, RestCatalogConfig};
+use iceberg_test_utils::docker::DockerCompose;
+use iceberg_test_utils::{normalize_test_name, set_up};
+use parquet::arrow::arrow_reader::ArrowReaderOptions;
+use parquet::file::properties::WriterProperties;
+use port_scanner::scan_port_addr;
+use std::collections::HashMap;
+use std::sync::Arc;
+use tokio::time::sleep;
+
+const REST_CATALOG_PORT: u16 = 8181;
+
+struct TestFixture {
+_docker_compose: DockerCompose,
+rest_catalog: RestCatalog,
+}
+
+async fn set_test_fixture(func: &str) -> TestFixture {
+set_up();
+let docker_compose = DockerCompose::new(
+normalize_test_name(format!("{}_{func}", module_path!())),
+format!("{}/testdata", env!("CARGO_MANIFEST_DIR")),
+);
+
+// Start docker compose
+docker_compose.run();
+
+let rest_catalog_ip = docker_compose.get_container_ip("rest");
+
+let read_port = format!("{}:{}", rest_catalog_ip, REST_CATALOG_PORT);
+loop {
+if !scan_port_addr(&read_port) {
+log::info!("Waiting 1s for the rest catalog to become ready...");
+sleep(std::time::Duration::from_millis(1000)).await;
+} else {
+break;
+}
+}
+
+let container_ip = docker_compose.get_container_ip("minio");
+let read_port = format!("{}:{}", container_ip, 9000);
+
+let config = RestCatalogConfig::builder()
+.uri(format!("http://{}:{}", rest_catalog_ip, REST_CATALOG_PORT))
+.props(HashMap::from([
+(S3_ENDPOINT.to_string(), format!("http://{}", read_port)),
+(S3_ACCESS_KEY_ID.to_string(), "admin".to_string()),
+(S3_SECRET_ACCESS_KEY.to_string(), "password".to_string()),
+(S3_REGION.to_string(), "us-east-1".to_string()),
+]))
+.build();
+let rest_catalog = RestCatalog::new(config).await.unwrap();
+
+TestFixture {
+_docker_compose: docker_compose,
+rest_catalog,
+}
+}
+
+#[tokio::test]
+async fn test_append_data_file() {
+let fixture = set_test_fixture("test_create_table").await;
+
+let ns = Namespace::with_properties(
+NamespaceIdent::from_strs(["apple", "ios"]).unwrap(),
+HashMap::from([
+("owner".to_string(), "ray".to_string()),
+("community".to_string(), "apache".to_string()),
+]),
+);
+
+fixture
+.rest_catalog
+.create_namespace(ns.name(), ns.properties().clone())
+.await
+.unwrap();
+
+let schema = Schema::builder()
+.with_schema_id(1)
+.with_identifier_field_ids(vec![2])
+.with_fields(vec![
+NestedField::optional(1, "foo", 
Type::Primitive(PrimitiveType::String)).into(),
+NestedField::required(2, "bar", 
Type::Primitive(PrimitiveType::Int)).into(),
+NestedField::optional(3, "baz", 
Type::Primitive(PrimitiveType::Boolean)).into(),
+])
+.build()
+.unwrap();
+
+let table_creation = TableCreation::builder()
+.name("t1".to_string())
+.schema(schema.clone())
+.build();
+
+let table = fixture
+.rest_catalog
+.create_table(ns.name(), table_creation)

Review Comment:
 

Re: [PR] feat: support append data file and add e2e test [iceberg-rust]

2024-09-26 Thread via GitHub


ZENOTME commented on PR #349:
URL: https://github.com/apache/iceberg-rust/pull/349#issuecomment-2376706216

   > We're introducing a lot of new concepts here and leaving a lot of open 
ends (snapshot summary generation, metrics collection, schema compatibility 
checks, etc). I think it would be best to break this PR into smaller pieces. 
For example, I'm not sure if the way we create the fast-append is very 
extensible and I think we can copy a lot from PyIceberg where we track the 
changes of the metadata.
   
   Sorry for the late reply. I'm now actively back on this PR. I have split 
some code out of this PR; for now, it mainly contains support for the 
transaction's append action. I have refactored the fast-append design to 
follow PyIceberg's design so that it is more extensible. Feel free to 
suggest anything that needs to change. cc @Fokko 
@liurenjie1024 @Xuanwo 
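
   For readers skimming the thread, the PyIceberg-style shape being referenced
is: obtain an append action from a transaction, buffer data files into it, then
apply it back. A toy model of just that buffering flow (all types below are
illustrative stand-ins, not the real iceberg-rust API):

```rust
// Illustrative stand-ins only; the real crate's FastAppendAction also tracks
// snapshot ids, schema, partition spec, and writes manifests on apply.
#[derive(Debug, Clone, PartialEq)]
struct DataFile(String);

#[derive(Default)]
struct FastAppendAction {
    appended_data_files: Vec<DataFile>,
}

impl FastAppendAction {
    // Mirrors add_data_files in the diff: extend the buffer and return
    // &mut Self so calls can be chained before the action is applied.
    fn add_data_files(&mut self, files: impl IntoIterator<Item = DataFile>) -> &mut Self {
        self.appended_data_files.extend(files);
        self
    }

    // Stand-in for applying the action: hand the buffered files over.
    fn apply(self) -> Vec<DataFile> {
        self.appended_data_files
    }
}

fn main() {
    let mut action = FastAppendAction::default();
    action
        .add_data_files([DataFile("a.parquet".into())])
        .add_data_files([DataFile("b.parquet".into())]);
    assert_eq!(action.apply().len(), 2);
}
```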
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@iceberg.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


-
To unsubscribe, e-mail: issues-unsubscr...@iceberg.apache.org
For additional commands, e-mail: issues-h...@iceberg.apache.org



Re: [PR] feat: support append data file and add e2e test [iceberg-rust]

2024-05-23 Thread via GitHub


Fokko commented on code in PR #349:
URL: https://github.com/apache/iceberg-rust/pull/349#discussion_r1611289703


##
crates/iceberg/src/io.rs:
##
@@ -368,6 +368,9 @@ impl Storage {
 new_props.insert("root".to_string(), DEFAULT_ROOT_PATH.to_string());
 
 match scheme {
+Scheme::Memory => Ok(Self::LocalFs {

Review Comment:
   I don't think we validate the requirements anywhere today either :(






Re: [PR] feat: support append data file and add e2e test [iceberg-rust]

2024-05-23 Thread via GitHub


Fokko commented on code in PR #349:
URL: https://github.com/apache/iceberg-rust/pull/349#discussion_r1611285227


##
crates/e2e_test/tests/append_data_file_test.rs:
##
@@ -0,0 +1,234 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+//! Integration tests for rest catalog.
+
+use arrow_array::{ArrayRef, BooleanArray, Int32Array, RecordBatch, 
StringArray};
+use futures::TryStreamExt;
+use iceberg::io::{S3_ACCESS_KEY_ID, S3_ENDPOINT, S3_REGION, 
S3_SECRET_ACCESS_KEY};
+use iceberg::spec::{NestedField, PrimitiveType, Schema, Type};
+use iceberg::transaction::Transaction;
+use iceberg::writer::base_writer::data_file_writer::{DataFileWriterBuilder, 
DataFileWriterConfig};
+use iceberg::writer::file_writer::location_generator::{
+DefaultFileNameGenerator, DefaultLocationGenerator,
+};
+use iceberg::writer::file_writer::ParquetWriterBuilder;
+use iceberg::writer::{IcebergWriter, IcebergWriterBuilder};
+use iceberg::{Catalog, Namespace, NamespaceIdent, TableCreation};
+use iceberg_catalog_rest::{RestCatalog, RestCatalogConfig};
+use iceberg_test_utils::docker::DockerCompose;
+use iceberg_test_utils::{normalize_test_name, set_up};
+use parquet::arrow::arrow_reader::ArrowReaderOptions;
+use parquet::file::properties::WriterProperties;
+use port_scanner::scan_port_addr;
+use std::collections::HashMap;
+use std::sync::Arc;
+use tokio::time::sleep;
+
+const REST_CATALOG_PORT: u16 = 8181;
+
+struct TestFixture {
+_docker_compose: DockerCompose,
+rest_catalog: RestCatalog,
+}
+
+async fn set_test_fixture(func: &str) -> TestFixture {
+set_up();
+let docker_compose = DockerCompose::new(
+normalize_test_name(format!("{}_{func}", module_path!())),
+format!("{}/testdata", env!("CARGO_MANIFEST_DIR")),
+);
+
+// Start docker compose
+docker_compose.run();
+
+let rest_catalog_ip = docker_compose.get_container_ip("rest");
+
+let read_port = format!("{}:{}", rest_catalog_ip, REST_CATALOG_PORT);
+loop {
+if !scan_port_addr(&read_port) {
+log::info!("Waiting 1s for rest catalog to be ready...");
+sleep(std::time::Duration::from_millis(1000)).await;
+} else {
+break;
+}
+}
+
+let container_ip = docker_compose.get_container_ip("minio");
+let read_port = format!("{}:{}", container_ip, 9000);
+
+let config = RestCatalogConfig::builder()
+.uri(format!("http://{}:{}", rest_catalog_ip, REST_CATALOG_PORT))
+.props(HashMap::from([
+(S3_ENDPOINT.to_string(), format!("http://{}", read_port)),
+(S3_ACCESS_KEY_ID.to_string(), "admin".to_string()),
+(S3_SECRET_ACCESS_KEY.to_string(), "password".to_string()),
+(S3_REGION.to_string(), "us-east-1".to_string()),
+]))
+.build();
+let rest_catalog = RestCatalog::new(config).await.unwrap();
+
+TestFixture {
+_docker_compose: docker_compose,
+rest_catalog,
+}
+}
+
+#[tokio::test]
+async fn test_append_data_file() {
+let fixture = set_test_fixture("test_create_table").await;
+
+let ns = Namespace::with_properties(
+NamespaceIdent::from_strs(["apple", "ios"]).unwrap(),
+HashMap::from([
+("owner".to_string(), "ray".to_string()),
+("community".to_string(), "apache".to_string()),
+]),
+);
+
+fixture
+.rest_catalog
+.create_namespace(ns.name(), ns.properties().clone())
+.await
+.unwrap();
+
+let schema = Schema::builder()
+.with_schema_id(1)
+.with_identifier_field_ids(vec![2])
+.with_fields(vec![
+NestedField::optional(1, "foo", Type::Primitive(PrimitiveType::String)).into(),
+NestedField::required(2, "bar", Type::Primitive(PrimitiveType::Int)).into(),
+NestedField::optional(3, "baz", Type::Primitive(PrimitiveType::Boolean)).into(),
+])
+.build()
+.unwrap();
+
+let table_creation = TableCreation::builder()
+.name("t1".to_string())
+.schema(schema.clone())
+.build();
+
+let table = fixture
+.rest_catalog
+.create_table(ns.name(), table_creation)

Review Comment:
   

Re: [PR] feat: support append data file and add e2e test [iceberg-rust]

2024-05-23 Thread via GitHub


Fokko commented on code in PR #349:
URL: https://github.com/apache/iceberg-rust/pull/349#discussion_r1611283989


##
crates/iceberg/src/transaction.rs:
##
@@ -121,6 +190,313 @@ impl<'a> Transaction<'a> {
 }
 }
 
+/// FastAppendAction is a transaction action for fast append data files to the table.
+pub struct FastAppendAction<'a> {
+tx: Transaction<'a>,
+
+parent_snapshot_id: Option<i64>,
+snapshot_id: i64,
+schema: Schema,
+schema_id: i32,
+format_version: FormatVersion,
+partition_spec: PartitionSpec,
+key_metadata: Vec<u8>,
+
+commit_uuid: String,
+manifest_id: i64,
+
+appended_data_files: Vec<DataFile>,
+}
+
+impl<'a> FastAppendAction<'a> {
+#[allow(clippy::too_many_arguments)]
+pub(crate) fn new(
+tx: Transaction<'a>,
+parent_snapshot_id: Option<i64>,
+snapshot_id: i64,
+schema: Schema,
+schema_id: i32,
+format_version: FormatVersion,
+partition_spec: PartitionSpec,
+key_metadata: Vec<u8>,
+commit_uuid: String,
+) -> Result<Self> {
+Ok(Self {
+tx,
+parent_snapshot_id,
+snapshot_id,
+schema,
+schema_id,
+format_version,
+partition_spec,
+key_metadata,
+commit_uuid,
+manifest_id: 0,
+appended_data_files: vec![],
+})
+}
+
+// Check if the partition value is compatible with the partition type.
+fn validate_partition_value(
+partition_value: &Struct,
+partition_type: &StructType,
+) -> Result<()> {
+if partition_value.fields().len() != partition_type.fields().len() {
+return Err(Error::new(
+ErrorKind::DataInvalid,
+"Partition value is not compatible with partition type",
+));
+}
+if partition_value
+.fields()
+.iter()
+.zip(partition_type.fields())
+.any(|(field, field_type)| 
!field_type.field_type.compatible(field))
+{
+return Err(Error::new(
+ErrorKind::DataInvalid,
+"Partition value is not compatible with partition type",
+));
+}
+Ok(())
+}
+
+/// Add data files to the snapshot.
+pub fn add_data_files(
+&mut self,
+data_files: impl IntoIterator<Item = DataFile>,
+) -> Result<&mut Self> {
+let data_files: Vec<DataFile> = data_files.into_iter().collect();
+for data_file in &data_files {
+if data_file.content_type() != crate::spec::DataContentType::Data {
+return Err(Error::new(
+ErrorKind::DataInvalid,
+"Only data content type is allowed for fast append",
+));
+}
+Self::validate_partition_value(
+data_file.partition(),
+&self.partition_spec.partition_type(&self.schema)?,
+)?;
+}
+self.appended_data_files.extend(data_files);
+Ok(self)
+}
+
+fn generate_manifest_file_path(&mut self) -> String {
+let manifest_id = self.manifest_id;
+self.manifest_id += 1;
+format!(
+"{}/{}/{}-m{}.{}",
+self.tx.table.metadata().location(),
+META_ROOT_PATH,
+&self.commit_uuid,
+manifest_id,
+DataFileFormat::Avro
+)
+}
+
+async fn manifest_from_parent_snapshot(&self) -> Result<Vec<ManifestFile>> {
+if let Some(snapshot) = self.tx.table.metadata().current_snapshot() {
+let manifest_list = snapshot
+.load_manifest_list(self.tx.table.file_io(), 
&self.tx.table.metadata_ref())
+.await?;
+let mut manifest_files = 
Vec::with_capacity(manifest_list.entries().len());
+for entry in manifest_list.entries() {
+// From: 
https://github.com/apache/iceberg-python/blob/659a951d6397ab280cae80206fe6e8e4be2d3738/pyiceberg/table/__init__.py#L2921
+// Why we need this?
+if entry.added_snapshot_id == self.snapshot_id {
+continue;
+}
+let manifest = 
entry.load_manifest(self.tx.table.file_io()).await?;
+// Skip manifest with all delete entries.
+if manifest.entries().iter().all(|entry| !entry.is_alive()) {
+continue;
+}
+manifest_files.push(entry.clone());
+}
+Ok(manifest_files)
+} else {
+Ok(vec![])
+}
+}
+
+// Write manifest file for added data files and return the ManifestFile for ManifestList.
+async fn manifest_for_data_file(&mut self) -> Result<ManifestFile> {
+let appended_data_files = std::mem::take(&mut 
self.appended_data_files);
+let manifest_entries = appended_data_files
+.into_iter()
+.map(|data_file| 

Re: [PR] feat: support append data file and add e2e test [iceberg-rust]

2024-05-23 Thread via GitHub


ZENOTME commented on code in PR #349:
URL: https://github.com/apache/iceberg-rust/pull/349#discussion_r1611275943


##
crates/iceberg/src/transaction.rs:
##
@@ -121,6 +166,270 @@ impl<'a> Transaction<'a> {
 }
 }
 
+/// FastAppendAction is a transaction action for fast append data files to the table.
+pub struct FastAppendAction<'a> {
+tx: Transaction<'a>,
+
+parent_snapshot_id: Option<i64>,
+snapshot_id: i64,
+schema: Schema,
+schema_id: i32,
+format_version: FormatVersion,
+partition_spec: PartitionSpec,
+key_metadata: Vec<u8>,
+
+commit_uuid: String,
+manifest_id: i64,
+
+appended_data_files: Vec<DataFile>,
+}
+
+impl<'a> FastAppendAction<'a> {
+#[allow(clippy::too_many_arguments)]
+pub(crate) fn new(
+tx: Transaction<'a>,
+parent_snapshot_id: Option<i64>,
+snapshot_id: i64,
+schema: Schema,
+schema_id: i32,
+format_version: FormatVersion,
+partition_spec: PartitionSpec,
+key_metadata: Vec<u8>,
+commit_uuid: String,
+) -> Result<Self> {
+Ok(Self {
+tx,
+parent_snapshot_id,
+snapshot_id,
+schema,
+schema_id,
+format_version,
+partition_spec,
+key_metadata,
+commit_uuid,
+manifest_id: 0,
+appended_data_files: vec![],
+})
+}
+
+/// Add data files to the snapshot.
+pub fn add_data_files(
+&mut self,
+data_file: impl IntoIterator<Item = DataFile>,
+) -> Result<&mut Self> {
+self.appended_data_files.extend(data_file);
+Ok(self)
+}
+
+fn generate_manifest_file_path(&mut self) -> String {
+let manifest_id = self.manifest_id;
+self.manifest_id += 1;
+format!(
+"{}/{}/{}-m{}.{}",
+self.tx.table.metadata().location(),
+META_ROOT_PATH,
+&self.commit_uuid,
+manifest_id,
+DataFileFormat::Avro
+)
+}
+
+async fn manifest_from_parent_snapshot(&self) -> Result<Vec<ManifestFile>> {
+if let Some(snapshot) = self.tx.table.metadata().current_snapshot() {
+let manifest_list = snapshot
+.load_manifest_list(self.tx.table.file_io(), 
&self.tx.table.metadata_ref())
+.await?;
+let mut manifest_files = 
Vec::with_capacity(manifest_list.entries().len());
+for entry in manifest_list.entries() {
+// From: 
https://github.com/apache/iceberg-python/blob/659a951d6397ab280cae80206fe6e8e4be2d3738/pyiceberg/table/__init__.py#L2921
+// Why we need this?
+if entry.added_snapshot_id == self.snapshot_id {
+continue;
+}
+let manifest = 
entry.load_manifest(self.tx.table.file_io()).await?;
+// Skip manifest with all delete entries.
+if manifest.entries().iter().all(|entry| !entry.is_alive()) {
+continue;
+}
+manifest_files.push(entry.clone());
+}
+Ok(manifest_files)
+} else {
+Ok(vec![])
+}
+}
+
+// Write manifest file for added data files and return the ManifestFile for ManifestList.
+async fn manifest_for_data_file(&mut self) -> Result<ManifestFile> {
+let appended_data_files = std::mem::take(&mut 
self.appended_data_files);
+let manifest_entries = appended_data_files
+.into_iter()
+.map(|data_file| {
+ManifestEntry::builder()
+.status(crate::spec::ManifestStatus::Added)
+.snapshot_id(self.snapshot_id)
+.data_file(data_file)
+.build()
+})
+.collect();
+let manifest_meta = ManifestMetadata::builder()
+.schema(self.schema.clone())
+.schema_id(self.schema_id)
+.format_version(self.format_version)
+.partition_spec(self.partition_spec.clone())

Review Comment:
   LGTM. I will split the partitioned table support out of this PR.
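
   The partition-value check discussed in this thread reduces to two conditions:
the value struct and the partition type must have the same number of fields, and
each field must be type-compatible with its slot. A minimal standalone sketch of
that logic (the enums and functions here are stand-ins, not the crate's real
`Struct`/`StructType` types):

```rust
// Stand-ins for the crate's partition type / partition value pairing.
#[derive(Clone, Copy, Debug)]
enum Ty { Int, Boolean }

#[derive(Clone, Copy, Debug)]
enum Lit { Int(i32), Boolean(bool) }

// Same shape as PrimitiveType::compatible in the diff: a literal matches
// only the variant of its own type.
fn compatible(ty: Ty, lit: Lit) -> bool {
    matches!((ty, lit), (Ty::Int, Lit::Int(_)) | (Ty::Boolean, Lit::Boolean(_)))
}

// Mirrors validate_partition_value: lengths must match, and every field must
// be compatible with the corresponding slot of the partition type.
fn validate_partition_value(value: &[Lit], ty: &[Ty]) -> Result<(), String> {
    if value.len() != ty.len() {
        return Err("Partition value is not compatible with partition type".into());
    }
    if value.iter().zip(ty).any(|(l, t)| !compatible(*t, *l)) {
        return Err("Partition value is not compatible with partition type".into());
    }
    Ok(())
}

fn main() {
    assert!(validate_partition_value(&[Lit::Int(1)], &[Ty::Int]).is_ok());
    assert!(validate_partition_value(&[Lit::Boolean(true)], &[Ty::Int]).is_err());
    assert!(validate_partition_value(&[], &[Ty::Int]).is_err());
}
```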






Re: [PR] feat: support append data file and add e2e test [iceberg-rust]

2024-05-23 Thread via GitHub


Fokko commented on code in PR #349:
URL: https://github.com/apache/iceberg-rust/pull/349#discussion_r1611273092


##
crates/iceberg/src/transaction.rs:
##
@@ -121,6 +190,313 @@ impl<'a> Transaction<'a> {
 }
 }
 
+/// FastAppendAction is a transaction action for fast append data files to the table.
+pub struct FastAppendAction<'a> {
+tx: Transaction<'a>,
+
+parent_snapshot_id: Option<i64>,
+snapshot_id: i64,
+schema: Schema,
+schema_id: i32,
+format_version: FormatVersion,
+partition_spec: PartitionSpec,
+key_metadata: Vec<u8>,
+
+commit_uuid: String,
+manifest_id: i64,
+
+appended_data_files: Vec<DataFile>,
+}
+
+impl<'a> FastAppendAction<'a> {
+#[allow(clippy::too_many_arguments)]
+pub(crate) fn new(
+tx: Transaction<'a>,
+parent_snapshot_id: Option<i64>,
+snapshot_id: i64,
+schema: Schema,
+schema_id: i32,
+format_version: FormatVersion,
+partition_spec: PartitionSpec,
+key_metadata: Vec<u8>,
+commit_uuid: String,
+) -> Result<Self> {
+Ok(Self {
+tx,
+parent_snapshot_id,
+snapshot_id,
+schema,
+schema_id,
+format_version,
+partition_spec,
+key_metadata,
+commit_uuid,
+manifest_id: 0,
+appended_data_files: vec![],
+})
+}
+
+// Check if the partition value is compatible with the partition type.
+fn validate_partition_value(
+partition_value: &Struct,
+partition_type: &StructType,
+) -> Result<()> {
+if partition_value.fields().len() != partition_type.fields().len() {
+return Err(Error::new(
+ErrorKind::DataInvalid,
+"Partition value is not compatible with partition type",
+));
+}
+if partition_value
+.fields()
+.iter()
+.zip(partition_type.fields())
+.any(|(field, field_type)| 
!field_type.field_type.compatible(field))
+{
+return Err(Error::new(
+ErrorKind::DataInvalid,
+"Partition value is not compatible with partition type",
+));
+}
+Ok(())
+}
+
+/// Add data files to the snapshot.
+pub fn add_data_files(
+&mut self,
+data_files: impl IntoIterator<Item = DataFile>,
+) -> Result<&mut Self> {
+let data_files: Vec<DataFile> = data_files.into_iter().collect();
+for data_file in &data_files {
+if data_file.content_type() != crate::spec::DataContentType::Data {
+return Err(Error::new(
+ErrorKind::DataInvalid,
+"Only data content type is allowed for fast append",
+));
+}
+Self::validate_partition_value(
+data_file.partition(),
+&self.partition_spec.partition_type(&self.schema)?,
+)?;
+}
+self.appended_data_files.extend(data_files);
+Ok(self)
+}
+
+fn generate_manifest_file_path(&mut self) -> String {
+let manifest_id = self.manifest_id;
+self.manifest_id += 1;
+format!(
+"{}/{}/{}-m{}.{}",
+self.tx.table.metadata().location(),
+META_ROOT_PATH,
+&self.commit_uuid,
+manifest_id,
+DataFileFormat::Avro
+)
+}
+
+async fn manifest_from_parent_snapshot(&self) -> Result<Vec<ManifestFile>> {

Review Comment:
   ```suggestion
   async fn manifests_from_parent_snapshot(&self) -> Result<Vec<ManifestFile>> {
   ```






Re: [PR] feat: support append data file and add e2e test [iceberg-rust]

2024-05-23 Thread via GitHub


ZENOTME commented on code in PR #349:
URL: https://github.com/apache/iceberg-rust/pull/349#discussion_r1611265301


##
crates/e2e_test/tests/append_data_file_test.rs:
##
@@ -0,0 +1,212 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+//! Integration tests for rest catalog.
+
+use arrow_array::{ArrayRef, BooleanArray, Int32Array, RecordBatch, 
StringArray};
+use futures::TryStreamExt;
+use iceberg::io::{S3_ACCESS_KEY_ID, S3_ENDPOINT, S3_REGION, 
S3_SECRET_ACCESS_KEY};
+use iceberg::spec::{NestedField, PrimitiveType, Schema, Type};
+use iceberg::transaction::Transaction;
+use iceberg::writer::base_writer::data_file_writer::{DataFileWriterBuilder, 
DataFileWriterConfig};
+use iceberg::writer::file_writer::location_generator::{
+DefaultFileNameGenerator, DefaultLocationGenerator,
+};
+use iceberg::writer::file_writer::ParquetWriterBuilder;
+use iceberg::writer::{IcebergWriter, IcebergWriterBuilder};
+use iceberg::{Catalog, Namespace, NamespaceIdent, TableCreation};
+use iceberg_catalog_rest::{RestCatalog, RestCatalogConfig};
+use iceberg_test_utils::docker::DockerCompose;
+use iceberg_test_utils::{normalize_test_name, set_up};
+use parquet::file::properties::WriterProperties;
+use port_scanner::scan_port_addr;
+use std::collections::HashMap;
+use std::sync::Arc;
+use tokio::time::sleep;
+
+const REST_CATALOG_PORT: u16 = 8181;
+
+struct TestFixture {
+_docker_compose: DockerCompose,
+rest_catalog: RestCatalog,
+}
+
+async fn set_test_fixture(func: &str) -> TestFixture {
+set_up();
+let docker_compose = DockerCompose::new(
+normalize_test_name(format!("{}_{func}", module_path!())),
+format!("{}/testdata", env!("CARGO_MANIFEST_DIR")),
+);
+
+// Start docker compose
+docker_compose.run();
+
+let rest_catalog_ip = docker_compose.get_container_ip("rest");
+
+let read_port = format!("{}:{}", rest_catalog_ip, REST_CATALOG_PORT);
+loop {
+if !scan_port_addr(&read_port) {
+log::info!("Waiting 1s for rest catalog to be ready...");
+sleep(std::time::Duration::from_millis(1000)).await;
+} else {
+break;
+}
+}
+
+let container_ip = docker_compose.get_container_ip("minio");
+let read_port = format!("{}:{}", container_ip, 9000);
+
+let config = RestCatalogConfig::builder()
+.uri(format!("http://{}:{}", rest_catalog_ip, REST_CATALOG_PORT))
+.props(HashMap::from([
+(S3_ENDPOINT.to_string(), format!("http://{}", read_port)),
+(S3_ACCESS_KEY_ID.to_string(), "admin".to_string()),
+(S3_SECRET_ACCESS_KEY.to_string(), "password".to_string()),
+(S3_REGION.to_string(), "us-east-1".to_string()),
+]))
+.build();
+let rest_catalog = RestCatalog::new(config).await.unwrap();
+
+TestFixture {
+_docker_compose: docker_compose,
+rest_catalog,
+}
+}
+
+#[tokio::test]
+async fn test_append_data_file() {
+let fixture = set_test_fixture("test_create_table").await;
+
+let ns = Namespace::with_properties(
+NamespaceIdent::from_strs(["apple", "ios"]).unwrap(),
+HashMap::from([
+("owner".to_string(), "ray".to_string()),
+("community".to_string(), "apache".to_string()),
+]),
+);
+
+fixture
+.rest_catalog
+.create_namespace(ns.name(), ns.properties().clone())
+.await
+.unwrap();
+
+let schema = Schema::builder()
+.with_schema_id(1)
+.with_identifier_field_ids(vec![2])
+.with_fields(vec![
+NestedField::optional(1, "foo", Type::Primitive(PrimitiveType::String)).into(),
+NestedField::required(2, "bar", Type::Primitive(PrimitiveType::Int)).into(),
+NestedField::optional(3, "baz", Type::Primitive(PrimitiveType::Boolean)).into(),
+])
+.build()
+.unwrap();
+
+let table_creation = TableCreation::builder()
+.name("t1".to_string())
+.schema(schema.clone())
+.build();
+
+let table = fixture
+.rest_catalog
+.create_table(ns.name(), table_creation)
+.await
+.unwrap();
+
+// Create the writer and write

Re: [PR] feat: support append data file and add e2e test [iceberg-rust]

2024-05-23 Thread via GitHub


Fokko commented on code in PR #349:
URL: https://github.com/apache/iceberg-rust/pull/349#discussion_r1611264064


##
crates/iceberg/src/spec/datatypes.rs:
##
@@ -260,6 +279,29 @@ impl<'de> Deserialize<'de> for PrimitiveType {
 }
 }
 
+impl PrimitiveType {
+/// Check whether literal is compatible with the type.
+pub fn compatible(&self, literal: &PrimitiveLiteral) -> bool {
+matches!(
+(self, literal),
+(PrimitiveType::Boolean, PrimitiveLiteral::Boolean(_))
+| (PrimitiveType::Int, PrimitiveLiteral::Int(_))
+| (PrimitiveType::Long, PrimitiveLiteral::Long(_))
+| (PrimitiveType::Float, PrimitiveLiteral::Float(_))
+| (PrimitiveType::Double, PrimitiveLiteral::Double(_))
+| (PrimitiveType::Decimal { .. }, PrimitiveLiteral::Decimal(_))
+| (PrimitiveType::Date, PrimitiveLiteral::Date(_))
+| (PrimitiveType::Time, PrimitiveLiteral::Time(_))
+| (PrimitiveType::Timestamp, PrimitiveLiteral::Timestamp(_))
+| (PrimitiveType::Timestamptz, 
PrimitiveLiteral::TimestampTZ(_))

Review Comment:
   Some painful differences in casing 😱 






Re: [PR] feat: support append data file and add e2e test [iceberg-rust]

2024-05-23 Thread via GitHub


Fokko commented on code in PR #349:
URL: https://github.com/apache/iceberg-rust/pull/349#discussion_r1611263732


##
crates/iceberg/src/spec/datatypes.rs:
##
@@ -260,6 +279,29 @@ impl<'de> Deserialize<'de> for PrimitiveType {
 }
 }
 
+impl PrimitiveType {
+/// Check whether literal is compatible with the type.
+pub fn compatible(&self, literal: &PrimitiveLiteral) -> bool {
+matches!(
+(self, literal),
+(PrimitiveType::Boolean, PrimitiveLiteral::Boolean(_))
+| (PrimitiveType::Int, PrimitiveLiteral::Int(_))
+| (PrimitiveType::Long, PrimitiveLiteral::Long(_))
+| (PrimitiveType::Float, PrimitiveLiteral::Float(_))
+| (PrimitiveType::Double, PrimitiveLiteral::Double(_))
+| (PrimitiveType::Decimal { .. }, PrimitiveLiteral::Decimal(_))

Review Comment:
   If we decide to go this route, we probably also want to check the 
precision/scale.
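
   A hedged sketch of what a precision/scale-aware decimal check could look like
(the types here are illustrative stand-ins; the real `PrimitiveType::Decimal {
precision, scale }` carries these on the type, and the literal's digit count
would come from the unscaled value):

```rust
// Illustrative only: a decimal value fits a target Decimal { precision, scale }
// when its scale matches and its digit count does not exceed the precision.
#[derive(Clone, Copy, Debug)]
struct DecimalType { precision: u32, scale: u32 }

// Count significant decimal digits of an unscaled i128 value.
fn digits(unscaled: i128) -> u32 {
    let mut n = unscaled.unsigned_abs();
    let mut d = 1;
    while n >= 10 { n /= 10; d += 1; }
    d
}

fn decimal_compatible(ty: DecimalType, unscaled: i128, scale: u32) -> bool {
    scale == ty.scale && digits(unscaled) <= ty.precision
}

fn main() {
    let ty = DecimalType { precision: 9, scale: 2 };
    assert!(decimal_compatible(ty, 1234_i128, 2)); // 12.34 fits (9, 2)
    assert!(!decimal_compatible(ty, 1234_i128, 3)); // wrong scale
    assert!(!decimal_compatible(ty, 12_345_678_901_i128, 2)); // 11 digits > precision 9
}
```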






Re: [PR] feat: support append data file and add e2e test [iceberg-rust]

2024-05-23 Thread via GitHub


Fokko commented on code in PR #349:
URL: https://github.com/apache/iceberg-rust/pull/349#discussion_r1611261437


##
crates/iceberg/src/transaction.rs:
##
@@ -121,6 +166,270 @@ impl<'a> Transaction<'a> {
 }
 }
 
+/// FastAppendAction is a transaction action for fast append data files to the table.
+pub struct FastAppendAction<'a> {
+tx: Transaction<'a>,
+
+parent_snapshot_id: Option<i64>,
+snapshot_id: i64,
+schema: Schema,
+schema_id: i32,
+format_version: FormatVersion,
+partition_spec: PartitionSpec,
+key_metadata: Vec<u8>,
+
+commit_uuid: String,
+manifest_id: i64,
+
+appended_data_files: Vec<DataFile>,
+}
+
+impl<'a> FastAppendAction<'a> {
+#[allow(clippy::too_many_arguments)]
+pub(crate) fn new(
+tx: Transaction<'a>,
+parent_snapshot_id: Option<i64>,
+snapshot_id: i64,
+schema: Schema,
+schema_id: i32,
+format_version: FormatVersion,
+partition_spec: PartitionSpec,
+key_metadata: Vec<u8>,
+commit_uuid: String,
+) -> Result<Self> {
+Ok(Self {
+tx,
+parent_snapshot_id,
+snapshot_id,
+schema,
+schema_id,
+format_version,
+partition_spec,
+key_metadata,
+commit_uuid,
+manifest_id: 0,
+appended_data_files: vec![],
+})
+}
+
+/// Add data files to the snapshot.
+pub fn add_data_files(
+&mut self,
+data_file: impl IntoIterator<Item = DataFile>,
+) -> Result<&mut Self> {
+self.appended_data_files.extend(data_file);
+Ok(self)
+}
+
+fn generate_manifest_file_path(&mut self) -> String {
+let manifest_id = self.manifest_id;
+self.manifest_id += 1;
+format!(
+"{}/{}/{}-m{}.{}",
+self.tx.table.metadata().location(),
+META_ROOT_PATH,
+&self.commit_uuid,
+manifest_id,
+DataFileFormat::Avro
+)
+}
+
+async fn manifest_from_parent_snapshot(&self) -> Result<Vec<ManifestFile>> {
+if let Some(snapshot) = self.tx.table.metadata().current_snapshot() {
+let manifest_list = snapshot
+.load_manifest_list(self.tx.table.file_io(), 
&self.tx.table.metadata_ref())
+.await?;
+let mut manifest_files = 
Vec::with_capacity(manifest_list.entries().len());
+for entry in manifest_list.entries() {
+// From: 
https://github.com/apache/iceberg-python/blob/659a951d6397ab280cae80206fe6e8e4be2d3738/pyiceberg/table/__init__.py#L2921
+// Why we need this?
+if entry.added_snapshot_id == self.snapshot_id {
+continue;
+}
+let manifest = 
entry.load_manifest(self.tx.table.file_io()).await?;
+// Skip manifest with all delete entries.
+if manifest.entries().iter().all(|entry| !entry.is_alive()) {
+continue;
+}
+manifest_files.push(entry.clone());
+}
+Ok(manifest_files)
+} else {
+Ok(vec![])
+}
+}
+
+// Write manifest file for added data files and return the ManifestFile for ManifestList.
+async fn manifest_for_data_file(&mut self) -> Result<ManifestFile> {
+let appended_data_files = std::mem::take(&mut 
self.appended_data_files);
+let manifest_entries = appended_data_files
+.into_iter()
+.map(|data_file| {
+ManifestEntry::builder()
+.status(crate::spec::ManifestStatus::Added)
+.snapshot_id(self.snapshot_id)
+.data_file(data_file)
+.build()
+})
+.collect();
+let manifest_meta = ManifestMetadata::builder()
+.schema(self.schema.clone())
+.schema_id(self.schema_id)
+.format_version(self.format_version)
+.partition_spec(self.partition_spec.clone())

Review Comment:
   How about only accepting unpartitioned tables at first?  






Re: [PR] feat: support append data file and add e2e test [iceberg-rust]

2024-05-23 Thread via GitHub


Fokko commented on code in PR #349:
URL: https://github.com/apache/iceberg-rust/pull/349#discussion_r1611259929


##
crates/iceberg/src/io.rs:
##
@@ -368,6 +368,9 @@ impl Storage {
 new_props.insert("root".to_string(), DEFAULT_ROOT_PATH.to_string());
 
 match scheme {
+Scheme::Memory => Ok(Self::LocalFs {

Review Comment:
   I would highly recommend the Rest catalog that we use for PyIceberg as well: 
https://github.com/apache/iceberg-python/blob/996afd0c44717d6ac345b8419bf01b25be2d6051/dev/docker-compose-integration.yml#L43-L56
   
   The nice thing here that it will also validate the responses






Re: [PR] feat: support append data file and add e2e test [iceberg-rust]

2024-05-23 Thread via GitHub


Fokko commented on code in PR #349:
URL: https://github.com/apache/iceberg-rust/pull/349#discussion_r1611247468


##
crates/e2e_test/tests/append_data_file_test.rs:
##
@@ -0,0 +1,212 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+//! Integration tests for rest catalog.
+
+use arrow_array::{ArrayRef, BooleanArray, Int32Array, RecordBatch, 
StringArray};
+use futures::TryStreamExt;
+use iceberg::io::{S3_ACCESS_KEY_ID, S3_ENDPOINT, S3_REGION, 
S3_SECRET_ACCESS_KEY};
+use iceberg::spec::{NestedField, PrimitiveType, Schema, Type};
+use iceberg::transaction::Transaction;
+use iceberg::writer::base_writer::data_file_writer::{DataFileWriterBuilder, 
DataFileWriterConfig};
+use iceberg::writer::file_writer::location_generator::{
+DefaultFileNameGenerator, DefaultLocationGenerator,
+};
+use iceberg::writer::file_writer::ParquetWriterBuilder;
+use iceberg::writer::{IcebergWriter, IcebergWriterBuilder};
+use iceberg::{Catalog, Namespace, NamespaceIdent, TableCreation};
+use iceberg_catalog_rest::{RestCatalog, RestCatalogConfig};
+use iceberg_test_utils::docker::DockerCompose;
+use iceberg_test_utils::{normalize_test_name, set_up};
+use parquet::file::properties::WriterProperties;
+use port_scanner::scan_port_addr;
+use std::collections::HashMap;
+use std::sync::Arc;
+use tokio::time::sleep;
+
+const REST_CATALOG_PORT: u16 = 8181;
+
+struct TestFixture {
+_docker_compose: DockerCompose,
+rest_catalog: RestCatalog,
+}
+
+async fn set_test_fixture(func: &str) -> TestFixture {
+set_up();
+let docker_compose = DockerCompose::new(
+normalize_test_name(format!("{}_{func}", module_path!())),
+format!("{}/testdata", env!("CARGO_MANIFEST_DIR")),
+);
+
+// Start docker compose
+docker_compose.run();
+
+let rest_catalog_ip = docker_compose.get_container_ip("rest");
+
+let read_port = format!("{}:{}", rest_catalog_ip, REST_CATALOG_PORT);
+loop {
+if !scan_port_addr(&read_port) {
+log::info!("Waiting 1s for rest catalog to be ready...");
+sleep(std::time::Duration::from_millis(1000)).await;
+} else {
+break;
+}
+}
+
+let container_ip = docker_compose.get_container_ip("minio");
+let read_port = format!("{}:{}", container_ip, 9000);
+
+let config = RestCatalogConfig::builder()
+.uri(format!("http://{}:{}", rest_catalog_ip, REST_CATALOG_PORT))
+.props(HashMap::from([
+(S3_ENDPOINT.to_string(), format!("http://{}", read_port)),
+(S3_ACCESS_KEY_ID.to_string(), "admin".to_string()),
+(S3_SECRET_ACCESS_KEY.to_string(), "password".to_string()),
+(S3_REGION.to_string(), "us-east-1".to_string()),
+]))
+.build();
+let rest_catalog = RestCatalog::new(config).await.unwrap();
+
+TestFixture {
+_docker_compose: docker_compose,
+rest_catalog,
+}
+}
+
+#[tokio::test]
+async fn test_append_data_file() {
+let fixture = set_test_fixture("test_create_table").await;
+
+let ns = Namespace::with_properties(
+NamespaceIdent::from_strs(["apple", "ios"]).unwrap(),
+HashMap::from([
+("owner".to_string(), "ray".to_string()),
+("community".to_string(), "apache".to_string()),
+]),
+);
+
+fixture
+.rest_catalog
+.create_namespace(ns.name(), ns.properties().clone())
+.await
+.unwrap();
+
+let schema = Schema::builder()
+.with_schema_id(1)
+.with_identifier_field_ids(vec![2])
+.with_fields(vec![
+NestedField::optional(1, "foo", 
Type::Primitive(PrimitiveType::String)).into(),
+NestedField::required(2, "bar", 
Type::Primitive(PrimitiveType::Int)).into(),
+NestedField::optional(3, "baz", 
Type::Primitive(PrimitiveType::Boolean)).into(),
+])
+.build()
+.unwrap();
+
+let table_creation = TableCreation::builder()
+.name("t1".to_string())
+.schema(schema.clone())
+.build();
+
+let table = fixture
+.rest_catalog
+.create_table(ns.name(), table_creation)
+.await
+.unwrap();
+
+// Create the writer and write t

Re: [PR] feat: support append data file and add e2e test [iceberg-rust]

2024-05-23 Thread via GitHub


Fokko commented on code in PR #349:
URL: https://github.com/apache/iceberg-rust/pull/349#discussion_r1611244871


##
crates/iceberg/src/transaction.rs:
##
@@ -121,6 +166,270 @@ impl<'a> Transaction<'a> {
 }
 }
 
+/// FastAppendAction is a transaction action for fast append data files to the 
table.
+pub struct FastAppendAction<'a> {
+tx: Transaction<'a>,
+
+parent_snapshot_id: Option<i64>,
+snapshot_id: i64,
+schema: Schema,
+schema_id: i32,
+format_version: FormatVersion,
+partition_spec: PartitionSpec,
+key_metadata: Vec<u8>,
+
+commit_uuid: String,
+manifest_id: i64,
+
+appended_data_files: Vec<DataFile>,
+}
+
+impl<'a> FastAppendAction<'a> {
+#[allow(clippy::too_many_arguments)]
+pub(crate) fn new(
+tx: Transaction<'a>,
+parent_snapshot_id: Option<i64>,
+snapshot_id: i64,
+schema: Schema,
+schema_id: i32,
+format_version: FormatVersion,
+partition_spec: PartitionSpec,
+key_metadata: Vec<u8>,
+commit_uuid: String,
+) -> Result<Self> {
+Ok(Self {
+tx,
+parent_snapshot_id,
+snapshot_id,
+schema,
+schema_id,
+format_version,
+partition_spec,
+key_metadata,
+commit_uuid,
+manifest_id: 0,
+appended_data_files: vec![],
+})
+}
+
+/// Add data files to the snapshot.
+pub fn add_data_files(
+&mut self,
+data_file: impl IntoIterator<Item = DataFile>,
+) -> Result<&mut Self> {
+self.appended_data_files.extend(data_file);
+Ok(self)
+}
+
+fn generate_manifest_file_path(&mut self) -> String {
+let manifest_id = self.manifest_id;
+self.manifest_id += 1;
+format!(
+"{}/{}/{}-m{}.{}",
+self.tx.table.metadata().location(),
+META_ROOT_PATH,
+&self.commit_uuid,
+manifest_id,
+DataFileFormat::Avro
+)
+}
+
+async fn manifest_from_parent_snapshot(&self) -> Result<Vec<ManifestFile>> {
+if let Some(snapshot) = self.tx.table.metadata().current_snapshot() {
+let manifest_list = snapshot
+.load_manifest_list(self.tx.table.file_io(), 
&self.tx.table.metadata_ref())
+.await?;
+let mut manifest_files = 
Vec::with_capacity(manifest_list.entries().len());
+for entry in manifest_list.entries() {
+// From: 
https://github.com/apache/iceberg-python/blob/659a951d6397ab280cae80206fe6e8e4be2d3738/pyiceberg/table/__init__.py#L2921
+// Why we need this?
+if entry.added_snapshot_id == self.snapshot_id {
+continue;
+}
+let manifest = 
entry.load_manifest(self.tx.table.file_io()).await?;
+// Skip manifest with all delete entries.
+if manifest.entries().iter().all(|entry| !entry.is_alive()) {
+continue;
+}
+manifest_files.push(entry.clone());
+}
+Ok(manifest_files)
+} else {
+Ok(vec![])
+}
+}
+
+// Write manifest file for added data files and return the ManifestFile 
for ManifestList.
+async fn manifest_for_data_file(&mut self) -> Result<ManifestFile> {
+let appended_data_files = std::mem::take(&mut 
self.appended_data_files);
+let manifest_entries = appended_data_files
+.into_iter()
+.map(|data_file| {
+ManifestEntry::builder()
+.status(crate::spec::ManifestStatus::Added)
+.snapshot_id(self.snapshot_id)
+.data_file(data_file)
+.build()
+})
+.collect();
+let manifest_meta = ManifestMetadata::builder()
+.schema(self.schema.clone())
+.schema_id(self.schema_id)
+.format_version(self.format_version)
+.partition_spec(self.partition_spec.clone())
+.content(crate::spec::ManifestContentType::Data)
+.build();
+let manifest = Manifest::new(manifest_meta, manifest_entries);
+let writer = ManifestWriter::new(
+self.tx
+.table
+.file_io()
+.new_output(self.generate_manifest_file_path())?,
+self.snapshot_id,
+self.key_metadata.clone(),
+);
+writer.write(manifest).await
+}
+
+fn summary(&self) -> Summary {
+Summary {
+operation: crate::spec::Operation::Append,
+other: HashMap::new(),
+}
+}
+
+/// Finished building the action and apply it to the transaction.
+pub async fn apply(mut self) -> Result<Transaction<'a>> {
+let summary = self.summary();
+let manifest = self.manifest_for_data_file().await?;
+let existing_manifest_files = 
self.manifest_from_par
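As an aside, the `generate_manifest_file_path` scheme quoted above yields paths of the form `<table-location>/metadata/<commit-uuid>-m<N>.avro`. A minimal standalone sketch of the same format string (the location and UUID below are made-up example values; `META_ROOT_PATH` is assumed to be `"metadata"` as in iceberg-rust):

```rust
// Sketch of the manifest path scheme used by `generate_manifest_file_path`.
// The location and commit UUID are hypothetical example values.
const META_ROOT_PATH: &str = "metadata";

fn manifest_path(location: &str, commit_uuid: &str, manifest_id: i64) -> String {
    // Same shape as format!("{}/{}/{}-m{}.{}", ..., DataFileFormat::Avro)
    format!("{}/{}/{}-m{}.avro", location, META_ROOT_PATH, commit_uuid, manifest_id)
}

fn main() {
    let p = manifest_path("s3://bucket/warehouse/t1", "1b8f", 0);
    assert_eq!(p, "s3://bucket/warehouse/t1/metadata/1b8f-m0.avro");
    println!("{p}");
}
```

Note how `manifest_id` is a per-commit counter, so each manifest written under the same commit UUID gets a distinct `-m<N>` suffix.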

Re: [PR] feat: support append data file and add e2e test [iceberg-rust]

2024-05-23 Thread via GitHub


Fokko commented on code in PR #349:
URL: https://github.com/apache/iceberg-rust/pull/349#discussion_r1611236654


##
crates/iceberg/src/transaction.rs:
##
@@ -121,6 +166,270 @@ impl<'a> Transaction<'a> {
 }
 }
 
+/// FastAppendAction is a transaction action for fast append data files to the 
table.
+pub struct FastAppendAction<'a> {
+tx: Transaction<'a>,
+
+parent_snapshot_id: Option<i64>,
+snapshot_id: i64,
+schema: Schema,
+schema_id: i32,
+format_version: FormatVersion,
+partition_spec: PartitionSpec,
+key_metadata: Vec<u8>,
+
+commit_uuid: String,
+manifest_id: i64,
+
+appended_data_files: Vec<DataFile>,
+}
+
+impl<'a> FastAppendAction<'a> {
+#[allow(clippy::too_many_arguments)]
+pub(crate) fn new(
+tx: Transaction<'a>,
+parent_snapshot_id: Option<i64>,
+snapshot_id: i64,
+schema: Schema,
+schema_id: i32,
+format_version: FormatVersion,
+partition_spec: PartitionSpec,
+key_metadata: Vec<u8>,
+commit_uuid: String,
+) -> Result<Self> {
+Ok(Self {
+tx,
+parent_snapshot_id,
+snapshot_id,
+schema,
+schema_id,
+format_version,
+partition_spec,
+key_metadata,
+commit_uuid,
+manifest_id: 0,
+appended_data_files: vec![],
+})
+}
+
+/// Add data files to the snapshot.
+pub fn add_data_files(
+&mut self,
+data_file: impl IntoIterator<Item = DataFile>,
+) -> Result<&mut Self> {
+self.appended_data_files.extend(data_file);
+Ok(self)
+}
+
+fn generate_manifest_file_path(&mut self) -> String {
+let manifest_id = self.manifest_id;
+self.manifest_id += 1;
+format!(
+"{}/{}/{}-m{}.{}",
+self.tx.table.metadata().location(),
+META_ROOT_PATH,
+&self.commit_uuid,
+manifest_id,
+DataFileFormat::Avro
+)
+}
+
+async fn manifest_from_parent_snapshot(&self) -> Result<Vec<ManifestFile>> {
+if let Some(snapshot) = self.tx.table.metadata().current_snapshot() {
+let manifest_list = snapshot
+.load_manifest_list(self.tx.table.file_io(), 
&self.tx.table.metadata_ref())
+.await?;
+let mut manifest_files = 
Vec::with_capacity(manifest_list.entries().len());
+for entry in manifest_list.entries() {
+// From: 
https://github.com/apache/iceberg-python/blob/659a951d6397ab280cae80206fe6e8e4be2d3738/pyiceberg/table/__init__.py#L2921
+// Why we need this?
+if entry.added_snapshot_id == self.snapshot_id {
+continue;
+}
+let manifest = 
entry.load_manifest(self.tx.table.file_io()).await?;
+// Skip manifest with all delete entries.
+if manifest.entries().iter().all(|entry| !entry.is_alive()) {
+continue;
+}
+manifest_files.push(entry.clone());
+}
+Ok(manifest_files)
+} else {
+Ok(vec![])
+}
+}
+
+// Write manifest file for added data files and return the ManifestFile 
for ManifestList.
+async fn manifest_for_data_file(&mut self) -> Result<ManifestFile> {
+let appended_data_files = std::mem::take(&mut 
self.appended_data_files);
+let manifest_entries = appended_data_files
+.into_iter()
+.map(|data_file| {
+ManifestEntry::builder()
+.status(crate::spec::ManifestStatus::Added)
+.snapshot_id(self.snapshot_id)
+.data_file(data_file)
+.build()
+})
+.collect();
+let manifest_meta = ManifestMetadata::builder()
+.schema(self.schema.clone())
+.schema_id(self.schema_id)
+.format_version(self.format_version)
+.partition_spec(self.partition_spec.clone())
+.content(crate::spec::ManifestContentType::Data)
+.build();
+let manifest = Manifest::new(manifest_meta, manifest_entries);
+let writer = ManifestWriter::new(
+self.tx
+.table
+.file_io()
+.new_output(self.generate_manifest_file_path())?,
+self.snapshot_id,
+self.key_metadata.clone(),
+);
+writer.write(manifest).await
+}
+
+fn summary(&self) -> Summary {
+Summary {
+operation: crate::spec::Operation::Append,
+other: HashMap::new(),
+}
+}
+
+/// Finished building the action and apply it to the transaction.
+pub async fn apply(mut self) -> Result<Transaction<'a>> {
+let summary = self.summary();
+let manifest = self.manifest_for_data_file().await?;
+let existing_manifest_files = 
self.manifest_from_par

Re: [PR] feat: support append data file and add e2e test [iceberg-rust]

2024-05-13 Thread via GitHub


ZENOTME commented on PR #349:
URL: https://github.com/apache/iceberg-rust/pull/349#issuecomment-2108162712

   Hi, I have tried to fix this PR. A few things may not be fully resolved yet:
   1. https://github.com/apache/iceberg-rust/pull/349#discussion_r1580444775
 I'm not sure whether my understanding here is correct.
   2. TODOs that we can address in a later PR:
   - https://github.com/apache/iceberg-rust/pull/349#discussion_r1580446821
   - https://github.com/apache/iceberg-rust/pull/349#discussion_r1579420662
   3. https://github.com/apache/iceberg-rust/pull/349#discussion_r1580571634
   Please let me know if there is anything else I missed and need to fix. cc 
@Fokko 


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@iceberg.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


-
To unsubscribe, e-mail: issues-unsubscr...@iceberg.apache.org
For additional commands, e-mail: issues-h...@iceberg.apache.org



Re: [PR] feat: support append data file and add e2e test [iceberg-rust]

2024-05-13 Thread via GitHub


ZENOTME commented on code in PR #349:
URL: https://github.com/apache/iceberg-rust/pull/349#discussion_r1598744515


##
crates/iceberg/src/transaction.rs:
##
@@ -121,6 +166,270 @@ impl<'a> Transaction<'a> {
 }
 }
 
+/// FastAppendAction is a transaction action for fast append data files to the 
table.
+pub struct FastAppendAction<'a> {
+tx: Transaction<'a>,
+
+parent_snapshot_id: Option<i64>,
+snapshot_id: i64,
+schema: Schema,
+schema_id: i32,
+format_version: FormatVersion,
+partition_spec: PartitionSpec,
+key_metadata: Vec<u8>,
+
+commit_uuid: String,
+manifest_id: i64,
+
+appended_data_files: Vec<DataFile>,
+}
+
+impl<'a> FastAppendAction<'a> {
+#[allow(clippy::too_many_arguments)]
+pub(crate) fn new(
+tx: Transaction<'a>,
+parent_snapshot_id: Option<i64>,
+snapshot_id: i64,
+schema: Schema,
+schema_id: i32,
+format_version: FormatVersion,
+partition_spec: PartitionSpec,
+key_metadata: Vec<u8>,
+commit_uuid: String,
+) -> Result<Self> {
+Ok(Self {
+tx,
+parent_snapshot_id,
+snapshot_id,
+schema,
+schema_id,
+format_version,
+partition_spec,
+key_metadata,
+commit_uuid,
+manifest_id: 0,
+appended_data_files: vec![],
+})
+}
+
+/// Add data files to the snapshot.
+pub fn add_data_files(
+&mut self,
+data_file: impl IntoIterator<Item = DataFile>,
+) -> Result<&mut Self> {
+self.appended_data_files.extend(data_file);
+Ok(self)
+}
+
+fn generate_manifest_file_path(&mut self) -> String {
+let manifest_id = self.manifest_id;
+self.manifest_id += 1;
+format!(
+"{}/{}/{}-m{}.{}",
+self.tx.table.metadata().location(),
+META_ROOT_PATH,
+&self.commit_uuid,
+manifest_id,
+DataFileFormat::Avro
+)
+}
+
+async fn manifest_from_parent_snapshot(&self) -> Result<Vec<ManifestFile>> {
+if let Some(snapshot) = self.tx.table.metadata().current_snapshot() {
+let manifest_list = snapshot
+.load_manifest_list(self.tx.table.file_io(), 
&self.tx.table.metadata_ref())
+.await?;
+let mut manifest_files = 
Vec::with_capacity(manifest_list.entries().len());
+for entry in manifest_list.entries() {
+// From: 
https://github.com/apache/iceberg-python/blob/659a951d6397ab280cae80206fe6e8e4be2d3738/pyiceberg/table/__init__.py#L2921
+// Why we need this?
+if entry.added_snapshot_id == self.snapshot_id {
+continue;
+}
+let manifest = 
entry.load_manifest(self.tx.table.file_io()).await?;
+// Skip manifest with all delete entries.
+if manifest.entries().iter().all(|entry| !entry.is_alive()) {
+continue;
+}
+manifest_files.push(entry.clone());
+}
+Ok(manifest_files)
+} else {
+Ok(vec![])
+}
+}
+
+// Write manifest file for added data files and return the ManifestFile 
for ManifestList.
+async fn manifest_for_data_file(&mut self) -> Result<ManifestFile> {
+let appended_data_files = std::mem::take(&mut 
self.appended_data_files);
+let manifest_entries = appended_data_files
+.into_iter()
+.map(|data_file| {
+ManifestEntry::builder()
+.status(crate::spec::ManifestStatus::Added)
+.snapshot_id(self.snapshot_id)
+.data_file(data_file)
+.build()
+})
+.collect();
+let manifest_meta = ManifestMetadata::builder()
+.schema(self.schema.clone())
+.schema_id(self.schema_id)
+.format_version(self.format_version)
+.partition_spec(self.partition_spec.clone())
+.content(crate::spec::ManifestContentType::Data)
+.build();
+let manifest = Manifest::new(manifest_meta, manifest_entries);
+let writer = ManifestWriter::new(
+self.tx
+.table
+.file_io()
+.new_output(self.generate_manifest_file_path())?,
+self.snapshot_id,
+self.key_metadata.clone(),
+);
+writer.write(manifest).await
+}
+
+fn summary(&self) -> Summary {
+Summary {
+operation: crate::spec::Operation::Append,
+other: HashMap::new(),
+}
+}
+
+/// Finished building the action and apply it to the transaction.
+pub async fn apply(mut self) -> Result<Transaction<'a>> {
+let summary = self.summary();
+let manifest = self.manifest_for_data_file().await?;
+let existing_manifest_files = 
self.manifest_from_p

Re: [PR] feat: support append data file and add e2e test [iceberg-rust]

2024-05-13 Thread via GitHub


ZENOTME commented on code in PR #349:
URL: https://github.com/apache/iceberg-rust/pull/349#discussion_r1598713751


##
crates/iceberg/src/transaction.rs:
##
@@ -121,6 +166,270 @@ impl<'a> Transaction<'a> {
 }
 }
 
+/// FastAppendAction is a transaction action for fast append data files to the 
table.
+pub struct FastAppendAction<'a> {
+tx: Transaction<'a>,
+
+parent_snapshot_id: Option<i64>,
+snapshot_id: i64,
+schema: Schema,
+schema_id: i32,
+format_version: FormatVersion,
+partition_spec: PartitionSpec,
+key_metadata: Vec<u8>,
+
+commit_uuid: String,
+manifest_id: i64,
+
+appended_data_files: Vec<DataFile>,
+}
+
+impl<'a> FastAppendAction<'a> {
+#[allow(clippy::too_many_arguments)]
+pub(crate) fn new(
+tx: Transaction<'a>,
+parent_snapshot_id: Option<i64>,
+snapshot_id: i64,
+schema: Schema,
+schema_id: i32,
+format_version: FormatVersion,
+partition_spec: PartitionSpec,
+key_metadata: Vec<u8>,
+commit_uuid: String,
+) -> Result<Self> {
+Ok(Self {
+tx,
+parent_snapshot_id,
+snapshot_id,
+schema,
+schema_id,
+format_version,
+partition_spec,
+key_metadata,
+commit_uuid,
+manifest_id: 0,
+appended_data_files: vec![],
+})
+}
+
+/// Add data files to the snapshot.
+pub fn add_data_files(
+&mut self,
+data_file: impl IntoIterator<Item = DataFile>,
+) -> Result<&mut Self> {
+self.appended_data_files.extend(data_file);
+Ok(self)
+}
+
+fn generate_manifest_file_path(&mut self) -> String {
+let manifest_id = self.manifest_id;
+self.manifest_id += 1;
+format!(
+"{}/{}/{}-m{}.{}",
+self.tx.table.metadata().location(),
+META_ROOT_PATH,
+&self.commit_uuid,
+manifest_id,
+DataFileFormat::Avro
+)
+}
+
+async fn manifest_from_parent_snapshot(&self) -> Result<Vec<ManifestFile>> {
+if let Some(snapshot) = self.tx.table.metadata().current_snapshot() {
+let manifest_list = snapshot
+.load_manifest_list(self.tx.table.file_io(), 
&self.tx.table.metadata_ref())
+.await?;
+let mut manifest_files = 
Vec::with_capacity(manifest_list.entries().len());
+for entry in manifest_list.entries() {
+// From: 
https://github.com/apache/iceberg-python/blob/659a951d6397ab280cae80206fe6e8e4be2d3738/pyiceberg/table/__init__.py#L2921
+// Why we need this?
+if entry.added_snapshot_id == self.snapshot_id {
+continue;
+}
+let manifest = 
entry.load_manifest(self.tx.table.file_io()).await?;
+// Skip manifest with all delete entries.
+if manifest.entries().iter().all(|entry| !entry.is_alive()) {
+continue;
+}
+manifest_files.push(entry.clone());
+}
+Ok(manifest_files)
+} else {
+Ok(vec![])
+}
+}
+
+// Write manifest file for added data files and return the ManifestFile 
for ManifestList.
+async fn manifest_for_data_file(&mut self) -> Result<ManifestFile> {
+let appended_data_files = std::mem::take(&mut 
self.appended_data_files);
+let manifest_entries = appended_data_files
+.into_iter()
+.map(|data_file| {
+ManifestEntry::builder()
+.status(crate::spec::ManifestStatus::Added)
+.snapshot_id(self.snapshot_id)
+.data_file(data_file)
+.build()
+})
+.collect();
+let manifest_meta = ManifestMetadata::builder()
+.schema(self.schema.clone())
+.schema_id(self.schema_id)
+.format_version(self.format_version)
+.partition_spec(self.partition_spec.clone())

Review Comment:
   I added 
https://github.com/apache/iceberg-rust/blob/950c40edd751b307ee98b7a76f4242bfaef87d86/crates/iceberg/src/transaction.rs#L216
 for this.






Re: [PR] feat: support append data file and add e2e test [iceberg-rust]

2024-05-09 Thread via GitHub


ZENOTME commented on code in PR #349:
URL: https://github.com/apache/iceberg-rust/pull/349#discussion_r1595472104


##
crates/e2e_test/tests/append_data_file_test.rs:
##
@@ -0,0 +1,212 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+//! Integration tests for rest catalog.
+
+use arrow_array::{ArrayRef, BooleanArray, Int32Array, RecordBatch, 
StringArray};
+use futures::TryStreamExt;
+use iceberg::io::{S3_ACCESS_KEY_ID, S3_ENDPOINT, S3_REGION, 
S3_SECRET_ACCESS_KEY};
+use iceberg::spec::{NestedField, PrimitiveType, Schema, Type};
+use iceberg::transaction::Transaction;
+use iceberg::writer::base_writer::data_file_writer::{DataFileWriterBuilder, 
DataFileWriterConfig};
+use iceberg::writer::file_writer::location_generator::{
+DefaultFileNameGenerator, DefaultLocationGenerator,
+};
+use iceberg::writer::file_writer::ParquetWriterBuilder;
+use iceberg::writer::{IcebergWriter, IcebergWriterBuilder};
+use iceberg::{Catalog, Namespace, NamespaceIdent, TableCreation};
+use iceberg_catalog_rest::{RestCatalog, RestCatalogConfig};
+use iceberg_test_utils::docker::DockerCompose;
+use iceberg_test_utils::{normalize_test_name, set_up};
+use parquet::file::properties::WriterProperties;
+use port_scanner::scan_port_addr;
+use std::collections::HashMap;
+use std::sync::Arc;
+use tokio::time::sleep;
+
+const REST_CATALOG_PORT: u16 = 8181;
+
+struct TestFixture {
+_docker_compose: DockerCompose,
+rest_catalog: RestCatalog,
+}
+
+async fn set_test_fixture(func: &str) -> TestFixture {

Review Comment:
   LGTM. We can do this later.






Re: [PR] feat: support append data file and add e2e test [iceberg-rust]

2024-04-28 Thread via GitHub


marvinlanhenke commented on code in PR #349:
URL: https://github.com/apache/iceberg-rust/pull/349#discussion_r1582058099


##
crates/e2e_test/tests/append_data_file_test.rs:
##
@@ -0,0 +1,212 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+//! Integration tests for rest catalog.
+
+use arrow_array::{ArrayRef, BooleanArray, Int32Array, RecordBatch, 
StringArray};
+use futures::TryStreamExt;
+use iceberg::io::{S3_ACCESS_KEY_ID, S3_ENDPOINT, S3_REGION, 
S3_SECRET_ACCESS_KEY};
+use iceberg::spec::{NestedField, PrimitiveType, Schema, Type};
+use iceberg::transaction::Transaction;
+use iceberg::writer::base_writer::data_file_writer::{DataFileWriterBuilder, 
DataFileWriterConfig};
+use iceberg::writer::file_writer::location_generator::{
+DefaultFileNameGenerator, DefaultLocationGenerator,
+};
+use iceberg::writer::file_writer::ParquetWriterBuilder;
+use iceberg::writer::{IcebergWriter, IcebergWriterBuilder};
+use iceberg::{Catalog, Namespace, NamespaceIdent, TableCreation};
+use iceberg_catalog_rest::{RestCatalog, RestCatalogConfig};
+use iceberg_test_utils::docker::DockerCompose;
+use iceberg_test_utils::{normalize_test_name, set_up};
+use parquet::file::properties::WriterProperties;
+use port_scanner::scan_port_addr;
+use std::collections::HashMap;
+use std::sync::Arc;
+use tokio::time::sleep;
+
+const REST_CATALOG_PORT: u16 = 8181;
+
+struct TestFixture {
+_docker_compose: DockerCompose,
+rest_catalog: RestCatalog,
+}
+
+async fn set_test_fixture(func: &str) -> TestFixture {

Review Comment:
   possible refactor (if we'll use e2e_test for all integrations, e.g. hive, 
glue, etc.):
   - relocate `set_test_fixture` into /test_utils/
   - run & expose all services (or allow a configuration param (enum) for 
which services to set up)


##
crates/e2e_test/testdata/docker-compose.yaml:
##
@@ -0,0 +1,59 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+version: '3.8'
+
+services:

Review Comment:
   I guess once this is merged we can reuse this and extend it with images 
for `Glue` and `HiveMetastore` as well; this would remove duplicates from the 
specific catalog impl crates and the datafusion/integration crate, and avoid 
having each crate run & define the same testing infra.






Re: [PR] feat: support append data file and add e2e test [iceberg-rust]

2024-04-26 Thread via GitHub


ZENOTME commented on code in PR #349:
URL: https://github.com/apache/iceberg-rust/pull/349#discussion_r1580571634


##
crates/iceberg/src/io.rs:
##
@@ -368,6 +368,9 @@ impl Storage {
 new_props.insert("root".to_string(), DEFAULT_ROOT_PATH.to_string());
 
 match scheme {
+Scheme::Memory => Ok(Self::LocalFs {

Review Comment:
   >  inherently you're testing a different path than it would normally would.
   
   Yes, but we limit it to the storage layer only. It provides a writable 
mock layer for unit tests, which makes it easier to test the processing logic 
and internal state of each unit. Otherwise we can only test this in the e2e 
test, and the problem with the e2e test is that we can't inspect the internal 
state. 🤔
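   To illustrate the point, a toy in-memory storage (a standalone mock for 
the sake of the argument, not iceberg's actual `FileIO`) lets a unit test 
write a file and immediately inspect the resulting state:

```rust
// Standalone mock of an in-memory storage layer. A unit test can write
// through it and then inspect internal state directly, which is not
// possible when the bytes land in a real object store during an e2e run.
use std::collections::HashMap;

#[derive(Default)]
struct MemoryStorage {
    files: HashMap<String, Vec<u8>>,
}

impl MemoryStorage {
    fn write(&mut self, path: &str, data: &[u8]) {
        self.files.insert(path.to_string(), data.to_vec());
    }
    fn read(&self, path: &str) -> Option<&[u8]> {
        self.files.get(path).map(|v| v.as_slice())
    }
}

fn main() {
    let mut storage = MemoryStorage::default();
    storage.write("memory:/t1/metadata/m0.avro", b"manifest bytes");
    // Internal state is directly observable.
    assert_eq!(
        storage.read("memory:/t1/metadata/m0.avro"),
        Some(&b"manifest bytes"[..])
    );
}
```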






Re: [PR] feat: support append data file and add e2e test [iceberg-rust]

2024-04-25 Thread via GitHub


ZENOTME commented on code in PR #349:
URL: https://github.com/apache/iceberg-rust/pull/349#discussion_r1580446821


##
crates/e2e_test/tests/append_data_file_test.rs:
##
@@ -0,0 +1,212 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+//! Integration tests for rest catalog.
+
+use arrow_array::{ArrayRef, BooleanArray, Int32Array, RecordBatch, 
StringArray};
+use futures::TryStreamExt;
+use iceberg::io::{S3_ACCESS_KEY_ID, S3_ENDPOINT, S3_REGION, 
S3_SECRET_ACCESS_KEY};
+use iceberg::spec::{NestedField, PrimitiveType, Schema, Type};
+use iceberg::transaction::Transaction;
+use iceberg::writer::base_writer::data_file_writer::{DataFileWriterBuilder, 
DataFileWriterConfig};
+use iceberg::writer::file_writer::location_generator::{
+DefaultFileNameGenerator, DefaultLocationGenerator,
+};
+use iceberg::writer::file_writer::ParquetWriterBuilder;
+use iceberg::writer::{IcebergWriter, IcebergWriterBuilder};
+use iceberg::{Catalog, Namespace, NamespaceIdent, TableCreation};
+use iceberg_catalog_rest::{RestCatalog, RestCatalogConfig};
+use iceberg_test_utils::docker::DockerCompose;
+use iceberg_test_utils::{normalize_test_name, set_up};
+use parquet::file::properties::WriterProperties;
+use port_scanner::scan_port_addr;
+use std::collections::HashMap;
+use std::sync::Arc;
+use tokio::time::sleep;
+
+const REST_CATALOG_PORT: u16 = 8181;
+
+struct TestFixture {
+_docker_compose: DockerCompose,
+rest_catalog: RestCatalog,
+}
+
+async fn set_test_fixture(func: &str) -> TestFixture {
+set_up();
+let docker_compose = DockerCompose::new(
+normalize_test_name(format!("{}_{func}", module_path!())),
+format!("{}/testdata", env!("CARGO_MANIFEST_DIR")),
+);
+
+// Start docker compose
+docker_compose.run();
+
+let rest_catalog_ip = docker_compose.get_container_ip("rest");
+
+let read_port = format!("{}:{}", rest_catalog_ip, REST_CATALOG_PORT);
+loop {
+if !scan_port_addr(&read_port) {
+log::info!("Waiting 1s for rest catalog to be ready...");
+sleep(std::time::Duration::from_millis(1000)).await;
+} else {
+break;
+}
+}
+
+let container_ip = docker_compose.get_container_ip("minio");
+let read_port = format!("{}:{}", container_ip, 9000);
+
+let config = RestCatalogConfig::builder()
+.uri(format!("http://{}:{}", rest_catalog_ip, REST_CATALOG_PORT))
+.props(HashMap::from([
+(S3_ENDPOINT.to_string(), format!("http://{}", read_port)),
+(S3_ACCESS_KEY_ID.to_string(), "admin".to_string()),
+(S3_SECRET_ACCESS_KEY.to_string(), "password".to_string()),
+(S3_REGION.to_string(), "us-east-1".to_string()),
+]))
+.build();
+let rest_catalog = RestCatalog::new(config).await.unwrap();
+
+TestFixture {
+_docker_compose: docker_compose,
+rest_catalog,
+}
+}
+
+#[tokio::test]
+async fn test_append_data_file() {
+let fixture = set_test_fixture("test_create_table").await;
+
+let ns = Namespace::with_properties(
+NamespaceIdent::from_strs(["apple", "ios"]).unwrap(),
+HashMap::from([
+("owner".to_string(), "ray".to_string()),
+("community".to_string(), "apache".to_string()),
+]),
+);
+
+fixture
+.rest_catalog
+.create_namespace(ns.name(), ns.properties().clone())
+.await
+.unwrap();
+
+let schema = Schema::builder()
+.with_schema_id(1)
+.with_identifier_field_ids(vec![2])
+.with_fields(vec![
+NestedField::optional(1, "foo", 
Type::Primitive(PrimitiveType::String)).into(),
+NestedField::required(2, "bar", 
Type::Primitive(PrimitiveType::Int)).into(),
+NestedField::optional(3, "baz", 
Type::Primitive(PrimitiveType::Boolean)).into(),
+])
+.build()
+.unwrap();
+
+let table_creation = TableCreation::builder()
+.name("t1".to_string())
+.schema(schema.clone())
+.build();
+
+let table = fixture
+.rest_catalog
+.create_table(ns.name(), table_creation)
+.await
+.unwrap();
+
+// Create the writer and write
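One small note on the port-polling loop in `set_test_fixture` quoted above: 
it retries forever if the catalog never comes up. A bounded variant could 
look like this (`wait_until` is a hypothetical helper, not part of the PR):

```rust
// Bounded polling helper: retries a readiness predicate until it succeeds
// or a timeout elapses, instead of looping forever.
use std::time::{Duration, Instant};

fn wait_until(mut ready: impl FnMut() -> bool, timeout: Duration, interval: Duration) -> bool {
    let start = Instant::now();
    loop {
        if ready() {
            return true;
        }
        if start.elapsed() >= timeout {
            return false;
        }
        std::thread::sleep(interval);
    }
}

fn main() {
    // Simulate a service that becomes ready on the third poll.
    let mut polls = 0;
    let ok = wait_until(
        || {
            polls += 1;
            polls >= 3
        },
        Duration::from_millis(100),
        Duration::from_millis(1),
    );
    assert!(ok);
}
```

In the fixture, the predicate would be the `scan_port_addr(&read_port)` check, so a dead container fails the test quickly instead of hanging it.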

Re: [PR] feat: support append data file and add e2e test [iceberg-rust]

2024-04-25 Thread via GitHub


ZENOTME commented on code in PR #349:
URL: https://github.com/apache/iceberg-rust/pull/349#discussion_r1580444775


##
crates/iceberg/src/transaction.rs:
##
@@ -121,6 +166,270 @@ impl<'a> Transaction<'a> {
 }
 }
 
+/// FastAppendAction is a transaction action for fast append data files to the 
table.
+pub struct FastAppendAction<'a> {
+tx: Transaction<'a>,
+
+parent_snapshot_id: Option<i64>,
+snapshot_id: i64,
+schema: Schema,
+schema_id: i32,
+format_version: FormatVersion,
+partition_spec: PartitionSpec,
+key_metadata: Vec<u8>,
+
+commit_uuid: String,
+manifest_id: i64,
+
+appended_data_files: Vec<DataFile>,
+}
+
+impl<'a> FastAppendAction<'a> {
+#[allow(clippy::too_many_arguments)]
+pub(crate) fn new(
+tx: Transaction<'a>,
+parent_snapshot_id: Option<i64>,
+snapshot_id: i64,
+schema: Schema,
+schema_id: i32,
+format_version: FormatVersion,
+partition_spec: PartitionSpec,
+key_metadata: Vec<u8>,
+commit_uuid: String,
+) -> Result<Self> {
+Ok(Self {
+tx,
+parent_snapshot_id,
+snapshot_id,
+schema,
+schema_id,
+format_version,
+partition_spec,
+key_metadata,
+commit_uuid,
+manifest_id: 0,
+appended_data_files: vec![],
+})
+}
+
+/// Add data files to the snapshot.
+pub fn add_data_files(
+&mut self,
+data_file: impl IntoIterator<Item = DataFile>,
+) -> Result<&mut Self> {
+self.appended_data_files.extend(data_file);
+Ok(self)
+}
+
+fn generate_manifest_file_path(&mut self) -> String {
+let manifest_id = self.manifest_id;
+self.manifest_id += 1;
+format!(
+"{}/{}/{}-m{}.{}",
+self.tx.table.metadata().location(),
+META_ROOT_PATH,
+&self.commit_uuid,
+manifest_id,
+DataFileFormat::Avro
+)
+}
+
+async fn manifest_from_parent_snapshot(&self) -> Result<Vec<ManifestFile>> {
+if let Some(snapshot) = self.tx.table.metadata().current_snapshot() {
+let manifest_list = snapshot
+.load_manifest_list(self.tx.table.file_io(), 
&self.tx.table.metadata_ref())
+.await?;
+let mut manifest_files = 
Vec::with_capacity(manifest_list.entries().len());
+for entry in manifest_list.entries() {
+// From: 
https://github.com/apache/iceberg-python/blob/659a951d6397ab280cae80206fe6e8e4be2d3738/pyiceberg/table/__init__.py#L2921
+// Why we need this?
+if entry.added_snapshot_id == self.snapshot_id {
+continue;
+}
+let manifest = 
entry.load_manifest(self.tx.table.file_io()).await?;
+// Skip manifest with all delete entries.
+if manifest.entries().iter().all(|entry| !entry.is_alive()) {
+continue;
+}
+manifest_files.push(entry.clone());
+}
+Ok(manifest_files)
+} else {
+Ok(vec![])
+}
+}
+
+// Write manifest file for added data files and return the ManifestFile 
for ManifestList.
+async fn manifest_for_data_file(&mut self) -> Result<ManifestFile> {
+let appended_data_files = std::mem::take(&mut 
self.appended_data_files);
+let manifest_entries = appended_data_files
+.into_iter()
+.map(|data_file| {
+ManifestEntry::builder()
+.status(crate::spec::ManifestStatus::Added)
+.snapshot_id(self.snapshot_id)
+.data_file(data_file)
+.build()
+})
+.collect();
+let manifest_meta = ManifestMetadata::builder()
+.schema(self.schema.clone())
+.schema_id(self.schema_id)
+.format_version(self.format_version)
+.partition_spec(self.partition_spec.clone())
+.content(crate::spec::ManifestContentType::Data)
+.build();
+let manifest = Manifest::new(manifest_meta, manifest_entries);
+let writer = ManifestWriter::new(
+self.tx
+.table
+.file_io()
+.new_output(self.generate_manifest_file_path())?,
+self.snapshot_id,
+self.key_metadata.clone(),
+);
+writer.write(manifest).await
+}
+
+fn summary(&self) -> Summary {
+Summary {
+operation: crate::spec::Operation::Append,
+other: HashMap::new(),
+}
+}
+
+/// Finished building the action and apply it to the transaction.
+pub async fn apply(mut self) -> Result<Transaction<'a>> {
+let summary = self.summary();
+let manifest = self.manifest_for_data_file().await?;
+let existing_manifest_files = 
self.manifest_from_p

Re: [PR] feat: support append data file and add e2e test [iceberg-rust]

2024-04-25 Thread via GitHub


Fokko commented on code in PR #349:
URL: https://github.com/apache/iceberg-rust/pull/349#discussion_r1579520268


##
crates/e2e_test/tests/append_data_file_test.rs:
##
@@ -0,0 +1,212 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+//! Integration tests for rest catalog.
+
+use arrow_array::{ArrayRef, BooleanArray, Int32Array, RecordBatch, 
StringArray};
+use futures::TryStreamExt;
+use iceberg::io::{S3_ACCESS_KEY_ID, S3_ENDPOINT, S3_REGION, 
S3_SECRET_ACCESS_KEY};
+use iceberg::spec::{NestedField, PrimitiveType, Schema, Type};
+use iceberg::transaction::Transaction;
+use iceberg::writer::base_writer::data_file_writer::{DataFileWriterBuilder, 
DataFileWriterConfig};
+use iceberg::writer::file_writer::location_generator::{
+DefaultFileNameGenerator, DefaultLocationGenerator,
+};
+use iceberg::writer::file_writer::ParquetWriterBuilder;
+use iceberg::writer::{IcebergWriter, IcebergWriterBuilder};
+use iceberg::{Catalog, Namespace, NamespaceIdent, TableCreation};
+use iceberg_catalog_rest::{RestCatalog, RestCatalogConfig};
+use iceberg_test_utils::docker::DockerCompose;
+use iceberg_test_utils::{normalize_test_name, set_up};
+use parquet::file::properties::WriterProperties;
+use port_scanner::scan_port_addr;
+use std::collections::HashMap;
+use std::sync::Arc;
+use tokio::time::sleep;
+
+const REST_CATALOG_PORT: u16 = 8181;
+
+struct TestFixture {
+_docker_compose: DockerCompose,
+rest_catalog: RestCatalog,
+}
+
+async fn set_test_fixture(func: &str) -> TestFixture {
+set_up();
+let docker_compose = DockerCompose::new(
+normalize_test_name(format!("{}_{func}", module_path!())),
+format!("{}/testdata", env!("CARGO_MANIFEST_DIR")),
+);
+
+// Start docker compose
+docker_compose.run();
+
+let rest_catalog_ip = docker_compose.get_container_ip("rest");
+
+let read_port = format!("{}:{}", rest_catalog_ip, REST_CATALOG_PORT);
+loop {
+if !scan_port_addr(&read_port) {
+log::info!("Waiting for 1s rest catalog to ready...");
+sleep(std::time::Duration::from_millis(1000)).await;
+} else {
+break;
+}
+}
+
+let container_ip = docker_compose.get_container_ip("minio");
+let read_port = format!("{}:{}", container_ip, 9000);
+
+let config = RestCatalogConfig::builder()
+.uri(format!("http://{}:{}", rest_catalog_ip, REST_CATALOG_PORT))
+.props(HashMap::from([
+(S3_ENDPOINT.to_string(), format!("http://{}", read_port)),
+(S3_ACCESS_KEY_ID.to_string(), "admin".to_string()),
+(S3_SECRET_ACCESS_KEY.to_string(), "password".to_string()),
+(S3_REGION.to_string(), "us-east-1".to_string()),
+]))
+.build();
+let rest_catalog = RestCatalog::new(config).await.unwrap();
+
+TestFixture {
+_docker_compose: docker_compose,
+rest_catalog,
+}
+}
+
+#[tokio::test]
+async fn test_append_data_file() {
+let fixture = set_test_fixture("test_create_table").await;
+
+let ns = Namespace::with_properties(
+NamespaceIdent::from_strs(["apple", "ios"]).unwrap(),
+HashMap::from([
+("owner".to_string(), "ray".to_string()),
+("community".to_string(), "apache".to_string()),
+]),
+);
+
+fixture
+.rest_catalog
+.create_namespace(ns.name(), ns.properties().clone())
+.await
+.unwrap();
+
+let schema = Schema::builder()
+.with_schema_id(1)
+.with_identifier_field_ids(vec![2])
+.with_fields(vec![
+NestedField::optional(1, "foo", 
Type::Primitive(PrimitiveType::String)).into(),
+NestedField::required(2, "bar", 
Type::Primitive(PrimitiveType::Int)).into(),
+NestedField::optional(3, "baz", 
Type::Primitive(PrimitiveType::Boolean)).into(),
+])
+.build()
+.unwrap();
+
+let table_creation = TableCreation::builder()
+.name("t1".to_string())
+.schema(schema.clone())
+.build();
+
+let table = fixture
+.rest_catalog
+.create_table(ns.name(), table_creation)
+.await
+.unwrap();
+
+// Create the writer and write t

Re: [PR] feat: support append data file and add e2e test [iceberg-rust]

2024-04-25 Thread via GitHub


Fokko commented on code in PR #349:
URL: https://github.com/apache/iceberg-rust/pull/349#discussion_r1579472707


##
crates/iceberg/src/transaction.rs:
##
@@ -121,6 +166,270 @@ impl<'a> Transaction<'a> {
 }
 }
 
+/// FastAppendAction is a transaction action for fast append data files to the 
table.
+pub struct FastAppendAction<'a> {
+tx: Transaction<'a>,
+
+parent_snapshot_id: Option<i64>,
+snapshot_id: i64,
+schema: Schema,
+schema_id: i32,
+format_version: FormatVersion,
+partition_spec: PartitionSpec,
+key_metadata: Vec<u8>,
+
+commit_uuid: String,
+manifest_id: i64,
+
+appended_data_files: Vec<DataFile>,
+}
+
+impl<'a> FastAppendAction<'a> {
+#[allow(clippy::too_many_arguments)]
+pub(crate) fn new(
+tx: Transaction<'a>,
+parent_snapshot_id: Option<i64>,
+snapshot_id: i64,
+schema: Schema,
+schema_id: i32,
+format_version: FormatVersion,
+partition_spec: PartitionSpec,
+key_metadata: Vec<u8>,
+commit_uuid: String,
+) -> Result<Self> {
+Ok(Self {
+tx,
+parent_snapshot_id,
+snapshot_id,
+schema,
+schema_id,
+format_version,
+partition_spec,
+key_metadata,
+commit_uuid,
+manifest_id: 0,
+appended_data_files: vec![],
+})
+}
+
+/// Add data files to the snapshot.
+pub fn add_data_files(
+&mut self,
+data_file: impl IntoIterator<Item = DataFile>,
+) -> Result<&mut Self> {
+self.appended_data_files.extend(data_file);
+Ok(self)
+}
+
+fn generate_manifest_file_path(&mut self) -> String {
+let manifest_id = self.manifest_id;
+self.manifest_id += 1;
+format!(
+"{}/{}/{}-m{}.{}",
+self.tx.table.metadata().location(),
+META_ROOT_PATH,
+&self.commit_uuid,
+manifest_id,
+DataFileFormat::Avro
+)
+}
+
+async fn manifest_from_parent_snapshot(&self) -> Result<Vec<ManifestFile>> {
+if let Some(snapshot) = self.tx.table.metadata().current_snapshot() {
+let manifest_list = snapshot
+.load_manifest_list(self.tx.table.file_io(), 
&self.tx.table.metadata_ref())
+.await?;
+let mut manifest_files = 
Vec::with_capacity(manifest_list.entries().len());
+for entry in manifest_list.entries() {
+// From: 
https://github.com/apache/iceberg-python/blob/659a951d6397ab280cae80206fe6e8e4be2d3738/pyiceberg/table/__init__.py#L2921
+// Why we need this?
+if entry.added_snapshot_id == self.snapshot_id {
+continue;
+}
+let manifest = 
entry.load_manifest(self.tx.table.file_io()).await?;
+// Skip manifest with all delete entries.
+if manifest.entries().iter().all(|entry| !entry.is_alive()) {
+continue;
+}
+manifest_files.push(entry.clone());
+}
+Ok(manifest_files)
+} else {
+Ok(vec![])
+}
+}
+
+// Write manifest file for added data files and return the ManifestFile 
for ManifestList.
+async fn manifest_for_data_file(&mut self) -> Result<ManifestFile> {
+let appended_data_files = std::mem::take(&mut 
self.appended_data_files);
+let manifest_entries = appended_data_files
+.into_iter()
+.map(|data_file| {
+ManifestEntry::builder()
+.status(crate::spec::ManifestStatus::Added)
+.snapshot_id(self.snapshot_id)

Review Comment:
   This is different for V1 and V2. For V1 we do want to write this; for V2 
typically not. This is because when the commit fails due to a conflict, we have 
to rewrite the manifest list but can reuse the already written manifests.
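   To make the distinction concrete, here is a minimal sketch of the rule the comment describes. The `FormatVersion` enum and helper below are illustrative stand-ins, not the iceberg-rust API: V1 manifest entries carry an explicit snapshot id, while V2 entries typically leave it unset so it is inherited from the manifest list, which lets a rewritten list reuse already-written manifests after a commit conflict.

```rust
#[derive(Debug, PartialEq)]
enum FormatVersion {
    V1,
    V2,
}

// Decide whether a manifest entry should carry an explicit snapshot id.
// V1: always written into the entry. V2: left unset so the entry inherits
// the id from the manifest list on (re)commit.
fn entry_snapshot_id(format_version: &FormatVersion, snapshot_id: i64) -> Option<i64> {
    match format_version {
        FormatVersion::V1 => Some(snapshot_id),
        FormatVersion::V2 => None,
    }
}

fn main() {
    assert_eq!(entry_snapshot_id(&FormatVersion::V1, 42), Some(42));
    assert_eq!(entry_snapshot_id(&FormatVersion::V2, 42), None);
}
```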



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@iceberg.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


-
To unsubscribe, e-mail: issues-unsubscr...@iceberg.apache.org
For additional commands, e-mail: issues-h...@iceberg.apache.org



Re: [PR] feat: support append data file and add e2e test [iceberg-rust]

2024-04-25 Thread via GitHub


Fokko commented on code in PR #349:
URL: https://github.com/apache/iceberg-rust/pull/349#discussion_r1579466191


##
crates/iceberg/src/transaction.rs:
##
@@ -121,6 +166,270 @@ impl<'a> Transaction<'a> {
 }
 }
 
+/// FastAppendAction is a transaction action for fast append data files to the 
table.
+pub struct FastAppendAction<'a> {
+tx: Transaction<'a>,
+
+parent_snapshot_id: Option<i64>,
+snapshot_id: i64,
+schema: Schema,
+schema_id: i32,
+format_version: FormatVersion,
+partition_spec: PartitionSpec,
+key_metadata: Vec<u8>,
+
+commit_uuid: String,
+manifest_id: i64,
+
+appended_data_files: Vec<DataFile>,
+}
+
+impl<'a> FastAppendAction<'a> {
+#[allow(clippy::too_many_arguments)]
+pub(crate) fn new(
+tx: Transaction<'a>,
+parent_snapshot_id: Option<i64>,
+snapshot_id: i64,
+schema: Schema,
+schema_id: i32,
+format_version: FormatVersion,
+partition_spec: PartitionSpec,
+key_metadata: Vec<u8>,
+commit_uuid: String,
+) -> Result<Self> {
+Ok(Self {
+tx,
+parent_snapshot_id,
+snapshot_id,
+schema,
+schema_id,
+format_version,
+partition_spec,
+key_metadata,
+commit_uuid,
+manifest_id: 0,
+appended_data_files: vec![],
+})
+}
+
+/// Add data files to the snapshot.
+pub fn add_data_files(
+&mut self,
+data_file: impl IntoIterator<Item = DataFile>,
+) -> Result<&mut Self> {
+self.appended_data_files.extend(data_file);
+Ok(self)
+}
+
+fn generate_manifest_file_path(&mut self) -> String {
+let manifest_id = self.manifest_id;
+self.manifest_id += 1;
+format!(
+"{}/{}/{}-m{}.{}",
+self.tx.table.metadata().location(),
+META_ROOT_PATH,
+&self.commit_uuid,
+manifest_id,
+DataFileFormat::Avro
+)
+}
+
+async fn manifest_from_parent_snapshot(&self) -> Result<Vec<ManifestFile>> {
+if let Some(snapshot) = self.tx.table.metadata().current_snapshot() {
+let manifest_list = snapshot
+.load_manifest_list(self.tx.table.file_io(), 
&self.tx.table.metadata_ref())
+.await?;
+let mut manifest_files = 
Vec::with_capacity(manifest_list.entries().len());
+for entry in manifest_list.entries() {
+// From: 
https://github.com/apache/iceberg-python/blob/659a951d6397ab280cae80206fe6e8e4be2d3738/pyiceberg/table/__init__.py#L2921
+// Why we need this?
+if entry.added_snapshot_id == self.snapshot_id {
+continue;
+}
+let manifest = 
entry.load_manifest(self.tx.table.file_io()).await?;
+// Skip manifest with all delete entries.
+if manifest.entries().iter().all(|entry| !entry.is_alive()) {
+continue;
+}
+manifest_files.push(entry.clone());
+}
+Ok(manifest_files)
+} else {
+Ok(vec![])
+}
+}
+
+// Write manifest file for added data files and return the ManifestFile 
for ManifestList.
+async fn manifest_for_data_file(&mut self) -> Result<ManifestFile> {
+let appended_data_files = std::mem::take(&mut 
self.appended_data_files);
+let manifest_entries = appended_data_files
+.into_iter()
+.map(|data_file| {
+ManifestEntry::builder()
+.status(crate::spec::ManifestStatus::Added)
+.snapshot_id(self.snapshot_id)
+.data_file(data_file)
+.build()
+})
+.collect();
+let manifest_meta = ManifestMetadata::builder()
+.schema(self.schema.clone())
+.schema_id(self.schema_id)
+.format_version(self.format_version)
+.partition_spec(self.partition_spec.clone())

Review Comment:
   How do we know if the written data adheres to the partition spec?
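   One way to answer this question would be a structural check before the manifest is written. The sketch below uses hypothetical minimal types (the real iceberg-rust `PartitionSpec` and `DataFile` structs differ); it only illustrates the idea of rejecting a data file whose partition tuple does not line up with the table's spec.

```rust
// Hypothetical minimal types for illustration only.
struct PartitionField {
    name: String,
}

struct PartitionSpec {
    fields: Vec<PartitionField>,
}

struct DataFile {
    // (partition field name, partition value) pairs carried by the file.
    partition: Vec<(String, i64)>,
}

// Reject a data file whose partition tuple does not match the spec,
// field for field.
fn validate_partition(spec: &PartitionSpec, file: &DataFile) -> Result<(), String> {
    if file.partition.len() != spec.fields.len() {
        return Err(format!(
            "expected {} partition values, got {}",
            spec.fields.len(),
            file.partition.len()
        ));
    }
    for (field, (name, _value)) in spec.fields.iter().zip(&file.partition) {
        if &field.name != name {
            return Err(format!("unexpected partition field `{name}`"));
        }
    }
    Ok(())
}

fn main() {
    let spec = PartitionSpec {
        fields: vec![PartitionField { name: "day".to_string() }],
    };
    let ok_file = DataFile { partition: vec![("day".to_string(), 19_000)] };
    let bad_file = DataFile { partition: vec![] };
    assert!(validate_partition(&spec, &ok_file).is_ok());
    assert!(validate_partition(&spec, &bad_file).is_err());
}
```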



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@iceberg.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


-
To unsubscribe, e-mail: issues-unsubscr...@iceberg.apache.org
For additional commands, e-mail: issues-h...@iceberg.apache.org



Re: [PR] feat: support append data file and add e2e test [iceberg-rust]

2024-04-25 Thread via GitHub


Fokko commented on code in PR #349:
URL: https://github.com/apache/iceberg-rust/pull/349#discussion_r1579449633


##
crates/iceberg/src/transaction.rs:
##
@@ -121,6 +166,270 @@ impl<'a> Transaction<'a> {
 }
 }
 
+/// FastAppendAction is a transaction action for fast append data files to the 
table.
+pub struct FastAppendAction<'a> {
+tx: Transaction<'a>,
+
+parent_snapshot_id: Option<i64>,
+snapshot_id: i64,
+schema: Schema,
+schema_id: i32,
+format_version: FormatVersion,
+partition_spec: PartitionSpec,
+key_metadata: Vec<u8>,
+
+commit_uuid: String,
+manifest_id: i64,
+
+appended_data_files: Vec<DataFile>,
+}
+
+impl<'a> FastAppendAction<'a> {
+#[allow(clippy::too_many_arguments)]
+pub(crate) fn new(
+tx: Transaction<'a>,
+parent_snapshot_id: Option<i64>,
+snapshot_id: i64,
+schema: Schema,
+schema_id: i32,
+format_version: FormatVersion,
+partition_spec: PartitionSpec,
+key_metadata: Vec<u8>,
+commit_uuid: String,
+) -> Result<Self> {
+Ok(Self {
+tx,
+parent_snapshot_id,
+snapshot_id,
+schema,
+schema_id,
+format_version,
+partition_spec,
+key_metadata,
+commit_uuid,
+manifest_id: 0,
+appended_data_files: vec![],
+})
+}
+
+/// Add data files to the snapshot.
+pub fn add_data_files(
+&mut self,
+data_file: impl IntoIterator<Item = DataFile>,
+) -> Result<&mut Self> {
+self.appended_data_files.extend(data_file);
+Ok(self)
+}
+
+fn generate_manifest_file_path(&mut self) -> String {
+let manifest_id = self.manifest_id;
+self.manifest_id += 1;
+format!(
+"{}/{}/{}-m{}.{}",
+self.tx.table.metadata().location(),
+META_ROOT_PATH,
+&self.commit_uuid,
+manifest_id,
+DataFileFormat::Avro
+)
+}
+
+async fn manifest_from_parent_snapshot(&self) -> Result<Vec<ManifestFile>> {
+if let Some(snapshot) = self.tx.table.metadata().current_snapshot() {
+let manifest_list = snapshot
+.load_manifest_list(self.tx.table.file_io(), 
&self.tx.table.metadata_ref())
+.await?;
+let mut manifest_files = 
Vec::with_capacity(manifest_list.entries().len());
+for entry in manifest_list.entries() {
+// From: 
https://github.com/apache/iceberg-python/blob/659a951d6397ab280cae80206fe6e8e4be2d3738/pyiceberg/table/__init__.py#L2921
+// Why we need this?
+if entry.added_snapshot_id == self.snapshot_id {
+continue;
+}
+let manifest = 
entry.load_manifest(self.tx.table.file_io()).await?;
+// Skip manifest with all delete entries.
+if manifest.entries().iter().all(|entry| !entry.is_alive()) {
+continue;
+}
+manifest_files.push(entry.clone());
+}
+Ok(manifest_files)
+} else {
+Ok(vec![])
+}
+}
+
+// Write manifest file for added data files and return the ManifestFile 
for ManifestList.
+async fn manifest_for_data_file(&mut self) -> Result<ManifestFile> {
+let appended_data_files = std::mem::take(&mut 
self.appended_data_files);
+let manifest_entries = appended_data_files
+.into_iter()
+.map(|data_file| {
+ManifestEntry::builder()
+.status(crate::spec::ManifestStatus::Added)
+.snapshot_id(self.snapshot_id)
+.data_file(data_file)
+.build()
+})
+.collect();
+let manifest_meta = ManifestMetadata::builder()
+.schema(self.schema.clone())
+.schema_id(self.schema_id)
+.format_version(self.format_version)
+.partition_spec(self.partition_spec.clone())
+.content(crate::spec::ManifestContentType::Data)
+.build();
+let manifest = Manifest::new(manifest_meta, manifest_entries);
+let writer = ManifestWriter::new(
+self.tx
+.table
+.file_io()
+.new_output(self.generate_manifest_file_path())?,
+self.snapshot_id,
+self.key_metadata.clone(),
+);
+writer.write(manifest).await
+}
+
+fn summary(&self) -> Summary {
+Summary {
+operation: crate::spec::Operation::Append,
+other: HashMap::new(),
+}
+}
+
+/// Finished building the action and apply it to the transaction.
+pub async fn apply(mut self) -> Result<Transaction<'a>> {
+let summary = self.summary();
+let manifest = self.manifest_for_data_file().await?;
+let existing_manifest_files = 
self.manifest_from_par

Re: [PR] feat: support append data file and add e2e test [iceberg-rust]

2024-04-25 Thread via GitHub


Fokko commented on code in PR #349:
URL: https://github.com/apache/iceberg-rust/pull/349#discussion_r1579426812


##
crates/e2e_test/testdata/docker-compose.yaml:
##
@@ -0,0 +1,59 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+version: '3.8'
+
+services:
+  rest:
+image: tabulario/iceberg-rest:0.10.0
+environment:
+  - AWS_ACCESS_KEY_ID=admin
+  - AWS_SECRET_ACCESS_KEY=password
+  - AWS_REGION=us-east-1
+  - CATALOG_CATOLOG__IMPL=org.apache.iceberg.jdbc.JdbcCatalog
+  - CATALOG_URI=jdbc:sqlite:file:/tmp/iceberg_rest_mode=memory
+  - CATALOG_WAREHOUSE=s3://icebergdata/demo
+  - CATALOG_IO__IMPL=org.apache.iceberg.aws.s3.S3FileIO
+  - CATALOG_S3_ENDPOINT=http://minio:9000
+depends_on:
+  - minio
+links:
+  - minio:icebergdata.minio
+expose:
+  - 8181
+
+  minio:

Review Comment:
   I'm not sure if the error is relevant. I'm able to run it locally now.
   
   > how about change `minio/minio:RELEASE.2024-03-07T00-43-48Z`, 
`minio/mc:RELEASE.2024-03-07T00-31-49Z` to `minio/minio:latest`, 
`minio/mc:latest`.
   
   +1 on that. I haven't encountered any issues with minio updates



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@iceberg.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


-
To unsubscribe, e-mail: issues-unsubscr...@iceberg.apache.org
For additional commands, e-mail: issues-h...@iceberg.apache.org



Re: [PR] feat: support append data file and add e2e test [iceberg-rust]

2024-04-25 Thread via GitHub


Fokko commented on code in PR #349:
URL: https://github.com/apache/iceberg-rust/pull/349#discussion_r1579420662


##
crates/iceberg/src/transaction.rs:
##
@@ -121,6 +166,270 @@ impl<'a> Transaction<'a> {
 }
 }
 
+/// FastAppendAction is a transaction action for fast append data files to the 
table.
+pub struct FastAppendAction<'a> {
+tx: Transaction<'a>,
+
+parent_snapshot_id: Option<i64>,
+snapshot_id: i64,
+schema: Schema,
+schema_id: i32,
+format_version: FormatVersion,
+partition_spec: PartitionSpec,
+key_metadata: Vec<u8>,
+
+commit_uuid: String,
+manifest_id: i64,
+
+appended_data_files: Vec<DataFile>,
+}
+
+impl<'a> FastAppendAction<'a> {
+#[allow(clippy::too_many_arguments)]
+pub(crate) fn new(
+tx: Transaction<'a>,
+parent_snapshot_id: Option<i64>,
+snapshot_id: i64,
+schema: Schema,
+schema_id: i32,
+format_version: FormatVersion,
+partition_spec: PartitionSpec,
+key_metadata: Vec<u8>,
+commit_uuid: String,
+) -> Result<Self> {
+Ok(Self {
+tx,
+parent_snapshot_id,
+snapshot_id,
+schema,
+schema_id,
+format_version,
+partition_spec,
+key_metadata,
+commit_uuid,
+manifest_id: 0,
+appended_data_files: vec![],
+})
+}
+
+/// Add data files to the snapshot.
+pub fn add_data_files(
+&mut self,
+data_file: impl IntoIterator<Item = DataFile>,
+) -> Result<&mut Self> {
+self.appended_data_files.extend(data_file);
+Ok(self)
+}
+
+fn generate_manifest_file_path(&mut self) -> String {
+let manifest_id = self.manifest_id;
+self.manifest_id += 1;
+format!(
+"{}/{}/{}-m{}.{}",
+self.tx.table.metadata().location(),
+META_ROOT_PATH,
+&self.commit_uuid,
+manifest_id,
+DataFileFormat::Avro
+)
+}
+
+async fn manifest_from_parent_snapshot(&self) -> Result<Vec<ManifestFile>> {
+if let Some(snapshot) = self.tx.table.metadata().current_snapshot() {
+let manifest_list = snapshot
+.load_manifest_list(self.tx.table.file_io(), 
&self.tx.table.metadata_ref())
+.await?;
+let mut manifest_files = 
Vec::with_capacity(manifest_list.entries().len());
+for entry in manifest_list.entries() {
+// From: 
https://github.com/apache/iceberg-python/blob/659a951d6397ab280cae80206fe6e8e4be2d3738/pyiceberg/table/__init__.py#L2921
+// Why we need this?
+if entry.added_snapshot_id == self.snapshot_id {
+continue;
+}
+let manifest = 
entry.load_manifest(self.tx.table.file_io()).await?;
+// Skip manifest with all delete entries.
+if manifest.entries().iter().all(|entry| !entry.is_alive()) {
+continue;
+}
+manifest_files.push(entry.clone());
+}
+Ok(manifest_files)
+} else {
+Ok(vec![])
+}
+}
+
+// Write manifest file for added data files and return the ManifestFile 
for ManifestList.
+async fn manifest_for_data_file(&mut self) -> Result<ManifestFile> {
+let appended_data_files = std::mem::take(&mut 
self.appended_data_files);
+let manifest_entries = appended_data_files
+.into_iter()
+.map(|data_file| {
+ManifestEntry::builder()
+.status(crate::spec::ManifestStatus::Added)
+.snapshot_id(self.snapshot_id)
+.data_file(data_file)
+.build()
+})
+.collect();
+let manifest_meta = ManifestMetadata::builder()
+.schema(self.schema.clone())
+.schema_id(self.schema_id)
+.format_version(self.format_version)
+.partition_spec(self.partition_spec.clone())
+.content(crate::spec::ManifestContentType::Data)
+.build();
+let manifest = Manifest::new(manifest_meta, manifest_entries);
+let writer = ManifestWriter::new(
+self.tx
+.table
+.file_io()
+.new_output(self.generate_manifest_file_path())?,
+self.snapshot_id,
+self.key_metadata.clone(),
+);
+writer.write(manifest).await
+}
+
+fn summary(&self) -> Summary {
+Summary {
+operation: crate::spec::Operation::Append,
+other: HashMap::new(),

Review Comment:
   The summary generation is missing here. I'm okay with doing that in a 
separate PR, but we have to make sure that we add it before the release.
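   As a rough illustration of what the missing summary generation could look like, here is a sketch with a hypothetical minimal `DataFile` stand-in (the real struct has more fields). It aggregates the added files into the standard Iceberg snapshot summary keys `added-data-files`, `added-records`, and `added-files-size`.

```rust
use std::collections::HashMap;

// Hypothetical minimal stand-in for the real DataFile struct.
struct DataFile {
    record_count: u64,
    file_size_in_bytes: u64,
}

// Build the `other` map of an append snapshot's Summary from the files
// added in this commit.
fn append_summary(added: &[DataFile]) -> HashMap<String, String> {
    let mut other = HashMap::new();
    other.insert("added-data-files".to_string(), added.len().to_string());
    other.insert(
        "added-records".to_string(),
        added.iter().map(|f| f.record_count).sum::<u64>().to_string(),
    );
    other.insert(
        "added-files-size".to_string(),
        added.iter().map(|f| f.file_size_in_bytes).sum::<u64>().to_string(),
    );
    other
}

fn main() {
    let files = [
        DataFile { record_count: 10, file_size_in_bytes: 100 },
        DataFile { record_count: 5, file_size_in_bytes: 50 },
    ];
    let summary = append_summary(&files);
    assert_eq!(summary["added-data-files"], "2");
    assert_eq!(summary["added-records"], "15");
    assert_eq!(summary["added-files-size"], "150");
}
```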



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above

Re: [PR] feat: support append data file and add e2e test [iceberg-rust]

2024-04-25 Thread via GitHub


Fokko commented on code in PR #349:
URL: https://github.com/apache/iceberg-rust/pull/349#discussion_r1579412576


##
crates/iceberg/src/transaction.rs:
##
@@ -95,6 +104,42 @@ impl<'a> Transaction<'a> {
 Ok(self)
 }
 
+/// Creates a fast append action.
+pub fn fast_append(
+self,
+commit_uuid: Option<String>,
+key_metadata: Vec<u8>,
+) -> Result<FastAppendAction<'a>> {
+let parent_snapshot_id = self
+.table
+.metadata()
+.current_snapshot()
+.map(|s| s.snapshot_id());
+let snapshot_id = parent_snapshot_id.map(|id| id + 1).unwrap_or(0);

Review Comment:
   The snapshot ID should be a random int, and it should be checked that it 
hasn't been used before:
   
   
https://github.com/apache/iceberg-python/blob/f72e363b18baa181c998bbdef657982159a22d48/pyiceberg/table/metadata.py#L315-L326
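   A dependency-free sketch of that retry loop (the function name and the xorshift stand-in RNG are illustrative, not the iceberg-rust implementation): draw a positive id and retry while it collides with an id the table has already used.

```rust
use std::collections::HashSet;
use std::time::{SystemTime, UNIX_EPOCH};

// Generate a positive pseudo-random snapshot id, retrying on collision
// with ids the table has already used. A dependency-free xorshift64
// stands in for a real RNG here.
fn generate_unused_snapshot_id(used: &HashSet<i64>) -> i64 {
    let mut state = SystemTime::now()
        .duration_since(UNIX_EPOCH)
        .expect("clock before epoch")
        .as_nanos() as u64
        | 1; // xorshift state must be non-zero
    loop {
        state ^= state << 13;
        state ^= state >> 7;
        state ^= state << 17;
        let id = (state >> 1) as i64; // drop the sign bit to stay non-negative
        if id > 0 && !used.contains(&id) {
            return id;
        }
    }
}

fn main() {
    let used: HashSet<i64> = [1, 2, 3].into_iter().collect();
    let id = generate_unused_snapshot_id(&used);
    assert!(id > 0);
    assert!(!used.contains(&id));
}
```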



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@iceberg.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


-
To unsubscribe, e-mail: issues-unsubscr...@iceberg.apache.org
For additional commands, e-mail: issues-h...@iceberg.apache.org



Re: [PR] feat: support append data file and add e2e test [iceberg-rust]

2024-04-25 Thread via GitHub


ZENOTME commented on code in PR #349:
URL: https://github.com/apache/iceberg-rust/pull/349#discussion_r1579406252


##
crates/e2e_test/testdata/docker-compose.yaml:
##
@@ -0,0 +1,59 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+version: '3.8'
+
+services:
+  rest:
+image: tabulario/iceberg-rest:0.10.0
+environment:
+  - AWS_ACCESS_KEY_ID=admin
+  - AWS_SECRET_ACCESS_KEY=password
+  - AWS_REGION=us-east-1
+  - CATALOG_CATOLOG__IMPL=org.apache.iceberg.jdbc.JdbcCatalog
+  - CATALOG_URI=jdbc:sqlite:file:/tmp/iceberg_rest_mode=memory
+  - CATALOG_WAREHOUSE=s3://icebergdata/demo
+  - CATALOG_IO__IMPL=org.apache.iceberg.aws.s3.S3FileIO
+  - CATALOG_S3_ENDPOINT=http://minio:9000
+depends_on:
+  - minio
+links:
+  - minio:icebergdata.minio
+expose:
+  - 8181
+
+  minio:
+image: minio/minio:RELEASE.2024-03-07T00-43-48Z
+environment:
+  - MINIO_ROOT_USER=admin
+  - MINIO_ROOT_PASSWORD=password
+  - MINIO_DOMAIN=minio
+expose:
+  - 9001
+  - 9000

Review Comment:
   LGTM. I also love to expose to make it easier to debug.🤣






Re: [PR] feat: support append data file and add e2e test [iceberg-rust]

2024-04-25 Thread via GitHub


Fokko commented on code in PR #349:
URL: https://github.com/apache/iceberg-rust/pull/349#discussion_r1579402269


##
crates/e2e_test/tests/append_data_file_test.rs:
##
@@ -0,0 +1,212 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+//! Integration tests for rest catalog.
+
+use arrow_array::{ArrayRef, BooleanArray, Int32Array, RecordBatch, 
StringArray};
+use futures::TryStreamExt;
+use iceberg::io::{S3_ACCESS_KEY_ID, S3_ENDPOINT, S3_REGION, 
S3_SECRET_ACCESS_KEY};
+use iceberg::spec::{NestedField, PrimitiveType, Schema, Type};
+use iceberg::transaction::Transaction;
+use iceberg::writer::base_writer::data_file_writer::{DataFileWriterBuilder, 
DataFileWriterConfig};
+use iceberg::writer::file_writer::location_generator::{
+DefaultFileNameGenerator, DefaultLocationGenerator,
+};
+use iceberg::writer::file_writer::ParquetWriterBuilder;
+use iceberg::writer::{IcebergWriter, IcebergWriterBuilder};
+use iceberg::{Catalog, Namespace, NamespaceIdent, TableCreation};
+use iceberg_catalog_rest::{RestCatalog, RestCatalogConfig};
+use iceberg_test_utils::docker::DockerCompose;
+use iceberg_test_utils::{normalize_test_name, set_up};
+use parquet::file::properties::WriterProperties;
+use port_scanner::scan_port_addr;
+use std::collections::HashMap;
+use std::sync::Arc;
+use tokio::time::sleep;
+
+const REST_CATALOG_PORT: u16 = 8181;
+
+struct TestFixture {
+_docker_compose: DockerCompose,
+rest_catalog: RestCatalog,
+}
+
+async fn set_test_fixture(func: &str) -> TestFixture {
+set_up();
+let docker_compose = DockerCompose::new(
+normalize_test_name(format!("{}_{func}", module_path!())),
+format!("{}/testdata", env!("CARGO_MANIFEST_DIR")),
+);
+
+// Start docker compose
+docker_compose.run();
+
+let rest_catalog_ip = docker_compose.get_container_ip("rest");
+
+let read_port = format!("{}:{}", rest_catalog_ip, REST_CATALOG_PORT);
+loop {
+if !scan_port_addr(&read_port) {
+log::info!("Waiting for 1s rest catalog to ready...");
+sleep(std::time::Duration::from_millis(1000)).await;
+} else {
+break;
+}
+}
+
+let container_ip = docker_compose.get_container_ip("minio");
+let read_port = format!("{}:{}", container_ip, 9000);
+
+let config = RestCatalogConfig::builder()
+.uri(format!("http://{}:{}", rest_catalog_ip, REST_CATALOG_PORT))
+.props(HashMap::from([
+(S3_ENDPOINT.to_string(), format!("http://{}", read_port)),
+(S3_ACCESS_KEY_ID.to_string(), "admin".to_string()),
+(S3_SECRET_ACCESS_KEY.to_string(), "password".to_string()),
+(S3_REGION.to_string(), "us-east-1".to_string()),
+]))
+.build();
+let rest_catalog = RestCatalog::new(config).await.unwrap();
+
+TestFixture {
+_docker_compose: docker_compose,
+rest_catalog,
+}
+}
+
+#[tokio::test]
+async fn test_append_data_file() {
+let fixture = set_test_fixture("test_create_table").await;
+
+let ns = Namespace::with_properties(
+NamespaceIdent::from_strs(["apple", "ios"]).unwrap(),
+HashMap::from([
+("owner".to_string(), "ray".to_string()),
+("community".to_string(), "apache".to_string()),
+]),
+);
+
+fixture
+.rest_catalog
+.create_namespace(ns.name(), ns.properties().clone())
+.await
+.unwrap();
+
+let schema = Schema::builder()
+.with_schema_id(1)
+.with_identifier_field_ids(vec![2])
+.with_fields(vec![
+NestedField::optional(1, "foo", 
Type::Primitive(PrimitiveType::String)).into(),
+NestedField::required(2, "bar", 
Type::Primitive(PrimitiveType::Int)).into(),
+NestedField::optional(3, "baz", 
Type::Primitive(PrimitiveType::Boolean)).into(),
+])
+.build()
+.unwrap();
+
+let table_creation = TableCreation::builder()
+.name("t1".to_string())
+.schema(schema.clone())
+.build();
+
+let table = fixture
+.rest_catalog
+.create_table(ns.name(), table_creation)
+.await
+.unwrap();
+
+// Create the writer and write t

Re: [PR] feat: support append data file and add e2e test [iceberg-rust]

2024-04-25 Thread via GitHub


ZENOTME commented on code in PR #349:
URL: https://github.com/apache/iceberg-rust/pull/349#discussion_r1579401733


##
crates/e2e_test/testdata/docker-compose.yaml:
##
@@ -0,0 +1,59 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+version: '3.8'
+
+services:
+  rest:
+image: tabulario/iceberg-rest:0.10.0
+environment:
+  - AWS_ACCESS_KEY_ID=admin
+  - AWS_SECRET_ACCESS_KEY=password
+  - AWS_REGION=us-east-1
+  - CATALOG_CATOLOG__IMPL=org.apache.iceberg.jdbc.JdbcCatalog
+  - CATALOG_URI=jdbc:sqlite:file:/tmp/iceberg_rest_mode=memory
+  - CATALOG_WAREHOUSE=s3://icebergdata/demo
+  - CATALOG_IO__IMPL=org.apache.iceberg.aws.s3.S3FileIO
+  - CATALOG_S3_ENDPOINT=http://minio:9000
+depends_on:
+  - minio
+links:
+  - minio:icebergdata.minio
+expose:
+  - 8181
+
+  minio:

Review Comment:
   How about changing `minio/minio:RELEASE.2024-03-07T00-43-48Z` and 
`minio/mc:RELEASE.2024-03-07T00-31-49Z` to `minio/minio:latest` and 
`minio/mc:latest`?






Re: [PR] feat: support append data file and add e2e test [iceberg-rust]

2024-04-25 Thread via GitHub


Fokko commented on code in PR #349:
URL: https://github.com/apache/iceberg-rust/pull/349#discussion_r1579401183


##
crates/e2e_test/tests/append_data_file_test.rs:
##
@@ -0,0 +1,212 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+//! Integration tests for rest catalog.
+
+use arrow_array::{ArrayRef, BooleanArray, Int32Array, RecordBatch, 
StringArray};
+use futures::TryStreamExt;
+use iceberg::io::{S3_ACCESS_KEY_ID, S3_ENDPOINT, S3_REGION, 
S3_SECRET_ACCESS_KEY};
+use iceberg::spec::{NestedField, PrimitiveType, Schema, Type};
+use iceberg::transaction::Transaction;
+use iceberg::writer::base_writer::data_file_writer::{DataFileWriterBuilder, 
DataFileWriterConfig};
+use iceberg::writer::file_writer::location_generator::{
+DefaultFileNameGenerator, DefaultLocationGenerator,
+};
+use iceberg::writer::file_writer::ParquetWriterBuilder;
+use iceberg::writer::{IcebergWriter, IcebergWriterBuilder};
+use iceberg::{Catalog, Namespace, NamespaceIdent, TableCreation};
+use iceberg_catalog_rest::{RestCatalog, RestCatalogConfig};
+use iceberg_test_utils::docker::DockerCompose;
+use iceberg_test_utils::{normalize_test_name, set_up};
+use parquet::file::properties::WriterProperties;
+use port_scanner::scan_port_addr;
+use std::collections::HashMap;
+use std::sync::Arc;
+use tokio::time::sleep;
+
+const REST_CATALOG_PORT: u16 = 8181;
+
+struct TestFixture {
+_docker_compose: DockerCompose,
+rest_catalog: RestCatalog,
+}
+
+async fn set_test_fixture(func: &str) -> TestFixture {
+set_up();
+let docker_compose = DockerCompose::new(
+normalize_test_name(format!("{}_{func}", module_path!())),
+format!("{}/testdata", env!("CARGO_MANIFEST_DIR")),
+);
+
+// Start docker compose
+docker_compose.run();
+
+let rest_catalog_ip = docker_compose.get_container_ip("rest");
+
+let read_port = format!("{}:{}", rest_catalog_ip, REST_CATALOG_PORT);
+loop {
+if !scan_port_addr(&read_port) {
+log::info!("Waiting for 1s rest catalog to ready...");
+sleep(std::time::Duration::from_millis(1000)).await;
+} else {
+break;
+}
+}
+
+let container_ip = docker_compose.get_container_ip("minio");

Review Comment:
   I've exposed the ports, and this allows me to just point to 127.0.0.1 👍 






Re: [PR] feat: support append data file and add e2e test [iceberg-rust]

2024-04-25 Thread via GitHub


Fokko commented on code in PR #349:
URL: https://github.com/apache/iceberg-rust/pull/349#discussion_r1579375646


##
crates/e2e_test/testdata/docker-compose.yaml:
##
@@ -0,0 +1,59 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+version: '3.8'
+
+services:
+  rest:
+image: tabulario/iceberg-rest:0.10.0
+environment:
+  - AWS_ACCESS_KEY_ID=admin
+  - AWS_SECRET_ACCESS_KEY=password
+  - AWS_REGION=us-east-1
+  - CATALOG_CATOLOG__IMPL=org.apache.iceberg.jdbc.JdbcCatalog
+  - CATALOG_URI=jdbc:sqlite:file:/tmp/iceberg_rest_mode=memory
+  - CATALOG_WAREHOUSE=s3://icebergdata/demo
+  - CATALOG_IO__IMPL=org.apache.iceberg.aws.s3.S3FileIO
+  - CATALOG_S3_ENDPOINT=http://minio:9000
+depends_on:
+  - minio
+links:
+  - minio:icebergdata.minio
+expose:
+  - 8181

Review Comment:
   I would love to run this from my IDE as well:
   ```suggestion
   ports:
 - 8181:8181
   expose:
 - 8181
   ```






Re: [PR] feat: support append data file and add e2e test [iceberg-rust]

2024-04-25 Thread via GitHub


Fokko commented on code in PR #349:
URL: https://github.com/apache/iceberg-rust/pull/349#discussion_r1579372414


##
crates/e2e_test/testdata/docker-compose.yaml:
##
@@ -0,0 +1,59 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+version: '3.8'
+
+services:
+  rest:
+image: tabulario/iceberg-rest:0.10.0
+environment:
+  - AWS_ACCESS_KEY_ID=admin
+  - AWS_SECRET_ACCESS_KEY=password
+  - AWS_REGION=us-east-1
+  - CATALOG_CATOLOG__IMPL=org.apache.iceberg.jdbc.JdbcCatalog
+  - CATALOG_URI=jdbc:sqlite:file:/tmp/iceberg_rest_mode=memory
+  - CATALOG_WAREHOUSE=s3://icebergdata/demo
+  - CATALOG_IO__IMPL=org.apache.iceberg.aws.s3.S3FileIO
+  - CATALOG_S3_ENDPOINT=http://minio:9000
+depends_on:
+  - minio
+links:
+  - minio:icebergdata.minio
+expose:
+  - 8181
+
+  minio:
+image: minio/minio:RELEASE.2024-03-07T00-43-48Z
+environment:
+  - MINIO_ROOT_USER=admin
+  - MINIO_ROOT_PASSWORD=password
+  - MINIO_DOMAIN=minio
+expose:
+  - 9001
+  - 9000

Review Comment:
   How about also exposing the management console:
   ```suggestion
   ports:
 - 9001:9001
   expose:
 - 9001
 - 9000
   ```






Re: [PR] feat: support append data file and add e2e test [iceberg-rust]

2024-04-25 Thread via GitHub


Fokko commented on code in PR #349:
URL: https://github.com/apache/iceberg-rust/pull/349#discussion_r1579369755


##
crates/e2e_test/testdata/docker-compose.yaml:
##
@@ -0,0 +1,59 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+version: '3.8'
+
+services:
+  rest:
+image: tabulario/iceberg-rest:0.10.0
+environment:
+  - AWS_ACCESS_KEY_ID=admin
+  - AWS_SECRET_ACCESS_KEY=password
+  - AWS_REGION=us-east-1
+  - CATALOG_CATOLOG__IMPL=org.apache.iceberg.jdbc.JdbcCatalog
+  - CATALOG_URI=jdbc:sqlite:file:/tmp/iceberg_rest_mode=memory
+  - CATALOG_WAREHOUSE=s3://icebergdata/demo
+  - CATALOG_IO__IMPL=org.apache.iceberg.aws.s3.S3FileIO
+  - CATALOG_S3_ENDPOINT=http://minio:9000
+depends_on:
+  - minio
+links:
+  - minio:icebergdata.minio
+expose:
+  - 8181
+
+  minio:

Review Comment:
   It doesn't boot on my end:
   
   ```
   ➜  iceberg-rust git:(tx_append) docker logs -f d7a12d1f9d30
   Formatting 1st pool, 1 set(s), 1 drives per set.
   WARNING: Host local has more than 0 drives of set. A host failure will 
result in data becoming unavailable.
   MinIO Object Storage Server
   Copyright: 2015-2024 MinIO, Inc.
   License: GNU AGPLv3 
   Version: RELEASE.2024-03-07T00-43-48Z (go1.21.8 linux/arm64)
   
   Status: 1 Online, 0 Offline. 
   S3-API: http://172.20.0.2:9000  http://127.0.0.1:9000 
   Console: http://172.20.0.2:9001 http://127.0.0.1:9001   
   
   Documentation: https://min.io/docs/minio/linux/index.html
   Warning: The standard parity is set to 0. This can lead to data loss.
   
   API: ListObjectsV2(bucket=icebergdata)
   Time: 12:11:34 UTC 04/25/2024
   DeploymentID: 0d2c88aa-2393-4c17-a28c-560e6cfe4b9b
   RequestID: 17C984C03C8E1AFA
   RemoteHost: 172.20.0.3
   Host: minio:9000
   UserAgent: MinIO (linux; arm64) minio-go/v7.0.67 
mc/RELEASE.2024-03-07T00-31-49Z
   Error: volume not found (cmd.StorageErr)
  7: internal/logger/logonce.go:118:logger.(*logOnceType).logOnceIf()
  6: internal/logger/logonce.go:149:logger.LogOnceIf()
  5: 
cmd/erasure-server-pool.go:1516:cmd.(*erasureServerPools).ListObjects()
  4: 
cmd/erasure-server-pool.go:1275:cmd.(*erasureServerPools).ListObjectsV2()
  3: 
cmd/bucket-listobjects-handlers.go:210:cmd.objectAPIHandlers.listObjectsV2Handler()
  2: 
cmd/bucket-listobjects-handlers.go:156:cmd.objectAPIHandlers.ListObjectsV2Handler()
  1: net/http/server.go:2136:http.HandlerFunc.ServeHTTP()
   
You are running an older version of MinIO released 1 month before the 
latest release 
Update: Run `mc admin update ALIAS` 
   ```






Re: [PR] feat: support append data file and add e2e test [iceberg-rust]

2024-04-25 Thread via GitHub


Fokko commented on code in PR #349:
URL: https://github.com/apache/iceberg-rust/pull/349#discussion_r1579368639


##
crates/e2e_test/testdata/docker-compose.yaml:
##
@@ -0,0 +1,59 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+version: '3.8'
+
+services:
+  rest:
+image: tabulario/iceberg-rest:0.10.0
+environment:
+  - AWS_ACCESS_KEY_ID=admin
+  - AWS_SECRET_ACCESS_KEY=password
+  - AWS_REGION=us-east-1
+  - CATALOG_CATOLOG__IMPL=org.apache.iceberg.jdbc.JdbcCatalog

Review Comment:
   There is a typo in this. I think this line can go.
   ```suggestion
   ```






Re: [PR] feat: support append data file and add e2e test [iceberg-rust]

2024-04-25 Thread via GitHub


Fokko commented on code in PR #349:
URL: https://github.com/apache/iceberg-rust/pull/349#discussion_r1579004695


##
crates/iceberg/src/io.rs:
##
@@ -368,6 +368,9 @@ impl Storage {
 new_props.insert("root".to_string(), DEFAULT_ROOT_PATH.to_string());
 
 match scheme {
+Scheme::Memory => Ok(Self::LocalFs {

Review Comment:
   I try to avoid adding code just for the sake of testing; inherently you're 
testing a different path than the one that would normally run.






Re: [PR] feat: support append data file and add e2e test [iceberg-rust]

2024-04-25 Thread via GitHub


Fokko commented on code in PR #349:
URL: https://github.com/apache/iceberg-rust/pull/349#discussion_r1578992843


##
crates/iceberg/src/transaction.rs:
##
@@ -121,6 +166,270 @@ impl<'a> Transaction<'a> {
 }
 }
 
+/// FastAppendAction is a transaction action for fast append data files to the 
table.
+pub struct FastAppendAction<'a> {
+tx: Transaction<'a>,
+
+parent_snapshot_id: Option<i64>,
+snapshot_id: i64,
+schema: Schema,
+schema_id: i32,
+format_version: FormatVersion,
+partition_spec: PartitionSpec,
+key_metadata: Vec<u8>,
+
+commit_uuid: String,
+manifest_id: i64,
+
+appended_data_files: Vec<DataFile>,
+}
+
+impl<'a> FastAppendAction<'a> {
+#[allow(clippy::too_many_arguments)]
+pub(crate) fn new(
+tx: Transaction<'a>,
+parent_snapshot_id: Option<i64>,
+snapshot_id: i64,
+schema: Schema,
+schema_id: i32,
+format_version: FormatVersion,
+partition_spec: PartitionSpec,
+key_metadata: Vec<u8>,
+commit_uuid: String,
+) -> Result<Self> {
+Ok(Self {
+tx,
+parent_snapshot_id,
+snapshot_id,
+schema,
+schema_id,
+format_version,
+partition_spec,
+key_metadata,
+commit_uuid,
+manifest_id: 0,
+appended_data_files: vec![],
+})
+}
+
+/// Add data files to the snapshot.
+pub fn add_data_files(
+&mut self,
+data_file: impl IntoIterator<Item = DataFile>,
+) -> Result<&mut Self> {
+self.appended_data_files.extend(data_file);
+Ok(self)
+}
+
+fn generate_manifest_file_path(&mut self) -> String {
+let manifest_id = self.manifest_id;
+self.manifest_id += 1;
+format!(
+"{}/{}/{}-m{}.{}",
+self.tx.table.metadata().location(),
+META_ROOT_PATH,
+&self.commit_uuid,
+manifest_id,
+DataFileFormat::Avro
+)
+}
+
+async fn manifest_from_parent_snapshot(&self) -> Result<Vec<ManifestFile>> {
+if let Some(snapshot) = self.tx.table.metadata().current_snapshot() {
+let manifest_list = snapshot
+.load_manifest_list(self.tx.table.file_io(), 
&self.tx.table.metadata_ref())
+.await?;
+let mut manifest_files = 
Vec::with_capacity(manifest_list.entries().len());
+for entry in manifest_list.entries() {
+// From: 
https://github.com/apache/iceberg-python/blob/659a951d6397ab280cae80206fe6e8e4be2d3738/pyiceberg/table/__init__.py#L2921
+// Why we need this?
+if entry.added_snapshot_id == self.snapshot_id {
+continue;
+}
+let manifest = 
entry.load_manifest(self.tx.table.file_io()).await?;
+// Skip manifest with all delete entries.
+if manifest.entries().iter().all(|entry| !entry.is_alive()) {
+continue;
+}
+manifest_files.push(entry.clone());
+}
+Ok(manifest_files)
+} else {
+Ok(vec![])
+}
+}
+
+// Write manifest file for added data files and return the ManifestFile for ManifestList.
+async fn manifest_for_data_file(&mut self) -> Result<ManifestFile> {
+let appended_data_files = std::mem::take(&mut 
self.appended_data_files);
+let manifest_entries = appended_data_files
+.into_iter()
+.map(|data_file| {
+ManifestEntry::builder()
+.status(crate::spec::ManifestStatus::Added)
+.snapshot_id(self.snapshot_id)
+.data_file(data_file)
+.build()
+})
+.collect();
+let manifest_meta = ManifestMetadata::builder()
+.schema(self.schema.clone())
+.schema_id(self.schema_id)
+.format_version(self.format_version)
+.partition_spec(self.partition_spec.clone())
+.content(crate::spec::ManifestContentType::Data)
+.build();
+let manifest = Manifest::new(manifest_meta, manifest_entries);
+let writer = ManifestWriter::new(
+self.tx
+.table
+.file_io()
+.new_output(self.generate_manifest_file_path())?,
+self.snapshot_id,
+self.key_metadata.clone(),
+);
+writer.write(manifest).await
+}
+
+fn summary(&self) -> Summary {
+Summary {
+operation: crate::spec::Operation::Append,
+other: HashMap::new(),
+}
+}
+
+/// Finished building the action and apply it to the transaction.
+pub async fn apply(mut self) -> Result<Transaction<'a>> {
+let summary = self.summary();
+let manifest = self.manifest_for_data_file().await?;
+let existing_manifest_files = 
self.manifest_from_par

Re: [PR] feat: support append data file and add e2e test [iceberg-rust]

2024-04-24 Thread via GitHub


ZENOTME commented on code in PR #349:
URL: https://github.com/apache/iceberg-rust/pull/349#discussion_r1578812779


##
crates/iceberg/src/transaction.rs:
##
@@ -121,6 +166,270 @@ impl<'a> Transaction<'a> {
 }
 }
 
+/// FastAppendAction is a transaction action for fast append data files to the 
table.
+pub struct FastAppendAction<'a> {
+tx: Transaction<'a>,
+
+parent_snapshot_id: Option<i64>,
+snapshot_id: i64,
+schema: Schema,
+schema_id: i32,
+format_version: FormatVersion,
+partition_spec: PartitionSpec,
+key_metadata: Vec<u8>,
+
+commit_uuid: String,
+manifest_id: i64,
+
+appended_data_files: Vec<DataFile>,
+}
+
+impl<'a> FastAppendAction<'a> {
+#[allow(clippy::too_many_arguments)]
+pub(crate) fn new(
+tx: Transaction<'a>,
+parent_snapshot_id: Option<i64>,
+snapshot_id: i64,
+schema: Schema,
+schema_id: i32,
+format_version: FormatVersion,
+partition_spec: PartitionSpec,
+key_metadata: Vec<u8>,
+commit_uuid: String,
+) -> Result<Self> {
+Ok(Self {
+tx,
+parent_snapshot_id,
+snapshot_id,
+schema,
+schema_id,
+format_version,
+partition_spec,
+key_metadata,
+commit_uuid,
+manifest_id: 0,
+appended_data_files: vec![],
+})
+}
+
+/// Add data files to the snapshot.
+pub fn add_data_files(
+&mut self,
+data_file: impl IntoIterator<Item = DataFile>,
+) -> Result<&mut Self> {
+self.appended_data_files.extend(data_file);
+Ok(self)
+}
+
+fn generate_manifest_file_path(&mut self) -> String {
+let manifest_id = self.manifest_id;
+self.manifest_id += 1;
+format!(
+"{}/{}/{}-m{}.{}",
+self.tx.table.metadata().location(),
+META_ROOT_PATH,
+&self.commit_uuid,
+manifest_id,
+DataFileFormat::Avro
+)
+}
+
+async fn manifest_from_parent_snapshot(&self) -> Result<Vec<ManifestFile>> {
+if let Some(snapshot) = self.tx.table.metadata().current_snapshot() {
+let manifest_list = snapshot
+.load_manifest_list(self.tx.table.file_io(), 
&self.tx.table.metadata_ref())
+.await?;
+let mut manifest_files = 
Vec::with_capacity(manifest_list.entries().len());
+for entry in manifest_list.entries() {
+// From: 
https://github.com/apache/iceberg-python/blob/659a951d6397ab280cae80206fe6e8e4be2d3738/pyiceberg/table/__init__.py#L2921
+// Why we need this?
+if entry.added_snapshot_id == self.snapshot_id {
+continue;
+}
+let manifest = 
entry.load_manifest(self.tx.table.file_io()).await?;
+// Skip manifest with all delete entries.
+if manifest.entries().iter().all(|entry| !entry.is_alive()) {
+continue;
+}
+manifest_files.push(entry.clone());
+}
+Ok(manifest_files)
+} else {
+Ok(vec![])
+}
+}
+
+// Write manifest file for added data files and return the ManifestFile for ManifestList.
+async fn manifest_for_data_file(&mut self) -> Result<ManifestFile> {
+let appended_data_files = std::mem::take(&mut 
self.appended_data_files);
+let manifest_entries = appended_data_files
+.into_iter()
+.map(|data_file| {
+ManifestEntry::builder()
+.status(crate::spec::ManifestStatus::Added)
+.snapshot_id(self.snapshot_id)
+.data_file(data_file)
+.build()
+})
+.collect();
+let manifest_meta = ManifestMetadata::builder()
+.schema(self.schema.clone())
+.schema_id(self.schema_id)
+.format_version(self.format_version)
+.partition_spec(self.partition_spec.clone())
+.content(crate::spec::ManifestContentType::Data)
+.build();
+let manifest = Manifest::new(manifest_meta, manifest_entries);
+let writer = ManifestWriter::new(
+self.tx
+.table
+.file_io()
+.new_output(self.generate_manifest_file_path())?,
+self.snapshot_id,
+self.key_metadata.clone(),
+);
+writer.write(manifest).await
+}
+
+fn summary(&self) -> Summary {
+Summary {
+operation: crate::spec::Operation::Append,
+other: HashMap::new(),
+}
+}
+
+/// Finished building the action and apply it to the transaction.
+pub async fn apply(mut self) -> Result<Transaction<'a>> {
+let summary = self.summary();
+let manifest = self.manifest_for_data_file().await?;
+let existing_manifest_files = 
self.manifest_from_p

Re: [PR] feat: support append data file and add e2e test [iceberg-rust]

2024-04-24 Thread via GitHub


ZENOTME commented on code in PR #349:
URL: https://github.com/apache/iceberg-rust/pull/349#discussion_r1578811481


##
crates/iceberg/src/io.rs:
##
@@ -368,6 +368,9 @@ impl Storage {
 new_props.insert("root".to_string(), DEFAULT_ROOT_PATH.to_string());
 
 match scheme {
+Scheme::Memory => Ok(Self::LocalFs {

Review Comment:
   To enable writing in unit tests, I added a `memory` scheme but treat it as 
`LocalFs` storage. Do we need to add a dedicated storage type for it? cc 
@liurenjie1024 @Xuanwo 
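
   The alternative being asked about could look roughly like this (a hypothetical sketch, not the PR's code; the enum and function names are made up for illustration — the real `Storage` is backed by OpenDAL):

   ```rust
   use std::collections::HashMap;

   // Sketch of giving the `memory` scheme its own storage variant instead
   // of aliasing it to the local filesystem, so tests exercise a distinct,
   // explicitly-named code path.
   #[derive(Debug, PartialEq)]
   enum Storage {
       Memory { props: HashMap<String, String> },
       LocalFs { props: HashMap<String, String> },
   }

   fn storage_for_scheme(
       scheme: &str,
       props: HashMap<String, String>,
   ) -> Result<Storage, String> {
       match scheme {
           "memory" => Ok(Storage::Memory { props }),
           "file" | "" => Ok(Storage::LocalFs { props }),
           other => Err(format!("unsupported scheme: {other}")),
       }
   }

   fn main() {
       let storage = storage_for_scheme("memory", HashMap::new()).unwrap();
       // `memory://` now maps to its own variant rather than LocalFs.
       assert!(matches!(storage, Storage::Memory { .. }));
       println!("ok");
   }
   ```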






Re: [PR] feat: support append data file and add e2e test [iceberg-rust]

2024-04-24 Thread via GitHub


ZENOTME commented on PR #349:
URL: https://github.com/apache/iceberg-rust/pull/349#issuecomment-2076295905

   cc @liurenjie1024 @Fokko @Xuanwo 

