package main import ( "bytes" "fmt" "io/ioutil" "log" "net/http" "os" "path/filepath" "regexp" "strings" "sync" "time" "github.com/flosch/pongo2/v4" "github.com/rjeczalik/notify" "gopkg.in/yaml.v3" ) type ActionConfig struct { Type string // http Method *string URL *string Content *string Header map[string]string // move To *string Stop bool OnSuccess *ActionConfig `yaml:"onSuccess"` OnError *ActionConfig `yaml:"onError"` } type DirectoryConfig struct { Path string Recursive bool Events []string Regex *string _regex *regexp.Regexp Actions []ActionConfig } type Config struct { Directories []DirectoryConfig } var wg sync.WaitGroup func startWatcher(directory *DirectoryConfig) { c := make(chan notify.EventInfo, 10) monitoredEvents := []notify.Event{} for _, e := range directory.Events { switch e { case "CREATE": monitoredEvents = append(monitoredEvents, notify.Create) case "REMOVE": monitoredEvents = append(monitoredEvents, notify.Remove) case "WRITE": monitoredEvents = append(monitoredEvents, notify.Write) case "RENAME": monitoredEvents = append(monitoredEvents, notify.Rename) case "ACCESS": monitoredEvents = append(monitoredEvents, notify.InAccess) case "MODIFY": monitoredEvents = append(monitoredEvents, notify.InModify) case "ATTRIB": monitoredEvents = append(monitoredEvents, notify.InAttrib) case "CLOSE_WRITE": monitoredEvents = append(monitoredEvents, notify.InCloseWrite) case "CLOSE_NOWRITE": monitoredEvents = append(monitoredEvents, notify.InCloseNowrite) case "OPEN": monitoredEvents = append(monitoredEvents, notify.InOpen) case "MOVED_FROM": monitoredEvents = append(monitoredEvents, notify.InMovedFrom) case "MOVED_TO": monitoredEvents = append(monitoredEvents, notify.InMovedTo) case "DELETE": monitoredEvents = append(monitoredEvents, notify.InDelete) case "DELETE_SELF": monitoredEvents = append(monitoredEvents, notify.InDeleteSelf) case "MOVE_SELF": monitoredEvents = append(monitoredEvents, notify.InMoveSelf) default: log.Fatalf("event %s unknown", e) } } if directory.Regex != nil { directory._regex = regexp.MustCompile(*directory.Regex) } wg.Add(1) go func(directory *DirectoryConfig, monitoredEvents []notify.Event) { defer wg.Done() log.Printf("watching %s for %v\n", directory.Path, directory.Events) if err := notify.Watch(directory.Path, c, monitoredEvents...); err != nil { log.Fatal(err) } defer notify.Stop(c) for { eventInfo := <-c p := eventInfo.Path() log.Printf("path: %s; event: %s\n", p, eventInfo.Event().String()) if directory._regex == nil || directory._regex.MatchString(p) { for _, action := range directory.Actions { contin, _ := runAction(&action, eventInfo) if !contin { break } } } else { log.Printf("path: %s; no regex match\n", p) } } }(directory, monitoredEvents) } func runAction(action *ActionConfig, eventInfo notify.EventInfo) (contin bool, err error) { contin = !action.Stop log.Printf("path: %s; action: %s; stop: %v\n", eventInfo.Path(), action.Type, !contin) switch action.Type { case "http": err = actionHttp(action, eventInfo) case "move": err = actionMove(action, eventInfo) case "delete": err = actionDelete(action, eventInfo) case "log": err = actionLog(action, eventInfo) default: err = fmt.Errorf("action type %s unknown", action.Type) } if err != nil { log.Printf("path: %s; action: %s; error %s", eventInfo.Path(), action.Type, err) if action.OnError != nil { _c, errE := runAction(action.OnError, eventInfo) contin = contin && _c if errE != nil { log.Printf("path: %s; action: %s; onError %s, error %s", eventInfo.Path(), action.Type, action.OnError.Type, errE) } } } else if err == nil && action.OnSuccess != nil { _c, errS := runAction(action.OnSuccess, eventInfo) contin = contin && _c if errS != nil { log.Printf("path: %s; action: %s; onSuccess %s, error %s", eventInfo.Path(), action.Type, action.OnError.Type, errS) } err = errS } return contin, err } func actionHttp(action *ActionConfig, eventInfo notify.EventInfo) error { method := "POST" if action.Method != nil { method = strings.ToUpper(*action.Method) } if action.URL == nil { return fmt.Errorf("missing url in http action") } postData := bytes.NewBuffer([]byte{}) if method != "GET" { if action.Content == nil { // read file postBytes, err := ioutil.ReadFile(eventInfo.Path()) if err != nil { return nil } postData = bytes.NewBuffer(postBytes) } } log.Printf("%s %s", method, *action.URL) req, err := http.NewRequest(method, *action.URL, postData) if err != nil { return err } for key, val := range action.Header { req.Header.Set(key, val) } client := &http.Client{} client.Timeout = time.Second * 20 resp, err := client.Do(req) if err != nil { return err } body, _ := ioutil.ReadAll(resp.Body) fmt.Println(string(body)) defer resp.Body.Close() if resp.StatusCode >= 400 { return fmt.Errorf("response status %s", resp.Status) } return nil } func actionMove(action *ActionConfig, eventInfo notify.EventInfo) error { if action.To == nil { return fmt.Errorf("missing to: as destination for move action") } to, err := tpl2String(*action.To, action, eventInfo) if err != nil { return err } err = os.Rename(eventInfo.Path(), to) return err } func actionDelete(action *ActionConfig, eventInfo notify.EventInfo) error { log.Println("DELETE") return nil } func actionLog(action *ActionConfig, eventInfo notify.EventInfo) error { var logstr string if action.Content != nil { logstr = *action.Content } else { logstr = eventInfo.Path() } log.Printf("LOG: %s", logstr) return nil } func tpl2String(tpl string, action *ActionConfig, eventInfo notify.EventInfo) (string, error) { _tpl, err := pongo2.FromString(tpl) if err != nil { return "", err } p := eventInfo.Path() return _tpl.Execute(pongo2.Context{ "path": p, "dirname": filepath.Dir(p), "filename": filepath.Base(p), }) } func main() { conf := Config{} confDat, err := ioutil.ReadFile("./config.yml") if err != nil { log.Fatalf("error %v", err) } err = yaml.Unmarshal(confDat, &conf) if err != nil { log.Fatalf("error %v", err) } for _, directory := range conf.Directories { startWatcher(&directory) } wg.Wait() log.Println("fstrigger exit") }