diff --git a/Collection.go b/Collection.go index b505e3f..631350e 100644 --- a/Collection.go +++ b/Collection.go @@ -15,6 +15,7 @@ type Collection[T any] interface { Filter(func(*T) bool) <-chan *T Get(key string) (value *T, err error) Set(key string, value *T) + Sync() } // collection is a hash map of homogeneous data. @@ -45,7 +46,7 @@ func New[T any](directories ...string) (*collection[T], error) { c := &collection[T]{ name: name, root: directory, - storage: &DirectoryStorage[T]{}, + storage: &FileStorage[T]{}, } return c, c.storage.Init(c) @@ -67,6 +68,41 @@ func (c *collection[T]) All() <-chan *T { return channel } +// Clear deletes all objects from the collection. +func (c *collection[T]) Clear() { + c.data.Range(func(key, value any) bool { + c.Delete(key.(string)) + return true + }) +} + +// Delete deletes a key from the collection. +func (c *collection[T]) Delete(key string) { + if !c.Exists(key) { + return + } + + c.data.Delete(key) + c.storage.Delete(key) +} + +// Exists returns whether or not the key exists. +func (c *collection[T]) Exists(key string) bool { + _, exists := c.data.Load(key) + return exists +} + +// Get returns the value for the given key. +func (c *collection[T]) Get(key string) (*T, error) { + value, exists := c.data.Load(key) + + if !exists { + return nil, &KeyNotFoundError{Key: key} + } + + return value.(*T), nil +} + // Filter returns a channel of all objects that pass the given filter function. func (c *collection[T]) Filter(filter func(*T) bool) <-chan *T { channel := make(chan *T) @@ -86,17 +122,6 @@ func (c *collection[T]) Filter(filter func(*T) bool) <-chan *T { return channel } -// Get returns the value for the given key. -func (c *collection[T]) Get(key string) (*T, error) { - value, exists := c.data.Load(key) - - if !exists { - return nil, &KeyNotFoundError{Key: key} - } - - return value.(*T), nil -} - // Set sets the value for the given key. func (c *collection[T]) Set(key string, value *T) { c.data.Store(key, value) @@ -107,26 +132,7 @@ func (c *collection[T]) Set(key string, value *T) { } } -// Delete deletes a key from the collection. -func (c *collection[T]) Delete(key string) { - if !c.Exists(key) { - return - } - - c.data.Delete(key) - c.storage.Delete(key) -} - -// Exists returns whether or not the key exists. -func (c *collection[T]) Exists(key string) bool { - _, exists := c.data.Load(key) - return exists -} - -// Clear deletes all objects from the collection. -func (c *collection[T]) Clear() { - c.data.Range(func(key, value any) bool { - c.Delete(key.(string)) - return true - }) +// Sync waits for all disk writes to finish before it returns. +func (c *collection[T]) Sync() { + c.storage.Sync() } diff --git a/Collection_test.go b/Collection_test.go index f2dd91a..c160b33 100644 --- a/Collection_test.go +++ b/Collection_test.go @@ -17,6 +17,7 @@ type User struct { func TestCollection(t *testing.T) { users, err := ocean.New[User]("test") assert.Nil(t, err) + defer users.Sync() defer users.Clear() users.Set("1", &User{Name: "User 1"}) @@ -85,6 +86,8 @@ func TestCollection(t *testing.T) { }) t.Run("Persistence", func(t *testing.T) { + users.Sync() + again, err := ocean.New[User]("test") assert.Nil(t, err) @@ -132,6 +135,7 @@ func TestCollection(t *testing.T) { func BenchmarkGet(b *testing.B) { users, err := ocean.New[User]("test") assert.Nil(b, err) + defer users.Sync() defer users.Clear() users.Set("1", &User{Name: "User 1"}) @@ -154,6 +158,7 @@ func BenchmarkGet(b *testing.B) { func BenchmarkSet(b *testing.B) { users, err := ocean.New[User]("test") assert.Nil(b, err) + defer users.Sync() defer users.Clear() user := &User{Name: "User 1"} @@ -172,6 +177,7 @@ func BenchmarkSet(b *testing.B) { func BenchmarkDelete(b *testing.B) { users, err := ocean.New[User]("test") assert.Nil(b, err) + defer users.Sync() defer users.Clear() b.ReportAllocs() @@ -186,23 +192,26 @@ func BenchmarkDelete(b *testing.B) { b.StopTimer() } -func BenchmarkColdStart100Files(b *testing.B) { +func BenchmarkColdStart(b *testing.B) { users, err := ocean.New[User]("test") assert.Nil(b, err) + defer users.Sync() defer users.Clear() - for i := 0; i < 100; i++ { - users.Set(strconv.Itoa(i), &User{Name: fmt.Sprintf("User %d", i)}) - } + b.Run("100 records", func(b *testing.B) { + for i := 0; i < 100; i++ { + users.Set(strconv.Itoa(i), &User{Name: fmt.Sprintf("User %d", i)}) + } - b.ReportAllocs() - b.ResetTimer() + b.ReportAllocs() + b.ResetTimer() - for n := 0; n < b.N; n++ { - again, err := ocean.New[User]("test") - assert.Nil(b, err) - assert.NotNil(b, again) - } + for n := 0; n < b.N; n++ { + again, err := ocean.New[User]("test") + assert.Nil(b, err) + assert.NotNil(b, again) + } - b.StopTimer() + b.StopTimer() + }) } diff --git a/DirectoryStorage.go b/DirectoryStorage.go index 9584a87..0ec380c 100644 --- a/DirectoryStorage.go +++ b/DirectoryStorage.go @@ -17,12 +17,12 @@ func (ds *DirectoryStorage[T]) Init(c *collection[T]) error { ds.collection = c ds.directory = filepath.Join(c.root, c.name) os.Mkdir(ds.directory, 0700) - return ds.loadFromDisk() + return ds.read() } // Set saves the value in a file. func (ds *DirectoryStorage[T]) Set(key string, value *T) error { - return ds.writeFileToDisk(key, value) + return ds.writeFile(key, value) } // Delete deletes the file for the given key. @@ -30,13 +30,8 @@ func (ds *DirectoryStorage[T]) Delete(key string) error { return os.Remove(ds.keyFile(key)) } -// keyFile returns the file path for the given key. -func (ds *DirectoryStorage[T]) keyFile(key string) string { - return filepath.Join(ds.directory, key+".json") -} - -// loadFromDisk loads the collection data from the disk. -func (ds *DirectoryStorage[T]) loadFromDisk() error { +// read loads the collection data from the disk. +func (ds *DirectoryStorage[T]) read() error { dir, err := os.Open(ds.directory) if err != nil { @@ -47,7 +42,7 @@ func (ds *DirectoryStorage[T]) loadFromDisk() error { files, err := dir.Readdirnames(0) for _, fileName := range files { - fileError := ds.loadFileFromDisk(fileName) + fileError := ds.readFile(fileName) if fileError != nil { return fileError @@ -57,30 +52,31 @@ func (ds *DirectoryStorage[T]) loadFromDisk() error { return err } -// loadFileFromDisk loads a single file from the disk. -func (ds *DirectoryStorage[T]) loadFileFromDisk(fileName string) error { - file, err := os.Open(filepath.Join(ds.directory, fileName)) +// readFile loads a single file from the disk. +func (ds *DirectoryStorage[T]) readFile(fileName string) error { + fileName = filepath.Join(ds.directory, fileName) + file, err := os.Open(fileName) if err != nil { return err } + defer file.Close() value := new(T) decoder := NewDecoder(file) err = decoder.Decode(value) if err != nil { - file.Close() return err } key := strings.TrimSuffix(fileName, ".json") ds.collection.data.Store(key, value) - return file.Close() + return nil } -// writeFileToDisk writes the value for the key to disk as a JSON file. -func (ds *DirectoryStorage[T]) writeFileToDisk(key string, value *T) error { +// writeFile writes the value for the key to disk as a JSON file. +func (ds *DirectoryStorage[T]) writeFile(key string, value *T) error { fileName := ds.keyFile(key) file, err := os.OpenFile(fileName, os.O_CREATE|os.O_WRONLY|os.O_TRUNC, 0600) @@ -98,3 +94,8 @@ func (ds *DirectoryStorage[T]) writeFileToDisk(key string, value *T) error { return file.Close() } + +// keyFile returns the file path for the given key. +func (ds *DirectoryStorage[T]) keyFile(key string) string { + return filepath.Join(ds.directory, key+".json") +} diff --git a/FileStorage.go b/FileStorage.go index ccb6b82..ca57c69 100644 --- a/FileStorage.go +++ b/FileStorage.go @@ -1,69 +1,185 @@ package ocean -// import ( -// "bufio" -// "encoding/json" -// "io" -// "os" -// "path/filepath" -// ) -// -// type FileStorage[T any] struct { -// collection *collection[T] -// dirty chan struct{} -// } -// -// func (fs *FileStorage[T]) Init(c *collection[T]) error { -// fs.collection = c -// fileName := filepath.Join(c.root, c.name+".dat") -// stream, err := os.OpenFile(fileName, os.O_RDONLY, 0600) -// -// if os.IsNotExist(err) { -// return nil -// } -// -// if err != nil { -// return err -// } -// -// defer stream.Close() -// return fs.readRecords(stream) -// } -// -// func (fs *FileStorage[T]) Set(key string, value *T) error { -// return nil -// } -// -// func (fs *FileStorage[T]) Delete(key string) error { -// return nil -// } -// -// // readRecords reads the entire collection. -// func (fs *FileStorage[T]) readRecords(stream io.Reader) error { -// var ( -// key string -// value []byte -// ) -// -// scanner := bufio.NewScanner(stream) -// -// for scanner.Scan() { -// if key == "" { -// key = scanner.Text() -// continue -// } -// -// value = scanner.Bytes() -// object := new(T) -// err := json.Unmarshal(value, object) -// -// if err != nil { -// return err -// } -// -// fs.collection.data.Store(key, object) -// key = "" -// } -// -// return nil -// } +import ( + "bufio" + "encoding/json" + "io" + "log" + "os" + "path/filepath" + "runtime" + "sort" + "sync/atomic" +) + +type FileStorage[T any] struct { + collection *collection[T] + dirty atomic.Uint32 + sync chan struct{} +} + +func (fs *FileStorage[T]) Init(c *collection[T]) error { + fs.collection = c + fs.sync = make(chan struct{}) + + go fs.flushWorker() + + fileName := filepath.Join(c.root, c.name+".dat") + file, err := os.OpenFile(fileName, os.O_RDONLY, 0600) + + if os.IsNotExist(err) { + return nil + } + + if err != nil { + return err + } + + defer file.Close() + return fs.readFrom(file) +} + +func (fs *FileStorage[T]) Delete(key string) error { + fs.dirty.Store(1) + return nil +} + +func (fs *FileStorage[T]) Set(key string, value *T) error { + fs.dirty.Store(1) + return nil +} + +func (fs *FileStorage[T]) Sync() { + <-fs.sync +} + +func (fs *FileStorage[T]) flushWorker() { + for { + runtime.Gosched() + + if fs.dirty.Swap(0) == 0 { + select { + case fs.sync <- struct{}{}: + default: + } + + continue + } + + err := fs.flush() + + if err != nil { + log.Println(err) + } + } +} + +func (fs *FileStorage[T]) flush() error { + fileName := filepath.Join(fs.collection.root, fs.collection.name+".dat") + file, err := os.OpenFile(fileName, os.O_CREATE|os.O_WRONLY|os.O_TRUNC, 0600) + + if err != nil { + return err + } + + bufferedWriter := bufio.NewWriter(file) + err = fs.writeTo(bufferedWriter) + + if err != nil { + file.Close() + return err + } + + err = bufferedWriter.Flush() + + if err != nil { + file.Close() + return err + } + + err = file.Sync() + + if err != nil { + file.Close() + return err + } + + return file.Close() +} + +// readFrom reads the entire collection. +func (fs *FileStorage[T]) readFrom(stream io.Reader) error { + var ( + key string + value []byte + ) + + scanner := bufio.NewScanner(stream) + + for scanner.Scan() { + if key == "" { + key = scanner.Text() + continue + } + + value = scanner.Bytes() + object := new(T) + err := json.Unmarshal(value, object) + + if err != nil { + return err + } + + fs.collection.data.Store(key, object) + key = "" + } + + return nil +} + +// writeTo writes the entire collection. +func (fs *FileStorage[T]) writeTo(writer io.Writer) error { + stringWriter, ok := writer.(io.StringWriter) + + if !ok { + panic("The given io.Writer is not an io.StringWriter") + } + + records := []keyValue{} + + fs.collection.data.Range(func(key, value any) bool { + records = append(records, keyValue{ + key: key.(string), + value: value, + }) + return true + }) + + sort.Slice(records, func(i, j int) bool { + return records[i].key < records[j].key + }) + + encoder := NewEncoder(writer) + + for _, record := range records { + _, err := stringWriter.WriteString(record.key) + + if err != nil { + return err + } + + _, err = stringWriter.WriteString("\n") + + if err != nil { + return err + } + + err = encoder.Encode(record.value) + + if err != nil { + return err + } + } + + return nil +} diff --git a/README.md b/README.md index a3ddd9b..be69d25 100644 --- a/README.md +++ b/README.md @@ -1,6 +1,15 @@ # ocean -In-memory key value store that saves your data to plain old JSON files. +In-memory key value store that saves your data in JSON format. + +``` +1 +{"name":"User 1"} +2 +{"name":"User 2"} +3 +{"name":"User 3"} +``` If you like, you can operate on your entire data with classic UNIX tools. @@ -18,14 +27,14 @@ type User struct { Name string } -// Load existing data from ~/.ocean/User/ +// Load existing data from ~/.ocean/User.dat users := ocean.New[User]() -// Store in memory and also store in ~/.ocean/User/1 +// Store in memory and also store in ~/.ocean/User.dat users.Set("1", &User{Name: "User 1"}) // Read from memory -firstUser, err := users.Get("1") +first, err := users.Get("1") // Iterate over all users for user := range users.All() { @@ -36,16 +45,14 @@ for user := range users.All() { In a real project you would usually prefix your collections with a project or company name: ```go -// Data saved to ~/.ocean/google/User/ +// Data saved to ~/.ocean/google/User.dat users := ocean.New[User]("google") ``` -You can add as many directory hierarchies as you need but I recommend using a simple `/namespace/collection/` structure. +Disk writes are async and they work like this: -## Limitations +1. Set key and value in memory (sync.Map.Store) +2. Mark the collection as "dirty" (atomic.StoreUint32) +3. Immediately return control to the program -* Keys cannot be empty and they cannot contain a directory separator like `/`. - -* This storage mechanism is only suitable for small to medium data volume. - -Ocean isn't meant to be used for big data, however the package is very lightweight so you can combine it with a big data store. +Because a `Set` call doesn't immediately flush the memory to disk, calling `Set` multiple times in a web server request becomes extremely efficient. diff --git a/Storage.go b/Storage.go index b9ce7eb..6fbeecf 100644 --- a/Storage.go +++ b/Storage.go @@ -1,7 +1,8 @@ package ocean type Storage[T any] interface { + Delete(key string) error Init(c *collection[T]) error Set(key string, value *T) error - Delete(key string) error + Sync() } diff --git a/keyValue.go b/keyValue.go new file mode 100644 index 0000000..7e4d60d --- /dev/null +++ b/keyValue.go @@ -0,0 +1,6 @@ +package ocean + +type keyValue struct { + key string + value any +}