Django使用Channels实现WebSocket

来自:运维咖啡吧(微信号:SRE724),作者:37丫37

WebSocket是什么?

WebSocket是一种在单个TCP连接上进行全双工通讯的协议。WebSocket允许服务端主动向客户端推送数据。在WebSocket协议中,客户端浏览器和服务器只需要完成一次握手就可以创建持久性的连接,并在浏览器和服务器之间进行双向的数据传输。

WebSocket有什么用?

WebSocket区别于HTTP协议的一个最为显著的特点是,WebSocket协议可以由服务端主动发起消息,对于浏览器需要及时接收数据变化的场景非常适合,例如在Django中遇到一些耗时较长的任务我们通常会使用Celery来异步执行,那么浏览器如果想要获取这个任务的执行状态,在HTTP协议中只能通过轮训的方式由浏览器不断的发送请求给服务器来获取最新状态,这样发送很多无用的请求不仅浪费资源,还不够优雅,如果使用WebSokcet来实现就很完美了

WebSocket的另外一个应用场景就是下文要说的聊天室,一个用户(浏览器)发送的消息需要实时的让其他用户(浏览器)接收,这在HTTP协议下是很难实现的,但WebSocket基于长连接加上可以主动给浏览器发消息的特性处理起来就游刃有余了

初步了解WebSocket之后,我们看看如何在Django中实现WebSocket

Channels

Django本身不支持WebSocket,但可以通过集成Channels框架来实现WebSocket

Channels是针对Django项目的一个增强框架,可以使Django不仅支持HTTP协议,还能支持WebSocket,MQTT等多种协议,同时Channels还整合了Django的auth以及session系统方便进行用户管理及认证。

我下文所有的代码实现使用以下python和Django版本

  • python==3.6.3

  • django==2.2

集成Channels

我假设你已经新建了一个django项目,项目名字就叫webapp,目录结构如下

project
    - webapp
        - __init__.py
        - settings.py
        - urls.py
        - wsgi.py
    - manage.py

1.  安装channels

pip install channels==2.1.7

2.  修改settings.py文件,

# APPS中添加channels
INSTALLED_APPS = [
    'django.contrib.staticfiles',
    'channels',
]

# 指定ASGI的路由地址
ASGI_APPLICATION = 'webapp.routing.application'

channels运行于ASGI协议上,ASGI的全名是Asynchronous Server Gateway Interface。它是区别于Django使用的WSGI协议 的一种异步服务网关接口协议,正是因为它才实现了websocket

ASGI_APPLICATION 指定主路由的位置为webapp下的routing.py文件中的application

3.  setting.py的同级目录下创建routing.py路由文件,routing.py类似于Django中的url.py指明websocket协议的路由

from channels.routing import ProtocolTypeRouter

application = ProtocolTypeRouter({
    # 暂时为空,下文填充
})

4.  运行Django项目

C:\python36\python.exe D:/demo/tailf/manage.py runserver 0.0.0.0:80
Performing system checks...
Watching for file changes with StatReloader

System check identified no issues (0 silenced).
April 122019 - 17:44:52
Django version 2.2using settings 'webapp.settings'
Starting ASGI/Channels version 2.1.7 development server at http://0.0.0.0:80/
Quit the server with CTRL-BREAK.

仔细观察上边的输出会发现Django启动中的Starting development server已经变成了Starting ASGI/Channels version 2.1.7 development server,这表明项目已经由django使用的WSGI协议转换为了Channels使用的ASGI协议

至此Django已经基本集成了Channels框架

构建聊天室

上边虽然在项目中集成了Channels,但并没有任何的应用使用它,接下来我们以聊天室的例子来讲解Channels的使用

假设你已经创建好了一个叫chat的app,并添加到了settings.py的INSTALLED_APPS中,app的目录结构大概如下

chat
    - migrations
        - __init__.py
    - __init__.py
    - admin.py
    - apps.py
    - models.py
    - tests.py
    - views.py

我们构建一个标准的Django聊天页面,相关代码如下

url:

from django.urls import path
from chat.views import chat

urlpatterns = [
    path('chat', chat, name='chat-url')
]

view:

from django.shortcuts import render

def chat(request):
    return render(request, 'chat/index.html')

template:

{% extends "base.html" %}

{% block content %}
  <textarea class="form-control" id="chat-log" disabled rows="20"></textarea><br/>
  <input class="form-control" id="chat-message-input" type="text"/><br/>
  <input class="btn btn-success btn-block" id="chat-message-submit" type="button" value="Send"/>
{% endblock %}

通过上边的代码一个简单的web聊天页面构建完成了,访问页面大概样子如下:

接下来我们利用Channels的WebSocket协议实现消息的发送接收功能

1.  先从路由入手,上边我们已经创建了routing.py路由文件,现在来填充里边的内容

from channels.auth import AuthMiddlewareStack
from channels.routing import ProtocolTypeRouter, URLRouter
import chat.routing

application = ProtocolTypeRouter({
    'websocket': AuthMiddlewareStack(
        URLRouter(
            chat.routing.websocket_urlpatterns
        )
    ),
})

ProtocolTypeRouter: ASIG支持多种不同的协议,在这里可以指定特定协议的路由信息,我们只使用了websocket协议,这里只配置websocket即可

AuthMiddlewareStack: django的channels封装了django的auth模块,使用这个配置我们就可以在consumer中通过下边的代码获取到用户的信息

def connect(self):
    self.user = self.scope["user"]

self.scope类似于django中的request,包含了请求的type、path、header、cookie、session、user等等有用的信息

URLRouter: 指定路由文件的路径,也可以直接将路由信息写在这里,代码中配置了路由文件的路径,会去chat下的routeing.py文件中查找websocket_urlpatterns,chat/routing.py内容如下

from django.urls import path
from chat.consumers import ChatConsumer

websocket_urlpatterns = [
    path('ws/chat/', ChatConsumer),
]

routing.py路由文件跟django的url.py功能类似,语法也一样,意思就是访问ws/chat/都交给ChatConsumer处理

2.  接着编写consumer,consumer类似django中的view,内容如下

from channels.generic.websocket import WebsocketConsumer
import json

class ChatConsumer(WebsocketConsumer):
    def connect(self):
        self.accept()

    def disconnect(self, close_code):
        pass

    def receive(self, text_data):
        text_data_json = json.loads(text_data)
        message = '运维咖啡吧:' + text_data_json['message']

        self.send(text_data=json.dumps({
            'message': message
        }))

这里是个最简单的同步websocket consumer类,connect方法在连接建立时触发,disconnect在连接关闭时触发,receive方法会在收到消息后触发。整个ChatConsumer类会将所有收到的消息加上“运维咖啡吧:”的前缀发送给客户端

3.  最后我们在html模板页面添加websocket支持

{% extends "base.html" %}

{% block content %}
  <textarea class="form-control" id="chat-log" disabled rows="20"></textarea><br/>
  <input class="form-control" id="chat-message-input" type="text"/><br/>
  <input class="btn btn-success btn-block" id="chat-message-submit" type="button" value="Send"/>
{% endblock %}

{% block js %}
<script>
  var chatSocket = new WebSocket(
    'ws://' + window.location.host + '/ws/chat/');

  chatSocket.onmessage = function(e{
    var data = JSON.parse(e.data);
    var message = data['message'];
    document.querySelector('#chat-log').value += (message + '\n');
  };

  chatSocket.onclose = function(e{
    console.error('Chat socket closed unexpectedly');
  };

  document.querySelector('#chat-message-input').focus();
  document.querySelector('#chat-message-input').onkeyup = function(e{
    if (e.keyCode === 13) {  // enter, return
        document.querySelector('#chat-message-submit').click();
    }
  };

  document.querySelector('#chat-message-submit').onclick = function(e{
    var messageInputDom = document.querySelector('#chat-message-input');
    var message = messageInputDom.value;
    chatSocket.send(JSON.stringify({
        'message': message
    }));

    messageInputDom.value = '';
  };
</script>
{% endblock %}

WebSocket对象一个支持四个消息:onopen,onmessage,oncluse和onerror,我们这里用了两个onmessage和onclose

onopen: 当浏览器和websocket服务端连接成功后会触发onopen消息

onerror: 如果连接失败,或者发送、接收数据失败,或者数据处理出错都会触发onerror消息

onmessage: 当浏览器接收到websocket服务器发送过来的数据时,就会触发onmessage消息,参数e包含了服务端发送过来的数据

onclose: 当浏览器接收到websocket服务器发送过来的关闭连接请求时,会触发onclose消息

4.  完成前边的代码,一个可以聊天的websocket页面就完成了,运行项目,在浏览器中输入消息就会通过websocket-->rouging.py-->consumer.py处理后返回给前端

启用Channel Layer

上边的例子我们已经实现了消息的发送和接收,但既然是聊天室,肯定要支持多人同时聊天的,当我们打开多个浏览器分别输入消息后发现只有自己收到消息,其他浏览器端收不到,如何解决这个问题,让所有客户端都能一起聊天呢?

Channels引入了一个layer的概念,channel layer是一种通信系统,允许多个consumer实例之间互相通信,以及与外部Djanbo程序实现互通。

channel layer主要实现了两种概念抽象:

channel name: channel实际上就是一个发送消息的通道,每个Channel都有一个名称,每一个拥有这个名称的人都可以往Channel里边发送消息

group: 多个channel可以组成一个Group,每个Group都有一个名称,每一个拥有这个名称的人都可以往Group里添加/删除Channel,也可以往Group里发送消息,Group内的所有channel都可以收到,但是无法发送给Group内的具体某个Channel

了解了上边的概念,接下来我们利用channel layer实现真正的聊天室,能够让多个客户端发送的消息被彼此看到

1.  官方推荐使用redis作为channel layer,所以先安装channels_redis

pip install channels_redis==2.3.3

2.  然后修改settings.py添加对layer的支持

CHANNEL_LAYERS = {
    'default': {
        'BACKEND''channels_redis.core.RedisChannelLayer',
        'CONFIG': {
            "hosts": [('ops-coffee.cn', 6379)],
        },
    },
}

添加channel之后我们可以通过以下命令检查通道层是否能够正常工作

>python manage.py shell
Python 3.6.3 (v3.6.3:2c5fed8, Oct  3 201718:11:49) [MSC v.1900 64 bit (AMD64)] on win32
Type "help""copyright""credits" or "license" for more information.
(InteractiveConsole)
>>> import channels.layers
>>> channel_layer = channels.layers.get_channel_layer()
>>>
>>> from asgiref.sync import async_to_sync
>>> async_to_sync(channel_layer.send)('test_channel',{'site':'https://ops-coffee.cn'})
>>> async_to_sync(channel_layer.receive)('test_channel')
{'site''https://ops-coffee.cn'}
>>>

3.  consumer做如下修改引入channel layer

from asgiref.sync import async_to_sync
from channels.generic.websocket import WebsocketConsumer
import json

class ChatConsumer(WebsocketConsumer):
    def connect(self):
        self.room_group_name = 'ops_coffee'

        # Join room group
        async_to_sync(self.channel_layer.group_add)(
            self.room_group_name,
            self.channel_name
        )

        self.accept()

    def disconnect(self, close_code):
        # Leave room group
        async_to_sync(self.channel_layer.group_discard)(
            self.room_group_name,
            self.channel_name
        )

    # Receive message from WebSocket
    def receive(self, text_data):
        text_data_json = json.loads(text_data)
        message = text_data_json['message']

        # Send message to room group
        async_to_sync(self.channel_layer.group_send)(
            self.room_group_name,
            {
                'type''chat_message',
                'message': message
            }
        )

    # Receive message from room group
    def chat_message(self, event):
        message = '运维咖啡吧:' + event['message']

        # Send message to WebSocket
        self.send(text_data=json.dumps({
            'message': message
        }))

这里我们设置了一个固定的房间名作为Group name,所有的消息都会发送到这个Group里边,当然你也可以通过参数的方式将房间名传进来作为Group name,从而建立多个Group,这样可以实现仅同房间内的消息互通

当我们启用了channel layer之后,所有与consumer之间的通信将会变成异步的,所以必须使用async_to_sync

一个链接(channel)创建时,通过group_add将channel添加到Group中,链接关闭通过group_discard将channel从Group中剔除,收到消息时可以调用group_send方法将消息发送到Group,这个Group内所有的channel都可以收的到

group_send中的type指定了消息处理的函数,这里会将消息转给chat_message函数去处理

4.  经过以上的修改,我们再次在多个浏览器上打开聊天页面输入消息,发现彼此已经能够看到了,至此一个完整的聊天室已经基本完成

修改为异步

我们前边实现的consumer是同步的,为了能有更好的性能,官方支持异步的写法,只需要修改consumer.py即可

from channels.generic.websocket import AsyncWebsocketConsumer
import json

class ChatConsumer(AsyncWebsocketConsumer):
    async def connect(self):
        self.room_group_name = 'ops_coffee'

        # Join room group
        await self.channel_layer.group_add(
            self.room_group_name,
            self.channel_name
        )

        await self.accept()

    async def disconnect(self, close_code):
        # Leave room group
        await self.channel_layer.group_discard(
            self.room_group_name,
            self.channel_name
        )

    # Receive message from WebSocket
    async def receive(self, text_data):
        text_data_json = json.loads(text_data)
        message = text_data_json['message']

        # Send message to room group
        await self.channel_layer.group_send(
            self.room_group_name,
            {
                'type''chat_message',
                'message': message
            }
        )

    # Receive message from room group
    async def chat_message(self, event):
        message = '运维咖啡吧:' + event['message']

        # Send message to WebSocket
        await self.send(text_data=json.dumps({
            'message': message
        }))

其实异步的代码跟之前的差别不大,只有几个小区别:

ChatConsumer由WebsocketConsumer修改为了AsyncWebsocketConsumer

所有的方法都修改为了异步defasync def

await来实现异步I/O的调用

channel layer也不再需要使用async_to_sync

好了,现在一个完全异步且功能完整的聊天室已经构建完成了

代码地址

我已经将以上的演示代码上传至Github方便你在实现的过程中查看参考,具体地址为:

https://github.com/ops-coffee/demo/tree/master/websocket

通过上一篇《Django使用Channels实现WebSocket--上篇》的学习应该对Channels的各种概念有了清晰的认知,可以顺利的将Channels框架集成到自己的Django项目中实现WebSocket了,本篇文章将以一个Channels+Celery实现web端tailf功能的例子更加深入的介绍Channels

先说下我们要实现的目标:所有登录的用户可以查看tailf日志页面,在页面上能够选择日志文件进行监听,多个页面终端同时监听任何日志都互不影响,页面同时提供终止监听的按钮能够终止前端的输出以及后台对日志文件的读取

最终实现的结果见下图

接着我们来看下具体的实现过程

技术实现

所有代码均基于以下软件版本:

  • python==3.6.3

  • django==2.2

  • channels==2.1.7

  • celery==4.3.0

celery4在windows下支持不完善,所以请在linux下运行测试

日志数据定义

我们只希望用户能够查询固定的几个日志文件,就不是用数据库仅借助settings.py文件里写全局变量来实现数据存储

在settings.py里添加一个叫TAILF的变量,类型为字典,key标识文件的编号,value标识文件的路径

TAILF = {
    1: '/ops/coffee/error.log',
    2: '/ops/coffee/access.log',
}

基础Web页面搭建

假设你已经创建好了一个叫tailf的app,并添加到了settings.py的INSTALLED_APPS中,app的目录结构大概如下

tailf
    - migrations
        - __init__.py
    - __init__.py
    - admin.py
    - apps.py
    - models.py
    - tests.py
    - views.py

依然先构建一个标准的Django页面,相关代码如下

url:

from django.urls import path
from django.contrib.auth.views import LoginView,LogoutView

from tailf.views import tailf

urlpatterns = [
    path('tailf', tailf, name='tailf-url'),

    path('login', LoginView.as_view(template_name='login.html'), name='login-url'),
    path('logout', LogoutView.as_view(template_name='login.html'), name='logout-url'),
]

因为我们规定只有通过登录的用户才能查看日志,所以引入Django自带的LoginView,logoutView帮助我们快速构建Login,Logout功能

指定了登录模板使用login.html,它就是一个标准的登录页面,post传入username和password两个参数即可,不贴代码了

view:

from django.conf import settings
from django.shortcuts import render
from django.contrib.auth.decorators import login_required


# Create your views here.
@login_required(login_url='/login')
def tailf(request):
    logDict = settings.TAILF
    return render(request, 'tailf/index.html', {"logDict": logDict})

引入了login_required装饰器,来判断用户是否登录,未登录就给跳到/login登录页面

logDict 去setting里取我们定义好的TAILF字典赋值,并传递给前端

template:

{% extends "base.html" %}

{% block content %}
<div class="col-sm-8">
  <select class="form-control" id="file">
    <option value="">选择要监听的日志</option>
    {% for k,v in logDict.items %}
    <option value="{{ k }}">{{ v }}</option>
    {% endfor %}
  </select>
</div>
<div class="col-sm-2">
  <input class="btn btn-success btn-block" type="button" onclick="connect()" value="开始监听"/><br/>
</div>
<div class="col-sm-2">
  <input class="btn btn-warning btn-block" type="button" onclick="goclose()" value="终止监听"/><br/>
</div>
<div class="col-sm-12">
  <textarea class="form-control" id="chat-log" disabled rows="20"></textarea>
</div>
{% endblock %}

前端拿到TAILF后通过循环的方式填充到select选择框下,因为数据是字典格式,使用logDict.items的方式可以循环出字典的key和value

这样一个日志监听页面就完成了,但还无法实现日志的监听,继续往下

集成Channels实现WebSocket

日志监听功能主要的设计思路就是页面跟后端服务器建立websocket长连接,后端通过celery异步执行while循环不断的读取日志文件然后发送到websocket的channel里,实现页面上的实时显示

接着我们来集成channels

1.  先添加routing路由,直接修改webapp/routing.py

from channels.auth import AuthMiddlewareStack
from channels.routing import ProtocolTypeRouter, URLRouter

from django.urls import path, re_path
from chat.consumers import ChatConsumer
from tailf.consumers import TailfConsumer

application = ProtocolTypeRouter({
    'websocket': AuthMiddlewareStack(
        URLRouter([
            path('ws/chat/', ChatConsumer),
            re_path(r'^ws/tailf/(?P<id>\d+)/$', TailfConsumer),
        ])
    )
})

直接将路由信息写入到了URLRouter里,注意路由信息的外层多了一个list,区别于上一篇中介绍的写路由文件路径的方式

页面需要将监听的日志文件传递给后端,我们使用routing正则P<id>\d+传文件ID给后端程序,后端程序拿到ID之后根据settings中指定的TAILF解析出日志路径

routing的写法跟Django中的url写法完全一致,使用re_path匹配正则routing路由

2.  添加consumer在tailf/consumers.py文件中

import json
from channels.generic.websocket import WebsocketConsumer
from tailf.tasks import tailf


class TailfConsumer(WebsocketConsumer):
    def connect(self):
        self.file_id = self.scope["url_route"]["kwargs"]["id"]

        self.result = tailf.delay(self.file_id, self.channel_name)

        print('connect:'self.channel_name, self.result.id)
        self.accept()

    def disconnect(self, close_code):
        # 中止执行中的Task
        self.result.revoke(terminate=True)
        print('disconnect:'self.file_id, self.channel_name)

    def send_message(self, event):
        self.send(text_data=json.dumps({
            "message": event["message"]
        }))

这里使用Channels的单通道模式,每一个新连接都会启用一个新的channel,彼此互不影响,可以随意终止任何一个监听日志的请求

connect

我们知道self.scope类似于Django中的request,记录了丰富的请求信息,通过self.scope["url_route"]["kwargs"]["id"]取出routing中正则匹配的日志ID

然后将idchannel_name传递给celery的任务函数tailf,tailf根据id取到日志文件的路径,然后循环文件,将新内容根据channel_name写入对应channel

disconnect

当websocket连接断开的时候我们需要终止Celery的Task执行,以清除celery的资源占用

终止Celery任务使用到revoke指令,采用如下代码来实现

self.result.revoke(terminate=True)

注意self.result是一个result对象,而非id

参数terminate=True的意思是是否立即终止Task,为True时无论Task是否正在执行都立即终止,为False(默认)时需要等待Task运行结束之后才会终止,我们使用了While循环不设置为True就永远不会终止了

终止Celery任务的另外一种方法是:

from webapp.celery import app
app.control.revoke(result.id, terminate=True)

send_message

方便我们通过Django的view或者Celery的task调用给channel发送消息,官方也比较推荐这种方式

使用Celery异步循环读取日志

上边已经集成了Channels实现了WebSocket,但connect函数中的celery任务tailf还没有实现,下边来实现它

关于Celery的详细内容可以看这篇文章:《Django配置Celery执行异步任务和定时任务》,本文就不介绍集成使用以及细节原理,只讲一下任务task

task实现代码如下:

from __future__ import absolute_import
from celery import shared_task

import time
from channels.layers import get_channel_layer
from asgiref.sync import async_to_sync
from django.conf import settings


@shared_task
def tailf(id, channel_name):
    channel_layer = get_channel_layer()
    filename = settings.TAILF[int(id)]

    try:
        with open(filename) as f:
            f.seek(02)

            while True:
                line = f.readline()

                if line:
                    print(channel_name, line)
                    async_to_sync(channel_layer.send)(
                        channel_name,
                        {
                            "type""send.message",
                            "message""微信公众号【运维咖啡吧】原创 版权所有 " + str(line)
                        }
                    )
                else:
                    time.sleep(0.5)
    except Exception as e:
        print(e)

这里边主要涉及到Channels中另一个非常重要的点:从Channels的外部发送消息给Channel

其实上篇文章中检查通道层是否能够正常工作的时候使用的方法就是从外部给Channel通道发消息的示例,本文的具体代码如下

async_to_sync(channel_layer.send)(
    channel_name,
    {
        "type""send.message",
        "message""微信公众号【运维咖啡吧】原创 版权所有 " + str(line)
    }
)

channel_name 对应于传递给这个任务的channel_name,发送消息给这个名字的channel

type 对应于我们Channels的TailfConsumer类中的send_message方法,将方法中的_换成.即可

message 就是要发送给这个channel的具体信息

上边是发送给单Channel的情况,如果是需要发送到Group的话需要使用如下代码

async_to_sync(channel_layer.group_send)(
    group_name,
    {
        'type''chat.message',
        'message''欢迎关注公众号【运维咖啡吧】'
    }
)

只需要将发送单channel的send改为group_sendchannel_name改为group_name即可

需要特别注意的是:使用了channel layer之后一定要通过async_to_sync来异步执行

页面添加WebSocket支持

后端功能都已经完成,我们最后需要添加前端页面支持WebSocket

  function connect() {
    if ( $('#file').val() ) {
      window.chatSocket = new WebSocket(
        'ws://' + window.location.host + '/ws/tailf/' + $('#file').val() + '/');

      chatSocket.onmessage = function(e{
        var data = JSON.parse(e.data);
        var message = data['message'];
        document.querySelector('#chat-log').value += (message);
        // 跳转到页面底部
        $('#chat-log').scrollTop($('#chat-log')[0].scrollHeight);
      };

      chatSocket.onerror = function(e{
        toastr.error('服务端连接异常!')
      };

      chatSocket.onclose = function(e{
        toastr.error('websocket已关闭!')
      };
    } else {
      toastr.warning('请选择要监听的日志文件')
    }
  }

上一篇文章中有详细介绍过websocket的消息类型,这里不多介绍了

至此我们一个日志监听页面完成了,包含了完整的监听功能,但还无法终止,接着看下面的内容

Web页面主动断开WebSocket

web页面上“终止监听”按钮的主要逻辑就是触发WebSocket的onclose方法,从而可以触发Channels后端consumer的disconnect方法,进而终止Celery的循环读取日志任务

前端页面通过.close()可以直接触发WebSocket关闭,当然你如果直接关掉页面的话也会触发WebSocket的onclose消息,所以不用担心Celery任务无法结束的问题

  function goclose() {
    console.log(window.chatSocket);

    window.chatSocket.close();
    window.chatSocket.onclose = function(e{
      toastr.success('已终止日志监听!')
    };
  }

至此我们包含完善功能的Tailf日志监听、终止页面就全部完成了

写在最后

两篇文章结束不知道你是否对Channels有了更深一步的了解,能够操刀上手将Channels用在自己的项目中,实现理想的功能。个人觉得Channels的重点和难点在于对channel layer的理解和运用,真正的理解了并能熟练运用,相信你一定能够举一反三完美实现更多需求。


推荐↓↓↓
人工智能与大数据
上一篇:Django集成OpenLDAP认证 下一篇:利用AI炒股开挂!这届程序员真会玩