Httpx: Supporting async auth flows.

Created on 14 Aug 2020 · 15 Comments · Source: encode/httpx

Hello,

I'm trying to make asynchronous calls to the Microsoft Graph API using httpx.AsyncClient. For authentication, I use the azure-identity package, which provides several client classes to easily get an access token:

from azure.identity.aio import EnvironmentCredential

async with EnvironmentCredential() as credential:
    token = await credential.get_token(*scopes)

I'd like then to subclass httpx.Auth and implement the auth_flow method

class AzureAuth(Auth):

    async def auth_flow(self, request):
        token = await self._credential.get_token(*self._scopes) # Get the access token or retrieve it from cache
        request.headers["Authorization"] = "Bearer " + token.token
        yield request

so that I can do things like

async with AsyncClient(base_url="https://graph.microsoft.com", auth=AzureAuth(...)) as client:
    response = await client.get("/groups")

The problem is that httpx.Auth.auth_flow is synchronous. How can I make it asynchronous?

enhancement

All 15 comments

Hi!

This is not supported currently.

After thinking about this with alternatives such as running await calls in the event loop (works for asyncio, but trio doesn't support it, and we lose any concurrency benefits), it seems indeed that we might want to consider how to add async support to auth implementations - your use case here is legit.

It's a bit tricky in terms of design and API, but let's consider options…

  • Allow auth_flow to be either sync, or async (as you propose above), which means def auth_flow() _or_ async def auth_flow().

    • Cons:

    • Doesn't allow providing both sync and async implementations, so most likely a no-no.

  • Add a new async def async_auth_flow() method to the Auth interface.

    • On the client side, AsyncClient must be able to handle both methods. It would prefer async_auth_flow() and use auth_flow() as a fallback. Client will only call into auth_flow().

    • Pros:

    • Backwards compatible (fully additive).

    • We can implement both sync and async on the same Auth subclass, with the choice of implementing them up to the author (perhaps with different libraries).

    • Transparent to the user: authors can add async support without requiring the user to change their code.

    • Cons:

    • Quite verbose in the async case.

  • Introduce AsyncAuth with an async def auth_flow() method.

    • The AsyncClient must be able to handle both kinds of auth (Auth and AsyncAuth) transparently, while Client only accepts Auth.

    • Pros:

    • Backwards compatible: when the auth flow doesn't do I/O, authors and users can stick to Auth.

    • When the auth flow must do I/O, author can provide an async class, and tell users to use the async one with AsyncClient.

    • Cons:

    • Instead of a single class, developers now need to deal with "two kinds of classes". But this is only _required_ when the auth class does I/O (if it doesn't, there's no point in having an AsyncAuth). So, pretty confusing.

Right now I'm thinking 2/ might be the best option…

Now, another option: "do nothing". If an auth scheme really did require to perform I/O _on each request_, that would be quite wasteful, right? In practice, this doesn't seem like something that would pop up very often, if at all.

Indeed, for your specific use case it looks like the acquisition of a token does not depend on the request.

So you could perhaps just write a generic BearerAuth class, that takes a static token…

import httpx

class BearerAuth(httpx.Auth):
    def __init__(self, token):
        self._token = token

    def auth_flow(self, request):
        # Attach the static token; no I/O happens inside the auth flow.
        request.headers["Authorization"] = f"Bearer {self._token}"
        yield request

And then use it like this…

from azure.identity.aio import EnvironmentCredential

scopes = ...

async with EnvironmentCredential() as credential:
    token = await credential.get_token(*scopes)

    async with httpx.AsyncClient(auth=BearerAuth(token.token)) as client:
        ...

Closing since from the information provided, I think my suggestion above should solve your use case. If it doesn't, feel free to reach back!

Other options here...

  • Create an AsyncClient subclass and override .send(...) to deal with 403 responses, and renew the auth token when needed.
  • Figure out what the actual HTTP request flow is, so that you can implement it as a custom auth flow, rather than by relying on the helper lib.

Of those two, the first would be tied to async and is marginally less convenient if you wanted to provide it as a third party package, but is likely to provide the hook you need. The second option gives you a single implementation that works with both, and will end up making either sync or async requests as required, but can't give you eg async disk accesses or other async non-HTTP I/O (which for almost all auth schemes shouldn't really matter).

Hi! Sorry if I'm being intrusive on this matter, but as in my company we use httpx against multiple services, I've seen multiple different auth flows on our side, so:

If an auth scheme really did require to perform I/O on each request, that would be quite wasteful, right? In practice, this doesn't seem like something that would pop up very often, if at all.

That's true, but some requests (against the same service) might need I/O and some not. I think I can give two flow examples:

1st:

  • Fetch a token from a remote API which is valid for only 30 minutes
  • Save it in local cache (expired with TTL)
  • While it is valid, no I/O is performed
  • When it is invalid, repeat the process

2nd:

  • Fetch a token from a remote API, which you don't know for how long it is going to be valid
  • Use it until you receive a 401 response from some API
  • Refetch the token

Those two flows wouldn't make an I/O for each request, but might need to.
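For illustration, the second flow could be sketched as a generator-style auth flow. This is only a sketch under assumptions, not httpx code: `Request`, `Response`, `RefetchOn401Auth` and the `fetch_token` callable are hypothetical stand-ins so that it runs without any dependency. The token fetch is synchronous here, which is exactly the limitation being discussed.

```python
from dataclasses import dataclass, field
from typing import Callable, Dict, Generator, Optional


@dataclass
class Request:  # Hypothetical stand-in for httpx.Request.
    url: str
    headers: Dict[str, str] = field(default_factory=dict)


@dataclass
class Response:  # Hypothetical stand-in for httpx.Response.
    status_code: int


class RefetchOn401Auth:
    """Reuse a token until the server answers 401, then fetch a new one once."""

    def __init__(self, fetch_token: Callable[[], str]) -> None:
        self._fetch_token = fetch_token  # Assumed to perform the (blocking) I/O.
        self._token: Optional[str] = None

    def auth_flow(self, request: Request) -> Generator[Request, Response, None]:
        if self._token is None:
            self._token = self._fetch_token()
        request.headers["Authorization"] = f"Bearer {self._token}"
        response = yield request

        if response.status_code == 401:
            # The token was rejected: refetch it and retry the request once.
            self._token = self._fetch_token()
            request.headers["Authorization"] = f"Bearer {self._token}"
            yield request


# Drive the flow by hand with a fake token source and a fake 401 response.
tokens = iter(["token-1", "token-2"])
auth = RefetchOn401Auth(lambda: next(tokens))
flow = auth.auth_flow(Request("https://example.org/groups"))

first_header = next(flow).headers["Authorization"]
retry_header = flow.send(Response(401)).headers["Authorization"]
```

As long as the server keeps accepting the token, only the first branch runs and no token I/O happens on subsequent requests.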

I do have these flows on my end, and many more (whether they involve I/O or not is around 50/50), and it would be great if we could use the Auth class for this.
I solved those, and others, on my end by having a custom HTTPService class with a custom flow in each.

Just one snippet example:

    async def get_token(self):
        token = await self._cache.get("TOKEN")
        if token is not None:
            return token

        await self._lock.acquire()
        try:
            token = await self._cache.get("TOKEN")
            if token is not None:
                return token

            token = await self._request_token()
            await self._cache.set("TOKEN", token, 27 * HALF_MINUTE)
            return token
        finally:
            self._lock.release()

await self._request_token() is actually a pretty complex endpoint request, but just wanted to give an example here.

The locking around token acquisition is a really good example case, yup.

@tomchristie So should this issue be reopened? Or maybe open another one to track the need for an async authentication flow?

Seconding this request. We're dealing with a system that's very similar to the one described earlier, and having async auth would be really helpful.

Opening this up with a 1.0 milestone so that we can make sure we're absolutely happy with what we've got here.

Have been thinking about this a bit recently, and one thing that jumps out is that there are other ways besides the generator approach that would still give us essentially the same auth flow, but just by slightly different means, that would also allow us to support async-native auth flows for cases where they really are required.

Here's a bit of an example...

class Auth:
    """
    The base class for all authentication classes.

    Subclasses must implement the `authenticate()` method.
    """

    def authenticate(self, request: Request) -> typing.Union[Request, AuthFlow, AsyncAuthFlow]:
        """
        Perform the authentication, may return one of three things:

        * A `Request` instance, used to make the request. Good for simple authentication cases.
        * An `AuthFlow`, which is a stateful instance that provides an API for making multiple requests.
        * An `AsyncAuthFlow`, which is a stateful instance that provides an async API for making multiple requests.
        """
        ...

class AuthFlow:
    """
    A concrete subclass of AuthFlow may be returned by `authenticate()` in order
    to implement an authentication flow that may make multiple requests.

    Authentication classes using this style may be used by either `Client` or `AsyncClient`,
    unless the implementation uses sync I/O such as disk accesses, in which case the auth
    should be documented as only supporting usage with `Client`. Usage of sync I/O within
    an async codebase will block the event loop, and could cause performance issues.
    """

    def on_request(self, request) -> Request:
        """
        Called once. Return the initial request that should be used.
        """
        raise NotImplementedError()

    def on_response(self, response) -> Optional[Request]:
        """
        Called once or more. Optionally return another request to issue.
        """
        raise NotImplementedError()


class AsyncAuthFlow:
    """
    A concrete subclass of AsyncAuthFlow may be returned by `authenticate()` in order
    to implement an async authentication flow that may make multiple requests.

    Authentication classes using this style may only be used by `AsyncClient`.
    """

    async def on_request(self, request) -> Request:
        """
        Called once. Return the initial request that should be used.
        """
        raise NotImplementedError()

    async def on_response(self, response) -> Optional[Request]:
        """
        Called once or more. Optionally return another request to issue.
        """
        raise NotImplementedError()

We'd still end up with essentially the same implementation within the client codebase, we'd just be interfacing with the auth flow using a different API. We'd also be able to continue to support the existing auth_flow with a backwards compatible shim, for however long we'd want to support that.

🤔

@tomchristie Pretty interesting!

So just to be sure I'm getting this right... For cases where library authors want to provide both a sync and async implementation, they would provide two classes:

  • MyAuth, whose authenticate method returns an AuthFlow
  • MyAsyncAuth, whose authenticate method returns an AsyncAuthFlow

Is this correct? If so, this doesn't seem to satisfy the "users shouldn't need to care about sync vs async Auth" property (which I think would be nice to have), since they'd need to deal with two possible Auth classes.

What I was thinking about and tried to outline in https://github.com/encode/httpx/issues/1176#issuecomment-674381006 was something like this:

class Auth:
    def auth_flow(self, request: Request) -> Generator[Request, Response, None]:
        yield request

    async def async_auth_flow(self, request: Request) -> AsyncGenerator[Request, Response]:
        ...  # Defer to `.auth_flow()`

Essentially, we add an .async_auth_flow() method to the Auth interface, which defaults to "do the same as the sync case". This allows us to simply switch AsyncClient to call into flow = auth.async_auth_flow() always, which is nice and simple. This would be fully transparent to users, _including_ for cases when the auth class does I/O and provides different sync and async implementations.

Example based on @victoraugustolls's "lock around token acquisition" use case:

class TokenAuth(Auth):
    def _get_token(self) -> str:
        with self._lock:
            ...

    async def _async_get_token(self) -> str:
        async with self._async_lock:
            ...

    def auth_flow(self, request: Request) -> Generator[Request, Response, None]:
        token = self._get_token()
        request.headers["Authorization"] = f"Token {token}"
        yield request

    async def async_auth_flow(self, request: Request) -> AsyncGenerator[Request, Response]:
        token = await self._async_get_token()
        request.headers["Authorization"] = f"Token {token}"
        yield request
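To make the "defer to `.auth_flow()`" default concrete, here is a rough sketch of how an async generator might drive the sync one. It is an assumption about one possible shape, not the actual httpx implementation: requests and responses are plain strings, and `RetryOnceAuth` and `drive` are hypothetical names, with `drive` playing the role of `AsyncClient`.

```python
import asyncio
from typing import AsyncGenerator, Generator, List


class Auth:
    """Stand-in base class; requests and responses are plain strings here."""

    def auth_flow(self, request: str) -> Generator[str, str, None]:
        yield request

    async def async_auth_flow(self, request: str) -> AsyncGenerator[str, str]:
        # Default behaviour: drive the sync generator from an async generator.
        flow = self.auth_flow(request)
        try:
            next_request = next(flow)
        except StopIteration:
            return
        while True:
            response = yield next_request
            try:
                next_request = flow.send(response)
            except StopIteration:
                break


class RetryOnceAuth(Auth):
    """Only implements the sync flow; the async flow comes from the default."""

    def auth_flow(self, request: str) -> Generator[str, str, None]:
        response = yield request + " [token=old]"
        if response == "401":
            yield request + " [token=new]"


async def drive(auth: Auth, request: str, responses: List[str]) -> List[str]:
    """Record each request the flow yields, feeding back canned responses."""
    sent: List[str] = []
    flow = auth.async_auth_flow(request)
    try:
        next_request = await flow.asend(None)  # Prime the async generator.
        for response in responses:
            sent.append(next_request)
            next_request = await flow.asend(response)
    except StopAsyncIteration:
        pass  # The flow has no further request to send.
    return sent


sent = asyncio.run(drive(RetryOnceAuth(), "GET /groups", ["401", "200"]))
```

An auth class that needs async I/O would override `async_auth_flow()` directly, as `TokenAuth` does above, and the client-side driving code would stay the same.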

Oh cool, that's rather neat!
(I'm not sure I realised that asend was even possible.)

@florimondmanca That's really nicely done.

I'm going to work through a bit more stuff here wrt. AuthFlow as a bit of an explore, to see if there's anything valuable that comes out of it.

Following your same style with async_auth_flow we might end up with something like this:

class Auth:
    """
    The base class for all authentication classes.

    Subclasses must implement the `authenticate()` method.
    """

    def authenticate(self, request: Request) -> typing.Union[Request, AuthFlow]:
        """
        Perform the authentication, may return one of two things:

        * A `Request` instance, used to make the request. Good for simple authentication cases.
        * An `AuthFlow`, which is a stateful instance that provides an API for making multiple requests.
        """
        ...

class AuthFlow:
    """
    A concrete subclass of AuthFlow may be returned by `authenticate()` in order
    to implement an authentication flow that may make multiple requests.

    Authentication classes using this style should normally override `on_request`/`on_response`
    and may be used by either `Client` or `AsyncClient`.

    If an authentication class needs to perform I/O, such as disk access or network calls,
    or uses concurrency primitives such as locks, then it may *instead* override either
    `sync_on_request`/`sync_on_response`, or `async_on_request`/`async_on_response`
    for a sync-only or async-only auth class, or override both pairs for a sync+async auth class.
    """

    # Auth flows should prefer not to access the request/response body if possible
    # since it forces streaming requests/responses to be loaded into memory,
    # but it might be required for some kinds of signing.
    requires_request_body = False
    requires_response_body = False

    def on_request(self, request) -> Request:
        """
        Called once. Return the initial request that should be used.
        """
        raise NotImplementedError()

    def on_response(self, response) -> Optional[Request]:
        """
        Called once or more. Optionally return another request to issue.
        """
        return None

    def sync_on_request(self, request) -> Request:
        """
        Called into by `Client` instances.

        May be overridden instead of `on_request` if you need to perform I/O such as disk access,
        network calls, or use concurrency primitives such as locks.
        """
        if self.requires_request_body:
            request.read()
        return self.on_request(request)

    def sync_on_response(self, response) -> Optional[Request]:
        """
        Called into by `Client` instances.

        May be overridden instead of `on_response` if you need to perform I/O such as disk access,
        network calls, or use concurrency primitives such as locks.
        """
        if self.requires_response_body:
            response.read()
        return self.on_response(response)

    async def async_on_request(self, request) -> Request:
        """
        Called into by `AsyncClient` instances.

        May be overridden instead of `on_request` if you need to perform I/O such as disk access,
        network calls, or use concurrency primitives such as locks.
        """
        if self.requires_request_body:
            await request.aread()
        return self.on_request(request)

    async def async_on_response(self, response) -> Optional[Request]:
        """
        Called into by `AsyncClient` instances.

        May be overridden instead of `on_response` if you need to perform I/O such as disk access,
        network calls, or use concurrency primitives such as locks.
        """
        if self.requires_response_body:
            await response.aread()
        return self.on_response(response)

The token example would then look like this...

class TokenAuth(httpx.Auth):
    def authenticate(self, request):
        return TokenAuthFlow()

class TokenAuthFlow(httpx.AuthFlow):
    def _get_token(self) -> str:
        with self._lock:
            ...

    async def _async_get_token(self) -> str:
        async with self._async_lock:
            ...

    def sync_on_request(self, request: Request) -> Request:
        token = self._get_token()
        request.headers["Authorization"] = f"Token {token}"
        return request

    async def async_on_request(self, request: Request) -> Request:
        token = await self._async_get_token()
        request.headers["Authorization"] = f"Token {token}"
        return request

There's a few things that I prefer about one style and a few things I prefer about the other, so...

  • It's clear how to test httpx.AuthFlow instances in isolation, whereas trying to do that with the generator style is a bit fiddly. Maybe there's something nicer we could do there to help?
  • I like how we can fit in the request/response body workflow within the auth implementation. Again, partly because it makes them more obviously testable in isolation.
  • The flow within the generator style is probably more graceful when handling multiple requests than the pair of methods plus state on the instance.
  • But the flip-side of that is that calling into __anext__ and asend directly feels potentially brittle or subject to unexpected corner cases(?), and generator cleanup is dark magic. https://github.com/python-trio/trio/issues/265

A bit open minded about it at the moment - interested to know what y'all think.

@tomchristie Interesting points, yes… I'll comment below.

Also something to consider is that changing the auth API now would mean having to deal with deprecations even after 1.0 lands, since there are already quite a few practical use cases for this out there, and I'm not sure we want to just break things and require users to rewrite their auth using a new style before being able to jump onto 1.0.

It's clear how to test httpx.AuthFlow instances in isolation, whereas trying to do that with the generator style is a bit fiddly. Maybe there's something nicer we could do there to help?

I'd argue in both cases (generator-based auth flows, and callback-method-based auth flow classes), there needs to be something that "drives" the auth flow, either on the client side or for testing.

Okay, it might intuitively feel easier to do when advancing the flow is as easy as calling .on_request() and .on_response(), such as…

auth = Auth(...)
flow = auth.authenticate(...)

request = flow.on_request(...)
assert f'something about {request}'

response = ...  # Make up mock response.
request = flow.on_response(...)
assert request is None

But actually, the generator-based style mostly just requires swapping .on_request() and .on_response() for .send(<Response or None>)...

auth = Auth(...)
flow = auth.auth_flow(...)

request = flow.send(None)
assert f'something about {request}'

response = ...  # Make up mock response.
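try:
    flow.send(response)
except StopIteration:
    pass  # The flow is finished: there is no further request to send.
else:
    raise AssertionError("expected the auth flow to finish")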

Yes, this requires a bit of low-level knowledge about the generator protocol (which is probably what you meant by "fiddly"?), but otherwise I'd argue unit-testability support looks good enough.

But the flip-side of that is that calling into __anext__ and asend directly feels potentially brittle or subject to unexpected corner cases(?)

Note that we could very much only call .asend(), instead of .__anext__() and .asend(). (Currently we call next() and __anext__() only because mypy cringes on .send(None) and .asend(None), since it would expect Response objects being passed due to how generators are typed. I guess we could just switch to that style and just # type: ignore there.)

In any case, I believe that ".send() until you get a StopIteration" is how generators are meant to be driven, and this is well spec'd by the generator protocol. So provided we get that right (which I _think_ we do currently?) there's not much danger there.
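As a rough sketch of what "`.send()` until you get a `StopIteration`" looks like on the driving side (illustrative only, with plain strings standing in for requests and responses, and `run_auth_flow`, `challenge_flow` and `fake_server` as hypothetical names):

```python
from typing import Callable, Generator, List


def run_auth_flow(
    flow: Generator[str, str, None],
    send: Callable[[str], str],
) -> str:
    """Drive a generator-based auth flow and return the final response."""
    # Prime the generator; equivalent to next(flow). With real Response types,
    # mypy would want a "# type: ignore" on this line.
    request = flow.send(None)
    while True:
        response = send(request)
        try:
            request = flow.send(response)
        except StopIteration:
            # The flow has nothing more to send: this response is final.
            return response


def challenge_flow(request: str) -> Generator[str, str, None]:
    """Send the request as-is, then answer a 401 challenge once."""
    response = yield request
    if response.startswith("401"):
        nonce = response.split("nonce=")[1]
        yield f"{request} auth={nonce}"


log: List[str] = []


def fake_server(request: str) -> str:
    """Accept only requests that carry an auth value; record everything."""
    log.append(request)
    return "200 OK" if "auth=" in request else "401 nonce=abc"


final = run_auth_flow(challenge_flow("GET /"), fake_server)
```

The only generator-protocol details the driver relies on are the priming `send(None)` and the `StopIteration` that signals the end of the flow.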

and generator cleanup is dark magic

Can't really comment about that, except that I don't think we've had issues with it yet?


I'm trying to argue in favor of status quo here, because I _really_ think that the generator-based API is superior in terms of user experience compared to a callback-based style.

(Superior as in "much easier to write and understand auth flows". Generators naturally shine in "here's a series of steps to follow sequentially" situations, while callbacks require developers to jump through mental hoops, keep track of which step they are at as class state, etc. E.g. there's a natural "yield means 'send the request'" metaphor, but we have no such thing for the on_request/on_response style.)
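To illustrate the difference, here is the same "retry once with credentials on 401" flow written in both styles. This is a toy sketch with strings in place of requests and responses; `generator_style`, `CallbackStyle` and the two `drive_*` helpers are hypothetical names, not part of any proposed API.

```python
from typing import Generator, List, Optional


def generator_style(request: str) -> Generator[str, str, None]:
    response = yield request  # Step 1: send the request as-is.
    if response == "401":
        yield request + " +credentials"  # Step 2: retry once with credentials.


class CallbackStyle:
    """Same flow; the current step has to live in instance state."""

    def __init__(self) -> None:
        self._request: Optional[str] = None
        self._retried = False

    def on_request(self, request: str) -> str:
        self._request = request  # Remember the request for a possible retry.
        return request

    def on_response(self, response: str) -> Optional[str]:
        if response == "401" and not self._retried:
            self._retried = True
            return f"{self._request} +credentials"
        return None


def drive_generator(request: str, responses: List[str]) -> List[str]:
    flow = generator_style(request)
    sent = [next(flow)]
    for response in responses:
        try:
            sent.append(flow.send(response))
        except StopIteration:
            break
    return sent


def drive_callbacks(request: str, responses: List[str]) -> List[str]:
    flow = CallbackStyle()
    sent = [flow.on_request(request)]
    for response in responses:
        next_request = flow.on_response(response)
        if next_request is None:
            break
        sent.append(next_request)
    return sent


from_generator = drive_generator("GET /", ["401", "401"])
from_callbacks = drive_callbacks("GET /", ["401", "401"])
```

Both drivers end up sending the same two requests, but the callback version needs `_request` and `_retried` to remember where it is in the flow, whereas the generator keeps that in its local variables and control flow.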
