我写了个脚本来监控家里电视机的在线时长,如果时间达到我设定的阈值,则出发告警。
我现在想在告警的基础上,可以调用routeos的接口来启用我提前配置好的限速策略。最终实现,比如我家娃娃看电视半小时了,则自动给电视机断网。
curl -k -u admin:password http://192.168.1.1/rest/queue/simple
curl -k -u admin:password -X PATCH http://192.168.1.1/rest/queue/simple/*1 --data '{"disabled":"false"}' -H "content-type: application/json"
curl -k -u admin:password -X PATCH http://192.168.1.1/rest/queue/simple/*1 --data '{"disabled":"false"}' -H "content-type: application/json"
我们可以通过上面这种接口来实现我们的需求,更多api能力参考:
https://help.mikrotik.com/docs/display/ROS/REST+API
我的完整监控脚本
import subprocess
import time
import requests
import json
from apscheduler.schedulers.blocking import BlockingScheduler
from datetime import datetime, timedelta
from requests.auth import HTTPBasicAuth
import logging
import os
# 配置日志记录
logging.basicConfig(
level=logging.INFO,
format='%(asctime)s - %(levelname)s - %(message)s',
handlers=[
logging.FileHandler("TVMonitor.log"),
logging.StreamHandler()
]
)
# 电视的 IP 地址
IP_ADDRESS = "192.168.1.100"
# 发送消息的 API URL
API_URL = "https://wechat.api/send_message"
# Mikrotik REST API 配置
MIKROTIK_IP = "192.168.1..1"
MIKROTIK_PORT = 80
MIKROTIK_USERNAME = os.getenv('MIKROTIK_USERNAME', 'admin')
MIKROTIK_PASSWORD = os.getenv('MIKROTIK_PASSWORD', 'admin')
QUEUE_NAME = "TVLimit" # 需要操作的队列名称
WX_ID="weixin_id"
# 阈值时间配置(单位:秒)
QUEUE_ENABLE_DELAY = 1800 # 30 分钟
QUEUE_DURATION = 3 * 3600 # 3 小时
# 创建调度器
scheduler = BlockingScheduler()
def ping_ip(ip):
"""Ping 指定 IP 地址,返回是否通畅。"""
try:
response = subprocess.run(['ping', '-c', '1', ip], stdout=subprocess.PIPE, stderr=subprocess.PIPE)
if response.returncode == 0:
logging.info(f"成功 ping 通 {ip}")
return True
else:
logging.warning(f"无法 ping 通 {ip}")
return False
except Exception as e:
logging.error(f"Ping {ip} 时出错: {e}")
return False
def call_api(url, data):
"""调用发送消息的 API。"""
try:
response = requests.post(url, json=data, timeout=10)
if response.status_code == 200:
logging.info(f"API 响应: {response.status_code}, {response.text}")
else:
logging.warning(f"API 响应: {response.status_code}, {response.text}")
except Exception as e:
logging.error(f"调用 API 时出错: {e}")
def format_duration(seconds):
"""格式化持续时间为 'X小时Y分钟'。"""
hours = seconds // 3600
minutes = (seconds % 3600) // 60
return f"{hours}小时{minutes}分钟"
def get_queue_id():
"""通过队列名称获取队列的 ID。"""
url = f"http://{MIKROTIK_IP}:{MIKROTIK_PORT}/rest/queue/simple"
try:
response = requests.get(url, auth=HTTPBasicAuth(MIKROTIK_USERNAME, MIKROTIK_PASSWORD), verify=False, timeout=10)
response.raise_for_status()
queues = response.json()
for queue in queues:
if queue.get('name') == QUEUE_NAME:
queue_id = queue.get('.id') or queue.get('id')
logging.info(f"找到队列 {QUEUE_NAME} 的 ID: {queue_id}")
return queue_id
logging.error(f"未找到名为 {QUEUE_NAME} 的队列。")
return None
except requests.exceptions.RequestException as e:
logging.error(f"获取队列 ID 时出错: {e}")
return None
def enable_queue():
"""调用 Mikrotik REST API 启用限速队列。"""
queue_id = get_queue_id()
if not queue_id:
return
url = f"http://{MIKROTIK_IP}:{MIKROTIK_PORT}/rest/queue/simple/{queue_id}"
data = {"disabled": False}
try:
response = requests.patch(
url,
auth=HTTPBasicAuth(MIKROTIK_USERNAME, MIKROTIK_PASSWORD),
headers={"Content-Type": "application/json"},
data=json.dumps(data),
verify=False,
timeout=10
)
response.raise_for_status()
logging.info(f"已启用队列 {QUEUE_NAME}。响应: {response.status_code}")
except requests.exceptions.RequestException as e:
logging.error(f"启用队列时出错: {e}")
def disable_queue_func():
"""调用 Mikrotik REST API 停用限速队列。"""
queue_id = get_queue_id()
if not queue_id:
return
url = f"http://{MIKROTIK_IP}:{MIKROTIK_PORT}/rest/queue/simple/{queue_id}"
data = {"disabled": True}
try:
response = requests.patch(
url,
auth=HTTPBasicAuth(MIKROTIK_USERNAME, MIKROTIK_PASSWORD),
headers={"Content-Type": "application/json"},
data=json.dumps(data),
verify=False,
timeout=10
)
response.raise_for_status()
logging.info(f"已停用队列 {QUEUE_NAME}。响应: {response.status_code}")
except requests.exceptions.RequestException as e:
logging.error(f"停用队列时出错: {e}")
class TVChecker:
def __init__(self, scheduler):
self.ip_alive = False
self.start_time = None
self.queue_enabled = False
self.scheduler = scheduler
def check(self):
"""检查电视的状态并执行相应的操作。"""
is_up = ping_ip(IP_ADDRESS)
current_time = time.time()
if is_up and not self.ip_alive:
# 电视刚开机
self.ip_alive = True
self.start_time = current_time
call_api(API_URL, {"userid": WX_ID, "message": "电视开机了"})
logging.info("电视开机,发送通知。")
elif is_up and self.ip_alive:
# 电视持续开机,检查是否超过阈值时间
if not self.queue_enabled and (current_time - self.start_time) > QUEUE_ENABLE_DELAY:
message = "电视已开机超过30分钟,启用限速队列。"
logging.info(message)
call_api(API_URL, {"userid": WX_ID, "message": message})
enable_queue()
self.queue_enabled = True
# 调度3小时后停用队列
run_time = datetime.now() + timedelta(seconds=QUEUE_DURATION)
self.scheduler.add_job(disable_queue_func, 'date', run_date=run_time, id='disable_queue_job')
message = f"已安排在 {run_time} 停用限速队列。"
logging.info(message)
call_api(API_URL, {"userid": WX_ID, "message": message})
elif not is_up and self.ip_alive:
# 电视关机
self.ip_alive = False
if self.start_time:
alive_duration = format_duration(int(current_time - self.start_time))
else:
alive_duration = "未知时长"
message = f"电视关机了,开机时长{alive_duration}"
call_api(API_URL, {"userid": WX_ID, "message": message})
logging.info(f"电视关机,发送通知。开机时长: {alive_duration}")
# 如果限速队列已启用,立即停用
if self.queue_enabled:
logging.info("电视关机时,限速队列已启用,立即停用。")
# 移除已调度的停用任务,以防重复执行
try:
self.scheduler.remove_job('disable_queue_job')
logging.info("已移除停用队列的调度任务。")
except Exception as e:
logging.warning(f"移除停用任务时出错: {e}")
disable_queue_func()
self.start_time = None
def main():
"""主函数,设置调度任务并启动调度器。"""
checker = TVChecker(scheduler)
# 每60秒检查一次电视状态
scheduler.add_job(checker.check, 'interval', seconds=60, id='tv_check_job')
try:
logging.info("启动调度器,开始监控电视状态...")
scheduler.start()
except (KeyboardInterrupt, SystemExit):
logging.info("调度器已停止。")
if __name__ == "__main__":
# 测试启用和停用队列
enable_queue()
time.sleep(10)
disable_queue_func()
# 主函数
main()