RFC - Streams


Florian Loitsch

Nov 27, 2012, 3:08:57 PM
to General Dart Discussion
Dear dartisans,

TL;DR: please play with our Stream-based IO and isolates and give us feedback. See instructions near the end of the mail.

As announced some weeks ago, we have continued our library refactoring in an experimental branch to avoid unnecessary breakages. We have now reached a small milestone in our development, and would like to invite the community to play with the current library and give us some feedback.
Milestone “streams”: users have been asking for a simpler asynchronous library, where everything works in a consistent way, for some time now (see for example http://goo.gl/cWaJh for Sean Eagan’s proposal). With this milestone we finally have running code that gets us one step closer to that goal. We have now converted both the IO library and isolates to use our new class for repeating events, Streams. Both are currently only wrapping the old primitives, but eventually we want to remove support for the old API. The html library is not yet adapted but should eventually also use streams for UI events.
We have also improved the Future class and made it simpler to use. A single “then” method lets you apply asynchronous or synchronous functions to the result of a future, merging the three methods “chain”, “transform” and “then”. Streams and Futures should make asynchronous Dart programs easier to write and read, and should reduce some types of programming errors.

Stream

A “Stream” is an object that emits a (possibly non-terminating) sequence of events. In the IO library, these are mainly data events, represented as byte-arrays, whereas for the isolate library the data is most often lists, maps, Strings, numbers and booleans.
The main thing one does with a stream is subscribe to it, calling the method “subscribe” with optional arguments onData, onError, and onDone.
class Stream<T> {
 StreamSubscription<T> subscribe(
     {void onData(T data),
      void onError(AsyncError error),
      void onDone(),
      bool unsubscribeOnError});
 ...
}

Ignoring corner-cases, and in particular error-handling, this method basically subscribes to the stream and will then receive all events.
Example:
new File('/etc/passwd').openForRead() // returns a Stream.
   .subscribe(onData: (List<int> data) { print(data.length); });

On my machine this prints one line, “2060”, indicating that the whole file was read in one go, and that the data-handler was invoked once.
$ ls -l /etc/passwd
-rw-r--r-- 1 root root 2060 Sep 19 03:20 /etc/passwd

To change an existing subscription to a stream, one should use the return value from .subscribe, which is a StreamSubscription object. This lets one end a subscription (“unsubscribe”), attempt to throttle the stream (“pause”), or change the handlers that were set when subscribing initially.

class StreamSubscription<T> {
 void unsubscribe();
 void pause([Signal resume]);

 void onData(void handleData(T data));
 void onDone(void handleDone());
 void onError(void handleError(AsyncError error));
}
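
For readers who want to experiment, here is a minimal sketch of the subscribe/unsubscribe flow. It is written against the dart:async API as it later shipped, which is an assumption relative to this branch: there "subscribe" is spelled `listen` and "unsubscribe" is spelled `cancel`. The helper `takeUntilLimit` and the sample values are made up for illustration.

```dart
import 'dart:async';

/// Collects events from [source] and ends the subscription as soon as
/// [limit] events have been seen. Hypothetical helper, not library API.
Future<List<int>> takeUntilLimit(Stream<int> source, int limit) {
  final seen = <int>[];
  final done = Completer<List<int>>();
  late StreamSubscription<int> subscription;
  subscription = source.listen(
    (value) {
      seen.add(value);
      if (seen.length == limit) {
        // Plays the role of "unsubscribe" in this mail.
        subscription.cancel();
        done.complete(seen);
      }
    },
    onError: (Object error, StackTrace stack) {
      if (!done.isCompleted) done.completeError(error, stack);
    },
    onDone: () {
      // The stream ended before the limit was reached.
      if (!done.isCompleted) done.complete(seen);
    },
  );
  return done.future;
}

Future<void> main() async {
  final result = await takeUntilLimit(Stream.fromIterable([1, 2, 3, 4]), 2);
  print(result); // [1, 2]
}
```

The subscription object is the only handle that can end the subscription, which matches the design described here.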

Contrary to the addEventHandler/removeEventHandler pattern we don’t allow unsubscription using the onData method, but require going through the subscription object.
The onData/onDone/onError methods have the same semantics as in Stream.subscribe. By having them on the subscription we provide the flexibility to change the subscription at a later point.

The pause method makes a best effort to pause the stream (usually by transmitting the pause request to its source). However, unless the Stream is a “BufferingStream”, there is no guarantee that the pause will actually have an effect.

We have learned a lot by looking at .NET’s Rx (Reactive Extensions) and its port to Dart by John Evans: https://github.com/prujohn/Reactive-Dart. We feel that Iterables and Streams should be tightly linked together, like C#’s Enumerables and Observables. That is, we see Streams as the push version of Iterables. Iterables provide a sequence of data elements on demand (pull), whereas Streams asynchronously push the elements and demand that they be handled. Both classes deal with sequences of data, though, and their interfaces thus provide similar functions:
class Stream<T> {
 ...
 Future<bool> get isEmpty;
 Future<int> get length;

 Future<bool> any(bool test(T element));
 Future<bool> every(bool test(T element));
 Future<bool> contains(T match);
 Future reduce(var initialValue, combine(previous, T element));
 Future<List<T>> toList();
 Future<Set<T>> toSet();
 ...
}
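
A short sketch of how these Future-returning queries read in practice. It assumes the dart:async API that shipped later, where these member names are unchanged; the `numbers` helper is hypothetical.

```dart
import 'dart:async';

/// Builds a fresh stream for each query, because a single-subscription
/// stream can only be consumed once. Hypothetical helper.
Stream<int> numbers() => Stream.fromIterable([1, 2, 3, 4]);

Future<void> main() async {
  print(await numbers().length); // 4
  print(await numbers().isEmpty); // false
  print(await numbers().any((n) => n > 3)); // true
  print(await numbers().every((n) => n > 3)); // false
  print(await numbers().contains(2)); // true
  print(await numbers().toList()); // [1, 2, 3, 4]
}
```

Every query has to wait for events to arrive, which is why each one returns a Future where Iterable returns a plain value.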

If you haven’t seen all of these methods on Iterable yet, it is because some of these methods have only been added to Iterable recently. Here is the relevant snippet of the Iterable class in our experimental branch:

class Iterable<T> {
 ...
 bool get isEmpty;
 int get length;

 bool any(bool test(T element));
 bool every(bool test(T element));
 bool contains(T match);
 reduce(var initialValue, combine(previous, T element));
 List<T> toList();
 Set<T> toSet();
 ...
 // See below for a discussion of the mappedBy and where changes.
 // Replaces "map".
 Iterable mappedBy(convert(T element));
 // Replaces "filter".
 Iterable where(bool test(T element));
}


There is also a set of methods that transform a stream and return a new Stream. Again, we make their behavior analogous to the corresponding methods in Iterable:

class Stream<T> {
 ...
 Stream mappedBy(convert(T element));
 Stream<T> where(bool test(T element));

 // Transforms one data-event into zero or more converted
 // data-events.
 Stream expand(Iterable convert(T value));
 // Transforms one event (data, error or done) into zero or more
 // converted events.
 Stream transform(StreamTransformer transformer);

 // Stream-only method without counterpart in Iterable.
 // Returns [this] wrapped in a stream that is guaranteed to
 // respect the pause-request.
 Stream<T> buffered();
 ...
}
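
The transformations above can be chained, as in this minimal sketch. It assumes the dart:async API that shipped later, where the mapping method is called `map`; `pipeline` is a hypothetical name.

```dart
import 'dart:async';

/// Keeps the even numbers, squares them, and emits each square twice.
/// Hypothetical helper for illustration.
Stream<int> pipeline(Stream<int> source) => source
    .where((n) => n.isEven) // same idea as Iterable.where
    .map((n) => n * n) // one event in, one event out
    .expand((n) => [n, n]); // one event in, zero or more out

Future<void> main() async {
  print(await pipeline(Stream.fromIterable([1, 2, 3, 4])).toList());
  // [4, 4, 16, 16]
}
```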

Both Iterable and Stream are still missing some transformation functions (see for example http://goo.gl/SfUwm for a changelist that is currently under review), but we believe that there are already enough to start playing with the streams.

A StreamController lets you create streams. The stream’s events come from a linked stream sink that the data is pushed into.

class StreamController<T> {
 ...
 Stream<T> get stream;
 StreamSink<T> get sink;
}

Data that is fed into the sink is then pushed as events on the stream.

abstract class StreamSink<T> {
 void add(T event);
 void signalError(AsyncError errorEvent);
 void close();
}

For convenience, the StreamController already implements both the Stream and StreamSink interfaces. The stream and sink getters return wrappers that limit the functionality. This way it is possible to pass a stream on to a different library, while still knowing that the receiver cannot trigger new events.
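
A minimal sketch of handing out only the stream half of a controller, written against the `StreamController` that shipped later in dart:async (an assumption relative to this branch). The `Ticker` class is hypothetical.

```dart
import 'dart:async';

/// Exposes only the stream, so callers can listen but cannot inject
/// events. Hypothetical example class.
class Ticker {
  final _controller = StreamController<int>();
  int _count = 0;

  Stream<int> get ticks => _controller.stream;

  void tick() => _controller.add(++_count); // feeds a data event
  void stop() {
    _controller.close(); // sends the done event
  }
}

Future<void> main() async {
  final ticker = Ticker();
  final all = ticker.ticks.toList(); // subscribe before producing
  ticker
    ..tick()
    ..tick()
    ..tick()
    ..stop();
  print(await all); // [1, 2, 3]
}
```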

Streams live in the async package and are asynchronous. That is, they should not send any data before the end of the current event-loop iteration. The streams provided by the async-library guarantee this behavior. This gives the program time to attach subscribers to events. After the control returns to the event loop a stream is allowed to send its events, even if there are no subscribers. This means that data and errors can be lost if there are no subscribers.
A tricky point is that transformed streams need a final subscriber for events to pass through the transformation at all. A transforming stream does not subscribe to its source until it has itself been subscribed to.

Example:
new File('/etc/passwd').openForRead()
 .mappedBy((data) {
   print(data);
   return data.length;
 });

In this example the stream (coming from ‘openForRead’) is transformed by ‘mappedBy’, and the resulting stream has no subscribers. At the next event-loop iteration the file stream will start firing events. Since the mappedBy stream doesn’t have any subscribers, it never subscribed to the file stream. As a consequence the file stream has no subscribers either and will just discard its events. No data will be printed.
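
The effect can be reproduced without a file. The sketch below uses a broadcast `StreamController` from the dart:async API that shipped later (an assumption relative to this branch), because broadcast streams there still drop events when nobody listens. `mapCalls` is a hypothetical helper.

```dart
import 'dart:async';

/// Returns how often the mapping callback ran. With [listen] set to
/// false, the mapped stream never subscribes to its source.
Future<int> mapCalls({required bool listen}) async {
  final controller = StreamController<int>.broadcast();
  var calls = 0;
  final mapped = controller.stream.map((n) {
    calls++;
    return n * 2;
  });
  if (listen) mapped.listen((_) {});
  controller
    ..add(1)
    ..add(2);
  // Give pending events a chance to be delivered.
  await Future<void>.delayed(Duration.zero);
  controller.close();
  return calls;
}

Future<void> main() async {
  print(await mapCalls(listen: false)); // 0
  print(await mapCalls(listen: true)); // 2
}
```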

Other library changes

These stream changes didn’t happen in a vacuum. While working on them we also improved and changed other parts of the Dart library. We recently stabilized our experimental branch, but the other areas are even more of a work in progress. While we appreciate input on every part of the library, we are aware that there are still things missing.

Iterable

  • Iterable.iterator() has been changed to Iterable.iterator. That is, the iterator method has been replaced by the equivalent iterator getter.

  • Iterators have been changed from next and hasNext to current and moveNext, as in C#:

abstract class Iterator<T> {
 // Returns the current element which got updated at the
 // last [moveNext]. Initially (before any [moveNext]) return
 // [:null:].
 T get current;

 // Moves to the next element and updates [current].
 // Returns [:true:] if [this] is still in bounds. Otherwise
 // sets [current] to [:null:] and returns [:false:].
 bool moveNext();
}

Note that, contrary to how C# acts, Dart iterators don’t throw when accessed out of bounds.

Example:
var it = [1, 2].iterator;
while(it.moveNext()) {
 print(it.current);  // 1, 2
}
print(it.current);  // null.

If your code relies on the hasNext and next methods we have a HasNextIterator wrapper class for easy porting:

var it = new HasNextIterator([1, 2].iterator);
while(it.hasNext) {
 print(it.next());  // 1, 2
}


  • Iterable.map and Iterable.filter are now lazily applied and return an Iterable. That is, the mapping or filtering doesn’t happen immediately, but when the resulting iterable is iterated over. Due to the new semantics the functions have been renamed to “mappedBy” and “where”.
    We have also added “toList” and “toSet” to Iterables, which create a fresh List or Set. These force a lazy iterable to be evaluated, and store the results in a concrete collection.

Example:
Iterable iterable =
   [1, 2, 3, 4].where((x) { print(x); return x < 3; });
for (var x in iterable) {  // Prints 1, 2, 3, 4.
 /* do nothing. */
}
for (var x in iterable) {  // Prints 1, 2, 3, 4 again.
 /* do nothing. */
}

List list =
   [1, 2, 3, 4]
   .where((x) { print(x); return x < 3; })
   .toList();  // prints 1, 2, 3, 4 and stores the
               // filtered result in a fresh list.

for (var x in list) {  // Doesn’t print anything.
 /* do nothing. */
}
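
Both Iterable changes can be checked with a small sketch. It assumes the Dart that shipped later, where "mappedBy" went back to being called `map` and where reading `current` after `moveNext()` has returned false is left unspecified, so the sketch avoids doing that. `sum` and `countTests` are hypothetical helpers.

```dart
/// Sums [values] by driving the iterator by hand.
int sum(Iterable<int> values) {
  final it = values.iterator; // a getter, not a method call
  var total = 0;
  while (it.moveNext()) {
    total += it.current;
  }
  return total;
}

/// Counts how often the filter test runs for two passes over a lazy
/// iterable and for two passes over a list created with toList().
List<int> countTests() {
  var lazyCalls = 0;
  final lazy = [1, 2, 3, 4].where((x) {
    lazyCalls++;
    return x < 3;
  });
  for (final _ in lazy) {}
  for (final _ in lazy) {}

  var eagerCalls = 0;
  final list = [1, 2, 3, 4].where((x) {
    eagerCalls++;
    return x < 3;
  }).toList();
  for (final _ in list) {}
  for (final _ in list) {}

  return [lazyCalls, eagerCalls];
}

void main() {
  print(sum([1, 2])); // 3
  print(countTests()); // [8, 4]
}
```

The lazy iterable re-runs the test on every pass, while the list runs it once per element when it is built.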

Style changes

Methods whose names are nouns and don’t take any arguments should be getters.
This means that a getter gives no guarantee about its performance. The decision to use either a getter or a method now depends only on its name and the number of arguments. For example, “isEmpty” (boolean name), “iterator” (noun name) and “length” (noun name) are all getters.
In the same way that programmers need to be aware of slow functions, they need to be aware of slow getters. For example, a “contains” method needs to document that it is O(n) and not O(1). Getters now need similar documentation.
While we don’t explicitly forbid a getter from having visible side effects on its object, it is generally a bad idea. In most cases it means that the name hides its intent and should be changed.

How to get the new libraries

Follow the instructions on https://code.google.com/p/dart/wiki/GettingTheSource?tm=4 but replace “branches/bleeding_edge” with “experimental/lib_v2”.

Once you are done building, you can take the “sdk” directory and put it into the editor’s directory (replacing the pre-installed version). This gives you syntax highlighting for the new classes and functions. Make sure to use a very recent build of the editor. The editor team just updated the analyzer so that it accepts the new SDK without warnings about iterator being a field and not a method.

What’s next

Even during the writing of this mail we already spotted areas for improvements, and new open questions. The current version of the asynchronous library is hence still a work in progress.
In the near-term future we will, however, also look more seriously into the collection library. Most of our effort will be spent on cleaning these two libraries.
In parallel we will go through open bugs and make sure that they don’t clash with the changes we propose. Once we are mostly satisfied with the libraries and are confident that the open bugs won’t require breaking changes to our new libraries we intend to merge back into bleeding_edge.

Feedback

This mail is an RFC. We want to hear your feedback. What works? What doesn’t? Are there important use-cases that we have overlooked?
If you think that there is something we need to address feel free to open a bug on http://dartbug.com/new. If you just want to send us your feedback you can also simply use this mailing-list. Both will be read by us but the issue-tracker is better if you want to make sure we don’t forget, and/or if you want to track progress.


Appendix - Examples

Example 1 (attached as "example1.dart"):

import 'dart:async';
import 'dart:io';

int fib(int v) {
 if (v <= 1) return v;
 return fib(v - 1) + fib(v - 2);
}

/**
* Splits the incoming string into lines. Emits a new data
* event for each line.
*/
class LineTransformer extends StreamTransformer {
 String _buffer = "";

 void handleData(String data, Sink sink) {
   int index = data.indexOf('\n');
   if (index >= 0) {
     int start = 0;
     do {
       sink.add("$_buffer${data.substring(start, index)}");
       _buffer = "";
       start = index + 1;
       index = data.indexOf('\n', start);
     } while (index >= start);
     _buffer = data.substring(start);
   } else {
     _buffer = "$_buffer$data";
   }
 }

 void handleDone(Sink sink) {
   if (_buffer != "") sink.add(_buffer);
   sink.close();
 }
}

void main() {
 new File('numbers.dat').openForRead()
   .transform(new Utf8Decoder())
   .transform(new LineTransformer())
   .mappedBy(int.parse)
   .mappedBy(fib)
   .subscribe(onData: print);
}
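
For comparison, here is a sketch of the same pipeline in the Dart that shipped later (an assumption relative to this branch). `LineSplitter` from dart:convert stands in for the hand-written LineTransformer, an in-memory stream replaces the file, and `fibOfLines` is a hypothetical name.

```dart
import 'dart:async';
import 'dart:convert';

int fib(int v) => v <= 1 ? v : fib(v - 1) + fib(v - 2);

/// Splits text chunks into lines, parses each line as an integer and
/// maps it through [fib].
Stream<int> fibOfLines(Stream<String> chunks) => chunks
    .transform(const LineSplitter()) // stands in for LineTransformer
    .map(int.parse)
    .map(fib);

Future<void> main() async {
  // The chunk boundaries deliberately do not line up with line breaks.
  final chunks = Stream.fromIterable(['5\n1', '0\n', '7']);
  print(await fibOfLines(chunks).toList()); // [5, 55, 13]
}
```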




Example 2 (attached as "big_example.dart"):

import 'dart:async';
import 'dart:crypto';
import 'dart:io';
import 'dart:isolate';

/**
* Computes the SHA256 hash for the incoming data. Intercepts all
* events and emits the computed hash as a data event when the incoming
* stream is done.
*/
class SHA256Transform extends StreamTransformer {
 SHA256 hash = new SHA256();

 void handleData(List<int> data, Sink sink) {
   hash.update(data);
 }

 void handleError(AsyncError error, Sink sink) {
   sink.add(CryptoUtils.bytesToHex(hash.digest()));
   sink.close();
 }

 void handleDone(Sink sink) {
   sink.add(CryptoUtils.bytesToHex(hash.digest()));
   sink.close();
 }
}

Stream<String> compute(String path) {
 // If the path doesn't exist, it's treated as an empty file.
 print("Serving $path");
 return new File(path).openForRead()
     .transform(new SHA256Transform());
}

void initIsolate() {
 var sink = null;
 var subscription = stream.subscribe();
 subscription.onData((data) {
   compute(data[0]).pipe(data[1]);
 });
 subscription.onDone(() {
   sink.close();
 });
}

void main() {
 // Create a pool of 32 isolates to compute from. Plain round-robin
 // distribution of jobs
 var pool = new List(32);
 int current = 0;
 for (int i = 0; i < pool.length; i++) {
   pool[i] = streamSpawnFunction(initIsolate);
 }

 Sink getSink() {
   return pool[current++ % pool.length];
 }

 Stream<String> getHash(String path) {
   var sink = getSink();
   var box = new MessageBox();
   sink.add([path, box.sink]);
   return box.stream;
 }

 var server = new HttpServer();
 server.listen("0.0.0.0", 8080);
 server.defaultRequestHandler = (request, response) {
   Process.start('cat', []).then((process) {
     getHash(".${request.path}")
       .transform(new Utf8Encoder())
       .pipe(process);
     process.stdoutStream.pipe(response);
   });
 };
}
--
Give a man a fire and he's warm for the whole day,
but set fire to him and he's warm for the rest of his life. - Terry Pratchett
big_example.dart
example1.dart

W. Brian Gourlie

Nov 27, 2012, 4:39:15 PM
to mi...@dartlang.org
While we're waiting for html events to officially adopt streams, is there a quick and dirty way to consume html events via streams in the meantime?

Florian Loitsch

Nov 27, 2012, 4:55:50 PM
to General Dart Discussion
Untested (on my Chromebook):
var controller = new StreamController();
element.on.click.add((event) => controller.add(event));
return controller.stream;

You could also just use the controller as a stream, but since you don't need the sink-part anymore, this should be good enough.
Note that this will never unsubscribe the controller. If you need this, you would need to extend the StreamController.


--
Consider asking HOWTO questions at Stack Overflow: http://stackoverflow.com/tags/dart
 
 

W. Brian Gourlie

Nov 27, 2012, 5:08:53 PM
to mi...@dartlang.org
Awesome, I will start migrating my stuff from John's reactive library to streams and provide any feedback.  Big thanks to John for his initial work on the reactive library, btw.

Brian

Greg Lowe

Nov 27, 2012, 6:09:14 PM
to mi...@dartlang.org
The Stream class looks great. The best part about having this in the standard library is that everyone's libraries which use this can interoperate, rather than having dozens of similar but incompatible implementations. 

Perhaps it's worth adding a couple of extra getters to the Stream class:

/// Return the first item sent by the stream. If the stream does not send only one item then return an error.
Future<T> get one;

/// Return the first item sent by the stream. Subsequent items are ignored. If no items are returned then return an error.
Future<T> get first;

/// Return the last item sent by the stream. If no items are returned then return an error.
Future<T> get last;

Mehmet D. Akın

Nov 27, 2012, 6:17:00 PM
to mi...@dartlang.org
[...]

> Stream
>
> A “Stream” is an object that emits a (possibly non-terminating) sequence of events. In the IO library, these are mainly data events, represented as byte-arrays, whereas for the isolate library the data is most often lists, maps, Strings, numbers and booleans.
> The main thing one does with a stream is subscribe to it, calling the method “subscribe” with optional arguments onData, onError, and onDone.
> class Stream<T> {
>  StreamSubscription<T> subscribe(
>      {void onData(T data),
>       void onError(AsyncError error),
>       void onDone(),
>       bool unsubscribeOnError});
>  ...
> }
>
> Ignoring corner-cases, and in particular error-handling, this method basically subscribes to the stream and will then receive all events.
> Example:
> new File('/etc/passwd').openForRead() // returns a Stream.
>    .subscribe(onData: (List<int> data) { print(data.length); });

Forgive me if I am missing something obvious.

Why do we need a subscribe method with optional arguments? IMHO this looks cleaner:
  new File('/etc/passwd').openForRead()
    .subscribe()
    ..onData((List<int> data) {...})
    ..onError(..)

If those optional arguments are for convenience, maybe the pause method should have been included as well.

[...]

Paul Brauner

Nov 27, 2012, 8:32:20 PM
to General Dart Discussion
This is really awesome!

A quick remark: you may have been spared by it but the latest rage in functional programming is Iteratees (http://okmij.org/ftp/Haskell/Iteratee/describe.pdf). It's exactly about that: composing streams, making high-level streams out of low-level streams, handling close events, etc. I'm mentioning it because some people have put some deep thought into making them behave and compose nicely while preserving desirable properties (there are dozens of blog posts out there). It's not only a Haskell thing, it's used in Play! 2.0: http://www.playframework.org/documentation/2.0/Iteratees. Maybe it would be worth comparing these APIs to see if we can learn something.

I'll try to map your API's concepts to these concepts this week and port some examples.

Paul


--

Sean Eagan

Nov 27, 2012, 10:23:08 PM
to General Dart Discussion

I really like thinking about Streams as async Iterables like this.  One possibility that this brings to mind is having language support for Streams similar to the proposed language support for Futures i.e. "await".  It might look something like:

/// gets the first [byteCount] bytes from [file]
async String fileHead(File file, int byteCount) {
  var bytes = <int>[];
  on(List<int> chunk in file.openForRead()) {
    bytes.append(chunk);
    if(bytes.length > byteCount) {
      return bytes.getRange(0, byteCount); // both "return" and "break" trigger an "unsubscribe()"
    }
  }
}

notes:

* this function would implicitly return a Future<String>
* the "on" statement is analogous to "for" and would trigger a subscribe()
* if the Stream passed to the "on" statement throws, then the "on" statement throws the unwrapped error, not the AsyncError
* a "return" or "break" inside the "on" statement would trigger an unsubscribe()
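
For reference, the Dart that shipped later has an `await for` loop that covers much of this idea. The following is a sketch in that later syntax, not the proposed "on" statement; `head` and the sample chunks are hypothetical.

```dart
import 'dart:async';

/// Returns the first [byteCount] bytes delivered by [chunks].
Future<List<int>> head(Stream<List<int>> chunks, int byteCount) async {
  final bytes = <int>[];
  await for (final chunk in chunks) {
    bytes.addAll(chunk);
    // Leaving the loop early cancels the subscription.
    if (bytes.length >= byteCount) break;
  }
  return bytes.take(byteCount).toList();
}

Future<void> main() async {
  final chunks = Stream.fromIterable([
    [1, 2],
    [3, 4],
    [5, 6],
  ]);
  print(await head(chunks, 3)); // [1, 2, 3]
}
```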

If anyone else thinks this would be useful I can file a feature request, and someone can mark it tentatively for M50 (I imagine it's not trivial to implement) :).

Cheers,
Sean Eagan

On Tue, Nov 27, 2012 at 2:08 PM, Florian Loitsch <floi...@google.com> wrote:
> Dear dartisans,
>
> TL;DR: please play with our Stream-based IO and isolates and give us feedback. See instructions near the end of the mail.
>
> As announced some weeks ago, we have continued our library refactoring in an experimental branch to avoid unnecessary breakages. We have now reached a small milestone in our development, and would like to invite the community to play with the current library and give us some feedback.
> Milestone “streams”: users have been asking for a simpler asynchronous library, where everything works in a consistent way, for some time now (see for example http://goo.gl/cWaJh for Sean Eagan’s proposal). With this milestone we finally have running code that gets us one step closer to that goal. We have now converted both the IO library and isolates to use our new class for repeating events, Streams. Both are currently only wrapping the old primitives, but eventually we want to remove support for the old API. The html library is not yet adapted but should eventually also use streams for UI events.
> We have also improved the Future class, and made it simpler to use. One simple “then” methods lets you apply asynchronous or synchronous functions to the result of a future, merging the three methods “chain”, “transform” and “then”.

I have had a few occasions where I wanted to cancel "then" requests, if "then" returned a 

--

Eric Leese

Nov 27, 2012, 10:44:48 PM
to mi...@dartlang.org
I'd star that.  There's one thing that would need to be cleared up in the proposal, though.

So if you have on and you have await, you could do something like:

async asyncFunc1(Stream stream) {
  on(var x in stream) {
    y = await x.asyncFunc2();
    // ... Can the next iteration of this loop begin before this code is executed?
  }
}

So...  The simple way to implement the behavior of "on" would allow for either the second iteration of the loop to begin or the first iteration (after await) to finish, depending on whichever event happened first.  And that may be more efficient, but it's also more complicated to think about; the beauty of "await" is that the code looks and acts like sequential code, except that it can be interrupted at the awaits.

Ladislav Thon

Nov 28, 2012, 12:19:51 AM
to mi...@dartlang.org

I love that streams are called Streams (instead of Observables, that's a horrible name) and I believe that this should go into dart:core. Async is central to Dart. The analogy with Iterables works great.

But there is one thing that I hate about almost all reactive frameworks, this one included -- their push-based nature. For me, inversion of control doesn't really work here. Maybe it's just me, but I always have a hard time thinking about such code. And you practically _have to_ have all these combinators like mappedTo, where or reduce, because you _can't_ use all the language features that you have (if, while, for), not even speaking about exceptions.

I wanted to try to prototype a pull-based (or at least looking-like-pull-based) reactive framework, but I have two other interesting projects in the work right now, so... :-(

And a little bit of mandatory bike shedding: I don't like the naming of Stream/Sink/StreamController. It took me quite a while to realize that Sink (which I thought would be something where the stream ends) is actually a beginning of the stream. StreamSource, maybe? And StreamController is actually a Stream and its source together... more like WritableStream or something.

LT

P.S.: huge thanks for this email, it made me understand a lot better than looking at the code!

Paul Brauner

Nov 28, 2012, 3:46:35 AM
to General Dart Discussion
On Wed, Nov 28, 2012 at 6:19 AM, Ladislav Thon <lad...@gmail.com> wrote:

> I love that streams are called Streams (instead of Observables, that's a horrible name) and I believe that this should go into dart:core. Async is central to Dart. The analogy with Iterables works great.
>
> But there is one thing that I hate about almost all reactive frameworks, this one included -- their push-based nature. For me, inversion of control doesn't really work here. Maybe it's just me, but I always have a hard time thinking about such code. And you practically _have to_ have all these combinators like mappedTo, where or reduce, because you _can't_ use all the language features that you have (if, while, for), not even speaking about exceptions.

Yes, that's the poor man's version of coroutines :)

> I wanted to try to prototype a pull-based (or at least looking-like-pull-based) reactive framework, but I have two other interesting projects in the work right now, so... :-(

Conal Elliott tried something along these lines (http://conal.net/papers/push-pull-frp/) and blogged later that he was struggling with serious performance problems. I don't know what the current status is. But anyway, in his implementation the API doesn't change, which seems to be your main concern. How would the API you're thinking of look (roughly)? My understanding was that Dart lacks the basic primitives to re-invert the control.

> And a little bit of mandatory bike shedding: I don't like the naming of Stream/Sink/StreamController. It took me quite a while to realize that Sink (which I thought would be something where the stream ends) is actually a beginning of the stream. StreamSource, maybe? And StreamController is actually a Stream and its source together... more like WritableStream or something.
>
> LT
>
> P.S.: huge thanks for this email, it made me understand a lot better than looking at the code!

--

Ladislav Thon

Nov 28, 2012, 4:31:29 AM
to mi...@dartlang.org

>> I wanted to try to prototype a pull-based (or at least looking-like-pull-based) reactive framework, but I have two other interesting projects in the work right now, so... :-(
>
> Conal Elliott tried something along these lines (http://conal.net/papers/push-pull-frp/) and blogged later that he was struggling with serious performance problems. I don't know what the current status is. But anyway, in his implementation the API doesn't change, which seems to be your main concern. How would the API you're thinking of look (roughly)? My understanding was that Dart lacks the basic primitives to re-invert the control.

I wanted to try to implement something like the code in chapter 3 of the Deprecating the Observer Pattern paper (http://lampwww.epfl.ch/~imaier/pub/DeprecatingObserversTR2010.pdf), at least to find out how natural it would look in Dart.

LT

Paul Brauner

Nov 28, 2012, 4:53:41 AM
to General Dart Discussion
I was about to cite that one :) But I think the trick of this paper (if I remember correctly) is that it relies on delimited continuations to re-invert control. And that's exactly what Dart lacks (on purpose, since it's not clear how to compile them to efficient JavaScript).

Erik Corry

Nov 28, 2012, 4:57:38 AM
to mi...@dartlang.org
On Tue, Nov 27, 2012 at 9:08 PM, Florian Loitsch <floi...@google.com> wrote:
> Note that, contrary to how C# acts, Dart iterators don’t throw when accessed
> out of bounds.
>
> Example:
> var it = [1, 2].iterator;
> while(it.moveNext()) {
> print(it.current); // 1, 2
> }
> print(it.current); // null.

I wonder what the motivation for this is. It seems like it's
analogous to an out-of-bounds array access, which throws rather than
just returning null.

--
Erik Corry, Software Engineer
Google Denmark ApS - Frederiksborggade 20B, 1 sal,
1360 København K - Denmark - CVR nr. 28 86 69 84

Ladislav Thon

Nov 28, 2012, 5:10:48 AM
to mi...@dartlang.org
>> I wanted to try to implement something like the code in chapter 3 of the Deprecating the Observer Pattern paper (http://lampwww.epfl.ch/~imaier/pub/DeprecatingObserversTR2010.pdf), at least to find out how natural it would look in Dart.
>
> I was about to cite that one :) But I think the trick of this paper (if I remember correctly) is that it relies on delimited continuations to re-invert control. And that's exactly what Dart lacks (on purpose, since it's not clear how to compile them to efficient JavaScript).

I didn't look at the code for that, but if I understand correctly, they throw continuations in the basket only in later stages. I wanted to stick to the basic pull-based model of chapter 3 and I thought I'd be able to implement that in Dart just fine. Maybe I'm mistaken, can't know until I get my feet wet.

LT

Mehmet D. Akın

Nov 28, 2012, 5:18:39 AM
to mi...@dartlang.org
OK, I see the explanation is also in the document: "The onData/onDone/onError methods have the same semantics as in Stream.subscribe. By having them on the subscription we provide the flexibility to change the subscription at a later point." But I still don't think this justifies two different ways of doing the same thing; one can always assign the result of subscribe() to a subscription anyway.

Florian Loitsch

Nov 28, 2012, 5:46:02 AM
to General Dart Discussion
On Wed, Nov 28, 2012 at 10:57 AM, Erik Corry <erik...@google.com> wrote:
On Tue, Nov 27, 2012 at 9:08 PM, Florian Loitsch <floi...@google.com> wrote:
> Note that, contrary to how C# acts, Dart iterators don’t throw when accessed
> out of bounds.
>
> Example:
> var it = [1, 2].iterator;
> while(it.moveNext()) {
>  print(it.current);  // 1, 2
> }
> print(it.current);  // null.

I wonder what the motivation for this is.  It seems like it's
analogous to an out-of-bounds array access, which throws rather than
just returning null.

The main reason was that it makes the implementation of iterators cleaner. Semantically it is not that bad either: 'current' just represents a field that is filled by the moveNext method. This is reflected by the fact that 'current' does not change until the next moveNext, even if the underlying collection does:

var list = [1, 2];
var it = list.iterator;
it.moveNext();
print(it.current); // => 1
list[0] = 499;
print(it.current); // => 1





Paul Brauner

Nov 28, 2012, 6:10:00 AM
to General Dart Discussion
On Wed, Nov 28, 2012 at 11:10 AM, Ladislav Thon <lad...@gmail.com> wrote:

I wanted to try to implement something like the code in chapter 3 of the Deprecating the Observer Pattern paper (http://lampwww.epfl.ch/~imaier/pub/DeprecatingObserversTR2010.pdf), at least to find how natural would it look like in Dart.


I was about to cite that one :) But I think the trick of this paper (if I remember correctly) is that it relies on delimited continuations to re-invert control. And that's exactly what Dart misses (on purpose since it's not clear how to compile them to efficient Javascript).

I didn't look at the code for that, but if I understand correctly, they throw continuations in the basket only in later stages.

I think it's just not mentioned at this point but it's already used. For instance in section 3 it says: "Method next simply suspends the current reactor until the given event stream emits a value". The only way I can think of to implement that kind of suspension is by using some kind of continuation, which seems to be the case if you look at section 6.4. Maybe I'm missing something but that's how I understand it: you need del/cc or yield or whatever captures the stack to implement such an API.
 
I wanted to stick to the basic pull-based model of chapter 3 and I thought I'd be able to implement that in Dart just fine. Maybe I'm mistaken, can't know until I get my feet wet.

LT

--

Florian Loitsch

Nov 28, 2012, 6:38:00 AM11/28/12
to General Dart Discussion
Yes, we are going to add more of these.

 
 

Florian Loitsch

Nov 28, 2012, 8:05:56 AM11/28/12
to General Dart Discussion
The Sink is the thing you pour stuff into. We use it as a feed for Streams, but also as the target of the pipe call. Anything that accepts stuff is a Sink.
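
A small sketch of both roles, written against present-day dart:async rather than the 2012 API in this RFC (so the exact class names here are an assumption relative to the proposal). pourThroughPipe is a hypothetical helper: values are poured into one controller's sink, and that controller's stream is then piped into a second controller, which acts as the target.

```dart
import 'dart:async';

/// Hypothetical helper: feeds [values] into a sink, pipes the resulting
/// stream into a second sink, and returns what arrived at the far end.
Future<List<int>> pourThroughPipe(List<int> values) async {
  final source = StreamController<int>();
  final target = StreamController<int>();

  // Start collecting at the far end before anything flows.
  final collected = target.stream.toList();

  // Role 1: the sink is the feed for a stream.
  values.forEach(source.sink.add);
  // Not awaited: this future only completes once the stream has a listener.
  source.close();

  // Role 2: a sink is the target of pipe; pipe closes it when the source ends.
  await source.stream.pipe(target);
  return collected;
}
```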

LT

P.S.: huge thanks for this email, it made me understand a lot better than looking at the code!

 
 

Sean Eagan

Nov 28, 2012, 2:41:05 PM11/28/12
to General Dart Discussion

On Tue, Nov 27, 2012 at 9:44 PM, Eric Leese <le...@google.com> wrote:
I'd star that.  There's one thing that would need to be cleared up in the proposal, though.

So if you have on and you have await, you could do something like:

async asyncFunc1(Stream stream) {
  on(var x in stream) {
    y = await x.asyncFunc2();
    // ... Can the next iteration of this loop begin before this code is executed?
  }
}

So...  The simple way to implement the behavior of "on" would allow for either the second iteration of the loop to begin or the first iteration (after await) to finish, depending on whichever event happened first.  And that may be more efficient, but it's also more complicated to think about; the beauty of "await" is that the code looks and acts like sequential code, except that it can be interrupted at the awaits.

I'm thinking any nested "await"s and "on"s could be delimited by "pause()" and "resume()" calls to the relevant StreamSubscription, but since that's only a best effort, any events which do still occur before the nested items complete could be buffered for use in later iterations of the "on" statement.
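
One way to read the pause()/resume() idea, sketched against the present-day dart:async API (listen, pause, resume) rather than the subscribe API in this RFC. forEachSequential is a hypothetical helper, not something proposed in the thread; it relies on the subscription holding back events while it is paused.

```dart
import 'dart:async';

/// Runs [body] for each event of [stream], one event at a time.
/// Hypothetical helper: the subscription is paused while [body] is pending,
/// so the next event is not delivered until the previous body has finished.
Future<void> forEachSequential<T>(
    Stream<T> stream, Future<void> Function(T event) body) {
  final done = Completer<void>();
  late final StreamSubscription<T> sub;
  sub = stream.listen(
    (event) {
      sub.pause(); // hold back further events while the body runs
      body(event).then((_) => sub.resume(), onError: (Object e, StackTrace st) {
        sub.cancel(); // stop iterating on the first failing body
        if (!done.isCompleted) done.completeError(e, st);
      });
    },
    onError: done.completeError,
    onDone: done.complete,
    cancelOnError: true,
  );
  return done.future;
}
```

With this shape the loop body always finishes before the next iteration starts, which is the simpler of the two behaviors discussed above; the price is that a slow body delays every later event.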

I added a feature request at:


Cheers,
Sean

dangli...@gmail.com

Oct 31, 2013, 1:23:24 AM10/31/13
to mi...@dartlang.org
>>  So if you have on and you have await, you could do something like:

async asyncFunc1(Stream stream) {
  on(var x in stream) {
    y = await x.asyncFunc2();
    // ... Can the next iteration of this loop begin before this code is executed?
  }
}

I can explain why this will not compile.
By convention (and by limitation), the "await" operator may be used only inside the body of a function that is explicitly declared "async".

In your case the function "on" is not "async" because it is not declared so explicitly. It is just a normal function.
Thus you cannot use the "await" operator in the body of this function.

If you declare it as follows:

async on(var x in stream) {

    y = await x.asyncFunc2();
    // ... Can the next iteration of this loop begin before this code is executed?
}

Then it becomes "async", but it is then no longer a synchronous function.
Later in the program the "on" function must necessarily be awaited; otherwise the whole program will not work correctly.
If the "on" function is not awaited and an exception is thrown in its body, that exception becomes an unobserved exception and may surface in some unpredictable place.

The "await" operator is not so hard to implement, but it has some limitations. For example, "catch" and "finally" blocks cannot contain "await" operators: supporting them there is very hard because such blocks can be nested, while the generated function body has only one exception handler.
That is, awaiting inside a "catch" block is not possible because exception handling is already in progress.
In a "finally" block it is not possible because that block is also treated as a special exception-handling block, and because it transfers control conditionally, roughly according to these rules:

+++++++++++++++++++++++++++
// user statements of this block goes here

if(__exception != null) {
  __state = EXCEPTION_HANDLER;
} else if(__transferControl != null) {
  __state = __transferControl; // break, continue
}
else {
  __state = NEXT_STATEMENT;
}
+++++++++++++++++++++++++++

Also, do not forget that a "finally" block can contain a "return" statement, which may transfer control (if there is no exception) to the "finally" handler of the previous level.
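
For readers who have not seen this kind of rewriting, here is a hand-written sketch of how a tiny function with one "await" could be turned into a state machine with a single exception handler. It is only an illustration in present-day Dart: addOneDesugared, step and the numeric states are hypothetical names, and this is not a claim about how any actual compiler generates code.

```dart
import 'dart:async';

/// Hand-desugared version of:
///   async addOne(input) { var x = await input; return x + 1; }
/// Hypothetical sketch only.
Future<int> addOneDesugared(Future<int> input) {
  final completer = Completer<int>();
  var state = 0; // which part of the original body runs next

  void step([Object? awaited]) {
    try {
      // The single exception handler shared by the whole generated body.
      switch (state) {
        case 0: // code before the await
          state = 1;
          // Suspend: re-enter step with the value once input completes.
          input.then(step, onError: completer.completeError);
          return;
        case 1: // code after the await
          final x = awaited as int;
          completer.complete(x + 1);
          return;
      }
    } catch (e, st) {
      completer.completeError(e, st);
    }
  }

  step();
  return completer.future;
}
```

Every suspension point becomes one more state, which gives a feel for why suspending in the middle of a nested "catch" or "finally" is awkward when the whole body shares one handler.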