Hadoop & Spark學習筆記(二):Hadoop和SparkRDD的操作

Yanwei Liu
11 min readFeb 22, 2020

--

Hadoop

管理者的指令

hdfs namenode -format  #格式化NameNodehdfs dfsadmin –report  #回報基本的檔案系統資訊和統計hdfs dfsadmin -getDatanodeInfo slave1:50020 #查看slave1的資訊

使用者的指令

hdfs dfs -ls /user/mis  #列出/user/mis目錄下的目錄和檔案hdfs dfs -ls -R /user/mis 加上-R選項,列出所有子目錄細節hdfs dfs -mkdir /hadoop/data #建立/hadoop/data這個目錄結構hdfs dfs -mkdir -p /hadoop/data #加上-p選項,連同原本不存在的上層父目錄一同建立hdfs dfs -touch /hadoop/data/input.txt #在/user/hadoop/data之下建立一個空檔案input.txt#使用-appendToFile將本地的檔案附加到某個檔案之後
hdfs dfs -appendToFile /etc/hosts /hadoop/data/input.txt
#把使用者mis的登入腳本檔拷貝到hdfs上的/hadoop目錄下,並且儲存為mis.sh
hdfs dfs -copyFromLocal /home/mis/.bashrc /hadoop/mis.sh
#將本地Hadoop設定檔目錄下的所有.xml檔案拷貝到/hadoop/data目錄下
hdfs dfs -copyFromLocal /usr/local/hadoop/etc/hadoop/*.xml /hadoop/data
#將hdfs上的/hadoop/mis.sh拷貝放置本地/home/mis目錄下
hdfs dfs -copyToLocal /hadoop/mis.sh /home/mis
#將hdfs上的目錄下所有檔案合併至一個本地檔案
hdfs dfs -getmerge /hadoop/data all.xml
hdfs dfs -cat /hadoop/mis.sh #顯示前例中所產生的mis.sh檔案內容hdfs dfs -tail /hadoop/mis.sh #顯示檔案最後面的資料#將/hadoop/mis.sh檔案拷貝到/hadoop/data之下
hdfs dfs -cp /hadoop/mis.sh /hadoop/data
#將/hadoop/data整個目錄,拷貝到hdfs的跟目錄下hdfs dfs -cp /hadoop/data /#從hdfs拷貝/hadoop/mis.sh至本地的/home/mis目錄,檔名取為mis2.txt
hdfs dfs -cp /hadoop/mis.sh home/mis/mis2.txt
#將/data移動到/hadoop下,並將目錄名稱改為data2
hdfs dfs -mv /data /hadoop/data2
#刪除檔案/hadoop/data/input.txt
hdfs dfs -rm /hadoop/data/input.txt
#刪除目錄/hadoop/data2
hdfs dfs -rm -r /hadoop/data2
#將/hadoop/data/mis.sh的存檔份數從1份提升到3份
hdfs dfs -setrep 3 /hadoop/data/mis.sh
#查看/hadoop目錄下所佔總空間大小(bytes)
hdfs dfs -du /hadoop
hdfs dfs -du /hadoop/data
#查看/hadoop目錄下的目錄數、檔案數和所佔總空間大小(bytes)
hdfs dfs -count /hadoop
#檢查/hadoop下檔案架構的健康狀態
hdfs fsck /hadoop

SparkRDD

spark-shell  #啟動spark shell
:q #用:q離開spark-shell環境
### Create建立#建立RDD,並查看RDD內容和分割數
val numRDD = sc.makeRDD(1 to 9)
numRDD.foreach(println)
numRDD.partitions.size
#使用parallelize建立RDD,功能與makeRDD相同
val numRDD = sc.parallelize(1 to 9)
numRDD.foreach(println)
#建立RDD,設定分割數為3
val numRDD2 = sc.makeRDD(1 to 9, 3)
numRDD2.partitions.size
#建立字串RDD,使用二種不同的輸出方式。注意collect會將資料轉成Arrayval strRDD = sc.makeRDD(Array("一", "二", "三", "四", "五", "六", "日"))
strRDD.foreach(println)
strRDD.collect()
#Array建立數字1~9 RDD
val numRDD = sc.makeRDD(Array(1,2,3,4,5,6,7,8,9))
numRDD.foreach(println)
numRDD.collect()
### Read讀取#讀入本地檔案/etc/network/interfaces建立RDD,並用foreach(println)列印出來
val fileRDD = sc.textFile("file:///etc/network/interfaces")
fileRDD.foreach(println)
### map運算#整數元素轉變成浮點數
val f2RDD = numRDD.map(_.toFloat)
f2RDD.foreach(println)
#計算y=3*√x+1的結果
val equRDD = f2RDD.map(x => 3*math.sqrt(x) + 1)
equRDD.foreach(println)
### filter運算#留下大於4的元素
val numRDD = sc.makeRDD(1 to 9)
numRDD.foreach(println)
val xRDD = numRDD.filter(x => x>4)
xRDD.foreach(println)
#找出RDD中是否有John、有無名字包含大寫J、有無名字包含大寫J或小寫k
val nameRDD=sc.makeRDD(Array("John", "Mary", "Jack", "Nancy", "Mark", "Maggie", "Melissa"))
nameRDD.foreach(println)
nameRDD.filter(x => x=="John").foreach(println)
nameRDD.filter(x => x.contains("J")).foreach(println)
nameRDD.filter(x => x.contains("J") || x.contains("k")).foreach(println)
### groupBy運算#基偶分組
val xRDD=numRDD.groupBy(x => { if (x % 2 == 0) "even" else "odd"})
#大於5的分一組,小於等於5的分一組
val yRDD=numRDD.groupBy(x => x > 5)
### sortBy運算#升降冪排列val xRDD=sc.makeRDD(Array(2,4,6,8,1,3,5,7,9)) #正常排列
xRDD.collect
val yRDD=xRDD.sortBy(x => x) #升冪排列
yRDD.collect
val zRDD=xRDD.sortBy(x => x, false) #降冪排列
zRDD.collect
### 統計運算#資料筆數切割成70:30,並分別計算筆數
val xRDD = sc.makeRDD(1 to 100)
xRDD.collect
val yRDD = xRDD.randomSplit(Array(0.7, 0.3))
yRDD(0).count
yRDD(0).collect
yRDD(1).count
yRDD(1).collect
#union聯集val xRDD=sc.makeRDD(1 to 5)
val yRDD=sc.makeRDD(4 to 8)
xRDD.collect()
yRDD.collect()
val zRDD=xRDD.union(yRDD)
zRDD.collect() //注意:4和5有重複,union結果不會去除重複
val wRDD=xRDD ++ yRDD
wRDD.collect()
#intersection交集val xRDD=sc.makeRDD(1 to 5)
val yRDD=sc.makeRDD(4 to 8)
xRDD.collect()
yRDD.collect()
val zRDD=xRDD.intersection(yRDD)
zRDD.collect() //只有4和5是xRDD和yRDD所共有
#subtract差集(刪除共有的)
val xRDD=sc.makeRDD(1 to 5)
val yRDD=sc.makeRDD(4 to 8)
xRDD.collect()
yRDD.collect()
val zRDD=xRDD.subtract(yRDD)
zRDD.collect() //xRDD的4和5被減掉了
#distince去除重複
val xRDD=sc.makeRDD(1 to 5)
val yRDD=sc.makeRDD(4 to 8)
val zRDD=xRDD.union(yRDD).distinct
zRDD.collect()
#組合成Key-Value
val xRDD=sc.makeRDD(Array("老師","學生"))
val yRDD=sc.makeRDD(Array("教書","讀書"))
xRDD.foreach(println)
yRDD.foreach(println)
val zRDD=xRDD.zip(yRDD)
zRDD.foreach(println)
### join運算#讓相同「學號」的「姓名」和「英文成績」連結在一起val studRDD=sc.makeRDD(Array(("ID01","曾子"),("ID02","子路"),("ID03","顏回"),("ID04","宰予")))val engRDD=sc.makeRDD(Array(("ID01",100),("ID02",85),("ID03",91)))
studRDD.join(engRDD).foreach(println)
#缺考的同學也會出現
studRDD.leftOuterJoin(engRDD).foreach(println)
### filter的Key-Value運算#找出學號(鍵)為ID03的學生,還有名字(值)為 "子路" 的學生val studRDD=sc.makeRDD(Array(("ID01","曾子"),("ID02","子路"),("ID03","顏回"),("ID04","宰予"),("ID10","子貢")))
studRDD.filter(x => x._1 == "ID03" ).collect
studRDD.filter(x => x._2 == "子路" ).collect
studRDD.filter { case (k,v) => k == "ID03" } .collect
studRDD.filter { case (k,v) => v == "子路" } .collect

### 計算統計量
val numRDD = sc.makeRDD(1 to 9)
numRDD.count
numRDD.mean
numRDD.stdev
numRDD.max
numRDD.min
numRDD.sum
numRDD.variance
numRDD.stats
### lookup運算#查詢xRDD中鍵為 "A" 的值有哪些val xRDD=sc.makeRDD(Array(("A",1),("A",2),("A",3),("B",4),("B",5),("C",6)))
xRDD.lookup("A")
### 檔案輸出#將studRDD的內容存成文字檔,saveAsTextFile的參數file:///home/mis/stud_output必須是一個「不存在」的目錄名稱val studRDD=sc.makeRDD(Array(("ID01","曾子"),("ID02","子路"),("ID03","顏回"),("ID04","宰予")))
studRDD.saveAsTextFile("hdfs://master:9000/stud_output")

--

--

No responses yet