// Copyright (C) 2018 The Android Open Source Project
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
//      http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

import {defer} from '../base/deferred';
import {assertExists, assertTrue} from '../base/logging';
import initTraceProcessor from '../gen/trace_processor';

// Size, in bytes, of the request staging buffer. It is passed to
// trace_processor_rpc_init, whose return value is stored as |reqBufferAddr|;
// the buffer is then reachable at HEAPU8[reqBufferAddr, +REQ_BUF_SIZE].
// Request bytes are copied there rather than being passed on the stack, which
// has a limited (~1MB) size.
const REQ_BUF_SIZE = 32 * 1024 * 1024;

// Upper bound on the number of stderr lines retained for crash reports.
const MAX_STDERR_LINES = 512;

// The end-to-end interaction between JS and Wasm is as follows:
// - [JS] Inbound data received by the worker (onmessage() in engine/index.ts).
//   - [JS] onMessage() (this file)
//     - [C++] trace_processor_on_rpc_request (wasm_bridge.cc)
//       - [C++] some TraceProcessor::method()
//         for (batch in result_rows)
//           - [C++] RpcResponseFunction(bytes) (wasm_bridge.cc)
//             - [JS] onReply() (this file)
//               - [JS] postMessage() (this file)
export class WasmBridge {
  // Resolves once the Wasm runtime has been initialized and the request
  // buffer address has been obtained from trace_processor_rpc_init.
  // onMessage() depends on that address.
  whenInitialized: Promise<void>;

  private aborted = false;
  private readonly connection: initTraceProcessor.Module;
  private reqBufferAddr = 0;
  private readonly lastStderr: string[] = [];
  private messagePort?: MessagePort;

  constructor() {
    const runtimeReady = defer<void>();
    this.connection = initTraceProcessor({
      locateFile: (s: string) => s,
      print: (line: string) => console.log(line),
      printErr: (line: string) => this.appendAndLogErr(line),
      onRuntimeInitialized: () => runtimeReady.resolve(),
    });
    this.whenInitialized = runtimeReady.then(() => this.setUpRpcChannel());
  }

  // Attaches the bridge to |port|. From then on every message delivered on
  // the port is handled by onMessage(). Must be called at most once.
  initialize(port: MessagePort): void {
    assertTrue(this.messagePort === undefined);
    this.messagePort = port;
    // Note: assigning .onmessage implicitly calls port.start() and dispatches
    // the queued messages. addEventListener('message') doesn't.
    port.onmessage = this.onMessage.bind(this);
  }

  // Feeds the bytes carried by |msg| (which must be a Uint8Array) to the Wasm
  // module. Throws if the module has crashed previously or crashes while
  // handling this request; in the latter case the error text includes the
  // retained stderr lines.
  onMessage(msg: MessageEvent): void {
    if (this.aborted) {
      throw new Error('Wasm module crashed');
    }
    assertTrue(msg.data instanceof Uint8Array);
    const payload = msg.data as Uint8Array;

    // If the request is larger than the JS<>Wasm interop buffer, it is fed in
    // several writes. The RPC channel is byte-oriented and is designed to deal
    // with arbitrary fragmentation.
    for (let offset = 0; offset < payload.length; offset += REQ_BUF_SIZE) {
      // subarray() clamps the end index, so the last chunk may be shorter.
      const chunk = payload.subarray(offset, offset + REQ_BUF_SIZE);
      this.connection.HEAPU8.set(chunk, this.reqBufferAddr);
      try {
        this.connection.ccall(
          'trace_processor_on_rpc_request', // C function name.
          'void', // Return type.
          ['number'], // Arg types.
          [chunk.length], // Args.
        );
      } catch (err) {
        this.aborted = true;
        throw new Error(this.describeAbort(err));
      }
    }
  }

  // Registers onReply() as the C++ reply callback and records the address of
  // the request buffer returned by trace_processor_rpc_init.
  private setUpRpcChannel(): void {
    const replyFn = this.connection.addFunction(this.onReply.bind(this), 'vii');
    const bufferAddr = this.connection.ccall(
      'trace_processor_rpc_init',
      /* return=*/ 'number',
      /* args=*/ ['number', 'number'],
      [replyFn, REQ_BUF_SIZE],
    );
    // >>> 0 = static_cast<uint32_t> (see comment in onReply()).
    this.reqBufferAddr = bufferAddr >>> 0;
  }

  // Builds the message of the error thrown when a Wasm call fails: a
  // description of |err| followed by the retained stderr lines.
  private describeAbort(err: unknown): string {
    const cause =
      err instanceof Error
        ? `${err.name}: ${err.message}\n${err.stack}`
        : `${err}`;
    return cause + '\n\nstderr: \n' + this.lastStderr.join('\n');
  }

  // Reply callback registered with the Wasm module in setUpRpcChannel(). It is
  // invoked by the C++ code while inside
  // ccall(trace_processor_on_rpc_request). Copies |size| bytes starting at
  // |heapPtr| out of the Wasm heap and posts them on the message port.
  private onReply(heapPtr: number, size: number): void {
    // The matching code in wasm_bridge.cc passes both arguments as uint32_t,
    // but in the wasm<>JS interop bindings they become JS numbers. A pointer
    // > 2GB then shows up as a negative number, which causes the wrong
    // behaviour on slice(). The unsigned right shift acts as
    // static_cast<uint32_t> and makes the values non-negative again.
    const begin = heapPtr >>> 0;
    const byteCount = size >>> 0;
    const reply = this.connection.HEAPU8.slice(begin, begin + byteCount);
    const port = assertExists(this.messagePort);
    port.postMessage(reply, [reply.buffer]);
  }

  // Logs |line| as a warning and records it, retaining only the most recent
  // MAX_STDERR_LINES lines in |lastStderr|.
  private appendAndLogErr(line: string): void {
    console.warn(line);
    const history = this.lastStderr;
    history.push(line);
    if (history.length > MAX_STDERR_LINES) {
      history.shift();
    }
  }
}