Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix: record state after timestamp update #588

Merged
merged 1 commit into from
May 24, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
10 changes: 6 additions & 4 deletions src/vunnel/provider.py
Original file line number Diff line number Diff line change
Expand Up @@ -176,7 +176,7 @@ def _update(self) -> None:
else:
urls, count = self.update(last_updated=last_updated)

if count > 0:
if count > 0 or stale:
self.workspace.record_state(
stale=stale,
version=self.version(),
Expand All @@ -193,17 +193,19 @@ def _fetch_or_use_results_archive(self) -> tuple[list[str], int, datetime.dateti
latest_entry = listing_doc.latest_entry(schema_version=self.distribution_version())
if not latest_entry:
raise RuntimeError("no listing entry found")

timestamp = None
if self.runtime_cfg.skip_newer_archive_check or self._has_newer_archive(latest_entry=latest_entry):
self.logger.info("fetching latest listing")
self._prep_workspace_from_listing_entry(entry=latest_entry)
else:
# Update the timestamp of the state to the latest entry's built time
self.logger.info("using existing listing and updating timestamp")
self.workspace.state().timestamp = datetime.datetime.fromisoformat(latest_entry.built)
timestamp = datetime.datetime.fromisoformat(latest_entry.built)

state = self.workspace.state()
return state.urls, state.result_count(self.workspace.path), state.timestamp
if not timestamp:
timestamp = state.timestamp
return state.urls, state.result_count(self.workspace.path), timestamp

def _fetch_listing_document(self) -> distribution.ListingDocument:
url = self.runtime_cfg.import_url(provider_name=self.name())
Expand Down
54 changes: 54 additions & 0 deletions tests/unit/test_provider.py
Original file line number Diff line number Diff line change
Expand Up @@ -67,6 +67,12 @@ def update(self, *args, **kwargs):
return ["http://localhost:8000/dummy-input-1.json"], 1


class DummyProviderWithZeroCountOnUpdate(DummyProvider):
    """Dummy provider whose results-archive step always reports zero results.

    Overrides ``_fetch_or_use_results_archive`` to return a count of ``0`` and
    a fixed timestamp, so tests can observe what the provider does when the
    archive path yields no new results.
    """

    def _fetch_or_use_results_archive(self):
        """Return ``(urls, 0, datetime(2021, 1, 1))`` using the URLs from ``update()``."""
        # The count reported by update() is deliberately discarded and
        # replaced with 0; only the URL list is passed through.
        urls, _ = self.update()
        return urls, 0, datetime.datetime(2021, 1, 1, 0, 0, 0)


def get_random_string(length: int = 10) -> str:
    """Return a random string of ASCII letters and digits.

    Args:
        length: Number of characters to generate. A value of zero or less
            yields an empty string because ``range(length)`` is then empty.

    Returns:
        A string of ``length`` characters drawn from
        ``string.ascii_letters + string.digits``.
    """
    characters = string.ascii_letters + string.digits
    return "".join(random.choice(characters) for _ in range(length))
Expand Down Expand Up @@ -96,6 +102,30 @@ def apply(populate=True, use_dir=None, **kwargs) -> provider.Provider:
return apply


@pytest.fixture()
def dummy_provider_with_zero_count_on_update(tmpdir):
    """Provide a factory that builds ``DummyProviderWithZeroCountOnUpdate`` instances.

    The returned ``apply`` callable accepts:

    * ``populate`` -- when true, run the provider once and assert that its
      input file exists and its results directory is non-empty; when false,
      only create the workspace.
    * ``use_dir`` -- root directory for the provider; when falsy, a fresh
      location is derived from ``tmpdir`` plus a random suffix.
    * ``**kwargs`` -- forwarded to the provider constructor.
    """

    def apply(populate=True, use_dir=None, **kwargs) -> provider.Provider:
        if not use_dir:
            use_dir = tmpdir + get_random_string()
        # create a dummy provider
        subject = DummyProviderWithZeroCountOnUpdate(root=use_dir, **kwargs)

        if populate:
            # update the provider
            subject.run()

            # check that the input and results are populated
            assert os.path.exists(subject.input_file)
            existing_results = os.listdir(subject.workspace.results_path)
            assert len(existing_results) > 0
        else:
            subject.workspace.create()

        return subject

    return apply


def test_clear_existing_state(dummy_provider):
policy = provider.RuntimeConfig(
existing_input=provider.InputStatePolicy.DELETE,
Expand Down Expand Up @@ -813,6 +843,30 @@ def test_has_newer_archive_false(dummy_provider):
assert not subject._has_newer_archive(entry)


def test_timestamp_updated_on_fetch_or_use_results_archive(tmpdir, dummy_provider):
    """The archive step must return a timestamp that differs from the stored state.

    After an initial populated run, results import is enabled and
    ``_fetch_or_use_results_archive`` is called directly; the timestamp it
    returns is expected to be 2021-01-01 rather than the one recorded in the
    workspace state.
    """
    subject = dummy_provider(populate=True)
    subject.runtime_cfg.import_results_enabled = True
    subject.runtime_cfg.import_results_host = "http://localhost"
    subject.runtime_cfg.import_results_path = "{provider_name}/listing.json"
    current_state = subject.workspace.state()
    # fetch the results archive; only the returned timestamp is under test
    _, _, timestamp = subject._fetch_or_use_results_archive()
    assert current_state.timestamp != timestamp
    assert timestamp == datetime.datetime(2021, 1, 1, 0, 0, 0)


def test_state_update_on_stale(tmpdir, dummy_provider_with_zero_count_on_update):
    """``_update`` must record the archive timestamp even when the count is zero.

    Uses a provider whose archive step reports zero results with a fixed
    2021-01-01 timestamp, then checks that the workspace state written by
    ``_update`` carries that timestamp.
    """
    subject = dummy_provider_with_zero_count_on_update(populate=True)
    # NOTE(review): this value is never asserted against; the call is kept
    # because it is not visible here whether workspace.state() has side effects.
    current_state = subject.workspace.state()
    subject.runtime_cfg.import_results_enabled = True
    subject.runtime_cfg.import_results_host = "http://localhost"
    subject.runtime_cfg.import_results_path = "{provider_name}/listing.json"
    subject._update()
    new_state = subject.workspace.state()
    assert new_state.timestamp is not None
    assert new_state.timestamp == datetime.datetime(2021, 1, 1, 0, 0, 0)


@pytest.mark.parametrize(
"host,path,want",
[
Expand Down
Loading