feat: Implemented Lock Mechanism for Synchronization.

- Refactored beforeSync() to incorporate a robust locking mechanism.
- Added backoff strategy to handle race conditions during concurrent sync attempts.
- Ensured single instance of sync data file on Google Drive to prevent duplication.
- Optimized lock file checking and handling for efficient synchronization process.

This update significantly improves the reliability and efficiency of the syncing process by preventing race conditions and ensuring that synchronization actions are atomic.

Signed-off-by: KaiserBh <kaiserbh@proton.me>
This commit is contained in:
KaiserBh 2024-01-07 09:48:44 +11:00
parent 2e5686372a
commit 39d5714a1b
No known key found for this signature in database
GPG Key ID: 14D73B142042BBA9
2 changed files with 131 additions and 32 deletions

View File

@ -20,6 +20,7 @@ import com.google.api.services.drive.DriveScopes
import com.google.api.services.drive.model.File
import eu.kanade.tachiyomi.R
import kotlinx.coroutines.Dispatchers
import kotlinx.coroutines.delay
import kotlinx.coroutines.withContext
import kotlinx.serialization.encodeToString
import kotlinx.serialization.json.Json
@ -32,6 +33,7 @@ import java.io.ByteArrayInputStream
import java.io.ByteArrayOutputStream
import java.io.IOException
import java.io.InputStreamReader
import java.time.Instant
import java.util.zip.GZIPInputStream
import java.util.zip.GZIPOutputStream
@ -57,12 +59,50 @@ class GoogleDriveSyncService(context: Context, json: Json, syncPreferences: Sync
private val remoteFileName = "tachiyomi_sync_data.gz"
private val lockFileName = "tachiyomi_sync.lock"
private val googleDriveService = GoogleDriveService(context)
override suspend fun beforeSync() = googleDriveService.refreshToken()
override suspend fun beforeSync() {
googleDriveService.refreshToken()
val drive = googleDriveService.driveService ?: throw Exception("Google Drive service not initialized")
var backoff = 2000L // Start with 2 seconds
while (true) {
val lockFiles = findLockFile(drive) // Fetch the current list of lock files
when {
lockFiles.isEmpty() -> {
// No lock file exists, try to create a new one
createLockFile(drive)
}
lockFiles.size == 1 -> {
// Exactly one lock file exists
val lockFile = lockFiles.first()
val createdTime = Instant.parse(lockFile.createdTime.toString())
val ageMinutes = java.time.Duration.between(createdTime, Instant.now()).toMinutes()
if (ageMinutes <= 3) {
// Lock file is new and presumably held by this process, break the loop to proceed
break
} else {
// Lock file is old, delete and attempt to create a new one
deleteLockFile(drive)
createLockFile(drive)
}
}
else -> {
// More than one lock file exists, likely due to a race condition
delay(backoff) // Apply backoff strategy
backoff = (backoff * 2).coerceAtMost(32000L) // Max backoff of 32 seconds
}
}
// The loop continues until it can confirm that there's exactly one new lock file.
}
}
override suspend fun pullSyncData(): SyncData? {
val drive = googleDriveService.googleDriveService
val drive = googleDriveService.driveService
// Check if the Google Drive service is initialized
if (drive == null) {
@ -89,27 +129,11 @@ class GoogleDriveSyncService(context: Context, json: Json, syncPreferences: Sync
override suspend fun pushSyncData(syncData: SyncData) {
val jsonData = json.encodeToString(syncData)
val drive = googleDriveService.driveService ?: throw Exception(context.getString(R.string.google_drive_not_signed_in))
val drive = googleDriveService.googleDriveService
// Check if the Google Drive service is initialized
if (drive == null) {
logcat(LogPriority.ERROR) { "Google Drive service not initialized" }
throw Exception(context.getString(R.string.google_drive_not_signed_in))
}
// delete file if exists
val fileList = getFileList(drive)
if (fileList.isNotEmpty()) {
drive.files().delete(fileList[0].id).execute()
}
val fileMetadata = File()
fileMetadata.name = remoteFileName
fileMetadata.mimeType = "application/gzip"
val byteArrayOutputStream = ByteArrayOutputStream()
withContext(Dispatchers.IO) {
val gzipOutputStream = GZIPOutputStream(byteArrayOutputStream)
gzipOutputStream.write(jsonData.toByteArray(Charsets.UTF_8))
@ -117,24 +141,98 @@ class GoogleDriveSyncService(context: Context, json: Json, syncPreferences: Sync
}
val byteArrayContent = ByteArrayContent("application/octet-stream", byteArrayOutputStream.toByteArray())
val uploadedFile = drive.files().create(fileMetadata, byteArrayContent)
.setFields("id")
.execute()
logcat(LogPriority.DEBUG) { "Created sync data file in Google Drive with file ID: ${uploadedFile.id}" }
try {
if (fileList.isNotEmpty()) {
// File exists, so update it
val fileId = fileList[0].id
drive.files().update(fileId, null, byteArrayContent).execute()
logcat(LogPriority.DEBUG) { "Updated existing sync data file in Google Drive with file ID: $fileId" }
} else {
// File doesn't exist, so create it
val fileMetadata = File().apply {
name = remoteFileName
mimeType = "application/gzip"
}
val uploadedFile = drive.files().create(fileMetadata, byteArrayContent)
.setFields("id")
.execute()
logcat(LogPriority.DEBUG) { "Created new sync data file in Google Drive with file ID: ${uploadedFile.id}" }
}
// Data has been successfully pushed or updated, delete the lock file
deleteLockFile(drive)
} catch (e: Exception) {
logcat(LogPriority.ERROR) { "Failed to push or update sync data: ${e.message}" }
}
}
private fun getFileList(drive: Drive): MutableList<File> {
// Search for the existing file by name
val query = "mimeType='application/gzip' and trashed = false and name = '$remoteFileName'"
val fileList = drive.files().list().setQ(query).execute().files
Log.d("GoogleDrive", "File list: $fileList")
try {
// Search for the existing file by name
val query = "mimeType='application/gzip' and trashed = false and name = '$remoteFileName'"
val fileList = drive.files().list().setQ(query).execute().files
Log.d("GoogleDrive", "File list: $fileList")
return fileList
return fileList
} catch (e: Exception) {
Log.e("GoogleDrive", "Error no sync data found: ${e.message}")
return mutableListOf()
}
}
private fun createLockFile(drive: Drive) {
try {
val fileMetadata = File()
fileMetadata.name = lockFileName
fileMetadata.mimeType = "text/plain"
// Create an empty content to upload as the lock file
val emptyContent = ByteArrayContent.fromString("text/plain", "")
val file = drive.files().create(fileMetadata, emptyContent)
.setFields("id, name, createdTime")
.execute()
Log.d("GoogleDrive", "Created lock file with ID: ${file.id}")
} catch (e: Exception) {
Log.e("GoogleDrive", "Error creating lock file: ${e.message}")
}
}
private fun findLockFile(drive: Drive): MutableList<File> {
try {
val query = "mimeType='text/plain' and trashed = false and name = '$lockFileName'"
val fileList = drive.files().list().setQ(query).setFields("files(id, name, createdTime)").execute().files
Log.d("GoogleDrive", "Lock file search result: $fileList")
return fileList
} catch (e: Exception) {
Log.e("GoogleDrive", "Error finding lock file: ${e.message}")
return mutableListOf()
}
}
private fun deleteLockFile(drive: Drive) {
try {
val lockFiles = findLockFile(drive)
if (lockFiles.isNotEmpty()) {
for (file in lockFiles) {
drive.files().delete(file.id).execute()
Log.d("GoogleDrive", "Deleted lock file with ID: ${file.id}")
}
} else {
Log.d("GoogleDrive", "No lock file found to delete.")
}
} catch (e: Exception) {
Log.e("GoogleDrive", "Error deleting lock file: ${e.message}")
throw Exception(context.getString(R.string.error_deleting_google_drive_lock_file))
}
}
suspend fun deleteSyncDataFromGoogleDrive(): DeleteSyncDataStatus {
val drive = googleDriveService.googleDriveService
val drive = googleDriveService.driveService
if (drive == null) {
logcat(LogPriority.ERROR) { "Google Drive service not initialized" }
@ -160,7 +258,7 @@ class GoogleDriveSyncService(context: Context, json: Json, syncPreferences: Sync
}
class GoogleDriveService(private val context: Context) {
var googleDriveService: Drive? = null
var driveService: Drive? = null
companion object {
const val REDIRECT_URI = "eu.kanade.google.oauth:/oauth2redirect"
}
@ -179,7 +277,7 @@ class GoogleDriveService(private val context: Context) {
val refreshToken = syncPreferences.googleDriveRefreshToken().get()
if (accessToken == "" || refreshToken == "") {
googleDriveService = null
driveService = null
return
}
@ -296,7 +394,7 @@ class GoogleDriveService(private val context: Context) {
credential.accessToken = accessToken
credential.refreshToken = refreshToken
googleDriveService = Drive.Builder(
driveService = Drive.Builder(
NetHttpTransport(),
jsonFactory,
credential,

View File

@ -577,6 +577,7 @@
<string name="google_drive_login_success">Logged in to Google Drive</string>
<string name="google_drive_login_failed">Failed to log in to Google Drive: %s</string>
<string name="google_drive_not_signed_in">Not signed in to Google Drive</string>
<string name="error_deleting_google_drive_lock_file">Error Deleting Google Drive Lock File</string>
<string name="pref_purge_confirmation_title">Purge confirmation</string>
<string name="pref_purge_confirmation_message">Purging sync data will delete all your sync data from Google Drive. Are you sure you want to continue?</string>