Why use spark.ml
sklearn only runs on a single machine, which makes it a good fit for validating a method on small data; Spark supports cluster mode and scales to large datasets.
spark.ml basics
- Data format: Spark's DataFrame (not to be confused with pandas' DataFrame).
- Transformer: transforms one DataFrame into another; used for data preprocessing.
- Estimator: an algorithm; calling fit() on a DataFrame yields a fitted Model (itself a Transformer), which is then applied to the test data (see the sketch below).
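A minimal sketch of the fit/transform distinction (the column names and toy data here are illustrative; StringIndexer is used because it shows both halves of the cycle):
from pyspark.sql import SparkSession
from pyspark.ml.feature import StringIndexer

spark = SparkSession.builder.master("local").appName("FitTransformDemo").getOrCreate()
df = spark.createDataFrame([("a",), ("b",), ("a",)], ["category"])

# StringIndexer is an Estimator: fit() scans the data and returns a Model
indexerModel = StringIndexer(inputCol="category", outputCol="categoryIndex").fit(df)

# The fitted Model is a Transformer: transform() maps one DataFrame to another
indexerModel.transform(df).show()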
The spark.ml machine learning workflow
- ETL on the source data
- Data preprocessing: e.g. converting a pandas DataFrame into a Spark DataFrame, or encoding string features as numbers (see the sketch after this list)
- Feature extraction
- Model training and validation
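For the preprocessing step, the pandas-to-Spark conversion can also be done in a single call; a sketch assuming an existing SparkSession named spark:
import pandas as pd

pd_df = pd.DataFrame({"x": [1.0, 2.0], "label": ["a", "b"]})  # toy data
spark_df = spark.createDataFrame(pd_df)  # pandas DataFrame -> Spark DataFrame
spark_df.printSchema()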
Hands-on classification with spark.ml
- Load the data and convert it from a pandas DataFrame into a Spark DataFrame
from pyspark.sql import SparkSession
from pyspark.ml.linalg import Vectors
import pandas as pd

spark = SparkSession.builder.master("local").appName("IrisClassification").getOrCreate()

# Read the raw iris file with pandas, then build one dict per row holding a
# dense feature vector (columns 1 .. n-2) and a string label (last column)
pd_df = pd.read_table('/home/iris.txt', header=None, sep=',', skiprows=1)
m = []
for i in range(pd_df.shape[0]):
    dense = []
    for j in range(1, pd_df.shape[1] - 1):
        dense.append(float(pd_df.iloc[i, j]))
    rel = {}
    rel['features'] = Vectors.dense(dense)
    rel['label'] = str(pd_df.iloc[i, pd_df.shape[1] - 1])
    m.append(rel)
data = spark.createDataFrame(m)

# Register a temp view so the data can also be queried with SQL
data.createOrReplaceTempView("iris")
df = spark.sql("select * from iris")
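Registering the temp view is what makes the spark.sql call above possible; it also allows ad-hoc inspection of the data, e.g. a row count per class (illustrative query):
spark.sql("select label, count(*) as cnt from iris group by label").show()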
- Data preprocessing
from pyspark.ml.feature import StringIndexer, VectorIndexer, IndexToString

# Index the string labels, index categorical features (<= 4 distinct values),
# and prepare a converter that maps predicted indices back to the original labels
labelIndexer = StringIndexer().setInputCol("label").setOutputCol("indexedLabel").fit(df)
featureIndexer = VectorIndexer().setInputCol("features").setOutputCol("indexedFeatures").setMaxCategories(4).fit(df)
labelConverter = IndexToString().setInputCol("prediction").setOutputCol("predictedLabel").setLabels(labelIndexer.labels)
trainingData, testData = data.randomSplit([0.7, 0.3])
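Note that randomSplit draws a different split on every run; for reproducible experiments a seed can be passed (the value 42 is arbitrary):
trainingData, testData = data.randomSplit([0.7, 0.3], seed=42)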
- Build the pipeline
from pyspark.ml import Pipeline
from pyspark.ml.classification import DecisionTreeClassifier
from pyspark.ml.evaluation import MulticlassClassificationEvaluator

dtClassifier = DecisionTreeClassifier().setLabelCol("indexedLabel").setFeaturesCol("indexedFeatures")
pipelinedClassifier = Pipeline().setStages([labelIndexer, featureIndexer, dtClassifier, labelConverter])
modelClassifier = pipelinedClassifier.fit(trainingData)
predictionsClassifier = modelClassifier.transform(testData)
predictionsClassifier.select("predictedLabel", "label", "features").show(20)

# Evaluate accuracy on the indexed labels
evaluatorClassifier = MulticlassClassificationEvaluator().setLabelCol("indexedLabel").setPredictionCol("prediction").setMetricName("accuracy")
accuracy = evaluatorClassifier.evaluate(predictionsClassifier)
print("Test Error = " + str(1.0 - accuracy))
Hands-on regression with spark.ml
import pandas as pd
from pyspark.ml import Pipeline
from pyspark.sql import SparkSession
from pyspark.ml.regression import RandomForestRegressor
from pyspark.ml.feature import VectorIndexer, MinMaxScaler
from pyspark.ml.evaluation import RegressionEvaluator
from pyspark.ml.linalg import Vectors

spark = SparkSession.builder.master("local").appName("RandomForestRegression").getOrCreate()
# Read the raw data with pandas, then build rows with a dense feature vector
# (columns 1 .. n-2) and a float label (last column)
pd_df = pd.read_table('/home/liyiguo/regressin_data.txt', header=None, sep='\t', skiprows=1)
m = []
for i in range(pd_df.shape[0]):
    dense = []
    for j in range(1, pd_df.shape[1] - 1):
        dense.append(float(pd_df.iloc[i, j]))
    rel = {}
    rel['features'] = Vectors.dense(dense)
    rel['label'] = float(pd_df.iloc[i, pd_df.shape[1] - 1])
    m.append(rel)
data = spark.createDataFrame(m)
# Scale features to [0, 1], then flag categorical features (<= 4 distinct values)
scaler = MinMaxScaler(inputCol="features", outputCol="scaledFeatures")
featureIndexer = VectorIndexer(inputCol="scaledFeatures", outputCol="indexedFeatures", maxCategories=4)
(trainingData, testData) = data.randomSplit([0.7, 0.3])
rf = RandomForestRegressor(featuresCol="indexedFeatures")
pipeline = Pipeline(stages=[scaler, featureIndexer, rf])
model = pipeline.fit(trainingData)
predictions = model.transform(testData)
predictions.select("prediction", "label", "features").show(5)
evaluator = RegressionEvaluator(
labelCol="label", predictionCol="prediction", metricName="rmse")
rmse = evaluator.evaluate(predictions)
print("Root Mean Squared Error (RMSE) on test data = %g" % rmse)
# The fitted RandomForestRegressionModel is the third stage (stages = [scaler, featureIndexer, rf])
rfModel = model.stages[2]
print(rfModel)
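Beyond a single random train/test split, spark.ml can also tune hyperparameters with k-fold cross-validation; a minimal sketch reusing the pipeline and evaluator above (the numTrees grid is illustrative):
from pyspark.ml.tuning import CrossValidator, ParamGridBuilder

paramGrid = ParamGridBuilder().addGrid(rf.numTrees, [10, 20, 50]).build()
cv = CrossValidator(estimator=pipeline, estimatorParamMaps=paramGrid,
                    evaluator=evaluator, numFolds=3)
cvModel = cv.fit(trainingData)  # fits the pipeline once per fold and grid point
print("CV RMSE on test data = %g" % evaluator.evaluate(cvModel.transform(testData)))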