Fastapi: [QUESTION] How to run the return variable coroutine before starting the application?

Created on 8 Jul 2020  路  5Comments  路  Source: tiangolo/fastapi

Hi, experts.

I want to create a global connection pool to Redis when the application starts using aioredis. I cannot use the startup event because I need to create a global variable. I was trying to do something like that:

main.py

from fastapi import FastAPI
from connections import redis


app = FastAPI()


@app.on_event('shutdown')
async def shutdown_event():
    redis.close()
    await redis.wait_closed()


@app.get('/')
async def redis_keys():
    rkey_list = await redis.keys('*')

    return rkey_list

connections.py

import aioredis
import asyncio


async def get_redis_pool() -> aioredis.Redis:
    redis = await aioredis.create_redis_pool("redis://localhost:6379/0?encoding=utf-8")

    return redis


redis: aioredis.Redis = asyncio.run(get_redis_pool())

and got an error RuntimeError: asyncio.run() cannot be called from a running event loop.

Is there any way I can run a coroutine when starting an application to create a global variable?

question

All 5 comments

Is there a reason you want to use global variable instead of putting the redis in the app.state?

Is there a reason you want to use global variable instead of putting the redis in the app.state?

Yes, there is. I need to use the connection pool not only in path operation functions. And when you import app into another module (to access app.state.redis), an error occurs when launching the application:
Error loading ASGI app. Could not import module "main"..

main.py

from fastapi import FastAPI
from connections import get_redis_pool
from red import get_all


app = FastAPI()


@app.on_event('startup')
async def starup_event():
    app.state.redis = await get_redis_pool()


@app.on_event('shutdown')
async def shutdown_event():
    app.state.redis.close()
    await app.state.redis.wait_closed()


@app.get('/')
async def redis_keys():
    return await get_all

red.py

from main import app


async def get_all():
    return await app.state.redis.keys('*')

connections.py

import aioredis


async def get_redis_pool() -> aioredis.Redis:
    redis = await aioredis.create_redis_pool("redis://localhost:6379/0?encoding=utf-8")

    return redis

@asviel
Hi, You're having a circular import problem. Your red.py depends on main.py and vice versa.

connections.py

from aioredis import create_redis_pool, Redis


async def get_redis_pool() -> Redis:
    redis = await create_redis_pool("redis://localhost:6379/0?encoding=utf-8")

    return redis

main.py

from fastapi import FastAPI
from connections import get_redis_pool
from uvicorn import run

app = FastAPI()


async def get_all():
    return await app.state.redis.keys('*')


@app.on_event('startup')
async def starup_event():
    app.state.redis = await get_redis_pool()


@app.on_event('shutdown')
async def shutdown_event():
    app.state.redis.close()
    await app.state.redis.wait_closed()


@app.get('/')
async def redis_keys():
    return get_all()


if __name__ == '__main__':
    run("main:app", port=3000, reload=True)

@MacMacky Yes, that's how I showed you why I can't use the solution suggested above.

@asviel you've probably solved this by now, but here is a working solution:

connections.py

from typing import Optional

from aioredis import Redis, create_redis_pool


class RedisCache:

    def __init__(self):
        self.redis_cache: Optional[Redis] = None

    async def init_cache(self):
        self.redis_cache = await create_redis_pool("redis://localhost:6379/0?encoding=utf-8")

    async def keys(self, pattern):
        return await self.redis_cache.keys(pattern)

    async def set(self, key, value):
        return await self.redis_cache.set(key, value)

    async def get(self, key):
        return await self.redis_cache.get(key)

    async def close(self):
        self.redis_cache.close()
        await self.redis_cache.wait_closed()


redis_cache = RedisCache()

main.py

from fastapi import FastAPI
from uvicorn import run

from connections import redis_cache

app = FastAPI()


async def get_all():
    return await redis_cache.keys('*')


@app.on_event('startup')
async def starup_event():
    await redis_cache.init_cache()


@app.on_event('shutdown')
async def shutdown_event():
    redis_cache.close()
    await redis_cache.wait_closed()


@app.get('/')
async def redis_keys():
    return await get_all()


if __name__ == '__main__':
    run("main:app", port=3000, reload=True)

This allows you to import redis_cache in other modules without any problems.

Was this page helpful?
0 / 5 - 0 ratings