Skip to content

Commit

Permalink
add cli arg to limit download parallelism
Browse files Browse the repository at this point in the history
  • Loading branch information
cosmicexplorer committed Aug 19, 2024
1 parent 90703cc commit af99bae
Show file tree
Hide file tree
Showing 9 changed files with 43 additions and 4 deletions.
2 changes: 1 addition & 1 deletion news/12923.feature.rst
Original file line number Diff line number Diff line change
@@ -1 +1 @@
Download concrete dists for metadata-only resolves in parallel using worker threads.
Download concrete dists for metadata-only resolves in parallel using worker threads. Add ``--batch-download-parallelism`` CLI flag to limit parallelism.
18 changes: 18 additions & 0 deletions src/pip/_internal/cli/cmdoptions.py
Original file line number Diff line number Diff line change
Expand Up @@ -231,6 +231,24 @@ class PipOption(Option):
help="Specify whether the progress bar should be used [on, off, raw] (default: on)",
)


batch_download_parallelism: Callable[..., Option] = partial(
Option,
"--batch-download-parallelism",
dest="batch_download_parallelism",
type="int",
default=1,
help=(
"Maximum parallelism employed for batch downloading of metadata-only dists"
" (default %default parallel requests)."
" Note that more than 10 downloads may overflow the requests connection pool,"
" which may affect performance."
" Note also that commands such as 'install --dry-run' should avoid downloads"
" entirely, and so will not be affected by this option."
),
)


log: Callable[..., Option] = partial(
PipOption,
"--log",
Expand Down
2 changes: 2 additions & 0 deletions src/pip/_internal/cli/req_command.py
Original file line number Diff line number Diff line change
Expand Up @@ -100,6 +100,7 @@ def make_requirement_preparer(
use_user_site: bool,
download_dir: Optional[str] = None,
verbosity: int = 0,
batch_download_parallelism: Optional[int] = None,
) -> RequirementPreparer:
"""
Create a RequirementPreparer instance for the given parameters.
Expand Down Expand Up @@ -141,6 +142,7 @@ def make_requirement_preparer(
use_user_site=use_user_site,
lazy_wheel=lazy_wheel,
verbosity=verbosity,
batch_download_parallelism=batch_download_parallelism,
legacy_resolver=legacy_resolver,
)

Expand Down
2 changes: 2 additions & 0 deletions src/pip/_internal/commands/download.py
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,7 @@ def add_options(self) -> None:
self.cmd_opts.add_option(cmdoptions.pre())
self.cmd_opts.add_option(cmdoptions.require_hashes())
self.cmd_opts.add_option(cmdoptions.progress_bar())
self.cmd_opts.add_option(cmdoptions.batch_download_parallelism())
self.cmd_opts.add_option(cmdoptions.no_build_isolation())
self.cmd_opts.add_option(cmdoptions.use_pep517())
self.cmd_opts.add_option(cmdoptions.no_use_pep517())
Expand Down Expand Up @@ -116,6 +117,7 @@ def run(self, options: Values, args: List[str]) -> int:
download_dir=options.download_dir,
use_user_site=False,
verbosity=self.verbosity,
batch_download_parallelism=options.batch_download_parallelism,
)

resolver = self.make_resolver(
Expand Down
2 changes: 2 additions & 0 deletions src/pip/_internal/commands/install.py
Original file line number Diff line number Diff line change
Expand Up @@ -237,6 +237,7 @@ def add_options(self) -> None:
self.cmd_opts.add_option(cmdoptions.prefer_binary())
self.cmd_opts.add_option(cmdoptions.require_hashes())
self.cmd_opts.add_option(cmdoptions.progress_bar())
self.cmd_opts.add_option(cmdoptions.batch_download_parallelism())
self.cmd_opts.add_option(cmdoptions.root_user_action())

index_opts = cmdoptions.make_option_group(
Expand Down Expand Up @@ -359,6 +360,7 @@ def run(self, options: Values, args: List[str]) -> int:
finder=finder,
use_user_site=options.use_user_site,
verbosity=self.verbosity,
batch_download_parallelism=options.batch_download_parallelism,
)
resolver = self.make_resolver(
preparer=preparer,
Expand Down
2 changes: 2 additions & 0 deletions src/pip/_internal/commands/wheel.py
Original file line number Diff line number Diff line change
Expand Up @@ -67,6 +67,7 @@ def add_options(self) -> None:
self.cmd_opts.add_option(cmdoptions.ignore_requires_python())
self.cmd_opts.add_option(cmdoptions.no_deps())
self.cmd_opts.add_option(cmdoptions.progress_bar())
self.cmd_opts.add_option(cmdoptions.batch_download_parallelism())

self.cmd_opts.add_option(
"--no-verify",
Expand Down Expand Up @@ -131,6 +132,7 @@ def run(self, options: Values, args: List[str]) -> int:
download_dir=options.wheel_dir,
use_user_site=False,
verbosity=self.verbosity,
batch_download_parallelism=options.batch_download_parallelism,
)

resolver = self.make_resolver(
Expand Down
13 changes: 11 additions & 2 deletions src/pip/_internal/network/download.py
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@
from pip._vendor.requests.models import Response

from pip._internal.cli.progress_bars import get_download_progress_renderer
from pip._internal.exceptions import NetworkConnectionError
from pip._internal.exceptions import CommandError, NetworkConnectionError
from pip._internal.models.index import PyPI
from pip._internal.models.link import Link
from pip._internal.network.cache import is_from_cache
Expand Down Expand Up @@ -226,11 +226,20 @@ def __init__(
self,
session: PipSession,
progress_bar: str,
max_parallelism: Optional[int] = None,
) -> None:
self._session = session
# FIXME: support progress bar with parallel downloads!
logger.info("Ignoring progress bar %s for parallel downloads", progress_bar)

if max_parallelism is None:
max_parallelism = 1
if max_parallelism < 1:
raise CommandError(
f"invalid batch download parallelism {max_parallelism}: must be >=1"
)
self._max_parallelism: int = max_parallelism

def __call__(
self, links: Iterable[Link], location: Path
) -> Iterable[Tuple[Link, Tuple[Path, Optional[str]]]]:
Expand All @@ -254,7 +263,7 @@ def __call__(
q: "Queue[Union[Tuple[Link, Path, Optional[str]], BaseException]]" = Queue()
event = Event()
# Limit downloads to 10 at a time so we can reuse our connection pool.
semaphore = Semaphore(value=10)
semaphore = Semaphore(value=self._max_parallelism)

# Distribute request i/o across equivalent threads.
# NB: event-based/async is likely a better model than thread-per-request, but
Expand Down
5 changes: 4 additions & 1 deletion src/pip/_internal/operations/prepare.py
Original file line number Diff line number Diff line change
Expand Up @@ -230,6 +230,7 @@ def __init__(
use_user_site: bool,
lazy_wheel: bool,
verbosity: int,
batch_download_parallelism: Optional[int],
legacy_resolver: bool,
) -> None:
super().__init__()
Expand All @@ -239,7 +240,9 @@ def __init__(
self.build_tracker = build_tracker
self._session = session
self._download = Downloader(session, progress_bar)
self._batch_download = BatchDownloader(session, progress_bar)
self._batch_download = BatchDownloader(
session, progress_bar, max_parallelism=batch_download_parallelism
)
self.finder = finder

# Where still-packed archives should be written to. If None, they are
Expand Down
1 change: 1 addition & 0 deletions tests/unit/test_req.py
Original file line number Diff line number Diff line change
Expand Up @@ -106,6 +106,7 @@ def _basic_resolver(
use_user_site=False,
lazy_wheel=False,
verbosity=0,
batch_download_parallelism=None,
legacy_resolver=True,
)
yield Resolver(
Expand Down

0 comments on commit af99bae

Please sign in to comment.