GitHub user gidhubuser255 edited a discussion: Parallelization Enhancement Ideas
Hi, while working with Hamilton's Parallelizable/Collect capabilities I've run
into a couple of limitations for which I haven't found adequate workarounds. I
have some ideas that could address them and would like to discuss them with the
Hamilton team.
### The limitations:
1) **Nested Parallelizable** - something like this does not work:
```
from hamilton.htypes import Collect, Parallelizable

def product_line() -> Parallelizable[str]:
    return ['Cars', 'Trucks']

def product(product_line: str) -> Parallelizable[str]:
    return {'Cars': ['Camry', 'Corolla'], 'Trucks': ['Tacoma', 'Tundra']}[product_line]

def product_data(product_line: str, product: str) -> int:
    return len(product_line + product)

def product_line_data(product_data: Collect[int]) -> int:
    return sum(product_data)

def all_data(product_line_data: Collect[int]) -> int:
    return sum(product_line_data)
```
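For reference, the result this nested example is meant to produce can be sketched in plain Python with `concurrent.futures`, using one thread pool per nesting level. This is not Hamilton code: the pools only stand in for the nested fan-out that `Parallelizable`/`Collect` would express, and the product mapping is copied from the example above.

```python
from concurrent.futures import ThreadPoolExecutor
from typing import List

# Product mapping copied from the Hamilton example above.
PRODUCTS = {"Cars": ["Camry", "Corolla"], "Trucks": ["Tacoma", "Tundra"]}


def product_data(product_line: str, product: str) -> int:
    # Innermost unit of work, one per (product_line, product) pair.
    return len(product_line + product)


def product_line_data(product_line: str) -> int:
    # Inner fan-out: one task per product in this line, then collect with sum().
    with ThreadPoolExecutor() as pool:
        results: List[int] = list(
            pool.map(lambda p: product_data(product_line, p), PRODUCTS[product_line])
        )
    return sum(results)


def all_data() -> int:
    # Outer fan-out: one task per product line, each running its own inner fan-out.
    with ThreadPoolExecutor() as pool:
        return sum(pool.map(product_line_data, list(PRODUCTS)))


print(all_data())  # 44
```

Each level keeps its own collect step (`product_line_data` per line, `all_data` across lines), which is the structure the nested `Parallelizable` version tries to express.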
The workaround I considered was flattening this into a single Parallelizable
that returns (product_line, product) tuples. This falls short because the
combining/processing for both levels then has to happen in one function
(all_data). You lose the parallelization benefits at the product_line level
(negligible in this simple case, but not in all real-world cases), and the logic
in all_data becomes more complicated because it is now responsible for parsing
and processing multiple levels.
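A minimal plain-Python sketch of that flattened workaround, with a list comprehension standing in for Hamilton's fan-out (`line_and_product` is a hypothetical name, not from the original). It shows how the product line has to travel alongside each result and how `all_data` must regroup by line itself, so any per-line work runs serially inside that single collect step.

```python
from collections import defaultdict
from typing import Dict, Iterator, List, Tuple

PRODUCTS = {"Cars": ["Camry", "Corolla"], "Trucks": ["Tacoma", "Tundra"]}


def line_and_product() -> Iterator[Tuple[str, str]]:
    # The single flattened fan-out: one item per (product_line, product) pair.
    for line, products in PRODUCTS.items():
        for product in products:
            yield line, product


def product_data(pair: Tuple[str, str]) -> Tuple[str, int]:
    # The product line is carried along with the result so it can be regrouped later.
    line, product = pair
    return line, len(line + product)


def all_data(collected: List[Tuple[str, int]]) -> int:
    # Both levels handled in one place: regroup by product line, then combine.
    per_line: Dict[str, int] = defaultdict(int)
    for line, value in collected:
        per_line[line] += value
    return sum(per_line.values())


collected = [product_data(pair) for pair in line_and_product()]
print(all_data(collected))  # 44
```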
2) **Isolate Individual Parallelizable Path** - previously discussed in the issue
below and marked "not planned". However, I would like to reopen that discussion
in light of the suggestions in this thread.
https://github.com/apache/hamilton/issues/1029
The workaround I considered here was something like this:
```
def product() -> Parallelizable[str]:
return ['Hat', 'Shirt', 'Shoes']
def product_data(product: str) -> Tuple[str, int]:
return (product, len(product))
def all_data_raw(product_data: Collect[Tuple[str, int]]) -> Dict[str, int]:
return {product: pd for product, pd in product_data}
def all_data(all_data_raw: Dict[str, int]) -> int:
return sum(all_data_raw.values())
def main(all_data: int, all_data_raw: Dict[str, int]) -> Tuple[int, int]:
return (all_data, all_data_raw['Hat'])
```
This pattern gives you the desired combined result (all_data) and also a
per-path result keyed by product (all_data_raw) that you can slice from.
However, it breaks down in a few ways. (1) You need to pass a tuple containing
the value the path is parameterized on through every function. (2) It only
exposes each path's terminal node result, not its intermediate results, and the
terminal result is not necessarily the one you want. (3) If you only ever want
one specific path's result, you still have to compute all the others, wasting
compute (alternatively you could reconfigure your driver to avoid this, but that
also adds complexity).
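To make drawback (3) concrete, here is a plain-Python sketch (no Hamilton involved; the `CALLS` log is added purely for illustration) that counts how many per-path evaluations it takes to get the 'Hat' result through the collect-then-slice pattern, compared with evaluating that one path directly.

```python
from typing import Dict, List, Tuple

CALLS: List[str] = []  # illustrative log of every per-path evaluation


def product_data(product: str) -> Tuple[str, int]:
    CALLS.append(product)
    return product, len(product)


def all_data_raw(products: List[str]) -> Dict[str, int]:
    # Collect-style workaround: every path is evaluated before any can be sliced out.
    return dict(product_data(p) for p in products)


hat_via_workaround = all_data_raw(["Hat", "Shirt", "Shoes"])["Hat"]
evaluated_for_workaround = len(CALLS)  # 3 evaluations to obtain one value

CALLS.clear()
hat_in_isolation = product_data("Hat")[1]  # what isolating a single path would cost
evaluated_in_isolation = len(CALLS)  # 1 evaluation
```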
### Proposal:
The idea is still rough and not fully fleshed out, but I would propose an
additional syntax for defining parallelizable paths. A simple example
illustrates it best:
```
#################################################
# Existing Hamilton API
def product() -> Parallelizable[str]:
    return ['Hat', 'Coat', ...]

def product_cost_raw(product: str) -> int:
    return len(product)

def product_cost(product_cost_raw: int) -> int:
    return product_cost_raw + 1

def total_cost_raw(product_cost: Collect[int]) -> int:
    return sum(product_cost)

def total_cost(total_cost_raw: int) -> int:
    return total_cost_raw + 1

driver = driver.Builder()…
driver.execute(['total_cost'])
#################################################
# Proposed additional API: functions on a parallelized path take the driver
# and the parallelized parameter as arguments
def product_cost_raw(driver, product: str) -> int:
    return len(product)

def product_cost(driver, product: str) -> int:
    return driver.call('product_cost_raw', product=product) + 1

def total_cost_raw(driver) -> int:
    # parallelization happens here as a regular for loop
    product_costs = [driver.call('product_cost', product=product)
                     for product in ['Hat', 'Coat', ...]]
    product_costs = Collect[int](product_costs)
    return sum(product_costs)

def total_cost(total_cost_raw: int) -> int:  # remains the same
    return total_cost_raw + 1

driver = driver.Builder()…
driver.execute(['total_cost'])
# can now run an individual parallelized path in isolation
driver.execute('product_cost', product='Hat')
#################################################
```
The idea is that when you want to branch, your Hamilton function receives a
reference to the Hamilton driver itself. Instead of encoding dependencies in the
function arguments, the function "calls" them through the driver, and `Collect`
acts as a delimiter meaning "stop defining dependencies and collect their
resolved values now". These calls would be used during the graph build: instead
of inspecting the function arguments, Hamilton would run the function and follow
its driver calls to its dependencies, each of which could be either another
driver-aware function or a standard Hamilton function. For parallelization, this
means a fan-out is simply a regular Python for loop!
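A toy sketch of the call-following idea, under loudly stated assumptions: `ToyDriver` is a hypothetical class, not a Hamilton API, and every function here takes the driver (including `total_cost`, which in the proposal stays a regular Hamilton function). It resolves dependencies by following `driver.call` invocations, memoizes each (function, arguments) path, and records caller/callee edges. Unlike the proposal, it discovers edges while executing rather than in a separate graph-build pass, runs serially, and does not model `Collect`. The elided product list is replaced by a hypothetical two-item list.

```python
from typing import Any, Callable, Dict, List, Set, Tuple


class ToyDriver:
    """Hypothetical stand-in for the proposed driver-aware API."""

    def __init__(self, funcs: Dict[str, Callable[..., Any]]) -> None:
        self.funcs = funcs
        self.cache: Dict[Tuple[str, Tuple[Tuple[str, Any], ...]], Any] = {}
        self.edges: Set[Tuple[str, str]] = set()  # (caller, callee) pairs seen so far
        self._stack: List[str] = []  # functions currently being evaluated

    def call(self, name: str, **kwargs: Any) -> Any:
        # Record a dependency edge from whichever function made this call.
        if self._stack:
            self.edges.add((self._stack[-1], name))
        key = (name, tuple(sorted(kwargs.items())))
        if key not in self.cache:  # memoize each (function, arguments) path
            self._stack.append(name)
            try:
                self.cache[key] = self.funcs[name](self, **kwargs)
            finally:
                self._stack.pop()
        return self.cache[key]

    # Requesting a top-level node uses the same mechanism as an inner call.
    execute = call


def product_cost_raw(driver: ToyDriver, product: str) -> int:
    return len(product)


def product_cost(driver: ToyDriver, product: str) -> int:
    return driver.call("product_cost_raw", product=product) + 1


def total_cost_raw(driver: ToyDriver) -> int:
    # The fan-out is an ordinary loop; each iteration is one path.
    costs = [driver.call("product_cost", product=p) for p in ["Hat", "Coat"]]
    return sum(costs)


def total_cost(driver: ToyDriver) -> int:
    return driver.call("total_cost_raw") + 1


FUNCS = {f.__name__: f for f in (product_cost_raw, product_cost, total_cost_raw, total_cost)}

dr = ToyDriver(FUNCS)
print(dr.execute("total_cost"))  # 10

isolated = ToyDriver(FUNCS)
print(isolated.execute("product_cost", product="Hat"))  # 4; only the 'Hat' path runs
```

In the isolated run only two entries end up in the cache (`product_cost` and `product_cost_raw` for 'Hat'), which is the "run one path without building the others" behavior the proposal is after.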
**This unlocks a few benefits:**
(1) similar to plain Python - this closely mirrors how a naive plain Python
implementation would be written. Replace `driver.call` with a direct Python call,
remove the `Collect` calls, and you have standard Python code. For loops look the
same, arguments are passed the same way, etc., so it's easy for users to
understand what's going on.
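As a sketch of that translation, here is the proposed example with each `driver.call` turned into a direct call and the `Collect` wrapper dropped. The hypothetical `PRODUCTS` list stands in for the elided `['Hat', 'Coat', ...]`.

```python
from typing import List

PRODUCTS: List[str] = ["Hat", "Coat"]  # stand-in for the elided product list


def product_cost_raw(product: str) -> int:
    return len(product)


def product_cost(product: str) -> int:
    # driver.call('product_cost_raw', product=product) becomes a direct call.
    return product_cost_raw(product) + 1


def total_cost_raw() -> int:
    # Same for loop as the proposal, without the Collect wrapper.
    product_costs = [product_cost(product) for product in PRODUCTS]
    return sum(product_costs)


def total_cost() -> int:
    return total_cost_raw() + 1


print(total_cost())  # 10
```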
(2) nested parallelization possible - you could simply do something like this:
```
def C(driver, x):
    return ...

def B(driver, x):
    cs = [driver.call('C', x=i) for i in [1, 2, 3]]
    cs = Collect[int](cs)
    return sum(cs) + x

def A(driver):
    bs = [driver.call('B', x=i) for i in [1, 2, 3]]
    bs = Collect[int](bs)
    return sum(bs)
```
(3) individual paths can be called - e.g. `driver.execute('B', x=1)` (or
`driver.call` within another Hamilton function) runs a single path without
building all the other possible paths and without altering or reconfiguring the
driver.
(4) static graph - with Parallelizable, the parallelized branches can only be
known at run time. With this approach the paths can be captured in a static
graph during the graph build, which opens the door to further optimizations and
inspections.
I'd be happy to hear the team's thoughts and discuss further. Thanks!
GitHub link: https://github.com/apache/hamilton/discussions/1412