lib: improve DataStore API

This commit is contained in:
Oscar Mira 2025-04-21 21:02:50 +02:00
parent e9d08f95d1
commit d2dd6171fe
No known key found for this signature in database
GPG key ID: B371B98C5DC32237
14 changed files with 233 additions and 189 deletions

View file

@ -25,7 +25,7 @@ class MoneroSdkClient(private val context: Context) {
network = network,
dataStore = WalletDataStoreFile(filename, newFile = true),
).also { wallet ->
wallet.commit()
wallet.save()
}
}
@ -42,7 +42,7 @@ class MoneroSdkClient(private val context: Context) {
secretSpendKey = secretSpendKey,
restorePoint = restorePoint,
).also { wallet ->
wallet.commit()
wallet.save()
}
}
@ -87,7 +87,7 @@ class MoneroSdkClient(private val context: Context) {
throw IOException("Cannot create wallet data directory: ${walletDataDir.path}")
}
override suspend fun write(writer: (OutputStream) -> Unit) {
override suspend fun save(writer: (OutputStream) -> Unit, overwrite: Boolean) {
val output = file.startWrite()
try {
writer(output)
@ -98,7 +98,7 @@ class MoneroSdkClient(private val context: Context) {
}
}
override suspend fun read(): InputStream {
override suspend fun load(): InputStream {
return file.openRead()
}
}

View file

@ -38,7 +38,7 @@ class SyncService(
syncedWalletIds.map {
walletRepository.getWallet(it)
}.forEach { wallet ->
wallet.commit()
wallet.save()
}
}
delay(60.seconds)
@ -56,7 +56,7 @@ class SyncService(
if (result.isError()) {
// TODO: Handle non-recoverable errors
}
wallet.commit()
wallet.save()
delay(10.seconds)
}
}

View file

@ -1,6 +0,0 @@
package im.molly.monero;
interface IStorageAdapter {
boolean writeAsync(in ParcelFileDescriptor pfd);
oneway void readAsync(in ParcelFileDescriptor pfd);
}

View file

@ -18,7 +18,7 @@ interface IWallet {
oneway void resumeRefresh(boolean skipCoinbase, in IWalletCallbacks callback);
oneway void cancelRefresh();
oneway void setRefreshSince(long heightOrTimestamp);
oneway void commit(in IWalletCallbacks callback);
oneway void commit(in ParcelFileDescriptor outputFd, in IWalletCallbacks callback);
oneway void createPayment(in PaymentRequest request, in ITransferCallback callback);
oneway void createSweep(in SweepRequest request, in ITransferCallback callback);
oneway void requestFees(in IWalletCallbacks callback);

View file

@ -1,6 +1,5 @@
package im.molly.monero.internal;
import im.molly.monero.IStorageAdapter;
import im.molly.monero.SecretKey;
import im.molly.monero.WalletConfig;
import im.molly.monero.internal.IHttpRpcClient;
@ -8,8 +7,8 @@ import im.molly.monero.internal.IWalletServiceCallbacks;
import im.molly.monero.internal.IWalletServiceListener;
interface IWalletService {
oneway void createWallet(in WalletConfig config, in IStorageAdapter storage, in IHttpRpcClient rpcClient, in IWalletServiceCallbacks callback);
oneway void restoreWallet(in WalletConfig config, in IStorageAdapter storage, in IHttpRpcClient rpcClient, in IWalletServiceCallbacks callback, in SecretKey spendSecretKey, long restorePoint);
oneway void openWallet(in WalletConfig config, in IStorageAdapter storage, in IHttpRpcClient rpcClient, in IWalletServiceCallbacks callback);
oneway void createWallet(in WalletConfig config, in IHttpRpcClient rpcClient, in IWalletServiceCallbacks callback);
oneway void restoreWallet(in WalletConfig config, in IHttpRpcClient rpcClient, in IWalletServiceCallbacks callback, in SecretKey spendSecretKey, long restorePoint);
oneway void openWallet(in WalletConfig config, in IHttpRpcClient rpcClient, in IWalletServiceCallbacks callback, in ParcelFileDescriptor inputFd);
void setListener(in IWalletServiceListener listener);
}

View file

@ -1,7 +1,10 @@
package im.molly.monero
import im.molly.monero.exceptions.InternalRuntimeException
import im.molly.monero.exceptions.NoSuchAccountException
import im.molly.monero.internal.LedgerFactory
import im.molly.monero.internal.NativeWallet
import im.molly.monero.internal.DataStoreAdapter
import im.molly.monero.internal.TxInfo
import kotlinx.coroutines.ExperimentalCoroutinesApi
import kotlinx.coroutines.channels.awaitClose
@ -13,24 +16,24 @@ import kotlinx.coroutines.flow.conflate
import kotlinx.coroutines.flow.first
import kotlinx.coroutines.flow.flow
import kotlinx.coroutines.suspendCancellableCoroutine
import kotlin.coroutines.resume
import kotlin.coroutines.resumeWithException
import kotlin.coroutines.suspendCoroutine
import kotlin.time.Duration.Companion.seconds
@OptIn(ExperimentalCoroutinesApi::class)
class MoneroWallet internal constructor(
private val wallet: IWallet,
private val storageAdapter: StorageAdapter,
private val defaultStore: DataStoreAdapter?,
val moneroNodeClient: MoneroNodeClient?,
) : AutoCloseable {
private val logger = loggerFor<MoneroWallet>()
val publicAddress: PublicAddress = PublicAddress.parse(wallet.publicAddress)
val network: MoneroNetwork
get() = publicAddress.network
var dataStore by storageAdapter::dataStore
private val logger = loggerFor<MoneroWallet>()
// suspend fun addDetachedSubAddress(accountIndex: Int, subAddressIndex: Int): AccountAddress =
// suspendCancellableCoroutine { continuation ->
@ -172,12 +175,37 @@ class MoneroWallet internal constructor(
continuation.invokeOnCancellation { wallet.cancelRefresh() }
}
suspend fun commit(): Boolean = suspendCancellableCoroutine { continuation ->
wallet.commit(object : BaseWalletCallbacks() {
override fun onCommitResult(success: Boolean) {
continuation.resume(success) {}
suspend fun save() =
saveToDataStore(
adapter = defaultStore ?: error("No dataStore associated with this wallet"),
overwrite = true,
)
suspend fun save(targetStore: WalletDataStore, overwrite: Boolean = false) =
saveToDataStore(
adapter = DataStoreAdapter(targetStore),
overwrite = overwrite,
)
private suspend fun saveToDataStore(adapter: DataStoreAdapter, overwrite: Boolean) {
adapter.saveWithFd(overwrite) { fd ->
suspendCoroutine { continuation ->
val callback = object : BaseWalletCallbacks() {
override fun onCommitResult(success: Boolean) {
if (success) {
continuation.resume(Unit)
} else {
continuation.resumeWithException(
InternalRuntimeException(
"Serialization error: Wallet data could not be saved"
)
)
}
}
}
wallet.commit(fd, callback)
}
})
}
}
suspend fun createTransfer(transferRequest: TransferRequest): PendingTransfer =
@ -239,11 +267,6 @@ class MoneroWallet internal constructor(
override fun close() = wallet.close()
}
class NoSuchAccountException(private val accountIndex: Int) : NoSuchElementException() {
override val message: String
get() = "No account was found with the specified index: $accountIndex"
}
private abstract class BaseWalletCallbacks : IWalletCallbacks.Stub() {
override fun onRefreshResult(blockchainTime: BlockchainTime, status: Int) = Unit

View file

@ -1,47 +0,0 @@
package im.molly.monero
import android.os.ParcelFileDescriptor
import kotlinx.coroutines.CoroutineScope
import kotlinx.coroutines.Dispatchers
import kotlinx.coroutines.SupervisorJob
import kotlinx.coroutines.launch
import kotlinx.coroutines.sync.Mutex
import kotlinx.coroutines.sync.withLock
internal class StorageAdapter(var dataStore: WalletDataStore?) : IStorageAdapter.Stub() {
private val ioStorageScope = CoroutineScope(Dispatchers.IO + SupervisorJob())
private val mutex = Mutex()
override fun writeAsync(pfd: ParcelFileDescriptor): Boolean {
val localDataStore = dataStore
return if (localDataStore != null) {
val inputStream = ParcelFileDescriptor.AutoCloseInputStream(pfd)
ioStorageScope.launch {
mutex.withLock {
localDataStore.write { output ->
inputStream.copyTo(output)
}
}
}.invokeOnCompletion { inputStream.close() }
true
} else {
pfd.close()
false
}
}
override fun readAsync(pfd: ParcelFileDescriptor) {
val outputStream = ParcelFileDescriptor.AutoCloseOutputStream(pfd)
ioStorageScope.launch {
val localDataStore =
dataStore ?: throw IllegalArgumentException("WalletDataStore cannot be null")
mutex.withLock {
localDataStore.read().use { input ->
input.copyTo(outputStream)
}
}
}.invokeOnCompletion { outputStream.close() }
}
}

View file

@ -2,23 +2,38 @@ package im.molly.monero
import java.io.ByteArrayInputStream
import java.io.ByteArrayOutputStream
import java.io.IOException
import java.io.InputStream
import java.io.OutputStream
interface WalletDataStore {
suspend fun write(writer: (OutputStream) -> Unit)
suspend fun read(): InputStream
@Throws(IOException::class)
suspend fun load(): InputStream
@Throws(IOException::class)
suspend fun save(writer: (OutputStream) -> Unit, overwrite: Boolean)
}
class InMemoryWalletDataStore : WalletDataStore {
class InMemoryWalletDataStore() : WalletDataStore {
private val data = ByteArrayOutputStream()
override suspend fun write(writer: (OutputStream) -> Unit) {
constructor(byteArray: ByteArray) : this() {
data.write(byteArray)
}
override suspend fun load(): InputStream {
return ByteArrayInputStream(data.toByteArray())
}
override suspend fun save(writer: (OutputStream) -> Unit, overwrite: Boolean) {
check(overwrite || data.size() == 0) { "Wallet data already exists" }
data.reset()
writer(data)
}
override suspend fun read(): InputStream {
return ByteArrayInputStream(data.toByteArray())
fun toByteArray(): ByteArray {
return data.toByteArray()
}
}
fun InMemoryWalletDataStore.copy() = InMemoryWalletDataStore(this.toByteArray())

View file

@ -0,0 +1,9 @@
package im.molly.monero.exceptions
class InternalRuntimeException(message: String, cause: Throwable? = null) : RuntimeException(
buildString {
append(message.trimEnd('.'))
append(". This is likely a bug; please report the issue to the Monero SDK team on GitHub.")
},
cause
)

View file

@ -0,0 +1,6 @@
package im.molly.monero.exceptions
class NoSuchAccountException(private val accountIndex: Int) : NoSuchElementException() {
override val message: String
get() = "No account was found with the specified index: $accountIndex"
}

View file

@ -0,0 +1,94 @@
package im.molly.monero.internal
import android.os.ParcelFileDescriptor
import im.molly.monero.WalletDataStore
import im.molly.monero.loggerFor
import kotlinx.coroutines.CoroutineDispatcher
import kotlinx.coroutines.Dispatchers
import kotlinx.coroutines.launch
import kotlinx.coroutines.sync.Mutex
import kotlinx.coroutines.sync.withLock
import kotlinx.coroutines.withContext
import java.io.FileInputStream
import java.io.FileOutputStream
import java.io.InputStream
import java.io.OutputStream
internal class DataStoreAdapter(
private val dataStore: WalletDataStore,
private val ioDispatcher: CoroutineDispatcher = Dispatchers.IO,
) {
private val mutex = Mutex()
private val storeName: String? = dataStore::class.simpleName
private val logger = loggerFor<DataStoreAdapter>()
suspend fun <T> loadWithFd(
block: suspend (ParcelFileDescriptor) -> T,
): T = withContext(ioDispatcher) {
val (readFd, writeFd) = ParcelFileDescriptor.createPipe()
val writerJob = launch {
FileOutputStream(writeFd.fileDescriptor).use { output ->
load(output)
}
}
writerJob.invokeOnCompletion {
writeFd.close()
}
try {
readFd.use { block(readFd) }
} finally {
writerJob.join()
}
}
suspend fun <T> saveWithFd(
overwrite: Boolean,
block: suspend (ParcelFileDescriptor) -> T,
): T = withContext(ioDispatcher) {
val (readFd, writeFd) = ParcelFileDescriptor.createPipe()
val readerJob = launch {
FileInputStream(readFd.fileDescriptor).use { input ->
save(input, overwrite)
}
}
readerJob.invokeOnCompletion {
readFd.close()
}
try {
writeFd.use { block(writeFd) }
} finally {
readerJob.join()
}
}
private suspend fun load(output: OutputStream) {
try {
mutex.withLock {
dataStore.load().use { input -> input.copyTo(output) }
}
} catch (t: Throwable) {
logger.e("Error loading data from WalletDataStore ($storeName)", t)
throw t
}
}
private suspend fun save(input: InputStream, overwrite: Boolean) {
try {
mutex.withLock {
dataStore.save(
writer = { output -> input.copyTo(output) },
overwrite = overwrite,
)
}
} catch (t: Throwable) {
logger.e("Error saving data to WalletDataStore ($storeName)", t)
throw t
}
}
}

View file

@ -6,17 +6,14 @@ import im.molly.monero.BlockchainTime
import im.molly.monero.CalledByNative
import im.molly.monero.IBalanceListener
import im.molly.monero.IPendingTransfer
import im.molly.monero.IStorageAdapter
import im.molly.monero.ITransferCallback
import im.molly.monero.IWallet
import im.molly.monero.IWalletCallbacks
import im.molly.monero.InMemoryWalletDataStore
import im.molly.monero.Ledger
import im.molly.monero.MoneroNetwork
import im.molly.monero.NativeLoader
import im.molly.monero.PaymentRequest
import im.molly.monero.SecretKey
import im.molly.monero.StorageAdapter
import im.molly.monero.SweepRequest
import im.molly.monero.WalletAccount
import im.molly.monero.estimateTimestamp
@ -33,42 +30,45 @@ import kotlin.coroutines.CoroutineContext
internal class NativeWallet private constructor(
private val network: MoneroNetwork,
private val storageAdapter: IStorageAdapter,
private val rpcClient: IHttpRpcClient?,
private val scope: CoroutineScope,
private val ioDispatcher: CoroutineDispatcher,
) : IWallet.Stub(), Closeable {
companion object {
suspend fun localSyncWallet(
fun localSyncWallet(
networkId: Int,
storageAdapter: IStorageAdapter = StorageAdapter(InMemoryWalletDataStore()),
rpcClient: IHttpRpcClient? = null,
walletDataFd: ParcelFileDescriptor? = null,
secretSpendKey: SecretKey? = null,
restorePoint: Long? = null,
coroutineContext: CoroutineContext = Dispatchers.Default + SupervisorJob(),
ioDispatcher: CoroutineDispatcher = Dispatchers.IO,
) = NativeWallet(
network = MoneroNetwork.fromId(networkId),
storageAdapter = storageAdapter,
rpcClient = rpcClient,
scope = CoroutineScope(coroutineContext),
ioDispatcher = ioDispatcher,
).apply {
when {
secretSpendKey != null -> {
require(restorePoint == null || restorePoint >= 0)
val restorePointOrNow = restorePoint ?: (System.currentTimeMillis() / 1000)
nativeRestoreAccount(handle, secretSpendKey.bytes, restorePointOrNow)
tryWriteState()
}
else -> {
require(restorePoint == null)
readState()
}
if (secretSpendKey != null) {
require(restorePoint == null || restorePoint >= 0)
val restorePointOrNow = restorePoint ?: (System.currentTimeMillis() / 1000)
restoreFromKey(secretSpendKey, restorePointOrNow)
} else {
require(restorePoint == null)
requireNotNull(walletDataFd)
restoreFromStorage(walletDataFd)
}
}
private fun NativeWallet.restoreFromKey(secretSpendKey: SecretKey, restorePoint: Long) {
nativeRestoreAccount(handle, secretSpendKey.bytes, restorePoint)
}
private fun NativeWallet.restoreFromStorage(walletDataFd: ParcelFileDescriptor) {
val loaded = nativeLoad(handle, walletDataFd.fd)
check(loaded)
}
}
private val logger = loggerFor<NativeWallet>()
@ -79,47 +79,6 @@ internal class NativeWallet private constructor(
private val handle: Long = nativeCreate(network.id)
private suspend fun tryWriteState(): Boolean {
return withContext(ioDispatcher) {
val pipe = ParcelFileDescriptor.createPipe()
val readFd = pipe[0]
val writeFd = pipe[1]
val storageIsReady = storageAdapter.writeAsync(readFd)
if (storageAdapter.isRemote()) {
readFd.close()
}
writeFd.use {
if (storageIsReady) {
val result = nativeSave(handle, it.fd)
if (!result) {
logger.e("Wallet data serialization failed")
}
result
} else {
logger.w("Unable to save wallet data because dataStore is unset")
false
}
}
}
}
private suspend fun readState() {
withContext(ioDispatcher) {
val pipe = ParcelFileDescriptor.createPipe()
val readFd = pipe[0]
val writeFd = pipe[1]
storageAdapter.readAsync(writeFd)
if (storageAdapter.isRemote()) {
writeFd.close()
}
readFd.use {
if (!nativeLoad(handle, it.fd)) {
error("Wallet data deserialization failed")
}
}
}
}
override fun getPublicAddress(): String = nativeGetPublicAddress(handle)
fun getCurrentBlockchainTime(): BlockchainTime {
@ -196,10 +155,12 @@ internal class NativeWallet private constructor(
}
}
override fun commit(callback: IWalletCallbacks?) {
override fun commit(outputFd: ParcelFileDescriptor, callback: IWalletCallbacks?) {
scope.launch(ioDispatcher) {
val result = tryWriteState()
callback?.onCommitResult(result)
val saved = nativeSave(handle, outputFd.fd)
callback?.onCommitResult(saved)
}.invokeOnCompletion {
outputFd.close()
}
}
@ -344,7 +305,7 @@ internal class NativeWallet private constructor(
return
}
val batchSize = getMaxIpcSize() / TxInfo.MAX_PARCEL_SIZE_BYTES
val batchSize = getMaxIpcSize() / TxInfo.MAX_PARCEL_SIZE_BYTES
val chunkedSeq = txList.asSequence().chunked(batchSize).iterator()
while (chunkedSeq.hasNext()) {

View file

@ -1,6 +1,6 @@
package im.molly.monero.internal
import im.molly.monero.IStorageAdapter
import android.os.ParcelFileDescriptor
import im.molly.monero.IWallet
import im.molly.monero.LogAdapter
import im.molly.monero.NativeLoader
@ -10,7 +10,6 @@ import im.molly.monero.loggerFor
import im.molly.monero.randomSecretKey
import im.molly.monero.setLoggingAdapter
import kotlinx.coroutines.CoroutineScope
import kotlinx.coroutines.launch
internal class NativeWalletService(
private val serviceScope: CoroutineScope,
@ -38,62 +37,54 @@ internal class NativeWalletService(
override fun createWallet(
config: WalletConfig,
storage: IStorageAdapter,
rpcClient: IHttpRpcClient?,
callback: IWalletServiceCallbacks?,
) {
serviceScope.launch {
val secretSpendKey = randomSecretKey()
val wallet = secretSpendKey.use { secret ->
createOrRestoreWallet(config, storage, rpcClient, secret)
}
callback?.onWalletResult(wallet)
val secretSpendKey = randomSecretKey()
val wallet = secretSpendKey.use { secret ->
createOrRestoreWallet(config, rpcClient, secret)
}
callback?.onWalletResult(wallet)
}
override fun restoreWallet(
config: WalletConfig,
storage: IStorageAdapter,
rpcClient: IHttpRpcClient?,
callback: IWalletServiceCallbacks?,
secretSpendKey: SecretKey,
restorePoint: Long,
) {
serviceScope.launch {
val wallet = secretSpendKey.use { secret ->
createOrRestoreWallet(config, storage, rpcClient, secret, restorePoint)
}
callback?.onWalletResult(wallet)
val wallet = secretSpendKey.use { secret ->
createOrRestoreWallet(config, rpcClient, secret, restorePoint)
}
callback?.onWalletResult(wallet)
}
override fun openWallet(
config: WalletConfig,
storage: IStorageAdapter,
rpcClient: IHttpRpcClient?,
callback: IWalletServiceCallbacks?,
inputFd: ParcelFileDescriptor,
) {
serviceScope.launch {
val wallet = NativeWallet.localSyncWallet(
val wallet = inputFd.use {
NativeWallet.localSyncWallet(
networkId = config.networkId,
storageAdapter = storage,
rpcClient = rpcClient,
walletDataFd = inputFd,
coroutineContext = serviceScope.coroutineContext,
)
callback?.onWalletResult(wallet)
}
callback?.onWalletResult(wallet)
}
private suspend fun createOrRestoreWallet(
private fun createOrRestoreWallet(
config: WalletConfig,
storage: IStorageAdapter,
rpcClient: IHttpRpcClient?,
secretSpendKey: SecretKey,
restorePoint: Long? = null,
): IWallet {
return NativeWallet.localSyncWallet(
networkId = config.networkId,
storageAdapter = storage,
rpcClient = rpcClient,
secretSpendKey = secretSpendKey,
restorePoint = restorePoint,

View file

@ -13,7 +13,6 @@ import im.molly.monero.MoneroNodeClient
import im.molly.monero.MoneroWallet
import im.molly.monero.RestorePoint
import im.molly.monero.SecretKey
import im.molly.monero.StorageAdapter
import im.molly.monero.WalletConfig
import im.molly.monero.WalletDataStore
import im.molly.monero.WalletProvider
@ -33,7 +32,7 @@ internal class WalletServiceClient(
companion object {
suspend fun bindService(
context: Context,
serviceClass: Class<out BaseWalletService>
serviceClass: Class<out BaseWalletService>,
): WalletServiceClient {
val (serviceConnection, service) = bindServiceAwait(context, serviceClass)
return WalletServiceClient(context, service, serviceConnection)
@ -79,15 +78,14 @@ internal class WalletServiceClient(
client: MoneroNodeClient?,
): MoneroWallet {
validateClientNetwork(client, network)
val storageAdapter = StorageAdapter(dataStore)
val wallet = suspendCancellableCoroutine { continuation ->
service.createWallet(
buildConfig(network), storageAdapter, client?.httpRpcClient,
buildConfig(network), client?.httpRpcClient,
WalletResultCallback(continuation),
)
}
return MoneroWallet(wallet, storageAdapter, client)
val storeAdapter = dataStore?.let { DataStoreAdapter(it) }
return MoneroWallet(wallet, storeAdapter, client)
}
override suspend fun restoreWallet(
@ -99,17 +97,16 @@ internal class WalletServiceClient(
): MoneroWallet {
validateClientNetwork(client, network)
validateRestorePoint(restorePoint, network)
val storageAdapter = StorageAdapter(dataStore)
val wallet = suspendCancellableCoroutine { continuation ->
service.restoreWallet(
buildConfig(network), storageAdapter, client?.httpRpcClient,
buildConfig(network), client?.httpRpcClient,
WalletResultCallback(continuation),
secretSpendKey,
restorePoint.toLong(),
)
}
return MoneroWallet(wallet, storageAdapter, client)
val storeAdapter = dataStore?.let { DataStoreAdapter(it) }
return MoneroWallet(wallet, storeAdapter, client)
}
override suspend fun openWallet(
@ -118,15 +115,17 @@ internal class WalletServiceClient(
client: MoneroNodeClient?,
): MoneroWallet {
validateClientNetwork(client, network)
val storageAdapter = StorageAdapter(dataStore)
val wallet = suspendCancellableCoroutine { continuation ->
service.openWallet(
buildConfig(network), storageAdapter, client?.httpRpcClient,
WalletResultCallback(continuation),
)
val storeAdapter = DataStoreAdapter(dataStore)
return storeAdapter.loadWithFd { fd ->
val wallet = suspendCancellableCoroutine { continuation ->
service.openWallet(
buildConfig(network), client?.httpRpcClient,
WalletResultCallback(continuation),
fd,
)
}
MoneroWallet(wallet, storeAdapter, client)
}
return MoneroWallet(wallet, storageAdapter, client)
}
private fun buildConfig(network: MoneroNetwork): WalletConfig {