commit 9d7d97aaba4a1242553a2dc0c588543a1defa57c Author: Anand Aiyer Date: Fri Apr 29 15:32:48 2011 +0100 first commit - 1.3.0 diff --git a/README b/README new file mode 100644 index 0000000..de8a469 --- /dev/null +++ b/README @@ -0,0 +1,60 @@ + +rfoo - Fast RPC client/server module. + +Contact: Nir Aides +Email: nir@winpdb.org +Website: http://www.winpdb.org/ +Version: 1.3.0 + +rfoo (remote foo) is a fast Python RPC package which can do 160,000 RPC +calls per second on a regular PC. It includes a fast serialization module +called rfoo.marsh which extends the Python built in marshal module by +eliminating serialization of code objects and protecting against bad input. +The result is a safe to use ultra fast serializer. + +Example server code: +> class MyHandler(rfoo.BaseHandler): +> def echo(self, str): +> return str +> +> rfoo.InetServer(MyHandler).start() + +Example client code: +> c = rfoo.InetConnection().connect() +> Proxy(c).echo('Hello World!') + + +rconsole - included with rfoo package is a remote Python console with +auto completion, which can be used to inspect and modify namespace of a +running script. + +To activate in a script do: +> from rfoo.utils import rconsole +> rconsole.spawn_server() + +To attach from a shell do: +$ rconsole + +SECURITY NOTE: +The rconsole listener started with spawn_server() will accept any local +connection and may therefore be insecure to use in shared hosting +or similar environments! + + +Requirements + + Depends on Cython - http://cython.org/ + To install Cython follow the simple instructions at + http://docs.cython.org/src/quickstart/install.html + + Tested on GNU/Linux: + Ubuntu 8.10 64bit, Python 2.5, Python 3.2 + CentOS 64bit, Python 2.4 + + +Installation + + sudo python setup.py install + + + diff --git a/rfoo/__init__.py b/rfoo/__init__.py new file mode 100644 index 0000000..4f1e2e7 --- /dev/null +++ b/rfoo/__init__.py @@ -0,0 +1,59 @@ +""" + rfoo/__init__.py + + Fast RPC server. + + Copyright (c) 2010 Nir Aides and individual contributors. + All rights reserved. + + Redistribution and use in source and binary forms, with or without modification, + are permitted provided that the following conditions are met: + + 1. Redistributions of source code must retain the above copyright notice, + this list of conditions and the following disclaimer. + + 2. Redistributions in binary form must reproduce the above copyright + notice, this list of conditions and the following disclaimer in the + documentation and/or other materials provided with the distribution. + + 3. Neither the name of Nir Aides nor the names of other contributors may + be used to endorse or promote products derived from this software without + specific prior written permission. + + THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS" AND + ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED + WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE + DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR CONTRIBUTORS BE LIABLE FOR + ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES + (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; + LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON + ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT + (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS + SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. +""" + +""" + Example: + + class MyHandler(rfoo.BaseHandler): + def echo(self, str): + return str + + rfoo.InetServer(MyHandler).start() + + --- client--- + + c = InetConnection().connect() + rfoo.Proxy(c).echo('Hello World!') +""" + + + +from rfoo._rfoo import * + + + +__version__ = '1.3.0' + + + diff --git a/rfoo/_rfoo.py b/rfoo/_rfoo.py new file mode 100644 index 0000000..f18e843 --- /dev/null +++ b/rfoo/_rfoo.py @@ -0,0 +1,563 @@ +""" + rfoo/_rfoo.py + + Fast RPC server. + + Copyright (c) 2010 Nir Aides and individual contributors. + All rights reserved. + + Redistribution and use in source and binary forms, with or without modification, + are permitted provided that the following conditions are met: + + 1. Redistributions of source code must retain the above copyright notice, + this list of conditions and the following disclaimer. + + 2. Redistributions in binary form must reproduce the above copyright + notice, this list of conditions and the following disclaimer in the + documentation and/or other materials provided with the distribution. + + 3. Neither the name of Nir Aides nor the names of other contributors may + be used to endorse or promote products derived from this software without + specific prior written permission. + + THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS" AND + ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED + WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE + DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR CONTRIBUTORS BE LIABLE FOR + ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES + (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; + LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON + ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT + (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS + SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. +""" + +""" + Example: + + class MyHandler(rfoo.BaseHandler): + def echo(self, str): + return str + + rfoo.InetServer(MyHandler).start() + + --- client--- + + c = rfoo.InetConnection().connect() + Proxy(c).echo('Hello World!') +""" + + + +import threading +import logging +import inspect +import socket +import sys +import os + +import marshal as _marshal + +try: + from rfoo.marsh import dumps, loads +except ImportError: + sys.stderr.write(""" +=========================================================== +Did you just try to import rfoo directly from its source distribution? +Well, that's possible, but first you need to build the +Cython extension rfoo.marsh (inplace) with: + python setup.py build_ext --inplace +===========================================================\n""") + raise + +try: + import thread +except ImportError: + import _thread as thread + +try: + import __builtin__ as builtins +except ImportError: + import builtins + + + +# +# Bind to loopback to restrict server to local requests, by default. +# +LOOPBACK = '127.0.0.1' +DEFAULT_PORT = 52431 +BUFFER_SIZE = 4096 + +MAX_THREADS = 128 + +CALL = 0 +NOTIFY = 1 + +# Compatible way to represent binary 'i' across Py2.x Py3.x. +INTEGER = 'i'.encode()[0] + + + +_loads = _marshal.loads +_dumps = _marshal.dumps + + + +def _is_builtin_exception(t): + # Use try except or find way to tell a class in Python 2.4. + try: + return issubclass(t, Exception) + except: + return False + + + +BUILTIN_EXCEPTIONS = set(e for e in vars(builtins).values() if _is_builtin_exception(e)) +BUILTIN_EXCEPTIONS_NAMES = set(e.__name__ for e in BUILTIN_EXCEPTIONS) + + + +class ServerError(IOError): + """Wrap server errors by proxy.""" + + + +class EofError(IOError): + """Socket end of file.""" + + + +class BaseHandler(object): + """ + Handle incomming requests. + Client can call public methods of derived classes. + """ + + def __init__(self, addr=None, context=None): + self._addr = addr + self._context = context + self._methods = {} + + def _close(self): + self._methods = {} + + def _get_method(self, name): + """ + Get public method. + Verify attribute is public method and use cache for performance. + """ + + m = self._methods.get(name, None) + if m is not None: + return m + + if name in ('trait_names', '_getAttributeNames'): + return self._getAttributeNames + + if name.startswith('_'): + logging.warning('Attempt to get non-public, attribute=%s.', name) + raise ValueError(name) + + m = getattr(self, name) + if not inspect.ismethod(m): + logging.warning('Attempt to get non-method, attribute=%s.', name) + raise ValueError(name) + + self._methods[name] = m + + return m + + def _getAttributeNames(self, *args, **kwargs): + """Return list of public methods. + Support auto completion by IPython of proxy methods over network. + """ + + members = inspect.getmembers(self, inspect.ismethod) + return [m[0] for m in members if not m[0].startswith('_')] + + +def restrict_local(foo): + """Decorator to restrict handler method to local proxies only.""" + + def _restrict_local(self, *args, **kwargs): + if self._addr[0] != '127.0.0.1': + raise ValueError('Attempt to invoke method from remote address.') + return foo(self, *args, **kwargs) + + return _restrict_local + + + +class ExampleHandler(BaseHandler): + """ + Demonstrate handler inheritance. + Start server with: start_server(handler=ExampleHandler) + Client calls server with: Proxy(connection).add(...) + """ + + def add(self, x, y): + return x + y + + def echo(self, s): + return s + + + +class Connection(object): + """Wrap socket with buffered read and length prefix for data.""" + + def __init__(self, conn=None): + self._conn = conn + self.recv = self._conn.recv + + def close(self): + """Shut down and close socket.""" + + if self._conn is not None: + try: + self._conn.shutdown(socket.SHUT_RDWR) + except socket.error: + pass + self._conn.close() + + def write(self, data): + """Write length prefixed data to socket.""" + + l = _dumps(len(data)) + self._conn.sendall(l + data) + + def read(self): + """Read length prefixed data from socket.""" + + buffer = self.recv(5) + while len(buffer) < 5: + data = self.recv(5 - len(buffer)) + if not data: + raise EofError(len(buffer)) + buffer += data + + if buffer[0] != INTEGER: + raise IOError() + + length = _loads(buffer) + buffer = self.recv(length) + while len(buffer) < length: + data = self.recv(length - len(buffer)) + if not data: + raise EofError(len(buffer)) + buffer += data + + return buffer + + + +class InetConnection(Connection): + """Connection type for INET sockets.""" + + def __init__(self): + s = socket.socket(socket.AF_INET, socket.SOCK_STREAM) + #s.setsockopt(socket.IPPROTO_TCP, socket.TCP_NODELAY, 1) + Connection.__init__(self, s) + + def connect(self, host=LOOPBACK, port=DEFAULT_PORT): + self._conn.connect((host, port)) + return self + + + +class UnixConnection(Connection): + """Connection type for Unix sockets.""" + + def __init__(self): + s = socket.socket(socket.AF_UNIX, socket.SOCK_STREAM) + Connection.__init__(self, s) + + def connect(self, path): + self._conn.connect(path) + return self + + + +class PipeSocket(object): + """Abstract two pipes into socket like interface.""" + + def __init__(self, to_server=None, to_client=None): + self._to_server = to_server or os.pipe() + self._to_client = to_client or os.pipe() + + def connect(self): + self._r = self._to_client[0] + self._w = self._to_server[1] + + def _connect_server(self): + server_end = PipeSocket(self._to_server, self._to_client) + server_end._r = self._to_server[0] + server_end._w = self._to_client[1] + return server_end + + def recv(self, size): + return os.read(self._r, size) + + def sendall(self, data): + return os.write(self._w, data) + + def shutdown(self, x): + pass + + def close(self): + try: + os.close(self._to_server[0]) + os.close(self._to_server[1]) + os.close(self._to_client[0]) + os.close(self._to_client[1]) + except Exception: + pass + + + +class PipeConnection(Connection): + """Connection type for pipes.""" + + def connect(self, pipe_socket): + self._conn = pipe_socket + self._conn.connect() + return self + + + +class Proxy(object): + """Proxy methods of server handler. + Call Proxy(connection).foo(*args, **kwargs) to invoke method + handler.foo(*args, **kwargs) of server handler. + """ + + def __init__(self, conn, name=None, cache=True): + self._conn = conn + self._name = name + self._cache = True + + def __getattr__(self, name): + attr = type(self)(self._conn, name, self._cache) + + if self._cache: + self.__dict__[name] = attr.__call__ + + return attr.__call__ + + def __call__(self, *args, **kwargs): + """Call method on server.""" + + data = dumps((CALL, self._name, args, kwargs)) + self._conn.write(data) + + response = self._conn.read() + value, error = loads(response) + + if error is None: + return value + + try: + name, args = error + except TypeError: + # Handle old way of returning error as repr. + logging.warning('Unknown error returned by proxy, error=%s.', error) + raise ServerError(error) + + logging.warning('Error returned by proxy, name=%s, args=%s.', name, args) + + # Raise built-in exceptions sent by server. + if name in BUILTIN_EXCEPTIONS_NAMES: + e = getattr(builtins, name)() + e.args = args + raise e # Exception sent from server. Original traceback unavailable. + + self._on_exception(name, args) + + def _on_exception(self, name, args): + """Override to raise custom exceptions.""" + + raise ServerError(name, args) + + + +class Notifier(Proxy): + """Proxy methods of server handler, asynchronously. + Call Notifier(connection).foo(*args, **kwargs) to invoke method + handler.foo(*args, **kwargs) of server handler. + """ + + def __call__(self, *args, **kwargs): + """Call method on server, don't wait for response.""" + + data = dumps((NOTIFY, self._name, args, kwargs)) + self._conn.write(data) + + + +g_threads_semaphore = threading.Semaphore(MAX_THREADS) + +def run_in_thread(foo): + """Decorate to run foo using bounded number of threads.""" + + def wrapper1(*args, **kwargs): + try: + foo(*args, **kwargs) + finally: + g_threads_semaphore.release() + + def wrapper2(*args, **kwargs): + g_threads_semaphore.acquire() + thread.start_new_thread(wrapper1, args, kwargs) + + return wrapper2 + + + +class Server(object): + """Serve calls over connection.""" + + def __init__(self, handler_type, handler_context=None, conn=None): + self._handler_context = handler_context + self._handler_type = handler_type + self._conn = conn + + def close(self): + if self._conn is not None: + try: + self._conn.shutdown(socket.SHUT_RDWR) + except socket.error: + pass + self._conn.close() + + def start(self): + """Start server, is it? + Socket is excpted bound. + """ + + logging.info('Enter.') + + try: + self._conn.listen(5) + + while True: + conn, addr = self._conn.accept() + conn.settimeout(None) + self._on_accept(conn, addr) + + finally: + self.close() + + def _on_accept(self, conn, addr): + """Serve acceptted connection. + Should be used in the context of a threaded server, see + threaded_connection(), or fork server (not implemented here). + """ + + logging.info('Enter, addr=%s.', addr) + + c = Connection(conn) + + try: + # + # Instantiate handler for the lifetime of the connection, + # making it possible to manage a state between calls. + # + handler = self._handler_type(addr, self._handler_context) + + try: + while True: + self._dispatch(handler, c) + + except EofError: + logging.debug('Caught end of file, error=%r.', sys.exc_info()[1]) + + finally: + c.close() + if 'handler' in locals(): + handler._close() + + def _dispatch(self, handler, conn, n=1000): + """Serve single call.""" + + for i in range(n): + data = conn.read() + type, name, args, kwargs = loads(data) + + try: + foo = handler._methods.get(name, None) or handler._get_method(name) + result = foo(*args, **kwargs) + error = None + + except Exception: + logging.debug('Caught exception raised by callable.', exc_info=True) + # Use exc_info for py2.x py3.x compatibility. + t, v, tb = sys.exc_info() + if t in BUILTIN_EXCEPTIONS: + error = (t.__name__, v.args) + else: + error = (repr(t), v.args) + result = None + + if type == CALL: + response = dumps((result, error)) + conn.write(response) + + + +class InetServer(Server): + """Serve calls over INET sockets.""" + + def __init__(self, handler_type, handler_context=None): + s = socket.socket(socket.AF_INET, socket.SOCK_STREAM) + s.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1) + s.settimeout(None) + Server.__init__(self, handler_type, handler_context, s) + + def start(self, host=LOOPBACK, port=DEFAULT_PORT): + self._conn.bind((host, port)) + Server.start(self) + + _on_accept = run_in_thread(Server._on_accept) + + + +class UnixServer(Server): + """Serve calls over Unix sockets.""" + + def __init__(self, handler_type, handler_context=None): + s = socket.socket(socket.AF_UNIX, socket.SOCK_STREAM) + s.settimeout(None) + Server.__init__(self, handler_type, handler_context, s) + + def start(self, path): + self._conn.bind(path) + Server.start(self) + + _on_accept = run_in_thread(Server._on_accept) + + + +class PipeServer(Server): + """Serve calls over pipes.""" + + def start(self, pipe_socket): + self._conn = pipe_socket._connect_server() + self._on_accept(self._conn, 'pipes') + + + +def start_server(handler, host=LOOPBACK, port=DEFAULT_PORT): + "Start server - depratcated.""" + + InetServer(handler).start(host, port) + + + +def connect(host=LOOPBACK, port=DEFAULT_PORT): + """Connect to server - depracated.""" + + return InetConnection().connect(host, port) + + + diff --git a/rfoo/marsh.pyx b/rfoo/marsh.pyx new file mode 100644 index 0000000..769f495 --- /dev/null +++ b/rfoo/marsh.pyx @@ -0,0 +1,178 @@ +""" + marsh.pyx + + Safe marshal. + + Provide safe marshaling of data ontop of the built in marshal module + by forbidding code objects. + + Does not support long objects since verifying them requires a rocket + scientist. + + Copyright (c) 2010 Nir Aides and individual contributors. + All rights reserved. + + Redistribution and use in source and binary forms, with or without modification, + are permitted provided that the following conditions are met: + + 1. Redistributions of source code must retain the above copyright notice, + this list of conditions and the following disclaimer. + + 2. Redistributions in binary form must reproduce the above copyright + notice, this list of conditions and the following disclaimer in the + documentation and/or other materials provided with the distribution. + + 3. Neither the name of Nir Aides nor the names of other contributors may + be used to endorse or promote products derived from this software without + specific prior written permission. + + THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS" AND + ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED + WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE + DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR CONTRIBUTORS BE LIABLE FOR + ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES + (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; + LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON + ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT + (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS + SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. +""" + + +import marshal as _marshal + + +cdef char NONE = 'N' +cdef char TRUE = 'T' +cdef char FALSE = 'F' +cdef char INT32 = 'i' +cdef char INT64 = 'I' +cdef char LONG = 'l' +cdef char FLOAT = 'f' +cdef char BINARY_FLOAT = 'g' +cdef char STRINGREF = 'R' +cdef char UNICODE = 'u' +cdef char STRING = 's' +cdef char INTERNED = 't' +cdef char LIST = '[' +cdef char TUPLE = '(' +cdef char SET = '<' +cdef char FROZEN_SET = '>' +cdef char DICT = '{' +cdef char DICT_CLOSE = '0' +cdef char PAD = '_' + + +cdef int verify_string(char *s_, unsigned int length): + """Verify marshaled data contains supported data types only.""" + + cdef unsigned char *s = s_ + cdef unsigned char *eof = s + length + cdef unsigned int nstrings = 0 + cdef unsigned int i + cdef unsigned int v + cdef unsigned int m + + while s < eof: + if s[0] == INT32: + s += 5 + continue + + if s[0] in (INT64, BINARY_FLOAT): + s += 9 + continue + + if s[0] == LONG: + if s + 5 > eof: + return 0 + + m = 1 + v = 0 + for i in range(1, 5): + v += m * s[i] + m *= 256 + + s += 5 + v * 2 + continue + + if s[0] in (NONE, TRUE, FALSE): + s += 1 + continue + + if s[0] == FLOAT: + if s + 2 > eof: + return 0 + + s += 2 + s[1] + continue + + if s[0] in (UNICODE, STRING, INTERNED): + if s + 5 > eof: + return 0 + + if s[0] == INTERNED: + nstrings += 1 + + m = 1 + v = 0 + for i in range(1, 5): + v += m * s[i] + m *= 256 + + s += 5 + v + continue + + if s[0] == STRINGREF: + if s + 5 > eof: + return 0 + + m = 1 + v = 0 + for i in range(1, 5): + v += m * s[i] + m *= 256 + + # String reference to non-existing string. + if v >= nstrings: + return 0 + + s += 5 + continue + + if s[0] in (LIST, TUPLE, SET, FROZEN_SET): + s += 5 + continue + + if s[0] in (DICT, DICT_CLOSE): + s += 1 + continue + + if s[0] == PAD: + s += 1 + while s < eof and s[0] == PAD: + s += 1 + return s == eof + + return 0 + + if s > eof: + return 0 + + return 1 + + +def loads(s): + if verify_string(s, len(s)) == 0: + raise ValueError('bad marshal data') + + return _marshal.loads(s) + + +def dumps(expr): + s = _marshal.dumps(expr, 1) + if verify_string(s, len(s)) == 0: + raise ValueError('unsupported marshal data') + + return s + + diff --git a/rfoo/utils/__init__.py b/rfoo/utils/__init__.py new file mode 100644 index 0000000..e69de29 diff --git a/rfoo/utils/rconsole.py b/rfoo/utils/rconsole.py new file mode 100644 index 0000000..5af93ac --- /dev/null +++ b/rfoo/utils/rconsole.py @@ -0,0 +1,184 @@ +""" + rconsole.py + + A Python console you can embed in a program and attach to remotely. + + To spawn a Python console in a script do the following in global scope + only (!) of any module: + + from rfoo.utils import rconsole + rconsole.spawn_server() + + This will start a listener for connections in a new thread. You may + specify a port to listen on. + + To attach to the console from another shell use the rconsole script + (rfoo/scripts/rconsole) which may be installed in your path. + Alternatively you may simply invoke interact(). + + SECURITY NOTE: + The listener started with spawn_server() will accept any local + connection and may therefore be insecure to use in shared hosting + or similar environments! +""" + + +import rlcompleter +import logging +import pprint +import codeop +import socket +import code +import rfoo +import sys + +try: + import thread +except ImportError: + import _thread as thread + + +PORT = 54321 + + +class BufferedInterpreter(code.InteractiveInterpreter): + """Variation of code.InteractiveInterpreter that outputs to buffer.""" + + def __init__(self, *args, **kwargs): + code.InteractiveInterpreter.__init__(self, *args, **kwargs) + self.buffout = '' + + def write(self, data): + self.buffout += data + + +class ConsoleHandler(rfoo.BaseHandler): + """An rfoo handler that remotes a Python interpreter.""" + + def __init__(self, *args, **kwargs): + rfoo.BaseHandler.__init__(self, *args, **kwargs) + + self._namespace = self._context + self._interpreter = BufferedInterpreter(self._namespace) + self._completer = rlcompleter.Completer(self._namespace) + + @rfoo.restrict_local + def complete(self, phrase, state): + """Auto complete for remote console.""" + logging.debug('Enter, phrase=%r, state=%d.', phrase, state) + return self._completer.complete(phrase, state) + + @rfoo.restrict_local + def runsource(self, source, filename=""): + """Variation of InteractiveConsole which returns expression + result as second element of returned tuple. + """ + + logging.debug('Enter, source=%r.', source) + + # Inject a global variable to capture expression result. + self._namespace['_rcon_result_'] = None + + try: + # In case of an expression, capture result. + compile(source, '', 'eval') + source = '_rcon_result_ = ' + source + logging.debug('source is an expression.') + except SyntaxError: + pass + + more = self._interpreter.runsource(source, filename) + result = self._namespace.pop('_rcon_result_') + + if more is True: + logging.debug('source is incomplete.') + return True, '' + + output = self._interpreter.buffout + self._interpreter.buffout = '' + + if result is not None: + result = pprint.pformat(result) + output += result + '\n' + + return False, output + + +class ProxyConsole(code.InteractiveConsole): + """Proxy interactive console to remote interpreter.""" + + def __init__(self, port=PORT): + code.InteractiveConsole.__init__(self) + self.port = port + self.conn = None + + def interact(self, banner=None): + logging.info('Enter.') + self.conn = rfoo.InetConnection().connect(port=self.port) + return code.InteractiveConsole.interact(self, banner) + + def complete(self, phrase, state): + """Auto complete support for interactive console.""" + + logging.debug('Enter, phrase=%r, state=%d.', phrase, state) + + # Allow tab key to simply insert spaces when proper. + if phrase == '': + if state == 0: + return ' ' + return None + + return rfoo.Proxy(self.conn).complete(phrase, state) + + def runsource(self, source, filename="", symbol="single"): + logging.debug('Enter, source=%r.', source) + + more, output = rfoo.Proxy(self.conn).runsource(source, filename) + if output: + self.write(output) + + return more + + +def spawn_server(namespace=None, port=PORT): + """Start console server in a new thread. + Should be called from global scope only! + May be insecure on shared machines. + """ + + logging.info('Enter, port=%d.', port) + + if namespace is None: + namespace = sys._getframe(1).f_globals + + server = rfoo.InetServer(ConsoleHandler, namespace) + + def _wrapper(): + try: + server.start(rfoo.LOOPBACK, port) + except socket.error: + logging.warning('Failed to bind rconsole to socket port, port=%r.', port) + + thread.start_new_thread(_wrapper, ()) + + +def interact(banner=None, readfunc=None, port=PORT): + """Start console and connect to remote console server.""" + + logging.info('Enter, port=%d.', port) + + console = ProxyConsole(port) + + if readfunc is not None: + console.raw_input = readfunc + else: + try: + import readline + readline.set_completer(console.complete) + readline.parse_and_bind('tab: complete') + except ImportError: + pass + + console.interact(banner) + + diff --git a/scripts/rconsole b/scripts/rconsole new file mode 100644 index 0000000..799bce3 --- /dev/null +++ b/scripts/rconsole @@ -0,0 +1,74 @@ +#! /usr/bin/env python +""" + rconsole + + A Python console you can embed in a program and attach to remotely. + + To spawn a Python console in a script do the following in global scope + only (!) of any module: + + from rfoo.utils import rconsole + rconsole.spawn_server() + + This will start a listener for connections in a new thread. You may + specify a port to listen on. + + To attach to the console from another shell use the rconsole script + (rfoo/scripts/rconsole). + + SECURITY NOTE: + The listener started with spawn_server() will accept any local + connection and may therefore be insecure to use in shared hosting + or similar environments! +""" + + +import getopt +import sys +import os + +import rfoo.utils.rconsole as rconsole + + +def print_usage(): + scriptName = os.path.basename(sys.argv[0]) + sys.stdout.write(""" +Start remote console: +%(name)s [-h] [-pPORT] + +-h, --help Print this help. +-pPORT Set PORT. +""" % {'name': scriptName}) + + +def main(): + """Parse options and run script.""" + + try: + options, args = getopt.getopt( + sys.argv[1:], + 'hp:', + ['help'] + ) + options = dict(options) + + except getopt.GetoptError: + print_usage() + return 2 + + if '-h' in options or '--help' in options: + print_usage() + return + + if '-p' in options: + port = int(options.get('-p')) + else: + port = rconsole.PORT + + rconsole.interact(port=port) + + +if __name__ == '__main__': + main() + + diff --git a/setup.py b/setup.py new file mode 100644 index 0000000..7a96217 --- /dev/null +++ b/setup.py @@ -0,0 +1,90 @@ +""" + setup.py + + Setup script for rfoo + + Copyright (c) 2010 Nir Aides and individual contributors. + All rights reserved. + + Redistribution and use in source and binary forms, with or without modification, + are permitted provided that the following conditions are met: + + 1. Redistributions of source code must retain the above copyright notice, + this list of conditions and the following disclaimer. + + 2. Redistributions in binary form must reproduce the above copyright + notice, this list of conditions and the following disclaimer in the + documentation and/or other materials provided with the distribution. + + 3. Neither the name of Nir Aides nor the names of other contributors may + be used to endorse or promote products derived from this software without + specific prior written permission. + + THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS" AND + ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED + WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE + DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR CONTRIBUTORS BE LIABLE FOR + ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES + (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; + LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON + ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT + (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS + SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. +""" + + +import sys + +from distutils.core import setup +from distutils.extension import Extension + +try: + from Cython.Distutils import build_ext +except ImportError: + sys.stderr.write("""=========================================================== +rfoo depends on Cython - http://cython.org/ + +To install Cython follow the simple instructions at: +http://docs.cython.org/src/quickstart/install.html + +Basically, you need gcc installed on your system: + sudo apt-get install build-essential + +and then setup the latest source version of Cython with: + sudo python setup.py install +===========================================================\n""") + sys.exit(1) + + +if 'bdist_egg' in sys.argv: + sys.stderr.write("""=========================================================== +rfoo can not be installed by easy_install due to conflict +between easy_install and Cython: +http://mail.python.org/pipermail/distutils-sig/2007-September/008205.html + +To install rfoo, download the source archive, extract it +and install with: + sudo python setup.py install +===========================================================\n""") + sys.exit(1) + + +ext_modules = [Extension("rfoo.marsh", ["rfoo/marsh.pyx"])] + + +setup( + name = 'rfoo', + version = '1.3.0', + description = 'Fast RPC client/server module.', + author = 'Nir Aides', + author_email = 'nir@winpdb.org', + url = 'http://www.winpdb.org/', + license = 'BSD', + packages = ['rfoo', 'rfoo.utils'], + scripts = ['scripts/rconsole'], + cmdclass = {'build_ext': build_ext}, + ext_modules = ext_modules +) + + + diff --git a/tests/rfoo_runner.py b/tests/rfoo_runner.py new file mode 100644 index 0000000..93284eb --- /dev/null +++ b/tests/rfoo_runner.py @@ -0,0 +1,258 @@ +#! /usr/bin/env python + +""" + tests/rfoo_runner.py + + Fast RPC server. + + Copyright (c) 2010 Nir Aides and individual contributors. + All rights reserved. + + Redistribution and use in source and binary forms, with or without modification, + are permitted provided that the following conditions are met: + + 1. Redistributions of source code must retain the above copyright notice, + this list of conditions and the following disclaimer. + + 2. Redistributions in binary form must reproduce the above copyright + notice, this list of conditions and the following disclaimer in the + documentation and/or other materials provided with the distribution. + + 3. Neither the name of Nir Aides nor the names of other contributors may + be used to endorse or promote products derived from this software without + specific prior written permission. + + THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS" AND + ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED + WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE + DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR CONTRIBUTORS BE LIABLE FOR + ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES + (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; + LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON + ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT + (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS + SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. +""" + + + +import threading +import logging +import getopt +import time +import rfoo +import sys +import os + + + +# +# Python 2.5 logging module supports function name in format string. +# +if logging.__version__[:3] >= '0.5': + LOGGING_FORMAT = '[%(process)d:%(thread).5s] %(asctime)s %(levelname)s %(module)s:%(lineno)d %(funcName)s() - %(message)s' +else: + LOGGING_FORMAT = '[%(process)d:%(thread).5s] %(asctime)s %(levelname)s %(module)s:%(lineno)d - %(message)s' + + + +ISPY3K = sys.version_info[0] >= 3 + + + +class DummySocket(object): + def __init__(self, handler, conn): + self._handler = handler + self._conn = conn + self._buffer = '' + self._counter = 0 + self._server = rfoo.Server(self._handler) + + def shutdown(self, x): + pass + + def close(self): + pass + + def sendall(self, data): + self._buffer = data + + self._counter += 1 + if self._counter % 2 == 1: + self._server._dispatch(self._handler, self._conn) + + def recv(self, size): + data = self._buffer[:size] + self._buffer = self._buffer[size:] + return data + + + +class DummyConnection(rfoo.Connection): + """Dispatch without network, for debugging.""" + + def __init__(self, handler): + rfoo.Connection.__init__(self, DummySocket(handler, self)) + + + +def print_usage(): + scriptName = os.path.basename(sys.argv[0]) + sys.stdout.write(""" +Start server: +%(name)s -s [-pPORT] + +Start client: +%(name)s [-c] [-oHOST] [-pPORT] [-nN] [data] + +data, if present should be an integer value, which controls the +length of a CPU intensive loop performed at the server. + +-h, --help Print this help. +-v Debug output. +-s Start server. +-a Use async notifications instead of synchronous calls. +-c Setup and tear down connection with each iteration. +-oHOST Set HOST. +-pPORT Set PORT. +-nN Repeat client call N times. +-tN Number of client threads to use. +-iF Set thread switch interval in seconds (float). +""" % {'name': scriptName}) + + + +def main(): + """Parse options and run script.""" + + try: + options, args = getopt.getopt( + sys.argv[1:], + 'hvsacuo:p:n:t:i:', + ['help'] + ) + options = dict(options) + + except getopt.GetoptError: + print_usage() + return 2 + + if '-h' in options or '--help' in options: + print_usage() + return + + # + # Prevent timing single connection async calls since + # this combination will simply generate a SYN flood, + # and is not a practical use case. + # + if '-a' in options and '-c' in options: + print_usage() + return + + if '-v' in options: + level = logging.DEBUG + verbose = True + else: + level = logging.WARNING + verbose = False + + logging.basicConfig( + level=level, + format=LOGGING_FORMAT, + stream=sys.stderr + ) + + if '-i' in options: + interval = float(options.get('-i')) + sys.setswitchinterval(interval) + + host = options.get('-o', '127.0.0.1') + port = int(options.get('-p', rfoo.DEFAULT_PORT)) + + t0 = time.time() + try: + if '-s' in options: + logging.warning('Start as server.') + rfoo.start_server(host=host, port=port, handler=rfoo.ExampleHandler) + return + + logging.warning('Start as client.') + + if len(args) > 0: + data = 'x' * int(args[0]) + else: + data = 'x' + + n = int(options.get('-n', 1)) + t = int(options.get('-t', 1)) + m = int(n / t) + + if '-a' in options: + gate = rfoo.Notifier + else: + gate = rfoo.Proxy + + def client(): + # + # Time connection setup/teardown. + # + if '-c' in options: + for i in range(m): + connection = rfoo.connect(host=host, port=port) + r = rfoo.Proxy(connection).echo(data) + if level == logging.DEBUG: + logging.debug('Received %r from proxy.', r) + connection.close() + + # + # Time with dummy connection (no network). + # + elif '-u' in options: + handler = rfoo.ExampleHandler() + dummy = DummyConnection(handler) + echo = gate(dummy).echo + for i in range(m): + r = echo(data) + if level == logging.DEBUG: + logging.debug('Received %r from proxy.', r) + + # + # Time calls synched / asynch (notifications). + # + else: + connection = rfoo.connect(host=host, port=port) + echo = gate(connection).echo + for i in range(m): + r = echo(data) + #if level == logging.DEBUG: + # logging.debug('Received %r from proxy.', r) + + logging.warning('Received %r from proxy.', r) + + if t == 1: + client() + return + + threads = [threading.Thread(target=client) for i in range(t)] + t0 = time.time() + + for t in threads: + t.start() + + for t in threads: + t.join() + + finally: + logging.warning('Running time, %f seconds.', time.time() - t0) + + + +if __name__ == '__main__': + #import cProfile; + #cProfile.run('main()', '/tmp/profiled'); + main() + + + +