Compare commits
No commits in common. 'master' and 'v0.3.0' have entirely different histories.
@ -1,7 +0,0 @@
|
|||||||
pipeline:
|
|
||||||
build:
|
|
||||||
image: golang
|
|
||||||
commands:
|
|
||||||
- go test ./...
|
|
||||||
environment:
|
|
||||||
- GOPRIVATE=git.sg.caj.me/caj
|
|
@ -1,7 +0,0 @@
|
|||||||
package dataswamp
|
|
||||||
|
|
||||||
import (
|
|
||||||
// "caj-larsson/bog/dataswamp/namespace"
|
|
||||||
)
|
|
||||||
|
|
||||||
type AdminService struct{}
|
|
@ -1 +0,0 @@
|
|||||||
package dataswamp
|
|
@ -0,0 +1,7 @@
|
|||||||
|
package dataswamp
|
||||||
|
|
||||||
|
type Logger interface {
|
||||||
|
Debug(format string, a ...interface{})
|
||||||
|
Info(format string, a ...interface{})
|
||||||
|
Warn(format string, a ...interface{})
|
||||||
|
}
|
@ -1,111 +0,0 @@
|
|||||||
package namespace
|
|
||||||
|
|
||||||
import (
|
|
||||||
"caj-larsson/bog/util"
|
|
||||||
"fmt"
|
|
||||||
"time"
|
|
||||||
)
|
|
||||||
|
|
||||||
type Clock interface {
|
|
||||||
Now() time.Time
|
|
||||||
}
|
|
||||||
|
|
||||||
type NamespaceService struct {
|
|
||||||
repo Repository
|
|
||||||
outboxes []func(util.Event)
|
|
||||||
logger util.Logger
|
|
||||||
clock Clock
|
|
||||||
default_ttl time.Duration
|
|
||||||
default_quota_bytes int64
|
|
||||||
}
|
|
||||||
|
|
||||||
func NewNamespaceService(repo Repository, logger util.Logger, clock Clock, default_ttl time.Duration, default_quota_bytes int64) *NamespaceService {
|
|
||||||
return &NamespaceService{
|
|
||||||
repo,
|
|
||||||
nil,
|
|
||||||
logger,
|
|
||||||
clock,
|
|
||||||
default_ttl,
|
|
||||||
default_quota_bytes,
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
func (s *NamespaceService) GetOrCreateNs(name string) *Namespace {
|
|
||||||
ns, err := s.repo.GetByName(name)
|
|
||||||
|
|
||||||
if err == ErrNotExists {
|
|
||||||
new_ns := Namespace{
|
|
||||||
0,
|
|
||||||
name,
|
|
||||||
s.clock.Now(),
|
|
||||||
s.default_ttl,
|
|
||||||
FileSizeQuota{s.default_quota_bytes, 0},
|
|
||||||
FileStat{0, 0},
|
|
||||||
FileStat{0, 0},
|
|
||||||
}
|
|
||||||
created_ns, err := s.repo.Create(new_ns)
|
|
||||||
|
|
||||||
if err != nil {
|
|
||||||
panic(err)
|
|
||||||
}
|
|
||||||
|
|
||||||
return created_ns
|
|
||||||
}
|
|
||||||
|
|
||||||
if err != nil {
|
|
||||||
panic(err)
|
|
||||||
}
|
|
||||||
return ns
|
|
||||||
}
|
|
||||||
|
|
||||||
func (s *NamespaceService) Wire(reg func(string, util.EventHandler), outbox func(ev util.Event)) {
|
|
||||||
reg("FileUsed", s.handleFileUsed)
|
|
||||||
s.outboxes = append(s.outboxes, outbox)
|
|
||||||
|
|
||||||
reg("FileUsed", s.handleFileUsed)
|
|
||||||
reg("FileDeleted", s.handleFileDeleted)
|
|
||||||
reg("FileRecieved", s.handleFileRecieved)
|
|
||||||
}
|
|
||||||
|
|
||||||
func (s *NamespaceService) All() []Namespace {
|
|
||||||
nss, err := s.repo.All()
|
|
||||||
if err != nil {
|
|
||||||
panic(err)
|
|
||||||
}
|
|
||||||
return nss
|
|
||||||
}
|
|
||||||
|
|
||||||
func (s *NamespaceService) handleFileUsed(payload interface{}) {
|
|
||||||
var payload_s = payload.(struct {
|
|
||||||
Name string
|
|
||||||
Size int64
|
|
||||||
})
|
|
||||||
fmt.Printf("file used %v\n", payload_s)
|
|
||||||
ns := s.GetOrCreateNs(payload_s.Name)
|
|
||||||
ns.FileQuota = ns.FileQuota.Add(payload_s.Size)
|
|
||||||
s.repo.Update(ns.ID, *ns)
|
|
||||||
}
|
|
||||||
|
|
||||||
func (s *NamespaceService) handleFileDeleted(payload interface{}) {
|
|
||||||
var payload_s = payload.(struct {
|
|
||||||
Name string
|
|
||||||
Size int64
|
|
||||||
})
|
|
||||||
fmt.Printf("file deleted %v\n", payload_s)
|
|
||||||
ns := s.GetOrCreateNs(payload_s.Name)
|
|
||||||
ns.FileQuota = ns.FileQuota.Add(-payload_s.Size)
|
|
||||||
fmt.Printf("file usage %v\n", ns.FileQuota)
|
|
||||||
s.repo.Update(ns.ID, *ns)
|
|
||||||
}
|
|
||||||
|
|
||||||
func (s *NamespaceService) handleFileRecieved(payload interface{}) {
|
|
||||||
var payload_s = payload.(struct {
|
|
||||||
Name string
|
|
||||||
Size int64
|
|
||||||
})
|
|
||||||
fmt.Printf("file recieved %v\n", payload_s)
|
|
||||||
ns := s.GetOrCreateNs(payload_s.Name)
|
|
||||||
ns.FileQuota = ns.FileQuota.Add(payload_s.Size)
|
|
||||||
fmt.Printf("file usage %v\n", ns.FileQuota)
|
|
||||||
s.repo.Update(ns.ID, *ns)
|
|
||||||
}
|
|
@ -1,38 +0,0 @@
|
|||||||
package namespace
|
|
||||||
|
|
||||||
import (
|
|
||||||
"caj-larsson/bog/util"
|
|
||||||
"testing"
|
|
||||||
)
|
|
||||||
|
|
||||||
func TestEventTest(t *testing.T) {
|
|
||||||
eb := util.NewEventBus()
|
|
||||||
svc := NamespaceService{}
|
|
||||||
|
|
||||||
svc.Wire(eb.Register, eb.Handle)
|
|
||||||
|
|
||||||
events := []util.Event{
|
|
||||||
*util.NewEvent("FileUsed", struct {
|
|
||||||
Name string
|
|
||||||
Size int64
|
|
||||||
}{
|
|
||||||
"asd",
|
|
||||||
int64(12),
|
|
||||||
}),
|
|
||||||
*util.NewEvent("FileDeleted", struct {
|
|
||||||
Name string
|
|
||||||
Size int64
|
|
||||||
}{
|
|
||||||
"asd",
|
|
||||||
int64(12),
|
|
||||||
}),
|
|
||||||
*util.NewEvent("FileRecieved", struct {
|
|
||||||
Name string
|
|
||||||
Size int64
|
|
||||||
}{
|
|
||||||
"asd",
|
|
||||||
int64(12),
|
|
||||||
}),
|
|
||||||
}
|
|
||||||
util.AcceptsMessage(t, eb, events)
|
|
||||||
}
|
|
@ -0,0 +1,153 @@
|
|||||||
|
package dataswamp
|
||||||
|
|
||||||
|
import (
|
||||||
|
"caj-larsson/bog/dataswamp/namespace"
|
||||||
|
"caj-larsson/bog/dataswamp/swampfile"
|
||||||
|
"io"
|
||||||
|
"strconv"
|
||||||
|
"time"
|
||||||
|
// "errors"
|
||||||
|
// "fmt"
|
||||||
|
)
|
||||||
|
|
||||||
|
type SwampFileService struct {
|
||||||
|
namespace_repo namespace.Repository
|
||||||
|
swamp_file_repo swampfile.Repository
|
||||||
|
default_allowance_bytes int64
|
||||||
|
default_allowance_duration time.Duration
|
||||||
|
logger Logger
|
||||||
|
}
|
||||||
|
|
||||||
|
func NewSwampFileService(
|
||||||
|
namespace_repo namespace.Repository,
|
||||||
|
swamp_file_repo swampfile.Repository,
|
||||||
|
da_bytes int64,
|
||||||
|
da_duration time.Duration,
|
||||||
|
logger Logger,
|
||||||
|
) SwampFileService {
|
||||||
|
return SwampFileService{namespace_repo, swamp_file_repo, da_bytes, da_duration, logger}
|
||||||
|
}
|
||||||
|
|
||||||
|
func (s SwampFileService) getOrCreateNs(namespace_in string) *namespace.Namespace {
|
||||||
|
ns, err := s.namespace_repo.GetByName(namespace_in)
|
||||||
|
|
||||||
|
if err == namespace.ErrNotExists {
|
||||||
|
new_ns := namespace.Namespace{
|
||||||
|
0,
|
||||||
|
namespace_in,
|
||||||
|
time.Now(),
|
||||||
|
s.default_allowance_duration,
|
||||||
|
namespace.FileSizeQuota{s.default_allowance_bytes, 0},
|
||||||
|
namespace.FileStat{0, 0},
|
||||||
|
namespace.FileStat{0, 0},
|
||||||
|
namespace.FileStat{0, 0},
|
||||||
|
}
|
||||||
|
created_ns, err := s.namespace_repo.Create(new_ns)
|
||||||
|
|
||||||
|
if err != nil {
|
||||||
|
panic(err)
|
||||||
|
}
|
||||||
|
|
||||||
|
return created_ns
|
||||||
|
}
|
||||||
|
|
||||||
|
if err != nil {
|
||||||
|
panic(err)
|
||||||
|
}
|
||||||
|
|
||||||
|
return ns
|
||||||
|
}
|
||||||
|
|
||||||
|
func (s SwampFileService) SaveFile(ref swampfile.FileReference, src io.Reader, size int64) error {
|
||||||
|
ns := s.getOrCreateNs(ref.UserAgent)
|
||||||
|
|
||||||
|
err := ref.Clean(true)
|
||||||
|
|
||||||
|
if err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
|
||||||
|
if !ns.FileQuota.Allows(size) {
|
||||||
|
return namespace.ErrExceedQuota
|
||||||
|
}
|
||||||
|
|
||||||
|
f, err := s.swamp_file_repo.Create(ref.Path, strconv.FormatInt(ns.ID, 10))
|
||||||
|
|
||||||
|
if err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
|
||||||
|
written, err := io.CopyN(f, src, size)
|
||||||
|
|
||||||
|
if written < size {
|
||||||
|
s.swamp_file_repo.Delete(ref.Path, strconv.FormatInt(ns.ID, 10))
|
||||||
|
return swampfile.ErrContentSizeExaggerated
|
||||||
|
}
|
||||||
|
|
||||||
|
var buf = make([]byte, 1)
|
||||||
|
|
||||||
|
overread, err := src.Read(buf)
|
||||||
|
|
||||||
|
if overread > 0 || err != io.EOF {
|
||||||
|
s.swamp_file_repo.Delete(ref.Path, strconv.FormatInt(ns.ID, 10))
|
||||||
|
return swampfile.ErrContentSizeExceeded
|
||||||
|
}
|
||||||
|
|
||||||
|
f.Close()
|
||||||
|
ns.FileQuota.Add(size)
|
||||||
|
ns.Usage = ns.Usage.Add(size)
|
||||||
|
ns.Upload = ns.Upload.Add(size)
|
||||||
|
s.namespace_repo.Update(ns.ID, *ns)
|
||||||
|
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
|
||||||
|
func (s SwampFileService) OpenOutFile(ref swampfile.FileReference) (swampfile.SwampOutFile, error) {
|
||||||
|
ns := s.getOrCreateNs(ref.UserAgent)
|
||||||
|
|
||||||
|
err := ref.Clean(true)
|
||||||
|
|
||||||
|
if err != nil {
|
||||||
|
return nil, err
|
||||||
|
}
|
||||||
|
|
||||||
|
f, err := s.swamp_file_repo.Open(ref.Path, strconv.FormatInt(ns.ID, 10))
|
||||||
|
|
||||||
|
if err != nil {
|
||||||
|
return nil, err
|
||||||
|
}
|
||||||
|
|
||||||
|
ns.Download = ns.Download.Add(f.Size())
|
||||||
|
s.namespace_repo.Update(ns.ID, *ns)
|
||||||
|
|
||||||
|
return f, nil
|
||||||
|
}
|
||||||
|
|
||||||
|
func (s SwampFileService) NamespaceStats() ([]namespace.Namespace, error) {
|
||||||
|
return s.namespace_repo.All()
|
||||||
|
}
|
||||||
|
|
||||||
|
func (s SwampFileService) CleanUpExpiredFiles() error {
|
||||||
|
s.logger.Info("Cleaning up expired files")
|
||||||
|
nss, err := s.namespace_repo.All()
|
||||||
|
|
||||||
|
if err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
|
||||||
|
for _, ns := range nss {
|
||||||
|
expiry := time.Now().Add(-ns.AllowanceDuration)
|
||||||
|
dfs, err := s.swamp_file_repo.DeleteOlderThan(strconv.FormatInt(ns.ID, 10), expiry)
|
||||||
|
|
||||||
|
for _, df := range dfs {
|
||||||
|
ns.FileQuota.Remove(df.Size)
|
||||||
|
}
|
||||||
|
|
||||||
|
if err != nil {
|
||||||
|
panic(err)
|
||||||
|
}
|
||||||
|
|
||||||
|
s.namespace_repo.Update(ns.ID, ns)
|
||||||
|
}
|
||||||
|
return nil
|
||||||
|
}
|
@ -1,138 +0,0 @@
|
|||||||
package dataswamp
|
|
||||||
|
|
||||||
import (
|
|
||||||
"caj-larsson/bog/dataswamp/namespace"
|
|
||||||
"caj-larsson/bog/dataswamp/swampfile"
|
|
||||||
"caj-larsson/bog/util"
|
|
||||||
"io"
|
|
||||||
"strconv"
|
|
||||||
"time"
|
|
||||||
// "errors"
|
|
||||||
// "fmt"
|
|
||||||
)
|
|
||||||
|
|
||||||
type DataSwampService struct {
|
|
||||||
ns_svc namespace.NamespaceService
|
|
||||||
swamp_file_repo swampfile.Repository
|
|
||||||
logger util.Logger
|
|
||||||
eventBus util.EventBus
|
|
||||||
}
|
|
||||||
|
|
||||||
func NewDataSwampService(
|
|
||||||
ns_svc namespace.NamespaceService,
|
|
||||||
swamp_file_repo swampfile.Repository,
|
|
||||||
logger util.Logger,
|
|
||||||
) *DataSwampService {
|
|
||||||
s := DataSwampService{ns_svc, swamp_file_repo, logger, *util.NewEventBus()}
|
|
||||||
ns_svc.Wire(s.eventBus.Register, s.eventBus.Handle)
|
|
||||||
return &s
|
|
||||||
}
|
|
||||||
|
|
||||||
func (s DataSwampService) NamespaceStats() []namespace.Namespace {
|
|
||||||
return s.ns_svc.All()
|
|
||||||
}
|
|
||||||
|
|
||||||
func (s DataSwampService) SaveFile(ref swampfile.FileReference, src io.Reader, size int64) error {
|
|
||||||
ns := s.ns_svc.GetOrCreateNs(ref.UserAgent)
|
|
||||||
|
|
||||||
r, err := ref.Clean(true)
|
|
||||||
|
|
||||||
if err != nil {
|
|
||||||
return err
|
|
||||||
}
|
|
||||||
|
|
||||||
if !ns.FileQuota.Allows(size) {
|
|
||||||
return namespace.ErrExceedQuota
|
|
||||||
}
|
|
||||||
|
|
||||||
f, err := s.swamp_file_repo.Create(r.Path, strconv.FormatInt(ns.ID, 10))
|
|
||||||
|
|
||||||
if err != nil {
|
|
||||||
// TODO: convert this into a different error.
|
|
||||||
return err
|
|
||||||
}
|
|
||||||
|
|
||||||
s.eventBus.Handle(*util.NewEvent("FileUsed", struct {
|
|
||||||
Name string
|
|
||||||
Size int64
|
|
||||||
}{
|
|
||||||
ns.Name,
|
|
||||||
f.Size(),
|
|
||||||
}))
|
|
||||||
|
|
||||||
// TODO: rewrite this into an interruptable loop that emits downloaded events
|
|
||||||
written, err := io.CopyN(f, src, size)
|
|
||||||
|
|
||||||
if written < size {
|
|
||||||
s.swamp_file_repo.Delete(r.Path, strconv.FormatInt(ns.ID, 10)) //
|
|
||||||
return swampfile.ErrContentSizeExaggerated
|
|
||||||
}
|
|
||||||
|
|
||||||
var buf = make([]byte, 1)
|
|
||||||
|
|
||||||
overread, err := src.Read(buf)
|
|
||||||
|
|
||||||
if overread > 0 || err != io.EOF {
|
|
||||||
s.swamp_file_repo.Delete(r.Path, strconv.FormatInt(ns.ID, 10))
|
|
||||||
return swampfile.ErrContentSizeExceeded
|
|
||||||
}
|
|
||||||
|
|
||||||
err = f.Close()
|
|
||||||
if err != nil {
|
|
||||||
panic(err)
|
|
||||||
}
|
|
||||||
|
|
||||||
s.eventBus.Handle(*util.NewEvent("FileRecieved", struct {
|
|
||||||
Name string
|
|
||||||
Size int64
|
|
||||||
}{
|
|
||||||
ns.Name,
|
|
||||||
written,
|
|
||||||
}))
|
|
||||||
|
|
||||||
return nil
|
|
||||||
}
|
|
||||||
|
|
||||||
func (s DataSwampService) OpenOutFile(ref swampfile.FileReference) (swampfile.SwampOutFile, error) {
|
|
||||||
ns := s.ns_svc.GetOrCreateNs(ref.UserAgent)
|
|
||||||
|
|
||||||
r, err := ref.Clean(true)
|
|
||||||
|
|
||||||
if err != nil {
|
|
||||||
return nil, err
|
|
||||||
}
|
|
||||||
|
|
||||||
f, err := s.swamp_file_repo.Open(r.Path, strconv.FormatInt(ns.ID, 10))
|
|
||||||
|
|
||||||
if err != nil {
|
|
||||||
return nil, err
|
|
||||||
}
|
|
||||||
|
|
||||||
s.eventBus.Handle(*util.NewEvent("FileSent", f.Size()))
|
|
||||||
|
|
||||||
return f, nil
|
|
||||||
}
|
|
||||||
|
|
||||||
func (s DataSwampService) CleanUpExpiredFiles() error {
|
|
||||||
s.logger.Info("Cleaning up expired files")
|
|
||||||
|
|
||||||
for _, ns := range s.ns_svc.All() {
|
|
||||||
expiry := time.Now().Add(-ns.AllowanceDuration)
|
|
||||||
dfs, err := s.swamp_file_repo.DeleteOlderThan(strconv.FormatInt(ns.ID, 10), expiry)
|
|
||||||
|
|
||||||
if err != nil {
|
|
||||||
panic(err)
|
|
||||||
}
|
|
||||||
|
|
||||||
for _, df := range dfs {
|
|
||||||
s.eventBus.Handle(*util.NewEvent("FileDeleted", struct {
|
|
||||||
Name string
|
|
||||||
Size int64
|
|
||||||
}{
|
|
||||||
ns.Name,
|
|
||||||
df.Size,
|
|
||||||
}))
|
|
||||||
}
|
|
||||||
}
|
|
||||||
return nil
|
|
||||||
}
|
|
Binary file not shown.
Before Width: | Height: | Size: 10 KiB |
@ -1,11 +0,0 @@
|
|||||||
package system_time
|
|
||||||
|
|
||||||
import (
|
|
||||||
"time"
|
|
||||||
)
|
|
||||||
|
|
||||||
type Clock struct{}
|
|
||||||
|
|
||||||
func (c Clock) Now() time.Time {
|
|
||||||
return time.Now()
|
|
||||||
}
|
|
@ -1,41 +0,0 @@
|
|||||||
package util
|
|
||||||
|
|
||||||
type Event struct {
|
|
||||||
eventName string
|
|
||||||
payload interface{}
|
|
||||||
}
|
|
||||||
|
|
||||||
func NewEvent(eventName string, payload interface{}) *Event {
|
|
||||||
return &Event{
|
|
||||||
eventName,
|
|
||||||
payload,
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
func (e *Event) EventName() string {
|
|
||||||
return e.eventName
|
|
||||||
}
|
|
||||||
|
|
||||||
func (e *Event) Payload() interface{} {
|
|
||||||
return e.payload
|
|
||||||
}
|
|
||||||
|
|
||||||
type EventHandler func(payload interface{})
|
|
||||||
|
|
||||||
type EventBus struct {
|
|
||||||
handlers map[string][]EventHandler
|
|
||||||
}
|
|
||||||
|
|
||||||
func NewEventBus() *EventBus {
|
|
||||||
return &EventBus{make(map[string][]EventHandler)}
|
|
||||||
}
|
|
||||||
|
|
||||||
func (eb *EventBus) Register(eventName string, handler EventHandler) {
|
|
||||||
eb.handlers[eventName] = append(eb.handlers[eventName], handler)
|
|
||||||
}
|
|
||||||
|
|
||||||
func (eb *EventBus) Handle(e Event) {
|
|
||||||
for _, handler := range eb.handlers[e.EventName()] {
|
|
||||||
handler(e.Payload())
|
|
||||||
}
|
|
||||||
}
|
|
@ -1,23 +0,0 @@
|
|||||||
package util
|
|
||||||
|
|
||||||
import (
|
|
||||||
"github.com/matryer/is"
|
|
||||||
"testing"
|
|
||||||
)
|
|
||||||
|
|
||||||
func (eb *EventBus) Handled(e Event) bool {
|
|
||||||
// TODO: figure out how to verify the event signature here.
|
|
||||||
handlers, exists := eb.handlers[e.EventName()]
|
|
||||||
if !exists {
|
|
||||||
return false
|
|
||||||
}
|
|
||||||
|
|
||||||
return len(handlers) > 0
|
|
||||||
}
|
|
||||||
|
|
||||||
func AcceptsMessage(t *testing.T, eb *EventBus, es []Event) {
|
|
||||||
is := is.New(t)
|
|
||||||
for _, e := range es {
|
|
||||||
is.True(eb.Handled(e))
|
|
||||||
}
|
|
||||||
}
|
|
@ -1,13 +0,0 @@
|
|||||||
package util
|
|
||||||
|
|
||||||
type Logger interface {
|
|
||||||
Debug(format string, a ...interface{})
|
|
||||||
Info(format string, a ...interface{})
|
|
||||||
Warn(format string, a ...interface{})
|
|
||||||
}
|
|
||||||
|
|
||||||
type TestLogger struct{}
|
|
||||||
|
|
||||||
func (t TestLogger) Debug(format string, a ...interface{}) {}
|
|
||||||
func (t TestLogger) Info(format string, a ...interface{}) {}
|
|
||||||
func (t TestLogger) Warn(format string, a ...interface{}) {}
|
|
Loading…
Reference in New Issue