Channels: send messages periodically 10 seg

Created on 7 Apr 2018  路  4Comments  路  Source: django/channels

Hi
I'm using django channels 2 my question is can I send messages to a group every few minutes? see images. what I want is:

  1. socket active
  2. the customer makes a connection and requests data
  3. this data is received in the backend
    3.1 are stored in Redis
  4. the stored data that Redis sends to that customer

What I'm looking for is for point 4 to be sent every 10 seconds to the client.

Hola
Estoy usando django channels 2 mi pregunta es 隆驴puedo enviar mensajes a un grupo cada lapso de tiempo ? ver imagenes. lo que quiero es:

  1. socket activo
  2. el cliente realiza una conexi贸n y solicitud de datos
  3. estos datos se reciben en el backend
    3.1 se almacenan en Redis
  4. los datos almacenados que Redis env铆a a ese cliente

Lo que estoy buscando es que el punto 4 sea enviado cada 10 segundos al cliente.

django channels_en
django channels

mi codigo en Consumer:
my code in Consumer:

json
import re
import redis
from channels.generic.websocket import (AsyncWebsocketConsumer)
from django.conf import settings
from django.core.paginator import Paginator, EmptyPage
`
conexion = redis.Redis(
    host='settings.REDIS_SERVER',
    port=settings.REDIS_PORT,
    password=settings.REDIS_PASS,
    charset="utf-8",
    decode_responses=True,
)
p = conexion.pipeline()

class ActiveCallsConsumer(AsyncWebsocketConsumer):
    async def connect(self):
        if self.scope["user"].is_anonymous:
            await self.close()
        else:
            self.user = self.scope['user'].username
            usuario = self.user
            # Join room group
            await self.channel_layer.group_add(
                self.user,
                self.channel_name
            )

            await self.accept()

            if str(usuario) not in conexion.lrange("usuarios", "0", "-1"):
                conexion.lpush("usuarios", usuario)
                conexion.hset(usuario, 'paginacion', 50)
                conexion.hset(usuario, 'pag_now', 1)
                conexion.hset(usuario, 'Destino', "")
                conexion.hset(usuario, 'Porta', "")
                conexion.hset(usuario, 'Indicativo', "")

            message = "-/-50-/-1-/--/--/--/--/--/"

            message = json.dumps(message)
            await self.channel_layer.group_send(
                self.user,
                {
                    'type': 'receive',
                    'message': message
                })

    # Receive message from WebSocket
    async def receive(self, text_data):
        self.user = self.scope['user'].username

        if type(text_data) == dict:
            text_data = json.dumps(text_data)

        text_data_json = json.loads(text_data)
        message = text_data_json['message']

        def exprecion_regular(dato):
            exprecion = ("[0-9]+-+[0-9]+$")
            if (re.match(exprecion, dato)):
                data = dato.split("-")
                if data[0] <= data[1]:
                    dato = dato
                else:
                    dato = ""
            else:
                dato = ""
            return dato

        response_msg = message
        msj = response_msg.split("-/-")
        user = self.user

        if msj[1] == "" or int(msj[1]) <= 0:
            paginacion = 25
        else:
            paginacion = msj[1]

        pag_now = msj[2]
        Destino = msj[3]
        Porta = msj[4]
        Indicativo = msj[5]

        conexion.hset(self.user, 'paginacion', paginacion)
        conexion.hset(self.user, 'pag_now', pag_now)
        conexion.hset(self.user, 'Destino', Destino)
        conexion.hset(self.user, 'Porta', Porta)
        conexion.hset(self.user, 'Indicativo', Indicativo)

        # Send message to room group with name username

        await self.channel_layer.group_send(
            self.user,
            {
                'type': 'rta_message',
                'message': message
            }
        )

    async def rta_message(self, event):
        """
        Recibe message from room group
        """
        print("recivido en rta_message")
        message = event['message']

        # Send message to WebSocket
        usuario = self.user

        if usuario in conexion.lrange("usuarios", 0, -1):
            calls_conect = 0
            calls_total = 0
            origin_data = []
            data = []

            paginas = conexion.hget(usuario, "paginacion")
            if not paginas:
                paginas = 1

            pagina_ahora = conexion.hget(usuario, "pag_now")
            if not pagina_ahora:
                pagina_ahora = 1

            for c in conexion.scan(0, '*-*-*-*-*', 1000000)[1]:
                p.hgetall(c)

            data = p.execute()

            for x in data:
                if "Answer" in x:
                    calls_total = calls_total + 1
                    if int(x["Answer"]) > 0:
                        calls_conect = calls_conect + 1

            setup = calls_total - calls_conect
            pa = Paginator(data, paginas)
            row_total = pa.count
            pags_total = pa.num_pages

            try:
                if pagina_ahora == int and pags_total == int:
                    if int(pagina_ahora) > int(pags_total):
                        page = pa.page(int(pags_total))
                    else:
                        page = pa.page(int(pagina_ahora))
                elif pags_total == int:
                    page = pa.page(int(pags_total))
                elif pagina_ahora.isdigit():
                    try:
                        page = pa.page(int(pagina_ahora))
                    except EmptyPage:
                        page = pa.page(int(pags_total))
                else:
                    page = pa.page(int(1))
            except Exception as e:
                page = pa.page(1)

            pag_now = page.number  # page now

            data = page.object_list

            msg = {
                'data': data,
                'pag_now': pag_now,
                'row_total': row_total,
                'calls_total': calls_total,
                'calls_conect': calls_conect,
                'setup': setup,
                'pags_total': pags_total,
                'user': usuario
            }
            print("Codificando ")
            await self.send(text_data=json.dumps(msg))
            print("Enviado")

    async def disconnect(self, close_code):
        """
         si se desconecta el socket este elimina el grupo seg煤n el usuario
        desconectado
        """
        self.user = self.scope['user'].username
        # Leave room group
        await self.channel_layer.group_discard(
            self.user,
            self.channel_name
        )

Most helpful comment

To send something every n seconds you need somewhere to send it from - probably a separate process. Once you have that, you can just have a loop of sleeping and then sending.

You could also use a native-async Consumer class and use await asyncio.sleep(4) in the body of the consumer somewhere, but this MUST be with an async variant, and not a normal consumer.

All 4 comments

To send something every n seconds you need somewhere to send it from - probably a separate process. Once you have that, you can just have a loop of sleeping and then sending.

You could also use a native-async Consumer class and use await asyncio.sleep(4) in the body of the consumer somewhere, but this MUST be with an async variant, and not a normal consumer.

OK Django Channels sends the messages to the successful group.

async def receive(self, text_data):
    self.user = self.scope['user'].username

    if type(text_data) == dict:
        text_data = json.dumps(text_data)

    text_data_json = json.loads(text_data)
    message = text_data_json['message']

    def exprecion_regular(dato):
        exprecion = ("[0-9]+-+[0-9]+$")
        if (re.match(exprecion, dato)):
            data = dato.split("-")
            if data[0] <= data[1]:
                dato = dato
            else:
                dato = ""
        else:
            dato = ""
        return dato

    response_msg = message
    msj = response_msg.split("-/-")
    user = self.user

    if msj[1] == "" or int(msj[1]) <= 0:
        paginacion = 25
    else:
        paginacion = msj[1]

    pag_now = msj[2]
    Destino = msj[3]
    Porta = msj[4]
    Indicativo = msj[5]

    conexion.hset(self.user, 'paginacion', paginacion)
    conexion.hset(self.user, 'pag_now', pag_now)
    conexion.hset(self.user, 'Destino', Destino)
    conexion.hset(self.user, 'Porta', Porta)
    conexion.hset(self.user, 'Indicativo', Indicativo)

    # Send message to room group with name username
    While True:
        await self.channel_layer.group_send(
            self.user,
            {
                'type': 'rta_message',
                'message': message
                }
                )
        await asyncio.sleep(10)

but when the customer makes new requests by means of:

var user=data{'info'};  
socket.send(JSON.stringify({
       message':"-/-50-/-1-/--/--/--/--/--/--/--/"
     }));

such requests do not arrive at the received one, and it is to be understood that in the received one a loop is running,

how can i process these requests so that when i receive them i send the current information each period in a loop of 10 sec ---> and when the client sends a request it will be updated and follow the loop but with the new information requested by the client?

@andrewgodwin Like you said, I made an async variant.

async def isalive(self):
    while True:
        await self.channel_layer.group_send('status', {'type': 'status.message', 'message': 'Hi bob'})
        await asyncio.sleep(5)

async def accept(self, subprotocol=None):
    """
    Accepts an incoming socket
    """
    await super().accept(subprotocol=subprotocol)
    await self.isalive()

This fails to send messages to the group for every 5 secs where as the below code does sends message to the group without any problem (but i can't achieve looping).

async def isalive(self):
       await self.channel_layer.group_send('status', {'type': 'status.message', 'message': 'Hi bob'})

This occurs only on sending group messages through while loop. But we use normal self.send_json within while loop, it works.

async def isalive(self):
        while True:
            await self.send_json(
                 {'message': '{}'.format('hi')}
            )
           await asyncio.sleep(5)

This does sends the message "Hi" for every 5 secs. If the while loop does the blocking then how the later one works.

I can able to solve the above problem using asyncio.ensure_future func

Was this page helpful?
0 / 5 - 0 ratings