Skip to content

Commit

Permalink
Don't export Packet and Forward
Browse files Browse the repository at this point in the history
  • Loading branch information
ineiti committed Aug 14, 2024
1 parent 1dc65df commit 16eadf9
Show file tree
Hide file tree
Showing 2 changed files with 45 additions and 45 deletions.
30 changes: 15 additions & 15 deletions mino/minows/rpc.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,8 +24,8 @@ import (
const pathCall = "/call"
const pathStream = "/stream"

// Packet encapsulates a message sent over the network streams.
type Packet struct {
// packet encapsulates a message sent over the network streams.
type packet struct {
Source []byte
Payload []byte
}
Expand Down Expand Up @@ -278,7 +278,7 @@ func (r rpc) send(enc *gob.Encoder, msg serde.Message) error {
payload = bytes
}

err := enc.Encode(&Packet{Source: from, Payload: payload})
err := enc.Encode(&packet{Source: from, Payload: payload})
if errors.Is(err, network.ErrReset) {
return err
}
Expand All @@ -289,25 +289,25 @@ func (r rpc) send(enc *gob.Encoder, msg serde.Message) error {
}

func (r rpc) receive(dec *gob.Decoder) (ma.Multiaddr, serde.Message, error) {
var packet Packet
err := dec.Decode(&packet)
var pkt packet
err := dec.Decode(&pkt)
if errors.Is(err, network.ErrReset) {
return nil, nil, err
}
if err != nil {
return nil, nil, xerrors.Errorf("could not decode packet: %v", err)
}

from, err := ma.NewMultiaddrBytes(packet.Source)
from, err := ma.NewMultiaddrBytes(pkt.Source)
if err != nil {
return nil, nil, xerrors.Errorf("could not unmarshal address: %v",
packet.Source)
pkt.Source)
}

if packet.Payload == nil {
if pkt.Payload == nil {
return from, nil, nil
}
msg, err := r.factory.Deserialize(r.context, packet.Payload)
msg, err := r.factory.Deserialize(r.context, pkt.Payload)
if err != nil {
return from, nil, xerrors.Errorf(
"could not deserialize message: %v",
Expand Down Expand Up @@ -335,7 +335,7 @@ func (r rpc) createOrchestrator(ctx context.Context,
myAddr: myAddr,
rpc: r,
outs: encoders,
in: make(chan Packet, MaxUnreadAllowed),
in: make(chan packet, MaxUnreadAllowed),
}

var wg sync.WaitGroup
Expand All @@ -345,7 +345,7 @@ func (r rpc) createOrchestrator(ctx context.Context,
defer wg.Done()
decoder := gob.NewDecoder(stream)
for {
packet, err := o.listen(decoder)
pkt, err := o.listen(decoder)
if err != nil {
if strings.Contains(err.Error(), network.ErrReset.Error()) {
return
Expand All @@ -356,7 +356,7 @@ func (r rpc) createOrchestrator(ctx context.Context,
select {
case <-ctx.Done():
return
case o.in <- packet:
case o.in <- pkt:
}
}
}(stream)
Expand All @@ -379,7 +379,7 @@ func (r rpc) createParticipant(stream network.Stream) participant {
myAddr: r.mino.myAddr,
rpc: r,
out: encoder,
in: make(chan Packet),
in: make(chan packet),
}

done := make(chan any)
Expand All @@ -401,7 +401,7 @@ func (r rpc) createParticipant(stream network.Stream) participant {

go func() {
for {
packet, err := p.listen(decoder)
pkt, err := p.listen(decoder)
if err != nil {
if strings.Contains(err.Error(), network.ErrReset.Error()) {
close(p.in)
Expand All @@ -413,7 +413,7 @@ func (r rpc) createParticipant(stream network.Stream) participant {
select {
case <-done:
return
case p.in <- packet:
case p.in <- pkt:
}
}
}()
Expand Down
60 changes: 30 additions & 30 deletions mino/minows/session.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,8 +19,8 @@ var ErrNotPlayer = xerrors.New("not player")
// in orchestrator's incoming message buffer before pausing relaying
const MaxUnreadAllowed = 1e3

type Forward struct {
Packet
type forward struct {
Packet packet
Destination []byte
}

Expand All @@ -31,7 +31,7 @@ type orchestrator struct {
rpc rpc
// Connects to the participants
outs map[peer.ID]*gob.Encoder
in chan Packet
in chan packet
}

func (o orchestrator) Send(msg serde.Message, addrs ...mino.Address) <-chan error {
Expand Down Expand Up @@ -67,31 +67,31 @@ func (o orchestrator) send(addr mino.Address, msg serde.Message) error {
return xerrors.Errorf("could not serialize message: %v", err)
}

err = encoder.Encode(&Packet{Source: src, Payload: payload})
err = encoder.Encode(&packet{Source: src, Payload: payload})
if err != nil {
return xerrors.Errorf("could not encode packet: %v", err)
}
return nil
}

func (o orchestrator) fetch(decoder *gob.Decoder) (Packet, mino.Address,
func (o orchestrator) fetch(decoder *gob.Decoder) (packet, mino.Address,
error) {
var forward Forward
err := decoder.Decode(&forward)
var fw forward
err := decoder.Decode(&fw)
if err != nil {
return Packet{}, nil,
return packet{}, nil,
xerrors.Errorf("could not decode packet: %v", err)
}
dest := o.rpc.mino.GetAddressFactory().
FromText(forward.Destination)
FromText(fw.Destination)
if dest == nil {
return Packet{}, nil,
return packet{}, nil,
xerrors.New("could not unmarshal address")
}
return forward.Packet, dest, nil
return fw.Packet, dest, nil
}

func (o orchestrator) relay(packet Packet, dest address) error {
func (o orchestrator) relay(packet packet, dest address) error {
encoder, ok := o.outs[dest.identity]
if !ok {
return xerrors.Errorf("%v: %v", ErrNotPlayer, dest)
Expand All @@ -104,21 +104,21 @@ func (o orchestrator) relay(packet Packet, dest address) error {
return nil
}

func (o orchestrator) listen(decoder *gob.Decoder) (Packet, error) {
func (o orchestrator) listen(decoder *gob.Decoder) (packet, error) {
for {
packet, dest, err := o.fetch(decoder)
pkt, dest, err := o.fetch(decoder)
if err != nil {
return Packet{}, xerrors.Errorf("could not receive: %v", err)
return packet{}, xerrors.Errorf("could not receive: %v", err)
}
switch to := dest.(type) {
case orchestratorAddr:
if o.myAddr.Equal(to) {
return packet, nil
return pkt, nil
}
case address:
err := o.relay(packet, to)
err := o.relay(pkt, to)
if err != nil {
return Packet{}, xerrors.Errorf("could not relay: %v", err)
return packet{}, xerrors.Errorf("could not relay: %v", err)
}
}
}
Expand All @@ -131,7 +131,7 @@ type participant struct {
rpc rpc
// Connects to the orchestrator
out *gob.Encoder
in chan Packet
in chan packet
}

func (p participant) Send(msg serde.Message, addrs ...mino.Address) <-chan error {
Expand Down Expand Up @@ -165,24 +165,24 @@ func (p participant) send(addr mino.Address, msg serde.Message) error {
}

// Send to orchestrator to relay to the destination participant
forward := Forward{
Packet: Packet{Source: src, Payload: payload},
fw := forward{
Packet: packet{Source: src, Payload: payload},
Destination: dest,
}
err = p.out.Encode(&forward)
err = p.out.Encode(&fw)
if err != nil {
return xerrors.Errorf("could not encode packet: %v", err)
}
return nil
}

func (p participant) listen(decoder *gob.Decoder) (Packet, error) {
var packet Packet
err := decoder.Decode(&packet)
func (p participant) listen(decoder *gob.Decoder) (packet, error) {
var pkt packet
err := decoder.Decode(&pkt)
if err != nil {
return Packet{}, xerrors.Errorf("could not decode packet: %v", err)
return packet{}, xerrors.Errorf("could not decode packet: %v", err)
}
return packet, nil
return pkt, nil
}

type sendFn func(addr mino.Address, msg serde.Message) error
Expand Down Expand Up @@ -212,11 +212,11 @@ func doSend(addrs []mino.Address, msg serde.Message, send sendFn,
return errs
}

type unpackFn func(packet Packet) (mino.Address, serde.Message, error)
type unpackFn func(packet packet) (mino.Address, serde.Message, error)

func unpacker(af mino.AddressFactory, f serde.Factory,
c serde.Context) unpackFn {
return func(packet Packet) (mino.Address, serde.Message, error) {
return func(packet packet) (mino.Address, serde.Message, error) {
src := af.FromText(packet.Source)
if src == nil {
return nil, nil, xerrors.New("could not unmarshal address")
Expand All @@ -229,7 +229,7 @@ func unpacker(af mino.AddressFactory, f serde.Factory,
}
}

func doReceive(ctx context.Context, in chan Packet,
func doReceive(ctx context.Context, in chan packet,
unpack unpackFn, logger zerolog.Logger) (mino.Address, serde.Message, error) {
select {
case <-ctx.Done():
Expand Down

0 comments on commit 16eadf9

Please sign in to comment.