Elixir: Detect end of stream

Posted on December 16, 2022
Tags: elixir

There are certain scenarios where you want to know when you have encountered the last element in your stream of data.

For instance, if you stream a binary data to google buckets and you do not know the size of the binary you send the following headers:

{"Content-Length", "#{bytes}"}
{"Content-Range", "bytes #{total}-#{total + bytes - 1}/*"}

where bytes is the byte size of the current binary and total is the total amount of binary data sent so far.

Once you are sending the last chunk you will send the following headers:

{"Content-Length", "#{bytes}"},
{"Content-Range", "bytes #{total_binary}-#{total_binary + bytes - 1}/#{total_binary + bytes}"}

so we replace the * with total_binary + bytes.


One way to do that easily is to use Stream.concat/2:

iex(1)> 0..2 |> Stream.concat([:end]) |> Enum.to_list
[0, 1, 2, :end]

So now our binary stream to google buckets can look like this:

acc = %{total: 0, prev: nil}

stream
|> Stream.concat([:end])
|> Enum.reduce(acc, fn
  elem, %{total: 0, prev: nil} ->
    %{total: 0, prev: elem}

  :end, %{total: total, prev: prev} ->
    bytes = byte_size(last)

    headers = [
      {"Content-Length", "#{bytes}"},
      {"Content-Range", "bytes #{total}-#{total + bytes - 1}/#{total + bytes}"}
    ]

  # Upload final data chunk to google buckets

  elem, %{total: total, prev: prev} ->
    bytes = byte_size(prev)

    headers = [
      {"Content-Length", "#{bytes}"},
      {"Content-Range", "bytes #{total}-#{total + bytes - 1}/*"}
    ]

    # Upload data to google buckets

    %{total: total + bytes, prev: elem}
end)