From 6fa681b692ae5b63787241499edad3236cb9730b Mon Sep 17 00:00:00 2001 From: Concedo <39025047+LostRuins@users.noreply.github.com> Date: Fri, 20 Oct 2023 22:01:09 +0800 Subject: [PATCH] fixed a race condition with SSE streaming --- colab.ipynb | 2 +- klite.embd | 26 ++++++++++++++------------ koboldcpp.py | 3 ++- 3 files changed, 17 insertions(+), 14 deletions(-) diff --git a/colab.ipynb b/colab.ipynb index 9bc1956fb..7eb18a897 100644 --- a/colab.ipynb +++ b/colab.ipynb @@ -35,7 +35,7 @@ "!nohup ./cloudflared-linux-amd64 tunnel --url http://localhost:5001 &\r\n", "!sleep 10\r\n", "!cat nohup.out\r\n", - "!python koboldcpp.py model.ggml --usecublas 0 mmq --multiuser --gpulayers $Layers --hordeconfig concedo\r\n" + "!python koboldcpp.py model.ggml --usecublas 0 mmq --multiuser --gpulayers $Layers --hordeconfig concedo 1 1\r\n" ] } ], diff --git a/klite.embd b/klite.embd index 6bb441334..b7df6f401 100644 --- a/klite.embd +++ b/klite.embd @@ -2927,7 +2927,7 @@ Current version: 83 } } - function kobold_api_stream_sse(sub_endpt,submit_payload) + function kobold_api_stream_sse(sub_endpt,submit_payload,trackedgenid) { synchro_pending_stream = ""; fetch(sub_endpt, {method: 'POST', @@ -2941,15 +2941,17 @@ Current version: 83 ctrl.buf = ''; }, transform(chunk, ctrl) { - ctrl.buf += chunk; - let evs = []; - let m; - while ((m = /^event: (.*)\ndata: (.*)\n\n/.exec(ctrl.buf)) !== null) { - evs.push({event: m[1], data: JSON.parse(m[2])}); - ctrl.buf = ctrl.buf.substring(m.index + m[0].length); - } - if (evs.length) { - ctrl.enqueue(evs); + if (pending_response_id == trackedgenid) { + ctrl.buf += chunk; + let evs = []; + let m; + while ((m = /^event: (.*)\ndata: (.*)\n\n/.exec(ctrl.buf)) !== null) { + evs.push({ event: m[1], data: JSON.parse(m[2]) }); + ctrl.buf = ctrl.buf.substring(m.index + m[0].length); + } + if (evs.length) { + ctrl.enqueue(evs); + } } } })) @@ -8071,12 +8073,12 @@ Current version: 83 streamchunk = ((pstreamamount != null && pstreamamount > 0) ? pstreamamount:8); //8 tokens per stream tick by default } last_request_str = JSON.stringify(submit_payload); + let trackedgenid = pending_response_id; //if it changes, stop streaming if (localsettings.tokenstreammode==2 && is_using_kcpp_with_sse()) { let sub_endpt = apply_proxy_url(custom_kobold_endpoint + kobold_custom_gen_stream_endpoint); - kobold_api_stream_sse(sub_endpt, submit_payload); + kobold_api_stream_sse(sub_endpt, submit_payload, trackedgenid); } else { let sub_endpt = apply_proxy_url(custom_kobold_endpoint + kobold_custom_gen_endpoint); - let trackedgenid = pending_response_id; //if it changes, stop streaming kobold_api_stream(sub_endpt, submit_payload, submit_payload.max_length, trackedgenid, "", streamchunk); } } diff --git a/koboldcpp.py b/koboldcpp.py index b735dc58f..ee561f97f 100755 --- a/koboldcpp.py +++ b/koboldcpp.py @@ -365,7 +365,7 @@ maxhordelen = 256 modelbusy = threading.Lock() requestsinqueue = 0 defaultport = 5001 -KcppVersion = "1.47" +KcppVersion = "1.47.1" showdebug = True showsamplerwarning = True showmaxctxwarning = True @@ -527,6 +527,7 @@ class ServerRequestHandler(http.server.SimpleHTTPRequestHandler): current_token = 0 incomplete_token_buffer = bytearray() + await asyncio.sleep(0.1) #anti race condition, prevent check from overtaking generate while True: streamDone = handle.has_finished() #exit next loop on done tokenStr = ""