diff --git a/src/eddn/Gateway.py b/src/eddn/Gateway.py index dcb4eac..0e1b1db 100644 --- a/src/eddn/Gateway.py +++ b/src/eddn/Gateway.py @@ -45,7 +45,7 @@ def configure(): validator.addSchemaResource(schemaRef, os.path.dirname(__file__) + '/' + schemaFile) -def push_message(string_message): +def push_message(string_message, topic): """ Spawned as a greenlet to push messages (strings) through ZeroMQ. This is a dumb method that just pushes strings; it assumes you've already validated @@ -53,9 +53,9 @@ def push_message(string_message): """ # Push a zlib compressed JSON representation of the message to - # announcers. + # announcers with schema as topic compressed_msg = zlib.compress(string_message) - sender.send(compressed_msg) + sender.send("%s |-| %s" % (topic, compressed_msg)) statsCollector.tally("outbound") @@ -142,7 +142,7 @@ def parse_and_error_handle(data): # Sends the parsed MarketOrderList or MarketHistoryList to the Announcers # as compressed JSON. - gevent.spawn(push_message, simplejson.dumps(parsed_message)) + gevent.spawn(push_message, (simplejson.dumps(parsed_message), parsed_message['$schemaRef'])) logger.info("Accepted %s upload from %s" % ( parsed_message, get_remote_address() )) diff --git a/src/eddn/Relay.py b/src/eddn/Relay.py index 042ecde..42a0285 100644 --- a/src/eddn/Relay.py +++ b/src/eddn/Relay.py @@ -42,12 +42,23 @@ class Relay(Thread): context = zmq.Context() receiver = context.socket(zmq.SUB) - receiver.setsockopt(zmq.SUBSCRIBE, '') + + # Filters on topics or not... + if Settings.RELAY_RECEIVE_ONLY_GATEWAY_EXTRA_JSON == True: + for schemaRef, schemaFile in Settings.GATEWAY_JSON_SCHEMAS.iteritems(): + receiver.setsockopt(zmq.SUBSCRIBE, schemaRef) + for schemaRef, schemaFile in Settings.RELAY_EXTRA_JSON_SCHEMAS.iteritems(): + receiver.setsockopt(zmq.SUBSCRIBE, schemaRef) + else: + receiver.setsockopt(zmq.SUBSCRIBE, '') + + for binding in Settings.RELAY_RECEIVER_BINDINGS: # Relays bind upstream to an Announcer, or another Relay. receiver.connect(binding) sender = context.socket(zmq.PUB) + for binding in Settings.RELAY_SENDER_BINDINGS: # End users, or other relays, may attach here. sender.bind(binding) @@ -58,8 +69,17 @@ class Relay(Thread): to any subscribers. :param str message: A JSON string to re-broadcast. """ + # Separate topic from message + message = message.split(' |-| ') + + # Handle gateway not sending topic + if len(message) > 1: + message = message[1] + else: + message = message[0] + # if is_message_duped(message): - # We've already seen this message recently. Discard it. + # We've already seen this message recently. Discard it. # return if Settings.RELAY_DECOMPRESS_MESSAGES: