Skip to content

Commit

Permalink
Merge pull request #3961 from carlaKC/3709-cooperativecloseinitiator
Browse files Browse the repository at this point in the history
channeldb: Indicate cooperative close initator
  • Loading branch information
halseth committed Feb 21, 2020
2 parents 94717a2 + b3e6395 commit 38b521d
Show file tree
Hide file tree
Showing 19 changed files with 1,455 additions and 736 deletions.
12 changes: 9 additions & 3 deletions chancloser.go
Original file line number Diff line number Diff line change
Expand Up @@ -155,14 +155,17 @@ type channelCloser struct {
// remoteDeliveryScript is the script that we'll send the remote
// party's settled channel funds to.
remoteDeliveryScript []byte

// locallyInitiated is true if we initiated the channel close.
locallyInitiated bool
}

// newChannelCloser creates a new instance of the channel closure given the
// passed configuration, and delivery+fee preference. The final argument should
// only be populated iff, we're the initiator of this closing request.
func newChannelCloser(cfg chanCloseCfg, deliveryScript []byte,
idealFeePerKw chainfee.SatPerKWeight, negotiationHeight uint32,
closeReq *htlcswitch.ChanClose) *channelCloser {
closeReq *htlcswitch.ChanClose, locallyInitiated bool) *channelCloser {

// Given the target fee-per-kw, we'll compute what our ideal _total_
// fee will be starting at for this fee negotiation.
Expand Down Expand Up @@ -198,6 +201,7 @@ func newChannelCloser(cfg chanCloseCfg, deliveryScript []byte,
idealFeeSat: idealFeeSat,
localDeliveryScript: deliveryScript,
priorFeeOffers: make(map[btcutil.Amount]*lnwire.ClosingSigned),
locallyInitiated: locallyInitiated,
}
}

Expand All @@ -224,7 +228,7 @@ func (c *channelCloser) initChanShutdown() (*lnwire.Shutdown, error) {
// guarantees that our listchannels rpc will be externally consistent,
// and reflect that the channel is being shutdown by the time the
// closing request returns.
err := c.cfg.channel.MarkCoopBroadcasted(nil)
err := c.cfg.channel.MarkCoopBroadcasted(nil, c.locallyInitiated)
if err != nil {
return nil, err
}
Expand Down Expand Up @@ -511,7 +515,9 @@ func (c *channelCloser) ProcessCloseMsg(msg lnwire.Message) ([]lnwire.Message, b
// Before publishing the closing tx, we persist it to the
// database, such that it can be republished if something goes
// wrong.
err = c.cfg.channel.MarkCoopBroadcasted(closeTx)
err = c.cfg.channel.MarkCoopBroadcasted(
closeTx, c.locallyInitiated,
)
if err != nil {
return nil, false, err
}
Expand Down
76 changes: 59 additions & 17 deletions channeldb/channel.go
Original file line number Diff line number Diff line change
Expand Up @@ -401,20 +401,35 @@ var (
// will have.
ChanStatusRestored ChannelStatus = 1 << 3

// ChanStatusCoopBroadcasted indicates that a cooperative close for this
// channel has been broadcasted.
// ChanStatusCoopBroadcasted indicates that a cooperative close for
// this channel has been broadcasted. Older cooperatively closed
// channels will only have this status set. Newer ones will also have
// close initiator information stored using the local/remote initiator
// status. This status is set in conjunction with the initiator status
// so that we do not need to check multiple channel statues for
// cooperative closes.
ChanStatusCoopBroadcasted ChannelStatus = 1 << 4

// ChanStatusLocalCloseInitiator indicates that we initiated closing
// the channel.
ChanStatusLocalCloseInitiator ChannelStatus = 1 << 5

// ChanStatusRemoteCloseInitiator indicates that the remote node
// initiated closing the channel.
ChanStatusRemoteCloseInitiator ChannelStatus = 1 << 6
)

// chanStatusStrings maps a ChannelStatus to a human friendly string that
// describes that status.
var chanStatusStrings = map[ChannelStatus]string{
ChanStatusDefault: "ChanStatusDefault",
ChanStatusBorked: "ChanStatusBorked",
ChanStatusCommitBroadcasted: "ChanStatusCommitBroadcasted",
ChanStatusLocalDataLoss: "ChanStatusLocalDataLoss",
ChanStatusRestored: "ChanStatusRestored",
ChanStatusCoopBroadcasted: "ChanStatusCoopBroadcasted",
ChanStatusDefault: "ChanStatusDefault",
ChanStatusBorked: "ChanStatusBorked",
ChanStatusCommitBroadcasted: "ChanStatusCommitBroadcasted",
ChanStatusLocalDataLoss: "ChanStatusLocalDataLoss",
ChanStatusRestored: "ChanStatusRestored",
ChanStatusCoopBroadcasted: "ChanStatusCoopBroadcasted",
ChanStatusLocalCloseInitiator: "ChanStatusLocalCloseInitiator",
ChanStatusRemoteCloseInitiator: "ChanStatusRemoteCloseInitiator",
}

// orderedChanStatusFlags is an in-order list of all that channel status flags.
Expand All @@ -425,6 +440,8 @@ var orderedChanStatusFlags = []ChannelStatus{
ChanStatusLocalDataLoss,
ChanStatusRestored,
ChanStatusCoopBroadcasted,
ChanStatusLocalCloseInitiator,
ChanStatusRemoteCloseInitiator,
}

// String returns a human-readable representation of the ChannelStatus.
Expand Down Expand Up @@ -974,30 +991,37 @@ func (c *OpenChannel) isBorked(chanBucket *bbolt.Bucket) (bool, error) {
// closing tx _we believe_ will appear in the chain. This is only used to
// republish this tx at startup to ensure propagation, and we should still
// handle the case where a different tx actually hits the chain.
func (c *OpenChannel) MarkCommitmentBroadcasted(closeTx *wire.MsgTx) error {
func (c *OpenChannel) MarkCommitmentBroadcasted(closeTx *wire.MsgTx,
locallyInitiated bool) error {

return c.markBroadcasted(
ChanStatusCommitBroadcasted, forceCloseTxKey, closeTx,
locallyInitiated,
)
}

// MarkCoopBroadcasted marks the channel to indicate that a cooperative close
// transaction has been broadcast, either our own or the remote, and that we
// should wach the chain for it to confirm before taking further action. It
// should watch the chain for it to confirm before taking further action. It
// takes as argument a cooperative close tx that could appear on chain, and
// should be rebroadcast upon startup. This is only used to republish and ensure
// propagation, and we should still handle the case where a different tx
// should be rebroadcast upon startup. This is only used to republish and
// ensure propagation, and we should still handle the case where a different tx
// actually hits the chain.
func (c *OpenChannel) MarkCoopBroadcasted(closeTx *wire.MsgTx) error {
func (c *OpenChannel) MarkCoopBroadcasted(closeTx *wire.MsgTx,
locallyInitiated bool) error {

return c.markBroadcasted(
ChanStatusCoopBroadcasted, coopCloseTxKey, closeTx,
locallyInitiated,
)
}

// markBroadcasted is a helper function which modifies the channel status of the
// receiving channel and inserts a close transaction under the requested key,
// which should specify either a coop or force close.
// which should specify either a coop or force close. It adds a status which
// indicates the party that initiated the channel close.
func (c *OpenChannel) markBroadcasted(status ChannelStatus, key []byte,
closeTx *wire.MsgTx) error {
closeTx *wire.MsgTx, locallyInitiated bool) error {

c.Lock()
defer c.Unlock()
Expand All @@ -1016,6 +1040,15 @@ func (c *OpenChannel) markBroadcasted(status ChannelStatus, key []byte,
}
}

// Add the initiator status to the status provided. These statuses are
// set in addition to the broadcast status so that we do not need to
// migrate the original logic which does not store initiator.
if locallyInitiated {
status |= ChanStatusLocalCloseInitiator
} else {
status |= ChanStatusRemoteCloseInitiator
}

return c.putChanStatus(status, putClosingTx)
}

Expand Down Expand Up @@ -2349,8 +2382,12 @@ type ChannelCloseSummary struct {
// entails deleting all saved state within the database concerning this
// channel. This method also takes a struct that summarizes the state of the
// channel at closing, this compact representation will be the only component
// of a channel left over after a full closing.
func (c *OpenChannel) CloseChannel(summary *ChannelCloseSummary) error {
// of a channel left over after a full closing. It takes an optional set of
// channel statuses which will be written to the historical channel bucket.
// These statuses are used to record close initiators.
func (c *OpenChannel) CloseChannel(summary *ChannelCloseSummary,
statuses ...ChannelStatus) error {

c.Lock()
defer c.Unlock()

Expand Down Expand Up @@ -2428,6 +2465,11 @@ func (c *OpenChannel) CloseChannel(summary *ChannelCloseSummary) error {
return err
}

// Apply any additional statuses to the channel state.
for _, status := range statuses {
chanState.chanStatus |= status
}

err = putOpenChannel(historicalChanBucket, chanState)
if err != nil {
return err
Expand Down
144 changes: 141 additions & 3 deletions channeldb/channel_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -1089,13 +1089,13 @@ func TestFetchWaitingCloseChannels(t *testing.T) {
},
)

if err := channel.MarkCommitmentBroadcasted(closeTx); err != nil {
if err := channel.MarkCommitmentBroadcasted(closeTx, true); err != nil {
t.Fatalf("unable to mark commitment broadcast: %v", err)
}

// Now try to marking a coop close with a nil tx. This should
// succeed, but it shouldn't exit when queried.
if err = channel.MarkCoopBroadcasted(nil); err != nil {
if err = channel.MarkCoopBroadcasted(nil, true); err != nil {
t.Fatalf("unable to mark nil coop broadcast: %v", err)
}
_, err := channel.BroadcastedCooperative()
Expand All @@ -1107,7 +1107,7 @@ func TestFetchWaitingCloseChannels(t *testing.T) {
// it as coop closed. Later we will test that distinct
// transactions are returned for both coop and force closes.
closeTx.TxIn[0].PreviousOutPoint.Index ^= 1
if err := channel.MarkCoopBroadcasted(closeTx); err != nil {
if err := channel.MarkCoopBroadcasted(closeTx, true); err != nil {
t.Fatalf("unable to mark coop broadcast: %v", err)
}
}
Expand Down Expand Up @@ -1255,3 +1255,141 @@ func TestRefreshShortChanID(t *testing.T) {
t.Fatalf("channel pending state wasn't updated: want false got true")
}
}

// TestCloseInitiator tests the setting of close initiator statuses for
// cooperative closes and local force closes.
func TestCloseInitiator(t *testing.T) {
tests := []struct {
name string
// updateChannel is called to update the channel as broadcast,
// cooperatively or not, based on the test's requirements.
updateChannel func(c *OpenChannel) error
expectedStatuses []ChannelStatus
}{
{
name: "local coop close",
// Mark the channel as cooperatively closed, initiated
// by the local party.
updateChannel: func(c *OpenChannel) error {
return c.MarkCoopBroadcasted(
&wire.MsgTx{}, true,
)
},
expectedStatuses: []ChannelStatus{
ChanStatusLocalCloseInitiator,
ChanStatusCoopBroadcasted,
},
},
{
name: "remote coop close",
// Mark the channel as cooperatively closed, initiated
// by the remote party.
updateChannel: func(c *OpenChannel) error {
return c.MarkCoopBroadcasted(
&wire.MsgTx{}, false,
)
},
expectedStatuses: []ChannelStatus{
ChanStatusRemoteCloseInitiator,
ChanStatusCoopBroadcasted,
},
},
{
name: "local force close",
// Mark the channel's commitment as broadcast with
// local initiator.
updateChannel: func(c *OpenChannel) error {
return c.MarkCommitmentBroadcasted(
&wire.MsgTx{}, true,
)
},
expectedStatuses: []ChannelStatus{
ChanStatusLocalCloseInitiator,
ChanStatusCommitBroadcasted,
},
},
}

for _, test := range tests {
test := test

t.Run(test.name, func(t *testing.T) {
t.Parallel()

cdb, cleanUp, err := makeTestDB()
if err != nil {
t.Fatalf("unable to make test database: %v",
err)
}
defer cleanUp()

// Create an open channel.
channel := createTestChannel(
t, cdb, openChannelOption(),
)

err = test.updateChannel(channel)
if err != nil {
t.Fatalf("unexpected error: %v", err)
}

// Lookup open channels in the database.
dbChans, err := fetchChannels(
cdb, pendingChannelFilter(false),
)
if err != nil {
t.Fatalf("unexpected error: %v", err)
}
if len(dbChans) != 1 {
t.Fatalf("expected 1 channel, got: %v",
len(dbChans))
}

// Check that the statuses that we expect were written
// to disk.
for _, status := range test.expectedStatuses {
if !dbChans[0].HasChanStatus(status) {
t.Fatalf("expected channel to have "+
"status: %v, has status: %v",
status, dbChans[0].chanStatus)
}
}
})
}
}

// TestCloseChannelStatus tests setting of a channel status on the historical
// channel on channel close.
func TestCloseChannelStatus(t *testing.T) {
cdb, cleanUp, err := makeTestDB()
if err != nil {
t.Fatalf("unable to make test database: %v",
err)
}
defer cleanUp()

// Create an open channel.
channel := createTestChannel(
t, cdb, openChannelOption(),
)

if err := channel.CloseChannel(
&ChannelCloseSummary{
ChanPoint: channel.FundingOutpoint,
RemotePub: channel.IdentityPub,
}, ChanStatusRemoteCloseInitiator,
); err != nil {
t.Fatalf("unexpected error: %v", err)
}

histChan, err := channel.Db.FetchHistoricalChannel(
&channel.FundingOutpoint,
)
if err != nil {
t.Fatalf("unexpected error: %v", err)
}

if !histChan.HasChanStatus(ChanStatusRemoteCloseInitiator) {
t.Fatalf("channel should have status")
}
}
Loading

0 comments on commit 38b521d

Please sign in to comment.