Skip to content

Commit

Permalink
fix: do not reuse consumer tag so we can record multiple queues at once
Browse files Browse the repository at this point in the history
  • Loading branch information
ThisDevDane committed Jun 5, 2023
1 parent 799c710 commit e1b9c07
Showing 1 changed file with 15 additions and 3 deletions.
18 changes: 15 additions & 3 deletions cmd/record.go
Original file line number Diff line number Diff line change
Expand Up @@ -114,6 +114,10 @@ func (rd *RecordingDevice) SetupQueues(queuesToRecord []string) ([]QueueData, er
}
}

if len(qs) <= 0 {
return nil, fmt.Errorf("no queues found for requested queues: '%v'", queuesToRecord)
}

for _, queue := range qs {
bindings, _ := rd.ListQueueBindings(vhost, queue.Name)
if len(bindings) <= 1 {
Expand Down Expand Up @@ -143,16 +147,24 @@ func (rd *RecordingDevice) BindQueuesAndRecord(data []QueueData) {
continue
}

go func(b rabbithole.BindingInfo) {
go func(qd QueueData, b rabbithole.BindingInfo) {
qd.Channel.QueueBind(qd.Queue.Name, b.RoutingKey, b.Source, false, nil)

msgs, _ := qd.Channel.Consume(qd.Queue.Name, "msgreplay", true, true, false, false, nil)
consumerTag := fmt.Sprintf("msgreplay-%s", qd.Queue.Name)
msgs, err := qd.Channel.Consume(qd.Queue.Name, consumerTag, true, true, false, false, nil)
if err != nil {
log.Err(err).
Str("queue", qd.Queue.Name).
Str("constumer_tag", consumerTag).
Msg("Error when starting to consume queue")
}

for d := range msgs {
log.Trace().Msg("message consumed")
rm := recording.DeliveryToRecordedMessage(d)
recordingFile.RecordMessage(rm)
}
}(b)
}(qd, b)
}
}

Expand Down

0 comments on commit e1b9c07

Please sign in to comment.