Dart – Streams

Streams sú messaging vrstva v Darte. Ak sa zamýšľate nad posielaním vlastných správ alebo eventov medzi jednotlivými objektmi, Streams sú tu pre vás.
Seriál: Úvod do Dartu (9 dílů)
- Dart – Čo? Prečo? 2. 8. 2013
- Dart – Úvod do jazyka 23. 8. 2013
- Dart – Ponorme sa hlbšie 6. 9. 2013
- Dart – v DOMe 19. 9. 2013
- Dart – Futures 4. 10. 2013
- Dart – Streams 17. 10. 2013
- Dart – Používame JavaScript 1. 11. 2013
- Dart Typesystem 19. 11. 2013
- Dart – Neznesiteľná ľahkosť asynchrónneho bytia 2. 12. 2013
Nálepky:
Pripomeňme si ukážku kódu z druhého dielu. Ak si potrebujete osviežiť syntax jazyka, odporúčam túto prehľadovú stránku.
import 'dart:html';
void main() {
query("#sample_text_id")
..text = "Click me!"
..onClick.listen(reverseText);
}
void reverseText(MouseEvent event) {
var text = query("#sample_text_id").text;
var buffer = new StringBuffer();
for (int i = text.length - 1; i >= 0; i--) {
buffer.write(text[i]);
}
query("#sample_text_id").text = buffer.toString();
}
V piatom riadku registrujeme reverseText
ako event handler na onClick
property elementu div#sample_text_id
. Čo za objekt je však onClick
? Keď sa dôkladnejšie pozrieme do API, zistíme, že onClick
implementuje rozhranie Stream
.
Kdekoľvek potrebujeme riešiť asynchrónnu komunikáciu, spracovávanie udalostí, posielanie správ, či dokonca čítanie súboru, nastupujú v Darte Stream
s.
Konzumujeme Stream
Predstavme si, že programujeme jednoduchý notifikátor príchodzieho emailu. Máme triedu Email
a Stream<Email>
emailov Inbox.onMessage
, ktorý sa postupne s prichádzajúcimi emailami plní. O príchode každej správy chceme užívateľa notifikovať.
void main() {
myInbox = new Inbox(someConfiguration);
myInbox.onMessage.listen(onEmailArrival);
}
void onEmailArrival(Email email) {
alert("New email from ${email.from}!");
}
Vždy, keď príde nový email, Inbox
nás prostredníctvom Stream
u onMessage
bude informovať zavolaním funkcie onEmailArrival
, ktorá ako jediný parameter dostane príchodzí email.
Čo však, ak počas príchodu emailov nastane nejaká chyba? Funkcia listen
nám umožňuje definovať aj dva ďalšie parametre: onError
a onDone
. Posledný menovaný je zavolaný po zatvorení Streamu
, je to vlastne notifikácia o tom, že ďalšie udalosti už nebudú.
void main() {
myInbox = new Inbox(someConfiguration);
myInbox.onMessage.listen(onEmailArrival, onError: emailError, onDone: noMoreEmails, cancelOnError: true);
}
void onEmailArrival(Email email) {
alert("New email from ${email.from}!");
}
void emailError(error) {
alert("There was an error ${error.message}");
}
void noMoreEmails() {
alert("No money, no emails!");
}
Stream nie je automaticky ukončený, keď nastane chyba. Ak chceme prestať počúvať po výskyte chyby, treba nastaviť parameter cancelOnError
na true
.
Občas sa môžeme rozhodnúť prestať na Stream počúvať.
void main() {
myInbox = new Inbox(someConfiguration);
var subscription = myInbox.onMessage.listen(null, cancelOnError: true);
subscription.onData((Email email) {
alert("New email from ${email.from}!");
if (email.from == 'john.doe@gmail.com') {
subscription.cancel();
}
});
subscription.OnError((error) => alert("There was an error ${error.message}"));
subscription.OnDone(() => alert("No money, no emails!"));
);
V ukážke ignorujeme všetko, čo sa stane po obdržaní emailu z adresy "john.doe@gmail.com"
. Zároveň si môžeme všimnúť alternatívny spôsob registrácie handlerov – cez metódy StreamSubscription
objektu subscription
. Treba si dať pozor na prvý parameter listen
metódy, je povinný a nemôžeme ho vynechať, no môžeme ho nastaviť na null
.
Vytvárame Stream
Občas potrebujeme vlastné Stream
s. Vtedy prichádza na scénu StreamController
, vďaka ktorému ich vieme jednoducho vytvárať.
class Inbox {
final StreamController<Email> _onMessageController = new StreamController.broadcast();
Stream<Email> get onMessage => _onMessageController.stream;
void addEmailToInbox(Email email) {
_onMessageController.add(email);
}
Inbox(someConfiguration) {
// Do something with someConfiguration.
}
}
V ukážke je primitívna implementácia triedy Inbox
z predošlých príkladov, ktorá po zavolaní metódy addEmailToInbox
pridá email
do Stream
u. Všetko zabezpečuje StreamController _onMessageController
, ktorý pridáva udalosti do svojho Stream
u dostupného pod _onMessageController.stream
. Ten sme sprístupnili do okolia cez getter onMessage
. V momente zavolania metódy addEmailToInbox
je email pridaný do Streamu
cez _onMessageController.add(email)
.
Pri vytváraní StreamController
u sme použili konštruktor StreamController.broadcast
. Dôvodom je, že defaultný konštruktor vytvára Stream
, ktorý môže mať naraz len jediného listenera. Toto je len drobný detail, na ktorý treba pamätať, keď chceme produkovať udalosti, na ktorých môže potenciálne počúvať viacero listenerov.
Uzavrieť Stream
vieme pomocou metódy StreamController.close()
, pridať error cez StreamController.addError(Object error, [Object stackTrace])
.
Kompletný príklad by potom vyzeral nasledovne:
class Inbox {
final StreamController<Email> _onMessageController = new StreamController.broadcast();
Stream<Email> get onMessage => _onMessageController.stream;
int _numOfEmails = 0;
int _maxEmails = 100;
void addEmailToInbox(Email email) {
if (!email.from.contains('@')) {
_onMessageController.addError(new Exception("Invalid from address."));
}
_onMessageController.add(email);
_numOfEmails++;
if (_numOfEmails > _maxEmails) {
_onMessageController.close();
}
}
Inbox(someConfiguration) {
// Do something with someConfiguration.
}
}
V ukážke ukončíme Stream
v momente, keď počet prijatých emailov presiahne 100 kusov. Zároveň kontrolujeme validitu from
adresy a vytvárame error pre emaily, ktorú ju nemajú korektnú.
To je na dnes všetko. Nabudúce sa pozrieme na Isolates a dozvieme sa, ako sa v Darte rieši multithreading.
Feedback prosím!
Nájdite si prosím chvíľu času na ohodnotenie tohto článku.
Zdroje
Pri písaní článku som čerpal z viacerých hodnotných zdrojov, nižšie ich nájdete zoradené podľa užitočnosti, zaujímavosti a aktuálnosti.
- https://www.dartlang.org/articles/feet-wet-streams/
- http://api.dartlang.org/docs/releases/latest/dart_async/Stream.html
- http://api.dartlang.org/docs/releases/latest/dart_async/StreamSubscription.html
- http://api.dartlang.org/docs/releases/latest/dart_async/StreamController.html#stream
- https://www.dartlang.org/docs/dart-up-and-running/contents/ch02.html
Zdravím,
tohle je v podstatě třetí druh zpracování asynchroních událostí na který jsem v poslední době narazil. Jsou tu Eventy, Defrredy (pokud vím, tak se také nazývá Features nebo Promises) a teď Streamy. Abych se přiznal tak moc nevím jaký je v tom rozdíl.
Ve finále je to o tom, že mám nějaký objekt, do něho nějakým způsobem předám svojí funkci (callback) a ta se zavolá při dokončení operace. Vždycky můžu navěsit více callbacků které se zavolají. Někdy můžu navěsit také callback pro error. Nicméně základ mi přijde stejný.
Proč je tolik druhů, jsou opravdu tak moc jiné?
Future (též Promise) je _jedna_ asynchronně dodaná hodnota. Stream (též Observable v Reactive eXtensions) je _kolekce_ asynchronně dodaných hodnot. (Abychom si rozuměli: nedostanete celou kolekci najednou, ale dostáváte postupně jednotlivé její prvky, tak jak jsou dostupné.)
Rozdíl mezi observables (ať už Future nebo Stream) a callbacky je v tom, že observables se dají rozumně _komponovat_. Článek to nezmiňuje, ale Futures i Streamy můžete různě filtrovat, transformovat a agregovat (filter, map, flatMap, reduce, atd.).