Používáme Node.js streamy

Streamy jako takové jsou tu s námi již hodně dlouhou dobu. Co se týče Node.js, ve verzi 0.10 přišly streamy s novým API, a také díky nadstavbám nad tímto low-level mechanismem můžeme tuto techniku pohodlně používat. A jsou zde pádné důvody, proč nenechat streamy bez povšimnutí.
Úvod
Není potřeba zabíhat příliš do teorie. Stream je podle wikipedie sekvence datových prvků dostupných v průběhu času (tzn. po “kouscích”, ne jako celek). Navíc mohou být řetězeny do pipeline, kdy se výstup jednoho streamu stane vstupem následujícího, respektive následujících.
Streamy jsou odvozeny z třídy EventEmitter. Mohou být buď Readable, Writable, nebo obojí (Duplex a Transform). Obecně se dá tok dat ve streamech vyjádřit nějak takto: zdroj (Readable)
→ změna (Readable/Writable)
→ cíl (Writable)
. Uveďme si praktickou ukázku, kde zkomprimujeme soubor:
var fs = require('fs');
var zlib = require('zlib');
fs.createReadStream('large-file.txt') //Readable
.pipe(zlib.createGzip()) //Transform
.pipe(fs.createWriteStream('large-file.txt.gz')); //Writable
Jaké jsou výhody oproti tradičnímu řešení plného callbacků, kdy bychom nejprve načetli celý soubor do paměti, poté ho celý zkomprimovali a následně uložili na disk? Jak už jsme si řekli, streamy pracují s daty po kouscích (výchozí je 16kB, u fs.createReadStream
64kB). Náš kód tedy načte prvních 64kB souboru a pošle ho ke komprimaci. Když načítá druhý blok, ten první se již pilně komprimuje. Na nic se nečeká, procesy běží paralelně. Pokud by komprimovací stream nestíhal, je zde implementován mechanismus, který zbrzdí i stream před ním, tak aby se nic nebufferovalo zbytečně.
Představte si, že komprimujete obrovský soubor. Při načítání celého souboru (pokud se vám to vůbec povede, V8 má určitá omezení) budete plýtvat časem i pamětí. To povede ke značnému zpomalení vašeho skriptu a k nespokojenosti uživatele.
Samozřejmě se nemusíme omezovat pouze na jeden Transform stream. Co kdybychom chtěli data nejdříve šifrovat? Není problém, prostě jen před komprimaci vložíme šifrovací stream.
var fs = require('fs');
var crypto = require('crypto');
var zlib = require('zlib');
fs.createReadStream('large-file.txt') //Readable
.pipe(crypto.createCipher('aes256', 'password')) //Transform
.pipe(zlib.createGzip()) //Transform
.pipe(fs.createWriteStream('large-file.txt.gz')); //Writable
Protože streamy nabízejí konzistentní rozhraní, jsou vhodnou technikou k abstrakci vašeho kódu. Pokud si například chcete vytvořit vlastní Transform stream, který bude logovat velikost dat, je vám úplně jedno zda data přicházejí ze souboru, ze socketu, nebo jsou náhodně generovaná. Kdybychom tedy nechtěli komprimovat soubor, ale data, která nám pošle TCP server, stačí nám k tomu minimální změna:
var fs = require('fs');
var zlib = require('zlib');
var net = require('net');
//fs.createReadStream('large-file.txt')
net.connect(12345) //Duplex (dědí Readable)
.pipe(zlib.createGzip()) //Transform
.pipe(fs.createWriteStream('large-file.txt.gz')); //Writable
Tato flexibilita nám umožňuje i pohodlně posílat data z jednoho zdroje do více cílů. Následující snippet čte vše, co napíšeme do konzole, a to následně uloží do souboru a zároveň pošle na TCP server.
var net = require('net');
var fs = require('fs');
process.stdin.pipe(net.connect(12345));
process.stdin.pipe(fs.createWriteStream('console.txt'));
Shrňme si to do krátkého přehledu. Streamy:
- jsou paměťově méně náročné
- jsou rychlé
- poskytují abstrakci díky jednotnému rozhraní
- nabízejí příjemnou flexibilitu a čistý a přehledný kód
Object mode
Streamy běžně pracují pouze s řetězci nebo buffery. Pokud to chceme změnit, máme možnost je přepnout do object módu. Můžeme tak pracovat se všemi možnými hodnotami, které v JavaScriptu jsou (objekty, pole, čísla, …).
var Readable = require('stream').Readable;
var stream = Readable({objectMode : true });
Readable
Readable stream představuje zdroj, ze kterého data lezou ven do jiných streamů. Jako příklad mužeme uvést http request na serveru, souborový read stream, nebo standardní vstup (stdin).
Pojďme si jeden vlastní vytvořit. Můžeme si třeba vygenerovat Fibonacciho posloupnost do čísla 100 000. Nejjednodušší způsob vypadá nějak takto:
var Readable = require('stream').Readable;
//generuje n-tý prvek fibonacciho posloupnosti
function fib(n) {
if (n === 0) return 0;
if (n === 1) return 1;
return fib(n - 1) + fib(n - 2);
}
var fibonacci = Readable();
//zdroj dat pro další streamy
fibonacci._read = function (size) {
var i = 0, num = 0;
while (true) {
num = fib(i++);
if (num > 100000) break;
this.push(num + '\r\n');
}
this.push(null); //již nebudeme posílat data
};
fibonacci.pipe(process.stdout); //vypíšeme čísla do konzole
Jak vidíme, vše, co potřebujeme, je implementovat metodu _read
, kde voláme posíláme data pomocí push
. Parametrem size
v metodě _read()
říkají následující streamy, kolik dat od našeho streamu vyžadují. V našem příkladu toto ignorujeme.
Druhým způsobem je napsat si vlastní třídu, která bude z Readable dědit. Takto například mužeme posílat parametry do konstruktoru naší třídy a ovlivňovat tak chování streamu. Zápis se jen trochu prodlouží.
var Readable = require('stream').Readable;
function fib(n) {
if (n === 0) return 0;
if (n === 1) return 1;
return fib(n - 1) + fib(n - 2);
}
require('util').inherits(Fibonacci, Readable);
function Fibonacci(to) {
if (!(this instanceof Fibonacci)) return new Fibonacci(to);
Readable.call(this);
this._to = to || 100000;
}
Fibonacci.prototype._read = function (size) {
var i = 0, num = 0;
while (true) {
num = fib(i++);
if (num > this._to) break;
this.push(num + '\r\n');
}
this.push(null);
};
var fibonacci = new Fibonacci(300000);
fibonacci.pipe(process.stdout);
Tento skript nám vypíše všechna čísla fibonacciho posloupnosti menší než 300 000.
Pojďme si ještě přiblížit metodu .pipe(destination, options)
. Jejím úkolem je propojit výstup z Readable streamu (vytáhnout data) se vstupem následujícího writable streamu (zapsat je do něj). Automaticky řídí tok dat tak, aby nezahltila cíl daty z příliš rychlého zdroje. Její návratovou hodnotou je destination stream, což nám dává možnost streamy pohodlně řetězit za sebou, tak jak jsme si ukazovali v úvodu.
Pokud neřekneme jinak, .pipe
po předání všech dat uzavře destination stream, takže už do něj nelze zapisovat data (pokus skončí chybou). Pokud chceme stream používat opakovaně, stačí předat argument options s vlastností end
nastavenou na true
: input.pipe(output, { end : false })
.
Jak už víme, Readable, stejně jako ostatní streamy, je odvozen od EventEmitter
, tudíž nám nic nebrání navázat listenery na různé události:
readable
– nastane, když jsou data připravena ke čtenídata
– data se předávají listeneru hned, jak jsou k dispozici (nejrychlejší způsob, jak číst data ze streamu)
readable.on('data', function (chunk) {
console.log('%d bajtů dat', chunk.length);
});
end
– nastane, když již byla všechna data předánaclose
– pokud se například uzavře soubor před tím, než se stihla přečíst všechna data, nastane tato událosterror
– chyba při čtení se předá listeneru této události
Writable
Writable stream představuje cíl, do kterého data zapisujeme. Příkladem může být http response na serveru, souborový write stream, nebo standardní výstup (stdout).
Čistý Writable stream nemá metodu .pipe
, protože neumožňuje, aby z něho byla data čtena. Vše, co potřebujeme k tomu, abychom si vytvořili vlastní Writable stream, je implementovat metodu ._write(chunk, enc, done)
.
var Writable = require('stream').Writable;
var timelog = Writable();
timelog._write = function (chunk, enc, done) {
var time = (new Date()).toLocaleTimeString();
console.log('[%s]: %s', time, chunk.toString());
done();
};
process.stdin.pipe(timelog);
V parametru chunk
dostaneme blok dat, která nám předal předchozí stream. V enc
je uloženo kódování řetězce nebo slovo buffer
, pokud se pracuje s Buffery. A jako poslední nesmí chybět callback, kterým řekneme, že jsme hotovi.
Samozřejmě i z Writable streamu můžeme odvodit naší třídu. Postup je téměř totožný, jak jsme si ukázali u Readable, jen místo _read()
implementujeme _write()
.
Do Writable streamu můžeme zapsat data nejen pomocí readable.pipe()
, ale i “ručně” pomocí writable.write(data, [encoding], [done])
respektive writable.end([data], [encoding], [done])
. Jak už z názvu vyplývá, druhá jmenovaná zápis do streamu zároveň ukončí. Zavolání write
po end
vyvolá chybu.
var fs = require('fs');
var file = fs.createWriteStream('file.txt');
file.write('Ahoj světe!\r\n');
file.write('Foo bar');
file.end();
I zde můžeme sledovat různé události. Kromě error
to jsou:
drain
– pokud stream nestíhá zapisovaná data zpracovávat a musí si je ukládat do interního bufferu, tato událost nastane vždy, když se buffer vyprázdní a stream je připraven přijímat další datafinish
– když je zavolána metodaend()
a všechna data jsou zpracována, nastane tato událostpipe
– nastane vždy, když je stream “připajpován” k nějakému Readable a ten je také předán listeneroviunpipe
– stejný případ jako upipe
, pouze se to týká metodyreadable.unpipe()
.
Duplex
Duplex streamy implementují jak Readable, tak Writable rozhraní. Ta jsou však od sebe oddělená, mají každý samostatný buffer; čtení a zápis se provádí nezávisle. Příkladem je například třída Socket. U implementace Duplex streamu musíte nadefinovat jak _read([size])
, tak _write(chunk, enc, done)
.
Transform
Transform streamy jsou speciální odvozeninou Duplex streamů. Cílem je na vstupních datech něco vykonat a poslat je na výstup (transformovat). Jako příklad mužeme uvést všechny streamy v zlib a crypto.
Místo _read
a _write
jsou zde metody _transform(chunk, enc, done)
, volaná nad každým dostupným blokem dat, a nepovinná _flush(done)
, volaná, když stream končí.
var Transform = require('stream').Transform;
var fs = require('fs');
require('util').inherits(FindAndReplace, Transform);
function FindAndReplace(find, replace) {
if (!(this instanceof FindAndReplace))
return new FindAndReplace(find, replace);
Transform.call(this);
this._find = new RegExp(find, 'g');
this._replace = replace;
this.count = 0;
}
FindAndReplace.prototype._transform = function (chunk, enc, done) {
var str = chunk.toString();
this.count += str.match(this._find).length;
str = str.replace(this._find, this._replace);
this.push(new Buffer(str));
done();
};
FindAndReplace.prototype._flush = function (done) {
console.log('%d krát nahrazeno', this.count);
done();
};
fs.createReadStream('file.txt')
.pipe(new FindAndReplace('PHP', 'JavaScript'))
.pipe(fs.createWriteStream('file.txt', { flags : 'r+' }));
Účelem Transform streamu nemusí být pouze data změnit. Můžeme například data jen analyzovat a posílat je dál tak, jak jsou. I v metodě _flush
můžeme data přidávat pomocí _push
. To se hodí například, když potřebujete provést operaci nad daty jako celkem. V tom případě si v _transform
ukládáte bloky do proměnné a nikam je nepředáváte. To uděláte až ve _flush
.
Nadstavby
Existuje řada užitečných knihoven, které tvorbu streamů zjednodušují. Zde jsou některé z nich:
- from – zjednodušuje vytváření Readable streamů
- node-writable – usnadňuje vytváření Writable streamů
- through2 – zjednodušuje vytváření Transform streamů
- duplexer – z jednoho Readable a jednoho Writable vytvoří jeden Duplex stream
- event-stream – nabízí řadu užitečných funkcí pro práci se streamy
Závěr
Cílem tohoto článku nebylo probrat streamy úplně do hloubky. Pokud vás na to však navnadil, doporučuji přečíst si Stream handbook a samozřejmě dokumentaci. Pokud chcete vidět příklad užití streamů v praxi, tento článek je přesně pro vás.
Ještě poznámka na úplný závěr. Pokud chcete nové Streams API i ve starších verzích Node.js než je 0.10, je tu pro vás fallback v podobě knihovny readable-stream.
Při kombinaci šifrování + komprese je lépe nejdříve použít kompresi a až potom šifrování. Šifrování totiž větišinou vnese takou entropii, že je účinnost komprese šifrovaných dat protí původnímu ‚plain-textu‘ mizivá.