Java开发者转战Python:进阶篇
📖 系列文章导航
本文是"Java开发者转战Python"系列的第二篇,深入学习Python的进阶特性。
📚 系列:
# 一、Python进阶特性
# 1、异常处理
Python的异常处理机制与Java类似,但语法更简洁,并提供了一些独特的特性。
# 1.1、try-except基本语法
基本用法
# 基本异常捕获
try:
number = int(input("请输入数字: "))
result = 10 / number
print(f"结果: {result}")
except ValueError:
print("输入的不是有效数字")
except ZeroDivisionError:
print("不能除以零")
对比Java:
// Java异常处理
try {
int number = Integer.parseInt(scanner.nextLine());
int result = 10 / number;
System.out.println("结果: " + result);
} catch (NumberFormatException e) {
System.out.println("输入的不是有效数字");
} catch (ArithmeticException e) {
System.out.println("不能除以零");
}
捕获多个异常
# 方式1:分别捕获
try:
data = json.loads(json_string)
except ValueError as e:
print(f"JSON解析错误: {e}")
except KeyError as e:
print(f"键不存在: {e}")
# 方式2:合并捕获(异常处理逻辑相同时)
try:
file = open("data.txt")
data = process(file)
except (FileNotFoundError, PermissionError) as e:
print(f"文件访问错误: {e}")
# 方式3:捕获所有异常(不推荐用于生产)
try:
risky_operation()
except Exception as e:
print(f"发生错误: {e}")
# 1.2、try-except-else-finally
Python的异常处理提供了else和finally子句。
else子句
# Start from a known sentinel so the cleanup below never depends on whether
# some earlier code happened to leave a name called ``file`` in scope.
file = None
try:
    file = open("data.txt", "r")
    data = file.read()
except FileNotFoundError:
    print("文件不存在")
else:
    # Runs only when the try block raised nothing.
    print(f"读取了{len(data)}个字符")
    process_data(data)
finally:
    # Runs on every path; close only if the open actually succeeded.
    if file is not None:
        file.close()
        print("文件已关闭")
完整示例
def divide(a, b):
    """Divide ``a`` by ``b`` and report the outcome on stdout.

    Returns:
        The quotient, or ``None`` when ``b`` is zero or the operands do not
        support division. A completion message is printed on every path.
    """
    try:
        result = a / b
    except ZeroDivisionError:
        print("错误:除数不能为零")
        return None
    except TypeError:
        print("错误:参数类型错误")
        return None
    else:
        # Reached only when the division raised nothing.
        print(f"计算成功: {a} / {b} = {result}")
        return result
    finally:
        # Executes even after the early returns above.
        print("除法操作完成")
# 使用
divide(10, 2)
# 输出:
# 计算成功: 10 / 2 = 5.0
# 除法操作完成
divide(10, 0)
# 输出:
# 错误:除数不能为零
# 除法操作完成
对比Java:
// Java没有else,但有finally
try {
result = a / b;
System.out.println("计算成功"); // 需要手动判断
} catch (ArithmeticException e) {
System.out.println("错误:除数不能为零");
} finally {
System.out.println("除法操作完成");
}
# 1.3、异常类型与继承体系
Python的异常都继承自BaseException。
异常层次结构
BaseException
├── SystemExit # 由 sys.exit() 触发,用于正常退出程序
├── KeyboardInterrupt # 用户按下 Ctrl+C 或其他中断信号时触发
├── GeneratorExit # 当调用 generator.close() 或生成器被垃圾回收时抛出,通常不应被捕获
└── Exception # 所有常规、可恢复错误的基类,应始终从此类派生自定义异常
├── StopIteration # 由迭代器的 __next__() 抛出,标志迭代结束;for 循环会自动捕获它(Python 3.7+ 中,生成器内部泄漏的 StopIteration 会被转换为 RuntimeError)
├── ArithmeticError # 数值计算错误的基类
│ ├── ZeroDivisionError # 除数为零(如 1/0)
│ ├── OverflowError # 运算结果超出可表示范围(Python 整数为任意精度,不会溢出;多见于浮点运算,如 math.exp(1000))
│ └── FloatingPointError # 浮点运算错误(极少触发,因现代系统通常遵循 IEEE 754)
├── AssertionError # assert 语句失败时抛出(assert 默认启用,使用 -O 选项运行时会被忽略)
├── AttributeError # 访问对象不存在的属性或方法时抛出(如 obj.foo 但 foo 未定义)
├── EOFError # 在 input() 或类似函数中遇到文件结束符(EOF)而未读取到数据时抛出
├── ImportError # 导入模块失败(如 import nonexistent_module)
│ └── ModuleNotFoundError # ImportError 的子类,明确表示模块未找到(Python 3.6+)
├── LookupError # 序列或映射查找失败的基类
│ ├── IndexError # 索引超出序列范围(如 [1,2][5])
│ └── KeyError # 字典中键不存在(如 d['missing'])
├── MemoryError # 内存不足,无法分配更多内存(罕见,因现代 OS 通常有虚拟内存机制)
├── NameError # 使用未定义的变量名(如 print(undefined_var))
│ └── UnboundLocalError # 函数中局部变量在赋值前被引用(常见于闭包或作用域问题)
├── OSError # 操作系统相关错误(如文件、进程、权限等)
│ ├── FileNotFoundError # 尝试打开不存在的文件
│ ├── PermissionError # 无权限访问资源(如读写受保护文件)
│ └── TimeoutError # 系统调用超时(如 socket 超时)
├── RuntimeError # 一般性运行时错误,不属于其他类别的错误
│ ├── NotImplementedError # 抽象方法未实现(常用于基类中提示子类覆盖)
│ └── RecursionError # 递归深度超过 sys.getrecursionlimit() 限制
├── TypeError # 操作或函数应用于类型不兼容的对象(如 'a' + 1)
├── ValueError # 类型正确但值无效(如 int('abc') 或 math.sqrt(-1))
│ └── UnicodeError # Unicode 编码/解码/转换错误(如 'é'.encode('ascii') 时字符无法编码)
└── Warning # 警告类异常基类,不中断程序执行,可通过 warnings 模块控制
├── DeprecationWarning # 使用了已废弃的特性
├── FutureWarning # 当前有效但未来语义会变或被移除
├── UserWarning # 用户自定义警告
└── ...(其他警告子类)
常见异常
# ValueError:值错误
int("abc") # ValueError: invalid literal
# TypeError:类型错误
"string" + 123 # TypeError: can only concatenate str
# KeyError:字典键不存在
{"a": 1}["b"] # KeyError: 'b'
# IndexError:索引越界
[1, 2, 3][5] # IndexError: list index out of range
# AttributeError:属性不存在
"string".non_existent # AttributeError: 'str' object has no attribute
# FileNotFoundError:文件不存在
open("non_existent.txt") # FileNotFoundError
# ImportError:模块导入失败
import non_existent_module # ModuleNotFoundError
# 1.4、raise语句
抛出异常
def withdraw(amount, balance):
    """Return the balance left after withdrawing ``amount``.

    Raises:
        ValueError: if ``amount`` is not positive, or exceeds ``balance``.
    """
    if amount <= 0:
        raise ValueError("取款金额必须大于0")
    if amount > balance:
        raise ValueError(f"余额不足:需要{amount},当前{balance}")
    return balance - amount
# 使用
try:
new_balance = withdraw(1000, 500)
except ValueError as e:
print(f"操作失败: {e}")
# 输出: 操作失败: 余额不足:需要1000,当前500
重新抛出异常
def process_data(data):
try:
result = risky_operation(data)
except ValueError as e:
print(f"警告: {e}")
raise # 重新抛出原异常
# 或者抛出新异常
def process_file(filename):
try:
with open(filename) as f:
return f.read()
except FileNotFoundError:
raise ValueError(f"配置文件{filename}不存在")
# 1.5、自定义异常
# 基本自定义异常
class ValidationError(Exception):
"""数据验证错误"""
pass
# 带额外信息的异常
class InsufficientFundsError(Exception):
    """Raised when a withdrawal exceeds the available balance.

    The instance keeps ``balance``, ``amount`` and the computed
    ``shortage`` (``amount - balance``) so handlers can inspect them.
    """

    def __init__(self, balance, amount):
        self.balance = balance
        self.amount = amount
        self.shortage = amount - balance
        super().__init__(f"余额不足:需要{amount},当前{balance},缺少{self.shortage}")
# 使用
def withdraw(balance, amount):
if amount > balance:
raise InsufficientFundsError(balance, amount)
return balance - amount
try:
new_balance = withdraw(100, 150)
except InsufficientFundsError as e:
print(e)
print(f"缺少金额: {e.shortage}")
# 输出:
# 余额不足:需要150,当前100,缺少50
# 缺少金额: 50
异常基类最佳实践
class AppError(Exception):
"""应用程序基础异常"""
pass
class DatabaseError(AppError):
"""数据库相关错误"""
pass
class NetworkError(AppError):
"""网络相关错误"""
pass
class APIError(AppError):
"""API调用错误"""
def __init__(self, status_code, message):
self.status_code = status_code
self.message = message
super().__init__(f"API错误 {status_code}: {message}")
# 使用层次化捕获
try:
call_api()
except APIError as e:
print(f"API调用失败: {e}")
except NetworkError as e:
print(f"网络错误: {e}")
except AppError as e:
print(f"应用错误: {e}")
对比Java:
// Java自定义异常
public class InsufficientFundsException extends Exception {
private double balance;
private double amount;
public InsufficientFundsException(double balance, double amount) {
super("余额不足:需要" + amount + ",当前" + balance);
this.balance = balance;
this.amount = amount;
}
public double getShortage() {
return amount - balance;
}
}
# 1.6、异常链(Exception Chaining)
Python 3支持异常链,保留原始异常信息。
使用from关键字
def parse_config(filename):
    """Load a JSON configuration file and return the parsed object.

    Raises:
        ValueError: if the file does not exist or does not contain valid
            JSON. The underlying exception is attached as ``__cause__``.
    """
    import json  # local import keeps this snippet self-contained

    try:
        # JSON text is UTF-8 by specification; do not rely on the locale.
        with open(filename, encoding="utf-8") as f:
            return json.load(f)
    except FileNotFoundError as e:
        raise ValueError(f"配置文件{filename}不存在") from e
    except json.JSONDecodeError as e:
        raise ValueError("配置文件格式错误") from e
try:
config = parse_config("config.json")
except ValueError as e:
print(f"错误: {e}")
print(f"原因: {e.__cause__}")
# 可以追溯到原始异常
隐式异常链
try:
try:
result = 10 / 0
except ZeroDivisionError:
# 在异常处理中又发生了异常
undefined_variable # NameError
except NameError as e:
print(f"当前异常: {e}")
print(f"之前的异常: {e.__context__}")
抑制异常链
try:
result = 10 / 0
except ZeroDivisionError:
# 使用from None抑制异常链
raise ValueError("计算错误") from None
# 1.7、实用异常处理模式
资源清理
# ❌ 不推荐
file = open("data.txt")
try:
data = file.read()
process(data)
finally:
file.close()
# ✅ 推荐:使用with语句
with open("data.txt") as file:
data = file.read()
process(data)
# 自动关闭文件
多个资源
# 管理多个资源
with open("input.txt") as infile, open("output.txt", "w") as outfile:
data = infile.read()
outfile.write(data.upper())
EAFP vs LBYL
Python推崇EAFP(Easier to Ask for Forgiveness than Permission,请求原谅比请求许可更容易)而非LBYL(Look Before You Leap,先检查再行动)。
# LBYL(Java风格)- 不推荐
if key in dictionary:
value = dictionary[key]
else:
value = default_value
# EAFP(Python风格)- 推荐
try:
value = dictionary[key]
except KeyError:
value = default_value
# 或者使用get方法
value = dictionary.get(key, default_value)
异常传播与日志
import logging
def process_user_data(user_id):
try:
user = fetch_user(user_id)
data = transform_data(user)
save_data(data)
except DatabaseError as e:
logging.error(f"数据库错误处理用户{user_id}: {e}")
raise # 重新抛出,让上层处理
except ValidationError as e:
logging.warning(f"验证失败用户{user_id}: {e}")
return None # 吞掉异常,返回默认值
except Exception as e:
logging.critical(f"未知错误处理用户{user_id}: {e}", exc_info=True)
raise # 致命错误,必须抛出
小结对比表
| 特性 | Python | Java |
|---|---|---|
| 基本语法 | try-except | try-catch |
| 多异常 | except (E1, E2) | 多个catch块或\| |
| else子句 | 支持 | 不支持 |
| finally | 支持 | 支持 |
| 异常链 | raise ... from | Throwable.initCause() |
| 检查异常 | 无 | 有(编译时检查) |
| 自定义异常 | 继承Exception | 继承Exception |
Python的异常处理更简洁,但缺少Java的检查异常机制。这让Python更灵活,但也需要开发者更自律地处理异常!
# 2、文件操作与I/O
Python的文件操作比Java简单直观得多,无需处理繁琐的流和缓冲区。
# 2.1、文件打开与关闭
基本操作
# 打开文件
file = open("data.txt", "r") # 读模式
content = file.read()
file.close() # 必须手动关闭
# ✅ 推荐:使用with语句自动关闭
with open("data.txt", "r") as file:
content = file.read()
# 自动关闭,即使发生异常也会关闭
文件模式
| 模式 | 说明 | Java对应 |
|---|---|---|
| 'r' | 只读(默认) | FileReader |
| 'w' | 写入(覆盖) | FileWriter |
| 'a' | 追加 | FileWriter(file, true) |
| 'x' | 独占创建(文件已存在则失败) | - |
| 'b' | 二进制模式 | FileInputStream |
| 't' | 文本模式(默认) | - |
| '+' | 读写模式 | RandomAccessFile |
常用组合
# 文本文件
open("file.txt", "r") # 读
open("file.txt", "w") # 写(覆盖)
open("file.txt", "a") # 追加
open("file.txt", "r+") # 读写
# 二进制文件
open("image.png", "rb") # 读二进制
open("image.png", "wb") # 写二进制
对比Java:
// Java需要更多代码
try (BufferedReader reader = new BufferedReader(
new FileReader("data.txt"))) {
String content = reader.readLine();
} catch (IOException e) {
e.printStackTrace();
}
# 2.2、文本文件读写
读取文件
# 方式1:一次性读取全部内容
with open("data.txt", "r", encoding="utf-8") as f:
content = f.read()
print(content)
# 方式2:按行读取(返回列表)
with open("data.txt", "r", encoding="utf-8") as f:
lines = f.readlines()
for line in lines:
print(line.strip()) # 去除换行符
# 方式3:逐行迭代(推荐,内存友好)
with open("data.txt", "r", encoding="utf-8") as f:
for line in f:
print(line.strip())
# 方式4:读取指定数量的字符(文本模式下按字符计数,二进制模式下才按字节计数)
with open("data.txt", "r") as f:
chunk = f.read(100) # 读取前100个字符
写入文件
# 写入字符串
with open("output.txt", "w", encoding="utf-8") as f:
f.write("Hello, World!\n")
f.write("第二行\n")
# 写入多行
lines = ["第一行\n", "第二行\n", "第三行\n"]
with open("output.txt", "w", encoding="utf-8") as f:
f.writelines(lines)
# 追加内容
with open("output.txt", "a", encoding="utf-8") as f:
f.write("追加的内容\n")
实用示例
# 复制文件
with open("source.txt", "r") as src, open("dest.txt", "w") as dst:
dst.write(src.read())
# 处理CSV
with open("data.csv", "r") as f:
for line in f:
fields = line.strip().split(",")
print(fields)
# 统计行数
with open("data.txt", "r") as f:
line_count = sum(1 for line in f)
print(f"总行数: {line_count}")
# 查找并替换
with open("input.txt", "r") as f:
content = f.read()
content = content.replace("old", "new")
with open("output.txt", "w") as f:
f.write(content)
# 2.3、二进制文件处理
读写二进制文件
# 读取二进制文件
with open("image.png", "rb") as f:
data = f.read()
print(f"文件大小: {len(data)} 字节")
# 写入二进制文件
with open("output.bin", "wb") as f:
f.write(b"\x00\x01\x02\x03")
# 复制二进制文件
with open("source.png", "rb") as src, open("dest.png", "wb") as dst:
dst.write(src.read())
# 分块读取大文件(内存友好)
with open("large_file.bin", "rb") as f:
while chunk := f.read(8192): # 每次读8KB
process(chunk)
使用struct模块处理二进制数据
import struct
# 写入结构化二进制数据
with open("data.bin", "wb") as f:
# 写入:1个整数,1个浮点数,5个字节
data = struct.pack("if5s", 42, 3.14, b"hello")
f.write(data)
# 读取结构化二进制数据
with open("data.bin", "rb") as f:
data = f.read()
num, pi, text = struct.unpack("if5s", data)
print(f"整数: {num}, 浮点: {pi}, 文本: {text.decode()}")
# 2.4、pathlib模块(现代路径操作)
pathlib是Python 3.4+推荐的路径操作方式,面向对象,比os.path更直观。
基本用法
from pathlib import Path
# 创建Path对象
file_path = Path("data/file.txt")
absolute_path = Path("/usr/local/bin/python")
# 获取当前目录
current_dir = Path.cwd()
print(current_dir)
# 获取用户主目录
home_dir = Path.home()
print(home_dir)
# 路径拼接
config_file = Path.home() / "config" / "settings.json"
# 等同于:Path.home().joinpath("config", "settings.json")
路径属性
file_path = Path("/home/user/documents/report.pdf")
print(file_path.name) # report.pdf(文件名)
print(file_path.stem) # report(文件名不含扩展名)
print(file_path.suffix) # .pdf(扩展名)
print(file_path.parent) # /home/user/documents(父目录)
print(file_path.parents[0]) # /home/user/documents
print(file_path.parents[1]) # /home/user
print(file_path.anchor) # /(根目录)
print(file_path.parts) # ('/', 'home', 'user', 'documents', 'report.pdf')
文件系统操作
from pathlib import Path
file_path = Path("data.txt")
# 检查存在性
if file_path.exists():
print("文件存在")
# 检查类型
if file_path.is_file():
print("是文件")
if file_path.is_dir():
print("是目录")
# 读写文件
file_path.write_text("Hello, World!", encoding="utf-8")
content = file_path.read_text(encoding="utf-8")
# 二进制读写
file_path.write_bytes(b"\x00\x01\x02")
data = file_path.read_bytes()
# 创建目录
Path("new_dir").mkdir(exist_ok=True) # 创建单层目录
Path("parent/child").mkdir(parents=True, exist_ok=True) # 创建多层
# 删除文件
file_path.unlink(missing_ok=True) # Python 3.8+
# 遍历目录
for item in Path(".").iterdir():
print(item)
# 模式匹配查找文件
for py_file in Path(".").glob("*.py"):
print(py_file)
for py_file in Path(".").rglob("*.py"): # 递归查找
print(py_file)
实用示例
from pathlib import Path
# 查找项目中所有Python文件
project_root = Path(".")
python_files = list(project_root.rglob("*.py"))
print(f"找到{len(python_files)}个Python文件")
# 统计代码行数
total_lines = 0
for py_file in python_files:
lines = py_file.read_text().count("\n")
total_lines += lines
print(f"总行数: {total_lines}")
# 安全地处理配置文件
config_file = Path.home() / ".myapp" / "config.json"
if not config_file.exists():
config_file.parent.mkdir(parents=True, exist_ok=True)
config_file.write_text('{"default": true}')
对比Java:
// Java使用Path和Files(Java 7+;Files.readString/writeString需要Java 11+)
import java.nio.file.*;
Path filePath = Paths.get("/home/user/data.txt");
String content = Files.readString(filePath);
Files.writeString(filePath, "Hello");
// 遍历目录
try (var stream = Files.list(Paths.get("."))) {
stream.forEach(System.out::println);
}
# 2.5、标准输入输出
输入
# input():读取一行输入
name = input("请输入姓名: ")
print(f"你好, {name}!")
# 读取数字
age = int(input("请输入年龄: "))
# 读取多个值
x, y = map(int, input("输入两个数字(空格分隔): ").split())
输出
# print():标准输出
print("Hello, World!")
# 多个参数
print("姓名:", name, "年龄:", age)
# 自定义分隔符和结束符
print("a", "b", "c", sep="-") # a-b-c
print("loading", end="...") # loading...(不换行)
print(" done!") # loading... done!
# 写入文件
with open("output.txt", "w") as f:
print("写入文件", file=f)
格式化输出
name = "张三"
age = 25
score = 95.678
# f-string(推荐)
print(f"姓名: {name}, 年龄: {age}, 成绩: {score:.2f}")
# format方法
print("姓名: {}, 年龄: {}, 成绩: {:.2f}".format(name, age, score))
# %格式化(老式)
print("姓名: %s, 年龄: %d, 成绩: %.2f" % (name, age, score))
小结对比表
| 操作 | Python | Java |
|---|---|---|
| 打开文件 | open() | FileReader/BufferedReader |
| 自动关闭 | with语句 | try-with-resources |
| 读取全部 | read() | Files.readString()(Java 11+)或循环读取 |
| 逐行读取 | for line in f | BufferedReader.readLine() |
| 路径操作 | pathlib.Path | java.nio.file.Path |
| 文件存在 | path.exists() | Files.exists() |
| 创建目录 | path.mkdir() | Files.createDirectories() |
Python的文件I/O操作比Java简洁优雅得多,pathlib模块提供了现代化的路径处理方式!
# 3、迭代器与生成器
迭代器和生成器是Python的强大特性,能够高效处理大数据集和实现惰性求值。
# 3.1、迭代器协议
Python的迭代器基于两个魔术方法:__iter__和__next__。
基本概念
# 可迭代对象(Iterable):实现了__iter__方法
numbers = [1, 2, 3, 4, 5]
iterator = iter(numbers) # 获取迭代器
# 迭代器(Iterator):实现了__iter__和__next__方法
print(next(iterator)) # 1
print(next(iterator)) # 2
print(next(iterator)) # 3
# 迭代完毕后抛出StopIteration
# next(iterator) # 迭代完毕会抛出StopIteration
自定义迭代器
class Countdown:
    """Iterator that counts down from ``start`` to 1."""

    def __init__(self, start):
        self.current = start

    def __iter__(self):
        return self

    def __next__(self):
        if self.current <= 0:
            raise StopIteration
        # Capture the value to hand out before stepping the counter.
        value = self.current
        self.current -= 1
        return value
# 使用
for num in Countdown(5):
print(num) # 5, 4, 3, 2, 1
实用迭代器示例
class FileReader:
    """Iterator over the lines of a text file, each stripped of whitespace.

    The file is closed when iteration reaches end-of-file, when ``close()``
    is called, or when a ``with FileReader(...)`` block exits. Use the
    ``with`` form whenever the loop may stop early, otherwise the handle
    stays open.

    Args:
        filename: Path of the file to read.
        encoding: Passed to ``open``; ``None`` keeps the platform default.
    """

    def __init__(self, filename, encoding=None):
        self.file = open(filename, 'r', encoding=encoding)

    def __iter__(self):
        return self

    def __next__(self):
        # Once closed, keep signalling exhaustion instead of letting
        # readline() fail on the closed handle.
        if self.file.closed:
            raise StopIteration
        line = self.file.readline()
        if not line:  # readline() returns '' only at end-of-file
            self.close()
            raise StopIteration
        return line.strip()

    def close(self):
        """Close the underlying file; calling it again is harmless."""
        self.file.close()

    def __enter__(self):
        return self

    def __exit__(self, exc_type, exc_value, traceback):
        self.close()
        return False  # never suppress exceptions from the with-body
# 使用
for line in FileReader("data.txt"):
print(line)
对比Java:
// Java使用Iterator接口
Iterator<Integer> iterator = numbers.iterator();
while (iterator.hasNext()) {
System.out.println(iterator.next());
}
# 3.2、生成器函数(yield)
生成器是创建迭代器的最简单方式,使用yield关键字。
基本用法
def countdown(n):
"""倒计时生成器"""
while n > 0:
yield n
n -= 1
# 使用
for num in countdown(5):
print(num) # 5, 4, 3, 2, 1
# 生成器是迭代器
gen = countdown(3)
print(next(gen)) # 3
print(next(gen)) # 2
print(next(gen)) # 1
# print(next(gen)) # StopIteration
工作原理
def simple_generator():
print("开始")
yield 1
print("继续")
yield 2
print("结束")
yield 3
gen = simple_generator()
print("创建生成器")
print(next(gen)) # 开始 -> 1
print(next(gen)) # 继续 -> 2
print(next(gen)) # 结束 -> 3
实用示例
# 1. 斐波那契数列
def fibonacci(n):
    """Yield the first ``n`` Fibonacci numbers, starting from 0."""
    a, b = 0, 1
    for _ in range(n):
        yield a
        a, b = b, a + b
print(list(fibonacci(10)))
# [0, 1, 1, 2, 3, 5, 8, 13, 21, 34]
# 2. 逐行读取大文件(内存友好)
def read_large_file(file_path):
"""逐行读取文件"""
with open(file_path) as f:
for line in f:
yield line.strip()
for line in read_large_file("huge_file.txt"):
process(line) # 一次只加载一行到内存
# 3. 批量处理数据
def batch(items, size):
    """Yield successive lists of up to ``size`` elements from ``items``.

    The final list may be shorter when the input does not divide evenly.
    """
    current = []
    for item in items:
        current.append(item)
        if len(current) == size:
            yield current
            current = []
    if current:  # leftover elements form the last, shorter batch
        yield current


# Usage: the loop variable has its own name so it does not rebind ``batch``.
data = range(100)
for chunk in batch(data, 10):
    print(f"处理批次: {chunk}")
# 4. 无限序列
def infinite_sequence():
"""无限递增序列"""
num = 0
while True:
yield num
num += 1
gen = infinite_sequence()
print(next(gen)) # 0
print(next(gen)) # 1
print(next(gen)) # 2
# 3.3、生成器表达式
生成器表达式是列表推导式的生成器版本,使用圆括号。
基本语法
# 列表推导式:立即计算,占用内存
squares_list = [x**2 for x in range(1000000)]
# 生成器表达式:惰性计算,节省内存
squares_gen = (x**2 for x in range(1000000))
# 使用
for square in squares_gen:
if square > 100:
break
print(square)
对比
import sys
# 列表推导式
list_comp = [x for x in range(10000)]
print(f"列表大小: {sys.getsizeof(list_comp)} bytes") # ~87616 bytes
# 生成器表达式
gen_expr = (x for x in range(10000))
print(f"生成器大小: {sys.getsizeof(gen_expr)} bytes") # ~112 bytes
实用示例
# 1. 过滤和转换
numbers = range(1, 11)
even_squares = (x**2 for x in numbers if x % 2 == 0)
print(list(even_squares)) # [4, 16, 36, 64, 100]
# 2. 链式处理
# The generators are lazy, so the file must stay open while they are
# consumed; the with-block closes it once the loop has finished.
with open("data.txt") as f:
    lines = (line.strip() for line in f)
    non_empty = (line for line in lines if line)
    uppercase = (line.upper() for line in non_empty)
    for line in uppercase:
        print(line)
# 3. 作为函数参数
sum_of_squares = sum(x**2 for x in range(10))
print(sum_of_squares) # 285
max_value = max((x**2 for x in range(10)))
print(max_value) # 81
# 4. 内存友好的数据处理
# Sum one integer per line; the with-block closes the file afterwards.
with open("numbers.txt") as f:
    total = sum(int(line) for line in f)
# 3.4、itertools模块
itertools提供了高效的迭代器工具。
无限迭代器
from itertools import count, cycle, repeat
# count:无限计数
for i in count(10, 2): # 从10开始,步长2
if i > 20:
break
print(i) # 10, 12, 14, 16, 18, 20
# cycle:无限循环
counter = 0
for item in cycle(['A', 'B', 'C']):
if counter >= 5:
break
print(item, end=" ") # A B C A B
counter += 1
# repeat:重复元素
for item in repeat('Hello', 3):
print(item) # Hello(3次)
组合迭代器
from itertools import chain, zip_longest, product, combinations, permutations
# chain:连接多个迭代器
for item in chain([1, 2], [3, 4], [5, 6]):
print(item, end=" ") # 1 2 3 4 5 6
# zip_longest:配对(补齐)
for pair in zip_longest([1, 2, 3], ['a', 'b'], fillvalue='?'):
print(pair) # (1, 'a'), (2, 'b'), (3, '?')
# product:笛卡尔积
for pair in product([1, 2], ['a', 'b']):
print(pair) # (1, 'a'), (1, 'b'), (2, 'a'), (2, 'b')
# combinations:组合
for combo in combinations([1, 2, 3, 4], 2):
print(combo) # (1,2), (1,3), (1,4), (2,3), (2,4), (3,4)
# permutations:排列
for perm in permutations([1, 2, 3], 2):
print(perm) # (1,2), (1,3), (2,1), (2,3), (3,1), (3,2)
过滤和分组
from itertools import filterfalse, dropwhile, takewhile, groupby
# filterfalse:保留使谓词返回假的元素(与filter相反)
data = [1, 0, 2, 0, 3]
non_zero = filterfalse(lambda x: x == 0, data)
print(list(non_zero)) # [1, 2, 3]
# dropwhile:丢弃直到条件为假
numbers = [1, 3, 5, 6, 7, 8, 9]
result = dropwhile(lambda x: x < 6, numbers)
print(list(result)) # [6, 7, 8, 9]
# takewhile:获取直到条件为假
result = takewhile(lambda x: x < 6, numbers)
print(list(result)) # [1, 3, 5]
# groupby:分组
data = [('A', 1), ('A', 2), ('B', 3), ('B', 4), ('A', 5)]
for key, group in groupby(data, lambda x: x[0]):
print(f"{key}: {list(group)}")
# A: [('A', 1), ('A', 2)]
# B: [('B', 3), ('B', 4)]
# A: [('A', 5)]
实用组合
from itertools import islice, tee, accumulate
# islice:切片迭代器
gen = (x**2 for x in range(100))
first_10 = list(islice(gen, 10))
print(first_10) # [0, 1, 4, 9, 16, 25, 36, 49, 64, 81]
# tee:复制迭代器
gen1, gen2 = tee(range(5), 2)
print(list(gen1)) # [0, 1, 2, 3, 4]
print(list(gen2)) # [0, 1, 2, 3, 4]
# accumulate:累积
from operator import mul
numbers = [1, 2, 3, 4, 5]
print(list(accumulate(numbers))) # [1, 3, 6, 10, 15](累加)
print(list(accumulate(numbers, mul))) # [1, 2, 6, 24, 120](累乘)
# 3.5、惰性求值优势
生成器的惰性求值特性带来巨大优势。
内存效率
# ❌ 列表:立即计算,占用大量内存
def process_data_list():
data = [expensive_operation(x) for x in range(1000000)]
return sum(data)
# ✅ 生成器:惰性计算,内存占用小
def process_data_generator():
data = (expensive_operation(x) for x in range(1000000))
return sum(data)
无限序列
# 生成器可以表示无限序列
def primes():
    """Yield the prime numbers in increasing order, without end.

    Each prime found wraps the candidate stream in one more filter that
    drops its multiples, so the chain of nested generators grows by one
    per prime. Fine for a demonstration, not for large counts.
    """
    from itertools import count  # local import keeps the snippet self-contained

    def exclude_multiples(divisor, numbers):
        # Passing the divisor as an argument freezes its value for this
        # filter. A generator expression written inline in the loop would
        # read the loop variable when it runs, not when it was created, so
        # every filter would test against the most recent prime only.
        return (x for x in numbers if x % divisor != 0)

    yield 2
    candidates = count(3, 2)  # odd numbers from 3 upward
    while True:
        prime = next(candidates)
        yield prime
        candidates = exclude_multiples(prime, candidates)


# Take the first 10 primes.
prime_gen = primes()
first_10_primes = [next(prime_gen) for _ in range(10)]
print(first_10_primes)  # [2, 3, 5, 7, 11, 13, 17, 19, 23, 29]
管道式处理
# 数据处理管道
def read_log(filename):
"""读取日志"""
with open(filename) as f:
for line in f:
yield line.strip()
def filter_errors(lines):
    """Yield only the lines that contain the substring ``ERROR``."""
    for line in lines:
        if 'ERROR' in line:
            yield line
def parse_timestamp(lines):
    """Yield the bracketed prefix of each line, without the brackets.

    For ``[2024-01-01 10:00:00] ERROR: message`` this yields
    ``2024-01-01 10:00:00``: the text before the first ``]`` with its
    first character dropped.
    """
    for line in lines:
        before_bracket = line.split(']')[0]
        yield before_bracket[1:]
# 管道组合
lines = read_log("app.log")
errors = filter_errors(lines)
timestamps = parse_timestamp(errors)
# 惰性执行,只在需要时处理
for ts in timestamps:
print(ts)
小结对比表
| 特性 | 列表 | 生成器 | Java Stream |
|---|---|---|---|
| 求值方式 | 立即 | 惰性 | 惰性 |
| 内存占用 | 大 | 小 | 小 |
| 可重复使用 | ✅ | ❌ | ❌ |
| 索引访问 | ✅ | ❌ | ❌ |
| 长度获取 | ✅ | ❌ | ❌ |
| 无限序列 | ❌ | ✅ | ✅ |
# 4、装饰器
装饰器是Python的强大特性,允许在不修改原函数/类的情况下增强其功能。这类似于Java的注解+AOP,但更灵活强大。
# 4.1、函数装饰器基础
基本概念
装饰器本质是一个接受函数作为参数并返回新函数的高阶函数。
# 最简单的装饰器
def my_decorator(func):
"""装饰器函数"""
def wrapper():
print("函数执行前")
func()
print("函数执行后")
return wrapper
# 使用装饰器(方式1:手动包装)
def say_hello():
print("Hello!")
say_hello = my_decorator(say_hello)
say_hello()
# 输出:
# 函数执行前
# Hello!
# 函数执行后
# 使用装饰器(方式2:@语法糖)
@my_decorator
def say_world():
print("World!")
say_world()
# 输出:
# 函数执行前
# World!
# 函数执行后
带参数的函数装饰
def my_decorator(func):
def wrapper(*args, **kwargs):
"""接受任意参数"""
print(f"调用 {func.__name__},参数: {args}, {kwargs}")
result = func(*args, **kwargs)
print(f"返回值: {result}")
return result
return wrapper
@my_decorator
def add(a, b):
return a + b
@my_decorator
def greet(name, greeting="Hello"):
return f"{greeting}, {name}!"
# 使用
result = add(3, 5)
# 调用 add,参数: (3, 5), {}
# 返回值: 8
message = greet("张三", greeting="你好")
# 调用 greet,参数: ('张三',), {'greeting': '你好'}
# 返回值: 你好, 张三!
对比Java:
// Java需要使用注解+AOP或代理模式
@Around("@annotation(LogExecution)")
public Object logExecution(ProceedingJoinPoint joinPoint) throws Throwable {
System.out.println("方法执行前");
Object result = joinPoint.proceed();
System.out.println("方法执行后");
return result;
}
@LogExecution
public void sayHello() {
System.out.println("Hello!");
}
# 4.2、functools.wraps保留元信息
装饰器会改变函数的元信息,functools.wraps用于保留原函数的元数据。
from functools import wraps
# ❌ 不使用wraps
def bad_decorator(func):
def wrapper(*args, **kwargs):
"""这是wrapper的文档"""
return func(*args, **kwargs)
return wrapper
@bad_decorator
def my_function():
"""这是my_function的文档"""
pass
print(my_function.__name__) # wrapper(错误!)
print(my_function.__doc__) # 这是wrapper的文档(错误!)
# ✅ 使用wraps
def good_decorator(func):
@wraps(func)
def wrapper(*args, **kwargs):
"""这是wrapper的文档"""
return func(*args, **kwargs)
return wrapper
@good_decorator
def my_function2():
"""这是my_function2的文档"""
pass
print(my_function2.__name__) # my_function2(正确!)
print(my_function2.__doc__) # 这是my_function2的文档(正确!)
标准装饰器模板
from functools import wraps
def my_decorator(func):
"""标准装饰器模板"""
@wraps(func)
def wrapper(*args, **kwargs):
# 执行前的逻辑
print(f"Before calling {func.__name__}")
# 调用原函数
result = func(*args, **kwargs)
# 执行后的逻辑
print(f"After calling {func.__name__}")
return result
return wrapper
# 4.3、带参数的装饰器
装饰器本身也可以接受参数。
from functools import wraps
def repeat(times):
    """Decorator factory: call the wrapped function ``times`` times in a row.

    The wrapper returns the result of the last call, or ``None`` when
    ``times`` is zero or negative and the function is never called.
    """
    def decorator(func):
        @wraps(func)
        def wrapper(*args, **kwargs):
            result = None  # defined even when the loop body never runs
            for i in range(times):
                print(f"第{i+1}次执行:")
                result = func(*args, **kwargs)
            return result
        return wrapper
    return decorator
@repeat(3)
def greet(name):
print(f"Hello, {name}!")
greet("张三")
# 输出:
# 第1次执行:
# Hello, 张三!
# 第2次执行:
# Hello, 张三!
# 第3次执行:
# Hello, 张三!
工作原理
# @repeat(3) 等价于:
# greet = repeat(3)(greet)
# 分步理解:
decorator = repeat(3) # 调用repeat(3),返回decorator函数
greet = decorator(greet) # 调用decorator(greet),返回wrapper
实用带参数装饰器示例
from functools import wraps
import time
def retry(max_attempts=3, delay=1):
    """Decorator factory: retry the wrapped function when it raises.

    Args:
        max_attempts: Total number of calls to make; must be at least 1.
        delay: Seconds to sleep between a failed attempt and the next one.

    Raises:
        ValueError: if ``max_attempts`` is less than 1. Otherwise the
            wrapped function would never be called and the wrapper would
            silently return ``None``.

    The exception from the final attempt propagates unchanged.
    """
    if max_attempts < 1:
        raise ValueError("max_attempts 必须大于等于 1")

    def decorator(func):
        @wraps(func)
        def wrapper(*args, **kwargs):
            for attempt in range(max_attempts):
                try:
                    return func(*args, **kwargs)
                except Exception:
                    if attempt == max_attempts - 1:
                        # Bare raise re-raises the active exception as-is.
                        raise
                    print(f"第{attempt + 1}次尝试失败,{delay}秒后重试...")
                    time.sleep(delay)
        return wrapper
    return decorator
@retry(max_attempts=5, delay=2)
def unstable_api_call():
"""不稳定的API调用"""
import random
if random.random() < 0.7:
raise ConnectionError("API调用失败")
return "成功"
# 使用
result = unstable_api_call()
# 4.4、类装饰器
类也可以作为装饰器,通过实现__call__方法。
from functools import wraps
class CountCalls:
"""统计函数调用次数"""
def __init__(self, func):
wraps(func)(self)
self.func = func
self.count = 0
def __call__(self, *args, **kwargs):
self.count += 1
print(f"{self.func.__name__} 已被调用 {self.count} 次")
return self.func(*args, **kwargs)
@CountCalls
def say_hello():
print("Hello!")
say_hello() # say_hello 已被调用 1 次 -> Hello!
say_hello() # say_hello 已被调用 2 次 -> Hello!
say_hello() # say_hello 已被调用 3 次 -> Hello!
print(say_hello.count) # 3
带参数的类装饰器
from functools import wraps
class LogWith:
"""带日志级别的装饰器"""
def __init__(self, level="INFO"):
self.level = level
def __call__(self, func):
@wraps(func)
def wrapper(*args, **kwargs):
print(f"[{self.level}] 调用 {func.__name__}")
result = func(*args, **kwargs)
print(f"[{self.level}] {func.__name__} 返回 {result}")
return result
return wrapper
@LogWith(level="DEBUG")
def add(a, b):
return a + b
result = add(3, 5)
# [DEBUG] 调用 add
# [DEBUG] add 返回 8
# 4.5、装饰器叠加
多个装饰器可以叠加使用。
def make_bold(func):
@wraps(func)
def wrapper(*args, **kwargs):
return "<b>" + func(*args, **kwargs) + "</b>"
return wrapper
def make_italic(func):
@wraps(func)
def wrapper(*args, **kwargs):
return "<i>" + func(*args, **kwargs) + "</i>"
return wrapper
@make_bold
@make_italic
def say_hello():
return "Hello!"
print(say_hello()) # <b><i>Hello!</i></b>
# 等价于:
# say_hello = make_bold(make_italic(say_hello))
# 执行顺序:从下到上装饰,从上到下执行
顺序理解
def decorator1(func):
print(f"装饰器1 装饰 {func.__name__}")
@wraps(func)
def wrapper(*args, **kwargs):
print("装饰器1: 执行前")
result = func(*args, **kwargs)
print("装饰器1: 执行后")
return result
return wrapper
def decorator2(func):
print(f"装饰器2 装饰 {func.__name__}")
@wraps(func)
def wrapper(*args, **kwargs):
print("装饰器2: 执行前")
result = func(*args, **kwargs)
print("装饰器2: 执行后")
return result
return wrapper
@decorator1
@decorator2
def test():
print("执行test函数")
# 装饰阶段输出:
# 装饰器2 装饰 test
# 装饰器1 装饰 test(decorator2 使用了 @wraps,其 wrapper 的 __name__ 已被改为 test)
test()
# 执行阶段输出:
# 装饰器1: 执行前
# 装饰器2: 执行前
# 执行test函数
# 装饰器2: 执行后
# 装饰器1: 执行后
# 4.6、常用内置装饰器
Python提供了一些常用的内置装饰器。
@property(属性装饰器)
import math


class Circle:
    """Circle with a read-only radius and a settable, computed area."""

    def __init__(self, radius):
        self._radius = radius

    @property
    def radius(self):
        """Radius; read-only because no setter is defined."""
        return self._radius

    @property
    def area(self):
        """Area computed from the current radius."""
        return math.pi * self._radius ** 2

    @area.setter
    def area(self, value):
        """Set the radius to the one that produces the given area."""
        self._radius = math.sqrt(value / math.pi)


circle = Circle(5)
print(circle.area)    # 78.53981633974483
circle.area = 100
print(circle.radius)  # ≈ 5.6419
@staticmethod(静态方法)
class MathUtils:
@staticmethod
def add(a, b):
"""静态方法,不需要访问类或实例"""
return a + b
@staticmethod
def is_even(n):
return n % 2 == 0
# 使用
print(MathUtils.add(3, 5)) # 8
print(MathUtils.is_even(4)) # True
@classmethod(类方法)
class Date:
    """Simple year/month/day record with alternate constructors."""

    def __init__(self, year, month, day):
        self.year = year
        self.month = month
        self.day = day

    @classmethod
    def from_string(cls, date_string):
        """Build an instance from a ``YYYY-MM-DD`` style string.

        Uses ``cls`` rather than ``Date`` so subclasses get instances of
        their own type.
        """
        year, month, day = map(int, date_string.split('-'))
        return cls(year, month, day)

    @classmethod
    def today(cls):
        """Build an instance holding today's date."""
        import datetime
        now = datetime.date.today()
        return cls(now.year, now.month, now.day)
# 使用
date1 = Date.from_string("2024-10-26")
date2 = Date.today()
@cached_property(缓存属性)
from functools import cached_property
class DataProcessor:
def __init__(self, data):
self.data = data
@cached_property
def processed_data(self):
"""计算量大的属性,只计算一次"""
print("处理数据中...")
import time
time.sleep(2) # 模拟耗时操作
return [x * 2 for x in self.data]
processor = DataProcessor([1, 2, 3, 4, 5])
print(processor.processed_data) # 处理数据中... [2, 4, 6, 8, 10]
print(processor.processed_data) # [2, 4, 6, 8, 10](直接返回缓存)
# 4.7、装饰器实战案例
案例1:性能计时器
from functools import wraps
import time
def timer(func):
    """Decorator that prints how long each call to ``func`` took.

    The wrapped function's return value is passed through unchanged.
    Nothing is printed if the call raises.
    """
    @wraps(func)
    def wrapper(*args, **kwargs):
        # perf_counter() is meant for measuring intervals; time.time() is
        # wall-clock and can jump if the system clock is adjusted.
        start = time.perf_counter()
        result = func(*args, **kwargs)
        elapsed = time.perf_counter() - start
        print(f"{func.__name__} 执行时间: {elapsed:.4f}秒")
        return result
    return wrapper
@timer
def slow_function():
time.sleep(1)
return "完成"
result = slow_function()
# slow_function 执行时间: 1.0012秒
案例2:权限检查
from functools import wraps
def require_auth(func):
    """Decorator that rejects calls unless a user is attached to the wrapper.

    The current user is read from the ``current_user`` attribute of the
    returned wrapper function. A missing or falsy value raises
    ``PermissionError``.
    """
    @wraps(func)
    def wrapper(*args, **kwargs):
        if not getattr(wrapper, 'current_user', None):
            raise PermissionError("需要登录")
        return func(*args, **kwargs)
    return wrapper
def require_role(role):
    """Decorator factory: allow the call only for users with ``role``.

    The current user is read from the ``current_user`` attribute of the
    returned wrapper function and is expected to offer ``.get('role')``.

    Raises (from the wrapper):
        PermissionError: if no user is attached (attribute missing or
            falsy), or the user's role differs from ``role``.
    """
    def decorator(func):
        @wraps(func)
        def wrapper(*args, **kwargs):
            user = getattr(wrapper, 'current_user', None)
            # A falsy user (None, {}) counts as "not logged in" rather
            # than failing later on the .get() call.
            if not user:
                raise PermissionError("需要登录")
            if user.get('role') != role:
                raise PermissionError(f"需要{role}权限")
            return func(*args, **kwargs)
        return wrapper
    return decorator
@require_auth
def view_profile():
return "个人资料"
@require_role('admin')
def delete_user(user_id):
return f"删除用户 {user_id}"
# 设置当前用户
view_profile.current_user = {"username": "张三", "role": "user"}
delete_user.current_user = {"username": "管理员", "role": "admin"}
print(view_profile()) # 个人资料
print(delete_user(123)) # 删除用户 123
案例3:缓存/记忆化
from functools import wraps
def memoize(func):
    """Decorator that caches results keyed by the positional arguments.

    Only positional, hashable arguments are supported. The wrapper exposes
    the dictionary as ``.cache`` and a ``.clear_cache()`` callable.
    """
    cache = {}

    @wraps(func)
    def wrapper(*args):
        if args not in cache:
            print(f"计算 {func.__name__}{args}")
            cache[args] = func(*args)
        else:
            print(f"从缓存获取 {args}")
        return cache[args]

    # Expose the cache for inspection and for manual invalidation.
    wrapper.cache = cache
    wrapper.clear_cache = cache.clear
    return wrapper
@memoize
def fibonacci(n):
if n < 2:
return n
return fibonacci(n-1) + fibonacci(n-2)
print(fibonacci(10))
# 计算 fibonacci(10)
# 计算 fibonacci(9)
# ...
# 55
print(fibonacci(10)) # 从缓存获取 (10)
print(fibonacci.cache) # 查看缓存内容
fibonacci.clear_cache() # 清除缓存
案例4:输入验证
from functools import wraps
def validate_types(**type_hints):
    """Decorator factory: check argument types with ``isinstance`` on each call.

    Args:
        **type_hints: Maps parameter names to the type (or tuple of types)
            the corresponding argument must be an instance of. Parameters
            not named here are not checked.

    Raises (from the wrapper):
        TypeError: if a checked argument has the wrong type, or if the
            call does not match the function's signature.
    """
    import inspect

    def decorator(func):
        # The signature does not change between calls; resolve it once at
        # decoration time instead of on every invocation.
        sig = inspect.signature(func)

        @wraps(func)
        def wrapper(*args, **kwargs):
            bound = sig.bind(*args, **kwargs)
            bound.apply_defaults()  # defaults are validated too
            for param_name, expected_type in type_hints.items():
                if param_name in bound.arguments:
                    value = bound.arguments[param_name]
                    if not isinstance(value, expected_type):
                        raise TypeError(
                            f"{param_name} 应为 {expected_type.__name__},"
                            f"实际为 {type(value).__name__}"
                        )
            return func(*args, **kwargs)
        return wrapper
    return decorator
@validate_types(name=str, age=int, salary=float)
def create_employee(name, age, salary):
return f"{name}, {age}岁, 薪资{salary}"
# 正确调用
print(create_employee("张三", 25, 5000.0))
# 错误调用
try:
create_employee("李四", "30", 6000.0) # age类型错误
except TypeError as e:
print(e) # age 应为 int,实际为 str
案例5:日志记录
from functools import wraps
import logging
logging.basicConfig(level=logging.INFO)
def log_calls(log_args=True, log_result=True):
"""记录函数调用日志"""
def decorator(func):
@wraps(func)
def wrapper(*args, **kwargs):
logger = logging.getLogger(func.__module__)
# 记录调用
if log_args:
logger.info(f"调用 {func.__name__},参数: {args}, {kwargs}")
else:
logger.info(f"调用 {func.__name__}")
try:
result = func(*args, **kwargs)
# 记录结果
if log_result:
logger.info(f"{func.__name__} 返回: {result}")
return result
except Exception as e:
logger.error(f"{func.__name__} 抛出异常: {e}")
raise
return wrapper
return decorator
@log_calls(log_args=True, log_result=True)
def divide(a, b):
return a / b
result = divide(10, 2)
# INFO:__main__:调用 divide,参数: (10, 2), {}
# INFO:__main__:divide 返回: 5.0
try:
divide(10, 0)
except ZeroDivisionError:
pass
# INFO:__main__:调用 divide,参数: (10, 0), {}
# ERROR:__main__:divide 抛出异常: division by zero
案例6:单例模式
from functools import wraps
def singleton(cls):
    """Class decorator that creates the instance once and then reuses it.

    The first call constructs ``cls`` with the given arguments; every later
    call returns that same object and ignores its arguments.
    """
    instances = {}

    @wraps(cls)
    def get_instance(*args, **kwargs):
        if cls in instances:
            return instances[cls]
        created = cls(*args, **kwargs)
        instances[cls] = created
        return created

    return get_instance
@singleton
class DatabaseConnection:
def __init__(self, host, port):
self.host = host
self.port = port
print(f"连接到数据库 {host}:{port}")
# 使用
db1 = DatabaseConnection("localhost", 5432)
# 连接到数据库 localhost:5432
db2 = DatabaseConnection("localhost", 5432)
# (不会打印,因为返回的是同一个实例)
print(db1 is db2) # True
小结对比表
| 特性 | Python装饰器 | Java注解+AOP |
|---|---|---|
| 语法 | @decorator | @Annotation |
| 灵活性 | 高(运行时修改行为) | 中(需要额外配置) |
| 嵌套 | 支持多层装饰 | 支持 |
| 带参数 | 支持 | 支持 |
| 类装饰 | 支持装饰类 | 主要用于方法 |
| 内省 | 容易(__wrapped__) | 需要反射 |
Python的装饰器比Java的注解更灵活强大,可以在运行时动态修改函数/类的行为,而且语法更简洁!
# 5、上下文管理器
上下文管理器让资源管理变得优雅安全,自动处理资源的获取和释放。这类似于Java的try-with-resources,但更灵活。
# 5.1、with语句原理
基本概念
with语句确保资源在使用后被正确清理,即使发生异常也能保证。
# 传统方式:需要手动管理
file = open("data.txt", "r")
try:
content = file.read()
process(content)
finally:
file.close() # 必须手动关闭
# with语句:自动管理
with open("data.txt", "r") as file:
content = file.read()
process(content)
# 自动关闭文件,即使发生异常
对比Java:
// Java try-with-resources(Java 7+)
try (BufferedReader reader = new BufferedReader(new FileReader("data.txt"))) {
String content = reader.readLine();
process(content);
} // 自动关闭资源
上下文管理器协议
上下文管理器需要实现__enter__和__exit__方法:
class MyContextManager:
def __enter__(self):
"""进入with块时调用"""
print("进入上下文")
return self # 返回值赋给as后的变量
def __exit__(self, exc_type, exc_value, traceback):
"""退出with块时调用"""
print("退出上下文")
if exc_type is not None:
print(f"发生异常: {exc_type.__name__}: {exc_value}")
return False # False表示不抑制异常
# 使用
with MyContextManager() as cm:
print("执行代码")
# raise ValueError("测试异常")
# 输出:
# 进入上下文
# 执行代码
# 退出上下文
__exit__方法参数说明
def __exit__(self, exc_type, exc_value, traceback):
"""
exc_type: 异常类型(如果没有异常则为None)
exc_value: 异常实例(如果没有异常则为None)
traceback: 异常堆栈(如果没有异常则为None)
返回值:
- False或None: 异常继续传播
- True: 抑制异常(异常被吞掉)
"""
pass
# 5.2、自定义上下文管理器
文件操作管理器
class FileManager:
"""文件管理器"""
def __init__(self, filename, mode):
self.filename = filename
self.mode = mode
self.file = None
def __enter__(self):
"""打开文件"""
print(f"打开文件: {self.filename}")
self.file = open(self.filename, self.mode)
return self.file
def __exit__(self, exc_type, exc_value, traceback):
"""关闭文件"""
if self.file:
print(f"关闭文件: {self.filename}")
self.file.close()
return False
# 使用
with FileManager("test.txt", "w") as f:
f.write("Hello, World!")
# 输出:
# 打开文件: test.txt
# 关闭文件: test.txt
数据库连接管理器
class DatabaseConnection:
    """Context manager that simulates a database connection and transaction.

    Entering yields a placeholder connection string. Exiting first reports
    commit (no exception) or rollback (exception), and only then closes the
    connection. Exceptions are never suppressed.
    """

    def __init__(self, host, database):
        self.host = host
        self.database = database
        self.connection = None

    def __enter__(self):
        """Open the (simulated) connection and return it."""
        print(f"连接数据库: {self.host}/{self.database}")
        # Simulated connection object
        self.connection = f"Connection({self.host}, {self.database})"
        return self.connection

    def __exit__(self, exc_type, exc_value, traceback):
        """Finish the transaction, then close the connection."""
        # The transaction must be settled while the connection is still open.
        if exc_type is not None:
            print("发生异常,回滚事务")
        else:
            print("提交事务")
        print("关闭数据库连接")
        self.connection = None
        return False
# 使用
with DatabaseConnection("localhost", "mydb") as conn:
print(f"使用连接: {conn}")
# 执行数据库操作
计时器上下文管理器
import time
class Timer:
"""计时器上下文管理器"""
def __init__(self, name="代码块"):
self.name = name
self.start_time = None
def __enter__(self):
self.start_time = time.time()
return self
def __exit__(self, exc_type, exc_value, traceback):
elapsed = time.time() - self.start_time
print(f"{self.name} 执行时间: {elapsed:.4f}秒")
return False
# 使用
with Timer("数据处理"):
# 模拟耗时操作
time.sleep(1)
result = sum(range(1000000))
# 输出: 数据处理 执行时间: 1.xxxx秒
异常处理上下文管理器
class IgnoreException:
    """Context manager that swallows the listed exception types."""

    def __init__(self, *exceptions):
        self.exceptions = exceptions

    def __enter__(self):
        return self

    def __exit__(self, exc_type, exc_value, traceback):
        # No exception, or one that is not in the ignore list: let it through.
        if exc_type is None or not issubclass(exc_type, self.exceptions):
            return False
        print(f"忽略异常: {exc_type.__name__}: {exc_value}")
        return True  # returning True suppresses the exception
# 使用
with IgnoreException(ValueError, KeyError):
data = {"a": 1}
value = data["b"] # KeyError,但会被忽略
print("继续执行") # 这行不会执行
print("程序继续") # 这行会执行
# 输出:
# 忽略异常: KeyError: 'b'
# 程序继续
# 5.3、contextlib模块
contextlib模块提供了创建上下文管理器的便捷工具。
@contextmanager装饰器
使用生成器函数创建上下文管理器:
from contextlib import contextmanager
@contextmanager
def file_manager(filename, mode):
"""使用装饰器创建上下文管理器"""
print(f"打开文件: {filename}")
file = open(filename, mode)
try:
yield file # yield前是__enter__,yield后是__exit__
finally:
print(f"关闭文件: {filename}")
file.close()
# 使用
with file_manager("test.txt", "w") as f:
f.write("Hello!")
工作原理
@contextmanager
def my_context():
# __enter__阶段
print("进入")
resource = "资源对象"
try:
yield resource # 返回给as变量
# with块正常结束后继续执行
print("正常退出")
except Exception as e:
# with块抛出异常时执行
print(f"异常退出: {e}")
raise # 重新抛出异常
finally:
# __exit__阶段,无论如何都执行
print("清理资源")
# 使用
with my_context() as res:
print(f"使用: {res}")
实用示例
from contextlib import contextmanager
import time
@contextmanager
def timing(label):
"""计时上下文管理器"""
start = time.time()
try:
yield
finally:
end = time.time()
print(f"{label}: {end - start:.4f}秒")
# 使用
with timing("数据库查询"):
time.sleep(0.5)
# 执行查询
临时修改环境
from contextlib import contextmanager
import os
@contextmanager
def temporary_env(**env_vars):
    """Temporarily set environment variables, restoring them on exit.

    Args:
        **env_vars: Variable names mapped to the string values to set while
            the ``with`` block runs.

    Variables that did not exist beforehand are removed again; the others get
    their previous value back. The restore also runs if setting one of the
    variables fails partway through.
    """
    old_env = {}
    try:
        # Set the variables inside the try so that a failure halfway
        # (e.g. a non-string value) still rolls back the ones already set.
        for key, value in env_vars.items():
            old_env[key] = os.environ.get(key)
            os.environ[key] = value
        yield
    finally:
        for key, old_value in old_env.items():
            if old_value is None:
                os.environ.pop(key, None)
            else:
                os.environ[key] = old_value
# 使用
with temporary_env(DEBUG="true", LOG_LEVEL="debug"):
print(os.environ.get("DEBUG")) # true
# 执行需要特定环境的代码
print(os.environ.get("DEBUG")) # None(已恢复)
suppress(抑制异常)
from contextlib import suppress
# 传统写法
try:
os.remove("somefile.txt")
except FileNotFoundError:
pass
# 使用suppress
with suppress(FileNotFoundError):
os.remove("somefile.txt")
# 抑制多个异常
with suppress(FileNotFoundError, PermissionError):
os.remove("protected_file.txt")
closing(自动关闭)
from contextlib import closing
from urllib.request import urlopen
# 确保对象的close()方法被调用
with closing(urlopen("http://example.com")) as page:
content = page.read()
# 自动调用page.close()
redirect_stdout/redirect_stderr(重定向输出)
from contextlib import redirect_stdout, redirect_stderr
import io
# 重定向标准输出
f = io.StringIO()
with redirect_stdout(f):
print("这些内容会被捕获")
print("不会打印到控制台")
output = f.getvalue()
print(f"捕获的输出: {output}")
# 重定向到文件
with open("output.txt", "w") as f:
with redirect_stdout(f):
print("写入文件")
help(str.upper)
ExitStack(管理多个上下文)
from contextlib import ExitStack
# 动态管理多个上下文
with ExitStack() as stack:
# 动态添加上下文管理器
files = [
stack.enter_context(open(f"file{i}.txt", "w"))
for i in range(5)
]
# 使用所有文件
for i, f in enumerate(files):
f.write(f"内容 {i}\n")
# 所有文件自动关闭
# 条件性添加上下文
def process_files(filenames, use_backup=False):
with ExitStack() as stack:
files = []
for filename in filenames:
file = stack.enter_context(open(filename, "r"))
files.append(file)
if use_backup:
backup = stack.enter_context(
open(f"{filename}.bak", "w")
)
files.append(backup)
# 处理文件
for f in files:
process(f)
# 5.4、嵌套上下文管理器
多个with语句
# 方式1:嵌套with
with open("input.txt", "r") as infile:
with open("output.txt", "w") as outfile:
data = infile.read()
outfile.write(data.upper())
# 方式2:单行with(Python 2.7+)
with open("input.txt", "r") as infile, open("output.txt", "w") as outfile:
data = infile.read()
outfile.write(data.upper())
组合使用
from contextlib import contextmanager
import threading
@contextmanager
def acquire_lock(lock):
"""获取锁"""
print("获取锁")
lock.acquire()
try:
yield
finally:
print("释放锁")
lock.release()
# 使用
lock = threading.Lock()
with acquire_lock(lock):
# 临界区代码
print("执行临界区代码")
# 5.5、资源管理最佳实践
案例1:数据库事务管理
from contextlib import contextmanager
class Database:
def __init__(self):
self.connection = None
@contextmanager
def transaction(self):
"""事务上下文管理器"""
print("开始事务")
try:
yield self
print("提交事务")
# self.connection.commit()
except Exception as e:
print(f"回滚事务: {e}")
# self.connection.rollback()
raise
# 使用
db = Database()
with db.transaction():
# 执行数据库操作
pass
案例2:临时修改配置
from contextlib import contextmanager
class Config:
    debug = False
    log_level = "INFO"


@contextmanager
def temporary_config(**changes):
    """Temporarily override attributes on ``Config``.

    Args:
        **changes: Attribute names mapped to their temporary values. Each
            name must already exist on ``Config``; otherwise ``getattr``
            raises ``AttributeError``.

    Yields:
        The ``Config`` class with the overrides applied.

    The original values are restored on exit, including when applying the
    overrides fails partway through.
    """
    original = {}
    try:
        # Apply inside the try so that an unknown key does not leave the
        # overrides made before it in place.
        for key, value in changes.items():
            original[key] = getattr(Config, key)
            setattr(Config, key, value)
        yield Config
    finally:
        for key, value in original.items():
            setattr(Config, key, value)
# 使用
print(Config.debug) # False
with temporary_config(debug=True, log_level="DEBUG"):
print(Config.debug) # True
print(Config.log_level) # DEBUG
print(Config.debug) # False(已恢复)
案例3:批量操作
from contextlib import contextmanager
def process_batch(items):
    """Report the size of the batch being processed."""
    print(f"处理批次: {len(items)} 项")


@contextmanager
def batch_operation(batch_size=100):
    """Collect items and hand them to ``process_batch`` in groups.

    Yields:
        A callable that queues one item. Once ``batch_size`` items are
        queued they are processed and the queue is emptied. Items still
        queued when the ``with`` block ends are processed on exit.
    """
    pending = []

    def add_item(item):
        """Queue one item and flush when the batch is full."""
        pending.append(item)
        if len(pending) < batch_size:
            return
        process_batch(pending)
        pending.clear()

    try:
        yield add_item
    finally:
        # Flush whatever did not fill a complete batch.
        if pending:
            process_batch(pending)
# 使用
with batch_operation(batch_size=3) as add:
for i in range(10):
add(f"项目{i}")
# 输出:
# 处理批次: 3 项
# 处理批次: 3 项
# 处理批次: 3 项
# 处理批次: 1 项
案例4:性能分析
from contextlib import contextmanager
import time
import functools
@contextmanager
def profile_section(name):
    """Print the wall-clock time spent inside the ``with`` block.

    Args:
        name: Label printed as the heading of the report.

    The report is printed on exit even when the block raises.
    """
    start_time = time.time()
    try:
        yield
    finally:
        elapsed = time.time() - start_time
        print(f"{name}:")
        print(f" 时间: {elapsed:.4f}秒")
# 使用
with profile_section("数据处理"):
data = [i ** 2 for i in range(1000000)]
with profile_section("数据保存"):
time.sleep(0.1)
案例5:资源池管理
from contextlib import contextmanager
from queue import Queue
class ConnectionPool:
    """Fixed-size pool of (simulated) connections backed by a queue."""

    def __init__(self, size=5):
        self.pool = Queue(maxsize=size)
        # Pre-fill the pool with placeholder connection names.
        for label in (f"Connection_{i}" for i in range(size)):
            self.pool.put(label)

    @contextmanager
    def get_connection(self):
        """Borrow a connection for the ``with`` block and return it afterwards."""
        borrowed = self.pool.get()
        print(f"获取连接: {borrowed}")
        try:
            yield borrowed
        finally:
            # Hand the connection back even if the block raised.
            print(f"归还连接: {borrowed}")
            self.pool.put(borrowed)
# 使用
pool = ConnectionPool(size=2)
with pool.get_connection() as conn1:
print(f"使用 {conn1}")
with pool.get_connection() as conn2:
print(f"使用 {conn2}")
小结对比表
| 特性 | Python上下文管理器 | Java try-with-resources |
|---|---|---|
| 语法 | with obj as var: | try (Type var = ...) {} |
| 协议 | __enter__/__exit__ | AutoCloseable.close() |
| 自定义 | 类或@contextmanager | 实现AutoCloseable |
| 嵌套 | 支持 | 支持 |
| 异常抑制 | 支持(__exit__返回True) | 不支持 |
| 多资源 | with a, b: | try (A a = ...; B b = ...) {} |
# 6、函数式编程
Python虽然不是纯函数式语言,但提供了丰富的函数式编程工具。对于熟悉Java 8+ Stream API的开发者来说,Python的函数式特性会感觉很亲切。
# 6.1、map()、filter()、reduce()
这三个函数是函数式编程的基石。
// Java Stream API方式
List<Integer> squares = IntStream.range(0, 10)
.map(x -> x * x)
.boxed()
.collect(Collectors.toList());
List<Integer> evenSquares = IntStream.range(0, 10)
.filter(x -> x % 2 == 0)
.map(x -> x * x)
.boxed()
.collect(Collectors.toList());
// 多重循环 - Java需要嵌套Stream或flatMap
List<String> pairs = IntStream.rangeClosed(1, 3)
.boxed()
.flatMap(x -> Stream.of("a", "b", "c")
.map(y -> "(" + x + ", " + y + ")"))
.collect(Collectors.toList());
字符串处理示例:
# 提取所有单词的首字母
sentence = "Hello World Python Programming"
initials = [word[0] for word in sentence.split()]
print(initials) # ['H', 'W', 'P', 'P']
# 过滤并转换
words = ["apple", "banana", "cherry", "date"]
upper_long_words = [w.upper() for w in words if len(w) > 5]
print(upper_long_words) # ['BANANA', 'CHERRY']
# 嵌套列表展平
matrix = [[1, 2, 3], [4, 5, 6], [7, 8, 9]]
flat = [num for row in matrix for num in row]
print(flat) # [1, 2, 3, 4, 5, 6, 7, 8, 9]
# 6.2、高阶函数
高阶函数是指接受函数作为参数或返回函数的函数。
函数作为参数:
def apply_operation(numbers, operation):
"""对列表中每个数字应用操作"""
return [operation(x) for x in numbers]
# 定义不同的操作
def square(x):
return x ** 2
def double(x):
return x * 2
def negate(x):
return -x
numbers = [1, 2, 3, 4, 5]
print(apply_operation(numbers, square)) # [1, 4, 9, 16, 25]
print(apply_operation(numbers, double)) # [2, 4, 6, 8, 10]
print(apply_operation(numbers, negate)) # [-1, -2, -3, -4, -5]
# 使用lambda
print(apply_operation(numbers, lambda x: x**3)) # [1, 8, 27, 64, 125]
函数作为返回值:
def make_multiplier(n):
"""返回一个乘以n的函数"""
def multiplier(x):
return x * n
return multiplier
times2 = make_multiplier(2)
times10 = make_multiplier(10)
print(times2(5)) # 10
print(times10(5)) # 50
# 实际应用:创建验证器
def make_range_validator(min_val, max_val):
"""创建范围验证函数"""
def validator(value):
return min_val <= value <= max_val
return validator
age_validator = make_range_validator(0, 120)
percentage_validator = make_range_validator(0, 100)
print(age_validator(25)) # True
print(age_validator(150)) # False
print(percentage_validator(85)) # True
内置高阶函数:
# sorted() - 自定义排序
students = [
{'name': 'Alice', 'age': 25, 'score': 85},
{'name': 'Bob', 'age': 22, 'score': 92},
{'name': 'Charlie', 'age': 23, 'score': 78}
]
# 按年龄排序
by_age = sorted(students, key=lambda s: s['age'])
print([s['name'] for s in by_age]) # ['Bob', 'Charlie', 'Alice']
# 按分数降序
by_score = sorted(students, key=lambda s: s['score'], reverse=True)
print([s['name'] for s in by_score]) # ['Bob', 'Alice', 'Charlie']
# 多级排序:先按分数降序,再按年龄升序
multi_sort = sorted(students, key=lambda s: (-s['score'], s['age']))
# max()/min() - 自定义比较
oldest = max(students, key=lambda s: s['age'])
print(oldest['name']) # Alice
highest_score = max(students, key=lambda s: s['score'])
print(highest_score['name']) # Bob
# 6.3、偏函数(functools.partial)
偏函数用于固定函数的某些参数,创建新函数。
from functools import partial
# 基本示例
def power(base, exponent):
return base ** exponent
# 创建偏函数 - 固定exponent=2
square = partial(power, exponent=2)
print(square(5)) # 25
print(square(10)) # 100
# 固定exponent=3
cube = partial(power, exponent=3)
print(cube(5)) # 125
# 实际应用1:日志记录
def log_message(message, level='INFO', timestamp=True):
import datetime
prefix = f"[{datetime.datetime.now()}] " if timestamp else ""
print(f"{prefix}{level}: {message}")
# 创建专用日志函数
log_error = partial(log_message, level='ERROR')
log_warning = partial(log_message, level='WARNING')
log_debug = partial(log_message, level='DEBUG', timestamp=False)
log_error("Database connection failed")
log_warning("Low memory")
log_debug("Variable value: 42")
# 实际应用2:数据转换
def convert_value(value, multiplier=1, offset=0, round_digits=2):
"""通用数值转换函数"""
result = value * multiplier + offset
return round(result, round_digits)
# 摄氏度转华氏度: F = C * 9/5 + 32
celsius_to_fahrenheit = partial(convert_value, multiplier=9/5, offset=32)
print(celsius_to_fahrenheit(0)) # 32.0
print(celsius_to_fahrenheit(100)) # 212.0
# 米转英尺: feet = meter * 3.28084
meter_to_feet = partial(convert_value, multiplier=3.28084)
print(meter_to_feet(10)) # 32.81
# 实际应用3:配置HTTP请求
import functools
def make_request(url, method='GET', headers=None, timeout=30):
"""模拟HTTP请求"""
headers = headers or {}
print(f"{method} {url}")
print(f"Headers: {headers}")
print(f"Timeout: {timeout}s")
# API特定配置
api_headers = {'Authorization': 'Bearer token123', 'Content-Type': 'application/json'}
api_request = partial(make_request, headers=api_headers, timeout=60)
# 使用
api_request('https://api.example.com/users')
api_request('https://api.example.com/posts', method='POST')
与Java对比:
// Java没有直接的偏函数,需要手动包装
BiFunction<Integer, Integer, Integer> power = (base, exp) -> (int) Math.pow(base, exp);
// 创建"偏函数"
Function<Integer, Integer> square = base -> power.apply(base, 2);
Function<Integer, Integer> cube = base -> power.apply(base, 3);
# 6.4、函数组合
函数组合是将多个函数组合成一个新函数。
# 手动实现函数组合
def compose(*functions):
    """Compose functions right to left: the last one given runs first.

    With no functions, the returned callable hands its argument back
    unchanged.
    """
    steps = functions[::-1]

    def inner(arg):
        for step in steps:
            arg = step(arg)
        return arg

    return inner
# 定义基础函数
def add_one(x):
return x + 1
def double(x):
return x * 2
def square(x):
return x ** 2
# 组合函数:(x+1) * 2 然后平方
combined = compose(square, double, add_one)
print(combined(3)) # ((3+1)*2)^2 = 64
# 更优雅的实现
from functools import reduce
def compose2(*functions):
    """Compose functions right to left using ``reduce``.

    The identity function is the initial value, so calling ``compose2()``
    with no functions returns a callable that hands its argument back
    unchanged instead of raising ``TypeError``.
    """
    return reduce(lambda f, g: lambda x: f(g(x)), functions, lambda x: x)
combined2 = compose2(square, double, add_one)
print(combined2(3)) # 64
# 实际应用:数据处理管道
def remove_spaces(text):
return text.replace(' ', '')
def to_lowercase(text):
return text.lower()
def remove_punctuation(text):
import string
return text.translate(str.maketrans('', '', string.punctuation))
# Compose the text-cleaning steps (applied right to left: spaces, case, punctuation)
clean_text = compose(remove_punctuation, to_lowercase, remove_spaces)
text = "Hello, World! Python is AWESOME."
print(clean_text(text)) # helloworldpythonisawesome
# 更Pythonic的方式:使用管道
class Pipeline:
"""函数管道"""
def __init__(self, value):
self.value = value
def pipe(self, func):
"""应用函数并返回新的Pipeline"""
return Pipeline(func(self.value))
def get(self):
"""获取最终值"""
return self.value
# 使用管道
result = (Pipeline("Hello, World!")
.pipe(str.lower)
.pipe(lambda s: s.replace(' ', ''))
.pipe(lambda s: s.replace('!', ''))
.get())
print(result) # hello,world  (no step removes the comma)
实战:数据转换管道
# 处理用户数据
users = [
{'name': ' ALICE ', 'age': '25', 'email': 'ALICE@EXAMPLE.COM'},
{'name': 'bob', 'age': '30', 'email': 'bob@example.com '},
{'name': ' Charlie ', 'age': '22', 'email': ' charlie@EXAMPLE.com'}
]
# 定义转换函数
def clean_name(user):
user['name'] = user['name'].strip().capitalize()
return user
def convert_age(user):
user['age'] = int(user['age'])
return user
def normalize_email(user):
user['email'] = user['email'].strip().lower()
return user
# 组合所有转换
from functools import reduce
def process_user(user):
transformations = [clean_name, convert_age, normalize_email]
return reduce(lambda u, transform: transform(u), transformations, user)
# 应用到所有用户
processed = list(map(process_user, users))
for user in processed:
print(user)
# {'name': 'Alice', 'age': 25, 'email': 'alice@example.com'}
# {'name': 'Bob', 'age': 30, 'email': 'bob@example.com'}
# {'name': 'Charlie', 'age': 22, 'email': 'charlie@example.com'}
# 6.5、纯函数与副作用
纯函数是函数式编程的核心概念。
纯函数的特征:
# 纯函数 - 相同输入总是产生相同输出,无副作用
def add(a, b):
return a + b
def multiply(a, b):
return a * b
# 纯函数 - 不修改输入
def append_item_pure(lst, item):
"""返回新列表,不修改原列表"""
return lst + [item]
original = [1, 2, 3]
new_list = append_item_pure(original, 4)
print(original) # [1, 2, 3] - 未被修改
print(new_list) # [1, 2, 3, 4]
# 非纯函数 - 有副作用
def append_item_impure(lst, item):
"""直接修改列表"""
lst.append(item)
return lst
original = [1, 2, 3]
new_list = append_item_impure(original, 4)
print(original) # [1, 2, 3, 4] - 被修改了!
print(new_list) # [1, 2, 3, 4]
# 非纯函数 - 依赖外部状态
counter = 0
def increment_impure():
global counter
counter += 1
return counter
print(increment_impure()) # 1
print(increment_impure()) # 2 - 相同调用,不同结果!
# 纯函数替代方案
def increment_pure(value):
return value + 1
counter = 0
counter = increment_pure(counter) # 1
counter = increment_pure(counter) # 2
纯函数的优势:
# 1. 可测试性
def calculate_total(items, tax_rate):
"""纯函数 - 易于测试"""
subtotal = sum(item['price'] * item['quantity'] for item in items)
return subtotal * (1 + tax_rate)
# 测试简单
items = [
{'price': 10, 'quantity': 2},
{'price': 5, 'quantity': 3}
]
assert calculate_total(items, 0.1) == 38.5
# 2. 可组合性
def filter_active(users):
return [u for u in users if u.get('active', False)]
def sort_by_name(users):
return sorted(users, key=lambda u: u['name'])
def get_emails(users):
return [u['email'] for u in users]
# 轻松组合
users = [
{'name': 'Alice', 'active': True, 'email': 'alice@example.com'},
{'name': 'Bob', 'active': False, 'email': 'bob@example.com'},
{'name': 'Charlie', 'active': True, 'email': 'charlie@example.com'}
]
active_emails = get_emails(sort_by_name(filter_active(users)))
print(active_emails) # ['alice@example.com', 'charlie@example.com']
# 3. 并发安全
from concurrent.futures import ThreadPoolExecutor
def square_pure(x):
"""纯函数 - 线程安全"""
return x ** 2
numbers = range(100)
with ThreadPoolExecutor(max_workers=4) as executor:
results = list(executor.map(square_pure, numbers))
避免副作用的技巧:
# 1. 使用copy避免修改原始数据
import copy
def update_user_pure(user, **updates):
"""返回更新后的新用户对象"""
new_user = copy.deepcopy(user)
new_user.update(updates)
return new_user
user = {'name': 'Alice', 'age': 25}
updated = update_user_pure(user, age=26)
print(user) # {'name': 'Alice', 'age': 25}
print(updated) # {'name': 'Alice', 'age': 26}
# 2. 使用不可变数据结构
from collections import namedtuple
User = namedtuple('User', ['name', 'age', 'email'])
user = User('Alice', 25, 'alice@example.com')
# 创建新对象而不是修改
updated = user._replace(age=26)
print(user) # User(name='Alice', age=25, ...)
print(updated) # User(name='Alice', age=26, ...)
# 3. 使用dataclasses(Python 3.7+)
from dataclasses import dataclass, replace
@dataclass(frozen=True) # frozen=True使其不可变
class Product:
name: str
price: float
quantity: int
product = Product('Book', 29.99, 10)
# product.price = 19.99 # 错误!不可修改
# 使用replace创建新对象
discounted = replace(product, price=19.99)
print(product) # Product(name='Book', price=29.99, quantity=10)
print(discounted) # Product(name='Book', price=19.99, quantity=10)
# 7、类型提示(Type Hints)
作为Java开发者,你习惯了静态类型系统。Python从3.5版本开始引入了类型提示,虽然运行时不强制检查,但能让代码更清晰、IDE提示更好。
# 7.1、基本类型注解
# 变量类型注解
name: str = "Alice"
age: int = 25
salary: float = 5000.50
is_active: bool = True
# 函数参数和返回值注解
def greet(name: str, age: int) -> str:
return f"Hello {name}, you are {age} years old"
def add(a: int, b: int) -> int:
return a + b
def get_user_info() -> dict:
return {"name": "Alice", "age": 25}
# 无返回值
def log_message(message: str) -> None:
print(message)
# Java对比
"""
// Java强制类型
String name = "Alice";
int age = 25;
public String greet(String name, int age) {
return "Hello " + name;
}
"""
# 7.2、复合类型注解
from typing import Any, Dict, List, Set, Tuple

# List types
numbers: List[int] = [1, 2, 3, 4]
names: List[str] = ["Alice", "Bob"]

# Dict types -- use typing.Any for "any value"; the builtin any() is a
# function, not a type, and type checkers reject it in annotations
user: Dict[str, int] = {"age": 25, "score": 90}
config: Dict[str, Any] = {"host": "localhost", "port": 8080}

# Tuple types
point: Tuple[int, int] = (10, 20)
person: Tuple[str, int, bool] = ("Alice", 25, True)

# Set types
unique_ids: Set[int] = {1, 2, 3}

# Nested types
users: List[Dict[str, Any]] = [
    {"name": "Alice", "age": 25},
    {"name": "Bob", "age": 30}
]
# 实际应用
def get_user_scores() -> Dict[str, List[int]]:
"""返回用户和他们的分数列表"""
return {
"Alice": [85, 90, 88],
"Bob": [92, 87, 95]
}
def process_data(items: List[Tuple[str, int]]) -> Dict[str, int]:
"""处理数据列表"""
return {name: score for name, score in items}
# 7.3、Optional与Union
from typing import Optional, Union
# Optional - 可能为None
def find_user(user_id: int) -> Optional[dict]:
"""返回用户或None"""
if user_id > 0:
return {"id": user_id, "name": "Alice"}
return None
# Optional[X] 等价于 Union[X, None]
def get_config(key: str) -> Optional[str]:
return None
# Union - 多种可能类型
def process_value(value: Union[int, str, float]) -> str:
"""处理int、str或float类型"""
return str(value)
# 实际应用
def parse_input(data: Union[str, bytes]) -> str:
"""处理字符串或字节"""
if isinstance(data, bytes):
return data.decode('utf-8')
return data
# Java对比
"""
// Java使用泛型和null
Optional<User> findUser(int id) {
if (id > 0) {
return Optional.of(new User(id));
}
return Optional.empty();
}
"""
# 7.4、泛型类型
from typing import Generic, List, Optional, TypeVar

# Type variable shared by the generic function and class below
T = TypeVar('T')


# Generic function
def first_element(items: List[T]) -> Optional[T]:
    """Return the first element of the list, or None when it is empty."""
    return items[0] if items else None


# Generic class
class Stack(Generic[T]):
    """Generic LIFO stack."""

    def __init__(self) -> None:
        self._items: List[T] = []

    def push(self, item: T) -> None:
        """Put an item on top of the stack."""
        self._items.append(item)

    def pop(self) -> Optional[T]:
        """Remove and return the top item, or None when the stack is empty."""
        return self._items.pop() if self._items else None
# 使用
int_stack: Stack[int] = Stack()
int_stack.push(1)
int_stack.push(2)
str_stack: Stack[str] = Stack()
str_stack.push("hello")
# 7.5、typing模块常用类型
from typing import Any, Callable, Iterable, List, Mapping, Sequence


# Callable - any callable object
def apply_func(func: Callable[[int, int], int], a: int, b: int) -> int:
    return func(a, b)


result = apply_func(lambda x, y: x + y, 3, 5) # 8


# Any - any type
def process_data(data: Any) -> str:
    return str(data)


# Sequence - any sequence type
def sum_values(numbers: Sequence[int]) -> int:
    return sum(numbers)


sum_values([1, 2, 3]) # List
sum_values((1, 2, 3)) # Tuple


# Mapping - any read-only mapping; values are typing.Any, not the builtin any()
def print_config(config: Mapping[str, Any]) -> None:
    for key, value in config.items():
        print(f"{key}: {value}")


# Iterable - any iterable object
def process_items(items: Iterable[str]) -> List[str]:
    return [item.upper() for item in items]
# 7.6、类型别名
from typing import Any, Dict, List, Tuple

# Simple type aliases
UserId = int
UserName = str
Score = float

# Aliases for composite types (typing.Any, not the builtin any() function)
User = Dict[str, Any]
UserList = List[User]
Coordinate = Tuple[float, float]
ScoreBoard = Dict[UserName, List[Score]]


# Using the aliases
def get_user(user_id: UserId) -> User:
    return {"id": user_id, "name": "Alice"}


def calculate_average(scores: ScoreBoard) -> Dict[UserName, Score]:
    """Return each user's mean score; every score list must be non-empty."""
    return {
        name: sum(score_list) / len(score_list)
        for name, score_list in scores.items()
    }
# 7.7、mypy静态类型检查
# 安装mypy: pip install mypy
# 示例代码 example.py
def add_numbers(a: int, b: int) -> int:
return a + b
result: str = add_numbers(1, 2) # 类型错误!
# 运行检查
# mypy example.py
# error: Incompatible types in assignment (expression has type "int", variable has type "str")
# 配置mypy: mypy.ini
"""
[mypy]
python_version = 3.9
warn_return_any = True
warn_unused_configs = True
disallow_untyped_defs = True
"""
# 实际项目示例
from typing import Any, Dict, List, Optional


class UserService:
    """Example service class with fully annotated methods."""

    def __init__(self, db_connection: Any) -> None:
        self.db = db_connection

    def find_user(self, user_id: int) -> Optional[Dict[str, Any]]:
        """Look up one user (stubbed: always returns a fixed record)."""
        # Implementation omitted...
        return {"id": user_id, "name": "Alice"}

    def get_all_users(self) -> List[Dict[str, Any]]:
        """Return all users (stubbed)."""
        return [{"id": 1, "name": "Alice"}]
对比总结:
| 特性 | Python类型提示 | Java类型系统 |
|---|---|---|
| 强制性 | 可选,运行时不检查 | 强制,编译时检查 |
| 基本语法 | name: str | String name |
| 泛型 | List[int] | List<Integer> |
| 可选类型 | Optional[int] | Optional<Integer> |
| 联合类型 | Union[int, str] | 不直接支持 |
| 类型别名 | UserId = int | 不直接支持(typedef是C/C++特性,Java通常用包装类或接口代替) |
| 检查工具 | mypy, pyright | javac内置 |
| IDE支持 | VSCode, PyCharm | IntelliJ IDEA |
# 8、反射与内省
Python的反射能力比Java更强大。作为Java开发者,你会发现Python的反射API更简单直接。
# 8.1、inspect模块
import inspect
# 定义示例类
class User:
    """Example user class for the inspection demos."""

    def __init__(self, name: str, age: int):
        self.name = name
        self.age = age

    def greet(self):
        return f"Hello, I'm {self.name}"

    @staticmethod
    def create_guest():
        return User("Guest", 0)


# Check what kind of object we have
print(inspect.isclass(User)) # True
print(inspect.isfunction(User.greet)) # True
# __init__ requires name and age, so build a real instance to get a bound method
print(inspect.ismethod(User("Alice", 25).greet)) # True
# 获取源代码
print(inspect.getsource(User)) # 打印User类的源代码
print(inspect.getfile(User)) # 获取文件路径
# 获取函数签名
def example_func(name: str, age: int = 18) -> str:
return f"{name}: {age}"
sig = inspect.signature(example_func)
print(sig) # (name: str, age: int = 18) -> str
for param_name, param in sig.parameters.items():
print(f"{param_name}: {param.annotation}, default={param.default}")
# 获取类成员
members = inspect.getmembers(User)
for name, value in members:
print(f"{name}: {type(value)}")
# 8.2、动态属性操作
class Config:
host = "localhost"
port = 8080
# getattr - 获取属性
print(getattr(Config, 'host')) # localhost
print(getattr(Config, 'timeout', 30)) # 30 (默认值)
# setattr - 设置属性
setattr(Config, 'host', '192.168.1.1')
print(Config.host) # 192.168.1.1
# hasattr - 检查属性是否存在
print(hasattr(Config, 'host')) # True
print(hasattr(Config, 'timeout')) # False
# delattr - 删除属性
delattr(Config, 'port')
print(hasattr(Config, 'port')) # False
# 实际应用:动态配置加载
config_data = {
'database_host': 'localhost',
'database_port': 3306,
'cache_enabled': True
}
class AppConfig:
pass
for key, value in config_data.items():
setattr(AppConfig, key, value)
print(AppConfig.database_host) # localhost
# 8.3、动态导入模块
import importlib
# 动态导入模块
math_module = importlib.import_module('math')
print(math_module.sqrt(16)) # 4.0
# 动态导入并获取属性
module = importlib.import_module('json')
dumps_func = getattr(module, 'dumps')
print(dumps_func({'name': 'Alice'})) # {"name": "Alice"}
# __import__() 函数
os_module = __import__('os')
print(os_module.getcwd())
# 实际应用:插件系统
def load_plugin(plugin_name: str):
    """Import ``plugins.<plugin_name>`` and return an instance of its ``Plugin``.

    Returns:
        A new ``Plugin`` instance, or None when the import fails.

    Raises:
        AttributeError: The module was imported but defines no ``Plugin``.
    """
    try:
        module = importlib.import_module(f'plugins.{plugin_name}')
        if not hasattr(module, 'Plugin'):
            raise AttributeError(f"Plugin class not found in {plugin_name}")
        return module.Plugin()
    except ImportError as e:
        # Only import failures are turned into None.
        print(f"Failed to load plugin: {e}")
        return None
# 8.4、类型检查
# type() vs isinstance()
value = [1, 2, 3]
print(type(value)) # <class 'list'>
print(type(value) == list) # True
# isinstance() - 推荐使用,支持继承
print(isinstance(value, list)) # True
print(isinstance(value, (list, tuple))) # True - 检查多个类型
# issubclass() - 判断继承关系
class Animal:
pass
class Dog(Animal):
pass
print(issubclass(Dog, Animal)) # True
print(issubclass(Dog, object)) # True - 所有类都继承自object
# 鸭子类型应用
def process_iterable(obj):
"""处理任何可迭代对象"""
if hasattr(obj, '__iter__'):
for item in obj:
print(item)
else:
print("Not iterable")
process_iterable([1, 2, 3]) # 处理列表
process_iterable("hello") # 处理字符串
process_iterable(range(5)) # 处理range对象
# 8.5、动态创建类
# 使用type()创建类
# type(name, bases, dict)
User = type('User', (object,), {
'name': 'Alice',
'greet': lambda self: f"Hello, {self.name}"
})
user = User()
print(user.name) # Alice
print(user.greet()) # Hello, Alice
# 更复杂的示例
def __init__(self, name, age):
self.name = name
self.age = age
def get_info(self):
return f"{self.name}, {self.age} years old"
Person = type('Person', (object,), {
'__init__': __init__,
'get_info': get_info
})
p = Person("Bob", 25)
print(p.get_info()) # Bob, 25 years old
# 实际应用:ORM模型动态生成
def create_model(table_name: str, fields: dict):
    """Create a simple model class at runtime with ``type()``.

    Args:
        table_name: Used as the class name and stored as ``_table_name``.
        fields: Maps field names to column-type strings; stored as
            ``_fields`` and used to build the ``repr``.

    Returns:
        A new class whose constructor accepts arbitrary keyword arguments
        and stores each of them as an instance attribute.
    """
    def __init__(self, **kwargs):
        for key, value in kwargs.items():
            setattr(self, key, value)

    def __repr__(self):
        # A declared field may not have been passed to the constructor;
        # show it as None rather than failing with AttributeError.
        field_strs = [f"{k}={getattr(self, k, None)}" for k in fields]
        return f"{table_name}({', '.join(field_strs)})"

    attrs = {
        '__init__': __init__,
        '__repr__': __repr__,
        '_table_name': table_name,
        '_fields': fields
    }
    return type(table_name, (object,), attrs)
# 创建User模型
UserModel = create_model('users', {
'id': 'INTEGER',
'name': 'VARCHAR(100)',
'email': 'VARCHAR(100)'
})
user = UserModel(id=1, name='Alice', email='alice@example.com')
print(user) # users(id=1, name=Alice, email=alice@example.com)
# 8.6、元类(Metaclass)基础
# 元类是创建类的类
class Meta(type):
"""自定义元类"""
def __new__(mcs, name, bases, attrs):
# 在类创建时执行
print(f"Creating class: {name}")
# 添加类属性
attrs['created_by'] = 'Meta'
return super().__new__(mcs, name, bases, attrs)
# 使用元类
class MyClass(metaclass=Meta):
pass
# 输出: Creating class: MyClass
print(MyClass.created_by) # Meta
# 实际应用:自动注册类
_registry = {}
class RegisterMeta(type):
"""自动注册元类"""
def __new__(mcs, name, bases, attrs):
cls = super().__new__(mcs, name, bases, attrs)
if name != 'Base': # 跳过基类
_registry[name] = cls
return cls
class Base(metaclass=RegisterMeta):
pass
class PluginA(Base):
pass
class PluginB(Base):
pass
print(_registry) # {'PluginA': <class '__main__.PluginA'>, 'PluginB': <class '__main__.PluginB'>}
# 8.7、实战案例
案例1:简易依赖注入
class Container:
    """Minimal dependency-injection container driven by constructor annotations."""

    def __init__(self):
        self._services = {}

    def register(self, interface, implementation):
        """Associate ``interface`` with the class that implements it."""
        self._services[interface] = implementation

    def resolve(self, interface):
        """Build an instance for ``interface``, injecting its dependencies.

        Every annotated ``__init__`` parameter is resolved recursively through
        the container. A parameter that has a default value and whose
        annotation is not registered (e.g. ``limit: int = 10``) is left to
        its default instead of failing the whole resolution.

        Raises:
            ValueError: ``interface`` or a required dependency is not
                registered.
        """
        implementation = self._services.get(interface)
        if implementation is None:
            raise ValueError(f"Service {interface} not registered")

        empty = inspect.Parameter.empty
        sig = inspect.signature(implementation.__init__)
        dependencies = {}
        for param_name, param in sig.parameters.items():
            if param_name == 'self' or param.annotation is empty:
                continue
            has_default = param.default is not empty
            if has_default and param.annotation not in self._services:
                # Plain configuration value, not an injectable service.
                continue
            dependencies[param_name] = self.resolve(param.annotation)
        return implementation(**dependencies)
# 使用示例
class Database:
def query(self, sql):
return f"Executing: {sql}"
class UserRepository:
def __init__(self, db: Database):
self.db = db
def find_all(self):
return self.db.query("SELECT * FROM users")
# 注册服务
container = Container()
container.register(Database, Database)
container.register(UserRepository, UserRepository)
# 解析服务(自动注入依赖)
repo = container.resolve(UserRepository)
print(repo.find_all()) # Executing: SELECT * FROM users
案例2:序列化/反序列化
import json
from typing import get_type_hints
class Serializable:
    """Base class that converts objects to and from plain dictionaries."""

    def to_dict(self):
        """Return the instance attributes as a dict, recursing into nested Serializables."""
        return {
            key: value.to_dict() if isinstance(value, Serializable) else value
            for key, value in self.__dict__.items()
        }

    @classmethod
    def from_dict(cls, data: dict):
        """Build an instance from a dict using the ``__init__`` type hints.

        A value whose parameter is annotated with a Serializable subclass is
        rebuilt through that class's ``from_dict``; everything else is passed
        to the constructor unchanged.
        """
        hints = get_type_hints(cls.__init__)

        def revive(key, value):
            hint = hints.get(key)
            if isinstance(hint, type) and issubclass(hint, Serializable):
                return hint.from_dict(value)
            return value

        return cls(**{key: revive(key, value) for key, value in data.items()})
class Address(Serializable):
def __init__(self, city: str, street: str):
self.city = city
self.street = street
class User(Serializable):
def __init__(self, name: str, age: int, address: Address):
self.name = name
self.age = age
self.address = address
# 使用
user = User("Alice", 25, Address("Beijing", "Main St"))
user_dict = user.to_dict()
print(json.dumps(user_dict, indent=2))
# 反序列化
restored = User.from_dict(user_dict)
print(f"{restored.name} lives in {restored.address.city}")
对比总结:
| 特性 | Python | Java |
|---|---|---|
| 反射API | inspect, getattr, setattr | java.lang.reflect |
| 获取类信息 | inspect.getmembers() | Class.getDeclaredMethods() |
| 动态调用 | getattr(obj, 'method')() | method.invoke(obj) |
| 动态创建类 | type(name, bases, dict) | Proxy.newProxyInstance() |
| 类型检查 | isinstance(), issubclass() | instanceof, isAssignableFrom() |
| 元类 | class Meta(type) | 不支持(使用注解处理器) |
| 灵活性 | 极高 | 中等 |
| 性能 | 较慢 | 较快 |
# 二、Python标准库
Python的标准库非常丰富,开箱即用。作为Java开发者,你会发现很多功能在Python标准库中已经实现,无需引入第三方依赖。
# 1、常用内置模块
# 1.1、os与sys模块
os模块 - 操作系统交互
import os
# 获取当前工作目录
print(os.getcwd()) # D:\workspace\project
# 改变工作目录
os.chdir('/tmp')
# 列出目录内容
files = os.listdir('.')
print(files)
# 创建目录
os.mkdir('new_folder')
os.makedirs('path/to/folder', exist_ok=True) # 递归创建
# 删除文件和目录
os.remove('file.txt') # 删除文件
os.rmdir('folder') # 删除空目录
import shutil
shutil.rmtree('folder') # 删除非空目录
# 文件和目录判断
print(os.path.exists('file.txt')) # 是否存在
print(os.path.isfile('file.txt')) # 是否是文件
print(os.path.isdir('folder')) # 是否是目录
# 路径操作
full_path = os.path.join('path', 'to', 'file.txt') # path/to/file.txt
dirname = os.path.dirname('/path/to/file.txt') # /path/to
basename = os.path.basename('/path/to/file.txt') # file.txt
name, ext = os.path.splitext('file.txt') # ('file', '.txt')
# 环境变量
print(os.environ.get('PATH'))
os.environ['MY_VAR'] = 'value'
# 执行系统命令
os.system('ls -l') # 不推荐,使用subprocess模块
sys模块 - Python解释器交互
import sys
# 命令行参数
print(sys.argv) # ['script.py', 'arg1', 'arg2']
# Python版本信息
print(sys.version)
print(sys.version_info) # sys.version_info(major=3, minor=9, ...)
# 模块搜索路径
print(sys.path)
sys.path.append('/custom/path')
# 退出程序
sys.exit(0) # 正常退出
sys.exit(1) # 异常退出
# 标准输入输出
sys.stdout.write("Hello\n")
line = sys.stdin.readline()
# 获取对象大小
numbers = [1, 2, 3, 4, 5]
print(sys.getsizeof(numbers)) # 字节数
Java对比:
// Java获取当前目录
String currentDir = System.getProperty("user.dir");
// Java环境变量
String path = System.getenv("PATH");
// Java命令行参数
public static void main(String[] args) {
// args数组
}
# 1.2、datetime模块
from datetime import datetime, date, time, timedelta
# 获取当前时间
now = datetime.now()
print(now) # 2025-01-26 15:30:45.123456
today = date.today()
print(today) # 2025-01-26
# 创建日期时间
dt = datetime(2025, 1, 26, 15, 30, 45)
d = date(2025, 1, 26)
t = time(15, 30, 45)
# 格式化输出
print(now.strftime('%Y-%m-%d %H:%M:%S')) # 2025-01-26 15:30:45
print(now.strftime('%Y年%m月%d日')) # 2025年01月26日
# 解析字符串
dt = datetime.strptime('2025-01-26 15:30:45', '%Y-%m-%d %H:%M:%S')
# 日期运算
tomorrow = today + timedelta(days=1)
next_week = today + timedelta(weeks=1)
one_hour_later = now + timedelta(hours=1)
# 时间差
diff = datetime(2025, 12, 31) - now
print(diff.days) # 剩余天数
print(diff.total_seconds()) # 总秒数
# 时间戳
timestamp = now.timestamp() # 转为时间戳
dt = datetime.fromtimestamp(timestamp) # 从时间戳创建
# Java对比
"""
// Java 8+
LocalDateTime now = LocalDateTime.now();
LocalDate today = LocalDate.now();
// 格式化
DateTimeFormatter formatter = DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss");
String formatted = now.format(formatter);
// 日期运算
LocalDate tomorrow = today.plusDays(1);
"""
# 1.3、json模块
import json
# Python对象转JSON字符串
data = {
'name': 'Alice',
'age': 25,
'skills': ['Python', 'Java'],
'active': True
}
json_str = json.dumps(data)
print(json_str) # {"name": "Alice", "age": 25, ...}
# 美化输出
json_str = json.dumps(data, indent=2, ensure_ascii=False)
print(json_str)
# JSON字符串转Python对象
data = json.loads(json_str)
print(data['name']) # Alice
# 读写JSON文件
# 写入
with open('data.json', 'w', encoding='utf-8') as f:
json.dump(data, f, indent=2, ensure_ascii=False)
# 读取
with open('data.json', 'r', encoding='utf-8') as f:
data = json.load(f)
# 自定义JSON编码
from datetime import datetime
class DateTimeEncoder(json.JSONEncoder):
def default(self, obj):
if isinstance(obj, datetime):
return obj.isoformat()
return super().default(obj)
data = {'created': datetime.now()}
json_str = json.dumps(data, cls=DateTimeEncoder)
# Java对比
"""
// Java使用Jackson或Gson
ObjectMapper mapper = new ObjectMapper();
String json = mapper.writeValueAsString(data);
Data data = mapper.readValue(json, Data.class);
"""
# 1.4、re模块(正则表达式)
import re
# 基本匹配
text = "My email is alice@example.com"
match = re.search(r'\w+@\w+\.\w+', text)
if match:
print(match.group()) # alice@example.com
# 查找所有匹配
text = "Phone: 123-456-7890, Mobile: 098-765-4321"
phones = re.findall(r'\d{3}-\d{3}-\d{4}', text)
print(phones) # ['123-456-7890', '098-765-4321']
# 替换
text = "Hello World"
result = re.sub(r'World', 'Python', text)
print(result) # Hello Python
# 分割
text = "apple,banana;orange:grape"
fruits = re.split(r'[,;:]', text)
print(fruits) # ['apple', 'banana', 'orange', 'grape']
# 编译正则(提高性能)
pattern = re.compile(r'\d+')
numbers = pattern.findall("I have 3 apples and 5 oranges")
print(numbers) # ['3', '5']
# 捕获组
text = "Name: Alice, Age: 25"
match = re.search(r'Name: (\w+), Age: (\d+)', text)
if match:
print(match.group(1)) # Alice
print(match.group(2)) # 25
print(match.groups()) # ('Alice', '25')
# 常用正则表达式
email_pattern = r'^[a-zA-Z0-9._%+-]+@[a-zA-Z0-9.-]+\.[a-zA-Z]{2,}$'
url_pattern = r'^https?://(?:www\.)?[-a-zA-Z0-9@:%._\+~#=]{1,256}\.[a-zA-Z0-9()]{1,6}\b'
phone_pattern = r'^\d{3}-\d{3}-\d{4}$'
# 1.5、math与random模块
import math
import random
# math模块
print(math.pi) # 3.141592653589793
print(math.e) # 2.718281828459045
print(math.sqrt(16)) # 4.0
print(math.pow(2, 3)) # 8.0
print(math.ceil(4.3)) # 5
print(math.floor(4.7)) # 4
print(math.fabs(-5)) # 5.0
# 三角函数
print(math.sin(math.pi / 2)) # 1.0
print(math.cos(0)) # 1.0
# random模块
print(random.random()) # 0.0到1.0的随机浮点数
# 随机整数
print(random.randint(1, 10)) # 1到10的随机整数(包含10)
print(random.randrange(1, 10)) # 1到9的随机整数(不含10)
# 随机选择
colors = ['red', 'green', 'blue']
print(random.choice(colors)) # 随机选择一个
# 随机多个(有放回)
print(random.choices(colors, k=3)) # ['red', 'blue', 'red']
# 随机多个(无放回)
print(random.sample(colors, k=2)) # ['green', 'red']
# 打乱列表
numbers = [1, 2, 3, 4, 5]
random.shuffle(numbers)
print(numbers) # [3, 1, 5, 2, 4]
# 设置随机种子(可复现)
random.seed(42)
print(random.random()) # 相同种子产生相同序列
# 1.6、collections模块
from collections import Counter, defaultdict, deque, namedtuple, OrderedDict
# Counter - 计数器
words = ['apple', 'banana', 'apple', 'orange', 'banana', 'apple']
counter = Counter(words)
print(counter) # Counter({'apple': 3, 'banana': 2, 'orange': 1})
print(counter.most_common(2)) # [('apple', 3), ('banana', 2)]
# defaultdict - 默认值字典
dd = defaultdict(list)
dd['fruits'].append('apple') # 不需要检查key是否存在
print(dd) # defaultdict(<class 'list'>, {'fruits': ['apple']})
# 按类别分组
data = [('fruit', 'apple'), ('veg', 'carrot'), ('fruit', 'banana')]
grouped = defaultdict(list)
for category, item in data:
grouped[category].append(item)
print(dict(grouped)) # {'fruit': ['apple', 'banana'], 'veg': ['carrot']}
# deque - 双端队列
dq = deque([1, 2, 3])
dq.append(4) # 右端添加
dq.appendleft(0) # 左端添加
dq.pop() # 右端移除
dq.popleft() # 左端移除
print(dq) # deque([1, 2, 3])
# 限制长度的deque(用作循环缓冲区)
buffer = deque(maxlen=3)
for i in range(5):
buffer.append(i)
print(buffer) # deque([2, 3, 4], maxlen=3)
# namedtuple - 命名元组
Point = namedtuple('Point', ['x', 'y'])
p = Point(10, 20)
print(p.x, p.y) # 10 20
print(p[0], p[1]) # 也可以用索引
# OrderedDict - 有序字典(Python 3.7+普通dict也保序)
od = OrderedDict()
od['a'] = 1
od['b'] = 2
od['c'] = 3
print(list(od.keys())) # ['a', 'b', 'c']
# Java对比
"""
// Java Counter类似
Map<String, Integer> counter = new HashMap<>();
// 需要手动计数
// Java deque
Deque<Integer> deque = new ArrayDeque<>();
deque.addFirst(1);
deque.addLast(2);
"""
# 1.7、pathlib模块
from pathlib import Path
# 创建路径对象
p = Path('/usr/local/bin')
p = Path.home() # 用户主目录
p = Path.cwd() # 当前工作目录
# 路径拼接
config_path = Path.home() / '.config' / 'app' / 'settings.json'
print(config_path) # /home/user/.config/app/settings.json
# 路径属性
p = Path('/path/to/file.txt')
print(p.name) # file.txt
print(p.stem) # file
print(p.suffix) # .txt
print(p.parent) # /path/to
print(p.parts) # ('/', 'path', 'to', 'file.txt')
# 文件操作
p = Path('test.txt')
p.write_text('Hello World', encoding='utf-8') # 写入
content = p.read_text(encoding='utf-8') # 读取
p.unlink() # 删除文件
# 目录操作
dir_path = Path('new_folder')
dir_path.mkdir(exist_ok=True) # 创建目录
dir_path.mkdir(parents=True, exist_ok=True) # 递归创建
# 判断
p = Path('file.txt')
print(p.exists()) # 是否存在
print(p.is_file()) # 是否是文件
print(p.is_dir()) # 是否是目录
# 遍历目录
for file in Path('.').iterdir():
print(file)
# 递归查找
for py_file in Path('.').rglob('*.py'):
print(py_file)
# 对比os.path
"""
# 旧方式
import os
path = os.path.join(os.path.expanduser('~'), '.config', 'app')
# 新方式(pathlib)
path = Path.home() / '.config' / 'app'
"""
模块对比总结:
| 功能 | Python模块 | Java对应 |
|---|---|---|
| 操作系统交互 | os | System, Runtime |
| 文件路径 | pathlib, os.path | java.nio.file.Path |
| 日期时间 | datetime | java.time.* |
| JSON处理 | json | Jackson, Gson |
| 正则表达式 | re | java.util.regex.Pattern |
| 随机数 | random | java.util.Random |
| 高级集合 | collections | java.util.* |
# 2、数据处理
# 2.1、csv模块
import csv
# 读取CSV文件
with open('data.csv', 'r', encoding='utf-8') as f:
reader = csv.reader(f)
headers = next(reader) # 读取表头
for row in reader:
print(row) # ['Alice', '25', 'Beijing']
# 使用DictReader(推荐)
with open('data.csv', 'r', encoding='utf-8') as f:
reader = csv.DictReader(f)
for row in reader:
print(row['name'], row['age']) # 按列名访问
# 写入CSV文件
data = [
['name', 'age', 'city'],
['Alice', 25, 'Beijing'],
['Bob', 30, 'Shanghai']
]
with open('output.csv', 'w', encoding='utf-8', newline='') as f:
writer = csv.writer(f)
writer.writerows(data)
# 使用DictWriter
with open('output.csv', 'w', encoding='utf-8', newline='') as f:
fieldnames = ['name', 'age', 'city']
writer = csv.DictWriter(f, fieldnames=fieldnames)
writer.writeheader()
writer.writerow({'name': 'Alice', 'age': 25, 'city': 'Beijing'})
# 2.2、XML处理
import xml.etree.ElementTree as ET
# 解析XML
xml_str = '''
<users>
<user id="1">
<name>Alice</name>
<age>25</age>
</user>
<user id="2">
<name>Bob</name>
<age>30</age>
</user>
</users>
'''
root = ET.fromstring(xml_str)
# 遍历元素
for user in root.findall('user'):
user_id = user.get('id')
name = user.find('name').text
age = user.find('age').text
print(f"ID: {user_id}, Name: {name}, Age: {age}")
# 创建XML
root = ET.Element('users')
user = ET.SubElement(root, 'user', id='1')
ET.SubElement(user, 'name').text = 'Alice'
ET.SubElement(user, 'age').text = '25'
# 保存XML
tree = ET.ElementTree(root)
tree.write('users.xml', encoding='utf-8', xml_declaration=True)
# 2.3、pickle模块(序列化)
import pickle
# Python对象序列化
data = {
'name': 'Alice',
'scores': [85, 90, 88],
'metadata': {'created': '2025-01-26'}
}
# 保存到文件
with open('data.pkl', 'wb') as f:
pickle.dump(data, f)
# 从文件加载
with open('data.pkl', 'rb') as f:
loaded_data = pickle.load(f)
print(loaded_data)
# 序列化为字节串
bytes_data = pickle.dumps(data)
restored = pickle.loads(bytes_data)
# 2.4、struct模块(二进制数据)
import struct
# 打包二进制数据
# 格式: i=int, f=float, s=string
packed = struct.pack('i f 10s', 42, 3.14, b'Hello')
print(packed) # b'*\x00\x00\x00\xc3\xf5H@Hello\x00\x00\x00\x00\x00'
# 解包二进制数据
unpacked = struct.unpack('i f 10s', packed)
print(unpacked) # (42, 3.140000104904175, b'Hello\x00\x00\x00\x00\x00')
# 实际应用:读取二进制文件
with open('data.bin', 'rb') as f:
data = f.read(struct.calcsize('i f'))
values = struct.unpack('i f', data)
# 3、网络编程
# 3.1、socket模块
import socket
# TCP服务器
# NOTE: the accept loop below never returns, so run the server and the client
# parts in separate scripts/processes.
# ``with`` closes each socket even when an exception interrupts the exchange.
with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as server:
    server.bind(('localhost', 8080))
    server.listen(5)
    print("Server listening on port 8080...")
    while True:
        client, addr = server.accept()
        with client:
            print(f"Connection from {addr}")
            data = client.recv(1024)  # read the request (up to 1024 bytes)
            # sendall keeps writing until the whole payload is sent;
            # plain send may transmit only part of it.
            client.sendall(b"Hello from server")

# TCP client
with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as client:
    client.connect(('localhost', 8080))
    client.sendall(b"Hello server")
    response = client.recv(1024)
    print(response.decode())
# 3.2、urllib模块
from urllib import request, parse
# GET请求
response = request.urlopen('https://api.github.com')
html = response.read().decode('utf-8')
print(html)
# POST请求
data = parse.urlencode({'key': 'value'}).encode()
req = request.Request('https://httpbin.org/post', data=data)
response = request.urlopen(req)
print(response.read().decode())
# 设置请求头
req = request.Request('https://api.github.com')
req.add_header('User-Agent', 'Python App')
response = request.urlopen(req)
# 3.3、http.server
# 命令行启动简单HTTP服务器
# python -m http.server 8000
# 自定义HTTP服务器
from http.server import HTTPServer, BaseHTTPRequestHandler
class MyHandler(BaseHTTPRequestHandler):
def do_GET(self):
self.send_response(200)
self.send_header('Content-type', 'text/html')
self.end_headers()
self.wfile.write(b'<h1>Hello World</h1>')
server = HTTPServer(('localhost', 8000), MyHandler)
server.serve_forever()
# 4、多线程与多进程
# 4.1、threading模块
import threading
import time
# 创建线程
def worker(name):
print(f"Thread {name} starting")
time.sleep(2)
print(f"Thread {name} done")
threads = []
for i in range(5):
t = threading.Thread(target=worker, args=(i,))
threads.append(t)
t.start()
# 等待所有线程完成
for t in threads:
t.join()
# 线程类
class MyThread(threading.Thread):
    """Thread subclass whose work is defined by overriding ``run``."""

    def __init__(self, name):
        # Pass the name to Thread's own constructor instead of assigning it
        # afterwards; a ``None`` name then gets the default generated name.
        super().__init__(name=name)

    def run(self):
        """Print this thread's name; runs in the new thread after ``start()``."""
        print(f"{self.name} is running")
# 线程锁
# Shared state guarded by ``lock``. It must exist before any thread calls
# safe_increment, otherwise ``counter += 1`` fails on the undefined global.
counter = 0
lock = threading.Lock()


def safe_increment():
    """Add 1 to the module-level ``counter`` while holding ``lock``."""
    global counter
    # ``counter += 1`` is a read-modify-write; the lock stops two threads
    # from interleaving those steps and losing an update.
    with lock:
        counter += 1
# 4.2、multiprocessing模块
from multiprocessing import Process, Pool, Queue
# 创建进程
def worker(name):
print(f"Process {name} starting")
if __name__ == '__main__':
processes = []
for i in range(5):
p = Process(target=worker, args=(i,))
processes.append(p)
p.start()
for p in processes:
p.join()
# 进程池
def square(x):
return x ** 2
if __name__ == '__main__':
with Pool(processes=4) as pool:
results = pool.map(square, range(10))
print(results)
# 4.3、concurrent.futures模块
from concurrent.futures import ThreadPoolExecutor, ProcessPoolExecutor
import time
def task(n, delay=1):
    """Simulate a slow job: sleep ``delay`` seconds, then return ``n`` squared."""
    time.sleep(delay)
    return n ** 2


# The guard is required for ProcessPoolExecutor: worker processes started with
# the "spawn" method re-import this module, and unguarded pool creation would
# run again in every worker.
if __name__ == '__main__':
    # Thread pool
    with ThreadPoolExecutor(max_workers=5) as executor:
        futures = [executor.submit(task, i) for i in range(10)]
        for future in futures:
            print(future.result())

    # Process pool
    with ProcessPoolExecutor(max_workers=4) as executor:
        results = executor.map(task, range(10))
        print(list(results))
# 4.4、GIL详解
GIL(全局解释器锁)是CPython的实现细节:
影响:
- 同一时刻只有一个线程执行Python字节码
- 多线程无法利用多核CPU进行CPU密集型任务
- I/O密集型任务受影响较小:线程在等待I/O时会释放GIL,其他线程可以继续执行
解决方案:
- CPU密集型: 使用multiprocessing
- I/O密集型: 使用threading或asyncio
- 混合型: 根据具体情况选择
Java对比:
- Java没有GIL,多线程可以真正并行
- Python的多线程更适合I/O操作
- 在利用多核方面,Python的多进程承担了类似Java多线程的角色(但进程之间不共享内存,需要通过Queue、Pipe等方式通信)
# 5、日志与调试
# 5.1、logging模块
import logging
# 基本配置
logging.basicConfig(
level=logging.DEBUG,
format='%(asctime)s - %(name)s - %(levelname)s - %(message)s',
datefmt='%Y-%m-%d %H:%M:%S',
filename='app.log'
)
# 使用日志
logging.debug("Debug message")
logging.info("Info message")
logging.warning("Warning message")
logging.error("Error message")
logging.critical("Critical message")
# 创建logger
logger = logging.getLogger(__name__)
logger.setLevel(logging.DEBUG)
# 添加处理器
file_handler = logging.FileHandler('app.log')
console_handler = logging.StreamHandler()
# 设置格式
formatter = logging.Formatter('%(asctime)s - %(name)s - %(levelname)s - %(message)s')
file_handler.setFormatter(formatter)
console_handler.setFormatter(formatter)
logger.addHandler(file_handler)
logger.addHandler(console_handler)
# 使用logger
logger.info("Application started")
# 5.2、pdb调试器
import pdb
def buggy_function(x, y):
result = x + y
pdb.set_trace() # 设置断点
result = result * 2
return result
# 调试命令:
# n - 下一行
# s - 进入函数
# c - 继续执行
# p variable - 打印变量
# l - 显示当前代码
# q - 退出调试
# 5.3、traceback模块
import traceback
try:
result = 1 / 0
except Exception as e:
# 打印完整堆栈
traceback.print_exc()
# 获取堆栈信息
tb_str = traceback.format_exc()
print(tb_str)
# 记录到日志
logging.error("Error occurred", exc_info=True)
# 三、异步编程
异步编程是Python的重要特性,特别适合处理I/O密集型任务。对于Java开发者来说,Python的异步编程模型比Java的CompletableFuture更加直观和强大。
# 1、asyncio基础
# 1.1、async/await语法
Python 3.5引入了async/await语法,让异步代码看起来像同步代码一样直观。
import asyncio
async def greet(name):
print(f"开始问候 {name}")
await asyncio.sleep(1) # 异步等待1秒
return f"Hello, {name}!"
async def main():
result = await greet("张三")
print(result)
# 运行异步程序
asyncio.run(main())
对比Java的异步编程:
// Java使用CompletableFuture
CompletableFuture<String> future = CompletableFuture.supplyAsync(() -> {
try {
Thread.sleep(1000);
} catch (InterruptedException e) {
e.printStackTrace();
}
return "Hello, 张三!";
});
future.thenAccept(System.out::println);
核心概念:
- async def:定义协程函数
- await:暂停当前协程,等待另一个可等待对象(协程、Task、Future)完成
- asyncio.run():启动事件循环并运行主协程
# 1.2、协程概念
协程(Coroutine)是可以暂停和恢复执行的函数,比线程更轻量级。
import asyncio
import time
async def task(name, duration):
print(f"任务 {name} 开始")
await asyncio.sleep(duration)
print(f"任务 {name} 完成")
return f"{name} 结果"
async def main():
start_time = time.time()
# 顺序执行(总时间 = 3秒)
result1 = await task("A", 1)
result2 = await task("B", 2)
end_time = time.time()
print(f"顺序执行耗时: {end_time - start_time:.2f}秒")
asyncio.run(main())
协程 vs 线程对比:
| 特性 | 协程 | 线程 |
|---|---|---|
| 创建成本 | 极低(几KB) | 较高(几MB) |
| 切换成本 | 用户态切换 | 内核态切换 |
| 数量限制 | 可以创建数万个 | 通常数百个 |
| 适用场景 | 高并发I/O密集型 | 阻塞式I/O、无法改写为异步的同步库(CPython中受GIL限制,CPU密集型应使用多进程) |
# 1.3、事件循环
事件循环是异步编程的核心,负责调度和执行协程。
import asyncio
async def worker(name):
print(f"{name} 开始工作")
await asyncio.sleep(1)
print(f"{name} 完成工作")
async def main():
# 创建任务
task1 = asyncio.create_task(worker("任务1"))
task2 = asyncio.create_task(worker("任务2"))
# 等待所有任务完成
await task1
await task2
print("所有任务完成")
# 使用asyncio.run()
asyncio.run(main())
# 手动控制事件循环(高级用法)
loop = asyncio.new_event_loop()
try:
loop.run_until_complete(main())
finally:
loop.close()
asyncio.run() vs await的区别:
- asyncio.run():启动新的事件循环,通常用于程序入口
- await:在现有事件循环中等待协程完成
import asyncio
async def nested():
    """Return a fixed string; stands in for any coroutine being awaited."""
    return "嵌套协程结果"


async def outer():
    """Show how to run one coroutine from inside another."""
    # ✅ Correct: await the coroutine on the event loop that is already running
    result = await nested()
    print(result)
    # ❌ Wrong: asyncio.run() cannot be nested inside a running event loop
    # asyncio.run(nested())  # RuntimeError: asyncio.run() cannot be called from a running event loop


asyncio.run(outer())
# 1.4、Task与Future
Task是Future的子类,用于包装协程并调度执行。
import asyncio
async def background_task(name, delay):
print(f"后台任务 {name} 开始")
await asyncio.sleep(delay)
print(f"后台任务 {name} 完成")
return f"{name} 的结果"
async def main():
# 创建多个任务(立即开始执行)
tasks = [
asyncio.create_task(background_task("A", 2)),
asyncio.create_task(background_task("B", 1)),
asyncio.create_task(background_task("C", 3))
]
# 等待所有任务完成
results = await asyncio.gather(*tasks)
print("所有任务结果:", results)
# 带超时的等待
try:
await asyncio.wait_for(asyncio.create_task(background_task("D", 5)), timeout=3)
except asyncio.TimeoutError:
print("任务D超时")
asyncio.run(main())
Task的高级用法:
import asyncio
async def cancellable_task():
try:
for i in range(10):
print(f"工作中... {i}")
await asyncio.sleep(1)
except asyncio.CancelledError:
print("任务被取消")
# 清理资源
await asyncio.sleep(0.5)
print("清理完成")
raise
async def main():
task = asyncio.create_task(cancellable_task())
# 3秒后取消任务
await asyncio.sleep(3)
task.cancel()
try:
await task
except asyncio.CancelledError:
print("主协程捕获到取消异常")
asyncio.run(main())
# 2、异步I/O
# 2.1、异步文件操作
asyncio(Python 3.4+)本身没有提供异步文件操作API,通常借助第三方库aiofiles:它把文件I/O放到线程池中执行,从而不阻塞事件循环。
import asyncio
import aiofiles # 第三方库,功能更完整
async def read_file_async(filename):
# 使用aiofiles(推荐)
async with aiofiles.open(filename, 'r', encoding='utf-8') as f:
content = await f.read()
print(f"文件内容长度: {len(content)}")
return content
async def write_file_async(filename, content):
async with aiofiles.open(filename, 'w', encoding='utf-8') as f:
await f.write(content)
print(f"写入文件: {filename}")
async def process_files():
# 并发处理多个文件
files = ['file1.txt', 'file2.txt', 'file3.txt']
tasks = []
for filename in files:
content = f"这是 {filename} 的内容\n"
tasks.append(write_file_async(filename, content))
await asyncio.gather(*tasks)
# 并发读取文件
read_tasks = [read_file_async(f) for f in files]
contents = await asyncio.gather(*read_tasks)
print(f"读取了 {len(contents)} 个文件")
# 安装aiofiles: pip install aiofiles
# asyncio.run(process_files())
对比Java的异步文件操作:
// Java NIO.2异步文件操作
AsynchronousFileChannel channel = AsynchronousFileChannel.open(
Paths.get("file.txt"), StandardOpenOption.READ);
ByteBuffer buffer = ByteBuffer.allocate(1024);
Future<Integer> readResult = channel.read(buffer, 0);
// 处理结果
while (!readResult.isDone()) {
// 可以做其他工作
}
# 2.2、异步网络请求
使用aiohttp库进行异步HTTP请求。
import asyncio
import aiohttp
import time
async def fetch_url(session, url):
try:
async with session.get(url) as response:
print(f"获取 {url} - 状态: {response.status}")
content = await response.text()
return {
'url': url,
'status': response.status,
'length': len(content),
'content': content[:100] # 只返回前100个字符
}
except Exception as e:
print(f"请求 {url} 失败: {e}")
return {'url': url, 'error': str(e)}
async def fetch_multiple_urls(urls):
# 创建会话(复用连接)
async with aiohttp.ClientSession() as session:
# 并发请求多个URL
tasks = [fetch_url(session, url) for url in urls]
results = await asyncio.gather(*tasks, return_exceptions=True)
# 处理结果
success_count = 0
for result in results:
if isinstance(result, dict) and 'error' not in result:
success_count += 1
print(f"✅ {result['url']}: {result['status']} ({result['length']} 字符)")
else:
print(f"❌ 失败: {result}")
print(f"成功请求: {success_count}/{len(urls)}")
return results
async def main():
urls = [
'https://httpbin.org/delay/1',
'https://httpbin.org/delay/2',
'https://httpbin.org/status/200',
'https://httpbin.org/status/404'
]
start_time = time.time()
await fetch_multiple_urls(urls)
end_time = time.time()
print(f"总耗时: {end_time - start_time:.2f}秒")
# 安装aiohttp: pip install aiohttp
# asyncio.run(main())
异步HTTP客户端的最佳实践:
import asyncio
import aiohttp
from aiohttp import ClientTimeout, ClientSession
class AsyncHttpClient:
    """Small async HTTP client built on an aiohttp ``ClientSession``.

    Use it as an async context manager. The session and its connector are
    created on entry and closed on exit, so one client object can be used in
    several ``async with`` blocks.

    Args:
        timeout: total request timeout in seconds, passed to ``ClientTimeout``.
        max_connections: connection limit passed to ``TCPConnector``.
    """

    def __init__(self, timeout=30, max_connections=100):
        self.timeout = ClientTimeout(total=timeout)
        self.max_connections = max_connections
        # Created in __aenter__, i.e. inside the running event loop.
        self.connector = None
        self.session = None

    async def __aenter__(self):
        # A fresh connector per session: closing the session also closes the
        # connector it owns (aiohttp's default connector_owner=True), so a
        # connector built once in __init__ could not be reused on re-entry.
        self.connector = aiohttp.TCPConnector(limit=self.max_connections)
        self.session = ClientSession(
            timeout=self.timeout,
            connector=self.connector,
        )
        return self

    async def __aexit__(self, exc_type, exc_val, exc_tb):
        await self.session.close()
        self.session = None
        self.connector = None

    async def get(self, url, **kwargs):
        """Send a GET request and return the response body decoded as JSON."""
        async with self.session.get(url, **kwargs) as response:
            return await response.json()

    async def post(self, url, data=None, json=None, **kwargs):
        """Send a POST request and return the response body decoded as JSON."""
        async with self.session.post(url, data=data, json=json, **kwargs) as response:
            return await response.json()


async def api_client_example():
    """Issue one GET and one POST through ``AsyncHttpClient`` and print results."""
    async with AsyncHttpClient() as client:
        # GET request
        data = await client.get('https://api.github.com/users/python')
        print(f"Python GitHub followers: {data['followers']}")
        # POST request
        result = await client.post('https://httpbin.org/post', json={'key': 'value'})
        print(f"POST响应: {result['json']}")

# asyncio.run(api_client_example())
# 3、异步编程最佳实践
# 3.1、何时使用异步
适合异步的场景:
- I/O密集型操作(网络请求、数据库查询、文件读写)
- 需要并发处理大量连接
- 实时性要求高的应用
- WebSocket、聊天服务器、API网关
# ✅ 适合异步:网络爬虫
import asyncio
import aiohttp
async def crawl_urls(urls):
async with aiohttp.ClientSession() as session:
tasks = [fetch_url(session, url) for url in urls]
results = await asyncio.gather(*tasks)
return results
# ✅ 适合异步:WebSocket聊天服务器
async def handle_chat_messages(websocket):
async for message in websocket:
await broadcast_to_other_clients(message)
# ❌ 不适合异步:CPU密集型计算
async def heavy_computation():
# 这会阻塞事件循环
result = sum(i * i for i in range(10000000))
return result
不适合异步的场景:
- CPU密集型计算(数值计算、图像处理)
- 简单的脚本程序
- 同步第三方库的包装
# 3.2、异步与多线程的选择
异步 vs 多线程对比:
| 维度 | 异步 | 多线程 |
|---|---|---|
| 并发模型 | 单线程事件循环 | 多线程抢占式 |
| 内存占用 | 低(单线程) | 高(每个线程栈空间) |
| 上下文切换 | 快(用户态) | 慢(内核态) |
| 编程复杂度 | 高(需要异步思维) | 中等 |
| 调试难度 | 较高 | 中等 |
| 适用场景 | I/O密集型 | 阻塞操作(CPython中CPU密集型受GIL限制,应使用多进程) |
混合使用示例:
import asyncio
import concurrent.futures
import time
def cpu_intensive_task(n):
    """CPU-bound task: return the sum of squares of ``range(n)``.

    Runs in a worker process; it must stay a module-level function so the
    process pool can pickle a reference to it.
    """
    return sum(i * i for i in range(n))


async def async_io_task():
    """I/O-bound stand-in: wait one second without blocking the event loop."""
    await asyncio.sleep(1)
    return "I/O任务完成"


async def mixed_workload():
    """Run CPU-bound and I/O-bound work concurrently and print the timing."""
    start_time = time.time()
    # Inside a coroutine, get_running_loop() is the documented way to reach
    # the loop that is executing us.
    loop = asyncio.get_running_loop()
    # A process pool, not a thread pool: under the GIL, threads cannot run
    # CPU-bound Python code in parallel.
    with concurrent.futures.ProcessPoolExecutor(max_workers=4) as executor:
        tasks = [
            # CPU-bound tasks run in worker processes
            loop.run_in_executor(executor, cpu_intensive_task, 1000000),
            loop.run_in_executor(executor, cpu_intensive_task, 2000000),
            # I/O-bound tasks run on the event loop
            async_io_task(),
            async_io_task(),
        ]
        results = await asyncio.gather(*tasks)
    end_time = time.time()
    print(f"混合任务完成,耗时: {end_time - start_time:.2f}秒")
    print(f"结果: {results}")


# Guard the entry point: worker processes started with "spawn" re-import this
# module and must not start the workload again.
if __name__ == "__main__":
    asyncio.run(mixed_workload())
# 3.3、常见陷阱
陷阱1:在协程中调用阻塞函数
import asyncio
import time
# ❌ 错误:阻塞事件循环
async def bad_example():
    """Anti-pattern: a blocking call inside a coroutine."""
    time.sleep(2)  # blocks for 2 seconds; the whole event loop is stalled
    return "完成"


# ✅ Correct: use the asynchronous equivalent
async def good_example():
    """Wait two seconds while letting other coroutines run."""
    await asyncio.sleep(2)  # non-blocking wait
    return "完成"


# ✅ Or run the blocking call in a worker thread
async def blocking_in_thread():
    """Run the blocking ``time.sleep`` in a thread and await its completion."""
    loop = asyncio.get_running_loop()
    # ``None`` selects the loop's default thread pool, so no extra import and
    # no throwaway executor per call.
    await loop.run_in_executor(None, time.sleep, 2)
    return "完成"
陷阱2:忘记await
import asyncio
async def forget_await():
# ❌ 错误:忘记await,协程不会执行
asyncio.sleep(1)
print("这可能不会按预期执行")
async def correct_usage():
# ✅ 正确:使用await
await asyncio.sleep(1)
print("1秒后执行")
陷阱3:异常处理不当
import asyncio
async def task_with_error():
await asyncio.sleep(0.1)
raise ValueError("任务失败")
async def bad_error_handling():
# ❌ 错误:异常被忽略
asyncio.create_task(task_with_error())
async def good_error_handling():
# ✅ 正确:处理异常
task = asyncio.create_task(task_with_error())
try:
await task
except ValueError as e:
print(f"捕获到异常: {e}")
# ✅ 使用gather处理异常
async def gather_with_exceptions():
tasks = [
asyncio.create_task(task_with_error()),
asyncio.create_task(asyncio.sleep(1))
]
results = await asyncio.gather(*tasks, return_exceptions=True)
for i, result in enumerate(results):
if isinstance(result, Exception):
print(f"任务 {i} 失败: {result}")
else:
print(f"任务 {i} 成功: {result}")
陷阱4:过度使用异步
# ❌ 过度设计:简单的同步任务不需要异步
async def over_engineered():
result = await async_add(2, 3)
return result
# ✅ 简单直接
def simple():
return 2 + 3
最佳实践总结:
- 保持异步函数非阻塞:异步函数中可以调用普通同步函数,但不要直接调用会阻塞的函数(如time.sleep、同步的网络/文件I/O)
- 正确处理异常:使用try/except包装可能失败的操作
- 避免阻塞操作:将阻塞操作放到线程池中
- 合理并发数:控制同时进行的任务数量
- 使用连接池:复用数据库和HTTP连接
- 性能监控:使用工具监控异步程序性能
📖 系列文章导航
👈 上一篇:Java开发者转战Python:基础篇 - Python基础语法、数据结构、面向对象编程
👉 下一篇:Java开发者转战Python:生态与质量管理 - 探索Python生态系统和工程实践
祝你变得更强!