Re: [PR] Deduplicate PhysicalExpr on proto ser/de using Arc pointer addresses [datafusion]
adriangb commented on PR #18192: URL: https://github.com/apache/datafusion/pull/18192#issuecomment-3735720918 Wondering if anyone who was interested in this PR would be willing to continue work on or donate review time to https://github.com/apache/datafusion/pull/19437? Tim has done the bulk of the work we just need to get it across the line. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: [email protected] For queries about this service, please contact Infrastructure at: [email protected] - To unsubscribe, e-mail: [email protected] For additional commands, e-mail: [email protected]
github-actions[bot] closed pull request #18192: Deduplicate PhysicalExpr on proto ser/de using Arc pointer addresses URL: https://github.com/apache/datafusion/pull/18192
github-actions[bot] commented on PR #18192: URL: https://github.com/apache/datafusion/pull/18192#issuecomment-3706604758 Thank you for your contribution. Unfortunately, this pull request is stale because it has been open 60 days with no activity. Please remove the stale label or comment or this will be closed in 7 days.
adriangb commented on PR #18192: URL: https://github.com/apache/datafusion/pull/18192#issuecomment-3482747583 I proposed a tangential discussion in https://github.com/apache/datafusion/issues/18477 about the big picture of proto serde.
gabotechs commented on PR #18192: URL: https://github.com/apache/datafusion/pull/18192#issuecomment-3450919514 I think it might be worth having an end-to-end plan for bringing dynamic filters into distributed contexts before continuing this discussion, as having the full picture on the table can help us better inform decisions. Created a ticket to bring discussions there: https://github.com/apache/datafusion/issues/18296
milenkovicm commented on PR #18192: URL: https://github.com/apache/datafusion/pull/18192#issuecomment-3447827774

> It depends on how you do the math (I did `(5.7-4)/4`) but that's a detail.

lol, I'm not sure you can redefine maths, there are strict rules around it 😂

> IMO "control plane" vs "data plane" can get a bit blurry, e.g. in the case of `InList` it's possible (and I think common) to have relatively large amounts of data in the "control plane".
>
> Besides: all of this is to further enable an optimization (dynamic filters) that can [make queries 25x faster](https://datafusion.apache.org/blog/2025/09/10/dynamic-filters/). We are in fact discussing using `InList` to push down join hash tables, so large `InList` expressions are an excellent example. So what I'm saying is "let's make the data plane 25x faster while at the same time making the control plane use 30% less memory; that requires some breaking API changes, we should figure out what those are". I think that's a pretty compelling story.

I'm not sure why this argument is valid in this context 😕 I had said nothing related to dynamic filters.

> DataFusion makes plenty of breaking API changes, I don't even think this is that egregious of one.

That's perfectly fine, but let's keep some kind of high bar when we do that.

> Is it API changes in this part of the code in general that you're opposed to, or mainly the footgun of re-using a cache / context causing collisions? I'm sure the latter can be addressed in some way.

Saving 1.7MB on an executor which uses 8GB does not make a huge difference, that's like 0.02%, yet for that you have introduced a possibility to shoot yourself in the foot and made the interface more complex.
adriangb commented on PR #18192: URL: https://github.com/apache/datafusion/pull/18192#issuecomment-3447787020 It depends on how you do the math (I did `(5.7-4)/4`) but that's a detail. IMO "control plane" vs "data plane" can get a bit blurry, e.g. in the case of `InList` it's possible (and I think common) to have relatively large amounts of data in the "control plane". Besides: all of this is to further enable an optimization (dynamic filters) that can [make queries 25x faster](https://datafusion.apache.org/blog/2025/09/10/dynamic-filters/). We are in fact discussing using `InList` to push down join hash tables, so large `InList` expressions are an excellent example. So what I'm saying is "let's make the data plane 25x faster while at the same time making the control plane use 30% less memory; that requires some breaking API changes, we should figure out what those are". I think that's a pretty compelling story. DataFusion makes plenty of breaking API changes, and I don't even think this is that egregious of one.
milenkovicm commented on PR #18192: URL: https://github.com/apache/datafusion/pull/18192#issuecomment-3447770818 5.7 to 4 is around 30% saved, if I'm not mistaken 😀 This optimisation is on the "control plane", not on the "data plane" (30% on the data plane would make a huge difference; we would not have this discussion in that case). IMHO, small improvements on the "control plane" do not justify additional moving parts or an increase in interface/protocol complexity. If we put the current limitations in the API description we have made the API more complex, and provided an avenue to introduce bugs if someone does not read the documentation. I have never seen decoding have any significant impact; most tasks will spend far more time crunching data, compared to the few microseconds saved in decoding (on top of that, the data has already been moved over the network).
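As an editorial aside: the "around 30%" and the "(5.7-4)/4" figures are both arithmetically right; they just use different baselines. A quick sanity check using the 5.7MB → 4MB numbers measured later in the thread:

```rust
fn main() {
    let before_mb = 5.7_f64; // memory after deserializing, on `main`
    let after_mb = 4.0_f64;  // memory after deserializing, with dedup

    // Relative to the original size: (5.7 - 4) / 5.7 ≈ 29.8%, i.e. "around 30% saved".
    let saved_vs_original = (before_mb - after_mb) / before_mb;
    // Relative to the deduplicated size: (5.7 - 4) / 4 = 42.5%, the "(5.7-4)/4" figure.
    let saved_vs_deduped = (before_mb - after_mb) / after_mb;

    assert!((saved_vs_original - 0.298).abs() < 1e-2);
    assert!((saved_vs_deduped - 0.425).abs() < 1e-3);
}
```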
adriangb commented on PR #18192:
URL: https://github.com/apache/datafusion/pull/18192#issuecomment-3447736026
> Most users do not have this problem to start with.
I'd argue that most users _do_ have this problem. Consider a query like:
```sql
SELECT *
FROM 'file.parquet'
WHERE id IN (1, 2, 3, 4, 5...);
```
This PR improves memory usage for this query by avoiding duplicating the
`InList` expression in a `FilterExec` and `ParquetSource` when deserializing.
Here are flame graphs from `main` and `dedupe-expr` respectively:
https://github.com/user-attachments/assets/a596f941-18e3-464e-b8a9-f3732351fceb
https://github.com/user-attachments/assets/c63d1608-1a7d-467c-b325-d5fe05c1319d
Raw data:
[pprof.zip](https://github.com/user-attachments/files/23144387/pprof.zip)
I generated this by adding the following example to `datafusion-examples`:
```rust
use datafusion::{common::Result, prelude::*};
use datafusion_proto::bytes::{physical_plan_from_bytes, physical_plan_to_bytes};
use parquet::{arrow::ArrowWriter, file::properties::WriterProperties};

#[cfg(not(target_env = "msvc"))]
#[global_allocator]
static ALLOC: tikv_jemallocator::Jemalloc = tikv_jemallocator::Jemalloc;

#[allow(non_upper_case_globals)]
#[export_name = "malloc_conf"]
pub static malloc_conf: &[u8] = b"prof:true,prof_active:true,lg_prof_sample:19\0";

#[tokio::main]
async fn main() -> Result<()> {
    let mut prof_ctl = jemalloc_pprof::PROF_CTL.as_ref().unwrap().lock().await;
    prof_ctl.activate().unwrap();
    let _plan = {
        let ctx = SessionContext::new();
        let batches = ctx
            .sql("SELECT c FROM generate_series(1, 100) t(c)")
            .await?
            .collect()
            .await?;
        let file = std::fs::File::create("test.parquet")?;
        let props = WriterProperties::builder()
            // limit batch sizes so that we have useful statistics
            .set_max_row_group_size(4096)
            .build();
        let mut writer = ArrowWriter::try_new(file, batches[0].schema(), Some(props))?;
        for batch in &batches {
            writer.write(batch)?;
        }
        writer.close()?;
        let mut df = ctx
            .read_parquet("test.parquet", ParquetReadOptions::default())
            .await?;
        df = df.filter(col("c").in_list((1_000..10_000).map(|v| lit(v)).collect(), false))?;
        let plan = df.create_physical_plan().await?;
        physical_plan_from_bytes(&physical_plan_to_bytes(plan)?, &ctx.task_ctx())?
    };
    let pprof = prof_ctl.dump_pprof().unwrap();
    std::fs::write("proto_memory.pprof", pprof).unwrap();
    Ok(())
}
```
Full diff:
[diff.txt](https://github.com/user-attachments/files/23144394/diff.txt)
It looks like this was able to deduplicate the `InList` expression, taking memory use after deserializing from 5.7MB to 4MB. Not every plan is going to see a 40% memory saving, but I think many plans will see some small saving, and some will see even more than 40%. This is also not accounting for the CPU cycles saved by not deserializing the same thing multiple times.
adriangb commented on PR #18192:
URL: https://github.com/apache/datafusion/pull/18192#issuecomment-3447731663
Yeah that makes sense! I'm not sure how to encode that into the APIs, but making it very explicit in the docs, etc. should be enough?
One thing I'm wondering is whether there's a way to satisfy all of the input here by making a new high-level API.
An annoyance I've had with the Codec system is that it's not clear how to inject extra behavior; it only functions as a fallback.
Something along the lines of:
```rust
pub trait Decoder {
    fn decode_plan(&self, plan: PhysicalPlanNode) -> Result<Arc<dyn ExecutionPlan>>;
    fn decode_expression(
        &self,
        expression: PhysicalExprNode,
        input_schema: &Schema,
    ) -> Result<Arc<dyn PhysicalExpr>>;
}

pub struct DefaultDecoder {
    ctx: TaskContext,
}

impl DefaultDecoder {
    pub fn new(ctx: TaskContext) -> Self {
        Self { ctx }
    }
}

impl Decoder for DefaultDecoder {
    fn decode_plan(&self, plan: PhysicalPlanNode) -> Result<Arc<dyn ExecutionPlan>> {
        // Essentially the code inside of `PhysicalPlanNode::try_from_physical_plan`
        // but passing around a reference to ourselves as `&dyn Decoder` so that e.g.
        // if we have to decode predicates inside of a plan it calls back into
        // `decode_expression`
        // Maybe delegates to
    }

    fn decode_expression(
        &self,
        expression: PhysicalExprNode,
        input_schema: &Schema,
    ) -> Result<Arc<dyn PhysicalExpr>> {
        // essentially the code inside of `parse_physical_expr` but again passing
        // around a reference to ourselves
    }
}
```
Then it's easy to make a custom `DefaultDecoder` that injects before/after behavior, e.g. caching or re-attaching custom bits to plans. I'd imagine we get rid of the `Codec` bit and have the defaults just error if you don't match and handle extension/custom types yourself.
Anyway, that's a half-baked idea and that discussion may be a blocker for this PR, but I think it is largely unrelated to "is the deduplication worth doing by default"; I'll address that in my next comment.
milenkovicm commented on PR #18192: URL: https://github.com/apache/datafusion/pull/18192#issuecomment-3447579397

> > if the TaskContext and DecodeContext are reused to decode protos from multiple clients or even from one client sending plan after plan, is there a chance of id collision?
>
> Yes, my intention was that you create a new `DecodeContext` per plan that you decode.

Then public methods should consume (and invalidate) `DecodeContext` instead of passing it as a reference.
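The consume-and-invalidate idea can be sketched with move semantics; all names below are hypothetical stand-ins, not the PR's actual API. Taking the context by value makes re-use a compile error rather than a documentation note:

```rust
use std::collections::HashMap;

// Hypothetical stand-in for the PR's `DecodeContext`.
struct DecodeContext {
    cache: HashMap<usize, String>, // expression id -> decoded expression (stand-in type)
}

impl DecodeContext {
    fn new() -> Self {
        Self { cache: HashMap::new() }
    }
}

// Takes `ctx` by value: it is moved in and dropped on return, so the same
// context can never be reused for a second plan.
fn try_into_physical_plan(mut ctx: DecodeContext, expr_ids: &[usize]) -> usize {
    for &id in expr_ids {
        ctx.cache.entry(id).or_insert_with(|| format!("expr#{}", id));
    }
    ctx.cache.len() // number of unique expressions decoded
}

fn main() {
    let ctx = DecodeContext::new();
    let unique = try_into_physical_plan(ctx, &[1, 2, 2, 3]);
    assert_eq!(unique, 3);
    // try_into_physical_plan(ctx, &[4]); // does not compile: use of moved value `ctx`
}
```

The design point is that the borrow checker, not documentation, enforces one-context-per-plan.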
adriangb commented on PR #18192: URL: https://github.com/apache/datafusion/pull/18192#issuecomment-3447067332 > if the TaskContext and DecodeContext are reused to decode protos from multiple clients or even from one client sending plan after plan, is there a chance of id collision? Yes, my intention was that you create a new `DecodeContext` per plan that you decode.
milenkovicm commented on code in PR #18192:
URL: https://github.com/apache/datafusion/pull/18192#discussion_r2462959883
```diff
## datafusion/proto/src/bytes/mod.rs ##
@@ -313,7 +313,8 @@ pub fn physical_plan_from_json(
     let back: protobuf::PhysicalPlanNode = serde_json::from_str(json)
         .map_err(|e| plan_datafusion_err!("Error serializing plan: {e}"))?;
     let extension_codec = DefaultPhysicalExtensionCodec {};
-    back.try_into_physical_plan(&ctx, &extension_codec)
+    let decode_ctx = DecodeContext::new(ctx);
```
Review Comment:
this comment is added for consistency with other public methods expecting `DecodeCtx`, but I'm not sure we should expose `&DecodeContext` in public methods; explanation in the following comment
milenkovicm commented on PR #18192:
URL: https://github.com/apache/datafusion/pull/18192#issuecomment-3446916555
One simple question: if the `TaskContext` and `DecodeContext` are reused to
decode `protos` from multiple clients or even from one client sending plan
after plan, is there a chance of id collision?
I believe the current implementation does not prevent the reuse of
`DecodeContext` for two different plans.
```rust
let task_ctx = ctx.task_ctx();
let decode_ctx = DecodeContext::new(&task_ctx);
let result_exec_plan: Arc<dyn ExecutionPlan> = proto
    .try_into_physical_plan(&decode_ctx, codec)
    .expect("from proto");
```
Two subsequent encoded plans coming to the same `DecodeContext` (one after the other, even from the same client) may have identical IDs for different expressions (we would have to be unlucky enough to have one Arc dropped and another Arc created at the same location for two different queries coming one after the other; I know we have to be very, very unlucky, but we have no guarantee it won't happen).
Hence, we need to consume `DecodeContext` after decoding in all public interfaces, preventing its re-use for the next decode.
I can't really claim it, but the current approach of using the Arc address may be safe for referencing expressions within a single plan.
If `DecodeContext` can't be re-used, how many times will the cache be hit, and would it really save enough resources to justify adding an additional moving part and changing the public interface again?
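The collision scenario can be illustrated in isolation (a sketch, not the PR's code): an Arc's pointer address is a stable id only while some clone of the Arc is alive.

```rust
use std::sync::Arc;

// Derive a dedup id from an Arc's pointer address (hypothetical helper).
fn addr_id<T>(a: &Arc<T>) -> usize {
    Arc::as_ptr(a) as usize
}

fn main() {
    // Within a single plan, clones of one Arc share an address, so the
    // address works as a deduplication id:
    let a: Arc<Vec<i64>> = Arc::new((1_000..10_000).collect());
    let b = Arc::clone(&a);
    assert_eq!(addr_id(&a), addr_id(&b));

    // But once every clone is dropped, the allocator is free to hand the same
    // address to an unrelated allocation, so an id remembered from a previous
    // plan can collide with an expression in the next one; nothing guarantees
    // it won't.
    let stale_id = addr_id(&a);
    drop(a);
    drop(b);
    let c: Arc<Vec<i64>> = Arc::new((1_000..10_000).collect());
    let _maybe_collision = addr_id(&c) == stale_id; // not guaranteed either way
}
```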
milenkovicm commented on code in PR #18192:
URL: https://github.com/apache/datafusion/pull/18192#discussion_r2462956314
```diff
## datafusion/proto/src/bytes/mod.rs ##
@@ -333,5 +334,6 @@ pub fn physical_plan_from_bytes_with_extension_codec(
 ) -> Result<Arc<dyn ExecutionPlan>> {
     let protobuf = protobuf::PhysicalPlanNode::decode(bytes)
         .map_err(|e| plan_datafusion_err!("Error decoding expr as protobuf: {e}"))?;
-    protobuf.try_into_physical_plan(ctx, extension_codec)
+    let decode_ctx = DecodeContext::new(ctx);
```
Review Comment:
this comment is added for consistency with other public methods expecting `DecodeCtx`, but I'm not sure we should expose `&DecodeContext` in public methods; explanation in the following comment
milenkovicm commented on code in PR #18192:
URL: https://github.com/apache/datafusion/pull/18192#discussion_r2462926295
```diff
## datafusion/proto/src/bytes/mod.rs ##
@@ -313,7 +313,8 @@ pub fn physical_plan_from_json(
     let back: protobuf::PhysicalPlanNode = serde_json::from_str(json)
         .map_err(|e| plan_datafusion_err!("Error serializing plan: {e}"))?;
     let extension_codec = DefaultPhysicalExtensionCodec {};
-    back.try_into_physical_plan(&ctx, &extension_codec)
+    let decode_ctx = DecodeContext::new(ctx);
```
Review Comment:
should `decode_ctx` be a method parameter rather than created here?
milenkovicm commented on code in PR #18192:
URL: https://github.com/apache/datafusion/pull/18192#discussion_r2462926232
```diff
## datafusion/proto/src/bytes/mod.rs ##
@@ -333,5 +334,6 @@ pub fn physical_plan_from_bytes_with_extension_codec(
 ) -> Result<Arc<dyn ExecutionPlan>> {
     let protobuf = protobuf::PhysicalPlanNode::decode(bytes)
         .map_err(|e| plan_datafusion_err!("Error decoding expr as protobuf: {e}"))?;
-    protobuf.try_into_physical_plan(ctx, extension_codec)
+    let decode_ctx = DecodeContext::new(ctx);
```
Review Comment:
should `decode_ctx` be a method parameter rather than created here?
milenkovicm commented on PR #18192: URL: https://github.com/apache/datafusion/pull/18192#issuecomment-3446701092

> > If we want to proceed in this direction, making cache disabled by default would make sense, as the benefits of having it on are not really obvious and very use case specific.
>
> Isn't there a benefit for all users of reducing the blowup of duplicate expressions? If duplicate expressions weren't a problem we wouldn't be Arcing them in the first place.
>
> The cost is minuscule: a hashmap of integers and pointers.

Most users do not have this problem to start with. It's not an issue with performance overhead; the issue is with user-generated ids and the subtle bugs they can bring. Also, I believe it's trivial to have two implementations, one which will cache and one which won't, and change between them as needed.
timsaucer commented on PR #18192: URL: https://github.com/apache/datafusion/pull/18192#issuecomment-3446693442

> > I'm not clear on what the pros/cons of `&TaskContext` vs. `&dyn FunctionRegistry` are. I fear that some folks doing distributed (cc @gabotechs) want "more" there not less while other folks doing FFI (I'm guessing that's your use case right @timsaucer?) want "less", those two things seem to be opposed to each other. I guess as long as downcast matching is possible it may be okay to have the `&dyn FunctionRegistry` in the API and have implementations that want a `&TaskContext` do some downcasting? Either way that seems like a bigger discussion to have on your PR / proposal / issue.
>
> > > +1 on the impl Into> on the signatures as I think that will make for a much more ergonomic experience
> >
> > Hmm the only reason I see to do that is for backwards compatibility. The complexity and opacity introduced is not worth it otherwise IMO.
>
> I can chime in here, `TaskContext` replaced `SessionContext` in #17601 , `SessionContext` was needed to provide `RuntimeEnv`

This is good to know. Then either as this PR or as a follow-on, it would be good to move the current `&dyn FunctionRegistry` on the logical side over to `&TaskContext` or `DecodeContext`.
adriangb commented on PR #18192: URL: https://github.com/apache/datafusion/pull/18192#issuecomment-3446687312

> If we want to proceed in this direction, making cache disabled by default would make sense, as the benefits of having it on are not really obvious and very use case specific.

Isn't there a benefit for all users of reducing the blowup of duplicate expressions? If duplicate expressions weren't a problem we wouldn't be Arcing them in the first place. The cost is minuscule: a hashmap of integers and pointers.
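For scale, the "hashmap of integers and pointers" bookkeeping can be sketched as follows; this is a minimal stand-in, not the PR's implementation (`String` stands in for a `PhysicalExpr`, and `encode_ids` is a hypothetical helper):

```rust
use std::collections::HashMap;
use std::sync::Arc;

// Assign each distinct Arc (keyed by pointer address) an integer id; repeated
// occurrences of the same Arc map to the same id, so its body would only be
// serialized once.
fn encode_ids(exprs: &[Arc<String>]) -> Vec<u64> {
    let mut ids: HashMap<usize, u64> = HashMap::new();
    let mut encoded = Vec::with_capacity(exprs.len());
    for e in exprs {
        let addr = Arc::as_ptr(e) as usize;
        let next = ids.len() as u64;
        let id = *ids.entry(addr).or_insert(next);
        encoded.push(id);
    }
    encoded
}

fn main() {
    // The same `InList`-like expression referenced from two plan nodes
    // (e.g. a FilterExec and a ParquetSource) collapses to one id.
    let shared = Arc::new("c IN (1, 2, 3)".to_string());
    let other = Arc::new("c > 0".to_string());
    let encoded = encode_ids(&[Arc::clone(&shared), Arc::clone(&shared), other]);
    assert_eq!(encoded, vec![0, 0, 1]);
}
```

The per-plan cost really is one map entry per distinct shared expression.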
milenkovicm commented on PR #18192: URL: https://github.com/apache/datafusion/pull/18192#issuecomment-3445960495

> > I'm not clear on what the pros/cons of &TaskContext vs. &dyn FunctionRegistry are. I fear that some folks doing distributed (cc @gabotechs) want "more"
>
> Note that having a `DecodeContext` or a `impl Into>` for tracking ids of expressions derived from their pointer addresses still leaves part of the challenge unsolved, as we'll still not be able to use that for communicating dynamic filter updates over the wire in a distributed context as is.
>
> It might be worth at least having a plan for how to do that end to end before committing to introducing an API change that might need to get revisited for having a full solution.

If we're talking about adding `impl Into>`, I believe it makes better sense to add `&dyn DecodeContext`, with a non-caching implementation as the default. At the moment the caching implementation **may have** benefits in very specific cases, so for the generic case having the cache disabled does look like the best approach.
milenkovicm commented on PR #18192: URL: https://github.com/apache/datafusion/pull/18192#issuecomment-3445949938 I wonder in which cases decoding protobuf is a bottleneck? Do you have some flame graphs to show, or might this be a theoretical bottleneck?
milenkovicm commented on PR #18192: URL: https://github.com/apache/datafusion/pull/18192#issuecomment-3445943092

> I'm not clear on what the pros/cons of `&TaskContext` vs. `&dyn FunctionRegistry` are. I fear that some folks doing distributed (cc @gabotechs) want "more" there not less while other folks doing FFI (I'm guessing that's your use case right @timsaucer?) want "less", those two things seem to be opposed to each other. I guess as long as downcast matching is possible it may be okay to have the `&dyn FunctionRegistry` in the API and have implementations that want a `&TaskContext` do some downcasting? Either way that seems like a bigger discussion to have on your PR / proposal / issue.
>
> > +1 on the impl Into> on the signatures as I think that will make for a much more ergonomic experience
>
> Hmm the only reason I see to do that is for backwards compatibility. The complexity and opacity introduced is not worth it otherwise IMO.

I can chime in here, `TaskContext` replaced `SessionContext` in https://github.com/apache/datafusion/pull/17601 , `SessionContext` was needed to provide `RuntimeEnv`
gabotechs commented on PR #18192: URL: https://github.com/apache/datafusion/pull/18192#issuecomment-3445859583

> I'm not clear on what the pros/cons of &TaskContext vs. &dyn FunctionRegistry are. I fear that some folks doing distributed (cc @gabotechs) want "more"

Note that having a `DecodeContext` or a `impl Into>` for tracking ids of expressions derived from their pointer addresses still leaves part of the challenge unsolved, as we'll still not be able to use that for communicating dynamic filter updates over the wire in a distributed context as is.

It might be worth at least having a plan for how to do that end to end before committing to introducing an API change that might need to get revisited for having a full solution.
Jefffrey commented on PR #18192: URL: https://github.com/apache/datafusion/pull/18192#issuecomment-3445542589

> Thanks for the ping. One thing I was planning to do this weekend was to write up a PR to move from `TaskContext` to `dyn FunctionRegistry`, which `TaskContext` implements. I believe that the function registry is the only portion of the `TaskContext` we need. I believe this would simplify the code as we currently have two paths, some like `parse_physical_exprs` that take `&TaskContext` and some like `parse_expr` that take `&dyn FunctionRegistry`.
>
> I'm happy to put that PR in, but since you're digging into this bit of the code maybe we can include it?
>
> Also +1 on the `impl Into>` on the signatures as I think that will make for a much more ergonomic experience.
>
> I think the core idea is a good one.

In regards to `parse_expr`, I actually have a PR that changes it to accept `TaskContext` in order to support subqueries, see #18167
adriangb commented on PR #18192: URL: https://github.com/apache/datafusion/pull/18192#issuecomment-3445155246 I'm not clear on what the pros/cons of `&TaskContext` vs. `&dyn FunctionRegistry` are. I fear that some folks doing distributed (cc @gabotechs) want "more" there, not less, while other folks doing FFI (I'm guessing that's your use case, right @timsaucer?) want "less"; those two goals seem opposed to each other. I guess as long as downcast matching is possible it may be okay to have the `&dyn FunctionRegistry` in the API and have implementations that want a `&TaskContext` do some downcasting? Either way that seems like a bigger discussion to have on your PR / proposal / issue. > +1 on the `impl Into<DecodeContext<'a>>` on the signatures as I think that will make for a much more ergonomic experience Hmm, the only reason I see to do that is backwards compatibility. The complexity and opacity introduced is not worth it otherwise IMO.
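The downcast-matching idea mentioned here can be sketched with hypothetical stand-ins; the real `FunctionRegistry` and `TaskContext` have different surfaces, and the `as_any` hook below is an assumption for illustration, not DataFusion's current trait definition:

```rust
use std::any::Any;

// Hypothetical stand-ins: a narrow trait ("less") and a concrete context ("more").
trait FunctionRegistry {
    fn udf_names(&self) -> Vec<String>;
    fn as_any(&self) -> &dyn Any;
}

struct TaskContext {
    session_id: String,
}

impl FunctionRegistry for TaskContext {
    fn udf_names(&self) -> Vec<String> {
        vec!["my_udf".to_string()]
    }
    fn as_any(&self) -> &dyn Any {
        self
    }
}

// A decode routine that only needs the narrow surface, but can recover the
// full context via downcasting when a TaskContext was actually passed in.
fn decode(registry: &dyn FunctionRegistry) -> Option<&TaskContext> {
    registry.as_any().downcast_ref::<TaskContext>()
}

fn main() {
    let ctx = TaskContext { session_id: "abc".into() };
    let recovered = decode(&ctx).expect("downcast should succeed");
    assert_eq!(recovered.session_id, "abc");
    assert_eq!(recovered.udf_names(), vec!["my_udf"]);
}
```

Implementations that only need the registry ignore `as_any`; ones that want the richer context pay a single downcast at the boundary.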
timsaucer commented on PR #18192: URL: https://github.com/apache/datafusion/pull/18192#issuecomment-3444863440 Thanks for the ping. One thing I was planning to do this weekend was to write up a PR to move from `TaskContext` to `dyn FunctionRegistry`, which `TaskContext` implements. I believe that the function registry is the only portion of the `TaskContext` we need. I believe this would simplify the code, as we currently have two paths: some functions like `parse_physical_exprs` take `&TaskContext` and some like `parse_expr` take `&dyn FunctionRegistry`. I'm happy to put that PR in, but since you're digging into this bit of the code maybe we can include it? Also +1 on the `impl Into<DecodeContext<'a>>` on the signatures as I think that will make for a much more ergonomic experience. I think the core idea is a good one.
adriangb commented on code in PR #18192:
URL: https://github.com/apache/datafusion/pull/18192#discussion_r2455206114
##
datafusion/proto/src/physical_plan/from_proto.rs:
##
@@ -207,10 +212,19 @@ where
 /// * `codec` - An extension codec used to decode custom UDFs.
 pub fn parse_physical_expr(
     proto: &protobuf::PhysicalExprNode,
-    ctx: &TaskContext,
+    decode_ctx: &DecodeContext,
     input_schema: &Schema,
     codec: &dyn PhysicalExtensionCodec,
 ) -> Result<Arc<dyn PhysicalExpr>> {
+    // Check cache first if an ID is present
+    if let Some(id) = proto.id {
+        if let Some(cached) = decode_ctx.get_cached_expr(id) {
+            return Ok(cached);
+        }
+    }
+
Review Comment:
I think we may just need to go in that direction. Since it wouldn't be used
for serialization it's okay if it's not implemented everywhere. As long as we
implement it on the key nodes we know have dynamic filters it should work.
Jefffrey commented on PR #18192: URL: https://github.com/apache/datafusion/pull/18192#issuecomment-3434801204 > > > I think this makes sense to me. Main concern (other than the pointer bits) is the introduction of `DecodeContext`; I guess it wasn't easily possible to do via codec (I think you mentioned this already)? > > > > > > I don't see a better way to add the mutable context necessary for this to work. One olive branch I can extend to ease backwards compatibility is to `impl From<&'a TaskContext> for DecodeContext<'a>` and make `DecodeContext<'a>` `Clone` and then change the public signature of the functions that now require a `DecodeContext` to accept `impl Into<DecodeContext<'a>>`. But that is quite a bit of added complexity; personally I don't think it's worth it, but I can implement it if reviewers feel it is required. > > Something like this: https://github.com/pydantic/datafusion/pull/41/files Hmm yeah, I don't think this is any better, since we might as well go all the way instead of a halfway solution 😅 I'll cc @timsaucer too, as they also changed the signatures recently for the proto physical plan in #18123
adriangb commented on PR #18192: URL: https://github.com/apache/datafusion/pull/18192#issuecomment-3433253071 > > I think this makes sense to me. Main concern (other than the pointer bits) is the introduction of `DecodeContext`; I guess it wasn't easily possible to do via codec (I think you mentioned this already)? > > I don't see a better way to add the mutable context necessary for this to work. One olive branch I can extend to ease backwards compatibility is to `impl From<&'a TaskContext> for DecodeContext<'a>` and make `DecodeContext<'a>` `Clone` and then change the public signature of the functions that now require a `DecodeContext` to accept `impl Into<DecodeContext<'a>>`. But that is quite a bit of added complexity; personally I don't think it's worth it, but I can implement it if reviewers feel it is required. Something like this: https://github.com/pydantic/datafusion/pull/41/files
adriangb commented on PR #18192: URL: https://github.com/apache/datafusion/pull/18192#issuecomment-3433199893 > I think this makes sense to me. Main concern (other than the pointer bits) is the introduction of `DecodeContext`; I guess it wasn't easily possible to do via codec (I think you mentioned this already)? I don't see a better way to add the mutable context necessary for this to work. One olive branch I can extend to ease backwards compatibility is to `impl From<&'a TaskContext> for DecodeContext<'a>` and make `DecodeContext<'a>` `Clone` and then change the public signature of the functions that now require a `DecodeContext` to accept `impl Into<DecodeContext<'a>>`. But that is quite a bit of added complexity; personally I don't think it's worth it, but I can implement it if reviewers feel it is required.
adriangb commented on code in PR #18192:
URL: https://github.com/apache/datafusion/pull/18192#discussion_r2450148410
##
datafusion/proto/proto/datafusion.proto:
##
@@ -865,6 +865,10 @@ message PhysicalExprNode {
     UnknownColumn unknown_column = 20;
   }
+
+  // Optional ID for caching during deserialization.
+  // Set to the Arc pointer address during serialization to enable deduplication.
Review Comment:
Very nice suggestion, I'll commit it tomorrow :)
Jefffrey commented on code in PR #18192:
URL: https://github.com/apache/datafusion/pull/18192#discussion_r2450132472
##
datafusion/proto/proto/datafusion.proto:
##
@@ -865,6 +865,10 @@ message PhysicalExprNode {
     UnknownColumn unknown_column = 20;
   }
+
+  // Optional ID for caching during deserialization.
+  // Set to the Arc pointer address during serialization to enable deduplication.
Review Comment:
Maybe we can change the documentation to something like:
```
// Optional ID for caching during deserialization. This is used for deduplication,
// so PhysicalExprs with the same ID will be deserialized as Arcs pointing to the
// same address (instead of distinct addresses) on the deserializing machine.
//
// We use the Arc pointer address during serialization as the ID, as this by default
// indicates if a PhysicalExpr is identical to another on the serializing machine.
```
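As a hedged illustration of the ID scheme this doc comment describes: the pointer address of an `Arc` can serve as a per-plan equality token. `Expr` and `expr_id` below are toy names for illustration, not the PR's actual code:

```rust
use std::sync::Arc;

struct Expr {
    repr: String,
}

// The Arc's pointer address on the serializing machine, used purely as an
// equality token: clones share it, separately built values do not.
fn expr_id(expr: &Arc<Expr>) -> u64 {
    Arc::as_ptr(expr) as usize as u64
}

fn main() {
    let a = Arc::new(Expr { repr: "col@0 > 5".into() });
    let b = Arc::clone(&a); // same allocation -> same ID
    let c = Arc::new(Expr { repr: "col@0 > 5".into() }); // equal text, new allocation

    assert_eq!(expr_id(&a), expr_id(&b));
    assert_ne!(expr_id(&a), expr_id(&c));
    assert_eq!(a.repr, c.repr); // structural equality does not imply a shared ID
}
```

Note the ID is only meaningful while the serializing process holds the Arcs alive; on the deserializing machine it is an opaque key, never dereferenced.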
gabotechs commented on code in PR #18192:
URL: https://github.com/apache/datafusion/pull/18192#discussion_r2448962539
##
datafusion/proto/src/physical_plan/from_proto.rs:
##
@@ -207,10 +212,19 @@ where
 /// * `codec` - An extension codec used to decode custom UDFs.
 pub fn parse_physical_expr(
     proto: &protobuf::PhysicalExprNode,
-    ctx: &TaskContext,
+    decode_ctx: &DecodeContext,
     input_schema: &Schema,
     codec: &dyn PhysicalExtensionCodec,
 ) -> Result<Arc<dyn PhysicalExpr>> {
+    // Check cache first if an ID is present
+    if let Some(id) = proto.id {
+        if let Some(cached) = decode_ctx.get_cached_expr(id) {
+            return Ok(cached);
+        }
+    }
+
Review Comment:
I also played with the option of adding an `expressions(&self) -> Vec<&dyn
PhysicalExpr>` or something similar to the `ExecutionPlan` trait, like the
`children()` method, but it gets a bit messy as the relationship between
expressions and an `ExecutionPlan` is a bit different.
It was not too bad though, maybe that approach could be revisited.
adriangb commented on PR #18192: URL: https://github.com/apache/datafusion/pull/18192#issuecomment-3427251878 @Jefffrey any chance you could give some input on this change?
adriangb commented on code in PR #18192:
URL: https://github.com/apache/datafusion/pull/18192#discussion_r2448731465
##
datafusion/proto/src/physical_plan/from_proto.rs:
##
@@ -207,10 +212,19 @@ where
 /// * `codec` - An extension codec used to decode custom UDFs.
 pub fn parse_physical_expr(
     proto: &protobuf::PhysicalExprNode,
-    ctx: &TaskContext,
+    decode_ctx: &DecodeContext,
     input_schema: &Schema,
     codec: &dyn PhysicalExtensionCodec,
 ) -> Result<Arc<dyn PhysicalExpr>> {
+    // Check cache first if an ID is present
+    if let Some(id) = proto.id {
+        if let Some(cached) = decode_ctx.get_cached_expr(id) {
+            return Ok(cached);
+        }
+    }
+
Review Comment:
Ah I see your point now. So we still need some way to get a reference to all
`Arc<dyn PhysicalExpr>` instances outside of serialization 🤔
gabotechs commented on code in PR #18192:
URL: https://github.com/apache/datafusion/pull/18192#discussion_r2448506742
##
datafusion/proto/src/physical_plan/from_proto.rs:
##
@@ -207,10 +212,19 @@ where
 /// * `codec` - An extension codec used to decode custom UDFs.
 pub fn parse_physical_expr(
     proto: &protobuf::PhysicalExprNode,
-    ctx: &TaskContext,
+    decode_ctx: &DecodeContext,
     input_schema: &Schema,
     codec: &dyn PhysicalExtensionCodec,
 ) -> Result<Arc<dyn PhysicalExpr>> {
+    // Check cache first if an ID is present
+    if let Some(id) = proto.id {
+        if let Some(cached) = decode_ctx.get_cached_expr(id) {
+            return Ok(cached);
+        }
+    }
+
Review Comment:
But it will also not get serialized at all: any dynamic filter present above
the first network boundary (reading from top to bottom) will never undergo
serialization or deserialization
adriangb commented on code in PR #18192:
URL: https://github.com/apache/datafusion/pull/18192#discussion_r2448491118
##
datafusion/proto/src/physical_plan/from_proto.rs:
##
@@ -207,10 +212,19 @@ where
 /// * `codec` - An extension codec used to decode custom UDFs.
 pub fn parse_physical_expr(
     proto: &protobuf::PhysicalExprNode,
-    ctx: &TaskContext,
+    decode_ctx: &DecodeContext,
     input_schema: &Schema,
     codec: &dyn PhysicalExtensionCodec,
 ) -> Result<Arc<dyn PhysicalExpr>> {
+    // Check cache first if an ID is present
+    if let Some(id) = proto.id {
+        if let Some(cached) = decode_ctx.get_cached_expr(id) {
+            return Ok(cached);
+        }
+    }
+
Review Comment:
I guess we can inject the hooks when we serialize as well?
adriangb commented on code in PR #18192:
URL: https://github.com/apache/datafusion/pull/18192#discussion_r2448494037
##
datafusion/proto/proto/datafusion.proto:
##
@@ -865,6 +865,10 @@ message PhysicalExprNode {
     UnknownColumn unknown_column = 20;
   }
+
+  // Optional ID for caching during deserialization.
+  // Set to the Arc pointer address during serialization to enable deduplication.
Review Comment:
The thing is we're not really using the addresses as pointers, just as a way
to identify which two expressions are the same expression.
gabotechs commented on code in PR #18192:
URL: https://github.com/apache/datafusion/pull/18192#discussion_r2448447879
##
datafusion/proto/proto/datafusion.proto:
##
@@ -865,6 +865,10 @@ message PhysicalExprNode {
     UnknownColumn unknown_column = 20;
   }
+
+  // Optional ID for caching during deserialization.
+  // Set to the Arc pointer address during serialization to enable deduplication.
Review Comment:
Makes sense. Still feels a bit weird to use a pointer address as an identifier
when in most cases serialization and deserialization happen on different
machines, but I have to agree it gets the job done
gabotechs commented on code in PR #18192:
URL: https://github.com/apache/datafusion/pull/18192#discussion_r2448429028
##
datafusion/proto/src/physical_plan/from_proto.rs:
##
@@ -207,10 +212,19 @@ where
 /// * `codec` - An extension codec used to decode custom UDFs.
 pub fn parse_physical_expr(
     proto: &protobuf::PhysicalExprNode,
-    ctx: &TaskContext,
+    decode_ctx: &DecodeContext,
     input_schema: &Schema,
     codec: &dyn PhysicalExtensionCodec,
 ) -> Result<Arc<dyn PhysicalExpr>> {
+    // Check cache first if an ID is present
+    if let Some(id) = proto.id {
+        if let Some(cached) = decode_ctx.get_cached_expr(id) {
+            return Ok(cached);
+        }
+    }
+
Review Comment:
Note that under certain circumstances the producer part of a dynamic filter is
never serialized/deserialized, as it might never get sent over the wire, while
the consumer part is. I imagine in this scenario we would be left with an
unconnected dynamic filter.
adriangb commented on code in PR #18192:
URL: https://github.com/apache/datafusion/pull/18192#discussion_r2448132611
##
datafusion/proto/proto/datafusion.proto:
##
@@ -865,6 +865,10 @@ message PhysicalExprNode {
     UnknownColumn unknown_column = 20;
   }
+
+  // Optional ID for caching during deserialization.
+  // Set to the Arc pointer address during serialization to enable deduplication.
Review Comment:
What this accomplishes is that if you deserialize both a `DataSourceExec`
and `TopK` execution node on the same machine at least those two are connected.
Connecting them *between* machines needs more work, e.g.
https://github.com/apache/datafusion/pull/18192#discussion_r2448127711
adriangb commented on code in PR #18192:
URL: https://github.com/apache/datafusion/pull/18192#discussion_r2448127711
##
datafusion/proto/src/physical_plan/from_proto.rs:
##
@@ -207,10 +212,19 @@ where
 /// * `codec` - An extension codec used to decode custom UDFs.
 pub fn parse_physical_expr(
     proto: &protobuf::PhysicalExprNode,
-    ctx: &TaskContext,
+    decode_ctx: &DecodeContext,
     input_schema: &Schema,
     codec: &dyn PhysicalExtensionCodec,
 ) -> Result<Arc<dyn PhysicalExpr>> {
+    // Check cache first if an ID is present
+    if let Some(id) = proto.id {
+        if let Some(cached) = decode_ctx.get_cached_expr(id) {
+            return Ok(cached);
+        }
+    }
+
Review Comment:
I was thinking that we set the callbacks when we deserialize in a custom
codec or something. We might have to add a function to the codec trait along
the lines of `visit_physical_expr(&self, expr: Arc<dyn PhysicalExpr>) -> Arc<dyn PhysicalExpr>`
gabotechs commented on code in PR #18192:
URL: https://github.com/apache/datafusion/pull/18192#discussion_r2446841662
##
datafusion/proto/src/physical_plan/from_proto.rs:
##
@@ -207,10 +212,19 @@ where
 /// * `codec` - An extension codec used to decode custom UDFs.
 pub fn parse_physical_expr(
     proto: &protobuf::PhysicalExprNode,
-    ctx: &TaskContext,
+    decode_ctx: &DecodeContext,
     input_schema: &Schema,
     codec: &dyn PhysicalExtensionCodec,
 ) -> Result<Arc<dyn PhysicalExpr>> {
+    // Check cache first if an ID is present
+    if let Some(id) = proto.id {
+        if let Some(cached) = decode_ctx.get_cached_expr(id) {
+            return Ok(cached);
+        }
+    }
+
Review Comment:
🤔 your idea for having callbacks in
https://github.com/apache/datafusion/pull/17370 could be a good alternative,
although I wonder how those can be set from the outside in an arbitrary
`Arc<dyn PhysicalExpr>`.
If we had a way of getting all the dynamic filter expressions in a plan
(`fn(plan: &Arc<dyn ExecutionPlan>) -> Vec<&DynamicPhysicalExpr>`) this would
indeed be a very nice approach.
gabotechs commented on code in PR #18192:
URL: https://github.com/apache/datafusion/pull/18192#discussion_r2446849691
##
datafusion/proto/proto/datafusion.proto:
##
@@ -865,6 +865,10 @@ message PhysicalExprNode {
     UnknownColumn unknown_column = 20;
   }
+
+  // Optional ID for caching during deserialization.
+  // Set to the Arc pointer address during serialization to enable deduplication.
Review Comment:
🤔 I'm not sure this addresses the fact that serialization will most likely
happen on one machine, while deserialization happens on a different one.
gabotechs commented on code in PR #18192:
URL: https://github.com/apache/datafusion/pull/18192#discussion_r2446787526
##
datafusion/proto/src/physical_plan/from_proto.rs:
##
@@ -207,10 +212,19 @@ where
 /// * `codec` - An extension codec used to decode custom UDFs.
 pub fn parse_physical_expr(
     proto: &protobuf::PhysicalExprNode,
-    ctx: &TaskContext,
+    decode_ctx: &DecodeContext,
     input_schema: &Schema,
     codec: &dyn PhysicalExtensionCodec,
 ) -> Result<Arc<dyn PhysicalExpr>> {
+    // Check cache first if an ID is present
+    if let Some(id) = proto.id {
+        if let Some(cached) = decode_ctx.get_cached_expr(id) {
+            return Ok(cached);
+        }
+    }
+
Review Comment:
This looks like a very elegant solution!
It does solve the problem at hand, but I wonder how this could be extended to
the case where the producer part of a dynamic filter is deserialized on one
machine and the consumer part on a different machine, which is almost always
going to be the case in a distributed context.
For example, the idea that powered
https://github.com/gabotechs/datafusion/pull/7, is that users can subscribe to
changes to the dynamic filters in the global registry, and send/produce updates
over the wire, something like:
```rust
let ctx = SessionContext::new();
let registry = ctx
    .task_ctx()
    .session_config()
    .get_extension::<…>();
registry.subscribe_to_updates();
registry.push_updates();
let plan: Arc<dyn ExecutionPlan>; // <- plan with dynamic filters
execute_stream(plan, ctx.task_ctx());
```
I really would love to see something like this happen without resorting to a
"global place" for reading/writing updates to dynamic filters, but I cannot
come up with other ideas.
Do you think there's a chance we can do that with something simpler like
what this PR proposes?
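The registry idea above can be sketched minimally. All names here are illustrative, not DataFusion API: producers push serialized filter updates keyed by expression ID, and subscribers (e.g. the networking layer) receive them for forwarding over the wire:

```rust
use std::collections::HashMap;
use std::sync::mpsc::{channel, Receiver, Sender};
use std::sync::Mutex;

// Hypothetical registry: one channel per subscriber, keyed by expression ID.
struct DynamicFilterRegistry {
    subscribers: Mutex<HashMap<u64, Vec<Sender<String>>>>,
}

impl DynamicFilterRegistry {
    fn new() -> Self {
        Self {
            subscribers: Mutex::new(HashMap::new()),
        }
    }

    // Register interest in updates for the expression with this ID.
    fn subscribe(&self, expr_id: u64) -> Receiver<String> {
        let (tx, rx) = channel();
        self.subscribers
            .lock()
            .unwrap()
            .entry(expr_id)
            .or_default()
            .push(tx);
        rx
    }

    // Called by the producer side (e.g. a TopK operator) on each filter update.
    fn push_update(&self, expr_id: u64, serialized_filter: &str) {
        if let Some(subs) = self.subscribers.lock().unwrap().get(&expr_id) {
            for tx in subs {
                let _ = tx.send(serialized_filter.to_string());
            }
        }
    }
}

fn main() {
    let registry = DynamicFilterRegistry::new();
    let rx = registry.subscribe(42);
    registry.push_update(42, "a > 10");
    assert_eq!(rx.recv().unwrap(), "a > 10");
}
```

This still is a "global place", which is exactly the trade-off the comment laments; the per-expression-ID keying is what the pointer-address scheme in this PR would provide.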
adriangb commented on code in PR #18192:
URL: https://github.com/apache/datafusion/pull/18192#discussion_r2446640086
##
datafusion/proto/src/physical_plan/mod.rs:
##
@@ -101,6 +102,42 @@ use datafusion_physical_plan::{ExecutionPlan, InputOrderMode, PhysicalExpr, Wind
 use prost::bytes::BufMut;
 use prost::Message;
+/// Context for decoding physical expressions with caching support.
+///
+/// This struct wraps a `TaskContext` and maintains a cache of previously deserialized
+/// physical expressions. The cache is keyed by the expression's ID (derived from the
+/// Arc pointer during serialization), allowing duplicate expressions in a plan to be
+/// deserialized only once.
+pub struct DecodeContext<'a> {
+    task_context: &'a TaskContext,
+    cache: Mutex<HashMap<u64, Arc<dyn PhysicalExpr>>>,
+}
+
+impl<'a> DecodeContext<'a> {
+    /// Create a new DecodeContext wrapping the given TaskContext.
+    pub fn new(task_context: &'a TaskContext) -> Self {
+        Self {
+            task_context,
+            cache: Mutex::new(HashMap::new()),
+        }
+    }
+
+    /// Get the underlying TaskContext reference.
+    pub fn task_context(&self) -> &'a TaskContext {
+        self.task_context
+    }
+
+    /// Attempt to retrieve a cached physical expression by its ID.
+    pub fn get_cached_expr(&self, id: u64) -> Option<Arc<dyn PhysicalExpr>> {
+        self.cache.lock().unwrap().get(&id).cloned()
+    }
+
+    /// Insert a physical expression into the cache with the given ID.
+    pub fn insert_cached_expr(&self, id: u64, expr: Arc<dyn PhysicalExpr>) {
+        self.cache.lock().unwrap().insert(id, expr);
+    }
Review Comment:
This does assume there's no other reader/writer, so it's safe to check `get()`
and then call `insert()` later without anyone else inserting in between.
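A minimal standalone sketch of this decode-side cache, with `Expr` standing in for `dyn PhysicalExpr` and toy names throughout. It uses `entry` to close the check-then-insert window under one lock, whereas the PR sidesteps the race by assuming a single decoder thread:

```rust
use std::collections::HashMap;
use std::sync::{Arc, Mutex};

struct Expr {
    repr: String,
}

struct Cache {
    inner: Mutex<HashMap<u64, Arc<Expr>>>,
}

impl Cache {
    // Lookup-or-build atomically under one lock: even with concurrent decoders,
    // only the first builder for an ID wins and everyone shares its Arc.
    fn get_or_insert_with(&self, id: u64, build: impl FnOnce() -> Arc<Expr>) -> Arc<Expr> {
        Arc::clone(self.inner.lock().unwrap().entry(id).or_insert_with(build))
    }
}

fn main() {
    let cache = Cache {
        inner: Mutex::new(HashMap::new()),
    };
    // Two proto nodes carrying the same ID decode to the same allocation.
    let first = cache.get_or_insert_with(42, || Arc::new(Expr { repr: "a > 1".into() }));
    let second = cache.get_or_insert_with(42, || Arc::new(Expr { repr: "a > 1".into() }));
    assert!(Arc::ptr_eq(&first, &second));
    println!("{}", first.repr);
}
```

With `entry`, the second closure is never invoked, so the dedup guarantee holds regardless of how many readers/writers share the cache.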
