📈 Data Science

A Complete Guide to Data Preprocessing

A comprehensive walkthrough of the data preprocessing workflow in data science projects, covering data cleaning, feature engineering, data transformation, and other key steps.

Author: AI-View Team
#DataScience #DataPreprocessing #FeatureEngineering #DataCleaning

Data preprocessing is a critical step in the success of any data science project: high-quality data is the foundation of every effective model.

Data Exploration and Understanding

Checking Basic Data Information

Start by getting a picture of the data's basic structure:

import pandas as pd
import numpy as np
import matplotlib.pyplot as plt
import seaborn as sns

# Load the data
df = pd.read_csv('dataset.csv')

# Basic information
print("Shape:", df.shape)
print("\nData types:")
print(df.dtypes)
print("\nMissing-value counts:")
print(df.isnull().sum())
print("\nSummary statistics:")
print(df.describe())

Analyzing Distributions

# Distribution of each numeric variable
numeric_columns = df.select_dtypes(include=[np.number]).columns
for col in numeric_columns:
    plt.figure(figsize=(12, 4))

    plt.subplot(1, 2, 1)
    plt.hist(df[col].dropna(), bins=30, alpha=0.7)
    plt.title(f'{col} distribution')
    plt.xlabel(col)
    plt.ylabel('Frequency')

    plt.subplot(1, 2, 2)
    plt.boxplot(df[col].dropna())
    plt.title(f'{col} box plot')
    plt.ylabel(col)

    plt.tight_layout()
    plt.show()

Handling Missing Values

Types of Missingness

Missing values generally fall into three categories (a quick diagnostic sketch follows the list):

  1. Missing Completely at Random (MCAR): missingness is unrelated to any variable
  2. Missing at Random (MAR): missingness depends on observed variables
  3. Missing Not at Random (MNAR): missingness depends on the unobserved value itself
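
Whether missingness looks MCAR or MAR can be probed informally by correlating a missing-value indicator with the observed variables; a noticeable correlation hints at MAR. A minimal sketch, reusing the df loaded above:

# For each column with missing values, correlate its missingness indicator
# with the observed numeric columns
for col in df.columns[df.isnull().any()]:
    indicator = df[col].isnull().astype(int)
    corr = df.select_dtypes(include=[np.number]).corrwith(indicator)
    print(f"Missingness of '{col}' vs. observed numeric columns:")
    print(corr.drop(labels=[col], errors='ignore'), "\n")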

Basic Techniques

# Drop missing values
df_dropped = df.dropna()             # drop rows with any missing value
df_dropped_cols = df.dropna(axis=1)  # drop columns with any missing value

# Fill missing values (each line shows one alternative strategy)
# Numeric variables
df['numeric_col'] = df['numeric_col'].fillna(df['numeric_col'].mean())    # mean fill
df['numeric_col'] = df['numeric_col'].fillna(df['numeric_col'].median())  # median fill

# Categorical variables
df['category_col'] = df['category_col'].fillna(df['category_col'].mode()[0])  # mode fill

# Forward/backward fill (fillna(method=...) is deprecated in recent pandas)
df = df.ffill()  # forward fill
df = df.bfill()  # backward fill

# Interpolation
df['numeric_col'] = df['numeric_col'].interpolate(method='linear')

Advanced Imputation Methods

from sklearn.impute import KNNImputer
from sklearn.experimental import enable_iterative_imputer  # noqa: F401 (must precede IterativeImputer)
from sklearn.impute import IterativeImputer

# KNN imputation
knn_imputer = KNNImputer(n_neighbors=5)
df_knn = pd.DataFrame(knn_imputer.fit_transform(df.select_dtypes(include=[np.number])),
                      columns=df.select_dtypes(include=[np.number]).columns)

# Iterative (model-based) imputation
iterative_imputer = IterativeImputer(random_state=42)
df_iterative = pd.DataFrame(iterative_imputer.fit_transform(df.select_dtypes(include=[np.number])),
                            columns=df.select_dtypes(include=[np.number]).columns)
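
No single imputer is best everywhere; one informal way to compare them on a given dataset is to hide a fraction of the observed entries, impute, and measure reconstruction error (this assumes no column ends up entirely missing). A minimal sketch:

# Randomly hide 10% of the observed numeric entries
rng = np.random.default_rng(42)
X_num = df.select_dtypes(include=[np.number])
observed = X_num.notna().to_numpy()
mask = observed & (rng.random(X_num.shape) < 0.1)
X_masked = X_num.mask(mask)

# Impute and score against the held-out true values
imputed = KNNImputer(n_neighbors=5).fit_transform(X_masked)
rmse = np.sqrt(((imputed[mask] - X_num.to_numpy()[mask]) ** 2).mean())
print(f"KNN imputation RMSE on held-out entries: {rmse:.4f}")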

Outlier Detection and Handling

Statistical Methods

# Z-score method
from scipy import stats

def detect_outliers_zscore(data, threshold=3):
    # nan_policy='omit' keeps missing values from poisoning the z-scores
    z_scores = np.abs(stats.zscore(data, nan_policy='omit'))
    return z_scores > threshold

# IQR method
def detect_outliers_iqr(data):
    Q1 = data.quantile(0.25)
    Q3 = data.quantile(0.75)
    IQR = Q3 - Q1
    lower_bound = Q1 - 1.5 * IQR
    upper_bound = Q3 + 1.5 * IQR
    return (data < lower_bound) | (data > upper_bound)

# Apply both detectors
for col in numeric_columns:
    outliers_z = detect_outliers_zscore(df[col])
    outliers_iqr = detect_outliers_iqr(df[col])

    print(f"{col} - Z-score outliers: {outliers_z.sum()}")
    print(f"{col} - IQR outliers: {outliers_iqr.sum()}")

Machine Learning Methods

from sklearn.ensemble import IsolationForest
from sklearn.svm import OneClassSVM
from sklearn.neighbors import LocalOutlierFactor

# These estimators do not accept NaN; impute missing values first
X_numeric = df.select_dtypes(include=[np.number])

# Isolation Forest
isolation_forest = IsolationForest(contamination=0.1, random_state=42)
outliers_if = isolation_forest.fit_predict(X_numeric)

# One-class SVM
one_class_svm = OneClassSVM(nu=0.1)
outliers_svm = one_class_svm.fit_predict(X_numeric)

# Local Outlier Factor
lof = LocalOutlierFactor(n_neighbors=20, contamination=0.1)
outliers_lof = lof.fit_predict(X_numeric)
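
All three estimators return 1 for inliers and -1 for outliers, so a quick way to compare them is to count the flagged rows:

# Count the rows each detector flags as outliers
for name, labels in [('IsolationForest', outliers_if),
                     ('OneClassSVM', outliers_svm),
                     ('LocalOutlierFactor', outliers_lof)]:
    print(f"{name}: {(labels == -1).sum()} rows flagged")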

Data Standardization

Scaling Numeric Data

from sklearn.preprocessing import StandardScaler, MinMaxScaler, RobustScaler

# Standardization (zero mean, unit variance)
scaler = StandardScaler()
df_scaled = pd.DataFrame(scaler.fit_transform(df.select_dtypes(include=[np.number])),
                         columns=df.select_dtypes(include=[np.number]).columns)

# Min-max normalization (to the [0, 1] range)
min_max_scaler = MinMaxScaler()
df_normalized = pd.DataFrame(min_max_scaler.fit_transform(df.select_dtypes(include=[np.number])),
                             columns=df.select_dtypes(include=[np.number]).columns)

# Robust scaling (median and IQR; less sensitive to outliers)
robust_scaler = RobustScaler()
df_robust = pd.DataFrame(robust_scaler.fit_transform(df.select_dtypes(include=[np.number])),
                         columns=df.select_dtypes(include=[np.number]).columns)

# Visualize the effect of each scaler
# (DataFrame.hist cannot draw multiple columns on a single Axes,
# so overlay the histograms with DataFrame.plot instead)
fig, axes = plt.subplots(2, 2, figsize=(15, 10))

df.select_dtypes(include=[np.number]).plot(kind='hist', ax=axes[0, 0], bins=30, alpha=0.5, legend=False)
axes[0, 0].set_title('Original data')

df_scaled.plot(kind='hist', ax=axes[0, 1], bins=30, alpha=0.5, legend=False)
axes[0, 1].set_title('Standardized')

df_normalized.plot(kind='hist', ax=axes[1, 0], bins=30, alpha=0.5, legend=False)
axes[1, 0].set_title('Min-max normalized')

df_robust.plot(kind='hist', ax=axes[1, 1], bins=30, alpha=0.5, legend=False)
axes[1, 1].set_title('Robust scaled')

plt.tight_layout()
plt.show()

Encoding Categorical Variables

from sklearn.preprocessing import LabelEncoder, OneHotEncoder
import pandas as pd

# Label encoding (imposes an arbitrary order; best for targets or tree models)
label_encoder = LabelEncoder()
df['category_encoded'] = label_encoder.fit_transform(df['category_column'])

# One-hot encoding
df_encoded = pd.get_dummies(df, columns=['category_column'], prefix='cat')

# Ordinal encoding
ordinal_mapping = {'low': 1, 'medium': 2, 'high': 3}
df['ordinal_encoded'] = df['ordinal_column'].map(ordinal_mapping)

# Naive target encoding (computed on the full dataset, so it can leak the target)
def target_encoding(df, categorical_col, target_col):
    target_mean = df.groupby(categorical_col)[target_col].mean()
    return df[categorical_col].map(target_mean)

df['target_encoded'] = target_encoding(df, 'category_column', 'target')
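
Because the naive version above computes category means on the full dataset, it leaks target information into the feature. A leakage-resistant variant encodes each row using only the other cross-validation folds; a minimal sketch with KFold, using the same placeholder column names as above:

from sklearn.model_selection import KFold

def target_encoding_oof(df, categorical_col, target_col, n_splits=5, seed=42):
    """Out-of-fold target encoding: each row is encoded with category means
    computed on the other folds only."""
    encoded = pd.Series(index=df.index, dtype=float)
    global_mean = df[target_col].mean()
    kf = KFold(n_splits=n_splits, shuffle=True, random_state=seed)
    for train_idx, valid_idx in kf.split(df):
        fold_means = df.iloc[train_idx].groupby(categorical_col)[target_col].mean()
        encoded.iloc[valid_idx] = df.iloc[valid_idx][categorical_col].map(fold_means).to_numpy()
    # Categories unseen in a fold fall back to the global mean
    return encoded.fillna(global_mean)

df['target_encoded_oof'] = target_encoding_oof(df, 'category_column', 'target')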

Feature Engineering

# Polynomial features
from sklearn.preprocessing import PolynomialFeatures

poly_features = PolynomialFeatures(degree=2, include_bias=False)
df_poly = pd.DataFrame(poly_features.fit_transform(df[['feature1', 'feature2']]),
                       columns=poly_features.get_feature_names_out(['feature1', 'feature2']))

# Interaction features
df['interaction'] = df['feature1'] * df['feature2']

# Time-based features
df['datetime'] = pd.to_datetime(df['date_column'])
df['year'] = df['datetime'].dt.year
df['month'] = df['datetime'].dt.month
df['day_of_week'] = df['datetime'].dt.dayofweek
df['is_weekend'] = df['day_of_week'].isin([5, 6]).astype(int)  # Monday=0, so 5/6 are Sat/Sun

Feature Selection

Statistical Methods

from sklearn.feature_selection import SelectKBest, chi2, f_classif, mutual_info_classif
from sklearn.feature_selection import RFE
from sklearn.ensemble import RandomForestClassifier

# X, y: the feature matrix and target prepared from your dataset

# Chi-squared test (classification; requires non-negative features)
chi2_selector = SelectKBest(chi2, k=10)
X_chi2 = chi2_selector.fit_transform(X, y)

# ANOVA F-test (numeric features)
f_selector = SelectKBest(f_classif, k=10)
X_f = f_selector.fit_transform(X, y)

# Mutual information
mi_selector = SelectKBest(mutual_info_classif, k=10)
X_mi = mi_selector.fit_transform(X, y)

Model-Based Methods

from sklearn.feature_selection import SelectFromModel

# Feature importances from a random forest
rf = RandomForestClassifier(n_estimators=100, random_state=42)
rf.fit(X, y)

# Keep features whose importance exceeds the mean importance
sfm = SelectFromModel(rf, prefit=True)
X_selected = sfm.transform(X)

# Visualize the feature importances
feature_importance = pd.DataFrame({
    'feature': X.columns,
    'importance': rf.feature_importances_
}).sort_values('importance', ascending=False)

plt.figure(figsize=(10, 8))
sns.barplot(data=feature_importance.head(20), x='importance', y='feature')
plt.title('Feature importance ranking')
plt.show()

# Lasso-based selection (regression; scale features first so the penalty is fair)
from sklearn.linear_model import LassoCV

lasso = LassoCV(cv=5, random_state=42)
lasso.fit(X, y)

# Keep features with non-zero coefficients
selected_features = X.columns[lasso.coef_ != 0]
print(f"Number of features selected by Lasso: {len(selected_features)}")

Data Validation

Data Quality Checks

# Data type check
print("Data types:")
print(df.dtypes)

# Duplicate rows
print(f"Duplicate rows: {df.duplicated().sum()}")

# Value-range check
print("Numeric ranges:")
for col in df.select_dtypes(include=[np.number]).columns:
    print(f"{col}: [{df[col].min()}, {df[col].max()}]")

# Unique values of categorical variables
print("Categorical cardinalities:")
for col in df.select_dtypes(include=['object']).columns:
    print(f"{col}: {df[col].nunique()} unique values")

A Data Preprocessing Pipeline

from sklearn.pipeline import Pipeline
from sklearn.compose import ColumnTransformer
from sklearn.preprocessing import StandardScaler, OneHotEncoder
from sklearn.impute import SimpleImputer

# Identify numeric and categorical features
numeric_features = df.select_dtypes(include=[np.number]).columns.tolist()
categorical_features = df.select_dtypes(include=['object']).columns.tolist()

# Pipeline for numeric features
numeric_transformer = Pipeline(steps=[
    ('imputer', SimpleImputer(strategy='median')),
    ('scaler', StandardScaler())
])

# Pipeline for categorical features
categorical_transformer = Pipeline(steps=[
    ('imputer', SimpleImputer(strategy='constant', fill_value='missing')),
    ('onehot', OneHotEncoder(handle_unknown='ignore'))
])

# Combine into a single preprocessor
preprocessor = ColumnTransformer(
    transformers=[
        ('num', numeric_transformer, numeric_features),
        ('cat', categorical_transformer, categorical_features)
    ]
)

# Full preprocessing pipeline
full_pipeline = Pipeline(steps=[
    ('preprocessor', preprocessor)
])

# Apply the preprocessing
X_processed = full_pipeline.fit_transform(df)
print(f"Shape after preprocessing: {X_processed.shape}")

Putting It Together: A Reusable Preprocessor

import numpy as np
import pandas as pd
from sklearn.ensemble import IsolationForest
from sklearn.impute import SimpleImputer
from sklearn.preprocessing import StandardScaler, OneHotEncoder

class DataPreprocessor:
    """A reusable data preprocessor."""

    def __init__(self):
        self.numeric_imputer = SimpleImputer(strategy='median')
        self.categorical_imputer = SimpleImputer(strategy='most_frequent')
        self.scaler = StandardScaler()
        # sparse= was removed in scikit-learn 1.4; use sparse_output=
        self.encoder = OneHotEncoder(drop='first', sparse_output=False)
        self.outlier_detector = IsolationForest(contamination=0.1)

    def fit(self, X, y=None):
        """Fit the preprocessor."""
        # Separate numeric and categorical features
        self.numeric_columns = X.select_dtypes(include=[np.number]).columns
        self.categorical_columns = X.select_dtypes(include=['object']).columns

        # Fit each component
        if len(self.numeric_columns) > 0:
            X_numeric = X[self.numeric_columns]
            self.numeric_imputer.fit(X_numeric)
            X_numeric_imputed = self.numeric_imputer.transform(X_numeric)
            self.scaler.fit(X_numeric_imputed)
            # Fit the outlier detector on scaled data, matching what transform() sees
            self.outlier_detector.fit(self.scaler.transform(X_numeric_imputed))

        if len(self.categorical_columns) > 0:
            X_categorical = X[self.categorical_columns]
            self.categorical_imputer.fit(X_categorical)
            X_categorical_imputed = self.categorical_imputer.transform(X_categorical)
            self.encoder.fit(X_categorical_imputed)

        return self

    def transform(self, X):
        """Apply the preprocessing."""
        X_processed = X.copy()

        # Numeric features
        if len(self.numeric_columns) > 0:
            X_numeric = X_processed[self.numeric_columns]
            X_numeric_imputed = self.numeric_imputer.transform(X_numeric)
            X_numeric_scaled = self.scaler.transform(X_numeric_imputed)

            # Optional outlier handling: rows flagged by IsolationForest are
            # masked with NaN (note this reintroduces missing values, so either
            # drop those rows or impute again downstream)
            outliers = self.outlier_detector.predict(X_numeric_scaled)
            X_numeric_scaled[outliers == -1] = np.nan

            # Write the numeric features back
            for i, col in enumerate(self.numeric_columns):
                X_processed[col] = X_numeric_scaled[:, i]

        # Categorical features
        if len(self.categorical_columns) > 0:
            X_categorical = X_processed[self.categorical_columns]
            X_categorical_imputed = self.categorical_imputer.transform(X_categorical)
            X_categorical_encoded = self.encoder.transform(X_categorical_imputed)

            # Names of the encoded features
            encoded_feature_names = self.encoder.get_feature_names_out(self.categorical_columns)

            # Drop the original categorical columns
            X_processed = X_processed.drop(columns=self.categorical_columns)

            # Append the encoded features
            for i, feature_name in enumerate(encoded_feature_names):
                X_processed[feature_name] = X_categorical_encoded[:, i]

        return X_processed

    def fit_transform(self, X, y=None):
        """Fit, then transform."""
        return self.fit(X, y).transform(X)

# Usage example
preprocessor = DataPreprocessor()
X_processed = preprocessor.fit_transform(df)
print(f"Preprocessing complete; shape: {X_processed.shape}")

Best Practices

  1. Understand the business context: preprocessing decisions should be grounded in domain knowledge
  2. Keep the raw data: always retain an untouched copy of the original dataset
  3. Document every step: record each preprocessing operation so the process is reproducible (see the joblib sketch after this list)
  4. Validate the results: confirm the effect of each step with visualizations and statistical checks
  5. Iterate: refine the preprocessing flow based on model performance
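
One concrete way to make the process reproducible is to persist the fitted pipeline together with the code that produced it; a minimal sketch using joblib (the file name is illustrative):

import joblib

# Save the fitted pipeline from the earlier section
joblib.dump(full_pipeline, 'preprocessor_v1.joblib')

# Later, or in another process: reload and apply the exact same transforms
restored_pipeline = joblib.load('preprocessor_v1.joblib')
X_new_processed = restored_pipeline.transform(df)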

Data preprocessing is an iterative process; the right strategy depends on the specific business context and the needs of the model.