Skip to main content

多线程

AScript 使用 Python 原生的多线程能力. 主要通过 threading 模块实现.

# 导包
import threading

创建线程

函数方式创建

import threading
import time

def task(name):
for i in range(3):
print(f"线程 {name}: 第{i+1}次执行")
time.sleep(1)

# 创建线程
t = threading.Thread(target=task, args=("A",))
# 启动线程
t.start()

print("主线程继续执行, 不会被阻塞")

同时启动多个线程

import threading
import time

def task(name, count):
for i in range(count):
print(f"线程 {name}: {i+1}")
time.sleep(0.5)

# 创建多个线程
t1 = threading.Thread(target=task, args=("A", 3))
t2 = threading.Thread(target=task, args=("B", 5))

t1.start()
t2.start()

print("两个线程已同时启动")

等待线程结束

使用 join() 等待线程执行完毕后再继续

import threading
import time

def download(url):
print(f"开始下载: {url}")
time.sleep(2)
print(f"下载完成: {url}")

t1 = threading.Thread(target=download, args=("文件A",))
t2 = threading.Thread(target=download, args=("文件B",))

t1.start()
t2.start()

# 等待两个线程都执行完毕
t1.join()
t2.join()

print("所有下载已完成")

带超时的等待

import threading
import time

def slow_task():
time.sleep(10)

t = threading.Thread(target=slow_task)
t.start()

# 最多等3秒
t.join(timeout=3)

if t.is_alive():
print("线程还在运行, 不再等待")
else:
print("线程已完成")

守护线程

守护线程会在主线程结束时自动退出, 适合后台任务.

import threading
import time

def background_monitor():
while True:
print("后台监控中...")
time.sleep(2)

# 设置为守护线程
t = threading.Thread(target=background_monitor, daemon=True)
t.start()

# 主线程5秒后结束, 守护线程也会自动退出
time.sleep(5)
print("主线程结束")

线程锁

多个线程同时修改共享数据时, 需要加锁防止数据错乱.

基本锁

import threading

count = 0
lock = threading.Lock()

def add():
global count
for _ in range(100000):
lock.acquire()
count += 1
lock.release()

t1 = threading.Thread(target=add)
t2 = threading.Thread(target=add)

t1.start()
t2.start()
t1.join()
t2.join()

print(f"最终结果: {count}") # 200000

使用 with 语句(推荐)

import threading

count = 0
lock = threading.Lock()

def add():
global count
for _ in range(100000):
with lock:
count += 1

t1 = threading.Thread(target=add)
t2 = threading.Thread(target=add)

t1.start()
t2.start()
t1.join()
t2.join()

print(f"最终结果: {count}") # 200000

线程间通信

使用 Event 信号

import threading
import time

event = threading.Event()

def wait_for_signal():
print("等待信号...")
event.wait() # 阻塞直到 event 被 set
print("收到信号, 开始执行!")

def send_signal():
time.sleep(3)
print("发送信号!")
event.set()

t1 = threading.Thread(target=wait_for_signal)
t2 = threading.Thread(target=send_signal)

t1.start()
t2.start()

使用 Queue 队列

线程安全的数据传递方式, 适合生产者-消费者模式.

import threading
import queue
import time

q = queue.Queue()

def producer():
for i in range(5):
item = f"任务{i+1}"
q.put(item)
print(f"生产: {item}")
time.sleep(0.5)
q.put(None) # 结束信号

def consumer():
while True:
item = q.get()
if item is None:
break
print(f"消费: {item}")
time.sleep(1)

t1 = threading.Thread(target=producer)
t2 = threading.Thread(target=consumer)

t1.start()
t2.start()
t1.join()
t2.join()

print("全部处理完毕")

定时器

延迟执行一个函数

import threading

def delayed_task():
print("3秒后执行了!")

# 3秒后执行 delayed_task
timer = threading.Timer(3.0, delayed_task)
timer.start()

print("定时器已设置")

取消定时器

import threading

def task():
print("这不会执行")

timer = threading.Timer(5.0, task)
timer.start()

# 取消定时器
timer.cancel()
print("定时器已取消")

周期性执行

import threading

def repeat_task(interval, func):
"""每隔 interval 秒执行一次 func"""
func()
timer = threading.Timer(interval, repeat_task, args=(interval, func))
timer.daemon = True
timer.start()
return timer

def check():
print("定期检查中...")

# 每5秒执行一次
repeat_task(5, check)

# 保持主线程运行
import time
time.sleep(30)

线程池

使用 concurrent.futures 模块管理线程池, 适合批量并发任务.

基本用法

from concurrent.futures import ThreadPoolExecutor
import time

def task(name):
time.sleep(1)
return f"{name} 完成"

# 创建线程池, 最多同时运行3个线程
with ThreadPoolExecutor(max_workers=3) as pool:
futures = []
for i in range(6):
f = pool.submit(task, f"任务{i+1}")
futures.append(f)

# 获取所有结果
for f in futures:
print(f.result())

批量执行并获取结果

from concurrent.futures import ThreadPoolExecutor, as_completed
import requests

urls = [
"https://httpbin.org/delay/1",
"https://httpbin.org/delay/2",
"https://httpbin.org/delay/1",
]

def fetch(url):
r = requests.get(url, timeout=5)
return f"{url} -> {r.status_code}"

with ThreadPoolExecutor(max_workers=3) as pool:
futures = {pool.submit(fetch, url): url for url in urls}

# as_completed: 谁先完成谁先返回
for f in as_completed(futures):
try:
print(f.result())
except Exception as e:
print(f"请求失败: {e}")

map 方式(更简洁)

from concurrent.futures import ThreadPoolExecutor

def square(n):
return n * n

with ThreadPoolExecutor(max_workers=4) as pool:
results = list(pool.map(square, [1, 2, 3, 4, 5]))
print(results) # [1, 4, 9, 16, 25]

实战案例

多线程批量下载

from concurrent.futures import ThreadPoolExecutor
import requests
from ascript.android.system import R

urls = [
"https://www.baidu.com/img/flexible/logo/pc/result@2.png",
"https://www.baidu.com/img/flexible/logo/pc/peak-resolve.png",
]

def download(url):
name = url.split("/")[-1]
r = requests.get(url, timeout=10)
with open(R.sd(name), "wb") as f:
f.write(r.content)
return f"{name} 下载完成 ({len(r.content)} bytes)"

with ThreadPoolExecutor(max_workers=3) as pool:
results = list(pool.map(download, urls))
for r in results:
print(r)

后台监控 + 主线程工作

import threading
import time

running = True

def monitor():
"""后台每3秒检查一次"""
while running:
print("[监控] 运行中...")
time.sleep(3)

# 启动后台监控线程
t = threading.Thread(target=monitor, daemon=True)
t.start()

# 主线程执行工作
for i in range(5):
print(f"[主线程] 工作 {i+1}")
time.sleep(2)

running = False
print("程序结束")