GitHub user gidhubuser255 edited a discussion: Parallelization Enhancement Ideas

Hi, while working with Hamilton's Parallelizable/Collect capabilities, I've come across a couple of limitations that I'm having trouble finding adequate workarounds for. I have some ideas that could address them, and I would like to discuss them with the Hamilton team.

### The limitations:
1) **Nested Parallelizable** - something like this does not work:
```
from hamilton.htypes import Collect, Parallelizable

def product_line() -> Parallelizable[str]:
    return ['Cars', 'Trucks']

# Nested fan-out inside the product_line fan-out: this is the part that fails
def product(product_line: str) -> Parallelizable[str]:
    return {'Cars': ['Camry', 'Corolla'], 'Trucks': ['Tacoma', 'Tundra']}[product_line]

def product_data(product_line: str, product: str) -> int:
    return len(product_line + product)

def product_line_data(product_data: Collect[int]) -> int:
    return sum(product_data)

def all_data(product_line_data: Collect[int]) -> int:
    return sum(product_line_data)
```

The workaround I considered was flattening to a single Parallelizable that returns tuples of (product_lineX, productY). This doesn't work well, because combining/processing the data for both levels then has to happen in one function (all_data). That means you lose the parallelization benefits at the product_line level (negligible in this simple case, but not in all real-world cases), and the logic in all_data becomes more complicated, since it is now responsible for parsing/processing multiple levels.

2) **Isolate Individual Parallelizable Path** - previously discussed in the issue below, where it was marked "not planned". However, I'd like to reopen the discussion given the context of the suggestions in this thread.
https://github.com/apache/hamilton/issues/1029

The workaround I considered here was something like this:
```
from typing import Dict, Tuple

from hamilton.htypes import Collect, Parallelizable

def product() -> Parallelizable[str]:
    return ['Hat', 'Shirt', 'Shoes']

# Each branch returns its own key alongside the value so it can be found later
def product_data(product: str) -> Tuple[str, int]:
    return (product, len(product))

def all_data_raw(product_data: Collect[Tuple[str, int]]) -> Dict[str, int]:
    return {product: pd for product, pd in product_data}

def all_data(all_data_raw: Dict[str, int]) -> int:
    return sum(all_data_raw.values())

def main(all_data: int, all_data_raw: Dict[str, int]) -> Tuple[int, int]:
    return (all_data, all_data_raw['Hat'])
```
This pattern gives you the desired combined result (all_data), and also a per-path result keyed by product (all_data_raw) that you can slice from. However, it breaks down in a few ways:
(1) You need to pass a tuple containing the value the path is parameterized on through every function.
(2) You can't access intermediate results in the path, only the terminal node's result, which is not necessarily the path result you want.
(3) If you only ever want one specific path's result, you still need to compute all the others, which wastes compute (alternatively you could reconfigure your driver to avoid this, but that also adds complexity).
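Point (3) can be seen in a small plain-Python simulation of the workaround (not Hamilton itself; the `calls` list is a hypothetical instrumentation hook that records which branches ran). Slicing out the 'Hat' result still runs every branch:

```python
from typing import Dict, List, Tuple

calls: List[str] = []  # records which branches were actually computed


def product_data(product: str) -> Tuple[str, int]:
    calls.append(product)
    return (product, len(product))


def all_data_raw(results: List[Tuple[str, int]]) -> Dict[str, int]:
    return {product: value for product, value in results}


# Simulate the fan-out over every product, then slice out the one path we wanted.
raw = all_data_raw([product_data(p) for p in ["Hat", "Shirt", "Shoes"]])
hat_result = raw["Hat"]
print(hat_result, calls)  # 3 ['Hat', 'Shirt', 'Shoes']
```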

### Proposal:
The idea is still rough and not fully fleshed out, but I would propose a new, additional syntax for defining parallelizable paths. It's probably best to start with a simple example to illustrate the idea:

```
#################################################

# Existing Hamilton API

from hamilton import driver
from hamilton.htypes import Collect, Parallelizable

def product() -> Parallelizable[str]:
    return ['Hat', 'Coat', ...]

def product_cost_raw(product: str) -> int:
    return len(product)

def product_cost(product_cost_raw: int) -> int:
    return product_cost_raw + 1

def total_cost_raw(product_cost: Collect[int]) -> int:
    return sum(product_cost)

def total_cost(total_cost_raw: int) -> int:
    return total_cost_raw + 1

dr = driver.Builder()…
dr.execute(['total_cost'])

#################################################

# Proposed additional API - pass the driver and the parallelized parameter
# as arguments to parallelized-path functions

def product_cost_raw(driver, product: str) -> int:
    return len(product)

def product_cost(driver, product: str) -> int:
    return driver.call('product_cost_raw', product=product) + 1

def total_cost_raw(driver) -> int:
    # parallelization happens here as a regular for loop
    product_costs = [driver.call('product_cost', product=product)
                     for product in ['Hat', 'Coat', ...]]
    product_costs = Collect[int](product_costs)
    return sum(product_costs)

def total_cost(total_cost_raw: int) -> int:  # remains the same
    return total_cost_raw + 1

dr = driver.Builder()…
dr.execute('total_cost')
# can now run an individual parallelized path in isolation
dr.execute('product_cost', product='Hat')

#################################################
```

The idea is that when you want to branch, your Hamilton function receives a reference to the Hamilton driver itself. Instead of encoding dependencies in the function's arguments, the function declares them by "calling" other nodes through the driver, and `Collect` serves as the delimiter that says "stop declaring dependencies and collect their resolved values now." These calls would be used during the graph build: rather than inspecting the function's arguments, the driver runs the function and follows its `driver.call`s to the dependencies, each of which can be either another driver-style function or a standard Hamilton function. For parallelization, this means a fan-out is represented by a plain Python for loop!
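Here is a deliberately small, sequential sketch of how a driver could discover dependencies by following `driver.call`. `ToyDriver`, its `call`/`execute` methods, and the `edges` record are hypothetical names for illustration, not Hamilton's API. It runs branches one at a time (no parallelism), uses a plain list where the proposal uses `Collect`, and records the graph while the functions execute:

```python
from typing import Any, Callable, Dict, List, Tuple

# A node is one function evaluated with one set of keyword arguments.
NodeKey = Tuple[str, Tuple[Tuple[str, Any], ...]]


class ToyDriver:
    """Hypothetical driver that discovers dependencies by following driver.call()."""

    def __init__(self, functions: List[Callable[..., Any]]) -> None:
        self.functions: Dict[str, Callable[..., Any]] = {f.__name__: f for f in functions}
        self.edges: List[Tuple[NodeKey, NodeKey]] = []  # (caller, callee) pairs
        self._stack: List[NodeKey] = []  # nodes currently being evaluated
        self._cache: Dict[NodeKey, Any] = {}  # memoized result per node

    def call(self, name: str, **kwargs: Any) -> Any:
        key: NodeKey = (name, tuple(sorted(kwargs.items())))
        if self._stack:
            # Record that the node currently running depends on this one.
            self.edges.append((self._stack[-1], key))
        if key not in self._cache:
            self._stack.append(key)
            try:
                # Every registered function takes the driver as its first argument.
                self._cache[key] = self.functions[name](self, **kwargs)
            finally:
                self._stack.pop()
        return self._cache[key]

    def execute(self, name: str, **kwargs: Any) -> Any:
        # Entry point: evaluates only the requested node and whatever it calls.
        return self.call(name, **kwargs)


def product_cost_raw(driver: ToyDriver, product: str) -> int:
    return len(product)


def product_cost(driver: ToyDriver, product: str) -> int:
    return driver.call("product_cost_raw", product=product) + 1


def total_cost_raw(driver: ToyDriver) -> int:
    # The fan-out is an ordinary loop; the summed values play the role of Collect.
    return sum(driver.call("product_cost", product=p) for p in ["Hat", "Coat"])


full = ToyDriver([product_cost_raw, product_cost, total_cost_raw])
print(full.execute("total_cost_raw"))  # 9: (3 + 1) + (4 + 1)

single = ToyDriver([product_cost_raw, product_cost, total_cost_raw])
print(single.execute("product_cost", product="Hat"))  # 4; the Coat path never runs
print(len(full.edges), len(single.edges))  # 4 1
```

Because each `(name, kwargs)` pair is its own node, the 'Hat' and 'Coat' branches show up as distinct nodes in `edges`, and executing `product_cost` for 'Hat' touches only that path.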

**This unlocks a few benefits:**

(1) Similar to plain Python - this closely follows how a naive plain-Python implementation would be written. Replace `driver.call` with a direct Python call, remove the `Collect` calls, and you have standard Python code. Plain for loops look the same, arguments are passed the same way, etc., so it's easy for users to understand what's going on.
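Applying that recipe to the proposed `total_cost` example gives the following plain Python (two products are used for illustration, since the original list is elided):

```python
def product_cost_raw(product: str) -> int:
    return len(product)


def product_cost(product: str) -> int:
    return product_cost_raw(product) + 1  # was driver.call('product_cost_raw', ...)


def total_cost_raw() -> int:
    # The loop a driver would fan out in parallel; no Collect wrapper needed here.
    product_costs = [product_cost(product) for product in ["Hat", "Coat"]]
    return sum(product_costs)


def total_cost(total_cost_raw: int) -> int:
    return total_cost_raw + 1


print(total_cost(total_cost_raw()))  # 10
```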

(2) Nested parallelization is possible - you could simply write something like this:
```
def C(driver, x: int) -> int:
    return ...

def B(driver, x: int) -> int:
    # inner fan-out
    cs = [driver.call('C', x=i) for i in [1, 2, 3]]
    cs = Collect[int](cs)
    return sum(cs) + x

def A(driver) -> int:
    # outer fan-out; each B contains its own fan-out over C
    bs = [driver.call('B', x=i) for i in [1, 2, 3]]
    bs = Collect[int](bs)
    return sum(bs)
```
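For comparison, here is one way the nested A/B/C structure could be executed in parallel, sketched with the standard library's `concurrent.futures` rather than Hamilton: each level's loop becomes a `map` over a thread pool, and the collected list corresponds to what `Collect[int]` marks. `C`'s body is elided above, so `x * 10` is a hypothetical stand-in:

```python
from concurrent.futures import ThreadPoolExecutor
from typing import List


def C(x: int) -> int:
    return x * 10  # hypothetical body; the example above elides it


def B(x: int) -> int:
    # The inner fan-out gets its own pool, so it never waits for a slot in the outer pool.
    with ThreadPoolExecutor() as pool:
        cs: List[int] = list(pool.map(C, [1, 2, 3]))  # collect point for C
    return sum(cs) + x


def A() -> int:
    with ThreadPoolExecutor() as pool:
        bs: List[int] = list(pool.map(B, [1, 2, 3]))  # collect point for B
    return sum(bs)


print(A())  # 186
```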
(3) Individual paths can be called - e.g. `driver.execute('B', x=1)` (or `driver.call` within another Hamilton function) runs one path without building all the other possible paths and without altering/reconfiguring the driver.

(4) A static graph is possible - with Parallelizable, the parallelized branches are necessarily known only at run time. With this approach, the paths can be captured in a static graph during the graph build, which opens up further optimizations and inspections.
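As an example of the kind of inspection a static graph could allow, the sketch below assumes the build step produced a dependency map for the `total_cost` example with two products (node names such as `product_cost[Hat]` are hypothetical). It uses the standard library's `graphlib` (Python 3.9+) to group nodes into stages whose members have no dependencies on each other and could therefore run concurrently:

```python
from graphlib import TopologicalSorter
from typing import Dict, List, Set

# Hypothetical static graph: node -> set of nodes it depends on.
graph: Dict[str, Set[str]] = {
    "total_cost": {"total_cost_raw"},
    "total_cost_raw": {"product_cost[Hat]", "product_cost[Coat]"},
    "product_cost[Hat]": {"product_cost_raw[Hat]"},
    "product_cost[Coat]": {"product_cost_raw[Coat]"},
    "product_cost_raw[Hat]": set(),
    "product_cost_raw[Coat]": set(),
}


def parallel_stages(graph: Dict[str, Set[str]]) -> List[List[str]]:
    """Group nodes into stages; every node in a stage depends only on earlier stages."""
    sorter = TopologicalSorter(graph)
    sorter.prepare()
    stages: List[List[str]] = []
    while sorter.is_active():
        ready = sorted(sorter.get_ready())  # all nodes whose dependencies are done
        stages.append(ready)
        sorter.done(*ready)
    return stages


for stage in parallel_stages(graph):
    print(stage)
```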

I'd be happy to hear the team's thoughts and discuss further. Thanks!



GitHub link: https://github.com/apache/hamilton/discussions/1412
