LambdaのエラーログをPython3でSlackに飛ばしたい
概要
Lambdaに吐いたログを監視して、エラーを検知したらSlackに通知したい。
ここでいうと"[ERROR]~"みたいな。
ググると、先人たちがたくさんやってるけど、自分の学習用振り返りとして。
TL;DR
- SNSを作成
- CloudWatchログ メトリクスフィルター作成
- CloudWatchアラームを設定
- Slack通知Lambdaを作る
手順
SNS 作成
Lambdaの実行するためにSNSを作成する。
名前だけであとは特に何もせず作成。
CloudWatchログ メトリクスフィルター作成
この"[ERROR].*"が出たら、Slack通知がしたい。
メトリクスフィルターを作成する。
[ERROR]をマッチさせるよう「"[ERROR]"」にする。(ダブルクォートしないと完全一致じゃなくなる。)
[ERROR]が出ているログストリームで「パターンをテスト」をして、マッチしたらOK。
「フィルター名」はログストリーム内のメトリクスフィルターの名前なので、適当でいい(と思う)。
「メトリクス名前空間」と「メトリクス名」は、CloudWatchメトリクスで監視するときの名前になるので、サービス名とメトリクス名とかにした方がいいかもしれない。
「メトリクス値」は検知したときにCloudWatchメトリクスに記録される値なので、検知数にしておく。
CloudWatchアラームを設定
出来たら、メトリクスフィルターリンクをクリックして、CloudWatchメトリクスを確認しに行く。
メトリクスがちゃんと取れていたら、OK。
ベルマークを押して、CloudWatchアラームを作成する。
閾値や監視間隔はおのおので。
予め作っておいたSNSトピックを選択して、作成。
アラーム名は「alert-${Lambdaファンクション名}」にする(いろいろあってこうしておきたい)
監視する側はこれでOK。
Slack通知Lambdaを作る
Python3でロールは適当なロールを選択
ソース
ほぼ、写経。Lambdaファンクション名が欲しかったのとSlackメッセージのフォーマットを整えたかったので、そこだけ追記。
CloudWatch上のエラーログをslackに通知する - Qiita
import boto3 import calendar import datetime import json import logging import os import re import slackweb from base64 import b64decode from urllib.request import Request, urlopen from urllib.error import URLError, HTTPError logger = logging.getLogger() logger.setLevel(logging.INFO) # The base-64 encoded, encrypted key (CiphertextBlob) stored in the kmsEncryptedHookUrl environment variable ENCRYPTED_HOOK_URL = os.environ['kmsEncryptedHookUrl'] # The Slack channel to send a message to stored in the slackChannel environment variable SLACK_CHANNEL = os.environ['slackChannel'] HOOK_URL = "https://" + boto3.client('kms').decrypt(CiphertextBlob=b64decode(ENCRYPTED_HOOK_URL))['Plaintext'].decode('utf-8') #抽出するログデータの最大件数 OUTPUT_LIMIT=5 #何分前までを抽出対象期間とするか TIME_FROM_MIN=5 client = boto3.client('logs') def lambda_handler(event, context): message = json.loads(event['Records'][0]['Sns']['Message']) metricfilters = client.describe_metric_filters( metricName = message['Trigger']['MetricName'] , metricNamespace = message['Trigger']['Namespace'] ) function_name = metricfilters['metricFilters'][0]['logGroupName'].replace('/aws/lambda/', '') timeto = datetime.datetime.strptime(message['StateChangeTime'][:19] ,'%Y-%m-%dT%H:%M:%S') + datetime.timedelta(minutes=1) u_to = calendar.timegm(timeto.utctimetuple()) * 1000 #開始時刻は終了時刻のTIME_FROM_MIN分前 timefrom = timeto - datetime.timedelta(minutes=TIME_FROM_MIN) u_from = calendar.timegm(timefrom.utctimetuple()) * 1000 response = client.filter_log_events( logGroupName = metricfilters['metricFilters'][0]['logGroupName'] , filterPattern = metricfilters['metricFilters'][0]['filterPattern'], startTime = u_from, endTime = u_to, limit = OUTPUT_LIMIT ) logger.info("response : " + str(response)) for event in response['events']: postText = ''' {logStreamName} {message} '''.format( logStreamName=str(event['logStreamName']), message=str(event['message'])).strip() url = "https://ap-northeast-1.console.aws.amazon.com/lambda/home?region=ap-northeast-1#/functions/" + function_name attachments = [] attachment = { "color": "#ff3333", "title": f"Error Log find at {function_name}", "title_link": url, "fields": [ { "title": "Reason", "value": postText } ] } attachments.append(attachment) slack = slackweb.Slack(url=HOOK_URL) response = slack.notify(channel=SLACK_CHANNEL, attachments=attachments)
Lambdaの環境変数
SlackのフックURLとチャンネルを登録する
フックURLは暗号化する。
しない場合はLambdaソースでdecryptしてるところを消して直接変数投入でいいと思う。
SNSのサブスクリプションにLambdaを登録する
赤枠部分に↑で作ったLambdaファンクションを入れてあげる。
再度、Lambdaを見に行って、トリガーにSNSが設定されていればOK。
テスト
適当に作ったhelloファンクションにログを仕込む。
import boto3 import json import logging import os def lambda_handler(event, context): logger = logging.getLogger() logger.setLevel(logging.INFO) logger.error("error 1") logger.error("error 2") logger.error("error 3") return { 'statusCode': 200, 'body': json.dumps('Hello from Lambda!') }
helloのファンクションのログが出力されたことを確認。
Slackに通知が飛べば完成。
文言はもっといい感じにできると思う。
Creating rich message layouts | Slack