From 3520e5f5b39bc54d59cff8bddc6b46ecdec06d4f Mon Sep 17 00:00:00 2001
From: Joe Rayhawk <jrayhawk@fairlystable.org>
Date: Wed, 23 Nov 2022 00:54:49 -0800
Subject: EventSubscriptions: make event masking possible both at the weboscket
 level and at the fiber channel level

---
 src/obswebsocket.cr | 71 +++++++++++++++++++++++++++++++++++++++++++----------
 1 file changed, 58 insertions(+), 13 deletions(-)

(limited to 'src')

diff --git a/src/obswebsocket.cr b/src/obswebsocket.cr
index 1ac4997..95fe63a 100644
--- a/src/obswebsocket.cr
+++ b/src/obswebsocket.cr
@@ -11,6 +11,35 @@ end
 
 module OBS
   extend self
+
+  # How should this be used?
+  @[Flags]
+  enum EventSubscription : UInt32
+    Config
+    Scenes
+    Inputs
+    Transitions
+    Filters
+    Outputs
+    SceneItems
+    MediaInputs
+    Vendors
+    Ui
+    Unused11
+    Unused12
+    Unused13
+    Unused14
+    Unused15
+    InputVolumeMeters
+    InputActiveStateChanged
+    InputShowStateChanged
+    SceneItemTransformChanged
+    # "All" in the protocol docs is not actually "all events", but "all non-high-volume events" because reasons???
+    # The unused enums are presumably reserved for that class.
+    AllNonHigh  = Config|Scenes|Inputs|Transitions|Filters|Outputs|SceneItems|MediaInputs|Vendors|Ui|Unused11|Unused12|Unused13|Unused14|Unused15
+    ORMCritical =        Scenes|Inputs|            Filters|Outputs|SceneItems|SceneItemTransformChanged
+  end
+
   def req( type : String, id : String, data : ( String | Nil | JSON::Any ) = nil )
     request = JSON.build do |json|
       json.object do
@@ -48,7 +77,7 @@ module OBS
     @outputs         : OBS::Outputs         = OBS::Outputs.allocate
     @video           : OBS::VideoSettings   = OBS::VideoSettings.allocate
     @stats           : OBS::Stats           = OBS::Stats.allocate
-    @eventsubs = Array( Channel(JSON::Any) ).new
+    @eventsubs = Array( Tuple( Channel(JSON::Any), UInt32 ) ).new # convert this to Hash if anyone ever wants eventsub_remove()
     @openrequests = Hash( String, Channel( JSON::Any ) ).new
     @negotiated = false
     @negotiationdelayqueue = 0
@@ -56,13 +85,20 @@ module OBS
     @connecterror = 0
     @shutdown = false
     @closed = true
+    @eventmask : UInt32 = (EventSubscription::AllNonHigh|EventSubscription::ORMCritical).value
     def closed?
       return @closed
     end
-    def eventsub_add( channel : Channel( JSON::Any ) )
-      # FIXME: this currently subscribes to everything
-      # maybe make it more specific at some point
-      @eventsubs.push( channel )
+    def eventsub_add( channel : Channel( JSON::Any ), mask : UInt32 = @eventmask )
+      unless @eventmask.bits_set?( mask )
+        @eventmask = @eventmask|( mask ) # bitwise OR
+        if @websocket
+          STDERR.puts( "Warning: eventsub_add mask requesting more bits than negotiated. Reconnecting." )
+          STDERR.puts( "Warning: To avoid this, initialize OBS::WebSocket with eventmask: #{@eventmask}" )
+          @websocket.not_nil!.close
+        end
+      end
+      @eventsubs.push( { channel, mask } )
     end
     def negotiated?
       return @negotiated
@@ -96,7 +132,6 @@ module OBS
     # request channel, response channel
     def send( json : String )
       unless @negotiated
-        STDERR.puts( "DELAYING" )
         @negotiationdelayqueue += 1
         negotiationresult = @negotiationdelay.receive
         if negotiationresult.is_a?( Exception )
@@ -109,7 +144,6 @@ module OBS
     end
     def send( reschan : ( Channel( JSON::Any ) | Nil ), json : String )
       unless @negotiated
-        STDERR.puts( "DELAYING" )
         @negotiationdelayqueue += 1
         negotiationresult = @negotiationdelay.receive
         if negotiationresult.is_a?( Exception )
@@ -156,7 +190,7 @@ module OBS
       end
       self.send( reschan, request.to_s )
     end
-    def initialize( uri : String, password : String | Nil = nil, retry : Bool = true )
+    def initialize( uri : String, password : String | Nil = nil, retry : Bool = true, @eventmask : UInt32 = @eventmask )
       @scenecollection.initialize( self )
       @inputs.initialize( self )
       @scenes.initialize( self )
@@ -165,6 +199,10 @@ module OBS
       @video.initialize( self )
       @stats.initialize( self )
 
+      unless @eventmask.bits_set?( EventSubscription::ORMCritical.value )
+        STDERR.puts( "WARNING: ORM-critical event subscriptions are missing. Expect broken statekeeping." )
+      end
+
       spawn do
         loop do
           if @shutdown
@@ -195,7 +233,7 @@ module OBS
                   j.field "d" do
                     j.object do
                       j.field "rpcVersion", 1
-                      j.field "eventSubscriptions", 526335
+                      j.field "eventSubscriptions", @eventmask
                       if json["d"]["authentication"]?
                         if password
                           secret = Base64.encode(OpenSSL::Digest.new("sha256").update( password + json["d"]["authentication"]["salt"].as_s    ).final).chomp
@@ -210,6 +248,7 @@ module OBS
                 end
               end
 
+              STDERR.puts( "SENT: #{hello.pretty_inspect}" )
               @websocket.not_nil!.send( hello.to_s )
             when 2 # identified
               @negotiated = true
@@ -219,9 +258,10 @@ module OBS
               end
               @connecterror = 0
             when 5 # event
-              # FIXME: These should be switched over to Enum flags
-              @eventsubs.each do | eventchan |
-                eventchan.send( json["d"] )
+              @eventsubs.each do | eventsub |
+                if ( eventsub[1].bits_set?( json["d"]["eventIntent"].as_i ) )
+                  eventsub[0].send( json["d"] )
+                end
               end
               # A Fiber.yield occurs immediately after this to make sure "json" doesn't get overwritten before we can use it.
               spawn do
@@ -361,9 +401,15 @@ module OBS
               @negotiationdelayqueue -= 1
             end
           end
+          @closed || @websocket && @websocket.not_nil!.close
+          @negotiated = false
+          sleep 5
         rescue ex
           STDERR.print( ex.pretty_inspect )
           STDERR.print( ex.backtrace?.pretty_inspect )
+          @closed || @websocket && @websocket.not_nil!.close
+          @negotiated = false
+          sleep 5
         ensure
           @closed || @websocket && @websocket.not_nil!.close
           @negotiated = false
@@ -373,7 +419,6 @@ module OBS
               @negotiationdelayqueue -= 1
             end
           end
-          sleep 10
           # invalidate state on everything
           @scenecollection = OBS::SceneCollection.new( self )
           @inputs = OBS::Inputs.new( self )
-- 
cgit v1.2.3