Implemented file storage

This commit is contained in:
Eduard Urbach 2023-07-08 17:26:36 +02:00
parent fee6a812d6
commit 20e7c596be
Signed by: akyoto
GPG Key ID: C874F672B1AF20C0
7 changed files with 289 additions and 143 deletions

View File

@ -15,6 +15,7 @@ type Collection[T any] interface {
Filter(func(*T) bool) <-chan *T Filter(func(*T) bool) <-chan *T
Get(key string) (value *T, err error) Get(key string) (value *T, err error)
Set(key string, value *T) Set(key string, value *T)
Sync()
} }
// collection is a hash map of homogeneous data. // collection is a hash map of homogeneous data.
@ -45,7 +46,7 @@ func New[T any](directories ...string) (*collection[T], error) {
c := &collection[T]{ c := &collection[T]{
name: name, name: name,
root: directory, root: directory,
storage: &DirectoryStorage[T]{}, storage: &FileStorage[T]{},
} }
return c, c.storage.Init(c) return c, c.storage.Init(c)
@ -67,6 +68,41 @@ func (c *collection[T]) All() <-chan *T {
return channel return channel
} }
// Clear deletes all objects from the collection.
func (c *collection[T]) Clear() {
	c.data.Range(func(k, _ any) bool {
		c.Delete(k.(string))
		return true
	})
}
// Delete deletes a key from the collection.
// Deleting a key that does not exist is a no-op.
func (c *collection[T]) Delete(key string) {
	if c.Exists(key) {
		c.data.Delete(key)
		c.storage.Delete(key)
	}
}
// Exists returns whether or not the key exists.
func (c *collection[T]) Exists(key string) bool {
	_, found := c.data.Load(key)
	return found
}
// Get returns the value for the given key,
// or a *KeyNotFoundError if the key does not exist.
func (c *collection[T]) Get(key string) (*T, error) {
	if value, exists := c.data.Load(key); exists {
		return value.(*T), nil
	}

	return nil, &KeyNotFoundError{Key: key}
}
// Filter returns a channel of all objects that pass the given filter function. // Filter returns a channel of all objects that pass the given filter function.
func (c *collection[T]) Filter(filter func(*T) bool) <-chan *T { func (c *collection[T]) Filter(filter func(*T) bool) <-chan *T {
channel := make(chan *T) channel := make(chan *T)
@ -86,17 +122,6 @@ func (c *collection[T]) Filter(filter func(*T) bool) <-chan *T {
return channel return channel
} }
// Get returns the value for the given key.
func (c *collection[T]) Get(key string) (*T, error) {
value, exists := c.data.Load(key)
if !exists {
return nil, &KeyNotFoundError{Key: key}
}
return value.(*T), nil
}
// Set sets the value for the given key. // Set sets the value for the given key.
func (c *collection[T]) Set(key string, value *T) { func (c *collection[T]) Set(key string, value *T) {
c.data.Store(key, value) c.data.Store(key, value)
@ -107,26 +132,7 @@ func (c *collection[T]) Set(key string, value *T) {
} }
} }
// Delete deletes a key from the collection. // Sync waits for all disk writes to finish before it returns.
func (c *collection[T]) Delete(key string) { func (c *collection[T]) Sync() {
if !c.Exists(key) { c.storage.Sync()
return
}
c.data.Delete(key)
c.storage.Delete(key)
}
// Exists returns whether or not the key exists.
func (c *collection[T]) Exists(key string) bool {
_, exists := c.data.Load(key)
return exists
}
// Clear deletes all objects from the collection.
func (c *collection[T]) Clear() {
c.data.Range(func(key, value any) bool {
c.Delete(key.(string))
return true
})
} }

View File

@ -17,6 +17,7 @@ type User struct {
func TestCollection(t *testing.T) { func TestCollection(t *testing.T) {
users, err := ocean.New[User]("test") users, err := ocean.New[User]("test")
assert.Nil(t, err) assert.Nil(t, err)
defer users.Sync()
defer users.Clear() defer users.Clear()
users.Set("1", &User{Name: "User 1"}) users.Set("1", &User{Name: "User 1"})
@ -85,6 +86,8 @@ func TestCollection(t *testing.T) {
}) })
t.Run("Persistence", func(t *testing.T) { t.Run("Persistence", func(t *testing.T) {
users.Sync()
again, err := ocean.New[User]("test") again, err := ocean.New[User]("test")
assert.Nil(t, err) assert.Nil(t, err)
@ -132,6 +135,7 @@ func TestCollection(t *testing.T) {
func BenchmarkGet(b *testing.B) { func BenchmarkGet(b *testing.B) {
users, err := ocean.New[User]("test") users, err := ocean.New[User]("test")
assert.Nil(b, err) assert.Nil(b, err)
defer users.Sync()
defer users.Clear() defer users.Clear()
users.Set("1", &User{Name: "User 1"}) users.Set("1", &User{Name: "User 1"})
@ -154,6 +158,7 @@ func BenchmarkGet(b *testing.B) {
func BenchmarkSet(b *testing.B) { func BenchmarkSet(b *testing.B) {
users, err := ocean.New[User]("test") users, err := ocean.New[User]("test")
assert.Nil(b, err) assert.Nil(b, err)
defer users.Sync()
defer users.Clear() defer users.Clear()
user := &User{Name: "User 1"} user := &User{Name: "User 1"}
@ -172,6 +177,7 @@ func BenchmarkSet(b *testing.B) {
func BenchmarkDelete(b *testing.B) { func BenchmarkDelete(b *testing.B) {
users, err := ocean.New[User]("test") users, err := ocean.New[User]("test")
assert.Nil(b, err) assert.Nil(b, err)
defer users.Sync()
defer users.Clear() defer users.Clear()
b.ReportAllocs() b.ReportAllocs()
@ -186,23 +192,26 @@ func BenchmarkDelete(b *testing.B) {
b.StopTimer() b.StopTimer()
} }
func BenchmarkColdStart100Files(b *testing.B) { func BenchmarkColdStart(b *testing.B) {
users, err := ocean.New[User]("test") users, err := ocean.New[User]("test")
assert.Nil(b, err) assert.Nil(b, err)
defer users.Sync()
defer users.Clear() defer users.Clear()
for i := 0; i < 100; i++ { b.Run("100 records", func(b *testing.B) {
users.Set(strconv.Itoa(i), &User{Name: fmt.Sprintf("User %d", i)}) for i := 0; i < 100; i++ {
} users.Set(strconv.Itoa(i), &User{Name: fmt.Sprintf("User %d", i)})
}
b.ReportAllocs() b.ReportAllocs()
b.ResetTimer() b.ResetTimer()
for n := 0; n < b.N; n++ { for n := 0; n < b.N; n++ {
again, err := ocean.New[User]("test") again, err := ocean.New[User]("test")
assert.Nil(b, err) assert.Nil(b, err)
assert.NotNil(b, again) assert.NotNil(b, again)
} }
b.StopTimer() b.StopTimer()
})
} }

View File

@ -17,12 +17,12 @@ func (ds *DirectoryStorage[T]) Init(c *collection[T]) error {
ds.collection = c ds.collection = c
ds.directory = filepath.Join(c.root, c.name) ds.directory = filepath.Join(c.root, c.name)
os.Mkdir(ds.directory, 0700) os.Mkdir(ds.directory, 0700)
return ds.loadFromDisk() return ds.read()
} }
// Set saves the value in a file. // Set saves the value in a file.
func (ds *DirectoryStorage[T]) Set(key string, value *T) error { func (ds *DirectoryStorage[T]) Set(key string, value *T) error {
return ds.writeFileToDisk(key, value) return ds.writeFile(key, value)
} }
// Delete deletes the file for the given key. // Delete deletes the file for the given key.
@ -30,13 +30,8 @@ func (ds *DirectoryStorage[T]) Delete(key string) error {
return os.Remove(ds.keyFile(key)) return os.Remove(ds.keyFile(key))
} }
// keyFile returns the file path for the given key. // read loads the collection data from the disk.
func (ds *DirectoryStorage[T]) keyFile(key string) string { func (ds *DirectoryStorage[T]) read() error {
return filepath.Join(ds.directory, key+".json")
}
// loadFromDisk loads the collection data from the disk.
func (ds *DirectoryStorage[T]) loadFromDisk() error {
dir, err := os.Open(ds.directory) dir, err := os.Open(ds.directory)
if err != nil { if err != nil {
@ -47,7 +42,7 @@ func (ds *DirectoryStorage[T]) loadFromDisk() error {
files, err := dir.Readdirnames(0) files, err := dir.Readdirnames(0)
for _, fileName := range files { for _, fileName := range files {
fileError := ds.loadFileFromDisk(fileName) fileError := ds.readFile(fileName)
if fileError != nil { if fileError != nil {
return fileError return fileError
@ -57,30 +52,31 @@ func (ds *DirectoryStorage[T]) loadFromDisk() error {
return err return err
} }
// loadFileFromDisk loads a single file from the disk. // readFile loads a single file from the disk.
func (ds *DirectoryStorage[T]) loadFileFromDisk(fileName string) error { func (ds *DirectoryStorage[T]) readFile(fileName string) error {
file, err := os.Open(filepath.Join(ds.directory, fileName)) fileName = filepath.Join(ds.directory, fileName)
file, err := os.Open(fileName)
if err != nil { if err != nil {
return err return err
} }
defer file.Close()
value := new(T) value := new(T)
decoder := NewDecoder(file) decoder := NewDecoder(file)
err = decoder.Decode(value) err = decoder.Decode(value)
if err != nil { if err != nil {
file.Close()
return err return err
} }
key := strings.TrimSuffix(fileName, ".json") key := strings.TrimSuffix(fileName, ".json")
ds.collection.data.Store(key, value) ds.collection.data.Store(key, value)
return file.Close() return nil
} }
// writeFileToDisk writes the value for the key to disk as a JSON file. // writeFile writes the value for the key to disk as a JSON file.
func (ds *DirectoryStorage[T]) writeFileToDisk(key string, value *T) error { func (ds *DirectoryStorage[T]) writeFile(key string, value *T) error {
fileName := ds.keyFile(key) fileName := ds.keyFile(key)
file, err := os.OpenFile(fileName, os.O_CREATE|os.O_WRONLY|os.O_TRUNC, 0600) file, err := os.OpenFile(fileName, os.O_CREATE|os.O_WRONLY|os.O_TRUNC, 0600)
@ -98,3 +94,8 @@ func (ds *DirectoryStorage[T]) writeFileToDisk(key string, value *T) error {
return file.Close() return file.Close()
} }
// keyFile returns the file path for the given key.
func (ds *DirectoryStorage[T]) keyFile(key string) string {
return filepath.Join(ds.directory, key+".json")
}

View File

@ -1,69 +1,185 @@
package ocean package ocean
// import ( import (
// "bufio" "bufio"
// "encoding/json" "encoding/json"
// "io" "io"
// "os" "log"
// "path/filepath" "os"
// ) "path/filepath"
// "runtime"
// type FileStorage[T any] struct { "sort"
// collection *collection[T] "sync/atomic"
// dirty chan struct{} )
// }
// type FileStorage[T any] struct {
// func (fs *FileStorage[T]) Init(c *collection[T]) error { collection *collection[T]
// fs.collection = c dirty atomic.Uint32
// fileName := filepath.Join(c.root, c.name+".dat") sync chan struct{}
// stream, err := os.OpenFile(fileName, os.O_RDONLY, 0600) }
//
// if os.IsNotExist(err) { func (fs *FileStorage[T]) Init(c *collection[T]) error {
// return nil fs.collection = c
// } fs.sync = make(chan struct{})
//
// if err != nil { go fs.flushWorker()
// return err
// } fileName := filepath.Join(c.root, c.name+".dat")
// file, err := os.OpenFile(fileName, os.O_RDONLY, 0600)
// defer stream.Close()
// return fs.readRecords(stream) if os.IsNotExist(err) {
// } return nil
// }
// func (fs *FileStorage[T]) Set(key string, value *T) error {
// return nil if err != nil {
// } return err
// }
// func (fs *FileStorage[T]) Delete(key string) error {
// return nil defer file.Close()
// } return fs.readFrom(file)
// }
// // readRecords reads the entire collection.
// func (fs *FileStorage[T]) readRecords(stream io.Reader) error { func (fs *FileStorage[T]) Delete(key string) error {
// var ( fs.dirty.Store(1)
// key string return nil
// value []byte }
// )
// func (fs *FileStorage[T]) Set(key string, value *T) error {
// scanner := bufio.NewScanner(stream) fs.dirty.Store(1)
// return nil
// for scanner.Scan() { }
// if key == "" {
// key = scanner.Text() func (fs *FileStorage[T]) Sync() {
// continue <-fs.sync
// } }
//
// value = scanner.Bytes() func (fs *FileStorage[T]) flushWorker() {
// object := new(T) for {
// err := json.Unmarshal(value, object) runtime.Gosched()
//
// if err != nil { if fs.dirty.Swap(0) == 0 {
// return err select {
// } case fs.sync <- struct{}{}:
// default:
// fs.collection.data.Store(key, object) }
// key = ""
// } continue
// }
// return nil
// } err := fs.flush()
if err != nil {
log.Println(err)
}
}
}
// flush writes the entire collection to the ".dat" file on disk,
// flushing the buffered writer and fsyncing before the file is closed.
func (fs *FileStorage[T]) flush() error {
	fileName := filepath.Join(fs.collection.root, fs.collection.name+".dat")
	file, err := os.OpenFile(fileName, os.O_CREATE|os.O_WRONLY|os.O_TRUNC, 0600)

	if err != nil {
		return err
	}

	// Run the write pipeline, stopping at the first failure.
	writer := bufio.NewWriter(file)
	err = fs.writeTo(writer)

	if err == nil {
		err = writer.Flush()
	}

	if err == nil {
		err = file.Sync()
	}

	// On failure, close the file and report the pipeline error;
	// on success, report any error from closing the file itself.
	if err != nil {
		file.Close()
		return err
	}

	return file.Close()
}
// readFrom reads the entire collection from the given stream.
// The format alternates lines: a key line followed by a line
// containing the JSON-encoded value for that key.
func (fs *FileStorage[T]) readFrom(stream io.Reader) error {
	var key string

	scanner := bufio.NewScanner(stream)

	for scanner.Scan() {
		// The first line of each pair is the key.
		if key == "" {
			key = scanner.Text()
			continue
		}

		// The second line is the JSON-encoded value.
		object := new(T)
		err := json.Unmarshal(scanner.Bytes(), object)

		if err != nil {
			return err
		}

		fs.collection.data.Store(key, object)
		key = ""
	}

	// bufio.Scanner swallows read errors during iteration;
	// report them here instead of silently loading partial data.
	return scanner.Err()
}
// writeTo writes the entire collection to the given writer.
// Records are sorted by key so the output is deterministic.
// Each record occupies two lines: the key, then the encoded value.
func (fs *FileStorage[T]) writeTo(writer io.Writer) error {
	// Snapshot the map into a slice because sync.Map has no
	// deterministic iteration order.
	records := []keyValue{}

	fs.collection.data.Range(func(key, value any) bool {
		records = append(records, keyValue{
			key:   key.(string),
			value: value,
		})

		return true
	})

	sort.Slice(records, func(i, j int) bool {
		return records[i].key < records[j].key
	})

	encoder := NewEncoder(writer)

	for _, record := range records {
		// io.WriteString uses the writer's WriteString method when
		// available and falls back to Write otherwise, so writers
		// that are not io.StringWriters no longer cause a panic.
		if _, err := io.WriteString(writer, record.key); err != nil {
			return err
		}

		if _, err := io.WriteString(writer, "\n"); err != nil {
			return err
		}

		if err := encoder.Encode(record.value); err != nil {
			return err
		}
	}

	return nil
}

View File

@ -1,6 +1,15 @@
# ocean # ocean
In-memory key value store that saves your data to plain old JSON files. In-memory key value store that saves your data in JSON format.
```
1
{"name":"User 1"}
2
{"name":"User 2"}
3
{"name":"User 3"}
```
If you like, you can operate on your entire data with classic UNIX tools. If you like, you can operate on your entire data with classic UNIX tools.
@ -18,14 +27,14 @@ type User struct {
Name string Name string
} }
// Load existing data from ~/.ocean/User/ // Load existing data from ~/.ocean/User.dat
users := ocean.New[User]() users := ocean.New[User]()
// Store in memory and also store in ~/.ocean/User/1 // Store in memory and also store in ~/.ocean/User.dat
users.Set("1", &User{Name: "User 1"}) users.Set("1", &User{Name: "User 1"})
// Read from memory // Read from memory
firstUser, err := users.Get("1") first, err := users.Get("1")
// Iterate over all users // Iterate over all users
for user := range users.All() { for user := range users.All() {
@ -36,16 +45,14 @@ for user := range users.All() {
In a real project you would usually prefix your collections with a project or company name: In a real project you would usually prefix your collections with a project or company name:
```go ```go
// Data saved to ~/.ocean/google/User/ // Data saved to ~/.ocean/google/User.dat
users := ocean.New[User]("google") users := ocean.New[User]("google")
``` ```
You can add as many directory hierarchies as you need but I recommend using a simple `/namespace/collection/` structure. Disk writes are async and they work like this:
## Limitations 1. Set key and value in memory (sync.Map.Store)
2. Mark the collection as "dirty" (atomic.StoreUint32)
3. Immediately return control to the program
* Keys cannot be empty and they cannot contain a directory separator like `/`. Because a `Set` call doesn't immediately flush the memory to disk, calling `Set` multiple times in a web server request becomes extremely efficient.
* This storage mechanism is only suitable for small to medium data volume.
Ocean isn't meant to be used for big data, however the package is very lightweight so you can combine it with a big data store.

View File

@ -1,7 +1,8 @@
package ocean package ocean
type Storage[T any] interface { type Storage[T any] interface {
Delete(key string) error
Init(c *collection[T]) error Init(c *collection[T]) error
Set(key string, value *T) error Set(key string, value *T) error
Delete(key string) error Sync()
} }

6
keyValue.go Normal file
View File

@ -0,0 +1,6 @@
package ocean
// keyValue pairs a collection key with its stored value so records
// can be gathered into a slice and sorted before being written out.
type keyValue struct {
	key   string
	value any
}