數據專欄

智能大數據搬運工,你想要的我們都有

科技資訊:

科技學院:

科技百科:

科技書籍:

網站大全:

軟件大全:

matplotlib是常用的繪圖庫,支持python及在Jupyter Notebook下使用,也支持最新的JupyterLab環境。這里介紹matplotlib的字體設置方法以及繪圖線型、符號和顏色的設置。 1、中文字體 關于中文亂碼問題, https://www.linuxidc.com/Linux/2019-03/157632.htm Linux下查找中文字體,使用命令: fc-list :lang=zh 繪圖函數參考下面: def draw(dfx): myfont = FontProperties(fname='/usr/share/fonts/truetype/arphic/ukai.ttc',size=24) fig=plt.figure(figsize=(48,12), dpi=250) p1=fig.add_subplot(1,1,1) p1.set_xticklabels(dfx['日期'], rotation=15, fontsize='small',fontproperties=myfont) #顯示數據。 p1.plot(dfx['日期'],dfx['新增確診'],color='red',linewidth=3,label='新增確診') p1.bar(dfx['日期'],dfx['新增死亡'],color='black',label='新增死亡') plt.title(u'全國新增病例數量(NCP)-2020年01-02月',fontproperties=myfont) plt.legend(loc=0,ncol=1,prop=myfont) plt.grid(True) plt.gcf().autofmt_xdate() plt.show() 需要字體設置的主要有標題、圖例和軸上的標簽。先創建一個myfont對象,然后在title、legend、set_xticklabel放進去即可,注意legend的參數是prop。 2、圖形繪制 這里給出個簡單的使用matplotlib繪圖例子,使用了顏色、線寬、線型符號等風格樣式。 import matplotlib.pyplot as plt from matplotlib.font_manager import * x= range(100) y= [i**2 for i in x] plt.subplots(1, 1) plt.plot(x, y, linewidth = '1', label = 'Example', color='coral', linestyle=':', marker='|') plt.legend(loc='upper left') plt.show() 3、線型表 linestyle可選參數: '-' solid line style '--' dashed line style '-.' dash-dot line style ':' dotted line style 4、符號表 marker可選參數: '.' point marker ',' pixel marker 'o' circle marker 'v' triangle_down marker '^' triangle_up marker '<' triangle_left marker '>' triangle_right marker '1' tri_down marker '2' tri_up marker '3' tri_left marker '4' tri_right marker 's' square marker 'p' pentagon marker '*' star marker 'h' hexagon1 marker 'H' hexagon2 marker '+' plus marker 'x' x marker 'D' diamond marker 'd' thin_diamond marker '|' vline marker '_' hline marker 5、顏色表 color可用的顏色: cnames = { 'aliceblue': '#F0F8FF', 'antiquewhite': '#FAEBD7', 'aqua': '#00FFFF', 'aquamarine': '#7FFFD4', 'azure': '#F0FFFF', 'beige': '#F5F5DC', 'bisque': '#FFE4C4', 'black': '#000000', 'blanchedalmond': '#FFEBCD', 'blue': '#0000FF', 'blueviolet': '#8A2BE2', 'brown': '#A52A2A', 'burlywood': '#DEB887', 'cadetblue': '#5F9EA0', 'chartreuse': '#7FFF00', 'chocolate': '#D2691E', 'coral': '#FF7F50', 'cornflowerblue': '#6495ED', 'cornsilk': '#FFF8DC', 'crimson': '#DC143C', 'cyan': '#00FFFF', 'darkblue': '#00008B', 'darkcyan': '#008B8B', 'darkgoldenrod': '#B8860B', 'darkgray': '#A9A9A9', 'darkgreen': '#006400', 'darkkhaki': '#BDB76B', 'darkmagenta': '#8B008B', 'darkolivegreen': '#556B2F', 'darkorange': '#FF8C00', 'darkorchid': '#9932CC', 'darkred': '#8B0000', 'darksalmon': '#E9967A', 'darkseagreen': '#8FBC8F', 'darkslateblue': '#483D8B', 'darkslategray': '#2F4F4F', 'darkturquoise': '#00CED1', 'darkviolet': '#9400D3', 'deeppink': '#FF1493', 'deepskyblue': '#00BFFF', 'dimgray': '#696969', 'dodgerblue': '#1E90FF', 'firebrick': '#B22222', 'floralwhite': '#FFFAF0', 'forestgreen': '#228B22', 'fuchsia': '#FF00FF', 'gainsboro': '#DCDCDC', 'ghostwhite': '#F8F8FF', 'gold': '#FFD700', 'goldenrod': '#DAA520', 'gray': '#808080', 'green': '#008000', 'greenyellow': '#ADFF2F', 'honeydew': '#F0FFF0', 'hotpink': '#FF69B4', 'indianred': '#CD5C5C', 'indigo': '#4B0082', 'ivory': '#FFFFF0', 'khaki': '#F0E68C', 'lavender': '#E6E6FA', 'lavenderblush': '#FFF0F5', 'lawngreen': '#7CFC00', 'lemonchiffon': '#FFFACD', 'lightblue': '#ADD8E6', 'lightcoral': '#F08080', 'lightcyan': '#E0FFFF', 'lightgoldenrodyellow': '#FAFAD2', 'lightgreen': '#90EE90', 'lightgray': '#D3D3D3', 'lightpink': '#FFB6C1', 'lightsalmon': '#FFA07A', 'lightseagreen': '#20B2AA', 'lightskyblue': '#87CEFA', 'lightslategray': '#778899', 'lightsteelblue': '#B0C4DE', 'lightyellow': '#FFFFE0', 'lime': '#00FF00', 'limegreen': '#32CD32', 'linen': '#FAF0E6', 'magenta': '#FF00FF', 'maroon': '#800000', 'mediumaquamarine': '#66CDAA', 'mediumblue': '#0000CD', 'mediumorchid': '#BA55D3', 'mediumpurple': '#9370DB', 'mediumseagreen': '#3CB371', 'mediumslateblue': '#7B68EE', 'mediumspringgreen': '#00FA9A', 'mediumturquoise': '#48D1CC', 'mediumvioletred': '#C71585', 'midnightblue': '#191970', 'mintcream': '#F5FFFA', 'mistyrose': '#FFE4E1', 'moccasin': '#FFE4B5', 'navajowhite': '#FFDEAD', 'navy': '#000080', 'oldlace': '#FDF5E6', 'olive': '#808000', 'olivedrab': '#6B8E23', 'orange': '#FFA500', 'orangered': '#FF4500', 'orchid': '#DA70D6', 'palegoldenrod': '#EEE8AA', 'palegreen': '#98FB98', 'paleturquoise': '#AFEEEE', 'palevioletred': '#DB7093', 'papayawhip': '#FFEFD5', 'peachpuff': '#FFDAB9', 'peru': '#CD853F', 'pink': '#FFC0CB', 'plum': '#DDA0DD', 'powderblue': '#B0E0E6', 'purple': '#800080', 'red': '#FF0000', 'rosybrown': '#BC8F8F', 'royalblue': '#4169E1', 'saddlebrown': '#8B4513', 'salmon': '#FA8072', 'sandybrown': '#FAA460', 'seagreen': '#2E8B57', 'seashell': '#FFF5EE', 'sienna': '#A0522D', 'silver': '#C0C0C0', 'skyblue': '#87CEEB', 'slateblue': '#6A5ACD', 'slategray': '#708090', 'snow': '#FFFAFA', 'springgreen': '#00FF7F', 'steelblue': '#4682B4', 'tan': '#D2B48C', 'teal': '#008080', 'thistle': '#D8BFD8', 'tomato': '#FF6347', 'turquoise': '#40E0D0', 'violet': '#EE82EE', 'wheat': '#F5DEB3', 'white': '#FFFFFF', 'whitesmoke': '#F5F5F5', 'yellow': '#FFFF00', 'yellowgreen': '#9ACD32'} 顏色樣本如下: 另外: 如果安裝了seaborn擴展的話,在字典seaborn.xkcd_rgb中包含所有的xkcd crowdsourced color names。使用方法如下: plt.plot([1,2], lw=4, c=seaborn.xkcd_rgb['baby poop green']) 所有顏色表如下:
來源:OSCHINA
發布時間:2020-02-10 15:57:00
前言 首先介紹下在本文出現的幾個比較重要的概念: 函數計算(Function Compute) : 函數計算 是一個事件驅動的服務,通過函數計算,用戶無需管理服務器等運行情況,只需編寫代碼并上傳。函數計算準備計算資源,并以彈性伸縮的方式運行用戶代碼,而用戶只需根據實際代碼運行所消耗的資源進行付費。函數計算更多信息 參考 。 Fun : Fun 是一個用于支持 Serverless 應用部署的工具,能幫助您便捷地管理函數計算、API 網關、日志服務等資源。它通過一個資源配置文件(template.yml),協助您進行開發、構建、部署操作。Fun 的更多文檔 參考 。 備注: 本文介紹的技巧需要 Fun 版本大于等于 3.5.0。 依賴工具 本項目是在 MacOS 下開發的,涉及到的工具是平臺無關的,對于 Linux 和 Windows 桌面系統應該也同樣適用。在開始本例之前請確保如下工具已經正確的安裝,更新到最新版本,并進行正確的配置。 Docker Fun Fun 工具依賴于 docker 來模擬本地環境。 對于 MacOS 用戶可以使用 homebrew 進行安裝: brew cask install docker brew tap vangie/formula brew install fun Windows 和 Linux 用戶安裝請參考: https://github.com/aliyun/fun/blob/master/docs/usage/installation.md https://github.com/aliyun/fcli/releases 安裝好后,記得先執行 fun config 初始化一下配置。 初始化 使用 fun init 命令可以快捷的將本模板項目初始化到本地。 fun init vangie/puppeteer-example 安裝依賴 fun install fun install 會執行 Funfile 文件里的指令,依次執行如下任務: 安裝 chrome headless 二進制文件; 安裝 puppeteer 依賴的 apt 包; 安裝 npm 依賴。 部署 同步大文件到 nas 盤: fun nas sync 部署代碼: $ fun deploy using template: template.yml using region: cn-hangzhou using accountId: *********** 3743 using accessKeyId: ***********Ptgk using timeout: 600 Waiting for service puppeteer to be deployed... make sure role 'aliyunfcgeneratedrole-cn-hangzhou-puppeteer' is exist role 'aliyunfcgeneratedrole-cn-hangzhou-puppeteer' is already exist attaching police 'AliyunECSNetworkInterfaceManagementAccess' to role: aliyunfcgeneratedrole-cn-hangzhou-puppeteer attached police 'AliyunECSNetworkInterfaceManagementAccess' to role: aliyunfcgeneratedrole-cn-hangzhou-puppeteer using 'VpcConfig: Auto' , Fun will try to generate related vpc resources automatically vpc already generated, vpcId is : vpc-bp1wv9al02opqahkizmvr vswitch already generated, vswitchId is : vsw-bp1kablus0jrcdeth8v35 security group already generated, security group is : sg-bp1h2swzeb5vgjfu6gpo generated auto VpcConfig done: { "vpcId" : "vpc-bp1wv9al02opqahkizmvr" , "vswitchIds" :[ "vsw-bp1kablus0jrcdeth8v35" ], "securityGroupId" : "sg-bp1h2swzeb5vgjfu6gpo" } using 'NasConfig: Auto' , Fun will try to generate related nas file system automatically nas file system already generated, fileSystemId is : 0825 a4a395 nas file system mount target is already created, mountTargetDomain is : 0825 a4a395-rrf16.cn-hangzhou.nas.aliyuncs.com generated auto NasConfig done: { "UserId" : 10003 , "GroupId" : 10003 , "MountPoints" :[{ "ServerAddr" : "0825a4a395-rrf16.cn-hangzhou.nas.aliyuncs.com:/puppeteer" , "MountDir" : "/mnt/auto" }]} Checking if nas directories /puppeteer exists, if not, it will be created automatically Checking nas directories done [ "/puppeteer" ] Waiting for function html2png to be deployed... Waiting for packaging function html2png code... The function html2png has been packaged. A total of 7 files files were compressed and the final size was 2.56 KB Waiting for HTTP trigger httpTrigger to be deployed... triggerName: httpTrigger methods: [ 'GET' ] url: https: //xxxxxx.cn-hangzhou.fc.aliyuncs.com/2016-08-15/proxy/puppeteer/html2png/ Http Trigger will forcefully add a 'Content-Disposition: attachment' field to the response header, which cannot be overwritten and will cause the response to be downloaded as an attachment in the browser. This issue can be avoided by using CustomDomain. trigger httpTrigger deploy success function html2png deploy success service puppeteer deploy success ===================================== Tips for nas resources ================================================== Fun has detected the .nas.yml file in your working directory, which contains the local directory: /Users/vangie/Workspace/puppeteer-example/{{ projectName }}/.fun/root /Users/vangie/Workspace/puppeteer-example/{{ projectName }}/node_modules The above directories will be automatically ignored when 'fun deploy' . Any content of the above directories changes,you need to use 'fun nas sync' to sync local resources to remote. =============================================================================================================== 驗證 curl https://xxxxxx.cn-hangzhou.fc.aliyuncs.com/2016-08-15/proxy/puppeteer/html2png/ > screenshot.png 如果不傳遞查詢參數,默認會截取阿里云的首頁。 如果想換一個網址,可以使用如下命令格式: curl https://xxxxxx.cn-hangzhou.fc.aliyuncs.com/2016-08-15/proxy/puppeteer/html2png/?url=http://www.alibaba.com > screenshot.png 調試 如果需要在本地調試代碼,可以使用如下命令: $ fun local start using template : template.yml HttpTrigger httpTrigger of puppeteer/html2png was registered url : http ://localhost: 8000 / 2016 -08 -15 /proxy/puppeteer/html2png methods: [ 'GET' ] authType: ANONYMOUS function compute app listening on port 8000 ! 瀏覽器打開 http://localhost:8000/2016-08-15/proxy/puppeteer/html2png 即可。 查看更多:https://yq.aliyun.com/articles/743644?utm_content=g_1000103099 上云就看云棲號:更多云資訊,上云案例,最佳實踐,產品入門,訪問:https://yqh.aliyun.com/
來源:OSCHINA
發布時間:2020-02-10 15:55:00
這是一個包含了 函數計算 每種 Runtime 結合 HTTP Trigger 實現文件上傳和文件下載的示例集。每個示例包括: 一個公共 HTML 頁面,該頁面有一個文件選擇框和上傳按鈕,會列出已經上傳的文件,點擊某個已上傳的文件可以把文件下載下來; 支持文件上傳、下載和列舉的函數。 我們知道不同語言在處理 HTTP 協議上傳下載時都有很多中方法和社區庫,特別是結合函數計算的場景,開發人員往往需要耗費不少精力去學習和嘗試。本示例集編撰的目的就是節省開發者甄別的精力和時間,為每種語言提供一種有效且符合社區最佳實踐的方法,可以拿來即用。 當前已支持的 Runtime 包括: nodejs python php java 計劃支持的 Runtime 包括: dotnetcore 不打算支持的 Runtime 包括: custom 使用限制 由于函數計算對于 HTTP 的 Request 和 Response 的 Body 大小限制均為 6M,所以該示例集只適用于借助函數計算上傳和下載文件小于 6M 的場景。對于大于 6M 的情況,可以考慮如下方法: 分片上傳 ,把文件切分成小塊,上傳以后再拼接起來; 借助于 OSS ,將文件先上傳 OSS,函數從 OSS 上下載文件,處理完以后回傳 OSS; 借助于 NAS ,將大文件放在 NAS 網盤上,函數可以像讀寫普通文件系統一樣訪問 NAS 網盤的文件。 快速開始 安裝依賴 在開始之前請確保開發環境已經安裝了如下工具: docker funcraft git make 構建并啟動函數 克隆代碼: git clone https://github.com/vangie/ fc -file-transfer 本地啟動函數: $ make start ... HttpTrigger httpTrigger of file -transfer/nodejs was registered url : http ://localhost: 8000 / 2016 -08 -15 /proxy/ file -transfer/nodejs methods: [ 'GET' , 'POST' ] authType: ANONYMOUS HttpTrigger httpTrigger of file -transfer/python was registered url : http ://localhost: 8000 / 2016 -08 -15 /proxy/ file -transfer/python methods: [ 'GET' , 'POST' ] authType: ANONYMOUS HttpTrigger httpTrigger of file -transfer/ java was registered url : http ://localhost: 8000 / 2016 -08 -15 /proxy/ file -transfer/ java methods: [ 'GET' , 'POST' ] authType: ANONYMOUS HttpTrigger httpTrigger of file -transfer/php was registered url : http ://localhost: 8000 / 2016 -08 -15 /proxy/ file -transfer/php methods: [ 'GET' , 'POST' ] authType: ANONYMOUS function compute app listening on port 8000 ! make start 命令會調用 Makefile 文件中的指令,通過 fun local 在本地的 8000 端口開放 HTTP 服務,控制臺會打印出每個 HTTP Trigger 的 URL 、支持的 HTTP 方法,以及認證方式。 效果演示 上面四個 URL 地址隨便選一個在瀏覽器中打開示例頁面。 接口說明 所有示例都實現了下述四個 HTTP 接口: GET / 返回文件上傳 Form 的 HTML 頁面 GET /list 以 JSON 數組形式返回文件列表 POST /upload 以 multipart/form-data 格式上傳文件 fileContent 作為文件字段 fileName 作為文件名字段 GET /download?filename=xxx 以 application/octet-stream 格式返回文件內容。 此外為了能正確的計算相對路徑,在訪問根路徑時如果不是以 / 結尾,都會觸發一個 301 跳轉,在 URL 末尾加上一個 / 。 不同語言的示例代碼 nodejs python php java 已知問題 文件大小 限制 fun local 實現存在已知問題,上傳過大的文件會自動退出,未來的版本會修復。 部署到線上需要綁定 自定義域名 才能使用,否則 HTML 文件在瀏覽器中會被 強制下載 而不是直接渲染。 查看更多:https://yq.aliyun.com/articles/743642?utm_content=g_1000103098 上云就看云棲號:更多云資訊,上云案例,最佳實踐,產品入門,訪問:https://yqh.aliyun.com/
來源:OSCHINA
發布時間:2020-02-10 15:42:00
導讀: 容器存儲是 Kubernetes 系統中提供數據持久化的基礎組件,是實現有狀態服務的重要保證。Kubernetes 默認提供了主流的存儲卷接入方案(In-Tree),同時也提供了插件機制(Out-Of-Tree),允許其他類型的存儲服務接入 Kubernetes 系統服務。本文將從 Kubernetes 存儲架構、存儲插件原理、實現等方面進行講解,希望大家有所收獲。 一、Kubernetes 存儲體系架構 引例: 在 Kubernetes 中掛載一個 Volume 首先以一個 Volume 的掛載例子來作為引入。 如下圖所示,左邊的 YAML 模板定義了一個 StatefulSet 的一個應用,其中定義了一個名為 disk-pvc 的 volume,掛載到 Pod 內部的目錄是 /data。disk-pvc 是一個 PVC 類型的數據卷,其中定義了一個 storageClassName。 因此這個模板是一個典型的動態存儲的模板。右圖是數據卷掛載的過程,主要分為 6 步: 第一步 :用戶創建一個包含 PVC的 Pod; 第二步 :PV Controller 會不斷觀察 ApiServer,如果它發現一個 PVC 已經創建完畢但仍然是未綁定的狀態,它就會試圖把一個 PV 和 PVC 綁定; PV Controller 首先會在集群內部找到一個適合的 PV 進行綁定,如果未找到相應的 PV,就調用 Volume Plugin 去做 Provision。Provision 就是從遠端上一個具體的存儲介質創建一個 Volume,并且在集群中創建一個 PV 對象,然后將此 PV 和 PVC 進行綁定; 第三步 :通過 Scheduler 完成一個調度功能; 我們知道,當一個 Pod 運行的時候,需要選擇一個 Node,這個節點的選擇就是由 Scheduler 來完成的。Scheduler 進行調度的時候會有多個參考量,比如 Pod 內部所定義的 nodeSelector、nodeAffinity 這些定義以及 Volume 中所定義的一些標簽等。 我們可以在數據卷中添加一些標簽,這樣使用這個 pv 的 Pod 就會由于標簽的限制,被調度器調度到期望的節點上。 第四步 :如果有一個 Pod 調度到某個節點之后,它所定義的 PV 還沒有被掛載(Attach),此時 AD Controller 就會調用 VolumePlugin,把遠端的 Volume 掛載到目標節點中的設備上(如:/dev/vdb); 第五步: 當 Volum Manager 發現一個 Pod 調度到自己的節點上并且 Volume 已經完成了掛載,它就會執行 mount 操作,將本地設備(也就是剛才得到的 /dev/vdb)掛載到 Pod 在節點上的一個子目錄中。同時它也可能會做一些像格式化、是否掛載到 GlobalPath 等這樣的附加操作。 第六步 :綁定操作,就是將已經掛載到本地的 Volume 映射到容器中。 Kubernetes 的存儲架構 接下來,我們一起看一下 Kubernetes 的存儲架構。 PV Controller : 負責 PV/PVC 的綁定、生命周期管理,并根據需求進行數據卷的 Provision/Delete 操作; AD Controller :負責存儲設備的 Attach/Detach 操作,將設備掛載到目標節點; Volume Manager :管理卷的 Mount/Unmount 操作、卷設備的格式化以及掛載到一些公用目錄上的操作; Volume Plugins :它主要是對上面所有掛載功能的實現; PV Controller、AD Controller、Volume Manager 主要是進行操作的調用,而具體操作則是由 Volume Plugins 實現的。 Scheduler :實現對 Pod 的調度能力,會根據一些存儲相關的的定義去做一些存儲相關的調度; 接下來,我們分別介紹上面這幾部分的功能。 PV Controller 首先我們先來回顧一下幾個基本概念: Persistent Volume (PV) : 持久化存儲卷,詳細定義了預掛載存儲空間的各項參數; 例如,我們去掛載一個遠端的 NAS 的時候,這個 NAS 的具體參數就要定義在 PV 中。一個 PV 是沒有 NameSpace 限制的,它一般由 Admin 來創建與維護; Persistent Volume Claim (PVC) :持久化存儲聲明; 它是用戶所使用的存儲接口,對存儲細節無感知,主要是定義一些基本存儲的 Size、AccessMode 參數在里面,并且它是屬于某個 NameSpace 內部的。 StorageClass :存儲類; 一個動態存儲卷會按照 StorageClass 所定義的模板來創建一個 PV,其中定義了創建模板所需要的一些參數和創建 PV 的一個 Provisioner(就是由誰去創建的)。 PV Controller 的主要任務就是完成 PV、PVC 的生命周期管理,比如創建、刪除 PV 對象,負責 PV、PVC 的狀態遷移;另一個任務就是綁定 PVC 與 PV 對象,一個 PVC 必須和一個 PV 綁定后才能被應用使用,它們是一一綁定的,一個 PV 只能被一個 PVC 綁定,反之亦然。 接下來,我們看一下一個 PV 的狀態遷移圖。 創建好一個 PV 以后,我們就處于一個 Available 的狀態,當一個 PVC 和一個 PV 綁定的時候,這個 PV 就進入了 Bound 的狀態,此時如果我們把 PVC 刪掉,Bound 狀態的 PV 就會進入 Released 的狀態。 一個 Released 狀態的 PV 會根據自己定義的 ReclaimPolicy 字段來決定自己是進入一個 Available 的狀態還是進入一個 Deleted 的狀態。如果 ReclaimPolicy 定義的是 "recycle" 類型,它會進入一個 Available 狀態,如果轉變失敗,就會進入 Failed 的狀態。 相對而言,PVC 的狀態遷移圖就比較簡單。 一個創建好的 PVC 會處于 Pending 狀態,當一個 PVC 與 PV 綁定之后,PVC 就會進入 Bound 的狀態,當一個 Bound 狀態的 PVC 的 PV 被刪掉之后,該 PVC 就會進入一個 Lost 的狀態。對于一個 Lost 狀態的 PVC,它的 PV 如果又被重新創建,并且重新與該 PVC 綁定之后,該 PVC 就會重新回到 Bound 狀態。 下圖是一個 PVC 去綁定 PV 時對 PV 篩選的一個流程圖。就是說一個 PVC 去綁定一個 PV 的時候,應該選擇一個什么樣的 PV 進行綁定。 首先 它會檢查 VolumeMode 這個標簽,PV 與 PVC 的 VolumeMode 標簽必須相匹配。VolumeMode 主要定義的是我們這個數據卷是文件系統 (FileSystem) 類型還是一個塊 (Block) 類型; 第二個部分 是 LabelSelector。當 PVC 中定義了 LabelSelector 之后,我們就會選擇那些有 Label 并且與 PVC 的 LabelSelector 相匹配的 PV 進行綁定; 第三個部分 是 StorageClassName 的檢查。如果 PVC 中定義了一個 StorageClassName,則必須有此相同類名的 PV 才可以被篩選中。 這里再具體解釋一下 StorageClassName 這個標簽,該標簽的目的就是說,當一個 PVC 找不到相應的 PV 時,我們就會用該標簽所指定的 StorageClass 去做一個動態創建 PV 的操作,同時它也是一個綁定條件,當存在一個滿足該條件的 PV 時,就會直接使用現有的 PV,而不再去動態創建。 第四個部分 是 AccessMode 檢查。 AccessMode 就是平時我們在 PVC 中定義的如 "ReadWriteOnce"、"RearWriteMany" 這樣的標簽。該綁定條件就是要求 PVC 和 PV 必須有匹配的 AccessMode,即 PVC 所需求的 AccessMode 類型,PV 必須具有。 最后 一個部分是 Size 的檢查。 一個 PVC 的 Size 必須小于等于 PV 的 Size,這是因為 PVC 是一個聲明的 Volume,實際的 Volume 必須要大于等于聲明的 Volume,才能進行綁定。 接下來,我們看一個 PV Controller 的一個實現。 PV Controller 中主要有兩個實現邏輯:一個是 ClaimWorker;一個是 VolumeWorker。 ClaimWorker 實現的是 PVC 的狀態遷移。 通過系統標簽 "pv.kubernetes.io/bind-completed" 來標識一個 PVC 的狀態。 如果該標簽為 True,說明我們的 PVC 已經綁定完成,此時我們只需要去同步一些內部的狀態; 如果該標簽為 False,就說明我們的 PVC 處于未綁定狀態。 這個時候就需要檢查整個集群中的 PV 去進行篩選。通過 findBestMatch 就可以去篩選所有的 PV,也就是按照之前提到的五個綁定條件來進行篩選。如果篩選到 PV,就執行一個 Bound 操作,否則就去做一個 Provision 的操作,自己去創建一個 PV。 再看 VolumeWorker 的操作。它實現的則是 PV 的狀態遷移。 通過 PV 中的 ClaimRef 標簽來進行判斷,如果該標簽為空,就說明該 PV 是一個 Available 的狀態,此時只需要做一個同步就可以了;如果該標簽非空,這個值是 PVC 的一個值,我們就會去集群中查找對應的 PVC。如果存在該 PVC,就說明該 PV 處于一個 Bound 的狀態,此時會做一些相應的狀態同步;如果找不到該 PVC,就說明該 PV 處于一個綁定過的狀態,相應的 PVC 已經被刪掉了,這時 PV 就處于一個 Released 的狀態。此時再根據 ReclaimPolicy 是否是 Delete 來決定是刪掉還是只做一些狀態的同步。 以上就是 PV Controller 的簡要實現邏輯。 AD Controller AD Controller 是 Attach/Detach Controller 的一個簡稱。 它有兩個核心對象,即 DesiredStateofWorld 和 ActualStateOfWorld。 DesiredStateofWorld 是集群中預期要達到的數據卷的掛載狀態; ActualStateOfWorld 則是集群內部實際存在的數據卷掛載狀態。 它有兩個核心邏輯,desiredStateOfWorldPopulator 和 Reconcile。 desiredStateOfWorldPopulator 主要是用來同步集群的一些數據以及 DSW、ASW 數據的更新,它會把集群里面,比如說我們創建一個新的 PVC、創建一個新的 Pod 的時候,我們會把這些數據的狀態同步到 DSW 中; Reconcile 則會根據 DSW 和 ASW 對象的狀態做狀態同步。它會把 ASW 狀態變成 DSW 狀態,在這個狀態的轉變過程中,它會去執行 Attach、Detach 等操作。 下面這個表分別給出了 desiredStateOfWorld 以及 actualStateOfWorld 對象的一個具體例子。 desiredStateOfWorld 會對每一個 Worker 進行定義,包括 Worker 所包含的 Volume 以及一些試圖掛載的信息; actualStateOfWorl 會把所有的 Volume 進行一次定義,包括每一個 Volume 期望掛載到哪個節點上、掛載的狀態是什么樣子的等等。 下圖是 AD Controller 實現的邏輯框圖。 從中我們可以看到,AD Controller 中有很多 Informer,Informer 會把集群中的 Pod 狀態、PV 狀態、Node 狀態、PVC 狀態同步到本地。 在初始化的時候會調用 populateDesireStateofWorld 以及 populateActualStateofWorld 將 desireStateofWorld、actualStateofWorld 兩個對象進行初始化。 在執行的時候,通過 desiredStateOfWorldPopulator 進行數據同步,即把集群中的數據狀態同步到 desireStateofWorld 中。reconciler 則通過輪詢的方式把 actualStateofWorld 和 desireStateofWorld 這兩個對象進行數據同步,在同步的時候,會通過調用 Volume Plugin 進行 attach 和 detach 操作,同時它也會調用 nodeStatusUpdater 對 Node 的狀態進行更新。 以上就是 AD Controller 的簡要實現邏輯。 Volume Manager Volume Manager 實際上是 Kubelet 中一部分,是 Kubelet 中眾多 Manager 的一個。它主要是用來做本節點 Volume 的 Attach/Detach/Mount/Unmount 操作。 它和 AD Controller 一樣包含有 desireStateofWorld 以及 actualStateofWorld,同時還有一個 volumePluginManager 對象,主要進行節點上插件的管理。在核心邏輯上和 AD Controller 也類似,通過 desiredStateOfWorldPopulator 進行數據的同步以及通過 Reconciler 進行接口的調用。 這里我們需要講一下 Attach/Detach 這兩個操作: 之前我們提到 AD Controller 也會做 Attach/Detach 操作,所以到底是由誰來做呢?我們可以通過 "--enable-controller-attach-detach" 標簽進行定義,如果它為 True,則由 AD Controller 來控制;若為 False,就由 Volume Manager 來做。 它是 Kubelet 的一個標簽,只能定義某個節點的行為,所以如果假設一個有 10 個節點的集群,它有 5 個節點定義該標簽為 False,說明這 5 個節點是由節點上的 Kubelet 來做掛載,而其它 5 個節點是由 AD Controller 來做掛載。 下圖是 Volume Manager 實現邏輯圖。 我們可以看到,最外層是一個循環,內部則是根據不同的對象,包括 desireStateofWorld, actualStateofWorld 的不同對象做一個輪詢。 例如,對 actualStateofWorld 中的 MountedVolumes 對象做輪詢,對其中的某一個 Volume,如果它同時存在于 desireStateofWorld,這就說明實際的和期望的 Volume 均是處于掛載狀態,因此我們不會做任何處理。如果它不存在于 desireStateofWorld,說明期望狀態中該 Volume 應該處于 Umounted 狀態,就執行 UnmountVolume,將其狀態轉變為 desireStateofWorld 中相同的狀態。 所以我們可以看到:實際上,該過程就是根據 desireStateofWorld 和 actualStateofWorld 的對比,再調用底層的接口來執行相應的操作,下面的 desireStateofWorld.UnmountVolumes 和 actualStateofWorld.AttachedVolumes 的操作也是同樣的道理。 Volume Plugins 我們之前提到的 PV Controller、AD Controller 以及 Volume Manager 其實都是通過調用 Volume Plugin 提供的接口,比如 Provision、Delete、Attach、Detach 等去做一些 PV、PVC 的管理。而這些接口的具體實現邏輯是放在 VolumePlugin 中的 根據源碼的位置可將 Volume Plugins 分為 In-Tree 和 Out-of-Tree 兩類: In-Tree 表示源碼是放在 Kubernetes 內部的,和 Kubernetes 一起發布、管理與迭代,缺點及時迭代速度慢、靈活性差; Out-of-Tree 類的 Volume Plugins 的代碼獨立于 Kubernetes,它是由存儲商提供實現的,目前主要有 Flexvolume 和 CSI 兩種實現機制,可以根據存儲類型實現不同的存儲插件。所以我們比較推崇 Out-of-Tree 這種實現邏輯。 從位置上我們可以看到,Volume Plugins 實際上就是 PV Controller、AD Controller 以及 Volume Manager 所調用的一個庫,分為 In-Tree 和 Out-of-Tree 兩類 Plugins。它通過這些實現來調用遠端的存儲,比如說掛載一個 NAS 的操作 "mount -t nfs * ",該命令其實就是在 Volume Plugins 中實現的,它會去調用遠程的一個存儲掛載到本地。 從類型上來看,Volume Plugins 可以分為很多種。In-Tree 中就包含了 幾十種常見的存儲實現,但一些公司的自己定義私有類型,有自己的 API 和參數,公共存儲插件是無法支持的,這時就需要 Out-of-Tree 類的存儲實現,比如 CSI、FlexVolume。 Volume Plugins 的具體實現會放到后面去講。這里主要看一下 Volume Plugins 的插件管理。 Kubernetes會在 PV Controller、AD Controller 以及 Volume Manager 中來做插件管理。通過 VolumePlguinMg 對象進行管理。主要包含 Plugins 和 Prober 兩個數據結構。 Plugins 主要是用來保存 Plugins 列表的一個對象,而 Prober 是一個探針,用于發現新的 Plugin,比如 FlexVolume、CSI 是擴展的一種插件,它們是動態創建和生成的,所以一開始我們是無法預知的,因此需要一個探針來發現新的 Plugin。 下圖是插件管理的整個過程。 PV Controller、AD Controller 以及 Volume Manager 在啟動的時候會執行一個 InitPlugins 方法來對 VolumePluginsMgr 做一些初始化。 它首先會將所有 In-Tree 的 Plugins 加入到我們的插件列表中。同時會調用 Prober 的 init 方法,該方法會首先調用一個 InitWatcher,它會時刻觀察著某一個目錄 (比如圖中的 /usr/libexec/kubernetes/kubelet-plugins/volume/exec/),當這個目錄每生成一個新文件的時候,也就是創建了一個新的 Plugins,此時就會生成一個新的 FsNotify.Create 事件,并將其加入到 EventsMap 中;同理,如果刪除了一個文件,就生成一個 FsNotify.Remove 事件加入到 EventsMap 中。 當上層調用 refreshProbedPlugins 時,Prober 就會把這些事件進行一個更新,如果是 Create,就將其添加到插件列表;如果是 Remove,就從插件列表中刪除一個插件。 以上就是 Volume Plugins 的插件管理機制。 Kubernetes 存儲卷調度 我們之前說到 Pod 必須被調度到某個 Worker 上才能去運行。在調度 Pod 時,我們會使用不同的調度器來進行篩選,其中有一些與 Volume 相關的調度器。例如 VolumeZonePredicate、VolumeBindingPredicate、CSIMaxVolumLimitPredicate 等。 VolumeZonePredicate 會檢查 PV 中的 Label,比如 failure-domain.beta.kubernetes.io/zone 標簽,如果該標簽定義了 zone 的信息,VolumeZonePredicate 就會做相應的判斷,即必須符合相應的 zone 的節點才能被調度。 比如下圖左側的例子,定義了一個 label 的 zone 為 cn-shenzhen-a。右側的 PV 則定義了一個 nodeAffinity,其中定義了 PV 所期望的節點的 Label,該 Label 是通過 VolumeBindingPredicate 進行篩選的。 存儲卷具體調度信息的實現可以參考《 從零開始入門 K8s | 應用存儲和持久化數據卷:存儲快照與拓撲調度 》,這里會有一個更加詳細的介紹。 二、Flexvolume 介紹及使用 Flexvolume 是 Volume Plugins 的一個擴展,主要實現 Attach/Detach/Mount/Unmount 這些接口。我們知道這些功能本是由 Volume Plugins 實現的,但是對于某些存儲類型,我們需要將其擴展到 Volume Plugins 以外,所以我們需要把接口的具體實現放到外面。 在下圖中我們可以看到,Volume Plugins 其實包含了一部分 Flexvolume 的實現代碼,但這部分代碼其實只有一個 “Proxy”的功能。 比如當 AD Controller 調用插件的一個 Attach 時,它首先會調用 Volume Plugins 中 Flexvolume 的 Attach 接口,但這個接口只是把調用轉到相應的 Flexvolume 的Out-Of-Tree實現上。 Flexvolume是可被 Kubelet 驅動的可執行文件,每一次調用相當于執行一次 shell 的 ls 這樣的腳本,都是可執行文件的命令行調用,因此它不是一個常駐內存的守護進程。 Flexvolume 的 Stdout 作為 Kubelet 調用的返回結果,這個結果需要是 JSON 格式。 Flexvolume默認的存放地址為 "/usr/libexec/kubernetes/kubelet-plugins/volume/exec/alicloud~disk/disk"。 下面是一個命令格式和調用的實例。 Flexvolume 的接口介紹 Flexvolum 包含以下接口: init : 主要做一些初始化的操作,比如部署插件、更新插件的時候做 init 操作,返回的時候會返回剛才我們所說的 DriveCapabilities 類型的數據結構,用來說明我們的 Flexvolume 插件有哪些功能; GetVolumeName : 返回插件名; Attach : 掛載功能的實現。根據 --enable-controller-attach-detach 標簽來決定是由 AD Controller 還是 Kubelet 來發起掛載操作; WaitforAttach : Attach 經常是異步操作,因此需要等待掛載完成,才能需要進行下面的操作; MountDevice:它是 mount 的一部分。這里我們將 mount 分為 MountDevice 和 SetUp 兩部分,MountDevice 主要做一些簡單的預處理工作,比如將設備格式化、掛載到 GlobalMount 目錄中等; GetPath :獲取每個 Pod 對應的本地掛載目錄; Setup :使用 Bind 方式將 GlobalPath 中的設備掛載到 Pod 的本地目錄; TearDown 、 UnmountDevice 、 Detach 實現的是上面一些借口的逆過程; ExpandVolumeDevice :擴容存儲卷,由 Expand Controller 發起調用; NodeExpand : 擴容文件系統,由 Kubelet 發起調用。 上面這些接口不一定需要全部實現,如果某個接口沒有實現的話,可以將返回結果定義成: { "status" : "Not supported" , "message" : "error message" } 告訴調用者沒有實現這個接口。此外,Volume Plugins 中的 Flexvolume 接口除了作為一個 Proxy 外,它也提供了一些默認實現,比如 Mount 操作。所以如果你的 Flexvolume 中沒有定義該接口,該默認實現就會被調用。 在定義 PV 時可以通過 secretRef 字段來定義一些 secret 的功能。比如掛載時所需的用戶名和密碼,就可以通過 secretRef 傳入。 Flexvolume 的掛載分析 從掛載流程和卸載流程兩個方向來分析 Flexvolume 的掛載過程。 我們首先看 Attach 操作,它調用了一個遠端的 API 把我們的 Storage 掛載到目標節點中的某個設備上去。然后通過 MountDevice 將本地設備掛載到 GlobalPath 中,同時也會做一些格式化這樣的操作。Mount 操作(SetUp),它會把 GlobalPath 掛載 PodPath 中,PodPath 就是 Pod 啟動時所映射的一個目錄。 下圖給出了一個例子,比如我們一個云盤,其 Volume ID 為 d-8vb4fflsonz21h31cmss,在執行完 Attach 和 WaitForAttach 操作之后,就會將其掛載到目標節點上的 /dec/vdc 設備中。執行 MountDevice 之后,就會把上述設備格式化,掛載到一個本地的 GlobalPath 中。而執行完 Mount 之后,就會將 GlobalPath 映射到 Pod 相關的一個子目錄中。最后執行 Bind 操作,將我們的本地目錄映射到容器中。這樣完成一次掛載過程。 卸載流程就是一個逆過程。上述過程描述的是一個塊設備的掛載過程,對于文件存儲類型,就無需 Attach、MountDevice操作,只需要 Mount 操作,因此文件系統的 Flexvolume 實現較為簡單,只需要 Mount 和 Unmount 過程即可。 Flexvolume 的代碼示例 其中主要實現的是 init()、doMount()、doUnmount() 方法。在執行該腳本的時候對傳入的參數進行判斷來決定執行哪一個命令。 在 Github 上還有很多 Flexvolume 的示例,大家可以自行參考查閱。阿里云提供了一個 Flexvolume 的實現 ,有興趣的可以參考一下。 Flexvolume 的使用 下圖給出了一個 Flexvolume 類型的 PV 模板。它和其它模板實際上沒有什么區別,只不過類型被定義為 flexVolume 類型。flexVolume 中定義了 driver、fsType、options。 driver 定義的是我們實現的某種驅動,比如圖中的是 aliclound/disk,也可以是 aliclound/nas 等; fsType 定義的是文件系統類型,比如 "ext4"; options 包含了一些具體的參數,比如定義云盤的 id 等。 我們也可以像其它類型一樣,通過 selector 中的 matchLabels 定義一些篩選條件。同樣也可以定義一些相應的調度信息,比如定義 zone 為 cn-shenzhen-a。 下面是一個具體的運行結果。在 Pod 內部我們掛載了一個云盤,其所在本地設備為 /dev/vdb。通過 mount | grep disk 我們可以看到相應的掛載目錄,首先它會將 /dev/vdb 掛載到 GlobalPath 中;其次會將 GlobalPath 通過 mount 命令掛載到一個 Pod 所定義的本地子目錄中去;最后會把該本地子目錄映射到 /data 上。 三、CSI 介紹及使用 和 Flexvolume 類似,CSI 也是為第三方存儲提供數據卷實現的抽象接口。 有了 Flexvolume,為何還要 CSI 呢? Flexvolume 只是給 kubernetes 這一個編排系統來使用的,而 CSI 可以滿足不同編排系統的需求,比如 Mesos,Swarm。 其次 CSI 是容器化部署,可以減少環境依賴,增強安全性,豐富插件的功能。我們知道,Flexvolume 是在 host 空間一個二進制文件,執行 Flexvolum 時相當于執行了本地的一個 shell 命令,這使得我們在安裝 Flexvolume 的時候需要同時安裝某些依賴,而這些依賴可能會對客戶的應用產生一些影響。因此在安全性上、環境依賴上,就會有一個不好的影響。 同時對于豐富插件功能這一點,我們在 Kubernetes 生態中實現 operator 的時候,經常會通過 RBAC 這種方式去調用 Kubernetes 的一些接口來實現某些功能,而這些功能必須要在容器內部實現,因此像 Flexvolume 這種環境,由于它是 host 空間中的二進制程序,就沒法實現這些功能。而 CSI 這種容器化部署的方式,可以通過 RBAC 的方式來實現這些功能。 CSI 主要包含兩個部分:CSI Controller Server 與 CSI Node Server。 Controller Server 是控制端的功能,主要實現創建、刪除、掛載、卸載等功能; Node Server 主要實現的是節點上的 mount、Unmount 功能。 下圖給出了 CSI 接口通信的描述。CSI Controller Server 和 External CSI SideCar 是通過 Unix Socket 來進行通信的,CSI Node Server 和 Kubelet 也是通過 Unix Socket 來通信,之后我們會講一下 External CSI SiderCar 的具體概念。 下圖給出了 CSI 的接口。主要分為三類:通用管控接口、節點管控接口、中心管控接口。 通用管控接口主要返回 CSI 的一些通用信息,像插件的名字、Driver 的身份信息、插件所提供的能力等; 節點管控接口的 NodeStageVolume 和 NodeUnstageVolume 就相當于 Flexvolume 中的 MountDevice 和 UnmountDevice。NodePublishVolume 和 NodeUnpublishVolume 就相當于 SetUp 和 TearDown 接口; 中心管控接口的 CreateVolume 和 DeleteVolume 就是我們的 Provision 和 Delete 存儲卷的一個接口,ControllerPublishVolume 和 ControllerUnPublishVolume 則分別是 Attach 和 Detach 的接口。 CSI 的系統結構 CSI 是通過 CRD 的形式實現的,所以 CSI 引入了這么幾個對象類型:VolumeAttachment、CSINode、CSIDriver 以及 CSI Controller Server 與 CSI Node Server 的一個實現。 在 CSI Controller Server 中,有傳統的類似 Kubernetes 中的 AD Controller 和 Volume Plugins,VolumeAttachment 對象就是由它們所創建的。 此外,還包含多個 External Plugin組件,每個組件和 CSI Plugin 組合的時候會完成某種功能。比如: External Provisioner 和 Controller Server 組合的時候就會完成數據卷的創建與刪除功能; External Attacher 和 Controller Server 組合起來可以執行數據卷的掛載和操作; External Resizer 和 Controller Server 組合起來可以執行數據卷的擴容操作; External Snapshotter 和 Controller Server 組合則可以完成快照的創建和刪除。 CSI Node Server 中主要包含 Kubelet 組件,包括 VolumeManager 和 VolumePlugin,它們會去調用 CSI Plugin 去做 mount 和 unmount 操作;另外一個組件 Driver Registrar 主要實現的是 CSI Plugin 注冊的功能。 以上就是 CSI 的整個拓撲結構,接下來我們將分別介紹不同的對象和組件。 CSI 對象 我們將介紹 3 種對象:VolumeAttachment,CSIDriver,CSINode。 VolumeAttachment 描述一個 Volume 卷在一個 Pod 使用中掛載、卸載的相關信息。例如,對一個卷在某個節點上的掛載,我們通過 VolumeAttachment 對該掛載進行跟蹤。AD Controller 創建一個 VolumeAttachment,而 External-attacher 則通過觀察該 VolumeAttachment,根據其狀態來進行掛載和卸載操作。 下圖就是一個 VolumeAttachment 的例子,其類別 (kind) 為 VolumeAttachment,spec 中指定了 attacher 為 ossplugin.csi.alibabacloud.com,即指定掛載是由誰操作的;指定了 nodeName 為 cn-zhangjiakou.192.168.1.53,即該掛載是發生在哪個節點上的;指定了 source 為 persistentVolumeName 為 oss-csi-pv,即指定了哪一個數據卷進行掛載和卸載。 status 中 attached 指示了掛載的狀態,如果是 False, External-attacher 就會執行一個掛載操作。 第二個對象是 CSIDriver,它描述了集群中所部署的 CSI Plugin 列表,需要管理員根據插件類型進行創建。 例如下圖中創建了一些 CSI Driver,通過 kuberctl get csidriver 我們可以看到集群里面創建的 3 種類型的 CSI Driver:一個是云盤;一個是 NAS;一個是 OSS。 在 CSI Driver 中,我們定義了它的名字,在 spec 中還定義了 attachRequired 和 podInfoOnMount 兩個標簽。 attachRequired 定義一個 Plugin 是否支持 Attach 功能,主要是為了對塊存儲和文件存儲做區分。比如文件存儲不需要 Attach 操作,因此我們將該標簽定義為 False; podInfoOnMount 則是定義 Kubernetes 在調用 Mount 接口時是否帶上 Pod 信息。 第三個對象是 CSINode,它是集群中的節點信息,由 node-driver-registrar 在啟動時創建。它的作用是每一個新的 CSI Plugin 注冊后,都會在 CSINode 列表里添加一個 CSINode 信息。 例如下圖,定義了 CSINode 列表,每一個 CSINode 都有一個具體的信息(左側的 YAML)。以 一 cn-zhangjiakou.192.168.1.49 為例,它包含一個云盤的 CSI Driver,還包含一個 NAS 的 CSI Driver。每個 Driver 都有自己的 nodeID 和它的拓撲信息 topologyKeys。如果沒有拓撲信息,可以將 topologyKeys 設置為 "null"。也就是說,假如有一個有 10 個節點的集群,我們可以只定義一部分節點擁有 CSINode。 CSI 組件之 Node-Driver-Registrar Node-Driver-Registrar 主要實現了 CSI Plugin 注冊的一個機制。我們來看一下下圖中的流程圖。 第 1 步 ,在啟動的時候有一個約定,比如說在 /var/lib/kuberlet/plugins_registry 這個目錄每新加一個文件,就相當于每新加了一個 Plugin; 啟動 Node-Driver-Registrar,它首先會向 CSI-Plugin 發起一個接口調用 GetPluginInfo,這個接口會返回 CSI 所監聽的地址以及 CSI-Plugin 的一個 Driver name; 第 2 步 ,Node-Driver-Registrar 會監聽 GetInfo 和 NotifyRegistrationStatus 兩個接口; 第 3 步 ,會在 /var/lib/kuberlet/plugins_registry 這個目錄下啟動一個 Socket,生成一個 Socket 文件 ,例如:"diskplugin.csi.alibabacloud.com-reg.sock",此時 Kubelet 通過 Watcher 發現這個 Socket 后,它會通過該 Socket 向 Node-Driver-Registrar 的 GetInfo 接口進行調用。GetInfo 會把剛才我們所獲得的的 CSI-Plugin 的信息返回給 Kubelet,該信息包含了 CSI-Plugin 的監聽地址以及它的 Driver name; 第 4 步 ,Kubelet 通過得到的監聽地址對 CSI-Plugin 的 NodeGetInfo 接口進行調用; 第 5 步 ,調用成功之后,Kubelet 會去更新一些狀態信息,比如節點的 Annotations、Labels、status.allocatable 等信息,同時會創建一個 CSINode 對象; 第 6 步 ,通過對 Node-Driver-Registrar 的 NotifyRegistrationStatus 接口的調用告訴它我們已經把 CSI-Plugin 注冊成功了。 通過以上 6 步就實現了 CSI Plugin 注冊機制。 CSI 組件之 External-Attacher External-Attacher 主要是通過 CSI Plugin 的接口來實現數據卷的掛載與卸載功能。它通過觀察 VolumeAttachment 對象來實現狀態的判斷。VolumeAttachment 對象則是通過 AD Controller 來調用 Volume Plugin 中的 CSI Attacher 來創建的。CSI Attacher 是一個 In-Tree 類,也就是說這部分是 Kubernetes 完成的。 當 VolumeAttachment 的狀態是 False 時,External-Attacher 就去調用底層的一個 Attach 功能;若期望值為 False,就通過底層的 ControllerPublishVolume 接口實現 Detach 功能。同時,External-Attacher 也會同步一些 PV 的信息在里面。 CSI 部署 我們現在來看一下塊存儲的部署情況。 之前提到 CSI 的 Controller 分為兩部分,一個是 Controller Server Pod,一個是 Node Server Pod。 我們只需要部署一個 Controller Server,如果是多備份的,可以部署兩個。Controller Server 主要是通過多個外部插件來實現的,比如說一個 Pod 中可以定義多個 External 的 Container 和一個包含 CSI Controller Server 的 Container,這時候不同的 External 組件會和 Controller Server 組成不同的功能。 而 Node Server Pod 是個 DaemonSet,它會在每個節點上進行注冊。Kubelet 會直接通過 Socket 的方式直接和 CSI Node Server 進行通信、調用 Attach/Detach/Mount/Unmount 等。 Driver Registrar 只是做一個注冊的功能,會在每個節點上進行部署。 文件存儲和塊存儲的部署情況是類似的。只不過它會把 Attacher 去掉,也沒有 VolumeAttachment 對象。 CSI 使用示例 和 Flexvolume 一樣,我們看一下它的定義模板。 可以看到,它和其它的定義并沒什么區別。主要的區別在于類型為 CSI,里面會定義 driver,volumeHandle,volumeAttribute,nodeAffinity 等。 driver 就是定義是由哪一個插件來去實現掛載; volumeHandle 主要是指示 PV 的唯一標簽; volumeAttribute 用于附加參數,比如 PV 如果定義的是 OSS,那么就可以在 volumeAttribute 定義 bucket、訪問的地址等信息在里面; nodeAffinity 則可以定義一些調度信息。與 Flexvolume 類似,還可以通過 selector 和 Label 定義一些綁定條件。 中間的圖給出了一個動態調度的例子,它和其它類型的動態調度是一樣的。只不過在定義 provisioner 的時候指定了一個 CSI 的 provisioner。 下面給出了一個具體的掛載例子。 Pod 啟動之后,我們可以看到 Pod 已經把一個 /dev/vdb 掛載到 /data 上了。同理,它有一個 GlobalPath 和一個 PodPath 的集群在里面。我們可以把一個 /dev/vdb 掛載到一個 GlobalPath 里面,它就是一個 CSI 的一個 PV 在本節點上唯一確定的目錄。一個 PodPath 就是一個 Pod 所確定的一個本地節點的目錄,它會把 Pod 所對應的目錄映射到我們的容器中去。 CSI 的其它功能 除了掛載、卸載之外,CSI 化提供了一些附加的功能。例如,在定義模板的時候往往需要一些用戶名和密碼信息,此時我們就可通過 Secret 來進行定義。之前我們所講的 Flexvolume 也支持這個功能,只不過 CSI 可以根據不同的階段定義不同的 Secret 類型,比如掛載階段的 Secret、Mount 階段的 Secret、Provision 階段的 Secret。 Topology 是一個拓撲感知的功能。當我們定義一個數據卷的時候,集群中并不是所有節點都能滿足該數據卷的需求,比如我們需要掛載不同的 zone 的信息在里面,這就是一個拓撲感知的功能。這部分在第 10 講已有詳細的介紹,大家可以進行參考。 Block Volume 就是 volumeMode 的一個定義,它可以定義成 Block 類型,也可以定義成文件系統類型,CSI 支持 Block 類型的 Volume,就是說掛載到 Pod 內部時,它是一個塊設備,而不是一個目錄。 Skip Attach 和 PodInfo On Mount 是剛才我們所講過的 CSI Driver 中的兩個功能。 CSI 的近期 Features CSI 還是一個比較新的實現方式。近期也有了很多更新,比如 ExpandCSIVolumes 可以實現文件系統擴容的功能;VolumeSnapshotDataSource 可以實現數據卷的快照功能;VolumePVCDataSource 實現的是可以定義 PVC 的數據源;我們以前在使用 CSI 的時候只能通過 PVC、PV 的方式定義,而不能直接在 Pod 里面定義 Volume,CSIInlineVolume 則可以讓我們可以直接在 Volume 中定義一些 CSI 的驅動。 阿里云在 GitHub 上開源了 CSI 的實現 ,大家有興趣的可以看一下,做一些參考。 四、本文總結 本文主要介紹了 Kubernetes 集群中存儲卷相關的知識,主要有以下三點內容: 第一部分講述了 Kubernetes 存儲架構,主要包括存儲卷概念、掛載流程、系統組件等相關知識; 第二部分講述了 Flexvolume 插件的實現原理、部署架構、使用示例等; 第三部分講述了 CSI 插件的實現原理、資源對象、功能組件、使用示例等; 希望上述知識點能讓各位同學有所收獲,特別是在處理存儲卷相關的設計、開發、故障處理等方面有所幫助。 查看更多:https://yq.aliyun.com/articles/743613?utm_content=g_1000103097 上云就看云棲號:更多云資訊,上云案例,最佳實踐,產品入門,訪問:https://yqh.aliyun.com/
來源:OSCHINA
發布時間:2020-02-10 15:36:00
# default_exp digdata # 上面一行用于nbdev中聲明本模塊的名稱。必須是notebook的第一個Cell的第一行。 digdata 描述:抗擊新冠病毒(3)-探索在線數據資源 功能:本頁面用于交互式地探索數據。通過訪問網絡獲取數據,分析和理解網頁數據結構,轉換為列表格式,用于后續的分析和繪圖輸出。 模塊:使用JupyterLab、Python、nbdev等完成。用到的Python模塊包括: requests,訪問web服務網站。 re,正則表達式解析。 json,JSON格式解析。 BeautifulSoup,HTML格式解析。 pprint,格式化輸出。 pandas,數據表格分析。 源碼-https://github.com/openthings/anti2020ncov 參考: JupyterLab-數據實驗室 文學式編程-nbdev入門教程 抗擊新冠病毒(1)-開源軟件與數據項目 抗擊新冠病毒(2)-基于Jupyter+nbdev的數據分析 #hide from nbdev.showdoc import * #export from bs4 import BeautifulSoup from parser import * #regex_parser import re import json import time import logging import datetime import requests import pprint 獲取網頁數據 #export #url = "https://3g.dxy.cn/newh5/view/pneumonia" url = "https://ncov.dxy.cn/ncovh5/view/pneumonia?from=singlemessage&isappinstalled=0" headers = { 'user-agent': 'Mozilla/5.0 (Macintosh; Intel Mac OS X 10_15_2) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/79.0.3945.117 Safari/537.36' } #export session = requests.session() session.headers.update(headers) r = session.get(url) #export #pprint.pprint(r.text) #export #soup = BeautifulSoup(r.content, 'lxml') #soup 提取特定的數據域 # export # 分為總體情況、分省情況、省內各市情況、新聞四大類。 overall_information = re.search(r'\{("id".*?)\}', str(soup.find('script', attrs={'id': 'getStatisticsService'}))) province_information = re.search(r'\[(.*?)\]', str(soup.find('script', attrs={'id': 'getListByCountryTypeService1'}))) area_information = re.search(r'\[(.*)\]', str(soup.find('script', attrs={'id': 'getAreaStat'}))) news_information = re.search(r'\[(.*?)\]', str(soup.find('script', attrs={'id': 'getTimelineService'}))) 1、總體情況 #pprint.pprint(overall_information.string) #overall_information.group(0) #jsall = json.loads(overall_information.group(0)) def overall_parser(overall_information): overall_information = json.loads(overall_information.group(0)) overall_information.pop('id') overall_information.pop('createTime') overall_information.pop('modifyTime') overall_information.pop('imgUrl') overall_information.pop('deleted') overall_information['countRemark'] = overall_information['countRemark'].replace(' 疑似', ',疑似').replace(' 治愈', ',治愈').replace(' 死亡', ',死亡').replace(' ', '') #overall_information = json.loads(overall_information.group(0)) 2、分省情況 #provinces = json.loads(province_information.group(0)) #provinces def province_parser(province_information): provinces = json.loads(province_information.group(0)) crawl_timestamp = "" for province in provinces: province.pop('id') province['comment'] = province['comment'].replace(' ', '') province['crawlTime'] = crawl_timestamp #province['country'] = country_type.get(province['countryType']) province['tags'] = province['tags'].replace(' ', '') province = regex_parser(content=province, key='tags') #for province in provinces: # print(province['id'],'\t',province['provinceShortName'],'\t',province['tags']) 3、省內各市縣情況 #area_information.string area = json.loads(area_information.group(0)) print("省份\t確診\t疑似\t治愈\t死亡") for a in area: print(a['provinceName'],'\t',a['confirmedCount'],'\t',a['suspectedCount'],'\t',a['curedCount'],'\t',a['deadCount']) 按省提取城市情況 cities = area[0]['cities'] #cities print("城市\t確診\t疑似\t治愈\t死亡") for p in area: cities = p['cities'] print("===================================") print(p['provinceName'],'\t',p['confirmedCount'],'\t',p['suspectedCount'],'\t',p['curedCount'],'\t',p['deadCount']) print("-----------------------------------") for c in cities: print(c['cityName'],'\t',c['confirmedCount'],'\t',c['suspectedCount'],'\t',c['curedCount'],'\t',c['deadCount']) 4、新聞列表 news = json.loads(news_information.group(0)) #news for n in news: print(n['id'],'\t',n['infoSource'].strip(),'\t',n['title'].strip())#,n['summary'].strip()) nbdev 適用工具 # 將notebook轉化為python的*.py代碼,保存到項目名稱的子目錄中。 from nbdev.export import * notebook2script() Converted 00_digdata.ipynb. Converted 01_getdata.ipynb. Converted 10_charts.ipynb. Converted 10_china.ipynb. Converted index.ipynb. help(notebook2script) Help on function notebook2script in module nbdev.export: notebook2script(fname=None, silent=False, to_dict=False) Convert notebooks matching `fname` to modules
來源:OSCHINA
發布時間:2020-02-10 12:39:00
直播主題: 智能測溫及社區防疫監控解決方案 直播時間: 2月10日 10:00-10:30 講師: 岑參,阿里云智能IoT解決方案架構師七年醫療行業數字化咨詢經驗,曾任職于國內頭部IT公司,現負責IOT醫療行業 適合觀眾: 政務大廳管理人員、車站交通樞紐管理人員、醫院管理人員、一般居民用戶 內容簡介: 熱成像人體測溫方案適用于人群聚集區域檢測疫情防控,利用紅外非接觸式體溫檢測,可實現快速體溫篩查,遠距離、大面積檢測,自動預警;智能體溫遠程監控方案可部署于醫院新開設的隔離病區或居民家庭,能很大程度減少醫患間因測量基礎生命體征而發生的接觸,提高效率降低被感染風險。 直播主題: 金融行業釘釘組織健康守護方案 直播時間: 2月10日 14:00-14:50 講師: 七玉,釘釘金融行業運營專家十年企業數字化咨詢、運營經驗,曾任職于國際、國內頭部IT公司,現負責釘釘金融行業 適合觀眾: 金融行業(銀行、保險、證券等)IT負責人、人事經理、辦公室主任等 內容簡介: 應對疫情期間,各組織啟用遠程辦公的需求,阿里巴巴釘釘緊急推出組織健康方案,免費推出員工健康打卡服務以及異地辦公工具,幫助金融機構守護組織成員健康的同時實現數智化遠程協同。本次課程分為兩期,第一期:精準通知、智能守護。包括釘釘健康打卡、緊急通知、視頻會議三個核心場景。 在線看大會,就來云棲號! 每天都有行業專家分享!請訪問: https://yqh.aliyun.com/zhibo
來源:OSCHINA
發布時間:2020-02-10 10:33:00
一、安裝準備 本次安裝的版本是截止2020.1.30最新的版本0.17.0 軟件要求 需要**Java 8(8u92 +)**以上的版本,否則會有問題 Linux,Mac OS X或其他類似Unix的操作系統(不支持Windows) 硬件要求 Druid包括一組參考配置和用于單機部署的啟動腳本: nano-quickstart micro-quickstart small medium large xlarge 單服務器參考配置 Nano-Quickstart:1個CPU,4GB RAM 啟動命令: bin/start-nano-quickstart 配置目錄: conf/druid/single-server/nano-quickstart 微型快速入門:4個CPU,16GB RAM 啟動命令: bin/start-micro-quickstart 配置目錄: conf/druid/single-server/micro-quickstart 小型:8 CPU,64GB RAM(?i3.2xlarge) 啟動命令: bin/start-small 配置目錄: conf/druid/single-server/small 中:16 CPU,128GB RAM(?i3.4xlarge) 啟動命令: bin/start-medium 配置目錄: conf/druid/single-server/medium 大型:32 CPU,256GB RAM(?i3.8xlarge) 啟動命令: bin/start-large 配置目錄: conf/druid/single-server/large 大型X:64 CPU,512GB RAM(?i3.16xlarge) 啟動命令: bin/start-xlarge 配置目錄: conf/druid/single-server/xlarge 我們這里做測試使用選擇最低配置即可 nano-quickstart 二、下載安裝包 訪問官網: http://druid.io/現在也會跳轉https://druid.apache.org/ 或者直接訪問 https://druid.apache.org/ 點擊download進入下載頁面: 選擇最新版本: apache-druid-0.17.0-bin.tar.gz 進行下載 200多M 也可以選擇下載源碼包 用maven進行編譯 三、安裝 上傳安裝包 在終端中運行以下命令來安裝Druid: tar -xzf apache-druid-0.17.0-bin.tar.gz cd apache-druid-0.17.0 安裝包里有這幾個目錄: LICENSE 和 NOTICE 文件 bin/* -腳本 conf/* -單服務器和集群設置的示例配置 extensions/* -擴展 hadoop-dependencies/* -Druid Hadoop依賴 lib/* -Druid庫 quickstart/* -快速入門教程的配置文件,樣本數據和其他文件 配置文件 #進入我們要啟動的配置文件位置: cd conf/druid/single-server/nano-quickstart/ _common 公共配置 是druid一些基本的配置,比如元數據庫地址 各種路徑等等 其他的是各個節點的配置 比較類似,比如broker cd broker/ jvm配置 main配置 runtime運行時相關的配置 回到主目錄 啟動的conf在 cd conf/supervise/single-server 里面是不同配置啟動不同的腳本 四、啟動 回到主目錄 ./bin/start-nano-quickstart 啟動成功: 訪問 localhost:8888 看到管理頁面 如果要修改端口,需要修改配置的端口和主目錄下的 vi bin/verify-default-ports 五、加載數據 Druid提供了一個示例數據文件,其中包含2015年9月12日發生的Wiki的示例數據。 此樣本數據位于 quickstart/tutorial/wikiticker-2015-09-12-sampled.json.gz 示例數據大概是這樣: { "timestamp":"2015-09-12T20:03:45.018Z", "channel":"#en.wikipedia", "namespace":"Main", "page":"Spider-Man's powers and equipment", "user":"foobar", "comment":"/* Artificial web-shooters */", "cityName":"New York", "regionName":"New York", "regionIsoCode":"NY", "countryName":"United States", "countryIsoCode":"US", "isAnonymous":false, "isNew":false, "isMinor":false, "isRobot":false, "isUnpatrolled":false, "added":99, "delta":99, "deleted":0, } Druid加載數據分為以下幾種: 加載文件 從kafka中加載數據 從hadoop中加載數據 自定義加載方式 我們這樣演示一下加載示例文件數據 1、進入localhost:8888 點擊load data 2、選擇local disk 3、選擇Connect data 4、預覽數據 Base directory輸入quickstart/tutorial/ File filter輸入 wikiticker-2015-09-12-sampled.json.gz 然后點擊apply預覽 就可以看見數據了 點擊Next:parse data解析數據 5、解析數據 可以看到json數據已經被解析了 繼續解析時間 6、解析時間 解析時間成功 之后兩步是transform和filter 這里不做演示了 直接next 7、確認Schema 這一步會讓我們確認Schema 可以做一些修改 由于數據量較小 我們直接關掉Rollup 直接下一步 8、設置分段 這里可以設置數據分段 我們選擇hour next 9、確認發布 10、發布成功 開始解析數據 等待任務成功 11、查看數據 選擇datasources 可以看到我們加載的數據 可以看到數據源名稱 Fully是完全可用 還有大小等各種信息 12、查詢數據 點擊query按鈕 我們可以寫sql查詢數據了 還可以將數據下載 Druid相關博文 什么是Druid 靜下心來,努力的提升自己,永遠都沒有錯。更多實時計算相關博文,歡迎關注實時流式計算
來源:OSCHINA
發布時間:2020-02-10 09:06:00
Apache Flink社區宣布Flink 1.10.0正式發布! 本次Release版本修復1.2K個問題,對Flink作業的整體性能和穩定性做了重大改進,同時增加了對K8S,Python的支持。 這個版本標志著與Blink集成的完成,并且強化了流式SQL與Hive的集成,本文將詳細介紹新功能和主要的改進。 一、內存管理優化 原有TaskExecutor 有一些缺點: 流處理和批處理用了不同的配置模型; 流處理的堆外配置RocksDB復雜,需要用戶配置; 為了使內存管理更明確直觀,Flink 1.10對TaskExecutor內存模型和配置做了重大改進,這個更改使FLink更適合于各種部署環境:K8S,Yarn,Mesos。 這種更改統一了入口點,使得下游框架比如zeppelin的編程更加容易。 二、集成Kubernetes 這對于想要在容器中使用Flink的用戶是一個非常好的消息。 在Flink1.10中推出了 Active Kubernetes集成 Flink的ResourceManager( K8sResMngr )與Kubernetes進行本地通信以按需分配新的Pod,類似于Flink的Yarn和Mesos集成。用戶還可以利用命名空間為聚合資源消耗有限的多租戶環境啟動Flink集群。事先配置具有足夠權限的RBAC角色和服務帳戶。 用戶可以簡單地參考Kubernetes配置選項,然后使用以下命令在CLI中將作業提交到Kubernetes上的現有Flink會話: ./bin/flink run -d -e kubernetes-session -Dkubernetes.cluster-id= examples/streaming/WindowJoin.jar 三、集成Hive Flink 1.10通過開發將Hive集成到Flink,可用于生產環境。 并且支持大部分Hive版本,Flink支持Hive版本列表: 1.0 1.0.0 1.0.1 1.1 1.1.0 1.1.1 1.2 1.2.0 1.2.1 1.2.2 2.0 2.0.0 2.0.1 2.1 2.1.0 2.1.1 2.2 2.2.0 2.3 2.3.0 2.3.1 2.3.2 2.3.3 2.3.4 2.3.5 2.3.6 3.1 3.1.0 3.1.1 3.1.2 需要引入依賴 org.apache.flink flink-connector-hive_2.11 1.10.0 provided org.apache.flink flink-table-api-java-bridge_2.11 1.10.0 provided org.apache.hive hive-exec ${hive.version} provided 連接Hive代碼 val settings = EnvironmentSettings.newInstance().useBlinkPlanner().inBatchMode().build() val tableEnv = TableEnvironment.create(settings) val name = "myhive" val defaultDatabase = "mydatabase" val hiveConfDir = "/opt/hive-conf" // a local path val version = "2.3.4" val hive = new HiveCatalog(name, defaultDatabase, hiveConfDir, version) tableEnv.registerCatalog("myhive", hive) // set the HiveCatalog as the current catalog of the session tableEnv.useCatalog("myhive") 四、PyFlink:支持UDF 從Flink 1.10開始,PyFlink開始支持UDF函數。 用戶還可以 pip 使用以下方法輕松安裝PyFlink : pip install apache-flink 五、其他重要變化 Flink現在可以編譯并在Java 11上運行。 一個新的Elasticsearch sink,完全支持Elasticsearch 7.x版本。 Kafka 0.8 和 0.9 版本已經被廢,不再支持。 刪除了非認證網絡流量配置選項taskmanager.network.credit.model。 刪除了舊版Web UI。 六、貢獻者名單 最后我們看一下貢獻者的名單,有很多國內大神的身影 Achyuth Samudrala, Aitozi, Alberto Romero, Alec.Ch, Aleksey Pak, Alexander Fedulov, Alice Yan, Aljoscha Krettek, Aloys, Andrey Zagrebin, Arvid Heise, Benchao Li, Benoit Hanotte, Beno?t Paris, Bhagavan Das, Biao Liu, Chesnay Schepler, Congxian Qiu, Cyrille Chépélov, César Soto Valero, David Anderson, David Hrbacek, David Moravek, Dawid Wysakowicz, Dezhi Cai, Dian Fu, Dyana Rose, Eamon Taaffe, Fabian Hueske, Fawad Halim, Fokko Driesprong, Frey Gao, Gabor Gevay, Gao Yun, Gary Yao, GatsbyNewton, GitHub, Grebennikov Roman, GuoWei Ma, Gyula Fora, Haibo Sun, Hao Dang, Henvealf, Hongtao Zhang, HuangXingBo, Hwanju Kim, Igal Shilman, Jacob Sevart, Jark Wu, Jeff Martin, Jeff Yang, Jeff Zhang, Jiangjie (Becket) Qin, Jiayi, Jiayi Liao, Jincheng Sun, Jing Zhang, Jingsong Lee, JingsongLi, Joao Boto, John Lonergan, Kaibo Zhou, Konstantin Knauf, Kostas Kloudas, Kurt Young, Leonard Xu, Ling Wang, Lining Jing, Liupengcheng, LouisXu, Mads Chr. Olesen, Marco Zu?hlke, Marcos Klein, Matyas Orhidi, Maximilian Bode, Maximilian Michels, Nick Pavlakis, Nico Kruber, Nicolas Deslandes, Pablo Valtuille, Paul Lam, Paul Lin, PengFei Li, Piotr Nowojski, Piotr Przybylski, Piyush Narang, Ricco Chen, Richard Deurwaarder, Robert Metzger, Roman, Roman Grebennikov, Roman Khachatryan, Rong Rong, Rui Li, Ryan Tao, Scott Kidder, Seth Wiesman, Shannon Carey, Shaobin.Ou, Shuo Cheng, Stefan Richter, Stephan Ewen, Steve OU, Steven Wu, Terry Wang, Thesharing, Thomas Weise, Till Rohrmann, Timo Walther, Tony Wei, TsReaper, Tzu-Li (Gordon) Tai, Victor Wong, WangHengwei, Wei Zhong, WeiZhong94, Wind (Jiayi Liao), Xintong Song, XuQianJin-Stars, Xuefu Zhang, Xupingyong, Yadong Xie, Yang Wang, Yangze Guo, Yikun Jiang, Ying, YngwieWang, Yu Li, Yuan Mei, Yun Gao, Yun Tang, Zhanchun Zhang, Zhenghua Gao, Zhijiang, Zhu Zhu, a-suiniaev, azagrebin, beyond1920, biao.liub, blueszheng, bowen.li, caoyingjie, catkint, chendonglin, chenqi, chunpinghe, cyq89051127, danrtsey.wy, dengziming, dianfu, eskabetxe, fanrui, forideal, gentlewang, godfrey he, godfreyhe, haodang, hehuiyuan, hequn8128, hpeter, huangxingbo, huzheng, ifndef-SleePy, jiemotongxue, joe, jrthe42, kevin.cyj, klion26, lamber-ken, libenchao, liketic, lincoln-lil, lining, liuyongvs, liyafan82, lz, mans2singh, mojo, openinx, ouyangwulin, shining-huang, shuai-xu, shuo.cs, stayhsfLee, sunhaibotb, sunjincheng121, tianboxiu, tianchen, tianchen92, tison, tszkitlo40, unknown, vinoyang, vthinkxie, wangpeibin, wangxiaowei, wangxiyuan, wangxlong, wangyang0918, whlwanghailong, xuchao0903, xuyang1706, yanghua, yangjf2019, yongqiang chai, yuzhao.cyz, zentol, zhangzhanchum, zhengcanbin, zhijiang, zhongyong jin, zhuzhu.zz, zjuwangg, zoudaokoulife, 硯田, 謝磊, 張志豪, 曹建華 Flink系列文章: Flink入門(一)——Apache Flink介紹 Flink入門(二)——Flink架構介紹 Flink入門(三)——環境與部署 Flink入門(四)——編程模型 更多實時計算,Flink,Kafka等相關技術博文,歡迎關注實時流式計算
來源:OSCHINA
發布時間:2020-02-13 09:39:00
2020年到了,祝大家新年快樂! 2020年是一個閏年(Leap Year),閏年是會出故障的。八年前,2012年2月29日,我在Azure的時候我們就出了一個大故障: https://azure.microsoft.com/en-us/blog/summary-of-windows-azure-service-disruption-on-feb-29th-2012/ 常見的錯誤認知 1、 一年總是365天 2、2月總是28天 3、閏年是每四年一次 其實,閏年并不是每四年一次。2000是閏年,但1900年和2100都不是閏年。 哪里容易出閏年相關的Bug 1、在一個日期值上加或減時間的代碼。尤其是加減1年或1個月的代碼 2、各種根據數據庫查詢結果生成的報表和圖標,月度和年度統計可能會少算1天 3、證書/密碼/密鑰/緩存 等的過期時間,可能會比預期的早了一天,或者可能設定了一個非法的過期時間 4、固定長度的數組。例如,一個長度為365的數組遇到閏年可能就不夠了,可能會數組越界。 5、UI組件,例如日歷、日期選擇組件,以及客戶端輸入校驗相關的代碼。 閏年的哪些日子要特別注意 2019年12月31日:這是閏年前一年的最后一天。2019年的最后一天加365天,并不是2020年的最后一天,而會是2020年的倒數第二天(即2020年12月30日)。 2020年1月1日:閏年的第一天。閏年的第一天加365天,并不是下一年的1月1日,而是今年的12月31日。 2020年1月31日:這一天加28天,并不是下個月(2月)的最后一天。 2020年2月1日:這一天加28天,并不是下個月(3月)的第一天。 2020年2月28日:這是2月29日的前一天。有問題的代碼可能會錯誤的把這天當成2月的最后一天,試圖加1天得到3月1日。但實際上這一天加1天是2月29日。 2020年2月29日:這是閏年多出來的一天。如果代碼以為2月總是只有28天,那代碼可能出現各種問題,例如: 入參校驗會認為一個合法輸入(2020/2/29)是非法的,用 { year+1 , month , day } 的方式來加減1年的話會產生一個非法日期。 2020年3月1日:2月29日后面的那天。代碼如果在3月1日上減28天,會得到2月2日(而不是預期中的2月1日);減365天的話會得到2019年3月2日(而不是預期中的3月1日)。 2020年12月31日:一年的第366天。 代碼如果不能正確處理一年的第366天,可能也會導致問題。例如,2008年12月31日,第三方軟件中的問題導致了所有Microsoft Zune設備無法使用,詳情參考: http://www.theguardian.com/technology/blog/2009/jan/01/zune-firmware-mistake 代碼如果假設1年永遠是365天,聲明了一個固定大小為365的數組,那在一年的第366天可能會發生數組越界。 數組越界如果發生在 C/C++ 語言編寫的代碼里,可能導致內存溢出攻擊漏洞。 查看更多:https://yq.aliyun.com/articles/742802?utm_content=g_1000103478 上云就看云棲號:更多云資訊,上云案例,最佳實踐,產品入門,訪問:https://yqh.aliyun.com/
來源:OSCHINA
發布時間:2020-02-12 15:13:00
2020.01.02 新年伊始,Nacos Star 突破 10000,從此邁上了一個新的里程碑。感謝大家的一路支持、信任和幫助?。?! Nacos 開源 17 個月以來,發布了 22 個版本,成功切入 Dubbo/Spring-Cloud/ 云原生三個核心生態。吸引了 88 位優秀貢獻者,積累了 110 家企業案例,官網累計獲取 20w+ 用戶瀏覽, 2000 UV ,借此機會,我們代表 Nacos 社區一起回顧 Nacos 來時的路,和未來的發展方向。 項目起源 Nacos 在阿里巴巴起源于 2008 年五彩石項目(完成微服務拆分和業務中臺建設),成長于十年雙十一的洪峰考驗,沉淀了簡單易用、穩定可靠、性能卓越的核心競爭力。隨著云計算興起,2018 年我們深刻感受到開源軟件行業的影響,因此決定將 Nacos(阿里內部 Configserver/Diamond/Vipserver 內核) 開源,輸出阿里十年的沉淀,推動微服務行業發展,加速企業數字化轉型! 開源后的工作 開源很重要的是生態,而且開發者往往是先選服務框架,再選注冊中心和配置中心,因此在 1.0 之前 Nacos 首先支持了國內人氣最高的 Dubbo/Spring-Cloud 兩個主流服務框架,又在 1.X 版本之后支持了云原生的服務框架。至此 Nacos 目前已經能夠支持所有主流服務框架,并且為用戶未來平滑遷移云原生服務框架做好了準備! 雖然我們期望通過云原生的方式支持多語言,但是也為 Java/Golang/NodeJs/Cpp/Python 等提供了語言級支持,以便給大家更好的編程體驗! 后續規劃 2020 年,我們將聚焦 Nacos 內核構建,打造一個更穩定、更安全、更高效的微服務引擎! 目前最核心的工作如下: 建立訪問控制體系,提升安全水準 升級連接通道,提升推送效率 解耦Mysql,降低部署運維成本 查看更多:https://yq.aliyun.com/articles/742637?utm_content=g_1000103477 上云就看云棲號:更多云資訊,上云案例,最佳實踐,產品入門,訪問:https://yqh.aliyun.com/
來源:OSCHINA
發布時間:2020-02-12 15:10:00
消息隊列常見面試問題小集合 一、為什么使用消息隊列?消息隊列有什么優點和缺點?Kafka、ActiveMQ、RabbitMQ、RocketMQ 都有什么區別,以及適合哪些場景? 面試官心理分析 其實面試官主要是想看看: 第一 ,你知不知道你們系統里為什么要用消息隊列這個東西? 不少候選人,說自己項目里用了 Redis、MQ,但是其實他并不知道自己為什么要用這個東西。其實說白了,就是為了用而用,或者是別人設計的架構,他從頭到尾都沒思考過。 沒有對自己的架構問過為什么的人,一定是平時沒有思考的人,面試官對這類候選人印象通常很不好。因為面試官擔心你進了團隊之后只會木頭木腦的干呆活兒,不會自己思考。 第二 ,你既然用了消息隊列這個東西,你知不知道用了有什么好處&壞處? 你要是沒考慮過這個,那你盲目弄個 MQ 進系統里,后面出了問題你是不是就自己溜了給公司留坑?你要是沒考慮過引入一個技術可能存在的弊端和風險,面試官把這類候選人招進來了,基本可能就是挖坑型選手。就怕你干 1 年挖一堆坑,自己跳槽了,給公司留下無窮后患。 第三 ,既然你用了 MQ,可能是某一種 MQ,那么你當時做沒做過調研? 你別傻乎乎的自己拍腦袋看個人喜好就瞎用了一個 MQ,比如 Kafka,甚至都從沒調研過業界流行的 MQ 到底有哪幾種。每一個 MQ 的優點和缺點是什么。每一個 MQ 沒有絕對的好壞 ,但是就是看用在哪個場景可以 揚長避短,利用其優勢,規避其劣勢 。 如果是一個不考慮技術選型的候選人招進了團隊,leader 交給他一個任務,去設計個什么系統,他在里面用一些技術,可能都沒考慮過選型,最后選的技術可能并不一定合適,一樣是留坑。 面試題剖析 為什么使用消息隊列 其實就是問問你消息隊列都有哪些使用場景,然后你項目里具體是什么場景,說說你在這個場景里用消息隊列是什么? 面試官問你這個問題, 期望的一個回答 是說,你們公司有個什么 業務場景 ,這個業務場景有個什么技術挑戰,如果不用 MQ 可能會很麻煩,但是你現在用了 MQ 之后帶給了你很多的好處。 先說一下消息隊列常見的使用場景吧,其實場景有很多,但是比較核心的有 3 個: 解耦 、 異步 、 削峰 。 解耦 看這么個場景。A 系統發送數據到 BCD 三個系統,通過接口調用發送。如果 E 系統也要這個數據呢?那如果 C 系統現在不需要了呢?A 系統負責人幾乎崩潰...... 在這個場景中,A 系統跟其它各種亂七八糟的系統嚴重耦合,A 系統產生一條比較關鍵的數據,很多系統都需要 A 系統將這個數據發送過來。A 系統要時時刻刻考慮 BCDE 四個系統如果掛了該咋辦?要不要重發,要不要把消息存起來?頭發都白了??! 如果使用 MQ,A 系統產生一條數據,發送到 MQ 里面去,哪個系統需要數據自己去 MQ 里面消費。如果新系統需要數據,直接從 MQ 里消費即可;如果某個系統不需要這條數據了,就取消對 MQ 消息的消費即可。這樣下來,A 系統壓根兒不需要去考慮要給誰發送數據,不需要維護這個代碼,也不需要考慮人家是否調用成功、失敗超時等情況。 總結 :通過一個 MQ,Pub/Sub 發布訂閱消息這么一個模型,A 系統就跟其它系統徹底解耦了。 面試技巧 :你需要去考慮一下你負責的系統中是否有類似的場景,就是一個系統或者一個模塊,調用了多個系統或者模塊,互相之間的調用很復雜,維護起來很麻煩。但是其實這個調用是不需要直接同步調用接口的,如果用 MQ 給它異步化解耦,也是可以的,你就需要去考慮在你的項目里,是不是可以運用這個 MQ 去進行系統的解耦。在簡歷中體現出來這塊東西,用 MQ 作解耦。 異步 再來看一個場景,A 系統接收一個請求,需要在自己本地寫庫,還需要在 BCD 三個系統寫庫,自己本地寫庫要 3ms,BCD 三個系統分別寫庫要 300ms、450ms、200ms。最終請求總延時是 3 + 300 + 450 + 200 = 953ms,接近 1s,用戶感覺搞個什么東西,慢死了慢死了。用戶通過瀏覽器發起請求,等待個 1s,這幾乎是不可接受的。 一般互聯網類的企業,對于用戶直接的操作,一般要求是每個請求都必須在 200 ms 以內完成,對用戶幾乎是無感知的。 如果 使用 MQ ,那么 A 系統連續發送 3 條消息到 MQ 隊列中,假如耗時 5ms,A 系統從接受一個請求到返回響應給用戶,總時長是 3 + 5 = 8ms,對于用戶而言,其實感覺上就是點個按鈕,8ms 以后就直接返回了,爽!網站做得真好,真快! 削峰 每天 0:00 到 12:00,A 系統風平浪靜,每秒并發請求數量就 50 個。結果每次一到 12:00 ~ 13:00 ,每秒并發請求數量突然會暴增到 5k+ 條。但是系統是直接基于 MySQL的,大量的請求涌入 MySQL,每秒鐘對 MySQL 執行約 5k 條 SQL。 一般的 MySQL,扛到每秒 2k 個請求就差不多了,如果每秒請求到 5k 的話,可能就直接把 MySQL 給打死了,導致系統崩潰,用戶也就沒法再使用系統了。 但是高峰期一過,到了下午的時候,就成了低峰期,可能也就 1w 的用戶同時在網站上操作,每秒中的請求數量可能也就 50 個請求,對整個系統幾乎沒有任何的壓力。 如果使用 MQ,每秒 5k 個請求寫入 MQ,A 系統每秒鐘最多處理 2k 個請求,因為 MySQL 每秒鐘最多處理 2k 個。A 系統從 MQ 中慢慢拉取請求,每秒鐘就拉取 2k 個請求,不要超過自己每秒能處理的最大請求數量就 ok,這樣下來,哪怕是高峰期的時候,A 系統也絕對不會掛掉。而 MQ 每秒鐘 5k 個請求進來,就 2k 個請求出去,結果就導致在中午高峰期(1 個小時),可能有幾十萬甚至幾百萬的請求積壓在 MQ 中。 這個短暫的高峰期積壓是 ok 的,因為高峰期過了之后,每秒鐘就 50 個請求進 MQ,但是 A 系統依然會按照每秒 2k 個請求的速度在處理。所以說,只要高峰期一過,A 系統就會快速將積壓的消息給解決掉。 消息隊列有什么優缺點 優點上面已經說了,就是 在特殊場景下有其對應的好處 , 解耦 、 異步 、 削峰 。 缺點有以下幾個: 1.系統可用性降低 系統引入的外部依賴越多,越容易掛掉。本來你就是 A 系統調用 BCD 三個系統的接口就好了,人 ABCD 四個系統好好的,沒啥問題,你偏加個 MQ 進來,萬一 MQ 掛了咋整,MQ 一掛,整套系統崩潰的,你不就完了? 2.系統復雜度提高 硬生生加個 MQ 進來,你怎么保證消息沒有重復消費?怎么處理消息丟失的情況?怎么保證消息傳遞的順序性?頭大頭大,問題一大堆,痛苦不已。 3.一致性問題 A 系統處理完了直接返回成功了,人都以為你這個請求就成功了;但是問題是,要是 BCD 三個系統那里,BD 兩個系統寫庫成功了,結果 C 系統寫庫失敗了,咋整?你這數據就不一致了。 所以消息隊列實際是一種非常復雜的架構,你引入它有很多好處,但是也得針對它帶來的壞處做各種額外的技術方案和架構來規避掉,做好之后,你會發現,媽呀,系統復雜度提升了一個數量級,也許是復雜了 10 倍。但是關鍵時刻,用,還是得用的。 Kafka、ActiveMQ、RabbitMQ、RocketMQ 有什么優缺點? 一般的業務系統要引入 MQ,最早大家都用 ActiveMQ,但是現在確實大家用的不多了,沒經過大規模吞吐量場景的驗證,社區也不是很活躍,所以大家還是算了吧,我個人不推薦用這個了;綜上,各種對比之后,有如下建議: 后來大家開始用 RabbitMQ,但是確實 erlang 語言阻止了大量的 Java 工程師去深入研究和掌控它,對公司而言,幾乎處于不可控的狀態,但是確實人家是開源的,比較穩定的支持,活躍度也高; 不過現在確實越來越多的公司,會去用 RocketMQ,確實很不錯(阿里出品),但社區可能有突然黃掉的風險,對自己公司技術實力有絕對自信的,推薦用 RocketMQ,否則回去老老實實用 RabbitMQ 吧,人家有活躍的開源社區,絕對不會黃。 所以 中小型公司 ,技術實力較為一般,技術挑戰不是特別高,用 RabbitMQ 是不錯的選擇; 大型公司 ,基礎架構研發實力較強,用 RocketMQ 是很好的選擇。 如果是 大數據領域 的實時計算、日志采集等場景,用 Kafka 是業內標準的,絕對沒問題,社區活躍度很高,絕對不會黃,何況幾乎是全世界這個領域的事實性規范。 二、如何保證消息隊列的高可用? 面試官心理分析 如果有人問到你 MQ 的知識,高可用是必問的。 上一講提到,MQ 會導致系統可用性降低。 所以只要你用了 MQ,接下來問的一些要點肯定就是圍繞著 MQ 的那些缺點怎么來解決了。 要是你傻乎乎的就干用了一個 MQ,各種問題從來沒考慮過,那你就杯具了,面試官對你的感覺就是,只會簡單使用一些技術,沒任何思考,馬上對你的印象就不太好了。 這樣的同學招進來要是做個 20k 薪資以內的普通小弟還湊合,要是做薪資 20k+ 的高工,那就慘了,讓你設計個系統,里面肯定一堆坑,出了事故公司受損失,團隊一起背鍋。 面試題剖析 這個問題這么問是很好的,因為不能問你 Kafka 的高可用性怎么保證? ActiveMQ 的高可用性怎么保證? 一個面試官要是這么問就顯得很沒水平,人家可能用的就是 RabbitMQ,沒用過 Kafka,你上來問人家 Kafka 干什么? 這不是擺明了刁難人么。 所以有水平的面試官,問的是 MQ 的高可用性怎么保證? 這樣就是你用過哪個 MQ,你就說說你對那個 MQ 的高可用性的理解。 RabbitMQ 的高可用性 RabbitMQ 是比較有代表性的,因為是基于主從(非分布式)做高可用性的,我們就以 RabbitMQ 為例子講解第一種 MQ 的高可用性怎么實現。 RabbitMQ 有三種模式: 單機模式、普通集群模式、鏡像集群模式。 單機模式 單機模式,就是 Demo 級別的,一般就是你本地啟動了玩玩兒的??,沒人生產用單機模式。 普通集群模式(無高可用性) 普通集群模式,意思就是在多臺機器上啟動多個 RabbitMQ 實例,每個機器啟動一個。 你創建的 queue,只會放在一個 RabbitMQ 實例上,但是每個實例都同步 queue 的元數據(元數據可以認為是 queue 的一些配置信息,通過元數據,可以找到 queue 所在實例)。 你消費的時候,實際上如果連接到了另外一個實例,那么那個實例會從 queue 所在實例上拉取數據過來。 這種方式確實很麻煩,也不怎么好,沒做到所謂的分布式,就是個普通集群。 因為這導致你要么消費者每次隨機連接一個實例然后拉取數據,要么固定連接那個 queue 所在實例消費數據,前者有數據拉取的開銷,后者導致單實例性能瓶頸。 而且如果那個放 queue 的實例宕機了,會導致接下來其他實例就無法從那個實例拉取,如果你開啟了消息持久化,讓 RabbitMQ 落地存儲消息的話,消息不一定會丟,得等這個實例恢復了,然后才可以繼續從這個 queue 拉取數據。 所以這個事兒就比較尷尬了,這就沒有什么所謂的高可用性,這方案主要是提高吞吐量的,就是說讓集群中多個節點來服務某個 queue 的讀寫操作。 鏡像集群模式(高可用性) 這種模式,才是所謂的 RabbitMQ 的高可用模式。 跟普通集群模式不一樣的是,在鏡像集群模式下,你創建的 queue,無論元數據還是 queue 里的消息都會存在于多個實例上,就是說,每個 RabbitMQ 節點都有這個 queue 的一個完整鏡像,包含 queue 的全部數據的意思。 然后每次你寫消息到 queue 的時候,都會自動把消息同步到多個實例的 queue 上。 那么如何開啟這個鏡像集群模式呢? 其實很簡單,RabbitMQ 有很好的管理控制臺,就是在后臺新增一個策略,這個策略是鏡像集群模式的策略,指定的時候是可以要求數據同步到所有節點的,也可以要求同步到指定數量的節點,再次創建 queue 的時候,應用這個策略,就會自動將數據同步到其他的節點上去了。 這樣的話,好處在于,你任何一個機器宕機了,沒事兒,其它機器(節點)還包含了這個 queue 的完整數據,別的 consumer 都可以到其它節點上去消費數據。 壞處在于,第一,這個性能開銷也太大了吧,消息需要同步到所有機器上,導致網絡帶寬壓力和消耗很重! 第二,這么玩兒,不是分布式的,就沒有擴展性可言了,如果某個 queue 負載很重,你加機器,新增的機器也包含了這個 queue 的所有數據,并沒有辦法線性擴展你的 queue。 你想,如果這個 queue 的數據量很大,大到這個機器上的容量無法容納了,此時該怎么辦呢? Kafka 的高可用性 Kafka 一個最基本的架構認識: 由多個 broker 組成,每個 broker 是一個節點; 你創建一個 topic,這個 topic 可以劃分為多個 partition,每個 partition 可以存在于不同的 broker 上,每個 partition 就放一部分數據。 這就是天然的分布式消息隊列,就是說一個 topic 的數據,是分散放在多個機器上的,每個機器就放一部分數據。 實際上 RabbmitMQ 之類的,并不是分布式消息隊列,它就是傳統的消息隊列,只不過提供了一些集群、HA(High Availability, 高可用性) 的機制而已,因為無論怎么玩兒,RabbitMQ 一個 queue 的數據都是放在一個節點里的,鏡像集群下,也是每個節點都放這個 queue 的完整數據。 Kafka 0.8 以前,是沒有 HA 機制的,就是任何一個 broker 宕機了,那個 broker 上的 partition 就廢了,沒法寫也沒法讀,沒有什么高可用性可言。 比如說,我們假設創建了一個 topic,指定其 partition 數量是 3 個,分別在三臺機器上。 但是,如果第二臺機器宕機了,會導致這個 topic 的 1/3 的數據就丟了,因此這個是做不到高可用的。 Kafka 0.8 以后,提供了 HA 機制,就是 replica(復制品) 副本機制。 每個 partition 的數據都會同步到其它機器上,形成自己的多個 replica 副本。 所有 replica 會選舉一個 leader 出來,那么生產和消費都跟這個 leader 打交道,然后其他 replica 就是 follower。 寫的時候,leader 會負責把數據同步到所有 follower 上去,讀的時候就直接讀 leader 上的數據即可。 只能讀寫 leader? 很簡單,要是你可以隨意讀寫每個 follower,那么就要 care 數據一致性的問題,系統復雜度太高,很容易出問題。 Kafka 會均勻地將一個 partition 的所有 replica 分布在不同的機器上,這樣才可以提高容錯性。 這么搞,就有所謂的高可用性了,因為如果某個 broker 宕機了,沒事兒,那個 broker上面的 partition 在其他機器上都有副本的。 如果這個宕機的 broker 上面有某個 partition 的 leader,那么此時會從 follower 中重新選舉一個新的 leader 出來,大家繼續讀寫那個新的 leader 即可。 這就有所謂的高可用性了。 寫數據的時候,生產者就寫 leader,然后 leader 將數據落地寫本地磁盤,接著其他 follower 自己主動從 leader 來 pull 數據。 一旦所有 follower 同步好數據了,就會發送 ack 給 leader,leader 收到所有 follower 的 ack 之后,就會返回寫成功的消息給生產者。 (當然,這只是其中一種模式,還可以適當調整這個行為) 消費的時候,只會從 leader 去讀,但是只有當一個消息已經被所有 follower 都同步成功返回 ack 的時候,這個消息才會被消費者讀到。 看到這里,相信你大致明白了 Kafka 是如何保證高可用機制的了,對吧? 不至于一無所知,現場還能給面試官畫畫圖。 要是遇上面試官確實是 Kafka 高手,深挖了問,那你只能說不好意思,太深入的你沒研究過。 三、如何保證消息不被重復消費? 或者說,如何保證消息消費的冪等性? 面試官心理分析 其實這是很常見的一個問題,這倆問題基本可以連起來問。 既然是消費消息,那肯定要考慮會不會重復消費? 能不能避免重復消費? 或者重復消費了也別造成系統異??梢詥?? 這個是 MQ 領域的基本問題,其實本質上還是問你使用消息隊列如何保證冪等性,這個是你架構里要考慮的一個問題。 面試題剖析 回答這個問題,首先你別聽到重復消息這個事兒,就一無所知吧,你先大概說一說可能會有哪些重復消費的問題。 首先,比如 RabbitMQ、RocketMQ、Kafka,都有可能會出現消息重復消費的問題,正常。 因為這問題通常不是 MQ 自己保證的,是由我們開發來保證的。 挑一個 Kafka 來舉個例子,說說怎么重復消費吧。 Kafka 實際上有個 offset 的概念,就是每個消息寫進去,都有一個 offset,代表消息的序號,然后 consumer 消費了數據之后,每隔一段時間(定時定期),會把自己消費過的消息的 offset 提交一下,表示“我已經消費過了,下次我要是重啟啥的,你就讓我繼續從上次消費到的 offset 來繼續消費吧”。 但是凡事總有意外,比如我們之前生產經常遇到的,就是你有時候重啟系統,看你怎么重啟了,如果碰到點著急的,直接 kill 進程了,再重啟。 這會導致 consumer 有些消息處理了,但是沒來得及提交 offset,尷尬了。 重啟之后,少數消息會再次消費一次。 舉個栗子。 有這么個場景。 數據 1/2/3 依次進入 kafka,kafka 會給這三條數據每條分配一個 offset,代表這條數據的序號,我們就假設分配的 offset 依次是 152/153/154。 消費者從 kafka 去消費的時候,也是按照這個順序去消費。 假如當消費者消費了 offset=153 的這條數據,剛準備去提交 offset 到 zookeeper,此時消費者進程被重啟了。 那么此時消費過的數據 1/2 的 offset 并沒有提交,kafka 也就不知道你已經消費了 offset=153 這條數據。 那么重啟之后,消費者會找 kafka 說,嘿,哥兒們,你給我接著把上次我消費到的那個地方后面的數據繼續給我傳遞過來。 由于之前的 offset 沒有提交成功,那么數據 1/2 會再次傳過來,如果此時消費者沒有去重的話,那么就會導致重復消費。 如果消費者干的事兒是拿一條數據就往數據庫里寫一條,會導致說,你可能就把數據 1/2 在數據庫里插入了 2 次,那么數據就錯啦。 其實重復消費不可怕,可怕的是你沒考慮到重復消費之后,怎么保證冪等性。 舉個例子吧。 假設你有個系統,消費一條消息就往數據庫里插入一條數據,要是你一個消息重復兩次,你不就插入了兩條,這數據不就錯了? 但是你要是消費到第二次的時候,自己判斷一下是否已經消費過了,若是就直接扔了,這樣不就保留了一條數據,從而保證了數據的正確性。 一條數據重復出現兩次,數據庫里就只有一條數據,這就保證了系統的冪等性。 冪等性,通俗點說,就一個數據,或者一個請求,給你重復來多次,你得確保對應的數據是不會改變的,不能出錯。 所以第二個問題來了,怎么保證消息隊列消費的冪等性? 其實還是得結合業務來思考,我這里給幾個思路: 比如你拿個數據要寫庫,你先根據主鍵查一下,如果這數據都有了,你就別插入了,update 一下好吧。 比如你是寫 Redis,那沒問題了,反正每次都是 set,天然冪等性。 比如你不是上面兩個場景,那做的稍微復雜一點,你需要讓生產者發送每條數據的時候,里面加一個全局唯一的 id,類似訂單 id 之類的東西,然后你這里消費到了之后,先根據這個 id 去比如 Redis 里查一下,之前消費過嗎? 如果沒有消費過,你就處理,然后這個 id 寫 Redis。 如果消費過了,那你就別處理了,保證別重復處理相同的消息即可。 比如基于數據庫的唯一鍵來保證重復數據不會重復插入多條。 因為有唯一鍵約束了,重復數據插入只會報錯,不會導致數據庫中出現臟數據。 當然,如何保證 MQ 的消費是冪等性的,需要結合具體的業務來看。 四、如何保證消息的可靠性傳輸? 或者說,如何處理消息丟失的問題? 面試官心理分析 這個是肯定的,用 MQ 有個基本原則,就是數據不能多一條,也不能少一條,不能多,就是前面說的重復消費和冪等性問題。 不能少,就是說這數據別搞丟了。 那這個問題你必須得考慮一下。 如果說你這個是用 MQ 來傳遞非常核心的消息,比如說計費、扣費的一些消息,那必須確保這個 MQ 傳遞過程中絕對不會把計費消息給弄丟。 面試題剖析 數據的丟失問題,可能出現在生產者、MQ、消費者中,咱們從 RabbitMQ 和 Kafka 分別來分析一下吧。 RabbitMQ 生產者弄丟了數據 生產者將數據發送到 RabbitMQ 的時候,可能數據就在半路給搞丟了,因為網絡問題啥的,都有可能。 此時可以選擇用 RabbitMQ 提供的事務功能,就是生產者發送數據之前開啟 RabbitMQ 事務channel.txSelect,然后發送消息,如果消息沒有成功被 RabbitMQ 接收到,那么生產者會收到異常報錯,此時就可以回滾事務channel.txRollback,然后重試發送消息; 如果收到了消息,那么可以提交事務channel.txCommit。 // 開啟事務 channel.txSelect try { // 這里發送消息 } catch ( Exception e) { channel.txRollback // 這里再次重發這條消息 } // 提交事務 channel.txCommitCopy to clipboardErrorCopied 但是問題是,RabbitMQ 事務機制(同步)一搞,基本上吞吐量會下來,因為太耗性能。 所以一般來說,如果你要確保說寫 RabbitMQ 的消息別丟,可以開啟 confirm 模式,在生產者那里設置開啟 confirm 模式之后,你每次寫的消息都會分配一個唯一的 id,然后如果寫入了 RabbitMQ 中,RabbitMQ 會給你回傳一個 ack 消息,告訴你說這個消息 ok 了。 如果 RabbitMQ 沒能處理這個消息,會回調你的一個 nack 接口,告訴你這個消息接收失敗,你可以重試。 而且你可以結合這個機制自己在內存里維護每個消息 id 的狀態,如果超過一定時間還沒接收到這個消息的回調,那么你可以重發。 事務機制和 confirm 機制最大的不同在于,事務機制是同步的,你提交一個事務之后會阻塞在那兒,但是 confirm 機制是異步的,你發送個消息之后就可以發送下一個消息,然后那個消息 RabbitMQ 接收了之后會異步回調你的一個接口通知你這個消息接收到了。 所以一般在生產者這塊避免數據丟失,都是用 confirm 機制的。 RabbitMQ 弄丟了數據 就是 RabbitMQ 自己弄丟了數據,這個你必須開啟 RabbitMQ 的持久化,就是消息寫入之后會持久化到磁盤,哪怕是 RabbitMQ 自己掛了,恢復之后會自動讀取之前存儲的數據,一般數據不會丟。 除非極其罕見的是,RabbitMQ 還沒持久化,自己就掛了,可能導致少量數據丟失,但是這個概率較小。 設置持久化有兩個步驟: 創建 queue 的時候將其設置為持久化 這樣就可以保證 RabbitMQ 持久化 queue 的元數據,但是它是不會持久化 queue 里的數據的。 第二個是發送消息的時候將消息的 deliveryMode 設置為 2 就是將消息設置為持久化的,此時 RabbitMQ 就會將消息持久化到磁盤上去。 必須要同時設置這兩個持久化才行,RabbitMQ 哪怕是掛了,再次重啟,也會從磁盤上重啟恢復 queue,恢復這個 queue 里的數據。 注意,哪怕是你給 RabbitMQ 開啟了持久化機制,也有一種可能,就是這個消息寫到了 RabbitMQ 中,但是還沒來得及持久化到磁盤上,結果不巧,此時 RabbitMQ 掛了,就會導致內存里的一點點數據丟失。 所以,持久化可以跟生產者那邊的 confirm 機制配合起來,只有消息被持久化到磁盤之后,才會通知生產者 ack了,所以哪怕是在持久化到磁盤之前,RabbitMQ 掛了,數據丟了,生產者收不到 ack,你也是可以自己重發的。 消費端弄丟了數據 RabbitMQ 如果丟失了數據,主要是因為你消費的時候,剛消費到,還沒處理,結果進程掛了,比如重啟了,那么就尷尬了,RabbitMQ 認為你都消費了,這數據就丟了。 這個時候得用 RabbitMQ 提供的 ack 機制,簡單來說,就是你必須關閉 RabbitMQ 的自動 ack,可以通過一個 api 來調用就行,然后每次你自己代碼里確保處理完的時候,再在程序里 ack 一把。 這樣的話,如果你還沒處理完,不就沒有 ack 了? 那 RabbitMQ 就認為你還沒處理完,這個時候 RabbitMQ 會把這個消費分配給別的 consumer 去處理,消息是不會丟的。 Kafka 消費端弄丟了數據 唯一可能導致消費者弄丟數據的情況,就是說,你消費到了這個消息,然后消費者那邊自動提交了 offset,讓 Kafka 以為你已經消費好了這個消息,但其實你才剛準備處理這個消息,你還沒處理,你自己就掛了,此時這條消息就丟咯。 這不是跟 RabbitMQ 差不多嗎,大家都知道 Kafka 會自動提交 offset,那么只要關閉自動提交 offset,在處理完之后自己手動提交 offset,就可以保證數據不會丟。 但是此時確實還是可能會有重復消費,比如你剛處理完,還沒提交 offset,結果自己掛了,此時肯定會重復消費一次,自己保證冪等性就好了。 生產環境碰到的一個問題,就是說我們的 Kafka 消費者消費到了數據之后是寫到一個內存的 queue 里先緩沖一下,結果有的時候,你剛把消息寫入內存 queue,然后消費者會自動提交 offset。 然后此時我們重啟了系統,就會導致內存 queue 里還沒來得及處理的數據就丟失了。 Kafka 弄丟了數據 這塊比較常見的一個場景,就是 Kafka 某個 broker 宕機,然后重新選舉 partition 的 leader。 大家想想,要是此時其他的 follower 剛好還有些數據沒有同步,結果此時 leader 掛了,然后選舉某個 follower 成 leader 之后,不就少了一些數據? 這就丟了一些數據啊。 生產環境也遇到過,我們也是,之前 Kafka 的 leader 機器宕機了,將 follower 切換為 leader 之后,就會發現說這個數據就丟了。 所以此時一般是要求起碼設置如下 4 個參數: 給 topic 設置 replication.factor 參數: 這個值必須大于 1,要求每個 partition 必須有至少 2 個副本。 在 Kafka 服務端設置 min.insync.replicas 參數: 這個值必須大于 1,這個是要求一個 leader 至少感知到有至少一個 follower 還跟自己保持聯系,沒掉隊,這樣才能確保 leader 掛了還有一個 follower 吧。 在 producer 端設置 acks=all: 這個是要求每條數據,必須是寫入所有 replica 之后,才能認為是寫成功了。 在 producer 端設置 retries=MAX(很大很大很大的一個值,無限次重試的意思): 這個是要求一旦寫入失敗,就無限重試,卡在這里了。 我們生產環境就是按照上述要求配置的,這樣配置之后,至少在 Kafka broker 端就可以保證在 leader 所在 broker 發生故障,進行 leader 切換時,數據不會丟失。 生產者會不會弄丟數據? 如果按照上述的思路設置了 acks=all,一定不會丟,要求是,你的 leader 接收到消息,所有的 follower 都同步到了消息之后,才認為本次寫成功了。 如果沒滿足這個條件,生產者會自動不斷的重試,重試無限次。 五、如何保證消息的順序性? 面試官心理分析 其實這個也是用 MQ 的時候必問的話題,第一看看你了不了解順序這個事兒? 第二看看你有沒有辦法保證消息是有順序的? 這是生產系統中常見的問題。 面試題剖析 我舉個例子,我們以前做過一個 mysql binlog 同步的系統,壓力還是非常大的,日同步數據要達到上億,就是說數據從一個 mysql 庫原封不動地同步到另一個 mysql 庫里面去(mysql -> mysql)。 常見的一點在于說比如大數據 team,就需要同步一個 mysql 庫過來,對公司的業務系統的數據做各種復雜的操作。 你在 mysql 里增刪改一條數據,對應出來了增刪改 3 條 binlog 日志,接著這三條 binlog 發送到 MQ 里面,再消費出來依次執行,起碼得保證人家是按照順序來的吧? 不然本來是: 增加、修改、刪除; 你愣是換了順序給執行成刪除、修改、增加,不全錯了么。 本來這個數據同步過來,應該最后這個數據被刪除了; 結果你搞錯了這個順序,最后這個數據保留下來了,數據同步就出錯了。 先看看順序會錯亂的倆場景: RabbitMQ: 一個 queue,多個 consumer。 比如,生產者向 RabbitMQ 里發送了三條數據,順序依次是 data1/data2/data3,壓入的是 RabbitMQ 的一個內存隊列。 有三個消費者分別從 MQ 中消費這三條數據中的一條,結果消費者2先執行完操作,把 data2 存入數據庫,然后是 data1/data3。 這不明顯亂了。 Kafka: 比如說我們建了一個 topic,有三個 partition。 生產者在寫的時候,其實可以指定一個 key,比如說我們指定了某個訂單 id 作為 key,那么這個訂單相關的數據,一定會被分發到同一個 partition 中去,而且這個 partition 中的數據一定是有順序的。 消費者從 partition 中取出來數據的時候,也一定是有順序的。 到這里,順序還是 ok 的,沒有錯亂。 接著,我們在消費者里可能會搞多個線程來并發處理消息。 因為如果消費者是單線程消費處理,而處理比較耗時的話,比如處理一條消息耗時幾十 ms,那么 1 秒鐘只能處理幾十條消息,這吞吐量太低了。 而多個線程并發跑的話,順序可能就亂掉了。 解決方案 RabbitMQ 拆分多個 queue,每個 queue 一個 consumer,就是多一些 queue 而已,確實是麻煩點; 或者就一個 queue 但是對應一個 consumer,然后這個 consumer 內部用內存隊列做排隊,然后分發給底層不同的 worker 來處理。 Kafka 一個 topic,一個 partition,一個 consumer,內部單線程消費,單線程吞吐量太低,一般不會用這個。 寫 N 個內存 queue,具有相同 key 的數據都到同一個內存 queue; 然后對于 N 個線程,每個線程分別消費一個內存 queue 即可,這樣就能保證順序性。 六、如何解決消息隊列的延時以及過期失效問題? 消息隊列滿了以后該怎么處理? 有幾百萬消息持續積壓幾小時,說說怎么解決? 面試官心理分析 你看這問法,其實本質針對的場景,都是說,可能你的消費端出了問題,不消費了; 或者消費的速度極其慢。 接著就坑爹了,可能你的消息隊列集群的磁盤都快寫滿了,都沒人消費,這個時候怎么辦? 或者是這整個就積壓了幾個小時,你這個時候怎么辦? 或者是你積壓的時間太長了,導致比如 RabbitMQ 設置了消息過期時間后就沒了怎么辦? 所以就這事兒,其實線上挺常見的,一般不出,一出就是大 case。 一般常見于,舉個例子,消費端每次消費之后要寫 mysql,結果 mysql 掛了,消費端 hang 那兒了,不動了; 或者是消費端出了個什么岔子,導致消費速度極其慢。 面試題剖析 關于這個事兒,我們一個一個來梳理吧,先假設一個場景,我們現在消費端出故障了,然后大量消息在 mq 里積壓,現在出事故了,慌了。 大量消息在 mq 里積壓了幾個小時了還沒解決 幾千萬條數據在 MQ 里積壓了七八個小時,從下午 4 點多,積壓到了晚上 11 點多。 這個是我們真實遇到過的一個場景,確實是線上故障了,這個時候要不然就是修復 consumer 的問題,讓它恢復消費速度,然后傻傻的等待幾個小時消費完畢。 這個肯定不能在面試的時候說吧。 一個消費者一秒是 1000 條,一秒 3 個消費者是 3000 條,一分鐘就是 18 萬條。 所以如果你積壓了幾百萬到上千萬的數據,即使消費者恢復了,也需要大概 1 小時的時間才能恢復過來。 一般這個時候,只能臨時緊急擴容了,具體操作步驟和思路如下: 先修復 consumer 的問題,確保其恢復消費速度,然后將現有 consumer 都停掉。 新建一個 topic,partition 是原來的 10 倍,臨時建立好原先 10 倍的 queue 數量。 然后寫一個臨時的分發數據的 consumer 程序,這個程序部署上去消費積壓的數據,消費之后不做耗時的處理,直接均勻輪詢寫入臨時建立好的 10 倍數量的 queue。 接著臨時征用 10 倍的機器來部署 consumer,每一批 consumer 消費一個臨時 queue 的數據。 這種做法相當于是臨時將 queue 資源和 consumer 資源擴大 10 倍,以正常的 10 倍速度來消費數據。 等快速消費完積壓數據之后,得恢復原先部署的架構,重新用原先的 consumer 機器來消費消息。 mq 中的消息過期失效了 假設你用的是 RabbitMQ,RabbtiMQ 是可以設置過期時間的,也就是 TTL。 如果消息在 queue 中積壓超過一定的時間就會被 RabbitMQ 給清理掉,這個數據就沒了。 那這就是第二個坑了。 這就不是說數據會大量積壓在 mq 里,而是大量的數據會直接搞丟。 這個情況下,就不是說要增加 consumer 消費積壓的消息,因為實際上沒啥積壓,而是丟了大量的消息。 我們可以采取一個方案,就是批量重導,這個我們之前線上也有類似的場景干過。 就是大量積壓的時候,我們當時就直接丟棄數據了,然后等過了高峰期以后,比如大家一起喝咖啡熬夜到晚上12點以后,用戶都睡覺了。 這個時候我們就開始寫程序,將丟失的那批數據,寫個臨時程序,一點一點的查出來,然后重新灌入 mq 里面去,把白天丟的數據給他補回來。 也只能是這樣了。 假設 1 萬個訂單積壓在 mq 里面,沒有處理,其中 1000 個訂單都丟了,你只能手動寫程序把那 1000 個訂單給查出來,手動發到 mq 里去再補一次。 mq 都快寫滿了 如果消息積壓在 mq 里,你很長時間都沒有處理掉,此時導致 mq 都快寫滿了,咋辦? 這個還有別的辦法嗎? 沒有,誰讓你第一個方案執行的太慢了,你臨時寫程序,接入數據來消費,消費一個丟棄一個,都不要了,快速消費掉所有的消息。 然后走第二個方案,到了晚上再補數據吧。 七、如果讓你寫一個消息隊列,該如何進行架構設計? 說一下你的思路。 面試官心理分析 其實聊到這個問題,一般面試官要考察兩塊: 你有沒有對某一個消息隊列做過較為深入的原理的了解,或者從整體了解把握住一個消息隊列的架構原理。 看看你的設計能力,給你一個常見的系統,就是消息隊列系統,看看你能不能從全局把握一下整體架構設計,給出一些關鍵點出來。 說實話,問類似問題的時候,大部分人基本都會蒙,因為平時從來沒有思考過類似的問題,大多數人就是平時埋頭用,從來不去思考背后的一些東西。 類似的問題,比如,如果讓你來設計一個 Spring 框架你會怎么做? 如果讓你來設計一個 Dubbo 框架你會怎么做? 如果讓你來設計一個 MyBatis 框架你會怎么做? 面試題剖析 其實回答這類問題,說白了,不求你看過那技術的源碼,起碼你要大概知道那個技術的基本原理、核心組成部分、基本架構構成,然后參照一些開源的技術把一個系統設計出來的思路說一下就好。 比如說這個消息隊列系統,我們從以下幾個角度來考慮一下: 首先這個 mq 得支持可伸縮性吧,就是需要的時候快速擴容,就可以增加吞吐量和容量,那怎么搞? 設計個分布式的系統唄,參照一下 kafka 的設計理念,broker -> topic -> partition,每個 partition 放一個機器,就存一部分數據。 如果現在資源不夠了,簡單啊,給 topic 增加 partition,然后做數據遷移,增加機器,不就可以存放更多數據,提供更高的吞吐量了? 其次你得考慮一下這個 mq 的數據要不要落地磁盤吧? 那肯定要了,落磁盤才能保證別進程掛了數據就丟了。 那落磁盤的時候怎么落??? 順序寫,這樣就沒有磁盤隨機讀寫的尋址開銷,磁盤順序讀寫的性能是很高的,這就是 kafka 的思路。 其次你考慮一下你的 mq 的可用性??? 這個事兒,具體參考之前可用性那個環節講解的 kafka 的高可用保障機制。 多副本 -> leader & follower -> broker 掛了重新選舉 leader 即可對外服務。 能不能支持數據 0 丟失??? 可以的,參考我們之前說的那個 kafka 數據零丟失方案。 mq 肯定是很復雜的,面試官問你這個問題,其實是個開放題,他就是看看你有沒有從架構角度整體構思和設計的思維以及能力。 確實這個問題可以刷掉一大批人,因為大部分人平時不思考這些東西。 參考文章: https://mp.weixin.qq.com/s/3GMs3ae7ffDFgia9VSDMEg https://mp.weixin.qq.com/s/hAw2KEnZJNIq_qVw8H1UMg
來源:OSCHINA
發布時間:2020-02-12 10:40:00
本文首發于公眾號「Python知識圈」,如需轉載,請在公眾號聯系作者授權。 前言 上一篇文章整理了的公眾號所有文章的導航鏈接,其實如果手動整理起來的話,是一件很費力的事情,因為公眾號里添加文章的時候只能一篇篇的選擇,是個單選框。 面對幾百篇的文章,這樣一個個選擇的話,是一件苦差事。 pk哥作為一個 Pythoner,當然不能這么低效,我們用爬蟲把文章的標題和鏈接等信息提取出來。 抓包 我們需要通過抓包提取公眾號文章的請求的 URL,參考之前寫過的一篇抓包的文章 Python爬蟲APP前的準備 ,pk哥這次直接抓取 PC 端微信的公眾號文章列表信息,更簡單。 我以抓包工具 Charles 為例,勾選容許抓取電腦的請求,一般是默認就勾選的。 為了過濾掉其他無關請求,我們在左下方設置下我們要抓取的域名。 打開 PC 端微信,打開 「Python知識圈」公眾號文章列表后,Charles 就會抓取到大量的請求,找到我們需要的請求,返回的 JSON 信息里包含了文章的標題、摘要、鏈接等信息,都在 comm_msg_info 下面。 這些都是請求鏈接后的返回,請求鏈接 url 我們可以在 Overview 中查看。 通過抓包獲取了這么多信息后,我們可以寫爬蟲爬取所有文章的信息并保存了。 初始化函數 公眾號歷史文章列表向上滑動,加載更多文章后發現鏈接中變化的只有 offset 這個參數,我們創建一個初始化函數,加入代理 IP,請求頭和信息,請求頭包含了 User-Agent、Cookie、Referer。 這些信息都在抓包工具可以看到。 請求數據 通過抓包分析出來了請求鏈接,我們就可以用 requests 庫來請求了,用返回碼是否為 200 做一個判斷,200 的話說明返回信息正常,我們再構建一個函數 parse_data() 來解析提取我們需要的返回信息。 def request_data(self): try: response = requests.get(self.base_url.format(self.offset), headers=self.headers, proxies=self.proxy) print(self.base_url.format(self.offset)) if 200 == response.status_code: self.parse_data(response.text) except Exception as e: print(e) time.sleep(2) pass 提取數據 通過分析返回的 Json 數據,我們可以看到,我們需要的數據都在 app_msg_ext_info 下面。 我們用 json.loads 解析返回的 Json 信息,把我們需要的列保存在 csv 文件中,有標題、摘要、文章鏈接三列信息,其他信息也可以自己加。 def parse_data(self, responseData): all_datas = json.loads(responseData) if 0 == all_datas['ret'] and all_datas['msg_count']>0: summy_datas = all_datas['general_msg_list'] datas = json.loads(summy_datas)['list'] a = [] for data in datas: try: title = data['app_msg_ext_info']['title'] title_child = data['app_msg_ext_info']['digest'] article_url = data['app_msg_ext_info']['content_url'] info = {} info['標題'] = title info['小標題'] = title_child info['文章鏈接'] = article_url a.append(info) except Exception as e: print(e) continue print('正在寫入文件') with open('Python公眾號文章合集1.csv', 'a', newline='', encoding='utf-8') as f: fieldnames = ['標題', '小標題', '文章鏈接'] # 控制列的順序 writer = csv.DictWriter(f, fieldnames=fieldnames) writer.writeheader() writer.writerows(a) print("寫入成功") print('----------------------------------------') time.sleep(int(format(random.randint(2, 5)))) self.offset = self.offset+10 self.request_data() else: print('抓取數據完畢!') 這樣,爬取的結果就會以 csv 格式保存起來。 運行代碼時,可能會遇到 SSLError 的報錯,最快的解決辦法就是 base_url 前面的 https 去掉 s 再運行。 保存markdown格式的鏈接 經常寫文章的人應該都知道,一般寫文字都會用 Markdown 的格式來寫文章,這樣的話,不管放在哪個平臺,文章的格式都不會變化。 在 Markdown 格式里,用 [文章標題](文章url鏈接) 表示,所以我們保存信息時再加一列信息就行,標題和文章鏈接都獲取了,Markdown 格式的 url 也就簡單了。 md_url = '[{}]'.format(title) + '({})'.format(article_url) 爬取完成后,效果如下。 我們把 md鏈接這一列全部粘貼到 Markdown 格式的筆記里就行了,大部分的筆記軟件都知道新建 Markdown 格式的文件的。 這樣,這些導航文章鏈接整理起來就是分類的事情了。 你用 Python 解決過生活中的小問題嗎?歡迎留言討論。 歡迎關注公眾號「Python知識圈」,公眾號后臺回復關鍵字,獲取更多干貨。 回復「英語」:送你英語 7000 單詞速記法,親測非常有效。 回復「編程」:免費獲贈2019最新編程資料,認真學完BAT offer 拿到手軟。 回復「賺錢」:領取簡單可實操的 36 個賺錢的小項目,每天多賺100塊零花錢。 回復「電子書」:免費送你10本Python電子書。
來源:OSCHINA
發布時間:2020-02-11 22:45:02
隨著數據中臺的概念愈發火熱,越來越多的技術公司開始慢慢駛入中臺的賽道,無論是數據中臺、技術中臺還是業務中臺等等,只要與中臺沾上邊兒的,大家理解的概念與期待產品應該有的樣子都各有不同又自成體系。也正因為此,被“中臺的風”吹著跑的各個企業決策者們,也對這個概念愈發好奇與著迷,似乎只要快速搭上“中臺”的這趟車,就能讓企業搖身一變,迅速轉型,進而成為行業佼佼者。而現實中,中臺真的是企業的靈丹妙藥嗎?而數瀾的數棲平臺又與數據中臺有什么關系呢? 先讓我們來回顧一下,中臺的概念是如何在國內突然爆紅的。 一、中臺,風從哪兒來? 「中臺」,原本是一種美軍作戰概念,即通過高效、統一的后方系統,來支持前端的機動部隊,提高作戰效率,減少冗余投入。 而這一概念在國內的開端,依據現在普遍流行的說法,則源于馬云 2015 年帶領團隊對開發出《部落沖突》、《皇室戰爭》等手游的芬蘭公司 Supercell 的一次拜訪。這家僅有 200 人不到的小公司,在 2015 財年已創造出 23.3 億美元的營收。而彼時的阿里,員工人數 3.4W,2015 財年的營收為 122.93 億美元。 能夠支撐 Supercell 公司這種高效散兵作戰模式的基礎,是他們經過 6 年時間沉淀下來的游戲中臺。中臺將游戲開發過程中公共、通用的游戲素材和算法整理起來,可以同時支持幾個小團隊在幾周時間內研發出一款新游戲,并能鼓勵員工充分試錯。但在愈發龐大的阿里生態體系內部,則因為業務的快速擴張和增長,出現了不同業務線之間“煙囪林立”、資源利用率低的問題。同時,部門之間常常因為所談合作不能立即產生收益,基于 KPI 的問題,最終都會被廢止掉。長此以往,公司的創新力實際上也會逐漸下降。 因此,馬云回國后,便開始全面推廣「中臺」戰略。并于同年底基于「大中臺,小前臺」的戰略,對組織架構進行了全面徹底的調整。 在阿里已經成功實施內部「中臺」戰略的兩年后,也就是從 2018 年底到 2019 年初這短短半年內,各個大型互聯網企業開始進行大規模組織架構調整。 2018 年,商業大環境開始發生變化。To C 業務開始逐漸進入瓶頸期,以往靠著流量紅利瘋狂擴張業務的大型互聯網企業,已經無法找到當年百試百靈的路數了。面對整個互聯網行業「水溫」的變化,企業意識到降本增效勢在必行,轉型同樣勢在必行。于是,各大型互聯網企業開始向 To B 業務模式轉型。同時,企業內部管理也開始走向精益化,以往的瘋狂擴張導致的各種「業務煙囪」、「部門墻」都在推倒名單中。 當云計算、大數據、人工智能、支付能力,成為企業轉型中業務較量的關鍵點時,數據能力、算法能力、調度能力的沉淀,則成為了考驗企業內部是否能夠快速支撐前臺業務,實現企業轉型的重要環節。而這些能力的背后,中臺的重要性不言而喻。 因此,「旁觀者們」從行動上,正式入局。 騰訊的「930 變革」、京東的中臺戰略、美團的數據全量打通、字節跳動的「直播大中臺」、百度的 All in AI 戰略,都讓中臺概念持續升溫,也讓整個行業熱度上漲不斷。 二、數據中臺,到底是什么? 數瀾科技鐵教授曾在《 數據中臺系列(一):你的企業真的需要「數據中臺」嗎? 》一文中提出: 數據中臺,它不僅僅是我們平時會提到的任何一種工具,也不僅僅是一種企業協同工作的方法,更不能把它當做是一個簡單的組織架構。 「 數據中臺,包括平臺、工具、數據、組織、流程、規范等一切與企業數據資產如何用起來所相關的。 企業所屬行業不同,經營策略不同,從而數據場景也千差萬別。再加上企業人員運用數據的能力參差不齊,這就導致了 每一家企業的數據中臺都是獨一無二的,不是購買一個所謂的數據中臺工具就能解決的。 當然合適的工具是可以降低企業應用數據難度的,這是強調的是「合適的」,而不是「高級的」?!? 數瀾數據中臺理念 而在數瀾看來,數據中臺是一種戰略選擇和組織形式,通過有型的產品支撐和實施方法論,解決大企業面臨的數據孤島、數據維護混亂、數據價值利用低的問題,依據企業特有的業務和架構,構建一套源源不斷地把數據變成資產并服務于業務的,可持續讓企業數據用起來的機制,讓數據可見、可懂、可用、可運營。 它的出現,基于以下兩個大前提: 1)豐富的數據維度; 以阿里巴巴為例,TCIF & IDMAPPING,淘寶消費者信息工廠和用戶識別,打通了阿里集團所有相關業務域,建立了幾千個標簽來刻畫用戶畫像。比如:你的真實性別、購物性別、音樂風格偏愛是「R&B」、你的線上購物行為特征是「愛薅羊毛還是財大氣粗」等等。如果沒有這些用戶數據維度,標簽的建立無法做到大而全,也就無法提升用戶畫像的精準度。 2)多個大數據場景。 同樣,數據服務支撐了阿里媽媽、淘寶、天貓、支付寶等多個業務板塊的場景,每天都有上億的調用次數。通過業務效果反饋,進而不斷優化調整數據和模型?,F在許多企業想要建設數據中臺,卻發現沒有實際的數據應用場景,無法進行切入。 三、數棲平臺,就是數瀾的數據中臺嗎? 正如上文提到的,數據中臺是各個企業獨有的一種戰略選擇和組織形式,市面上不可能存在數據中臺這樣的一個產品。 數瀾 CEO 風劍在面對記者采訪時曾說: 「在我看來,但凡說銷售數據中臺產品的人,都是在忽悠。我對數據中臺的定義是,數據中臺絕對是不可復制的;數據中臺是企業管理與運營的一套機制,一套讓數據可用起來的可持續的機制。 企業需要一套自己的數據資產管理體系與框架,但不可能存在一個所有企業和組織都適用的、通用的數據中臺框架。 」 也因此,數瀾的數棲平臺,僅僅是一個一站式數據應用基礎設施,它是在幫助用戶搭建自有數據中臺過程中,必不可少的一套工具。通過數棲平臺,企業可以讓自己業務沉淀多年的數據融匯打通,同時,通過開發平臺可以幫助企業內部的開發同學快速的進行對數據的處理,將數據成體系有邏輯的被管理起來,并且開發出更多可被業務使用的標簽,向上層提供更多的彈藥。 數棲平臺架構 而僅僅通過數棲平臺,企業很難快速的構建起完整的數據中臺。在這個過程中,就需要客戶對于自身業務及數據的全流程完整梳理,同時也需要數瀾將沉淀多年的中臺建設方法論融入到幫助客戶共同建設數據中臺的每一個細節中。只有科學的工具+先進的方法論,二者結合在一起,才能夠實現最終的目標。 掃 它 ↓ 深 入 了 解 數 棲 呀!
來源:OSCHINA
發布時間:2019-11-29 10:15:00
描述:今天早上到公司,發現測試集群中的一臺機器的磁盤使用率100%,而其他節點的磁盤使用率只有30%左右,檢查磁盤的使用情況后,使用率飽滿的機器上,90%的數據都是/dfs目錄下的,因為只是昨天項目測試剛跑進來的數據,刪是不可能的,所以只能想辦法對集群中的數據進行平衡。 引起這種情況的方式很多: 1. 添加新的Datanode節點 2. 人為干預將數據的副本數降低或者增加 我們都知道當HDFS出現數據不平衡的時候,就會造成MapReduce或Spark等應用程序無法很好的利用本地計算的優勢,而且Datanode節點之間也沒有更好的網絡帶寬利用率,某些Datanode節點的磁盤無法使用等等問題。 在Hadoop中,提供了hdfs balancer程序用來保證HDFS的數據平衡,我們先看一下這個程序的參數: hdfs balancer --help Usage: hdfs balancer [-policy ] the balancing policy: datanode or blockpool [-threshold ] Percentage of disk capacity [-exclude [-f | ]] Excludes the specified datanodes. [-include [-f | ]] Includes only the specified datanodes. [-idleiterations ] Number of consecutive idle iterations (-1 for Infinite) before exit. [-runDuringUpgrade] Whether to run the balancer during an ongoing HDFS upgrade.This is usually not desired since it will not affect used space on over-utilized machines. Generic options supported are -conf specify an application configuration file -D use value for given property -fs specify a namenode -jt specify a ResourceManager -files specify comma separated files to be copied to the map reduce cluster -libjars specify comma separated jar files to include in the classpath. -archives specify comma separated archives to be unarchived on the compute machines. The general command line syntax is bin/hadoop command [genericOptions] [commandOptions] 選項的含義根據描述應該很好理解,其中-threshold參數是用來判斷數據平衡的依據,值范圍為0-100。默認值為10,表示HDFS達到平衡狀態的磁盤使用率偏差值為10%,如果機器與機器之間磁盤使用率偏差小于10%,那么我們就認為HDFS集群已經達到了平衡的狀態。 我們可以從CDH平臺的CM上看到該參數是默認值和含義: 該參數具體含義為:判斷集群是否平衡的目標參數,每一個 Datanode 存儲使用率和集群總存儲使用率的差值都應該小于這個閥值,理論上,該參數設置的越小,整個集群就越平衡,但是在線上環境中,Hadoop集群在進行balance時,還在并發的進行數據的寫入和刪除,所以有可能無法到達設定的平衡參數值。 參數-policy表示的平衡策略,默認為DataNode。 該參數的具體含義為:應用于重新平衡 HDFS 存儲的策略。默認DataNode策略平衡了 DataNode 級別的存儲。這類似于之前發行版的平衡策略。BlockPool 策略平衡了塊池級別和 DataNode 級別的存儲。BlockPool 策略僅適用于 Federated HDFS 服務。 參數-exclude和-include是用來選擇balancer時,可以指定哪幾個DataNode之間重分布,也可以從HDFS集群中排除哪幾個節點不需要重分布,比如: hdfs balancer -include CDHD,CDHA,CDHM,CDHT,CDHO 除了上面的參數會影響HDFS數據重分布,還有如下的參數也會影響重分布, dfs.datanode.balance.bandwidthPerSec, dfs.balance.bandwidthPerSec 該默認設置:1048576(1M/s),個人建議如果機器的網卡和 交換機 的帶寬有限,可以適當降低該速度,一般默認就可以了。 該參數含義如下: HDFS平衡器檢測集群中使用過度或者使用不足的DataNode,并在這些DataNode之間移動數據塊來保證負載均衡。如果不對平衡操作進行帶寬限制,那么它會很快就會搶占所有的網絡資源,不會為Mapreduce作業或者數據輸入預留資源。參數dfs.balance.bandwidthPerSec定義了每個DataNode平衡操作所允許的最大使用帶寬,這個值的單位是byte,這是很不直觀的,因為網絡帶寬一般都是用bit來描述的。因此,在設置的時候,要先計算好。DataNode使用這個參數來控制網絡帶寬的使用,但不幸的是,這個參數在守護進程啟動的時候就讀入,導致管理員沒辦法在平衡運行時來修改這個值,如果需要調整就要重啟集群。 下面簡單介紹一下balancer的原理: Rebalance程序作為一個獨立的進程與NameNode進行分開執行。 步驟1: Rebalance Server從NameNode中獲取所有的DataNode情況:每一個DataNode磁盤使用情況。 步驟2: Rebalance Server計算哪些機器需要將數據移動,哪些機器可以接受移動的數據。并且從NameNode中獲取需要移動的數據分布情況。 步驟3: Rebalance Server計算出來可以將哪一臺機器的block移動到另一臺機器中去。 步驟4,5,6: 需要移動block的機器將數據移動的目的機器上去,同時刪除自己機器上的block數據。 步驟7: Rebalance Server獲取到本次數據移動的執行結果,并繼續執行這個過程,一直沒有數據可以移動或者HDFS集群以及達到了平衡的標準為止。 實戰: 找一個比較空閑的的Datanode執行,建議不要在NameNode執行: hdfs balancer -include CDHD,CDHA,CDHM,CDHT,CDHO 執行過程如下(部分),大家可以對照上面的流程看日志,可能會更清楚一點: 16/07/11 09:35:12 INFO balancer.Balancer: namenodes = [hdfs://CDHB:8022] 16/07/11 09:35:12 INFO balancer.Balancer: parameters = Balancer.Parameters [BalancingPolicy.Node, threshold = 10.0, max idle iteration = 5, number of nodes to be excluded = 0, number of nodes to be included = 5, run during upgrade = false] Time Stamp Iteration# Bytes Already Moved Bytes Left To Move Bytes Being Moved 16/07/11 09:35:14 INFO net.NetworkTopology: Adding a new node: /default/192.168.1.130:50010 16/07/11 09:35:14 INFO net.NetworkTopology: Adding a new node: /default/192.168.1.131:50010 16/07/11 09:35:14 INFO net.NetworkTopology: Adding a new node: /default/192.168.1.135:50010 16/07/11 09:35:14 INFO net.NetworkTopology: Adding a new node: /default/192.168.1.138:50010 16/07/11 09:35:14 INFO net.NetworkTopology: Adding a new node: /default/192.168.1.139:50010 16/07/11 09:35:14 INFO balancer.Balancer: 2 over-utilized: [192.168.1.130:50010:DISK, 192.168.1.135:50010:DISK] 16/07/11 09:35:14 INFO balancer.Balancer: 1 underutilized: [192.168.1.131:50010:DISK] 16/07/11 09:35:14 INFO balancer.Balancer: Need to move 203.48 GB to make the cluster balanced. 16/07/11 09:35:14 INFO balancer.Balancer: Decided to move 10 GB bytes from 192.168.1.130:50010:DISK to 192.168.1.131:50010:DISK 16/07/11 09:35:14 INFO balancer.Balancer: Decided to move 10 GB bytes from 192.168.1.135:50010:DISK to 192.168.1.138:50010:DISK 16/07/11 09:35:14 INFO balancer.Balancer: Will move 20 GB in this iteration 16/07/11 09:36:00 INFO balancer.Dispatcher: Successfully moved blk_1074048042_307309 with size=134217728 from 192.168.1.130:50010:DISK to 192.168.1.131:50010:DISK through 192.168.1.130:50010 16/07/11 09:36:07 INFO balancer.Dispatcher: Successfully moved blk_1074049886_309153 with size=134217728 from 192.168.1.135:50010:DISK to 192.168.1.138:50010:DISK through 192.168.1.135:50010 16/07/11 09:36:09 INFO balancer.Dispatcher: Successfully moved blk_1074048046_307313 with size=134217728 from 192.168.1.130:50010:DISK to 192.168.1.131:50010:DISK through 192.168.1.130:50010 16/07/11 09:36:10 INFO balancer.Dispatcher: Successfully moved blk_1074049900_309167 with size=134217728 from 192.168.1.135:50010:DISK to 192.168.1.138:50010:DISK through 192.168.1.135:50010 16/07/11 09:36:16 INFO balancer.Dispatcher: Successfully moved blk_1074048061_307328 with size=134217728 from 192.168.1.130:50010:DISK to 192.168.1.131:50010:DISK through 192.168.1.130:50010 16/07/11 09:36:17 INFO balancer.Dispatcher: Successfully moved blk_1074049877_309144 with size=134217728 from 192.168.1.135:50010:DISK to 192.168.1.138:50010:DISK through 192.168.1.135:50010 如果你使用的是CDH集成平臺,也可以通過CM來執行數據重分布: 步驟1:先選擇HDFS 組件 的頁面,如下: 步驟2:找到頁面右側的操作選擇,從下拉框中選擇數據“重新平衡”選項 步驟3:確定“重新平衡”就開始安裝默認的設置規則重新分布DataNode的Block數據了,可以用CM的日志中查看具體的執行過程。 參考博客: https://www.2cto.com/net/201607/525222.html
來源:OSCHINA
發布時間:2019-11-29 09:28:00
經過一段時間的演化,spark-binlog,delta-plus慢慢進入正軌。spark-binlog可以將MySQL binlog作為標準的Spark數據源來使用,目前支持insert/update/delete 三種事件的捕捉。 delta-plus則是對Delta Lake的一個增強庫,譬如在Delta Plus里實現了將binlog replay進Detla表,從而保證Delta表和數據庫表接近實時同步。除此之外,detla-plus還集成了譬如布隆過濾器等來盡快數據更新更新速度。更多特性可參考我寫的專欄。 數據湖Delta Lake 深入解析 ? zhuanlan.zhihu.com 圖標 有了這兩個庫,加上Spark,我們就能通過兩行代碼完成庫表的同步。 以前如果要做數據增量同步,大概需要這么個流程: 問題很明顯,Pipeline長,涉及到技術多,中間轉存其實也挺麻煩的,難做到實時。我們希望可以更簡單些,比如最好是這樣: 然后我可能只要寫如下代碼就可以搞定: val spark: SparkSession = ??? val df = spark.readStream. format("org.apache.spark.sql.mlsql.sources.MLSQLBinLogDataSource"). option("host","127.0.0.1"). option("port","3306"). option("userName","xxxxx"). option("password","xxxxx"). option("databaseNamePattern","mlsql_console"). option("tableNamePattern","script_file"). optioin("binlogIndex","4"). optioin("binlogFileOffset","4"). load() df.writeStream. format("org.apache.spark.sql.delta.sources.MLSQLDeltaDataSource"). option(" path ","/tmp/sync/tables"). option("mode","Append"). option("idCols","id"). option("duration","5"). option("syncType","binlog"). checkpointLocation("/tmp/cpl-binlog2") .mode(OutputMode.Append).save("{db}/{table}") 讀和寫,非常簡單。讀你需要提供MySQL binlog信息,寫的時候指定主鍵,以及表的存儲路徑。 如果使用MLSQL則更簡單,下面是一個完整的流式同步腳本: set streamName="binlog"; load binlog.`` where host="127.0.0.1" and port="3306" and userName="xxxx" and password="xxxxxx" and bingLogNamePrefix="mysql-bin" and binlogIndex="4" and binlogFileOffset="4" and databaseNamePattern="mlsql_console" and tableNamePattern="script_file" as table1; save append table1 as rate. mysql_{db}.{table} options mode="Append" and idCols="id" and duration="5" and syncType="binlog" and checkpointLocation="/tmp/cpl-binlog2"; 因為是增量同步,所以第一次需要先全量同步一次,用MLSQL也很簡單: connect jdbc where url="jdbc: mysql://127.0.0.1:3306/mlsql_console?characterEncoding=utf8&zeroDateTimeBehavior=convertToNull&tinyInt1isBit=false " and driver="com.mysql.jdbc.Driver" and user="xxxxx" and password="xxxx" as db_cool; load jdbc. db_cool.script_file as script_file; save overwrite script_file as delta. mysql_mlsql_console.script_file ; load delta. mysql_mlsql_console.script_file as output; 如果你使用了Console則可在編輯器里直接運行: 如果你安裝了binlog2delta插件, 則可享受向導便利:
來源:OSCHINA
發布時間:2019-11-28 21:45:00
軟件發展到今,企業業務系統日趨復雜,開發一個業務系統需要掌握和關注的知識點越來越多。除實現業務邏輯本身,還需考慮很多非業務的基礎技術系統:如分布式cache和隊列、基礎服務能力集成、容量規劃、彈性伸縮等。這種情況下,研發門檻逐漸上升,效率逐漸下降。企業很難做到低成本創新、試錯和快速擴展業務。 阿里云Serverless應用引擎(簡稱SAE)產品的出現,很好地解決了這類問題。幫助 PaaS 層用戶免運維IaaS,按需使用,按量計費,提供了一系列通用能力,實現低門檻微服務/Web/多語言應用上云,有效解決成本及效率問題。 免運維、省成本是所有Serverless產品的核心優勢之一,SAE除了免運維底層IaaS外,還能讓用戶免部署和運維微服務注冊中心等組件,提供生產級別穩定可靠的微服務托管能力;免部署和運維K8s集群,零容器基礎的用戶也能擁抱K8s帶來的技術紅利。 很多企業在云上都會部署多套環境,存在很大的閑置浪費。使用SAE的“一鍵啟停開發測試環境”,按需釋放閑置資源,節省成本,需要使用時一鍵秒級拉起。后續SAE考慮基于K8s強大的編排能力,編排應用所需的DB、應用和應用的依賴,一鍵初始化拉起一套全新環境,以及多環境的克隆復制等。 云時代下彈性已成為新常態,很多業務場景無法提前預知,如天貓雙11、突發事件導致社交網站瞬時過載。和傳統彈性方案相比,SAE在成本和效率上都能做到極致?;诒O控觸發按需彈,不會出現資源浪費/不足,在效率上免去ECS擴容和ECS啟動的時間,能做到秒級彈性。 SAE三個主要指標數據:端到端啟動時長20s,滿足突發場景快速擴容的需要。支持0.5core的最小規格,進一步降低用戶使用成本。部署一套日常環境成本節省47%~57%。 據Serverless應用引擎(SAE)產品經理黛忻介紹,SAE繼續探索彈性效率和用戶成本的優化方案,繼續將一些基礎技術歸納抽象下沉到平臺,讓創新業務成為企業的唯一關注點。 據悉,阿里云是國內率先提供了面向應用的Serverless產品的云計算公司。截止目前,已有上百家企業通過 SAE 構建應用,實現業務的快速交付和IT成本優化。 原文鏈接 本文為云棲社區原創內容,未經允許不得轉載。
來源:OSCHINA
發布時間:2019-11-28 17:30:00
流媒體必定是5G市場上必不可少的一把利器,在云服務終端之下,流媒體的展現形式是多樣化的,我們再4G的的時代已經感受到了他無窮的魅力,我們如何看云服務的市場呢,云服務市場下的流媒體的未來就是本文主要闡述的內容   如果把云端比作一座城鎮,先后搬遷進來的居民們是形形色色的:IT企業率先發現了這片沃土開始挖掘建設,隨后金融、營銷咨詢、零售、醫療等等領域的企業也開始紛紛搬遷,不久后看這個城鎮景色繁華又生活便利,一些“家大業大”的企業也開始舉家遷移,例如工業、制造業、交通等等。   在這一過程中,有一位看似不起眼、卻又十分重要居民,就是流媒體。   流媒體,包括但不限于游戲、直播、VR、視頻等等大量內容行業。相比體量龐大的工業或制造業,這些領域的產業往往量級更輕,而且這些行業本身大多也是建設在完備的數字化基礎之上。這就導致,這些行業能否進行產業升級,很可能需要數據傳輸技術的支持。而當云計算真的著手對于流媒體領域進行改造時,迸發出的能量,卻又往往會超出人們的想象。   很多人認為,近年來云游戲的異軍突起崛起源于5G的抬頭,更快的數據傳輸速度,意味著終端與云端間更低的延遲,也意味著可以傳輸更龐大的數據體量,因此來支持游戲的高質量畫面,和對于玩家指令的響應。實際這種說法略顯片面,因為游戲發展的過程本身就是云化的過程。從PC游戲再到下載客戶端就能進行網游,再到“貪玩藍月”式的頁游,就連云游戲這一概念的首次提出都在2009年。降低硬件要求,隨時隨地進行游戲,本身就是游戲發展的一條路徑。但由于受網絡技術的限制,游戲在云化的過程中不得不割舍掉諸如畫質、即時反應等的優勢。   但隨著近年以來虛擬機技術和容器技術的提升,云計算廠商對于GPU虛擬化的能力不斷提高,可以給予云游戲更優質的計算資源,同時云計算廠商的網絡覆蓋能力加上不斷增強的邊緣計算能力,可以很好地解決網絡延遲問題。再結合音視頻編解碼技術、客戶端與服務器端的同步算法、網絡傳輸優化等等方面的提升,今天的云游戲平臺已經可以運行像《巫師》系列、《只狼》等原本對終端硬件有著很高要求的大作。   雖然今天的云游戲運行起來還常常會出現畫面質量不穩定、延遲卡頓等問題,但是不得不說,隨著云計算廠商能力的提升,游戲產業正在發生變化。同樣,直播、視頻、VR產業也以同樣的方式受到云計算發展的影響。視頻內容的分發、直播中的實時安全審核等等,實際也都是隨著云計算技術能力而得到的提升。
來源:OSCHINA
發布時間:2019-11-28 13:39:00
> Github原文鏈接 1 OOP-Klass(Ordinary Object Pointer)模型 OOP-Klass模型用來描述class的屬性和行為 設計為OOP和Klass兩部分是因為不希望每個對象都有一個C ++ vtbl指針, 因此,普通的oops沒有任何虛擬功能。 相反,他們將所有“虛擬”函數轉發到它們的klass,它具有vtbl并根據對象的實際類型執行C ++調度。 1.1 OOP oopDesc是對象類的最高父類。 {name}Desc類描述了Java對象的格式,可從C++訪問這些字段 路徑: /hotspot/share/oops/oop.hpp 完整的類層次結構,請閱讀 src/hotspot/share/oops/oopsHierarchy.hpp OOP體系 1.2 Klass Klass體系 Klass對象提供 語言級別的類對象(方法字典等) 為對象提供虛擬機調度行為 class Klass : public Metadata { friend class VMStructs; friend class JVMCIVMStructs; protected: // 如果添加指向任何元數據對象的新字段,則必須將此字段添加到Klass :: metaspace_pointers_do() // 注意:在klass結構的起始處將常用字段放在一起,以獲得更好的緩存行為(雖然可能不會有太大的區別,但可以肯定不會造成傷害) enum { _primary_super_limit = 8 }; // The "layout helper" is a combined descriptor of object layout. // For klasses which are neither instance nor array, the value is zero. // // For instances, layout helper is a positive number, the instance size. // This size is already passed through align_object_size and scaled to bytes. // The low order bit is set if instances of this class cannot be // allocated using the fastpath. // // For arrays, layout helper is a negative number, containing four // distinct bytes, as follows: // MSB:[tag, hsz, ebt, log2(esz)]:LSB // where: // tag is 0x80 if the elements are oops, 0xC0 if non-oops // hsz is array header size in bytes (i.e., offset of first element) // ebt is the BasicType of the elements // esz is the element size in bytes // This packed word is arranged so as to be quickly unpacked by the // various fast paths that use the various subfields. // // The esz bits can be used directly by a SLL instruction, without masking. // // Note that the array-kind tag looks like 0x00 for instance klasses, // since their length in bytes is always less than 24Mb. // // Final note: This comes first, immediately after C++ vtable, // because it is frequently queried. jint _layout_helper; // Klass identifier used to implement devirtualized oop closure dispatching. const KlassID _id; // The fields _super_check_offset, _secondary_super_cache, _secondary_supers // and _primary_supers all help make fast subtype checks. See big discussion // in doc/server_compiler/checktype.txt // // Where to look to observe a supertype (it is &_secondary_super_cache for // secondary supers, else is &_primary_supers[depth()]. juint _super_check_offset; // 類名. Instance classes: java/lang/String, etc. Array classes: [I, // [Ljava/lang/String;, etc. Set to zero for all other kinds of classes. Symbol* _name; // Cache of last observed secondary supertype Klass* _secondary_super_cache; // Array of all secondary supertypes Array* _secondary_supers; // Ordered list of all primary supertypes Klass* _primary_supers[_primary_super_limit]; // java/lang/Class instance mirroring this class OopHandle _java_mirror; // Superclass Klass* _super; // First subclass (NULL if none); _subklass->next_sibling() is next one Klass* volatile _subklass; // Sibling link (or NULL); links all subklasses of a klass Klass* volatile _next_sibling; // All klasses loaded by a class loader are chained through these links Klass* _next_link; // 用于加載此類的VM對類加載器的表示。 //提供訪問相應的java.lang.ClassLoader實例 ClassLoaderData* _class_loader_data; jint _modifier_flags; // 處理的訪問標志,由Class.getModifiers使用 AccessFlags _access_flags; // 訪問標志。 類/接口的區別就存儲在這里 JFR_ONLY(DEFINE_TRACE_ID_FIELD;) // 偏向鎖定實現和統計 // 64位塊優先,以避免碎片 jlong _last_biased_lock_bulk_revocation_time; markOop _prototype_header; // Used when biased locking is both enabled and disabled for this type jint _biased_lock_revocation_count; // 虛表長度 int _vtable_len; ... > 本文由博客一文多發平臺 OpenWrite 發布!
來源:OSCHINA
發布時間:2019-11-28 01:25:00
跨域支持 import org.springframework.context.annotation.Bean; import org.springframework.context.annotation.Configuration; import org.springframework.web.cors.CorsConfiguration; import org.springframework.web.cors.UrlBasedCorsConfigurationSource; import org.springframework.web.filter.CorsFilter; @Configuration public class CorsConfig { /** * 跨域支持 * * @return / @Bean public CorsFilter corsFilter() { final UrlBasedCorsConfigurationSource source = new UrlBasedCorsConfigurationSource(); final CorsConfiguration config = new CorsConfiguration(); config.setAllowCredentials(true); // 允許cookies跨域 config.addAllowedOrigin(" ");// #允許向該服務器提交請求的URI, 表示全部允許 config.addAllowedHeader(" ");// #允許訪問的頭信息, 表示全部 config.setMaxAge(18000L);// 預檢請求的緩存時間(秒),即在這個時間段里,對于相同的跨域請求不會再預檢了 config.addAllowedMethod(" ");// 允許提交請求的方法,*表示全部允許 source.registerCorsConfiguration("/**", config); return new CorsFilter(source); } } RestTemplate高并發下異常與配置 import org.apache.http.client.HttpClient; import org.apache.http.impl.client.HttpClientBuilder; import org.apache.http.impl.client.HttpClients; import org.apache.http.impl.conn.PoolingHttpClientConnectionManager; import org.springframework.cloud.client.loadbalancer.LoadBalanced; import org.springframework.context.annotation.Bean; import org.springframework.context.annotation.Configuration; import org.springframework.http.client.HttpComponentsClientHttpRequestFactory; import org.springframework.web.client.DefaultResponseErrorHandler; import org.springframework.web.client.RestTemplate; /** * RestTemplate高并發下異常與配置說明 1、java.util.ConcurrentModificationException 2、java.net.SocketTimeoutException Connection timed out */ @Configuration public class RestTemplateConfig { @Bean @LoadBalanced public RestTemplate restTemplate() { // 長連接 PoolingHttpClientConnectionManager pollingConnectionManager = new PoolingHttpClientConnectionManager(); // 總連接數 pollingConnectionManager.setMaxTotal(1000); // 同路由的并發數 pollingConnectionManager.setDefaultMaxPerRoute(1000); HttpClientBuilder httpClientBuilder = HttpClients.custom(); httpClientBuilder.setConnectionManager(pollingConnectionManager); // 重試次數,默認是3次,沒有開啟 // httpClientBuilder.setRetryHandler(new DefaultHttpRequestRetryHandler(3, true)); HttpClient httpClient = httpClientBuilder.build(); HttpComponentsClientHttpRequestFactory clientHttpRequestFactory = new HttpComponentsClientHttpRequestFactory( httpClient); // 連接超時 ms clientHttpRequestFactory.setConnectTimeout(12000); // 數據讀取超時時間,即SocketTimeout ms clientHttpRequestFactory.setReadTimeout(12000); // 連接不夠用的等待時間,不宜過長,必須設置,比如連接不夠用時,時間過長將是災難性的 clientHttpRequestFactory.setConnectionRequestTimeout(200); // 緩沖請求數據,默認值是true。通過POST或者PUT大量發送數據時,建議將此屬性更改為false,以免耗盡內存。 // clientHttpRequestFactory.setBufferRequestBody(false); RestTemplate restTemplate = new RestTemplate(); restTemplate.setRequestFactory(clientHttpRequestFactory); restTemplate.setErrorHandler(new DefaultResponseErrorHandler()); return restTemplate; } } json數據Long為String /** * @description: 返回json是轉換long為string @create: 2019-08-02 17:49 **/ import com.fasterxml.jackson.databind.ObjectMapper; import com.fasterxml.jackson.databind.module.SimpleModule; import com.fasterxml.jackson.databind.ser.std.ToStringSerializer; import org.springframework.context.annotation.Configuration; import org.springframework.http.converter.HttpMessageConverter; import org.springframework.http.converter.json.MappingJackson2HttpMessageConverter; import org.springframework.web.servlet.config.annotation.EnableWebMvc; import org.springframework.web.servlet.config.annotation.WebMvcConfigurer; import java.util.List; @EnableWebMvc @Configuration public class WebDataConvertConfig implements WebMvcConfigurer { @Override public void configureMessageConverters(List> converters) { MappingJackson2HttpMessageConverter jackson2HttpMessageConverter = new MappingJackson2HttpMessageConverter(); ObjectMapper objectMapper = new ObjectMapper(); /** * 序列換成json時,將所有的long變成string * 因為js中得數字類型不能包含所有的java long值 */ SimpleModule simpleModule = new SimpleModule(); simpleModule.addSerializer(Long.class, ToStringSerializer.instance); simpleModule.addSerializer(Long.TYPE, ToStringSerializer.instance); objectMapper.registerModule(simpleModule); jackson2HttpMessageConverter.setObjectMapper(objectMapper); converters.add(jackson2HttpMessageConverter); } } > 本文由作者pm1024:JAVA實驗手冊 發布,交流:583284584!
來源:OSCHINA
發布時間:2019-11-27 16:48:00
作者 | 易立 阿里云資深技術專家 containerd 是一個開源的行業標準容器運行時,關注于簡單、穩定和可移植,同時支持 Linux 和 Windows。 2016 年 12 月 14 日,Docker 公司宣布將 Docker Engine 的核心組件 containerd 捐贈到一個新的開源社區獨立發展和運營。阿里云、AWS、 Google、IBM 和 Microsoft 作為初始成員,共同建設 containerd 社區; 2017 年 3 月,Docker 將 containerd 捐獻給 CNCF(云原生計算基金會)。containerd 得到了快速的發展和廣泛的支持; Docker 引擎已經將 containerd 作為容器生命周期管理的基礎,Kubernetes 也在 2018 年 5 月,正式支持 containerd 作為容器運行時管理器; 2019 年 2 月,CNCF 宣布 containerd 畢業,成為生產可用的項目。 containerd 從 1.1 版本開始就已經內置了 Container Runtime Interface (CRI) 支持,進一步簡化了對 Kubernetes 的支持。其架構圖如下: 在 Kubernetes 場景下,containerd 與完整 Docker Engine 相比,具有更少的資源占用和更快的啟動速度。 圖片來源: containerd 紅帽主導的 cri-o 是與 containerd 競爭的容器運行時管理項目。containerd 與 cri-o 項目相比,在性能上具備優勢,在社區支持上也更加廣泛。 圖片來源: ebay 的分享 更重要的是 containerd 提供了靈活的擴展機制,支持各種符合 OCI(Open Container Initiative)的容器運行時實現,比如 runc 容器(也是熟知的 Docker 容器)、KataContainer、gVisor 和 Firecraker 等安全沙箱容器。 在 Kubernetes 環境中,可以用不同的 API 和命令行工具來管理容器 / Pod、鏡像等概念。為了便于大家理解,我們可以用下圖說明如何利用不同層次的 API 和 CLI 管理容器生命周期管理。 Kubectl:是集群層面的命令行工具,支持 Kubernetes 的基本概念 crictl :是針對節點上 CRI 的命令行工具 ctr :是針對 containerd 的命令行工具 體驗 Minikube 是體驗 containerd 作為 Kubernetes 容器運行時的最簡單方式,我們下面將其作為 Kubernetes 容器運行時,并支持 runc 和 gvisor 兩種不同的實現。 早期由于網絡訪問原因,很多朋友無法直接使用官方 Minikube 進行實驗。在最新的 Minikube 1.5 版本中,已經提供了完善的配置化方式,可以幫助大家利用阿里云的鏡像地址來獲取所需 Docker 鏡像和配置,同時支持 Docker/Containerd 等不同容器運行時。我們 創建 一個 Minikube 虛擬機環境,注意需要指明 --container-runtime=containerd 參數設置 containerd 作為容器運行時。同時 registry-mirror 也要替換成自己的阿里云鏡像加速地址。 $ minikube start --image-mirror-country cn \ --iso-url=https://kubernetes.oss-cn-hangzhou.aliyuncs.com/minikube/iso/minikube-v1.5.0.iso \ --registry-mirror=https://XXX.mirror.aliyuncs.com \ --container-runtime=containerd Darwin 10.14.6 上的 minikube v1.5.0 Automatically selected the 'hyperkit' driver (alternates: [virtualbox]) ? 您所在位置的已知存儲庫都無法訪問。正在將 registry.cn-hangzhou.aliyuncs.com/google_containers 用作后備存儲庫。 正在創建 hyperkit 虛擬機(CPUs=2,Memory=2000MB, Disk=20000MB)... ? VM is unable to connect to the selected image repository: command failed: curl -sS https://k8s.gcr.io/ stdout: stderr: curl: (7) Failed to connect to k8s.gcr.io port 443: Connection timed out : Process exited with status 7 正在 containerd 1.2.8 中準備 Kubernetes v1.16.2… 拉取鏡像 ... 正在啟動 Kubernetes ... ? Waiting for: apiserver etcd scheduler controller 完成!kubectl 已經配置至 "minikube" $ minikube dashboard Verifying dashboard health ... Launching proxy ... Verifying proxy health ... Opening http://127.0.0.1:54438/api/v1/namespaces/kubernetes-dashboard/services/http:kubernetes-dashboard:/proxy/ in your default browser... 部署測試應用 我們通過 Pod 部署一個 nginx 應用: $ cat nginx.yaml apiVersion: v1 kind: Pod metadata: name: nginx spec: containers: - name: nginx image: nginx $ kubectl apply -f nginx.yaml pod/nginx created $ kubectl exec nginx -- uname -a Linux nginx 4.19.76 #1 SMP Fri Oct 25 16:07:41 PDT 2019 x86_64 GNU/Linux 然后,我們開啟 minikube 對 gvisor 支持: $ minikube addons enable gvisor gvisor was successfully enabled $ kubectl get pod,runtimeclass gvisor -n kube-system NAME READY STATUS RESTARTS AGE pod/gvisor 1/1 Running 0 60m NAME CREATED AT runtimeclass.node.k8s.io/gvisor 2019-10-27T01:40:45Z $ kubectl get runtimeClass NAME CREATED AT gvisor 2019-10-27T01:40:45Z 當 gvisor pod 進入 Running 狀態的時候,可以部署 gvisor 測試應用。 我們可以看到 K8s 集群中已經注冊了一個 gvisor 的“runtimeClassName”。之后,開發者可以通過在 Pod 聲明中的 “runtimeClassName” 來選擇不同類型的容器運行時實現。比如,如下我們創建一個運行在 gvisor 沙箱容器中的 nginx 應用。 $ cat nginx-untrusted.yaml apiVersion: v1 kind: Pod metadata: name: nginx-untrusted spec: runtimeClassName: gvisor containers: - name: nginx image: nginx $ kubectl apply -f nginx-untrusted.yaml pod/nginx-untrusted created $ kubectl exec nginx-untrusted -- uname -a Linux nginx-untrusted 4.4 #1 SMP Sun Jan 10 15:06:54 PST 2016 x86_64 GNU/Linux 我們可以清楚地發現:由于基于 runc 的容器與宿主機共享操作系統內核,runc 容器中查看到的 OS 內核版本與 Minikube 宿主機 OS 內核版本相同;而 gvisor 的 runsc 容器采用了獨立內核,它和 Minikube 宿主機 OS 內核版本不同。 正是因為每個沙箱容器擁有獨立的內核,減小了安全攻擊面,具備更好的安全隔離特性。適合隔離不可信的應用,或者多租戶場景。注意:gvisor 在 minikube 中,通過 ptrace 對內核調用進行攔截,其性能損耗較大,此外 gvisor 的兼容性還有待增強。 使用 ctl 和 crictl 工具 我們現在可以進入進入 Minikube 虛擬機: $ minikube ssh containerd 支持通過名空間對容器資源進行隔離,查看現有 containerd 名空間: $ sudo ctr namespaces ls NAME LABELS k8s.io # 列出所有容器鏡像 $ sudo ctr --namespace=k8s.io images ls ... # 列出所有容器列表 $ sudo ctr --namespace=k8s.io containers ls 在 Kubernetes 環境更加簡單的方式是利用 crictl 對 pods 進行操作。 # 查看pod列表 $ sudo crictl pods POD ID CREATED STATE NAME NAMESPACE ATTEMPT 78bd560a70327 3 hours ago Ready nginx-untrusted default 0 94817393744fd 3 hours ago Ready nginx default 0 ... # 查看名稱包含nginx的pod的詳細信息 $ sudo crictl pods --name nginx -v ID: 78bd560a70327f14077c441aa40da7e7ad52835100795a0fa9e5668f41760288 Name: nginx-untrusted UID: dda218b1-d72e-4028-909d-55674fd99ea0 Namespace: default Status: Ready Created: 2019-10-27 02:40:02.660884453 +0000 UTC Labels: io.kubernetes.pod.name -> nginx-untrusted io.kubernetes.pod.namespace -> default io.kubernetes.pod.uid -> dda218b1-d72e-4028-909d-55674fd99ea0 Annotations: kubectl.kubernetes.io/last-applied-configuration -> {"apiVersion":"v1","kind":"Pod","metadata":{"annotations":{},"name":"nginx-untrusted","namespace":"default"},"spec":{"containers":[{"image":"nginx","name":"nginx"}],"runtimeClassName":"gvisor"}} kubernetes.io/config.seen -> 2019-10-27T02:40:00.675588392Z kubernetes.io/config.source -> api ID: 94817393744fd18b72212a00132a61c6cc08e031afe7b5295edafd3518032f9f Name: nginx UID: bfcf51de-c921-4a9a-a60a-09faab1906c4 Namespace: default Status: Ready Created: 2019-10-27 02:38:19.724289298 +0000 UTC Labels: io.kubernetes.pod.name -> nginx io.kubernetes.pod.namespace -> default io.kubernetes.pod.uid -> bfcf51de-c921-4a9a-a60a-09faab1906c4 Annotations: kubectl.kubernetes.io/last-applied-configuration -> {"apiVersion":"v1","kind":"Pod","metadata":{"annotations":{},"name":"nginx","namespace":"default"},"spec":{"containers":[{"image":"nginx","name":"nginx"}]}} kubernetes.io/config.seen -> 2019-10-27T02:38:18.206096389Z kubernetes.io/config.source -> api containerd 與 Docker 的關系 很多同學都關心 containerd 與 Docker 的關系,以及是否 containerd 可以取代 Docker? containerd 已經成為容器運行時的主流實現,也得到了 Docker 社區和 Kubernetes 社區的大力支持。Docker Engine 底層的容器生命周期管理也是基于 containerd 實現。 但是 Docker Engine 包含了更多的開發者工具鏈,比如鏡像構建。也包含了 Docker 自己的日志、存儲、網絡、Swarm 編排等能力。此外,絕大多數容器生態廠商,如安全、監控、開發等對 Docker Engine 的支持比較完善,對 containerd 的支持也在逐漸補齊。 所以在 Kubernetes 運行時環境,對安全和效率和定制化更加關注的用戶可以選擇 containerd 作為容器運行時環境;對于大多數開發者,繼續使用 Docker Engine 作為容器運行時也是一個不錯的選擇。 阿里云容器服務對 containerd 的支持 在阿里云 Kubernetes 服務 ACK,我們已經采用 containerd 作為容器運行時管理,來支撐安全沙箱容器和 runc 容器的混合部署。在現有產品中,我們和阿里云操作系統團隊、螞蟻金服一起支持了基于輕量虛擬化的 runV 沙箱容器,4Q 也將和操作系統團隊、安全團隊合作發布基于 Intel SGX 的可信加密沙箱容器。 具體產品信息可以參考 該文檔 。 Serverless Kubernetes(ASK)中,我們也利用 containerd 靈活的插件機制定制和剪裁了面向 nodeless 環境的容器運行時實現。 原文鏈接 containerd 與安全沙箱的 Kubernetes 初體驗 原文鏈接 本文為云棲社區原創內容,未經允許不得轉載。
來源:OSCHINA
發布時間:2019-11-27 15:58:00
前言 Apache Dubbo 是由阿里開源的一個RPC框架,除了基本的 RPC 功能以外,還提供了一整套的服務治理相關功能。目前它已經是 Apache 基金會下的頂級項目。 而 dubbo-go 則是 Dubbo 的 Go 語言實現。 最近在 dubbo-go 的 todo list 上發現,它還沒有實現 TPS Limit 的模塊,于是就抽空實現了這個部分。 TPS limit 實際上就是限流,比如說限制一分鐘內某個接口只能訪問 200 次,超過這個次數,則會被拒絕服務。在 Dubbo 的 Java 版本上,只有一個實現,就是 DefaultTPSLimiter 。 DefaultTPSLimiter 是在服務級別上進行限流。雖然 Dubbo 的官方文檔里面聲稱可以在 method 級別上進行限流,但是我看了一下它的源碼,實際上這個是做不到的。當然,如果自己通過實現 Filter 接口來實現 method 級別的限流,那么自然是可以的——這樣暴露了 Dubbo Java 版本實現的另外一個問題,就是 Dubbo 的 TpsLimitFilter 實現,是不允許接入自己 TpsLimiter 的實現的。這從它的源碼也可以看出來: 它直接寫死了 TpsLimiter 的實現。 這個實現的目前只是合并到了 develop 上,等下次發布正式版本的時候才會發布出來。 GitHub: https://github.com/apache/dubbo-go/pull/237 設計思路 于是我大概參考了一下 Dubbo 已有的實現,做了一點改進。 Dubbo 里面的核心抽象是 TpsLimiter 接口。 TpsLimitFilter 只是簡單調用了一下這個接口的方法而已: 這個抽象是很棒的。但是還欠缺了一些抽象。 實際上,一個 TPS Limit 就要解決三個問題: 對什么東西進行 limit 。比如說,對服務進行限流,或者對某個方法進行限流,或者對IP進行限流,或者對用戶進行限流; 如何判斷已經 over limitation 。這是從算法層面上考慮,即用什么算法來判斷某個調用進來的時候,已經超過配置的上限了; 被拒絕之后該如何處理。如果一個請求被斷定為已經 over limititation 了,那么該怎么處理; 所以在 TpsLimiter 接口的基礎上,我再加了兩個抽象: TpsLimiter TpsLimitStrategy RejectedExecutionHandler TpsLimiter 對應到 Java 的 TpsLimiter ,兩者是差不多。在我的設想里面,它既是頂級入口,還需要承擔解決第一個問題的職責。 而 TpsLimitStrategy 則是第二個問題的抽象的接口定義。它代表的是純粹的算法。該接口完全沒有參數,實際上,所有的實現需要維護自身的狀態——對于大部分實現而言,它大概只需要獲取一下系統時間戳,所以不需要參數。 最后一個接口 RejectedExecutionHandler 代表的是拒絕策略。在 TpsLimitFilter 里面,如果它調用 TpsLimiter 的實現,發現該請求被拒絕,那么就會使用該接口的實現來獲取一個返回值,返回給客戶端。 實現 其實實現沒太多好談的。不過有一些微妙的地方,我雖然在代碼里面注釋了,但是我覺得在這里再多說一點也是可以的。 首先提及的就是拒絕策略 RejectedExecutionHandler ,我就是提供了一種實現,就是隨便 log 了一下,什么都沒做。因為這個東西是強業務相關的,我也不能提供更加多的通用的實現。 方法與服務雙重支持的 TpsLimiter TpsLimiter 我只有一個實現,那就是 MethodServiceTpsLimiterImpl 。它就是根據配置,如果方法級別配置了參數,那么會在方法級別上進行限流。否則,如果在服務級別( ServiceKey )上有配置,那么會在服務級別進行限流。 舉個最復雜的例子:服務 A 限制 100 ,有四個方法,方法 M1 配置限制 40 ,方法 M2 和方法 M3 無配置,方法M4配置限制 -1 :那么方法 M1 會單獨限流 40 ; M2 和 M3 合并統計,被限制在 100 ;方法 M4 則會被忽略。 用戶可以配置具體的算法。比如說使用我接下來說的,我已經實現的三種實現。 FixedWindow 和 ThreadSafeFixedWindow FixedWindow 直接對應到 Java 的 DefaultTpsLimiter 。它采用的是 fixed-window 算法:比如說配置了一分鐘內只能調用 100 次。假如從 00:00 開始計時,那么 00:00-01:00 內,只能調用 100 次。只有到達 01:00 ,才會開啟新的窗口 01:00-02:00 。如圖: Fixed-Window圖示 Fixed-Window實現 這里有一個很有意思的地方。就是這個實現,是一個幾乎線程安全但是其實并不是線程安全的實現。 在所有的實現里面,它是最為簡單,而且性能最高的。我在衡量了一番之后,還是沒把它做成線程安全的。事實上, Java 版本的也不是線程安全的。 它只會在多個線程通過第 67 行的檢測之后,才會出現并發問題,這個時候就不是線程安全了。但是在最后的 return 語句中,那一整個是線程安全的。它因為不斷計數往上加,所以多個線程同時跑到這里,其實不會有什么問題。 現在我要揭露一個最為奇詭的特性了:并發越高,那么這個 race condition 就越嚴重,也就是說越不安全。 但是從實際使用角度而言,有極端 TPS 的還是比較少的。對于那些 TPS 只有幾百每秒的,是沒什么問題的。 為了保持和 Dubbo 一致的特性,我把它作為默認的實現。 此外,我還為它搞了一個線程安全版本,也就是 ThreadSafeFixedWindowTpsLimitStrategyImpl ,只是簡單的用 sync 封裝了一下,可以看做是一個 Decorator 模式的應用。 如果強求線程安全,可以考慮使用這個。 SlidingWindow 這是我比較喜歡的實現。它跟網絡協議里面的滑動窗口算法在理念上是比較接近的。 具體來說,假如我設置的同樣是一分鐘 1000 次,它統計的永遠是從當前時間點往前回溯一分鐘內,已經被調用了多少次。如果這一分鐘內,調用次數沒超過 1000 ,請求會被處理,如果已經超過,那么就會拒絕。 我再來描述一下, SldingWindow 和 FixedWindow 兩種算法的區別。這兩者很多人會搞混。假如當前的時間戳是 00:00 ,兩個算法同時收到了第一個請求,開啟第一個時間窗口。 那么 FixedWindow 就是 00:00-01:00 是第一個窗口,接下來依次是 01:00-02:00 , 02:00-03:00 , ...。當然假如說 01:00 之后的三十秒內都沒有請求,在 01:31 又來了一個請求,那么時間窗口就是 01:31-02:31 。 而 SildingWindow 則沒有這種概念。假如在 01:30 收到一個請求,那么 SlidingWindow 統計的則是 00:30-01:30 內有沒有達到 1000 次。它永遠計算的都是接收到請求的那一刻往前回溯一分鐘的請求數量。 如果還是覺得有困難,那么簡單來說就是 FixedWindow 往后看一分鐘, SlidingWindow 回溯一分鐘。 這個說法并不嚴謹,只是為了方便理解。 在真正寫這個實現的時候,我稍微改了一點點: 我用了一個隊列來保存每次訪問的時間戳。一般的寫法,都是請求進來,先把已經不在窗口時間內的時間戳刪掉,然后統計剩下的數量,也就是后面的 slow path 的那一堆邏輯。 但是我改了的一點是,我進來直接統計隊列里面的數量——也就是請求數量,如果都小于上限,那么我可以直接返回 true ,即 quick path 。 這種改進的核心就是:我只有在檢測到當前隊列里面有超過上限數量的請求數量時候,才會嘗試刪除已經不在窗口內的時間戳。 這其實就是,是每個請求過來,我都清理一下隊列呢?還是只有隊列元素超出數量了,我才清理呢?我選擇的是后者。 我認為這是一種改進……當然從本質上來說,整體開銷是沒有減少的——因為 golang 語言里面 List 的實現,一次多刪除幾個,和每次刪除一個,多刪幾次,并沒有多大的區別。 算法總結 無論是 FixedWindow 算法還是 SlidingWindow 算法都有一個固有的缺陷,就是這個時間窗口難控制。 我們設想一下,假如說我們把時間窗口設置為一分鐘,允許 1000 次調用。然而,在前十秒的時候就調用了 1000 次。在后面的五十秒,服務器雖然將所有的請求都處理完了,然是因為窗口還沒到新窗口,所以這個時間段過來的請求,全部會被拒絕。 解決的方案就是調小時間窗口,比如調整到一秒。但是時間窗口的縮小,會導致 FixedWindow 算法的 race condition 情況加劇。 那些沒有實現的 基于特定業務對象的限流 舉例來說,某些特殊業務用的針對用戶 ID 進行限流和針對 IP 進行限流,我就沒有在 dubbo-go 里面實現。有需要的可以通過實現 TpsLimiter 接口來完成。 全局 TPS limit 這篇文章之前討論的都是單機限流。如果全局限流,比如說針對某個客戶,它購買的服務是每分鐘調用 100 次,那么就需要全局限流——雖然這種 case 都不會用 Filter 方案,而是另外做一個 API 接入控制。 比如說,很常用的使用 Redis 進行限流的。針對某個客戶,一分鐘只能訪問 100 次,那我就用客戶 ID 做 key , value 設置成 List ,每次調用過來,隨便塞一個值進去,設置過期時間一分鐘。那么每次統計只需要統計當前 key 的存活的值的數量就可以了。 這種我也沒實現,因為好像沒什么需求。國內討論 TPS limit 都是討論單機 TPS limit 比較多。 這個同樣可以通過實現 TpsLimiter 接口來實現。 Leaky Bucket 算法 這個本來可以是 TpsLimitStrategy 的一種實現的。后來我覺得,它其實并沒有特別大的優勢——雖然號稱可以做到均勻,但是其實并做不到真正的均勻。通過調整 SlidingWindow 的窗口大小,是可以接近它宣稱的均勻消費的效果的。比如說調整到一秒,那其實就已經很均勻了。而這并不會帶來多少額外的開銷。 作者信息: 鄧明,畢業于南京大學,就職于eBay Payment部門,負責退款業務開發 原文鏈接 本文為云棲社區原創內容,未經允許不得轉載。
來源:OSCHINA
發布時間:2019-11-27 15:49:00
通過Eigen的矩陣運算,將點云進行Z軸旋轉45°,再沿X軸平移2.5. http://pointclouds.org/documentation/tutorials/matrix_transform.php#matrix-transform #include #include pcl::PointCloud::Ptr source_cloud(new pcl::PointCloud()); pcl::PointCloud::Ptr transformed_cloud(new pcl::PointCloud()); float theta = M_PI / 4; // The angle of rotation in radians Eigen::Affine3f transform_2 = Eigen::Affine3f::Identity(); // Define a translation of 2.5 meters on the x axis. transform_2.translation() << 2.5, 0.0, 0.0; // The same rotation matrix as before; theta radians around Z axis transform_2.rotate(Eigen::AngleAxisf(theta, Eigen::Vector3f::UnitZ())); // Print the transformation printf("\nMethod #2: using an Affine3f\n"); std::cout << transform_2.matrix() << std::endl; // Executing the transformation // You can either apply transform_1 or transform_2; they are the same pcl::transformPointCloud(*source_cloud, *transformed_cloud, transform_2);
來源:OSCHINA
發布時間:2019-11-27 15:29:00
HTTP Proxy Demo 代碼 1、Python #! -- encoding:utf-8 -- import requests # 要訪問的目標頁面 targetUrl = "http://ip.hahado.cn/ip" # 代理服務器 proxyHost = "ip.hahado.cn" proxyPort = "39010" # 代理隧道驗證信息 proxyUser = "username" proxyPass = "password" proxyMeta = "http://%(user)s:%(pass)s@%(host)s:%(port)s" % { "host" : proxyHost, "port" : proxyPort, "user" : proxyUser, "pass" : proxyPass, } proxies = { "http" : proxyMeta, "https" : proxyMeta, } resp = requests.get(targetUrl, proxies=proxies) print resp.status_code print resp.text 2、C Sharp HttpWebRequest request = (HttpWebRequest)WebRequest.Create("http://ip.hahado.cn/ip"); WebProxy myProxy = new WebProxy(); Uri newUri = new Uri("http://ip.hahado.cn:39010"); myProxy.Address = newUri; myProxy.Credentials = new NetworkCredential("username", "password"); request.Proxy = myProxy; 3、PHP // 要訪問的目標頁面 $targetUrl = "http://ip.hahado.cn/ip"; //$targetUrl = "http://ip.hahado.cn/switch-ip"; //$targetUrl = "http://ip.hahado.cn/current-ip"; // 代理服務器 define("PROXY_SERVER", "ip.hahado.cn:39010"); // 隧道身份信息 define("PROXY_USER", "username"); define("PROXY_PASS", "password"); $proxyAuth = base64_encode(PROXY_USER . ":" . PROXY_PASS); $headers = implode("\r\n", [ "Proxy-Authorization: Basic {$proxyAuth}", "Proxy-Switch-Ip: yes", ]); $options = [ "http" => [ "proxy" => $proxyServer, "header" => $headers, "method" => "GET", ], ]; $context = stream_context_create($options); $result = file_get_contents($url, false, $context); var_dump($result); 4、JAVA import java.io.ByteArrayOutputStream; import java.io.InputStream; import java.net.Authenticator; import java.net.HttpURLConnection; import java.net.InetSocketAddress; import java.net.PasswordAuthentication; import java.net.Proxy; import java.net.URL; class ProxyAuthenticator extends Authenticator { private String user, password; public ProxyAuthenticator(String user, String password) { this.user = user; this.password = password; } protected PasswordAuthentication getPasswordAuthentication() { return new PasswordAuthentication(user, password.toCharArray()); } } /** * 注意:下面代碼僅僅實現HTTP請求鏈接,每一次請求都是無狀態保留的,僅僅是這次請求是更換IP的,如果下次請求的IP地址會改變 * 如果是多線程訪問的話,只要將下面的代碼嵌入到你自己的業務邏輯里面,那么每次都會用新的IP進行訪問,如果擔心IP有重復, * 自己可以維護IP的使用情況,并做校驗。 */ public class ProxyDemo { public static void main(String args[]) throws Exception { // 要訪問的目標頁面 String targetUrl = "http://ip.hahado.cn/ip"; //String targetUrl = "http://ip.hahado.cn/switch-ip"; //String targetUrl = "http://ip.hahado.cn/current-ip"; // 代理服務器 String proxyServer = "ip.hahado.cn"; int proxyPort = 39010; // 代理隧道驗證信息 String proxyUser = "username"; String proxyPass = "password"; try { URL url = new URL(targetUrl); Authenticator.setDefault(new ProxyAuthenticator(proxyUser, proxyPass)); // 創建代理服務器地址對象 InetSocketAddress addr = new InetSocketAddress(proxyServer, proxyPort); // 創建HTTP類型代理對象 Proxy proxy = new Proxy(Proxy.Type.HTTP, addr); // 設置通過代理訪問目標頁面 HttpURLConnection connection = (HttpURLConnection) url.openConnection(proxy); // 設置IP切換頭 connection.setRequestProperty("Proxy-Switch-Ip","yes"); // 解析返回數據 byte[] response = readStream(connection.getInputStream()); System.out.println(new String(response)); } catch (Exception e) { System.out.println(e.getLocalizedMessage()); } } /** * 將輸入流轉換成字符串 * * @param inStream * @return * @throws Exception */ public static byte[] readStream(InputStream inStream) throws Exception { ByteArrayOutputStream outSteam = new ByteArrayOutputStream(); byte[] buffer = new byte[1024]; int len = -1; while ((len = inStream.read(buffer)) != -1) { outSteam.write(buffer, 0, len); } outSteam.close(); inStream.close(); return outSteam.toByteArray(); } } 5、golang package main import ( "net/url" "net/http" "bytes" "fmt" "io/ioutil" ) const ProxyServer = "ip.hahado.cn:39010" type ProxyAuth struct { License string SecretKey string } func (p ProxyAuth) ProxyClient() http.Client { proxyURL, _ := url.Parse("http://" + p.License + ":" + p.SecretKey + "@" + ProxyServer) return http.Client{Transport: &http.Transport{Proxy:http.ProxyURL(proxyURL)}} } func main() { targetURI := "http://ip.hahaod.cn/ip" //targetURI := "http://ip.hahaod.cn/switch-ip" //targetURI := "http://ip.hahaod.cn/current-ip" // 初始化 proxy http client client := ProxyAuth{License: "username", SecretKey: "password"}.ProxyClient() request, _ := http.NewRequest("GET", targetURI, bytes.NewBuffer([] byte(``))) // 切換IP (只支持 HTTP) request.Header.Set("Proxy-Switch-Ip", "yes") response, err := client.Do(request) if err != nil { panic("failed to connect: " + err.Error()) } else { bodyByte, err := ioutil.ReadAll(response.Body) if err != nil { fmt.Println("讀取 Body 時出錯", err) return } response.Body.Close() body := string(bodyByte) fmt.Println("Response Status:", response.Status) fmt.Println("Response Header:", response.Header) fmt.Println("Response Body:\n", body) } } 提取代理IP連接: https://v.duoip.cn/customer/signup/?sale=xujinyang1991
來源:OSCHINA
發布時間:2019-11-27 11:52:00
極簡教程-Python的容器部署 場景描述:我們使用一個簡單的python項目,本項目是中文分詞的算法。如何實現Docker安裝部署。 第一步: Win10下創建目錄文本。選擇在D盤下創建docker目錄,分別新建三個文件:Dockerfile,app.py,equirements.txt Dockerfile(沒有后綴):一個文本文件,包含了一條條的指令(Instruction),每一條指令構建一層,因此每一條指令的內容,就是描述該層應當如何構建。創建鏡像必須文件。 # 基于鏡像基礎 FROM python:3.7 # 設置代碼文件夾工作目錄 /app WORKDIR /app # 復制當前代碼文件到容器中 /app ADD . /app # 安裝所需的包 RUN pip install -r requirements.txt # Run app.py when the container launches CMD ["python", "app.py"] app.py:python項目的源代碼,這里測試的單個python文件,如果是一個完整項目,可以將整個文件夾拷貝到這里。 # coding:utf8 ? """ DESC: Python數據預處理之第一個分詞程序范例 Author:伏草惟存 Prompt: code in Python3 env """ ? import jieba ? str = "道路千萬條,安全第一條;行車不規范,親人兩行淚。" print("原句: \n" + str) ? seg_list = jieba.cut(str) print("分詞: \n" + " / ".join(seg_list)) equirements.txt :所需要的插件,以python為例,其獲取方法是cmd命令,進入到【D:\docker】目錄,執行命令:pip freeze > requirements.txt 第二步:生成鏡像。本文采用的windows環境。docker build -t friendlyhello .命令中最后的點不要忘記,這里表示當前目錄 第三步:查看鏡像是否生成 第四步:運行鏡像程序,這里可以看到分詞效果
來源:OSCHINA
發布時間:2019-11-27 09:43:00
服務器風扇的作用是加快散熱片表面空氣的流動速度,以提高散熱片和空氣的熱交換速度。風扇作為風冷散熱器的兩大重要部件之一,它的性能的好壞往往對服務器散熱器效果和使用壽命起著一定的決定性作用。在選購服務器風扇的時候,考慮風扇的基本指標有以下幾點:      1、風扇功率      功率越大,風扇風力越強勁,散熱效果也就越好。而風扇的功率與風扇的轉速又是有直接聯系的,也就是說風扇的轉速越高,風扇也就越強勁有力。      2、風扇轉速      風扇的轉速與風扇的功率是密不可分的,轉速的大小直接影響到風扇功率的大小。風扇的轉速越高,向CPU傳送的進風量就越大,CPU獲得的冷卻效果就會越好。但是一旦風扇的轉速超過它的額定值,那么風扇在長時間超負荷運作之下,本身產生熱量也會增高,而且時間越長產生的熱量也就越大,此時風扇不但不能起到很好的冷卻效果,反而會“火上澆油”。      另外,風扇在高速動轉過程中,可能會產生很強的噪音,時間長了可能會縮短風扇壽命;還有,較高的運轉速度需要較大的功率來提供“動力源”,而高動力源又是從主板和電源中的功率中獲得的,一旦超出主板的負荷就會引起系統的不穩定。因此,我們在選擇風扇的,同時應該平衡風扇的轉速和發熱量之間的關系,最好選擇轉速在3500轉至5200轉之間的風扇。      3、風扇材質      CPU發出熱量首先傳導到散熱片,再由風扇帶來的冷空氣吹拂而把散熱片的熱量帶走,而風扇所能傳導的熱量快慢是由組成風扇的導熱片的材質決定的,因此風扇的材料質量對熱量的傳導性能具有很大的作用,為此我們在選擇風扇時一定要注意風扇導熱片的熱傳導性能是否良好。      4、風扇噪聲      太大的噪音將會影響我們操作電腦的心情。噪音太小通常與風扇的功率有關,功率越大、轉速也就越快,此時一個負影響也就表現出來了,那就是噪聲。我們在購買風扇時,一定要先試聽一下風扇的噪音,如果太大,那么最好是不要買。如今風扇為了減輕噪聲都投入了一些設計,例如改變扇葉的角度,增加扇軸的潤滑度和穩定度等。      現在有很多便宜的風扇用的軸承都是油封的,由銅質外套和鋼制軸芯組成,長時間工作之后扇軸潤滑度不夠,風扇噪音增大、轉速減低,這很容易導致機器過熱而出現死機現象,嚴重的時候還有可能把機芯燒壞。      現在有許多知名品牌的風扇開始使用滾珠軸承,這種軸承就是利用許多鋼珠來作為減少摩擦的介質。這種滾珠風扇的特點就是風力大,壽命長、噪音小,但成本比較高,只有高檔風扇才可能使用到它。      5、風扇排風量      風扇排風量可以說是一個比較綜合的指標,因此我們可以這么說排風量是衡量一個風扇性能的最直接因素。如果一個風扇可以達到5000轉/分,不過如果扇葉是扁平的話,那是不會形成任何氣流的,所以關系到散熱風扇的排風量的時候,扇葉的角度也是很重要的一個因素。測試一個風扇排風量的方法很容易,只要將手放在散熱片附近感受一下吹出的風的強度即可,通常質量好的風扇,即使我們在離它很遠的位置,也仍然可以感到風流,這就是散熱效果上佳的表現。      6、風扇葉片      同一風扇如果其他部分保持不變,只將葉片由五扇葉改為七扇葉,風量變化可能不會增加多少。但是就風扇的轉速而言,七扇葉的轉速會低于五扇葉(通風量相同的情況下),相對的如果采用七扇葉風扇,軸承的磨損,漏油情況較少,風扇的壽命較長。如果五扇葉和七扇葉的轉速相同,七扇葉的通風量會更大。風扇的轉速越高,相應的壽命就越短,噪音也越大。另外,風扇的扇葉越厚,葉片斜角越大,則風壓也越大。扇葉的入口角(以45度為最大)也是決定風扇通風量的重要因素之一。      我們知道,服務器AMD CPU的發熱量比INTEL大,但是AMD CPU所能承受的最高溫度也比INTEL高。正由于AMD CPU發熱量大,相對與AMD CPU來說,風扇散熱片底部的厚度越厚越好,而INTEL的發熱量小,散熱片的厚度可以小一些。由于散熱片的厚度要求不同,最終對風扇的要求也不同。      對于底部較厚的散熱片,它可以很快吸收到CPU的熱量,存儲的熱量也更多。為了不使CPU長期工作在高溫環境下。除了要求散熱片本身的導熱性較好以外,還需要更大的風流來吹散CPU熱量。如果要把底部的熱量吹走,就需要風扇產生足夠的風壓,能將風流吹到散熱片的底部,對流方式的散熱才能從底部開始進行。
來源:OSCHINA
發布時間:2019-11-26 14:23:00
前面文章介紹過 Hadoop分布式的配置 ,但是設計到高可用,這次使用zookeeper配置Hadoop高可用。 1.環境準備 1)修改IP 2)修改主機名及主機名和IP地址的映射 3)關閉防火墻 4)ssh免密登錄 5)創建hadoop用戶和用戶組 6)安裝更新安裝源、JDK、配置環境變量等 2.服務器規劃 Node1 Node2 Node3 NameNode NameNode JournalNode JournalNode JournalNode DataNode DataNode DataNode ZK ZK ZK ResourceManager NodeManager NodeManager ResourceManager NodeManager 3.配置Zookeeper集群 參考我的之前的文章 Zookeeper安裝和配置說明 4.安裝Hadoop 1)官方下載地址:http://hadoop.apache.org/ 2)解壓hadoop2.7.2至/usr/local/hadoop2.7 3)修改hadoop2.7的所屬組和所屬者為hadoop chown -R hadoop:hadoop /usr/local/hadoop2.7 4)配置HADOOP_HOME vim /etc/profile #HADOOP_HOME export HADOOP_HOME=/usr/local/hadoop2.7 export HADOOP_CONF_DIR=${HADOOP_HOME}/etc/hadoop export HADOOP_COMMON_LIB_NATIVE_DIR=${HADOOP_HOME}/lib/native export PATH=$HADOOP_HOME/bin:$HADOOP_HOME/sbin:$PATH 5.配置Hadoop集群高可用 5.1配置HDFS集群 hadoop-env.sh export JAVA_HOME=/usr/local/jdk1.8.0_221 hadoop-site.xml dfs.nameservices hadoopCluster dfs.ha.namenodes.hadoopCluster nn1,nn2 dfs.namenode.rpc-address.hadoopCluster.nn1 node1:9000 dfs.namenode.rpc-address.hadoopCluster.nn2 node2:9000 dfs.namenode.http-address.hadoopCluster.nn1 node1:50070 dfs.namenode.http-address.hadoopCluster.nn2 node2:50070 dfs.namenode.shared.edits.dir qjournal://node1:8485;node2:8485;node3:8485/hadoopCluster dfs.ha.fencing.methods sshfence dfs.ha.fencing.ssh.private-key-files /home/hadoop/.ssh/id_rsa dfs.journalnode.edits.dir /data_disk/hadoop/jn dfs.permissions.enable false dfs.client.failover.proxy.provider.hadoopCluster org.apache.hadoop.hdfs.server.namenode.ha.ConfiguredFailoverProxyProvider dfs.namenode.name.dir file:///data_disk/hadoop/name 為了保證元數據的安全一般配置多個不同目錄 dfs.datanode.data.dir file:///data_disk/hadoop/data datanode 的數據存儲目錄 dfs.replication 3 HDFS的數據塊的副本存儲個數,默認是3 core-site.xml fs.defaultFS hdfs://hadoopCluster hadoop.tmp.dir file:///data_disk/hadoop/tmp 啟動hadoop集群 (1)在各個JournalNode節點上,輸入以下命令啟動journalnode服務 sbin/hadoop-daemon.sh start journalnode (2)在[nn1]上,對其進行格式化,并啟動 bin/hdfs namenode -format sbin/hadoop-daemon.sh start namenode (3)在[nn2]上,同步nn1的元數據信息 bin/hdfs namenode -bootstrapStandby (4)啟動[nn2] sbin/hadoop-daemon.sh start namenode (5)在[nn1]上,啟動所有datanode sbin/hadoop-daemons.sh start datanode (6)將[nn1]切換為Active bin/hdfs haadmin -transitionToActive nn1 (7)查看是否Active bin/hdfs haadmin -getServiceState nn1 打開瀏覽器查看namenode的狀態 5.2配置HDFS自動故障轉移 在hdfs-site.xml中增加 dfs.ha.automatic-failover.enabled true 在core-site.xml文件中增加 ha.zookeeper.quorum node1:2181,node2:2181,node3:2181 5.2.1啟動 (1)關閉所有HDFS服務: sbin/stop-dfs.sh (2)啟動Zookeeper集群: bin/zkServer.sh start (3)初始化HA在Zookeeper中狀態: bin/hdfs zkfc -formatZK (4)啟動HDFS服務: sbin/start-dfs.sh (5)在各個NameNode節點上啟動DFSZK Failover Controller,先在哪臺機器啟動,哪個機器的NameNode就是Active NameNode sbin/hadoop-daemon.sh start zkfc 5.2.2驗證 (1)將Active NameNode進程kill kill -9 namenode的進程id (2)將Active NameNode機器斷開網絡 service network stop 如果kill nn1后nn2沒有變成active,可能有以下原因 (1)ssh免密登錄沒配置好 (2)未找到fuster程序,導致無法進行fence,參考 博文 5.3YARN-HA配置 yarn-site.xml yarn.nodemanager.aux-services mapreduce_shuffle yarn.resourcemanager.ha.enabled true yarn.resourcemanager.cluster-id cluster-yarn1 yarn.resourcemanager.ha.rm-ids rm1,rm2 yarn.resourcemanager.hostname.rm1 node1 yarn.resourcemanager.hostname.rm2 node3 yarn.resourcemanager.zk-address node1:2181,node2:2181,node3:2181 yarn.resourcemanager.recovery.enabled true yarn.resourcemanager.store.class org.apache.hadoop.yarn.server.resourcemanager.recovery.ZKRMStateStore 5.3.1啟動HDFS (1)在各個JournalNode節點上,輸入以下命令啟動journalnode服務: sbin/hadoop-daemon.sh start journalnode (2)在[nn1]上,對其進行格式化,并啟動: bin/hdfs namenode -format sbin/hadoop-daemon.sh start namenode (3)在[nn2]上,同步nn1的元數據信息: bin/hdfs namenode -bootstrapStandby (4)啟動[nn2]: sbin/hadoop-daemon.sh start namenode (5)啟動所有DataNode sbin/hadoop-daemons.sh start datanode (6)將[nn1]切換為Active bin/hdfs haadmin -transitionToActive nn1 5.3.2啟動YARN (1)在node1中執行: sbin/start-yarn.sh (2)在node3中執行: sbin/yarn-daemon.sh start resourcemanager (3)查看服務狀態 bin/yarn rmadmin -getServiceState rm1
來源:OSCHINA
發布時間:2019-11-26 10:12:00
漏洞描述 Apache Flink是一個用于分布式流和批處理數據的開放源碼平臺。Flink的核心是一個流數據流引擎,它為數據流上的分布式計算提供數據分發、通信和容錯功能。Flink在流引擎之上構建批處理,覆蓋本地迭代支持、托管內存和程序優化。近日有安全研究人員發現apache flink允許上傳任意的jar包從而導致遠程代碼執行。 漏洞級別 高危 影響范圍 Apache Flink <=1.9.1 漏洞復現 首先下載Apache Flink 1.9.1安裝包并進行解壓,之后進入bin文件夾內運行./start-cluster.sh啟動環境,瀏覽器訪問 http://ip:8081驗證是否成功,如下圖所示: 接著使用生成jar的木馬文件并進行上傳,如下圖所示: 開啟msf進行監聽并點擊提交,可看到成功返回一個shell。如下圖所示: 修復建議 建議用戶關注Apache Flink官網,及時獲取該漏洞最新補丁。 臨時解決建議 設置IP白名單只允許信任的IP訪問控制臺并添加訪問認證。 漏洞檢測方法 目前github已有相應公開的檢測poc,如下圖所示: 鏈接: https://github.com/LandGrey/flink-unauth-rce ? 更多Flink相關博文歡迎關注實時流式計算 本文由博客一文多發平臺 OpenWrite 發布!
來源:OSCHINA
發布時間:2019-11-26 09:25:00
集群容錯中的第二個關鍵詞Router,中文意思就是路由 前端的路由和后端的路由他們是不同的,但是思想是基本一致的. 鑒于很多技術文章都有一個詬病,就是只講概念,卻不講應用場景,其實Router在應用隔離,讀寫分離,灰度發布中都有它的影子.因此本篇用灰度發布的例子來做前期的鋪墊 灰度發布 百度百科 你發布應用的時候,不停止對外的服務,也就是讓用戶感覺不到你在發布 那么下面演示一下灰度發布 1.首先在192.168.56.2和192.168.56.3兩臺機器上啟動Provider,然后啟動Consumer,如下圖 2.假設我們要升級192.168.56.2服務器上的服務,接著我們去dubbo的控制臺配置路由,切斷192.168.56.2的流量,配置完成并且啟動之后,就看到此時只調用192.168.56.3的服務 3.假設此時你在192.168.56.2服務器升級服務,升級完成后再次將啟動服務. 4.由于服務已經升級完成,那么我們此時我們要把剛才的禁用路由取消點,于是點了禁用,但是此時dubbo的這個管理平臺就出現了bug,如下圖所示 驚奇的發現點了禁用,數據就變兩條了,繼續點禁用,還是兩條,而且刪除還刪除不了,這樣就很蛋疼了...但是一直刪不了也不是辦法,解決辦法也是有的,那就是去zookeeper上刪除節點 Mac上好像沒有特別好用的zookeeper可視化客戶端工具,于是我就用了這個idea的zookeeper插件 只要將這個zookeeper節點刪除 然后刷新控制臺的界面,如下圖那么就只剩下一條了 6.那么此時我們再看控制臺的輸出,已經恢復正常,整個灰度發布流程結束 Router的繼承體系圖 從圖中可以看出,他有四個實現類 MockInvokersSelector在 Dubbo 源碼解析(一) - 集群架構的設計 中提到這里 ScriptRouter在dubbo的測試用例中就有用到,這個類的源碼不多,也就124行.引用官網的描述 > 腳本路由規則 支持 JDK 腳本引擎的所有腳本,比如:javascript, jruby, groovy 等,通過 type=javascript 參數設置腳本類型,缺省為 javascript。 當然看到這里可能你可能還是沒有感覺出這個類有什么不可替代的作用,你注意一下這個類中有個ScriptEngine的屬性 那么我可以舉一個應用場景給你 假如有這么個表達式如下: double d = (1+1-(2-4)*2)/24; //沒有問題 // 但是假如這個表達式是這樣的字符串格式,或者更復雜的運算,那么你就不好處理了 // 然后這個ScriptEngine類的eval方法就能很好處理這類字符串表達式的問題 "(1+1-(2-4)*2)/24" 本篇主要講講 ConditionRouter(條件路由) 條件路由主要就是根據dubbo管理控制臺配置的路由規則來過濾相關的invoker,當我們對路由規則點擊啟用的時候,就會觸發RegistryDirectory類的notify方法 @Override public synchronized void notify(List urls) { List invokerUrls = new ArrayList(); List routerUrls = new ArrayList(); List configuratorUrls = new ArrayList(); for (URL url : urls) { String protocol = url.getProtocol(); String category = url.getParameter(Constants.CATEGORY_KEY, Constants.DEFAULT_CATEGORY); if (Constants.ROUTERS_CATEGORY.equals(category) || Constants.ROUTE_PROTOCOL.equals(protocol)) { routerUrls.add(url); } else if (Constants.CONFIGURATORS_CATEGORY.equals(category) || Constants.OVERRIDE_PROTOCOL.equals(protocol)) { configuratorUrls.add(url); } else if (Constants.PROVIDERS_CATEGORY.equals(category)) { invokerUrls.add(url); } else { logger.warn("Unsupported category " + category + " in notified url: " + url + " from registry " + getUrl().getAddress() + " to consumer " + NetUtils.getLocalHost()); } } // configurators if (configuratorUrls != null && !configuratorUrls.isEmpty()) { this.configurators = toConfigurators(configuratorUrls); } // routers if (routerUrls != null && !routerUrls.isEmpty()) { List routers = toRouters(routerUrls); if (routers != null) { // null - do nothing setRouters(routers); } } List localConfigurators = this.configurators; // local reference // merge override parameters this.overrideDirectoryUrl = directoryUrl; if (localConfigurators != null && !localConfigurators.isEmpty()) { for (Configurator configurator : localConfigurators) { this.overrideDirectoryUrl = configurator.configure(overrideDirectoryUrl); } } // providers refreshInvoker(invokerUrls); } 為什么這個notify方法傳入的是List呢? 引用一段官網文檔的描述 > 所有配置最終都將轉換為 URL 表示,并由服務提供方生成,經注冊中心傳遞給消費方,各屬性對應 URL 的參數,參見配置項一覽表中的 "對應URL參數" 列 其實對于 Router 來說,我們最關心的就是他是怎么過濾的.所以下面這些流程代碼我們先走一遍 /** * 將 invokerURL 列表轉換為Invoker Map。 轉換規則如下: * 1. 如果已將URL轉換為invoker,則不再將重新引用該URL且直接從緩存中獲取它,并且請注意,URL中的任何參數更改都將被重新引用。 * 2. 如果傳入的invoker列表不為空,則表示它是最新的invoker列表 * 3. 如果傳入的invokerUrl列表為空,則表示該規則只是覆蓋規則或路由規則,需要重新進行比較以決定是否重新引用。 * * @參數 invokerUrls 此參數不能為空 */ // TODO: 2017/8/31 FIXME 應使用線程池刷新地址,否則可能會積累任務。 private void refreshInvoker(List invokerUrls) { if (invokerUrls != null && invokerUrls.size() == 1 && invokerUrls.get(0) != null && Constants.EMPTY_PROTOCOL.equals(invokerUrls.get(0).getProtocol())) { this.forbidden = true; // 禁止訪問 this.methodInvokerMap = null; // 將方法invoker map設置為null destroyAllInvokers(); //關閉所有invoker } else { this.forbidden = false; // 允許訪問 Map> oldUrlInvokerMap = this.urlInvokerMap; // 本地引用 if (invokerUrls.isEmpty() && this.cachedInvokerUrls != null) { invokerUrls.addAll(this.cachedInvokerUrls); } else { this.cachedInvokerUrls = new HashSet(); this.cachedInvokerUrls.addAll(invokerUrls);// 緩存的invoker網址,便于比較 } if (invokerUrls.isEmpty()) { return; } Map> newUrlInvokerMap = toInvokers(invokerUrls);// Translate url list to Invoker map Map>> newMethodInvokerMap = toMethodInvokers(newUrlInvokerMap); // 更改方法名稱以映射Invoker Map // state change // If the calculation is wrong, it is not processed. if (newUrlInvokerMap == null || newUrlInvokerMap.size() == 0) { logger.error(new IllegalStateException("urls to invokers error .invokerUrls.size :" + invokerUrls.size() + ", invoker.size :0. urls :" + invokerUrls.toString())); return; } this.methodInvokerMap = multiGroup ? toMergeMethodInvokerMap(newMethodInvokerMap) : newMethodInvokerMap; this.urlInvokerMap = newUrlInvokerMap; try { destroyUnusedInvokers(oldUrlInvokerMap, newUrlInvokerMap); // Close the unused Invoker } catch (Exception e) { logger.warn("destroyUnusedInvokers error. ", e); } } } /** * 使用方法將invokers列表轉換為映射關系 * * @param invokersMap Invoker Map * @return Mapping relation between Invoker and method */ private Map>> toMethodInvokers(Map> invokersMap) { Map>> newMethodInvokerMap = new HashMap<>(); // 根據provider URL聲明的方法分類,這些方法與注冊表兼容以執行過濾的方法 List> invokersList = new ArrayList>(); if (invokersMap != null && invokersMap.size() > 0) { for (Invoker invoker : invokersMap.values()) { String parameter = invoker.getUrl().getParameter(Constants.METHODS_KEY); if (parameter != null && parameter.length() > 0) { String[] methods = Constants.COMMA_SPLIT_PATTERN.split(parameter); if (methods != null && methods.length > 0) { for (String method : methods) { if (method != null && method.length() > 0 && !Constants.ANY_VALUE.equals(method)) { List> methodInvokers = newMethodInvokerMap.get(method); if (methodInvokers == null) { methodInvokers = new ArrayList>(); newMethodInvokerMap.put(method, methodInvokers); } methodInvokers.add(invoker); } } } } invokersList.add(invoker); } } List> newInvokersList = route(invokersList, null); newMethodInvokerMap.put(Constants.ANY_VALUE, newInvokersList); if (serviceMethods != null && serviceMethods.length > 0) { for (String method : serviceMethods) { List> methodInvokers = newMethodInvokerMap.get(method); if (methodInvokers == null || methodInvokers.isEmpty()) { methodInvokers = newInvokersList; } newMethodInvokerMap.put(method, route(methodInvokers, method)); } } // 排序且不可修改 for (String method : new HashSet(newMethodInvokerMap.keySet())) { List> methodInvokers = newMethodInvokerMap.get(method); Collections.sort(methodInvokers, InvokerComparator.getComparator()); newMethodInvokerMap.put(method, Collections.unmodifiableList(methodInvokers)); } return Collections.unmodifiableMap(newMethodInvokerMap); } 這個條件路由有一個特點,就是他的getUrl是有值的 從這里我們看到,此時實現類是ConditionRouter,由于接下來的邏輯如果直接讓大家看源碼圖可能不夠清晰,所以我又把這個核心的篩選過程用了一個高清無碼圖,并且用序號標注 最后的篩選結果如下,因為我們在管理后臺配置了禁用192.168.56.2,所以最后添加進invokers的就只有192.168.56.3 參考 dubbo源碼解析-router > 本文由博客一文多發平臺 OpenWrite 發布!
來源:OSCHINA
發布時間:2019-11-25 23:43:00
1、hdfs是通過分布式集群來存儲文件,為客戶端提供了一個便捷的訪問方式,就是一個虛擬的目錄結構 2、文件存儲到hdfs集群中去的時候是被切分成block的 3、文件的block存放在若干臺datanode節點上 4、hdfs文件系統中的文件與真實的block之間有映射關系,由namenode管理 5、每一個block在集群中會存儲多個副本,好處是可以提高數據的可靠性,還可以提高訪問的吞吐量
來源:OSCHINA
發布時間:2019-11-25 19:28:00
您是否想加入Apache社區并成為某個項目的Committer或PPMC,擁有一個apache郵箱呢? 你是否知道apache社區的Committer也可以是非代碼貢獻者? 本聯合meetup旨在讓對開源有興趣的伙伴們有機會加入到社區中來,成為一份子,讓自己的青春熱血留下永久痕跡,讓自己的代碼(或者文檔、或者issue等)才華綻放出璀璨的光芒! 活動介紹 如今,開源在中國遍地開花,開源之勢不可擋,Apache社區已經有10多個來自咱們中國本土的開源項目,本次聯合兩個Apache社區項目的用戶以及技術愛好者歡聚一堂,一起分享開源技術,一起為中國本土開源獻力! Apache ShardingSphere(Incubator)是一套開源的分布式數據庫中間件,旨在充分合理地在分布式的場景下利用關系型數據庫的計算和存儲能力,提供標準化的數據分片、分布式事務和數據庫治理功能,可適用于如Java同構、異構語言、云原生等各種多樣化的應用場景。 Apache DolphinScheduler(Incubator)是一個分布式去中心化,易擴展的可視化DAG工作流任務調度系統。致力于解決數據處理流程中錯綜復雜的依賴關系,具有高可靠性(HA)、易擴展、支持絕大多數任務場景、簡單易用(可視化拖拽)等特性,已在數十家公司使用。 特邀請到兩個社區的使用伙伴和Committer等理論+實踐進行干貨分享以及現場交流,活動的最后,會有如何加入Apache社區并成為Committer或PPMC的精彩討論,引導大家成為開源社區的貢獻者,一起為開源獻一份力! 時間地點 沙龍時間:2019-12-08 14:00 沙龍地點: 北京市海淀區海淀大街34號海置創投大廈7層 創業邦 面向人群:對開源技術感興趣的小伙伴均可參與 日程安排 14:00 - 14:40 The integration of DolphinScheduler and containerization (Xiaochun Liu). 《DolphinScheduler與容器化的融合》 趣加游戲 & Committer 劉小春 14:40 - 15:20 Analyzing of Sharding-Proxy principle (Yonglun Zhang). 《Sharding-Proxy原理解析》 京東數科 & PPMC 張永倫 15:20 - 16:00 Migration and application of DolphinScheduler in Baiwang (Shuang Yang). 《DolphinScheduler在百望云的遷移和應用》 大數據平臺部總監 楊爽 16:00-16:10 Break 16:10 - 16:40 The Architecture of ShardingSphere and Roadmap (Juan Pan). 《ShardingSphere的架構及未來規劃》 京東數科高級DBA && PPMC 潘娟 16:40 - 17:20 Roundtable Discussion - How to join the Apache community and to be a committer. 《圓桌討論 - 如何加入Apache社區并且成為Committer》 Free discussion 本次分享的伙伴都是對參與開源社區很有經驗的伙伴,圓桌討論環節更是精心為大家準備,還在猶豫什么呢,趕快掃下圖中的二維碼來現場交流吧
來源:OSCHINA
發布時間:2019-11-25 18:47:00
BI報表工具很有意思的一點是:不管是老員工還是新入職的,一眼就能發現數據中的問題,進而針對這個問題從不同角度進行深入的分析挖掘,從問題產生的原因(包括時間、地點、經手人員等明細)到問題造成的影響等,都能在幾個點擊中快速挖掘出來??梢哉f,用BI報表工具分析數據,就算是新入職的都能將問題的產生、發展過程一一找出來。 覺得不可能?太夸張了?不如看看以下兩張報表感受一下BI報表帶來的直觀形象的數據分析效果。 以上便是采用奧威BI報表工具制作的BI報表效果。圖1為醫院管理駕駛艙,圖二為奧威小鎮分析報表,不管是那一個都能讓瀏覽者對數據有較為清晰直觀的感受。當然以上為截圖,具體的自助式數據分析效果大家感受不到。一般來說,即便是在BI報表中沒采用智能鉆取、高效聯動等可視化分析功能,瀏覽者也可通過點擊左上角的小圖標快速調出數據集構建器,自助式分析挖掘數據。如更改字段與維度組合,如修改數據排序等。又或者通過右上角調出篩選按鈕,自行篩選數據來實現自助式數據分析挖掘。 SpeedBI數據分析云就是奧威BI報表工具系列中支持用戶自由選擇云端、私有部署,人人都能輕松上手的智能可視化分析報表。在該BI報表工具中,用戶只需上傳數據即可在前端快速完成BI報表,復雜多變的數據運算分析也好,細致的UI調整也好,都能通過SpeedBI數據分析云平臺快速完成。如向來以核算科目多變、運算組合多變、數量多而阻礙分析的財務運算,也能借助奧威BI報表工具獨有行計算模型,在前端輕松實現。 不僅是行計算模型,SpeedBI數據分析云同樣提供無縫對接主流ERP的奧威BI標準解決方案、針對不同行業共性而量身定制的奧威BI行業解決方案,支持報表語言多樣化,支持AI取數,支持不同終端(大屏、電腦、手機)等,支持用戶隨時隨地打開BI報表,獲取關鍵數據,更及時發現并解決問題。 http://www.powerbi.com.cn/
來源:OSCHINA
發布時間:2019-11-25 11:23:00
分析思維這種事,即便是面對同一份數據、同一份報表,不同的人看到的、接下去想要看到的都很可能完全不一樣,但一般分析報表也就只有一個分析視角,不管是你在這張報表中發現什么問題,想繼續分析研究那些數據都只能另外再做一張報表,不僅跟不上你的分析思維,還可能打斷你的分析。但是,如果是BI報表,它能自動跟上你的分析思維,你想分析研究那些數據,想看哪些方面的數據分析,BI報表下一秒就能呈現出來。 怎么確保BI 報表準確跟上瀏覽者的分析思維? 構建強大數據中心,確保數據隨傳隨到 BI報表要跟上瀏覽者的分析思維,前提之一是想要什么數據就能秒速調取什么數據,這就要求BI報表功能擁有功能強大、反應靈敏的數據中心,將多個業務系統的主數據和交易數據全部打通,消除信息孤島,統一數據分析口徑。 簡單來說這個數據中心就像一個數據中轉站,各種各樣的數據匯聚到這里,并進行統一整理清洗,當前端傳來數據調取分析指令時就能立即投入數據調取分析中。 高效智能的可視化分析系統,確保數據可視化分析的高效、高質 數據中心將數據統一整理清洗,可視化分析系統則負責智能分析挖掘,并通過直觀易懂的方式呈現數據。就如在奧威BI報表工具上,當用戶將數據上傳后,只需在前臺進行簡單的操作(通常為點擊、拖拉拽),下達數據分析指令后,系統將自主完成數據抽取、分析、挖掘的整個過程,并且僅需一兩秒就能以直觀易懂的圖像化分析圖表呈現在電腦屏幕上。 落地多維動態分析功能,確保瀏覽者隨時能自主分析挖掘數據 在BI報表跟上瀏覽者分析思維,隨時呈現瀏覽者想要的分析角度、分析內容效果上,多維動態分析功能是一個不可忽視的關鍵功能,正是因為有了多維動態分析功能,瀏覽者才能隨時自定義字段與維度組合,隨時以瀏覽者的身份篩選數據、層層鉆取BI報表或數據明細等。 奧威BI報表工具所制作的BI報表,不僅支持大屏、平板、電腦、手機,自動適應不同屏幕大小,以最佳方式展現數據,呈現數據,更重要的是,不管在那一個終端,用戶都能實現自助式數據可視化分析。同一張報表,在不同人手上都能按照瀏覽者的要求快速呈現瀏覽者所要的數據可視化分析,一張BI報表能呈現多少內容,能從多少個角度對數據進行多方面的分析挖掘都取決于瀏覽者自身。 如果說以前的分析報表,是人在適應分析報表的話,那么在奧威 BI 報表工具上,就是BI 報表在隨時隨地適應人,跟隨人的分析思維變化而改變。 奧威BI報表工具不僅圍繞用戶實際分析需求自主開發了多項高效、智能的可視化分析功能,提供豐富直觀的可視化分析圖表,更可實現 “ BI+ ”模式 ,也就是在奧威BI報表工具的基礎上,落地奧威BI獨有的BI解決方案。無縫對接主流ERP,預設分析模型,預設前端BI報表樣式,僅需做必要個性化設計,僅需針對來源業務系統修改部分ETL腳本(基本基本的SQL能力即可)。甚至對于金蝶、用友標準解決方案來說,1天就能出方案,真正的0開發。 奧威BI報表工具具體有哪些,服務范圍有什么區別?哪款更適合我呢?有沒有相關的可視化分析功能體驗頁面?……登錄奧威BI官方網站的相關頁面,了解一下哪款BI報表工具更適合自己吧! http://www.powerbi.com.cn/
來源:OSCHINA
發布時間:2019-11-29 10:18:00
svg水球圖演示效果 如上述的動態水球圖效果,替換不同的svg圖片即可實現不同的動態水球效果。 svg格式說明(百度百科) SVG是一種圖像文件格式,它的英文全稱為Scalable Vector Graphics,意思為可縮放的矢量圖形。它是基于XML(Extensible Markup Language),由World Wide Web Consortium(W3C)聯盟進行開發的。嚴格來說應該是一種開放標準的矢量圖形語言,可讓你設計激動人心的、高分辨率的Web圖形頁面。用戶可以直接用代碼來描繪圖像,可以用任何文字處理工具打開SVG圖像,通過改變部分代碼來使圖像具有交互功能,并可以隨時插入到HTML中通過瀏覽器來觀看。 svg圖片的制作或獲取 1.創建path形狀的方法,利用Illustartior定制你自己的個性化圖標(需要有一定的設計基礎); 2.素材網下載svg格式,如阿里巴巴旗下iconfont;或下載ai格式,而后直接打開到Illustartor編輯; 3.右鍵查看路徑,把所有可釋放的復合路徑全部釋放,只保留要一個路徑,也只能用一個路徑; 4.全部選中右鍵菜單里選擇→建立復合路徑( 復制 path 標簽內的 d 屬性的值,如果有多個,則拼接到一起,然后粘貼替換 symbols 里面的路徑,也就); 5.文件-導出-導出為 -保存類型【選擇SVG】-點擊按鈕導出; 6.SVG文件右鍵菜單選擇記事本打開,效果如下: 7.svg代碼將顯示出來,找到path標簽 d=“復制這里的代碼”; svg水球圖的Echarts代碼 option = { backgroundColor : "#000" , title : { text : 'Mouse Beautiful' , textStyle : { fontWeight : 'normal' , fontSize : 25 , color : '#fff' } } , series : [ { type : 'liquidFill' , data : [ 0.7 ] , radius : '90%' , waveLength : '30%' , waveHeight : '10' , amplitude : 20 , outline : { show : false } , backgroundStyle : { color : '#333' , borderColor : '#000' , borderWidth : 2 , shadowColor : 'rgba(0, 0, 0, 0.4)' , shadowBlur : 20 } , //path代碼粘貼到此處,代碼的多少取決于圖形的復雜度 shape : 'path://M1185 32L1182 34L1181 40L1197 56L1198 73L1196 75L1191 91L1186 100L1185 106L1174 113L1176 119L1174 125L1179 133L1185 134L1185 143L1192 147L1195 154L1194 160L1196 162L1197 175L1202 183L1204 193L1211 198L1211 200L1197 209L1191 210L1189 214L1185 217L1179 228L1185 240L1181 241L1182 248L1180 250L1178 249L1178 246L1175 247L1162 234L1158 234L1157 236L1159 243L1156 247L1165 259L1167 259L1168 262L1174 267L1173 269L1171 268L1168 274L1178 279L1180 286L1182 288L1180 292L1183 294L1183 297L1186 301L1183 303L1181 302L1177 295L1169 299L1166 296L1163 297L1165 299L1165 304L1161 306L1158 305L1152 312L1144 310L1140 307L1136 308L1132 305L1127 307L1129 311L1128 314L1125 310L1124 305L1119 306L1114 312L1111 312L1109 308L1107 308L1103 317L1092 328L1094 333L1090 336L1087 335L1085 337L1085 343L1087 346L1091 349L1092 347L1094 348L1090 353L1086 355L1083 354L1082 351L1082 354L1080 356L1073 356L1063 363L1045 370L1042 370L1035 363L1024 365L1021 367L1012 367L1012 383L1016 392L1019 394L1024 390L1027 390L1034 396L1031 413L1024 423L1024 425L1029 422L1031 422L1031 424L1029 429L1019 430L1015 434L1003 470L999 472L992 472L995 463L999 460L998 455L990 456L984 468L981 471L977 471L965 467L959 470L959 468L956 467L957 460L955 454L958 452L958 449L956 447L953 446L947 449L939 457L928 453L927 446L923 444L920 439L921 428L925 418L924 413L922 413L922 411L912 411L909 397L905 395L892 399L886 398L896 393L898 390L896 385L890 385L885 387L880 382L865 382L861 379L856 380L845 388L842 382L835 377L830 369L826 366L823 355L818 350L815 350L807 354L786 369L780 368L772 357L775 354L775 350L779 340L777 336L772 334L771 331L775 324L771 322L767 317L768 311L764 304L766 299L755 298L754 295L757 294L757 292L752 288L741 283L732 281L729 278L720 278L719 273L708 260L702 256L701 253L703 238L699 230L698 222L705 205L702 199L695 193L693 178L706 168L713 155L721 155L726 152L727 145L729 143L733 141L744 142L746 138L755 133L753 131L753 121L756 111L757 99L753 92L743 88L740 88L731 93L724 92L730 77L735 72L739 71L744 67L749 60L756 59L762 44L754 32L750 30L739 30L736 27L743 14L747 11L760 9L762 7L767 -8L775 -14L786 -19L800 -33L804 -31L806 -16L816 -5L818 14L822 16L835 12L848 17L853 24L861 44L866 50L877 56L886 56L891 54L899 47L899 39L890 23L891 16L903 2L926 -11L936 -10L939 -6L944 10L947 11L949 7L965 -2L972 -12L980 -37L987 -43L992 -44L995 -43L1000 -36L1000 -25L1002 -19L1010 -17L1024 -20L1029 -11L1030 -5L1034 1L1040 9L1048 15L1056 15L1057 9L1051 -6L1052 -9L1059 -12L1071 -10L1095 -1L1100 5L1102 17L1110 29L1139 23L1148 30L1156 32L1167 29L1169 23L1175 17L1177 17L1186 23L1183 30L1184 31z' , color : [ 'rgba(255,255,0,0.3)' ] , //水波的顏色 對應的是data里面值 label : { normal : { formatter : '70%' , } } } ] } ; Done!
來源:OSCHINA
發布時間:2020-07-18 09:29:00
一、創建數組: var dataName = [ "A" , "B" , "C" , "D" , "E" ] ; var datalabel = [ 100 , 2 , 3 , 12 , 13 ] ; var data = [ 18203 , 23489 , 29034 , 104970 , 131744 ] ; 二、設置option var option = { tooltip : { trigger : 'axis' , axisPointer : { type : 'shadow' } } , grid : { left : '3%' , right : '4%' , bottom : '3%' , containLabel : true } , xAxis : { type : 'value' , boundaryGap : [ 0 , 0.01 ] } , yAxis : { type : 'category' , data : dataName , axisLabel : { interval : 0 , color : '#666' , align : 'right' , fontSize : 13 , } } , series : [ { name : '漏刻有時' , type : 'bar' , itemStyle : { normal : { barBorderRadius : 5 , } , } , label : { show : true , position : "right" , formatter : function ( params ) { console . log ( params . dataIndex ) ; return '總金額:' + data [ params . dataIndex ] + '元\n\n總數量:' + datalabel [ params . dataIndex ] + '個' } } , data : data } , ] } ; 三、重點解讀: label : { show : true , position : "right" , formatter : function ( params ) { //console.log(params.dataIndex); return '總金額:' + data [ params . dataIndex ] + '元\n\n總數量:' + datalabel [ params . dataIndex ] + '個' } } Done!
來源:OSCHINA
發布時間:2020-07-18 09:29:00
python腳本方式執行spark程序,好處是不用編譯,寫完就走! 示例腳本如下: from pyspark import SparkConf, SparkContext conf = SparkConf().setAppName("myTest").setMaster("local") sc = SparkContext(conf=conf) x = [1,2,3] rdd = sc.parallelize(x) count=rdd.count() print("len=",count) # read textfile rdd2=sc.textFile("c:\\spark\\doc\\word.txt") def f(x):return print(x) rdd2.foreach(f) print("rdd2:",rdd2.count()) 保存為"test1.py"文件。然后執行spark-submit test1.py提交執行即可。 pyspark比scala方式要方便多了。 word.txt內容: hello world 1 執行結果: len= 3 hello workd 1 rdd2: 3
來源:OSCHINA
發布時間:2020-07-17 17:39:00
隨著科技的發展,大數據已變成信息通信業的主要資源及主要運用。物聯網技術、云計算技術、移動互聯、車聯網平臺、手機、平板、PC及遍及地球每個角落里的傳感器,無一不是數據來源或者承載方,“大數據”被視作云計算技術之后的又一高新科技網絡熱點,大伙兒都在講:智慧旅游的發展趨勢離不了大數據,借助大數據提供充足有益的資源,智慧旅游才可以得以“智慧”發展趨勢,那么大數據如何助推智慧旅游呢?大數據打造智慧旅游有什么益處? 大數據打造智慧旅游的4點益處 1、大數據打造智慧旅游,工作人員應用管理系統大數據集成化技術,剖析旅客總流量、商家經營、公共文化服務、咨詢舉報等旅游綜合信息,對景區展開實時監測,對出現異常指標值展開預警信息,盡快為旅客、商家等出示服務項目。持續加速智慧旅游大城市建設腳步,以智慧旅游建設和大數據運用為主線,積極主動推動信息科技和旅游產業融合發展趨勢,積極開展智慧旅游尤其是景區智慧化提高工作能力。 旅游局(旅游管委會)或景區創建旅游網絡輿情監測系統軟件,對主要新聞媒體、社區論壇、博客、新浪微博等方式輿情信息展開動態性監管,將海量數據依照信息的正負面信息、知名度、信息內容特性及時間等展開歸類,獲取基本信息,按時自動生成相對匯報,依照預訂對策對潛在性的危機事件及時預警和處理。 數據信息綜合分析展現服務平臺根據運用旅游局、景區等多年經營及從第三方選購的大數據基礎上,打造實時數據統計分析的信息化管理展現服務平臺,根據對領域、旅客等信息內容展開多層次的精確剖析和合理預測分析,能夠 為客戶出示經營管理決策、輿情分析、惡性事件預警信息,另外能夠 根據合理融合旅游管控數據信息、旅游行業大數據,為政府部門,旅游公司制訂宣傳策劃營銷戰略出示合理的數據信息支撐點,真實完成“智慧旅游”。 2、大數據打造智慧旅游,在我國公布的《“十三五”全國旅游信息化規劃》明確提出,要“推動旅游大數據應用,推動新驅動”,要“用大數據對旅客信息內容展開相關性分析,提升旅游公共文化服務資源配備”,要“數據共享,注重產業生態圈旅游的共享發展”,這說明旅游大數據的時期已到來。在大眾旅游時期,產業生態圈旅游發展趨勢不能再借助理性經驗,而必須借助大數據助推管理決策。 以便提高景區的管理力度和現代化管理能力,并打造智慧旅游景區的總體目標,提出了景區員工管理與客流分析系統軟件,選用視頻數據分析系統的方法完成人流量的數據分析。另外選用面部識別技術和智能視頻無損檢測技術完成景區進出口人臉識別入園和景區風險地區警報提示等。 3、大數據打造智慧旅游,將為智慧旅游服務保障,為智慧旅游發展趨勢引入新的魅力驅動力。借助大數據出示的有益資源,推動智慧旅游完成穩步發展。大數據將以更科學、更簡易、更智慧的方法促進政府部門監管、企業經營和旅客消費管理決策。中國移動大數據將助推智慧旅游發展趨勢完成質的提升,助推智慧旅游騰飛。 4、大數據打造智慧旅游,微信客戶端顧客有其龐大的顧客規模和網絡通信的實用性,不但確保了數據信息出示的實用性和可持續性,還鑄就了大量和多樣性的數據信息。這種數據信息和景區視頻監控系統數據信息、金融大數據、實時路況數據信息、景區運作數據信息等旅游數據信息緊密結合,可精確清楚地剖析出人流量來源、旅游運動軌跡、熱力地圖、旅游喜好等內容。根據數據整理解決和深層發掘技術,就能掌握旅游領域的行業動態、旅客消費行為、旅游公司運行情況,進而正確引導旅游市場健康有序發展。 根據旅游基礎數據庫查詢的建設,考慮旅游大數據解決的運用要求,對景區各軟件系統的數據信息展開統一監管,提升信息內容發掘與運用,并產生有關的信息化管理建設的規范與標準。以互聯網技術為基礎,融合多樣信息科技方式,創建旅游基礎信息共享及互換平臺,產生統一的旅游基礎數據庫查詢及旅游資源共享互換管理中心,支撐主題風格數據庫查詢運用和部門協作旅游信息內容資源聚集、互換和共享。 現如今,智慧旅游大數據讓傳統式旅游更為聰慧,將景區通過剖析大數據,發覺每一個時間點的主要旅客人群,根據合理的方法推送給潛在旅客,吸引他們的前來。這不僅精準營銷,減少無用的營銷推廣成本費,讓景區的運營管理更為簡單輕松。
來源:OSCHINA
發布時間:2020-07-17 14:04:00
https://docs.cloudera.com/documentation/enterprise/5-8-x/topics/impala_resource_management.html https://blog.csdn.net/silentwolfyh/article/details/83549202 0440-如何啟用Impala的動態資源池:https://blog.csdn.net/Hadoop_SC/article/details/104350431 0441-Impala動態資源池及放置規則使用: https://blog.csdn.net/Hadoop_SC/article/details/104350416?utm_medium=distribute.pc_relevant.none-task-blog-baidujs-1 Cloudera Management 的Impala Admission Control impala開啟資源管理,這里不依賴YARN的資源管理。 需要開啟Impala的Admisson Control 保存配置后重啟Impala服務,以上就完成了Impala動態資源池的啟用。 3.進入Impala動態資源池管理界面 4.點擊”Impala Admission Control”,進入資源池配置界面 1.Impala的Admission Control功能主要是為了限制用戶提交SQL的并發數,以避免集群繁忙內存不足的情況。當集群的查詢太多或查詢需要的總內存太多,達到一個閾值時,提交的SQL將進入等待狀態,當集群資源可用時才會開始查詢。 2.Impala的動態資源池與Yarn動態資源池一致,可用創建多個不同的資源池、創建不同的執行計劃以及設置放置規則。 3.Impala中的資源池的層級只支持兩級,父級資源池均為root https://docs.cloudera.com/documentation/enterprise/6/6.3/topics/impala_howto_rm.html#enable_admission_control 此功能僅在“啟用 ResourceManager ACL” 設置為 true 且“管理 ACL” 未設置為 * 時相關。(請參見頂級頁面中的“訪問控制設置”。) 關于Impala 動態資源池 放置規則類型的解釋說明: root.[pool name]:該規則始終滿足,在其它規則不匹配的情況下使用,因此該規則默認要放置在所有匹配規則之后。 root.[primary group]:該放規則使用與該用戶主要組匹配的資源池。Linux中用戶默認的主要組與用戶名一致,匹配時會通過用戶的主要組與資源池名稱比對。 root.[secondarygroup]:該放置規則用于匹配用戶的次要組,使用與次要組之一匹配的資源池。 root.[username]:該放置規則用于匹配與用戶名一致的資源池。(不推薦使用) 已在運行時指定:該放置規則主要使用在運行時指定的資源池。 放置規則的判斷方式,根據放置規則的順序1、2、3…進行判斷,判斷到滿足條件的放置規則后,后續的規則不再進行匹配。
來源:OSCHINA
發布時間:2020-07-16 16:04:00
網羅數據集,不定期更新! 數據集鏈接:https://pan.baidu.com/s/1RgmRv80zQB71HSze8bQvwA 提取碼:ih2c 酒品數據集( wine.csv) 數據格式:wine.csv 標簽:有 語言: 英文 Wine Alcohol Malic.acid Ash Acl Mg Phenols Flavanoids Nonflavanoid.phenols Proanth Color.int Hue OD Proline X: [1:] 13個feature y: [0] 3分類(1 2 3) 數據大?。?0.8k,178條數據 數據用途:多分類任務 預處理代碼: 構造Dataset & Dataloader : https://my.oschina.net/u/4228078/blog/4320363 純英文預料數據集(text8.zip) 數據格式:text8.train.txt text8.dev.txt text8.test.txt 英文數據集,無標點無換行 標簽:無 語言: 英文 數據大?。?5M 數據用途:文本分析 預處理代碼: https://my.oschina.net/u/4228078/blog/4405730 項目:語言模型實現: https://my.oschina.net/u/4228078/blog/4462382 名字-國家數據集(names.csv.gz.zip) 數據格式:names_train.csv.gz names_test.csv.gz csv的壓縮gz格式文件 第一列[0]:人名 第二列[1]:人名對應的國家 標簽:有 語言: 英文 數據大?。簍rain:13374條數據 test:6700條數據 數據用途:根據人名預測國籍 預處理代碼: https://my.oschina.net/u/4228078/blog/4415324 青云數據集(qingyun.tsv) 數據格式:qingyun.tsv 第一列[0]:問題 第二列[1]:回答 標簽:有 語言: 中文 數據大?。?05914條對話 數據用途:開放式聊天機器人 預處理代碼: 英文-中文翻譯數據集(translate_en2cn) 數據格式:英文+'\t' + 中文 標簽:有 語言: 中英文 數據大?。?.1M 數據用途:機器翻譯 預處理代碼: https://my.oschina.net/u/4228078/blog/4471073 圖片數據集(ants, bees) 數據格式:圖片 標簽:有 語言: 數據大?。?00張圖片 數據用途:圖片分類 預處理代碼:
來源:OSCHINA
發布時間:2020-07-16 15:22:00
本文首發于 vivo互聯網技術 微信公眾號 鏈接: https://mp.weixin.qq.com/s/qayKiwk5QAIWI7-nyD3FVA 作者:DuZhimin 隨著互聯網、尤其是物聯網的發展,我們需要把各種類型的終端實時監測、檢查與分析設備所采集、產生的數據記錄下來,在有時間的坐標中將這些數據連點成線,往過去看可以做成多緯度報表,揭示其趨勢性、規律性、異常性;往未來看可以做大數據分析,機器學習,實現預測和預警。 這些數據的典型特點是: 產生頻率快 (每一個監測點一秒鐘內可產生多條數據)、 嚴重依賴于采集時間 (每一條數據均要求對應唯一的時間)、 測點多信息量大 (實時監測系統均有成千上萬的監測點,監測點每秒鐘都產生數據,每天產生幾十GB的數據量)。 基于時間序列數據的特點,關系型數據庫無法滿足對時間序列數據的有效存儲與處理,因此迫切需要一種專門針對時間序列數據來做優化處理的數據庫系統。 一、簡介 1、時序數據 時序數據是基于時間的一系列的數據。 2、時序數據庫 時序數據庫就是存放時序數據的數據庫,并且需要支持時序數據的快速寫入、持久化、多緯度的聚合查詢等基本功能。 對比傳統數據庫僅僅記錄了數據的當前值,時序數據庫則記錄了所有的歷史數據。同時時序數據的查詢也總是會帶上時間作為過濾條件。 3、OpenTSDB 毫無遺漏的接收并存儲大量的時間序列數據。 3.1、存儲 無需轉換,寫的是什么數據存的就是什么數據 時序數據以毫秒的精度保存 永久保留原始數據 3.2、擴展性 運行在Hadoop 和 HBase之上 可擴展到每秒數百萬次寫入 可以通過添加節點擴容 3.3、讀能力 直接通過內置的GUI來生成圖表 還可以通過HTTP API查詢數據 另外還可以使用開源的前端與其交互 4、OpenTSDB核心概念 我們來看一下這樣一段信息:2019-12-5 22:31:21版本號為‘3.2.1’的某產品客戶端的首頁PV是1000W Metric: 指標,即平時我們所說的監控項。譬如上面的PV Tags: 維度,也即標簽,在OpenTSDB里面,Tags由tagk和tagv組成的鍵值對,即tagk=takv。標簽是用來描述Metric的,比如上面的 某產品 客戶端的版本號 version=‘3.2.1’ Value :一個Value表示一個metric的實際數值,比如:1000W Timestamp: 即時間戳,用來描述Value是什么時候發生的:比如:2019-12-5 22:31:21 Data Point: 即某個Metric在某個時間點的數值,Data Point包括以下部分:Metric、Tags、Value、Timestamp 保存到OpenTSDB的數據就是無數個DataPoint 上面描述2019-12-5 22:31:21版本號為‘3.2.1’的 某產品 客戶端的首頁PV是1000W,就是1個DataPoint。 二、OpenTSDB的部署架構 1、架構圖 2、說明OpenTSDB底層是使用HBase來存儲數據的,也就是說搭建OpenTSDB之前,必須先搭建好HBase環境。 OpenTSDB是由一系列的TSD和實用的命令行工具組成。 應用通過運行一個或多個tsd(Time Series Daemon, OpenTSDB的節點)來與OpenTSDB的交互。 每個TSD是獨立的,沒有master,沒有共享狀態,所以你可以運行盡可能多的 TSD 來處理工作負載。 三、HBase簡介 從OpenTSDB的部署架構中我們看到OpenTSDB是建立在HBase之上的,那么HBase又是啥呢?為了更好的剖析OpenTSDB,這里我們簡要介紹一下HBase。 1、HBase是一個高可靠性、強一致性、高性能、面向列、可伸縮、實時讀寫的分布式開源NoSQL數據庫。 2、HBase是無模式數據庫,只需要提前定義列簇,并不需要指定列限定符。同時它也是無類型數據庫,所有數據都是按二進制字節方式存儲的。 3、它把數據存儲在表中,表按“行鍵,列簇,列限定符和時間版本”的四維坐標系來組織,也就是說如果要唯一定位一個值,需要四個都唯一才行。下面參考Excel來說明一下: 4、對 HBase 的操作和訪問有 5 個基本方式,即 Get、Put、Delete 和 Scan 以及 Increment,HBase 基于非行鍵值查詢的唯一途徑是通過帶過濾器的掃描。 5、數據在HBase中的存儲(物理上): 6、數據在HBase中的存儲(邏輯上): 四、 支撐OpenTSDB運行的HBase表 如果你第一次用你的HBase實例運行OpenTSDB,需要創建必要的HBase表,OpenTSDB 運行僅僅需要四張表:tsdb, tsdb-uid, tsdb-tree 和 tsdb-meta,所有的DataPoint 數據都保存在這四張表中,建表語句如下: 1、tsdb-uidcreate 'tsdb-uid', {NAME => 'id', COMPRESSION => 'NONE', BLOOMFILTER => 'ROW', DATA_BLOCK_ENCODING => 'PREFIX_TREE'}, {NAME => 'name', COMPRESSION => 'NONE', BLOOMFILTER => 'ROW', DATA_BLOCK_ENCODING => 'PREFIX_TREE'} 2、tsdbcreate 'tsdb', {NAME => 't', VERSIONS => 1, COMPRESSION => 'NONE', BLOOMFILTER => 'ROW', DATA_BLOCK_ENCODING => 'PREFIX_TREE'} 3、tsdb-treecreate 'tsdb-tree', {NAME => 't', VERSIONS => 1, COMPRESSION => 'NONE', BLOOMFILTER => 'ROW', DATA_BLOCK_ENCODING => 'PREFIX_TREE'} 4、tsdb-metacreate 'tsdb-meta', {NAME => 'name', COMPRESSION => 'NONE', BLOOMFILTER => 'ROW', DATA_BLOCK_ENCODING => 'PREFIX_TREE'} 后面將對照實際數據來專門講解這四張表分別存儲的內容。 五、 OpenTSDB是如何把一個數據點保存到HBase中的呢? 1、首先檢查一下四個表里面的數據 從上面看,四個表里面的數據都是空的 2、然后我們往OpenTSDB寫一個數據點@Test public void addData() { String metricName = "metric"; long value = 1; Map tags = new HashMap(); tags.put("tagk", "tagv"); long timestamp = System.currentTimeMillis(); tsdb.addPoint(metricName, timestamp, value, tags); System.out.println("------------"); } 3、插入數據之后我們再來查看一下四個表數據 發現HBase里面有數據,在tsdb-uid、tsdb、和 tsdb-meta 表里面有數據,而tsdb-tree 表里面沒任何數據,下面我們針對這些數據做一下具體分析。 4、tsdb-tree表 它是一張索引表,用于展示樹狀結構的,類似于文件系統,以方便其他系統使用,這里我們不做深入的分析。 通過配置項tsd.core.tree.enable_processing來打開是否需要往此表里面寫入數據。 5、tsdb-meta表 這個表是OpenTSDB中不同時間序列的一個索引,可以用來存儲一些額外的信息,該表只有一個列族name,兩個列,分別為ts_meta、ts_ctr。這個表里面的數據是可以根據配置項配置來控制是否生成與否,生成幾個列,具體的配置項有: tsd.core.meta.enable_realtime_ts tsd.core.meta.enable_tsuid_incrementing tsd.core.meta.enable_tsuid_tracking Row Key 和tsdb表一樣,其中不包含時間戳,[...] ts_meta Column 和UIDMeta相似,其為UTF-8編碼的JSON格式字符串 ts_ctr Column 計數器,用來記錄一個時間序列中存儲的數據個數,其列名為ts_ctr,為8位有符號的整數。 6、tsdb-uid表數據分析 tsdb-uid用來存儲UID映射,包括正向的和反向的。存在兩列族,一列族叫做name用來將一個UID映射到一個字符串,另一個列族叫做id,用來將字符串映射到UID。列族的每一行都至少有以下三列中的一個: metrics 將metric的名稱映射到UID tagk 將tag名稱映射到UID tagv 將tag的值映射到UID 如果配置了metadata,則name列族還可以包括額外的metatata列。 6.1、id 列族 Row Key: 實際的指標名稱或者tagK或者tagV Column Qualifiers: metrics、tagk、tagv三種列類型中一種 Column Value : 一個無符號的整數,默認是被編碼為3個byte,自增的數字,其值為UID 6.2、name 列族 Row Key : UID,就是ID列簇的值 Column Qualifiers: metrics、tagk、tagv、metrics_meta、tagk_meta、tagv_meta六種列類型中一種,*_meta是需要開啟tsd.core.meta.enable_realtime_uid才會生成 Column Value: 與UID對應的字符串,對于一個*_meta列,其值將會是一個UTF-8編碼的JSON格式字符串。不要在OpenTSDB外部去修改該值,其中的字段順序會影響CAS調用。 7、tsdb表: 時間點數據就保存在此表中,只有一個列簇t: 7.1、RowKey格式 UID: 默認編碼為3 Bytes,而時間戳會編碼為4 Bytes salt: 打散同一metric不同時間線的熱點 metric, tagK, tagV: 實際存儲的是字符串對應的UID(在tsdb-uid表中) timestamp: 每小時數據存在一行,記錄的是每小時整點秒級時間戳 7.2、Column格式 column qualifier 占用2 Bytes或者4 Bytes, 占用2 Bytes時表示以秒為單位的偏移,格式為: 12 bits:相對row表示的小時的delta, 最多2^ 12 = 4096 > 3600因此沒有問題 1 bit: an integer or floating point 3 bits: 標明數據的長度,其長度必須是1、2、4、8。000表示1個byte,010表示2byte,011表示4byte,100表示8byte 占用4 Bytes時表示以毫秒為單位的偏移,格式為: 4 bits:十六進制的1或者F 22 bits:毫秒偏移 2 bit:保留 1 bit: an integer or floating point,0表示整數,1表示浮點數 3 bits: 標明數據的長度,其長度必須是1、2、4、8。000表示1個byte,010表示2byte,011表示4byte,100表示8byte 7.3、value value 使用8 Bytes存儲,既可以存儲long,也可以存儲double。 7.4、tsdb表設計的特點: metric和tag映射成UID,不存儲實際字符串,以節約空間。 每條時間線每小時的數據點歸在一行,每列是一個數據點,這樣每列只需要記錄與這行起始時間偏移,以節省空間。 每列就是一個KeyValue。 六、 寫在最后 1、應用場景作為時序數據庫,OpenTSDB 不僅僅可以提供原始數據的查詢,并且還支持對原始數據的聚合能力,支持過濾、過濾之后的聚合計算 。 支持降采樣查詢,比如原始數據是1分鐘一個數據點,如果我想1個小時一個數據點進行展示,也能支持。 支持根據維度分組查詢,比如我有一個中國地市的數據,現在我想根據省份進行分組之后查詢,也能支持。 2、使用注意事項OpenTSDB 默認情況下的字符集是ISO-8859-1,為什么會使用這個字符集呢,是因為它的編碼是單字節編碼,編碼后的長度是固定的,如果要支持中文,需要對源碼進行編譯,修改為UTF-8即可。 默認提供的HBase建表語句是沒有預分區的,這樣會導致大批量數據寫入的時候有熱點問題,建議進行預分區。 OpenTSDB不適合超大數據量,在千萬級、億級中提取幾萬條數據,比如某個指標半年內的5分鐘級別的數據,還是很快響應的。但如果再提取多點數據,幾十萬,百萬這樣的量級,又或者提取后再做個聚合運算,OpenTSDB 就勉為其難,實際使用的時候用作服務端機器的監控無任何問題,如果作為客戶端APP監控,響應就比較遲緩。 OpenTSDB 只有4 張HBase 表,所有的數據都存放在一張表,這就意味在OpenTSDB 這個層級上是無法更小的粒度來區別對待不同業務,比如不同的業務建不同的表存儲數據。 OpenTSDB 支持實時聚合計算功能,但是基于單點,所以運算能力有限。 3、展望 如果需要支持特大批量時序數據,建議使用Druid或InfluxDB,其中InfluxDB是最易用的時序數據庫。 更多內容敬請關注 vivo 互聯網技術 微信公眾號 注:轉載文章請先與微信號: Labs2020 聯系。
來源:OSCHINA
發布時間:2020-07-16 10:12:00
1.?業務需求 接收實時數據流數據,實時更新狀態,并且每隔一定的時間,將所有狀態數據輸出。 實時數據類型:("張", 1) 狀態更新:第一個元素為key,將第二個元素全部緩存起來,放到list中,最后將key和其對應的list全部輸出。 2. 實現方案 使用processFunction算子,在processElement函數中僅注冊一次定時器,然后在onTimer函數中處理定時器任務,并且重新注冊定時器。 3.?實現代碼 3.1?source /** * 每隔1秒發送一個tuple2類型的數據,第一個字段值為隨機的一個姓氏,第二個字段為自增的數字 **/ class MySourceTuple2 extends SourceFunction[(String, Long)] { var isRunning: Boolean = true val names: List[String] = List("張", "王", "李", "趙") private val random = new Random() var number: Long = 1 override def run(ctx: SourceFunction.SourceContext[(String, Long)]): Unit = { while (true) { val index: Int = random.nextInt(4) ctx.collect((names(index), number)) number += 1 Thread.sleep(1000) } } override def cancel(): Unit = { isRunning = false } } 3.2?流處理 object TimerMain2 { def main(args: Array[String]): Unit = { val env: StreamExecutionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment env.setParallelism(1) env .addSource(new MySourceTuple2) .keyBy(_._1) .process(new KeyedProcessFunction[String, (String, Long), String] { //緩存流數據 private val cache: mutable.Map[String, ListBuffer[Long]] = mutable.Map[String, ListBuffer[Long]]() private var first: Boolean = true /** * 定時器觸發時回調該函數 * * @param timestamp 定時器觸發時間 */ override def onTimer(timestamp: Long, ctx: KeyedProcessFunction[String, (String, Long), String]#OnTimerContext, out: Collector[String]): Unit = { println("定時器觸發:" + timestamp) //將緩存中的數據組織成需要的格式 val builder = new StringBuilder() for (entry: (String, ListBuffer[Long]) <- cache) { builder.append(entry._1).append(":") for (ele <- entry._2) { builder.append(ele).append(",") } builder.delete(builder.size - 1, builder.size).append(";") cache(entry._1).clear() } println("定時器注冊:" + timestamp) //該定時器執行完任務之后,重新注冊一個定時器 ctx.timerService().registerProcessingTimeTimer(timestamp + 5000) out.collect(builder.toString()) } /** * 處理每一個流數據 */ override def processElement(value: (String, Long), ctx: KeyedProcessFunction[String, (String, Long), String]#Context, out: Collector[String]): Unit = { //僅在該算子接收到第一個數據時,注冊一個定時器 if (first) { first = false val time: Long = System.currentTimeMillis() println("定時器第一次注冊:" + time) ctx.timerService().registerProcessingTimeTimer(time + 5000) } //將流數據更新到緩存中 if (cache.contains(value._1)) { cache(value._1).append(value._2) } else { cache.put(value._1, ListBuffer[Long](value._2)) } } } ) .print("處理結果:") env.execute() } } 所有代碼解釋均在注釋中。 4. 運行結果 可以看到,定時器注冊之后,過5秒就會被觸發,同時注冊下個5秒的注冊器,然后將數據發送到下個算子打印出來。 注意:該實例中算子并行度為1,所以“定時器第一次注冊”只會觸發一次,如果是多個并行度的話,則會在每個并行度里面進行“定時器第一次注冊”,并且每個算子維護自己的定時器,算子之間互不影響。
來源:OSCHINA
發布時間:2020-07-15 17:55:00
女人个人私人电话联系杭州的|热久久久久香蕉无品码|爱情岛亚洲永久自拍品质|国产丶欧美丶日本不卡