// Post `message` to `worker` and resolve with the `data` of the next message
// the worker sends back. Rejects if the worker raises an `error` event.
//
// NOTE: the handlers are installed by assignment, so only one request per
// worker can be in flight at a time (a second call replaces the first one's
// handlers).
function worker_message(worker, message) {
    return new Promise((resolve, reject) => {
        worker.onmessage = (e) => resolve(e.data);
        // Without this, an uncaught exception inside the worker would leave
        // the promise (and every Promise.all waiting on it) pending forever.
        worker.onerror = (e) => {
            reject(new Error(`Worker error: ${e?.message ?? 'unknown'}`, { cause: e }));
        };
        worker.postMessage(message);
    });
}

// Send messages[i] to workers[i] and resolve with the array of replies
// (same order as `workers`) once every worker has answered.
function workers_messages(workers, messages) {
    return Promise.all(workers.map((worker, i) => worker_message(worker, messages[i])));
}

// Send the same `message` to every worker. When `thread_field` is given, each
// worker receives its own structured clone of the message with
// message[thread_field] set to the worker's index; the caller's object is not
// modified.
function workers_thread_message(workers, message, thread_field=null) {
    const messages = workers.map((_, i) => {
        if (thread_field === null) {
            return message;
        }

        const m = structuredClone(message);
        m[thread_field] = i;
        return m;
    });

    return workers_messages(workers, messages);
}

// Create the shared WASM memory, instantiate the "master" module instance,
// spawn the worker pool and lay out the static buffers (xs, ys, pressures,
// coords_from) in WASM memory. Everything is stored on `state.wasm`.
async function init_wasm(state) {
    const memory = new WebAssembly.Memory({
        // Sizes are in 64KiB WASM pages. initial === maximum, so the memory
        // never grows after creation.
        initial: 16384,
        maximum: 16384, // 1GiB
        shared: true,
    });

    // "Master thread" to do maintenance on (static allocations, merging results etc)
    const master_wasm = await WebAssembly.instantiateStreaming(fetch('wasm/lod.wasm'), {
        env: { 'memory': memory }
    });

    // hardwareConcurrency can be missing (or 0) in some environments. With
    // zero workers do_lod would divide by zero, so always spawn at least one.
    const nworkers = navigator.hardwareConcurrency || 1;

    state.wasm.exports = master_wasm.instance.exports;
    state.wasm.heap_base = state.wasm.exports.alloc_static(0);
    state.wasm.workers = [];
    state.wasm.memory = memory;

    for (let i = 0; i < nworkers; ++i) {
        const w = new Worker('lod_worker.js');
        state.wasm.workers.push(w);
    }

    await workers_thread_message(state.wasm.workers, {
        'type': 'init',
        'heap_base': state.wasm.heap_base,
        'memory': memory,
    }, 'thread_id');

    const initial = config.initial_wasm_bytes;
    const buffers = {
        'xs': { 'used': 0, 'cap': initial },
        'ys': { 'used': 0, 'cap': initial },
        'coords_from': { 'used': 0, 'cap': initial },
        'pressures': { 'used': 0, 'cap': initial },
    };

    state.wasm.buffers = buffers;

    // Allocation order defines the memory layout: xs, ys, pressures, coords_from.
    // wasm_ensure_by re-creates the buffers in the same order.
    buffers['xs'].offset = state.wasm.exports.alloc_static(initial);
    buffers['ys'].offset = state.wasm.exports.alloc_static(initial);
    buffers['pressures'].offset = state.wasm.exports.alloc_static(initial);
    buffers['coords_from'].offset = state.wasm.exports.alloc_static(initial);

    const mem = state.wasm.memory.buffer;

    // `used` and `cap` are in bytes, typed-view capacities are in elements.
    buffers['xs'].tv = tv_create_on(Float32Array, initial / 4, mem, buffers['xs'].offset);
    buffers['ys'].tv = tv_create_on(Float32Array, initial / 4, mem, buffers['ys'].offset);
    buffers['pressures'].tv = tv_create_on(Uint8Array, initial, mem, buffers['pressures'].offset);
    buffers['coords_from'].tv = tv_create_on(Uint32Array, initial / 4, mem, buffers['coords_from'].offset);

    // coords_from starts out with a single 0 entry (one Uint32 = 4 bytes used).
    tv_add(buffers['coords_from'].tv, 0);
    buffers['coords_from'].used = 4;
}

// Make sure the static buffers have room for `nstrokes` more coords_from
// entries and `ncoords` more coordinates. If either does not fit, all static
// allocations are freed and re-created with larger capacities, and the
// existing contents are moved to the new offsets.
function wasm_ensure_by(state, nstrokes, ncoords) {
    const buffers = state.wasm.buffers;

    let coords_bytes = buffers['xs'].cap;
    let stroke_bytes = buffers['coords_from'].cap;
    let realloc = false;

    if (buffers['xs'].used + ncoords * 4 > buffers['xs'].cap) {
        // Minimum is 1 wasm page (although it doesn't matter here)
        coords_bytes = round_to_pow2(buffers['xs'].cap + ncoords * 4, 4096 * 16);
        realloc = true;
    }

    if (buffers['coords_from'].used + nstrokes * 4 > buffers['coords_from'].cap) {
        stroke_bytes = round_to_pow2(buffers['coords_from'].cap + nstrokes * 4, 4096 * 16);
        realloc = true;
    }

    if (!realloc) {
        return;
    }

    if (config.debug_print) console.debug('WASM static data re-layout');

    // xs is not moved: the code relies on the first static allocation landing
    // at the same offset after free_static().
    const old_offsets = {
        'ys': buffers['ys'].offset,
        'pressures': buffers['pressures'].offset,
        'coords_from': buffers['coords_from'].offset,
    };

    state.wasm.exports.free_static();

    const mem = state.wasm.memory.buffer;
    const memv = new Uint8Array(mem);

    buffers['xs'].offset = state.wasm.exports.alloc_static(coords_bytes);
    buffers['ys'].offset = state.wasm.exports.alloc_static(coords_bytes);
    buffers['pressures'].offset = state.wasm.exports.alloc_static(coords_bytes);
    buffers['coords_from'].offset = state.wasm.exports.alloc_static(stroke_bytes);

    buffers['xs'].tv = tv_create_on(Float32Array, coords_bytes / 4, mem, buffers['xs'].offset);
    buffers['ys'].tv = tv_create_on(Float32Array, coords_bytes / 4, mem, buffers['ys'].offset);
    // One element per byte, matching init_wasm and the `cap` recorded below.
    // (A smaller view could hold fewer points than xs/ys can.)
    buffers['pressures'].tv = tv_create_on(Uint8Array, coords_bytes, mem, buffers['pressures'].offset);
    buffers['coords_from'].tv = tv_create_on(Uint32Array, stroke_bytes / 4, mem, buffers['coords_from'].offset);

    // TODO: this should have been automatic maybe?
    buffers['xs'].tv.size = buffers['xs'].used / 4;
    buffers['ys'].tv.size = buffers['ys'].used / 4;
    buffers['pressures'].tv.size = buffers['pressures'].used;
    buffers['coords_from'].tv.size = buffers['coords_from'].used / 4;

    buffers['xs'].cap = buffers['ys'].cap = buffers['pressures'].cap = coords_bytes;
    buffers['coords_from'].cap = stroke_bytes;

    // Move the old contents to their new offsets, from back to front
    // (otherwise a moved buffer could overwrite one that has not been moved
    // yet). copyWithin has memmove semantics, so a buffer overlapping its own
    // old location is fine and no scratch copy is needed.
    for (const name of ['coords_from', 'pressures', 'ys']) {
        const from = old_offsets[name];
        memv.copyWithin(buffers[name].offset, from, from + buffers[name].used);
    }
}

// Run the LOD computation for the strokes in `context.clipped_indices` on the
// worker pool, merge the per-worker results and expose them on `context` as
// typed-array views directly onto WASM memory. Returns the number of
// resulting segments.
async function do_lod(state, context) {
    state.wasm.exports.free_dynamic();

    const buffers = state.wasm.buffers;
    const nworkers = state.wasm.workers.length;
    const nindices = context.clipped_indices.size;

    const result_buffers = state.wasm.exports.alloc_dynamic(nworkers * 4);
    const result_counts = state.wasm.exports.alloc_dynamic(nworkers * 4);
    const clipped_indices = state.wasm.exports.alloc_dynamic(nindices * 4);
    const mem = new Uint8Array(state.wasm.memory.buffer);

    // Dynamic input data that should (by design) never be too big
    mem.set(tv_bytes(context.clipped_indices), clipped_indices);

    // TODO: this is a very naive and dumb way of distributing work. Better way
    // would be to distribute strokes based on total point count, so that
    // each worker gets approximately the same amount of _points_
    const indices_per_thread = Math.floor(nindices / nworkers);

    const offsets = {
        'coords_from': buffers['coords_from'].offset,
        'xs': buffers['xs'].offset,
        'ys': buffers['ys'].offset,
        'pressures': buffers['pressures'].offset,
        'result_buffers': result_buffers,
        'result_counts': result_counts,
    };

    const jobs = [];

    for (let i = 0; i < nworkers; ++i) {
        let count = indices_per_thread;

        // The last worker also takes whatever is left over from the division.
        if (i === nworkers - 1) {
            count += nindices % nworkers;
        }

        jobs.push({
            'type': 'lod',
            'indices_base': clipped_indices + i * 4 * indices_per_thread,
            'indices_count': count,
            'zoom': state.canvas.zoom,
            'offsets': offsets
        });
    }

    await workers_messages(state.wasm.workers, jobs);

    const result_offset = state.wasm.exports.merge_results(
        result_counts,
        result_buffers,
        nworkers
    );

    // By convention the total segment count is read from the first entry of result_counts
    const segment_count = new Int32Array(state.wasm.memory.buffer, result_counts, 1)[0];

    // Use results without copying from WASM memory.
    // Layout at result_offset: points (2 floats per segment), then ids
    // (1 uint32 per segment), then pressures (1 byte per segment).
    const points_bytes = segment_count * 2 * 4;
    const ids_bytes = segment_count * 4;

    const wasm_points = new Float32Array(state.wasm.memory.buffer, result_offset, segment_count * 2);
    const wasm_ids = new Uint32Array(state.wasm.memory.buffer, result_offset + points_bytes, segment_count);
    const wasm_pressures = new Uint8Array(state.wasm.memory.buffer, result_offset + points_bytes + ids_bytes, segment_count);

    context.instance_data_points.data = wasm_points;
    context.instance_data_points.size = segment_count * 2;

    context.instance_data_ids.data = wasm_ids;
    context.instance_data_ids.size = segment_count;

    context.instance_data_pressures.data = wasm_pressures;
    context.instance_data_pressures.size = segment_count;

    return segment_count;
}