关于一个spark项目的详细拆分解析
.
Spark项目详细解析(该项目并未上传,系广汽新能源相关项目
PS:
- 关于这个功能中涉及到的一些知识,其实并没有太过复杂,
- kafka方面使用的是scala基础的kafka交互模块(org.apache.kafka)
- 数据库使用的java.sql直接编写sql语句进行数据库数据写入
- 逻辑模块更简单,基础的一些类型操作,for循环使用,if条件判断使用
- 比较好的一点是各个功能是分开独立的,方便与后期对功能判断条件的一些细微调整
- 相应的注释我都写在了代码之中,进行了一些简单的标注
- spark相关配置- pom中配置较少,这边需要导入的只有一个mysql-connector-java的包 - 1 
 2
 3
 4
 5
 6
 7
 8- <dependencies> 
 <dependency>
 
 <groupId>mysql</groupId>
 <artifactId>mysql-connector-java</artifactId>
 <version>8.0.25</version>
 </dependency>
 </dependencies>
- 这次写的时候使用的环境是scala-2.12.12,spark-3.1.1(如果需要使用其他版本的spark的话,对应scala版本需要根据官网提供的文档进行选择,必须使用要求的环境版本,否则在一些模块的调用上将会出现问题) 
- kafka与zookeeper环境没有什么要求,能用就行 
 
- 代码部分 - 首先要做的是构建kafka数据消费函数, 用于从kafka中消费数据并把这些数据拿过来进行处理 
- 直接上代码吧 - 1 
 2
 3
 4
 5
 6
 7
 8
 9
 10- val prop = new Properties 
 prop.put("bootstrap.servers", "localhost:9092")
 prop.put("group.id", "group01")
 prop.put("auto.offset.reset", "earliest")
 prop.put("key.deserializer",Class.forName("org.apache.kafka.common.serialization.StringDeserializer"))
 prop.put("value.deserializer",Class.forName("org.apache.kafka.common.serialization.StringDeserializer"))
 prop.put("enable.auto.commit", "true")
 prop.put("session.timeout.ms", "60000")
 val kafkaConsumer = new KafkaConsumer[String,String](prop)
 kafkaConsumer.subscribe(Collections.singletonList("test"))
- 详细解读 - new properties 首先我们需要构建一个变量用来存储我们的配置属性, 这里配置基本使用的是默认配置需要修改的地方也不多,建议根据自身需要修改一下
- bootstrap.server,这个属性对应是我们后台搭建好的kafka的地址,可以使用本地路径,也可以使用外网路径,主要看自身的kafka配置情况,酌情使用
- group.id 这个属性并不会有太大的影响,依然自行决定是否使用
- session.timeout.ms 设置连接超时时间
- new KafkaConsumer 创建一个新的kafka连接,需要传入我们准备好的配置属性(如上述代码)
- kafkaConsumer.subscribe(Collections.singletonList(“test”)) test为我们接下来要访问的topic的名称,这里我使用的是自己本机创建的一个名为test的topic,实际使用过程中根据实际情况来进行调整
 
- 继续上代码 - 1 
 2
 3
 4
 5
 6
 7
 8
 9
 10- while(true){ 
 val msgs:ConsumerRecords[String, String] = kafkaConsumer.poll(3000)
 val it = msgs.iterator()
 while (it.hasNext) {
 // 读取kafka中保存的数据
 val msg = it.next().value()
 val vSome = JSON.parseFull(msg)
 val vMap = vSome.get.asInstanceOf[Map[String,String]]
 }
 }
- 通过while循环来构建一个时刻执行的任务,来确保在获取到kafka数据的时候能够进行对应的处理,无数据产生时将会自动等待 
- kafkaConsumer.poll(3000) 将最新接收到的kafka数据抽出来,3000是设定超时时间timeout 
- msgs.iterator 将获取到的数据转化为迭代器来进行下一步的迭代 
- hasNext 用于查看是否还有数据, 还有数据则继续循环,无数据则退出内层循环进入外层循环 
- 使用next()来迭代一条数据,我们要使用的数据包含在这个数据中的value中 
- 使用JSON.parseFull方法来将我们拿到的数据转化为一个map类型的数据,这个数据目前还不能直接用,因为它自动的在map外面套了一层some来进行包装 
- 所以我们还需要对这个some数据进行处理转化成一个map类型的数据 
- map类型的数据有点像字典,不过它的表现形式并不是python字典中的键值对(key:value)形式,而是类似于指针一样的表现形式,(key -> value) 
- 一个map类型的数据在定义的时候需要设定一下key和value的数据类型 
- 如上述代码中, 我设定是数据类型为[String, String],也就是key,value 的类型全部为string类型 
- 完成这一步之后,我们就可以对这些数据来进行逻辑运算,书写我们之前设定好的规则了 
 
- 考虑到这部分代码如果拿过来的话会比较多,所以就不进行完整的展示了,我会拿一种一部分来进行一个展示,因为这部分逻辑判断中,绝大部分都是这一小部分进行服用得到的,所以也没必要去全部看一遍 - 1 
 2
 3
 4
 5
 6
 7
 8
 9
 10
 11
 12
 13
 14
 15
 16
 17
 18
 19
 20
 21
 22
 23
 24
 25
 26
 27
 28
 29
 30
 31
 32
 33
 34
 35
 36
 37
 38
 39
 40
 41
 42
 43
 44
 45
 46
 47
 48
 49
 50
 51
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81- // 考虑到这部分相对来说比较多,就不写在外面了,直接在里面通过注释的形式来书写 
 def AEB_Computational_logic(data:Map[String,String]): Map[String,String] = {
 // 构建一个list来存储这个功能涉及到的一些信号名,有些功能需要用到多个信号来进行判断,所以这里以嵌套List的方式来进行构建
 val signal_list:List[Any] = List(
 List("SAS_SteeringAngle", "SAS_SteeringAngleSpd"),
 "EMS_GasPedalActPstforMRR",
 "MRR_AEBOffSt",
 List("BCS_VDCActiveSt", "BCS_VDCOff"),
 List("BCS_TCSActiveSt", "BCS_TCSOff"),
 "BCS_HDCCtrlSt",
 "BCS_VehSpd",
 "MRR_AEBLVehHoldReq",
 "BCS_VehSpd",
 "IFC_CameraBlockageSt",
 "VCU_CrntGearLvl",
 List("ADAS_SensorBlockedSts", "ADAS_SensorFailure")
 )
 // 定义最终返回的一个数据,这个数据同样使用map类型
 var result_data:Map[String,String] = Map()
 // 这个地方的作用是用来接收结果,同时用来作为终止循环的条件,因为我们这里只需要考虑到一个条件出错即可,所以在已经获取到一个结果的情况下,后续的数据就不需要进行处理了
 var cause_of_error = 0
 // 开启循环,读取信号列表中的信号用来获取对应的数据
 for(index <- Range(0,signal_list.length) if cause_of_error == 0) {
 val signal = signal_list.apply(index)
 // 这里根据信号的数据类型来进行区分,正常信号的数据类型为String类型,但有一部分为List类型,所以我们需要先区分出来
 if (signal.isInstanceOf[String]){
 // 从数据流中获取对应信号的值,如果该信号不存在,则不进行处理
 val signal_data:String = data.get(signal.toString).getOrElse("")
 if (signal_data!=""){
 // 对应获取到的数据进行切片转化成一个List类型并进行反转, 这里的反转是因为原定的数据给我们的是最新的在前面,而先获取到的数据在后面,所以我们需要把它进行一个反转操作,确保时间顺序是从左往右的
 val signal_data_list = signal_data.split(',').toList.reverse
 // 根据信号的不同来对数据进行不同的条件判断
 if (signal == "EMS_GasPedalActPstforMRR"){
 // >85%
 // 这里使用的exists方法的作用是对列表中的每一项进行判断,只要有一项满足即返回True,并修改cause_of_error的值为对应的错误原因编号, 该原因编号根据数据库中确定好的规则表来进行填写,当规则表发生重大变动时,则需要来这里进行对应的调整
 if(signal_data_list.exists(x => x.toInt > 85) || (signal_data_list.apply(signal_data_list.length-1).toInt - signal_data_list.apply(0).toInt).abs > 200){
 cause_of_error = 2
 }
 }else if(signal == "MRR_AEBOffSt"){
 if (signal_data_list.exists(x => x == "1")){
 cause_of_error = 3
 }
 }
 }
 }
 // 对多条件的错误原因进行处理
 else{
 // 使用自带的asInstanceOf对格式进行一下转化,因为这边无法自动判断出signal是什么格式,默认是any, 而any格式是不能使用apply方法的
 val this_signale_list:List[String] = signal.asInstanceOf[List[String]]
 val data_1 = data.get(this_signale_list.apply(0).toString).getOrElse("")
 val data_2 = data.get(this_signale_list.apply(1).toString).getOrElse("")
 // 这边如果还是通过信号判断的话可能不太准确,所以这里直接使用索引来进行区分,相较于单个信号,多信号的判断相对来说比较绕,费神一点,不过还好,基本的逻辑和用法都大差不差
 if (index == 0){
 if (data_1 != "" && data_2 != ""){
 val data_3 = data_1.split(',').toList.reverse
 val data_4 = data_2.split(',').toList.reverse
 if(data_3.exists(x => x.toInt > 60) || data_4.exists(x => x.toInt > 150)){
 cause_of_error = 1
 }
 }else if(data_1 != ""){
 val data_3 = data_1.split(',').toList.reverse
 if(data_3.exists(x => x.toInt > 60)){
 cause_of_error = 1
 }
 }else if(data_2 != ""){
 val data_4 = data_2.split(',').toList.reverse
 if(data_4.exists(x => x.toInt > 150)){
 cause_of_error = 1
 }
 }
 }
 }
 }
 // 在完成了上述的判断之后,我们得到了一个cause_of_error,将这个参数和功能编号一起写入到我们之前定义的result_data中去,使用return来返回
 result_data += ("function" -> "1")
 result_data += ("reason" -> cause_of_error.toString)
 return result_data
 }
- 完成了上述部分之后, 基本可以说已经完成了大部分的工作,接下来要做的就是对我们要存储的一些没有涉及到计算的数据进行添加 
 
- 这部分的工作并不多 - 1 
 2
 3
 4
 5
 6
 7
 8
 9
 10
 11
 12
 13
 14
 15
 16
 17
 18
 19
 20
 21
 22
 23
 // 基础字段处理
 var base_data: Map[String,String] = Map()
 // 车辆vin码
 base_data += ("vin" -> vMap.get("vin").getOrElse(""))
 // 信号录制时间
 base_data += ("time" -> vMap.get("sampleTime").getOrElse(""))
 // 车系
 base_data += ("vehicle_series" -> vMap.get("vehicleSeries").getOrElse(""))
 // 车型
 base_data += ("vehicle_model" -> vMap.get("vehicleModelCode").getOrElse(""))
 // 经度
 base_data += ("TEL_LongitudeDeg" -> vMap.get("vehicleModelCode").getOrElse(""))
 // 纬度
 base_data += ("TEL_LatitudeDeg" -> vMap.get("vehicleModelCode").getOrElse(""))
 // 省份
 //城市
 // 县,区
 //刹车踏板
 base_data += ("brake_pedel_status" -> vMap.get("VCU_EMS_BrkPedalSt").getOrElse(""))
- 基本上就是获取一下这些固定的数据,这里面需要进行处理的数据只有这个经纬度、省市县、刹车踏板这几个数据,处理起来比较轻松 
- 最终我们得到了一个base_data,里面会包含除了id,功能名称,错误原因三个意外的全部字段数据 
- 当然,这部分高德经纬度处理的东西正在研究怎么整,因为深入看了一下代码之后发现kafka传过来的经纬度数据还需要处理,他是分成度分秒来传回来的,需要对这些数据进行拼接之后才能使用 
- 完成这些之后, 我们需要对逻辑函数返回的错误条件与基础的base_data里面的数据加到一起,使用的是map数据相加的方法 ++ 
- 当然, 在++之前记得判断一下返回的错误条件的情况,按照之前的设置, 如果没有出现错误,返回的结果中reason的值为0(这里的0为字符串的0,注意别弄错了) 
 
- 完成数据准备之后就到了我们的数据添加部分,这部分负责将我们的数据添加到数据库表中去 - 首先是构建数据库连接,开头的时候已经说过, 我们用的是最基础的mysql-connetcor-java包进行的连接 - 1 
 2
 3
 4
 5
 6
 7- val url: String = db_url 
 val driver: String = "com.mysql.cj.jdbc.Driver"
 val username: String = "root"
 val password: String = "rbac2021"
 var conn: Connection = null
 conn = DriverManager.getConnection(url, username, password)
 val st = conn.createStatement
- 上述为基础的mysql连接方式,基本上各种语言中的连接都大同小异 
 
- 构建添加数据方法 - 1 
 2
 3
 4
 5
 6
 7
 8
 9
 10
 11
 12
 13
 14
 15
 16
 17
 18
 19
 20
 21
 22
 23
 24
 25
 26
 27
 28
 29- def sql_conn_create(data: Map[String,String]): Unit = { 
 try {
 Class.forName(driver)
 val sql = "" +
 "insert into function_abort_reason" +
 "(" +
 "vin, vehicle_series, vehicle_model, config_vehicle, park_vehicle, abort_time, " +
 "longitude, latitude, province, city, county,function, reason, speed,hrake_pedel_status,related_signals) " +
 "values(" +
 s"${vin}," +
 s"${vehicle_series}," +
 s"${vehicle_model}," +
 s"${config_vehicle}," +
 s"${park_vehicle}," +
 s"${abort_time}, " +
 s"${longitude}, " +
 s"${latitude}, " +
 s"${province}, " +
 s"${city}, " +
 s"${county}, " +
 s"${function}, " +
 s"${reason}, " +
 s"${speed}, " +
 s"${hrake_pedel_status}, " +
 s"${related_signals} " +
 ")"
 val rs = st.executeQuery(sql)
 }
- 感觉这个也没啥好讲的。。。使用格式化输入的方式来写一个sql语句出来,最后使用executeQuery方法添加数据,完事 
 
 
 
PS-1:以上就是关于部分功能的具体逻辑, 后续的测试需要等待经纬度方面的处理代码写好之后才能进行,目前正在看这方面的文章,姑且就这些吧
PS-2: 这部分的代码不算太完善,并没有涉及到spark方面的深度使用,说实话对于这个框架依然很不熟练,基本上写的过程中都是一边写一遍查询遇到的问题
PS-3: 那么就这些吧,有什么有疑问的地方随时找我
关于一个spark项目的详细拆分解析

