IT数码 购物 网址 头条 软件 日历 阅读 图书馆
TxT小说阅读器
↓语音阅读,小说下载,古典文学↓
图片批量下载器
↓批量下载图片,美女图库↓
图片自动播放器
↓图片自动播放器↓
一键清除垃圾
↓轻轻一点,清除系统垃圾↓
开发: C++知识库 Java知识库 JavaScript Python PHP知识库 人工智能 区块链 大数据 移动开发 嵌入式 开发工具 数据结构与算法 开发测试 游戏开发 网络协议 系统运维
教程: HTML教程 CSS教程 JavaScript教程 Go语言教程 JQuery教程 VUE教程 VUE3教程 Bootstrap教程 SQL数据库教程 C语言教程 C++教程 Java教程 Python教程 Python3教程 C#教程
数码: 电脑 笔记本 显卡 显示器 固态硬盘 硬盘 耳机 手机 iphone vivo oppo 小米 华为 单反 装机 图拉丁
 
   -> 大数据 -> Spark.ml 进行机器学习 -> 正文阅读

[大数据]Spark.ml 进行机器学习

为什么使用spark.ml

sklearn只能单机计算,适合小数据的方法验证
spark有集群模式,适合大型数据

spark.ml基础

数据格式:spark的DataFrame(与pandas的DataFrame区分)
Transformer:是可以将一个DataFrame变换成另一个,用于数据前处理。
Estimator:是一个算法,对一个DataFrame进行Fit后得到Estimator,再对test数据进行验证。

spark.ml机器学习流程

  1. 源数据ETL
  2. 数据预处理:如从pandas的DataFrame到Spark的DataFrame;将字符型特征转化为数值
  3. 特征提取
  4. 模型的训练和验证

spark.ml分类实战代码

  1. 导入数据,并由pd.DataFrame转化为Spark.DataFrame
from pyspark.sql import SparkSession
import pandas as pd
from pyspark.ml.linalg import Vector,Vectors
from pyspark.sql import Row

pd_df = pd.read_table('/home/iris.txt',header =None,sep=',',skiprows=1)
m = []
for i in range(pd_df.shape[0]):   
    dense = []
    for j in range(1,pd_df.shape[1]-1):       
        dense.append(float(pd_df.iloc[i,j]))
    rel = {}
    # rel['features'] = Vectors.dense(float(pd_df.iloc[i,1]),float(pd_df.iloc[i,2]),float(pd_df.iloc[i,3]),float(pd_df.iloc[i,4]))
    rel['features'] = Vectors.dense(dense)
    rel['label'] = str(pd_df.iloc[i,5])
    m.append(rel)

data = spark.createDataFrame(m)

data.createOrReplaceTempView("iris")
df = spark.sql("select * from iris")

2.数据预处理

#分别获取标签列和特征列,进行索引,并进行了重命名。
labelIndexer = StringIndexer().setInputCol("label").setOutputCol("indexedLabel").fit(df)
 
featureIndexer = VectorIndexer().setInputCol("features").setOutputCol("indexedFeatures").setMaxCategories(4).fit(df)
#这里我们设置一个labelConverter,目的是把预测的类别重新转化成字符型的。
labelConverter = IndexToString().setInputCol("prediction").setOutputCol("predictedLabel").setLabels(labelIndexer.labels)
#接下来,我们把数据集随机分成训练集和测试集,其中训练集占70%。
trainingData, testData = data.randomSplit([0.7, 0.3])
  1. 构建pipline
from pyspark.ml.classification import DecisionTreeClassificationModel,DecisionTreeClassifier
from pyspark.ml.evaluation import MulticlassClassificationEvaluator

#训练决策树模型,这里我们可以通过setter的方法来设置决策树的参数,也可以用ParamMap来设置(具体的可以查看spark mllib的官网)。具体的可以设置的参数可以通过explainParams()来获取。
dtClassifier = DecisionTreeClassifier().setLabelCol("indexedLabel").setFeaturesCol("indexedFeatures")
#在pipeline中进行设置
pipelinedClassifier = Pipeline().setStages([labelIndexer, featureIndexer, dtClassifier, labelConverter])
#训练决策树模型
modelClassifier = pipelinedClassifier.fit(trainingData)
#进行预测
predictionsClassifier = modelClassifier.transform(testData)

predictionsClassifier.select("predictedLabel", "label", "features").show(20)


evaluatorClassifier = MulticlassClassificationEvaluator().setLabelCol("indexedLabel").setPredictionCol("prediction").setMetricName("accuracy")
 
accuracy = evaluatorClassifier.evaluate(predictionsClassifier)
 
print("Test Error = " + str(1.0 - accuracy))

spark.ml回归实战代码

import pandas as pd
from pyspark.ml import Pipeline
from pyspark.sql import SQLContext, SparkSession

from pyspark.ml.regression import RandomForestRegressor
from pyspark.ml.feature import VectorIndexer
from pyspark.ml.evaluation import RegressionEvaluator
from pyspark.ml.feature import MinMaxScaler
from pyspark.ml.linalg import Vector,Vectors
# Load and parse the data file, converting it to a DataFrame.
# data = spark.read.format("libsvm").load("file:///home/liyiguo/spark/sample_libsvm_data.txt")

spark = SparkSession.builder.master("local").appName("Word Count").getOrCreate()

pd_df = pd.read_table('/home/liyiguo/regressin_data.txt',header =None,sep='\t',skiprows=1)

m = []
for i in range(pd_df.shape[0]):   
    dense = []
    for j in range(1,pd_df.shape[1]-1):       
        dense.append(float(pd_df.iloc[i,j]))
    rel = {}
    rel['features'] = Vectors.dense(dense)
    # rel['label'] = str(pd_df.iloc[i,pd_df.shape[1]-1])
    rel['label'] = float(pd_df.iloc[i,pd_df.shape[1]-1])
    m.append(rel)

data = spark.createDataFrame(m)


# Automatically identify categorical features, and index them.
# Set maxCategories so features with > 4 distinct values are treated as continuous.

scaler = MinMaxScaler(inputCol="features", outputCol="scaledFeatures")
featureIndexer =VectorIndexer(inputCol="scaledFeatures", outputCol="indexedFeatures", maxCategories=4)

# Split the data into training and test sets (30% held out for testing)
(trainingData, testData) = data.randomSplit([0.7, 0.3])

# Train a RandomForest model.
rf = RandomForestRegressor(featuresCol="indexedFeatures")

# Chain indexer and forest in a Pipeline
pipeline = Pipeline(stages=[scaler,featureIndexer, rf])

# Train model.  This also runs the indexer.
model = pipeline.fit(trainingData)

# Make predictions.
predictions = model.transform(testData)

# Select example rows to display.
predictions.select("prediction", "label", "features").show(5)

# Select (prediction, true label) and compute test error
evaluator = RegressionEvaluator(
    labelCol="label", predictionCol="prediction", metricName="rmse")
rmse = evaluator.evaluate(predictions)
print("Root Mean Squared Error (RMSE) on test data = %g" % rmse)

rfModel = model.stages[1]
print(rfModel)  # summary only

  大数据 最新文章
实现Kafka至少消费一次
亚马逊云科技:还在苦于ETL?Zero ETL的时代
初探MapReduce
【SpringBoot框架篇】32.基于注解+redis实现
Elasticsearch:如何减少 Elasticsearch 集
Go redis操作
Redis面试题
专题五 Redis高并发场景
基于GBase8s和Calcite的多数据源查询
Redis——底层数据结构原理
上一篇文章      下一篇文章      查看所有文章
加:2021-07-28 00:16:32  更:2021-07-28 00:16:59 
 
开发: C++知识库 Java知识库 JavaScript Python PHP知识库 人工智能 区块链 大数据 移动开发 嵌入式 开发工具 数据结构与算法 开发测试 游戏开发 网络协议 系统运维
教程: HTML教程 CSS教程 JavaScript教程 Go语言教程 JQuery教程 VUE教程 VUE3教程 Bootstrap教程 SQL数据库教程 C语言教程 C++教程 Java教程 Python教程 Python3教程 C#教程
数码: 电脑 笔记本 显卡 显示器 固态硬盘 硬盘 耳机 手机 iphone vivo oppo 小米 华为 单反 装机 图拉丁

360图书馆 购物 三丰科技 阅读网 日历 万年历 2024年5日历 -2024/5/21 0:05:59-

图片自动播放器
↓图片自动播放器↓
TxT小说阅读器
↓语音阅读,小说下载,古典文学↓
一键清除垃圾
↓轻轻一点,清除系统垃圾↓
图片批量下载器
↓批量下载图片,美女图库↓
  网站联系: qq:121756557 email:121756557@qq.com  IT数码