From beef6107fc50be5f5c6eea319bd52a5a75ef9890 Mon Sep 17 00:00:00 2001
From: Gergely Szilvasy
Date: Fri, 5 Jan 2024 09:27:04 -0800
Subject: [PATCH] faiss paper benchmarks (#3189)

Summary:
- IVF benchmarks: `bench_fw_ivf.py bigann /checkpoint/gsz/bench_fw/ivf`
- Codec benchmarks: `bench_fw_codecs.py contriever /checkpoint/gsz/bench_fw/codecs` and `bench_fw_codecs.py deep1b /checkpoint/gsz/bench_fw/codecs`
- A range codec evaluation: `bench_fw_range.py ssnpp /checkpoint/gsz/bench_fw/range`
- Visualize with `bench_fw_notebook.ipynb`
- Support for running on a cluster

Pull Request resolved: https://github.com/facebookresearch/faiss/pull/3189

Reviewed By: mdouze

Differential Revision: D52544642

Pulled By: algoriddle

fbshipit-source-id: 21dcdfd076aef6d36467c908e6be78ef851b0e98
---
 benchs/bench_fw/benchmark.py    | 482 +++++++++++++-------
 benchs/bench_fw/benchmark_io.py |  63 ++-
 benchs/bench_fw/descriptors.py  |  27 ++
 benchs/bench_fw/index.py        | 759 +++++++++++++++++++++-----------
 benchs/bench_fw/utils.py        | 107 +++++
 benchs/bench_fw_codecs.py       | 146 ++++++
 benchs/bench_fw_ivf.py          | 120 +++++
 benchs/bench_fw_ivf_flat.py     |  37 --
 benchs/bench_fw_notebook.ipynb  | 376 +++++++++++++++-
 benchs/bench_fw_range.py        |  83 ++++
 benchs/bench_fw_test.py         |  61 ---
 contrib/factory_tools.py        |  21 +-
 12 files changed, 1729 insertions(+), 553 deletions(-)
 create mode 100644 benchs/bench_fw/utils.py
 create mode 100644 benchs/bench_fw_codecs.py
 create mode 100644 benchs/bench_fw_ivf.py
 delete mode 100644 benchs/bench_fw_ivf_flat.py
 create mode 100644 benchs/bench_fw_range.py
 delete mode 100644 benchs/bench_fw_test.py

diff --git a/benchs/bench_fw/benchmark.py b/benchs/bench_fw/benchmark.py
index 8ee53103e5..ccdbf9c5d6 100644
--- a/benchs/bench_fw/benchmark.py
+++ b/benchs/bench_fw/benchmark.py
@@ -4,18 +4,17 @@
 # LICENSE file in the root directory of this source tree.
import logging +from copy import copy from dataclasses import dataclass from operator import itemgetter from statistics import median, mean from typing import Any, Dict, List, Optional +from .utils import dict_merge from .index import Index, IndexFromCodec, IndexFromFactory from .descriptors import DatasetDescriptor, IndexDescriptor import faiss # @manual=//faiss/python:pyfaiss_gpu -from faiss.contrib.evaluation import ( # @manual=//faiss/contrib:faiss_contrib_gpu - knn_intersection_measure, -) import numpy as np @@ -90,26 +89,18 @@ def optimizer(op, search, cost_metric, perf_metric): continue logger.info(f"{cno=:4d} {str(parameters):50}: RUN") - cost, perf = search( + cost, perf, requires = search( parameters, cost_metric, perf_metric, ) + if requires is not None: + return requires logger.info( f"{cno=:4d} {str(parameters):50}: DONE, {cost=:.3f} {perf=:.3f}" ) op.add_operating_point(key, perf, cost) - - -def distance_ratio_measure(I, R, D_GT, metric): - sum_of_R = np.sum(np.where(I >= 0, R, 0)) - sum_of_D_GT = np.sum(np.where(I >= 0, D_GT, 0)) - if metric == faiss.METRIC_INNER_PRODUCT: - return (sum_of_R / sum_of_D_GT).item() - elif metric == faiss.METRIC_L2: - return (sum_of_D_GT / sum_of_R).item() - else: - raise RuntimeError(f"unknown metric {metric}") + return None # range_metric possible values: @@ -194,6 +185,7 @@ def sigmoid(x, a, b, c): @dataclass class Benchmark: + num_threads: int training_vectors: Optional[DatasetDescriptor] = None database_vectors: Optional[DatasetDescriptor] = None query_vectors: Optional[DatasetDescriptor] = None @@ -233,7 +225,8 @@ def range_search_reference(self, index, parameters, range_metric): else: m_radius = range_metric - lims, D, I, R, P = self.range_search( + lims, D, I, R, P, _ = self.range_search( + False, index, parameters, radius=m_radius, @@ -258,7 +251,8 @@ def range_search_reference(self, index, parameters, range_metric): ) def estimate_range(self, index, parameters, range_scoring_radius): - D, I, R, P = index.knn_search( + D, I, R, P, _ = index.knn_search( + False, parameters, self.query_vectors, self.k, @@ -275,10 +269,13 @@ def estimate_range(self, index, parameters, range_scoring_radius): def range_search( self, + dry_run, index: Index, search_parameters: Optional[Dict[str, int]], radius: Optional[float] = None, gt_radius: Optional[float] = None, + range_search_metric_function = None, + gt_rsm = None, ): logger.info("range_search: begin") if radius is None: @@ -293,16 +290,32 @@ def range_search( ) ) logger.info(f"Radius={radius}") - return index.range_search( + lims, D, I, R, P, requires = index.range_search( + dry_run=dry_run, search_parameters=search_parameters, query_vectors=self.query_vectors, radius=radius, ) + if requires is not None: + return None, None, None, None, None, requires + if range_search_metric_function is not None: + range_search_metric = range_search_metric_function(R) + range_search_pr = range_search_pr_curve( + D, range_search_metric, gt_rsm + ) + range_score_sum = np.sum(range_search_metric).item() + P |= { + "range_score_sum": range_score_sum, + "range_score_max_recall": range_score_sum / gt_rsm, + "range_search_pr": range_search_pr, + } + return lims, D, I, R, P, requires def range_ground_truth(self, gt_radius, range_search_metric_function): logger.info("range_ground_truth: begin") flat_desc = self.get_index_desc("Flat") - lims, D, I, R, P = self.range_search( + lims, D, I, R, P, _ = self.range_search( + False, flat_desc.index, search_parameters=None, radius=gt_radius, @@ -311,166 +324,277 @@ def 
range_ground_truth(self, gt_radius, range_search_metric_function): logger.info("range_ground_truth: end") return gt_rsm - def range_search_benchmark( - self, - results: Dict[str, Any], - index: Index, - metric_key: str, - radius: float, - gt_radius: float, - range_search_metric_function, - gt_rsm: float, - ): - logger.info(f"range_search_benchmark: begin {index.get_index_name()}") - - def experiment(parameters, cost_metric, perf_metric): - nonlocal results - key = index.get_range_search_name( - search_parameters=parameters, - query_vectors=self.query_vectors, - radius=radius, - ) - key += metric_key - if key in results["experiments"]: - metrics = results["experiments"][key] - else: - lims, D, I, R, P = self.range_search( - index, - parameters, - radius=radius, - gt_radius=gt_radius, - ) - range_search_metric = range_search_metric_function(R) - range_search_pr = range_search_pr_curve( - D, range_search_metric, gt_rsm - ) - range_score_sum = np.sum(range_search_metric).item() - metrics = P | { - "range_score_sum": range_score_sum, - "range_score_max_recall": range_score_sum / gt_rsm, - "range_search_pr": range_search_pr, - } - results["experiments"][key] = metrics - return metrics[cost_metric], metrics[perf_metric] - - for cost_metric in ["time"]: - for perf_metric in ["range_score_max_recall"]: - op = index.get_operating_points() - optimizer( - op, - experiment, - cost_metric, - perf_metric, - ) - logger.info("range_search_benchmark: end") - return results - def knn_ground_truth(self): logger.info("knn_ground_truth: begin") flat_desc = self.get_index_desc("Flat") - self.gt_knn_D, self.gt_knn_I, _, _ = flat_desc.index.knn_search( + self.build_index_wrapper(flat_desc) + self.gt_knn_D, self.gt_knn_I, _, _, requires = flat_desc.index.knn_search( + dry_run=False, search_parameters=None, query_vectors=self.query_vectors, k=self.k, ) + assert requires is None logger.info("knn_ground_truth: end") - def knn_search_benchmark(self, results: Dict[str, Any], index: Index): + def search_benchmark( + self, + name, + search_func, + key_func, + cost_metrics, + perf_metrics, + results: Dict[str, Any], + index: Index, + ): index_name = index.get_index_name() - logger.info(f"knn_search_benchmark: begin {index_name}") + logger.info(f"{name}_benchmark: begin {index_name}") def experiment(parameters, cost_metric, perf_metric): nonlocal results - key = index.get_knn_search_name( - parameters, - self.query_vectors, - self.k, - ) - key += "knn" + key = key_func(parameters) if key in results["experiments"]: metrics = results["experiments"][key] else: - D, I, R, P = index.knn_search( - parameters, self.query_vectors, self.k - ) - metrics = P | { - "knn_intersection": knn_intersection_measure( - I, self.gt_knn_I - ), - "distance_ratio": distance_ratio_measure( - I, R, self.gt_knn_D, self.distance_metric_type - ), - } + metrics, requires = search_func(parameters) + if requires is not None: + return None, None, requires results["experiments"][key] = metrics - return metrics[cost_metric], metrics[perf_metric] + return metrics[cost_metric], metrics[perf_metric], None - for cost_metric in ["time"]: - for perf_metric in ["knn_intersection", "distance_ratio"]: + for cost_metric in cost_metrics: + for perf_metric in perf_metrics: op = index.get_operating_points() - optimizer( + requires = optimizer( op, experiment, cost_metric, perf_metric, ) - logger.info("knn_search_benchmark: end") - return results + if requires is not None: + break + logger.info(f"{name}_benchmark: end") + return results, requires + + def 
knn_search_benchmark(self, dry_run, results: Dict[str, Any], index: Index): + return self.search_benchmark( + name="knn_search", + search_func=lambda parameters: index.knn_search( + dry_run, parameters, self.query_vectors, self.k, self.gt_knn_I, self.gt_knn_D, + )[3:], + key_func=lambda parameters: index.get_knn_search_name( + search_parameters=parameters, + query_vectors=self.query_vectors, + k=self.k, + reconstruct=False, + ), + cost_metrics=["time"], + perf_metrics=["knn_intersection", "distance_ratio"], + results=results, + index=index, + ) - def train(self, results): - xq = self.io.get_dataset(self.query_vectors) - self.d = xq.shape[1] - if self.get_index_desc("Flat") is None: - self.index_descs.append(IndexDescriptor(factory="Flat")) - for index_desc in self.index_descs: - if index_desc.factory is not None: - index = IndexFromFactory( - d=self.d, - metric=self.distance_metric, - database_vectors=self.database_vectors, - search_params=index_desc.search_params, - construction_params=index_desc.construction_params, - factory=index_desc.factory, - training_vectors=self.training_vectors, - ) - index.set_io(self.io) - index.train() - index_desc.index = index - results["indices"][index.get_codec_name()] = { - "code_size": index.get_code_size() - } + def reconstruct_benchmark(self, dry_run, results: Dict[str, Any], index: Index): + return self.search_benchmark( + name="reconstruct", + search_func=lambda parameters: index.reconstruct( + dry_run, parameters, self.query_vectors, self.k, self.gt_knn_I, + ), + key_func=lambda parameters: index.get_knn_search_name( + search_parameters=parameters, + query_vectors=self.query_vectors, + k=self.k, + reconstruct=True, + ), + cost_metrics=["encode_time"], + perf_metrics=["sym_recall"], + results=results, + index=index, + ) + + def range_search_benchmark( + self, + dry_run, + results: Dict[str, Any], + index: Index, + metric_key: str, + radius: float, + gt_radius: float, + range_search_metric_function, + gt_rsm: float, + ): + return self.search_benchmark( + name="range_search", + search_func=lambda parameters: self.range_search( + dry_run=dry_run, + index=index, + search_parameters=parameters, + radius=radius, + gt_radius=gt_radius, + range_search_metric_function=range_search_metric_function, + gt_rsm=gt_rsm, + )[4:], + key_func=lambda parameters: index.get_range_search_name( + search_parameters=parameters, + query_vectors=self.query_vectors, + radius=radius, + ) + metric_key, + cost_metrics=["time"], + perf_metrics=["range_score_max_recall"], + results=results, + index=index, + ) + + def build_index_wrapper(self, index_desc: IndexDescriptor): + if hasattr(index_desc, 'index'): + return + if index_desc.factory is not None: + training_vectors = copy(self.training_vectors) + training_vectors.num_vectors = index_desc.training_size + index = IndexFromFactory( + num_threads=self.num_threads, + d=self.d, + metric=self.distance_metric, + database_vectors=self.database_vectors, + search_params=index_desc.search_params, + construction_params=index_desc.construction_params, + factory=index_desc.factory, + training_vectors=training_vectors, + ) + else: + index = IndexFromCodec( + num_threads=self.num_threads, + d=self.d, + metric=self.distance_metric, + database_vectors=self.database_vectors, + search_params=index_desc.search_params, + construction_params=index_desc.construction_params, + path=index_desc.path, + bucket=index_desc.bucket, + ) + index.set_io(self.io) + index_desc.index = index + + def clone_one(self, index_desc): + benchmark = Benchmark( + 
num_threads=self.num_threads, + training_vectors=self.training_vectors, + database_vectors=self.database_vectors, + query_vectors=self.query_vectors, + index_descs = [self.get_index_desc("Flat"), index_desc], + range_ref_index_desc=self.range_ref_index_desc, + k=self.k, + distance_metric=self.distance_metric, + ) + benchmark.set_io(self.io) + return benchmark + + def benchmark_one(self, dry_run, results: Dict[str, Any], index_desc: IndexDescriptor, train, reconstruct, knn, range): + faiss.omp_set_num_threads(self.num_threads) + if not dry_run: + self.knn_ground_truth() + self.build_index_wrapper(index_desc) + meta, requires = index_desc.index.fetch_meta(dry_run=dry_run) + if requires is not None: + return results, (requires if train else None) + results["indices"][index_desc.index.get_codec_name()] = meta + + # results, requires = self.reconstruct_benchmark( + # dry_run=True, + # results=results, + # index=index_desc.index, + # ) + # if reconstruct and requires is not None: + # if dry_run: + # return results, requires + # else: + # results, requires = self.reconstruct_benchmark( + # dry_run=False, + # results=results, + # index=index_desc.index, + # ) + # assert requires is None + + results, requires = self.knn_search_benchmark( + dry_run=True, + results=results, + index=index_desc.index, + ) + if knn and requires is not None: + if dry_run: + return results, requires else: - index = IndexFromCodec( - d=self.d, - metric=self.distance_metric, - database_vectors=self.database_vectors, - search_params=index_desc.search_params, - construction_params=index_desc.construction_params, - path=index_desc.path, - bucket=index_desc.bucket, + results, requires = self.knn_search_benchmark( + dry_run=False, + results=results, + index=index_desc.index, ) - index.set_io(self.io) - index_desc.index = index - results["indices"][index.get_codec_name()] = { - "code_size": index.get_code_size() - } - return results + assert requires is None + + if self.range_ref_index_desc is None or not index_desc.index.supports_range_search(): + return results, None + + ref_index_desc = self.get_index_desc(self.range_ref_index_desc) + if ref_index_desc is None: + raise ValueError( + f"Unknown range index {self.range_ref_index_desc}" + ) + if ref_index_desc.range_metrics is None: + raise ValueError( + f"Range index {ref_index_desc.factory} has no radius_score" + ) + for metric_key, range_metric in ref_index_desc.range_metrics.items(): + ( + gt_radius, + range_search_metric_function, + coefficients, + coefficients_training_data, + ) = self.range_search_reference( + ref_index_desc.index, ref_index_desc.search_params, range_metric + ) + gt_rsm = self.range_ground_truth( + gt_radius, range_search_metric_function + ) + results, requires = self.range_search_benchmark( + dry_run=True, + results=results, + index=index_desc.index, + metric_key=metric_key, + radius=index_desc.radius, + gt_radius=gt_radius, + range_search_metric_function=range_search_metric_function, + gt_rsm=gt_rsm, + ) + if range and requires is not None: + if dry_run: + return results, requires + else: + results, requires = self.range_search_benchmark( + dry_run=False, + results=results, + index=index_desc.index, + metric_key=metric_key, + radius=index_desc.radius, + gt_radius=gt_radius, + range_search_metric_function=range_search_metric_function, + gt_rsm=gt_rsm, + ) + assert requires is None - def benchmark(self, result_file=None): + return results, None + + def benchmark(self, result_file=None, local=False, train=False, reconstruct=False, knn=False, range=False): 
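+        # Scheduling note for the loop below: each index is first benchmarked
+        # with dry_run=True; instead of computing, a dry run returns the name
+        # of a missing resource ("requires"). An index whose missing resource
+        # has not been seen yet is dispatched as a full (dry_run=False) job
+        # via launch_jobs; indices blocked on a resource that is already
+        # queued are deferred to the next round. Rounds repeat until no dry
+        # run reports a missing resource.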
logger.info("begin evaluate") - faiss.omp_set_num_threads(24) + faiss.omp_set_num_threads(self.num_threads) results = {"indices": {}, "experiments": {}} - results = self.train(results) + xq = self.io.get_dataset(self.query_vectors) + self.d = xq.shape[1] + if self.get_index_desc("Flat") is None: + self.index_descs.append(IndexDescriptor(factory="Flat")) - # knn search self.knn_ground_truth() - for index_desc in self.index_descs: - results = self.knn_search_benchmark( - results=results, - index=index_desc.index, - ) - # range search if self.range_ref_index_desc is not None: index_desc = self.get_index_desc(self.range_ref_index_desc) if index_desc is None: @@ -498,19 +622,63 @@ def benchmark(self, result_file=None): gt_rsm = self.range_ground_truth( gt_radius, range_search_metric_function ) - for index_desc in self.index_descs: - if not index_desc.index.supports_range_search(): - continue - results = self.range_search_benchmark( - results=results, - index=index_desc.index, - metric_key=metric_key, - radius=index_desc.radius, - gt_radius=gt_radius, - range_search_metric_function=range_search_metric_function, - gt_rsm=gt_rsm, - ) + + self.index_descs = list(dict.fromkeys(self.index_descs)) + + todo = self.index_descs + for index_desc in self.index_descs: + index_desc.requires = None + + queued = set() + while todo: + current_todo = [] + next_todo = [] + for index_desc in todo: + results, requires = self.benchmark_one( + dry_run=True, + results=results, + index_desc=index_desc, + train=train, + reconstruct=reconstruct, + knn=knn, + range=range, + ) + if requires is None: + continue + if requires in queued: + if index_desc.requires != requires: + index_desc.requires = requires + next_todo.append(index_desc) + else: + queued.add(requires) + index_desc.requires = requires + current_todo.append(index_desc) + + if current_todo: + results_one = {"indices": {}, "experiments": {}} + params = [(self.clone_one(index_desc), results_one, index_desc, train, reconstruct, knn, range) for index_desc in current_todo] + for result in self.io.launch_jobs(run_benchmark_one, params, local=local): + dict_merge(results, result) + + todo = next_todo + if result_file is not None: self.io.write_json(results, result_file, overwrite=True) logger.info("end evaluate") return results + +def run_benchmark_one(params): + logger.info(params) + benchmark, results, index_desc, train, reconstruct, knn, range = params + results, requires = benchmark.benchmark_one( + dry_run=False, + results=results, + index_desc=index_desc, + train=train, + reconstruct=reconstruct, + knn=knn, + range=range, + ) + assert requires is None + assert results is not None + return results diff --git a/benchs/bench_fw/benchmark_io.py b/benchs/bench_fw/benchmark_io.py index 370efffce5..483acba8c7 100644 --- a/benchs/bench_fw/benchmark_io.py +++ b/benchs/bench_fw/benchmark_io.py @@ -10,6 +10,7 @@ import os import pickle from dataclasses import dataclass +import submitit from typing import Any, List, Optional from zipfile import ZipFile @@ -106,7 +107,7 @@ def write_file( fn = self.get_local_filename(filename) with ZipFile(fn, "w") as zip_file: for key, value in zip(keys, values, strict=True): - with zip_file.open(key, "w") as f: + with zip_file.open(key, "w", force_zip64=True) as f: if key in ["D", "I", "R", "lims"]: np.save(f, value) elif key in ["P"]: @@ -117,22 +118,22 @@ def write_file( self.upload_file_to_blobstore(filename, overwrite=overwrite) def get_dataset(self, dataset): - if dataset.namespace is not None and dataset.namespace[:4] == "std_": - if 
dataset.tablename not in self.cached_ds: - self.cached_ds[dataset.tablename] = dataset_from_name( - dataset.tablename, - ) - p = dataset.namespace[4] - if p == "t": - return self.cached_ds[dataset.tablename].get_train() - elif p == "d": - return self.cached_ds[dataset.tablename].get_database() - elif p == "q": - return self.cached_ds[dataset.tablename].get_queries() - else: - raise ValueError - elif dataset not in self.cached_ds: - if dataset.namespace == "syn": + if dataset not in self.cached_ds: + if dataset.namespace is not None and dataset.namespace[:4] == "std_": + if dataset.tablename not in self.cached_ds: + self.cached_ds[dataset.tablename] = dataset_from_name( + dataset.tablename, + ) + p = dataset.namespace[4] + if p == "t": + self.cached_ds[dataset] = self.cached_ds[dataset.tablename].get_train(dataset.num_vectors) + elif p == "d": + self.cached_ds[dataset] = self.cached_ds[dataset.tablename].get_database() + elif p == "q": + self.cached_ds[dataset] = self.cached_ds[dataset.tablename].get_queries() + else: + raise ValueError + elif dataset.namespace == "syn": d, seed = dataset.tablename.split("_") d = int(d) seed = int(seed) @@ -225,3 +226,31 @@ def write_index( logger.info(f"Saving index to {fn}") faiss.write_index(index, fn) self.upload_file_to_blobstore(filename) + assert os.path.exists(fn) + return os.path.getsize(fn) + + def launch_jobs(self, func, params, local=True): + if local: + results = [func(p) for p in params] + return results + print(f'launching {len(params)} jobs') + executor = submitit.AutoExecutor(folder='/checkpoint/gsz/jobs') + executor.update_parameters( + nodes=1, + gpus_per_node=8, + cpus_per_task=80, + # mem_gb=640, + tasks_per_node=1, + name="faiss_benchmark", + slurm_array_parallelism=512, + slurm_partition="scavenge", + slurm_time=4 * 60, + slurm_constraint="bldg1", + ) + jobs = executor.map_array(func, params) + print(f'launched {len(jobs)} jobs') + # for job, param in zip(jobs, params): + # print(f"{job.job_id=} {param=}") + results = [job.result() for job in jobs] + print(f'received {len(results)} results') + return results diff --git a/benchs/bench_fw/descriptors.py b/benchs/bench_fw/descriptors.py index 15e5b9330b..113f46b545 100644 --- a/benchs/bench_fw/descriptors.py +++ b/benchs/bench_fw/descriptors.py @@ -4,8 +4,13 @@ # LICENSE file in the root directory of this source tree. from dataclasses import dataclass +import logging from typing import Any, Dict, List, Optional +import faiss # @manual=//faiss/python:pyfaiss_gpu +from .utils import timer +logger = logging.getLogger(__name__) + @dataclass class IndexDescriptor: @@ -33,6 +38,10 @@ class IndexDescriptor: # [radius2_from, radius2_to) -> score2 range_metrics: Optional[Dict[str, Any]] = None radius: Optional[float] = None + training_size: Optional[int] = None + + def __hash__(self): + return hash(str(self)) @dataclass @@ -85,3 +94,21 @@ def get_filename( filename += f"_{self.num_vectors}" filename += "." 
return filename + + def k_means(self, io, k, dry_run): + logger.info(f"k_means {k} {self}") + kmeans_vectors = DatasetDescriptor( + tablename=f"{self.get_filename()}kmeans_{k}.npy" + ) + meta_filename = kmeans_vectors.tablename + ".json" + if not io.file_exist(kmeans_vectors.tablename) or not io.file_exist(meta_filename): + if dry_run: + return None, None, kmeans_vectors.tablename + x = io.get_dataset(self) + kmeans = faiss.Kmeans(d=x.shape[1], k=k, gpu=True) + _, t, _ = timer("k_means", lambda: kmeans.train(x)) + io.write_nparray(kmeans.centroids, kmeans_vectors.tablename) + io.write_json({"k_means_time": t}, meta_filename) + else: + t = io.read_json(meta_filename)["k_means_time"] + return kmeans_vectors, t, None diff --git a/benchs/bench_fw/index.py b/benchs/bench_fw/index.py index 3405f59561..4c536aa753 100644 --- a/benchs/bench_fw/index.py +++ b/benchs/bench_fw/index.py @@ -4,11 +4,11 @@ # LICENSE file in the root directory of this source tree. +from copy import copy import logging import os +from collections import OrderedDict from dataclasses import dataclass -from multiprocessing.pool import ThreadPool -from time import perf_counter from typing import ClassVar, Dict, List, Optional import faiss # @manual=//faiss/python:pyfaiss_gpu @@ -16,8 +16,8 @@ import numpy as np from faiss.contrib.evaluation import ( # @manual=//faiss/contrib:faiss_contrib_gpu OperatingPointsWithRanges, + knn_intersection_measure, ) - from faiss.contrib.factory_tools import ( # @manual=//faiss/contrib:faiss_contrib_gpu reverse_index_factory, ) @@ -27,67 +27,11 @@ ) from .descriptors import DatasetDescriptor +from .utils import distance_ratio_measure, get_cpu_info, timer, refine_distances_knn, refine_distances_range logger = logging.getLogger(__name__) -def timer(name, func, once=False) -> float: - logger.info(f"Measuring {name}") - t1 = perf_counter() - res = func() - t2 = perf_counter() - t = t2 - t1 - repeat = 1 - if not once and t < 1.0: - repeat = int(2.0 // t) - logger.info( - f"Time for {name}: {t:.3f} seconds, repeating {repeat} times" - ) - t1 = perf_counter() - for _ in range(repeat): - res = func() - t2 = perf_counter() - t = (t2 - t1) / repeat - logger.info(f"Time for {name}: {t:.3f} seconds") - return res, t, repeat - - -def refine_distances_knn( - D: np.ndarray, I: np.ndarray, xq: np.ndarray, xb: np.ndarray, metric -): - return np.where( - I >= 0, - np.square(np.linalg.norm(xq[:, None] - xb[I], axis=2)) - if metric == faiss.METRIC_L2 - else np.einsum("qd,qkd->qk", xq, xb[I]), - D, - ) - - -def refine_distances_range( - lims: np.ndarray, - D: np.ndarray, - I: np.ndarray, - xq: np.ndarray, - xb: np.ndarray, - metric, -): - with ThreadPool(32) as pool: - R = pool.map( - lambda i: ( - np.sum(np.square(xq[i] - xb[I[lims[i]:lims[i + 1]]]), axis=1) - if metric == faiss.METRIC_L2 - else np.tensordot( - xq[i], xb[I[lims[i]:lims[i + 1]]], axes=(0, 1) - ) - ) - if lims[i + 1] > lims[i] - else [], - range(len(lims) - 1), - ) - return np.hstack(R) - - # The classes below are wrappers around Faiss indices, with different # implementations for the case when we start with an already trained # index (IndexFromCodec) vs factory strings (IndexFromFactory). 
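To make the distinction concrete, here is a minimal sketch of how the two flavors are constructed, mirroring build_index_wrapper in benchmark.py; the BenchmarkIO path, dataset descriptors, factory string and codec path are illustrative placeholders, not values from this patch:

    from bench_fw.benchmark_io import BenchmarkIO
    from bench_fw.descriptors import DatasetDescriptor
    from bench_fw.index import IndexFromCodec, IndexFromFactory

    io = BenchmarkIO(path="/tmp/bench_fw")  # placeholder working directory
    training = DatasetDescriptor(namespace="std_t", tablename="bigann1M")
    database = DatasetDescriptor(namespace="std_d", tablename="bigann1M")

    # Built from a factory string; the codec is assembled and trained lazily
    # by fetch_codec() and cached on disk under its codec name.
    from_factory = IndexFromFactory(
        num_threads=32,
        d=128,
        metric="L2",
        database_vectors=database,
        search_params=None,
        construction_params=None,
        factory="IVF1024,Flat",
        training_vectors=training,
    )
    from_factory.set_io(io)

    # Wraps an already-trained codec that is read from a file instead.
    from_codec = IndexFromCodec(
        num_threads=32,
        d=128,
        metric="L2",
        database_vectors=database,
        search_params=None,
        construction_params=None,
        path="/tmp/bench_fw/pretrained.codec",  # placeholder
        bucket=None,
    )
    from_codec.set_io(io)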
@@ -107,6 +51,7 @@ def param_dict_list_to_name(param_dict_list): n = "" for param_dict in param_dict_list: n += IndexBase.param_dict_to_name(param_dict, f"cp{l}") + l += 1 return n @staticmethod @@ -115,65 +60,120 @@ def param_dict_to_name(param_dict, prefix="sp"): return "" n = prefix for name, val in param_dict.items(): - if name != "noop": - n += f"_{name}_{val}" + if name == "snap": + continue + if name == "lsq_gpu" and val == 0: + continue + if name == "use_beam_LUT" and val == 0: + continue + n += f"_{name}_{val}" if n == prefix: return "" n += "." return n @staticmethod - def set_index_param_dict_list(index, param_dict_list): + def set_index_param_dict_list(index, param_dict_list, assert_same=False): if not param_dict_list: return index = faiss.downcast_index(index) for param_dict in param_dict_list: assert index is not None - IndexBase.set_index_param_dict(index, param_dict) + IndexBase.set_index_param_dict(index, param_dict, assert_same) index = faiss.try_extract_index_ivf(index) + if index is not None: + index = index.quantizer @staticmethod - def set_index_param_dict(index, param_dict): + def set_index_param_dict(index, param_dict, assert_same=False): if not param_dict: return for name, val in param_dict.items(): - IndexBase.set_index_param(index, name, val) + IndexBase.set_index_param(index, name, val, assert_same) @staticmethod - def set_index_param(index, name, val): + def set_index_param(index, name, val, assert_same=False): index = faiss.downcast_index(index) - + val = int(val) if isinstance(index, faiss.IndexPreTransform): Index.set_index_param(index.index, name, val) - elif name == "efSearch": - index.hnsw.efSearch - index.hnsw.efSearch = int(val) - elif name == "efConstruction": - index.hnsw.efConstruction - index.hnsw.efConstruction = int(val) - elif name == "nprobe": - index_ivf = faiss.extract_index_ivf(index) - index_ivf.nprobe - index_ivf.nprobe = int(val) - elif name == "k_factor": - index.k_factor - index.k_factor = int(val) - elif name == "parallel_mode": - index_ivf = faiss.extract_index_ivf(index) - index_ivf.parallel_mode - index_ivf.parallel_mode = int(val) - elif name == "noop": - pass + return + elif name == "snap": + return + elif name == "lsq_gpu": + if val == 1: + ngpus = faiss.get_num_gpus() + icm_encoder_factory = faiss.GpuIcmEncoderFactory(ngpus) + if isinstance(index, faiss.IndexProductLocalSearchQuantizer): + for i in range(index.plsq.nsplits): + lsq = faiss.downcast_Quantizer(index.plsq.subquantizer(i)) + if lsq.icm_encoder_factory is None: + lsq.icm_encoder_factory = icm_encoder_factory + else: + if index.lsq.icm_encoder_factory is None: + index.lsq.icm_encoder_factory = icm_encoder_factory + return + elif name in ["efSearch", "efConstruction"]: + obj = index.hnsw + elif name in ["nprobe", "parallel_mode"]: + obj = faiss.extract_index_ivf(index) + elif name in ["use_beam_LUT", "max_beam_size"]: + if isinstance(index, faiss.IndexProductResidualQuantizer): + obj = [faiss.downcast_Quantizer(index.prq.subquantizer(i)) for i in range(index.prq.nsplits)] + else: + obj = index.rq + elif name == "encode_ils_iters": + if isinstance(index, faiss.IndexProductLocalSearchQuantizer): + obj = [faiss.downcast_Quantizer(index.plsq.subquantizer(i)) for i in range(index.plsq.nsplits)] + else: + obj = index.lsq else: - raise RuntimeError(f"could not set param {name} on {index}") + obj = index + + if not isinstance(obj, list): + obj = [obj] + for o in obj: + test = getattr(o, name) + if assert_same and not name == 'use_beam_LUT': + assert test == val + else: + 
setattr(o, name, val) + + @staticmethod + def filter_index_param_dict_list(param_dict_list): + if param_dict_list is not None and param_dict_list[0] is not None and "k_factor" in param_dict_list[0]: + filtered = copy(param_dict_list) + del filtered[0]["k_factor"] + return filtered + else: + return param_dict_list def is_flat(self): - codec = faiss.downcast_index(self.get_model()) - return isinstance(codec, faiss.IndexFlat) + model = faiss.downcast_index(self.get_model()) + return isinstance(model, faiss.IndexFlat) def is_ivf(self): - codec = self.get_model() - return faiss.try_extract_index_ivf(codec) is not None + model = self.get_model() + return faiss.try_extract_index_ivf(model) is not None + + def is_2layer(self): + def is_2layer_(index): + index = faiss.downcast_index(index) + if isinstance(index, faiss.IndexPreTransform): + return is_2layer_(index.index) + return isinstance(index, faiss.Index2Layer) + + model = self.get_model() + return is_2layer_(model) + + def is_decode_supported(self): + model = self.get_model() + if isinstance(model, faiss.IndexPreTransform): + for i in range(model.chain.size()): + vt = faiss.downcast_VectorTransform(model.chain.at(i)) + if isinstance(vt, faiss.ITQTransform): + return False + return True def is_pretransform(self): codec = self.get_model() @@ -208,12 +208,15 @@ def get_model_name(self): def get_model(self): raise NotImplementedError + def get_construction_params(self): + raise NotImplementedError + def transform(self, vectors): transformed_vectors = DatasetDescriptor( tablename=f"{vectors.get_filename()}{self.get_codec_name()}transform.npy" ) if not self.io.file_exist(transformed_vectors.tablename): - codec = self.fetch_codec() + codec = self.get_codec() assert isinstance(codec, faiss.IndexPreTransform) transform = faiss.downcast_VectorTransform(codec.chain.at(0)) x = self.io.get_dataset(vectors) @@ -221,7 +224,18 @@ def transform(self, vectors): self.io.write_nparray(xt, transformed_vectors.tablename) return transformed_vectors - def knn_search_quantizer(self, index, query_vectors, k): + def snap(self, vectors): + transformed_vectors = DatasetDescriptor( + tablename=f"{vectors.get_filename()}{self.get_codec_name()}snap.npy" + ) + if not self.io.file_exist(transformed_vectors.tablename): + codec = self.get_codec() + x = self.io.get_dataset(vectors) + xt = codec.sa_decode(codec.sa_encode(x)) + self.io.write_nparray(xt, transformed_vectors.tablename) + return transformed_vectors + + def knn_search_quantizer(self, query_vectors, k): if self.is_pretransform(): pretransform = self.get_pretransform() quantizer_query_vectors = pretransform.transform(query_vectors) @@ -229,7 +243,9 @@ def knn_search_quantizer(self, index, query_vectors, k): pretransform = None quantizer_query_vectors = query_vectors - QD, QI, _, QP = self.get_quantizer(pretransform).knn_search( + quantizer, _, _ = self.get_quantizer(dry_run=False, pretransform=pretransform) + QD, QI, _, QP, _ = quantizer.knn_search( + dry_run=False, search_parameters=None, query_vectors=quantizer_query_vectors, k=k, @@ -242,20 +258,31 @@ def get_knn_search_name( search_parameters: Optional[Dict[str, int]], query_vectors: DatasetDescriptor, k: int, + reconstruct: bool = False, ): name = self.get_index_name() name += Index.param_dict_to_name(search_parameters) name += query_vectors.get_filename("q") name += f"k_{k}." + name += f"t_{self.num_threads}." + if reconstruct: + name += "rec." + else: + name += "knn." 
return name def knn_search( self, + dry_run, search_parameters: Optional[Dict[str, int]], query_vectors: DatasetDescriptor, k: int, + I_gt=None, + D_gt=None, ): - logger.info("knn_seach: begin") + logger.info("knn_search: begin") + if search_parameters is not None and search_parameters["snap"] == 1: + query_vectors = self.snap(query_vectors) filename = ( self.get_knn_search_name(search_parameters, query_vectors, k) + "zip" @@ -264,15 +291,28 @@ def knn_search( logger.info(f"Using cached results for {filename}") D, I, R, P = self.io.read_file(filename, ["D", "I", "R", "P"]) else: - xq = self.io.get_dataset(query_vectors) + if dry_run: + return None, None, None, None, filename index = self.get_index() Index.set_index_param_dict(index, search_parameters) - if self.is_ivf(): + if self.is_2layer(): + # Index2Layer doesn't support search + xq = self.io.get_dataset(query_vectors) + xb = index.reconstruct_n(0, index.ntotal) + (D, I), t, _ = timer("knn_search 2layer", lambda: faiss.knn(xq, xb, k)) + elif self.is_ivf() and not isinstance(index, faiss.IndexRefine): + index_ivf = faiss.extract_index_ivf(index) + nprobe = ( + search_parameters["nprobe"] + if search_parameters is not None + and "nprobe" in search_parameters + else index_ivf.nprobe + ) xqt, QD, QI, QP = self.knn_search_quantizer( - index, query_vectors, search_parameters["nprobe"] + query_vectors=query_vectors, + k=nprobe, ) - index_ivf = faiss.extract_index_ivf(index) if index_ivf.parallel_mode != 2: logger.info("Setting IVF parallel mode") index_ivf.parallel_mode = 2 @@ -281,22 +321,23 @@ def knn_search( "knn_search_preassigned", lambda: index_ivf.search_preassigned(xqt, k, QI, QD), ) + # Dref, Iref = index.search(xq, k) + # np.testing.assert_array_equal(I, Iref) + # np.testing.assert_allclose(D, Dref) else: + xq = self.io.get_dataset(query_vectors) (D, I), t, _ = timer("knn_search", lambda: index.search(xq, k)) if self.is_flat() or not hasattr(self, "database_vectors"): # TODO R = D else: + xq = self.io.get_dataset(query_vectors) xb = self.io.get_dataset(self.database_vectors) - R = refine_distances_knn(D, I, xq, xb, self.metric_type) + R = refine_distances_knn(xq, xb, I, self.metric_type) P = { "time": t, - "index": self.get_index_name(), - "codec": self.get_codec_name(), - "factory": self.factory if hasattr(self, "factory") else "", - "search_params": search_parameters, "k": k, } - if self.is_ivf(): + if self.is_ivf() and not isinstance(index, faiss.IndexRefine): stats = faiss.cvar.indexIVF_stats P |= { "quantizer": QP, @@ -310,16 +351,113 @@ def knn_search( "search_time": int(stats.search_time // repeat), } self.io.write_file(filename, ["D", "I", "R", "P"], [D, I, R, P]) - logger.info("knn_seach: end") - return D, I, R, P + P |= { + "index": self.get_index_name(), + "codec": self.get_codec_name(), + "factory": self.get_model_name(), + "construction_params": self.get_construction_params(), + "search_params": search_parameters, + "knn_intersection": knn_intersection_measure( + I, I_gt, + ) if I_gt is not None else None, + "distance_ratio": distance_ratio_measure( + I, R, D_gt, self.metric_type, + ) if D_gt is not None else None, + } + logger.info("knn_search: end") + return D, I, R, P, None + + def reconstruct( + self, + dry_run, + parameters: Optional[Dict[str, int]], + query_vectors: DatasetDescriptor, + k: int, + I_gt, + ): + logger.info("reconstruct: begin") + filename = ( + self.get_knn_search_name(parameters, query_vectors, k, reconstruct=True) + + "zip" + ) + if self.io.file_exist(filename): + logger.info(f"Using cached 
results for {filename}") + P, = self.io.read_file(filename, ["P"]) + P["index"] = self.get_index_name() + P["codec"] = self.get_codec_name() + P["factory"] = self.get_model_name() + P["reconstruct_params"] = parameters + P["construction_params"] = self.get_construction_params() + else: + if dry_run: + return None, filename + codec = self.get_codec() + codec_meta = self.fetch_meta() + Index.set_index_param_dict(codec, parameters) + xb = self.io.get_dataset(self.database_vectors) + xb_encoded, encode_t, _ = timer("sa_encode", lambda: codec.sa_encode(xb)) + xq = self.io.get_dataset(query_vectors) + if self.is_decode_supported(): + xb_decoded, decode_t, _ = timer("sa_decode", lambda: codec.sa_decode(xb_encoded)) + mse = np.square(xb_decoded - xb).sum(axis=1).mean().item() + _, I = faiss.knn(xq, xb_decoded, k, metric=self.metric_type) + asym_recall = knn_intersection_measure(I, I_gt) + xq_decoded = codec.sa_decode(codec.sa_encode(xq)) + _, I = faiss.knn(xq_decoded, xb_decoded, k, metric=self.metric_type) + else: + mse = None + asym_recall = None + decode_t = None + # assume hamming for sym + xq_encoded = codec.sa_encode(xq) + bin = faiss.IndexBinaryFlat(xq_encoded.shape[1] * 8) + bin.add(xb_encoded) + _, I = bin.search(xq_encoded, k) + sym_recall = knn_intersection_measure(I, I_gt) + P = { + "encode_time": encode_t, + "decode_time": decode_t, + "mse": mse, + "sym_recall": sym_recall, + "asym_recall": asym_recall, + "cpu": get_cpu_info(), + "num_threads": self.num_threads, + "index": self.get_index_name(), + "codec": self.get_codec_name(), + "factory": self.get_model_name(), + "reconstruct_params": parameters, + "construction_params": self.get_construction_params(), + "codec_meta": codec_meta, + } + self.io.write_file(filename, ["P"], [P]) + logger.info("reconstruct: end") + return P, None + + def get_range_search_name( + self, + search_parameters: Optional[Dict[str, int]], + query_vectors: DatasetDescriptor, + radius: Optional[float] = None, + ): + name = self.get_index_name() + name += Index.param_dict_to_name(search_parameters) + name += query_vectors.get_filename("q") + if radius is not None: + name += f"r_{int(radius * 1000)}." + else: + name += "r_auto." 
+ return name def range_search( self, + dry_run, search_parameters: Optional[Dict[str, int]], query_vectors: DatasetDescriptor, radius: Optional[float] = None, ): logger.info("range_search: begin") + if search_parameters is not None and search_parameters["snap"] == 1: + query_vectors = self.snap(query_vectors) filename = ( self.get_range_search_name( search_parameters, query_vectors, radius @@ -332,13 +470,15 @@ def range_search( filename, ["lims", "D", "I", "R", "P"] ) else: + if dry_run: + return None, None, None, None, None, filename xq = self.io.get_dataset(query_vectors) index = self.get_index() Index.set_index_param_dict(index, search_parameters) if self.is_ivf(): xqt, QD, QI, QP = self.knn_search_quantizer( - index, query_vectors, search_parameters["nprobe"] + query_vectors, search_parameters["nprobe"] ) index_ivf = faiss.extract_index_ivf(index) if index_ivf.parallel_mode != 2: @@ -364,9 +504,6 @@ def range_search( ) P = { "time": t, - "index": self.get_codec_name(), - "codec": self.get_codec_name(), - "search_params": search_parameters, "radius": radius, "count": len(I), } @@ -386,8 +523,15 @@ def range_search( self.io.write_file( filename, ["lims", "D", "I", "R", "P"], [lims, D, I, R, P] ) + P |= { + "index": self.get_index_name(), + "codec": self.get_codec_name(), + "factory": self.get_model_name(), + "construction_params": self.get_construction_params(), + "search_params": search_parameters, + } logger.info("range_seach: end") - return lims, D, I, R, P + return lims, D, I, R, P, None # Common base for IndexFromCodec and IndexFromFactory, @@ -396,16 +540,15 @@ def range_search( # they share the configuration of their parent IndexFromCodec @dataclass class Index(IndexBase): + num_threads: int d: int metric: str database_vectors: DatasetDescriptor construction_params: List[Dict[str, int]] search_params: Dict[str, int] - cached_codec_name: ClassVar[str] = None - cached_codec: ClassVar[faiss.Index] = None - cached_index_name: ClassVar[str] = None - cached_index: ClassVar[faiss.Index] = None + cached_codec: ClassVar[OrderedDict[str, faiss.Index]] = OrderedDict() + cached_index: ClassVar[OrderedDict[str, faiss.Index]] = OrderedDict() def __post_init__(self): if isinstance(self.metric, str): @@ -438,16 +581,13 @@ def supports_range_search(self): def fetch_codec(self): raise NotImplementedError - def train(self): - # get triggers a train, if necessary - self.get_codec() - def get_codec(self): codec_name = self.get_codec_name() - if Index.cached_codec_name != codec_name: - Index.cached_codec = self.fetch_codec() - Index.cached_codec_name = codec_name - return Index.cached_codec + if codec_name not in Index.cached_codec: + Index.cached_codec[codec_name], _, _ = self.fetch_codec() + if len(Index.cached_codec) > 1: + Index.cached_codec.popitem(last=False) + return Index.cached_codec[codec_name] def get_index_name(self): name = self.get_codec_name() @@ -456,14 +596,16 @@ def get_index_name(self): return name def fetch_index(self): - index = faiss.clone_index(self.get_codec()) + index = self.get_codec() + index.reset() assert index.ntotal == 0 logger.info("Adding vectors to index") xb = self.io.get_dataset(self.database_vectors) - if self.is_ivf(): + if self.is_ivf() and not isinstance(index, faiss.IndexRefine): xbt, QD, QI, QP = self.knn_search_quantizer( - index, self.database_vectors, 1 + query_vectors=self.database_vectors, + k=1, ) index_ivf = faiss.extract_index_ivf(index) if index_ivf.parallel_mode != 2: @@ -483,31 +625,43 @@ def fetch_index(self): ) assert index.ntotal == xb.shape[0] 
or index_ivf.ntotal == xb.shape[0] logger.info("Added vectors to index") - return index + return index, t def get_index(self): index_name = self.get_index_name() - if Index.cached_index_name != index_name: - Index.cached_index = self.fetch_index() - Index.cached_index_name = index_name - return Index.cached_index - - def get_code_size(self): - def get_index_code_size(index): - index = faiss.downcast_index(index) - if isinstance(index, faiss.IndexPreTransform): - return get_index_code_size(index.index) - elif isinstance(index, faiss.IndexHNSWFlat): - return index.d * 4 # TODO - elif type(index) in [faiss.IndexRefine, faiss.IndexRefineFlat]: - return get_index_code_size( - index.base_index - ) + get_index_code_size(index.refine_index) - else: - return index.code_size - - codec = self.get_codec() - return get_index_code_size(codec) + if index_name not in Index.cached_index: + Index.cached_index[index_name], _ = self.fetch_index() + if len(Index.cached_index) > 3: + Index.cached_index.popitem(last=False) + return Index.cached_index[index_name] + + def get_construction_params(self): + return self.construction_params + + # def get_code_size(self): + # def get_index_code_size(index): + # index = faiss.downcast_index(index) + # if isinstance(index, faiss.IndexPreTransform): + # return get_index_code_size(index.index) + # elif isinstance(index, faiss.IndexHNSWFlat): + # return index.d * 4 # TODO + # elif type(index) in [faiss.IndexRefine, faiss.IndexRefineFlat]: + # return get_index_code_size( + # index.base_index + # ) + get_index_code_size(index.refine_index) + # else: + # return index.code_size + + # codec = self.get_codec() + # return get_index_code_size(codec) + + def get_sa_code_size(self, codec=None): + if codec is None: + codec = self.get_codec() + try: + return codec.sa_code_size() + except: + return None def get_operating_points(self): op = OperatingPointsWithRanges() @@ -520,45 +674,70 @@ def add_range_or_val(name, range): else range, ) - op.add_range("noop", [0]) - codec = faiss.downcast_index(self.get_codec()) - codec_ivf = faiss.try_extract_index_ivf(codec) - if codec_ivf is not None: + add_range_or_val("snap", [0]) + model = self.get_model() + model_ivf = faiss.try_extract_index_ivf(model) + if model_ivf is not None: add_range_or_val( "nprobe", - [ + # [ + # 2**i + # for i in range(12) + # if 2**i <= model_ivf.nlist * 0.5 + # ], + [1, 2, 4, 6, 8, 10, 12, 14, 16, 20, 24, 28] + [ + i + for i in range(32, 64, 8) + if i <= model_ivf.nlist * 0.1 + ] + [ + i + for i in range(64, 128, 16) + if i <= model_ivf.nlist * 0.1 + ] + [ + i + for i in range(128, 256, 32) + if i <= model_ivf.nlist * 0.1 + ] + [ + i + for i in range(256, 512, 64) + if i <= model_ivf.nlist * 0.1 + ] + [ 2**i - for i in range(12) - if 2**i <= codec_ivf.nlist * 0.25 + for i in range(9, 12) + if 2**i <= model_ivf.nlist * 0.1 ], ) - if isinstance(codec, faiss.IndexRefine): + model = faiss.downcast_index(model) + if isinstance(model, faiss.IndexRefine): add_range_or_val( "k_factor", - [2**i for i in range(11)], + [2**i for i in range(13)], ) - if isinstance(codec, faiss.IndexHNSWFlat): + elif isinstance(model, faiss.IndexHNSWFlat): add_range_or_val( "efSearch", [2**i for i in range(3, 11)], ) + elif isinstance(model, faiss.IndexResidualQuantizer) or isinstance(model, faiss.IndexProductResidualQuantizer): + add_range_or_val( + "max_beam_size", + [1, 2, 4, 8, 16, 32], + ) + add_range_or_val( + "use_beam_LUT", + [1], + ) + elif isinstance(model, faiss.IndexLocalSearchQuantizer) or isinstance(model, 
faiss.IndexProductLocalSearchQuantizer): + add_range_or_val( + "encode_ils_iters", + [2, 4, 8, 16], + ) + add_range_or_val( + "lsq_gpu", + [1], + ) return op - def get_range_search_name( - self, - search_parameters: Optional[Dict[str, int]], - query_vectors: DatasetDescriptor, - radius: Optional[float] = None, - ): - name = self.get_index_name() - name += Index.param_dict_to_name(search_parameters) - name += query_vectors.get_filename("q") - if radius is not None: - name += f"r_{int(radius * 1000)}." - else: - name += "r_auto." - return name - # IndexFromCodec, IndexFromQuantizer and IndexFromPreTransform # are used to wrap pre-trained Faiss indices (codecs) @@ -581,6 +760,9 @@ def get_pretransform(self): quantizer.set_io(self.io) return quantizer + def get_model_name(self): + return os.path.basename(self.path) + def get_codec_name(self): assert self.path is not None name = os.path.basename(self.path) @@ -596,7 +778,7 @@ def fetch_codec(self): assert self.d == codec.d assert self.metric_type == codec.metric_type Index.set_index_param_dict_list(codec, self.construction_params) - return codec + return codec, None, None def get_model(self): return self.get_codec() @@ -609,6 +791,9 @@ def __init__(self, ivf_index: Index): self.ivf_index = ivf_index super().__init__() + def get_model_name(self): + return self.get_index_name() + def get_codec_name(self): return self.get_index_name() @@ -657,17 +842,49 @@ def get_codec_name(self): name += Index.param_dict_list_to_name(self.construction_params) return name - def fetch_codec(self): + def fetch_meta(self, dry_run=False): + meta_filename = self.get_codec_name() + "json" + if self.io.file_exist(meta_filename): + meta = self.io.read_json(meta_filename) + report = None + else: + _, meta, report = self.fetch_codec(dry_run=dry_run) + return meta, report + + def fetch_codec(self, dry_run=False): codec_filename = self.get_codec_name() + "codec" - if self.io.file_exist(codec_filename): + meta_filename = self.get_codec_name() + "json" + if self.io.file_exist(codec_filename) and self.io.file_exist(meta_filename): codec = self.io.read_index(codec_filename) assert self.d == codec.d assert self.metric_type == codec.metric_type + meta = self.io.read_json(meta_filename) else: - codec = self.assemble() - if self.factory != "Flat": - self.io.write_index(codec, codec_filename) - return codec + codec, training_time, requires = self.assemble(dry_run=dry_run) + if requires is not None: + assert dry_run + if requires == "": + return None, None, codec_filename + else: + return None, None, requires + codec_size = self.io.write_index(codec, codec_filename) + assert codec_size is not None + meta = { + "training_time": training_time, + "training_size": self.training_vectors.num_vectors, + "codec_size": codec_size, + "sa_code_size": self.get_sa_code_size(codec), + "cpu": get_cpu_info(), + } + self.io.write_json(meta, meta_filename, overwrite=True) + + Index.set_index_param_dict_list( + codec, self.construction_params, assert_same=True + ) + return codec, meta, None + + def get_model_name(self): + return self.factory def get_model(self): model = faiss.index_factory(self.d, self.factory, self.metric_type) @@ -675,27 +892,27 @@ def get_model(self): return model def get_pretransform(self): - model = faiss.index_factory(self.d, self.factory, self.metric_type) + model = self.get_model() assert isinstance(model, faiss.IndexPreTransform) sub_index = faiss.downcast_index(model.index) if isinstance(sub_index, faiss.IndexFlat): return self # replace the sub-index with Flat - codec = 
faiss.clone_index(model) - codec.index = faiss.IndexFlat(codec.index.d, codec.index.metric_type) + model.index = faiss.IndexFlat(model.index.d, model.index.metric_type) pretransform = IndexFromFactory( - d=codec.d, - metric=codec.metric_type, + num_threads=self.num_threads, + d=model.d, + metric=model.metric_type, database_vectors=self.database_vectors, construction_params=self.construction_params, - search_params=self.search_params, - factory=reverse_index_factory(codec), + search_params=None, + factory=reverse_index_factory(model), training_vectors=self.training_vectors, ) pretransform.set_io(self.io) return pretransform - def get_quantizer(self, pretransform=None): + def get_quantizer(self, dry_run, pretransform=None): model = self.get_model() model_ivf = faiss.extract_index_ivf(model) assert isinstance(model_ivf, faiss.IndexIVF) @@ -704,82 +921,114 @@ def get_quantizer(self, pretransform=None): training_vectors = self.training_vectors else: training_vectors = pretransform.transform(self.training_vectors) - centroids = self.k_means(training_vectors, model_ivf.nlist) + centroids, t, requires = training_vectors.k_means(self.io, model_ivf.nlist, dry_run) + if requires is not None: + return None, None, requires quantizer = IndexFromFactory( + num_threads=self.num_threads, d=model_ivf.quantizer.d, metric=model_ivf.quantizer.metric_type, database_vectors=centroids, - construction_params=None, # self.construction_params[1:], - search_params=None, # self.construction_params[0], # TODO: verify + construction_params=self.construction_params[1:] + if self.construction_params is not None + else None, + search_params=None, factory=reverse_index_factory(model_ivf.quantizer), training_vectors=centroids, ) quantizer.set_io(self.io) - return quantizer + return quantizer, t, None - def k_means(self, vectors, k): - kmeans_vectors = DatasetDescriptor( - tablename=f"{vectors.get_filename()}kmeans_{k}.npy" - ) - if not self.io.file_exist(kmeans_vectors.tablename): - x = self.io.get_dataset(vectors) - kmeans = faiss.Kmeans(d=x.shape[1], k=k, gpu=True) - kmeans.train(x) - self.io.write_nparray(kmeans.centroids, kmeans_vectors.tablename) - return kmeans_vectors - - def assemble(self): + def assemble(self, dry_run): + logger.info(f"assemble {self.factory}") model = self.get_model() - codec = faiss.clone_index(model) - if isinstance(model, faiss.IndexPreTransform): - sub_index = faiss.downcast_index(model.index) - if not isinstance(sub_index, faiss.IndexFlat): - # replace the sub-index with Flat and fetch pre-trained - pretransform = self.get_pretransform() - codec = pretransform.fetch_codec() - assert codec.is_trained - transformed_training_vectors = pretransform.transform( - self.training_vectors - ) - transformed_database_vectors = pretransform.transform( - self.database_vectors - ) - # replace the Flat index with the required sub-index + opaque = True + t_aggregate = 0 + try: + reverse_index_factory(model) + opaque = False + except NotImplementedError: + opaque = True + if opaque: + codec = model + else: + if isinstance(model, faiss.IndexPreTransform): + logger.info(f"assemble: pretransform {self.factory}") + sub_index = faiss.downcast_index(model.index) + if not isinstance(sub_index, faiss.IndexFlat): + # replace the sub-index with Flat and fetch pre-trained + pretransform = self.get_pretransform() + codec, meta, report = pretransform.fetch_codec(dry_run=dry_run) + if report is not None: + return None, None, report + t_aggregate += meta["training_time"] + assert codec.is_trained + 
transformed_training_vectors = pretransform.transform( + self.training_vectors + ) + # replace the Flat index with the required sub-index + wrapper = IndexFromFactory( + num_threads=self.num_threads, + d=sub_index.d, + metric=sub_index.metric_type, + database_vectors=None, + construction_params=self.construction_params, + search_params=None, + factory=reverse_index_factory(sub_index), + training_vectors=transformed_training_vectors, + ) + wrapper.set_io(self.io) + codec.index, meta, report = wrapper.fetch_codec(dry_run=dry_run) + if report is not None: + return None, None, report + t_aggregate += meta["training_time"] + assert codec.index.is_trained + else: + codec = model + elif isinstance(model, faiss.IndexIVF): + logger.info(f"assemble: ivf {self.factory}") + # replace the quantizer + quantizer, t, requires = self.get_quantizer(dry_run=dry_run) + if requires is not None: + return None, None, requires + t_aggregate += t + codec = faiss.clone_index(model) + quantizer_index, t = quantizer.fetch_index() + t_aggregate += t + replace_ivf_quantizer(codec, quantizer_index) + assert codec.quantizer.is_trained + assert codec.nlist == codec.quantizer.ntotal + elif isinstance(model, faiss.IndexRefine) or isinstance( + model, faiss.IndexRefineFlat + ): + logger.info(f"assemble: refine {self.factory}") + # replace base_index wrapper = IndexFromFactory( - d=sub_index.d, - metric=sub_index.metric_type, - database_vectors=transformed_database_vectors, - construction_params=self.construction_params, - search_params=self.search_params, - factory=reverse_index_factory(sub_index), - training_vectors=transformed_training_vectors, + num_threads=self.num_threads, + d=model.base_index.d, + metric=model.base_index.metric_type, + database_vectors=self.database_vectors, + construction_params=IndexBase.filter_index_param_dict_list(self.construction_params), + search_params=None, + factory=reverse_index_factory(model.base_index), + training_vectors=self.training_vectors, ) wrapper.set_io(self.io) - codec.index = wrapper.fetch_codec() - assert codec.index.is_trained - elif isinstance(model, faiss.IndexIVF): - # replace the quantizer - quantizer = self.get_quantizer() - replace_ivf_quantizer(codec, quantizer.fetch_index()) - assert codec.quantizer.is_trained - assert codec.nlist == codec.quantizer.ntotal - elif isinstance(model, faiss.IndexRefine) or isinstance( - model, faiss.IndexRefineFlat - ): - # replace base_index - wrapper = IndexFromFactory( - d=model.base_index.d, - metric=model.base_index.metric_type, - database_vectors=self.database_vectors, - construction_params=self.construction_params, - search_params=self.search_params, - factory=reverse_index_factory(model.base_index), - training_vectors=self.training_vectors, - ) - wrapper.set_io(self.io) - codec.base_index = wrapper.fetch_codec() - assert codec.base_index.is_trained + codec = faiss.clone_index(model) + codec.base_index, meta, requires = wrapper.fetch_codec(dry_run=dry_run) + if requires is not None: + return None, None, requires + t_aggregate += meta["training_time"] + assert codec.base_index.is_trained + else: + codec = model - xt = self.io.get_dataset(self.training_vectors) - codec.train(xt) - return codec + if self.factory != "Flat": + if dry_run: + return None, None, "" + logger.info(f"assemble, train {self.factory}") + xt = self.io.get_dataset(self.training_vectors) + _, t, _ = timer("train", lambda: codec.train(xt), once=True) + t_aggregate += t + + return codec, t_aggregate, None diff --git a/benchs/bench_fw/utils.py b/benchs/bench_fw/utils.py 
new file mode 100644 index 0000000000..e1e513169b --- /dev/null +++ b/benchs/bench_fw/utils.py @@ -0,0 +1,107 @@ +# Copyright (c) Meta Platforms, Inc. and affiliates. +# +# This source code is licensed under the MIT license found in the +# LICENSE file in the root directory of this source tree. + +from time import perf_counter +import logging +from multiprocessing.pool import ThreadPool +import numpy as np +import faiss # @manual=//faiss/python:pyfaiss_gpu +import functools + +logger = logging.getLogger(__name__) + +def timer(name, func, once=False) -> float: + logger.info(f"Measuring {name}") + t1 = perf_counter() + res = func() + t2 = perf_counter() + t = t2 - t1 + repeat = 1 + if not once and t < 1.0: + repeat = int(2.0 // t) + logger.info( + f"Time for {name}: {t:.3f} seconds, repeating {repeat} times" + ) + t1 = perf_counter() + for _ in range(repeat): + res = func() + t2 = perf_counter() + t = (t2 - t1) / repeat + logger.info(f"Time for {name}: {t:.3f} seconds") + return res, t, repeat + + +def refine_distances_knn( + xq: np.ndarray, xb: np.ndarray, I: np.ndarray, metric, +): + """ Recompute distances between xq[i] and xb[I[i, :]] """ + nq, k = I.shape + xq = np.ascontiguousarray(xq, dtype='float32') + nq2, d = xq.shape + xb = np.ascontiguousarray(xb, dtype='float32') + nb, d2 = xb.shape + I = np.ascontiguousarray(I, dtype='int64') + assert nq2 == nq + assert d2 == d + D = np.empty(I.shape, dtype='float32') + D[:] = np.inf + if metric == faiss.METRIC_L2: + faiss.fvec_L2sqr_by_idx( + faiss.swig_ptr(D), faiss.swig_ptr(xq), faiss.swig_ptr(xb), + faiss.swig_ptr(I), d, nq, k + ) + else: + faiss.fvec_inner_products_by_idx( + faiss.swig_ptr(D), faiss.swig_ptr(xq), faiss.swig_ptr(xb), + faiss.swig_ptr(I), d, nq, k + ) + return D + + +def refine_distances_range( + lims: np.ndarray, + D: np.ndarray, + I: np.ndarray, + xq: np.ndarray, + xb: np.ndarray, + metric, +): + with ThreadPool(32) as pool: + R = pool.map( + lambda i: ( + np.sum(np.square(xq[i] - xb[I[lims[i] : lims[i + 1]]]), axis=1) + if metric == faiss.METRIC_L2 + else np.tensordot( + xq[i], xb[I[lims[i] : lims[i + 1]]], axes=(0, 1) + ) + ) + if lims[i + 1] > lims[i] + else [], + range(len(lims) - 1), + ) + return np.hstack(R) + + +def distance_ratio_measure(I, R, D_GT, metric): + sum_of_R = np.sum(np.where(I >= 0, R, 0)) + sum_of_D_GT = np.sum(np.where(I >= 0, D_GT, 0)) + if metric == faiss.METRIC_INNER_PRODUCT: + return (sum_of_R / sum_of_D_GT).item() + elif metric == faiss.METRIC_L2: + return (sum_of_D_GT / sum_of_R).item() + else: + raise RuntimeError(f"unknown metric {metric}") + + +@functools.cache +def get_cpu_info(): + return [l for l in open("/proc/cpuinfo", "r") if "model name" in l][0][13:].strip() + +def dict_merge(target, source): + for k, v in source.items(): + if isinstance(v, dict) and k in target: + dict_merge(target[k], v) + else: + target[k] = v diff --git a/benchs/bench_fw_codecs.py b/benchs/bench_fw_codecs.py new file mode 100644 index 0000000000..80741e23f7 --- /dev/null +++ b/benchs/bench_fw_codecs.py @@ -0,0 +1,146 @@ +# Copyright (c) Meta Platforms, Inc. and affiliates. +# +# This source code is licensed under the MIT license found in the +# LICENSE file in the root directory of this source tree. 
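+#
+# Sweeps a grid of codec factory strings (scalar quantizers, ITQ+LSH,
+# OPQ/PCAR prefixes with PQ/SQ, and the residual / local-search quantizer
+# variants RQ, LSQ, PRQ, PLSQ) over the selected dataset, benchmarking each
+# codec through the bench_fw framework.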
diff --git a/benchs/bench_fw_codecs.py b/benchs/bench_fw_codecs.py
new file mode 100644
index 0000000000..80741e23f7
--- /dev/null
+++ b/benchs/bench_fw_codecs.py
@@ -0,0 +1,146 @@
+# Copyright (c) Meta Platforms, Inc. and affiliates.
+#
+# This source code is licensed under the MIT license found in the
+# LICENSE file in the root directory of this source tree.
+
+import logging
+import argparse
+import os
+
+from bench_fw.benchmark import Benchmark
+from bench_fw.benchmark_io import BenchmarkIO
+from bench_fw.descriptors import DatasetDescriptor, IndexDescriptor
+from bench_fw.index import IndexFromFactory
+
+logging.basicConfig(level=logging.INFO)
+
+
+def factory_factory(d):
+    return [
+        ("SQ4", None, 256 * (2 ** 10), None),
+        ("SQ8", None, 256 * (2 ** 10), None),
+        ("SQfp16", None, 256 * (2 ** 10), None),
+        ("ITQ64,LSH", None, 256 * (2 ** 10), None),
+        ("Pad128,ITQ128,LSH", None, 256 * (2 ** 10), None),
+        ("Pad256,ITQ256,LSH", None, 256 * (2 ** 10), None),
+    ] + [
+        (f"OPQ32_128,Residual2x14,PQ32x{b}", None, 256 * (2 ** 14), None)
+        for b in range(8, 16, 2)
+    ] + [
+        (f"PCAR{2 ** d_out},SQ{b}", None, 256 * (2 ** 10), None)
+        for d_out in range(6, 11)
+        if 2 ** d_out <= d
+        for b in [4, 8]
+    ] + [
+        (f"OPQ{M}_{M * dim},PQ{M}x{b}", None, 256 * (2 ** b), None)
+        for M in [8, 12, 16, 32, 64, 128]
+        for dim in [2, 4, 6, 8, 12, 16]
+        if M * dim <= d
+        for b in range(8, 16, 2)
+    ] + [
+        (f"RQ{cs // b}x{b}", [{"max_beam_size": 32}], 256 * (2 ** b), {"max_beam_size": bs, "use_beam_LUT": bl})
+        for cs in [64, 128, 256, 512]
+        for b in [6, 8, 10, 12]
+        for bs in [1, 2, 4, 8, 16, 32]
+        for bl in [0, 1]
+        if cs // b > 1
+        if cs // b < 65
+        if cs < d * 8 * 2
+    ] + [
+        (f"LSQ{cs // b}x{b}", [{"encode_ils_iters": 16}], 256 * (2 ** b), {"encode_ils_iters": eii, "lsq_gpu": lg})
+        for cs in [64, 128, 256, 512]
+        for b in [6, 8, 10, 12]
+        for eii in [2, 4, 8, 16]
+        for lg in [0, 1]
+        if cs // b > 1
+        if cs // b < 65
+        if cs < d * 8 * 2
+    ] + [
+        (f"PRQ{sub}x{cs // sub // b}x{b}", [{"max_beam_size": 32}], 256 * (2 ** b), {"max_beam_size": bs, "use_beam_LUT": bl})
+        for sub in [2, 3, 4, 8, 16, 32]
+        for cs in [64, 96, 128, 192, 256, 384, 512, 768, 1024, 2048]
+        for b in [6, 8, 10, 12]
+        for bs in [1, 2, 4, 8, 16, 32]
+        for bl in [0, 1]
+        if cs // sub // b > 1
+        if cs // sub // b < 65
+        if cs < d * 8 * 2
+        if d % sub == 0
+    ] + [
+        (f"PLSQ{sub}x{cs // sub // b}x{b}", [{"encode_ils_iters": 16}], 256 * (2 ** b), {"encode_ils_iters": eii, "lsq_gpu": lg})
+        for sub in [2, 3, 4, 8, 16, 32]
+        for cs in [64, 128, 256, 512, 1024, 2048]
+        for b in [6, 8, 10, 12]
+        for eii in [2, 4, 8, 16]
+        for lg in [0, 1]
+        if cs // sub // b > 1
+        if cs // sub // b < 65
+        if cs < d * 8 * 2
+        if d % sub == 0
+    ]
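Each tuple in this grid is (factory string, construction params, training size, search params), which is exactly how run_local() below unpacks it. An illustrative peek at the grid (run inside this module; output abridged):

# Each entry is (factory, construction_params, training_size, search_params).
candidates = factory_factory(768)   # 768 = the contriever dimensionality used below
print(len(candidates))              # size of the codec sweep for this d
print(candidates[0])                # ('SQ4', None, 262144, None)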
+
+def run_local(rp):
+    bio, d, tablename, distance_metric = rp
+    if tablename == "contriever":
+        training_vectors = DatasetDescriptor(
+            tablename="training_set.npy"
+        )
+        database_vectors = DatasetDescriptor(
+            tablename="database1M.npy",
+        )
+        query_vectors = DatasetDescriptor(
+            tablename="queries.npy",
+        )
+    else:
+        training_vectors = DatasetDescriptor(
+            namespace="std_t", tablename=tablename,
+        )
+        database_vectors = DatasetDescriptor(
+            namespace="std_d", tablename=tablename,
+        )
+        query_vectors = DatasetDescriptor(
+            namespace="std_q", tablename=tablename,
+        )
+
+    benchmark = Benchmark(
+        num_threads=32,
+        training_vectors=training_vectors,
+        database_vectors=database_vectors,
+        query_vectors=query_vectors,
+        index_descs=[
+            IndexDescriptor(
+                factory=factory,
+                construction_params=construction_params,
+                training_size=training_size,
+                search_params=search_params,
+            )
+            for factory, construction_params, training_size, search_params in factory_factory(d)
+        ],
+        k=1,
+        distance_metric=distance_metric,
+    )
+    benchmark.set_io(bio)
+    benchmark.benchmark(result_file="result.json", train=False, reconstruct=False, knn=False, range=False)
+
+
+def run(bio, d, tablename, distance_metric):
+    bio.launch_jobs(run_local, [(bio, d, tablename, distance_metric)], local=True)
+
+
+if __name__ == "__main__":
+    parser = argparse.ArgumentParser()
+    parser.add_argument('experiment')
+    parser.add_argument('path')
+    args = parser.parse_args()
+    assert os.path.exists(args.path)
+    path = os.path.join(args.path, args.experiment)
+    if not os.path.exists(path):
+        os.mkdir(path)
+    bio = BenchmarkIO(
+        path=path,
+    )
+    if args.experiment == "sift1M":
+        run(bio, 128, "sift1M", "L2")
+    elif args.experiment == "bigann":
+        run(bio, 128, "bigann1M", "L2")
+    elif args.experiment == "deep1b":
+        run(bio, 96, "deep1M", "L2")
+    elif args.experiment == "contriever":
+        run(bio, 768, "contriever", "IP")
diff --git a/benchs/bench_fw_ivf.py b/benchs/bench_fw_ivf.py
new file mode 100644
index 0000000000..8c84743e27
--- /dev/null
+++ b/benchs/bench_fw_ivf.py
@@ -0,0 +1,120 @@
+# Copyright (c) Meta Platforms, Inc. and affiliates.
+#
+# This source code is licensed under the MIT license found in the
+# LICENSE file in the root directory of this source tree.
+
+import logging
+import argparse
+import os
+
+from bench_fw.benchmark import Benchmark
+from bench_fw.benchmark_io import BenchmarkIO
+from bench_fw.descriptors import DatasetDescriptor, IndexDescriptor
+
+logging.basicConfig(level=logging.INFO)
+
+
+def sift1M(bio):
+    benchmark = Benchmark(
+        num_threads=32,
+        training_vectors=DatasetDescriptor(
+            namespace="std_d", tablename="sift1M"
+        ),
+        database_vectors=DatasetDescriptor(
+            namespace="std_d", tablename="sift1M"
+        ),
+        query_vectors=DatasetDescriptor(
+            namespace="std_q", tablename="sift1M"
+        ),
+        index_descs=[
+            IndexDescriptor(
+                factory=f"IVF{2 ** nlist},Flat",
+            )
+            for nlist in range(8, 15)
+        ],
+        k=1,
+        distance_metric="L2",
+    )
+    benchmark.set_io(bio)
+    benchmark.benchmark(result_file="result.json", local=False, train=True, reconstruct=False, knn=True, range=False)
+
+
+def bigann(bio):
+    for scale in [1, 2, 5, 10, 20, 50]:
+        benchmark = Benchmark(
+            num_threads=32,
+            training_vectors=DatasetDescriptor(
+                namespace="std_t", tablename="bigann1M"
+            ),
+            database_vectors=DatasetDescriptor(
+                namespace="std_d", tablename=f"bigann{scale}M"
+            ),
+            query_vectors=DatasetDescriptor(
+                namespace="std_q", tablename="bigann1M"
+            ),
+            index_descs=[
+                IndexDescriptor(
+                    factory=f"IVF{2 ** nlist},Flat",
+                ) for nlist in range(11, 19)
+            ] + [
+                IndexDescriptor(
+                    factory=f"IVF{2 ** nlist}_HNSW32,Flat",
+                    construction_params=[None, {"efConstruction": 200, "efSearch": 40}],
+                ) for nlist in range(11, 19)
+            ],
+            k=1,
+            distance_metric="L2",
+        )
+        benchmark.set_io(bio)
+        benchmark.benchmark(f"result{scale}.json", local=False, train=True, reconstruct=False, knn=True, range=False)
+
+
+def ssnpp(bio):
+    benchmark = Benchmark(
+        num_threads=32,
+        training_vectors=DatasetDescriptor(
+            tablename="ssnpp_training_5M.npy"
+        ),
+        database_vectors=DatasetDescriptor(
+            tablename="ssnpp_database_5M.npy"
+        ),
+        query_vectors=DatasetDescriptor(
+            tablename="ssnpp_queries_10K.npy"
+        ),
+        index_descs=[
+            IndexDescriptor(
+                factory=f"IVF{2 ** nlist},PQ256x4fs,Refine(SQfp16)",
+            ) for nlist in range(9, 16)
+        ] + [
+            IndexDescriptor(
+                factory=f"IVF{2 ** nlist},Flat",
+            ) for nlist in range(9, 16)
+        ] + [
+            IndexDescriptor(
+                factory="PQ256x4fs,Refine(SQfp16)",
+            ),
+            IndexDescriptor(
+                factory="HNSW32",
+            ),
+        ],
+        k=1,
+        distance_metric="L2",
+    )
+    benchmark.set_io(bio)
+    benchmark.benchmark("result.json", local=False, train=True, reconstruct=False, knn=True, range=False)
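For orientation, the bigann sweep above probes coarse quantizers from 2**11 to 2**18 inverted lists, with and without an HNSW-assigned coarse quantizer; the generated factory keys can be previewed with a one-liner (illustrative):

print([f"IVF{2 ** nlist},Flat" for nlist in range(11, 19)])
# ['IVF2048,Flat', 'IVF4096,Flat', ..., 'IVF131072,Flat', 'IVF262144,Flat']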
+
+if __name__ == "__main__":
+    parser = argparse.ArgumentParser()
+    parser.add_argument('experiment')
+    parser.add_argument('path')
+    args = parser.parse_args()
+    assert os.path.exists(args.path)
+    path = os.path.join(args.path, args.experiment)
+    if not os.path.exists(path):
+        os.mkdir(path)
+    bio = BenchmarkIO(
+        path=path,
+    )
+    if args.experiment == "sift1M":
+        sift1M(bio)
+    elif args.experiment == "bigann":
+        bigann(bio)
+    elif args.experiment == "ssnpp":
+        ssnpp(bio)
diff --git a/benchs/bench_fw_ivf_flat.py b/benchs/bench_fw_ivf_flat.py
deleted file mode 100644
index 37b4bd7862..0000000000
--- a/benchs/bench_fw_ivf_flat.py
+++ /dev/null
@@ -1,37 +0,0 @@
-# Copyright (c) Meta Platforms, Inc. and affiliates.
-#
-# This source code is licensed under the MIT license found in the
-# LICENSE file in the root directory of this source tree.
-
-import logging
-
-from bench_fw.benchmark import Benchmark
-from bench_fw.benchmark_io import BenchmarkIO
-from bench_fw.descriptors import DatasetDescriptor, IndexDescriptor
-
-logging.basicConfig(level=logging.INFO)
-
-benchmark = Benchmark(
-    training_vectors=DatasetDescriptor(
-        namespace="std_d", tablename="sift1M"
-    ),
-    database_vectors=DatasetDescriptor(
-        namespace="std_d", tablename="sift1M"
-    ),
-    query_vectors=DatasetDescriptor(
-        namespace="std_q", tablename="sift1M"
-    ),
-    index_descs=[
-        IndexDescriptor(
-            factory=f"IVF{2 ** nlist},Flat",
-        )
-        for nlist in range(8, 15)
-    ],
-    k=1,
-    distance_metric="L2",
-)
-io = BenchmarkIO(
-    path="/checkpoint",
-)
-benchmark.set_io(io)
-print(benchmark.benchmark("result.json"))
diff --git a/benchs/bench_fw_notebook.ipynb b/benchs/bench_fw_notebook.ipynb
index 7cc39ea2cb..c6183a8eb9 100644
--- a/benchs/bench_fw_notebook.ipynb
+++ b/benchs/bench_fw_notebook.ipynb
@@ -4,24 +4,35 @@
    "cell_type": "code",
    "execution_count": null,
    "id": "be081589-e1b2-4569-acb7-44203e273899",
-   "metadata": {},
+   "metadata": {
+    "tags": []
+   },
    "outputs": [],
    "source": [
     "import matplotlib.pyplot as plt\n",
     "import itertools\n",
     "from faiss.contrib.evaluation import OperatingPoints\n",
     "from enum import Enum\n",
-    "from bench_fw.benchmark_io import BenchmarkIO as BIO"
+    "from bench_fw.benchmark_io import BenchmarkIO as BIO\n",
+    "from copy import copy\n",
+    "import numpy as np\n",
+    "import datetime\n",
+    "import glob\n",
+    "import io\n",
+    "import json\n",
+    "from zipfile import ZipFile"
    ]
   },
   {
    "cell_type": "code",
    "execution_count": null,
    "id": "a6492e95-24c7-4425-bf0a-27e10e879ca6",
-   "metadata": {},
+   "metadata": {
+    "tags": []
+   },
    "outputs": [],
    "source": [
-    "root = \"/checkpoint\"\n",
+    "root = \"/checkpoint/gsz/bench_fw/range/ssnpp\"\n",
     "results = BIO(root).read_json(\"result.json\")\n",
     "results.keys()"
    ]
   },
@@ -30,17 +41,21 @@
    "cell_type": "code",
    "execution_count": null,
    "id": "0875d269-aef4-426d-83dd-866970f43777",
-   "metadata": {},
+   "metadata": {
+    "tags": []
+   },
    "outputs": [],
    "source": [
-    "results['indices']"
+    "results['experiments']"
    ]
   },
   {
    "cell_type": "code",
    "execution_count": null,
    "id": "a7ff7078-29c7-407c-a079-201877b764ad",
-   "metadata": {},
+   "metadata": {
+    "tags": []
+   },
    "outputs": [],
    "source": [
     "class Cost:\n",
@@ -89,7 +104,7 @@
     "    time_metric = lambda v: v['time'] * scaling_factor + (v['quantizer']['time'] if 'quantizer' in v else 0)\n",
     "\n",
     "    if space_metric is None:\n",
-    "        space_metric = lambda v: results['indices'][v['codec']]['code_size']\n",
+    "        space_metric = lambda v: results['indices'][v['codec']]['sa_code_size']\n",
     "    \n",
     "    fe = []\n",
     "    ops = {}\n",
@@ -102,12 +117,16 @@
     "        if min_accuracy > 0 and accuracy < min_accuracy:\n",
     "            continue\n",
     "        space = space_metric(v)\n",
+    "        if space is None:\n",
+    "            space = 0\n",
     "        if max_space > 0 and space > max_space:\n",
     "            continue\n",
     "        time = time_metric(v)\n",
     "        if max_time > 0 and time > max_time:\n",
     "            continue\n",
-    "        idx_name = v['index']\n",
+    "        idx_name = v['index'] + (\"snap\" if 'search_params' in v and v['search_params'][\"snap\"] == 1 else \"\")\n",
+    "        # if idx_name.startswith(\"HNSW\"):\n",
+    "        #     continue\n",
     "        experiment = (accuracy, space, time, k, v)\n",
     "        if pareto_mode == ParetoMode.DISABLE:\n",
     "            fe.append(experiment)\n",
@@ -136,14 +155,18 @@
    "cell_type": "code",
    "execution_count": null,
    "id": "f080a6e2-1565-418b-8732-4adeff03a099",
-   "metadata": {},
+   "metadata": {
+    "tags": []
+   },
    "outputs": [],
    "source": [
-    "def plot_metric(experiments, accuracy_title, cost_title, plot_space=False):\n",
+    "def plot_metric(experiments, accuracy_title, cost_title, plot_space=False, plot=None):\n",
+    "    if plot is None:\n",
+    "        plot = plt.subplot()\n",
     "    x = {}\n",
     "    y = {}\n",
     "    for accuracy, space, time, k, v in experiments:\n",
-    "        idx_name = v['index']\n",
+    "        idx_name = v['index'] + (\"snap\" if 'search_params' in v and v['search_params'][\"snap\"] == 1 else \"\")\n",
     "        if idx_name not in x:\n",
     "            x[idx_name] = []\n",
     "            y[idx_name] = []\n",
@@ -154,26 +177,43 @@
     "            y[idx_name].append(time)\n",
     "\n",
     "    #plt.figure(figsize=(10,6))\n",
-    "    plt.yscale(\"log\")\n",
-    "    plt.title(accuracy_title)\n",
-    "    plt.xlabel(accuracy_title)\n",
-    "    plt.ylabel(cost_title)\n",
+    "    #plt.title(accuracy_title)\n",
+    "    plot.set_xlabel(accuracy_title)\n",
+    "    plot.set_ylabel(cost_title)\n",
     "    marker = itertools.cycle((\"o\", \"v\", \"^\", \"<\", \">\", \"s\", \"p\", \"P\", \"*\", \"h\", \"X\", \"D\"))\n",
     "    for index in x.keys():\n",
-    "        plt.plot(x[index], y[index], marker=next(marker), label=index)\n",
-    "    plt.legend(bbox_to_anchor=(1, 1), loc='upper left')"
+    "        plot.plot(x[index], y[index], marker=next(marker), label=index, linewidth=0)\n",
+    "    plot.legend(bbox_to_anchor=(1, 1), loc='upper left')"
    ]
   },
   {
    "cell_type": "code",
    "execution_count": null,
    "id": "61007155-5edc-449e-835e-c141a01a2ae5",
-   "metadata": {},
+   "metadata": {
+    "tags": []
+   },
    "outputs": [],
    "source": [
+    "# index local optima\n",
     "accuracy_metric = \"knn_intersection\"\n",
     "fr = filter_results(results, evaluation=\"knn\", accuracy_metric=accuracy_metric, pareto_mode=ParetoMode.INDEX, pareto_metric=ParetoMetric.TIME, scaling_factor=1)\n",
-    "plot_metric(fr, accuracy_title=\"knn intersection\", cost_title=\"time (seconds, 16 cores)\")"
+    "plot_metric(fr, accuracy_title=\"knn intersection\", cost_title=\"time (seconds, 32 cores)\", plot_space=False)"
    ]
   },
+  {
+   "cell_type": "code",
+   "execution_count": null,
+   "id": "f9f94dcc-5abe-4cad-9619-f5d1d24fb8c1",
+   "metadata": {
+    "tags": []
+   },
+   "outputs": [],
+   "source": [
+    "# global optima\n",
+    "accuracy_metric = \"knn_intersection\"\n",
+    "fr = filter_results(results, evaluation=\"knn\", accuracy_metric=accuracy_metric, min_accuracy=0.5, pareto_mode=ParetoMode.GLOBAL, pareto_metric=ParetoMetric.TIME, scaling_factor=1)\n",
+    "plot_metric(fr, accuracy_title=\"knn intersection\", cost_title=\"time (seconds, 32 cores)\")"
+   ]
+  },
   {
    "cell_type": "code",
@@ -183,6 +223,7 @@
    "metadata": {},
    "outputs": [],
    "source": [
+    "# index local optima @ precision 0.8\n",
     "precision = 0.8\n",
     "accuracy_metric = lambda exp: range_search_recall_at_precision(exp, precision)\n",
     "fr = filter_results(results, evaluation=\"weighted\", accuracy_metric=accuracy_metric, pareto_mode=ParetoMode.INDEX, pareto_metric=ParetoMetric.TIME, scaling_factor=1)\n",
@@ -196,7 +237,7 @@
    "metadata": {},
    "outputs": [],
    "source": [
-    "# index local optima\n",
+    "# index local optima @ precision 0.2\n",
     "precision = 0.2\n",
     "accuracy_metric = lambda exp: range_search_recall_at_precision(exp, precision)\n",
     "fr = filter_results(results, evaluation=\"weighted\", accuracy_metric=accuracy_metric, pareto_mode=ParetoMode.INDEX, pareto_metric=ParetoMetric.TIME, scaling_factor=1)\n",
@@ -210,7 +251,7 @@
    "metadata": {},
    "outputs": [],
    "source": [
-    "# global optima\n",
+    "# global optima @ precision 0.8\n",
     "precision = 0.8\n",
    "accuracy_metric = lambda exp: range_search_recall_at_precision(exp, precision)\n",
     "fr = filter_results(results, evaluation=\"weighted\", accuracy_metric=accuracy_metric, pareto_mode=ParetoMode.GLOBAL, pareto_metric=ParetoMetric.TIME, scaling_factor=1)\n",
@@ -260,6 +301,293 @@
    "cell_type": "code",
    "execution_count": null,
    "id": "fdf8148a-0da6-4c5e-8d60-f8f85314574c",
+   "metadata": {
+    "tags": []
+   },
+   "outputs": [],
+   "source": [
+    "root = \"/checkpoint/gsz/bench_fw/ivf/bigann\"\n",
+    "scales = [1, 2, 5, 10, 20, 50]\n",
+    "fig, plots = plt.subplots(len(scales), sharex=True, figsize=(5,25))\n",
+    "fig.tight_layout()\n",
+    "for plot, scale in zip(plots, scales, strict=True):\n",
+    "    results = BIO(root).read_json(f\"result{scale}.json\")\n",
+    "    accuracy_metric = \"knn_intersection\"\n",
+    "    fr = filter_results(results, evaluation=\"knn\", accuracy_metric=accuracy_metric, min_accuracy=0.9, pareto_mode=ParetoMode.GLOBAL, pareto_metric=ParetoMetric.TIME, scaling_factor=1)\n",
+    "    plot_metric(fr, accuracy_title=\"knn intersection\", cost_title=\"time (seconds, 64 cores)\", plot=plot)"
+   ]
+  },
+  {
+   "cell_type": "code",
+   "execution_count": null,
+   "id": "e503828c-ee61-45f7-814b-cce6461109bc",
+   "metadata": {},
+   "outputs": [],
+   "source": [
+    "x = {}\n",
+    "y = {}\n",
+    "accuracy=0.9\n",
+    "root = \"/checkpoint/gsz/bench_fw/ivf/bigann\"\n",
+    "scales = [1, 2, 5, 10, 20, 50]\n",
+    "#fig, plots = plt.subplots(len(scales), sharex=True, figsize=(5,25))\n",
+    "#fig.tight_layout()\n",
+    "for scale in scales:\n",
+    "    results = BIO(root).read_json(f\"result{scale}.json\")\n",
+    "    scale *= 1_000_000\n",
+    "    accuracy_metric = \"knn_intersection\"\n",
+    "    fr = filter_results(results, evaluation=\"knn\", accuracy_metric=accuracy_metric, min_accuracy=accuracy, pareto_mode=ParetoMode.INDEX, pareto_metric=ParetoMetric.TIME, scaling_factor=1)\n",
+    "    seen = set()\n",
+    "    print(scale)\n",
+    "    for _, _, _, _, exp in fr:\n",
+    "        fact = exp[\"factory\"]\n",
+    "        # \"HNSW\" in fact or \n",
+    "        if fact in seen or fact in [\"Flat\", \"IVF512,Flat\", \"IVF1024,Flat\", \"IVF2048,Flat\"]:\n",
+    "            continue\n",
+    "        seen.add(fact)\n",
+    "        if fact not in x:\n",
+    "            x[fact] = []\n",
+    "            y[fact] = []\n",
+    "        x[fact].append(scale)\n",
+    "        y[fact].append(exp[\"time\"] + exp[\"quantizer\"][\"time\"])\n",
+    "        if (exp[\"knn_intersection\"] > 0.92):\n",
+    "            print(fact)\n",
+    "            print(exp[\"search_params\"])\n",
+    "            print(exp[\"knn_intersection\"])\n",
+    "\n",
+    "    #plot_metric(fr, accuracy_title=\"knn intersection\", cost_title=\"time (seconds, 64 cores)\", plot=plot)\n",
+    "\n",
+    "plt.title(f\"recall @ 1 = {accuracy*100}%\")\n",
+    "plt.xlabel(\"database size\")\n",
+    "plt.ylabel(\"time\")\n",
+    "plt.xscale(\"log\")\n",
+    "plt.yscale(\"log\")\n",
+    "\n",
+    "marker = itertools.cycle((\"o\", \"v\", \"^\", \"<\", \">\", \"s\", \"p\", \"P\", \"*\", \"h\", \"X\", \"D\"))\n",
+    "for index in x.keys():\n",
+    "    if \"HNSW\" in index:\n",
+    "        plt.plot(x[index], y[index], label=index, linewidth=1, marker=next(marker), linestyle=\"dashed\")\n",
+    "    else:\n",
+    "        plt.plot(x[index], y[index], label=index, linewidth=1, marker=next(marker))\n",
+    "plt.legend(bbox_to_anchor=(1.0, 1.0), loc='upper left')"
+   ]
+  },
+  {
+   "cell_type": "code",
+   "execution_count": null,
+   "id": "37a99bb2-f998-461b-a345-7cc6e702cb3a",
+   "metadata": {},
+   "outputs": [],
+   "source": [
+    "# global optima\n",
+    "accuracy_metric = \"sym_recall\"\n",
+    "fr = filter_results(results, evaluation=\"rec\", accuracy_metric=accuracy_metric, time_metric=lambda e:e['encode_time'], min_accuracy=0.9, pareto_mode=ParetoMode.GLOBAL, pareto_metric=ParetoMetric.SPACE, scaling_factor=1)\n",
+    "plot_metric(fr, accuracy_title=\"sym recall\", cost_title=\"space\", plot_space=True)"
+   ]
+  },
+  {
+   "cell_type": "code",
+   "execution_count": null,
+   "id": "c973ce4e-3566-4f02-bd93-f113e3e0c791",
+   "metadata": {},
+   "outputs": [],
+   "source": [
+    "def pretty_time(s):\n",
+    "    if s is None:\n",
+    "        return \"None\"\n",
+    "    s = int(s * 1000) / 1000\n",
+    "    m, s = divmod(s, 60)\n",
+    "    h, m = divmod(m, 60)\n",
+    "    d, h = divmod(h, 24)\n",
+    "    r = \"\"\n",
+    "    if d > 0:\n",
+    "        r += f\"{int(d)}d \"\n",
+    "    if h > 0:\n",
+    "        r += f\"{int(h)}h \"\n",
+    "    if m > 0:\n",
+    "        r += f\"{int(m)}m \"\n",
+    "    if s > 0 or len(r) == 0:\n",
+    "        r += f\"{s:.3f}s\"\n",
+    "    return r\n",
+    "\n",
+    "def pretty_size(s):\n",
+    "    if s > 1024 * 1024:\n",
+    "        return f\"{s / 1024 / 1024:.1f}\".rstrip('0').rstrip('.') + \"MB\"\n",
+    "    if s > 1024:\n",
+    "        return f\"{s / 1024:.1f}\".rstrip('0').rstrip('.') + \"KB\"\n",
+    "    return f\"{s}\"\n",
+    "\n",
+    "def pretty_mse(m):\n",
+    "    if m is None:\n",
+    "        return \"None\"\n",
+    "    else:\n",
+    "        return f\"{m:.6f}\""
+   ]
+  },
+  {
+   "cell_type": "code",
+   "execution_count": null,
+   "id": "1ddcf226-fb97-4a59-9fc3-3ed8f7d5e703",
+   "metadata": {},
+   "outputs": [],
+   "source": [
+    "data = {}\n",
+    "root = \"/checkpoint/gsz/bench_fw/bigann\"\n",
+    "scales = [1, 2, 5, 10, 20, 50]\n",
+    "for scale in scales:\n",
+    "    results = BIO(root).read_json(f\"result{scale}.json\")\n",
+    "    accuracy_metric = \"knn_intersection\"\n",
+    "    fr = filter_results(results, evaluation=\"knn\", accuracy_metric=accuracy_metric, min_accuracy=0, pareto_mode=ParetoMode.INDEX, pareto_metric=ParetoMetric.TIME, scaling_factor=1)\n",
+    "    d = {}\n",
+    "    data[f\"{scale}M\"] = d\n",
+    "    for _, _, _, _, exp in fr:\n",
+    "        fact = exp[\"factory\"]\n",
+    "        # \"HNSW\" in fact or \n",
+    "        if fact in [\"Flat\", \"IVF512,Flat\", \"IVF1024,Flat\", \"IVF2048,Flat\"]:\n",
+    "            continue\n",
+    "        if fact not in d:\n",
+    "            d[fact] = []\n",
+    "        d[fact].append({\n",
+    "            \"nprobe\": exp[\"search_params\"][\"nprobe\"],\n",
+    "            \"recall\": exp[\"knn_intersection\"],\n",
+    "            \"time\": exp[\"time\"] + exp[\"quantizer\"][\"time\"],\n",
+    "        })\n",
+    "data\n",
+    "# with open(\"/checkpoint/gsz/bench_fw/codecs.json\", \"w\") as f:\n",
+    "#     json.dump(data, f)"
+   ]
+  },
+  {
+   "cell_type": "code",
+   "execution_count": null,
+   "id": "e54eebb6-0a9f-4a72-84d2-f12c5bd44510",
+   "metadata": {},
+   "outputs": [],
+   "source": [
+    "ds = \"deep1b\"\n",
+    "data = []\n",
+    "jss = []\n",
+    "root = f\"/checkpoint/gsz/bench_fw/codecs/{ds}\"\n",
+    "results = BIO(root).read_json(f\"result.json\")\n",
+    "for k, e in results[\"experiments\"].items():\n",
+    "    if \"rec\" in k and e['factory'] != 'Flat': # and e['sym_recall'] > 0.0: # and \"PRQ\" in e['factory'] and e['sym_recall'] > 0.0:\n",
+    "        code_size = results['indices'][e['codec']]['sa_code_size']\n",
+    "        codec_size = results['indices'][e['codec']]['codec_size']\n",
+    "        training_time = results['indices'][e['codec']]['training_time']\n",
+    "        training_size = None  # results['indices'][e['codec']]['training_size']\n",
+    "        cpu = e['cpu'] if 'cpu' in e else \"\"\n",
+    "        ps = ', '.join([f\"{k}={v}\" for k,v in e['construction_params'][0].items()]) if e['construction_params'] else \" \"\n",
+    "        eps = ', '.join([f\"{k}={v}\" for k,v in e['reconstruct_params'].items() if k != \"snap\"]) if e['reconstruct_params'] else \" \"\n",
+    "        data.append((code_size, f\"|{e['factory']}|{ps}|{eps}|{code_size}|{pretty_size(codec_size)}|{pretty_time(training_time)}|{training_size}|{pretty_mse(e['mse'])}|{e['sym_recall']}|{e['asym_recall']}|{pretty_time(e['encode_time'])}|{pretty_time(e['decode_time'])}|{cpu}|\"))\n",
+    "        jss.append({\n",
+    "            'factory': e['factory'],\n",
+    "            'parameters': e['construction_params'][0] if e['construction_params'] else \"\",\n",
+    "            'evaluation_params': e['reconstruct_params'],\n",
+    "            'code_size': code_size,\n",
+    "            'codec_size': codec_size,\n",
+    "            'training_time': training_time,\n",
+    "            'training_size': training_size,\n",
+    "            'mse': e['mse'],\n",
+    "            'sym_recall': e['sym_recall'],\n",
+    "            'asym_recall': e['asym_recall'],\n",
+    "            'encode_time': e['encode_time'],\n",
+    "            'decode_time': e['decode_time'],\n",
+    "            'cpu': cpu,\n",
+    "        })\n",
+    "\n",
+    "print(\"|factory key|construction parameters|evaluation parameters|code size|codec size|training time|training size|mean squared error|sym recall @ 1|asym recall @ 1|encode time|decode time|cpu|\")\n",
+    "print(\"|-|-|-|-|-|-|-|-|-|-|-|-|-|\")\n",
+    "data.sort()\n",
+    "for d in data:\n",
+    "    print(d[1])\n",
+    "\n",
+    "with open(f\"/checkpoint/gsz/bench_fw/codecs_{ds}_test.json\", \"w\") as f:\n",
+    "    json.dump(jss, f)"
+   ]
+  },
+  {
+   "cell_type": "code",
+   "execution_count": null,
+   "id": "d1216733-9670-407c-b3d2-5f87bce0321c",
+   "metadata": {},
+   "outputs": [],
+   "source": [
+    "def read_file(filename: str, keys):\n",
+    "    results = []\n",
+    "    with ZipFile(filename, \"r\") as zip_file:\n",
+    "        for key in keys:\n",
+    "            with zip_file.open(key, \"r\") as f:\n",
+    "                if key in [\"D\", \"I\", \"R\", \"lims\"]:\n",
+    "                    results.append(np.load(f))\n",
+    "                elif key in [\"P\"]:\n",
+    "                    t = io.TextIOWrapper(f)\n",
+    "                    results.append(json.load(t))\n",
+    "                else:\n",
+    "                    raise AssertionError()\n",
+    "    return results"
+   ]
+  },
+  {
+   "cell_type": "code",
+   "execution_count": null,
+   "id": "56de051e-22db-4bef-b242-1ddabc9e0bb9",
+   "metadata": {},
+   "outputs": [],
+   "source": [
+    "ds = \"contriever\"\n",
+    "data = []\n",
+    "jss = []\n",
+    "root = f\"/checkpoint/gsz/bench_fw/codecs/{ds}\"\n",
+    "for lf in glob.glob(root + '/*rec*.zip'):\n",
+    "    e, = read_file(lf, ['P'])\n",
+    "    if e['factory'] != 'Flat': # and e['sym_recall'] > 0.0: # and \"PRQ\" in e['factory'] and e['sym_recall'] > 0.0:\n",
+    "        code_size = e['codec_meta']['sa_code_size']\n",
+    "        codec_size = e['codec_meta']['codec_size']\n",
+    "        training_time = e['codec_meta']['training_time']\n",
+    "        training_size = None # e['codec_meta']['training_size']\n",
+    "        cpu = e['cpu'] if 'cpu' in e else \"\"\n",
+    "        ps = ', '.join([f\"{k}={v}\" for k,v in e['construction_params'][0].items()]) if e['construction_params'] else \" \"\n",
+    "        eps = ', '.join([f\"{k}={v}\" for k,v in e['reconstruct_params'].items() if k != \"snap\"]) if e['reconstruct_params'] else \" \"\n",
+    "        if eps in ps and eps != \"encode_ils_iters=16\" and eps != \"max_beam_size=32\":\n",
+    "            eps = \" \"\n",
\"\n", + " data.append((code_size, f\"|{e['factory']}|{ps}|{eps}|{code_size}|{pretty_size(codec_size)}|{pretty_time(training_time)}|{pretty_mse(e['mse'])}|{e['sym_recall']}|{e['asym_recall']}|{pretty_time(e['encode_time'])}|{pretty_time(e['decode_time'])}|{cpu}|\"))\n", + " eps = e['reconstruct_params']\n", + " del eps['snap']\n", + " params = copy(e['construction_params'][0]) if e['construction_params'] else {}\n", + " for k, v in e['reconstruct_params'].items():\n", + " params[k] = v\n", + " jss.append({\n", + " 'factory': e['factory'],\n", + " 'params': params,\n", + " 'construction_params': e['construction_params'][0] if e['construction_params'] else {},\n", + " 'evaluation_params': e['reconstruct_params'],\n", + " 'code_size': code_size,\n", + " 'codec_size': codec_size,\n", + " 'training_time': training_time,\n", + " # 'training_size': training_size,\n", + " 'mse': e['mse'],\n", + " 'sym_recall': e['sym_recall'],\n", + " 'asym_recall': e['asym_recall'],\n", + " 'encode_time': e['encode_time'],\n", + " 'decode_time': e['decode_time'],\n", + " 'cpu': cpu,\n", + " })\n", + "\n", + "print(\"|factory key|construction parameters|encode/decode parameters|code size|codec size|training time|mean squared error|sym recall @ 1|asym recall @ 1|encode time|decode time|cpu|\")\n", + "print(\"|-|-|-|-|-|-|-|-|-|\")\n", + "data.sort()\n", + "# for d in data:\n", + "# print(d[1])\n", + "\n", + "print(len(data))\n", + "\n", + "with open(f\"/checkpoint/gsz/bench_fw/codecs_{ds}_5.json\", \"w\") as f:\n", + " json.dump(jss, f)" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "id": "2fd712bf-f147-4c1b-9dbf-b04428e4c1eb", "metadata": {}, "outputs": [], "source": [] @@ -267,9 +595,9 @@ ], "metadata": { "kernelspec": { - "display_name": "Python [conda env:faiss_cpu_from_source] *", + "display_name": "Python [conda env:.conda-faiss_from_source] *", "language": "python", - "name": "conda-env-faiss_cpu_from_source-py" + "name": "conda-env-.conda-faiss_from_source-py" }, "language_info": { "codemirror_mode": { diff --git a/benchs/bench_fw_range.py b/benchs/bench_fw_range.py new file mode 100644 index 0000000000..f38de114f9 --- /dev/null +++ b/benchs/bench_fw_range.py @@ -0,0 +1,83 @@ +# Copyright (c) Meta Platforms, Inc. and affiliates. +# +# This source code is licensed under the MIT license found in the +# LICENSE file in the root directory of this source tree. 
diff --git a/benchs/bench_fw_range.py b/benchs/bench_fw_range.py
new file mode 100644
index 0000000000..f38de114f9
--- /dev/null
+++ b/benchs/bench_fw_range.py
@@ -0,0 +1,83 @@
+# Copyright (c) Meta Platforms, Inc. and affiliates.
+#
+# This source code is licensed under the MIT license found in the
+# LICENSE file in the root directory of this source tree.
+
+import logging
+import argparse
+import os
+
+from bench_fw.benchmark import Benchmark
+from bench_fw.benchmark_io import BenchmarkIO
+from bench_fw.descriptors import DatasetDescriptor, IndexDescriptor
+
+logging.basicConfig(level=logging.INFO)
+
+
+def ssnpp(bio):
+    benchmark = Benchmark(
+        num_threads=32,
+        training_vectors=DatasetDescriptor(
+            tablename="ssnpp_training_5M.npy",
+        ),
+        database_vectors=DatasetDescriptor(
+            tablename="ssnpp_xb_range_filtered_119201.npy",
+        ),
+        query_vectors=DatasetDescriptor(tablename="ssnpp_xq_range_filtered_33615.npy"),
+        index_descs=[
+            IndexDescriptor(
+                factory="Flat",
+                range_metrics={
+                    "weighted": [
+                        [0.05, 0.971],
+                        [0.1, 0.956],
+                        [0.15, 0.923],
+                        [0.2, 0.887],
+                        [0.25, 0.801],
+                        [0.3, 0.729],
+                        [0.35, 0.651],
+                        [0.4, 0.55],
+                        [0.45, 0.459],
+                        [0.5, 0.372],
+                        [0.55, 0.283],
+                        [0.6, 0.189],
+                        [0.65, 0.143],
+                        [0.7, 0.106],
+                        [0.75, 0.116],
+                        [0.8, 0.088],
+                        [0.85, 0.064],
+                        [0.9, 0.05],
+                        [0.95, 0.04],
+                        [1.0, 0.028],
+                        [1.05, 0.02],
+                        [1.1, 0.013],
+                        [1.15, 0.007],
+                        [1.2, 0.004],
+                        [1.3, 0],
+                    ]
+                },
+            ),
+            IndexDescriptor(
+                factory="IVF262144(PQ256x4fs),PQ32",
+            ),
+        ],
+        k=10,
+        distance_metric="L2",
+        range_ref_index_desc="Flat",
+    )
+    benchmark.set_io(bio)
+    benchmark.benchmark("result.json", local=False, train=True, reconstruct=False, knn=False, range=True)
+
+
+if __name__ == "__main__":
+    parser = argparse.ArgumentParser()
+    parser.add_argument('experiment')
+    parser.add_argument('path')
+    args = parser.parse_args()
+    assert os.path.exists(args.path)
+    path = os.path.join(args.path, args.experiment)
+    if not os.path.exists(path):
+        os.mkdir(path)
+    bio = BenchmarkIO(
+        path=path,
+    )
+    if args.experiment == "ssnpp":
+        ssnpp(bio)
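This driver shares the two-argument CLI of the other bench_fw scripts (an experiment name plus an existing output directory), e.g. `python bench_fw_range.py ssnpp <path>`. An equivalent call from Python (illustrative; the output directory below is hypothetical):

import os
from bench_fw.benchmark_io import BenchmarkIO

path = "/tmp/bench_fw/range/ssnpp"   # hypothetical output location
os.makedirs(path, exist_ok=True)
ssnpp(BenchmarkIO(path=path))        # assumes this module's ssnpp() is in scope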
diff --git a/benchs/bench_fw_test.py b/benchs/bench_fw_test.py
deleted file mode 100644
index 55b9e16e65..0000000000
--- a/benchs/bench_fw_test.py
+++ /dev/null
@@ -1,61 +0,0 @@
-# Copyright (c) Meta Platforms, Inc. and affiliates.
-#
-# This source code is licensed under the MIT license found in the
-# LICENSE file in the root directory of this source tree.
-
-import logging
-
-from bench_fw.benchmark import Benchmark
-from bench_fw.benchmark_io import BenchmarkIO
-from bench_fw.descriptors import DatasetDescriptor, IndexDescriptor
-
-logging.basicConfig(level=logging.INFO)
-
-benchmark = Benchmark(
-    training_vectors=DatasetDescriptor(
-        tablename="training.npy", num_vectors=200000
-    ),
-    database_vectors=DatasetDescriptor(
-        tablename="database.npy", num_vectors=200000
-    ),
-    query_vectors=DatasetDescriptor(tablename="query.npy", num_vectors=2000),
-    index_descs=[
-        IndexDescriptor(
-            factory="Flat",
-            range_metrics={
-                "weighted": [
-                    [0.1, 0.928],
-                    [0.2, 0.865],
-                    [0.3, 0.788],
-                    [0.4, 0.689],
-                    [0.5, 0.49],
-                    [0.6, 0.308],
-                    [0.7, 0.193],
-                    [0.8, 0.0],
-                ]
-            },
-        ),
-        IndexDescriptor(
-            factory="OPQ32_128,IVF512,PQ32",
-        ),
-        IndexDescriptor(
-            factory="OPQ32_256,IVF512,PQ32",
-        ),
-        IndexDescriptor(
-            factory="HNSW32",
-            construction_params=[
-                {
-                    "efConstruction": 64,
-                }
-            ],
-        ),
-    ],
-    k=10,
-    distance_metric="L2",
-    range_ref_index_desc="Flat",
-)
-io = BenchmarkIO(
-    path="/checkpoint",
-)
-benchmark.set_io(io)
-print(benchmark.benchmark("result.json"))
diff --git a/contrib/factory_tools.py b/contrib/factory_tools.py
index da90e986f8..745dc7f7ff 100644
--- a/contrib/factory_tools.py
+++ b/contrib/factory_tools.py
@@ -101,12 +101,23 @@ def reverse_index_factory(index):
             return prefix + ",SQ8"
         if isinstance(index, faiss.IndexIVFPQ):
             return prefix + f",PQ{index.pq.M}x{index.pq.nbits}"
+        if isinstance(index, faiss.IndexIVFPQFastScan):
+            return prefix + f",PQ{index.pq.M}x{index.pq.nbits}fs"
 
     elif isinstance(index, faiss.IndexPreTransform):
-        assert index.chain.size() == 1
+        if index.chain.size() != 1:
+            raise NotImplementedError()
         vt = faiss.downcast_VectorTransform(index.chain.at(0))
         if isinstance(vt, faiss.OPQMatrix):
-            return f"OPQ{vt.M}_{vt.d_out},{reverse_index_factory(index.index)}"
+            prefix = f"OPQ{vt.M}_{vt.d_out}"
+        elif isinstance(vt, faiss.ITQTransform):
+            prefix = f"ITQ{vt.itq.d_out}"
+        elif isinstance(vt, faiss.PCAMatrix):
+            assert vt.eigen_power == 0
+            prefix = "PCA" + ("R" if vt.random_rotation else "") + str(vt.d_out)
+        else:
+            raise NotImplementedError()
+        return f"{prefix},{reverse_index_factory(index.index)}"
 
     elif isinstance(index, faiss.IndexHNSW):
         return f"HNSW{get_hnsw_M(index)}"
 
@@ -117,6 +128,12 @@
     elif isinstance(index, faiss.IndexPQFastScan):
         return f"PQ{index.pq.M}x{index.pq.nbits}fs"
 
+    elif isinstance(index, faiss.IndexPQ):
+        return f"PQ{index.pq.M}x{index.pq.nbits}"
+
+    elif isinstance(index, faiss.IndexLSH):
+        return "LSH" + ("r" if index.rotate_data else "") + ("t" if index.train_thresholds else "")
+
     elif isinstance(index, faiss.IndexScalarQuantizer):
         sqtypes = {
             faiss.ScalarQuantizer.QT_8bit: "8",