Class: Fluent::GroongaInput::BaseInput

Inherits:
Object
  • Object
show all
Includes:
Configurable, DetachMultiProcessMixin
Defined in:
lib/fluent/plugin/in_groonga.rb,
lib/fluent/plugin/in_groonga.rb

Direct Known Subclasses

GQTPInput, HTTPInput

Constant Summary

DEFAULT_EMIT_COMMANDS =
[
  /\Atable_/,
  /\Acolumn_/,
  "delete",
  /\Aplugin_/,
  "register",
  "truncate",
  "load",
]

Instance Method Summary (collapse)

Instance Method Details

- (Object) configure(conf)



109
110
111
112
113
114
# File 'lib/fluent/plugin/in_groonga.rb', line 109

def configure(conf)
  super

  @port ||= default_port
  @real_port ||= default_port
end

- (Object) create_repeater(client)



141
142
143
144
145
# File 'lib/fluent/plugin/in_groonga.rb', line 141

def create_repeater(client)
  repeater = Repeater.connect(@real_host, @real_port, client)
  repeater.attach(@loop)
  repeater
end

- (Object) emit(command, params)



147
148
149
150
# File 'lib/fluent/plugin/in_groonga.rb', line 147

def emit(command, params)
  return unless emit_command?(command)
  Engine.emit("groonga.command.#{command}", Engine.now, params)
end

- (Object) shutdown



134
135
136
137
138
139
# File 'lib/fluent/plugin/in_groonga.rb', line 134

def shutdown
  @loop.stop
  @socket.close
  @shutdown_notifier.signal
  @thread.join
end

- (Object) start



116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
# File 'lib/fluent/plugin/in_groonga.rb', line 116

def start
  listen_socket = TCPServer.new(@bind, @port)
  detach_multi_process do
    @loop = Coolio::Loop.new

    @socket = Coolio::TCPServer.new(listen_socket, nil,
                                    handler_class, self)
    @loop.attach(@socket)

    @shutdown_notifier = Coolio::AsyncWatcher.new
    @loop.attach(@shutdown_notifier)

    @thread = Thread.new do
      run
    end
  end
end