加入收藏 | 设为首页 | 会员中心 | 我要投稿 西安站长网 (https://www.029zz.com.cn/)- 科技、建站、经验、云计算、5G、大数据,站长网!
当前位置: 首页 > 教程 > 正文

基于Spark的数据分析实践

发布时间:2019-06-20 16:48:45 所属栏目:教程 来源:EAWorld
导读:副标题#e# 引言: Spark是在借鉴了MapReduce之上发展而来的,继承了其分布式并行计算的优点并改进了MapReduce明显的缺陷。Spark主要包含了Spark Core、Spark SQL、Spark Streaming、MLLib和GraphX等组件。 本文主要分析了 Spark RDD 以及 RDD 作为开发的不

TextFile DataFrame

  1. import.org.apache.spark.sql._ 
  2. //定义数据的列名称和类型 
  3. valdt=StructType(List(id:String,name:String,gender:String,age:Int)) 
  4. ​ 
  5. //导入user_info.csv文件并指定分隔符 
  6. vallines = sc.textFile("/path/user_info.csv").map(_.split(",")) 
  7. ​ 
  8. //将表结构和数据关联起来,把读入的数据user.csv映射成行,构成数据集 
  9. valrowRDD = lines.map(x=>Row(x(0),x(1),x(2),x(3).toInt)) 
  10. ​ 
  11. //通过SparkSession.createDataFrame()创建表,并且数据表表头 
  12. val df= spark.createDataFrame(rowRDD, dt) 

读取规则数据文件作为DataFrame

  1. SparkSession.Builder builder = SparkSession.builder() 
  2. Builder.setMaster("local").setAppName("TestSparkSQLApp") 
  3. SparkSession spark = builder.getOrCreate(); 
  4. SQLContext sqlContext = spark.sqlContext(); 
  5. ​ 
  6. # 读取 JSON 数据,path 可为文件或者目录 
  7. valdf=sqlContext.read().json(path); 
  8. ​ 
  9. # 读取 HadoopParquet 文件 
  10. vardf=sqlContext.read().parquet(path); 
  11. ​ 
  12. # 读取 HadoopORC 文件 
  13. vardf=sqlContext.read().orc(path); 

JSON 文件为每行一个 JSON 对象的文件类型,行尾无须逗号。文件头也无须[]指定为数组;SparkSQL 读取是只是按照每行一条 JSON Record序列化;

Parquet文件

  1. Configurationconfig = new Configuration(); 
  2. ParquetFileReaderreader = ParquetFileReader.open( 
  3.  HadoopInputFile.fromPath(new Path("hdfs:///path/file.parquet"),conf)); 
  4. Map<String, String>schema = reader.getFileMetaData().getKeyValueMetaData(); 
  5. String allFields= schema.get("org.apache.spark.sql.parquet.row.metadata"); 

allFiedls 的值就是各字段的名称和具体的类型,整体是一个json格式进行展示。

读取 Hive 表作为 DataFrame

Spark2 API 推荐通过 SparkSession.Builder 的 Builder 模式创建 SparkContext。 Builder.getOrCreate() 用于创建 SparkSession,SparkSession 是 SparkContext 的封装。

在Spark1.6中有两个核心组件SQLcontext和HiveContext。SQLContext 用于处理在 SparkSQL 中动态注册的表,HiveContext 用于处理 Hive 中的表。

从Spark2.0以上的版本开始,spark是使用全新的SparkSession接口代替Spark1.6中的SQLcontext和HiveContext。SQLContext.sql 即可执行 Hive 中的表,也可执行内部注册的表;

(编辑:西安站长网)

【声明】本站内容均来自网络,其相关言论仅代表作者个人观点,不代表本站立场。若无意侵犯到您的权利,请及时与联系站长删除相关内容!

推荐文章
    热点阅读