Skip to content

Commit

Permalink
Merge pull request #83 from ipa-lab/development
Browse files Browse the repository at this point in the history
add more web-api-testing changes, fix unittest
  • Loading branch information
andreashappe committed Aug 28, 2024
2 parents 5b65518 + eaf5fd8 commit 550a517
Show file tree
Hide file tree
Showing 60 changed files with 2,354 additions and 742 deletions.
3 changes: 2 additions & 1 deletion .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -13,4 +13,5 @@ dist/
.coverage
src/hackingBuddyGPT/usecases/web_api_testing/openapi_spec/
src/hackingBuddyGPT/usecases/web_api_testing/converted_files/
/src/hackingBuddyGPT/usecases/web_api_testing/utils/openapi_spec/
/src/hackingBuddyGPT/usecases/web_api_testing/documentation/openapi_spec/
/src/hackingBuddyGPT/usecases/web_api_testing/documentation/reports/
9 changes: 4 additions & 5 deletions src/hackingBuddyGPT/cli/wintermute.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,13 +8,12 @@ def main():
parser = argparse.ArgumentParser()
subparser = parser.add_subparsers(required=True)
for name, use_case in use_cases.items():
subb = subparser.add_parser(
use_case.build_parser(subparser.add_parser(
name=use_case.name,
help=use_case.description
)
use_case.build_parser(subb)
x= sys.argv[1:]
parsed = parser.parse_args(x)
))

parsed = parser.parse_args(sys.argv[1:])
instance = parsed.use_case(parsed)
instance.init()
instance.run()
Expand Down
2 changes: 1 addition & 1 deletion src/hackingBuddyGPT/usecases/__init__.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
from .privesc import *
from .minimal import *
from .examples import *
from .web import *
from .web_api_testing import *
4 changes: 4 additions & 0 deletions src/hackingBuddyGPT/usecases/examples/__init__.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,4 @@
from .agent import ExPrivEscLinux
from .agent_with_state import ExPrivEscLinuxTemplated
from .hintfile import ExPrivEscLinuxHintFileUseCase
from .lse import ExPrivEscLinuxLSEUseCase
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@
template_next_cmd = Template(filename=str(template_dir / "next_cmd.txt"))


class MinimalLinuxPrivesc(Agent):
class ExPrivEscLinux(Agent):

conn: SSHConnection = None
_sliding_history: SlidingCliHistory = None
Expand Down Expand Up @@ -49,5 +49,5 @@ def perform_round(self, turn: int) -> bool:


@use_case("Showcase Minimal Linux Priv-Escalation")
class MinimalLinuxPrivescUseCase(AutonomousAgentUseCase[MinimalLinuxPrivesc]):
class ExPrivEscLinuxUseCase(AutonomousAgentUseCase[ExPrivEscLinux]):
pass
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@


@dataclass
class MinimalLinuxTemplatedPrivescState(AgentWorldview):
class ExPrivEscLinuxTemplatedState(AgentWorldview):
sliding_history: SlidingCliHistory
max_history_size: int = 0
conn: SSHConnection = None
Expand All @@ -31,7 +31,7 @@ def to_template(self) -> dict[str, Any]:
}


class MinimalLinuxTemplatedPrivesc(TemplatedAgent):
class ExPrivEscLinuxTemplated(TemplatedAgent):

conn: SSHConnection = None

Expand All @@ -47,9 +47,9 @@ def init(self):

# setup state
max_history_size = self.llm.context_size - llm_util.SAFETY_MARGIN - self._template_size
self.set_initial_state(MinimalLinuxTemplatedPrivescState(self.conn, self.llm, max_history_size))
self.set_initial_state(ExPrivEscLinuxTemplatedState(self.conn, self.llm, max_history_size))


@use_case("Showcase Minimal Linux Priv-Escalation")
class MinimalLinuxTemplatedPrivescUseCase(AutonomousAgentUseCase[MinimalLinuxTemplatedPrivesc]):
class ExPrivEscLinuxTemplatedUseCase(AutonomousAgentUseCase[ExPrivEscLinuxTemplated]):
pass
26 changes: 26 additions & 0 deletions src/hackingBuddyGPT/usecases/examples/hintfile.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,26 @@
import json

from hackingBuddyGPT.usecases.privesc.linux import LinuxPrivesc
from hackingBuddyGPT.usecases.base import use_case, AutonomousAgentUseCase

@use_case("Linux Privilege Escalation using hints from a hint file initial guidance")
class ExPrivEscLinuxHintFileUseCase(AutonomousAgentUseCase[LinuxPrivesc]):
hints: str = None

def init(self):
super().init()
self.agent.hint = self.read_hint()

# simple helper that reads the hints file and returns the hint
# for the current machine (test-case)
def read_hint(self):
try:
with open(self.hints, "r") as hint_file:
hints = json.load(hint_file)
if self.agent.conn.hostname in hints:
return hints[self.agent.conn.hostname]
except FileNotFoundError:
self._log.console.print("[yellow]Hint file not found")
except Exception as e:
self._log.console.print("[yellow]Hint file could not loaded:", str(e))
return ""
112 changes: 112 additions & 0 deletions src/hackingBuddyGPT/usecases/examples/lse.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,112 @@
import pathlib
from mako.template import Template

from hackingBuddyGPT.capabilities import SSHRunCommand
from hackingBuddyGPT.utils.openai.openai_llm import OpenAIConnection
from hackingBuddyGPT.usecases.privesc.linux import LinuxPrivescUseCase, LinuxPrivesc
from hackingBuddyGPT.utils import SSHConnection
from hackingBuddyGPT.usecases.base import UseCase, use_case


template_dir = pathlib.Path(__file__).parent
template_lse = Template(filename=str(template_dir / "get_hint_from_lse.txt"))


@use_case("Linux Privilege Escalation using lse.sh for initial guidance")
class ExPrivEscLinuxLSEUseCase(UseCase):
conn: SSHConnection = None
max_turns: int = 20
enable_explanation: bool = False
enable_update_state: bool = False
disable_history: bool = False
llm: OpenAIConnection = None

_got_root: bool = False

# use either an use-case or an agent to perform the privesc
use_use_case: bool = False

def init(self):
super().init()

# simple helper that uses lse.sh to get hints from the system
def call_lse_against_host(self):
self._log.console.print("[green]performing initial enumeration with lse.sh")

run_cmd = "wget -q 'https://github.com/diego-treitos/linux-smart-enumeration/releases/latest/download/lse.sh' -O lse.sh;chmod 700 lse.sh; ./lse.sh -c -i -l 0 | grep -v 'nope$' | grep -v 'skip$'"

result, _ = SSHRunCommand(conn=self.conn, timeout=120)(run_cmd)

self.console.print("[yellow]got the output: " + result)
cmd = self.llm.get_response(template_lse, lse_output=result, number=3)
self.console.print("[yellow]got the cmd: " + cmd.result)

return [x for x in cmd.result.splitlines() if x.strip()]

def get_name(self) -> str:
return self.__class__.__name__

def run(self):
# get the hints through running LSE on the target system
hints = self.call_lse_against_host()
turns_per_hint = int(self.max_turns / len(hints))

# now try to escalate privileges using the hints
for hint in hints:

if self.use_use_case:
self.console.print("[yellow]Calling a use-case to perform the privilege escalation")
result = self.run_using_usecases(hint, turns_per_hint)
else:
self.console.print("[yellow]Calling an agent to perform the privilege escalation")
result = self.run_using_agent(hint, turns_per_hint)

if result is True:
self.console.print("[green]Got root!")
return True

def run_using_usecases(self, hint, turns_per_hint):
# TODO: init usecase
linux_privesc = LinuxPrivescUseCase(
agent = LinuxPrivesc(
conn = self.conn,
enable_explanation = self.enable_explanation,
enable_update_state = self.enable_update_state,
disable_history = self.disable_history,
llm = self.llm,
hint = hint
),
max_turns = turns_per_hint,
log_db = self.log_db,
console = self.console
)
linux_privesc.init()
return linux_privesc.run()

def run_using_agent(self, hint, turns_per_hint):
# init agent
agent = LinuxPrivesc(
conn = self.conn,
llm = self.llm,
hint = hint,
enable_explanation = self.enable_explanation,
enable_update_state = self.enable_update_state,
disable_history = self.disable_history
)
agent._log = self._log
agent.init()

# perform the privilege escalation
agent.before_run()
turn = 1
got_root = False
while turn <= turns_per_hint and not got_root:
self._log.console.log(f"[yellow]Starting turn {turn} of {turns_per_hint}")

if agent.perform_round(turn) is True:
got_root = True
turn += 1

# cleanup and finish
agent.after_run()
return got_root
2 changes: 0 additions & 2 deletions src/hackingBuddyGPT/usecases/minimal/__init__.py

This file was deleted.

1 change: 0 additions & 1 deletion src/hackingBuddyGPT/usecases/privesc/common.py
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,6 @@
template_next_cmd = Template(filename=str(template_dir / "query_next_command.txt"))
template_analyze = Template(filename=str(template_dir / "analyze_cmd.txt"))
template_state = Template(filename=str(template_dir / "update_state.txt"))
template_lse = Template(filename=str(template_dir / "get_hint_from_lse.txt"))


@dataclass
Expand Down
131 changes: 1 addition & 130 deletions src/hackingBuddyGPT/usecases/privesc/linux.py
Original file line number Diff line number Diff line change
@@ -1,15 +1,7 @@
import json
import pathlib
from mako.template import Template

from hackingBuddyGPT.capabilities import SSHRunCommand, SSHTestCredential
from hackingBuddyGPT.utils.openai.openai_llm import OpenAIConnection
from .common import Privesc
from hackingBuddyGPT.utils import SSHConnection
from hackingBuddyGPT.usecases.base import UseCase, use_case, AutonomousAgentUseCase

template_dir = pathlib.Path(__file__).parent / "templates"
template_lse = Template(filename=str(template_dir / "get_hint_from_lse.txt"))
from hackingBuddyGPT.usecases.base import use_case, AutonomousAgentUseCase


class LinuxPrivesc(Privesc):
Expand All @@ -25,124 +17,3 @@ def init(self):
@use_case("Linux Privilege Escalation")
class LinuxPrivescUseCase(AutonomousAgentUseCase[LinuxPrivesc]):
pass


@use_case("Linux Privilege Escalation using hints from a hint file initial guidance")
class LinuxPrivescWithHintFileUseCase(AutonomousAgentUseCase[LinuxPrivesc]):
hints: str = None

def init(self):
super().init()
self.agent.hint = self.read_hint()

# simple helper that reads the hints file and returns the hint
# for the current machine (test-case)
def read_hint(self):
try:
with open(self.hints, "r") as hint_file:
hints = json.load(hint_file)
if self.agent.conn.hostname in hints:
return hints[self.agent.conn.hostname]
except FileNotFoundError:
self._log.console.print("[yellow]Hint file not found")
except Exception as e:
self._log.console.print("[yellow]Hint file could not loaded:", str(e))
return ""


@use_case("Linux Privilege Escalation using lse.sh for initial guidance")
class LinuxPrivescWithLSEUseCase(UseCase):
conn: SSHConnection = None
max_turns: int = 20
enable_explanation: bool = False
enable_update_state: bool = False
disable_history: bool = False
llm: OpenAIConnection = None

_got_root: bool = False

# use either an use-case or an agent to perform the privesc
use_use_case: bool = False

def init(self):
super().init()

# simple helper that uses lse.sh to get hints from the system
def call_lse_against_host(self):
self._log.console.print("[green]performing initial enumeration with lse.sh")

run_cmd = "wget -q 'https://github.com/diego-treitos/linux-smart-enumeration/releases/latest/download/lse.sh' -O lse.sh;chmod 700 lse.sh; ./lse.sh -c -i -l 0 | grep -v 'nope$' | grep -v 'skip$'"

result, _ = SSHRunCommand(conn=self.conn, timeout=120)(run_cmd)

self.console.print("[yellow]got the output: " + result)
cmd = self.llm.get_response(template_lse, lse_output=result, number=3)
self.console.print("[yellow]got the cmd: " + cmd.result)

return [x for x in cmd.result.splitlines() if x.strip()]

def get_name(self) -> str:
return self.__class__.__name__

def run(self):
# get the hints through running LSE on the target system
hints = self.call_lse_against_host()
turns_per_hint = int(self.max_turns / len(hints))

# now try to escalate privileges using the hints
for hint in hints:

if self._use_use_case:
result = self.run_using_usecases(hint, turns_per_hint)
else:
result = self.run_using_agent(hint, turns_per_hint)

if result is True:
self.console.print("[green]Got root!")
return True

def run_using_usecases(self, hint, turns_per_hint):
# TODO: init usecase
linux_privesc = LinuxPrivescUseCase(
agent = LinuxPrivesc(
conn = self.conn,
enable_explanation = self.enable_explanation,
enable_update_state = self.enable_update_state,
disable_history = self.disable_history,
llm = self.llm,
hint = hint
),
max_turns = turns_per_hint,
log_db = self.log_db,
console = self.console
)
linux_privesc.init()
return linux_privesc.run()

def run_using_agent(self, hint, turns_per_hint):
# init agent
agent = LinuxPrivesc(
conn = self.conn,
llm = self.llm,
hint = hint,
enable_explanation = self.enable_explanation,
enable_update_state = self.enable_update_state,
disable_history = self.disable_history
)
agent._log = self._log
agent.init()

# perform the privilege escalation
agent.before_run()
turn = 1
got_root = False
while turn <= turns_per_hint and not got_root:
self._log.console.log(f"[yellow]Starting turn {turn} of {turns_per_hint}")

if agent.perform_round(turn) is True:
got_root = True
turn += 1

# cleanup and finish
agent.after_run()
return got_root
Original file line number Diff line number Diff line change
@@ -0,0 +1,2 @@
from .openapi_specification_handler import OpenAPISpecificationHandler
from .report_handler import ReportHandler
Loading

0 comments on commit 550a517

Please sign in to comment.