Reducers

Reducers are functions that process events on the stream. Because they are deterministic, they can be reliably replayed against the stream to reproduce its state at any point. Reducers can also trigger side effects, such as dispatching events to other streams or making HTTP requests, but side effects are voided during replay.

Function arguments #

(state, event, index, current) => newState
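The signature can be exercised outside Pipet with a small local replay loop. This is a minimal sketch: countByType and replay are hypothetical names, events are assumed to be plain objects with a type field, and an empty object stands in for current. Because the reducer reads nothing but its arguments, replaying the same events up to the same index always reproduces the same state.

```javascript
// A reducer with the documented signature: (state, event, index, current) => newState.
// It counts events per type and reads only its arguments, so it is deterministic.
const countByType = (state, event, index, current) => {
  const counts = { ...state }
  counts[event.type] = (counts[event.type] ?? 0) + 1
  return counts
}

// Hypothetical local harness: fold a reducer over an array of events.
// `upTo` reproduces the state as of any index (inclusive).
function replay(reducer, initialState, events, upTo = events.length - 1) {
  let state = initialState
  for (let index = 0; index <= upTo; index++) {
    state = reducer(state, events[index], index, {}) // {} stands in for `current`
  }
  return state
}

const events = [{ type: 'click' }, { type: 'view' }, { type: 'click' }]
console.log(replay(countByType, {}, events))    // { click: 2, view: 1 }
console.log(replay(countByType, {}, events, 1)) // { click: 1, view: 1 }
```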

state #

State is any serializable* value. State can be accessed over HTTP with a GET request to https://pipet.io/user/stream.reducer. By default, state is a JSON value, normally limited to 100 KB.

*Between runs, state is serialized and parsed as JSON by default. However, reducers can also specify custom serialization and deserialization, either with replacer and reviver functions or with a custom class. With a custom class, the JSON string is passed to the constructor during instantiation, and the toJSON method is called on the instance during serialization.
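A sketch of the custom-class route, following the contract in the footnote: the constructor receives the stored JSON string and toJSON produces what gets saved. TagCounts is a hypothetical class, and how a reducer tells Pipet to use it is not covered here.

```javascript
// Hypothetical custom state class: the stored JSON string goes into the
// constructor, and toJSON decides what is written back out.
class TagCounts {
  constructor(json) {
    // On the very first run there may be no stored state yet.
    const data = json ? JSON.parse(json) : {}
    this.counts = new Map(Object.entries(data))
  }

  add(tag) {
    this.counts.set(tag, (this.counts.get(tag) ?? 0) + 1)
    return this
  }

  // A Map serializes to "{}" on its own, so convert it to a plain object.
  toJSON() {
    return Object.fromEntries(this.counts)
  }
}

// Simulate two runs: serialize between them, then revive.
const first = new TagCounts().add('a').add('b').add('a')
const stored = JSON.stringify(first) // '{"a":2,"b":1}'
const second = new TagCounts(stored).add('b')
console.log(second.counts.get('b')) // 2
```

The replacer and reviver route works the same way through the optional second argument of JSON.stringify and JSON.parse; the class route keeps both directions in one place.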

event #

An instance of the Event class. By default, event data is a string, but application/json and application/x-form-data payloads are automatically parsed into objects before being passed to reducers.
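A reducer may therefore see both shapes of payload. This sketch assumes the payload is exposed as event.data (the property name is an assumption) and normalizes both shapes; collectNames is hypothetical.

```javascript
// Hypothetical reducer that accepts both payload shapes: plain-text bodies
// arrive as strings, JSON and form bodies arrive already parsed.
// Assumes the payload is exposed as event.data.
const collectNames = (state, event) => {
  const name = typeof event.data === 'string'
    ? event.data.trim()   // plain text: use the raw string
    : event.data?.name    // parsed object: read a field
  return name ? [...state, name] : state
}

let names = []
names = collectNames(names, { data: '  ada ' })
names = collectNames(names, { data: { name: 'grace' } })
names = collectNames(names, { data: { other: 1 } }) // no name field: state unchanged
console.log(names) // [ 'ada', 'grace' ]
```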

index #

The event’s position in the stream. Indexes are zero-based, like array indexes.

current #

The current argument gives you access to other reducers’ state through private fields, as in current.#reducerName. Each reference implicitly creates a dependency, so the referenced reducer always runs first, ensuring its state is up to date with the latest event. References cannot be circular (for example, reducer A referencing current.#B while reducer B references current.#A).
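The run-first rule and the no-cycles rule together amount to a topological sort of the reference graph. The sketch below, with the hypothetical runOrder and a plain object standing in for the references each reducer makes, shows one way to derive a run order and reject circular references; it is an illustration, not Pipet’s scheduler.

```javascript
// Hypothetical model: each reducer lists the reducers it reads via current.#name.
// Returns an order in which every dependency runs before its dependents,
// or throws if the references form a cycle.
function runOrder(deps) {
  const order = []
  const marks = new Map() // name -> 'visiting' | 'done'

  function visit(name, path) {
    if (marks.get(name) === 'done') return
    if (marks.get(name) === 'visiting') {
      throw new Error(`Circular reference: ${[...path, name].join(' -> ')}`)
    }
    marks.set(name, 'visiting')
    for (const dep of deps[name] ?? []) visit(dep, [...path, name])
    marks.set(name, 'done')
    order.push(name) // pushed only after all of its dependencies
  }

  for (const name of Object.keys(deps)) visit(name, [])
  return order
}

console.log(runOrder({ total: [], average: ['total', 'count'], count: [] }))
// [ 'total', 'count', 'average' ]
```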

HTTP access #

Reducer state is accessible over HTTP at https://pipet.io/user/stream.reducer. Like streams, the URL accepts simple JavaScript expressions. Something like https://pipet.io/user/stream.reducer+1 will evaluate.
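Reading state from code is an ordinary GET. A minimal sketch, assuming the response body is the JSON state; readReducerState is a hypothetical helper, and fetchImpl lets a stub replace the global fetch (available in browsers and Node 18+).

```javascript
// Hypothetical helper: GET a reducer's state URL and parse the JSON body.
// Assumes the body is the reducer's JSON state.
async function readReducerState(url, fetchImpl = fetch) {
  const res = await fetchImpl(url)
  if (!res.ok) throw new Error(`GET ${url} failed with ${res.status}`)
  return res.json()
}

// Usage (makes a network call, so it is not run here):
// const state = await readReducerState('https://pipet.io/user/stream.reducer')
```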

Side effects #

dispatch #

dispatch('/another/stream', 'Simple values can be dispatched')
// FormData can also be dispatched
const form = new FormData()
form.append('foo', 'bar')
dispatch('/another/stream', form)
// To set a type on the event, wrap the data in an Event(type, data)
dispatch('/another/stream', new Event('greeting', 'Hello!'))

dispatch is a global function for side effects such as sending events to other streams. As in the examples above, it takes the target stream path and the value to dispatch: a simple value, a FormData object, or an Event. dispatch does not return anything.

Dispatches cannot be circular. When an event is dispatched to another stream, the new “child” event keeps a record of the stream that dispatched it. Neither this event nor any of its descendants can cause a reducer to dispatch an event back to one of its ancestor streams. This prevents infinite loops.
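One way to model this rule is to carry each event’s lineage and refuse any dispatch into a stream already on it. makeChildEvent and the lineage field are hypothetical; Pipet’s actual bookkeeping is not described here.

```javascript
// Hypothetical loop prevention: every dispatched event carries the list of
// streams it passed through, and a dispatch back into any of them is refused.
function makeChildEvent(parentEvent, fromStream, toStream, data) {
  const lineage = [...(parentEvent.lineage ?? []), fromStream]
  if (lineage.includes(toStream)) {
    throw new Error(`Refusing circular dispatch ${fromStream} -> ${toStream}`)
  }
  return { data, lineage }
}

const root = { data: 'signup' }                             // arrives on /a
const child = makeChildEvent(root, '/a', '/b', 'welcome')   // /a -> /b is allowed
const grandchild = makeChildEvent(child, '/b', '/c', 'log') // /b -> /c is allowed
// makeChildEvent(grandchild, '/c', '/a', 'loop') throws: /a is an ancestor
```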

When a reducer dispatches events to another stream, Pipet manages authentication, since secrets should not be stored in reducer code. If the target stream is public, no authentication is required. If it requires authentication and the reducer has not been granted permission, dispatches will log errors in that stream’s logs until permission is granted.

Event: a constructor for an event object, for when you want to specify event properties. The Event constructor takes two arguments: type and data. If data is not a JavaScript primitive, it is stringified into JSON and sent with Content-Type: application/json. You can designate classes (such as Map and FormData) to be stringified and parsed through their toJSON and constructor methods.
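To make the content-type rule concrete, here is a local model of the constructor. SketchEvent, body, and contentType are hypothetical names, and text/plain for primitives is an assumption; Pipet’s Event class may store things differently.

```javascript
// Hypothetical local model of Event(type, data): primitives are sent as text,
// anything else is stringified to JSON with an application/json content type.
class SketchEvent {
  constructor(type, data) {
    this.type = type
    const isPrimitive = data === null || typeof data !== 'object'
    if (isPrimitive) {
      this.body = String(data)
      this.contentType = 'text/plain' // assumption
    } else {
      // JSON.stringify honors toJSON, which is how a class can opt in.
      this.body = JSON.stringify(data)
      this.contentType = 'application/json'
    }
  }
}

// A Map subclass that opts into JSON via toJSON.
class Tally extends Map {
  toJSON() { return Object.fromEntries(this) }
}

const e = new SketchEvent('tally', new Tally([['a', 1]]))
console.log(e.contentType, e.body) // application/json {"a":1}
```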

You cannot force a timestamp on events dispatched from a reducer.

request #

Unlike dispatches, requests cannot be recalled and replayed. All request calls are voided during backfill to prevent a thundering herd of requests. Because of this, requests should ideally be idempotent and used to push state to other systems, not to trigger mutations.

request('https://example.org', {method: 'POST', body: 'Hello, world!'})

request takes the same parameters as fetch, but unlike fetch it does not return a value. Requests are sent with the stream’s reducer URL as the Referer. Requests are for pushing data to other systems: response statuses are logged, but response bodies are not captured. Reducers are deterministic state functions that query and aggregate events; they do not ingest additional data.
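An idempotent push sends the full derived state rather than an increment, so a repeated call changes nothing and each push supersedes the last. In this sketch, request is a local stub that only records its arguments, and the example.org endpoint and syncTotal reducer are placeholders.

```javascript
// Local stand-in for the global request: same parameters as fetch, no return value.
const sent = []
const request = (url, init) => { sent.push({ url, init }) }

// PUT the whole derived state, not a delta: sending the same body twice
// leaves the receiver unchanged.
const syncTotal = (state, event) => {
  const next = { total: (state.total ?? 0) + 1 }
  request('https://example.org/totals/signups', {
    method: 'PUT',
    headers: { 'Content-Type': 'application/json' },
    body: JSON.stringify(next),
  })
  return next
}

let s = {}
s = syncTotal(s, {})
s = syncTotal(s, {})
console.log(sent.map((r) => r.init.body)) // [ '{"total":1}', '{"total":2}' ]
```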

mail #

// sends an email from account@pipet.io to eric@example.org
mail('eric@example.org', {subject: 'hi'})

Modified builtins #

Math.random is seeded, so calls appear random but are deterministic within a reducer.
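Pipet’s seeding scheme is not documented here. As an illustration of seeded, deterministic randomness, this sketch swaps in mulberry32, a small well-known 32-bit generator: two generators created with the same seed produce identical sequences.

```javascript
// mulberry32: a tiny seeded PRNG returning floats in [0, 1).
// Used here only to illustrate seeding; it is not Pipet's generator.
function mulberry32(seed) {
  let a = seed >>> 0
  return function random() {
    a = (a + 0x6d2b79f5) >>> 0
    let t = a
    t = Math.imul(t ^ (t >>> 15), t | 1)
    t ^= t + Math.imul(t ^ (t >>> 7), t | 61)
    return ((t ^ (t >>> 14)) >>> 0) / 4294967296
  }
}

const a = mulberry32(42)
const b = mulberry32(42)
console.log(a() === b(), a() === b()) // true true
```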

Date.now returns 0. Use event.timestamp instead.
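For time-based state, derive time from the event rather than the clock. This sketch buckets events by UTC day and assumes event.timestamp is milliseconds since the Unix epoch; the actual format may differ, and countPerDay is hypothetical.

```javascript
// Count events per UTC day using the event's own timestamp, so replays
// produce the same buckets. Assumes event.timestamp is epoch milliseconds.
const countPerDay = (state, event) => {
  const day = new Date(event.timestamp).toISOString().slice(0, 10) // "YYYY-MM-DD"
  return { ...state, [day]: (state[day] ?? 0) + 1 }
}

let perDay = {}
perDay = countPerDay(perDay, { timestamp: Date.UTC(2024, 0, 1, 9) })
perDay = countPerDay(perDay, { timestamp: Date.UTC(2024, 0, 1, 17) })
perDay = countPerDay(perDay, { timestamp: Date.UTC(2024, 0, 2, 8) })
console.log(perDay) // { '2024-01-01': 2, '2024-01-02': 1 }
```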

eval is disabled.

console.log sends messages to the stream log as log events with type info.

console.error is like console.log but will create log events with type error.

Logs #

Every reducer has a built-in log at pipet.io/account/stream/logs(reducer). console.log calls, errors, and side effects are all logged there. Reducer logs are streams: you cannot attach reducers to log streams, but you can pipe them into your own streams.

Running reducers #

Reducers run in two modes: “at head” and “in backfill”. At head, a reducer runs against the latest event in the stream. A newly added reducer runs in backfill until it catches up with the stream head, then switches to head mode. If a backfilling reducer has dependencies, they are also run, but as “pure functions”, with their side effects voided.
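The difference between head and pure execution can be modeled as what happens to the side effects a reducer requests. In this sketch, dispatch is a local stub that queues effects and run is a hypothetical runner: with pure set, queued effects are discarded, otherwise they are performed. The state comes out the same either way.

```javascript
// Local stub: dispatch only queues the effect; the runner decides its fate.
let pending = []
const dispatch = (stream, value) => { pending.push({ stream, value }) }

// Hypothetical runner: same state either way, effects kept only when impure.
function run(reducer, state, events, firstIndex, { pure }) {
  const performed = []
  events.forEach((event, i) => {
    pending = []
    state = reducer(state, event, firstIndex + i, {})
    if (!pure) performed.push(...pending) // pure mode voids the effects
  })
  pending = []
  return { state, performed }
}

const alertOnBig = (state, event) => {
  if (event.amount > 100) dispatch('/alerts', `big order: ${event.amount}`)
  return state + event.amount
}

const events = [{ amount: 50 }, { amount: 500 }]
const pureRun = run(alertOnBig, 0, events, 0, { pure: true })
const headRun = run(alertOnBig, 0, events, 0, { pure: false })
console.log(pureRun.state, pureRun.performed.length) // 550 0
console.log(headRun.state, headRun.performed.length) // 550 1
```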

A reducer pauses when it (or any of its dependencies) throws a runtime error. Paused reducers do not run until their owners manually resume them. When resumed, they run in backfill mode until they catch up with the head.

Paused reducers are distinct from completed reducers, which have reached the end of their range. Completed reducers are no longer run, but their state remains available to reducers that depend on them. You can restart a completed reducer by extending or removing its end range, which effectively kicks off a backfill.

State checkpoints #

You can create state checkpoints, which snapshot state at specific points in the stream so that a reducer can resume from the last checkpoint instead of from the beginning. Checkpoints can be created automatically when reducers pause or complete, or once per GB processed.
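Resuming from a checkpoint means loading the newest snapshot at or before the target index and replaying only the events after it. A sketch with hypothetical names, assuming checkpoints store JSON-serialized state and the index of the last event they include; events are plain numbers for brevity.

```javascript
// Hypothetical resume: start from the newest checkpoint at or before `target`
// and replay only the events after it. Falls back to a full replay.
function resumeFrom(checkpoints, reducer, initialState, events, target) {
  const best = checkpoints
    .filter((c) => c.index <= target)
    .reduce((a, c) => (a === null || c.index > a.index ? c : a), null)
  let state = best ? JSON.parse(best.state) : initialState
  let replayed = 0
  for (let i = best ? best.index + 1 : 0; i <= target; i++) {
    state = reducer(state, events[i], i)
    replayed++
  }
  return { state, replayed }
}

const sum = (state, event) => state + event
const events = [1, 2, 3, 4, 5, 6]
// Snapshots after index 1 (1+2) and index 3 (1+2+3+4).
const checkpoints = [{ index: 1, state: '3' }, { index: 3, state: '10' }]
console.log(resumeFrom(checkpoints, sum, 0, events, 5)) // { state: 21, replayed: 2 }
```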

Errors #

Errors pause the reducer and any dependent reducers, putting them in an error state at a specific index. You can try to restart the reducer; its dependents are restarted along with it.

idx:0     idx:10
A   D G   A   D G
|\  |     |\
B C E     B C E
   \|        \|
    F         F

In this dependency graph, if C errors at index 10, C is put into an error state and F is paused. Meanwhile, A, B, D, and E can continue to process the stream.

When an error happens, we checkpoint state, meaning we store a snapshot. If reducer G is added later, it should checkpoint as it backfills. This ensures we don’t need to re-run the entire stream to get the state of all reducers when restarting an errored reducer.

C is in an error state. F enters a blocked state, waiting on C. If you restart C, its dependencies and dependents are automatically rerun. Without checkpoints, you need to re-run almost the entire dependency tree (A, C, D, E, and F); with checkpoints, you only need to run the errored reducer, its dependencies (in pure mode), and its blocked dependents (in impure mode).
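Both rerun sets can be computed from the graph in the diagram, reading each edge as “the lower reducer depends on the upper one”. This sketch (upstream and downstream are hypothetical helpers) reproduces both lists for an error in C.

```javascript
// The diagram's graph: each reducer maps to the reducers it reads.
const deps = { A: [], B: ['A'], C: ['A'], D: [], E: ['D'], F: ['C', 'E'], G: [] }

// Transitive dependencies of the given reducers.
function upstream(names) {
  const seen = new Set()
  const stack = [...names]
  while (stack.length) {
    for (const d of deps[stack.pop()]) {
      if (!seen.has(d)) { seen.add(d); stack.push(d) }
    }
  }
  return seen
}

// Transitive dependents: reducers that read `name` directly or indirectly.
function downstream(name) {
  const seen = new Set()
  let changed = true
  while (changed) {
    changed = false
    for (const [r, ds] of Object.entries(deps)) {
      if (!seen.has(r) && ds.some((d) => d === name || seen.has(d))) {
        seen.add(r)
        changed = true
      }
    }
  }
  return seen
}

const errored = 'C'
const blocked = downstream(errored) // { F }
// With checkpoints: dependencies in pure mode, errored + blocked in impure mode.
const withCheckpoints = { pure: [...upstream([errored])], impure: [errored, ...blocked] }
// Without checkpoints, every input of the blocked reducers must be rebuilt too.
const withoutCheckpoints = new Set([errored, ...blocked, ...upstream([errored, ...blocked])])
console.log(withCheckpoints)                // { pure: [ 'A' ], impure: [ 'C', 'F' ] }
console.log([...withoutCheckpoints].sort()) // [ 'A', 'C', 'D', 'E', 'F' ]
```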

Updating reducers #

Versions #

Reducers have a version history.

// version 1, runs for events 0-5
(state, event) => state + 1

// version 2, runs for events 6-10
(state, event) => state + 2

// version 3, runs for events 11 onwards
(state, event) => state + 3

// If the reducer's state cursor is at 7 but the stream head is at 12, processing the next batch needs versions 2 and 3: the current version, plus every version whose start is greater than state.cursor and no greater than stream.latest.
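The selection rule can be checked against the example. In this sketch, each version’s range is implied by the next version’s start, and versionAt and versionsNeeded are hypothetical helpers. It includes a version that starts exactly at the stream head, since that version is needed to process the head event.

```javascript
// The example history: each version applies until the next one starts.
const versions = [
  { version: 1, start: 0, reducer: (state) => state + 1 },
  { version: 2, start: 6, reducer: (state) => state + 2 },
  { version: 3, start: 11, reducer: (state) => state + 3 },
]

// The version in effect at index i is the last one whose start is <= i.
const versionAt = (i) => versions.filter((v) => v.start <= i).pop()

// Current version plus any that start after the cursor, up to the head.
const versionsNeeded = (cursor, latest) =>
  versions.filter((v) => v === versionAt(cursor) || (v.start > cursor && v.start <= latest))

console.log(versionsNeeded(7, 12).map((v) => v.version)) // [ 2, 3 ]

// Replaying events 0..12, each with the version in effect at its index.
let state = 0
for (let i = 0; i <= 12; i++) state = versionAt(i).reducer(state)
console.log(state) // 22 (6 x 1 + 5 x 2 + 2 x 3)
```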

Static vs continuous reducers #

Are all reducers continuous? No. A stream can also have constant values. These are accessible at runtime just like other reducer state, and over HTTP. They are similar to reducers, but have a bounded range.

Creating new reducers #

When creating a reducer, you can choose its start and end indexes. Once attached, the reducer starts by backfilling. Dependencies complicate backfill: the reducers it depends on are re-run in “pure” mode to prevent side effects. The reducer itself can optionally run with side effects enabled, but depending on the mutability of the downstream streams, those side effects may be overwritten later. Critically, re-run dependencies also follow their version history.

When backfilling, dispatched events are appended to a splice request instead of being dispatched immediately to the target streams. Requests are optionally processed, but by default have no effect during backfill.
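Buffering during backfill can be sketched as a dispatcher that either sends immediately or groups values per target stream. makeDispatcher and the grouped-array shape of a splice request are assumptions for illustration.

```javascript
// Hypothetical dispatcher: at head, send right away; in backfill, collect
// one pending splice per target stream, preserving dispatch order.
function makeDispatcher(mode, send) {
  const splices = new Map() // target stream -> buffered values
  const dispatch = (stream, value) => {
    if (mode === 'head') { send(stream, value); return }
    if (!splices.has(stream)) splices.set(stream, [])
    splices.get(stream).push(value)
  }
  return { dispatch, splices }
}

const sentNow = []
const atHead = makeDispatcher('head', (s, v) => sentNow.push([s, v]))
atHead.dispatch('/b', 'x')

const inBackfill = makeDispatcher('backfill', () => { throw new Error('nothing is sent in backfill') })
inBackfill.dispatch('/b', 1)
inBackfill.dispatch('/c', 2)
inBackfill.dispatch('/b', 3)
console.log(sentNow.length, inBackfill.splices.get('/b')) // 1 [ 1, 3 ]
```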

Reducer range does not need to be the entire stream. You can set a reducer’s range to be a discrete range (say the first 100 events in the stream) and potentially extend that range later, once you’ve confirmed state to be correct.

Update and Delete #

Updates to a reducer apply going forward: the updated reducer picks up from the same state cursor on the next reduce loop.

Deleting a reducer can delete its downstream dispatched events, but does not have to. It can issue a splice request to any recipient streams, which the owner of each stream can merge in.

Fork & Merge #

You can pick a point in a reducer’s version history to “fork” a new reducer. This replicates the reducer and enables graceful updates, since the forked reducer can backfill first.

Initial state #

You can push new state to a reducer. The reducer then starts reducing the stream from that state.

Reserved names #

Reducer names must be valid JavaScript identifiers and cannot use the following reserved names:

  • length
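A name check can approximate the identifier rule with the Unicode ID_Start and ID_Continue classes that the ECMAScript grammar uses (ignoring \u escape sequences), then reject reserved names. isValidReducerName is hypothetical, and keywords such as class pass this check; whether Pipet rejects them is not stated.

```javascript
// Reserved names from the list above.
const RESERVED = new Set(['length'])

// IdentifierName approximation: ID_Start, $ or _ first; then ID_Continue,
// $, ZWNJ or ZWJ. Unicode escapes in names are not handled.
const IDENTIFIER = /^[\p{ID_Start}$_][\p{ID_Continue}$\u200C\u200D]*$/u

function isValidReducerName(name) {
  return IDENTIFIER.test(name) && !RESERVED.has(name)
}

console.log(['total', 'length', '1st', 'my-name'].map(isValidReducerName))
// [ true, false, false, false ]
```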