diff --git a/backup/socketbackend.py b/backup/socketbackend.py index 85b2ecf..f6a747c 100644 --- a/backup/socketbackend.py +++ b/backup/socketbackend.py @@ -1,11 +1,20 @@ from collections import namedtuple -import json, logging, socket, re, struct +import json, logging, socket, re, struct, time from typing import Tuple, Iterator from urllib.parse import urlparse, parse_qs from backend import Backend, Change from protocol import PacketType, recvall, PKT_CHANGE_TYPES, change_from_packet, packet_from_change, send_packet, recv_packet +# Total number of reconnection tries +RECONNECT_TRIES=5 + +# Delay in seconds between reconnections (initial) +RECONNECT_DELAY=5 + +# Scale delay factor after each failure +RECONNECT_DELAY_BACKOFF=1.5 + HostPortInfo = namedtuple('HostPortInfo', ['host', 'port', 'addrtype']) SocketURLInfo = namedtuple('SocketURLInfo', ['target', 'proxytype', 'proxytarget']) @@ -85,7 +94,9 @@ class SocketBackend(Backend): self.prev_version = None self.destination = destination self.url = parse_socket_url(destination) + self.connect() + def connect(self): if self.url.proxytype == ProxyType.DIRECT: if self.url.target.addrtype == AddrType.IPv6: self.sock = socket.socket(socket.AF_INET6, socket.SOCK_STREAM) @@ -97,11 +108,11 @@ class SocketBackend(Backend): self.sock = socks.socksocket() self.sock.set_proxy(socks.SOCKS5, self.url.proxytarget.host, self.url.proxytarget.port) - logging.info('Initialized socket backend, connecting to {}:{} (addrtype {}, proxytype {}, proxytarget {})...'.format( + logging.info('Connecting to {}:{} (addrtype {}, proxytype {}, proxytarget {})...'.format( self.url.target.host, self.url.target.port, self.url.target.addrtype, self.url.proxytype, self.url.proxytarget)) self.sock.connect((self.url.target.host, self.url.target.port)) - logging.info('Connected to {}'.format(destination)) + logging.info('Connected to {}'.format(self.destination)) def _send_packet(self, typ: int, payload: bytes) -> None: send_packet(self.sock, typ, payload) @@ -114,21 +125,64 @@ class SocketBackend(Backend): Initialize socket backend by request current metadata from server. ''' logging.info('Initializing backend') - self._send_packet(PacketType.REQ_METADATA, b'') - (typ, payload) = self._recv_packet() - assert(typ == PacketType.METADATA) - self.protocol, self.version, self.prev_version, self.version_count = struct.unpack("!IIIQ", payload) + self._request_metadata() logging.info('Initialized SocketBackend: protocol={}, version={}, prev_version={}, version_count={}'.format( self.protocol, self.version, self.prev_version, self.version_count )) return True + def _request_metadata(self) -> None: + self._send_packet(PacketType.REQ_METADATA, b'') + (typ, payload) = self._recv_packet() + assert(typ == PacketType.METADATA) + self.protocol, self.version, self.prev_version, self.version_count = struct.unpack("!IIIQ", payload) + def add_change(self, entry: Change) -> bool: typ, payload = packet_from_change(entry) - self._send_packet(typ, payload) - # Wait for change to be acknowledged before continuing. - (typ, _) = self._recv_packet() - assert(typ == PacketType.ACK) + + base_version = self.version + retry = 0 + retry_delay = RECONNECT_DELAY + need_connect = False + while True: # Retry loop + try: + if need_connect: + self.connect() + # Request metadata, to know where we stand + self._request_metadata() + if self.version == entry.version: + # If the current version at the server side matches the version of the + # entry, the packet was succesfully sent and processed and the error + # happened afterward. Nothing left to do. + return True + elif base_version == self.version: + # The other acceptable option is that the current version still matches + # that on the server side. Then we retry. + pass + else: + raise Exception('Unexpected backup version {} after reconnect'.format(self.version)) + + self._send_packet(typ, payload) + # Wait for change to be acknowledged before continuing. + (typ, _) = self._recv_packet() + assert(typ == PacketType.ACK) + except (BrokenPipeError, OSError): + pass + else: + break + + if retry == RECONNECT_TRIES: + logging.error('Connection was lost while sending change (giving up after {} retries)'.format(retry)) + raise IOError('Connection was lost while sending change') + + retry += 1 + logging.warning('Connection was lost while sending change (retry {} of {}, will try again after {} seconds)'.format(retry, RECONNECT_TRIES, retry_delay)) + time.sleep(retry_delay) + retry_delay *= RECONNECT_DELAY_BACKOFF + need_connect = True + + self.prev_version = self.version + self.version = entry.version return True def rewind(self) -> bool: