From 1a8843aa85b4bb075eff3e7ccc81f05fe477c008 Mon Sep 17 00:00:00 2001 From: Jun Yan Date: Tue, 21 Nov 2023 22:35:33 -0800 Subject: [PATCH 1/3] Add InMemoryStore and simplify concurrency --- Sources/SwiftFileStore/FileObjectStore.swift | 44 +++++------- .../SwiftFileStore/MemoryObjectStore.swift | 64 +++++++++++++++++ Sources/SwiftFileStore/ObserverManager.swift | 26 +++---- .../FileObjectStoreTests.swift | 72 +++++++++++++++++++ .../MemoryObjectStoreTests.swift | 62 ++++++++++++++++ .../SwiftFileStoreTests.swift | 7 -- Tests/SwiftFileStoreTests/TestObject.swift | 13 ++++ 7 files changed, 240 insertions(+), 48 deletions(-) create mode 100644 Sources/SwiftFileStore/MemoryObjectStore.swift create mode 100644 Tests/SwiftFileStoreTests/FileObjectStoreTests.swift create mode 100644 Tests/SwiftFileStoreTests/MemoryObjectStoreTests.swift delete mode 100644 Tests/SwiftFileStoreTests/SwiftFileStoreTests.swift create mode 100644 Tests/SwiftFileStoreTests/TestObject.swift diff --git a/Sources/SwiftFileStore/FileObjectStore.swift b/Sources/SwiftFileStore/FileObjectStore.swift index b9a6759..fcc7c15 100644 --- a/Sources/SwiftFileStore/FileObjectStore.swift +++ b/Sources/SwiftFileStore/FileObjectStore.swift @@ -28,47 +28,41 @@ public final class FileObjectStore: ObjectStore { } public func read(key: String, namespace: String, objectType _: T.Type) async throws -> T? where T: DataRepresentable { - let readTask = Task { () -> T? in - let fileURL = rootDir.appendingPathComponent(namespace).appendingPathComponent(key) - if FileManager.default.fileExists(atPath: fileURL.path) { - return try T.from(data: Data(contentsOf: fileURL)) - } else { - return nil - } + let fileURL = rootDir.appendingPathComponent(namespace).appendingPathComponent(key) + if FileManager.default.fileExists(atPath: fileURL.path) { + return try T.from(data: Data(contentsOf: fileURL)) + } else { + return nil } - return try await readTask.value } public func write(key: String, namespace: String, object: T) async throws where T: DataRepresentable { - let writeTask = Task { () in - let dirURL = rootDir.appendingPathComponent(namespace) - let fileURL = dirURL.appendingPathComponent(key) - try FileManager.default.createDirIfNotExist(url: dirURL) - try object.serialize().write(to: fileURL) + let dirURL = rootDir.appendingPathComponent(namespace) + let fileURL = dirURL.appendingPathComponent(key) + try FileManager.default.createDirIfNotExist(url: dirURL) + try object.serialize().write(to: fileURL) + Task { await observerManager.publishValue(key: key, namespace: namespace, value: object) } - return try await writeTask.value } public func remove(key: String, namespace: String) async throws { - let removeTask = Task { - let dirURL = rootDir.appendingPathComponent(namespace) - let fileURL = dirURL.appendingPathComponent(key) - if FileManager.default.fileExists(atPath: fileURL.path) { - try FileManager.default.removeItem(at: fileURL) - } + let dirURL = rootDir.appendingPathComponent(namespace) + let fileURL = dirURL.appendingPathComponent(key) + if FileManager.default.fileExists(atPath: fileURL.path) { + try FileManager.default.removeItem(at: fileURL) + } + Task { await observerManager.publishRemoval(namespace: namespace, key: key) } - return try await removeTask.value } public func removeAll(namespace: String) async throws { - let removeAllTask = Task { - let dirURL = rootDir.appendingPathComponent(namespace) - try FileManager.default.removeItem(at: dirURL) + let dirURL = rootDir.appendingPathComponent(namespace) + try FileManager.default.removeItem(at: dirURL) + Task { await observerManager.publishRemoval(namespace: namespace) } - return try await removeAllTask.value } public func observe(key: String, namespace: String, objectType: T.Type) async -> AsyncThrowingStream where T: DataRepresentable { diff --git a/Sources/SwiftFileStore/MemoryObjectStore.swift b/Sources/SwiftFileStore/MemoryObjectStore.swift new file mode 100644 index 0000000..50a18e4 --- /dev/null +++ b/Sources/SwiftFileStore/MemoryObjectStore.swift @@ -0,0 +1,64 @@ +// +// File.swift +// +// +// Created by Jun Yan on 11/21/23. +// + +import Foundation + + +/// A fallback `ObjectStore` in case file operation fails. Can also be used for unit tests. +public actor MemoryObjectStore: ObjectStore { + + private var objects: [String: [String: Data]] = [:] + private let observerManager = ObserverManager() + + + public func read(key: String, namespace: String, objectType: T.Type) async throws -> T? where T : DataRepresentable { + return objects[namespace]?[key].flatMap { try? T.from(data: $0) } + } + + public func write(key: String, namespace: String, object: T) async throws where T : DataRepresentable { + let data = try object.serialize() + objects[namespace, default: [:]][key] = data + await observerManager.publishValue(key: key, namespace: namespace, value: object) + } + + public func remove(key: String, namespace: String) async throws { + objects[namespace, default: [:]][key] = nil + await observerManager.publishRemoval(namespace: namespace, key: key) + } + + public func removeAll(namespace: String) async throws { + objects[namespace] = nil + await observerManager.publishRemoval(namespace: namespace) + } + + public func observe(key: String, namespace: String, objectType: T.Type) async -> AsyncThrowingStream where T : DataRepresentable { + let observer = await observerManager.getObserver(key: key, namespace: namespace) + do { + let existingValue = try await read(key: key, namespace: namespace, objectType: objectType) + return AsyncThrowingStream { continuation in + continuation.yield(existingValue) + let callbackID = UUID().uuidString + observer.registerCallback(id: callbackID) { data in + if let d = data, let typed = d as? T { + continuation.yield(typed) + } else if data == nil { + continuation.yield(nil) + } else { + continuation.finish(throwing: "invalid data type") + } + } + continuation.onTermination = { @Sendable _ in + observer.callbacks.removeValue(forKey: callbackID) + } + } + } catch { + return AsyncThrowingStream { continuation in + continuation.finish(throwing: error) + } + } + } +} diff --git a/Sources/SwiftFileStore/ObserverManager.swift b/Sources/SwiftFileStore/ObserverManager.swift index c76fd00..2f5809c 100644 --- a/Sources/SwiftFileStore/ObserverManager.swift +++ b/Sources/SwiftFileStore/ObserverManager.swift @@ -36,32 +36,26 @@ actor ObserverManager { } func publishValue(key: String, namespace: String, value: T) where T: DataRepresentable { - Task.detached(priority: .background) { - if let observer = await self.observers[namespace]?[key] { - observer.callbacks.values.forEach { callback in - callback(value) - } + if let observer = self.observers[namespace]?[key] { + observer.callbacks.values.forEach { callback in + callback(value) } } } func publishRemoval(namespace: String, key: String) { - Task.detached(priority: .background) { - if let observer = await self.observers[namespace]?[key] { - observer.callbacks.values.forEach { callback in - callback(nil) - } + if let observer = self.observers[namespace]?[key] { + observer.callbacks.values.forEach { callback in + callback(nil) } } } func publishRemoval(namespace: String) { - Task.detached(priority: .background) { - if let namespaceObservers = await self.observers[namespace]?.values { - namespaceObservers.forEach { observer in - observer.callbacks.values.forEach { callback in - callback(nil) - } + if let namespaceObservers = self.observers[namespace]?.values { + namespaceObservers.forEach { observer in + observer.callbacks.values.forEach { callback in + callback(nil) } } } diff --git a/Tests/SwiftFileStoreTests/FileObjectStoreTests.swift b/Tests/SwiftFileStoreTests/FileObjectStoreTests.swift new file mode 100644 index 0000000..ac9508a --- /dev/null +++ b/Tests/SwiftFileStoreTests/FileObjectStoreTests.swift @@ -0,0 +1,72 @@ +import Combine +import XCTest +@testable import SwiftFileStore + +final class FileObjectStoreTests: XCTestCase { + + var store: FileObjectStore! + + override func setUp() { + super.setUp() + store = try! FileObjectStore.create() + } + + override func tearDown() { + super.tearDown() + try! FileManager.default.removeItem(at: store.rootDir) + } + + func test_readWrite() async throws { + let object = TestObject(value: 2) + try await store.write(key: "test", namespace: "test", object: object) + let readResult = try await store.read(key: "test", namespace: "test", objectType: TestObject.self) + XCTAssertEqual(readResult, object) + } + + func test_deletetNamespace() async throws { + let object = TestObject(value: 1) + let object2 = TestObject(value: 2) + try await store.write(key: "test", namespace: "test", object: object) + try await store.write(key: "test2", namespace: "test", object: object2) + try await store.removeAll(namespace: "test") + + let readResult = try await store.read(key: "test", namespace: "test", objectType: TestObject.self) + let readResult2 = try await store.read(key: "test2", namespace: "test", objectType: TestObject.self) + XCTAssertNil(readResult) + XCTAssertNil(readResult2) + } + + func test_deleteObject() async throws { + let object = TestObject(value: 1) + try await store.write(key: "test", namespace: "test", object: object) + let readResult = try await store.read(key: "test", namespace: "test", objectType: TestObject.self) + XCTAssertNotNil(readResult) + try await store.remove(key: "test", namespace: "test") + let readResult2 = try await store.read(key: "test", namespace: "test", objectType: TestObject.self) + XCTAssertNil(readResult2) + } + + func test_observer() async throws { + let object = TestObject(value: 1) + let object2 = TestObject(value: 2) + let expectation = XCTestExpectation(description: "stream subscription") + let expectation2 = XCTestExpectation(description: "stream breaks") + Task { + var values: [TestObject?] = [] + let stream = await store.observe(key: "test", namespace: "test", objectType: TestObject.self) + expectation.fulfill() + for try await value in stream { + values.append(value) + if values.count == 3 { + break + } + } + XCTAssertEqual(values, [nil, object, object2]) + expectation2.fulfill() + } + await fulfillment(of: [expectation], timeout: 1) + try await store.write(key: "test", namespace: "test", object: object) + try await store.write(key: "test", namespace: "test", object: object2) + await fulfillment(of: [expectation2], timeout: 1) + } +} diff --git a/Tests/SwiftFileStoreTests/MemoryObjectStoreTests.swift b/Tests/SwiftFileStoreTests/MemoryObjectStoreTests.swift new file mode 100644 index 0000000..1531cff --- /dev/null +++ b/Tests/SwiftFileStoreTests/MemoryObjectStoreTests.swift @@ -0,0 +1,62 @@ +import Combine +import XCTest +@testable import SwiftFileStore + +final class MemoryObjectStoreTests: XCTestCase { + + let store = MemoryObjectStore() + + func test_readWrite() async throws { + let object = TestObject(value: 2) + try await store.write(key: "test", namespace: "test", object: object) + let readResult = try await store.read(key: "test", namespace: "test", objectType: TestObject.self) + XCTAssertEqual(readResult, object) + } + + func test_deletetNamespace() async throws { + let object = TestObject(value: 1) + let object2 = TestObject(value: 2) + try await store.write(key: "test", namespace: "test", object: object) + try await store.write(key: "test2", namespace: "test", object: object2) + try await store.removeAll(namespace: "test") + + let readResult = try await store.read(key: "test", namespace: "test", objectType: TestObject.self) + let readResult2 = try await store.read(key: "test2", namespace: "test", objectType: TestObject.self) + XCTAssertNil(readResult) + XCTAssertNil(readResult2) + } + + func test_deleteObject() async throws { + let object = TestObject(value: 1) + try await store.write(key: "test", namespace: "test", object: object) + let readResult = try await store.read(key: "test", namespace: "test", objectType: TestObject.self) + XCTAssertNotNil(readResult) + try await store.remove(key: "test", namespace: "test") + let readResult2 = try await store.read(key: "test", namespace: "test", objectType: TestObject.self) + XCTAssertNil(readResult2) + } + + func test_observer() async throws { + let object = TestObject(value: 1) + let object2 = TestObject(value: 2) + let expectation = XCTestExpectation(description: "stream subscription") + let expectation2 = XCTestExpectation(description: "stream breaks") + Task { + var values: [TestObject?] = [] + let stream = await store.observe(key: "test", namespace: "test", objectType: TestObject.self) + expectation.fulfill() + for try await value in stream { + values.append(value) + if values.count == 3 { + break + } + } + XCTAssertEqual(values, [nil, object, object2]) + expectation2.fulfill() + } + await fulfillment(of: [expectation], timeout: 1) + try await store.write(key: "test", namespace: "test", object: object) + try await store.write(key: "test", namespace: "test", object: object2) + await fulfillment(of: [expectation2], timeout: 1) + } +} diff --git a/Tests/SwiftFileStoreTests/SwiftFileStoreTests.swift b/Tests/SwiftFileStoreTests/SwiftFileStoreTests.swift deleted file mode 100644 index fa5752e..0000000 --- a/Tests/SwiftFileStoreTests/SwiftFileStoreTests.swift +++ /dev/null @@ -1,7 +0,0 @@ -import Combine -import XCTest -@testable import SwiftFileStore - -final class SwiftFileStoreTests: XCTestCase { - -} diff --git a/Tests/SwiftFileStoreTests/TestObject.swift b/Tests/SwiftFileStoreTests/TestObject.swift new file mode 100644 index 0000000..b435954 --- /dev/null +++ b/Tests/SwiftFileStoreTests/TestObject.swift @@ -0,0 +1,13 @@ +// +// File.swift +// +// +// Created by Jun Yan on 11/21/23. +// + +import Foundation +@testable import SwiftFileStore + +struct TestObject: Codable, JSONDataRepresentable, Equatable { + let value: Int +} From 63dc5027e3fc077ce27643db1293523dd722a2be Mon Sep 17 00:00:00 2001 From: Jun Yan Date: Tue, 21 Nov 2023 22:44:29 -0800 Subject: [PATCH 2/3] work around macos CI issue --- .../FileObjectStoreTests.swift | 36 +++++++++++++++++++ 1 file changed, 36 insertions(+) diff --git a/Tests/SwiftFileStoreTests/FileObjectStoreTests.swift b/Tests/SwiftFileStoreTests/FileObjectStoreTests.swift index ac9508a..0dbe996 100644 --- a/Tests/SwiftFileStoreTests/FileObjectStoreTests.swift +++ b/Tests/SwiftFileStoreTests/FileObjectStoreTests.swift @@ -70,3 +70,39 @@ final class FileObjectStoreTests: XCTestCase { await fulfillment(of: [expectation2], timeout: 1) } } + + +#if os(Linux) +import XCTest + +extension XCTestCase { + /// Wait on an array of expectations for up to the specified timeout, and optionally specify whether they + /// must be fulfilled in the given order. May return early based on fulfillment of the waited on expectations. + /// + /// - Parameter expectations: The expectations to wait on. + /// - Parameter timeout: The maximum total time duration to wait on all expectations. + /// - Parameter enforceOrder: Specifies whether the expectations must be fulfilled in the order + /// they are specified in the `expectations` Array. Default is false. + /// - Parameter file: The file name to use in the error message if + /// expectations are not fulfilled before the given timeout. Default is the file + /// containing the call to this method. It is rare to provide this + /// parameter when calling this method. + /// - Parameter line: The line number to use in the error message if the + /// expectations are not fulfilled before the given timeout. Default is the line + /// number of the call to this method in the calling file. It is rare to + /// provide this parameter when calling this method. + /// + /// - SeeAlso: XCTWaiter + func fulfillment(of expectations: [XCTestExpectation], timeout: TimeInterval, enforceOrder: Bool = false) async { + return await withCheckedContinuation { continuation in + // This function operates by blocking a background thread instead of one owned by libdispatch or by the + // Swift runtime (as used by Swift concurrency.) To ensure we use a thread owned by neither subsystem, use + // Foundation's Thread.detachNewThread(_:). + Thread.detachNewThread { [self] in + wait(for: expectations, timeout: timeout, enforceOrder: enforceOrder) + continuation.resume() + } + } + } +} +#endif From 6cf54e47342c347f64a8ce34f19ad320aef56127 Mon Sep 17 00:00:00 2001 From: Jun Yan Date: Tue, 21 Nov 2023 22:52:23 -0800 Subject: [PATCH 3/3] fix gh action attempt 2 --- .github/workflows/swift.yml | 5 +-- .../FileObjectStoreTests.swift | 36 ------------------- 2 files changed, 3 insertions(+), 38 deletions(-) diff --git a/.github/workflows/swift.yml b/.github/workflows/swift.yml index cda61a2..96ca259 100644 --- a/.github/workflows/swift.yml +++ b/.github/workflows/swift.yml @@ -11,10 +11,11 @@ on: jobs: build: - runs-on: macos-latest - steps: + - uses: swift-actions/setup-swift@v1 + with: + swift-version: '5.5' - uses: actions/checkout@v3 - name: Build run: swift build -v diff --git a/Tests/SwiftFileStoreTests/FileObjectStoreTests.swift b/Tests/SwiftFileStoreTests/FileObjectStoreTests.swift index 0dbe996..ac9508a 100644 --- a/Tests/SwiftFileStoreTests/FileObjectStoreTests.swift +++ b/Tests/SwiftFileStoreTests/FileObjectStoreTests.swift @@ -70,39 +70,3 @@ final class FileObjectStoreTests: XCTestCase { await fulfillment(of: [expectation2], timeout: 1) } } - - -#if os(Linux) -import XCTest - -extension XCTestCase { - /// Wait on an array of expectations for up to the specified timeout, and optionally specify whether they - /// must be fulfilled in the given order. May return early based on fulfillment of the waited on expectations. - /// - /// - Parameter expectations: The expectations to wait on. - /// - Parameter timeout: The maximum total time duration to wait on all expectations. - /// - Parameter enforceOrder: Specifies whether the expectations must be fulfilled in the order - /// they are specified in the `expectations` Array. Default is false. - /// - Parameter file: The file name to use in the error message if - /// expectations are not fulfilled before the given timeout. Default is the file - /// containing the call to this method. It is rare to provide this - /// parameter when calling this method. - /// - Parameter line: The line number to use in the error message if the - /// expectations are not fulfilled before the given timeout. Default is the line - /// number of the call to this method in the calling file. It is rare to - /// provide this parameter when calling this method. - /// - /// - SeeAlso: XCTWaiter - func fulfillment(of expectations: [XCTestExpectation], timeout: TimeInterval, enforceOrder: Bool = false) async { - return await withCheckedContinuation { continuation in - // This function operates by blocking a background thread instead of one owned by libdispatch or by the - // Swift runtime (as used by Swift concurrency.) To ensure we use a thread owned by neither subsystem, use - // Foundation's Thread.detachNewThread(_:). - Thread.detachNewThread { [self] in - wait(for: expectations, timeout: timeout, enforceOrder: enforceOrder) - continuation.resume() - } - } - } -} -#endif