Microsoft Azure Databricks ETL

ETL: Extract, Transform and Load, a data warehousing technique

Workspace -> Users (your own folder) -> right-click Import -> .dbc file

-> Create -> Notebook -> Default language: python

jobs -> start cluster

Calling a notebook

%run /_POC_QA_SC_DataCenter/Dayu-connect-to-datalake
dbutils.notebook.run("/_POC_QA_SC_DataCenter/Dayu-connect-to-datalake_Func", 60, {"argument": "data"})
  • Pros: parameters can be passed in, and multiple notebooks can be called
  • Cons: it runs as a new job, so the called notebook's variables are not available in the caller

Input & Output

dbutils.widgets.get("argument1")
dbutils.notebook.exit(data)

ADF reads the output: @activity('Notebook').output
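A minimal sketch of the whole round trip, assuming a callee notebook at the hypothetical path /_POC_QA_SC_DataCenter/Child that declares a widget named argument1:

# --- callee notebook (/_POC_QA_SC_DataCenter/Child, hypothetical path) ---
dbutils.widgets.text("argument1", "")          # declare the widget so the caller can fill it
value = dbutils.widgets.get("argument1")       # read the value passed in
dbutils.notebook.exit(f"processed {value}")    # return a string to the caller

# --- caller notebook ---
result = dbutils.notebook.run("/_POC_QA_SC_DataCenter/Child", 60, {"argument1": "data"})
print(result)   # "processed data"; this is the value ADF reads from the notebook activity's output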

File operations

  • dbutils
dbutils.fs.ls("abfss://container@blob.xxx.cn/folder/")
dbutils.fs.rm("abfss://container@blob.xxx.cn/folder/filename", True)
dbutils.fs.mkdirs(TempPath)
dbutils.fs.cp(SapPath+FileName, TempPath)

Gotcha: to update a table's schema, you must delete the folder where the table is stored
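A minimal sketch of that reset, reusing the STG.TableName table and location from the DDL section below:

# drop the table definition and remove its folder, then recreate the table with the new schema
spark.sql("DROP TABLE IF EXISTS STG.TableName")
dbutils.fs.rm("abfss://container@blob.xxx.cn/folder/STG/TableName", True)   # recursive delete of the old data files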

  • Getting file names
df = spark.read \
  .format("text") \
  .load("abfss://container@blob.xxx.cn/folder/filename_*.txt") \
  .select("*", "_metadata")
display(df)

df.createOrReplaceTempView("df_spark")
df1 = spark.sql("select distinct _metadata.file_name as filename from df_spark order by filename")
for row in df1.collect():                 # iterate over the actual result rows instead of a hard-coded range
    filename = str(row["filename"])

Gotcha: here even xlsx files can only be read with .format('csv'), not .format("com.crealytics.spark.excel")

  • os
import os

os.listdir("/")
%sh ls /
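os and %sh operate on the driver's local filesystem, where DBFS is exposed through the /dbfs FUSE mount, so the same folder can be reached both ways. A small sketch using the FileStore path that appears later in these notes:

import os

display(dbutils.fs.ls("dbfs:/FileStore/"))   # DBFS URI, via dbutils
print(os.listdir("/dbfs/FileStore/"))        # the same folder via the local /dbfs mount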
  • azure-storage-file-datalake
pip install azure-storage-file-datalake azure-identity
import os, uuid, sys
from azure.identity import ClientSecretCredential
from azure.storage.filedatalake import DataLakeServiceClient
from azure.core._match_conditions import MatchConditions
from azure.storage.filedatalake._models import ContentSettings

def initialize_storage_account():
    # tenant_id, client_id, client_secret and account_url are service-principal / account settings defined elsewhere
    try:
        global service_client
        credential = ClientSecretCredential(tenant_id, client_id, client_secret)
        service_client = DataLakeServiceClient(account_url=account_url, credential=credential)
    except Exception as e:
        print(e)

def list_directory(container="raw", folder="example_folder"):
    try:
        file_system_client = service_client.get_file_system_client(file_system=container)
        paths = file_system_client.get_paths(path=folder)
        for path in paths:
            print(path.name + '\n')
    except Exception as e:
        print(e)

def create_directory(container="raw", folder="example_folder"):
    file_system_client = service_client.get_file_system_client(file_system=container)
    res = file_system_client.create_directory(folder)

def delete_file(container="raw", file="example_folder/example.xlsx"):
    file_system_client = service_client.get_file_system_client(file_system=container)
    res = file_system_client.delete_file(file)

def delete_directory(container="raw", folder="example_folder"):
    file_system_client = service_client.get_file_system_client(file_system=container)
    res = file_system_client.delete_directory(folder)
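A minimal usage sketch of the helpers above (container and paths are the same placeholders used in the function defaults):

initialize_storage_account()                                       # builds the global service_client
create_directory(container="raw", folder="example_folder")
list_directory(container="raw", folder="example_folder")
delete_file(container="raw", file="example_folder/example.xlsx")
delete_directory(container="raw", folder="example_folder")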

File read/write

  • CSV
df_csv=spark.read.format("csv").option("encoding","GBK").load("abfss://container@blob.xxx.cn/folder/filename.csv")
display(df_csv)
# GBK fixes garbled Chinese characters
# the CSV does not need a header
df_csv.toPandas().to_csv(<target_file>, index=False, header=False, encoding='utf_8_sig')

Gotcha: with GBK encoding, if Excel's "authoring languages and proofing" preference is set to English, the Chinese text becomes garbled and shows up as nothing but ?

  • TXT
df_txt=spark.read.format("text").load(sourcefile);
df_txt.coalesce(1).write.format("text").save("s3://<bucket>/<folder>/filename")

Gotcha: coalesce only guarantees a single output file; it still creates a folder named filename, containing a txt file named part-<number> (plus _SUCCESS, _committed, and _started files)
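A minimal sketch for promoting that single part file to the intended name (same placeholder paths as above):

out_dir = "s3://<bucket>/<folder>/filename"                  # the folder Spark actually wrote
part = [f.path for f in dbutils.fs.ls(out_dir) if f.name.startswith("part-")][0]
dbutils.fs.cp(part, "s3://<bucket>/<folder>/filename.txt")   # copy the single part file to the real target name
dbutils.fs.rm(out_dir, True)                                 # remove the folder and its _SUCCESS/_committed files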

# avoid quotes being wrapped around every line of the txt output
import csv
df_txt.toPandas().to_csv(<target_file>, index=False, header=False, sep='\t', quoting=csv.QUOTE_NONE, escapechar='\\')
  • Excel
# header is required; maxRowsInMemory handles files larger than 10 MB
df_excel=spark.read.format("com.crealytics.spark.excel").option("header","true").option("maxRowsInMemory", 2000).load(sourcefile);

CSV is read by Spark directly; Excel is read through Spark's underlying Hadoop layer, so install com.crealytics:spark-excel under the cluster's Libraries.

pip install openpyxl

# the Excel file must keep its header
df_excel.toPandas().to_excel(<target_file>, index=False)
  • JSON
select * from json.`abfss://container@blob.xxx.cn/folder/filename`
df_DateLake=spark.read.format("json").load("abfss://container@blob.xxx.cn/folder/filename.json");
  • Parquet
select * from delta.`abfss://container@blob.xxx.cn/folder/filename`

Data processing

DCL

  • Access permissions on the blob storage file system
GRANT SELECT, MODIFY ON ANY FILE TO `<user>@<domain-name>`
  • Schema permissions
SHOW GRANTS ON SCHEMA <SCHEMANAME>

GRANT USAGE, SELECT, CREATE, READ_METADATA, MODIFY ON SCHEMA <SCHEMANAME> TO `<user>@<domain-name>`
  • Table permissions
ALTER TABLE <TABLENAME> OWNER TO `<user>@<domain-name>`
GRANT SELECT, READ_METADATA ON TABLE <TABLENAME> TO `<user>@<domain-name>`

DESCRIBE [EXTENDED] <TABLENAME> -- basic metadata of the table

SHOW GRANTS on TABLE <TABLENAME> -- permission information for the table
SHOW GRANTS `<user>@<domain-name>` on TABLE <TABLENAME>

DDL

  • Create a SCHEMA
CREATE SCHEMA example_schema
LOCATION 'dbfs:/mnt/azrzdbicinmtdpadlsqa/{container}/{example_schema}'
  • Create a TABLE
drop table if exists STG.TableName;
create table STG.TableName 
(
`Field1` string,
`Field2` INT,
`InsertTime` timestamp)
USING delta 
LOCATION "abfss://container@blob.xxx.cn/folder/STG/TableName";

If USING is omitted, the default is DELTA.

For any data_source other than DELTA, LOCATION must also be specified, unless the catalog is hive_metastore.
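For example, a minimal sketch of a non-Delta external table that therefore needs an explicit LOCATION (table name and path are hypothetical), run from a notebook cell:

spark.sql("""
CREATE TABLE IF NOT EXISTS STG.ExampleCsvTable (Field1 STRING, Field2 INT)
USING csv
OPTIONS (header 'true')
LOCATION 'abfss://container@blob.xxx.cn/folder/STG/ExampleCsvTable'
""")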

Setting the data lake address

set Address=abfss://container@blob.xxx.cn;
...
LOCATION "${hiveconf:Address}/folder/CSTG/TableName";

Once the SCHEMA has been created, LOCATION is no longer needed.

-- it is better to add a COMMENT
CREATE TABLE IF NOT EXISTS example_schema.example_table
(
 col1 STRING COMMENT 'col1_comment'
)
using delta
PARTITIONED BY (insertDate)
  • Cloning a TABLE

SHALLOW CLONE: a shallow clone does not copy data files to the clone target; its table metadata is equivalent to the source. These clones are cheap to create.
DEEP CLONE: a deep clone also copies the source table's data to the clone target, and it clones the streaming metadata as well.

CREATE OR REPLACE TABLE <SCHEMA>.<TABLENAME> DEEP CLONE <SCHEMA>.<TABLENAME>;
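For comparison, a minimal shallow-clone sketch (placeholder names in the same style as above; run from a notebook cell):

# metadata-only clone: cheap to create, still reads the source table's data files
spark.sql("CREATE OR REPLACE TABLE <TARGET_SCHEMA>.<TABLENAME> SHALLOW CLONE <SOURCE_SCHEMA>.<TABLENAME>")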

DML

  • Insert Table: CSV/EXCEL (RAW -> STG)
%python
import datetime
ManuPath="abfss://container@blob.xxx.cn/folder/RawZone/MANU/"

def sliceErrorMsg(msg):
    FailReason = str(msg)
    if FailReason.find(';') > 0:
        return FailReason[0:FailReason.find(';')].replace("'", "`")
    elif FailReason.find('.') > 0:
        return FailReason[0:FailReason.find('.')].replace("'", "`")
    else:
        return FailReason[0:30].replace("'", "`")

def insertTable(FileName, LandingTableName, TableColumn, ExcelTitle, SaveDay=30, Path=ManuPath):
    try:
        fileFormat = 'csv' if 'csv' in FileName else 'com.crealytics.spark.excel'
        df_DateLake=spark.read.format(fileFormat).option("header","true").load(Path+FileName)
    except:
        spark.sql(f"insert into STG.TableReadLog select '{LandingTableName}',current_date(),now(),'Not found {FileName}',null;")
    else:
        df_DateLake.createOrReplaceTempView("df_spark")
        # keep only the last SaveDay days in the landing table
        spark.sql(f"delete from {LandingTableName} where InsertTime<date_sub(now(), {SaveDay})")
        try:
            spark.sql(f"insert into {LandingTableName}({TableColumn}) select {ExcelTitle},now() InsertTime from df_spark;")
        except Exception as FailReason:
            FailReason = sliceErrorMsg(FailReason)
            print(FailReason)
            spark.sql(f"insert into STG.TableReadLog select '{LandingTableName}',current_date(),now(),'{FailReason}',null;")
        else:
            spark.sql(f"insert into STG.TableWriteLog select '{LandingTableName}',current_date(),now(),null;")
  • Incremental load
INSERT INTO <CSTG_SCHEMA>.<TABLE> (<columns>, InsertTime)
SELECT <columns>, NOW() FROM <STG_SCHEMA>.<TABLE> WHERE InsertTime>Current_Date();
merge into <CSTG_SCHEMA>.<TABLE> a
using <STG_SCHEMA>.<TABLE> b
    on a.id=b.id and a.insertDate=b.insertDate -- data updated within the same day does not overwrite existing rows
when not matched then insert
(<columns>, insertDate)
values (<columns>, b.insertDate)
  • Fault-tolerant incremental load
DELETE FROM <CSTG_SCHEMA>.<TABLE> A WHERE EXISTS (SELECT key FROM <STG_SCHEMA>.<TABLE> B WHERE A.key=B.key and B.InsertTime>Current_Date());

INSERT INTO <CSTG_SCHEMA>.<TABLE> (<columns>, InsertTime)
SELECT <columns>, NOW() FROM <STG_SCHEMA>.<TABLE> WHERE InsertTime>Current_Date();
  • Versioned fault-tolerant incremental load

    The same version number is fully overwritten; different version numbers are loaded incrementally.

    This solves the problem that the incremental and fault-tolerant incremental loads cannot delete data that was uploaded by mistake.

DELETE FROM <CSTG_SCHEMA>.<TABLE> A
WHERE EXISTS (SELECT B.YearMonth 
              FROM <STG_SCHEMA>.<TABLE> B
              WHERE  B.YearMonth=A.YearMonth  AND InsertTime>Current_Date());

insert into <CSTG_SCHEMA>.<TABLE> (<columns>, InsertTime)
select distinct -- distinct added to guard against pulling data from STG multiple times during testing
      <columns>,now()
FROM <STG_SCHEMA>.<TABLE> WHERE InsertTime>Current_Date();
  • Full load

When the filter InsertTime>Current_Date() is used, check IsUpdate first.

IsUpdate=spark.sql('select count(1) Num from <STG_SCHEMA>.<TABLE> where InsertTime>=current_date()')
if IsUpdate.collect()[0][0] > 0:
    spark.sql(f'TRUNCATE TABLE <CSTG_SCHEMA>.<TABLE>;')
    spark.sql(f'INSERT INTO <CSTG_SCHEMA>.<TABLE> ({columns},InsertTime) SELECT {columns}, now() FROM <STG_SCHEMA>.<TABLE> where InsertTime>Current_Date();')

Without the InsertTime>Current_Date() filter, IsUpdate is not needed.

TRUNCATE TABLE <DWD_SCHEMA>.<TABLE>;

INSERT INTO <DWD_SCHEMA>.<TABLE> (<columns>)
SELECT <columns> FROM <CSTG_SCHEMA>.<TABLE>;
  • Insert Table: TXT

Approach 1:

from pyspark.sql.types import *

not_blank = lambda c: bool(c and c.strip())   # assumed helper: keeps only non-blank column names

def insertTxT(FileName, LandingTableName, TableColumn, header=4, SaveDay=30):
    schemalist = []
    for col in TableColumn: schemalist.append(StructField(col, StringType(), True))
    schema = StructType(schemalist)
    selectColumn = list(filter(not_blank, TableColumn))
    df_DateLake = spark.read.format("csv").option("header","false").option("comment","*").option("encoding","utf-8").option("sep","\t").schema(schema).load(SapPath+FileName).select(selectColumn).toPandas().iloc[header:]
    df_DateLake = spark.createDataFrame(df_DateLake)
    df_DateLake.createOrReplaceTempView("df_spark")
    columns = ','.join(selectColumn)+',InsertTime'
    spark.sql(f"insert into {LandingTableName}({columns}) select *, now() InsertTime from df_spark;")

Approach 2:

import pandas

TempPath="dbfs:/FileStore/Temp"
TempFile="/dbfs/FileStore/Temp/"

def insertTxT_Pandas(FileName, LandingTableName, TableColumn, RenameColumn, header=3, SaveDay=30):
    dbutils.fs.rm(TempPath, True)
    dbutils.fs.mkdirs(TempPath)
    dbutils.fs.cp(SapPath+FileName, TempPath)   # copy into FileStore so pandas can read it through the /dbfs mount
    # error_bad_lines=False is on_bad_lines='skip' in newer pandas
    df_DateLake = pandas.read_csv(TempFile+FileName, header=header, sep='\t', names=RenameColumn, skipinitialspace=True, skip_blank_lines=True, error_bad_lines=False, dtype=str)
    df_DateLake = spark.createDataFrame(df_DateLake)
    df_DateLake.createOrReplaceTempView("df_spark")
    columns = delInsertTime(TableColumn)   # delInsertTime (defined elsewhere) is assumed to drop InsertTime from the column list
    spark.sql(f"insert into {LandingTableName}({TableColumn}) select {columns}, now() InsertTime from df_spark;")

OPTIMIZE

Optimizes the layout of Delta Lake data; a subset of the data can be optimized, or data can be co-located by column.

OPTIMIZE table_name [WHERE predicate]
  [ZORDER BY (col_name1 [, ...] ) ]

Enable auto optimize

-- all new tables
set spark.databricks.delta.properties.defaults.autoOptimize.optimizeWrite = true;
set spark.databricks.delta.properties.defaults.autoOptimize.autoCompact = true;
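To enable auto optimize on an existing table instead of as a session default, a minimal sketch (placeholder table name), run from a notebook cell:

spark.sql("""
ALTER TABLE <SCHEMA>.<TABLENAME>
SET TBLPROPERTIES (delta.autoOptimize.optimizeWrite = true, delta.autoOptimize.autoCompact = true)
""")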
Category: Cloud

Author: Yoga
