多线程
AScript 使用 Python 原生的多线程能力. 主要通过 threading 模块实现.
# 导包
import threading
创建线程
函数方式创建
import threading
import time
def task(name):
for i in range(3):
print(f"线程 {name}: 第{i+1}次执行")
time.sleep(1)
# 创建线程
t = threading.Thread(target=task, args=("A",))
# 启动线程
t.start()
print("主线程继续执行, 不会被阻塞")
同时启动多个线程
import threading
import time
def task(name, count):
for i in range(count):
print(f"线程 {name}: {i+1}")
time.sleep(0.5)
# 创建多个线程
t1 = threading.Thread(target=task, args=("A", 3))
t2 = threading.Thread(target=task, args=("B", 5))
t1.start()
t2.start()
print("两个线程已同时启动")
等待线程结束
使用 join() 等待线程执行完毕后再继续
import threading
import time
def download(url):
print(f"开始下载: {url}")
time.sleep(2)
print(f"下载完成: {url}")
t1 = threading.Thread(target=download, args=("文件A",))
t2 = threading.Thread(target=download, args=("文件B",))
t1.start()
t2.start()
# 等待两个线程都执行完毕
t1.join()
t2.join()
print("所有下载已完成")
带超时的等待
import threading
import time
def slow_task():
time.sleep(10)
t = threading.Thread(target=slow_task)
t.start()
# 最多等3秒
t.join(timeout=3)
if t.is_alive():
print("线程还在运行, 不再等待")
else:
print("线程已完成")
守护线程
守护线程会在主线程结束时自动退出, 适合后台任务.
import threading
import time
def background_monitor():
while True:
print("后台监控中...")
time.sleep(2)
# 设置为守护线程
t = threading.Thread(target=background_monitor, daemon=True)
t.start()
# 主线程5秒后结束, 守护线程也会自动退出
time.sleep(5)
print("主线程结束")
线程锁
多个线程同时修改共享数据时, 需要加锁防止数据错乱.
基本锁
import threading
count = 0
lock = threading.Lock()
def add():
global count
for _ in range(100000):
lock.acquire()
count += 1
lock.release()
t1 = threading.Thread(target=add)
t2 = threading.Thread(target=add)
t1.start()
t2.start()
t1.join()
t2.join()
print(f"最终结果: {count}") # 200000
使用 with 语句(推荐)
import threading
count = 0
lock = threading.Lock()
def add():
global count
for _ in range(100000):
with lock:
count += 1
t1 = threading.Thread(target=add)
t2 = threading.Thread(target=add)
t1.start()
t2.start()
t1.join()
t2.join()
print(f"最终结果: {count}") # 200000
线程间通信
使用 Event 信号
import threading
import time
event = threading.Event()
def wait_for_signal():
print("等待信号...")
event.wait() # 阻塞直到 event 被 set
print("收到信号, 开始执行!")
def send_signal():
time.sleep(3)
print("发送信号!")
event.set()
t1 = threading.Thread(target=wait_for_signal)
t2 = threading.Thread(target=send_signal)
t1.start()
t2.start()
使用 Queue 队列
线程安全的数据传递方式, 适合生产者-消费者模式.
import threading
import queue
import time
q = queue.Queue()
def producer():
for i in range(5):
item = f"任务{i+1}"
q.put(item)
print(f"生产: {item}")
time.sleep(0.5)
q.put(None) # 结束信号
def consumer():
while True:
item = q.get()
if item is None:
break
print(f"消费: {item}")
time.sleep(1)
t1 = threading.Thread(target=producer)
t2 = threading.Thread(target=consumer)
t1.start()
t2.start()
t1.join()
t2.join()
print("全部处理完毕")
定时器
延迟执行一个函数
import threading
def delayed_task():
print("3秒后执行了!")
# 3秒后执行 delayed_task
timer = threading.Timer(3.0, delayed_task)
timer.start()
print("定时器已设置")
取消定时器
import threading
def task():
print("这不会执行")
timer = threading.Timer(5.0, task)
timer.start()
# 取消定时器
timer.cancel()
print("定时器已取消")
周期性执行
import threading
def repeat_task(interval, func):
"""每隔 interval 秒执行一次 func"""
func()
timer = threading.Timer(interval, repeat_task, args=(interval, func))
timer.daemon = True
timer.start()
return timer
def check():
print("定期检查中...")
# 每5秒执行一次
repeat_task(5, check)
# 保持主线程运行
import time
time.sleep(30)
线程池
使用 concurrent.futures 模块管理线程池, 适合批量并发任务.
基本用法
from concurrent.futures import ThreadPoolExecutor
import time
def task(name):
time.sleep(1)
return f"{name} 完成"
# 创建线程池, 最多同时运行3个线程
with ThreadPoolExecutor(max_workers=3) as pool:
futures = []
for i in range(6):
f = pool.submit(task, f"任务{i+1}")
futures.append(f)
# 获取所有结果
for f in futures:
print(f.result())
批量执行并获取结果
from concurrent.futures import ThreadPoolExecutor, as_completed
import requests
urls = [
"https://httpbin.org/delay/1",
"https://httpbin.org/delay/2",
"https://httpbin.org/delay/1",
]
def fetch(url):
r = requests.get(url, timeout=5)
return f"{url} -> {r.status_code}"
with ThreadPoolExecutor(max_workers=3) as pool:
futures = {pool.submit(fetch, url): url for url in urls}
# as_completed: 谁先完成谁先返回
for f in as_completed(futures):
try:
print(f.result())
except Exception as e:
print(f"请求失败: {e}")
map 方式(更简洁)
from concurrent.futures import ThreadPoolExecutor
def square(n):
return n * n
with ThreadPoolExecutor(max_workers=4) as pool:
results = list(pool.map(square, [1, 2, 3, 4, 5]))
print(results) # [1, 4, 9, 16, 25]
实战案例
多线程批量下载
from concurrent.futures import ThreadPoolExecutor
import requests
from ascript.android.system import R
urls = [
"https://www.baidu.com/img/flexible/logo/pc/result@2.png",
"https://www.baidu.com/img/flexible/logo/pc/peak-resolve.png",
]
def download(url):
name = url.split("/")[-1]
r = requests.get(url, timeout=10)
with open(R.res(name), "wb") as f:
f.write(r.content)
return f"{name} 下载完成 ({len(r.content)} bytes)"
with ThreadPoolExecutor(max_workers=3) as pool:
results = list(pool.map(download, urls))
for r in results:
print(r)
后台监控 + 主线程工作
import threading
import time
running = True
def monitor():
"""后台每3秒检查一次"""
while running:
print("[监控] 运行中...")
time.sleep(3)
# 启动后台监控线程
t = threading.Thread(target=monitor, daemon=True)
t.start()
# 主线程执行工作
for i in range(5):
print(f"[主线程] 工作 {i+1}")
time.sleep(2)
running = False
print("程序结束")