前言
我是QQ邮箱的老用户了,QQ邮箱很早就有“无限空间”的服务,我用到现在才用了700MB左右的空间,前几天看到知乎有人尝试过用QQ邮箱备份文件,就花了点时间自己实现了一个。
开整
一些链接
源码:GitHub
QQ邮箱的限制
- QQ邮箱写明的限制:
- 普通邮件的附件最大为50M,邮件正文+普通附件最大为55M。
- 一个账号一天内发送的普通附件上限200M。
- QQ邮箱可存储邮件的数量是500000封。
- QQ邮箱没有写明的限制:
- 一个账号一天能接收的附件上限约为300M。
可知QQ邮箱理论容量上限约为
50000 * 55MB ≈ 26.2TB
,不过也很难用完就是了。
文件处理
因为有附件总大小不50MB的限制,所以有必要做一个文件切割,分包的工作。
生成文件列表
def sort_file_list(dir_path: list) -> list:
'''根据目录生成文件列表,并按照文件尺寸从大到小的顺序排序
参数:
目录路径
返回:
排过序的目录列表,每个元素为[(文件名,文件路径,文件尺寸),]
'''
try:
files = []
with Path(dir_path) as p:
for x in p.iterdir():
# 只处理文件
if x.is_file():
file_path = str(PurePath(dir_path, x.name))
file_size = x.stat().st_size
files.append((file_path, x.name, file_size))
files.sort(key=lambda x: x[2], reverse=True)
except FileNotFoundError:
print(f'**目录[{dir_path}]不存在**')
return(files)
文件切分、分组
def group_file_by_size(file_list: list, max_size: int = 50331648, min_size: int = 1048576) -> (list, list):
'''把文件切分、打包成合适的大小
参数:
有序文件列表(需要先按照文件尺寸排序(sort_file_list))
文件组的最大大小(字节)
文件切片的最小大小(字节)(防止出现非常小的切片)
返回:
文件信息列表,[(文件名,路径,总文件块数,总大小),]
文件块列表,[(文件名,索引,文件块数,起始字节,读取字节),]
'''
file_info = [] # 记录文件名,路径,文件切片数(不切片为0)
group_list = [] # 记录文件分组(文件名,起始,大小,尺寸)
group = [] # 文件组变量
used_size = 0 # 已用空间
while len(file_list):
# 三个状态:
# 空的组,pop第一个文件,切分塞入
# 组非空,找到最适合的文件塞入
# 组非空,找不到适合的文件,关闭组,开启下一组。
if used_size == 0:
# 新的附件空间
free_size = max_size
# 取出一个文件
file_path, file_name, file_size = file_list.pop(0)
f_size = file_size
piece = 0 # 文件块数数(0=不切割)
start = 0 # 文件首字节
size = 0 # 文件结束字节
while file_size > 0:
if file_size >= max_size:
# 文件剩余部分大于等于空闲空间
file_size -= max_size
size = max_size
piece += 1
used_size = 0
group_list.append(
#[(f'{file_name}.[{piece}]', start, size)]
[(file_name, len(file_info), piece, start, size)]
)
start += max_size
else:
# 文件剩余部分小于空闲空间
size = file_size
if piece:
piece += 1
used_size = file_size
file_size = 0
# 开始一个文件组
group = [
#(f'{file_name}{f".[{piece}]" if piece else None}', start, size)
(file_name, len(file_info), piece, start, size)
]
file_info.append((file_name, file_path, piece, f_size))
if not file_list:
# 已经是最后一个文件
group_list.append(group)
else:
# 已经有文件的空间
free_size = max_size - used_size
# smallest_size = file_list[-1][2]
if file_list[-1][2] <= free_size:
# 空闲空间足够大,无需切割
# 寻找能放得下的最大的文件
for i, f in enumerate(file_list, 0):
if f[2] <= free_size:
break
file_path, file_name, file_size = file_list.pop(i)
f_size = file_size
start = 0
size = file_size
used_size += file_size
group.append((file_name, len(file_info), 0, start, size))
file_info.append((file_name, file_path, 0, f_size))
if not file_list:
# 已经是最后一个文件
group_list.append(group)
else:
# 最小文件大小大于空闲空间
if free_size > min_size:
# 无法放下任何文件,但是允许切分文件
file_path, file_name, file_size = file_list.pop(0)
f_size = file_size
file_size -= free_size
piece = 1
start = 0
size = free_size
# 切分第一部分
group.append(
#(f'{file_name}.[{piece}]', start, size)
(file_name, len(file_info), piece, start, size)
)
group_list.append(group)
start += size
used_size = 0
group = []
# 安排文件的后续部分
while file_size > 0:
if file_size >= max_size:
# 后续部分大于等于最大空间
file_size -= max_size
size = max_size
piece += 1
# 直接封装文件组
group_list.append(
#[(f'{file_name}.[{piece}]', start, size)]
[(file_name, len(file_info), piece, start, size)]
)
start += size
else:
# 后续部分小于最大空间
size = file_size
piece += 1
used_size = file_size
file_size = 0
# 开始一个文件组
group = [
#(f'{file_name}.[{piece}]', start, size)
(file_name, len(file_info), piece, start, size)
]
file_info.append(
(file_name, file_path, piece, f_size))
if not file_list:
# 已经是最后一个文件
group_list.append(group)
else:
# 无法放下任何文件,不允许切分文件,开始新组
group_list.append(group)
group = []
used_size = 0
return((file_info, group_list))
这里我设计了两个阈值,max_size
控制一个文件组的最大尺寸,min_size
控制一个文件切片的最小尺寸,防止出现非常零碎的碎片。
组装文件,为发信做准备
def pack_file(file_info, file_group) -> list:
'''按照文件组读取文件,两个参数可以用group_file_by_size获取
参数:
文件信息列表
文件组列表
返回:
文件内容列表[(文件名,文件二进制数据),]
'''
pack_list = []
for name, index, piece, start, size in file_group:
try:
path = file_info[index][1]
with open(path, 'rb') as f:
f.seek(start)
content = f.read(size)
if piece:
pack_list.append((f'{name}.[{piece}]', content))
else:
pack_list.append((name, content))
except FileNotFoundError:
print(f'**文件{name}不存在**')
except IndexError:
print('**索引越界,请检查传入列表是否对应**')
return(pack_list)
发送电邮
import smtplib
from email.header import Header
from email.mime.text import MIMEText
from email.mime.multipart import MIMEMultipart
from email.mime.application import MIMEApplication
def send_email(subject: str, message: str, attact_list: list = []):
'''发送邮件
参数:
邮件主题
邮件正文
邮件附件列表,列表中每个元素为(文件名,字节集)
'''
mailobj = MIMEMultipart()
mailobj['From'] = Header(f"{email_from}<{email_sender}>", 'utf-8')
mailobj['To'] = Header(f"{email_to}<{email_receiver}>", 'utf-8')
mailobj['Subject'] = Header(subject, 'utf-8')
mailobj.attach(MIMEText(message, 'plain', 'utf-8'))
if attact_list:
for name, content in attact_list:
part = MIMEApplication(content)
part.add_header('Content-Disposition', 'attachment', filename=name)
mailobj.attach(part)
with smtplib.SMTP_SSL(host=smtp_host, port=smtp_port) as smtpObj:
smtpObj.connect(host=smtp_host, port=smtp_port)
smtpObj.login(smtp_user, smtp_pass)
smtpObj.sendmail(email_sender, email_receiver, mailobj.as_string())
From和To的部分其实写的不是很规范,但是QQ邮箱能正确识别,应该问题也不大。
生成系统报告
import psutil
def gen_sys_info() -> (str, list):
'''生成简单的系统状态文本,会产生5秒延时
返回:
系统状态清单
警告信息
'''
def graph_process(percent: int, length: int = 15):
'''生成进度条,percent为进度[0-100],length为长度'''
percent_count = int(length*percent/100)
return(f'{str(percent).rjust(4)}% [{"#"*percent_count}{"_"*(length-percent_count)}]')
def check_status():
'''判断数值是否异常'''
warns = []
if cpu_percent >= 95:
warns.append(' CPU负载过高\n')
if mem_percent >= 95:
warns.append(' 内存占用过高\n')
for load in cpu_load:
if load+0.3 >= cpu_trade:
warns.append(' 系统负载过高\n')
break
for usage in disk_percent:
if usage >= 95:
warns.append(' 磁盘空间不足\n')
break
if warns:
return(f'警告信息:\n{"".join(warns)}\n')
else:
return(False)
uname = platform.uname()
sys_hostname = uname.node
sys_type = uname.system
if sys_type == 'Linux':
sys_version = ' '.join(platform.dist())
else:
sys_version = f'{uname.system} {uname.version}'
time_boot = datetime.fromtimestamp(psutil.boot_time())
time_curr = datetime.now()
time_pass = time_curr - time_boot
cpu_percent = psutil.cpu_percent()
cpu_core = psutil.cpu_count(logical=False)
cpu_trade = psutil.cpu_count(logical=True)
cpu_load = psutil.getloadavg()
mem = psutil.virtual_memory()
mem_total = mem.total
mem_used = mem.used
mem_percent = mem.percent
disk_info = []
disk_percent = []
for d in psutil.disk_partitions():
if d.fstype:
usage = psutil.disk_usage(d.mountpoint)
d_total = usage.total
d_used = usage.used
d_percent = usage.percent
disk_percent.append(d_percent)
disk_info.append(
f'磁盘{d.mountpoint.ljust(9)}: '
f'{graph_process(d_percent)} '
f'{size2str(d_used)}/{size2str(d_total)}\n')
else:
d_total = 0
d_used = 0
d_percent = 0
disk_info.append(
f'磁盘{d.mountpoint.ljust(9)}: {graph_process(0)} 不可用\n')
net_old = psutil.net_io_counters()
sleep(5)
net = psutil.net_io_counters()
net_send_s = (net.bytes_sent - net_old.bytes_sent)//5
net_recv_s = (net.bytes_recv - net_old.bytes_recv)//5
net_send = net.bytes_sent
net_recv = net.bytes_recv
user_info = []
for i, user in enumerate(psutil.users(), 1):
u_name = user.name
u_host = user.host
u_start = user.started
user_info.append(f' {i}. {u_name} [{u_host}] 自 '
f'{datetime2str(datetime.fromtimestamp(u_start))}\n')
warns = check_status()
msg = (f'{"系统状态".center(50,"=")}\n'
f'{warns if warns else ""}'
f'主机名称{" "*6}:{sys_hostname}\n'
f'系统版本{" "*6}:{sys_version}\n'
'\n'
f'当前时间{" "*6}:{datetime2str(time_curr)}\n'
f'运行时间{" "*6}:{time_pass.days}天{int(time_pass.seconds//3600)}时\n'
f'平均负载{" "*6}:{cpu_load[0]} {cpu_load[1]} {cpu_load[2]}\n'
'\n'
f'CPU使用{" "*6}:{graph_process(cpu_percent)} '
f'{cpu_core}C{cpu_trade}T\n'
f'内存使用{" "*6}:{graph_process(mem_percent)} '
f'{size2str(mem_used)}/{size2str(mem_total)}\n'
f'{"".join(disk_info)}\n'
f'当前流量{" "*6}:{size2str(net_send_s)}/S 发送 {size2str(net_recv_s)}/S 接收\n'
f'累计流量{" "*6}:{size2str(net_send)} 发送 {size2str(net_recv)} 接收\n'
'\n'
f'登陆用户{" "*6}:\n{"".join(user_info) if user_info else " 无登录用户"}'
)
return (msg, warns)
顺便写了个生成系统报告的功能。
部署方法
- Linux
-
下载程序
git clone https://github.com/chr233/backup_by_mail cd backup_by_mail chmod 755 run.py mv config.py.example config.py
-
配置SMTP服务器
# SMTP邮箱设置 smtp_user = '[email protected]' smtp_pass = 'xxxxxxxxx' smtp_host = 'smtp.qq.com' smtp_port = 465 # 发件邮箱,收件邮箱 email_sender = '[email protected]' email_reveive = '[email protected]' # 收发人昵称,可以为空 email_from = '邮件备份工具' email_to = '' # 自定义主机名,显示在邮件主题中,便于区分 # 留空使用计算机主机名 host_name = '土豆服务器'
-
使用示例:
# 发送系统报告 ./run.py # 发送文件备份 ./run.py 要备份的文件目录
-
使用Cron添加计划任务即可自动执行
# 每天0点0分发送系统报告 0 0 * * * /root/backup_by_mail/run.py > /dev/null 2>&1 # 每天0点0分发送文件备份 0 0 * * * /root/backup_by_mail/run.py 要备份文件所在目录 > /dev/null 2>&1
- Windows
- 下载源码 链接
- 配置SMTP服务器 (同上)
- 添加到计划任务或者手动运行
效果图:
切片文件还原方法
A和B为文件碎片,C为拼接后的完整文件
-
Linux
cat A B > C
-
Windows
copy /b A + B C
本文链接:https://blog.chrxw.com/archives/2020/03/15/1149.html
转载请保留本文链接,谢谢
1 条评论
你好,WINDOWS下可以配置为每天自动运行吗?