From 8dad6da45ab36d14b6aa01164726179ffa874931 Mon Sep 17 00:00:00 2001 From: Oliver Gugger Date: Mon, 22 Nov 2021 16:30:48 +0100 Subject: [PATCH 1/5] aperture: add hashmail server --- go.mod | 4 +- go.sum | 30 ++- hashmail_server.go | 626 +++++++++++++++++++++++++++++++++++++++++++++ 3 files changed, 657 insertions(+), 3 deletions(-) create mode 100644 hashmail_server.go diff --git a/go.mod b/go.mod index a5972fc..3ca9d72 100644 --- a/go.mod +++ b/go.mod @@ -10,6 +10,7 @@ require ( github.com/fortytw2/leaktest v1.3.0 github.com/golang/protobuf v1.5.2 github.com/jessevdk/go-flags v1.4.0 + github.com/lightninglabs/lightning-node-connect/hashmailrpc v1.0.2 github.com/lightninglabs/lndclient v0.12.0-9 github.com/lightningnetwork/lnd v0.13.0-beta.rc5.0.20210728112744-ebabda671786 github.com/lightningnetwork/lnd/cert v1.0.3 @@ -18,7 +19,8 @@ require ( go.etcd.io/etcd/server/v3 v3.5.0 golang.org/x/crypto v0.0.0-20201002170205-7f63de1d35b0 golang.org/x/net v0.0.0-20210405180319-a5a99cb37ef4 - google.golang.org/grpc v1.38.0 + golang.org/x/time v0.0.0-20210220033141-f8bda1e9f3ba + google.golang.org/grpc v1.39.0 gopkg.in/macaroon.v2 v2.1.0 gopkg.in/yaml.v2 v2.4.0 ) diff --git a/go.sum b/go.sum index d6b5ed4..4540307 100644 --- a/go.sum +++ b/go.sum @@ -1,5 +1,6 @@ cloud.google.com/go v0.26.0/go.mod h1:aQUYkXzVsufM+DwF1aE+0xfcU+56JwCaLick0ClmMTw= cloud.google.com/go v0.34.0/go.mod h1:aQUYkXzVsufM+DwF1aE+0xfcU+56JwCaLick0ClmMTw= +cloud.google.com/go v0.34.0/go.mod h1:aQUYkXzVsufM+DwF1aE+0xfcU+56JwCaLick0ClmMTw= cloud.google.com/go v0.38.0/go.mod h1:990N+gfupTy94rShfmMCWGDn0LpTmnzTp2qbd1dvSRU= cloud.google.com/go v0.44.1/go.mod h1:iSa0KzasP4Uvy3f1mN/7PiObzGgflwredwwASm/v6AU= cloud.google.com/go v0.44.2/go.mod h1:60680Gw3Yr4ikxnPRS/oxxkBccT6SA1yMk63TGekxKY= @@ -56,6 +57,7 @@ github.com/alecthomas/units v0.0.0-20190717042225-c3de453c63f4/go.mod h1:ybxpYRF github.com/alecthomas/units v0.0.0-20190924025748-f65c72e2690d/go.mod h1:rBZYJk541a8SKzHPHnH3zbiI+7dagKZ0cgpgrD7Fyho= github.com/antihax/optional v0.0.0-20180407024304-ca021399b1a6/go.mod h1:V8iCPQYkqmusNa815XgQio277wI47sdRh1dUOLdyC6Q= github.com/antihax/optional v1.0.0/go.mod h1:uupD/76wgC+ih3iEmQUL+0Ugr19nfwCT1kdvxnR2qWY= +github.com/antihax/optional v1.0.0/go.mod h1:uupD/76wgC+ih3iEmQUL+0Ugr19nfwCT1kdvxnR2qWY= github.com/armon/circbuf v0.0.0-20150827004946-bbbad097214e/go.mod h1:3U/XgcO3hCbHZ8TKRvWD2dDTCfh9M9ya+I9JpbB7O8o= github.com/armon/go-metrics v0.0.0-20180917152333-f0300d1749da/go.mod h1:Q73ZrmVTwzkszR9V5SSuryQ31EELlFMUz1kKyl939pY= github.com/armon/go-radix v0.0.0-20180808171621-7fddfc383310/go.mod h1:ufUuZ+zHj4x4TnLV4JWEpy2hxWSpsRywHrMgIH9cCH8= @@ -75,6 +77,8 @@ github.com/btcsuite/btcd v0.20.1-beta.0.20200513120220-b470eee47728/go.mod h1:wV github.com/btcsuite/btcd v0.21.0-beta.0.20201208033208-6bd4c64a54fa/go.mod h1:Sv4JPQ3/M+teHz9Bo5jBpkNcP0x6r7rdihlNL/7tTAs= github.com/btcsuite/btcd v0.21.0-beta.0.20210426180113-7eba688b65e5/go.mod h1:9n5ntfhhHQBIhUvlhDvD3Qg6fRUj4jkN0VB8L8svzOA= github.com/btcsuite/btcd v0.21.0-beta.0.20210513141527-ee5896bad5be h1:vDD/JWWS2v4GJUG/RZE/50wT6Saerbujijd7mFqgsKI= +github.com/btcsuite/btcd v0.21.0-beta.0.20210513141527-ee5896bad5be h1:vDD/JWWS2v4GJUG/RZE/50wT6Saerbujijd7mFqgsKI= +github.com/btcsuite/btcd v0.21.0-beta.0.20210513141527-ee5896bad5be/go.mod h1:9n5ntfhhHQBIhUvlhDvD3Qg6fRUj4jkN0VB8L8svzOA= github.com/btcsuite/btcd v0.21.0-beta.0.20210513141527-ee5896bad5be/go.mod h1:9n5ntfhhHQBIhUvlhDvD3Qg6fRUj4jkN0VB8L8svzOA= github.com/btcsuite/btclog v0.0.0-20170628155309-84c8d2346e9f h1:bAs4lUbRJpnnkd9VhRV3jjAVU7DJVjMaK+IsvSeZvFo= github.com/btcsuite/btclog v0.0.0-20170628155309-84c8d2346e9f/go.mod h1:TdznJufoqS23FtqVCzL0ZqgP5MqXbb4fg/WgDys70nA= @@ -136,6 +140,7 @@ github.com/chzyer/test v0.0.0-20180213035817-a1ea475d72b1/go.mod h1:Q3SI9o4m/ZMn github.com/client9/misspell v0.3.4/go.mod h1:qj6jICC3Q7zFZvVWo7KLAzC3yx5G7kyvSDkc90ppPyw= github.com/cncf/udpa/go v0.0.0-20191209042840-269d4d468f6f/go.mod h1:M8M6+tZqaGXZJjfX53e64911xZQV5JYwmTeXPW+k8Sc= github.com/cncf/udpa/go v0.0.0-20201120205902-5459f2c99403/go.mod h1:WmhPx2Nbnhtbo57+VJT5O0JRkEi1Wbu0z5j0R8u5Hbk= +github.com/cncf/xds/go v0.0.0-20210312221358-fbca930ec8ed/go.mod h1:eXthEFrGJvWHgFFCl3hGmgk+/aYT6PnTQLykKQRLhEs= github.com/cockroachdb/datadriven v0.0.0-20200714090401-bf6692d28da5 h1:xD/lrqdvwsc+O2bjSSi3YqY73Ke3LAiSCx49aCesA0E= github.com/cockroachdb/datadriven v0.0.0-20200714090401-bf6692d28da5/go.mod h1:h6jFvWxBdQXxjopDMZyH2UVceIRfR84bdzbkoKrsWNo= github.com/cockroachdb/errors v1.2.4 h1:Lap807SXTH5tri2TivECb/4abUkMZC9zRoLarvcKDqs= @@ -172,6 +177,7 @@ github.com/envoyproxy/go-control-plane v0.9.1-0.20191026205805-5f8ba28d4473/go.m github.com/envoyproxy/go-control-plane v0.9.4/go.mod h1:6rpuAdCZL397s3pYoYcLgu1mIlRU8Am5FuJP05cCM98= github.com/envoyproxy/go-control-plane v0.9.9-0.20201210154907-fd9021fe5dad/go.mod h1:cXg6YxExXjJnVBQHBLXeUAgxn2UodCpnH306RInaBQk= github.com/envoyproxy/go-control-plane v0.9.9-0.20210217033140-668b12f5399d/go.mod h1:cXg6YxExXjJnVBQHBLXeUAgxn2UodCpnH306RInaBQk= +github.com/envoyproxy/go-control-plane v0.9.9-0.20210512163311-63b5d3c536b0/go.mod h1:hliV/p42l8fGbc6Y9bQ70uLwIvmJyVE5k4iMKlh8wCQ= github.com/envoyproxy/protoc-gen-validate v0.1.0/go.mod h1:iSmxcyjqTsJpI2R4NaDN7+kN2VEUnK/pcBlmesArF7c= github.com/fatih/color v1.7.0/go.mod h1:Zm6kSWBoL9eyXnKyktHP6abPY2pDugNf5KwzbycvMj4= github.com/form3tech-oss/jwt-go v3.2.3+incompatible h1:7ZaBxOI7TMoYBfyA3cQHErNNyAWIKUMIwqxEtgHOs5c= @@ -229,6 +235,7 @@ github.com/golang/protobuf v1.3.2/go.mod h1:6lQm79b+lXiMfvg/cZm0SGofjICqVBUtrP5y github.com/golang/protobuf v1.3.3/go.mod h1:vzj43D7+SQXF/4pzW/hwtAqwc6iTitCiVSaWz5lYuqw= github.com/golang/protobuf v1.3.4/go.mod h1:vzj43D7+SQXF/4pzW/hwtAqwc6iTitCiVSaWz5lYuqw= github.com/golang/protobuf v1.3.5/go.mod h1:6O5/vntMXwX2lRkT1hjjk0nAC1IDOTvTlVgjlRvqsdk= +github.com/golang/protobuf v1.3.5/go.mod h1:6O5/vntMXwX2lRkT1hjjk0nAC1IDOTvTlVgjlRvqsdk= github.com/golang/protobuf v1.4.0-rc.1/go.mod h1:ceaxUfeHdC40wWswd/P6IGgMaK3YpKi5j83Wpe3EHw8= github.com/golang/protobuf v1.4.0-rc.1.0.20200221234624-67d41d38c208/go.mod h1:xKAWHe0F5eneWXFV3EuXVDTCmh+JuBKY0li0aMyXATA= github.com/golang/protobuf v1.4.0-rc.2/go.mod h1:LlEzMj4AhA7rCAGe4KMBDvJI+AwstrUpVNzEA03Pprs= @@ -252,6 +259,7 @@ github.com/google/go-cmp v0.3.1/go.mod h1:8QqcDgzrUqlUb/G2PQTWiueGozuR1884gddMyw github.com/google/go-cmp v0.4.0/go.mod h1:v8dTdLbMG2kIc/vJvl+f65V22dbkXbowE6jgT/gNBxE= github.com/google/go-cmp v0.4.1/go.mod h1:v8dTdLbMG2kIc/vJvl+f65V22dbkXbowE6jgT/gNBxE= github.com/google/go-cmp v0.5.0/go.mod h1:v8dTdLbMG2kIc/vJvl+f65V22dbkXbowE6jgT/gNBxE= +github.com/google/go-cmp v0.5.0/go.mod h1:v8dTdLbMG2kIc/vJvl+f65V22dbkXbowE6jgT/gNBxE= github.com/google/go-cmp v0.5.1/go.mod h1:v8dTdLbMG2kIc/vJvl+f65V22dbkXbowE6jgT/gNBxE= github.com/google/go-cmp v0.5.4/go.mod h1:v8dTdLbMG2kIc/vJvl+f65V22dbkXbowE6jgT/gNBxE= github.com/google/go-cmp v0.5.5/go.mod h1:v8dTdLbMG2kIc/vJvl+f65V22dbkXbowE6jgT/gNBxE= @@ -366,7 +374,11 @@ github.com/kr/text v0.2.0 h1:5Nx0Ya0ZqY2ygV366QzturHI13Jq95ApcVaJBhpS+AY= github.com/kr/text v0.2.0/go.mod h1:eLer722TekiGuMkidMxC/pM04lWEeraHUUmBw8l2grE= github.com/lightninglabs/gozmq v0.0.0-20191113021534-d20a764486bf h1:HZKvJUHlcXI/f/O0Avg7t8sqkPo78HFzjmeYFl6DPnc= github.com/lightninglabs/gozmq v0.0.0-20191113021534-d20a764486bf/go.mod h1:vxmQPeIQxPf6Jf9rM8R+B4rKBqLA2AjttNxkFBL2Plk= +github.com/lightninglabs/lightning-node-connect/hashmailrpc v1.0.2 h1:Er1miPZD2XZwcfE4xoS5AILqP1mj7kqnhbBSxW9BDxY= +github.com/lightninglabs/lightning-node-connect/hashmailrpc v1.0.2/go.mod h1:antQGRDRJiuyQF6l+k6NECCSImgCpwaZapATth2Chv4= github.com/lightninglabs/lndclient v0.12.0-9 h1:w5Ozl74o7hJElxSk4hRu2enCNzdJfshDbJsJzhBaI3I= +github.com/lightninglabs/lndclient v0.12.0-9 h1:w5Ozl74o7hJElxSk4hRu2enCNzdJfshDbJsJzhBaI3I= +github.com/lightninglabs/lndclient v0.12.0-9/go.mod h1:L0R2VOaLxMylGbxgnfiZGc0hMDIIgj91cfgwGuFz9kU= github.com/lightninglabs/lndclient v0.12.0-9/go.mod h1:L0R2VOaLxMylGbxgnfiZGc0hMDIIgj91cfgwGuFz9kU= github.com/lightninglabs/neutrino v0.11.0/go.mod h1:CuhF0iuzg9Sp2HO6ZgXgayviFTn1QHdSTJlMncK80wg= github.com/lightninglabs/neutrino v0.11.1-0.20201210023533-e1978372d15e/go.mod h1:KDWfQDKp+CFBxO1t2NRmWuagTY2sYIjpHB1k5vrojTI= @@ -376,6 +388,8 @@ github.com/lightninglabs/protobuf-hex-display v1.3.3-0.20191212020323-b444784ce7 github.com/lightninglabs/protobuf-hex-display v1.4.3-hex-display/go.mod h1:2oKOBU042GKFHrdbgGiKax4xVrFiZu51lhacUZQ9MnE= github.com/lightningnetwork/lightning-onion v1.0.2-0.20200501022730-3c8c8d0b89ea/go.mod h1:rigfi6Af/KqsF7Za0hOgcyq2PNH4AN70AaMRxcJkff4= github.com/lightningnetwork/lightning-onion v1.0.2-0.20210520211913-522b799e65b1 h1:h1BsjPzWea790mAXISoiT/qr0JRcixTCDNLmjsDThSw= +github.com/lightningnetwork/lightning-onion v1.0.2-0.20210520211913-522b799e65b1 h1:h1BsjPzWea790mAXISoiT/qr0JRcixTCDNLmjsDThSw= +github.com/lightningnetwork/lightning-onion v1.0.2-0.20210520211913-522b799e65b1/go.mod h1:rigfi6Af/KqsF7Za0hOgcyq2PNH4AN70AaMRxcJkff4= github.com/lightningnetwork/lightning-onion v1.0.2-0.20210520211913-522b799e65b1/go.mod h1:rigfi6Af/KqsF7Za0hOgcyq2PNH4AN70AaMRxcJkff4= github.com/lightningnetwork/lnd v0.12.0-beta/go.mod h1:2GyP1IG1kXV5Af/LOCxnXfux1OP3fAGr8zptS5PB2YI= github.com/lightningnetwork/lnd v0.13.0-beta.rc5.0.20210728112744-ebabda671786 h1:DOZ16XjuSJgmgV0jXYcagxg19fRgad3DbzpNNkWuOsk= @@ -677,6 +691,7 @@ golang.org/x/oauth2 v0.0.0-20190226205417-e64efc72b421/go.mod h1:gOpvHmFTYa4Iltr golang.org/x/oauth2 v0.0.0-20190604053449-0f29369cfe45/go.mod h1:gOpvHmFTYa4IltrdGE7lF6nIHvwfUNPOp7c8zoXwtLw= golang.org/x/oauth2 v0.0.0-20191202225959-858c2ad4c8b6/go.mod h1:gOpvHmFTYa4IltrdGE7lF6nIHvwfUNPOp7c8zoXwtLw= golang.org/x/oauth2 v0.0.0-20200107190931-bf48bf16ab8d/go.mod h1:gOpvHmFTYa4IltrdGE7lF6nIHvwfUNPOp7c8zoXwtLw= +golang.org/x/oauth2 v0.0.0-20200107190931-bf48bf16ab8d/go.mod h1:gOpvHmFTYa4IltrdGE7lF6nIHvwfUNPOp7c8zoXwtLw= golang.org/x/oauth2 v0.0.0-20210615190721-d04028783cf1 h1:x622Z2o4hgCr/4CiKWc51jHVKaWdtVpBNmEI8wI9Qns= golang.org/x/oauth2 v0.0.0-20210615190721-d04028783cf1/go.mod h1:KelEdhl1UZF7XfJ4dDtk6s++YSgaE7mD/BuKKDLBl4A= golang.org/x/sync v0.0.0-20180314180146-1d60e4601c6f/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM= @@ -685,6 +700,8 @@ golang.org/x/sync v0.0.0-20181221193216-37e7f081c4d4/go.mod h1:RxMgew5VJxzue5/jJ golang.org/x/sync v0.0.0-20190227155943-e225da77a7e6/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM= golang.org/x/sync v0.0.0-20190423024810-112230192c58/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM= golang.org/x/sync v0.0.0-20190911185100-cd5d95a43a6e/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM= +golang.org/x/sync v0.0.0-20190911185100-cd5d95a43a6e/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM= +golang.org/x/sync v0.0.0-20200317015054-43a5402ce75a/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM= golang.org/x/sync v0.0.0-20200317015054-43a5402ce75a/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM= golang.org/x/sync v0.0.0-20200625203802-6e8e738ad208/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM= golang.org/x/sync v0.0.0-20201020160332-67f06af15bc9/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM= @@ -747,6 +764,7 @@ golang.org/x/text v0.0.0-20170915032832-14c0d48ead0c/go.mod h1:NqM8EUOU14njkJ3fq golang.org/x/text v0.3.0/go.mod h1:NqM8EUOU14njkJ3fqMW+pc6Ldnwhi/IjpwHt7yyuwOQ= golang.org/x/text v0.3.1-0.20180807135948-17ff2d5776d2/go.mod h1:NqM8EUOU14njkJ3fqMW+pc6Ldnwhi/IjpwHt7yyuwOQ= golang.org/x/text v0.3.2/go.mod h1:bEr9sfX3Q8Zfm5fL9x+3itogRgK3+ptLWKqgva+5dAk= +golang.org/x/text v0.3.2/go.mod h1:bEr9sfX3Q8Zfm5fL9x+3itogRgK3+ptLWKqgva+5dAk= golang.org/x/text v0.3.3/go.mod h1:5Zoc/QRtKVWzQhOtBMvqHzDpF6irO9z98xDceosuGiQ= golang.org/x/text v0.3.5 h1:i6eZZ+zk0SOf0xgBpEpPD18qWcJda6q1sxt3S0kzyUQ= golang.org/x/text v0.3.5/go.mod h1:5Zoc/QRtKVWzQhOtBMvqHzDpF6irO9z98xDceosuGiQ= @@ -754,6 +772,7 @@ golang.org/x/time v0.0.0-20180412165947-fbb02b2291d2/go.mod h1:tRJNPiyCQ0inRvYxb golang.org/x/time v0.0.0-20181108054448-85acf8d2951c/go.mod h1:tRJNPiyCQ0inRvYxbN9jk5I+vvW/OXSQhTDSoE431IQ= golang.org/x/time v0.0.0-20190308202827-9d24e82272b4/go.mod h1:tRJNPiyCQ0inRvYxbN9jk5I+vvW/OXSQhTDSoE431IQ= golang.org/x/time v0.0.0-20191024005414-555d28b269f0/go.mod h1:tRJNPiyCQ0inRvYxbN9jk5I+vvW/OXSQhTDSoE431IQ= +golang.org/x/time v0.0.0-20191024005414-555d28b269f0/go.mod h1:tRJNPiyCQ0inRvYxbN9jk5I+vvW/OXSQhTDSoE431IQ= golang.org/x/time v0.0.0-20210220033141-f8bda1e9f3ba h1:O8mE0/t419eoIwhTFpKVkHiTs/Igowgfkj25AcZrtiE= golang.org/x/time v0.0.0-20210220033141-f8bda1e9f3ba/go.mod h1:tRJNPiyCQ0inRvYxbN9jk5I+vvW/OXSQhTDSoE431IQ= golang.org/x/tools v0.0.0-20180221164845-07fd8470d635/go.mod h1:n7NCudcB/nEzxVGmLbDWY5pfWTLqBcC2KZ6jyYvM4mQ= @@ -865,8 +884,10 @@ google.golang.org/genproto v0.0.0-20200423170343-7949de9c1215/go.mod h1:55QSHmfG google.golang.org/genproto v0.0.0-20200430143042-b979b6f78d84/go.mod h1:55QSHmfGQM9UVYDPBsyGGes0y52j32PQ3BqQfXhyH3c= google.golang.org/genproto v0.0.0-20200511104702-f5ebc3bea380/go.mod h1:55QSHmfGQM9UVYDPBsyGGes0y52j32PQ3BqQfXhyH3c= google.golang.org/genproto v0.0.0-20200513103714-09dca8ec2884/go.mod h1:55QSHmfGQM9UVYDPBsyGGes0y52j32PQ3BqQfXhyH3c= +google.golang.org/genproto v0.0.0-20200513103714-09dca8ec2884/go.mod h1:55QSHmfGQM9UVYDPBsyGGes0y52j32PQ3BqQfXhyH3c= google.golang.org/genproto v0.0.0-20200515170657-fc4c6c6a6587/go.mod h1:YsZOwe1myG/8QRHRsmBRE1LrgQY60beZKjly0O1fX9U= google.golang.org/genproto v0.0.0-20200526211855-cb27e3aa2013/go.mod h1:NbSheEEYHJ7i3ixzK3sjbqSGDJWnxyFXZblF3eUsNvo= +google.golang.org/genproto v0.0.0-20200526211855-cb27e3aa2013/go.mod h1:NbSheEEYHJ7i3ixzK3sjbqSGDJWnxyFXZblF3eUsNvo= google.golang.org/genproto v0.0.0-20200618031413-b414f8b61790/go.mod h1:jDfRM7FcilCzHH/e9qn6dsT145K34l5v+OpcnNgKAAA= google.golang.org/genproto v0.0.0-20200729003335-053ba62fc06f/go.mod h1:FWY/as6DDZQgahTzZj3fqbO1CbirC29ZNUFHwi0/+no= google.golang.org/genproto v0.0.0-20200804131852-c06518451d9c/go.mod h1:FWY/as6DDZQgahTzZj3fqbO1CbirC29ZNUFHwi0/+no= @@ -884,6 +905,7 @@ google.golang.org/grpc v1.24.0/go.mod h1:XDChyiUovWa60DnaeDeZmSW86xtLtjtZbwvSiRn google.golang.org/grpc v1.25.1/go.mod h1:c3i+UQWmh7LiEpx4sFZnkU36qjEYZ0imhYfXVyQciAY= google.golang.org/grpc v1.26.0/go.mod h1:qbnxyOmOxrQa7FizSgH+ReBfzJrCY1pSN7KXBS8abTk= google.golang.org/grpc v1.27.0/go.mod h1:qbnxyOmOxrQa7FizSgH+ReBfzJrCY1pSN7KXBS8abTk= +google.golang.org/grpc v1.27.0/go.mod h1:qbnxyOmOxrQa7FizSgH+ReBfzJrCY1pSN7KXBS8abTk= google.golang.org/grpc v1.27.1/go.mod h1:qbnxyOmOxrQa7FizSgH+ReBfzJrCY1pSN7KXBS8abTk= google.golang.org/grpc v1.28.0/go.mod h1:rpkK4SK4GF4Ach/+MFLZUBavHOvF2JJB5uozKKal+60= google.golang.org/grpc v1.29.1/go.mod h1:itym6AZVZYACWQqET3MqgPpjcuV5QH3BxFS3IjizoKk= @@ -892,8 +914,9 @@ google.golang.org/grpc v1.31.0/go.mod h1:N36X2cJ7JwdamYAgDz+s+rVMFjt3numwzf/HckM google.golang.org/grpc v1.33.1/go.mod h1:fr5YgcSWrqhRRxogOsw7RzIpsmvOZ6IcH4kBYTpR3n0= google.golang.org/grpc v1.36.0/go.mod h1:qjiiYl8FncCW8feJPdyg3v6XW24KsRHe+dy9BAGRRjU= google.golang.org/grpc v1.37.0/go.mod h1:NREThFqKR1f3iQ6oBuvc5LadQuXVGo9rkm5ZGrQdJfM= -google.golang.org/grpc v1.38.0 h1:/9BgsAsa5nWe26HqOlvlgJnqBuktYOLCgjCPqsa56W0= google.golang.org/grpc v1.38.0/go.mod h1:NREThFqKR1f3iQ6oBuvc5LadQuXVGo9rkm5ZGrQdJfM= +google.golang.org/grpc v1.39.0 h1:Klz8I9kdtkIN6EpHHUOMLCYhTn/2WAe5a0s1hcBkdTI= +google.golang.org/grpc v1.39.0/go.mod h1:PImNr+rS9TWYb2O4/emRugxiyHZ5JyHW5F+RPnDzfrE= google.golang.org/protobuf v0.0.0-20200109180630-ec00e32a8dfd/go.mod h1:DFci5gLYBciE7Vtevhsrf46CRTquxDuWsQurQQe4oz8= google.golang.org/protobuf v0.0.0-20200221191635-4d8936d0db64/go.mod h1:kwYJMbMJ01Woi6D6+Kah6886xMZcty6N08ah7+eCXa0= google.golang.org/protobuf v0.0.0-20200228230310-ab0ca4ff8a60/go.mod h1:cfTl7dwQJ+fmap5saPgwCLgHXTUD7jkjRqWcaiX5VyM= @@ -902,11 +925,14 @@ google.golang.org/protobuf v1.21.0/go.mod h1:47Nbq4nVaFHyn7ilMalzfO3qCViNmqZ2kzi google.golang.org/protobuf v1.22.0/go.mod h1:EGpADcykh3NcUnDUJcl1+ZksZNG86OlYog2l/sGQquU= google.golang.org/protobuf v1.23.0/go.mod h1:EGpADcykh3NcUnDUJcl1+ZksZNG86OlYog2l/sGQquU= google.golang.org/protobuf v1.23.1-0.20200526195155-81db48ad09cc/go.mod h1:EGpADcykh3NcUnDUJcl1+ZksZNG86OlYog2l/sGQquU= +google.golang.org/protobuf v1.23.1-0.20200526195155-81db48ad09cc/go.mod h1:EGpADcykh3NcUnDUJcl1+ZksZNG86OlYog2l/sGQquU= google.golang.org/protobuf v1.24.0/go.mod h1:r/3tXBNzIEhYS9I1OUVjXDlt8tc493IdKGjtUeSXeh4= google.golang.org/protobuf v1.25.0/go.mod h1:9JNX74DMeImyA3h4bdi1ymwjUzf21/xIlbajtzgsN7c= +google.golang.org/protobuf v1.25.0/go.mod h1:9JNX74DMeImyA3h4bdi1ymwjUzf21/xIlbajtzgsN7c= google.golang.org/protobuf v1.26.0-rc.1/go.mod h1:jlhhOSvTdKEhbULTjvd4ARK9grFBp09yW+WbY/TyQbw= -google.golang.org/protobuf v1.26.0 h1:bxAC2xTBsZGibn2RTntX0oH50xLsqy1OxA9tTL3p/lk= google.golang.org/protobuf v1.26.0/go.mod h1:9q0QmTI4eRPtz6boOQmLYwt+qCgq0jsYwAQnmE0givc= +google.golang.org/protobuf v1.27.1 h1:SnqbnDw1V7RiZcXPx5MEeqPv2s79L9i7BJUlG/+RurQ= +google.golang.org/protobuf v1.27.1/go.mod h1:9q0QmTI4eRPtz6boOQmLYwt+qCgq0jsYwAQnmE0givc= gopkg.in/alecthomas/kingpin.v2 v2.2.6/go.mod h1:FMv+mEhP44yOT+4EoQTLFTRgOQ1FBLkstjWtayDeSgw= gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0= gopkg.in/check.v1 v1.0.0-20180628173108-788fd7840127/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0= diff --git a/hashmail_server.go b/hashmail_server.go new file mode 100644 index 0000000..02fccd0 --- /dev/null +++ b/hashmail_server.go @@ -0,0 +1,626 @@ +package aperture + +import ( + "bufio" + "context" + "fmt" + "io" + "io/ioutil" + "sync" + "time" + + "github.com/btcsuite/btcd/btcec" + "github.com/lightninglabs/lndclient" + "github.com/lightninglabs/lightning-node-connect/hashmailrpc" + "github.com/lightningnetwork/lnd/tlv" + "golang.org/x/time/rate" + "google.golang.org/grpc/codes" + "google.golang.org/grpc/status" +) + +const ( + // DefaultMsgRate is the default message rate for a given mailbox that + // we'll allow. We'll allow one message every 500 milliseconds, or 2 + // messages per second. + DefaultMsgRate = time.Millisecond * 500 + + // DefaultMsgBurstAllowance is the default burst rate that we'll allow + // for messages. If a new message is about to exceed the burst rate, + // then we'll allow it up to this burst allowance. + DefaultMsgBurstAllowance = 10 +) + +// streamID is the identifier of a stream. +type streamID [64]byte + +// newStreamID creates a new stream given an ID as a byte slice. +func newStreamID(id []byte) streamID { + var s streamID + copy(s[:], id) + + return s +} + +// readStream is the read side of the read pipe, which is implemented a +// buffered wrapper around the core reader. +type readStream struct { + io.Reader + + // parentStream is a pointer to the parent stream. We keep this around + // so we can return the stream after we're done using it. + parentStream *stream + + // scratchBuf is a scratch buffer we'll use for decoding message from + // the stream. + scratchBuf [8]byte +} + +// ReadNextMsg attempts to read the next message in the stream. +// +// NOTE: This will *block* until a new message is available. +func (r *readStream) ReadNextMsg() ([]byte, error) { + // First, we'll decode the length of the next message from the stream + // so we know how many bytes we need to read. + msgLen, err := tlv.ReadVarInt(r, &r.scratchBuf) + if err != nil { + return nil, err + } + + // Now that we know the length of the message, we'll make a limit + // reader, then read all the encoded bytes until the EOF is emitted by + // the reader. + msgReader := io.LimitReader(r, int64(msgLen)) + return ioutil.ReadAll(msgReader) +} + +// ReturnStream gives up the read stream by passing it back up through the +// payment stream. +func (r *readStream) ReturnStream() { + r.parentStream.ReturnReadStream(r) +} + +// writeStream is the write side of the read pipe. The stream itself is a +// buffered I/O wrapper around the write end of the io.Writer pipe. +type writeStream struct { + io.Writer + + // parentStream is a pointer to the parent stream. We keep this around + // so we can return the stream after we're done using it. + parentStream *stream + + // scratchBuf is a scratch buffer we'll use for decoding message from + // the stream. + scratchBuf [8]byte +} + +// WriteMsg attempts to write a message to the stream so it can be read using +// the read end of the stream. +// +// NOTE: If the buffer is full, then this call will block until the reader +// consumes bytes from the other end. +func (w *writeStream) WriteMsg(ctx context.Context, msg []byte) error { + // Wait until until we have enough available event slots to write to + // the stream. This'll return an error if the referneded context has + // been cancelled. + if err := w.parentStream.limiter.Wait(ctx); err != nil { + return err + } + + // As we're writing to a stream, we need to delimit each message with a + // length prefix so the reader knows how many bytes to consume for each + // message. + // + // TODO(roasbeef): actually needs to be single write? + msgSize := uint64(len(msg)) + err := tlv.WriteVarInt( + w, msgSize, &w.scratchBuf, + ) + if err != nil { + return err + } + + // Next, we'll write the message directly to the stream. + _, err = w.Write(msg) + if err != nil { + return err + } + + return nil +} + +// ReturnStream returns the write stream back to the parent stream. +func (w *writeStream) ReturnStream() { + w.parentStream.ReturnWriteStream(w) +} + +// stream is a unique pipe implemented using a subscription server, and expose +// over gRPC. Only a single writer and reader can exist within the stream at +// any given time. +type stream struct { + sync.Mutex + + id streamID + + readStreamChan chan *readStream + writeStreamChan chan *writeStream + + // equivAuth is a method used to determine if an authentication + // mechanism to tear down a stream is equivalent to the one used to + // create it in the first place. WE use this to ensure that only the + // original creator of a stream can tear it down. + equivAuth func(auth *hashmailrpc.CipherBoxAuth) error + + tearDown func() error + + wg sync.WaitGroup + + limiter *rate.Limiter +} + +// newStream creates a new stream independent of any given stream ID. +func newStream(id streamID, + equivAuth func(auth *hashmailrpc.CipherBoxAuth) error) *stream { + + // Our stream is actually just a plain io.Pipe. This allows us to avoid + // having to do things like rate limiting, etc as we can limit the + // buffer size. In order to allow non-blocking writes (up to the buffer + // size), but blocking reads, we'll utilize a series of two pipes. + writeReadPipe, writeWritePipe := io.Pipe() + readReadPipe, readWritePipe := io.Pipe() + + s := &stream{ + readStreamChan: make(chan *readStream, 1), + writeStreamChan: make(chan *writeStream, 1), + id: id, + equivAuth: equivAuth, + limiter: rate.NewLimiter( + rate.Every(DefaultMsgRate), + DefaultMsgBurstAllowance, + ), + } + + // Our tear down function will close the write side of the pipe, which + // will cause the goroutine below to get an EOF error when reading, + // which will cause it to close the other ends of the pipe. + s.tearDown = func() error { + err := writeWritePipe.Close() + if err != nil { + return err + } + + s.wg.Wait() + return nil + } + + s.wg.Add(1) + go func() { + defer s.wg.Done() + + // Next, we'll launch a goroutine to copy over the bytes from + // the pipe the writer will write to into the pipe the reader + // will read from. + _, err := io.Copy( + readWritePipe, + // This is where the buffering will happen, as the + // writer writes to the write end of the pipe, this + // goroutine will copy the bytes into the buffer until + // its full, then attempt to write it to the write end + // of the read pipe. + bufio.NewReader(writeReadPipe), + ) + _ = readWritePipe.CloseWithError(err) + _ = writeReadPipe.CloseWithError(err) + }() + + // We'll now initialize our stream by sending the read and write ends + // to their respective holding channels. + s.readStreamChan <- &readStream{ + Reader: readReadPipe, + parentStream: s, + } + s.writeStreamChan <- &writeStream{ + Writer: writeWritePipe, + parentStream: s, + } + + return s +} + +// ReturnReadStream returns the target read stream back to its holding channel. +func (s *stream) ReturnReadStream(r *readStream) { + s.readStreamChan <- r +} + +// ReturnWriteStream returns the target write stream back to its holding +// channel. +func (s *stream) ReturnWriteStream(w *writeStream) { + s.writeStreamChan <- w +} + +// RequestReadStream attempts to request the read stream from the main backing +// stream. If we're unable to obtain it before the timeout, then an error is +// returned. +func (s *stream) RequestReadStream() (*readStream, error) { + log.Tracef("HashMailStream(%x): requesting read stream", s.id[:]) + + select { + case r := <-s.readStreamChan: + return r, nil + default: + return nil, fmt.Errorf("read stream occupied") + } +} + +// RequestWriteStream attempts to request the read stream from the main backing +// stream. If we're unable to obtain it before the timeout, then an error is +// returned. +func (s *stream) RequestWriteStream() (*writeStream, error) { + log.Tracef("HashMailStream(%x): requesting write stream", s.id[:]) + + select { + case w := <-s.writeStreamChan: + return w, nil + default: + return nil, fmt.Errorf("write stream occupied") + } +} + +// hashMailServerConfig is the main config of the mail server. +type hashMailServerConfig struct { + // IsAccountActive returns true of the passed public key belongs to an + // active non-expired account) within the system. + IsAccountActive func(context.Context, *btcec.PublicKey) bool + + // Signer is a reference to the current lnd signer client which will be + // used to verify ECDSA signatures. + Signer lndclient.SignerClient +} + +// hashMailServer is an implementation of the HashMailServer gRPC service that +// implements a simple encrypted mailbox implemented as a series of read and +// write pipes. +type hashMailServer struct { + hashmailrpc.UnimplementedHashMailServer + + sync.RWMutex + streams map[streamID]*stream + + // TODO(roasbeef): index to keep track of total stream tallies + + quit chan struct{} + + cfg hashMailServerConfig +} + +// newHashMailServer returns a new mail server instance given a valid config. +func newHashMailServer(cfg hashMailServerConfig) *hashMailServer { + return &hashMailServer{ + streams: make(map[streamID]*stream), + quit: make(chan struct{}), + cfg: cfg, + } +} + +// Stop attempts to gracefully stop the server by cancelling all pending user +// streams and any goroutines active feeding off them. +func (h *hashMailServer) Stop() { + h.Lock() + defer h.Unlock() + + for _, stream := range h.streams { + if err := stream.tearDown(); err != nil { + log.Warnf("unable to tear down stream: %v", err) + } + } + +} + +// ValidateStreamAuth attempts to validate the authentication mechanism that is +// being used to claim or revoke a stream within the mail server. +func (h *hashMailServer) ValidateStreamAuth(ctx context.Context, + init *hashmailrpc.CipherBoxAuth) error { + + // TODO(guggero): Implement auth. + if true { + return nil + } + + // TODO(roasbeef): throttle the number of streams a given + // ticket/account can have + + return nil +} + +// InitStream attempts to initialize a new stream given a valid descriptor. +func (h *hashMailServer) InitStream( + init *hashmailrpc.CipherBoxAuth) (*hashmailrpc.CipherInitResp, error) { + + h.Lock() + defer h.Unlock() + + streamID := newStreamID(init.Desc.StreamId) + + log.Debugf("Creating new HashMail Stream: %x", streamID) + + // The stream is already active, and we only allow a single session for + // a given stream to exist. + if _, ok := h.streams[streamID]; ok { + return nil, status.Error(codes.AlreadyExists, "stream "+ + "already active") + } + + // TODO(roasbeef): validate that ticket or node doesn't already have + // the same stream going + + freshStream := newStream(streamID, func(auth *hashmailrpc.CipherBoxAuth) error { + return nil + }) + + h.streams[streamID] = freshStream + + return &hashmailrpc.CipherInitResp{ + Resp: &hashmailrpc.CipherInitResp_Success{}, + }, nil +} + +// LookUpReadStream attempts to loop up a new stream. If the stream is found, then +// the stream is marked as being active. Otherwise, an error is returned. +func (h *hashMailServer) LookUpReadStream(streamID []byte) (*readStream, error) { + + h.RLock() + defer h.RUnlock() + + stream, ok := h.streams[newStreamID(streamID)] + if !ok { + return nil, fmt.Errorf("stream not found") + } + + return stream.RequestReadStream() +} + +// LookUpWriteStream attempts to loop up a new stream. If the stream is found, +// then the stream is marked as being active. Otherwise, an error is returned. +func (h *hashMailServer) LookUpWriteStream(streamID []byte) (*writeStream, error) { + + h.RLock() + defer h.RUnlock() + + stream, ok := h.streams[newStreamID(streamID)] + if !ok { + return nil, fmt.Errorf("stream not found") + } + + return stream.RequestWriteStream() +} + +// TearDownStream attempts to tear down a stream which renders both sides of +// the stream unusable and also reclaims resources. +func (h *hashMailServer) TearDownStream(ctx context.Context, streamID []byte, + auth *hashmailrpc.CipherBoxAuth) error { + + h.Lock() + defer h.Unlock() + + sid := newStreamID(streamID) + stream, ok := h.streams[sid] + if !ok { + return fmt.Errorf("stream not found") + } + + // We'll ensure that the same authentication type is used, to ensure + // only the creator can tear down a stream they created. + if err := stream.equivAuth(auth); err != nil { + return fmt.Errorf("invalid auth: %v", err) + } + + // Now that we know the auth type has matched up, we'll validate the + // authentication mechanism as normal. + if err := h.ValidateStreamAuth(ctx, auth); err != nil { + return err + } + + log.Debugf("Tearing down HashMail stream: id=%x, auth=%v", + auth.Desc.StreamId, auth.Auth) + + // At this point we know the auth was valid, so we'll tear down the + // stream. + if err := stream.tearDown(); err != nil { + return err + } + + delete(h.streams, sid) + + return nil +} + +// validateAuthReq does some basic sanity checks on incoming auth methods. +func validateAuthReq(req *hashmailrpc.CipherBoxAuth) error { + switch { + case req.Desc == nil: + return fmt.Errorf("cipher box descriptor required") + + case req.Desc.StreamId == nil: + return fmt.Errorf("stream_id required") + + case req.Auth == nil: + return fmt.Errorf("auth type required") + + default: + return nil + } +} + +// NewCipherBox attempts to create a new cipher box stream given a valid +// authentication mechanism. This call may fail if the stream is already +// active, or the authentication mechanism invalid. +func (h *hashMailServer) NewCipherBox(ctx context.Context, + init *hashmailrpc.CipherBoxAuth) (*hashmailrpc.CipherInitResp, error) { + + // Before we try to process the request, we'll do some basic user input + // validation. + if err := validateAuthReq(init); err != nil { + return nil, err + } + + log.Debugf("New HashMail stream init: id=%x, auth=%v", + init.Desc.StreamId, init.Auth) + + if err := h.ValidateStreamAuth(ctx, init); err != nil { + log.Debugf("Stream creation validation failed (id=%x): %v", + init.Desc.StreamId, err) + return nil, err + } + + resp, err := h.InitStream(init) + if err != nil { + return nil, err + } + + return resp, nil +} + +// DelCipherBox attempts to tear down an existing cipher box pipe. The same +// authentication mechanism used to initially create the stream MUST be +// specified. +func (h *hashMailServer) DelCipherBox(ctx context.Context, + auth *hashmailrpc.CipherBoxAuth) (*hashmailrpc.DelCipherBoxResp, error) { + + // Before we try to process the request, we'll do some basic user input + // validation. + if err := validateAuthReq(auth); err != nil { + return nil, err + } + + log.Debugf("New HashMail stream deletion: id=%x, auth=%v", + auth.Desc.StreamId, auth.Auth) + + if err := h.TearDownStream(ctx, auth.Desc.StreamId, auth); err != nil { + return nil, err + } + + return &hashmailrpc.DelCipherBoxResp{}, nil +} + +// SendStream implements the client streaming call to utilize the write end of +// a stream to send a message to the read end. +func (h *hashMailServer) SendStream(readStream hashmailrpc.HashMail_SendStreamServer) error { + log.Debugf("New HashMail write stream pending...") + + // We'll need to receive the first message in order to determine if + // this stream exists or not + // + // TODO(roasbeef): better way to control? + cipherBox, err := readStream.Recv() + if err != nil { + return err + } + + switch { + case cipherBox.Desc == nil: + return fmt.Errorf("cipher box descriptor required") + + case cipherBox.Desc.StreamId == nil: + return fmt.Errorf("stream_id required") + } + + log.Debugf("New HashMail write stream: id=%x", + cipherBox.Desc.StreamId) + + // Now that we have the first message, we can attempt to look up the + // given stream. + writeStream, err := h.LookUpWriteStream(cipherBox.Desc.StreamId) + if err != nil { + return err + } + + // Now that we know the stream is found, we'll make sure to mark the + // write inactive if the client hangs up on their end. + defer writeStream.ReturnStream() + + log.Tracef("Sending msg_len=%v to stream_id=%x", len(cipherBox.Msg), + cipherBox.Desc.StreamId) + + // We'll send the first message into the stream, then enter our loop + // below to continue to read from the stream and send it to the read + // end. + ctx := readStream.Context() + if err := writeStream.WriteMsg(ctx, cipherBox.Msg); err != nil { + return err + } + + for { + // Check to see if the stream has been closed or if we need to + // exit before shutting down. + select { + case <-ctx.Done(): + return nil + case <-h.quit: + return fmt.Errorf("server shutting down") + + default: + } + + cipherBox, err := readStream.Recv() + if err != nil { + return err + } + + log.Tracef("Sending msg_len=%v to stream_id=%x", + len(cipherBox.Msg), cipherBox.Desc.StreamId) + + if err := writeStream.WriteMsg(ctx, cipherBox.Msg); err != nil { + return err + } + } +} + +// RecvStream implements the read end of the stream. A single client will have +// all messages written to the opposite side of the stream written to it for +// consumption. +func (h *hashMailServer) RecvStream(desc *hashmailrpc.CipherBoxDesc, + reader hashmailrpc.HashMail_RecvStreamServer) error { + + // First, we'll attempt to locate the stream. We allow any single + // entity that knows of the full stream ID to access the read end. + readStream, err := h.LookUpReadStream(desc.StreamId) + if err != nil { + return err + } + + log.Debugf("New HashMail read stream: id=%x", desc.StreamId) + + // If the reader hangs up, then we'll mark the stream as inactive so + // another can take its place. + defer readStream.ReturnStream() + + for { + // Check to see if the stream has been closed or if we need to + // exit before shutting down. + select { + case <-reader.Context().Done(): + return nil + case <-h.quit: + return fmt.Errorf("server shutting down") + + default: + } + + nextMsg, err := readStream.ReadNextMsg() + if err != nil { + return err + } + + log.Tracef("Read %v bytes for HashMail stream_id=%x", + len(nextMsg), desc.StreamId) + + err = reader.Send(&hashmailrpc.CipherBox{ + Desc: desc, + Msg: nextMsg, + }) + if err != nil { + return err + } + } +} + +var _ hashmailrpc.HashMailServer = (*hashMailServer)(nil) From c45cd3a317db03000d1fc85c51cb19266d99b67c Mon Sep 17 00:00:00 2001 From: Oliver Gugger Date: Mon, 22 Nov 2021 16:30:49 +0100 Subject: [PATCH 2/5] proxy+aperture: refactor to local service We want aperture to handle some of the incoming requests on its own, without forwarding/proxying them to a remote backend. Those "local" services can register themselves and will be given every request for inspection. If a service decides to handle it locally, the request is passed to that service and not forwarded. --- aperture.go | 25 ++++++++++++++-- proxy/proxy.go | 79 ++++++++++++++++++++++++++++++++++++-------------- 2 files changed, 80 insertions(+), 24 deletions(-) diff --git a/aperture.go b/aperture.go index 58a6f66..6725ea3 100644 --- a/aperture.go +++ b/aperture.go @@ -9,6 +9,7 @@ import ( "net/http" "os" "path/filepath" + "strings" "sync" "time" @@ -592,9 +593,27 @@ func createProxy(cfg *Config, challenger *LndChallenger, ServiceLimiter: newStaticServiceLimiter(cfg.Services), }) authenticator := auth.NewLsatAuthenticator(minter, challenger) - return proxy.New( - authenticator, cfg.Services, cfg.ServeStatic, cfg.StaticRoot, - ) + + // By default the static file server only returns 404 answers for + // security reasons. Serving files from the staticRoot directory has to + // be enabled intentionally. + staticServer := http.NotFoundHandler() + if cfg.ServeStatic { + if len(strings.TrimSpace(cfg.StaticRoot)) == 0 { + return nil, fmt.Errorf("staticroot cannot be empty, " + + "must contain path to directory that " + + "contains index.html") + } + staticServer = http.FileServer(http.Dir(cfg.StaticRoot)) + } + + localServices := []proxy.LocalService{ + proxy.NewLocalService(staticServer, func(r *http.Request) bool { + return true + }), + } + + return proxy.New(authenticator, cfg.Services, localServices...) } // cleanup closes the given server and shuts down the log rotator. diff --git a/proxy/proxy.go b/proxy/proxy.go index 912c0ad..41926ef 100644 --- a/proxy/proxy.go +++ b/proxy/proxy.go @@ -29,13 +29,48 @@ const ( hdrTypeGrpc = "application/grpc" ) +// LocalService is an interface that describes a service that is handled +// internally by aperture and is not proxied to another backend. +type LocalService interface { + http.Handler + + // IsHandling returns true if the local service is handling the given + // request. If one of the local services returns true on this method + // then a request is not forwarded/proxied to any of the remote + // backends. + IsHandling(r *http.Request) bool +} + +// localService is a struct that represents a service that is local to aperture +// and is not proxied to a remote backend. +type localService struct { + handler http.Handler + isHandling func(r *http.Request) bool +} + +// NewLocalService creates a new local service. +func NewLocalService(h http.Handler, f func(r *http.Request) bool) LocalService { + return &localService{handler: h, isHandling: f} +} + +// ServeHTTP is the http.Handler implementation. +func (l *localService) ServeHTTP(rw http.ResponseWriter, r *http.Request) { + l.handler.ServeHTTP(rw, r) +} + +// IsHandling returns true if the local service is handling the given +// request. +func (l *localService) IsHandling(r *http.Request) bool { + return l.isHandling(r) +} + // Proxy is a HTTP, HTTP/2 and gRPC handler that takes an incoming request, // uses its authenticator to validate the request's headers, and either returns // a challenge to the client or forwards the request to another server and // proxies the response back to the client. type Proxy struct { proxyBackend *httputil.ReverseProxy - staticServer http.Handler + localServices []LocalService authenticator auth.Authenticator services []*Service } @@ -43,24 +78,11 @@ type Proxy struct { // New returns a new Proxy instance that proxies between the services specified, // using the auth to validate each request's headers and get new challenge // headers if necessary. -func New(auth auth.Authenticator, services []*Service, serveStatic bool, - staticRoot string) (*Proxy, error) { - - // By default the static file server only returns 404 answers for - // security reasons. Serving files from the staticRoot directory has to - // be enabled intentionally. - staticServer := http.NotFoundHandler() - if serveStatic { - if len(strings.TrimSpace(staticRoot)) == 0 { - return nil, fmt.Errorf("staticroot cannot be empty, " + - "must contain path to directory that " + - "contains index.html") - } - staticServer = http.FileServer(http.Dir(staticRoot)) - } +func New(auth auth.Authenticator, services []*Service, + localServices ...LocalService) (*Proxy, error) { proxy := &Proxy{ - staticServer: staticServer, + localServices: localServices, authenticator: auth, services: services, } @@ -98,9 +120,24 @@ func (p *Proxy) ServeHTTP(w http.ResponseWriter, r *http.Request) { // will return a 404 for us. target, ok := matchService(r, p.services) if !ok { - prefixLog.Debugf("Dispatching request %s to static file "+ - "server.", r.URL.Path) - p.staticServer.ServeHTTP(w, r) + // This isn't a request for any configured remote backend that + // we are proxying for. So we give it to the local service that + // claims is responsible for it. + for _, ls := range p.localServices { + if ls.IsHandling(r) { + prefixLog.Debugf("Dispatching request %s to "+ + "local service.", r.URL.Path) + ls.ServeHTTP(w, r) + return + } + } + + // If we get here, something is quite wrong. At least the static + // file server should have picked up the request and serve a + // 404 response. So nothing we can do here except returning an + // error. + addCorsHeaders(w.Header()) + sendDirectResponse(w, r, http.StatusInternalServerError, "") return } @@ -337,7 +374,7 @@ func matchService(req *http.Request, services []*Service) (*Service, bool) { service.Address) return service, true } - log.Errorf("No backend service matched request [%s%s].", req.Host, + log.Debugf("No backend service matched request [%s%s].", req.Host, req.URL.Path) return nil, false } From 7bcc8355d0c1faebebcd1a7fdf3b58cc9c615e46 Mon Sep 17 00:00:00 2001 From: Oliver Gugger Date: Mon, 22 Nov 2021 16:30:50 +0100 Subject: [PATCH 3/5] multi: configure and start hashmail server With this commit we make it possible to enable the Lightning Node Connect mailbox server to be enabled and started as a local service within aperture. --- aperture.go | 192 +++++++++++++++++++++++++++++++++++++++++--- config.go | 11 +++ go.mod | 2 + go.sum | 4 + hashmail_server.go | 42 +++++----- proxy/proxy_test.go | 4 +- sample-conf.yaml | 7 ++ 7 files changed, 233 insertions(+), 29 deletions(-) diff --git a/aperture.go b/aperture.go index 6725ea3..fa86f0f 100644 --- a/aperture.go +++ b/aperture.go @@ -1,6 +1,7 @@ package aperture import ( + "context" "crypto/tls" "errors" "fmt" @@ -9,14 +10,17 @@ import ( "net/http" "os" "path/filepath" + "regexp" "strings" "sync" "time" + gateway "github.com/grpc-ecosystem/grpc-gateway/v2/runtime" flags "github.com/jessevdk/go-flags" "github.com/lightninglabs/aperture/auth" "github.com/lightninglabs/aperture/mint" "github.com/lightninglabs/aperture/proxy" + "github.com/lightninglabs/lightning-node-connect/hashmailrpc" "github.com/lightningnetwork/lnd" "github.com/lightningnetwork/lnd/build" "github.com/lightningnetwork/lnd/cert" @@ -27,6 +31,9 @@ import ( "golang.org/x/crypto/acme/autocert" "golang.org/x/net/http2" "golang.org/x/net/http2/h2c" + "google.golang.org/grpc" + "google.golang.org/grpc/credentials" + "google.golang.org/protobuf/encoding/protojson" "gopkg.in/yaml.v2" ) @@ -55,6 +62,14 @@ const ( // the certificate validity length to make the chances bigger for it to // be refreshed on a routine server restart. selfSignedCertExpiryMargin = selfSignedCertValidity / 2 + + // hashMailGRPCPrefix is the prefix a gRPC request URI has when it is + // meant for the hashmailrpc server to be handled. + hashMailGRPCPrefix = "/hashmailrpc.HashMail/" + + // hashMailRESTPrefix is the prefix a REST request URI has when it is + // meant for the hashmailrpc server to be handled. + hashMailRESTPrefix = "/v1/lightning-node-connect/hashmail" ) var ( @@ -69,6 +84,12 @@ var ( tls.TLS_ECDHE_ECDSA_WITH_CHACHA20_POLY1305, tls.TLS_ECDHE_ECDSA_WITH_AES_128_CBC_SHA256, } + + // clientStreamingURIs is the list of REST URIs that are + // client-streaming and shouldn't be closed after a single message. + clientStreamingURIs = []*regexp.Regexp{ + regexp.MustCompile("^/v1/lightning-node-connect/hashmail/send$"), + } ) // Main is the true entrypoint of Aperture. @@ -139,6 +160,7 @@ type Aperture struct { httpsServer *http.Server torHTTPServer *http.Server proxy *proxy.Proxy + proxyCleanup func() wg sync.WaitGroup quit chan struct{} @@ -190,7 +212,9 @@ func (a *Aperture) Start(errChan chan error) error { } // Create the proxy and connect it to lnd. - a.proxy, err = createProxy(a.cfg, a.challenger, a.etcdClient) + a.proxy, a.proxyCleanup, err = createProxy( + a.cfg, a.challenger, a.etcdClient, + ) if err != nil { return err } @@ -288,6 +312,12 @@ func (a *Aperture) Stop() error { a.challenger.Stop() } + // Stop everything that was started alongside the proxy, for example the + // gRPC and REST servers. + if a.proxyCleanup != nil { + a.proxyCleanup() + } + // Shut down our client and server connections now. This should cause // the first goroutine to quit. cleanup(a.etcdClient, a.httpsServer, a.proxy) @@ -585,7 +615,7 @@ func initTorListener(cfg *Config, etcd *clientv3.Client) (*tor.Controller, error // createProxy creates the proxy with all the services it needs. func createProxy(cfg *Config, challenger *LndChallenger, - etcdClient *clientv3.Client) (*proxy.Proxy, error) { + etcdClient *clientv3.Client) (*proxy.Proxy, func(), error) { minter := mint.New(&mint.Config{ Challenger: challenger, @@ -600,20 +630,112 @@ func createProxy(cfg *Config, challenger *LndChallenger, staticServer := http.NotFoundHandler() if cfg.ServeStatic { if len(strings.TrimSpace(cfg.StaticRoot)) == 0 { - return nil, fmt.Errorf("staticroot cannot be empty, " + - "must contain path to directory that " + + return nil, nil, fmt.Errorf("staticroot cannot be " + + "empty, must contain path to directory that " + "contains index.html") } staticServer = http.FileServer(http.Dir(cfg.StaticRoot)) } - localServices := []proxy.LocalService{ - proxy.NewLocalService(staticServer, func(r *http.Request) bool { - return true - }), + var ( + localServices []proxy.LocalService + proxyCleanup = func() {} + ) + + if cfg.HashMail.Enabled { + hashMailServices, cleanup, err := createHashMailServer(cfg) + if err != nil { + return nil, nil, err + } + + localServices = append(localServices, hashMailServices...) + proxyCleanup = cleanup } - return proxy.New(authenticator, cfg.Services, localServices...) + // The static file server must be last since it will match all calls + // that make it to it. + localServices = append(localServices, proxy.NewLocalService( + staticServer, func(r *http.Request) bool { + return true + }, + )) + + prxy, err := proxy.New(authenticator, cfg.Services, localServices...) + return prxy, proxyCleanup, err +} + +// createHashMailServer creates the gRPC server for the hash mail message +// gateway and an additional REST and WebSocket capable proxy for that gRPC +// server. +func createHashMailServer(cfg *Config) ([]proxy.LocalService, func(), error) { + var localServices []proxy.LocalService + + // Create a gRPC server for the hashmail server. + hashMailServer := newHashMailServer(hashMailServerConfig{ + msgRate: cfg.HashMail.MessageRate, + msgBurstAllowance: cfg.HashMail.MessageBurstAllowance, + }) + hashMailGRPC := grpc.NewServer() + hashmailrpc.RegisterHashMailServer(hashMailGRPC, hashMailServer) + localServices = append(localServices, proxy.NewLocalService( + hashMailGRPC, func(r *http.Request) bool { + return strings.HasPrefix(r.URL.Path, hashMailGRPCPrefix) + }), + ) + + // And a REST proxy for it as well. + // The default JSON marshaler of the REST proxy only sets OrigName to + // true, which instructs it to use the same field names as specified in + // the proto file and not switch to camel case. What we also want is + // that the marshaler prints all values, even if they are falsey. + customMarshalerOption := gateway.WithMarshalerOption( + gateway.MIMEWildcard, &gateway.JSONPb{ + MarshalOptions: protojson.MarshalOptions{ + UseProtoNames: true, + EmitUnpopulated: true, + }, + }, + ) + + // We'll also create and start an accompanying proxy to serve clients + // through REST. + ctxc, cancel := context.WithCancel(context.Background()) + proxyCleanup := func() { + hashMailServer.Stop() + cancel() + } + + mux := gateway.NewServeMux(customMarshalerOption) + err := hashmailrpc.RegisterHashMailHandlerFromEndpoint( + ctxc, mux, cfg.ListenAddr, []grpc.DialOption{ + grpc.WithTransportCredentials(credentials.NewTLS( + &tls.Config{InsecureSkipVerify: true}, + )), + }, + ) + if err != nil { + proxyCleanup() + + return nil, nil, err + } + + // Wrap the default grpc-gateway handler with the WebSocket handler. + restHandler := lnrpc.NewWebSocketProxy( + mux, log, time.Second*30, time.Second*5, + clientStreamingURIs, + ) + + // Create our proxy chain now. A request will pass + // through the following chain: + // req ---> CORS handler --> WS proxy ---> REST proxy --> gRPC endpoint + corsHandler := allowCORS(restHandler, []string{"*"}) + localServices = append(localServices, proxy.NewLocalService( + corsHandler, func(r *http.Request) bool { + return strings.HasPrefix(r.URL.Path, hashMailRESTPrefix) + }, + )) + + return localServices, proxyCleanup, nil } // cleanup closes the given server and shuts down the log rotator. @@ -634,3 +756,55 @@ func cleanup(etcdClient io.Closer, server io.Closer, proxy io.Closer) { log.Errorf("Could not close log rotator: %v", err) } } + +// allowCORS wraps the given http.Handler with a function that adds the +// Access-Control-Allow-Origin header to the response. +func allowCORS(handler http.Handler, origins []string) http.Handler { + allowHeaders := "Access-Control-Allow-Headers" + allowMethods := "Access-Control-Allow-Methods" + allowOrigin := "Access-Control-Allow-Origin" + + // If the user didn't supply any origins that means CORS is disabled + // and we should return the original handler. + if len(origins) == 0 { + return handler + } + + return http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { + origin := r.Header.Get("Origin") + + // Skip everything if the browser doesn't send the Origin field. + if origin == "" { + handler.ServeHTTP(w, r) + return + } + + // Set the static header fields first. + w.Header().Set( + allowHeaders, + "Content-Type, Accept, Grpc-Metadata-Macaroon", + ) + w.Header().Set(allowMethods, "GET, POST, DELETE") + + // Either we allow all origins or the incoming request matches + // a specific origin in our list of allowed origins. + for _, allowedOrigin := range origins { + if allowedOrigin == "*" || origin == allowedOrigin { + // Only set allowed origin to requested origin. + w.Header().Set(allowOrigin, origin) + + break + } + } + + // For a pre-flight request we only need to send the headers + // back. No need to call the rest of the chain. + if r.Method == "OPTIONS" { + return + } + + // Everything's prepared now, we can pass the request along the + // chain of handlers. + handler.ServeHTTP(w, r) + }) +} diff --git a/config.go b/config.go index b7ad35b..5dd5de1 100644 --- a/config.go +++ b/config.go @@ -3,6 +3,7 @@ package aperture import ( "errors" "fmt" + "time" "github.com/btcsuite/btcutil" "github.com/lightninglabs/aperture/proxy" @@ -59,6 +60,12 @@ func (a *AuthConfig) validate() error { return nil } +type HashMailConfig struct { + Enabled bool `long:"enabled"` + MessageRate time.Duration `long:"messagerate" description:"The average minimum time that should pass between each message."` + MessageBurstAllowance int `long:"messageburstallowance" description:"The burst rate we allow for messages."` +} + type TorConfig struct { Control string `long:"control" description:"The host:port of the Tor instance."` ListenPort uint16 `long:"listenport" description:"The port we should listen on for client requests over Tor. Note that this port should not be exposed to the outside world, it is only intended to be reached by clients through the onion service."` @@ -101,6 +108,10 @@ type Config struct { // each backend service to Aperture. Services []*proxy.Service `long:"service" description:"Configurations for each Aperture backend service."` + // HashMail is the configuration section for configuring the Lightning + // Node Connect mailbox server. + HashMail *HashMailConfig `long:"hashmail" description:"Configuration for the Lightning Node Connect mailbox server."` + // DebugLevel is a string defining the log level for the service either // for all subsystems the same or individual level by subsystem. DebugLevel string `long:"debuglevel" description:"Debug level for the Aperture application and its subsystems."` diff --git a/go.mod b/go.mod index 3ca9d72..2c3e38e 100644 --- a/go.mod +++ b/go.mod @@ -9,6 +9,7 @@ require ( github.com/btcsuite/btcwallet/wtxmgr v1.3.1-0.20210706234807-aaf03fee735a github.com/fortytw2/leaktest v1.3.0 github.com/golang/protobuf v1.5.2 + github.com/grpc-ecosystem/grpc-gateway/v2 v2.5.0 github.com/jessevdk/go-flags v1.4.0 github.com/lightninglabs/lightning-node-connect/hashmailrpc v1.0.2 github.com/lightninglabs/lndclient v0.12.0-9 @@ -21,6 +22,7 @@ require ( golang.org/x/net v0.0.0-20210405180319-a5a99cb37ef4 golang.org/x/time v0.0.0-20210220033141-f8bda1e9f3ba google.golang.org/grpc v1.39.0 + google.golang.org/protobuf v1.27.1 gopkg.in/macaroon.v2 v2.1.0 gopkg.in/yaml.v2 v2.4.0 ) diff --git a/go.sum b/go.sum index 4540307..c79c9cd 100644 --- a/go.sum +++ b/go.sum @@ -752,6 +752,7 @@ golang.org/x/sys v0.0.0-20200803210538-64077c9b5642/go.mod h1:h1NjWce9XRLGQEsW7w golang.org/x/sys v0.0.0-20200923182605-d9f96fdee20d/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= golang.org/x/sys v0.0.0-20200930185726-fdedc70b468f/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= golang.org/x/sys v0.0.0-20201119102817-f84b799fce68/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= +golang.org/x/sys v0.0.0-20201119102817-f84b799fce68/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= golang.org/x/sys v0.0.0-20210124154548-22da62e12c0c/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= golang.org/x/sys v0.0.0-20210330210617-4fbd30eecc44/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= golang.org/x/sys v0.0.0-20210403161142-5e06dd20ab57/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= @@ -760,6 +761,7 @@ golang.org/x/sys v0.0.0-20210510120138-977fb7262007/go.mod h1:oPkhp1MJrh7nUepCBc golang.org/x/sys v0.0.0-20210603081109-ebe580a85c40 h1:JWgyZ1qgdTaF3N3oxC+MdTV7qvEEgHo3otj+HB5CM7Q= golang.org/x/sys v0.0.0-20210603081109-ebe580a85c40/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= golang.org/x/term v0.0.0-20201126162022-7de9c90e9dd1/go.mod h1:bj7SfCRtBDWHUb9snDiAeCFNEtKQo2Wmx5Cou7ajbmo= +golang.org/x/term v0.0.0-20201126162022-7de9c90e9dd1/go.mod h1:bj7SfCRtBDWHUb9snDiAeCFNEtKQo2Wmx5Cou7ajbmo= golang.org/x/text v0.0.0-20170915032832-14c0d48ead0c/go.mod h1:NqM8EUOU14njkJ3fqMW+pc6Ldnwhi/IjpwHt7yyuwOQ= golang.org/x/text v0.3.0/go.mod h1:NqM8EUOU14njkJ3fqMW+pc6Ldnwhi/IjpwHt7yyuwOQ= golang.org/x/text v0.3.1-0.20180807135948-17ff2d5776d2/go.mod h1:NqM8EUOU14njkJ3fqMW+pc6Ldnwhi/IjpwHt7yyuwOQ= @@ -965,6 +967,8 @@ gopkg.in/yaml.v2 v2.2.5/go.mod h1:hI93XBmqTisBFMUTm0b8Fm+jr3Dg1NNxqwp+5A1VGuI= gopkg.in/yaml.v2 v2.2.8/go.mod h1:hI93XBmqTisBFMUTm0b8Fm+jr3Dg1NNxqwp+5A1VGuI= gopkg.in/yaml.v2 v2.3.0/go.mod h1:hI93XBmqTisBFMUTm0b8Fm+jr3Dg1NNxqwp+5A1VGuI= gopkg.in/yaml.v2 v2.4.0 h1:D8xgwECY7CYvx+Y2n4sBz93Jn9JRvxdiyyo8CTfuKaY= +gopkg.in/yaml.v2 v2.4.0 h1:D8xgwECY7CYvx+Y2n4sBz93Jn9JRvxdiyyo8CTfuKaY= +gopkg.in/yaml.v2 v2.4.0/go.mod h1:RDklbk79AGWmwhnvt/jBztapEOGDOx6ZbXqjP6csGnQ= gopkg.in/yaml.v2 v2.4.0/go.mod h1:RDklbk79AGWmwhnvt/jBztapEOGDOx6ZbXqjP6csGnQ= gopkg.in/yaml.v3 v3.0.0-20200313102051-9f266ea9e77c/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM= gopkg.in/yaml.v3 v3.0.0-20210107192922-496545a6307b h1:h8qDotaEPuJATrMmW04NCwg7v22aHH28wwpauUhK9Oo= diff --git a/hashmail_server.go b/hashmail_server.go index 02fccd0..18ce3a4 100644 --- a/hashmail_server.go +++ b/hashmail_server.go @@ -9,8 +9,6 @@ import ( "sync" "time" - "github.com/btcsuite/btcd/btcec" - "github.com/lightninglabs/lndclient" "github.com/lightninglabs/lightning-node-connect/hashmailrpc" "github.com/lightningnetwork/lnd/tlv" "golang.org/x/time/rate" @@ -76,6 +74,7 @@ func (r *readStream) ReadNextMsg() ([]byte, error) { // ReturnStream gives up the read stream by passing it back up through the // payment stream. func (r *readStream) ReturnStream() { + log.Debugf("Returning read stream %x", r.parentStream.id[:]) r.parentStream.ReturnReadStream(r) } @@ -158,7 +157,7 @@ type stream struct { } // newStream creates a new stream independent of any given stream ID. -func newStream(id streamID, +func newStream(id streamID, limiter *rate.Limiter, equivAuth func(auth *hashmailrpc.CipherBoxAuth) error) *stream { // Our stream is actually just a plain io.Pipe. This allows us to avoid @@ -173,10 +172,7 @@ func newStream(id streamID, writeStreamChan: make(chan *writeStream, 1), id: id, equivAuth: equivAuth, - limiter: rate.NewLimiter( - rate.Every(DefaultMsgRate), - DefaultMsgBurstAllowance, - ), + limiter: limiter, } // Our tear down function will close the write side of the pipe, which @@ -187,7 +183,6 @@ func newStream(id streamID, if err != nil { return err } - s.wg.Wait() return nil } @@ -267,13 +262,8 @@ func (s *stream) RequestWriteStream() (*writeStream, error) { // hashMailServerConfig is the main config of the mail server. type hashMailServerConfig struct { - // IsAccountActive returns true of the passed public key belongs to an - // active non-expired account) within the system. - IsAccountActive func(context.Context, *btcec.PublicKey) bool - - // Signer is a reference to the current lnd signer client which will be - // used to verify ECDSA signatures. - Signer lndclient.SignerClient + msgRate time.Duration + msgBurstAllowance int } // hashMailServer is an implementation of the HashMailServer gRPC service that @@ -294,6 +284,13 @@ type hashMailServer struct { // newHashMailServer returns a new mail server instance given a valid config. func newHashMailServer(cfg hashMailServerConfig) *hashMailServer { + if cfg.msgRate == 0 { + cfg.msgRate = DefaultMsgRate + } + if cfg.msgBurstAllowance == 0 { + cfg.msgBurstAllowance = DefaultMsgBurstAllowance + } + return &hashMailServer{ streams: make(map[streamID]*stream), quit: make(chan struct{}), @@ -352,9 +349,14 @@ func (h *hashMailServer) InitStream( // TODO(roasbeef): validate that ticket or node doesn't already have // the same stream going - freshStream := newStream(streamID, func(auth *hashmailrpc.CipherBoxAuth) error { - return nil - }) + limiter := rate.NewLimiter( + rate.Every(h.cfg.msgRate), h.cfg.msgBurstAllowance, + ) + freshStream := newStream( + streamID, limiter, func(auth *hashmailrpc.CipherBoxAuth) error { + return nil + }, + ) h.streams[streamID] = freshStream @@ -598,6 +600,7 @@ func (h *hashMailServer) RecvStream(desc *hashmailrpc.CipherBoxDesc, // exit before shutting down. select { case <-reader.Context().Done(): + log.Debugf("Read stream context done.") return nil case <-h.quit: return fmt.Errorf("server shutting down") @@ -607,6 +610,7 @@ func (h *hashMailServer) RecvStream(desc *hashmailrpc.CipherBoxDesc, nextMsg, err := readStream.ReadNextMsg() if err != nil { + log.Debugf("Got error an read stream read: %v", err) return err } @@ -618,6 +622,8 @@ func (h *hashMailServer) RecvStream(desc *hashmailrpc.CipherBoxDesc, Msg: nextMsg, }) if err != nil { + log.Debugf("Got error when sending on read stream: %v", + err) return err } } diff --git a/proxy/proxy_test.go b/proxy/proxy_test.go index e731451..14239c1 100644 --- a/proxy/proxy_test.go +++ b/proxy/proxy_test.go @@ -115,7 +115,7 @@ func runHTTPTest(t *testing.T, tc *testCase) { }} mockAuth := auth.NewMockAuthenticator() - p, err := proxy.New(mockAuth, services, true, "static") + p, err := proxy.New(mockAuth, services) require.NoError(t, err) // Start server that gives requests to the proxy. @@ -264,7 +264,7 @@ func runGRPCTest(t *testing.T, tc *testCase) { // Create the proxy server and start serving on TLS. mockAuth := auth.NewMockAuthenticator() - p, err := proxy.New(mockAuth, services, true, "static") + p, err := proxy.New(mockAuth, services) require.NoError(t, err) server := &http.Server{ Addr: testProxyAddr, diff --git a/sample-conf.yaml b/sample-conf.yaml index 7ee01ff..10b9bb6 100644 --- a/sample-conf.yaml +++ b/sample-conf.yaml @@ -144,3 +144,10 @@ tor: # Whether a v3 onion service should be created to handle requests. v3: false + +# Enable the Lightning Node Connect hashmail server, allowing up to 1k messages +# per burst and a new message every 20 milliseconds. +hashmail: + enabled: true + messagerate: 20ms + messageburstallowance: 1000 From 8d37d8a3d91dc717987bcfc431af4119619a98bb Mon Sep 17 00:00:00 2001 From: Oliver Gugger Date: Mon, 22 Nov 2021 16:30:52 +0100 Subject: [PATCH 4/5] hashmail_server: fix blocking reads With this commit we fix a bug in the hashmail server that didn't return a read stream properly if it was closed from the client side. Co-authored-by: Elle Mouton --- hashmail_server.go | 84 +++++++++++++----- hashmail_server_test.go | 192 ++++++++++++++++++++++++++++++++++++++++ 2 files changed, 254 insertions(+), 22 deletions(-) create mode 100644 hashmail_server_test.go diff --git a/hashmail_server.go b/hashmail_server.go index 18ce3a4..7ec0301 100644 --- a/hashmail_server.go +++ b/hashmail_server.go @@ -1,7 +1,7 @@ package aperture import ( - "bufio" + "bytes" "context" "fmt" "io" @@ -26,6 +26,10 @@ const ( // for messages. If a new message is about to exceed the burst rate, // then we'll allow it up to this burst allowance. DefaultMsgBurstAllowance = 10 + + // DefaultBufSize is the default number of bytes that are read in a + // single operation. + DefaultBufSize = 4096 ) // streamID is the identifier of a stream. @@ -42,8 +46,6 @@ func newStreamID(id []byte) streamID { // readStream is the read side of the read pipe, which is implemented a // buffered wrapper around the core reader. type readStream struct { - io.Reader - // parentStream is a pointer to the parent stream. We keep this around // so we can return the stream after we're done using it. parentStream *stream @@ -56,10 +58,22 @@ type readStream struct { // ReadNextMsg attempts to read the next message in the stream. // // NOTE: This will *block* until a new message is available. -func (r *readStream) ReadNextMsg() ([]byte, error) { +func (r *readStream) ReadNextMsg(ctx context.Context) ([]byte, error) { + var reader io.Reader + select { + case b := <-r.parentStream.readBytesChan: + reader = bytes.NewReader(b) + + case <-ctx.Done(): + return nil, ctx.Err() + + case err := <-r.parentStream.readErrChan: + return nil, err + } + // First, we'll decode the length of the next message from the stream // so we know how many bytes we need to read. - msgLen, err := tlv.ReadVarInt(r, &r.scratchBuf) + msgLen, err := tlv.ReadVarInt(reader, &r.scratchBuf) if err != nil { return nil, err } @@ -67,7 +81,7 @@ func (r *readStream) ReadNextMsg() ([]byte, error) { // Now that we know the length of the message, we'll make a limit // reader, then read all the encoded bytes until the EOF is emitted by // the reader. - msgReader := io.LimitReader(r, int64(msgLen)) + msgReader := io.LimitReader(reader, int64(msgLen)) return ioutil.ReadAll(msgReader) } @@ -108,19 +122,18 @@ func (w *writeStream) WriteMsg(ctx context.Context, msg []byte) error { // As we're writing to a stream, we need to delimit each message with a // length prefix so the reader knows how many bytes to consume for each // message. - // - // TODO(roasbeef): actually needs to be single write? + var buf bytes.Buffer msgSize := uint64(len(msg)) - err := tlv.WriteVarInt( - w, msgSize, &w.scratchBuf, - ) - if err != nil { + if err := tlv.WriteVarInt(&buf, msgSize, &w.scratchBuf); err != nil { return err } // Next, we'll write the message directly to the stream. - _, err = w.Write(msg) - if err != nil { + if _, err := buf.Write(msg); err != nil { + return err + } + + if _, err := w.Write(buf.Bytes()); err != nil { return err } @@ -143,6 +156,10 @@ type stream struct { readStreamChan chan *readStream writeStreamChan chan *writeStream + readBytesChan chan []byte + readErrChan chan error + quit chan struct{} + // equivAuth is a method used to determine if an authentication // mechanism to tear down a stream is equivalent to the one used to // create it in the first place. WE use this to ensure that only the @@ -173,6 +190,9 @@ func newStream(id streamID, limiter *rate.Limiter, id: id, equivAuth: equivAuth, limiter: limiter, + readBytesChan: make(chan []byte), + readErrChan: make(chan error, 1), + quit: make(chan struct{}), } // Our tear down function will close the write side of the pipe, which @@ -183,6 +203,7 @@ func newStream(id streamID, limiter *rate.Limiter, if err != nil { return err } + close(s.quit) s.wg.Wait() return nil } @@ -196,21 +217,37 @@ func newStream(id streamID, limiter *rate.Limiter, // will read from. _, err := io.Copy( readWritePipe, - // This is where the buffering will happen, as the - // writer writes to the write end of the pipe, this - // goroutine will copy the bytes into the buffer until - // its full, then attempt to write it to the write end - // of the read pipe. - bufio.NewReader(writeReadPipe), + writeReadPipe, ) _ = readWritePipe.CloseWithError(err) _ = writeReadPipe.CloseWithError(err) }() + s.wg.Add(1) + go func() { + defer s.wg.Done() + + var buf [DefaultBufSize]byte + for { + numBytes, err := readReadPipe.Read(buf[:]) + if err != nil { + s.readErrChan <- err + return + } + + c := make([]byte, numBytes) + copy(c, buf[0:numBytes]) + + select { + case s.readBytesChan <- c: + case <-s.quit: + } + } + }() + // We'll now initialize our stream by sending the read and write ends // to their respective holding channels. s.readStreamChan <- &readStream{ - Reader: readReadPipe, parentStream: s, } s.writeStreamChan <- &writeStream{ @@ -555,6 +592,7 @@ func (h *hashMailServer) SendStream(readStream hashmailrpc.HashMail_SendStreamSe // exit before shutting down. select { case <-ctx.Done(): + log.Debugf("SendStream: Context done, exiting") return nil case <-h.quit: return fmt.Errorf("server shutting down") @@ -564,6 +602,8 @@ func (h *hashMailServer) SendStream(readStream hashmailrpc.HashMail_SendStreamSe cipherBox, err := readStream.Recv() if err != nil { + log.Debugf("SendStream: Exiting write stream RPC "+ + "stream read: %v", err) return err } @@ -608,7 +648,7 @@ func (h *hashMailServer) RecvStream(desc *hashmailrpc.CipherBoxDesc, default: } - nextMsg, err := readStream.ReadNextMsg() + nextMsg, err := readStream.ReadNextMsg(reader.Context()) if err != nil { log.Debugf("Got error an read stream read: %v", err) return err diff --git a/hashmail_server_test.go b/hashmail_server_test.go new file mode 100644 index 0000000..f49dbfd --- /dev/null +++ b/hashmail_server_test.go @@ -0,0 +1,192 @@ +package aperture + +import ( + "context" + "fmt" + "math" + "net/http" + "testing" + "time" + + "github.com/lightninglabs/lightning-node-connect/hashmailrpc" + "github.com/lightningnetwork/lnd/build" + "github.com/lightningnetwork/lnd/lntest/wait" + "github.com/lightningnetwork/lnd/signal" + "github.com/stretchr/testify/require" + "google.golang.org/grpc" +) + +var ( + testApertureAddress = "localhost:8082" + testSID = streamID{1, 2, 3} + testStreamDesc = &hashmailrpc.CipherBoxDesc{ + StreamId: testSID[:], + } + testMessage = []byte("I'm a message!") + apertureStartTimeout = 3 * time.Second +) + +func TestHashMailServerReturnStream(t *testing.T) { + ctxb := context.Background() + + setupAperture(t) + + // Create a client and connect it to the server. + conn, err := grpc.Dial(testApertureAddress, grpc.WithInsecure()) + require.NoError(t, err) + client := hashmailrpc.NewHashMailClient(conn) + + // We'll create a new cipher box that we're going to subscribe to + // multiple times to check disconnecting returns the read stream. + resp, err := client.NewCipherBox(ctxb, &hashmailrpc.CipherBoxAuth{ + Auth: &hashmailrpc.CipherBoxAuth_LndAuth{}, + Desc: testStreamDesc, + }) + require.NoError(t, err) + require.NotNil(t, resp.GetSuccess()) + + // First we make sure there is something to read on the other end of + // that stream by writing something to it. + sendCtx, sendCancel := context.WithCancel(context.Background()) + defer sendCancel() + + writeStream, err := client.SendStream(sendCtx) + require.NoError(t, err) + err = writeStream.Send(&hashmailrpc.CipherBox{ + Desc: testStreamDesc, + Msg: testMessage, + }) + require.NoError(t, err) + + // We need to wait a bit to make sure the message is really sent. + time.Sleep(100 * time.Millisecond) + + // Connect, wait for the stream to be ready, read something, then + // disconnect immediately. + msg, err := readMsgFromStream(t, client) + require.NoError(t, err) + require.Equal(t, testMessage, msg.Msg) + + // Make sure we can connect again immediately and try to read something. + // There is no message to read before we cancel the request so we expect + // an EOF error to be returned upon connection close/context cancel. + _, err = readMsgFromStream(t, client) + require.Error(t, err) + require.Contains(t, err.Error(), "context canceled") + + // Send then receive yet another message to make sure the stream is + // still operational. + testMessage2 := append(testMessage, []byte("test")...) + err = writeStream.Send(&hashmailrpc.CipherBox{ + Desc: testStreamDesc, + Msg: testMessage2, + }) + require.NoError(t, err) + + // We need to wait a bit to make sure the message is really sent. + time.Sleep(100 * time.Millisecond) + + msg, err = readMsgFromStream(t, client) + require.NoError(t, err) + require.Equal(t, testMessage2, msg.Msg) +} + +func setupAperture(t *testing.T) { + logWriter := build.NewRotatingLogWriter() + interceptor, err := signal.Intercept() + require.NoError(t, err) + + SetupLoggers(logWriter, interceptor) + + err = build.ParseAndSetDebugLevels("trace,PRXY=warn", logWriter) + require.NoError(t, err) + + apertureCfg := &Config{ + Insecure: true, + ListenAddr: testApertureAddress, + Authenticator: &AuthConfig{ + Disable: true, + }, + Etcd: &EtcdConfig{}, + HashMail: &HashMailConfig{ + Enabled: true, + MessageRate: time.Millisecond, + MessageBurstAllowance: math.MaxUint32, + }, + } + aperture := NewAperture(apertureCfg) + errChan := make(chan error) + require.NoError(t, aperture.Start(errChan)) + + // Any error while starting? + select { + case err := <-errChan: + t.Fatalf("error starting aperture: %v", err) + default: + } + + err = wait.NoError(func() error { + apertureAddr := fmt.Sprintf("http://%s/dummy", + testApertureAddress) + + resp, err := http.Get(apertureAddr) + if err != nil { + return err + } + defer resp.Body.Close() + if resp.StatusCode != http.StatusNotFound { + return fmt.Errorf("invalid status: %d", resp.StatusCode) + } + + return nil + }, apertureStartTimeout) + require.NoError(t, err) +} + +func readMsgFromStream(t *testing.T, + client hashmailrpc.HashMailClient) (*hashmailrpc.CipherBox, error) { + + ctxc, cancel := context.WithCancel(context.Background()) + readStream, err := client.RecvStream(ctxc, testStreamDesc) + require.NoError(t, err) + + // Wait a bit again to make sure the request is actually sent before our + // context is canceled already again. + time.Sleep(100 * time.Millisecond) + + // We'll start a read on the stream in the background. + var ( + goroutineStarted = make(chan struct{}) + resultChan = make(chan *hashmailrpc.CipherBox) + errChan = make(chan error) + ) + go func() { + close(goroutineStarted) + box, err := readStream.Recv() + if err != nil { + errChan <- err + return + } + resultChan <- box + }() + + // Give the goroutine a chance to actually run, so block the main thread + // until it did. + <-goroutineStarted + + time.Sleep(200 * time.Millisecond) + + // Now close and cancel the stream to make sure the server can clean it + // up and release it. + require.NoError(t, readStream.CloseSend()) + cancel() + + // Interpret the result. + select { + case err := <-errChan: + return nil, err + + case box := <-resultChan: + return box, nil + } +} From 7c7b1145ebcd95649cb6e581c3626040bfa57d56 Mon Sep 17 00:00:00 2001 From: Oliver Gugger Date: Mon, 22 Nov 2021 16:30:53 +0100 Subject: [PATCH 5/5] hashmail_server: fix writes up to 2 MB This commit adds the ability for a mailbox message to be up to 2MB in size. --- aperture.go | 7 +++-- hashmail_server.go | 9 +++++++ hashmail_server_test.go | 59 ++++++++++++++++++++++++++++++++++++++--- 3 files changed, 69 insertions(+), 6 deletions(-) diff --git a/aperture.go b/aperture.go index fa86f0f..c475c1a 100644 --- a/aperture.go +++ b/aperture.go @@ -220,8 +220,11 @@ func (a *Aperture) Start(errChan chan error) error { } handler := http.HandlerFunc(a.proxy.ServeHTTP) a.httpsServer = &http.Server{ - Addr: a.cfg.ListenAddr, - Handler: handler, + Addr: a.cfg.ListenAddr, + Handler: handler, + IdleTimeout: 0, + ReadTimeout: 0, + WriteTimeout: 0, } // Create TLS configuration by either creating new self-signed certs or diff --git a/hashmail_server.go b/hashmail_server.go index 7ec0301..dec6f64 100644 --- a/hashmail_server.go +++ b/hashmail_server.go @@ -238,6 +238,15 @@ func newStream(id streamID, limiter *rate.Limiter, c := make([]byte, numBytes) copy(c, buf[0:numBytes]) + for numBytes == DefaultBufSize { + numBytes, err = readReadPipe.Read(buf[:]) + if err != nil { + s.readErrChan <- err + return + } + c = append(c, buf[0:numBytes]...) + } + select { case s.readBytesChan <- c: case <-s.quit: diff --git a/hashmail_server_test.go b/hashmail_server_test.go index f49dbfd..60b9116 100644 --- a/hashmail_server_test.go +++ b/hashmail_server_test.go @@ -2,6 +2,7 @@ package aperture import ( "context" + "crypto/rand" "fmt" "math" "net/http" @@ -89,16 +90,66 @@ func TestHashMailServerReturnStream(t *testing.T) { msg, err = readMsgFromStream(t, client) require.NoError(t, err) require.Equal(t, testMessage2, msg.Msg) + + // Clean up the stream now. + _, err = client.DelCipherBox(ctxb, &hashmailrpc.CipherBoxAuth{ + Auth: &hashmailrpc.CipherBoxAuth_LndAuth{}, + Desc: testStreamDesc, + }) + require.NoError(t, err) +} + +func TestHashMailServerLargeMessage(t *testing.T) { + ctxb := context.Background() + + setupAperture(t) + + // Create a client and connect it to the server. + conn, err := grpc.Dial(testApertureAddress, grpc.WithInsecure()) + require.NoError(t, err) + client := hashmailrpc.NewHashMailClient(conn) + + // We'll create a new cipher box that we're going to subscribe to + // multiple times to check disconnecting returns the read stream. + resp, err := client.NewCipherBox(ctxb, &hashmailrpc.CipherBoxAuth{ + Auth: &hashmailrpc.CipherBoxAuth_LndAuth{}, + Desc: testStreamDesc, + }) + require.NoError(t, err) + require.NotNil(t, resp.GetSuccess()) + + // Let's create a long message and try to send it. + var largeMessage [512*DefaultBufSize]byte + _, err = rand.Read(largeMessage[:]) + require.NoError(t, err) + + sendCtx, sendCancel := context.WithCancel(context.Background()) + defer sendCancel() + + writeStream, err := client.SendStream(sendCtx) + require.NoError(t, err) + err = writeStream.Send(&hashmailrpc.CipherBox{ + Desc: testStreamDesc, + Msg: largeMessage[:], + }) + require.NoError(t, err) + + // We need to wait a bit to make sure the message is really sent. + time.Sleep(100 * time.Millisecond) + + // Connect, wait for the stream to be ready, read something, then + // disconnect immediately. + msg, err := readMsgFromStream(t, client) + require.NoError(t, err) + require.Equal(t, largeMessage[:], msg.Msg) } func setupAperture(t *testing.T) { logWriter := build.NewRotatingLogWriter() - interceptor, err := signal.Intercept() - require.NoError(t, err) - SetupLoggers(logWriter, interceptor) + SetupLoggers(logWriter, signal.Interceptor{}) - err = build.ParseAndSetDebugLevels("trace,PRXY=warn", logWriter) + err := build.ParseAndSetDebugLevels("trace,PRXY=warn", logWriter) require.NoError(t, err) apertureCfg := &Config{