Hello, and congrats on this library, @miguelgrinberg.
I am adapting app.py from the original example (https://github.com/miguelgrinberg/Flask-SocketIO/tree/master/example) and trying to run two background processes alongside Flask-SocketIO:
i) The server-generated 'idle' event that already comes with that code, which pings every 10 seconds (and is working).
ii) Code that uses watchdog to detect when files (log.txt) inside a directory are modified. When that happens I need to refresh the content of a pre tag (like a dynamic log while the server does heavy computing).
The strategy I came up with is a global dictionary that the two threads, t1 and t2, write to and that the SocketIO background thread reads and emits (I also tried doing everything in one process).
I have searched and read many forums and have not found this kind of functionality in the available applications.
#import libs
#python builtins
import os
import datetime
from datetime import timedelta
#work with processes
import psutil
#work with files
import shutil
from flask import Flask, flash, request, redirect, render_template, Response, escape, jsonify, url_for, session, copy_current_request_context
#socketio
from flask_socketio import SocketIO, send, emit, join_room, leave_room, close_room, rooms, disconnect
import logging
#multithreads
import threading
from threading import Thread, Event, Lock
#implement watchfiles
import sys
import time
from time import sleep
from watchdog.observers import Observer
from watchdog.events import FileSystemEventHandler
#events
import events
#for the uploads
import urllib.request
########## Related to thread i ################
def fn_counter():
    global dict_emit
    count = 0
    while True:
        time.sleep(10)
        count += 1
        dict_emit = {'data': 'Server generated event', 'count': count}
######################################
########## Related to thread ii ##########
#Classes
class Watcher:
    def __init__(self):
        self.observer = Observer()

    def run(self, str_watchdir):
        logger.info('=> running background thread for file watchdog in '+INSTANCE_DIR)
        event_handler = Handler()
        self.observer.schedule(event_handler, str_watchdir, recursive=True)
        self.observer.start()
        try:
            while True:
                time.sleep(5)
        except:
            self.observer.stop()
            logger.info("Error")
        self.observer.join()

class Handler(FileSystemEventHandler):
    @staticmethod
    def on_any_event(event):
        global dict_emit
        global lst_clients
        #logger.info('=> on_any_event') works well
        #some available events
        #event.event_type == 'created' when file created
        #event.event_type == 'deleted' when file deleted
        #event.event_type == 'moved' when file renamed
        #event.event_type == 'modified' when file contents changed
        if event.is_directory:
            return None
        elif event.event_type == 'modified':
            # Take any action here when a file is modified.
            logger.info('file updated')#+" "+os.path.basename(event.src_path)
            dict_emit = {'data': 'Server generated event', 'file updated': os.path.basename(event.src_path)}
async_mode = None
#vars
#root
# |-templates
# |-index.html
# |-static
# |-sessions
# |-instance1
# |-log.txt
# |-...
# |-assets
# |-log.txt
ROOT_DIR = os.getcwd()
TEMPLATES_DIR = os.path.join(ROOT_DIR, 'templates')
STATIC_DIR = os.path.join(ROOT_DIR, 'static')
SESSIONS_DIR = os.path.join(STATIC_DIR,'sessions')
INSTANCE_DIR = os.path.join(SESSIONS_DIR, datetime.datetime.now().strftime('%d-%m-%Y_%H-%M-%S'))
os.makedirs(INSTANCE_DIR)
shutil.copyfile(os.path.join(ASSETS_DIR, "log.txt"),
os.path.join(INSTANCE_DIR, "log.txt"))
#Utilities
#NOTE: background_thread is defined again further down, after the logger is configured; the later definition is the one that runs
def background_thread():
    """Example of how to send server generated events to clients."""
    global dict_emit
    while True:
        time.sleep(4)
        if dict_emit:
            socketio.emit('my_response',
                          dict_emit, #{'data': 'Server generated event', 'count': count},
                          namespace='/test')
            dict_emit={}
app = Flask(__name__)
app.config.update(
    #built-in parameters
    APPLICATION_ROOT='/', #default is '/'
    SECRET_KEY="secret_key", #default is None; same as app.config['SECRET_KEY']="secret_key"
    MAX_CONTENT_LENGTH=160 * 1024 * 1024, #160 MB; default is None (16 * 1024 * 1024 would be 16 MB)
    TEMPLATES_AUTO_RELOAD=False, #defaults None
    ENV='production', #default 'production'
    EXPLAIN_TEMPLATE_LOADING=True, #defaults False
    DEBUG=False, #defaults True if ENV='development'
    TESTING=False, #default False
    PROPAGATE_EXCEPTIONS=None, #default None
    PRESERVE_CONTEXT_ON_EXCEPTION=None, #defaults True if Debug True
    TRAP_HTTP_EXCEPTIONS=None, #default False
    TRAP_BAD_REQUEST_ERRORS=None, #defaults None
    SESSION_COOKIE_NAME='session', #defaults 'session'
    SESSION_COOKIE_DOMAIN=None, #defaults None
    SESSION_COOKIE_PATH=None, #defaults None
    #customized parameters
    UPLOAD_FOLDER=INSTANCE_DIR
)
#socketio
socketio = SocketIO(app, async_mode=async_mode)
thread = None
thread_lock = Lock()
dict_emit = {} #initialised here so background_thread can test it before any producer thread has written to it
lst_clients=[] #storing each client's sid makes it possible to emit to specific clients manually; otherwise emitting is only possible from server events within the app context
#configure logger
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)

def background_thread():
    """Example of how to send server generated events to clients."""
    global dict_emit
    logger.info("=> Running SocketIO background_thread in 3rd background thread.")
    while True:
        time.sleep(4)
        if dict_emit:
            socketio.emit('my_response',
                          dict_emit, #{'data': 'Server generated event', 'count': count},
                          namespace='/test')
            dict_emit={}
@app.route('/')
def index():
    return render_template('index.html', async_mode=socketio.async_mode)

@socketio.on('connect', namespace='/test')
def test_connect():
    global thread
    global lst_clients
    logger.info(' Client connected ' + request.sid)
    lst_clients.append(request.sid)
    with thread_lock:
        if thread is None:
            thread = socketio.start_background_task(background_thread)
    emit('my_response', {'data': 'Connected', 'count': 0})

if __name__ == '__main__':
    #threads share the same namespace for variables
    #launch filewatchdog
    w1=Watcher()
    # creating threads
    t1 = threading.Thread(target=w1.run, args=(INSTANCE_DIR,))
    # starting thread 1
    t1.start()
    logger.info("=> Running watchdog for " + INSTANCE_DIR + " in 1st background thread.")
    #launch counter
    t2 = threading.Thread(target=fn_counter)
    t2.start()
    logger.info("=> Running counter in 2nd background thread.")
    logger.info('=> Running socket IO in main thread.')
    socketio.run(app,
                 host='localhost',
                 port=10000,
                 debug=False) #True raises some exceptions and stops
<!DOCTYPE HTML>
<html>
<head>
</head>
</html>
I have not included the function that changes the contents of log.txt in INSTANCE_DIR, but it is easy to do manually while the server is running: go to that folder, open log.txt, change it, and save.
That fires the file-update event.
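Since the goal is to refresh a pre tag with the log, the 'modified' event could carry the text that was appended rather than only the file name. The following is a minimal sketch under that assumption; `read_appended_text` is a hypothetical helper, not part of watchdog or Flask-SocketIO, and it uses only the standard library.

```python
import os


def read_appended_text(path, offset):
    """Return (new_text, new_offset) for the bytes added to `path` since `offset`."""
    size = os.path.getsize(path)
    if size < offset:
        # The file was truncated or replaced, so start again from the top.
        offset = 0
    with open(path, "rb") as fh:
        fh.seek(offset)
        chunk = fh.read()
    # Undecodable bytes are replaced instead of raising in the watcher thread.
    return chunk.decode("utf-8", errors="replace"), offset + len(chunk)
```

A handler could keep the last offset per file, call this helper on each 'modified' event, and hand the returned text to whatever sends data to the clients.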
Further ideas that were tested:
-> moving t1 and t2 into background_thread(): it raises the latency a lot.
-> moving t1 and t2 into test_connect(): the page does not refresh, although server shows webs.
Cheers
Is there a specific problem you wanted to ask about, or just in general about this solution?
If you are just asking in general, I think this solution isn't robust and could potentially suffer from race conditions. I would use a Queue to pass data between the two threads.
Thanks for the feedback. Can you share a code example of this?
OK, here is what I will try: I will build one Queue, and a background thread will pick up the dictionaries from the n threads and push them to the clients.
One observation: in my opinion it would improve your example repository to add this functionality. Currently it only showcases how to send to the client on very specific events (connect, disconnect, join rooms, ...) and from a background_thread that sends at fixed time intervals, which is a bit limiting when you need to send dynamic information asynchronously.
Example: make 2 different threads that each build their own data stream as a dictionary > put these dictionaries into a global queue > let a background transfer thread read this queue and send its data to the client asynchronously.
It is worth checking.
Cheers
@FrancisBase I don't believe that belongs in this repository. What we are discussing is a standard producer/consumer type solution, and there are extensive examples of this elsewhere. For example: https://www.bogotobogo.com/python/Multithread/python_multithreading_Synchronization_Producer_Consumer_using_Queue.php
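For readers following along, the producer/consumer pattern referred to here can be sketched with the standard library alone. This is a minimal illustration, not code from the Flask-SocketIO repository; the names `producer`, `consumer`, `run_demo` and the `_DONE` sentinel are made up for the example. Unlike a shared global dictionary, where a second write can overwrite the first before it is read, every item put on the queue is delivered exactly once.

```python
import queue
import threading

_DONE = object()  # sentinel that tells the consumer to stop


def producer(q, name, n_items):
    """Put n_items dictionaries on the shared queue."""
    for i in range(n_items):
        q.put({"source": name, "count": i})


def consumer(q, out):
    """Move every queued dictionary into `out` until the sentinel arrives."""
    while True:
        item = q.get()  # blocks until something is available
        if item is _DONE:
            break
        out.append(item)


def run_demo(n_items=5):
    q = queue.Queue()  # thread-safe FIFO shared by both producers
    received = []
    consumer_thread = threading.Thread(target=consumer, args=(q, received))
    producers = [threading.Thread(target=producer, args=(q, name, n_items))
                 for name in ("t1", "t2")]
    consumer_thread.start()
    for p in producers:
        p.start()
    for p in producers:
        p.join()
    q.put(_DONE)  # all producers are finished, so release the consumer
    consumer_thread.join()
    return received
```

In the application discussed here, the consumer would emit each item to the clients instead of appending it to a list.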
@miguelgrinberg so this repository cannot be used in producer/consumer scenarios? Are there any disadvantages that I'm not seeing?
I have reduced the problem to the following bits of code, which fail with:
RuntimeError: Working outside of request context.
def fn_i():
    global q
    while True:
        time.sleep(1)
        q.put({'key_i':random.random()})
    return q

def fn_ii():
    global q
    while True:
        time.sleep(10)
        q.put({'key_ii':random.random()})
    return q

#THREAD3 - CollectorThread
def background_thread_collector():
    """Example of how to send server generated events to clients."""
    global thread1
    global thread2
    global q
    thread1 = threading.Thread(target=fn_i)
    thread1.start()
    thread2 = threading.Thread(target=fn_ii)
    thread2.start()
    while True:
        time.sleep(0.2)
        while not q.empty():
            socketio.emit('my_response',
                          q.get(), #{'data': 'Server generated event', 'count': count},
                          namespace='/test',
                          broadcast=True,
                          callback=ack
                          )

@app.route('/')
def index():
    return render_template('index.html', async_mode=socketio.async_mode)

@socketio.on('connect', namespace='/test')
def test_connect():
    global collector_thread
    logger.info(' Client connected ' + request.sid)
    with thread_lock:
        if collector_thread is None:
            collector_thread = socketio.start_background_task(background_thread_collector)
    emit('my_response', {'data': 'Connected', 'count': 0})
This repository demonstrates Flask-SocketIO, it's not its purpose to demonstrate standard concurrency techniques. I just do not want to make it more complicated by adding producer and consumer threads, because it is a completely unrelated topic.
When you get an error, you need to include the complete error message, including all information about lines of code and the stack trace. Without that context it is impossible for me to tell you what the error is about.
Ok thank you.
Here is:
from flask import Flask, flash, request, redirect, render_template, Response, escape, jsonify, url_for, session, copy_current_request_context
from flask_socketio import SocketIO, send, emit, join_room, leave_room, close_room, rooms, disconnect
import threading
from threading import Thread, Event, Lock
import queue #Queue – A thread-safe FIFO implementation
import events
import time
import random
app = Flask(__name__)
async_mode = None
#socketio
socketio = SocketIO(app, async_mode=async_mode)
thread1=None
thread2=None
collector_thread=None
q = queue.Queue()
thread_lock = Lock()
def ack(value):
    if value != 'pong':
        logger.info('unexpected return value')

@app.route('/')
def index():
    return render_template('index.html', async_mode=socketio.async_mode)

@socketio.on('connect', namespace='/test')
def test_connect():
    def fn_i():
        #def fn_i(req):
        #    with app.test_request_context():
        #        request=req
        global q
        while True:
            time.sleep(1)
            q.put({'key_i':random.random()})
        return q

    def fn_ii():
        global q
        while True:
            time.sleep(10)
            q.put({'key_ii':random.random()})
        return q

    def background_thread_collector():
        """Example of how to send server generated events to clients."""
        global q
        while True:
            time.sleep(0.2)
            while not q.empty():
                socketio.emit('my_response',
                              q.get(), #{'data': 'Server generated event', 'count': count},
                              namespace='/test',
                              broadcast=True,
                              callback=ack
                              )

    global collector_thread
    global thread1
    global thread2
    with thread_lock:
        if collector_thread is None:
            collector_thread = threading.Thread(target=background_thread_collector)
            collector_thread.start()
            thread1 = threading.Thread(target=fn_i)
            thread1.start()
            thread2 = threading.Thread(target=fn_ii)
            thread2.start()
    '''
    thread.start_new_thread(fn_i, (request))
    thread.start_new_thread(fn_ii, (request))
    thread.start_new_thread(background_thread_collector, (request))
    '''
    return "Thanks"

if __name__ == '__main__':
    socketio.run(app,
                 host='localhost',
                 port=11000,
                 debug=False) #True raises some exceptions and stops
Exception in thread Thread-6:
Traceback (most recent call last):
  File "D:\conda\envs\env_py_gis2\lib\threading.py", line 916, in _bootstrap_inner
    self.run()
  File "D:\conda\envs\env_py_gis2\lib\threading.py", line 864, in run
    self._target(*self._args, **self._kwargs)
  File "<ipython-input-1-b117df1c8698>", line 58, in background_thread_collector
    callback=ack
  File "D:\conda\envs\env_py_gis2\lib\site-packages\flask_socketio\__init__.py", line 408, in emit
    sid = flask.request.sid
  File "D:\conda\envs\env_py_gis2\lib\site-packages\werkzeug\local.py", line 348, in __getattr__
    return getattr(self._get_current_object(), name)
  File "D:\conda\envs\env_py_gis2\lib\site-packages\werkzeug\local.py", line 307, in _get_current_object
    return self.__local()
  File "D:\conda\envs\env_py_gis2\lib\site-packages\flask\globals.py", line 38, in _lookup_req_object
    raise RuntimeError(_request_ctx_err_msg)
RuntimeError: Working outside of request context.

This typically means that you attempted to use functionality that needed
an active HTTP request. Consult the documentation on testing for
information about how to avoid this problem.
You are emitting an event with broadcast=True and at the same time requesting a callback. Those two options are not compatible; callbacks can only be used when you send to a single client.
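One way to keep that rule from being broken by accident is a small guard around the emit call. The sketch below is hypothetical: `emit_event` is not a Flask-SocketIO function, and its `emit` argument is assumed to be `socketio.emit` (or any callable with the same shape). It also assumes that a single client is addressed with `room=<sid>` and that the acknowledgement function is passed as `callback`; check both keyword names against the installed version.

```python
def emit_event(emit, event, data, namespace, sid=None, callback=None):
    """Forward to `emit`, refusing a callback unless one client is addressed."""
    if callback is not None and sid is None:
        # A broadcast has no single recipient that could acknowledge it.
        raise ValueError("callback requires a single recipient sid")
    kwargs = {"namespace": namespace}
    if sid is not None:
        kwargs["room"] = sid  # assumption: each client is reachable via its own sid
    if callback is not None:
        kwargs["callback"] = callback
    emit(event, data, **kwargs)
```

Called without `sid`, the helper sends to every client in the namespace and accepts no callback; called with a `sid`, it may pass the callback through.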
OK, I have made the changes and now no error is thrown, yet the server does not update the page and there are no error messages in Python (I'm using the index.html file from the examples).
from flask import Flask, flash, request, redirect, render_template, Response, escape, jsonify, url_for, session, copy_current_request_context
from flask_socketio import SocketIO, send, emit, join_room, leave_room, close_room, rooms, disconnect
import threading
from threading import Thread, Event, Lock
import queue #queue – A thread-safe FIFO implementation
import events
import time
app = Flask(__name__)
async_mode = None
#socketio
socketio = SocketIO(app, async_mode=async_mode)
thread1=None
thread2=None
collector_thread=None
q = queue.Queue()
thread_lock = Lock()
count=0
@app.route('/')
def index():
    return render_template('index.html', async_mode=socketio.async_mode)

@socketio.on('connect', namespace='/test')
def test_connect():
    def fn_i():
        global q
        while True:
            time.sleep(1)
            q.put({'data':'Server generated event',
                   'serverthread':'key_i'})
        return q

    def fn_ii():
        global q
        while True:
            time.sleep(1)
            q.put({'data':'Server generated event',
                   'serverthread':'key_i',
                   'stream':'any data type supported here'
                   })
        return q

    def background_thread_collector():
        """Example of how to send server generated events to clients."""
        global q
        global count
        while True:
            while not q.empty():
                time.sleep(1)
                count+=1
                dict_send=q.get()
                dict_send['count']=count
                socketio.emit('my_response',
                              {'data': 'Server generated event', 'count': count},
                              namespace='/test',
                              broadcast=True
                              )

    global collector_thread
    global thread1
    global thread2
    with thread_lock:
        if collector_thread is None:
            collector_thread = threading.Thread(target=background_thread_collector)
            collector_thread.start()
            thread1 = threading.Thread(target=fn_i)
            thread1.start()
            thread2 = threading.Thread(target=fn_ii)
            thread2.start()
    return "Thanks"

if __name__ == '__main__':
    socketio.run(app,
                 host='localhost',
                 port=11000,
                 debug=False) #True raises some exceptions and stops
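Two details of the collector loop above are worth noting regardless of the async framework: the outer `while True` spins without sleeping while the queue is empty, and the emitted payload is a fixed dictionary rather than the dequeued `dict_send`. A possible alternative is sketched below. It is not the author's code; `collect_and_emit` and its `stop` event are hypothetical, `emit` is assumed to be `socketio.emit`, and the blocking `q.get` assumes plain threads (or a correctly patched standard library).

```python
import queue
import threading


def collect_and_emit(q, emit, stop, poll_seconds=0.5):
    """Forward each queued dict through `emit`, adding a running count."""
    count = 0
    while not stop.is_set():
        try:
            # Blocks while the queue is empty instead of spinning.
            payload = q.get(timeout=poll_seconds)
        except queue.Empty:
            continue  # nothing arrived; check the stop flag again
        count += 1
        payload["count"] = count
        emit("my_response", payload, namespace="/test")
    return count
```

Setting the `threading.Event` passed as `stop` lets the loop finish cleanly, which also makes the function easy to exercise with a fake `emit` in a test.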
Are you using eventlet or gevent? Those are incompatible with threading and queue, the standard library needs to be monkey patched if you want to use these modules.
Hmm, I added this to the code:
from eventlet import monkey_patch as monkey_patch
monkey_patch()
I also tried:
import eventlet
eventlet.monkey_patch()
Now no errors are thrown, but the browser is stuck loading and shows "waiting for localhost" in the status bar.
You are once again giving me vague descriptions of problems you are having. Unfortunately it is impossible for me to figure out problems that way.
Eventlet and gevent are not trivial libraries. If you are going to work with threads, queues, locks and the like, you have to understand how these libraries work to not run into trouble. My recommendation is that you remove both eventlet and gevent from your system and get your application working in standard Python first. You will lose websocket support, but at least it will be something you are familiar with. Once you have everything working you can look at introducing one of these libraries.
I usually work with a lot of libraries, some trivial and others complex, so that is of course not the issue. I guess this is a limitation of Flask-SocketIO: it has no out-of-the-box queue and thread support, at least in the examples (unless you use other solutions like Redis that bypass the need). You have already said it is a design decision, and that's OK. For my specific workflow I will use background_thread() for pinging the server, and it works. So of course I appreciate it and recognize that it is very useful.
So all good, cheers.
There is absolutely no limitation in Flask-SocketIO, which does not have the goal of providing synchronization primitives. The only goal of this library is to give you the Socket.IO protocol, nothing else. If you are using eventlet alongside Flask-SocketIO then you have to use the synchronization primitives offered by eventlet or that are compatible with it. Likewise with gevent.
If it is not a limitation, it is something that is not very clear. It's not about eventlet; it's only about sending the contents of a queue of dictionaries to clients whenever the queue has elements.
Although now it's clear that this is not in the examples and requires some tuning, which in this case has a solution, so it's OK.
You seem to be thinking that your problem is with Socket.IO, but I believe you are wrong. Your problem is that the synchronization primitives that you are using are either incorrect for your async framework, or else the way you are actually using these primitives is wrong.
My suggestion still stands. Remove eventlet/gevent from the picture, and get everything to work with standard Python first. It'll be a lot easier to move everything to an async framework once you know that your code is debugged and working.
No, you are wrong, because it works, so I guess you didn't follow or I wasn't clear. The only issue, which is not even really an issue, was that when I started working with the library it was not very clear what its features were. That is OK, because any library requires developers to get acquainted with it first, so I guess it is normal. With the input you gave, for which I thank you, I now know how to use the library.
The code really is working and all is OK now; you can close the issue as resolved. Cheers.