Skip to content
Snippets Groups Projects
Commit 9ee5285d authored by George Nachman's avatar George Nachman
Browse files

Do some refactoring to the python API.

You now do iterm2.run(f) to run a function `f` within the iterm2 context.

Add session.read_keystroke()

Refactor various classes into their own files
parent 9937fd32
No related branches found
No related tags found
No related merge requests found
Showing with 243 additions and 112 deletions
Loading
Loading
@@ -3,3 +3,9 @@ from .notifications import NewSessionSubscription, TerminateSessionSubscription,
from .session import Session
from .tab import Tab
from .window import Window
import sharedstate
def run(function):
function()
sharedstate.get_socket().finish()
Loading
Loading
@@ -131,7 +131,6 @@ class AsyncWebsocketApp(websocket.WebSocketApp):
# propagate SystemExit further
raise
finally:
logging.debug("Everything has gone to shit")
if thread and thread.isAlive():
event.set()
thread.join()
Loading
Loading
import future
import logging
class DependentFuture(future.Future):
"""If you have a future A and you want to create future B, but B can't be
created yet because the information needed to make it doesn't exist yet, use
this. This provides a future C that creates B when A is realized. Its get()
blocks until A and B are both realized."""
def __init__(self, parent, create_inner):
"""Initializer.
parent: The future this object depends on (future A)
create_inner: A function that takes parent's response as its argument and
returns a new future (future B)
"""
future.Future.__init__(self)
self.parent = parent
self.innerFuture = None
self.create_inner = create_inner
parent.watch(self._parent_did_realize)
def get(self):
"""Waits until both the parent and the subordinate futures (A and B) are
realized. Return's B's value."""
logging.debug("Dependent future %s getting parent future %s" % (str(self), str(self.parent)))
parent = self.parent.get()
logging.debug("Dependent future %s got parent from future %s, produced inner future %s" % (str(self), str(self.parent), str(self.innerFuture)))
return self.innerFuture.get()
def _parent_did_realize(self, response):
logging.debug("PARENT REALIZED FOR %s" % str(self.parent))
self.innerFuture = self.create_inner(response)
for watch in self.watches:
self.innerFuture.watch(watch)
self.watches = None
Loading
Loading
@@ -4,6 +4,12 @@ import threading
import time
 
class AbstractDispatchQueue(object):
"""Facilitates running a function on another thread.
Clients invoke dispatch_async() to run a function on the thread that pulls from this queue.
Owning threads invoke run_jobs() periodically.
"""
def __init__(self):
self.queue = []
self.cond = threading.Condition()
Loading
Loading
@@ -48,6 +54,10 @@ class AbstractDispatchQueue(object):
return job
 
class IODispatchQueue(AbstractDispatchQueue):
"""A dispatch queue owned by a select loop.
The select loop should select on self.read_pipe, which becomes readable when run_jobs has works to do.
"""
def __init__(self):
AbstractDispatchQueue.__init__(self)
self.read_pipe, self.write_pipe = os.pipe()
Loading
Loading
@@ -60,10 +70,21 @@ class IODispatchQueue(AbstractDispatchQueue):
os.write(self.write_pipe, " ")
 
class IdleDispatchQueue(AbstractDispatchQueue):
"""A condition variable-based dispatch queue that adds the ability to wait
for a set period of time and notify the condition variable.
Adds a wait API that blocks until there is work to do.
"""
def notify(self):
self.cond.notify_all()
 
def wait(self, timeout=None):
"""Waits until there is work to do.
timeout: If None, wait indefinitely. Otherwise, don't block for more than this many seconds.
Returns the number of jobs run.
"""
start_time = time.time()
n = 0
if timeout is None:
Loading
Loading
import logging
import synchronouscb
"""0-argument callbacks that get invoked just before blocking."""
_idle_observers = []
def add_idle_observer(observer):
"""Adds an idle observer callback. Will be run just before blocking the main thread."""
_idle_observers.append(observer)
class Future(synchronouscb.SynchronousCallback):
"""Represents a value that will become available later.
As this is a subclass of SynchronousCallback, invoke callback() when the
future is resolved. Clients call get() when they're ready to block for a
value.
Also has some bells and whistles. When you create a future you can define a
transform function that will modify the value.
You can add a "watch" function that gets called asynchronously when the value
becomes available.
"""
def __init__(self, transform=None):
"""Initializes a new future.
transform: If not None, the transform function runs immediately when the
value becomes available. It takes one argument, the original response. It
returns a transformed response, which is returned by get().
"""
synchronouscb.SynchronousCallback.__init__(self)
if transform is None:
self.transform = lambda x: x
else:
self.transform = transform
self.transformed_response = None
self.watches = []
def get(self):
"""Returns the existing transformed response if available. Otherwise, waits
until it is available and then returns it."""
if self.transformed_response is None:
logging.debug("Waiting on future")
self.wait()
logging.debug("REALIZING %s" % str(self))
self.transformed_response = self.transform(self.response)
assert self.transformed_response is not None
self._invoke_watches(self.transformed_response)
return self.transformed_response
def watch(self, callback):
"""Adds a watch callback to the future.
The callback will be invoked when the transformed response becomes available.
"""
if self.watches is not None:
logging.debug("Add watch to %s", str(self))
self.watches.append(callback)
else:
logging.debug("Immediately run callback for watch for %s" % str(self))
callback(self.get())
def wait(self):
"""Blocks until a value is available.
Has the side effect of telling idle observers that we're about to block.
"""
self.idle_spin()
synchronouscb.SynchronousCallback.wait(self)
def idle_spin(self):
"""Call this before blocking while idle in the main thread."""
logging.debug("Running idle observers")
for o in _idle_observers:
o()
def realized(self):
return self.response is not None
def _invoke_watches(self, response):
watches = self.watches
self.watches = None
for watch in watches:
watch(response)
Loading
Loading
@@ -3,9 +3,10 @@
 
from __future__ import print_function
import api_pb2
from sharedstate import get_socket, wait
import future
import notifications
import session
from sharedstate import get_socket, wait
import socket
import tab
import window
Loading
Loading
@@ -26,7 +27,7 @@ class Synchronizer(object):
 
def _layoutDidChange(self, notification):
logging.debug("Layout did change")
self.future = socket.Future()
self.future = future.Future()
self.future.callback(notification.list_sessions_response)
 
def get(self):
Loading
Loading
Loading
Loading
@@ -6,12 +6,14 @@ from __future__ import print_function
import api_pb2
from sharedstate import get_socket, wait, register_notification_handler
import dispatchq
import future
import session
import socket
import tab
import logging
import threading
import time
import traceback
 
_subscriptions = {}
_dispatch_queue = dispatchq.IdleDispatchQueue()
Loading
Loading
@@ -87,9 +89,9 @@ def _extract(notification):
return key, notification
 
def _dispatch_handle_notification(notification):
# Called on the websocket thread
def _run_handlers():
key, sub_notification = _extract(notification)
logging.debug("Got a notification to dispatch. key=" + str(key) +", notification=\n" + str(notification))
if key in _subscriptions:
handlers = _subscriptions[key]
if handlers is not None:
Loading
Loading
@@ -105,6 +107,4 @@ def quick_wait():
n = _dispatch_queue.wait(0)
 
register_notification_handler(_dispatch_handle_notification)
socket.add_idle_observer(quick_wait)
future.add_idle_observer(quick_wait)
Loading
Loading
@@ -2,34 +2,16 @@ from .asyncws import AsyncWebsocketApp
 
import api_pb2
import logging
import synchronouscb
import threading
import websocket
 
class SynchronousCallback(object):
def __init__(self):
self.cond = threading.Condition()
self.response = None
def callback(self, r):
logging.debug("Callback invoked")
self.cond.acquire()
self.response = r
self.cond.notify_all()
self.cond.release()
def wait(self):
logging.debug("Waiting for callback to be invoked")
self.cond.acquire()
while self.response is None:
self.cond.wait()
logging.debug("Callback was invoked")
self.cond.release()
class RPCSocket(AsyncWebsocketApp):
def __init__(self, handler, url, subprotocols):
AsyncWebsocketApp.__init__(self, url, on_message=self._on_rpc_message, subprotocols=subprotocols)
self.waiting = False
self.callbacks = []
self.callbacks_lock = threading.Lock()
self.callbacks_cond = threading.Condition()
self.handler = handler
thread = threading.Thread(target=self.run_async)
thread.setDaemon(True)
Loading
Loading
@@ -37,7 +19,7 @@ class RPCSocket(AsyncWebsocketApp):
thread.start()
 
def sync_send_rpc(self, message):
callback = SynchronousCallback()
callback = synchronouscb.SynchronousCallback()
def f():
logging.debug("Send request")
self.send(message, opcode=websocket.ABNF.OPCODE_BINARY)
Loading
Loading
@@ -58,16 +40,37 @@ class RPCSocket(AsyncWebsocketApp):
self._append_callback(callback)
 
def _append_callback(self, callback):
self.callbacks_lock.acquire()
self.callbacks_cond.acquire()
self.callbacks.append(callback)
self.callbacks_lock.release()
self.callbacks_cond.release()
 
def _on_rpc_message(self, ws, message):
logging.debug("Got an RPC message")
parsed = self.handler(message)
if parsed is not None:
logging.debug("Running the next callback")
self.callbacks_lock.acquire()
self.callbacks_cond.acquire()
callback = self.callbacks[0]
del self.callbacks[0]
self.callbacks_lock.release()
callback(parsed)
self.callbacks_cond.notify_all()
self.callbacks_cond.release()
if not self.waiting:
callback(parsed)
else:
logging.debug("Notifying unparsed message")
self.callbacks_cond.acquire()
self.callbacks_cond.notify_all()
self.callbacks_cond.release()
def finish(self):
"""Blocks until all outstanding RPCs have completed. Does not run callbacks."""
logging.debug("Finish acquiring lock")
self.callbacks_cond.acquire()
logging.debug("Finish invoked with " + str(len(self.callbacks)) + " callbacks left")
self.waiting = True
while len(self.callbacks) > 0:
future.idle_spin()
logging.debug("Finish waiting...")
self.callbacks_cond.wait()
logging.debug("Finish done")
self.callbacks_cond.release()
Loading
Loading
@@ -4,9 +4,12 @@
from __future__ import print_function
 
import api_pb2
import depfuture
import future
from sharedstate import get_socket, wait
import socket
import logging
import notifications
 
class TextSender(object):
def __init__(self, future):
Loading
Loading
@@ -52,7 +55,7 @@ class FutureSession(AbstractSession):
 
def create_inner(response):
return get_socket().request_send_text(self.get_session_id(), text)
sendTextFuture = socket.DependentFuture(self.future, create_inner)
sendTextFuture = depfuture.DependentFuture(self.future, create_inner)
return TextSender(sendTextFuture)
 
def split_pane(self, vertical=False, before=False, profile=None):
Loading
Loading
@@ -62,7 +65,7 @@ class FutureSession(AbstractSession):
def create_inner(response):
return get_socket().request_split_pane(
session=self.get_session_id(), vertical=vertical, before=before, profile=profile)
createSessionFuture = socket.DependentFuture(self.future, create_inner)
createSessionFuture = depfuture.DependentFuture(self.future, create_inner)
return FutureSession(createSessionFuture);
 
def _get_session(self):
Loading
Loading
@@ -96,5 +99,15 @@ class Session(AbstractSession):
return FutureSession(get_socket().request_split_pane(
session=self.session_id, vertical=vertical, before=before, profile=profile))
 
def read_keystroke(self):
"""Blocks until a keystroke is received. Returns a KeystrokeNotification."""
f = future.Future()
def callback(keystroke_notification):
f.callback(keystroke_notification)
subscription = notifications.KeystrokeSubscription(self.session_id, callback)
# Can't do f.get() here because we need to keep dispatching notifications on the main thread
# until we get the notification we care about.
while not f.realized():
notifications.wait()
return f.get()
 
Loading
Loading
@@ -4,78 +4,10 @@
from __future__ import print_function
 
import api_pb2
import future
import rpcsocket
import logging
 
_idle_observers = []
def add_idle_observer(observer):
_idle_observers.append(observer)
class Future(rpcsocket.SynchronousCallback):
def __init__(self, transform=None):
rpcsocket.SynchronousCallback.__init__(self)
if transform is None:
self.transform = lambda x: x
else:
self.transform = transform
self.transformed_response = None
self.watches = []
def get(self):
if self.transformed_response is None:
logging.debug("Waiting on future")
self.wait()
logging.debug("REALIZING %s" % str(self))
self.transformed_response = self.transform(self.response)
assert self.transformed_response is not None
self._invoke_watches(self.transformed_response)
return self.transformed_response
def _invoke_watches(self, response):
watches = self.watches
self.watches = None
for watch in watches:
watch(response)
def watch(self, callback):
if self.watches is not None:
logging.debug("Add watch to %s", str(self))
self.watches.append(callback)
else:
logging.debug("Immediately run callback for watch for %s" % str(self))
callback(self.get())
def wait(self):
for o in _idle_observers:
o()
rpcsocket.SynchronousCallback.wait(self)
class DependentFuture(Future):
"""If you have a future A and you want to create future B, but B can't be
created yet because the information needed to make it doesn't exist yet, use
this. This provides a future C that creates B when A is realized. Its get()
blocks until A and B are both realized."""
def __init__(self, parent, create_inner):
Future.__init__(self)
self.parent = parent
self.innerFuture = None
self.create_inner = create_inner
parent.watch(self._parent_did_realize)
def _parent_did_realize(self, response):
logging.debug("PARENT REALIZED FOR %s" % str(self.parent))
self.innerFuture = self.create_inner(response)
for watch in self.watches:
self.innerFuture.watch(watch)
self.watches = None
def get(self):
logging.debug("Dependent future %s getting parent future %s" % (str(self), str(self.parent)))
parent = self.parent.get()
logging.debug("Dependent future %s got parent from future %s, produced inner future %s" % (str(self), str(self.parent), str(self.innerFuture)))
return self.innerFuture.get()
class Connection(object):
def __init__(self):
self.last_future = None
Loading
Loading
@@ -85,6 +17,9 @@ class Connection(object):
self.last_future.get()
self.last_future = None
 
def finish(self):
self.ws.finish()
def connect(self, notification_handler):
self.notification_handler = notification_handler;
self.ws = rpcsocket.RPCSocket(
Loading
Loading
@@ -166,10 +101,10 @@ class Connection(object):
return self.ws.sync_send_rpc(request.SerializeToString())
 
def _send_async(self, request, transform):
future = Future(transform)
self.ws.async_send_rpc(request.SerializeToString(), future.callback)
self.last_future = future
return future
f = future.Future(transform)
self.ws.async_send_rpc(request.SerializeToString(), f.callback)
self.last_future = f
return f
 
def _handler(self, message):
response = api_pb2.Response()
Loading
Loading
import logging
import threading
class SynchronousCallback(object):
"""A wrapper around a condition variable.
Contains one bit of state, self.response.
"""
def __init__(self):
self.cond = threading.Condition()
self.response = None
def callback(self, r):
"""Like notfiying a condition variable, but also sets the response to r."""
logging.debug("Callback invoked")
self.cond.acquire()
self.response = r
self.cond.notify_all()
self.cond.release()
def wait(self):
"""Blocks until there is a response."""
logging.debug("Waiting for callback to be invoked")
self.cond.acquire()
while self.response is None:
self.cond.wait()
logging.debug("Callback was invoked")
self.cond.release()
Loading
Loading
@@ -3,8 +3,9 @@
 
from __future__ import print_function
 
from sharedstate import get_socket, wait
import depfuture
import session
from sharedstate import get_socket, wait
import socket
import api_pb2
import tab
Loading
Loading
@@ -53,7 +54,7 @@ class FutureWindow(AbstractWindow):
def create_inner(response):
return get_socket().request_create_tab(
profile=profile, window=self.get_window_id(), index=index, command=command)
createTabFuture = socket.DependentFuture(self.future, create_inner)
createTabFuture = depfuture.DependentFuture(self.future, create_inner)
return tab.FutureTab(createTabFuture);
 
def get_status(self):
Loading
Loading
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment