From 39861381ac314be8dfa0270ccb16d21ccaa97091 Mon Sep 17 00:00:00 2001 From: Joe Rayhawk Date: Sat, 19 Nov 2022 20:55:36 -0800 Subject: First pass at some logic for non-persistence. --- src/obswebsocket.cr | 57 ++++++++++++++++++++++++++++++++++++++++++----------- 1 file changed, 46 insertions(+), 11 deletions(-) diff --git a/src/obswebsocket.cr b/src/obswebsocket.cr index b02abdf..df84cba 100644 --- a/src/obswebsocket.cr +++ b/src/obswebsocket.cr @@ -29,6 +29,12 @@ module OBSWebSocket return( request.to_s ) end + class WriterUnavailableException < Exception + end + + class PasswordMissingException < Exception + end + class SourceMissingException < Exception end @@ -36,6 +42,7 @@ module OBSWebSocket end class Server + @@writer : Fiber | Nil @@reqchan = Channel( Tuple( ( Channel( JSON::Any ) | Nil ), String ) ).new @scenecollection = OBSWebSocket::SceneCollection.new( @@reqchan, @@inputs ) @@inputs = OBSWebSocket::Inputs.new( @@reqchan ) @@ -47,6 +54,10 @@ module OBSWebSocket @negotiated = false @connecterror = 0 @shutdown = false + @closed = true + def closed? + return @closed + end def negotiated? return @negotiated end @@ -76,15 +87,23 @@ module OBSWebSocket end def transition! reschan = Channel( JSON::Any ).new - @@reqchan.send( { reschan, OBSWebSocket.req( "TriggerStudioModeTransition", UUID.random.to_s ) } ) + send( reschan, OBSWebSocket.req( "TriggerStudioModeTransition", UUID.random.to_s ) ) return reschan.receive end # request channel, response channel def send( json : String ) - self.reqchan.send( { nil, json } ) + if ( @@writer && ! @@writer.not_nil!.dead? ) + @@reqchan.send( { nil, json } ) + else + raise WriterUnavailableException.new( "Writer fiber is not available." ) + end end def send( reschan : ( Channel( JSON::Any ) | Nil ), json : String ) - self.reqchan.send( { reschan, json } ) + if @@writer && ! @@writer.not_nil!.dead? + @@reqchan.send( { reschan, json } ) + else + raise WriterUnavailableException.new( "Writer fiber is not available." ) + end end def request( type : String, id : String, data : ( String | Nil | JSON::Any ) = nil ) request = JSON.build do |json| @@ -120,7 +139,7 @@ module OBSWebSocket end self.reqchan.send( { reschan, request.to_s } ) end - def initialize( uri : String, password : String | Nil = nil ) + def initialize( uri : String, password : String | Nil = nil, retry : Bool = true ) spawn do eventsubs = Array( Channel(JSON::Any) ).new # initialize this here to it survives across connection retries loop do @@ -128,9 +147,13 @@ module OBSWebSocket break end @obs_pubsub = HTTP::WebSocket.new( URI.parse( uri ), HTTP::Headers{"Cookie" => "SESSIONID=1235", "Sec-WebSocket-Protocol" => "obswebsocket.json"} ) + @obs_pubsub.on_close do | message | # for some reason HTTP::WebSocket.closed? SIGSEGVs on a dangling pointer on the 1.6 runtime, so we hack around it with our own state variable for now. + @closed = true + end + @closed = false openrequests = Hash( String, Channel( JSON::Any ) ).new #metachan = Channel( Tuple( String, Channel( JSON::Any ) ) ).new - spawn do + @@writer = spawn do queue = Array( Tuple( Channel( JSON::Any ) | Nil, String ) ).new while msgtuple = @@reqchan.receive if msgtuple[1] == "clear queue" @@ -191,10 +214,14 @@ module OBSWebSocket j.object do j.field "rpcVersion", 1 j.field "eventSubscriptions", 526335 - if json["d"]["authentication"]? && password - secret = Base64.encode(OpenSSL::Digest.new("sha256").update( password + json["d"]["authentication"]["salt"].as_s ).final).chomp - auth = Base64.encode(OpenSSL::Digest.new("sha256").update( secret + json["d"]["authentication"]["challenge"].as_s ).final).chomp - j.field "authentication", auth + if json["d"]["authentication"]? + if password + secret = Base64.encode(OpenSSL::Digest.new("sha256").update( password + json["d"]["authentication"]["salt"].as_s ).final).chomp + auth = Base64.encode(OpenSSL::Digest.new("sha256").update( secret + json["d"]["authentication"]["challenge"].as_s ).final).chomp + j.field "authentication", auth + else + raise PasswordMissingException.new( "Negotiation: missing password" ) + end end end end @@ -349,9 +376,14 @@ module OBSWebSocket pp ex pp ex.backtrace? ensure - @obs_pubsub && @obs_pubsub.close + @closed || @obs_pubsub.close @negotiated = false - @@reqchan.send( { nil, "break" } ) # ask reader fiber to terminate + if @@writer + unless @@writer.not_nil!.dead? # wtf is the type checker smoking that Nil would be an option here? + @@reqchan.send( { nil, "break" } ) # ask writer fiber to terminate + end + @@writer = nil + end sleep 10 # invalidate state on everything @scenecollection = OBSWebSocket::SceneCollection.new( @@reqchan, @@inputs ) @@ -361,11 +393,14 @@ module OBSWebSocket @outputs = OBSWebSocket::Outputs.new( @@reqchan ) @video = OBSWebSocket::VideoSettings.new( @@reqchan ) @stats = OBSWebSocket::Stats.new( @@reqchan ) + retry || ( @shutdown = true ) end end end def close @shutdown = true + # Does it matter if the writer thread is even running, here? + @@reqchan.send( { nil, "break" } ) # ask writer fiber to terminate @obs_pubsub && @obs_pubsub.close end end -- cgit v1.2.3