Пример построения микросервисов с использованием apache kafka.
Задача:
Построить событийную архитектуру между приложениями(микросервисы)(не только 1с).
Общая логика решения задачи():
1. Приложение А генерирует событие
2. Другое приложение Б(В, Г..) "слушает" кафка (подписывается на событие)(если коротко)
3. Приложение А отправляет данные кафка (Поставщик)
4. кафка отправляет данные всем кто подписан на это событие (Потребитель)
5. Приложение Б(В, Г..) получает данные
Технологии:
1. 1с (поставщик, потребитель, tcp клиент)
2. Разнородный бэкенд (python, и тд) (поставщик, потребитель)
3. с++ 1с вк native (транспорт для tcp клиента 1с)
4. apache kafka (шина данных) https://kafka.apache.org
5. flask (python micro web сервер) http://flask.pocoo.org
6. tcp сервер (python)
7. KafkaConsumer, KafkaProducer (python) https://github.com/dpkp/kafka-python
Общая принцип работы(модули):
1с(1):
При запуске 1с стартует tcp клиент (вк native)(3) .Устанавливает соединение с tcp сервером(6) (передавая ид пользователя из 1с). Подписывается на событие вк native. При наступления события уходит в контекст 1с через ОбработкаВнешнегоСобытия для последующей обработки. Для генерации событий выполняет гет, пост запросы на веб сервер(5).
tcp сервер(6):
Хеш таблица открытых сокетов(активных соединений) и ид пользователя. Подписывается на события шины данных для отправки клиенту 1с. При наступлении событий отправляет данные в сокет(клиенту 1с). Генерирует события при авторизации(установка сокета), выхода(разрыв, закрытие сокета) в шину данных.
web сервер(5):
Обрабатывает запросы со стороны 1с(прокси для кафка) и отправляет в шину данных. Также подписывается на события сторонних приложений требующих отправку данных для 1с(изменился статус звонка asterisk, прилетел тригер для обновления формы, перегнать что то с &НаСервере на &НаКлиенте , и тд).
Шина данных(4):
Управляет оркестром приложений(упорядоченная очередь). При наступлении события маршутизирует подписчикам, если подсписчиков нет, хранит у себя, доставит потом когда появятся.
Разнородный бэкенд(2):
Подписывается, генерирует события в шину данных.(например web app time line(журнал записи), asterisk ami client(автообзвон, фонер), из предыдущих статей)
Шаг 1. tcp сервер
from socketserver import TCPServer, ThreadingMixIn, BaseRequestHandler
from datetime import datetime, date
from kafka import KafkaConsumer
from kafka import KafkaProducer
import _thread
class ThreadedTCPServer(ThreadingMixIn, TCPServer):
pass
class OnesSocketServerHandler(BaseRequestHandler):
def handle(self):
self.callback(self.server, self.request, self.client_address)
class OnesSocketServer():
handler = OnesSocketServerHandler
users = {}
producer = KafkaProducer(bootstrap_servers=['192.168.777.555:9092'])
def __init__(self):
self.handler.callback = self.callback
_thread.start_new_thread(self.start_listen_event)
def callback(self, server, request, client_address):
print(f"""CONNECTED LISTENER {client_address}""")
u_ref = None
while True:
try:
buf = request.recv(256).decode('utf-8')
except:
break
#print(f"""recv {buf}""")
if not buf:
break
buf = buf.strip('
')
if buf == 'logout':
break
elif buf[0:5] == 'u_ref' and len(buf[6:]) > 1:
u_ref = buf[6:]
u_ref = u_ref.upper()
self.users.setdefault(u_ref, dict())
self.users[u_ref][client_address] = request
authorize(user = u_ref, address = client_address[0], producer = self.producer)
print(f"""DISCONNECTED LISTENER {client_address}""")
if not u_ref is None:
user = self.users.get(u_ref)
if not user is None:
sock = user[client_address]
if not sock is None:
if sock._closed != True:
sock.close()
del user[client_address]
#for x in user:
# sock = self.users[u_ref][client_address]
# if sock._closed:
# continue
# sock.close()
# del self.users[x][address]
if len(user) == 0:
del self.users[u_ref]
unauthorize(user = u_ref, address = client_address[0], producer = self.producer)
pass
def start_listen_event(self):
consumer = KafkaConsumer('ones_socket_send_user', 'ones_socket_send_user_send_all', 'ones_socket_add_listener',
group_id='my-group', bootstrap_servers=['192.168.777.555:9092'])
for message in consumer:
if message.topic=="ones_socket_send_user":
u_ref = message.key.decode('utf-8')
user = self.users.get(u_ref)
if user is None:
continue
for x in user:
sock = user[x]
if sock._closed:
continue
sock.sendall(message.value)
elif message.topic=="ones_socket_send_user_send_all":
for x in self.users:
for y in self.users[x]:
sock = self.users[x][y]
if sock._closed:
continue
sock.sendall(message.value)
elif message.topic=="ones_socket_add_listener":
pass
#user = self.users.get(u_ref)
#if user is None:
# continue
#for x in user:
# sock = user[x]
# if sock._closed:
# continue
# sock.sendall(message.value)
def authorize(**data):
user = data['user']
address = data['address']
producer = data['producer']
key = user.encode('utf-8')
value = address.encode('utf-8')
try:
producer.send(f'user_auth', key=key, value=value)
except:
print(f"""send to kafka failed {value}""")
pass
def unauthorize(**data):
user = data['user']
address = data['address']
producer = data['producer']
key = user.encode('utf-8')
value = address.encode('utf-8')
try:
producer.send(f'user_unauth', key=key, value=value)
except:
print(f"""send to kafka failed {value}""")
pass
TCP_IP = '192.168.777.222'
TCP_PORT = 11000
if __name__ == '__main__':
ones_serv = OnesSocketServer()
server = ThreadedTCPServer((TCP_IP, TCP_PORT), OnesSocketServerHandler)
print('starting ones socket server '+str(TCP_IP)+':'+str(TCP_PORT)+' (use <Ctrl-C> to stop)')
server.serve_forever()
установка сокета(прослушка):
while True:
try:
buf = request.recv(256).decode(‘utf-8’)
except:
break
отправка в сокет:
sock.sendall(message.value)
генерация события для шины данных:
producer.send(f’user_auth’, key=key, value=value)
подписка,обработка события:
consumer = KafkaConsumer(‘ones_socket_send_user’, ‘ones_socket_send_user_send_all’, ‘ones_socket_add_listener’,
group_id=’my-group’, bootstrap_servers=[‘192.168.777.555:9092’])
for message in consumer:
if message.topic=="ones_socket_send_user":
Шаг 2. web сервер
from server import app
if __name__ == '__main__':
HOST = '192.168.777.222'
PORT = 8095
app.run(HOST, PORT, threaded=True)
from flask import request
from server import app, producer
import json
HEADERS = {"Access-Control-Allow-Origin": "*", "Access-Control-Allow-Methods": "POST", "Access-Control-Allow-Headers": "Content-Type"}
@app.route('/')
@app.route('/index')
def index():
return [], 200, HEADERS
@app.route('/api_user_event', methods=['GET', 'POST'])
def api_user_event():
if request.method == 'POST':
body = request.data.decode('utf-8-sig')
data = json.loads(body)
user = data.get('u_ref', '')
jdata = data.get('jdata', '')
else:
user = request.args.get('u_ref', '')
jdata = request.args.get('jdata', '')
if len(user) == 0 or len(jdata) == 0:
return 'bad args', 400, HEADERS
key = user.encode('utf-8')
value = jdata.encode('utf-8')
try:
producer.send(f'ones_socket_send_user', key=key, value=value)
except:
print(f"""send to kafka failed {value}""")
return 'send to kafka', 200, HEADERS
@app.route('/api_user_event_send_all', methods=['GET', 'POST'])
def api_user_event_broadcast():
if request.method == 'POST':
body = request.data.decode('utf-8-sig')
data = json.loads(body)
user = data.get('u_ref', '')
jdata = data.get('jdata', '')
else:
user = request.args.get('u_ref', '')
jdata = request.args.get('jdata', '')
if len(user) == 0 or len(jdata) == 0:
return 'bad args', 400, HEADERS
key = user.encode('utf-8')
value = jdata.encode('utf-8')
try:
producer.send(f'ones_socket_send_user_send_all', key=key, value=value)
except:
print(f"""send to kafka failed {value}""")
return 'send to kafka', 200, HEADERS
from flask import Flask
from flask_cors import CORS
from kafka import KafkaProducer, KafkaConsumer
import _thread
def start_listen_event():
consumer = KafkaConsumer('ami_client_event', 'upd_external_event_trigger',group_id='my-group', bootstrap_servers=['192.168.77.555:9092'])
for message in consumer:
try:
producer.send(f'ones_socket_send_user', key=message.key, value=message.value)
except:
print(f"""send to kafka failed {value}""")
app = Flask(__name__)
producer = KafkaProducer(bootstrap_servers=['192.168.777.555:9092'])
CORS(app, support_credentials=True)
import server.views
_thread.start_new_thread(start_listen_event)
регистрация енд поинт(точка входа, путь и тд):
@app.route(‘/api_user_event’, methods=[‘GET’, ‘POST’])
def api_user_event(): …
Шаг 3. Пример генерации из стороннего приложения
if len(user_list) != 0:
d = {}
d['Действия'] = 'External_Record_Log_Event'
d['Данные'] = {}
d['Данные']['d_ref'] = d_ref
d['Данные']['date'] = date
d['event_date'] = datetime.now()
jdata = json.dumps(d, default=common.json_serial)
user_list = self.users_cache.get(d_ref)
for u_ref in user_list:
key = u_ref.encode('utf-8')
value = jdata.encode('utf-8')
try:
self.producer.send(f'upd_external_event_trigger', key=key, value=value)
except:
print(f"""send to kafka failed {jdata}""")
asterisk ami event triger
def event_listener(event, **kwargs):
if event.name in EVENTS_NOT_LISTEN:
return
#print(f"""{str(event)}""")
if event.name == 'Newstate' or event.name == 'Hangup' or event.name == 'VarSet':
if event.name == 'VarSet':
if not event.keys['Variable'] in EVENTS_VARS_LISTEN:
return
for x in USER_EVENTS:
if USER_EVENTS[x]['channel'] in event.keys['Channel']:
d = {}
d['Действия'] = 'Asterisk_Event'
d['Данные'] = event.keys
d['Данные']['name'] = event.name
d['Данные']['id_ext'] = USER_EVENTS[x]['id_ext']
d['Данные']['event_date'] = datetime.now()
jdata = json.dumps(d, default=json_serial)
do_user_event(jdata, USER_EVENTS[x]['user'])
print(f"""{str(event)}""")
Прослушка ами:
client = AMIClient(address=AMI_ADDRESS, port=AMI_PORT)
future = client.login(username=AMI_USER, secret=AMI_SECRET)
client.add_event_listener(event_listener)
Шаг 4. 1с
Процедура ПодключитьСокет(Кнопка)
ПодключитьВнешнююКомпоненту("СокетВК", "Socket", ТипВнешнейКомпоненты.Native);
Сокет = Новый("AddIn.Socket.EventListener");
Сокет.хост = "192.168.777.222";
Сокет.ссылка = "A0BDD89D6773B96411E789535C2BC380";
Сокет.подключить();
события будут приходить в ОбработкаВнешнегоСобытия(Источник, Событие, Данные),
обработку нужно делать там
КонецПроцедуры
Процедура Get(Кнопка)
СтрокаДжейсон = "тест";
пользователь = "A0BDD89D6773B96411E789535C2BC380";
Сервер = "192.168.777.222";
ПортСервера = "8095";
ТаймАут = 1;
Прокси = Новый ИнтернетПрокси(Ложь);
ТелоЗапроса = "jdata="+СтрокаДжейсон+"&u_ref="+пользователь;
Соединение = Новый HTTPСоединение(Сервер, ПортСервера,,,Прокси,ТаймАут);
Запрос = Новый HTTPЗапрос("/api_user_event?"+ТелоЗапроса);
Результат = Соединение.Получить(Запрос);
КонецПроцедуры
Процедура Post(Элемент)
СтрокаДжейсон = "тест";
пользователь = "A0BDD89D6773B96411E789535C2BC380";
Сервер = "192.168.777.222";
ПортСервера = "8095";
ТаймАут = 1;
Прокси = Новый ИнтернетПрокси(Ложь);
структура = новый Структура;
структура.Вставить("jdata", СтрокаДжейсон);
структура.Вставить("u_ref", пользователь);
ТелоЗапроса = JSON.лЗаписатьJSON(структура);
Соединение = Новый HTTPСоединение(Сервер, ПортСервера,,,Прокси,ТаймАут);
Запрос = Новый HTTPЗапрос("/api_user_event");
Запрос.УстановитьТелоИзСтроки(ТелоЗапроса);
Результат = Соединение.ОтправитьДляОбработки(Запрос);
КонецПроцедуры
Процедура Post_500(Элемент)
Для Сч=0 По 500 Цикл
СтрокаДжейсон = "тест";
пользователь = "A0BDD89D6773B96411E789535C2BC380";
Сервер = "192.168.777.222";
ПортСервера = "8095";
ТаймАут = 1;
Прокси = Новый ИнтернетПрокси(Ложь);
структура = новый Структура;
структура.Вставить("jdata", СтрокаДжейсон+Строка(Сч));
структура.Вставить("u_ref", пользователь);
ТелоЗапроса = JSON.лЗаписатьJSON(структура);
Соединение = Новый HTTPСоединение(Сервер, ПортСервера,,,Прокси,ТаймАут);
Запрос = Новый HTTPЗапрос("/api_user_event");
Запрос.УстановитьТелоИзСтроки(ТелоЗапроса);
Результат = Соединение.ОтправитьДляОбработки(Запрос);
КонецЦикла;
КонецПроцедуры
Шаг 5. вк native
Полное создание вк native в статье не рассматривается.
int EventListener::connect_socket(char *host, u_short port, char *subject) {
SOCKET s;
WSADATA wsadata;
int error = WSAStartup(0x0202, &wsadata);
if (error) return error;
if (wsadata.wVersion != 0x0202) {
WSACleanup();
return wsadata.wVersion;
}
char buf[MAXLEN];
std::wstring ws_buf(MAXLEN, 0);
while (1) {
SOCKADDR_IN target;
target.sin_family = AF_INET;
target.sin_port = htons(port);
target.sin_addr.s_addr = inet_addr(host);
s = ::socket(AF_INET, SOCK_STREAM, IPPROTO_TCP);
if (s == INVALID_SOCKET) return s;
try {
if (::connect(s, (SOCKADDR *)&target, sizeof(target)) == SOCKET_ERROR) {
throw std::exception("SOCKET ERROR");
}
}
catch (std::exception &e) {
std::string es(e.what());
std::wstring ew;
copy(es.begin(), es.end(), back_inserter(ew));
callback(std::wstring(L"Socket"),
(std::wstring) L"Connection to server:" + std::to_wstring((long long)port) + L" failed! " + ew);
Sleep(30 * 1000);
continue;
}
callback(std::wstring(L"Socket"), (std::wstring) L"Connected to server:" + std::to_wstring((long long)port));
::send(s, subject, strlen(subject), 0);
buf[0] = 0;
long size;
while (1) {
size = recv(s, buf, MAXLEN, 0);
if (!s || size <= 0 || !strlen(buf)) break;
buf[size] = 0;
size = MultiByteToWideChar(CP_UTF8, 0, &buf[0], sizeof(buf), NULL, 0);
MultiByteToWideChar(CP_UTF8, 0, &buf[0], sizeof(buf), &ws_buf[0], size);
callback(std::wstring(L"Socket"), ws_buf);
}
::closesocket(s);
}
return 0;
}
Шаг 6. 1c web services
Используется для передачи сообщения из бизнес приложения для отправки в 1с (серверная часть)(для дальнейшей обработка бизнес логики в контексте 1с) (создание документа, старт бизнес процесса, заполнение справочника) (конечно можно сделать сделать прямой вызов без использования сообщений, здесь идея в том что на один ответ 1c ws может быть в дальнейшем подписано несколько подписчиков, тем самым все получат сообщение и будут выполнят свою логику далее)
import json
from suds.cache import NoCache
from suds.client import Client
from kafka import KafkaProducer, KafkaConsumer
import _thread
import time
DEF_USER = 'administrator'
DEF_PASSWORD = 'administrator'
DEF_WS = "http://my_ip_base/my_base/ws/my_gate.1cws?wsdl"
upd_event_lists = []
def send(username, password, jdata, method, ws, client):
#try:
# client = Client(ws, username = username, password = password, cache = NoCache(), timeout = 60)
#except Exception as e:
# print(e)
# return
eval_str = 'client.service.%s(%s)' % (method, jdata and "%r" % jdata or '')
try:
res = eval(eval_str)
except Exception as e:
print(e)
return
res = json.loads(res)
client = None
print(f"""eval_str->{res}""")
return res
def upd_loop(ind_upd_list):
while True:
do_upd_loop(ind_upd_list)
def do_upd_loop(ind_upd_list):
t_loop = 0.250
for x in upd_event_lists[ind_upd_list]:
try:
client = Client(ws, username = username, password = password, cache = NoCache(), timeout = 60)
except Exception as e:
print(e)
continue
send(x['username'], x['password'], x['jdata'], x['method'], x['ws'], client)
upd_event_lists[ind_upd_list].remove(x)
time.sleep(t_loop)
upd_event_lists.append([])
upd_event_lists.append([])
upd_event_lists.append([])
upd_event_lists.append([])
#upd_event_lists.append([])
#upd_event_lists.append([])
#upd_event_lists.append([])
#upd_event_lists.append([])
for i, val in enumerate(upd_event_lists):
_thread.start_new_thread(upd_loop, (i,))
#producer = KafkaProducer(bootstrap_servers=['192.168.5.131:9092'])
consumer = KafkaConsumer('ones_ws_event', group_id='my-group', bootstrap_servers=['192.168.5.131:9092'])
for message in consumer:
print(f"""ones_ws_event->{message}""")
method = message.key.decode('utf-8')
data = message.value.decode('utf-8')
jdata = data
username = DEF_USER
password = DEF_PASSWORD
ws = DEF_WS
data = json.loads(data)
if not data.get('event_setting') is None:
event_setting = data.get('event_setting')
username = event_setting.get('username')
password = event_setting.get('password')
ws = event_setting.get('ws')
jdata = event_setting.get('jdata')
lens = []
for i, val in enumerate(upd_event_lists):
lens.append((len(upd_event_lists[i]), i))
min_list = min(lens)
upd_event_lists[min_list[1]].append({'username':username, 'password':password, 'ws':ws, 'jdata':jdata, 'method':method})
#_thread.start_new_thread(send, (username, password, jdata, method, ws,))
#res = send(username, password, jdata, method, ws)
Подключение к ws:
client = Client(ws, username = username, password = password, cache = NoCache(), timeout = 60)
Вызов метода в контексте 1с(метод ws сервиса):
eval_str = ‘client.service.%s(%s)’ % (method, jdata and "%r" % jdata or »)
try:
res = eval(eval_str)
except Exception as e:
print(e)
return
Конструкция вида: upd_event_lists.append([]), и затем _thread.start_new_thread(upd_loop, (i,)), сделано для того чтобы не завалить ws 1c. Те может случится так что прилетит много запросов и если делать send сразу происходит отказ в установке соединения (Удаленный компьютер отверг запрос на подключение, еррор вин сок и тд.)(проверено примерно на 1000 сообщения поданных вход модулю который выполнял _thread.start_new_thread(send, (username, password, jdata, method, ws,)) для каждого принятого сообщения, сгенерировав их в кафку предварительно с другого модуля, произошел отказ(пользователей было 400 на момент приема сообщений) ну и 1с чета затупила сразу, в плане установки новых соединений с рпхостом(рагент сдох короче)).
import json
from kafka import KafkaProducer, KafkaConsumer
#import _thread
import time
import requests
from requests.auth import HTTPBasicAuth
headers = {'Content-type': 'application/json'}
#'Accept': 'text/plain'}
ONES_USER = ''
ONES_PASSWORD = ''
ONES_HOST = '192.168.555.560'
ONES_HTTP = 'http://192.168.555.560/AA'
consumer = KafkaConsumer('ones_http_event', group_id='my-group', bootstrap_servers=['192.168.5.131:9092'])
producer = KafkaProducer(bootstrap_servers=['192.168.5.131:9092'])
for message in consumer:
print(f"""ones_http_event->{message}""")
method = message.key.decode('utf-8')
data = message.value.decode('utf-8')
jdata = data
username = ONES_USER
password = ONES_PASSWORD
data = json.loads(data)
id_request = None
if not data.get('id_request') is None:
id_request = data.get('id_request')
if not data.get('event_setting') is None:
event_setting = data.get('event_setting')
username = event_setting.get('username')
password = event_setting.get('password')
jdata = event_setting.get('jdata')
if not jdata.get('id_request') is None:
id_request = jdata.get('id_request')
auth_ones = HTTPBasicAuth(username, password)
r = None
try:
r = requests.get(f"""{ONES_HTTP}/hs/gate/v1?method={method}&data={jdata}""", headers=headers, auth=auth_ones)
#data=res_str,
except Exception as e:
print(e)
res = ''
if not r is None:
if r.status_code != 200:
res = r.reason
print(r.reason)
else:
res = r.text
if not id_request is None:
key = id_request.encode('utf-8')
value = res.encode('utf-8')
producer.send('ones_http_event_response', key=key, value=value)
в 1с это http сервис. Это работает быстрее ws, потому что ws это soap over http. Чтобы понять нужно cмотреть в структуру вызовов клиент-сервер SOAP.(получение wsdl, вызов метода soap и тд). http сервис — это голый http — get/post/put/head
Примеры работы:
1c посылает гет во внешнее приложение->внешнее приложение получает событие, обрабытвает генерирует событие на обновление клиента 1с отправляет в кафка->кафка марштузирует tcp серверу->tcp сервер отпраляет в сокет всем клиентам которые зарегистрированы по данному ид->1с обрабатывает внешние событие.
1c посылает гет, пост на веб сервер->веб сервер отправляет в кафка->кафка марштузирует tcp серверу->tcp сервер отпраляет в сокет всем клиентам которые зарегистрированы по данному ид->1с обрабатывает внешние событие.
Пример отправки и получения 500 запросов + в процессе отправки генерация из внешних приложений.
Пример подключения/отключения в процессе отправки.
Деплой и идеология apache kafka в статье не рассматривается(написано тысячи статей).
Для мониторинга apache kafka используется https://github.com/yahoo/kafka-manager.
Метрика:
upd для тех кто мало что понял мануал для dot net от ms, смотреть в общие принципы и идеи
С помощью какой программы снимали GIF ?
ну хоть внешнюю компоненту-то выложите ((
Статья для того чтобы похвастаться?
Примеры чтобы самим потестить где?
Чтобы скомпилить одну ДЛЛ-ку надо поставить 2 гигабайтную студию. ОК.
Запускаю билд примера с ИТС — 211 ошибок при сборке. WTF??
kafka-manager как поставить? там в описании указано что надо ставить Play Framework и компилить бинарник через sbt. Так?
(1)
Судя по ватермаркам автор пользовался Icecream Screen Recorder
мне больше нравится LICEcap
Какова производительность отправки?
фух, поставил зукипер, кафку, кафкоманагер. Осталось компоненту внешнюю получить!
(8) не забудьте про питон и скрипты запуска вебсервера
Так я не понял со стороны 1С сообщения получаются на tcp сервер? т.е. на стороне 1С должна быть поднята ВК с tcp сервером, так?
(1) Icecream Screen Recorder дальше в гугле convert web m to gif
(3) примеры чего?
(5) нет есть путь короче имя ему докер хаб
(10) нет. не сервер, tcp клиент. в статье написано.
(9) всё есть в статье
(8) дак там ничего в ней нет. сишный tcp клиент под интерфейсом addin 1c. ну если сильно нужно то конечно могу приложить
(16) код tcp клиента есть в статье. вообще на этом форуме возможно их есть с десяток.
(7) в сравнении с чем? на гифках видно думаю при отправке 500 постов. с учетом того что штатными средствами (без использования бд) межсеансовая клиентская передача не возможна, считаю ваш вопрос не корректным. но суть не сколько во межсеансовой передаче, сколько во внешнем воздействии (события)
(4) ну дак вин сдк нужно для версии длл, иначе как?
(19) Классно, как раз хотел закачать еще 3 ГБ библиотек и почпокаться с С++ первый раз в жизни
(20) больше 3х если для крос
(13) 6 тысяч результатов по «kafka», в первых нескольких нет инфы, что в них есть zkeeper, kafka-manager. Опять нет конкретики
(22)https://www.google.ru/search?client=opera&q=kafka+manager+docker&sourceid=opera&ie=UTF-8&oe=UTF-8
хз первая наверно
(18) сколько пакетов в секунду размером в 1 байт отправляется
(24) сколько пакетов отправляется откуда?
(25) в кафку с клиента
(14)
Меня смутили:
внешнее событие генерит компонента?
клиенты же отправляют…
(24) там все над сишные, крестовые поделки(питон(консумер, продакшен кафка), 1с аддин), отсюда скорость.(ну да ртос далеко, ну так реалтайм до 1с, дальше конечно просадка). более 100 сокетов.сразу. к чему такой вопрос?
(27) генерит компонента? да. клиенты же отправляют не всегда, в чем суть то и есть. поставщиком может быть как 1с так и не 1с. и клиенты чего?? в статье же разделение логики вроде как описывалось. есть часть которая отвечает только за отправку в сокеты для 1с, есть часть которая генерирует эти события, есть часть которая генерирует события которые нужно в сокет отправить. объяснил?
(23) Было бы круто сделать свой докер со всеми упомянутыми средствами — zkeeper, kafka, kafka-man, flask, и одной командой разворачивать всю схему
(2) добавил
(30) zkeeper, kafka, kafka-man, flask, …. windows, 1c, excel, word, doom…
а если по делу суть контейнеров как раз так и заключается в разделении приложения
те.
вот есть кафка
есть кафка менеджер
есть бизнес поделки
вот они как бы вместе взаимодействуют.
оркестр
это мое мнение, и вот так вот мы делаем.
flask вообще пакет питона, ставится одной строкой в докер бизнес приложения(если в имидже его не было).
(26) добавил гифку по скорости передачи, там где с браузера передача идет (последняя гифка). это под рабочей нагрузкой.
(33) господи….цифру напишите — потолок.
Например отправляется 200 сообщений в секунду. Пиковая нагрузка полторы тыщи.
(28) как можно по гифке можно оценить скорость? Вот 100 сокетов — это уже полезно…значит на 100 пользователях возможно не упадет =)
Хочу оценить порядок скорости со своим велосипедом.
(35) должно упасть? если и упадет кафка доставит чуть позже. см идеологию
(34) потолок чего? отказа?
(36) хорошо…
У меня стоит двадцать датчиков которые генерируют 100 событий в секунду. Смогу ли я отправлять в кафку 2000 сообщений из клиента 1С? А если да — сколько я могу поставить еще датчиков что бы уперется в предел отправки?
(36) по гифке? визуально явно там не байты ходят. вам метрика нужна? я могу замерить, потом окажется у вас железо не то, деплой не тот, нагрузка не та., и тд…
(38) не упрется в том то и дело! что упорядоченная очередь. те как там в 1с прийдет дело 3 тье, главное их в стэк очередь. и если честно генерацию датчиков 2000 в сек наверно лучше не делать из 1с. в моем примере отправка 500 постов в первом случае, 5000 в другом на скринах, не упало.
а потянет кафка перекидывания 1..2-х мегабайтными XML-ями?
Еще вопрос, как можно в реальном времени смотреть за нагрузкой/очередями в кафке?
(39)
да какая разница какое железо…там в 100% упрется в 1С. По гифке 1 сообщение в секунду =)
(41) потянет 100% (тут вопрос не кафке уже а в железе и оси), но лучше не так. создать файл, сделать ивент, получить ивент типа прочитай. (типа выставил указатель), получил ответ снял с регистрации, и тд. это как бы и есть событийная логика. а не гонять данные. на крайняк кафка может в стримы.
(42) да на скринах есть(нет передачи потому что нужно включить логи). прям реал тайм график построить кафка манеджер не сможет, можно самому запилить там делов… еще вопросы?
(43) да упрется в 1с так как карусельная виртуальная машина(выполнение потоков по карусели)
(46) почему и высокую нагрузку в обход 1с и предлагал делать, а там куда запишет и как не столь важно. наверно имеет смысл после кафки писать в дб сразу. тогда показания будут корректными. на 1с только отображать
Вы нагрузочное тестирование вообще проводили?
(45) Как все это запускается? У вас все сервисы в докере? Вроде можно композером объединить все в одну кучу?
И кстати tcp-сервер на питоне надо в виде сервиса завернуть? или как запускать чтоб постоянно работал?
(49) в скрипте работает постоянно. ибо main. в статье разные приложения. даже расписано что за чо отвечает. ключевое слово «шаг»
ответ на вопрос » кстати tcp-сервер на питоне надо в виде сервиса завернуть?» screen
(48) отправки 20000 в сек сообщений с 2000 клиентов из 1с. конечно нет. а смысл ? в статье нет речи о посторении скады. тесты были для бизнес приложений и взаимодействия 1с. написано выше было более 100 открытых сокетов и пример передачи. какой вопрос? вы что хотите цифры? дак разверните у себя сделайте замер…
(51) а смысл кафки в вашем контексте? 100 открытых сокетов держит древний селерон с гигом оперативы…а 1 сообщение в секунду можно и в sqllite писать и читать…
Нагнать хайпа ? ))
(52) смысл во внешнем воздействии сторонних приложений и обработкой в 1с. обработка ивента например на обновление формы, реакция на смену статуса звонка, реакция межсеансового события 1с клиента. sqllite писать и читать это возможно ваш подход, не мой. прочитайте статью от начала. просьба.
(50) то что постоянно работает я понял, я имею в виду как сделать автостарт и фоновую работу всех служб. В винде можно специальной софтиной из любой программы сделать обычную службу. У вас все эти сервисы на линуксе как демоны или както по другому?
(54) «У вас все эти сервисы» (он один контейнер из статьи) «на линуксе» -да, «как демоны» — нет. в винде батник нормально сработает. в дебе в полне себе баш сработает вида:
screen -dmUS queue выпелено
.py — это часть из статьи вынесенная отдельно для генерации событий отправки в сокет, можно так не делать, можно делать…
screen -dmUS server python3.6 /OnesSocketServer/OnesSocketServer/ones_socket_server.py
screen -dmUS proxy python3.6 /OnesSocketServer/ProxyOnesSocketServer/runserver.py
screen -dmUS consumer python3.6 /OnesSocketServer/ProxyOnesSocketServer/ones_socket_consumer
Насколько я вижу взаимодействие происходит с вебсервером(TCP-сервером)…что и где там дальше по цепочке впринцепе не важно…меняем кафку на реббит или базу данных — и ничего для 1С не меняется.
Вот в чем вопрос…почему кафка?? а ответа нет.
Так варианты:
1С + webserver + кафка
1C + регистр сведений + чтение периодическое
1С + native api + rabbit mq
(56) ну давайте не будем про 1C + регистр сведений + чтение периодическое
1С + native api + rabbit mq
что за native сможет в туда и сюда(клиент и сервер)?
(57) есть сорсы(крестовые) для натс под 1с(натив вк) стоит ли мерятся? не вижу смысла.
(56) «Насколько я вижу взаимодействие происходит с вебсервером(TCP-сервером) » ну да дальше то сокет уже открыт, считай лонг пол для сторонних приложений. бля, до сих пор не понимаю. в чем вопрос то??
(57)
Вместо метрики — gif.
Вместо внятного объяснения выбора кафки — хрень с натсом.
Нормально бы написали:
Пользовался тем то — устраивало
Пользовался тем то — стало лучше
Пользовался тем-то — еще лучше и стабильней
Внедрил кафку — ваще огонь…
p.s. Yellow RabbitMQ — вроде может и отправлять и принимать, и на сервере и на клиенте. Если что я не от них =)
(58) а толку? Снова же: конфигурация не та…деплой говно, метрику не дам, нагрузочного тестирования не делал =))))
(60) метрика:
(60) еще метрика
(60) еще метрика:
Хватит метрик? или еще добавить?
(61) «конфигурация не та…деплой говно» — возможно, это будут ваши первые слова когда наши метрики не совпадут и у вас будет ниже, «метрику не дам» — дам,
«нагрузочного тестирования не делал» — вероятно делал
Спасибо!
(4) бедняги подвиндозные )))