Апацхе Кафка

Како читати податке са Кафке помоћу Питхона

Како читати податке са Кафке помоћу Питхона
Кафка је систем дистрибуираних порука отвореног кода за слање поруке у подељеним и различитим темама. Стриминг података у реалном времену може се применити коришћењем Кафке за пријем података између апликација. Има три главна дела. То су произвођач, потрошач и теме. Произвођач се користи за слање поруке на одређену тему и свака порука се прилаже кључем. Потрошач се користи за читање поруке на одређену тему из скупа партиција. Подаци примљени од произвођача и ускладиштени на партицијама на основу одређене теме. Многе библиотеке постоје у питхону да би створиле произвођача и потрошача за изградњу система за размену порука користећи Кафку. Како се подаци из Кафке могу читати помоћу питхона, приказано је у овом упутству.

Предуслов

Морате инсталирати потребну питхон библиотеку за читање података из Кафке. Питхон3 се користи у овом упутству за писање скрипте потрошача и произвођача. Ако пип пакет раније није инсталиран у вашем Линук оперативном систему, морате инсталирати пип пре инсталирања Кафка библиотеке за питхон. питхон3-кафка користи се у овом упутству за читање података из Кафке. Покрените следећу команду да бисте инсталирали библиотеку.

$ пип инсталира питхон3-кафка

Читање једноставних текстуалних података са Кафке

Произвођач може послати различите врсте података о одређеној теми коју потрошач може прочитати. Како се једноставни текстуални подаци могу слати и примати од Кафке користећи произвођача и потрошаче, приказано је у овом делу овог водича.

Направите датотеку са именом произвођач1.пи са следећим питхон скриптом. КафкаПродуцер модул се увози из библиотеке Кафка. Листа посредника треба да дефинише у време иницијализације производног објекта да би се повезала са Кафка сервером. Подразумевана лука Кафке је '9092'. Аргумент боотстрап_серверс се користи за дефинисање имена хоста са портом. 'Фирст_Топиц'је постављено као име теме помоћу које ће произвођач послати текстуалну поруку. Даље, једноставна текстуална порука, 'Здраво из Кафке'се шаље помоћу пошаљи () метод КафкаПродуцер на тему, 'Фирст_Топиц'.

произвођач1.пи:

# Увезите КафкаПродуцер из Кафка библиотеке
од кафка импорт КафкаПродуцер
# Дефинишите сервер са портом
боотстрап_серверс = ['лоцалхост: 9092']
# Дефинишите назив теме где ће се порука објавити
топицНаме = 'Прва тема'
# Иницијализација произвођачке променљиве
произвођач = КафкаПродуцер (боотстрап_серверс = боотстрап_серверс)
# Објави текст у дефинисаној теми
произвођач.пошаљи (имеНаме, б'Здраво од кафке ... ')
# Одштампај поруку
принт ("Порука послата")

Направите датотеку са именом потрошач1.пи са следећим питхон скриптом. КафкаЦонсумер модул се увози из библиотеке Кафка за читање података из Кафке. сис овде се користи модул за завршетак скрипте. Исто име хоста и број порта произвођача користе се у скрипти потрошача за читање података из Кафке. Име теме потрошача и произвођача мора бити исто, што је 'Фирст_топиц'.  Затим се потрошачки објект иницијализује са три аргумента. Име теме, ИД групе и подаци о серверу. за петља се овде користи за читање текста послатог од произвођача Кафке.

потрошач1.пи:

# Увезите КафкаЦонсумер из библиотеке Кафка
од кафка импорт КафкаЦонсумер
# Увези сис модул
импорт сис
# Дефинишите сервер са портом
боотстрап_серверс = ['лоцалхост: 9092']
# Дефинишите назив теме одакле ће порука примати
топицНаме = 'Прва тема'
# Иницијализујте потрошачку променљиву
потрошач = КафкаЦонсумер (име теме, гроуп_ид = 'гроуп1', боотстрап_серверс =
боотстрап_серверс)
# Прочитајте и одштампајте поруку потрошача
за мсг у потрошачу:
принт ("Назив теме =% с, Порука =% с"% (мсг.тема, порука.вредност))
# Прекини скрипту
сис.излаз ()

Излаз:

Покрените следећу наредбу са једног терминала да бисте извршили скрипту произвођача.

$ питхон3 произвођач1.пи

Следећи излаз ће се појавити након слања поруке.

Покрените следећу наредбу са другог терминала да бисте извршили потрошачку скрипту.

$ питхон3 потрошач1.пи

Излаз приказује назив теме и текстуалну поруку послату од произвођача.

Читање података обликованих у ЈСОН-у из Кафке

Податке у формату ЈСОН може послати произвођач Кафка, а потрошач Кафка их прочита тхе јсон модул питхон-а. Како се ЈСОН подаци могу серијски и десериализовати пре слања и пријема података помоћу питхон-кафка модула приказано је у овом делу овог водича.

Направите питхон скрипту са именом произвођач2.пи са следећом скриптом. Други модул под називом ЈСОН се увози са КафкаПродуцер модул овде. валуе_сериализер аргумент се користи са боотстрап_серверс аргумент овде да би се иницијализовао објекат произвођача Кафке. Овај аргумент указује на то да ће ЈСОН подаци бити кодирани помоћу 'утф-8'знак подешен у време слања. Следеће, ЈСОН форматирани подаци се шаљу на тему која се назива ЈСОНтопиц.

произвођач2.пи:

# Увезите КафкаПродуцер из Кафка библиотеке
од кафка импорт КафкаПродуцер
# Увезите ЈСОН модул за сериализацију података
импорт јсон
# Иницијализујте произвођачку променљиву и поставите параметар за ЈСОН кодирање
произвођач = КафкаПродуцер (боотстрап_серверс =
['лоцалхост: 9092'], валуе_сериализер = ламбда в: јсон.депоније (в).енцоде ('утф-8'))
# Пошаљите податке у ЈСОН формату
произвођач.сенд ('ЈСОНтопиц', 'наме': 'фахмида', 'емаил': '[емаил протецтед]')
 
# Одштампај поруку
принт ("Порука послата ЈСОНтопицу")

Направите питхон скрипту са именом потрошач2.пи са следећом скриптом. КафкаЦонсумер, сис и ЈСОН модули су увезени у овој скрипти. КафкаЦонсумер модул се користи за читање података обликованих у ЈСОН-у са Кафке. ЈСОН модул се користи за декодирање кодираних ЈСОН података послатих од произвођача Кафка. Сис модул се користи за завршавање скрипте. валуе_десериализер аргумент се користи са боотстрап_серверс како би се дефинисало како ће се ЈСОН подаци декодирати. Следећи, за петља се користи за испис свих потрошачких записа и ЈСОН података преузетих из Кафке.

потрошач2.пи:

# Увезите КафкаЦонсумер из библиотеке Кафка
од кафка импорт КафкаЦонсумер
# Увези сис модул
импорт сис
# Увезите јсон модул за сериализацију података
импорт јсон
# Иницијализујте потрошачку променљиву и поставите својство за ЈСОН декодирање
потрошач = КафкаЦонсумер ('ЈСОНтопиц', боотстрап_серверс = ['лоцалхост: 9092'],
валуе_десериализер = ламбда м: јсон.оптерећења (м.децоде ('утф-8')))
# Читајте податке из кафке
за поруку код потрошача:
испис ("Евиденција потрошача: \ н")
испис (порука)
принт ("\ нЧитање из ЈСОН података \ н")
принт ("Име:", порука [6] ['име'])
принт ("Емаил:", порука [6] ['емаил'])
# Прекини скрипту
сис.излаз ()

Излаз:

Покрените следећу наредбу са једног терминала да бисте извршили скрипту произвођача.

$ питхон3 произвођач2.пи

Скрипта ће исписати следећу поруку након слања ЈСОН података.

Покрените следећу наредбу са другог терминала да бисте извршили потрошачку скрипту.

$ питхон3 потрошач2.пи

Следећи излаз ће се појавити након покретања скрипте.

Закључак:

Подаци се могу слати и примати у различитим форматима од Кафке користећи питхон. Подаци се такође могу сачувати у бази података и преузети из базе података помоћу Кафке и питхона. Кући, овај водич ће помоћи кориснику питхона да почне да ради са Кафком.

Како снимити и стримовати своју играћу сесију на Линук-у
У прошлости се играње игара сматрало само хобијем, али с временом је играчка индустрија забележила огроман раст у погледу технологије и броја играча. ...
Најбоље игре за ручно праћење
Оцулус Куест је недавно представио сјајну идеју ручног праћења без контролера. Са све већим бројем игара и активности које извршавају подршку формално...
Како приказати ОСД прекривач у целом екрану Линук апликација и игара
Играње игара преко целог екрана или коришћење апликација у режиму целог екрана без ометања може вам одсећи релевантне системске информације видљиве на...