197 lines
2.9 KiB
Go
Raw Normal View History

2023-07-10 14:27:41 +00:00
package storage
2023-07-07 16:05:52 +00:00
2023-07-08 15:26:36 +00:00
import (
"bufio"
"encoding/json"
"io"
"log"
"os"
"path/filepath"
"sort"
"sync/atomic"
2023-07-08 17:08:28 +00:00
"time"
2023-07-10 14:27:41 +00:00
"git.akyoto.dev/go/ocean"
2023-07-08 15:26:36 +00:00
)
2023-07-08 20:10:02 +00:00
// diskWriteInterval is how long the flush worker sleeps between checks
// of the dirty flag.
const diskWriteInterval = 100 * time.Millisecond
2023-07-10 14:27:41 +00:00
type File[T any] struct {
2023-07-12 08:58:20 +00:00
collection ocean.StorageData
2023-07-08 15:26:36 +00:00
dirty atomic.Uint32
sync chan struct{}
}
2023-07-12 08:58:20 +00:00
func (fs *File[T]) Init(c ocean.StorageData) error {
2023-07-08 15:26:36 +00:00
fs.collection = c
fs.sync = make(chan struct{})
go fs.flushWorker()
2023-07-10 14:27:41 +00:00
fileName := filepath.Join(c.Root(), c.Name()+".dat")
2023-07-08 20:10:02 +00:00
file, err := os.Open(fileName)
2023-07-08 15:26:36 +00:00
if os.IsNotExist(err) {
return nil
}
if err != nil {
return err
}
defer file.Close()
return fs.readFrom(file)
}
2023-07-10 14:27:41 +00:00
func (fs *File[T]) Delete(key string) error {
2023-07-08 15:26:36 +00:00
fs.dirty.Store(1)
return nil
}
2023-07-10 14:27:41 +00:00
func (fs *File[T]) Set(key string, value *T) error {
2023-07-08 15:26:36 +00:00
fs.dirty.Store(1)
return nil
}
2023-07-10 14:27:41 +00:00
func (fs *File[T]) Sync() {
2023-07-08 15:26:36 +00:00
<-fs.sync
}
2023-07-10 14:27:41 +00:00
func (fs *File[T]) flushWorker() {
2023-07-08 15:26:36 +00:00
for {
2023-07-08 20:10:02 +00:00
time.Sleep(diskWriteInterval)
2023-07-08 15:26:36 +00:00
if fs.dirty.Swap(0) == 0 {
select {
case fs.sync <- struct{}{}:
default:
}
continue
}
err := fs.flush()
if err != nil {
log.Println(err)
}
}
}
2023-07-10 14:27:41 +00:00
func (fs *File[T]) flush() error {
oldPath := filepath.Join(fs.collection.Root(), fs.collection.Name()+".dat")
2023-07-08 17:08:28 +00:00
newPath := oldPath + ".tmp"
file, err := os.OpenFile(newPath, os.O_CREATE|os.O_WRONLY|os.O_TRUNC, 0600)
2023-07-08 15:26:36 +00:00
if err != nil {
return err
}
bufferedWriter := bufio.NewWriter(file)
err = fs.writeTo(bufferedWriter)
if err != nil {
file.Close()
return err
}
err = bufferedWriter.Flush()
if err != nil {
file.Close()
return err
}
err = file.Sync()
if err != nil {
file.Close()
return err
}
2023-07-08 17:08:28 +00:00
err = file.Close()
if err != nil {
return err
}
return os.Rename(newPath, oldPath)
2023-07-08 15:26:36 +00:00
}
// readFrom reads the entire collection.
2023-07-10 14:27:41 +00:00
func (fs *File[T]) readFrom(stream io.Reader) error {
2023-07-08 15:26:36 +00:00
var (
key string
value []byte
)
scanner := bufio.NewScanner(stream)
for scanner.Scan() {
if key == "" {
key = scanner.Text()
continue
}
value = scanner.Bytes()
object := new(T)
err := json.Unmarshal(value, object)
if err != nil {
return err
}
2023-07-10 14:27:41 +00:00
fs.collection.Data().Store(key, object)
2023-07-08 15:26:36 +00:00
key = ""
}
return nil
}
// writeTo writes the entire collection.
2023-07-10 14:27:41 +00:00
func (fs *File[T]) writeTo(writer io.Writer) error {
2023-07-08 15:26:36 +00:00
stringWriter, ok := writer.(io.StringWriter)
if !ok {
panic("The given io.Writer is not an io.StringWriter")
}
records := []keyValue{}
2023-07-10 14:27:41 +00:00
fs.collection.Data().Range(func(key, value any) bool {
2023-07-08 15:26:36 +00:00
records = append(records, keyValue{
key: key.(string),
value: value,
})
return true
})
sort.Slice(records, func(i, j int) bool {
return records[i].key < records[j].key
})
encoder := NewEncoder(writer)
for _, record := range records {
_, err := stringWriter.WriteString(record.key)
if err != nil {
return err
}
_, err = stringWriter.WriteString("\n")
if err != nil {
return err
}
err = encoder.Encode(record.value)
if err != nil {
return err
}
}
return nil
}