How to filter None in a Row type in PyFlink

I'm using PyFlink and was wondering if there is a more generic way to filter out None values or to handle input that is not valid JSON.

# main():
    try:
        data_stream = data_stream.map(transform, output_type=Types.ROW([Types.STRING(), Types.STRING(), Types.STRING(), Types.STRING(), Types.STRING(), Types.STRING(), Types.STRING()]))
        data_stream = data_stream.filter(lambda x: x != Row(None, None, None, None, None, None, None))
        # Debugging purpose
        data_stream.print()
# transform()
# Source JSON key -> output column name, listed in output column order.
_FIELD_MAP = {
    'H1': 'symbol',
    'H2': 'exchange',
    'H3': 'security_type',
    'D2124': 'performance_id',
    'S12': 'company_name',
    'S9': 'currency',
    'S19': 'isin',  # optional
}

# Value substituted for optional columns that are absent from a message.
_OPTIONAL_DEFAULTS = {'isin': 'null'}


def _get_dlq_producer():
    """Return the shared dead-letter-queue producer, creating it on first use."""
    producer = getattr(_get_dlq_producer, 'producer', None)
    if producer is None:
        producer = KafkaProducer(
            acks=0,
            compression_type='gzip',
            bootstrap_servers=[
                '192.168.9.221:9091',
                '192.168.9.221:9092',
                '192.168.9.221:9093',
            ],
            value_serializer=lambda x: x.encode('utf-8'),
        )
        # Cache on the function so later failures reuse the same connection
        # instead of opening a new one per record.
        _get_dlq_producer.producer = producer
    return producer


def transform(value):
    """Convert one raw JSON message into a fixed-width Row of strings.

    The message keys are renamed through ``_FIELD_MAP`` and the values are
    emitted in the column order of that mapping, regardless of the key order
    inside the incoming JSON. A missing ``isin`` is replaced by the string
    ``'null'``.

    Args:
        value: The raw message text, expected to be a JSON object.

    Returns:
        A ``Row`` with one string per column of ``_FIELD_MAP``. If the message
        cannot be parsed, contains an unknown key, or lacks a required key,
        the raw message is sent to the ``dlq_topic`` Kafka topic and a ``Row``
        holding ``None`` in every column is returned so the caller can filter
        it out.
    """
    try:
        json_dict = json.loads(value)
        # An unknown key raises KeyError here and routes the message to the DLQ.
        renamed = {_FIELD_MAP[key]: field for key, field in json_dict.items()}

        for column, default in _OPTIONAL_DEFAULTS.items():
            renamed.setdefault(column, default)

        # Index by column name rather than by position so the output order is
        # fixed by the schema, not by the key order of this particular message.
        return Row(*(str(renamed[column]) for column in _FIELD_MAP.values()))
    except Exception:
        # Broad on purpose: any malformed record is diverted to the DLQ
        # instead of failing the whole stream.
        print('data is not JSON Format: ' + str(value))
        _get_dlq_producer().send('dlq_topic', value=value)
        print('msg sent to ms_equity_dlq_topic')
        return Row(*([None] * len(_FIELD_MAP)))

I am trying to avoid comparing against Row(None, None, None, None, None, None, None) and would like to find a more generic way to filter out these records.



Sources

This article follows the attribution requirements of Stack Overflow and is licensed under CC BY-SA 3.0.

Source: Stack Overflow

Solution Source