Spark Notes

Connecting

A SparkSession is used here to connect to Spark.

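from pyspark.sql import SparkSession

# "master" is a placeholder: pass a real master URL such as "local[*]",
# "spark://<host>:7077", or "yarn".
sparkSession = (
    SparkSession.builder.master("master")
    .appName("example-pyspark-read-and-write")
    .getOrCreate()
)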
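The string passed to `master()` has to be a master URL that Spark understands; a bare word such as `"master"` is normally rejected when the session starts. The sketch below is a rough pre-check written in plain Python, not part of PySpark: the helper name `looks_like_master_url` and its patterns are assumptions, and they cover only the common forms (`local`, `local[N]`, `local[*]`, `spark://host:port`, `yarn`, `k8s://...`).

```python
import re

# Common master URL shapes; this list is illustrative, not exhaustive.
_MASTER_PATTERNS = [
    r"local",                 # a single local thread
    r"local\[(\d+|\*)\]",     # N local threads, or one per core
    r"spark://[^\s:/]+:\d+",  # standalone cluster: spark://host:port
    r"yarn",                  # YARN, located through the Hadoop config
    r"k8s://\S+",             # Kubernetes API server
]


def looks_like_master_url(master: str) -> bool:
    """Return True if `master` matches one of the common forms above."""
    return any(re.fullmatch(p, master) for p in _MASTER_PATTERNS)


print(looks_like_master_url("local[*]"))           # True
print(looks_like_master_url("spark://host:7077"))  # True
print(looks_like_master_url("master"))             # False
```

A check like this only catches obvious typos before a job is submitted; Spark itself remains the authority on which URLs are valid.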

File processing

Assume the file consists of the following 50 lines repeated twice, 100 lines in total.

{"timestamp":"1600993027","name":"0.205-240-81.adsl-dyn.isp.belgacom.be","type":"a","value":"81.240.205.0"}
{"timestamp":"1600993274","name":"0.205-241-81.adsl-dyn.isp.belgacom.be","type":"a","value":"81.241.205.0"}
{"timestamp":"1600993109","name":"0.205-242-81.adsl-dyn.isp.belgacom.be","type":"a","value":"81.242.205.0"}
{"timestamp":"1600993317","name":"0.205-243-81.adsl-dyn.isp.belgacom.be","type":"a","value":"81.243.205.0"}
{"timestamp":"1600993330","name":"0.205-244-23.rdns.scalabledns.com","type":"a","value":"23.244.205.0"}
{"timestamp":"1600993020","name":"0.205-244-81.adsl-dyn.isp.belgacom.be","type":"a","value":"81.244.205.0"}
{"timestamp":"1600993274","name":"0.205-245-23.rdns.scalabledns.com","type":"a","value":"23.245.205.0"}
{"timestamp":"1600993074","name":"0.205-245-81.adsl-dyn.isp.belgacom.be","type":"a","value":"81.245.205.0"}
{"timestamp":"1600993005","name":"0.205-246-81.adsl-dyn.isp.belgacom.be","type":"a","value":"81.246.205.0"}
{"timestamp":"1600993122","name":"0.205-247-81.adsl-dyn.isp.belgacom.be","type":"a","value":"81.247.205.0"}
{"timestamp":"1600993245","name":"0.205-26-211.dynamic.dsl.mel.iprimus.net.au","type":"a","value":"211.26.205.0"}
{"timestamp":"1600993083","name":"0.205-27-211.static.corp.pth.iprimus.net.au","type":"a","value":"211.27.205.0"}
{"timestamp":"1600993540","name":"0.205-31-94.telenet.ru","type":"a","value":"94.31.205.0"}
{"timestamp":"1600993142","name":"0.205-50-210.dynamic.dsl.mel.iprimus.net.au","type":"a","value":"210.50.205.0"}
{"timestamp":"1600993070","name":"0.205-65-87.adsl-dyn.isp.belgacom.be","type":"a","value":"87.65.205.0"}
{"timestamp":"1600993017","name":"0.205-67-87.adsl-dyn.isp.belgacom.be","type":"a","value":"87.67.205.0"}
{"timestamp":"1600993318","name":"0.205-78-194.adsl-fix.skynet.be","type":"a","value":"194.78.205.0"}
{"timestamp":"1600992963","name":"0.205-88-23.rdns.scalabledns.com","type":"a","value":"23.88.205.0"}
{"timestamp":"1600993316","name":"0.205-89-23.rdns.scalabledns.com","type":"a","value":"23.89.205.0"}
{"timestamp":"1600993157","name":"0.205-pool.nikopol.net","type":"a","value":"213.111.205.0"}
{"timestamp":"1600993003","name":"0.205-tl.cetl.com.ar","type":"a","value":"127.0.0.1"}
{"timestamp":"1600993060","name":"0.205.0.109.rev.sfr.net","type":"a","value":"109.0.205.0"}
{"timestamp":"1600993106","name":"0.205.0.85.dynamic.wline.res.cust.swisscom.ch","type":"a","value":"85.0.205.0"}
{"timestamp":"1600993221","name":"0.205.0.93.rev.sfr.net","type":"a","value":"93.0.205.0"}
{"timestamp":"1600993062","name":"0.205.1.109.rev.sfr.net","type":"a","value":"109.1.205.0"}
{"timestamp":"1600993081","name":"0.205.1.135.in-addr.arpa","type":"a","value":"135.1.205.0"}
{"timestamp":"1600993611","name":"0.205.1.85.dynamic.wline.res.cust.swisscom.ch","type":"a","value":"85.1.205.0"}
{"timestamp":"1600993377","name":"0.205.1.93.rev.sfr.net","type":"a","value":"93.1.205.0"}
{"timestamp":"1600993016","name":"0.205.1.cablemodem.amnethn.com","type":"a","value":"103.224.182.208"}
{"timestamp":"1600993596","name":"0.205.10.109.rev.sfr.net","type":"a","value":"109.10.205.0"}
{"timestamp":"1600992950","name":"0.205.10.93.rev.sfr.net","type":"a","value":"93.10.205.0"}
{"timestamp":"1600992964","name":"0.205.100.34.bc.googleusercontent.com","type":"a","value":"34.100.205.0"}
{"timestamp":"1600993129","name":"0.205.100.84.rev.sfr.net","type":"a","value":"84.100.205.0"}
{"timestamp":"1600993491","name":"0.205.101.34.bc.googleusercontent.com","type":"a","value":"34.101.205.0"}
{"timestamp":"1600993019","name":"0.205.101.84.rev.sfr.net","type":"a","value":"84.101.205.0"}
{"timestamp":"1600993842","name":"0.205.102.146.nbk.vse.cz","type":"a","value":"146.102.205.0"}
{"timestamp":"1600993166","name":"0.205.102.34.bc.googleusercontent.com","type":"a","value":"34.102.205.0"}
{"timestamp":"1600993111","name":"0.205.102.84.rev.sfr.net","type":"a","value":"84.102.205.0"}
{"timestamp":"1600993174","name":"0.205.103.34.bc.googleusercontent.com","type":"a","value":"34.103.205.0"}
{"timestamp":"1600992962","name":"0.205.103.84.rev.sfr.net","type":"a","value":"84.103.205.0"}
{"timestamp":"1600993148","name":"0.205.104.34.bc.googleusercontent.com","type":"a","value":"34.104.205.0"}
{"timestamp":"1600993429","name":"0.205.104.92.dynamic.wline.res.cust.swisscom.ch","type":"a","value":"92.104.205.0"}
{"timestamp":"1600993062","name":"0.205.105.34.bc.googleusercontent.com","type":"a","value":"34.105.205.0"}
{"timestamp":"1600993159","name":"0.205.105.92.dynamic.wline.res.cust.swisscom.ch","type":"a","value":"92.105.205.0"}
{"timestamp":"1600992965","name":"0.205.106.34.bc.googleusercontent.com","type":"a","value":"34.106.205.0"}
{"timestamp":"1600993045","name":"0.205.106.62.rev.sfr.net","type":"a","value":"62.106.205.0"}
{"timestamp":"1600992957","name":"0.205.106.92.dynamic.wline.res.cust.swisscom.ch","type":"a","value":"92.106.205.0"}
{"timestamp":"1600992954","name":"0.205.107.34.bc.googleusercontent.com","type":"a","value":"34.107.205.0"}
{"timestamp":"1600993068","name":"0.205.107.92.dynamic.wline.res.cust.swisscom.ch","type":"a","value":"92.107.205.0"}
{"timestamp":"1600993152","name":"0.205.108.190-cust.enetworksgy.com","type":"a","value":"190.108.205.0"}

The file is stored on HDFS and can be read with the SparkSession object.

data_file = sparkSession.read.json("hdfs://hdfs/input/test.json")
# DataFrame[name: string, timestamp: string, type: string, value: string]
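The column order in the comment above (name, timestamp, type, value) differs from the key order in the file because Spark's JSON schema inference sorts the top-level fields by name. The plain-Python sketch below only mimics that outcome for flat records; `infer_columns` is a hypothetical helper, not a Spark API.

```python
import json

# Two of the sample lines from the test file.
SAMPLE_LINES = [
    '{"timestamp":"1600993027","name":"0.205-240-81.adsl-dyn.isp.belgacom.be","type":"a","value":"81.240.205.0"}',
    '{"timestamp":"1600993540","name":"0.205-31-94.telenet.ru","type":"a","value":"94.31.205.0"}',
]


def infer_columns(lines):
    """Collect every key seen across the JSON lines and return them sorted."""
    keys = set()
    for line in lines:
        keys.update(json.loads(line).keys())
    return sorted(keys)


print(infer_columns(SAMPLE_LINES))
# ['name', 'timestamp', 'type', 'value']
```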

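Deduplicate the records by name and value.

# dropDuplicates on a column subset already covers exact duplicates,
# so a separate distinct() call is not needed.
clean_data = data_file.dropDuplicates(subset=["name", "value"])
# Use collect() to inspect the deduplicated rows
clean_data.collect()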
# [Row(name='0.205.108.190-cust.enetworksgy.com', timestamp='1600993152', type='a', value='190.108.205.0'), Row(name='0.205.10.109.rev.sfr.net', timestamp='1600993596', type='a', value='109.10.205.0'), Row(name='0.205-240-81.adsl-dyn.isp.belgacom.be', timestamp='1600993027', type='a', value='81.240.205.0'), Row(name='0.205-27-211.static.corp.pth.iprimus.net.au', timestamp='1600993083', type='a', value='211.27.205.0'), Row(name='0.205.101.34.bc.googleusercontent.com', timestamp='1600993491', type='a', value='34.101.205.0'), Row(name='0.205.1.85.dynamic.wline.res.cust.swisscom.ch', timestamp='1600993611', type='a', value='85.1.205.0'), Row(name='0.205-241-81.adsl-dyn.isp.belgacom.be', timestamp='1600993274', type='a', value='81.241.205.0'), Row(name='0.205-pool.nikopol.net', timestamp='1600993157', type='a', value='213.111.205.0'), Row(name='0.205.106.34.bc.googleusercontent.com', timestamp='1600992965', type='a', value='34.106.205.0'), Row(name='0.205.106.92.dynamic.wline.res.cust.swisscom.ch', timestamp='1600992957', type='a', value='92.106.205.0'), Row(name='0.205.101.84.rev.sfr.net', timestamp='1600993019', type='a', value='84.101.205.0'), Row(name='0.205.105.92.dynamic.wline.res.cust.swisscom.ch', timestamp='1600993159', type='a', value='92.105.205.0'), Row(name='0.205-247-81.adsl-dyn.isp.belgacom.be', timestamp='1600993122', type='a', value='81.247.205.0'), Row(name='0.205.1.cablemodem.amnethn.com', timestamp='1600993016', type='a', value='103.224.182.208'), Row(name='0.205-245-81.adsl-dyn.isp.belgacom.be', timestamp='1600993074', type='a', value='81.245.205.0'), Row(name='0.205.106.62.rev.sfr.net', timestamp='1600993045', type='a', value='62.106.205.0'), Row(name='0.205-246-81.adsl-dyn.isp.belgacom.be', timestamp='1600993005', type='a', value='81.246.205.0'), Row(name='0.205.102.146.nbk.vse.cz', timestamp='1600993842', type='a', value='146.102.205.0'), Row(name='0.205-67-87.adsl-dyn.isp.belgacom.be', timestamp='1600993017', type='a', value='87.67.205.0'), 
Row(name='0.205.10.93.rev.sfr.net', timestamp='1600992950', type='a', value='93.10.205.0'), Row(name='0.205.103.34.bc.googleusercontent.com', timestamp='1600993174', type='a', value='34.103.205.0'), Row(name='0.205.107.34.bc.googleusercontent.com', timestamp='1600992954', type='a', value='34.107.205.0'), Row(name='0.205-50-210.dynamic.dsl.mel.iprimus.net.au', timestamp='1600993142', type='a', value='210.50.205.0'), Row(name='0.205.100.34.bc.googleusercontent.com', timestamp='1600992964', type='a', value='34.100.205.0'), Row(name='0.205-89-23.rdns.scalabledns.com', timestamp='1600993316', type='a', value='23.89.205.0'), Row(name='0.205.1.93.rev.sfr.net', timestamp='1600993377', type='a', value='93.1.205.0'), Row(name='0.205-88-23.rdns.scalabledns.com', timestamp='1600992963', type='a', value='23.88.205.0'), Row(name='0.205.1.109.rev.sfr.net', timestamp='1600993062', type='a', value='109.1.205.0'), Row(name='0.205.100.84.rev.sfr.net', timestamp='1600993129', type='a', value='84.100.205.0'), Row(name='0.205-245-23.rdns.scalabledns.com', timestamp='1600993274', type='a', value='23.245.205.0'), Row(name='0.205.104.34.bc.googleusercontent.com', timestamp='1600993148', type='a', value='34.104.205.0'), Row(name='0.205.102.34.bc.googleusercontent.com', timestamp='1600993166', type='a', value='34.102.205.0'), Row(name='0.205-242-81.adsl-dyn.isp.belgacom.be', timestamp='1600993109', type='a', value='81.242.205.0'), Row(name='0.205-26-211.dynamic.dsl.mel.iprimus.net.au', timestamp='1600993245', type='a', value='211.26.205.0'), Row(name='0.205.0.85.dynamic.wline.res.cust.swisscom.ch', timestamp='1600993106', type='a', value='85.0.205.0'), Row(name='0.205-31-94.telenet.ru', timestamp='1600993540', type='a', value='94.31.205.0'), Row(name='0.205.0.93.rev.sfr.net', timestamp='1600993221', type='a', value='93.0.205.0'), Row(name='0.205.103.84.rev.sfr.net', timestamp='1600992962', type='a', value='84.103.205.0'), Row(name='0.205-244-23.rdns.scalabledns.com', timestamp='1600993330', 
type='a', value='23.244.205.0'), Row(name='0.205-244-81.adsl-dyn.isp.belgacom.be', timestamp='1600993020', type='a', value='81.244.205.0'), Row(name='0.205-78-194.adsl-fix.skynet.be', timestamp='1600993318', type='a', value='194.78.205.0'), Row(name='0.205.104.92.dynamic.wline.res.cust.swisscom.ch', timestamp='1600993429', type='a', value='92.104.205.0'), Row(name='0.205-65-87.adsl-dyn.isp.belgacom.be', timestamp='1600993070', type='a', value='87.65.205.0'), Row(name='0.205-243-81.adsl-dyn.isp.belgacom.be', timestamp='1600993317', type='a', value='81.243.205.0'), Row(name='0.205.102.84.rev.sfr.net', timestamp='1600993111', type='a', value='84.102.205.0'), Row(name='0.205.107.92.dynamic.wline.res.cust.swisscom.ch', timestamp='1600993068', type='a', value='92.107.205.0'), Row(name='0.205.105.34.bc.googleusercontent.com', timestamp='1600993062', type='a', value='34.105.205.0'), Row(name='0.205.0.109.rev.sfr.net', timestamp='1600993060', type='a', value='109.0.205.0'), Row(name='0.205.1.135.in-addr.arpa', timestamp='1600993081', type='a', value='135.1.205.0'), Row(name='0.205-tl.cetl.com.ar', timestamp='1600993003', type='a', value='127.0.0.1')]
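# Use count() to get the number of deduplicated rows
clean_data.count()
# 50
# Use foreach() to apply a function to each row. The function runs on the
# executors, so on a cluster the output goes to the executor logs rather
# than the driver console.
clean_data.foreach(lambda x: print(x))
# Row(name='0.205.108.190-cust.enetworksgy.com', timestamp='1600993152', type='a', value='190.108.205.0')
# ...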
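What `dropDuplicates` on a column subset does can be modelled in plain Python: rows are grouped by the chosen key and one row per key survives. The sketch below is only a model with a hypothetical helper `drop_duplicates`; it keeps the first row it sees, whereas Spark makes no promise about which of the duplicate rows is kept.

```python
def drop_duplicates(rows, subset):
    """Keep one row per distinct combination of the `subset` columns."""
    seen = set()
    unique = []
    for row in rows:
        key = tuple(row[column] for column in subset)
        if key not in seen:
            seen.add(key)
            unique.append(row)
    return unique


rows = [
    {"timestamp": "1600993540", "name": "0.205-31-94.telenet.ru", "type": "a", "value": "94.31.205.0"},
    {"timestamp": "1600993003", "name": "0.205-tl.cetl.com.ar", "type": "a", "value": "127.0.0.1"},
] * 2  # every record appears twice, as in the test file

clean = drop_duplicates(rows, subset=["name", "value"])
print(len(rows), len(clean))  # 4 2
```

Applied to the full file, the same logic turns 100 lines into 50 rows, which matches the `count()` result above.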

Link to this post:

https://jamchoi.me/archives/spark.html