diff --git a/CHANGELOG.md b/CHANGELOG.md index f14d7c926..66336e92d 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -3,7 +3,7 @@ ## master (unreleased) ### New Features - +- cli: add the ability to select which specific functions or processes to analyze @yelhamer - webui: explore capa analysis results in a web-based UI online and offline #2224 @s-ff - support analyzing DRAKVUF traces #2143 @yelhamer - IDA extractor: extract names from dynamically resolved APIs stored in renamed global variables #2201 @Ana06 diff --git a/capa/exceptions.py b/capa/exceptions.py index 0c900d72c..882c07181 100644 --- a/capa/exceptions.py +++ b/capa/exceptions.py @@ -23,3 +23,15 @@ class UnsupportedOSError(ValueError): class EmptyReportError(ValueError): pass + + +class InvalidArgument(ValueError): + pass + + +class NonExistantFunctionError(ValueError): + pass + + +class NonExistantProcessError(ValueError): + pass diff --git a/capa/features/extractors/base_extractor.py b/capa/features/extractors/base_extractor.py index 002117fc6..a58016bcc 100644 --- a/capa/features/extractors/base_extractor.py +++ b/capa/features/extractors/base_extractor.py @@ -9,7 +9,9 @@ import abc import hashlib import dataclasses -from typing import Any, Dict, Tuple, Union, Iterator +from copy import copy +from types import MethodType +from typing import Any, Set, Dict, Tuple, Union, Iterator from dataclasses import dataclass # TODO(williballenthin): use typing.TypeAlias directly when Python 3.9 is deprecated @@ -296,6 +298,22 @@ def extract_insn_features( raise NotImplementedError() +def FunctionFilter(extractor: StaticFeatureExtractor, functions: Set) -> StaticFeatureExtractor: + original_get_functions = extractor.get_functions + + def filtered_get_functions(self): + yield from (f for f in original_get_functions() if f.address in functions) + + # we make a copy of the original extractor object and then update its get_functions() method with the decorated filter one. + # this is in order to preserve the original extractor object's get_functions() method, in case it is used elsewhere in the code. + # an example where this is important is in our testfiles where we may use the same extractor object with different tests, + # with some of these tests needing to install a functions filter on the extractor object. + new_extractor = copy(extractor) + new_extractor.get_functions = MethodType(filtered_get_functions, extractor) # type: ignore + + return new_extractor + + @dataclass class ProcessHandle: """ @@ -467,4 +485,20 @@ def get_call_name(self, ph: ProcessHandle, th: ThreadHandle, ch: CallHandle) -> raise NotImplementedError() +def ProcessFilter(extractor: DynamicFeatureExtractor, processes: Set) -> DynamicFeatureExtractor: + original_get_processes = extractor.get_processes + + def filtered_get_processes(self): + yield from (f for f in original_get_processes() if f.address.pid in processes) + + # we make a copy of the original extractor object and then update its get_processes() method with the decorated filter one. + # this is in order to preserve the original extractor object's get_processes() method, in case it is used elsewhere in the code. + # an example where this is important is in our testfiles where we may use the same extractor object with different tests, + # with some of these tests needing to install a processes filter on the extractor object. + new_extractor = copy(extractor) + new_extractor.get_processes = MethodType(filtered_get_processes, extractor) # type: ignore + + return new_extractor + + FeatureExtractor: TypeAlias = Union[StaticFeatureExtractor, DynamicFeatureExtractor] diff --git a/capa/main.py b/capa/main.py index b94a4967a..6f09ccdac 100644 --- a/capa/main.py +++ b/capa/main.py @@ -17,7 +17,7 @@ import textwrap import contextlib from types import TracebackType -from typing import Any, Dict, List, Optional +from typing import Any, Set, Dict, List, Optional, TypedDict from pathlib import Path import colorama @@ -62,6 +62,7 @@ log_unsupported_drakvuf_report_error, ) from capa.exceptions import ( + InvalidArgument, EmptyReportError, UnsupportedOSError, UnsupportedArchError, @@ -83,9 +84,17 @@ FORMAT_FREEZE, FORMAT_RESULT, FORMAT_DRAKVUF, + STATIC_FORMATS, + DYNAMIC_FORMATS, ) from capa.capabilities.common import find_capabilities, has_file_limitation, find_file_capabilities -from capa.features.extractors.base_extractor import FeatureExtractor, StaticFeatureExtractor, DynamicFeatureExtractor +from capa.features.extractors.base_extractor import ( + ProcessFilter, + FunctionFilter, + FeatureExtractor, + StaticFeatureExtractor, + DynamicFeatureExtractor, +) RULES_PATH_DEFAULT_STRING = "(embedded rules)" SIGNATURES_PATH_DEFAULT_STRING = "(embedded signatures)" @@ -106,10 +115,17 @@ E_MISSING_CAPE_DYNAMIC_ANALYSIS = 22 E_EMPTY_REPORT = 23 E_UNSUPPORTED_GHIDRA_EXECUTION_MODE = 24 +E_INVALID_INPUT_FORMAT = 25 +E_INVALID_FEATURE_EXTRACTOR = 26 logger = logging.getLogger("capa") +class FilterConfig(TypedDict, total=False): + processes: Set[int] + functions: Set[int] + + @contextlib.contextmanager def timing(msg: str): t0 = time.time() @@ -276,6 +292,22 @@ def install_common_args(parser, wanted=None): help=f"select backend, {backend_help}", ) + if "restrict-to-functions" in wanted: + parser.add_argument( + "--restrict-to-functions", + type=lambda s: s.replace(" ", "").split(","), + default=[], + help="provide a list of comma-separated function virtual addresses to analyze (static analysis).", + ) + + if "restrict-to-processes" in wanted: + parser.add_argument( + "--restrict-to-processes", + type=lambda s: s.replace(" ", "").split(","), + default=[], + help="provide a list of comma-separated process IDs to analyze (dynamic analysis).", + ) + if "os" in wanted: oses = [ (OS_AUTO, "detect OS automatically - default"), @@ -749,9 +781,10 @@ def get_extractor_from_cli(args, input_format: str, backend: str) -> FeatureExtr os_ = get_os_from_cli(args, backend) sample_path = get_sample_path_from_cli(args, backend) + extractor_filters = get_extractor_filters_from_cli(args, input_format) try: - return capa.loader.get_extractor( + extractor = capa.loader.get_extractor( args.input_file, input_format, os_, @@ -761,6 +794,7 @@ def get_extractor_from_cli(args, input_format: str, backend: str) -> FeatureExtr disable_progress=args.quiet or args.debug, sample_path=sample_path, ) + return apply_extractor_filters(extractor, extractor_filters) except UnsupportedFormatError as e: if input_format == FORMAT_CAPE: log_unsupported_cape_report_error(str(e)) @@ -780,6 +814,38 @@ def get_extractor_from_cli(args, input_format: str, backend: str) -> FeatureExtr raise ShouldExitError(E_CORRUPT_FILE) from e +def get_extractor_filters_from_cli(args, input_format) -> FilterConfig: + if not hasattr(args, "restrict_to_processes") and not hasattr(args, "restrict_to_functions"): + # no processes or function filters were installed in the args + return {} + + if input_format in STATIC_FORMATS: + if args.restrict_to_processes: + raise InvalidArgument("Cannot filter processes with static analysis.") + return {"functions": {int(addr, 0) for addr in args.restrict_to_functions}} + elif input_format in DYNAMIC_FORMATS: + if args.restrict_to_functions: + raise InvalidArgument("Cannot filter functions with dynamic analysis.") + return {"processes": {int(pid, 0) for pid in args.restrict_to_processes}} + else: + raise ShouldExitError(E_INVALID_INPUT_FORMAT) + + +def apply_extractor_filters(extractor: FeatureExtractor, extractor_filters: FilterConfig): + if not any(extractor_filters.values()): + return extractor + + # if the user specified extractor filters, then apply them here + if isinstance(extractor, StaticFeatureExtractor): + assert extractor_filters["functions"] + return FunctionFilter(extractor, extractor_filters["functions"]) + elif isinstance(extractor, DynamicFeatureExtractor): + assert extractor_filters["processes"] + return ProcessFilter(extractor, extractor_filters["processes"]) + else: + raise ShouldExitError(E_INVALID_FEATURE_EXTRACTOR) + + def main(argv: Optional[List[str]] = None): if sys.version_info < (3, 8): raise UnsupportedRuntimeError("This version of capa can only be used with Python 3.8+") @@ -819,7 +885,20 @@ def main(argv: Optional[List[str]] = None): parser = argparse.ArgumentParser( description=desc, epilog=epilog, formatter_class=argparse.RawDescriptionHelpFormatter ) - install_common_args(parser, {"input_file", "format", "backend", "os", "signatures", "rules", "tag"}) + install_common_args( + parser, + { + "input_file", + "format", + "backend", + "os", + "signatures", + "rules", + "tag", + "restrict-to-functions", + "restrict-to-processes", + }, + ) parser.add_argument("-j", "--json", action="store_true", help="emit JSON instead of text") args = parser.parse_args(args=argv) diff --git a/doc/usage.md b/doc/usage.md index 74b163f4a..949e03e14 100644 --- a/doc/usage.md +++ b/doc/usage.md @@ -9,6 +9,22 @@ Use the `-t` option to run rules with the given metadata value (see the rule fie For example, `capa -t william.ballenthin@mandiant.com` runs rules that reference Willi's email address (probably as the author), or `capa -t communication` runs rules with the namespace `communication`. +### only analyze selected functions +Use the `--restrict-to-functions` option to extract capabilities from only a selected set of functions. This is useful for analyzing +large functions and figuring out their capabilities and their address of occurance; for example: PEB access, RC4 encryption, etc. + +To use this, you can copy the virtual addresses from your favorite disassembler and pass them to capa as follows: +`capa sample.exe --restrict-to-functions 0x4019C0,0x401CD0`. If you add the `-v` option then capa will extract the interesting parts of a function for you. + +### only analyze selected processes +Use the `--restrict-to-processes` option to extract capabilities from only a selected set of processes. This is useful for filtering the noise +generated from analyzing non-malicious processes that can be reported by some sandboxes, as well as reduce the execution time +by not analyzing such processes in the first place. + +To use this, you can pick the PIDs of the processes you are interested in from the sandbox-generated process tree (or from the sandbox-reported malware PID) +and pass that to capa as follows: `capa report.log --restrict-to-processes 3888,3214,4299`. If you add the `-v` option then capa will tell you +which threads perform what actions (encrypt/decrypt data, initiate a connection, etc.). + ### IDA Pro plugin: capa explorer Please check out the [capa explorer documentation](/capa/ida/plugin/README.md). @@ -16,4 +32,4 @@ Please check out the [capa explorer documentation](/capa/ida/plugin/README.md). Set the environment variable `CAPA_SAVE_WORKSPACE` to instruct the underlying analysis engine to cache its intermediate results to the file system. For example, vivisect will create `.viv` files. Subsequently, capa may run faster when reprocessing the same input file. -This is particularly useful during rule development as you repeatedly test a rule against a known sample. \ No newline at end of file +This is particularly useful during rule development as you repeatedly test a rule against a known sample. diff --git a/tests/test_capabilities.py b/tests/test_capabilities.py index ddc7f6c3f..5c6de51b4 100644 --- a/tests/test_capabilities.py +++ b/tests/test_capabilities.py @@ -9,6 +9,7 @@ import textwrap import capa.capabilities.common +from capa.features.extractors.base_extractor import FunctionFilter def test_match_across_scopes_file_function(z9324d_extractor): @@ -174,6 +175,37 @@ def test_subscope_bb_rules(z9324d_extractor): assert "test rule" in capabilities +def test_match_specific_functions(z9324d_extractor): + rules = capa.rules.RuleSet( + [ + capa.rules.Rule.from_yaml( + textwrap.dedent( + """ + rule: + meta: + name: receive data + scopes: + static: function + dynamic: call + examples: + - 9324d1a8ae37a36ae560c37448c9705a:0x401CD0 + features: + - or: + - api: recv + """ + ) + ) + ] + ) + extractor = FunctionFilter(z9324d_extractor, {0x4019C0}) + capabilities, meta = capa.capabilities.common.find_capabilities(rules, extractor) + matches = capabilities["receive data"] + # test that we received only one match + assert len(matches) == 1 + # and that this match is from the specified function + assert matches[0][0] == 0x4019C0 + + def test_byte_matching(z9324d_extractor): rules = capa.rules.RuleSet( [