标签分类
技术文章
当前位置:主页 > 计算机编程 > java > Spark Streaming算子开发实例

Spark Streaming算子开发代码讲解

  • 发布时间:
  • 作者:码农之家原创
  • 点击:173

Spark Streaming算子开发实例

这篇文章主要知识点是关于Spark,Streaming算子,Spark,Streaming,Spark Streaming算子开发实例,Spark学习笔记Spark Streaming的使用 的内容,如果大家想对相关知识点有系统深入的学习,可以参阅以下电子书

Hadoop+Spark生态系统操作与实战指南
  • 类型:大数据技术大小:114.9 MB格式:PDF出版:清华大学出版社作者:余辉
立即下载

更多相关的学习资源可以参阅 程序设计电子书Java电子书、等栏目。

Spark Streaming算子开发实例

transform算子开发

transform操作应用在DStream上时,可以用于执行任意的RDD到RDD的转换操作,还可以用于实现DStream API中所没有提供的操作,比如说,DStreamAPI中并没有提供将一个DStream中的每个batch,与一个特定的RDD进行join的操作,DStream中的join算子只能join其他DStream,但是我们自己就可以使用transform操作来实现该功能。

实例:黑名单用户实时过滤

package StreamingDemo

import org.apache.log4j.{Level, Logger}
import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Seconds, StreamingContext}

/**
 * 实时黑名单过滤
 */
object TransformDemo {
 def main(args: Array[String]): Unit = {
  //设置日志级别
  Logger.getLogger("org").setLevel(Level.WARN)
  val conf = new SparkConf()
   .setAppName(this.getClass.getSimpleName)
   .setMaster("local[2]")
  val ssc = new StreamingContext(conf, Seconds(2))

  //创建一个黑名单的RDD
  val blackRDD =
   ssc.sparkContext.parallelize(Array(("zs", true), ("lisi", true)))

  //通过socket从nc中获取数据
  val linesDStream = ssc.socketTextStream("Hadoop01", 6666)

  /**
   * 过滤黑名单用户发言
   * zs sb sb sb sb
   * lisi fuck fuck fuck
   * jack hello
   */
  linesDStream
   .map(x => {
    val info = x.split(" ")
    (info(0), info.toList.tail.mkString(" "))
   })
   .transform(rdd => { //transform是一个RDD->RDD的操作,所以返回值必须是RDD
    /**
     * 经过leftouterjoin操作之后,产生的结果如下:
     * (zs,(sb sb sb sb),Some(true)))
     * (lisi,(fuck fuck fuck),some(true)))
     * (jack,(hello,None))
     */
    val joinRDD = rdd.leftOuterJoin(blackRDD)

    //如果是Some(true)的,说明就是黑名单用户,如果是None的,说明不在黑名单内,把非黑名单的用户保留下来
    val filterRDD = joinRDD.filter(x => x._2._2.isEmpty)

    filterRDD
   })
   .map(x=>(x._1,x._2._1)).print()

  ssc.start()
  ssc.awaitTermination()
 }
}

测试

启动nc,传入用户及其发言信息

Spark Streaming算子开发实例

可以看到程序实时的过滤掉了在黑名单里的用户发言

Spark Streaming算子开发实例

updateStateByKey算子开发

updateStateByKey算子可以保持任意状态,同时不断有新的信息进行更新,这个算子可以为每个key维护一份state,并持续不断的更新state。对于每个batch来说,Spark都会为每个之前已经存在的key去应用一次State更新函数,无论这个key在batch中是否有新的值,如果State更新函数返回的值是none,那么这个key对应的state就会被删除;对于新出现的key也会执行state更新函数。

要使用该算子,必须进行两个步骤

  • 定义state——state可以是任意的数据类型
  • 定义state更新函数——用一个函数指定如何使用之前的状态,以及从输入流中获取新值更新状态

注意:updateStateByKey操作,要求必须开启Checkpoint机制

实例:基于缓存的实时WordCount

package StreamingDemo

import org.apache.log4j.{Level, Logger}
import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Seconds, StreamingContext}

/**
 * 基于缓存的实时WordCount,在全局范围内统计单词出现次数
 */
object UpdateStateByKeyDemo {
 def main(args: Array[String]): Unit = {
  //设置日志级别
  Logger.getLogger("org").setLevel(Level.WARN)

  /**
   * 如果没有启用安全认证或者从Kerberos获取的用户为null,那么获取HADOOP_USER_NAME环境变量,
   * 并将它的值作为Hadoop执行用户设置hadoop username
   * 这里实验了一下在没有启用安全认证的情况下,就算不显式添加,也会自动获取我的用户名
   */
  //System.setProperty("HADOOP_USER_NAME","Setsuna")

  val conf = new SparkConf()
   .setAppName(this.getClass.getSimpleName)
   .setMaster("local[2]")
  val ssc = new StreamingContext(conf, Seconds(2))

  //设置Checkpoint存放的路径
  ssc.checkpoint("hdfs://Hadoop01:9000/checkpoint")

  //创建输入DStream
  val lineDStream = ssc.socketTextStream("Hadoop01", 6666)
  val wordDStream = lineDStream.flatMap(_.split(" "))
  val pairsDStream = wordDStream.map((_, 1))

  /**
   * state:代表之前的状态值
   * values:代表当前batch中key对应的values值
   */
  val resultDStream =
   pairsDStream.updateStateByKey((values: Seq[Int], state: Option[Int]) => {

    //当state为none,表示没有对这个单词做统计,则返回0值给计数器count
    var count = state.getOrElse(0)

    //遍历values,累加新出现的单词的value值
    for (value <- values) {
     count += value
    }

    //返回key对应的新state,即单词的出现次数
    Option(count)
   })

  //在控制台输出
  resultDStream.print()

  ssc.start()
  ssc.awaitTermination()
 }
}

测试

开启nc,输入单词

Spark Streaming算子开发实例

控制台实时输出的结果

Spark Streaming算子开发实例

window滑动窗口算子开发

Spark Streaming提供了滑动窗口操作的支持,可以对一个滑动窗口内的数据执行计算操作
在滑动窗口中,包含批处理间隔、窗口间隔、滑动间隔

  • 对于窗口操作而言,在其窗口内部会有N个批处理数据
  • 批处理数据的大小由窗口间隔决定,而窗口间隔指的就是窗口的持续时间,也就是窗口的长度
  • 滑动时间间隔指的是经过多长时间窗口滑动一次,形成新的窗口,滑动间隔默认情况下和批处理时间间隔的相同

注意:滑动时间间隔和窗口时间间隔的大小一定得设置为批处理间隔的整数倍

用一个官方的图来作为说明

Spark Streaming算子开发实例

批处理间隔是1个时间单位,窗口间隔是3个时间单位,滑动间隔是2个时间单位。对于初始的窗口time1-time3,只有窗口间隔满足了才触发数据的处理。所以滑动窗口操作都必须指定两个参数,窗口长度和滑动时间间隔。在Spark Streaming中对滑动窗口的支持是比Storm更加完善的。

Window滑动算子操作

 

算子 描述
window() 对每个滑动窗口的数据执行自定义的计算
countByWindow() 对每个滑动窗口的数据执行count操作
reduceByWindow() 对每个滑动窗口的数据执行reduce操作
reduceByKeyAndWindow() 对每个滑动窗口的数据执行reduceByKey操作
countByValueAndWindow() 对每个滑动窗口的数据执行countByValue操作

 

reduceByKeyAndWindow算子开发

实例:在线热点搜索词实时滑动统计

每隔2秒钟,统计最近5秒钟的搜索词中排名最靠前的3个搜索词以及出现次数

package StreamingDemo

import org.apache.log4j.{Level, Logger}
import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Seconds, StreamingContext}

/**
 * 需求:每隔2秒钟,统计最近5秒钟的搜索词中排名最靠前的3个搜索词以及出现次数
 */
object ReduceByKeyAndWindowDemo {
 def main(args: Array[String]): Unit = {

  //设置日志级别
  Logger.getLogger("org").setLevel(Level.WARN)
  //基础配置
  val conf = new SparkConf()
   .setAppName(this.getClass.getSimpleName)
   .setMaster("local[2]")

  //批处理间隔设置为1s
  val ssc = new StreamingContext(conf, Seconds(1))

  val linesDStream = ssc.socketTextStream("Hadoop01", 6666)
  linesDStream
   .flatMap(_.split(" ")) //根据空格来做分词
   .map((_, 1)) //返回(word,1)
   .reduceByKeyAndWindow(
    //定义窗口如何计算的函数
    //x代表的是聚合后的结果,y代表的是这个Key对应的下一个需要聚合的值
    (x: Int, y: Int) => x + y,
    //窗口长度为5秒
    Seconds(5),
    //窗口时间间隔为2秒
    Seconds(2)
   )
   .transform(rdd => { //transform算子对rdd做处理,转换为另一个rdd
    //根据Key的出现次数来进行排序,然后降序排列,获取最靠前的3个搜索词
    val info: Array[(String, Int)] = rdd.sortBy(_._2, false).take(3)
    //将Array转换为resultRDD
    val resultRDD = ssc.sparkContext.parallelize(info)
    resultRDD
   })
   .map(x => s"${x._1}出现的次数是:${x._2}")
   .print()

  ssc.start()
  ssc.awaitTermination()

 }
}

测试结果

Spark Streaming算子开发实例

DStream Output操作概览

Spark Streaming允许DStream的数据输出到外部系统,DSteram中的所有计算,都是由output操作触发的,foreachRDD输出操作,也必须在里面对RDD执行action操作,才能触发对每一个batch的计算逻辑。

 

转换 描述
print() 在Driver中打印出DStream中数据的前10个元素。主要用于测试,或者是不需要执行什么output操作时,用于简单触发一下job。
saveAsTextFiles(prefix,
[suffix])
将DStream中的内容以文本的形式保存为文本文件,其中每次批处理间隔内产生的文件以prefix-TIME_IN_MS[.suffix]的方式命名。
saveAsObjectFiles(prefix
, [suffix])
将DStream中的内容按对象序列化并且以SequenceFile的格式保存。其中每次批处理间隔内产生的文件以prefix-TIME_IN_MS[.suffix]的方式命名。
saveAsHadoopFiles(pref
ix, [suffix])
将DStream中的内容以文本的形式保存为Hadoop文件,其中每次批处理间隔内产生的文件
以prefix-TIME_IN_MS[.suffix]的方式命名。
foreachRDD(func) 最基本的输出操作,将func函数应用于DStream中的RDD上,这个操作会输出数据到外部系
统,比如保存RDD到文件或者网络数据库等。需要注意的是func函数是在运行该streaming
应用的Driver进程里执行的。

 

foreachRDD算子开发

foreachRDD是最常用的output操作,可以遍历DStream中的每个产生的RDD并进行处理,然后将每个RDD中的数据写入外部存储,如文件、数据库、缓存等,通常在其中针对RDD执行action操作,比如foreach

使用foreachRDD操作数据库

通常在foreachRDD中都会创建一个Connection,比如JDBC Connection,然后通过Connection将数据写入外部存储

误区一:在RDD的foreach操作外部创建Connection

dstream.foreachRDD { rdd =>
  val connection=createNewConnection()
  rdd.foreach { record => connection.send(record)
  }
}

这种方式是错误的,这样的方式会导致Connection对象被序列化后被传输到每一个task上,但是Connection对象是不支持序列化的,所以也就无法被传输

误区二:在RDD的foreach操作内部创建Connection

dstream.foreachRDD { rdd =>
  rdd.foreach { record =>
    val connection = createNewConnection()
    connection.send(record)
    connection.close()
  }
}

这种方式虽然是可以的,但是执行效率会很低,因为它会导致对RDD中的每一条数据都创建一个Connection对象,通常Connection对象的创建都是很消耗性能的

合理的方式

  • 第一种:使用RDD的foreachPartition操作,并且在该操作内部创建Connection对象,这样就相当于为RDD的每个partition创建一个Connection对象,节省了很多资源
  • 第二种:自己手动封装一个静态连接池,使用RDD的foreachPartition操作,并且在该操作内部从静态连接池中,通过静态方法获取到一个连接,连接使用完之后再放回连接池中。这样的话,可以在多个RDD的partition之间复用连接了

实例:实时全局统计WordCount,并将结果保存到MySQL数据库中

MySQL数据库建表语句如下

CREATE TABLE wordcount (
  word varchar(100) CHARACTER SET utf8 NOT NULL,
  count int(10) NOT NULL,
  PRIMARY KEY (word)
) ENGINE=InnoDB DEFAULT CHARSET=latin1;

在IDEA中添加mysql-connector-java-5.1.40-bin.jar

Spark Streaming算子开发实例

代码如下

连接池的代码,其实一开始有想过用静态块来写个池子直接获取,但是如果考虑到池子宽度不够用的问题,这样的方式其实更好,一开始,实例化一个连接池出来,被调用获取连接,当连接全部都被获取了的时候,池子空了,就再实例化一个池子出来

package StreamingDemo

import java.sql.{Connection, DriverManager, SQLException}
import java.util

object JDBCManager {
 var connectionQue: java.util.LinkedList[Connection] = null

 /**
  * 从数据库连接池中获取连接对象
  * @return
  */
 def getConnection(): Connection = {
  synchronized({
   try {
    //如果连接池是空的,那么就实例化一个Connection类型的链表
    if (connectionQue == null) {
     connectionQue = new util.LinkedList[Connection]()
     for (i <- 0 until (10)) {
      //生成10个连接,并配置相关信息
      val connection = DriverManager.getConnection(
       "jdbc:mysql://Hadoop01:3306/test?characterEncoding=utf-8",
       "root",
       "root")
      //将连接push进连接池
      connectionQue.push(connection)
     }
    }
   } catch {
    //捕获异常并输出
    case e: SQLException => e.printStackTrace()
   }
   //如果连接池不为空,则返回表头元素,并将它在链表里删除
   return connectionQue.poll()
  })
 }

 /**
  * 当连接对象用完后,需要调用这个方法归还连接
  * @param connection
  */
 def returnConnection(connection: Connection) = {
  //插入元素
  connectionQue.push(connection)
 }

 def main(args: Array[String]): Unit = {
  //main方法测试
  getConnection()
  println(connectionQue.size())
 }
}

wordcount代码

package StreamingDemo

import org.apache.log4j.{Level, Logger}
import org.apache.spark.{SparkConf, streaming}
import org.apache.spark.streaming.{Seconds, StreamingContext}

object ForeachRDDDemo {
 def main(args: Array[String]): Unit = {
  //设置日志级别,避免INFO信息过多
  Logger.getLogger("org").setLevel(Level.WARN)

  //设置Hadoop的用户,不加也可以
  System.setProperty("HADOOP_USER_NAME", "Setsuna")

  //Spark基本配置
  val conf = new SparkConf()
   .setAppName(this.getClass.getSimpleName)
   .setMaster("local[2]")
  val ssc = new StreamingContext(conf, streaming.Seconds(2))

  //因为要使用updateStateByKey,所以需要使用checkpoint
  ssc.checkpoint("hdfs://Hadoop01:9000/checkpoint")

  //设置socket,跟nc配置的一样
  val linesDStream = ssc.socketTextStream("Hadoop01", 6666)
  val wordCountDStream = linesDStream
   .flatMap(_.split(" "))   //根据空格做分词
   .map((_, 1)) //生成(word,1)
   .updateStateByKey((values: Seq[Int], state: Option[Int]) => {
    //实时更新状态信息
    var count = state.getOrElse(0)
    for (value <- values) {
     count += value
    }
    Option(count)
   })

  wordCountDStream.foreachRDD(rdd => {
   if (!rdd.isEmpty()) {
    rdd.foreachPartition(part => {
     //从连接池中获取连接
     val connection = JDBCManager.getConnection()
     part.foreach(data => {
      val sql = //往wordcount表中插入wordcount信息,on duplicate key update子句是有则更新无则插入
       s"insert into wordcount (word,count) " +
        s"values ('${data._1}',${data._2}) on duplicate key update count=${data._2}"
      //使用prepareStatement来使用sql语句
      val pstmt = connection.prepareStatement(sql)
      pstmt.executeUpdate()
     })
     //在连接处提交完数据后,归还连接到连接池
     JDBCManager.returnConnection(connection)
    })
   }
  })

  ssc.start()
  ssc.awaitTermination()
 }
}

打开nc,输入数据

Spark Streaming算子开发实例

在另一个终端对wordcount的结果进行查询,可以发现是实时发生变化的

Spark Streaming算子开发实例

以上就是本文的全部内容,希望对大家的学习有所帮助,也希望大家多多支持码农之家。

Spark学习笔记Spark Streaming的使用

1. Spark Streaming

  • Spark Streaming是一个基于Spark Core之上的实时计算框架,可以从很多数据源消费数据并对数据进行处理
  • Spark Streaing中有一个最基本的抽象叫DStream(代理),本质上就是一系列连续的RDD,DStream其实就是对RDD的封装
  • DStream可以认为是一个RDD的工厂,该DStream里面生产都是相同业务逻辑的RDD,只不过是RDD里面要读取数据的不相同
  • 在一个批次的处理时间间隔里, DStream只产生一个RDD
  • DStream就相当于一个"模板", 我们可以根据这个"模板"来处理一段时间间隔之内产生的这个rdd,以此为依据来构建rdd的DAG

2. 当下比较流行的实时计算引擎

吞吐量 编程语言 处理速度 生态

Storm 较低 clojure 非常快(亚秒) 阿里(JStorm)

Flink 较高 scala 较快(亚秒) 国内使用较少

Spark Streaming 非常高 scala 快(毫秒) 完善的生态圈

3. Spark Streaming处理网络数据

//创建StreamingContext 至少要有两个线程 一个线程用于接收数据 一个线程用于处理数据
val conf = new SparkConf().setAppName("Ops1").setMaster("local[2]")
val ssc = new StreamingContext(conf, Milliseconds(3000))
val receiverDS: ReceiverInputDStream[String] = ssc.socketTextStream("uplooking01", 44444)
val pairRetDS: DStream[(String, Int)] = receiverDS.flatMap(_.split(",")).map((_, 1)).reduceByKey(_ + _)
pairRetDS.print()
//开启流计算
ssc.start()
//优雅的关闭
ssc.awaitTermination()

4. Spark Streaming接收数据的两种方式(Kafka)

Receiver

  • 偏移量是由zookeeper来维护的
  • 使用的是Kafka高级的API(消费者的API)
  • 编程简单
  • 效率低(为了保证数据的安全性,会开启WAL)
  • kafka0.10的版本中已经彻底弃用Receiver了
  • 生产环境一般不会使用这种方式

Direct

  • 偏移量是有我们来手动维护
  • 效率高(我们直接把spark streaming接入到kafka的分区中了)
  • 编程比较复杂
  • 生产环境一般使用这种方式

5. Spark Streaming整合Kafka

基于Receiver的方式整合kafka(生产环境不建议使用,在0.10中已经移除了)

//创建StreamingContext 至少要有两个线程 一个线程用于接收数据 一个线程用于处理数据
val conf = new SparkConf().setAppName("Ops1").setMaster("local[2]")
val ssc = new StreamingContext(conf, Milliseconds(3000))
val zkQuorum = "uplooking03:2181,uplooking04:2181,uplooking05:2181"
val groupId = "myid"
val topics = Map("hadoop" -> 3)
val receiverDS: ReceiverInputDStream[(String, String)] = KafkaUtils.createStream(ssc, zkQuorum, groupId, topics)
receiverDS.flatMap(_._2.split(" ")).map((_,1)).reduceByKey(_+_).print()
ssc.start()
ssc.awaitTermination()

基于Direct的方式(生产环境使用)

//创建StreamingContext 至少要有两个线程 一个线程用于接收数据 一个线程用于处理数据
val conf = new SparkConf().setAppName("Ops1").setMaster("local[2]")
val ssc = new StreamingContext(conf, Milliseconds(3000))
val kafkaParams = Map("metadata.broker.list" -> "uplooking03:9092,uplooking04:9092,uplooking05:9092")
val topics = Set("hadoop")
val inputDS: InputDStream[(String, String)] = KafkaUtils.createDirectStream[String, String, StringDecoder, StringDecoder](ssc, kafkaParams, topics)
inputDS.flatMap(_._2.split(" ")).map((_, 1)).reduceByKey(_ + _).print()
ssc.start()
ssc.awaitTermination()

6. 实时流计算的架构

Spark学习笔记Spark Streaming的使用

1. 生成日志(模拟用户访问web应用的日志)

public class GenerateAccessLog {
  public static void main(String[] args) throws IOException, InterruptedException {
    //准备数据
    int[] ips = {123, 18, 123, 112, 181, 16, 172, 183, 190, 191, 196, 120};
    String[] requesTypes = {"GET", "POST"};
    String[] cursors = {"/vip/112", "/vip/113", "/vip/114", "/vip/115", "/vip/116", "/vip/117", "/vip/118", "/vip/119", "/vip/120", "/vip/121", "/free/210", "/free/211", "/free/212", "/free/213", "/free/214", "/company/312", "/company/313", "/company/314", "/company/315"};

    String[] courseNames = {"大数据", "python", "java", "c++", "c", "scala", "android", "spark", "hadoop", "redis"};
    String[] references = {"www.baidu.com/", "www.sougou.com/", "www.sou.com/", "www.google.com"};
    FileWriter fw = new FileWriter(args[0]);
    PrintWriter printWriter = new PrintWriter(fw);
    while (true) {
      //      Thread.sleep(1000);
      //产生字段
      String date = new Date().toLocaleString();
      String method = requesTypes[getRandomNum(0, requesTypes.length)];
      String url = "/cursor" + cursors[getRandomNum(0, cursors.length)];
      String HTTPVERSION = "HTTP/1.1";
      String ip = ips[getRandomNum(0, ips.length)] + "." + ips[getRandomNum(0, ips.length)] + "." + ips[getRandomNum(0, ips.length)] + "." + ips[getRandomNum(0, ips.length)];
      String reference = references[getRandomNum(0, references.length)];
      String rowLog = date + " " + method + " " + url + " " + HTTPVERSION + " " + ip + " " + reference;
      printWriter.println(rowLog);
      printWriter.flush();
    }
  }


  //[start,end)
  public static int getRandomNum(int start, int end) {
    int i = new Random().nextInt(end - start) + start;
    return i;
  }
}

2. flume使用avro采集web应用服务器的日志数据

采集命令执行的结果到avro中

# The configuration file needs to define the sources, 
# the channels and the sinks.
# Sources, channels and sinks are defined per agent, 
# in this case called 'agent'
f1.sources = r1
f1.channels = c1
f1.sinks = k1

#define sources
f1.sources.r1.type = exec
f1.sources.r1.command =tail -F /logs/access.log

#define channels
f1.channels.c1.type = memory
f1.channels.c1.capacity = 1000
f1.channels.c1.transactionCapacity = 100

#define sink 采集日志到uplooking03
f1.sinks.k1.type = avro
f1.sinks.k1.hostname = uplooking03
f1.sinks.k1.port = 44444

#bind sources and sink to channel 
f1.sources.r1.channels = c1
f1.sinks.k1.channel = c1
从avro采集到控制台
# The configuration file needs to define the sources, 
# the channels and the sinks.
# Sources, channels and sinks are defined per agent, 
# in this case called 'agent'
f2.sources = r2
f2.channels = c2
f2.sinks = k2

#define sources
f2.sources.r2.type = avro
f2.sources.r2.bind = uplooking03
f2.sources.r2.port = 44444

#define channels
f2.channels.c2.type = memory
f2.channels.c2.capacity = 1000
f2.channels.c2.transactionCapacity = 100

#define sink
f2.sinks.k2.type = logger

#bind sources and sink to channel 
f2.sources.r2.channels = c2
f2.sinks.k2.channel = c2
从avro采集到kafka中
# The configuration file needs to define the sources, 
# the channels and the sinks.
# Sources, channels and sinks are defined per agent, 
# in this case called 'agent'
f2.sources = r2
f2.channels = c2
f2.sinks = k2

#define sources
f2.sources.r2.type = avro
f2.sources.r2.bind = uplooking03
f2.sources.r2.port = 44444

#define channels
f2.channels.c2.type = memory
f2.channels.c2.capacity = 1000
f2.channels.c2.transactionCapacity = 100

#define sink
f2.sinks.k2.type = org.apache.flume.sink.kafka.KafkaSink
f2.sinks.k2.topic = hadoop
f2.sinks.k2.brokerList = uplooking03:9092,uplooking04:9092,uplooking05:9092
f2.sinks.k2.requiredAcks = 1

以上就是本文的全部内容,希望对大家的学习有所帮助,也希望大家多多支持码农之家。

以上就是本次给大家分享的全部知识点内容总结,大家还可以在下方相关文章里找到vue项目中使用md5加密以及、 解决axios.interceptors.respon、 儿童python编程入门书籍推、 等java文章进一步学习,感谢大家的阅读和支持。

上一篇:Scala操作Redis用连接池工具类RedisUtil详解

下一篇:Java8中Lambda表达式使用和Stream API知识点详解

展开 +

收起 -

学习笔记
网友NO.572274

spark dataframe 将一列展开,把该列所有值都变成新列的方法

The original dataframe 需求:hour代表一天的24小时,现在要将hour列展开,每一个小时都作为一个列 实现: val pivots = beijingGeoHourPopAfterDrop.groupBy("geoHash").pivot("hour").sum("countGeoPerHour").na.fill(0) 并且统计了对应的countGeoPerHour的和,如果有些行没有这个新列对应的数据,将用null填充 The new dataframe 以上这篇spark dataframe 将一列展开,把该列所有值都变成新列的方法就是小编分享给大家的全部内容了,希望能给大家一个参考,也希望大家多多支持码农之家。 ……

网友NO.245678

java-spark中各种常用算子的写法示例

Spark的算子的分类 从大方向来说,Spark 算子大致可以分为以下两类: 1)Transformation 变换/转换算子:这种变换并不触发提交作业,完成作业中间过程处理。 Transformation 操作是延迟计算的,也就是说从一个RDD 转换生成另一个 RDD 的转换操作不是马上执行,需要等到有 Action 操作的时候才会真正触发运算。 2)Action 行动算子:这类算子会触发 SparkContext 提交 Job 作业。 Action 算子会触发 Spark 提交作业(Job),并将数据输出 Spark系统。 从小方向来说,Spark 算子大致可以分为以下三类: 1)Value数据类型的Transformation算子,这种变换并不触发提交作业,针对处理的数据项是Value型的数据。 2)Key-Value数据类型的Transfromation算子,这种变换并不触发提交作业,针对处理的数据项是Key-Value型的数据对。 3)Action算子,这类算子会触发SparkContext提交Job作业。 引言 通常写spark的程序用scala比较方便,毕竟spark的源码就是用scala写的。然而,目前java开发者特别多,尤其进行数据对接、上线服务的时候,这时候,就需要掌握一些spark在java中的使用方法了 一、map map在进行数据处理、转换的时候,不能更常用了 在使用map之前 首先要定义一个转换的函数 格式如下: FunctionString, LabeledPoint transForm = new FunctionString, LabeledPoint() {//String是某一行的输入类型 Label……

网友NO.299017

Spark MLlib随机梯度下降法概述与实例

机器学习算法中回归算法有很多,例如神经网络回归算法、蚁群回归算法,支持向量机回归算法等,其中也包括本篇文章要讲述的梯度下降算法,本篇文章将主要讲解其基本原理以及基于Spark MLlib进行实例示范,不足之处请多多指教。 梯度下降算法包含多种不同的算法,有批量梯度算法,随机梯度算法,折中梯度算法等等。对于随机梯度下降算法而言,它通过不停的判断和选择当前目标下最优的路径,从而能够在最短路径下达到最优的结果。我们可以在一个人下山坡为例,想要更快的到达山低,最简单的办法就是在当前位置沿着最陡峭的方向下山,到另一个位置后接着上面的方式依旧寻找最陡峭的方向走,这样每走一步就停下来观察最下路线的方法就是随机梯度下降算法的本质。 随机梯度下降算法理论基础 在线性回归中,我们给出回归方程,如下所示: 我们知道,对于最小二乘法要想求得最优变量就要使得计算值与实际值的偏差的平方最小。而随机梯度下降算法对于系数需要通过不断的求偏导求解出当前位置下最优化的数据,那么梯度方向公式推导如下公式,公式中的θ会向着梯度下降最快的方向减少,从而推断出θ的最优解。 因此随机梯度下降法的公式归结为通过迭代计算特征值从而求出最合适的值。θ的求解公式如……

<
1
>

Copyright 2018-2019 xz577.com 码农之家

版权责任说明