Feature request: Async stream generators #8

Closed
omariom opened this issue Jan 15, 2015 · 7 comments
Comments


omariom commented Jan 15, 2015

Currently, C# supports pull-based sequence generators.
Since IObservable<T> and IObserver<T> are now in mscorlib, and Rx is such a success not only in .NET but also in other languages and runtimes, why not add to C# the ability to easily create async push-based stream generators?

It could look like this:

private async IObservable<string> GetData()
{ 
    var data = await MakeRequest();
    yield return data;
    yield return await MakeAnotherRequest();
}

It would complete language support in the matrix of generators:

|          | sync           | async          |
|----------|----------------|----------------|
| single   | T              | Task<T>        |
| multiple | IEnumerable<T> | IObservable<T> |
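
For comparison, here is a rough sketch of what the proposed generator might correspond to when written by hand against the BCL interfaces only. `AsyncProducer<T>`, `CollectingObserver<T>` and `PushDemo` are hypothetical names invented for this illustration (they are not part of the BCL or Rx), the two request methods are stand-ins, and unsubscription is deliberately left out.

```csharp
using System;
using System.Collections.Generic;
using System.Threading.Tasks;

// Hypothetical helper: adapts an async producer delegate to IObservable<T>.
public sealed class AsyncProducer<T> : IObservable<T>
{
    private readonly Func<IObserver<T>, Task> _produce;

    public AsyncProducer(Func<IObserver<T>, Task> produce) => _produce = produce;

    public IDisposable Subscribe(IObserver<T> observer)
    {
        _ = RunAsync(observer);          // start pushing; the subscriber does not await this
        return EmptyDisposable.Instance; // this sketch does not support unsubscription
    }

    private async Task RunAsync(IObserver<T> observer)
    {
        try
        {
            await _produce(observer);
        }
        catch (Exception ex)
        {
            observer.OnError(ex);        // a failed producer terminates the stream with an error
            return;
        }
        observer.OnCompleted();          // reaching the end of the producer body completes the stream
    }

    private sealed class EmptyDisposable : IDisposable
    {
        public static readonly EmptyDisposable Instance = new EmptyDisposable();
        public void Dispose() { }
    }
}

// Hypothetical observer that records values and exposes completion as a Task.
public sealed class CollectingObserver<T> : IObserver<T>
{
    private readonly TaskCompletionSource<List<T>> _done = new TaskCompletionSource<List<T>>();
    private readonly List<T> _items = new List<T>();

    public Task<List<T>> Completion => _done.Task;
    public void OnNext(T value) => _items.Add(value);
    public void OnError(Exception error) => _done.TrySetException(error);
    public void OnCompleted() => _done.TrySetResult(_items);
}

public static class PushDemo
{
    // Stand-ins for MakeRequest / MakeAnotherRequest from the proposal.
    private static async Task<string> MakeRequest() { await Task.Delay(10); return "first"; }
    private static async Task<string> MakeAnotherRequest() { await Task.Delay(10); return "second"; }

    public static IObservable<string> GetData() =>
        new AsyncProducer<string>(async observer =>
        {
            var data = await MakeRequest();
            observer.OnNext(data);                       // plays the role of "yield return data;"
            observer.OnNext(await MakeAnotherRequest()); // plays the role of "yield return await ...;"
        });

    public static Task<List<string>> CollectAsync()
    {
        var observer = new CollectingObserver<string>();
        GetData().Subscribe(observer);
        return observer.Completion;
    }
}
```

The proposed syntax would let the compiler generate the plumbing around the lambda body; everything outside that lambda is what the author has to write or borrow from a library today.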
omariom changed the title from "Feature request: Async generators" to "Feature request: Async stream generators" on Jan 15, 2015

neuecc commented Jan 15, 2015

The BCL alone (without Rx) can't handle IObservable<T> easily.
So if IObservable<T> gets native language support, C# also needs an awaitable foreach.

async Task RunAsync()
{
    // GetData() == IObservable<T>
    // syntax proposal(await foreach)
    await foreach(var item in GetData())
    {
        Console.WriteLine(item);
    }
}

It is equivalent to the following code in Rx.

async Task RunAsync()
{
    // GetData() == IObservable<T>
    await GetData().ForEachAsync(x =>
    {
        Console.WriteLine(x);
    });
}
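
For readers arriving later: an `await foreach` statement did ship in C# 8.0, but over `IAsyncEnumerable<T>` rather than `IObservable<T>`. The sketch below shows that shipped form on .NET Core 3.0 or later; `AsyncStreamDemo`, `GetData` and the string values are placeholders chosen for this example, and the delays stand in for real requests.

```csharp
using System;
using System.Collections.Generic;
using System.Threading.Tasks;

public static class AsyncStreamDemo
{
    // An async iterator: await and yield return can be mixed in one method.
    public static async IAsyncEnumerable<string> GetData()
    {
        await Task.Delay(10);   // stands in for an asynchronous request
        yield return "first";
        await Task.Delay(10);
        yield return "second";
    }

    public static async Task<List<string>> RunAsync()
    {
        var seen = new List<string>();

        // Each iteration awaits the next element before running the body.
        await foreach (var item in GetData())
        {
            seen.Add(item);
        }

        return seen;
    }
}
```

Unlike a push-based subscription, the consumer here pulls one element at a time, so the producer does not run ahead of the loop body.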

matrix

|          | consume sync | consume async |
|----------|--------------|---------------|
| single   | ()           | await         |
| multiple | foreach      | ???           |


omariom commented Jan 15, 2015

Thanks @neuecc. Completely agree with you.
It is worth creating a separate feature request. I will reference it here.

@jaredpar jaredpar modified the milestone: Unknown Jan 15, 2015
@theoy theoy modified the milestone: Unknown Jan 16, 2015
@MadsTorgersen
Contributor

I would really like to address the lack of asynchronous sequences in C#. We've discussed this at length since back when we were still adding async/await to the language.

There are a couple of issues. First off I am not sure if IObservable<T> is the right "async IEnumerable" type to use. It has gratuitous differences from IEnumerable and Task that I think count against it. But if necessary, we could certainly cook up an IAsyncEnumerable<T> that fits in better with the current async infrastructure.

Another concern is how to best deal with batching/chunking. Oftentimes a data stream would arrive in big chunks. If you access every element of each chunk asynchronously (with await) you are incurring a lot of overhead, and essentially blurring the "degree of asynchrony" of the source.
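
One way to picture the chunking concern is a sequence whose elements are whole chunks, so that there is one await per chunk and a plain synchronous loop inside it. This is only a sketch under assumptions: it uses the async iterator syntax that later shipped in C# 8.0, and `ChunkedDemo`, `ReadChunksAsync` and the chunk sizes are hypothetical.

```csharp
using System;
using System.Collections.Generic;
using System.Threading.Tasks;

public static class ChunkedDemo
{
    // Hypothetical source: every asynchronous step delivers a whole chunk.
    public static async IAsyncEnumerable<IReadOnlyList<int>> ReadChunksAsync(int chunkCount, int chunkSize)
    {
        for (int c = 0; c < chunkCount; c++)
        {
            await Task.Delay(10); // stands in for one network or disk round trip

            var chunk = new int[chunkSize];
            for (int i = 0; i < chunkSize; i++)
            {
                chunk[i] = c * chunkSize + i;
            }

            yield return chunk;
        }
    }

    public static async Task<(long Sum, int Chunks)> SumAsync()
    {
        long sum = 0;
        int chunks = 0;

        // Asynchronous at chunk granularity only.
        await foreach (var chunk in ReadChunksAsync(chunkCount: 3, chunkSize: 4))
        {
            chunks++;

            // Elements inside a chunk are already in memory, so no await is needed here.
            foreach (var value in chunk)
            {
                sum += value;
            }
        }

        return (sum, chunks);
    }
}
```

The cost is that the chunk boundary leaks into the consumer's code, which is part of why this is a design question rather than a solved problem.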

A third issue is query operators. You could certainly imagine implementing the Linq pattern over an asynchronous stream type, but the way query expressions work today, they generate synchronous lambdas, so logic applied in the query clauses themselves couldn't use await. This would be severely limiting. Maybe we can find a way to extend query expressions to deal with this.
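
The limitation can be illustrated with an operator that takes an asynchronous predicate. In the sketch below, `WhereAsync` is a hypothetical extension method written for this example (it is not claimed to match any library's API), and the code uses the async iterator syntax that later shipped in C# 8.0. Method-call syntax accepts the async predicate, whereas the compiler does not allow `await` inside a `where` clause of a query expression.

```csharp
using System;
using System.Collections.Generic;
using System.Threading.Tasks;

public static class AsyncQueryDemo
{
    // Hypothetical operator: like Where, but the predicate itself is asynchronous.
    public static async IAsyncEnumerable<T> WhereAsync<T>(
        this IAsyncEnumerable<T> source,
        Func<T, Task<bool>> predicate)
    {
        await foreach (var item in source)
        {
            if (await predicate(item))
            {
                yield return item;
            }
        }
    }

    public static async IAsyncEnumerable<int> Numbers()
    {
        for (int i = 1; i <= 6; i++)
        {
            await Task.Yield();
            yield return i;
        }
    }

    // Stand-in for a check that needs to do asynchronous work.
    private static async Task<bool> IsEvenAsync(int n)
    {
        await Task.Delay(1);
        return n % 2 == 0;
    }

    public static async Task<List<int>> RunAsync()
    {
        var result = new List<int>();

        // Works in method syntax. The query-expression form
        //   from x in Numbers() where await IsEvenAsync(x) select x
        // is rejected, because await is not permitted in a where clause.
        await foreach (var n in Numbers().WhereAsync(x => IsEvenAsync(x)))
        {
            result.Add(n);
        }

        return result;
    }
}
```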

Language support for asynchronous sequences would at least support iterators, and possibly also foreach loops and asynchronous query expressions.

So: this is something I'd love for us to look at for the next version of C#. We would have to land with a design that we are very comfortable with in order to include it.

@HaloFour

The one issue I agree with is how you'd use await with an IObservable<T>. I think the most natural behavior would be to read each value sequentially, like consuming a queue, which is not how the ForEachAsync() extension method works.

I disagree that we'd need yet another interface to represent asynchronous streams. IObservable<T> and IObserver<T> are already a part of the BCL, and I think that it would be confusing to have two interfaces that effectively represent the same thing. Assuming that interfaces like IAsyncEnumerable<T> and IAsyncEnumerator<T> would effectively mirror IEnumerable<T> and IEnumerator<T>, save for MoveNextAsync() returning a Task<bool>, you also lock the behavior into being more sequential and complicate the possibility of representing chunked and parallel consumption of that stream. Note that Rx already has IAsyncEnumerable<T> in the Ix assembly.

As for query operators, Microsoft already has this project and it works fantastically well. I don't know what kind of conversation needs to happen to get Rx to be shipped as a part of the runtime but just by sticking with IObservable<T> you'll get this for free. If you wanted to expand on the query expressions to allow them to be async themselves that would be a question of extending Rx with the appropriate extension methods, e.g. Observable<T>.WhereAsync(Func<T, Task<bool>> predicate). The language could then be extended to support await contextually with the query expression, e.g. where await foo.SomeAsyncOperation().

@scalablecory

I don't see much value in reactive stream generators. Observable.Create already does this very efficiently and I can't see how this would improve on that, in terms of both readability and performance.

I do think Ix's IAsyncEnumerable is a significantly easier tool for existing developers to wrap their heads around, and it would benefit greatly from a yield return -- it is literally impossible to reach the efficiency of yield return (which uses illegal-to-C# goto) without writing the enumerator from scratch.


omariom commented Jan 29, 2015

@MadsTorgersen, I agree that IObservable is not a good candidate for async sequences.
Async generator and async foreach would have to cooperate like their sync counterparts do.

And given that Rx.NET provides good enough means to asynchronously produce and consume IObservables, I am closing this request in the hope that async generators and async foreach will find their way into C# 7 ✨

@omariom omariom closed this as completed Jan 29, 2015
@HaloFour

I'd like to at least see the consumption of "async streams" use an intermediary, like how await relies on GetAwaiter() and the awaitable pattern rather than working directly with Task<T>. Then it should be trivial to use the same looping construct to loop over an IObservable<T> as it would an IAsyncEnumerable<T> or whatever more appropriate structure you guys decide to use.
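
As a point of reference, the `await foreach` that later shipped in C# 8.0 does bind by pattern, much as `await` binds to GetAwaiter(). The sketch below loops over a type that implements no interface at all; `Countdown`, its nested `Enumerator` and `PatternDemo` are hypothetical names made up for this illustration, and disposal and cancellation are omitted.

```csharp
using System;
using System.Collections.Generic;
using System.Threading.Tasks;

// Implements no interfaces: await foreach binds to the method and property shapes.
public sealed class Countdown
{
    private readonly int _from;

    public Countdown(int from) => _from = from;

    // Looked up by name, the way await looks up GetAwaiter().
    public Enumerator GetAsyncEnumerator() => new Enumerator(_from);

    public sealed class Enumerator
    {
        private int _next;

        public Enumerator(int from) => _next = from;

        public int Current { get; private set; }

        // Must return something awaitable that produces a bool.
        public async ValueTask<bool> MoveNextAsync()
        {
            if (_next <= 0)
            {
                return false;
            }

            await Task.Delay(1); // stands in for asynchronous work
            Current = _next--;
            return true;
        }
    }
}

public static class PatternDemo
{
    public static async Task<List<int>> RunAsync()
    {
        var seen = new List<int>();

        await foreach (var n in new Countdown(3))
        {
            seen.Add(n);
        }

        return seen;
    }
}
```

With such an intermediary, a type other than `IAsyncEnumerable<T>` only needs to expose a suitable `GetAsyncEnumerator()` to take part in the same loop.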

I still think that it's a travesty that Rx doesn't get the love from the BCL or language teams.
