- 01
- 02
- 03
- 04
- 05
- 06
- 07
- 08
- 09
- 10
- 11
- 12
- 13
- 14
- 15
- 16
- 17
- 18
- 19
- 20
- 21
- 22
- 23
- 24
- 25
- 26
- 27
- 28
- 29
- 30
- 31
- 32
- 33
- 34
- 35
- 36
- 37
- 38
- 39
- 40
- 41
- 42
- 43
- 44
- 45
- 46
- 47
- 48
- 49
- 50
- 51
- 52
- 53
- 54
- 55
- 56
- 57
- 58
- 59
- 60
- 61
- 62
- 63
- 64
- 65
- 66
- 67
- 68
- 69
- 70
- 71
- 72
- 73
- 74
- 75
- 76
- 77
- 78
- 79
- 80
- 81
- 82
- 83
- 84
- 85
- 86
- 87
- 88
// dispatch serves one client connection until the conversation ends.
//
// address is the key under which the connection is cached in
// server.connections. The same key is used to look up the per-connection
// entry in server.Stat.Connections; when that entry is absent, state and
// timestamp tracking is silently skipped.
//
// For every request the method:
//  1. reads the header line through readRequest and parses it with
//     protocol.ParseProtocolHeader;
//  2. rejects cas/gets when server.cas_disabled is set, and flush_all when
//     server.flush_disabled is set, answering with protocol.CLIENT_ERROR_TEMP;
//  3. reads the data block when the header announces a positive data length;
//  4. asks the parsed request to handle itself against server.storage and
//     writes the response unless the request's Reply() reports false.
//
// The loop stops when the header read returns io.EOF, when the "ERROR" reply
// to a failed header read cannot be delivered, when a data block cannot be
// read, or when HandleRequest returns an error. server.free_chan is called on
// every return path.
func (server *Server) dispatch(address string) {
	defer server.free_chan()

	// setState records the connection state in the statistics entry, if one
	// exists. The entry is looked up on every call, exactly as the inlined
	// checks did before, so an entry added or removed elsewhere is respected.
	setState := func(state string) {
		if stat := server.Stat.Connections[address]; stat != nil {
			stat.State = state
		}
	}
	setState("conn_new_cmd")

	// Without a cached connection there is nothing to read from; bail out
	// instead of handing a zero value to bufio.
	connection, ok := server.connections[address]
	if !ok {
		server.Logger.Warning("Dispatching error: no cached connection for address ", address)
		return
	}
	reader := bufio.NewReader(connection)

	// Keep serving requests for as long as the connection stays usable.
	for {
		// Read the header line first.
		setState("conn_read")
		header, n, err := readRequest(reader, -1)
		if err != nil {
			setState("conn_swallow")
			if err == io.EOF {
				server.Logger.Info("Input stream has got EOF, and now is being closed.")
				server.breakConnection(connection)
				return
			}
			server.Logger.Warning("Dispatching error: ", err, " Message: ", header)
			// Send the complete reply, terminator included; the length is
			// derived from the message rather than hard-coded.
			errorReply := []byte("ERROR\r\n")
			if !server.makeResponse(connection, errorReply, len(errorReply)) {
				return
			}
			setState("conn_waiting")
			continue
		}

		if stat := server.Stat.Connections[address]; stat != nil {
			stat.Cmd_hit_ts = time.Now().Unix()
		}
		server.Stat.Read_bytes += uint64(n)

		// Drop the two trailing bytes of the header line (presumably the
		// "\r\n" terminator — confirm against readRequest), but never slice
		// with a negative bound when fewer than two bytes were read.
		headerEnd := n
		if headerEnd >= 2 {
			headerEnd -= 2
		} else {
			headerEnd = 0
		}
		request := protocol.ParseProtocolHeader(string(header[:headerEnd]))
		server.Logger.Info("Header: ", *request)

		command := request.Command()
		casForbidden := (command == "cas" || command == "gets") && server.cas_disabled
		flushForbidden := command == "flush_all" && server.flush_disabled
		if casForbidden || flushForbidden {
			errMsg := command + " command is forbidden."
			server.Logger.Warning(errMsg)
			// A rejected command may still be followed by its data block.
			// Consume it so that the payload is not parsed as the next
			// command line.
			if dataLen := request.DataLen(); dataLen > 0 {
				setState("conn_swallow")
				if _, _, err := readRequest(reader, dataLen); err != nil {
					server.Logger.Error("Error occurred while reading data:", err)
					server.breakConnection(connection)
					return
				}
			}
			setState("conn_write")
			errMsg = strings.Replace(protocol.CLIENT_ERROR_TEMP, "%s", errMsg, 1)
			// NOTE(review): the result of makeResponse is ignored here, unlike
			// in the read-error branch above — confirm whether a failed write
			// should end the loop.
			server.makeResponse(connection, []byte(errMsg), len(errMsg))
			setState("conn_waiting")
			continue
		}

		// Read the data block announced by the header, if any.
		if dataLen := request.DataLen(); dataLen > 0 {
			setState("conn_nread")
			payload, _, err := readRequest(reader, dataLen)
			if err != nil {
				server.Logger.Error("Error occurred while reading data:", err)
				server.breakConnection(connection)
				return
			}
			request.SetData(payload)
		}

		server.Logger.Info("Start handling request:", *request)
		response, err := request.HandleRequest(server.storage, server.Stat)
		server.Logger.Info("Server is sending response:\n", string(response))

		// Answer only when the request expects a reply.
		if request.Reply() {
			setState("conn_write")
			// NOTE(review): write failures are not acted upon here — confirm
			// that makeResponse handles them itself.
			server.makeResponse(connection, response, len(response))
		}
		// The response (if any) is written before the handling error is
		// examined; a handling error then closes the connection.
		if err != nil {
			server.Logger.Error("Impossible to send response:", err)
			server.breakConnection(connection)
			return
		}
		setState("conn_waiting")
	}
}