--[[ Copyright (C) 2008 optivo GmbH This program is free software; you can redistribute it and/or modify it under the terms of the GNU General Public License as published by the Free Software Foundation; version 2 of the License. This program is distributed in the hope that it will be useful, but WITHOUT ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License for more details. You should have received a copy of the GNU General Public License along with this program; if not, write to the Free Software Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA --]] --- A partition lookup service based on a dictionary / lookup mechanism.
-- --

-- Hashing functions

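-- Use one of the following algorithms to hash the partition value. This allows grouping of multiple partition values into one partition.
--   MOD(n)    - the numeric value modulo n (n > 0).
--   DIV(n)    - the numeric value divided by n, rounded down (n > 0).
--   PREFIX(n) - the first n characters of the value (cut with utils.utf8Sub).
--   NONE()    - the value itself (no grouping).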

-- Set up partitioning for a table

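--
--   1. If the table you want to partition does not exist, create it.
--   2. NOTE(review): the text of this step is missing from this copy of the documentation.
--   3. Let HSCALE do the rest:
--        HSCALE SETUP_TABLE('[table]', 'partition column', '[table]_default', backend, 'hashing function, see above')
--      Adjust the values to match your configuration. SETUP_TABLE renames [table] to the given
--      default table on the given backend.
--   4. NOTE(review): the text of this step is missing from this copy of the documentation.
--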

-- Add partitions

--   HSCALE ADD_PARTITION('table', 'partition name', 'partition value', backend)
-- This creates the table [table]_[partition name] on the given backend with the definition of the
-- default table and maps the given (hashed) partition value to it.
--
-- TODO: Support DATE() hashing function
-- TODO: Block addPartition (and setUpTable) if reloadMode is set to FORCE until it goes back to NORMAL.
--       That should solve all concurrency problems.
--
-- @author $Author$
-- @release $Date$ $Rev$

-- NOTE: module() is deprecated since Lua 5.2. It is kept because it also creates the global table
-- optivo.hscale.dictionaryPartitionLookup (other code may rely on it) and package.seeall keeps
-- globals such as "proxy" (used by getPartition()) visible.
module("optivo.hscale.dictionaryPartitionLookup", package.seeall)

local _module = {}

local LOG = require("optivo.common.log4lua.logger").getLogger("optivo.hscale.dictionaryPartitionLookup")
local utils = require("optivo.common.utils")
local proxyState = require("optivo.hscale.proxyState")

-- Information about the current reload mode.
-- The mode will be set from "normal" to "force" whenever something is changed (SETUP_TABLE or ADD_PARTITION).
-- This way it is guaranteed that all proxies will have the same configuration as soon as it changes.
local STATUS_RELOADMODE = "reloadMode"
local RELOADMODE_NORMAL = "normal"
local RELOADMODE_FORCE = "force"

-- Allowed values of the option "autoCreatePartitions" (nil means disabled).
local AUTO_CREATE_PARTITIONS_NOWAIT = "nowait"
local AUTO_CREATE_PARTITIONS_ON = "on"

local luasql = require("luasql.mysql")
local _sqlEnv = assert(luasql.mysql())
-- Connection to the configuration server (managed by connectToConfigServer()).
local _con = nil
-- Prefix of the configuration table names (option "dictionaryTablePrefix").
local _tablePrefix = nil
-- Local copy of the status table (name -> value).
local _status = {}
-- Structure:
-- Table name is key, value is {col, defaultTable, defaultBackend, hashFunction}
local _tables = nil
-- Map with table as key and column name as value
local _tableKeyColumns = nil
-- Structure:
-- Table name is key, value is map with partition value as key and {name, backend} as value.
local _partitions = nil
-- Configuration connection information.
local _configServer = nil
local _configTablesCreated = false
-- Auth to all backends.
local _backendAuth = nil
-- Additional seconds to wait if switching to RELOADMODE_FORCE. Default is 2.
local _reloadForceWait = nil
-- Seconds between two regular reloads (option "reloadInterval", default 10).
local _reloadInterval = nil
-- os.time() of the last reload; nil until the first reload.
local _lastReloadTime = nil
-- This is mostly needed for testing purposes.
local __forceCreateConfigTables = false
-- Automatic partition creation (see HSCALE-39): nil (disabled), "on" or "nowait".
-- getPartition() compares this against nil, so the disabled state must be nil and not false.
local _autoCreatePartitions = nil

-- Internal function that makes sure connection to config server is established.
-- An existing connection is checked with configServer.testSql and re-opened if the check fails.
local function connectToConfigServer()
    if (_con) then
        -- Test the connection.
        local res = _con:execute(_configServer.testSql)
        if (res == nil) then
            LOG:info("Lost connection to configuration server ".. _configServer.host .. ":" .. _configServer.port .. "/" .. _configServer.db)
            _con:close()
            _con = nil
        elseif (type(res) ~= "number") then
            -- Close the result cursor.
            res:close()
        end
    end
    if (_con == nil) then
        LOG:info("Connecting to configuration server " .. _configServer.host .. ":" .. _configServer.port .. "/" .. _configServer.db)
        _con = assert(_sqlEnv:connect(_configServer.db, _configServer.user, _configServer.password, _configServer.host, _configServer.port))
        -- Changes to the configuration are committed explicitly.
        _con:setautocommit(false)
        if (_configServer.initSql) then
            for _, sql in ipairs(_configServer.initSql) do
                LOG:info("Executing init SQL: " .. tostring(sql))
                local initResult = assert(_con:execute(sql))
                if (initResult ~= nil and type(initResult) ~= "number") then
                    assert(initResult:close())
                end
            end
        end
    end
end

-- Internal function that stores a status value in the status table, commits, and updates the local copy.
local function setStatus(name, value)
    assert(_con:execute(string.format("REPLACE INTO %sstatus (name, value) VALUES (%q, %q)", _tablePrefix, name, value)))
    _con:commit()
    _status[name] = value
end

-- Internal function that executes an SQL statement on a selected backend.
-- A fresh connection is opened for every call and is closed again, also if a statement fails.
-- @param sql the statement to execute.
-- @param backend the backend index (passed to proxyState.getBackendAddress()).
-- @return the result as table (one table per row, columns indexed numerically) or number.
local function execSqlOnBackend(sql, backend)
    LOG:debug("Connecting to backend #" .. tostring(backend))
    local con = assert(_sqlEnv:connect(proxyState.getDatabase(), _backendAuth.user, _backendAuth.password, proxyState.getBackendAddress(backend)))
    local ok, result = pcall(function()
        con:setautocommit(true)
        if (_backendAuth.initSql) then
            for _, initSql in ipairs(_backendAuth.initSql) do
                LOG:debug("Executing init SQL for backend #" .. tostring(backend) ..": " .. tostring(initSql))
                local initResult = assert(con:execute(initSql))
                if (initResult ~= nil and type(initResult) ~= "number") then
                    assert(initResult:close())
                end
            end
        end
        LOG:info("Executing SQL '" .. tostring(sql) .. "' on backend #" .. tostring(backend))
        local sqlResult = assert(con:execute(sql))
        if (sqlResult == nil or type(sqlResult) == "number") then
            return sqlResult
        end
        -- Fetch every row into its own table; reusing one buffer would make all entries alias the last row.
        local rows = {}
        local row = sqlResult:fetch({})
        while (row) do
            table.insert(rows, row)
            row = sqlResult:fetch({})
        end
        assert(sqlResult:close())
        return rows
    end)
    local closed, closeErr = con:close()
    if (not ok) then
        -- Re-raise the original error unchanged now that the connection is released.
        error(result, 0)
    end
    assert(closed, closeErr)
    return result
end

--- Initialize the module using the current configuration.
-- The following configuration parameters are read:
--   configServer (required): table with host, db, user, password and the optional port (default 3306),
--     testSql (default "SELECT 1") and initSql (list of statements executed after connecting).
--   backendAuth (required): table with user, password and the optional initSql.
--   dictionaryTablePrefix (default "hscale_dict_"): prefix of the configuration tables.
--   reloadInterval (default 10): seconds between two regular reloads.
--   reloadForceWait (default 2): additional seconds to wait when switching to reload mode FORCE.
--   autoCreatePartitions (default nil): nil, "on" or "nowait".
--   _forceCreateConfigTables (default false): run createConfigTables() on every reload (testing).
-- @param config the configuration object (get() and getWithDefault() are used).
-- @see optivo.hscale.config
function _module.init(config)
    _tablePrefix = config.getWithDefault("dictionaryTablePrefix", "hscale_dict_")
    _reloadInterval = config.getWithDefault("reloadInterval", 10)
    _reloadForceWait = config.getWithDefault("reloadForceWait", 2)
    _autoCreatePartitions = config.getWithDefault("autoCreatePartitions", nil)
    assert(
        _autoCreatePartitions == nil
            or _autoCreatePartitions == AUTO_CREATE_PARTITIONS_ON
            or _autoCreatePartitions == AUTO_CREATE_PARTITIONS_NOWAIT,
        "Invalid value '" .. tostring(_autoCreatePartitions) .. "' for option 'autoCreatePartitions'."
    )
    __forceCreateConfigTables = config.getWithDefault("_forceCreateConfigTables", false)

    _configServer = config.get("configServer")
    -- Fail with a clear message instead of an "attempt to index nil" error.
    assert(_configServer, "configServer not given.")
    assert(_configServer.host, "configServer.host not given.")
    assert(_configServer.db, "configServer.db not given.")
    assert(_configServer.user, "configServer.user not given.")
    assert(_configServer.password, "configServer.password not given.")
    _configServer.port = _configServer.port or 3306
    _configServer.testSql = _configServer.testSql or "SELECT 1"

    _backendAuth = config.get("backendAuth")
    assert(_backendAuth, "backendAuth not given.")
    assert(_backendAuth.user, "backendAuth.user not given.")
    assert(_backendAuth.password, "backendAuth.password not given.")

    -- Connect to config database to make sure the configuration is valid.
    _module.reload(true)
end

-- Internal function.
-- Parse the algorithm string given and return a function converting a value into a partition value.
-- Supported algorithms: MOD(n), DIV(n), PREFIX(n) and NONE().
-- @param alg the algorithm definition, e.g. "MOD(5)".
-- @return a function mapping a column value to its partition value.
-- @throws error if the definition is invalid.
local function parseHashFunction(alg)
    local _, _, name, arg = string.find(alg, "(%a*)%((.*)%)")
    assert(name ~= nil and arg ~= nil, "Invalid hashing function '" .. tostring(alg) .. "'")
    -- Dispatch on the parsed function name. Searching for e.g. "MOD(" anywhere in the string
    -- would also accept definitions like "XMOD(5)".
    local fun = nil
    if (name == "MOD" and string.match(arg, "^%d*$")) then
        assert(arg and tonumber(arg), "Parameter '" .. tostring(arg) .. "' for MOD is not a valid number.")
        local divisor = tonumber(arg)
        assert(divisor > 0, "Parameter '" .. tostring(divisor) .. "' for MOD must be > 0.")
        fun = function(v)
            assert(v == nil or tonumber(v), "Number value expected but value '" .. tostring(v) .. "' cannot be converted to a number.")
            return tonumber(v or 0) % divisor
        end
    elseif (name == "DIV" and string.match(arg, "^%d*$")) then
        assert(arg and tonumber(arg), "Parameter '" .. tostring(arg) .. "' for DIV is not a valid number.")
        local divisor = tonumber(arg)
        assert(divisor > 0, "Parameter '" .. tostring(divisor) .. "' for DIV must be > 0.")
        fun = function(v)
            assert(v == nil or tonumber(v), "Number value expected but value '" .. tostring(v) .. "' cannot be converted to a number.")
            return math.floor(tonumber(v or 0) / divisor)
        end
    elseif (name == "PREFIX") then
        assert(arg and tonumber(arg), "Parameter '" .. tostring(arg) .. "' for PREFIX is not a valid number.")
        local length = tonumber(arg)
        fun = function(v)
            if (v ~= nil and type(v) ~= "string") then
                v = tostring(v)
            end
            return utils.utf8Sub(v or "", 1, length)
        end
    elseif (name == "NONE" and arg == "") then
        fun = function(v)
            return v or ""
        end
    else
        error("Invalid hashing function arguments in '" .. alg .. "'")
    end
    return fun
end

-- Internal function.
-- Send CREATE TABLE IF NOT EXISTS statements for the tables, partitions and status tables to the
-- configuration server, record schemaVersion 1 (if not present) and commit.
local function createConfigTables()
    -- Create the configuration tables if they do not already exist.
    assert(_con:execute(
        string.format(
            [[CREATE TABLE IF NOT EXISTS %stables ( tbl_name VARCHAR(64) NOT NULL, col_name VARCHAR(64) NOT NULL, def_tbl_name VARCHAR(64) NOT NULL, def_tbl_backend TINYINT NOT NULL, hash_fun VARCHAR(20) NOT NULL, created TIMESTAMP NOT NULL, PRIMARY KEY (tbl_name, col_name) )]],
            _tablePrefix)
    ))
    assert(_con:execute(
        string.format(
            [[CREATE TABLE IF NOT EXISTS %spartitions ( tbl_name VARCHAR(64) NOT NULL, part_name VARCHAR(32) NOT NULL, part_value VARCHAR(64) NOT NULL, part_backend TINYINT NOT NULL, created TIMESTAMP NOT NULL, PRIMARY KEY (tbl_name, part_value) ) DEFAULT CHARACTER SET 'utf8' COLLATE 'utf8_bin']],
            _tablePrefix)
    ))
    assert(_con:execute(
        string.format(
            [[CREATE TABLE IF NOT EXISTS %sstatus ( name VARCHAR(30) NOT NULL, value VARCHAR(255) NOT NULL, PRIMARY KEY (name) )]],
            _tablePrefix)
    ))
    -- Write the version number into the status table to make upgrades easier.
    assert(_con:execute(
        string.format("INSERT IGNORE INTO %sstatus (name, value) VALUES ('schemaVersion', '1')", _tablePrefix)
    ))
    _con:commit()
end

--- This method is called regularly to reload the configuration (from the database).
-- A reload happens if forced, if the reload interval has elapsed, or while the reload mode is FORCE.
-- @param force if true then reloading is forced even if reload interval is not reached.
function _module.reload(force)
    if (
        _status[STATUS_RELOADMODE] == RELOADMODE_FORCE
        or force == true
        or _lastReloadTime == nil
        or _lastReloadTime + _reloadInterval <= os.time()
    ) then
        _lastReloadTime = os.time()
        LOG:debug("Reloading configuration")
        -- Make sure the connection is established
        connectToConfigServer()
        if (not _configTablesCreated or __forceCreateConfigTables or force) then
            createConfigTables()
            _configTablesCreated = true
        end

        -- Run a query on the configuration server and call fn(row) for every row (the row table is
        -- reused between calls). The cursor is always closed, also if fn raises an error.
        local function forEachRow(sql, fn)
            local cur = assert(_con:execute(sql))
            local row = {}
            local ok, err = pcall(function()
                while (cur:fetch(row)) do
                    fn(row)
                end
            end)
            cur:close()
            if (not ok) then
                error(err, 0)
            end
        end

        -- Read out the status
        local status = {}
        forEachRow(string.format("SELECT name, value FROM %sstatus", _tablePrefix), function(row)
            if (LOG:isLevel(LOG.DEBUG)) then
                LOG:debug({"Parsing status row", row})
            end
            status[row[1]] = row[2]
        end)

        local tables = {}
        local tableKeyColumns = {}
        local tablesFound = 0
        forEachRow(string.format("SELECT tbl_name, col_name, def_tbl_name, def_tbl_backend, hash_fun FROM %stables", _tablePrefix), function(row)
            tablesFound = tablesFound + 1
            if (LOG:isLevel(LOG.DEBUG)) then
                LOG:debug({"Parsing table row", row})
            end
            local tableName = tostring(row[1])
            tables[tableName] = {
                col = tostring(row[2]),
                defaultTable = tostring(row[3]),
                defaultBackend = tonumber(row[4]),
                hashFunction = parseHashFunction(row[5])
            }
            tableKeyColumns[tableName] = tostring(row[2])
        end)

        local partitions = {}
        local partitionsFound = 0
        forEachRow(string.format("SELECT tbl_name, part_name, part_value, part_backend FROM %spartitions", _tablePrefix), function(row)
            partitionsFound = partitionsFound + 1
            if (LOG:isLevel(LOG.DEBUG)) then
                LOG:debug({"Parsing partition row", row})
            end
            local tableName = tostring(row[1])
            local tab = partitions[tableName]
            if (tab == nil) then
                tab = {}
                partitions[tableName] = tab
            end
            tab[tostring(row[3])] = {
                name = tostring(row[2]),
                backend = tonumber(row[4])
            }
        end)

        -- TODO: Test whether the following is secure in a multi-connection environment.
        -- Enable new configuration (only reached if all three queries succeeded).
        _status, _tables, _tableKeyColumns, _partitions = status, tables, tableKeyColumns, partitions
        LOG:info("Configuration reloaded. " .. tablesFound .. " partitioned tables and " .. partitionsFound .. " partitions found.")
        LOG:info("Current status: " .. utils.convertTableToString(_status, 2))
    end
end

--- Handle the given admin command.
-- Supported commands (tokens[2]) are SETUP_TABLE and ADD_PARTITION.
-- @param tokens the tokenized statement; tokens[2] holds the command name.
-- @return true if the command was handled, false if the tokens are not recognized.
-- @throws error if something is wrong with the statement
function _module.handleAdminCommand(tokens)
    if (tokens[2] == nil) then
        -- No command token at all: not recognized.
        return false
    end
    local command = tokens[2].text:upper()
    local result = false
    if (command == "SETUP_TABLE") then
        LOG:debug("SETUP_TABLE recognized.")
        -- HSCALE SETUP_TABLE('[table]', 'partition column', '[table]_default', backend, 'hashing function, see above')
        assert(
            #tokens == 13
            and (
                tokens[3].token_name == "TK_OBRACE"
                and tokens[5].token_name == "TK_COMMA"
                and tokens[7].token_name == "TK_COMMA"
                and tokens[9].token_name == "TK_COMMA"
                and tokens[11].token_name == "TK_COMMA"
                and tokens[13].token_name == "TK_CBRACE"
            )
            and (
                tokens[4].token_name == "TK_STRING"
                and tokens[6].token_name == "TK_STRING"
                and tokens[8].token_name == "TK_STRING"
                and (tokens[10].token_name == "TK_INTEGER" or tokens[10].token_name == "TK_STRING")
                and tokens[12].token_name == "TK_STRING"
            ),
            "You have an error in your SETUP_TABLE command."
        )
        _module.setUpTable(tokens[4].text, tokens[6].text, tokens[8].text, tokens[10].text, tokens[12].text)
        result = true
    elseif (command == "ADD_PARTITION") then
        LOG:debug("ADD_PARTITION recognized.")
        -- HSCALE ADD_PARTITION('table', 'partition name', 'partition value', backend)
        assert(
            #tokens == 11
            and (
                tokens[3].token_name == "TK_OBRACE"
                and tokens[5].token_name == "TK_COMMA"
                and tokens[7].token_name == "TK_COMMA"
                and tokens[9].token_name == "TK_COMMA"
                and tokens[11].token_name == "TK_CBRACE"
            )
            and (
                tokens[4].token_name == "TK_STRING"
                and tokens[6].token_name == "TK_STRING"
                and tokens[8].token_name == "TK_STRING"
                and (tokens[10].token_name == "TK_INTEGER" or tokens[10].token_name == "TK_STRING")
            ),
            "You have an error in your ADD_PARTITION command."
        )
        _module.addPartition(tokens[4].text, tokens[6].text, tokens[8].text, tokens[10].text)
        result = true
    end
    return result
end

--- Set up a table.
-- @param name the name of the table to partition. On defaultBackend it is renamed to defaultTable
--        (RENAME TABLE), so it must exist there and defaultTable must not exist yet.
--        TODO: Check on all backends.
-- @param column the partition column.
-- @param defaultTable the table used by getPartition() when no partition matches.
-- @param defaultBackend the backend index of the default table.
-- @param hashFunction the definition of a hashing function. Example: "MOD(5)"
-- @throws error if the table is already configured or a step fails. TODO: Make that a return value. Block if RELOAD_MODE == FORCE.
function _module.setUpTable(name, column, defaultTable, defaultBackend, hashFunction)
    LOG:debug(
        string.format(
            "setUpTable(%q, %q, %q, %q, %q)",
            tostring(name), tostring(column), tostring(defaultTable), tostring(defaultBackend), tostring(hashFunction)
        )
    )
    assert(name, "Missing name.")
    assert(column, "Missing column.")
    assert(defaultTable, "Missing defaultTable.")
    assert(defaultBackend, "Missing defaultBackend.")
    assert(hashFunction, "Missing hashFunction.")
    -- Make sure the hash function is valid
    parseHashFunction(hashFunction)
    -- Make sure the connection is established
    connectToConfigServer()
    _module.reload(true)
    assert(_tables[name] == nil, "Table '" .. name .. "' is already partitioned.")

    -- Switch to forcing reload upon every query.
    LOG:info("Switching reload mode to FORCE and waiting " .. (_reloadInterval + _reloadForceWait) .. " seconds.")
    setStatus(STATUS_RELOADMODE, RELOADMODE_FORCE)
    -- The following code is written in a try-catch style since we must switch back the reload mode.
    local success, msg = pcall(
        function()
            utils.sleep((_reloadInterval + _reloadForceWait) * 1000)
            local insertedRows = assert(
                _con:execute(
                    string.format([[ INSERT INTO %stables (tbl_name, col_name, def_tbl_name, def_tbl_backend, hash_fun, created) VALUES (%q, %q, %q, %q, %q, NOW()) ]], _tablePrefix, name, column, defaultTable, tostring(defaultBackend), hashFunction)
                )
            )
            assert(insertedRows == 1, "INSERT statement returned wrong number of rows affected.")
            -- Rename the table.
            execSqlOnBackend(string.format("RENAME TABLE `%s` TO `%s`", name, defaultTable), tonumber(defaultBackend))
            _con:commit()
            _module.reload(true)
        end
    )
    if (not success and _con) then
        -- Discard the uncommitted INSERT. Otherwise the commit in setStatus() below would persist the
        -- table configuration although the RENAME failed.
        _con:rollback()
    end
    LOG:info("Switching reload mode to NORMAL.")
    setStatus(STATUS_RELOADMODE, RELOADMODE_NORMAL)
    assert(success == true, msg)
end

--- Add a partition.
-- Creates the table <tableName>_<partName> on partBackend from the SHOW CREATE TABLE output of the
-- default table and registers it in the partitions table.
-- TODO: Use transaction to avoid concurrency issues.
-- @param tableName the partitioned table.
-- @param partName the partition name (suffix of the partition table name).
-- @param partValue the (hashed) partition value mapped to this partition.
-- @param partBackend the backend index of the new partition table.
-- @param immediately (optional) if true then RELOADMODE will not be switched to FORCE and it will not be waited.
-- @throws error if table is not partitioned or partition already exists. TODO: Make that a return value. Block if RELOAD_MODE == FORCE.
function _module.addPartition(tableName, partName, partValue, partBackend, immediately)
    LOG:debug(
        string.format(
            "addPartition(%q, %q, %q, %q)",
            tostring(tableName), tostring(partName), tostring(partValue), tostring(partBackend)
        )
    )
    assert(tableName, "Missing tableName.")
    assert(partName, "Missing partName.")
    assert(partValue, "Missing partValue.")
    assert(partBackend, "Missing partBackend.")
    -- Make sure the connection is established
    connectToConfigServer()
    _module.reload(true)
    local tableDef = _tables[tableName]
    assert(tableDef, "Table '" .. tableName .. "' is not partitioned.")
    assert((_partitions[tableName] or {})[partValue] == nil, "Partition for value '" .. partValue .. "' already exists.")

    -- Switch to forcing reload upon every query.
    if (immediately ~= true) then
        LOG:info("Switching reload mode to FORCE and waiting " .. (_reloadInterval + _reloadForceWait) .. " seconds.")
        setStatus(STATUS_RELOADMODE, RELOADMODE_FORCE)
    end
    -- The following code is written in a try-catch style since we must switch back the reload mode.
    local success, msg = pcall(
        function()
            if (immediately ~= true) then
                utils.sleep((_reloadInterval + _reloadForceWait) * 1000)
            end
            -- Create table.
            local createSql = execSqlOnBackend("SHOW CREATE TABLE `" .. tableDef.defaultTable .. "`", tonumber(tableDef.defaultBackend))[1][2]
            -- Escape pattern magic characters (e.g. "-") in the searched header and "%" in the replacement,
            -- otherwise such table names silently fail to match.
            local searchPattern = ("CREATE TABLE `" .. tableDef.defaultTable .. "`"):gsub("%p", "%%%0")
            local replacement = ("/* HSCALE addPartition */ CREATE TABLE IF NOT EXISTS `" .. tableName .. "_" .. partName .. "`"):gsub("%%", "%%%%")
            createSql = string.gsub(createSql, searchPattern, replacement)
            assert(
                string.find(createSql, "HSCALE addPartition"),
                "Error determining create sql for partition '" .. partName .. "' of table '" .. tableName .. "'. SQL got so far: " .. createSql
            )
            execSqlOnBackend(createSql, tonumber(partBackend))
            local sql = string.format([[ INSERT INTO %spartitions (tbl_name, part_name, part_value, part_backend, created) VALUES (%q, %q, %q, %q, NOW()) ]], _tablePrefix, tableName, partName, partValue, tostring(partBackend) )
            local insertedRows = assert(_con:execute(sql))
            assert(insertedRows == 1, "INSERT statement returned wrong number of rows affected.")
            _con:commit()
            _module.reload(true)
        end
    )
    if (not success and _con) then
        -- Do not leave a half-done transaction behind for the next commit on this connection.
        _con:rollback()
    end
    if (immediately ~= true) then
        LOG:info("Switching reload mode to NORMAL.")
        setStatus(STATUS_RELOADMODE, RELOADMODE_NORMAL)
    end
    assert(success == true, msg)
end

--- Query the table / key combinations.
-- @return map with table name as key and partition column as value (nil before the first reload).
function _module.getTableKeyColumns()
    return _tableKeyColumns
end

--- Query all available partitions for a given table.
-- The default table is always the first entry; partitions sharing a name are listed once.
-- @return all partition table names and the backend indexes (Example: { {name=tab1, backend=1}, {name=tab2, backend=2} })
-- @throws error if the table is not partitioned.
function _module.getAllPartitions(tableName)
    local tab = _tables[tableName]
    assert(tab ~= nil, "Table '" .. tostring(tableName) .. "' is not partitioned.")
    local result = { { name = tab.defaultTable, backend = tab.defaultBackend } }
    -- Several partition values may map to the same partition table.
    local seen = {}
    for _, partInfo in pairs(_partitions[tableName] or {}) do
        if (not seen[partInfo.name]) then
            table.insert(result, { name = tableName .. "_" .. partInfo.name, backend = partInfo.backend })
            seen[partInfo.name] = true
        end
    end
    return result
end

-- Internal function that looks up the partition in the local table.
-- @param tableName a partitioned table.
-- @param partitionKey the raw value of the partition column (hashed with the table's hash function).
-- @return the partition table name and backend index, or nil, nil if no partition matches.
local function lookupPartition(tableName, partitionKey)
    local tab = _tables[tableName]
    local partHash = tab.hashFunction(partitionKey)
    local partition = (_partitions[tableName] or {})[tostring(partHash)]
    if (partition == nil) then
        return nil, nil
    end
    return tableName .. "_" .. partition.name, partition.backend
end

-- Internal function that returns the index of the backend (1..#proxy.backends) holding the fewest
-- partitions of the given table. Ties go to the lowest index; backends that no longer exist are never
-- chosen. Returns nil if there are no backends.
local function leastUsedBackend(tableName)
    local counts = {}
    for _, partition in pairs(_partitions[tableName] or {}) do
        counts[partition.backend] = (counts[partition.backend] or 0) + 1
    end
    local bestBackend, bestCount = nil, nil
    for backend = 1, #proxy.backends do
        local count = counts[backend] or 0
        if (bestCount == nil or count < bestCount) then
            bestBackend, bestCount = backend, count
        end
    end
    return bestBackend
end

--- Query a partition.
-- If no partition matches and the option autoCreatePartitions is set, the partition is created on the
-- least used backend; otherwise the default table is returned.
-- @return the partition table for the given partition key and the backend index (Example: name, backend = getPartition("test", "a"))
-- @throws error if the table is not partitioned.
function _module.getPartition(tableName, partitionKey)
    local tab = _tables[tableName]
    assert(tab ~= nil, "Table '" .. tostring(tableName) .. "' is not partitioned.")
    local resultTable, resultBackend = lookupPartition(tableName, partitionKey)
    if (resultTable == nil and _autoCreatePartitions ~= nil) then
        -- Create partition on the least used backend (HSCALE-39).
        -- First reload to see if it is already there.
        _module.reload(true)
        resultTable, resultBackend = lookupPartition(tableName, partitionKey)
        if (resultTable == nil) then
            local hash = tab.hashFunction(partitionKey)
            -- Since all HSCALE instances will try to create auto partitions, waiting for a FORCE reload
            -- can be skipped: addPartition is called with "immediately" = true for the "nowait" mode.
            local ok, err = pcall(
                _module.addPartition,
                tableName,
                utils.formatAsHex(hash, "_", "[^a-zA-Z0-9_]"),
                hash,
                leastUsedBackend(tableName) or 1,
                _autoCreatePartitions == AUTO_CREATE_PARTITIONS_NOWAIT
            )
            if (not ok) then
                -- Another instance may have created the same partition concurrently. Use it if it
                -- exists now; otherwise re-raise the original error.
                _module.reload(true)
                resultTable, resultBackend = lookupPartition(tableName, partitionKey)
                if (resultTable == nil) then
                    error(err, 0)
                end
            else
                resultTable, resultBackend = lookupPartition(tableName, partitionKey)
            end
        end
    end
    if (resultTable == nil) then
        -- If no partition has been found and autoCreatePartitions is disabled then use default table.
        resultTable = tab.defaultTable
        resultBackend = tab.defaultBackend
    end
    assert(resultTable and resultBackend, "Not even a default partition found for table '" .. tostring(tableName) .. "'.")
    return resultTable, resultBackend
end

return _module