Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

GPU Aggregation (3/8): CPUAggregator #8888

Merged
merged 9 commits into from
Jun 22, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,8 @@ export type AggregatedBin = {
value: number[];
/** Count of data points in this bin */
count: number;
/** Indices of data points in this bin. Only available if using CPU aggregation. */
pointIndices?: number[];
};

/**
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,88 @@
import type {Bin} from './cpu-aggregator';
import type {AggregationOperation} from '../aggregator';

type AggregationFunc = (pointIndices: number[], getValue: (index: number) => number) => number;

const count: AggregationFunc = pointIndices => {
return pointIndices.length;
};

const sum: AggregationFunc = (pointIndices, getValue) => {
let result = 0;
for (const i of pointIndices) {
result += getValue(i);
}
return result;
};

const mean: AggregationFunc = (pointIndices, getValue) => {
if (pointIndices.length === 0) {
return NaN;
}
return sum(pointIndices, getValue) / pointIndices.length;
};

const min: AggregationFunc = (pointIndices, getValue) => {
let result = Infinity;
for (const i of pointIndices) {
const value = getValue(i);
if (value < result) {
result = value;
}
}
return result;
};

const max: AggregationFunc = (pointIndices, getValue) => {
let result = -Infinity;
for (const i of pointIndices) {
const value = getValue(i);
if (value > result) {
result = value;
}
}
return result;
};

const AGGREGATION_FUNC: Record<AggregationOperation, AggregationFunc> = {
COUNT: count,
SUM: sum,
MEAN: mean,
MIN: min,
MAX: max
} as const;

/**
* Performs the aggregation step. See interface Aggregator comments.
* @returns Floa32Array of aggregated values, one for each bin, and the [min,max] of the values
*/
export function aggregate({
bins,
getValue,
operation
}: {
/** Data points sorted by bins */
bins: Bin[];
/** Given the index of a data point, returns its value */
getValue: (index: number) => number;
/** Method used to reduce a list of values to one number */
operation: AggregationOperation;
}): {
value: Float32Array;
domain: [min: number, max: number];
} {
const target = new Float32Array(bins.length);
let min = Infinity;
let max = -Infinity;

const aggregationFunc = AGGREGATION_FUNC[operation];

for (let j = 0; j < bins.length; j++) {
const {points} = bins[j];
target[j] = aggregationFunc(points, getValue);
if (target[j] < min) min = target[j];
if (target[j] > max) max = target[j];
}

return {value: target, domain: [min, max]};
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,182 @@
import type {Aggregator, AggregationProps, AggregatedBin} from '../aggregator';
import {_deepEqual as deepEqual, BinaryAttribute} from '@deck.gl/core';
import {sortBins, packBinIds} from './sort-bins';
import {aggregate} from './aggregate';
import {VertexAccessor, evaluateVertexAccessor} from './vertex-accessor';

/** Options used to construct a new CPUAggregator */
export type CPUAggregatorProps = {
/** Size of bin IDs */
dimensions: number;
/** Accessor to map each data point to a bin ID.
* If dimensions=1, bin ID should be a number;
* If dimensions>1, bin ID should be an array with [dimensions] elements;
* The data point will be skipped if bin ID is null.
*/
getBin: VertexAccessor<number[] | null, any>;
/** Accessor to map each data point to a weight value, defined per channel */
getValue: VertexAccessor<number>[];
} & Partial<CPUAggregationProps>;

/** Props used to run CPU aggregation, can be changed at any time */
type CPUAggregationProps = AggregationProps & {};

export type Bin = {
id: number[];
index: number;
/** list of data point indices */
points: number[];
};

/** An Aggregator implementation that calculates aggregation on the CPU */
export class CPUAggregator implements Aggregator {
readonly dimensions: number;
readonly channelCount: number;

props: CPUAggregatorProps & CPUAggregationProps;

/** Dirty flag
* If true, redo sorting
* If array, redo aggregation on the specified channel
*/
protected needsUpdate: boolean[] | boolean;

protected bins: Bin[] = [];
protected binIds: Float32Array | null = null;
protected results: {value: Float32Array; domain: [min: number, max: number]}[] = [];

constructor(props: CPUAggregatorProps) {
this.dimensions = props.dimensions;
this.channelCount = props.getValue.length;
this.props = {
...props,
binOptions: {},
pointCount: 0,
operations: [],
attributes: {}
};
this.needsUpdate = true;
this.setProps(props);
}

destroy() {}

get binCount() {
return this.bins.length;
}

/** Update aggregation props */
setProps(props: Partial<CPUAggregationProps>) {
const oldProps = this.props;

if (props.binOptions) {
if (!deepEqual(props.binOptions, oldProps.binOptions, 2)) {
this.setNeedsUpdate();
}
}
if (props.operations) {
for (let channel = 0; channel < this.channelCount; channel++) {
if (props.operations[channel] !== oldProps.operations[channel]) {
this.setNeedsUpdate(channel);
}
}
}
if (props.pointCount !== undefined && props.pointCount !== oldProps.pointCount) {
this.setNeedsUpdate();
}
if (props.attributes) {
props.attributes = {...oldProps.attributes, ...props.attributes};
}
Object.assign(this.props, props);
}

/** Flags a channel to need update
* This is called internally by setProps() if certain props change
* Users of this class still need to manually set the dirty flag sometimes, because even if no props changed
* the underlying buffers could have been updated and require rerunning the aggregation
* @param {number} channel - mark the given channel as dirty. If not provided, all channels will be updated.
*/
setNeedsUpdate(channel?: number): void {
if (channel === undefined) {
this.needsUpdate = true;
} else if (this.needsUpdate !== true) {
this.needsUpdate = this.needsUpdate || [];
this.needsUpdate[channel] = true;
}
}

/** Run aggregation */
update() {
if (this.needsUpdate === true) {
this.bins = sortBins({
pointCount: this.props.pointCount,
getBinId: evaluateVertexAccessor(
this.props.getBin,
this.props.attributes,
this.props.binOptions
)
});
this.binIds = packBinIds({
bins: this.bins,
dimensions: this.dimensions
});
}
for (let channel = 0; channel < this.channelCount; channel++) {
if (this.needsUpdate === true || this.needsUpdate[channel]) {
this.results[channel] = aggregate({
bins: this.bins,
getValue: evaluateVertexAccessor(
this.props.getValue[channel],
this.props.attributes,
undefined
),
operation: this.props.operations[channel]
});
}
}
this.needsUpdate = false;
}

preDraw() {}

/** Returns an accessor to the bins. */
getBins(): BinaryAttribute | null {
if (!this.binIds) {
return null;
}
return {value: this.binIds, type: 'float32', size: this.dimensions};
}

/** Returns an accessor to the output for a given channel. */
getResult(channel: number): BinaryAttribute | null {
const result = this.results[channel];
if (!result) {
return null;
}
return {value: result.value, type: 'float32', size: 1};
}

/** Returns the [min, max] of aggregated values for a given channel. */
getResultDomain(channel: number): [min: number, max: number] {
return this.results[channel]?.domain ?? [Infinity, -Infinity];
}

/** Returns the information for a given bin. */
getBin(index: number): AggregatedBin | null {
const bin = this.bins[index];
if (!bin) {
return null;
}
const value = new Array(this.channelCount);
for (let i = 0; i < value.length; i++) {
const result = this.results[i];
value[i] = result?.value[index];
}
return {
id: bin.id,
value,
count: bin.points.length,
pointIndices: bin.points
};
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,53 @@
import type {Bin} from './cpu-aggregator';

/** Group data points into bins */
export function sortBins({
pointCount,
getBinId
}: {
pointCount: number;
getBinId: (index: number) => number[] | null;
}): Bin[] {
const binsById: Map<string, Bin> = new Map();

for (let i = 0; i < pointCount; i++) {
const id = getBinId(i);
if (id === null) {
continue;
}
let bin = binsById.get(String(id));
if (bin) {
bin.points.push(i);
} else {
bin = {
id,
index: binsById.size,
points: [i]
};
binsById.set(String(id), bin);
}
}
return Array.from(binsById.values());
}

/** Pack bin ids into a typed array */
export function packBinIds({
bins,
dimensions
}: {
bins: Bin[];
/** Size of bin IDs */
dimensions: number;
}): Float32Array {
const target = new Float32Array(bins.length * dimensions);

for (let i = 0; i < bins.length; i++) {
const {id} = bins[i];
if (Array.isArray(id)) {
target.set(id, i * dimensions);
} else {
target[i] = id;
}
}
return target;
}
Loading
Loading