Press "Enter" to skip to content

使用restapi来控制mikrotik的功能

我写了个脚本来监控家里电视机的在线时长,如果时间达到我设定的阈值,则出发告警。

我现在想在告警的基础上,可以调用routeos的接口来启用我提前配置好的限速策略。最终实现,比如我家娃娃看电视半小时了,则自动给电视机断网。

curl -k -u admin:password http://192.168.1.1/rest/queue/simple

curl -k -u admin:password -X PATCH http://192.168.1.1/rest/queue/simple/*1 --data '{"disabled":"false"}' -H "content-type: application/json"

curl -k -u admin:password -X PATCH http://192.168.1.1/rest/queue/simple/*1 --data '{"disabled":"false"}' -H "content-type: application/json"

我们可以通过上面这种接口来实现我们的需求,更多api能力参考:

https://help.mikrotik.com/docs/display/ROS/REST+API

我的完整监控脚本

import subprocess
import time
import requests
import json
from apscheduler.schedulers.blocking import BlockingScheduler
from datetime import datetime, timedelta
from requests.auth import HTTPBasicAuth
import logging
import os

# 配置日志记录
logging.basicConfig(
    level=logging.INFO,
    format='%(asctime)s - %(levelname)s - %(message)s',
    handlers=[
        logging.FileHandler("TVMonitor.log"),
        logging.StreamHandler()
    ]
)

# 电视的 IP 地址
IP_ADDRESS = "192.168.1.100"

# 发送消息的 API URL
API_URL = "https://wechat.api/send_message"

# Mikrotik REST API 配置
MIKROTIK_IP = "192.168.1..1"
MIKROTIK_PORT = 80
MIKROTIK_USERNAME = os.getenv('MIKROTIK_USERNAME', 'admin')
MIKROTIK_PASSWORD = os.getenv('MIKROTIK_PASSWORD', 'admin')
QUEUE_NAME = "TVLimit"  # 需要操作的队列名称
WX_ID="weixin_id"

# 阈值时间配置(单位:秒)
QUEUE_ENABLE_DELAY = 1800  # 30 分钟
QUEUE_DURATION = 3 * 3600  # 3 小时

# 创建调度器
scheduler = BlockingScheduler()

def ping_ip(ip):
    """Ping 指定 IP 地址,返回是否通畅。"""
    try:
        response = subprocess.run(['ping', '-c', '1', ip], stdout=subprocess.PIPE, stderr=subprocess.PIPE)
        if response.returncode == 0:
            logging.info(f"成功 ping 通 {ip}")
            return True
        else:
            logging.warning(f"无法 ping 通 {ip}")
            return False
    except Exception as e:
        logging.error(f"Ping {ip} 时出错: {e}")
        return False

def call_api(url, data):
    """调用发送消息的 API。"""
    try:
        response = requests.post(url, json=data, timeout=10)
        if response.status_code == 200:
            logging.info(f"API 响应: {response.status_code}, {response.text}")
        else:
            logging.warning(f"API 响应: {response.status_code}, {response.text}")
    except Exception as e:
        logging.error(f"调用 API 时出错: {e}")

def format_duration(seconds):
    """格式化持续时间为 'X小时Y分钟'。"""
    hours = seconds // 3600
    minutes = (seconds % 3600) // 60
    return f"{hours}小时{minutes}分钟"

def get_queue_id():
    """通过队列名称获取队列的 ID。"""
    url = f"http://{MIKROTIK_IP}:{MIKROTIK_PORT}/rest/queue/simple"
    try:
        response = requests.get(url, auth=HTTPBasicAuth(MIKROTIK_USERNAME, MIKROTIK_PASSWORD), verify=False, timeout=10)
        response.raise_for_status()
        queues = response.json()
        for queue in queues:
            if queue.get('name') == QUEUE_NAME:
                queue_id = queue.get('.id') or queue.get('id')
                logging.info(f"找到队列 {QUEUE_NAME} 的 ID: {queue_id}")
                return queue_id
        logging.error(f"未找到名为 {QUEUE_NAME} 的队列。")
        return None
    except requests.exceptions.RequestException as e:
        logging.error(f"获取队列 ID 时出错: {e}")
        return None

def enable_queue():
    """调用 Mikrotik REST API 启用限速队列。"""
    queue_id = get_queue_id()
    if not queue_id:
        return
    url = f"http://{MIKROTIK_IP}:{MIKROTIK_PORT}/rest/queue/simple/{queue_id}"
    data = {"disabled": False}
    try:
        response = requests.patch(
            url,
            auth=HTTPBasicAuth(MIKROTIK_USERNAME, MIKROTIK_PASSWORD),
            headers={"Content-Type": "application/json"},
            data=json.dumps(data),
            verify=False,
            timeout=10
        )
        response.raise_for_status()
        logging.info(f"已启用队列 {QUEUE_NAME}。响应: {response.status_code}")
    except requests.exceptions.RequestException as e:
        logging.error(f"启用队列时出错: {e}")

def disable_queue_func():
    """调用 Mikrotik REST API 停用限速队列。"""
    queue_id = get_queue_id()
    if not queue_id:
        return
    url = f"http://{MIKROTIK_IP}:{MIKROTIK_PORT}/rest/queue/simple/{queue_id}"
    data = {"disabled": True}
    try:
        response = requests.patch(
            url,
            auth=HTTPBasicAuth(MIKROTIK_USERNAME, MIKROTIK_PASSWORD),
            headers={"Content-Type": "application/json"},
            data=json.dumps(data),
            verify=False,
            timeout=10
        )
        response.raise_for_status()
        logging.info(f"已停用队列 {QUEUE_NAME}。响应: {response.status_code}")
    except requests.exceptions.RequestException as e:
        logging.error(f"停用队列时出错: {e}")

class TVChecker:
    def __init__(self, scheduler):
        self.ip_alive = False
        self.start_time = None
        self.queue_enabled = False
        self.scheduler = scheduler

    def check(self):
        """检查电视的状态并执行相应的操作。"""
        is_up = ping_ip(IP_ADDRESS)
        current_time = time.time()

        if is_up and not self.ip_alive:
            # 电视刚开机
            self.ip_alive = True
            self.start_time = current_time
            call_api(API_URL, {"userid": WX_ID, "message": "电视开机了"})
            logging.info("电视开机,发送通知。")

        elif is_up and self.ip_alive:
            # 电视持续开机,检查是否超过阈值时间
            if not self.queue_enabled and (current_time - self.start_time) > QUEUE_ENABLE_DELAY:
                message = "电视已开机超过30分钟,启用限速队列。"
                logging.info(message)
                call_api(API_URL, {"userid": WX_ID, "message": message})
                enable_queue()
                self.queue_enabled = True
                # 调度3小时后停用队列
                run_time = datetime.now() + timedelta(seconds=QUEUE_DURATION)
                self.scheduler.add_job(disable_queue_func, 'date', run_date=run_time, id='disable_queue_job')
                message = f"已安排在 {run_time} 停用限速队列。"
                logging.info(message)
                call_api(API_URL, {"userid": WX_ID, "message": message})

        elif not is_up and self.ip_alive:
            # 电视关机
            self.ip_alive = False
            if self.start_time:
                alive_duration = format_duration(int(current_time - self.start_time))
            else:
                alive_duration = "未知时长"
            message = f"电视关机了,开机时长{alive_duration}"
            call_api(API_URL, {"userid": WX_ID, "message": message})
            logging.info(f"电视关机,发送通知。开机时长: {alive_duration}")
            # 如果限速队列已启用,立即停用
            if self.queue_enabled:
                logging.info("电视关机时,限速队列已启用,立即停用。")
                # 移除已调度的停用任务,以防重复执行
                try:
                    self.scheduler.remove_job('disable_queue_job')
                    logging.info("已移除停用队列的调度任务。")
                except Exception as e:
                    logging.warning(f"移除停用任务时出错: {e}")
                disable_queue_func()
            self.start_time = None

def main():
    """主函数,设置调度任务并启动调度器。"""
    checker = TVChecker(scheduler)
    # 每60秒检查一次电视状态
    scheduler.add_job(checker.check, 'interval', seconds=60, id='tv_check_job')
    try:
        logging.info("启动调度器,开始监控电视状态...")
        scheduler.start()
    except (KeyboardInterrupt, SystemExit):
        logging.info("调度器已停止。")

if __name__ == "__main__":
    # 测试启用和停用队列
    enable_queue()
    time.sleep(10)
    disable_queue_func()
    # 主函数
    main()
发表回复

您的邮箱地址不会被公开。 必填项已用 * 标注