book-notes's Issues

Using a global proxy in Python

While recently investigating how to reach services that sit behind a proxy from Python, I found that every client needs its own proxy configuration, and some client libraries do not support proxy settings at all.

I read through the source of PyHive and its dependencies, as well as requests and urllib3, without finding a solution.

Today I found a way to set a network proxy globally:

Requires pySocks (pip install PySocks).

Reference: https://www.coder.work/article/345603

import socks
import socket

# Can be socks4/5
socks.setdefaultproxy(socks.PROXY_TYPE_SOCKS4,'127.0.0.1', 9050)
socket.socket = socks.socksocket

# Patch getaddrinfo so hostname lookups also go through the proxied socket
def getaddrinfo(*args):
    return [(socket.AF_INET, socket.SOCK_STREAM, 6, '', (args[0], args[1]))]

socket.getaddrinfo = getaddrinfo

# do some requests
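
After the patch above, any library that goes through the standard socket module is proxied without per-library configuration. A minimal check, assuming a SOCKS proxy is actually listening on 127.0.0.1:9050 and that requests is installed (both assumptions, not part of the original note):

import requests

# no proxies= argument is needed: socket.socket itself has been replaced,
# so this request is tunneled through the SOCKS proxy configured above
resp = requests.get('https://httpbin.org/ip', timeout=10)
print(resp.text)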

Connecting to Hive from Python

Connecting to Hive from Python has long been a pain point. I never got PyHive working; today https://github.com/cloudera/impyla worked.

Installing impyla on macOS:

  1. pip install impyla -- impyla-0.17.0 pure-sasl-0.6.2 thrift-0.11.0 thrift-sasl-0.4.3
  2. Install sasl as described here: pip install git+https://github.com/JoshRosen/python-sasl.git@fix-build-with-newer-xcode
  3. pip install thrift_sasl to finish the installation, then connect to Hive from Python (see the sketch after the CentOS section below)

Installing impyla on Ubuntu 18.04

  1. pip install impyla
  2. Following the reference, run sudo apt-get install libsasl2-dev
  3. apt-get install libsasl2-dev libsasl2-2 libsasl2-modules-gssapi-mit
  4. pip install thrift_sasl

ref: https://github.com/cloudera/impyla

Installing impyla on CentOS

pip install impyla              -- impyla-0.17.0 pure-sasl-0.6.2 thrift-0.11.0 thrift-sasl-0.4.3
yum install cyrus-sasl-md5 cyrus-sasl-plain cyrus-sasl-gssapi cyrus-sasl-devel

pip install sasl             -- sasl-0.3.1
pip install thrift-sasl         -- thrift-sasl-0.4.3

ref1: https://ask.hellobi.com/blog/ysfyb/18251
ref2: https://github.com/cloudera/impyla
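
Once impyla is installed, a minimal connection sketch looks like the following. The host, port, user, and database values are placeholders, and auth_mechanism='PLAIN' assumes a HiveServer2 using SASL/PLAIN; for a Kerberized cluster use 'GSSAPI' instead, as in the CDH section further down.

from impala.dbapi import connect

# placeholder connection details; adjust to your HiveServer2 instance
conn = connect(host='hive-server.example.com', port=10000,
               user='hive_user', password='***',
               auth_mechanism='PLAIN', database='default')
cur = conn.cursor()
cur.execute('SHOW TABLES')
print(cur.fetchall())
conn.close()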

Using ssh and scp through a SOCKS5 proxy

ssh: ssh -o ProxyCommand="nc -X 5 -x <socks_host>:<socks_port> %h %p" <user>@<ssh_host>
scp: scp -o ProxyCommand="nc -X 5 -x <socks_host>:<socks_port> %h %p" <user>@<ssh_host>:<remote_path> <local_path>

Connecting to CDH with Kerberos authentication from Python

pip install bit_array sqlalchemy impyla thrift thrift_sasl krbcontext hdfs[kerberos] pykerberos

#!/usr/bin/env python
# -*- coding=utf-8 -*-
# @Author  : magong
# @Time    : 2022/5/31

import os
import logging

from impala.dbapi import connect
from krbcontext import krbcontext
import pandas as pd
from pydash import py_
from traffic_util.connector.base_query import IBaseQuery
from traffic_util.util import get_logger

from config import config

logger = get_logger(__name__)


class HiveClient(IBaseQuery):
    __instance = None

    @classmethod
    def instance(cls, *args, **kwargs):
        if cls.__instance:
            return cls.__instance
        else:
            try:
                with krbcontext(using_keytab=True, principal=py_.get(args[0], 'principal'), keytab_file=py_.get(args[0], 'keytab_file_path')):
                    logger.warning('Make sure `krb5.conf` is placed in `/etc` directory and host is added in `/etc/hosts`. ')
                    conn = connect(host=py_.get(args[0], 'host'),
                                   port=py_.get(args[0], 'port'),
                                   auth_mechanism=py_.get(args[0], 'auth_mechanism'),
                                   kerberos_service_name=py_.get(args[0], 'kerberos_service_name'),
                                   database=py_.get(args[0], 'database'))
                    # cache the instance so repeated calls reuse the same connection
                    cls.__instance = cls(conn)
                    return cls.__instance
            except Exception as e:
                # return None
                raise e

    def __init__(self, conn):
        self.conn = conn

    def execute(self, sql):
        """执行."""
        try:
            cur = self.conn.cursor()
            cur.execute('set hive.exec.dynamic.partition.mode=nonstrict')
            cur.execute('set hive.exec.max.dynamic.partitions.pernode=10000')
            cur.execute('set hive.exec.max.dynamic.partitions=10000')
            cur.execute(sql)
        except Exception as err:
            print(err)
            raise err

    def query(self, sql):
        """Run a query and return all rows."""
        cur = self.conn.cursor()
        try:
            cur.execute('set hive.exec.dynamic.partition.mode=nonstrict')
            cur.execute('set hive.exec.max.dynamic.partitions.pernode=10000')
            cur.execute('set hive.exec.max.dynamic.partitions=10000')
            cur.execute(sql)
            res = cur.fetchall()
        except Exception as err:
            print("query failed: %s" % err)
            raise
        # returning from a finally block would swallow the raise above,
        # so return the result here instead
        return res
    
    def execute_with_result(self, sql):
        logger.info('''
            executing sql: >>> {sql} <<<
            '''.format(sql=sql))
        
        cur = self.conn.cursor()
        cur.execute('set hive.exec.dynamic.partition.mode=nonstrict')
        cur.execute('set hive.exec.max.dynamic.partitions.pernode=10000')
        cur.execute('set hive.exec.max.dynamic.partitions=10000')
        cur.execute(sql)
        res = cur.fetchall()
        # cur.close()

        return cur, res

    def create_table(self):
        pass

    def exist_table(self):
        pass

    def query_to_df(self, sql):
        """查询输出DataFrame."""
        with self.conn.cursor() as cursor:
            logger.info('''
                executing sql: >>> {sql} <<<
                '''.format(sql=sql))
            
            cursor.execute('set hive.exec.dynamic.partition.mode=nonstrict')
            cursor.execute('set hive.exec.max.dynamic.partitions.pernode=10000')
            cursor.execute('set hive.exec.max.dynamic.partitions=10000')
            cursor.execute(sql)
            columns = [col[0] for col in cursor.description]
            records = [dict(zip(columns, row)) for row in cursor.fetchall()]
            # build the frame with an explicit column order so an empty
            # result set still yields the expected columns
            return pd.DataFrame(records, columns=columns)
    
    def close(self):
        self.conn.close()
        
    def execute_sql(self, sql, sql_hints=None, show_log=False):
        if sql.endswith(';'):
            sql = sql[:-1]
        if show_log:
            logger.info('''
executing sql: >>> {sql} <<<
'''.format(sql=sql))
        return self.execute(sql)
    
    def run_sql_return_plain_json(self, sql):
        result = []

        cur, fetched_list = self.execute_with_result(sql)
        if cur.description is None or len(cur.description) < 1:
            return result

        columns = [col[0] for col in cur.description]
        for row in fetched_list:
            meta = {}
            for col, val in zip(columns, row):
                meta[col] = val
            result.append(meta)

        return result
    
    def run_sql_with_logview_return_plain_json(self, sql):
        pass 
    
    def check_table_exists(self, table_name):
        '''Check whether the table exists.'''
        query_res = self.query('''SELECT COUNT(*) FROM information_schema.tables WHERE table_name = '{0}' '''.format(table_name.replace('\'', '\'\'')))
        # query() returns a list of rows, so the count sits in the first column of the first row
        return query_res[0][0] == 1
        # return len(self.query('''SHOW TABLES LIKE '{table_name}' '''.format(table_name=table_name))) == 1
    
    def truncate_table_if_exists(self, table_name, force_trunc=False):
        '''Truncate the table if it exists. Tables whose names do not start with algtmp_ are only truncated when force_trunc is set.'''
        if not table_name.startswith('algtmp_') and not force_trunc:
            logger.info('table {table_name} does not start with algtmp_; pass force_trunc=True to truncate it anyway.'.format(table_name=table_name))
        else:
            if self.check_table_exists(table_name):
                # TRUNCATE returns no result set, so skip the fetchall() in query()
                self.execute_without_result(f'''TRUNCATE TABLE {table_name} ''')
                
    
    def execute_without_result(self, sql):
        cur = self.conn.cursor()
        try:
            cur.execute('set hive.exec.dynamic.partition.mode=nonstrict')
            cur.execute('set hive.exec.max.dynamic.partitions.pernode=10000')
            cur.execute('set hive.exec.max.dynamic.partitions=10000')
            cur.execute(sql)
        except Exception as e:
            logger.error('Error occurred when executing:')
            logger.error(sql)
            raise e
    
    
    def overwrite_data_to_partition(self, table_name, partition, data_list, batch_size=100000):
        '''Overwrite a partition with data_list. The key order of each record in data_list must exactly match the column order of the existing table.'''

        logger.info('saving to table "%s" partition "%s" total records: %d' % (table_name, partition, len(data_list)))
        # save with batch_size
        batch_n = 0
        names = list(data_list[0].keys())
        while batch_n * batch_size < len(data_list):
            logger.info('saved %d.' % (batch_n * batch_size))
            base_sql = 'insert overwrite table {} partition({})'.format(table_name, partition) + ' values'

            row_values = []
            for meta in data_list[batch_n * batch_size : (batch_n + 1) * batch_size]:
                values = []
                for col in names:
                    # values.append("{}".format(meta[col]))
                    values.append("'{}'".format(str(meta[col]).replace("'", "''")))
                    pass
                row_values.append('(' + ','.join(values) + ')')
            sql = base_sql + ', '.join(row_values)
            self.execute_without_result(sql)

            batch_n += 1
            pass
        logger.info('saved to table "%s" partition "%s" total records: %d' % (table_name,partition, len(data_list)))
        pass
            

if __name__ == '__main__':
    hive_client = HiveClient.instance(config.get('hive_conn_info'))
    # print(hive_client.query('show tables'))
    print(hive_client.run_sql_return_plain_json('''
        select distinct dma_id
        from qdwater_cdm.dwd_res_rltn_wateruser_dma
    '''))
    hive_client.close()

iptables whitelist

  1. iptables -nvL INPUT to view the existing rules
  2. iptables -I INPUT -p tcp --dport 10071 -j REJECT
  3. iptables -I INPUT -s 192.168.0.0/16 -p tcp --dport 10071 -j ACCEPT
  4. iptables -nvL INPUT to check the result
  5. iptables -D INPUT -s 192.168.0.0/16 -p tcp --dport 10071 -j ACCEPT

Rules in a chain are evaluated in order, so inserting the ACCEPT rule ahead of the REJECT rule produces a whitelist effect.

ref: https://www.liuvv.com/p/a8480986.html

vimrc

syntax enable
syntax on
set autoindent
set ts=2
set shiftwidth=2
set expandtab
set nobackup
set nu
