侧边栏壁纸
博主头像
路小飞博主等级

行动起来,活在当下

  • 累计撰写 72 篇文章
  • 累计创建 12 个标签
  • 累计收到 0 条评论

目 录CONTENT

文章目录

1.7 Python 文件处理

路小飞
2025-06-25 / 0 评论 / 0 点赞 / 5 阅读 / 21587 字

在数据驱动的时代,文件处理是编程中不可或缺的环节。Python 凭借简洁高效的语法和丰富的标准库,成为文件处理的首选语言之一。

1.7.1 文件打开模式

在 Python 中,使用内置函数open()打开文件,其基本语法为:

f = open("filename", "mode")

有四种打开模式:

模式描述
"r"读取 - 默认值。打开文件进行读取,如果文件不存在则报错
"a"追加 - 打开供追加的文件,如果不存在则创建该文件
"w"写入 - 打开文件进行夫覆盖性写入,如果文件不存在则创建该文件
"x"创建 - 创建指定的文件,如果文件存在则返回错误

此外,您可以指定文件是应该作为二进制还是文本模式进行处理:

模式描述
"t"文本 - 默认值。文本模式
"b"二进制 - 二进制模式(例如图像)

示例:

# 默认模式(读取文本)
f = open("demofile.txt")

# 等同于
f = open("demofile.txt", "rt")

# 二进制模式
f = open("image.jpg", "rb")

1.7.2 文件的基本CRUD

1.7.2.1 文件读取

1. 读取整个文件

使用 read() 方法读取整个文件内容:

f = open("demofile.txt", "r")
print(f.read())
f.close()

Python 的面向对像编程

这里的 f 是一个文件对象file object),由内置函数 open() 创建。read()close() 是文件对象的方法,分别用于读取文件内容和关闭文件连接。

在 Python 中,一切皆对象。每个对象都有自己的属性(数据)和方法(函数),通过 . 操作符可以访问这些属性和方法:

  • 属性:对象的状态或数据(例如 file.name 表示文件名)。
  • 方法:对象的行为或功能(例如 file.read() 用于读取文件)。

这种设计模式称为封装(Encapsulation),它将数据和操作数据的函数捆绑在一起,提高了代码的安全性和可维护性。

2. 读取部分内容

可以指定要返回的字符数:

f = open("demofile.txt", "r")
print(f.read(5))  # 读取前5个字符
f.close()
3. 读取单行

使用 readline() 方法读取一行:

f = open("demofile.txt", "r")
print(f.readline())  # 读取第一行
f.close()
4. 逐行读取

通过循环遍历文件中的行:

f = open("demofile.txt", "r")
for x in f:
    print(x)
f.close()
5. 读取所有行

使用 readlines() 方法读取所有行到列表中:

f = open("demofile.txt", "r")
lines = f.readlines()
print(lines)
f.close()

1.7.2.3 文件写入

1. 写入文件

使用 write() 方法写入内容:

f = open("demofile2.txt", "w")
f.write("Hello! 1111")
f.close()

f = open("demofile2.txt", "r")
print(f.read())
f.close()
2. 追加内容

使用 "a" 模式追加内容到文件末尾:

f = open("demofile2.txt", "a")
f.write("Now the file has more content!")
f.close()

1.7.2.4 文件创建

使用 "x" 模式创建新文件:

f = open("myfile.txt", "x")
f.close()

注意: 如果文件已存在,将返回错误。

1.7.2.5 文件删除

1. 删除文件

要删除文件,必须导入OS模块,并运行其 os.remove() 函数:

import os
os.remove("demofile.txt")
2. 检查文件是否存在

在删除文件之前检查文件是否存在:

import os
if os.path.exists("demofile.txt"):
    os.remove("demofile.txt")
else:
    print("The file does not exist")
3. 删除文件夹

使用 os.rmdir() 方法删除文件夹:

import os
os.rmdir("myfolder")

注意: 只能删除空文件夹。

1.7.3 文件处理进阶

1.7.3.1 使用 with 语句

手动调用close()方法容易遗漏,可能导致资源泄漏或数据丢失。with语句通过上下文管理器自动管理文件生命周期:

# 读取文件
with open("demofile.txt", "r") as f:
    print(f.read())

# 写入文件
with open("demofile2.txt", "w") as f:
    f.write("Hello World!")

1.7.3.2 异常处理

文件操作可能遇到多种异常,如文件不存在、权限不足等。使用try-except语句捕获异常:

try:
    with open("demofile.txt", "r") as f:
        print(f.read())
except FileNotFoundError:
    print("文件不存在")
except PermissionError:
    print("没有权限访问文件")

1.7.3.3 文件编码

指定文件编码(特别是处理中文时):

# UTF-8编码
with open("demofile.txt", "r", encoding="utf-8") as f:
    print(f.read())

# 写入时指定编码
with open("demofile.txt", "w", encoding="utf-8") as f:
    f.write("你好,世界!")

1.7.3.4 标准库的使用

如 CSV、JSON,具体使用参考下文。

1.7.4 实际应用示例

1. 配置文件读写
# 读取配置文件
def read_config(filename):
    config = {}
    try:
        with open(filename, "r", encoding="utf-8") as f:
            for line in f:
                if "=" in line:
                    key, value = line.strip().split("=", 1)
                    config[key] = value
    except FileNotFoundError:
        print(f"配置文件 {filename} 不存在")
    return config

# 写入配置文件
def write_config(filename, config):
    with open(filename, "w", encoding="utf-8") as f:
        for key, value in config.items():
            f.write(f"{key}={value}\n")
2. 日志文件操作
import datetime

def write_log(message):
    timestamp = datetime.datetime.now().strftime("%Y-%m-%d %H:%M:%S")
    with open("app.log", "a", encoding="utf-8") as f:
        f.write(f"[{timestamp}] {message}\n")

# 使用示例
write_log("应用程序启动")
write_log("用户登录成功")
3. 数据备份
import shutil
import os

def backup_file(source, backup_dir):
    if not os.path.exists(backup_dir):
        os.makedirs(backup_dir)
    
    filename = os.path.basename(source)
    backup_path = os.path.join(backup_dir, f"backup_{filename}")
    
    shutil.copy2(source, backup_path)
    print(f"文件已备份到: {backup_path}")

1.7.5 文件处理高级-CSV

CSV(逗号分隔值)是常见的数据存储格式,广泛用于数据分析和数据交换。Python 的 csv 模块提供了高效且安全的处理方式。

1.7.5.1 基础 CSV 操作

import datetime
import csv

def write_log(message):
    timestamp = datetime.datetime.now().strftime("%Y-%m-%d %H:%M:%S")
    with open("app.log", "a", encoding="utf-8") as f:
        f.write(f"[{timestamp}] {message}\n")

# 使用示例
write_log("应用程序启动")
write_log("用户登录成功")

# 写入CSV文件
with open('data.csv', 'w', newline='', encoding='utf-8') as csvfile:
    writer = csv.writer(csvfile)
    # 写入表头
    writer.writerow(['姓名', '年龄', '城市'])
    # 写入数据行
    writer.writerow(['Alice', 25, '北京'])
    writer.writerow(['Bob', 30, '上海'])
    writer.writerow(['Charlie', 35, '广州'])

# 读取CSV文件
with open('data.csv', 'r', encoding='utf-8') as csvfile:
    reader = csv.reader(csvfile)
    for row in reader:
        print(row)

1.7.5.2 使用字典读写 CSV

import csv

# 使用字典读写CSV(更推荐的方式)
data = [
    {'姓名': 'Alice', '年龄': 25, '城市': '北京'},
    {'姓名': 'Bob', '年龄': 30, '城市': '上海'},
    {'姓名': 'Charlie', '年龄': 35, '城市': '广州'}
]

# 使用字典写入CSV
with open('data_dict.csv', 'w', newline='', encoding='utf-8') as csvfile:
    fieldnames = ['姓名', '年龄', '城市']
    writer = csv.DictWriter(csvfile, fieldnames=fieldnames)
    
    writer.writeheader()  # 写入表头
    writer.writerows(data)  # 写入所有数据

# 使用字典读取CSV
with open('data_dict.csv', 'r', encoding='utf-8') as csvfile:
    reader = csv.DictReader(csvfile)
    for row in reader:
        print(f"{row['姓名']} - {row['年龄']}岁 - {row['城市']}")

1.7.5.3 CSV 处理最佳实践

import csv
import os

def safe_csv_operation(filename, operation):
    """安全的CSV操作,包含错误处理
    
    Args:
        filename (str): 要操作的CSV文件路径
        operation (function): 处理csvfile的函数
        
    Returns:
        Any: operation函数的返回值
    """
    try:
        # 检查并创建目录
        file_dir = os.path.dirname(filename)
        if file_dir and not os.path.exists(file_dir):
            os.makedirs(file_dir)
            
        with open(filename, 'w', newline='', encoding='utf-8') as csvfile:
            return operation(csvfile)
            
    except PermissionError:
        print(f"权限错误:无法访问文件 {filename}")
    except Exception as e:
        print(f"操作失败:{e}")

# 使用示例
def write_csv_data(csvfile):
    writer = csv.writer(csvfile)
    writer.writerow(['ID', 'Name', 'Value'])
    writer.writerow([1, 'Item1', 100])
    writer.writerow([2, 'Item2', 200])

if __name__ == "__main__":
    safe_csv_operation('output/data.csv', write_csv_data)

1.7.6 文件处理高级-JSON

JSON(JavaScript Object Notation)是现代应用程序中最常用的数据交换格式。Python 的 json 模块提供了完整的 JSON 序列化和反序列化功能。

1.7.6.1 基础 JSON 操作

import json

# 写入JSON文件
data = {
    'name': 'Bob',
    'age': 30,
    'city': '上海',
    'skills': ['Python', 'Docker', 'Kubernetes'],
    'active': True
}

with open('config.json', 'w', encoding='utf-8') as f:
    json.dump(data, f, ensure_ascii=False, indent=2)

# 读取JSON文件
with open('config.json', 'r', encoding='utf-8') as f:
    loaded_data = json.load(f)
    print(loaded_data)

1.7.6.2 高级 JSON 处理

import json
from datetime import datetime

class DateTimeEncoder(json.JSONEncoder):
    """自定义JSON编码器,处理datetime对象"""
    def default(self, obj):
        if isinstance(obj, datetime):
            return obj.isoformat()
        return super().default(obj)

# 包含datetime的复杂数据结构
complex_data = {
    'timestamp': datetime.now(),
    'user': {
        'id': 12345,
        'name': '张三',
        'email': 'zhangsan@example.com'
    },
    'settings': {
        'theme': 'dark',
        'language': 'zh-CN',
        'notifications': True
    }
}

# 使用自定义编码器写入JSON
with open('complex_config.json', 'w', encoding='utf-8') as f:
    json.dump(complex_data, f, ensure_ascii=False, indent=2, cls=DateTimeEncoder)

# 读取并验证JSON
def load_json_safely(filename):
    """安全地加载JSON文件,包含验证
    
    Args:
        filename (str): JSON文件路径
        
    Returns:
        dict: 解析后的JSON数据或None
    """
    try:
        with open(filename, 'r', encoding='utf-8') as f:
            data = json.load(f)
            print("JSON文件加载成功")
            return data
    except json.JSONDecodeError as e:
        print(f"JSON格式错误:{e}")
        return None
    except FileNotFoundError:
        print(f"文件不存在:{filename}")
        return None
    except Exception as e:
        print(f"读取失败:{e}")
        return None

loaded_config = load_json_safely('complex_config.json')

1.7.6.3 JSON 配置文件管理

import json
import os
from typing import Dict, Any

class ConfigManager:
    """配置文件管理器"""
    
    def __init__(self, config_file: str):
        self.config_file = config_file
        self.config = {}
        self.load_config()
    
    def load_config(self):
        """加载配置文件"""
        if os.path.exists(self.config_file):
            try:
                with open(self.config_file, 'r', encoding='utf-8') as f:
                    self.config = json.load(f)
            except Exception as e:
                print(f"加载配置文件失败:{e}")
                self.config = {}
        else:
            self.config = {}
    
    def save_config(self):
        """保存配置文件"""
        try:
            with open(self.config_file, 'w', encoding='utf-8') as f:
                json.dump(self.config, f, ensure_ascii=False, indent=2)
            print("配置文件保存成功")
        except Exception as e:
            print(f"保存配置文件失败:{e}")
    
    def get(self, key: str, default: Any = None) -> Any:
        """获取配置值
        
        Args:
            key: 配置键名
            default: 缺省值(默认None)
            
        Returns:
            配置值或默认值
        """
        return self.config.get(key, default)
    
    def set(self, key: str, value: Any) -> None:
        """设置配置值
        
        Args:
            key: 配置键名
            value: 配置值
        """
        self.config[key] = value

# 使用示例
config = ConfigManager('app_config.json')
config.set('database', {
    'host': 'localhost',
    'port': 5432,
    'name': 'myapp'
})
config.set('logging', {
    'level': 'INFO',
    'file': 'app.log'
})
config.save_config()

1.7.7 文件处理高级-二进制

二进制文件包括图片、音频、视频、可执行文件等。处理这类文件需要使用二进制模式,并注意内存使用和性能优化。

1.7.7.1 基础二进制文件操作

def copy_binary_file(source_path: str, target_path: str, chunk_size: int = 8192) -> bool:
    """复制二进制文件,使用分块读取避免内存溢出
    
    Args:
        source_path: 源文件路径
        target_path: 目标文件路径
        chunk_size: 每次读取的块大小(默认8KB)
        
    Returns:
        复制成功返回True,失败返回False
    """
    try:
        with open(source_path, 'rb') as source:
            with open(target_path, 'wb') as target:
                while True:
                    chunk = source.read(chunk_size)
                    if not chunk:
                        break
                    target.write(chunk)
        print(f"文件复制成功:{source_path} -> {target_path}")
        return True
    except FileNotFoundError:
        print(f"源文件不存在:{source_path}")
        return False
    except PermissionError:
        print(f"权限不足,无法访问文件")
        return False
    except Exception as e:
        print(f"复制失败:{e}")
        return False

# 使用示例
copy_binary_file('source.jpg', 'copy.jpg')

1.7.7.2 二进制文件校验

import hashlib

def calculate_file_hash(file_path: str, algorithm: str = 'md5') -> str:
    """计算文件的哈希值,用于文件完整性验证
    
    Args:
        file_path: 文件路径
        algorithm: 哈希算法(默认md5)
        
    Returns:
        文件的哈希值字符串,失败时返回空字符串
    """
    hash_func = getattr(hashlib, algorithm)()
    
    try:
        with open(file_path, 'rb') as f:
            for chunk in iter(lambda: f.read(4096), b""):
                hash_func.update(chunk)
        return hash_func.hexdigest()
    except Exception as e:
        print(f"计算哈希值失败:{e}")
        return ""

def verify_file_integrity(file_path: str, expected_hash: str, algorithm: str = 'md5') -> bool:
    """验证文件完整性
    
    Args:
        file_path: 文件路径
        expected_hash: 期望的哈希值
        algorithm: 哈希算法(默认md5)
        
    Returns:
        验证通过返回True,失败返回False
    """
    actual_hash = calculate_file_hash(file_path, algorithm)
    if actual_hash == expected_hash:
        print("文件完整性验证通过")
        return True
    else:
        print(f"文件完整性验证失败:期望 {expected_hash},实际 {actual_hash}")
        return False

# 使用示例
file_hash = calculate_file_hash('source.jpg')
print(f"文件哈希值:{file_hash}")
verify_file_integrity('copy.jpg', file_hash)

1.7.7.3 大文件处理优化

import os
from typing import Generator

def read_large_file(file_path: str, chunk_size: int = 1024*1024) -> Generator[bytes, None, None]:
    """生成器函数,用于逐块读取大文件
    
    Args:
        file_path: 文件路径
        chunk_size: 块大小(默认1MB)
        
    Yields:
        文件块的字节数据
    """
    try:
        with open(file_path, 'rb') as f:
            while True:
                chunk = f.read(chunk_size)
                if not chunk:
                    break
                yield chunk
    except Exception as e:
        print(f"读取文件失败:{e}")

def process_large_file(file_path: str, output_path: str) -> None:
    """处理大文件的示例(这里以简单复制为例)
    
    Args:
        file_path: 输入文件路径
        output_path: 输出文件路径
    """
    try:
        with open(output_path, 'wb') as output:
            for chunk in read_large_file(file_path):
                # 在这里可以对chunk进行处理
                # 例如:数据转换、过滤、压缩等
                output.write(chunk)
        print(f"大文件处理完成:{file_path} -> {output_path}")
    except Exception as e:
        print(f"处理失败:{e}")

def get_file_info(file_path: str) -> dict:
    """获取文件的基本信息
    
    Args:
        file_path: 文件路径
        
    Returns:
        包含文件信息的字典
    """
    try:
        stat = os.stat(file_path)
        return {
            'size': stat.st_size,
            'size_mb': round(stat.st_size / (1024*1024), 2),
            'modified': stat.st_mtime,
            'created': stat.st_ctime
        }
    except Exception as e:
        print(f"获取文件信息失败:{e}")
        return {}

# 使用示例
file_info = get_file_info('large_file.dat')
print(f"文件大小:{file_info.get('size_mb', 0)} MB")

1.7.8 总结与运维实践

1.7.8.1 总结

  1. 使用 open() 函数打开文件,指定正确的模式
  2. 使用with语句:自动管理文件关闭,避免资源泄漏。
  3. 指定编码,特别是处理中文内容时
  4. 处理异常:对可能出现的错误进行捕获,提高程序稳定性。
  5. 选择合适的模式:读取用 "r",写入用 "w",追加用 "a"
  6. 分块读取大文件:避免一次性加载过多数据,降低内存占用。
  7. 使用 os 模块进行文件和目录的删除操作
  8. 善用标准库:针对 CSV、JSON 等特定格式,使用对应模块提高效率。
  9. 使用适当的缓冲区大小:对于大文件,使用合适的chunk_size
  10. 避免重复打开文件:使用上下文管理器确保资源正确释放
  11. 使用生成器处理大文件:避免将整个文件加载到内存
  12. 选择合适的编码:明确指定文件编码,避免编码问题
  13. 使用pathlib:现代Python推荐使用pathlib进行路径操作
  14. 异步处理:对于I/O密集型操作,考虑使用异步编程

示例:

import contextlib
import os
import asyncio
from pathlib import Path
from typing import Generator

# --------------------- 错误处理和资源管理 ---------------------
@contextlib.contextmanager
def safe_file_operation(file_path: str, mode: str = 'r', encoding: str = 'utf-8'):
    """安全的文件操作上下文管理器(自动处理异常和关闭)
    
    Args:
        file_path: 文件路径
        mode: 打开模式(默认'r')
        encoding: 文本编码(二进制模式时忽略)
        
    Yields:
        文件对象或None(操作失败时)
    """
    file_obj = None
    try:
        if 'b' in mode:
            file_obj = open(file_path, mode)
        else:
            file_obj = open(file_path, mode, encoding=encoding)
        yield file_obj
    except FileNotFoundError:
        print(f"文件不存在:{file_path}")
        yield None
    except PermissionError:
        print(f"权限不足:{file_path}")
        yield None
    except Exception as e:
        print(f"文件操作失败:{e}")
        yield None
    finally:
        if file_obj:
            file_obj.close()

# ------------------------- 路径处理 -------------------------
def ensure_directory(file_path: str) -> bool:
    """确保文件所在目录存在(递归创建)
    
    Args:
        file_path: 目标文件路径
        
    Returns:
        目录创建成功返回True,失败返回False
    """
    try:
        Path(file_path).parent.mkdir(parents=True, exist_ok=True)
        return True
    except Exception as e:
        print(f"创建目录失败:{e}")
        return False

def get_safe_filename(filename: str) -> str:
    """生成安全文件名(移除Windows非法字符)
    
    Args:
        filename: 原始文件名
        
    Returns:
        替换非法字符后的安全文件名
    """
    unsafe_chars = '<>:"/\\|?*'
    for char in unsafe_chars:
        filename = filename.replace(char, '_')
    return filename

def backup_file(file_path: str, backup_suffix: str = '.bak') -> str:
    """创建文件备份(需提前定义copy_binary_file函数)
    
    Args:
        file_path: 源文件路径
        backup_suffix: 备份后缀(默认'.bak')
        
    Returns:
        备份文件路径,失败时返回空字符串
    """
    try:
        backup_path = file_path + backup_suffix
        if os.path.exists(file_path):
            # 示例:需提前实现二进制复制函数
            # copy_binary_file(file_path, backup_path)
            print(f"已备份至:{backup_path}")
            return backup_path
    except Exception as e:
        print(f"备份失败:{e}")
    return ""

# ------------------------- 异步优化 -------------------------
async def async_read_file(file_path: str) -> str:
    """异步读取文件(适用于高I/O场景)
    
    Args:
        file_path: 文件路径
        
    Returns:
        文件内容,失败时返回空字符串
    """
    try:
        async with aiofiles.open(file_path, 'r', encoding='utf-8') as f:
            content = await f.read()
        return content
    except Exception as e:
        print(f"异步读取失败:{e}")
        return ""

# ------------------------- 使用示例 -------------------------
if __name__ == "__main__":
    # 1. 错误处理示例
    with safe_file_operation('data/test.txt', 'w') as f:
        if f:
            f.write("测试内容")
            print("文件写入成功")
    
    # 2. 路径处理示例
    safe_name = get_safe_filename("报告:2025/季度.pdf")
    print(f"安全文件名:{safe_name}")
    
    if ensure_directory("output/report/2025Q2"):
        print("目录结构创建成功")
    
    # 3. 异步操作示例(需Python 3.7+)
    async def demo_async():
        content = await async_read_file('data/large.txt')
        if content:
            print(f"异步读取完成,内容长度:{len(content)}")
    
    asyncio.run(demo_async())

1.7.8.2 运维实践

分析日志,待补充

0

评论区