Is your feature request related to a problem? Yes
It is unclear how to integrate an external OAuth provider such as Microsoft, Google, or Auth0 with FastAPI.
A section in the documentation describing how to achieve this, or which libraries are recommended for it.
Two examples are the client from authlib and starlette-oauth2-api.
PR an implementation to FastAPI. I avoided this because OAuth is not specific to FastAPI; it concerns web apps in general.
I am the author of starlette-oauth2-api, which we have been using to secure an API of ours against access tokens signed by external providers (multi-tenancy).
I personally would welcome a docs PR, and I think the discussion at the end of https://github.com/tiangolo/fastapi/pull/797 may be relevant. I think @tiangolo might already be cooking up something along these lines.
Hi All,
I would need it too ;-)
I found a good doc for Flask here:
https://realpython.com/flask-google-login/
BR,
George
How to add google auth is explained in this article:
https://medium.com/data-rebels/fastapi-google-as-an-external-authentication-provider-3a527672cf33
I don't know if this helps, but I think this code should work for Auth0 + FastAPI (I have not been able to test it yet, but I will as soon as possible). I merged Auth0's suggestion from https://auth0.com/docs/quickstart/backend/python/01-authorization#validate-scopes with my FastAPI implementation, which just checked a list of valid tokens.
First I define all the authentication logic in one Python script. Please note that I have only enabled query-parameter authentication.
""" functions to connect token validation with auth0 """
import os
import json
from urllib.request import urlopen
from jose import jwt
from typing import Dict
from fastapi.security.api_key import APIKeyQuery
from fastapi import Security

AUTH0_DOMAIN = os.environ['AUTH0_DOMAIN']
API_AUDIENCE = os.environ['API_AUDIENCE']
ALGORITHMS = ["RS256"]
API_KEY_NAME = "access_token"

api_key_query = APIKeyQuery(name=API_KEY_NAME, auto_error=False)


class AuthError(Exception):
    """ Error handling object """
    def __init__(self, error: Dict[str, str], status_code: int):
        self.error = error
        self.status_code = status_code


def auth0_token_authentication(
    token: str = Security(api_key_query)
):
    """ authenticates token parsed via query with auth0 API """
    jsonurl = urlopen("https://" + AUTH0_DOMAIN + "/.well-known/jwks.json")
    jwks = json.loads(jsonurl.read())
    unverified_header = jwt.get_unverified_header(token)
    rsa_key = {}
    for key in jwks["keys"]:
        if key["kid"] == unverified_header["kid"]:
            rsa_key = {
                "kty": key["kty"],
                "kid": key["kid"],
                "use": key["use"],
                "n": key["n"],
                "e": key["e"]
            }
    if rsa_key:
        try:
            payload = jwt.decode(
                token,
                rsa_key,
                algorithms=ALGORITHMS,
                audience=API_AUDIENCE,
                issuer="https://" + AUTH0_DOMAIN + "/"
            )
        except jwt.ExpiredSignatureError:
            raise AuthError({"code": "token_expired",
                             "description": "token is expired"}, 401)
        except jwt.JWTClaimsError:
            raise AuthError({"code": "invalid_claims",
                             "description":
                                 "incorrect claims,"
                                 "please check the audience and issuer"}, 401)
        except Exception:
            raise AuthError({"code": "invalid_header",
                             "description":
                                 "Unable to parse authentication"
                                 " token."}, 401)
        return payload
    raise AuthError({"code": "invalid_header",
                     "description": "Unable to find appropriate key"}, 401)
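In your app you have to declare an exception handler and add the well-known Depends hint with the authentication function:
from fastapi import Depends, Request
from fastapi.openapi.docs import get_swagger_ui_html
from fastapi.openapi.models import APIKey
from fastapi.responses import JSONResponse

# `app`, `COOKIE_DOMAIN`, `API_KEY_NAME`, `AuthError` and
# `auth0_token_authentication` are assumed to be defined or imported elsewhere.


@app.exception_handler(AuthError)
async def unicorn_exception_handler(request: Request, exc: AuthError):
    return JSONResponse(
        status_code=exc.status_code,
        content=exc.error,
    )


@app.get("/documentation", tags=["documentation"])
async def get_documentation(api_key: APIKey = Depends(auth0_token_authentication)):
    response = get_swagger_ui_html(openapi_url="/openapi.json", title="docs")
    # NOTE: auth0_token_authentication returns the decoded payload, not the raw
    # token string, so the cookie value below likely needs adjusting.
    response.set_cookie(
        API_KEY_NAME,
        value=api_key,
        domain=COOKIE_DOMAIN,
        httponly=True,
        max_age=1800,
        expires=1800,
    )
    return response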
I hope this can help you.
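For readers wondering what the `kid` lookup in the snippet above is doing: a JWT's header is just base64url-encoded JSON in the first dot-separated segment, and the `kid` in it names which JWKS entry signed the token. Below is a minimal stdlib-only sketch of that step. The helper names (`b64url_decode`, `unverified_header`, `select_key`) and the sample header and key set are made up for illustration; in the thread's code, `jose.jwt.get_unverified_header` does the header part.

```python
import base64
import json


def b64url_decode(segment: str) -> bytes:
    """Decode a base64url segment, restoring the padding that JWTs strip."""
    padding = "=" * (-len(segment) % 4)
    return base64.urlsafe_b64decode(segment + padding)


def unverified_header(token: str) -> dict:
    """Return the JWT header as a dict WITHOUT checking the signature."""
    header_segment = token.split(".")[0]
    return json.loads(b64url_decode(header_segment))


def select_key(jwks: dict, kid: str) -> dict:
    """Return the JWKS entry whose `kid` matches, or an empty dict."""
    for key in jwks["keys"]:
        if key["kid"] == kid:
            return key
    return {}


# Hypothetical header and key set, for illustration only.
header = {"alg": "RS256", "typ": "JWT", "kid": "key-2"}
encoded = base64.urlsafe_b64encode(json.dumps(header).encode()).rstrip(b"=").decode()
fake_token = encoded + ".payload.signature"
fake_jwks = {"keys": [{"kid": "key-1"}, {"kid": "key-2"}]}

matched = select_key(fake_jwks, unverified_header(fake_token)["kid"])
```

Nothing read this way is trustworthy yet; it only tells you which public key to hand to `jwt.decode`, which is where the signature, audience and issuer are actually checked.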
@meteoDaniel Thanks for your post! It helped me a lot. I iterated on it a little, making a dependency-injection version, and went a bit further by implementing a couple of automated tests. Unfortunately, I have not found a way of testing it without hitting the Auth0 API. If anyone has a solution for that, I would be pleased to hear it.
Here is my code:
import json
from urllib.request import urlopen
from jose import jwt
from typing import Dict
from fastapi import Security
from fastapi.security import OAuth2AuthorizationCodeBearer
from typing import List


class AuthError(Exception):
    """ Error handling object """
    def __init__(self, error: Dict[str, str], status_code: int):
        self.error = error
        self.status_code = status_code


def get_jwks(auth0_domain: str):
    json_url = urlopen(f"https://{auth0_domain}/.well-known/jwks.json")
    return json.loads(json_url.read())


def auth0_token_authenticator_builder(
    oauth2_scheme: OAuth2AuthorizationCodeBearer,
    auth0_domain: str,
    api_audience: str,
    algorithms: List[str],
    jwks: dict
):
    def auth0_token_authentication(
        token: str = Security(oauth2_scheme)
    ):
        unverified_header = jwt.get_unverified_header(token)
        rsa_key = {}
        for key in jwks["keys"]:
            if key["kid"] == unverified_header["kid"]:
                rsa_key = {
                    "kty": key["kty"],
                    "kid": key["kid"],
                    "use": key["use"],
                    "n": key["n"],
                    "e": key["e"]
                }
        if rsa_key:
            try:
                payload = jwt.decode(
                    token,
                    rsa_key,
                    algorithms=algorithms,
                    audience=api_audience,
                    issuer=f"https://{auth0_domain}/"
                )
            except jwt.ExpiredSignatureError:
                raise AuthError({"code": "token_expired",
                                 "description": "token is expired"}, 401)
            except jwt.JWTClaimsError:
                raise AuthError({"code": "invalid_claims",
                                 "description":
                                     "incorrect claims,"
                                     "please check the audience and issuer"}, 401)
            except Exception:
                raise AuthError({"code": "invalid_header",
                                 "description":
                                     "Unable to parse authentication"
                                     " token."}, 401)
            return payload
        raise AuthError({"code": "invalid_header",
                         "description": "Unable to find appropriate key"}, 401)
    return auth0_token_authentication


def scope_verifier_builder(oauth2_scheme: OAuth2AuthorizationCodeBearer, required_scope):
    def scope_verifier(token: str = Security(oauth2_scheme)):
        unverified_claims = jwt.get_unverified_claims(token)
        if unverified_claims.get("scope"):
            token_scopes = unverified_claims["scope"].split()
            for token_scope in token_scopes:
                if token_scope == required_scope:
                    return True
        raise AuthError({
            "code": "no_required_permissions",
            "description": "user does not have the required permissions"
        }, 403)
    return scope_verifier
Here are the tests:
import unittest
from fastapi import FastAPI, Depends, status
from fastapi.testclient import TestClient
from fastapi.security import OAuth2AuthorizationCodeBearer
from utils import authentication


class TestAuthentication(unittest.TestCase):
    def setUp(self) -> None:
        # mocked env variables
        AUTH0_DOMAIN = "your_auth0_domain"
        API_AUDIENCE = "your_audience_endpoint"
        ALGORITHMS = ["RS256"]
        jwks = authentication.get_jwks(AUTH0_DOMAIN)

        # mocked auth scheme
        oauth2_scheme = OAuth2AuthorizationCodeBearer(
            authorizationUrl=AUTH0_DOMAIN,
            tokenUrl=API_AUDIENCE
        )

        # build auth method
        auth = authentication.auth0_token_authenticator_builder(
            oauth2_scheme=oauth2_scheme,
            auth0_domain=AUTH0_DOMAIN,
            api_audience=API_AUDIENCE,
            algorithms=ALGORITHMS,
            jwks=jwks
        )

        # build scope verifier
        scope_verifier = authentication.scope_verifier_builder(
            oauth2_scheme=oauth2_scheme,
            required_scope='read:all'
        )

        # build mocked protected API
        api = FastAPI()

        @api.get("/test")
        def test_endpoint(api_key=Depends(auth)):
            return api_key

        @api.get("/test/scope")
        def test_scope(scope: bool = Depends(scope_verifier)):
            return scope

        self.client = TestClient(api)

        # mocked token
        self.token = "generate_a_token_and_put_it_here:)"

    def test_authenticate(self):
        response = self.client.get(
            "/test",
            headers={"authorization": f"Bearer {self.token}"}
        )
        self.assertEqual(status.HTTP_200_OK, response.status_code)

    def test_has_required_scope(self):
        response = self.client.get(
            "/test/scope",
            headers={"authorization": f"Bearer {self.token}"}
        )
        self.assertEqual(status.HTTP_200_OK, response.status_code)


if __name__ == '__main__':
    unittest.main()
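On the question of testing without hitting the Auth0 API: one possible approach for the JWKS fetch is to stub out `urlopen` with `unittest.mock`, so `get_jwks` returns a canned key set. The sketch below is stdlib-only and makes a few assumptions: `get_jwks` is rewritten to call `urllib.request.urlopen` through the module (so the patch target is unambiguous), and `FAKE_JWKS`, `load_jwks_offline` and the domain are hypothetical names with placeholder values.

```python
import io
import json
import urllib.request
from unittest import mock


def get_jwks(auth0_domain: str) -> dict:
    """Same idea as get_jwks in the thread, calling urlopen through the module."""
    with urllib.request.urlopen(f"https://{auth0_domain}/.well-known/jwks.json") as response:
        return json.loads(response.read())


# Made-up key set; the values are placeholders, not a real RSA key.
FAKE_JWKS = {"keys": [{"kty": "RSA", "kid": "test-key", "use": "sig", "n": "AQAB", "e": "AQAB"}]}


def load_jwks_offline(auth0_domain: str):
    """Call get_jwks with urlopen replaced by a stub; return the result and the requested URL."""
    fake_response = io.BytesIO(json.dumps(FAKE_JWKS).encode())
    with mock.patch("urllib.request.urlopen", return_value=fake_response) as fake_urlopen:
        jwks = get_jwks(auth0_domain)
    return jwks, fake_urlopen.call_args[0][0]


jwks, requested_url = load_jwks_offline("tenant.example.invalid")
```

This only removes the network call. For the token itself to pass `jwt.decode`, the test would still need a token signed with a private key whose public half is in the stubbed JWKS, which is not shown here.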
@jfmrm Thanks for the example! I reached a very similar solution that doesn't require the full "builder" aspect.
auth0.py
import json
from urllib.request import urlopen

from fastapi import Depends
from fastapi.security import OAuth2AuthorizationCodeBearer, SecurityScopes
from jose import jwt

AUTH0_DOMAIN = "<auth0-domain>"
API_AUDIENCE = "<auth0-api-audience>"
ALGORITHMS = ["RS256"]


class AuthError(Exception):
    def __init__(self, error, status_code):
        self.error = error
        self.status_code = status_code


# Define an authorization scheme specific to our Auth0 config
auth0_scheme = OAuth2AuthorizationCodeBearer(
    authorizationUrl=AUTH0_DOMAIN, tokenUrl=API_AUDIENCE
)


async def get_current_user(security_scopes: SecurityScopes, token: str = Depends(auth0_scheme)):
    # This down to `END` comment is from https://auth0.com/docs/quickstart/backend/python/01-authorization#create-the-jwt-validation-decorator
    jsonurl = urlopen("https://" + AUTH0_DOMAIN + "/.well-known/jwks.json")
    jwks = json.loads(jsonurl.read())
    unverified_header = jwt.get_unverified_header(token)
    rsa_key = {}
    for key in jwks["keys"]:
        if key["kid"] == unverified_header["kid"]:
            rsa_key = {
                "kty": key["kty"],
                "kid": key["kid"],
                "use": key["use"],
                "n": key["n"],
                "e": key["e"],
            }
    # Without a matching key there is nothing to verify against; fail here
    # so that `payload` is never used while undefined.
    if not rsa_key:
        raise AuthError(
            {"code": "invalid_header", "description": "Unable to find appropriate key"},
            401,
        )
    try:
        payload = jwt.decode(
            token,
            rsa_key,
            algorithms=ALGORITHMS,
            audience=API_AUDIENCE,
            issuer=f"https://{AUTH0_DOMAIN}/",
        )
    except jwt.ExpiredSignatureError:
        raise AuthError(
            {"code": "token_expired", "description": "token is expired"}, 401
        )
    except jwt.JWTClaimsError:
        raise AuthError(
            {
                "code": "invalid_claims",
                "description": "incorrect claims, "
                "please check the audience and issuer",
            },
            401,
        )
    except Exception:
        raise AuthError(
            {
                "code": "invalid_header",
                "description": "Unable to parse authentication token.",
            },
            401,
        )
    # END from Auth0

    # token.scope is represented as a space-separated string of scopes
    token_scopes = payload.get("scope", "").split()

    # Check that all required scopes are present
    for scope in security_scopes.scopes:
        if scope not in token_scopes:
            raise AuthError(
                {
                    "code": "Unauthorized",
                    "description": f"You don't have access to this resource. `{' '.join(security_scopes.scopes)}` scopes required",
                },
                403,
            )

    return payload
Using this is now as simple as
app.py
from fastapi import FastAPI, Request, Security
from fastapi.responses import JSONResponse

from .auth0 import AuthError, get_current_user

app = FastAPI()


@app.exception_handler(AuthError)
def handle_auth_error(request: Request, ex: AuthError):
    return JSONResponse(status_code=ex.status_code, content=ex.error)


@app.get("/private")
def private(user=Security(get_current_user)):
    return {"message": "You're an authorized user"}


@app.get("/private-with-scopes")
def privateScopes(user=Security(get_current_user, scopes=["read:example"])):
    return {"message": "You're authorized with scopes!"}
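The scope check inside `get_current_user` can also be pulled out as a pure function, which makes it easy to test without a token or a running app. The following is a small sketch of that idea; `missing_scopes` and the sample scope strings are hypothetical names chosen for illustration, and the logic mirrors the "all required scopes must be present" rule used above.

```python
from typing import Iterable, List


def missing_scopes(scope_claim: str, required: Iterable[str]) -> List[str]:
    """Return the required scopes that are absent from a space-separated `scope` claim."""
    granted = set(scope_claim.split())
    return [scope for scope in required if scope not in granted]


# Hypothetical claim value, as it might appear in a decoded access token.
claim = "read:example write:example"

allowed = missing_scopes(claim, ["read:example"])
denied = missing_scopes(claim, ["read:example", "delete:example"])
```

An empty result means the request may proceed; a non-empty one is the list you would report in the 403 response.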
@Vivalldi do you have this code on GitHub or anywhere else? I am trying to do something similar, but I'm hitting issues with this code.
Thanks
Actually @Vivalldi, I got your example working. My only question now is how to authenticate on the /docs route so that it sends you to Auth0 to log in and back, or whether that is even possible. I was just curious how users would use the API via the docs.
The OpenAPI spec doesn't include JWT bearer tokens yet. There's no defined standard as to where those tokens live and as such there's no OpenAPI config for it yet. I might be able to contrive an example where you set the token in the username field of basic auth (without pass) similar to stripe https://stripe.com/docs/api/authentication
@Vivalldi how are you handling your user relations this way? I'm guessing the sub value in the user object is basically the user ID. Would I just use it when creating records that have an ownerId, for example, or how have you been doing that?
This isn't directly related to this issue but in short, yes, you should use the user identifier.
https://auth0.com/docs/users/normalized/auth0/store-user-data
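As a rough illustration of using the `sub` claim as the owner identifier, here is a minimal in-memory sketch. The `Note` record, `create_note`, `notes_for` and the sample `sub` values are all hypothetical; a real app would write to its own database instead of a list.

```python
from dataclasses import dataclass
from typing import List


@dataclass
class Note:
    owner_id: str  # the `sub` claim of the user who created the record
    text: str


def create_note(payload: dict, text: str) -> Note:
    """Create a record owned by the authenticated user (payload is the decoded token)."""
    return Note(owner_id=payload["sub"], text=text)


def notes_for(payload: dict, notes: List[Note]) -> List[Note]:
    """Return only the records that belong to the authenticated user."""
    return [note for note in notes if note.owner_id == payload["sub"]]


# Made-up decoded payloads for two different users.
alice = {"sub": "auth0|alice-id"}
bob = {"sub": "auth0|bob-id"}

store = [create_note(alice, "first"), create_note(bob, "second"), create_note(alice, "third")]
alice_notes = notes_for(alice, store)
```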
Here is a working Auth0 and FastAPI code snippet.
main.py
from fastapi import Depends, FastAPI

from routers import items
from utils.auth import require_auth

app = FastAPI()

app.include_router(
    items.router,
    prefix="/items",
    tags=["items"],
    dependencies=[Depends(require_auth)],
    responses={404: {"description": "Not found"}},
)
utils/auth.py
import json
import logging
import os
from urllib.request import urlopen

from fastapi import HTTPException, Header
from jose import jwt
from jose import exceptions as JoseExceptions

logging.basicConfig(
    format='%(asctime)s %(message)s',
    datefmt='%m/%d/%Y %I:%M:%S %p',
    level=os.environ.get('LOGGING_LEVEL', 'INFO'))

AUTH0_DOMAIN = os.environ.get(
    'AUTH0_DOMAIN', '')
AUTH0_API_AUDIENCE = os.environ.get(
    'AUTH0_API_AUDIENCE', '')
AUTH0_ALGORITHMS = os.environ.get(
    'AUTH0_ALGORITHMS', 'RS256')


def get_token_auth_header(authorization):
    """Extract the token from an `Authorization: Bearer <token>` header value."""
    parts = authorization.split()

    # An empty header has no parts at all, so check that before indexing.
    if not parts or parts[0].lower() != "bearer":
        raise HTTPException(
            status_code=401,
            detail='Authorization header must start with Bearer')
    elif len(parts) == 1:
        raise HTTPException(
            status_code=401,
            detail='Authorization token not found')
    elif len(parts) > 2:
        raise HTTPException(
            status_code=401,
            detail='Authorization header must be Bearer token')

    token = parts[1]
    return token


def get_payload(jwks, unverified_header, token):
    """Verify the token against the matching JWKS key; return None if no key matches."""
    rsa_key = {}
    payload = None
    for key in jwks["keys"]:
        if key["kid"] == unverified_header["kid"]:
            rsa_key = {
                "kty": key["kty"],
                "kid": key["kid"],
                "use": key["use"],
                "n": key["n"],
                "e": key["e"]
            }
    if rsa_key:
        try:
            payload = jwt.decode(
                token,
                rsa_key,
                algorithms=AUTH0_ALGORITHMS,
                audience=AUTH0_API_AUDIENCE,
                issuer="https://"+AUTH0_DOMAIN+"/"
            )
        except jwt.ExpiredSignatureError:
            raise HTTPException(
                status_code=401,
                detail='Authorization token expired')
        except jwt.JWTClaimsError:
            raise HTTPException(
                status_code=401,
                detail='Incorrect claims, check the audience and issuer.')
        except Exception as e:
            logging.warning(e, exc_info=True)
            raise HTTPException(
                status_code=401,
                detail='Unable to parse authentication token')
    return payload


async def require_auth(authorization: str = Header(...)):
    token = get_token_auth_header(authorization)
    jsonurl = urlopen("https://"+AUTH0_DOMAIN+"/.well-known/jwks.json")
    jwks = json.loads(jsonurl.read())

    try:
        unverified_header = jwt.get_unverified_header(token)
    except JoseExceptions.JWTError:
        raise HTTPException(
            status_code=401,
            detail='Unable to decode authorization token headers')

    payload = get_payload(jwks, unverified_header, token)
    if not payload:
        raise HTTPException(
            status_code=401,
            detail='Invalid authorization token')
    return payload
routers/items.py
from fastapi import APIRouter, HTTPException

router = APIRouter()


@router.get("/")
async def read_items():
    return [{"name": "Item Foo"}, {"name": "item Bar"}]


@router.get("/{item_id}")
async def read_item(item_id: str):
    return {"name": "Fake Specific Item", "item_id": item_id}


@router.put(
    "/{item_id}",
    tags=["custom"],
    responses={403: {"description": "Operation forbidden"}},
)
async def update_item(item_id: str):
    if item_id != "foo":
        raise HTTPException(status_code=403, detail="You can only update the item: foo")
    return {"item_id": item_id, "name": "The Fighters"}
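One thing to be aware of in `require_auth` above is that it downloads the JWKS on every request. A possible refinement, which is not part of the original snippet, is to cache the key set for a while. The sketch below shows the idea with an injectable fetch function and clock so it can be exercised without a network; `JWKSCache`, the ten-minute default and the fake fetcher are assumptions made for illustration.

```python
import time
from typing import Callable, Optional


class JWKSCache:
    """Keep the last fetched key set and refetch only after `ttl_seconds`."""

    def __init__(self, fetch: Callable[[], dict], ttl_seconds: float = 600.0,
                 clock: Callable[[], float] = time.monotonic):
        self._fetch = fetch
        self._ttl = ttl_seconds
        self._clock = clock
        self._value: Optional[dict] = None
        self._expires_at = 0.0

    def get(self) -> dict:
        now = self._clock()
        if self._value is None or now >= self._expires_at:
            self._value = self._fetch()
            self._expires_at = now + self._ttl
        return self._value


# Fake fetcher and clock so the behaviour can be observed offline.
fetch_calls = []
fake_now = [0.0]


def fake_fetch() -> dict:
    fetch_calls.append(1)
    return {"keys": []}


cache = JWKSCache(fake_fetch, ttl_seconds=60, clock=lambda: fake_now[0])
cache.get()
cache.get()  # served from the cache, no second fetch
```

In the app, the fetch function would be the `urlopen` call from `require_auth`. Since providers can rotate signing keys, a short TTL (or a refetch when no `kid` matches) is probably the safer choice.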
@rsitro4 Nice example. How do you use swagger-ui (/docs) with this?
@LindezaGrey do you know how to use it with swagger-ui (/docs)?
@LuisHernandez1611681 Well, sort of. In Auth0 I added another authorization flow and internally used OAuth2PasswordBearer for the docs page. This means that you also need to provide client_secret and client_id. In my use case only admins use the /docs page for data entry and retrieval, so it's not a big deal. If the docs page faces customers/clients, of course you don't want to provide those credentials...