From 43fc3c6d34430bd46a332ff588df0feb66c4bc26 Mon Sep 17 00:00:00 2001 From: "Wladimir J. van der Laan" Date: Thu, 18 Feb 2021 17:41:51 +0100 Subject: [PATCH] backup: Implement automatic reconnect in socket backend Add automatic reconnect (with exponential back-off) to the socket backend. If the connection is lost while pusing a change, try to reconnect. Handle edge-cases that might come up depending on when the disconnect happaned. Currently the parameters are hardcoded: it will retry 5 times, on the first retry it will wait 5 seconds before reconnecting. There is an exponential backoff of 1.5, so on the fifth try it will wait about 25 seconds. This is particularly useful when conencting over Tor, as transient interruptions are fairly common there. --- backup/socketbackend.py | 76 +++++++++++++++++++++++++++++++++++------ 1 file changed, 65 insertions(+), 11 deletions(-) diff --git a/backup/socketbackend.py b/backup/socketbackend.py index 85b2ecf..f6a747c 100644 --- a/backup/socketbackend.py +++ b/backup/socketbackend.py @@ -1,11 +1,20 @@ from collections import namedtuple -import json, logging, socket, re, struct +import json, logging, socket, re, struct, time from typing import Tuple, Iterator from urllib.parse import urlparse, parse_qs from backend import Backend, Change from protocol import PacketType, recvall, PKT_CHANGE_TYPES, change_from_packet, packet_from_change, send_packet, recv_packet +# Total number of reconnection tries +RECONNECT_TRIES=5 + +# Delay in seconds between reconnections (initial) +RECONNECT_DELAY=5 + +# Scale delay factor after each failure +RECONNECT_DELAY_BACKOFF=1.5 + HostPortInfo = namedtuple('HostPortInfo', ['host', 'port', 'addrtype']) SocketURLInfo = namedtuple('SocketURLInfo', ['target', 'proxytype', 'proxytarget']) @@ -85,7 +94,9 @@ class SocketBackend(Backend): self.prev_version = None self.destination = destination self.url = parse_socket_url(destination) + self.connect() + def connect(self): if self.url.proxytype == ProxyType.DIRECT: if self.url.target.addrtype == AddrType.IPv6: self.sock = socket.socket(socket.AF_INET6, socket.SOCK_STREAM) @@ -97,11 +108,11 @@ class SocketBackend(Backend): self.sock = socks.socksocket() self.sock.set_proxy(socks.SOCKS5, self.url.proxytarget.host, self.url.proxytarget.port) - logging.info('Initialized socket backend, connecting to {}:{} (addrtype {}, proxytype {}, proxytarget {})...'.format( + logging.info('Connecting to {}:{} (addrtype {}, proxytype {}, proxytarget {})...'.format( self.url.target.host, self.url.target.port, self.url.target.addrtype, self.url.proxytype, self.url.proxytarget)) self.sock.connect((self.url.target.host, self.url.target.port)) - logging.info('Connected to {}'.format(destination)) + logging.info('Connected to {}'.format(self.destination)) def _send_packet(self, typ: int, payload: bytes) -> None: send_packet(self.sock, typ, payload) @@ -114,21 +125,64 @@ class SocketBackend(Backend): Initialize socket backend by request current metadata from server. ''' logging.info('Initializing backend') - self._send_packet(PacketType.REQ_METADATA, b'') - (typ, payload) = self._recv_packet() - assert(typ == PacketType.METADATA) - self.protocol, self.version, self.prev_version, self.version_count = struct.unpack("!IIIQ", payload) + self._request_metadata() logging.info('Initialized SocketBackend: protocol={}, version={}, prev_version={}, version_count={}'.format( self.protocol, self.version, self.prev_version, self.version_count )) return True + def _request_metadata(self) -> None: + self._send_packet(PacketType.REQ_METADATA, b'') + (typ, payload) = self._recv_packet() + assert(typ == PacketType.METADATA) + self.protocol, self.version, self.prev_version, self.version_count = struct.unpack("!IIIQ", payload) + def add_change(self, entry: Change) -> bool: typ, payload = packet_from_change(entry) - self._send_packet(typ, payload) - # Wait for change to be acknowledged before continuing. - (typ, _) = self._recv_packet() - assert(typ == PacketType.ACK) + + base_version = self.version + retry = 0 + retry_delay = RECONNECT_DELAY + need_connect = False + while True: # Retry loop + try: + if need_connect: + self.connect() + # Request metadata, to know where we stand + self._request_metadata() + if self.version == entry.version: + # If the current version at the server side matches the version of the + # entry, the packet was succesfully sent and processed and the error + # happened afterward. Nothing left to do. + return True + elif base_version == self.version: + # The other acceptable option is that the current version still matches + # that on the server side. Then we retry. + pass + else: + raise Exception('Unexpected backup version {} after reconnect'.format(self.version)) + + self._send_packet(typ, payload) + # Wait for change to be acknowledged before continuing. + (typ, _) = self._recv_packet() + assert(typ == PacketType.ACK) + except (BrokenPipeError, OSError): + pass + else: + break + + if retry == RECONNECT_TRIES: + logging.error('Connection was lost while sending change (giving up after {} retries)'.format(retry)) + raise IOError('Connection was lost while sending change') + + retry += 1 + logging.warning('Connection was lost while sending change (retry {} of {}, will try again after {} seconds)'.format(retry, RECONNECT_TRIES, retry_delay)) + time.sleep(retry_delay) + retry_delay *= RECONNECT_DELAY_BACKOFF + need_connect = True + + self.prev_version = self.version + self.version = entry.version return True def rewind(self) -> bool: