diff --git a/client/lod_worker.js b/client/lod_worker.js new file mode 100644 index 0000000..2f382f8 --- /dev/null +++ b/client/lod_worker.js @@ -0,0 +1,44 @@ +let thread_id = null; +let exports = null; + +async function init(tid, memory, heap_base) { + thread_id = tid; + + const result = await WebAssembly.instantiateStreaming(fetch('wasm/lod.wasm'), { + env: { 'memory': memory } + }); + + exports = result.instance.exports; + exports.set_sp(heap_base - thread_id * 16 * 4096); // 64K stack + + postMessage({ 'type': 'init_done' }); +} + +function work(indices_base, indices_count, zoom, offsets) { + exports.do_lod( + indices_base, indices_count, zoom, + offsets['coords_from'], + offsets['line_threshold'], + offsets['xs'], + offsets['ys'], + offsets['pressures'], + offsets['result_buffers'] + thread_id * 4, + offsets['result_counts'] + thread_id * 4, + ); + + postMessage({ 'type': 'lod_done' }); +} + +onmessage = (e) => { + const d = e.data; + + if (d.type === 'init') { + init(d.thread_id, d.memory, d.heap_base); + } else if (d.type === 'lod') { + work(d.indices_base, d.indices_count, d.zoom, d.offsets); + } else { + console.error('unexpected worker command:', d.type); + } +} + + diff --git a/client/speed.js b/client/speed.js index bc91a27..9cfbaa6 100644 --- a/client/speed.js +++ b/client/speed.js @@ -1,74 +1,65 @@ -async function init_test() { - const memory = new WebAssembly.Memory({ - initial: 32, - maximum: 100, - shared: true, - }); - - const results = await WebAssembly.instantiateStreaming(fetch('wasm/multi.wasm'), { - env: { 'memory': memory } +function worker_message(worker, message) { + return new Promise((resolve) => { + worker.onmessage = (e) => resolve(e.data); + worker.postMessage(message); }); +} - const nworkers = navigator.hardwareConcurrency; - const heap_base = results.instance.exports.alloc(0); - const buf_offset = results.instance.exports.alloc(1024); - const workers = []; - const sab = new SharedArrayBuffer(nworkers * 4); - const flags = new Int32Array(sab); - let done = 0; +function workers_messages(workers, messages) { + const promises = []; - for (let i = 0; i < nworkers; ++i) { - const w = new Worker('wasm_worker.js'); - workers.push(w); + for (let i = 0; i < workers.length; ++i) { + promises.push(worker_message(workers[i], messages[i])); } - for (let i = 0; i < nworkers; ++i) { - workers[i].onmessage = () => { - ++done; - if (done % nworkers === 0) { - console.log('init done'); - - for (let j = 0; j < nworkers; ++j) { - workers[j].onmessage = () => { - ++done; - - if (done % nworkers === 0) { - console.log('work done'); - console.log(new Int32Array(memory.buffer, buf_offset, nworkers)); - } - } - - workers[j].postMessage({ - 'type': 1, - 'num': 10, - }); - } - } - } + return Promise.all(promises); +} - workers[i].postMessage({ - 'type': 0, - 'thread_id': i, - 'memory': memory, - 'stack_base': heap_base, - 'buffer_offset': buf_offset, - 'flags': flags - }); - } +function workers_thread_message(workers, message, thread_field=null) { + const messages = []; -// const results = await WebAssembly.instantiateStreaming(fetch('wasm/multi.wasm')); + for (let i = 0; i < workers.length; ++i) { + if (thread_field !== null) { + const m = structuredClone(message); + m[thread_field] = i; + messages.push(m); + } else { + messages.push(message); + } + } -// state.wasm.exports = results.instance.exports; -// state.wasm.exports.memory.grow(4096); + return workers_messages(workers, messages); } async function init_wasm(state) { - await init_test(); + const memory = new WebAssembly.Memory({ + initial: 32, // 2MiB, 1MiB of which is stack + maximum: 16384, // 1GiB + shared: true, + }); - const results = await WebAssembly.instantiateStreaming(fetch('wasm/lod.wasm')); + // "Master thread" to do maintance on (static allocations, merging results etc) + const master_wasm = await WebAssembly.instantiateStreaming(fetch('wasm/lod.wasm'), { + env: { 'memory': memory } + }); - state.wasm.exports = results.instance.exports; - state.wasm.exports.memory.grow(4096); + const nworkers = navigator.hardwareConcurrency; + + state.wasm.exports = master_wasm.instance.exports; + state.wasm.heap_base = state.wasm.exports.alloc_static(0); + state.wasm.workers = []; + state.wasm.memory = memory; + + for (let i = 0; i < nworkers; ++i) { + const w = new Worker('lod_worker.js'); + state.wasm.workers.push(w); + } + + await workers_thread_message(state.wasm.workers, { + 'type': 'init', + 'heap_base': state.wasm.heap_base, + 'memory': memory, + }, 'thread_id'); state.wasm.stroke_bytes = 4096; state.wasm.coords_bytes = 4096; @@ -95,7 +86,7 @@ async function init_wasm(state) { }, }; - const mem = state.wasm.exports.memory.buffer; + const mem = state.wasm.memory.buffer; state.wasm.buffers['xs'].tv = tv_create_on(Float32Array, state.wasm.coords_bytes / 8, mem, state.wasm.buffers['xs'].offset); @@ -135,11 +126,14 @@ function wasm_ensure_by(state, nstrokes, ncoords) { } if (realloc) { - // TODO: we do memory.grow() somewhere here if needed + const current_pages = state.wasm.memory.buffer.byteLength / (4096 * 16); + const need_pages = (state.wasm.coords_bytes * 3 + state.wasm.stroke_bytes * 2) / (4096 * 16); // TODO: figure out actual memory requirements + const grow_by = need_pages - current_pages; + state.wasm.memory.grow(grow_by); state.wasm.exports.free_static(); - const mem = state.wasm.exports.memory.buffer; + const mem = state.wasm.memory.buffer; const memv = new Uint8Array(mem); buffers['xs'].offset = state.wasm.exports.alloc_static(state.wasm.coords_bytes / 2); @@ -175,35 +169,65 @@ function wasm_ensure_by(state, nstrokes, ncoords) { } } -function do_lod(state, context) { +async function do_lod(state, context) { state.wasm.exports.free_dynamic(); + const buffers = state.wasm.buffers; + const result_buffers = state.wasm.exports.alloc_dynamic(state.wasm.workers.length * 4); + const result_counts = state.wasm.exports.alloc_dynamic(state.wasm.workers.length * 4); const clipped_indices = state.wasm.exports.alloc_dynamic(context.clipped_indices.size * 4); - const mem = new Uint8Array(state.wasm.exports.memory.buffer); + const mem = new Uint8Array(state.wasm.memory.buffer); // Dynamic input data that should (by design) never be too big mem.set(tv_bytes(context.clipped_indices), clipped_indices); - const buffers = state.wasm.buffers; - const segment_count = state.wasm.exports.do_lod( - clipped_indices, context.clipped_indices.size, state.canvas.zoom, - buffers['coords_from'].offset, - buffers['line_threshold'].offset, - buffers['xs'].offset, - buffers['ys'].offset, - buffers['pressures'].offset, - buffers['xs'].used / 4, + // TODO: this is a very naive and dumb way of distributing work. Better way + // would be to distrubute strokes based on total point count, so that + // each worker gets approximately the same amout of _points_ + const indices_per_thread = Math.floor(context.clipped_indices.size / state.wasm.workers.length); + const offsets = { + 'coords_from': buffers['coords_from'].offset, + 'line_threshold': buffers['line_threshold'].offset, + 'xs': buffers['xs'].offset, + 'ys': buffers['ys'].offset, + 'pressures': buffers['pressures'].offset, + 'result_buffers': result_buffers, + 'result_counts': result_counts, + }; + + const jobs = []; + + for (let i = 0; i < state.wasm.workers.length; ++i) { + let count = indices_per_thread; + + if (i === state.wasm.workers.length - 1) { + count += context.clipped_indices.size % state.wasm.workers.length; + } + + jobs.push({ + 'type': 'lod', + 'indices_base': clipped_indices + i * 4 * indices_per_thread, + 'indices_count': count, + 'zoom': state.canvas.zoom, + 'offsets': offsets + }); + } + + await workers_messages(state.wasm.workers, jobs); + const result_offset = state.wasm.exports.merge_results( + result_counts, + result_buffers, + state.wasm.workers.length ); - // Use results without copying from WASM memory - const result_offset = clipped_indices + context.clipped_indices.size * 4 - + (context.clipped_indices.size + 1) * 4 + buffers['xs'].used; + const segment_count = new Int32Array(state.wasm.memory.buffer, result_counts, 1)[0]; // by convention - const wasm_points = new Float32Array(state.wasm.exports.memory.buffer, + // Use results without copying from WASM memory + const wasm_points = new Float32Array(state.wasm.memory.buffer, result_offset, segment_count * 2); - const wasm_ids = new Uint32Array(state.wasm.exports.memory.buffer, + const wasm_ids = new Uint32Array(state.wasm.memory.buffer, result_offset + segment_count * 2 * 4, segment_count); - const wasm_pressures = new Uint8Array(state.wasm.exports.memory.buffer, + const wasm_pressures = new Uint8Array(state.wasm.memory.buffer, result_offset + segment_count * 2 * 4 + segment_count * 4, segment_count); context.instance_data_points.data = wasm_points; diff --git a/client/wasm/compile_command b/client/wasm/compile_command index b496251..412a79b 100644 --- a/client/wasm/compile_command +++ b/client/wasm/compile_command @@ -1 +1 @@ -clang -Oz --target=wasm32 -nostdlib -msimd128 -mbulk-memory -matomics -Wl,--no-entry,--import-memory,--shared-memory,--max-memory=$((1024 * 1024 * 1024)) -z stack-size=$((1024 * 1024)) multi.c -o multi.wasm +clang -Oz --target=wasm32 -nostdlib -msimd128 -mbulk-memory -matomics -Wl,--no-entry,--import-memory,--shared-memory,--export-all,--max-memory=$((1024 * 1024 * 1024)) -z stack-size=$((1024 * 1024)) lod.c -o lod.wasm diff --git a/client/wasm/lod.c b/client/wasm/lod.c index bed8a70..fa7dc81 100644 --- a/client/wasm/lod.c +++ b/client/wasm/lod.c @@ -8,10 +8,11 @@ static int allocated_static; static int allocated_dynamic; void -set_sp(void *sp) +set_sp(char *sp) { - __asm__ volatile( - "local.get 0\n" + __asm__ __volatile__( + ".globaltype __stack_pointer, i32\n" + "local.get %0\n" "global.set __stack_pointer\n" : : "r"(sp) ); @@ -32,16 +33,26 @@ free_dynamic(void) void * alloc_static(int size) { + // This IS NOT thread-safe void *result = &__heap_base + allocated_static; allocated_static += size; return(result); } +static int +round_to_pow2(int value, int multiple) +{ + return((value + multiple - 1) & -multiple); +} + void * alloc_dynamic(int size) { - void *result = &__heap_base + allocated_static + allocated_dynamic; - allocated_dynamic += size; + // Very ad-van-ced thread-safe allocator + // CAN be called from multiple threads + size = round_to_pow2(size, 4); + int original_allocated_dynamic = __atomic_fetch_add(&allocated_dynamic, size, __ATOMIC_SEQ_CST); + void *result = &__heap_base + allocated_static + original_allocated_dynamic; return(result); } @@ -183,21 +194,27 @@ rdp_find_max(float *xs, float *ys, unsigned char *pressures, float zoom, int coo return(result); } -int +void do_lod(int *clipped_indices, int clipped_count, float zoom, int *stroke_coords_from, float *line_threshold, float *xs, float *ys, unsigned char *pressures, - int coordinates_count) + char **result_buffer, + int *result_count) { if (clipped_count == 0) { - return(0); + result_count[0] = 0; + return; } + int first_stroke = clipped_indices[0]; + int last_stroke = clipped_indices[clipped_count - 1]; + int total_points = stroke_coords_from[last_stroke + 1] - stroke_coords_from[first_stroke]; + int *segments_from = alloc_dynamic((clipped_count + 1) * 4); - int *segments = alloc_dynamic(coordinates_count * 4); + int *segments = alloc_dynamic(total_points * 4); // TODO: this is a very conservative estimate, we can lower memory usage if we get this tighter int segments_head = 0; int stack[4096]; // TODO: what's a reasonable max size for this? @@ -267,9 +284,11 @@ do_lod(int *clipped_indices, int clipped_count, float zoom, segments_from[clipped_count] = segments_head; // Write actual coordinates (points) and stroke ids - float *points = alloc_dynamic(segments_head * 2 * 4); - int *ids = alloc_dynamic(segments_head * 4); - unsigned char *pressures_res = alloc_dynamic(segments_head); + // Do this in one allocation so that they're not interleaved between threads + char *output = alloc_dynamic(segments_head * (3 * 4 + 1)); + float *points = (float *) output; + int *ids = (int *) (output + segments_head * 4 * 2); + unsigned char *pressures_res = (unsigned char *) (output + segments_head * 4 * 3); int phead = 0; int ihead = 0; @@ -300,5 +319,40 @@ do_lod(int *clipped_indices, int clipped_count, float zoom, } } - return(segments_head); + result_buffer[0] = output; + result_count[0] = segments_head; +} + +// NOT thread-safe, only call from one thread +char * +merge_results(int *segment_counts, char **buffers, int nthreads) +{ + int total_segments = 0; + + for (int i = 0; i < nthreads; ++i) { + total_segments += segment_counts[i]; + } + + char *merged = alloc_dynamic(total_segments * (3 * 4 + 1)); + + float *points = (float *) merged; + int *ids = (int *) (merged + total_segments * 4 * 2); + unsigned char *pressures = (unsigned char *) (merged + total_segments * 4 * 3); + + for (int i = 0; i < nthreads; ++i) { + int segments = segment_counts[i]; + if (segments > 0) { + __builtin_memcpy(points, buffers[i], segments * 4 * 2); + __builtin_memcpy(ids, buffers[i] + segments * 4 * 2, segments * 4); + __builtin_memcpy(pressures, buffers[i] + segments * 4 * 3, segments); + + points += segments * 2; + ids += segments; + pressures += segments; + } + } + + segment_counts[0] = total_segments; + + return(merged); } diff --git a/client/wasm/lod.wasm b/client/wasm/lod.wasm index 7f218d7..1a066e7 100755 Binary files a/client/wasm/lod.wasm and b/client/wasm/lod.wasm differ diff --git a/client/wasm/multi.c b/client/wasm/multi.c deleted file mode 100644 index 20765cb..0000000 --- a/client/wasm/multi.c +++ /dev/null @@ -1,37 +0,0 @@ -#include - -extern char __heap_base; - -static int allocated; - -void -set_sp(char *sp) -{ - __asm__ __volatile__( - ".globaltype __stack_pointer, i32\n" - "local.get %0\n" - "global.set __stack_pointer\n" - : : "r"(sp) - ); -} - -void * -alloc(int size) -{ - void *result = &__heap_base + allocated; - allocated += size; - return(result); -} - -static void -impl(int *buffer, int index, int number) -{ - buffer[index] = number; -} - -void -write_a_number(int *buffer, int index, int number) -{ - int n = number * 2; - impl(buffer, index, n); -} diff --git a/client/wasm/multi.wasm b/client/wasm/multi.wasm deleted file mode 100755 index 01e32fb..0000000 Binary files a/client/wasm/multi.wasm and /dev/null differ diff --git a/client/wasm_worker.js b/client/wasm_worker.js deleted file mode 100644 index 0e0ff7c..0000000 --- a/client/wasm_worker.js +++ /dev/null @@ -1,40 +0,0 @@ -let thread_id = null; -let buf_offset = null; -let exports = null; -let flags = null; - -function done() { - postMessage('done'); -} - -async function init_wasm(tid, memory, offset, notify_flags, stack_base) { - thread_id = tid; - buf_offset = offset; - - const result = await WebAssembly.instantiateStreaming(fetch('wasm/multi.wasm'), { - env: { 'memory': memory } - }); - - exports = result.instance.exports; - //console.log(tid, 'init'); - exports.set_sp(stack_base - thread_id * 4096); - - flags = notify_flags; - done(); -} - -function do_work(num, callback) { - //console.log(thread_id, 'work'); - exports.write_a_number(buf_offset, thread_id, thread_id * num); - done(); -} - -onmessage = (e) => { - if (e.data.type === 0) { - init_wasm(e.data.thread_id, e.data.memory, e.data.buffer_offset, e.data.flags, e.data.stack_base); - } else if (e.data.type === 1) { - do_work(e.data.num); - } -} - - diff --git a/client/webgl_draw.js b/client/webgl_draw.js index ee6f51d..546799c 100644 --- a/client/webgl_draw.js +++ b/client/webgl_draw.js @@ -1,7 +1,7 @@ function schedule_draw(state, context) { if (!state.timers.raf) { - window.requestAnimationFrame(() => { - draw(state, context); + window.requestAnimationFrame(async () => { + await draw(state, context); }); state.timers.raf = true; } @@ -73,7 +73,7 @@ function draw_html(state) { } } -function draw(state, context) { +async function draw(state, context) { const cpu_before = performance.now(); state.timers.raf = false; @@ -89,22 +89,23 @@ function draw(state, context) { gl.beginQuery(context.gpu_timer_ext.TIME_ELAPSED_EXT, query); } - gl.viewport(0, 0, context.canvas.width, context.canvas.height); - gl.clearColor(context.bgcolor.r, context.bgcolor.g, context.bgcolor.b, 1); - gl.clearDepth(0.0); - gl.clear(gl.COLOR_BUFFER_BIT | gl.DEPTH_BUFFER_BIT); - locations = context.locations['sdf'].main; buffers = context.buffers['sdf']; - gl.useProgram(context.programs['sdf'].main); - bvh_clip(state, context); - const segment_count = geometry_write_instances(state, context); + const segment_count = await geometry_write_instances(state, context); const dynamic_segment_count = context.dynamic_segment_count; const dynamic_stroke_count = context.dynamic_stroke_count; + // Only clear once we have the data, this might not always be on the same frame? + gl.viewport(0, 0, context.canvas.width, context.canvas.height); + gl.clearColor(context.bgcolor.r, context.bgcolor.g, context.bgcolor.b, 1); + gl.clearDepth(0.0); + gl.clear(gl.COLOR_BUFFER_BIT | gl.DEPTH_BUFFER_BIT); + + gl.useProgram(context.programs['sdf'].main); + // "Static" data upload if (segment_count > 0) { const total_static_size = context.instance_data_points.size * 4 + diff --git a/client/webgl_geometry.js b/client/webgl_geometry.js index 7d7bce4..ad22b65 100644 --- a/client/webgl_geometry.js +++ b/client/webgl_geometry.js @@ -37,11 +37,11 @@ function geometry_prepare_stroke(state) { } -function geometry_write_instances(state, context) { +async function geometry_write_instances(state, context, callback) { state.stats.rdp_max_count = 0; state.stats.rdp_segments = 0; - const segment_count = do_lod(state, context); + const segment_count = await do_lod(state, context); if (config.debug_print) console.debug('instances:', segment_count, 'rdp max:', state.stats.rdp_max_count, 'rdp segments:', state.stats.rdp_segments);