|
|
|
@ -3,7 +3,6 @@ package dataswamp
|
|
|
|
|
import (
|
|
|
|
|
"caj-larsson/bog/dataswamp/namespace"
|
|
|
|
|
"caj-larsson/bog/dataswamp/swampfile"
|
|
|
|
|
"caj-larsson/bog/util"
|
|
|
|
|
"io"
|
|
|
|
|
"strconv"
|
|
|
|
|
"time"
|
|
|
|
@ -11,29 +10,56 @@ import (
|
|
|
|
|
// "fmt"
|
|
|
|
|
)
|
|
|
|
|
|
|
|
|
|
type DataSwampService struct {
|
|
|
|
|
ns_svc namespace.NamespaceService
|
|
|
|
|
type SwampFileService struct {
|
|
|
|
|
namespace_repo namespace.Repository
|
|
|
|
|
swamp_file_repo swampfile.Repository
|
|
|
|
|
logger util.Logger
|
|
|
|
|
eventBus util.EventBus
|
|
|
|
|
default_allowance_bytes int64
|
|
|
|
|
default_allowance_duration time.Duration
|
|
|
|
|
logger Logger
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
func NewDataSwampService(
|
|
|
|
|
ns_svc namespace.NamespaceService,
|
|
|
|
|
func NewSwampFileService(
|
|
|
|
|
namespace_repo namespace.Repository,
|
|
|
|
|
swamp_file_repo swampfile.Repository,
|
|
|
|
|
logger util.Logger,
|
|
|
|
|
) *DataSwampService {
|
|
|
|
|
s := DataSwampService{ns_svc, swamp_file_repo, logger, *util.NewEventBus()}
|
|
|
|
|
ns_svc.Wire(s.eventBus.Register, s.eventBus.Handle)
|
|
|
|
|
return &s
|
|
|
|
|
da_bytes int64,
|
|
|
|
|
da_duration time.Duration,
|
|
|
|
|
logger Logger,
|
|
|
|
|
) SwampFileService {
|
|
|
|
|
return SwampFileService{namespace_repo, swamp_file_repo, da_bytes, da_duration, logger}
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
func (s DataSwampService) NamespaceStats() []namespace.Namespace {
|
|
|
|
|
return s.ns_svc.All()
|
|
|
|
|
func (s SwampFileService) getOrCreateNs(namespace_in string) *namespace.Namespace {
|
|
|
|
|
ns, err := s.namespace_repo.GetByName(namespace_in)
|
|
|
|
|
|
|
|
|
|
if err == namespace.ErrNotExists {
|
|
|
|
|
new_ns := namespace.Namespace{
|
|
|
|
|
0,
|
|
|
|
|
namespace_in,
|
|
|
|
|
time.Now(),
|
|
|
|
|
s.default_allowance_duration,
|
|
|
|
|
namespace.FileSizeQuota{s.default_allowance_bytes, 0},
|
|
|
|
|
namespace.FileStat{0, 0},
|
|
|
|
|
namespace.FileStat{0, 0},
|
|
|
|
|
namespace.FileStat{0, 0},
|
|
|
|
|
}
|
|
|
|
|
created_ns, err := s.namespace_repo.Create(new_ns)
|
|
|
|
|
|
|
|
|
|
if err != nil {
|
|
|
|
|
panic(err)
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
return created_ns
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
if err != nil {
|
|
|
|
|
panic(err)
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
return ns
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
func (s DataSwampService) SaveFile(ref swampfile.FileReference, src io.Reader, size int64) error {
|
|
|
|
|
ns := s.ns_svc.GetOrCreateNs(ref.UserAgent)
|
|
|
|
|
func (s SwampFileService) SaveFile(ref swampfile.FileReference, src io.Reader, size int64) error {
|
|
|
|
|
ns := s.getOrCreateNs(ref.UserAgent)
|
|
|
|
|
|
|
|
|
|
r, err := ref.Clean(true)
|
|
|
|
|
|
|
|
|
@ -48,23 +74,13 @@ func (s DataSwampService) SaveFile(ref swampfile.FileReference, src io.Reader, s
|
|
|
|
|
f, err := s.swamp_file_repo.Create(r.Path, strconv.FormatInt(ns.ID, 10))
|
|
|
|
|
|
|
|
|
|
if err != nil {
|
|
|
|
|
// TODO: convert this into a different error.
|
|
|
|
|
return err
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
s.eventBus.Handle(*util.NewEvent("FileUsed", struct {
|
|
|
|
|
Name string
|
|
|
|
|
Size int64
|
|
|
|
|
}{
|
|
|
|
|
ns.Name,
|
|
|
|
|
f.Size(),
|
|
|
|
|
}))
|
|
|
|
|
|
|
|
|
|
// TODO: rewrite this into an interruptable loop that emits downloaded events
|
|
|
|
|
written, err := io.CopyN(f, src, size)
|
|
|
|
|
|
|
|
|
|
if written < size {
|
|
|
|
|
s.swamp_file_repo.Delete(r.Path, strconv.FormatInt(ns.ID, 10)) //
|
|
|
|
|
s.swamp_file_repo.Delete(r.Path, strconv.FormatInt(ns.ID, 10))
|
|
|
|
|
return swampfile.ErrContentSizeExaggerated
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
@ -78,19 +94,20 @@ func (s DataSwampService) SaveFile(ref swampfile.FileReference, src io.Reader, s
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
f.Close()
|
|
|
|
|
s.eventBus.Handle(*util.NewEvent("FileRecieved", struct {
|
|
|
|
|
Name string
|
|
|
|
|
Size int64
|
|
|
|
|
}{
|
|
|
|
|
ns.Name,
|
|
|
|
|
written,
|
|
|
|
|
}))
|
|
|
|
|
uq, err := ns.FileQuota.Add(size)
|
|
|
|
|
if err != nil {
|
|
|
|
|
return err
|
|
|
|
|
}
|
|
|
|
|
ns.FileQuota = *uq
|
|
|
|
|
ns.Usage = ns.Usage.Add(size)
|
|
|
|
|
ns.Upload = ns.Upload.Add(size)
|
|
|
|
|
s.namespace_repo.Update(ns.ID, *ns)
|
|
|
|
|
|
|
|
|
|
return nil
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
func (s DataSwampService) OpenOutFile(ref swampfile.FileReference) (swampfile.SwampOutFile, error) {
|
|
|
|
|
ns := s.ns_svc.GetOrCreateNs(ref.UserAgent)
|
|
|
|
|
func (s SwampFileService) OpenOutFile(ref swampfile.FileReference) (swampfile.SwampOutFile, error) {
|
|
|
|
|
ns := s.getOrCreateNs(ref.UserAgent)
|
|
|
|
|
|
|
|
|
|
r, err := ref.Clean(true)
|
|
|
|
|
|
|
|
|
@ -104,31 +121,41 @@ func (s DataSwampService) OpenOutFile(ref swampfile.FileReference) (swampfile.Sw
|
|
|
|
|
return nil, err
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
s.eventBus.Handle(*util.NewEvent("FileSent", f.Size()))
|
|
|
|
|
ns.Download = ns.Download.Add(f.Size())
|
|
|
|
|
s.namespace_repo.Update(ns.ID, *ns)
|
|
|
|
|
|
|
|
|
|
return f, nil
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
func (s DataSwampService) CleanUpExpiredFiles() error {
|
|
|
|
|
func (s SwampFileService) NamespaceStats() ([]namespace.Namespace, error) {
|
|
|
|
|
return s.namespace_repo.All()
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
func (s SwampFileService) CleanUpExpiredFiles() error {
|
|
|
|
|
s.logger.Info("Cleaning up expired files")
|
|
|
|
|
nss, err := s.namespace_repo.All()
|
|
|
|
|
|
|
|
|
|
if err != nil {
|
|
|
|
|
return err
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
for _, ns := range s.ns_svc.All() {
|
|
|
|
|
for _, ns := range nss {
|
|
|
|
|
expiry := time.Now().Add(-ns.AllowanceDuration)
|
|
|
|
|
dfs, err := s.swamp_file_repo.DeleteOlderThan(strconv.FormatInt(ns.ID, 10), expiry)
|
|
|
|
|
|
|
|
|
|
for _, df := range dfs {
|
|
|
|
|
dq, err := ns.FileQuota.Remove(df.Size)
|
|
|
|
|
if err != nil {
|
|
|
|
|
panic(err)
|
|
|
|
|
dq.CurrentUsage = 0
|
|
|
|
|
}
|
|
|
|
|
ns.FileQuota = *dq
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
for _, df := range dfs {
|
|
|
|
|
s.eventBus.Handle(*util.NewEvent("FileDeleted", struct {
|
|
|
|
|
Name string
|
|
|
|
|
Size int64
|
|
|
|
|
}{
|
|
|
|
|
ns.Name,
|
|
|
|
|
df.Size,
|
|
|
|
|
}))
|
|
|
|
|
if err != nil {
|
|
|
|
|
panic(err)
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
s.namespace_repo.Update(ns.ID, ns)
|
|
|
|
|
}
|
|
|
|
|
return nil
|
|
|
|
|
}
|
|
|
|
|