在移动端开发中,获取当前定位是一个常见的需求。在不同的移动操作系统上,获取定位的方式也是不一样的。下面将详细介绍在Android、iOS、H5和使用uni-app框架下获取定位的方法,同时,也提供了在uni-app开发时常用的几种方法。
2025年05月13日
在移动端开发中,获取当前定位是一个常见的需求。在不同的移动操作系统上,获取定位的方式也是不一样的。下面将详细介绍在Android、iOS、H5和使用uni-app框架下获取定位的方法,同时,也提供了在uni-app开发时常用的几种方法。
2025年05月13日
机器之心专栏
作者:kerlomz
网上关于验证码识别的开源项目众多,但大多是学术型文章或者仅仅是一个测试 demo,那么企业级的验证码识别究竟是怎样的呢?
1. 前言
网上关于验证么识别的开源项目众多,但大多是学术型文章或者仅仅是一个测试 demo,那么企业级的验证码识别究竟是怎样的呢?前方高能预警,这是一个生产水准的验证码识别项目,笔者可以向你们保证,它一定会是各位所见过的文章中最实用的,你甚至可以不需要懂代码写代码就能轻松使用它训练一个 99 识别率的模型。这才是企业级应该有的样子:算法开发负责框架,训练只需要一个实习生。不仅操作上简单,在可用性和稳定性上也是经得起考验。性能上,笔者使用腾讯云 1 核 1G 的机器测试:单次识别平均在 12ms 左右,再也不需要 GPU 部署了,CPU 一样可以日调百万。
2025年05月13日
一、系统概述
本监控系统基于 Python 语言开发,采用 Tkinter 实现 GUI 界面,结合 snap7 库实现与西门子 S7-200 Smart PLC 的通信。系统实现了对工业现场水泵机组、压力参数、液位高度及进水阀门的实时监控与控制功能,具备以下核心功能:
多设备监控:同时监控 3 台水泵的运行状态;
2025年05月13日
转载本文需注明出处:微信公众号EAWorld,违者必究。
一个APP的启动与结束都是伴随着RunLoop循环往复的,不断的循环、不断的往复。当线程被杀掉、APP退出后被系统以占用内存为由杀掉,RunLoop就消失了。但平时开发中很少见到RunLoop,为何它如此神秘?本文跟大家分享一下RunLoop的相关知识。
2025年05月13日
0.简介
现今时序数据库的应用场景十分广泛,其通过保留时间序列的性质,记录下事务随时间变化的事实。本文将介绍时序数据库产生背景,概念等基础知识,以及对于PG中时序数据库插件TimescaleDB做原理和代码的剖析。
1.基础知识
1.1 背景
2025年05月13日
# -*- coding: utf-8 -*-
"""
Created on Tue Apr 22 09:05:46 2025
@author: 1
"""
import requests
import time
import schedule
import sched
from datetime import datetime, timedelta
import threading
import pymysql
from concurrent.futures import ThreadPoolExecutor
import logging
from queue import Queue
import os
from pathlib import Path
# 获取虚拟环境目录
venv_dir = os.environ.get('VIRTUAL_ENV', None)
if venv_dir:
log_path = Path(venv_dir) / "app.log"
else:
log_path = Path.home() / "app.log"
logging.basicConfig(
level=logging.INFO,
format='%(asctime)s [%(levelname)s] %(message)s',
handlers=[
logging.FileHandler(log_path),
logging.StreamHandler()
]
)
# 信息等级定义
INFO_LEVELS = {
'紧急': {'retry_interval': 60, 'max_retries': 3},
'重要': {'retry_interval': 60, 'max_retries': 3},
'一般': {'retry_interval': 60, 'max_retries': 3}
}
# 数据库连接配置
DB_CONFIG = {
'host': '************',
'user': 'root',
'password': '*******',
'database': '***********',
'charset': 'utf8mb4',
'cursorclass': pymysql.cursors.DictCursor
}
# 创建线程池
executor = ThreadPoolExecutor(max_workers=20)
class MessageSender:
def __init__(self):
self.message_queue = Queue()
self.lock = threading.RLock() # 可重入锁
self.scheduled_messages = {}
def send_message(self, group, message, info_level, attempt=1):
"""发送消息到企业微信机器人"""
headers = {'Content-Type': 'application/json'}
payload = {
"msgtype": "text",
"text": {
"content": message
}
}
try:
response = requests.post(group['robot_webhook'], json=payload, headers=headers, timeout=10)
if response.status_code == 200 and response.json().get('errcode') == 0:
logging.info(f"成功发送到群 {group['group_name']}")
return True
else:
logging.warning(f"发送到群 {group['group_name']} 失败,状态码: {response.status_code}")
return False
except Exception as e:
logging.error(f"发送到群 {group['group_name']} 异常: {e}")
return False
def retry_send(self, group, message, info_level, upno, sendplan, plantime, attempt=1):
"""独立的重试逻辑,不影响其他消息"""
max_retries = INFO_LEVELS[info_level]['max_retries']
retry_interval = INFO_LEVELS[info_level]['retry_interval']
final_retry_delay = 3600 # 1小时(秒)
success = self.send_message(group, message, info_level, attempt)
if not success:
if attempt <= max_retries:
# 前3次快速重试
logging.info(f"将在 {retry_interval} 秒后重试 (尝试 {attempt}/{max_retries})")
time.sleep(retry_interval)
self.retry_send(group, message, info_level, upno, sendplan, plantime, attempt + 1)
else:
# 3次失败后,1小时后再试
logging.warning(f"3次重试失败,将在1小时后再次尝试")
time.sleep(final_retry_delay)
# 重置尝试次数
self.retry_send(group, message, info_level, upno, sendplan, plantime, attempt=1)
def send_to_superior_groups(self, group, message, info_level, upno, sendplan, plantime, attempt=1):
"""向上级群组发送消息,每个上级群组独立处理"""
current_group = group
while current_group['parent_group_id'] is not None:
print('retry5')
superior = self.find_group_by_id(current_group['parent_group_id'])
if superior:
# 为每个上级群组创建独立线程
print('retry6')
executor.submit(self.retry_send, superior, message, info_level, upno, sendplan, plantime)
if attempt == 2:
break
print('retry7')
attempt = attempt + 1
if upno == 1:
break
print('retry8')
current_group = superior
else:
break
def find_group_by_id(self, group_id):
"""根据ID查找群组信息"""
try:
connection = pymysql.connect(**DB_CONFIG) # 不加锁
with connection.cursor() as cursor:
with self.lock: # 只锁住查询部分
cursor.execute(
"SELECT id, group_name, level, robot_webhook, parent_group_id FROM wechat_groups WHERE id = %s",
(group_id,)
)
return cursor.fetchone()
except Exception as e:
logging.error(f"查找群组时出错: {e}")
return None
finally:
if 'connection' in locals() and connection:
connection.close() # 确保连接关闭
def find_group_by_name(self, group_name):
"""根据名称查找群组信息"""
try:
with self.lock:
connection = pymysql.connect(**DB_CONFIG)
with connection.cursor() as cursor:
sql = "SELECT id, group_name, level, robot_webhook, parent_group_id FROM wechat_groups WHERE group_name = %s"
cursor.execute(sql, (group_name,))
print('按群名查找,',group_name)
result = cursor.fetchone()
print('执行命令,')
print(f'查询结果: {result}')
print(1111)
if not result:
logging.warning(f"未找到群组: {group_name}")
return result
except Exception as e:
print(f"查找群组时出错: {e}")
logging.error(f"查找群组时出错: {e}")
return None
finally:
if 'connection' in locals() and connection:
connection.close()
def schedule_message(self, row):
"""根据plantime安排消息发送"""
try:
plantime = row['plantime']
sendplan = row['sendplan'] # 1即时发送 2 定时发送
print('sendplan,',sendplan)
target_group2 = row['role']
print('目前消息群,',target_group2)
if sendplan == 1:
print('如果sendplan=1,立即发送')
# 如果sendplan=1,立即发送
self.process_message(row)
return
print('如果sendplan不是1,定时发送')
# 将plantime转换为datetime对象
if isinstance(plantime, str):
send_time = datetime.strptime(plantime, '%Y-%m-%d %H:%M:%S')
else:
send_time = plantime
now = datetime.now()
if send_time <= now:
# 如果发送时间已过,立即发送
print('如果发送时间已过,立即发送,',target_group2)
target_group = self.find_group_by_name(target_group2)
print('hhh')
if not target_group:
print('jjj')
logging.warning(f" 未找到群组 '{row['role']}',跳过此消息")
return # 直接返回,不继续执行
self.process_message(row)
else:
# 计算延迟时间(秒)
delay = (send_time - now).total_seconds()
# 为消息创建定时任务
message_id = row['id']
print('为消息创建定时任务,',message_id)
if message_id not in self.scheduled_messages:
timer = threading.Timer(delay, self.process_message, args=(row,))
timer.start()
print('已安排')
self.scheduled_messages[message_id] = timer
logging.info(f"已安排消息 {message_id} 在 {send_time} 发送")
except Exception as e:
logging.error(f"安排消息发送时出错: {e}")
def process_message(self, row):
"""处理单条消息"""
try:
message = f"通知: {row['result']}"
target_group = self.find_group_by_name(row['role'])
if not target_group:
logging.warning(f" 未找到群组 '{row['role']}',跳过此消息")
return # 直接返回,不继续执行
upno = row['upno']
sendplan = row['sendplan']
plantime = row['plantime']
# 主发送
self.retry_send(target_group, message, row['levelname'], upno, sendplan, plantime)
# 向上级发送
self.send_to_superior_groups(target_group, message, row['levelname'], upno, sendplan, plantime)
# 更新数据库状态
self.update_message_status(row['id'])
# 从预定消息中移除
if row['id'] in self.scheduled_messages:
del self.scheduled_messages[row['id']]
except Exception as e:
logging.error(f" 处理消息时出错: {e}")
def update_message_status(self, message_id):
"""更新消息状态"""
try:
with self.lock:
connection = pymysql.connect(**DB_CONFIG)
with connection.cursor() as cursor:
update_sql = "UPDATE ai_dayairesult SET isend = 1 WHERE id = %s"
cursor.execute(update_sql, (message_id,))
connection.commit()
except Exception as e:
logging.error(f"更新消息状态时出错: {e}")
finally:
if 'connection' in locals() and connection:
connection.close()
def check_and_schedule_messages(self):
"""检查并安排消息发送"""
try:
with self.lock:
connection = pymysql.connect(**DB_CONFIG)
with connection.cursor() as cursor:
sql = """SELECT id, sqlcmd, result, createdate, role, details, levelname, upno, sendplan, plantime
FROM ai_dayairesult
WHERE isend = 0"""
cursor.execute(sql)
results = cursor.fetchall()
for row in results:
try:
self.schedule_message(row) # 即使某条失败,继续下一条
except Exception as e:
logging.error(f" 安排消息 {row['id']} 时出错: {e}")
except Exception as e:
logging.error(f" 检查并安排消息时出错: {e}")
finally:
if 'connection' in locals() and connection:
connection.close()
def run_scheduler(sender):
"""运行定时任务"""
# 每分钟检查一次待发送消息
schedule.every(1).minutes.do(sender.check_and_schedule_messages)
while True:
schedule.run_pending()
time.sleep(1)
if __name__ == "__main__":
sender = MessageSender()
# 启动调度器线程
scheduler_thread = threading.Thread(target=run_scheduler, args=(sender,))
scheduler_thread.daemon = True
scheduler_thread.start()
# 立即执行一次检查
sender.check_and_schedule_messages()
# 主线程保持运行
try:
while True:
time.sleep(1)
except KeyboardInterrupt:
logging.info("程序退出")
# 取消所有预定但未发送的消息
for timer in sender.scheduled_messages.values():
timer.cancel()
2025年05月13日
前几天教大家从入门到精通,当然仅靠那一篇文章是不足以带领大家精通JavaScript的,今天给大家带来第二讲!
BOM和DOM简介
BOM,Browser Object Model ,浏览器对象模型。
BOM主要提供了访问和操作浏览器各组件的方式。
浏览器组件:
window(浏览器窗口)
location(地址栏)
2025年05月13日
multiprocessing 模块为在子进程中运行任务、通信和共享数据,以及执行各种形式的同步提供支持。进程没有任何共享状态。
2025年05月13日
# 后端服务(FastAPI提供模型数据接口)
from fastapi import FastAPI
app = FastAPI()
@app.get("/scene_config")
def get_scene_config():
return {
"ground_size": 100,
"sky_color": "#87CEEB",
"objects": [
{"type": "tree", "position": [10,0,5]},
{"type": "rock", "position": [-5,0,8]}
]
}