大数据SparkSQl指的是什么呢


这期内容当中小编将会给大家带来有关大数据SparkSQl指的是什么呢,文章内容丰富且以专业的角度为大家分析和叙述,阅读完这篇文章希望大家可以有所收获。Spark SQL是Spark用来处理结构化数据的一个模块,它提供了一个编程抽象叫做DataFrame并且作为分布式SQL查询引擎的作用。SparkSql中返回的数据类型是DataFrame1.1.1.为什么要学习Spark SQL我们已经学习了Hive,它是将Hive SQL转换成MapReduce然后提交到集群上执行,大大简化了编写MapReduce的程序的复杂性,由于MapReduce这种计算模型执行效率比较慢。所有Spark SQL的应运而生,它是将Spark SQL转换成RDD,然后提交到集群执行,执行效率非常快!HIVE:简化编写MapReduce的程序的复杂性Spark SQL转换成RDD:替代MapReduce,提高效率Spark1.0版本开始就推出了SparkSQL,最早是叫Shark1、内存列存储–可以大大优化内存使用效率,减少了内存消耗,避免了gc对大量数据的性能开销2、字节码生成技术(byte-code generation)–可以使用动态字节码生成技术来优化性能3、Scala代码的优化  结构化数据是指任何有结构信息的数据。所谓结构信息,就是每条记录共用的已知的字段集合。当数据符合这样的条件时,Spark SQL就会使得针对这些数据的读取和查询变得更加简单高效。具体 来说,Spark SQL提供了以下三大功能(见图9-1)。(1) Spark SQL可以从各种结构化数据源(例如JSON、Hive、Parquet等)中读取数据。(2) Spark SQL不仅支持在Spark程序内使用SQL语句进行数据查询,也支持从类似商业 智能软件Tableau这样的外部工具中通过标准数据库连接器(JDBC/ODBC)连接Spark SQL进行查询。(3)当在Spark程序内使用Spark SQL时,Spark SQL支持SQL与常规的Python/Java/Scala代码高度整合,包括连接RDD与SQL表、公开的自定义SQL函数接口等。这样一来, 许多工作都更容易实现了。为了实现这些功能,Spark SQL提供了一种特殊的RDD,叫作SchemaRDD。SchemaRDD是存放Row对象的RDD,每个Row对象代表一行记录。SchemaRDD还包含记录的结构信 息(即数据字段)。SchemaRDD看起来和普通的RDD很像,但是在内部,SchemaRDD可 以利用结构信息更加高效地存储数据。此外,SchemaRDD还支持RDD上所没有的一些新 操作,比如运行SQL查询。SchemaRDD可以从外部数据源创建,也可以从查询结果或普 通RDD中创建。什么是DataFrames(SparkSql中返回的数据类型:它在概念上等同于关系数据库中的表,但在查询上进行了优化)与RDD类似,DataFrame也是一个分布式数据容器。然而DataFrame更像传统数据库的二维表格,除了数据以外,还记录数据的结构信息,即schema。1.1.1.创建DataFrames在Spark SQL中SQLContext是创建DataFrames和执行SQL的入口,在spark-1.6.1中已经内置了一个sqlContext1.在本地创建一个文件,有三列,分别是id、name、age,用空格分隔,然后上传到hdfs上hdfs dfs -put person.txt /2.在spark shell执行下面命令,读取数据,将每一行的数据使用列分隔符分割val lineRDD = sc.textFile(“hdfs://node01:9000/person.txt”).map(_.split(” “))3.定义case class(相当于表的schemacase class Person(id:Int, name:String, age:Int)4.RDDcase class关联val personRDD = lineRDD.map(x => Person(x(0).toInt, x(1), x(2).toInt))  (里面的数据是在Array中)5.RDD转换成DataFrameval personDF = personRDD.toDF6.对DataFrame进行处理personDF.showval seq1 = Seq((“1″,”bingbing”,35),(“2″,”yuanyuan”,34),(“3″,”mimi”,33))val rdd1 =sc.parallelize(seq1)val df = rdd1.toDF(“id”,”name”,”age”)df.showDSL:领域特定语言////查看DataFrame中的内容
//查看DataFrame部分列中的内容1.
2.
3.
//打印DataFrame的Schema信息
//查询所有的name和age,并将age+11.df.select(col(“id”),col(“name”),col(“age”)+1).show
2.df.select(df(“id”), df(“name”), df(“age”) + 1).show
//过滤age大于等于18的df.filter(col(“age”) >= 35).show
//按年龄进行分组并统计相同年龄的人数df.groupBy(“age”).count().show()
SQL风格语法//查询年龄最大的前两名1.如果想使用SQL风格的语法,需要将DataFrame注册成表df.registerTempTable(“t_person”)2.sqlContext.sql(“select * from t_person order by age desc limit 2”).show
//显示表的Schema信息
以编程方式执行Spark SQL查询1.编写Spark SQL查询程序1.通过反射推断Schema=======================================================packagecom.qf.gp1708.day06//通过反射获取用户信息importorg.apache.spark.rdd.RDDimportorg.apache.spark.sql.{DataFrame, SQLContext}importorg.apache.spark.{SparkConf, SparkContext}objectInferSchema {defmain(args: Array[String]): Unit = {valconf =newSparkConf().setMaster(“local”).setAppName(“inferschema”)valsc =newSparkContext(conf)valsqlContext:SQLContext =newSQLContext(sc)1. //获取数据并切分valline = sc.textFile(“C://Users/Song/Desktop/person.txt”).map(_.split(“,”))3//将获取的数据和Person样例类进行关联valpersonRdd: RDD[Godness] = line.map(arr=>Godness(arr(0).toLong,arr(1),arr(2).toInt,arr(3).toInt))//引入隐 香港云主机式转换函数,这样才可以调用到toDF方法importsqlContext.implicits._4//将personRDD转换成DataFramevaldF: DataFrame = personRdd.toDF5. //注册一张临时表dF.registerTempTable(“t_person”)valsql =“select * from t_person where fv > 70 order by age”//查询valres: DataFrame = sqlContext.sql(sql)res.show()sc.stop()}}2//创建样例类case classGodness(id:Long,name:String,age:Int,fv:Int)=========================================================2.通过StructType直接指定Schema===========================================packagecom.qf.gp1708.day06importorg.apache.spark.rdd.RDDimportorg.apache.spark.sql.{DataFrame, Row, SQLContext}importorg.apache.spark.sql.types.{IntegerType, StringType, StructField, StructType}importorg.apache.spark.{SparkConf, SparkContext}/*** 通过StructType类型直接指定Schema*/objectStructTypeSchema {defmain(args: Array[String]): Unit = {valconf =newSparkConf().setAppName(“str”).setMaster(“local”)valsc =newSparkContext(conf)valsqlContext =newSQLContext(sc)//获取数据并切分vallines = sc.textFile(“hdfs://…”).map(_.split(“,”))//指定schema信息StructType{List(StructField(“id”,IntegerType,false),StructField(“name”,StringType,true),StructField(“age”,IntegerType,true),StructField(“fv”,IntegerType,true),)}//开始映射valrowRDD: RDD[Row] = lines.map(arr =>Row(arr(0).toInt,arr(1),arr(2).toInt,arr(3).toInt))//把RDD转换为DataFramevalpersonDF: DataFrame = sqlContext.createDataFrame(rowRDD,schema)//生成临时表personDF.registerTempTable(“t_person”)valsql =“select name,age,fv from t_person where age >30 order by age desc”valres = sqlContext.sql(sql)res.write.mode(“append”).json(“c://out-20180903-1”)sc.stop()}}=================================================================1.数据源1.1.JDBCSpark SQL可以通过JDBC从关系型数据库中读取数据的方式创建DataFrame,通过对DataFrame一系列的计算后,还可以将数据再写回关系型数据库中。1.1.1.MySQL中加载数据(Spark Shell方式)1.启动Spark Shell,必须指定mysql连接驱动jar包/usr/local/spark-1.6.1-bin-hadoop2.6/bin/spark-shell –master spark://node01:7077 –jars /usr/local/spark-1.6.1-bin-hadoop2.6/mysql-connector-java-5.1.35-bin.jar   (指定MySQL包)–driver-class-path /usr/local/spark-1.6.1-bin-hadoop2.6/mysql-connector-java-5.1.35-bin.jar(指定驱动类)2.从mysql中加载数据val jdbcDF = sqlContext.read.format(“jdbc”).options(Map(“url” -> “jdbc:mysql://node03:3306/bigdata”, “driver” -> “com.mysql.jdbc.Driver”, “dbtable” -> “person”, “user” -> “root”, “password” -> “root”)).load()3.执行查询jdbcDF.show()1.1.2.将数据写入到MySQL中(打jar包方式)packagecom.qf.gp1708.day06importjava.util.Propertiesimportorg.apache.spark.rdd.RDDimportorg.apache.spark.sql.{Row, SQLContext}importorg.apache.spark.sql.types.{IntegerType, StringType, StructField, StructType}importorg.apache.spark.{SparkConf, SparkContext}/*** 写入数据到MySQL*/objectInsertData2MySQLDemo {defmain(args: Array[String]): Unit = {valconf =newSparkConf().setAppName(“”).setMaster(“local[2]”)valsc =newSparkContext(conf)valsqlContext =newSQLContext(sc)vallines= sc.textFile(“”).map(_.split(“,”))//生成Schemavalschema = StructType {Array(StructField(“name”, StringType,true),StructField(“age”, IntegerType,true),StructField(“fv”, StringType,true),)}//映射valpersonRDD = lines.map(arr =>Row(arr(1).toString,arr(2).toInt,arr(3).toInt))//生成DataFramevalpersonDF = sqlContext.createDataFrame(personRDD,schema)//生成用于写入MySQL的配置信息valprop =newProperties()prop.put(“user”,“root”)prop.put(“password”,“root”)prop.put(“driver”,“com.mysql.jdbc.Driver”)valjdbcUrl=“jdbc:mysql://hadoop03:3306/bigdata”valtable=“person”//把数据写入MySQLpersonDF.write.mode(“append”).jdbc(jdbcUrl,table,prop)sc.stop()}}/usr/local/spark-1.6.3-bin-hadoop2.6/spark-submit –class com.qf….. –master spark://hadoop01:7077 –executor-memory 512m –total-executor-cores 2 –jars /usr/…/mysql-connector-java-5.1.35-bin.jar –driver-class-path /usr/…/mysql-connector-java-5.1.35-bin.jar /root/1.jar=======================================================kafka:消息中间件(缓存数据)—解耦  为处理实时数据提供一个统一、高吞吐量、低等待的平台  3、为什么需要消息队列(重要、了解)  消息系统的核心作用就是三点:解耦,异步和并行  Kafka对消息保存时根据Topic进行归类  Topic:底层就是队列,将不同的消息放在不同的队列中进行分类上述就是小编为大家分享的大数据SparkSQl指的是什么呢了,如果刚好有类似的疑惑,不妨参照上述分析进行理解。如果想知道更多相关知识,欢迎关注开发云行业资讯频道。

相关推荐: Scala怎么使用

本篇内容主要讲解“Scala怎么使用”,感兴趣的朋友不妨来看看。本文介绍的方法操作简单快捷,实用性强。下面就让小编来带大家学习“Scala怎么使用”吧!scala语言,从词法上就与Java语言不同。支持完全符号作为命名,而且被命名的东西,不受任何限制。可以是方…

免责声明:本站发布的图片视频文字,以转载和分享为主,文章观点不代表本站立场,本站不承担相关法律责任;如果涉及侵权请联系邮箱:360163164@qq.com举报,并提供相关证据,经查实将立刻删除涉嫌侵权内容。

(0)
打赏 微信扫一扫 微信扫一扫
上一篇 09/23 19:05
下一篇 09/23 19:05

相关推荐