Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fixup: publish concurrency #1261

Closed
wants to merge 3 commits into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions AUTHORS
Original file line number Diff line number Diff line change
Expand Up @@ -58,3 +58,4 @@ List of contributors, in chronological order:
* Ryan Gonzalez (https://github.com/refi64)
* Paul Cacheux (https://github.com/paulcacheux)
* Nic Waller (https://github.com/sf-nwaller)
* Ramón N.Rodriguez (https://github.com/runitonmetal)
102 changes: 102 additions & 0 deletions system/t12_api/publish.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
import inspect
import os
import threading

from api_lib import TASK_SUCCEEDED, APITest

Expand Down Expand Up @@ -282,6 +283,107 @@ def check(self):
self.check_not_exists("public/" + prefix + "dists/")



class PublishConcurrentUpdateAPITestRepo(APITest):
"""
PUT /publish/:prefix/:distribution (local repos), DELETE /publish/:prefix/:distribution
"""
fixtureGpg = True

def check(self):
repo_name = self.random_name()
self.check_equal(self.post(
"/api/repos", json={"Name": repo_name, "DefaultDistribution": "wheezy"}).status_code, 201)

d = self.random_name()
self.check_equal(
self.upload("/api/files/" + d,
"pyspi_0.6.1-1.3.dsc",
"pyspi_0.6.1-1.3.diff.gz", "pyspi_0.6.1.orig.tar.gz",
"pyspi-0.6.1-1.3.stripped.dsc").status_code, 200)
self.check_equal(self.post_task("/api/repos/" + repo_name + "/file/" + d).json()['State'], TASK_SUCCEEDED)

prefix = self.random_name()
resp = self.post_task(
"/api/publish/" + prefix,
json={
"Architectures": ["i386", "source"],
"SourceKind": "local",
"Sources": [{"Name": repo_name}],
"Signing": DefaultSigningOptions,
}
)

self.check_equal(resp.json()['State'], TASK_SUCCEEDED)

self.check_not_exists(
"public/" + prefix + "/pool/main/b/boost-defaults/libboost-program-options-dev_1.49.0.1_i386.deb")
self.check_exists("public/" + prefix +
"/pool/main/p/pyspi/pyspi-0.6.1-1.3.stripped.dsc")

d = self.random_name()
self.check_equal(self.upload("/api/files/" + d,
"libboost-program-options-dev_1.49.0.1_i386.deb").status_code, 200)
self.check_equal(self.post_task("/api/repos/" + repo_name + "/file/" + d).json()['State'], TASK_SUCCEEDED)

self.check_equal(self.delete_task("/api/repos/" + repo_name + "/packages/",
json={"PackageRefs": ['Psource pyspi 0.6.1-1.4 f8f1daa806004e89']}).json()['State'], TASK_SUCCEEDED)

def _do_update(result, index):
resp = self.put_task(
"/api/publish/" + prefix + "/wheezy",
json={
"AcquireByHash": True,
"Signing": DefaultSigningOptions,
}
)
try:
self.check_equal(resp.json()['State'], TASK_SUCCEEDED)
except BaseException as e:
result[index] = e

n_workers = 10
worker_results = [None] * n_workers
tasks = [threading.Thread(target=_do_update, args=(worker_results, i,)) for i in range(n_workers)]
[task.start() for task in tasks]
[task.join() for task in tasks]
for result in worker_results:
if isinstance(result, BaseException):
raise result

repo_expected = {
'AcquireByHash': True,
'Architectures': ['i386', 'source'],
'Codename': '',
'Distribution': 'wheezy',
'Label': '',
'Origin': '',
'NotAutomatic': '',
'ButAutomaticUpgrades': '',
'Path': prefix + '/' + 'wheezy',
'Prefix': prefix,
'SkipContents': False,
'SourceKind': 'local',
'Sources': [{'Component': 'main', 'Name': repo_name}],
'Storage': '',
'Suite': ''}

all_repos = self.get("/api/publish")
self.check_equal(all_repos.status_code, 200)
self.check_in(repo_expected, all_repos.json())

self.check_exists("public/" + prefix +
"/dists/wheezy/main/binary-i386/by-hash")

self.check_exists(
"public/" + prefix + "/pool/main/b/boost-defaults/libboost-program-options-dev_1.49.0.1_i386.deb")
self.check_not_exists(
"public/" + prefix + "/pool/main/p/pyspi/pyspi-0.6.1-1.3.stripped.dsc")

self.check_equal(self.delete_task("/api/publish/" + prefix + "/wheezy").json()['State'], TASK_SUCCEEDED)
self.check_not_exists("public/" + prefix + "dists/")


class PublishUpdateSkipCleanupAPITestRepo(APITest):
"""
PUT /publish/:prefix/:distribution (local repos), DELETE /publish/:prefix/:distribution
Expand Down
15 changes: 8 additions & 7 deletions task/list.go
Original file line number Diff line number Diff line change
Expand Up @@ -117,19 +117,20 @@
return task.processReturnValue, nil
}

// RunTaskInBackground creates task and runs it in background. It won't be run and an error
// returned if there are running tasks which are using needed resources already.
// RunTaskInBackground creates task and runs it in background. This will block until the necessary resources
// become available.
func (list *List) RunTaskInBackground(name string, resources []string, process Process) (Task, *ResourceConflictError) {
list.Lock()
defer list.Unlock()

tasks := list.usedResources.UsedBy(resources)
if len(tasks) > 0 {
conflictError := &ResourceConflictError{
Tasks: tasks,
Message: "Needed resources are used by other tasks.",
for len(tasks) > 0 {
for _, task := range(tasks) {

Check failure on line 128 in task/list.go

View workflow job for this annotation

GitHub Actions / lint

File is not `gofmt`-ed with `-s` (gofmt)
list.Unlock()
list.wgTasks[task.ID].Wait()
list.Lock()
}
return Task{}, conflictError
tasks = list.usedResources.UsedBy(resources)
}

list.idCounter++
Expand Down
Loading