首页 > 编程知识 正文

小说阅读源码,如何阅读框架源码

时间:2023-05-05 19:24:33 阅读:42543 作者:3069

趁热学习Ap计划程序的python源代码,有兴趣实现任务调度控件。

源代码分析主要针对AP计划程序下的几个重要模块

事件执行机构作业任务jobstores任务包含triggers触发器schedulers调度程序。 主要看看事件事件和执行机构

events事件event是一个主要由APScheduler触发的事件类,可以使用add_listener )将侦听器函数绑定到调度程序,并在接收到指定事件后对其进行自定义

事件主要根据事件的类型进行封装,是什么意思?

调度程序启动/关闭等事件封装在调度程序事件类中,任务执行结果封装在JobExecutionEvent类中等

因此,evnets最后分为4个类。 代码很简单,主要是使用类来区分用户感兴趣的信息。 每个类封装的信息都不同,但具体分类的详细情况,请看上一篇,有详细的介绍

#在任务调度过程中发生的事件,如基类、启动调度程序和关闭调度程序。 classschedulerevent(object ) :def_init_(self,code,alias=None ) 3360super ) )。 self(_init_ ) self.code=codeself.alias=alias def _ repr _ (self ) 3360return'%s ) code=%% self.code classjobevent (调度器事件) :def__init_(self,code,job )。 作业存储(: super )作业事件, self(_init__ ) code ) self.codeself.job _ id=job _ id self.job任务达到最大并发时触发的事件classjobsubmimity 作业_ id,作业存储,sheduled _ run _ times (: super (jobsubmissionevent,self )._init_ ) code,作业_ id, 作业存储(self.scheduled _ run _ times=scheduled _ run _ times #任务已成功执行等, 执行与任务执行结果相关的事件任务时出现异常的classjobexecutionevent (job event ) :def_init_ ) self、code、job_id、jobstore、 触发scheduled_run_的跟踪back=none (: super (jobexecutionevent,self )._init_ ) code、job_id、job store

现在,让我们从总体上看一下BaseExecutor代码的逻辑

从异常类Exception继承的自定义异常MaxInstancesReachedError

classmaxinstancesreachederror (exception ) : def _ init _ (self,job ) :super ) maxinstancesreachederror, self (_ _ init _ (job ' % s ' hasalreadyreacheditsmaximumnumberofinstances ) % ) % (job.id,job.max_instances )

bda: 0)

super() 方法设计目的是用来解决多重继承时父类的查找问题,在涉及到子类中需要调用父类的方法时,这是个很好的习惯

self._instances 类型是 dict 字典类型, 主要是为了统计 job 同时运行的实例数,以 job.id 作为 key,同时运行的实例数作为对应的 value,主要是实现前面2篇提到过,我们是可以限制一个任务同时运行的最大实例数的 max_instances

这里的 defaultdict 就是一个关于初始化字典的小技巧,一般情况下,看一下下面这段代码:

# 初始化dict={}# 设置值dict[element] = value# 获取值target = dict[element]

获取值的前提是 element 在字典中,如果不在字典里就会报错

通过 defaultdict(lambda: 0) 来保证当从 self._instances 取值的时候,如果不存在的时候,返回 0,而不会报错

启动 start def start(self, scheduler, alias): self._scheduler = scheduler self._lock = scheduler._create_lock() self._logger = logging.getLogger('apscheduler.executors.%s' % alias)

持有一个 BaseScheduler 对象主要是为了在任务执行成功失败后,可以将任务执行状态返回给调度程序 (因为调度程序支持用户对事件添加侦听器的功能)

初始化完成,再调用启动后,接下来主要看一下如何让 BaseScheduler 执行任务

提交任务 submit_job def submit_job(self, job, run_times): assert self._lock is not None, with self._lock: if self._instances[job.id] >= job.max_instances: raise MaxInstancesReachedError(job) self._do_submit_job(job, run_times) self._instances[job.id] += 1 assert 是为了确保 BaseScheduler 已经通过 start() 完成了部分成员变量的初始化在加锁的情况下,通过 self._instances 获取正在运行的任务的实例,如果超过 max_instances (默认为1), 则会抛出 MaxInstancesReachedError 的异常self._do_submit_job() 来真正执行这个任务正在运行的任务计数加 1 子类重载函数

因为执行器的不同,这里的 _do_submit_job() 是一个纯虚函数,需要子类自己实现
当然,shutdown(self, wait=True) 也交给子类实现,但是父类提供了一个缺省的实现

操作执行结果 _run_job_success(self, job_id, events) 子类在任务执行成功后调用_run_job_error(self, job_id, exc, traceback=None) 任务失败后调用

简单看一下实现,任务执行成功后,在加锁的前提下将任务计数减取 1, 如果为 0,则删除这个键值,最后将处理的事件,返回给 scheduler,执行失败也差不多,只是最后返回给 scheduler 包含更多的错误信息

def _run_job_success(self, job_id, events): with self._lock: self._instances[job_id] -= 1 if self._instances[job_id] == 0: del self._instances[job_id] for event in events: self._scheduler._dispatch_event(event)

BaseScheduler 类里的部分都分析完了,还有一个 run_job(job, jobstore_alias, run_times, logger_name) 的函数

run_job

这个函数实现了任务的执行,并根据任务执行的结果,做相应信息的收集,是整个执行器里,最核心的部分

def run_job(job, jobstore_alias, run_times, logger_name): events = [] logger = logging.getLogger(logger_name) for run_time in run_times: if job.misfire_grace_time is not None: difference = datetime.now(utc) - run_time grace_time = timedelta(seconds=job.misfire_grace_time) if difference > grace_time: events.append(JobExecutionEvent(EVENT_JOB_MISSED, job.id, jobstore_alias, run_time)) logger.warning('Run time of job "%s" was missed by %s', job, difference) continue logger.info('Running job "%s" (scheduled at %s)', job, run_time) try: retval = job.func(*job.args, **job.kwargs) except BaseException: exc, tb = sys.exc_info()[1:] formatted_tb = ''.join(format_tb(tb)) events.append(JobExecutionEvent(EVENT_JOB_ERROR, job.id, jobstore_alias, run_time, exception=exc, traceback=formatted_tb)) logger.exception('Job "%s" raised an exception', job) if six.PY2: sys.exc_clear() del tb else: import traceback traceback.clear_frames(tb) del tb else: events.append(JobExecutionEvent(EVENT_JOB_EXECUTED, job.id, jobstore_alias, run_time, retval=retval)) logger.info('Job "%s" executed successfully', job) return events

大致处理的逻辑是这样的:

调度程序调用 run_job(),携带参数 任务 和任务所属的 任务存储别名,执行时间 和 日志logger_name当任务配置了 misfire_grace_time,这个字段指的是当真正执行任务的时间与计划执行时间的误差,也就是在设置的误差范围内,任务被调用时,该任务还是会被执行,反之则任务状态就会为 EVENTJOBMISSED,不执行try 里执行预定的任务然后对执行结果状态按照成功失败分别处理 成功,直接将事件状态设置成 EVENT_JOB_EXECUTED失败,通过 sys.exc_info 获取详细的错误信息,最后防止内存泄漏,清空错误信息堆栈

关于子类,主要想介绍一下 BasePoolExecutor

BasePoolExecutor

这个是线程池和进程池的基类,从线程池中获取一个线程或进程,然后将 run_job() 扔进去执行就可以了

class BasePoolExecutor(BaseExecutor): @abstractmethod def __init__(self, pool): super(BasePoolExecutor, self).__init__() self._pool = pool def _do_submit_job(self, job, run_times): def callback(f): exc, tb = (f.exception_info() if hasattr(f, 'exception_info') else (f.exception(), getattr(f.exception(), '__traceback__', None))) if exc: self._run_job_error(job.id, exc, tb) else: self._run_job_success(job.id, f.result()) f = self._pool.submit(run_job, job, job._jobstore_alias, run_times, self._logger.name) f.add_done_callback(callback) def shutdown(self, wait=True): self._pool.shutdown(wait) ThreadPoolExecutor

使用 concurrent.futures 下的线程池 ThreadPoolExecutor

class ThreadPoolExecutor(BasePoolExecutor): def __init__(self, max_workers=10): pool = concurrent.futures.ThreadPoolExecutor(int(max_workers)) super(ThreadPoolExecutor, self).__init__(pool) ProcessPoolExecutor

使用 concurrent.futures 下的进程池 ProcessPoolExecutor

class ProcessPoolExecutor(BasePoolExecutor): def __init__(self, max_workers=10): pool = concurrent.futures.ProcessPoolExecutor(int(max_workers)) super(ProcessPoolExecutor, self).__init__(pool)

其它的子类在实现上区别都不是很大,这里就不展开了,有兴趣的可以自己阅读一下~

版权声明:该文观点仅代表作者本人。处理文章:请发送邮件至 三1五14八八95#扣扣.com 举报,一经查实,本站将立刻删除。