Getting Started
@skyware/jetstream is a utility library for consuming data from a Jetstream instance.
Setup
import { Jetstream } from "@skyware/jetstream";

const jetstream = new Jetstream();
jetstream.start();
The Jetstream class takes an object parameter with the following properties:
- wantedCollections: An array of collections to subscribe to events for. This can contain any collection name or wildcard strings such as app.bsky.feed.* to receive events for all collections whose name starts with app.bsky.feed. If not provided or empty, you will receive events for all collections.
- wantedDids: An array of DIDs to subscribe to events for. If not provided or empty, you will receive events for all DIDs.
- cursor: The Unix timestamp in microseconds to start listening from. A cursor is included in every event emitted as the time_us property. If you don't provide a cursor, the class will start listening from the most recent event.
- endpoint: The subscription URL of the Jetstream instance to connect to. Defaults to wss://jetstream1.us-east.bsky.network/subscribe.
To begin listening for events, call the start method.
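To make the options above more concrete, here is a minimal sketch of how they could be encoded as query parameters on the subscription URL, which is how Jetstream servers are generally understood to accept filters. The helper names `buildSubscribeUrl` and `toCursor` and the `SubscribeOptions` type are hypothetical and not part of @skyware/jetstream; the library handles the connection for you, so this only illustrates what the options mean.

```typescript
// Hypothetical helpers for illustration only; not part of @skyware/jetstream.
interface SubscribeOptions {
    endpoint?: string;
    wantedCollections?: string[];
    wantedDids?: string[];
    cursor?: number; // Unix timestamp in microseconds
}

// Default endpoint as documented above.
const DEFAULT_ENDPOINT = "wss://jetstream1.us-east.bsky.network/subscribe";

/** Convert a Date (millisecond precision) to a microsecond cursor. */
function toCursor(date: Date): number {
    return date.getTime() * 1000;
}

/**
 * Build a subscription URL from the options.
 * Assumption: filters are passed as repeated `wantedCollections` and
 * `wantedDids` query parameters plus a single `cursor` parameter.
 */
function buildSubscribeUrl(options: SubscribeOptions = {}): string {
    const url = new URL(options.endpoint ?? DEFAULT_ENDPOINT);
    for (const collection of options.wantedCollections ?? []) {
        url.searchParams.append("wantedCollections", collection);
    }
    for (const did of options.wantedDids ?? []) {
        url.searchParams.append("wantedDids", did);
    }
    if (options.cursor !== undefined) {
        url.searchParams.set("cursor", String(options.cursor));
    }
    return url.toString();
}

// Example: all feed collections, replaying from five minutes ago.
const exampleUrl = buildSubscribeUrl({
    wantedCollections: ["app.bsky.feed.*"],
    cursor: toCursor(new Date(Date.now() - 5 * 60 * 1000)),
});
console.log(exampleUrl);
```

With the library itself, the same intent would be expressed by passing `wantedCollections` and `cursor` to the `Jetstream` constructor and then calling `start`.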
Handling events
jetstream.onCreate("app.bsky.feed.post", (event) => {
    console.log("New post:", event.commit.record.text);
});
Jetstream allows you to subscribe to a filtered feed of events related to specific collections. The Jetstream class has three useful methods for listening for commits:
- onCreate: Listen for new records created in a collection.
- onUpdate: Listen for updated records in a collection.
- onDelete: Listen for deleted records in a collection.
The class also emits broader events.
import { CommitType } from "@skyware/jetstream";

// Listen for all commits, regardless of collection
jetstream.on("commit", (event) => {
    if (event.commit.operation === CommitType.Create) {
        console.log("create in ", event.commit.collection, event.commit.record);
    } else if (event.commit.operation === CommitType.Update) {
        console.log("update in", event.commit.collection, event.commit.rkey);
    } else if (event.commit.operation === CommitType.Delete) {
        console.log("delete in", event.commit.collection, event.commit.rkey);
    }
});

// Listen for account status updates
jetstream.on("account", (event) => {
    console.log("account update", event.account.status);
});

// Listen for identity updates
jetstream.on("identity", (event) => {
    console.log("identity update", event.identity.did);
});
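
The branching on the commit operation can also be factored into a small pure function, which is easier to test than a listener body. The sketch below uses minimal local stand-in types (`CommitLike`, `CommitEventLike`) that only model the fields read in this guide; they are assumptions for illustration, and the real event types exported by @skyware/jetstream carry more information.

```typescript
// Local stand-ins for the event shapes, limited to the fields used in this guide.
// These are hypothetical; the library ships its own, richer types.
type CommitLike =
    | { operation: "create"; collection: string; rkey: string; record: unknown }
    | { operation: "update"; collection: string; rkey: string }
    | { operation: "delete"; collection: string; rkey: string };

interface CommitEventLike {
    time_us: number; // cursor of this event, in microseconds
    commit: CommitLike;
}

/** Produce a one-line description of a commit event. */
function describeCommit(event: CommitEventLike): string {
    const { commit } = event;
    // Switching on `operation` narrows the union, so `record` is only
    // accessible in the "create" branch.
    switch (commit.operation) {
        case "create":
            return `create in ${commit.collection}: ${JSON.stringify(commit.record)}`;
        case "update":
            return `update in ${commit.collection} (rkey ${commit.rkey})`;
        case "delete":
            return `delete in ${commit.collection} (rkey ${commit.rkey})`;
    }
}

console.log(
    describeCommit({
        time_us: 1700000000000000,
        commit: { operation: "delete", collection: "app.bsky.feed.like", rkey: "abc" },
    }),
);
```

Assuming the library's commit events are structurally compatible with these stand-ins, a function like this could be called from inside a "commit" listener in place of the if/else chain.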
Using Jetstream over a direct Relay connection can help you save bandwidth and only receive the events you care about.
A Note on Types
When using methods such as onCreate, the type of the event is inferred from the collection name. However, you may encounter a type error reporting missing properties on the record object. If so, make sure that your tsconfig.json has the following:
{
    "compilerOptions": {
        "moduleResolution": "node16", // or "nodenext"
    }
}
Event Reference
The Jetstream class may emit the following events:
Update events
Event | Description |
---|---|
commit | Represents a commit to a user’s repository. |
identity | Represents a change to an account’s identity. Could be an updated handle, signing key, or PDS hosting endpoint. |
account | Represents a change to an account’s status. The account may be deactivated, suspended, or deleted. |
System events
Event | Description |
---|---|
open | Emitted when the websocket connection is opened. |
close | Emitted when the websocket connection is closed. |
error | Emitted when an error occurs while handling a message. |
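
As a rough illustration of the system events, the sketch below attaches logging listeners for open, close, and error. It is written against a small structural interface (`EventSourceLike`, a hypothetical name) rather than the real client, so it runs on its own. The exact arguments the library passes to each listener are not described here, so treating the first argument of the error listener as the error value is an assumption.

```typescript
// Hypothetical structural stand-in for the part of the client used here.
interface EventSourceLike {
    on(event: string, listener: (...args: unknown[]) => void): unknown;
}

/** Register listeners that log connection lifecycle events. */
function attachLifecycleLogging(
    source: EventSourceLike,
    log: (line: string) => void = console.log,
): void {
    source.on("open", () => log("connected"));
    source.on("close", () => log("disconnected"));
    source.on("error", (error) => {
        // Assumption: the first listener argument is the error that occurred.
        const message = error instanceof Error ? error.message : String(error);
        log(`error: ${message}`);
    });
}

// Usage with a tiny in-memory emitter standing in for the client.
const registered = new Map<string, (...args: unknown[]) => void>();
const demoSource: EventSourceLike = {
    on(event, listener) {
        registered.set(event, listener);
    },
};
attachLifecycleLogging(demoSource);
registered.get("open")?.();
registered.get("error")?.(new Error("bad message"));
registered.get("close")?.();
```

Passing the log function as a parameter keeps the helper testable and makes it easy to route these messages to a real logger.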