Hi, experts.
I want to create a global connection pool to Redis when the application starts using aioredis. I cannot use the startup event because I need to create a global variable. I was trying to do something like that:
from fastapi import FastAPI
from connections import redis
app = FastAPI()
@app.on_event('shutdown')
async def shutdown_event():
redis.close()
await redis.wait_closed()
@app.get('/')
async def redis_keys():
rkey_list = await redis.keys('*')
return rkey_list
import aioredis
import asyncio
async def get_redis_pool() -> aioredis.Redis:
redis = await aioredis.create_redis_pool("redis://localhost:6379/0?encoding=utf-8")
return redis
redis: aioredis.Redis = asyncio.run(get_redis_pool())
and got an error RuntimeError: asyncio.run() cannot be called from a running event loop.
Is there any way I can run a coroutine when starting an application to create a global variable?
Is there a reason you want to use global variable instead of putting the redis in the app.state?
Is there a reason you want to use global variable instead of putting the redis in the app.state?
Yes, there is. I need to use the connection pool not only in path operation functions. And when you import app into another module (to access app.state.redis), an error occurs when launching the application:
Error loading ASGI app. Could not import module "main"..
from fastapi import FastAPI
from connections import get_redis_pool
from red import get_all
app = FastAPI()
@app.on_event('startup')
async def starup_event():
app.state.redis = await get_redis_pool()
@app.on_event('shutdown')
async def shutdown_event():
app.state.redis.close()
await app.state.redis.wait_closed()
@app.get('/')
async def redis_keys():
return await get_all
from main import app
async def get_all():
return await app.state.redis.keys('*')
import aioredis
async def get_redis_pool() -> aioredis.Redis:
redis = await aioredis.create_redis_pool("redis://localhost:6379/0?encoding=utf-8")
return redis
@asviel
Hi, You're having a circular import problem. Your red.py depends on main.py and vice versa.
from aioredis import create_redis_pool, Redis
async def get_redis_pool() -> Redis:
redis = await create_redis_pool("redis://localhost:6379/0?encoding=utf-8")
return redis
from fastapi import FastAPI
from connections import get_redis_pool
from uvicorn import run
app = FastAPI()
async def get_all():
return await app.state.redis.keys('*')
@app.on_event('startup')
async def starup_event():
app.state.redis = await get_redis_pool()
@app.on_event('shutdown')
async def shutdown_event():
app.state.redis.close()
await app.state.redis.wait_closed()
@app.get('/')
async def redis_keys():
return get_all()
if __name__ == '__main__':
run("main:app", port=3000, reload=True)
@MacMacky Yes, that's how I showed you why I can't use the solution suggested above.
@asviel you've probably solved this by now, but here is a working solution:
from typing import Optional
from aioredis import Redis, create_redis_pool
class RedisCache:
def __init__(self):
self.redis_cache: Optional[Redis] = None
async def init_cache(self):
self.redis_cache = await create_redis_pool("redis://localhost:6379/0?encoding=utf-8")
async def keys(self, pattern):
return await self.redis_cache.keys(pattern)
async def set(self, key, value):
return await self.redis_cache.set(key, value)
async def get(self, key):
return await self.redis_cache.get(key)
async def close(self):
self.redis_cache.close()
await self.redis_cache.wait_closed()
redis_cache = RedisCache()
from fastapi import FastAPI
from uvicorn import run
from connections import redis_cache
app = FastAPI()
async def get_all():
return await redis_cache.keys('*')
@app.on_event('startup')
async def starup_event():
await redis_cache.init_cache()
@app.on_event('shutdown')
async def shutdown_event():
redis_cache.close()
await redis_cache.wait_closed()
@app.get('/')
async def redis_keys():
return await get_all()
if __name__ == '__main__':
run("main:app", port=3000, reload=True)
This allows you to import redis_cache in other modules without any problems.