While fixing a bug today, I noticed that some of our Spark programs were still calling deprecated methods like Date().getHours(). They still work, but there's no telling when a JDK upgrade will turn them into a landmine, so I decided to swap the old date-handling combo of FastDateFormat / SimpleDateFormat / Date / Calendar for the JDK 1.8 combination that's recommended these days: LocalDate / LocalDateTime / DateTimeFormatter.
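
To give a sense of what the migration looks like, here is a minimal before/after sketch (the pattern and the use of "now" are purely illustrative, not code from the actual job):

import java.util.Date
import java.time.LocalDateTime
import java.time.format.DateTimeFormatter

// Old style: Date.getHours() has been deprecated since JDK 1.1, though it still compiles.
val hourOld = new Date().getHours

// New style: LocalDateTime plus DateTimeFormatter from java.time (JDK 1.8+).
val now = LocalDateTime.now()
val hourNew = now.getHour
val formatted = now.format(DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss"))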

With that in mind, I happily made the change. Simplified, the new code looked roughly like this:

import java.time.LocalDateTime
import java.time.format.DateTimeFormatter

// Formatter defined on the driver and captured by the foreach closure.
val standard_fmt = DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss")
msgRdd.foreach(line => {
  val arriveTime = line.split(",")(8)
  val dateTime = LocalDateTime.parse(arriveTime, standard_fmt)
})

I fired up the program expecting it to work on the first try; instead it blew up with a pile of exceptions. For the record (and, admittedly, to pad the word count), here is the stack trace in all its glory:

org.apache.spark.SparkException: Task not serializable
at org.apache.spark.util.ClosureCleaner$.ensureSerializable(ClosureCleaner.scala:298)
at org.apache.spark.util.ClosureCleaner$.org$apache$spark$util$ClosureCleaner$$clean(ClosureCleaner.scala:288)
at org.apache.spark.util.ClosureCleaner$.clean(ClosureCleaner.scala:108)
at org.apache.spark.SparkContext.clean(SparkContext.scala:2094)
at org.apache.spark.rdd.RDD$$anonfun$foreach$1.apply(RDD.scala:916)
at org.apache.spark.rdd.RDD$$anonfun$foreach$1.apply(RDD.scala:915)
at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:112)
at org.apache.spark.rdd.RDD.withScope(RDD.scala:362)
at org.apache.spark.rdd.RDD.foreach(RDD.scala:915)
at xyz.cco.streaming.dataclean.TripAndALStationStreaming$$anonfun$main$1.apply(TripAndALStationStreaming.scala:110)
at xyz.cco.streaming.dataclean.TripAndALStationStreaming$$anonfun$main$1.apply(TripAndALStationStreaming.scala:95)
at org.apache.spark.streaming.dstream.DStream$$anonfun$foreachRDD$1$$anonfun$apply$mcV$sp$3.apply(DStream.scala:627)
at org.apache.spark.streaming.dstream.DStream$$anonfun$foreachRDD$1$$anonfun$apply$mcV$sp$3.apply(DStream.scala:627)
at org.apache.spark.streaming.dstream.ForEachDStream$$anonfun$1$$anonfun$apply$mcV$sp$1.apply$mcV$sp(ForEachDStream.scala:51)
at org.apache.spark.streaming.dstream.ForEachDStream$$anonfun$1$$anonfun$apply$mcV$sp$1.apply(ForEachDStream.scala:51)
at org.apache.spark.streaming.dstream.ForEachDStream$$anonfun$1$$anonfun$apply$mcV$sp$1.apply(ForEachDStream.scala:51)
at org.apache.spark.streaming.dstream.DStream.createRDDWithLocalProperties(DStream.scala:415)
at org.apache.spark.streaming.dstream.ForEachDStream$$anonfun$1.apply$mcV$sp(ForEachDStream.scala:50)
at org.apache.spark.streaming.dstream.ForEachDStream$$anonfun$1.apply(ForEachDStream.scala:50)
at org.apache.spark.streaming.dstream.ForEachDStream$$anonfun$1.apply(ForEachDStream.scala:50)
at scala.util.Try$.apply(Try.scala:192)
at org.apache.spark.streaming.scheduler.Job.run(Job.scala:39)
at org.apache.spark.streaming.scheduler.JobScheduler$JobHandler$$anonfun$run$1.apply$mcV$sp(JobScheduler.scala:254)
at org.apache.spark.streaming.scheduler.JobScheduler$JobHandler$$anonfun$run$1.apply(JobScheduler.scala:254)
at org.apache.spark.streaming.scheduler.JobScheduler$JobHandler$$anonfun$run$1.apply(JobScheduler.scala:254)
at scala.util.DynamicVariable.withValue(DynamicVariable.scala:58)
at org.apache.spark.streaming.scheduler.JobScheduler$JobHandler.run(JobScheduler.scala:253)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
at java.lang.Thread.run(Thread.java:748)
Caused by: java.io.NotSerializableException: java.time.format.DateTimeFormatter
Serialization stack:
- object not serializable (class: java.time.format.DateTimeFormatter, value: Value(MonthOfYear,2)Value(DayOfMonth,2)Value(HourOfDay,2)Value(MinuteOfHour,2)Value(SecondOfMinute,2))
- field (class: xyz.cco.streaming.dataclean.TripAndALStationStreaming$$anonfun$main$1$$anonfun$apply$2, name: mdhms_fmt$1, type: class java.time.format.DateTimeFormatter)
- object (class xyz.cco.streaming.dataclean.TripAndALStationStreaming$$anonfun$main$1$$anonfun$apply$2, <function1>)
at org.apache.spark.serializer.SerializationDebugger$.improveException(SerializationDebugger.scala:40)
at org.apache.spark.serializer.JavaSerializationStream.writeObject(JavaSerializer.scala:46)
at org.apache.spark.serializer.JavaSerializerInstance.serialize(JavaSerializer.scala:100)
at org.apache.spark.util.ClosureCleaner$.ensureSerializable(ClosureCleaner.scala:295)
... 30 more
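
The Caused by line points straight at java.time.format.DateTimeFormatter. A quick check, runnable in a plain Scala REPL outside Spark, shows the asymmetry between the old and new formatter classes:

import java.text.SimpleDateFormat
import java.time.format.DateTimeFormatter

// SimpleDateFormat extends java.text.Format, which implements java.io.Serializable.
println(new SimpleDateFormat("yyyy-MM-dd HH:mm:ss").isInstanceOf[java.io.Serializable]) // true

// DateTimeFormatter does not implement Serializable, so Spark's closure
// serializer (Java serialization) rejects any closure that captures it.
println(DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss").isInstanceOf[java.io.Serializable]) // false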

So there it is: DateTimeFormatter isn't serializable, so it can't be defined outside the closure and shipped into the RDD. The old SimpleDateFormat never had this problem because it actually implements java.io.Serializable. Fine, if it's not allowed outside, I'll just create it inside the RDD operation, and the code becomes:

import java.time.LocalDateTime
import java.time.format.DateTimeFormatter

msgRdd.foreach(line => {
  val arriveTime = line.split(",")(8)
  // The formatter is now created inside the closure, once per record.
  val standard_fmt = DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss")
  val dateTime = LocalDateTime.parse(arriveTime, standard_fmt)
})

OK, it finally runs, but it's ugly: if the RDD holds a million records, DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss") gets executed a million times, and the same snippet ends up duplicated all over the codebase, so IDEA happily decorates it with its muddy-yellow squiggles or background highlight. Awful to look at. So I moved the formatters into a static holder instead: a Scala singleton object in DateTimeUtil.scala, with the following contents:

import java.time.format.DateTimeFormatter

object DateTimeUtil {
  // DateTimeFormatter is immutable and thread-safe, so the same instances can be shared freely.
  val standard_fmt = DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss")
  val ymd_fmt = DateTimeFormatter.ofPattern("yyyy-MM-dd")
}

Then, in the Spark job, add the following import outside the RDD operation:

import xyz.cco.utils.DateTimeUtil._

With that in place, the code that actually runs becomes:

import java.time.LocalDateTime
import xyz.cco.utils.DateTimeUtil._

msgRdd.foreach(line => {
  val arriveTime = line.split(",")(8)
  // standard_fmt resolves to DateTimeUtil.standard_fmt on the executor,
  // so nothing non-serializable is captured into the closure.
  val dateTime = LocalDateTime.parse(arriveTime, standard_fmt)
})

You only need to write the import once. Because DateTimeUtil is a singleton object, each executor JVM builds its own formatter instances when it first loads the class, so nothing non-serializable ever has to ride along with the closure, and ofPattern runs once per JVM instead of once per record. And unlike SimpleDateFormat, DateTimeFormatter is immutable and thread-safe, so sharing the same instance across tasks is safe.
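
As a quick sanity check outside Spark, the same shared formatters can be exercised directly (the timestamp values below are made up purely for illustration):

import java.time.{LocalDate, LocalDateTime}
import xyz.cco.utils.DateTimeUtil._

// Both formatters come from the shared singleton; nothing is captured from the driver.
val t = LocalDateTime.parse("2019-06-01 08:30:00", standard_fmt)
val d = LocalDate.parse("2019-06-01", ymd_fmt)
println(t.getHour) // 8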