Massive data processing relies on data streams: the data cannot reside in memory as a whole. Instead, it is delivered as chunks, each of which is an incomplete piece of the data. In this context, Web Streams is an Application Programming Interface (API) that hides memory issues: computations are expressed on the data even though it is not all present yet, but arrives progressively from a Web source.
ReadableStream
const decompression_stream = new DecompressionStream("gzip"); fetch("https://bulk.openweathermap.org/sample/city.list.json.gz").then(async (response) => { // 'response.body' is a 'ReadableStream' object: console.assert(response.body.constructor.name === 'ReadableStream'); console.assert(response.body.locked === false /*&& response.body.state === 'readable'*/); const data_stream: ReadableStream<number> = response.body.pipeThrough(decompression_stream); for await (const chunk of Chunks(data_stream.getReader())) { console.assert(data_stream.locked); // 'getReader()' locks the stream... // Raw data stream: console.assert(chunk.constructor.name === 'Uint8Array'); // console.info(`Chunk of size ${chunk.length}... with raw data: ${chunk}`); // Data stream as (incomplete) JSON. Result *CANNOT* be parsed as JSON: // console.info((new TextDecoder()).decode(chunk)); /** * Caution, loss of data ('import * as Parser_with_data_loss from 'partial-json-parser';'): */ const incomplete_data: Array<City> = Parser_with_data_loss((new TextDecoder()).decode(chunk)); console.assert(incomplete_data.constructor.name === 'Array'); // Stop, first chunk only for test: break; } });
Creation of a
ReadStream
object (file system) to be served as a ReadableStream
object (Web)const application = new Koa(); application.use((context: any /* Koa 'Context' type */) => { if (context.request.url === '/Prison_de_Nantes.json') { context.response.set('content-type', 'application/json'); // A stream is set as response body: context.body = file_system.createReadStream('./Prison_de_Nantes.json'); } }); http.createServer(application.callback()).listen(1963);
TextDecoderStream
fetch("https://FranckBarbier.com/Enterprise_Architect/Prison_de_Nantes/Prison_de_Nantes.json").then(async (response) => { // 'TextDecoderStream' transforms raw data into strings: const data_stream = response.body.pipeThrough(new TextDecoderStream()); for await (const chunk of Chunks(data_stream.getReader())) { console.assert(chunk.constructor.name === 'String'); console.info(`Chunk of size ${chunk.length}... with string data: ${chunk}`); // Stop, first chunk only for test: break; } });
TransformStream
interface City { "id": number, "name": string, "state": string, "country": string, "coord"?: { "lon": number, "lat": number } // UTM coord. are computed by the transformation stream: "UTM"?: { "Easting": number, "Northing": number, "ZoneNumber": number, "ZoneLetter": string } } // https://exploringjs.com/nodejs-shell-scripting/ch_web-streams.html#implementing-custom-transformstreams class Compute_UTM_from_lat_lon implements Transformer<Uint8Array, City> { private static readonly _Precision = 1; private static readonly _UTM = new UTM; // Default Ellipsoid is 'WGS 84' private readonly _text_decoder: TextDecoder = new TextDecoder(); start(controller: TransformStreamDefaultController<City>): void | Promise<void> { console.info('Any initialization?'); } transform(chunk: Uint8Array, controller: TransformStreamDefaultController<City>): void | Promise<void> { /** * Caution, loss of data ('import * as Parser_with_data_loss from 'partial-json-parser';'): */ const cities: Array<City> = Parser_with_data_loss(this._text_decoder.decode(chunk)); for (const city of cities) if ('coord' in city) { city.UTM = Compute_UTM_from_lat_lon._UTM.convertLatLngToUtm(city.coord.lat, city.coord.lon, Compute_UTM_from_lat_lon._Precision) controller.enqueue(city); } controller.terminate(); // Stop, first chunk only for test... } flush(controller: TransformStreamDefaultController<City>): void | Promise<void> { // Any finalization? } } export const Transformation = () => { const decompression_stream = new DecompressionStream("gzip"); fetch("https://bulk.openweathermap.org/sample/city.list.json.gz").then(async (response) => { const data_stream: ReadableStream<any> = response.body.pipeThrough(decompression_stream); // const tranformation = new TransformStream({ // transform(chunk, controller) { // controller.enqueue(new TextDecoder().decode(chunk)); // controller.terminate(); // Stop, first chunk only for test... 
// }, // }); // 'Transformer' object is passed to 'TransformStream' at creation time: const tranformation = new TransformStream(new Compute_UTM_from_lat_lon); const data_stream_: ReadableStream<City> = data_stream.pipeThrough(tranformation); for await (const city of Chunks(data_stream_.getReader())) console.info(city); }); }
fetch(OpenSLR_org_88.URL).then((response) => { if ('body' in response) { // 'response.body' is a 'ReadableStream' object: console.assert(response.body!.constructor.name === 'ReadableStream'); console.assert(response.body!.locked === false /*&& response.body.state === 'readable'*/); const uncompressed_data_stream: ReadableStream<number> = response.body!.pipeThrough(new DecompressionStream(Compression.GZIP)); // Conversion from Web 'ReadableStream' to file system 'ReadStream' (https://stackoverflow.com/questions/71509505/how-to-convert-web-stream-to-nodejs-native-stream): // TypeScript compilation error (https://stackoverflow.com/questions/63630114/argument-of-type-readablestreamany-is-not-assignable-to-parameter-of-type-r): // @ts-ignore const source = Readable.fromWeb(uncompressed_data_stream); const target = file_system.createWriteStream(OpenSLR_org_88.Archive_file_name); // Flat uncompressed (archive) file on disk (10,6 GB) is simply saved: source.pipe(target); source.on('end', () => { // Use of 'node-tar' library: tar.list({ // Let us the possibility of extracting directory hierarchy inside archive file: file: OpenSLR_org_88.Archive_file_name, onReadEntry: entry => console.info(entry.path) }); }); } });
import * as zlib from 'node:zlib';
await fetch(OpenSLR_org_88.URL).then(async (response) => { if ('body' in response) { // 'response.body' is a 'ReadableStream' object: console.assert(response.body!.constructor.name === 'ReadableStream'); console.assert(response.body!.locked === false /*&& response.body.state === 'readable'*/); // Conversion from Web 'ReadableStream' to file system 'ReadStream' (https://stackoverflow.com/questions/71509505/how-to-convert-web-stream-to-nodejs-native-stream): // @ts-ignore const source = Readable.fromWeb(response.body); // Compressed data stream... // source.pipe(file_system.createWriteStream('OpenSLR_org_88.tgz')); // Compressed version on local disk... const target = file_system.createWriteStream(OpenSLR_org_88.Archive_file_name); /** 'zlib'-based decompression (optional because 'tar' does the job later on if compressed) */ await promisify(pipeline)(source, zlib.createUnzip(), target).catch((error) => { throw new Error("'zlib' failed... " + error); }); // Archive file created... // Use of 'node-tar' library: await tar.extract({file: OpenSLR_org_88.Archive_file_name}).then(_ => { // Archive file expanded... }).catch((error) => { throw new Error("'tar.extract' failed... " + error); }); } else throw new Error("''body' in response', untrue... "); }).catch((error) => { throw new Error("'fetch' failed... " + error); });