Simplest HTTP Server for MicroPython

开发过一个项目,在刷了MicroPython的ESP32C3开发板上,运行一个HTTP服务和一个定时提交数据的自动任务。一开始,HTTP服务经常不定时宕掉。后来才发现,是内存不足。由于定时提价数据的自动任务,是通过HTTP协议POST数据,不能再精简。那么,只能对这个HTTP服务“开刀”。

这个HTTP服务,一开始是用TinyWeb开发,升级MicroPython v1.24.1后,改为使用microdot。两个框架的GitHub项目如下:

这些Web框架,功能比较全面,自然占用的内存就比较大。想要最大限度精简内存占用,最好的做法就是不使用Web框架开发,从底层实现HTTP服务。参考了以下代码:

最后编写出可以在MicroPython v1.24.1上运行的HTTP服务:

from micropython import const
import socket
import time
import logging

_logger = None
_RESP_STATUS = {
    200: 'HTTP/1.1 200 OK\n'
    ,302: 'HTTP/1.1 302 Found\n'
    ,400: 'HTTP/1.1 400 Bad Request\n'
    ,403: 'HTTP/1.1 403 Forbidden\n'
    ,500: 'HTTP/1.1 500 Internal Server Error\n'
}

def _getRequest(reqRaw: byte):
    '''获取整理后的请求数据
    
    Args:
        reqRaw (byte): 请求的原始字符串。
    
    Returns:
        dict: 整理后的请求dict对象。method:请求方法;path:请求Path;protocol:请求协议;form:POST的form数据。
	'''
    charset = 'utf-8'
    rArr = reqRaw.split(b'\r\n')
    r = str(rArr[0], charset).split()
    r = {'method':r[0].upper(), 'path':r[1], 'protocol':r[2]}
    if r['method'] == 'POST' and b'Content-Type: application/x-www-form-urlencoded' in rArr:
        r['form'] = dict(
            [tuple(item.split('=')) for item in str(rArr[-1], charset).split('&')]
        )
    return r

def _getPage(html: str, redirectUrl: str = '', redirectSecond: int = 0) -> str:
    '''生成HTML页面。主要是把传入的HTML数据,套上模板,或者实现自动跳转功能
	
    Args:
        html (str): HTML数据。
        redirectUrl (str): 跳转URL。
        redirectSecond (int): 等待指定秒数后自动跳转。

    Returns:
        str: 添加模板后的HTML页面
	'''
    redirectMeta = '' if len(redirectUrl) <= 0 else f'<meta http-equiv="Refresh" content="{redirectSecond}; URL={redirectUrl}" />'
    return f'<html><head><meta name="viewport" content="width=device-width,initial-scale=1">{redirectMeta}<title>Simple Web</title><link rel="icon" href="data:;base64,="></head><body>{html}</body></html>'

def _pageIndex(request):
    '''处理访问首页的请求。
	'''
    html = _getPage('''
<div style="text-align:center;">
<h1>Hello!</h1>
<p>Page index</p>
</div>
''')

    # Send response
    # Response header set boot status, for Shell script reading
    return html, 200

def _pageTips(request):
    '''处理提示页的请求,会在5秒后自动跳转首页。
	'''
    html = _getPage('<div style="text-align:center;">Forbidden.</div>', redirectUrl = 'index', redirectSecond = 3)
    return html, 200

# Start web server application
def run(logger: logging.Logger):
    '''启动HTTP服务

    Args:
        logger (logging.Logger): 传入logger对象。
    '''
    global _logger
    _logger = logger

    # Set up socket and start listening
    try:
        addr = socket.getaddrinfo(conf.WEB_HOST, conf.WEB_PORT)[0][-1]
        s = socket.socket()
        s.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
        s.bind(addr)
        s.listen()
        _logger.info(f'Web server on {addr}')
    except Exception as e:
        _logger.error(f'Web server start failed: {e}')
        return

    # Main loop to listen for connections
    while True:
        try:
            conn, addr = s.accept()
            _logger.debug(f'Request from: {addr}')
            
            # Receive and parse the request
            reqRaw = conn.recv(1024)
            _logger.debug(f'Request content: {reqRaw}')
            req = _getRequest(reqRaw)
                       
            # Process the request
            if req['path'] in ['/','/index'] and req['method'] == "GET":
                respHtml, respStatus, respHeader = _pageIndex(req)
            elif req['path'] == '/tips' and req['method'] == "GET":
                respHtml, respStatus, respHeader = _pageTips(req)
            else:
                # Url path of request is not support
                _logger.debug(f"Path {req['path']}, 400")
                conn.send(_RESP_STATUS[400])
                conn.close()
                continue

            # Send the HTTP response and close the connection
            conn.send(_RESP_STATUS[respStatus])
            conn.send('Content-Type: text/html\n')
            if respHeader is not None:
                for k,v in respHeader.items():
                    conn.send(f'{k}: {v}\n')
            # Respone set no cache
            # Notice: Two newline characters ("\n") indicate the end of the header.
            conn.send('Cache-Control: no-cache\nExpires: 0\n\n')
            conn.write(respHtml)
        except Exception as e:
            _logger.error(f'WebServer Exception: {e}')
            try:
                conn.send(_RESP_STATUS[500])
            except:
                pass
        finally:
            try:
                if conn is not None:
                    conn.close()
            except:
                pass

注意:

  • 关于节省内存
    • 首先不要import没用到的库。甚至可以import指定库并用完后,执行del
    • 使用byte类型,代替str类型。
    • 适当的时候,手工清理内存,即执行gc.collect()
    • py文件编译为mpy,加载mpy文件更快更省内存。
  • 关于稳定性
    • 使用try ... except捕获所有异常,避免程序因异常而意外终止。
使用 Hugo 构建
主题 StackJimmy 设计