泥土巢 - 大数据之Spark https://www.nituchao.com/category/bigdata-spark/ 大数据技术Spark的研究探讨。 Spark创建多种数据格式的DataFrame https://www.nituchao.com/bigdata-spark/spark-create-multi-data-type-dataframe.html 2019-12-25T14:28:00+08:00 假如我们要通过RDD[Row]创建一个包含多个列的DataFrame,重点是列的数据类型可能会包含多个,这时候需要有一点技巧。uiduser_nameageincome1111nituchao21123.0这个DataFrame里包含多个数据类型:uid: Longuser_name: Stringage: Intincome: Double我们可以使用下面的方式来构建:import org.apache.spark.sql.Row import org.apache.spark.sql.types.{DoubleType, IntegerType, LongType, StringType, StructField, StructType} val uidSeq = Seq(1111L) val nameSeq = Seq("nituchao") val ageSeq = Seq(21) val incomeSeq = Seq(123.0) val rowRDD = spark.sparkContext.parallelize(Seq(Row.fromSeq(uidSeq ++ userNameSeq ++ ageSeq ++ incomeSeq))) val schema = StructType(Seq(StructField("uid", LongType, nullable = true), StructField("name", StringType, nullable = true), StructField("age", IntegerType, nullable = true), StructField("sex", DoubleType, nullable = true))) val df = spark.sqlContext.createDataFrame(rowRDD, schema) df.printSchema() df.show()输出:root |-- uid: long (nullable = true) |-- name: string (nullable = true) |-- age: integer (nullable = true) |-- sex: double (nullable = true) +----+---------+---+-----+ | uid|name |age| sex| +----+---------+---+-----+ |1111| nituchao| 21|123.0| +----+---------+---+-----+上面的技巧在于,使用Row.fromSeq()时,不同类型的数据,要用Seq()分别包起来然后++拼接后传进去。因为Seq中的元素必须是同类型的,如直接构造成一个Seq则会自动进行类型转换,多种类型数据不能混用。问题不大,却造成很大困扰。 Scala函数总结 https://www.nituchao.com/bigdata-spark/scala-function-summary.html 2018-08-21T09:29:00+08:00 在Scala里,带有其他函数做参数的函数叫做高阶函数,使用高阶函数可以简化代码。匿名函数Scala中定义匿名函数的语法很简单,箭头左边是参数列表,右边是函数体,参数的类型是可省略的,Scala的类型推断系统会推测出参数的类型。 使用匿名函数后,我们的代码变得更简洁了。 下面的表达式就定义了一个接受一个Int类型输入参数的匿名函数:var inc = (x:Int) => x + 1上面定义的匿名函数,其实是下面这种写法的简写:def add2 = new Function1(Int, Int) {      def apply(x: Int): Int = x + 1; }以上实例的inc现在可作为一个函数,使用方式如下:var x = inc(7) - 1同样,我们可以在匿名函数中定义多个参数:var mul = (x: Int, y: Int) => x * ymul现在可作为一个函数,使用方式如下:println(mul(3, 4))同样,我们也可以不给匿名函数设置参数,如下所示:var userDir = () => { System.getProperty("user.dir") }userDir现在可作为一个函数,使用方式如下:println( userDir() )高阶函数高阶函数(Higher-Order Function) 就是操作其他函数的函数。 Scala中允许使用高阶函数,高阶函数可以使用其他函数作为参数,或者使用函数作为输出结果。 以下实例中,apply()函数使用了另一个函数f和值v作为参数,而函数f又调用了参数v:> def apply(f: Int => String, v: Int) = f(v) apply: (f: Int => String, v: Int)String > def layout[A](x: A) = "[" + x.toString() + "]" layout: [A](x: A)String > apply(layout, 10) res1: String = [10]在Scala控制台中执行以下代码:> apply()减少重复代码有这样一段代码,查找当前目录样以某一个字符串结尾的文件:object FileMatcher {  private def filesHere = (new java.io.File(".")).listFiles  def filesEnding(query: String) =    for (file <- filesHere; if file.getName.endsWith(query))      yield file }如果,我们想查找包含某一个字符串的文件,则代码需要修改为:def filesContaining(query: String) =  for (file <- filesHere; if file.getName.contains(query))    yield file上面的改动只是使用了 contains 替代 endsWith,但是随着需求越来越复杂,我们要不停地去修改这段代码。例如,我想实现正则匹配的查找,则代码会是下面这个样子:def filesRegex(query: String) =  for (file <- filesHere; if file.getName.matches(query))    yield file为了应变复杂的需求,我们可以进行重构代码,抽象出变化的代码部分,将其声明为一个方法:def filesMatching(query: String,matcher: (String, String) => Boolean) = {  for (file <- filesHere; if matcher(file.getName, query))    yield file }这样,针对不同的需求,我们可以编写不同的matcher方法实现,该方法返回一个布尔值。有了这个新的 filesMatching 帮助方法,你可以通过让三个搜索方法调用它,并传入合适的函数 来简化它们:def filesEnding(query: String) = filesMatching(query, _.endsWith(_)) ​ def filesContaining(query: String) = filesMatching(query, _.contains(_)) ​ def filesRegex(query: String) = filesMatching(query, _.matches(_))上面的例子使用了占位符,例如, filesEnding 方法里的函数文本 .endsWith() 其实就是:(fileName: String, query: String) => fileName.endsWith(query)因为,已经确定了参数类型为字符串,故上面可以省略参数类型。由于第一个参数 fileName 在方法体中被第一个使用,第二个参数 query 第二个使用,你也可以使用占位符语法:_.endsWith(_)。第一个下划线是第一个参数文件名的占位符,第二个下划线是第二个参数查询字串的占位符。因为query参数是从外部传过来的,其可以直接传递给matcher函数,故filesMatching可以只需要一个参数:object FileMatcher {  private def filesHere = (new java.io.File(".")).listFiles ​  private def filesMatching(matcher: String => Boolean) =    for (file <- filesHere; if matcher(file.getName))      yield file ​  def filesEnding(query: String) = filesMatching(_.endsWith(query)) ​  def filesContaining(query: String) = filesMatching(_.contains(query)) ​  def filesRegex(query: String) = filesMatching(_.matches(query)) }上面的例子使用了函数作为第一类值帮助你减少代码重复的方式,另外还演示了闭包是如何能帮助你减少代码重复的。前面一个例子里用到的函数文本,如 _.endsWith(_)和_.contains(_)都是在运行期实例化成函数值而不是闭包,因为它们没有捕 获任何自由变量。举例来说,表达式_.endsWith(_)里用的两个变量都是用下划线代表的,也就是说它们都是从传递给函数的参数获得的。因此,_.endsWith(_)使用了两个绑定变量,而不是自由变量。相对的,最近的例子里面用到的函数文本_.endsWith(query)包含一个绑定变量,下划线代表的参数和一个名为 query的自由变量。仅仅因为 Scala 支持闭包才使得你可以在最近的这个例子里从 filesMatching 中去掉 query 参数,从而更进一步简化了代码。另外一个例子,是循环集合时可以使用exists方法来简化代码。以下是使用了这种方式的方法去判断是否传入的 List 包含了负数的例子:def  containsNeg(nums: List[Int]): Boolean = {    var exists = false    for (num <- nums)        if (num < 0)            exists = true    exists }采用和上面例子同样的方法,我们可以抽象代码,将重要的逻辑抽离到一个独立的方法中去实现。对于上面的查找判断是否存在的逻辑,Scala中提供了高阶函数 exists 来实现,代码如下:def containsNeg(nums: List[Int]) = nums.exists(_ < 0)同样,如果你要查找集合中是否存在偶数,则可以使用下面的代码:def containsOdd(nums: List[Int]) = nums.exists(_ % 2 == 1)递归函数递归函数在函数式编程的语言中起着重要的作用 Scala同样支持递归函数。递归函数意味着函数可以调用它本身。def factorial(n: BigInt): BigInt = {      if (n <= 1) 1      else n * factorial(n - 1) } scala> factorial(10) res48: BigInt = 3628800函数嵌套我们可以在Scala函数内定义函数,定义在函数内的函数称之为局部函数。 以下实例我们实现阶乘运行,并使用内嵌函数:def factorial(i: Int): Int = {      def fact(i: Int, accumulator: Int): Int = {            if (i <= 1) accumulator            else fact(i - 1, i * accumulator)     }     fact(i, 1) } scala> factorial(3) res49: Int = 6闭包闭包是一个函数,返回值依赖于声明在函数外部的一个或多个变量。 闭包通常来讲可以简单的认为是可以访问一个函数里面局部变量的另外一个函数。 如下面这段匿名的函数:val multiplier = (i:Int) => i * 10函数体内有一个变量i,它作为函数的一个参数。如下面的另一段代码:val multiplier = (i:Int) => i * factor在multiplier 中有两个变量:i 和factor。其中的一个i 是函数的形式参数,在multiplier 函数被调用时,i 被赋予一个新的值。然而,factor不是形式参数,而是自由变量,考虑下面代码:var factor = 3   val multiplier = (i:Int) => i * factor这里我们引入一个自由变量factor,这个变量定义在函数外面。 这样定义的函数变量multiplier 成为一个"闭包",因为它引用到函数外面定义的变量,定义这个函数的过程是将这个自由变量捕获而构成一个封闭的函数。完整的例子如下:scala> var factor = 3 factor: Int = 3 ​ scala> val multiplier = (i:Int) => i * factor multiplier: Int => Int = <function1> ​ scala> println("muliplier(1) value = " + multiplier(1)) muliplier(1) value = 3柯里化当函数有多个参数列表时,可以使用柯里化函数来简化代码调用。例如,对下面的函数,它实现两个 Int型参数,x 和 y 的加法:scala> def plainOldSum(x: Int, y: Int) = x + y plainOldSum: (Int,Int)Int ​ scala> plainOldSum(1, 2) res4: Int = 3我们可以将其柯里化,代之以一个列表的两个Int参数,实现如下:scala> def curriedSum(x: Int)(y: Int) = x + y curriedSum: (Int)(Int)Int ​ scala> curriedSum(1)(2) res5: Int = 3当你调用 curriedSum,你实际上背靠背地调用了两个传统函数。第一个函数调 用带单个的名为 x 的 Int 参数,并返回第二个函数的函数值,第二个函数带 Int 参数 y。你可以使用偏函数,填上第一个参数并且部分应用第二个参数。scala> val onePlus = curriedSum(1)_ onePlus: (Int) => Int = <function>curriedSum(1)_里的下划线是第二个参数列表的占位符。结果就是指向一个函数的参考,这个函数在被调用的时候,对它唯一的Int参数加1并返回结果:scala> onePlus(2) res7: Int = 3可变长度参数Scala允许你指明函数的最后一个参数可以是重复的,即我们不需要指定函数参数的个数,可以向函数传入可变长度参数列表。 Scala通过在参数的类型之后放一个星号来设置可变参数(可重复的参数)。例如:def printStrings(args: String*) = {    var i: Int = 0    for (arg <- args) {        println("Arg value[" + i + "] = " + arg)        i = i + i   } } ​ scala> printStrings("Runoob", "Scala", "Python") Arg value[0] = Runoob Arg value[0] = Scala Arg value[0] = Python ​ scala>贷出模式前面的例子提到了使用函数作为参数,我们可以将这个函数的执行结果再次作为参数传入函数,即双倍控制结构:能够重复一个操作两次并返回结果。下面是一个例子:scala> def twice(op: Double => Double, x: Double) = op(op(x)) twice: ((Double) => Double,Double)Double ​ scala> twice(_ + 1, 5) res9: Double = 7.0上面例子中 op 的类型是 Double => Double,就是说它是带一个 Double 做参数并返回另一个 Double 的函数。这里,op函数等同于:def add(x:Int)=x+1op函数会执行两次,第一次是执行add(5)=6,第二次是执行add(add(5))=add(6)=6+1=7。任何时候,你发现你的代码中多个地方有重复的代码块,你就应该考虑把它实现为这种双重控制结构。 考虑这样一种需求:打开一个资源,对它进行操作,然后关闭资源,你可以这样实现:def withPrintWriter(file: File, op: PrintWriter => Unit) {  val writer = new PrintWriter(file)  try {    op(writer) } finally {    writer.close() } }有了这个方法,你就可以这样使用:withPrintWriter(new File("date.txt"), writer => writer.println(new java.util.Date) )注意: 这里和上面的例子一样,使用了=> 来映射式定义函数,其可以看成是没有参数的函数,返回一个匿名函数;调用的时候是调用这个返回的匿名函数。 使用这个方法的好处是,调用这个方法只需要关注如何操作资源,而不用去关心资源的打开和关闭。这个技巧被称为贷出模式:loan pattern,因为该函数要个模板方法一样,实现了资源的打开和关闭,而将使用 PrintWriter 操作资源贷出给函数,交由调用者来实现。例子里的 withPrintWriter 把 PrintWriter 借给函数 op。当函数完成的时候,它发出信号说明它不再需要“借”的资源。于是资源被关闭在 finally 块中,以确信其确实被关闭,而忽略函数是正常结束返回还是抛出了异常。因为,这个函数有两个参数,所以你可以将该函数柯里化:def withPrintWriter(file: File)(op: PrintWriter => Unit) {  val writer = new PrintWriter(file)  try {    op(writer) } finally {    writer.close() } }这样的话,你可以如下方式调用:val file = new File("date.txt") withPrintWriter(file) {    writer => writer.println(new java.util.Date) }这个例子里,第一个参数列表,包含了一个 File 参数,被写成包围在小括号中。第二个参数列表,包含了一个函数参数,被包围在大括号中。当一个函数只有一个参数时,可以使用大括号代替小括号。偏函数Scala 应用函数是一种表达式,你不需要提供函数需要的所有参数,只需要提供部分,或不提供所需要的参数。 如下实例,我们打印日志信息:import java.util.Date ​ object Test {   def main(args: Array[String]) {      val date = new Date      log(date, "message1" )      Thread.sleep(1000)      log(date, "message2" )      Thread.sleep(1000)      log(date, "message3" )   } ​   def log(date: Date, message: String) = {     println(date + "----" + message)   } }执行以上代码,输出结果为:Mon Dec 02 12:52:41 CST 2013----message1 Mon Dec 02 12:52:41 CST 2013----message2 Mon Dec 02 12:52:41 CST 2013----message3实例中,log()方法接受两个参数: data 和message。我们在程序执行时调用了三次,参数date值都相同,message不同。 我们可以使用偏应用函数优化以上方法,绑定第一个date 参数,第二个参数使用下划线(_)替换缺失的参数列表,并把这个新的函数值的索引的赋给变量。以上实例修改如下:import java.util.Date ​ object Test {   def main(args: Array[String]) {      val date = new Date      // ps:此处跟柯里化好相似      val logWithDateBound = log(date, _ : String) ​      logWithDateBound("message1" )      Thread.sleep(1000)      logWithDateBound("message2" )      Thread.sleep(1000)      logWithDateBound("message3" )   } ​   def log(date: Date, message: String) = {     println(date + "----" + message)   } }执行以上代码,输出结果为:Mon Dec 02 12:53:56 CST 2013----message1 Mon Dec 02 12:53:56 CST 2013----message2 Mon Dec 02 12:53:56 CST 2013----message3传名参数 by-name parameterScala的解释器在解析函数参数(function arguments)是有两种方式:传值调用(call-by-value):先计算参数表达式的值,再应用到函数内部。传名调用(call-by-name): 将未计算的参数表达式直接应用到函数内部。在进入函数内部前,传值调用方式就已经将参数表达式的值计算完毕,而传名调用是在函数内部进行参数表达式的值计算的。 这就造成了一种现象,每次使用传名调用时,解释器都会计算一次表达式的值。 《Programming in Scala》的第九章提到了传名参数这个概念。其中举的例子是:实现一个称为myAssert的断言函数,该函数将带一个函数值做输入并参考一个标志位来决定该做什么。如果没有传名参数,你可以这样写myAssert:var assertionsEnabled = true def myAssert(predicate: () => Boolean) =      if (assertionsEnabled && !predicate())          throw new AssertionError这个定义是正确的,但使用它会有点儿难看:myAssert(() => 5 > 3) 你或许很想省略函数文本里的空参数列表和=>符号,写成如下形式:myAssert(5 > 3) // 不会有效,因为缺少() => 传名函数恰好为了实现你的愿望而出现。要实现一个传名函数,要定义参数的类型开始于=>而不是() =>。例如,你可以通过改变其类型() => Boolean为=> Boolean,把myAssert的predicate参数改为传名参数。def byNameAssert(predicate: => Boolean) =      if (assertionsEnabled && !predicate)          throw new AssertionError  现在你可以在需要断言的属性里省略空的参数了。使用byNameAssert的结果看上去就好象使用了内建控制结构:byNameAssert(5 > 3)  传名类型中,空的参数列表()被省略,它仅在参数中被允许。没有什么传名变量或传名字段这样的东西。现在,你或许想知道为什么你不能简化myAssert的编写,使用陈旧的Boolean作为它参数的类型,如:def boolAssert(predicate: Boolean) =      if (assertionsEnabled && !predicate)          throw new AssertionError     当然这种格式同样合法,并且使用这个版本boolAssert的代码看上去仍然与前面的一样:boolAssert(5 > 3)  虽然如此,这两种方式之间存在一个非常重要的差别须指出。因为boolAssert的参数类型是Boolean,在boolAssert(5 > 3)里括号中的表达式先于boolAssert的调用被评估。表达式5 > 3产生true,被传给boolAssert。相对的,因为byNameAssert的predicate参数的类型是=> Boolean,byNameAssert(5 > 3)里括号中的表达式不是先于byNameAssert的调用被评估的。而是代之以先创建一个函数值,其apply方法将评估5 > 3,而这个函数值将被传递给byNameAssert。因此这两种方式之间的差别,在于如果断言被禁用,你会看到boolAssert括号里的表达式的某些副作用,而byNameAssert却没有。例如,如果断言被禁用,boolAssert的例子里尝试对x / 0 == 0的断言将产生一个异常:scala> var assertionsEnabled = false assertionsEnabled: Boolean = false scala> boolAssert(x / 0 == 0)   java.lang.ArithmeticException: / by zero   at .< init>(< console>:8)   at .< clinit>(< console>)   at RequestResult$.< init>(< console>:3)   at RequestResult$.< clinit>(< console>)...  但在byNameAssert的例子里尝试同样代码的断言将不产生异常:scala> byNameAssert(x / 0 == 0) 指定函数参数名称一般情况下函数调用参数,就按照函数定义时的参数顺序一个一个传递。 我们也可以通过指定函数参数名,并且不需要按照顺序向函数传递参数,实例如下:def printInt(a: Int, b: Int) = {      println("value of a : " + a)      println("value of b : " + b)}println(b = 5, a = 7)总结本文主要总结了几种使用Scala高阶函数简化代码的方法,涉及到的知识点有:柯里化、偏函数、函数映射式定义、可变长度参数、贷出模式以及传名参数。需要意识到的是,灵活使用高阶函数可以简化代码,但也可能会增加代码阅读的复杂度。 BlockManagerMaster 和 BlockManager 的过程和源码分析 https://www.nituchao.com/bigdata-spark/blockmanagermaster-source-code.html 2018-05-23T13:24:00+08:00 块管理器BlockManager是Spark存储体系的核心组件。Spark围绕着BlockManager构建了存储模块,包括RDD, Shuffle, Broadcast的存储都使用了BlockManager。Driver Application和Executor都会创建BlockManager,而Driver Appliction还会创建一个BlockManagerMaster负责所有节点上(Driver和Executors)的BlockManager之间的通信协调和管理。BlockManager位于org.apache.spark.storage中,包含四个重要的组件: DiskStore,MemoryStore,Blocktransferservice, ConnectionManager。DiskStore: 负责对磁盘上的数据读写。MemoryStore: 负责内存数据的读写。ConnectionManager: 负责到远程节点的连接。BlockManagerWorker: 负责读写远程节点的数据。当BlockManager启动创建后会向BlockManagerMaster注册,其中BlockManagerMaster位于Driver上,管理者数据的元数据,比如包含了BlockManagerInfo、BlockStatus,当BlockManagerMaster进行了增删改操作,BlockManager会通知BlockManagerMaster,BlockManagerMaster通过BlockManagerInfo内的BlockStatus进行元数据的操作。