I am working with a Spark cluster, and I want to implement a linear regression by executing this code:
data = sqlContext.read.format("com.databricks.spark.csv") \
  .option("header", "true") \
  .option("inferSchema", "true") \
  .load("/FileStore/tables/w4s3yhez1497323663423/basma.csv/")
data.cache() # Cache data for faster reuse
data.count()
from pyspark.mllib.regression import LabeledPoint
# convenience for specifying schema
data = data.select("creat0", "gfr0m").rdd \
  .map(lambda r: LabeledPoint(r[1], [r[0]])) \
  .toDF()
display(data)
from pyspark.ml.feature import VectorAssembler
vecAssembler = VectorAssembler(inputCols=["creat0", "gfr0m"], outputCol="features")
(trainingData, testData) = data.randomSplit([0.7, 0.3], seed=100)
trainingData.cache()
testData.cache()
print "Training Data : ",trainingData.count()
print "Test Data : ",testData.count()
data.collect()
from pyspark.ml.regression import LinearRegression
lr = LinearRegression()
# Fit 2 models, using different regularization parameters
modelA = lr.fit(data, {lr.regParam: 0.0})
modelB = lr.fit(data, {lr.regParam: 100.0})
# Make predictions
predictionsA = modelA.transform(data)
display(predictionsA)
from pyspark.ml.evaluation import RegressionEvaluator
evaluator = RegressionEvaluator(metricName="rmse")
RMSE = evaluator.evaluate(predictionsA)
print("ModelA: Root Mean Squared Error = " + str(RMSE))
# ModelA: Root Mean Squared Error = 128.602026843
predictionsB = modelB.transform(data)
RMSE = evaluator.evaluate(predictionsB)
print("ModelB: Root Mean Squared Error = " + str(RMSE))
# ModelB: Root Mean Squared Error = 129.496300193
# Import numpy, pandas, and ggplot
import numpy as np
from pandas import *
from ggplot import *
But it gives me this error:
IllegalArgumentException: u'requirement failed: Column features must
be of type org.apache.spark.ml.linalg.VectorUDT@3bfc3ba7 but was
actually org.apache.spark.mllib.linalg.VectorUDT@f71b0bce.'
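This error means the features column holds old-style pyspark.mllib vectors, which is what LabeledPoint produces, while pyspark.ml estimators such as LinearRegression expect the new pyspark.ml vector type. As a quick check (a minimal sketch, assuming the data DataFrame built above), you can inspect the column's declared type:
from pyspark.ml.linalg import VectorUDT as MLVectorUDT
from pyspark.mllib.linalg import VectorUDT as MLLibVectorUDT

# Inspect which vector UDT the features column was built with.
ftype = data.schema["features"].dataType
print(type(ftype))
print(isinstance(ftype, MLLibVectorUDT))  # True here: the old mllib vector type
print(isinstance(ftype, MLVectorUDT))     # False: the type pyspark.ml expects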
After googling this error, I found an answer suggesting to use
from pyspark.ml.linalg import Vectors, VectorUDT
instead of
from pyspark.mllib.linalg import Vectors, VectorUDT
or, alternatively,
from pyspark.ml.linalg import VectorUDT
from pyspark.sql.functions import udf
together with the function:
as_ml = udf(lambda v: v.asML() if v is not None else None, VectorUDT())
Using the sample data:
from pyspark.mllib.linalg import Vectors as MLLibVectors
df = sc.parallelize([
    (MLLibVectors.sparse(4, [0, 2], [1, -1]),), (MLLibVectors.dense([1, 2, 3, 4]),)
]).toDF(["features"])
result = df.withColumn("features", as_ml("features"))
But I still get the same error:
Here is some of the data:
cause,"weight0","dbp0","gfr0m"
1,"90","10","22.72"
5,"54","16.08"
6,"66","9","25.47"
3,"110","11","32.95"
5,"62","20.3"
5,"65","8","28.94"
1,"15.88"
5,"96","38.09"
5,"41.64"
4,"68","25.85"
5,"7","37.77"
1,"82","9.5","16.25"
5,"76","37.55"
5,"56","","37.06"
1,"93","18.26"
5,"80","7.5","48.49"
1,"73","38.37"
4,"31.09"
1,"39.62"
1,"40.08"
1,"28.2"
5,"81","36.66"
2,"47.1"
5,"91","16.59"
2,"58","49.22"
1,"38.98"
,"61","21.8"
5,"50","6","26.97"
1,"83","27.81"
1,"86","48.62"
,"77","46.78"
5,"64","34.17"
5,"38.95"
1,"7.63"
5,"32.46"
1,"35.98"
5,"32.26"
5,"42","17.3"
1,"88","25.61"
5,""
1,"84","31.4"
5,"53.25"
1,"52.65"
6,"74","40.77"
1,"70","22.35"
6,"20.16"
1,"52","13","32.61"
,"52.98"
5,"28.67"
Best Answer
Here you just need to alias VectorUDT from pyspark.ml:
from pyspark.mllib.linalg import Vectors as MLLibVectors
from pyspark.ml.linalg import VectorUDT as VectorUDTML
from pyspark.sql.functions import udf

as_ml = udf(lambda v: v.asML() if v is not None else None, VectorUDTML())
df = sc.parallelize([
    (MLLibVectors.sparse(4, [0, 2], [1, -1]),), (MLLibVectors.dense([1, 2, 3, 4]),)
]).toDF(["features"])
result = df.withColumn("features", as_ml("features"))
result.show()
# +--------------------+
# |            features|
# +--------------------+
# |(4,[0,2],[1.0,-1.0])|
# |   [1.0,2.0,3.0,4.0]|
# +--------------------+
Of course, the resulting DataFrame result is not yet ready to be passed to LinearRegression, since it has no label column, but I am sure you know how to handle that.
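As a follow-up, here is a minimal sketch of that last step, attaching a label column so that result can be fed to LinearRegression; the lit(1.0) placeholder values and the small regParam are illustrative assumptions, not part of the original answer:
from pyspark.sql.functions import lit
from pyspark.ml.regression import LinearRegression

# Placeholder label column; in practice use the real target values.
labeled = result.withColumn("label", lit(1.0))
# A small L2 penalty keeps the solver stable on this tiny toy set.
lr = LinearRegression(featuresCol="features", labelCol="label", regParam=0.1)
model = lr.fit(labeled)
print(model.coefficients, model.intercept)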