summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorJoe Rayhawk <jrayhawk@fairlystable.org>2022-11-23 00:54:49 -0800
committerJoe Rayhawk <jrayhawk@fairlystable.org>2022-11-23 00:54:49 -0800
commit3520e5f5b39bc54d59cff8bddc6b46ecdec06d4f (patch)
tree992259fc0009bc05614223d4a8b1a5885f38aabd
parentbc26a5153c8026443b91202f667bcbd967457191 (diff)
downloadcrystal-obs-websocket-3520e5f5b39bc54d59cff8bddc6b46ecdec06d4f.tar.gz
crystal-obs-websocket-3520e5f5b39bc54d59cff8bddc6b46ecdec06d4f.zip
EventSubscriptions: make event masking possible both at the weboscket level and at the fiber channel level
-rw-r--r--src/obswebsocket.cr71
1 files changed, 58 insertions, 13 deletions
diff --git a/src/obswebsocket.cr b/src/obswebsocket.cr
index 1ac4997..95fe63a 100644
--- a/src/obswebsocket.cr
+++ b/src/obswebsocket.cr
@@ -11,6 +11,35 @@ end
module OBS
extend self
+
+ # How should this be used?
+ @[Flags]
+ enum EventSubscription : UInt32
+ Config
+ Scenes
+ Inputs
+ Transitions
+ Filters
+ Outputs
+ SceneItems
+ MediaInputs
+ Vendors
+ Ui
+ Unused11
+ Unused12
+ Unused13
+ Unused14
+ Unused15
+ InputVolumeMeters
+ InputActiveStateChanged
+ InputShowStateChanged
+ SceneItemTransformChanged
+ # "All" in the protocol docs is not actually "all events", but "all non-high-volume events" because reasons???
+ # The unused enums are presumably reserved for that class.
+ AllNonHigh = Config|Scenes|Inputs|Transitions|Filters|Outputs|SceneItems|MediaInputs|Vendors|Ui|Unused11|Unused12|Unused13|Unused14|Unused15
+ ORMCritical = Scenes|Inputs| Filters|Outputs|SceneItems|SceneItemTransformChanged
+ end
+
def req( type : String, id : String, data : ( String | Nil | JSON::Any ) = nil )
request = JSON.build do |json|
json.object do
@@ -48,7 +77,7 @@ module OBS
@outputs : OBS::Outputs = OBS::Outputs.allocate
@video : OBS::VideoSettings = OBS::VideoSettings.allocate
@stats : OBS::Stats = OBS::Stats.allocate
- @eventsubs = Array( Channel(JSON::Any) ).new
+ @eventsubs = Array( Tuple( Channel(JSON::Any), UInt32 ) ).new # convert this to Hash if anyone ever wants eventsub_remove()
@openrequests = Hash( String, Channel( JSON::Any ) ).new
@negotiated = false
@negotiationdelayqueue = 0
@@ -56,13 +85,20 @@ module OBS
@connecterror = 0
@shutdown = false
@closed = true
+ @eventmask : UInt32 = (EventSubscription::AllNonHigh|EventSubscription::ORMCritical).value
def closed?
return @closed
end
- def eventsub_add( channel : Channel( JSON::Any ) )
- # FIXME: this currently subscribes to everything
- # maybe make it more specific at some point
- @eventsubs.push( channel )
+ def eventsub_add( channel : Channel( JSON::Any ), mask : UInt32 = @eventmask )
+ unless @eventmask.bits_set?( mask )
+ @eventmask = @eventmask|( mask ) # bitwise OR
+ if @websocket
+ STDERR.puts( "Warning: eventsub_add mask requesting more bits than negotiated. Reconnecting." )
+ STDERR.puts( "Warning: To avoid this, initialize OBS::WebSocket with eventmask: #{@eventmask}" )
+ @websocket.not_nil!.close
+ end
+ end
+ @eventsubs.push( { channel, mask } )
end
def negotiated?
return @negotiated
@@ -96,7 +132,6 @@ module OBS
# request channel, response channel
def send( json : String )
unless @negotiated
- STDERR.puts( "DELAYING" )
@negotiationdelayqueue += 1
negotiationresult = @negotiationdelay.receive
if negotiationresult.is_a?( Exception )
@@ -109,7 +144,6 @@ module OBS
end
def send( reschan : ( Channel( JSON::Any ) | Nil ), json : String )
unless @negotiated
- STDERR.puts( "DELAYING" )
@negotiationdelayqueue += 1
negotiationresult = @negotiationdelay.receive
if negotiationresult.is_a?( Exception )
@@ -156,7 +190,7 @@ module OBS
end
self.send( reschan, request.to_s )
end
- def initialize( uri : String, password : String | Nil = nil, retry : Bool = true )
+ def initialize( uri : String, password : String | Nil = nil, retry : Bool = true, @eventmask : UInt32 = @eventmask )
@scenecollection.initialize( self )
@inputs.initialize( self )
@scenes.initialize( self )
@@ -165,6 +199,10 @@ module OBS
@video.initialize( self )
@stats.initialize( self )
+ unless @eventmask.bits_set?( EventSubscription::ORMCritical.value )
+ STDERR.puts( "WARNING: ORM-critical event subscriptions are missing. Expect broken statekeeping." )
+ end
+
spawn do
loop do
if @shutdown
@@ -195,7 +233,7 @@ module OBS
j.field "d" do
j.object do
j.field "rpcVersion", 1
- j.field "eventSubscriptions", 526335
+ j.field "eventSubscriptions", @eventmask
if json["d"]["authentication"]?
if password
secret = Base64.encode(OpenSSL::Digest.new("sha256").update( password + json["d"]["authentication"]["salt"].as_s ).final).chomp
@@ -210,6 +248,7 @@ module OBS
end
end
+ STDERR.puts( "SENT: #{hello.pretty_inspect}" )
@websocket.not_nil!.send( hello.to_s )
when 2 # identified
@negotiated = true
@@ -219,9 +258,10 @@ module OBS
end
@connecterror = 0
when 5 # event
- # FIXME: These should be switched over to Enum flags
- @eventsubs.each do | eventchan |
- eventchan.send( json["d"] )
+ @eventsubs.each do | eventsub |
+ if ( eventsub[1].bits_set?( json["d"]["eventIntent"].as_i ) )
+ eventsub[0].send( json["d"] )
+ end
end
# A Fiber.yield occurs immediately after this to make sure "json" doesn't get overwritten before we can use it.
spawn do
@@ -361,9 +401,15 @@ module OBS
@negotiationdelayqueue -= 1
end
end
+ @closed || @websocket && @websocket.not_nil!.close
+ @negotiated = false
+ sleep 5
rescue ex
STDERR.print( ex.pretty_inspect )
STDERR.print( ex.backtrace?.pretty_inspect )
+ @closed || @websocket && @websocket.not_nil!.close
+ @negotiated = false
+ sleep 5
ensure
@closed || @websocket && @websocket.not_nil!.close
@negotiated = false
@@ -373,7 +419,6 @@ module OBS
@negotiationdelayqueue -= 1
end
end
- sleep 10
# invalidate state on everything
@scenecollection = OBS::SceneCollection.new( self )
@inputs = OBS::Inputs.new( self )