Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add test_recorder implementation and integration with integ_test_suite #467

Closed
wants to merge 7 commits into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 6 additions & 0 deletions bundle-workflow/src/run_integ_test.py
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@
from system import console
from system.temporary_directory import TemporaryDirectory
from test_workflow.integ_test.integ_test_suite import IntegTestSuite
from test_workflow.test_recorder import TestRecorder

# TODO: 1. log test related logging into a log file. Output only workflow logs on shell.
# TODO: 2. Move common functions to utils.py
Expand All @@ -37,6 +38,9 @@ def parse_arguments():
parser.add_argument(
"--test-manifest", type=argparse.FileType("r"), help="Test Manifest file."
)
parser.add_argument(
"--test-run-id", type=str, help="Test Run ID"
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

You might want to sync up with @setiah as he also added this param as part of #456

)
parser.add_argument(
"--keep",
dest="keep",
Expand Down Expand Up @@ -104,6 +108,7 @@ def main():
bundle_manifest = BundleManifest.from_file(args.bundle_manifest)
build_manifest = BuildManifest.from_file(args.build_manifest)
test_manifest = TestManifest.from_file(args.test_manifest)
test_recorder = TestRecorder(args.test_run_id, "integ-test")
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

You might want to define a standard constants which defines each of the test suites.
We would want all the workflows to use same identifiers.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yeah that would a part of it. I was getting towards something in common which could be used by all workflows not something which TestRecorder can understand. But this is a good start.

integ_test_config = dict()
for component in test_manifest.components:
if component.integ_test is not None:
Expand All @@ -120,6 +125,7 @@ def main():
integ_test_config[component.name],
bundle_manifest,
work_dir,
test_recorder
)
test_suite.execute()
else:
Expand Down
4 changes: 0 additions & 4 deletions bundle-workflow/src/test.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,19 +6,15 @@
# this file be licensed under the Apache-2.0 license or a
# compatible open source license.

import os

from manifests.bundle_manifest import BundleManifest
from system import console
from test_workflow.bwc_test.bwc_test_suite import BwcTestSuite
from test_workflow.test_args import TestArgs
from test_workflow.test_recorder import TestRecorder

args = TestArgs()
console.configure(level=args.logging_level)

manifest = BundleManifest.from_file(args.manifest)
test_recorder = TestRecorder(os.path.dirname(manifest.name))


def bwc_test_suite():
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@

from git.git_repository import GitRepository
from paths.script_finder import ScriptFinder
from paths.tree_walker import walk
from system.execute import execute
from test_workflow.integ_test.local_test_cluster import LocalTestCluster

Expand All @@ -20,11 +21,12 @@ class IntegTestSuite:
test_support_matrix.yml
"""

def __init__(self, component, test_config, bundle_manifest, work_dir):
def __init__(self, component, test_config, bundle_manifest, work_dir, test_recorder):
self.component = component
self.bundle_manifest = bundle_manifest
self.work_dir = work_dir
self.test_config = test_config
self.test_recorder = test_recorder
self.script_finder = ScriptFinder()
self.repo = GitRepository(
self.component.repository,
Expand Down Expand Up @@ -73,21 +75,32 @@ def _is_security_enabled(self, config):
return False

def _setup_cluster_and_execute_test_config(self, config):
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

What's the convention for private methods? I think we've been doing two __

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Not written by me but I can change if @setiah is okay with that or not making changes on his side

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

You may skip this. This (and more) will be refactored in a separate PR which adds UTs as well

security = self._is_security_enabled(config)
with LocalTestCluster.create(self.work_dir, self.bundle_manifest, security) as (test_cluster_endpoint, test_cluster_port):
with LocalTestCluster.create(self.work_dir, self.bundle_manifest, self.component.name, config,
self.test_recorder) as (test_cluster_endpoint, test_cluster_port):
logging.info("component name: " + self.component.name)
os.chdir(self.work_dir)
# TODO: (Create issue) Since plugins don't have integtest.sh in version branch, hardcoded it to main
self._execute_integtest_sh(test_cluster_endpoint, test_cluster_port, security)
security = self._is_security_enabled(config)
self._execute_integtest_sh(test_cluster_endpoint, test_cluster_port, security, config)

def _execute_integtest_sh(self, endpoint, port, security):
def _execute_integtest_sh(self, endpoint, port, security, config):
script = self.script_finder.find_integ_test_script(
self.component.name, self.repo.dir
)
if os.path.exists(script):
cmd = f"sh {script} -b {endpoint} -p {port} -s {str(security).lower()}"
(status, stdout, stderr) = execute(cmd, self.repo.dir, True, False)
results_dir = os.path.join(
self.repo.dir, "integ-test", "build", "reports", "tests", "integTest"
)
self.test_recorder.record_test_outcome(self.component.name,
config,
status,
stdout,
stderr,
walk(results_dir)
)
else:
logging.info(
f"{script} does not exist. Skipping integ tests for {self.name}"
f"{script} does not exist. Skipping integ tests for {self.component.name}"
)
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@

import requests

from paths.tree_walker import walk
from test_workflow.test_cluster import ClusterCreationException, TestCluster


Expand All @@ -20,19 +21,23 @@ class LocalTestCluster(TestCluster):
Represents an on-box test cluster. This class downloads a bundle (from a BundleManifest) and runs it as a background process.
"""

def __init__(self, work_dir, bundle_manifest, security_enabled):
def __init__(self, work_dir, bundle_manifest, component_name, component_config, test_recorder):
self.manifest = bundle_manifest
self.work_dir = os.path.join(work_dir, "local-test-cluster")
os.makedirs(self.work_dir, exist_ok=True)
self.security_enabled = security_enabled
self.component_config = component_config
self.component_name = component_name
self.test_recorder = test_recorder
self.security = None
self.process = None

def create_cluster(self):
self.download()
self.stdout = open("stdout.txt", "w")
self.stderr = open("stderr.txt", "w")
self.install_dir = f"opensearch-{self.manifest.build.version}"
if not self.security_enabled:
self.security = self._is_security_enabled(self.component_config)
if not self.security:
self.disable_security(self.install_dir)
self.process = subprocess.Popen(
"./opensearch-tar-install.sh",
Expand All @@ -55,21 +60,33 @@ def destroy(self):
logging.info("Local test cluster is not started")
return
self.terminate_process()
logs_files = walk(os.path.join(self.work_dir, self.install_dir, "logs"))
self.test_recorder.record_local_cluster_logs(self.component_name,
self.component_config,
self.local_cluster_stdout,
self.local_cluster_stderr,
logs_files)

def url(self, path=""):
return f'{"https" if self.security_enabled else "http"}://{self.endpoint()}:{self.port()}{path}'
return f'{"https" if self.security else "http"}://{self.endpoint()}:{self.port()}{path}'

def download(self):
logging.info(f"Creating local test cluster in {self.work_dir}")
os.chdir(self.work_dir)
logging.info(f"Downloading bundle from {self.manifest.build.location}")
urllib.request.urlretrieve(self.manifest.build.location, "bundle.tgz")
logging.info(f'Downloaded bundle to {os.path.realpath("bundle.tgz")}')

logging.info("Unpacking")
subprocess.check_call("tar -xzf bundle.tgz", shell=True)
logging.info("Unpacked")

def _is_security_enabled(self, config):
# TODO: Separate this logic in function once we have test-configs defined
if config == "with-security":
return True
else:
return False

def disable_security(self, dir):
subprocess.check_call(
f'echo "plugins.security.disabled: true" >> {os.path.join(dir, "config", "opensearch.yml")}',
Expand Down Expand Up @@ -110,6 +127,10 @@ def terminate_process(self):
raise
finally:
logging.info(f"Process terminated with exit code {self.process.returncode}")
stdout = open(os.path.join(os.path.realpath(self.work_dir), self.stdout.name), "r")
stderr = open(os.path.join(os.path.realpath(self.work_dir), self.stderr.name), "r")
self.local_cluster_stdout = stdout.read()
self.local_cluster_stderr = stderr.read()
self.stdout.close()
self.stderr.close()
self.process = None
125 changes: 112 additions & 13 deletions bundle-workflow/src/test_workflow/test_recorder.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,34 +3,133 @@
# The OpenSearch Contributors require contributions made to
# this file be licensed under the Apache-2.0 license or a
# compatible open source license.

import logging
import os
import shutil
import tempfile

import yaml


class TestRecorder:
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Make this class a singleton. With the design pattern we have, we'd like to have just one instance of TestRecorder for a given testsuite/jenkins job.
This also prevents other instantiations, avoids passing the test recorder instance in multiple places within the code base.

Suggested change
class TestRecorder:
class TestRecorder:
__instance = None
@staticmethod
def getInstance(test_run_id, test_type, location=None):
""" Static access method. """
if TestRecorder.__instance == None:
TestRecorder(test_run_id, test_type, location)
return TestRecorder.__instance

Python by default doesn't have private constructors, if you'd like to block instantiation outside of the class, you could raise an error in __init__ and use a different private method which could be called by getInstance(...)

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Also rename getInstance to get_instance

def __init__(self, location):
self.location = location
logging.info(f"TestRecorder storing results in {location}")
ACCEPTED_TEST_TYPES = ["integ-test", "bwc-test", "perf-test"]

def __init__(self, test_run_id, test_type, location=None):
self.test_type = test_type
self.test_run_id = test_run_id

if self.test_type not in self.ACCEPTED_TEST_TYPES:
raise ValueError(f"test_type is invalid. Acceptable test_type are {self.ACCEPTED_TEST_TYPES}")
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Include the type passed in.

raise ValueError(f"Invalid test type: {test_type}, expected one of ...")


if location is None:
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We have to clean up the directory when TestRecorder goes out of scope. Tmp directory gives us a great feature which takes care of it. It's better to clean it up by ourself, we do not want to leave them on the system.

Lets only use temporary directory as the only option for TestRecorder and when it goes out of scope let it destroy the directory and add a keep parameter if caller wants the directory to be preserved.
Anyway we don't have a use case yet for a custom directory, avoids the hassle of cleaning up.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Makes sense. Regarding clean up. We cannot clean up the directory unless publisher is done publishing the result. So would it make sense that publisher deletes the directory when it is done publishing? or is there any other way?

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

lets leave the ownership to the test suite runners. They use test recorder, TestRecorder records them and deletes them if it goes out of scope.
Any way TestPublisher(aka TestResults) consumes TestRecorder so it is implicitly taken care of.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Sarat suggests that the topmost caller creates a temporary dir and it's passed down to anyone that needs to write files.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yeah that is happening right now. Will see how to clean_up when the test_recorder object goes out of scope

location = tempfile.TemporaryDirectory()
self.location = location.name
else:
self.location = location
logging.info(f'TestRecorder storing results in {os.path.realpath(self.location)}')

def record_local_cluster_logs(self, component_name, component_test_config, stdout, stderr, log_files):
"""
Record the test cluster logs.
:param component_name: component that is under test right now.
:param component_test_config: component_config under consideration for test eg: with/without-security.
:param stdout: A string containing the stdout stream from the test process.
:param stderr: A string containing the stderr stream from the test process.
:param log_files: A generator that yields tuples containing test cluster log files, in the form (absolute_path, relative_path).
"""

def record_cluster_logs(self, log_files):
base = self.__create_base_folder_structure(component_name, component_test_config)
dest_directory = os.path.join(base, "local_cluster_logs")
os.makedirs(dest_directory, exist_ok=False)
logging.info(
f"Recording local cluster logs for {component_name} with {component_test_config} config in {os.path.realpath(dest_directory)}")
self.__generate_std_files(stdout, stderr, os.path.realpath(dest_directory))
local_cluster_log_files = list(log_files)
for log_file in local_cluster_log_files:
dest_file = os.path.join(dest_directory, os.path.basename(log_file[0]))
shutil.copyfile(log_file[0], dest_file)
Comment on lines +48 to +50
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Looks like this could be stubbed out into another method. This code is repeated in record_remote_cluster_logs and record_test_outcome


def record_remote_cluster_logs(self, component_name, component_test_config, exit_code, stdout, stderr,
Copy link
Member

@saratvemulapalli saratvemulapalli Sep 14, 2021

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The flow doesn't look clean to me. Let me know if I didn't understand this correctly.

  • For test suite which uses local cluster, it uses both record_test_outcome and record_local_cluster_logs.
    record_test_outcome generates the test outcome manifest.
  • For test suite which uses remote cluster, it just uses record_remote_cluster_logs . record_remote_cluster_logs generates the test outcome manifest.

If that is the case why don't we merge both remote and local test cluster into one function record_test_cluster which takes in local/remote as a parameter. The only logic different between them is creating a different directory.

Let me know what you think.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I agree. I'll work on this. One more logic that differs is remote cluster logs won't have log_files but just the location of the logs. Will see how to differentiate that.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

record_local_cluster_logs also copies the local test cluster log files into the directory where remote cluster just gives the location of the logs. That will be an additional change but with right param it should be distinguishable

log_file_location, results):
"""
Record the test cluster logs.
:param results: A generator that yields tuples containing test cluster log files, in the form (absolute_path, relative_path)
:param component_name: component that is under test right now.
:param exit_code: Integer value of the exit code.
:param stdout: A string containing the stdout stream from the test process.
:param stderr: A string containing the stderr stream from the test process.
:param component_test_config: component_config under consideration for test eg: with/without-security.
:param log_file_location: A string that gives log file location.
:param results: A generator that yields tuples containing test results files, in the form (absolute_path, relative_path).
"""
logging.info(f"Recording log files: {list(log_files)}")

def record_integ_test_outcome(
self, component_name, exit_status, stdout, stderr, results
base = self.__create_base_folder_structure(component_name, component_test_config)
dest_directory = os.path.join(base, "remote_cluster_logs")
os.makedirs(dest_directory, exist_ok=False)
logging.info(f"Recording remote cluster logs for {component_name} in {os.path.realpath(dest_directory)}")
self.__generate_std_files(stdout, stderr, os.path.realpath(dest_directory))
exit_status = self.__get_exit_status(exit_code)
component_yml = self.__generate_test_outcome_yml(component_name, component_test_config, exit_status,
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nit:

Suggested change
component_yml = self.__generate_test_outcome_yml(component_name, component_test_config, exit_status,
test_outcome_yml = self.__generate_test_outcome_yml(component_name, component_test_config, exit_status,

log_file_location)
shutil.copyfile(component_yml, os.path.join(dest_directory, os.path.basename(component_yml)))
results_dir = list(results)
for result in results_dir:
dest_file = os.path.join(dest_directory, os.path.basename(result[0]))
shutil.copyfile(result[0], dest_file)

def record_test_outcome(
self, component_name, component_test_config, exit_code, stdout, stderr, results
):
"""
Record the outcome of a integration test run.
:param component_name: The name of the component that ran tests.
:param exit_code: One of SUCCESS, FAILED, ERROR. SUCCESS means the tests ran and all of them passed.
FAILED means the tests ran and at least one failed. ERROR means the test suite did not run correctly.
:param component_test_config: component_config under consideration for test eg: with/without-security.
:param exit_code: Integer value of the exit code.
:param stdout: A string containing the stdout stream from the test process.
:param stderr: A string containing the stderr stream from the test process.
:param results: A generator that yields tuples containing test results files, in the form (absolute_path, relative_path).
"""

base = self.__create_base_folder_structure(component_name, component_test_config)
dest_directory = os.path.join(base, "test_outcome")
os.makedirs(dest_directory, exist_ok=False)
logging.info(
f"Recording test results for {component_name}. Exit status: {exit_status}, stdout: {stdout}, stderr: {stderr}, results files: {results}"
)
f"Recording component test results for {component_name} at {os.path.realpath(dest_directory)}")
self.__generate_std_files(stdout, stderr, dest_directory)
results_dir = list(results)
for result in results_dir:
dest_file = os.path.join(dest_directory, os.path.basename(result[0]))
shutil.copyfile(result[0], dest_file)
exit_status = self.__get_exit_status(exit_code)
component_yml = self.__generate_test_outcome_yml(component_name, component_test_config, exit_status, "S3")
shutil.copyfile(component_yml, os.path.join(dest_directory, os.path.basename(component_yml)))

def __create_base_folder_structure(self, component_name, component_test_config):
dest_directory = os.path.join(self.location, "tests", self.test_run_id, self.test_type, str(component_name),
str(component_test_config))
os.makedirs(dest_directory, exist_ok=True)
return os.path.realpath(dest_directory)

def __get_exit_status(self, exit_code):
if exit_code == 0:
return "SUCCESS"
else:
return "FAILED/ERROR"

def __generate_test_outcome_yml(self, component_name, component_test_config, exit_status, log_file_location):
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

log_file_location -> output_path

outcome = {
"test_type": self.test_type,
"test_run_id": self.test_run_id,
"component_name": component_name,
"test_config": component_test_config,
"status": exit_status,
"log_file_location": log_file_location
}
with open("%s.yml" % component_name, "w") as file:
yaml.dump(outcome, file)
return os.path.realpath("%s.yml" % component_name)

def __generate_std_files(self, stdout, stderr, output_path):
with open(os.path.join(output_path, "stdout.txt"), "w") as stdout_file:
stdout_file.write(stdout)
with open(os.path.join(output_path, "stderr.txt"), "w") as stderr_file:
stderr_file.write(stderr)