Added topic in Gateway/Relay to allow filtering

This commit is contained in:
AnthorNet 2015-05-05 12:37:57 +02:00
parent 02bf4d8c35
commit c7187e90bd
2 changed files with 26 additions and 6 deletions

View File

@ -45,7 +45,7 @@ def configure():
validator.addSchemaResource(schemaRef, os.path.dirname(__file__) + '/' + schemaFile)
def push_message(string_message):
def push_message(string_message, topic):
"""
Spawned as a greenlet to push messages (strings) through ZeroMQ.
This is a dumb method that just pushes strings; it assumes you've already validated
@ -53,9 +53,9 @@ def push_message(string_message):
"""
# Push a zlib compressed JSON representation of the message to
# announcers.
# announcers with schema as topic
compressed_msg = zlib.compress(string_message)
sender.send(compressed_msg)
sender.send("%s |-| %s" % (topic, compressed_msg))
statsCollector.tally("outbound")
@ -142,7 +142,7 @@ def parse_and_error_handle(data):
# Sends the parsed MarketOrderList or MarketHistoryList to the Announcers
# as compressed JSON.
gevent.spawn(push_message, simplejson.dumps(parsed_message))
gevent.spawn(push_message, (simplejson.dumps(parsed_message), parsed_message['$schemaRef']))
logger.info("Accepted %s upload from %s" % (
parsed_message, get_remote_address()
))

View File

@ -42,12 +42,23 @@ class Relay(Thread):
context = zmq.Context()
receiver = context.socket(zmq.SUB)
receiver.setsockopt(zmq.SUBSCRIBE, '')
# Filters on topics or not...
if Settings.RELAY_RECEIVE_ONLY_GATEWAY_EXTRA_JSON == True:
for schemaRef, schemaFile in Settings.GATEWAY_JSON_SCHEMAS.iteritems():
receiver.setsockopt(zmq.SUBSCRIBE, schemaRef)
for schemaRef, schemaFile in Settings.RELAY_EXTRA_JSON_SCHEMAS.iteritems():
receiver.setsockopt(zmq.SUBSCRIBE, schemaRef)
else:
receiver.setsockopt(zmq.SUBSCRIBE, '')
for binding in Settings.RELAY_RECEIVER_BINDINGS:
# Relays bind upstream to an Announcer, or another Relay.
receiver.connect(binding)
sender = context.socket(zmq.PUB)
for binding in Settings.RELAY_SENDER_BINDINGS:
# End users, or other relays, may attach here.
sender.bind(binding)
@ -58,8 +69,17 @@ class Relay(Thread):
to any subscribers.
:param str message: A JSON string to re-broadcast.
"""
# Separate topic from message
message = message.split(' |-| ')
# Handle gateway not sending topic
if len(message) > 1:
message = message[1]
else:
message = message[0]
# if is_message_duped(message):
# We've already seen this message recently. Discard it.
# We've already seen this message recently. Discard it.
# return
if Settings.RELAY_DECOMPRESS_MESSAGES: