Fastapi: [FEATURE] Suggest middlwares for oauth2 against external providers

Created on 9 Jan 2020  路  15Comments  路  Source: tiangolo/fastapi

Is your feature request related to a problem

Is your feature request related to a problem? Yes

It is unclear how to integrate an external oauth provider such as Microsoft, Google, Auth0 with FastAPI.

The solution you would like

A section on the documentation describing how to achieve this, or which libraries do we recommend to do so.

Two examples include the client from authlib and starlette-oauth2-api.

Describe alternatives you've considered

PR an implementation to FastAPI. The reason I avoided this is because OAuth is not a thing of FastAPI only, but of a web app in general.

Additional context

I am the author of starlette-oauth2-api, which we have been using to secure an API of ours against access tokens signed by external providers (multi-tenancy).

enhancement

Most helpful comment

@jfmrm Obrigado pelo exemplo! I reached a very similar solution that doesn't require the full "builder" aspect.

auth0.py

import json

from fastapi import Depends
from fastapi.security import OAuth2AuthorizationCodeBearer, SecurityScopes
from jose import jwt
from six.moves.urllib.request import urlopen

AUTH0_DOMAIN = "<auth0-domain>"
API_AUDIENCE = "<auth0-api-audience>"
ALGORITHMS = ["RS256"]

class AuthError(Exception):
    def __init__(self, error, status_code):
        self.error = error
        self.status_code = status_code

# Define a Authorization scheme specific to our Auth0 config
auth0_scheme = OAuth2AuthorizationCodeBearer(
    authorizationUrl=AUTH0_DOMAIN, tokenUrl=API_AUDIENCE
)

async def get_current_user(security_scopes: SecurityScopes, token: str = Depends(auth0_scheme)):
    # This down to `END` comment is from https://auth0.com/docs/quickstart/backend/python/01-authorization#create-the-jwt-validation-decorator
    jsonurl = urlopen("https://" + AUTH0_DOMAIN + "/.well-known/jwks.json")
    jwks = json.loads(jsonurl.read())
    unverified_header = jwt.get_unverified_header(token)

    rsa_key = {}
    for key in jwks["keys"]:
        if key["kid"] == unverified_header["kid"]:
            rsa_key = {
                "kty": key["kty"],
                "kid": key["kid"],
                "use": key["use"],
                "n": key["n"],
                "e": key["e"],
            }

    if rsa_key:
        try:
            payload = jwt.decode(
                token,
                rsa_key,
                algorithms=ALGORITHMS,
                audience=API_AUDIENCE,
                issuer=f"https://{AUTH0_DOMAIN}/",
            )
        except jwt.ExpiredSignatureError:
            raise AuthError(
                {"code": "token_expired", "description": "token is expired"}, 401
            )
        except jwt.JWTClaimsError:
            raise AuthError(
                {
                    "code": "invalid_claims",
                    "description": "incorrect claims,"
                    "please check the audience and issuer",
                },
                401,
            )
        except Exception:
            raise AuthError(
                {
                    "code": "invalid_header",
                    "description": "Unable to parse authentication" " token.",
                },
                401,
            )
       # END from Auth0

        # token.scope is represented as a string of scopes space seperated
        token_scopes = payload.get("scope", "").split()

        # Check that we all scopes are present
        for scope in security_scopes.scopes:
            if scope not in token_scopes:
                raise AuthError(
                    {
                        "code": "Unauthorized",
                        "description": f"You don't have access to this resource. `{' '.join(security_scopes.scopes)}` scopes required",
                    },
                    403,
                )

        return payload

Using this is now as simple as
app.py

from fastapi import FastAPI, Security

from .auth0 import AuthError, get_current_user

app = FastAPI()

@app.exception_handler(AuthError)
def handle_auth_error(request: Request, ex: AuthError):
    return JSONResponse(status_code=ex.status_code, content=ex.error)

@app.get("/private")
def private(user=Security(get_current_user)):
    return {"message": "You're an authorized user"}

@app.get("/private-with-scopes")
def privateScopes(user=Security(get_current_user, scopes["read:example"])):
    return {"message": "You're authorized with scopes!"}

All 15 comments

I personally would welcome a docs PR, and I think the discussion at the end of https://github.com/tiangolo/fastapi/pull/797 may be relevant. I think @tiangolo might already be cooking up something along these lines.

Hi All,

I would need it too ;-)
I found a good doc for flask here:
https://realpython.com/flask-google-login/

BR,
George

Did not know if this helps but I think this code should run for auth0 + FastAPI (actually I am not able to test it, but i will do asap). I merged the suggestion by auth0 from this site https://auth0.com/docs/quickstart/backend/python/01-authorization#validate-scopes with my implementation from FastAPI that just checks a list of valid tokens.

First I define all authentication stuff in one python script. Please note that I am just enabled query authentication.

""" functions to connect token validation with auth0 """
import os
import json
from urllib.request import urlopen

from jose import jwt
from typing import Dict
from fastapi.security.api_key import APIKeyQuery
from fastapi import Security

AUTH0_DOMAIN = os.environ['AUTH0_DOMAIN']
API_AUDIENCE = os.environ['API_AUDIENCE']
ALGORITHMS = ["RS256"]

API_KEY_NAME = "access_token"
api_key_query = APIKeyQuery(name=API_KEY_NAME, auto_error=False)


class AuthError(Exception):
    """ Error handling object """
    def __init__(self, error: Dict[str, str], status_code: int):
        self.error = error
        self.status_code = status_code


def auth0_token_authentication(
    token: str = Security(api_key_query)
):
    """ authenticates token parsed via query with auth0 API """
    jsonurl = urlopen("https://" + AUTH0_DOMAIN + "/.well-known/jwks.json")
    jwks = json.loads(jsonurl.read())
    unverified_header = jwt.get_unverified_header(token)
    rsa_key = {}
    for key in jwks["keys"]:
        if key["kid"] == unverified_header["kid"]:
            rsa_key = {
                "kty": key["kty"],
                "kid": key["kid"],
                "use": key["use"],
                "n": key["n"],
                "e": key["e"]
            }
    if rsa_key:
        try:
            payload = jwt.decode(
                token,
                rsa_key,
                algorithms=ALGORITHMS,
                audience=API_AUDIENCE,
                issuer="https://" + AUTH0_DOMAIN + "/"
            )
        except jwt.ExpiredSignatureError:
            raise AuthError({"code": "token_expired",
                             "description": "token is expired"}, 401)
        except jwt.JWTClaimsError:
            raise AuthError({"code": "invalid_claims",
                             "description":
                                 "incorrect claims,"
                                 "please check the audience and issuer"}, 401)
        except Exception:
            raise AuthError({"code": "invalid_header",
                             "description":
                                 "Unable to parse authentication"
                                 " token."}, 401)


        return payload
    raise AuthError({"code": "invalid_header",
                     "description": "Unable to find appropriate key"}, 401)

In your app you have to declare an exception handler and add the well known Depends hint with the authentication function :

@app.exception_handler(AuthError)
async def unicorn_exception_handler(request: Request, exc: AuthError):
    return JSONResponse(
        status_code=exc.status_code,
        content=exc.error,
    ) 

@app.get("/documentation", tags=["documentation"])
async def get_documentation(api_key: APIKey = Depends(auth0_token_authentication)):
    response = get_swagger_ui_html(openapi_url="/openapi.json", title="docs")
    response.set_cookie(
        API_KEY_NAME,
        value=api_key,
        domain=COOKIE_DOMAIN,
        httponly=True,
        max_age=1800,
        expires=1800,
    )
    return response

I hope this can help you.

@meteoDaniel Thanks for your post! It helped me a lot. And I actually iterated through it a little, making a dependency injection version, and gone a little further. Implemented a couple of automated tests on it. Unfortunately I have not found a way of testing it without hitting Auth0 API. If anyone would have some solution for that I would be pleased.

Here follows my code:

import json
from urllib.request import urlopen
from jose import jwt
from typing import Dict
from fastapi import Security
from fastapi.security import OAuth2AuthorizationCodeBearer
from typing import List


class AuthError(Exception):
    """ Error handling object """
    def __init__(self, error: Dict[str, str], status_code: int):
        self.error = error
        self.status_code = status_code


def get_jwks(auth0_domain: str):
    json_url = urlopen(f"https://{auth0_domain}/.well-known/jwks.json")
    return json.loads(json_url.read())


def auth0_token_authenticator_builder(
        oauth2_scheme: OAuth2AuthorizationCodeBearer,
        auth0_domain: str,
        api_audience: str,
        algorithms: List[str],
        jwks: dict
):
    def auth0_token_authentication(
        token: str = Security(oauth2_scheme)
    ):
        unverified_header = jwt.get_unverified_header(token)
        rsa_key = {}
        for key in jwks["keys"]:
            if key["kid"] == unverified_header["kid"]:
                rsa_key = {
                    "kty": key["kty"],
                    "kid": key["kid"],
                    "use": key["use"],
                    "n": key["n"],
                    "e": key["e"]
                }
        if rsa_key:
            try:
                payload = jwt.decode(
                    token,
                    rsa_key,
                    algorithms=algorithms,
                    audience=api_audience,
                    issuer=f"https://{auth0_domain}/"
                )
            except jwt.ExpiredSignatureError:
                raise AuthError({"code": "token_expired",
                                 "description": "token is expired"}, 401)
            except jwt.JWTClaimsError:
                raise AuthError({"code": "invalid_claims",
                                 "description":
                                     "incorrect claims,"
                                     "please check the audience and issuer"}, 401)
            except Exception:
                raise AuthError({"code": "invalid_header",
                                 "description":
                                     "Unable to parse authentication"
                                     " token."}, 401)

            return payload
        raise AuthError({"code": "invalid_header",
                         "description": "Unable to find appropriate key"}, 401)

    return auth0_token_authentication


def scope_verifier_builder(oauth2_scheme: OAuth2AuthorizationCodeBearer, required_scope):
    def scope_verifier(token: str = Security(oauth2_scheme)):
        unverified_claims = jwt.get_unverified_claims(token)
        if unverified_claims.get("scope"):
            token_scopes = unverified_claims["scope"].split()
            for token_scope in token_scopes:
                if token_scope == required_scope:
                    return True
        raise AuthError({
            "code": "no_required_permissions",
            "description": "user does not have the required permissions"
        }, 403)

    return scope_verifier

here follows the tests:

import unittest
from fastapi import FastAPI, Depends, status
from fastapi.testclient import TestClient
from fastapi.security import OAuth2AuthorizationCodeBearer
from utils import authentication


class TestAuthentication(unittest.TestCase):
    def setUp(self) -> None:
        # mocked env variables
        AUTH0_DOMAIN = "your_auth0_domain"
        API_AUDIENCE = "your_audience_endpoint"
        ALGORITHMS = ["RS256"]

        jwks = authentication.get_jwks(AUTH0_DOMAIN)

        # mocked auth scheme
        oauth2_scheme = OAuth2AuthorizationCodeBearer(
            authorizationUrl=AUTH0_DOMAIN,
            tokenUrl=API_AUDIENCE
        )

        # build auth method
        auth = authentication.auth0_token_authenticator_builder(
            oauth2_scheme=oauth2_scheme,
            auth0_domain=AUTH0_DOMAIN,
            api_audience=API_AUDIENCE,
            algorithms=ALGORITHMS,
            jwks=jwks
        )

        # build scope verifier
        scope_verifier = authentication.scope_verifier_builder(
            oauth2_scheme=oauth2_scheme,
            required_scope='read:all'
        )

        # build mocked protected API
        api = FastAPI()

        @api.get("/test")
        def test_endpoint(api_key=Depends(auth)):
            return api_key

        @api.get("/test/scope")
        def test_scope(scope: bool = Depends(scope_verifier)):
            return scope

        self.client = TestClient(api)

        # mocked token
        self.token = "generate_a_token_and_put_it_here:)"

    def test_authenticate(self):
        response = self.client.get(
            "/test",
            headers={"authorization": f"Bearer {self.token}"}
        )
        self.assertEqual(status.HTTP_200_OK, response.status_code)

    def test_has_required_scope(self):
        response = self.client.get(
            "/test/scope",
            headers={"authorization": f"Bearer {self.token}"}
        )
        self.assertEqual(status.HTTP_200_OK, response.status_code)


if __name__ == '__main__':
    unittest.main()

@jfmrm Obrigado pelo exemplo! I reached a very similar solution that doesn't require the full "builder" aspect.

auth0.py

import json

from fastapi import Depends
from fastapi.security import OAuth2AuthorizationCodeBearer, SecurityScopes
from jose import jwt
from six.moves.urllib.request import urlopen

AUTH0_DOMAIN = "<auth0-domain>"
API_AUDIENCE = "<auth0-api-audience>"
ALGORITHMS = ["RS256"]

class AuthError(Exception):
    def __init__(self, error, status_code):
        self.error = error
        self.status_code = status_code

# Define a Authorization scheme specific to our Auth0 config
auth0_scheme = OAuth2AuthorizationCodeBearer(
    authorizationUrl=AUTH0_DOMAIN, tokenUrl=API_AUDIENCE
)

async def get_current_user(security_scopes: SecurityScopes, token: str = Depends(auth0_scheme)):
    # This down to `END` comment is from https://auth0.com/docs/quickstart/backend/python/01-authorization#create-the-jwt-validation-decorator
    jsonurl = urlopen("https://" + AUTH0_DOMAIN + "/.well-known/jwks.json")
    jwks = json.loads(jsonurl.read())
    unverified_header = jwt.get_unverified_header(token)

    rsa_key = {}
    for key in jwks["keys"]:
        if key["kid"] == unverified_header["kid"]:
            rsa_key = {
                "kty": key["kty"],
                "kid": key["kid"],
                "use": key["use"],
                "n": key["n"],
                "e": key["e"],
            }

    if rsa_key:
        try:
            payload = jwt.decode(
                token,
                rsa_key,
                algorithms=ALGORITHMS,
                audience=API_AUDIENCE,
                issuer=f"https://{AUTH0_DOMAIN}/",
            )
        except jwt.ExpiredSignatureError:
            raise AuthError(
                {"code": "token_expired", "description": "token is expired"}, 401
            )
        except jwt.JWTClaimsError:
            raise AuthError(
                {
                    "code": "invalid_claims",
                    "description": "incorrect claims,"
                    "please check the audience and issuer",
                },
                401,
            )
        except Exception:
            raise AuthError(
                {
                    "code": "invalid_header",
                    "description": "Unable to parse authentication" " token.",
                },
                401,
            )
       # END from Auth0

        # token.scope is represented as a string of scopes space seperated
        token_scopes = payload.get("scope", "").split()

        # Check that we all scopes are present
        for scope in security_scopes.scopes:
            if scope not in token_scopes:
                raise AuthError(
                    {
                        "code": "Unauthorized",
                        "description": f"You don't have access to this resource. `{' '.join(security_scopes.scopes)}` scopes required",
                    },
                    403,
                )

        return payload

Using this is now as simple as
app.py

from fastapi import FastAPI, Security

from .auth0 import AuthError, get_current_user

app = FastAPI()

@app.exception_handler(AuthError)
def handle_auth_error(request: Request, ex: AuthError):
    return JSONResponse(status_code=ex.status_code, content=ex.error)

@app.get("/private")
def private(user=Security(get_current_user)):
    return {"message": "You're an authorized user"}

@app.get("/private-with-scopes")
def privateScopes(user=Security(get_current_user, scopes["read:example"])):
    return {"message": "You're authorized with scopes!"}

@Vivalldi do you have this code anywhere on github or somewhere? I am trying to do something similar but I'm hitting issues on this code.

Thanks

Actually @Vivalldi I got your example working, my only question now is how do you auth on the /docs route to have it send you to Auth0 to login and back or is that even possible? Was just curious how users would use the api via the docs.

The OpenAPI spec doesn't include JWT bearer tokens yet. There's no defined standard as to where those tokens live and as such there's no OpenAPI config for it yet. I might be able to contrive an example where you set the token in the username field of basic auth (without pass) similar to stripe https://stripe.com/docs/api/authentication

@Vivalldi how are you handling your user relations this way? I'm guessing the sub data from the user object is basically the userid so would I just use this when I'm creating records that would have an ownerId for example or how have you been doing that?

This isn't directly related to this issue but in short, yes, you should use the user identifier.

https://auth0.com/docs/users/normalized/auth0/store-user-data

Here is a working Auth0 and FastAPI code snippet.
main.py

from fastapi import Depends, FastAPI
from routers import items
from utils.auth import require_auth

app = FastAPI()

app.include_router(
    items.router,
    prefix="/items",
    tags=["items"],
    dependencies=[Depends(require_auth)],
    responses={404: {"description": "Not found"}},
)

utils.auth.py

import json
import logging
import os
from fastapi import HTTPException, Header
from six.moves.urllib.request import urlopen
from jose import jwt
from jose import exceptions as JoseExceptions


logging.basicConfig(
    format='%(asctime)s %(message)s', 
    datefmt='%m/%d/%Y %I:%M:%S %p',
    level=logging.os.environ.get('LOGGING_LEVEL', 'INFO'))

AUTH0_DOMAIN = os.environ.get(
    'AUTH0_DOMAIN', '')

AUTH0_API_AUDIENCE = os.environ.get(
    'AUTH0_API_AUDIENCE', '')

AUTH0_ALGORITHMS = os.environ.get(
    'AUTH0_ALGORITHMS', 'RS256')


def get_token_auth_header(authorization):
    parts = authorization.split()

    if parts[0].lower() != "bearer":
        raise HTTPException(
            status_code=401, 
            detail='Authorization header must start with Bearer')
    elif len(parts) == 1:
        raise HTTPException(
            status_code=401, 
            detail='Authorization token not found')
    elif len(parts) > 2:
        raise HTTPException(
            status_code=401, 
            detail='Authorization header be Bearer token')

    token = parts[1]

    return token


def get_payload(jwks, unverified_header, token):
    rsa_key = {}
    payload = None
    for key in jwks["keys"]:
        if key["kid"] == unverified_header["kid"]:
            rsa_key = {
                "kty": key["kty"],
                "kid": key["kid"],
                "use": key["use"],
                "n": key["n"],
                "e": key["e"]
            }
        if rsa_key:
            try:
                payload = jwt.decode(
                    token,
                    rsa_key,
                    algorithms=AUTH0_ALGORITHMS,
                    audience=AUTH0_API_AUDIENCE,
                    issuer="https://"+AUTH0_DOMAIN+"/"
                )
            except jwt.ExpiredSignatureError:
                raise HTTPException(
                    status_code=401, 
                    detail='Authorization token expired')
            except jwt.JWTClaimsError:
                raise HTTPException(
                    status_code=401, 
                    detail='Incorrect claims, check the audience and issuer.')
            except Exception as e:
                logging.warning(e, exc_info=True)
                raise HTTPException(
                    status_code=401, 
                    detail='Unable to parse authentication token')

    return payload


async def require_auth(authorization: str = Header(...)):
    token = get_token_auth_header(authorization)
    jsonurl = urlopen("https://"+AUTH0_DOMAIN+"/.well-known/jwks.json")
    jwks = json.loads(jsonurl.read())

    try:
        unverified_header = jwt.get_unverified_header(token)
    except JoseExceptions.JWTError:
        raise HTTPException(
                    status_code=401, 
                    detail='Unable to decode authorization token headers')

    payload = get_payload(jwks, unverified_header, token)

    if not payload:
        raise HTTPException(
                    status_code=401, 
                    detail='Invalid authorization token')

    return payload

routers.items.py

from fastapi import APIRouter, HTTPException

router = APIRouter()


@router.get("/")
async def read_items():
    return [{"name": "Item Foo"}, {"name": "item Bar"}]


@router.get("/{item_id}")
async def read_item(item_id: str):
    return {"name": "Fake Specific Item", "item_id": item_id}


@router.put(
    "/{item_id}",
    tags=["custom"],
    responses={403: {"description": "Operation forbidden"}},
)
async def update_item(item_id: str):
    if item_id != "foo":
        raise HTTPException(status_code=403, detail="You can only update the item: foo")
    return {"item_id": item_id, "name": "The Fighters"}

@rsitro4 Nice example. How do you use swagger-ui (/docs) with this?

@LindezaGrey did you know how to use it with swagger-ui (/docs)?

@LuisHernandez1611681 well sort of. In auth0 i added another authorization flow and internally used OAuth2PasswordBearer for the docs page. This means that you need to provide also client_secret and client_id. In my use case only admins use the /docs page for data entry and retrieval so it's not a big deal. if the docs page is faces to customers/clients, of course you don't want to provide those credentials...

Was this page helpful?
0 / 5 - 0 ratings