package main import ( "bytes" "fmt" "io/ioutil" "log" "net/http" "os" "path/filepath" "regexp" "strings" "sync" "time" "github.com/flosch/pongo2/v4" "github.com/rjeczalik/notify" "gopkg.in/yaml.v3" ) type ActionConfig struct { Type string // http Method *string URL *string Content *string Header map[string]string // move To *string Stop bool OnSuccess []ActionConfig `yaml:"onSuccess"` OnError []ActionConfig `yaml:"onError"` } type DirectoryConfig struct { Path string Recursive bool Events []string PollingInterval *int64 `yaml:"pollingInterval"` Regex *string _regex *regexp.Regexp Actions []ActionConfig } type Config struct { Directories []DirectoryConfig } var wg sync.WaitGroup type pollInfo struct { path string event notify.Event } func (p pollInfo) Path() string { return p.path } func (p pollInfo) Event() notify.Event { return p.event } func (p pollInfo) Sys() interface{} { return nil } func startWatcher(directory *DirectoryConfig) { c := make(chan notify.EventInfo, 1000) monitoredEvents := []notify.Event{} for _, e := range directory.Events { switch e { case "CREATE": monitoredEvents = append(monitoredEvents, notify.Create) case "REMOVE": monitoredEvents = append(monitoredEvents, notify.Remove) case "WRITE": monitoredEvents = append(monitoredEvents, notify.Write) case "RENAME": monitoredEvents = append(monitoredEvents, notify.Rename) case "ACCESS": monitoredEvents = append(monitoredEvents, notify.InAccess) case "MODIFY": monitoredEvents = append(monitoredEvents, notify.InModify) case "ATTRIB": monitoredEvents = append(monitoredEvents, notify.InAttrib) case "CLOSE_WRITE": monitoredEvents = append(monitoredEvents, notify.InCloseWrite) case "CLOSE_NOWRITE": monitoredEvents = append(monitoredEvents, notify.InCloseNowrite) case "OPEN": monitoredEvents = append(monitoredEvents, notify.InOpen) case "MOVED_FROM": monitoredEvents = append(monitoredEvents, notify.InMovedFrom) case "MOVED_TO": monitoredEvents = append(monitoredEvents, notify.InMovedTo) case "DELETE": monitoredEvents = append(monitoredEvents, notify.InDelete) case "DELETE_SELF": monitoredEvents = append(monitoredEvents, notify.InDeleteSelf) case "MOVE_SELF": monitoredEvents = append(monitoredEvents, notify.InMoveSelf) default: log.Fatalf("event %s unknown", e) } } if directory.Regex != nil { directory._regex = regexp.MustCompile(*directory.Regex) } wg.Add(1) go func(directory *DirectoryConfig, monitoredEvents []notify.Event) { defer wg.Done() log.Printf("watching %s for %v, regex %s\n", directory.Path, directory.Events, *directory.Regex) if err := notify.Watch(directory.Path, c, monitoredEvents...); err != nil { log.Fatal(err) } defer notify.Stop(c) firstRun := true for { var eventInfo notify.EventInfo if directory.PollingInterval != nil { if !firstRun { select { case eventInfo = <-c: // event case <-time.After(time.Second * time.Duration(*directory.PollingInterval)): // timeout for polling } } firstRun = false // do polling as init } else { eventInfo = <-c } if eventInfo != nil { p := eventInfo.Path() log.Printf("path: %s; event: %s\n", p, eventInfo.Event().String()) if directory._regex == nil || directory._regex.MatchString(p) { for _, action := range directory.Actions { contin, _ := runAction(&action, eventInfo) if !contin { break } } } else { log.Printf("path: %s; no regex match\n", p) } } if directory.PollingInterval != nil && len(c) == 0 { // polling only while queue is empty files, err := ioutil.ReadDir(directory.Path) if err != nil { log.Printf("path: %s; polling error: %s", directory.Path, err) } for _, f := range files { p := directory.Path + "/" + f.Name() if directory._regex == nil || directory._regex.MatchString(p) { pollI := pollInfo{ path: p, event: 0, } select { case c <- pollI: default: } } else { log.Printf("path: %s; no regex match (polling)\n", p) } } } } }(directory, monitoredEvents) } func runAction(action *ActionConfig, eventInfo notify.EventInfo) (contin bool, err error) { contin = !action.Stop log.Printf("path: %s; action: %s; stop: %v\n", eventInfo.Path(), action.Type, !contin) switch action.Type { case "http": err = actionHttp(action, eventInfo) case "move": err = actionMove(action, eventInfo) case "delete": err = actionDelete(action, eventInfo) case "log": err = actionLog(action, eventInfo) default: err = fmt.Errorf("action type %s unknown", action.Type) } if err != nil { log.Printf("path: %s; action: %s; error %s", eventInfo.Path(), action.Type, err) for _, aE := range action.OnError { _c, errE := runAction(&aE, eventInfo) contin = contin && _c if errE != nil { log.Printf("path: %s; action: %s; onError %s, error %s", eventInfo.Path(), action.Type, aE.Type, errE) } if !contin { break } } } else { for _, aS := range action.OnSuccess { _c, errS := runAction(&aS, eventInfo) contin = contin && _c if errS != nil { log.Printf("path: %s; action: %s; onSuccess %s, error %s", eventInfo.Path(), action.Type, aS.Type, errS) err = errS } if !contin { break } } } return contin, err } func actionHttp(action *ActionConfig, eventInfo notify.EventInfo) error { method := "POST" if action.Method != nil { m, err := tpl2String(*action.Method, action, eventInfo) if err != nil { return err } method = strings.ToUpper(m) } if action.URL == nil { return fmt.Errorf("missing url in http action") } url, err := tpl2String(*action.URL, action, eventInfo) if err != nil { return err } postData := bytes.NewBuffer([]byte{}) if method != "GET" { if action.Content == nil { // read file postBytes, err := ioutil.ReadFile(eventInfo.Path()) if err != nil { return nil } postData = bytes.NewBuffer(postBytes) } else { p, err := tpl2String(*action.Content, action, eventInfo) if err != nil { return err } log.Println("BODY: ", p) postData = bytes.NewBufferString(p) } } log.Printf("%s %s", method, url) req, err := http.NewRequest(method, url, postData) if err != nil { return err } for key, val := range action.Header { req.Header.Set(key, val) } client := &http.Client{} client.Timeout = time.Second * 20 resp, err := client.Do(req) if err != nil { return err } body, _ := ioutil.ReadAll(resp.Body) fmt.Println(string(body)) defer resp.Body.Close() if resp.StatusCode >= 400 { return fmt.Errorf("response status %s", resp.Status) } return nil } func actionMove(action *ActionConfig, eventInfo notify.EventInfo) error { if action.To == nil { return fmt.Errorf("missing to: as destination for move action") } to, err := tpl2String(*action.To, action, eventInfo) if err != nil { return err } err = os.Rename(eventInfo.Path(), to) return err } func actionDelete(action *ActionConfig, eventInfo notify.EventInfo) error { return os.Remove(eventInfo.Path()) } func actionLog(action *ActionConfig, eventInfo notify.EventInfo) error { var logstr string if action.Content != nil { logstr = *action.Content } else { logstr = eventInfo.Path() } log.Printf("LOG: %s", logstr) return nil } func tpl2String(tpl string, action *ActionConfig, eventInfo notify.EventInfo) (string, error) { _tpl, err := pongo2.FromString(tpl) if err != nil { return "", err } p := eventInfo.Path() return _tpl.Execute(pongo2.Context{ "path": p, "dirname": filepath.Dir(p), "filename": filepath.Base(p), }) } func main() { conf := Config{} confDat, err := ioutil.ReadFile("./config.yml") if err != nil { log.Fatalf("error %v", err) } err = yaml.Unmarshal(confDat, &conf) if err != nil { log.Fatalf("error %v", err) } for _, directory := range conf.Directories { directory := directory startWatcher(&directory) } wg.Wait() log.Println("fstrigger exit") }