From 3520e5f5b39bc54d59cff8bddc6b46ecdec06d4f Mon Sep 17 00:00:00 2001 From: Joe Rayhawk Date: Wed, 23 Nov 2022 00:54:49 -0800 Subject: EventSubscriptions: make event masking possible both at the weboscket level and at the fiber channel level --- src/obswebsocket.cr | 71 +++++++++++++++++++++++++++++++++++++++++++---------- 1 file changed, 58 insertions(+), 13 deletions(-) (limited to 'src') diff --git a/src/obswebsocket.cr b/src/obswebsocket.cr index 1ac4997..95fe63a 100644 --- a/src/obswebsocket.cr +++ b/src/obswebsocket.cr @@ -11,6 +11,35 @@ end module OBS extend self + + # How should this be used? + @[Flags] + enum EventSubscription : UInt32 + Config + Scenes + Inputs + Transitions + Filters + Outputs + SceneItems + MediaInputs + Vendors + Ui + Unused11 + Unused12 + Unused13 + Unused14 + Unused15 + InputVolumeMeters + InputActiveStateChanged + InputShowStateChanged + SceneItemTransformChanged + # "All" in the protocol docs is not actually "all events", but "all non-high-volume events" because reasons??? + # The unused enums are presumably reserved for that class. + AllNonHigh = Config|Scenes|Inputs|Transitions|Filters|Outputs|SceneItems|MediaInputs|Vendors|Ui|Unused11|Unused12|Unused13|Unused14|Unused15 + ORMCritical = Scenes|Inputs| Filters|Outputs|SceneItems|SceneItemTransformChanged + end + def req( type : String, id : String, data : ( String | Nil | JSON::Any ) = nil ) request = JSON.build do |json| json.object do @@ -48,7 +77,7 @@ module OBS @outputs : OBS::Outputs = OBS::Outputs.allocate @video : OBS::VideoSettings = OBS::VideoSettings.allocate @stats : OBS::Stats = OBS::Stats.allocate - @eventsubs = Array( Channel(JSON::Any) ).new + @eventsubs = Array( Tuple( Channel(JSON::Any), UInt32 ) ).new # convert this to Hash if anyone ever wants eventsub_remove() @openrequests = Hash( String, Channel( JSON::Any ) ).new @negotiated = false @negotiationdelayqueue = 0 @@ -56,13 +85,20 @@ module OBS @connecterror = 0 @shutdown = false @closed = true + @eventmask : UInt32 = (EventSubscription::AllNonHigh|EventSubscription::ORMCritical).value def closed? return @closed end - def eventsub_add( channel : Channel( JSON::Any ) ) - # FIXME: this currently subscribes to everything - # maybe make it more specific at some point - @eventsubs.push( channel ) + def eventsub_add( channel : Channel( JSON::Any ), mask : UInt32 = @eventmask ) + unless @eventmask.bits_set?( mask ) + @eventmask = @eventmask|( mask ) # bitwise OR + if @websocket + STDERR.puts( "Warning: eventsub_add mask requesting more bits than negotiated. Reconnecting." ) + STDERR.puts( "Warning: To avoid this, initialize OBS::WebSocket with eventmask: #{@eventmask}" ) + @websocket.not_nil!.close + end + end + @eventsubs.push( { channel, mask } ) end def negotiated? return @negotiated @@ -96,7 +132,6 @@ module OBS # request channel, response channel def send( json : String ) unless @negotiated - STDERR.puts( "DELAYING" ) @negotiationdelayqueue += 1 negotiationresult = @negotiationdelay.receive if negotiationresult.is_a?( Exception ) @@ -109,7 +144,6 @@ module OBS end def send( reschan : ( Channel( JSON::Any ) | Nil ), json : String ) unless @negotiated - STDERR.puts( "DELAYING" ) @negotiationdelayqueue += 1 negotiationresult = @negotiationdelay.receive if negotiationresult.is_a?( Exception ) @@ -156,7 +190,7 @@ module OBS end self.send( reschan, request.to_s ) end - def initialize( uri : String, password : String | Nil = nil, retry : Bool = true ) + def initialize( uri : String, password : String | Nil = nil, retry : Bool = true, @eventmask : UInt32 = @eventmask ) @scenecollection.initialize( self ) @inputs.initialize( self ) @scenes.initialize( self ) @@ -165,6 +199,10 @@ module OBS @video.initialize( self ) @stats.initialize( self ) + unless @eventmask.bits_set?( EventSubscription::ORMCritical.value ) + STDERR.puts( "WARNING: ORM-critical event subscriptions are missing. Expect broken statekeeping." ) + end + spawn do loop do if @shutdown @@ -195,7 +233,7 @@ module OBS j.field "d" do j.object do j.field "rpcVersion", 1 - j.field "eventSubscriptions", 526335 + j.field "eventSubscriptions", @eventmask if json["d"]["authentication"]? if password secret = Base64.encode(OpenSSL::Digest.new("sha256").update( password + json["d"]["authentication"]["salt"].as_s ).final).chomp @@ -210,6 +248,7 @@ module OBS end end + STDERR.puts( "SENT: #{hello.pretty_inspect}" ) @websocket.not_nil!.send( hello.to_s ) when 2 # identified @negotiated = true @@ -219,9 +258,10 @@ module OBS end @connecterror = 0 when 5 # event - # FIXME: These should be switched over to Enum flags - @eventsubs.each do | eventchan | - eventchan.send( json["d"] ) + @eventsubs.each do | eventsub | + if ( eventsub[1].bits_set?( json["d"]["eventIntent"].as_i ) ) + eventsub[0].send( json["d"] ) + end end # A Fiber.yield occurs immediately after this to make sure "json" doesn't get overwritten before we can use it. spawn do @@ -361,9 +401,15 @@ module OBS @negotiationdelayqueue -= 1 end end + @closed || @websocket && @websocket.not_nil!.close + @negotiated = false + sleep 5 rescue ex STDERR.print( ex.pretty_inspect ) STDERR.print( ex.backtrace?.pretty_inspect ) + @closed || @websocket && @websocket.not_nil!.close + @negotiated = false + sleep 5 ensure @closed || @websocket && @websocket.not_nil!.close @negotiated = false @@ -373,7 +419,6 @@ module OBS @negotiationdelayqueue -= 1 end end - sleep 10 # invalidate state on everything @scenecollection = OBS::SceneCollection.new( self ) @inputs = OBS::Inputs.new( self ) -- cgit v1.2.3