Moved storage to a separate package

This commit is contained in:
Eduard Urbach 2023-07-10 16:27:41 +02:00
parent 2aa395ed9c
commit 620308fab5
Signed by: akyoto
GPG Key ID: C874F672B1AF20C0
11 changed files with 144 additions and 95 deletions

View File

@ -1,8 +1,6 @@
package ocean_test
import (
"fmt"
"strconv"
"testing"
"git.akyoto.dev/go/assert"
@ -10,7 +8,7 @@ import (
)
func BenchmarkGet(b *testing.B) {
users, err := ocean.New[User]("test")
users, err := ocean.New[User]("test", nil)
assert.Nil(b, err)
defer users.Sync()
@ -23,7 +21,7 @@ func BenchmarkGet(b *testing.B) {
b.RunParallel(func(pb *testing.PB) {
for pb.Next() {
users.Get("1")
_, _ = users.Get("1")
}
})
@ -31,7 +29,7 @@ func BenchmarkGet(b *testing.B) {
}
func BenchmarkSet(b *testing.B) {
users, err := ocean.New[User]("test")
users, err := ocean.New[User]("test", nil)
assert.Nil(b, err)
defer users.Sync()
@ -52,7 +50,7 @@ func BenchmarkSet(b *testing.B) {
}
func BenchmarkDelete(b *testing.B) {
users, err := ocean.New[User]("test")
users, err := ocean.New[User]("test", nil)
assert.Nil(b, err)
defer users.Sync()
@ -70,27 +68,10 @@ func BenchmarkDelete(b *testing.B) {
b.StopTimer()
}
func BenchmarkColdStart(b *testing.B) {
users, err := ocean.New[User]("test")
assert.Nil(b, err)
defer users.Sync()
defer users.Clear()
b.Run("100", func(b *testing.B) {
for i := 0; i < 100; i++ {
users.Set(strconv.Itoa(i), &User{Name: fmt.Sprintf("User %d", i)})
func BenchmarkNew(b *testing.B) {
b.RunParallel(func(pb *testing.PB) {
for pb.Next() {
ocean.New[User]("test", nil)
}
b.ReportAllocs()
b.ResetTimer()
for n := 0; n < b.N; n++ {
again, err := ocean.New[User]("test")
assert.Nil(b, err)
assert.NotNil(b, again)
}
b.StopTimer()
})
}

View File

@ -10,10 +10,13 @@ import (
type Collection[T any] interface {
All() <-chan *T
Clear()
Data() *sync.Map
Delete(key string)
Exists(key string) bool
Filter(func(*T) bool) <-chan *T
Get(key string) (value *T, err error)
Name() string
Root() string
Set(key string, value *T)
Sync()
}
@ -27,29 +30,31 @@ type collection[T any] struct {
}
// New creates a new collection with the given name.
func New[T any](directories ...string) (*collection[T], error) {
func New[T any](namespace string, storage Storage[T]) (*collection[T], error) {
name := reflect.TypeOf((*T)(nil)).Elem().Name()
c := &collection[T]{
name: name,
}
if storage == nil {
return c, nil
}
home, err := os.UserHomeDir()
if err != nil {
return nil, err
}
directories = append([]string{home, ".ocean"}, directories...)
directory := filepath.Join(directories...)
err = os.MkdirAll(directory, 0700)
c.storage = storage
c.root = filepath.Join(home, ".ocean", namespace)
err = os.MkdirAll(c.root, 0700)
if err != nil {
return nil, err
}
c := &collection[T]{
name: name,
root: directory,
storage: &FileStorage[T]{},
}
return c, c.storage.Init(c)
return c, storage.Init(c)
}
// All returns a channel of all objects in the collection.
@ -76,9 +81,19 @@ func (c *collection[T]) Clear() {
})
}
// Data returns the internal data structure.
func (c *collection[T]) Data() *sync.Map {
return &c.data
}
// Delete deletes a key from the collection.
func (c *collection[T]) Delete(key string) {
c.data.Delete(key)
if c.storage == nil {
return
}
c.storage.Delete(key)
}
@ -118,9 +133,24 @@ func (c *collection[T]) Filter(filter func(*T) bool) <-chan *T {
return channel
}
// Name returns the name of the collection.
func (c *collection[T]) Name() string {
return c.name
}
// Root returns the root path.
func (c *collection[T]) Root() string {
return c.root
}
// Set sets the value for the given key.
func (c *collection[T]) Set(key string, value *T) {
c.data.Store(key, value)
if c.storage == nil {
return
}
err := c.storage.Set(key, value)
if err != nil {
@ -130,5 +160,9 @@ func (c *collection[T]) Set(key string, value *T) {
// Sync waits for all disk writes to finish before it returns.
func (c *collection[T]) Sync() {
if c.storage == nil {
return
}
c.storage.Sync()
}

View File

@ -13,7 +13,7 @@ type User struct {
}
func TestCollection(t *testing.T) {
users, err := ocean.New[User]("test")
users, err := ocean.New[User]("test", nil)
assert.Nil(t, err)
defer users.Sync()
@ -84,28 +84,6 @@ func TestCollection(t *testing.T) {
wg.Wait()
})
t.Run("Persistence", func(t *testing.T) {
users.Sync()
again, err := ocean.New[User]("test")
assert.Nil(t, err)
user, err := again.Get("1")
assert.Nil(t, err)
assert.NotNil(t, user)
assert.Equal(t, user.Name, "User 1")
user, err = again.Get("2")
assert.Nil(t, err)
assert.NotNil(t, user)
assert.Equal(t, user.Name, "User 2")
user, err = again.Get("3")
assert.Nil(t, err)
assert.NotNil(t, user)
assert.Equal(t, user.Name, "User 3")
})
t.Run("Delete", func(t *testing.T) {
assert.True(t, users.Exists("1"))
assert.True(t, users.Exists("2"))

View File

@ -15,7 +15,7 @@ go get git.akyoto.dev/go/ocean
type User struct { Name string }
// Create a new collection
users := ocean.New[User]("todolist")
users := ocean.New[User]("todolist", &storage.File[User]{})
// Store some data
users.Set("1", &User{Name: "User 1"})
@ -33,7 +33,7 @@ for user := range users.All() {
Data will be stored in `~/.ocean/todolist/User.dat`.
## Example file: User.dat
## Format
```json
1

View File

@ -1,8 +1,8 @@
package ocean
type Storage[T any] interface {
Init(c Collection[T]) error
Delete(key string) error
Init(c *collection[T]) error
Set(key string, value *T) error
Sync()
}

View File

@ -1,4 +1,4 @@
package ocean
package storage
import (
"encoding/json"

View File

@ -1,37 +1,42 @@
package ocean
package storage
import (
"os"
"path/filepath"
"strings"
"git.akyoto.dev/go/ocean"
)
// DirectoryStorage creates a directory and stores every record in a separate file.
type DirectoryStorage[T any] struct {
collection *collection[T]
// Directory creates a directory and stores every record in a separate file.
type Directory[T any] struct {
collection ocean.Collection[T]
directory string
}
// Init loads all existing records from the directory.
func (ds *DirectoryStorage[T]) Init(c *collection[T]) error {
func (ds *Directory[T]) Init(c ocean.Collection[T]) error {
ds.collection = c
ds.directory = filepath.Join(c.root, c.name)
ds.directory = filepath.Join(c.Root(), c.Name())
os.Mkdir(ds.directory, 0700)
return ds.read()
}
// Set saves the value in a file.
func (ds *DirectoryStorage[T]) Set(key string, value *T) error {
func (ds *Directory[T]) Set(key string, value *T) error {
return ds.writeFile(key, value)
}
// Delete deletes the file for the given key.
func (ds *DirectoryStorage[T]) Delete(key string) error {
func (ds *Directory[T]) Delete(key string) error {
return os.Remove(ds.keyFile(key))
}
// Sync does nothing when using directory storage.
func (ds *Directory[T]) Sync() {}
// read loads the collection data from the disk.
func (ds *DirectoryStorage[T]) read() error {
func (ds *Directory[T]) read() error {
dir, err := os.Open(ds.directory)
if err != nil {
@ -53,7 +58,7 @@ func (ds *DirectoryStorage[T]) read() error {
}
// readFile loads a single file from the disk.
func (ds *DirectoryStorage[T]) readFile(fileName string) error {
func (ds *Directory[T]) readFile(fileName string) error {
fileName = filepath.Join(ds.directory, fileName)
file, err := os.Open(fileName)
@ -71,12 +76,12 @@ func (ds *DirectoryStorage[T]) readFile(fileName string) error {
}
key := strings.TrimSuffix(fileName, ".json")
ds.collection.data.Store(key, value)
ds.collection.Data().Store(key, value)
return nil
}
// writeFile writes the value for the key to disk as a JSON file.
func (ds *DirectoryStorage[T]) writeFile(key string, value *T) error {
func (ds *Directory[T]) writeFile(key string, value *T) error {
fileName := ds.keyFile(key)
file, err := os.OpenFile(fileName, os.O_CREATE|os.O_WRONLY|os.O_TRUNC, 0600)
@ -96,6 +101,6 @@ func (ds *DirectoryStorage[T]) writeFile(key string, value *T) error {
}
// keyFile returns the file path for the given key.
func (ds *DirectoryStorage[T]) keyFile(key string) string {
func (ds *Directory[T]) keyFile(key string) string {
return filepath.Join(ds.directory, key+".json")
}

View File

@ -1,4 +1,4 @@
package ocean
package storage
import (
"encoding/json"

View File

@ -1,4 +1,4 @@
package ocean
package storage
import (
"bufio"
@ -10,23 +10,25 @@ import (
"sort"
"sync/atomic"
"time"
"git.akyoto.dev/go/ocean"
)
const diskWriteInterval = 100 * time.Millisecond
type FileStorage[T any] struct {
collection *collection[T]
type File[T any] struct {
collection ocean.Collection[T]
dirty atomic.Uint32
sync chan struct{}
}
func (fs *FileStorage[T]) Init(c *collection[T]) error {
func (fs *File[T]) Init(c ocean.Collection[T]) error {
fs.collection = c
fs.sync = make(chan struct{})
go fs.flushWorker()
fileName := filepath.Join(c.root, c.name+".dat")
fileName := filepath.Join(c.Root(), c.Name()+".dat")
file, err := os.Open(fileName)
if os.IsNotExist(err) {
@ -41,21 +43,21 @@ func (fs *FileStorage[T]) Init(c *collection[T]) error {
return fs.readFrom(file)
}
func (fs *FileStorage[T]) Delete(key string) error {
func (fs *File[T]) Delete(key string) error {
fs.dirty.Store(1)
return nil
}
func (fs *FileStorage[T]) Set(key string, value *T) error {
func (fs *File[T]) Set(key string, value *T) error {
fs.dirty.Store(1)
return nil
}
func (fs *FileStorage[T]) Sync() {
func (fs *File[T]) Sync() {
<-fs.sync
}
func (fs *FileStorage[T]) flushWorker() {
func (fs *File[T]) flushWorker() {
for {
time.Sleep(diskWriteInterval)
@ -76,8 +78,8 @@ func (fs *FileStorage[T]) flushWorker() {
}
}
func (fs *FileStorage[T]) flush() error {
oldPath := filepath.Join(fs.collection.root, fs.collection.name+".dat")
func (fs *File[T]) flush() error {
oldPath := filepath.Join(fs.collection.Root(), fs.collection.Name()+".dat")
newPath := oldPath + ".tmp"
file, err := os.OpenFile(newPath, os.O_CREATE|os.O_WRONLY|os.O_TRUNC, 0600)
@ -117,7 +119,7 @@ func (fs *FileStorage[T]) flush() error {
}
// readFrom reads the entire collection.
func (fs *FileStorage[T]) readFrom(stream io.Reader) error {
func (fs *File[T]) readFrom(stream io.Reader) error {
var (
key string
value []byte
@ -139,7 +141,7 @@ func (fs *FileStorage[T]) readFrom(stream io.Reader) error {
return err
}
fs.collection.data.Store(key, object)
fs.collection.Data().Store(key, object)
key = ""
}
@ -147,7 +149,7 @@ func (fs *FileStorage[T]) readFrom(stream io.Reader) error {
}
// writeTo writes the entire collection.
func (fs *FileStorage[T]) writeTo(writer io.Writer) error {
func (fs *File[T]) writeTo(writer io.Writer) error {
stringWriter, ok := writer.(io.StringWriter)
if !ok {
@ -156,7 +158,7 @@ func (fs *FileStorage[T]) writeTo(writer io.Writer) error {
records := []keyValue{}
fs.collection.data.Range(func(key, value any) bool {
fs.collection.Data().Range(func(key, value any) bool {
records = append(records, keyValue{
key: key.(string),
value: value,

View File

@ -1,4 +1,4 @@
package ocean
package storage
type keyValue struct {
key string

49
storage/storage_test.go Normal file
View File

@ -0,0 +1,49 @@
package storage_test
import (
"testing"
"git.akyoto.dev/go/assert"
"git.akyoto.dev/go/ocean"
"git.akyoto.dev/go/ocean/storage"
)
type User struct {
Name string `json:"name"`
}
func TestInterface(t *testing.T) {
var _ ocean.Storage[string] = (*storage.File[string])(nil)
var _ ocean.Storage[string] = (*storage.Directory[string])(nil)
}
func TestPersistence(t *testing.T) {
users, err := ocean.New[User]("test", &storage.File[User]{})
assert.Nil(t, err)
defer users.Sync()
defer users.Clear()
users.Set("1", &User{Name: "User 1"})
users.Set("2", &User{Name: "User 2"})
users.Set("3", &User{Name: "User 3"})
users.Sync()
reload, err := ocean.New[User]("test", &storage.File[User]{})
assert.Nil(t, err)
user, err := reload.Get("1")
assert.Nil(t, err)
assert.NotNil(t, user)
assert.Equal(t, user.Name, "User 1")
user, err = reload.Get("2")
assert.Nil(t, err)
assert.NotNil(t, user)
assert.Equal(t, user.Name, "User 2")
user, err = reload.Get("3")
assert.Nil(t, err)
assert.NotNil(t, user)
assert.Equal(t, user.Name, "User 3")
}