SoFunction
Updated on 2024-11-13

This article shows how to manage timed tasks with Python's schedule module.

1 Cancel timed tasks

Sometimes a timed task has to be cancelled once a certain condition is met. Killing the process is not an option in that scenario, so you use the library's job-cancelling function instead.

The following code uses count to track executions and cancels the timed task once it has run 5 times:

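import schedule
import time

count = 0

def do_func(name, age):
    global count
    count += 1
    print(time.strftime('%Y-%m-%d %H:%M:%S', time.localtime()) + " in do func : Name: " + name + " Age: " + str(age))

def main():
    # Run do_func every 2 seconds and keep the job handle so it can be cancelled
    job = schedule.every(2).seconds.do(do_func, "Zhang Sanfeng", 100)
    while True:
        if count >= 5:
            schedule.cancel_job(job)
        schedule.run_pending()
        time.sleep(1)  # avoid a busy loop

if __name__ == "__main__":
    main()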

The result is as follows. After 5 prints the process does not exit, but the timed task is no longer executed:

2022-05-25 00:03:15 in do func : Name: Zhang Sanfeng Age: 100

2022-05-25 00:03:17 in do func : Name: Zhang Sanfeng Age: 100

2022-05-25 00:03:19 in do func : Name: Zhang Sanfeng Age: 100

2022-05-25 00:03:21 in do func : Name: Zhang Sanfeng Age: 100

2022-05-25 00:03:23 in do func : Name: Zhang Sanfeng Age: 100

2 Timed tasks are executed only once

Running a timed task only once may seem pointless, but it is useful in some scenarios, for example when a task should run a fixed number of times and then run again only when certain conditions are met.

The following is a simple timed task that runs only once:

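import schedule
import time

def do_func(name, age):
    print(time.strftime('%Y-%m-%d %H:%M:%S', time.localtime()) + " in do func : Name: " + name + " Age: " + str(age))
    # Returning schedule.CancelJob tells the scheduler to remove this job
    return schedule.CancelJob

def main():
    schedule.every(2).seconds.do(do_func, "Zhang Sanfeng", 100)
    while True:
        schedule.run_pending()
        time.sleep(1)  # avoid a busy loop

if __name__ == "__main__":
    main()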

The output is as follows. After running once, the task is not executed again:

2022-05-25 00:14:20 in do func : Name: Zhang Sanfeng Age: 100

The principle is that the task function returns schedule.CancelJob when it finishes, which tells the scheduler to cancel the task. The real value is that the task function itself decides whether to return it: you can put a condition in the task function and return the cancel signal only once some business condition is reached, which makes timed tasks more flexible.

For example, the task below checks a stop condition in its business logic and stops running once the condition is met:

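import schedule
import time

count = 0  # start at 0 so the task runs exactly 5 times

def do_func(name, age):
    global count
    count += 1
    print(time.strftime('%Y-%m-%d %H:%M:%S', time.localtime()) + " in do func : Name: " + name + " Age: " + str(age))
    if count >= 5:
        print("It's been performed 5 times, it's achieved the desired result, no need to perform it again...")
        # Cancel only when the business condition is met
        return schedule.CancelJob

def main():
    schedule.every(2).seconds.do(do_func, "Zhang Sanfeng", 100)
    while True:
        schedule.run_pending()
        time.sleep(1)  # avoid a busy loop

if __name__ == "__main__":
    main()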

The output is as follows:

2022-05-25 00:19:30 in do func : Name: Zhang Sanfeng Age: 100

2022-05-25 00:19:32 in do func : Name: Zhang Sanfeng Age: 100

2022-05-25 00:19:34 in do func : Name: Zhang Sanfeng Age: 100

2022-05-25 00:19:36 in do func : Name: Zhang Sanfeng Age: 100

2022-05-25 00:19:38 in do func : Name: Zhang Sanfeng Age: 100

It's been performed 5 times, it's achieved the desired result, no need to perform it again...
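The cancel-by-return-value idea can also be illustrated with a toy scheduler written from scratch. This is only a simplified model of the mechanism, not the schedule library's actual implementation; CANCEL, ToyScheduler, and limited_task are hypothetical names made up for the illustration.

```python
CANCEL = object()  # hypothetical sentinel, standing in for schedule.CancelJob

class ToyScheduler:
    """Simplified model: runs every job on each tick and drops jobs that return CANCEL."""

    def __init__(self):
        self.jobs = []

    def add(self, func):
        self.jobs.append(func)

    def tick(self):
        for func in self.jobs[:]:  # iterate over a copy so removal is safe
            if func() is CANCEL:
                self.jobs.remove(func)

state = {"runs": 0}

def limited_task():
    state["runs"] += 1
    if state["runs"] >= 5:
        return CANCEL  # the task itself decides when to stop

toy = ToyScheduler()
toy.add(limited_task)
for _ in range(10):  # ten ticks, but the task removes itself after the fifth
    toy.tick()
print(state["runs"], len(toy.jobs))
```

The point of the model is that the scheduler never needs to know the stop condition; it only checks what the task returns.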

3 Get all timed tasks

Getting all timed tasks lets you operate on the ones that meet certain conditions, for example by modifying them.

The following code changes the task whose interval is 3 from running every 3 seconds to running every 3 minutes:

import schedule
import time

def do_func(name, age):
    print(time.strftime('%Y-%m-%d %H:%M:%S', time.localtime()) + " in do func : Name: " + name + " Age: " + str(age))

def main():
    schedule.every(2).seconds.do(do_func, "Zhang Sanfeng", 100)
    schedule.every(3).seconds.do(do_func, "Zhang Sanfeng", 200)
    all_jobs = schedule.get_jobs()
    print("Original Timed Tasks:")
    for job in all_jobs:
        print(job)
    # Change the unit of every job whose interval is 3
    for job in all_jobs:
        if job.interval == 3:
            job.unit = "minutes"
    print("Modified timed tasks:")
    for job in all_jobs:
        print(job)

if __name__ == "__main__":
    main()

The output is as follows:

Original Timed Tasks:

Job(interval=2, unit=seconds, do=do_func, args=('Zhang Sanfeng', 100), kwargs={})

Job(interval=3, unit=seconds, do=do_func, args=('Zhang Sanfeng', 200), kwargs={})

Modified timed tasks:

Job(interval=2, unit=seconds, do=do_func, args=('Zhang Sanfeng', 100), kwargs={})

Job(interval=3, unit=minutes, do=do_func, args=('Zhang Sanfeng', 200), kwargs={})

4 Cancel all tasks

The following code cancels all tasks. Before cancelling it reports 2 timed tasks; after cancelling it reports 0:

import schedule
import time

def do_func(name, age):
    print(time.strftime('%Y-%m-%d %H:%M:%S', time.localtime()) + " in do func : Name: " + name + " Age: " + str(age))

def main():
    schedule.every(2).seconds.do(do_func, "Zhang Sanfeng", 100)
    schedule.every(3).seconds.do(do_func, "Zhang Sanfeng", 200)
    print(len(schedule.get_jobs()))
    schedule.clear()  # remove every scheduled job
    print(len(schedule.get_jobs()))

if __name__ == "__main__":
    main()

The output is:

2

0

5 Tagging timed tasks, and getting or canceling tasks by tag

The following code demonstrates tagging tasks, picking tasks based on tags, clearing tasks based on tags, etc.

import schedule
import time

def do_func(name, age):
    print(time.strftime('%Y-%m-%d %H:%M:%S', time.localtime()) + " in do func : Name: " + name + " Age: " + str(age))

def main():
    # Direct tagging when defining a timed task
    schedule.every(2).seconds.do(do_func, "Zhang Sanfeng", 100).tag("demo1", "demo2")
    schedule.every(3).seconds.do(do_func, "Zhang Sanfeng", 200).tag("demo2", "demo3")
    schedule.every(4).seconds.do(do_func, "Zhang Sanfeng", 300).tag("demo3", "demo4")
    # Select tasks based on demo2 tags
    demo2 = schedule.get_jobs("demo2")
    print("Tasks selected based on demo2 tags")
    for job in demo2:
        print(job)
    # Cancel timed tasks based on tags
    schedule.clear("demo3")
    all_job = schedule.get_jobs()
    print("Remaining tasks after deleting tasks labeled demo3")
    for job in all_job:
        print(job)

if __name__ == "__main__":
    main()

The output is as follows:

Tasks selected based on demo2 tags

Job(interval=2, unit=seconds, do=do_func, args=('Zhang Sanfeng', 100), kwargs={})

Job(interval=3, unit=seconds, do=do_func, args=('Zhang Sanfeng', 200), kwargs={})

Remaining tasks after deleting tasks labeled demo3

Job(interval=2, unit=seconds, do=do_func, args=('Zhang Sanfeng', 100), kwargs={})

This concludes the article on using timed tasks with the Python schedule module.