
環球觀察:大數據NiFi(十八):離線同步MySQL數據到HDFS
?離線同步MySQL數據到HDFS
案例:使用NiFi將MySQL中數據導入到HDFS中。
(資料圖片僅供參考)
以上案例用到的處理器有“QueryDatabaseTable”、“ConvertAvroToJSON”、“SplitJson”、“PutHDFS”四個處理器。
一、配置“QueryDatabaseTable”處理器
該處理器主要使用提供的SQL語句或者生成SQL語句來查詢MySQL中的數據,查詢結果轉換成Avro格式。該處理器只能運行在主節點上。
關于“QueryDatabaseTable”處理器的“Properties”配置的說明如下:
配置項 | 默認值 | 允許值 | 描述 |
---|---|---|---|
Database Connection Pooling Service(數據庫連接池服務) | 用于獲得與數據庫的連接的Controller Service。 | ||
Database Type(數據庫類型) | Generic | 選擇數據庫類型。Generic 通用類型OracleOracle 12+MS SQL 2012+MS SQL 2008MySQLPostgreSQL | |
Table Name(表名) | 查詢數據庫的表名,當使用“Custom Query”時,此為查詢結果的別名,并作為FlowFile中的屬性。 | ||
Columns to Return(返回的列) | 查詢返回的列,多個列使用逗號分隔。如果列中有特殊名稱需要加引號,則所有列都需要加引號處理。 | ||
Additional WHERE clause(where條件) | 在構建SQL查詢時添加到WHERE條件中的自定義子句。 | ||
Custom Query(自定義SQL查詢) | 自定義的SQL語句。該查詢被構建成子查詢,設置后不會從其他屬性構建SQL查詢。自定義SQL不支持Order by查詢。 | ||
Maximum-value Columns(最大值列) | 指定增量查詢獲取最大值的列,多列使用逗號分開。指定后,這個處理器只能檢索到添加/更新的行。不能設置無法比較大小的列,例如:boolean/bit。如果不指定,則參照表中所有的列來查詢全量數據,這會對性能產生影響。 | ||
Max Wait Time(最大超時時間) | 0 seconds | SQL查詢最大時長,默認為0沒有限制,設置小于0的時間默認為0。 | |
Fetch Size(拉取數據量) | 0 | 每次從查詢結果中拉取的數據量。 | |
Max Rows Per Flow File(每個FlowFile行數) | 0 | 在一個FlowFile文件中的數據行數。通過這個參數可以將很大的結果集分到多個FlowFile中。默認設置為0,所有結果存入一個FlowFile。 | |
Output Batch Size(數據輸出批次量) | 0 | 輸出的FlowFile批次數據大小,當設置為0代表所有數據輸出到下游關系。如果數據量很大,則有可能下游很久沒有收到數據,如果設置了,則每次達到該數據量就釋放數據,傳輸到下游。 | |
Maximum Number of Fragments(最大片段數) | 0 | 設置返回的最大數據片段數,設置0默認將所有數據片段返回,如果表非常大,設置后可以防止OOM錯誤。 | |
Normalize Table/Column Names(標準表/列名) | false | truefalse | 是否將列名中不兼容avro的字符修改為兼容avro的字符。例如,冒號和句點將被更改為下劃線,以構建有效的Avro記錄。 |
Transaction Isolation Level | 設置事務隔離級別。 | ||
Use Avro Logical Types(使用Avro邏輯類型) | false | truefalse | 是否對DECIMAL/NUMBER, DATE, TIME 和 TIMESTAMP 列使用Avro邏輯類型。 |
Default Decimal Precision(Decimal數據類型位數) | 10 | 當 DECIMAL/NUMBER 數據類型轉換成Avro類型數據時,指定的數據位數。 | |
Default Decimal Scale(Decimal 數據類型小數位數) | 0 | 當 DECIMAL/NUMBER 數據類型轉換成Avro類型數據時,指定的小數點后的位數。 |
Table Name(表名)查詢數據庫的表名,當使用“Custom Query”時,此為查詢結果的別名,并作為FlowFile中的屬性。 Columns to Return (返回的列) 查詢返回的列,多個列使用逗號分隔。如果列中有特殊名稱需要加引號,則所有列都需要加引號處理。 Additional WHERE clause (where條件) 在構建SQL查詢時添加到WHERE條件中的自定義子句。 Custom Query (自定義SQL查詢) 自定義的SQL語句。該查詢被構建成子查詢,設置后不會從其他屬性構建SQL查詢。自定義SQL不支持Order by查詢。 Maximum-value Columns (最大值列) 指定增量查詢獲取最大值的列,多列使用逗號分開。指定后,這個處理器只能檢索到添加/更新的行。不能設置無法比較大小的列,例如:boolean/bit。如果不指定,則參照表中所有的列來查詢全量數據,這會對性能產生影響。 Max Wait Time(最大超時時間)0 seconds SQL查詢最大時長,默認為0沒有限制,設置小于0的時間默認為0。 Fetch Size(拉取數據量)0 每次從查詢結果中拉取的數據量。 Max Rows Per Flow File(每個FlowFile行數)0 在一個FlowFile文件中的數據行數。通過這個參數可以將很大的結果集分到多個FlowFile中。默認設置為0,所有結果存入一個FlowFile。 Output Batch Size(數據輸出批次量)0 輸出的FlowFile批次數據大小,當設置為0代表所有數據輸出到下游關系。如果數據量很大,則有可能下游很久沒有收到數據,如果設置了,則每次達到該數據量就釋放數據,傳輸到下游。 Maximum Number of Fragments(最大片段數)0 設置返回的最大數據片段數,設置0默認將所有數據片段返回,如果表非常大,設置后可以防止OOM錯誤。 Normalize Table/Column Names(標準表/列名)false true false 是否將列名中不兼容avro的字符修改為兼容avro的字符。例如,冒號和句點將被更改為下劃線,以構建有效的Avro記錄。 Transaction Isolation Level 設置事務隔離級別。 Use Avro Logical Types(使用Avro邏輯類型)false true false 是否對DECIMAL/NUMBER, DATE, TIME 和 TIMESTAMP 列使用Avro邏輯類型。 Default Decimal Precision(Decimal數據類型位數)10 當 DECIMAL/NUMBER 數據類型轉換成Avro類型數據時,指定的數據位數。 Default Decimal Scale(Decimal 數據類型小數位數)0 當 DECIMAL/NUMBER 數據類型轉換成Avro類型數據時,指定的小數點后的位數。
配置步驟如下:
1、新建“QueryDatabaseTable”處理器
2、配置“SCHEDULING”調度時間
這里調度時間配置為99999s,讀取數據庫,這里讀取一次即可,默認0會不間斷讀取數據庫會對服務器造成非常大壓力。執行僅支持“Primary”主節點運行。
3、配置“PROPERTIES”
配置“Database Connection Pooling Service”選擇創建,在彈出頁面中可以按照默認選擇直接點擊“Create”。
點擊“->”繼續配置MySQL連接:
在彈出的頁面中填入:
連接MysqlURL:jdbc:mysql://192.168.179.5:3306/mynifi?characterEncoding=UTF-8&useSSL=false
MySQL驅動類:com.mysql.jdbc.DriverMySQL jar包路徑:需要提前在NiFI集群各個節點上創建對應目錄并上傳jar包。連接mysql的用戶名和密碼。通過以上配置好連接mysql如下:
配置其他屬性如下:
二、???????配置“ConvertAvroToJSON”處理器
此處理器是將二進制Avro記錄轉換為JSON對象,提供了一個從Avro字段到JSON字段的直接映射,這樣得到的JSON將具有與Avro文檔相同的層次結構。輸出的JSON編碼為UTF-8編碼,如果傳入的FlowFile包含多個Avro記錄,則轉換后的FlowFile是一個含有所有Avro記錄的JSON數組或一個JSON對象序列(每個Json對象單獨成行)。如果傳入的FlowFile不包含任何記錄,則輸出一個空JSON對象。
關于“ConvertAvroToJSON”處理器的“Properties”配置的說明如下:
配置項 | 默認值 | 允許值 | 描述 |
---|---|---|---|
JSON container options(Json選擇) | array | nonearray | 如何解析Json對象,none:解析Json將每個Json對象寫入新行。array:解析到的json存入JsonArray一個對象 |
Wrap Single Record(數據庫類型) | false | truefalse | 指定解析到的空記錄或者單條記錄是否按照“JSON container options”配置包裝對象。 |
Avro schema(表名) | 如果Avro數據沒有Schema信息,需要配置。 |
配置步驟如下:
1、創建“ConvertAvroToJSON”處理器
2、配置“PROPERTIES”
3、連接“QueryDatabaseTable”處理器和“CovertAvroToJSON”處理器
連接好兩個處理器后,可以配置“Connection”為負載均衡方式傳遞數據:
三、???????配置“SplitJson”處理器
該處理器使用JsonPath表達式指定需要的Json數組元素,將Json數組中的多個Json對象切分出來,形成多個FlowFile。每個生成的FlowFile都由指定數組中的一個元素組成,并傳輸到關系"split",原始文件傳輸到關系"original"。如果沒有找到指定的JsonPath,或者沒有對數組元素求值,則將原始文件路由到"failure",不會生成任何文件。
關于“SplitJson”處理器的“Properties”配置的說明如下:
配置項 | 默認值 | 允許值 | 描述 |
---|---|---|---|
JsonPath Expression(Json表達式) | 一個JsonPath表達式,它指定用以分割的數組元素。 | ||
Null Value Representation(Null值表示) | empty string | empty stringthe string "null" | 指定結果為空值時的表示形式。 |
配置步驟如下:
1、創建“SplitJson”處理器
2、配置“PROPERTIES”
3、連接“ConvertAvroToJSON”處理器和“SplitJson”處理器
連接后,連接關系選擇“success”:
同時配置“ConverAvroToJSON”處理失敗的數據自動終止:
四、配置“PutHDFS”處理器
該處理器是將FlowFile數據寫入到HDFS分布式文件系統中。關于“PutHDFS”處理器的“Properties”主要配置的說明如下:
配置項 | 默認值 | 允許值 | 描述 |
---|---|---|---|
Hadoop Configuration Resources(Hadoop配置) | nonearray | HDFS配置文件,一個文件或者由逗號分隔的多個文件。不配置將在ClassPath中尋找‘core-site.xml’或者‘hdfs-site.xml’文件。 | |
Directory(目錄) | 需要寫入文件的HDFS父目錄。如果目錄不存在,將創建該目錄。 | ||
Conflict Resolution Strategy(沖突解決) | fail | replaceignorefailappend | 指示當輸出目錄中已經存在同名文件時如何處理。 |
配置步驟如下:
1、創建“PutHDFS”處理器
2、配置“PROPERTIES”
注意:以上需要在各個NiFi集群節點上創建“/root/test”目錄,并且在該目錄下上傳hdfs-site.xml和core-site.xml文件。
3、連接“SplitJson”處理器和“PutHDFS”處理器
同時設置“SplitJson”處理器中“failure”和“original”數據關系自動終止。
設置“PutHDFS”處理器“success”和“failure”數據關系自動終止:
配置好的連接關系如下:
五、??????????????運行測試
1、在MySQL創建庫“mynifi”,并且創建表“test1”,向表中插入10條數據
mysql> create database mynifi;Query OK, 1 row affected (0.02 sec)mysql> use mynifi;Database changedmysql> create table test1(id int,name varchar(255),age int );Query OK, 0 rows affected (0.07 sec)mysql> insert into test1 values (1,"zs",18),(2,"ls",19),(3,"ww",20),(4,"ml",21),(5,"tt",22)
2、首先啟動“QueryDatabaseTable”處理器觀察隊列數據
3、單獨啟動“ConvertAvroToJson”處理器觀察隊列數據
4、單獨啟動“SplitJson”處理器觀察隊列數據
5、單獨啟動“PutHDFS”處理器觀察HDFS對應目錄數據
查看數據:
注意:
如果在“QueryDatabaseTable”處理器中設置增屬性“Maximum-value Columns”為id,那么每次查詢都是大于id的增量數據。如果想要存入HDFS文件為多行而不是一行,可以將“CovertAvroToJson”處理器屬性“JSON container options”設置為none,直接解析Avro文件得到一個個json數據,然后直接連接“PutHDFS”處理器即可。標簽: PostgreSQL JSON 云數據庫 Server
-
15
2023-02傳iPhone15Pro將擁有更窄的邊框 2023年秋季發布
一位通常準確的消息人士再次強調了他們之前關于iPhone15 Pro將擁有更窄的邊框的報告,盡管沒有透露任何新的細節。預計iPhone 15 Pro將于 -
14
2023-02中正評測曝光RTX4060跑分 性能比3060強太多
近日,數碼博主中正評測曝光了RTX4060的跑分成績。據悉,這次測試采用的是搭載i9-13900HX處理器的雷神ZERO。硬件方面,RTX 4060擁有3072個 -
06
2023-02惠普新款IPSBlack面板顯示器上架 配備雷電4接口
惠普新款IPS Black面板顯示器Z32k G3已上架,價格為6999元。新款顯示器最大的亮點就是配備雷電4接口,擁有40Gbps的數據傳輸速率,通過單U -
03
2023-02售價15999元!蘋果全新MacBookPro上市開售
2月3日消息,今日,蘋果全新MacBook Pro正式上市開售,搭載M2 Pro芯片的14英寸版本起售價15999元,M2 Pro芯片16英寸版本起售價為19999 -
02
2023-02三星GalaxyS23系列正式發布 搭載高通第二代驍龍8系
今日凌晨2點,三星Galaxy S23系列正式發布。和上一代不同,Galaxy S23全系搭載高通第二代驍龍8移動平臺,沒有Exynos版本。不僅如此,Gala -
15
2022-11vivoX90系列正式發布 蔡司一英寸T+自研芯片
在當下的手機市場環境中,為了尋求突圍行業瓶頸,廠商們都把發力點之一放到了提升創新性方面。包括對手機形態、具體關鍵技術等的改善。尤其 -
14
2022-11驍龍8Gen2性能再提升20% 單核跑出了1524分
今年的高通驍龍峰會提前到了11月15日在美國夏威夷舉行,無疑,此次的絕對主角將是驍龍8 Gen2。已經見識過驍龍8 Gen2樣機的爆料人Yogesh -
14
2022-11魅藍lifeme140W氮化鎵充電器發布 體積縮小了20%以上
魅藍lifeme推出了一款新品140W GaN快速充電器,售價369元。這款140W氮化鎵充電器外觀設計簡約,采用純白配色,折疊式插腳設計方案,體積較 -
14
2022-11iPhone14系列Pro版銷量喜人 組裝工廠目前產能大幅降低
iPhone 14系列自從發布以來,標準版就明顯預冷,但Pro版熱度一直居高不下,銷量喜人。此前蘋果發布的第三季度財報還顯示,iPhone銷量明顯 -
14
2022-11全球首款12英寸高分辨率可拉伸顯示屏發布 具有全彩色RGB功能
LG顯示 (LG Display) 發布了全球首款12英寸高分辨率可拉伸顯示屏 (Stretchable Display)。據了解,這款顯示屏具有橡皮筋般的柔韌性, -
24
2022-05俄談判代表團團長:從未拒絕與烏克蘭談判
新華社明斯克5月22日電(記者魯金博)俄羅斯談判代表團團長、總統助理梅金斯基22日在接受白俄羅斯媒體視頻采訪時 -
24
2022-05加強核心技術攻關 著力解決“好用”問題
“擔大任,要有大能力。特別是要加快培育農機原創技術策源地,圍繞解決‘卡脖子’問題,深入開展重點產品核心技術


卷出一塊好曲屏 真我10系列新品發布會舉行
英國猴痘病例數預計將大幅上升
上海:視情適當延長畢業生在校生身份時間
國家電網確定新型電力系統科技攻關十大重點項目
比亞迪發布CTB電池車身一體化技術
商務部:堅定致力于實現全面、高水平的亞太自貿區
中辦國辦印發《意見》 推進實施國家文化數字化戰略
初夏看市場:“菜籃子”產品生產供應充足 蔬菜在田面積達9877.2萬畝
上海浦東重點生產企業復工復產超1100家
泰國羽毛球公開賽落幕 中國隊收獲一冠兩亞
-
1
配置拉滿的電競神機 雷神ZERO2023大黃蜂發布
-
2
真我10Pro系列發布 首發量產2160Hz超高頻調光技術
-
3
阿富汗塔利班組建正規軍
-
4
薩赫勒地區反恐形勢面臨新變數
-
5
北約北擴加劇歐洲安全風險
-
6
貴州畢節七星關區百所學校創辦百個“紅軍班”
-
7
湖北省孝感軍分區組織軍地聯合應急救援研究性演練
-
8
青藏高原等區域將新設一批國家公園
-
9
河北省承德軍分區退役軍人擔綱教練主力
-
10
軍校“大思政”:畫出教育同心圓