Source code for luafun.game.states

from datetime import datetime
import logging
import select
import socket
import time
import multiprocessing as mp
from struct import unpack
import traceback
import warnings

from google.protobuf.json_format import MessageToDict
from google.protobuf.message import DecodeError

from luafun.game.dota2.dota_gcmessages_common_bot_script_pb2 import CMsgBotWorldState
from luafun.game.performance import ProcessingStates

log = logging.getLogger(__name__)


[docs]class SyncWorldListener: """Connect to the dota2 game and save the messages in a queue to be read Size of the message through time. Think dota might allocate a fixed size of memory for the proto message and if the message grows too much the game stop pushing proto messages. I think with active bots we could limit the message size to something reasonable (i.e if creeps die fast enough that the number of units on the map is fairly constant). .. image:: ../_static/msgsize.png """ def __init__(self, host, port, queue, state, stats, name): self.host = host self.port = port self.queue = queue self.sock = None self.state = state self.stats = stats self.name = name self.namespace = f'word-{self.name}' self.reason = None self.decode_errors = None self.err_cnt = 0 self.error = None self.proto_rcv = 0 self.proto_decode = 0 self.partial = b'' # self.msg_size = open('msg_size.txt', 'w') @property def running(self): return self.state['running']
[docs] def connect(self, retries=20, sleep_time=0.01): pending = None s = None for i in range(retries): if s is not None: s.close() try: s = socket.socket(socket.AF_INET, socket.SOCK_STREAM) # this does not do anything # s.setsockopt(socket.IPPROTO_TCP, socket.TCP_KEEPCNT, 5) # Number of Keep alive probes # s.setsockopt(socket.IPPROTO_TCP, socket.TCP_KEEPIDLE, 1) # Idle time before keep alive proves are sent # s.setsockopt(socket.IPPROTO_TCP, socket.TCP_KEEPINTVL, 1) # interval between keep alive probes # s.setsockopt(socket.IPPROTO_TCP, socket.TCP_NODELAY, 0) # disables the Nagle algorithm for TCP sockets s.setblocking(True) s.connect((self.host, self.port)) log.debug(f'Connection established after {i} retries for {self.name}') return s except ConnectionRefusedError as err: if not self.running: log.debug('Giving up; game has stopped') return None pending = err time.sleep(sleep_time) else: log.debug('Could not establish connection') if pending is not None: raise pending return None
[docs] def read_message(self, read: socket.socket): self.proto_rcv = time.time() msg_size = b'' retries = 0 while msg_size == b'' and retries < 10: msg_size = read.recv(4) # means that the socket was closed # This mainly happens on windows every 1000 messages or so # this also means we are MISSING messages, windows # it critically flawed because of those disconnects # After a bit of investigation it seems the disconnects is coming from Dota itself # there is no resolution in sight # Just tested it again it has not happened in a while # could be resolved now if msg_size == b'': self.sock.close() self.sock = self.connect(10) self.reason = f'Reconnected to server' return None # --- retries += 1 if msg_size == b'': self.reason = f'Message size is empty {msg_size} after {retries} retries' return None msg_len = int(unpack("@I", msg_size)[0]) chunks = [self.partial] bytes_recv = 0 while bytes_recv < msg_len: chunk = read.recv(min(msg_len - bytes_recv, 8192)) if chunk == b'': self.reason = f'Could not read rest of message (received: {bytes_recv}) (length: {msg_len})' return None chunks.append(chunk) bytes_recv = bytes_recv + len(chunk) if bytes_recv > msg_len: self.reason = f'Read more than necessary breaking communication (received: {bytes_recv}) (length: {msg_len})' return None if bytes_recv < msg_len: self.reason = f'Read less than necessary breaking communication (received: {bytes_recv}) (length: {msg_len})' return None msg = b''.join(chunks) try: # Overflow in GetSerializedWorldState! # /home/setepenre/work/LuaFun/luafun/game/states.py:117: # RuntimeWarning: Unexpected end-group tag: Not all data was converted # world_state.ParseFromString(msg) world_state = CMsgBotWorldState() size_read = world_state.ParseFromString(msg) self.partial = b'' if len(msg) != size_read: self.partial = msg[size_read:] return world_state except DecodeError: return self.recover(f'Decode error', msg)
[docs] def recover(self, error, msg): import base64 self.reason = f'{error}-{self.name}: {self.error}' if msg is not None: self.decode_errors = open(f'decode_{self.err_cnt}.txt', 'w') self.decode_errors.write(base64.b64encode(msg).decode('utf-8')) self.decode_errors.close() self.decode_errors = None self.err_cnt += 1 # Reconnect, the connection is tainted # we do not know when a message starts and when it ends # this means we will drop at least one state update self.sock.close() self.sock = self.connect(sleep_time=0.01) return None
[docs] def insert_message(self, msg, s): self.proto_decode = time.time() json_msg = MessageToDict( msg, preserving_proto_field_name=True, use_integers_for_enums=True) perf = ProcessingStates() perf.proto_rcv = self.proto_rcv perf.proto_decode = self.proto_decode perf.proto_send = time.time() json_msg['perf'] = perf self.queue.put(json_msg) self.state[self.namespace] = (datetime.utcnow() - s).total_seconds()
def _run(self): if not self.running: return s = datetime.utcnow() # this needs to be lower than the game.deadline so in case of disconnect we can reconnect fast readable, _, error = select.select([self.sock], [], [self.sock], 0.05) for read in readable: msg = self.read_message(read) if msg is not None: self.insert_message(msg, s) else: log.error(f'Could not read message because: {self.reason}') self.reason = None for err in error: err.close() log.debug(f'socket error') self.sock = self.connect()
[docs] def run(self): # warnings.simplefilter('error') self.sock = self.connect(sleep_time=1) if self.sock is None: raise RuntimeError('Impossible to connect to the game') recover = None while self.running: try: if recover is not None: self.recover(recover, None) recover = None self._run() except KeyboardInterrupt: log.debug('user interrupt') self.state['running'] = False break except ConnectionResetError: recover = 'ConnectionResetError' except ConnectionRefusedError: # Dota2 is not running anymore self.state['running'] = False break except ValueError: self.state['running'] = False log.error(traceback.format_exc()) break except Exception: if self.running: log.error(traceback.format_exc()) if self.sock: self.sock.close() # self.msg_size.close() log.debug('World state listener shutting down')
[docs]def sync_world_listener(host, port, queue, state, stats, level, name): try: import coverage coverage.process_startup() except ImportError: pass logging.basicConfig(level=level) wl = SyncWorldListener(host, port, queue, state, stats, name) wl.run()
[docs]def world_listener_process(host, port, queue, state, stats, name, level): p = mp.Process( name=f'WorldListener-{name}', target=sync_world_listener, args=(host, port, queue, state, stats, level, name) ) p.start() log.debug(f'WorldListener-{name}: {p.pid}') return p