package main import ( "bytes" "fmt" "io/ioutil" "log" "net/http" "os" "path/filepath" "regexp" "strings" "sync" "time" "github.com/domodwyer/mailyak/v3" "github.com/flosch/pongo2/v4" "github.com/rjeczalik/notify" "gopkg.in/yaml.v3" ) type ActionConfig struct { Id string Type string Description string // http Method *string URL *string Content *string Header map[string]string // move To *string // mdir Path *string // mail // To *string Subject *string Body *string From *string Stop bool OnSuccess []ActionConfig `yaml:"onSuccess"` OnError []ActionConfig `yaml:"onError"` Directory *DirectoryConfig `yaml:"-"` Parent *ActionConfig `yaml:"-"` Error error `yaml:"-"` } type DirectoryConfig struct { Path string Recursive bool Events []string PollingInterval *int64 `yaml:"pollingInterval"` Regex *string _regex *regexp.Regexp _regexMatches []string Actions []ActionConfig } type MailConfig struct { Host *string } type Config struct { Mail MailConfig Directories []DirectoryConfig } var conf = Config{} var wg sync.WaitGroup type pollInfo struct { path string event notify.Event } func (p pollInfo) Path() string { return p.path } func (p pollInfo) Event() notify.Event { return p.event } func (p pollInfo) Sys() interface{} { return nil } type actionCtx map[string]map[string]interface{} func startWatcher(directory *DirectoryConfig) { c := make(chan notify.EventInfo, 1000) monitoredEvents := []notify.Event{} for _, e := range directory.Events { switch e { case "CREATE": monitoredEvents = append(monitoredEvents, notify.Create) case "REMOVE": monitoredEvents = append(monitoredEvents, notify.Remove) case "WRITE": monitoredEvents = append(monitoredEvents, notify.Write) case "RENAME": monitoredEvents = append(monitoredEvents, notify.Rename) case "ACCESS": monitoredEvents = append(monitoredEvents, notify.InAccess) case "MODIFY": monitoredEvents = append(monitoredEvents, notify.InModify) case "ATTRIB": monitoredEvents = append(monitoredEvents, notify.InAttrib) case "CLOSE_WRITE": monitoredEvents = append(monitoredEvents, notify.InCloseWrite) case "CLOSE_NOWRITE": monitoredEvents = append(monitoredEvents, notify.InCloseNowrite) case "OPEN": monitoredEvents = append(monitoredEvents, notify.InOpen) case "MOVED_FROM": monitoredEvents = append(monitoredEvents, notify.InMovedFrom) case "MOVED_TO": monitoredEvents = append(monitoredEvents, notify.InMovedTo) case "DELETE": monitoredEvents = append(monitoredEvents, notify.InDelete) case "DELETE_SELF": monitoredEvents = append(monitoredEvents, notify.InDeleteSelf) case "MOVE_SELF": monitoredEvents = append(monitoredEvents, notify.InMoveSelf) default: log.Fatalf("event %s unknown", e) } } if directory.Regex != nil { directory._regex = regexp.MustCompile(*directory.Regex) } wg.Add(1) go func(directory *DirectoryConfig, monitoredEvents []notify.Event) { defer wg.Done() log.Printf("[ INFO ] path=\"%s\" watching=\"%v\" regex \"%s\"\n", directory.Path, directory.Events, *directory.Regex) if err := notify.Watch(directory.Path, c, monitoredEvents...); err != nil { log.Fatal(err) } defer notify.Stop(c) firstRun := true for { var eventInfo notify.EventInfo if directory.PollingInterval != nil { if !firstRun { select { case eventInfo = <-c: // event case <-time.After(time.Second * time.Duration(*directory.PollingInterval)): // timeout for polling } } firstRun = false // do polling as init } else { eventInfo = <-c } if eventInfo != nil { p := eventInfo.Path() eventStr := eventInfo.Event().String() if eventInfo.Event() == 0 { eventStr = "POLLING" } log.Printf("[ INFO ] path=\"%s\" event=\"%s\"\n", p, eventStr) if directory._regex != nil { directory._regexMatches = directory._regex.FindStringSubmatch(p) } if directory._regex == nil || directory._regexMatches != nil { for _, action := range directory.Actions { action.Directory = directory contin, _ := runAction(&action, eventInfo, nil) if !contin { break } } } else { log.Printf("[ INFO ] path=\"%s\" info=\"no regex match\"\n", p) } } if directory.PollingInterval != nil && len(c) == 0 { // polling only while queue is empty files, err := ioutil.ReadDir(directory.Path) if err != nil { log.Printf("[ ERROR ] path=\"%s\" polling=\"true\" error=\"%s\"", directory.Path, err) } for _, f := range files { p := directory.Path + "/" + f.Name() if directory._regex == nil || directory._regex.MatchString(p) { if time.Now().Add(-1 * time.Minute).After(f.ModTime()) { pollI := pollInfo{ path: p, event: 0, } select { case c <- pollI: default: } } else { log.Printf("[ INFO ] path=\"%s\" polling=\"true\" info=\"ignoring, too young\"\n", p) } } else { log.Printf("[ INFO ] path=\"%s\" polling=\"true\" info=\"no regex match\"\n", p) } } } } }(directory, monitoredEvents) } func runAction(action *ActionConfig, eventInfo notify.EventInfo, ctx actionCtx) (contin bool, err error) { if ctx == nil { ctx = make(actionCtx) } add2Ctx(action, ctx, "action", action) contin = !action.Stop log.Printf("[ INFO ] path=\"%s\" action=\"%s\" stop=\"%v\"\n", eventInfo.Path(), action.Id, !contin) switch action.Type { case "http": err = actionHttp(action, eventInfo, ctx) case "move": err = actionMove(action, eventInfo, ctx) case "mkdir": err = actionMkDir(action, eventInfo, ctx) case "delete": err = actionDelete(action, eventInfo, ctx) case "log": err = actionLog(action, eventInfo, ctx) case "mail": err = actionMail(action, eventInfo, ctx) default: err = fmt.Errorf("action type %s unknown", action.Type) } if err != nil { add2Ctx(action, ctx, "error", err.Error()) action.Error = err log.Printf("[ ERROR ] path=\"%s\" action=\"%s\" error=\"%s\"", eventInfo.Path(), action.Id, err) for _, aE := range action.OnError { aE.Directory = action.Directory aE.Parent = action _c, _ := runAction(&aE, eventInfo, ctx) contin = contin && _c if !contin { break } } } else { for _, aS := range action.OnSuccess { aS.Directory = action.Directory aS.Parent = action _c, errS := runAction(&aS, eventInfo, ctx) contin = contin && _c if errS != nil { err = errS } if !contin { break } } } return contin, err } func add2Ctx(action *ActionConfig, ctx actionCtx, key string, val interface{}) { if action.Id != "" { if ctx[action.Id] == nil { ctx[action.Id] = make(map[string]interface{}) } ctx[action.Id][key] = val } } func actionHttp(action *ActionConfig, eventInfo notify.EventInfo, ctx actionCtx) error { method := "POST" if action.Method != nil { m, err := tpl2String(*action.Method, action, eventInfo, ctx) if err != nil { return err } method = strings.ToUpper(m) } if action.URL == nil { return fmt.Errorf("missing url in http action") } url, err := tpl2String(*action.URL, action, eventInfo, ctx) if err != nil { return err } postData := bytes.NewBuffer([]byte{}) if method != "GET" { if action.Content == nil { // read file postBytes, err := ioutil.ReadFile(eventInfo.Path()) if err != nil { return nil } postData = bytes.NewBuffer(postBytes) } else { p, err := tpl2String(*action.Content, action, eventInfo, ctx) if err != nil { return err } log.Println("BODY: ", p) postData = bytes.NewBufferString(p) } } // log.Printf("%s %s", method, url) req, err := http.NewRequest(method, url, postData) if err != nil { return err } for key, val := range action.Header { req.Header.Set(key, val) } client := &http.Client{} client.Timeout = time.Second * 20 resp, err := client.Do(req) if err != nil { return err } body, _ := ioutil.ReadAll(resp.Body) // fmt.Println(string(body)) defer resp.Body.Close() add2Ctx(action, ctx, "response", map[string]interface{}{ "Status": resp.Status, "StatusCode": resp.StatusCode, "Body": string(body), }) if resp.StatusCode >= 400 { return fmt.Errorf("response status %s", resp.Status) } return nil } func actionMove(action *ActionConfig, eventInfo notify.EventInfo, ctx actionCtx) error { if action.To == nil { return fmt.Errorf("missing to: as destination for move action") } to, err := tpl2String(*action.To, action, eventInfo, ctx) if err != nil { return err } err = os.Rename(eventInfo.Path(), to) return err } func actionDelete(action *ActionConfig, eventInfo notify.EventInfo, ctx actionCtx) error { return os.Remove(eventInfo.Path()) } func actionMkDir(action *ActionConfig, eventInfo notify.EventInfo, ctx actionCtx) error { if action.Path == nil { return fmt.Errorf(("missing path: for mkdir action")) } path, err := tpl2String(*action.Path, action, eventInfo, ctx) if err != nil { return err } return os.MkdirAll(path, 0755) } func actionLog(action *ActionConfig, eventInfo notify.EventInfo, ctx actionCtx) error { var logstr string if action.Content != nil { var err error logstr, err = tpl2String(*action.Content, action, eventInfo, ctx) if err != nil { return err } } else { logstr = eventInfo.Path() } log.Printf("LOG: %s", logstr) return nil } func actionMail(action *ActionConfig, eventInfo notify.EventInfo, ctx actionCtx) error { if conf.Mail.Host == nil { return fmt.Errorf("missing host in mail: config") } if action.To == nil { return fmt.Errorf("missing to: as recipient mail address") } if action.Subject == nil { return fmt.Errorf("missing subject: action") } if action.Body == nil { return fmt.Errorf("missing body: action") } if action.From == nil { return fmt.Errorf("missing from: in action") } mail := mailyak.New(*conf.Mail.Host, nil) to, err := tpl2String(*action.To, action, eventInfo, ctx) if err != nil { return err } mail.To(to) from, err := tpl2String(*action.From, action, eventInfo, ctx) if err != nil { return err } mail.From(from) mail.FromName("fstrigger") subject, err := tpl2String(*action.Subject, action, eventInfo, ctx) if err != nil { return err } mail.Subject(subject) body, err := tpl2String(*action.Body, action, eventInfo, ctx) if err != nil { return err } mail.Plain().Set(body) return mail.Send() } func tpl2String(tpl string, action *ActionConfig, eventInfo notify.EventInfo, ctx actionCtx) (string, error) { _tpl, err := pongo2.FromString(tpl) if err != nil { return "", err } p := eventInfo.Path() return _tpl.Execute(pongo2.Context{ "path": p, "dirname": filepath.Dir(p), "filename": filepath.Base(p), "event": eventInfo.Event().String(), "action": action, "directory": action.Directory, "regexMatches": action.Directory._regexMatches, "context": ctx, }) } func main() { confDat, err := ioutil.ReadFile("./config.yml") if err != nil { log.Fatalf("error %v", err) } err = yaml.Unmarshal(confDat, &conf) if err != nil { log.Fatalf("error %v", err) } for _, directory := range conf.Directories { directory := directory startWatcher(&directory) } wg.Wait() log.Println("fstrigger exit") }