Preprocessing: Reproducible batch order even when number of workers changes #119

Closed
Tracked by #158
dlwh opened this issue Apr 20, 2023 · 0 comments

Comments

@dlwh
Member

dlwh commented Apr 20, 2023

Part of #99

A bit tricky to pull off. The design doc has this:

Sharded Reading

We say there are K input shards, W writers, R readers. We assume K >= W (though typically K is not too large), and W ≈ R.
We produce N chunks. We also define an idealized number of readers R*, which defines the global ordering over the data.
Typically R* should be the maximum number of readers we expect to actually use.

We want to be able to read from the cache in a way that is deterministic and reproducible, even if the number of readers
changes. We also want readers to only read from the chunks that they need to read from.
We pretend the list of data is infinite by cycling. We cannot track epochs.

NB Our goal is a deterministic ordering over examples, and not merely chunks or even documents.

Given a list of chunks and the idealized number of readers R*, we define the global ordering over chunks as follows:
First define R* iterators over chunks, with chunk_iterators[r] being defined as loop(all_chunks)[r::R*].
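For concreteness, here is a minimal sketch of that chunk-level assignment, assuming all_chunks is just a Python list of chunk names; loop() is itertools.cycle, and the [r::R*] stride becomes an islice:

```python
# Minimal sketch of chunk_iterators[r] == loop(all_chunks)[r::R*].
# Names (all_chunks, R_star) are illustrative, not Levanter's actual API.
from itertools import cycle, islice

def chunk_iterator(all_chunks, r, R_star):
    """Yield the chunks assigned to idealized reader r: loop(all_chunks)[r::R*]."""
    return islice(cycle(all_chunks), r, None, R_star)

# Example: 5 chunks, R* = 2
chunks = ["c0", "c1", "c2", "c3", "c4"]
print(list(islice(chunk_iterator(chunks, 0, 2), 6)))  # ['c0', 'c2', 'c4', 'c1', 'c3', 'c0']
print(list(islice(chunk_iterator(chunks, 1, 2), 6)))  # ['c1', 'c3', 'c0', 'c2', 'c4', 'c1']
```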

Next, define a function mk_examples(chunk_iterator) that takes an iterator over chunks and returns
an iterator over examples. Define chunk_examples[r] = mk_examples(chunk_iterators[r]).
This function depends on our sequence length, etc. Then the ordering over examples is:

chunk_examples[0][0], chunk_examples[1][0], ..., chunk_examples[R*-1][0], ..., chunk_examples[0][1], chunk_examples[1][1], ..., chunk_examples[R*-1][1], ...
that is, example[i] == chunk_examples[i % R*][i // R*]
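The round-robin interleave that realizes example[i] == chunk_examples[i % R*][i // R*] can be sketched as follows (a sketch, not Levanter's implementation; names are illustrative):

```python
# Interleave R* example iterators so that the j-th item pulled from iterator r
# lands at global position j * R* + r, i.e. example[i] == chunk_examples[i % R*][i // R*].
from itertools import count

def interleave_examples(chunk_examples):
    R_star = len(chunk_examples)
    for i in count():
        yield next(chunk_examples[i % R_star])
```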

If we have R* readers, then each reader_iterator[r][j] == chunk_examples[r][j] == example[j * R* + r].
Moreover, if either R or R* is a multiple of the other, then we still get a nice property where
each reader reads from a strided slice of the chunk_iterators:

(Boring math)
If we have R readers, then reader_iterator[r][j] == example[j * R + r] == chunk_examples[(j * R + r) % R*][(j * R + r) // R*]
If we have R == n * R*, then reader_iterator[r][j] == example[j * R + r] == chunk_examples[(j * R + r) % R*][(j * R + r) // R*] == chunk_examples[r % R*][(j * n * R* + r) // R*] == chunk_examples[r % R*][j * n + r // R*], so each reader reads from
a strided slice (specifically islice(..., r//R*, None, n))
If we have R* == n * R, then reader_iterator[r][j] == example[j * R + r] == chunk_examples[(j * R + r) % R*][(j * R + r) // R*] == chunk_examples[R * (j % n) + r][(j * R + r) // R*], and so each reader reads from n different chunk_examples streams,
i.e. we round-robin over a slice of the chunk_examples streams.

For other cases (R and R* don't divide each other), there's no simple relationship between the reader and chunk iterators
and you end up reading from everywhere, but that's ok.
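One way to picture a concrete reader under this scheme is the sketch below. It assumes, purely for illustration, that chunk_examples[r] is indexable; the real readers work over iterators, and reader_examples is a hypothetical name, not Levanter's API:

```python
# Sketch: reader r of R total readers walks global indices r, R + r, 2R + r, ...
# and maps each back to a stream and position via i % R* and i // R*.
# When R == n * R*, the positions visited in stream r % R* are exactly
# r // R*, r // R* + n, r // R* + 2n, ... -- the strided slice noted above.
def reader_examples(r, R, R_star, chunk_examples):
    j = 0
    while True:
        i = j * R + r                                # global example index owned by this reader
        yield chunk_examples[i % R_star][i // R_star]
        j += 1
```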

More thoughts

There are two cases to consider: one where 1 row = 1 example, and our current use case, where the number of examples produced per row can't easily be determined ahead of time.

For the former case, you just read round-robin from your assigned chunk slices. For the latter case, a reader needs to be careful to produce exactly one example from each of its chunk streams in round-robin order. The easiest way to do this is to inject a producer function into the chunk readers, as sketched below...
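A rough sketch of that producer-injection idea. Here produce_examples is a hypothetical callback playing the role of mk_examples (e.g. tokenize-and-pack to the sequence length), and the chunk streams are assumed infinite because the data is cycled:

```python
# Sketch: each reader owns several chunk streams and must emit exactly one example
# per stream per round, even though a single chunk yields a variable number of examples.
# produce_examples (chunk iterator -> example iterator) is a hypothetical callback.
def round_robin_examples(chunk_streams, produce_examples):
    example_streams = [produce_examples(s) for s in chunk_streams]
    while True:
        for stream in example_streams:
            yield next(stream)  # streams never exhaust: the underlying chunks are cycled
```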

@dlwh dlwh added this to the Spring 23 Release milestone Apr 28, 2023
@dlwh dlwh added the p3 label Apr 28, 2023
@dlwh dlwh mentioned this issue May 23, 2023
@dlwh dlwh removed this from the Spring 23 Release milestone Jun 14, 2023
dlwh added a commit that referenced this issue Sep 5, 2024
…le, stable mixtures and more (#716)

Introduces a massive rework of Levanter's cache system to support instant resume, perfect shuffle, stable mixtures and such.

The basic idea is to use TensorStore to store all of our data as a kind of janky column store (implemented in JaggedArrayStore) and pytrees of such (implemented in TreeStore).

TensorStore provides efficient storage and access to very large arrays. We still support streaming from an in-progress cache via a new AsyncDataset class.

I've successfully tested this on the Pile and, modulo the usual issues with the llama tokenizer on long documents/books, it behaves well.

Closes #626 #311 #119 #34
@dlwh dlwh closed this as completed Sep 5, 2024