Краткое введение в систему отправки и обработки непрерывных SQL-потоков (часть 2: MySQL)

Tags: SQL, MySQL

Применение SocketPro в различных базах данных для непрерывной пакетной обработки запросов / результатов и обработки потока в реальном времени с двунаправленной асинхронной передачей данных

Введение

Большинство систем баз данных клиентского сервера поддерживают только синхронную связь между клиентской базой и бэкэнд-базой данных с использованием блокирующего сокета и некоторого “болтливого” протокола, который требует, чтобы клиент или сервер дождались подтверждения, прежде чем отправлять новый фрагмент данных. Время ожидания, которое также называется задержкой, может начинаться с нескольких десятых миллисекунд для локальной сети (LAN) до сотен миллисекунд для глобальной сети (WAN). Большое время ожидания может значительно ухудшить качество приложения.

К счастью, UDAParts разработала мощную и безопасную инфраструктуру связи под названием SocketPro, которая написана с непрерывной встроенной пакетной обработкой  запросов / результатов и возможностями обработки потока в реальном времени с использованием асинхронной передачи данных и параллельного вычисления для лучшей эффективности сети, простоты разработки, производительности, масштабируемости и множества замечательных и даже уникальных функций на сайте (https://github.com/udaparts/socketpro).

Кроме того, UDAParts применил мощную структуру SocketPro к популярным открытым исходным базам данных, таким как SQLite и MySQL, а также другим пользователям с помощью драйверов ODBC для поддержки непрерывной отправки и обработки SQL-потока. В конце концов, эти предварительно скомпилированные компоненты и открытые исходные коды для баз данных полностью и навсегда свободны для общего использования.

Для уменьшения сложности обучения я рекомендую сначала изучить образец SQL-потока для SQLite (часть 1: SQLite), прежде чем приступать к  этим примеры проектов MySQL, поскольку образцы SQLite и MySQL используют одни и те же функции API-интерфейса клиента.

MySQL в настоящее время является самой популярной системой управления распределенными базами данных с открытым исходным кодом на основе клиент-сервер. Изучив исходный код MySQL, UDAParts применил технологию  SocketPro SQL-stream для MySQL и разработала серверный плагин. UDAParts сравнивает технологию SQL-потоков с MySQL Connector/Net. Исследование эффективности показывает, что технология SQL-потоков может быть в тысячу раз быстрее, чем MySQL Connector/Net.

Исходные коды и образцы

Все связанные исходные коды и образцы расположены по адресу https://github.com/udaparts/socketpro. После копирования его на ваш компьютер с помощью GIT обратите внимание на подкаталог mysql внутри каталога socketpro / stream_sql. Вы можете заметить, что эти образцы создаются из сред разработки .NET, C/C ++, Java и Python. Тем не менее, в этой статье для объяснений мы используем код C # (socketpro/stream_sql/mysql/test_csahrp) для использовании на стороне клиента и кода C ++ (socketpro/stream_sql/mysql/smysql) для разработки на стороне сервера.

В дополнение к приведенным выше примерам вы можете найти образцы исследования производительности с использованием базы данных образцов MySQL sakila в каталоге

socketpro/stream_sql/mysql/DBPerf. Подкаталог содержит три проекта по изучению эффективности, cppperf, netperf и mysqlperf, которые написаны с потоковой передачей C ++/SocketPro SQL, потоковыми потоками .NET/SocketPro SQL и технологиями провайдера ADO.NET соответственно.

Кроме того, плагин сервера SocketPro MySQL поддерживает события обновления таблицы данных (DELETE, INSERT и UPDATE) через триггеры. Вы можете использовать эту функцию, чтобы подтолкнуть события обновления выбранных таблиц на клиента. Пример проекта находится в каталоге socketpro/stream_sql/mysql/test_cache.

Перед запуском этих образцов приложений вы должны распространить системные библиотеки внутри каталога socketpro/bin в системном каталоге.

Что касается инфраструктуры связи SocketPro, вы также можете обратиться к документации по ее разработке в руководстве socketpro/doc/SocketPro development guide.pdf.

Активация подготовленных запросов MySQL в серверном подключаемом модуле

Хотя модуль сервера MySQL поддерживает общие SQL-запросы, он вообще не поддерживает инструкции подготовки. После копания в исходном коде MySQL, UDAParts выяснил, как включить инструкции MySQL в серверный плагин.  Чтобы добавить поддержку инструкций подготовки в серверный плагин MySQL, необходимо изменить четыре файла реализации: protocol_callback.cc, protocol_callback.h, protocol_classic.cc и sql_prepare.cc перед компиляцией приложения mysqld MySQL. Следует отметить, что это временное решение! Однажды, когда подключаемый модуль MySQL поддержит подготовку запроса, UDAParts, как и ожидалось, вместо этого будет использовать встроенную версию MySQL. UDAParts имеет предварительно скомпилированное приложение MySQL-сервера mysqld, расположенное в каталоге socketpro/stream_sql/mysql/(mysql-5.7.18|mysql-5.7.19|mysql-5.7.20), если вы не знакомы с C/C ++ и хотите пропустить следующие абзацы непосредственно в абзаце, начиная с Restart mysqld :.

protocol_callback.cc и protocol_callback.h: В самом начале добавьте объявление предварённого класса (класс Item_param;) в заголовок файла protocol_callback.h. Далее необходимо, чтобы класс Protocol_callback был добавлен двумя общедоступными методами (send_out_parameters и init) и одним защищенным членом m_thd. Эти члены уже реализованы в двух файлах в каталоге socketpro / stream_sql / mysql / mysql-5.7.18.

protocol_classic.cc: Метод Protocol_classic :: flush должен быть изменен, как показано в приведенном ниже фрагменте кода 1, потому что член vio может быть пустым для подключаемого модуля на стороне сервера MySQL.

bool Protocol_classic::flush()

{

#ifndef EMBEDDED_LIBRARY

 bool error = 0;

 m_thd->get_stmt_da()->set_overwrite_status(true);

 if (m_thd->net.vio)

error= net_flush(&m_thd->net);

 m_thd->get_stmt_da()->set_overwrite_status(false);

 return error;

#else

 return 0;

#endif

}

Фрагмент кода 1: Реализация метода Protocol_classic :: flush для включения команд MySQL в серверный плагин

sql_prepare.cc: В самом начале добавьте один include для обращения к файлу protocol_callback.h. Затем найдите метод mysqld_stmt_execute и используйте приведенный ниже код, как показано на следующем фрагменте кода 2, чтобы заменить этот вызов thd-> set_protocol (& thd-> protocol_binary) ;.

if (thd->protocol_binary.get_vio())

 thd->set_protocol(&thd->protocol_binary);

else {

 ((Protocol_callback*)save_protocol)->init(thd);

}

Фрагмент кода 2: Модификация метода mysqld_stmt_execute в файле sql_prepare.cc

Затем найдите метод Prepared_statement :: execute, перейдите к его концу и найдите инструкцию if (if (error == 0 && this-> lex-> sql_command == SQLCOM_CALL)). Используйте фрагмент кода, как показано в приведенном ниже фрагменте кода 3, чтобы заменить весь его внутренний код.

if (is_sql_prepare())

thd->protocol_text.send_out_parameters(&this->lex->param_list);

else if (thd->active_vio)

thd->get_protocol_classic()->send_out_parameters(&this->lex->param_list);

else {

Protocol_callback *pc = (Protocol_callback*)thd->get_protocol();

pc->send_out_parameters(&this->lex->param_list);

}

Фрагмент кода 3: Модификация метода Prepared_statement :: выполнить в файле sql_prepare.cc

Вы можете увидеть приведенный выше код, чтобы проверить, доступно ли активное vio во время выполнения. Если оно недоступно для подготовленных операторов подключаемого модуля, мы используем вместо него протокол обратного вызова.

Перезагрузите mysqld: перед запуском недавно скомпилированного приложения mysqld в вашей системе вы обязательно должны установить plugin_dir в каталог, содержащий библиотеки подключаемых модулей MySQL (например, plugin_dir =/usr/lib/mysql/plugin) в разделе mysqld конфигурационного файла MySQL. Кроме того, лучше увеличить thread_stack до 512K (thread_stack = 512K), изменив файл конфигурации MySQL. Затем скопируйте MySQL SQL-поток плагин libsmysql.so (smysql.dll на платформах Windows) в каталог плагина MySQL. После сброса всех этих параметров конфигурации и замены mysqld на новый, перезапустите службу MySQL или Daemon.

Зарегистрируйте плагин SocketPro MySQL для потоковой передачи SQL и его базу данных конфигурации

Как описано на этом сайте, зарегистрируйте плагин SQL-потока MySQL, вызывая оператор INSTALL PLUGIN UDAParts_SQL_Streaming SONAME 'libsmysql.so' из приложения mysql. В случае успеха вы увидите новую базу данных sp_streaming_db, созданную, как показано на рисунке ниже.





Рисунок 1: Потоковая база данных SocketPro SQL конфигурации sp_streaming_db и таблица config

В базе данных конфигурации есть три простые таблицы: config, service и permission,  как показано на рисунке выше. Предполагается, что плагин SocketPro MySQL SQL-streaming поддерживает промышленный стандарт безопасности SSL3 / TLSv1.x для защиты связи между клиентом и сервером. По умолчанию клиент SocketPro может использовать либо IP v4, либо v6 для доступа к базе данных MySQL по номеру порта 20902. Обратите внимание на запись cached_tables. Если вы правильно установили его значение, все подключенные клиенты SocketPro могут видеть изменения данных в этих таблицах (например, table actor, country, category и language в базе данных sakila) в реальном времени. Ссылаясь на образец test_cache в каталоге socketpro/stream_sql/mysql, вы можете использовать функцию кэширования в реальном времени, чтобы улучшить производительность и масштабируемость среднего уровня за счет сокращения перемещения данных между средним уровнем и базой данных.

Один сервер SocketPro способен одновременно поддерживать множество сервисов с использованием одного TCP-порта. Если хотите, вы можете включить websocket из SocketPro MySQL SQL-потокового плагина, установив значение «1» для протокола enable_http_websocket. Кроме того, вы можете также внедрить другие службы, правильно установив значение служб протокола, как показано на рисунке выше. После изменения любого одного или нескольких значений в конфигурации таблицы вам следует перезапустить MySQL. В противном случае изменения будут работать некорректно.

Что касается разрешения таблицы, технология SocketPro MySQL SQL-streaming использует свои протоколы для аутентификации клиентов для встроенных служб, как показано на следующем рисунке 2. Потоковый модуль SQL использует две таблицы mysql.user и sp_streaming_db.permission для аутентификации всех клиентов для всех служб. Однако его служба потоковой передачи SQL не использует протоколы в таблице sp_streaming_db.permission для аутентификации.



Рисунок 2: Три пользователя (root, user_one и user_two) разрешены для асинхронной службы очереди сообщений SocketPro (ID службы = 257)

В большинстве случаев вам не нужно обращаться к службе таблицы.

Основная функция

SocketPro записывается снизу для поддержки параллельных вычислений с использованием одного или нескольких пулов неблокирующих сокетов. Каждый из пулов может быть выполнен из одного или нескольких потоков, и каждый из потоков содержит один или несколько неблокирующих сокетов на стороне клиента. Однако мы просто используем один пул для четкой демонстрации с этим образцом на стороне клиента, как показано в приведенном ниже фрагменте кода 4.

static void Main(string[] args)

{

   Console.WriteLine("Remote host: ");

   string host = Console.ReadLine();

   CConnectionContext cc = new CConnectionContext(host, 20902, "root", "Smash123");

   using (CSocketPool<cmysql> spMysql = new CSocketPool<cmysql>()) {

       //start one socket pool having 1 worker thread hosting 1 non-block socket

       if (!spMysql.StartSocketPool(cc, 1, 1)) {

           Console.WriteLine("Failed in connecting to remote async mysql server");

           Console.WriteLine("Press any key to close the application ......");

           Console.Read();

           return;

       }

       CMysql mysql = spMysql.Seek(); //get an async handler

 

       //start to stream all types of requests including SQL statements

       bool ok = mysql.Open("", dr); //open a default MySQL database

 

       //create a container to receive all queries data

       List<KeyValuePair<CDBColumnInfoArray, CDBVariantArray>> ra =

                      new List<KeyValuePair<CDBColumnInfoArray, CDBVariantArray>>();

 

       CMysql.DRows r = (handler, rowData) => {

           //rowset data come here

           int last = ra.Count - 1;

           KeyValuePair<CDBColumnInfoArray, CDBVariantArray> item = ra[last];

           item.Value.AddRange(rowData); //populate record data into receiving container ra

       };

 

       CMysql.DRowsetHeader rh = (handler) => {

           //rowset header comes here

           KeyValuePair<CDBColumnInfoArray, CDBVariantArray> item =  

                      new KeyValuePair<CDBColumnInfoArray, CDBVariantArray>

                                   (handler.ColumnInfo, new CDBVariantArray());

           ra.Add(item); //populate query column meta into receiving container ra

       };

           

       TestCreateTables(mysql); //there are 2 DDL requests inside the call

       ok = mysql.Execute("delete from employee;delete from company", er);

       TestPreparedStatements(mysql); //there are 2 requests (1 prepare +

                                      //1 parameterized statements) inside the call

       InsertBLOBByPreparedStatement(mysql); //there are 2 requests (1 prepare +

                                             //1 parameterized statements) inside the call

       ok = mysql.Execute("SELECT * from company;select * from employee;select curtime()", er, r, rh);

       CDBVariantArray vPData = new CDBVariantArray();

       //first set

       vPData.Add(1); //input

       vPData.Add(1.4); //inputoutput

       vPData.Add(0); //output

 

       //second set

       vPData.Add(2); //input

       vPData.Add(2.5); //inputoutput

       vPData.Add(0); //output

 

       //Test MySQL stored procedure

       TestStoredProcedure(mysql, ra, vPData); //there are 2 requests (1 prepare +

                                               //1 parameterized statements) inside the call

 

       //end streaming all types of requests

 

       ok = mysql.WaitAll(); //make sure all streamed requests are processed and returned

 

       Console.WriteLine();

       Console.WriteLine("There are {0} output data returned", mysql.Outputs * 2);

 

       int index = 0;

       Console.WriteLine();

       Console.WriteLine("+++++ Start rowsets +++");

       foreach (KeyValuePair<CDBColumnInfoArray, CDBVariantArray> it in ra) {

           Console.Write("Statement index = {0}", index);

           if (it.Key.Count > 0)

               Console.WriteLine(", rowset with columns = {0}, records = {1}.",

                                 it.Key.Count, it.Value.Count / it.Key.Count);

           else

               Console.WriteLine(", no rowset received.");

           ++index;

       }

       Console.WriteLine("+++++ End rowsets +++");

       Console.WriteLine();

       Console.WriteLine("Press any key to close the application ......");

       Console.Read();

   }

}

Фрагмент кода 4: Основная функция для демонстрации системы SQL-потока SocketPro MySQL на стороне клиента

Запуск одного пула сокетов: приведенный выше фрагмент кода 4 запускает один пул сокетов, который имеет только один рабочий поток, содержащий только один неблокирующий сокет для понятной демонстрации, используя один экземпляр контекста соединения. Однако, следует отметить, что при необходимости можно создать несколько пулов в одном клиентском приложении. Впоследствии мы получаем один асинхронный обработчик MySQL.

Открытие базы данных: мы можем отправить запрос на открытие базы данных сервера MySQL. Если первый вход представляет собой пустую или нулевую строку, как показано в этом примере, мы, в качестве варианта, можем открыть одну базу данных по умолчанию для подключенного пользователя. Если вы хотите открыть указанную базу данных, вы можете просто указать непустую допустимую строку имени базы данных. Кроме того, вам нужно установить обратный вызов или Лямбда-выражение для отслеживания возвращаемого сообщения об ошибке со стороны сервера. Отмечается, что SocketPro поддерживает только асинхронную передачу данных между клиентом и сервером, чтобы запрос мог быть введен с одним или несколькими обратными вызовами для обработки возвращаемых данных. Это полностью расходится с синхронной передачей данных. Кроме того, мы создаем экземпляр контейнера ra, который используется для приема всех наборов записей в будущих запросах.

Потоковые операторы SQL: имейте в виду, что SocketPro поддерживает потоковое воспроизведение всех типов любого количества запросов на одном сеансе неблокирующего сокета без особых усилий по дизайну. Конечно, мы можем легко передавать все операторы SQL, как и другие.  Все службы SocketPro SQL-stream поддерживают эту особенность для лучшей эффективности сети, что значительно улучшает производительность доступа к данным. Насколько нам известно, вы не можете найти такую замечательную особенность из других технологий. Если вы его нашли, сообщите нам. Как и для обычных API-интерфейсов баз данных, технология SocketPro SQL-stream также поддерживает ручную транзакцию, как показано в предыдущей статье.

Ожидание полной обработки: поскольку SocketPro использует асинхронную передачу данных по умолчанию, он должен предоставить способ ожидания, пока все запросы и возвращаемые результаты будут отправлены, возвращены и обработаны. SocketPro предлагает один уникальный метод WaitAll на стороне клиента для этой цели. Если вам нравится, вы можете использовать этот метод для преобразования всех асинхронных запросов в синхронные.

TestCreateTables, TestPreparedStatements и InsertBLOBByPreparedStatement

Вышеупомянутый фрагмент кода 4 имеет три вызова функций: TestCreateTables, TestPreparedStatements и InsertBLOBByPreparedStatement, но мы не хотим повторно объяснять их снова, потому что они действительно те же, что и в предыдущей статье. Давайте сосредоточимся на выполнении хранимых процедур MySQL с параметрами ввода-вывода и вывода.

TestStoredProcedure

MySQL полностью поддерживает хранимые процедуры. Технология SocketPro SQL-stream тоже делает это. Кроме того, она поддерживает выполнение множества наборов хранимых процедур MySQL с параметрами ввода-вывода и вывода в одном вызове, как показано в приведенном выше фрагменте кода 4. В приведенном ниже фрагменте кода 5 показано, как вызвать хранимую процедуру MySQL, которая может иметь  параметров входа, входа-выхода и выхода и и возвращать несколько наборов записей.

static void TestStoredProcedure(CMysql mysql, List<KeyValuePair<CDBColumnInfoArray,

                               CDBVariantArray>> ra, CDBVariantArray vPData) {

   bool ok = mysql.Prepare("call sp_TestProc(?,?,?)", dr);

   CMysql.DRows r = (handler, rowData) => {

       //rowset data come here if available

       int last = ra.Count - 1;

       KeyValuePair<CDBColumnInfoArray, CDBVariantArray> item = ra[last];

       item.Value.AddRange(rowData); //populate record data into receiving container ra

   };

   CMysql.DRowsetHeader rh = (handler) => {

       //rowset header comes here if available

       KeyValuePair<CDBColumnInfoArray, CDBVariantArray> item =

       new KeyValuePair<CDBColumnInfoArray, CDBVariantArray>(handler.ColumnInfo, new CDBVariantArray());

       ra.Add(item); //populate query column meta into receiving container ra

   };

   ok = mysql.Execute(vPData, er, r, rh);

}

Фрагмент кода 5: вызов хранимой процедуры MySQL, которая возвращает несколько наборов записей и параметров вывода

Как показано в приведенном выше фрагменте кода 5, вызвать хранимую процедуру через технологию SocketPro SQL-stream в конце очень просто.  Следует отметить, что все данные выходных параметров будут непосредственно скопированы в массив данных параметров передачи vPData. Обратный вызов rh осуществляется, когда поступает  набор записей метаданных в случае их доступности. Всякий раз, когда он поступает, будет осуществлен обратный вызов r. Вы можете заполнить все запрошенные метаданные и записи данных в произвольный контейнер, например ra, например, из двух обратных вызовов.

Исследование эффективности

Технология SQL-потока SocketPro имеет отличную производительность при доступе к базам данных для запросов и обновлений. Вы можете увидеть два тестовых проекта производительности MySQL (cppperf и netperf), доступные в socketpro/stream_sql/mysql/DBPerf/. Первый образец написан C ++, а другой - C #. Пример проекта mysqlperf writtern из C # предоставляется для сравнения производительности технологии SocketPro SQL-stream с производителем MySQL .NET.

Посмотрите данные об эффективности работы на рисунке 3, которые получены из трех дешевых виртуальных машин Google с твердотельным диском для бесплатной оценки. Все данные запрошены раз в миллисекунду для выполнения 10 000 запросов и 50 000 вставок. Исследование эффективности также сосредоточено на влиянии задержки сети на скорость доступа MySQL.

Рисунок 3: Данные об эффективности потоковой передачи данных технологии SocketPro SQL-stream на трех дешевых виртуальных машинах Google cloud

Наше исследование эффективности показывает, что легко получить запрос, выполняемый со скоростью 6500 (10 000 / 1,54) раз в секунду и сокет-соединение. Для вставки записей вы можете легко получить скорость, например, 43 000 (50 000 / 1,17) вставки в секунду для MySQL в локальной сети (LAN, кросс-машина, 0,2 мс / 2,0 Гбит / с). В локальной сети потоковая передача SocketPro может повысить производительность на 150% по сравнению с традиционным не потоковым подходом (SocketPro + Sync) для запроса. Для SQL-вставок улучшение будет более чем в семь раз (10 400 / 1,170 = 8,9). Возможности потоковой передачи SocketPro и потоковой передачи делают эффективность сети выше, что приводит к значительному улучшению по сравнению с существующим подходом к сотовой связи MySQL.

Рассмотрим глобальную сеть (WAN, кросс-область, 34 мс / 40 Мбит / с).Скорость потока запросов SocketPro SQL может составлять 5000 (10 000 / 2,00) раз в секунду и соединение сокетов. Для входящих записей скорость может легко составлять 17 600 записей (50 000 / 2.84) в секунду. Напротив, скорость запросов будет составлять до 30 запросов в секунду в глобальной сети, если клиент использует традиционный способ связи (поставщик SocketPro + Sync / MySQL.NET) для доступа к базе данных из-за высокой задержки, как показано на рисунке 8. Потоковая передача  SocketPro SQ L может в 170 (349000/2000 = 174,5) раз превышать в запросе быстрее, чем технология без потоковой передачи, при условии, что время обработки базы данных брандмауэра невелико при высокой задержке WAN (кросс-область). Если мы рассмотрим вставки SQL, улучшение может быть более 600 раз (1,726,000 / 2840 = 607).

После анализа данных производительности на рисунке 3 вы увидите, что технология потоковой передачи SocketPro действительно отлична для ускорения не только локального, но и удаленного доступа к базе данных.Во-вторых, данные о производительности для WAN были бы намного лучше, если бы тестовая WAN имела лучшую пропускную способность сети. Кроме того, SocketPro поддерживает встроенное сжатие, но это тестовое исследование не использует его. Если используется встроенная функция сжатия SocketPro, ее потоковые тестовые данные будут дополнительно улучшены в глобальной сети. Наконец, исследование эффективности завершено на дешевых виртуальных машинах только с одним или двумя процессорами. Данные о производительности будут лучше, если для тестирования используются специализированные машины.

Выполнение SQL в параллельном режиме с автоматическим восстановлением ошибок

Параллельное вычисление. Изучив предыдущие два простых примера, пришло время изучить следующий третий образец в каталоге socketpro/samples/auto_recovery/(test_cplusplus|test_java|test_python|test_sharp). SocketPro  создается снизу для поддержки параллельных вычислений. Вы можете распространять несколько операторов SQL на разные базы данных для одновременной обработки. Эта функция предназначена для улучшения масштабируемости приложения, как показано ниже в фрагменте кода 6.

using System;
using SocketProAdapter;
using SocketProAdapter.ClientSide;
class Program {
   static void Main(string[] args) {
       const int sessions_per_host = 2;
       string[] vHost = { "localhost", "192.168.2.172" };
       const int cycles = 10000;
       using (CSocketPool<CMysql> sp = new CSocketPool<CMysql>()) {
           //set a local message queue to backup requests for auto fault recovery
           sp.QueueName = "ar_sharp";
           
           //one thread enough
           CConnectionContext[,] ppCc = new CConnectionContext[1, vHost.Length * sessions_per_host];
           for (int n = 0; n < vHost.Length; ++n) {
               for (int j = 0; j < sessions_per_host; ++j) {
                   ppCc[0, n * sessions_per_host + j] =
                        new CConnectionContext(vHost[n], 20902, "root", "Smash123");
               }
           }
           bool ok = sp.StartSocketPool(ppCc);
           if (!ok) {
               Console.WriteLine("No connection and press any key to close the application ......");
               Console.Read(); return;
           }
           string sql = "SELECT max(amount), min(amount), avg(amount) FROM payment";
           Console.WriteLine("Input a filter for payment_id"); string filter = Console.ReadLine();
           if (filter.Length > 0) sql += (" WHERE " + filter); var v = sp.AsyncHandlers;
           foreach (var h in v) {
               ok = h.Open("sakila", (hsqlite, res, errMsg) => {
                   if (res != 0) Console.WriteLine("Error code: {0}, error message: {1}", res, errMsg);
               });
           }
           int returned = 0;
           double dmax = 0.0, dmin = 0.0, davg = 0.0;
           SocketProAdapter.UDB.CDBVariantArray row = new SocketProAdapter.UDB.CDBVariantArray();
           CAsyncDBHandler.DExecuteResult er = (h, res, errMsg, affected, fail_ok, lastId) => {
               if (res != 0)
                   Console.WriteLine("Error code: {0}, error message: {1}", res, errMsg);
               else {
                   dmax += double.Parse(row[0].ToString());
                   dmin += double.Parse(row[1].ToString());
                   davg += double.Parse(row[2].ToString());
               }
               ++returned;
           };
           CAsyncDBHandler.DRows r = (h, vData) => {
               row.Clear(); row.AddRange(vData);
           };
           CMysql mysql = sp.SeekByQueue(); //get one handler for querying one record
           ok = mysql.Execute(sql, er, r);
           ok = mysql.WaitAll();
           Console.WriteLine("Result: max = {0}, min = {1}, avg = {2}", dmax, dmin, davg);
           returned = 0; dmax = 0.0; dmin = 0.0; davg = 0.0;
           Console.WriteLine("Going to get {0} queries for max, min and avg", cycles);
           for (int n = 0; n < cycles; ++n) {
               mysql = sp.SeekByQueue();
               ok = mysql.Execute(sql, er, r);
           }
           foreach (var h in v) {
               ok = h.WaitAll();
           }
           Console.WriteLine("Returned = {0}, max = {1}, min = {2}, avg = {3}",
                              returned, dmax, dmin, davg);
           Console.WriteLine("Press any key to close the application ......"); Console.Read();
       }
   }
}

Фрагмент кода 6: Демонстрация параллельных вычислений SocketPro и функций автоматического восстановления ошибок

Как показано в приведенном выше фрагменте кода 6, мы могли бы запускать несколько неблокирующих сокетов для разных машин (localhost, 192.168.2.172), и каждая из двух машин базы данных имеет два сокета. Код открывает базу данных sakila по умолчанию для каждого из соединений (foreach (var h in v) {......}). Во-первых, код выполняет один запрос «SELECT max (сумма), min (сумма), avg (amount) FROM payment ...» для одной записи. Наконец, код отправляет запрос 10000 раз на две машины для параллельной обработки (для (int n = 0; n <cycles; ++ n) {......}). Каждая из записей будет суммироваться внутри Лямбда-выражения (CAsyncDBHandler.DExecuteResult er = (h, res, errMsg, affected, fail_ok, lastId) => {......};) в качестве обратного вызова для метода Execute. Следует отметить, что вы можете создавать несколько пулов для разных сервисов, размещенных на разных машинах. Как вы можете видеть, пул сокетов SocketPro может использоваться для значительного улучшения масштабируемости приложений.

Автоматическое восстановление: SocketPro может открывать файл локально и сохранять в нем все данные запроса перед отправкой этих запросов на сервер через сеть. Файл вызывается  локальной очередью сообщений или очередью сообщений клиента. Идея заключается в простой резервной копии всех запросов на автоматическое восстановление. Чтобы использовать эту функцию, вы должны установить имя очереди локальных сообщений (sp.QueueName = "ar_sharp";), как показано в приведенном выше фрагменте кода 6. Когда мы разрабатываем реальное приложение, очень часто приходится писать много кода для решения с различными ошибками связи. На самом деле, это обычно задача разработчиков программного обеспечения. Очередь сообщений клиента SocketPro очень упрощает обработку сообщений. Предположим, что машина 192.168.2.172 недоступна по одной из таких причин, как выключение компьютера, необработанное исключение, обслуживание программного обеспечения / аппаратного обеспечения и отсоединение сети и т. д. Событие закрытия сокета будет уведомлено либо немедленно, либо позже. Как только пул сокетов обнаруживает, что сокет закрыт, SocketPro автоматически объединит все запросы, связанные с соединением сокета, в другой сокет, который еще не закрыт для обработки.

Чтобы проверить эту функцию, вы можете опустить один из серверов MySQL во время выполнения вышеуказанных запросов и посмотреть, верны ли окончательные результаты.

Следует отметить, что UDAParts применил эту функцию ко всем службам SQL-потока SocketPro, асинхронной постоянной службе очереди сообщений и удаленной службе обмена файлами, чтобы упростить вашу разработку.

Точки обзора

Плагин SocketPro MySQL SQL-streaming не поддерживает курсоры вообще, но он обеспечивает все необходимые базовые функции базы данных клиента или сервера. Кроме того, плагин SQL-потока имеет следующие уникальные функции:

  1. Непрерывная встроенная обработка запросов / результатов и обработка SQL-потоков в режиме реального времени для лучшей эффективности сети, особенно для WAN
  2. Двунаправленная асинхронная передача данных между клиентом и сервером, но все асинхронные запросы при необходимости могут быть преобразованы в синхронные.
  3. Превосходная производительность и масштабируемость благодаря мощной коммуникационной архитектуре SocketPro
  4. Кэш в реальном времени для обновления, вставки и удаления таблицы, как показано на примере проекта test_cache в каталоге socketpro/stream_sql/mysql/test_cache
  5. Все запросы отменяются путем выполнения метода Cancel of class CClientSocket на стороне клиента
  6. Поддерживается как Windows, так и Linux
  7. Простая разработка для всех поддерживаемых языков разработки
  8. И клиентский, и серверный компонент являются потокобезопасными. Их можно легко повторно использовать в ваших многопоточных приложениях с гораздо меньшим количеством проблем, связанных с потоком
  9. Все запросы могут быть скопированы на стороне клиента и повторно отправлены на другой сервер для обработки, если сервер не работает по каким-либо причинам -  ошибка автоматического восстановления



No Comments

Add a Comment