Working version of the async pub/sub notifier for callbacks

This commit is contained in:
kexkey
2019-05-20 14:33:37 -04:00
committed by kexkey
parent ed71a2ed8f
commit ac2b031bf0
12 changed files with 124 additions and 75 deletions

View File

@@ -13,6 +13,7 @@ main() {
local response_topic
while read msg; do
trace "[main] New msg just arrived!"
trace "[main] msg=${msg}"
cmd=$(echo ${msg} | jq ".cmd" | tr -d '"')
@@ -25,14 +26,14 @@ main() {
web)
response=$(web "${msg}")
publish_response "${response}" "${response_topic}" ${?}
trace "[main] PR"
;;
esac
trace "[main] case finished"
trace "[main] msg processed"
done
}
export TRACING=1
main
trace "[requesthandler] exiting"
exit $?

View File

@@ -13,6 +13,7 @@ publish_response() {
trace "[publish_response] response_topic=${response_topic}"
trace "[publish_response] returncode=${returncode}"
# response=$(echo "${response}" | base64 | tr -d '\n')
trace "[publish_response] mosquitto_pub -h broker -t \"${response_topic}\" -m \"${response}\""
mosquitto_pub -h broker -t "${response_topic}" -m "${response}"
returncode=$?

View File

@@ -9,7 +9,7 @@ web() {
local url
local body
local returncode
local http_code
local response
local result
trace "[web] msg=${msg}"
@@ -20,13 +20,14 @@ web() {
# jq -e will have a return code of 1 if the supplied tag is null.
if [ "$?" -eq "0" ]; then
# body tag not null, so it's a POST
body=$(echo "${body}" | base64 -d)
trace "[web] body=${body}"
else
body=
trace "[web] no body, GET request"
fi
http_code=$(curl_it "${url}" "${body}")
response=$(curl_it "${url}" "${body}")
returncode=$?
trace_rc ${returncode}
@@ -38,7 +39,8 @@ web() {
result="error"
fi
echo "{\"result\":\"${result}\",\"http_code\":\"${http_code}\"}"
echo "${response}"
# echo "{\"result\":\"${result}\",\"http_code\":\"${http_code}\"}"
return ${returncode}
}
@@ -49,20 +51,29 @@ curl_it() {
local url=$(echo "${1}" | tr -d '"')
local data=${2}
local returncode
local response
local rnd=$(dd if=/dev/urandom bs=5 count=1 | xxd -pc 5)
if [ -n "${data}" ]; then
trace "[curl_it] curl -o /dev/null -w \"%{http_code}\" -H \"Content-Type: application/json\" -H \"X-Forwarded-Proto: https\" -d ${data} ${url}"
rc=$(curl -o /dev/null -w "%{http_code}" -H "Content-Type: application/json" -H "X-Forwarded-Proto: https" -d ${data} ${url})
trace "[curl_it] curl -o webresponse-${rnd} -w \"%{http_code}\" -H \"Content-Type: application/json\" -H \"X-Forwarded-Proto: https\" -d ${data} ${url}"
rc=$(curl -o webresponse-${rnd} -w "%{http_code}" -H "Content-Type: application/json" -H "X-Forwarded-Proto: https" -d ${data} ${url})
returncode=$?
else
trace "[curl_it] curl -o /dev/null -w \"%{http_code}\" ${url}"
rc=$(curl -o /dev/null -w "%{http_code}" ${url})
trace "[curl_it] curl -o webresponse-$$ -w \"%{http_code}\" ${url}"
rc=$(curl -o webresponse-${rnd} -w "%{http_code}" ${url})
returncode=$?
fi
trace "[curl_it] HTTP return code=${rc}"
trace_rc ${returncode}
echo "${rc}"
if [ "${returncode}" -eq "0" ]; then
response=$(cat webresponse-${rnd} | base64 | tr -d '"' ; rm webresponse-${rnd})
else
response=
fi
response="{\"curl_code\":${returncode},\"http_code\":${rc},\"body\":\"${response}\"}"
echo "${response}"
if [ "${returncode}" -eq "0" ]; then
if [ "${rc}" -lt "400" ]; then

View File

@@ -13,11 +13,10 @@ WORKDIR ${HOME}
COPY app/data/* ./
COPY app/script/* ./
COPY --from=cyphernode/clightning:v0.7.0-test /usr/local/bin/lightning-cli ./
# COPY --from=eclipse-mosquitto:1.6 /usr/bin/mosquitto_sub ./
# COPY --from=eclipse-mosquitto:1.6 /usr/bin/mosquitto_pub ./
COPY --from=eclipse-mosquitto:1.6 /usr/bin/mosquitto_rr ./
COPY --from=eclipse-mosquitto /usr/bin/mosquitto_rr /usr/bin/mosquitto_sub /usr/bin/mosquitto_pub /usr/bin/
COPY --from=eclipse-mosquitto /usr/lib/libmosquitto* /usr/lib/
RUN chmod +x startproxy.sh requesthandler.sh lightning-cli sqlmigrate*.sh waitanyinvoice.sh \
RUN chmod +x startproxy.sh requesthandler.sh lightning-cli sqlmigrate*.sh waitanyinvoice.sh tests* \
&& chmod o+w . \
&& mkdir db

View File

@@ -56,15 +56,21 @@ get_rawtransaction()
return $?
}
get_transaction()
{
get_transaction() {
trace "Entering get_transaction()..."
local txid=${1}
trace "[get_transaction] txid=${txid}"
local to_spender_node=${2}
trace "[get_transaction] to_spender_node=${to_spender_node}"
local data="{\"method\":\"gettransaction\",\"params\":[\"${txid}\",true]}"
trace "[get_transaction] data=${data}"
send_to_watcher_node "${data}"
if [ -z "${to_spender_node}" ]; then
send_to_watcher_node "${data}"
else
send_to_spender_node "${data}"
fi
return $?
}

View File

@@ -2,6 +2,7 @@
. ./trace.sh
. ./sql.sh
. ./notify.sh
do_callbacks() {
(
@@ -233,28 +234,13 @@ build_callback() {
curl_callback() {
trace "Entering curl_callback()..."
local url=${1}
local data=${2}
local returncode
#trace "[curl_callback] curl -w \"%{http_code}\" -H \"Content-Type: application/json\" -H \"X-Forwarded-Proto: https\" -d \"${data}\" ${url}"
#rc=$(curl -w "%{http_code}" -H "Content-Type: application/json" -H "X-Forwarded-Proto: https" -d "${data}" ${url})
#returncode=$?
trace "[curl_callback] mosquitto_rr -h broker -t notifier -e jefsio -m \"{\"response-topic\":\"jefsio\",\"cmd\":\"web\",\"url\":\"${url}\",\"body\":\"${data}\"}\""
rc=$(./mosquitto_rr -h broker -t notifier -e jefsio -m "{\"response-topic\":\"jefsio\",\"cmd\":\"web\",\"url\":\"${url}\",\"body\":\"${data}\"}")
rc=$(echo "${rc}" | jq ".http_code")
trace "[curl_callback] HTTP return code=${rc}"
notify_web "${1}" "${2}"
returncode=$?
trace_rc ${returncode}
if [ "${returncode}" -eq "0" ]; then
if [ "${rc}" -lt "400" ]; then
return 0
else
return ${rc}
fi
else
return ${returncode}
fi
return ${returncode}
}
case "${0}" in *callbacks_job.sh) do_callbacks $@;; esac

View File

@@ -113,23 +113,11 @@ build_callback_txid() {
curl_callback_txid() {
trace "Entering curl_callback_txid()..."
local url=${1}
local data=${2}
local returncode
trace "[curl_callback_txid] curl -w \"%{http_code}\" -H \"Content-Type: application/json\" -H \"X-Forwarded-Proto: https\" -d \"${data}\" ${url}"
rc=$(curl -w "%{http_code}" -H "Content-Type: application/json" -H "X-Forwarded-Proto: https" -d "${data}" ${url})
notify_web "${1}" "${2}"
returncode=$?
trace "[curl_callback_txid] HTTP return code=${rc}"
trace_rc ${returncode}
if [ "${returncode}" -eq "0" ]; then
if [ "${rc}" -lt "400" ]; then
return 0
else
return ${rc}
fi
else
return ${returncode}
fi
return ${returncode}
}

View File

@@ -112,7 +112,7 @@ confirmation() {
else
# TX found in our DB.
# 1-conf or executecallbacks on an unconfirmed tx or spending watched address (in this case, we probably missed conf)
# 1-conf or executecallbacks on an unconfirmed tx or spending watched address (in this case, we probably missed conf) or spending to a watched address (in this case, spend inserted the tx in the DB)
local tx_blockhash=$(echo "${tx_details}" | jq '.result.blockhash')
trace "[confirmation] tx_blockhash=${tx_blockhash}"
@@ -130,9 +130,8 @@ confirmation() {
raw_tx=readfile('rawtx-${txid}.blob')
WHERE txid=\"${txid}\""
trace_rc $?
id_inserted=${tx}
fi
id_inserted=${tx}
fi
# Delete the temp file containing the raw tx (see above)
rm rawtx-${txid}.blob
@@ -151,8 +150,8 @@ confirmation() {
do
watching_id=$(echo "${row}" | cut -d '|' -f1)
address=$(echo "${row}" | cut -d '|' -f2)
tx_vout_n=$(echo "${tx_details}" | jq ".result.details[] | select(.address==\"${address}\") | .vout")
tx_vout_amount=$(echo "${tx_details}" | jq ".result.details[] | select(.address==\"${address}\") | .amount")
tx_vout_n=$(echo "${tx_details}" | jq ".result.details | map(select(.address==\"${address}\"))[0] | .vout")
tx_vout_amount=$(echo "${tx_details}" | jq ".result.details | map(select(.address==\"${address}\"))[0] | .amount | fabs" | awk '{ printf "%.8f", $0 }')
sql "INSERT OR IGNORE INTO watching_tx (watching_id, tx_id, vout, amount) VALUES (${watching_id}, ${id_inserted}, ${tx_vout_n}, ${tx_vout_amount})"
trace_rc $?
done
@@ -176,13 +175,12 @@ confirmation() {
########################################################################################################
do_callbacks
) 201>./.confirmation.lock
do_callbacks
echo '{"result":"confirmed"}'
return 0
) 201>./.confirmation.lock
}
case "${0}" in *confirmation.sh) confirmation $@;; esac

View File

@@ -0,0 +1,36 @@
#!/bin/sh
. ./trace.sh
notify_web() {
trace "Entering notify_web()..."
local url=${1}
# Let's encode the body to base64 so we won't have to escape the special chars...
local body=$(echo "${2}" | base64 | tr -d '\n')
local returncode
local response
local http_code
trace "[notify_web] mosquitto_rr -h broker -W 5 -t notifier -e \"response/$$\" -m \"{\"response-topic\":\"response/$$\",\"cmd\":\"web\",\"url\":\"${url}\",\"body\":\"${body}\"}\""
response=$(mosquitto_rr -h broker -W 5 -t notifier -e "response/$$" -m "{\"response-topic\":\"response/$$\",\"cmd\":\"web\",\"url\":\"${url}\",\"body\":\"${body}\"}")
returncode=$?
trace_rc ${returncode}
trace "[notify_web] response=${response}"
http_code=$(echo "${response}" | jq ".http_code" | tr -d '"')
trace "[notify_web] http_code=${http_code}"
if [ "${returncode}" -eq "0" ]; then
if [ "${http_code}" -lt "400" ]; then
return 0
else
return ${http_code}
fi
else
return ${returncode}
fi
}

View File

@@ -202,17 +202,13 @@ serve_ots_backoffice() {
trace "[serve_ots_backoffice] url=${url}"
# Call back newly upgraded stamps
#trace "[serve_ots_backoffice] curl -s -o /dev/null -w \"%{http_code}\" -H \"X-Forwarded-Proto: https\" ${url}"
#rc=$(curl -s -o /dev/null -w "%{http_code}" -H "X-Forwarded-Proto: https" ${url})
#returncode=$?
trace "[serve_ots_backoffice] mosquitto_rr -h broker -t notifier -e dhtsggs -m \"{\"response-topic\":\"dhtsggs\",\"cmd\":\"web\",\"url\":\"${url}\"}\""
rc=$(./mosquitto_rr -h broker -t notifier -e dhtsggs -m "{\"response-topic\":\"dhtsggs\",\"cmd\":\"web\",\"url\":\"${url}\"}")
rc=$(echo "${rc}" | jq ".http_code")
notify_web "${url}"
returncode=$?
trace_rc ${returncode}
# Even if curl executed ok, we need to make sure the http return code is also ok
if [ "${returncode}" -eq "0" ] && [ "${rc}" -lt "400" ]; then
if [ "${returncode}" -eq "0" ]; then
sql "UPDATE stamp SET calledback=1 WHERE id=${id}"
trace_rc $?
fi

View File

@@ -379,4 +379,5 @@ export DB_PATH
export DB_FILE
main
trace "[requesthandler] exiting"
exit $?

View File

@@ -14,6 +14,8 @@ spend() {
trace "[spend] amount=${amount}"
local response
local id_inserted
local tx_details
local tx_raw_details
response=$(send_to_spender_node "{\"method\":\"sendtoaddress\",\"params\":[\"${address}\",${amount}]}")
local returncode=$?
@@ -24,8 +26,23 @@ spend() {
local txid=$(echo "${response}" | jq ".result" | tr -d '"')
trace "[spend] txid=${txid}"
tx_details=$(get_transaction ${txid} "spender")
tx_raw_details=$(get_rawtransaction ${txid})
local tx_hash=$(echo "${tx_raw_details}" | jq '.result.hash')
local tx_ts_firstseen=$(echo "${tx_details}" | jq '.result.timereceived')
local tx_amount=$(echo "${tx_details}" | jq '.result.amount | fabs' | awk '{ printf "%.8f", $0 }')
local tx_size=$(echo "${tx_raw_details}" | jq '.result.size')
local tx_vsize=$(echo "${tx_raw_details}" | jq '.result.vsize')
local tx_replaceable=$(echo "${tx_details}" | jq '.result."bip125-replaceable"')
tx_replaceable=$([ ${tx_replaceable} = "yes" ] && echo 1 || echo 0)
local fees=$(echo "${tx_details}" | jq '.result.fee | fabs' | awk '{ printf "%.8f", $0 }')
local rawtx=$(echo "${tx_details}" | jq '.result.hex')
# Let's insert the txid in our little DB to manage the confirmation and tell it's not a watching address
sql "INSERT OR IGNORE INTO tx (txid) VALUES (\"${txid}\")"
#sql "INSERT OR IGNORE INTO tx (txid) VALUES (\"${txid}\")"
sql "INSERT OR IGNORE INTO tx (txid, hash, confirmations, timereceived, fee, size, vsize, is_replaceable, raw_tx) VALUES (\"${txid}\", ${tx_hash}, 0, ${tx_ts_firstseen}, ${fees}, ${tx_size}, ${tx_vsize}, ${tx_replaceable}, ${rawtx})"
trace_rc $?
id_inserted=$(sql "SELECT id FROM tx WHERE txid=\"${txid}\"")
trace_rc $?
@@ -127,16 +144,8 @@ getbalancebyxpub() {
getnewaddress() {
trace "Entering getnewaddress()..."
local address_type=${1}
trace "[getnewaddress] address_type=${address_type}"
local response
local data
if [ -z "${address_type}" ]; then
data='{"method":"getnewaddress"}'
else
data="{\"method\":\"getnewaddress\",\"params\":[\"\",\"${address_type}\"]}"
fi
local data='{"method":"getnewaddress"}'
response=$(send_to_spender_node "${data}")
local returncode=$?
trace_rc ${returncode}
@@ -181,6 +190,8 @@ batchspend() {
local recipientswhere
local recipientsjson
local id_inserted
local tx_details
local tx_raw_details
# We will batch all the addresses in DB without a TXID
local batching=$(sql 'SELECT address, amount FROM recipient WHERE tx_id IS NULL')
@@ -219,8 +230,23 @@ batchspend() {
local txid=$(echo "${response}" | jq ".result" | tr -d '"')
trace "[batchspend] txid=${txid}"
tx_details=$(get_transaction ${txid} "spender")
tx_raw_details=$(get_rawtransaction ${txid})
local tx_hash=$(echo "${tx_raw_details}" | jq '.result.hash')
local tx_ts_firstseen=$(echo "${tx_details}" | jq '.result.timereceived')
local tx_amount=$(echo "${tx_details}" | jq '.result.amount | fabs' | awk '{ printf "%.8f", $0 }')
local tx_size=$(echo "${tx_raw_details}" | jq '.result.size')
local tx_vsize=$(echo "${tx_raw_details}" | jq '.result.vsize')
local tx_replaceable=$(echo "${tx_details}" | jq '.result."bip125-replaceable"')
tx_replaceable=$([ ${tx_replaceable} = "yes" ] && echo 1 || echo 0)
local fees=$(echo "${tx_details}" | jq '.result.fee | fabs' | awk '{ printf "%.8f", $0 }')
local rawtx=$(echo "${tx_details}" | jq '.result.hex')
# Let's insert the txid in our little DB to manage the confirmation and tell it's not a watching address
sql "INSERT OR IGNORE INTO tx (txid) VALUES (\"${txid}\")"
#sql "INSERT OR IGNORE INTO tx (txid) VALUES (\"${txid}\")"
sql "INSERT OR IGNORE INTO tx (txid, hash, confirmations, timereceived, fee, size, vsize, is_replaceable, raw_tx) VALUES (\"${txid}\", ${tx_hash}, 0, ${tx_ts_firstseen}, ${fees}, ${tx_size}, ${tx_vsize}, ${tx_replaceable}, ${rawtx})"
returncode=$?
trace_rc ${returncode}
if [ "${returncode}" -eq 0 ]; then