diff --git a/README.md b/README.md index e3639d0..df3cbeb 100644 --- a/README.md +++ b/README.md @@ -67,7 +67,9 @@ The following is a list of the main features provided by the sync service: 1. `export GOPATH=$(pwd)` 2. `go get -d github.com/open-horizon/edge-sync-service` 3. `cd src/github.com/open-horizon/edge-sync-service` - 4. `./get_dependencies.sh` + 4. `go mod tidy` + 5. `go mod vendor` + 6. `./get_dependencies.sh` ### Build diff --git a/common/config.go b/common/config.go index 01e2f67..4e2a159 100644 --- a/common/config.go +++ b/common/config.go @@ -744,7 +744,7 @@ func SetDefaultConfig(config *Config) { config.EnableDataChunk = true config.MaxDataChunkSize = 5120 * 1024 config.MaxInflightChunks = 1 - config.MongoAddressCsv = "localhost:27017" + config.MongoAddressCsv = "mongodb://localhost:27017" config.MongoDbName = "d_edge" config.MongoAuthDbName = "admin" config.MongoUsername = "" diff --git a/core/storage/mongoStorage.go b/core/storage/mongoStorage.go index 3980a57..88e6838 100644 --- a/core/storage/mongoStorage.go +++ b/core/storage/mongoStorage.go @@ -1,42 +1,43 @@ package storage import ( + "bytes" + "context" "crypto/tls" "crypto/x509" "fmt" "io" "io/ioutil" - "net" "os" "strings" "time" - "github.com/globalsign/mgo" - "github.com/globalsign/mgo/bson" "github.com/open-horizon/edge-sync-service/common" "github.com/open-horizon/edge-utilities/logger" "github.com/open-horizon/edge-utilities/logger/log" "github.com/open-horizon/edge-utilities/logger/trace" + "go.mongodb.org/mongo-driver/bson" + "go.mongodb.org/mongo-driver/bson/primitive" + "go.mongodb.org/mongo-driver/mongo" + "go.mongodb.org/mongo-driver/mongo/gridfs" + "go.mongodb.org/mongo-driver/mongo/options" ) -type fileHandle struct { - file *mgo.GridFile - session *mgo.Session - offset int64 - chunks map[int64][]byte +type gridfsFile struct { + Id string `bson:"_id"` + Name string `bson:"filename"` + Length int64 `bson:"length"` } // MongoStorage is a MongoDB based store type MongoStorage struct { - session *mgo.Session - dialInfo *mgo.DialInfo - openFiles map[string]*fileHandle - connected bool - lockChannel chan int - mapLock chan int - sessionCache []*mgo.Session - cacheSize int - cacheIndex int + client *mongo.Client + clientConnectOpts *options.ClientOptions + database *mongo.Database + gridfsBucket *gridfs.Bucket + connected bool + lockChannel chan int + mapLock chan int } type object struct { @@ -47,13 +48,13 @@ type object struct { RemainingConsumers int `bson:"remaining-consumers"` RemainingReceivers int `bson:"remaining-receivers"` Destinations []common.StoreDestinationStatus `bson:"destinations"` - LastUpdate bson.MongoTimestamp `bson:"last-update"` + LastUpdate time.Time `bson:"last-update"` } type destinationObject struct { - ID string `bson:"_id"` - Destination common.Destination `bson:"destination"` - LastPingTime bson.MongoTimestamp `bson:"last-ping-time"` + ID string `bson:"_id"` + Destination common.Destination `bson:"destination"` + LastPingTime time.Time `bson:"last-ping-time"` } type notificationObject struct { @@ -62,11 +63,11 @@ type notificationObject struct { } type leaderDocument struct { - ID int32 `bson:"_id"` - UUID string `bson:"uuid"` - LastHeartbeatTS bson.MongoTimestamp `bson:"last-heartbeat-ts"` - HeartbeatTimeout int32 `bson:"heartbeat-timeout"` - Version int64 `bson:"version"` + ID int32 `bson:"_id"` + UUID string `bson:"uuid"` + LastHeartbeatTS time.Time `bson:"last-heartbeat-ts"` + HeartbeatTimeout int32 `bson:"heartbeat-timeout"` + Version int64 `bson:"version"` } type isMasterResult struct { @@ -76,9 +77,9 @@ type isMasterResult struct { } type messagingGroupObject struct { - ID string `bson:"_id"` - GroupName string `bson:"group-name"` - LastUpdate bson.MongoTimestamp `bson:"last-update"` + ID string `bson:"_id"` + GroupName string `bson:"group-name"` + LastUpdate time.Time `bson:"last-update"` } // This is almost the same type as common.StoredOrganization except for the timestamp type. @@ -86,30 +87,30 @@ type messagingGroupObject struct { type organizationObject struct { ID string `bson:"_id"` Organization common.Organization `bson:"org"` - LastUpdate bson.MongoTimestamp `bson:"last-update"` + LastUpdate time.Time `bson:"last-update"` } type webhookObject struct { - ID string `bson:"_id"` - Hooks []string `bson:"hooks"` - LastUpdate bson.MongoTimestamp `bson:"last-update"` + ID string `bson:"_id"` + Hooks []string `bson:"hooks"` + LastUpdate time.Time `bson:"last-update"` } type aclObject struct { - ID string `bson:"_id"` - Users []common.ACLentry `bson:"users"` - OrgID string `bson:"org-id"` - ACLType string `bson:"acl-type"` - LastUpdate bson.MongoTimestamp `bson:"last-update"` + ID string `bson:"_id"` + Users []common.ACLentry `bson:"users"` + OrgID string `bson:"org-id"` + ACLType string `bson:"acl-type"` + LastUpdate time.Time `bson:"last-update"` } type dataInfoObject struct { - ID string `bson:"_id"` - ChunkSize int32 `bson:"chunkSize"` - UploadDate bson.MongoTimestamp `bson:"uploadDate"` - Length int32 `bson:"length"` - MD5 string `bson:"md5"` - Filename string `bson:"filename"` + ID string `bson:"_id"` + ChunkSize int32 `bson:"chunkSize"` + UploadDate time.Time `bson:"uploadDate"` + Length int32 `bson:"length"` + MD5 string `bson:"md5"` + Filename string `bson:"filename"` } const maxUpdateTries = 5 @@ -123,15 +124,26 @@ func (store *MongoStorage) Init() common.SyncServiceError { store.mapLock = make(chan int, 1) store.mapLock <- 1 - store.dialInfo = &mgo.DialInfo{ - Addrs: strings.Split(common.Configuration.MongoAddressCsv, ","), - Source: common.Configuration.MongoAuthDbName, - Username: common.Configuration.MongoUsername, - Password: common.Configuration.MongoPassword, - Timeout: time.Duration(20 * time.Second), - ReadTimeout: time.Duration(60 * time.Second), - WriteTimeout: time.Duration(60 * time.Second), + /* + store.dialInfo = &mgo.DialInfo{ + Addrs: strings.Split(common.Configuration.MongoAddressCsv, ","), + Source: common.Configuration.MongoAuthDbName, + Username: common.Configuration.MongoUsername, + Password: common.Configuration.MongoPassword, + Timeout: time.Duration(20 * time.Second), + ReadTimeout: time.Duration(60 * time.Second), + WriteTimeout: time.Duration(60 * time.Second), + }*/ + + var mongoClient *mongo.Client + var err error + if trace.IsLogging(logger.INFO) { + trace.Info("CConnecting to mongo...") } + // Set up MongoDB client options + clientOptions := options.Client().ApplyURI(common.Configuration.MongoAddressCsv) + ctx, cancel := context.WithTimeout(context.Background(), time.Duration(20*time.Second)) + defer cancel() if common.Configuration.MongoUseSSL { tlsConfig := &tls.Config{} @@ -163,21 +175,18 @@ func (store *MongoStorage) Init() common.SyncServiceError { tlsConfig.InsecureSkipVerify = true } - store.dialInfo.DialServer = func(addr *mgo.ServerAddr) (net.Conn, error) { - return tls.Dial("tcp", addr.String(), tlsConfig) - } + // Sets TLS options in options instance + clientOptions.SetTLSConfig(tlsConfig) } - var session *mgo.Session - var err error - if trace.IsLogging(logger.INFO) { - trace.Info("Connecting to mongo...") - } for connectTime := 0; connectTime < common.Configuration.DatabaseConnectTimeout; connectTime += 10 { - session, err = mgo.DialWithInfo(store.dialInfo) - if err == nil { + trace.Info("connect to mongo...") + if mongoClient, err = mongo.Connect(ctx, clientOptions); err == nil { break + } else { + trace.Error("Error connecting to mongo. Error was: " + err.Error()) } + if connectTime == 0 && trace.IsLogging(logger.ERROR) { trace.Error("Failed to dial mgo. Error: " + err.Error()) } @@ -190,12 +199,17 @@ func (store *MongoStorage) Init() common.SyncServiceError { if connectTime == 0 && trace.IsLogging(logger.ERROR) { trace.Error("Retrying to connect to mongo") } + } - if session == nil { - message := fmt.Sprintf("Failed to dial mgo. Error: %s.", err) + if err = mongoClient.Ping(ctx, nil); err != nil { + message := fmt.Sprintf("Failed to ping mgo. Error: %s.", err) return &Error{message} } + if trace.IsLogging(logger.INFO) { + trace.Info("Connected to mongo...") + } + store.connected = true common.HealthStatus.ReconnectedToDatabase() if trace.IsLogging(logger.INFO) { @@ -205,58 +219,63 @@ func (store *MongoStorage) Init() common.SyncServiceError { log.Info("Connected to the database") } - session.SetSafe(&mgo.Safe{}) - //session.SetMode(mgo.Monotonic, true) - - db := session.DB(common.Configuration.MongoDbName) - db.C(destinations).EnsureIndexKey("destination.destination-org-id") - notificationsCollection := db.C(notifications) - notificationsCollection.EnsureIndexKey("notification.destination-org-id", "notification.destination-id", "notification.destination-type") - notificationsCollection.EnsureIndexKey("notification.resend-time", "notification.status") - objectsCollection := db.C(objects) - objectsCollection.EnsureIndexKey("metadata.destination-org-id") - err = objectsCollection.EnsureIndex( - mgo.Index{ - Key: []string{ - "metadata.destination-org-id", - "metadata.destination-policy.services.org-id", - "metadata.destination-policy.services.service-name", + db := mongoClient.Database(common.Configuration.MongoDbName) + destinationsCollection := db.Collection(destinations) + indexModel := mongo.IndexModel{Keys: bson.D{{"destination.destination-org-id", -1}}} + destinationsCollection.Indexes().CreateOne(context.TODO(), indexModel) + + notificationsCollection := db.Collection(notifications) + indexModel1 := mongo.IndexModel{ + Keys: bson.D{ + {"notification.destination-org-id", -1}, + {"notification.destination-id", -1}, + {"notification.destination-type", -1}, + }, + } + indexModel2 := mongo.IndexModel{Keys: bson.D{{"notification.resend-time", -1}, {"notification.status", -1}}} + notificationsCollection.Indexes().CreateMany(context.TODO(), []mongo.IndexModel{indexModel1, indexModel2}) + + objectsCollection := db.Collection(objects) + indexModel = mongo.IndexModel{Keys: bson.D{{"metadata.destination-org-id", -1}}} + objectsCollection.Indexes().CreateOne(context.TODO(), indexModel) + _, err = objectsCollection.Indexes().CreateOne( + context.TODO(), + mongo.IndexModel{ + Keys: bson.D{ + {"metadata.destination-org-id", -1}, + {"metadata.destination-policy.services.org-id", -1}, + {"metadata.destination-policy.services.service-name", -1}, }, - Name: "syncObjects-destination-policy.services.service-id", - Unique: false, - DropDups: false, - Background: false, - Sparse: true, + Options: options.Index().SetName("syncObjects-destination-policy.services.service-id").SetSparse(true), + // need to set index name??? }) if err != nil { log.Error("Failed to create an index on %s. Error: %s", objects, err) } - err = objectsCollection.EnsureIndex( - mgo.Index{ - Key: []string{ - "metadata.destination-org-id", - "metadata.destination-policy.timestamp", + + _, err = objectsCollection.Indexes().CreateOne( + context.TODO(), + mongo.IndexModel{ + Keys: bson.D{ + {"metadata.destination-org-id", -1}, + {"metadata.destination-policy.timestamp", -1}, }, - Unique: false, - DropDups: false, - Background: false, - Sparse: true, + Options: options.Index().SetSparse(true), }) if err != nil { log.Error("Failed to create an index on %s. Error: %s", objects, err) } - db.C(acls).EnsureIndexKey("org-id", "acl-type") - - store.session = session - store.cacheSize = common.Configuration.MongoSessionCacheSize - if store.cacheSize > 1 { - store.sessionCache = make([]*mgo.Session, store.cacheSize) - for i := 0; i < store.cacheSize; i++ { - store.sessionCache[i] = store.session.Copy() - } + db.Collection(acls).Indexes().CreateOne(context.TODO(), mongo.IndexModel{Keys: bson.D{{"org-id", 1}, {"acl-type", 1}}}) + gridfsBucket, err := gridfs.NewBucket(db) + if err != nil { + trace.Error("Error creating gridfs buket Error was: " + err.Error()) } - store.openFiles = make(map[string]*fileHandle) + store.client = mongoClient + store.database = db + store.gridfsBucket = gridfsBucket + + //store.openFiles = make(map[string]*fileHandle2) sleepInMS = common.Configuration.MongoSleepTimeBetweenRetry @@ -269,12 +288,11 @@ func (store *MongoStorage) Init() common.SyncServiceError { // Stop stops the MongoStorage store func (store *MongoStorage) Stop() { - if store.cacheSize > 1 { - for i := 0; i < store.cacheSize; i++ { - store.sessionCache[i].Close() - } + c := store.client + if c != nil { + c.Disconnect(context.TODO()) } - store.session.Close() + } // PerformMaintenance performs store's maintenance @@ -290,16 +308,17 @@ func (store *MongoStorage) Cleanup(isTest bool) common.SyncServiceError { // GetObjectsToActivate returns inactive objects that are ready to be activated func (store *MongoStorage) GetObjectsToActivate() ([]common.MetaData, common.SyncServiceError) { currentTime := time.Now().UTC().Format(time.RFC3339) - query := bson.M{"$or": []bson.M{ + query := bson.M{"$or": bson.A{ bson.M{"status": common.NotReadyToSend}, bson.M{"status": common.ReadyToSend}, bson.M{"status": common.Verifying}, bson.M{"status": common.VerificationFailed}}, "metadata.inactive": true, - "$and": []bson.M{ + "$and": bson.A{ bson.M{"metadata.activation-time": bson.M{"$ne": ""}}, bson.M{"metadata.activation-time": bson.M{"$lte": currentTime}}}} - selector := bson.M{"metadata": bson.ElementDocument} + + selector := bson.D{{"metadata", 1}} result := []object{} if err := store.fetchAll(objects, query, selector, &result); err != nil { return nil, err @@ -348,7 +367,7 @@ func (store *MongoStorage) StoreObject(metaData common.MetaData, data []byte, st existingObject := &object{} if err := store.fetchOne(objects, bson.M{"_id": id}, nil, existingObject); err != nil { - if err != mgo.ErrNotFound { + if err != mongo.ErrNoDocuments { return nil, &Error{fmt.Sprintf("Failed to retrieve object's status. Error: %s.", err)} } existingObject = nil @@ -372,10 +391,12 @@ func (store *MongoStorage) StoreObject(metaData common.MetaData, data []byte, st } } - newObject := object{ID: id, MetaData: metaData, Status: status, PolicyReceived: false, - RemainingConsumers: metaData.ExpectedConsumers, - RemainingReceivers: metaData.ExpectedConsumers, Destinations: dests} - if err := store.upsert(objects, bson.M{"_id": id, "metadata.destination-org-id": metaData.DestOrgID}, newObject); err != nil { + newObj := bson.M{"$set": bson.M{"_id": id, "metadata": metaData, "status": status, "policy-received": false, + "remaining-consumers": metaData.ExpectedConsumers, + "remaining-receivers": metaData.ExpectedConsumers, "destinations": dests, + "last-update": primitive.NewDateTimeFromTime(time.Now())}} + + if err := store.upsert(objects, bson.M{"_id": id, "metadata.destination-org-id": metaData.DestOrgID}, newObj); err != nil { return nil, &Error{fmt.Sprintf("Failed to store an object. Error: %s.", err)} } @@ -386,9 +407,10 @@ func (store *MongoStorage) StoreObject(metaData common.MetaData, data []byte, st func (store *MongoStorage) GetObjectDestinations(metaData common.MetaData) ([]common.Destination, common.SyncServiceError) { result := object{} id := getObjectCollectionID(metaData) - if err := store.fetchOne(objects, bson.M{"_id": id}, bson.M{"destinations": bson.ElementArray}, &result); err != nil { + // TODO: check if selector bson.A{"destinations", 1} is correct. Or should use bson.D{{"destinations", 1}} + if err := store.fetchOne(objects, bson.M{"_id": id}, bson.A{"destinations", 1}, &result); err != nil { switch err { - case mgo.ErrNotFound: + case mongo.ErrNoDocuments: return nil, nil default: return nil, &Error{fmt.Sprintf("Failed to retrieve object's destinations. Error: %s.", err)} @@ -406,9 +428,10 @@ func (store *MongoStorage) GetObjectDestinationsList(orgID string, objectType st objectID string) ([]common.StoreDestinationStatus, common.SyncServiceError) { result := object{} id := createObjectCollectionID(orgID, objectType, objectID) - if err := store.fetchOne(objects, bson.M{"_id": id}, bson.M{"destinations": bson.ElementArray}, &result); err != nil { + // TODO: check if selector bson.A{"destinations", 1} is correct. Or should use bson.D{{"destinations", 1}} + if err := store.fetchOne(objects, bson.M{"_id": id}, bson.A{"destinations", 1}, &result); err != nil { switch err { - case mgo.ErrNotFound: + case mongo.ErrNoDocuments: return nil, nil default: return nil, &Error{fmt.Sprintf("Failed to retrieve object's destinations. Error: %s.", err)} @@ -425,7 +448,8 @@ func (store *MongoStorage) UpdateObjectDestinations(orgID string, objectType str result := object{} id := createObjectCollectionID(orgID, objectType, objectID) - selector := bson.M{"metadata": bson.ElementDocument, "destinations": bson.ElementArray, "last-update": bson.ElementTimestamp, "status": bson.ElementString} + // bson.D{{"title", 1}, {"enrollment", 1}} + selector := bson.D{{"metadata", 1}, {"destinations", 1}, {"last-update", 1}, {"status", 1}} for i := 0; i < maxUpdateTries; i++ { if err := store.fetchOne(objects, bson.M{"_id": id}, selector, &result); err != nil { return nil, "", nil, nil, &Error{fmt.Sprintf("Failed to retrieve object's destinations. Error: %s.", err)} @@ -436,12 +460,12 @@ func (store *MongoStorage) UpdateObjectDestinations(orgID string, objectType str return nil, "", nil, nil, err } - query := bson.M{ + update := bson.M{ "$set": bson.M{"destinations": dests}, - "$currentDate": bson.M{"last-update": bson.M{"$type": "timestamp"}}, + "$currentDate": bson.M{"last-update": bson.M{"$type": "date"}}, } - if err := store.update(objects, bson.M{"_id": id, "last-update": result.LastUpdate}, query); err != nil { - if err == mgo.ErrNotFound { + if err := store.update(objects, bson.M{"_id": id, "last-update": result.LastUpdate}, update); err != nil { + if IsNotFound(err) { time.Sleep(time.Duration(sleepInMS) * time.Millisecond) continue } @@ -457,7 +481,8 @@ func (store *MongoStorage) UpdateObjectDestinations(orgID string, objectType str func (store *MongoStorage) AddObjectDestinations(orgID string, objectType string, objectID string, destinationsList []string) (*common.MetaData, string, []common.StoreDestinationStatus, common.SyncServiceError) { result := object{} id := createObjectCollectionID(orgID, objectType, objectID) - selector := bson.M{"metadata": bson.ElementDocument, "destinations": bson.ElementArray, "last-update": bson.ElementTimestamp, "status": bson.ElementString} + //selector: bson.D{{"title", 1}, {"enrollment", 1}} + selector := bson.M{"metadata": 1, "destinations": 1, "last-update": 1, "status": 1} for i := 0; i < maxUpdateTries; i++ { if err := store.fetchOne(objects, bson.M{"_id": id}, selector, &result); err != nil { return nil, "", nil, &Error{fmt.Sprintf("Failed to retrieve object's destinations. Error: %s.", err)} @@ -470,10 +495,10 @@ func (store *MongoStorage) AddObjectDestinations(orgID string, objectType string query := bson.M{ "$set": bson.M{"destinations": updatedDests}, - "$currentDate": bson.M{"last-update": bson.M{"$type": "timestamp"}}, + "$currentDate": bson.M{"last-update": bson.M{"$type": "date"}}, } if err := store.update(objects, bson.M{"_id": id, "last-update": result.LastUpdate}, query); err != nil { - if err == mgo.ErrNotFound { + if IsNotFound(err) { time.Sleep(time.Duration(sleepInMS) * time.Millisecond) continue } @@ -489,7 +514,7 @@ func (store *MongoStorage) AddObjectDestinations(orgID string, objectType string func (store *MongoStorage) DeleteObjectDestinations(orgID string, objectType string, objectID string, destinationsList []string) (*common.MetaData, string, []common.StoreDestinationStatus, common.SyncServiceError) { result := object{} id := createObjectCollectionID(orgID, objectType, objectID) - selector := bson.M{"metadata": bson.ElementDocument, "destinations": bson.ElementArray, "last-update": bson.ElementTimestamp, "status": bson.ElementString} + selector := bson.M{"metadata": 1, "destinations": 1, "last-update": 1, "status": 1} for i := 0; i < maxUpdateTries; i++ { if err := store.fetchOne(objects, bson.M{"_id": id}, selector, &result); err != nil { return nil, "", nil, &Error{fmt.Sprintf("Failed to retrieve object's destinations. Error: %s.", err)} @@ -502,10 +527,10 @@ func (store *MongoStorage) DeleteObjectDestinations(orgID string, objectType str query := bson.M{ "$set": bson.M{"destinations": updatedDests}, - "$currentDate": bson.M{"last-update": bson.M{"$type": "timestamp"}}, + "$currentDate": bson.M{"last-update": bson.M{"$type": "date"}}, } if err := store.update(objects, bson.M{"_id": id, "last-update": result.LastUpdate}, query); err != nil { - if err == mgo.ErrNotFound { + if IsNotFound(err) { time.Sleep(time.Duration(sleepInMS) * time.Millisecond) continue } @@ -530,7 +555,7 @@ func (store *MongoStorage) UpdateObjectDeliveryStatus(status string, message str for i := 0; i < maxUpdateTries; i++ { if err := store.fetchOne(objects, bson.M{"_id": id}, - bson.M{"metadata": bson.ElementDocument, "destinations": bson.ElementArray, "last-update": bson.ElementTimestamp}, + bson.M{"metadata": 1, "destinations": 1, "last-update": 1}, &result); err != nil { return false, &Error{fmt.Sprintf("Failed to retrieve object. Error: %s.", err)} } @@ -560,18 +585,18 @@ func (store *MongoStorage) UpdateObjectDeliveryStatus(status string, message str query := bson.M{ "$set": bson.M{"destinations": result.Destinations}, - "$currentDate": bson.M{"last-update": bson.M{"$type": "timestamp"}}, + "$currentDate": bson.M{"last-update": bson.M{"$type": "date"}}, } if result.MetaData.AutoDelete && status == common.Consumed && allConsumed && result.MetaData.Expiration == "" { // Delete the object by setting its expiration time to one hour expirationTime := time.Now().Add(time.Hour * time.Duration(1)).UTC().Format(time.RFC3339) query = bson.M{ "$set": bson.M{"destinations": result.Destinations, "metadata.expiration": expirationTime}, - "$currentDate": bson.M{"last-update": bson.M{"$type": "timestamp"}}, + "$currentDate": bson.M{"last-update": bson.M{"$type": "date"}}, } } if err := store.update(objects, bson.M{"_id": id, "last-update": result.LastUpdate}, query); err != nil { - if err == mgo.ErrNotFound { + if IsNotFound(err) { time.Sleep(time.Duration(sleepInMS) * time.Millisecond) continue } @@ -588,7 +613,7 @@ func (store *MongoStorage) UpdateObjectDelivering(orgID string, objectType strin id := createObjectCollectionID(orgID, objectType, objectID) for i := 0; i < maxUpdateTries; i++ { if err := store.fetchOne(objects, bson.M{"_id": id}, - bson.M{"destinations": bson.ElementArray, "last-update": bson.ElementTimestamp}, + bson.M{"destinations": 1, "last-update": 1}, &result); err != nil { return &Error{fmt.Sprintf("Failed to retrieve object. Error: %s.", err)} } @@ -599,9 +624,9 @@ func (store *MongoStorage) UpdateObjectDelivering(orgID string, objectType strin if err := store.update(objects, bson.M{"_id": id, "last-update": result.LastUpdate}, bson.M{ "$set": bson.M{"destinations": result.Destinations}, - "$currentDate": bson.M{"last-update": bson.M{"$type": "timestamp"}}, + "$currentDate": bson.M{"last-update": bson.M{"$type": "date"}}, }); err != nil { - if err == mgo.ErrNotFound { + if IsNotFound(err) { time.Sleep(time.Duration(sleepInMS) * time.Millisecond) continue } @@ -616,9 +641,9 @@ func (store *MongoStorage) UpdateObjectDelivering(orgID string, objectType strin func (store *MongoStorage) RetrieveObjectStatus(orgID string, objectType string, objectID string) (string, common.SyncServiceError) { result := object{} id := createObjectCollectionID(orgID, objectType, objectID) - if err := store.fetchOne(objects, bson.M{"_id": id}, bson.M{"status": bson.ElementString}, &result); err != nil { + if err := store.fetchOne(objects, bson.M{"_id": id}, bson.M{"status": 1}, &result); err != nil { switch err { - case mgo.ErrNotFound: + case mongo.ErrNoDocuments: return "", nil default: return "", &Error{fmt.Sprintf("Failed to retrieve object's status. Error: %s.", err)} @@ -632,7 +657,7 @@ func (store *MongoStorage) RetrieveObjectStatus(orgID string, objectType string, func (store *MongoStorage) RetrieveObjectRemainingConsumers(orgID string, objectType string, objectID string) (int, common.SyncServiceError) { result := object{} id := createObjectCollectionID(orgID, objectType, objectID) - if err := store.fetchOne(objects, bson.M{"_id": id}, bson.M{"remaining-consumers": bson.ElementInt32}, &result); err != nil { + if err := store.fetchOne(objects, bson.M{"_id": id}, bson.M{"remaining-consumers": 1}, &result); err != nil { return 0, &Error{fmt.Sprintf("Failed to retrieve object's remaining comsumers. Error: %s.", err)} } return result.RemainingConsumers, nil @@ -645,12 +670,12 @@ func (store *MongoStorage) DecrementAndReturnRemainingConsumers(orgID string, ob if err := store.update(objects, bson.M{"_id": id}, bson.M{ "$inc": bson.M{"remaining-consumers": -1}, - "$currentDate": bson.M{"last-update": bson.M{"$type": "timestamp"}}, + "$currentDate": bson.M{"last-update": bson.M{"$type": "date"}}, }); err != nil { return 0, &Error{fmt.Sprintf("Failed to decrement object's remaining consumers. Error: %s.", err)} } result := object{} - if err := store.fetchOne(objects, bson.M{"_id": id}, bson.M{"remaining-consumers": bson.ElementInt32}, &result); err != nil { + if err := store.fetchOne(objects, bson.M{"_id": id}, bson.M{"remaining-consumers": 1}, &result); err != nil { return 0, &Error{fmt.Sprintf("Failed to retrieve object's remaining consumers. Error: %s.", err)} } return result.RemainingConsumers, nil @@ -663,12 +688,12 @@ func (store *MongoStorage) DecrementAndReturnRemainingReceivers(orgID string, ob if err := store.update(objects, bson.M{"_id": id}, bson.M{ "$inc": bson.M{"remaining-receivers": -1}, - "$currentDate": bson.M{"last-update": bson.M{"$type": "timestamp"}}, + "$currentDate": bson.M{"last-update": bson.M{"$type": "date"}}, }); err != nil { return 0, &Error{fmt.Sprintf("Failed to decrement object's remaining receivers. Error: %s.", err)} } result := object{} - if err := store.fetchOne(objects, bson.M{"_id": id}, bson.M{"remaining-receivers": bson.ElementInt32}, &result); err != nil { + if err := store.fetchOne(objects, bson.M{"_id": id}, bson.M{"remaining-receivers": 1}, &result); err != nil { return 0, &Error{fmt.Sprintf("Failed to retrieve object's remaining receivers. Error: %s.", err)} } return result.RemainingReceivers, nil @@ -678,14 +703,14 @@ func (store *MongoStorage) DecrementAndReturnRemainingReceivers(orgID string, ob func (store *MongoStorage) ResetObjectRemainingConsumers(orgID string, objectType string, objectID string) common.SyncServiceError { id := createObjectCollectionID(orgID, objectType, objectID) result := object{} - if err := store.fetchOne(objects, bson.M{"_id": id}, bson.M{"metadata": bson.ElementDocument}, &result); err != nil { + if err := store.fetchOne(objects, bson.M{"_id": id}, bson.M{"metadata": 1}, &result); err != nil { return &Error{fmt.Sprintf("Failed to retrieve object. Error: %s.", err)} } if err := store.update(objects, bson.M{"_id": id}, bson.M{ "$set": bson.M{"remaining-consumers": result.MetaData.ExpectedConsumers}, - "$currentDate": bson.M{"last-update": bson.M{"$type": "timestamp"}}, + "$currentDate": bson.M{"last-update": bson.M{"$type": "date"}}, }); err != nil { return &Error{fmt.Sprintf("Failed to reset object's remaining comsumers. Error: %s.", err)} } @@ -698,20 +723,20 @@ func (store *MongoStorage) RetrieveUpdatedObjects(orgID string, objectType strin result := []object{} var query interface{} if received { - query = bson.M{"$or": []bson.M{ + query = bson.M{"$or": bson.A{ bson.M{"status": common.CompletelyReceived}, bson.M{"status": common.ObjReceived}, bson.M{"status": common.ObjDeleted}}, "metadata.destination-org-id": orgID, "metadata.object-type": objectType} } else { - query = bson.M{"$or": []bson.M{ + query = bson.M{"$or": bson.A{ bson.M{"status": common.CompletelyReceived}, bson.M{"status": common.ObjDeleted}}, "metadata.destination-org-id": orgID, "metadata.object-type": objectType} } if err := store.fetchAll(objects, query, nil, &result); err != nil { switch err { - case mgo.ErrNotFound: + case mongo.ErrNoDocuments: return nil, nil default: return nil, &Error{fmt.Sprintf("Failed to fetch the objects. Error: %s.", err)} @@ -732,7 +757,7 @@ func (store *MongoStorage) RetrieveObjectsWithDestinationPolicy(orgID string, re if received { query = bson.M{ "metadata.destination-org-id": orgID, - "$and": []bson.M{ + "$and": bson.A{ bson.M{"status": bson.M{"$ne": common.ObjDeleted}}, bson.M{"metadata.destination-policy": bson.M{"$ne": nil}}, }, @@ -741,7 +766,7 @@ func (store *MongoStorage) RetrieveObjectsWithDestinationPolicy(orgID string, re query = bson.M{ "metadata.destination-org-id": orgID, "policy-received": false, - "$and": []bson.M{ + "$and": bson.A{ bson.M{"status": bson.M{"$ne": common.ObjDeleted}}, bson.M{"metadata.destination-policy": bson.M{"$ne": nil}}, }, @@ -815,14 +840,14 @@ func (store *MongoStorage) RetrieveObjectsWithFilters(orgID string, destinationP } if destinationType != "" { - var subquery []bson.M + var subquery bson.A if destinationID == "" { - subquery = []bson.M{ + subquery = bson.A{ bson.M{"metadata.destination-type": destinationType}, bson.M{"metadata.destinations-list": bson.M{"$regex": destinationType + ":*"}}, } } else { - subquery = []bson.M{ + subquery = bson.A{ bson.M{"metadata.destination-type": destinationType, "metadata.destination-id": destinationID}, bson.M{"metadata.destinations-list": destinationType + ":" + destinationID}, } @@ -849,7 +874,7 @@ func (store *MongoStorage) RetrieveObjectsWithFilters(orgID string, destinationP if err := store.fetchAll(objects, query, nil, &result); err != nil { switch err { - case mgo.ErrNotFound: + case mongo.ErrNoDocuments: return nil, nil default: return nil, &Error{fmt.Sprintf("Failed to fetch the objects. Error: %s.", err)} @@ -879,7 +904,7 @@ func (store *MongoStorage) RetrieveAllObjects(orgID string, objectType string) ( func (store *MongoStorage) RetrieveObjects(orgID string, destType string, destID string, resend int) ([]common.MetaData, common.SyncServiceError) { result := []object{} query := bson.M{"metadata.destination-org-id": orgID, - "$or": []bson.M{ + "$or": bson.A{ bson.M{"status": common.ReadyToSend}, bson.M{"status": common.NotReadyToSend}, }} @@ -888,7 +913,7 @@ OUTER: for i := 0; i < maxUpdateTries; i++ { if err := store.fetchAll(objects, query, nil, &result); err != nil { switch err { - case mgo.ErrNotFound: + case mongo.ErrNoDocuments: return nil, nil default: return nil, &Error{fmt.Sprintf("Failed to fetch the objects. Error: %s.", err)} @@ -937,9 +962,9 @@ OUTER: if err := store.update(objects, bson.M{"_id": id, "last-update": r.LastUpdate}, bson.M{ "$set": bson.M{"destinations": r.Destinations}, - "$currentDate": bson.M{"last-update": bson.M{"$type": "timestamp"}}, + "$currentDate": bson.M{"last-update": bson.M{"$type": "date"}}, }); err != nil { - if err == mgo.ErrNotFound { + if IsNotFound(err) { time.Sleep(time.Duration(sleepInMS) * time.Millisecond) continue OUTER } @@ -964,9 +989,9 @@ func (store *MongoStorage) RetrieveConsumedObjects() ([]common.ConsumedObject, c func (store *MongoStorage) RetrieveObject(orgID string, objectType string, objectID string) (*common.MetaData, common.SyncServiceError) { result := object{} id := createObjectCollectionID(orgID, objectType, objectID) - if err := store.fetchOne(objects, bson.M{"_id": id}, bson.M{"metadata": bson.ElementDocument}, &result); err != nil { + if err := store.fetchOne(objects, bson.M{"_id": id}, bson.M{"metadata": 1}, &result); err != nil { switch err { - case mgo.ErrNotFound: + case mongo.ErrNoDocuments: return nil, nil default: return nil, &Error{fmt.Sprintf("Failed to fetch the object. Error: %s.", err)} @@ -977,11 +1002,11 @@ func (store *MongoStorage) RetrieveObject(orgID string, objectType string, objec // RetrieveObjectAndStatus returns the object meta data and status with the specified parameters func (store *MongoStorage) RetrieveObjectAndStatus(orgID string, objectType string, objectID string) (*common.MetaData, string, common.SyncServiceError) { - result := object{} + result := &object{} id := createObjectCollectionID(orgID, objectType, objectID) - if err := store.fetchOne(objects, bson.M{"_id": id}, nil, &result); err != nil { + if err := store.fetchOne(objects, bson.M{"_id": id}, nil, result); err != nil { switch err { - case mgo.ErrNotFound: + case mongo.ErrNoDocuments: return nil, "", nil default: return nil, "", &Error{fmt.Sprintf("Failed to fetch the object. Error: %s.", err)} @@ -999,30 +1024,34 @@ func (store *MongoStorage) RetrieveObjectData(orgID string, objectType string, o id = createObjectCollectionID(orgID, objectType, objectID) } - fileHandle, err := store.openFile(id) + if store.gridfsBucket == nil { + if bucket, err := gridfs.NewBucket(store.database); err != nil { + return nil, err + } else { + store.gridfsBucket = bucket + } + } + + downloadStream, err := store.gridfsBucket.OpenDownloadStreamByName(id) if err != nil { switch err { - case mgo.ErrNotFound: + case mongo.ErrNoDocuments: + return nil, nil + case gridfs.ErrFileNotFound: return nil, nil default: - return nil, &Error{fmt.Sprintf("Failed to open file to read the data. Error: %s.", err)} + return nil, &Error{fmt.Sprintf("Failed to find file to read the data. Error: %s.", err)} } } - store.putFileHandle(id, fileHandle) - return fileHandle.file, nil + return downloadStream, nil } // CloseDataReader closes the data reader if necessary func (store *MongoStorage) CloseDataReader(dataReader io.Reader) common.SyncServiceError { switch v := dataReader.(type) { - case *mgo.GridFile: - err := v.Close() - if id, ok := v.Id().(string); ok { - if fileHandle := store.getFileHandle(id); fileHandle != nil { - store.deleteFileHandle(id) - } - } - return err + case *gridfs.DownloadStream: + v.Close() + return nil default: return nil } @@ -1033,42 +1062,53 @@ func (store *MongoStorage) ReadObjectData(orgID string, objectType string, objec id := createObjectCollectionID(orgID, objectType, objectID) fileHandle, err := store.openFile(id) if err != nil { - if err == mgo.ErrNotFound { + if err == mongo.ErrNoDocuments { return nil, true, 0, &common.NotFound{} } return nil, true, 0, &Error{fmt.Sprintf("Failed to open file to read the data. Error: %s.", err)} } offset64 := int64(offset) - if offset64 >= fileHandle.file.Size() { - fileHandle.file.Close() + if offset64 >= fileHandle.GetFile().Length { + fileHandle.Close() return make([]byte, 0), true, 0, nil } - _, err = fileHandle.file.Seek(offset64, 0) + b := make([]byte, fileHandle.GetFile().Length) + _, err = fileHandle.Read(b) if err != nil { - fileHandle.file.Close() + fileHandle.Close() return nil, true, 0, &Error{fmt.Sprintf("Failed to read the data. Error: %s.", err)} } - s := int64(size) - if s > fileHandle.file.Size()-offset64 { - s = fileHandle.file.Size() - offset64 + + if err = fileHandle.Close(); err != nil { + return nil, true, 0, &Error{fmt.Sprintf("Failed to close the file. Error: %s.", err)} } - b := make([]byte, s) - n, err := fileHandle.file.Read(b) + + br := bytes.NewReader(b) + _, err = br.Seek(offset64, 0) if err != nil { - fileHandle.file.Close() return nil, true, 0, &Error{fmt.Sprintf("Failed to read the data. Error: %s.", err)} } - if err = fileHandle.file.Close(); err != nil { - return nil, true, 0, &Error{fmt.Sprintf("Failed to close the file. Error: %s.", err)} + + s := int64(size) + if s > fileHandle.GetFile().Length-offset64 { + s = fileHandle.GetFile().Length - offset64 + } + + ret := make([]byte, s) + n, err := br.Read(ret) + + if err != nil { + return nil, true, 0, &Error{fmt.Sprintf("Failed to read the data. Error: %s.", err)} } + eof := false - if fileHandle.file.Size()-offset64 == int64(n) { + if fileHandle.GetFile().Length-offset64 == int64(n) { eof = true } - return b, eof, n, nil + return ret, eof, n, nil } // StoreObjectData stores object's data @@ -1077,9 +1117,9 @@ func (store *MongoStorage) ReadObjectData(orgID string, objectType string, objec func (store *MongoStorage) StoreObjectData(orgID string, objectType string, objectID string, dataReader io.Reader) (bool, common.SyncServiceError) { id := createObjectCollectionID(orgID, objectType, objectID) result := object{} - if err := store.fetchOne(objects, bson.M{"_id": id}, bson.M{"status": bson.ElementString}, &result); err != nil { + if err := store.fetchOne(objects, bson.M{"_id": id}, bson.M{"status": 1}, &result); err != nil { switch err { - case mgo.ErrNotFound: + case mongo.ErrNoDocuments: return false, nil default: return false, &Error{fmt.Sprintf("Failed to store the data. Error: %s.", err)} @@ -1096,18 +1136,22 @@ func (store *MongoStorage) StoreObjectData(orgID string, objectType string, obje if err := store.update(objects, bson.M{"_id": id}, bson.M{ "$set": bson.M{"metadata.data-id": newID, "metadata.instance-id": newID}, - "$currentDate": bson.M{"last-update": bson.M{"$type": "timestamp"}}, + "$currentDate": bson.M{"last-update": bson.M{"$type": "date"}}, }); err != nil { return false, &Error{fmt.Sprintf("Failed to set instance id. Error: %s.", err)} } } - _, size, err := store.copyDataToFile(id, dataReader, true, true) - if err != nil { + if err := store.copyDataToFile(id, dataReader); err != nil { return false, err } + fileInfo, err := store.getFileInfo(id) + if err != nil { + return false, &Error{fmt.Sprintf("Failed to get mongo file information. Error: %s.", err)} + } + // Update object size - if err := store.update(objects, bson.M{"_id": id}, bson.M{"$set": bson.M{"metadata.object-size": size}}); err != nil { + if err := store.update(objects, bson.M{"_id": id}, bson.M{"$set": bson.M{"metadata.object-size": fileInfo.Length}}); err != nil { return false, &Error{fmt.Sprintf("Failed to update object's size. Error: %s.", err)} } @@ -1146,8 +1190,8 @@ func (store *MongoStorage) RemoveObjectTempData(orgID string, objectType string, fileHandle, _ := store.retrieveObjectTempData(id) if fileHandle != nil { - store.CloseDataReader(fileHandle.file) - store.deleteFileHandle(id) + fileHandle.Close() + //store.deleteFileHandle(id) //Don't return on errors store.removeFile(id) @@ -1191,7 +1235,7 @@ func (store *MongoStorage) RetrieveObjectTempData(orgID string, objectType strin } if fileHandle != nil { - readers = append(readers, fileHandle.file) + readers = append(readers, fileHandle) } chunkNumber += 1 @@ -1207,11 +1251,11 @@ func (store *MongoStorage) RetrieveObjectTempData(orgID string, objectType strin } -func (store *MongoStorage) retrieveObjectTempData(id string) (*fileHandle, common.SyncServiceError) { +func (store *MongoStorage) retrieveObjectTempData(id string) (*gridfs.DownloadStream, common.SyncServiceError) { fileHandle, err := store.openFile(id) if err != nil { switch err { - case mgo.ErrNotFound: + case mongo.ErrNoDocuments: return nil, nil default: return nil, &Error{fmt.Sprintf("Failed to open file to read the data. Error: %s.", err)} @@ -1284,39 +1328,12 @@ func (store *MongoStorage) AppendObjectData(orgID string, objectType string, obj // In Mongo implementation, all data passed to this function is to be stored in temp data id := createTempObjectCollectionID(orgID, objectType, objectID, chunkNumber) - var fileHandle *fileHandle + //var fileHandle *fileHandle store.removeFile(id) - fh, err := store.createFile(id) - if err != nil { + br := bytes.NewReader(data) + if err = store.createFile(id, br); err != nil { return isLastChunk, err } - fileHandle = fh - store.putFileHandle(id, fileHandle) - - for { - if trace.IsLogging(logger.TRACE) { - trace.Trace("Put data (data size: %d) in file %s\n", len(data), id) - } - n, err = fileHandle.file.Write(data) - if err != nil { - return isLastChunk, &Error{fmt.Sprintf("Failed to write the data to the file. Error: %s.", err)} - } - if n != len(data) { - return isLastChunk, &Error{fmt.Sprintf("Failed to write all the data to the file. Wrote %d instead of %d.", n, len(data))} - } - fileHandle.offset += int64(n) - if fileHandle.chunks == nil { - break - } - data = fileHandle.chunks[fileHandle.offset] - if data == nil { - break - } - delete(fileHandle.chunks, fileHandle.offset) - if trace.IsLogging(logger.TRACE) { - trace.Trace(" Get data (%d) from map at offset %d\n", len(data), offset) - } - } if updatedLastChunk { if trace.IsLogging(logger.TRACE) { @@ -1324,8 +1341,8 @@ func (store *MongoStorage) AppendObjectData(orgID string, objectType string, obj } } - store.deleteFileHandle(id) - err = fileHandle.file.Close() + //store.deleteFileHandle(id) + //err = fileHandle.Close() if err != nil { return updatedLastChunk, &Error{fmt.Sprintf("Failed to close the file. Error: %s.", err)} } @@ -1362,9 +1379,9 @@ func (store *MongoStorage) HandleObjectInfoForLastDataChunk(orgID string, object id := createObjectCollectionID(orgID, objectType, objectID) result := object{} - if err := store.fetchOne(objects, bson.M{"_id": id}, bson.M{"status": bson.ElementString}, &result); err != nil { + if err := store.fetchOne(objects, bson.M{"_id": id}, bson.M{"status": 1}, &result); err != nil { switch err { - case mgo.ErrNotFound: + case mongo.ErrNoDocuments: return false, nil default: return false, &Error{fmt.Sprintf("Failed to store the data. Error: %s.", err)} @@ -1379,7 +1396,7 @@ func (store *MongoStorage) HandleObjectInfoForLastDataChunk(orgID string, object if err := store.update(objects, bson.M{"_id": id}, bson.M{ "$set": bson.M{"metadata.data-id": newID, "metadata.instance-id": newID}, - "$currentDate": bson.M{"last-update": bson.M{"$type": "timestamp"}}, + "$currentDate": bson.M{"last-update": bson.M{"$type": "date"}}, }); err != nil { return false, &Error{fmt.Sprintf("Failed to set instance id. Error: %s.", err)} } @@ -1399,7 +1416,7 @@ func (store *MongoStorage) UpdateObjectStatus(orgID string, objectType string, o if err := store.update(objects, bson.M{"_id": id}, bson.M{ "$set": bson.M{"status": status}, - "$currentDate": bson.M{"last-update": bson.M{"$type": "timestamp"}}, + "$currentDate": bson.M{"last-update": bson.M{"$type": "date"}}, }); err != nil { return &Error{fmt.Sprintf("Failed to update object's status. Error: %s.", err)} } @@ -1417,7 +1434,7 @@ func (store *MongoStorage) MarkObjectDeleted(orgID string, objectType string, ob if err := store.update(objects, bson.M{"_id": id}, bson.M{ "$set": bson.M{"status": common.ObjDeleted, "metadata.deleted": true}, - "$currentDate": bson.M{"last-update": bson.M{"$type": "timestamp"}}, + "$currentDate": bson.M{"last-update": bson.M{"$type": "date"}}, }); err != nil { return &Error{fmt.Sprintf("Failed to mark object as deleted. Error: %s.", err)} } @@ -1430,7 +1447,7 @@ func (store *MongoStorage) MarkDestinationPolicyReceived(orgID string, objectTyp if err := store.update(objects, bson.M{"_id": id}, bson.M{ "$set": bson.M{"policy-received": true}, - "$currentDate": bson.M{"last-update": bson.M{"$type": "timestamp"}}, + "$currentDate": bson.M{"last-update": bson.M{"$type": "date"}}, }); err != nil { return &Error{fmt.Sprintf("Failed to mark an object's destination policy as received. Error: %s", err)} } @@ -1442,7 +1459,7 @@ func (store *MongoStorage) ActivateObject(orgID string, objectType string, objec id := createObjectCollectionID(orgID, objectType, objectID) if err := store.update(objects, bson.M{"_id": id}, bson.M{"$set": bson.M{"metadata.inactive": false}, - "$currentDate": bson.M{"last-update": bson.M{"$type": "timestamp"}}, + "$currentDate": bson.M{"last-update": bson.M{"$type": "date"}}, }); err != nil { return &Error{fmt.Sprintf("Failed to mark object as active. Error: %s.", err)} } @@ -1451,7 +1468,8 @@ func (store *MongoStorage) ActivateObject(orgID string, objectType string, objec // DeleteStoredObject deletes the object func (store *MongoStorage) DeleteStoredObject(orgID string, objectType string, objectID string) common.SyncServiceError { - return store.deleteObject(orgID, objectType, objectID, -1) + t := time.Date(0001, 1, 1, 00, 00, 00, 00, time.UTC) + return store.deleteObject(orgID, objectType, objectID, t) } // DeleteStoredData deletes the object's data @@ -1492,7 +1510,7 @@ func (store *MongoStorage) CleanObjects() common.SyncServiceError { // currently stored in this node's storage func (store *MongoStorage) GetNumberOfStoredObjects() (uint32, common.SyncServiceError) { query := bson.M{ - "$or": []bson.M{ + "$or": bson.A{ bson.M{"status": common.ReadyToSend}, bson.M{"status": common.NotReadyToSend}, bson.M{"status": common.Verifying}, @@ -1510,12 +1528,12 @@ func (store *MongoStorage) AddWebhook(orgID string, objectType string, url strin result := &webhookObject{} for i := 0; i < maxUpdateTries; i++ { if err := store.fetchOne(webhooks, bson.M{"_id": id}, nil, &result); err != nil { - if err == mgo.ErrNotFound { + if err == mongo.ErrNoDocuments { result.Hooks = make([]string, 0) result.Hooks = append(result.Hooks, url) result.ID = id if err = store.insert(webhooks, result); err != nil { - if mgo.IsDup(err) { + if mongo.IsDuplicateKeyError(err) { time.Sleep(time.Duration(sleepInMS) * time.Millisecond) continue } @@ -1536,9 +1554,9 @@ func (store *MongoStorage) AddWebhook(orgID string, objectType string, url strin if err := store.update(webhooks, bson.M{"_id": id, "last-update": result.LastUpdate}, bson.M{ "$set": bson.M{"hooks": result.Hooks}, - "$currentDate": bson.M{"last-update": bson.M{"$type": "timestamp"}}, + "$currentDate": bson.M{"last-update": bson.M{"$type": "date"}}, }); err != nil { - if err == mgo.ErrNotFound { + if IsNotFound(err) { time.Sleep(time.Duration(sleepInMS) * time.Millisecond) continue } @@ -1575,9 +1593,9 @@ func (store *MongoStorage) DeleteWebhook(orgID string, objectType string, url st if err := store.update(webhooks, bson.M{"_id": id, "last-update": result.LastUpdate}, bson.M{ "$set": bson.M{"hooks": result.Hooks}, - "$currentDate": bson.M{"last-update": bson.M{"$type": "timestamp"}}, + "$currentDate": bson.M{"last-update": bson.M{"$type": "date"}}, }); err != nil { - if err == mgo.ErrNotFound { + if IsNotFound(err) { time.Sleep(time.Duration(sleepInMS) * time.Millisecond) continue } @@ -1611,7 +1629,7 @@ func (store *MongoStorage) RetrieveDestinations(orgID string, destType string) ( if orgID == "" { if destType == "" { - err = store.fetchAll(destinations, nil, nil, &result) + err = store.fetchAll(destinations, bson.M{}, nil, &result) } else { err = store.fetchAll(destinations, bson.M{"destination.destination-type": destType}, nil, &result) } @@ -1622,7 +1640,7 @@ func (store *MongoStorage) RetrieveDestinations(orgID string, destType string) ( err = store.fetchAll(destinations, bson.M{"destination.destination-org-id": orgID, "destination.destination-type": destType}, nil, &result) } } - if err != nil && err != mgo.ErrNotFound { + if err != nil && err != mongo.ErrNoDocuments { return nil, &Error{fmt.Sprintf("Failed to fetch the destinations. Error: %s.", err)} } @@ -1638,7 +1656,7 @@ func (store *MongoStorage) DestinationExists(orgID string, destType string, dest result := destinationObject{} id := createDestinationCollectionID(orgID, destType, destID) if err := store.fetchOne(destinations, bson.M{"_id": id}, nil, &result); err != nil { - if err == mgo.ErrNotFound { + if err == mongo.ErrNoDocuments { return false, nil } return false, err @@ -1649,8 +1667,9 @@ func (store *MongoStorage) DestinationExists(orgID string, destType string, dest // StoreDestination stores the destination func (store *MongoStorage) StoreDestination(destination common.Destination) common.SyncServiceError { id := getDestinationCollectionID(destination) - newObject := destinationObject{ID: id, Destination: destination} - err := store.upsert(destinations, bson.M{"_id": id, "destination.destination-org-id": destination.DestOrgID}, newObject) + filter := bson.D{{"_id", id}, {"destination.destination-org-id", destination.DestOrgID}} + newObject := bson.D{{"$set", bson.D{{"_id", id}, {"destination", destination}, {"last-ping-time", primitive.NewDateTimeFromTime(time.Now())}}}} + err := store.upsert(destinations, filter, newObject) if err != nil { return &Error{fmt.Sprintf("Failed to store a destination. Error: %s.", err)} } @@ -1671,11 +1690,11 @@ func (store *MongoStorage) UpdateDestinationLastPingTime(destination common.Dest id := getDestinationCollectionID(destination) err := store.update(destinations, bson.M{"_id": id}, - bson.M{"$currentDate": bson.M{"last-ping-time": bson.M{"$type": "timestamp"}}}, + bson.M{"$currentDate": bson.M{"last-ping-time": bson.M{"$type": "date"}}}, ) if err != nil { - if err == mgo.ErrNotFound { - return &NotFound{} + if IsNotFound(err) { + return err } return &Error{fmt.Sprintf("Failed to update the last ping time for destination. Error: %s\n", err)} } @@ -1685,15 +1704,11 @@ func (store *MongoStorage) UpdateDestinationLastPingTime(destination common.Dest // RemoveInactiveDestinations removes destinations that haven't sent ping since the provided timestamp func (store *MongoStorage) RemoveInactiveDestinations(lastTimestamp time.Time) { - timestamp, err := bson.NewMongoTimestamp(lastTimestamp, 1) - if err != nil { - return - } - query := bson.M{"last-ping-time": bson.M{"$lte": timestamp}} - selector := bson.M{"destination": bson.ElementDocument} + query := bson.M{"last-ping-time": bson.M{"$lte": lastTimestamp}} + selector := bson.M{"destination": 1} dests := []destinationObject{} if err := store.fetchAll(destinations, query, selector, &dests); err != nil { - if err != mgo.ErrNotFound && log.IsLogging(logger.ERROR) { + if err != mongo.ErrNoDocuments && log.IsLogging(logger.ERROR) { log.Error("Error in mongoStorage.RemoveInactiveDestinations: failed to remove inactive destinations. Error: %s\n", err) } return @@ -1703,11 +1718,11 @@ func (store *MongoStorage) RemoveInactiveDestinations(lastTimestamp time.Time) { } for _, d := range dests { if err := store.DeleteNotificationRecords(d.Destination.DestOrgID, "", "", d.Destination.DestType, d.Destination.DestID); err != nil && - err != mgo.ErrNotFound && log.IsLogging(logger.ERROR) { + err != mongo.ErrNoDocuments && log.IsLogging(logger.ERROR) { log.Error("Error in mongoStorage.RemoveInactiveDestinations: failed to remove notifications for inactive destinations. Error: %s\n", err) } if err := store.DeleteDestination(d.Destination.DestOrgID, d.Destination.DestType, d.Destination.DestID); err != nil && - err != mgo.ErrNotFound && log.IsLogging(logger.ERROR) { + err != mongo.ErrNoDocuments && log.IsLogging(logger.ERROR) { log.Error("Error in mongoStorage.RemoveInactiveDestinations: failed to remove inactive destination. Error: %s\n", err) } } @@ -1733,7 +1748,7 @@ func (store *MongoStorage) RetrieveDestination(orgID string, destType string, de result := destinationObject{} id := createDestinationCollectionID(orgID, destType, destID) if err := store.fetchOne(destinations, bson.M{"_id": id}, nil, &result); err != nil { - if err != mgo.ErrNotFound { + if err != mongo.ErrNoDocuments { return nil, &Error{fmt.Sprintf("Failed to fetch the destination. Error: %s.", err)} } return nil, &NotFound{fmt.Sprintf(" The destination %s:%s does not exist", destType, destID)} @@ -1744,7 +1759,7 @@ func (store *MongoStorage) RetrieveDestination(orgID string, destType string, de // GetObjectsForDestination retrieves objects that are in use on a given node func (store *MongoStorage) GetObjectsForDestination(orgID string, destType string, destID string) ([]common.ObjectStatus, common.SyncServiceError) { notificationRecords := []notificationObject{} - query := bson.M{"$or": []bson.M{ + query := bson.M{"$or": bson.A{ bson.M{"notification.status": common.Update}, bson.M{"notification.status": common.UpdatePending}, bson.M{"notification.status": common.Updated}, @@ -1755,7 +1770,7 @@ func (store *MongoStorage) GetObjectsForDestination(orgID string, destType strin "notification.destination-id": destID, "notification.destination-type": destType} - if err := store.fetchAll(notifications, query, nil, ¬ificationRecords); err != nil && err != mgo.ErrNotFound { + if err := store.fetchAll(notifications, query, nil, ¬ificationRecords); err != nil && err != mongo.ErrNoDocuments { return nil, &Error{fmt.Sprintf("Failed to fetch the notifications. Error: %s.", err)} } @@ -1798,7 +1813,7 @@ func (store *MongoStorage) RetrieveAllObjectsAndUpdateDestinationListForDestinat if err := store.fetchAll(objects, query, nil, &result); err != nil { switch err { - case mgo.ErrNotFound: + case mongo.ErrNoDocuments: return nil, nil default: return nil, &Error{fmt.Sprintf("Failed to fetch the objects for destination %s %s %s from storage. Error: %s.", destOrgID, destType, destID, err)} @@ -1818,10 +1833,10 @@ func (store *MongoStorage) RetrieveAllObjectsAndUpdateDestinationListForDestinat query := bson.M{ "$set": bson.M{"destinations": updatedDestinationList}, - "$currentDate": bson.M{"last-update": bson.M{"$type": "timestamp"}}, + "$currentDate": bson.M{"last-update": bson.M{"$type": "date"}}, } if err := store.update(objects, bson.M{"_id": r.ID, "last-update": r.LastUpdate}, query); err != nil { - if err == mgo.ErrNotFound { + if IsNotFound(err) { continue } emptyMeta := make([]common.MetaData, 0) @@ -1850,7 +1865,7 @@ func (store *MongoStorage) UpdateNotificationRecord(notification common.Notifica resendTime := time.Now().Unix() + int64(common.Configuration.ResendInterval*6) notification.ResendTime = resendTime } - n := notificationObject{ID: id, Notification: notification} + n := bson.M{"$set": bson.M{"_id": id, "notification": notification}} err := store.upsert(notifications, bson.M{ "_id": id, @@ -1862,6 +1877,9 @@ func (store *MongoStorage) UpdateNotificationRecord(notification common.Notifica if err != nil { return &Error{fmt.Sprintf("Failed to update notification record. Error: %s.", err)} } + + no := notificationObject{} + _ = store.fetchOne(notifications, bson.M{"_id": id}, nil, &no) return nil } @@ -1881,7 +1899,7 @@ func (store *MongoStorage) RetrieveNotificationRecord(orgID string, objectType s id := createNotificationCollectionID(orgID, objectType, objectID, destType, destID) result := notificationObject{} if err := store.fetchOne(notifications, bson.M{"_id": id}, nil, &result); err != nil { - if err == mgo.ErrNotFound { + if err == mongo.ErrNoDocuments { return nil, nil } return nil, &Error{fmt.Sprintf("Failed to fetch the notification. Error: %s.", err)} @@ -1907,7 +1925,7 @@ func (store *MongoStorage) DeleteNotificationRecords(orgID string, objectType st "notification.destination-id": destID}) } - if err != nil && err != mgo.ErrNotFound { + if err != nil && err != mongo.ErrNoDocuments { return &Error{fmt.Sprintf("Failed to delete notification records. Error: %s.", err)} } return nil @@ -1918,13 +1936,13 @@ func (store *MongoStorage) RetrieveNotifications(orgID string, destType string, result := []notificationObject{} var query bson.M if destType == "" && destID == "" { - currentTime := time.Now().Unix() + currentTime := primitive.NewDateTimeFromTime(time.Now()) - query = bson.M{"$or": []bson.M{ + query = bson.M{"$or": bson.A{ bson.M{"notification.status": common.Getdata}, bson.M{ "notification.resend-time": bson.M{"$lte": currentTime}, - "$or": []bson.M{ + "$or": bson.A{ bson.M{"notification.status": common.Update}, bson.M{"notification.status": common.Received}, bson.M{"notification.status": common.Consumed}, @@ -1934,7 +1952,7 @@ func (store *MongoStorage) RetrieveNotifications(orgID string, destType string, bson.M{"notification.status": common.Error}}}}} } else { if retrieveReceived { - query = bson.M{"$or": []bson.M{ + query = bson.M{"$or": bson.A{ bson.M{"notification.status": common.Update}, bson.M{"notification.status": common.Updated}, bson.M{"notification.status": common.Received}, @@ -1947,7 +1965,7 @@ func (store *MongoStorage) RetrieveNotifications(orgID string, destType string, "notification.destination-id": destID, "notification.destination-type": destType} } else { - query = bson.M{"$or": []bson.M{ + query = bson.M{"$or": bson.A{ bson.M{"notification.status": common.Update}, bson.M{"notification.status": common.Received}, bson.M{"notification.status": common.Consumed}, @@ -1959,7 +1977,7 @@ func (store *MongoStorage) RetrieveNotifications(orgID string, destType string, "notification.destination-type": destType} } } - if err := store.fetchAll(notifications, query, nil, &result); err != nil && err != mgo.ErrNotFound { + if err := store.fetchAll(notifications, query, nil, &result); err != nil && err != mongo.ErrNoDocuments { return nil, &Error{fmt.Sprintf("Failed to fetch the notifications. Error: %s.", err)} } @@ -1976,7 +1994,7 @@ func (store *MongoStorage) RetrievePendingNotifications(orgID string, destType s var query bson.M if destType == "" && destID == "" { - query = bson.M{"$or": []bson.M{ + query = bson.M{"$or": bson.A{ bson.M{"notification.status": common.UpdatePending}, bson.M{"notification.status": common.ReceivedPending}, bson.M{"notification.status": common.ConsumedPending}, @@ -1984,7 +2002,7 @@ func (store *MongoStorage) RetrievePendingNotifications(orgID string, destType s bson.M{"notification.status": common.DeletedPending}}, "notification.destination-org-id": orgID} } else { - query = bson.M{"$or": []bson.M{ + query = bson.M{"$or": bson.A{ bson.M{"notification.status": common.UpdatePending}, bson.M{"notification.status": common.ReceivedPending}, bson.M{"notification.status": common.ConsumedPending}, @@ -1994,7 +2012,7 @@ func (store *MongoStorage) RetrievePendingNotifications(orgID string, destType s "notification.destination-id": destID, "notification.destination-type": destType} } - if err := store.fetchAll(notifications, query, nil, &result); err != nil && err != mgo.ErrNotFound { + if err := store.fetchAll(notifications, query, nil, &result); err != nil && err != mongo.ErrNoDocuments { return nil, &Error{fmt.Sprintf("Failed to fetch the notifications. Error: %s.", err)} } @@ -2011,7 +2029,7 @@ func (store *MongoStorage) InsertInitialLeader(leaderID string) (bool, common.Sy err := store.insert(leader, doc) if err != nil { - if !mgo.IsDup(err) { + if !mongo.IsDuplicateKeyError(err) { return false, &Error{fmt.Sprintf("Failed to insert document into syncLeaderElection collection. Error: %s\n", err)} } return false, nil @@ -2026,10 +2044,10 @@ func (store *MongoStorage) LeaderPeriodicUpdate(leaderID string) (bool, common.S for i := 0; i < maxUpdateTries; i++ { err = store.update(leader, bson.M{"_id": 1, "uuid": leaderID}, - bson.M{"$currentDate": bson.M{"last-heartbeat-ts": bson.M{"$type": "timestamp"}}}, + bson.M{"$currentDate": bson.M{"last-heartbeat-ts": bson.M{"$type": "date"}}}, ) if err != nil { - if err == mgo.ErrNotFound { + if IsNotFound(err) { time.Sleep(time.Duration(sleepInMS) * time.Millisecond) continue } @@ -2049,12 +2067,12 @@ func (store *MongoStorage) RetrieveLeader() (string, int32, time.Time, int64, co doc := leaderDocument{} err := store.fetchOne(leader, bson.M{"_id": 1}, nil, &doc) if err != nil { - if err == mgo.ErrNotFound { + if err == mongo.ErrNoDocuments { return "", 0, time.Now(), 0, &NotFound{} } return "", 0, time.Now(), 0, &Error{fmt.Sprintf("Failed to fetch the document in the syncLeaderElection collection. Error: %s", err)} } - return doc.UUID, doc.HeartbeatTimeout, doc.LastHeartbeatTS.Time(), doc.Version, nil + return doc.UUID, doc.HeartbeatTimeout, doc.LastHeartbeatTS, doc.Version, nil } // UpdateLeader updates the leader entry for a leadership takeover @@ -2062,7 +2080,7 @@ func (store *MongoStorage) UpdateLeader(leaderID string, version int64) (bool, c err := store.update(leader, bson.M{"_id": 1, "version": version}, bson.M{ - "$currentDate": bson.M{"last-heartbeat-ts": bson.M{"$type": "timestamp"}}, + "$currentDate": bson.M{"last-heartbeat-ts": bson.M{"$type": "date"}}, "$set": bson.M{ "uuid": leaderID, "heartbeat-timeout": common.Configuration.LeadershipTimeout, @@ -2071,7 +2089,7 @@ func (store *MongoStorage) UpdateLeader(leaderID string, version int64) (bool, c }, ) if err != nil { - if err != mgo.ErrNotFound { + if !IsNotFound(err) { // Only complain if someone else didn't steal the leadership return false, &Error{fmt.Sprintf("Failed to update the document in the syncLeaderElection collection. Error: %s\n", err)} } @@ -2082,11 +2100,9 @@ func (store *MongoStorage) UpdateLeader(leaderID string, version int64) (bool, c // ResignLeadership causes this sync service to give up the Leadership func (store *MongoStorage) ResignLeadership(leaderID string) common.SyncServiceError { - timestamp, err := bson.NewMongoTimestamp(time.Date(1970, 1, 1, 0, 0, 0, 0, time.UTC), 1) - if err != nil { - return err - } - err = store.update(leader, + timestamp := time.Date(1970, 1, 1, 0, 0, 0, 0, time.UTC) + + err := store.update(leader, bson.M{"_id": 1, "uuid": leaderID}, bson.M{ "$set": bson.M{ @@ -2094,7 +2110,7 @@ func (store *MongoStorage) ResignLeadership(leaderID string) common.SyncServiceE }, }, ) - if err != nil && mgo.ErrNotFound != err { + if err != nil && !IsNotFound(err) { return &Error{fmt.Sprintf("Failed to update the document in the syncLeaderElection collection. Error: %s\n", err)} } @@ -2103,8 +2119,28 @@ func (store *MongoStorage) ResignLeadership(leaderID string) common.SyncServiceE // RetrieveTimeOnServer retrieves the current time on the database server func (store *MongoStorage) RetrieveTimeOnServer() (time.Time, error) { + /* + > db.runCommand( { isMaster: 1 } ) + { + ismaster: true, + topologyVersion: { + processId: ObjectId('66425e524a6e53adeab8efca'), + counter: Long('0') + }, + maxBsonObjectSize: 16777216, + maxMessageSizeBytes: 48000000, + maxWriteBatchSize: 100000, + localTime: ISODate('2024-05-15T19:13:13.097Z'), + logicalSessionTimeoutMinutes: 30, + connectionId: 523, + minWireVersion: 0, + maxWireVersion: 17, + readOnly: false, + ok: 1 + } + */ result := isMasterResult{} - err := store.run("isMaster", &result) + err := store.run(bson.D{{"isMaster", 1}}, &result) if err == nil && !result.OK { err = &Error{"Failed running isMaster command on MongoDB server"} } @@ -2113,8 +2149,8 @@ func (store *MongoStorage) RetrieveTimeOnServer() (time.Time, error) { // StoreOrgToMessagingGroup inserts organization to messaging groups table func (store *MongoStorage) StoreOrgToMessagingGroup(orgID string, messagingGroup string) common.SyncServiceError { - object := messagingGroupObject{ID: orgID, GroupName: messagingGroup} - err := store.upsert(messagingGroups, bson.M{"_id": orgID}, object) + mg := bson.M{"$set": bson.M{"_id": orgID, "group-name": messagingGroup, "last-update": primitive.NewDateTimeFromTime(time.Now())}} + err := store.upsert(messagingGroups, bson.M{"_id": orgID}, mg) if err != nil { return &Error{fmt.Sprintf("Failed to store organization's messaging group. Error: %s.", err)} } @@ -2123,7 +2159,7 @@ func (store *MongoStorage) StoreOrgToMessagingGroup(orgID string, messagingGroup // DeleteOrgToMessagingGroup deletes organization from messaging groups table func (store *MongoStorage) DeleteOrgToMessagingGroup(orgID string) common.SyncServiceError { - if err := store.removeAll(messagingGroups, bson.M{"_id": orgID}); err != nil && err != mgo.ErrNotFound { + if err := store.removeAll(messagingGroups, bson.M{"_id": orgID}); err != nil && err != mongo.ErrNoDocuments { return err } return nil @@ -2133,7 +2169,7 @@ func (store *MongoStorage) DeleteOrgToMessagingGroup(orgID string) common.SyncSe func (store *MongoStorage) RetrieveMessagingGroup(orgID string) (string, common.SyncServiceError) { result := messagingGroupObject{} if err := store.fetchOne(messagingGroups, bson.M{"_id": orgID}, nil, &result); err != nil { - if err != mgo.ErrNotFound { + if err != mongo.ErrNoDocuments { return "", err } return "", nil @@ -2142,14 +2178,11 @@ func (store *MongoStorage) RetrieveMessagingGroup(orgID string) (string, common. } // RetrieveUpdatedMessagingGroups retrieves messaging groups that were updated after the specified time -func (store *MongoStorage) RetrieveUpdatedMessagingGroups(time time.Time) ([]common.MessagingGroup, +func (store *MongoStorage) RetrieveUpdatedMessagingGroups(timeToCheck time.Time) ([]common.MessagingGroup, common.SyncServiceError) { - timestamp, err := bson.NewMongoTimestamp(time, 1) - if err != nil { - return nil, err - } + result := []messagingGroupObject{} - if err := store.fetchAll(messagingGroups, bson.M{"last-update": bson.M{"$gte": timestamp}}, nil, &result); err != nil { + if err := store.fetchAll(messagingGroups, bson.M{"last-update": bson.M{"$gte": timeToCheck}}, nil, &result); err != nil { return nil, err } groups := make([]common.MessagingGroup, 0) @@ -2165,15 +2198,15 @@ func (store *MongoStorage) DeleteOrganization(orgID string) common.SyncServiceEr return err } - if err := store.removeAll(destinations, bson.M{"destination.destination-org-id": orgID}); err != nil && err != mgo.ErrNotFound { + if err := store.removeAll(destinations, bson.M{"destination.destination-org-id": orgID}); err != nil && err != mongo.ErrNoDocuments { return &Error{fmt.Sprintf("Failed to delete destinations. Error: %s.", err)} } - if err := store.removeAll(notifications, bson.M{"notification.destination-org-id": orgID}); err != nil && err != mgo.ErrNotFound { + if err := store.removeAll(notifications, bson.M{"notification.destination-org-id": orgID}); err != nil && err != mongo.ErrNoDocuments { return &Error{fmt.Sprintf("Failed to delete notifications. Error: %s.", err)} } - if err := store.removeAll(acls, bson.M{"org-id": orgID}); err != nil && err != mgo.ErrNotFound { + if err := store.removeAll(acls, bson.M{"org-id": orgID}); err != nil && err != mongo.ErrNoDocuments { return &Error{fmt.Sprintf("Failed to delete ACLs. Error: %s.", err)} } @@ -2181,14 +2214,14 @@ func (store *MongoStorage) DeleteOrganization(orgID string) common.SyncServiceEr ID string `bson:"_id"` } results := []idstruct{} - if err := store.fetchAll(objects, bson.M{"metadata.destination-org-id": orgID}, bson.M{"_id": bson.ElementString}, &results); err != nil && err != mgo.ErrNotFound { + if err := store.fetchAll(objects, bson.M{"metadata.destination-org-id": orgID}, bson.M{"_id": 1}, &results); err != nil && err != mongo.ErrNoDocuments { return &Error{fmt.Sprintf("Failed to fetch objects to delete. Error: %s.", err)} } for _, result := range results { store.removeFile(result.ID) } - if err := store.removeAll(objects, bson.M{"metadata.destination-org-id": orgID}); err != nil && err != mgo.ErrNotFound { + if err := store.removeAll(objects, bson.M{"metadata.destination-org-id": orgID}); err != nil && err != mongo.ErrNoDocuments { return &Error{fmt.Sprintf("Failed to delete objects. Error: %s.", err)} } @@ -2203,34 +2236,37 @@ func (store *MongoStorage) IsConnected() bool { // StoreOrganization stores organization information // Returns the stored record timestamp for multiple CSS updates func (store *MongoStorage) StoreOrganization(org common.Organization) (time.Time, common.SyncServiceError) { - object := organizationObject{ID: org.OrgID, Organization: org} - err := store.upsert(organizations, bson.M{"_id": org.OrgID}, object) + + timestamp := time.Now() + objectBson := bson.M{"$set": bson.M{"_id": org.OrgID, "org": org, "last-update": primitive.NewDateTimeFromTime(timestamp)}} + err := store.upsert(organizations, bson.M{"_id": org.OrgID}, objectBson) if err != nil { - return time.Now(), &Error{fmt.Sprintf("Failed to store organization's info. Error: %s.", err)} + return timestamp, &Error{fmt.Sprintf("Failed to store organization's info. Error: %s.", err)} } + object := organizationObject{} if err := store.fetchOne(organizations, bson.M{"_id": org.OrgID}, nil, &object); err != nil { - return time.Now(), err + return timestamp, err } - return object.LastUpdate.Time(), nil + return object.LastUpdate, nil } // RetrieveOrganizationInfo retrieves organization information func (store *MongoStorage) RetrieveOrganizationInfo(orgID string) (*common.StoredOrganization, common.SyncServiceError) { result := organizationObject{} if err := store.fetchOne(organizations, bson.M{"_id": orgID}, nil, &result); err != nil { - if err != mgo.ErrNotFound { + if err != mongo.ErrNoDocuments { return nil, err } return nil, nil } - return &common.StoredOrganization{Org: result.Organization, Timestamp: result.LastUpdate.Time()}, nil + return &common.StoredOrganization{Org: result.Organization, Timestamp: result.LastUpdate}, nil } // DeleteOrganizationInfo deletes organization information func (store *MongoStorage) DeleteOrganizationInfo(orgID string) common.SyncServiceError { - if err := store.removeAll(organizations, bson.M{"_id": orgID}); err != nil && err != mgo.ErrNotFound { + if err := store.removeAll(organizations, bson.M{"_id": orgID}); err != nil && err != mongo.ErrNoDocuments { return err } return nil @@ -2239,29 +2275,29 @@ func (store *MongoStorage) DeleteOrganizationInfo(orgID string) common.SyncServi // RetrieveOrganizations retrieves stored organizations' info func (store *MongoStorage) RetrieveOrganizations() ([]common.StoredOrganization, common.SyncServiceError) { result := []organizationObject{} - if err := store.fetchAll(organizations, nil, nil, &result); err != nil { + query := bson.M{} + if err := store.fetchAll(organizations, query, nil, &result); err != nil { return nil, err } orgs := make([]common.StoredOrganization, 0) for _, org := range result { - orgs = append(orgs, common.StoredOrganization{Org: org.Organization, Timestamp: org.LastUpdate.Time()}) + orgs = append(orgs, common.StoredOrganization{Org: org.Organization, Timestamp: org.LastUpdate}) } return orgs, nil } // RetrieveUpdatedOrganizations retrieves organizations that were updated after the specified time -func (store *MongoStorage) RetrieveUpdatedOrganizations(time time.Time) ([]common.StoredOrganization, common.SyncServiceError) { - timestamp, err := bson.NewMongoTimestamp(time, 1) - if err != nil { - return nil, err - } +func (store *MongoStorage) RetrieveUpdatedOrganizations(timevalue time.Time) ([]common.StoredOrganization, common.SyncServiceError) { + //timestamp, err := bson.NewMongoTimestamp(timevalue, 1) + //timestamp := primitive.Timestamp{T: uint32(timevalue.Unix())} + result := []organizationObject{} - if err := store.fetchAll(organizations, bson.M{"last-update": bson.M{"$gte": timestamp}}, nil, &result); err != nil { + if err := store.fetchAll(organizations, bson.M{"last-update": bson.M{"$gte": timevalue}}, nil, &result); err != nil { return nil, err } orgs := make([]common.StoredOrganization, 0) for _, org := range result { - orgs = append(orgs, common.StoredOrganization{Org: org.Organization, Timestamp: org.LastUpdate.Time()}) + orgs = append(orgs, common.StoredOrganization{Org: org.Organization, Timestamp: org.LastUpdate}) } return orgs, nil } diff --git a/core/storage/mongoStorageHelpers.go b/core/storage/mongoStorageHelpers.go index 05d1cf0..adf8332 100644 --- a/core/storage/mongoStorageHelpers.go +++ b/core/storage/mongoStorageHelpers.go @@ -1,28 +1,30 @@ package storage import ( + "bytes" + "context" "fmt" "io" "strings" "time" - "github.com/globalsign/mgo" - "github.com/globalsign/mgo/bson" "github.com/open-horizon/edge-sync-service/common" "github.com/open-horizon/edge-utilities/logger" "github.com/open-horizon/edge-utilities/logger/log" "github.com/open-horizon/edge-utilities/logger/trace" + "go.mongodb.org/mongo-driver/bson" + "go.mongodb.org/mongo-driver/bson/primitive" + "go.mongodb.org/mongo-driver/mongo" + "go.mongodb.org/mongo-driver/mongo/gridfs" + "go.mongodb.org/mongo-driver/mongo/options" ) -func (store *MongoStorage) getSession() *mgo.Session { - if store.cacheSize < 2 { - return store.session - } +func (store *MongoStorage) getMongoClient() *mongo.Client { + // Need lock?? store.lock() - session := store.sessionCache[store.cacheIndex] - store.cacheIndex = (store.cacheIndex + 1) % store.cacheSize + client := store.client store.unLock() - return session + return client } func (store *MongoStorage) checkObjects() { @@ -32,20 +34,20 @@ func (store *MongoStorage) checkObjects() { currentTime := time.Now().UTC().Format(time.RFC3339) query := bson.M{ - "$and": []bson.M{ + "$and": bson.A{ bson.M{"metadata.expiration": bson.M{"$ne": ""}}, bson.M{"metadata.expiration": bson.M{"$lte": currentTime}}, - bson.M{"$or": []bson.M{ + bson.M{"$or": bson.A{ bson.M{"status": common.NotReadyToSend}, bson.M{"status": common.ReadyToSend}, bson.M{"status": common.Verifying}, bson.M{"status": common.VerificationFailed}}}}, } - selector := bson.M{"metadata": bson.ElementDocument, "last-update": bson.ElementTimestamp} + selector := bson.M{"metadata": 1, "last-update": 1} result := []object{} if err := store.fetchAll(objects, query, selector, &result); err != nil { - if err != mgo.ErrNotFound && log.IsLogging(logger.ERROR) { + if err != mongo.ErrNoDocuments && log.IsLogging(logger.ERROR) { log.Error("Error in mongoStorage.checkObjects: failed to remove expired objects. Error: %s\n", err) } return @@ -64,18 +66,19 @@ func (store *MongoStorage) checkObjects() { } } -func (store *MongoStorage) deleteObject(orgID string, objectType string, objectID string, timestamp bson.MongoTimestamp) common.SyncServiceError { +func (store *MongoStorage) deleteObject(orgID string, objectType string, objectID string, timestamp time.Time) common.SyncServiceError { id := createObjectCollectionID(orgID, objectType, objectID) if trace.IsLogging(logger.TRACE) { trace.Trace("Deleting object %s\n", id) } query := bson.M{"_id": id} - if timestamp != -1 { + if !timestamp.IsZero() { query = bson.M{"_id": id, "last-update": timestamp} } + if err := store.removeAll(objects, query); err != nil { - if err == mgo.ErrNotFound && timestamp != -1 { + if err == mongo.ErrNoDocuments && !timestamp.IsZero() { return nil } return &Error{fmt.Sprintf("Failed to delete object. Error: %s.", err)} @@ -89,67 +92,39 @@ func (store *MongoStorage) deleteObject(orgID string, objectType string, objectI return nil } -func (store *MongoStorage) copyDataToFile(id string, dataReader io.Reader, isFirstChunk bool, isLastChunk bool) (fileHanlde *fileHandle, - written int64, err common.SyncServiceError) { - if isFirstChunk { - store.removeFile(id) - fileHanlde, err = store.createFile(id) - } else { - fileHanlde = store.getFileHandle(id) - if fileHanlde == nil { - err = &Error{fmt.Sprintf("Failed to append the data, the file doesn't exist.")} - return - } - } +// append data stream to mongodb data +func (store *MongoStorage) copyDataToFile(id string, dataReader io.Reader) (err common.SyncServiceError) { + store.removeFile(id) + err = store.createFile(id, dataReader) if err != nil { err = &Error{fmt.Sprintf("Failed to create file to store the data. Error: %s.", err)} return } - written, err = io.Copy(fileHanlde.file, dataReader) - if err != nil { - err = &Error{fmt.Sprintf("Failed to write the data to the file. Error: %s.", err)} - return - } - if isLastChunk { - if err = fileHanlde.file.Close(); err != nil { - err = &Error{fmt.Sprintf("Failed to close the file. Error: %s.", err)} - return - } - store.deleteFileHandle(id) - } + return } +// stores the data bytes into mongodb func (store *MongoStorage) storeDataInFile(id string, data []byte) common.SyncServiceError { store.removeFile(id) - fileHanlde, err := store.createFile(id) - if err != nil { + br := bytes.NewReader(data) + if err := store.createFile(id, br); err != nil { return &Error{fmt.Sprintf("Failed to create file to store the data. Error: %s.", err)} } - n, err := fileHanlde.file.Write(data) - if err != nil { - return &Error{fmt.Sprintf("Failed to write the data to the file. Error: %s.", err)} - } - if n != len(data) { - return &Error{fmt.Sprintf("Failed to write all the data: wrote %d instead of %d.", n, len(data))} - } - if err = fileHanlde.file.Close(); err != nil { - return &Error{fmt.Sprintf("Failed to close the file. Error: %s.", err)} - } return nil } func (store *MongoStorage) retrievePolicies(query interface{}) ([]common.ObjectDestinationPolicy, common.SyncServiceError) { results := []object{} - selectedFields := bson.M{"metadata.destination-org-id": bson.ElementString, - "metadata.object-type": bson.ElementString, "metadata.object-id": bson.ElementString, - "metadata.destination-policy": bson.ElementDocument, - "destinations": bson.ElementArray, + selectedFields := bson.M{"metadata.destination-org-id": 1, + "metadata.object-type": 1, "metadata.object-id": 1, + "metadata.destination-policy": 1, + "destinations": 1, } if err := store.fetchAll(objects, query, selectedFields, &results); err != nil { switch err { - case mgo.ErrNotFound: + case mongo.ErrNoDocuments: return nil, nil default: return nil, &Error{fmt.Sprintf("Failed to fetch the objects with a Destination Policy. Error: %s", err)} @@ -174,8 +149,8 @@ func (store *MongoStorage) retrievePolicies(query interface{}) ([]common.ObjectD } func (store *MongoStorage) removeAll(collectionName string, query interface{}) common.SyncServiceError { - function := func(collection *mgo.Collection) error { - _, err := collection.RemoveAll(query) + function := func(collection *mongo.Collection) error { + _, err := collection.DeleteMany(context.TODO(), query) return err } @@ -191,8 +166,16 @@ func (store *MongoStorage) removeAll(collectionName string, query interface{}) c } func (store *MongoStorage) fetchAll(collectionName string, query interface{}, selector interface{}, result interface{}) common.SyncServiceError { - function := func(collection *mgo.Collection) error { - return collection.Find(query).Select(selector).All(result) + function := func(collection *mongo.Collection) error { + opts := options.Find().SetProjection(selector) + // selector looks like: bson.D{{"field1", 1}, {"field2", 1}}, 1 means include + + cursor, err := collection.Find(context.TODO(), query, opts) + if err != nil { + return err + } + + return cursor.All(context.TODO(), result) } retry, err := store.withCollectionHelper(collectionName, function, true) @@ -207,8 +190,9 @@ func (store *MongoStorage) fetchAll(collectionName string, query interface{}, se } func (store *MongoStorage) fetchOne(collectionName string, query interface{}, selector interface{}, result interface{}) common.SyncServiceError { - function := func(collection *mgo.Collection) error { - return collection.Find(query).Select(selector).One(result) + function := func(collection *mongo.Collection) error { + opts := options.FindOne() + return collection.FindOne(context.TODO(), query, opts).Decode(result) } retry, err := store.withCollectionHelper(collectionName, function, true) @@ -222,9 +206,16 @@ func (store *MongoStorage) fetchOne(collectionName string, query interface{}, se return nil } -func (store *MongoStorage) update(collectionName string, selector interface{}, update interface{}) common.SyncServiceError { - function := func(collection *mgo.Collection) error { - return collection.Update(selector, update) +func (store *MongoStorage) update(collectionName string, filter interface{}, update interface{}) common.SyncServiceError { + function := func(collection *mongo.Collection) error { + opts := options.Update() + updatedResult, err := collection.UpdateOne(context.TODO(), filter, update, opts) + v := int64(0) + if updatedResult.MatchedCount == v { + return &NotFound{} + } + + return err } retry, err := store.withCollectionHelper(collectionName, function, false) @@ -233,14 +224,15 @@ func (store *MongoStorage) update(collectionName string, selector interface{}, u } if retry { - return store.update(collectionName, selector, update) + return store.update(collectionName, filter, update) } return nil } -func (store *MongoStorage) upsert(collectionName string, selector interface{}, update interface{}) common.SyncServiceError { - function := func(collection *mgo.Collection) error { - _, err := collection.Upsert(selector, update) +func (store *MongoStorage) upsert(collectionName string, filter interface{}, update interface{}) common.SyncServiceError { + function := func(collection *mongo.Collection) error { + opts := options.Update().SetUpsert(true) + _, err := collection.UpdateOne(context.TODO(), filter, update, opts) return err } @@ -250,14 +242,15 @@ func (store *MongoStorage) upsert(collectionName string, selector interface{}, u } if retry { - return store.upsert(collectionName, selector, update) + return store.upsert(collectionName, filter, update) } return nil } func (store *MongoStorage) insert(collectionName string, doc interface{}) common.SyncServiceError { - function := func(collection *mgo.Collection) error { - return collection.Insert(doc) + function := func(collection *mongo.Collection) error { + _, err := collection.InsertOne(context.TODO(), doc) + return err } retry, err := store.withCollectionHelper(collectionName, function, false) @@ -271,11 +264,10 @@ func (store *MongoStorage) insert(collectionName string, doc interface{}) common return nil } -func (store *MongoStorage) count(collectionName string, selector interface{}) (uint32, common.SyncServiceError) { +func (store *MongoStorage) count(collectionName string, filter interface{}) (uint32, common.SyncServiceError) { var count uint32 - function := func(collection *mgo.Collection) error { - var err error - countInt, err := collection.Find(selector).Count() + function := func(collection *mongo.Collection) error { + countInt, err := collection.CountDocuments(context.TODO(), filter) count = uint32(countInt) return err } @@ -286,14 +278,51 @@ func (store *MongoStorage) count(collectionName string, selector interface{}) (u } if retry { - return store.count(collectionName, selector) + return store.count(collectionName, filter) } return count, nil } +func (store *MongoStorage) getFileInfo(id string) (*gridfsFile, common.SyncServiceError) { + filter := bson.D{{"filename", id}} + + if store.gridfsBucket == nil { + gridfsBucket, err := gridfs.NewBucket(store.database) + if err != nil { + return nil, err + } + store.gridfsBucket = gridfsBucket + } + cursor, err := store.gridfsBucket.Find(filter) + if err != nil { + return nil, err + } + var foundFiles []gridfsFile + if err = cursor.All(context.TODO(), &foundFiles); err != nil { + return nil, err + } else if len(foundFiles) == 0 { + return nil, &NotFound{fmt.Sprintf("File %v not found in mongo db", id)} + } + + return &foundFiles[0], nil +} + func (store *MongoStorage) removeFile(id string) common.SyncServiceError { - function := func(db *mgo.Database) error { - return db.GridFS("fs").Remove(id) + function := func(db *mongo.Database) error { + file, err := store.getFileInfo(id) + if err != nil { + return err + } + + gridfsBucket, err := gridfs.NewBucket(db) + if err != nil { + return err + } + dbId, err := primitive.ObjectIDFromHex(file.Id) + if err != nil { + return err + } + return gridfsBucket.Delete(dbId) } retry, err := store.withDBHelper(function, false) @@ -308,12 +337,12 @@ func (store *MongoStorage) removeFile(id string) common.SyncServiceError { return nil } -func (store *MongoStorage) openFile(id string) (*fileHandle, common.SyncServiceError) { - function := func(db *mgo.Database) (*mgo.GridFile, error) { - return db.GridFS("fs").Open(id) +func (store *MongoStorage) openFile(id string) (*gridfs.DownloadStream, common.SyncServiceError) { + function := func(db *mongo.Database) (*gridfs.DownloadStream, error) { + return store.gridfsBucket.OpenDownloadStreamByName(id) } - file, session, retry, err := store.withDBAndReturnHelper(function, true) + downloadStream, retry, err := store.withDBAndReturnHelper(function, true) if err != nil { return nil, err } @@ -322,29 +351,42 @@ func (store *MongoStorage) openFile(id string) (*fileHandle, common.SyncServiceE return store.openFile(id) } - return &fileHandle{file, session, 0, nil}, nil + return downloadStream, nil } -func (store *MongoStorage) createFile(id string) (*fileHandle, common.SyncServiceError) { - function := func(db *mgo.Database) (*mgo.GridFile, error) { - return db.GridFS("fs").Create(id) +// Save file into mongo gridFS +func (store *MongoStorage) createFile(id string, data io.Reader) common.SyncServiceError { + + function := func(db *mongo.Database) (*gridfs.DownloadStream, error) { + var err error + bucket := store.gridfsBucket + if bucket == nil { + if bucket, err = gridfs.NewBucket(db); err != nil { + return nil, err + } + } + + uploadOpts := options.GridFSUpload().SetChunkSizeBytes(int32(common.Configuration.MaxDataChunkSize)) + // filename of the object in fs.File is the value of id + _, err = bucket.UploadFromStream(id, data, uploadOpts) + return nil, err } - file, session, retry, err := store.withDBAndReturnHelper(function, false) + _, retry, err := store.withDBAndReturnHelper(function, false) if err != nil { - return nil, err + return err } if retry { - return store.createFile(id) + return store.createFile(id, data) } - file.SetChunkSize(common.Configuration.MaxDataChunkSize) - return &fileHandle{file, session, 0, nil}, nil + + return nil } func (store *MongoStorage) run(cmd interface{}, result interface{}) common.SyncServiceError { - function := func(db *mgo.Database) error { - return db.Run(cmd, result) + function := func(db *mongo.Database) error { + return db.RunCommand(context.TODO(), cmd).Decode(&result) } retry, err := store.withDBHelper(function, true) @@ -358,40 +400,55 @@ func (store *MongoStorage) run(cmd interface{}, result interface{}) common.SyncS return nil } -func (store *MongoStorage) withDBHelper(function func(*mgo.Database) error, isRead bool) (bool, common.SyncServiceError) { +func (store *MongoStorage) withDBHelper(function func(*mongo.Database) error, isRead bool) (bool, common.SyncServiceError) { if !store.connected { return false, &NotConnected{"Disconnected from the database"} } - session := store.getSession() - db := session.DB(common.Configuration.MongoDbName) + mongoClient := store.getMongoClient() + db := mongoClient.Database(common.Configuration.MongoDbName) err := function(db) - - if err == nil || err == mgo.ErrNotFound || err == mgo.ErrCursor || mgo.IsDup(err) { + if err == nil || err == mongo.ErrNoDocuments || err == mongo.ErrNilCursor || mongo.IsDuplicateKeyError(err) || IsNotFound(err) { return false, err } - pingErr := session.Ping() + + pingErr := mongoClient.Ping(context.Background(), nil) if pingErr == nil { if isRead { common.HealthStatus.DBReadFailed() } else { common.HealthStatus.DBWriteFailed() } + return false, pingErr + } + + // reach here if has ping err + ctx, cancel := context.WithTimeout(context.Background(), time.Duration(20*time.Second)) + defer cancel() + mongoClient, err = mongo.Connect(ctx, store.clientConnectOpts) + if err != nil { return false, err } - session.Refresh() - pingErr = session.Ping() + pingErr = mongoClient.Ping(context.Background(), nil) if pingErr == nil { - db := session.DB(common.Configuration.MongoDbName) - err := function(db) - if err == nil || err == mgo.ErrNotFound || err == mgo.ErrCursor || mgo.IsDup(err) { - return false, err + db := mongoClient.Database(common.Configuration.MongoDbName) + store.database = db + gridfsBucket, err := gridfs.NewBucket(db) + if err != nil { + return false, nil + } + store.gridfsBucket = gridfsBucket + err = function(db) + if err == nil { + return false, nil } - if isRead { - common.HealthStatus.DBReadFailed() - } else { - common.HealthStatus.DBWriteFailed() + if err == nil || err == mongo.ErrNoDocuments || err == mongo.ErrNilCursor || mongo.IsDuplicateKeyError(err) || IsNotFound(err) { + if isRead { + common.HealthStatus.DBReadFailed() + } else { + common.HealthStatus.DBWriteFailed() + } } return false, err } @@ -402,89 +459,117 @@ func (store *MongoStorage) withDBHelper(function func(*mgo.Database) error, isRe return false, &NotConnected{"Disconnected from the database"} } -func (store *MongoStorage) withDBAndReturnHelper(function func(*mgo.Database) (*mgo.GridFile, error), isRead bool) (*mgo.GridFile, - *mgo.Session, bool, common.SyncServiceError) { +func (store *MongoStorage) withDBAndReturnHelper(function func(*mongo.Database) (*gridfs.DownloadStream, error), isRead bool) (*gridfs.DownloadStream, bool, common.SyncServiceError) { if !store.connected { - return nil, nil, false, &NotConnected{"Disconnected from the database"} + return nil, false, &NotConnected{"Disconnected from the database"} } - session := store.getSession() - db := session.DB(common.Configuration.MongoDbName) - file, err := function(db) + mongoClient := store.getMongoClient() + db := mongoClient.Database(common.Configuration.MongoDbName) + + fileHandler, err := function(db) if err == nil { - return file, session, false, nil + return fileHandler, false, nil } - if err == mgo.ErrNotFound || err == mgo.ErrCursor || mgo.IsDup(err) { - return nil, nil, false, err + if err == mongo.ErrNoDocuments || err == mongo.ErrNilCursor || mongo.IsDuplicateKeyError(err) || IsNotFound(err) { + return nil, false, err } - pingErr := session.Ping() + pingErr := mongoClient.Ping(context.Background(), nil) if pingErr == nil { if isRead { common.HealthStatus.DBReadFailed() } else { common.HealthStatus.DBWriteFailed() } - return nil, nil, false, err + return nil, false, pingErr + } + + // reach here if has ping err + ctx, cancel := context.WithTimeout(context.Background(), time.Duration(20*time.Second)) + defer cancel() + mongoClient, err = mongo.Connect(ctx, store.clientConnectOpts) + if err != nil { + return nil, false, err } - session.Refresh() - pingErr = session.Ping() + pingErr = mongoClient.Ping(context.Background(), nil) if pingErr == nil { - db := session.DB(common.Configuration.MongoDbName) - file, err := function(db) + db := mongoClient.Database(common.Configuration.MongoDbName) + store.database = db + gridfsBucket, err := gridfs.NewBucket(db) + if err != nil { + return nil, false, err + } + store.gridfsBucket = gridfsBucket + + fileHandler, err := function(db) if err == nil { - return file, session, false, nil + return fileHandler, false, nil } - if err != mgo.ErrNotFound && err != mgo.ErrCursor || mgo.IsDup(err) { + if err != mongo.ErrNoDocuments && !IsNotFound(err) && err != mongo.ErrNilCursor || mongo.IsDuplicateKeyError(err) { if isRead { common.HealthStatus.DBReadFailed() } else { common.HealthStatus.DBWriteFailed() } } - return nil, nil, false, err + return nil, false, err } if connected := store.reconnect(true); connected { - return nil, nil, true, nil + return nil, true, nil } - return nil, nil, false, &NotConnected{"Disconnected from the database"} + return nil, false, &NotConnected{"Disconnected from the database"} } -func (store *MongoStorage) withCollectionHelper(collectionName string, function func(*mgo.Collection) error, isRead bool) (bool, +func (store *MongoStorage) withCollectionHelper(collectionName string, function func(*mongo.Collection) error, isRead bool) (bool, common.SyncServiceError) { if !store.connected { return false, &NotConnected{"Disconnected from the database"} } - session := store.getSession() - collection := session.DB(common.Configuration.MongoDbName).C(collectionName) - + mongoClient := store.getMongoClient() + collection := mongoClient.Database(common.Configuration.MongoDbName).Collection(collectionName) err := function(collection) - - if err == nil || err == mgo.ErrNotFound || err == mgo.ErrCursor || mgo.IsDup(err) { + if err == nil || err == mongo.ErrNoDocuments || err == mongo.ErrNilCursor || mongo.IsDuplicateKeyError(err) || IsNotFound(err) { return false, err } - pingErr := session.Ping() + + pingErr := mongoClient.Ping(context.Background(), nil) if pingErr == nil { if isRead { common.HealthStatus.DBReadFailed() } else { common.HealthStatus.DBWriteFailed() } + return false, pingErr + } + + // reach here if has ping err + ctx, cancel := context.WithTimeout(context.Background(), time.Duration(20*time.Second)) + defer cancel() + mongoClient, err = mongo.Connect(ctx, store.clientConnectOpts) + if err != nil { return false, err } - session.Refresh() - pingErr = session.Ping() + pingErr = mongoClient.Ping(context.Background(), nil) + if pingErr == nil { - collection := session.DB(common.Configuration.MongoDbName).C(collectionName) - err := function(collection) - if err == nil || err == mgo.ErrNotFound || err == mgo.ErrCursor || mgo.IsDup(err) { - return false, err + db := mongoClient.Database(common.Configuration.MongoDbName) + store.database = db + gridfsBucket, err := gridfs.NewBucket(db) + if err != nil { + return false, nil } - if isRead { - common.HealthStatus.DBReadFailed() - } else { - common.HealthStatus.DBWriteFailed() + store.gridfsBucket = gridfsBucket + collection = db.Collection(collectionName) + + err = function(collection) + if err == nil || err == mongo.ErrNoDocuments || IsNotFound(err) || err == mongo.ErrNilCursor || mongo.IsDuplicateKeyError(err) { + if isRead { + common.HealthStatus.DBReadFailed() + } else { + common.HealthStatus.DBWriteFailed() + } } return false, err } @@ -492,6 +577,7 @@ func (store *MongoStorage) withCollectionHelper(collectionName string, function if connected := store.reconnect(true); connected { return true, nil } + return false, &NotConnected{"Disconnected from the database"} } @@ -510,7 +596,8 @@ func (store *MongoStorage) reconnect(timeout bool) bool { return false } - pingErr := store.session.Ping() + c := store.getMongoClient() + pingErr := c.Ping(context.Background(), nil) if pingErr == nil { store.connected = true return true @@ -526,11 +613,14 @@ func (store *MongoStorage) reconnect(timeout bool) bool { log.Error("Disconnected from the database") } - var session *mgo.Session + var mongoClient *mongo.Client var dialErr error + ctx, cancel := context.WithTimeout(context.Background(), time.Duration(20*time.Second)) + defer cancel() + for i := 0; i < 3; { - session, dialErr = mgo.DialWithInfo(store.dialInfo) - if dialErr == nil && session != nil { + mongoClient, dialErr = mongo.Connect(ctx, store.clientConnectOpts) + if dialErr == nil && mongoClient != nil { break } if timeout { @@ -538,20 +628,13 @@ func (store *MongoStorage) reconnect(timeout bool) bool { } } - if dialErr != nil || session == nil { + if dialErr != nil || mongoClient == nil { go store.reconnect(false) return false } - session.SetSafe(&mgo.Safe{}) - store.session = session + store.client = mongoClient store.connected = true - if store.cacheSize > 1 { - for i := 0; i < store.cacheSize; i++ { - store.sessionCache[i].Close() - store.sessionCache[i] = store.session.Copy() - } - } common.HealthStatus.ReconnectedToDatabase() @@ -573,25 +656,6 @@ func (store *MongoStorage) unLock() { store.lockChannel <- 1 } -func (store *MongoStorage) getFileHandle(id string) (fH *fileHandle) { - <-store.mapLock - fH = store.openFiles[id] - store.mapLock <- 1 - return -} - -func (store *MongoStorage) putFileHandle(id string, fH *fileHandle) { - <-store.mapLock - store.openFiles[id] = fH - store.mapLock <- 1 -} - -func (store *MongoStorage) deleteFileHandle(id string) { - <-store.mapLock - delete(store.openFiles, id) - store.mapLock <- 1 -} - func (store *MongoStorage) addUsersToACLHelper(collection string, aclType string, orgID string, key string, users []common.ACLentry) common.SyncServiceError { var id string if key == "" { @@ -606,14 +670,14 @@ func (store *MongoStorage) addUsersToACLHelper(collection string, aclType string result := &aclObject{} for i := 0; i < maxUpdateTries; i++ { if err := store.fetchOne(collection, bson.M{"_id": id}, nil, &result); err != nil { - if err == mgo.ErrNotFound { + if err == mongo.ErrNoDocuments { result.Users = make([]common.ACLentry, 0) result.Users = append(result.Users, users...) result.ID = id result.OrgID = orgID result.ACLType = aclType if err = store.insert(collection, result); err != nil { - if mgo.IsDup(err) { + if mongo.IsDuplicateKeyError(err) { continue } return &Error{fmt.Sprintf("Failed to insert a %s ACL. Error: %s.", aclType, err)} @@ -650,9 +714,9 @@ func (store *MongoStorage) addUsersToACLHelper(collection string, aclType string if err := store.update(collection, bson.M{"_id": id, "last-update": result.LastUpdate}, bson.M{ "$set": bson.M{"users": integratedUsernames}, - "$currentDate": bson.M{"last-update": bson.M{"$type": "timestamp"}}, + "$currentDate": bson.M{"last-update": bson.M{"$type": "date"}}, }); err != nil { - if err == mgo.ErrNotFound { + if IsNotFound(err) { continue } return &Error{fmt.Sprintf("Failed to add a %s ACL. Error: %s.", aclType, err)} @@ -690,7 +754,7 @@ func (store *MongoStorage) removeUsersFromACLHelper(collection string, aclType s if len(result.Users) == 1 { // Deleting the last username, delete the ACL if err := store.removeAll(collection, bson.M{"_id": id}); err != nil { - if err == mgo.ErrNotFound { + if err == mongo.ErrNoDocuments { return nil } return &Error{fmt.Sprintf("Failed to delete a %s ACL. Error: %s.", aclType, err)} @@ -710,9 +774,9 @@ func (store *MongoStorage) removeUsersFromACLHelper(collection string, aclType s if err := store.update(collection, bson.M{"_id": id, "last-update": result.LastUpdate}, bson.M{ "$set": bson.M{"users": result.Users}, - "$currentDate": bson.M{"last-update": bson.M{"$type": "timestamp"}}, + "$currentDate": bson.M{"last-update": bson.M{"$type": "date"}}, }); err != nil { - if err == mgo.ErrNotFound { + if IsNotFound(err) { continue } return &Error{fmt.Sprintf("Failed to delete a %s ACL. Error: %s.", aclType, err)} @@ -736,7 +800,7 @@ func (store *MongoStorage) retrieveACLHelper(collection string, aclType string, } result := &aclObject{} if err := store.fetchOne(collection, bson.M{"_id": id}, nil, &result); err != nil { - if err == mgo.ErrNotFound { + if err == mongo.ErrNoDocuments { return make([]common.ACLentry, 0), nil } return nil, err @@ -763,7 +827,7 @@ func (store *MongoStorage) retrieveACLsInOrgHelper(collection string, aclType st var docs []aclObject query := bson.M{"org-id": orgID, "acl-type": aclType} - selector := bson.M{"_id": bson.ElementString} + selector := bson.M{"_id": 1} if err := store.fetchAll(collection, query, selector, &docs); err != nil { return nil, err } @@ -789,7 +853,7 @@ func (store *MongoStorage) retrieveObjOrDestTypeForGivenACLUserHelper(collection subquery = bson.M{ "$elemMatch": bson.M{ "aclusertype": aclUserType, - "$or": []bson.M{ + "$or": bson.A{ bson.M{"username": aclUsername}, bson.M{"username": "*"}, }, @@ -800,7 +864,7 @@ func (store *MongoStorage) retrieveObjOrDestTypeForGivenACLUserHelper(collection "$elemMatch": bson.M{ "aclusertype": aclUserType, "aclrole": aclRole, - "$or": []bson.M{ + "$or": bson.A{ bson.M{"username": aclUsername}, bson.M{"username": "*"}, }, @@ -815,7 +879,7 @@ func (store *MongoStorage) retrieveObjOrDestTypeForGivenACLUserHelper(collection "users": subquery, } - selector := bson.M{"_id": bson.ElementString} + selector := bson.M{"_id": 1} if err := store.fetchAll(collection, query, selector, &docs); err != nil { return nil, err } diff --git a/core/storage/storage.go b/core/storage/storage.go index e60bb95..d9d9796 100644 --- a/core/storage/storage.go +++ b/core/storage/storage.go @@ -3,8 +3,8 @@ package storage import ( "fmt" "io" - "strings" "strconv" + "strings" "time" "github.com/open-horizon/edge-sync-service/common" @@ -388,7 +388,7 @@ func createObjectCollectionID(orgID string, objectType string, objectID string) func createTempObjectCollectionID(orgID string, objectType string, objectID string, chunkNumber int) string { chunkNum_str := strconv.FormatInt(int64(chunkNumber), 10) var strBuilder strings.Builder - strBuilder.Grow(len(orgID) + len(objectType) + len(objectID) + len("chunk") + len(chunkNum_str)+ 4) + strBuilder.Grow(len(orgID) + len(objectType) + len(objectID) + len("chunk") + len(chunkNum_str) + 4) strBuilder.WriteString(orgID) strBuilder.WriteByte(':') strBuilder.WriteString(objectType) diff --git a/go.mod b/go.mod new file mode 100644 index 0000000..0da6787 --- /dev/null +++ b/go.mod @@ -0,0 +1,30 @@ +module github.com/open-horizon/edge-sync-service + +go 1.21 + +require ( + github.com/eclipse/paho.mqtt.golang v1.4.3 + github.com/globalsign/mgo v0.0.0-20181015135952-eeefdecb41b8 + github.com/google/uuid v1.6.0 + github.com/open-horizon/edge-utilities v0.0.0-20190711093331-0908b45a7152 + go.etcd.io/bbolt v1.3.10 + go.mongodb.org/mongo-driver v1.15.0 + golang.org/x/sync v0.7.0 +) + +require ( + github.com/golang/glog v1.2.1 // indirect + github.com/golang/snappy v0.0.1 // indirect + github.com/gorilla/websocket v1.5.0 // indirect + github.com/klauspost/compress v1.13.6 // indirect + github.com/montanaflynn/stats v0.0.0-20171201202039-1bf9dbcd8cbe // indirect + github.com/xdg-go/pbkdf2 v1.0.0 // indirect + github.com/xdg-go/scram v1.1.2 // indirect + github.com/xdg-go/stringprep v1.0.4 // indirect + github.com/youmark/pkcs8 v0.0.0-20181117223130-1be2e3e5546d // indirect + golang.org/x/crypto v0.17.0 // indirect + golang.org/x/net v0.10.0 // indirect + golang.org/x/sys v0.15.0 // indirect + golang.org/x/text v0.14.0 // indirect + gopkg.in/check.v1 v1.0.0-20201130134442-10cb98267c6c // indirect +) diff --git a/go.sum b/go.sum new file mode 100644 index 0000000..9d6b7d3 --- /dev/null +++ b/go.sum @@ -0,0 +1,81 @@ +github.com/davecgh/go-spew v1.1.1 h1:vj9j/u1bqnvCEfJOwUhtlOARqs3+rkHYY13jYWTU97c= +github.com/davecgh/go-spew v1.1.1/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38= +github.com/eclipse/paho.mqtt.golang v1.4.3 h1:2kwcUGn8seMUfWndX0hGbvH8r7crgcJguQNCyp70xik= +github.com/eclipse/paho.mqtt.golang v1.4.3/go.mod h1:CSYvoAlsMkhYOXh/oKyxa8EcBci6dVkLCbo5tTC1RIE= +github.com/globalsign/mgo v0.0.0-20181015135952-eeefdecb41b8 h1:DujepqpGd1hyOd7aW59XpK7Qymp8iy83xq74fLr21is= +github.com/globalsign/mgo v0.0.0-20181015135952-eeefdecb41b8/go.mod h1:xkRDCp4j0OGD1HRkm4kmhM+pmpv3AKq5SU7GMg4oO/Q= +github.com/golang/glog v1.2.1 h1:OptwRhECazUx5ix5TTWC3EZhsZEHWcYWY4FQHTIubm4= +github.com/golang/glog v1.2.1/go.mod h1:6AhwSGph0fcJtXVM/PEHPqZlFeoLxhs7/t5UDAwmO+w= +github.com/golang/snappy v0.0.1 h1:Qgr9rKW7uDUkrbSmQeiDsGa8SjGyCOGtuasMWwvp2P4= +github.com/golang/snappy v0.0.1/go.mod h1:/XxbfmMg8lxefKM7IXC3fBNl/7bRcc72aCRzEWrmP2Q= +github.com/google/go-cmp v0.6.0 h1:ofyhxvXcZhMsU5ulbFiLKl/XBFqE1GSq7atu8tAmTRI= +github.com/google/go-cmp v0.6.0/go.mod h1:17dUlkBOakJ0+DkrSSNjCkIjxS6bF9zb3elmeNGIjoY= +github.com/google/uuid v1.6.0 h1:NIvaJDMOsjHA8n1jAhLSgzrAzy1Hgr+hNrb57e+94F0= +github.com/google/uuid v1.6.0/go.mod h1:TIyPZe4MgqvfeYDBFedMoGGpEw/LqOeaOT+nhxU+yHo= +github.com/gorilla/websocket v1.5.0 h1:PPwGk2jz7EePpoHN/+ClbZu8SPxiqlu12wZP/3sWmnc= +github.com/gorilla/websocket v1.5.0/go.mod h1:YR8l580nyteQvAITg2hZ9XVh4b55+EU/adAjf1fMHhE= +github.com/klauspost/compress v1.13.6 h1:P76CopJELS0TiO2mebmnzgWaajssP/EszplttgQxcgc= +github.com/klauspost/compress v1.13.6/go.mod h1:/3/Vjq9QcHkK5uEr5lBEmyoZ1iFhe47etQ6QUkpK6sk= +github.com/kr/pretty v0.2.1 h1:Fmg33tUaq4/8ym9TJN1x7sLJnHVwhP33CNkpYV/7rwI= +github.com/kr/pretty v0.2.1/go.mod h1:ipq/a2n7PKx3OHsz4KJII5eveXtPO4qwEXGdVfWzfnI= +github.com/kr/pty v1.1.1/go.mod h1:pFQYn66WHrOpPYNljwOMqo10TkYh1fy3cYio2l3bCsQ= +github.com/kr/text v0.1.0 h1:45sCR5RtlFHMR4UwH9sdQ5TC8v0qDQCHnXt+kaKSTVE= +github.com/kr/text v0.1.0/go.mod h1:4Jbv+DJW3UT/LiOwJeYQe1efqtUx/iVham/4vfdArNI= +github.com/montanaflynn/stats v0.0.0-20171201202039-1bf9dbcd8cbe h1:iruDEfMl2E6fbMZ9s0scYfZQ84/6SPL6zC8ACM2oIL0= +github.com/montanaflynn/stats v0.0.0-20171201202039-1bf9dbcd8cbe/go.mod h1:wL8QJuTMNUDYhXwkmfOly8iTdp5TEcJFWZD2D7SIkUc= +github.com/open-horizon/edge-utilities v0.0.0-20190711093331-0908b45a7152 h1:YEvNOMo3ANOQ3AwsU0cCcBA4nKHDLUlyUCRWk5rBf68= +github.com/open-horizon/edge-utilities v0.0.0-20190711093331-0908b45a7152/go.mod h1:YCsJWhuG0VERquI0geFKoneCSOVAyMdSmylGz5OlZdE= +github.com/pmezard/go-difflib v1.0.0 h1:4DBwDE0NGyQoBHbLQYPwSUPoCMWR5BEzIk/f1lZbAQM= +github.com/pmezard/go-difflib v1.0.0/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZNVY4sRDYZ/4= +github.com/stretchr/testify v1.8.1 h1:w7B6lhMri9wdJUVmEZPGGhZzrYTPvgJArz7wNPgYKsk= +github.com/stretchr/testify v1.8.1/go.mod h1:w2LPCIKwWwSfY2zedu0+kehJoqGctiVI29o6fzry7u4= +github.com/xdg-go/pbkdf2 v1.0.0 h1:Su7DPu48wXMwC3bs7MCNG+z4FhcyEuz5dlvchbq0B0c= +github.com/xdg-go/pbkdf2 v1.0.0/go.mod h1:jrpuAogTd400dnrH08LKmI/xc1MbPOebTwRqcT5RDeI= +github.com/xdg-go/scram v1.1.2 h1:FHX5I5B4i4hKRVRBCFRxq1iQRej7WO3hhBuJf+UUySY= +github.com/xdg-go/scram v1.1.2/go.mod h1:RT/sEzTbU5y00aCK8UOx6R7YryM0iF1N2MOmC3kKLN4= +github.com/xdg-go/stringprep v1.0.4 h1:XLI/Ng3O1Atzq0oBs3TWm+5ZVgkq2aqdlvP9JtoZ6c8= +github.com/xdg-go/stringprep v1.0.4/go.mod h1:mPGuuIYwz7CmR2bT9j4GbQqutWS1zV24gijq1dTyGkM= +github.com/youmark/pkcs8 v0.0.0-20181117223130-1be2e3e5546d h1:splanxYIlg+5LfHAM6xpdFEAYOk8iySO56hMFq6uLyA= +github.com/youmark/pkcs8 v0.0.0-20181117223130-1be2e3e5546d/go.mod h1:rHwXgn7JulP+udvsHwJoVG1YGAP6VLg4y9I5dyZdqmA= +github.com/yuin/goldmark v1.4.13/go.mod h1:6yULJ656Px+3vBD8DxQVa3kxgyrAnzto9xy5taEt/CY= +go.etcd.io/bbolt v1.3.10 h1:+BqfJTcCzTItrop8mq/lbzL8wSGtj94UO/3U31shqG0= +go.etcd.io/bbolt v1.3.10/go.mod h1:bK3UQLPJZly7IlNmV7uVHJDxfe5aK9Ll93e/74Y9oEQ= +go.mongodb.org/mongo-driver v1.15.0 h1:rJCKC8eEliewXjZGf0ddURtl7tTVy1TK3bfl0gkUSLc= +go.mongodb.org/mongo-driver v1.15.0/go.mod h1:Vzb0Mk/pa7e6cWw85R4F/endUC3u0U9jGcNU603k65c= +golang.org/x/crypto v0.0.0-20190308221718-c2843e01d9a2/go.mod h1:djNgcEr1/C05ACkg1iLfiJU5Ep61QUkGW8qpdssI0+w= +golang.org/x/crypto v0.0.0-20210921155107-089bfa567519/go.mod h1:GvvjBRRGRdwPK5ydBHafDWAxML/pGHZbMvKqRZ5+Abc= +golang.org/x/crypto v0.17.0 h1:r8bRNjWL3GshPW3gkd+RpvzWrZAwPS49OmTGZ/uhM4k= +golang.org/x/crypto v0.17.0/go.mod h1:gCAAfMLgwOJRpTjQ2zCCt2OcSfYMTeZVSRtQlPC7Nq4= +golang.org/x/mod v0.6.0-dev.0.20220419223038-86c51ed26bb4/go.mod h1:jJ57K6gSWd91VN4djpZkiMVwK6gcyfeH4XE8wZrZaV4= +golang.org/x/net v0.0.0-20190620200207-3b0461eec859/go.mod h1:z5CRVTTTmAJ677TzLLGU+0bjPO0LkuOLi4/5GtJWs/s= +golang.org/x/net v0.0.0-20210226172049-e18ecbb05110/go.mod h1:m0MpNAwzfU5UDzcl9v0D8zg8gWTRqZa9RBIspLL5mdg= +golang.org/x/net v0.0.0-20220722155237-a158d28d115b/go.mod h1:XRhObCWvk6IyKnWLug+ECip1KBveYUHfp+8e9klMJ9c= +golang.org/x/net v0.10.0 h1:X2//UzNDwYmtCLn7To6G58Wr6f5ahEAQgKNzv9Y951M= +golang.org/x/net v0.10.0/go.mod h1:0qNGK6F8kojg2nk9dLZ2mShWaEBan6FAoqfSigmmuDg= +golang.org/x/sync v0.0.0-20190423024810-112230192c58/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM= +golang.org/x/sync v0.0.0-20220722155255-886fb9371eb4/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM= +golang.org/x/sync v0.7.0 h1:YsImfSBoP9QPYL0xyKJPq0gcaJdG3rInoqxTWbfQu9M= +golang.org/x/sync v0.7.0/go.mod h1:Czt+wKu1gCyEFDUtn0jG5QVvpJ6rzVqr5aXyt9drQfk= +golang.org/x/sys v0.0.0-20190215142949-d0b11bdaac8a/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY= +golang.org/x/sys v0.0.0-20201119102817-f84b799fce68/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= +golang.org/x/sys v0.0.0-20210615035016-665e8c7367d1/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= +golang.org/x/sys v0.0.0-20220520151302-bc2c85ada10a/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= +golang.org/x/sys v0.0.0-20220722155257-8c9f86f7a55f/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= +golang.org/x/sys v0.15.0 h1:h48lPFYpsTvQJZF4EKyI4aLHaev3CxivZmv7yZig9pc= +golang.org/x/sys v0.15.0/go.mod h1:/VUhepiaJMQUp4+oa/7Zr1D23ma6VTLIYjOOTFZPUcA= +golang.org/x/term v0.0.0-20201126162022-7de9c90e9dd1/go.mod h1:bj7SfCRtBDWHUb9snDiAeCFNEtKQo2Wmx5Cou7ajbmo= +golang.org/x/term v0.0.0-20210927222741-03fcf44c2211/go.mod h1:jbD1KX2456YbFQfuXm/mYQcufACuNUgVhRMnK/tPxf8= +golang.org/x/text v0.3.0/go.mod h1:NqM8EUOU14njkJ3fqMW+pc6Ldnwhi/IjpwHt7yyuwOQ= +golang.org/x/text v0.3.3/go.mod h1:5Zoc/QRtKVWzQhOtBMvqHzDpF6irO9z98xDceosuGiQ= +golang.org/x/text v0.3.7/go.mod h1:u+2+/6zg+i71rQMx5EYifcz6MCKuco9NR6JIITiCfzQ= +golang.org/x/text v0.3.8/go.mod h1:E6s5w1FMmriuDzIBO73fBruAKo1PCIq6d2Q6DHfQ8WQ= +golang.org/x/text v0.14.0 h1:ScX5w1eTa3QqT8oi6+ziP7dTV1S2+ALU0bI+0zXKWiQ= +golang.org/x/text v0.14.0/go.mod h1:18ZOQIKpY8NJVqYksKHtTdi31H5itFRjB5/qKTNYzSU= +golang.org/x/tools v0.0.0-20180917221912-90fa682c2a6e/go.mod h1:n7NCudcB/nEzxVGmLbDWY5pfWTLqBcC2KZ6jyYvM4mQ= +golang.org/x/tools v0.0.0-20191119224855-298f0cb1881e/go.mod h1:b+2E5dAYhXwXZwtnZ6UAqBI28+e2cm9otk0dWdXHAEo= +golang.org/x/tools v0.1.12/go.mod h1:hNGJHUnrk76NpqgfD5Aqm5Crs+Hm0VOH/i9J2+nxYbc= +golang.org/x/xerrors v0.0.0-20190717185122-a985d3407aa7/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0= +gopkg.in/check.v1 v1.0.0-20201130134442-10cb98267c6c h1:Hei/4ADfdWqJk1ZMxUNpqntNwaWcugrBjAiHlqqRiVk= +gopkg.in/check.v1 v1.0.0-20201130134442-10cb98267c6c/go.mod h1:JHkPIbrfpd72SG/EVd6muEfDQjcINNoR0C8j2r3qZ4Q= +gopkg.in/yaml.v3 v3.0.1 h1:fxVm/GzAzEWqLHuvctI91KS9hhNmmWOoWu0XTYJS7CA= +gopkg.in/yaml.v3 v3.0.1/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM= diff --git a/vendor/modules.txt b/vendor/modules.txt new file mode 100644 index 0000000..e7a8905 --- /dev/null +++ b/vendor/modules.txt @@ -0,0 +1,125 @@ +# github.com/eclipse/paho.mqtt.golang v1.4.3 +## explicit; go 1.18 +github.com/eclipse/paho.mqtt.golang +github.com/eclipse/paho.mqtt.golang/packets +# github.com/globalsign/mgo v0.0.0-20181015135952-eeefdecb41b8 +## explicit +# github.com/golang/glog v1.2.1 +## explicit; go 1.19 +github.com/golang/glog +github.com/golang/glog/internal/logsink +github.com/golang/glog/internal/stackdump +# github.com/golang/snappy v0.0.1 +## explicit +github.com/golang/snappy +# github.com/google/uuid v1.6.0 +## explicit +github.com/google/uuid +# github.com/gorilla/websocket v1.5.0 +## explicit; go 1.12 +github.com/gorilla/websocket +# github.com/klauspost/compress v1.13.6 +## explicit; go 1.15 +github.com/klauspost/compress +github.com/klauspost/compress/fse +github.com/klauspost/compress/huff0 +github.com/klauspost/compress/internal/snapref +github.com/klauspost/compress/zstd +github.com/klauspost/compress/zstd/internal/xxhash +# github.com/montanaflynn/stats v0.0.0-20171201202039-1bf9dbcd8cbe +## explicit +github.com/montanaflynn/stats +# github.com/open-horizon/edge-utilities v0.0.0-20190711093331-0908b45a7152 +## explicit +github.com/open-horizon/edge-utilities/logger +github.com/open-horizon/edge-utilities/logger/log +github.com/open-horizon/edge-utilities/logger/trace +github.com/open-horizon/edge-utilities/properties +# github.com/xdg-go/pbkdf2 v1.0.0 +## explicit; go 1.9 +github.com/xdg-go/pbkdf2 +# github.com/xdg-go/scram v1.1.2 +## explicit; go 1.11 +github.com/xdg-go/scram +# github.com/xdg-go/stringprep v1.0.4 +## explicit; go 1.11 +github.com/xdg-go/stringprep +# github.com/youmark/pkcs8 v0.0.0-20181117223130-1be2e3e5546d +## explicit +github.com/youmark/pkcs8 +# go.etcd.io/bbolt v1.3.10 +## explicit; go 1.21 +go.etcd.io/bbolt +# go.mongodb.org/mongo-driver v1.15.0 +## explicit; go 1.18 +go.mongodb.org/mongo-driver/bson +go.mongodb.org/mongo-driver/bson/bsoncodec +go.mongodb.org/mongo-driver/bson/bsonoptions +go.mongodb.org/mongo-driver/bson/bsonrw +go.mongodb.org/mongo-driver/bson/bsontype +go.mongodb.org/mongo-driver/bson/primitive +go.mongodb.org/mongo-driver/event +go.mongodb.org/mongo-driver/internal/aws +go.mongodb.org/mongo-driver/internal/aws/awserr +go.mongodb.org/mongo-driver/internal/aws/credentials +go.mongodb.org/mongo-driver/internal/aws/signer/v4 +go.mongodb.org/mongo-driver/internal/bsonutil +go.mongodb.org/mongo-driver/internal/codecutil +go.mongodb.org/mongo-driver/internal/credproviders +go.mongodb.org/mongo-driver/internal/csfle +go.mongodb.org/mongo-driver/internal/csot +go.mongodb.org/mongo-driver/internal/driverutil +go.mongodb.org/mongo-driver/internal/handshake +go.mongodb.org/mongo-driver/internal/httputil +go.mongodb.org/mongo-driver/internal/logger +go.mongodb.org/mongo-driver/internal/ptrutil +go.mongodb.org/mongo-driver/internal/rand +go.mongodb.org/mongo-driver/internal/randutil +go.mongodb.org/mongo-driver/internal/uuid +go.mongodb.org/mongo-driver/mongo +go.mongodb.org/mongo-driver/mongo/address +go.mongodb.org/mongo-driver/mongo/description +go.mongodb.org/mongo-driver/mongo/gridfs +go.mongodb.org/mongo-driver/mongo/options +go.mongodb.org/mongo-driver/mongo/readconcern +go.mongodb.org/mongo-driver/mongo/readpref +go.mongodb.org/mongo-driver/mongo/writeconcern +go.mongodb.org/mongo-driver/tag +go.mongodb.org/mongo-driver/version +go.mongodb.org/mongo-driver/x/bsonx/bsoncore +go.mongodb.org/mongo-driver/x/mongo/driver +go.mongodb.org/mongo-driver/x/mongo/driver/auth +go.mongodb.org/mongo-driver/x/mongo/driver/auth/creds +go.mongodb.org/mongo-driver/x/mongo/driver/auth/internal/gssapi +go.mongodb.org/mongo-driver/x/mongo/driver/connstring +go.mongodb.org/mongo-driver/x/mongo/driver/dns +go.mongodb.org/mongo-driver/x/mongo/driver/mongocrypt +go.mongodb.org/mongo-driver/x/mongo/driver/mongocrypt/options +go.mongodb.org/mongo-driver/x/mongo/driver/ocsp +go.mongodb.org/mongo-driver/x/mongo/driver/operation +go.mongodb.org/mongo-driver/x/mongo/driver/session +go.mongodb.org/mongo-driver/x/mongo/driver/topology +go.mongodb.org/mongo-driver/x/mongo/driver/wiremessage +# golang.org/x/crypto v0.17.0 +## explicit; go 1.18 +golang.org/x/crypto/ocsp +golang.org/x/crypto/pbkdf2 +# golang.org/x/net v0.10.0 +## explicit; go 1.17 +golang.org/x/net/internal/socks +golang.org/x/net/proxy +# golang.org/x/sync v0.7.0 +## explicit; go 1.18 +golang.org/x/sync/errgroup +golang.org/x/sync/semaphore +golang.org/x/sync/singleflight +# golang.org/x/sys v0.15.0 +## explicit; go 1.18 +golang.org/x/sys/unix +golang.org/x/sys/windows +# golang.org/x/text v0.14.0 +## explicit; go 1.18 +golang.org/x/text/transform +golang.org/x/text/unicode/norm +# gopkg.in/check.v1 v1.0.0-20201130134442-10cb98267c6c +## explicit; go 1.11 diff --git a/vendor/vendor.json b/vendor/vendor.json deleted file mode 100644 index c2c0ffb..0000000 --- a/vendor/vendor.json +++ /dev/null @@ -1,109 +0,0 @@ -{ - "comment": "", - "ignore": "test", - "package": [ - { - "checksumSHA1": "15IV+XctBBGO7lvmYfV66dkVp7Y=", - "path": "github.com/eclipse/paho.mqtt.golang", - "revision": "36d01c2b4cbeb3d2a12063e4880ce30800af9560", - "revisionTime": "2018-03-15T09:10:34Z", - "version": "=v1.1.1", - "versionExact": "v1.1.1" - }, - { - "checksumSHA1": "ITCnG3uKUnqNXrnnulKokjQKfN4=", - "path": "github.com/eclipse/paho.mqtt.golang/packets", - "revision": "36d01c2b4cbeb3d2a12063e4880ce30800af9560", - "revisionTime": "2018-03-15T09:10:34Z", - "version": "=v1.1.1", - "versionExact": "v1.1.1" - }, - { - "checksumSHA1": "5paEOrPNmte9PRyvpBF8HTDjQD4=", - "path": "github.com/globalsign/mgo", - "revision": "113d3961e7311526535a1ef7042196563d442761", - "revisionTime": "2018-06-15T13:49:36Z", - "version": "=r2018.06.15", - "versionExact": "r2018.06.15" - }, - { - "checksumSHA1": "zdy6XflmESAiFmyptSqpUMfXSyo=", - "path": "github.com/globalsign/mgo/bson", - "revision": "113d3961e7311526535a1ef7042196563d442761", - "revisionTime": "2018-06-15T13:49:36Z", - "version": "=r2018.06.15", - "versionExact": "r2018.06.15" - }, - { - "checksumSHA1": "//bXQG4HblBwxHJDIW/PklXAxu0=", - "path": "github.com/globalsign/mgo/internal/json", - "revision": "113d3961e7311526535a1ef7042196563d442761", - "revisionTime": "2018-06-15T13:49:36Z", - "version": "=r2018.06.15", - "versionExact": "r2018.06.15" - }, - { - "checksumSHA1": "loubUcDO2QAStKIlVd7DqSeoEaw=", - "path": "github.com/globalsign/mgo/internal/sasl", - "revision": "113d3961e7311526535a1ef7042196563d442761", - "revisionTime": "2018-06-15T13:49:36Z", - "version": "=r2018.06.15", - "versionExact": "r2018.06.15" - }, - { - "checksumSHA1": "xjfEsfN7ho6dDOVeUQJhW4KzD2I=", - "path": "github.com/globalsign/mgo/internal/scram", - "revision": "113d3961e7311526535a1ef7042196563d442761", - "revisionTime": "2018-06-15T13:49:36Z", - "version": "=r2018.06.15", - "versionExact": "r2018.06.15" - }, - { - "checksumSHA1": "H8wo+NR5z+VRl0wqPYpVQfC06ks=", - "path": "github.com/go-stack/stack", - "revision": "2fee6af1a9795aafbe0253a0cfbdf668e1fb8a9a", - "revisionTime": "2018-08-26T13:48:48Z", - "version": "=v1.8.0", - "versionExact": "v1.8.0" - }, - { - "checksumSHA1": "yNyE9MrpDJQAxOxSipSIBgBhuNQ=", - "path": "github.com/google/uuid", - "revision": "9b3b1e0f5f99ae461456d768e7d301a7acdaa2d8", - "revisionTime": "2018-09-17T14:00:05Z", - "version": "=v1.1.0", - "versionExact": "v1.1.0" - }, - { - "checksumSHA1": "W/PCjyIexX/4JYyy0xQ9s4GjwVk=", - "path": "go.etcd.io/bbolt", - "revision": "68cc10a767ea1c6b9e8dcb9847317ff192d6d974", - "revisionTime": "2020-03-19T20:29:37Z" - }, - { - "checksumSHA1": "f3Y7JIZH61oMmp8nphqe8Mg+XoU=", - "path": "golang.org/x/net/internal/socks", - "revision": "65e2d4e15006aab9813ff8769e768bbf4bb667a0", - "revisionTime": "2019-02-01T23:59:58Z" - }, - { - "checksumSHA1": "mCMW3hvbWFW1k5il9yyO7ELOdws=", - "path": "golang.org/x/net/proxy", - "revision": "65e2d4e15006aab9813ff8769e768bbf4bb667a0", - "revisionTime": "2019-02-01T23:59:58Z" - }, - { - "checksumSHA1": "F+tqxPGFt5x7DKZakbbMmENX1oQ=", - "path": "golang.org/x/net/websocket", - "revision": "65e2d4e15006aab9813ff8769e768bbf4bb667a0", - "revisionTime": "2019-02-01T23:59:58Z" - }, - { - "checksumSHA1": "25eWuZl9nc6J3+a1fJochJPLhZM=", - "path": "golang.org/x/sys/unix", - "revision": "3b5209105503162ded1863c307ac66fec31120dd", - "revisionTime": "2019-02-09T17:16:25Z" - } - ], - "rootPath": "github.com/open-horizon/edge-sync-service" -}