📈 Data Science
A Complete Guide to Data Preprocessing
A comprehensive walkthrough of the data preprocessing workflow in data science projects, covering data cleaning, feature engineering, data transformation, and other key steps.
Author: AI-View Team
#DataScience
#DataPreprocessing
#FeatureEngineering
#DataCleaning
A Complete Guide to Data Preprocessing
Data preprocessing is a critical step in any successful data science project: high-quality data is the foundation of an effective model.
Data Exploration and Understanding
Inspecting Basic Information
Start by understanding the basic structure of the data:
import pandas as pd
import numpy as np
import matplotlib.pyplot as plt
import seaborn as sns

# Load the data
df = pd.read_csv('dataset.csv')

# Basic information
print("Shape:", df.shape)
print("\nData types:")
print(df.dtypes)
print("\nMissing value counts:")
print(df.isnull().sum())
print("\nSummary statistics:")
print(df.describe())
Analyzing Data Distributions
# Distribution of each numeric variable: histogram plus box plot
numeric_columns = df.select_dtypes(include=[np.number]).columns
for col in numeric_columns:
    plt.figure(figsize=(12, 4))
    plt.subplot(1, 2, 1)
    plt.hist(df[col].dropna(), bins=30, alpha=0.7)
    plt.title(f'{col} distribution')
    plt.xlabel(col)
    plt.ylabel('Frequency')
    plt.subplot(1, 2, 2)
    plt.boxplot(df[col].dropna())
    plt.title(f'{col} box plot')
    plt.ylabel(col)
    plt.tight_layout()
    plt.show()
Handling Missing Values
Types of Missingness
Missing values fall into three categories (a quick diagnostic sketch follows the list):
- Missing completely at random (MCAR): missingness is unrelated to any variable
- Missing at random (MAR): missingness depends on observed variables
- Missing not at random (MNAR): missingness depends on the unobserved value itself
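Distinguishing these in practice starts with inspecting the missingness pattern. A minimal sketch, assuming the df loaded above; correlated missingness indicators hint at MAR rather than MCAR:
# Build 0/1 indicators of missingness and keep only columns that have gaps
missing_indicator = df.isnull().astype(int)
missing_indicator = missing_indicator.loc[:, missing_indicator.sum() > 0]
if not missing_indicator.empty:
    # Correlation between indicators suggests missingness depends on other columns
    plt.figure(figsize=(8, 6))
    sns.heatmap(missing_indicator.corr(), annot=True, cmap='coolwarm')
    plt.title('Correlation of missingness indicators')
    plt.show()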
Basic Handling Methods
# Drop missing values
df_dropped = df.dropna()             # drop rows containing missing values
df_dropped_cols = df.dropna(axis=1)  # drop columns containing missing values

# Fill missing values
# Numeric variables (pick one strategy; chained inplace fillna is deprecated)
df['numeric_col'] = df['numeric_col'].fillna(df['numeric_col'].mean())      # mean fill
# df['numeric_col'] = df['numeric_col'].fillna(df['numeric_col'].median())  # median fill

# Categorical variables
df['category_col'] = df['category_col'].fillna(df['category_col'].mode()[0])  # mode fill

# Forward/backward fill (fillna(method=...) is deprecated in recent pandas)
df = df.ffill()  # forward fill
df = df.bfill()  # backward fill, catches any leading gaps

# Interpolation
df['numeric_col'] = df['numeric_col'].interpolate(method='linear')
Advanced Imputation Methods
from sklearn.impute import KNNImputer
from sklearn.experimental import enable_iterative_imputer  # enables IterativeImputer
from sklearn.impute import IterativeImputer

numeric_df = df.select_dtypes(include=[np.number])

# KNN imputation
knn_imputer = KNNImputer(n_neighbors=5)
df_knn = pd.DataFrame(knn_imputer.fit_transform(numeric_df),
                      columns=numeric_df.columns)

# Iterative (model-based) imputation
iterative_imputer = IterativeImputer(random_state=42)
df_iterative = pd.DataFrame(iterative_imputer.fit_transform(numeric_df),
                            columns=numeric_df.columns)
Outlier Detection and Handling
Statistical Methods
# Z-score method
from scipy import stats

def detect_outliers_zscore(data, threshold=3):
    # nan_policy='omit' skips missing values when computing z-scores
    z_scores = np.abs(stats.zscore(data, nan_policy='omit'))
    return z_scores > threshold

# IQR method
def detect_outliers_iqr(data):
    Q1 = data.quantile(0.25)
    Q3 = data.quantile(0.75)
    IQR = Q3 - Q1
    lower_bound = Q1 - 1.5 * IQR
    upper_bound = Q3 + 1.5 * IQR
    return (data < lower_bound) | (data > upper_bound)

# Apply both detectors to each numeric column
for col in numeric_columns:
    outliers_z = detect_outliers_zscore(df[col])
    outliers_iqr = detect_outliers_iqr(df[col])
    print(f"{col} - Z-score outliers: {outliers_z.sum()}")
    print(f"{col} - IQR outliers: {outliers_iqr.sum()}")
Machine Learning Methods
from sklearn.ensemble import IsolationForest
from sklearn.svm import OneClassSVM
from sklearn.neighbors import LocalOutlierFactor

# These detectors do not accept missing values; impute first if needed
numeric_data = df.select_dtypes(include=[np.number])

# Isolation Forest
isolation_forest = IsolationForest(contamination=0.1, random_state=42)
outliers_if = isolation_forest.fit_predict(numeric_data)

# One-Class SVM
one_class_svm = OneClassSVM(nu=0.1)
outliers_svm = one_class_svm.fit_predict(numeric_data)

# Local Outlier Factor
lof = LocalOutlierFactor(n_neighbors=20, contamination=0.1)
outliers_lof = lof.fit_predict(numeric_data)
Data Scaling
Scaling Numeric Data
from sklearn.preprocessing import StandardScaler, MinMaxScaler, RobustScaler

# Scalers do not accept missing values; this assumes imputation was done above
numeric_df = df.select_dtypes(include=[np.number])

# Standardization (zero mean, unit variance)
scaler = StandardScaler()
df_scaled = pd.DataFrame(scaler.fit_transform(numeric_df),
                         columns=numeric_df.columns)

# Min-max normalization
min_max_scaler = MinMaxScaler()
df_normalized = pd.DataFrame(min_max_scaler.fit_transform(numeric_df),
                             columns=numeric_df.columns)

# Robust scaling (median/IQR, less sensitive to outliers)
robust_scaler = RobustScaler()
df_robust = pd.DataFrame(robust_scaler.fit_transform(numeric_df),
                         columns=numeric_df.columns)

# Compare the effect of each scaler on one example column
col = numeric_df.columns[0]
fig, axes = plt.subplots(2, 2, figsize=(15, 10))
axes[0, 0].hist(numeric_df[col].dropna(), bins=30)
axes[0, 0].set_title('Original data')
axes[0, 1].hist(df_scaled[col].dropna(), bins=30)
axes[0, 1].set_title('Standardized')
axes[1, 0].hist(df_normalized[col].dropna(), bins=30)
axes[1, 0].set_title('Min-max normalized')
axes[1, 1].hist(df_robust[col].dropna(), bins=30)
axes[1, 1].set_title('Robust scaled')
plt.tight_layout()
plt.show()
Encoding Categorical Variables
from sklearn.preprocessing import LabelEncoder, OneHotEncoder

# Label encoding
label_encoder = LabelEncoder()
df['category_encoded'] = label_encoder.fit_transform(df['category_column'])

# One-hot encoding
df_encoded = pd.get_dummies(df, columns=['category_column'], prefix='cat')

# Ordinal encoding
ordinal_mapping = {'low': 1, 'medium': 2, 'high': 3}
df['ordinal_encoded'] = df['ordinal_column'].map(ordinal_mapping)

# Target encoding (naive version; computes means on the full data,
# which leaks target information into the features)
def target_encoding(df, categorical_col, target_col):
    target_mean = df.groupby(categorical_col)[target_col].mean()
    return df[categorical_col].map(target_mean)

df['target_encoded'] = target_encoding(df, 'category_column', 'target')
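Because the naive version above leaks the target, in practice the encoding is usually smoothed and, ideally, computed out-of-fold. A minimal sketch of a smoothed variant, assuming the same df, 'category_column', and 'target' columns; the smoothing strength is an arbitrary illustrative choice:
# Smoothed target encoding: shrink small-category means toward the global mean
def smoothed_target_encoding(df, categorical_col, target_col, smoothing=10.0):
    global_mean = df[target_col].mean()
    cat_stats = df.groupby(categorical_col)[target_col].agg(['mean', 'count'])
    weight = cat_stats['count'] / (cat_stats['count'] + smoothing)
    smoothed = weight * cat_stats['mean'] + (1 - weight) * global_mean
    return df[categorical_col].map(smoothed)

df['target_encoded_smooth'] = smoothed_target_encoding(df, 'category_column', 'target')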
Feature Engineering
# Polynomial features
from sklearn.preprocessing import PolynomialFeatures

poly_features = PolynomialFeatures(degree=2, include_bias=False)
df_poly = pd.DataFrame(poly_features.fit_transform(df[['feature1', 'feature2']]),
                       columns=poly_features.get_feature_names_out(['feature1', 'feature2']))

# Interaction features
df['interaction'] = df['feature1'] * df['feature2']

# Date/time features
df['datetime'] = pd.to_datetime(df['date_column'])
df['year'] = df['datetime'].dt.year
df['month'] = df['datetime'].dt.month
df['day_of_week'] = df['datetime'].dt.dayofweek
df['is_weekend'] = df['day_of_week'].isin([5, 6]).astype(int)
Feature Selection
Statistical Methods
from sklearn.feature_selection import SelectKBest, chi2, f_classif, mutual_info_classif
from sklearn.feature_selection import RFE  # used in the sketch at the end of this section
from sklearn.ensemble import RandomForestClassifier

# X (feature DataFrame) and y (target) are assumed to be defined

# Chi-squared test (classification; requires non-negative features)
chi2_selector = SelectKBest(chi2, k=10)
X_chi2 = chi2_selector.fit_transform(X, y)

# ANOVA F-test (numeric features)
f_selector = SelectKBest(f_classif, k=10)
X_f = f_selector.fit_transform(X, y)

# Mutual information
mi_selector = SelectKBest(mutual_info_classif, k=10)
X_mi = mi_selector.fit_transform(X, y)
Model-Based Methods
from sklearn.feature_selection import SelectFromModel

# Feature importance from a random forest
rf = RandomForestClassifier(n_estimators=100, random_state=42)
rf.fit(X, y)

# Visualize feature importance
feature_importance = pd.DataFrame({
    'feature': X.columns,
    'importance': rf.feature_importances_
}).sort_values('importance', ascending=False)

plt.figure(figsize=(10, 8))
sns.barplot(data=feature_importance.head(20), x='importance', y='feature')
plt.title('Feature importance ranking')
plt.show()

# Lasso-based feature selection (for a regression target)
from sklearn.linear_model import LassoCV

lasso = LassoCV(cv=5, random_state=42)
lasso.fit(X, y)

# Keep features with non-zero coefficients
selected_features = X.columns[lasso.coef_ != 0]
print(f"Number of features selected by Lasso: {len(selected_features)}")
Data Validation
Data Quality Checks
# Data type check
print("Data types:")
print(df.dtypes)

# Duplicate rows
print(f"Duplicate rows: {df.duplicated().sum()}")

# Value range check
print("Numeric ranges:")
for col in df.select_dtypes(include=[np.number]).columns:
    print(f"{col}: [{df[col].min()}, {df[col].max()}]")

# Unique values of categorical variables
print("Categorical variable cardinality:")
for col in df.select_dtypes(include=['object']).columns:
    print(f"{col}: {df[col].nunique()} unique values")
A Data Preprocessing Pipeline
from sklearn.pipeline import Pipeline
from sklearn.compose import ColumnTransformer
from sklearn.preprocessing import StandardScaler, OneHotEncoder
from sklearn.impute import SimpleImputer

# Identify numeric and categorical features
numeric_features = df.select_dtypes(include=[np.number]).columns.tolist()
categorical_features = df.select_dtypes(include=['object']).columns.tolist()

# Numeric pipeline
numeric_transformer = Pipeline(steps=[
    ('imputer', SimpleImputer(strategy='median')),
    ('scaler', StandardScaler())
])

# Categorical pipeline
categorical_transformer = Pipeline(steps=[
    ('imputer', SimpleImputer(strategy='constant', fill_value='missing')),
    ('onehot', OneHotEncoder(handle_unknown='ignore'))
])

# Combine the two into a single preprocessor
preprocessor = ColumnTransformer(
    transformers=[
        ('num', numeric_transformer, numeric_features),
        ('cat', categorical_transformer, categorical_features)
    ]
)

# Full preprocessing pipeline
full_pipeline = Pipeline(steps=[
    ('preprocessor', preprocessor)
])

# Apply the preprocessing
X_processed = full_pipeline.fit_transform(df)
print(f"Processed data shape: {X_processed.shape}")
Putting It Together: A Reusable Preprocessor
class DataPreprocessor:
    """A reusable data preprocessor."""

    def __init__(self):
        self.numeric_imputer = SimpleImputer(strategy='median')
        self.categorical_imputer = SimpleImputer(strategy='most_frequent')
        self.scaler = StandardScaler()
        # sparse_output replaces the old sparse argument in scikit-learn >= 1.2
        self.encoder = OneHotEncoder(drop='first', sparse_output=False)
        self.outlier_detector = IsolationForest(contamination=0.1)

    def fit(self, X, y=None):
        """Fit the preprocessing components."""
        # Separate numeric and categorical features
        self.numeric_columns = X.select_dtypes(include=[np.number]).columns
        self.categorical_columns = X.select_dtypes(include=['object']).columns
        # Fit each component
        if len(self.numeric_columns) > 0:
            X_numeric = X[self.numeric_columns]
            self.numeric_imputer.fit(X_numeric)
            X_numeric_imputed = self.numeric_imputer.transform(X_numeric)
            self.scaler.fit(X_numeric_imputed)
            self.outlier_detector.fit(X_numeric_imputed)
        if len(self.categorical_columns) > 0:
            X_categorical = X[self.categorical_columns]
            self.categorical_imputer.fit(X_categorical)
            X_categorical_imputed = self.categorical_imputer.transform(X_categorical)
            self.encoder.fit(X_categorical_imputed)
        return self

    def transform(self, X):
        """Apply the fitted preprocessing."""
        X_processed = X.copy()
        # Numeric features: impute, scale, then flag outliers
        if len(self.numeric_columns) > 0:
            X_numeric = X_processed[self.numeric_columns]
            X_numeric_imputed = self.numeric_imputer.transform(X_numeric)
            X_numeric_scaled = self.scaler.transform(X_numeric_imputed)
            # Optional outlier handling: rows flagged -1 are set to NaN
            outliers = self.outlier_detector.predict(X_numeric_scaled)
            X_numeric_scaled[outliers == -1] = np.nan
            # Write the numeric features back
            for i, col in enumerate(self.numeric_columns):
                X_processed[col] = X_numeric_scaled[:, i]
        # Categorical features: impute, then one-hot encode
        if len(self.categorical_columns) > 0:
            X_categorical = X_processed[self.categorical_columns]
            X_categorical_imputed = self.categorical_imputer.transform(X_categorical)
            X_categorical_encoded = self.encoder.transform(X_categorical_imputed)
            # Build names for the encoded columns
            encoded_feature_names = self.encoder.get_feature_names_out(self.categorical_columns)
            # Drop the original categorical columns
            X_processed = X_processed.drop(columns=self.categorical_columns)
            # Append the encoded columns
            for i, feature_name in enumerate(encoded_feature_names):
                X_processed[feature_name] = X_categorical_encoded[:, i]
        return X_processed

    def fit_transform(self, X, y=None):
        """Fit, then transform."""
        return self.fit(X, y).transform(X)


# Usage example
preprocessor = DataPreprocessor()
X_processed = preprocessor.fit_transform(df)
print(f"Preprocessing complete, data shape: {X_processed.shape}")
Best Practices
- Understand the business context: preprocessing decisions should be grounded in domain knowledge
- Keep the raw data: always retain an untouched copy of the original dataset
- Document every step: record each preprocessing operation so results are reproducible
- Validate the effect: confirm each transformation with visualizations and statistical tests (see the sketch after this list)
- Iterate: keep refining the preprocessing flow based on model performance
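As one way to validate a transformation statistically, here is a minimal sketch using a two-sample Kolmogorov-Smirnov test. It assumes df holds the processed data, 'numeric_col' is a column that was imputed earlier, and the untouched original can be reloaded from dataset.csv:
from scipy.stats import ks_2samp

# Reload the untouched data for comparison (kept per the practice above)
df_raw = pd.read_csv('dataset.csv')
statistic, p_value = ks_2samp(df_raw['numeric_col'].dropna(),
                              df['numeric_col'].dropna())
print(f"KS statistic: {statistic:.3f}, p-value: {p_value:.3f}")
# A large statistic (tiny p-value) warns that imputation shifted the distribution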
Data preprocessing is an iterative process; the right strategy depends on the specific business context and the needs of the model.