job manager, jobm pkg

This commit is contained in:
Sebastian Frank 2019-03-25 10:16:33 +01:00
parent a17926f54b
commit 740fb94556
Signed by: apairon
GPG Key ID: 7270D06DDA7FE8C3
7 changed files with 245 additions and 178 deletions

View File

@ -2,6 +2,7 @@ package filter
import (
"io/ioutil"
"os"
"path"
"strings"
@ -15,16 +16,14 @@ import (
// RegisterFilters reads a directory and register filters from files within it
func RegisterFilters(dir string) {
files, err := ioutil.ReadDir(dir)
if err != nil {
logger.Log.Panicf("could not read from template filters dir '%s': %s", dir, err)
}
logger.Eexit(err, "could not read from template filters dir '%s'", dir)
for _, f := range files {
if !f.IsDir() {
switch path.Ext(f.Name()) {
case ".js":
fileBase := strings.TrimSuffix(f.Name(), ".js")
jsFile := dir + "/" + f.Name()
logger.Log.Debugf("trying to register filter from: %s", jsFile)
logger.D("trying to register filter from: %s", jsFile)
/*
jsStr, err := ioutil.ReadFile(jsFile)
if err != nil {
@ -33,38 +32,31 @@ func RegisterFilters(dir string) {
*/
vm := motto.New()
fn, err := vm.Run(jsFile)
if err != nil {
logger.Log.Panicf("error in javascript vm for '%s': %s", jsFile, err)
}
logger.Eexit(err, "error in javascript vm for '%s'", jsFile)
if !fn.IsFunction() {
logger.Log.Panicf("%s does not contain a function code", jsFile)
logger.E("%s does not contain a function code", jsFile)
os.Exit(1)
}
err = pongo2.RegisterFilter(
fileBase,
func(in, param *pongo2.Value) (out *pongo2.Value, erro *pongo2.Error) {
thisObj, _ := vm.Object("({})")
var err error
if mark2web.CurrentContext != nil {
thisObj.Set("context", *mark2web.CurrentContext)
err = thisObj.Set("context", *mark2web.CurrentContext)
}
if err != nil {
logger.Log.Panicf("could not set context as in '%s': %s", jsFile, err)
}
logger.Perr(err, "could not set context in '%s': %s", jsFile)
ret, err := fn.Call(thisObj.Value(), in.Interface(), param.Interface())
if err != nil {
logger.Log.Panicf("error in javascript file '%s' while calling returned function: %s", jsFile, err)
}
logger.Eexit(err, "error in javascript file '%s' while calling returned function", jsFile)
retGo, err := ret.Export()
if err != nil {
logger.Log.Panicf("export error for '%s': %s", jsFile, err)
}
logger.Perr(err, "export error for '%s'", jsFile)
return pongo2.AsValue(retGo), nil
},
)
if err != nil {
logger.Log.Panicf("could not register filter from '%s': %s", jsFile, err)
}
logger.Perr(err, "could not register filter from '%s'", jsFile)
}
}

View File

@ -13,6 +13,7 @@ import (
"strings"
"gitbase.de/apairon/mark2web/pkg/helper"
"gitbase.de/apairon/mark2web/pkg/jobm"
"gitbase.de/apairon/mark2web/pkg/logger"
"gitbase.de/apairon/mark2web/pkg/mark2web"
"github.com/disintegration/imaging"
@ -168,63 +169,67 @@ func ImageProcessFilter(in *pongo2.Value, param *pongo2.Value) (*pongo2.Value, *
if f, err := os.Stat(imgTarget); err == nil && !f.IsDir() {
logger.Log.Noticef("skipped processing image from %s to %s, file already exists", imgSource, imgTarget)
} else {
mark2web.ThreadStart(func() {
logger.Log.Noticef("processing image from %s to %s", imgSource, imgTarget)
if strings.HasPrefix(imgSource, "http://") || strings.HasPrefix(imgSource, "https://") {
// webrequest before finding target filename, because of file format in filename
} else {
img, err = imaging.Open(imgSource, imaging.AutoOrientation(true))
if err != nil {
logger.Log.Panicf("filter:image_resize, could not open image '%s': %s", imgSource, err)
jobm.Enqueue(jobm.Job{
Function: func() {
logger.Log.Noticef("processing image from %s to %s", imgSource, imgTarget)
if strings.HasPrefix(imgSource, "http://") || strings.HasPrefix(imgSource, "https://") {
// webrequest before finding target filename, because of file format in filename
} else {
img, err = imaging.Open(imgSource, imaging.AutoOrientation(true))
if err != nil {
logger.Log.Panicf("filter:image_resize, could not open image '%s': %s", imgSource, err)
}
}
}
switch p.Process {
case "resize":
img = imaging.Resize(img, p.Width, p.Height, imaging.Lanczos)
case "fit":
img = imaging.Fit(img, p.Width, p.Height, imaging.Lanczos)
case "fill":
var anchor imaging.Anchor
switch strings.ToLower(p.Anchor) {
case "":
fallthrough
case "center":
anchor = imaging.Center
case "topleft":
anchor = imaging.TopLeft
case "top":
anchor = imaging.Top
case "topright":
anchor = imaging.TopRight
case "left":
anchor = imaging.Left
case "right":
anchor = imaging.Right
case "bottomleft":
anchor = imaging.BottomLeft
case "bottom":
anchor = imaging.Bottom
case "bottomright":
anchor = imaging.BottomRight
switch p.Process {
case "resize":
img = imaging.Resize(img, p.Width, p.Height, imaging.Lanczos)
case "fit":
img = imaging.Fit(img, p.Width, p.Height, imaging.Lanczos)
case "fill":
var anchor imaging.Anchor
switch strings.ToLower(p.Anchor) {
case "":
fallthrough
case "center":
anchor = imaging.Center
case "topleft":
anchor = imaging.TopLeft
case "top":
anchor = imaging.Top
case "topright":
anchor = imaging.TopRight
case "left":
anchor = imaging.Left
case "right":
anchor = imaging.Right
case "bottomleft":
anchor = imaging.BottomLeft
case "bottom":
anchor = imaging.Bottom
case "bottomright":
anchor = imaging.BottomRight
default:
logger.Log.Panicf("filter:image_resize, unknown anchor a=%s definition", p.Anchor)
}
img = imaging.Fill(img, p.Width, p.Height, anchor, imaging.Lanczos)
default:
logger.Log.Panicf("filter:image_resize, unknown anchor a=%s definition", p.Anchor)
logger.Log.Panicf("filter:image_resize, invalid p parameter '%s'", p.Process)
}
img = imaging.Fill(img, p.Width, p.Height, anchor, imaging.Lanczos)
default:
logger.Log.Panicf("filter:image_resize, invalid p parameter '%s'", p.Process)
}
var encodeOptions = make([]imaging.EncodeOption, 0)
if p.Quality > 0 {
encodeOptions = append(encodeOptions, imaging.JPEGQuality(p.Quality))
}
var encodeOptions = make([]imaging.EncodeOption, 0)
if p.Quality > 0 {
encodeOptions = append(encodeOptions, imaging.JPEGQuality(p.Quality))
}
err = imaging.Save(img, imgTarget, encodeOptions...)
if err != nil {
logger.Log.Panicf("filter:image_resize, could save image '%s': %s", imgTarget, err)
}
logger.Log.Noticef("finished image: %s", imgTarget)
err = imaging.Save(img, imgTarget, encodeOptions...)
if err != nil {
logger.Log.Panicf("filter:image_resize, could save image '%s': %s", imgTarget, err)
}
logger.Log.Noticef("finished image: %s", imgTarget)
},
Description: "process image " + imgSource,
Category: "image process",
})
}
return pongo2.AsValue(mark2web.CurrentTreeNode.ResolveNavPath(p.Filename)), nil

58
pkg/jobm/jobmanager.go Normal file
View File

@ -0,0 +1,58 @@
package jobm
import (
"runtime"
"sync"
"gitbase.de/apairon/mark2web/pkg/logger"
)
var wg sync.WaitGroup
var numCPU = runtime.NumCPU()
// Job is a wrapper to descripe a Job function
type Job struct {
Function func()
Description string
Category string
}
var jobChan = make(chan Job)
func worker(jobChan <-chan Job) {
defer wg.Done()
for job := range jobChan {
job.Function()
}
}
func init() {
logger.Log.Infof("number of CPU core: %d", numCPU)
// one core for main thread
for i := 0; i < (numCPU - 1); i++ {
wg.Add(1)
go worker(jobChan)
}
}
// Enqueue enqueues a job to the job queue
func Enqueue(job Job) {
if numCPU <= 1 {
// no threading
job.Function()
return
}
jobChan <- job
}
// Wait will wait for all jobs to finish
func Wait() {
close(jobChan)
wg.Wait()
}
// SetNumCPU is for testing package without threading
func SetNumCPU(i int) {
numCPU = i
}

View File

@ -13,7 +13,7 @@ var logBackendLeveled logging.LeveledBackend
// SetLogLevel sets log level for global logger (debug, info, notice, warning, error)
func SetLogLevel(level string) {
logBackendLevel := logging.NOTICE
logBackendLevel := logging.INFO
switch level {
case "debug":
logBackendLevel = logging.DEBUG
@ -58,3 +58,67 @@ func init() {
spew.Config.DisablePointerMethods = true
configureLogger()
}
// D is shorthand for Debugf
func D(format string, args ...interface{}) {
Log.Debugf(format, args...)
}
// I is shorthand for Infof
func I(format string, args ...interface{}) {
Log.Infof(format, args...)
}
// N is shorthand for Noticef
func N(format string, args ...interface{}) {
Log.Noticef(format, args...)
}
// W is shorthand for Warningf
func W(format string, args ...interface{}) {
Log.Warningf(format, args...)
}
// E is shorthand for Errorf
func E(format string, args ...interface{}) {
Log.Errorf(format, args...)
}
// P is shorthand for Panicf
func P(format string, args ...interface{}) {
Log.Panicf(format, args...)
}
// Eerr is shorthand for
// if err != nil {
// Log.Errorf(...)
// }
func Eerr(err error, format string, args ...interface{}) {
if err != nil {
args = append(args, err)
Log.Errorf(format+"\nError: %s", args...)
}
}
// Eexit is shorthand for
// if err != nil {
// Log.Errorf(...)
// os.Exit(1)
// }
func Eexit(err error, format string, args ...interface{}) {
Eerr(err, format, args...)
if err != nil {
os.Exit(1)
}
}
// Perr is shorthand for
// if err != nil {
// Log.Panicf(...)
// }
func Perr(err error, format string, args ...interface{}) {
if err != nil {
args = append(args, err)
Log.Panicf(format+"\nError: %s", args...)
}
}

View File

@ -7,56 +7,62 @@ import (
"os"
"path"
"gitbase.de/apairon/mark2web/pkg/jobm"
"gitbase.de/apairon/mark2web/pkg/logger"
)
func handleCompression(filename string, content []byte) {
ThreadStart(func() {
if _, ok := Config.Compress.Extensions[path.Ext(filename)]; ok {
jobm.Enqueue(jobm.Job{
Function: func() {
if _, ok := Config.Compress.Extensions[path.Ext(filename)]; ok {
if Config.Compress.Brotli {
handleBrotliCompression(filename, content)
if Config.Compress.Brotli {
handleBrotliCompression(filename, content)
}
if Config.Compress.GZIP {
gzFilename := filename + ".gz"
logger.Log.Infof("writing to compressed output file: %s", gzFilename)
f, err := os.Create(gzFilename)
if err != nil {
logger.Log.Panicf("could not create file '%s': %s", gzFilename, err)
}
defer f.Close()
zw, err := gzip.NewWriterLevel(f, gzip.BestCompression)
if err != nil {
logger.Log.Panicf("could not initialize gzip writer for '%s': %s", filename, err)
}
defer zw.Close()
if content != nil {
// content given
_, err = zw.Write(content)
if err != nil {
logger.Log.Panicf("could not write gziped content for '%s': %s", filename, err)
}
} else {
// read file
r, err := os.Open(filename)
if err != nil {
logger.Log.Panicf("could not open file '%s': %s", filename, err)
}
defer r.Close()
_, err = io.Copy(zw, r)
if err != nil {
logger.Log.Panicf("could not gzip file '%s': %s", filename, err)
}
}
}
}
if Config.Compress.GZIP {
gzFilename := filename + ".gz"
logger.Log.Infof("writing to compressed output file: %s", gzFilename)
f, err := os.Create(gzFilename)
if err != nil {
logger.Log.Panicf("could not create file '%s': %s", gzFilename, err)
}
defer f.Close()
zw, err := gzip.NewWriterLevel(f, gzip.BestCompression)
if err != nil {
logger.Log.Panicf("could not initialize gzip writer for '%s': %s", filename, err)
}
defer zw.Close()
if content != nil {
// content given
_, err = zw.Write(content)
if err != nil {
logger.Log.Panicf("could not write gziped content for '%s': %s", filename, err)
}
} else {
// read file
r, err := os.Open(filename)
if err != nil {
logger.Log.Panicf("could not open file '%s': %s", filename, err)
}
defer r.Close()
_, err = io.Copy(zw, r)
if err != nil {
logger.Log.Panicf("could not gzip file '%s': %s", filename, err)
}
}
}
}
},
Description: "compress " + filename,
Category: "compress",
})
}

View File

@ -1,5 +1,7 @@
package mark2web
import "gitbase.de/apairon/mark2web/pkg/jobm"
// Run will do a complete run of mark2web
func Run(inDir, outDir string, defaultPathConfig *PathConfig) {
SetTemplateDir(inDir + "/templates")
@ -12,5 +14,5 @@ func Run(inDir, outDir string, defaultPathConfig *PathConfig) {
tree.WriteWebserverConfig()
Wait()
jobm.Wait()
}

View File

@ -1,60 +0,0 @@
package mark2web
import (
"runtime"
"sync"
"gitbase.de/apairon/mark2web/pkg/logger"
)
var wg sync.WaitGroup
var numCPU = runtime.NumCPU()
var curNumThreads = 1 // main thread is 1
func init() {
logger.Log.Infof("number of CPU core: %d", numCPU)
}
// Wait will wait for all our internal go threads
func Wait() {
wg.Wait()
}
// ThreadSetup adds 1 to wait group
func ThreadSetup() {
curNumThreads++
wg.Add(1)
}
// ThreadDone removes 1 from wait group
func ThreadDone() {
curNumThreads--
wg.Done()
}
// ThreadStart will start a thread an manages the wait group
func ThreadStart(f func(), forceNewThread ...bool) {
force := false
if len(forceNewThread) > 0 && forceNewThread[0] {
force = true
}
if numCPU > curNumThreads || force {
// only new thread if empty CPU core available or forced
threadF := func() {
f()
ThreadDone()
}
ThreadSetup()
go threadF()
} else {
logger.Log.Debugf("no more CPU core (%d used), staying in main thread", curNumThreads)
f()
}
}
// SetNumCPU is for testing package without threading
func SetNumCPU(i int) {
numCPU = i
}