Re: [PR] [python] Improve the consumer_manager API and support in file_store_table [paimon]
JingsongLi merged PR #7415:
URL: https://github.com/apache/paimon/pull/7415

--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]
Re: [PR] [python] Improve the consumer_manager API and support in file_store_table [paimon]
JingsongLi commented on PR #7415:
URL: https://github.com/apache/paimon/pull/7415#issuecomment-4047121723

+1
Re: [PR] [python] Improve the consumer_manager API and support in file_store_table [paimon]
JingsongLi commented on code in PR #7415:
URL: https://github.com/apache/paimon/pull/7415#discussion_r2924540079
##
docs/content/pypaimon/python-api.md:
##
@@ -591,6 +591,125 @@ to the appropriate rollback logic.
| f is not in [l1, l2] | PredicateBuilder.is_not_in(f, [l1, l2]) |
| lower <= f <= upper | PredicateBuilder.between(f, lower, upper) |
+## Consumer Management
+
+Consumer management allows you to track consumption progress, prevent snapshot expiration, and resume from breakpoints.
+
+### Create ConsumerManager
+
+```python
+from pypaimon import CatalogFactory
+from pypaimon.consumer.consumer_manager import ConsumerManager
+
+# Get table and file_io
+catalog = CatalogFactory.create({'warehouse': 'file:///path/to/warehouse'})
+table = catalog.get_table('database_name.table_name')
+file_io = table.file_io()
+
+# Create consumer manager
+manager = table.consumer_manager
Review Comment:
table.consumer_manager()
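The review point here is that `consumer_manager` is exposed as a method, so the doc line `manager = table.consumer_manager` would bind the method object rather than a manager. A minimal sketch with hypothetical stand-in classes (not pypaimon's code) makes the difference visible:

```python
class ConsumerManager:
    """Hypothetical stand-in for pypaimon's ConsumerManager."""

    def __init__(self, table_path: str, branch: str = "main"):
        self.table_path = table_path
        self.branch = branch


class FileStoreTable:
    """Hypothetical stand-in that exposes the manager through a method."""

    def __init__(self, table_path: str):
        self.table_path = table_path

    def consumer_manager(self) -> ConsumerManager:
        return ConsumerManager(self.table_path)


table = FileStoreTable("/tmp/warehouse/db.db/t")

# Without parentheses this is a bound method object, not a manager.
not_a_manager = table.consumer_manager
# Calling it returns an actual ConsumerManager instance.
manager = table.consumer_manager()

print(callable(not_a_manager))                    # True
print(isinstance(not_a_manager, ConsumerManager))  # False
print(isinstance(manager, ConsumerManager))        # True
```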
Re: [PR] [python] Improve the consumer_manager API and support in file_store_table [paimon]
JingsongLi commented on code in PR #7415:
URL: https://github.com/apache/paimon/pull/7415#discussion_r2924537374
##
paimon-python/pypaimon/table/file_store_table.py:
##
@@ -61,6 +62,7 @@ def __init__(self, file_io: FileIO, identifier: Identifier, table_path: str,
         self.total_buckets = self.options.bucket()
         self.schema_manager = SchemaManager(file_io, table_path)
+        self.consumer_manager = ConsumerManager(file_io, table_path, self.current_branch())
Review Comment:
Just provide a method `consumer_manager()`, and import `ConsumerManager` in it.

##
docs/content/pypaimon/python-api.md:
##
@@ -591,6 +591,125 @@ to the appropriate rollback logic.
| f is not in [l1, l2] | PredicateBuilder.is_not_in(f, [l1, l2]) |
| lower <= f <= upper | PredicateBuilder.between(f, lower, upper) |
+## Consumer Management
+
+Consumer management allows you to track consumption progress, prevent snapshot expiration, and resume from breakpoints.
+
+### Create ConsumerManager
+
+```python
+from pypaimon import CatalogFactory
+from pypaimon.consumer.consumer_manager import ConsumerManager
Review Comment:
No need to import?
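A minimal sketch of what the suggested method could look like, assuming the table keeps `file_io` and `table_path` as attributes and reusing the `ConsumerManager(file_io, table_path, self.current_branch())` call from the diff above. The stand-in `FileStoreTable` is hypothetical and trimmed to the relevant parts; the real class takes more constructor arguments.

```python
from typing import Any


class FileStoreTable:
    """Hypothetical, trimmed-down stand-in for pypaimon's FileStoreTable."""

    def __init__(self, file_io: Any, table_path: str, branch: str = "main"):
        self.file_io = file_io
        self.table_path = table_path
        self._branch = branch  # hypothetical storage for the current branch

    def current_branch(self) -> str:
        return self._branch

    def consumer_manager(self):
        # Function-local import: the consumer module is loaded only when a
        # caller asks for a manager, not when the table is constructed. This
        # also avoids a module-level import cycle, should one exist.
        from pypaimon.consumer.consumer_manager import ConsumerManager

        # A manager bound to this table's file IO, path and current branch.
        return ConsumerManager(self.file_io, self.table_path, self.current_branch())
```

With this shape, the doc only needs `manager = table.consumer_manager()`, which also answers the second comment: user code no longer has to import `ConsumerManager`, and constructing the table never loads the consumer module.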
Re: [PR] [python] Improve the consumer_manager API and support in file_store_table [paimon]
xuzifu666 commented on code in PR #7415:
URL: https://github.com/apache/paimon/pull/7415#discussion_r2924355725
##
docs/content/pypaimon/python-api.md:
##
@@ -591,6 +591,129 @@ to the appropriate rollback logic.
| f is not in [l1, l2] | PredicateBuilder.is_not_in(f, [l1, l2]) |
| lower <= f <= upper | PredicateBuilder.between(f, lower, upper) |
+## Consumer Management
+
+Consumer management allows you to track consumption progress, prevent snapshot expiration, and resume from breakpoints.
+
+### Create ConsumerManager
+
+```python
+from pypaimon import CatalogFactory
+from pypaimon.consumer.consumer_manager import ConsumerManager
+
+# Get table and file_io
+catalog = CatalogFactory.create({'warehouse': 'file:///path/to/warehouse'})
+table = catalog.get_table('database_name.table_name')
+file_io = table.file_io()
+
+# Create consumer manager
+manager = table.consumer_manager
+```
+
+### Get Consumer
+
+Retrieve a consumer by its ID:
+
+```python
+from pypaimon.consumer.consumer import Consumer
+
+consumer = manager.consumer('consumer_id')
+if consumer:
+    print(f"Next snapshot: {consumer.next_snapshot}")
+else:
+    print("Consumer not found")
+```
+
+### Reset Consumer
+
+Create or reset a consumer with a new snapshot ID:
+
+```python
+# Reset consumer to snapshot 10
+manager.reset_consumer('consumer_id', Consumer(next_snapshot=10))
+```
+
+### Delete Consumer
+
+Delete a consumer by its ID:
+
+```python
+manager.delete_consumer('consumer_id')
+```
+
+### List Consumers
+
+Get all consumers with their next snapshot IDs:
+
+```python
+consumers = manager.consumers()
+for consumer_id, next_snapshot in consumers.items():
+    print(f"Consumer {consumer_id}: next snapshot {next_snapshot}")
+```
+
+### List All Consumer IDs
+
+List all consumer IDs:
+
+```python
+consumer_ids = manager.list_all_ids()
+for consumer_id in consumer_ids:
+    print(consumer_id)
+```
+
+### Get Minimum Next Snapshot
+
+Get the minimum next snapshot across all consumers:
+
+```python
+min_snapshot = manager.min_next_snapshot()
+if min_snapshot:
+    print(f"Minimum next snapshot: {min_snapshot}")
+```
+
+### Expire Consumers
+
+Expire consumers modified before a given datetime:
+
+```python
+from datetime import datetime, timedelta
+
+# Expire consumers older than 1 day
+expire_time = datetime.now() - timedelta(days=1)
+manager.expire(expire_time)
+```
+
+### Clear Consumers
+
+Clear consumers matching regular expression patterns:
+
+```python
+# Clear all consumers starting with "test_"
+manager.clear_consumers('test_.*')
+
+# Clear all consumers except those starting with "prod_"
+manager.clear_consumers(
+    '.*',
+    'prod_.*'
+)
+```
+
+### Branch Support
+
+ConsumerManager supports multiple branches:
+
+```python
+# Main branch (default)
+manager_main = ConsumerManager(file_io, table.location())
+
+# Custom branch
+manager_branch = ConsumerManager(file_io, table.location(), branch='feature_branch')
Review Comment:
Done.
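The quoted doc hunk describes `min_next_snapshot()`, `expire()` and `clear_consumers()` only in prose. As a hedged illustration of those semantics, here is an in-memory model in which a dict stands in for the per-consumer files. The class `InMemoryConsumerManager`, its optional `now` argument, and the full-match regex behaviour (assumed to follow Java's `Matcher.matches()` as used by Paimon's Java `ConsumerManager`) are assumptions, not pypaimon's implementation.

```python
import re
from dataclasses import dataclass
from datetime import datetime
from typing import Dict, List, Optional, Tuple


@dataclass
class Consumer:
    next_snapshot: int


class InMemoryConsumerManager:
    """Hypothetical in-memory model of the operations described in the doc.

    The real manager persists one file per consumer under the table's
    consumer directory; here a dict stands in for that directory.
    """

    def __init__(self) -> None:
        # consumer_id -> (consumer, last modification time)
        self._store: Dict[str, Tuple[Consumer, datetime]] = {}

    def reset_consumer(self, consumer_id: str, consumer: Consumer,
                       now: Optional[datetime] = None) -> None:
        # Create or overwrite the consumer and record when it was modified.
        self._store[consumer_id] = (consumer, now if now is not None else datetime.now())

    def consumer(self, consumer_id: str) -> Optional[Consumer]:
        entry = self._store.get(consumer_id)
        return entry[0] if entry else None

    def delete_consumer(self, consumer_id: str) -> None:
        self._store.pop(consumer_id, None)

    def list_all_ids(self) -> List[str]:
        return list(self._store)

    def consumers(self) -> Dict[str, int]:
        return {cid: c.next_snapshot for cid, (c, _) in self._store.items()}

    def min_next_snapshot(self) -> Optional[int]:
        # None when there are no consumers, otherwise the smallest next_snapshot.
        return min(self.consumers().values(), default=None)

    def expire(self, expire_time: datetime) -> None:
        # Drop consumers whose last modification is strictly before the cutoff.
        stale = [cid for cid, (_, mtime) in self._store.items() if mtime < expire_time]
        for cid in stale:
            del self._store[cid]

    def clear_consumers(self, including: str, excluding: Optional[str] = None) -> None:
        inc = re.compile(including)
        exc = re.compile(excluding) if excluding else None
        for cid in list(self._store):
            # fullmatch: the whole ID must match, not just a prefix.
            if inc.fullmatch(cid) and not (exc and exc.fullmatch(cid)):
                del self._store[cid]
```

In this model `test_.*` removes `test_a` but would leave `my_test_a` alone, because the whole ID has to match the pattern.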
Re: [PR] [python] Improve the consumer_manager API and support in file_store_table [paimon]
JingsongLi commented on code in PR #7415:
URL: https://github.com/apache/paimon/pull/7415#discussion_r2924105265
##
docs/content/pypaimon/python-api.md:
##
@@ -591,6 +591,129 @@ to the appropriate rollback logic.
| f is not in [l1, l2] | PredicateBuilder.is_not_in(f, [l1, l2]) |
| lower <= f <= upper | PredicateBuilder.between(f, lower, upper) |
+## Consumer Management
+
+Consumer management allows you to track consumption progress, prevent snapshot expiration, and resume from breakpoints.
+
+### Create ConsumerManager
+
+```python
+from pypaimon import CatalogFactory
+from pypaimon.consumer.consumer_manager import ConsumerManager
+
+# Get table and file_io
+catalog = CatalogFactory.create({'warehouse': 'file:///path/to/warehouse'})
+table = catalog.get_table('database_name.table_name')
+file_io = table.file_io()
+
+# Create consumer manager
+manager = table.consumer_manager
+```
+
+### Get Consumer
+
+Retrieve a consumer by its ID:
+
+```python
+from pypaimon.consumer.consumer import Consumer
+
+consumer = manager.consumer('consumer_id')
+if consumer:
+    print(f"Next snapshot: {consumer.next_snapshot}")
+else:
+    print("Consumer not found")
+```
+
+### Reset Consumer
+
+Create or reset a consumer with a new snapshot ID:
+
+```python
+# Reset consumer to snapshot 10
+manager.reset_consumer('consumer_id', Consumer(next_snapshot=10))
+```
+
+### Delete Consumer
+
+Delete a consumer by its ID:
+
+```python
+manager.delete_consumer('consumer_id')
+```
+
+### List Consumers
+
+Get all consumers with their next snapshot IDs:
+
+```python
+consumers = manager.consumers()
+for consumer_id, next_snapshot in consumers.items():
+    print(f"Consumer {consumer_id}: next snapshot {next_snapshot}")
+```
+
+### List All Consumer IDs
+
+List all consumer IDs:
+
+```python
+consumer_ids = manager.list_all_ids()
+for consumer_id in consumer_ids:
+    print(consumer_id)
+```
+
+### Get Minimum Next Snapshot
+
+Get the minimum next snapshot across all consumers:
+
+```python
+min_snapshot = manager.min_next_snapshot()
+if min_snapshot:
+    print(f"Minimum next snapshot: {min_snapshot}")
+```
+
+### Expire Consumers
+
+Expire consumers modified before a given datetime:
+
+```python
+from datetime import datetime, timedelta
+
+# Expire consumers older than 1 day
+expire_time = datetime.now() - timedelta(days=1)
+manager.expire(expire_time)
+```
+
+### Clear Consumers
+
+Clear consumers matching regular expression patterns:
+
+```python
+# Clear all consumers starting with "test_"
+manager.clear_consumers('test_.*')
+
+# Clear all consumers except those starting with "prod_"
+manager.clear_consumers(
+    '.*',
+    'prod_.*'
+)
+```
+
+### Branch Support
+
+ConsumerManager supports multiple branches:
+
+```python
+# Main branch (default)
+manager_main = ConsumerManager(file_io, table.location())
+
+# Custom branch
+manager_branch = ConsumerManager(file_io, table.location(), branch='feature_branch')
Review Comment:
branch_manager = manager.with_branch('xxx')
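The suggested `with_branch` accessor can be sketched as returning a new manager rather than mutating the existing one. The stand-in class below is hypothetical; its directory layout (`<table>/consumer` on `main`, `<table>/branch/branch-<name>/consumer` otherwise) follows Paimon's Java `BranchManager` convention and is an assumption for pypaimon.

```python
import posixpath
from typing import Any

MAIN_BRANCH = "main"


class ConsumerManager:
    """Hypothetical stand-in; path layout assumed from Paimon's Java code."""

    def __init__(self, file_io: Any, table_path: str, branch: str = MAIN_BRANCH):
        self.file_io = file_io
        self.table_path = table_path
        self.branch = branch or MAIN_BRANCH

    def with_branch(self, branch: str) -> "ConsumerManager":
        # New instance for the other branch; this manager is left unchanged.
        return ConsumerManager(self.file_io, self.table_path, branch)

    def consumer_directory(self) -> str:
        # Main branch lives at the table root; other branches under branch/branch-<name>.
        if self.branch == MAIN_BRANCH:
            base = self.table_path
        else:
            base = posixpath.join(self.table_path, "branch", "branch-" + self.branch)
        return posixpath.join(base, "consumer")


manager = ConsumerManager(None, "/warehouse/db.db/t")
branch_manager = manager.with_branch("feature_branch")
print(manager.consumer_directory())         # /warehouse/db.db/t/consumer
print(branch_manager.consumer_directory())  # /warehouse/db.db/t/branch/branch-feature_branch/consumer
```

Returning a fresh instance keeps the table's own manager pinned to its current branch, so code already holding `manager` is unaffected by a `with_branch` call elsewhere.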
Re: [PR] [python] Improve the consumer_manager API and support in file_store_table [paimon]
xuzifu666 commented on code in PR #7415:
URL: https://github.com/apache/paimon/pull/7415#discussion_r2923538104
##
docs/content/pypaimon/python-api.md:
##
@@ -591,6 +591,132 @@ to the appropriate rollback logic.
| f is not in [l1, l2] | PredicateBuilder.is_not_in(f, [l1, l2]) |
| lower <= f <= upper | PredicateBuilder.between(f, lower, upper) |
+## Consumer Management
+
+Consumer management allows you to track consumption progress, prevent snapshot expiration, and resume from breakpoints.
+
+### Create ConsumerManager
+
+```python
+from pypaimon import CatalogFactory
+from pypaimon.consumer.consumer_manager import ConsumerManager
+
+# Get table and file_io
+catalog = CatalogFactory.create({'warehouse': 'file:///path/to/warehouse'})
+table = catalog.get_table('database_name.table_name')
+file_io = table.file_io()
+
+# Create consumer manager
+manager = ConsumerManager(file_io, table.location())
Review Comment:
Makes sense, it has been changed.
Re: [PR] [python] Improve the consumer_manager API and support in file_store_table [paimon]
JingsongLi commented on code in PR #7415:
URL: https://github.com/apache/paimon/pull/7415#discussion_r2923458809
##
docs/content/pypaimon/python-api.md:
##
@@ -591,6 +591,132 @@ to the appropriate rollback logic.
| f is not in [l1, l2] | PredicateBuilder.is_not_in(f, [l1, l2]) |
| lower <= f <= upper | PredicateBuilder.between(f, lower, upper) |
+## Consumer Management
+
+Consumer management allows you to track consumption progress, prevent snapshot expiration, and resume from breakpoints.
+
+### Create ConsumerManager
+
+```python
+from pypaimon import CatalogFactory
+from pypaimon.consumer.consumer_manager import ConsumerManager
+
+# Get table and file_io
+catalog = CatalogFactory.create({'warehouse': 'file:///path/to/warehouse'})
+table = catalog.get_table('database_name.table_name')
+file_io = table.file_io()
+
+# Create consumer manager
+manager = ConsumerManager(file_io, table.location())
Review Comment:
ConsumerManager should be obtained from table?
