Hi,
I'm writing an application using sanic with multiple workers.
There's a piece of data I want to share between workers. Each worker can update it, and I want the other workers to see the updated information as well.
Is there any way to have such sharing between workers?
Thanks
Hi!
I use `multiprocessing.Manager` for this task. From the Python docs:

> Managers provide a way to create data which can be shared between different processes, including sharing over a network between processes running on different machines. A manager object controls a server process which manages shared objects. Other processes can access the shared objects by using proxies.
In other words, `multiprocessing.Manager()` creates another process alongside the workers, which stores all the shared data. But this approach requires some configuration in Sanic, and my solution is not perfect and relies on one hack (more on that below). Perhaps someone can tell you (and me) how to implement it more elegantly.
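To illustrate what the docs describe before getting into the Sanic-specific parts, here is a minimal, self-contained sketch (my own example, not part of the solution below) of two-way sharing through a manager: several child processes update the same manager-backed dict under a manager lock, and the parent sees the result.

```python
import multiprocessing

def bump(shared, lock):
    # Runs in a child process; the proxies forward every operation
    # to the manager's server process, so all workers see one dict
    with lock:
        shared["hits"] = shared["hits"] + 1

manager = multiprocessing.Manager()
shared = manager.dict()
shared["hits"] = 0
lock = manager.Lock()

procs = [multiprocessing.Process(target=bump, args=(shared, lock)) for _ in range(3)]
for p in procs:
    p.start()
for p in procs:
    p.join()

print(shared["hits"])  # 3
```

The lock matters: the read-increment-write on `shared["hits"]` is three separate round-trips to the manager process, so without it two children could race and lose an update.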
First, create the classes SharedDataManager and MultiProcessLock (in my case, in the file core/middlewares/multiprocessing.py):
```python
# File "core/middlewares/multiprocessing.py"
from copy import copy
import multiprocessing

# Field indexes in SharedDataManager._data_dict
_DATA = 0
_LOCK = 1


class SharedDataManager:
    def __init__(self, other_shared_data_mgr=None):
        if other_shared_data_mgr is not None:
            self._manager = other_shared_data_mgr._manager
            self._data_dict = other_shared_data_mgr._data_dict
        else:
            self._manager = multiprocessing.Manager()
            self._data_dict = self._manager.dict()  # Create a shared dict for data
        # self._data_dict structure:
        # {"some_name": self._manager.list([data_value, self._manager.RLock()])}

    def get(self, name, default_value=None):
        """Get a value (copy) by name."""
        data = self._data_dict.get(name, None)
        if data is not None:
            return data[_DATA]
        return copy(default_value)

    def set(self, name, value):
        """Set a value by name."""
        if name in self._data_dict:
            self._data_dict[name][_DATA] = value
        else:
            self._data_dict[name] = self._manager.list((value, self._manager.RLock()))

    def lock(self, name):
        """Acquire a lock on the resource by name (waiting if the lock is already held).
        Returns a context-manager object."""
        if name not in self._data_dict:
            self._data_dict[name] = self._manager.list((None, self._manager.RLock()))
        return MultiProcessLock(self._data_dict[name][_LOCK], True)

    def try_lock(self, name):
        """Acquire a lock on the resource by name (without waiting if the lock is already held).
        Returns a context-manager object."""
        if name not in self._data_dict:
            self._data_dict[name] = self._manager.list((None, self._manager.RLock()))
        return MultiProcessLock(self._data_dict[name][_LOCK], False)


class MultiProcessLock:
    __slots__ = ("_rlock", "_blocking", "_acquired")

    def __init__(self, rlock, blocking):
        self._rlock = rlock
        self._blocking = blocking
        self._acquired = False

    @property
    def acquired(self):
        return self._acquired

    def __enter__(self):
        self._acquired = self._rlock.acquire(blocking=self._blocking)
        return self

    def __exit__(self, exc_type, exc_val, exc_tb):
        if self._acquired:
            self._rlock.release()
```
Then, in the place where you initialize the Sanic app:

```python
from sanic import Sanic
from core.middlewares.multiprocessing import SharedDataManager

app = Sanic(__name__)
if not hasattr(app, "shared"):
    app.shared = SharedDataManager()  # Create a 'shared' field on the Sanic app object
```
Naturally, core.middlewares.multiprocessing must be replaced with your module.
Now the shared data can be accessed from anywhere in the program:
```python
with app.shared.lock("my_data"):         # Acquire the lock for the data
    data = app.shared.get("my_data")     # Get the data
    if data is not None:
        data.append("new value")         # Modify the data; in this example, 'data' is a list
        app.shared.set("my_data", data)  # Set the new value (under the same lock)
```
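The non-blocking `try_lock` variant builds on the fact that a manager `RLock` proxy accepts `acquire(blocking=False)`. Here is a self-contained sketch of that behavior (my own illustration, independent of the classes above): the parent holds the lock, and a child's non-blocking attempt fails immediately instead of waiting.

```python
import multiprocessing

def try_acquire(rlock, result):
    # Non-blocking attempt from a child process; the parent already
    # holds the lock, so this returns False immediately
    result["acquired"] = rlock.acquire(blocking=False)

manager = multiprocessing.Manager()
rlock = manager.RLock()
result = manager.dict()

rlock.acquire()  # parent takes the lock first
p = multiprocessing.Process(target=try_acquire, args=(rlock, result))
p.start()
p.join()
rlock.release()

print(result["acquired"])  # False
```

This is why `MultiProcessLock` exposes the `acquired` property: after `with app.shared.try_lock("my_data") as lock:` you must check `lock.acquired` before touching the data.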
But for this to work, SharedDataManager must be created before the workers are forked, because multiprocessing.Manager() spawns its own process to hold the shared data, and the object returned by multiprocessing.Manager() must be the same for all workers (SharedDataManager._manager in my case).
To achieve this, I create a custom Gunicorn worker class:
```python
# File "core/run/gunicorn/worker.py"
from sanic.worker import GunicornWorker
from core.middlewares.multiprocessing import SharedDataManager


class CoreWorker(GunicornWorker):
    def __init__(self, *args, **kwargs):
        self._shared = None
        super().__init__(*args, **kwargs)

    def create_shared_data(self, server_shared=None):
        self._shared = SharedDataManager(server_shared)
        return self._shared

    def inject_shared_data(self):
        # Inject SharedDataManager into app.shared
        sanic_app_obj = self.app.callable
        if sanic_app_obj is not None:
            sanic_app_obj.shared = self._shared
```
And a custom Gunicorn config file:
```python
# File "core/run/gunicorn/config.py"
workers = 4
timeout = 30
bind = "0.0.0.0:8000"
worker_class = "core.run.gunicorn.worker.CoreWorker"


def pre_fork(server, worker):
    if hasattr(server, "_sanic_shared_data"):
        # SharedDataManager has already been created
        worker.create_shared_data(server._sanic_shared_data)
    else:
        # SharedDataManager has not been created yet; create a new one
        server._sanic_shared_data = worker.create_shared_data()


def post_fork(server, worker):
    # Hack:
    # Remove the _exit_function added by SharedDataManager._manager
    # inside worker.create_shared_data().
    # Otherwise we get the exception "AssertionError: can only
    # join a child process" at the end of the worker process.
    import atexit
    import multiprocessing.util
    atexit.unregister(multiprocessing.util._exit_function)


def post_worker_init(worker):
    # Inject SharedDataManager into app.shared
    worker.inject_shared_data()
```
Finally, I run Gunicorn, passing it the path to the config file:

```shell
/usr/bin/gunicorn -c core/run/gunicorn/config.py wsgi:app
```
I hope my solution helps you. If you have questions, just ask.
I think the Sanic Plugins Framework has a way to attach shared objects to an app without making a 'shared' attribute on the app as you do here -- that seems fragile to me since something else could decide to use the name 'shared'. See https://github.com/ashleysommer/sanicpluginsframework and specifically the contextualize plugin within that project.
Thanks a lot @m-equinox and @garyo, I'll try it out and let you know.
Just a note: I was working on this same question a couple of days ago, and I realized that this kind of thing (sharing state between workers) ends up being specific to the scenario you want to accomplish. You can share state between workers on the same machine (same OS / memory space), but in a clustered scenario (say, using Docker or Kubernetes) this will not work (perhaps some kind of page file with mmap might even work, but that ties you to the underlying FS and ... argh, I just can't make up my mind). Anyway, just a tip :wink:
Hmmm getting into complicated territory here with shared state. Have a look at : https://github.com/jreese/aiomultiprocess.
Why not use something like redis for accessing a shared state?
@c-goosen, I think aiomultiprocess is still not quite about the current task.
Personally, I use "sharing a resource between workers" only for caching. And this cache is much faster (because it is local memory) than Redis or any other out-of-server DB :)
I see. Makes sense. Are you sharing a read-only cache, or locking around each process's manipulation of the variable?
@c-goosen
Why not use something like redis for accessing a shared state?
Agreed :) For simple information or simpler data objects, of course. For more complex object structures, well ... there may be a lot of options, depending on your production environment (I guess everything depends on that).
@m-equinox , thoughts?
@c-goosen, my cache is read-write, and it locks access to the variable during updates. But this only happens on a cache miss, which should not be too frequent.
@vltr, I think there should be a comprehensive approach to caching. I usually use local memory as the first-level cache (fast), and a Redis cluster as the second-level cache (slower).
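To make the two-level idea concrete, here is a hypothetical sketch (the `TwoLevelCache` class and the `DictBackend` stand-in are my own illustration, not code from this thread): reads hit the local in-process dict first and fall through to a pluggable backend, which in production would be a real Redis client.

```python
import time

class TwoLevelCache:
    """Sketch of a tiered cache: local dict as L1, pluggable backend as L2."""

    def __init__(self, backend, ttl=60.0):
        self._local = {}          # L1: per-process memory, fast but not shared
        self._backend = backend   # L2: shared store (e.g. a Redis client)
        self._ttl = ttl           # how long an L1 entry stays fresh, in seconds

    def get(self, key):
        entry = self._local.get(key)
        if entry is not None and entry[1] > time.monotonic():
            return entry[0]                # L1 hit, still fresh
        value = self._backend.get(key)     # L1 miss or stale: fall through to L2
        if value is not None:
            self._local[key] = (value, time.monotonic() + self._ttl)
        return value

    def set(self, key, value):
        # Write-through: update both levels so other processes see it via L2
        self._local[key] = (value, time.monotonic() + self._ttl)
        self._backend.set(key, value)

# A plain dict standing in for the Redis client in this sketch
class DictBackend(dict):
    def set(self, k, v):
        self[k] = v

cache = TwoLevelCache(DictBackend(), ttl=5.0)
cache.set("user:1", {"name": "alice"})
print(cache.get("user:1"))
```

The trade-off is staleness: another process's write only becomes visible here after the local TTL expires, which is acceptable for cache-style data but not for authoritative state.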
I'm going to weigh in and then close.
While I like the idea of a comprehensive approach to caching, it really depends on architecture and design decisions. In trying to keep sanic lightweight and only opinionated about itself, caching stands somewhat outside of the expected purview. I think it could be handled especially well with decorators, in order to support tiered caching strategies or shared caching strategies where necessary.
Which isn't to say that there couldn't be advice about cache fallthrough, but at its core sanic isn't responsible for that right now.
@sjsadowski I agree. I even think this topic lends itself more to providing examples on the project website than to sticking to one specific solution "from Sanic"; after all, there may be many solutions for many scenarios.