Python 数据管道构建:ETL 与数据处理实战
Python 数据管道构建ETL 与数据处理实战1. 背景与动机数据管道是数据工程的核心组件负责数据的提取、转换和加载。本文介绍如何使用 Python 构建高效的数据管道。2. ETL 基础2.1 提取Extractimport pandas as pd from sqlalchemy import create_engine # 从数据库提取 engine create_engine(postgresql://user:passlocalhost/db) df pd.read_sql(SELECT * FROM sales, engine) # 从 API 提取 import requests response requests.get(https://api.example.com/data) data response.json()2.2 转换Transform# 数据清洗 df df.dropna() df df.drop_duplicates() # 数据转换 df[date] pd.to_datetime(df[date]) df[amount] df[quantity] * df[price] # 特征工程 df[month] df[date].dt.month df[year] df[date].dt.year2.3 加载Load# 加载到数据仓库 df.to_sql(sales_transformed, engine, if_existsappend, indexFalse) # 加载到文件 df.to_parquet(sales.parquet, compressionsnappy)3. 使用 Apache Airflowfrom airflow import DAG from airflow.operators.python import PythonOperator from datetime import datetime, timedelta def extract_task(): # 提取逻辑 pass def transform_task(): # 转换逻辑 pass def load_task(): # 加载逻辑 pass with DAG( etl_pipeline, start_datedatetime(2024, 1, 1), schedule_intervaltimedelta(hours1), catchupFalse ) as dag: extract PythonOperator(task_idextract, python_callableextract_task) transform PythonOperator(task_idtransform, python_callabletransform_task) load PythonOperator(task_idload, python_callableload_task) extract transform load4. 流式处理from kafka import KafkaConsumer, KafkaProducer import json consumer KafkaConsumer( raw-data, bootstrap_servers[localhost:9092], value_deserializerlambda x: json.loads(x.decode(utf-8)) ) producer KafkaProducer( bootstrap_servers[localhost:9092], value_serializerlambda x: json.dumps(x).encode(utf-8) ) for message in consumer: data message.value # 实时处理 processed process_data(data) producer.send(processed-data, processed)5. 结论数据管道的设计需要考虑数据源多样性、处理延迟和数据质量。通过合理选择工具和设计模式可以构建可扩展、可维护的数据基础设施。