# -*- coding: utf-8 -*-
"""
Created on Tue Apr 22 09:05:46 2025
@author: 1
"""
import requests
import time
import schedule
import sched
from datetime import datetime, timedelta
import threading
import pymysql
from concurrent.futures import ThreadPoolExecutor
import logging
from queue import Queue
import os
from pathlib import Path
# Keep the log file inside the active virtual environment when one is active
# (VIRTUAL_ENV is set); otherwise fall back to the user's home directory.
venv_dir = os.environ.get('VIRTUAL_ENV')
log_path = (Path(venv_dir) if venv_dir else Path.home()) / "app.log"

logging.basicConfig(
    level=logging.INFO,
    format='%(asctime)s [%(levelname)s] %(message)s',
    handlers=[
        # The log messages in this file are Chinese text. Without an explicit
        # encoding FileHandler uses the locale default, which cannot encode
        # them on ASCII/C-locale hosts, so pin UTF-8.
        logging.FileHandler(log_path, encoding='utf-8'),
        logging.StreamHandler(),
    ],
)
# Retry policy per message level. The keys are the level names looked up by
# MessageSender.retry_send; retry_interval is in seconds.
INFO_LEVELS = {
    '紧急': dict(retry_interval=60, max_retries=3),
    '重要': dict(retry_interval=60, max_retries=3),
    '一般': dict(retry_interval=60, max_retries=3),
}
# Keyword arguments passed to pymysql.connect() for every database access.
# DictCursor makes each fetched row a dict keyed by column name.
DB_CONFIG = dict(
    host='************',
    user='root',
    password='*******',
    database='***********',
    charset='utf8mb4',
    cursorclass=pymysql.cursors.DictCursor,
)
# Shared worker pool; sends to superior groups are submitted to it.
_EXECUTOR_MAX_WORKERS = 20
executor = ThreadPoolExecutor(max_workers=_EXECUTOR_MAX_WORKERS)
class MessageSender:
    """Deliver pending notifications to WeCom (企业微信) group robots.

    Unsent rows (``isend = 0``) are read from ``ai_dayairesult``. Each row is
    sent to the group named by its ``role`` column and to that group's parent
    groups, then flagged ``isend = 1``. Rows whose ``sendplan`` is not 1 and
    whose ``plantime`` lies in the future are deferred with a
    ``threading.Timer``.

    Every database access opens and closes its own connection, so no lock is
    held around database calls. ``self.lock`` only guards the in-memory
    bookkeeping (``scheduled_messages`` and ``_in_flight``).
    """

    def __init__(self):
        self.message_queue = Queue()
        # Re-entrant lock guarding scheduled_messages and _in_flight.
        self.lock = threading.RLock()
        # message id -> threading.Timer that will call process_message.
        self.scheduled_messages = {}
        # ids of messages currently inside process_message; prevents the
        # poller and a timer thread from sending the same row twice.
        self._in_flight = set()

    def send_message(self, group, message, info_level, attempt=1):
        """Post ``message`` as a text message to the group's robot webhook.

        Args:
            group: Group row; ``robot_webhook`` and ``group_name`` are read.
            message: Text content to send.
            info_level: Unused here; kept for interface compatibility.
            attempt: Unused here; kept for interface compatibility.

        Returns:
            True when the webhook answers HTTP 200 with ``errcode == 0``,
            otherwise False. Request and decoding errors are logged and
            reported as False.
        """
        group_name = group['group_name']
        headers = {'Content-Type': 'application/json'}
        payload = {
            "msgtype": "text",
            "text": {
                "content": message
            }
        }
        try:
            response = requests.post(
                group['robot_webhook'], json=payload, headers=headers, timeout=10
            )
            if response.status_code != 200:
                logging.warning(f"发送到群 {group_name} 失败,状态码: {response.status_code}")
                return False

            result = response.json()
            if result.get('errcode') == 0:
                logging.info(f"成功发送到群 {group_name}")
                return True

            # HTTP 200 with a non-zero errcode is still a failure, so the
            # status code alone says nothing; log the API error as well.
            logging.warning(
                f"发送到群 {group_name} 失败,状态码: {response.status_code}, "
                f"errcode: {result.get('errcode')}, errmsg: {result.get('errmsg')}"
            )
            return False
        except Exception as e:
            logging.error(f"发送到群 {group_name} 异常: {e}")
            return False

    def retry_send(self, group, message, info_level, upno, sendplan, plantime, attempt=1):
        """Send ``message`` to ``group``, retrying until it succeeds.

        After a failure the call sleeps ``retry_interval`` seconds and tries
        again, up to ``max_retries`` quick retries (both taken from
        ``INFO_LEVELS[info_level]``). Once those are used up it sleeps one
        hour and starts a new round. The calling thread is blocked for the
        whole time.

        The loop is iterative on purpose: a recursive version adds a stack
        frame per retry and eventually raises ``RecursionError`` when a
        webhook stays broken.

        Args:
            group: Group row passed to ``send_message``.
            message: Text content to send.
            info_level: Key into ``INFO_LEVELS``.
            upno, sendplan, plantime: Unused here; kept for interface
                compatibility.
            attempt: Attempt number to start counting from.

        Returns:
            True once the message has been sent.

        Raises:
            KeyError: If ``info_level`` is not a key of ``INFO_LEVELS``.
        """
        policy = INFO_LEVELS[info_level]
        max_retries = policy['max_retries']
        retry_interval = policy['retry_interval']
        final_retry_delay = 3600  # one hour, in seconds

        while True:
            if self.send_message(group, message, info_level, attempt):
                return True
            if attempt <= max_retries:
                logging.info(f"将在 {retry_interval} 秒后重试 (尝试 {attempt}/{max_retries})")
                time.sleep(retry_interval)
                attempt += 1
            else:
                logging.warning(f"{max_retries}次重试失败,将在1小时后再次尝试")
                time.sleep(final_retry_delay)
                attempt = 1  # start a fresh round of quick retries

    def send_to_superior_groups(self, group, message, info_level, upno, sendplan, plantime, attempt=1):
        """Submit the message to the parent groups of ``group``.

        Walks up the ``parent_group_id`` chain and submits one ``retry_send``
        job per superior group to the shared executor. The walk stops after
        the first superior when ``upno == 1``, when ``attempt`` reaches 2
        (two levels when started at the default of 1), or when a parent
        cannot be found.
        """
        current_group = group
        while current_group['parent_group_id'] is not None:
            superior = self.find_group_by_id(current_group['parent_group_id'])
            if not superior:
                break
            future = executor.submit(
                self.retry_send, superior, message, info_level, upno, sendplan, plantime
            )
            # Without a callback an exception inside the job would be lost.
            future.add_done_callback(self._log_future_error)
            if attempt == 2 or upno == 1:
                break
            attempt += 1
            current_group = superior

    @staticmethod
    def _log_future_error(future):
        """Log the exception, if any, raised by a finished executor job."""
        if future.cancelled():
            return
        error = future.exception()
        if error is not None:
            logging.error(f"向上级群组发送消息时出错: {error}")

    def _fetch_one(self, sql, params):
        """Run ``sql`` on a fresh connection and return the first row or None."""
        connection = pymysql.connect(**DB_CONFIG)
        try:
            with connection.cursor() as cursor:
                cursor.execute(sql, params)
                return cursor.fetchone()
        finally:
            connection.close()

    def find_group_by_id(self, group_id):
        """Return the ``wechat_groups`` row with this id, or None.

        Database errors are logged and reported as None.
        """
        try:
            return self._fetch_one(
                "SELECT id, group_name, level, robot_webhook, parent_group_id FROM wechat_groups WHERE id = %s",
                (group_id,),
            )
        except Exception as e:
            logging.error(f"查找群组时出错: {e}")
            return None

    def find_group_by_name(self, group_name):
        """Return the ``wechat_groups`` row with this name, or None.

        A missing group is logged as a warning; database errors are logged
        and reported as None.
        """
        try:
            result = self._fetch_one(
                "SELECT id, group_name, level, robot_webhook, parent_group_id FROM wechat_groups WHERE group_name = %s",
                (group_name,),
            )
        except Exception as e:
            logging.error(f"查找群组时出错: {e}")
            return None
        if not result:
            logging.warning(f"未找到群组: {group_name}")
        return result

    def schedule_message(self, row):
        """Send ``row`` now or arm a timer for its ``plantime``.

        ``sendplan == 1`` means send immediately. Otherwise ``plantime`` (a
        ``datetime`` or a ``'%Y-%m-%d %H:%M:%S'`` string) decides: a time in
        the past is sent immediately, a future time gets one
        ``threading.Timer`` per message id. Errors are logged, not raised.
        """
        try:
            if row['sendplan'] == 1:
                self.process_message(row)
                return

            plantime = row['plantime']
            if isinstance(plantime, str):
                send_time = datetime.strptime(plantime, '%Y-%m-%d %H:%M:%S')
            else:
                send_time = plantime

            now = datetime.now()
            if send_time <= now:
                # Overdue: send right away. process_message does the group
                # lookup and logs a warning when the group is missing.
                self.process_message(row)
                return

            message_id = row['id']
            delay = (send_time - now).total_seconds()
            with self.lock:
                if message_id in self.scheduled_messages:
                    return  # already armed by an earlier poll
                timer = threading.Timer(delay, self.process_message, args=(row,))
                self.scheduled_messages[message_id] = timer
                timer.start()
            logging.info(f"已安排消息 {message_id} 在 {send_time} 发送")
        except Exception as e:
            logging.error(f"安排消息发送时出错: {e}")

    def process_message(self, row):
        """Send one message row and mark it as sent.

        Sends to the target group (blocking until ``retry_send`` succeeds),
        submits the superior-group sends, then sets ``isend = 1``. If the
        same message id is already being processed by another thread the
        call returns without doing anything. Errors are logged, not raised.
        """
        message_id = None
        claimed = False
        try:
            message_id = row['id']
            with self.lock:
                if message_id in self._in_flight:
                    # The poller and a timer can both reach the same row
                    # while a send is still retrying; only one may proceed.
                    logging.info(f"消息 {message_id} 正在处理中,跳过重复处理")
                    return
                self._in_flight.add(message_id)
                claimed = True

            target_group = self.find_group_by_name(row['role'])
            if not target_group:
                logging.warning(f" 未找到群组 '{row['role']}',跳过此消息")
                return

            message = f"通知: {row['result']}"
            level_name = row['levelname']
            upno = row['upno']
            sendplan = row['sendplan']
            plantime = row['plantime']

            # Primary send.
            self.retry_send(target_group, message, level_name, upno, sendplan, plantime)
            # Fan out to superior groups.
            self.send_to_superior_groups(target_group, message, level_name, upno, sendplan, plantime)
            # Mark the row as sent.
            self.update_message_status(message_id)
        except Exception as e:
            logging.error(f" 处理消息时出错: {e}")
        finally:
            if claimed:
                self._release(message_id)

    def _release(self, message_id):
        """Drop the in-flight claim and any timer registered for the id."""
        with self.lock:
            self._in_flight.discard(message_id)
            timer = self.scheduled_messages.pop(message_id, None)
        if timer is not None:
            # A no-op for a timer that already fired; stops one that has not,
            # so the message is not processed a second time.
            timer.cancel()

    def update_message_status(self, message_id):
        """Set ``isend = 1`` on the ``ai_dayairesult`` row with this id.

        Database errors are logged, not raised.
        """
        try:
            connection = pymysql.connect(**DB_CONFIG)
            try:
                with connection.cursor() as cursor:
                    cursor.execute(
                        "UPDATE ai_dayairesult SET isend = 1 WHERE id = %s",
                        (message_id,),
                    )
                connection.commit()
            finally:
                connection.close()
        except Exception as e:
            logging.error(f"更新消息状态时出错: {e}")

    def _fetch_pending_rows(self):
        """Return all ``ai_dayairesult`` rows that have ``isend = 0``."""
        sql = """SELECT id, sqlcmd, result, createdate, role, details, levelname, upno, sendplan, plantime
                 FROM ai_dayairesult
                 WHERE isend = 0"""
        connection = pymysql.connect(**DB_CONFIG)
        try:
            with connection.cursor() as cursor:
                cursor.execute(sql)
                return cursor.fetchall()
        finally:
            connection.close()

    def check_and_schedule_messages(self):
        """Load every unsent row and hand each one to ``schedule_message``.

        The rows are fetched and the connection is closed before any message
        is dispatched, so a slow or retrying send never keeps the connection
        or ``self.lock`` occupied. A failure on one row is logged and the
        remaining rows are still handled.
        """
        try:
            rows = self._fetch_pending_rows()
        except Exception as e:
            logging.error(f" 检查并安排消息时出错: {e}")
            return

        for row in rows:
            try:
                self.schedule_message(row)
            except Exception as e:
                logging.error(f" 安排消息 {row.get('id')} 时出错: {e}")
def run_scheduler(sender):
    """Register the once-a-minute pending-message check and run it forever.

    Never returns: after registering ``sender.check_and_schedule_messages``
    with the ``schedule`` library it loops, running due jobs once per second.
    """
    every_minute = schedule.every(1).minutes
    every_minute.do(sender.check_and_schedule_messages)

    tick_seconds = 1
    while True:
        schedule.run_pending()
        time.sleep(tick_seconds)
if __name__ == "__main__":
    sender = MessageSender()

    # Run the periodic poller in a daemon thread so it never blocks exit.
    scheduler_thread = threading.Thread(target=run_scheduler, args=(sender,), daemon=True)
    scheduler_thread.start()

    # Do one check right away instead of waiting for the first minute tick.
    sender.check_and_schedule_messages()

    # Keep the main thread alive until the user interrupts.
    try:
        while True:
            time.sleep(1)
    except KeyboardInterrupt:
        logging.info("程序退出")
        # Cancel every timer that has not fired yet. Iterate over a snapshot:
        # timer threads remove their own entries from scheduled_messages, and
        # iterating the live dict could raise "dictionary changed size
        # during iteration".
        for timer in list(sender.scheduled_messages.values()):
            timer.cancel()