Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Easy Unsubscription #277

Closed
ZakTaccardi opened this issue Mar 11, 2018 · 6 comments
Closed

Easy Unsubscription #277

ZakTaccardi opened this issue Mar 11, 2018 · 6 comments

Comments

@ZakTaccardi
Copy link

One of the best parts of Rx is that unsubscriptions automatically transfer upstream. I can create a complex stream from multiple data sources, transform them, and subscribe to them. Then, I can add them to a CompositeDisposable - which can automatically get cleared during the appropriate lifecycle event, like onDestroyed() (I'm an android developer). It would be really helpful if co-routines could implement this behavior.

val disposables = CompositeDisposable()

// writeable stream in our model layer
val writeableSource: Subject<Int> = PublishSubject.create()

// readable stream. our model layer only exposes readable properties
val readableSource: Observable<Int> = writeableSource

val disposable = readableSource.map {
    println("map executed for: $it")
    it
}
    .subscribe { println(it) }
    .addTo(disposables)

writeableSource.onNext(1)

disposables.clear()

// should be ignored, because we previously called clear()
writeableSource.onNext(2)

// outputs:
// map executed for: 1
// 1

The biggest roadblock to this is that because Channel<T> subscriptions are hot, unsubscriptions do not automatically transfer upstream. Would #254 provide this? More specifically:

// writeable stream in our model layer
val broadcastChannel = BroadcastChannel<Int>(10)

// readable stream
val source: SubscriptionReceiveChannel<Int> = broadcastChannel.openSubscription()

launch {
    source // closing this prevents downstream operators from emitting
        .map { it + 1 }
        .map { it + 2 }
        .also { receiveChannel ->
            // calling `.cancel(null)` on the `ReceiveChannel` returned by the previous
            // `.map` operator above does not close the upstream channel. This can easily
            // cause a memory leak
            receiveChannel.cancel(null)
        }
        // ideally, `.consumeEach()` could somehow be modified to return a `Disposable` like
        // interface that cancels all upstream emissions
        .consumeEach { println(it) }
}
@ZakTaccardi
Copy link
Author

If this is exactly the type of behavior #254 is meant to provide, please close this in favor of it.

@jcornaz
Copy link
Contributor

jcornaz commented Mar 11, 2018

In your latest code example, if you cancel the job returned by launch, it would make the receive method performed by consumeEach to throw a cancellation exception. And, because consumeEach use consume, any exception thrown (including the cancellation exception) would cancel the channel upstream. And because the upstream is a subscription, cancelling means "unsubscribing".

So your code does return a kind of "disposable". It is returned by launch instead of consumeEach as you were expecting. and it is called Job not Disposable.

Aggregating the cancellation can easily be made with cancellation via explicit job.

For example:

val compositeDisposable = Job()

launch(parent = compositeDisposable) {
   channel.consumeEach { println(it) }
}

compositeDisposable.cancel() // This cancel all child jobs.

@jcornaz
Copy link
Contributor

jcornaz commented Mar 11, 2018

// calling .cancel(null) on the ReceiveChannel returned by the previous
// .map operator above does not close the upstream channel. This can easily
// cause a memory leak
receiveChannel.cancel(null)

This is not correct. cancelling the result of map, filter or any other operator built in "kotlinx.coroutines". does cancel the upstream channel. (If you build your own operator, you should take care of it. The easiest way, is to use produce and consume)

You can easily convince yourself if you print something when cancel is called:

fun main(args: Array<String>) {

    // writeable stream in the model layer
    val broadcastChannel = BroadcastChannel<Int>(10)

    // readable stream
    val sub = broadcastChannel.openSubscription()

    // use a delegate to print something when cancel is called on the subscription
    val source: SubscriptionReceiveChannel<Int> = object : SubscriptionReceiveChannel<Int> by sub { 
       override fun cancel(cause: Throwable?): Boolean {
            println("cancelling subscribtion with cause: $cause")
            return sub.cancel(cause)
        }
    }

    val job = launch {
        source // closing this prevents downstream operators from emitting
                .map { it + 1 }
                .map { it + 2 }

                // calling `.cancel(null)` on the `ReceiveChannel` returned by the previous
                // `.map` operator above DOES close the upstream channel. This easily
                // avoid memory leaks
                .cancel(null)
    }

    runBlocking { job.join() }
}

You'll see that this code does print "cancelling subscribtion with cause: null" because of the receiveChannel.cancel(null).

@elizarov
Copy link
Contributor

@jcornaz is absolutely right that channel operators like filter are supposed to be inherently safe and are designed to cancel the upstream data source problem without any need to manually manage it with CompositeDisposable like you have to do with Rx.

Note, that current implementation of filter and others is slightly buggy and may fail to cancel upstream source under some very specific circumstances that are unlikely to happen in real-life code, but still see #279 for details.

I'm closing this particular issue anyway.

@ZakTaccardi
Copy link
Author

ZakTaccardi commented Mar 12, 2018

without any need to manually manage it with CompositeDisposable like you have to do with Rx.

I'm confused, because there still is a need for multi-job cancellation. I would recommend adding a CompositeJobs/CompositeCloseable (or something similar) class whose purpose is to to cancel multiple in flight jobs.

Is there a reason ReceiveChannel<T> and Job do not implement Closeable?

@JakeWharton
Copy link
Contributor

Pass a parent Job, cancel the parent.

Closeable is JVM-specific.

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
None yet
Projects
None yet
Development

No branches or pull requests

4 participants