feat(homeserver): add /events/ endpoint to list PUT/DELETE events

This commit is contained in:
nazeh
2024-08-20 21:53:01 +03:00
parent 0ce4a9da65
commit b4c7fdad45
9 changed files with 154 additions and 9 deletions

View File

@@ -9,6 +9,8 @@ pub mod tables;
use tables::{Tables, TABLES_COUNT};
pub const MAX_LIST_LIMIT: u16 = 100;
#[derive(Debug, Clone)]
pub struct DB {
pub(crate) env: Env,
@@ -43,7 +45,7 @@ mod tests {
.join(Timestamp::now().to_string())
.join("pubky");
let mut db = DB::open(&storage).unwrap();
let db = DB::open(&storage).unwrap();
let keypair = Keypair::random();
let path = "/pub/foo.txt";

View File

@@ -1,6 +1,6 @@
use heed::{Env, RwTxn};
use crate::database::tables::{blobs, entries, sessions, users};
use crate::database::tables::{blobs, entries, events, sessions, users};
pub fn run(env: &Env, wtxn: &mut RwTxn) -> anyhow::Result<()> {
let _: users::UsersTable = env.create_database(wtxn, Some(users::USERS_TABLE))?;
@@ -11,5 +11,7 @@ pub fn run(env: &Env, wtxn: &mut RwTxn) -> anyhow::Result<()> {
let _: entries::EntriesTable = env.create_database(wtxn, Some(entries::ENTRIES_TABLE))?;
let _: events::EventsTable = env.create_database(wtxn, Some(events::EVENTS_TABLE))?;
Ok(())
}

View File

@@ -1,5 +1,6 @@
pub mod blobs;
pub mod entries;
pub mod events;
pub mod sessions;
pub mod users;
@@ -8,12 +9,15 @@ use heed::{Env, RwTxn};
use blobs::{BlobsTable, BLOBS_TABLE};
use entries::{EntriesTable, ENTRIES_TABLE};
pub const TABLES_COUNT: u32 = 4;
use self::events::{EventsTable, EVENTS_TABLE};
pub const TABLES_COUNT: u32 = 5;
#[derive(Debug, Clone)]
pub struct Tables {
pub blobs: BlobsTable,
pub entries: EntriesTable,
pub events: EventsTable,
}
impl Tables {
@@ -25,6 +29,9 @@ impl Tables {
entries: env
.open_database(wtxn, Some(ENTRIES_TABLE))?
.expect("Entries table already created"),
events: env
.open_database(wtxn, Some(EVENTS_TABLE))?
.expect("Events table already created"),
})
}
}

View File

@@ -12,7 +12,7 @@ pub const BLOBS_TABLE: &str = "blobs";
impl DB {
pub fn get_blob(
&mut self,
&self,
public_key: &PublicKey,
path: &str,
) -> anyhow::Result<Option<bytes::Bytes>> {

View File

@@ -13,15 +13,15 @@ use pubky_common::{
timestamp::Timestamp,
};
use crate::database::DB;
use crate::database::{DB, MAX_LIST_LIMIT};
use super::events::Event;
/// full_path(pubky/*path) => Entry.
pub type EntriesTable = Database<Str, Bytes>;
pub const ENTRIES_TABLE: &str = "entries";
const MAX_LIST_LIMIT: u16 = 100;
impl DB {
pub fn put_entry(
&mut self,
@@ -56,6 +56,19 @@ impl DB {
.entries
.put(&mut wtxn, &key, &entry.serialize())?;
if path.starts_with("pub/") {
let url = format!("pubky://{key}");
let event = Event::put(&url);
let value = event.serialize();
let key = entry.timestamp.to_string();
self.tables.events.put(&mut wtxn, &key, &value)?;
// TODO: delete older events.
// TODO: move to events.rs
}
wtxn.commit()?;
Ok(())

View File

@@ -0,0 +1,54 @@
//! Server events (Put and Delete entries)
//!
//! Useful as a realtime sync with Indexers until
//! we implement more self-authenticated merkle data.
use heed::{
types::{Bytes, Str},
Database,
};
use postcard::{from_bytes, to_allocvec};
use serde::{Deserialize, Serialize};
/// Event [Timestamp] base32 => Encoded event.
pub type EventsTable = Database<Str, Bytes>;
pub const EVENTS_TABLE: &str = "events";
#[derive(Clone, Serialize, Deserialize, Debug, Eq, PartialEq)]
pub enum Event {
Put(String),
Delete(String),
}
impl Event {
pub fn put(url: &str) -> Self {
Self::Put(url.to_string())
}
pub fn serialize(&self) -> Vec<u8> {
to_allocvec(self).expect("Session::serialize")
}
pub fn deserialize(bytes: &[u8]) -> core::result::Result<Self, postcard::Error> {
if bytes[0] > 1 {
panic!("Unknown Event version");
}
from_bytes(bytes)
}
pub fn url(&self) -> &str {
match self {
Event::Put(url) => url,
Event::Delete(url) => url,
}
}
pub fn operation(&self) -> &str {
match self {
Event::Put(_) => "PUT",
Event::Delete(_) => "DEL",
}
}
}

View File

@@ -11,6 +11,7 @@ use crate::server::AppState;
use self::pkarr::pkarr_router;
mod auth;
mod feed;
mod pkarr;
mod public;
mod root;
@@ -25,6 +26,7 @@ fn base(state: AppState) -> Router {
.route("/:pubky/*path", put(public::put))
.route("/:pubky/*path", get(public::get))
.route("/:pubky/*path", delete(public::delete))
.route("/events/", get(feed::feed))
.layer(CookieManagerLayer::new())
// TODO: revisit if we enable streaming big payloads
// TODO: maybe add to a separate router (drive router?).

View File

@@ -0,0 +1,65 @@
use std::collections::HashMap;
use axum::{
body::Body,
extract::{Query, State},
http::{header, Response, StatusCode},
response::IntoResponse,
};
use crate::{
database::{tables::events::Event, MAX_LIST_LIMIT},
error::Result,
server::AppState,
};
pub async fn feed(
State(state): State<AppState>,
Query(params): Query<HashMap<String, String>>,
) -> Result<impl IntoResponse> {
let txn = state.db.env.read_txn()?;
let limit = params
.get("limit")
.and_then(|l| l.parse::<u16>().ok())
.unwrap_or(MAX_LIST_LIMIT)
.min(MAX_LIST_LIMIT);
let mut cursor = params
.get("cursor")
.map(|c| c.as_str())
.unwrap_or("0000000000000");
if cursor.len() < 13 {
cursor = "0000000000000"
}
let mut result: Vec<String> = vec![];
let mut next_cursor = "".to_string();
for _ in 0..limit {
match state.db.tables.events.get_greater_than(&txn, cursor)? {
Some((timestamp, event_bytes)) => {
let event = Event::deserialize(event_bytes)?;
let line = format!("{} {}", event.operation(), event.url());
next_cursor = timestamp.to_string();
result.push(line);
}
None => break,
};
}
if !result.is_empty() {
result.push(format!("cursor: {next_cursor}"))
}
txn.commit()?;
Ok(Response::builder()
.status(StatusCode::OK)
.header(header::CONTENT_TYPE, "text/plain")
.body(Body::from(result.join("\n")))
.unwrap())
}

View File

@@ -62,7 +62,7 @@ pub async fn put(
}
pub async fn get(
State(mut state): State<AppState>,
State(state): State<AppState>,
pubky: Pubky,
path: EntryPath,
Query(params): Query<HashMap<String, String>>,
@@ -96,7 +96,7 @@ pub async fn get(
return Ok(Response::builder()
.status(StatusCode::OK)
.header(header::CONTENT_TYPE, "application/json")
.header(header::CONTENT_TYPE, "text/plain")
.body(Body::from(vec.join("\n")))
.unwrap());
}