2021年4月5日 星期一

Azure Databricks的543

前言

轉眼間,Blog主已經在目前的公司待滿了兩年了,而因為我剛進來的時候,我的team就已經採用了Azure的系統,自然而然這兩年接觸不少Azure家的產品。一個我很常用的是Azure Data Factory(以下簡稱ADF),Blog主覺得是還蠻不錯的ETL服務...如果上手之後。

在純ADF裡面,想要做比較複雜的邏輯判斷會比較麻煩一點,簡單的If判斷,次數比較少的Loop都還可以(為什麼說次數比較少?因為在ADF裡,常跟ForEach配對使用的LookUp有容量跟row數限制),但要做比較複雜的任務的時候就很麻煩,上述的Loop次數問題不談,還有單一pipeline不能用多重Loop,想改用多重pipeline時,卻又無法回傳參數...等等問題(執筆當下,2021/04的情況)。

當然,Azure也有其他服務可以解決上述的問題,像是比較貼近ADF這種UI介面為主的Logic App,專門處理大量數據的Databricks,還有比較傳統,但自由度高很多的Azure Batch...等等。其中,目前Blog主比較愛用的就是Databricks,它算是處於上述三種服務的中間位置,不像Logic App那麼的純UI,基本上還是寫Code,但是編輯跟運行code又比Azure Batch快速跟方便很多,不用考慮太多系統建構的問題。以下就是這段時間下來,Blog主發覺在Databricks很常用到的,容易搞混的,還有意外方便的一些東西。

正文

從DB取資料時的轉換問題

Databricks最主要的功用是處理大量數據,所以最先遇到的是取資料的問題。在預設情況下,從Database裡面取資料出來時,是個PySpark的Dataframe(以下簡稱DF)。它能支援分散處理的操作,但跟大家比較熟悉的Pandas DF操作會差很多,同時網路上的資源也比較少。所以一個運算效率比較差,但對操作者方便的做法就是轉成Pandas的DF(特別在當你的資料還在幾萬~幾百萬行時)。下面是Blog主習慣用的自定義函式,他一次做完「SQL→從DB取出資料→轉換成Pandas的DF→先轉換型態成object」的動作。當然,如果你只想轉換DF,就用toPandas()的那行就好。

def get_pd_frame_by_query(pushdown_query):
    pushdown_query = "( "+pushdown_query+" ) query"
    df_query = spark.read.jdbc(url=jdbcUrl, table=pushdown_query, properties=connectionProperties)
    pdf_query = df_query.select("*").toPandas()
    pdf_query = pdf_query.astype('object')
    return pdf_query

不過,有時候當檔案還是太大時,會轉換失敗,有時候是真的達到物理的極限,只好用更好性能的虛擬機,或者乖乖用PySpark那套。但有些時候,其實是Databricks的自動優化功能導致轉換失敗,以下這條關閉的指令會很方便(預設是開啟的)。

spark.conf.set("spark.sql.execution.arrow.enabled", "false")

Databricks File System的陷阱

Databricks一個很方便的功能是Databricks File System(以下簡稱DBFS),可以讓你mount不同雲端儲存服務在系統裡,並直接像本地系統般的存取。不過在使用上會有些陷阱:

兩種路徑


取自MS docs
(https://docs.microsoft.com/en-us/azure/databricks/data/databricks-file-system)

同個檔案在Databricks裡面會有兩種路徑,一個是以DBFS當起點(也就是上圖的兩個fs),另一個是以你啟動的虛擬機為起點,分別使用不同的路徑開頭。簡單來說前者你想存取掛載的空間時要加「/dbfs」,後者你想存取虛擬機本機時要加「file:/」。

存取問題

講到這邊,可能有些人會想問,「為什麼要搞這麼複雜呢?如果我每一行都用虛擬機本機去存取DBFS就好了不是?」,這句聽起來是對的沒錯,不過實際上用虛擬機本機,對DBFS上的檔案做比較複雜的處理時,會跳錯誤或停止不動。具體來說,什麼是「比較複雜的處理?」,Blog主的認知是超出讀取檔案的動作之外的,都可能算是。

一個append失敗的範例,同一段code用w或r都會成功

譬如像Blog主以前想對DBFS的檔案做append的動作,當時是用python的open,一下指令馬上就死給我看(但單純的讀取r或寫入w就正常)。還有一次對DBFS的檔案,用gzip配合split做分割壓縮檔的時候,寫入到第二個檔案就會停住。總之,DBFS雖然方便,而且規格統一。但實際使用上,如果對DBFS裡的檔案想做直接加工時,還是建議先把檔案複製到本機裡,處理完後再複製回去,會比較安全。

跟一般notebook使用shell指令的差別

在以前習慣的Jupyter Notebook裡,加上一個驚嘆號就能實行指令,在Databricks的Notebook裡面其實也可以,不過轉成用%sh放在前面,並且還有另外一種%fs。%sh基本上就是平常linux能使用的指令,代表的是以虛擬機為起點,%fs則是前面講的DBFS專用指令,以DBFS的角度為起點,可以做一些基本的檔案操作(cp,rm,ls...等等),使用上需要注意的地方已經在前面解釋了。

%sh
#check all the mounted storage
ls -l /dbfs/mnt

能用指令後,另外一個問題就是,如何動態的放入字串到指令裡?我還蠻喜歡用Jupyter的{},能讓我直接帶入Python的變數。但Databricks就不支援了,你只能用Python的os先把變數轉到系統上,再間接使用他們。以下是Blog主以前寫的兩段Code,第一段先用Python放入參數,第二段再用shell指令去取檔案。

#pass to the os environment variables
os.environ['RUN_ID'] = run_id
os.environ['SOURCE_DIR'] = source_file_dir
os.environ['SOURCE_FN'] = source_filename
os.environ['OUTPUT_FN'] = output_filename
#check the os environment variables
print(os.getenv('RUN_ID'),os.getenv('SOURCE_DIR'),os.getenv('SOURCE_FN'),os.getenv('OUTPUT_FN'))
%sh
#copy the file from blob storage
cp /dbfs/mnt/$SOURCE_DIR/$SOURCE_FN $RUN_ID/$SOURCE_FN

Web Terminal

Web terminal可以在虛擬機的設定頁面找到

另一個我很喜歡的是Web terminal,配合DBFS的系統,在當我想快速確認檔案時非常的好用。譬如我最常使用的Azure Storage,當我要確認檔案時還要把上面的檔案先載到本地,才能開起來看。如果檔案小,只會浪費一點操作的時間,如果檔案大,就浪費很多下載的時間。而且常有的情況是,我需要反覆確認檔案,這時用下載的就非常沒效率。而用這個Web terminal能夠直接確認檔案內容,你想要用tail,head,還是less...等等都可以。

讀取及傳回ADF參數

在文章的一開始,Blog主就講了一點Databricks跟ADF的運作關係。Blog主在需要動態傳入參數執行的時候,在Databricks上會需要接收ADF傳來的參數。又或著執行完時,有ADF其他後續的工作,是依賴著Databricks的結果的,那就需要傳回ADF上的參數。以下是分別兩種的範例:

取得參數

def get_parameter(parameter_name):
    dbutils.widgets.text(parameter_name, "","")
    parameter_val = dbutils.widgets.get(parameter_name)
    return parameter_val

回傳參數


import json
dbutils.notebook.exit(json.dumps({
  "code": code,
  "reason" :reason
}))

結尾

以上就是Blog主幾個常用或覺得比較特殊的內容,希望能對一些有用Databricks的讀者有幫助。其實這篇是Blog主第一次打技術類的文章,寫起來還蠻新奇的感覺(笑)。其實現在網路上,光是繁體中文的技術文章也不少(雖然不能說豐富?),但工作的時候發現,跟Azure相關的繁中文章卻極少,也算是Blog主想開始寫這篇文章的契機之一,以後可能看情況會不定期更新。


沒有留言:

張貼留言