Overview
In real-world machine learning work, hyperparameter tuning is an important part of the job. PySpark implements the most common tuning method, Grid Search; here we use it together with LightGBM to try out tuning in PySpark. For how to install the required dependencies, see the previous post.
1. Import dependencies
import numpy as np
import pyspark
spark = pyspark.sql.SparkSession.builder.appName("spark lightgbm") \
    .master("spark://***.***.***.***:7077") \
    .config("spark.jars.packages", "com.microsoft.ml.spark:mmlspark_2.11:0.18.1") \
    .config("spark.cores.max", "20") \
    .config("spark.driver.memory", "6G") \
    .config("spark.executor.memory", "6G") \
    .config("spark.executor.cores", "6") \
    .getOrCreate()
import mmlspark
from mmlspark.lightgbm import LightGBMClassifier
from pyspark.ml.feature import VectorAssembler
from pyspark.ml.tuning import CrossValidator, ParamGridBuilder
from pyspark.ml.evaluation import BinaryClassificationEvaluator
from pyspark.ml import Pipeline
Here, pyspark.ml.tuning.ParamGridBuilder is the class that implements Grid Search.
2. Load the data
df_train = spark.read.format("csv") \
    .option("inferSchema", "true") \
    .option("header", "true") \
    .option("sep", ",") \
    .load("hdfs://***.***.***.***:39000/young/训练集特征.csv")
df_val = spark.read.format("csv") \
    .option("inferSchema", "true") \
    .option("header", "true") \
    .option("sep", ",") \
    .load("hdfs://***.***.***.***:39000/young/验证集特征.csv")
df_test = spark.read.format("csv") \
    .option("inferSchema", "true") \
    .option("header", "true") \
    .option("sep", ",") \
    .load("hdfs://***.***.***.***:39000/young/测试集特征.csv")
Assemble the training features:
feature_cols = list(df_train.columns)
feature_cols.remove("label")  # remove "label" from the column list to get the actual feature columns
assembler = VectorAssembler(inputCols=feature_cols, outputCol="features")
df_train = assembler.transform(df_train)
3. Build the model
lgb = LightGBMClassifier(
    objective="binary",
    boostingType='gbdt',
    isUnbalance=True,
    featuresCol='features',
    labelCol='label',
    maxBin=60,
    baggingFreq=1,
    baggingSeed=696,
    earlyStoppingRound=30,
    learningRate=0.1,
    # lambdaL1=1.0,
    # lambdaL2=45.0,
    maxDepth=3,
    numLeaves=128,
    baggingFraction=0.7,
    featureFraction=0.7,
    minSumHessianInLeaf=0.001,
    numIterations=800,
    verbosity=1
)
Set up the Grid Search parameter grid:
paramGrid = ParamGridBuilder() \
    .addGrid(lgb.lambdaL1, list(np.arange(1.0, 3.0, 1.0))) \
    .addGrid(lgb.lambdaL2, list(np.arange(1.0, 4.0, 1.0))) \
    .build()
Once the grid is built, we can inspect the parameter combinations:
for param in paramGrid:
    print(param.values())
There are 6 combinations in total; the left value is lambdaL1 and the right is lambdaL2:
dict_values([1.0, 1.0])
dict_values([1.0, 2.0])
dict_values([1.0, 3.0])
dict_values([2.0, 1.0])
dict_values([2.0, 2.0])
dict_values([2.0, 3.0])
4. Model selection with cross-validation
The official API provides two model-selection tools: CrossValidator and TrainValidationSplit (see the official documentation). The difference between them is that CrossValidator repeatedly trains on one part of the training set and predicts on the held-out part, producing K scores (K-fold cross-validation) and reporting their average as the model's score, whereas TrainValidationSplit trains and evaluates only once. Here we walk through a CrossValidator example.
evaluator = BinaryClassificationEvaluator(labelCol="label", metricName="areaUnderROC")
cross_validator = CrossValidator(estimator=lgb,
                                 estimatorParamMaps=paramGrid,
                                 evaluator=evaluator,
                                 numFolds=3)
model = cross_validator.fit(df_train)
Finally, we can retrieve the best model and inspect its feature importances:
model.bestModel
model.bestModel.getFeatureImportances()
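getFeatureImportances() returns a plain list of scores in the same order as the assembled input columns, so it is handy to pair them with feature_cols. A small helper for this (our own sketch; rank_features is not part of the mmlspark API):

```python
def rank_features(names, importances, top_k=None):
    """Pair each feature name with its importance score, highest first."""
    pairs = sorted(zip(names, importances), key=lambda p: p[1], reverse=True)
    return pairs[:top_k] if top_k is not None else pairs

# usage with the objects defined above:
# for name, score in rank_features(feature_cols, model.bestModel.getFeatureImportances(), top_k=10):
#     print(name, score)
```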
However, the official API makes it quite awkward to see which hyperparameters produced which metric, so we have to work it out ourselves; the helper below follows the blog post LightGBM Hyper Parameters Tuning in Spark:
def params_extract(model):
    """
    Extract hyperparameter information from a CrossValidatorModel.
    input: a CrossValidatorModel instance, fit by CrossValidator in pyspark.ml.tuning
    output: a dict with keys (hyperparameter settings) and values (the evaluator's metric: r2, auc, ...)
    """
    length = len(model.avgMetrics)
    res = {}
    for i in range(length):
        s = ""
        paramDict = model.extractParamMap()[model.estimatorParamMaps][i]
        for j in paramDict.keys():
            s += str(j).split("__")[1] + " "
            s += str(paramDict[j]) + " "
        res[s.strip()] = model.avgMetrics[i]
    return {k: v for k, v in sorted(res.items(), key=lambda item: item[1])}
Let's try it:
params_extract(model)
The output:
{'lambdaL1 1.0 lambdaL2 2.0': 0.7300710699287217,
'lambdaL1 1.0 lambdaL2 1.0': 0.7307078416518147,
'lambdaL1 1.0 lambdaL2 3.0': 0.7310860740685388,
'lambdaL1 2.0 lambdaL2 1.0': 0.7312930665848859,
'lambdaL1 2.0 lambdaL2 2.0': 0.7317563737747359,
'lambdaL1 2.0 lambdaL2 3.0': 0.7327463775422981}
5. Discussion: limits of the official tuning APIs
CrossValidator gives us the best model, but there is a catch: "best" here means best across folds of the training set, not best on a validation set of our choosing. Even with TrainValidationSplit we cannot pass in a custom validation set; the split is chosen at random. For data where the temporal order of samples does not matter (images, for example) this has little impact, but for transactional data we usually want to split the train, validation, and test sets by time and pick the best parameters and model by validation-set performance. So we need a different approach.
import numpy as np
import pyspark
spark = pyspark.sql.SparkSession.builder.appName("spark lightgbm") \
    .master("spark://***.***.***.***:7077") \
    .config("spark.jars.packages", "com.microsoft.ml.spark:mmlspark_2.11:0.18.1") \
    .config("spark.cores.max", "20") \
    .config("spark.driver.memory", "6G") \
    .config("spark.executor.memory", "6G") \
    .config("spark.executor.cores", "6") \
    .getOrCreate()
import mmlspark
from mmlspark.lightgbm import LightGBMClassifier
from pyspark.ml.feature import VectorAssembler
from pyspark.ml.tuning import CrossValidator, ParamGridBuilder
from pyspark.ml.evaluation import BinaryClassificationEvaluator
# load the data
df_train = spark.read.format("csv") \
    .option("inferSchema", "true") \
    .option("header", "true") \
    .option("sep", ",") \
    .load("hdfs://***.***.***.***:39000/young/训练集特征.csv")
df_val = spark.read.format("csv") \
    .option("inferSchema", "true") \
    .option("header", "true") \
    .option("sep", ",") \
    .load("hdfs://***.***.***.***:39000/young/验证集特征.csv")
df_test = spark.read.format("csv") \
    .option("inferSchema", "true") \
    .option("header", "true") \
    .option("sep", ",") \
    .load("hdfs://***.***.***.***:39000/young/测试集特征.csv")
feature_cols = list(df_train.columns)
feature_cols.remove("label")
assembler = VectorAssembler(inputCols=feature_cols, outputCol="features")
df_train = assembler.transform(df_train)
df_val = assembler.transform(df_val)
df_test = assembler.transform(df_test)
lgb = LightGBMClassifier(
    objective="binary",
    boostingType='gbdt',
    isUnbalance=True,
    featuresCol='features',
    labelCol='label',
    maxBin=60,
    baggingFreq=1,
    baggingSeed=696,
    earlyStoppingRound=20,
    learningRate=0.1,
    # lambdaL1=1.0,
    # lambdaL2=45.0,
    maxDepth=3,
    numLeaves=128,
    baggingFraction=0.7,
    featureFraction=0.7,
    minSumHessianInLeaf=0.001,
    numIterations=800,
    verbosity=1
)
lightGBMs = list()
for lambdaL1 in list(np.arange(1.0, 3.0, 1.0)):
    for lambdaL2 in list(np.arange(1.0, 4.0, 1.0)):
        # copy() gives each grid point its own estimator; the setters mutate
        # in place, so without copy() every list entry would end up sharing
        # the last parameter combination
        lightGBMs.append(lgb.copy().setLambdaL1(lambdaL1).setLambdaL2(lambdaL2))
evaluator = BinaryClassificationEvaluator(rawPredictionCol="rawPrediction", metricName="areaUnderROC")
metrics = []
models = []
# pick the model that performs best on the validation set
for learner in lightGBMs:
    model = learner.fit(df_train)
    models.append(model)
    scoredData = model.transform(df_val)
    metrics.append(evaluator.evaluate(scoredData))
best_metric = max(metrics)
best_model = models[metrics.index(best_metric)]
# AUC on the test set
scored_test = best_model.transform(df_test)
print(evaluator.evaluate(scored_test))
Of course, with this approach it is also easy to record the result for each parameter combination, and when scoring the validation set we are free to use metrics such as KS or Gini instead of AUC; we will not go into the details here.
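For instance, since the evaluator above already yields AUC, the Gini coefficient follows directly from the identity Gini = 2·AUC − 1 (KS, by contrast, needs the full score distribution, so it would require collecting the predicted probabilities). A minimal sketch:

```python
def gini_from_auc(auc):
    """Gini coefficient of a binary classifier, derived from ROC AUC."""
    return 2.0 * auc - 1.0

# e.g. on the validation scores computed above:
# print(gini_from_auc(evaluator.evaluate(scoredData)))
```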
This post mainly draws on: How to build machine learning model at large scale with Apache Spark and LightGBM for credit card fraud detection?
Simplifying Machine Learning Pipelines with mmlspark