# Mythical Data Pulls Pt. 2

*A compact introduction to Link states*

By [Building in Public](https://paragraph.com/@0x330a) · 2024-06-25

rust, farcaster, data

---

### Previously, on Mythical Data Pulls

This is a continuation of a previous post, [which you can find here](https://paragraph.xyz/@0x330a/mythical-data-pulls-pt-1). Some of the background on building Farcaster apps against your own Hub data, and where to find certain information, may be skipped over or implied here. I'll try to re-introduce topics where appropriate.

### "If you wish to make a follow list from scratch, you must first iterate through the universe" - Carl Sagan, probably

Previously we fetched a user's profile information: username, display name, bio, profile pic, and URL, all things that could be considered essential displayable information for a user in a Farcaster application. A slightly harder metric to track, but one just as expected on a user's profile, is the list of people they follow and, more importantly, the people who follow them. If you recall the Hub's perspective on data and the RPC calls available to us, you might remember we can query every link message created by a fid:

![](https://storage.googleapis.com/papyrus_images/46afefa641c131aaa0f2194acb66c26b.png)

This works fine for getting the list of people a user **follows**, but now we face a new problem: we don't know who **follows them**. We _could_ instead use the `GetLinksByTarget` RPC, although I'm unsure whether Hubs will keep storing this data properly in the future, since those indexes are keyed off merged LinkAdd/LinkRemove messages, which may be superseded by LinkCompactStateBody messages and leave new Hubs with incomplete by-target data(?). Either way, it probably pays to iterate every fid and collect all their data anyway, arriving at a reduced state of the parts of the network we care about, such as links, reactions, casts, etc.

### Implementing a crude follow index

Since we are already iterating through every known fid, we can just as easily run some other queries as we go. Picture the following flow: get the total fid count **N** from our Hub, then for each fid **i** in **1 to N**, query that fid's follow list and add it to a database (or however we want to store it); while we're at it, we can also fetch the user's profile information (from before) alongside their follow list. After iterating through every fid we should have the current state of the network: everyone's follows mapped alongside their profile information. That gives us insights like who the most popular user is, and even lets us build social score metrics by weighting followers by some other value. For example, a follow from a user who is themselves followed by many others could be weighted higher than a follow from a fresh account.
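As a toy illustration of that weighting idea, here's a sketch over an in-memory follower map. The data shape and the logarithmic weighting are assumptions for the example, not anything defined by the protocol:

```rust
use std::collections::HashMap;

/// Toy social score: a follow from a well-followed account counts for more.
/// Each follower contributes 1 + ln(1 + their own follower count).
fn social_score(target: u64, followers: &HashMap<u64, Vec<u64>>) -> f64 {
    followers
        .get(&target)
        .map(|fs| {
            fs.iter()
                .map(|f| {
                    let their_count = followers.get(f).map_or(0, |v| v.len());
                    1.0 + (1.0 + their_count as f64).ln()
                })
                .sum::<f64>()
        })
        .unwrap_or(0.0)
}

fn main() {
    // followers[fid] = list of fids that follow `fid`
    let mut followers: HashMap<u64, Vec<u64>> = HashMap::new();
    followers.insert(1, vec![2, 3]);    // fid 1 has two followers, one popular
    followers.insert(2, vec![3, 4, 5]); // fid 2 has three fresh-account followers
    println!("score(1) = {:.3}", social_score(1, &followers));
    println!("score(2) = {:.3}", social_score(2, &followers));
}
```

Even though fid 2 has more raw followers here, fid 1 scores higher because one of its followers is itself popular, which is exactly the kind of signal a naive follower count misses.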

### The Virgin LinkBody vs the Chad LinkCompactStateBody

It's probably worth exploring the actual messages, how Hubs communicate them to each other, and what it means for a consumer to receive certain types of follows, because the link compact messages recently threw me for a loop and might not be intuitive compared to the normal link add/remove message types:

`MessageData` is a protobuf 'message' or type which contains a `MessageType`, the fid sending it, the timestamp it was sent (in seconds from the Farcaster epoch, which is different from the Unix epoch), and a body which can be one of a few different types. Each body type has its own fields that make sense for it: for example, a `ReactionBody` has the `target` cast or URL(?) the user is reacting to, and a `LinkBody` has the `target_fid` that the link add/remove is relevant to.

    /**
     * A MessageData object contains properties common to all messages and wraps a body object which
     * contains properties specific to the MessageType.
     */
    message MessageData {
      MessageType type = 1; // Type of message contained in the body
      uint64 fid = 2; // Farcaster ID of the user producing the message
      uint32 timestamp = 3; // Farcaster epoch timestamp in seconds
      FarcasterNetwork network = 4; // Farcaster network the message is intended for
      oneof body {
        UserDataBody user_data_body = 12;
        LinkBody link_body = 14;
        /* other message body types etc ... */
        // Compaction messages
        LinkCompactStateBody link_compact_state_body = 17;
      } // Properties specific to the MessageType
    }
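
Since the `timestamp` field above counts seconds from the Farcaster epoch rather than the Unix epoch, converting it back is just a fixed offset. A minimal helper (the epoch constant is 2021-01-01T00:00:00 UTC, i.e. Unix second 1,609,459,200):

```rust
/// Farcaster timestamps count seconds from the Farcaster epoch
/// (2021-01-01T00:00:00Z), not from the Unix epoch.
const FARCASTER_EPOCH_SECS: u64 = 1_609_459_200;

/// Convert a Farcaster message timestamp to a Unix timestamp in seconds.
fn farcaster_to_unix(ts: u32) -> u64 {
    FARCASTER_EPOCH_SECS + ts as u64
}

fn main() {
    // A message stamped 0 was sent exactly at the Farcaster epoch.
    println!("{}", farcaster_to_unix(0)); // 1609459200
    println!("{}", farcaster_to_unix(100_000_000));
}
```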

![](https://storage.googleapis.com/papyrus_images/f66d907420fb8da556398d7267958e06.png)

Keen-eyed readers will notice the `LinkCompactStateBody` type, which was recently added as part of the [FIP-15 Link Defragmentation](https://github.com/farcasterxyz/protocol/discussions/169) improvement discussion. Essentially, this new message should be handled differently by both Hubs and applications consuming Hub messages. The improvement took a little re-reading to understand, but the crux of the problem it addresses is that users can only store a limited number of messages on Hubs, and messages are pruned once the user's storage limit for that message type is exceeded. As a refresher, you can check out the [Farcaster protocol architecture](https://docs.farcaster.xyz/learn/what-is-farcaster/messages#storage) for how storage works and the limits for each type of data.

Presently, at **2500** Link messages (adds and removes combined), your oldest messages will start getting pruned from Hubs until your Link message count is back inside the limit. The [Link CRDT](https://github.com/farcasterxyz/protocol/blob/main/docs/SPECIFICATION.md#316-link-crdt) resolves merging link messages in a specific way: an older message from a given user for the same **link type** and **target fid** is removed and replaced by the message with the later timestamp. This means that fid 1 storing a LinkAdd with a **type** of **"follow"** and a **target\_fid** of **2** will be overwritten by a LinkRemove with **type** of **"follow"** and a **target\_fid** of **2** if the timestamp of the Remove is later than the Add. The `LinkCompactStateBody` is a kind of "signalling" message in this way, and is also useful for storing a much larger set of "currently followed" fids: a `LinkCompactStateBody` can list as many as `10 * STORAGE_UNITS * LINKS_PER_STORAGE_UNIT` fids as "currently following", meaning a user with a single storage unit (the minimum requirement) can express `10 * 1 * 2500 = 25,000` "follows" in a single follow message.
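The Add/Remove resolution above can be sketched as a tiny last-write-wins map keyed by target fid. This is a simplification: the real CRDT also has tie-break rules for equal timestamps, which this ignores:

```rust
use std::collections::HashMap;

#[derive(Debug, Clone, Copy, PartialEq)]
enum Link {
    Add(u32),    // payload = Farcaster timestamp of the LinkAdd
    Remove(u32), // payload = Farcaster timestamp of the LinkRemove
}

/// Last-write-wins merge per target fid: a later message for the same target
/// replaces the earlier one, mirroring how a later Remove overwrites an Add.
fn merge(state: &mut HashMap<u64, Link>, target: u64, incoming: Link) {
    let ts = match incoming {
        Link::Add(t) | Link::Remove(t) => t,
    };
    let keep_existing = match state.get(&target) {
        Some(Link::Add(prev)) | Some(Link::Remove(prev)) => *prev >= ts,
        None => false,
    };
    if !keep_existing {
        state.insert(target, incoming);
    }
}

fn main() {
    let mut state = HashMap::new();
    merge(&mut state, 2, Link::Add(100));    // follow fid 2
    merge(&mut state, 2, Link::Remove(200)); // later unfollow wins
    merge(&mut state, 2, Link::Add(150));    // stale add, ignored
    println!("{:?}", state.get(&2)); // Some(Remove(200))
}
```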

![](https://storage.googleapis.com/papyrus_images/09ac138d8d2fdd30b8f33019e9bc4863.png)

Hubs will store many LinkBody messages, resolved by their timestamps; when merging compact state messages, however, only one `LinkCompactStateBody` will be stored per user.

The main complications for a developer are how a Hub treats this message and how to interpret receiving one. Jumping back to the Rust context, you could essentially treat every link message as a `Vec<LinkAction>` here: a single `LinkBody` with `MessageType == MessageType::MESSAGE_TYPE_LINK_ADD` is a single-element collection with an **add** link type, and a single `LinkBody` with `MessageType == MessageType::MESSAGE_TYPE_LINK_REMOVE` is a single-element collection with a **remove** link type. The `LinkCompactStateBody` can then be considered a 1..25,000 element collection with a Link**Add** type, as any fid in this list should be considered "currently followed" by the application. An important thing to note here is that a `LinkCompactStateBody` should carry `MessageType == MessageType::MESSAGE_TYPE_LINK_COMPACT_STATE`, so if you are filtering by `MessageType` in a subscription of **merge** messages, you might miss it if you expect only link adds or removes.

Another thing to note: the `LinkCompactStateBody` inherently cannot preserve the original follow time for each user in the list, so we can assume the follow time is the compact message's timestamp unless we have already merged a `LinkAdd` with the same target fid and link type for that user. We might therefore want to merge all the individual links we can first, then request the user's `LinkCompactStateBody` (of which there should only be one message anyway) and add in only the missing LinkAdds, without overwriting any timestamped links we have already stored for those types and targets. The `LinkCompactStateBody` also doesn't carry a link type, so you might assume it always means "follow", even if more link types are added in the future.
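That merge order can be sketched with a plain map, where explicit `LinkAdd` timestamps win and compact state targets only fill the gaps. The names here are illustrative, not from fatline-rs:

```rust
use std::collections::HashMap;

/// Merge a LinkCompactStateBody's target fids into an existing follow map
/// without clobbering follow timestamps we already learned from LinkAdds.
/// Targets we haven't seen fall back to the compact message's own timestamp.
fn merge_compact_state(
    follows: &mut HashMap<u64, u32>, // target_fid -> follow timestamp
    compact_targets: &[u64],
    compact_ts: u32,
) {
    for &target in compact_targets {
        follows.entry(target).or_insert(compact_ts);
    }
}

fn main() {
    let mut follows = HashMap::new();
    follows.insert(2, 100); // learned from an explicit LinkAdd earlier
    merge_compact_state(&mut follows, &[2, 3, 4], 500);
    // fid 2 keeps its original follow time; 3 and 4 fall back to 500
    println!("{:?}", follows.get(&2)); // Some(100)
    println!("{:?}", follows.get(&3)); // Some(500)
}
```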

### Show me the code already!

Let's start by defining the overall interaction we want to facilitate: a user might want to know their follow list and who follows them. Let's keep this as a simple index, built initially from every fid and then kept up to date by a subscription to a Hub. The end consumer could be anything from a custom client, a terminal UI, a website dashboard, or a frame, to an export to run some analysis on for building a social graph or user power score.

![](https://storage.googleapis.com/papyrus_images/486eb9397cfd148ef3f8a5b664f8adbe.png)

Our application is the blue box: literally just a SQLite table, with maybe a REST API in front of it

To start off we are going to bring in the usual suspects from the previous exercise, this time also adding a data persistence library, because we want to keep this follow list around to query against in the future, saving us from making RPC calls or re-indexing every time a user talks to it. Let's start by setting up a simple table to track the link state; here I'm going to use SQLite and Diesel in Rust to make it easier:

    ~/git/fid-playground$ echo DATABASE_URL=sqlite://data.db > .env
    ~/git/fid-playground$ diesel setup
    ~/git/fid-playground$ diesel migration generate create_links

Now we can fill in a super basic up and down SQL pair to create the required table. We won't worry about other link types for now, and we can filter on only "follow" links in our code:

    -- up.sql
    CREATE TABLE IF NOT EXISTS links (
        fid INTEGER,
        target INTEGER,
        timestamp INTEGER,
        PRIMARY KEY (fid, target)
    );

    -- down.sql
    DROP TABLE links;

Running `diesel migration run` should work without any issues and give us a brand new links table to track the from<->to follow link relationship. I'm going to first define some `Message` to `LinkAction` processing to interpret the `LinkBody` messages with `MESSAGE_TYPE_LINK_ADD` and `MESSAGE_TYPE_LINK_REMOVE` message types; recall the protobuf structure from above if any of this seems like we're speeding through it (this implementation is pretty much all taken from fatline-rs). Recall that we want to build a `Vec` of `LinkAction`s from the stream of every user's links via the `GetAllLinkMessagesByFid` request, which takes a `FidRequest`. This in turn gives us back a list of `Message`s, which should be all of the user's link adds and removes.

We'll start by mapping out our `LinkAction`, and the `LinkInfo` payload it carries, to represent the possible actions that someone can perform via a link add or remove message

    #[derive(Debug, Serialize, Deserialize, Clone)]
    pub struct LinkInfo {
        pub target_fid: u64,
        pub timestamp: u32
    }

    #[derive(Debug, Serialize, Deserialize, Clone)]
    pub enum LinkAction {
        AddFollow(LinkInfo),
        RemoveFollow(LinkInfo)
    }

Next, we'll create two more functions: one to map the message type and the link body's information into the previous enum, and one to handle unwrapping the protobuf `Message` itself.

    fn map_link_action(message_type: &MessageType, target: u64, timestamp: u32) -> Option<LinkAction> {
        let info = LinkInfo {
            target_fid: target,
            timestamp
        };
        match message_type {
            MessageType::LinkAdd => Some(LinkAction::AddFollow(info)),
            MessageType::LinkRemove => Some(LinkAction::RemoveFollow(info)),
            MessageType::LinkCompactState => Some(LinkAction::AddFollow(info)),
            _ => None
        }
    }
    
    pub(crate) fn link_from_message(message: Message) -> Option<Vec<LinkAction>> {
        let data = message.data?;
        let data_type = data.r#type();
    
        match data.body? {
            Body::LinkBody(link) => {
                let TargetFid(target) = link.target?;
                let mapped = map_link_action(&data_type, target, data.timestamp)?;
                Some(vec![mapped])
            },
            Body::LinkCompactStateBody(compaction) => {
                let mapped = compaction.target_fids
                    .iter().copied()
                    .filter_map(|target| map_link_action(&data_type, target, data.timestamp))
                    .collect::<Vec<_>>();
                if mapped.is_empty() {
                    None
                } else {
                    Some(mapped)
                }
            },
            _ => None
        }
    }

Here we simply add a way to get a `Vec` of `LinkAction`s from any message, whether it's a `LinkCompactStateBody` or a `LinkBody`; we just use the `MessageType` to figure out whether it will be an add or a remove, treating `MessageType::LinkCompactState` as a list of adds as well. We return an `Option` in case the data body isn't what we expect, since we are dealing with the proto `Message`, which could contain any of the valid body types as its inner body; this also lets us extend the handling at a future date if other link message types are added.

fatline-rs gives us a way to receive all of these messages as a `Stream`, which is useful for iterating or collecting them all through some intermediate steps. Here we'll use that to iterate over all the link add and remove actions and run a database query for each of them, either creating or deleting a row in the table we defined earlier. The primary key being composed of both the source and target fids means we keep only one entry per person per target, which is fine for our needs. We need to define the model struct that generates the Diesel database types for insertion and querying, matching the previously defined SQLite table:

    // model.rs
    use diesel::prelude::*;
    
    #[derive(Queryable,Selectable,Insertable,Debug, Clone)]
    #[diesel(table_name = crate::schema::links)]
    #[diesel(check_for_backend(diesel::sqlite::Sqlite))]
    pub(crate) struct LinkEntry {
        pub fid: i32,
        pub target: i32,
        pub timestamp: Option<i32>
    }

Now we can wire it all up with a very simple `main.rs` file that consumes the Hub messages transformed into `LinkAction`s

    use std::pin::pin;
    
    use diesel::{BoolExpressionMethods, Connection, ExpressionMethods, QueryDsl, RunQueryDsl, SqliteConnection};
    use diesel::associations::HasTable;
    use dotenvy_macro::dotenv;
    use fatline_rs::hub::HubInfoService;
    use fatline_rs::HubService;
    use fatline_rs::stream::{LinkAction, StreamService};
    use tokio_stream::StreamExt;
    
    use crate::model::LinkEntry;
    use crate::schema::links::dsl::{fid as link_fid, links, target as link_target};
    
    mod schema;
    mod model;
    
    const DB_URL: &'static str = dotenv!("DATABASE_URL");
    const HUB_URL: &'static str = dotenv!("HUB_URL");
    
    #[tokio::main]
    async fn main() -> Result<(), Box<dyn std::error::Error>> {
        let mut connection = SqliteConnection::establish(DB_URL)?;
        let mut client = HubService::connect(HUB_URL).await?;
    
        // for iterating
        let total_fid_count = client.get_current_fid_count().await?;
    
        for fid in 1..=total_fid_count {
            let page_size = None;
            let reverse = false;
            let fid_links = client.get_all_link_messages(fid, reverse, page_size);
            let mut fid_links = pin!(fid_links);
    
            // our actual stream iteration happens here
            while let Some(link_action) = fid_links.next().await {
                match link_action {
                    LinkAction::AddFollow(info) => {
                        // when it's an add then insert values
                        let model = LinkEntry {
                            fid: fid as i32,
                            target: info.target_fid as i32,
                            timestamp: Some(info.timestamp as i32),
                        };
                        // Database insertion
                        diesel::insert_into(links::table())
                            .values(model)
                            .on_conflict_do_nothing() // my hub has some weird duplicate adds with differing timestamps, maybe a hub sync issue?
                            .execute(&mut connection)?;
                    },
                    LinkAction::RemoveFollow(info) => {
                        // when it's a remove then remove values (if any exist)
                        let source = fid as i32;
                        let target = info.target_fid as i32;
                        // Database deletion
                        diesel::delete(links.filter(link_target.eq(target).and(link_fid.eq(source))))
                            .execute(&mut connection)?;
                    }
                }
            }
            println!("Mapped fid {fid}");
        }
        // we now have every fid link mapped
        Ok(())
    }

Running this application should now ingest all the link information, printing each fid after its current link events have been consumed (minus the compact state events from a live subscription, but for now it should get us started). The output looks something like this:

    ... etc
    Mapped fid 14
    Mapped fid 15
    Mapped fid 16
    Mapped fid 17
    Mapped fid 18
    Mapped fid 19
    Mapped fid 20
    ... etc

And output a table that looks something like this:

![](https://storage.googleapis.com/papyrus_images/2c7cbd1113a5e9eb3fbe1eecf4e970f1.png)

You should be able to query against this and serve this data in... several days' time... or less, depending on your latency to the Hub and transfer speed, I'm guessing, as well as any SQL tuning we do to optimise the writes. Another optimisation could be starting the initial sync breadth-first: you might have a specific fid in mind, so queue up that initial fid, followed by all the fids they follow, then the fids those fids follow, and so on, instead of brute-forcing 1 to 700,000+. At the end of the day these are just trade-offs you might make in your own application to reach a useful result sooner. You might even fall back to the get-links-by-target RPC for the specific fids you want, and branch out that way to at least build two-way links from an initial fid.
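The breadth-first idea can be sketched independently of the Hub plumbing. Here `following` is an in-memory stand-in for the links-by-fid RPC results, and the function just decides the crawl order:

```rust
use std::collections::{HashMap, HashSet, VecDeque};

/// Breadth-first crawl order starting from a seed fid: index the seed first,
/// then everyone they follow, then their follows' follows, and so on.
/// `following` maps each fid to the fids they follow (stand-in for the RPC).
fn crawl_order(seed: u64, following: &HashMap<u64, Vec<u64>>) -> Vec<u64> {
    let mut seen = HashSet::from([seed]);
    let mut queue = VecDeque::from([seed]);
    let mut order = Vec::new();
    while let Some(fid) = queue.pop_front() {
        order.push(fid);
        for &next in following.get(&fid).into_iter().flatten() {
            // only enqueue fids we haven't scheduled yet
            if seen.insert(next) {
                queue.push_back(next);
            }
        }
    }
    order
}

fn main() {
    let following = HashMap::from([(1, vec![5, 9]), (5, vec![9, 7])]);
    println!("{:?}", crawl_order(1, &following)); // [1, 5, 9, 7]
}
```

In a real sync each popped fid would trigger the `get_all_link_messages` stream from the earlier loop, so the fids nearest your seed get indexed first.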

As of writing this I don't have a checkpointed data source for people to analyze, but in theory it would be pretty easy to spin one up by leaving the application running, although you might want to throw some extra data into that export (reactions, profile info, and casts) before actually using it for anything useful.

Thanks for getting this far and let me know if you have any other useful data questions you want to explore for a future post, or how you use the link data from the Farcaster network in your own applications!

---

*Originally published on [Building in Public](https://paragraph.com/@0x330a/mythical-data-pulls-2)*
