From d89da9fdc2dfad70d80304da728bcc57438a0a63 Mon Sep 17 00:00:00 2001 From: Joe Rayhawk Date: Sun, 14 Jan 2024 01:44:29 -0800 Subject: crystal/tcpsocket.cr: add twitchapi fiber, rename detection Note: This needs BungmoBott::Socket implementation --- crystal/tcpsocket.cr | 227 ++++++++++++++++++++++++--------------------------- 1 file changed, 107 insertions(+), 120 deletions(-) diff --git a/crystal/tcpsocket.cr b/crystal/tcpsocket.cr index f97169e..a73b37c 100644 --- a/crystal/tcpsocket.cr +++ b/crystal/tcpsocket.cr @@ -149,6 +149,15 @@ regextwitchuser = /^[0-9a-zA-Z_]+$/ if ( secrets.twitch_access_token && secrets.twitch_client_id ) twitchapi = true + twitchclient = Twitcr::Client.new( Hash( String, String ){ + "client_id" => secrets.twitch_client_id.not_nil!, + "access_token" => secrets.twitch_access_token.not_nil! + + } ) + # derive twitch_channel_id from channel or vice versa + unless config.twitch_user_id + config.twitch_user_id = twitchclient.user( config.chat_user.not_nil!.twitch.not_nil! ).id.to_u32 + end else twitchapi = false secrets.twitch_access_token || STDERR.puts "Warning: #{secretsfile} 'twitch_access_token' is missing; direct Twitch API access disabled." @@ -166,6 +175,10 @@ else end end +# FIXME: might need twitch_channel_id eventually ...? +if twitchapi +end + # enable direct gcloud api? if secrets.gcloud_token gcloud = true @@ -214,11 +227,12 @@ ircipc = Channel( Tuple( String, String ) ).new commandircipc = Channel( Tuple( String, FastIRC::Message ) ).new commandipc = Channel( String ).new # t2sipc: voice, text -t2sipc = Channel( Tuple( String, String ) ).new -twitchipc = Channel( Tuple( String, String ) ).new -bbscliipc = Channel( String ).new -bbssrvipc = Channel( String ).new -waitgroup = Channel( String ).new +t2sipc = Channel( Tuple( String, String ) ).new +twitchircipc = Channel( Tuple( String, String ) ).new +twitchapiipc = Channel( Tuple( String, String | UInt64 ) ).new +bbscliipc = Channel( String ).new +bbssrvipc = Channel( String ).new +waitgroup = Channel( String ).new fiberipc = Channel( Fiber ).new @@ -243,7 +257,7 @@ if config.obs_connect d = json # Copy *immediately* case d["eventType"].as_s when "CurrentProgramSceneChanged" - say( twitchipc, config.chat_user.not_nil!.twitch.not_nil!, "| obs: switched scene to " + ( d["eventData"]["sceneName"]?.as_s? || "unknown" ) ) + say( twitchircipc, config.chat_user.not_nil!.twitch.not_nil!, "| obs: switched scene to " + ( d["eventData"]["sceneName"]?.as_s? || "unknown" ) ) when "MediaInputPlaybackEnded" if d["eventData"]["inputName"].as_s =~ /^media-temporary-/ obs.send( OBS.req( "RemoveInput", JSON.parse({ "inputName" => d["eventData"]["inputName"].as_s }.to_json) ) ) @@ -254,7 +268,7 @@ if config.obs_connect edata = d["eventData"] name = obs.scenes[ edata["sceneName"].as_s ][ edata["sceneItemId"].as_i64 ].name if name !~ /media-temporary/ - say( twitchipc, config.chat_user.not_nil!.twitch.not_nil!, "| obs: source #{name} visibility is now #{edata["sceneItemEnabled"].as_bool}" ) + say( twitchircipc, config.chat_user.not_nil!.twitch.not_nil!, "| obs: source #{name} visibility is now #{edata["sceneItemEnabled"].as_bool}" ) end when "SceneItemTransformChanged" edata = d["eventData"] @@ -276,7 +290,7 @@ if config.obs_connect end when "SourceFilterEnableStateChanged" edata = d["eventData"] - say( twitchipc, config.chat_user.not_nil!.twitch.not_nil!, "| obs: source #{edata["sourceName"].as_s} filter #{edata["filterName"].as_s} visibility is currently #{edata["filterEnabled"].as_bool}" ) + say( twitchircipc, config.chat_user.not_nil!.twitch.not_nil!, "| obs: source #{edata["sourceName"].as_s} filter #{edata["filterName"].as_s} visibility is currently #{edata["filterEnabled"].as_bool}" ) end end Fiber.yield @@ -298,31 +312,6 @@ end # end #end -# FIXME: might need twitch_channel_id eventually ...? -#if twitchapi -# twitchclient = Twitcr::Client.new( defaultsettings ) -# # derive twitch_channel_id from channel or vice versa -# [ "channel", "twitch_channel_id" ].each do |key| -# File.exists?( config.configdir + key ) && ( settings[key] = File.read( settings["config.configdir"] + key ).chomp ) -# end -# if ( config.channel? =~ regextwitchuser ) && ( settings["twitch_channel_id"]? =~ /^[0-9]+$/ ) -# elsif ! ( config.channel? =~ regextwitchuser ) && ( settings["twitch_channel_id"]? =~ /^[0-9]+$/ ) -# config.channel = twitchclient.user( settings["twitch_channel_id"].to_u64 ).login -# elsif ( config.channel? =~ regextwitchuser ) && ! ( settings["twitch_channel_id"]? =~ /^[0-9]+$/ ) -# config.twitch_channel_id = twitchclient.user( settings["channel"] ).id.to_s -# else -# STDERR.puts "ERROR: Missing #{config.configdir}channel and twitch_channel_id configuration keys." -# error = true -# # exit 2 -# end -# if error == true -# {% if flag?(:windows) %} -# STDERR.puts "press enter to end program" -# gets -# {% end %} -# exit 1 -# end -#end def urbandef( term : String ) #{ @@ -451,7 +440,7 @@ def playaudiofile( filepath : String ) p.wait end -def userlog( ircipc : Channel, config : BungmoBott::Config, message : FastIRC::Message ) +def userlog( config : BungmoBott::Config, message : FastIRC::Message ) : Tuple( String, UInt64, String, Int32 | Int64, String | Nil ) | Nil unless ( ( prefix = message.prefix ) && ( chatuser = prefix.source ) && ( uid = message.tags["user-id"]? ) ) return nil end @@ -466,27 +455,21 @@ def userlog( ircipc : Channel, config : BungmoBott::Config, message : FastIRC::M Dir.mkdir_p( userdir + "/names" ) File.touch( userdir ) unless testrefuser2uid( userdir + "/names/" + chatuser ) - namelatest = "" + oldname = nil datelatest = Time::UNIX_EPOCH Dir.each_child( userdir + "/names/" ) do |name| namedate = File.info( userdir + "/names/" + name ).modification_time if namedate > datelatest - namelatest = name + oldname = name datelatest = namedate end end - unless namelatest.empty? - ircipc.send( { "##{config.chat_user.not_nil!.twitch}", "Rename detected: #{uid}: #{namelatest} -> #{chatuser}" } ) - end genrefuser2uid( userdir + "/names/" + chatuser, uid, 3 ) end unless testrefuser2uid( basedir + "/names/" + chatuser ) genrefuser2uid( basedir + "/names/" + chatuser, uid, 1 ) end - unless ( message.params[0] == "##{config.chat_user.not_nil!.twitch}" ) - return nil - end - return ( { chatuser, uid, userdir, lastseen } ) + return ( { chatuser, uid.to_u64, userdir, lastseen, oldname } ) rescue ex puts ex end @@ -594,22 +577,14 @@ spawn name: "command_dispatch" do end end - next unless ( userlogreturn = userlog( twitchipc, config, message ) ) - chatuser, uid, userdir, lastseen = userlogreturn - if config.voice_list - if ( t2sreturn = t2s( t2sipc, config, userdir, chatuser, message.params[1] ) ) - lastvoice.insert( 0, t2sreturn ) - lastvoice = lastvoice[0..4] - end - else - STDERR.puts "WARNING: config.voice_list empty" - end + next unless ( userlogreturn = userlog( config, message ) ) + chatuser, uid, userdir, lastseen, oldname = userlogreturn # Have we seen this user lately? if ( ( Time.utc.to_unix - lastseen ) >= 14400 ) - twitchipc.send( { "get_user", uid.to_s } ) - twitchipc.send( { "get_followers", uid.to_s } ) + twitchapiipc.send( { "get_user", uid.to_s } ) + twitchapiipc.send( { "get_followers", uid.to_s } ) prevnames = Array( String ).new - if ( prevnames = Dir.children( config.statedir + "/uids/" + uid + "/names" ) ) && ( prevnames.size > 1 ) + if ( prevnames = Dir.children( config.statedir + "/uids/#{uid}/names" ) ) && ( prevnames.size > 1 ) prevnames.delete( chatuser ) puts "\033[38;5;14m#{chatuser} previous names: #{prevnames.join(", ")}\033[0m" end @@ -655,6 +630,12 @@ spawn name: "command_dispatch" do ( sub && ( exec.perm && exec.perm.not_nil!.includes?("sub") ) ) || ( vip && ( exec.perm && exec.perm.not_nil!.includes?("vip") ) ) ) + if ( exec.func == "detect_rename" ) + unless oldname.is_a?( String ) + ircipc.send( { "##{config.chat_user.not_nil!.twitch}", "Rename detected: #{uid}: #{oldname} -> #{chatuser}" } ) + end + next + end if obs # FIXME: better validate args ( exec.func == "obs_random_source_enable" ) && obsrandommediaenable( obs, exec.arg.not_nil![0].not_nil! ) && next @@ -670,7 +651,7 @@ spawn name: "command_dispatch" do end end # FIXME: add gamesurge support - ( exec.func == "say" ) && say( twitchipc, config.chat_user.not_nil!.twitch.not_nil!, exec.arg.not_nil![0].not_nil! ) && next + ( exec.func == "say" ) && say( twitchircipc, config.chat_user.not_nil!.twitch.not_nil!, exec.arg.not_nil![0].not_nil! ) && next STDERR.puts "WARNING: unhandled function for /#{commandregex}/: #{exec.func}" else STDOUT.print "DENIED: " @@ -824,11 +805,11 @@ spawn name: "command_dispatch" do # # FIXME: This is only half-implemented # elsif ( ( cmd =~ /^(user)$/ ) && ( mod || own ) ) # if match[2]? && match[2] =~ /[a-z][a-z0-9_]+/ -# twitchipc.send( { "get_user", match[2] } ) +# twitchapiipc.send( { "get_user", match[2] } ) # elsif match[2]? && match[2] =~ /[0-9]+/ -# twitchipc.send( { "get_user", match[2].to_u64 } ) +# twitchapiipc.send( { "get_user", match[2].to_u64 } ) # else -# twitchipc.send( { "get_user", settings["channel_id"].to_u64 } ) +# twitchapiipc.send( { "get_user", settings["channel_id"].to_u64 } ) # end # elsif ( ( cmd =~ /^(status|title)$/ ) && ( mod || own ) ) # if match[2]? @@ -1005,7 +986,7 @@ spawn name: "command_dispatch" do puts ex puts ex.backtrace if service == "twitch" - say( twitchipc, config.chat_user.not_nil!.twitch.not_nil!, "An error occurred! " + ex.message.to_s ) + say( twitchircipc, config.chat_user.not_nil!.twitch.not_nil!, "An error occurred! " + ex.message.to_s ) elsif service == "gamesurge" say( ircipc, config.chat_user.not_nil!.gamesurge.not_nil!, "An error occurred! " + ex.message.to_s ) else @@ -1104,33 +1085,42 @@ fibers[fiber.name.not_nil!] = fiber # Twitch API request handling thread -#spawn do -# loop do -# begin -# while twitchtuple = twitchipc.receive -# cmd, arg = [ *twitchtuple ] -# case cmd -# when "get_user" -# userinfo = JSON.parse( twitchclient.get_user( arg ) )["data"][0] -# pp userinfo -# unless userinfo["broadcaster_type"].as_s.blank? -# puts "\033[38;5;12m#{userinfo["login"]} is #{userinfo["broadcaster_type"]}\033[0m" -# end -# userage = ( Time.utc.to_unix - Time::Format::RFC_3339.parse( userinfo["created_at"].as_s ).to_unix ) -# if ( userage - 172800 ) < 0 -# puts "\033[38;5;1m#{userinfo["login"]}'s account is #{((172800 - userage)/60/60).to_i64} hours old.\033[0m" -# end -# when "get_followers" -# followers = JSON.parse( twitchclient.get_user_follows( to: arg.to_u64 ) )["total"].as_i64 -# if followers > 500 -# puts "\033[38;5;2m#{followers} followers\033[0m" -# end -# end -# end -# end -# end -#end -# +# FIXME: Implement ratelimiting here. +if twitchclient.is_a?( Twitcr::Client ) + spawn name: "Twitcr::Client" do + fiberipc.send( Fiber.current ) + loop do + begin + while twitchtuple = twitchapiipc.receive + cmd, arg = [ *twitchtuple ] + case cmd + when "get_user" + userinfo = JSON.parse( twitchclient.get_user( arg ) )["data"][0] + pp userinfo + unless userinfo["broadcaster_type"].as_s.blank? + puts "\033[38;5;12m#{userinfo["login"]} is #{userinfo["broadcaster_type"]}\033[0m" + end + userage = ( Time.utc.to_unix - Time::Format::RFC_3339.parse( userinfo["created_at"].as_s ).to_unix ) + if ( userage - 172800 ) < 0 + puts "\033[38;5;1m#{userinfo["login"]}'s account is #{((172800 - userage)/60/60).to_i64} hours old.\033[0m" + end + when "get_followers" + followers = JSON.parse( twitchclient.get_channel_followers( to: arg.to_u64 ) )["total"].as_i64 + if followers > 500 + puts "\033[38;5;2m#{followers} followers\033[0m" + end + end + end + rescue ex + pp ex + end + end + ensure + waitgroup.send( Fiber.current.name.not_nil! ) + end + fiber = fiberipc.receive + fibers[fiber.name.not_nil!] = fiber +end if ( secrets.twitch_access_token && config.chat_user.not_nil!.twitch && config.join_channels.not_nil!.twitch ) # Twitch::IRC thread @@ -1145,7 +1135,7 @@ if ( secrets.twitch_access_token && config.chat_user.not_nil!.twitch && config.j # "Most IRC servers limit messages to 512 bytes in length, including the trailing CR-LF characters." # PRIVMSG #channel message\r\n spawn do - while tuple = twitchipc.receive # why does this need to be a tuple? + while tuple = twitchircipc.receive # why does this need to be a tuple? sizelimit=( 512 - ( tuple[0].size + 12 ) ) if ( tuple[0] == "JOIN" ) if ( tuple[0] =~ regextwitchuser ) @@ -1212,7 +1202,7 @@ if ( secrets.twitch_access_token && config.chat_user.not_nil!.twitch && config.j sub = ( message.tags["subscriber"] == "1" ) rescue ex puts ex - #twitchipc.send( { "##{config.channel}", "An error occurred! " + ex.message.to_s } ) + #twitchircipc.send( { "##{config.channel}", "An error occurred! " + ex.message.to_s } ) # Maybe send all error messages out through the API? Have to do channel->client mappings, though. end end @@ -1247,36 +1237,34 @@ if config.bungmobott_connect bbscli_host, bbscli_port = config.bungmobott_connect.not_nil!.split(":") puts "spawning BungmoBott::Socket client" spawn name: "BungmoBott::Socket client" do + fiberipc.send( Fiber.current ) loop do puts "looping BungmoBott::Socket client" user = config.chat_user.not_nil!.twitch.not_nil! bungmobott_key = secrets.bungmobott_key.not_nil! - fiberipc.send( Fiber.current ) - socket = TCPSocket.new( bbscli_host, bbscli_port.to_u16 ) - context = OpenSSL::SSL::Context::Client.new - ssl_socket = OpenSSL::SSL::Socket::Client.new( socket, context ) + ssl_socket = OpenSSL::SSL::Socket::Client.new( TCPSocket.new( bbscli_host, bbscli_port.to_u16 ), OpenSSL::SSL::Context::Client.new ) ssl_socket.sync = true negotiated = false spawn do - while input = bbscliipc.receive - # ssl_socket gets redefined in the event of I/O errors, so we deal with it here. - #if ( climsg = input.match( /^irc +twitch +JOIN \#+([a-zA-Z0-9_]+) *$/ ) ) - #end - ssl_socket.puts( input ) + while message = ssl_socket.gets + puts "BungmoBott::Socket cli recv: " + message.gsub( bungmobott_key, "CENSORED" ) + case message + when /^error/ + raise Exception.new("BungmoBott::Socket Error: #{message}") + when /^authed/ + negotiated = true + ssl_socket.puts( "say twitch #{user} test" ) + when /^msg twitch/ + commandircipc.send( { "twitch", FastIRC.parse_line( message.split(" ")[2..].join(" ") ) } ) + end end end ssl_socket.puts( "auth #{user} #{bungmobott_key}" ) - while message = ssl_socket.gets - puts "BungmoBott::Socket cli recv: " + message.gsub( bungmobott_key, "CENSORED" ) - case message - when /^error/ - raise Exception.new("BungmoBott::Socket Error: #{message}") - when /^authed/ - negotiated = true - ssl_socket.puts( "say twitch #{user} test" ) - when /^msg twitch$/ - commandircipc.send( { "twitch", FastIRC.parse_line( message.split(" ")[2..].join(" ") ) } ) - end + while input = bbscliipc.receive + # ssl_socket gets redefined in the event of I/O errors, so we deal with it here. + #if ( climsg = input.match( /^irc +twitch +JOIN \#+([a-zA-Z0-9_]+) *$/ ) ) + #end + ssl_socket.puts( input ) end end rescue ex @@ -1332,7 +1320,7 @@ if config.bungmobott_listen ircchannel = match[3] if ircservice == "twitch" client.puts "joining #{ircservice} \##{ircchannel}" - twitchipc.send( { "JOIN", ircchannel } ) + twitchircipc.send( { "JOIN", ircchannel } ) unless channelsubs[ { ircservice, ircchannel } ]? channelsubs[ { ircservice, "#" + ircchannel } ] = Array( OpenSSL::SSL::Socket::Server ).new end @@ -1348,7 +1336,7 @@ if config.bungmobott_listen elsif ( match = message.match( /^gcsvoicelist$/i ) ) client.puts "gcsvoicelist " + generatevoicelistgcs( secrets.gcloud_token ).join(" ") elsif ( match = message.match( /^say twitch (.+)/i ) ) - say( twitchipc, connections[client]["user"], match[1] ) + say( twitchircipc, connections[client]["user"], match[1] ) elsif ( message =~ /testchannelsubs/ ) #commandircipc.send( "testchannelsubs" ) end @@ -1379,19 +1367,18 @@ if config.bungmobott_listen end end end - connections.delete( client ) - channelsubs.each_key do |key| - channelsubs[key].delete( client ) - end - puts "Disconnected: #{clientsocket.remote_address}" - pp channelsubs - pp connections rescue ex : IO::Error puts ex next rescue ex puts ex next + ensure + connections.delete( client ) + channelsubs.each_key do |key| + channelsubs[key].delete( client ) + end + puts "Disconnected: #{clientsocket.remote_address}" end end rescue ex @@ -1409,7 +1396,7 @@ if config.join_channels && config.join_channels.not_nil!.twitch if fibers["Twitch::IRC"]? ircservice = "twitch" config.join_channels.not_nil!.twitch.not_nil!.each do |ircchannel| - twitchipc.send( { "JOIN", ircchannel } ) + twitchircipc.send( { "JOIN", ircchannel } ) unless channelsubs[ { ircservice, ircchannel } ]? channelsubs[ { ircservice, "#" + ircchannel } ] = Array( OpenSSL::SSL::Socket::Server ).new # Do we ever care about this? -- cgit v1.2.3