FastAPI: SAML Secured Endpoints

Created on 24 Aug 2020  ·  2 Comments  ·  Source: tiangolo/fastapi

Example

Here's a self-contained, minimal, reproducible example of my use case:


from fastapi import FastAPI, HTTPException, Request
from starlette.responses import RedirectResponse

import app.SAML_Interface as SAML_Interface

app = FastAPI()


@app.post("/acs")
async def acs(request: Request):
    try:
        form = await request.form()
        samlResponse = form["SAMLResponse"]
        # ACS URL listed here:
        strAcsUrl = 'http://localhost:6321/acs'

        # Parse the response once; [0] is used as the validity flag, [1] as the user ID.
        result = SAML_Interface.SAML_Response.ParseSAMLResponse(strAcsUrl, samlResponse)
        if result[0]:
            userId = result[1]
        else:
            raise HTTPException(status_code=401)
    except KeyError:
        # The form did not contain a SAMLResponse field.
        raise HTTPException(status_code=401)


@app.post("/login")
def login():
    strAcsUrl = 'http://localhost:6321/acs'
    # Issuer URL listed here:
    strIssuer = '***********************'
    # Sign In URL listed here:
    strSingleSignOnURL = '**********************************'
    # Redirect the user to the identity provider with the SAML request as the query string.
    return RedirectResponse('{uIDPUrl}?{bParams}'.format(
        uIDPUrl=strSingleSignOnURL,
        bParams=SAML_Interface.SAML_Request.GetSAMLRequest(strAcsUrl, strIssuer)))

Description

I am currently trying to implement SAML auth for my API. The basic workflow is that the user visits /login, which redirects to the SAML sign-in URL; on return, the user is passed to the /acs endpoint. I have the login working and I am getting data about the user back (i.e. the userId variable). My question is: how can I use the returned data to secure my endpoints? I am basically trying to do something like this: if a user calls another endpoint, say /home, but userId is None, then raise HTTPException(status_code=401).

Environment

  • OS: Windows 10
  • FastAPI Version: 0.61.0
  • Python version: 3.7

Additional Context

I would like to be able to use the credentials and the user roles returned from SAML to create multi-tenancy for my API's data (i.e. users can only see data they create).
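
To make the "users can only see data they create" idea concrete, here is a minimal sketch that filters records by the authenticated user's ID. It uses only the standard library; the `Note` record, its `owner_id` field, and the `notes_visible_to` helper are hypothetical names chosen for illustration, not part of the code above or of any SAML library.

```python
from dataclasses import dataclass
from typing import List


@dataclass
class Note:
    """Hypothetical record; owner_id would hold the userId returned by SAML."""
    id: int
    owner_id: str
    text: str


def notes_visible_to(user_id: str, notes: List[Note]) -> List[Note]:
    """Return only the records that were created by the given user."""
    return [note for note in notes if note.owner_id == user_id]


# Example data: two tenants sharing one store.
NOTES = [
    Note(1, "alice", "first"),
    Note(2, "bob", "second"),
    Note(3, "alice", "third"),
]
```

In an endpoint, the `user_id` argument would come from whatever the authentication step yields, so the filter is applied to every read rather than left to each caller.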

question

All 2 comments

import jwt  # PyJWT
from fastapi import Depends, FastAPI, HTTPException, Request
from fastapi.security import APIKeyCookie
from starlette.responses import RedirectResponse

import app.SAML_Interface as SAML_Interface

app = FastAPI()

# Reads the token from the "session" cookie.
cookie_sec = APIKeyCookie(name="session")
secret_key = 'key'


def get_current_user(session: str = Depends(cookie_sec)):
    try:
        # Recent PyJWT releases require the accepted algorithms to be listed explicitly.
        sess = jwt.decode(session, secret_key, algorithms=["HS256"])
        return sess['userId']
    except Exception:
        raise HTTPException(
            status_code=401, detail="Invalid authentication"
        )


@app.post("/acs")
async def acs(request: Request):
    try:
        form = await request.form()
        samlResponse = form["SAMLResponse"]
        # ACS URL listed here:
        strAcsUrl = 'http://localhost:6321/acs'

        # Parse the response once; [0] is used as the validity flag, [1] as the user ID.
        result = SAML_Interface.SAML_Response.ParseSAMLResponse(strAcsUrl, samlResponse)
        if result[0]:
            # userId is then encoded and stored in the "session" cookie (not shown in this snippet).
            userId = result[1]
        else:
            raise HTTPException(status_code=401)
    except KeyError:
        raise HTTPException(status_code=401)


@app.post("/login")
def login():
    strAcsUrl = 'http://localhost:6321/acs'
    # Issuer URL listed here:
    strIssuer = '***********************'
    # Sign In URL listed here:
    strSingleSignOnURL = '**********************************'
    return RedirectResponse('{uIDPUrl}?{bParams}'.format(
        uIDPUrl=strSingleSignOnURL,
        bParams=SAML_Interface.SAML_Request.GetSAMLRequest(strAcsUrl, strIssuer)))


@app.post('/private')
@app.get('/private')
def read(session: str = Depends(get_current_user)):
    # Despite the parameter name, this is the userId returned by get_current_user.
    return session

Never mind, I figured things out. I encoded the user data as a token and stored it in the session cookie that APIKeyCookie reads, then made subsequent endpoints depend on the get_current_user function. Hope this helps someone else.
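
The encode-and-store step is described but not shown, so here is a rough sketch of what issuing and checking such a token might look like. To stay dependency-free it uses a standard-library HMAC signature as a stand-in for a JWT library; this is not the poster's implementation. `SECRET_KEY`, `make_session_token`, and `read_session_token` are hypothetical names.

```python
import base64
import hashlib
import hmac
import json
from typing import Optional

# Hypothetical secret; in practice this would be loaded from configuration.
SECRET_KEY = b"change-me"


def _sign(payload: str) -> str:
    """Return the hex HMAC-SHA256 signature of the payload."""
    return hmac.new(SECRET_KEY, payload.encode(), hashlib.sha256).hexdigest()


def make_session_token(user_id: str) -> str:
    """Encode the user ID and append a signature: '<payload>.<signature>'."""
    raw = json.dumps({"userId": user_id}).encode()
    payload = base64.urlsafe_b64encode(raw).decode()
    return f"{payload}.{_sign(payload)}"


def read_session_token(token: str) -> Optional[str]:
    """Return the user ID if the signature is valid, otherwise None."""
    try:
        payload, signature = token.rsplit(".", 1)
    except ValueError:
        return None  # no '.' separator, so not a token we issued
    # Constant-time comparison of the supplied and expected signatures.
    if not hmac.compare_digest(signature.encode(), _sign(payload).encode()):
        return None
    try:
        return json.loads(base64.urlsafe_b64decode(payload))["userId"]
    except (ValueError, KeyError, TypeError):
        return None
```

In the /acs handler, the token could be attached to the outgoing response with `response.set_cookie("session", token, httponly=True)`, and get_current_user would raise the 401 whenever the read function returns None. With PyJWT, `jwt.encode` and `jwt.decode` fill the same two roles.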

Thanks for reporting back and closing the issue :+1:
