HTTP

HTTP #

Data in event streams are accessed over HTTP. This API is designed to be easily usable with or without SDKs.

Note, in all the following HTTP examples, some headers (like Content-Length or Host) are omitted for brevity.

Push events #

Response is the index of the event in the stream. You can specify event metadata with either HTTP headers or query parameters, with the former taking precedence.

POST /user/stream HTTP/1.1

Hello, world!
fetch('https://pipet.io/user/stream', {method: 'POST', body: 'Hello, world!'})
curl -d 'Hello, world!' https://pipet.io/user/stream

Bulk push #

You can send multiple events in a single request with multipart/*, ignoring the subtype form-data or alternative (those are still considered a single event). The header Accept: application/json is assumed and the response includes a JSON array of stream indexes.

POST /user/stream HTTP/1.1
Content-Type: multipart/mixed; boundary=uuid-boundary

A preamble can be included.
--uuid-boundary

Hello, world!
--uuid-boundary

Hi, again!
--uuid-boundary--
fetch('pipet.io/user/stream', {
  method: 'POST',
  body: `--uuid-boundary

Hello, world!
--uuid-boundary

Hi, again!
--uuid-boundary--`,
  headers: {
    'Content-Type': 'multipart/mixed; boundary=uuid-boundary'
  },
})
curl -d $'--uuid-boundary\n\nHello, world!\n--uuid-boundary\n\n\Hi, again!\n--uuid-boundary--\n' \
  -H 'Content-Type: multipart/mixed; boundary=uuid-boundary' \
  https://pipet.io/user/stream

Form data also uses multipart MIME type, but the Content-Type: multipart/form-data is always parsed as a single event.

Setting event type #

Assuming you’ve created an event type click:

{
  "properties": {
    "url": { "type": "string" },
    "xpath": { "elements": {"type": "string" } }
  },
  "additionalProperties": true
}

You can use the syntax /acct/strm:type to set the event type.

POST /user/stream:click HTTP/1.1
Content-Type: application/json

{
  "url": "https://pipet.io",
  "xpath": ["html", "body", "div", "a"]
}
fetch('https://pipet.io/user/stream:click', {
  method: 'POST',
  headers: { 'Content-Type': 'application/json' },
  body: JSON.stringify({
    url: 'https://pipet.io',
    xpath: ['html', 'body', 'div', 'a']
  })
})
curl -d '{"url": "https://pipet.io", "xpath": ["html", "body", "div", "a"]}' \
  -H 'Content-Type: application/json' \
  https://pipet.io/user/stream:click

Forced timestamp #

You can set the timestamp on an event. If the timestamp is less than the stream’s current timestamp, the event will be rejected. This is useful when backfilling external events into Pipet.

POST /user/stream HTTP/1.1
Timestamp: 1677633286640

Hello, world!

As a general rule, force-timestamping (or stamping for short) events is a bad idea and should only be useful for backfilling external streams. While a stream is being backfilled, no other “live” sources should be dispatching to the same stream. Stream timestamps are required to be monotonically increasing and if the event violates that, it will be rejected.

At-most-once sending #

You can attach an event index for “at most once” message sending. This is useful if you need to guarantee not only order, but exact index positions of the stream and there is only one client sending the events. Like forced timestamping, this is generally a bad idea.

POST /user/stream[1] HTTP/1.1

Hello, world!

You send the POST request to the stream’s index. If that index is not exactly the stream’s next index, the event will be rejected.

List or stream events #

The easiest way to get an event a GET request

GET /user/stream[4] HTTP/1.1
Accept: application/json
fetch('https://pipet.io/user/stream[4]')
curl https://pipet.io/user/stream[4]

You can configure the returned format with the Accept header with text/event-stream, application/json, text/csv.

Query parameters

eventType: string
since: timestamp
until: timestamp
from: integer, minimum index for range
to: integer, maximum index for range, not inclusive
limit: number of events

JavaScript chainings

You can filter with JavaScript function chains: https://pipet.io/user/stream.slice(2, 38).eventType('click').

.eventType('text/plain', ...)
.range(since, until)
.slice(from, to)
.limit(number)

EventSource

You can listen for new events on the stream. This notifies you of event id’s and it is up to you to GET request from the stream. EventSource supports filtering notifications based on event type.

GET /user/stream HTTP/1.1
Connection: keep-alive
Accept: text/event-stream
new EventSource('https://pipet.io/user/stream')
curl -H 'Accept: text/event-stream' 'https://pipet.io/user/stream'

Response

Based on Accept header:

HTTP/1.1 200 OK
Content-Type: text/event-stream
Connection: keep-alive

id: 3
HTTP/1.1 200 OK
Content-Type: application/json

[
  {
    "id": 3,
    "timestamp": 1650661388844,
    "event": "text/plain",
    "data": "Hello, world!"
  }
]
HTTP/1.1 200 OK
Content-Type: text/csv

id,timestamp,event,data
3,1650661388844,text/plain,"Hello, world!"

Reducer State #

Stream reducer state is accessible over HTTP at https://pipet.io/acct/stream.reducer

GET /user/stream.reducer HTTP/1.1
fetch('https://pipet.io/user/stream.reducer')
curl https://pipet.io/user/stream.reducer

Response

HTTP/1.1 200 OK
Content-Type: application/json

{"Hello": 1}

Authentication #

Streams have read, write, and execute permissions. Authorization can be via auth token, API key, or HMAC. Other accounts can be granted rwx permissions on streams. Streams can also enable PUBLIC access, but this should only be done for demonstration purposes.

Streams can use HMAC authentication to ensure only authorized users can access substreams. Streams have private keys that can be passed to server-side applications. Those applications can then generate HMAC signatures against the substream path to ensure the client is authorized to write events to this substream. A stream can have muliple keys and they can be created on a per-stream basis to allow for rotation.

Cookies are not used because pipet has no awareness of session. Headers are always respected over query args. HMAC hash function is SHA256 by default.

HMAC Authentication #

We sign with HMAC SHA-256 on the path, passing along values in the [HTTP Signature Header]. keyId and algorithm parameters are optional, but the headers and signature parameters are not. If passing signature through a query parameter, you only need to provide the signature itself, though keyId may also be passed.

POST /user/stream(1) HTTP/1.1
Signature: keyId="pipet-key-id", algorithm="hmac-sha256", headers="(request-target)", signature="bd28ee142ca5b46259f6e27fc3a4216f447bd5843c406e63219cff30e73b135b"

Hello, world!
// signature hashed by server
const signature = 'bd28ee142ca5b46259f6e27fc3a4216f447bd5843c406e63219cff30e73b135b';
const key = 'pipet-key-id';

fetch('https://pipet.io/user/stream', {
  method: 'POST',
  headers: {
    'Signature': `keyId="${key}", algorithm="hmac-sha256", headers="(request-target)", signature="${signature}"`
}, body: 'Hello, world!'})

// alternatively, using sendBeacon and URL query params
navigator.sendBeacon(`https://pipet.io/user/stream?key=${key}&signature=${signature}`,
  'Hello, world!')
curl -X POST -d 'Hello, world!' \
  -H 'Event-Type: text/plain' \
  -H 'Signature: keyId="pipet-key-id", algorithm="hmac-sha256", headers="(request-target)", signature="bd28ee142ca5b46259f6e27fc3a4216f447bd5843c406e63219cff30e73b135b"' \  
  https://pipet.io/user/stream

#

Pipet has the optional shorthands for all event property in the path, so all these things combine in crazy ways.

fetch('https://pipet.io/eric/greetings(1):click[2]', {body: 'Hello, world!'})

In this case, we’re sending a greeting “Hello, world!” to a substream, setting the type to click, and forcing the index to 2. Based on the :click event type, Pipet can discern that the content type is application/json. We’re also passing the key and signature as query parameters for authentication.