Skip to content

Commit

Permalink
feat: auto-generate ruleset cache on source change (#2133)
Browse files Browse the repository at this point in the history
* feat: auto-generate ruleset cache on source change

---------

Co-authored-by: mr-tz <moritz.raabe@mandiant.com>
Co-authored-by: Moritz <mr-tz@users.noreply.github.com>
Co-authored-by: Willi Ballenthin <wballenthin@google.com>
  • Loading branch information
4 people committed Aug 26, 2024
1 parent b4f60ec commit ed5dd38
Show file tree
Hide file tree
Showing 6 changed files with 124 additions and 4 deletions.
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@
## master (unreleased)

### New Features
- regenerate ruleset cache automatically on source change (only in dev mode) #2133 @s-ff

- add landing page https://mandiant.github.io/capa/ @williballenthin #2310
- add rules website https://mandiant.github.io/capa/rules @DeeyaSingh #2310
Expand Down
61 changes: 61 additions & 0 deletions capa/helpers.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@
# Unless required by applicable law or agreed to in writing, software distributed under the License
# is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and limitations under the License.
import os
import sys
import gzip
import inspect
Expand All @@ -13,6 +14,7 @@
import importlib.util
from typing import Dict, Union, BinaryIO, Iterator, NoReturn
from pathlib import Path
from datetime import datetime

import tqdm
import msgspec.json
Expand Down Expand Up @@ -291,3 +293,62 @@ def is_running_standalone() -> bool:
# so we keep this in a common area.
# generally, other library code should not use this function.
return hasattr(sys, "frozen") and hasattr(sys, "_MEIPASS")


def is_dev_environment() -> bool:
    """
    guess whether capa is running from a source checkout (development setup)

    returns:
      True if capa is not a standalone (frozen) build, is not loaded from a
      site-packages installation, and a `.git` entry exists in the directory
      above the package directory; False otherwise
    """
    if is_running_standalone():
        # standalone (frozen) builds are never a development setup
        return False

    if "site-packages" in __file__:
        # running from a site-packages installation
        return False

    capa_root = Path(__file__).resolve().parent.parent

    # `.git` is a directory in a regular clone, but a plain file ("gitfile")
    # in linked worktrees and submodule checkouts, so only test for existence
    # instead of requiring a directory.
    return (capa_root / ".git").exists()


def is_cache_newer_than_rule_code(cache_dir: Path) -> bool:
    """
    basic check to prevent issues if the rules cache is older than relevant rules code

    args:
      cache_dir: the cache directory containing cache files

    returns:
      True if the latest cache file is at least as new as the relevant rule cache code,
      False if there are no cache files or the rule code was modified more recently
    """
    # retrieve the latest modified cache file
    cache_files = list(cache_dir.glob("*.cache"))
    if not cache_files:
        logger.debug("no rule cache files found")
        return False

    latest_cache_file = max(cache_files, key=os.path.getmtime)
    cache_timestamp = os.path.getmtime(latest_cache_file)

    # these are the relevant rules code files that could conflict with using an outdated cache.
    # resolve them relative to this module rather than the current working directory,
    # otherwise the lookup fails whenever capa is started from outside the repository root.
    rules_code_dir = Path(__file__).resolve().parent / "rules"
    rule_code_files = [rules_code_dir / "__init__.py", rules_code_dir / "cache.py"]
    latest_rule_code_file = max(rule_code_files, key=os.path.getmtime)
    rule_code_timestamp = os.path.getmtime(latest_rule_code_file)

    if rule_code_timestamp <= cache_timestamp:
        return True

    def ts_to_str(ts):
        return datetime.fromtimestamp(ts).strftime("%Y-%m-%d %H:%M:%S")

    logger.warning(
        "latest rule code file %s (%s) is newer than the latest rule cache file %s (%s)",
        latest_rule_code_file,
        ts_to_str(rule_code_timestamp),
        latest_cache_file,
        ts_to_str(cache_timestamp),
    )
    return False
11 changes: 10 additions & 1 deletion capa/main.py
Original file line number Diff line number Diff line change
Expand Up @@ -612,13 +612,22 @@ def get_rules_from_cli(args) -> RuleSet:
raises:
ShouldExitError: if the program is invoked incorrectly and should exit.
"""
enable_cache: bool = True
try:
if capa.helpers.is_running_standalone() and args.is_default_rules:
cache_dir = get_default_root() / "cache"
else:
cache_dir = capa.rules.cache.get_default_cache_directory()

rules = capa.rules.get_rules(args.rules, cache_dir=cache_dir)
if capa.helpers.is_dev_environment():
# using the rules cache during development may result in unexpected errors, see #1898
enable_cache = capa.helpers.is_cache_newer_than_rule_code(cache_dir)
if not enable_cache:
logger.debug("not using cache. delete the cache file manually to use rule caching again")
else:
logger.debug("cache can be used, no potentially outdated cache files found")

rules = capa.rules.get_rules(args.rules, cache_dir=cache_dir, enable_cache=enable_cache)
except (IOError, capa.rules.InvalidRule, capa.rules.InvalidRuleSet) as e:
logger.error("%s", str(e))
logger.error(
Expand Down
9 changes: 6 additions & 3 deletions capa/rules/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -2130,12 +2130,14 @@ def get_rules(
rule_paths: List[RulePath],
cache_dir=None,
on_load_rule: Callable[[RulePath, int, int], None] = on_load_rule_default,
enable_cache: bool = True,
) -> RuleSet:
"""
args:
rule_paths: list of paths to rules files or directories containing rules files
cache_dir: directory to use for caching rules, or will use the default detected cache directory if None
on_load_rule: callback to invoke before a rule is loaded, use for progress or cancellation
enable_cache: enable loading of a cached ruleset (default: True)
"""
if cache_dir is None:
cache_dir = capa.rules.cache.get_default_cache_directory()
Expand All @@ -2147,9 +2149,10 @@ def get_rules(
# rule_file_paths[i] corresponds to rule_contents[i].
rule_contents = [file_path.read_bytes() for file_path in rule_file_paths]

ruleset = capa.rules.cache.load_cached_ruleset(cache_dir, rule_contents)
if ruleset is not None:
return ruleset
if enable_cache:
ruleset = capa.rules.cache.load_cached_ruleset(cache_dir, rule_contents)
if ruleset is not None:
return ruleset

rules: List[Rule] = []

Expand Down
6 changes: 6 additions & 0 deletions tests/test_helpers.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@

import codecs

import capa.helpers
from capa.features.extractors import helpers


Expand Down Expand Up @@ -64,3 +65,8 @@ def test_generate_symbols():
symbols = list(helpers.generate_symbols("ws2_32", "#1", include_dll=False))
assert len(symbols) == 1
assert "ws2_32.#1" in symbols


def test_is_dev_environment():
    """the test suite is expected to be executed from a development setup"""
    # testing environment should be a dev environment
    in_dev_environment = capa.helpers.is_dev_environment()
    assert in_dev_environment is True
40 changes: 40 additions & 0 deletions tests/test_rule_cache.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,10 +6,13 @@
# is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and limitations under the License.

import os
import textwrap
import contextlib
from pathlib import Path

import capa.rules
import capa.helpers
import capa.rules.cache

R1 = capa.rules.Rule.from_yaml(
Expand Down Expand Up @@ -113,3 +116,40 @@ def test_ruleset_cache_invalid():
assert capa.rules.cache.load_cached_ruleset(cache_dir, content) is None
# the invalid cache should be deleted
assert not path.exists()


def test_rule_cache_dev_environment():
    """
    a rule cache that is older than the rules code must be reported as outdated

    the cache is written to a throwaway directory so the test neither depends on
    nor deletes cache files in the default cache directory.
    """
    import tempfile
    from datetime import datetime

    # debug helper: render a timestamp for the diagnostic output below
    def ts_to_str(ts):
        return datetime.fromtimestamp(ts).strftime("%Y-%m-%d %H:%M:%S")

    # generate rules cache
    rs = capa.rules.RuleSet([R2])
    content = capa.rules.cache.get_ruleset_content(rs)
    cache_id = capa.rules.cache.compute_cache_identifier(content)

    capa_root = Path(__file__).resolve().parent.parent
    rules_code_dir = capa_root / "capa" / "rules"
    cachepy = rules_code_dir / "cache.py"  # alternative: rules_code_dir / "__init__.py"

    with tempfile.TemporaryDirectory() as tmp:
        cache_dir = Path(tmp)
        cache_path = capa.rules.cache.get_cache_path(cache_dir, cache_id)

        capa.rules.cache.cache_ruleset(cache_dir, rs)
        assert cache_path.exists()

        # a freshly written cache is newer than the rules code
        assert capa.helpers.is_cache_newer_than_rule_code(cache_dir) is True

        # set cache's last modified time prior to code file's modified time
        os.utime(cache_path, (cache_path.stat().st_atime, cachepy.stat().st_mtime - 600000))

        # debug: shown by pytest only when the test fails
        for g in (rules_code_dir.glob("*.py"), cache_dir.glob("*.cache")):
            for p in g:
                print(p, "\t", ts_to_str(p.stat().st_mtime))  # noqa: T201

        assert capa.helpers.is_dev_environment() is True
        assert capa.helpers.is_cache_newer_than_rule_code(cache_dir) is False

0 comments on commit ed5dd38

Please sign in to comment.