ログ収集処理を作った

· ☕ 2
🏷️
  • #aws
  • #lambda
  • #Python
  • ログ収集なんて本当はKinesisとか使うんだと思うんだけど、料金見たら高そうだったから、いったん諦めました。

    いつか学習のためにリベンジはするかも?

    今回はSQSとlambdaで作っていきます。

    設計とか、ここに至るまでの経緯とかはこちら

    もともと収集したいものは、SQSに吐くように作っていたので、それを拾っていくだけ。

    ゆいちゃん

    拾った後、どうするかはまだ考えてない…。

    RDSお金かかるし…

    とりあえず、s3に吐いて、slackに通知して、見れるようになれば今日はおしまい。

    今日のゴール

    lambda

    • sqsのキュー一覧を取得して、logが含まれているものに絞る。

    • それぞれのキューの中身を取得

    • キューの中身をcsvにしてs3に出力

    • 出力した結果をsnsに吐いて、それを前に作ったslack送信の仕組みでslackに送信。

    ちなみにデプロイは、前に作った簡単デプロイの仕組みを使って、bash build-lambda.sh collectionLog バケット名 dev ロール名でデプロイ。

     1
     2
     3
     4
     5
     6
     7
     8
     9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    23
    24
    25
    26
    27
    28
    29
    30
    31
    32
    33
    34
    35
    36
    37
    38
    39
    40
    41
    42
    43
    44
    45
    46
    47
    48
    49
    50
    51
    52
    53
    54
    55
    56
    57
    58
    59
    60
    61
    62
    63
    64
    65
    66
    67
    68
    69
    70
    71
    72
    73
    74
    75
    76
    77
    
    import boto3
    import datetime
    import json
    import io
    import csv
    import os
    
    BUCKET_NAME = xxxxxxx
    TOPIC_NAME = xxxxxxx
    
    def send_sns_topic(topic_arn, message, subject):
        client = boto3.client('sns')
        return client.publish(
            TopicArn=topic_arn,
            Message=message,
            Subject=subject
        )
    
    
    def put_s3_dict_to_csv(bucket_name, file_name, header, datas):
        with io.StringIO() as f:
            writer = csv.DictWriter(f, fieldnames=header)
            writer.writeheader()
            [writer.writerow(data) for data in datas]
            s3 = boto3.resource("s3")
            s3.Bucket(bucket_name).put_object(Key=file_name,
                                            Body=f.getvalue())
    
    def main():
          
        sqs = boto3.client('sqs') 
        queues = sqs.list_queues()["QueueUrls"]
        queues = list(filter(lambda x:"log" in x,queues))
        queues = list(map(lambda x:x.split("/")[4],queues))
        print(queues)
        
        now = (datetime.datetime.utcnow() + datetime.timedelta(hours=9)).strftime("%Y_%m_%d")
        print(now)
        slackMessage = f"{now} logs\n"
        
        for queueName in queues:
            datas = []
            queue = boto3.resource("sqs").get_queue_by_name(QueueName=queueName)
            messages = ["dummy"]
            while len(messages) != 0:
                message = None
                messages = queue.receive_messages(MaxNumberOfMessages=10, WaitTimeSeconds=1)
                if len(messages) != 0:
                    for message in messages:
                        datas += [json.loads(message.body)]
                        # message.delete()
    
            if len(datas) > 0:
                put_s3_dict_to_csv(BUCKET_NAME, f"log/date={now}/{queueName}.csv", ['Subject','Timestamp','Message','MessageId', 'SignatureVersion', 'SigningCertURL', 'TopicArn', 'Type', 'Signature', 'UnsubscribeURL'], datas)
                slackMessage += f"https://{BUCKET_NAME}.s3-ap-northeast-1.amazonaws.com/log/date={now}/{queueName}.csv\n"
    
        print(slackMessage)
        topic_arn = f"arn:aws:sns:ap-northeast-1:{os.environ['ACCOUNT_ID']}:{TOPIC_NAME}"
        subject = f'log収集'
    
        send_sns_topic(topic_arn,json.dumps({"channel":"#log","message":slackMessage}), subject)
        return queues
    
    def lambda_handler(event, context):
        
        try:
            return main()
        except:
            import traceback
            try:
                traceback.print_exc()
                return {
                    'statusCode': 400,
                    'body': json.dumps({"errMessage":"error"}, ensure_ascii=False)
                }
            except:
                traceback.print_exc()
    

    slack

    届いた

    slack

    中身もちゃんと出てる。完成!


    るな
    るな
    エンジニア