Skip to content

Commit

Permalink
adding clone channel helper
Browse files Browse the repository at this point in the history
  • Loading branch information
Mzack9999 committed Jul 3, 2023
1 parent 6ad94c0 commit 8c6e2f1
Show file tree
Hide file tree
Showing 2 changed files with 93 additions and 0 deletions.
34 changes: 34 additions & 0 deletions channel/channel.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,34 @@
package channel

// Clone makes n copies of a channel
// they should be used as
// gen := Clone(in, 2)
// consumer1 := <- gen // pop consumer 1
// consumer2 := <- gen // pop consumer 2
// each consumer can be used with for x := range consumerN { ... }
func Clone[T any](in chan T, n int) <-chan <-chan T {
ret := make(chan (<-chan T), n)
out := make([]chan T, n)
for i := 0; i < n; i++ {
out[i] = make(chan T, cap(in))
ret <- out[i]
}

go func() {
for {
msg, ok := <-in
if ok {
for _, ch := range out {
ch <- msg
}
} else {
for _, ch := range out {
close(ch)
}
return
}
}
}()

return ret
}
59 changes: 59 additions & 0 deletions channel/channel_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,59 @@
package channel

import (
"sync"
"testing"

"github.com/stretchr/testify/require"
)

func TestClone(t *testing.T) {

var wg sync.WaitGroup

// producer
data := make([]int, 100)
for i := 0; i < 100; i++ {
data[i] = i
}

prod := make(chan int)
wg.Add(1)
go func() {
defer wg.Done()
defer close(prod)

for _, i := range data {
prod <- i
}
}()

// 2 consumers
c := Clone(prod, 2)
wg.Add(1)
c1 := <-c
var cons1 []int
go func(c <-chan int) {
defer wg.Done()

for i := range c {
cons1 = append(cons1, i)
}
}(c1)

wg.Add(1)
c2 := <-c
var cons2 []int
go func(c <-chan int) {
defer wg.Done()

for i := range c {
cons2 = append(cons2, i)
}
}(c2)

wg.Wait()

require.ElementsMatch(t, data, cons1)
require.ElementsMatch(t, data, cons2)
}

0 comments on commit 8c6e2f1

Please sign in to comment.