介紹

本文演示了 Apache Spark 如何使用 PySpark 編寫強大的 ETL 作業。PySpark 可幫助您創建更具可擴展性的處理和分析(大)數據。

在我們的案例中,我們將使用一個數據集,其中包含來自超過 370,000 輛二手車的信息(在 Kaggle 上托管的數據)。?請務必注意,數據的內容是德語。

Spark SQL

什么是阿帕奇火花

Apache Spark是最受歡迎的大規模數據處理引擎之一。它是一個開源系統,具有支持多態編程語言的 API。數據處理在內存中完成,因此比 MapReduce 快 100 倍。Spark 附帶的庫支持一組豐富的高級工具,包括用于 SQL 和結構化數據處理的Spark SQL、用于機器學習的 MLlib、用于圖形處理的GraphXSpark 流式處理。它可以從本地計算機運行,但也可以擴展到數百個節點的群集。

什么是 ETL?

ETL(Extract、T ransform 和Load)是將數據從一個系統遷移到另一個系統的過程。 T數據提取是從同質或異構源檢索數據的過程,用于進一步的數據處理和數據存儲。在數據處理期間,數據正在清理,并修改或刪除錯誤或不準確的記錄。最后,將處理后的數據加載(例如存儲)到目標系統,如數據倉庫或數據湖或 NoSQL 或 RDBMS。

提取

每個 Spark 應用程序的出發點都是創建 SparkSession。這是一個驅動程序過程,用于維護有關 Spark 應用程序的所有相關信息,它還負責在所有執行器之間分發和調度應用程序。我們只需通過以下方式創建 SparkSession:

Python

 

格式("csv") |
.架構(架構) |
.選項("標題","真") |
.load(環境="HOME"]="/數據/自動.csv")

打?。?加載到 PySpark 中的數據","\n")
返回 df' 數據朗="文本/x-python"*

1
defload_df_with_schema火花):
2
架構結構類型(*
3
結構字段"日期已爬"時間戳類型 true),
4
結構字段("賣方",字符串類型(),),

6
結構字段"報價類型"字符串類型true),
7
結構場"價格"長類型),
8
結構字段"abtest"字符串類型true),
9
結構字段"車輛類型"字符串類型true),
10
結構字段"年度注冊"字符串類型true),
11
結構場("電源PS",短型(),),

13
結構字段"模型"字符串類型true),
14
結構場"公里"長類型),
15
結構字段"月注冊"字符串類型true),
16
結構場"燃料類型"字符串類型true),
17
結構場"品牌"字符串類型),
18
結構字段("日期創建",日期類型(),),

20
結構場"nrofPictures"短型),
21
結構字段"郵碼"字符串類型true),
22
結構字段"最后看到"時間戳類型true
23
  ])
24

25
df火花|
26
讀取 |

27
  .格式"csv"|
28
  .架構架構|
29
  .選項("標題""true"|
30
  .負載環境="HOME"="/數據/自動.csv"
31

32
打印("加載到 PySpark 中的數據""\n")
33

對汽車數據集的五行進行采樣

如您所見,有多列包含空值。我們可以處理丟失的數據與各種各樣的選項。但是,討論此情況已不及本文的范圍。因此,我們選擇將缺少的值保留為 null。但是,此數據集中有更多的奇怪的值和列,因此需要一些基本轉換:

此清理的基本原理基于以下內容:列"日期已爬"和"lastSeen"似乎對任何未來的分析都不起作用。列"nrOfPictures"中的所有值等于 0,因此我們決定刪除此列。

賣方
格韋利希 3
私人 371525

優惠類型
安格博特 371513
格蘇 12

檢查"賣方"和"報價類型"列時,會產生以下數字。因此,我們可以刪除包含值"gewerblich"的三行,然后刪除列"賣方"。相同的邏輯也適用于列"offerType",因此,我們只剩下一個更加平衡的數據集。例如,我們將數據集如下所示:

Last five rows of the ‘cleaned’ car dataset

"已清理"汽車數據集的最后五行

負荷

我們已經將原始數據轉換為分析就緒數據,因此,我們已準備好將數據加載到本地運行的 MySQL 數據庫中進行進一步分析。例如,我們初始化了 MySQL 數據庫,其中帶有"自動"和"汽車"的表。

使用 MySQL 連接器 Python 在 Python 中連接 MySQL 數據庫的步驟

1. 使用 pip 安裝 MySQL 連接器 Python。

2. 使用 MySQL 連接器 Python 的 mysql.connector.connect() 方法,具有所需的參數來連接 MySQL。

3. 使用 connect() 方法返回的連接對象創建游標對象以執行數據庫操作。

4執行() 從 Python 執行 SQL 查詢。

5. 工作完成后,使用 cursor.close() 和 MySQL 數據庫連接使用連接.close()關閉 Cursor 對象。

6. 捕獲異常,如果在此過程中可能發生任何異常。

現在,我們已經創建了一個游標,我們可以在我們的"自動"數據庫中創建名為"汽車"的表:

創建表后,現在可以使用數據集填充它了。我們可以將數據作為元組列表(其中每個記錄都是一個元組)添加到我們的 INSERT 語句中:

因此,現在可以使用我們以前定義的游標執行此命令:

Python

 

xxxxxxx
1
1
cur.執行許多(insert_querycars_seqcars_seq= 我們將多行(從列表中)插入到表中

png"數據-新="假"數據大小="14380"數據大小格式化="14.4 kB"數據類型="臨時"數據 url="/存儲/臨時/134514 96-3 行.png"src_"http://www.armittex.com/wp-內容/上傳/2020/05/13451496-3行.png"/]

我們需要調用以下代碼將事務提交到 MySQL:

Python

 

xxxxxxx
1
 
1
恩 .提交()

為了確保數據集在 MySQL 中正確加載,我們可以在 MySQL 工作臺中檢查:

dataset loaded correctly

最后評論

對于試圖構建可擴展數據應用程序的任何數據工程師或數據科學家而言,PySpark是一個非常強大且非常有用(大)的數據工具。本文的代碼可以在GitHub上找到。由于這是我在平臺上的第一篇文章。請隨時給我您的反饋或意見。

引用

Comments are closed.