Skip to content

Commit bc5c5cf

Browse files
committed
add object proxy subscribe event test
1 parent 22eeed8 commit bc5c5cf

File tree

8 files changed

+132
-78
lines changed

8 files changed

+132
-78
lines changed

hololinked/client/factory.py

Lines changed: 41 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -3,8 +3,8 @@
33
from tornado.httpclient import HTTPClient, HTTPRequest, HTTPResponse
44
from tornado.simple_httpclient import HTTPTimeoutError
55

6-
from ..core.zmq import SyncZMQClient, AsyncZMQClient
76
from ..core import Thing, Action
7+
from ..core.zmq import SyncZMQClient, AsyncZMQClient, EventConsumer, AsyncEventConsumer
88
from ..td.interaction_affordance import PropertyAffordance, ActionAffordance, EventAffordance
99
from ..serializers import Serializers
1010
from .abstractions import ConsumedThingAction, ConsumedThingProperty, ConsumedThingEvent
@@ -55,17 +55,28 @@ def zmq(self, server_id: str, thing_id: str, protocol: str, **kwargs):
5555
execution_timeout=object_proxy.execution_timeout,
5656
)
5757
self.add_property(object_proxy, consumed_property)
58-
if hasattr(affordance, "observable") and affordance.observable:
59-
pass
60-
# consumed_property_event = ZMQEvent(
58+
# if hasattr(affordance, "observable") and affordance.observable:
59+
# sync_event_client = EventConsumer(
60+
# id=f"{TD['id']}|{affordance.name}|sync",
61+
# event_unique_identifier=affordance.zmq_unique_identifier,
62+
# socket_address=affordance.zmq_socket_address,
63+
# logger=object_proxy.logger
64+
# )
65+
# async_event_client = AsyncEventConsumer(
66+
# id=f"{TD['id']}|{affordance.name}|async",
67+
# event_unique_identifier=affordance.zmq_unique_identifier,
68+
# socket_address=affordance.zmq_socket_address,
69+
# logger=object_proxy.logger
70+
# )
71+
# consumed_observable = ZMQEvent(
6172
# resource=affordance,
62-
# sync_zmq_client=sync_zmq_client,
63-
# async_zmq_client=async_zmq_client,
73+
# sync_zmq_client=sync_event_client,
74+
# async_zmq_client=async_event_client,
6475
# owner_inst=object_proxy,
6576
# invokation_timeout=object_proxy.invokation_timeout,
6677
# execution_timeout=object_proxy.execution_timeout,
67-
# )
68-
# self.add_event(object_proxy, consumed_property_event)
78+
# )
79+
# self.add_event(object_proxy, consumed_observable)
6980
for action in TD.get("actions", []):
7081
affordance = ActionAffordance.from_TD(action, TD)
7182
consumed_action = ZMQAction(
@@ -79,13 +90,27 @@ def zmq(self, server_id: str, thing_id: str, protocol: str, **kwargs):
7990
self.add_action(object_proxy, consumed_action)
8091
for event in TD.get("events", []):
8192
affordance = EventAffordance.from_TD(event, TD)
93+
zmq_socket_address = affordance.zmq_socket_address
94+
# if protocol.lower() == "ipc":
95+
# zmq_socket_address = affordance.zmq_socket_address.replace("inproc://", "ipc://")
96+
# zmq_socket_address = f'{zmq_socket_address}.ipc'
97+
sync_event_client = EventConsumer(
98+
id=f"{TD['id']}|{affordance.name}|sync",
99+
event_unique_identifier=affordance.zmq_unique_identifier,
100+
socket_address=zmq_socket_address,
101+
logger=object_proxy.logger
102+
)
103+
async_event_client = AsyncEventConsumer(
104+
id=f"{TD['id']}|{affordance.name}|async",
105+
event_unique_identifier=affordance.zmq_unique_identifier,
106+
socket_address=zmq_socket_address,
107+
logger=object_proxy.logger
108+
)
82109
consumed_event = ZMQEvent(
83110
resource=affordance,
84-
sync_zmq_client=sync_zmq_client,
85-
async_zmq_client=async_zmq_client,
86111
owner_inst=object_proxy,
87-
invokation_timeout=object_proxy.invokation_timeout,
88-
execution_timeout=object_proxy.execution_timeout,
112+
sync_zmq_client=sync_event_client,
113+
async_zmq_client=async_event_client,
89114
)
90115
self.add_event(object_proxy, consumed_event)
91116
for opname, ophandler in zip(['_get_properties', '_set_properties'], [ReadMultipleProperties, WriteMultipleProperties]):
@@ -173,7 +198,7 @@ def add_property(self, client, property: ConsumedThingProperty) -> None:
173198

174199
@classmethod
175200
def add_event(cls, client, event: ConsumedThingEvent) -> None:
176-
setattr(client, event._resource.name, event)
177-
178-
179-
201+
if hasattr(event._resource, "observable") and event._resource.observable:
202+
setattr(client, f"{event._resource.name}_change_event", event)
203+
else:
204+
setattr(client, event._resource.name, event)

hololinked/config.py

Lines changed: 8 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -91,7 +91,7 @@ class Configuration:
9191
# Schema
9292
'VALIDATE_SCHEMA_ON_CLIENT', 'VALIDATE_SCHEMAS',
9393
# ZMQ
94-
"ZMQ_ASYNC_CONTEXT", "ZMQ_SYNC_CONTEXT",
94+
"ZMQ_CONTEXT",
9595
# make debugging easier
9696
"DEBUG",
9797
# serializers
@@ -115,8 +115,7 @@ def load_variables(self, use_environment : bool = False):
115115
self.TRACE_MALLOC = False
116116
self.VALIDATE_SCHEMA_ON_CLIENT = False
117117
self.VALIDATE_SCHEMAS = True
118-
self.ZMQ_ASYNC_CONTEXT = zmq.asyncio.Context()
119-
self.ZMQ_SYNC_CONTEXT = zmq.Context()
118+
self.ZMQ_CONTEXT = zmq.asyncio.Context()
120119
self.DEBUG = False
121120
self.ALLOW_PICKLE = False
122121
self.ALLOW_UNKNOWN_SERIALIZATION = False
@@ -152,15 +151,13 @@ def asdict(self):
152151
"returns this config as a regular dictionary"
153152
return {item: getattr(self, item) for item in self.__slots__}
154153

155-
def zmq_context(self, asynch: bool = True) -> zmq.Context | zmq.asyncio.Context:
154+
def zmq_context(self) -> zmq.asyncio.Context:
156155
"""
157-
Returns a global ZMQ context based on the asynch flag.
158-
Intended to share the same context across an application.
159-
"""
160-
if asynch:
161-
return self.ZMQ_ASYNC_CONTEXT
162-
return self.ZMQ_SYNC_CONTEXT
163-
156+
Returns a global ZMQ async context. Use socket_class argument to retrieve
157+
a synchronous socket if necessary.
158+
"""
159+
return self.ZMQ_CONTEXT
160+
164161
def set_default_server_execution_context(self,
165162
invokation_timeout: typing.Optional[int] = None,
166163
execution_timeout: typing.Optional[int] = None,

hololinked/core/zmq/brokers.py

Lines changed: 31 additions & 25 deletions
Original file line numberDiff line numberDiff line change
@@ -52,9 +52,14 @@ def __del__(self) -> None:
5252

5353

5454
@classmethod
55-
def get_socket(cls, *, id: str, node_type: str, context: zmq.asyncio.Context | zmq.Context,
56-
transport: ZMQ_TRANSPORTS = ZMQ_TRANSPORTS.IPC, socket_type: zmq.SocketType = zmq.ROUTER,
57-
**kwargs) -> typing.Tuple[zmq.Socket, str]:
55+
def get_socket(cls, *,
56+
id: str,
57+
node_type: str,
58+
context: zmq.asyncio.Context | zmq.Context,
59+
transport: ZMQ_TRANSPORTS = ZMQ_TRANSPORTS.IPC,
60+
socket_type: zmq.SocketType = zmq.ROUTER,
61+
**kwargs
62+
) -> typing.Tuple[zmq.Socket, str]:
5863
"""
5964
Create a socket with certain specifications. Supported ZeroMQ transports are TCP, IPC & INPROC.
6065
For IPC sockets, a file is created under TEMP_DIR of global configuration.
@@ -164,7 +169,7 @@ def create_socket(self, *, id: str, node_type: str = 'server', context: zmq.asyn
164169
"""
165170
if context and not isinstance(context, zmq.asyncio.Context):
166171
raise TypeError("async ZMQ message broker accepts only async ZMQ context. supplied type {}".format(type(context)))
167-
self.context = context or global_config.zmq_context(asynch=True)
172+
self.context = context or global_config.zmq_context()
168173
self.socket, self.socket_address = BaseZMQ.get_socket(id=id, node_type=node_type, context=self.context,
169174
transport=transport, socket_type=socket_type, **kwargs)
170175
self.logger.info("created socket {} with address {} & identity {} and {}".format(get_socket_type_name(socket_type),
@@ -183,17 +188,9 @@ def create_socket(self, *, id: str, node_type: str = 'server', context: zmq.Cont
183188
Overloads ``create_socket()`` to create, bind/connect a synchronous socket. A synchronous context is created
184189
if none is supplied.
185190
"""
186-
socket_class = None
187-
if context:
188-
if not isinstance(context, zmq.Context):
189-
raise TypeError("sync ZMQ message broker accepts only sync ZMQ context. supplied type {}".format(type(context)))
190-
if isinstance(context, zmq.asyncio.Context):
191-
# create sync socket when async context is supplied for sync brokers.
192-
# especially useful for INPROC sync client where teh context needs to be shared and the server is async.
193-
socket_class = zmq.Socket
194-
self.context = context or global_config.zmq_context(asynch=False)
191+
self.context = context or global_config.zmq_context()
195192
self.socket, self.socket_address = BaseZMQ.get_socket(id=id, node_type=node_type, context=self.context,
196-
transport=transport, socket_type=socket_type, socket_class=socket_class,
193+
transport=transport, socket_type=socket_type, socket_class=zmq.Socket,
197194
**kwargs)
198195
self.logger.info("created socket {} with address {} & identity {} and {}".format(get_socket_type_name(socket_type),
199196
self.socket_address, id, "bound" if node_type == 'server' else "connected"))
@@ -593,7 +590,7 @@ class ZMQServerPool(BaseZMQServer):
593590
"""
594591

595592
def __init__(self, *, ids: typing.List[str] | None = None, **kwargs) -> None:
596-
self.context = global_config.zmq_context(asynch=True)
593+
self.context = global_config.zmq_context()
597594
self.poller = zmq.asyncio.Poller()
598595
self.pool = dict() # type: typing.Dict[str, AsyncZMQServer]
599596
if ids:
@@ -791,8 +788,13 @@ def exit(self) -> None:
791788
self.poller.unregister(self._monitor_socket)
792789
# print("poller exception did not occur 3")
793790
except Exception as ex:
794-
self.logger.warning(f"unable to deregister from poller - {str(ex)}")
795-
791+
# raises a weird key error for some reason
792+
# unable to deregister from poller - <zmq.asyncio.Socket(zmq.PAIR) at 0x1c9e5028830> - KeyError
793+
# unable to deregister from poller - <zmq.asyncio.Socket(zmq.PAIR) at 0x1c9e502a350> - KeyError
794+
# unable to deregister from poller - <zmq.asyncio.Socket(zmq.PAIR) at 0x1c9e5080750> - KeyError
795+
# unable to deregister from poller - <zmq.asyncio.Socket(zmq.PAIR) at 0x1c9e5082430> - KeyError
796+
# self.logger.warning(f"unable to deregister from poller - {str(ex)} - {type(ex).__name__}")
797+
pass
796798
try:
797799
if self._monitor_socket is not None:
798800
self._monitor_socket.close(0)
@@ -1341,7 +1343,7 @@ def __init__(self,
13411343
if len(client_ids) != len(server_ids):
13421344
raise ValueError("client_ids and server_ids must have same length")
13431345
# this class does not call create_socket method
1344-
self.context = context or global_config.zmq_context(asynch=True)
1346+
self.context = context or global_config.zmq_context()
13451347
self.pool = dict() # type: typing.Dict[str, AsyncZMQClient]
13461348
self.poller = zmq.asyncio.Poller()
13471349
for client_id, server_id in zip(client_ids, server_ids):
@@ -1941,11 +1943,13 @@ def __init__(self,
19411943
**kwargs
19421944
) -> None:
19431945
if isinstance(self, BaseSyncZMQ):
1944-
self.context = context or global_config.zmq_context(asynch=False)
1946+
self.context = context or global_config.zmq_context()
19451947
self.poller = zmq.Poller()
1948+
socket_class = zmq.Socket
19461949
elif isinstance(self, BaseAsyncZMQ):
1947-
self.context = context or global_config.zmq_context(asynch=True)
1950+
self.context = context or global_config.zmq_context()
19481951
self.poller = zmq.asyncio.Poller()
1952+
socket_class = zmq.asyncio.Socket
19491953
else:
19501954
raise TypeError("BaseEventConsumer must be subclassed by either BaseSyncZMQ or BaseAsyncZMQ")
19511955
super().__init__(id=id, server_id=kwargs.get('server_id', None), **kwargs)
@@ -1960,13 +1964,13 @@ def __init__(self,
19601964
)
19611965
self.event_unique_identifier = bytes(event_unique_identifier, encoding='utf-8')
19621966
short_uuid = uuid4().hex[:8]
1963-
self.interruptor = self.context.socket(zmq.PAIR)
1967+
self.interruptor = self.context.socket(zmq.PAIR, socket_class=socket_class)
19641968
self.interruptor.setsockopt_string(zmq.IDENTITY, f'interrupting-server-{short_uuid}')
1965-
self.interrupting_peer = self.context.socket(zmq.PAIR)
1969+
self.interrupting_peer = self.context.socket(zmq.PAIR, socket_class=socket_class)
19661970
self.interrupting_peer.setsockopt_string(zmq.IDENTITY, f'interrupting-client-{short_uuid}')
19671971
self.interruptor.bind(f'inproc://{self.id}-{short_uuid}/interruption')
19681972
self.interrupting_peer.connect(f'inproc://{self.id}-{short_uuid}/interruption')
1969-
1973+
19701974

19711975
def subscribe(self) -> None:
19721976
self.socket.setsockopt(zmq.SUBSCRIBE, self.event_unique_identifier)
@@ -1993,8 +1997,10 @@ def exit(self):
19931997
self.poller.unregister(self.socket)
19941998
self.poller.unregister(self.interruptor)
19951999
except Exception as E:
1996-
self.logger.warning("could not properly terminate socket or attempted to terminate an already terminated socket of event consuming socket at address '{}'. Exception message: {}".format(
1997-
self.socket_address, str(E)))
2000+
# self.logger.warning("could not properly terminate socket or attempted to terminate an already terminated socket of event consuming socket at address '{}'. Exception message: {}".format(
2001+
# self.socket_address, str(E)))
2002+
# above line prints too many warnings
2003+
pass
19982004
try:
19992005
self.socket.close(0)
20002006
self.interruptor.close(0)

hololinked/core/zmq/rpc_server.py

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -113,7 +113,7 @@ def __init__(self, *,
113113
kwargs['logger'] = self.logger
114114
# contexts and poller
115115
self._run = False # flag to stop all the
116-
self.context = context or global_config.zmq_context(asynch=True)
116+
self.context = context or global_config.zmq_context()
117117

118118
self.req_rep_server = AsyncZMQServer(
119119
id=self.id,
@@ -809,7 +809,7 @@ def prepare_rpc_server(
809809
# dont specify http server as a kwarg, as the other method run_with_http_server has to be used
810810
if context is not None and not isinstance(context, zmq.asyncio.Context):
811811
raise TypeError("context must be an instance of zmq.asyncio.Context")
812-
context = context or global_config.zmq_context(asynch=True)
812+
context = context or global_config.zmq_context()
813813

814814
if transports == 'INPROC' or transports == ZMQ_TRANSPORTS.INPROC:
815815
RPCServer(

hololinked/server/http/handlers.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -516,7 +516,7 @@ async def handle_datastream(self) -> None:
516516
id=f"{self.resource.name}|HTTPEvent|{uuid.uuid4().hex[:8]}",
517517
event_unique_identifier=self.resource.zmq_unique_identifier,
518518
socket_address=self.resource.zmq_socket_address,
519-
context=global_config.zmq_context(asynch=True),
519+
context=global_config.zmq_context(),
520520
logger=self.logger,
521521
)
522522
event_consumer.subscribe()

hololinked/td/interaction_affordance.py

Lines changed: 8 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,6 @@
1-
from enum import Enum
21
import typing
2+
import socket
3+
from enum import Enum
34
from typing import ClassVar, Optional
45
from pydantic import ConfigDict
56

@@ -373,6 +374,11 @@ def build_non_compliant_metadata(self):
373374
bound_event_dispatcher = getattr(self.owner, self.objekt.name, None) # type: EventDispatcher
374375
if not isinstance(bound_event_dispatcher, EventDispatcher) or not bound_event_dispatcher.publisher:
375376
return
376-
self.zmq_socket_address = bound_event_dispatcher.publisher.socket_address
377+
socket_address = bound_event_dispatcher.publisher.socket_address
378+
if socket_address.startswith("tcp://"):
379+
socket_address = socket_address.replace("tcp://*", f"tcp://{socket.gethostname()}")
380+
socket_address = socket_address.replace("tcp://localhost", f"tcp://{socket.gethostname()}")
381+
socket_address = socket_address.replace("tcp://0.0.0.0", f"tcp://{socket.gethostname()}")
382+
self.zmq_socket_address = socket_address
377383
self.zmq_unique_identifier = bound_event_dispatcher._unique_identifier
378384

tests/test_09_rpc_broker.py

Lines changed: 3 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -2,7 +2,6 @@
22
import threading
33
import typing
44
import unittest
5-
import zmq.asyncio
65
import jsonschema
76
import logging
87
import random
@@ -18,6 +17,7 @@
1817
from hololinked.server.zmq import ZMQServer
1918
from hololinked.td.utils import get_zmq_unique_identifier_from_event_affordance
2019
from hololinked.utils import get_all_sub_things_recusively, get_current_async_loop
20+
from hololinked.config import global_config
2121
from hololinked.td import ActionAffordance, PropertyAffordance, EventAffordance
2222
from hololinked.client.zmq.consumed_interactions import ZMQAction, ZMQProperty, ZMQEvent
2323

@@ -189,7 +189,7 @@ def startServer(self):
189189

190190
@classmethod
191191
def setUpClass(self):
192-
self.context = zmq.asyncio.Context()
192+
self.context = global_config.zmq_context()
193193
super().setUpClass()
194194
print(f"test ZMQ RPC Server {self.__name__}")
195195

@@ -1022,15 +1022,13 @@ def startServer(self):
10221022
server_id=self.thing_id,
10231023
logger=self.logger,
10241024
handshake=False,
1025-
context=self.thing.rpc_server.context,
10261025
transport='INPROC'
10271026
)
10281027
self.async_client = AsyncZMQClient(
10291028
id=self.client_id+'async',
10301029
server_id=self.thing_id,
10311030
logger=self.logger,
10321031
handshake=False,
1033-
context=self.thing.rpc_server.context,
10341032
transport='INPROC'
10351033
)
10361034
time.sleep(2)
@@ -1055,7 +1053,7 @@ def load_tests(loader, tests, pattern):
10551053
suite.addTest(unittest.TestLoader().loadTestsFromTestCase(TestRPCServer))
10561054
suite.addTest(unittest.TestLoader().loadTestsFromTestCase(TestExposedActions))
10571055
suite.addTest(unittest.TestLoader().loadTestsFromTestCase(TestExposedProperties))
1058-
# suite.addTest(unittest.TestLoader().loadTestsFromTestCase(TestExposedEvents))
1056+
suite.addTest(unittest.TestLoader().loadTestsFromTestCase(TestExposedEvents))
10591057
suite.addTest(unittest.TestLoader().loadTestsFromTestCase(TestThingRunRPCServer))
10601058
return suite
10611059

0 commit comments

Comments
 (0)