如果惰性是提高效率之道路,那么并行性则可以被认为是提高效率之航线。如果两个或者多个任务可以按任意顺序序列执行,而又不会对结果的正确性产生任何影响,那么这些任务就可以并行执行。Scala为此提供了多种方式,其中最简单的方式是并行地处理集合中的元素。
这篇笔记是书中一个例子:给定一些城市的名字,从 web 获取对应的天气状况(xml格式),按照城市名字排序展示。因此会用到 url/xml 操作,不过重点我们看下,如何释放一个集合的并行能力。
1. 顺序集合
首先定义一个方法,参数为城市名,返回这个城市的天气状况:
import scala.io.Source
import scala.xml._
def getWeatherData(city: String) = {
val response = Source.fromURL(
s"https://raw.githubusercontent.com/ReactivePlatform/" +
s"Pragmatic-Scala-StaticResources/master/src/main/resources/" +
s"weathers/$city.xml")
val xmlResponse = XML.loadString(response.mkString)
val cityName = (xmlResponse \\ "city" \ "@name").text
val temperature = (xmlResponse \\ "temperature" \ "@value").text
val condition = (xmlResponse \\ "weather" \ "@value").text
(cityName, temperature, condition)
} //> getWeatherData: (city: String)(String, String, String)
// getWeatherData("Houston,us") //> res0: (String, String, String) = (Houston,61,clear sky)
主要就是fromURL
获取 xml 格式数据,然后解析的过程。getWeatherData("Houston,us")
是测试用的例子,打开注释可以看下执行的效果。
然后定义一个方法,打印该返回值:
def printWeatherData(weatherData: (String, String, String)): Unit = {
val (cityName, temperature, condition) = weatherData
println(f"$cityName%-15s $temperature%-6s $condition")
} //> printWeatherData: (weatherData: (String, String, String))Unit
printWeatherData(getWeatherData("Sydney,australia"))
//> Sydney 68 broken clouds
最后一个方法,定义城市名字的列表,逐个获取各个城市天气的情况并打印,记录整个耗时。其中参数是getData
方法。该方法,接收一个城市名字列表,返回对应的一个三元组列表。
def timeSample(getData: List[String] => List[(String, String, String)]): Unit = {
val cities = List("Bangalore,india",
"Berlin,germany",
"Boston,us",
"Brussels,belgium",
"Chicago,us",
"Houston,us",
"Krakow,poland",
"London,uk",
"Minneapolis,us",
"Oslo,norway",
"Reykjavik,iceland",
"Rome,italy",
"Stockholm,sweden",
"Sydney,australia",
"Tromso,norway")
val start = System.nanoTime
getData(cities) sortBy{_._1} foreach printWeatherData
val end = System.nanoTime
println(s"Time taken: ${(end -start)/1.0e9} s")
}
运行一下:
timeSample(cities => cities map getWeatherData)
//> Bangalore 88.57 few clouds
//| Berlin 48.2 mist
//| Boston 45.93 mist
//| Brussels 49.21 clear sky
//| Chicago 31.59 overcast clouds
//| Houston 61 clear sky
//| Krakow 55.4 broken clouds
//| London 50 broken clouds
//| Minneapolis 29.55 clear sky
//| Oslo 42.8 fog
//| Reykjavik 48.06 overcast clouds
//| Rome 54.88 few clouds
//| Stockholm 38.97 clear sky
//| Sydney 68 broken clouds
//| Tromso 35.6 clear sky
//| Time taken: 44.019762313 s
这个理解起来不复杂,就是顺序获取,一共花了 44s。
2. 并行集合加速
前面的例子有两个部分:慢的部分——对于每个城市,我们都通过网络获取并收集天气信息,快的部分——我们对数据进行排序,并显示它们。非常简单,因为慢的部分被封装到了作为参数传递给timeSample()函数的函数值中。因此,我们只需要更换那部分代码来提高速度即可,而其余的部分则可以保持不变。
如果能够并行对 cities 执行 getWeatherData 方法,毫无疑问性能会得到提升。在 Scala 中,这并不复杂:
timeSample(cities => (cities.par map getWeatherData).toList)
...
//| Time taken: 3.225210903 s
对于许多顺序集合,Scala都拥有其并行版本。例如,ParArray是Array对应的并行版本,同样的,ParHashMap、ParHashSet和ParVector分别对应于HashMap、HashSet和Vector。我们可以使用par()和seq()方法来在顺序集合及其并行版本之间进行相互转换。
这本书看到这里,对于大数据的处理计算,Scala 真的开始让人惊艳了。