Web streams

  Copyright
Resources
Principle

Massive data processing relies on data streams because the data cannot reside in memory as a whole. Instead, data arrive as chunks, each chunk being an incomplete part of the whole. In this context, Web streams is an Application Programming Interface (API) that hides memory issues: computations are expressed on the data even though not all of the data are present yet; the remaining data keep arriving, here over the Web.

ReadableStream
// Example: fetch a gzip-compressed JSON file, decompress it on the fly and look at the first chunk only.
const decompression_stream = new DecompressionStream("gzip");
fetch("https://bulk.openweathermap.org/sample/city.list.json.gz").then(async (response) => {
    // 'response.body' may be 'null' and an error page must not be fed to the gzip decompressor:
    // fail with an explicit message instead of a 'TypeError' further down.
    if (!response.ok || response.body === null)
        throw new Error(`No usable response body (HTTP status ${response.status})`);

    // 'response.body' is a 'ReadableStream' object:
    console.assert(response.body.constructor.name === 'ReadableStream');
    console.assert(response.body.locked === false /*&& response.body.state === 'readable'*/);

    // Decompressed chunks are raw bytes ('Uint8Array'), not numbers:
    const data_stream: ReadableStream<Uint8Array> = response.body.pipeThrough(decompression_stream);
    for await (const chunk of Chunks(data_stream.getReader())) {
        console.assert(data_stream.locked); // 'getReader()' locks the stream...
        // Raw data stream:
        console.assert(chunk.constructor.name === 'Uint8Array');
        // console.info(`Chunk of size ${chunk.length}... with raw data: ${chunk}`);
        // Data stream as (incomplete) JSON. Result *CANNOT* be parsed as JSON:
        // console.info((new TextDecoder()).decode(chunk));
        /**
         * Caution, loss of data ('import * as Parser_with_data_loss from 'partial-json-parser';'):
        */
        const incomplete_data: Array<City> = Parser_with_data_loss((new TextDecoder()).decode(chunk));
        console.assert(incomplete_data.constructor.name === 'Array');
        // Stop, first chunk only for test:
        break;
    }
}).catch((error: unknown) => {
    // Network, HTTP and decompression failures end up here instead of becoming an unhandled rejection:
    console.error('Decompression example failed:', error);
});

Creation of a ReadableStream object

// Minimal Koa application whose single middleware answers one URL with a file stream.
const application = new Koa();
application.use((ctx: any /* Koa 'Context' type */) => {
    // Any URL other than the JSON resource is left untouched by this middleware:
    if (ctx.request.url !== '/Prison_de_Nantes.json')
        return;
    ctx.response.set('content-type', 'application/json');
    // The response body is a stream created from the file, not the file content itself:
    ctx.body = file_system.createReadStream('./Prison_de_Nantes.json');
});
// The Koa request handler is served by a plain HTTP server on port 1963:
http.createServer(application.callback()).listen(1963);
TextDecoderStream
// Example: fetch a JSON file and read its first chunk as text rather than as raw bytes.
fetch("https://FranckBarbier.com/resources/Prison_de_Nantes/Prison_de_Nantes.json").then(async (response) => {
    // 'response.body' may be 'null' and an error page is not the expected JSON:
    // fail with an explicit message instead of a 'TypeError' further down.
    if (!response.ok || response.body === null)
        throw new Error(`No usable response body (HTTP status ${response.status})`);

    // 'TextDecoderStream' transforms raw data into strings:
    const data_stream = response.body.pipeThrough(new TextDecoderStream());
    for await (const chunk of Chunks(data_stream.getReader())) {
        console.assert(chunk.constructor.name === 'String');
        console.info(`Chunk of size ${chunk.length}... with string data: ${chunk}`);
        // Stop, first chunk only for test:
        break;
    }
}).catch((error: unknown) => {
    // Network and HTTP failures end up here instead of becoming an unhandled rejection:
    console.error('Text decoding example failed:', error);
});
TransformStream
/**
 * Shape of one city record as handled by the stream examples.
 *
 * 'coord' and 'UTM' are optional: a record may come without coordinates,
 * and 'UTM' only exists once it has been computed.
 */
interface City {
    id: number;
    name: string;
    state: string;
    country: string;
    /** Geographic position (longitude and latitude), when available. */
    coord?: {
        lon: number;
        lat: number;
    };
    // UTM coord. are computed by the transformation stream:
    UTM?: {
        Easting: number;
        Northing: number;
        ZoneNumber: number;
        ZoneLetter: string;
    };
}

// https://exploringjs.com/nodejs-shell-scripting/ch_web-streams.html#implementing-custom-transformstreams
/**
 * 'Transformer' turning raw byte chunks into 'City' objects enriched with UTM coordinates.
 *
 * Each chunk is decoded as text, parsed with a lossy partial-JSON parser and every city
 * that carries usable coordinates is enqueued with its 'UTM' property set.
 * For test purposes, the stream is terminated after the first chunk.
 */
class Compute_UTM_from_lat_lon implements Transformer<Uint8Array, City> {
    // Passed as third argument to 'convertLatLngToUtm' (NOTE(review): exact meaning depends on the UTM library, confirm):
    private static readonly _Precision = 1;
    private static readonly _UTM = new UTM(); // Default Ellipsoid is 'WGS 84'
    private readonly _text_decoder: TextDecoder = new TextDecoder();

    /**
     * Called once when the transform stream is created; nothing to set up so far.
     * @param controller Controller of the transform stream (unused here).
     */
    start(controller: TransformStreamDefaultController<City>): void | Promise<void> {
        console.info('Any initialization?');
    }

    /**
     * Decodes and parses one chunk, then enqueues every city having usable coordinates.
     * @param chunk Raw bytes of (incomplete) JSON text.
     * @param controller Controller used to enqueue cities and to terminate the stream.
     */
    transform(chunk: Uint8Array, controller: TransformStreamDefaultController<City>): void | Promise<void> {
        // 'stream: true' keeps a multi-byte character cut at the end of the chunk pending
        // instead of decoding it as U+FFFD:
        const text = this._text_decoder.decode(chunk, { stream: true });
        /**
         * Caution, loss of data ('import * as Parser_with_data_loss from 'partial-json-parser';'):
         */
        const cities: Array<City> = Parser_with_data_loss(text);
        for (const city of cities) {
            const coord = city.coord;
            // A chunk may end in the middle of a city: 'coord' can then be absent, 'null' or lack
            // 'lat'/'lon'. Such cities are skipped rather than converted from undefined values:
            if (coord != null && Number.isFinite(coord.lat) && Number.isFinite(coord.lon)) {
                city.UTM = Compute_UTM_from_lat_lon._UTM.convertLatLngToUtm(coord.lat, coord.lon, Compute_UTM_from_lat_lon._Precision);
                controller.enqueue(city);
            }
        }
        controller.terminate(); // Stop, first chunk only for test...
    }

    /**
     * Finalization hook of the 'Transformer' interface; nothing to finalize so far.
     * @param controller Controller of the transform stream (unused here).
     */
    flush(controller: TransformStreamDefaultController<City>): void | Promise<void> {
        // Any finalization?
    }
}


/**
 * Fetches the gzip-compressed sample city list, decompresses it, pipes it through a
 * 'TransformStream' built from 'Compute_UTM_from_lat_lon' and logs every resulting city.
 *
 * Fire-and-forget: the function returns immediately; failures are logged on the console.
 */
export const Transformation = (): void => {
    const decompression_stream = new DecompressionStream("gzip");
    fetch("https://bulk.openweathermap.org/sample/city.list.json.gz").then(async (response) => {
        // 'response.body' may be 'null' and an error page must not be fed to the gzip decompressor:
        if (!response.ok || response.body === null)
            throw new Error(`No usable response body (HTTP status ${response.status})`);

        // Decompressed chunks are raw bytes:
        const data_stream: ReadableStream<Uint8Array> = response.body.pipeThrough(decompression_stream);

        // Alternative with an inline transformer that only decodes text:
        // const transformation = new TransformStream({
        //     transform(chunk, controller) {
        //         controller.enqueue(new TextDecoder().decode(chunk));
        //         controller.terminate(); // Stop, first chunk only for test...
        //     },
        // });

        // 'Transformer' object is passed to 'TransformStream' at creation time:
        const transformation = new TransformStream(new Compute_UTM_from_lat_lon());
        const data_stream_: ReadableStream<City> = data_stream.pipeThrough(transformation);
        for await (const city of Chunks(data_stream_.getReader()))
            console.info(city);
    }).catch((error: unknown) => {
        // Network, HTTP, decompression and transformation failures end up here
        // instead of becoming an unhandled rejection:
        console.error('Transformation example failed:', error);
    });
}