From 210ac5a42be00300673f201cdd164cc4ec06acff Mon Sep 17 00:00:00 2001 From: Laura Date: Wed, 13 Aug 2025 03:57:52 +0200 Subject: [PATCH] multithreading --- cmd/ffwebp/main.go | 99 ++++++++++++++++++++++++++++++++++--------- internal/logx/logx.go | 17 ++++++-- 2 files changed, 93 insertions(+), 23 deletions(-) diff --git a/cmd/ffwebp/main.go b/cmd/ffwebp/main.go index 1e696f5..715e485 100644 --- a/cmd/ffwebp/main.go +++ b/cmd/ffwebp/main.go @@ -1,11 +1,13 @@ package main import ( + "bytes" "context" "fmt" "io" "os" "path/filepath" + "runtime" "strings" "time" @@ -65,6 +67,11 @@ func main() { Aliases: []string{"t"}, Usage: "create a thumbnail no wider/taller than the specified size", }, + &cli.IntFlag{ + Name: "threads", + Usage: "number of worker threads (0=auto)", + Value: 0, + }, &cli.BoolFlag{ Name: "silent", Aliases: []string{"s"}, @@ -192,26 +199,79 @@ func run(_ context.Context, cmd *cli.Command) error { } } - for i := range inputs { - in := inputs[i] - out := outputs[i] + threads := cmd.Int("threads") - if err := processOne(in, out, cmd, &common); err != nil { - return err + if threads <= 0 { + threads = runtime.NumCPU() + } + + threads = min(threads, len(inputs)) + + type job struct { + i int + } + + var ( + jobs = make(chan job) + errs = make(chan error, len(inputs)) + ) + + for w := 0; w < threads; w++ { + go func() { + for j := range jobs { + var logger *bytes.Buffer + + if threads > 1 { + logger = bytes.NewBuffer(nil) + } + + in := inputs[j.i] + out := outputs[j.i] + + local := common + + if err := processOne(in, out, cmd, &local, logger); err != nil { + errs <- fmt.Errorf("%s -> %s: %w", filepath.ToSlash(in), filepath.ToSlash(out), err) + } else { + errs <- nil + } + + if threads > 1 { + logx.Print(logger.String()) + } + } + }() + } + + go func() { + for i := range inputs { + jobs <- job{i: i} + } + + close(jobs) + }() + + var first error + + for range inputs { + err := <-errs + + if err != nil && first == nil { + first = err } } - return nil + return first } -func processOne(input, output string, cmd *cli.Command, common *opts.Common) error { +func processOne(input, output string, cmd *cli.Command, common *opts.Common, logger io.Writer) error { var ( reader io.Reader = os.Stdin writer *countWriter = &countWriter{w: os.Stdout} ) if input != "-" { - logx.Printf("opening input file %q\n", filepath.ToSlash(input)) + logx.Fprintf(logger, "opening input file %q\n", filepath.ToSlash(input)) file, err := os.OpenFile(input, os.O_RDONLY, 0) if err != nil { @@ -222,7 +282,7 @@ func processOne(input, output string, cmd *cli.Command, common *opts.Common) err reader = file } else { - logx.Printf("reading input from \n") + logx.Fprintf(logger, "reading input from \n") } sniffed, reader2, err := codec.Sniff(reader, input, cmd.Bool("sniff")) @@ -232,7 +292,7 @@ func processOne(input, output string, cmd *cli.Command, common *opts.Common) err reader = reader2 - logx.Printf("sniffed codec: %s (%q)\n", sniffed.Codec, sniffed) + logx.Fprintf(logger, "sniffed codec: %s (%q)\n", sniffed.Codec, sniffed) var mappedFromDir bool @@ -259,9 +319,10 @@ func processOne(input, output string, cmd *cli.Command, common *opts.Common) err return err } - common.OutputExtension = oExt + localOpts := *common + localOpts.OutputExtension = oExt - logx.Printf("output codec: %s (forced=%v)\n", oCodec, cmd.IsSet("codec")) + logx.Fprintf(logger, "output codec: %s (forced=%v)\n", oCodec, cmd.IsSet("codec")) if output != "-" { curExt := strings.TrimPrefix(filepath.Ext(output), ".") @@ -273,7 +334,7 @@ func processOne(input, output string, cmd *cli.Command, common *opts.Common) err } if output != "-" { - logx.Printf("opening output file %q\n", filepath.ToSlash(output)) + logx.Fprintf(logger, "opening output file %q\n", filepath.ToSlash(output)) if err := os.MkdirAll(filepath.Dir(output), 0755); err != nil { return err @@ -288,7 +349,7 @@ func processOne(input, output string, cmd *cli.Command, common *opts.Common) err writer = &countWriter{w: file} } else { - logx.Printf("writing output to \n") + logx.Fprintf(logger, "writing output to \n") } t0 := time.Now() @@ -298,14 +359,14 @@ func processOne(input, output string, cmd *cli.Command, common *opts.Common) err return err } - logx.Printf("decoded image: %dx%d %s in %s\n", img.Bounds().Dx(), img.Bounds().Dy(), colorModel(img), time.Since(t0).Truncate(time.Millisecond)) + logx.Fprintf(logger, "decoded image: %dx%d %s in %s\n", img.Bounds().Dx(), img.Bounds().Dy(), colorModel(img), time.Since(t0).Truncate(time.Millisecond)) if thumbnail := cmd.Uint("thumbnail"); thumbnail > 0 { t2 := time.Now() img = resize.Thumbnail(thumbnail, thumbnail, img, resize.Lanczos3) - logx.Printf("resized image: %dx%d in %s\n", img.Bounds().Dx(), img.Bounds().Dy(), time.Since(t2).Truncate(time.Millisecond)) + logx.Fprintf(logger, "resized image: %dx%d in %s\n", img.Bounds().Dx(), img.Bounds().Dy(), time.Since(t2).Truncate(time.Millisecond)) } t1 := time.Now() @@ -316,16 +377,16 @@ func processOne(input, output string, cmd *cli.Command, common *opts.Common) err if err != nil { return err } else if n > 0 { - logx.Printf("applied %d effect(s) in %s\n", n, time.Since(t1).Truncate(time.Millisecond)) + logx.Fprintf(logger, "applied %d effect(s) in %s\n", n, time.Since(t1).Truncate(time.Millisecond)) } t2 := time.Now() - if err := oCodec.Encode(writer, img, *common); err != nil { + if err := oCodec.Encode(writer, img, localOpts); err != nil { return err } - logx.Printf("encoded %d KiB in %s\n", (writer.n+1023)/1024, time.Since(t2).Truncate(time.Millisecond)) + logx.Fprintf(logger, "encoded %d KiB in %s\n", (writer.n+1023)/1024, time.Since(t2).Truncate(time.Millisecond)) return nil } diff --git a/internal/logx/logx.go b/internal/logx/logx.go index 35ee5ff..2bafdf0 100644 --- a/internal/logx/logx.go +++ b/internal/logx/logx.go @@ -3,6 +3,7 @@ package logx import ( "fmt" "image" + "io" "os" "sync/atomic" "time" @@ -18,11 +19,15 @@ func SetSilent() { enabled.Store(false) } -func Printf(format string, a ...any) { +func Fprintf(writer io.Writer, format string, a ...any) { if !enabled.Load() { return } + if writer == nil { + writer = os.Stderr + } + for i, v := range a { switch r := v.(type) { case time.Time: @@ -36,15 +41,19 @@ func Printf(format string, a ...any) { } } - fmt.Fprintf(os.Stderr, format, a...) + fmt.Fprintf(writer, format, a...) } -func PrintKV(codec, key string, val any) { +func Printf(format string, a ...any) { + Fprintf(os.Stderr, format, a...) +} + +func Print(message string) { if !enabled.Load() { return } - Printf("%s: %s=%v\n", codec, key, val) + fmt.Fprint(os.Stderr, message) } func Errorf(f string, a ...any) {