Browse Source

Multithreading works! Kinda sorta (slows down in chrome, out of bounds accesses on phone)

ssao
A.Olokhtonov 8 months ago
parent
commit
4f84509b16
  1. 44
      client/lod_worker.js
  2. 176
      client/speed.js
  3. 2
      client/wasm/compile_command
  4. 80
      client/wasm/lod.c
  5. BIN
      client/wasm/lod.wasm
  6. 37
      client/wasm/multi.c
  7. BIN
      client/wasm/multi.wasm
  8. 40
      client/wasm_worker.js
  9. 23
      client/webgl_draw.js
  10. 4
      client/webgl_geometry.js

44
client/lod_worker.js

@ -0,0 +1,44 @@ @@ -0,0 +1,44 @@
let thread_id = null;
let exports = null;
async function init(tid, memory, heap_base) {
thread_id = tid;
const result = await WebAssembly.instantiateStreaming(fetch('wasm/lod.wasm'), {
env: { 'memory': memory }
});
exports = result.instance.exports;
exports.set_sp(heap_base - thread_id * 16 * 4096); // 64K stack
postMessage({ 'type': 'init_done' });
}
function work(indices_base, indices_count, zoom, offsets) {
exports.do_lod(
indices_base, indices_count, zoom,
offsets['coords_from'],
offsets['line_threshold'],
offsets['xs'],
offsets['ys'],
offsets['pressures'],
offsets['result_buffers'] + thread_id * 4,
offsets['result_counts'] + thread_id * 4,
);
postMessage({ 'type': 'lod_done' });
}
onmessage = (e) => {
const d = e.data;
if (d.type === 'init') {
init(d.thread_id, d.memory, d.heap_base);
} else if (d.type === 'lod') {
work(d.indices_base, d.indices_count, d.zoom, d.offsets);
} else {
console.error('unexpected worker command:', d.type);
}
}

176
client/speed.js

@ -1,74 +1,65 @@ @@ -1,74 +1,65 @@
async function init_test() {
const memory = new WebAssembly.Memory({
initial: 32,
maximum: 100,
shared: true,
function worker_message(worker, message) {
return new Promise((resolve) => {
worker.onmessage = (e) => resolve(e.data);
worker.postMessage(message);
});
}
const results = await WebAssembly.instantiateStreaming(fetch('wasm/multi.wasm'), {
env: { 'memory': memory }
});
function workers_messages(workers, messages) {
const promises = [];
const nworkers = navigator.hardwareConcurrency;
const heap_base = results.instance.exports.alloc(0);
const buf_offset = results.instance.exports.alloc(1024);
const workers = [];
const sab = new SharedArrayBuffer(nworkers * 4);
const flags = new Int32Array(sab);
let done = 0;
for (let i = 0; i < nworkers; ++i) {
const w = new Worker('wasm_worker.js');
workers.push(w);
for (let i = 0; i < workers.length; ++i) {
promises.push(worker_message(workers[i], messages[i]));
}
for (let i = 0; i < nworkers; ++i) {
workers[i].onmessage = () => {
++done;
if (done % nworkers === 0) {
console.log('init done');
for (let j = 0; j < nworkers; ++j) {
workers[j].onmessage = () => {
++done;
if (done % nworkers === 0) {
console.log('work done');
console.log(new Int32Array(memory.buffer, buf_offset, nworkers));
}
}
return Promise.all(promises);
}
workers[j].postMessage({
'type': 1,
'num': 10,
});
}
function workers_thread_message(workers, message, thread_field=null) {
const messages = [];
for (let i = 0; i < workers.length; ++i) {
if (thread_field !== null) {
const m = structuredClone(message);
m[thread_field] = i;
messages.push(m);
} else {
messages.push(message);
}
}
workers[i].postMessage({
'type': 0,
'thread_id': i,
'memory': memory,
'stack_base': heap_base,
'buffer_offset': buf_offset,
'flags': flags
return workers_messages(workers, messages);
}
async function init_wasm(state) {
const memory = new WebAssembly.Memory({
initial: 32, // 2MiB, 1MiB of which is stack
maximum: 16384, // 1GiB
shared: true,
});
}
// const results = await WebAssembly.instantiateStreaming(fetch('wasm/multi.wasm'));
// "Master thread" to do maintance on (static allocations, merging results etc)
const master_wasm = await WebAssembly.instantiateStreaming(fetch('wasm/lod.wasm'), {
env: { 'memory': memory }
});
// state.wasm.exports = results.instance.exports;
// state.wasm.exports.memory.grow(4096);
}
const nworkers = navigator.hardwareConcurrency;
async function init_wasm(state) {
await init_test();
state.wasm.exports = master_wasm.instance.exports;
state.wasm.heap_base = state.wasm.exports.alloc_static(0);
state.wasm.workers = [];
state.wasm.memory = memory;
const results = await WebAssembly.instantiateStreaming(fetch('wasm/lod.wasm'));
for (let i = 0; i < nworkers; ++i) {
const w = new Worker('lod_worker.js');
state.wasm.workers.push(w);
}
state.wasm.exports = results.instance.exports;
state.wasm.exports.memory.grow(4096);
await workers_thread_message(state.wasm.workers, {
'type': 'init',
'heap_base': state.wasm.heap_base,
'memory': memory,
}, 'thread_id');
state.wasm.stroke_bytes = 4096;
state.wasm.coords_bytes = 4096;
@ -95,7 +86,7 @@ async function init_wasm(state) { @@ -95,7 +86,7 @@ async function init_wasm(state) {
},
};
const mem = state.wasm.exports.memory.buffer;
const mem = state.wasm.memory.buffer;
state.wasm.buffers['xs'].tv = tv_create_on(Float32Array, state.wasm.coords_bytes / 8,
mem, state.wasm.buffers['xs'].offset);
@ -135,11 +126,14 @@ function wasm_ensure_by(state, nstrokes, ncoords) { @@ -135,11 +126,14 @@ function wasm_ensure_by(state, nstrokes, ncoords) {
}
if (realloc) {
// TODO: we do memory.grow() somewhere here if needed
const current_pages = state.wasm.memory.buffer.byteLength / (4096 * 16);
const need_pages = (state.wasm.coords_bytes * 3 + state.wasm.stroke_bytes * 2) / (4096 * 16); // TODO: figure out actual memory requirements
const grow_by = need_pages - current_pages;
state.wasm.memory.grow(grow_by);
state.wasm.exports.free_static();
const mem = state.wasm.exports.memory.buffer;
const mem = state.wasm.memory.buffer;
const memv = new Uint8Array(mem);
buffers['xs'].offset = state.wasm.exports.alloc_static(state.wasm.coords_bytes / 2);
@ -175,35 +169,65 @@ function wasm_ensure_by(state, nstrokes, ncoords) { @@ -175,35 +169,65 @@ function wasm_ensure_by(state, nstrokes, ncoords) {
}
}
function do_lod(state, context) {
async function do_lod(state, context) {
state.wasm.exports.free_dynamic();
const buffers = state.wasm.buffers;
const result_buffers = state.wasm.exports.alloc_dynamic(state.wasm.workers.length * 4);
const result_counts = state.wasm.exports.alloc_dynamic(state.wasm.workers.length * 4);
const clipped_indices = state.wasm.exports.alloc_dynamic(context.clipped_indices.size * 4);
const mem = new Uint8Array(state.wasm.exports.memory.buffer);
const mem = new Uint8Array(state.wasm.memory.buffer);
// Dynamic input data that should (by design) never be too big
mem.set(tv_bytes(context.clipped_indices), clipped_indices);
const buffers = state.wasm.buffers;
const segment_count = state.wasm.exports.do_lod(
clipped_indices, context.clipped_indices.size, state.canvas.zoom,
buffers['coords_from'].offset,
buffers['line_threshold'].offset,
buffers['xs'].offset,
buffers['ys'].offset,
buffers['pressures'].offset,
buffers['xs'].used / 4,
// TODO: this is a very naive and dumb way of distributing work. Better way
// would be to distrubute strokes based on total point count, so that
// each worker gets approximately the same amout of _points_
const indices_per_thread = Math.floor(context.clipped_indices.size / state.wasm.workers.length);
const offsets = {
'coords_from': buffers['coords_from'].offset,
'line_threshold': buffers['line_threshold'].offset,
'xs': buffers['xs'].offset,
'ys': buffers['ys'].offset,
'pressures': buffers['pressures'].offset,
'result_buffers': result_buffers,
'result_counts': result_counts,
};
const jobs = [];
for (let i = 0; i < state.wasm.workers.length; ++i) {
let count = indices_per_thread;
if (i === state.wasm.workers.length - 1) {
count += context.clipped_indices.size % state.wasm.workers.length;
}
jobs.push({
'type': 'lod',
'indices_base': clipped_indices + i * 4 * indices_per_thread,
'indices_count': count,
'zoom': state.canvas.zoom,
'offsets': offsets
});
}
await workers_messages(state.wasm.workers, jobs);
const result_offset = state.wasm.exports.merge_results(
result_counts,
result_buffers,
state.wasm.workers.length
);
// Use results without copying from WASM memory
const result_offset = clipped_indices + context.clipped_indices.size * 4
+ (context.clipped_indices.size + 1) * 4 + buffers['xs'].used;
const segment_count = new Int32Array(state.wasm.memory.buffer, result_counts, 1)[0]; // by convention
const wasm_points = new Float32Array(state.wasm.exports.memory.buffer,
// Use results without copying from WASM memory
const wasm_points = new Float32Array(state.wasm.memory.buffer,
result_offset, segment_count * 2);
const wasm_ids = new Uint32Array(state.wasm.exports.memory.buffer,
const wasm_ids = new Uint32Array(state.wasm.memory.buffer,
result_offset + segment_count * 2 * 4, segment_count);
const wasm_pressures = new Uint8Array(state.wasm.exports.memory.buffer,
const wasm_pressures = new Uint8Array(state.wasm.memory.buffer,
result_offset + segment_count * 2 * 4 + segment_count * 4, segment_count);
context.instance_data_points.data = wasm_points;

2
client/wasm/compile_command

@ -1 +1 @@ @@ -1 +1 @@
clang -Oz --target=wasm32 -nostdlib -msimd128 -mbulk-memory -matomics -Wl,--no-entry,--import-memory,--shared-memory,--max-memory=$((1024 * 1024 * 1024)) -z stack-size=$((1024 * 1024)) multi.c -o multi.wasm
clang -Oz --target=wasm32 -nostdlib -msimd128 -mbulk-memory -matomics -Wl,--no-entry,--import-memory,--shared-memory,--export-all,--max-memory=$((1024 * 1024 * 1024)) -z stack-size=$((1024 * 1024)) lod.c -o lod.wasm

80
client/wasm/lod.c

@ -8,10 +8,11 @@ static int allocated_static; @@ -8,10 +8,11 @@ static int allocated_static;
static int allocated_dynamic;
void
set_sp(void *sp)
set_sp(char *sp)
{
__asm__ volatile(
"local.get 0\n"
__asm__ __volatile__(
".globaltype __stack_pointer, i32\n"
"local.get %0\n"
"global.set __stack_pointer\n"
: : "r"(sp)
);
@ -32,16 +33,26 @@ free_dynamic(void) @@ -32,16 +33,26 @@ free_dynamic(void)
void *
alloc_static(int size)
{
// This IS NOT thread-safe
void *result = &__heap_base + allocated_static;
allocated_static += size;
return(result);
}
static int
round_to_pow2(int value, int multiple)
{
return((value + multiple - 1) & -multiple);
}
void *
alloc_dynamic(int size)
{
void *result = &__heap_base + allocated_static + allocated_dynamic;
allocated_dynamic += size;
// Very ad-van-ced thread-safe allocator
// CAN be called from multiple threads
size = round_to_pow2(size, 4);
int original_allocated_dynamic = __atomic_fetch_add(&allocated_dynamic, size, __ATOMIC_SEQ_CST);
void *result = &__heap_base + allocated_static + original_allocated_dynamic;
return(result);
}
@ -183,21 +194,27 @@ rdp_find_max(float *xs, float *ys, unsigned char *pressures, float zoom, int coo @@ -183,21 +194,27 @@ rdp_find_max(float *xs, float *ys, unsigned char *pressures, float zoom, int coo
return(result);
}
int
void
do_lod(int *clipped_indices, int clipped_count, float zoom,
int *stroke_coords_from,
float *line_threshold,
float *xs,
float *ys,
unsigned char *pressures,
int coordinates_count)
char **result_buffer,
int *result_count)
{
if (clipped_count == 0) {
return(0);
result_count[0] = 0;
return;
}
int first_stroke = clipped_indices[0];
int last_stroke = clipped_indices[clipped_count - 1];
int total_points = stroke_coords_from[last_stroke + 1] - stroke_coords_from[first_stroke];
int *segments_from = alloc_dynamic((clipped_count + 1) * 4);
int *segments = alloc_dynamic(coordinates_count * 4);
int *segments = alloc_dynamic(total_points * 4); // TODO: this is a very conservative estimate, we can lower memory usage if we get this tighter
int segments_head = 0;
int stack[4096]; // TODO: what's a reasonable max size for this?
@ -267,9 +284,11 @@ do_lod(int *clipped_indices, int clipped_count, float zoom, @@ -267,9 +284,11 @@ do_lod(int *clipped_indices, int clipped_count, float zoom,
segments_from[clipped_count] = segments_head;
// Write actual coordinates (points) and stroke ids
float *points = alloc_dynamic(segments_head * 2 * 4);
int *ids = alloc_dynamic(segments_head * 4);
unsigned char *pressures_res = alloc_dynamic(segments_head);
// Do this in one allocation so that they're not interleaved between threads
char *output = alloc_dynamic(segments_head * (3 * 4 + 1));
float *points = (float *) output;
int *ids = (int *) (output + segments_head * 4 * 2);
unsigned char *pressures_res = (unsigned char *) (output + segments_head * 4 * 3);
int phead = 0;
int ihead = 0;
@ -300,5 +319,40 @@ do_lod(int *clipped_indices, int clipped_count, float zoom, @@ -300,5 +319,40 @@ do_lod(int *clipped_indices, int clipped_count, float zoom,
}
}
return(segments_head);
result_buffer[0] = output;
result_count[0] = segments_head;
}
// NOT thread-safe, only call from one thread
char *
merge_results(int *segment_counts, char **buffers, int nthreads)
{
int total_segments = 0;
for (int i = 0; i < nthreads; ++i) {
total_segments += segment_counts[i];
}
char *merged = alloc_dynamic(total_segments * (3 * 4 + 1));
float *points = (float *) merged;
int *ids = (int *) (merged + total_segments * 4 * 2);
unsigned char *pressures = (unsigned char *) (merged + total_segments * 4 * 3);
for (int i = 0; i < nthreads; ++i) {
int segments = segment_counts[i];
if (segments > 0) {
__builtin_memcpy(points, buffers[i], segments * 4 * 2);
__builtin_memcpy(ids, buffers[i] + segments * 4 * 2, segments * 4);
__builtin_memcpy(pressures, buffers[i] + segments * 4 * 3, segments);
points += segments * 2;
ids += segments;
pressures += segments;
}
}
segment_counts[0] = total_segments;
return(merged);
}

BIN
client/wasm/lod.wasm

Binary file not shown.

37
client/wasm/multi.c

@ -1,37 +0,0 @@ @@ -1,37 +0,0 @@
#include <wasm_simd128.h>
extern char __heap_base;
static int allocated;
void
set_sp(char *sp)
{
__asm__ __volatile__(
".globaltype __stack_pointer, i32\n"
"local.get %0\n"
"global.set __stack_pointer\n"
: : "r"(sp)
);
}
void *
alloc(int size)
{
void *result = &__heap_base + allocated;
allocated += size;
return(result);
}
static void
impl(int *buffer, int index, int number)
{
buffer[index] = number;
}
void
write_a_number(int *buffer, int index, int number)
{
int n = number * 2;
impl(buffer, index, n);
}

BIN
client/wasm/multi.wasm

Binary file not shown.

40
client/wasm_worker.js

@ -1,40 +0,0 @@ @@ -1,40 +0,0 @@
let thread_id = null;
let buf_offset = null;
let exports = null;
let flags = null;
function done() {
postMessage('done');
}
async function init_wasm(tid, memory, offset, notify_flags, stack_base) {
thread_id = tid;
buf_offset = offset;
const result = await WebAssembly.instantiateStreaming(fetch('wasm/multi.wasm'), {
env: { 'memory': memory }
});
exports = result.instance.exports;
//console.log(tid, 'init');
exports.set_sp(stack_base - thread_id * 4096);
flags = notify_flags;
done();
}
function do_work(num, callback) {
//console.log(thread_id, 'work');
exports.write_a_number(buf_offset, thread_id, thread_id * num);
done();
}
onmessage = (e) => {
if (e.data.type === 0) {
init_wasm(e.data.thread_id, e.data.memory, e.data.buffer_offset, e.data.flags, e.data.stack_base);
} else if (e.data.type === 1) {
do_work(e.data.num);
}
}

23
client/webgl_draw.js

@ -1,7 +1,7 @@ @@ -1,7 +1,7 @@
function schedule_draw(state, context) {
if (!state.timers.raf) {
window.requestAnimationFrame(() => {
draw(state, context);
window.requestAnimationFrame(async () => {
await draw(state, context);
});
state.timers.raf = true;
}
@ -73,7 +73,7 @@ function draw_html(state) { @@ -73,7 +73,7 @@ function draw_html(state) {
}
}
function draw(state, context) {
async function draw(state, context) {
const cpu_before = performance.now();
state.timers.raf = false;
@ -89,22 +89,23 @@ function draw(state, context) { @@ -89,22 +89,23 @@ function draw(state, context) {
gl.beginQuery(context.gpu_timer_ext.TIME_ELAPSED_EXT, query);
}
gl.viewport(0, 0, context.canvas.width, context.canvas.height);
gl.clearColor(context.bgcolor.r, context.bgcolor.g, context.bgcolor.b, 1);
gl.clearDepth(0.0);
gl.clear(gl.COLOR_BUFFER_BIT | gl.DEPTH_BUFFER_BIT);
locations = context.locations['sdf'].main;
buffers = context.buffers['sdf'];
gl.useProgram(context.programs['sdf'].main);
bvh_clip(state, context);
const segment_count = geometry_write_instances(state, context);
const segment_count = await geometry_write_instances(state, context);
const dynamic_segment_count = context.dynamic_segment_count;
const dynamic_stroke_count = context.dynamic_stroke_count;
// Only clear once we have the data, this might not always be on the same frame?
gl.viewport(0, 0, context.canvas.width, context.canvas.height);
gl.clearColor(context.bgcolor.r, context.bgcolor.g, context.bgcolor.b, 1);
gl.clearDepth(0.0);
gl.clear(gl.COLOR_BUFFER_BIT | gl.DEPTH_BUFFER_BIT);
gl.useProgram(context.programs['sdf'].main);
// "Static" data upload
if (segment_count > 0) {
const total_static_size = context.instance_data_points.size * 4 +

4
client/webgl_geometry.js

@ -37,11 +37,11 @@ function geometry_prepare_stroke(state) { @@ -37,11 +37,11 @@ function geometry_prepare_stroke(state) {
}
function geometry_write_instances(state, context) {
async function geometry_write_instances(state, context, callback) {
state.stats.rdp_max_count = 0;
state.stats.rdp_segments = 0;
const segment_count = do_lod(state, context);
const segment_count = await do_lod(state, context);
if (config.debug_print) console.debug('instances:', segment_count, 'rdp max:', state.stats.rdp_max_count, 'rdp segments:', state.stats.rdp_segments);

Loading…
Cancel
Save