Краткое введение в систему отправки и обработки непрерывных SQL-потоков (часть 1: SQLite)

Tags: SQL

Применение SocketPro в различных базах данных для непрерывной пакетной обработки запросов / результатов и обработки потока в реальном времени с двунаправленной асинхронной передачей данных/

Большинство систем баз данных клиентского сервера поддерживают только синхронную связь между клиентской базой и бэкэнд-базой данных с использованием блокирующего сокета и некоторого “болтливого” протокола, который требует, чтобы клиент или сервер дождались подтверждения, прежде чем отправлять новый фрагмент данных. Время ожидания, которое также называется задержкой, может начинаться с нескольких десятых миллисекунд для локальной сети (LAN) до сотен миллисекунд для глобальной сети (WAN). Большое время ожидания может значительно ухудшить качество приложения.

К счастью, UDAParts разработала мощную и безопасную инфраструктуру связи под названием SocketPro, которая написана с непрерывной встроенной пакетной обработкой  запросов / результатов и возможностями обработки потока в реальном времени с использованием асинхронной передачи данных и параллельного вычисления для лучшей эффективности сети, простоты разработки, производительности, масштабируемости и множества замечательных и даже уникальных функций на сайте (https://github.com/udaparts/socketpro).

Кроме того, UDAParts применил мощную структуру SocketPro к популярным открытым исходным базам данных, таким как SQLite и MySQL, а также другим пользователям с помощью драйверов ODBC для поддержки непрерывной отправки и обработки SQL-потока. В конце концов, эти предварительно скомпилированные компоненты и открытые исходные коды для баз данных полностью и навсегда свободны для общего использования.

Для уменьшения сложности обучения мы используем базу данных SQLite в качестве первого образца для первой статьи, а MySQL - второй образец для второй статьи.

Исходные коды и образцы

Все связанные исходные коды и образцы расположены по адресу https://github.com/udaparts/socketpro. После копирования его на свой компьютер с помощью GIT обратите внимание на подкаталог usqlite внутри каталога socketpro/samples/module_sample.

Вы можете заметить, что эти образцы создаются из сред разработки .NET, C / C ++, Java и Python. Они могут быть скомпилированы и запущены на платформах Linux или Windows. Если вы не привыкли к разработке C / C ++, UDAParts также распространяет предварительно скомпилированные тестовые приложения, test_ssqlite для сервера и test_csqlite для клиента внутри каталога socketpro / bin / (win | linux), потому что эти тестовые приложения написаны на C / C ++

Кроме того, вы можете узнать, как загрузить службу SocketPro в серверное приложение в знакомой среде разработки, просмотрев учебный образец all_servers в каталоге socketpro / tutorials / (cplusplus | csharp | vbnet | java / src)/all_servers. Однако для объяснения мы используем только код C # (socketpro/samples/module_sample/usqlite / test_csharp).

Перед запуском этих образцов приложений вы должны распространить системные библиотеки внутри каталога socketpro / bin в системном каталоге.

В отношении инфраструктуры связи SocketPro вы также можете обратиться к документации по ее разработке в руководстве socketpro/doc/SocketPro.pdf.

Основная функция

SocketPro записывается снизу для поддержки параллельных вычислений с использованием одного или нескольких пулов неблокирующих сокетов. Каждый из пулов может состоять из одного или нескольких потоков, а каждый из потоков содержит один или несколько неблокируемых сокетов на стороне клиента. Однако мы просто используем здесь один пул для понятной демонстрации, а пул для этого примера состоит из одного потока и одного сокета на стороне клиента, как показано в нижеприведенном первом фрагменте кода.

static void Main(string[] args)

{

   Console.WriteLine("Remote host: ");

   string host = Console.ReadLine();

   CConnectionContext cc = new CConnectionContext

                           (host, 20901, "usqlite_client", "password_for_usqlite");

   using (CSocketPool<CSqlite> spSqlite = new CSocketPool<CSqlite>())

   {

       //start a socket pool with 1 thread hosting 1 non-blocking socket

       if (!spSqlite.StartSocketPool(cc, 1, 1))

       {

           Console.WriteLine("Failed in connecting to remote async sqlite server");

           Console.WriteLine("Press any key to close the application ......");

           Console.Read();

           return;

       }

       CSqlite sqlite = spSqlite.Seek(); //get one async sqlite handler

 

       //open a global database at server side because an empty string is given

       bool ok = sqlite.Open("", (handler, res, errMsg) =>

       {

           Console.WriteLine("res = {0}, errMsg: {1}", res, errMsg);

       });

 

       //prepare two test tables, COMPANY and EMPLOYEE

       TestCreateTables(sqlite);

 

       //a container for receiving all tables data

       List<KeyValuePair<CDBColumnInfoArray, CDBVariantArray>> lstRowset =

                        new List<KeyValuePair<CDBColumnInfoArray, CDBVariantArray>>();

 

       ok = sqlite.BeginTrans(); //start manual transaction

 

       //test both prepare and query statements

       TestPreparedStatements(sqlite, lstRowset);

 

       //test both prepare and query statements involved with reading and updating BLOB and large text

       InsertBLOBByPreparedStatement(sqlite, lstRowset);

           

       ok = sqlite.EndTrans(); //end manual transaction

           

       sqlite.WaitAll();

 

       //display received rowsets

       int index = 0;

       Console.WriteLine();

       Console.WriteLine("+++++ Start rowsets +++");

       foreach (KeyValuePair<CDBColumnInfoArray, CDBVariantArray> it in lstRowset)

       {

           Console.Write("Statement index = {0}", index);

           if (it.Key.Count > 0)

               Console.WriteLine(", rowset with columns = {0}, records = {1}.",

                                 it.Key.Count, it.Value.Count / it.Key.Count);

           else

               Console.WriteLine(", no rowset received.");

           ++index;

       }

       Console.WriteLine("+++++ End rowsets +++");

       Console.WriteLine();

       Console.WriteLine("Press any key to close the application ......");

       Console.Read();

   }

}

Фрагмент кода 1: Основная функция для демонстрации использования системы SQL-потока SocketPro на стороне клиента

Запуск одного пула сокетов: приведенный выше фрагмент кода 1 запускает один пул сокетов, который имеет только один рабочий поток, который содержит только один неблокирующий сокет для ясности демонстрации, используя один экземпляр контекста соединения. Однако, при необходимости вы можете создать несколько пулов в одном клиентском приложении. Впоследствии мы получаем один асинхронный обработчик sqlite.

Открытие базы данных: мы можем отправить запрос на открытие базы данных сервера sqlite. Если первый вход представляет собой пустую или нулевую строку, как показано в этом примере, мы открываем, например, один экземпляр глобальной базы данных сервера usqlite.db. Если вам нравится создавать собственную базу данных, вы можете просто указать непустую допустимую строку. Кроме того, вам нужно установить обратный вызов или Лямбда-выражение для отслеживания возвращаемого сообщения об ошибке со стороны сервера, если вам нравится показанное. Отмечается, что SocketPro поддерживает только асинхронную передачу данных между клиентом и сервером, чтобы запрос мог быть введен с одним или несколькими обратными вызовами для обработки возвращаемых данных. Это полностью отличается от синхронной передачи данных. Кроме того, мы создаем экземпляр контейнера, который используется для приема всех наборов записей в следующих запрошенных наборах строк.

Потоковые операторы SQL: имейте в виду, что SocketPro поддерживает потоковое воспроизведение всех типов запросов в любом количестве на одном сеансе неблокирующего сокета без особых усилий по проектированию. Разумеется, мы можем передавать все операторы SQL наравне с другими, как показано в приведенном выше фрагменте кода 1. Все службы SocketPro SQL-stream поддерживают эту особенность для лучшей эффективности сети, что значительно улучшает производительность доступа к данным. Насколько нам известно, вы не можете найти такую замечательную особенность из других технологий. Если вы его нашли, сообщите нам. Как и для обычных API-интерфейсов баз данных, технология SocketPro SQL-stream также поддерживает ручную транзакцию, как показано в приведенном выше фрагменте кода 1. Далее мы собираемся разработать три функции: TestCreateTables, TestPreparedStatements и InsertBLOBByPreparedStatement.

Ожидание полной обработки: поскольку SocketPro использует асинхронную передачу данных по умолчанию, он должен предоставить способ ожидания, пока все запросы и возвращаемые результаты будут отправлены, возвращены и обработаны. SocketPro предлагает один уникальный метод WaitAll на стороне клиента для этой цели. Если вам нравится, вы можете использовать этот метод для преобразования всех асинхронных запросов в синхронные.

TestCreateTables

Эта функция внутренне состоит из отправки двух операторов SQL DDL для создания двух таблиц: COMPANY и EMPLOYEE, как показано в приведенном ниже фрагменте кода 2.

static void TestCreateTables(CSqlite sqlite)

{

   string create_table = "CREATE TABLE COMPANY(ID INT8 PRIMARY KEY NOT NULL,name CHAR(64)NOT NULL,_

                          ADDRESS varCHAR(256)not null,Income float not null)";

   bool ok = sqlite.Execute(create_table, (handler, res, errMsg, affected, fail_ok, id) =>

   {

       Console.WriteLine("affected = {0}, fails = {1}, oks = {2}, res = {3}, errMsg: {4}, _

            last insert id = {5}", affected, (uint)(fail_ok >> 32), (uint)fail_ok, res, errMsg, id);

   });

   create_table = "CREATE TABLE EMPLOYEE(EMPLOYEEID INT8 PRIMARY KEY NOT NULL unique,_

                   CompanyId INT8 not null,name NCHAR(64)NOT NULL,_

                   JoinDate DATETIME not null default(datetime('now')),IMAGE BLOB,_

                   DESCRIPTION NTEXT,Salary real,FOREIGN KEY(CompanyId)REFERENCES COMPANY(id))";

   ok = sqlite.Execute(create_table, (handler, res, errMsg, affected, fail_ok, id) =>

   {

       Console.WriteLine("affected = {0}, fails = {1}, oks = {2}, res = {3}, errMsg: {4}, _

         last insert id = {5}", affected, (uint)(fail_ok >> 32), (uint)fail_ok, res, errMsg, id);

   });

}

Фрагмент кода 2: Создание двух таблиц SQLite при потоковой передаче по технологии SocketPro SQL-stream

Вы можете выполнить любое количество операторов SQL в потоке, как показано в фрагменте кода 2. Каждый из запросов состоит из одного входного оператора SQL и одного необязательного обратного вызова (или Лямбда-выражения) для отслеживания ожидаемых результатов возврата. Опять же, это отличается от обычного подхода к доступу к базе данных, поскольку SocketPro использует асинхронную передачу данных для связи.

TestPreparedStatements

Технология SocketPro SQL-stream поддерживает подготовку SQL-инструкции, как и обычные API-интерфейсы доступа к базе данных. В частности, технология SocketPro SQL-stream даже поддерживает подготовку нескольких инструкций SQL на одном снимке для базы данных сервера SQLite, как показано в приведенном ниже фрагменте кода 3.

static void TestPreparedStatements(CSqlite sqlite,

  List<KeyValuePair<CDBColumnInfoArray, CDBVariantArray>> ra)

{

   //a complex SQL statement combined with query and insert prepare statements

   string sql_insert_parameter = "Select datetime('now');

   INSERT OR REPLACE INTO COMPANY(ID,NAME,ADDRESS,Income)VALUES(?,?,?,?)";

   bool ok = sqlite.Prepare(sql_insert_parameter, (handler, res, errMsg) =>

   {

       Console.WriteLine("res = {0}, errMsg: {1}", res, errMsg);

   });

 

   CDBVariantArray vData = new CDBVariantArray();

   vData.Add(1);

   vData.Add("Google Inc.");

   vData.Add("1600 Amphitheatre Parkway, Mountain View, CA 94043, USA");

   vData.Add(66000000000.0);

 

   vData.Add(2);

   vData.Add("Microsoft Inc.");

   vData.Add("700 Bellevue Way NE- 22nd Floor, Bellevue, WA 98804, USA");

   vData.Add(93600000000.0);

 

   vData.Add(3);

   vData.Add("Apple Inc.");

   vData.Add("1 Infinite Loop, Cupertino, CA 95014, USA");

   vData.Add(234000000000.0);

 

   //send three sets of parameterized data in one shot for processing

   ok = sqlite.Execute(vData, (handler, res, errMsg, affected, fail_ok, id) =>

   {

       Console.WriteLine("affected = {0}, fails = {1}, oks = {2}, res = {3}, errMsg: {4},

       last insert id = {5}", affected, (uint)(fail_ok >> 32), (uint)fail_ok, res, errMsg, id);

   }, (handler, rowData) =>

   {

       //rowset data come here

       int last = ra.Count - 1;

       KeyValuePair<CDBColumnInfoArray, CDBVariantArray> item = ra[last];

       item.Value.AddRange(rowData);

   }, (handler) =>

   {

       //rowset header meta info comes here

       KeyValuePair<CDBColumnInfoArray, CDBVariantArray> item = new KeyValuePair<CDBColumnInfoArray,

                                CDBVariantArray>(handler.ColumnInfo, new CDBVariantArray());

       ra.Add(item);

   });

}

Фрагмент кода 3: Отправка нескольких наборов параметров для обработки нескольких операторов SQL однократным событием по технологии SocketPro SQL-stream

Отмечается, что образец подготовки SQL-запроса состоит из одного запроса и одного оператора insert. Когда  вызывается функция, клиент ожидает три набора возвращенных записей и три записи, вставленные в таблицу COMPANY. Образец предназначен для демонстрации мощностей технологии SocketPro SQL-stream. В реальности вы, вероятно, не подготовили комбинированный оператор SQL, содержащий несколько базовых операторов SQL. Если вы используете параметризованный оператор, вам необходимо сначала отправить запрос на подготовку. После получения массива данных, как показано в приведенном выше фрагменте кода 3, вы можете отправить несколько наборов данных параметров для обработки от клиента к серверу одним событием в конце. Если у вас большой объем данных, вы можете повторно вызвать метод  Execute, не требуя повторной подготовки оператора.

Затем нам нужно больше деталей для обработки возвращаемых наборов записей. Метод Execute имеет три обратных вызова или Лямбда-выражения для второго, третьего и четвертого входных параметров, кроме первого ввода для массива данных параметров. Всякий раз, когда  подходит набор записей, третий обратный вызов будет автоматически вызываться обработчиком клиента SQLite для метаданных столбцов записей. Если фактические записи доступны, будет вызван второй обратный вызов, и вы можете заполнить данные в контейнере ra. В конце, первый обратный вызов будет вызван для того, чтобы вы могли отслеживать количество затронутых записей и последний идентификационный номер вставки в случае успеха. Если мы возьмем фрагмент кода 3 в качестве образца, третий обратный вызов будет вызываться три раза, и первый обратный вызов будет вызываться только один раз, но ожидается, что время вызова второго обратного вызова зависит как от количества записей, так и от размер одной записи.

InsertBLOBByPreparedStatement

Теперь вы можете видеть, что технология SocketPro SQL-stream предоставляет все необходимые функции для доступа к базе данных. До конца этой статьи мы собираемся использовать образец, чтобы показать, как обрабатывать большие двоичные и текстовые объекты в технологии SocketPro-stream. Как правило, трудно получить доступ к большим объектам внутри баз данных эффективно. Тем не менее, это действительно очень просто сделать с технологией SocketPro SQL-stream как для разработки, так и для эффективности, как показано ниже в фрагменте кода 4.

Просмотрев фрагмент кода 4, вы обнаружите, что он такой же, как фрагмент кода 3, хотя этот фрагмент длиннее. Поэтому этот подход действительно хорош для того, чтобы разработчик программного обеспечения повторно использовал технологию SQL-потока SocketPro для обработки всех типов полей таблицы базы данных в одном и том же стиле кодирования для легкой разработки.

SocketPro всегда делит большой двоичный или текстовый объект на куски сначала на стороне клиента и сервера. Впоследствии SocketPro отправляет эти меньшие куски на другую сторону. В конце SocketPro восстановит исходный большой двоичный или текстовый объект из собранных меньших кусков. Это незаметно осуществляется во время выполнения для уменьшения объема памяти.

static void InsertBLOBByPreparedStatement(CSqlite sqlite,

          List<KeyValuePair<CDBColumnInfoArray, CDBVariantArray>> ra)

{

   string wstr = "";

   //prepare junk data for testing

   while (wstr.Length < 128 * 1024)

   {

       wstr += "广告做得不那么夸张的就不说了,看看这三家,都是正儿八经的公立三甲,

        附属医院,不是武警,也不是部队,更不是莆田,都在卫生部门直接监管下,照样明目张胆地骗人。";

   }

   string str = "";

   while (str.Length < 256 * 1024)

   {

       str += "The epic takedown of his opponent on an all-important voting day was

               extraordinary even by the standards of the 2016 campaign -- and quickly drew

               a scathing response from Trump.";

   }

 

   //a complex SQL statement combined with two insert and query prepare statements

   string sqlInsert = "insert or replace into employee(EMPLOYEEID,CompanyId,name,

   JoinDate,image,DESCRIPTION,Salary)values(?,?,?,?,?,?,?);select * from employee where employeeid=?";

   bool ok = sqlite.Prepare(sqlInsert, (handler, res, errMsg) =>

   {

       Console.WriteLine("res = {0}, errMsg: {1}", res, errMsg);

   });

   CDBVariantArray vData = new CDBVariantArray();

   using (CScopeUQueue sbBlob = new CScopeUQueue())

   {

       //first set of data

       vData.Add(1);

       vData.Add(1); //google company id

       vData.Add("Ted Cruz");

       vData.Add(DateTime.Now);

       sbBlob.Save(wstr);

       vData.Add(sbBlob.UQueue.GetBuffer());

       vData.Add(wstr);

       vData.Add(254000.0);

       vData.Add(1);

 

       //second set of data

       vData.Add(2);

       vData.Add(1); //google company id

       vData.Add("Donald Trump");

       vData.Add(DateTime.Now);

       sbBlob.UQueue.SetSize(0);

       sbBlob.Save(str);

       vData.Add(sbBlob.UQueue.GetBuffer());

       vData.Add(str);

       vData.Add(20254000.0);

       vData.Add(2);

 

       //third set of data

       vData.Add(3);

       vData.Add(2); //Microsoft company id

       vData.Add("Hillary Clinton");

       vData.Add(DateTime.Now);

       sbBlob.Save(wstr);

       vData.Add(sbBlob.UQueue.GetBuffer());

       vData.Add(wstr);

       vData.Add(6254000.0);

       vData.Add(3);

   }

   //send three sets of parameterized data in one shot for processing

   ok = sqlite.Execute(vData, (handler, res, errMsg, affected, fail_ok, id) =>

   {

       Console.WriteLine("affected = {0}, fails = {1}, oks = {2}, res = {3}, errMsg: {4},

       last insert id = {5}", affected, (uint)(fail_ok >> 32), (uint)fail_ok, res, errMsg, id);

   }, (handler, rowData) =>

   {

       //rowset data come here

       int last = ra.Count - 1;

       KeyValuePair<CDBColumnInfoArray, CDBVariantArray> item = ra[last];

       item.Value.AddRange(rowData);

   }, (handler) =>

   {

       //rowset header meta info comes here

       KeyValuePair<CDBColumnInfoArray, CDBVariantArray> item =

       new KeyValuePair<CDBColumnInfoArray, CDBVariantArray>(handler.ColumnInfo, new CDBVariantArray());

       ra.Add(item);

   });

}

Фрагмент кода 4: таблицы вставки и запроса, содержащие несколько больших двоичных и текстовых объектов с технологией SocketPro SQL-stream

Исследование эффективности

Технология SQL-потока SocketPro имеет отличную производительность при доступе к базам данных для запросов и обновлений. Вы можете увидеть два проекта тестирования производительности (cppperf и netperf), доступные в socketpro / samples / module_sample / usqlite / DBPerf /. Первый образец написан C ++, а другой - C #. Кроме того, база данных MySQL sakila sample, которая находится в каталоге socketpro / samples / module_sample / usqlite / DBPerf, используется для воспроизведения после запуска образца test_csqlite для создания глобальной базы данных SQLite usqlite.db.

Наше исследование эффективности показывает, что выполнить запрос со скоростью 12 000 раз в секунду и подключение сокетов не составляет сложности. Что касается вставки вы можете легко получить скорость  равную 50 000 вставок в секунду для SQLite.

Точки обзора

Служба SocketPro SQLite SQL-поток обеспечивает все необходимые основные функции базы данных клиента или сервера, но также предоставляет следующие уникальные функции:

  1. Непрерывная встроенная обработка запросов / результатов и обработка SQL-потока в реальном времени для лучшей эффективности сети
  2. Двунаправленная асинхронная передача данных между клиентом и сервером, но все асинхронные запросы могут быть преобразованы в синхронные
  3. Превосходная производительность и масштабируемость благодаря мощной коммуникационной архитектуре SocketPro
  4. Кэш реального времени для обновления, вставки и удаления таблицы. Вы можете установить обратный вызов на стороне клиента для записи таблицы отслеживания, добавления, удаления и обновления
  5. Все запросы отменяются путем выполнения метода Отмена класса CClientSocket на стороне клиента
  6. Поддерживаются как Windows, так и Linux
  7. Простая разработка для всех поддерживаемых языков разработки
  8. Оба клиентских и серверных компонента являются потокобезопасными. Их можно легко повторно использовать в ваших многопоточных приложениях с гораздо меньшим количеством проблем, связанных с потоком

No Comments

Add a Comment