Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
16 changes: 16 additions & 0 deletions docs/android/methodology.md
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,22 @@ By separating artifact collection from forensic analysis, this approach ensures

For more information, refer to the [AndroidQF project documentation](https://github.com/mvt-project/androidqf).

### VirusTotal package lookups

AndroidQF records APK file hashes in `packages.json`. MVT can optionally look up non-system APK hashes on VirusTotal while checking an AndroidQF acquisition:

```bash
MVT_VT_API_KEY=<key> mvt-android check-androidqf --virustotal /path/to/androidqf-output
```

The `--virustotal` option is disabled by default because it sends APK hashes to VirusTotal and requires network access. It uses the `VT_API_KEY` MVT configuration value, which can also be provided through the `MVT_VT_API_KEY` environment variable.

To avoid exhausting free VirusTotal API quotas, MVT waits 16 seconds between package hash requests by default. Use `--delay` to change the delay, or `--delay 0` to disable throttling:

```bash
mvt-android check-androidqf --virustotal --delay 30 /path/to/androidqf-output
```

## Android Intrusion Logs

On devices where the user has opted into Android's [**Advanced Protection Mode**](https://support.google.com/android/answer/16339980) and turned on the optional Intrusion Logging featrue, Android can create and archive structured *Intrusion Logs* in an encrypted format. These logs record DNS queries, outbound network connections, process starts, ADB activity and other security-relevant events, and are a high-fidelity complement to the rest of an AndroidQF acquisition. The logs are generated on-device and encrypted before being stored in the Google account associated with the device. The encryption key is protected by the user device PIN. The intrusion log data is not accessible to Google.
Expand Down
10 changes: 10 additions & 0 deletions src/mvt/android/cli.py
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@
HELP_MSG_CHECK_BUGREPORT,
HELP_MSG_CHECK_IOCS,
HELP_MSG_CHECK_INTRUSION_LOGS,
HELP_MSG_DELAY_CHECKS,
HELP_MSG_COMPLETION,
HELP_MSG_DISABLE_INDICATOR_UPDATE_CHECK,
HELP_MSG_DISABLE_UPDATE_CHECK,
Expand All @@ -36,6 +37,7 @@
HELP_MSG_STIX2,
HELP_MSG_VERBOSE,
HELP_MSG_VERSION,
HELP_MSG_VIRUS_TOTAL,
)
from mvt.common.logo import logo
from mvt.common.module_loader import CustomModuleLoadError, load_custom_modules
Expand Down Expand Up @@ -310,6 +312,10 @@ def check_backup(
help=HELP_MSG_LOAD_MODULE,
)
@click.option("--hashes", "-H", is_flag=True, help=HELP_MSG_HASHES)
@click.option("--virustotal", "-V", is_flag=True, help=HELP_MSG_VIRUS_TOTAL)
@click.option(
"--delay", "-d", type=click.IntRange(min=0), default=16, help=HELP_MSG_DELAY_CHECKS
)
@click.option("--non-interactive", "-n", is_flag=True, help=HELP_MSG_NONINTERACTIVE)
@click.option("--backup-password", "-p", help=HELP_MSG_ANDROID_BACKUP_PASSWORD)
@click.option("--verbose", "-v", is_flag=True, help=HELP_MSG_VERBOSE)
Expand All @@ -323,6 +329,8 @@ def check_androidqf(
module,
load_module,
hashes,
virustotal,
delay,
non_interactive,
backup_password,
verbose,
Expand All @@ -340,6 +348,8 @@ def check_androidqf(
module_options={
"interactive": not non_interactive,
"backup_password": cli_load_android_backup_password(log, backup_password),
"virustotal": virustotal,
"virustotal_delay": delay,
},
disable_version_check=_get_disable_flags(ctx)[0],
disable_indicator_check=_get_disable_flags(ctx)[1],
Expand Down
65 changes: 64 additions & 1 deletion src/mvt/android/modules/androidqf/aqf_packages.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,8 +5,11 @@

import json
import logging
import time
from typing import Optional

from rich.progress import track

from mvt.android.utils import (
BROWSER_INSTALLERS,
PLAY_STORE_INSTALLERS,
Expand All @@ -15,7 +18,8 @@
SYSTEM_UPDATE_PACKAGES,
THIRD_PARTY_STORE_INSTALLERS,
)
from mvt.common.module_types import ModuleResults
from mvt.common.module_types import ModuleAtomicResult, ModuleResults
from mvt.common.virustotal import VTNoKey, VTQuotaExceeded, virustotal_lookup

from .base import AndroidQFModule

Expand Down Expand Up @@ -125,6 +129,65 @@ def check_indicators(self) -> None:
)
break

if self.module_options.get("virustotal", False):
self.check_virustotal(
delay=self.module_options.get("virustotal_delay", 0)
)

def check_virustotal(self, delay: int = 0) -> None:
files_by_hash: dict[
str, list[tuple[ModuleAtomicResult, ModuleAtomicResult]]
] = {}
for package in self.results:
if package.get("system", False):
continue

for package_file in package.get("files", []):
file_hash = package_file.get("sha256")
if not file_hash:
continue

files_by_hash.setdefault(file_hash, []).append((package, package_file))

total_hashes = len(files_by_hash)
if total_hashes == 0:
return

progress_desc = f"Looking up {total_hashes} package files on VirusTotal..."
for index, file_hash in enumerate(
track(files_by_hash, description=progress_desc)
):
try:
results = virustotal_lookup(file_hash)
except VTNoKey as exc:
self.log.warning("%s", exc)
return
except VTQuotaExceeded as exc:
self.log.warning("Unable to continue VirusTotal lookups: %s", exc)
break

if index < total_hashes - 1 and delay > 0:
time.sleep(delay)

if not results:
continue

attributes = results.get("attributes", {})
stats = attributes.get("last_analysis_stats", {})
positives = stats.get("malicious", 0)
total = len(attributes.get("last_analysis_results", {}))
detection = f"{positives}/{total}"

for package, package_file in files_by_hash[file_hash]:
package_file["virustotal"] = detection
if positives > 0:
self.alertstore.high(
f'VirusTotal flagged package "{package["name"]}" file '
f'"{package_file["path"]}" with {detection} detections',
"",
package,
)

def run(self) -> None:
packages = self._get_files_by_pattern("*/packages.json")
if not packages:
Expand Down
2 changes: 2 additions & 0 deletions src/mvt/common/help.py
Original file line number Diff line number Diff line change
Expand Up @@ -53,3 +53,5 @@
HELP_MSG_CHECK_ANDROID_BACKUP = "Check an Android Backup"
HELP_MSG_CHECK_ANDROIDQF = "Check data collected with AndroidQF"
HELP_MSG_CHECK_INTRUSION_LOGS = "Check Android Intrusion Logging files"
HELP_MSG_VIRUS_TOTAL = "Check package APK hashes on VirusTotal"
HELP_MSG_DELAY_CHECKS = "Delay in seconds between VirusTotal requests"
53 changes: 53 additions & 0 deletions src/mvt/common/virustotal.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,53 @@
# Mobile Verification Toolkit (MVT)
# Copyright (c) 2021-2023 The MVT Authors.
# Use of this software is governed by the MVT License 1.1 that can be found at
# https://license.mvt.re/1.1/

import logging
from typing import Any, Optional

import requests

from .config import settings

log = logging.getLogger(__name__)


class VTNoKey(Exception):
pass


class VTQuotaExceeded(Exception):
pass


def virustotal_lookup(file_hash: str) -> Optional[dict[str, Any]]:
if not settings.VT_API_KEY:
raise VTNoKey(
"No VirusTotal API key provided: to use VirusTotal lookups please set "
"MVT_VT_API_KEY or VT_API_KEY in the MVT configuration file"
)

headers = {
"User-Agent": "VirusTotal",
"Content-Type": "application/json",
"x-apikey": settings.VT_API_KEY,
}
res = requests.get(
f"https://www.virustotal.com/api/v3/files/{file_hash}",
headers=headers,
timeout=settings.NETWORK_TIMEOUT,
)

if res.status_code == 200:
report = res.json()
return report["data"]

if res.status_code == 404:
log.info("Could not find results for file with hash %s", file_hash)
elif res.status_code == 429:
raise VTQuotaExceeded("You have exceeded the quota for your VirusTotal API key")
else:
raise RuntimeError(f"Unexpected response from VirusTotal: {res.status_code}")

return None
55 changes: 55 additions & 0 deletions tests/android_androidqf/test_packages.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,8 +7,11 @@
from pathlib import Path

import pytest
from click.testing import CliRunner

from mvt.android.cli import check_androidqf
from mvt.android.modules.androidqf.aqf_packages import AQFPackages
from mvt.android.modules.androidqf import aqf_packages as aqf_packages_module
from mvt.common.module import run_module

from ..utils import get_android_androidqf, list_files
Expand Down Expand Up @@ -132,3 +135,55 @@ def test_packages_certificate_hash_ioc(self, module, indicators_factory):
possible_detected_app[0].matched_indicator.value
== "c7e56178748be1441370416d4c10e34817ea0c961eb636c8e9d98e0fd79bf730"
)

def test_virustotal_delays_after_missing_result(self, monkeypatch):
lookups = []
sleeps = []

def fake_virustotal_lookup(file_hash):
lookups.append(file_hash)
if file_hash == "missing_hash":
return None
return {
"attributes": {
"last_analysis_stats": {"malicious": 1},
"last_analysis_results": {"engine": {}},
}
}

monkeypatch.setattr(
aqf_packages_module, "virustotal_lookup", fake_virustotal_lookup
)
monkeypatch.setattr(aqf_packages_module.time, "sleep", sleeps.append)

module = AQFPackages(
module_options={"virustotal": True, "virustotal_delay": 16},
results=[
{
"name": "org.example",
"installer": "com.android.vending",
"disabled": False,
"system": False,
"files": [
{"path": "/data/app/missing.apk", "sha256": "missing_hash"},
{"path": "/data/app/found.apk", "sha256": "found_hash"},
],
}
],
)

module.check_indicators()

assert lookups == ["missing_hash", "found_hash"]
assert sleeps == [16]
assert module.results[0]["files"][1]["virustotal"] == "1/1"
assert len(module.alertstore.alerts) == 1


def test_check_androidqf_rejects_negative_virustotal_delay(data_path):
runner = CliRunner()

result = runner.invoke(check_androidqf, ["--delay", "-1", data_path])

assert result.exit_code == 2
assert "Invalid value for '--delay'" in result.output
Loading