
Support multiprocessing on macOS and Windows #88

Merged · 10 commits · Aug 23, 2023
147 changes: 45 additions & 102 deletions README.md
@@ -88,24 +88,32 @@ using the <name>-<date>-pages-articles.xml.bz2 files.
Usage example:

```python
-from wikitextprocessor import Wtp, WikiNode, NodeKind, Page
-ctx = Wtp()

-def page_handler(page: Page) -> None:
-    if page.model != "wikitext" or page.title.startswith("Template:"):
-        return None
-    tree = ctx.parse(page.body, pre_expand=True)
-    ... process parse tree
-    ... value = ctx.node_to_wikitext(node)

-ctx.process("enwiktionary-20201201-pages-articles.xml.bz2", page_handler)
-# for testing, you can iterate over ctx.process(...) with
-# list(ctx.process...) to actually have the iterator run.
+from functools import partial
+from typing import Any

+from wikitextprocessor import Wtp, WikiNode, NodeKind, Page
+from wikitextprocessor.dumpparser import process_dump

+def page_handler(page: Page, wtp: Wtp | None = None) -> Any:
+    tree = wtp.parse(page.body, pre_expand=True)
+    # process parse tree
+    # value = wtp.node_to_wikitext(node)

+wtp = Wtp()
+process_dump(
+    wtp,
+    "enwiktionary-20230801-pages-articles.xml.bz2",
+    {0, 10, 110, 828},  # namespace id
+)
+for _ in map(
+    partial(page_handler, wtp=wtp), wtp.get_all_pages([0])
+):
+    pass
```

-The basic operation of ``Wtp.process()`` is as follows:
+The basic operation is as follows:
* Extract templates, modules, and other pages from the dump file and save
-  them in a temporary file
+  them in a SQLite file
* Heuristically analyze which templates need to be pre-expanded before
parsing to make sense of the page structure (this cannot detect templates
that call Lua code that outputs wikitext that affects parsed structure).
@@ -114,10 +122,8 @@ The basic operation of ``Wtp.process()`` is as follows:
The page handler can extract, parse, and otherwise process the page, and
has full access to templates and Lua macros defined in the dump. This may
call the page handler in multiple processes in parallel. Return values
-  from the page handler calls are returned to the caller (this function acts
-  as an iterator). This is called the second phase.
-* Optionally, the ``Wtp.reprocess()`` function may be used for processing the
-  same data several times (it basically repeats the second phase).
+  from the page handler calls are returned to the caller. This is
+  called the second phase.

Most of the functionality is hidden behind the ``Wtp`` object.
``WikiNode`` objects are used for representing the parse
@@ -127,19 +133,17 @@ is an enumeration type used to encode the type of a ``WikiNode``.
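
As an illustration of how these pieces fit together, the following minimal sketch walks a parse tree and prints each node's kind; it assumes a tree produced by ``Wtp.parse()``, and that child nodes are either nested ``WikiNode`` objects or plain strings:

```python
from wikitextprocessor import Wtp, WikiNode, NodeKind

wtp = Wtp()
wtp.start_page("example")
tree = wtp.parse("== Heading ==\nSome {{template|text}}")

def walk(node: WikiNode, depth: int = 0) -> None:
    # Print the node's kind, indented by tree depth.
    print("  " * depth + str(node.kind))
    for child in node.children:
        # String children are plain wikitext; only WikiNodes recurse.
        if isinstance(child, WikiNode):
            walk(child, depth + 1)

walk(tree)
```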
### class Wtp

```python
-def __init__(self, num_threads: Optional[int] = None, db_path: Optional[Path] = None,
-             quiet: bool = False, lang_code: str = "en", languages_by_code: Dict[str, List[str]] = {},
-             template_override_funcs: Dict[str, Callable[List[str], str]] = {})
+def __init__(
+    self,
+    db_path: Optional[Union[str, Path]] = None,
+    lang_code="en",
+    languages_by_code: Dict[str, List[str]] = {},
+    template_override_funcs: Dict[str, Callable[[Sequence[str]], str]] = {},
+):
```

The initializer can usually be called without arguments, but recognizes
the following arguments:
-* ``num_threads`` - if set to an integer, use that many parallel processes
-  for processing the dump. The default is to use as many processors as there
-  are available cores/hyperthreads. You may need to limit the number of
-  parallel processes if you are limited by available memory; we have found
-  that processing Wiktionary (including templates and Lua macros)
-  requires 3-4 GB of memory per process. This MUST be set to 1 on Windows.
* `db_path` can be `None`, in which case a temporary database file
will be created under `/tmp`, or a path for the database file which contains
page texts and other data of the dump file.
@@ -156,83 +160,14 @@ the following arguments:
to parse a dump file, which will initialize the database file during the
first phase. If you wish to re-create the database, you should remove
the old file first.
-* ``quiet`` - if set to True, suppress progress messages during processing
* `lang_code` - the language code of the dump file.
* `languages_by_code` - language data: a dict mapping each language code to a list of language names.
* `template_override_funcs` - Python functions for overriding expanded template text.

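As a sketch of how these arguments combine (the database file name and the overridden template name are made up for illustration, and it is assumed here that the keys of `template_override_funcs` are template names, as the type hint suggests):

```python
from wikitextprocessor import Wtp

def override_example(args):
    # Receives the template arguments; returns the wikitext to use
    # instead of the template's normal expansion.
    return "overridden"

wtp = Wtp(
    db_path="en-20230801.db",  # reuse this file on later runs
    lang_code="en",
    template_override_funcs={"example template": override_example},
)
```
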
-**Windows and MacOS note:** Setting `num_threads` to a value other than 1
-doesn't work on Windows and MacOS. It now defaults to 1 on these platforms.
-This is because these platforms don't use `fork()` in the Python multiprocessing
-package, and the current parallelization implementation depends on this.

```python
-def process(self, path: str, page_handler, namespace_ids: Set[int], phase1_only=False,
-            override_folders: Optional[List[Path]] = None, skip_extract_dump: bool = False):
```

-This function processes a WikiMedia dump, uncompressing and extracting pages
-(including templates and Lua modules), and calling ``Wtp.add_page()`` for
-each page (phase 1). This then calls ``Wtp.reprocess()`` to execute the
-second phase.

-This takes the following arguments:
-* ``path`` (str) - path to the WikiMedia dump file to be processed
-  (e.g., "enwiktionary-20201201-pages-articles.xml.bz2"). Note that the
-  compressed file can be used. Dump files can be
-  downloaded [here](https://dumps.wikimedia.org).
-* `page_handler` (function) - this function will be called for each page
-  in phase 2 (unless `phase1_only` is set to `True`). The call takes the form
-  `page_handler(page: Page)`, where `page.model` is the `model` value
-  for the page in the dump (`wikitext` for normal wikitext pages and
-  templates, `Scribunto` for Lua modules; other values are also possible),
-  `page.title` is the page title (e.g., `sample` or `Template:foobar`
-  or `Module:mystic`), and `page.body` is the contents of the page (usually
-  wikitext).
-* `namespace_ids` - a set of namespace ids; pages whose namespace ids are not
-  included in this set won't be processed.
-* ``phase1_only`` (boolean) - if set to True, prevents phase 2
-  processing, and the ``page_handler`` function will not be called. The
-  ``Wtp.reprocess()`` function can be used to run the second phase separately,
-  or ``Wtp.expand()``, ``Wtp.parse()`` and other functions can be used.
-* `override_folders` - a list of folder paths; each folder contains files for
-  overriding pages in the dump file.
-* `skip_extract_dump` - extracting the dump file can be skipped if the
-  database was created before.

-This function returns an iterator over the values returned by the
-``page_handler`` function (if ``page_handler`` returns ``None`` or no
-value, the iterator does not return those values). Note that
-``page_handler`` will usually be run in a separate process, and cannot
-pass any values back in global variables. It can, however, access
-global variables assigned before calling ``Wtp.process()`` (on Linux
-only).

```python
-def reprocess(self, page_handler, namespace_ids: Optional[List[int]] = None,
-              include_redirects: bool = True):
```

-Iterates over all pages in the cache file and calls ``page_handler``
-for each page. This basically implements phase 2 of processing a dump
-file (see ``Wtp.process()``). This can be called more than once if desired.

-The arguments are:
-* `page_handler` (function) - the same as the corresponding argument of `Wtp.process()`.
-* `namespace_ids` - the same as the corresponding argument of `Wtp.process()`.
-* `include_redirects` - redirect pages will be processed if set to `True`.

-This function returns an iterator that iterates over the return values
-of ``page_handler``. If the return value is ``None`` or ``page_handler``
-returns no value, no value is returned by the iterator for such calls.

-This calls ``page_handler`` using subprocesses (unless ``num_threads``
-was set to 1 in the initializer). It may be necessary to set it to 1
-on Windows and MacOS due to operating system/Python limitations on those
-platforms.

```python
-def read_by_title(self, title: str, namespace_id: Optional[int] = None) -> Optional[str]
+def read_by_title(
+    self, title: str, namespace_id: Optional[int] = None
+) -> Optional[str]:
```

Reads the contents of the page with the specified title from the cache
@@ -249,8 +184,16 @@ This returns the page contents as a string, or ``None`` if the page
does not exist.
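
For example, a short sketch (assuming the database file has already been populated from a dump, and using an illustrative title; namespace id 10 is the Template namespace):

```python
wtp = Wtp(db_path="en-20230801.db")
body = wtp.read_by_title("Template:en-noun", namespace_id=10)
if body is not None:
    print(body[:200])  # first 200 characters of the page's wikitext
```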

```python
-def parse(self, text, pre_expand=False, expand_all=False,
-          additional_expand=None)
+def parse(
+    self,
+    text: str,
+    pre_expand=False,
+    expand_all=False,
+    additional_expand=None,
+    do_not_pre_expand=None,
+    template_fn=None,
+    post_template_fn=None,
+) -> WikiNode:
```

Parses wikitext into a parse tree (``WikiNode``), optionally expanding
some or all of the templates in the wikitext first.
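
A typical call might look like the following sketch (the wikitext is made up; ``start_page()`` is called first because parsing and expansion happen in the context of a current page):

```python
wtp = Wtp()
wtp.start_page("example")
tree = wtp.parse("=== Noun ===\n{{en-noun}}", pre_expand=True)
print(tree.kind)  # the root node; the heading and template are among its children
```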
35 changes: 25 additions & 10 deletions tests/test_dumpparser.py
@@ -1,16 +1,26 @@
import unittest
-from unittest.mock import patch
from collections import namedtuple
from pathlib import Path
+from unittest.mock import patch

-from wikitextprocessor.dumpparser import path_is_on_windows_partition
+from wikitextprocessor import Wtp
+from wikitextprocessor.dumpparser import (
+    path_is_on_windows_partition,
+    process_dump,
+)

-sdisktype = namedtuple('sdisktype', 'fstype mountpoint')
+sdisktype = namedtuple("sdisktype", "fstype mountpoint")

class DumpParserTests(unittest.TestCase):
+    def setUp(self):
+        self.wtp = Wtp()
+
+    def tearDown(self):
+        self.wtp.close_db_conn()
+
-    @patch('wikitextprocessor.dumpparser.disk_partitions')
-    @patch('pathlib.Path.resolve', new = lambda x: x)
+    @patch("psutil.disk_partitions")
+    @patch("pathlib.Path.resolve", new=lambda x: x)
    def test_path_is_on_windows_partition_nix(self, mock_disk_partitions):
        partitions = [
            sdisktype("fakelongfsname", "/"),
@@ -25,16 +35,21 @@ def test_path_is_on_windows_partition_nix(self, mock_disk_partitions):
        self.assertTrue(path_is_on_windows_partition(Path("/mnt/windows0")))
        self.assertTrue(path_is_on_windows_partition(Path("/mnt/windows1/foo")))

-    @patch('wikitextprocessor.dumpparser.disk_partitions')
-    @patch('pathlib.Path.resolve', new = lambda x: x)
+    @patch("psutil.disk_partitions")
+    @patch("pathlib.Path.resolve", new=lambda x: x)
    def test_path_is_on_windows_partition_windows(self, mock_disk_partitions):
        partitions = [
            sdisktype("NTFS", "C:\\"),
            sdisktype("exFAT", "D:\\"),
        ]

        mock_disk_partitions.return_value = partitions
        self.assertTrue(path_is_on_windows_partition(Path("D:\\")))
        self.assertTrue(path_is_on_windows_partition(Path("C:\\Users\\user0")))


+    def test_process_dump(self):
+        process_dump(
+            self.wtp,
+            "tests/test-pages-articles.xml.bz2",
+            {0, 4, 10, 14, 100, 110, 118, 828},
+        )
+        self.assertGreater(self.wtp.saved_page_nums(), 0)
57 changes: 0 additions & 57 deletions tests/test_long.py

This file was deleted.

1 change: 1 addition & 0 deletions tests/test_wikiprocess.py
@@ -4151,6 +4151,7 @@ def test_analyze_used_pre_expand_template(self) -> None:
        )
        self.ctx.add_page("Template:En-nm", 10, body="{{-n2-|英語}}")
        self.ctx.analyze_templates()
+        self.ctx.get_page.cache_clear()
        page = self.ctx.get_page("Template:En-nm", 10)
        self.assertTrue(page.need_pre_expand)
