Getting Started

@skyware/jetstream is a utility library for consuming data from a Jetstream instance.

Setup

import { class Jetstream<WantedCollections extends CollectionOrWildcard = CollectionOrWildcard, ResolvedCollections extends Collection = ResolveLexiconWildcard<WantedCollections>>
The Jetstream client.
Jetstream
} from "@skyware/jetstream";
const const jetstream: Jetstream<CollectionOrWildcard, Collection>jetstream = new new Jetstream<CollectionOrWildcard, Collection>(options?: JetstreamOptions<CollectionOrWildcard> | undefined): Jetstream<...>
The Jetstream client.
Jetstream
();

The Jetstream class takes an object parameter with the following properties:

  • wantedCollections: An array of collections to subscribe to events for. This can contain any collection name or wildcard strings such as app.bsky.feed.* to receive events for all collections whose name starts with app.bsky.feed.. If not provided or empty, you will receive events for all collections.
  • wantedDids: An array of DIDs to subscribe to events for. If not provided or empty, you will receive events for all DIDs.
  • cursor: The Unix timestamp in microseconds to start listening from. A cursor is included in every event emitted as the time_us property. If you don’t provide a cursor, the class will start listening from the most recent event.
  • endpoint: The subscription URL of the Jetstream instance to connect to. Defaults to wss://jetstream.atproto.tools/subscribe.

Handling events

const jetstream: Jetstream<CollectionOrWildcard, Collection>jetstream.Jetstream<CollectionOrWildcard, Collection>.onCreate<"app.bsky.feed.post">(collection: "app.bsky.feed.post", listener: (event: CommitCreateEvent<"app.bsky.feed.post">) => void): void
Listen for records created in a specific collection.
@paramcollection The name of the collection to listen for.@paramlistener A callback function that receives the commit event.
onCreate
("app.bsky.feed.post", (event: CommitCreateEvent<"app.bsky.feed.post">event) => {
var console: Consoleconsole.Console.log(...data: any[]): voidlog("New post:", event: CommitCreateEvent<"app.bsky.feed.post">event.CommitCreateEvent<"app.bsky.feed.post">.commit: CommitCreate<"app.bsky.feed.post">commit.CommitCreate<"app.bsky.feed.post">.record: AppBskyFeedPost.Recordrecord.AppBskyFeedPost.Record.text: string
The primary post content. May be an empty string, if there are embeds. \ Maximum string length: 3000 \ Maximum grapheme length: 300
text
)
});

Jetstream allows you to subscribe to a filtered feed of events related to specific collections. The Jetstream class has three useful methods for listening for commits:

  • onCreate: Listen for new records created in a collection.
  • onUpdate: Listen for updated records in a collection.
  • onDelete: Listen for deleted records in a collection.

The class also emits broader events.

import { type CommitType = "c" | "u" | "d"
const CommitType: {
    readonly Create: "c";
    readonly Update: "u";
    readonly Delete: "d";
}
The types of commits that can be received.
@enum
CommitType
} from "@skyware/jetstream";
// Listen for all commits, regardless of collection const jetstream: Jetstream<CollectionOrWildcard, Collection>jetstream.Jetstream<CollectionOrWildcard, Collection>.on(event: "commit", listener: (event: CommitEvent<Collection>) => void): Jetstream<CollectionOrWildcard, Collection> (+6 overloads)
Emitted when any commit is received.
on
("commit", (event: CommitEvent<Collection>event) => {
if (event: CommitEvent<Collection>event.CommitEvent<Collection>.commit: Commit<Collection>commit.type: "c" | "u" | "d"type === const CommitType: { readonly Create: "c"; readonly Update: "u"; readonly Delete: "d"; }
The types of commits that can be received.
@enum
CommitType
.type Create: "c"
A record was created.
Create
) {
var console: Consoleconsole.Console.log(...data: any[]): voidlog("create in ", event: CommitEvent<Collection>event.CommitEvent<Collection>.commit: CommitCreate<Collection>commit.CommitBase<Collection>.collection: Collectioncollection, event: CommitEvent<Collection>event.CommitEvent<Collection>.commit: CommitCreate<Collection>commit.CommitCreate<Collection>.record: AppBskyActorProfile.Record | AppBskyFeedGenerator.Record | AppBskyFeedLike.Record | AppBskyFeedPost.Record | ... 11 more ... | { ...; }record); } else if (event: CommitEvent<Collection>event.CommitEvent<Collection>.commit: CommitUpdate<Collection> | CommitDelete<Collection>commit.type: "u" | "d"type === const CommitType: { readonly Create: "c"; readonly Update: "u"; readonly Delete: "d"; }
The types of commits that can be received.
@enum
CommitType
.type Update: "u"
A record was updated.
Update
) {
var console: Consoleconsole.Console.log(...data: any[]): voidlog("update in", event: CommitEvent<Collection>event.CommitEvent<Collection>.commit: CommitUpdate<Collection>commit.CommitBase<Collection>.collection: Collectioncollection, event: CommitEvent<Collection>event.CommitEvent<Collection>.commit: CommitUpdate<Collection>commit.CommitBase<Collection>.rkey: stringrkey); } else if (event: CommitEvent<Collection>event.CommitEvent<Collection>.commit: CommitDelete<Collection>commit.CommitDelete<Collection>.type: "d"type === const CommitType: { readonly Create: "c"; readonly Update: "u"; readonly Delete: "d"; }
The types of commits that can be received.
@enum
CommitType
.type Delete: "d"
A record was deleted.
Delete
) {
var console: Consoleconsole.Console.log(...data: any[]): voidlog("delete in", event: CommitEvent<Collection>event.CommitEvent<Collection>.commit: CommitDelete<Collection>commit.CommitBase<Collection>.collection: Collectioncollection, event: CommitEvent<Collection>event.CommitEvent<Collection>.commit: CommitDelete<Collection>commit.CommitBase<Collection>.rkey: stringrkey); } }); // Listen for account status updates const jetstream: Jetstream<CollectionOrWildcard, Collection>jetstream.Jetstream<CollectionOrWildcard, Collection>.on(event: "account", listener: (event: AccountEvent) => void): Jetstream<CollectionOrWildcard, Collection> (+6 overloads)
Emitted when an account is updated.
on
("account", (event: AccountEventevent) => {
var console: Consoleconsole.Console.log(...data: any[]): voidlog("account update", event: AccountEventevent.AccountEvent.account: ComAtprotoSyncSubscribeRepos.Accountaccount.ComAtprotoSyncSubscribeRepos.Account.status?: (string & {}) | "deactivated" | "deleted" | "suspended" | "takendown" | undefined
If active=false, this optional field indicates a reason for why the account is not active.
status
)
}); // Listen for identity updates const jetstream: Jetstream<CollectionOrWildcard, Collection>jetstream.Jetstream<CollectionOrWildcard, Collection>.on(event: "identity", listener: (event: IdentityEvent) => void): Jetstream<CollectionOrWildcard, Collection> (+6 overloads)
Emitted when an identity event is received.
on
("identity", (event: IdentityEventevent) => {
var console: Consoleconsole.Console.log(...data: any[]): voidlog("identity update", event: IdentityEventevent.IdentityEvent.account: ComAtprotoSyncSubscribeRepos.Identityaccount.ComAtprotoSyncSubscribeRepos.Identity.did: `did:${string}`did) });

Using Jetstream over a direct Relay connection can help you save bandwidth and only receive the events you care about.

A Note on Types

When using method such as onCreate, the type of the event will be inferred from the collection name. However, you may encounter an error resulting in missing properties on the record object. Make sure that your tsconfig.json has the following:

{
	"compilerOptions": {
		"moduleResolution": "node16", // or nodenext
	}
}

Event Reference

The Jetstream class may emit the following events:

Update events

EventDescription
commitRepresents a commit to a user’s repository.
identityRepresents a change to an account’s identity. Could be an updated handle, signing key, or PDS hosting endpoint.
accountRepresents a change to an account’s status. The account may be deactivated, suspended, or deleted.

System events

EventDescription
openEmitted when the websocket connection is opened.
closeEmitted when the websocket connection is closed.
errorEmitted when an error occurs while handling a message.