转载本文需注明出处:微信公众号EAWorld,违者必究。
引言:
一个APP的启动与结束都是伴随着RunLoop循环往复的,不断的循环、不断的往复。当线程被杀掉、APP退出后被系统以占用内存为由杀掉,RunLoop就消失了。但平时开发中很少见到RunLoop,为何它如此神秘?本文跟大家分享一下RunLoop的相关知识。
2025年05月13日
转载本文需注明出处:微信公众号EAWorld,违者必究。
一个APP的启动与结束都是伴随着RunLoop循环往复的,不断的循环、不断的往复。当线程被杀掉、APP退出后被系统以占用内存为由杀掉,RunLoop就消失了。但平时开发中很少见到RunLoop,为何它如此神秘?本文跟大家分享一下RunLoop的相关知识。
2025年05月13日
0.简介
现今时序数据库的应用场景十分广泛,其通过保留时间序列的性质,记录下事务随时间变化的事实。本文将介绍时序数据库产生背景,概念等基础知识,以及对于PG中时序数据库插件TimescaleDB做原理和代码的剖析。
1.基础知识
1.1 背景
2025年05月13日
# -*- coding: utf-8 -*-
"""
Created on Tue Apr 22 09:05:46 2025
@author: 1
"""
import requests
import time
import schedule
import sched
from datetime import datetime, timedelta
import threading
import pymysql
from concurrent.futures import ThreadPoolExecutor
import logging
from queue import Queue
import os
from pathlib import Path
# 获取虚拟环境目录
venv_dir = os.environ.get('VIRTUAL_ENV', None)
if venv_dir:
log_path = Path(venv_dir) / "app.log"
else:
log_path = Path.home() / "app.log"
logging.basicConfig(
level=logging.INFO,
format='%(asctime)s [%(levelname)s] %(message)s',
handlers=[
logging.FileHandler(log_path),
logging.StreamHandler()
]
)
# 信息等级定义
INFO_LEVELS = {
'紧急': {'retry_interval': 60, 'max_retries': 3},
'重要': {'retry_interval': 60, 'max_retries': 3},
'一般': {'retry_interval': 60, 'max_retries': 3}
}
# 数据库连接配置
DB_CONFIG = {
'host': '************',
'user': 'root',
'password': '*******',
'database': '***********',
'charset': 'utf8mb4',
'cursorclass': pymysql.cursors.DictCursor
}
# 创建线程池
executor = ThreadPoolExecutor(max_workers=20)
class MessageSender:
def __init__(self):
self.message_queue = Queue()
self.lock = threading.RLock() # 可重入锁
self.scheduled_messages = {}
def send_message(self, group, message, info_level, attempt=1):
"""发送消息到企业微信机器人"""
headers = {'Content-Type': 'application/json'}
payload = {
"msgtype": "text",
"text": {
"content": message
}
}
try:
response = requests.post(group['robot_webhook'], json=payload, headers=headers, timeout=10)
if response.status_code == 200 and response.json().get('errcode') == 0:
logging.info(f"成功发送到群 {group['group_name']}")
return True
else:
logging.warning(f"发送到群 {group['group_name']} 失败,状态码: {response.status_code}")
return False
except Exception as e:
logging.error(f"发送到群 {group['group_name']} 异常: {e}")
return False
def retry_send(self, group, message, info_level, upno, sendplan, plantime, attempt=1):
"""独立的重试逻辑,不影响其他消息"""
max_retries = INFO_LEVELS[info_level]['max_retries']
retry_interval = INFO_LEVELS[info_level]['retry_interval']
final_retry_delay = 3600 # 1小时(秒)
success = self.send_message(group, message, info_level, attempt)
if not success:
if attempt <= max_retries:
# 前3次快速重试
logging.info(f"将在 {retry_interval} 秒后重试 (尝试 {attempt}/{max_retries})")
time.sleep(retry_interval)
self.retry_send(group, message, info_level, upno, sendplan, plantime, attempt + 1)
else:
# 3次失败后,1小时后再试
logging.warning(f"3次重试失败,将在1小时后再次尝试")
time.sleep(final_retry_delay)
# 重置尝试次数
self.retry_send(group, message, info_level, upno, sendplan, plantime, attempt=1)
def send_to_superior_groups(self, group, message, info_level, upno, sendplan, plantime, attempt=1):
"""向上级群组发送消息,每个上级群组独立处理"""
current_group = group
while current_group['parent_group_id'] is not None:
print('retry5')
superior = self.find_group_by_id(current_group['parent_group_id'])
if superior:
# 为每个上级群组创建独立线程
print('retry6')
executor.submit(self.retry_send, superior, message, info_level, upno, sendplan, plantime)
if attempt == 2:
break
print('retry7')
attempt = attempt + 1
if upno == 1:
break
print('retry8')
current_group = superior
else:
break
def find_group_by_id(self, group_id):
"""根据ID查找群组信息"""
try:
connection = pymysql.connect(**DB_CONFIG) # 不加锁
with connection.cursor() as cursor:
with self.lock: # 只锁住查询部分
cursor.execute(
"SELECT id, group_name, level, robot_webhook, parent_group_id FROM wechat_groups WHERE id = %s",
(group_id,)
)
return cursor.fetchone()
except Exception as e:
logging.error(f"查找群组时出错: {e}")
return None
finally:
if 'connection' in locals() and connection:
connection.close() # 确保连接关闭
def find_group_by_name(self, group_name):
"""根据名称查找群组信息"""
try:
with self.lock:
connection = pymysql.connect(**DB_CONFIG)
with connection.cursor() as cursor:
sql = "SELECT id, group_name, level, robot_webhook, parent_group_id FROM wechat_groups WHERE group_name = %s"
cursor.execute(sql, (group_name,))
print('按群名查找,',group_name)
result = cursor.fetchone()
print('执行命令,')
print(f'查询结果: {result}')
print(1111)
if not result:
logging.warning(f"未找到群组: {group_name}")
return result
except Exception as e:
print(f"查找群组时出错: {e}")
logging.error(f"查找群组时出错: {e}")
return None
finally:
if 'connection' in locals() and connection:
connection.close()
def schedule_message(self, row):
"""根据plantime安排消息发送"""
try:
plantime = row['plantime']
sendplan = row['sendplan'] # 1即时发送 2 定时发送
print('sendplan,',sendplan)
target_group2 = row['role']
print('目前消息群,',target_group2)
if sendplan == 1:
print('如果sendplan=1,立即发送')
# 如果sendplan=1,立即发送
self.process_message(row)
return
print('如果sendplan不是1,定时发送')
# 将plantime转换为datetime对象
if isinstance(plantime, str):
send_time = datetime.strptime(plantime, '%Y-%m-%d %H:%M:%S')
else:
send_time = plantime
now = datetime.now()
if send_time <= now:
# 如果发送时间已过,立即发送
print('如果发送时间已过,立即发送,',target_group2)
target_group = self.find_group_by_name(target_group2)
print('hhh')
if not target_group:
print('jjj')
logging.warning(f" 未找到群组 '{row['role']}',跳过此消息")
return # 直接返回,不继续执行
self.process_message(row)
else:
# 计算延迟时间(秒)
delay = (send_time - now).total_seconds()
# 为消息创建定时任务
message_id = row['id']
print('为消息创建定时任务,',message_id)
if message_id not in self.scheduled_messages:
timer = threading.Timer(delay, self.process_message, args=(row,))
timer.start()
print('已安排')
self.scheduled_messages[message_id] = timer
logging.info(f"已安排消息 {message_id} 在 {send_time} 发送")
except Exception as e:
logging.error(f"安排消息发送时出错: {e}")
def process_message(self, row):
"""处理单条消息"""
try:
message = f"通知: {row['result']}"
target_group = self.find_group_by_name(row['role'])
if not target_group:
logging.warning(f" 未找到群组 '{row['role']}',跳过此消息")
return # 直接返回,不继续执行
upno = row['upno']
sendplan = row['sendplan']
plantime = row['plantime']
# 主发送
self.retry_send(target_group, message, row['levelname'], upno, sendplan, plantime)
# 向上级发送
self.send_to_superior_groups(target_group, message, row['levelname'], upno, sendplan, plantime)
# 更新数据库状态
self.update_message_status(row['id'])
# 从预定消息中移除
if row['id'] in self.scheduled_messages:
del self.scheduled_messages[row['id']]
except Exception as e:
logging.error(f" 处理消息时出错: {e}")
def update_message_status(self, message_id):
"""更新消息状态"""
try:
with self.lock:
connection = pymysql.connect(**DB_CONFIG)
with connection.cursor() as cursor:
update_sql = "UPDATE ai_dayairesult SET isend = 1 WHERE id = %s"
cursor.execute(update_sql, (message_id,))
connection.commit()
except Exception as e:
logging.error(f"更新消息状态时出错: {e}")
finally:
if 'connection' in locals() and connection:
connection.close()
def check_and_schedule_messages(self):
"""检查并安排消息发送"""
try:
with self.lock:
connection = pymysql.connect(**DB_CONFIG)
with connection.cursor() as cursor:
sql = """SELECT id, sqlcmd, result, createdate, role, details, levelname, upno, sendplan, plantime
FROM ai_dayairesult
WHERE isend = 0"""
cursor.execute(sql)
results = cursor.fetchall()
for row in results:
try:
self.schedule_message(row) # 即使某条失败,继续下一条
except Exception as e:
logging.error(f" 安排消息 {row['id']} 时出错: {e}")
except Exception as e:
logging.error(f" 检查并安排消息时出错: {e}")
finally:
if 'connection' in locals() and connection:
connection.close()
def run_scheduler(sender):
"""运行定时任务"""
# 每分钟检查一次待发送消息
schedule.every(1).minutes.do(sender.check_and_schedule_messages)
while True:
schedule.run_pending()
time.sleep(1)
if __name__ == "__main__":
sender = MessageSender()
# 启动调度器线程
scheduler_thread = threading.Thread(target=run_scheduler, args=(sender,))
scheduler_thread.daemon = True
scheduler_thread.start()
# 立即执行一次检查
sender.check_and_schedule_messages()
# 主线程保持运行
try:
while True:
time.sleep(1)
except KeyboardInterrupt:
logging.info("程序退出")
# 取消所有预定但未发送的消息
for timer in sender.scheduled_messages.values():
timer.cancel()
2025年05月13日
前几天教大家从入门到精通,当然仅靠那一篇文章是不足以带领大家精通JavaScript的,今天给大家带来第二讲!
BOM和DOM简介
BOM,Browser Object Model ,浏览器对象模型。
BOM主要提供了访问和操作浏览器各组件的方式。
浏览器组件:
window(浏览器窗口)
location(地址栏)
2025年05月13日
multiprocessing 模块为在子进程中运行任务、通信和共享数据,以及执行各种形式的同步提供支持。进程没有任何共享状态。
2025年05月13日
# 后端服务(FastAPI提供模型数据接口)
from fastapi import FastAPI
app = FastAPI()
@app.get("/scene_config")
def get_scene_config():
return {
"ground_size": 100,
"sky_color": "#87CEEB",
"objects": [
{"type": "tree", "position": [10,0,5]},
{"type": "rock", "position": [-5,0,8]}
]
}
2025年05月13日
此篇文章整理新手编写代码常见的一些错误,有些错误是粗心的错误,但对于新手而已,会折腾很长时间才搞定,所以在此总结下我遇到的一些问题。希望帮助到刚入门的朋友们。后续会不断补充。
2025年05月13日
信号是操作系统用来通知进程发生了某个事件的一种异步通信方式。在Python中,标准库的signal模块提供了处理这些系统信号的机制。信号通常由外部事件触发,例如用户按下Ctrl+C、子进程终止或系统资源耗尽等情况。对于开发系统程序、守护进程或需要长时间运行的应用程序,理解信号处理至关重要。
2025年05月13日
前文 《深入跨域 - 从初识到入门》 中,大家已经对同源与跨域的产生历史与重要性等有了一个初步的了解了,那么我们应该如何解决在日常开发中遇到的跨域引起的问题呢?