Building Pyrseia III: Server Middleware, Client Senders, CLI and InApp Validators

This is the third article in the Pyrseia series.

If you want to follow along with the code, this article refers to commit 5abf2eda9be06b7417395a50ef676454bbd8f667.

Server Middleware

I've added the concept of server middleware to Pyrseia. Taking a page from aiohttp's book, each server middleware is a coroutine that gets called with three arguments: the current request context, a pyrseia.Call instance (which holds the function name and arguments), and a coroutine that continues the chain. The type of this continuation coroutine is NextMiddleware, an alias for Callable[[CTXT, Call], Awaitable[Any]].

This gives middleware a very simple interface but a large amount of flexibility. Your middleware doesn't have to call the continuation coroutine; it can return a result or raise an error right then and there. Your middleware can also change the context and the Call instance it received in whatever way it wants before passing them on.

A very simple logging middleware could look like:

import logging

from pyrseia import Call

log = logging.getLogger(__name__)

async def logging_middleware(ctx, call: Call, next_middleware):
    log.info(f"Processing {call.name}")
    try:
        return await next_middleware(ctx, call)
    finally:
        log.info(f"Processed {call.name}")
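To illustrate the other two options mentioned above (short-circuiting and rewriting the call), here is a minimal, self-contained sketch. The Call dataclass, the compose helper and the handler are stand-ins invented for this example so it runs without Pyrseia; how the library actually chains middleware internally may well differ.

```python
import asyncio
from dataclasses import dataclass
from typing import Any, Awaitable, Callable, Sequence, Tuple


@dataclass(frozen=True)
class Call:
    """Stand-in for pyrseia.Call: a function name plus its arguments."""
    name: str
    args: Tuple[Any, ...] = ()


NextMiddleware = Callable[[Any, Call], Awaitable[Any]]


class Forbidden(Exception):
    """Hypothetical error raised when a call is rejected."""


async def allowlist_middleware(ctx: Any, call: Call, next_mw: NextMiddleware) -> Any:
    # Short-circuit: reject unknown methods without continuing the chain.
    if call.name not in {"add"}:
        raise Forbidden(call.name)
    return await next_mw(ctx, call)


async def abs_args_middleware(ctx: Any, call: Call, next_mw: NextMiddleware) -> Any:
    # Rewrite the call before passing it on.
    return await next_mw(ctx, Call(call.name, tuple(abs(a) for a in call.args)))


async def handler(ctx: Any, call: Call) -> Any:
    # Hypothetical end of the chain: the actual method implementation.
    return sum(call.args)


def compose(middleware: Sequence[Callable[..., Awaitable[Any]]],
            final: NextMiddleware) -> NextMiddleware:
    """Fold a list of middleware into one callable, first item outermost."""
    next_mw = final
    for mw in reversed(middleware):
        # Default arguments bind the current values of mw and next_mw.
        def step(ctx: Any, call: Call, mw=mw, next_mw=next_mw) -> Awaitable[Any]:
            return mw(ctx, call, next_mw)
        next_mw = step
    return next_mw


app = compose([allowlist_middleware, abs_args_middleware], handler)
```

Calling asyncio.run(app(None, Call("add", (-1, 2)))) runs both middleware and then the handler, while a call with any other name is rejected by the first middleware before anything else runs.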

You pass in a list of middleware when creating the server.

from aiohttp import web
from pyrseia import server  # assumed import location for server()
from pyrseia.aiohttp import create_aiohttp_app
from tests.calculator import Calculator

serv = server(Calculator, web.Request, middleware=[logging_middleware])
app = create_aiohttp_app(serv)
web.run_app(app)

Clients don't currently support middleware, but they should since it would be very useful for mixing in generic behavior. For example, the middleware could retry requests on certain errors or implement exponential backoff. Clients don't support request contexts though, so the API would be a tiny bit different.
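As a rough idea of what such a client middleware might look like, here is a sketch of retrying with exponential backoff. Nothing here exists in Pyrseia yet: the two-argument signature (call, next) with no context, the retrying factory and its parameters are all assumptions made for illustration.

```python
import asyncio
from typing import Any, Awaitable, Callable, Tuple, Type

# Hypothetical client-side continuation: no request context, just the call.
ClientNext = Callable[[Any], Awaitable[Any]]


def retrying(
    attempts: int = 3,
    base_delay: float = 0.1,
    retry_on: Tuple[Type[BaseException], ...] = (ConnectionError,),
) -> Callable[[Any, ClientNext], Awaitable[Any]]:
    """Build a middleware that retries failed calls with exponential backoff."""

    async def middleware(call: Any, next_mw: ClientNext) -> Any:
        for attempt in range(attempts):
            try:
                return await next_mw(call)
            except retry_on:
                if attempt == attempts - 1:
                    raise  # out of attempts, let the error propagate
                # Sleep base_delay, 2 * base_delay, 4 * base_delay, ...
                await asyncio.sleep(base_delay * 2 ** attempt)

    return middleware
```

Errors outside retry_on propagate immediately, so the middleware only masks failures the caller has explicitly marked as transient.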

Client Rearchitecture

I've refactored what used to be client adapters into two parts by extracting the logic for actually making the request. So now we have:

  • client adapters, which are async generators and essentially factories for framework-specific clients
  • client senders, which are coroutines that receive a framework-specific client, a Call instance and the type of the response, and are expected to perform the call and return a result

The adapters are supposed to be written once per framework, so one for aiohttp, one for httpx and so on.

Senders you are supposed to customize and replace to your heart's content. The currently available adapters take optional senders as arguments.
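The split can be pictured roughly like this. It is only a guess at the general shape, not Pyrseia's actual code: FakeSession stands in for a framework client such as aiohttp.ClientSession, and what a real adapter yields is an assumption.

```python
import asyncio
from contextlib import asynccontextmanager
from typing import Any, AsyncIterator, Awaitable, Callable


class FakeSession:
    """Stand-in for a framework-specific client."""

    def __init__(self) -> None:
        self.closed = False

    async def close(self) -> None:
        self.closed = True


Sender = Callable[[FakeSession, Any, type], Awaitable[Any]]


async def echo_sender(session: FakeSession, call: Any, type_: type) -> Any:
    # Hypothetical default sender: no network, just convert the call itself.
    return type_(call)


async def fake_adapter(
    sender: Sender = echo_sender,
) -> AsyncIterator[Callable[[Any, type], Awaitable[Any]]]:
    """Adapter as an async generator: set up, yield, then clean up."""
    session = FakeSession()
    try:
        async def send(call: Any, type_: type) -> Any:
            # The replaceable sender does the real work with the session.
            return await sender(session, call, type_)

        yield send
    finally:
        await session.close()


async def main() -> Any:
    # asynccontextmanager drives the generator through setup and cleanup.
    async with asynccontextmanager(fake_adapter)() as send:
        return await send("42", int)
```

The adapter owns the lifetime of the session, while the sender only decides how a single call is put on the wire, which is why the sender is the piece you would swap out.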

An aiohttp sender is defined as: Callable[[ClientSession, Call, Type[T]], Awaitable[T]]. In other words, it's something that gets an instance of aiohttp.ClientSession, an instance of Call and is supposed to make the request and return an instance of T (we get the T from the method signature). The default sender included with the aiohttp adapter is simple enough to be shown here, inline:

from aiohttp import ClientSession
from cattr import Converter
from msgpack import dumps, loads

converter = Converter()

# url and client_timeout are free variables here, taken from the enclosing scope.
async def s(session: ClientSession, call: Call, type):
    async with session.post(
        url,
        data=dumps(converter.unstructure(call)),
        timeout=client_timeout,
    ) as resp:
        return converter.structure(loads(await resp.read()), type)

It basically uses msgpack and cattrs to prepare a payload and posts it to a URL. Then it reads the response, pulls it through msgpack and cattrs again and returns it. It's essentially 3 lines of code, and as such can be replaced very easily if you want to use ujson or any other serialization tech. (Note we haven't actually gotten to error handling yet, so that part's unspecified.) One of our next steps should be to introduce a similar receiver concept for servers.
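Swapping the serialization could look something like the sketch below, which uses the standard library's json module instead of msgpack. To keep it runnable without a network, the HTTP POST is abstracted into a post coroutine (in a real aiohttp sender this would be session.post), and calling type_ on the decoded value is a crude stand-in for converter.structure. The Call dataclass and make_json_sender are names made up for this example.

```python
import asyncio
import json
from dataclasses import asdict, dataclass
from typing import Any, Awaitable, Callable, Tuple, Type, TypeVar

T = TypeVar("T")


@dataclass
class Call:
    """Stand-in for pyrseia.Call."""
    name: str
    args: Tuple[Any, ...]


# Hypothetical transport: takes a request body, returns the response body.
PostBytes = Callable[[bytes], Awaitable[bytes]]


def make_json_sender(post: PostBytes) -> Callable[[Call, Type[T]], Awaitable[T]]:
    async def sender(call: Call, type_: Type[T]) -> T:
        # Unstructure and serialize the call.
        payload = json.dumps(asdict(call)).encode("utf-8")
        raw = await post(payload)
        # Deserialize and (very naively) structure the response.
        return type_(json.loads(raw))

    return sender
```

The body of the sender is still the same three steps as the msgpack version: serialize, post, deserialize.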

Apple InApp Validation

Now that I've actually defined some fundamental concepts and written some code, let's see if all this effort actually survives contact with the outside world. Let's write some code to interface with Apple's and Google's systems for validating in-app purchases. Apple first.

Apple provides us with a simple HTTP endpoint, verifyReceipt, that we're supposed to POST a JSON payload to and it'll send us a JSON payload back. Let's model this as an interface with one method:

class AppStoreVerifier:
    @rpc
    async def verify_receipt(
        self,
        receipt_b64_data: str,
        password: Optional[str],
        exclude_old_transactions: bool,
    ) -> ResponseBody:
        ...

The ResponseBody class is omitted (over 100 lines of code, and that's with parts skipped), but you can take a look here. I've modeled it as a type-annotated attrs class so we can use cattrs to structure it up from JSON. I've also replaced a few of its fields with pendulum.DateTime instances, and set up structuring rules in cattrs to be able to convert a few of the JSON fields to DateTimes. Now all we need is a sender, and we can make calls.

Since the API only contains one method, the sender is fairly simple. You can take a look right here.
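For a flavor of what the sender has to do, here is a sketch of just the request side: turning the three method arguments into the JSON body verifyReceipt expects. The field names and URLs are Apple's; the function name and the choice to omit password when it is None are assumptions of this example, not necessarily what the linked sender does.

```python
import json
from typing import Any, Dict, Optional

PRODUCTION_URL = "https://buy.itunes.apple.com/verifyReceipt"
SANDBOX_URL = "https://sandbox.itunes.apple.com/verifyReceipt"


def build_verify_receipt_body(
    receipt_b64_data: str,
    password: Optional[str],
    exclude_old_transactions: bool,
) -> bytes:
    """Serialize the verify_receipt arguments into a verifyReceipt request body."""
    body: Dict[str, Any] = {
        "receipt-data": receipt_b64_data,
        "exclude-old-transactions": exclude_old_transactions,
    }
    if password is not None:
        # The shared secret is only sent when one was supplied.
        body["password"] = password
    return json.dumps(body).encode("utf-8")
```

The response side is then a matter of decoding the JSON and handing it to cattrs to structure into a ResponseBody.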

Interlude: A CLI Interface

Now that we've written our first real-world client, it'd be great if we could actually test it somehow. One way of trying it out is the asyncio shell new in Python 3.8: start it with python -m asyncio, create the client manually, and invoke one of its methods.

Another would be creating a small CLI utility for doing requests. I've done so in the pyrseia.__main__ module, using the Typer library. Here's a taste:

$ python -m pyrseia contrib.apple:create_verifier "verify_receipt('<receipt>', None, True)"

ResponseBody(status=<Status.SUCCESS: 0>, latest_receipt=None, latest_receipt_info=[], pending_renewal_info=[], receipt=ResponseBody.Receipt(<omitted>), environment=<Environment.PRODUCTION: 'Production'>, is_retryable=None)

The first argument is a coroutine that produces a pyrseia.Client. The second argument is a string that gets parsed, evaluated and invoked using said client. The result gets printed out, the client is closed, and that's that.

There's also the --interactive (or -i) flag, which will drop you down into a PDB session with the response available to you so you can tinker with it manually instead of just printing it out.
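The two arguments could be handled along these lines. This is a sketch of one possible approach, not the code in pyrseia.__main__: it resolves a "module:attribute" reference with importlib and parses the call expression with ast, accepting only literal arguments. The helper names resolve, parse_call and invoke are made up for this example.

```python
import ast
import asyncio
from importlib import import_module
from typing import Any, Dict, Tuple


def resolve(ref: str) -> Any:
    """Turn 'package.module:attribute' into the referenced object."""
    module_name, _, attr = ref.partition(":")
    return getattr(import_module(module_name), attr)


def parse_call(expr: str) -> Tuple[str, Tuple[Any, ...], Dict[str, Any]]:
    """Parse "name(arg, ...)" into (name, args, kwargs), literals only."""
    node = ast.parse(expr, mode="eval").body
    if not (isinstance(node, ast.Call) and isinstance(node.func, ast.Name)):
        raise ValueError(f"not a simple method call: {expr!r}")
    if any(kw.arg is None for kw in node.keywords):
        raise ValueError("**kwargs unpacking is not supported")
    # literal_eval accepts AST nodes and rejects anything but literals.
    args = tuple(ast.literal_eval(arg) for arg in node.args)
    kwargs = {kw.arg: ast.literal_eval(kw.value) for kw in node.keywords}
    return node.func.id, args, kwargs


async def invoke(client: Any, expr: str) -> Any:
    """Look up the named method on the client and await it."""
    name, args, kwargs = parse_call(expr)
    return await getattr(client, name)(*args, **kwargs)
```

Restricting arguments to literals keeps the command line from evaluating arbitrary code, at the cost of not being able to pass constructed objects.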

Google InApp Validation

Google's InApp interfaces are a little more complex than Apple's. They insist on you using their libraries (here) to access their APIs, which is probably a good idea cryptographically speaking. But their library doesn't support asyncio and would leave us with nothing to do in this section, so we're not doing that. ;)

First we need to deal with auth. Google will provide you with a service_account JSON file, containing a bunch of private cryptographic information. We use the data in that file to get a temporary token from their auth servers, valid for one hour. This call makes use of JWT, courtesy of the PyJWT library. We can then use that token to actually call their APIs.
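The first half of that exchange, building the JWT claims from the service account file, can be sketched with just the standard library. The function name and the now parameter are inventions of this example; the signing and token request are left as comments because they need PyJWT (with its cryptography extra) and a network call.

```python
import time
from typing import Dict, Optional, Union

TOKEN_URL = "https://oauth2.googleapis.com/token"
SCOPE = "https://www.googleapis.com/auth/androidpublisher"


def build_claims(
    service_account: Dict[str, str],
    now: Optional[int] = None,
    lifetime: int = 3600,
) -> Dict[str, Union[str, int]]:
    """Build the claim set for a service-account token request."""
    issued_at = int(time.time()) if now is None else now
    return {
        "iss": service_account["client_email"],
        "scope": SCOPE,
        "aud": service_account.get("token_uri", TOKEN_URL),
        "iat": issued_at,
        "exp": issued_at + lifetime,  # one hour by default
    }


# The remaining steps, not executed here:
#   assertion = jwt.encode(claims, service_account["private_key"], algorithm="RS256")
#   POST to the token URL with form fields
#     grant_type=urn:ietf:params:oauth:grant-type:jwt-bearer
#     assertion=<the signed JWT>
#   The JSON response carries the access token and its lifetime in seconds.
```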

Here's the API we're implementing (only two methods this time):

class GooglePlayDeveloperApi:
    @rpc
    async def get_purchases_products(
        self, package_name: str, product_id: str, token: str
    ) -> ProductPurchase:
        ...

    @rpc
    async def get_voided_purchases(
        self,
        package_name: str,
        start_time: Optional[int],
        end_time: Optional[int],
        token: Optional[str],
        type: int,
    ) -> VoidedPurchasesResponse:
        ...

(Class definitions omitted for brevity.)

The client network adapter implementation is available over here.

The interesting thing about this client adapter is sharing state between invocations of the sender (the client adapter is basically a sender factory, remember?). The state we want to share is the access token, which is only valid for an hour. We define the sender in the actual client adapter function body, so it closes over that function's local variables. A small Python trick is that closures don't capture the values of those variables but the variables themselves, so a rebinding made with nonlocal is visible on every later invocation. This gives us an easy way of sharing the access token.
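A tiny illustration of the closure behavior being relied on, with made-up names: two inner functions share one variable of the enclosing function, and a rebinding through nonlocal in one is seen by the other.

```python
from typing import Callable, Optional, Tuple


def make_shared_token() -> Tuple[Callable[[str], None], Callable[[], Optional[str]]]:
    token: Optional[str] = None  # lives in the enclosing function's scope

    def set_token(value: str) -> None:
        nonlocal token  # rebind the shared variable, not a new local
        token = value

    def get_token() -> Optional[str]:
        return token  # reads the current binding at call time

    return set_token, get_token


set_token, get_token = make_shared_token()
set_token("abc")
print(get_token())
```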

We also want to be good citizens and avoid a thundering herd problem, where we might refresh the access token multiple times from multiple requests when it expires. To avoid this we actually share two pieces of data: the access token itself and an optional in-progress asyncio task fetching it when it expires. asyncio makes this relatively easy to do without race conditions: it is single-threaded, and every suspension point is explicit because it has to be awaited.
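The pattern can be sketched as follows. This is not the adapter's actual code: make_token_getter, the fetch callback (returning a token and its lifetime in seconds) and the clock parameter are assumptions made so the example is self-contained.

```python
import asyncio
import time
from typing import Awaitable, Callable, Optional, Tuple

Fetch = Callable[[], Awaitable[Tuple[str, float]]]


def make_token_getter(
    fetch: Fetch, clock: Callable[[], float] = time.monotonic
) -> Callable[[], Awaitable[str]]:
    token: Optional[str] = None
    expires_at = 0.0
    refresh: Optional["asyncio.Task[None]"] = None  # in-progress fetch, if any

    async def do_refresh() -> None:
        nonlocal token, expires_at, refresh
        try:
            new_token, lifetime = await fetch()
            token, expires_at = new_token, clock() + lifetime
        finally:
            refresh = None  # allow a new refresh next time

    async def get_token() -> str:
        nonlocal refresh
        if token is not None and clock() < expires_at:
            return token
        # No await between this check and the assignment, so only one
        # caller can ever start the refresh task.
        if refresh is None:
            refresh = asyncio.create_task(do_refresh())
        # shield keeps one cancelled caller from cancelling the shared task.
        await asyncio.shield(refresh)
        assert token is not None
        return token

    return get_token
```

Every caller that arrives while a refresh is in flight awaits the same task, so the auth server sees a single request no matter how many calls were waiting on the token.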