// trackStream.js
  /**
   * Lazily split a binary chunk into pieces of at most `chunkSize` bytes.
   *
   * @param {ArrayBuffer|ArrayBufferView} chunk - Binary data; must expose `byteLength`.
   * @param {number} [chunkSize] - Maximum piece size in bytes. When it is missing,
   *   zero, negative or NaN, or when the chunk is shorter than it, the chunk is
   *   yielded once, untouched.
   * @yields {ArrayBuffer|ArrayBufferView} The original chunk, or consecutive
   *   `slice()` results that together cover all of its bytes in order.
   */
  export const streamChunk = function* (chunk, chunkSize) {
    const len = chunk.byteLength;

    // `!(chunkSize > 0)` also rejects negative sizes, which would otherwise make
    // the loop below walk backwards forever.
    if (!(chunkSize > 0) || len < chunkSize) {
      yield chunk;
      return;
    }

    // Offsets below are byte offsets. Typed arrays with multi-byte elements slice
    // by element index and DataView has no slice() at all, so re-view those as
    // raw bytes first. One-byte views and ArrayBuffer already slice by byte.
    const bytes =
      ArrayBuffer.isView(chunk) && chunk.BYTES_PER_ELEMENT !== 1
        ? new Uint8Array(chunk.buffer, chunk.byteOffset, chunk.byteLength)
        : chunk;

    for (let pos = 0; pos < len; pos += chunkSize) {
      yield bytes.slice(pos, pos + chunkSize);
    }
  };
  /**
   * Asynchronously iterate `iterable` and re-emit its data as binary pieces of
   * at most `chunkSize` bytes (splitting is delegated to `streamChunk`).
   *
   * @param {AsyncIterable|Iterable} iterable - Source of chunks.
   * @param {number} [chunkSize] - Forwarded to `streamChunk`.
   * @param {(text: string) => any} encode - Called with `String(chunk)` for every
   *   chunk that is not already binary; its (awaited) result is passed to
   *   `streamChunk`, so it presumably has to expose `byteLength` — confirm
   *   against callers.
   * @yields Whatever `streamChunk` yields for each source chunk.
   */
  export const readBytes = async function* (iterable, chunkSize, encode) {
    for await (const chunk of iterable) {
      // A bare ArrayBuffer is binary too; stringifying it would encode the text
      // "[object ArrayBuffer]" instead of the data.
      const isBinary = ArrayBuffer.isView(chunk) || chunk instanceof ArrayBuffer;
      const data = isBinary ? chunk : await encode(String(chunk));
      yield* streamChunk(data, chunkSize);
    }
  };
  /**
   * Wrap `stream` in a ReadableStream of Uint8Array chunks that reports progress.
   *
   * @param {AsyncIterable|Iterable} stream - Source passed to `readBytes`.
   * @param {number} [chunkSize] - Maximum size of emitted chunks, forwarded to `readBytes`.
   * @param {(loadedBytes: number) => void} [onProgress] - Called after each chunk is
   *   read, with the running total of bytes read so far.
   * @param {(reason?: any) => void} [onFinish] - Called at most once: with no argument
   *   when the source is exhausted, with the error when a pull fails, or with the
   *   cancel reason when the stream is cancelled.
   * @param {(text: string) => any} [encode] - Forwarded to `readBytes`.
   * @returns {ReadableStream} Stream created with `highWaterMark: 2`.
   */
  export const trackStream = (stream, chunkSize, onProgress, onFinish, encode) => {
    const iterator = readBytes(stream, chunkSize, encode);
    let loaded = 0;
    let finished = false;

    // Guarantees onFinish fires only once even if close, error and cancel overlap.
    const notifyFinish = (reason) => {
      if (finished) {
        return;
      }
      finished = true;
      onFinish && onFinish(reason);
    };

    return new ReadableStream({
      async pull(controller) {
        try {
          const { done, value } = await iterator.next();

          if (done) {
            notifyFinish();
            controller.close();
            return;
          }

          if (onProgress) {
            loaded += value.byteLength;
            onProgress(loaded);
          }

          // `new Uint8Array(view)` converts element *values* (truncating wide
          // elements, and producing nothing for a DataView). For such views copy
          // the underlying bytes instead, so the enqueued data matches the
          // byteLength reported above.
          const payload =
            ArrayBuffer.isView(value) && value.BYTES_PER_ELEMENT !== 1
              ? new Uint8Array(
                  value.buffer.slice(value.byteOffset, value.byteOffset + value.byteLength),
                )
              : new Uint8Array(value);

          controller.enqueue(payload);
        } catch (err) {
          notifyFinish(err);
          throw err;
        }
      },
      cancel(reason) {
        notifyFinish(reason);
        // Let the generator (and the source it iterates) run its cleanup.
        return iterator.return();
      },
    }, {
      highWaterMark: 2,
    });
  };