Provide a msg to watch that will be published to a topic when confirmation

This commit is contained in:
kexkey
2019-11-28 11:41:33 -05:00
committed by kexkey
parent 49299725e0
commit ed40433b28
10 changed files with 108 additions and 49 deletions

View File

@@ -106,6 +106,7 @@ services:
image: eclipse-mosquitto:1.6
networks:
- cyphernodenet
- cyphernodeappsnet
restart: always
# deploy:
# placement:

View File

@@ -9,7 +9,7 @@ Inserts the address and callbacks in the DB and imports the address to the Watch
```http
POST http://cyphernode:8888/watch
with body...
{"address":"2N8DcqzfkYi8CkYzvNNS5amoq3SbAcQNXKp","unconfirmedCallbackURL":"192.168.111.233:1111/callback0conf","confirmedCallbackURL":"192.168.111.233:1111/callback1conf"}
{"address":"2N8DcqzfkYi8CkYzvNNS5amoq3SbAcQNXKp","unconfirmedCallbackURL":"192.168.111.233:1111/callback0conf","confirmedCallbackURL":"192.168.111.233:1111/callback1conf","eventMessage":"{\"bounce_address}\":\"tb1q6s0ppwk2msdewal3mu90ahfhpyepawnw6wdk8t\",\"nb_conf\":6}"}
```
Proxy response:
@@ -26,7 +26,8 @@ Proxy response:
"estimatesmartfee2blocks": "0.000010",
"estimatesmartfee6blocks": "0.000010",
"estimatesmartfee36blocks": "0.000010",
"estimatesmartfee144blocks": "0.000010"
"estimatesmartfee144blocks": "0.000010",
"eventMessage": "{\"bounce_address}\":\"tb1q6s0ppwk2msdewal3mu90ahfhpyepawnw6wdk8t\",\"nb_conf\":6}"
}
```
@@ -66,7 +67,8 @@ Proxy response:
"imported":"1",
"unconfirmedCallbackURL":"192.168.133.233:1111/callback0conf",
"confirmedCallbackURL":"192.168.133.233:1111/callback1conf",
"watching_since":"2018-09-06 21:14:03"}
"watching_since":"2018-09-06 21:14:03",
"eventMessage":"{\"bounce_address}\":\"tb1q6s0ppwk2msdewal3mu90ahfhpyepawnw6wdk8t\",\"nb_conf\":6}"}
]
}
```

View File

@@ -52,6 +52,7 @@ paths:
- "address"
- "confirmedCallbackURL"
- "unconfirmedCallbackURL"
- "event_message"
properties:
address:
$ref: '#/components/schemas/TypeAddressString'
@@ -61,6 +62,8 @@ paths:
confirmedCallbackURL:
type: "string"
format: "url"
event_message:
type: "string"
responses:
'200':
description: "successfully created"
@@ -1543,6 +1546,7 @@ components:
- "address"
- "unconfirmedCallbackURL"
- "confirmedCallbackURL"
- "event_message"
properties:
id:
type: "string"
@@ -1574,6 +1578,8 @@ components:
type: "string"
watching_since:
type: "string"
event_message:
type: "string"
WatchedByXpubAddress:
type: "object"
required:

View File

@@ -23,6 +23,7 @@ CREATE TABLE watching (
imported INTEGER DEFAULT FALSE,
watching_by_pub32_id INTEGER REFERENCES watching_by_pub32,
pub32_index INTEGER,
event_message TEXT,
inserted_ts INTEGER DEFAULT CURRENT_TIMESTAMP
);

View File

@@ -0,0 +1,14 @@
#!/bin/sh
echo "Checking for watching event support in DB..."
count=$(sqlite3 $DB_FILE "select count(*) from pragma_table_info('watching') where name='event_message'")
if [ "${count}" -eq "0" ]; then
# event_message not there, we have to migrate
echo "Migrating database for event triggered on watch notif..."
echo "Backing up current DB..."
cp $DB_FILE $DB_FILE-sqlmigrate20191127_0.2.4-0.3.0
echo "Altering DB..."
cat sqlmigrate20191127_0.2.4-0.3.0.sql | sqlite3 $DB_FILE
else
echo "Database watching event support migration already done, skipping!"
fi

View File

@@ -0,0 +1 @@
ALTER TABLE watching ADD COLUMN event_message TEXT;

View File

@@ -11,7 +11,7 @@ do_callbacks() {
trace "Entering do_callbacks()..."
# Let's fetch all the watching addresses still being watched but not called back
local callbacks=$(sql 'SELECT DISTINCT w.callback0conf, address, txid, vout, amount, confirmations, timereceived, fee, size, vsize, blockhash, blockheight, blocktime, w.id, is_replaceable, pub32_index, pub32, label, derivation_path FROM watching w LEFT JOIN watching_tx ON w.id = watching_id LEFT JOIN tx ON tx.id = tx_id LEFT JOIN watching_by_pub32 w32 ON watching_by_pub32_id = w32.id WHERE NOT calledback0conf AND watching_id NOT NULL AND w.callback0conf NOT NULL AND w.watching')
local callbacks=$(sql 'SELECT DISTINCT w.callback0conf, address, txid, vout, amount, confirmations, timereceived, fee, size, vsize, blockhash, blockheight, blocktime, w.id, is_replaceable, pub32_index, pub32, label, derivation_path, event_message FROM watching w LEFT JOIN watching_tx ON w.id = watching_id LEFT JOIN tx ON tx.id = tx_id LEFT JOIN watching_by_pub32 w32 ON watching_by_pub32_id = w32.id WHERE NOT calledback0conf AND watching_id NOT NULL AND w.callback0conf NOT NULL AND w.watching')
trace "[do_callbacks] callbacks0conf=${callbacks}"
local returncode
@@ -30,7 +30,7 @@ do_callbacks() {
fi
done
callbacks=$(sql 'SELECT DISTINCT w.callback1conf, address, txid, vout, amount, confirmations, timereceived, fee, size, vsize, blockhash, blockheight, blocktime, w.id, is_replaceable, pub32_index, pub32, label, derivation_path FROM watching w, watching_tx wt, tx t LEFT JOIN watching_by_pub32 w32 ON watching_by_pub32_id = w32.id WHERE w.id = watching_id AND tx_id = t.id AND NOT calledback1conf AND confirmations>0 AND w.callback1conf NOT NULL AND w.watching')
callbacks=$(sql 'SELECT DISTINCT w.callback1conf, address, txid, vout, amount, confirmations, timereceived, fee, size, vsize, blockhash, blockheight, blocktime, w.id, is_replaceable, pub32_index, pub32, label, derivation_path, event_message FROM watching w, watching_tx wt, tx t LEFT JOIN watching_by_pub32 w32 ON watching_by_pub32_id = w32.id WHERE w.id = watching_id AND tx_id = t.id AND NOT calledback1conf AND confirmations>0 AND w.callback1conf NOT NULL AND w.watching')
trace "[do_callbacks] callbacks1conf=${callbacks}"
for row in ${callbacks}
@@ -62,14 +62,23 @@ ln_manage_callback() {
local row=$@
trace "[ln_manage_callback] row=${row}"
local callback_url=$(echo "${row}" | cut -d '|' -f4)
trace "[ln_manage_callback] callback_url=${callback_url}"
if [ -z "${callback_url}" ]; then
# No callback url provided for that invoice
trace "[ln_manage_callback] No callback url provided for that invoice"
sql "UPDATE ln_invoice SET calledback=1 WHERE id=\"${id}\""
trace_rc $?
return
fi
local id=$(echo "${row}" | cut -d '|' -f1)
trace "[ln_manage_callback] id=${id}"
local label=$(echo "${row}" | cut -d '|' -f2)
trace "[ln_manage_callback] label=${label}"
local bolt11=$(echo "${row}" | cut -d '|' -f3)
trace "[ln_manage_callback] bolt11=${bolt11}"
local callback_url=$(echo "${row}" | cut -d '|' -f4)
trace "[ln_manage_callback] callback_url=${callback_url}"
local payment_hash=$(echo "${row}" | cut -d '|' -f5)
trace "[ln_manage_callback] payment_hash=${payment_hash}"
local msatoshi=$(echo "${row}" | cut -d '|' -f6)
@@ -88,13 +97,6 @@ ln_manage_callback() {
trace "[ln_manage_callback] expires_at=${expires_at}"
local returncode
if [ -z "${callback_url}" ]; then
# No callback url provided for that invoice
sql "UPDATE ln_invoice SET calledback=1 WHERE id=\"${id}\""
trace_rc $?
return
fi
data="{\"id\":\"${id}\","
data="${data}\"label\":\"${label}\","
data="${data}\"bolt11\":\"${bolt11}\","
@@ -149,13 +151,22 @@ build_callback() {
local label
local derivation_path
# callback0conf, address, txid, vout, amount, confirmations, timereceived, fee, size, vsize, blockhash, blockheight, blocktime, w.id
local event_message
# w.callback0conf, address, txid, vout, amount, confirmations, timereceived, fee, size, vsize, blockhash, blockheight, blocktime,
# w.id, is_replaceable, pub32_index, pub32, label, derivation_path, event_message
url=$(echo "${row}" | cut -d '|' -f1)
trace "[build_callback] url=${url}"
if [ -z "${url}" ]; then
# No callback url provided for that watch
trace "[build_callback] No callback url provided for that watch, skipping webhook call"
return
fi
trace "[build_callback] row=${row}"
id=$(echo "${row}" | cut -d '|' -f14)
trace "[build_callback] id=${id}"
url=$(echo "${row}" | cut -d '|' -f1)
trace "[build_callback] url=${url}"
address=$(echo "${row}" | cut -d '|' -f2)
trace "[build_callback] address=${address}"
txid=$(echo "${row}" | cut -d '|' -f3)
@@ -199,6 +210,8 @@ build_callback() {
derivation_path=$(echo "${row}" | cut -d '|' -f19)
trace "[build_callback] derivation_path=${derivation_path}"
fi
event_message=$(echo "${row}" | cut -d '|' -f20)
trace "[build_callback] event_message=${event_message}"
data="{\"id\":\"${id}\","
data="${data}\"address\":\"${address}\","
@@ -212,19 +225,19 @@ build_callback() {
if [ -n "${fee}" ]; then
data="${data}\"fees\":${fee},"
fi
data="${data}\"is_replaceable\":${is_replaceable}"
data="${data}\"is_replaceable\":${is_replaceable},"
if [ -n "${blocktime}" ]; then
data="${data},\"blockhash\":\"${blockhash}\","
data="${data}\"blockhash\":\"${blockhash}\","
data="${data}\"blocktime\":\"$(date -Is -d @${blocktime})\","
data="${data}\"blockheight\":${blockheight}"
data="${data}\"blockheight\":${blockheight},"
fi
if [ -n "${pub32_index}" ]; then
data="${data}\"pub32\":\"${pub32}\","
data="${data}\"pub32_label\":\"${label}\","
derivation_path=$(echo -e $derivation_path | sed -En "s/n/${pub32_index}/p")
data="${data}\"pub32_derivation_path\":\"${derivation_path}\""
data="${data}\"pub32_derivation_path\":\"${derivation_path}\","
fi
data="${data}}"
data="${data}\"event_message\":\"${event_message}\"}"
trace "[build_callback] data=${data}"
curl_callback "${url}" "${data}"

View File

@@ -58,7 +58,7 @@ confirmation() {
notfirst=true
fi
done
local rows=$(sql "SELECT id, address, watching_by_pub32_id, pub32_index FROM watching WHERE address IN (${addresseswhere}) AND watching")
local rows=$(sql "SELECT id, address, watching_by_pub32_id, pub32_index, event_message FROM watching WHERE address IN (${addresseswhere}) AND watching")
if [ ${#rows} -eq 0 ]; then
trace "[confirmation] No watched address in this tx!"
return 0
@@ -137,45 +137,63 @@ confirmation() {
rm rawtx-${txid}.blob
########################################################################################################
# Let's now insert in the join table if not already done
local event_message
# Let's see if we need to insert tx in the join table
tx=$(sql "SELECT tx_id FROM watching_tx WHERE tx_id=\"${tx}\"")
if [ -z "${tx}" ]; then
trace "[confirmation] For this tx, there's no watching_tx row, let's create it"
local watching_id
for row in ${rows}
do
# If the tx is batched and pays multiple watched addresses, we have to insert
# those additional addresses in watching_tx!
for row in ${rows}
do
address=$(echo "${row}" | cut -d '|' -f2)
########################################################################################################
# Let's now insert in the join table if not already done
if [ -z "${tx}" ]; then
trace "[confirmation] For this tx, there's no watching_tx row, let's create it"
local watching_id
# If the tx is batched and pays multiple watched addresses, we have to insert
# those additional addresses in watching_tx!
watching_id=$(echo "${row}" | cut -d '|' -f1)
address=$(echo "${row}" | cut -d '|' -f2)
# In the case of us spending to a watched address, the address appears twice in the details,
# once on the spend side (negative amount) and once on the receiving side (positive amount)
tx_vout_n=$(echo "${tx_details}" | jq ".result.details | map(select(.address==\"${address}\"))[0] | .vout")
tx_vout_amount=$(echo "${tx_details}" | jq ".result.details | map(select(.address==\"${address}\"))[0] | .amount | fabs" | awk '{ printf "%.8f", $0 }')
sql "INSERT OR IGNORE INTO watching_tx (watching_id, tx_id, vout, amount) VALUES (${watching_id}, ${id_inserted}, ${tx_vout_n}, ${tx_vout_amount})"
trace_rc $?
done
else
trace "[confirmation] For this tx, there's already watching_tx rows"
fi
########################################################################################################
else
trace "[confirmation] For this tx, there's already watching_tx rows"
fi
########################################################################################################
########################################################################################################
# Let's now grow the watch window in the case of a xpub watcher...
trace "[confirmation] Let's now grow the watch window in the case of a xpub watcher"
for row in ${rows}
do
########################################################################################################
# Let's now grow the watch window in the case of a xpub watcher...
watching_by_pub32_id=$(echo "${row}" | cut -d '|' -f3)
pub32_index=$(echo "${row}" | cut -d '|' -f4)
if [ -n "${watching_by_pub32_id}" ]; then
trace "[confirmation] Let's now grow the watch window in the case of a xpub watcher"
pub32_index=$(echo "${row}" | cut -d '|' -f4)
extend_watchers ${watching_by_pub32_id} ${pub32_index}
fi
done
########################################################################################################
########################################################################################################
########################################################################################################
# Let's publish the event if needed
event_message=$(echo "${row}" | cut -d '|' -f5)
if [ -n "${event_message}" ]; then
# There's an event message, let's publish it!
# We use the pid as the response-topic, so there's no conflict in responses.
trace "[confirmation] mosquitto_pub -h broker -t conf_event -m \"{\"address\":\"${address}\",\"confirmations\":${tx_nb_conf},\"event_message\":\"${event_message}\"}\""
response=$(mosquitto_pub -h broker -t conf_event -m "{\"address\":\"${address}\",\"confirmations\":${tx_nb_conf},\"event_message\":\"${event_message}\"}")
returncode=$?
trace_rc ${returncode}
fi
########################################################################################################
done
) 201>./.confirmation.lock

View File

@@ -89,6 +89,7 @@ main() {
watch)
# POST http://192.168.111.152:8080/watch
# BODY {"address":"2N8DcqzfkYi8CkYzvNNS5amoq3SbAcQNXKp","unconfirmedCallbackURL":"192.168.111.233:1111/callback0conf","confirmedCallbackURL":"192.168.111.233:1111/callback1conf"}
# BODY {"address":"2N8DcqzfkYi8CkYzvNNS5amoq3SbAcQNXKp","confirmedCallbackURL":"192.168.111.233:1111/callback1conf","eventMessage":"{\"bounce_address}\":\"tb1q6s0ppwk2msdewal3mu90ahfhpyepawnw6wdk8t\",\"nb_conf\":6}"}
response=$(watchrequest "${line}")
response_to_client "${response}" ${?}

View File

@@ -14,11 +14,12 @@ watchrequest() {
local address=$(echo "${request}" | jq -r ".address")
local cb0conf_url=$(echo "${request}" | jq -r ".unconfirmedCallbackURL")
local cb1conf_url=$(echo "${request}" | jq -r ".confirmedCallbackURL")
local event_message=$(echo "${request}" | jq -r ".event_message")
local imported
local inserted
local id_inserted
local result
trace "[watchrequest] Watch request on address (${address}), cb 0-conf (${cb0conf_url}), cb 1-conf (${cb1conf_url})"
trace "[watchrequest] Watch request on address (${address}), cb 0-conf (${cb0conf_url}), cb 1-conf (${cb1conf_url}) with event_message=${event_message}"
result=$(importaddress_rpc "${address}")
returncode=$?
@@ -29,7 +30,7 @@ watchrequest() {
imported=0
fi
sql "INSERT OR REPLACE INTO watching (address, watching, callback0conf, callback1conf, imported) VALUES (\"${address}\", 1, \"${cb0conf_url}\", \"${cb1conf_url}\", ${imported})"
sql "INSERT OR REPLACE INTO watching (address, watching, callback0conf, callback1conf, imported, event_message) VALUES (\"${address}\", 1, \"${cb0conf_url}\", \"${cb1conf_url}\", ${imported}, '${event_message}')"
returncode=$?
trace_rc ${returncode}
if [ "${returncode}" -eq 0 ]; then
@@ -63,7 +64,8 @@ watchrequest() {
\"estimatesmartfee2blocks\":\"${fees2blocks}\",
\"estimatesmartfee6blocks\":\"${fees6blocks}\",
\"estimatesmartfee36blocks\":\"${fees36blocks}\",
\"estimatesmartfee144blocks\":\"${fees144blocks}\"}"
\"estimatesmartfee144blocks\":\"${fees144blocks}\",
\"event_message\":\"${event_message}\"}"
trace "[watchrequest] responding=${data}"
echo "${data}"