在数据驱动的时代,文件处理是编程中不可或缺的环节。Python 凭借简洁高效的语法和丰富的标准库,成为文件处理的首选语言之一。
1.7.1 文件打开模式
在 Python 中,使用内置函数open()打开文件,其基本语法为:
f = open("filename", "mode")
有四种打开模式:
| 模式 | 描述 |
|---|---|
"r" | 读取 - 默认值。打开文件进行读取,如果文件不存在则报错 |
"a" | 追加 - 打开供追加的文件,如果不存在则创建该文件 |
"w" | 写入 - 打开文件进行夫覆盖性写入,如果文件不存在则创建该文件 |
"x" | 创建 - 创建指定的文件,如果文件存在则返回错误 |
此外,您可以指定文件是应该作为二进制还是文本模式进行处理:
| 模式 | 描述 |
|---|---|
"t" | 文本 - 默认值。文本模式 |
"b" | 二进制 - 二进制模式(例如图像) |
示例:
# 默认模式(读取文本)
f = open("demofile.txt")
# 等同于
f = open("demofile.txt", "rt")
# 二进制模式
f = open("image.jpg", "rb")
1.7.2 文件的基本CRUD
1.7.2.1 文件读取
1. 读取整个文件
使用 read() 方法读取整个文件内容:
f = open("demofile.txt", "r")
print(f.read())
f.close()
Python 的面向对像编程
这里的 f 是一个文件对象(file object),由内置函数 open() 创建。read() 和 close() 是文件对象的方法,分别用于读取文件内容和关闭文件连接。
在 Python 中,一切皆对象。每个对象都有自己的属性(数据)和方法(函数),通过 . 操作符可以访问这些属性和方法:
- 属性:对象的状态或数据(例如
file.name表示文件名)。 - 方法:对象的行为或功能(例如
file.read()用于读取文件)。
这种设计模式称为封装(Encapsulation),它将数据和操作数据的函数捆绑在一起,提高了代码的安全性和可维护性。
2. 读取部分内容
可以指定要返回的字符数:
f = open("demofile.txt", "r")
print(f.read(5)) # 读取前5个字符
f.close()
3. 读取单行
使用 readline() 方法读取一行:
f = open("demofile.txt", "r")
print(f.readline()) # 读取第一行
f.close()
4. 逐行读取
通过循环遍历文件中的行:
f = open("demofile.txt", "r")
for x in f:
print(x)
f.close()
5. 读取所有行
使用 readlines() 方法读取所有行到列表中:
f = open("demofile.txt", "r")
lines = f.readlines()
print(lines)
f.close()
1.7.2.3 文件写入
1. 写入文件
使用 write() 方法写入内容:
f = open("demofile2.txt", "w")
f.write("Hello! 1111")
f.close()
f = open("demofile2.txt", "r")
print(f.read())
f.close()
2. 追加内容
使用 "a" 模式追加内容到文件末尾:
f = open("demofile2.txt", "a")
f.write("Now the file has more content!")
f.close()
1.7.2.4 文件创建
使用 "x" 模式创建新文件:
f = open("myfile.txt", "x")
f.close()
注意: 如果文件已存在,将返回错误。
1.7.2.5 文件删除
1. 删除文件
要删除文件,必须导入OS模块,并运行其 os.remove() 函数:
import os
os.remove("demofile.txt")
2. 检查文件是否存在
在删除文件之前检查文件是否存在:
import os
if os.path.exists("demofile.txt"):
os.remove("demofile.txt")
else:
print("The file does not exist")
3. 删除文件夹
使用 os.rmdir() 方法删除文件夹:
import os
os.rmdir("myfolder")
注意: 只能删除空文件夹。
1.7.3 文件处理进阶
1.7.3.1 使用 with 语句
手动调用close()方法容易遗漏,可能导致资源泄漏或数据丢失。with语句通过上下文管理器自动管理文件生命周期:
# 读取文件
with open("demofile.txt", "r") as f:
print(f.read())
# 写入文件
with open("demofile2.txt", "w") as f:
f.write("Hello World!")
1.7.3.2 异常处理
文件操作可能遇到多种异常,如文件不存在、权限不足等。使用try-except语句捕获异常:
try:
with open("demofile.txt", "r") as f:
print(f.read())
except FileNotFoundError:
print("文件不存在")
except PermissionError:
print("没有权限访问文件")
1.7.3.3 文件编码
指定文件编码(特别是处理中文时):
# UTF-8编码
with open("demofile.txt", "r", encoding="utf-8") as f:
print(f.read())
# 写入时指定编码
with open("demofile.txt", "w", encoding="utf-8") as f:
f.write("你好,世界!")
1.7.3.4 标准库的使用
如 CSV、JSON,具体使用参考下文。
1.7.4 实际应用示例
1. 配置文件读写
# 读取配置文件
def read_config(filename):
config = {}
try:
with open(filename, "r", encoding="utf-8") as f:
for line in f:
if "=" in line:
key, value = line.strip().split("=", 1)
config[key] = value
except FileNotFoundError:
print(f"配置文件 {filename} 不存在")
return config
# 写入配置文件
def write_config(filename, config):
with open(filename, "w", encoding="utf-8") as f:
for key, value in config.items():
f.write(f"{key}={value}\n")
2. 日志文件操作
import datetime
def write_log(message):
timestamp = datetime.datetime.now().strftime("%Y-%m-%d %H:%M:%S")
with open("app.log", "a", encoding="utf-8") as f:
f.write(f"[{timestamp}] {message}\n")
# 使用示例
write_log("应用程序启动")
write_log("用户登录成功")
3. 数据备份
import shutil
import os
def backup_file(source, backup_dir):
if not os.path.exists(backup_dir):
os.makedirs(backup_dir)
filename = os.path.basename(source)
backup_path = os.path.join(backup_dir, f"backup_{filename}")
shutil.copy2(source, backup_path)
print(f"文件已备份到: {backup_path}")
1.7.5 文件处理高级-CSV
CSV(逗号分隔值)是常见的数据存储格式,广泛用于数据分析和数据交换。Python 的 csv 模块提供了高效且安全的处理方式。
1.7.5.1 基础 CSV 操作
import datetime
import csv
def write_log(message):
timestamp = datetime.datetime.now().strftime("%Y-%m-%d %H:%M:%S")
with open("app.log", "a", encoding="utf-8") as f:
f.write(f"[{timestamp}] {message}\n")
# 使用示例
write_log("应用程序启动")
write_log("用户登录成功")
# 写入CSV文件
with open('data.csv', 'w', newline='', encoding='utf-8') as csvfile:
writer = csv.writer(csvfile)
# 写入表头
writer.writerow(['姓名', '年龄', '城市'])
# 写入数据行
writer.writerow(['Alice', 25, '北京'])
writer.writerow(['Bob', 30, '上海'])
writer.writerow(['Charlie', 35, '广州'])
# 读取CSV文件
with open('data.csv', 'r', encoding='utf-8') as csvfile:
reader = csv.reader(csvfile)
for row in reader:
print(row)
1.7.5.2 使用字典读写 CSV
import csv
# 使用字典读写CSV(更推荐的方式)
data = [
{'姓名': 'Alice', '年龄': 25, '城市': '北京'},
{'姓名': 'Bob', '年龄': 30, '城市': '上海'},
{'姓名': 'Charlie', '年龄': 35, '城市': '广州'}
]
# 使用字典写入CSV
with open('data_dict.csv', 'w', newline='', encoding='utf-8') as csvfile:
fieldnames = ['姓名', '年龄', '城市']
writer = csv.DictWriter(csvfile, fieldnames=fieldnames)
writer.writeheader() # 写入表头
writer.writerows(data) # 写入所有数据
# 使用字典读取CSV
with open('data_dict.csv', 'r', encoding='utf-8') as csvfile:
reader = csv.DictReader(csvfile)
for row in reader:
print(f"{row['姓名']} - {row['年龄']}岁 - {row['城市']}")
1.7.5.3 CSV 处理最佳实践
import csv
import os
def safe_csv_operation(filename, operation):
"""安全的CSV操作,包含错误处理
Args:
filename (str): 要操作的CSV文件路径
operation (function): 处理csvfile的函数
Returns:
Any: operation函数的返回值
"""
try:
# 检查并创建目录
file_dir = os.path.dirname(filename)
if file_dir and not os.path.exists(file_dir):
os.makedirs(file_dir)
with open(filename, 'w', newline='', encoding='utf-8') as csvfile:
return operation(csvfile)
except PermissionError:
print(f"权限错误:无法访问文件 {filename}")
except Exception as e:
print(f"操作失败:{e}")
# 使用示例
def write_csv_data(csvfile):
writer = csv.writer(csvfile)
writer.writerow(['ID', 'Name', 'Value'])
writer.writerow([1, 'Item1', 100])
writer.writerow([2, 'Item2', 200])
if __name__ == "__main__":
safe_csv_operation('output/data.csv', write_csv_data)
1.7.6 文件处理高级-JSON
JSON(JavaScript Object Notation)是现代应用程序中最常用的数据交换格式。Python 的 json 模块提供了完整的 JSON 序列化和反序列化功能。
1.7.6.1 基础 JSON 操作
import json
# 写入JSON文件
data = {
'name': 'Bob',
'age': 30,
'city': '上海',
'skills': ['Python', 'Docker', 'Kubernetes'],
'active': True
}
with open('config.json', 'w', encoding='utf-8') as f:
json.dump(data, f, ensure_ascii=False, indent=2)
# 读取JSON文件
with open('config.json', 'r', encoding='utf-8') as f:
loaded_data = json.load(f)
print(loaded_data)
1.7.6.2 高级 JSON 处理
import json
from datetime import datetime
class DateTimeEncoder(json.JSONEncoder):
"""自定义JSON编码器,处理datetime对象"""
def default(self, obj):
if isinstance(obj, datetime):
return obj.isoformat()
return super().default(obj)
# 包含datetime的复杂数据结构
complex_data = {
'timestamp': datetime.now(),
'user': {
'id': 12345,
'name': '张三',
'email': 'zhangsan@example.com'
},
'settings': {
'theme': 'dark',
'language': 'zh-CN',
'notifications': True
}
}
# 使用自定义编码器写入JSON
with open('complex_config.json', 'w', encoding='utf-8') as f:
json.dump(complex_data, f, ensure_ascii=False, indent=2, cls=DateTimeEncoder)
# 读取并验证JSON
def load_json_safely(filename):
"""安全地加载JSON文件,包含验证
Args:
filename (str): JSON文件路径
Returns:
dict: 解析后的JSON数据或None
"""
try:
with open(filename, 'r', encoding='utf-8') as f:
data = json.load(f)
print("JSON文件加载成功")
return data
except json.JSONDecodeError as e:
print(f"JSON格式错误:{e}")
return None
except FileNotFoundError:
print(f"文件不存在:{filename}")
return None
except Exception as e:
print(f"读取失败:{e}")
return None
loaded_config = load_json_safely('complex_config.json')
1.7.6.3 JSON 配置文件管理
import json
import os
from typing import Dict, Any
class ConfigManager:
"""配置文件管理器"""
def __init__(self, config_file: str):
self.config_file = config_file
self.config = {}
self.load_config()
def load_config(self):
"""加载配置文件"""
if os.path.exists(self.config_file):
try:
with open(self.config_file, 'r', encoding='utf-8') as f:
self.config = json.load(f)
except Exception as e:
print(f"加载配置文件失败:{e}")
self.config = {}
else:
self.config = {}
def save_config(self):
"""保存配置文件"""
try:
with open(self.config_file, 'w', encoding='utf-8') as f:
json.dump(self.config, f, ensure_ascii=False, indent=2)
print("配置文件保存成功")
except Exception as e:
print(f"保存配置文件失败:{e}")
def get(self, key: str, default: Any = None) -> Any:
"""获取配置值
Args:
key: 配置键名
default: 缺省值(默认None)
Returns:
配置值或默认值
"""
return self.config.get(key, default)
def set(self, key: str, value: Any) -> None:
"""设置配置值
Args:
key: 配置键名
value: 配置值
"""
self.config[key] = value
# 使用示例
config = ConfigManager('app_config.json')
config.set('database', {
'host': 'localhost',
'port': 5432,
'name': 'myapp'
})
config.set('logging', {
'level': 'INFO',
'file': 'app.log'
})
config.save_config()
1.7.7 文件处理高级-二进制
二进制文件包括图片、音频、视频、可执行文件等。处理这类文件需要使用二进制模式,并注意内存使用和性能优化。
1.7.7.1 基础二进制文件操作
def copy_binary_file(source_path: str, target_path: str, chunk_size: int = 8192) -> bool:
"""复制二进制文件,使用分块读取避免内存溢出
Args:
source_path: 源文件路径
target_path: 目标文件路径
chunk_size: 每次读取的块大小(默认8KB)
Returns:
复制成功返回True,失败返回False
"""
try:
with open(source_path, 'rb') as source:
with open(target_path, 'wb') as target:
while True:
chunk = source.read(chunk_size)
if not chunk:
break
target.write(chunk)
print(f"文件复制成功:{source_path} -> {target_path}")
return True
except FileNotFoundError:
print(f"源文件不存在:{source_path}")
return False
except PermissionError:
print(f"权限不足,无法访问文件")
return False
except Exception as e:
print(f"复制失败:{e}")
return False
# 使用示例
copy_binary_file('source.jpg', 'copy.jpg')
1.7.7.2 二进制文件校验
import hashlib
def calculate_file_hash(file_path: str, algorithm: str = 'md5') -> str:
"""计算文件的哈希值,用于文件完整性验证
Args:
file_path: 文件路径
algorithm: 哈希算法(默认md5)
Returns:
文件的哈希值字符串,失败时返回空字符串
"""
hash_func = getattr(hashlib, algorithm)()
try:
with open(file_path, 'rb') as f:
for chunk in iter(lambda: f.read(4096), b""):
hash_func.update(chunk)
return hash_func.hexdigest()
except Exception as e:
print(f"计算哈希值失败:{e}")
return ""
def verify_file_integrity(file_path: str, expected_hash: str, algorithm: str = 'md5') -> bool:
"""验证文件完整性
Args:
file_path: 文件路径
expected_hash: 期望的哈希值
algorithm: 哈希算法(默认md5)
Returns:
验证通过返回True,失败返回False
"""
actual_hash = calculate_file_hash(file_path, algorithm)
if actual_hash == expected_hash:
print("文件完整性验证通过")
return True
else:
print(f"文件完整性验证失败:期望 {expected_hash},实际 {actual_hash}")
return False
# 使用示例
file_hash = calculate_file_hash('source.jpg')
print(f"文件哈希值:{file_hash}")
verify_file_integrity('copy.jpg', file_hash)
1.7.7.3 大文件处理优化
import os
from typing import Generator
def read_large_file(file_path: str, chunk_size: int = 1024*1024) -> Generator[bytes, None, None]:
"""生成器函数,用于逐块读取大文件
Args:
file_path: 文件路径
chunk_size: 块大小(默认1MB)
Yields:
文件块的字节数据
"""
try:
with open(file_path, 'rb') as f:
while True:
chunk = f.read(chunk_size)
if not chunk:
break
yield chunk
except Exception as e:
print(f"读取文件失败:{e}")
def process_large_file(file_path: str, output_path: str) -> None:
"""处理大文件的示例(这里以简单复制为例)
Args:
file_path: 输入文件路径
output_path: 输出文件路径
"""
try:
with open(output_path, 'wb') as output:
for chunk in read_large_file(file_path):
# 在这里可以对chunk进行处理
# 例如:数据转换、过滤、压缩等
output.write(chunk)
print(f"大文件处理完成:{file_path} -> {output_path}")
except Exception as e:
print(f"处理失败:{e}")
def get_file_info(file_path: str) -> dict:
"""获取文件的基本信息
Args:
file_path: 文件路径
Returns:
包含文件信息的字典
"""
try:
stat = os.stat(file_path)
return {
'size': stat.st_size,
'size_mb': round(stat.st_size / (1024*1024), 2),
'modified': stat.st_mtime,
'created': stat.st_ctime
}
except Exception as e:
print(f"获取文件信息失败:{e}")
return {}
# 使用示例
file_info = get_file_info('large_file.dat')
print(f"文件大小:{file_info.get('size_mb', 0)} MB")
1.7.8 总结与运维实践
1.7.8.1 总结
- 使用
open()函数打开文件,指定正确的模式 - 使用
with语句:自动管理文件关闭,避免资源泄漏。 - 指定编码,特别是处理中文内容时
- 处理异常:对可能出现的错误进行捕获,提高程序稳定性。
- 选择合适的模式:读取用
"r",写入用"w",追加用"a" - 分块读取大文件:避免一次性加载过多数据,降低内存占用。
- 使用
os模块进行文件和目录的删除操作 - 善用标准库:针对 CSV、JSON 等特定格式,使用对应模块提高效率。
- 使用适当的缓冲区大小:对于大文件,使用合适的chunk_size
- 避免重复打开文件:使用上下文管理器确保资源正确释放
- 使用生成器处理大文件:避免将整个文件加载到内存
- 选择合适的编码:明确指定文件编码,避免编码问题
- 使用pathlib:现代Python推荐使用pathlib进行路径操作
- 异步处理:对于I/O密集型操作,考虑使用异步编程
示例:
import contextlib
import os
import asyncio
from pathlib import Path
from typing import Generator
# --------------------- 错误处理和资源管理 ---------------------
@contextlib.contextmanager
def safe_file_operation(file_path: str, mode: str = 'r', encoding: str = 'utf-8'):
"""安全的文件操作上下文管理器(自动处理异常和关闭)
Args:
file_path: 文件路径
mode: 打开模式(默认'r')
encoding: 文本编码(二进制模式时忽略)
Yields:
文件对象或None(操作失败时)
"""
file_obj = None
try:
if 'b' in mode:
file_obj = open(file_path, mode)
else:
file_obj = open(file_path, mode, encoding=encoding)
yield file_obj
except FileNotFoundError:
print(f"文件不存在:{file_path}")
yield None
except PermissionError:
print(f"权限不足:{file_path}")
yield None
except Exception as e:
print(f"文件操作失败:{e}")
yield None
finally:
if file_obj:
file_obj.close()
# ------------------------- 路径处理 -------------------------
def ensure_directory(file_path: str) -> bool:
"""确保文件所在目录存在(递归创建)
Args:
file_path: 目标文件路径
Returns:
目录创建成功返回True,失败返回False
"""
try:
Path(file_path).parent.mkdir(parents=True, exist_ok=True)
return True
except Exception as e:
print(f"创建目录失败:{e}")
return False
def get_safe_filename(filename: str) -> str:
"""生成安全文件名(移除Windows非法字符)
Args:
filename: 原始文件名
Returns:
替换非法字符后的安全文件名
"""
unsafe_chars = '<>:"/\\|?*'
for char in unsafe_chars:
filename = filename.replace(char, '_')
return filename
def backup_file(file_path: str, backup_suffix: str = '.bak') -> str:
"""创建文件备份(需提前定义copy_binary_file函数)
Args:
file_path: 源文件路径
backup_suffix: 备份后缀(默认'.bak')
Returns:
备份文件路径,失败时返回空字符串
"""
try:
backup_path = file_path + backup_suffix
if os.path.exists(file_path):
# 示例:需提前实现二进制复制函数
# copy_binary_file(file_path, backup_path)
print(f"已备份至:{backup_path}")
return backup_path
except Exception as e:
print(f"备份失败:{e}")
return ""
# ------------------------- 异步优化 -------------------------
async def async_read_file(file_path: str) -> str:
"""异步读取文件(适用于高I/O场景)
Args:
file_path: 文件路径
Returns:
文件内容,失败时返回空字符串
"""
try:
async with aiofiles.open(file_path, 'r', encoding='utf-8') as f:
content = await f.read()
return content
except Exception as e:
print(f"异步读取失败:{e}")
return ""
# ------------------------- 使用示例 -------------------------
if __name__ == "__main__":
# 1. 错误处理示例
with safe_file_operation('data/test.txt', 'w') as f:
if f:
f.write("测试内容")
print("文件写入成功")
# 2. 路径处理示例
safe_name = get_safe_filename("报告:2025/季度.pdf")
print(f"安全文件名:{safe_name}")
if ensure_directory("output/report/2025Q2"):
print("目录结构创建成功")
# 3. 异步操作示例(需Python 3.7+)
async def demo_async():
content = await async_read_file('data/large.txt')
if content:
print(f"异步读取完成,内容长度:{len(content)}")
asyncio.run(demo_async())
1.7.8.2 运维实践
分析日志,待补充
评论区