Architectural challenges on integrating Rappi

gui commited 3 years ago · architecture 🐍 Python 🧩 Design Patterns microservices django ·

I talked previously about some challenges I faced when we integrated ifood at Mimic. The truth is that it was not the first to be integrated, it was Rappi.

If you don't remember, Mimic integrates with several food aggregators to provide the best customer experience at the lowest possible cost. So far we have integrations with Rappi, ifood, and Uber (besides other logistics integrations).

Rappi had its own challenges and decisions that made the work easier to integrate, I believe it's worthy to share some concepts and strategies used to scale and deliver it fast. We were able to finish the whole integration in a month! Before jumping into the tech (aka fun) part, let's understand the flow:

Rappi Flow

The Rappi-order flow

The customer orders through the app, the order is polled by a worker called fetcher, it forwards (through Kafka) to a microservice named Rappi Adapter which communicates with Rappi and accepts or rejects the order.

The Rappi-delivery flow

Once the order is accepted it starts being produced, then Rappi assigns a courier and then we start receiving events through webhooks.

Such webhooks are received by our microservice and handled properly. This is the expected "happy status" flow of a successful delivery:

  1. NOT_ASSIGNED (no courier assigned yet)
  2. ASSIGNED (a courier is on the way to the kitchen - this payload contains courier data)
  3. IN_STORE (the courier arrived to our kitchen)
  4. IN_TRANSIT (the courier took the order)
  5. AT_DESTINATION (the courier is waiting for the customer)
  6. DELIVERED (done!)

Surprise: Bundles!

Sometimes 2 customers that live close to each other decide to order at the same time. When this happens, Rappi decides to optimize and get the same courier to deliver 2 orders.

Unfortunately, there's no specific webhook to tell us that! The only way to discover is by asking the courier how many orders he needs to pick up OR estimating it. You can bet we decided to estimate that ourselves. If we don't estimate, we lose performance and prioritize poorly.

Let's say we have the following orders in sequence: A, B, and C

As you can see, order A and C are bundled and should be delivered by Bob! But B is a big order which takes longer to produce. If our kitchen is not aware of that, we're going to deliver a delicious burger to Bob from order A, and Bob will have to wait until they finish order B.

Joe laughs at Bob as he leaves to deliver order B. In the meantime, items from order A are getting colder.

That's a terrible experience for our customer. That's why we need to tell our kitchen: Cook orders A and C together!!!

But how to find it out? We realized that whenever we receive a "Courier assignment" (with a unique courier id) event we can query our database to find out whether the courier has been assigned to any other order which has not been delivered yet.

That's what great software is all about! The kitchen is happy without couriers complaining about the long wait. Couriers are happy because they can deliver the order faster. Customers are happy because they eat it fresh - yummy.

Tech

Cool, we know the business, it's time to code.

This post will not encompass the details of fetcher because it's boring. It's just a damn Kubernetes pod polling from time to time orders from Rappi - and that's pretty much it!

Here at Mimic, we use some boring stack to develop and deliver value as fast as possible. Basically, we use:

Webhooks

All webhooks incoming from Rappi are very similar:

Example: 10/couriers/at_hub/ represents an update to order #10 ( 10 ) to say the courier arrived at the kitchen ( we frequently call it hub - at_hub ).

Do you remember that we just need to update the order status in our database so we can let the kitchen know how to prioritize? Great, we use Django urls + views + serializers to make it with just a few lines.

See urls:

ALLOWED_METHODS_UPDATE_ORDER = {"patch": "update"}

urlpatterns = [
    path(
        "<str:aggregator_id>/couriers/assign/",
        views.CourierAssignEventView.as_view(ALLOWED_METHODS_UPDATE_ORDER),
        name="courier_assign",
        kwargs={"status": models.CourierStatus.ASSIGNED},
    ),
    path(
        "<str:aggregator_id>/couriers/at_hub/",
        views.CourierStatusEventView.as_view(ALLOWED_METHODS_UPDATE_ORDER),
        name="courier_at_hub",
        kwargs={"status": models.CourierStatus.AT_HUB},
    ),
    path(
        "<str:aggregator_id>/couriers/in_transit/",
        views.CourierStatusEventView.as_view(ALLOWED_METHODS_UPDATE_ORDER),
        name="courier_in_transit",
        kwargs={"status": models.CourierStatus.IN_TRANSIT},
    ),
    path(
        "<str:aggregator_id>/couriers/at_destination/",
        views.CourierStatusEventView.as_view(ALLOWED_METHODS_UPDATE_ORDER),
        name="courier_at_destination",
        kwargs={"status": models.CourierStatus.AT_DESTINATION},
    ),
    path(
        "<str:aggregator_id>/couriers/delivered/",
        views.CourierStatusEventView.as_view(ALLOWED_METHODS_UPDATE_ORDER),
        name="courier_delivered",
        kwargs={"status": models.CourierStatus.DELIVERED},
    ),
]

You may notice we pass a special argument "kwargs" which maps the status to an enum. Also, note how we reuse our views for most of the status!

Let's take a look at how views work, they should be complex, right?

class BaseOrderEventView(ABC, mixins.UpdateModelMixin, viewsets.GenericViewSet):
    authentication_classes = (authentication.RappiAuthentication,)
    permission_classes = (authentication.IsRappiAuthenticated,)
    queryset = Order.objects.all()
    lookup_field = "aggregator_id"

    @property
    @abstractmethod
    def status_property(self):
        pass

    def get_kwargs_context(self):
        return {self.status_property: self.kwargs.get("status")}

    def get_serializer(self, instance, data, **kwargs):
        updated_data = {**data, **self.get_kwargs_context()}
        return super().get_serializer(instance, data=updated_data, **kwargs)

    def update(self, request, *args, **kwargs):
        status = self.kwargs.get("status")
        logger.debug(
            f"Trying to partially update an order with payload: {self.request.data}"
        )
        response = super().update(request, *args, **kwargs)

        logger.info(
            f"Successfully updated order #{self.kwargs.get(self.lookup_field)} "
            f" as {status}"
        )
        return response


class KitchenStatusEventView(BaseOrderEventView):
    """ API Handler for events related to the order """
    
    serializer_class = KitchenStatusEventSerializer
    status_property = "kitchen_status"


class CourierStatusEventView(BaseOrderEventView):
    """ API Handler for events related to the courier """

    serializer_class = CourierStatusEventSerializer
    status_property = "courier_status"


class CourierAssignEventView(CourierStatusEventView):
    """ API Handler specifically for courier assign event """

    serializer_class = CourierAssignEventSerializer

Note how we apply Open/Closed principles by extending the base behavior (update). We just need to specify which serializer_class to use and which status_property to update. The enums are extracted from the urls and passed down to serializers as extra args.

If we wish to handle a new event related to kitchen status what we would do? You bet right, extend it and implement the new custom behavior in there! In case you don't remember, the Open/Closed principle states:

Code should be open for extension, closed for modification

Handling bundles without "if"s

I know, I know. It's so freaking easy to add an if:

courier_id = 8937
has_other_orders = find_other_orders(courier_id)
if has_other_orders:
    create_bundle()

Unfortunately, although easy, it wouldn't be as easy to scale.

Let's say later on we want to reward couriers that deliver orders under 10 minutes with a delicious burger. If you go "the easy way", it means you're very likely repeating the formula:

courier_id = 8937
has_other_orders = find_other_orders(courier_id)
if has_other_orders:
    create_bundle(courier_id)

is_worthy_burger = check_courier_is_worthy(courier_id)
if is_worthy_burger:
    generate_bonus_order(courier_id)

It does not just increases the complexity of software as it also breaks the Open/Closed principle we discussed before.

We're going to use the Strategy pattern kindly named "Side Effects" to represent special events happening under specific conditions.

The expected flow would be for a service (responsible for handling business logic) to generate instances named side effects, execute them and finally finish the transaction.

The class/hierarchy is not complex, see:

We have an abstract class, followed by 2 inheritances. You may think it's funny to have a NoSideEffect but it's just to ensure our service always have something to execute. After all, doing nothing is a decision as well.

The context object in there is a simple dataclass that collects info that should answer: "what's going on?". Don't believe me? Take a look yourself:

@dataclass
class UpdateContext:
    """ Represents the update in progress """

    order: Order
    transiting_to: str

See? Simple!

Now, let's see the "Create side effect" step:

class SideEffectFactory:
    def create(self, orders: Iterable[Order], transiting_to: str):
        for order in orders:
            context = UpdateContext(order, transiting_to)

            if context.transiting_to == CourierStatus.IN_TRANSIT:
                yield side_effects.CourierInTransitSideEffect(context)
            if context.transiting_to == CourierStatus.ASSIGNED:
                yield side_effects.CourierAssignedSideEffect(context)

            yield side_effects.NoSideEffect(context)

As you can see, anytime our service tries to update, it creates a side effect through a factory. This factory use generators because we don't want to limit ourselves with a transition producing only ONE side effect. Considering the courier reward scenario we mentioned earlier. We should be able to produce a bundle and reward the courier at the same time - two or N side effects per transition.

So our OrderService.update Β method is somewhat like this:

def update(self, orders: Iterable[Order], to_status: str):
    side_effects = self.side_effect_factory.create(orders, to_status)
    self._execute_side_effects(side_effects)
    self._batch_update(orders)

This is very powerful because we are very unlikely going to change this service again. If new business rules are added, we can always extend our strategies (or side_effects), appending a rule to our factory without modifying what already worked.

That's gold!

If you or your team had to handle all these API events. Would you do better/faster? If yes, then get in touch! I'm eager to hear. πŸ˜„

  • LinkedIn
  • Tumblr
  • Reddit
  • Google+
  • Pinterest
  • Pocket