Snorre.io

Bluesky feed generators giving users algorithmic choice

Earlier in August I wrote about creating a Bluesky-based comment system. This came about after I had joined the Bluesky community and was looking for a way to let other Bluesky users comment on my blog posts. I also wanted the comment system to be simple enough that other bloggers could use it to let Bluesky users comment on their posts. Since then I’ve enjoyed using Bluesky and have been looking for other ways to contribute to the community. One of the things I wanted to do was to build a feed generator that would allow users to browse Norwegian language posts.

The Bluesky feed generator

So what is a feed generator? One of Bluesky’s main objectives is to provide users algorithmic choice over how they browse the social network. This stands in contrast to other social networks where the algorithm is chosen for you. For example on Twitter (now X) you can’t choose to browse your network in chronological order.

The feed generator is part of the federated nature of the Bluesky social network. Instead of all algorithms being hosted by Bluesky, they can be hosted by anyone. Simply put:

Feed Generators are services that provide custom algorithms to users through the AT Protocol. Source: feed-generator.

Feed generator specification and API

So how do you go about creating a feed generator? A feed generator is an HTTP server that implements a specific protocol using JSON as its data exchange format. The server exposes three endpoints.

/.well-known/did.json

This endpoint is used to discover the feed generator. It returns a JSON document that contains the feed generator’s DID (Decentralized Identifier).

{
  "@context": [
    "https://www.w3.org/ns/did/v1"
  ],
  "id": "did:web:norsky.snorre.io",
  "service": [
    {
      "id": "#bsky_fg",
      "serviceEndpoint": "https://norsky.snorre.io",
      "type": "BskyFeedGenerator"
    }
  ]
}

The id field is the feed generator’s DID. Most feeds use the web identifier which means that the DID points to a hostname. The service field contains the feed generator’s HTTP root endpoint. type specifies that this is a Bluesky feed generator.

/xrpc/app.bsky.feed.describeFeedGenerator

This endpoint is used to describe the feed generator and its available feeds. Bluesky will use this endpoint to check which feeds are available after you have registered the feeds. If one of the feeds you’ve registered is not listed here it will not be available to users. Each feed is identified by a URI of the form:

at://did:web:{hostname}/app.bsky.feed.generator/{feed-id}

at:// tells Bluesky that this is an AT Protocol URI. did:web:{hostname} is the feed generator’s DID. app.bsky.feed.generator is the lexicon id for the feed generator. {feed-id} is the feed’s id.
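To make the URI structure concrete, here is a minimal sketch of how a feed generator might take such a URI apart. The helper parseFeedURI is a hypothetical name for illustration and is not part of any Bluesky library.

```go
package main

import (
	"fmt"
	"strings"
)

// feedCollection is the lexicon id that every feed URI contains.
const feedCollection = "app.bsky.feed.generator"

// parseFeedURI splits at://{did}/app.bsky.feed.generator/{feed-id}
// into the DID and the feed id. Hypothetical helper for illustration.
func parseFeedURI(uri string) (did, feedID string, err error) {
	if !strings.HasPrefix(uri, "at://") {
		return "", "", fmt.Errorf("not an AT Protocol URI: %q", uri)
	}
	parts := strings.Split(strings.TrimPrefix(uri, "at://"), "/")
	if len(parts) != 3 || parts[0] == "" || parts[1] != feedCollection || parts[2] == "" {
		return "", "", fmt.Errorf("not a feed generator URI: %q", uri)
	}
	return parts[0], parts[2], nil
}

func main() {
	did, feedID, err := parseFeedURI("at://did:web:norsky.snorre.io/app.bsky.feed.generator/bokmaal")
	if err != nil {
		fmt.Println("error:", err)
		return
	}
	fmt.Println(did, feedID)
}
```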

A service would then typically return something like this:

{
  "did": "did:web:norsky.snorre.io",
  "feeds": [
    {
      "uri": "at://did:web:norsky.snorre.io/app.bsky.feed.generator/bokmaal"
    },
    {
      "uri": "at://did:web:norsky.snorre.io/app.bsky.feed.generator/nynorsk"
    },
    {
      "uri": "at://did:web:norsky.snorre.io/app.bsky.feed.generator/sami"
    },
    {
      "uri": "at://did:web:norsky.snorre.io/app.bsky.feed.generator/all"
    }
  ]
}
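As a rough sketch, a response like the one above could be produced from a hostname and a list of feed ids. The type and function names here (feedRef, describeResponse, describe) are my own and only mirror the JSON fields shown above.

```go
package main

import (
	"encoding/json"
	"fmt"
)

// feedRef and describeResponse mirror the JSON shown above.
type feedRef struct {
	URI string `json:"uri"`
}

type describeResponse struct {
	DID   string    `json:"did"`
	Feeds []feedRef `json:"feeds"`
}

// describe builds the response for a did:web feed generator.
func describe(hostname string, feedIDs []string) describeResponse {
	did := "did:web:" + hostname
	resp := describeResponse{DID: did, Feeds: make([]feedRef, 0, len(feedIDs))}
	for _, id := range feedIDs {
		uri := "at://" + did + "/app.bsky.feed.generator/" + id
		resp.Feeds = append(resp.Feeds, feedRef{URI: uri})
	}
	return resp
}

func main() {
	resp := describe("norsky.snorre.io", []string{"bokmaal", "nynorsk", "sami", "all"})
	out, err := json.MarshalIndent(resp, "", "  ")
	if err != nil {
		fmt.Println("error:", err)
		return
	}
	fmt.Println(string(out))
}
```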

/xrpc/app.bsky.feed.getFeedSkeleton

This endpoint is used to get a feed skeleton. Feed generators do not return feeds containing the full post data. Instead they return a list of post URIs that Bluesky apps can then flesh out. This has many advantages, including saving the feed generator bandwidth and making the PDSs (Personal Data Servers) the single source of truth for post data.

The endpoint takes a required feed parameter and an optional cursor parameter (the lexicon also defines an optional limit). It takes the form:

{serviceEndpoint}/xrpc/app.bsky.feed.getFeedSkeleton?feed={feed-uri}&cursor={cursor}

An example request URL would look like this:

https://norsky.snorre.io/xrpc/app.bsky.feed.getFeedSkeleton?feed=at://did:web:norsky.snorre.io/app.bsky.feed.generator/bokmaal
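The URL above is written unescaped for readability. A client assembling it in code would normally percent-encode the query parameters. The following is a small sketch of that using only the standard library; skeletonURL is a hypothetical helper name.

```go
package main

import (
	"fmt"
	"net/url"
)

// skeletonURL builds a getFeedSkeleton request URL with properly
// escaped query parameters. The cursor is left out when empty.
func skeletonURL(serviceEndpoint, feedURI, cursor string) (string, error) {
	u, err := url.Parse(serviceEndpoint)
	if err != nil {
		return "", err
	}
	u.Path = "/xrpc/app.bsky.feed.getFeedSkeleton"
	q := url.Values{}
	q.Set("feed", feedURI)
	if cursor != "" {
		q.Set("cursor", cursor)
	}
	u.RawQuery = q.Encode()
	return u.String(), nil
}

func main() {
	s, err := skeletonURL(
		"https://norsky.snorre.io",
		"at://did:web:norsky.snorre.io/app.bsky.feed.generator/bokmaal",
		"",
	)
	if err != nil {
		fmt.Println("error:", err)
		return
	}
	fmt.Println(s)
}
```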

Such a request yields a JSON response containing a cursor (if there are more posts to load) and a list of post URIs:

{
  "feed": [
    {
      "post": "at://did:plc:2ouk4ptm336l2lcce6qxs3ar/app.bsky.feed.post/3kaoxdbnvwv2y"
    },
    {
      "post": "at://did:plc:y5zzasijmapifytqvhxnwsrm/app.bsky.feed.post/3kaoxbtnw7n2x"
    },
    {
      "post": "at://did:plc:jqdbfqiyyne7jkxpqlfacray/app.bsky.feed.post/3kaoxal4n3n2m"
    },
    {
      "post": "at://did:plc:hlxobkvuv64wghyltgbgz6f3/app.bsky.feed.post/3kaoxafag2y2s"
    },
    {
      "post": "at://did:plc:b66g2f6utk25ppjzsujhhqgt/app.bsky.feed.post/3kaox5gojov2q"
    }
  ],
  "cursor": "2482"
}

Note that the cursor is an opaque string that is not meant to be parsed by clients. It can contain any data and is only meant to be used by the feed generator to keep track of where it is in the feed. In the above case the cursor relies on the feed generator having a database of post ids that work in a sequential manner.
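To illustrate the idea, here is a minimal in-memory sketch of cursor pagination over sequential ids. It assumes posts are kept sorted with the newest (highest id) first and that the cursor is simply the id of the last post on the previous page. The post type and the page function are made up for this example.

```go
package main

import (
	"fmt"
	"math"
	"strconv"
)

// post is a simplified stored post with a sequential id.
type post struct {
	ID  int64
	URI string
}

// page returns up to limit posts with an id below the cursor.
// posts must be sorted by descending id. An empty cursor starts at the top.
// The returned cursor is empty when the page was not filled.
func page(posts []post, cursor string, limit int) ([]post, string, error) {
	before := int64(math.MaxInt64)
	if cursor != "" {
		id, err := strconv.ParseInt(cursor, 10, 64)
		if err != nil {
			return nil, "", fmt.Errorf("invalid cursor %q: %w", cursor, err)
		}
		before = id
	}
	var out []post
	for _, p := range posts {
		if p.ID < before && len(out) < limit {
			out = append(out, p)
		}
	}
	next := ""
	if limit > 0 && len(out) == limit {
		next = strconv.FormatInt(out[len(out)-1].ID, 10)
	}
	return out, next, nil
}

func main() {
	posts := []post{{5, "at://e"}, {4, "at://d"}, {3, "at://c"}, {2, "at://b"}, {1, "at://a"}}
	cursor := ""
	for {
		items, next, err := page(posts, cursor, 2)
		if err != nil {
			fmt.Println("error:", err)
			return
		}
		fmt.Println(items, "next cursor:", next)
		if next == "" {
			break
		}
		cursor = next
	}
}
```

Because the cursor only encodes "everything older than this id", new posts arriving at the top of the feed do not shift the pages a client has already fetched.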

There are three primary ways to get posts for a feed generator:

  1. Hardcode the post ids in the feed generator (not very interesting)
  2. Make a request to get posts from one or more users’ repositories
  3. Subscribe to the firehose, store the posts, and filter out posts that are not relevant to the feed

Depending on the type of feed you are building all of these are valid options.
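The third option can be sketched roughly as follows. This is an illustration only: it assumes each incoming post carries a list of language tags, and that the tags of interest are the ISO 639-1 codes nb, nn, no and se. The types and function names are hypothetical, and the channel stands in for the firehose.

```go
package main

import "fmt"

// post is a simplified firehose post with its language tags.
type post struct {
	URI   string
	Langs []string
}

// feedLanguages lists the language tags this sketch treats as relevant.
// The exact set is an assumption made for the example.
var feedLanguages = map[string]bool{"nb": true, "nn": true, "no": true, "se": true}

// relevant reports whether any of the post's language tags is wanted.
func relevant(p post) bool {
	for _, l := range p.Langs {
		if feedLanguages[l] {
			return true
		}
	}
	return false
}

// filter reads posts until the channel is closed and stores the relevant ones.
func filter(in <-chan post, store func(post)) {
	for p := range in {
		if relevant(p) {
			store(p)
		}
	}
}

func main() {
	in := make(chan post, 3)
	in <- post{URI: "at://example/1", Langs: []string{"en"}}
	in <- post{URI: "at://example/2", Langs: []string{"nb"}}
	in <- post{URI: "at://example/3", Langs: []string{"en", "nn"}}
	close(in)

	var kept []post
	filter(in, func(p post) { kept = append(kept, p) })
	for _, p := range kept {
		fmt.Println(p.URI, p.Langs)
	}
}
```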

Implementing the Norsky feed generator

With the specification in hand I set out to implement the feed generator. I had looked at some starter projects and found several.

All of these looked like good starting points, but I wanted something more lightweight. I started looking into Rust Bluesky clients and found Atrium. After working a bit with Atrium I couldn’t really get a Firehose feed to work. Not all of the library was published to crates.io either, and so the project did not seem quite ready for prime time.

With Rust off the table I decided to look at the next best thing: Go. The Bluesky team was already building an official, albeit experimental, Go client Indigo. Let’s go!

First steps, subscribing to the firehose

For my feed generator it seemed like the best option was to subscribe to the firehose. I couldn’t reliably list all Norwegian users anyway, and many of their posts would likely be in English. To get started I googled for an example of subscribing to the firehose in Go and found blueskyfirehose. Blueskyfirehose is a command line app that subscribes to the firehose and prints posts to stdout. Having read the source code I found that the indigo library had its own event scheduler setup for processing events from the firehose.

Subscribing to the firehose then boils down to the following code:

// Set up a websocket dialer and connect to the firehose
d := websocket.DefaultDialer
con, _, err := d.Dial("wss://bsky.social/xrpc/com.atproto.sync.subscribeRepos", http.Header{})
if err != nil {
    log.Fatal(err)
}

// Set up a callback event handler function to process events
rscb := &events.RepoStreamCallbacks{
    RepoCommit: func(evt *comatproto.SyncSubscribeRepos_Commit) error {
        // Process a repository commit here
        // This is a bit tricky as you have to do some marshalling and unmarshalling
        // from CBOR to JSON and back to Go structs
        return nil
    },
}

// Set up the event scheduler and start processing events
seqScheduler := sequential.NewScheduler(con.RemoteAddr().String(), rscb.EventHandler)
if err := events.HandleRepoStream(ctx, con, seqScheduler); err != nil {
    log.Fatal(err)
}

See full processing example in my repository.

Storing posts

Now that I had a way to subscribe to the firehose I needed a way to store posts. To keep things simple I wanted an in-process database that would store files on disk. Running things in-process would make it easier to deploy the feed generator and reduce the latency of serving requests. The database should also support some form of querying so that I could filter out posts that were not relevant to a given feed. Based on these requirements I decided to go with a classic database, SQLite. It is known for being fast and reliable and supports my use case well.

I quickly ended up in some trouble though. SQLite is a C library, so I would need cgo to interface with it. This is not a problem in itself, but it would make cross-compiling the feed generator more difficult. Fortunately I found a pure Go SQLite implementation called modernc.org/sqlite.

I then created a simple database schema that would allow me to store posts and their languages. While ORM libraries like gorm exist for Go, I’ve become a bit skeptical of them. I prefer to model my data in SQL first and then make models in the application code that map to SQL query results. This allows you to write more efficient queries and makes it easier to reason about the data.

CREATE TABLE posts (
  id INTEGER PRIMARY KEY AUTOINCREMENT, -- We need an id to implement deterministic cursor based pagination
  uri TEXT, -- The URI of the post
  created_at INTEGER NOT NULL -- The time the post was created as a Unix timestamp
);

CREATE TABLE post_languages (
  post_id INTEGER NOT NULL,
  language TEXT NOT NULL,
  PRIMARY KEY (post_id, language),
  FOREIGN KEY (post_id) REFERENCES posts(id) ON DELETE CASCADE
);

To ensure the database is always migrated I use go-migrate. This allows me to write migrations in SQL and have them run automatically when the application starts.

Reading posts from the database

The feed generator needs to be able to read posts from the database to serve the feed skeleton endpoint. Initially I started out by using only the standard library’s barebones database/sql package to write queries. Writing queries by hand and using scanning to map query results to structs is a bit tedious though. So the hunt began for a good query builder library. I found go-sqlbuilder. Using go-sqlbuilder I could write queries like this:

sb := sqlbuilder.NewSelectBuilder()
sb.Select("id", "uri", "created_at", "group_concat(language)").From("posts")
sb.Join("post_languages", "posts.id = post_languages.post_id")
if postId != 0 {
  sb.Where(sb.LessThan("id", postId))
}
if lang != "" {
  sb.Where(sb.Equal("language", lang))
}
sb.GroupBy("posts.id")
sb.OrderBy("id").Desc().Limit(limit)

// Named query rather than sql to avoid shadowing the database/sql package
query, args := sb.BuildWithFlavor(sqlbuilder.SQLite)

rows, err := reader.db.Query(query, args...)

This is of course much more verbose than using an ORM, but it is also much more explicit. I can see exactly what the query is doing and how it is being built. It is also possible to log the resulting SQL query and run it manually against the database to debug issues.
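Mapping a result row back to a model could look something like the sketch below. It assumes the four selected columns arrive in the order id, uri, created_at, group_concat(language), and relies on SQLite’s default group_concat separator, a comma. The Post struct, the scanner interface and the fakeRow type are illustrative stand-ins. In real code *sql.Rows would satisfy the scanner interface.

```go
package main

import (
	"errors"
	"fmt"
	"strings"
)

// Post is the application model for one row of the query result.
type Post struct {
	ID        int64
	URI       string
	CreatedAt int64
	Languages []string
}

// scanner is satisfied by *sql.Rows and *sql.Row from database/sql.
type scanner interface {
	Scan(dest ...any) error
}

// scanPost reads one row and splits the concatenated languages.
func scanPost(row scanner) (Post, error) {
	var p Post
	var langs string
	if err := row.Scan(&p.ID, &p.URI, &p.CreatedAt, &langs); err != nil {
		return Post{}, err
	}
	if langs != "" {
		p.Languages = strings.Split(langs, ",")
	}
	return p, nil
}

// fakeRow stands in for a database row so the sketch runs without a driver.
type fakeRow struct {
	id        int64
	uri       string
	createdAt int64
	langs     string
}

func (f fakeRow) Scan(dest ...any) error {
	if len(dest) != 4 {
		return errors.New("expected 4 scan destinations")
	}
	*dest[0].(*int64) = f.id
	*dest[1].(*string) = f.uri
	*dest[2].(*int64) = f.createdAt
	*dest[3].(*string) = f.langs
	return nil
}

func main() {
	p, err := scanPost(fakeRow{id: 2482, uri: "at://did:plc:example/app.bsky.feed.post/1", createdAt: 1697000000, langs: "nb,nn"})
	if err != nil {
		fmt.Println("error:", err)
		return
	}
	fmt.Printf("%+v\n", p)
}
```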

Writing posts to the database

It turns out go-sqlbuilder also supports insert statements. This allowed me to write a simple function to insert posts into the database.

insertPost := sqlbuilder.NewInsertBuilder()
// Named query rather than sql to avoid shadowing the database/sql package
query, args := insertPost.InsertInto("posts").Cols("uri", "created_at").Values(post.Uri, post.CreatedAt).Build()

// Spread args
res, err := db.Exec(query, args...)
if err != nil {
  log.Error("Error inserting post", err)
  return err
}

// Get inserted id
id, err := res.LastInsertId()
if err != nil {
  log.Error("Error getting inserted id", err)
  return err
}

// Post languages insert query
insertLangs := sqlbuilder.NewInsertBuilder()
insertLangs.InsertInto("post_languages").Cols("post_id", "language")
for _, lang := range post.Languages {
  insertLangs.Values(id, lang)
}
query, args = insertLangs.Build()

if _, err = db.Exec(query, args...); err != nil {
  log.Error("Error inserting post languages", err)
  return err
}

Observant readers will notice that I perform the insert of the post and the languages in two separate transactions. This is purely because it is easier to do it this way, and allows me to get a reference to the inserted post’s id.

Serving the feed skeleton

Now that I had a way to subscribe to the firehose and store posts in a database I needed to serve the feed skeleton. Fortunately this is not the first time I have written an HTTP API in Go, so I quickly landed on using fiber. Fiber is a fast HTTP framework for Go that is conceptually similar to Express.js and other Node.js frameworks. You define middleware and routes as callback functions on the server. Middleware is executed in the order it is registered and can be used to do things like logging, authentication, and error handling. Routes are defined by a path and a callback function that is executed when a request matches the path.

app := fiber.New()

// Middleware to track the latency of each request
app.Use(func(c *fiber.Ctx) error {
  start := time.Now()
  err := c.Next()
  stop := time.Now()

  // Diff
  log.WithFields(log.Fields{
    "method":  c.Method(),
    "route":   c.Route().Path,
    "latency": stop.Sub(start),
  }).Info("Request")
  return err
})


app.Get("/", func(c *fiber.Ctx) error {
  return c.SendString("This is the Norsky feed generator for listing Norwegian posts on Bluesky.")
})

// Well known
app.Get("/.well-known/did.json", func(c *fiber.Ctx) error {
  // Return the DID document, using regular map[string]interface{} for now

  return c.JSON(map[string]interface{}{
    "@context": []string{"https://www.w3.org/ns/did/v1"},
    "id":       "did:web:" + config.Hostname,
    "service": []map[string]interface{}{
      {
        "id":              "#bsky_fg",
        "type":            "BskyFeedGenerator",
        "serviceEndpoint": "https://" + config.Hostname,
      },
    },
  })
})

// Rest of the handlers ...
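
One of the remaining handlers is the feed skeleton endpoint. The sketch below shows the general shape of such a handler using only the standard library’s net/http instead of fiber, so that it is self-contained. The lookup function is a hypothetical stand-in for the database reader, the get helper only exists to exercise the handler, and the plain-text error bodies are a simplification.

```go
package main

import (
	"encoding/json"
	"fmt"
	"net/http"
	"net/http/httptest"
	"net/url"
	"strings"
)

// skeleton mirrors the getFeedSkeleton response shown earlier.
type skeletonItem struct {
	Post string `json:"post"`
}

type skeleton struct {
	Feed   []skeletonItem `json:"feed"`
	Cursor string         `json:"cursor,omitempty"`
}

// lookup is a hypothetical stand-in for the database reader. It returns
// post URIs, the next cursor, and false if the feed is unknown.
type lookup func(feedURI, cursor string) (uris []string, next string, ok bool)

// skeletonHandler serves feed skeletons using the given lookup.
func skeletonHandler(find lookup) http.HandlerFunc {
	return func(w http.ResponseWriter, r *http.Request) {
		feed := r.URL.Query().Get("feed")
		if feed == "" {
			http.Error(w, "missing feed parameter", http.StatusBadRequest)
			return
		}
		uris, next, ok := find(feed, r.URL.Query().Get("cursor"))
		if !ok {
			http.Error(w, "unknown feed", http.StatusNotFound)
			return
		}
		resp := skeleton{Feed: make([]skeletonItem, 0, len(uris)), Cursor: next}
		for _, u := range uris {
			resp.Feed = append(resp.Feed, skeletonItem{Post: u})
		}
		w.Header().Set("Content-Type", "application/json")
		if err := json.NewEncoder(w).Encode(resp); err != nil {
			fmt.Println("encode error:", err)
		}
	}
}

// get performs an in-memory GET request against the handler.
func get(h http.Handler, target string) (int, string) {
	rec := httptest.NewRecorder()
	h.ServeHTTP(rec, httptest.NewRequest(http.MethodGet, target, nil))
	return rec.Code, strings.TrimSpace(rec.Body.String())
}

func main() {
	feedURI := "at://did:web:example.com/app.bsky.feed.generator/all"
	h := skeletonHandler(func(feed, cursor string) ([]string, string, bool) {
		if feed != feedURI {
			return nil, "", false
		}
		return []string{"at://did:plc:example/app.bsky.feed.post/1"}, "1", true
	})
	code, body := get(h, "/xrpc/app.bsky.feed.getFeedSkeleton?feed="+url.QueryEscape(feedURI))
	fmt.Println(code, body)
}
```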

Putting it all together

Now that I had all the pieces I needed to put them together. I wanted the feed generator to run as a single command in one main process. Having to launch multiple processes would make it more difficult to deploy and manage. I also wanted some way to publish the feed, migrate the database, and perform other administrative tasks.

Command line interface

To make the feed generator easy to use I decided to create a command line interface. I first used cobra, which allows you to create a CLI with subcommands and flags. However, I also wanted to be able to configure the feed generator using environment variables. I tried using viper, but it turned out that I could not get the two libraries to play well together.

At the same time, while building the publish command, I found that there was no client API in the indigo library to authenticate with Bluesky or publish my feed generator. Writing a client API for indigo would be a lot of work! Fortunately Kagi search came to the rescue, revealing that the Furry community had already solved this issue. They had built a simple client API for Bluesky. Coincidentally they also used a different command line library, urfave/cli. This library was both a bit simpler than Cobra and also supported environment variables as a fallback for flags.
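The idea of falling back to environment variables can be shown with the standard library alone. This sketch does not use urfave/cli. It only illustrates the pattern with the flag package, and the flag names, variable names (NORSKY_HOSTNAME, NORSKY_DATABASE) and defaults are made up for the example.

```go
package main

import (
	"flag"
	"fmt"
	"os"
)

// config holds the settings for a hypothetical serve command.
type config struct {
	Hostname string
	Database string
}

// envOr returns the value of the environment variable key,
// or fallback when it is unset or empty.
func envOr(key, fallback string) string {
	if v, ok := os.LookupEnv(key); ok && v != "" {
		return v
	}
	return fallback
}

// parseConfig lets flags win over environment variables,
// which in turn win over built-in defaults.
func parseConfig(args []string) (config, error) {
	fs := flag.NewFlagSet("serve", flag.ContinueOnError)
	var cfg config
	fs.StringVar(&cfg.Hostname, "hostname", envOr("NORSKY_HOSTNAME", "localhost"), "public hostname of the feed generator")
	fs.StringVar(&cfg.Database, "database", envOr("NORSKY_DATABASE", "feed.db"), "path to the SQLite database file")
	if err := fs.Parse(args); err != nil {
		return config{}, err
	}
	return cfg, nil
}

func main() {
	cfg, err := parseConfig(os.Args[1:])
	if err != nil {
		os.Exit(2)
	}
	fmt.Printf("%+v\n", cfg)
}
```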

Communicating processes

With a way to run commands I set out to build my serve command. It would need to start the HTTP server, subscribe to the firehose, and start a process that writes posts to the database. Essentially the serve command would run three separate processes concurrently and somehow allow them to communicate. Go is a language that is known for its concurrency primitives. It supports Communicating Sequential Processes (CSP) in the form of channels. Channels are typed and can be used to send and receive data between goroutines (Go’s coroutines).

So in my serve command I created a channel connecting the firehose to the database writer and started a goroutine for each process:

// Channel for subscribing to bluesky posts
postChan := make(chan interface{})

// Setup the server and firehose
app := server.Server(&server.ServerConfig{
  Hostname: hostname,
  Reader:   db.NewReader(database),
})
fh := firehose.New(postChan, ctx.Context)
dbwriter := db.NewWriter(database, postChan)

// Graceful shutdown via wait group
c := make(chan os.Signal, 1)
signal.Notify(c, os.Interrupt)
var wg sync.WaitGroup
wg.Add(3) // One for each of the server, firehose, and database writer

go func() {
  <-c
  fmt.Println("Gracefully shutting down...")
  app.ShutdownWithTimeout(60 * time.Second)
  defer wg.Add(-3) // Release all three waitgroup slots once the firehose has shut down
  fh.Shutdown()
}()

go func() {
  fmt.Println("Subscribing to firehose...")
  if err := fh.Subscribe(); err != nil {
    // Use signal to shutdown all the goroutines
    log.Error(err)
    c <- os.Interrupt
  }
}()

go func() {
  fmt.Println("Starting server...")
  if err := app.Listen(fmt.Sprintf("%s:%d", host, port)); err != nil {
    log.Error(err)
    c <- os.Interrupt
  }
}()

go func() {
  fmt.Println("Starting database writer...")
  dbwriter.Subscribe()
}()

// Wait for the server, firehose, and database writer to shut down
wg.Wait()

log.Info("Norsky feed generator stopped")

return nil

Essentially I first instantiate each service I need to run. A post channel is created to allow the firehose to send posts to the database writer. Finally a goroutine is started for each service, which allows them to run concurrently. The wg.Wait() call blocks until the waitgroup counter is zero, which tells the app to wait for all goroutines to finish. postChan is passed to both the firehose and the database writer. Its element type is interface{} so that it can be used to send any type of firehose event. The database writer then uses type assertions to check if the event is a post and then writes it to the database.
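The type assertion step on the receiving side might look roughly like this. The postEvent type and the consume and run functions are hypothetical and only illustrate the pattern of receiving interface{} values from a channel and picking out the posts.

```go
package main

import "fmt"

// postEvent is a hypothetical event type carrying a post to store.
type postEvent struct {
	URI       string
	Languages []string
}

// consume reads events until the channel is closed. Posts are passed to
// store, everything else is counted as skipped.
func consume(events <-chan interface{}, store func(postEvent)) (skipped int) {
	for evt := range events {
		switch e := evt.(type) {
		case postEvent:
			store(e)
		default:
			skipped++
		}
	}
	return skipped
}

// run feeds the inputs through a channel to a consumer goroutine and
// returns what was stored and how many events were skipped.
func run(inputs []interface{}) (stored []postEvent, skipped int) {
	events := make(chan interface{})
	done := make(chan int)
	go func() {
		done <- consume(events, func(p postEvent) { stored = append(stored, p) })
	}()
	for _, in := range inputs {
		events <- in
	}
	close(events)
	skipped = <-done
	return stored, skipped
}

func main() {
	stored, skipped := run([]interface{}{
		postEvent{URI: "at://did:plc:example/app.bsky.feed.post/1", Languages: []string{"nb"}},
		"some other kind of event",
	})
	fmt.Println(len(stored), "stored,", skipped, "skipped")
}
```

A channel of a concrete type would give compile-time checking instead, at the cost of needing a separate channel for each kind of event.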

I find the Go concurrency primitives to be very powerful and easy to use. They allow you to write code that is easy to reason about and that is also fast. Readers familiar with Clojure will recognize the similarities between Go channels and Clojure’s core.async channels. The similarities are not a coincidence as both languages base their concurrency primitives on CSP.

Conclusion

I’ve had a lot of fun building the Norsky feed generator. It has been a great way to learn more about the Bluesky protocol and the AT Protocol. I also learned a lot about Go and how to build concurrent applications.

The Go Bluesky library still feels a bit immature. If you want to build a feed generator you should consider using the more mature TypeScript library. But if you are looking for performance, Go is a great choice.

Check out Norsky in action on the Norwegian languages feed. The source code is available on GitHub.