Skip to content

Commit

Permalink
Merge pull request #2208 from mandiant/vmray-extractor
Browse files Browse the repository at this point in the history
dynamic: add extractor for VMRay dynamic sandbox traces
  • Loading branch information
mr-tz committed Aug 27, 2024
2 parents ed5dd38 + fa92cfd commit 3223d3f
Show file tree
Hide file tree
Showing 23 changed files with 1,300 additions and 71 deletions.
3 changes: 3 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -2,12 +2,15 @@

## master (unreleased)

Unlock powerful malware analysis with capa's new [VMRay sandbox](https://www.vmray.com/) integration! Simply provide a VMRay analysis archive, and capa will automatically extract and match capabilties, streamlining your workflow.

### New Features
- regenerate ruleset cache automatically on source change (only in dev mode) #2133 @s-ff

- add landing page https://mandiant.github.io/capa/ @williballenthin #2310
- add rules website https://mandiant.github.io/capa/rules @DeeyaSingh #2310
- add .justfile @williballenthin #2325
- dynamic: add support for VMRay dynamic sandbox traces #2208 @mike-hunhoff @r-sm2024 @mr-tz

### Breaking Changes

Expand Down
12 changes: 7 additions & 5 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -150,13 +150,15 @@ function @ 0x4011C0
...
```

## analyzing sandbox reports
Additionally, capa also supports analyzing sandbox reports for dynamic capability extraction.
In order to use this, you first submit your sample to one of supported sandboxes for analysis, and then run capa against the generated report file.
capa also supports dynamic capabilities detection for multiple sandboxes including:
* [CAPE](https://github.com/kevoreilly/CAPEv2) (supported report formats: `.json`, `.json_`, `.json.gz`)
* [DRAKVUF](https://github.com/CERT-Polska/drakvuf-sandbox/) (supported report formats: `.log`, `.log.gz`)
* [VMRay](https://www.vmray.com/) (supported report formats: analysis archive `.zip`)

Currently, capa supports the [CAPE sandbox](https://github.com/kevoreilly/CAPEv2) and the [DRAKVUF sandbox](https://github.com/CERT-Polska/drakvuf-sandbox/). In order to use either, simply run capa against the generated file (JSON for CAPE or LOG for DRAKVUF sandbox) and it will automatically detect the sandbox and extract capabilities from it.

Here's an example of running capa against a packed binary, and then running capa against the CAPE report of that binary:
To use this feature, submit your file to a supported sandbox and then download and run capa against the generated report file. This feature enables capa to match capabilities against dynamic and static features that the sandbox captured during execution.

Here's an example of running capa against a packed file, and then running capa against the CAPE report generated for the same packed file:

```yaml
$ capa 05be49819139a3fdcdbddbdefd298398779521f3d68daa25275cc77508e42310.exe
Expand Down
2 changes: 2 additions & 0 deletions capa/features/common.py
Original file line number Diff line number Diff line change
Expand Up @@ -462,6 +462,7 @@ def evaluate(self, features: "capa.engine.FeatureSet", short_circuit=True):
FORMAT_SC64 = "sc64"
FORMAT_CAPE = "cape"
FORMAT_DRAKVUF = "drakvuf"
FORMAT_VMRAY = "vmray"
FORMAT_FREEZE = "freeze"
FORMAT_RESULT = "result"
STATIC_FORMATS = {
Expand All @@ -476,6 +477,7 @@ def evaluate(self, features: "capa.engine.FeatureSet", short_circuit=True):
DYNAMIC_FORMATS = {
FORMAT_CAPE,
FORMAT_DRAKVUF,
FORMAT_VMRAY,
FORMAT_FREEZE,
FORMAT_RESULT,
}
Expand Down
161 changes: 161 additions & 0 deletions capa/features/extractors/vmray/__init__.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,161 @@
# Copyright (C) 2024 Mandiant, Inc. All Rights Reserved.
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at: [package root]/LICENSE.txt
# Unless required by applicable law or agreed to in writing, software distributed under the License
# is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and limitations under the License.
import logging
from typing import Dict, List, Tuple, Optional
from pathlib import Path
from zipfile import ZipFile
from collections import defaultdict

from capa.exceptions import UnsupportedFormatError
from capa.features.extractors.vmray.models import File, Flog, SummaryV2, StaticData, FunctionCall, xml_to_dict

logger = logging.getLogger(__name__)

DEFAULT_ARCHIVE_PASSWORD = b"infected"

SUPPORTED_FLOG_VERSIONS = ("2",)


class VMRayAnalysis:
def __init__(self, zipfile_path: Path):
self.zipfile = ZipFile(zipfile_path, "r")

# summary_v2.json is the entry point to the entire VMRay archive and
# we use its data to find everything else that we need for capa
self.sv2 = SummaryV2.model_validate_json(
self.zipfile.read("logs/summary_v2.json", pwd=DEFAULT_ARCHIVE_PASSWORD)
)
self.file_type: str = self.sv2.analysis_metadata.sample_type

# flog.xml contains all of the call information that VMRay captured during execution
flog_xml = self.zipfile.read("logs/flog.xml", pwd=DEFAULT_ARCHIVE_PASSWORD)
flog_dict = xml_to_dict(flog_xml)
self.flog = Flog.model_validate(flog_dict)

if self.flog.analysis.log_version not in SUPPORTED_FLOG_VERSIONS:
raise UnsupportedFormatError(
"VMRay feature extractor does not support flog version %s" % self.flog.analysis.log_version
)

self.exports: Dict[int, str] = {}
self.imports: Dict[int, Tuple[str, str]] = {}
self.sections: Dict[int, str] = {}
self.process_ids: Dict[int, int] = {}
self.process_threads: Dict[int, List[int]] = defaultdict(list)
self.process_calls: Dict[int, Dict[int, List[FunctionCall]]] = defaultdict(lambda: defaultdict(list))
self.base_address: int

self.sample_file_name: Optional[str] = None
self.sample_file_analysis: Optional[File] = None
self.sample_file_static_data: Optional[StaticData] = None

self._find_sample_file()

# VMRay analysis archives in various shapes and sizes and file type does not definitively tell us what data
# we can expect to find in the archive, so to be explicit we check for the various pieces that we need at
# minimum to run capa analysis
if self.sample_file_name is None or self.sample_file_analysis is None:
raise UnsupportedFormatError("VMRay archive does not contain sample file (file_type: %s)" % self.file_type)

if not self.sample_file_static_data:
raise UnsupportedFormatError("VMRay archive does not contain static data (file_type: %s)" % self.file_type)

if not self.sample_file_static_data.pe and not self.sample_file_static_data.elf:
raise UnsupportedFormatError(
"VMRay feature extractor only supports PE and ELF at this time (file_type: %s)" % self.file_type
)

# VMRay does not store static strings for the sample file so we must use the source file
# stored in the archive
sample_sha256: str = self.sample_file_analysis.hash_values.sha256.lower()
sample_file_path: str = f"internal/static_analyses/{sample_sha256}/objects/files/{sample_sha256}"

logger.debug("file_type: %s, file_path: %s", self.file_type, sample_file_path)

self.sample_file_buf: bytes = self.zipfile.read(sample_file_path, pwd=DEFAULT_ARCHIVE_PASSWORD)

self._compute_base_address()
self._compute_imports()
self._compute_exports()
self._compute_sections()
self._compute_process_ids()
self._compute_process_threads()
self._compute_process_calls()

def _find_sample_file(self):
for file_name, file_analysis in self.sv2.files.items():
if file_analysis.is_sample:
# target the sample submitted for analysis
self.sample_file_name = file_name
self.sample_file_analysis = file_analysis

if file_analysis.ref_static_data:
# like "path": ["static_data","static_data_0"] where "static_data_0" is the summary_v2 static data
# key for the file's static data
self.sample_file_static_data = self.sv2.static_data[file_analysis.ref_static_data.path[1]]

break

def _compute_base_address(self):
assert self.sample_file_static_data is not None
if self.sample_file_static_data.pe:
self.base_address = self.sample_file_static_data.pe.basic_info.image_base

def _compute_exports(self):
assert self.sample_file_static_data is not None
if self.sample_file_static_data.pe:
for export in self.sample_file_static_data.pe.exports:
self.exports[export.address] = export.api.name

def _compute_imports(self):
assert self.sample_file_static_data is not None
if self.sample_file_static_data.pe:
for module in self.sample_file_static_data.pe.imports:
for api in module.apis:
self.imports[api.address] = (module.dll, api.api.name)

def _compute_sections(self):
assert self.sample_file_static_data is not None
if self.sample_file_static_data.pe:
for pefile_section in self.sample_file_static_data.pe.sections:
self.sections[pefile_section.virtual_address] = pefile_section.name
elif self.sample_file_static_data.elf:
for elffile_section in self.sample_file_static_data.elf.sections:
self.sections[elffile_section.header.sh_addr] = elffile_section.header.sh_name

def _compute_process_ids(self):
for process in self.sv2.processes.values():
# we expect VMRay's monitor IDs to be unique, but OS PIDs may be reused
assert process.monitor_id not in self.process_ids.keys()
self.process_ids[process.monitor_id] = process.os_pid

def _compute_process_threads(self):
# logs/flog.xml appears to be the only file that contains thread-related data
# so we use it here to map processes to threads
for function_call in self.flog.analysis.function_calls:
pid: int = self.get_process_os_pid(function_call.process_id) # flog.xml uses process monitor ID, not OS PID
tid: int = function_call.thread_id

assert isinstance(pid, int)
assert isinstance(tid, int)

if tid not in self.process_threads[pid]:
self.process_threads[pid].append(tid)

def _compute_process_calls(self):
for function_call in self.flog.analysis.function_calls:
pid: int = self.get_process_os_pid(function_call.process_id) # flog.xml uses process monitor ID, not OS PID
tid: int = function_call.thread_id

assert isinstance(pid, int)
assert isinstance(tid, int)

self.process_calls[pid][tid].append(function_call)

def get_process_os_pid(self, monitor_id: int) -> int:
return self.process_ids[monitor_id]
53 changes: 53 additions & 0 deletions capa/features/extractors/vmray/call.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,53 @@
# Copyright (C) 2024 Mandiant, Inc. All Rights Reserved.
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at: [package root]/LICENSE.txt
# Unless required by applicable law or agreed to in writing, software distributed under the License
# is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and limitations under the License.
import logging
from typing import Tuple, Iterator

from capa.features.insn import API, Number
from capa.features.common import String, Feature
from capa.features.address import Address
from capa.features.extractors.vmray.models import PARAM_TYPE_INT, PARAM_TYPE_STR, Param, FunctionCall, hexint
from capa.features.extractors.base_extractor import CallHandle, ThreadHandle, ProcessHandle

logger = logging.getLogger(__name__)


def get_call_param_features(param: Param, ch: CallHandle) -> Iterator[Tuple[Feature, Address]]:
if param.deref is not None:
# pointer types contain a special "deref" member that stores the deref'd value
# so we check for this first and ignore Param.value as this always contains the
# deref'd pointer value
if param.deref.value is not None:
if param.deref.type_ in PARAM_TYPE_INT:
yield Number(hexint(param.deref.value)), ch.address
elif param.deref.type_ in PARAM_TYPE_STR:
yield String(param.deref.value), ch.address
else:
logger.debug("skipping deref param type %s", param.deref.type_)
elif param.value is not None:
if param.type_ in PARAM_TYPE_INT:
yield Number(hexint(param.value)), ch.address


def extract_call_features(ph: ProcessHandle, th: ThreadHandle, ch: CallHandle) -> Iterator[Tuple[Feature, Address]]:
call: FunctionCall = ch.inner

if call.params_in:
for param in call.params_in.params:
yield from get_call_param_features(param, ch)

yield API(call.name), ch.address


def extract_features(ph: ProcessHandle, th: ThreadHandle, ch: CallHandle) -> Iterator[Tuple[Feature, Address]]:
for handler in CALL_HANDLERS:
for feature, addr in handler(ph, th, ch):
yield feature, addr


CALL_HANDLERS = (extract_call_features,)
122 changes: 122 additions & 0 deletions capa/features/extractors/vmray/extractor.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,122 @@
# Copyright (C) 2024 Mandiant, Inc. All Rights Reserved.
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at: [package root]/LICENSE.txt
# Unless required by applicable law or agreed to in writing, software distributed under the License
# is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and limitations under the License.


from typing import List, Tuple, Iterator
from pathlib import Path

import capa.helpers
import capa.features.extractors.vmray.call
import capa.features.extractors.vmray.file
import capa.features.extractors.vmray.global_
from capa.features.common import Feature, Characteristic
from capa.features.address import NO_ADDRESS, Address, ThreadAddress, DynamicCallAddress, AbsoluteVirtualAddress
from capa.features.extractors.vmray import VMRayAnalysis
from capa.features.extractors.vmray.models import PARAM_TYPE_STR, Process, ParamList, FunctionCall
from capa.features.extractors.base_extractor import (
CallHandle,
SampleHashes,
ThreadHandle,
ProcessHandle,
DynamicFeatureExtractor,
)


def get_formatted_params(params: ParamList) -> List[str]:
params_list: List[str] = []

for param in params:
if param.deref and param.deref.value is not None:
deref_value: str = f'"{param.deref.value}"' if param.deref.type_ in PARAM_TYPE_STR else param.deref.value
params_list.append(f"{param.name}: {deref_value}")
else:
value: str = "" if param.value is None else param.value
params_list.append(f"{param.name}: {value}")

return params_list


class VMRayExtractor(DynamicFeatureExtractor):
def __init__(self, analysis: VMRayAnalysis):
assert analysis.sample_file_analysis is not None

super().__init__(
hashes=SampleHashes(
md5=analysis.sample_file_analysis.hash_values.md5.lower(),
sha1=analysis.sample_file_analysis.hash_values.sha1.lower(),
sha256=analysis.sample_file_analysis.hash_values.sha256.lower(),
)
)

self.analysis = analysis

# pre-compute these because we'll yield them at *every* scope.
self.global_features = list(capa.features.extractors.vmray.global_.extract_features(self.analysis))

def get_base_address(self) -> Address:
# value according to the PE header, the actual trace may use a different imagebase
return AbsoluteVirtualAddress(self.analysis.base_address)

def extract_file_features(self) -> Iterator[Tuple[Feature, Address]]:
yield from capa.features.extractors.vmray.file.extract_features(self.analysis)

def extract_global_features(self) -> Iterator[Tuple[Feature, Address]]:
yield from self.global_features

def get_processes(self) -> Iterator[ProcessHandle]:
yield from capa.features.extractors.vmray.file.get_processes(self.analysis)

def extract_process_features(self, ph: ProcessHandle) -> Iterator[Tuple[Feature, Address]]:
# we have not identified process-specific features for VMRay yet
yield from []

def get_process_name(self, ph) -> str:
process: Process = ph.inner
return process.image_name

def get_threads(self, ph: ProcessHandle) -> Iterator[ThreadHandle]:
for thread in self.analysis.process_threads[ph.address.pid]:
address: ThreadAddress = ThreadAddress(process=ph.address, tid=thread)
yield ThreadHandle(address=address, inner={})

def extract_thread_features(self, ph: ProcessHandle, th: ThreadHandle) -> Iterator[Tuple[Feature, Address]]:
if False:
# force this routine to be a generator,
# but we don't actually have any elements to generate.
yield Characteristic("never"), NO_ADDRESS
return

def get_calls(self, ph: ProcessHandle, th: ThreadHandle) -> Iterator[CallHandle]:
for function_call in self.analysis.process_calls[ph.address.pid][th.address.tid]:
addr = DynamicCallAddress(thread=th.address, id=function_call.fncall_id)
yield CallHandle(address=addr, inner=function_call)

def extract_call_features(
self, ph: ProcessHandle, th: ThreadHandle, ch: CallHandle
) -> Iterator[Tuple[Feature, Address]]:
yield from capa.features.extractors.vmray.call.extract_features(ph, th, ch)

def get_call_name(self, ph, th, ch) -> str:
call: FunctionCall = ch.inner
call_formatted: str = call.name

# format input parameters
if call.params_in:
call_formatted += f"({', '.join(get_formatted_params(call.params_in.params))})"
else:
call_formatted += "()"

# format output parameters
if call.params_out:
call_formatted += f" -> {', '.join(get_formatted_params(call.params_out.params))}"

return call_formatted

@classmethod
def from_zipfile(cls, zipfile_path: Path):
return cls(VMRayAnalysis(zipfile_path))
Loading

0 comments on commit 3223d3f

Please sign in to comment.