GaurdDutyが検出したIPをWAFに登録

AWS

これ、lambda関数使って作ったのですが、AWSが最初から搭載しておくべき機能じゃないかなと思うんですが、AWSの中の方ご検討ください。

※2023/08/04 今回はlambda関数で全部やろうとしてますが、AthenaでIP検出させたほうがいいのではないと思います。lambdaは要りますが設定に使うだけです。短いコードになります。

さて、レシピを解説します。

①GaurdDutyを有効化(ボタン一発)

②lambda関数を作成(後で中身作るのでフレームだけ作る)※入力は設定しなくて良い。入力は③で作って勝手に設定されるし、出力にWAFはない。

③EventBridgeを作成する。

 イベントパターンでAWSのものを指定し、GaurdDutyのFindingを指定する。

こんな感じ。

ターゲットにlambda関数を指定。名前は②で作った関数名がドロップダウンで出てくるはず。

lambda関数のIAMロールにポリシーを付ける。

AmazonGuardDutyReadOnlyAccess
読み取り:wafv2:GetIPSet
書き込み:wafv2:UpdateIPSetとwafv2:UpdateIPSet

処理時間はデフォの3秒ではタイムアウトするので、2,30秒ほど取っとく。

さて、lambda関数のpythonコードは以下。

まず、テストデータとGaurdDutyの検出結果のデータは入り口が違うので2種類書いておく。先頭部分が違うだけで、後半は同じです。

ちょっと、ポートスキャンに対応してないことに気づきました。データ形式が違うんです。仕事の方は対応しましたが、SSHブルートゥースアタック対応だけで勘弁してください。bingのチャットの「厳密」でデーターと一緒に質問すれば正解教えてくれます。(Bardはへっぽこだった……)

テスト用と本番用コードの2つに分けてましたが、EventBridgeを通すのならevent変数から取ればいいらしいので、統一します。

また、IpAddressV4を取得するのに全パターンみるのも面倒だったので、JSONデータ検索して該当するものを取得するようにしました。

直す変数はip_set_nameとip_set_idの2つ。両方ともWAFのマネージメントコンソールに表示されてます。

import boto3
import json
import ipaddress

def extract_ipv4_addresses(data):
    addresses = set()

    def process_data(obj):
        if isinstance(obj, dict):
            for key, value in obj.items():
                if key == "IpAddressV4":
                    ip = value
                    ip_network = ipaddress.ip_network(ip)
                    if ip_network.is_global:
                        addresses.add(ip)
                else:
                    process_data(value)
        elif isinstance(obj, list):
            for item in obj:
                process_data(item)

    process_data(data)

    return list(addresses)


def update_ip_set(client, ip_set_name, scope, ip_set_id, ips):
    # Get existing IP set
    ip_set = client.get_ip_set(
        Name=ip_set_name,
        Scope=scope,
        Id=ip_set_id
    )

    # Update IP set with new addresses
    addresses = ip_set['IPSet']['Addresses'] + ips
    unique_addresses = list(set(addresses))  # Remove duplicates

    response = client.update_ip_set(
        Name=ip_set_name,
        Scope=scope,
        Id=ip_set['IPSet']['Id'],
        Addresses=unique_addresses,
        LockToken=ip_set['LockToken']
    )

    return response


def lambda_handler(event, context):
    json_data = event
    ips = extract_ipv4_addresses(json_data)
    global_ips = [ip + '/32' for ip in ips if ipaddress.ip_address(ip).is_global]
    print(global_ips)

    if not global_ips:
        return 'グローバルIPアドレスがありません。'

    client = boto3.client('wafv2')
    ip_set_name = 'test'
    scope = 'REGIONAL'
    ip_set_id = '123456-1234-1234-1234-1234567890'

    max_attempts = 3
    for attempt in range(max_attempts):
        try:
            response = update_ip_set(client, ip_set_name, scope, ip_set_id, global_ips)
            break
        except client.exceptions.WAFOptimisticLockException as e:
            if attempt == max_attempts - 1:
                raise e

    if response['ResponseMetadata']['HTTPStatusCode'] == 200:
        return 'グローバルIPアドレスが正常に設定されました。'
    else:
        return 'グローバルIPアドレスの設定に失敗しました。'

以上

変更する変数ip_set_nameとidはマネコンより取得できる。WAFに載ってる。

ちなみにテストデータは、GaurdDutyの検出結果のサンプルの生成ボタンを押すと、本番用コードがlambda関数に送られる。テストコードの場合、このサンプルをエクスポートするとJSONデータが表示されるのでコピペしてlambda関数のテストデータとする。

さて、本番のテストだが、公式に攻撃用のインスタンスを作成するCloudFormationのコードがあり、シェルがGitでダウンロードできるようになっているのでそれを利用する。

まあ、休日になればクラッカーが勝手に攻撃してくるので、それでまともに動いているか分かるんだけどね(GaurdDutyに検出され、WAFに登録される)

ちなみに、ALBのDNS攻撃しても検出されないから。EC2とか見てるそうなんで。あとテスト用の192.100.0.0と1はテスト用IPなのでグローバルIPアドレスと認識されません。

どなたかのご参考になれば幸いです。

コメント

タイトルとURLをコピーしました