これ、lambda関数使って作ったのですが、AWSが最初から搭載しておくべき機能じゃないかなと思うんですが、AWSの中の方ご検討ください。
※2023/08/04 今回はlambda関数で全部やろうとしてますが、AthenaでIP検出させたほうがいいのではないと思います。lambdaは要りますが設定に使うだけです。短いコードになります。
さて、レシピを解説します。
①GaurdDutyを有効化(ボタン一発)
②lambda関数を作成(後で中身作るのでフレームだけ作る)※入力は設定しなくて良い。入力は③で作って勝手に設定されるし、出力にWAFはない。
③EventBridgeを作成する。
イベントパターンでAWSのものを指定し、GaurdDutyのFindingを指定する。
こんな感じ。
ターゲットにlambda関数を指定。名前は②で作った関数名がドロップダウンで出てくるはず。
lambda関数のIAMロールにポリシーを付ける。
AmazonGuardDutyReadOnlyAccess
読み取り:wafv2:GetIPSet
書き込み:wafv2:UpdateIPSetとwafv2:UpdateIPSet
処理時間はデフォの3秒ではタイムアウトするので、2,30秒ほど取っとく。
さて、lambda関数のpythonコードは以下。
まず、テストデータとGaurdDutyの検出結果のデータは入り口が違うので2種類書いておく。先頭部分が違うだけで、後半は同じです。
ちょっと、ポートスキャンに対応してないことに気づきました。データ形式が違うんです。仕事の方は対応しましたが、SSHブルートゥースアタック対応だけで勘弁してください。bingのチャットの「厳密」でデーターと一緒に質問すれば正解教えてくれます。(Bardはへっぽこだった……)
テスト用と本番用コードの2つに分けてましたが、EventBridgeを通すのならevent変数から取ればいいらしいので、統一します。
また、IpAddressV4を取得するのに全パターンみるのも面倒だったので、JSONデータ検索して該当するものを取得するようにしました。
直す変数はip_set_nameとip_set_idの2つ。両方ともWAFのマネージメントコンソールに表示されてます。
import boto3
import json
import ipaddress
def extract_ipv4_addresses(data):
addresses = set()
def process_data(obj):
if isinstance(obj, dict):
for key, value in obj.items():
if key == "IpAddressV4":
ip = value
ip_network = ipaddress.ip_network(ip)
if ip_network.is_global:
addresses.add(ip)
else:
process_data(value)
elif isinstance(obj, list):
for item in obj:
process_data(item)
process_data(data)
return list(addresses)
def update_ip_set(client, ip_set_name, scope, ip_set_id, ips):
# Get existing IP set
ip_set = client.get_ip_set(
Name=ip_set_name,
Scope=scope,
Id=ip_set_id
)
# Update IP set with new addresses
addresses = ip_set['IPSet']['Addresses'] + ips
unique_addresses = list(set(addresses)) # Remove duplicates
response = client.update_ip_set(
Name=ip_set_name,
Scope=scope,
Id=ip_set['IPSet']['Id'],
Addresses=unique_addresses,
LockToken=ip_set['LockToken']
)
return response
def lambda_handler(event, context):
json_data = event
ips = extract_ipv4_addresses(json_data)
global_ips = [ip + '/32' for ip in ips if ipaddress.ip_address(ip).is_global]
print(global_ips)
if not global_ips:
return 'グローバルIPアドレスがありません。'
client = boto3.client('wafv2')
ip_set_name = 'test'
scope = 'REGIONAL'
ip_set_id = '123456-1234-1234-1234-1234567890'
max_attempts = 3
for attempt in range(max_attempts):
try:
response = update_ip_set(client, ip_set_name, scope, ip_set_id, global_ips)
break
except client.exceptions.WAFOptimisticLockException as e:
if attempt == max_attempts - 1:
raise e
if response['ResponseMetadata']['HTTPStatusCode'] == 200:
return 'グローバルIPアドレスが正常に設定されました。'
else:
return 'グローバルIPアドレスの設定に失敗しました。'
以上
変更する変数ip_set_nameとidはマネコンより取得できる。WAFに載ってる。
ちなみにテストデータは、GaurdDutyの検出結果のサンプルの生成ボタンを押すと、本番用コードがlambda関数に送られる。テストコードの場合、このサンプルをエクスポートするとJSONデータが表示されるのでコピペしてlambda関数のテストデータとする。
さて、本番のテストだが、公式に攻撃用のインスタンスを作成するCloudFormationのコードがあり、シェルがGitでダウンロードできるようになっているのでそれを利用する。
まあ、休日になればクラッカーが勝手に攻撃してくるので、それでまともに動いているか分かるんだけどね(GaurdDutyに検出され、WAFに登録される)
ちなみに、ALBのDNS攻撃しても検出されないから。EC2とか見てるそうなんで。あとテスト用の192.100.0.0と1はテスト用IPなのでグローバルIPアドレスと認識されません。
どなたかのご参考になれば幸いです。
コメント