Categories
程式開發

Serverless 實戰:通過 Serverless 架構實現監控告警


在實際生產中,我們經常需要做一些監控腳本來監控網站服務或者API服務是否可用。傳統的方法是使用網站監控平台(例如DNSPod監控、360網站服務監控,以及阿里雲監控等),它們的原理是通過用戶自己設置要監控的服務地址和監測的時間閾值,由監控平台定期發起請求對網站或服務的可用性進行判斷。

這些方法很大眾化,通用性很強,但也不是所有場景都適合。例如,如果我們的需求是監控網站狀態碼,不同區域的延時,並且通過監控得到的數據,設定一個閾值,一旦超過閾值就通過郵件等進行統治告警,目前大部分的監控平台是很難滿足這些需求的,這時就需要定制開發一個監控工具。

Serverless服務的一個重要應用場景就是運維、監控與告警,所以本文將會通過現有的Serverless平台,部署一個網站狀態監控腳本,對目標網站的可用性進行監控告警。

Web服務監控告警

針對Web服務,我們先設計一個簡單的監控告警功能的流程:

Serverless 實戰:通過 Serverless 架構實現監控告警 1

在這個流程中,我們僅對網站的狀態碼進行監控,即返回的狀態為200,則判定網站可正常使用,否則進行告警:

# -*- coding: utf8 -*-
import ssl
import json
import smtplib
import urllib.request
from email.mime.text import MIMEText
from email.header import Header

ssl._create_default_https_context = ssl._create_unverified_context


def sendEmail(content, to_user):
    sender = '[email protected]'
    receivers = [to_user]

    mail_msg = content
    message = MIMEText(mail_msg, 'html', 'utf-8')
    message['From'] = Header("网站监控", 'utf-8')
    message['To'] = Header("站长", 'utf-8')

    subject = "网站监控告警"
    message['Subject'] = Header(subject, 'utf-8')

    try:
        smtpObj = smtplib.SMTP_SSL("smtp.exmail.qq.com", 465)
        smtpObj.login('发送邮件的邮箱地址', '密码')
        smtpObj.sendmail(sender, receivers, message.as_string())
    except smtplib.SMTPException as e:
        print(e)


def getStatusCode(url):
    return urllib.request.urlopen(url).getcode()


def main_handler(event, context):
    url = "http://www.anycodes.cn"
    if getStatusCode(url) == 200:
        print("您的网站%s可以访问!" % (url))
    else:
        sendEmail("您的网站%s 不可以访问!" % (url), "接受人邮箱地址")
    return None

通過ServerlessFramework可以部署,在部署的時候可以增加時間觸發器:

MyWebMonitor:
  component: "@serverless/tencent-scf"
  inputs:
    name: MyWebMonitor
    codeUri: ./code
    handler: index.main_handler
    runtime: Python3.6
    region: ap-guangzhou
    description: 网站监控
    memorySize: 64
    timeout: 20
    events:
      - timer:
          name: timer
          parameters:
            cronExpression: '*/5 * * * *'
            enable: true

在這裡,timer表示時間觸發器,cronExpression是表達式:

創建定時觸發器時,用戶能夠使用標準的 Cron 表達式的形式自定義何時觸發。定時觸發器現已推出秒級觸發功能,為了兼容老的定時觸發器,因此 Cron 表達式有兩種寫法。

Cron 表達式語法一(推薦)

Cron 表達式有七個必需字段,按空格分隔。
Serverless 實戰:通過 Serverless 架構實現監控告警 2

其中,每個字段都有相應的取值範圍:
Serverless 實戰:通過 Serverless 架構實現監控告警 3

Cron 表達式語法二(不推薦)

Cron 表達式有五個必需字段,按空格分隔。
Serverless 實戰:通過 Serverless 架構實現監控告警 4

其中,每個字段都有相應的取值範圍:
Serverless 實戰:通過 Serverless 架構實現監控告警 5

通配符

Serverless 實戰:通過 Serverless 架構實現監控告警 6

注意事項

在 Cron 表達式中的“日”和“星期”字段同時指定值時,兩者為“或”關係,即兩者的條件分別均生效。

示例

*/5 * * * * * * 表示每5秒觸發一次
0 0 2 1 * * * 表示在每月的1日的凌晨2點觸發
0 15 10 * * MON-FRI * 表示在周一到週五每天上午10:15觸發
0 0 10,14,16 * * * * 表示在每天上午10點,下午2點,4點觸發
0 */30 9-17 * * * * 表示在每天上午9點到下午5點內每半小時觸發
0 0 12 * * WED * 表示在每個星期三中午12點觸發

因此,我們上面的代碼可以認為是每5秒觸發一次,當然,也可以根據網站監控密度,自定義設置觸發的間隔時間。當我們網站服務不可用時,就可以收到告警:

Serverless 實戰:通過 Serverless 架構實現監控告警 7

這種網站監控方法比較簡單,準確度可能會有問題,對於網站或服務的監控不能簡單的看返回值,還要看鏈接耗時、下載耗時以及不同區域、不同運營商訪問網站或者服務的延時信息等。

所以,我們需要對這個代碼進行額外的更新與優化:

  1. 通過在線網速測試的網站,抓包獲取不同地區不同運營商的請求特徵;

  2. 編寫爬蟲程序,進行在線網速測試模塊的編寫;

  3. 集成到剛剛的項目中;

下面以站長工具網站中國內網站測速工具 為例,通過網頁查閱相關信息。

對網站測速工具進行封裝,例如:

Serverless 實戰:通過 Serverless 架構實現監控告警 8

通過對網頁進行分析,獲取請求特徵,包括Url,Form data,以及Headers等相關信息,其中該網站在使用不同監測點對網站進行請求時,是通過Form data中的guid的參數實現的,例如部分監測點的guid:

广东佛山电信f403cdf2-27f8-4ccd-8f22-6f5a28a01309
江苏宿迁多线74cb6a5c-b044-49d0-abee-bf42beb6ae05
江苏常州移动5074fb13-4ab9-4f0a-87d9-f8ae51cb81c5
浙江嘉兴联通ddfeba9f-a432-4b9a-b0a9-ef76e9499558

此時,我們可以編寫基本的爬蟲代碼,來對Response進行初步解析,以62a55a0e-387e-4d87-bf69-5e0c9dd6b983 江苏宿迁[电信]為例,編寫代碼:

import urllib.request
import urllib.parse

url = "*某测速网站地址*"
form_data = {
    'guid': '62a55a0e-387e-4d87-bf69-5e0c9dd6b983',
    'host': 'anycodes.cn',
    'ishost': '1',
    'encode': 'ECvBP9vjbuXRi0CVhnXAbufDNPDryYzO',
    'checktype': '1',
}
headers = {
    'Host': 'tool.chinaz.com',
    'Origin': '*某测速网站地址*',
    'Referer': '*某测速网站地址*',
    'User-Agent': 'Mozilla/5.0 (Macintosh; Intel Mac OS X 10_14_3) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/74.0.3729.108 Safari/537.36',
    'X-Requested-With': 'XMLHttpRequest'
}

print(urllib.request.urlopen(
    urllib.request.Request(
        url=url,
        data=urllib.parse.urlencode(form_data).encode('utf-8'),
        headers=headers
    )
).read().decode("utf-8"))

獲得結果:

({
state: 1,
msg: '',
result: {
ip: '119.28.190.46',
httpstate: 200,
alltime: '212',
dnstime: '18',
conntime: '116',
downtime: '78',
filesize: '-',
downspeed: '4.72',
ipaddress: '新加坡新加坡',
headers: 'HTTP/1.1 200 OK br>Server: ...',
pagehtml: ''
}
})

在這個結果中,我們可以提取部分數據,例如江蘇宿遷[电信]訪問目標網站的基礎數據:

总耗时:alltime:'212'
链接耗时:conntime:'116'
下载耗时:downtime:'78'

此時,我們可以改造代碼對更多的節點,進行測試:

江苏宿迁[电信]总耗时:223链接耗时:121下载耗时:81
广东佛山[电信]总耗时:44链接耗时:27下载耗时:17
广东惠州[电信]总耗时:56链接耗时:34下载耗时:22
广东深圳[电信]总耗时:149链接耗时:36下载耗时:25
浙江湖州[电信]总耗时:3190链接耗时:3115下载耗时:75
辽宁大连[电信]总耗时:468链接耗时:255下载耗时:170
江苏泰州[电信]总耗时:180链接耗时:104下载耗时:69
安徽合肥[电信]总耗时:196链接耗时:110下载耗时:73
...

並對項目中的index.py進行代碼修改:

# -*- coding: utf8 -*-
import ssl
import json
import re
import socket
import smtplib
import urllib.request
from email.mime.text import MIMEText
from email.header import Header

socket.setdefaulttimeout(2.5)
ssl._create_default_https_context = ssl._create_unverified_context

def getWebTime():

    final_list = []
    final_status = True

    total_list = '''62a55a0e-387e-4d87-bf69-5e0c9dd6b983 江苏宿迁[电信]
    f403cdf2-27f8-4ccd-8f22-6f5a28a01309 广东佛山[电信]
    5bea1430-f7c2-4146-88f4-17a7dc73a953 河南新乡[多线]
    1f430ff0-eae9-413a-af2a-1c2a8986cff0 河南新乡[多线]
    ea551b59-2609-4ab4-89bc-14b2080f501a 河南新乡[多线]
    2805fa9f-05ea-46bc-8ac0-1769b782bf52 黑龙江哈尔滨[联通]
    722e28ca-dd02-4ccd-a134-f9d4218505a5 广东深圳[移动]
8e7a403c-d998-4efa-b3d1-b67c0dfabc41 广东深圳[移动]'''

    url = "*某测速网站地址*"
    for eve in total_list.split('n'):
        id_data, node_name = eve.strip().split(" ")
        form_data = {
            'guid': id_data,
            'host': 'anycodes.cn',
            'ishost': '1',
            'encode': 'ECvBP9vjbuXRi0CVhnXAbufDNPDryYzO',
            'checktype': '1',
        }
        headers = {
            'Host': '*某测速网站地址*',
            'Origin': '*某测速网站地址*',
            'Referer': '*某测速网站地址*',
            'User-Agent': 'Mozilla/5.0 (Macintosh; Intel Mac OS X 10_14_3) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/74.0.3729.108 Safari/537.36',
            'X-Requested-With': 'XMLHttpRequest'
        }
        try:
            result_data = urllib.request.urlopen(
                urllib.request.Request(
                    url=url,
                    data=urllib.parse.urlencode(form_data).encode('utf-8'),
                    headers=headers
                )
            ).read().decode("utf-8")
            try:
                alltime = re.findall("alltime:'(.*?)'", result_data)[0]
                conntime = re.findall("conntime:'(.*?)'", result_data)[0]
                downtime = re.findall("downtime:'(.*?)'", result_data)[0]
                final_string = "%st总耗时:%st链接耗时:%st下载耗时:%s" % (node_name, alltime, conntime, downtime)
            except:
                final_string = "%s链接异常!" % (node_name)
                final_status = False
        except:
            final_string = "%s链接超时!" % (node_name)
            final_status = False
        final_list.append(final_string)
        print(final_string)
    return (final_status,final_list)
def sendEmail(content, to_user):
    sender = '[email protected]'
    receivers = [to_user]
    mail_msg = content
    message = MIMEText(mail_msg, 'html', 'utf-8')
    message['From'] = Header("网站监控", 'utf-8')
    message['To'] = Header("站长", 'utf-8')
    subject = "网站监控告警"
    message['Subject'] = Header(subject, 'utf-8')
    try:
        smtpObj = smtplib.SMTP_SSL("smtp.exmail.qq.com", 465)
        smtpObj.login('[email protected]', '密码')
        smtpObj.sendmail(sender, receivers, message.as_string())
    except smtplib.SMTPException:
        pass

def getStatusCode(url):
    return urllib.request.urlopen(url).getcode()

def main_handler(event, context):
    url = "http://www.anycodes.cn"
    final_status,final_list = getWebTime()
    if not final_status:
        sendEmail("您的网站%s的状态:
%s" % (url, "
".join(final_list)), "[email protected]")

由於本文是以學習為主,所以我們將節點列表進行縮減,只保留幾個。通過部署,可得到結果:

Serverless 實戰:通過 Serverless 架構實現監控告警 9

告警的靈敏度和監控的頻率,在實際生產過程中可以根據自己的需求進行調整。

雲服務監控告警

前文,我們對網站狀態以及健康等信息進行了監控與告警,在實際的生產運維中,還需要對服務進行監控,例如在使用Hadoop、Spark的時候對節點的健康進行監控,在使用K8S的時候對API網關、ETCD等多維度的指標進行監控,在使用Kafka的時候,對數據積壓量,以及Topic、Consumer等進行監控…

而這些服務的監控,往往不能通過簡單的URL以及某些狀態來進行判斷。傳統運維的做法是在額外的機器上設置一個定時任務,對相關的服務進行旁路監控。而在本文中,我們則通過Serverless技術,對雲產品進行相關的監控與告警。

在使用雲上的Kafka時,我們通常要看數據積壓量,因為如果Consumer集群掛掉了,或者消費能力突然降低導致數據積壓,很可能會對服務產生不可預估的影響,這個時候對Kafka的數據積壓量進行監控告警,就顯得額外重要。

本文以監控騰訊雲的Ckafka為例進行實踐,並通過多個雲產品進行組合(包括雲監控、Ckafka、雲API以及雲短信等)來實現短信告警、郵件告警以及企業微信告警功能。

首先,可以設計簡單的流程圖:

Serverless 實戰:通過 Serverless 架構實現監控告警 10

在開始項目之前,我們要準備一些基礎的模塊:

  • Kafka數據積壓量獲取模塊:
def GetSignature(param):
    # 公共参数
    param["SecretId"] = ""
    param["Timestamp"] = int(time.time())
    param["Nonce"] = random.randint(1, sys.maxsize)
    param["Region"] = "ap-guangzhou"
    # param["SignatureMethod"] = "HmacSHA256"
    # 生成待签名字符串
    sign_str = "GETckafka.api.qcloud.com/v2/index.php?"
    sign_str += "&".join("%s=%s" % (k, param[k]) for k in sorted(param))
    # 生成签名
    secret_key = ""
    if sys.version_info[0] > 2:
        sign_str = bytes(sign_str, "utf-8")
        secret_key = bytes(secret_key, "utf-8")
    hashed = hmac.new(secret_key, sign_str, hashlib.sha1)
    signature = binascii.b2a_base64(hashed.digest())[:-1]
    if sys.version_info[0] > 2:
        signature = signature.decode()
    # 签名串编码
    signature = urllib.parse.quote(signature)
    return signature

def GetGroupOffsets(max_lag, phoneList):
    param = {}
    param["Action"] = "GetGroupOffsets"
    param["instanceId"] = ""
    param["group"] = ""
    signature = GetSignature(param)
    # 生成请求地址
    param["Signature"] = signature
    url = "https://ckafka.api.qcloud.com/v2/index.php?Action=GetGroupOffsets&"
    url += "&".join("%s=%s" % (k, param[k]) for k in sorted(param))
    req_attr = urllib.request.urlopen(url)
    res_data = req_attr.read().decode("utf-8")
    json_data = json.loads(res_data)
    for eve_topic in json_data['data']['topicList']:
        temp_lag = 0
        result_list = []
        for eve_partition in eve_topic["partitions"]:
            lag = eve_partition["lag"]
            temp_lag = temp_lag + lag
        if temp_lag > max_lag:
            result_list.append(
                {
                    "topic": eve_topic["topic"],
                    "lag": lag
                }
            )
        print(result_list)
        if len(result_list)>0:
            KafkaLagRobot(result_list)
            KafkaLagSMS(result_list,phoneList)

  • 接入企業微信機器人模塊:
def KafkaLagRobot(content):
    url = ""
    data = {
        "msgtype": "markdown",
        "markdown": {
            "content": content,
        }
    }
    data = json.dumps(data).encode("utf-8")
    req_attr = urllib.request.Request(url, data)
    resp_attr = urllib.request.urlopen(req_attr)
    return_msg = resp_attr.read().decode("utf-8")

  • 接入騰訊雲短信服務模塊:
def KafkaLagSMS(infor, phone_list):
    url = ""
    strMobile = phone_list
    strAppKey = ""
    strRand = str(random.randint(1, sys.maxsize))
    strTime = int(time.time())
    strSign = "appkey=%s&random=%s&time=%s&mobile=%s" % (strAppKey, strRand, strTime, ",".join(strMobile))
    sig = hashlib.sha256()
    sig.update(strSign.encode("utf-8"))

    phone_dict = []
    for eve_phone in phone_list:
        phone_dict.append(
            {
                "mobile": eve_phone,
                "nationcode": "86"
            }
        )
    data = {
        "ext": "",
        "extend": "",
        "params": [
            infor,
        ],
        "sig": sig.hexdigest(),
        "sign": "你的sign",
        "tel": phone_dict,
        "time": strTime,
        "tpl_id": 你的模板id
    }
    data = json.dumps(data).encode("utf-8")
    req_attr = urllib.request.Request(url=url, data=data)
    resp_attr = urllib.request.urlopen(req_attr)
    return_msg = resp_attr.read().decode("utf-8")

  • 發送郵件告警模塊:
def sendEmail(content, to_user):
    sender = '[email protected]'
    message = MIMEText(content, 'html', 'utf-8')
    message['From'] = Header("监控", 'utf-8')
    message['To'] = Header("站长", 'utf-8')
    message['Subject'] = Header("告警", 'utf-8')
    try:
        smtpObj = smtplib.SMTP_SSL("smtp.exmail.qq.com", 465)
        smtpObj.login('[email protected]', '密码')
        smtpObj.sendmail(sender, [to_user], message.as_string())
    except smtplib.SMTPException as e:
        logging.debug(e)

完成模塊編寫,和上面的方法一樣,進行項目部署。部署成功之後進行測試,測試可看到功能可用:

  • 短信告警樣式:

Serverless 實戰:通過 Serverless 架構實現監控告警 11

  • 企業微信告警樣式:

Serverless 實戰:通過 Serverless 架構實現監控告警 12

總結

通過本文的實踐,希望讀者可以了解到Serverless相關產品在運維行業中的基本應用,尤其是監控告警的基本使用方法和初步靈感。設計一個網站監控程序實際上是一個很初級的入門場景,希望大家可以將更多的監控告警功與Serverless技術進行結合,例如監控自己的MySQL壓力情況、監控已有服務器的數據指標等,通過對這些指標的監控告警,不僅僅可以讓管理者及時發現服務的潛在風險,也可以通過一些自動化流程實現項目的自動化運維。

通過本場景實踐,我們也可以對項目進行額外的優化或者應用在不同的領域以及場景中。例如,我們可以通過增加短信告警、微信告警、企業微信告警等多個維度,來確保相關人員可以及時收到告警信息;我們也可以通過監控某個小說網站、視頻網站等,看到我們關注的小說或者視頻的更新情況,便於追更等。

本章內容大部分資料來自圖書《Serverless架構:從原理、設計到項目實戰》