自動RTbotを作った

フォロワー6000人記念企画で1週間ストーカーRTというのをやりました。

当選者のツイートを一週間ストーカーして、RTするというものです。

手動でやるとストーカー出来ないので、botさんにお願いすることにしましたw

概要

システム構成

システム構成

  • 1時間に一回cloud watch eventから起動

  • RT対象ユーザー情報はdynamoで管理

設計概要

  • 1時間に一回cloud watch eventでlambdaをキック

  • dynamoDBから対象ユーザーを取得

  • 対象ユーザーの最新ツイートを取得

  • 最新ツイートをリツイート

コード全量

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
from requests_oauthlib import OAuth1Session, OAuth1
import requests
import json
import os
import datetime
import random
import boto3

OAUTH1 = os.environ['OAUTH1']
OAUTH2 = os.environ['OAUTH2']
OAUTH3 = os.environ['OAUTH3']
OAUTH4 = os.environ['OAUTH4']
SCREEN_NAME = os.environ['SCREEN_NAME']
DEBUG_MODE = False

def getOauth():
    return OAuth1(OAUTH1, OAUTH2, OAUTH3, OAUTH4)


def getParam(key):
    dynamoDB = boto3.resource("dynamodb")
    paramTable = dynamoDB.Table("twitterLotPram")
    item = paramTable.get_item(Key={"key": key})
    if "Item" in item:
        return item["Item"]["data"]
    else:
        return []


def putParam(key, data):
    dynamoDB = boto3.resource("dynamodb")
    paramTable = dynamoDB.Table("twitterLotPram")
    paramTable.put_item(Item={"key": key, "data": data})


def main():

    if DEBUG_MODE:
        print(f"★★★debugモード★★★")

    # 対象ユーザーを取得
    user_name_list = getParam("retweetUserNameList")
    print(user_name_list)

    # 最新のツイートを1件取得する
    option = 'count=1&exclude_replies=true&include_rts=false'
    for screen_name in user_name_list:
        user_timeline = requests.get(
            f'https://api.twitter.com/1.1/statuses/user_timeline.json?screen_name={screen_name}&{option}',
            auth=getOauth()
        ).json()[0]
        print(user_timeline)

        # リツイートする
        print(f'{user_timeline["id"]}をリツイート')
        
        if not DEBUG_MODE:
            response = requests.post(
                f'https://api.twitter.com/1.1/statuses/retweet/{user_timeline["id"]}.json',
                auth=getOauth()
            ).json()
            print(response)


def lambda_handler(event, context):

    try:
        main()

    except:
        import traceback
        traceback.print_exc()

るな
るな
エンジニア