Útmutató az MDATP API Szkripteléshez Kezdőknek

 

A Microsoft Defender Advanced Threat Protection egy vállalati végpontbiztonsági platform, amelyet arra terveztek, hogy elősegítse a vállalatokat a fejlett fenyegetések megelőzésében, észlelésében, kivizsgálásában és az azokra való reagálásban.

Az MDATP API végpontjain keresztül a platform számos tulajdonságát lehet kezelni. Ebben a posztban megvizsgálunk egy egyszerű példát arra, hogyan lehet az MDATP programozási interfészét felhasználni egy scriptben,
mivel IT biztonsági szakemberekként fontos, hogy képesek legyünk automatizálni különböző folyamatokat és API használatával bővítsük a szervezet kiberbiztonsági képességeit. Példánkban a Python és az MDATP API használatával automatikusan fogunk riportokat készíteni a az eszköz riasztási eseményeiről.

 

Ehhez természetesen szükségünk lesz Pythonra, az XlsxWriter Python modulra és az MDATP-ből a következő adatokra:

  • Egy Application ID/Client Secret páros a hitelesítéshez, amelyet az Azure-ból szedhetünk ki
  • A tenant ID-ra az adott Azure instance pontos azonosításához
  • Illetve a lekérdezésre, amelyet futtatni szeretnénk

Először is készítsük el a lekérdezést. Ehhez átváltunk az MDATP Advanced Hunting oldalra és megvizsgáljuk az elérhető sémákat. Ezesetben a „DeviceAlertEvents” táblára lesz szükségünk, amelyet le szeretnénk szűrni a szkriptben megadott idősávra. Fontos megjegyezni, hogy az időformátumnak a következőre kell hasonlítania: “%Y-%m-%dT%H:%M:%SZ”.

A lekérdezés a következőképp fog kinézni:

Ezután el is lehet kezdeni a Python szkript megírását az importokkal.


import datetime
import json
import time
import requests
import xlsxwriter

Ha ez kész, deklarálunk egy változót, amely tartalmazni fogja, hogy hány percenként szeretnénk riportot készíteni. A mi esetünkben óránként fogunk riportot generálni.

runEvery = 60

A továbbiakat tegyük egy ciklusba, hogy folyamatosan fusson a script, majd a datetime modul segítségével számítsuk ki, mely két időpont között keressen eredményeket a query.

while True:
queryUntil = datetime.datetime.utcnow()
queryFrom = queryUntil - datetime.timedelta(minutes=runEvery)

A lekérdezés futtatásához először hitelesítenünk kell magunkat, hogy kapjunk egy tokent, amellyel elérhetjük az API-t. Itt lesz szükségünk az „Application ID”-ra, „Client Secret”-re és a „Tenant ID”-ra. Az Application ID és a Client Secret az „App Registrations” része, amelyeket az alkalmazások és esetünkben a szkriptek használnak hitelesítéshez (és kiváltják a hagyományos service usereket). Ha ezek nincsenek beállítva, akkor át kell térni az Azure app registrations oldara, ahol el lehet végezni a szükséges alkalmazásregisztrációt.

Amint ezzel végeztünk, hozzunk létre két „dict” -et, amelyek tartalmazni fogják a POST kérésünk fejléceit és adatait. Azok számára, akik nem ismerik a Python dicteket, ezek a „hash table” (egy fajta asszociatív tömb) adatstruktúrát implementálják, amelyek kulcs és érték párból (tuple) állnak.

headers = {"Content-Type": "application/x-www-form-urlencoded"}
data = {"grant_type": "client_credentials", "client_id": "**YOUR CLEINT ID***", "scope": "https://securitycenter.microsoft.com/mtp/.default",
"client_secret": "***YOUR CLIENT SECRET***"}

A kérést kiszolgáló URL-be kell illesztenünk a Tenant ID-nkat. A requests modul egyik előnye, hogy a „.json()” metódussal egyből fel tudjuk dolgozni a JSON választ, így egy sorba létre is tudjuk hozni a kapott „bearer token”-t.
authKey = "Bearer " + \
requests.post("https://login.microsoftonline.com/***YOUR TENANT ID*** oauth2/v2.0/token",
headers=headers, data=data).json()["access_token"]

Amint a hitelesítési token elkészült, neki lehet futni a kérésnek, amely futtatni fogja a lekérdezést. Újabb fejléceket készítünk, hozzáadjuk az időtartományt a korábban írt lekérdezéshez, ezt a lekérdezést hozzáadjuk az adatot tartalmazó dicthez, és máris küldhetjük a lekérdezést az MDATP-nek.

headers = {"Content-Type": "application/json", "authorization": authKey}query = "DeviceAlertEvents | where Timestamp between(datetime(" + \
queryFrom.strftime("%Y-%m-%dT%H:%M:%SZ") + ") .. datetime(" + queryUntil.strftime("%Y-%m-%dT%H:%M:%SZ") + "))"
data = {"Query": query}
result = requests.post("https://api-eu.securitycenter.microsoft.com/api/advancedqueries/run", headers=headers,
data=json.dumps(data)).json()

 

Ezzel le is futott a lekérdezés, amely eredményét egy JSON válaszban fogunk megkapni.

Az érdekes adatok a „Results” kulcsú tupleben lesznek, amely tartalmazni fogja az riasztásokat.

A válasz többi részeit figyelmen kívül lehet hagyni.
result = result["Results"]
Most, hogy megvannak az adatok, elkezdhetjük létrehozni az Excel riportot. Ehhez először át kell alakítanunk az API-választ tartalmazó dictet olyan formátummá, amely könnyen átvihető Excelbe. Remélhetőleg a következő ábra érthetően átadja, mit is próbálunk csinálni.

Az első lépés, hogy összevonjuk a választ formáló dict-eket. Ehhez létrehozunk egy újat, végigmegyünk a válaszban szereplőkön, majd hozzáadjuk az értékeket az új dict-hez.


mergedDict = {}
for dictionary in result:
for key, item in dictionary.items():
mergedDict.setdefault(key, []).append(item)

Ezután el lehet kezdeni létrehozni az Excel fájlt. Ehhez deklarálunk néhány változót, amelyek tartalmazni fogják a jelentés nevét (egyedinek kell lennie minden jelentésnél), a munkafüzetet, a munkalapot, néhány formázást és két számlálót, amik számontartják, melyik sornál és oszlopnál tartunk.

workbookName = "Report " + queryUntil.strftime("%Y-%m-%d %H.%M") + ".xlsx"
workbook = xlsxwriter.Workbook(workbookName)
worksheet = workbook.add_worksheet()
bold = workbook.add_format({'bold': True})
row = 0
col = 0

Áttérhetünk az adatok átalakításának második lépésére. Végigmegyünk a dict összes kulcsán, kiírva a kulcsokat az oszlop elejére, majd kiírjuk a kulcshoz tartozó értékeket a kulcsok alá.

for key in mergedDict:
worksheet.write(row, col, key, bold)
for item in mergedDict[key]:
worksheet.write(row + 1, col, str(item))
row += 1
row = 0
col += 1

Már csak annyi maradt, hogy bezárjuk a munkafüzetet és várunk amíg új riportot nem kell generálni.

workbook.close()
time.sleep(60 * runEvery)

Kész is vagyunk. A mai blogposztunkban készítettünk egy szkriptet, amely használja az MDATP API-t és manipulálja a kapott adatokat. Az ilyen szkriptek írása egyszerű, ám rendkívül fontos képesség, hogy felhőalapú biztonsági megoldásainkkal hatékonyan tudjunk dolgozni.

A végleges script:

import datetime
import json
import time
import requests
import xlsxwriterrunEvery = 60
while True:
queryUntil = datetime.datetime.utcnow()
queryFrom = queryUntil - datetime.timedelta(minutes=runEvery)
headers = {"Content-Type": "application/x-www-form-urlencoded"}
data = {"grant_type": "client_credentials", "client_id": "**YOUR CLEINT ID***", "scope": "https://securitycenter.microsoft.com/mtp/.default",
"client_secret": "***YOUR CLIENT SECRET***"}
authKey = "Bearer " + \
requests.post("https://login.microsoftonline.com/***YOUR TENANT ID*** oauth2/v2.0/token",
headers=headers, data=data).json()["access_token"]
headers = {"Content-Type": "application/json", "authorization": authKey}
query = "DeviceAlertEvents | where Timestamp between(datetime(" + \
queryFrom.strftime("%Y-%m-%dT%H:%M:%SZ") + ") .. datetime(" + queryUntil.strftime("%Y-%m-%dT%H:%M:%SZ") + "))"
data = {"Query": query}
result = requests.post("https://api-eu.securitycenter.microsoft.com/api/advancedqueries/run", headers=headers,
data=json.dumps(data)).json()
result = result["Results"]
mergedDict = {}
for dictionary in result:
for key, item in dictionary.items():
mergedDict.setdefault(key, []).append(item)
workbookName = "Report " + queryUntil.strftime("%Y-%m-%d %H.%M") + ".xlsx"
workbook = xlsxwriter.Workbook(workbookName)
worksheet = workbook.add_worksheet()
bold = workbook.add_format({'bold': True})
row = 0
col = 0
for key in mergedDict:
worksheet.write(row, col, key, bold)
for item in mergedDict[key]:
worksheet.write(row + 1, col, str(item))
row += 1
row = 0
col += 1
workbook.close()
time.sleep(60 * runEvery)

July 10th, 2020

Top