Python实战信贷资产质量分析三大核心指标可视化引言当数据科学遇上信贷风控在金融科技领域数据驱动的风控决策已成为行业标配。我曾参与过一个消费金融项目当团队第一次看到用Python自动生成的Vintage曲线时原本需要人工整理两周的数据报告现在只需30分钟就能动态更新。这种效率跃迁让我深刻认识到掌握数据分析工具的风控师就像拥有了透视业务本质的X光机。本文将带您用Python实战信贷资产质量分析的三大核心指标Vintage分析、滚动率矩阵和迁移率计算。不同于理论讲解我们将从一个虚构但典型的信贷数据集出发使用Pandas进行数据加工Matplotlib/Seaborn实现可视化并分享实际项目中积累的代码优化技巧。适合已经了解基础概念但缺乏代码实现经验的数据分析师通过本教程您将获得从原始还款计划表到分析指标的完整处理流程避免常见数据陷阱的实战经验可直接复用的代码模板与可视化方案1. 数据准备与清洗1.1 构建模拟数据集我们首先用Python生成一个接近真实业务的模拟数据集。这个数据集包含10,000笔虚拟贷款时间跨度为24个月每笔贷款有12期还款计划import pandas as pd import numpy as np from datetime import datetime, timedelta # 生成基础贷款数据 np.random.seed(42) loan_ids [fL{str(i).zfill(5)} for i in range(10000)] products [Product_A]*7000 [Product_B]*3000 loan_dates pd.date_range(start2020-01-01, end2021-12-31, periods10000) loan_amounts np.round(np.random.lognormal(7, 0.3, 10000), 2) loans pd.DataFrame({ loan_id: loan_ids, product: products, disbursal_date: loan_dates, loan_amount: loan_amounts }) # 生成还款计划表 repayment_records [] for _, loan in loans.iterrows(): for term in range(1, 13): due_date loan[disbursal_date] pd.DateOffset(monthsterm) actual_payment_date due_date pd.Timedelta( daysnp.random.choice( [0, 1, 2, 3, 4, 5, 10, 30, 60, 90, 120], p[0.85, 0.05, 0.03, 0.02, 0.01, 0.01, 0.01, 0.005, 0.005, 0.005, 0.005] ) ) repayment_records.append({ loan_id: loan[loan_id], term: term, due_date: due_date, actual_payment_date: actual_payment_date }) repayments pd.DataFrame(repayment_records)1.2 关键字段计算原始数据需要加工出分析所需的衍生字段# 计算逾期天数 repayments[days_past_due] ( repayments[actual_payment_date] - repayments[due_date] ).dt.days repayments[days_past_due] repayments[days_past_due].clip(lower0) # 定义逾期状态 def get_delq_stage(days): if days 0: return M0 elif 1 days 30: return M1 elif 31 days 60: return M2 elif 61 days 90: return M3 else: return M4 repayments[delq_stage] repayments[days_past_due].apply(get_delq_stage) # 计算MOB账龄 repayments[mob] ( (repayments[due_date].dt.to_period(M) - repayments[loan_id].map(loans.set_index(loan_id)[disbursal_date].dt.to_period(M))) .apply(lambda x: x.n) )提示实际项目中建议将字段计算过程封装成函数方便后续数据更新时复用2. Vintage分析实战2.1 计算Vintage数据Vintage分析的核心是按放款月份和账龄对齐计算逾期率def calculate_vintage(repayments, loans, delq_stageM4): # 合并贷款信息 df repayments.merge(loans, onloan_id) # 计算各MOB的逾期状态 vintage_data df.groupby([ df[disbursal_date].dt.to_period(M).rename(disbursal_month), mob ]).apply(lambda x: pd.Series({ total_loans: x[loan_id].nunique(), delq_loans: x[x[delq_stage] delq_stage][loan_id].nunique() })).reset_index() vintage_data[delq_rate] vintage_data[delq_loans] / vintage_data[total_loans] return vintage_data vintage_m4 calculate_vintage(repayments, loans, M4)2.2 可视化Vintage曲线使用Seaborn绘制专业级Vintage曲线import seaborn as sns import matplotlib.pyplot as plt plt.figure(figsize(12, 8)) sns.lineplot( datavintage_m4, xmob, ydelq_rate, huedisbursal_month, paletteviridis, linewidth2.5 ) plt.title(Vintage Analysis - M4 Delinquency Rate, fontsize14) plt.xlabel(MOB, fontsize12) plt.ylabel(Delinquency Rate, fontsize12) plt.grid(True, alpha0.3) plt.legend(titleDisbursal Month, bbox_to_anchor(1.05, 1), locupper left) plt.tight_layout()图各月份放款资产的M4逾期率随账龄变化趋势2.3 业务解读技巧从Vintage曲线中可以提取的关键业务洞察成熟期判断当曲线趋于平缓时的MOB即为账户成熟期资产质量对比不同月份放款的曲线最终位置高低反映资产质量差异策略效果评估曲线拐点变化可能对应风控策略调整时间# 自动识别成熟期 def identify_maturity(vintage_data, threshold0.005): last_mob vintage_data[mob].max() maturity {} for month in vintage_data[disbursal_month].unique(): month_data vintage_data[vintage_data[disbursal_month] month] for mob in range(1, last_mob): if abs(month_data[month_data[mob] mob][delq_rate].values[0] - month_data[month_data[mob] mob1][delq_rate].values[0]) threshold: maturity[str(month)] mob break return maturity3. 滚动率矩阵构建3.1 滚动率计算逻辑滚动率分析需要选择观察点比较前后两个时间窗的最坏逾期状态def calculate_roll_rates(repayments, observation_date, lookback6, lookforward6): # 确定观察点前后的时间窗口 lookback_start observation_date - pd.DateOffset(monthslookback) lookforward_end observation_date pd.DateOffset(monthslookforward) # 获取观察期内的最坏状态 lookback_status repayments[ (repayments[due_date] lookback_start) (repayments[due_date] observation_date) ].groupby(loan_id)[delq_stage].max().rename(lookback_status) # 获取表现期内的最坏状态 lookforward_status repayments[ (repayments[due_date] observation_date) (repayments[due_date] lookforward_end) ].groupby(loan_id)[delq_stage].max().rename(lookforward_status) # 合并数据 roll_data pd.concat([lookback_status, lookforward_status], axis1) return roll_data.pivot_table( indexlookback_status, columnslookforward_status, aggfuncsize, fill_value0 )3.2 可视化与解读将滚动率矩阵转化为百分比并可视化# 计算百分比滚动率矩阵 observation_date pd.to_datetime(2021-06-30) roll_matrix calculate_roll_rates(repayments, observation_date) roll_matrix_pct roll_matrix.div(roll_matrix.sum(axis1), axis0) # 绘制热力图 plt.figure(figsize(10, 8)) sns.heatmap( roll_matrix_pct, annotTrue, fmt.1%, cmapYlOrRd, linewidths0.5, cbar_kws{label: Transition Probability} ) plt.title(Roll Rate Matrix (6-month observation window), fontsize14) plt.xlabel(Forward Status, fontsize12) plt.ylabel(Backward Status, fontsize12)滚动率矩阵的业务应用场景包括坏客户定义M4的回滚率低于5%时可定义为坏客户催收优先级根据恶化概率分配催收资源风险预警M2→M3的高转化率可能预示整体风险上升4. 迁移率分析实现4.1 迁移率计算迁移率反映相邻逾期状态间的转化情况def calculate_flow_rates(repayments, vintage_month): # 筛选指定vintage的数据 vintage_loans loans[ loans[disbursal_date].dt.to_period(M) vintage_month ][loan_id] vintage_repayments repayments[ repayments[loan_id].isin(vintage_loans) ].sort_values([loan_id, term]) # 计算状态迁移 flow_rates [] stages [M0, M1, M2, M3, M4] for i in range(len(stages)-1): from_stage stages[i] to_stage stages[i1] for term in range(1, 13): prev_term vintage_repayments[ (vintage_repayments[term] term) (vintage_repayments[delq_stage] from_stage) ] next_term vintage_repayments[ (vintage_repayments[term] term1) (vintage_repayments[loan_id].isin(prev_term[loan_id])) (vintage_repayments[delq_stage] to_stage) ] if len(prev_term) 0: flow_rate len(next_term) / len(prev_term) flow_rates.append({ from_stage: from_stage, to_stage: to_stage, term: term, flow_rate: flow_rate }) return pd.DataFrame(flow_rates)4.2 迁移率可视化用折线图展示迁移率随时间变化趋势flow_rates calculate_flow_rates(repayments, pd.Period(2021-01)) plt.figure(figsize(12, 6)) for transition in [M0-M1, M1-M2, M2-M3, M3-M4]: subset flow_rates[ (flow_rates[from_stage] transition.split(-)[0]) (flow_rates[to_stage] transition.split(-)[1]) ] sns.lineplot( datasubset, xterm, yflow_rate, labeltransition, markero ) plt.title(Flow Rate Analysis by Loan Term, fontsize14) plt.xlabel(Loan Term, fontsize12) plt.ylabel(Flow Rate, fontsize12) plt.grid(True, alpha0.3) plt.legend()4.3 迁移率的业务应用迁移率的核心价值在于损失预测结合各阶段迁移率计算最终坏账概率催收效果评估对比不同时期/策略下的迁移率变化准备金计算通过迁移路径计算不同逾期状态的损失率# 计算毛坏账率 flow_rate_avg flow_rates.groupby([from_stage, to_stage])[flow_rate].mean() gross_loss_rate (flow_rate_avg[M0, M1] * flow_rate_avg[M1, M2] * flow_rate_avg[M2, M3] * flow_rate_avg[M3, M4])5. 分析报告自动化5.1 使用Jupyter Notebook创建交互式报告将上述分析整合到Jupyter Notebook中并添加参数控件from ipywidgets import interact, Dropdown # 创建交互式Vintage分析 def interactive_vintage(delq_stage): vintage_data calculate_vintage(repayments, loans, delq_stage) plt.figure(figsize(12, 8)) sns.lineplot( datavintage_data, xmob, ydelq_rate, huedisbursal_month, paletteviridis ) plt.show() interact( interactive_vintage, delq_stageDropdown(options[M1, M2, M3, M4], valueM4) )5.2 使用Plotly创建动态可视化提升报告的交互体验import plotly.express as px fig px.line( vintage_m4, xmob, ydelq_rate, colordisbursal_month, titleInteractive Vintage Analysis, labels{mob: MOB, delq_rate: Delinquency Rate}, hover_data[total_loans] ) fig.update_layout( hovermodex unified, legend_title_textDisbursal Month ) fig.show()5.3 自动生成PDF报告使用Jupyter Notebook转换功能创建可分享的报告jupyter nbconvert --to pdf --TemplateExporter.exclude_inputTrue analysis_report.ipynb6. 性能优化与大数据处理当数据量增大时需要优化计算性能6.1 使用Dask处理大数据import dask.dataframe as dd # 将Pandas DataFrame转换为Dask DataFrame dask_repayments dd.from_pandas(repayments, npartitions4) dask_loans dd.from_pandas(loans, npartitions2) # 并行计算Vintage分析 def dask_vintage(): return ( dask_repayments.merge(dask_loans, onloan_id) .groupby([disbursal_date, mob]) .apply(lambda x: pd.Series({ total_loans: x[loan_id].nunique(), delq_loans: x[x[delq_stage] M4][loan_id].nunique() }), meta{total_loans: int64, delq_loans: int64}) .compute() )6.2 使用Polars加速计算import polars as pl # 使用Polars处理数据 def polars_vintage(): pl_repayments pl.from_pandas(repayments) pl_loans pl.from_pandas(loans) return ( pl_repayments.join(pl_loans, onloan_id) .groupby([disbursal_date, mob]) .agg([ pl.col(loan_id).n_unique().alias(total_loans), pl.col(loan_id).filter(pl.col(delq_stage) M4).n_unique().alias(delq_loans) ]) .with_column((pl.col(delq_loans) / pl.col(total_loans)).alias(delq_rate)) .to_pandas() )7. 生产环境最佳实践7.1 分析流水线设计将完整分析流程封装成可复用的流水线from sklearn.pipeline import Pipeline from sklearn.base import BaseEstimator, TransformerMixin class VintageAnalyzer(BaseEstimator, TransformerMixin): def __init__(self, delq_stageM4): self.delq_stage delq_stage def fit(self, X, yNone): return self def transform(self, X): repayments, loans X return calculate_vintage(repayments, loans, self.delq_stage) # 创建分析流水线 analysis_pipeline Pipeline([ (vintage, VintageAnalyzer()), (roll_rate, RollRateAnalyzer()), (flow_rate, FlowRateAnalyzer()) ])7.2 自动化监控系统设置自动监控异常值的预警机制def monitor_vintage(vintage_data, threshold0.3): latest_month vintage_data[disbursal_month].max() latest_data vintage_data[vintage_data[disbursal_month] latest_month] alert False for mob in range(1, 4): mob_data latest_data[latest_data[mob] mob] if len(mob_data) 0 and mob_data[delq_rate].iloc[0] threshold: print(fAlert! High delinquency at MOB {mob}: {mob_data[delq_rate].iloc[0]:.1%}) alert True if not alert: print(Vintage metrics within normal range)7.3 数据质量检查在分析前执行数据质量验证def data_quality_check(repayments, loans): checks { Missing loan_ids in repayments: len(set(loans[loan_id]) - set(repayments[loan_id])), Duplicate repayment records: repayments.duplicated(subset[loan_id, term]).sum(), Invalid payment dates: (repayments[actual_payment_date] repayments[due_date]).sum(), MOB out of range: (repayments[mob] 0).sum() } for check, count in checks.items(): if count 0: print(f {check}: {count} cases) else: print(f✓ {check}: OK)8. 扩展应用与高级分析8.1 结合机器学习预测使用历史分析结果构建预测模型from sklearn.ensemble import RandomForestClassifier from sklearn.model_selection import train_test_split # 准备特征数据 def prepare_features(repayments, loans): # 计算每个贷款的历史表现特征 features [] for loan_id, loan_data in repayments.groupby(loan_id): features.append({ loan_id: loan_id, max_dpd: loan_data[days_past_due].max(), avg_dpd: loan_data[days_past_due].mean(), m1_count: (loan_data[delq_stage] M1).sum(), early_delq: (loan_data[loan_data[mob] 3][delq_stage] ! M0).any() }) features_df pd.DataFrame(features) return features_df.merge( loans[[loan_id, loan_amount, product]], onloan_id ) # 训练预测模型 features prepare_features(repayments, loans) X pd.get_dummies(features.drop(columns[loan_id])) y (features[max_dpd] 90).astype(int) # 预测是否会成为M4 X_train, X_test, y_train, y_test train_test_split(X, y, test_size0.2) model RandomForestClassifier() model.fit(X_train, y_train)8.2 多维度交叉分析按产品、渠道等维度进行细分分析def segmented_vintage(repayments, loans, segment_byproduct): df repayments.merge(loans, onloan_id) return df.groupby([ disbursal_date, mob, segment_by ]).apply(lambda x: pd.Series({ total_loans: x[loan_id].nunique(), delq_loans: x[x[delq_stage] M4][loan_id].nunique(), delq_rate: x[x[delq_stage] M4][loan_id].nunique() / x[loan_id].nunique() })).reset_index() product_vintage segmented_vintage(repayments, loans, product)8.3 趋势分析与预测使用时间序列方法预测未来资产质量from statsmodels.tsa.arima.model import ARIMA # 准备时间序列数据 vintage_ts vintage_m4.groupby(disbursal_month)[delq_rate].last().reset_index() vintage_ts[disbursal_month] vintage_ts[disbursal_month].dt.to_timestamp() # 拟合ARIMA模型 model ARIMA(vintage_ts[delq_rate], order(1,1,1)) results model.fit() # 预测未来6个月 forecast results.get_forecast(steps6)