Compress network file transfers

This commit is contained in:
Justine Tunney 2022-05-14 23:17:22 -07:00
parent 6bf6f9e376
commit c446af799e
2 changed files with 101 additions and 15 deletions

View file

@ -30,6 +30,7 @@
#include "libc/log/check.h"
#include "libc/log/log.h"
#include "libc/macros.internal.h"
#include "libc/mem/mem.h"
#include "libc/nexgen32e/crc32.h"
#include "libc/runtime/gc.internal.h"
#include "libc/runtime/runtime.h"
@ -51,6 +52,7 @@
#include "libc/x/x.h"
#include "net/https/https.h"
#include "third_party/mbedtls/ssl.h"
#include "third_party/zlib/zlib.h"
#include "tool/build/lib/eztls.h"
#include "tool/build/lib/psk.h"
#include "tool/build/runit.h"
@ -306,6 +308,28 @@ TryAgain:
freeaddrinfo(ai);
}
static void Send(const void *output, size_t outputsize) {
int rc, have;
static bool once;
static z_stream zs;
static char zbuf[4096];
if (!once) {
CHECK_EQ(Z_OK, deflateInit2(&zs, Z_DEFAULT_COMPRESSION, Z_DEFLATED,
MAX_WBITS, DEF_MEM_LEVEL, Z_DEFAULT_STRATEGY));
once = true;
}
zs.next_in = output;
zs.avail_in = outputsize;
do {
zs.avail_out = sizeof(zbuf);
zs.next_out = (unsigned char *)zbuf;
rc = deflate(&zs, Z_SYNC_FLUSH);
CHECK_NE(Z_STREAM_ERROR, rc);
have = sizeof(zbuf) - zs.avail_out;
CHECK_EQ(have, mbedtls_ssl_write(&ezssl, zbuf, have));
} while (!zs.avail_out);
}
void SendRequest(void) {
int fd;
char *p;
@ -316,7 +340,7 @@ void SendRequest(void) {
const char *name;
unsigned char *hdr, *q;
size_t progsize, namesize, hdrsize;
DEBUGF("running %s on %s", g_prog, g_hostname);
unsigned have;
CHECK_NE(-1, (fd = open(g_prog, O_RDONLY)));
CHECK_NE(-1, fstat(fd, &st));
CHECK_NE(MAP_FAILED, (p = mmap(0, st.st_size, PROT_READ, MAP_SHARED, fd, 0)));
@ -332,10 +356,9 @@ void SendRequest(void) {
q = WRITE32BE(q, crc);
q = mempcpy(q, name, namesize);
assert(hdrsize == q - hdr);
CHECK_EQ(hdrsize, mbedtls_ssl_write(&ezssl, hdr, hdrsize));
for (i = 0; i < progsize; i += rc) {
CHECK_GT((rc = mbedtls_ssl_write(&ezssl, p + i, progsize - i)), 0);
}
DEBUGF("running %s on %s", g_prog, g_hostname);
Send(hdr, hdrsize);
Send(p, progsize);
CHECK_EQ(0, EzTlsFlush(&ezbio, 0, 0));
CHECK_NE(-1, munmap(p, st.st_size));
CHECK_NE(-1, close(fd));

View file

@ -51,6 +51,7 @@
#include "net/https/https.h"
#include "third_party/getopt/getopt.h"
#include "third_party/mbedtls/ssl.h"
#include "third_party/zlib/zlib.h"
#include "tool/build/lib/eztls.h"
#include "tool/build/lib/psk.h"
#include "tool/build/runit.h"
@ -256,16 +257,78 @@ void SendOutputFragmentMessage(enum RunitCommand kind, unsigned char *buf,
CHECK_EQ(0, EzTlsFlush(&ezbio, 0, 0));
}
void Recv(void *p, size_t n) {
size_t i, rc;
for (i = 0; i < n; i += rc) {
do {
rc = mbedtls_ssl_read(&ezssl, (char *)p + i, n - i);
DEBUGF("read(%ld)", rc);
} while (rc == MBEDTLS_ERR_SSL_WANT_READ);
if (rc <= 0) TlsDie("read failed", rc);
void Recv(void *output, size_t outputsize) {
int rc;
ssize_t tx, chunk, received;
static bool once;
static int zstatus;
static char buf[4096];
static z_stream zs;
static struct {
size_t off;
size_t len;
size_t cap;
char *data;
} rbuf;
if (!once) {
CHECK_EQ(Z_OK, inflateInit(&zs));
once = true;
}
for (;;) {
if (rbuf.len >= outputsize) {
tx = MIN(outputsize, rbuf.len);
memcpy(output, rbuf.data + rbuf.off, outputsize);
rbuf.len -= outputsize;
rbuf.off += outputsize;
// trim dymanic buffer once it empties
if (!rbuf.len) {
rbuf.off = 0;
rbuf.cap = 4096;
rbuf.data = realloc(rbuf.data, rbuf.cap);
}
return;
}
if (zstatus == Z_STREAM_END) {
FATALF("recv zlib unexpected eof");
}
// get another fixed-size data packet from network
// pass along error conditions to caller
// pass along eof condition to zlib
received = mbedtls_ssl_read(&ezssl, buf, sizeof(buf));
if (received < 0) TlsDie("read failed", received);
// decompress packet completely
// into a dynamical size buffer
zs.avail_in = received;
zs.next_in = (unsigned char *)buf;
CHECK_EQ(Z_OK, zstatus);
do {
// make sure we have a reasonable capacity for zlib output
if (rbuf.cap - (rbuf.off + rbuf.len) < sizeof(buf)) {
rbuf.cap += sizeof(buf);
rbuf.data = realloc(rbuf.data, rbuf.cap);
}
// inflate packet, which naturally can be much larger
// permit zlib no delay flushes that come from sender
zs.next_out = (unsigned char *)rbuf.data + (rbuf.off + rbuf.len);
zs.avail_out = chunk = rbuf.cap - (rbuf.off + rbuf.len);
zstatus = inflate(&zs, Z_SYNC_FLUSH);
CHECK_NE(Z_STREAM_ERROR, zstatus);
switch (zstatus) {
case Z_NEED_DICT:
zstatus = Z_DATA_ERROR; // make negative
// fallthrough
case Z_DATA_ERROR:
case Z_MEM_ERROR:
FATALF("tls recv zlib hard error %d", zstatus);
case Z_BUF_ERROR:
zstatus = Z_OK; // harmless? nothing for inflate to do
break; // it probably just our wraparound eof
default:
rbuf.len += chunk - zs.avail_out;
break;
}
} while (!zs.avail_out);
}
DEBUGF("Recv(%ld)", n);
}
void HandleClient(void) {
@ -293,6 +356,7 @@ void HandleClient(void) {
EzHandshake();
addrstr = gc(DescribeAddress(&addr));
DEBUGF("%s %s %s", gc(DescribeAddress(&g_servaddr)), "accepted", addrstr);
Recv(msg, sizeof(msg));
CHECK_EQ(RUNITD_MAGIC, READ32BE(msg));
CHECK_EQ(kRunitExecute, msg[4]);
@ -459,7 +523,6 @@ void Daemonize(void) {
int main(int argc, char *argv[]) {
int i;
ShowCrashReports();
SetupPresharedKeySsl(MBEDTLS_SSL_IS_SERVER, GetRunitPsk());
/* __log_level = kLogDebug; */
GetOpts(argc, argv);