
Open-horizon#3806 - Bug: mms object delivered to all nodes instead of only nodes specified in destinationsList

Signed-off-by: Le Zhang <zhangl@us.ibm.com>
LiilyZhang committed Jun 19, 2023
1 parent 1b4c598 commit 8d37d51
Showing 2 changed files with 14 additions and 17 deletions.
4 changes: 2 additions & 2 deletions core/storage/boltStorage.go
@@ -712,8 +712,8 @@ func (store *BoltStorage) RetrieveObjects(orgID string, destType string, destID

function := func(object boltObject) (*boltObject, common.SyncServiceError) {
if object.Meta.DestinationPolicy == nil && orgID == object.Meta.DestOrgID &&
-		(object.Meta.DestType == "" || object.Meta.DestType == destType) &&
-		(object.Meta.DestID == "" || object.Meta.DestID == destID) {
+		(((object.Meta.DestType == "" && len(object.Meta.DestinationsList) == 0) || object.Meta.DestType == destType) && (object.Meta.DestID == "" || object.Meta.DestID == destID) ||
+			common.StringListContains(object.Meta.DestinationsList, fmt.Sprintf("%s:%s", destType, destID))) {
status := common.Pending
if object.Status == common.ReadyToSend && !object.Meta.Inactive {
status = common.Delivering
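
The boltStorage hunk above tightens the predicate that decides whether a stored object is visible to a polling node. Before this change, an object addressed only through destinationsList left DestType and DestID empty, so the wildcard branches (DestType == "" and DestID == "") matched every node in the org; the fix only treats an empty DestType as a wildcard when no destinationsList was given, and otherwise requires the node to appear in the list as "destType:destID". The mongoStorage.go hunk below makes the equivalent change to its in-memory filter. The sketch below isolates that predicate with a trimmed-down metadata struct; stringListContains is an assumed stand-in for common.StringListContains (a plain membership scan), and the field set and function names are simplified for illustration.

package main

import "fmt"

// Trimmed-down stand-in for the object metadata fields used by the predicate.
type objectMeta struct {
	DestOrgID        string
	DestType         string
	DestID           string
	DestinationsList []string
}

// Assumed behavior of common.StringListContains: a simple membership scan.
func stringListContains(list []string, value string) bool {
	for _, v := range list {
		if v == value {
			return true
		}
	}
	return false
}

// matchesDestination mirrors the fixed condition: an empty DestType only acts
// as a wildcard when no DestinationsList was given; otherwise the node must
// be listed as "destType:destID".
func matchesDestination(meta objectMeta, orgID, destType, destID string) bool {
	if meta.DestOrgID != orgID {
		return false
	}
	byTypeAndID := ((meta.DestType == "" && len(meta.DestinationsList) == 0) || meta.DestType == destType) &&
		(meta.DestID == "" || meta.DestID == destID)
	byList := stringListContains(meta.DestinationsList, fmt.Sprintf("%s:%s", destType, destID))
	return byTypeAndID || byList
}

func main() {
	obj := objectMeta{DestOrgID: "myorg", DestinationsList: []string{"edge-gw:node1"}}
	fmt.Println(matchesDestination(obj, "myorg", "edge-gw", "node1")) // true: node is listed
	fmt.Println(matchesDestination(obj, "myorg", "edge-gw", "node2")) // false: not listed (matched before the fix)
}
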
27 changes: 12 additions & 15 deletions core/storage/mongoStorage.go
@@ -900,8 +900,8 @@ OUTER:
if r.MetaData.DestinationPolicy != nil {
continue
}
-		if (r.MetaData.DestType == "" || r.MetaData.DestType == destType) &&
-			(r.MetaData.DestID == "" || r.MetaData.DestID == destID) {
+		if (((r.MetaData.DestType == "" && len(r.MetaData.DestinationsList) == 0) || r.MetaData.DestType == destType) && (r.MetaData.DestID == "" || r.MetaData.DestID == destID)) ||
+			common.StringListContains(r.MetaData.DestinationsList, fmt.Sprintf("%s:%s", destType, destID)) {
status := common.Pending
if r.Status == common.ReadyToSend && !r.MetaData.Inactive {
status = common.Delivering
@@ -1134,7 +1134,7 @@ func (store *MongoStorage) RemoveObjectTempData(orgID string, objectType string,
var offset int64 = 0
chunkNumber := 1

-        // Will be 0 if data was uploaded with streaming
+	// Will be 0 if data was uploaded with streaming
if metaData.UploadChunkSize > 0 {

for offset < metaData.ObjectSize {
@@ -1143,13 +1143,13 @@ func (store *MongoStorage) RemoveObjectTempData(orgID string, objectType string,
if trace.IsLogging(logger.TRACE) {
trace.Trace(fmt.Sprintf("RemoveObjectTempData for org - %s, type - %s, id - %s, chunkNum - %d", orgID, objectType, objectID, chunkNumber))
}
-			fileHandle,_ := store.retrieveObjectTempData(id)
+			fileHandle, _ := store.retrieveObjectTempData(id)

if fileHandle != nil {
store.CloseDataReader(fileHandle.file)
store.deleteFileHandle(id)

-                //Don't return on errors
+				//Don't return on errors
store.removeFile(id)
}

@@ -1158,7 +1158,6 @@ func (store *MongoStorage) RemoveObjectTempData(orgID string, objectType string,
}
}

-
return nil
}
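
RemoveObjectTempData above removes one temp-data file per chunk: when metaData.UploadChunkSize is greater than zero it walks offsets up to metaData.ObjectSize (presumably advancing by the chunk size; the advance itself falls outside the visible hunk), retrieves the per-chunk file handle, closes it, and removes the file, ignoring removal errors. When the chunk size is zero the data arrived via streaming and the loop is skipped. A minimal sketch of that iteration, where makeChunkID is a hypothetical stand-in for the real temp-data ID construction, which this diff does not show:

package main

import "fmt"

// Hypothetical stand-in for the real per-chunk temp-data ID; the actual
// format used by mongoStorage is not visible in this diff.
func makeChunkID(orgID, objectType, objectID string, chunkNumber int) string {
	return fmt.Sprintf("%s:%s:%s:chunk-%d", orgID, objectType, objectID, chunkNumber)
}

// chunkIDs lists the per-chunk IDs such a cleanup loop would visit:
// one per uploadChunkSize bytes, none when the chunk size is 0 (streaming).
func chunkIDs(orgID, objectType, objectID string, objectSize, uploadChunkSize int64) []string {
	var ids []string
	if uploadChunkSize <= 0 {
		return ids // data was uploaded with streaming; nothing chunked to remove
	}
	chunkNumber := 1
	for offset := int64(0); offset < objectSize; offset += uploadChunkSize {
		ids = append(ids, makeChunkID(orgID, objectType, objectID, chunkNumber))
		chunkNumber++
	}
	return ids
}

func main() {
	// 2500 bytes in 1000-byte chunks -> chunks 1, 2, 3
	fmt.Println(chunkIDs("myorg", "model", "obj1", 2500, 1000))
}
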

@@ -1188,7 +1187,7 @@ func (store *MongoStorage) RetrieveObjectTempData(orgID string, objectType strin
}
fileHandle, err := store.retrieveObjectTempData(id)
if err != nil {
-					return nil, &Error{fmt.Sprintf("Error in retrieving objects chunk data. Error: %s.\n", err)}
+		return nil, &Error{fmt.Sprintf("Error in retrieving objects chunk data. Error: %s.\n", err)}
}

if fileHandle != nil {
@@ -1226,7 +1225,6 @@ func (store *MongoStorage) retrieveObjectTempData(id string) (*fileHandle, commo
func (store *MongoStorage) AppendObjectData(orgID string, objectType string, objectID string, dataReader io.Reader,
dataLength uint32, offset int64, total int64, isFirstChunk bool, isLastChunk bool, isTempData bool) (bool, common.SyncServiceError) {

-
var n int
var err error
var data []byte
@@ -1259,7 +1257,7 @@ func (store *MongoStorage) AppendObjectData(orgID string, objectType string, obj

// Figure out which chunk this is by looking at offset + length of data
var chunkNumber int
-	if ( offset + n_int64 ) < total {
+	if (offset + n_int64) < total {
chunkNumber = (int(offset) + n) / n
} else {
updatedLastChunk = true
@@ -1276,8 +1274,8 @@ func (store *MongoStorage) AppendObjectData(orgID string, objectType string, obj
return isLastChunk, &Error{fmt.Sprintf("Failed to read the upload chunk size. Error: %s.", err)}
}

-		chunkNumber = (int)(total / chunkSize )
-		if total % chunkSize != 0 {
+		chunkNumber = (int)(total / chunkSize)
+		if total%chunkSize != 0 {
chunkNumber += 1
}
}
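
The chunk-number logic above has two cases: when offset plus the bytes just read is still short of the total, (int(offset) + n) / n yields a 1-based chunk number (assuming every non-final chunk carries exactly n bytes, so offset + n lands on a chunk boundary); for the final, possibly shorter chunk, the number is derived from the stored upload chunk size as the ceiling of total / chunkSize. A small worked example of that arithmetic, with the function name and int64 types chosen here for illustration:

package main

import "fmt"

// chunkNumberFor reproduces the two branches above: full chunks are numbered
// from their end offset, the last (possibly short) chunk from the total size
// and the recorded upload chunk size.
func chunkNumberFor(offset, n, total, chunkSize int64) int64 {
	if offset+n < total {
		return (offset + n) / n // e.g. offset 1024, n 1024 -> chunk 2
	}
	chunks := total / chunkSize
	if total%chunkSize != 0 {
		chunks++ // last chunk is shorter than chunkSize
	}
	return chunks
}

func main() {
	fmt.Println(chunkNumberFor(0, 1024, 2500, 1024))    // 1: first full chunk
	fmt.Println(chunkNumberFor(1024, 1024, 2500, 1024)) // 2: second full chunk
	fmt.Println(chunkNumberFor(2048, 452, 2500, 1024))  // 3: short final chunk
}
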
@@ -1335,12 +1333,11 @@ func (store *MongoStorage) AppendObjectData(orgID string, objectType string, obj
return updatedLastChunk, nil
}

-
// Handles storing the upload data chunk size and the total size
func (store *MongoStorage) setUploadDataInfo(orgID string, objectType string, objectID string, chunkSize int64, totalSize int64) common.SyncServiceError {
id := createObjectCollectionID(orgID, objectType, objectID)
-	if err := store.update(objects, bson.M{"_id": id}, bson.M{ "$set": bson.M{"metadata.upload-chunk-size": chunkSize, "metadata.object-size": totalSize}}); err != nil {
-			return &Error{fmt.Sprintf("Failed to set uploadDataChunkSize. Error: %s.", err)}
+	if err := store.update(objects, bson.M{"_id": id}, bson.M{"$set": bson.M{"metadata.upload-chunk-size": chunkSize, "metadata.object-size": totalSize}}); err != nil {
+		return &Error{fmt.Sprintf("Failed to set uploadDataChunkSize. Error: %s.", err)}
}
return nil
}
@@ -1462,7 +1459,7 @@ func (store *MongoStorage) DeleteStoredData(orgID string, objectType string, obj
var id string
if isTempData {
// Make sure we have all the temp data by calling RetrieveObjectTempData here
-		_,err := store.RetrieveObjectTempData(orgID, objectType, objectID)
+		_, err := store.RetrieveObjectTempData(orgID, objectType, objectID)
if err == nil {
return store.RemoveObjectTempData(orgID, objectType, objectID)
} else {
