5

使用Python制作可热载的定时调度器

 1 year ago
source link: https://mabbs.github.io/2022/09/21/cron.html
Go to the source link to view the article. You can view the picture content, updated content and better typesetting reading experience. If the link is broken, please click the button below to view the snapshot at that time.

21 September 2022 - 字数统计:2238 - 阅读大约需要7分钟 - Hits: 27

使用Python制作可热载的定时调度器

by mayx


定时任务用CRON难道不够吗?

最近因为写的Python脚本比较多,另外也有好多脚本都是定时运行的脚本,然后Linux自带的CRON可能不是很直观,不然为什么那么多人开发CRON表达式生成器😂,另外CRON不能和脚本绑定到一起,也不能在Windows上使用,所以就想用Python来实现一个。
当然如果只是想用Python为一个程序做一个定时任务还是很简单的,不想用CRON也能直接用一个死循环if判断时间来做,比如想要每小时什么时候执行,那就不停的判断,直到那个时间到了以后然后去执行就好了。不过很多个脚本都用这种方法的话就显得太凌乱了,想要停止哪个脚本还不太好整,所以写一个定时调度器来统一管理所有的定时脚本也许会更好一些。

其实我最开始是想着把CRON表达式放到脚本里,然后让管理器像配置文件那样读取它,不过我自己解析CRON表达式有点麻烦,一时也没找到好的办法。不过在找这个东西的时候找到了一个有意思的库,叫做schedule,用pip就能安装。它可以用类似自然语言的语法结构去写定时语句,看起来很有意思,于是我就打算用这个库来写我的调度器。
写它不算很复杂,这个库还是挺方便理解的,比据说Python常用的什么APScheduler那个库好用多了,那个玩意看着就不怎么人性化。功能很快就写好了,但是有个问题,就是既然我要写一个可以热载(自动重加载)的调度器,用什么办法监视文件比较好呢?最开始我实现的时候是想着用列出目录和stat方法来读取文件的元数据,然后轮询,如果内容有变化就进行重载。不过这样写起来也麻烦,准确来说也不算准确,而且性能也不怎么好,还要轮询,文件一多整个程序的执行效率就会变低。然后我想着之前我用的Django好像就有这样的功能,它到底用的是什么方法呢?一般我们运行Django项目的时候它第一句会写“Watching for file changes with StatReloader”,那看来StatReloader就是它用来监视文件的模块,听这名字怎么和我之前想的差不多😂,另外也没有找到叫这个名字的库,所以就算了,我还是搜一下找找别的库吧。后来我找到了一个看起来不错的库,叫做watchdog,看起来好像用法也不算很复杂,而且据说是用的内核的什么东西来监测,性能比轮询好很多,所以我就整了这个库,感觉效果还不错,可靠性也很好,文件一有修改程序就会检测到然后进行重载。

import threading
import time
import schedule
import os
import importlib
from watchdog.observers import Observer
from watchdog.events import *

reload_status = [0]

def run_threaded(job_func):
    job_thread = threading.Thread(target=job_func)
    job_thread.start()
    
class FileEventHandler(FileSystemEventHandler):
    def __init__(self, reload_status):
        self.reload_status = reload_status
    
    def on_any_event(self, event):
        if not event.is_directory:
            self.reload_status[0] = 1

observer = Observer()
event_handler = FileEventHandler(reload_status)
observer.schedule(event_handler, "tasks", recursive=False)
observer.start()
while True:
    reload_status[0] = 0
    taskList = os.listdir("tasks")
    for task in taskList:
        if "__" in task or task.rsplit(".", 1)[-1] != "py":
            continue
        try:
            importlib.reload(importlib.import_module("tasks." + task.split(".")[0])).run(run_threaded, schedule)
        except:
            print(f"Task {task.split('.')[0]} import failure")

    while True:
        if reload_status[0]:
            print("Task change, reloading...")
            time.sleep(1)
            schedule.clear()
            break
        else:
            schedule.run_pending()
        time.sleep(1)

被管理脚本示例

def run(run_threaded, schedule):
    schedule.every().second.do(run_threaded, job)

def job():
    print("The job is running.")

注:脚本应该放到管理器所在文件夹下的“tasks”文件夹,具体定时的写法可以看看schedule官方示例

感觉程序果然还是写的越简单越好,功能也是越单一越好。据说APScheduler是用Python实现的像Java的Quartz那样的东西,看着就很难受,像我这个50行写的管理器看起来还挺不错的吧😁。

tags: Python - 程序 - 标志

0 comments

Be the first person to leave a comment!


About Joyk


Aggregate valuable and interesting links.
Joyk means Joy of geeK