Fully implement BlobCache
This class is responsible for caching blobs during a backup run,
so we know when a blob for a given chunk ID already exists on the backend
and does not need to be uploaded again.

It builds up its cache from the snapshots available on the backend
and from a persistent local cache of blobs that could not be added to a snapshot
because the backup was aborted.
grote committed Sep 12, 2024
1 parent a2b7182 commit 81389ad
Showing 13 changed files with 388 additions and 139 deletions.
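
Taken together, the diffs below wire the new cache into the backup flow: AppBackupManager lists all blobs and snapshots on the backend once, BlobCache.populateCache() builds a chunk-ID-to-blob map from them, and BackupReceiver consults that map before creating a new blob. A minimal sketch of that interaction (the helper function and its uploader parameter are illustrative, not part of the commit):

import com.stevesoltys.seedvault.proto.Snapshot.Blob

// Hypothetical helper showing how the cache short-circuits uploads;
// BackupReceiver.onNewChunk() in the diff below does effectively this.
internal suspend fun blobForChunk(
    chunkId: String, // hex-encoded chunk hash
    blobCache: BlobCache,
    createAndUploadBlob: suspend () -> Blob, // assumed uploader, e.g. via BlobCreator
): Blob {
    // Cache hit: a blob for this chunk already exists on the backend, skip the upload.
    blobCache[chunkId]?.let { return it }
    // Cache miss: upload a new blob and record it, so even an aborted
    // backup run can re-use it next time via the persistent cache.
    val blob = createAndUploadBlob()
    blobCache.saveNewBlob(chunkId, blob)
    return blob
}

The append-only persistent cache file is what lets an interrupted backup avoid re-uploading blobs that never made it into a snapshot.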
SnapshotManager
@@ -16,7 +16,6 @@ import okio.Buffer
 import okio.buffer
 import okio.sink
 import org.calyxos.seedvault.core.backends.AppBackupFileType
-import org.calyxos.seedvault.core.backends.TopLevelFolder
 
 internal class SnapshotManager(
     private val crypto: Crypto,
@@ -27,37 +26,23 @@ internal class SnapshotManager(
     private val log = KotlinLogging.logger {}
 
     /**
-     * The latest [Snapshot]. May be stale if [loadSnapshots] has not returned
+     * The latest [Snapshot]. May be stale if [onSnapshotsLoaded] has not returned
      * or wasn't called since new snapshots have been created.
      */
     var latestSnapshot: Snapshot? = null
         private set
 
-    suspend fun loadSnapshots(callback: (Snapshot) -> Unit) {
-        log.info { "Loading snapshots..." }
-        val handles = mutableListOf<AppBackupFileType.Snapshot>()
-        backendManager.backend.list(
-            topLevelFolder = TopLevelFolder(crypto.repoId),
-            AppBackupFileType.Snapshot::class,
-        ) { fileInfo ->
-            fileInfo.fileHandle as AppBackupFileType.Snapshot
-            handles.add(fileInfo.fileHandle as AppBackupFileType.Snapshot)
-        }
-        handles.forEach { fileHandle ->
-            // TODO is it a fatal error when one snapshot is corrupted or couldn't get loaded?
-            val snapshot = onSnapshotFound(fileHandle)
-            callback(snapshot)
-        }
-    }
-
-    private suspend fun onSnapshotFound(snapshotHandle: AppBackupFileType.Snapshot): Snapshot {
-        // TODO set up local snapshot cache, so we don't need to download those all the time
-        val snapshot = loader.loadFile(snapshotHandle).use { inputStream ->
-            Snapshot.parseFrom(inputStream)
-        }
-        // update latest snapshot if this one is more recent
-        if (snapshot.token > (latestSnapshot?.token ?: 0)) latestSnapshot = snapshot
-        return snapshot
-    }
+    suspend fun onSnapshotsLoaded(handles: List<AppBackupFileType.Snapshot>): List<Snapshot> {
+        return handles.map { snapshotHandle ->
+            // TODO set up local snapshot cache, so we don't need to download those all the time
+            // TODO is it a fatal error when one snapshot is corrupted or couldn't get loaded?
+            val snapshot = loader.loadFile(snapshotHandle).use { inputStream ->
+                Snapshot.parseFrom(inputStream)
+            }
+            // update latest snapshot if this one is more recent
+            if (snapshot.token > (latestSnapshot?.token ?: 0)) latestSnapshot = snapshot
+            snapshot
+        }
+    }
 
     suspend fun saveSnapshot(snapshot: Snapshot) {
AppBackupManager
@@ -6,13 +6,21 @@
 package com.stevesoltys.seedvault.transport.backup
 
 import com.stevesoltys.seedvault.MemoryLogger
+import com.stevesoltys.seedvault.backend.BackendManager
+import com.stevesoltys.seedvault.crypto.Crypto
 import com.stevesoltys.seedvault.settings.SettingsManager
 import com.stevesoltys.seedvault.transport.SnapshotManager
 import io.github.oshai.kotlinlogging.KotlinLogging
 import kotlinx.coroutines.delay
+import org.calyxos.seedvault.core.backends.AppBackupFileType.Blob
+import org.calyxos.seedvault.core.backends.AppBackupFileType.Snapshot
+import org.calyxos.seedvault.core.backends.FileInfo
+import org.calyxos.seedvault.core.backends.TopLevelFolder
 
 internal class AppBackupManager(
-    private val blobsCache: BlobsCache,
+    private val crypto: Crypto,
+    private val blobCache: BlobCache,
+    private val backendManager: BackendManager,
     private val settingsManager: SettingsManager,
     private val snapshotManager: SnapshotManager,
     private val snapshotCreatorFactory: SnapshotCreatorFactory,
@@ -23,23 +31,43 @@ internal class AppBackupManager(
         private set
 
     suspend fun beforeBackup() {
-        log.info { "Before backup" }
+        log.info { "Loading existing snapshots and blobs..." }
+        val blobInfos = mutableListOf<FileInfo>()
+        val snapshotHandles = mutableListOf<Snapshot>()
+        backendManager.backend.list(
+            topLevelFolder = TopLevelFolder(crypto.repoId),
+            Blob::class, Snapshot::class,
+        ) { fileInfo ->
+            when (fileInfo.fileHandle) {
+                is Blob -> blobInfos.add(fileInfo)
+                is Snapshot -> snapshotHandles.add(fileInfo.fileHandle as Snapshot)
+                else -> error("Unexpected FileHandle: $fileInfo")
+            }
+        }
         snapshotCreator = snapshotCreatorFactory.createSnapshotCreator()
-        blobsCache.populateCache()
+        val snapshots = snapshotManager.onSnapshotsLoaded(snapshotHandles)
+        blobCache.populateCache(blobInfos, snapshots)
     }
 
     suspend fun afterBackupFinished(success: Boolean) {
         log.info { "After backup finished. Success: $success" }
         MemoryLogger.log()
-        blobsCache.clear()
-        if (success) {
-            val snapshot = snapshotCreator?.finalizeSnapshot() ?: error("Had no snapshotCreator")
-            keepTrying {
-                snapshotManager.saveSnapshot(snapshot)
+        // free up memory by clearing blobs cache
+        blobCache.clear()
+        try {
+            if (success) {
+                val snapshot =
+                    snapshotCreator?.finalizeSnapshot() ?: error("Had no snapshotCreator")
+                keepTrying {
+                    snapshotManager.saveSnapshot(snapshot)
+                }
+                settingsManager.token = snapshot.token
+                // after snapshot was written, we can clear local cache as its info is in snapshot
+                blobCache.clearLocalCache()
             }
-            settingsManager.token = snapshot.token
+        } finally {
+            snapshotCreator = null
         }
-        snapshotCreator = null
         MemoryLogger.log()
     }
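
The keepTrying helper used in afterBackupFinished() is not part of this diff. Judging by the kotlinx.coroutines.delay import, it retries the block with a delay between attempts; a hedged sketch of such a helper follows (the attempt count and delay are assumptions, not the project's actual values):

// Hypothetical sketch of a keepTrying-style retry helper; the real
// implementation is not shown in this commit.
private suspend fun keepTrying(attempts: Int = 3, block: suspend () -> Unit) {
    repeat(attempts) { attempt ->
        try {
            block() // e.g. snapshotManager.saveSnapshot(snapshot)
            return // success, stop retrying
        } catch (e: Exception) {
            // rethrow on the last attempt, otherwise wait and retry
            if (attempt == attempts - 1) throw e
            delay(1_000)
        }
    }
}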
backup Koin module
@@ -12,7 +12,7 @@ import org.koin.dsl.module
 val backupModule = module {
     single { BackupInitializer(get()) }
     single { BackupReceiver(get(), get(), get()) }
-    single { BlobsCache(get(), get(), get()) }
+    single { BlobCache(androidContext()) }
     single { BlobCreator(get(), get()) }
     single { SnapshotManager(get(), get(), get()) }
     single { SnapshotCreatorFactory(androidContext(), get(), get(), get()) }
BackupReceiver
@@ -23,7 +23,7 @@ data class BackupData(
 }
 
 internal class BackupReceiver(
-    private val blobsCache: BlobsCache,
+    private val blobCache: BlobCache,
     private val blobCreator: BlobCreator,
     private val crypto: Crypto,
     private val replaceableChunker: Chunker? = null,
@@ -91,11 +91,11 @@ internal class BackupReceiver(
     private suspend fun onNewChunk(chunk: Chunk) {
         chunks.add(chunk.hash)
 
-        val existingBlob = blobsCache.getBlob(chunk.hash)
+        val existingBlob = blobCache[chunk.hash]
         if (existingBlob == null) {
             val blob = blobCreator.createNewBlob(chunk)
             chunkMap[chunk.hash] = blob
-            blobsCache.saveNewBlob(chunk.hash, blob)
+            blobCache.saveNewBlob(chunk.hash, blob)
             Log.e("TEST", "NEW BLOB: ${f(blob.uncompressedLength)}")
         } else {
             chunkMap[chunk.hash] = existingBlob
BlobCache (new file)
@@ -0,0 +1,163 @@
/*
 * SPDX-FileCopyrightText: 2024 The Calyx Institute
 * SPDX-License-Identifier: Apache-2.0
 */

package com.stevesoltys.seedvault.transport.backup

import android.content.Context
import android.content.Context.MODE_APPEND
import com.stevesoltys.seedvault.MemoryLogger
import com.stevesoltys.seedvault.proto.Snapshot
import com.stevesoltys.seedvault.proto.Snapshot.Blob
import io.github.oshai.kotlinlogging.KotlinLogging
import org.calyxos.seedvault.core.backends.FileInfo
import org.calyxos.seedvault.core.toByteArrayFromHex
import org.calyxos.seedvault.core.toHexString
import java.io.IOException

private const val CACHE_FILE_NAME = "blobsCache"

/**
 * Responsible for caching blobs during a backup run,
 * so we can know that a blob for the given chunk ID already exists
 * and does not need to be uploaded again.
 *
 * It builds up its cache from snapshots available on the backend
 * and from the persistent cache that includes blobs that could not be added to a snapshot,
 * because the backup was aborted.
 */
internal class BlobCache(
    private val context: Context,
) {

    private val log = KotlinLogging.logger {}
    private val blobMap = mutableMapOf<String, Blob>()

    /**
     * This must be called before saving files to the backend to avoid uploading duplicate blobs.
     */
    @Throws(IOException::class)
    fun populateCache(blobs: List<FileInfo>, snapshots: List<Snapshot>) {
        log.info { "Getting all blobs from backend..." }
        MemoryLogger.log()
        blobMap.clear()
        // create map of blobId to size of blob on backend
        val blobIds = blobs.associate {
            Pair(it.fileHandle.name, it.size.toInt())
        }
        // load local blob cache and include only blobs on backend
        loadPersistentBlobCache(blobIds)
        // build up mapping from chunkId to blob from available snapshots
        snapshots.forEach { snapshot ->
            onSnapshotLoaded(snapshot, blobIds)
        }
        MemoryLogger.log()
    }

    /**
     * Should only be called after [populateCache] has returned.
     */
    operator fun get(chunkId: String): Blob? = blobMap[chunkId]

    /**
     * Should get called for all new blobs as soon as they've been saved to the backend.
     */
    fun saveNewBlob(chunkId: String, blob: Blob) {
        val previous = blobMap.put(chunkId, blob)
        if (previous == null) {
            // persist this new blob locally in case backup gets interrupted
            context.openFileOutput(CACHE_FILE_NAME, MODE_APPEND).use { outputStream ->
                outputStream.write(chunkId.toByteArrayFromHex())
                blob.writeDelimitedTo(outputStream)
            }
        }
    }

    /**
     * Clears the cached blob mapping.
     * Should be called after a backup run to free up memory.
     */
    fun clear() {
        MemoryLogger.log()
        log.info { "Clearing cache..." }
        blobMap.clear()
        MemoryLogger.log()
    }

    /**
     * Clears the local cache.
     * Should get called after uploading a new snapshot
     * to prevent the persistent cache from growing indefinitely.
     */
    fun clearLocalCache() {
        log.info { "Clearing local cache..." }
        context.deleteFile(CACHE_FILE_NAME)
    }

    /**
     * Loads persistent cache from disk and adds blobs to [blobMap]
     * if available in [allowedBlobIds] with the right size.
     */
    private fun loadPersistentBlobCache(allowedBlobIds: Map<String, Int>) {
        try {
            context.openFileInput(CACHE_FILE_NAME).use { inputStream ->
                val chunkIdBytes = ByteArray(32)
                while (true) {
                    val bytesRead = inputStream.read(chunkIdBytes)
                    if (bytesRead != 32) break
                    val chunkId = chunkIdBytes.toHexString()
                    // parse blob
                    val blob = Blob.parseDelimitedFrom(inputStream)
                    val blobId = blob.id.hexFromProto()
                    // include blob only if size is equal to size on backend
                    val sizeOnBackend = allowedBlobIds[blobId]
                    if (sizeOnBackend == blob.length) {
                        blobMap[chunkId] = blob
                    } else log.warn {
                        if (sizeOnBackend == null) {
                            "Cached blob $blobId is missing from backend."
                        } else {
                            "Cached blob $blobId had different size on backend: $sizeOnBackend"
                        }
                    }
                }
            }
        } catch (e: Exception) {
            // If the local cache is corrupted, that's not the end of the world.
            // We can still continue normally,
            // but may be writing out duplicated blobs we can't re-use.
            // Those will get deleted again when pruning.
            // So swallow the exception.
            log.error(e) { "Error loading blobs cache" }
        }
    }

    /**
     * Used for populating local [blobMap] cache.
     * Adds mapping from chunkId to [Blob], if it exists on backend, i.e. part of [blobIds]
     * and its size matches the one on backend, i.e. value of [blobIds].
     */
    private fun onSnapshotLoaded(snapshot: Snapshot, blobIds: Map<String, Int>) {
        snapshot.blobsMap.forEach { (chunkId, blob) ->
            // check if referenced blob still exists on backend
            val blobId = blob.id.hexFromProto()
            val sizeOnBackend = blobIds[blobId]
            if (sizeOnBackend == blob.length) {
                // only add blob to our mapping, if it still exists
                blobMap.putIfAbsent(chunkId, blob)?.let { previous ->
                    if (previous.id != blob.id) log.warn {
                        "Chunk ID ${chunkId.substring(0..5)} had more than one blob."
                    }
                }
            } else log.warn {
                if (sizeOnBackend == null) {
                    "Blob $blobId in snapshot ${snapshot.token} is missing."
                } else {
                    "Blob $blobId has unexpected size: $sizeOnBackend"
                }
            }
        }
    }
}
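
One subtlety in loadPersistentBlobCache(): InputStream.read(ByteArray) may legally return fewer than 32 bytes even when more data is coming, so a short read could be mistaken for end-of-file. For a local file this is unlikely in practice, but a readFully-based loop expresses the same record format (32 raw chunk-ID bytes, then one length-delimited Blob proto, repeated) more defensively. A sketch under those assumptions, not the commit's code:

import java.io.DataInputStream
import java.io.EOFException
import java.io.InputStream

// Defensive variant of the cache-reading loop above (illustrative only):
// readFully() either fills the buffer or throws EOFException, so a partial
// read can never be mistaken for the end of the cache file.
internal fun readCacheRecords(inputStream: InputStream): Map<String, Blob> {
    val map = mutableMapOf<String, Blob>()
    DataInputStream(inputStream).use { input ->
        val chunkIdBytes = ByteArray(32)
        while (true) {
            try {
                input.readFully(chunkIdBytes) // 32 raw bytes = hex-decoded chunk ID
            } catch (e: EOFException) {
                break // clean end of the cache file
            }
            // one length-delimited Snapshot.Blob proto follows each chunk ID
            val blob = Blob.parseDelimitedFrom(input) ?: break
            map[chunkIdBytes.toHexString()] = blob
        }
    }
    return map
}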