Add streaming command support.

Add options
- `stream-stdout-in-response`
- `stream-stdout-in-response-on-error`
- `stream-command-kill-grace-period-seconds`

to allow defining webhooks which dynamically stream large content back to the
requestor. This allows the creation of download endpoints from scripts, i.e.
running a `git archive` command or a database dump from a docker container,
without needing to buffer up the original.
This commit is contained in:
Will Rouesnel 2019-03-01 15:17:23 +11:00
parent 0aa7395e21
commit 08fc28b38f
8 changed files with 564 additions and 128 deletions

View file

@ -5,13 +5,58 @@ package main
import (
"fmt"
"os"
"strconv"
"strings"
"strconv"
"io"
)
func checkPrefix(prefixMap map[string]struct{}, prefix string, arg string) bool {
if _, found := prefixMap[prefix]; found {
fmt.Printf("prefix specified more then once: %s", arg)
os.Exit(-1)
}
if strings.HasPrefix(arg, prefix) {
prefixMap[prefix] = struct{}{}
return true
}
return false
}
func main() {
var outputStream io.Writer
outputStream = os.Stdout
seenPrefixes := make(map[string]struct{})
exit_code := 0
for _, arg := range os.Args[1:] {
if checkPrefix(seenPrefixes, "stream=", arg) {
switch arg {
case "stream=stdout":
outputStream = os.Stdout
case "stream=stderr":
outputStream = os.Stderr
case "stream=both":
outputStream = io.MultiWriter(os.Stdout, os.Stderr)
default:
fmt.Printf("unrecognized stream specification: %s", arg)
os.Exit(-1)
}
} else if checkPrefix(seenPrefixes, "exit=", arg) {
exit_code_str := arg[5:]
var err error
exit_code_conv, err := strconv.Atoi(exit_code_str)
exit_code = exit_code_conv
if err != nil {
fmt.Printf("Exit code %s not an int!", exit_code_str)
os.Exit(-1)
}
}
}
if len(os.Args) > 1 {
fmt.Printf("arg: %s\n", strings.Join(os.Args[1:], " "))
fmt.Fprintf(outputStream, "arg: %s\n", strings.Join(os.Args[1:], " "))
}
var env []string
@ -22,16 +67,8 @@ func main() {
}
if len(env) > 0 {
fmt.Printf("env: %s\n", strings.Join(env, " "))
fmt.Fprintf(outputStream, "env: %s\n", strings.Join(env, " "))
}
if (len(os.Args) > 1) && (strings.HasPrefix(os.Args[1], "exit=")) {
exit_code_str := os.Args[1][5:]
exit_code, err := strconv.Atoi(exit_code_str)
if err != nil {
fmt.Printf("Exit code %s not an int!", exit_code_str)
os.Exit(-1)
}
os.Exit(exit_code)
}
os.Exit(exit_code)
}

View file

@ -204,5 +204,36 @@
"name": "passed"
}
],
},
{
"id": "stream-stdout-in-response",
"pass-arguments-to-command": [
{
"source": "string",
"name": "exit=0"
},
{
"source": "string",
"name": "stream=both"
}
],
"execute-command": "{{ .Hookecho }}",
"stream-stdout-in-response": true
},
{
"id": "stream-stderr-in-response-on-error",
"pass-arguments-to-command": [
{
"source": "string",
"name": "exit=1"
},
{
"source": "string",
"name": "stream=stderr"
}
],
"execute-command": "{{ .Hookecho }}",
"stream-stdout-in-response": true,
"stream-stderr-in-response-on-error": true
}
]

View file

@ -113,4 +113,23 @@
- id: warn-on-space
execute-command: '{{ .Hookecho }} foo'
include-command-output-in-response: true
include-command-output-in-response: true
- id: stream-stdout-in-response
execute-command: '{{ .Hookecho }}'
stream-stdout-in-response: true
pass-arguments-to-command:
- source: string
name: exit=0
- source: string
name: stream=both
- id: stream-stderr-in-response-on-error
execute-command: '{{ .Hookecho }}'
stream-stdout-in-response: true
stream-stderr-in-response-on-error: true
pass-arguments-to-command:
- source: string
name: exit=1
- source: string
name: stream=stderr

View file

@ -0,0 +1,107 @@
// Hook Stream is a simple utility for testing Webhook streaming capability. It spawns a TCP server on execution
// which echos all connections to its stdout until it receives the string EOF.
package main
import (
"fmt"
"os"
"strings"
"strconv"
"io"
"net"
"bufio"
)
func checkPrefix(prefixMap map[string]struct{}, prefix string, arg string) bool {
if _, found := prefixMap[prefix]; found {
fmt.Printf("prefix specified more then once: %s", arg)
os.Exit(-1)
}
if strings.HasPrefix(arg, prefix) {
prefixMap[prefix] = struct{}{}
return true
}
return false
}
func main() {
var outputStream io.Writer
outputStream = os.Stdout
seenPrefixes := make(map[string]struct{})
exit_code := 0
for _, arg := range os.Args[1:] {
if checkPrefix(seenPrefixes, "stream=", arg) {
switch arg {
case "stream=stdout":
outputStream = os.Stdout
case "stream=stderr":
outputStream = os.Stderr
case "stream=both":
outputStream = io.MultiWriter(os.Stdout, os.Stderr)
default:
fmt.Printf("unrecognized stream specification: %s", arg)
os.Exit(-1)
}
} else if checkPrefix(seenPrefixes, "exit=", arg) {
exit_code_str := arg[5:]
var err error
exit_code_conv, err := strconv.Atoi(exit_code_str)
exit_code = exit_code_conv
if err != nil {
fmt.Printf("Exit code %s not an int!", exit_code_str)
os.Exit(-1)
}
}
}
l, err := net.Listen("tcp", "localhost:0")
if err != nil {
fmt.Printf("Error starting tcp server: %v\n", err)
os.Exit(-1)
}
defer l.Close()
// Emit the address of the server
fmt.Printf("%v\n",l.Addr())
manageCh := make(chan struct{})
go func() {
for {
conn, err := l.Accept()
if err != nil {
fmt.Printf("Error accepting connection: %v\n", err)
os.Exit(-1)
}
go handleRequest(manageCh, outputStream, conn)
}
}()
<- manageCh
l.Close()
os.Exit(exit_code)
}
// Handles incoming requests.
func handleRequest(manageCh chan<- struct{}, w io.Writer, conn net.Conn) {
defer conn.Close()
bio := bufio.NewScanner(conn)
for bio.Scan() {
if line := strings.TrimSuffix(bio.Text(), "\n"); line == "EOF" {
// Request program close
select {
case manageCh <- struct{}{}:
// Request sent.
default:
// Already closing
}
break
}
fmt.Fprintf(w, "%s\n", bio.Text())
}
}