Here's a self-contained, minimal, reproducible, example with my use case:
from fastapi import FastAPI, HTTPException, Request
import app.SAML_Interface as SAML_Interface
from starlette.responses import RedirectResponse
@app.post("/acs")
async def acs(request: Request):
try:
form = await request.form()
samlResponse = form["SAMLResponse"]
#ACS URL listed here:
strAcsUrl = 'http://localhost:6321/acs'
if SAML_Interface.SAML_Response.ParseSAMLResponse(strAcsUrl,samlResponse)[0]:
userId = SAML_Interface.SAML_Response.ParseSAMLResponse(strAcsUrl,samlResponse)[1]
else:
raise HTTPException(status_code=401)
except KeyError:
raise HTTPException(status_code=401)
@app.post("/login")
def login():
strAcsUrl = 'http://localhost:6321/acs'
#Issuer URL listed here:
strIssuer = '***********************'
#Sign In URL listed here:
strSingleSignOnURL = '**********************************'
return RedirectResponse('{uIDPUrl}?{bParams}'.format(uIDPUrl = strSingleSignOnURL, bParams = SAML_Interface.SAML_Request.GetSAMLRequest(strAcsUrl, strIssuer)))
I am currently trying to implement SAML auth for my API. The basic workflow is that the user will visit /login of course this endpoints redirects to the SAML url and upon return users are passed to the /acs/ endpoint. I have the login working and I am getting data about the user returned (ie userId variable). My question is how can I use the data that is returned to secure my endpoints? I am basically trying to do something like if a user utilizes another endpoint, say /home/ but userId is None then raise HTTPException(status_code=401).
FastAPI Version: 0.61.0
Python version: 3.7
I would like to be able to use credentials and the user roles return from SAML to create multi-tenancy for my APIs data (ie users can only see data they create).
from fastapi import FastAPI, HTTPException, Request
import app.SAML_Interface as SAML_Interface
from starlette.responses import RedirectResponse
from fastapi.security import APIKeyCookie
cookie_sec = APIKeyCookie(name="session")
secret_key = 'key'
def get_current_user(session: str = Depends(cookie_sec)):
try:
sess = jwt.decode(session, secret_key)
return sess['userId']
except Exception:
raise HTTPException(
status_code=401, detail="Invalid authentication"
)
@app.post("/acs")
async def acs(request: Request):
try:
form = await request.form()
samlResponse = form["SAMLResponse"]
#ACS URL listed here:
strAcsUrl = 'http://localhost:6321/acs'
if SAML_Interface.SAML_Response.ParseSAMLResponse(strAcsUrl,samlResponse)[0]:
userId = SAML_Interface.SAML_Response.ParseSAMLResponse(strAcsUrl,samlResponse)[1]
else:
raise HTTPException(status_code=401)
except KeyError:
raise HTTPException(status_code=401)
@app.post("/login")
def login():
strAcsUrl = 'http://localhost:6321/acs'
#Issuer URL listed here:
strIssuer = '***********************'
#Sign In URL listed here:
strSingleSignOnURL = '**********************************'
return RedirectResponse('{uIDPUrl}?{bParams}'.format(uIDPUrl = strSingleSignOnURL, bParams = SAML_Interface.SAML_Request.GetSAMLRequest(strAcsUrl, strIssuer)))
@app.post('/private')
@app.get('/private')
def read(session: str = Depends(get_current_user)):
return session
Never mind, I did figure things out. I just encoded the data and stored the object in APIKeyCookie then make subsequent endpoints depend on the get_current_user function. Hope this helps someone else.
Thanks for reporting back and closing the issue :+1:
Most helpful comment
Never mind, I did figure things out. I just encoded the data and stored the object in
APIKeyCookiethen make subsequent endpoints depend on theget_current_userfunction. Hope this helps someone else.