--[[
Copyright (C) 2008 optivo GmbH
This program is free software; you can redistribute it and/or modify
it under the terms of the GNU General Public License as published by
the Free Software Foundation; version 2 of the License.
This program is distributed in the hope that it will be useful,
but WITHOUT ANY WARRANTY; without even the implied warranty of
MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
GNU General Public License for more details.
You should have received a copy of the GNU General Public License
along with this program; if not, write to the Free Software
Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA
--]]
--- A partition lookup service based on a dictionary / lookup mechanism.
--
--
-- Hashing functions
-- Use one of the following algorithms to hash the partition value. This allows grouping of multiple partition values into one partition.
--
-- NONE() The partition value will be used as provided.
-- DIV(size) The partition value will be calculated as value / size
-- MOD(size) The partition value will be calculated as value % size
-- PREFIX(length) The partition value is the first length characters of the partition value (treated as string).
--
--
-- Set up partitioning for a table
--
-- - If the table you want to partition does not exist, create it.
-- - Let HSCALE do the rest:
--
-- HSCALE SETUP_TABLE('[table]', 'partition column', '[table]_default', backend, 'hashing function, see above')
-- Adjust the values to match your configuration.
--
--
--
-- Add partitions
-- HSCALE ADD_PARTITION('table', 'partition name', 'partition value', backend)
--
-- TODO: Support DATE() hashing function
-- TODO: Block addPartition (and setUpTable) if reloadMode is set to FORCE until it goes back to NORMAL. That should solve all concurrency problems.
-- @author $Author$
-- @release $Date$ $Rev$
module("optivo.hscale.dictionaryPartitionLookup", package.seeall)
local _module = {}
local LOG = require("optivo.common.log4lua.logger").getLogger("optivo.hscale.dictionaryPartitionLookup")
local utils = require("optivo.common.utils")
local proxyState = require("optivo.hscale.proxyState")
-- Information about the current reload mode.
-- The mode will be set from "normal" to "force" whenever something is changed (SETUP_TABLE or ADD_PARTITION).
-- This way it is guaranteed that all proxies will have the same configuration as soon as it changes.
-- Name of the status-table entry holding the reload mode.
local STATUS_RELOADMODE = "reloadMode"
-- Regular operation: reload() only reloads once reloadInterval seconds have passed.
local RELOADMODE_NORMAL = "normal"
-- A configuration change is in progress: every call to reload() reloads.
local RELOADMODE_FORCE = "force"
-- Accepted values of the "autoCreatePartitions" option.
-- "nowait": missing partitions are added immediately (no switch to FORCE, no waiting).
local AUTO_CREATE_PARTITIONS_NOWAIT = "nowait"
-- "on": missing partitions are added with the regular FORCE / wait cycle of addPartition.
local AUTO_CREATE_PARTITIONS_ON = "on"
-- LuaSQL MySQL environment used for both the configuration and the backend connections.
local luasql = require("luasql.mysql")
local _sqlEnv = assert(luasql.mysql())
-- Cached connection to the configuration server (managed by connectToConfigServer).
local _con = nil
-- Prefix of all dictionary tables (option "dictionaryTablePrefix").
local _tablePrefix = nil
-- Local copy of the status table: name -> value.
local _status = {}
-- Structure:
-- Table name is key, value is {column, defaultTable, defaultBackend, hashFunction}
local _tables = nil
-- Map with table as key and column name as value
local _tableKeyColumns = nil
-- Structure:
-- Table name is key, value is map with partition value as key and {name, backend} as value.
-- Partition values are used as strings (tostring of the stored value).
local _partitions = nil
-- Configuration connection information.
local _configServer = nil
-- True once the CREATE TABLE IF NOT EXISTS statements for the dictionary tables have been sent.
local _configTablesCreated = false
-- Auth to all backends.
local _backendAuth = nil
-- Additional seconds to wait if switching to RELOADMODE_FORCE. Default is 2.
local _reloadForceWait = nil
-- Seconds between two regular reloads (compared against os.time()). Default is 10.
local _reloadInterval = nil
-- os.time() at the start of the last reload; nil until the first reload.
local _lastReloadTime = nil
-- This is mostly needed for testing purposes.
local __forceCreateConfigTables = false
-- If true partitions are created automatically (see HSCALE-39)
-- NOTE(review): getPartition tests `~= nil`, so this initial `false` would count as enabled;
-- init() replaces it with nil (option unset) or one of the AUTO_CREATE_PARTITIONS_* values.
local _autoCreatePartitions = false
-- Internal function that makes sure the connection to the configuration server is established.
-- A cached connection is probed with configServer.testSql and dropped if the probe fails.
-- A new connection is opened with autocommit disabled and configServer.initSql applied; it is only
-- cached in _con once all init statements succeeded, so a failed initialization is never reused.
-- @throws error if connecting or running an init statement fails (the new connection is closed).
local function connectToConfigServer()
    if (_con) then
        -- Test the connection.
        local res = _con:execute(_configServer.testSql)
        if (res == nil) then
            LOG:info("Lost connection to configuration server ".. _configServer.host .. ":" .. _configServer.port .. "/" .. _configServer.db)
            _con:close()
            _con = nil
        elseif (type(res) ~= "number") then
            -- Close the result cursor.
            res:close()
        end
    end
    if (_con == nil) then
        LOG:info("Connecting to configuration server " .. _configServer.host .. ":" .. _configServer.port .. "/" .. _configServer.db)
        local con = assert(_sqlEnv:connect(_configServer.db, _configServer.user, _configServer.password, _configServer.host, _configServer.port))
        local ok, err = pcall(function()
            con:setautocommit(false)
            if (_configServer.initSql) then
                for _, sql in ipairs(_configServer.initSql) do
                    LOG:info("Executing init SQL: " .. tostring(sql))
                    local initResult = assert(con:execute(sql))
                    if (type(initResult) ~= "number") then
                        assert(initResult:close())
                    end
                end
            end
        end)
        if (not ok) then
            -- Do not cache a half-initialized connection: the next call would pass the
            -- connection test and silently skip the remaining init statements.
            pcall(con.close, con)
            error(err, 0)
        end
        _con = con
    end
end
-- Internal function.
-- Persist one status entry (REPLACE into the status table), commit it and mirror it in _status.
-- @param name the status entry name.
-- @param value the new value.
local function setStatus(name, value)
    local statement = ("REPLACE INTO %sstatus (name, value) VALUES (%q, %q)"):format(_tablePrefix, name, value)
    assert(_con:execute(statement))
    _con:commit()
    _status[name] = value
end
-- Internal function that executes an SQL statement on a selected backend.
-- A dedicated connection is opened (autocommit on, backendAuth.initSql applied), used for this
-- single statement and closed again - also when an error occurs.
-- @param sql the statement to execute.
-- @param backend the backend index.
-- @return for statements with a result set: a list of rows, each row its own table with numeric
--         indices; otherwise the number returned by execute.
-- @throws error if connecting, initializing or executing fails.
local function execSqlOnBackend(sql, backend)
    LOG:debug("Connecting to backend #" .. tostring(backend))
    local con = assert(_sqlEnv:connect(proxyState.getDatabase(), _backendAuth.user, _backendAuth.password, proxyState.getBackendAddress(backend)))
    local openCursor = nil
    local ok, resultOrError = pcall(function()
        con:setautocommit(true)
        if (_backendAuth.initSql) then
            for _, initSql in ipairs(_backendAuth.initSql) do
                LOG:debug("Executing init SQL for backend #" .. tostring(backend) ..": " .. tostring(initSql))
                local initResult = assert(con:execute(initSql))
                if (type(initResult) ~= "number") then
                    assert(initResult:close())
                end
            end
        end
        LOG:info("Executing SQL '" .. tostring(sql) .. "' on backend #" .. tostring(backend))
        local sqlResult = assert(con:execute(sql))
        if (type(sqlResult) == "number") then
            return sqlResult
        end
        openCursor = sqlResult
        local rows = {}
        -- Fetch into a fresh table every time: reusing one table would make every
        -- entry of the result reference the same (last fetched) row.
        local row = sqlResult:fetch({})
        while (row) do
            rows[#rows + 1] = row
            row = sqlResult:fetch({})
        end
        openCursor = nil
        assert(sqlResult:close())
        return rows
    end)
    if (not ok) then
        -- Release the cursor and the connection before propagating the original error.
        if (openCursor) then
            pcall(openCursor.close, openCursor)
        end
        pcall(con.close, con)
        error(resultOrError, 0)
    end
    assert(con:close())
    return resultOrError
end
--- Initialize the module using the current configuration.
-- Configuration parameters:
--
-- dictionaryTablePrefix (optional) the prefix for all dictionary tables. Default is "hscale_dict_".
-- reloadInterval (optional) seconds between two regular reloads. Default is 10.
-- reloadForceWait (optional) additional seconds to wait after switching the reload mode to FORCE. Default is 2.
-- autoCreatePartitions (optional) "on" (create missing partitions with the FORCE / wait cycle) or
--   "nowait" (create them immediately); unset disables automatic partition creation.
-- configServer (required) a table defining the connection to the configuration database. Must contain host, db,
--   user and password; optional are port (default 3306), testSql (default "SELECT 1") and initSql (list of
--   statements executed after connecting).
-- backendAuth (required) a table with user and password used for all backends; optional initSql.
--
-- The configuration is loaded immediately (forced reload), which validates the connection settings.
-- @param config the configuration object providing get(name) and getWithDefault(name, default).
-- @throws error if the configuration is invalid or the configuration server cannot be used.
-- @see optivo.hscale.config
function _module.init(config)
    _tablePrefix = config.getWithDefault("dictionaryTablePrefix", "hscale_dict_")
    _reloadInterval = config.getWithDefault("reloadInterval", 10)
    _reloadForceWait = config.getWithDefault("reloadForceWait", 2)
    _autoCreatePartitions = config.getWithDefault("autoCreatePartitions", nil)
    assert(
        _autoCreatePartitions == nil or _autoCreatePartitions == AUTO_CREATE_PARTITIONS_ON or _autoCreatePartitions == AUTO_CREATE_PARTITIONS_NOWAIT,
        "Invalid value '" .. tostring(_autoCreatePartitions) .. "' for option 'autoCreatePartitions'."
    )
    __forceCreateConfigTables = config.getWithDefault("_forceCreateConfigTables", false)
    _configServer = config.get("configServer")
    -- Fail with a clear message instead of "attempt to index a nil value" below.
    assert(type(_configServer) == "table", "configServer not given.")
    assert(_configServer.host, "configServer.host not given.")
    assert(_configServer.db, "configServer.db not given.")
    assert(_configServer.user, "configServer.user not given.")
    assert(_configServer.password, "configServer.password not given.")
    _configServer.port = _configServer.port or 3306
    _configServer.testSql = _configServer.testSql or "SELECT 1"
    _backendAuth = config.get("backendAuth")
    assert(type(_backendAuth) == "table", "backendAuth not given.")
    assert(_backendAuth.user, "backendAuth.user not given.")
    assert(_backendAuth.password, "backendAuth.password not given.")
    -- Connect to config database to make sure the configuration is valid.
    _module.reload(true)
end
-- Internal function.
-- Parse a hashing function definition and return a function converting a partition key into a
-- partition value. Supported definitions (function names are case sensitive):
--   MOD(n)    -> value % n          (n a positive integer; a nil value counts as 0)
--   DIV(n)    -> floor(value / n)   (n a positive integer; a nil value counts as 0)
--   PREFIX(n) -> the first n characters of the value treated as string (nil counts as "")
--   NONE()    -> the value as given (nil counts as "")
-- Surrounding whitespace is ignored, but the definition must consist of exactly one such call:
-- "XMOD(5)" or "PREFIXMOD(5)" are rejected instead of being mistaken for MOD.
-- @param alg the hashing function definition, e.g. "MOD(5)".
-- @return function(value) returning the partition value.
-- @throws error if the definition or its parameter is invalid.
local function parseHashFunction(alg)
    local name, arg = nil, nil
    if (type(alg) == "string") then
        -- Anchored so that the captured name is the complete function name.
        name, arg = string.match(alg, "^%s*(%a+)%((.*)%)%s*$")
    end
    assert(name ~= nil and arg ~= nil, "Invalid hashing function '" .. tostring(alg) .. "'")
    if ((name == "MOD" or name == "DIV") and string.match(arg, "^%d*$")) then
        assert(tonumber(arg), "Parameter '" .. tostring(arg) .. "' for " .. name .. " is not a valid number.")
        local divisor = tonumber(arg)
        assert(divisor > 0, "Parameter '" .. tostring(divisor) .. "' for " .. name .. " must be > 0.")
        if (name == "MOD") then
            return function(v)
                assert(v == nil or tonumber(v), "Number value expected but value '" .. tostring(v) .. "' cannot be converted to a number.")
                return tonumber(v or 0) % divisor
            end
        end
        return function(v)
            assert(v == nil or tonumber(v), "Number value expected but value '" .. tostring(v) .. "' cannot be converted to a number.")
            return math.floor(tonumber(v or 0) / divisor)
        end
    elseif (name == "PREFIX") then
        assert(tonumber(arg), "Parameter '" .. tostring(arg) .. "' for PREFIX is not a valid number.")
        local length = tonumber(arg)
        return function(v)
            if (v ~= nil and type(v) ~= "string") then
                v = tostring(v)
            end
            return utils.utf8Sub(v or "", 1, length)
        end
    elseif (name == "NONE" and arg == "") then
        return function(v)
            return v or ""
        end
    end
    error("Invalid hashing function arguments in '" .. alg .. "'")
end
-- Internal function.
-- Send the CREATE TABLE IF NOT EXISTS statements for the dictionary tables (tables, partitions,
-- status) to the configuration server, record the schema version (only if it is not present yet)
-- and commit. Every statement template receives the dictionary table prefix as its only argument.
local function createConfigTables()
    local statementTemplates = {
        [[CREATE TABLE IF NOT EXISTS %stables (
tbl_name VARCHAR(64) NOT NULL,
col_name VARCHAR(64) NOT NULL,
def_tbl_name VARCHAR(64) NOT NULL,
def_tbl_backend TINYINT NOT NULL,
hash_fun VARCHAR(20) NOT NULL,
created TIMESTAMP NOT NULL,
PRIMARY KEY (tbl_name, col_name)
)]],
        [[CREATE TABLE IF NOT EXISTS %spartitions (
tbl_name VARCHAR(64) NOT NULL,
part_name VARCHAR(32) NOT NULL,
part_value VARCHAR(64) NOT NULL,
part_backend TINYINT NOT NULL,
created TIMESTAMP NOT NULL,
PRIMARY KEY (tbl_name, part_value)
) DEFAULT CHARACTER SET 'utf8' COLLATE 'utf8_bin']],
        [[CREATE TABLE IF NOT EXISTS %sstatus (
name VARCHAR(30) NOT NULL,
value VARCHAR(255) NOT NULL,
PRIMARY KEY (name)
)]],
        -- The schema version makes later upgrades easier.
        "INSERT IGNORE INTO %sstatus (name, value) VALUES ('schemaVersion', '1')",
    }
    for _, template in ipairs(statementTemplates) do
        assert(_con:execute(string.format(template, _tablePrefix)))
    end
    _con:commit()
end
--- Reload the configuration (status, partitioned tables, partitions) from the configuration database.
-- This is called regularly. A reload only happens if it is forced, the reload mode is FORCE, no
-- reload has happened yet, or at least reloadInterval seconds have passed since the last one.
-- @param force if true then reloading is forced even if the reload interval is not reached; the
--        dictionary tables are then also (re)created if missing.
-- @throws error if the configuration server cannot be queried or a stored hashing function is invalid.
function _module.reload(force)
    if (
        _status[STATUS_RELOADMODE] == RELOADMODE_FORCE
        or force == true
        or _lastReloadTime == nil
        or _lastReloadTime + _reloadInterval <= os.time()) then
        _lastReloadTime = os.time()
        LOG:debug("Reloading configuration")
        -- Make sure the connection is established
        connectToConfigServer()
        if (not _configTablesCreated or __forceCreateConfigTables or force) then
            createConfigTables()
            _configTablesCreated = true
        end
        local cur = nil
        -- Reused as fetch buffer; the values are copied out on every iteration.
        local row = {}
        local status = {}
        -- Read out the status
        cur = assert(_con:execute(string.format("SELECT name, value FROM %sstatus", _tablePrefix)))
        while (cur:fetch(row)) do
            if (LOG:isLevel(LOG.DEBUG)) then
                LOG:debug({"Parsing status row", row})
            end
            status[row[1]] = row[2]
        end
        cur:close()
        local tables = {}
        local tableKeyColumns = {}
        local tablesFound = 0
        cur = assert(_con:execute(string.format("SELECT tbl_name, col_name, def_tbl_name, def_tbl_backend, hash_fun FROM %stables", _tablePrefix)))
        while (cur:fetch(row)) do
            tablesFound = tablesFound + 1
            if (LOG:isLevel(LOG.DEBUG)) then
                LOG:debug({"Parsing table row", row})
            end
            tables[tostring(row[1])] = {
                col = tostring(row[2]),
                defaultTable = tostring(row[3]),
                defaultBackend = tonumber(row[4]),
                hashFunction = parseHashFunction(row[5])
            }
            tableKeyColumns[tostring(row[1])] = tostring(row[2])
        end
        cur:close()
        local partitions = {}
        local partitionsFound = 0
        local currentTable = nil
        local tab = nil
        cur = assert(_con:execute(string.format("SELECT tbl_name, part_name, part_value, part_backend FROM %spartitions", _tablePrefix)))
        while (cur:fetch(row)) do
            partitionsFound = partitionsFound + 1
            if (LOG:isLevel(LOG.DEBUG)) then
                LOG:debug({"Parsing partition row", row})
            end
            local tableName = tostring(row[1])
            -- Rows of one table usually come together; only look up the per-table map on change.
            if (tab == nil or tableName ~= currentTable) then
                currentTable = tableName
                tab = partitions[tableName]
                if (tab == nil) then
                    tab = {}
                    partitions[tableName] = tab
                end
            end
            tab[tostring(row[3])] = {
                name = tostring(row[2]),
                backend = tonumber(row[4])
            }
        end
        cur:close()
        -- Autocommit is disabled on the configuration connection, so the SELECTs above ran inside a
        -- transaction. End it: with InnoDB's default REPEATABLE READ isolation an open transaction
        -- keeps returning its first snapshot, and later reloads would never see changes made by
        -- other HSCALE instances.
        _con:commit()
        -- TODO: Test whether the following is secure in a multi-connection environment.
        -- Enable new configuration.
        _status, _tables, _tableKeyColumns, _partitions = status, tables, tableKeyColumns, partitions
        LOG:info("Configuration reloaded. " .. tablesFound .. " partitioned tables and " .. partitionsFound .. " partitions found.")
        LOG:info("Current status: " .. utils.convertTableToString(_status, 2))
    end
end
-- Allowed token types for the arguments of admin commands.
local STRING_ARGUMENT = { TK_STRING = true }
local BACKEND_ARGUMENT = { TK_INTEGER = true, TK_STRING = true }

-- Internal function.
-- Check that tokens[3..] form "(" arg1 "," arg2 ... ")" where the token type of argument i is
-- contained in the set argTypes[i]. tokens[1] and tokens[2] (presumably "HSCALE" and the
-- command name) are not inspected.
-- @return true if the argument list matches exactly.
local function isArgumentList(tokens, argTypes)
    local argCount = #argTypes
    -- Two leading tokens, "(", one token per argument, (argCount - 1) commas and ")".
    if (#tokens ~= 2 + 2 * argCount + 1) then
        return false
    end
    if (tokens[3].token_name ~= "TK_OBRACE" or tokens[#tokens].token_name ~= "TK_CBRACE") then
        return false
    end
    for i, allowed in ipairs(argTypes) do
        if (not allowed[tokens[2 + 2 * i].token_name]) then
            return false
        end
        if (i < argCount and tokens[3 + 2 * i].token_name ~= "TK_COMMA") then
            return false
        end
    end
    return true
end

--- Handle the given admin command.
-- Supported commands:
-- HSCALE SETUP_TABLE('[table]', 'partition column', '[table]_default', backend, 'hashing function')
-- HSCALE ADD_PARTITION('table', 'partition name', 'partition value', backend)
--
-- @param tokens the tokenized statement; tokens[2] holds the command name (case insensitive).
-- @return true if the command was recognized and executed, false if the tokens are not recognized.
-- @throws error if something is wrong with the statement or executing the command fails.
function _module.handleAdminCommand(tokens)
    -- Without a command token there is nothing we could recognize.
    if (tokens == nil or tokens[2] == nil or tokens[2].text == nil) then
        return false
    end
    local command = tokens[2].text:upper()
    if (command == "SETUP_TABLE") then
        LOG:debug("SETUP_TABLE recognized.")
        assert(
            isArgumentList(tokens, { STRING_ARGUMENT, STRING_ARGUMENT, STRING_ARGUMENT, BACKEND_ARGUMENT, STRING_ARGUMENT }),
            "You have an error in your SETUP_TABLE command."
        )
        _module.setUpTable(tokens[4].text, tokens[6].text, tokens[8].text, tokens[10].text, tokens[12].text)
        return true
    elseif (command == "ADD_PARTITION") then
        LOG:debug("ADD_PARTITION recognized.")
        assert(
            isArgumentList(tokens, { STRING_ARGUMENT, STRING_ARGUMENT, STRING_ARGUMENT, BACKEND_ARGUMENT }),
            "You have an error in your ADD_PARTITION command."
        )
        _module.addPartition(tokens[4].text, tokens[6].text, tokens[8].text, tokens[10].text)
        return true
    end
    return false
end
--- Set up a table for partitioning.
-- Registers the table in the tables dictionary and renames the existing table to defaultTable on
-- defaultBackend. Meanwhile the reload mode is switched to FORCE and the call sleeps
-- reloadInterval + reloadForceWait seconds; afterwards the mode is switched back to NORMAL.
-- @param name the table to partition. It must exist on defaultBackend and is renamed there.
--        TODO: Check on all backends.
-- @param column the partition key column.
-- @param defaultTable the name the existing table is renamed to; it serves as the default partition.
-- @param defaultBackend the backend index holding the table.
-- @param hashFunction the definition of a hashing function. Example: "MOD(5)"
-- @throws error if the table is already configured or a step fails.
--         TODO: Make that a return value. Block if RELOAD_MODE == FORCE.
function _module.setUpTable(name, column, defaultTable, defaultBackend, hashFunction)
    LOG:debug(
        string.format(
            "setUpTable(%q, %q, %q, %q, %q)",
            tostring(name), tostring(column), tostring(defaultTable), tostring(defaultBackend), tostring(hashFunction)
        )
    )
    assert(name, "Missing name.")
    assert(column, "Missing column.")
    assert(defaultTable, "Missing defaultTable.")
    assert(defaultBackend, "Missing defaultBackend.")
    assert(hashFunction, "Missing hashFunction.")
    -- Make sure the hash function is valid
    parseHashFunction(hashFunction)
    -- Make sure the connection is established
    connectToConfigServer()
    _module.reload(true)
    assert(_tables[name] == nil, "Table '" .. name .. "' is already partitioned.")
    -- Switch to forcing reload upon every query.
    LOG:info("Switching reload mode to FORCE and waiting " .. (_reloadInterval + _reloadForceWait) .. " seconds.")
    setStatus(STATUS_RELOADMODE, RELOADMODE_FORCE)
    -- The following code is written in a try-catch style since we are urged to switch back the reload mode.
    local success, msg = pcall(
        function()
            utils.sleep((_reloadInterval + _reloadForceWait) * 1000)
            local insertedRows = assert(
                _con:execute(
                    string.format([[
INSERT INTO %stables
(tbl_name, col_name, def_tbl_name, def_tbl_backend, hash_fun, created)
VALUES
(%q, %q, %q, %q, %q, NOW())
]],
                    _tablePrefix, name, column, defaultTable, tostring(defaultBackend), hashFunction)
                )
            )
            assert(insertedRows == 1, "INSERT statement returned wrong number of rows affected.")
            -- Rename the table.
            execSqlOnBackend(string.format("RENAME TABLE `%s` TO `%s`", name, defaultTable), tonumber(defaultBackend))
            _con:commit()
            _module.reload(true)
        end
    )
    if (not success and _con ~= nil) then
        -- If the RENAME failed the INSERT above is still uncommitted. Roll it back, otherwise the
        -- commit done by setStatus below would persist a definition for a table that was not renamed.
        pcall(_con.rollback, _con)
    end
    LOG:info("Switching reload mode to NORMAL.")
    setStatus(STATUS_RELOADMODE, RELOADMODE_NORMAL)
    assert(success == true, msg)
end
-- Internal function.
-- Rewrite the SHOW CREATE TABLE output of a default table into a "CREATE TABLE IF NOT EXISTS"
-- statement for a partition table. The header is located with a plain-text search and spliced
-- with string.sub, so table names containing Lua pattern characters (e.g. "-" or ".") or "%"
-- are handled literally.
-- @param createSql the SHOW CREATE TABLE statement of the default table.
-- @param defaultTable the default table name as it appears in createSql.
-- @param partitionTable the name of the partition table to create.
-- @return the rewritten statement, or nil if the "CREATE TABLE `defaultTable`" header is not found.
local function buildPartitionCreateSql(createSql, defaultTable, partitionTable)
    local header = "CREATE TABLE `" .. defaultTable .. "`"
    local first, last = string.find(createSql, header, 1, true)
    if (first == nil) then
        return nil
    end
    return string.sub(createSql, 1, first - 1)
        .. "/* HSCALE addPartition */ CREATE TABLE IF NOT EXISTS `" .. partitionTable .. "`"
        .. string.sub(createSql, last + 1)
end

--- Add a partition.
-- Creates the partition table tableName .. "_" .. partName on partBackend (cloning the definition
-- of the table's default table) and registers it in the partitions dictionary table.
-- TODO: Use transaction to avoid concurrency issues.
-- @param tableName the partitioned table.
-- @param partName the partition name.
-- @param partValue the (hashed) partition value served by the partition; compared and stored as string.
-- @param partBackend the backend index the partition table is created on.
-- @param immediately (optional) if true then RELOADMODE will not be switched to FORCE and it will not be waited.
-- @throws error if table is not partitioned or partition already exists. TODO: Make that a return value. Block if RELOAD_MODE == FORCE.
function _module.addPartition(tableName, partName, partValue, partBackend, immediately)
    LOG:debug(
        string.format(
            "addPartition(%q, %q, %q, %q)",
            tostring(tableName), tostring(partName), tostring(partValue), tostring(partBackend)
        )
    )
    assert(tableName, "Missing tableName.")
    assert(partName, "Missing partName.")
    assert(partValue, "Missing partValue.")
    assert(partBackend, "Missing partBackend.")
    -- Make sure the connection is established
    connectToConfigServer()
    _module.reload(true)
    local tableDef = _tables[tableName]
    assert(tableDef, "Table '" .. tableName .. "' is not partitioned.")
    -- Partition values are cached as strings, so numeric values (e.g. MOD/DIV results passed
    -- by getPartition) must be converted before the lookup.
    local partKey = tostring(partValue)
    assert((_partitions[tableName] or {})[partKey] == nil, "Partition for value '" .. partKey .. "' already exists.")
    -- Switch to forcing reload upon every query.
    if (immediately ~= true) then
        LOG:info("Switching reload mode to FORCE and waiting " .. (_reloadInterval + _reloadForceWait) .. " seconds.")
        setStatus(STATUS_RELOADMODE, RELOADMODE_FORCE)
    end
    -- The following code is written in a try-catch style since we are urged to switch back the reload mode.
    local success, msg = pcall(
        function()
            if (immediately ~= true) then
                utils.sleep((_reloadInterval + _reloadForceWait) * 1000)
            end
            -- Create table.
            local originalSql = execSqlOnBackend("SHOW CREATE TABLE `" .. tableDef.defaultTable .. "`", tonumber(tableDef.defaultBackend))[1][2]
            local createSql = buildPartitionCreateSql(originalSql, tableDef.defaultTable, tableName .. "_" .. partName)
            assert(
                createSql,
                "Error determining create sql for partition '" .. tostring(partName) .. "' of table '" .. tableName .. "'. SQL got so far: " .. tostring(originalSql)
            )
            execSqlOnBackend(createSql, tonumber(partBackend))
            local sql = string.format([[
INSERT INTO %spartitions
(tbl_name, part_name, part_value, part_backend, created)
VALUES
(%q, %q, %q, %q, NOW())
]],
                _tablePrefix, tableName, partName, partKey, tostring(partBackend)
            )
            local insertedRows = assert(_con:execute(sql))
            assert(insertedRows == 1, "INSERT statement returned wrong number of rows affected.")
            _con:commit()
            _module.reload(true)
        end
    )
    if (not success and _con ~= nil) then
        -- Discard anything left uncommitted so a following commit (e.g. in setStatus) cannot persist it.
        pcall(_con.rollback, _con)
    end
    if (immediately ~= true) then
        LOG:info("Switching reload mode to NORMAL.")
        setStatus(STATUS_RELOADMODE, RELOADMODE_NORMAL)
    end
    assert(success == true, msg)
end
--- Return the mapping of partitioned table names to their partition key column.
-- @return map table name -> column name. This is the module's internal map (not a copy); it is nil
--         until the first reload has run.
function _module.getTableKeyColumns()
    local keyColumnByTable = _tableKeyColumns
    return keyColumnByTable
end
--- List every physical table backing a partitioned table.
-- The default table always comes first. Each partition name is listed once, even if several
-- partition values map to it; the order of those entries follows pairs() and is not defined.
-- @param tableName the partitioned table.
-- @return list of {name = table name, backend = backend index}
--   (Example: { {name=tab1, backend=1}, {name=tab2, backend=2} })
-- @throws error if the table is not partitioned.
function _module.getAllPartitions(tableName)
    local partitionsByValue = _partitions[tableName] or {}
    local definition = _tables[tableName]
    assert(definition ~= nil, "Table '" .. tableName .. "' is not partitioned.")
    local list = { { name = definition.defaultTable, backend = definition.defaultBackend } }
    local alreadyListed = {}
    for _, info in pairs(partitionsByValue) do
        if (alreadyListed[info.name] == nil) then
            list[#list + 1] = { name = tableName .. "_" .. info.name, backend = info.backend }
            alreadyListed[info.name] = 1
        end
    end
    return list
end
-- Internal function.
-- Look up the partition for a key in the locally cached configuration only (no reload).
-- The key is hashed with the table's hashing function and the result is matched as string.
-- @param tableName the partitioned table (must be configured).
-- @param partitionKey the raw partition key value.
-- @return the partition table name and its backend index, or nil, nil if no partition matches.
local function lookupPartition(tableName, partitionKey)
    local definition = _tables[tableName]
    local partitionsByValue = _partitions[tableName] or {}
    local match = partitionsByValue[tostring(definition.hashFunction(partitionKey))]
    if (match == nil) then
        return nil, nil
    end
    return tableName .. "_" .. match.name, match.backend
end
-- Internal function.
-- Select the backend holding the fewest partitions of the given table (HSCALE-39).
-- Only backends known to the proxy (1 .. #proxy.backends) are candidates, and ties are broken by
-- the lowest backend index, so the choice does not depend on pairs() iteration order.
-- @return the backend index, or nil if the proxy has no backends.
local function findLeastUsedBackend(tableName)
    local backendCount = #proxy.backends
    local usage = {}
    for backend = 1, backendCount do
        usage[backend] = 0
    end
    for _, partition in pairs(_partitions[tableName] or {}) do
        -- Partitions on backends the proxy does not know are ignored.
        if (usage[partition.backend] ~= nil) then
            usage[partition.backend] = usage[partition.backend] + 1
        end
    end
    local bestBackend, bestCount = nil, nil
    for backend = 1, backendCount do
        if (bestCount == nil or usage[backend] < bestCount) then
            bestBackend, bestCount = backend, usage[backend]
        end
    end
    return bestBackend
end

--- Query a partition.
-- If no partition matches and autoCreatePartitions is enabled, the partition is created on the
-- least used backend. Otherwise the default table is returned.
-- @param tableName the partitioned table.
-- @param partitionKey the raw partition key value.
-- @return the partition table for the given partition key and the backend index (Example: name, backend = getPartition("test", "a"))
-- @throws error if the table is not partitioned.
function _module.getPartition(tableName, partitionKey)
    assert(_tables[tableName] ~= nil, "Table '" .. tostring(tableName) .. "' is not partitioned.")
    local resultTable, resultBackend = lookupPartition(tableName, partitionKey)
    if (resultTable == nil and _autoCreatePartitions ~= nil) then
        -- Create partition on the least used backend (HSCALE-39).
        -- First reload to see if it is already there.
        _module.reload(true)
        resultTable, resultBackend = lookupPartition(tableName, partitionKey)
        if (resultTable == nil) then
            -- Use the freshly reloaded definition; reload replaced the cached tables.
            local hash = _tables[tableName].hashFunction(partitionKey)
            -- Since all HSCALE instances will try to create auto partitions we do not need to wait
            -- while adding the partition. Thus we call addPartition with "immediately" = true.
            _module.addPartition(
                tableName, utils.formatAsHex(hash, "_", "[^a-zA-Z0-9_]"), hash, findLeastUsedBackend(tableName) or 1, _autoCreatePartitions == AUTO_CREATE_PARTITIONS_NOWAIT
            )
            resultTable, resultBackend = lookupPartition(tableName, partitionKey)
        end
    end
    if (resultTable == nil) then
        -- No (auto-created) partition available: fall back to the default table.
        local tab = _tables[tableName]
        resultTable = tab.defaultTable
        resultBackend = tab.defaultBackend
    end
    assert(resultTable and resultBackend, "Not even a default partition found for table '" .. tostring(tableName) .. "'.")
    return resultTable, resultBackend
end
return _module