Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

multicast: dont cancel upstream sequence when client is cancelled #32

Merged
merged 1 commit into from
Jan 3, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 6 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
@@ -1,3 +1,9 @@
**v0.5.2 - Oxygen:**

This version is a bug fix version.

- Multicast: don't cancel the upstream sequence when a client is cancelled

**v0.5.1 - Nitrogen:**

This version removes compilation unsafe flags
Expand Down
26 changes: 11 additions & 15 deletions Sources/AsyncSubjects/AsyncCurrentValueSubject.swift
Original file line number Diff line number Diff line change
Expand Up @@ -67,28 +67,24 @@ public final class AsyncCurrentValueSubject<Element>: AsyncSubject where Element
/// Sends a value to all consumers
/// - Parameter element: the value to send
public func send(_ element: Element) {
let channels = self.state.withCriticalRegion { state -> [AsyncBufferedChannel<Element>] in
self.state.withCriticalRegion { state in
state.current = element
return Array(state.channels.values)
}

for channel in channels {
channel.send(element)
for channel in state.channels.values {
channel.send(element)
}
}
}

/// Finishes the async sequences with a normal ending.
/// - Parameter termination: The termination to finish the subject.
public func send(_ termination: Termination<Failure>) {
let channels = self.state.withCriticalRegion { state -> [AsyncBufferedChannel<Element>] in
self.state.withCriticalRegion { state in
state.terminalState = termination
let channels = Array(state.channels.values)
state.channels.removeAll()
return channels
}

for channel in channels {
channel.finish()
for channel in channels {
channel.finish()
}
}
}

Expand Down Expand Up @@ -138,10 +134,10 @@ public final class AsyncCurrentValueSubject<Element>: AsyncSubject where Element
}

public mutating func next() async -> Element? {
await withTaskCancellationHandler { [unregister] in
unregister()
} operation: {
await withTaskCancellationHandler {
await self.iterator.next()
} onCancel: { [unregister] in
unregister()
}
}
}
Expand Down
26 changes: 11 additions & 15 deletions Sources/AsyncSubjects/AsyncPassthroughSubject.swift
Original file line number Diff line number Diff line change
Expand Up @@ -52,27 +52,23 @@ public final class AsyncPassthroughSubject<Element>: AsyncSubject {
/// Sends a value to all consumers
/// - Parameter element: the value to send
public func send(_ element: Element) {
let channels = self.state.withCriticalRegion { state in
state.channels.values
}

for channel in channels {
channel.send(element)
self.state.withCriticalRegion { state in
for channel in state.channels.values {
channel.send(element)
}
}
}

/// Finishes the subject with a normal ending.
/// - Parameter termination: The termination to finish the subject
public func send(_ termination: Termination<Failure>) {
let channels = self.state.withCriticalRegion { state -> [AsyncBufferedChannel<Element>] in
self.state.withCriticalRegion { state in
state.terminalState = termination
let channels = Array(state.channels.values)
state.channels.removeAll()
return channels
}

for channel in channels {
channel.finish()
for channel in channels {
channel.finish()
}
}
}

Expand Down Expand Up @@ -120,10 +116,10 @@ public final class AsyncPassthroughSubject<Element>: AsyncSubject {
}

public mutating func next() async -> Element? {
await withTaskCancellationHandler { [unregister] in
unregister()
} operation: {
await withTaskCancellationHandler {
await self.iterator.next()
} onCancel: { [unregister] in
unregister()
}
}
}
Expand Down
26 changes: 11 additions & 15 deletions Sources/AsyncSubjects/AsyncReplaySubject.swift
Original file line number Diff line number Diff line change
Expand Up @@ -46,33 +46,29 @@ public final class AsyncReplaySubject<Element>: AsyncSubject where Element: Send
/// Sends a value to all consumers
/// - Parameter element: the value to send
public func send(_ element: Element) {
let channels = self.state.withCriticalRegion { state -> [AsyncBufferedChannel<Element>] in
self.state.withCriticalRegion { state in
if state.buffer.count >= state.bufferSize && !state.buffer.isEmpty {
state.buffer.removeFirst()
}
state.buffer.append(element)
return Array(state.channels.values)
}

for channel in channels {
channel.send(element)
for channel in state.channels.values {
channel.send(element)
}
}
}

/// Finishes the subject with a normal ending.
/// - Parameter termination: The termination to finish the subject.
public func send(_ termination: Termination<Failure>) {
let channels = self.state.withCriticalRegion { state -> [AsyncBufferedChannel<Element>] in
self.state.withCriticalRegion { state in
state.terminalState = termination
let channels = Array(state.channels.values)
state.channels.removeAll()
state.buffer.removeAll()
state.bufferSize = 0
return channels
}

for channel in channels {
channel.finish()
for channel in channels {
channel.finish()
}
}
}

Expand Down Expand Up @@ -124,10 +120,10 @@ public final class AsyncReplaySubject<Element>: AsyncSubject where Element: Send
}

public mutating func next() async -> Element? {
await withTaskCancellationHandler { [unregister] in
unregister()
} operation: {
await withTaskCancellationHandler {
await self.iterator.next()
} onCancel: { [unregister] in
unregister()
}
}
}
Expand Down
34 changes: 15 additions & 19 deletions Sources/AsyncSubjects/AsyncThrowingCurrentValueSubject.swift
Original file line number Diff line number Diff line change
Expand Up @@ -67,32 +67,28 @@ public final class AsyncThrowingCurrentValueSubject<Element, Failure: Error>: As
/// Sends a value to all consumers
/// - Parameter element: the value to send
public func send(_ element: Element) {
let channels = self.state.withCriticalRegion { state -> [AsyncThrowingBufferedChannel<Element, Error>] in
self.state.withCriticalRegion { state in
state.current = element
return Array(state.channels.values)
}

for channel in channels {
channel.send(element)
for channel in state.channels.values {
channel.send(element)
}
}
}

/// Finishes the subject with either a normal ending or an error.
/// - Parameter termination: The termination to finish the subject.
public func send(_ termination: Termination<Failure>) {
let channels = self.state.withCriticalRegion { state -> [AsyncThrowingBufferedChannel<Element, Error>] in
self.state.withCriticalRegion { state in
state.terminalState = termination
let channels = Array(state.channels.values)
state.channels.removeAll()
return channels
}

for channel in channels {
switch termination {
case .finished:
channel.finish()
case .failure(let error):
channel.fail(error)
for channel in channels {
switch termination {
case .finished:
channel.finish()
case .failure(let error):
channel.fail(error)
}
}
}
}
Expand Down Expand Up @@ -149,10 +145,10 @@ public final class AsyncThrowingCurrentValueSubject<Element, Failure: Error>: As
}

public mutating func next() async throws -> Element? {
try await withTaskCancellationHandler { [unregister] in
unregister()
} operation: {
try await withTaskCancellationHandler {
try await self.iterator.next()
} onCancel: { [unregister] in
unregister()
}
}
}
Expand Down
33 changes: 15 additions & 18 deletions Sources/AsyncSubjects/AsyncThrowingPassthroughSubject.swift
Original file line number Diff line number Diff line change
Expand Up @@ -53,31 +53,28 @@ public final class AsyncThrowingPassthroughSubject<Element, Failure: Error>: Asy
/// Sends a value to all consumers
/// - Parameter element: the value to send
public func send(_ element: Element) {
let channels = self.state.withCriticalRegion { state in
state.channels.values
}

for channel in channels {
channel.send(element)
self.state.withCriticalRegion { state in
for channel in state.channels.values {
channel.send(element)
}
}
}

/// Finishes the subject with either a normal ending or an error.
/// - Parameter termination: The termination to finish the subject
public func send(_ termination: Termination<Failure>) {
let channels = self.state.withCriticalRegion { state -> [AsyncThrowingBufferedChannel<Element, Error>] in
self.state.withCriticalRegion { state in
state.terminalState = termination
let channels = Array(state.channels.values)
state.channels.removeAll()
return channels
}

for channel in channels {
switch termination {
case .finished:
channel.finish()
case .failure(let error):
channel.fail(error)
for channel in channels {
switch termination {
case .finished:
channel.finish()
case .failure(let error):
channel.fail(error)
}
}
}
}
Expand Down Expand Up @@ -132,10 +129,10 @@ public final class AsyncThrowingPassthroughSubject<Element, Failure: Error>: Asy
}

public mutating func next() async throws -> Element? {
try await withTaskCancellationHandler { [unregister] in
unregister()
} operation: {
try await withTaskCancellationHandler {
try await self.iterator.next()
} onCancel: { [unregister] in
unregister()
}
}
}
Expand Down
34 changes: 15 additions & 19 deletions Sources/AsyncSubjects/AsyncThrowingReplaySubject.swift
Original file line number Diff line number Diff line change
Expand Up @@ -45,37 +45,33 @@ public final class AsyncThrowingReplaySubject<Element, Failure: Error>: AsyncSub
/// Sends a value to all consumers
/// - Parameter element: the value to send
public func send(_ element: Element) {
let channels = self.state.withCriticalRegion { state -> [AsyncThrowingBufferedChannel<Element, Error>] in
self.state.withCriticalRegion { state in
if state.buffer.count >= state.bufferSize && !state.buffer.isEmpty {
state.buffer.removeFirst()
}
state.buffer.append(element)
return Array(state.channels.values)
}

for channel in channels {
channel.send(element)
for channel in state.channels.values {
channel.send(element)
}
}
}

/// Finishes the subject with either a normal ending or an error.
/// - Parameter termination: The termination to finish the subject
public func send(_ termination: Termination<Failure>) {
let channels = self.state.withCriticalRegion { state -> [AsyncThrowingBufferedChannel<Element, Error>] in
self.state.withCriticalRegion { state in
state.terminalState = termination
let channels = Array(state.channels.values)
state.channels.removeAll()
state.buffer.removeAll()
state.bufferSize = 0
return channels
}

for channel in channels {
switch termination {
case .finished:
channel.finish()
case .failure(let error):
channel.fail(error)
for channel in channels {
switch termination {
case .finished:
channel.finish()
case .failure(let error):
channel.fail(error)
}
}
}
}
Expand Down Expand Up @@ -134,10 +130,10 @@ public final class AsyncThrowingReplaySubject<Element, Failure: Error>: AsyncSub
}

public mutating func next() async throws -> Element? {
try await withTaskCancellationHandler { [unregister] in
unregister()
} operation: {
try await withTaskCancellationHandler {
try await self.iterator.next()
} onCancel: { [unregister] in
unregister()
}
}
}
Expand Down
Loading