开发过一个项目,在刷了MicroPython的ESP32C3开发板上,运行一个HTTP服务和一个定时提交数据的自动任务。一开始,HTTP服务经常不定时宕掉。后来才发现,是内存不足。由于定时提价数据的自动任务,是通过HTTP协议POST数据,不能再精简。那么,只能对这个HTTP服务“开刀”。
这个HTTP服务,一开始是用TinyWeb开发,升级MicroPython v1.24.1后,改为使用microdot。两个框架的GitHub项目如下:
- belyalov/tinyweb: Simple and lightweight HTTP async server for micropython
- miguelgrinberg/microdot: The impossibly small web framework for Python and MicroPython.
这些Web框架,功能比较全面,自然占用的内存就比较大。想要最大限度精简内存占用,最好的做法就是不使用Web框架开发,从底层实现HTTP服务。参考了以下代码:
- https://docs.micropython.org/en/latest/esp8266/tutorial/network_tcp.html#simple-http-server
- https://randomnerdtutorials.com/esp32-esp8266-micropython-web-server/
- https://randomnerdtutorials.com/raspberry-pi-pico-web-server-micropython/
最后编写出可以在MicroPython v1.24.1上运行的HTTP服务:
from micropython import const
import socket
import time
import logging
_logger = None
_RESP_STATUS = {
200: 'HTTP/1.1 200 OK\n'
,302: 'HTTP/1.1 302 Found\n'
,400: 'HTTP/1.1 400 Bad Request\n'
,403: 'HTTP/1.1 403 Forbidden\n'
,500: 'HTTP/1.1 500 Internal Server Error\n'
}
def _getRequest(reqRaw: byte):
'''获取整理后的请求数据
Args:
reqRaw (byte): 请求的原始字符串。
Returns:
dict: 整理后的请求dict对象。method:请求方法;path:请求Path;protocol:请求协议;form:POST的form数据。
'''
charset = 'utf-8'
rArr = reqRaw.split(b'\r\n')
r = str(rArr[0], charset).split()
r = {'method':r[0].upper(), 'path':r[1], 'protocol':r[2]}
if r['method'] == 'POST' and b'Content-Type: application/x-www-form-urlencoded' in rArr:
r['form'] = dict(
[tuple(item.split('=')) for item in str(rArr[-1], charset).split('&')]
)
return r
def _getPage(html: str, redirectUrl: str = '', redirectSecond: int = 0) -> str:
'''生成HTML页面。主要是把传入的HTML数据,套上模板,或者实现自动跳转功能
Args:
html (str): HTML数据。
redirectUrl (str): 跳转URL。
redirectSecond (int): 等待指定秒数后自动跳转。
Returns:
str: 添加模板后的HTML页面
'''
redirectMeta = '' if len(redirectUrl) <= 0 else f'<meta http-equiv="Refresh" content="{redirectSecond}; URL={redirectUrl}" />'
return f'<html><head><meta name="viewport" content="width=device-width,initial-scale=1">{redirectMeta}<title>Simple Web</title><link rel="icon" href="data:;base64,="></head><body>{html}</body></html>'
def _pageIndex(request):
'''处理访问首页的请求。
'''
html = _getPage('''
<div style="text-align:center;">
<h1>Hello!</h1>
<p>Page index</p>
</div>
''')
# Send response
# Response header set boot status, for Shell script reading
return html, 200
def _pageTips(request):
'''处理提示页的请求,会在5秒后自动跳转首页。
'''
html = _getPage('<div style="text-align:center;">Forbidden.</div>', redirectUrl = 'index', redirectSecond = 3)
return html, 200
# Start web server application
def run(logger: logging.Logger):
'''启动HTTP服务
Args:
logger (logging.Logger): 传入logger对象。
'''
global _logger
_logger = logger
# Set up socket and start listening
try:
addr = socket.getaddrinfo(conf.WEB_HOST, conf.WEB_PORT)[0][-1]
s = socket.socket()
s.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
s.bind(addr)
s.listen()
_logger.info(f'Web server on {addr}')
except Exception as e:
_logger.error(f'Web server start failed: {e}')
return
# Main loop to listen for connections
while True:
try:
conn, addr = s.accept()
_logger.debug(f'Request from: {addr}')
# Receive and parse the request
reqRaw = conn.recv(1024)
_logger.debug(f'Request content: {reqRaw}')
req = _getRequest(reqRaw)
# Process the request
if req['path'] in ['/','/index'] and req['method'] == "GET":
respHtml, respStatus, respHeader = _pageIndex(req)
elif req['path'] == '/tips' and req['method'] == "GET":
respHtml, respStatus, respHeader = _pageTips(req)
else:
# Url path of request is not support
_logger.debug(f"Path {req['path']}, 400")
conn.send(_RESP_STATUS[400])
conn.close()
continue
# Send the HTTP response and close the connection
conn.send(_RESP_STATUS[respStatus])
conn.send('Content-Type: text/html\n')
if respHeader is not None:
for k,v in respHeader.items():
conn.send(f'{k}: {v}\n')
# Respone set no cache
# Notice: Two newline characters ("\n") indicate the end of the header.
conn.send('Cache-Control: no-cache\nExpires: 0\n\n')
conn.write(respHtml)
except Exception as e:
_logger.error(f'WebServer Exception: {e}')
try:
conn.send(_RESP_STATUS[500])
except:
pass
finally:
try:
if conn is not None:
conn.close()
except:
pass
注意:
- 关于节省内存
- 首先不要import没用到的库。甚至可以
import
指定库并用完后,执行del
。 - 使用
byte
类型,代替str
类型。 - 适当的时候,手工清理内存,即执行
gc.collect()
。 - py文件编译为mpy,加载mpy文件更快更省内存。
- 首先不要import没用到的库。甚至可以
- 关于稳定性
- 使用
try ... except
捕获所有异常,避免程序因异常而意外终止。
- 使用