Skip to content

Commit

Permalink
StateFlow implementation (WIP, DO NOT MERGE)
Browse files Browse the repository at this point in the history
StateFlow is a Flow analogue to ConflatedBroadcastChannel. Since Flow API
is simpler than channels APIs, the implementation of StateFlow is
simpler. It consumes and allocates less memory, while still providing
full deadlock-freedom (even though it is not lock-free internally).

Fixes #1082
  • Loading branch information
elizarov committed Sep 11, 2019
1 parent 89f8c69 commit b141d07
Show file tree
Hide file tree
Showing 9 changed files with 786 additions and 14 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -817,6 +817,23 @@ public abstract class kotlinx/coroutines/flow/AbstractFlow : kotlinx/coroutines/
public abstract fun collectSafely (Lkotlinx/coroutines/flow/FlowCollector;Lkotlin/coroutines/Continuation;)Ljava/lang/Object;
}

public abstract interface class kotlinx/coroutines/flow/DataFlow : kotlinx/coroutines/flow/Flow {
public abstract fun close (Ljava/lang/Throwable;)Z
public abstract fun getValue ()Ljava/lang/Object;
public abstract fun isClosed ()Z
public abstract fun isInitialized ()Z
public abstract fun setValue (Ljava/lang/Object;)V
}

public final class kotlinx/coroutines/flow/DataFlow$DefaultImpls {
public static synthetic fun close$default (Lkotlinx/coroutines/flow/DataFlow;Ljava/lang/Throwable;ILjava/lang/Object;)Z
}

public final class kotlinx/coroutines/flow/DataFlowKt {
public static final fun DataFlow ()Lkotlinx/coroutines/flow/DataFlow;
public static final fun DataFlow (Ljava/lang/Object;)Lkotlinx/coroutines/flow/DataFlow;
}

public abstract interface class kotlinx/coroutines/flow/Flow {
public abstract fun collect (Lkotlinx/coroutines/flow/FlowCollector;Lkotlin/coroutines/Continuation;)Ljava/lang/Object;
}
Expand Down Expand Up @@ -966,7 +983,7 @@ public final class kotlinx/coroutines/flow/FlowKt {
public static final fun zip (Lkotlinx/coroutines/flow/Flow;Lkotlinx/coroutines/flow/Flow;Lkotlin/jvm/functions/Function3;)Lkotlinx/coroutines/flow/Flow;
}

public abstract class kotlinx/coroutines/flow/internal/ChannelFlow : kotlinx/coroutines/flow/Flow {
public abstract class kotlinx/coroutines/flow/internal/ChannelFlow : kotlinx/coroutines/flow/internal/FusibleFlow {
public final field capacity I
public final field context Lkotlin/coroutines/CoroutineContext;
public fun <init> (Lkotlin/coroutines/CoroutineContext;I)V
Expand All @@ -975,10 +992,17 @@ public abstract class kotlinx/coroutines/flow/internal/ChannelFlow : kotlinx/cor
public fun collect (Lkotlinx/coroutines/flow/FlowCollector;Lkotlin/coroutines/Continuation;)Ljava/lang/Object;
protected abstract fun collectTo (Lkotlinx/coroutines/channels/ProducerScope;Lkotlin/coroutines/Continuation;)Ljava/lang/Object;
protected abstract fun create (Lkotlin/coroutines/CoroutineContext;I)Lkotlinx/coroutines/flow/internal/ChannelFlow;
public fun fuse (Lkotlin/coroutines/CoroutineContext;I)Lkotlinx/coroutines/flow/internal/FusibleFlow;
public fun produceImpl (Lkotlinx/coroutines/CoroutineScope;)Lkotlinx/coroutines/channels/ReceiveChannel;
public fun toString ()Ljava/lang/String;
public final fun update (Lkotlin/coroutines/CoroutineContext;I)Lkotlinx/coroutines/flow/internal/ChannelFlow;
public static synthetic fun update$default (Lkotlinx/coroutines/flow/internal/ChannelFlow;Lkotlin/coroutines/CoroutineContext;IILjava/lang/Object;)Lkotlinx/coroutines/flow/internal/ChannelFlow;
}

public abstract interface class kotlinx/coroutines/flow/internal/FusibleFlow : kotlinx/coroutines/flow/Flow {
public abstract fun fuse (Lkotlin/coroutines/CoroutineContext;I)Lkotlinx/coroutines/flow/internal/FusibleFlow;
}

public final class kotlinx/coroutines/flow/internal/FusibleFlow$DefaultImpls {
public static synthetic fun fuse$default (Lkotlinx/coroutines/flow/internal/FusibleFlow;Lkotlin/coroutines/CoroutineContext;IILjava/lang/Object;)Lkotlinx/coroutines/flow/internal/FusibleFlow;
}

public final class kotlinx/coroutines/flow/internal/CombineKt {
Expand Down
7 changes: 7 additions & 0 deletions kotlinx-coroutines-core/common/src/flow/Migration.kt
Original file line number Diff line number Diff line change
Expand Up @@ -438,3 +438,10 @@ public fun <T> Flow<T>.delayEach(timeMillis: Long): Flow<T> = onEach { delay(tim
replaceWith = ReplaceWith("this.flatMapLatest(transform)")
)
public fun <T, R> Flow<T>.switchMap(transform: suspend (value: T) -> Flow<R>): Flow<R> = flatMapLatest(transform)

@Deprecated(
level = DeprecationLevel.ERROR,
message = "Flow analogues of 'withLatestFrom' is 'withStateOf'",
replaceWith = ReplaceWith("this.withStateOf(other, transform)")
)
public fun <A, B, R> Flow<A>.withLatestFrom(other: Flow<B>, transform: suspend (A, B) -> R): Flow<R> = noImpl()
Loading

0 comments on commit b141d07

Please sign in to comment.