GaurdDutyが検出したIPをWAFに登録

AWS

これ、lambda関数使って作ったのですが、AWSが最初から搭載しておくべき機能じゃないかなと思うんですが、AWSの中の方ご検討ください。

さて、レシピを解説します。

①GaurdDutyを有効化(ボタン一発)

②lambda関数を作成(後で中身作るのでフレームだけ作る)※入力は設定しなくて良い。入力は③で作って勝手に設定されるし、出力にWAFはない。

③EventBridgeを作成する。

 イベントパターンでAWSのものを指定し、GaurdDutyのFindingを指定する。

こんな感じ。

ターゲットにlambda関数を指定。名前は②で作った関数名がドロップダウンで出てくるはず。

lambda関数のIAMロールにポリシーを付ける。

AmazonGuardDutyReadOnlyAccess
読み取り:wafv2:GetIPSet
書き込み:wafv2:UpdateIPSetとwafv2:UpdateIPSet

処理時間はデフォの3秒ではタイムアウトするので、2,30秒ほど取っとく。

さて、lambda関数のpythonコードは以下。

まず、テストデータとGaurdDutyの検出結果のデータは入り口が違うので2種類書いておく。先頭部分が違うだけで、後半は同じです。

ちょっと、ポートスキャンに対応してないことに気づきました。データ形式が違うんです。仕事の方は対応しましたが、SSHブルートゥースアタック対応だけで勘弁してください。bingのチャットの「厳密」でデーターと一緒に質問すれば正解教えてくれます。(Bardはへっぽこだった……)

テスト用と本番用コードの2つに分けてましたが、EventBridgeを通すのならevent変数から取ればいいらしいので、統一します。

また、IpAddressV4を取得するのに全パターンみるのも面倒だったので、JSONデータ検索して該当するものを取得するようにしました。

直す変数はip_set_nameとip_set_idの2つ。両方ともWAFのマネージメントコンソールに表示されてます。

import boto3
import json
import ipaddress

def extract_ipv4_addresses(data):
    addresses = set()

    def process_data(obj):
        if isinstance(obj, dict):
            if 'IpAddressV4' in obj and 'LocalIpDetails' not in obj:
                addresses.add(obj['IpAddressV4'])
            else:
                for value in obj.values():
                    process_data(value)
        elif isinstance(obj, list):
            for item in obj:
                process_data(item)

    process_data(data)

    if len(addresses) == 0:
        return []

    formatted_addresses = [str(ipaddress.ip_network(f"{list(addresses)[0]}/32"))]
    return formatted_addresses
    
def update_ip_set(client, ip_set_name, scope, ip_set_id, ips):
    # Get existing IP set
    ip_set = client.get_ip_set(
        Name=ip_set_name,
        Scope=scope,
        Id=ip_set_id
    )

    # Update IP set with new addresses
    addresses = ip_set['IPSet']['Addresses'] + ips
    response = client.update_ip_set(
        Name=ip_set_name,
        Scope=scope,
        Id=ip_set['IPSet']['Id'],
        Addresses=addresses,
        LockToken=ip_set['LockToken']
    )
    
    return response

def lambda_handler(event, context):
    client = boto3.client('guardduty')
    print("****************")
    
    ips = extract_ipv4_addresses(event)
    
    print(ips)  # ログに出力されるようにするか、CloudWatch ロググループに設定する

    client = boto3.client('wafv2')
    ip_set_name = 'test'
    scope = 'REGIONAL'
    ip_set_id = '01234-56789-abcdef-ghijklmn'

    max_attempts = 3
    for attempt in range(max_attempts):
        try:
            response = update_ip_set(client, ip_set_name, scope, ip_set_id, ips)
            break
        except client.exceptions.WAFOptimisticLockException as e:
            if attempt == max_attempts - 1:
                raise e

    if response['ResponseMetadata']['HTTPStatusCode'] == 200:
        return 'IP アドレスが正常に設定されました。'
    else:
        return 'IP アドレスの設定に失敗しました。'

以上

変更する変数ip_set_nameとidはマネコンより取得できる。WAFに載ってる。

ちなみにテストデータは、GaurdDutyの検出結果のサンプルの生成ボタンを押すと、本番用コードがlambda関数に送られる。テストコードの場合、このサンプルをエクスポートするとJSONデータが表示されるのでコピペしてlambda関数のテストデータとする。

さて、本番のテストだが、公式に攻撃用のインスタンスを作成するCloudFormationのコードがあり、シェルがGitでダウンロードできるようになっているのでそれを利用する。

まあ、休日になればクラッカーが勝手に攻撃してくるので、それでまともに動いているか分かるんだけどね(GaurdDutyに検出され、WAFに登録される)

ちなみに、ALBのDNS攻撃しても検出されないから。EC2とか見てるそうなんで。

どなたかのご参考になれば幸いです。

コメント

タイトルとURLをコピーしました