nashcft's blog

時々何か書く。

Jetpack DataStore で保存する値はキャッシュされている

タイトルの通りで、実用性を考えたらそれはそうでしょというものなんだけど、確認のために実装を読んだのでそのメモ。というのをしばらく放置していたら実装がガッと書き換えられてしまったので書き換え以前 (= 1.1.0-alpha01 時点) と書き換え後両方の実装を見ていくことにする。

1.1.0-alpha01 時点の実装

1.1.0-alpha01 時点では DataStore の実装 class は single-process 向けと multi-process 向けの2種類ある:

このそれぞれが downstreamFlow という StateFlow を持っていて、ここに保存しているデータを保持している。

@Suppress("UNCHECKED_CAST")
private val downstreamFlow = MutableStateFlow(UnInitialized as State<T>)

Read 側に当たるDataStore.data はこの downstreamFlow の状態がデータ保持済みでなかったり整合性が取れていなかったりする場合にストレージアクセスの依頼を行い、あとは downstreamFlow の中身を見続けるというようになっている。

e.g.) SingleProcessDataStore:

override val data: Flow<T> = flow {
    /**
     ...
     */

    val currentDownStreamFlowState = downstreamFlow.value

    if (currentDownStreamFlowState !is Data) {
        // We need to send a read request because we don't have data yet.
        actor.offer(Message.Read(currentDownStreamFlowState))
    }

    emitAll(
        downstreamFlow.dropWhile {
            // ...
        }.map {
            when (it) {
                is ReadException<T> -> throw it.readException
                is Final<T> -> throw it.finalException
                is Data<T> -> it.value
                is UnInitialized -> error(
                    "..."
                )
            }
        }
    )
}

Write 側の DataStore#updateData を追っていくと最終的にはストレージに新しい値を書き込んだ後にその値で downstreamFlow を更新している。 ストレージアクセスは actor を介して行われるので追うのが少し手間だが、 SingleProcessDataStore では transformAndWrite, MultiProcessDataStore では writeData が実際に downstreamFlow を更新している関数となる。

SingleProcessDataStore:

// downstreamFlow.value must be successfully set to data before calling this
private suspend fun transformAndWrite(
    transform: suspend (t: T) -> T,
    callerContext: CoroutineContext
): T {
    // ...
    val curDataAndHash = downstreamFlow.value as Data<T>
    curDataAndHash.checkHashCode()

    val curData = curDataAndHash.value
    val newData = withContext(callerContext) { transform(curData) }

    // Check that curData has not changed...
    curDataAndHash.checkHashCode()

    return if (curData == newData) {
        curData
    } else {
        connection.writeData(newData)
        downstreamFlow.value = Data(newData, newData.hashCode())
        newData
    }
}

MultiProcessDataStore:

// Write data to disk and return the corresponding version if succeed.
internal suspend fun writeData(newData: T, updateCache: Boolean = true): Int {
    var newVersion: Int = 0

    // ...
    storageConnection.writeScope {
        // ...
        newVersion = sharedCounter.incrementAndGetValue()
        writeData(newData)
        if (updateCache) {
            downstreamFlow.value = Data(newData, newData.hashCode(), newVersion)
        }
    }

    return newVersion
}

書き換え以降の実装

注意: 本節の内容はリリースされていない開発中のコードに関するものなので、今後の変更によってなかったことになる可能性がある

2月の下旬ごろに DataStore の内部実装の大きな変更が行われた。それによって DataStore の実装 class は1つ (DataStoreImpl)に統一され、 single-process / multi-process の切り替えは新しく作られた interface InterProcessCoordinator の実装で行われるようになった。また、キャッシュ部分は DataStore の実装 class が持つ StateFlow だったのが DataStoreInMemoryCache として分離された。

変更に関する主な CL:

ということで、この変更後の状態でキャッシュにの仕組みについて追いかける時に見るのは以下の class になる:

DataStoreImplMultiProcessDataStore の実装をベースに細かい変更が加えられたものというのが現在の状態で、キャッシュ関連は保持している値の取得や更新が downstreamFlow から inMemoryCache に変わった他は流れを追う分には変わりなく読めると思う。

DataStoreInMemoryCache は、以前の downstreamFlow に相当する StateFlow の保持と値の更新に関するちょっとした処理を持った helper class で、 multi-process のためのケアを追加することでコードが大きくなったので切り出したみたいな雰囲気。

おわりに

というわけで DataStore の内部実装で値のキャッシュを持っており DataStore.data を 動かす度にストレージまで読みに行くわけではないので、アプリ側でキャッシュする仕組みを持つ必要はないことがわかった。冒頭にも書いたけどそれはそうという感想しか出てこなくて締まらないので関係ない話をすると DataStore のリリースは随分されていないのでそろそろ何か出してほしいですねという気持ちがある。あと保存データを暗号化する機能を組み込んでほしい。