HTTP #
Data in event streams are accessed over HTTP. This API is designed to be easily usable with or without SDKs.
Note: in all of the following HTTP examples, some headers (like Content-Length or Host) are omitted for brevity.
Push events #
The response is the index of the event in the stream. You can specify event metadata with either HTTP headers or query parameters; when both are present, headers take precedence.
POST /user/stream HTTP/1.1
Hello, world!
fetch('https://pipet.io/user/stream', {method: 'POST', body: 'Hello, world!'})
curl -d 'Hello, world!' https://pipet.io/user/stream
Bulk push #
You can send multiple events in a single request with a multipart/* content type. The form-data and alternative subtypes are exceptions: those are still treated as a single event. Accept: application/json is assumed, and the response is a JSON array of stream indexes.
POST /user/stream HTTP/1.1
Content-Type: multipart/mixed; boundary=uuid-boundary
A preamble can be included.
--uuid-boundary
Hello, world!
--uuid-boundary
Hi, again!
--uuid-boundary--
fetch('https://pipet.io/user/stream', {
  method: 'POST',
  // Parts have no headers, so each boundary line is followed by an empty line.
  body: `--uuid-boundary

Hello, world!
--uuid-boundary

Hi, again!
--uuid-boundary--
`,
  headers: {
    'Content-Type': 'multipart/mixed; boundary=uuid-boundary'
  },
})
curl -d $'--uuid-boundary\n\nHello, world!\n--uuid-boundary\n\nHi, again!\n--uuid-boundary--\n' \
-H 'Content-Type: multipart/mixed; boundary=uuid-boundary' \
https://pipet.io/user/stream
Form data also uses a multipart MIME type, but Content-Type: multipart/form-data is always parsed as a single event.
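Writing boundaries by hand is error-prone, so a small helper can assemble the bulk body. The following is a minimal sketch, not part of any Pipet SDK: buildBulkBody is a hypothetical name, it assumes each part carries no headers of its own, and it uses CRLF line endings as the multipart format specifies (the examples above use plain newlines).

```javascript
// Hypothetical helper: build a multipart/mixed body from an array of string events.
// Assumption: parts have no per-part headers, so every boundary line is
// followed by an empty line and then the event body.
function buildBulkBody(events, boundary) {
  const CRLF = '\r\n';
  const parts = events.map((event) => {
    // A body containing the delimiter would be split into extra events.
    if (event.includes(`--${boundary}`)) {
      throw new Error('event body contains the boundary delimiter');
    }
    return `--${boundary}${CRLF}${CRLF}${event}${CRLF}`;
  });
  return {
    contentType: `multipart/mixed; boundary=${boundary}`,
    body: parts.join('') + `--${boundary}--${CRLF}`,
  };
}

const bulk = buildBulkBody(['Hello, world!', 'Hi, again!'], 'uuid-boundary');

// Sending it (not executed here):
// fetch('https://pipet.io/user/stream', {
//   method: 'POST',
//   headers: { 'Content-Type': bulk.contentType },
//   body: bulk.body,
// }).then((res) => res.json()); // JSON array of stream indexes
```

Picking a random boundary per request (a UUID, for example) makes a collision with event content unlikely; the check above turns any remaining collision into an error instead of a silently split event.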
Setting event type #
Assuming you’ve created an event type click:
{
"properties": {
"url": { "type": "string" },
"xpath": { "elements": {"type": "string" } }
},
"additionalProperties": true
}
You can use the syntax /acct/strm:type to set the event type.
POST /user/stream:click HTTP/1.1
Content-Type: application/json
{
"url": "https://pipet.io",
"xpath": ["html", "body", "div", "a"]
}
fetch('https://pipet.io/user/stream:click', {
method: 'POST',
headers: { 'Content-Type': 'application/json' },
body: JSON.stringify({
url: 'https://pipet.io',
xpath: ['html', 'body', 'div', 'a']
})
})
curl -d '{"url": "https://pipet.io", "xpath": ["html", "body", "div", "a"]}' \
-H 'Content-Type: application/json' \
https://pipet.io/user/stream:click
Forced timestamp #
You can set the timestamp on an event. If the timestamp is less than the stream’s current timestamp, the event will be rejected. This is useful when backfilling external events into Pipet.
POST /user/stream HTTP/1.1
Timestamp: 1677633286640
Hello, world!
As a general rule, force-timestamping (or stamping for short) events is a bad idea and should only be used for backfilling external streams. While a stream is being backfilled, no other “live” sources should be dispatching to the same stream. Stream timestamps are required to be monotonically increasing, and an event that violates this will be rejected.
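Because out-of-order timestamps are rejected, a backfill has to send events oldest first and one at a time. The following is a minimal sketch under those assumptions; planBackfill and the { timestamp, body } event shape are hypothetical, and only the Timestamp header comes from the example above.

```javascript
// Hypothetical helper: turn external events into an ordered list of requests.
// Each event is assumed to look like { timestamp: <ms since epoch>, body: <string> }.
function planBackfill(streamUrl, events) {
  // Copy before sorting so the caller's array is left untouched.
  const ordered = [...events].sort((a, b) => a.timestamp - b.timestamp);
  return ordered.map((event) => ({
    url: streamUrl,
    method: 'POST',
    headers: { Timestamp: String(event.timestamp) },
    body: event.body,
  }));
}

const plan = planBackfill('https://pipet.io/user/stream', [
  { timestamp: 1677633286640, body: 'second' },
  { timestamp: 1677633280000, body: 'first' },
]);

// Send strictly in sequence (not executed here); parallel requests could
// arrive out of order and be rejected:
// for (const { url, ...init } of plan) await fetch(url, init);
```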
At-most-once sending #
You can attach an event index for “at most once” message sending. This is useful if you need to guarantee not only order but also exact index positions in the stream, and only one client is sending events. Like forced timestamping, this is generally a bad idea.
POST /user/stream[1] HTTP/1.1
Hello, world!
You send the POST request to the stream’s index. If that index is not exactly the stream’s next index, the event will be rejected.
List or stream events #
The easiest way to get an event is a GET request:
GET /user/stream[4] HTTP/1.1
Accept: application/json
fetch('https://pipet.io/user/stream[4]')
curl -g 'https://pipet.io/user/stream[4]'
You can choose the response format with the Accept header: text/event-stream, application/json, or text/csv.
Query parameters
eventType: string
since: timestamp
until: timestamp
from: integer, minimum index for range
to: integer, maximum index for range, not inclusive
limit: number of events
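The parameters above can be assembled with the standard URL API instead of string concatenation. This is a minimal sketch: listUrl is a hypothetical helper, and it assumes the parameters can be combined freely in one request, which the list does not state explicitly.

```javascript
// Hypothetical helper: build a listing URL from the query parameters above.
function listUrl(streamUrl, params) {
  const url = new URL(streamUrl);
  for (const [name, value] of Object.entries(params)) {
    // Skip parameters the caller left undefined.
    if (value !== undefined) url.searchParams.set(name, String(value));
  }
  return url.toString();
}

const clicksUrl = listUrl('https://pipet.io/user/stream', {
  eventType: 'click',
  from: 2,
  to: 38, // not inclusive
  limit: 10,
});

// Fetching the result as JSON (not executed here):
// fetch(clicksUrl, { headers: { Accept: 'application/json' } }).then((res) => res.json());
```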
JavaScript chaining
You can filter with JavaScript function chains: https://pipet.io/user/stream.slice(2, 38).eventType('click').
.eventType('text/plain', ...)
.range(since, until)
.slice(from, to)
.limit(number)
EventSource
You can listen for new events on the stream. The stream notifies you of event IDs, and it is up to you to GET the events themselves from the stream. EventSource supports filtering notifications based on event type.
GET /user/stream HTTP/1.1
Connection: keep-alive
Accept: text/event-stream
new EventSource('https://pipet.io/user/stream')
curl -H 'Accept: text/event-stream' 'https://pipet.io/user/stream'
Response
Based on the Accept header:
HTTP/1.1 200 OK
Content-Type: text/event-stream
Connection: keep-alive
id: 3
HTTP/1.1 200 OK
Content-Type: application/json
[
{
"id": 3,
"timestamp": 1650661388844,
"event": "text/plain",
"data": "Hello, world!"
}
]
HTTP/1.1 200 OK
Content-Type: text/csv
id,timestamp,event,data
3,1650661388844,text/plain,"Hello, world!"
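Since the text/event-stream response carries only event IDs, a client has to turn each notification into a follow-up GET. The following is a minimal sketch for a client that reads the raw stream itself (for example, outside a browser); parseEventIds and eventUrls are hypothetical helpers, and the sketch assumes each chunk contains complete lines.

```javascript
// Hypothetical helper: extract the values of "id:" fields from a chunk of
// text/event-stream data. Assumes the chunk ends on a line boundary.
function parseEventIds(chunk) {
  const ids = [];
  for (const line of chunk.split(/\r\n|\n|\r/)) {
    if (line.startsWith('id:')) {
      // The event-stream format allows one optional space after the colon.
      ids.push(line.slice(3).replace(/^ /, ''));
    }
  }
  return ids;
}

// Map event IDs to the indexed URLs used for single-event GET requests.
function eventUrls(streamUrl, ids) {
  return ids.map((id) => `${streamUrl}[${id}]`);
}

const notifiedIds = parseEventIds('id: 3\n\nid: 4\n\n');
const notifiedUrls = eventUrls('https://pipet.io/user/stream', notifiedIds);

// Fetching each notified event (not executed here):
// for (const url of notifiedUrls) await fetch(url, { headers: { Accept: 'application/json' } });
```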
Reducer State #
Stream reducer state is accessible over HTTP at https://pipet.io/acct/stream.reducer
GET /user/stream.reducer HTTP/1.1
fetch('https://pipet.io/user/stream.reducer')
curl https://pipet.io/user/stream.reducer
Response
HTTP/1.1 200 OK
Content-Type: application/json
{"Hello": 1}
Authentication #
Streams have read, write, and execute permissions. Authorization can be via auth token, API key, or HMAC. Other accounts can be granted rwx permissions on streams. Streams can also enable PUBLIC access, but this should only be done for demonstration purposes.
Streams can use HMAC authentication to ensure only authorized users can access substreams. Streams have private keys that can be passed to server-side applications. Those applications can then generate HMAC signatures against the substream path to prove that the client is authorized to write events to that substream. A stream can have multiple keys, created on a per-stream basis, to allow for rotation.
Cookies are not used because Pipet has no awareness of sessions. Headers always take precedence over query arguments. The HMAC hash function is SHA-256 by default.
HMAC Authentication #
Requests are signed with HMAC SHA-256 over the path, and the values are passed in the HTTP Signature header. The keyId and algorithm parameters are optional, but the headers and signature parameters are required. If you pass the signature through a query parameter, you only need to provide the signature itself, though keyId may also be passed.
POST /user/stream(1) HTTP/1.1
Signature: keyId="pipet-key-id", algorithm="hmac-sha256", headers="(request-target)", signature="bd28ee142ca5b46259f6e27fc3a4216f447bd5843c406e63219cff30e73b135b"
Hello, world!
// signature computed server-side with the stream's private key
const signature = 'bd28ee142ca5b46259f6e27fc3a4216f447bd5843c406e63219cff30e73b135b';
const key = 'pipet-key-id';
fetch('https://pipet.io/user/stream', {
  method: 'POST',
  headers: {
    'Signature': `keyId="${key}", algorithm="hmac-sha256", headers="(request-target)", signature="${signature}"`
  },
  body: 'Hello, world!'
})
// alternatively, using sendBeacon and URL query params
navigator.sendBeacon(`https://pipet.io/user/stream?key=${key}&signature=${signature}`,
'Hello, world!')
curl -X POST -d 'Hello, world!' \
-H 'Event-Type: text/plain' \
-H 'Signature: keyId="pipet-key-id", algorithm="hmac-sha256", headers="(request-target)", signature="bd28ee142ca5b46259f6e27fc3a4216f447bd5843c406e63219cff30e73b135b"' \
https://pipet.io/user/stream
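The examples above use a precomputed signature. On the server side, producing one might look like the following Node.js sketch. Treat it as an illustration only: signPath and signatureHeader are hypothetical helpers, the secret is a placeholder, and the exact string Pipet expects to be signed is an assumption here (the path exactly as it appears in the request line, with the result hex-encoded like the example signature).

```javascript
// Server-side only: the stream's private key must never reach the browser.
const { createHmac } = require('node:crypto');

// Assumption: the signed message is the request path, and the signature is
// hex-encoded like the example signature above.
function signPath(secret, path) {
  return createHmac('sha256', secret).update(path).digest('hex');
}

// Format the value of the Signature header as shown in the examples.
function signatureHeader(keyId, signature) {
  return `keyId="${keyId}", algorithm="hmac-sha256", headers="(request-target)", signature="${signature}"`;
}

// 'hypothetical-secret' stands in for a stream's private key.
const exampleSignature = signPath('hypothetical-secret', '/user/stream(1)');
const exampleHeader = signatureHeader('pipet-key-id', exampleSignature);
```

The server can hand exampleSignature (and optionally the key ID) to a client, which then sends it in the Signature header or as a query parameter without ever seeing the private key.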
#
Pipet has optional path shorthands for every event property, so all of these features can be combined in a single request.
fetch(`https://pipet.io/eric/greetings(1):click[2]?key=${key}&signature=${signature}`, {method: 'POST', body: 'Hello, world!'})
In this case, we’re sending a greeting “Hello, world!” to a substream, setting the type to click, and forcing the index to 2. Based on the :click event type, Pipet can discern that the content type is application/json. We’re also passing the key and signature as query parameters for authentication.
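To keep combined paths readable in client code, the shorthands can be assembled by a small builder. This is a minimal sketch: streamPath is a hypothetical helper, and the order of the parts (substream, then type, then index) is inferred from the single example above and not from a documented grammar.

```javascript
// Hypothetical helper: compose a stream path from the optional shorthands.
// Assumed order, taken from the example above: (substream) :type [index].
function streamPath({ account, stream, substream, type, index }) {
  let path = `/${account}/${stream}`;
  if (substream !== undefined) path += `(${substream})`;
  if (type !== undefined) path += `:${type}`;
  if (index !== undefined) path += `[${index}]`;
  return path;
}

const combinedPath = streamPath({
  account: 'eric',
  stream: 'greetings',
  substream: 1,
  type: 'click',
  index: 2,
});
// combinedPath is '/eric/greetings(1):click[2]'
```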