feat(pubky): get successful

This commit is contained in:
nazeh
2024-07-23 21:15:41 +03:00
parent 8cf18a3c0c
commit cc97744f25
9 changed files with 167 additions and 100 deletions

1
Cargo.lock generated
View File

@@ -1250,6 +1250,7 @@ dependencies = [
name = "pubky"
version = "0.1.0"
dependencies = [
"bytes",
"flume",
"pkarr",
"pubky-common",

View File

@@ -1,16 +1,23 @@
use std::fs;
use std::path::Path;
use bytes::Bytes;
use heed::{types::Str, Database, Env, EnvOpenOptions, RwTxn};
mod migrations;
pub mod tables;
use migrations::TABLES_COUNT;
use pubky_common::crypto::Hasher;
use tables::{entries::Entry, Tables, TABLES_COUNT};
use pkarr::PublicKey;
use tables::blobs::{BlobsTable, BLOBS_TABLE};
#[derive(Debug, Clone)]
pub struct DB {
pub(crate) env: Env,
pub(crate) tables: Tables,
}
impl DB {
@@ -19,10 +26,110 @@ impl DB {
let env = unsafe { EnvOpenOptions::new().max_dbs(TABLES_COUNT).open(storage) }?;
migrations::run(&env);
let tables = migrations::run(&env)?;
let db = DB { env };
let db = DB { env, tables };
Ok(db)
}
pub fn put_entry(
&mut self,
public_key: &PublicKey,
path: &str,
rx: flume::Receiver<Bytes>,
) -> anyhow::Result<()> {
let mut wtxn = self.env.write_txn()?;
let mut hasher = Hasher::new();
let mut bytes = vec![];
let mut length = 0;
while let Ok(chunk) = rx.recv() {
hasher.update(&chunk);
bytes.extend_from_slice(&chunk);
length += chunk.len();
}
let hash = hasher.finalize();
self.tables.blobs.put(&mut wtxn, hash.as_bytes(), &bytes)?;
let mut entry = Entry::new();
entry.set_content_hash(hash);
entry.set_content_length(length);
let mut key = vec![];
key.extend_from_slice(public_key.as_bytes());
key.extend_from_slice(path.as_bytes());
self.tables.entries.put(&mut wtxn, &key, &entry.serialize());
wtxn.commit()?;
Ok(())
}
pub fn get_blob(
&mut self,
public_key: &PublicKey,
path: &str,
) -> anyhow::Result<Option<Bytes>> {
let mut rtxn = self.env.read_txn()?;
let mut key = vec![];
key.extend_from_slice(public_key.as_bytes());
key.extend_from_slice(path.as_bytes());
if let Some(bytes) = self.tables.entries.get(&rtxn, &key)? {
let entry = Entry::deserialize(bytes)?;
if let Some(blob) = self.tables.blobs.get(&rtxn, entry.content_hash())? {
return Ok(Some(Bytes::from(blob.to_vec())));
};
};
Ok(None)
}
}
#[cfg(test)]
mod tests {
use pkarr::Keypair;
use pubky_common::timestamp::Timestamp;
use crate::config::Config;
use super::{Bytes, DB};
#[tokio::test]
async fn entries() {
let storage = std::env::temp_dir()
.join(Timestamp::now().to_string())
.join("pubky");
let mut db = DB::open(&storage).unwrap();
let keypair = Keypair::random();
let path = "/pub/foo.txt";
let (tx, rx) = flume::bounded::<Bytes>(0);
let mut cloned = db.clone();
let cloned_keypair = keypair.clone();
let done = tokio::task::spawn_blocking(move || {
cloned.put_entry(&cloned_keypair.public_key(), path, rx);
});
tx.send(vec![1, 2, 3, 4, 5].into());
drop(tx);
done.await;
let blob = db.get_blob(&keypair.public_key(), path).unwrap().unwrap();
assert_eq!(blob, Bytes::from(vec![1, 2, 3, 4, 5]));
}
}

View File

@@ -2,16 +2,16 @@ use heed::{types::Str, Database, Env, RwTxn};
mod m0;
use super::tables;
use super::tables::Tables;
pub const TABLES_COUNT: u32 = 4;
pub fn run(env: &Env) -> anyhow::Result<()> {
pub fn run(env: &Env) -> anyhow::Result<Tables> {
let mut wtxn = env.write_txn()?;
m0::run(env, &mut wtxn);
let tables = Tables::new(env, &mut wtxn)?;
wtxn.commit()?;
Ok(())
Ok(tables)
}

View File

@@ -1,6 +1,6 @@
use heed::{types::Str, Database, Env, RwTxn};
use super::tables::{blobs, entries, sessions, users};
use crate::database::tables::{blobs, entries, sessions, users};
pub fn run(env: &Env, wtxn: &mut RwTxn) -> anyhow::Result<()> {
let _: users::UsersTable = env.create_database(wtxn, Some(users::USERS_TABLE))?;

View File

@@ -2,3 +2,29 @@ pub mod blobs;
pub mod entries;
pub mod sessions;
pub mod users;
use heed::{Env, RwTxn};
use blobs::{BlobsTable, BLOBS_TABLE};
use entries::{EntriesTable, ENTRIES_TABLE};
pub const TABLES_COUNT: u32 = 4;
#[derive(Debug, Clone)]
pub struct Tables {
pub blobs: BlobsTable,
pub entries: EntriesTable,
}
impl Tables {
pub fn new(env: &Env, wtxn: &mut RwTxn) -> anyhow::Result<Self> {
Ok(Self {
blobs: env
.open_database(wtxn, Some(BLOBS_TABLE))?
.expect("Blobs table already created"),
entries: env
.open_database(wtxn, Some(ENTRIES_TABLE))?
.expect("Entries table already created"),
})
}
}

View File

@@ -78,6 +78,6 @@ impl Entry {
panic!("Unknown Entry version");
}
Ok(from_bytes(bytes)?)
from_bytes(bytes)
}
}

View File

@@ -49,6 +49,10 @@ where
pub struct EntryPath(pub(crate) String);
impl EntryPath {
pub fn as_str(&self) -> &str {
self.0.as_str()
}
pub fn as_bytes(&self) -> &[u8] {
self.0.as_bytes()
}

View File

@@ -23,7 +23,7 @@ use crate::{
};
pub async fn put(
State(state): State<AppState>,
State(mut state): State<AppState>,
pubky: Pubky,
path: EntryPath,
mut body: Body,
@@ -45,43 +45,7 @@ pub async fn put(
// TODO: Authorize
let mut wtxn = state.db.env.write_txn()?;
let blobs: BlobsTable = state
.db
.env
.open_database(&wtxn, Some(BLOBS_TABLE))?
.expect("Blobs table already created");
let entries: EntriesTable = state
.db
.env
.open_database(&wtxn, Some(ENTRIES_TABLE))?
.expect("Entries table already created");
let mut hasher = Hasher::new();
let mut bytes = vec![];
let mut length = 0;
while let Ok(chunk) = rx.recv() {
hasher.update(&chunk);
bytes.extend_from_slice(&chunk);
length += chunk.len();
}
let hash = hasher.finalize();
blobs.put(&mut wtxn, hash.as_bytes(), &bytes)?;
let mut entry = Entry::new();
entry.set_content_hash(hash);
entry.set_content_length(length);
let mut key = vec![];
key.extend_from_slice(public_key.as_bytes());
key.extend_from_slice(path.as_bytes());
entries.put(&mut wtxn, &key, &entry.serialize());
state.db.put_entry(public_key, path.as_str(), rx);
Ok(())
});
@@ -101,7 +65,7 @@ pub async fn put(
}
pub async fn get(
State(state): State<AppState>,
State(mut state): State<AppState>,
pubky: Pubky,
path: EntryPath,
) -> Result<impl IntoResponse> {
@@ -111,39 +75,9 @@ pub async fn get(
let public_key = pubky.public_key();
let mut rtxn = state.db.env.read_txn()?;
let entries: EntriesTable = state
.db
.env
.open_database(&rtxn, Some(ENTRIES_TABLE))?
.expect("Entries table already created");
let blobs: BlobsTable = state
.db
.env
.open_database(&rtxn, Some(BLOBS_TABLE))?
.expect("Blobs table already created");
let mut count = 0;
for x in entries.iter(&rtxn)? {
count += 1
match state.db.get_blob(public_key, path.as_str()) {
Err(error) => Err(error)?,
Ok(Some(bytes)) => Ok(bytes),
Ok(None) => Err(Error::with_status(StatusCode::NOT_FOUND)),
}
return Err(Error::new(StatusCode::NOT_FOUND, count.to_string().into()));
let mut key = vec![];
key.extend_from_slice(public_key.as_bytes());
key.extend_from_slice(path.as_bytes());
if let Some(bytes) = entries.get(&rtxn, &key)? {
let entry = Entry::deserialize(bytes)?;
if let Some(blob) = blobs.get(&rtxn, entry.content_hash())? {
return Ok(blob.to_vec());
};
};
Err(Error::new(StatusCode::NOT_FOUND, path.0.into()))
}

View File

@@ -27,15 +27,7 @@ impl PubkyClient {
url.set_path(&format!("/{pubky}/{path}"));
let result = self.request(super::HttpMethod::Get, &url).call();
if let Err(error) = result {
dbg!(&error);
return Err(error)?;
}
let response = result.unwrap();
let response = self.request(super::HttpMethod::Get, &url).call()?;
let len = response
.header("Content-Length")
@@ -45,7 +37,7 @@ impl PubkyClient {
// TODO: bail on too large files.
let mut bytes = Vec::with_capacity(len as usize);
let mut bytes = vec![0; len as usize];
response.into_reader().read_exact(&mut bytes);
@@ -102,14 +94,17 @@ mod tests {
}
}
let response = client.get(&keypair.public_key(), "/pub/foo.txt").await;
let response = client
.get(&keypair.public_key(), "/pub/foo.txt")
.await
.unwrap();
if let Err(Error::Ureq(ureqerror)) = response {
if let Some(r) = ureqerror.into_response() {
dbg!(r.into_string());
}
}
// if let Err(Error::Ureq(ureqerror)) = response {
// if let Some(r) = ureqerror.into_response() {
// dbg!(r.into_string());
// }
// }
// dbg!(response);
assert_eq!(response, bytes::Bytes::from(vec![0, 1, 2, 3, 4]))
}
}