Re: [PR] Deduplicate PhysicalExpr on proto ser/de using Arc pointer addresses [datafusion]

2026-01-11 Thread via GitHub


adriangb commented on PR #18192:
URL: https://github.com/apache/datafusion/pull/18192#issuecomment-3735720918

   Wondering if anyone who was interested in this PR would be willing to continue work on, or donate review time to, https://github.com/apache/datafusion/pull/19437? Tim has done the bulk of the work; we just need to get it across the line.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]


-
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]



Re: [PR] Deduplicate PhysicalExpr on proto ser/de using Arc pointer addresses [datafusion]

2026-01-09 Thread via GitHub


github-actions[bot] closed pull request #18192: Deduplicate PhysicalExpr on 
proto ser/de using Arc pointer addresses
URL: https://github.com/apache/datafusion/pull/18192





Re: [PR] Deduplicate PhysicalExpr on proto ser/de using Arc pointer addresses [datafusion]

2026-01-02 Thread via GitHub


github-actions[bot] commented on PR #18192:
URL: https://github.com/apache/datafusion/pull/18192#issuecomment-3706604758

   Thank you for your contribution. Unfortunately, this pull request is stale 
because it has been open 60 days with no activity. Please remove the stale 
label or comment or this will be closed in 7 days.





Re: [PR] Deduplicate PhysicalExpr on proto ser/de using Arc pointer addresses [datafusion]

2025-11-03 Thread via GitHub


adriangb commented on PR #18192:
URL: https://github.com/apache/datafusion/pull/18192#issuecomment-3482747583

   I proposed a tangential discussion in 
https://github.com/apache/datafusion/issues/18477 about the big picture of 
proto serde.





Re: [PR] Deduplicate PhysicalExpr on proto ser/de using Arc pointer addresses [datafusion]

2025-10-27 Thread via GitHub


gabotechs commented on PR #18192:
URL: https://github.com/apache/datafusion/pull/18192#issuecomment-3450919514

   I think it might be worth having an end-to-end plan for bringing dynamic filters into distributed contexts before continuing this discussion, as having the full picture on the table can help us better inform decisions. I created a ticket for bringing those discussions there:
   
   https://github.com/apache/datafusion/issues/18296





Re: [PR] Deduplicate PhysicalExpr on proto ser/de using Arc pointer addresses [datafusion]

2025-10-25 Thread via GitHub


milenkovicm commented on PR #18192:
URL: https://github.com/apache/datafusion/pull/18192#issuecomment-3447827774

   > It depends on how you do the math (I did `(5.7-4)/4`) but that's a detail.
   
   lol, I'm not sure you can redefine maths, there are strict rules around it 😂
   
   
   > IMO "control plane" vs "data plane" can get a bit blurry, eg in the case 
of `InList` it's possible (and I think common) to have relatively large amounts 
of data in the "control plane".
   > 
   > Besides: all of this is to further enable an optimization (dynamic 
filters) that can [make queries 25x 
faster](https://datafusion.apache.org/blog/2025/09/10/dynamic-filters/). We are 
in fact discussing using `InList` to push down join hash tables so large 
`InList` expressions are an excellent example. So what I'm saying is "let's 
make the data plane 25x faster by at the same time making the control plane use 
30% less memory, that requires some breaking API changes, we should figure out 
what those are". I think that's a pretty compelling story.
   
   I'm not sure why this argument is valid in this context 😕 I had said nothing related to dynamic filters.
   
   > DataFusion makes plenty of breaking API changes, I don't even think this 
is that egregious of one. 
   
   That's perfectly fine, but let's keep some kind of high bar when we do that.
   
   > Is it API changes in this part of code in general that you're opposed to, or mainly the footgun of re-using a cache / context causing collisions? I'm sure the latter can be addressed in some way.
   
   Saving 1.7MB on an executor which uses 8GB does not make a huge difference, that's about 0.02%, yet for that you have introduced a possibility to shoot yourself in the foot and made the interface more complex.
   





Re: [PR] Deduplicate PhysicalExpr on proto ser/de using Arc pointer addresses [datafusion]

2025-10-25 Thread via GitHub


adriangb commented on PR #18192:
URL: https://github.com/apache/datafusion/pull/18192#issuecomment-3447787020

   It depends on how you do the math (I did `(5.7-4)/4`) but that's a detail.
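   For what it's worth, both percentages follow from the same 5.7MB and 4MB measurements reported elsewhere in the thread; a quick illustrative sketch of the two readings:
   
   ```rust
   fn main() {
       let before_mb = 5.7_f64; // memory after deserializing on `main`
       let after_mb = 4.0_f64;  // memory after deserializing on `dedupe-expr`
   
       // Relative to the deduplicated size: (5.7 - 4) / 4 ≈ 42.5% extra without dedup
       let growth = (before_mb - after_mb) / after_mb;
       // Relative to the undeduplicated size: (5.7 - 4) / 5.7 ≈ 29.8% saved by dedup
       let savings = (before_mb - after_mb) / before_mb;
   
       println!("extra memory without dedup: {:.1}%", growth * 100.0);
       println!("memory saved by dedup:      {:.1}%", savings * 100.0);
   }
   ```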
   
   IMO "control plane" vs "data plane" can get a bit blurry, eg in the case of 
`InList` it's possible (and I think common) to have relatively large amounts of 
data in the "control plane".
   
   Besides: all of this is to further enable an optimization (dynamic filters) 
that can [make queries 25x 
faster](https://datafusion.apache.org/blog/2025/09/10/dynamic-filters/). We are 
in fact discussing using `InList` to push down join hash tables so large 
`InList` expressions are an excellent example. So what I'm saying is "let's 
make the data plane 25x faster by at the same time making the control plane use 
30% less memory, that requires some breaking API changes, we should figure out 
what those are".
   
   DataFusion makes plenty of breaking API changes, I don't even think this is 
that egregious of one.





Re: [PR] Deduplicate PhysicalExpr on proto ser/de using Arc pointer addresses [datafusion]

2025-10-25 Thread via GitHub


milenkovicm commented on PR #18192:
URL: https://github.com/apache/datafusion/pull/18192#issuecomment-3447770818

   5.7 to 4 is around 30% saved, if I'm not mistaken 😀
   
   This optimisation is on the "control plane", not on the "data plane" (30% on the data plane would make a huge difference; we would not be having this discussion in that case). IMHO, small improvements on the "control plane" do not justify an additional moving part or an increase in interface/protocol complexity. If we put the current limitations in the API description we have made the API more complex, and provided an avenue for introducing bugs when someone does not read the documentation.
   
   I have never seen decoding have any significant impact; most tasks will spend far more time crunching data compared to the few microseconds saved in decoding (on top of that, the data has already been moved over the network).
   





Re: [PR] Deduplicate PhysicalExpr on proto ser/de using Arc pointer addresses [datafusion]

2025-10-25 Thread via GitHub


adriangb commented on PR #18192:
URL: https://github.com/apache/datafusion/pull/18192#issuecomment-3447736026

   > Most users do not have this problem to start with.
   
   I'd argue that most users _do_ have this problem. Consider a query like:
   
   ```sql
   SELECT *
   FROM 'file.parquet'
   WHERE id IN (1, 2, 3, 4, 5...);
   ```
   
   This PR improves memory usage for this query by avoiding duplicating the 
`InList` expression in a `FilterExec` and `ParquetSource` when deserializing. 
Here are flame graphs from `main` and `dedupe-expr` respectively:
   
   Flame graph on `main`: https://github.com/user-attachments/assets/a596f941-18e3-464e-b8a9-f3732351fceb
   
   Flame graph on `dedupe-expr`: https://github.com/user-attachments/assets/c63d1608-1a7d-467c-b325-d5fe05c1319d
   
   
   Raw data:
   [pprof.zip](https://github.com/user-attachments/files/23144387/pprof.zip)
   
   I generated this by adding the following example to `datafusion-examples`:
   
   ```rust
   use datafusion::{common::Result, prelude::*};
   use datafusion_proto::bytes::{physical_plan_from_bytes, physical_plan_to_bytes};
   use parquet::{arrow::ArrowWriter, file::properties::WriterProperties};
   
   #[cfg(not(target_env = "msvc"))]
   #[global_allocator]
   static ALLOC: tikv_jemallocator::Jemalloc = tikv_jemallocator::Jemalloc;
   
   #[allow(non_upper_case_globals)]
   #[export_name = "malloc_conf"]
   pub static malloc_conf: &[u8] = b"prof:true,prof_active:true,lg_prof_sample:19\0";
   
   #[tokio::main]
   async fn main() -> Result<()> {
       let mut prof_ctl = jemalloc_pprof::PROF_CTL.as_ref().unwrap().lock().await;
       prof_ctl.activate().unwrap();
   
       let _plan = {
           let ctx = SessionContext::new();
           let batches = ctx.sql("SELECT c FROM generate_series(1, 100) t(c)").await?.collect().await?;
           let file = std::fs::File::create("test.parquet")?;
           let props = WriterProperties::builder()
               // limit batch sizes so that we have useful statistics
               .set_max_row_group_size(4096)
               .build();
           let mut writer = ArrowWriter::try_new(file, batches[0].schema(), Some(props))?;
           for batch in &batches {
               writer.write(batch)?;
           }
           writer.close()?;
   
           let mut df = ctx.read_parquet("test.parquet", ParquetReadOptions::default()).await?;
           df = df.filter(col("c").in_list((1_000..10_000).map(|v| lit(v)).collect(), false))?;
           let plan = df.create_physical_plan().await?;
           physical_plan_from_bytes(&physical_plan_to_bytes(plan)?, &ctx.task_ctx())?
       };
   
       let pprof = prof_ctl.dump_pprof().unwrap();
       std::fs::write("proto_memory.pprof", pprof).unwrap();
   
       Ok(())
   }
   ```
   
   Full diff:
   [diff.txt](https://github.com/user-attachments/files/23144394/diff.txt)
   
   
   It looks like this was able to deduplicate the `InList` expression, taking memory use after deserializing from 5.7MB to 4MB. Not every plan is going to see a 40% memory saving, but I think many plans will see some small saving, and some will see even more than 40%. This is also not accounting for the CPU cycles saved by not deserializing the same thing multiple times.
   





Re: [PR] Deduplicate PhysicalExpr on proto ser/de using Arc pointer addresses [datafusion]

2025-10-25 Thread via GitHub


adriangb commented on PR #18192:
URL: https://github.com/apache/datafusion/pull/18192#issuecomment-3447731663

   Yeah that makes sense! I'm not sure how to encode that into the APIs, but 
making it very explicit in the docs, etc. should be enough?
   
   One thing I'm thinking about is whether there's a way to satisfy all of the input here by making a new high-level API.
   An annoyance I've had with the Codec system is that it's not clear how to inject extra behavior; it only functions as a fallback.
   
   Something along the lines of:
   
   ```rust
   pub trait Decoder {
       fn decode_plan(&self, plan: PhysicalPlanNode) -> Result<Arc<dyn ExecutionPlan>>;
       fn decode_expression(&self, expression: PhysicalExprNode) -> Result<Arc<dyn PhysicalExpr>>;
   }
   
   pub struct DefaultDecoder {
       ctx: TaskContext,
   }
   
   impl DefaultDecoder {
       pub fn new(ctx: TaskContext) -> Self {
           Self { ctx }
       }
   }
   
   impl Decoder for DefaultDecoder {
       fn decode_plan(decoder: &dyn Decoder, plan: PhysicalPlanNode) -> Result<Arc<dyn ExecutionPlan>> {
           // Essentially the code inside of `PhysicalPlanNode::try_from_physical_plan`
           // but passing around a reference to ourselves as `&dyn Decoder` so that e.g. if we have to decode
           // predicates inside of a plan it calls back into `decode_expression`
           // Maybe delegates to 
       }
   
       fn decode_expression(decoder: &dyn Decoder, expression: ExpressionNode, input_schema: &Schema) -> Result<Arc<dyn PhysicalExpr>> {
           // essentially the code inside of `parse_physical_expr` but again passing around a reference to ourselves
       }
   }
   ```
   
   Then it's easy to make a custom `DefaultDecoder` that injects before/after 
behavior, e.g. caching or re-attaching custom bits to plans. I'd imagine we get 
rid of the `Codec` bit and have defaults just error if you don't match and 
handle extension/custom types yourself.
   
   Anyway, that's a half-baked idea, and that discussion may be a blocker for this PR, but I think it is largely unrelated to "is the deduplication worth doing by default"; I'll address that in my next comment.





Re: [PR] Deduplicate PhysicalExpr on proto ser/de using Arc pointer addresses [datafusion]

2025-10-25 Thread via GitHub


milenkovicm commented on PR #18192:
URL: https://github.com/apache/datafusion/pull/18192#issuecomment-3447579397

   > > if the TaskContext and DecodeContext are reused to decode protos from 
multiple clients or even from one client sending plan after plan, is there a 
chance of id collision?
   > 
   > Yes, my intention was that you create a new `DecodeContext` per plan that 
you decode.
   
   then public methods should consume (and invalidate) `DecodeContext` instead of passing it as a reference
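   A minimal sketch of what consuming the context by value could look like (hypothetical types; `String` stands in for the real `Arc<dyn PhysicalExpr>` and plan types):
   
   ```rust
   use std::collections::HashMap;
   
   struct DecodeContext {
       // maps ids embedded in the proto to already-decoded expressions
       cache: HashMap<u64, String>,
   }
   
   impl DecodeContext {
       fn new() -> Self {
           Self { cache: HashMap::new() }
       }
   }
   
   // Taking `DecodeContext` by value means the compiler rejects any attempt to
   // decode a second plan with the same (possibly stale) cache.
   fn try_into_physical_plan(mut ctx: DecodeContext, proto: &[u8]) -> String {
       // decoding would populate and consult the cache; `ctx` is dropped on
       // return, so it can never be reused for a different plan
       ctx.cache.insert(proto.len() as u64, "expr".to_string());
       format!("plan with {} cached exprs", ctx.cache.len())
   }
   
   fn main() {
       let ctx = DecodeContext::new();
       let plan = try_into_physical_plan(ctx, b"proto bytes");
       println!("{plan}");
       // try_into_physical_plan(ctx, b"next plan"); // would not compile: `ctx` was moved
   }
   ```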





Re: [PR] Deduplicate PhysicalExpr on proto ser/de using Arc pointer addresses [datafusion]

2025-10-25 Thread via GitHub


adriangb commented on PR #18192:
URL: https://github.com/apache/datafusion/pull/18192#issuecomment-3447067332

   > if the TaskContext and DecodeContext are reused to decode protos from 
multiple clients or even from one client sending plan after plan, is there a 
chance of id collision?
   
   Yes, my intention was that you create a new `DecodeContext` per plan that 
you decode.





Re: [PR] Deduplicate PhysicalExpr on proto ser/de using Arc pointer addresses [datafusion]

2025-10-25 Thread via GitHub


milenkovicm commented on code in PR #18192:
URL: https://github.com/apache/datafusion/pull/18192#discussion_r2462959883


##
datafusion/proto/src/bytes/mod.rs:
##
@@ -313,7 +313,8 @@ pub fn physical_plan_from_json(
 let back: protobuf::PhysicalPlanNode = serde_json::from_str(json)
 .map_err(|e| plan_datafusion_err!("Error serializing plan: {e}"))?;
 let extension_codec = DefaultPhysicalExtensionCodec {};
-back.try_into_physical_plan(&ctx, &extension_codec)
+let decode_ctx = DecodeContext::new(ctx);

Review Comment:
   this comment is added for consistency with other public methods expecting `DecodeCtx`, but I'm not sure we should expose `&DecodeContext` in public methods; explanation in the following comment






Re: [PR] Deduplicate PhysicalExpr on proto ser/de using Arc pointer addresses [datafusion]

2025-10-25 Thread via GitHub


milenkovicm commented on PR #18192:
URL: https://github.com/apache/datafusion/pull/18192#issuecomment-3446916555

   One simple question: if the `TaskContext` and `DecodeContext` are reused to 
decode `protos` from multiple clients or even from one client sending plan 
after plan, is there a chance of id collision? 
   
   I believe the current implementation does not prevent the reuse of 
`DecodeContext` for two different plans.
   
   ```rust
   let task_ctx = ctx.task_ctx();
   let decode_ctx = DecodeContext::new(&task_ctx);
   let result_exec_plan: Arc<dyn ExecutionPlan> = proto
       .try_into_physical_plan(&decode_ctx, codec)
       .expect("from proto");
   ```
   
   Two subsequent encoded plans coming to the same `DecodeContext` (one after the other, even from the same client) may end up with identical IDs for different expressions (we would have to be unlucky enough to have one Arc dropped and another Arc created at the same address for two different queries coming one after the other; I know we have to be very, very unlucky, but we have no guarantee it won't happen).
   Hence, all public interfaces need to consume `DecodeContext` after decoding, preventing its re-use for the next decode.
   
   I can't really make a strong claim, but the current approach of using the Arc address may only be safe for referencing expressions within a single plan.
   
   If `DecodeContext` can't be re-used, how often will the cache be hit, and would it really save enough resources to justify an additional moving part and another change to the public interface?





Re: [PR] Deduplicate PhysicalExpr on proto ser/de using Arc pointer addresses [datafusion]

2025-10-25 Thread via GitHub


milenkovicm commented on code in PR #18192:
URL: https://github.com/apache/datafusion/pull/18192#discussion_r2462956314


##
datafusion/proto/src/bytes/mod.rs:
##
@@ -333,5 +334,6 @@ pub fn physical_plan_from_bytes_with_extension_codec(
 ) -> Result<Arc<dyn ExecutionPlan>> {
 let protobuf = protobuf::PhysicalPlanNode::decode(bytes)
 .map_err(|e| plan_datafusion_err!("Error decoding expr as protobuf: 
{e}"))?;
-protobuf.try_into_physical_plan(ctx, extension_codec)
+let decode_ctx = DecodeContext::new(ctx);

Review Comment:
   this comment is added for consistency with other public methods expecting `DecodeCtx`, but I'm not sure we should expose `&DecodeContext` in public methods; explanation in the following comment






Re: [PR] Deduplicate PhysicalExpr on proto ser/de using Arc pointer addresses [datafusion]

2025-10-25 Thread via GitHub


milenkovicm commented on code in PR #18192:
URL: https://github.com/apache/datafusion/pull/18192#discussion_r2462926295


##
datafusion/proto/src/bytes/mod.rs:
##
@@ -313,7 +313,8 @@ pub fn physical_plan_from_json(
 let back: protobuf::PhysicalPlanNode = serde_json::from_str(json)
 .map_err(|e| plan_datafusion_err!("Error serializing plan: {e}"))?;
 let extension_codec = DefaultPhysicalExtensionCodec {};
-back.try_into_physical_plan(&ctx, &extension_codec)
+let decode_ctx = DecodeContext::new(ctx);

Review Comment:
   should `decode_ctx` be a method parameter rather than created here?






Re: [PR] Deduplicate PhysicalExpr on proto ser/de using Arc pointer addresses [datafusion]

2025-10-25 Thread via GitHub


milenkovicm commented on code in PR #18192:
URL: https://github.com/apache/datafusion/pull/18192#discussion_r2462926232


##
datafusion/proto/src/bytes/mod.rs:
##
@@ -333,5 +334,6 @@ pub fn physical_plan_from_bytes_with_extension_codec(
 ) -> Result<Arc<dyn ExecutionPlan>> {
 let protobuf = protobuf::PhysicalPlanNode::decode(bytes)
 .map_err(|e| plan_datafusion_err!("Error decoding expr as protobuf: 
{e}"))?;
-protobuf.try_into_physical_plan(ctx, extension_codec)
+let decode_ctx = DecodeContext::new(ctx);

Review Comment:
   should `decode_ctx` be a method parameter rather than created here?






Re: [PR] Deduplicate PhysicalExpr on proto ser/de using Arc pointer addresses [datafusion]

2025-10-25 Thread via GitHub


milenkovicm commented on PR #18192:
URL: https://github.com/apache/datafusion/pull/18192#issuecomment-3446701092

   > > If we want to proceed in this direction, making cache disabled by 
default would make sense, as the benefits of having it on are not really 
obvious and very use case specific. 
   > 
   > Isn't there a benefit for all users of reducing blowup of duplicate 
expressions? If duplicate expressions aren't a problem we wouldn't be Arcing 
them in the first place.
   > 
   > The cost is miniscule: a hashmap of integers and pointers.
   
   Most users do not have this problem to start with. 
   
   It's not an issue with performance overhead; the issue is with user-generated ids, and the subtle bugs they can bring.
   
   Also, I believe it's trivial to have two implementations, one which caches and one which doesn't, and change them as needed.
   





Re: [PR] Deduplicate PhysicalExpr on proto ser/de using Arc pointer addresses [datafusion]

2025-10-25 Thread via GitHub


timsaucer commented on PR #18192:
URL: https://github.com/apache/datafusion/pull/18192#issuecomment-3446693442

   > > I'm not clear on what the pros/cons of `&TaskContext` vs. `&dyn 
FunctionRegistry` are. I fear that some folks doing distributed (cc @gabotechs) 
want "more" there not less while other folks doing FFI (I'm guessing that's 
your use case right @timsaucer?) want "less", those two things seem to be 
opposed with each other. I guess as long as downcast matching is possible it 
may be okay to have the `&dyn FunctionRegistry` in the API and have 
implementations that want a `&TaskContext` do some downcasting? Either way that 
seems like a bigger discussion to have on your PR / proposal / issue.
   > > > +1 on the impl Into> on the signatures as I think 
that will make for a much more ergonomic experience
   > > 
   > > 
   > > Hmm the only reason I see to do that is for backwards compatibility. The 
complexity and opacity introduced is not worth it otherwise IMO.
   > 
   > I can chime in here, `TaskContext` replaced `SessionContext` in #17601 , 
`SessionContext` was needed to provide `RuntimeEnv`
   
   This is good to know. Then either as part of this PR or as a follow-on, it would be good to move the current `&dyn FunctionRegistry` on the logical side over to `&TaskContext` or `DecodeContext`.





Re: [PR] Deduplicate PhysicalExpr on proto ser/de using Arc pointer addresses [datafusion]

2025-10-25 Thread via GitHub


adriangb commented on PR #18192:
URL: https://github.com/apache/datafusion/pull/18192#issuecomment-3446687312

   > If we want to proceed in this direction, making cache disabled by default 
would make sense, as the benefits of having it on are not really obvious and 
very use case specific. 
   
   Isn't there a benefit for all users of reducing blowup of duplicate 
expressions? If duplicate expressions aren't a problem we wouldn't be Arcing 
them in the first place.
   
   The cost is minuscule: a hashmap of integers and pointers.
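   The bookkeeping being described can be sketched roughly as follows (a simplified illustration of the serialization-side dedup, not the PR's actual code; `Arc<String>` stands in for `Arc<dyn PhysicalExpr>`):
   
   ```rust
   use std::collections::HashMap;
   use std::sync::Arc;
   
   // The Arc's pointer address identifies expressions that are literally the
   // same allocation, so they can be serialized once and referenced by id.
   fn assign_ids(exprs: &[Arc<String>]) -> Vec<u64> {
       let mut ids: HashMap<usize, u64> = HashMap::new();
       let mut next_id = 0u64;
       exprs
           .iter()
           .map(|e| {
               let addr = Arc::as_ptr(e) as usize;
               // reuse the id if this allocation was already seen
               *ids.entry(addr).or_insert_with(|| {
                   let id = next_id;
                   next_id += 1;
                   id
               })
           })
           .collect()
   }
   
   fn main() {
       let shared = Arc::new("c IN (1, 2, 3)".to_string());
       // e.g. FilterExec and ParquetSource holding clones of the same Arc
       let exprs = vec![Arc::clone(&shared), Arc::new("other".to_string()), shared];
       assert_eq!(assign_ids(&exprs), vec![0, 1, 0]);
   }
   ```
   
   The deserialization side would then hold the inverse map from id back to the already-decoded `Arc`, restoring the sharing.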





Re: [PR] Deduplicate PhysicalExpr on proto ser/de using Arc pointer addresses [datafusion]

2025-10-24 Thread via GitHub


milenkovicm commented on PR #18192:
URL: https://github.com/apache/datafusion/pull/18192#issuecomment-3445960495

   > > I'm not clear on what the pros/cons of &TaskContext vs. &dyn 
FunctionRegistry are. I fear that some folks doing distributed (cc @gabotechs) 
want "more"
   > 
   > Note that having a `DecodeContext` or a `impl Into>` for tracking ids of expressions derived from their pointer addresses still leaves part of the challenge unsolved, as we'll still not be able to use that for communicating dynamic filter updates over the wire in a distributed context as is.
   > 
   > It might be worth at least having a plan for how to do that end to end before committing to introducing an API change that might need to get revisited for a full solution.
   
   If we're talking about adding `impl Into>`, I believe it would be better to add a `&dyn DecodeContext`, with a non-caching implementation as the default.
   At the moment the caching implementation **may have** benefits in a very specific case, so for the generic case having the cache disabled does look like the best approach.





Re: [PR] Deduplicate PhysicalExpr on proto ser/de using Arc pointer addresses [datafusion]

2025-10-24 Thread via GitHub


milenkovicm commented on PR #18192:
URL: https://github.com/apache/datafusion/pull/18192#issuecomment-3445949938

   I wonder in which cases decoding protobuf is a bottleneck? Do you have some flame graphs to show, or might this be a theoretical bottleneck?





Re: [PR] Deduplicate PhysicalExpr on proto ser/de using Arc pointer addresses [datafusion]

2025-10-24 Thread via GitHub


milenkovicm commented on PR #18192:
URL: https://github.com/apache/datafusion/pull/18192#issuecomment-3445943092

   > I'm not clear on what the pros/cons of `&TaskContext` vs. `&dyn 
FunctionRegistry` are. I fear that some folks doing distributed (cc @gabotechs) 
want "more" there not less while other folks doing FFI (I'm guessing that's 
your use case right @timsaucer?) want "less", those two things seem to be 
opposed with each other. I guess as long as downcast matching is possible it 
may be okay to have the `&dyn FunctionRegistry` in the API and have 
implementations that want a `&TaskContext` do some downcasting? Either way that 
seems like a bigger discussion to have on your PR / proposal / issue.
   > 
   > > +1 on the impl Into> on the signatures as I think that 
will make for a much more ergonomic experience
   > 
   > Hmm the only reason I see to do that is for backwards compatibility. The 
complexity and opacity introduced is not worth it otherwise IMO.
   
   I can chime in here, `TaskContext` replaced `SessionContext` in 
https://github.com/apache/datafusion/pull/17601 , `SessionContext` was needed 
to provide `RuntimeEnv` 





Re: [PR] Deduplicate PhysicalExpr on proto ser/de using Arc pointer addresses [datafusion]

2025-10-24 Thread via GitHub


gabotechs commented on PR #18192:
URL: https://github.com/apache/datafusion/pull/18192#issuecomment-3445859583

   > I'm not clear on what the pros/cons of &TaskContext vs. &dyn 
FunctionRegistry are. I fear that some folks doing distributed (cc @gabotechs) 
want "more"
   
   Note that having a `DecodeContext` or an `impl Into<DecodeContext<'a>>` for 
tracking ids of expressions derived from their pointer addresses still leaves 
part of the challenge unsolved, as we'll still not be able use that for 
communicating dynamic filter updates over the wire in a distributed context as 
is.
   
   It might be worth at least having a plan for how to do that end to end 
before committing to an API change that might need to be revisited to reach a 
full solution.





Re: [PR] Deduplicate PhysicalExpr on proto ser/de using Arc pointer addresses [datafusion]

2025-10-24 Thread via GitHub


Jefffrey commented on PR #18192:
URL: https://github.com/apache/datafusion/pull/18192#issuecomment-3445542589

   > Thanks for the ping. One thing I was planning to do this weekend was to 
write up a PR to move from `TaskContext` to `dyn FunctionRegistry`, which 
`TaskContext` implements. I believe that the function registry is the only 
portion of the `TaskContext` we need. I believe this would simplify the code as 
we currently have two paths, some like `parse_physical_exprs` that take 
`&TaskContext` and some like `parse_expr` that take `&dyn FunctionRegistry`.
   > 
   > I'm happy to put that PR in, but since you're digging into this bit of the 
code maybe we can include it?
   > 
   > Also +1 on the `impl Into<DecodeContext<'a>>` on the signatures as I think 
that will make for a much more ergonomic experience.
   > 
   > I think the core idea is a good one.
   
   In regards to `parse_expr`, I actually have a PR that changes it to accept 
`TaskContext` in order to support subqueries, see #18167





Re: [PR] Deduplicate PhysicalExpr on proto ser/de using Arc pointer addresses [datafusion]

2025-10-24 Thread via GitHub


adriangb commented on PR #18192:
URL: https://github.com/apache/datafusion/pull/18192#issuecomment-3445155246

   I'm not clear on what the pros/cons of `&TaskContext` vs. `&dyn 
FunctionRegistry` are. I fear that some folks doing distributed (cc @gabotechs) 
want "more" there not less while other folks doing FFI (I'm guessing that's 
your use case right @timsaucer?) want "less", those two things seem to be 
opposed with each other. I guess as long as downcast matching is possible it 
may be okay to have the `&dyn FunctionRegistry` in the API and have 
implementations that want a `&TaskContext` do some downcasting? Either way that 
seems like a bigger discussion to have on your PR / proposal / issue.
   
   > +1 on the `impl Into<DecodeContext<'a>>` on the signatures as I think that 
will make for a much more ergonomic experience
   
   Hmm the only reason I see to do that is for backwards compatibility. The 
complexity and opacity introduced is not worth it otherwise IMO.





Re: [PR] Deduplicate PhysicalExpr on proto ser/de using Arc pointer addresses [datafusion]

2025-10-24 Thread via GitHub


timsaucer commented on PR #18192:
URL: https://github.com/apache/datafusion/pull/18192#issuecomment-3444863440

   Thanks for the ping. One thing I was planning to do this weekend was to 
write up a PR to move from `TaskContext` to `dyn FunctionRegistry`, which 
`TaskContext` implements. I believe that the function registry is the only 
portion of the `TaskContext` we need. I believe this would simplify the code as 
we currently have two paths, some like `parse_physical_exprs` that take 
`&TaskContext` and some like `parse_expr` that take `&dyn FunctionRegistry`.
   
   I'm happy to put that PR in, but since you're digging into this bit of the 
code maybe we can include it?
   
   Also +1 on the `impl Into<DecodeContext<'a>>` on the signatures as I think 
that will make for a much more ergonomic experience.
   
   I think the core idea is a good one.
   
   





Re: [PR] Deduplicate PhysicalExpr on proto ser/de using Arc pointer addresses [datafusion]

2025-10-23 Thread via GitHub


adriangb commented on code in PR #18192:
URL: https://github.com/apache/datafusion/pull/18192#discussion_r2455206114


##
datafusion/proto/src/physical_plan/from_proto.rs:
##
@@ -207,10 +212,19 @@ where
 /// * `codec` - An extension codec used to decode custom UDFs.
 pub fn parse_physical_expr(
 proto: &protobuf::PhysicalExprNode,
-ctx: &TaskContext,
+decode_ctx: &DecodeContext,
 input_schema: &Schema,
 codec: &dyn PhysicalExtensionCodec,
 ) -> Result<Arc<dyn PhysicalExpr>> {
+// Check cache first if an ID is present
+if let Some(id) = proto.id {
+if let Some(cached) = decode_ctx.get_cached_expr(id) {
+return Ok(cached);
+}
+}
+

Review Comment:
   I think we may just need to go in that direction. Since it wouldn't be used 
for serialization it's okay if it's not implemented everywhere. As long as we 
implement it on the key nodes we know have dynamic filters it should work.






Re: [PR] Deduplicate PhysicalExpr on proto ser/de using Arc pointer addresses [datafusion]

2025-10-22 Thread via GitHub


Jefffrey commented on PR #18192:
URL: https://github.com/apache/datafusion/pull/18192#issuecomment-3434801204

   > > > I think this makes sense to me. Main concern (other than the pointer 
bits) is the introduction of `DecodeContext`; I guess it wasn't easily possible 
to do via codec (I think you mentioned this already)?
   > > 
   > > 
   > > I don't see a better way to add the mutable context necessary for this 
to work. One olive branch I can extend to ease backwards compatibility is to 
`impl From<&'a TaskContext> for DecodeContext<'a>` and make `DecodeContext<'a>` 
`Clone` and then change the public signature of the functions that now require 
a `DecodeContext` to accept `impl Into<DecodeContext<'a>>`. But that is quite a 
bit of added complexity, personally I don't think it's worth it but I can 
implement that if reviewers feel it is required.
   > 
   > Something like this: https://github.com/pydantic/datafusion/pull/41/files
   
   Hmm yeah I don't think this is any better, since we might as well go all the 
way instead of a halfway solution 😅 
   
   I'll cc @timsaucer too as they also changed the signatures recently for 
proto physical plan in #18123





Re: [PR] Deduplicate PhysicalExpr on proto ser/de using Arc pointer addresses [datafusion]

2025-10-22 Thread via GitHub


adriangb commented on PR #18192:
URL: https://github.com/apache/datafusion/pull/18192#issuecomment-3433253071

   > > I think this makes sense to me. Main concern (other than the pointer 
bits) is the introduction of `DecodeContext`; I guess it wasn't easily possible 
to do via codec (I think you mentioned this already)?
   > 
   > I don't see a better way to add the mutable context necessary for this to 
work. One olive branch I can extend to ease backwards compatibility is to `impl 
From<&'a TaskContext> for DecodeContext<'a>` and make `DecodeContext<'a>` 
`Clone` and then change the public signature of the functions that now require 
a `DecodeContext` to accept `impl Into<DecodeContext<'a>>`. But that is quite a 
bit of added complexity, personally I don't think it's worth it but I can 
implement that if reviewers feel it is required.
   
   Something like this: https://github.com/pydantic/datafusion/pull/41/files





Re: [PR] Deduplicate PhysicalExpr on proto ser/de using Arc pointer addresses [datafusion]

2025-10-22 Thread via GitHub


adriangb commented on PR #18192:
URL: https://github.com/apache/datafusion/pull/18192#issuecomment-3433199893

   > I think this makes sense to me. Main concern (other than the pointer bits) 
is the introduction of `DecodeContext`; I guess it wasn't easily possible to do 
via codec (I think you mentioned this already)?
   
   I don't see a better way to add the mutable context necessary for this to 
work.
   One olive branch I can extend to ease backwards compatibility is to `impl 
From<&'a TaskContext> for DecodeContext<'a>` and make `DecodeContext<'a>` 
`Clone` and then change the public signature of the functions that now require 
a `DecodeContext` to accept `impl Into<DecodeContext<'a>>`. But that is quite a 
bit of added complexity, personally I don't think it's worth it but I can 
implement that if reviewers feel it is required.
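
   For illustration, a minimal sketch of the shim described above, with 
`TaskContext` faked as a unit struct. Only the conversion shape matters here; 
none of these bodies are the actual DataFusion implementation:

   ```rust
use std::cell::RefCell;
use std::collections::HashMap;
use std::sync::Arc;

// Stand-in for DataFusion's TaskContext (hypothetical, for the sketch only).
struct TaskContext;

// DecodeContext borrows the TaskContext and adds the mutable expr cache.
// RefCell gives interior mutability so `&DecodeContext` can populate it.
struct DecodeContext<'a> {
    ctx: &'a TaskContext,
    expr_cache: RefCell<HashMap<u64, Arc<String>>>,
}

impl<'a> From<&'a TaskContext> for DecodeContext<'a> {
    fn from(ctx: &'a TaskContext) -> Self {
        DecodeContext { ctx, expr_cache: RefCell::new(HashMap::new()) }
    }
}

// With `impl Into<DecodeContext<'a>>`, existing call sites can keep passing
// `&TaskContext` while new ones construct a DecodeContext explicitly.
fn parse_expr<'a>(decode_ctx: impl Into<DecodeContext<'a>>) -> usize {
    let decode_ctx = decode_ctx.into();
    decode_ctx.expr_cache.borrow().len()
}

fn main() {
    let task_ctx = TaskContext;
    // Old-style call site: a plain &TaskContext still works...
    assert_eq!(parse_expr(&task_ctx), 0);
    // ...and so does an explicit DecodeContext.
    assert_eq!(parse_expr(DecodeContext::from(&task_ctx)), 0);
    println!("ok");
}
   ```

   Note the caveat in the comment above: each `From` conversion builds a fresh, 
empty cache, which is part of the opacity being objected to.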





Re: [PR] Deduplicate PhysicalExpr on proto ser/de using Arc pointer addresses [datafusion]

2025-10-21 Thread via GitHub


adriangb commented on code in PR #18192:
URL: https://github.com/apache/datafusion/pull/18192#discussion_r2450148410


##
datafusion/proto/proto/datafusion.proto:
##
@@ -865,6 +865,10 @@ message PhysicalExprNode {
 
 UnknownColumn unknown_column = 20;
   }
+
+  // Optional ID for caching during deserialization.
+  // Set to the Arc pointer address during serialization to enable 
deduplication.

Review Comment:
   Very nice suggestion, I'll commit it tomorrow :)






Re: [PR] Deduplicate PhysicalExpr on proto ser/de using Arc pointer addresses [datafusion]

2025-10-21 Thread via GitHub


Jefffrey commented on code in PR #18192:
URL: https://github.com/apache/datafusion/pull/18192#discussion_r2450132472


##
datafusion/proto/proto/datafusion.proto:
##
@@ -865,6 +865,10 @@ message PhysicalExprNode {
 
 UnknownColumn unknown_column = 20;
   }
+
+  // Optional ID for caching during deserialization.
+  // Set to the Arc pointer address during serialization to enable 
deduplication.

Review Comment:
   Maybe we can change the documentation to something like:
   
   ```
 // Optional ID for caching during deserialization. This is used for 
deduplication,
 // so PhysicalExprs with the same ID will be deserialized as Arcs pointing 
to the
 // same address (instead of distinct addresses) on the deserializing 
machine.
 //
 // We use the Arc pointer address during serialization as the ID, as this 
by default
 // indicates if a PhysicalExpr is identical to another on the serializing 
machine.
   ```
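
   To make the scheme concrete, here is a minimal, self-contained sketch of the 
pointer-address deduplication described in that comment. `ExprNode`, 
`serialize`, and `deserialize` are toy stand-ins (using `Arc<String>` in place 
of `Arc<dyn PhysicalExpr>`), not DataFusion APIs:

   ```rust
use std::collections::HashMap;
use std::sync::Arc;

// Toy stand-in for a serialized PhysicalExprNode: `id` plays the role of the
// new optional field, `payload` stands in for the full expression body.
#[derive(Clone)]
struct ExprNode {
    id: Option<u64>,
    payload: String,
}

// Serializing side: the Arc's pointer address identifies "the same
// expression" without requiring any trait changes.
fn serialize(expr: &Arc<String>) -> ExprNode {
    ExprNode {
        id: Some(Arc::as_ptr(expr) as u64),
        payload: expr.as_ref().clone(),
    }
}

// Deserializing side: the first node with a given id populates the cache;
// later nodes with the same id get a clone of the same Arc back.
fn deserialize(node: &ExprNode, cache: &mut HashMap<u64, Arc<String>>) -> Arc<String> {
    if let Some(id) = node.id {
        if let Some(cached) = cache.get(&id) {
            return Arc::clone(cached);
        }
    }
    let expr = Arc::new(node.payload.clone());
    if let Some(id) = node.id {
        cache.insert(id, Arc::clone(&expr));
    }
    expr
}

fn main() {
    // One shared expression referenced twice in a plan.
    let shared = Arc::new("a@0 > 5".to_string());
    let (n1, n2) = (serialize(&shared), serialize(&shared));
    assert_eq!(n1.id, n2.id);

    let mut cache = HashMap::new();
    let d1 = deserialize(&n1, &mut cache);
    let d2 = deserialize(&n2, &mut cache);
    // Deduplicated: both decoded Arcs point at the same allocation.
    assert!(Arc::ptr_eq(&d1, &d2));
    println!("deduplicated: {}", Arc::ptr_eq(&d1, &d2));
}
   ```

   As the doc comment stresses, the id is only meaningful as an identity token 
within one serialized plan; it is never dereferenced on the receiving machine.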






Re: [PR] Deduplicate PhysicalExpr on proto ser/de using Arc pointer addresses [datafusion]

2025-10-21 Thread via GitHub


gabotechs commented on code in PR #18192:
URL: https://github.com/apache/datafusion/pull/18192#discussion_r2448962539


##
datafusion/proto/src/physical_plan/from_proto.rs:
##
@@ -207,10 +212,19 @@ where
 /// * `codec` - An extension codec used to decode custom UDFs.
 pub fn parse_physical_expr(
 proto: &protobuf::PhysicalExprNode,
-ctx: &TaskContext,
+decode_ctx: &DecodeContext,
 input_schema: &Schema,
 codec: &dyn PhysicalExtensionCodec,
 ) -> Result<Arc<dyn PhysicalExpr>> {
+// Check cache first if an ID is present
+if let Some(id) = proto.id {
+if let Some(cached) = decode_ctx.get_cached_expr(id) {
+return Ok(cached);
+}
+}
+

Review Comment:
   I also played with the option of adding an `expressions(&self) -> Vec<&dyn 
PhysicalExpr>` or something similar to the `ExecutionPlan` trait, like the 
`children()` method, but it gets a bit messy as the relationship between 
expressions and an `ExecutionPlan` is a bit different.
   
   It was not too bad though, maybe that approach could be revisited.
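
   As a rough sketch of that idea, here is a toy tree walker. `Plan`, its 
`exprs` field, and `collect_exprs` are hypothetical; the real `ExecutionPlan` 
trait has `children()` but no such expression accessor today:

   ```rust
use std::sync::Arc;

// Toy plan node: `exprs` plays the role of a hypothetical `expressions()`
// accessor, `children` mirrors `ExecutionPlan::children`.
struct Plan {
    exprs: Vec<Arc<String>>,
    children: Vec<Plan>,
}

// Depth-first collection of every expression in the tree, which would let a
// serializer (or a dynamic-filter registry) see all expression Arcs up front.
fn collect_exprs(plan: &Plan, out: &mut Vec<Arc<String>>) {
    out.extend(plan.exprs.iter().cloned());
    for child in &plan.children {
        collect_exprs(child, out);
    }
}

fn main() {
    // The same filter expression shared by a parent and a child node.
    let filter = Arc::new("a@0 > 5".to_string());
    let plan = Plan {
        exprs: vec![Arc::clone(&filter)],
        children: vec![Plan {
            exprs: vec![Arc::clone(&filter)],
            children: vec![],
        }],
    };
    let mut all = Vec::new();
    collect_exprs(&plan, &mut all);
    assert_eq!(all.len(), 2);
    // Pointer identity reveals the two references are one expression.
    assert!(Arc::ptr_eq(&all[0], &all[1]));
    println!("{} exprs, shared: {}", all.len(), Arc::ptr_eq(&all[0], &all[1]));
}
   ```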



Re: [PR] Deduplicate PhysicalExpr on proto ser/de using Arc pointer addresses [datafusion]

2025-10-21 Thread via GitHub


adriangb commented on PR #18192:
URL: https://github.com/apache/datafusion/pull/18192#issuecomment-3427251878

   @Jefffrey any chance you could give some input on this change?





Re: [PR] Deduplicate PhysicalExpr on proto ser/de using Arc pointer addresses [datafusion]

2025-10-21 Thread via GitHub


adriangb commented on code in PR #18192:
URL: https://github.com/apache/datafusion/pull/18192#discussion_r2448731465


##
datafusion/proto/src/physical_plan/from_proto.rs:
##
@@ -207,10 +212,19 @@ where
 /// * `codec` - An extension codec used to decode custom UDFs.
 pub fn parse_physical_expr(
 proto: &protobuf::PhysicalExprNode,
-ctx: &TaskContext,
+decode_ctx: &DecodeContext,
 input_schema: &Schema,
 codec: &dyn PhysicalExtensionCodec,
 ) -> Result<Arc<dyn PhysicalExpr>> {
+// Check cache first if an ID is present
+if let Some(id) = proto.id {
+if let Some(cached) = decode_ctx.get_cached_expr(id) {
+return Ok(cached);
+}
+}
+

Review Comment:
   Ah I see your point now. So we still need some way to get a reference to all 
`Arc<DynamicFilterPhysicalExpr>` outside of serialization 🤔 






Re: [PR] Deduplicate PhysicalExpr on proto ser/de using Arc pointer addresses [datafusion]

2025-10-21 Thread via GitHub


gabotechs commented on code in PR #18192:
URL: https://github.com/apache/datafusion/pull/18192#discussion_r2448506742


##
datafusion/proto/src/physical_plan/from_proto.rs:
##
@@ -207,10 +212,19 @@ where
 /// * `codec` - An extension codec used to decode custom UDFs.
 pub fn parse_physical_expr(
 proto: &protobuf::PhysicalExprNode,
-ctx: &TaskContext,
+decode_ctx: &DecodeContext,
 input_schema: &Schema,
 codec: &dyn PhysicalExtensionCodec,
 ) -> Result<Arc<dyn PhysicalExpr>> {
+// Check cache first if an ID is present
+if let Some(id) = proto.id {
+if let Some(cached) = decode_ctx.get_cached_expr(id) {
+return Ok(cached);
+}
+}
+

Review Comment:
   But it will also not get serialized at all: any dynamic filter present above 
the first network boundary (reading from top to bottom) will never undergo any 
serialization or deserialization



Re: [PR] Deduplicate PhysicalExpr on proto ser/de using Arc pointer addresses [datafusion]

2025-10-21 Thread via GitHub


adriangb commented on code in PR #18192:
URL: https://github.com/apache/datafusion/pull/18192#discussion_r2448491118


##
datafusion/proto/src/physical_plan/from_proto.rs:
##
@@ -207,10 +212,19 @@ where
 /// * `codec` - An extension codec used to decode custom UDFs.
 pub fn parse_physical_expr(
 proto: &protobuf::PhysicalExprNode,
-ctx: &TaskContext,
+decode_ctx: &DecodeContext,
 input_schema: &Schema,
 codec: &dyn PhysicalExtensionCodec,
 ) -> Result<Arc<dyn PhysicalExpr>> {
+// Check cache first if an ID is present
+if let Some(id) = proto.id {
+if let Some(cached) = decode_ctx.get_cached_expr(id) {
+return Ok(cached);
+}
+}
+

Review Comment:
   I guess we can inject the hooks when we serialize as well?






Re: [PR] Deduplicate PhysicalExpr on proto ser/de using Arc pointer addresses [datafusion]

2025-10-21 Thread via GitHub


adriangb commented on code in PR #18192:
URL: https://github.com/apache/datafusion/pull/18192#discussion_r2448494037


##
datafusion/proto/proto/datafusion.proto:
##
@@ -865,6 +865,10 @@ message PhysicalExprNode {
 
 UnknownColumn unknown_column = 20;
   }
+
+  // Optional ID for caching during deserialization.
+  // Set to the Arc pointer address during serialization to enable 
deduplication.

Review Comment:
   The thing is we're not really using the addresses as pointers, just as a way 
to identify which two expressions are the same expression.






Re: [PR] Deduplicate PhysicalExpr on proto ser/de using Arc pointer addresses [datafusion]

2025-10-21 Thread via GitHub


gabotechs commented on code in PR #18192:
URL: https://github.com/apache/datafusion/pull/18192#discussion_r2448447879


##
datafusion/proto/proto/datafusion.proto:
##
@@ -865,6 +865,10 @@ message PhysicalExprNode {
 
 UnknownColumn unknown_column = 20;
   }
+
+  // Optional ID for caching during deserialization.
+  // Set to the Arc pointer address during serialization to enable 
deduplication.

Review Comment:
   Makes sense. Still feels a bit weird to use a pointer address as an 
identifier when in most cases serialization and deserialization will happen on 
different machines, but I have to agree it gets the job done






Re: [PR] Deduplicate PhysicalExpr on proto ser/de using Arc pointer addresses [datafusion]

2025-10-21 Thread via GitHub


gabotechs commented on code in PR #18192:
URL: https://github.com/apache/datafusion/pull/18192#discussion_r2448429028


##
datafusion/proto/src/physical_plan/from_proto.rs:
##
@@ -207,10 +212,19 @@ where
 /// * `codec` - An extension codec used to decode custom UDFs.
 pub fn parse_physical_expr(
 proto: &protobuf::PhysicalExprNode,
-ctx: &TaskContext,
+decode_ctx: &DecodeContext,
 input_schema: &Schema,
 codec: &dyn PhysicalExtensionCodec,
 ) -> Result<Arc<dyn PhysicalExpr>> {
+// Check cache first if an ID is present
+if let Some(id) = proto.id {
+if let Some(cached) = decode_ctx.get_cached_expr(id) {
+return Ok(cached);
+}
+}
+

Review Comment:
   Note that it can happen that under certain circumstances the producer part 
of a dynamic filter is never serialized/deserialized, as it might never get 
sent over the wire, but the consumer part does. I imagine in this scenario we 
will be left with an un-connected dynamic filter.






Re: [PR] Deduplicate PhysicalExpr on proto ser/de using Arc pointer addresses [datafusion]

2025-10-21 Thread via GitHub


adriangb commented on code in PR #18192:
URL: https://github.com/apache/datafusion/pull/18192#discussion_r2448132611


##
datafusion/proto/proto/datafusion.proto:
##
@@ -865,6 +865,10 @@ message PhysicalExprNode {
 
 UnknownColumn unknown_column = 20;
   }
+
+  // Optional ID for caching during deserialization.
+  // Set to the Arc pointer address during serialization to enable 
deduplication.

Review Comment:
   What this accomplishes is that if you deserialize both a `DataSourceExec` 
and `TopK` execution node on the same machine at least those two are connected.
   
   Connecting them *between* machines needs more work, e.g. 
https://github.com/apache/datafusion/pull/18192#discussion_r2448127711






Re: [PR] Deduplicate PhysicalExpr on proto ser/de using Arc pointer addresses [datafusion]

2025-10-21 Thread via GitHub


adriangb commented on code in PR #18192:
URL: https://github.com/apache/datafusion/pull/18192#discussion_r2448127711


##
datafusion/proto/src/physical_plan/from_proto.rs:
##
@@ -207,10 +212,19 @@ where
 /// * `codec` - An extension codec used to decode custom UDFs.
 pub fn parse_physical_expr(
 proto: &protobuf::PhysicalExprNode,
-ctx: &TaskContext,
+decode_ctx: &DecodeContext,
 input_schema: &Schema,
 codec: &dyn PhysicalExtensionCodec,
 ) -> Result<Arc<dyn PhysicalExpr>> {
+// Check cache first if an ID is present
+if let Some(id) = proto.id {
+if let Some(cached) = decode_ctx.get_cached_expr(id) {
+return Ok(cached);
+}
+}
+

Review Comment:
   I was thinking that we set the callbacks when we deserialize in a custom 
codec or something. We might have to add a function to the codec trait along 
the lines of `visit_physical_expr(&self, expr: Arc<dyn PhysicalExpr>) -> 
Arc<dyn PhysicalExpr>`






Re: [PR] Deduplicate PhysicalExpr on proto ser/de using Arc pointer addresses [datafusion]

2025-10-20 Thread via GitHub


gabotechs commented on code in PR #18192:
URL: https://github.com/apache/datafusion/pull/18192#discussion_r2446841662


##
datafusion/proto/src/physical_plan/from_proto.rs:
##
@@ -207,10 +212,19 @@ where
 /// * `codec` - An extension codec used to decode custom UDFs.
 pub fn parse_physical_expr(
 proto: &protobuf::PhysicalExprNode,
-ctx: &TaskContext,
+decode_ctx: &DecodeContext,
 input_schema: &Schema,
 codec: &dyn PhysicalExtensionCodec,
 ) -> Result<Arc<dyn PhysicalExpr>> {
+// Check cache first if an ID is present
+if let Some(id) = proto.id {
+if let Some(cached) = decode_ctx.get_cached_expr(id) {
+return Ok(cached);
+}
+}
+

Review Comment:
   🤔 your idea for having callbacks in 
https://github.com/apache/datafusion/pull/17370 could be a good alternative, 
although I wonder how those can be set from the outside in an arbitrary 
`Arc<dyn ExecutionPlan>`. 
   
   If we had a way of getting all the dynamic filter expressions in a plan 
(`fn(plan: &Arc<dyn ExecutionPlan>) -> Vec<&DynamicPhysicalExpr>`) this would 
indeed be a very nice approach.






Re: [PR] Deduplicate PhysicalExpr on proto ser/de using Arc pointer addresses [datafusion]

2025-10-20 Thread via GitHub


gabotechs commented on code in PR #18192:
URL: https://github.com/apache/datafusion/pull/18192#discussion_r2446849691


##
datafusion/proto/proto/datafusion.proto:
##
@@ -865,6 +865,10 @@ message PhysicalExprNode {
 
 UnknownColumn unknown_column = 20;
   }
+
+  // Optional ID for caching during deserialization.
+  // Set to the Arc pointer address during serialization to enable 
deduplication.

Review Comment:
   🤔 I'm not sure if this addresses the fact that serialization will most 
likely happen on one machine, but deserialization would happen in a different 
one.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]


-
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]



Re: [PR] Deduplicate PhysicalExpr on proto ser/de using Arc pointer addresses [datafusion]

2025-10-20 Thread via GitHub


gabotechs commented on code in PR #18192:
URL: https://github.com/apache/datafusion/pull/18192#discussion_r2446787526


##
datafusion/proto/src/physical_plan/from_proto.rs:
##
@@ -207,10 +212,19 @@ where
 /// * `codec` - An extension codec used to decode custom UDFs.
 pub fn parse_physical_expr(
 proto: &protobuf::PhysicalExprNode,
-ctx: &TaskContext,
+decode_ctx: &DecodeContext,
 input_schema: &Schema,
 codec: &dyn PhysicalExtensionCodec,
 ) -> Result<Arc<dyn PhysicalExpr>> {
+// Check cache first if an ID is present
+if let Some(id) = proto.id {
+if let Some(cached) = decode_ctx.get_cached_expr(id) {
+return Ok(cached);
+}
+}
+

Review Comment:
   This looks like a very elegant solution!
   
   It does solve the problem at hand, but I wonder how this could be extended 
to the case where the producer part of a dynamic filter is deserialized on one 
machine and the consumer part on a different one, which is almost always going 
to be the case in a distributed context.
   
   For example, the idea that powered 
https://github.com/gabotechs/datafusion/pull/7, is that users can subscribe to 
changes to the dynamic filters in the global registry, and send/produce updates 
over the wire, something like:
   
   ```rust
   let ctx = SessionContext::new();
   
   let registry = ctx
   .task_ctx()
   .session_config()
   .get_extension::();
   
   registry.subscribe_to_updates();
   registry.push_updates();
   
   let plan: Arc<dyn ExecutionPlan>; // <- plan with dynamic filters
   execute_stream(plan, ctx.task_ctx());
   ```
   
   I really would love to see something like this happen without resorting to 
a "global place" for reading/writing updates to dynamic filters, but I cannot 
come up with other ideas.
   
   Do you think there's a chance we can do that with something simpler, like 
what this PR proposes?
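   For illustration, a subscribe/push registry along the lines described above 
can be sketched with nothing but std channels. All names here 
(`DynamicFilterRegistry`, `FilterUpdate`, `push_update`) are invented for the 
sketch and are not the API from that PR:

   ```rust
   use std::sync::{mpsc, Arc, Mutex};

   #[derive(Clone, Debug, PartialEq)]
   struct FilterUpdate {
       key: u64,
       min: i64,
       max: i64,
   }

   #[derive(Default)]
   struct DynamicFilterRegistry {
       subscribers: Mutex<Vec<mpsc::Sender<FilterUpdate>>>,
   }

   impl DynamicFilterRegistry {
       // Consumers subscribe and receive every subsequent update; in a
       // distributed setting they would forward these over the wire.
       fn subscribe_to_updates(&self) -> mpsc::Receiver<FilterUpdate> {
           let (tx, rx) = mpsc::channel();
           self.subscribers.lock().unwrap().push(tx);
           rx
       }

       // Producers push new filter bounds; dead subscribers are dropped.
       fn push_update(&self, update: FilterUpdate) {
           self.subscribers
               .lock()
               .unwrap()
               .retain(|tx| tx.send(update.clone()).is_ok());
       }
   }

   fn main() {
       let registry = Arc::new(DynamicFilterRegistry::default());
       let rx = registry.subscribe_to_updates();
       registry.push_update(FilterUpdate { key: 1, min: 0, max: 10 });
       let got = rx.recv().unwrap();
       assert_eq!(got, FilterUpdate { key: 1, min: 0, max: 10 });
       println!("received update for key {}", got.key);
   }
   ```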






Re: [PR] Deduplicate PhysicalExpr on proto ser/de using Arc pointer addresses [datafusion]

2025-10-20 Thread via GitHub


adriangb commented on code in PR #18192:
URL: https://github.com/apache/datafusion/pull/18192#discussion_r2446640086


##
datafusion/proto/src/physical_plan/mod.rs:
##
@@ -101,6 +102,42 @@ use datafusion_physical_plan::{ExecutionPlan, InputOrderMode, PhysicalExpr, Wind
 use prost::bytes::BufMut;
 use prost::Message;
 
+/// Context for decoding physical expressions with caching support.
+///
+/// This struct wraps a `TaskContext` and maintains a cache of previously
+/// deserialized physical expressions. The cache is keyed by the expression's ID
+/// (derived from the Arc pointer during serialization), allowing duplicate
+/// expressions in a plan to be deserialized only once.
+pub struct DecodeContext<'a> {
+task_context: &'a TaskContext,
+cache: Mutex<HashMap<u64, Arc<dyn PhysicalExpr>>>,
+}
+
+impl<'a> DecodeContext<'a> {
+/// Create a new DecodeContext wrapping the given TaskContext.
+pub fn new(task_context: &'a TaskContext) -> Self {
+Self {
+task_context,
+cache: Mutex::new(HashMap::new()),
+}
+}
+
+/// Get the underlying TaskContext reference.
+pub fn task_context(&self) -> &'a TaskContext {
+self.task_context
+}
+
+/// Attempt to retrieve a cached physical expression by its ID.
+pub fn get_cached_expr(&self, id: u64) -> Option<Arc<dyn PhysicalExpr>> {
+self.cache.lock().unwrap().get(&id).cloned()
+}
+
+/// Insert a physical expression into the cache with the given ID.
+pub fn insert_cached_expr(&self, id: u64, expr: Arc<dyn PhysicalExpr>) {
+self.cache.lock().unwrap().insert(id, expr);
+}

Review Comment:
   This does assume there is no other concurrent reader/writer, so it is safe 
to check `get()` and then call `insert()` later without someone else having 
inserted in between.
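   If concurrent decoding ever became a concern, the check-then-insert could be 
collapsed into a single locked operation via the `entry` API. A hypothetical 
std-only sketch (`ExprCache` is an invented name; `Arc<String>` stands in for 
`Arc<dyn PhysicalExpr>`):

   ```rust
   use std::collections::HashMap;
   use std::sync::{Arc, Mutex};

   struct ExprCache {
       inner: Mutex<HashMap<u64, Arc<String>>>,
   }

   impl ExprCache {
       // `entry().or_insert_with()` checks and inserts under one lock
       // acquisition, so a concurrent decoder cannot slip an insert in
       // between a separate `get` and `insert`.
       fn get_or_insert_with(&self, id: u64, f: impl FnOnce() -> Arc<String>) -> Arc<String> {
           self.inner.lock().unwrap().entry(id).or_insert_with(f).clone()
       }
   }

   fn main() {
       let cache = ExprCache {
           inner: Mutex::new(HashMap::new()),
       };
       let first = cache.get_or_insert_with(1, || Arc::new("col_a".to_string()));
       // Second call with the same id returns the cached Arc, not a new one.
       let second = cache.get_or_insert_with(1, || Arc::new("other".to_string()));
       assert!(Arc::ptr_eq(&first, &second));
       assert_eq!(*second, "col_a");
       println!("cache hit: {}", second);
   }
   ```

   One caveat with this shape: `or_insert_with` runs the closure while the lock 
is held, so a decoder that recurses back into the cache would need a different 
scheme.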


