mirror of
https://github.com/aljazceru/plugins.git
synced 2025-12-19 14:14:20 +01:00
backup: Add support for IPv6 addresses in socket backend
Support the bracketed `socket:[::]:1234` syntax for IPv6 addresses. Also, add factor out the socket URI parsing to a function and add tests for it. As a forward compatibility measure, reject query strings (`?a=b`) because I intend to use these for parameters such as SOCKS5 proxy (for Tor) in a future patch.
This commit is contained in:
committed by
Christian Decker
parent
ca75d7e132
commit
cdfcd5a2fe
@@ -1,21 +1,71 @@
|
||||
import json
|
||||
import logging, socket, struct
|
||||
from collections import namedtuple
|
||||
import json, logging, socket, re, struct
|
||||
from typing import Tuple, Iterator
|
||||
from urllib.parse import urlparse
|
||||
from urllib.parse import urlparse, parse_qs
|
||||
|
||||
from backend import Backend, Change
|
||||
from protocol import PacketType, recvall, PKT_CHANGE_TYPES, change_from_packet, packet_from_change, send_packet, recv_packet
|
||||
|
||||
SocketURLInfo = namedtuple('SocketURLInfo', ['host', 'port', 'addrtype'])
|
||||
|
||||
class AddrType:
|
||||
IPv4 = 0
|
||||
IPv6 = 1
|
||||
NAME = 2
|
||||
|
||||
def parse_socket_url(destination: str) -> SocketURLInfo:
|
||||
'''Parse a socket: URL to extract the information contained in it.'''
|
||||
url = urlparse(destination)
|
||||
if url.scheme != 'socket':
|
||||
raise ValueError('Scheme for socket backend must be socket:...')
|
||||
|
||||
if url.path.startswith('['): # bracketed IPv6 address
|
||||
eidx = url.path.find(']')
|
||||
if eidx == -1:
|
||||
raise ValueError('Unterminated bracketed host address.')
|
||||
host = url.path[1:eidx]
|
||||
addrtype = AddrType.IPv6
|
||||
eidx += 1
|
||||
if eidx >= len(url.path) or url.path[eidx] != ':':
|
||||
raise ValueError('Port number missing.')
|
||||
eidx += 1
|
||||
else:
|
||||
eidx = url.path.find(':')
|
||||
if eidx == -1:
|
||||
raise ValueError('Port number missing.')
|
||||
host = url.path[0:eidx]
|
||||
if re.match('\d+\.\d+\.\d+\.\d+$', host): # matches IPv4 address format
|
||||
addrtype = AddrType.IPv4
|
||||
else:
|
||||
addrtype = AddrType.NAME
|
||||
eidx += 1
|
||||
|
||||
try:
|
||||
port = int(url.path[eidx:])
|
||||
except ValueError:
|
||||
raise ValueError('Invalid port number')
|
||||
|
||||
# parse query parameters
|
||||
# reject unknown parameters (currently all of them)
|
||||
qs = parse_qs(url.query)
|
||||
if len(qs):
|
||||
raise ValueError('Invalid query string')
|
||||
|
||||
return SocketURLInfo(host=host, port=port, addrtype=addrtype)
|
||||
|
||||
class SocketBackend(Backend):
|
||||
def __init__(self, destination: str, create: bool):
|
||||
self.version = None
|
||||
self.prev_version = None
|
||||
self.destination = destination
|
||||
self.url = urlparse(self.destination)
|
||||
self.sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
|
||||
(host, port) = self.url.path.split(':')
|
||||
logging.info('Initialized socket backend')
|
||||
self.sock.connect((host, int(port)))
|
||||
self.url = parse_socket_url(destination)
|
||||
if self.url.addrtype == AddrType.IPv6:
|
||||
self.sock = socket.socket(socket.AF_INET6, socket.SOCK_STREAM)
|
||||
else: # TODO NAME is assumed to be IPv4 for now
|
||||
self.sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
|
||||
logging.info('Initialized socket backend, connecting to {}:{} (addrtype {})...'.format(
|
||||
self.url.host, self.url.port, self.url.addrtype))
|
||||
self.sock.connect((self.url.host, self.url.port))
|
||||
logging.info('Connected to {}'.format(destination))
|
||||
|
||||
def _send_packet(self, typ: int, payload: bytes) -> None:
|
||||
|
||||
Reference in New Issue
Block a user