Skip to content
Snippets Groups Projects
Commit 02fff124 authored by Phil Hughes's avatar Phil Hughes
Browse files

updates to sockets file

parent bd49c104
No related branches found
No related tags found
No related merge requests found
Loading
Loading
@@ -15,8 +15,7 @@ class Todo < ActiveRecord::Base
validates :target_id, presence: true, unless: :for_commit?
validates :commit_id, presence: true, if: :for_commit?
 
after_create :publish_event
after_save :publish_event
after_commit :publish_event
 
default_scope { reorder(id: :desc) }
 
Loading
Loading
@@ -65,8 +64,13 @@ class Todo < ActiveRecord::Base
end
end
 
private
def publish_event
Sidekiq::Client.enqueue(TodoWorker, self.id)
Gitlab::Redis.with do |redis|
data = {
user_id: self.user_id,
count: self.user.todos.pending.count,
}
redis.publish("todos", data.to_json)
end
end
end
class TodoWorker
include Sidekiq::Worker
sidekiq_options queue: :default
def perform(todo_id)
Gitlab::Redis.with do |redis|
todo = Todo.find(todo_id)
data = {
count: todo.user.todos.pending.count
}
redis.publish("todos.#{todo.user_id}", {count: todo.user.todos.pending.count}.to_json)
end
end
end
Loading
Loading
@@ -4,28 +4,36 @@ require "json"
 
EM.run do
@channels = Hash.new
@redis = EM::Hiredis.connect("unix:///Users/phil/Projects/gdk-ce/redis/redis.socket")
pubsub = @redis.pubsub
pubsub.subscribe "todos"
pubsub.on(:message) do |redis_channel, message|
message = JSON.parse(message)
data = {
channel: redis_channel,
data: message
}
if redis_channel == "todos"
@channels.each do |key, channel|
if channel[:user_id].to_i == message["user_id"]
channel[:channel].push data.to_json
end
end
end
end
EM::WebSocket.start(host: "0.0.0.0", port: "8080", debug: false) do |socket|
socket.onopen do |handshake|
channel = channel_for_socket(handshake)
 
@redis = EM::Hiredis.connect("unix:///Users/phil/Projects/gdk-ce/redis/redis.socket")
pubsub = @redis.pubsub
pubsub.subscribe "todos.#{path(handshake)}"
pubsub.on(:message) do |redis_channel, message|
data = {
channel: redis_channel.split(".").first,
data: JSON.parse(message)
}
channel.push data.to_json
end
sid = channel.subscribe do |msg|
sid = channel[:channel].subscribe do |msg|
socket.send msg
end
 
socket.onclose do
pubsub.unsubscribe "todos.#{path(handshake)}"
channel.unsubscribe(sid)
channel[:channel].unsubscribe(sid)
@channels.delete(path(handshake))
end
end
end
Loading
Loading
@@ -36,6 +44,10 @@ EM.run do
 
def channel_for_socket(handshake)
channel_path = path(handshake)
@channels[channel_path] ||= EM::Channel.new
@channels[channel_path] ||= {
channel: EM::Channel.new,
user_id: channel_path,
subscribed: ['todos']
}
end
end
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment