From 79d6d80f6a346b8b4297f1a67a6b39c6a965daf1 Mon Sep 17 00:00:00 2001 From: Daniel Worrall Date: Tue, 12 Jun 2018 01:20:48 +0100 Subject: Add IRC --- shard.yml | 2 ++ spec/irc_spec.cr | 11 ++++++++ src/twitch/irc.cr | 65 ++++++++++++++++++++++++++++++++++++++++++++++ src/twitch/rate_limiter.cr | 39 ++++++++++++++++++++++++++++ 4 files changed, 117 insertions(+) create mode 100644 spec/irc_spec.cr create mode 100644 src/twitch/irc.cr create mode 100644 src/twitch/rate_limiter.cr diff --git a/shard.yml b/shard.yml index 1a9881f..27d4133 100644 --- a/shard.yml +++ b/shard.yml @@ -14,3 +14,5 @@ dependencies: fast_irc: github: rx14/fast_irc.cr version: 0.3.0 + rate_limiter: + github: z64/rate_limiter diff --git a/spec/irc_spec.cr b/spec/irc_spec.cr new file mode 100644 index 0000000..0053024 --- /dev/null +++ b/spec/irc_spec.cr @@ -0,0 +1,11 @@ +require "./spec_helper" + +describe "PING" do + it "responds with PONG" do + response = String.build do |io| + client = Twitch::IRC.new(io, "foo", "bar") + client.dispatch FastIRC::Message.new("PING", nil) + end + response.should eq FastIRC::Message.new("PONG", ["tmi.twitch.tv"]).to_s + end +end diff --git a/src/twitch/irc.cr b/src/twitch/irc.cr new file mode 100644 index 0000000..f5fccfc --- /dev/null +++ b/src/twitch/irc.cr @@ -0,0 +1,65 @@ +require "socket" +require "fast_irc" +require "./rate_limiter" + +class Twitch::IRC + private getter socket : IO + private getter limiter : RateLimiter(String) + + def self.new(nick, token) + new(TCPSocket.new("irc.chat.twitch.tv", 6667), nick, token) + end + + def initialize(@socket : IO, @nick : String, @token : String) + @limiter = RateLimiter(String).new + limiter.bucket(:join, 50_u32, 15.seconds) + end + + def run(channels) + authenticate + spawn join_channels(channels) + @connected = true + FastIRC.parse(socket) do |message| + spawn dispatch(message) + end + end + + def on_message(&@on_message : FastIRC::Message ->) + end + + def join_channel(channel) + return unless @connected + wait = limiter.rate_limited?(:join, "") + sleep(wait) if wait.is_a?(Time::Span) + raw_write("JOIN", ["##{channel}"]) + end + + def dispatch(message) + puts message.inspect + case message.command + when "PING" + pong + when "PRIVMSG" + @on_message.try &.call(message) + end + end + + private def authenticate + raw_write("PASS", [@token]) + raw_write("NICK", [@nick]) + end + + private def join_channels(channels) + channels.each do |channel| + join_channel(channel) + end + end + + private def pong + raw_write("PONG", ["tmi.twitch.tv"]) + end + + private def raw_write(command, params, prefix = nil, tags = nil) + FastIRC::Message.new(command, params, prefix: prefix, tags: tags).to_s(socket) + end +end diff --git a/src/twitch/rate_limiter.cr b/src/twitch/rate_limiter.cr new file mode 100644 index 0000000..929dbc0 --- /dev/null +++ b/src/twitch/rate_limiter.cr @@ -0,0 +1,39 @@ +require "rate_limiter" + +class RateLimiter(T) + class Bucket(K) + def initialize(@limit : UInt32, @time_span : Time::Span, + @delay : Time::Span = 0.seconds) + @bucket = {} of K => Deque(Time) + end + + def rate_limited?(key : K, rate_limit_time = Time.now) + queue = @bucket[key]? + + unless queue + @bucket[key] = Deque(Time).new(1, rate_limit_time) + return false + end + + first_time = queue[0] + last_time = queue[-1] + + if @limit && (queue.size + 1) > @limit + return (first_time + @time_span) - rate_limit_time if (first_time + @time_span) > rate_limit_time + + clean_queue(queue, rate_limit_time - @time_span) + end + + return (last_time + @delay) - rate_limit_time if @delay && (last_time + @delay) > rate_limit_time + + queue.push(rate_limit_time) + false + end + + def clean_queue(queue, rate_limit_time = Time.now) + while queue[0] < rate_limit_time + queue.shift + end + end + end +end -- cgit v1.2.3