193 lines
2.9 KiB
Go
Raw Normal View History

2023-07-07 16:05:52 +00:00
package ocean
2023-07-08 15:26:36 +00:00
import (
"bufio"
"encoding/json"
"io"
"log"
"os"
"path/filepath"
"sort"
"sync/atomic"
2023-07-08 17:08:28 +00:00
"time"
2023-07-08 15:26:36 +00:00
)
// FileStorage persists a collection to a single "<name>.dat" file inside the
// collection's root directory. Set and Delete only mark the storage dirty;
// the background worker started by Init performs the actual write.
type FileStorage[T any] struct {
	// collection is the in-memory collection that is loaded from and written
	// to the data file.
	collection *collection[T]

	// dirty is non-zero when a Set or Delete happened since the flush worker
	// last cleared it.
	dirty atomic.Uint32

	// sync is the channel the flush worker sends on whenever it finds nothing
	// to flush; Sync receives from it.
	sync chan struct{}
}
// Init binds the storage to collection c, starts the background flush worker
// and loads previously persisted records from "<root>/<name>.dat".
//
// A missing data file is not an error; in that case nothing is loaded and nil
// is returned. Any other failure to open the file, and any error reported by
// readFrom, is returned to the caller.
func (fs *FileStorage[T]) Init(c *collection[T]) error {
	fs.collection = c
	fs.sync = make(chan struct{})

	// The worker is launched before loading; it only flushes once a Set or
	// Delete has marked the storage dirty.
	// NOTE(review): the worker keeps running even when loading fails below.
	go fs.flushWorker()

	path := filepath.Join(c.root, c.name+".dat")
	file, err := os.Open(path)
	switch {
	case os.IsNotExist(err):
		return nil
	case err != nil:
		return err
	}
	defer file.Close()

	return fs.readFrom(file)
}
// Delete marks the storage dirty so that the flush worker rewrites the data
// file. The key is not used here because a flush always writes the whole
// collection. It always returns nil.
func (fs *FileStorage[T]) Delete(key string) error {
	// Only raise the flag; the write itself happens in the flush worker.
	fs.dirty.Store(1)

	return nil
}
// Set marks the storage dirty so that the flush worker rewrites the data
// file. Neither key nor value is used here because a flush always writes the
// whole collection. It always returns nil.
func (fs *FileStorage[T]) Set(key string, value *T) error {
	// Only raise the flag; the write itself happens in the flush worker.
	fs.dirty.Store(1)

	return nil
}
// Sync blocks until the flush worker signals that it found no pending
// changes. A failed flush is only logged by the worker, so returning from
// Sync does not by itself guarantee that the data reached the disk.
func (fs *FileStorage[T]) Sync() {
	// The worker sends on this channel only when the dirty flag was clear.
	<-fs.sync
}
func (fs *FileStorage[T]) flushWorker() {
for {
2023-07-08 17:08:28 +00:00
time.Sleep(time.Millisecond)
2023-07-08 15:26:36 +00:00
if fs.dirty.Swap(0) == 0 {
select {
case fs.sync <- struct{}{}:
default:
}
continue
}
err := fs.flush()
if err != nil {
log.Println(err)
}
}
}
func (fs *FileStorage[T]) flush() error {
2023-07-08 17:08:28 +00:00
oldPath := filepath.Join(fs.collection.root, fs.collection.name+".dat")
newPath := oldPath + ".tmp"
file, err := os.OpenFile(newPath, os.O_CREATE|os.O_WRONLY|os.O_TRUNC, 0600)
2023-07-08 15:26:36 +00:00
if err != nil {
return err
}
bufferedWriter := bufio.NewWriter(file)
err = fs.writeTo(bufferedWriter)
if err != nil {
file.Close()
return err
}
err = bufferedWriter.Flush()
if err != nil {
file.Close()
return err
}
err = file.Sync()
if err != nil {
file.Close()
return err
}
2023-07-08 17:08:28 +00:00
err = file.Close()
if err != nil {
return err
}
return os.Rename(newPath, oldPath)
2023-07-08 15:26:36 +00:00
}
// readFrom loads the entire collection from stream.
//
// The input is read as pairs of lines: a key line followed by a line holding
// the JSON encoding of the value. Each value is decoded into a new T and
// stored in the collection under its key. Empty lines in key position are
// skipped, and a trailing key line without a value line is ignored.
//
// It returns the first JSON decoding error, or the error that stopped the
// scanner (an I/O error from stream or a line longer than the limit below).
// A clean end of input yields nil.
//
// NOTE(review): because an empty line in key position is skipped, a record
// whose key is the empty string cannot be loaded by this function.
func (fs *FileStorage[T]) readFrom(stream io.Reader) error {
	// Upper bound for a single line. bufio.Scanner's default limit
	// (bufio.MaxScanTokenSize, 64 KiB) is too small for large JSON values.
	const maxLineSize = 1 << 30

	var key string

	scanner := bufio.NewScanner(stream)
	scanner.Buffer(make([]byte, 0, bufio.MaxScanTokenSize), maxLineSize)

	for scanner.Scan() {
		if key == "" {
			key = scanner.Text()
			continue
		}

		object := new(T)
		if err := json.Unmarshal(scanner.Bytes(), object); err != nil {
			return err
		}

		fs.collection.data.Store(key, object)
		key = ""
	}

	// Scan returns false both at end of input and on failure; only Err tells
	// them apart. Ignoring it would silently load a truncated collection.
	return scanner.Err()
}
// writeTo writes the entire collection to writer.
//
// All records are collected from the collection, sorted by key in ascending
// order, and written one after another: the key, a newline, and then the value
// as emitted by the encoder obtained from NewEncoder(writer). Presumably that
// is one line of JSON, matching what readFrom expects; the encoder is defined
// elsewhere, so verify there.
//
// Any io.Writer is accepted. It returns the first write or encode error, or
// nil when every record has been written.
func (fs *FileStorage[T]) writeTo(writer io.Writer) error {
	var records []keyValue

	fs.collection.data.Range(func(key, value any) bool {
		records = append(records, keyValue{
			key:   key.(string),
			value: value,
		})
		return true
	})

	// Sorting makes the output independent of the iteration order of Range.
	sort.Slice(records, func(i, j int) bool {
		return records[i].key < records[j].key
	})

	encoder := NewEncoder(writer)

	for _, record := range records {
		// io.WriteString uses the writer's WriteString method when it has one
		// and falls back to Write otherwise, so no type assertion is needed.
		if _, err := io.WriteString(writer, record.key); err != nil {
			return err
		}
		if _, err := io.WriteString(writer, "\n"); err != nil {
			return err
		}
		if err := encoder.Encode(record.value); err != nil {
			return err
		}
	}

	return nil
}