[ 
https://issues.apache.org/jira/browse/FLINK-21211?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17281456#comment-17281456
 ] 

Chesnay Schepler commented on FLINK-21211:
------------------------------------------

You may have a better chance of getting feedback on the user mailing list.

> Looking for reviews on a framework based on flink-statefun 
> -----------------------------------------------------------
>
>                 Key: FLINK-21211
>                 URL: https://issues.apache.org/jira/browse/FLINK-21211
>             Project: Flink
>          Issue Type: New Feature
>            Reporter: Zixuan Rao
>            Priority: Minor
>
> Hi, I am currently developing a framework targeting back end state 
> management. To ensure exactly-once processing of events in back end, I intend 
> to use Flink Stateful Functions runtime in combination with Python's asyncio. 
> I hope to receive some feedback. 
> The following code shows a draft example of writing a back-end microservice 
> using my framework. It is intended to be equivalent to (interchangeable with) 
> the flink-statefun examples/ridesharing example. The idea is that "Event" is 
> reducible to an async function call, and external egress can be emitted by 
> saving an object. This preserves the exactly-once features of Flink-statefun 
> while adding a great deal of readability to the code. 
> Reviews are appreciated. Thank you! 
> {code}
> """
> Equivalent implementation for flink stateful functions example - ridesharing
> ref: 
> https://github.com/apache/flink-statefun/blob/master/statefun-examples/statefun-ridesharing-example/statefun-ridesharing-example-functions/src/main/java/org/apache/flink/statefun/examples/ridesharing/FnDriver.java
> """
> from onto.models.base import Serializable
> """
> Rewrite callback-style code to async-await: 
> ref: 
> https://www.coreycleary.me/how-to-rewrite-a-callback-function-in-promise-form-and-async-await-form-in-javascript
>  
> """
> from onto.domain_model import DomainModel
> from onto.attrs import attrs
> class RideshareBase(DomainModel):
>     pass
> class Passenger(RideshareBase):
>     async def request_ride(self, start_geo_cell, end_geo_cell):
>         r = Ride.create()  # TODO: implement create
>         await r.passenger_joins(
>             passenger=self,
>             start_geo_cell=start_geo_cell,
>             end_geo_cell=end_geo_cell
>         )
>     class PassengerMessage(DomainModel):
>         passenger = attrs.relation('Passenger')
>         class RideFailedMessage(Serializable):
>             ride = attrs.relation('Ride')
>         ride_failed = attrs.embed(RideFailedMessage).optional
>         class DriverHasBeenFoundMessage(Serializable):
>             driver = attrs.relation('Driver')
>             driver_geo_cell = attrs.relation('GeoCell')
>         driver_found = attrs.embed(RideFailedMessage).optional
>         class RideHasStarted(Serializable):
>             driver = attrs.relation('Driver')
>         ride_started = attrs.embed(RideHasStarted).optional
>         class RideHasEnded(Serializable):
>             pass  # TODO: make sure that empty class works
>         ride_ended = attrs.embed(RideHasEnded).optional
>     async def ride_failed(self, ride: 'Ride'):
>         message = self.PassengerMessage.new(
>             passenger=self,
>             ride_failed=self.PassengerMessage.RideFailedMessage.new(
>                 ride=ride
>             )
>         )
>         message.save()
>     async def driver_joins_ride(self, driver: 'Driver', driver_geo_cell: 
> 'GeoCell'):
>         message = self.PassengerMessage.new(
>             passenger=self,
>             driver_found=self.PassengerMessage.DriverHasBeenFoundMessage.new(
>                 driver=driver,
>                 driver_geo_cell=driver_geo_cell
>             )
>         )
>         message.save()
>     async def ride_started(self, driver: 'Driver'):
>         message = self.PassengerMessage.new(
>             passenger=self,
>             ride_started=self.PassengerMessage.RideHasStarted.new(
>                 driver=driver
>             )
>         )
>         message.save()
>     async def ride_ended(self):
>         message = self.PassengerMessage.new(
>             passenger=self,
>             ride_started=self.PassengerMessage.RideHasEnded.new()
>         )
>         message.save()
> class DriverRejectsPickupError(RideshareBase, Exception):
>     """Raised by a driver that is already taken when asked to pick up a ride.
>
>     Carries the rejecting driver and the ride that was refused.
>     """
>     driver = attrs.relation(dm_cls='Driver')
>     ride = attrs.relation(dm_cls='Ride')
> class Driver(RideshareBase):
>     is_taken: bool = attrs.required
>     current_ride = attrs.relation(dm_cls='Ride').optional
>     current_location: 'GeoCell' = attrs.relation(dm_cls='GeoCell')
>     @is_taken.getter
>     def is_taken(self):
>         # TODO: make better
>         return self.current_ride is not None
>     async def pickup_passenger(self, ride: 'Ride', passenger: Passenger,
>                 passenger_start_cell: 'GeoCell',
>                 passenger_end_cell: 'GeoCell'):
>         if self.is_taken:
>             raise DriverRejectsPickupError(driver=self, ride=ride)
>         self.current_ride = ride
>         # "    // We also need to unregister ourselves from the current geo 
> cell we belong to."
>         if geo_cell := self.current_location:
>             await geo_cell.leave_cell(driver=self)
>         await ride.driver_joins(driver=self, 
> driver_location=self.current_location)
>         message = self.DriverMessage.new(
>             driver=self,
>             pickup_passenger=self.DriverMessage.PickupPassengerMessage.new(
>                 passenger=passenger,
>                 start_geo_location=passenger_start_cell,
>                 end_geo_location=passenger_end_cell
>             )
>         )
>         message.save()
>     class DriverMessage(RideshareBase):
>         driver = attrs.relation('Driver')
>         class PickupPassengerMessage(Serializable):
>             ride = attrs.relation('Ride')  # TODO: maybe passenger_id
>             start_geo_location = attrs.relation('GeoCell')
>             end_geo_location = attrs.relation('GeoCell')
>         pickup_passenger = attrs.embed(PickupPassengerMessage)
>     async def ride_has_started(self):
>         await self.current_ride.ride_started(driver=self, 
> driver_geo_cell=self.current_location)
>     async def ride_has_ended(self):
>         await self.current_ride.ride_ended()
>     async def location_is_updated(self, current_geo_cell: 'GeoCell'):
>         # TODO: maybe switch to embed:     final int updated = 
> locationUpdate.getLocationUpdate().getCurrentGeoCell();
>         updated = current_geo_cell
>         last = self.current_location
>         if last is None:
>             self.current_location = updated
>             await updated.join_cell()
>             return
>         elif last == updated:
>             return
>         else:
>             self.current_location = updated
> class Ride(RideshareBase):
>     passenger = attrs.relation(dm_cls=Passenger)
>     driver = attrs.relation(dm_cls=Driver)
>     async def ride_started(self, driver: Driver, driver_geo_cell: 'GeoCell'):
>         await self.passenger.ride_started(driver=driver)
>     async def passenger_joins(
>             self,
>             passenger: Passenger,
>             start_geo_cell: 'GeoCell',
>             end_geo_cell: 'GeoCell'
>     ):
>         self.passenger = passenger
>         MAX_RETRY = 5
>         # Ref: https://stackoverflow.com/a/7663441
>         for trial in range(MAX_RETRY):
>             if driver := start_geo_cell.get_driver():
>                 try:
>                     await driver.pickup_passenger(
>                         ride=self,
>                         passenger=passenger,
>                         passenger_start_cell=start_geo_cell,
>                         passenger_end_cell=end_geo_cell
>                     )
>                 except DriverRejectsPickupError as _:
>                     # TODO: NOTE difference from java impl
>                     """
>                     final int startGeoCell = 
> passenger.get().getStartGeoCell();
>                     String cellKey = String.valueOf(startGeoCell);
>                     context.send(FnGeoCell.TYPE, cellKey, 
> GetDriver.getDefaultInstance());
>                     """
>                     continue  # to retry
>                 else:
>                     break
>         else:
>             await passenger.ride_failed(ride=self)
>     async def driver_joins(self, driver, driver_location):
>         self.driver = driver
>         await self.passenger.driver_joins_ride(driver=driver, 
> driver_geo_cell=driver_location)
>     async def ride_ended(self, ):
>         await self.passenger.ride_ended()
>         self.passenger = None
>         self.driver = None
> class GeoCell(RideshareBase):
>     drivers: list = attrs.list(
>         value=attrs.relation(dm_cls=Driver)
>     )
>     async def get_driver(self) -> Driver:
>         if len(self.drivers) != 0:
>             next_driver = self.drivers[0]
>             return next_driver
>         else:
>             return None
>     async def leave_cell(self, driver: Driver):
>         self.drivers.remove(driver)
>     async def add_driver(self):
>         # TODO: mutated local variable vs mutated instance state;
>         #  may cause difference in behavior
>         if self.drivers is None:
>             self.drivers = list()
>         self.drivers.append(Driver)
>     join_cell = add_driver
> {code}



--
This message was sent by Atlassian Jira
(v8.3.4#803005)

Reply via email to