Skip to content
Snippets Groups Projects
Commit 0703ba7b authored by George Nachman's avatar George Nachman
Browse files

Move private python module code to the private subdirectory

parent 9ee5285d
No related branches found
No related tags found
No related merge requests found
#!/usr/bin/python
# This is python 2.7 on macOS 10.12.
from __future__ import print_function
import argparse
import dispatchq
import logging
import os
import select
import six
import sys
import thread
import threading
import time
import traceback
import websocket
class AsyncWebsocketApp(websocket.WebSocketApp):
def __init__(self, url, on_message=None, on_error=None, on_close=None, subprotocols=None):
websocket.WebSocketApp.__init__(self, url, on_message=on_message, on_error=on_error,
on_close=on_close, subprotocols=subprotocols)
self.dispatch_queue = dispatchq.IODispatchQueue()
def run_async(self, sockopt=None, sslopt=None,
ping_interval=0, ping_timeout=None,
http_proxy_host=None, http_proxy_port=None,
http_no_proxy=None, http_proxy_auth=None,
skip_utf8_validation=False,
host=None, origin=None):
"""
run event loop for WebSocket framework.
This loop is infinite loop and is alive during websocket is available.
sockopt: values for socket.setsockopt.
sockopt must be tuple
and each element is argument of sock.setsockopt.
sslopt: ssl socket optional dict.
ping_interval: automatically send "ping" command
every specified period(second)
if set to 0, not send automatically.
ping_timeout: timeout(second) if the pong message is not received.
http_proxy_host: http proxy host name.
http_proxy_port: http proxy port. If not set, set to 80.
http_no_proxy: host names, which doesn't use proxy.
skip_utf8_validation: skip utf8 validation.
host: update host header.
origin: update origin header.
"""
if not ping_timeout or ping_timeout <= 0:
ping_timeout = None
if ping_timeout and ping_interval and ping_interval <= ping_timeout:
raise WebSocketException("Ensure ping_interval > ping_timeout")
if sockopt is None:
sockopt = []
if sslopt is None:
sslopt = {}
if self.sock:
raise WebSocketException("socket is already opened")
thread = None
close_frame = None
try:
logging.debug("Starting")
self.sock = websocket.WebSocket(
self.get_mask_key, sockopt=sockopt, sslopt=sslopt,
fire_cont_frame=self.on_cont_message and True or False,
skip_utf8_validation=skip_utf8_validation)
logging.debug("Created socket")
self.sock.settimeout(websocket.getdefaulttimeout())
logging.debug("Connecting")
self.sock.connect(
self.url, header=self.header, cookie=self.cookie,
http_proxy_host=http_proxy_host,
http_proxy_port=http_proxy_port, http_no_proxy=http_no_proxy,
http_proxy_auth=http_proxy_auth, subprotocols=self.subprotocols,
host=host, origin=origin)
logging.debug("Calling on open")
self._callback(self.on_open)
if ping_interval:
event = threading.Event()
thread = threading.Thread(
target=self._send_ping, args=(ping_interval, event))
thread.setDaemon(True)
thread.start()
logging.debug("Entering mainloop")
while self.sock.connected:
logging.debug("Background websocket client calling select")
r, w, e = select.select(
(self.sock.sock, self.dispatch_queue.read_pipe), (), (), ping_timeout)
if not self.keep_running:
break
if r and self.dispatch_queue.read_pipe in r:
logging.debug("Background websocket client running queued jobs")
n = self.dispatch_queue.run_jobs()
if r and self.sock.sock in r:
op_code, frame = self.sock.recv_data_frame(True)
if op_code == websocket.ABNF.OPCODE_CLOSE:
close_frame = frame
break
elif op_code == websocket.ABNF.OPCODE_PING:
self._callback(self.on_ping, frame.data)
elif op_code == websocket.ABNF.OPCODE_PONG:
self.last_pong_tm = time.time()
self._callback(self.on_pong, frame.data)
elif op_code == websocket.ABNF.OPCODE_CONT and self.on_cont_message:
self._callback(self.on_data, data,
frame.opcode, frame.fin)
self._callback(self.on_cont_message,
frame.data, frame.fin)
else:
data = frame.data
if six.PY3 and op_code == websocket.ABNF.OPCODE_TEXT:
data = data.decode("utf-8")
self._callback(self.on_data, data, frame.opcode, True)
self._callback(self.on_message, data)
if ping_timeout and self.last_ping_tm \
and time.time() - self.last_ping_tm > ping_timeout \
and self.last_ping_tm - self.last_pong_tm > ping_timeout:
raise WebSocketTimeoutException("ping/pong timed out")
logging.debug("While loop exited")
except (Exception, KeyboardInterrupt, SystemExit) as e:
traceback.print_exc()
self._callback(self.on_error, e)
if isinstance(e, SystemExit):
# propagate SystemExit further
raise
finally:
if thread and thread.isAlive():
event.set()
thread.join()
self.keep_running = False
if self.sock is not None:
self.sock.close()
close_args = self._get_close_args(
close_frame.data if close_frame else None)
self._callback(self.on_close, *close_args)
self.sock = None
import future
import logging
class DependentFuture(future.Future):
"""If you have a future A and you want to create future B, but B can't be
created yet because the information needed to make it doesn't exist yet, use
this. This provides a future C that creates B when A is realized. Its get()
blocks until A and B are both realized."""
def __init__(self, parent, create_inner):
"""Initializer.
parent: The future this object depends on (future A)
create_inner: A function that takes parent's response as its argument and
returns a new future (future B)
"""
future.Future.__init__(self)
self.parent = parent
self.innerFuture = None
self.create_inner = create_inner
parent.watch(self._parent_did_realize)
def get(self):
"""Waits until both the parent and the subordinate futures (A and B) are
realized. Return's B's value."""
logging.debug("Dependent future %s getting parent future %s" % (str(self), str(self.parent)))
parent = self.parent.get()
logging.debug("Dependent future %s got parent from future %s, produced inner future %s" % (str(self), str(self.parent), str(self.innerFuture)))
return self.innerFuture.get()
def _parent_did_realize(self, response):
logging.debug("PARENT REALIZED FOR %s" % str(self.parent))
self.innerFuture = self.create_inner(response)
for watch in self.watches:
self.innerFuture.watch(watch)
self.watches = None
import logging
import os
import threading
import time
class AbstractDispatchQueue(object):
"""Facilitates running a function on another thread.
Clients invoke dispatch_async() to run a function on the thread that pulls from this queue.
Owning threads invoke run_jobs() periodically.
"""
def __init__(self):
self.queue = []
self.cond = threading.Condition()
def dispatch_async(self, f):
self.cond.acquire()
self.queue.append(f)
self.notify()
self.cond.release()
def run_jobs(self):
n = 0
job = self._dequeue()
while job is not None:
job()
job = self._dequeue()
n += 1
return n
def _run_jobs_locked(self):
n = 0
job = self._dequeue_locked()
while job is not None:
self.cond.release()
job()
n += 1
self.cond.acquire()
job = self._dequeue_locked()
return n
def _dequeue(self):
self.cond.acquire()
job = self._dequeue_locked()
self.cond.release()
return job
def _dequeue_locked(self):
job = None
if len(self.queue) > 0:
job = self.queue[0]
del self.queue[0]
return job
class IODispatchQueue(AbstractDispatchQueue):
"""A dispatch queue owned by a select loop.
The select loop should select on self.read_pipe, which becomes readable when run_jobs has works to do.
"""
def __init__(self):
AbstractDispatchQueue.__init__(self)
self.read_pipe, self.write_pipe = os.pipe()
def run_jobs(self):
n = AbstractDispatchQueue.run_jobs(self)
os.read(self.read_pipe, n)
def notify(self):
os.write(self.write_pipe, " ")
class IdleDispatchQueue(AbstractDispatchQueue):
"""A condition variable-based dispatch queue that adds the ability to wait
for a set period of time and notify the condition variable.
Adds a wait API that blocks until there is work to do.
"""
def notify(self):
self.cond.notify_all()
def wait(self, timeout=None):
"""Waits until there is work to do.
timeout: If None, wait indefinitely. Otherwise, don't block for more than this many seconds.
Returns the number of jobs run.
"""
start_time = time.time()
n = 0
if timeout is None:
self.cond.acquire()
c = self._run_jobs_locked()
n += c
while c == 0:
self.cond.wait()
c = self._run_jobs_locked()
n += c
self.cond.release()
else:
end_time = start_time + timeout
now = time.time()
self.cond.acquire()
while True:
n = self._run_jobs_locked()
if n == 0 and now < end_time:
self.cond.wait(timeout=end_time - now)
now = time.time()
if n > 0 or now >= end_time:
break;
self.cond.release()
return n
import logging
import synchronouscb
"""0-argument callbacks that get invoked just before blocking."""
_idle_observers = []
def add_idle_observer(observer):
"""Adds an idle observer callback. Will be run just before blocking the main thread."""
_idle_observers.append(observer)
class Future(synchronouscb.SynchronousCallback):
"""Represents a value that will become available later.
As this is a subclass of SynchronousCallback, invoke callback() when the
future is resolved. Clients call get() when they're ready to block for a
value.
Also has some bells and whistles. When you create a future you can define a
transform function that will modify the value.
You can add a "watch" function that gets called asynchronously when the value
becomes available.
"""
def __init__(self, transform=None):
"""Initializes a new future.
transform: If not None, the transform function runs immediately when the
value becomes available. It takes one argument, the original response. It
returns a transformed response, which is returned by get().
"""
synchronouscb.SynchronousCallback.__init__(self)
if transform is None:
self.transform = lambda x: x
else:
self.transform = transform
self.transformed_response = None
self.watches = []
def get(self):
"""Returns the existing transformed response if available. Otherwise, waits
until it is available and then returns it."""
if self.transformed_response is None:
logging.debug("Waiting on future")
self.wait()
logging.debug("REALIZING %s" % str(self))
self.transformed_response = self.transform(self.response)
assert self.transformed_response is not None
self._invoke_watches(self.transformed_response)
return self.transformed_response
def watch(self, callback):
"""Adds a watch callback to the future.
The callback will be invoked when the transformed response becomes available.
"""
if self.watches is not None:
logging.debug("Add watch to %s", str(self))
self.watches.append(callback)
else:
logging.debug("Immediately run callback for watch for %s" % str(self))
callback(self.get())
def wait(self):
"""Blocks until a value is available.
Has the side effect of telling idle observers that we're about to block.
"""
self.idle_spin()
synchronouscb.SynchronousCallback.wait(self)
def idle_spin(self):
"""Call this before blocking while idle in the main thread."""
logging.debug("Running idle observers")
for o in _idle_observers:
o()
def realized(self):
return self.response is not None
def _invoke_watches(self, response):
watches = self.watches
self.watches = None
for watch in watches:
watch(response)
from .asyncws import AsyncWebsocketApp
import api_pb2
import logging
import synchronouscb
import threading
import websocket
class RPCSocket(AsyncWebsocketApp):
def __init__(self, handler, url, subprotocols):
AsyncWebsocketApp.__init__(self, url, on_message=self._on_rpc_message, subprotocols=subprotocols)
self.waiting = False
self.callbacks = []
self.callbacks_cond = threading.Condition()
self.handler = handler
thread = threading.Thread(target=self.run_async)
thread.setDaemon(True)
logging.debug("Kick off background websocket client")
thread.start()
def sync_send_rpc(self, message):
callback = synchronouscb.SynchronousCallback()
def f():
logging.debug("Send request")
self.send(message, opcode=websocket.ABNF.OPCODE_BINARY)
self.dispatch_async(f)
self._append_callback(callback.callback)
logging.debug("Wait for callback")
callback.wait()
logging.debug("Done waiting")
return callback.response
def async_send_rpc(self, message, callback):
def f():
request = api_pb2.Request()
request.ParseFromString(message)
logging.debug("SEND:\n" + str(request))
self.send(message, opcode=websocket.ABNF.OPCODE_BINARY)
self.dispatch_queue.dispatch_async(f)
self._append_callback(callback)
def _append_callback(self, callback):
self.callbacks_cond.acquire()
self.callbacks.append(callback)
self.callbacks_cond.release()
def _on_rpc_message(self, ws, message):
logging.debug("Got an RPC message")
parsed = self.handler(message)
if parsed is not None:
logging.debug("Running the next callback")
self.callbacks_cond.acquire()
callback = self.callbacks[0]
del self.callbacks[0]
self.callbacks_cond.notify_all()
self.callbacks_cond.release()
if not self.waiting:
callback(parsed)
else:
logging.debug("Notifying unparsed message")
self.callbacks_cond.acquire()
self.callbacks_cond.notify_all()
self.callbacks_cond.release()
def finish(self):
"""Blocks until all outstanding RPCs have completed. Does not run callbacks."""
logging.debug("Finish acquiring lock")
self.callbacks_cond.acquire()
logging.debug("Finish invoked with " + str(len(self.callbacks)) + " callbacks left")
self.waiting = True
while len(self.callbacks) > 0:
future.idle_spin()
logging.debug("Finish waiting...")
self.callbacks_cond.wait()
logging.debug("Finish done")
self.callbacks_cond.release()
import socket
_socket = None
_notification_handlers = []
def register_notification_handler(handler):
_notification_handlers.append(handler)
def _notification_handler(notification):
for handler in _notification_handlers:
handler(notification)
def get_socket():
global _socket
if _socket is None:
_socket = socket.Connection()
_socket.connect(_notification_handler)
return _socket
def wait():
if _socket is not None:
_socket.wait()
#!/usr/bin/python
# This is python 2.7 on macOS 10.12.
from __future__ import print_function
import api_pb2
import future
import rpcsocket
import logging
class Connection(object):
def __init__(self):
self.last_future = None
def wait(self):
if self.last_future is not None:
self.last_future.get()
self.last_future = None
def finish(self):
self.ws.finish()
def connect(self, notification_handler):
self.notification_handler = notification_handler;
self.ws = rpcsocket.RPCSocket(
self._handler,
"ws://localhost:1912/",
subprotocols = [ 'api.iterm2.com' ])
def request_hierarchy(self):
return self._send_async(self._list_sessions_request(),
lambda response: response.list_sessions_response)
def _list_sessions_request(self):
request = api_pb2.Request()
request.list_sessions_request.SetInParent()
return request
def request_send_text(self, session_id, text):
return self._send_async(self._send_text_request(session_id, text),
lambda response: response.send_text_response)
def _send_text_request(self, session_id, text):
request = api_pb2.Request()
if session_id is not None:
request.send_text_request.session = session_id
request.send_text_request.text = text
return request
def request_create_tab(self, profile=None, window=None, index=None, command=None):
return self._send_async(
self._create_tab_request(profile=profile, window=window, index=index, command=command),
lambda response: response.create_tab_response)
def _create_tab_request(self, profile=None, window=None, index=None, command=None):
request = api_pb2.Request()
request.create_tab_request.SetInParent()
if profile is not None:
request.create_tab_request.profile_name = profile
if window is not None:
request.create_tab_request.window_id = window
if index is not None:
request.create_tab_request.tab_index = index
if command is not None:
request.create_tab_request.command = command
return request
def request_split_pane(self, session=None, vertical=False, before=False, profile=None):
return self._send_async(
self._split_pane_request(session=session, vertical=vertical, before=before, profile=profile),
lambda response: response.split_pane_response)
def _split_pane_request(self, session=None, vertical=False, before=False, profile=None):
request = api_pb2.Request()
request.split_pane_request.SetInParent()
if session is not None:
request.split_pane_request.session = session
if vertical:
request.split_pane_request.split_direction = api_pb2.SplitPaneRequest.VERTICAL
else:
request.split_pane_request.split_direction = api_pb2.SplitPaneRequest.HORIZONTAL;
request.split_pane_request.before = False
if profile is not None:
request.split_pane_request.profile_name = profile
return request;
def request_subscribe(self, subscribe, notification_type, session=None):
return self._send_async(
self._subscribe_request(subscribe, notification_type, session=session),
lambda response: response.notification_response)
def _subscribe_request(self, subscribe, notification_type, session=None):
request = api_pb2.Request()
if session is not None:
request.notification_request.session = session
request.notification_request.subscribe = subscribe
request.notification_request.notification_type = notification_type
return request
def _send_sync(self, request):
return self.ws.sync_send_rpc(request.SerializeToString())
def _send_async(self, request, transform):
f = future.Future(transform)
self.ws.async_send_rpc(request.SerializeToString(), f.callback)
self.last_future = f
return f
def _handler(self, message):
response = api_pb2.Response()
response.ParseFromString(message)
if response.HasField('notification'):
self.notification_handler(response.notification)
return None
else:
logging.debug("Got a non-notification message" + str(response))
return response
import logging
import threading
class SynchronousCallback(object):
"""A wrapper around a condition variable.
Contains one bit of state, self.response.
"""
def __init__(self):
self.cond = threading.Condition()
self.response = None
def callback(self, r):
"""Like notfiying a condition variable, but also sets the response to r."""
logging.debug("Callback invoked")
self.cond.acquire()
self.response = r
self.cond.notify_all()
self.cond.release()
def wait(self):
"""Blocks until there is a response."""
logging.debug("Waiting for callback to be invoked")
self.cond.acquire()
while self.response is None:
self.cond.wait()
logging.debug("Callback was invoked")
self.cond.release()
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment