What is a Stream? A Stream is a composable enumerable that is computed lazily. This is typically accomplished by keeping a little bit of state that describes where you currently are in the enumerable, along with a function that computes the next item. A trivial example is a Stream of the natural numbers:
Stream.iterate(0, fn x -> x + 1 end)
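To make the "state plus a way to compute the next item" description concrete, here is the same Stream of natural numbers written with Stream.unfold/2, where the state is explicit: the function receives the current state and returns the item to emit together with the next state (returning nil would end the stream). Nothing is computed until Enum.take/2 asks for items.

```elixir
# The state is the next number to emit; each step emits it and advances the state by one.
naturals = Stream.unfold(0, fn n -> {n, n + 1} end)

# Only the first five items are ever computed, even though the stream is infinite.
Enum.take(naturals, 5)
# => [0, 1, 2, 3, 4]
```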
The gory details of how Streams are implemented under the hood are a topic for another day. You rarely need to get that low-level: the Stream module has a wealth of functions like this one that you can use as building blocks to construct your own Streams. Several Elixir library functions also return streams natively: IO.stream will turn a file (or any other IO device) into a Stream, letting you process very large files without first reading them into memory in their entirety. Because Streams are lazy, only as much of a Stream is computed as is needed to produce your answer. For example, the following pipeline finds the 100th positive integer that is both a palindrome and divisible by 3, generating candidates only up to that number:
1
|> Stream.iterate(fn n -> n + 1 end)
|> Stream.filter(fn n -> s = Integer.to_string(n); s == String.reverse(s) end)
|> Stream.filter(fn n -> rem(n, 3) == 0 end)
|> Enum.at(99)
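One way to check the laziness claim for yourself is to count how many candidates the pipeline actually generates. This sketch adds a Stream.each/2 step that bumps an Erlang :counters counter for every number pulled out of Stream.iterate. Because Enum.at/2 stops as soon as it reaches index 99, the count comes out equal to the answer itself: no number beyond it is ever generated.

```elixir
# A single-slot counter, incremented once per number the stream actually produces.
counter = :counters.new(1, [])
palindrome? = fn n -> s = Integer.to_string(n); s == String.reverse(s) end

answer =
  1
  |> Stream.iterate(fn n -> n + 1 end)
  |> Stream.each(fn _ -> :counters.add(counter, 1, 1) end)
  |> Stream.filter(palindrome?)
  |> Stream.filter(fn n -> rem(n, 3) == 0 end)
  |> Enum.at(99)

generated = :counters.get(counter, 1)
IO.puts("answer: #{answer}, numbers generated: #{generated}")
```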
Now onto Elasticsearch. The first thing to know is that, by default, ES won't return more than 10,000 hits for a single search: from + size is capped by the index.max_result_window setting. After scrutinizing the documentation, it became clear that the method that first turned up in my Stack Overflow search had been deprecated, and that the approved approach was to use point-in-time (PIT) IDs. A PIT ID essentially gives you a snapshot of your index at one point in time, so you get a consistent view of your search results across multiple queries. The process is to create a PIT ID, run a sequence of queries against it, and delete it when you're done.
Creating a PIT ID is just a matter of asking ES for one associated with the index we want to run queries against.
@spec create(ES.index_t()) :: {:ok, pit_t()} | {:error, ES.response_t()}
def create(index) do
  :post
  |> HTTP.request(url(index), HTTP.no_payload(), HTTP.json_encoding())
  |> HTTP.on_200(& &1["id"])
end
Deleting a PIT ID is a simple HTTP DELETE, except that PIT IDs are huge, so you need to supply them in the request body rather than as URL parameters. Some HTTP libraries won't let you do this; the HTTP spec is a little muddy on the matter, and the last time I checked Stack Overflow there was a healthy debate on the subject. This is why we use HTTPoison: it allows payloads on DELETE requests.
@spec delete(pit_t()) :: :ok | {:error, term()}
def delete(pit) do
  url = HTTP.generate_url("_pit", ES.no_index(), HTTP.no_options())
  # PIT IDs are too long for a URL parameter, so the ID goes in the DELETE body.
  # The non-raising HTTPoison.request/4 lets transport errors reach the else clause.
  with {:ok, payload} <- Poison.encode(%{"id" => pit}),
       {:ok, %HTTPoison.Response{status_code: 200}} <-
         HTTPoison.request(:delete, url, payload, HTTP.headers(HTTP.json_encoding())) do
    :ok
  else
    error -> {:error, error}
  end
end
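Stripped of the project's HTTP helpers (HTTP.request, url/1 and friends, which aren't shown here), the two functions above correspond to two Elasticsearch requests: POST /<index>/_pit?keep_alive=<duration> opens a PIT (the keep_alive parameter is required), and DELETE /_pit with the ID in a JSON body closes it. The hypothetical PitRequests module below only builds {method, url, body} tuples so the shapes are easy to see; the base URL and the "1m" default are illustrative assumptions.

```elixir
defmodule PitRequests do
  # Hypothetical helpers: they build {method, url, body} tuples and send nothing.

  # POST /<index>/_pit?keep_alive=<duration> opens a point in time on the index.
  def create(base_url, index, keep_alive \\ "1m") do
    query = URI.encode_query(%{"keep_alive" => keep_alive})
    {:post, "#{base_url}/#{index}/_pit?#{query}", nil}
  end

  # DELETE /_pit closes it; the (long) ID travels in the request body.
  def delete(base_url, pit_id) do
    {:delete, "#{base_url}/_pit", %{"id" => pit_id}}
  end
end

PitRequests.create("http://localhost:9200", "my-index")
# => {:post, "http://localhost:9200/my-index/_pit?keep_alive=1m", nil}
```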
Now that the PIT IDs are sorted out, our next order of business is figuring out how to use them in our queries. Our basic query is enhanced with three extra parameters:
pit, a map containing %{id: <id>, keep_alive: "1m"}
size, the number of hits per page (ES.max_results())
sort, a map containing %{"_shard_doc" => "desc"}
(For sort, you need to provide something, and _shard_doc is built into every Elasticsearch index, so it's nice and portable.)
@spec initial(PIT.pit_t(), ES.query_t()) :: ES.query_t()
def initial(pit, query) do
  %{
    query: query,
    pit: %{id: pit, keep_alive: PIT.expiration()},
    size: ES.max_results(),
    sort: %{"_shard_doc" => "desc"}
  }
end
With our basic query down, we can focus on generating the sequence of queries that will extract all the items matching our search criteria. The trick is to take the sort values of the last hit in the previous page and pass them to the next query as search_after, like so:
@spec update(ES.query_t(), ES.response_t()) :: ES.query_t()
def update(query, %Req.Response{body: %{"hits" => %{"hits" => hits}}, status: 200}) do
  Map.put(query, :search_after, List.last(hits)["sort"])
end
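To see why copying the last hit's sort values into search_after walks through the results page by page, here is an in-memory sketch. FakeSearch is hypothetical: it stands in for Elasticsearch with ten documents sorted by a descending key, and its update/2 mirrors Query.update/2 but takes a plain map instead of a %Req.Response{}.

```elixir
defmodule FakeSearch do
  # Ten hypothetical documents, already sorted by a descending integer key.
  # Like real hits, each one carries its sort values under "sort".
  @docs Enum.map(Enum.reverse(1..10), fn n -> %{"_id" => "doc#{n}", "sort" => [n]} end)

  # Returns up to `size` hits that sort strictly after `search_after` (descending order).
  def search(%{size: size} = query) do
    hits =
      case query[:search_after] do
        nil -> @docs
        [last] -> Enum.filter(@docs, fn %{"sort" => [n]} -> n < last end)
      end

    %{"hits" => %{"hits" => Enum.take(hits, size)}}
  end

  # Mirrors Query.update/2: the next query starts after the last hit of this page.
  def update(query, %{"hits" => %{"hits" => hits}}) do
    Map.put(query, :search_after, List.last(hits)["sort"])
  end
end

first = FakeSearch.search(%{size: 4})
second = FakeSearch.search(FakeSearch.update(%{size: 4}, first))
Enum.map(second["hits"]["hits"], & &1["_id"])
# => ["doc6", "doc5", "doc4", "doc3"]
```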
Armed with these two query-generating functions, we can use Stream.iterate to build a stream that pulls all the results out of our desired index:
@spec streamer(ES.query_t()) :: {:ok, Enumerable.t()} | {:error, any}
def streamer(initial) do
  {:ok,
   initial
   |> search()
   |> Stream.iterate(fn previous -> search(Query.update(initial, previous)) end)}
rescue
  error -> {:error, error}
end
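Stream.iterate never ends on its own, so streamer/1 relies on its caller to stop it (stream_many/4 does so with Stream.take(count)). As an alternative sketch, not the code used here, Stream.unfold/2 can end the stream by itself as soon as a page comes back short or empty. The in-memory index and fetch_page/1 are hypothetical stand-ins for Elasticsearch.

```elixir
defmodule PagedStream do
  # Hypothetical in-memory "index": ten docs sorted by a descending key.
  @docs Enum.map(Enum.reverse(1..10), fn n -> %{"_id" => "doc#{n}", "sort" => [n]} end)

  # Stand-in for one search request: the page of hits after query[:search_after].
  defp fetch_page(%{size: size} = query) do
    candidates =
      case query[:search_after] do
        nil -> @docs
        [last] -> Enum.filter(@docs, fn %{"sort" => [n]} -> n < last end)
      end

    Enum.take(candidates, size)
  end

  # Emits every hit once; stops after a short page (no more data) or an empty one.
  def stream(initial) do
    initial
    |> Stream.unfold(fn
      :done ->
        nil

      %{size: size} = query ->
        case fetch_page(query) do
          [] -> nil
          hits when length(hits) < size -> {hits, :done}
          hits -> {hits, Map.put(query, :search_after, List.last(hits)["sort"])}
        end
    end)
    |> Stream.flat_map(& &1)
  end
end

PagedStream.stream(%{size: 4}) |> Enum.map(& &1["_id"])
# => ["doc10", "doc9", "doc8", "doc7", "doc6", "doc5", "doc4", "doc3", "doc2", "doc1"]
```

Stopping on a short page saves one request at the end; stopping on an empty page covers result sets whose size is an exact multiple of the page size.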
@spec stream_many(ES.query_t(), ES.index_t(), ES.consumer_t(), non_neg_integer()) ::
        {:error, any()} | {:ok, Enumerable.t()}
def stream_many(query, index, consumer, count) do
  case PIT.create(index) do
    {:ok, pit_id} ->
      try do
        case pit_id |> Query.initial(query) |> streamer() do
          {:ok, stream} ->
            stream
            |> Stream.flat_map(& &1.body["hits"]["hits"])
            |> Stream.take(count)
            |> then(consumer)
          error -> error
        end
      after
        PIT.delete(pit_id)
      end
    error -> error
  end
rescue
  error -> {:error, error}
end
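One subtlety in stream_many: the after clause deletes the PIT as soon as the try body returns, so the consumer has to force the stream (for example, by running the upload) inside then(consumer). If it merely returned the lazy stream, every search would run later, against a PIT that no longer exists. The sketch below simulates this; LazyCleanup is hypothetical, and a flag in the process dictionary stands in for the PIT.

```elixir
defmodule LazyCleanup do
  # Hypothetical stand-in for stream_many/4: the :pit_open flag in the process
  # dictionary plays the PIT, and the `after` block plays PIT.delete/1.
  def run(consumer) do
    Process.put(:pit_open, true)

    try do
      # Each "search" checks that the PIT still exists, as a real query would.
      Stream.repeatedly(fn ->
        if Process.get(:pit_open), do: :hit, else: raise("PIT already deleted")
      end)
      |> Stream.take(3)
      |> then(consumer)
    after
      Process.put(:pit_open, false)
    end
  end
end

# A consumer that forces the stream finishes before cleanup runs.
LazyCleanup.run(&Enum.to_list/1)
# => [:hit, :hit, :hit]

# A consumer that returns the stream unforced defers every search until after
# cleanup, so forcing it later raises "PIT already deleted".
lazy = LazyCleanup.run(&Function.identity/1)
```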
@spec stream_one(ES.query_t(), ES.index_t(), ES.consumer_t()) :: {:ok, any()} | {:error, any()}
def stream_one(query, index, consumer) do
  query
  |> search(index)
  |> HTTP.on_200(fn body -> consumer.(body["hits"]["hits"]) end)
end
Now we need a top-level function that queries the size of the result set and chooses between stream_one and stream_many depending on the value, like so:
@spec stream(ES.query_t(), ES.index_t(), ES.consumer_t()) :: {:ok, any()} | {:error, any()}
def stream(query, index, consumer) do
  case count(query, index) do
    {:ok, count} ->
      if count > ES.max_results() do
        stream_many(query, index, consumer, count)
      else
        stream_one(query, index, consumer)
      end
    error ->
      error
  end
end
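The library we use to access S3 is ex_aws_s3. It handles all the low-level details for us, but it does have one requirement: each binary in the input stream is uploaded as one part of an S3 multipart upload, so every chunk except the last must be at least 5 MiB, S3's minimum part size.
To accomplish this, we use the Stream function chunk_while. It takes four inputs: the enumerable to chunk; an initial accumulator; a step function, called with each item and the current accumulator, that returns {:cont, chunk, acc} to emit a chunk, {:cont, acc} to keep accumulating, or {:halt, acc} to stop; and a final function, called with the last accumulator, that returns {:cont, chunk, acc} to emit whatever is left over or {:cont, acc} to emit nothing. Our accumulator is a {size, items} tuple that starts out as {0, []}, and our final function converts any leftover items into one last chunk.
Next, we turn our attention to the step function. It checks whether the current size is greater than or equal to the desired chunk size. If it is, it converts the items in the accumulator into a chunk using convert, emits that chunk, and starts a fresh accumulator containing just the current item; if not, it adds the item to the current chunk (updating the size) using add_chunk.
What should add_chunk do? It just pushes the item onto the front of the list and increases size by the item's size, as measured by sizer. Because S3's limit is in bytes, sizer defaults to byte_size/1; String.length/1 would count graphemes rather than bytes.
The behaviour of convert depends on whether we care about preserving the order of the items: because add_chunk prepends, the items in the list are in reverse order and need to be reversed before joining. If we don't care, we can skip that step. Putting this all together gives us:
@spec chunker(Enumerable.t(), pos_integer(), boolean(), (any() -> non_neg_integer()), ([any()] -> any())) :: Enumerable.t()
def chunker(
      chunks,
      chunk_size,
      ordered \\ true,
      sizer \\ &byte_size/1,
      joiner \\ &Enum.join/1
    ) do
  zero = {0, []}
  # add_chunk prepends, so reverse the items first if the caller wants them in order.
  convert =
    if ordered do
      fn chunks -> chunks |> Enum.reverse() |> then(joiner) end
    else
      joiner
    end
  # Emit the leftover items as a final chunk; emit nothing if there are none (empty input).
  final = fn
    {_, []} -> {:cont, zero}
    {_, chunks} -> {:cont, convert.(chunks), zero}
  end
  add_chunk = fn {size, chunks}, chunk -> {size + sizer.(chunk), [chunk | chunks]} end
  # Once the accumulator reaches chunk_size, emit it and start a new one with the current item.
  step = fn chunk, {size, chunks} = accum ->
    if size >= chunk_size do
      {:cont, convert.(chunks), add_chunk.(zero, chunk)}
    else
      {:cont, add_chunk.(accum, chunk)}
    end
  end
  Stream.chunk_while(chunks, zero, step, final)
end
We can now combine all the components we have written into a simple controller action that streams the hits for a query out of Elasticsearch, packs them into chunks of at least 5 MiB, and uploads them with the ExAws.S3 module:
defmodule Controller do
  alias ExAws.S3
  # Assumes chunker/3 (defined above) is imported from the module that defines it.
  # S3 multipart parts must be at least 5 MiB; only the final part may be smaller.
  @five_megabytes 5 * 1024 * 1024
  def find_all(params) do
    params
    |> query()
    |> ElasticSearch.stream(params.index, &consumer(params, &1))
  end
  def query(params) do
    %{term: %{params.query_term => params.query_value}}
  end
  # Encode each hit's _source as JSON, pack the strings into parts, and upload them.
  def consumer(params, stream) do
    stream
    |> Stream.map(&Poison.encode!(&1["_source"]))
    |> chunker(@five_megabytes, false)
    |> S3.upload(params.bucket, params.filename)
    |> ExAws.request()
  end
end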
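For readers new to Stream.chunk_while, here is a small, self-contained sketch of the same size-threshold idea, run on toy data. ChunkDemo and chunk_by_bytes/2 are made-up names, and the logic is a variation rather than the chunker itself: it checks the size after adding each item, so a chunk is released as soon as it reaches the threshold instead of when the next item arrives. The chunk boundaries come out the same; only the timing differs.

```elixir
defmodule ChunkDemo do
  # Groups binaries into chunks of at least min_bytes bytes; only the last chunk may be smaller.
  def chunk_by_bytes(binaries, min_bytes) do
    # Accumulator: {bytes so far, binaries in reverse order}.
    chunk_fun = fn bin, {size, acc} ->
      size = size + byte_size(bin)
      acc = [bin | acc]

      if size >= min_bytes do
        # Threshold reached: emit the chunk in input order and start over.
        {:cont, acc |> Enum.reverse() |> IO.iodata_to_binary(), {0, []}}
      else
        {:cont, {size, acc}}
      end
    end

    # Emit any leftovers as a final, possibly smaller, chunk; emit nothing for empty input.
    after_fun = fn
      {_, []} -> {:cont, {0, []}}
      {_, acc} -> {:cont, acc |> Enum.reverse() |> IO.iodata_to_binary(), {0, []}}
    end

    Stream.chunk_while(binaries, {0, []}, chunk_fun, after_fun)
  end
end

ChunkDemo.chunk_by_bytes(["aa", "bb", "cc", "dd", "e"], 4) |> Enum.to_list()
# => ["aabb", "ccdd", "e"]
```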
By Lob Staff Engineer, Guy Argo.