Streams #

Pipet is an event stream server with an external HTTP interface and a JavaScript processing engine.

Events #

Events are immutable values in the stream. They are appended and accessed with HTTP requests to the stream URL (e.g. https://pipet.io/user/stream). Once in an event stream, events are objects with properties.

  • timestamp: A monotonically increasing Unix timestamp in milliseconds. Can be overridden by the client.
  • eventType: A string with a maximum length of 100 characters. Assigned with the Event-Type header or the event-type query parameter.
  • contentType: Usually a MIME type; this defines how event data should be parsed. Supported content types are application/json, application/x-form-data, and text/csv, though other common types will be supported. Assigned with the Content-Type header.
  • data: A string with a maximum size of 100 KB. Assigned with the body of the HTTP request.
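The property list above can be sketched as a TypeScript type plus a small function that maps a request onto an event. This is only an illustration: the names PipetEvent, IncomingRequest, and toEvent are hypothetical, and the precedence of the header over the query parameter, the empty-string defaults, and the 1024-byte kilobyte are assumptions rather than documented behavior.

```typescript
// Hypothetical shape of a stored event, following the property list above.
interface PipetEvent {
  timestamp: number;   // Unix time in milliseconds
  eventType: string;   // at most 100 characters
  contentType: string; // e.g. "application/json"
  data: string;        // at most 100 KB
}

// Hypothetical, simplified view of an incoming HTTP request.
interface IncomingRequest {
  headers: Record<string, string | undefined>; // keys assumed lower-cased
  query: Record<string, string | undefined>;
  body: string;
}

const MAX_EVENT_TYPE_LENGTH = 100;
const MAX_DATA_BYTES = 100 * 1024; // assumption: 1 KB = 1024 bytes

function toEvent(req: IncomingRequest, now: number): PipetEvent {
  // Assumption: the header wins when both header and query parameter are set.
  const eventType = req.headers["event-type"] ?? req.query["event-type"] ?? "";
  if (eventType.length > MAX_EVENT_TYPE_LENGTH) {
    throw new Error("eventType exceeds 100 characters");
  }
  // Measure the body in UTF-8 bytes rather than in string length.
  const size = new TextEncoder().encode(req.body).length;
  if (size > MAX_DATA_BYTES) {
    throw new Error("data exceeds 100 KB");
  }
  return {
    timestamp: now,
    eventType,
    contentType: req.headers["content-type"] ?? "",
    data: req.body,
  };
}
```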

Timestamps #

Stream timestamps are guaranteed to be monotonically increasing. Generally, when events are dispatched to the stream, the system sets the event timestamp to the current Unix time in milliseconds. Clients can assign their own timestamps so long as the timestamps in the stream remain monotonically increasing. If you push a timestamp that lies in the future, the stream will assign all subsequent “un-timestamped” events that latest timestamp.
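One way to read the timestamp rules is as a small pure function. The sketch below is an assumption about how the rules combine, not Pipet's implementation: assignTimestamp is a hypothetical name, "monotonically increasing" is taken to allow equal timestamps (since several events can share the latest one), and rejecting a client timestamp that would move the stream backwards is a guess at the failure mode.

```typescript
// latest: timestamp of the newest event already in the stream.
// now: current Unix time in milliseconds.
// clientTimestamp: optional timestamp supplied by the client.
function assignTimestamp(
  latest: number,
  now: number,
  clientTimestamp?: number,
): number {
  if (clientTimestamp !== undefined) {
    // Assumption: a client timestamp older than the latest event is rejected.
    if (clientTimestamp < latest) {
      throw new Error("timestamp would break monotonic ordering");
    }
    return clientTimestamp;
  }
  // An un-timestamped event gets the current time, unless a future
  // timestamp was pushed earlier, in which case it gets the latest one.
  return Math.max(now, latest);
}
```

For example, after a client pushes an event stamped 5000 while the clock reads 1500, an un-timestamped event arriving at clock time 1600 would be stamped 5000 under these assumptions.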

Mutability #

Events in the stream are immutable, but we should strive to make easy things easy and hard things possible. While you cannot change events once they are in the stream, you can clone a stream in place, replacing the current stream. This has the consequence of re-running all reducers and recalling events that were dispatched downstream. Re-creating a stream is a manual operation: the stream owner must approve a splice request.

Selectors #

Selectors filter which events are passed to reducers or pipes and therefore speed up processing. Selectors can filter by event types, index or timestamp ranges, and substreams. Reducers, pipes, server-sent event connections, and webhooks can all be configured to filter the stream with selectors. By default, server-sent event connections and webhooks do not deliver the events themselves, only the latest index and event type.
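To make the filtering idea concrete, here is a minimal sketch of a selector applied to a list of events. The Selector shape, its field names, and the inclusive range bounds are all assumptions made for illustration; the actual selector syntax is not shown in this document, and substream filtering is left out.

```typescript
// Hypothetical minimal view of an event for filtering purposes.
interface StreamEvent {
  index: number;
  timestamp: number;
  eventType: string;
}

// Hypothetical selector: every field is optional, bounds are inclusive.
interface Selector {
  eventTypes?: string[];
  fromIndex?: number;
  toIndex?: number;
  fromTimestamp?: number;
  toTimestamp?: number;
}

function matchesSelector(sel: Selector, ev: StreamEvent): boolean {
  if (sel.eventTypes !== undefined && !sel.eventTypes.includes(ev.eventType)) return false;
  if (sel.fromIndex !== undefined && ev.index < sel.fromIndex) return false;
  if (sel.toIndex !== undefined && ev.index > sel.toIndex) return false;
  if (sel.fromTimestamp !== undefined && ev.timestamp < sel.fromTimestamp) return false;
  if (sel.toTimestamp !== undefined && ev.timestamp > sel.toTimestamp) return false;
  return true;
}

// Only matching events would be handed to a reducer, pipe, or consumer.
function selectEvents(sel: Selector, events: StreamEvent[]): StreamEvent[] {
  return events.filter((ev) => matchesSelector(sel, ev));
}
```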

Controls #

Rate limits #

Rate limits are enforced on the stream level.

Blocklists #

You can blocklist Referers or specific stream factory arguments; incoming event data that matches a blocklist entry is discarded.

Read and write access #

Read and write access can be public, or protected with a private key or with HMAC authentication (for substreams).

Forbidden stream names #

  • this
  • dispatch
  • request

Event Types #

By default, events can be of any type. You can use JSON Type Def to define event types that are validated on stream ingest. Doing so also generates TypeScript types, which can be used in TypeScript reducers and are accessible at https://pipet.io/acct/strm:eventType. You can access the JSON Type Def or the TypeScript interfaces by adding a file suffix to the URL: .json or .ts.

{"properties": {"id": {"type": "int32"}}}
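As a rough illustration of what validation on ingest could involve, the sketch below checks a value against a schema in the "properties" form. It is a simplified stand-in, not Pipet's validator: it handles only four primitive types, treats every listed property as required, and rejects unlisted properties. The names JtdSchema, matchesType, and validate, and the Signup interface standing in for a generated type, are hypothetical.

```typescript
type JtdType = "boolean" | "string" | "int32" | "float64";

// Simplified schema: only the "properties" form with primitive types.
interface JtdSchema {
  properties: Record<string, { type: JtdType }>;
}

function matchesType(type: JtdType, value: unknown): boolean {
  switch (type) {
    case "boolean":
      return typeof value === "boolean";
    case "string":
      return typeof value === "string";
    case "float64":
      return typeof value === "number" && Number.isFinite(value);
    case "int32":
      return (
        typeof value === "number" &&
        Number.isInteger(value) &&
        value >= -2147483648 &&
        value <= 2147483647
      );
    default:
      return false;
  }
}

function validate(schema: JtdSchema, value: unknown): boolean {
  if (typeof value !== "object" || value === null || Array.isArray(value)) return false;
  const record = value as Record<string, unknown>;
  const has = (obj: object, key: string) => Object.prototype.hasOwnProperty.call(obj, key);
  // Reject properties the schema does not list.
  for (const key of Object.keys(record)) {
    if (!has(schema.properties, key)) return false;
  }
  // Every listed property must be present and of the right type.
  for (const [key, { type }] of Object.entries(schema.properties)) {
    if (!has(record, key) || !matchesType(type, record[key])) return false;
  }
  return true;
}

// A generated TypeScript interface for this schema might look like this.
interface Signup {
  id: number;
  name: string;
}
const signupSchema: JtdSchema = {
  properties: { id: { type: "int32" }, name: { type: "string" } },
};
```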

Types can be added to a stream later and are applied to events going forward. Typed events cannot be backfilled, as events are immutable. However, you can create a new typed stream and attach a reducer to the original stream that applies types to events and dispatches them to the new stream.

A type does not travel with a reducer. Instead, it is assigned by the stream on ingest. However, a stream can import type definitions from other streams into its namespace. When events are piped, typed events retain their original type, accessible within TypeScript reducers as account.stream.type.

Substreams #

You sometimes need to create an arbitrary number of streams dynamically. For instance, you may want to create an event stream per customer, so that each customer only has permission to write to their own stream and read reducer state. However, you want all of those streams to inherit reducers, pipes, and types.

To address this, you can create a “stream factory” instead of a stream. A stream factory accepts arguments to dynamically create substreams, e.g. https://pipet.io/eric/events(1). Substreams can be dynamically created by POSTing to the URL. Stream factory arguments must be JavaScript primitives (string, number, boolean, null). Each substream has its own log, and reducer state is partitioned per substream.

Stream factories accept not only primitives but also identifiers. These identifiers reference Pipet accounts. A substream /eric/events(bob) is an “account substream”, and stream factories can grant read or write access to account substreams en masse without issuing permissions or access tokens per substream.
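The substream URL forms used in this document, such as events(1), strm('arg'), and events(bob), suggest a simple serialization of factory arguments. The helper below is a sketch under that reading: strings are single-quoted, other primitives are written bare, and account identifiers are written bare. The function names are hypothetical, and the handling of quotes or special characters inside string arguments is not documented, so this sketch simply rejects strings that contain a single quote.

```typescript
type FactoryArg = string | number | boolean | null;

const BASE_URL = "https://pipet.io";

// Serialize a primitive argument the way the example URLs write it.
function formatArg(arg: FactoryArg): string {
  if (typeof arg === "string") {
    // Assumption: escaping rules are unknown, so quotes are rejected.
    if (arg.includes("'")) throw new Error("single quotes are not supported in this sketch");
    return `'${arg}'`;
  }
  if (typeof arg === "number" && !Number.isFinite(arg)) {
    throw new Error("argument must be a finite number");
  }
  return String(arg); // number, boolean, or null
}

// URL of a substream created from a primitive argument.
function substreamUrl(account: string, stream: string, arg: FactoryArg): string {
  return `${BASE_URL}/${account}/${stream}(${formatArg(arg)})`;
}

// URL of an account substream: the argument is a bare account identifier.
function accountSubstreamUrl(owner: string, stream: string, account: string): string {
  return `${BASE_URL}/${owner}/${stream}(${account})`;
}
```

Under these assumptions, substreamUrl("eric", "events", 1) yields https://pipet.io/eric/events(1), and accountSubstreamUrl("eric", "events", "bob") yields https://pipet.io/eric/events(bob).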

Stream URLs #

https://pipet.io/acct/strm
https://pipet.io/acct/strm.reducer

https://pipet.io/acct/strm('arg')
https://pipet.io/acct/strm('arg').reducer

https://pipet.io/acct/strm/pipe/usr2/strm2
https://pipet.io/acct/strm/logs
https://pipet.io/acct/strm/logs/pipe/usr2/strm2

Consumers #

Webhooks and EventSource connections are stream consumers. By default, they deliver only the latest id; it remains up to the consumer to request event ranges from the stream. For webhooks, the response status should be in the 200 range.

Webhooks or server-sent events can be configured to deliver the event data itself, though these consumers run a risk of being overwhelmed by the volume of events.
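Since a consumer receives only the latest id by default, it has to remember how far it has read and work out which range to request next. The sketch below shows that bookkeeping only, with no network calls. It assumes ids are contiguous integers starting at 0 and that the notification carries a numeric latest id; both are assumptions, and RangeTracker and Notification are hypothetical names.

```typescript
// Hypothetical notification payload delivered to a webhook or EventSource.
interface Notification {
  latestId: number;
  eventType: string;
}

// Inclusive range of event ids still to be fetched from the stream.
interface EventRange {
  from: number;
  to: number;
}

class RangeTracker {
  private lastSeen: number;

  // lastSeen = -1 means nothing has been read yet.
  constructor(lastSeen: number = -1) {
    this.lastSeen = lastSeen;
  }

  // Returns the range to request, or null if the notification is not newer
  // than what was already read (e.g. a duplicate or out-of-order delivery).
  next(n: Notification): EventRange | null {
    if (n.latestId <= this.lastSeen) return null;
    const range = { from: this.lastSeen + 1, to: n.latestId };
    this.lastSeen = n.latestId;
    return range;
  }
}
```

A webhook handler built on this would fetch the returned range from the stream and then answer with a status in the 200 range.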

Builtin streams #

Pipet includes some builtin streams.

/pipet/cron('expr') #

The cron stream is publicly readable but not writable. It takes a cron expression as a string argument: for example, /pipet/cron('0 9 * * 1') receives a time event with the Unix timestamp every Monday at 9am. You will generally pipe from this stream to a stream you own.
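To show how an expression such as '0 9 * * 1' maps to "every Monday at 9am", here is a deliberately minimal matcher. It supports only five-field expressions whose fields are either * or a single number, and it evaluates them in UTC; the time zone Pipet uses is not stated in this document, so UTC is an assumption. Ranges, lists, and step values are not handled.

```typescript
// A field matches if it is a wildcard or equals the value exactly.
function fieldMatches(field: string, value: number): boolean {
  return field === "*" || Number(field) === value;
}

// Fields: minute, hour, day of month, month (1-12), day of week (0 = Sunday).
function cronMatches(expr: string, date: Date): boolean {
  const fields = expr.trim().split(/\s+/);
  if (fields.length !== 5) {
    throw new Error("expected 5 cron fields");
  }
  const [minute, hour, dayOfMonth, month, dayOfWeek] = fields;
  return (
    fieldMatches(minute, date.getUTCMinutes()) &&
    fieldMatches(hour, date.getUTCHours()) &&
    fieldMatches(dayOfMonth, date.getUTCDate()) &&
    fieldMatches(month, date.getUTCMonth() + 1) && // getUTCMonth() is 0-based
    fieldMatches(dayOfWeek, date.getUTCDay())
  );
}

// 1 January 2024 was a Monday.
const mondayNine = new Date(Date.UTC(2024, 0, 1, 9, 0));
const fires = cronMatches("0 9 * * 1", mondayNine); // true
```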

/pipet/events(account) #

Account events are readable by the account but not writeable. Events include any mutation to the account, such as adding a new stream or removing a member.