说明:

深度学习3.0接近尾声时,项目多了一个迫切的需求,需要有一个WebSSH。许尚明提出了通过websocket连接Kubernetes Pod的数据流来实现WebSSH的功能。刚好我前面学习完websocket,网上也有很好的范例,就很快搞定了!记录下,温故而知新。

涉及技术

  • Kubernetes Stream:接收数据执行,提供实时返回数据流
  • Django Channels:维持长连接,接收前端数据转给Kubernetes,同时将Kubernetes返回的数据发送给前端
  • xterm.js:一个前端终端组件,用于模拟Terminal的界面显示

基本的数据流向是:用户 --> xterm.js --> django channels --> kubernetes stream,接下来看看具体的代码实现。

路由文件

先定义一个Websocket的url。

  • my_project/routing.py
from django.urls import path
from django.conf.urls import url, include, re_path
from channels.routing import ProtocolTypeRouter, URLRouter
from tasks.consumer import MessagesConsumer, ImagesConsumer, WebSSHConsumer
# 用于Websocket认证, 集成了CookieMiddleware AuthMiddleware SessionMiddleware
from channels.auth import AuthMiddlewareStack
from channels.security.websocket import AllowedHostsOriginValidator
# self.scope['type'] 获取协议类型
# self.scope['url_route'] ['kwargs']['username']获取url中关键字参数
# channels routing是scope级别的, 一个连接只能由一个consumer接收和处理
# chaneels routing.py处理应用中WebSockets的路由,相当于Django中的urls.py
application = ProtocolTypeRouter({
    # 'http':views, 普通的http请求不需要我们手动在这里添加, 框架会自动加载。
    'websocket':AllowedHostsOriginValidator(AuthMiddlewareStack(
        URLRouter([
        re_path(r'^wts/(?P<podid>\d+)$',WebSSHConsumer), # 8.10
    ])))
})

上面的是正则匹配所有以wts/数字开头的websocket连接,都交由名为SSHConsumer的Consumer处理。

Consumer文件

Consumer的代码如下:

  • my_project/apps/app/consumer.py
class KubeApi:
    def __init__(self, namespace='default'):
        config.load_kube_config()
        self.namespace = namespace

    def pod_exec(self, podname,container=""):
        api_instance = client.CoreV1Api()
        exec_command = [
            "/bin/sh",
            "-c",
            'TERM=xterm-256color; export TERM; [ -x /bin/bash ] '
            '&& ([ -x /usr/bin/script ] '
            '&& /usr/bin/script -q -c "/bin/bash" /dev/null || exec /bin/bash) '
            '|| exec /bin/sh']

        cont_stream = stream(api_instance.connect_get_namespaced_pod_exec,
                             name=podname,
                             namespace=self.namespace,
                             container=container,
                             command=exec_command,
                             stderr=True, stdin=True,
                             stdout=True, tty=True,
                             _preload_content=False
                             )

        return cont_stream

class K8SStreamThread(Thread):
    def __init__(self, websocket, container_stream):
        Thread.__init__(self)
        self.websocket = websocket
        self.stream = container_stream

    def run(self):
        while self.stream.is_open():
            if self.stream.peek_stdout():
                stdout = self.stream.read_stdout()
                self.websocket.send(stdout)

            if self.stream.peek_stderr():
                stderr = self.stream.read_stderr()
                self.websocket.send(stderr)
        else:
            self.websocket.close()

class WebSSHConsumer(WebsocketConsumer):
    '''
    处理WebSSH的websocket请求
    '''
    def connect(self):
        request_session = self.scope['session']
        if not request_session.get('userId'):
            # 未登录用户拒绝连接
            self.close()
        self.podid = self.scope["url_route"]["kwargs"]["podid"]
        if not self.podid:
            self.close()
        if not Pod.objects.filter(id=self.podid).exists():
            self.close()
        pod = Pod.objects.get(id=self.podid)
            # 不是running状态的拒绝连接
        if pod.status != 'Running':
            self.close()
        try:
            self.stream = KubeApi().pod_exec(pod.name)
            kub_stream = K8SStreamThread(self, self.stream)
            kub_stream.start()

            self.accept()
        except Exception as e:
            print(e)
            self.close()

    def disconnect(self,close_code):
        self.stream.write_stdin('exit\r')

    def receive(self, text_data=None, bytes_data=None):
        self.stream.write_stdin(text_data)

主要思路

Kubernetes本身提供了stream方法来实现exec的功能,返回的就是一个WebSocket可以使用的数据流,使用起来也非常方便。KubeApi类就是这个作用。WebSSH可以看作是一个最简单的websocket长连接,每个连接建立后都是独立的,不会跟其他连接共享数据,所以这里不需要用到Group

当连接建立时通过self.scope["url_route"]["kwargs"]["podid"]获取到url中的podid,在数据库查询后得到podname传给KubeApi类,同时会新起一个线程不断循环是否有新数据产生,如果有则发送给websocket。

当websocket接收到数据就直接写入Kubernetes API,当websocket关闭则会发送个exit命令给Kubernetes。
这样后台就成功实现了WebSSH这个功能!回顾下,代码量很少,重点是要理解其中的含义。

Last modification:April 19th, 2020 at 11:47 am