|
|
|
package dataswamp
|
|
|
|
|
|
|
|
import (
|
|
|
|
"caj-larsson/bog/dataswamp/namespace"
|
|
|
|
"caj-larsson/bog/dataswamp/swampfile"
|
|
|
|
"caj-larsson/bog/util"
|
|
|
|
"io"
|
|
|
|
"strconv"
|
|
|
|
"time"
|
|
|
|
// "errors"
|
|
|
|
// "fmt"
|
|
|
|
)
|
|
|
|
|
|
|
|
type DataSwampService struct {
|
|
|
|
ns_svc namespace.NamespaceService
|
|
|
|
swamp_file_repo swampfile.Repository
|
|
|
|
logger util.Logger
|
|
|
|
eventBus util.EventBus
|
|
|
|
}
|
|
|
|
|
|
|
|
func NewDataSwampService(
|
|
|
|
ns_svc namespace.NamespaceService,
|
|
|
|
swamp_file_repo swampfile.Repository,
|
|
|
|
logger util.Logger,
|
|
|
|
) *DataSwampService {
|
|
|
|
s := DataSwampService{ns_svc, swamp_file_repo, logger, *util.NewEventBus()}
|
|
|
|
ns_svc.Wire(s.eventBus.Register, s.eventBus.Handle)
|
|
|
|
return &s
|
|
|
|
}
|
|
|
|
|
|
|
|
func (s DataSwampService) NamespaceStats() []namespace.Namespace {
|
|
|
|
return s.ns_svc.All()
|
|
|
|
}
|
|
|
|
|
|
|
|
func (s DataSwampService) SaveFile(ref swampfile.FileReference, src io.Reader, size int64) error {
|
|
|
|
ns := s.ns_svc.GetOrCreateNs(ref.UserAgent)
|
|
|
|
|
|
|
|
r, err := ref.Clean(true)
|
|
|
|
|
|
|
|
if err != nil {
|
|
|
|
return err
|
|
|
|
}
|
|
|
|
|
|
|
|
if !ns.FileQuota.Allows(size) {
|
|
|
|
return namespace.ErrExceedQuota
|
|
|
|
}
|
|
|
|
|
|
|
|
f, err := s.swamp_file_repo.Create(r.Path, strconv.FormatInt(ns.ID, 10))
|
|
|
|
|
|
|
|
if err != nil {
|
|
|
|
// TODO: convert this into a different error.
|
|
|
|
return err
|
|
|
|
}
|
|
|
|
|
|
|
|
s.eventBus.Handle(*util.NewEvent("FileUsed", struct {
|
|
|
|
Name string
|
|
|
|
Size int64
|
|
|
|
}{
|
|
|
|
ns.Name,
|
|
|
|
f.Size(),
|
|
|
|
}))
|
|
|
|
|
|
|
|
// TODO: rewrite this into an interruptable loop that emits downloaded events
|
|
|
|
written, err := io.CopyN(f, src, size)
|
|
|
|
|
|
|
|
if written < size {
|
|
|
|
s.swamp_file_repo.Delete(r.Path, strconv.FormatInt(ns.ID, 10)) //
|
|
|
|
return swampfile.ErrContentSizeExaggerated
|
|
|
|
}
|
|
|
|
|
|
|
|
var buf = make([]byte, 1)
|
|
|
|
|
|
|
|
overread, err := src.Read(buf)
|
|
|
|
|
|
|
|
if overread > 0 || err != io.EOF {
|
|
|
|
s.swamp_file_repo.Delete(r.Path, strconv.FormatInt(ns.ID, 10))
|
|
|
|
return swampfile.ErrContentSizeExceeded
|
|
|
|
}
|
|
|
|
|
|
|
|
err = f.Close()
|
|
|
|
if err != nil {
|
|
|
|
panic(err)
|
|
|
|
}
|
|
|
|
|
|
|
|
s.eventBus.Handle(*util.NewEvent("FileRecieved", struct {
|
|
|
|
Name string
|
|
|
|
Size int64
|
|
|
|
}{
|
|
|
|
ns.Name,
|
|
|
|
written,
|
|
|
|
}))
|
|
|
|
|
|
|
|
return nil
|
|
|
|
}
|
|
|
|
|
|
|
|
func (s DataSwampService) OpenOutFile(ref swampfile.FileReference) (swampfile.SwampOutFile, error) {
|
|
|
|
ns := s.ns_svc.GetOrCreateNs(ref.UserAgent)
|
|
|
|
|
|
|
|
r, err := ref.Clean(true)
|
|
|
|
|
|
|
|
if err != nil {
|
|
|
|
return nil, err
|
|
|
|
}
|
|
|
|
|
|
|
|
f, err := s.swamp_file_repo.Open(r.Path, strconv.FormatInt(ns.ID, 10))
|
|
|
|
|
|
|
|
if err != nil {
|
|
|
|
return nil, err
|
|
|
|
}
|
|
|
|
|
|
|
|
s.eventBus.Handle(*util.NewEvent("FileSent", f.Size()))
|
|
|
|
|
|
|
|
return f, nil
|
|
|
|
}
|
|
|
|
|
|
|
|
func (s DataSwampService) CleanUpExpiredFiles() error {
|
|
|
|
s.logger.Info("Cleaning up expired files")
|
|
|
|
|
|
|
|
for _, ns := range s.ns_svc.All() {
|
|
|
|
expiry := time.Now().Add(-ns.AllowanceDuration)
|
|
|
|
dfs, err := s.swamp_file_repo.DeleteOlderThan(strconv.FormatInt(ns.ID, 10), expiry)
|
|
|
|
|
|
|
|
if err != nil {
|
|
|
|
panic(err)
|
|
|
|
}
|
|
|
|
|
|
|
|
for _, df := range dfs {
|
|
|
|
s.eventBus.Handle(*util.NewEvent("FileDeleted", struct {
|
|
|
|
Name string
|
|
|
|
Size int64
|
|
|
|
}{
|
|
|
|
ns.Name,
|
|
|
|
df.Size,
|
|
|
|
}))
|
|
|
|
}
|
|
|
|
}
|
|
|
|
return nil
|
|
|
|
}
|